From 62154cdde95ad579d4b2b98f59fac4817be8a0f4 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Sat, 5 Jan 2008 13:39:40 +0000 Subject: added the "direct" queueing mode to queue class (no queing at all) --- queue.c | 89 ++++++++++++++++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 71 insertions(+), 18 deletions(-) (limited to 'queue.c') diff --git a/queue.c b/queue.c index 73bbf168..0b18b6db 100644 --- a/queue.c +++ b/queue.c @@ -252,6 +252,45 @@ rsRetVal qDelDisk(queue_t *pThis, void **ppUsr) return iRet; } +/* -------------------- direct (no queueing) -------------------- */ +rsRetVal qConstructDirect(queue_t __attribute__((unused)) *pThis) +{ + return RS_RET_OK; +} + + +rsRetVal qDestructDirect(queue_t __attribute__((unused)) *pThis) +{ + return RS_RET_OK; +} + +rsRetVal qAddDirect(queue_t *pThis, void* pUsr) +{ + DEFiRet; + rsRetVal iRetLocal; + + assert(pThis != NULL); + + /* TODO: calling the consumer should go into its own function! -- rgerhards, 2008-01-05*/ + iRetLocal = pThis->pConsumer(pUsr); + if(iRetLocal != RS_RET_OK) + dbgprintf("Queue 0x%lx: Consumer returned iRet %d\n", + (unsigned long) pThis, iRetLocal); + --pThis->iQueueSize; /* this is kind of a hack, but its the smartest thing we can do given + * the somewhat astonishing fact that this queue type does not actually + * queue anything ;) + */ + + return iRet; +} + +rsRetVal qDelDirect(queue_t __attribute__((unused)) *pThis, __attribute__((unused)) void **out) +{ + return RS_RET_OK; +} + + + /* --------------- end type-specific handlers -------------------- */ @@ -409,6 +448,12 @@ rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iMaxQueueSize, pThis->qAdd = qAddDisk; pThis->qDel = qDelDisk; break; + case QUEUETYPE_DIRECT: + pThis->qConstruct = qConstructDirect; + pThis->qDestruct = qDestructDirect; + pThis->qAdd = qAddDirect; + pThis->qDel = qDelDirect; + break; } /* call type-specific constructor */ @@ -433,11 +478,13 @@ rsRetVal queueStart(queue_t *pThis) { int i; - /* fire up the worker thread */ - pThis->bDoRun = 1; /* we are NOT done (else worker would immediately terminate) */ - i = pthread_create(&pThis->thrdWorker, NULL, queueWorker, (void*) pThis); - dbgprintf("Worker thread for queue 0x%lx, type %d started with state %d.\n", - (unsigned long) pThis, (int) pThis->qType, i); + if(pThis->qType != QUEUETYPE_DIRECT) { + /* fire up the worker thread */ + pThis->bDoRun = 1; /* we are NOT done (else worker would immediately terminate) */ + i = pthread_create(&pThis->thrdWorker, NULL, queueWorker, (void*) pThis); + dbgprintf("Worker thread for queue 0x%lx, type %d started with state %d.\n", + (unsigned long) pThis, (int) pThis->qType, i); + } return RS_RET_OK; } @@ -449,15 +496,17 @@ rsRetVal queueDestruct(queue_t *pThis) assert(pThis != NULL); - /* first stop the worker thread */ - dbgprintf("Initiating worker thread shutdown sequence for queue 0x%lx...\n", (unsigned long) pThis); - pThis->bDoRun = 0; - /* It's actually not "not empty" below but awaking the worker. The worker - * then finds out that it shall terminate and does so. - */ - pthread_cond_signal(pThis->notEmpty); - pthread_join(pThis->thrdWorker, NULL); - dbgprintf("Worker thread for queue 0x%lx terminated.\n", (unsigned long) pThis); + if(pThis->qType != QUEUETYPE_DIRECT) { + /* first stop the worker thread */ + dbgprintf("Initiating worker thread shutdown sequence for queue 0x%lx...\n", (unsigned long) pThis); + pThis->bDoRun = 0; + /* It's actually not "not empty" below but awaking the worker. The worker + * then finds out that it shall terminate and does so. + */ + pthread_cond_signal(pThis->notEmpty); + pthread_join(pThis->thrdWorker, NULL); + dbgprintf("Worker thread for queue 0x%lx terminated.\n", (unsigned long) pThis); + } /* ... then free resources */ pthread_mutex_destroy(pThis->mut); @@ -492,7 +541,8 @@ queueEnqObj(queue_t *pThis, void *pUsr) assert(pThis != NULL); - pthread_mutex_lock(pThis->mut); + if(pThis->qType != QUEUETYPE_DIRECT) + pthread_mutex_lock(pThis->mut); while(pThis->iQueueSize >= pThis->iMaxQueueSize) { dbgprintf("enqueueMsg: queue 0x%lx FULL.\n", (unsigned long) pThis); @@ -511,9 +561,12 @@ queueEnqObj(queue_t *pThis, void *pUsr) finalize_it: /* now activate the worker thread */ - pthread_mutex_unlock(pThis->mut); - i = pthread_cond_signal(pThis->notEmpty); - dbgprintf("Queue 0x%lx: EnqueueMsg signaled condition (%d)\n", (unsigned long) pThis, i); + if(pThis->qType != QUEUETYPE_DIRECT) { + pthread_mutex_unlock(pThis->mut); + i = pthread_cond_signal(pThis->notEmpty); + dbgprintf("Queue 0x%lx: EnqueueMsg signaled condition (%d)\n", (unsigned long) pThis, i); + } + return iRet; } /* -- cgit