diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2008-01-26 17:30:51 +0000 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2008-01-26 17:30:51 +0000 |
commit | 439f19ad38f5dcc3282d450a2a15cea3238fc754 (patch) | |
tree | 1cc92b98f533a92de5fc97c528497e27b588714f | |
parent | 1fb59126b0f9facb88cbbdc0331035f7e9dd80a7 (diff) | |
download | rsyslog-439f19ad38f5dcc3282d450a2a15cea3238fc754.tar.gz rsyslog-439f19ad38f5dcc3282d450a2a15cea3238fc754.tar.xz rsyslog-439f19ad38f5dcc3282d450a2a15cea3238fc754.zip |
partially fixed bug that caused rsyslogd to stall processing enqueued
messages after turning off DA mode and before any new message were
arrived (if a new message arrived, everything went back to normal, so
it was a temporary halt)
-rw-r--r-- | queue.c | 62 | ||||
-rw-r--r-- | queue.h | 1 | ||||
-rw-r--r-- | syslogd.c | 1 |
3 files changed, 55 insertions, 9 deletions
@@ -67,6 +67,7 @@ static int queueChkStopWrkrDA(queue_t *pThis); static int queueIsIdleDA(queue_t *pThis); static rsRetVal queueConsumerDA(queue_t *pThis, wti_t *pWti, int iCancelStateSave); static rsRetVal queueConsumerCancelCleanup(void *arg1, void *arg2); +static inline rsRetVal queueAdviseMaxWorkers(queue_t *pThis); /* methods */ @@ -125,6 +126,13 @@ dbgprintf("Queue 0x%lx: disk-assistance being been turned off, bEnqOnly %d, bQue queueDestruct(&pThis->pqDA); /* and now we are ready to destruct the DA queue */ dbgprintf("Queue 0x%lx: disk-assistance has been turned off, disk queue was empty (iRet %d)\n", queueGetID(pThis), iRet); + /* now we need to check if the regular queue has some messages. This may be the case + * when it is waiting that the high water mark is reached again. If so, we need to start up + * a regular worker. -- rgerhards, 2008-01-26 + */ + if(pThis->iQueueSize > 0) { + queueAdviseMaxWorkers(pThis); + } } RETiRet; @@ -1229,22 +1237,40 @@ dbgprintf("DAConsumer returns with iRet %d\n", iRet); /* must only be called when the queue mutex is locked, else results * are not stable! - * Version when running in DA mode. + * If we are a child, we have done our duty when the queue is empty. In that case, + * we can terminate. + * Version for the DA worker thread. NOTE: the pThis->bRunsDA is different from + * the DA queue */ static int queueChkStopWrkrDA(queue_t *pThis) { - return pThis->bEnqOnly || !pThis->bRunsDA; + /* if our queue is in destruction, we drain to the DA queue and so we shall not terminate + * until we have done so. + */ + int bStopWrkr= + pThis->bEnqOnly + || !pThis->bRunsDA + || (pThis->pqDA != NULL && pThis->pqDA->iQueueSize == 0 && pThis->bQueueInDestruction); +RUNLOG_VAR("%d", bStopWrkr); + return pThis->bEnqOnly + || !pThis->bRunsDA + ; + // || (pThis->pqDA != NULL && pThis->pqDA->iQueueSize == 0 && pThis->bQueueInDestruction); } - /* must only be called when the queue mutex is locked, else results * are not stable! - * Version when running in non-DA mode. + * If we are a child, we have done our duty when the queue is empty. In that case, + * we can terminate. + * Version for the regular worker thread. NOTE: the pThis->bRunsDA is different from + * the DA queue */ static int queueChkStopWrkrReg(queue_t *pThis) { - return pThis->bEnqOnly || pThis->bRunsDA; + int bStopWrkr = pThis->bEnqOnly || pThis->bRunsDA || (pThis->pqParent != NULL && pThis->iQueueSize == 0); +RUNLOG_VAR("%d", bStopWrkr); + return pThis->bEnqOnly || pThis->bRunsDA || (pThis->pqParent != NULL && pThis->iQueueSize == 0); } @@ -1271,6 +1297,26 @@ queueIsIdleReg(queue_t *pThis) } +/* This function is called when a worker thread for the regular queue is shut down. + * If we are the primary queue, this is not really interesting to us. If, however, + * we are the DA (child) queue, that means the DA queue is empty. In that case, we + * need to signal the parent queue's DA worker, so that it can terminate DA mode. + * rgerhards, 2008-01-26 + */ +static rsRetVal +queueRegOnWrkrShutdown(queue_t *pThis) +{ + DEFiRet; + + if(pThis->pqParent != NULL) { + assert(pThis->pqParent->pWtpDA != NULL); + wtpAdviseMaxWorkers(pThis->pqParent->pWtpDA, 1); /* reactivate DA worker (always 1) */ + } + + RETiRet; +} + + /* start up the queue - it must have been constructed and parameters defined * before. */ @@ -1322,6 +1368,7 @@ dbgprintf("Queue %p: post mutexes, mut %p\n", pThis, pThis->mut); CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, queueIsIdleReg)); CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, queueConsumerReg)); CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpReg, queueConsumerCancelCleanup)); + CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpReg, queueRegOnWrkrShutdown)); CHKiRet(wtpSetpmutUsr (pThis->pWtpReg, pThis->mut)); CHKiRet(wtpSetpcondBusy (pThis->pWtpReg, &pThis->notEmpty)); CHKiRet(wtpSetiNumWorkerThreads (pThis->pWtpReg, pThis->iNumWorkerThreads)); @@ -1475,8 +1522,6 @@ rsRetVal queueDestruct(queue_t **ppThis) pThis = *ppThis; ISOBJ_TYPE_assert(pThis, queue); -pThis->bSaveOnShutdown = 1; // TODO: Test remove - pThis->bQueueInDestruction = 1; /* indicate we are in destruction (modifies some behaviour) */ /* shut down all workers (handles *all* of the persistence logic) */ @@ -1731,6 +1776,7 @@ DEFpropSetMeth(queue, iDiscardMrk, int); DEFpropSetMeth(queue, iDiscardSeverity, int); DEFpropSetMeth(queue, bIsDA, int); DEFpropSetMeth(queue, iMinMsgsPerWrkr, int); +DEFpropSetMeth(queue, bSaveOnShutdown, int); /* This function can be used as a generic way to set properties. Only the subset @@ -1764,8 +1810,6 @@ finalize_it: BEGINObjClassInit(queue, 1) OBJSetMethodHandler(objMethod_SETPROPERTY, queueSetProperty); //OBJSetMethodHandler(objMethod_CONSTRUCTION_FINALIZER, strmConstructFinalize); -//fprintf(stdout, "queueChkStopWrkrReg: %p\n", queueChkStopWrkrReg); -//fprintf(stdout, "queueChkStopWrkrDA: %p\n", queueChkStopWrkrDA); ENDObjClassInit(queue) /* @@ -167,6 +167,7 @@ PROTOTYPEpropSetMeth(queue, iLowWtrMrk, int); PROTOTYPEpropSetMeth(queue, iDiscardMrk, int); PROTOTYPEpropSetMeth(queue, iDiscardSeverity, int); PROTOTYPEpropSetMeth(queue, iMinMsgsPerWrkr, int); +PROTOTYPEpropSetMeth(queue, bSaveOnShutdown, int); #define queueGetID(pThis) ((unsigned long) pThis) #endif /* #ifndef QUEUE_H_INCLUDED */ @@ -3424,6 +3424,7 @@ init(void) setQPROP(queueSetiDiscardMrk, "$MainMsgQueueDiscardMark", iMainMsgQDiscardMark); setQPROP(queueSetiDiscardSeverity, "$MainMsgQueueDiscardSeverity", iMainMsgQDiscardSeverity); setQPROP(queueSetiMinMsgsPerWrkr, "$MainMsgQueueWorkerThreadMinimumMessages", 100); // TODO: implement config directive! + setQPROP(queueSetbSaveOnShutdown, "$MainMsgQueueSaveOnShutdown", 1); // TODO: implement config directive! # undef setQPROP # undef setQPROPstr |