diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2008-01-27 14:46:23 +0000 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2008-01-27 14:46:23 +0000 |
commit | 7d8b1c293746d325db7f93d343a952e382da9ddd (patch) | |
tree | 11eb0c0bceb920fc7e89ecb1fe83bd89e46b9fd2 | |
parent | ea7fd874d7b294dacc909a0f8e9c51dcc639d879 (diff) | |
download | rsyslog-7d8b1c293746d325db7f93d343a952e382da9ddd.tar.gz rsyslog-7d8b1c293746d325db7f93d343a952e382da9ddd.tar.xz rsyslog-7d8b1c293746d325db7f93d343a952e382da9ddd.zip |
fixed a bug when shutting down DA queue
-rw-r--r-- | queue.c | 33 | ||||
-rw-r--r-- | queue.h | 6 | ||||
-rw-r--r-- | syslogd.c | 23 | ||||
-rw-r--r-- | wti.c | 10 | ||||
-rw-r--r-- | wtp.c | 6 |
5 files changed, 50 insertions, 28 deletions
@@ -854,11 +854,11 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis) dbgprintf("Queue 0x%lx: initiating worker thread shutdown sequence\n", queueGetID(pThis)); - // TODO: reminder, delte after testing: do we need to modify the high wtr mark? I dont' think so 2008-01-25 /* we reduce the low water mark in any case. This is not absolutely necessary, but * it is useful because we enable DA mode at several spots below and so we do not need * to think about the low water mark each time. */ + pThis->iHighWtrMrk = 1; /* if we do not do this, the DA queue will not stop! */ pThis->iLowWtrMrk = 0; /* first try to shutdown the queue within the regular shutdown period */ @@ -916,8 +916,13 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis) /* when we reach this point, both queues are either empty or the regular queue shutdown timeout * has expired. Now we need to check if we are configured to not loose messages. If so, we need - * to persist the queue to disk (this is only possible if the queue is DA-enabled). + * to persist the queue to disk (this is only possible if the queue is DA-enabled). We must also + * set the primary queue to SHUTDOWN_IMMEDIATE, as it shall now terminate as soon as its consumer + * is done. This is especially important as we otherwise may interfere with queue order while the + * DA consumer is running. -- rgerhards, 2008-01-27 */ + wtpSetState(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE); /* set primary queue to shutdown only */ + // TODO: what about pure disk queues and bSaveOnShutdown? BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ /* optimize parameters for shutdown of DA-enabled queues */ @@ -936,8 +941,8 @@ RUNLOG; /* make sure we do not timeout before we are done */ dbgprintf("Queue 0x%lx: bSaveOnShutdown configured, eternal timeout set\n", queueGetID(pThis)); timeoutComp(&tTimeout, QUEUE_TIMEOUT_ETERNAL); - /* and run the primary's queue worker to drain the queue */ - iRetLocal = wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN, &tTimeout); + /* and run the primary queue's DA worker to drain the queue */ + iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN, &tTimeout); if(iRetLocal != RS_RET_OK) { dbgprintf("Queue 0x%lx: unexpected iRet state %d after trying to shut down primary queue in disk save mode, " "continuing, but results are unpredictable\n", @@ -949,13 +954,15 @@ RUNLOG; RUNLOG; /* now the primary queue is either empty, persisted to disk - or set to loose messages. So we - * can now request immediate shutdown of any remaining workers. + * can now request immediate shutdown of any remaining workers. Note that if bSaveOnShutdown was set, + * the queue is now empty. If regular workers are still running, and try to pull the next message, + * they will automatically terminate as there no longer is any message left to process. */ BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ RUNLOG_VAR("%d", pThis->iQueueSize); if(pThis->iQueueSize > 0) { END_MTX_PROTECTED_OPERATIONS(pThis->mut); - timeoutComp(&tTimeout, QUEUE_TIMEOUT_ETERNAL); + timeoutComp(&tTimeout, pThis->toActShutdown); iRetLocal = wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout); if(iRetLocal != RS_RET_OK) { dbgprintf("Queue 0x%lx: unexpected iRet state %d after trying immediate shutdown of the primary queue " @@ -992,12 +999,12 @@ RUNLOG_VAR("%d", pThis->iQueueSize); /* ... and now the DA queue, if it exists (should always be after the primary one) */ BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ //TODO: use right mutex! - //old: if(pThis->pqDA != NULL && pThis->pqDA->iQueueSize > 0) { + // was used: if(pThis->pqDA != NULL && pThis->pqDA->pWtpReg->iCurNumWrkThrd > 0) { if(pThis->pqDA != NULL) { RUNLOG_VAR("%p", pThis->pqDA->pWtpReg); RUNLOG_VAR("%d", pThis->pqDA->pWtpReg->iCurNumWrkThrd); } - if(pThis->pqDA != NULL && pThis->pqDA->pWtpReg->iCurNumWrkThrd > 0) { + if(pThis->pqDA != NULL && pThis->pqDA->iQueueSize > 0) { END_MTX_PROTECTED_OPERATIONS(pThis->mut); dbgprintf("Queue 0x%lx: checking to see if we need to cancel any worker threads of the DA queue\n", queueGetID(pThis)); @@ -1098,6 +1105,7 @@ finalize_it: * Params: * arg1 - user pointer (in this case a queue_t) * arg2 - user data pointer (in this case a queue data element, any object [queue's pUsr ptr!]) + * Note that arg2 may be NULL, in which case no dequeued but unprocessed pUsr exists! * rgerhards, 2008-01-16 */ static rsRetVal @@ -1109,7 +1117,6 @@ queueConsumerCancelCleanup(void *arg1, void *arg2) obj_t *pUsr = (obj_t*) arg2; ISOBJ_TYPE_assert(pThis, queue); - ISOBJ_assert(pUsr); dbgprintf("Queue 0x%lx: cancelation cleanup handler consumer called (NOT FULLY IMPLEMENTED, one msg lost!)\n", queueGetID(pThis)); @@ -1278,6 +1285,12 @@ queueChkStopWrkrDA(queue_t *pThis) bStopWrkr = 1; } else { if(pThis->bRunsDA) { +#if 0 +RUNLOG_VAR("%d", pThis->iQueueSize); +RUNLOG_VAR("%d", pThis->iHighWtrMrk); +if(pThis->pqDA != NULL) + RUNLOG_VAR("%d", pThis->pqDA->bQueueStarted); +#endif if(pThis->iQueueSize < pThis->iHighWtrMrk && pThis->bQueueStarted == 1) { bStopWrkr = 1; } else { @@ -1288,7 +1301,7 @@ queueChkStopWrkrDA(queue_t *pThis) } } -RUNLOG_VAR("%d", bStopWrkr); +//RUNLOG_VAR("%d", bStopWrkr); ENDfunc return bStopWrkr; } @@ -92,9 +92,9 @@ typedef struct queue_s { pthread_cond_t notFull, notEmpty; pthread_cond_t condDAReady;/* signalled when the DA queue is fully initialized and ready for processing */ pthread_cond_t condThrdTrm;/* signalled when threads terminate */ // TODO: no longer used? - pthread_cond_t *condSignalOnEmpty;/* caller-provided condition to be signalled when queue is empty (DA mode!) */ - pthread_mutex_t *mutSignalOnEmpty; /* and its associated mutex */ - pthread_cond_t *condSignalOnEmpty2;/* another condition to be signalled on empty */ + //pthread_cond_t *condSignalOnEmpty;/* caller-provided condition to be signalled when queue is empty (DA mode!) */ + //pthread_mutex_t *mutSignalOnEmpty; /* and its associated mutex */ + //pthread_cond_t *condSignalOnEmpty2;/* another condition to be signalled on empty */ //int bSignalOnEmpty; /* signal caller when queue is empty via xxxSignalOnEmpty cond/mut, // 0 = do not, 1 = signal only condSignalOnEmpty, 2 = signal both condSig..*/ // TODO: no longer needed? @@ -418,6 +418,9 @@ static int iMainMsgQPersistUpdCnt = 0; /* persist queue info every n updates static int iMainMsgQtoQShutdown = 0; /* queue shutdown */ static int iMainMsgQtoActShutdown = 1000; /* action shutdown (in phase 2) */ static int iMainMsgQtoEnq = 2000; /* timeout for queue enque */ +static int iMainMsgQtoWrkShutdown = 60000; /* timeout for worker thread shutdown */ +static int iMainMsgQWrkMinMsgs = 100; /* minimum messages per worker needed to start a new one */ +static int bMainMsgQSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */ /* This structure represents the files that will have log @@ -529,6 +532,9 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a iMainMsgQtoQShutdown = 0; iMainMsgQtoActShutdown = 1000; iMainMsgQtoEnq = 2000; + iMainMsgQtoWrkShutdown = 60000; + iMainMsgQWrkMinMsgs = 100; + bMainMsgQSaveOnShutdown = 1; MainMsgQueType = QUEUETYPE_FIXED_ARRAY; return RS_RET_OK; @@ -3149,6 +3155,14 @@ static void dbgPrintInitInfo(void) iMainMsgQtoQShutdown, iMainMsgQtoActShutdown, iMainMsgQtoEnq); dbgprintf("Main queue watermarks: high: %d, low: %d, discard: %d, discard-severity: %d\n", iMainMsgQHighWtrMark, iMainMsgQLowWtrMark, iMainMsgQDiscardMark, iMainMsgQDiscardSeverity); + /* TODO: add + static int iMainMsgQtoWrkShutdown = 60000; + static int iMainMsgQtoWrkMinMsgs = 100; + static int iMainMsgQbSaveOnShutdown = 1; + setQPROP(queueSettoWrkShutdown, "$MainMsgQueueTimeoutWorkerThreadShutdown", 5000); + setQPROP(queueSetiMinMsgsPerWrkr, "$MainMsgQueueWorkerThreadMinimumMessages", 100); + setQPROP(queueSetbSaveOnShutdown, "$MainMsgQueueSaveOnShutdown", 1); + */ dbgprintf("Work Directory: '%s'.\n", pszWorkDir); } @@ -3417,14 +3431,14 @@ init(void) setQPROP(queueSetiPersistUpdCnt, "$MainMsgQueueCheckpointInterval", iMainMsgQPersistUpdCnt); setQPROP(queueSettoQShutdown, "$MainMsgQueueTimeoutShutdown", iMainMsgQtoQShutdown ); setQPROP(queueSettoActShutdown, "$MainMsgQueueTimeoutActionCompletion", iMainMsgQtoActShutdown); - setQPROP(queueSettoWrkShutdown, "$MainMsgQueueTimeoutWorkerThreadShutdown", 5000); // TODO: implement config directive! + setQPROP(queueSettoWrkShutdown, "$MainMsgQueueTimeoutWorkerThreadShutdown", iMainMsgQtoWrkShutdown); setQPROP(queueSettoEnq, "$MainMsgQueueTimeoutEnqueue", iMainMsgQtoEnq); setQPROP(queueSetiHighWtrMrk, "$MainMsgQueueHighWaterMark", iMainMsgQHighWtrMark); setQPROP(queueSetiLowWtrMrk, "$MainMsgQueueLowWaterMark", iMainMsgQLowWtrMark); setQPROP(queueSetiDiscardMrk, "$MainMsgQueueDiscardMark", iMainMsgQDiscardMark); setQPROP(queueSetiDiscardSeverity, "$MainMsgQueueDiscardSeverity", iMainMsgQDiscardSeverity); - setQPROP(queueSetiMinMsgsPerWrkr, "$MainMsgQueueWorkerThreadMinimumMessages", 100); // TODO: implement config directive! - setQPROP(queueSetbSaveOnShutdown, "$MainMsgQueueSaveOnShutdown", 1); // TODO: implement config directive! + setQPROP(queueSetiMinMsgsPerWrkr, "$MainMsgQueueWorkerThreadMinimumMessages", iMainMsgQWrkMinMsgs); + setQPROP(queueSetbSaveOnShutdown, "$MainMsgQueueSaveOnShutdown", bMainMsgQSaveOnShutdown); # undef setQPROP # undef setQPROPstr @@ -4566,11 +4580,14 @@ static rsRetVal loadBuildInModules(void) CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuetimeoutshutdown", 0, eCmdHdlrInt, NULL, &iMainMsgQtoQShutdown, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuetimeoutactioncompletion", 0, eCmdHdlrInt, NULL, &iMainMsgQtoActShutdown, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuetimeoutenqueue", 0, eCmdHdlrInt, NULL, &iMainMsgQtoEnq, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuetimeoutworkerthreadshutdown", 0, eCmdHdlrInt, NULL, &iMainMsgQtoWrkShutdown, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuetimeoutworkerthreadminimummessages", 0, eCmdHdlrInt, NULL, &iMainMsgQWrkMinMsgs, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuemaxfilesize", 0, eCmdHdlrSize, NULL, &iMainMsgQueMaxFileSize, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"repeatedmsgreduction", 0, eCmdHdlrBinary, NULL, &bReduceRepeatMsgs, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlywhenpreviousissuspended", 0, eCmdHdlrBinary, NULL, &bActExecWhenPrevSusp, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionresumeinterval", 0, eCmdHdlrInt, setActionResumeInterval, NULL, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"controlcharacterescapeprefix", 0, eCmdHdlrGetChar, NULL, &cCCEscapeChar, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuesaveonshutdown", 0, eCmdHdlrBinary, NULL, &bMainMsgQSaveOnShutdown, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"escapecontrolcharactersonreceive", 0, eCmdHdlrBinary, NULL, &bEscapeCCOnRcv, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"dropmsgswithmaliciousdnsptrrecords", 0, eCmdHdlrBinary, NULL, &bDropMalPTRMsgs, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"droptrailinglfonreception", 0, eCmdHdlrBinary, NULL, &bDropTrailingLF, NULL)); @@ -197,17 +197,13 @@ rsRetVal wtiDestruct(wti_t **ppThis) wtiProcessThrdChanges(pThis, LOCK_MUTEX); /* process state change one last time */ d_pthread_mutex_lock(&pThis->mut); -RUNLOG_VAR("%d", pThis->tCurrCmd); if(wtiGetState(pThis, MUTEX_ALREADY_LOCKED) != eWRKTHRD_STOPPED) { dbgprintf("%s: WARNING: worker %p shall be destructed but is still running (might be OK) - joining it\n", wtiGetDbgHdr(pThis), pThis); /* let's hope the caller actually instructed it to shutdown... */ pthread_cond_wait(&pThis->condExitDone, &pThis->mut); -RUNLOG; wtiJoinThrd(pThis); -RUNLOG; } -RUNLOG; d_pthread_mutex_unlock(&pThis->mut); /* actual destruction */ @@ -267,13 +263,11 @@ wtiJoinThrd(wti_t *pThis) DEFiRet; ISOBJ_TYPE_assert(pThis, wti); - dbgprintf("wti: waiting for worker %s termination, current state %d\n", wtiGetDbgHdr(pThis), pThis->tCurrCmd); + dbgprintf("waiting for worker %s termination, current state %d\n", wtiGetDbgHdr(pThis), pThis->tCurrCmd); pthread_join(pThis->thrdID, NULL); -RUNLOG; wtiSetState(pThis, eWRKTHRD_STOPPED, 0, MUTEX_ALREADY_LOCKED); /* back to virgin... */ -RUNLOG_VAR("%p", pThis->thrdID); pThis->thrdID = 0; /* invalidate the thread ID so that we do not accidently find reused ones */ - dbgprintf("wti: worker %s has stopped\n", wtiGetDbgHdr(pThis)); + dbgprintf("worker %s has stopped\n", wtiGetDbgHdr(pThis)); RETiRet; } @@ -350,11 +350,11 @@ wtpCancelAll(wtp_t *pThis) /* process any pending thread requests so that we know who actually is still running */ wtpProcessThrdChanges(pThis); -RUNLOG_VAR("%d", pThis->iCurNumWrkThrd); +//RUNLOG_VAR("%d", pThis->iCurNumWrkThrd); /* go through all workers and cancel those that are active */ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) { // TODO: mutex lock! -RUNLOG_VAR("%d", pThis->pWrkr[i]->tCurrCmd); +//RUNLOG_VAR("%d", pThis->pWrkr[i]->tCurrCmd); if(pThis->pWrkr[i]->tCurrCmd >= eWRKTHRD_TERMINATING) { dbgprintf("%s: canceling worker thread %d\n", wtpGetDbgHdr(pThis), i); pthread_cancel(pThis->pWrkr[i]->thrdID); @@ -483,8 +483,6 @@ wtpStartWrkr(wtp_t *pThis, int bLockMutex) BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex); pThis->iCurNumWrkThrd++; -dbgPrintAllDebugInfo(); -RUNLOG_VAR("%d", pThis->iCurNumWrkThrd); /* find free spot in thread table. If we find at least one worker that is in initialization, * we do NOT start a new one. Let's give the other one a chance, first. |