From a9c4b26d462dd3c9dbd0575a3a1acc6d8df1c3b3 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 19 May 2009 18:47:26 +0200 Subject: some cleanup --- runtime/queue.c | 86 --------------------------------------------------------- 1 file changed, 86 deletions(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index 9855dac8..0f87b235 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -72,7 +72,6 @@ static int qqueueChkStopWrkrDA(qqueue_t *pThis); static rsRetVal GetDeqBatchSize(qqueue_t *pThis, int *pVal); static int qqueueIsIdleDA(qqueue_t *pThis); static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave); -static rsRetVal ConsumerCancelCleanup(void *arg1, void *arg2); /* some constants for queuePersist () */ #define QUEUE_CHECKPOINT 1 @@ -206,7 +205,6 @@ static inline void queueDrain(qqueue_t *pThis) ASSERT(pThis != NULL); -// TODO: ULTRA it may be a good idea to check validitity once again /* iQueueSize is not decremented by qDel(), so we need to do it ourselves */ while(pThis->iQueueSize-- > 0) { pThis->qDeq(pThis, &pUsr); @@ -473,7 +471,6 @@ qqueueInitDA(qqueue_t *pThis, int bEnqOnly, int bLockMutex) CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize)); CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueIsIdleDA)); CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti, int)) ConsumerDA)); - CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void*pWti)) ConsumerCancelCleanup)); CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) qqueueStartDA)); CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) qqueueTurnOffDAMode)); CHKiRet(wtpSetpmutUsr (pThis->pWtpDA, pThis->mut)); @@ -613,7 +610,6 @@ static rsRetVal qDeqFixedArray(qqueue_t *pThis, void **out) ASSERT(pThis != NULL); *out = (void*) pThis->tVars.farray.pBuf[pThis->tVars.farray.deqhead]; -//MULTIdbgprintf("ULTRA qDeqFA, deqhead=%d head=%d, tail=%d\n", pThis->tVars.farray.deqhead, pThis->tVars.farray.head, pThis->tVars.farray.tail); pThis->tVars.farray.deqhead++; if (pThis->tVars.farray.deqhead == pThis->iMaxQueueSize) pThis->tVars.farray.deqhead = 0; @@ -627,7 +623,6 @@ static rsRetVal qDelFixedArray(qqueue_t *pThis) ASSERT(pThis != NULL); -//MULTIdbgprintf("ULTRA qDelFA, deqhead=%d head=%d, tail=%d\n", pThis->tVars.farray.deqhead, pThis->tVars.farray.head, pThis->tVars.farray.tail); pThis->tVars.farray.head++; if (pThis->tVars.farray.head == pThis->iMaxQueueSize) pThis->tVars.farray.head = 0; @@ -638,58 +633,6 @@ static rsRetVal qDelFixedArray(qqueue_t *pThis) /* -------------------- linked list -------------------- */ -/* first some generic functions which are also used for the unget linked list */ - -static inline rsRetVal qqueueAddLinkedList(qLinkedList_t **ppRoot, qLinkedList_t **ppLast, void* pUsr) -{ - qLinkedList_t *pEntry; - DEFiRet; - - ASSERT(ppRoot != NULL); - ASSERT(ppLast != NULL); - - CHKmalloc((pEntry = (qLinkedList_t*) malloc(sizeof(qLinkedList_t)))); - - pEntry->pNext = NULL; - pEntry->pUsr = pUsr; - - if(*ppRoot == NULL) { - *ppRoot = *ppLast = pEntry; - } else { - (*ppLast)->pNext = pEntry; - *ppLast = pEntry; - } - -finalize_it: - RETiRet; -} - -static inline rsRetVal qqueueDelLinkedList(qLinkedList_t **ppRoot, qLinkedList_t **ppLast, obj_t **ppUsr) -{ - DEFiRet; - qLinkedList_t *pEntry; - - ASSERT(ppRoot != NULL); - ASSERT(ppLast != NULL); - ASSERT(ppUsr != NULL); - ASSERT(*ppRoot != NULL); - - pEntry = *ppRoot; - *ppUsr = pEntry->pUsr; - - if(*ppRoot == *ppLast) { - *ppRoot = NULL; - *ppLast = NULL; - } else { - *ppRoot = pEntry->pNext; - } - free(pEntry); - - RETiRet; -} - -/* end generic functions which are also used for the unget linked list */ - static rsRetVal qConstructLinkedList(qqueue_t *pThis) { @@ -1455,33 +1398,6 @@ finalize_it: } -/* cancellation cleanup handler for queueWorker () - * Updates admin structure and frees ressources. - * Params: - * arg1 - user pointer (in this case a qqueue_t) - * arg2 - user data pointer (in this case a queue data element, any object [queue's pUsr ptr!]) - * Note that arg2 may be NULL, in which case no dequeued but unprocessed pUsr exists! - * rgerhards, 2008-01-16 - */ -static rsRetVal -ConsumerCancelCleanup(void *arg1, void *arg2) -{ - //TODO: looks like we no longer need it! - /* - DEFiRet; - - qqueue_t *pThis = (qqueue_t*) arg1; - obj_t *pUsr = (obj_t*) arg2; - - ISOBJ_TYPE_assert(pThis, qqueue); - - RETiRet; - */ - return RS_RET_OK; -} - - - /* This function checks if the provided message shall be discarded and does so, if needed. * In DA mode, we do not discard any messages as we assume the disk subsystem is fast enough to * provide real-time creation of spool files. @@ -1636,7 +1552,6 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz rsRetVal localRet; DEFiRet; - /* this is the place to destruct the old messages and pull them off the queue - MULTI-DEQUEUE */ DeleteProcessedBatch(pThis, &pWti->batch); nDequeued = 0; @@ -2075,7 +1990,6 @@ rsRetVal qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize)); CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) IsIdleReg)); CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti, int)) ConsumerReg)); - CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void*pWti))ConsumerCancelCleanup)); CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RegOnWrkrStartup)); CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RegOnWrkrShutdown)); CHKiRet(wtpSetpmutUsr (pThis->pWtpReg, pThis->mut)); -- cgit