From 553d1880d47b57b2f4e023c2017675f010afd9a0 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 27 Oct 2009 10:36:53 +0100 Subject: some cleanup --- runtime/queue.c | 72 --------------------------------------------------------- runtime/queue.h | 1 - runtime/wti.c | 4 +--- runtime/wtp.c | 10 +++----- 4 files changed, 4 insertions(+), 83 deletions(-) diff --git a/runtime/queue.c b/runtime/queue.c index 1539db6d..781d115f 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -239,7 +239,6 @@ qqueueAdviseMaxWorkers(qqueue_t *pThis) ISOBJ_TYPE_assert(pThis, qqueue); if(!pThis->bEnqOnly) { -dbgprintf("AdviseMaxWorkers: log Queue Size: %d, high water mark %d\n", getLogicalQueueSize(pThis) , pThis->iHighWtrMrk); if(pThis->bIsDA && getLogicalQueueSize(pThis) >= pThis->iHighWtrMrk) { wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* disk queues have always one worker */ @@ -486,26 +485,6 @@ static rsRetVal qDelFixedArray(qqueue_t *pThis) } -/* reset the logical dequeue pointer to the physical dequeue position. - * This is only needed after we cancelled workers (during queue shutdown). - */ -static rsRetVal -qUnDeqAllFixedArray(qqueue_t *pThis) -{ - DEFiRet; - - ISOBJ_TYPE_assert(pThis, qqueue); - - DBGOPRINT((obj_t*) pThis, "resetting FixedArray deq index to %ld (was %ld), logical dequeue count %d\n", - pThis->tVars.farray.head, pThis->tVars.farray.deqhead, pThis->nLogDeq); - - pThis->tVars.farray.deqhead = pThis->tVars.farray.head; - pThis->nLogDeq = 0; - - RETiRet; -} - - /* -------------------- linked list -------------------- */ @@ -597,26 +576,6 @@ static rsRetVal qDelLinkedList(qqueue_t *pThis) } -/* reset the logical dequeue pointer to the physical dequeue position. - * This is only needed after we cancelled workers (during queue shutdown). - */ -static rsRetVal -qUnDeqAllLinkedList(qqueue_t *pThis) -{ - DEFiRet; - - ASSERT(pThis != NULL); - - DBGOPRINT((obj_t*) pThis, "resetting LinkedList deq ptr to %p (was %p), logical dequeue count %d\n", - pThis->tVars.linklist.pDelRoot, pThis->tVars.linklist.pDeqRoot, pThis->nLogDeq); - - pThis->tVars.linklist.pDeqRoot = pThis->tVars.linklist.pDelRoot; - pThis->nLogDeq = 0; - - RETiRet; -} - - /* -------------------- disk -------------------- */ @@ -863,16 +822,6 @@ finalize_it: } -/* This is a dummy function for disks - we do not need to reset anything - * because everything is already persisted... - */ -static rsRetVal -qUnDeqAllDisk(__attribute__((unused)) qqueue_t *pThis) -{ - return RS_RET_OK; -} - - /* -------------------- direct (no queueing) -------------------- */ static rsRetVal qConstructDirect(qqueue_t __attribute__((unused)) *pThis) { @@ -917,12 +866,6 @@ static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis) return RS_RET_OK; } -static rsRetVal -qUnDeqAllDirect(__attribute__((unused)) qqueue_t *pThis) -{ - return RS_RET_OK; -} - /* --------------- end type-specific handlers -------------------- */ @@ -1192,7 +1135,6 @@ ShutdownWorkers(qqueue_t *pThis) DBGOPRINT((obj_t*) pThis, "initiating worker thread shutdown sequence\n"); CHKiRet(tryShutdownWorkersWithinQueueTimeout(pThis)); -dbgprintf("YYY: physical queue size: %d\n", getPhysicalQueueSize(pThis)); if(getPhysicalQueueSize(pThis) > 0) { CHKiRet(tryShutdownWorkersWithinActionTimeout(pThis)); @@ -1260,7 +1202,6 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread pThis->qAdd = qAddFixedArray; pThis->qDeq = qDeqFixedArray; pThis->qDel = qDelFixedArray; - pThis->qUnDeqAll = qUnDeqAllFixedArray; break; case QUEUETYPE_LINKEDLIST: pThis->qConstruct = qConstructLinkedList; @@ -1268,7 +1209,6 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread pThis->qAdd = qAddLinkedList; pThis->qDeq = (rsRetVal (*)(qqueue_t*,void**)) qDeqLinkedList; pThis->qDel = (rsRetVal (*)(qqueue_t*)) qDelLinkedList; - pThis->qUnDeqAll = qUnDeqAllLinkedList; break; case QUEUETYPE_DISK: pThis->qConstruct = qConstructDisk; @@ -1276,7 +1216,6 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread pThis->qAdd = qAddDisk; pThis->qDeq = qDeqDisk; pThis->qDel = qDelDisk; - pThis->qUnDeqAll = qUnDeqAllDisk; /* special handling */ pThis->iNumWorkerThreads = 1; /* we need exactly one worker */ break; @@ -1285,7 +1224,6 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread pThis->qDestruct = qDestructDirect; pThis->qAdd = qAddDirect; pThis->qDel = qDelDirect; - pThis->qUnDeqAll = qUnDeqAllDirect; break; } @@ -1471,7 +1409,6 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz nDequeued = nDiscarded = 0; while((iQueueSize = getLogicalQueueSize(pThis)) > 0 && nDequeued < pThis->iDeqBatchSize) { -dbgprintf("DequeueConsumableElements, index %d\n", nDequeued); CHKiRet(qqueueDeq(pThis, &pUsr)); /* check if we should discard this element */ @@ -1652,7 +1589,6 @@ DequeueForConsumer(qqueue_t *pThis, wti_t *pWti) ISOBJ_TYPE_assert(pThis, qqueue); ISOBJ_TYPE_assert(pWti, wti); -dbgprintf("YYY: dequeue for consumer\n"); CHKiRet(DequeueConsumable(pThis, pWti)); if(pWti->batch.nElem == 0) @@ -2080,14 +2016,6 @@ CODESTARTobjDestruct(qqueue) if(pThis->qType != QUEUETYPE_DIRECT && !pThis->bEnqOnly && pThis->pqParent == NULL) ShutdownWorkers(pThis); - /* now all workers are terminated. Messages may exist. Also, some logically dequeued - * messages may never have been processed because their worker was terminated. So - * we need to reset the logical dequeue pointer, persist the queue if configured to do - * so and then destruct everything. -- rgerhards, 2009-05-26 - */ -RUNLOG_STR("XXX: NOT undequeueing entries!"); - //CHKiRet(pThis->qUnDeqAll(pThis)); - if(pThis->bIsDA && getPhysicalQueueSize(pThis) > 0 && pThis->bSaveOnShutdown) { CHKiRet(DoSaveOnShutdown(pThis)); } diff --git a/runtime/queue.h b/runtime/queue.h index 338f091b..3b5d7038 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -113,7 +113,6 @@ typedef struct queue_s { rsRetVal (*qAdd)(struct queue_s *pThis, void *pUsr); rsRetVal (*qDeq)(struct queue_s *pThis, void **ppUsr); rsRetVal (*qDel)(struct queue_s *pThis); - rsRetVal (*qUnDeqAll)(struct queue_s *pThis); /* end type-specific handler */ /* synchronization variables */ pthread_mutex_t mutThrdMgmt; /* mutex for the queue's thread management */ diff --git a/runtime/wti.c b/runtime/wti.c index aade156e..288670b6 100644 --- a/runtime/wti.c +++ b/runtime/wti.c @@ -119,7 +119,7 @@ wtiSetState(wti_t *pThis, bool bNewVal) * Note that when waiting for the thread to terminate, we do a busy wait, checking * progress every 10ms. It is very unlikely that we will ever cancel a thread * and, if so, it will only happen at the end of the rsyslog run. So doing this - * kind of not optimal wait is considered preferable over using condition variables. + * kind of non-optimal wait is considered preferable over using condition variables. * rgerhards, 2008-02-26 */ rsRetVal @@ -134,7 +134,6 @@ wtiCancelThrd(wti_t *pThis) pthread_cancel(pThis->thrdID); /* now wait until the thread terminates... */ while(wtiGetState(pThis)) { -//fprintf(stderr, "sleep loop for getState\n"); srSleep(0, 10000); } } @@ -271,7 +270,6 @@ wtiWorker(wti_t *pThis) /* first check if we are in shutdown process (but evaluate a bit later) */ terminateRet = wtpChkStopWrkr(pWtp, MUTEX_ALREADY_LOCKED); -RUNLOG_VAR("%d", terminateRet); if(terminateRet == RS_RET_TERMINATE_NOW) { /* we now need to free the old batch */ localRet = pWtp->pfObjProcessed(pWtp->pUsr, pThis); diff --git a/runtime/wtp.c b/runtime/wtp.c index e075e5b8..060e6627 100644 --- a/runtime/wtp.c +++ b/runtime/wtp.c @@ -117,8 +117,7 @@ wtpConstructFinalize(wtp_t *pThis) /* alloc and construct workers - this can only be done in finalizer as we previously do * not know the max number of workers */ - if((pThis->pWrkr = MALLOC(sizeof(wti_t*) * pThis->iNumWorkerThreads)) == NULL) - ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); + CHKmalloc(pThis->pWrkr = MALLOC(sizeof(wti_t*) * pThis->iNumWorkerThreads)); for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) { CHKiRet(wtiConstruct(&pThis->pWrkr[i])); @@ -190,10 +189,8 @@ wtpChkStopWrkr(wtp_t *pThis, int bLockUsrMutex) wtpState = ATOMIC_FETCH_32BIT(pThis->wtpState); if(wtpState == wtpState_SHUTDOWN_IMMEDIATE) { -RUNLOG_STR("WWW: ChkStopWrkr returns TERMINATE_NOW"); ABORT_FINALIZE(RS_RET_TERMINATE_NOW); } else if(wtpState == wtpState_SHUTDOWN) { -RUNLOG_STR("WWW: ChkStopWrkr returns TERMINATE_WHEN_IDLE"); ABORT_FINALIZE(RS_RET_TERMINATE_WHEN_IDLE); } @@ -429,7 +426,6 @@ wtpAdviseMaxWorkers(wtp_t *pThis, int nMaxWrkr) ISOBJ_TYPE_assert(pThis, wtp); -int nMaxWrkrTmp = nMaxWrkr; if(nMaxWrkr == 0) FINALIZE; @@ -437,10 +433,10 @@ int nMaxWrkrTmp = nMaxWrkr; nMaxWrkr = pThis->iNumWorkerThreads; nMissing = nMaxWrkr - ATOMIC_FETCH_32BIT(pThis->iCurNumWrkThrd); -dbgprintf("wtpAdviseMaxWorkers, nmax: %d, curr %d, missing %d\n", nMaxWrkrTmp, pThis->iNumWorkerThreads, nMissing); if(nMissing > 0) { - DBGPRINTF("%s: high activity - starting %d additional worker thread(s).\n", wtpGetDbgHdr(pThis), nMissing); + DBGPRINTF("%s: high activity - starting %d additional worker thread(s).\n", + wtpGetDbgHdr(pThis), nMissing); /* start the rqtd nbr of workers */ for(i = 0 ; i < nMissing ; ++i) { CHKiRet(wtpStartWrkr(pThis)); -- cgit