diff options
-rw-r--r-- | runtime/debug.h | 2 | ||||
-rw-r--r-- | runtime/queue.c | 314 | ||||
-rw-r--r-- | runtime/rsyslog.h | 1 | ||||
-rw-r--r-- | runtime/srUtils.h | 12 | ||||
-rw-r--r-- | runtime/wti.c | 30 | ||||
-rw-r--r-- | runtime/wti.h | 4 | ||||
-rw-r--r-- | runtime/wtp.c | 15 | ||||
-rw-r--r-- | runtime/wtp.h | 2 | ||||
-rwxr-xr-x | tests/daqueue-persist-drvr.sh | 3 | ||||
-rwxr-xr-x | tests/diag.sh | 7 |
10 files changed, 275 insertions, 115 deletions
diff --git a/runtime/debug.h b/runtime/debug.h index 1375493d..03702250 100644 --- a/runtime/debug.h +++ b/runtime/debug.h @@ -135,7 +135,7 @@ void dbgPrintAllDebugInfo(void); /* debug aides */ //#ifdef RTINST -#if 0 // temporarily removed for helgrind +#if 1 // temporarily removed for helgrind #define d_pthread_mutex_lock(x) dbgMutexLock(x, pdbgFuncDB, __LINE__, dbgCALLStaCK_POP_POINT ) #define d_pthread_mutex_trylock(x) dbgMutexTryLock(x, pdbgFuncDB, __LINE__, dbgCALLStaCK_POP_POINT ) #define d_pthread_mutex_unlock(x) dbgMutexUnlock(x, pdbgFuncDB, __LINE__, dbgCALLStaCK_POP_POINT ) diff --git a/runtime/queue.c b/runtime/queue.c index 539cf4ec..48d1c6e3 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -66,7 +66,7 @@ DEFobjCurrIf(glbl) /* forward-definitions */ static rsRetVal qqueueChkPersist(qqueue_t *pThis, int nUpdates); -static rsRetVal qqueueSetEnqOnly(qqueue_t *pThis, int bEnqOnly, int bLockMutex); +static rsRetVal SetEnqOnly(qqueue_t *pThis, int bEnqOnly, int bLockMutex); static rsRetVal RateLimiter(qqueue_t *pThis); static int qqueueChkStopWrkrDA(qqueue_t *pThis); static rsRetVal GetDeqBatchSize(qqueue_t *pThis, int *pVal); @@ -205,7 +205,7 @@ static inline void queueDrain(qqueue_t *pThis) ASSERT(pThis != NULL); BEGINfunc - dbgoprint((obj_t*) pThis, "queue will lose %d messages, destroying...\n", pThis->iQueueSize); + dbgoprint((obj_t*) pThis, "queue (type %d) will lose %d messages, destroying...\n", pThis->qType, pThis->iQueueSize); /* iQueueSize is not decremented by qDel(), so we need to do it ourselves */ while(pThis->iQueueSize-- > 0) { pThis->qDeq(pThis, &pUsr); @@ -366,7 +366,7 @@ qqueueChkIsDA(qqueue_t *pThis) * rgerhards, 2008-01-15 */ static rsRetVal -qqueueStartDA(qqueue_t *pThis) +StartDA(qqueue_t *pThis) { DEFiRet; uchar pszDAQName[128]; @@ -396,7 +396,7 @@ qqueueStartDA(qqueue_t *pThis) CHKiRet(qqueueSetiPersistUpdCnt(pThis->pqDA, pThis->iPersistUpdCnt)); CHKiRet(qqueueSettoActShutdown(pThis->pqDA, pThis->toActShutdown)); CHKiRet(qqueueSettoEnq(pThis->pqDA, pThis->toEnq)); - CHKiRet(qqueueSetEnqOnly(pThis->pqDA, pThis->bDAEnqOnly, MUTEX_ALREADY_LOCKED)); + CHKiRet(SetEnqOnly(pThis->pqDA, pThis->bDAEnqOnly, MUTEX_ALREADY_LOCKED)); CHKiRet(qqueueSetiDeqtWinFromHr(pThis->pqDA, pThis->iDeqtWinFromHr)); CHKiRet(qqueueSetiDeqtWinToHr(pThis->pqDA, pThis->iDeqtWinToHr)); CHKiRet(qqueueSetiHighWtrMrk(pThis->pqDA, 0)); @@ -450,8 +450,8 @@ finalize_it: * If this function fails (should not happen), DA mode is not turned on. * rgerhards, 2008-01-16 */ -static inline rsRetVal -qqueueInitDA(qqueue_t *pThis, int bEnqOnly, int bLockMutex) +static rsRetVal +InitDA(qqueue_t *pThis, int bEnqOnly, int bLockMutex) { DEFiRet; DEFVARS_mutexProtection; @@ -464,16 +464,17 @@ qqueueInitDA(qqueue_t *pThis, int bEnqOnly, int bLockMutex) * is intentional. We assume that when we need it once, we may also need it on another * occasion. Ressources used are quite minimal when no worker is running. * rgerhards, 2008-01-24 + * NOTE: this is the DA worker *pool*, not the DA queue! */ if(pThis->pWtpDA == NULL) { - lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s:DA", obj.GetName((obj_t*) pThis)); + lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s:DAwpool", obj.GetName((obj_t*) pThis)); CHKiRet(wtpConstruct (&pThis->pWtpDA)); CHKiRet(wtpSetDbgHdr (pThis->pWtpDA, pszBuf, lenBuf)); CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueChkStopWrkrDA)); CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize)); CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, wtp_t*)) qqueueIsIdleDA)); CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti, int)) ConsumerDA)); - CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) qqueueStartDA)); + CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) StartDA)); CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) qqueueTurnOffDAMode)); CHKiRet(wtpSetpmutUsr (pThis->pWtpDA, pThis->mut)); CHKiRet(wtpSetpcondBusy (pThis->pWtpDA, &pThis->notEmpty)); @@ -493,6 +494,7 @@ qqueueInitDA(qqueue_t *pThis, int bEnqOnly, int bLockMutex) * until the next enqueue request. */ wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* DA queues alsways have just one worker max */ +RUNLOG_VAR("%d", pThis->bRunsDA); finalize_it: END_MTX_PROTECTED_OPERATIONS(pThis->mut); @@ -536,7 +538,7 @@ qqueueChkStrtDA(qqueue_t *pThis) */ dbgoprint((obj_t*) pThis, "%d entries - passed high water mark for disk-assisted mode, initiating...\n", getPhysicalQueueSize(pThis)); - qqueueInitDA(pThis, QUEUE_MODE_ENQDEQ, MUTEX_ALREADY_LOCKED); /* initiate DA mode */ + InitDA(pThis, QUEUE_MODE_ENQDEQ, MUTEX_ALREADY_LOCKED); /* initiate DA mode */ } finalize_it: @@ -706,7 +708,6 @@ static rsRetVal qAddLinkedList(qqueue_t *pThis, void* pUsr) if(pThis->tVars.linklist.pDeqRoot == NULL) { pThis->tVars.linklist.pDeqRoot = pEntry; } -RUNLOG_VAR("%p", pThis->tVars.linklist.pDeqRoot); finalize_it: RETiRet; @@ -1157,25 +1158,14 @@ qqueueDeq(qqueue_t *pThis, void *pUsr) } -/* This function shuts down all worker threads and waits until they - * have terminated. If they timeout, they are cancelled. - * rgerhards, 2008-01-24 - * Please note that this function shuts down BOTH the parent AND the child queue - * in DA case. This is necessary because their timeouts are tightly coupled. Most - * importantly, the timeouts would be applied twice (or logic be extremely - * complex) if each would have its own shutdown. The function does not self check - * this condition - the caller must make sure it is not called with a parent. - * rgerhards, 2009-05-26: we do NO logner persist the queue here if bSaveOnShutdown - * is set. This must be handled by the caller. Not doing that cleans up the queue - * shutdown considerably. Also, older engines had a potential hang condition when - * the DA queue was already started and the DA worker configured for infinite - * retries and the action was during retry processing. This was a design issue, - * which is solved as of now. Note that the shutdown now may take a little bit - * longer, because we no longer can persist the queue in parallel to waiting - * on worker timeouts. +/* Try to terminate queue worker threads within the regular shutdown interval. + * Both the regular and DA queue (if it exists) is waited for, but on the same timeout. + * After this function returns, the workers must either be finished or some force + * to finish them must be applied. + * rgerhards, 2009-05-27 */ static rsRetVal -ShutdownWorkers(qqueue_t *pThis) +tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis) { DEFVARS_mutexProtection; struct timespec tTimeout; @@ -1185,16 +1175,6 @@ ShutdownWorkers(qqueue_t *pThis) ISOBJ_TYPE_assert(pThis, qqueue); ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */ - dbgoprint((obj_t*) pThis, "initiating worker thread shutdown sequence\n"); - - /* we reduce the low water mark in any case. This is not absolutely necessary, but - * it is useful because we enable DA mode at several spots below and so we do not need - * to think about the low water mark each time. - */ - pThis->iHighWtrMrk = 1; /* if we do not do this, the DA queue will not stop! */ - pThis->iLowWtrMrk = 0; - - /* first try to shutdown the queue within the regular shutdown period */ BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ if(getPhysicalQueueSize(pThis) > 0) { if(pThis->bRunsDA) { @@ -1206,6 +1186,11 @@ ShutdownWorkers(qqueue_t *pThis) } END_MTX_PROTECTED_OPERATIONS(pThis->mut); + /* at this stage, we need to have the DA worker properly initialized and running (if there is one) */ + if(pThis->bRunsDA) { + qqueueWaitDAModeInitialized(pThis); + } + /* Now wait for the queue's workers to shut down. Note that we run into the code even if we just found * out there are no active workers - that doesn't matter: the wtp knows about that and so will * return immediately. @@ -1228,74 +1213,109 @@ ShutdownWorkers(qqueue_t *pThis) if(iRetLocal == RS_RET_TIMED_OUT) { dbgoprint((obj_t*) pThis, "regular shutdown timed out on primary queue (this is OK)\n"); } else { - /* OK, the regular queue is now shut down. So we can now wait for the DA queue (if running DA) */ dbgoprint((obj_t*) pThis, "regular queue workers shut down.\n"); - BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ - if(pThis->bRunsDA) { - END_MTX_PROTECTED_OPERATIONS(pThis->mut); - dbgoprint((obj_t*) pThis, "we have a DA queue (0x%lx), requesting its shutdown.\n", - qqueueGetID(pThis->pqDA)); - /* we use the same absolute timeout as above, so we do not use more than the configured - * timeout interval! - */ - dbgoprint((obj_t*) pThis, "trying shutdown of DA workers\n"); - iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN, &tTimeout); - if(iRetLocal == RS_RET_TIMED_OUT) { - dbgoprint((obj_t*) pThis, "shutdown timed out on DA queue (this is OK)\n"); - } + } + + /* OK, the worker for the regular queue is processed, on the the DA queue regular worker. */ + BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ + if(pThis->bRunsDA) { + END_MTX_PROTECTED_OPERATIONS(pThis->mut); + dbgoprint((obj_t*) pThis, "we have a DA queue (0x%lx), requesting its shutdown.\n", + qqueueGetID(pThis->pqDA)); + /* we use the same absolute timeout as above, so we do not use more than the configured + * timeout interval! + */ + dbgoprint((obj_t*) pThis, "trying shutdown of regular worker of DA queue\n"); + iRetLocal = wtpShutdownAll(pThis->pqDA->pWtpReg, wtpState_SHUTDOWN, &tTimeout); + if(iRetLocal == RS_RET_TIMED_OUT) { + dbgoprint((obj_t*) pThis, "shutdown timed out on DA queue worker (this is OK)\n"); } else { - END_MTX_PROTECTED_OPERATIONS(pThis->mut); + dbgoprint((obj_t*) pThis, "DA queue worker shut down.\n"); } + } else { + END_MTX_PROTECTED_OPERATIONS(pThis->mut); } - /* when we reach this point, both queues are either empty or the regular queue shutdown timeout - * has expired. We must set the primary queue to SHUTDOWN_IMMEDIATE, as it shall now terminate - * as soon as its consumer is done. In particular, it does no longer need try to empty the queue. - */ - wtpSetState(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE); /* set primary queue to shutdown only */ + RETiRet; +} - /* at this stage, we need to have the DA worker properly initialized and running (if there is one) */ - if(pThis->bRunsDA) { - qqueueWaitDAModeInitialized(pThis); - wtpSetState(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE); /* also stop DA queue */ + +/* Try to shut down regular and DA queue workers, within the action timeout + * period. Note that the main queue DA worker is still unaffected (and may shuffle + * data to the disk queue while we terminate the other workers). Not finishing + * processing all messages is now OK (but they may be preserved later, depending + * on bSaveOnShutdown setting). + * rgerhards, 2009-05-27 + */ +static rsRetVal +tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis) +{ + DEFVARS_mutexProtection; + struct timespec tTimeout; + rsRetVal iRetLocal; + DEFiRet; + + ISOBJ_TYPE_assert(pThis, qqueue); + ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */ + + /* instruct workers to finish ASAP, even if still work exists */ +RUNLOG_STR("setting enqOnly for main queue"); + //TODO:SetEnqOnly(pThis, 1, LOCK_MUTEX); /* start no new workers */ + pThis->bEnqOnly = 1; + wtpSetState(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE); + if(pThis->pqDA != NULL) { +RUNLOG_STR("setting enqOnly for DA queue"); + //TODO:SetEnqOnly(pThis->pqDA, 1, LOCK_MUTEX); + pThis->pqDA->bEnqOnly = 1; + wtpSetState(pThis->pqDA->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE); } /* now give the queue workers a last chance to gracefully shut down (based on action timeout setting) */ + timeoutComp(&tTimeout, pThis->toActShutdown); BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ - if(getPhysicalQueueSize(pThis) > 0) { - timeoutComp(&tTimeout, pThis->toActShutdown); - if(wtpGetCurNumWrkr(pThis->pWtpReg, LOCK_MUTEX) > 0) { - END_MTX_PROTECTED_OPERATIONS(pThis->mut); - dbgoprint((obj_t*) pThis, "trying immediate shutdown of regular workers\n"); - iRetLocal = wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout); - if(iRetLocal == RS_RET_TIMED_OUT) { - dbgoprint((obj_t*) pThis, "immediate shutdown timed out on primary queue (this is acceptable and " - "triggers cancellation)\n"); - } else if(iRetLocal != RS_RET_OK) { - dbgoprint((obj_t*) pThis, "unexpected iRet state %d after trying immediate shutdown of the primary queue " - "in disk save mode. Continuing, but results are unpredictable\n", iRetLocal); - } - /* we need to re-aquire the mutex for the next check in this case! */ - BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ - } - if(pThis->bRunsDA && wtpGetCurNumWrkr(pThis->pWtpDA, LOCK_MUTEX) > 0) { - /* and now the same for the DA queue */ - END_MTX_PROTECTED_OPERATIONS(pThis->mut); - dbgoprint((obj_t*) pThis, "trying immediate shutdown of DA workers\n"); - iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout); - if(iRetLocal == RS_RET_TIMED_OUT) { - dbgoprint((obj_t*) pThis, "immediate shutdown timed out on DA queue (this is acceptable and " - "triggers cancellation)\n"); - } else if(iRetLocal != RS_RET_OK) { - dbgoprint((obj_t*) pThis, "unexpected iRet state %d after trying immediate shutdown of the DA queue " - "in disk save mode. Continuing, but results are unpredictable\n", iRetLocal); - } - } else { - END_MTX_PROTECTED_OPERATIONS(pThis->mut); + if(wtpGetCurNumWrkr(pThis->pWtpReg, LOCK_MUTEX) > 0) { + END_MTX_PROTECTED_OPERATIONS(pThis->mut); + dbgoprint((obj_t*) pThis, "trying immediate shutdown of regular workers\n"); + iRetLocal = wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout); + if(iRetLocal == RS_RET_TIMED_OUT) { + dbgoprint((obj_t*) pThis, "immediate shutdown timed out on primary queue (this is acceptable and " + "triggers cancellation)\n"); + } else if(iRetLocal != RS_RET_OK) { + dbgoprint((obj_t*) pThis, "unexpected iRet state %d after trying immediate shutdown of the primary queue " + "in disk save mode. Continuing, but results are unpredictable\n", iRetLocal); } - } else { + /* we need to re-aquire the mutex for the next check in this case! */ + BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); + } + + if(pThis->bRunsDA && wtpGetCurNumWrkr(pThis->pqDA->pWtpReg, LOCK_MUTEX) > 0) { + /* and now the same for the DA queue */ END_MTX_PROTECTED_OPERATIONS(pThis->mut); + dbgoprint((obj_t*) pThis, "trying immediate shutdown of DA queue workers\n"); + iRetLocal = wtpShutdownAll(pThis->pqDA->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout); + if(iRetLocal == RS_RET_TIMED_OUT) { + dbgoprint((obj_t*) pThis, "immediate shutdown timed out on DA queue (this is acceptable and " + "triggers cancellation)\n"); + } else if(iRetLocal != RS_RET_OK) { + dbgoprint((obj_t*) pThis, "unexpected iRet state %d after trying immediate shutdown of the DA queue " + "in disk save mode. Continuing, but results are unpredictable\n", iRetLocal); + } } + END_MTX_PROTECTED_OPERATIONS(pThis->mut); + + RETiRet; +} + + +/* This function cancels all remenaing regular workers for both the main and the DA + * queue. The main queue's DA worker pool continues to run (if it exists and is active). + * rgerhards, 2009-05-29 + */ +static rsRetVal +cancelWorkers(qqueue_t *pThis) +{ + rsRetVal iRetLocal; + DEFiRet; /* Now queue workers should have terminated. If not, we need to cancel them as we have applied * all timeout setting. If any worker in any queue still executes, its consumer is possibly @@ -1318,13 +1338,60 @@ ShutdownWorkers(qqueue_t *pThis) } } + RETiRet; +} + + +/* This function shuts down all worker threads and waits until they + * have terminated. If they timeout, they are cancelled. + * rgerhards, 2008-01-24 + * Please note that this function shuts down BOTH the parent AND the child queue + * in DA case. This is necessary because their timeouts are tightly coupled. Most + * importantly, the timeouts would be applied twice (or logic be extremely + * complex) if each would have its own shutdown. The function does not self check + * this condition - the caller must make sure it is not called with a parent. + * rgerhards, 2009-05-26: we do NO longer persist the queue here if bSaveOnShutdown + * is set. This must be handled by the caller. Not doing that cleans up the queue + * shutdown considerably. Also, older engines had a potential hang condition when + * the DA queue was already started and the DA worker configured for infinite + * retries and the action was during retry processing. This was a design issue, + * which is solved as of now. Note that the shutdown now may take a little bit + * longer, because we no longer can persist the queue in parallel to waiting + * on worker timeouts. + */ +static rsRetVal +ShutdownWorkers(qqueue_t *pThis) +{ + DEFiRet; + + ISOBJ_TYPE_assert(pThis, qqueue); + ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */ + + dbgoprint((obj_t*) pThis, "initiating worker thread shutdown sequence\n"); + + /* we reduce the low water mark in any case. This is not absolutely necessary, but + * it is useful because we enable DA mode at several spots below and so we do not need + * to think about the low water mark each time. + */ + pThis->iHighWtrMrk = 1; /* if we do not do this, the DA queue will not stop! */ + pThis->iLowWtrMrk = 0; + + CHKiRet(tryShutdownWorkersWithinQueueTimeout(pThis)); + + if(getPhysicalQueueSize(pThis) > 0) { + CHKiRet(tryShutdownWorkersWithinActionTimeout(pThis)); + } + + CHKiRet(cancelWorkers(pThis)); + /* ... finally ... all worker threads have terminated :-) * Well, more precisely, they *are in termination*. Some cancel cleanup handlers - * may still be running. + * may still be running. Note that the main queue's DA worker may still be running. */ dbgoprint((obj_t*) pThis, "worker threads terminated, remaining queue size log %d, phys %d.\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); +finalize_it: RETiRet; } @@ -1498,8 +1565,8 @@ DeleteBatchFromQStore(qqueue_t *pThis, batch_t *pBatch) ISOBJ_TYPE_assert(pThis, qqueue); assert(pBatch != NULL); - pTdl = tdlPeek(pThis); - if(pTdl == NULL) { + pTdl = tdlPeek(pThis); /* get current head element */ + if(pTdl == NULL) { /* to-delete list empty */ DoDeleteBatchFromQStore(pThis, pBatch->nElemDeq); } else if(pBatch->deqID == pThis->deqIDDel) { deqIDDel = pThis->deqIDDel; @@ -1512,6 +1579,7 @@ DeleteBatchFromQStore(qqueue_t *pThis, batch_t *pBatch) } } else { /* can not delete, insert into to-delete list */ + dbgprintf("not at head of to-delete list, enqueue %d\n", (int) pBatch->deqID); CHKiRet(tdlAdd(pThis, pBatch->deqID, pBatch->nElemDeq)); } @@ -1769,6 +1837,27 @@ finalize_it: RETiRet; } + +/* This is called when a batch is processed and the worker does not + * ask for another batch (e.g. because it is to be terminated) + * rgerhards, 2009-05-27 + */ +static rsRetVal +batchProcessedReg(qqueue_t *pThis, wti_t *pWti) +{ + DEFiRet; + + ISOBJ_TYPE_assert(pThis, qqueue); + ISOBJ_TYPE_assert(pWti, wti); +dbgprintf("XXX: batchProcessedReg deletes %d records\n", pWti->batch.nElemDeq); + + DeleteProcessedBatch(pThis, &pWti->batch); + qqueueChkPersist(pThis, pWti->batch.nElemDeq); + + RETiRet; +} + + /* This is the queue consumer in the regular (non-DA) case. It is * protected by the queue mutex, but MUST release it as soon as possible. * rgerhards, 2008-01-21 @@ -1844,7 +1933,8 @@ qqueueChkStopWrkrDA(qqueue_t *pThis) DEFiRet; if(pThis->bEnqOnly) { - iRet = RS_RET_TERMINATE_NOW; + iRet = RS_RET_TERMINATE_WHEN_IDLE; +RUNLOG; } else { if(pThis->bRunsDA) { ASSERT(pThis->pqDA != NULL); @@ -1852,11 +1942,14 @@ qqueueChkStopWrkrDA(qqueue_t *pThis) && pThis->pqDA->sizeOnDiskMax > 0 && pThis->pqDA->tVars.disk.sizeOnDisk > pThis->pqDA->sizeOnDiskMax) { /* this queue can never grow, so we can give up... */ +RUNLOG; iRet = RS_RET_TERMINATE_NOW; } else if(getPhysicalQueueSize(pThis) < pThis->iHighWtrMrk && pThis->bQueueStarted == 1) { +RUNLOG; iRet = RS_RET_TERMINATE_NOW; } } else { +RUNLOG; iRet = RS_RET_TERMINATE_NOW; } } @@ -1880,10 +1973,14 @@ ChkStopWrkrReg(qqueue_t *pThis) return pThis->bEnqOnly || pThis->bRunsDA || (pThis->pqParent != NULL && getPhysicalQueueSize(pThis) == 0); * TODO: remove when verified! -- rgerhards, 2009-05-26 */ - if(pThis->bEnqOnly || pThis->bRunsDA) +RUNLOG; + if(pThis->bEnqOnly || pThis->bRunsDA) { +RUNLOG; iRet = RS_RET_TERMINATE_NOW; - else if(pThis->pqParent != NULL) + } else if(pThis->pqParent != NULL) { +RUNLOG; iRet = RS_RET_TERMINATE_WHEN_IDLE; + } RETiRet; } @@ -2039,6 +2136,7 @@ rsRetVal qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize)); CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, wtp_t*)) IsIdleReg)); CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti, int)) ConsumerReg)); + CHKiRet(wtpSetpfObjProcessed (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, wti_t *pWti)) batchProcessedReg)); CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RegOnWrkrStartup)); CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RegOnWrkrShutdown)); CHKiRet(wtpSetpmutUsr (pThis->pWtpReg, pThis->mut)); @@ -2056,7 +2154,7 @@ rsRetVal qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ iRetLocal = qqueueHaveQIF(pThis); if(iRetLocal == RS_RET_OK) { dbgoprint((obj_t*) pThis, "on-disk queue present, needs to be reloaded\n"); - qqueueInitDA(pThis, QUEUE_MODE_ENQDEQ, LOCK_MUTEX); /* initiate DA mode */ + InitDA(pThis, QUEUE_MODE_ENQDEQ, LOCK_MUTEX); /* initiate DA mode */ bInitialized = 1; /* we are done */ } else { /* TODO: use logerror? -- rgerhards, 2008-01-16 */ @@ -2212,9 +2310,16 @@ DoSaveOnShutdown(qqueue_t *pThis) ISOBJ_TYPE_assert(pThis, qqueue); - qqueueInitDA(pThis, QUEUE_MODE_ENQONLY, LOCK_MUTEX); /* switch to DA mode */ +dbgprintf("after InitDA, queue log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); + if(pThis->bRunsDA != 2) { + InitDA(pThis, QUEUE_MODE_ENQONLY, LOCK_MUTEX); /* switch to DA mode */ +dbgprintf("after InitDA, queue log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); +RUNLOG_VAR("%d", pThis->bRunsDA); +RUNLOG_VAR("%d", pThis->pWtpDA->wtpState); + qqueueWaitDAModeInitialized(pThis); /* make sure DA mode is actually started, else we may have a race! */ + } /* make sure we do not timeout before we are done */ - dbgoprint((obj_t*) pThis, "bSaveOnShutdown configured, eternal timeout set\n"); + dbgoprint((obj_t*) pThis, "bSaveOnShutdown configured, infinite timeout set\n"); timeoutComp(&tTimeout, QUEUE_TIMEOUT_ETERNAL); /* and run the primary queue's DA worker to drain the queue */ iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN, &tTimeout); @@ -2247,7 +2352,7 @@ CODESTARTobjDestruct(qqueue) * we need to reset the logical dequeue pointer, persist the queue if configured to do * so and then destruct everything. -- rgerhards, 2009-05-26 */ - CHKiRet(pThis->qUnDeqAll(pThis)); +//!!!! //CHKiRet(pThis->qUnDeqAll(pThis)); if(pThis->bIsDA && getPhysicalQueueSize(pThis) > 0 && pThis->bSaveOnShutdown) { CHKiRet(DoSaveOnShutdown(pThis)); @@ -2470,7 +2575,7 @@ finalize_it: * rgerhards, 2008-01-16 */ static rsRetVal -qqueueSetEnqOnly(qqueue_t *pThis, int bEnqOnly, int bLockMutex) +SetEnqOnly(qqueue_t *pThis, int bEnqOnly, int bLockMutex) { DEFiRet; DEFVARS_mutexProtection; @@ -2495,13 +2600,16 @@ qqueueSetEnqOnly(qqueue_t *pThis, int bEnqOnly, int bLockMutex) dbgoprint((obj_t*) pThis, "switching to enqueue-only mode, terminating all worker threads\n"); if(pThis->pWtpReg != NULL) wtpWakeupAllWrkr(pThis->pWtpReg); +RUNLOG; if(pThis->pWtpDA != NULL) wtpWakeupAllWrkr(pThis->pWtpDA); +RUNLOG; } else { /* switch back to regular mode */ ABORT_FINALIZE(RS_RET_NOT_IMPLEMENTED); /* we don't need this so far... */ } } +RUNLOG; pThis->bEnqOnly = bEnqOnly; diff --git a/runtime/rsyslog.h b/runtime/rsyslog.h index a43c0327..58346d03 100644 --- a/runtime/rsyslog.h +++ b/runtime/rsyslog.h @@ -89,6 +89,7 @@ typedef struct nsd_gsspi_s nsd_gsspi_t; typedef struct nsd_nss_s nsd_nss_t; typedef struct nsdsel_ptcp_s nsdsel_ptcp_t; typedef struct nsdsel_gtls_s nsdsel_gtls_t; +typedef struct wti_s wti_t; typedef obj_t nsd_t; typedef obj_t nsdsel_t; typedef struct msg msg_t; diff --git a/runtime/srUtils.h b/runtime/srUtils.h index bfce4cbb..288e9dd7 100644 --- a/runtime/srUtils.h +++ b/runtime/srUtils.h @@ -117,11 +117,23 @@ int getSubString(uchar **ppSrc, char *pDst, size_t DstSize, char cSep); if(bMustLock == LOCK_MUTEX) { \ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); \ d_pthread_mutex_lock(mut); \ + assert(bLockedOpIsLocked == 0); \ bLockedOpIsLocked = 1; \ } #define END_MTX_PROTECTED_OPERATIONS(mut) \ if(bLockedOpIsLocked) { \ d_pthread_mutex_unlock(mut); \ + bLockedOpIsLocked = 0; \ pthread_setcancelstate(iCancelStateSave, NULL); \ } +/* The unconditional versions of the macro always lock the mutex. They are preferred in + * complex scenarios, where the simple ones might get mixed up by multiple calls. + */ +#define BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(mut) \ + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); \ + d_pthread_mutex_lock(mut); +#define END_MTX_PROTECTED_OPERATIONS_UNCOND(mut) \ + d_pthread_mutex_unlock(mut); \ + pthread_setcancelstate(iCancelStateSave, NULL); + #endif diff --git a/runtime/wti.c b/runtime/wti.c index 3b6bf1b9..75e497b8 100644 --- a/runtime/wti.c +++ b/runtime/wti.c @@ -343,6 +343,7 @@ doIdleProcessing(wti_t *pThis, wtp_t *pWtp, int *pbInactivityTOOccured) { struct timespec t; + BEGINfunc DBGPRINTF("%s: worker IDLE, waiting for work.\n", wtiGetDbgHdr(pThis)); pWtp->pfOnIdle(pWtp->pUsr, MUTEX_ALREADY_LOCKED); @@ -356,6 +357,7 @@ doIdleProcessing(wti_t *pThis, wtp_t *pWtp, int *pbInactivityTOOccured) *pbInactivityTOOccured = 1; /* indicate we had a timeout */ } } + ENDfunc } @@ -390,9 +392,12 @@ wtiWorker(wti_t *pThis) dbgSetThrdName(pThis->pszDbgHdr); pthread_cleanup_push(wtiWorkerCancelCleanup, pThis); - BEGIN_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr, LOCK_MUTEX); +dbgprintf("XXX: worker startup\n"); + RUNLOG_STR("MUTEX lock"); + BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(pWtp->pmutUsr); pWtp->pfOnWorkerStartup(pWtp->pUsr); - END_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr); + RUNLOG_STR("MUTEX release"); + END_MTX_PROTECTED_OPERATIONS_UNCOND(pWtp->pmutUsr); /* now we have our identity, on to real processing */ while(1) { /* loop will be broken below - need to do mutex locks */ @@ -405,17 +410,25 @@ wtiWorker(wti_t *pThis) } wtpSetInactivityGuard(pThis->pWtp, 0, LOCK_MUTEX); /* must be set before usr mutex is locked! */ - BEGIN_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr, LOCK_MUTEX); + RUNLOG_STR("MUTEX lock"); + BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(pWtp->pmutUsr); /* first check if we are in shutdown process (but evaluate a bit later) */ +RUNLOG; terminateRet = wtpChkStopWrkr(pWtp, LOCK_MUTEX, MUTEX_ALREADY_LOCKED); +RUNLOG_VAR("%d", terminateRet); if(terminateRet == RS_RET_TERMINATE_NOW) { - // TODO: we need to free the old batch! -- rgerhards, 2009-05-26 MULTI + /* we now need to free the old batch */ + localRet = pWtp->pfObjProcessed(pWtp->pUsr, pThis); + dbgoprint((obj_t*) pThis, "terminating worker because auf TERMINATE_NOW mode, del iRet %d\n", + localRet); break; } +RUNLOG; /* try to execute and process whatever we have */ localRet = pWtp->pfDoWork(pWtp->pUsr, pThis, iCancelStateSave); + /* This function must and does RELEASE the MUTEX! */ if(localRet == RS_RET_IDLE) { if(terminateRet == RS_RET_TERMINATE_WHEN_IDLE) { @@ -426,17 +439,20 @@ wtiWorker(wti_t *pThis) /* we had an inactivity timeout in the last run and are still idle, so it is time to exit... */ break; /* end worker thread run */ } + RUNLOG_STR("MUTEX lock"); + BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(pWtp->pmutUsr); doIdleProcessing(pThis, pWtp, &bInactivityTOOccured); - END_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr); + RUNLOG_STR("MUTEX release"); + END_MTX_PROTECTED_OPERATIONS_UNCOND(pWtp->pmutUsr); continue; /* request next iteration */ } - END_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr); bInactivityTOOccured = 0; /* reset for next run */ } /* if we exit the loop, the mutex is locked and must be unlocked */ - END_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr); + RUNLOG_STR("MUTEX release"); + END_MTX_PROTECTED_OPERATIONS_UNCOND(pWtp->pmutUsr); /* indicate termination */ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); diff --git a/runtime/wti.h b/runtime/wti.h index 0990941e..17038ec5 100644 --- a/runtime/wti.h +++ b/runtime/wti.h @@ -31,7 +31,7 @@ /* the worker thread instance class */ -typedef struct wti_s { +struct wti_s { BEGINobjInstance; int bOptimizeUniProc; /* cache for the equally-named global setting, pulled at time of queue creation */ pthread_t thrdID; /* thread ID */ @@ -42,7 +42,7 @@ typedef struct wti_s { int bShutdownRqtd; /* shutdown for this thread requested? 0 - no , 1 - yes */ batch_t batch; /* pointer to an object array meaningful for current user pointer (e.g. queue pUsr data elemt) */ uchar *pszDbgHdr; /* header string for debug messages */ -} wti_t; +}; /* some symbolic constants for easier reference */ diff --git a/runtime/wtp.c b/runtime/wtp.c index 41fcd8d9..7786a656 100644 --- a/runtime/wtp.c +++ b/runtime/wtp.c @@ -91,10 +91,12 @@ BEGINobjConstruct(wtp) /* be sure to specify the object type also in END macro! pThis->pfGetDeqBatchSize = NotImplementedDummy; pThis->pfIsIdle = NotImplementedDummy; pThis->pfDoWork = NotImplementedDummy; + pThis->pfObjProcessed = NotImplementedDummy; pThis->pfOnIdle = NotImplementedDummy; pThis->pfOnWorkerCancel = NotImplementedDummy; pThis->pfOnWorkerStartup = NotImplementedDummy; pThis->pfOnWorkerShutdown = NotImplementedDummy; +dbgprintf("XXX: wtpConstruct: %d\n", pThis->wtpState); ENDobjConstruct(wtp) @@ -139,6 +141,7 @@ BEGINobjDestruct(wtp) /* be sure to specify the object type also in END and CODE int i; CODESTARTobjDestruct(wtp) wtpProcessThrdChanges(pThis); /* process thread changes one last time */ +RUNLOG_STR("wtpDestruct"); /* destruct workers */ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) @@ -260,17 +263,22 @@ wtpChkStopWrkr(wtp_t *pThis, int bLockMutex, int bLockUsrMutex) ISOBJ_TYPE_assert(pThis, wtp); +RUNLOG; BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex); if(pThis->wtpState == wtpState_SHUTDOWN_IMMEDIATE) { +RUNLOG; ABORT_FINALIZE(RS_RET_TERMINATE_NOW); } else if(pThis->wtpState == wtpState_SHUTDOWN) { ABORT_FINALIZE(RS_RET_TERMINATE_WHEN_IDLE); +RUNLOG; } +RUNLOG_VAR("%d", iRet); /* try customer handler if one was set and we do not yet have a definite result */ if(pThis->pfChkStopWrkr != NULL) { iRet = pThis->pfChkStopWrkr(pThis->pUsr, bLockUsrMutex); } +RUNLOG_VAR("%d", iRet); finalize_it: END_MTX_PROTECTED_OPERATIONS(&pThis->mut); @@ -292,13 +300,17 @@ wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout ISOBJ_TYPE_assert(pThis, wtp); +dbgprintf("XXX:10 wtp %p, state %d\n", pThis, pThis->wtpState); wtpSetState(pThis, tShutdownCmd); +dbgprintf("XXX:20 wtp %p, state %d\n", pThis, pThis->wtpState); wtpWakeupAllWrkr(pThis); +dbgprintf("XXX:30 wtp %p, state %d\n", pThis, pThis->wtpState); /* see if we need to harvest (join) any terminated threads (even in timeout case, * some may have terminated... */ wtpProcessThrdChanges(pThis); +dbgprintf("XXX:40 wtp %p, state %d\n", pThis, pThis->wtpState); /* and wait for their termination */ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); @@ -306,7 +318,9 @@ wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout pthread_cleanup_push(mutexCancelCleanup, &pThis->mut); pthread_setcancelstate(iCancelStateSave, NULL); bTimedOut = 0; +dbgprintf("XXX:50 wtp %p, state %d\n", pThis, pThis->wtpState); while(pThis->iCurNumWrkThrd > 0 && !bTimedOut) { +dbgprintf("XXX:60 wtp %p, state %d\n", pThis, pThis->wtpState); dbgprintf("%s: waiting %ldms on worker thread termination, %d still running\n", wtpGetDbgHdr(pThis), timeoutVal(ptTimeout), pThis->iCurNumWrkThrd); @@ -581,6 +595,7 @@ DEFpropSetMethFP(wtp, pfRateLimiter, rsRetVal(*pVal)(void*)) DEFpropSetMethFP(wtp, pfGetDeqBatchSize, rsRetVal(*pVal)(void*, int*)) DEFpropSetMethFP(wtp, pfIsIdle, rsRetVal(*pVal)(void*, wtp_t*)) DEFpropSetMethFP(wtp, pfDoWork, rsRetVal(*pVal)(void*, void*, int)) +DEFpropSetMethFP(wtp, pfObjProcessed, rsRetVal(*pVal)(void*, wti_t*)) DEFpropSetMethFP(wtp, pfOnIdle, rsRetVal(*pVal)(void*, int)) DEFpropSetMethFP(wtp, pfOnWorkerCancel, rsRetVal(*pVal)(void*, void*)) DEFpropSetMethFP(wtp, pfOnWorkerStartup, rsRetVal(*pVal)(void*)) diff --git a/runtime/wtp.h b/runtime/wtp.h index d9d582af..e1180177 100644 --- a/runtime/wtp.h +++ b/runtime/wtp.h @@ -72,6 +72,7 @@ struct wtp_s { pthread_cond_t *pcondBusy; /* condition the user will signal "busy again, keep runing" on (awakes worker) */ rsRetVal (*pfChkStopWrkr)(void *pUsr, int); rsRetVal (*pfGetDeqBatchSize)(void *pUsr, int*); /* obtains max dequeue count from queue config */ + rsRetVal (*pfObjProcessed)(void *pUsr, wti_t *pWti); /* indicate user object is processed */ rsRetVal (*pfRateLimiter)(void *pUsr); rsRetVal (*pfIsIdle)(void *pUsr, wtp_t *pWtp); rsRetVal (*pfDoWork)(void *pUsr, void *pWti, int); @@ -108,6 +109,7 @@ PROTOTYPEpropSetMethFP(wtp, pfRateLimiter, rsRetVal(*pVal)(void*)); PROTOTYPEpropSetMethFP(wtp, pfGetDeqBatchSize, rsRetVal(*pVal)(void*, int*)); PROTOTYPEpropSetMethFP(wtp, pfIsIdle, rsRetVal(*pVal)(void*, wtp_t*)); PROTOTYPEpropSetMethFP(wtp, pfDoWork, rsRetVal(*pVal)(void*, void*, int)); +PROTOTYPEpropSetMethFP(wtp, pfObjProcessed, rsRetVal(*pVal)(void*, wti_t*)); PROTOTYPEpropSetMethFP(wtp, pfOnIdle, rsRetVal(*pVal)(void*, int)); PROTOTYPEpropSetMethFP(wtp, pfOnWorkerCancel, rsRetVal(*pVal)(void*,void*)); PROTOTYPEpropSetMethFP(wtp, pfOnWorkerStartup, rsRetVal(*pVal)(void*)); diff --git a/tests/daqueue-persist-drvr.sh b/tests/daqueue-persist-drvr.sh index 11058110..0ec76b47 100755 --- a/tests/daqueue-persist-drvr.sh +++ b/tests/daqueue-persist-drvr.sh @@ -19,6 +19,9 @@ $srcdir/diag.sh shutdown-immediate $srcdir/diag.sh wait-shutdown source $srcdir/diag.sh check-mainq-spool +echo DEBUG EXIT! +#exit + # restart engine and have rest processed #remove delay echo "#" > work-delay.conf diff --git a/tests/diag.sh b/tests/diag.sh index 01b335ee..5c6a0ab8 100755 --- a/tests/diag.sh +++ b/tests/diag.sh @@ -5,8 +5,11 @@ # not always able to convey back states to the upper-level test driver # begun 2009-05-27 by rgerhards # This file is part of the rsyslog project, released under GPLv3 +#valgrind="valgrind --log-fd=1" +#valgrind="valgrind --tool=drd --log-fd=1" +#valgrind="valgrind --tool=helgrind --log-fd=1" #set -o xtrace -export RSYSLOG_DEBUG="debug nostdout" +#export RSYSLOG_DEBUG="debug Xnostdout printmutexaction" export RSYSLOG_DEBUGLOG="log" case $1 in 'init') $srcdir/killrsyslog.sh # kill rsyslogd if it runs for some reason @@ -22,7 +25,7 @@ case $1 in ;; 'startup') # start rsyslogd with default params. $2 is the config file name to use # returns only after successful startup - valgrind ../tools/rsyslogd -c4 -u2 -n -irsyslog.pid -M../runtime/.libs:../.libs -f$srcdir/testsuites/$2 & + $valgrind ../tools/rsyslogd -c4 -u2 -n -irsyslog.pid -M../runtime/.libs:../.libs -f$srcdir/testsuites/$2 & $srcdir/diag.sh wait-startup ;; 'wait-startup') # wait for rsyslogd startup |