summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2008-01-27 14:46:23 +0000
committerRainer Gerhards <rgerhards@adiscon.com>2008-01-27 14:46:23 +0000
commit7d8b1c293746d325db7f93d343a952e382da9ddd (patch)
tree11eb0c0bceb920fc7e89ecb1fe83bd89e46b9fd2
parentea7fd874d7b294dacc909a0f8e9c51dcc639d879 (diff)
downloadrsyslog-7d8b1c293746d325db7f93d343a952e382da9ddd.tar.gz
rsyslog-7d8b1c293746d325db7f93d343a952e382da9ddd.tar.xz
rsyslog-7d8b1c293746d325db7f93d343a952e382da9ddd.zip
fixed a bug when shutting down DA queue
-rw-r--r--queue.c33
-rw-r--r--queue.h6
-rw-r--r--syslogd.c23
-rw-r--r--wti.c10
-rw-r--r--wtp.c6
5 files changed, 50 insertions, 28 deletions
diff --git a/queue.c b/queue.c
index ac891584..d26a6e24 100644
--- a/queue.c
+++ b/queue.c
@@ -854,11 +854,11 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
dbgprintf("Queue 0x%lx: initiating worker thread shutdown sequence\n", queueGetID(pThis));
- // TODO: reminder, delte after testing: do we need to modify the high wtr mark? I dont' think so 2008-01-25
/* we reduce the low water mark in any case. This is not absolutely necessary, but
* it is useful because we enable DA mode at several spots below and so we do not need
* to think about the low water mark each time.
*/
+ pThis->iHighWtrMrk = 1; /* if we do not do this, the DA queue will not stop! */
pThis->iLowWtrMrk = 0;
/* first try to shutdown the queue within the regular shutdown period */
@@ -916,8 +916,13 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
/* when we reach this point, both queues are either empty or the regular queue shutdown timeout
* has expired. Now we need to check if we are configured to not loose messages. If so, we need
- * to persist the queue to disk (this is only possible if the queue is DA-enabled).
+ * to persist the queue to disk (this is only possible if the queue is DA-enabled). We must also
+ * set the primary queue to SHUTDOWN_IMMEDIATE, as it shall now terminate as soon as its consumer
+ * is done. This is especially important as we otherwise may interfere with queue order while the
+ * DA consumer is running. -- rgerhards, 2008-01-27
*/
+ wtpSetState(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE); /* set primary queue to shutdown only */
+
// TODO: what about pure disk queues and bSaveOnShutdown?
BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
/* optimize parameters for shutdown of DA-enabled queues */
@@ -936,8 +941,8 @@ RUNLOG;
/* make sure we do not timeout before we are done */
dbgprintf("Queue 0x%lx: bSaveOnShutdown configured, eternal timeout set\n", queueGetID(pThis));
timeoutComp(&tTimeout, QUEUE_TIMEOUT_ETERNAL);
- /* and run the primary's queue worker to drain the queue */
- iRetLocal = wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN, &tTimeout);
+ /* and run the primary queue's DA worker to drain the queue */
+ iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN, &tTimeout);
if(iRetLocal != RS_RET_OK) {
dbgprintf("Queue 0x%lx: unexpected iRet state %d after trying to shut down primary queue in disk save mode, "
"continuing, but results are unpredictable\n",
@@ -949,13 +954,15 @@ RUNLOG;
RUNLOG;
/* now the primary queue is either empty, persisted to disk - or set to loose messages. So we
- * can now request immediate shutdown of any remaining workers.
+ * can now request immediate shutdown of any remaining workers. Note that if bSaveOnShutdown was set,
+ * the queue is now empty. If regular workers are still running, and try to pull the next message,
+ * they will automatically terminate as there no longer is any message left to process.
*/
BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
RUNLOG_VAR("%d", pThis->iQueueSize);
if(pThis->iQueueSize > 0) {
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
- timeoutComp(&tTimeout, QUEUE_TIMEOUT_ETERNAL);
+ timeoutComp(&tTimeout, pThis->toActShutdown);
iRetLocal = wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout);
if(iRetLocal != RS_RET_OK) {
dbgprintf("Queue 0x%lx: unexpected iRet state %d after trying immediate shutdown of the primary queue "
@@ -992,12 +999,12 @@ RUNLOG_VAR("%d", pThis->iQueueSize);
/* ... and now the DA queue, if it exists (should always be after the primary one) */
BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
//TODO: use right mutex!
- //old: if(pThis->pqDA != NULL && pThis->pqDA->iQueueSize > 0) {
+ // was used: if(pThis->pqDA != NULL && pThis->pqDA->pWtpReg->iCurNumWrkThrd > 0) {
if(pThis->pqDA != NULL) {
RUNLOG_VAR("%p", pThis->pqDA->pWtpReg);
RUNLOG_VAR("%d", pThis->pqDA->pWtpReg->iCurNumWrkThrd);
}
- if(pThis->pqDA != NULL && pThis->pqDA->pWtpReg->iCurNumWrkThrd > 0) {
+ if(pThis->pqDA != NULL && pThis->pqDA->iQueueSize > 0) {
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
dbgprintf("Queue 0x%lx: checking to see if we need to cancel any worker threads of the DA queue\n",
queueGetID(pThis));
@@ -1098,6 +1105,7 @@ finalize_it:
* Params:
* arg1 - user pointer (in this case a queue_t)
* arg2 - user data pointer (in this case a queue data element, any object [queue's pUsr ptr!])
+ * Note that arg2 may be NULL, in which case no dequeued but unprocessed pUsr exists!
* rgerhards, 2008-01-16
*/
static rsRetVal
@@ -1109,7 +1117,6 @@ queueConsumerCancelCleanup(void *arg1, void *arg2)
obj_t *pUsr = (obj_t*) arg2;
ISOBJ_TYPE_assert(pThis, queue);
- ISOBJ_assert(pUsr);
dbgprintf("Queue 0x%lx: cancelation cleanup handler consumer called (NOT FULLY IMPLEMENTED, one msg lost!)\n",
queueGetID(pThis));
@@ -1278,6 +1285,12 @@ queueChkStopWrkrDA(queue_t *pThis)
bStopWrkr = 1;
} else {
if(pThis->bRunsDA) {
+#if 0
+RUNLOG_VAR("%d", pThis->iQueueSize);
+RUNLOG_VAR("%d", pThis->iHighWtrMrk);
+if(pThis->pqDA != NULL)
+ RUNLOG_VAR("%d", pThis->pqDA->bQueueStarted);
+#endif
if(pThis->iQueueSize < pThis->iHighWtrMrk && pThis->bQueueStarted == 1) {
bStopWrkr = 1;
} else {
@@ -1288,7 +1301,7 @@ queueChkStopWrkrDA(queue_t *pThis)
}
}
-RUNLOG_VAR("%d", bStopWrkr);
+//RUNLOG_VAR("%d", bStopWrkr);
ENDfunc
return bStopWrkr;
}
diff --git a/queue.h b/queue.h
index c9abced0..fd97596d 100644
--- a/queue.h
+++ b/queue.h
@@ -92,9 +92,9 @@ typedef struct queue_s {
pthread_cond_t notFull, notEmpty;
pthread_cond_t condDAReady;/* signalled when the DA queue is fully initialized and ready for processing */
pthread_cond_t condThrdTrm;/* signalled when threads terminate */ // TODO: no longer used?
- pthread_cond_t *condSignalOnEmpty;/* caller-provided condition to be signalled when queue is empty (DA mode!) */
- pthread_mutex_t *mutSignalOnEmpty; /* and its associated mutex */
- pthread_cond_t *condSignalOnEmpty2;/* another condition to be signalled on empty */
+ //pthread_cond_t *condSignalOnEmpty;/* caller-provided condition to be signalled when queue is empty (DA mode!) */
+ //pthread_mutex_t *mutSignalOnEmpty; /* and its associated mutex */
+ //pthread_cond_t *condSignalOnEmpty2;/* another condition to be signalled on empty */
//int bSignalOnEmpty; /* signal caller when queue is empty via xxxSignalOnEmpty cond/mut,
// 0 = do not, 1 = signal only condSignalOnEmpty, 2 = signal both condSig..*/ // TODO: no longer needed?
diff --git a/syslogd.c b/syslogd.c
index 07abdc8b..a1595127 100644
--- a/syslogd.c
+++ b/syslogd.c
@@ -418,6 +418,9 @@ static int iMainMsgQPersistUpdCnt = 0; /* persist queue info every n updates
static int iMainMsgQtoQShutdown = 0; /* queue shutdown */
static int iMainMsgQtoActShutdown = 1000; /* action shutdown (in phase 2) */
static int iMainMsgQtoEnq = 2000; /* timeout for queue enque */
+static int iMainMsgQtoWrkShutdown = 60000; /* timeout for worker thread shutdown */
+static int iMainMsgQWrkMinMsgs = 100; /* minimum messages per worker needed to start a new one */
+static int bMainMsgQSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */
/* This structure represents the files that will have log
@@ -529,6 +532,9 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a
iMainMsgQtoQShutdown = 0;
iMainMsgQtoActShutdown = 1000;
iMainMsgQtoEnq = 2000;
+ iMainMsgQtoWrkShutdown = 60000;
+ iMainMsgQWrkMinMsgs = 100;
+ bMainMsgQSaveOnShutdown = 1;
MainMsgQueType = QUEUETYPE_FIXED_ARRAY;
return RS_RET_OK;
@@ -3149,6 +3155,14 @@ static void dbgPrintInitInfo(void)
iMainMsgQtoQShutdown, iMainMsgQtoActShutdown, iMainMsgQtoEnq);
dbgprintf("Main queue watermarks: high: %d, low: %d, discard: %d, discard-severity: %d\n",
iMainMsgQHighWtrMark, iMainMsgQLowWtrMark, iMainMsgQDiscardMark, iMainMsgQDiscardSeverity);
+ /* TODO: add
+ static int iMainMsgQtoWrkShutdown = 60000;
+ static int iMainMsgQtoWrkMinMsgs = 100;
+ static int iMainMsgQbSaveOnShutdown = 1;
+ setQPROP(queueSettoWrkShutdown, "$MainMsgQueueTimeoutWorkerThreadShutdown", 5000);
+ setQPROP(queueSetiMinMsgsPerWrkr, "$MainMsgQueueWorkerThreadMinimumMessages", 100);
+ setQPROP(queueSetbSaveOnShutdown, "$MainMsgQueueSaveOnShutdown", 1);
+ */
dbgprintf("Work Directory: '%s'.\n", pszWorkDir);
}
@@ -3417,14 +3431,14 @@ init(void)
setQPROP(queueSetiPersistUpdCnt, "$MainMsgQueueCheckpointInterval", iMainMsgQPersistUpdCnt);
setQPROP(queueSettoQShutdown, "$MainMsgQueueTimeoutShutdown", iMainMsgQtoQShutdown );
setQPROP(queueSettoActShutdown, "$MainMsgQueueTimeoutActionCompletion", iMainMsgQtoActShutdown);
- setQPROP(queueSettoWrkShutdown, "$MainMsgQueueTimeoutWorkerThreadShutdown", 5000); // TODO: implement config directive!
+ setQPROP(queueSettoWrkShutdown, "$MainMsgQueueTimeoutWorkerThreadShutdown", iMainMsgQtoWrkShutdown);
setQPROP(queueSettoEnq, "$MainMsgQueueTimeoutEnqueue", iMainMsgQtoEnq);
setQPROP(queueSetiHighWtrMrk, "$MainMsgQueueHighWaterMark", iMainMsgQHighWtrMark);
setQPROP(queueSetiLowWtrMrk, "$MainMsgQueueLowWaterMark", iMainMsgQLowWtrMark);
setQPROP(queueSetiDiscardMrk, "$MainMsgQueueDiscardMark", iMainMsgQDiscardMark);
setQPROP(queueSetiDiscardSeverity, "$MainMsgQueueDiscardSeverity", iMainMsgQDiscardSeverity);
- setQPROP(queueSetiMinMsgsPerWrkr, "$MainMsgQueueWorkerThreadMinimumMessages", 100); // TODO: implement config directive!
- setQPROP(queueSetbSaveOnShutdown, "$MainMsgQueueSaveOnShutdown", 1); // TODO: implement config directive!
+ setQPROP(queueSetiMinMsgsPerWrkr, "$MainMsgQueueWorkerThreadMinimumMessages", iMainMsgQWrkMinMsgs);
+ setQPROP(queueSetbSaveOnShutdown, "$MainMsgQueueSaveOnShutdown", bMainMsgQSaveOnShutdown);
# undef setQPROP
# undef setQPROPstr
@@ -4566,11 +4580,14 @@ static rsRetVal loadBuildInModules(void)
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuetimeoutshutdown", 0, eCmdHdlrInt, NULL, &iMainMsgQtoQShutdown, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuetimeoutactioncompletion", 0, eCmdHdlrInt, NULL, &iMainMsgQtoActShutdown, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuetimeoutenqueue", 0, eCmdHdlrInt, NULL, &iMainMsgQtoEnq, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuetimeoutworkerthreadshutdown", 0, eCmdHdlrInt, NULL, &iMainMsgQtoWrkShutdown, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuetimeoutworkerthreadminimummessages", 0, eCmdHdlrInt, NULL, &iMainMsgQWrkMinMsgs, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuemaxfilesize", 0, eCmdHdlrSize, NULL, &iMainMsgQueMaxFileSize, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"repeatedmsgreduction", 0, eCmdHdlrBinary, NULL, &bReduceRepeatMsgs, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlywhenpreviousissuspended", 0, eCmdHdlrBinary, NULL, &bActExecWhenPrevSusp, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionresumeinterval", 0, eCmdHdlrInt, setActionResumeInterval, NULL, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"controlcharacterescapeprefix", 0, eCmdHdlrGetChar, NULL, &cCCEscapeChar, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuesaveonshutdown", 0, eCmdHdlrBinary, NULL, &bMainMsgQSaveOnShutdown, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"escapecontrolcharactersonreceive", 0, eCmdHdlrBinary, NULL, &bEscapeCCOnRcv, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"dropmsgswithmaliciousdnsptrrecords", 0, eCmdHdlrBinary, NULL, &bDropMalPTRMsgs, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"droptrailinglfonreception", 0, eCmdHdlrBinary, NULL, &bDropTrailingLF, NULL));
diff --git a/wti.c b/wti.c
index 23ad2415..688f1267 100644
--- a/wti.c
+++ b/wti.c
@@ -197,17 +197,13 @@ rsRetVal wtiDestruct(wti_t **ppThis)
wtiProcessThrdChanges(pThis, LOCK_MUTEX); /* process state change one last time */
d_pthread_mutex_lock(&pThis->mut);
-RUNLOG_VAR("%d", pThis->tCurrCmd);
if(wtiGetState(pThis, MUTEX_ALREADY_LOCKED) != eWRKTHRD_STOPPED) {
dbgprintf("%s: WARNING: worker %p shall be destructed but is still running (might be OK) - joining it\n",
wtiGetDbgHdr(pThis), pThis);
/* let's hope the caller actually instructed it to shutdown... */
pthread_cond_wait(&pThis->condExitDone, &pThis->mut);
-RUNLOG;
wtiJoinThrd(pThis);
-RUNLOG;
}
-RUNLOG;
d_pthread_mutex_unlock(&pThis->mut);
/* actual destruction */
@@ -267,13 +263,11 @@ wtiJoinThrd(wti_t *pThis)
DEFiRet;
ISOBJ_TYPE_assert(pThis, wti);
- dbgprintf("wti: waiting for worker %s termination, current state %d\n", wtiGetDbgHdr(pThis), pThis->tCurrCmd);
+ dbgprintf("waiting for worker %s termination, current state %d\n", wtiGetDbgHdr(pThis), pThis->tCurrCmd);
pthread_join(pThis->thrdID, NULL);
-RUNLOG;
wtiSetState(pThis, eWRKTHRD_STOPPED, 0, MUTEX_ALREADY_LOCKED); /* back to virgin... */
-RUNLOG_VAR("%p", pThis->thrdID);
pThis->thrdID = 0; /* invalidate the thread ID so that we do not accidently find reused ones */
- dbgprintf("wti: worker %s has stopped\n", wtiGetDbgHdr(pThis));
+ dbgprintf("worker %s has stopped\n", wtiGetDbgHdr(pThis));
RETiRet;
}
diff --git a/wtp.c b/wtp.c
index 1e16de3b..3e5fb937 100644
--- a/wtp.c
+++ b/wtp.c
@@ -350,11 +350,11 @@ wtpCancelAll(wtp_t *pThis)
/* process any pending thread requests so that we know who actually is still running */
wtpProcessThrdChanges(pThis);
-RUNLOG_VAR("%d", pThis->iCurNumWrkThrd);
+//RUNLOG_VAR("%d", pThis->iCurNumWrkThrd);
/* go through all workers and cancel those that are active */
for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
// TODO: mutex lock!
-RUNLOG_VAR("%d", pThis->pWrkr[i]->tCurrCmd);
+//RUNLOG_VAR("%d", pThis->pWrkr[i]->tCurrCmd);
if(pThis->pWrkr[i]->tCurrCmd >= eWRKTHRD_TERMINATING) {
dbgprintf("%s: canceling worker thread %d\n", wtpGetDbgHdr(pThis), i);
pthread_cancel(pThis->pWrkr[i]->thrdID);
@@ -483,8 +483,6 @@ wtpStartWrkr(wtp_t *pThis, int bLockMutex)
BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex);
pThis->iCurNumWrkThrd++;
-dbgPrintAllDebugInfo();
-RUNLOG_VAR("%d", pThis->iCurNumWrkThrd);
/* find free spot in thread table. If we find at least one worker that is in initialization,
* we do NOT start a new one. Let's give the other one a chance, first.