diff options
-rw-r--r-- | ChangeLog | 4 | ||||
-rw-r--r-- | action.c | 13 | ||||
-rw-r--r-- | plugins/imdiag/imdiag.c | 3 | ||||
-rw-r--r-- | runtime/obj.c | 8 | ||||
-rw-r--r-- | runtime/queue.c | 492 | ||||
-rw-r--r-- | runtime/queue.h | 9 | ||||
-rw-r--r-- | runtime/wti.c | 19 | ||||
-rw-r--r-- | runtime/wtp.c | 35 | ||||
-rw-r--r-- | runtime/wtp.h | 10 | ||||
-rwxr-xr-x | tests/arrayqueue.sh | 4 | ||||
-rwxr-xr-x | tests/daqueue-persist-drvr.sh | 11 | ||||
-rwxr-xr-x | tests/inputname.sh | 2 | ||||
-rwxr-xr-x | tests/queue-persist-drvr.sh | 1 | ||||
-rw-r--r-- | tools/syslogd.c | 10 | ||||
-rw-r--r-- | tools/syslogd.h | 1 |
15 files changed, 160 insertions, 462 deletions
@@ -1,5 +1,9 @@ --------------------------------------------------------------------------- Version 5.3.2 [DEVEL] (rgerhards), 2009-10-?? +- simplified and thus speeded up the queue engine, also fixed some + potential race conditions (in very unusual shutdown conditions) + along the way. The threading model has seriously changes, so there may + be some regressions. - enhanced omfile to support transactional interface. This will increase performance in many cases. - added multi-ruleset support to imudp @@ -50,7 +50,7 @@ #define NO_TIME_PROVIDED 0 /* indicate we do not provide any cached time */ /* forward definitions */ -static rsRetVal processBatchMain(action_t *pAction, batch_t *pBatch); +static rsRetVal processBatchMain(action_t *pAction, batch_t *pBatch, int*); /* object static data (once for all instances) */ /* TODO: make this an object! DEFobjStaticHelpers -- rgerhards, 2008-03-05 */ @@ -291,7 +291,7 @@ actionConstructFinalize(action_t *pThis) * spec. -- rgerhards, 2008-01-30 */ CHKiRet(qqueueConstruct(&pThis->pQueue, ActionQueType, 1, iActionQueueSize, - (rsRetVal (*)(void*, batch_t*))processBatchMain)); + (rsRetVal (*)(void*, batch_t*, int*))processBatchMain)); obj.SetName((obj_t*) pThis->pQueue, pszQName); /* ... set some properties ... */ @@ -917,7 +917,7 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem) * rgerhards, 2009-05-12 */ static rsRetVal -processAction(action_t *pAction, batch_t *pBatch) +processAction(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate) { int i; msg_t *pMsg; @@ -934,7 +934,8 @@ processAction(action_t *pAction, batch_t *pBatch) CHKiRet(localRet); /* this must be moved away - up into the dequeue part of the queue, I guess, but that's for another day */ - for(i = 0 ; i < pBatch->nElem ; i++) { + //for(i = 0 ; i < pBatch->nElem && !*pbShutdownImmediate ; i++) { + for(i = 0 ; i < pBatch->nElem ; i++) { pMsg = (msg_t*) pBatch->pElem[i].pUsrp; } iRet = finishBatch(pAction); @@ -950,7 +951,7 @@ finalize_it: * rgerhards, 2009-04-22 */ static rsRetVal -processBatchMain(action_t *pAction, batch_t *pBatch) +processBatchMain(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate) { DEFiRet; @@ -964,7 +965,7 @@ processBatchMain(action_t *pAction, batch_t *pBatch) d_pthread_mutex_lock(&pAction->mutActExec); pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec); - iRet = processAction(pAction, pBatch); + iRet = processAction(pAction, pBatch, pbShutdownImmediate); pthread_cleanup_pop(1); /* unlock mutex */ diff --git a/plugins/imdiag/imdiag.c b/plugins/imdiag/imdiag.c index 4359cda1..cabdf157 100644 --- a/plugins/imdiag/imdiag.c +++ b/plugins/imdiag/imdiag.c @@ -263,6 +263,9 @@ waitMainQEmpty(tcps_sess_t *pSess) CHKiRet(diagGetMainMsgQSize(&iMsgQueueSize)); while(iMsgQueueSize > 0) { + /* DEV DEBUG ONLY if(iPrint++ % 500) + printf("imdiag: main msg queue size: %d\n", iMsgQueueSize); + */ if(iPrint++ % 500 == 0) dbgprintf("imdiag sleeping, wait mainq drain, curr size %d\n", iMsgQueueSize); srSleep(0,2); /* wait a little bit */ diff --git a/runtime/obj.c b/runtime/obj.c index aebea332..3692b957 100644 --- a/runtime/obj.c +++ b/runtime/obj.c @@ -1129,7 +1129,7 @@ UseObj(char *srcFile, uchar *pObjName, uchar *pObjFile, interface_t *pIf) /* DEV debug only: dbgprintf("source file %s requests object '%s', ifIsLoaded %d\n", srcFile, pObjName, pIf->ifIsLoaded); */ - d_pthread_mutex_lock(&mutObjGlobalOp); + pthread_mutex_lock(&mutObjGlobalOp); if(pIf->ifIsLoaded == 1) { ABORT_FINALIZE(RS_RET_OK); /* we are already set */ @@ -1170,7 +1170,7 @@ UseObj(char *srcFile, uchar *pObjName, uchar *pObjFile, interface_t *pIf) pIf->ifIsLoaded = 1; /* we are happy */ finalize_it: - d_pthread_mutex_unlock(&mutObjGlobalOp); + pthread_mutex_unlock(&mutObjGlobalOp); if(pStr != NULL) rsCStrDestruct(&pStr); @@ -1193,7 +1193,7 @@ ReleaseObj(char *srcFile, uchar *pObjName, uchar *pObjFile, interface_t *pIf) /* dev debug only dbgprintf("source file %s releasing object '%s', ifIsLoaded %d\n", srcFile, pObjName, pIf->ifIsLoaded); */ - d_pthread_mutex_lock(&mutObjGlobalOp); + pthread_mutex_lock(&mutObjGlobalOp); if(pObjFile == NULL) FINALIZE; /* if it is not a lodable module, we do not need to do anything... */ @@ -1214,7 +1214,7 @@ ReleaseObj(char *srcFile, uchar *pObjName, uchar *pObjFile, interface_t *pIf) pIf->ifIsLoaded = 0; /* indicated "no longer valid" */ finalize_it: - d_pthread_mutex_unlock(&mutObjGlobalOp); + pthread_mutex_unlock(&mutObjGlobalOp); if(pStr != NULL) rsCStrDestruct(&pStr); diff --git a/runtime/queue.c b/runtime/queue.c index 101052a1..dacf1f13 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -55,6 +55,7 @@ #include "wti.h" #include "msg.h" #include "atomic.h" +#include "unicode-helper.h" #include "msg.h" /* TODO: remove once we remove MsgAddRef() call */ #ifdef OS_SOLARIS @@ -68,11 +69,9 @@ DEFobjCurrIf(strm) /* forward-definitions */ static rsRetVal qqueueChkPersist(qqueue_t *pThis, int nUpdates); -static rsRetVal SetEnqOnly(qqueue_t *pThis, int bEnqOnly, int bLockMutex); static rsRetVal RateLimiter(qqueue_t *pThis); static int qqueueChkStopWrkrDA(qqueue_t *pThis); static rsRetVal GetDeqBatchSize(qqueue_t *pThis, int *pVal); -static int qqueueIsIdleDA(qqueue_t *pThis); static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti); static rsRetVal batchProcessed(qqueue_t *pThis, wti_t *pWti); @@ -228,7 +227,8 @@ static inline void queueDrain(qqueue_t *pThis) * this point in time. The mutex must be locked when * ths function is called. -- rgerhards, 2008-01-25 */ -static inline rsRetVal qqueueAdviseMaxWorkers(qqueue_t *pThis) +static inline rsRetVal +qqueueAdviseMaxWorkers(qqueue_t *pThis) { DEFiRet; int iMaxWorkers; @@ -236,48 +236,20 @@ static inline rsRetVal qqueueAdviseMaxWorkers(qqueue_t *pThis) ISOBJ_TYPE_assert(pThis, qqueue); if(!pThis->bEnqOnly) { - if(pThis->bRunsDA) { - /* if we have not yet reached the high water mark, there is no need to start a - * worker. -- rgerhards, 2008-01-26 - */ - if(getLogicalQueueSize(pThis) >= pThis->iHighWtrMrk || pThis->bQueueStarted == 0) { - wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* disk queues have always one worker */ - } - } - /* regular workers always run */ - if(pThis->qType == QUEUETYPE_DISK || pThis->iMinMsgsPerWrkr == 0) { - iMaxWorkers = 1; +dbgprintf("AdviseMaxWorkers: log Queue Size: %d, high water mark %d\n", + getLogicalQueueSize(pThis) , pThis->iHighWtrMrk); + if(pThis->bIsDA && getLogicalQueueSize(pThis) >= pThis->iHighWtrMrk) { + wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* disk queues have always one worker */ } else { - iMaxWorkers = getLogicalQueueSize(pThis) / pThis->iMinMsgsPerWrkr + 1; + if(getLogicalQueueSize(pThis) == 0) { + iMaxWorkers = 0; + } else if(pThis->qType == QUEUETYPE_DISK || pThis->iMinMsgsPerWrkr == 0) { + iMaxWorkers = 1; + } else { + iMaxWorkers = getLogicalQueueSize(pThis) / pThis->iMinMsgsPerWrkr + 1; + } + wtpAdviseMaxWorkers(pThis->pWtpReg, iMaxWorkers); } - wtpAdviseMaxWorkers(pThis->pWtpReg, iMaxWorkers); /* disk queues have always one worker */ - } - - RETiRet; -} - - -/* Destruct DA queue. This is the last part of DA-to-normal-mode - * transistion. This is called asynchronously and some time quite a - * while after the actual transistion. The key point is that we need to - * do it at some later time, because we need to destruct the DA queue. That, - * however, can not be done in a thread that has been signalled - * This is to be called when we revert back to our own queue. - * This function must be called with the queue mutex locked (the wti - * class ensures this). - * rgerhards, 2008-01-15 - */ -static rsRetVal -TurnOffDAMode(qqueue_t *pThis) -{ - DEFiRet; - - ISOBJ_TYPE_assert(pThis, qqueue); - ASSERT(pThis->bRunsDA); - if(getLogicalQueueSize(pThis->pqDA) == 0) { - pThis->bRunsDA = 0; /* tell the world we are back in non-DA mode */ - DBGOPRINT((obj_t*) pThis, "disk-assistance has been turned off, disk queue was empty (iRet %d)\n", - iRet); } RETiRet; @@ -348,33 +320,18 @@ StartDA(qqueue_t *pThis) CHKiRet(qqueueSetbSyncQueueFiles(pThis->pqDA, pThis->bSyncQueueFiles)); CHKiRet(qqueueSettoActShutdown(pThis->pqDA, pThis->toActShutdown)); CHKiRet(qqueueSettoEnq(pThis->pqDA, pThis->toEnq)); - CHKiRet(SetEnqOnly(pThis->pqDA, pThis->bDAEnqOnly, MUTEX_ALREADY_LOCKED)); CHKiRet(qqueueSetiDeqtWinFromHr(pThis->pqDA, pThis->iDeqtWinFromHr)); CHKiRet(qqueueSetiDeqtWinToHr(pThis->pqDA, pThis->iDeqtWinToHr)); + CHKiRet(qqueueSettoQShutdown(pThis->pqDA, pThis->toQShutdown)); CHKiRet(qqueueSetiHighWtrMrk(pThis->pqDA, 0)); CHKiRet(qqueueSetiDiscardMrk(pThis->pqDA, 0)); - // experimental: XXX - CHKiRet(qqueueSettoWrkShutdown(pThis->pqDA, 0)); - - if(pThis->toQShutdown == 0) { - CHKiRet(qqueueSettoQShutdown(pThis->pqDA, 0)); /* if the user really wants... */ - } else { - /* we use the shortest possible shutdown (0 is endless!) because when we run on disk AND - * have an obviously large backlog, we can't finish it in any case. So there is no point - * in holding shutdown longer than necessary. -- rgerhards, 2008-01-15 - */ - CHKiRet(qqueueSettoQShutdown(pThis->pqDA, 1)); - } - iRet = qqueueStart(pThis->pqDA); /* file not found is expected, that means it is no previous QIF available */ if(iRet != RS_RET_OK && iRet != RS_RET_FILE_NOT_FOUND) FINALIZE; /* something is wrong */ - //pthread_cond_broadcast(&pThis->condDAReady); /* signal we are now initialized and ready to go ;) */ - - DBGOPRINT((obj_t*) pThis, "is now running in disk assisted mode, disk queue 0x%lx\n", + DBGOPRINT((obj_t*) pThis, "DA queue initialized, disk queue 0x%lx\n", qqueueGetID(pThis->pqDA)); finalize_it: @@ -412,91 +369,35 @@ InitDA(qqueue_t *pThis, int bEnqOnly, int bLockMutex) * rgerhards, 2008-01-24 * NOTE: this is the DA worker *pool*, not the DA queue! */ - if(pThis->pWtpDA == NULL) { - lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s:DAwpool", obj.GetName((obj_t*) pThis)); - CHKiRet(wtpConstruct (&pThis->pWtpDA)); - CHKiRet(wtpSetDbgHdr (pThis->pWtpDA, pszBuf, lenBuf)); - CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueChkStopWrkrDA)); - CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize)); - CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, wtp_t*)) qqueueIsIdleDA)); - CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti)) ConsumerDA)); - CHKiRet(wtpSetpfObjProcessed (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, wti_t *pWti)) batchProcessed)); - CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) TurnOffDAMode)); - CHKiRet(wtpSetpmutUsr (pThis->pWtpDA, pThis->mut)); - CHKiRet(wtpSetpcondBusy (pThis->pWtpDA, &pThis->notEmpty)); - CHKiRet(wtpSetiNumWorkerThreads (pThis->pWtpDA, 1)); - CHKiRet(wtpSettoWrkShutdown (pThis->pWtpDA, pThis->toWrkShutdown)); - CHKiRet(wtpSetpUsr (pThis->pWtpDA, pThis)); - CHKiRet(wtpConstructFinalize (pThis->pWtpDA)); - } + lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s:DAwpool", obj.GetName((obj_t*) pThis)); + CHKiRet(wtpConstruct (&pThis->pWtpDA)); + CHKiRet(wtpSetDbgHdr (pThis->pWtpDA, pszBuf, lenBuf)); + CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueChkStopWrkrDA)); + CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize)); + CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti)) ConsumerDA)); + CHKiRet(wtpSetpfObjProcessed (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, wti_t *pWti)) batchProcessed)); + CHKiRet(wtpSetpmutUsr (pThis->pWtpDA, pThis->mut)); + CHKiRet(wtpSetpcondBusy (pThis->pWtpDA, &pThis->notEmpty)); + CHKiRet(wtpSetiNumWorkerThreads (pThis->pWtpDA, 1)); + CHKiRet(wtpSettoWrkShutdown (pThis->pWtpDA, pThis->toWrkShutdown)); + CHKiRet(wtpSetpUsr (pThis->pWtpDA, pThis)); + CHKiRet(wtpConstructFinalize (pThis->pWtpDA)); /* if we reach this point, we have a "good" DA worker pool */ - /* indicate we now run in DA mode - this is reset by the DA worker if it fails */ - pThis->bDAEnqOnly = bEnqOnly; - /* now construct the actual queue (if it does not already exist) */ if(pThis->pqDA == NULL) { CHKiRet(StartDA(pThis)); } + pThis->bEnqOnly = bEnqOnly; // TODO: I think this is not needed, but first clean up shutdown processing! pThis->bRunsDA = 1; - /* now we must now adivse the wtp that we need one worker. If none is yet active, - * that will also start one up. If we forgot that step, everything would be stalled - * until the next enqueue request. - */ - wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* DA queues always have just one worker max */ - finalize_it: END_MTX_PROTECTED_OPERATIONS(pThis->mut); RETiRet; } -/* check if we need to start disk assisted mode and send some signals to - * keep it running if we are already in it. It also checks if DA mode is - * partially initialized, in which case it waits for initialization to - * complete. - * rgerhards, 2008-01-14 - */ -static rsRetVal -ChkStrtDA(qqueue_t *pThis) -{ - DEFiRet; - - ISOBJ_TYPE_assert(pThis, qqueue); - - /* if we do not hit the high water mark, we have nothing to do */ - if(getPhysicalQueueSize(pThis) != pThis->iHighWtrMrk) - ABORT_FINALIZE(RS_RET_OK); - - if(pThis->bRunsDA) { - /* then we need to signal that we are at the high water mark again. If that happens - * on our way down the queue, that doesn't matter, because then nobody is waiting - * on the condition variable. - * (Remember that a DA queue stops draining the queue once it has reached the low - * water mark and restarts it when the high water mark is reached again - this is - * what this code here is responsible for. Please note that all workers may have been - * terminated due to the inactivity timeout, thus we need to advise the pool that - * we need at least one). - */ - DBGOPRINT((obj_t*) pThis, "%d entries - passed high water mark in DA mode, send notify\n", - getPhysicalQueueSize(pThis)); - qqueueAdviseMaxWorkers(pThis); - } else { - /* this is the case when we are currently not running in DA mode. So it is time - * to turn it back on. - */ - DBGOPRINT((obj_t*) pThis, "%d entries - passed high water mark for disk-assisted mode, initiating...\n", - getPhysicalQueueSize(pThis)); - InitDA(pThis, QUEUE_MODE_ENQDEQ, MUTEX_ALREADY_LOCKED); /* initiate DA mode */ - } - -finalize_it: - RETiRet; -} - - /* --------------- end code for disk-assisted queue modes -------------------- */ @@ -733,44 +634,6 @@ finalize_it: } -/* This method checks if we have a QIF file for the current queue (no matter of - * queue mode). Returns RS_RET_OK if we have a QIF file or an error status otherwise. - * rgerhards, 2008-01-15 - */ -static rsRetVal -qqueueHaveQIF(qqueue_t *pThis) -{ - DEFiRet; - uchar pszQIFNam[MAXFNAME]; - size_t lenQIFNam; - struct stat stat_buf; - - ISOBJ_TYPE_assert(pThis, qqueue); - - if(pThis->pszFilePrefix == NULL) - ABORT_FINALIZE(RS_RET_NO_FILEPREFIX); - - /* Construct file name */ - lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), "%s/%s.qi", - (char*) glbl.GetWorkDir(), (char*)pThis->pszFilePrefix); - - /* check if the file exists */ - if(stat((char*) pszQIFNam, &stat_buf) == -1) { - if(errno == ENOENT) { - DBGOPRINT((obj_t*) pThis, "no .qi file found\n"); - ABORT_FINALIZE(RS_RET_FILE_NOT_FOUND); - } else { - DBGOPRINT((obj_t*) pThis, "error %d trying to access .qi file\n", errno); - ABORT_FINALIZE(RS_RET_IO_ERROR); - } - } - /* If we reach this point, we have a .qi file */ - -finalize_it: - RETiRet; -} - - /* The method loads the persistent queue information. * rgerhards, 2008-01-11 */ @@ -1041,7 +904,7 @@ static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr) batchObj.pUsrp = (obj_t*) pUsr; singleBatch.nElem = 1; /* there always is only one in direct mode */ singleBatch.pElem = &batchObj; - iRet = pThis->pConsumer(pThis->pUsr, &singleBatch); + iRet = pThis->pConsumer(pThis->pUsr, &singleBatch, &pThis->bShutdownImmediate); objDestruct(pUsr); RETiRet; @@ -1112,15 +975,17 @@ qqueueDeq(qqueue_t *pThis, void **ppUsr) } -/* Try to terminate queue worker threads within the regular shutdown interval. - * Both the regular and DA queue (if it exists) is waited for, but on the same timeout. - * After this function returns, the workers must either be finished or some force - * to finish them must be applied. - * This function also instructs the DA worker pool (if it exists) to terminate. This is done - * in preparation of final queue shutdown. - * rgerhards, 2009-05-27 +/* Try to shut down regular and DA queue workers, within the queue timeout + * period. That means processing continues as usual. This is the expected + * usual case, where during shutdown those messages remaining are being + * processed. At this point, it is acceptable that the queue can not be + * fully depleted, that case is handled in the next step. During this phase, + * we first shut down the main queue DA worker to prevent new data to arrive + * at the DA queue, and then we ask the regular workers of both the Regular + * and DA queue to try complete processing. + * rgerhards, 2009-10-14 */ -static rsRetVal +static inline rsRetVal tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis) { struct timespec tTimeout; @@ -1130,30 +995,26 @@ tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis) ISOBJ_TYPE_assert(pThis, qqueue); ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */ - d_pthread_mutex_lock(pThis->mut); /* some workers may be running in parallel! */ - if(getPhysicalQueueSize(pThis) > 0) { - if(pThis->bRunsDA) { - /* We may have waited on the low water mark. As it may have changed, we - * see if we reactivate the worker. - */ - wtpAdviseMaxWorkers(pThis->pWtpDA, 1); - } + if(pThis->bIsDA) { + /* We need to lock the mutex, as otherwise we may have a race that prevents + * us from awaking the DA worker. */ + d_pthread_mutex_lock(pThis->mut); + + /* tell regular queue DA worker to stop shuffling messages to DA queue... */ + pThis->pqDA->bEnqOnly = 1; + wtpSetState(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE); + wtpAdviseMaxWorkers(pThis->pWtpDA, 1); + DBGOPRINT((obj_t*) pThis, "awoke DA worker, told it to shut down.\n"); + + /* also tell the DA queue worker to shut down, so that it already knows... */ + wtpSetState(pThis->pqDA->pWtpReg, wtpState_SHUTDOWN); + wtpAdviseMaxWorkers(pThis->pqDA->pWtpReg, 1); /* awake its lone worker */ + DBGOPRINT((obj_t*) pThis, "awoke DA queue regular worker, told it to shut down when done.\n"); + + d_pthread_mutex_unlock(pThis->mut); } - d_pthread_mutex_unlock(pThis->mut); - /* Now wait for the queue's workers to shut down. Note that we run into the code even if we just found - * out there are no active workers - that doesn't matter: the wtp knows about that and so will - * return immediately. - * We do not yet care about the DA worker - that will be handled down later in the process. - * Note that we must not request shutdown right now - that may introduce a race: if the regular queue - * still runs DA assisted and the DA worker gets scheduled first, it will terminate itself (if the DA - * queue happens to be empty at that instant). Then the regular worker enqueues messages, what will lead - * to a restart of the worker. Of course, everything will continue to run, but in a bit sub-optimal way - * (from a performance point of view). So we don't do anything right now. The DA queue will continue to - * process messages and shutdown itself in any case if there is nothing to do. So we don't loose anything - * by not requesting shutdown now. - * rgerhards, 2008-01-25 - */ + /* first calculate absolute timeout - we need the absolute value here, because we need to coordinate * shutdown of both the regular and DA queue on *the same* timeout. */ @@ -1180,16 +1041,6 @@ tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis) } else { DBGOPRINT((obj_t*) pThis, "DA queue worker shut down.\n"); } - /* we also instruct the DA worker pool to shutdown ASAP. If we need it for persisting - * the queue, it is restarted at a later stage. We don't care here if a timeout happens. - */ - DBGOPRINT((obj_t*) pThis, "trying shutdown of main queue DA worker pool\n"); - iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout); - if(iRetLocal == RS_RET_TIMED_OUT) { - DBGOPRINT((obj_t*) pThis, "shutdown timed out on main queue DA worker pool (this is OK)\n"); - } else { - DBGOPRINT((obj_t*) pThis, "main queue DA worker pool shut down on first try.\n"); - } } RETiRet; @@ -1197,11 +1048,12 @@ tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis) /* Try to shut down regular and DA queue workers, within the action timeout - * period. Note that the main queue DA worker is still unaffected (and may shuffle - * data to the disk queue while we terminate the other workers). Not finishing - * processing all messages is now OK (but they may be preserved later, depending - * on bSaveOnShutdown setting). - * rgerhards, 2009-05-27 + * period. This aborts processing, but at the end of the current action, in + * a well-defined manner. During this phase, we terminate all three worker + * pools, including the regular queue DA worker if it not yet has terminated. + * Not finishing processing all messages is OK (and expected) at this stage + * (they may be preserved later, depending * on bSaveOnShutdown setting). + * rgerhards, 2009-10-14 */ static rsRetVal tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis) @@ -1210,20 +1062,15 @@ tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis) rsRetVal iRetLocal; DEFiRet; +RUNLOG_STR("trying to shutdown workers within Action Timeout"); ISOBJ_TYPE_assert(pThis, qqueue); ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */ /* instruct workers to finish ASAP, even if still work exists */ - /* note that we modify bEnqOnly directly, because going through the method would - * startup some workers again. So this is OK here. -- rgerhards, 2009-05-28 - */ pThis->bEnqOnly = 1; - /* need to set this so that the DA queue begins shutdown in parallel! */ - if(pThis->pqDA != NULL) { - pThis->pqDA->bEnqOnly = 1; - wtpSetState(pThis->pqDA->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE); - } + pThis->bShutdownImmediate = 1; +// TODO: make sure we have at minimum a 10ms timeout - workers deserve a chance... /* now give the queue workers a last chance to gracefully shut down (based on action timeout setting) */ timeoutComp(&tTimeout, pThis->toActShutdown); DBGOPRINT((obj_t*) pThis, "trying immediate shutdown of regular workers (if any)\n"); @@ -1247,17 +1094,14 @@ tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis) DBGOPRINT((obj_t*) pThis, "unexpected iRet state %d after trying immediate shutdown of the DA " "queue in disk save mode. Continuing, but results are unpredictable\n", iRetLocal); } - /* and now we need to check the DA worker itself (the one that shuffles data to the disk). This - * is necessary because we may be in a situation where the DA queue regular worker and the - * main queue worker stopped rather quickly. In this case, there is almost no time (and - * probably no thread switch!) between the point where we instructed the main queue DA - * worker to shutdown and this code location. In consequence, it may not even have - * noticed that it should should down, less acutally done this. So we provide it with a - * fixed 100ms timeout to try complete its work, what usually should be sufficient. - * rgerhards, 2009-10-06 + + /* and now we need to terminate the DA worker itself. We always grant it a 100ms timeout, + * which should be sufficient and usually not be required (it is expected to have finished + * long before while we were processing the queue timeout in shutdown phase 1). + * rgerhards, 2009-10-14 */ timeoutComp(&tTimeout, 100); - DBGOPRINT((obj_t*) pThis, "last try for regular shutdown of main queue DA worker pool\n"); + DBGOPRINT((obj_t*) pThis, "trying regular shutdown of main queue DA worker pool\n"); iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout); if(iRetLocal == RS_RET_TIMED_OUT) { DBGOPRINT((obj_t*) pThis, "shutdown timed out on main queue DA worker pool " @@ -1272,7 +1116,7 @@ tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis) /* This function cancels all remaining regular workers for both the main and the DA - * queue. The main queue's DA worker pool continues to run (if it exists and is active). + * queue. * rgerhards, 2009-05-29 */ static rsRetVal @@ -1306,7 +1150,7 @@ cancelWorkers(qqueue_t *pThis) * big trouble when resetting the logical dequeue pointer. This operation can only be * done when *no* worker is running. So time for a shutdown... -- rgerhards, 2009-05-28 */ - DBGOPRINT((obj_t*) pThis, "checking to see if we need to cancel the main queue's DA worker pool\n"); + DBGOPRINT((obj_t*) pThis, "checking to see if main queue DA worker pool needs to be cancelled\n"); iRetLocal = wtpCancelAll(pThis->pWtpDA); /* returns immediately if all threads already have terminated */ } @@ -1341,14 +1185,8 @@ ShutdownWorkers(qqueue_t *pThis) DBGOPRINT((obj_t*) pThis, "initiating worker thread shutdown sequence\n"); - /* we reduce the low water mark in any case. This is not absolutely necessary, but - * it is useful because we enable DA mode at several spots below and so we do not need - * to think about the low water mark each time. - */ - pThis->iHighWtrMrk = 1; /* if we do not do this, the DA queue will not stop! */ - pThis->iLowWtrMrk = 0; - CHKiRet(tryShutdownWorkersWithinQueueTimeout(pThis)); +dbgprintf("YYY: physical queue size: %d\n", getPhysicalQueueSize(pThis)); if(getPhysicalQueueSize(pThis) > 0) { CHKiRet(tryShutdownWorkersWithinActionTimeout(pThis)); @@ -1375,7 +1213,7 @@ finalize_it: * to modify some parameters before the queue is actually started. */ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads, - int iMaxQueueSize, rsRetVal (*pConsumer)(void*, batch_t*)) + int iMaxQueueSize, rsRetVal (*pConsumer)(void*, batch_t*,int*)) { DEFiRet; qqueue_t *pThis; @@ -1384,9 +1222,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread ASSERT(pConsumer != NULL); ASSERT(iWorkerThreads >= 0); - if((pThis = (qqueue_t *)calloc(1, sizeof(qqueue_t))) == NULL) { - ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); - } + CHKmalloc(pThis = (qqueue_t *)calloc(1, sizeof(qqueue_t))); /* we have an object, so let's fill the properties */ objConstructSetObjInfo(pThis); @@ -1397,7 +1233,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread pThis->iFullDlyMrk = iMaxQueueSize - (iMaxQueueSize / 100) * 3; /* default 97% */ pThis->iLightDlyMrk = iMaxQueueSize - (iMaxQueueSize / 100) * 30; /* default 70% */ - pThis->lenSpoolDir = strlen((char*)pThis->pszSpoolDir); + pThis->lenSpoolDir = ustrlen(pThis->pszSpoolDir); pThis->iMaxFileSize = 1024 * 1024; /* default is 1 MiB */ pThis->iQueueSize = 0; pThis->nLogDeq = 0; @@ -1786,7 +1622,7 @@ DequeueForConsumer(qqueue_t *pThis, wti_t *pWti) ISOBJ_TYPE_assert(pThis, qqueue); ISOBJ_TYPE_assert(pWti, wti); -dbgprintf("YYY: deqeueu for consumer"); +dbgprintf("YYY: dequeue for consumer\n"); CHKiRet(DequeueConsumable(pThis, pWti)); if(pWti->batch.nElem == 0) @@ -1835,7 +1671,7 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti) /* we now have a non-idle batch of work, so we can release the queue mutex and process it */ d_pthread_mutex_unlock(pThis->mut); - CHKiRet(pThis->pConsumer(pThis->pUsr, &pWti->batch)); + CHKiRet(pThis->pConsumer(pThis->pUsr, &pWti->batch, &pThis->bShutdownImmediate)); /* we now need to check if we should deliberately delay processing a bit * and, if so, do that. -- rgerhards, 2008-01-30 @@ -1880,11 +1716,13 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti) d_pthread_mutex_unlock(pThis->mut); /* iterate over returned results and enqueue them in DA queue */ + //for(i = 0 ; i < pWti->batch.nElem && !pThis->bShutdownImmediate ; i++) { for(i = 0 ; i < pWti->batch.nElem ; i++) { /* TODO: we must add a generic "addRef" mechanism, because the disk queue enqueue destructs * the message. So far, we simply assume we always have msg_t, what currently is always the case. * rgerhards, 2009-05-28 */ +dbgprintf("DA consumer pushes msg '%s'\n", ((msg_t*)(pWti->batch.pElem[i].pUsrp))->pszRawMsg); CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, (obj_t*)MsgAddRef((msg_t*)(pWti->batch.pElem[i].pUsrp)))); } @@ -1913,6 +1751,7 @@ qqueueChkStopWrkrDA(qqueue_t *pThis) if(pThis->bEnqOnly) { iRet = RS_RET_TERMINATE_WHEN_IDLE; +#if 0 } else { if(pThis->bRunsDA) { ASSERT(pThis->pqDA != NULL); @@ -1925,13 +1764,15 @@ qqueueChkStopWrkrDA(qqueue_t *pThis) dbgprintf("XXX: terminate_NOW DA worker: queue size %d, high water mark %d\n", getPhysicalQueueSize(pThis), pThis->iHighWtrMrk); iRet = RS_RET_TERMINATE_NOW; RUNLOG_STR("XXX: re-start reg worker"); -qqueueAdviseMaxWorkers(pThis); +if(!pThis->bShutdownImmediate) + qqueueAdviseMaxWorkers(pThis); RUNLOG_STR("XXX: done re-start reg worker"); } } else { // experimental iRet = RS_RET_TERMINATE_NOW; ; } +#endif } RETiRet; @@ -1968,60 +1809,12 @@ GetDeqBatchSize(qqueue_t *pThis, int *pVal) DEFiRet; assert(pVal != NULL); *pVal = pThis->iDeqBatchSize; -if(pThis->pqParent != NULL) +if(pThis->pqParent != NULL) // TODO: check why we actually do this! *pVal = 16; RETiRet; } -/* must only be called when the queue mutex is locked, else results - * are not stable! DA worker version (pThis *is* the *main* queue, not DA!) - */ -static int -qqueueIsIdleDA(qqueue_t *pThis) -{ - return(getPhysicalQueueSize(pThis) <= pThis->iLowWtrMrk); -} -/* must only be called when the queue mutex is locked, else results - * are not stable! Regular worker version. - */ -static int -IsIdleReg(qqueue_t *pThis) -{ - return(getPhysicalQueueSize(pThis) == 0); -} - - -/* This function is called when a worker thread for the regular queue is shut down. - * If we are the primary queue, this is not really interesting to us. If, however, - * we are the DA (child) queue, that means the DA queue is empty. In that case, we - * need to signal the parent queue's DA worker, so that it can terminate DA mode. - * rgerhards, 2008-01-26 - * rgerhards, 2008-02-27: HOWEVER, in a shutdown condition, it may be that the parent's worker thread pool - * has already been terminated and destructed. This *is* a legal condition and happens - * from time to time in practice. So we need to signal only if there still is a - * parent DA worker queue. Please keep in mind that the the parent's DA worker - * pool is DIFFERENT from our (DA queue) regular worker pool. So when the parent's - * pWtpDA is destructed, there can still be some of our (DAq/wtp) threads be running. - * I am telling this, because I, too, always get confused by those... - */ -static rsRetVal -RegOnWrkrShutdown(qqueue_t *pThis) -{ - DEFiRet; - - ISOBJ_TYPE_assert(pThis, qqueue); - - if(pThis->pqParent != NULL) { - if(pThis->pqParent->pWtpDA != NULL) { /* see comment in function header from 2008-02-27 */ - wtpAdviseMaxWorkers(pThis->pqParent->pWtpDA, 1); /* reactivate DA worker (always 1) */ - } - } - - RETiRet; -} - - /* start up the queue - it must have been constructed and parameters defined * before. */ @@ -2029,8 +1822,6 @@ rsRetVal qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ { DEFiRet; - rsRetVal iRetLocal; - int bInitialized = 0; /* is queue already initialized? */ uchar pszBuf[64]; size_t lenBuf; @@ -2072,8 +1863,7 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ if(pThis->qType == QUEUETYPE_DIRECT) FINALIZE; /* with direct queues, we are already finished... */ - /* create worker thread pools for regular operation. The DA pool is created on an as-needed - * basis, which potentially means never under most circumstances. + /* create worker thread pools for regular and DA operation. */ lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s:Reg", obj.GetName((obj_t*) pThis)); CHKiRet(wtpConstruct (&pThis->pWtpReg)); @@ -2081,10 +1871,8 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ CHKiRet(wtpSetpfRateLimiter (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RateLimiter)); CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) ChkStopWrkrReg)); CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize)); - CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, wtp_t*)) IsIdleReg)); CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti)) ConsumerReg)); CHKiRet(wtpSetpfObjProcessed (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, wti_t *pWti)) batchProcessed)); - CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RegOnWrkrShutdown)); CHKiRet(wtpSetpmutUsr (pThis->pWtpReg, pThis->mut)); CHKiRet(wtpSetpcondBusy (pThis->pWtpReg, &pThis->notEmpty)); CHKiRet(wtpSetiNumWorkerThreads (pThis->pWtpReg, pThis->iNumWorkerThreads)); @@ -2092,27 +1880,11 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ CHKiRet(wtpSetpUsr (pThis->pWtpReg, pThis)); CHKiRet(wtpConstructFinalize (pThis->pWtpReg)); - /* initialize worker thread instances */ - if(pThis->bIsDA) { - /* If we are disk-assisted, we need to check if there is a QIF file - * which we need to load. -- rgerhards, 2008-01-15 - */ - iRetLocal = qqueueHaveQIF(pThis); - if(iRetLocal == RS_RET_OK) { - DBGOPRINT((obj_t*) pThis, "on-disk queue present, needs to be reloaded\n"); - InitDA(pThis, QUEUE_MODE_ENQDEQ, LOCK_MUTEX); /* initiate DA mode */ - bInitialized = 1; /* we are done */ - } else { - /* TODO: use logerror? -- rgerhards, 2008-01-16 */ - DBGOPRINT((obj_t*) pThis, "error %d trying to access on-disk queue files, starting without them. " - "Some data may be lost\n", iRetLocal); - } - } + /* set up DA system if we have a disk-assisted queue */ + if(pThis->bIsDA) + InitDA(pThis, QUEUE_MODE_ENQDEQ, LOCK_MUTEX); /* initiate DA mode */ - if(Debug && !bInitialized) { - DBGOPRINT((obj_t*) pThis, "queue starts up without (loading) any DA disk state (this is normal for the DA " - "queue itself!)\n"); - } + DBGOPRINT((obj_t*) pThis, "queue finished initialization\n"); /* if the queue already contains data, we need to start the correct number of worker threads. This can be * the case when a disk queue has been loaded. If we did not start it here, it would never start. @@ -2255,10 +2027,16 @@ DoSaveOnShutdown(qqueue_t *pThis) ISOBJ_TYPE_assert(pThis, qqueue); - InitDA(pThis, QUEUE_MODE_ENQONLY, LOCK_MUTEX); /* switch to DA mode */ -dbgprintf("after InitDA, queue log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); - /* make sure we do not timeout before we are done */ - DBGOPRINT((obj_t*) pThis, "bSaveOnShutdown configured, infinite timeout set\n"); + /* we reduce the low water mark, otherwise the DA worker would terminate when + * it is reached. + */ + DBGOPRINT((obj_t*) pThis, "bSaveOnShutdown set, restarting DA worker...\n"); + pThis->bShutdownImmediate = 0; /* would termiante the DA worker! */ + pThis->iLowWtrMrk = 0; + wtpSetState(pThis->pWtpDA, wtpState_SHUTDOWN); /* shutdown worker (only) when done (was _IMMEDIATE!) */ + wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* restart DA worker */ + + DBGOPRINT((obj_t*) pThis, "waiting for DA worker to terminate...\n"); timeoutComp(&tTimeout, QUEUE_TIMEOUT_ETERNAL); /* and run the primary queue's DA worker to drain the queue */ iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN, &tTimeout); @@ -2276,8 +2054,6 @@ dbgprintf("after InitDA, queue log %d, phys %d\n", getLogicalQueueSize(pThis), g /* destructor for the queue object */ BEGINobjDestruct(qqueue) /* be sure to specify the object type also in END and CODESTART macros! */ CODESTARTobjDestruct(qqueue) - pThis->bQueueInDestruction = 1; /* indicate we are in destruction (modifies some behaviour) */ - /* shut down all workers * We do not need to shutdown workers when we are in enqueue-only mode or we are a * direct queue - because in both cases we have none... ;) @@ -2415,10 +2191,6 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr) */ CHKiRet(qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pThis->bRunsDA, pUsr)); - /* then check if we need to add an assistance disk queue */ - if(pThis->bIsDA) - CHKiRet(ChkStrtDA(pThis)); - /* handle flow control * There are two different flow control mechanisms: basic and advanced flow control. * Basic flow control has always been implemented and protects the queue structures @@ -2462,6 +2234,7 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr) && pThis->tVars.disk.sizeOnDisk > pThis->sizeOnDiskMax)) { DBGOPRINT((obj_t*) pThis, "enqueueMsg: queue FULL - waiting to drain.\n"); timeoutComp(&t, pThis->toEnq); +// TODO : handle enqOnly => discard! if(pthread_cond_timedwait(&pThis->notFull, pThis->mut, &t) != 0) { DBGOPRINT((obj_t*) pThis, "enqueueMsg: cond timeout, dropping message!\n"); objDestruct(pUsr); @@ -2500,7 +2273,6 @@ qqueueMultiEnqObj(qqueue_t *pThis, multi_submit_t *pMultiSub) } for(i = 0 ; i < pMultiSub->nElem ; ++i) { -dbgprintf("queueMultiEnq: %d\n", i); CHKiRet(doEnqSingleObj(pThis, pMultiSub->ppMsgs[i]->flowCtlType, (void*)pMultiSub->ppMsgs[i])); } @@ -2555,58 +2327,6 @@ dbgprintf("YYY: call advise with mutex %p locked \n", pThis->mut); } -/* set queue mode to enqueue only or not - * There is one subtle issue: this method may be called during queue - * construction or while it is running. In the former case, the queue - * mutex does not yet exist (it is NULL), while in the later case it - * must be locked. The function detects the state and operates as - * required. - * rgerhards, 2008-01-16 - */ -static rsRetVal -SetEnqOnly(qqueue_t *pThis, int bEnqOnly, int bLockMutex) -{ - DEFiRet; - DEFVARS_mutexProtection; - - ISOBJ_TYPE_assert(pThis, qqueue); - - /* for simplicity, we do one big mutex lock. This method is extremely seldom - * called, so that doesn't matter... -- rgerhards, 2008-01-16 - */ - if(pThis->mut != NULL) { - BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, bLockMutex); - } - - if(bEnqOnly == pThis->bEnqOnly) - FINALIZE; /* no change, nothing to do */ - - if(pThis->bQueueStarted) { - /* we need to adjust queue operation only if we are not during initial param setup */ - if(bEnqOnly == 1) { - /* switch to enqueue-only mode */ - /* this means we need to terminate all workers - that's it... */ - DBGOPRINT((obj_t*) pThis, "switching to enqueue-only mode, terminating all worker threads\n"); - if(pThis->pWtpReg != NULL) - wtpWakeupAllWrkr(pThis->pWtpReg); - if(pThis->pWtpDA != NULL) - wtpWakeupAllWrkr(pThis->pWtpDA); - } else { - /* switch back to regular mode */ - ABORT_FINALIZE(RS_RET_NOT_IMPLEMENTED); /* we don't need this so far... */ - } - } - - pThis->bEnqOnly = bEnqOnly; - -finalize_it: - if(pThis->mut != NULL) { - END_MTX_PROTECTED_OPERATIONS(pThis->mut); - } - RETiRet; -} - - /* some simple object access methods */ DEFpropSetMeth(qqueue, bSyncQueueFiles, int) DEFpropSetMeth(qqueue, iPersistUpdCnt, int) diff --git a/runtime/queue.h b/runtime/queue.h index 73c62b52..74bf2d31 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -59,10 +59,10 @@ typedef struct queue_s { BEGINobjInstance; queueType_t qType; int nLogDeq; /* number of elements currently logically dequeued */ + int bShutdownImmediate; /* should all workers cease processing messages? */ bool bEnqOnly; /* does queue run in enqueue-only mode (1) or not (0)? */ bool bSaveOnShutdown;/* persists everthing on shutdown (if DA!)? 1-yes, 0-no */ bool bQueueStarted; /* has queueStart() been called on this queue? 1-yes, 0-no */ - bool bQueueInDestruction;/* 1 if queue is in destruction process, 0 otherwise */ int iQueueSize; /* Current number of elements in the queue */ int iMaxQueueSize; /* how large can the queue grow? */ int iNumWorkerThreads;/* number of worker threads to use */ @@ -101,10 +101,11 @@ typedef struct queue_s { * the user really wanted...). -- rgerhards, 2008-04-02 */ /* end dequeue time window */ - rsRetVal (*pConsumer)(void *,batch_t*); /* user-supplied consumer function for dequeued messages */ + rsRetVal (*pConsumer)(void *,batch_t*,int*); /* user-supplied consumer function for dequeued messages */ /* calling interface for pConsumer: arg1 is the global user pointer from this structure, arg2 is the * user pointer array that was dequeued (actual sample: for actions, arg1 is the pAction and arg2 - * is pointer to an array of message message pointers) + * is pointer to an array of message message pointers), arg3 is a pointer to an interger which is zero + * during normal operations and one if the consumer must urgently shut down. */ /* type-specific handlers (set during construction) */ rsRetVal (*qConstruct)(struct queue_s *pThis); @@ -185,7 +186,7 @@ rsRetVal qqueueStart(qqueue_t *pThis); rsRetVal qqueueSetMaxFileSize(qqueue_t *pThis, size_t iMaxFileSize); rsRetVal qqueueSetFilePrefix(qqueue_t *pThis, uchar *pszPrefix, size_t iLenPrefix); rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads, - int iMaxQueueSize, rsRetVal (*pConsumer)(void*,batch_t*)); + int iMaxQueueSize, rsRetVal (*pConsumer)(void*,batch_t*, int*)); PROTOTYPEObjClassInit(qqueue); PROTOTYPEpropSetMeth(qqueue, iPersistUpdCnt, int); PROTOTYPEpropSetMeth(qqueue, bSyncQueueFiles, int); diff --git a/runtime/wti.c b/runtime/wti.c index 53b695b0..24988cbe 100644 --- a/runtime/wti.c +++ b/runtime/wti.c @@ -189,6 +189,7 @@ finalize_it: * the cancel cleanup handler (and have been cancelled). * rgerhards, 2008-01-16 */ +// TODO: REMOVE THIS FUNCTION, CURRENTLY ONLY PRESENT TO PROVIDE DEBUG OUTPUT -- rgerhards, 2009-10-14 static void wtiWorkerCancelCleanup(void *arg) { @@ -202,9 +203,6 @@ wtiWorkerCancelCleanup(void *arg) DBGPRINTF("%s: cancelation cleanup handler called.\n", wtiGetDbgHdr(pThis)); - /* call user supplied handler */ - pWtp->pfOnWorkerCancel(pThis->pWtp->pUsr, pThis->batch.pElem[0].pUsrp); - ENDfunc } @@ -222,8 +220,6 @@ doIdleProcessing(wti_t *pThis, wtp_t *pWtp, int *pbInactivityTOOccured) BEGINfunc DBGPRINTF("%s: worker IDLE, waiting for work.\n", wtiGetDbgHdr(pThis)); - pWtp->pfOnIdle(pWtp->pUsr, MUTEX_ALREADY_LOCKED); - if(pThis->bAlwaysRunning) { /* never shut down any started worker */ dbgprintf("YYY/ZZZ: wti Idle wait cond busy, mutex %p\n", pWtp->pmutUsr); @@ -235,6 +231,7 @@ dbgprintf("YYY/ZZZ: wti Idle wait cond busy, mutex %p\n", pWtp->pmutUsr); *pbInactivityTOOccured = 1; /* indicate we had a timeout */ } } + dbgoprint((obj_t*) pThis, "worker awoke from idle processing\n"); ENDfunc } @@ -249,7 +246,6 @@ wtiWorker(wti_t *pThis) int bInactivityTOOccured = 0; rsRetVal localRet; rsRetVal terminateRet; - int iCancelStateSave; DEFiRet; ISOBJ_TYPE_assert(pThis, wti); @@ -259,20 +255,17 @@ wtiWorker(wti_t *pThis) dbgSetThrdName(pThis->pszDbgHdr); pthread_cleanup_push(wtiWorkerCancelCleanup, pThis); - pWtp->pfOnWorkerStartup(pWtp->pUsr); - /* now we have our identity, on to real processing */ while(1) { /* loop will be broken below - need to do mutex locks */ if(pWtp->pfRateLimiter != NULL) { /* call rate-limiter, if defined */ pWtp->pfRateLimiter(pWtp->pUsr); } -dbgprintf("YYY/ZZZ: pre lock mutex\n"); d_pthread_mutex_lock(pWtp->pmutUsr); -dbgprintf("YYY/ZZZ: wti locks mutex %p\n", pWtp->pmutUsr); /* first check if we are in shutdown process (but evaluate a bit later) */ terminateRet = wtpChkStopWrkr(pWtp, MUTEX_ALREADY_LOCKED); +RUNLOG_VAR("%d", terminateRet); if(terminateRet == RS_RET_TERMINATE_NOW) { /* we now need to free the old batch */ localRet = pWtp->pfObjProcessed(pWtp->pUsr, pThis); @@ -288,7 +281,6 @@ dbgprintf("YYY/ZZZ: wti locks mutex %p\n", pWtp->pmutUsr); */ localRet = pWtp->pfDoWork(pWtp->pUsr, pThis); -dbgprintf("YYY/ZZZ: wti loop locked mutex %p again\n", pWtp->pmutUsr); if(localRet == RS_RET_IDLE) { if(terminateRet == RS_RET_TERMINATE_WHEN_IDLE || bInactivityTOOccured) { d_pthread_mutex_unlock(pWtp->pmutUsr); @@ -305,12 +297,7 @@ dbgprintf("YYY/ZZZ: wti loop locked mutex %p again\n", pWtp->pmutUsr); } /* indicate termination */ - d_pthread_mutex_lock(pWtp->pmutUsr); - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); pthread_cleanup_pop(0); /* remove cleanup handler */ - pWtp->pfOnWorkerShutdown(pWtp->pUsr); - pthread_setcancelstate(iCancelStateSave, NULL); - d_pthread_mutex_unlock(pWtp->pmutUsr); RETiRet; } diff --git a/runtime/wtp.c b/runtime/wtp.c index 40d031dc..3e76bb56 100644 --- a/runtime/wtp.c +++ b/runtime/wtp.c @@ -94,13 +94,8 @@ BEGINobjConstruct(wtp) /* be sure to specify the object type also in END macro! /* set all function pointers to "not implemented" dummy so that we can safely call them */ pThis->pfChkStopWrkr = NotImplementedDummy; pThis->pfGetDeqBatchSize = NotImplementedDummy; - pThis->pfIsIdle = NotImplementedDummy; pThis->pfDoWork = NotImplementedDummy; pThis->pfObjProcessed = NotImplementedDummy; - pThis->pfOnIdle = NotImplementedDummy; - pThis->pfOnWorkerCancel = NotImplementedDummy; - pThis->pfOnWorkerStartup = NotImplementedDummy; - pThis->pfOnWorkerShutdown = NotImplementedDummy; ENDobjConstruct(wtp) @@ -160,22 +155,6 @@ CODESTARTobjDestruct(wtp) ENDobjDestruct(wtp) -/* wake up all worker threads. - * rgerhards, 2008-01-16 - */ -rsRetVal -wtpWakeupAllWrkr(wtp_t *pThis) -{ - DEFiRet; - - ISOBJ_TYPE_assert(pThis, wtp); - d_pthread_mutex_lock(pThis->pmutUsr); - pthread_cond_broadcast(pThis->pcondBusy); - d_pthread_mutex_unlock(pThis->pmutUsr); - RETiRet; -} - - /* Sent a specific state for the worker thread pool. -- rgerhards, 2008-01-21 * We do not need to do atomic instructions as set operations are only * called when terminating the pool, and then in strict sequence. So we @@ -211,8 +190,10 @@ wtpChkStopWrkr(wtp_t *pThis, int bLockUsrMutex) wtpState = ATOMIC_FETCH_32BIT(pThis->wtpState); if(wtpState == wtpState_SHUTDOWN_IMMEDIATE) { +RUNLOG_STR("WWW: ChkStopWrkr returns TERMINATE_NOW"); ABORT_FINALIZE(RS_RET_TERMINATE_NOW); } else if(wtpState == wtpState_SHUTDOWN) { +RUNLOG_STR("WWW: ChkStopWrkr returns TERMINATE_WHEN_IDLE"); ABORT_FINALIZE(RS_RET_TERMINATE_WHEN_IDLE); } @@ -241,8 +222,11 @@ wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout ISOBJ_TYPE_assert(pThis, wtp); + /* lock mutex to prevent races (may otherwise happen during idle processing and such...) */ + d_pthread_mutex_lock(pThis->pmutUsr); wtpSetState(pThis, tShutdownCmd); - wtpWakeupAllWrkr(pThis); + pthread_cond_broadcast(pThis->pcondBusy); /* wake up all workers */ + d_pthread_mutex_unlock(pThis->pmutUsr); /* wait for worker thread termination */ d_pthread_mutex_lock(&pThis->mutWtp); @@ -430,7 +414,7 @@ dbgprintf("wtpAdviseMaxWorkers, nmax: %d, curr %d, missing %d\n", nMaxWrkrTmp, p CHKiRet(wtpStartWrkr(pThis)); } } else { -dbgprintf("YYY: adivse signal cond busy"); +dbgprintf("YYY: wtpAdviseMaxWorkers, sufficient workers, just doing adivse signal cond busy\n"); pthread_cond_signal(pThis->pcondBusy); } @@ -450,13 +434,8 @@ DEFpropSetMethPTR(wtp, pcondBusy, pthread_cond_t) DEFpropSetMethFP(wtp, pfChkStopWrkr, rsRetVal(*pVal)(void*, int)) DEFpropSetMethFP(wtp, pfRateLimiter, rsRetVal(*pVal)(void*)) DEFpropSetMethFP(wtp, pfGetDeqBatchSize, rsRetVal(*pVal)(void*, int*)) -DEFpropSetMethFP(wtp, pfIsIdle, rsRetVal(*pVal)(void*, wtp_t*)) DEFpropSetMethFP(wtp, pfDoWork, rsRetVal(*pVal)(void*, void*)) DEFpropSetMethFP(wtp, pfObjProcessed, rsRetVal(*pVal)(void*, wti_t*)) -DEFpropSetMethFP(wtp, pfOnIdle, rsRetVal(*pVal)(void*, int)) -DEFpropSetMethFP(wtp, pfOnWorkerCancel, rsRetVal(*pVal)(void*, void*)) -DEFpropSetMethFP(wtp, pfOnWorkerStartup, rsRetVal(*pVal)(void*)) -DEFpropSetMethFP(wtp, pfOnWorkerShutdown, rsRetVal(*pVal)(void*)) /* set the debug header message diff --git a/runtime/wtp.h b/runtime/wtp.h index 0505b91c..05c02a8c 100644 --- a/runtime/wtp.h +++ b/runtime/wtp.h @@ -62,12 +62,7 @@ struct wtp_s { rsRetVal (*pfGetDeqBatchSize)(void *pUsr, int*); /* obtains max dequeue count from queue config */ rsRetVal (*pfObjProcessed)(void *pUsr, wti_t *pWti); /* indicate user object is processed */ rsRetVal (*pfRateLimiter)(void *pUsr); - rsRetVal (*pfIsIdle)(void *pUsr, wtp_t *pWtp); rsRetVal (*pfDoWork)(void *pUsr, void *pWti); - rsRetVal (*pfOnIdle)(void *pUsr, int); - rsRetVal (*pfOnWorkerCancel)(void *pUsr, void*pWti); - rsRetVal (*pfOnWorkerStartup)(void *pUsr); - rsRetVal (*pfOnWorkerShutdown)(void *pUsr); /* end user objects */ uchar *pszDbgHdr; /* header string for debug messages */ }; @@ -91,13 +86,8 @@ PROTOTYPEObjClassInit(wtp); PROTOTYPEpropSetMethFP(wtp, pfChkStopWrkr, rsRetVal(*pVal)(void*, int)); PROTOTYPEpropSetMethFP(wtp, pfRateLimiter, rsRetVal(*pVal)(void*)); PROTOTYPEpropSetMethFP(wtp, pfGetDeqBatchSize, rsRetVal(*pVal)(void*, int*)); -PROTOTYPEpropSetMethFP(wtp, pfIsIdle, rsRetVal(*pVal)(void*, wtp_t*)); PROTOTYPEpropSetMethFP(wtp, pfDoWork, rsRetVal(*pVal)(void*, void*)); PROTOTYPEpropSetMethFP(wtp, pfObjProcessed, rsRetVal(*pVal)(void*, wti_t*)); -PROTOTYPEpropSetMethFP(wtp, pfOnIdle, rsRetVal(*pVal)(void*, int)); -PROTOTYPEpropSetMethFP(wtp, pfOnWorkerCancel, rsRetVal(*pVal)(void*,void*)); -PROTOTYPEpropSetMethFP(wtp, pfOnWorkerStartup, rsRetVal(*pVal)(void*)); -PROTOTYPEpropSetMethFP(wtp, pfOnWorkerShutdown, rsRetVal(*pVal)(void*)); PROTOTYPEpropSetMeth(wtp, toWrkShutdown, long); PROTOTYPEpropSetMeth(wtp, wtpState, wtpState_t); PROTOTYPEpropSetMeth(wtp, iMaxWorkerThreads, int); diff --git a/tests/arrayqueue.sh b/tests/arrayqueue.sh index 58fd24ae..baf303f3 100755 --- a/tests/arrayqueue.sh +++ b/tests/arrayqueue.sh @@ -1,7 +1,7 @@ # Test for fixedArray queue mode # added 2009-05-20 by rgerhards # This file is part of the rsyslog project, released under GPLv3 -echo testing queue fixedArray queue mode +echo \[arrayqueue.sh\]: testing queue fixedArray queue mode source $srcdir/diag.sh init source $srcdir/diag.sh startup arrayqueue.conf @@ -13,5 +13,5 @@ kill `cat rsyslog.pid` # now wait until rsyslog.pid is gone (and the process finished) source $srcdir/diag.sh wait-shutdown -source $srcdir/diag.sh seq-check 0 39999 +source $srcdir/diag.sh seq-check 39999 source $srcdir/diag.sh exit diff --git a/tests/daqueue-persist-drvr.sh b/tests/daqueue-persist-drvr.sh index d95991fc..f5937541 100755 --- a/tests/daqueue-persist-drvr.sh +++ b/tests/daqueue-persist-drvr.sh @@ -12,6 +12,9 @@ source $srcdir/diag.sh init echo \$MainMsgQueueType $1 > work-queuemode.conf echo "*.* :omtesting:sleep 0 1000" > work-delay.conf +#export RSYSLOG_DEBUG="debug nostdout noprintmutexaction" +#export RSYSLOG_DEBUGLOG="log0" + # inject 10000 msgs, so that DO hit the high watermark source $srcdir/diag.sh startup queue-persist.conf source $srcdir/diag.sh injectmsg 0 10000 @@ -19,13 +22,19 @@ $srcdir/diag.sh shutdown-immediate $srcdir/diag.sh wait-shutdown source $srcdir/diag.sh check-mainq-spool +echo "Enter phase 2, rsyslogd restart" + #exit +#export RSYSLOG_DEBUG="debug nostdout noprintmutexaction" +#export RSYSLOG_DEBUGLOG="log" +#valgrind="valgrind --tool=helgrind --log-fd=1" + # restart engine and have rest processed #remove delay echo "#" > work-delay.conf source $srcdir/diag.sh startup queue-persist.conf source $srcdir/diag.sh shutdown-when-empty # shut down rsyslogd when done processing messages $srcdir/diag.sh wait-shutdown -source $srcdir/diag.sh seq-check 0 4999 +source $srcdir/diag.sh seq-check 0 99999 source $srcdir/diag.sh exit diff --git a/tests/inputname.sh b/tests/inputname.sh index e1a58517..71f11c1e 100755 --- a/tests/inputname.sh +++ b/tests/inputname.sh @@ -1,4 +1,4 @@ -echo testing $InputTCPServerInputName directive +echo \[inputname.sh\]: testing $InputTCPServerInputName directive $srcdir/killrsyslog.sh # kill rsyslogd if it runs for some reason echo port 12514 diff --git a/tests/queue-persist-drvr.sh b/tests/queue-persist-drvr.sh index ea5386a7..53fbcb8b 100755 --- a/tests/queue-persist-drvr.sh +++ b/tests/queue-persist-drvr.sh @@ -24,5 +24,6 @@ source $srcdir/diag.sh check-mainq-spool echo "#" > work-delay.conf source $srcdir/diag.sh startup queue-persist.conf source $srcdir/diag.sh shutdown-when-empty # shut down rsyslogd when done processing messages +$srcdir/diag.sh wait-shutdown source $srcdir/diag.sh seq-check 0 4999 source $srcdir/diag.sh exit diff --git a/tools/syslogd.c b/tools/syslogd.c index 99ccd1f1..1c494dea 100644 --- a/tools/syslogd.c +++ b/tools/syslogd.c @@ -158,6 +158,7 @@ DEFobjCurrIf(net) /* TODO: make go away! */ /* forward definitions */ static rsRetVal GlobalClassExit(void); +static void logmsg(msg_t *pMsg, int flags); #ifndef _PATH_LOGCONF @@ -400,6 +401,8 @@ static int usage(void) */ /* return back the approximate current number of messages in the main message queue + * This number includes the messages that reside in an associated DA queue (if + * it exists) -- rgerhards, 2009-10-14 */ rsRetVal diagGetMainMsgQSize(int *piSize) @@ -585,6 +588,7 @@ logmsgInternal(int iErr, int pri, uchar *msg, int flags) MsgSetHOSTNAME(pMsg, glbl.GetLocalHostName(), ustrlen(glbl.GetLocalHostName())); MsgSetRcvFrom(pMsg, glbl.GetLocalHostNameProp()); MsgSetRcvFromIP(pMsg, pLocalHostIP); + MsgSetMSGoffs(pMsg, 0); /* check if we have an error code associated and, if so, * adjust the tag. -- rgerhards, 2008-06-27 */ @@ -632,7 +636,7 @@ finalize_it: * for the main queue. */ static rsRetVal -msgConsumer(void __attribute__((unused)) *notNeeded, batch_t *pBatch) +msgConsumer(void __attribute__((unused)) *notNeeded, batch_t *pBatch, int *pbShutdownImmediate) { int i; msg_t *pMsg; @@ -640,7 +644,7 @@ msgConsumer(void __attribute__((unused)) *notNeeded, batch_t *pBatch) assert(pBatch != NULL); - for(i = 0 ; i < pBatch->nElem ; i++) { + for(i = 0 ; i < pBatch->nElem && !*pbShutdownImmediate ; i++) { pMsg = (msg_t*) pBatch->pElem[i].pUsrp; DBGPRINTF("msgConsumer processes msg %d/%d\n", i, pBatch->nElem); if((pMsg->msgFlags & NEEDS_PARSING) != 0) { @@ -1080,7 +1084,7 @@ multiSubmitMsg(multi_submit_t *pMultiSub) * potential for misinterpretation, which we simply can not solve under the * circumstances given. */ -void +static void logmsg(msg_t *pMsg, int flags) { char *msg; diff --git a/tools/syslogd.h b/tools/syslogd.h index 3dfdbe2b..c3b99f9d 100644 --- a/tools/syslogd.h +++ b/tools/syslogd.h @@ -32,7 +32,6 @@ /* the following prototypes should go away once we have an input * module interface -- rgerhards, 2007-12-12 */ -void logmsg(msg_t *pMsg, int flags); extern int NoHops; extern int send_to_all; extern int Debug; |