summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2008-01-27 17:38:20 +0000
committerRainer Gerhards <rgerhards@adiscon.com>2008-01-27 17:38:20 +0000
commita67cd9683ec4a7091a512be705eef105f989410d (patch)
treef3b54a9fd131c2bb76922eabe923b01a1cb99364
parent4b44a34d71b8b9bfc2574adeff4e735a8e97c876 (diff)
downloadrsyslog-a67cd9683ec4a7091a512be705eef105f989410d.tar.gz
rsyslog-a67cd9683ec4a7091a512be705eef105f989410d.tar.xz
rsyslog-a67cd9683ec4a7091a512be705eef105f989410d.zip
fixed queue termination in case bSaveOnShutdown is 0
-rw-r--r--debug.c4
-rw-r--r--queue.c52
-rwxr-xr-xsrUtils.c11
-rw-r--r--syslogd.c2
4 files changed, 51 insertions, 18 deletions
diff --git a/debug.c b/debug.c
index 474df03c..32bfab7c 100644
--- a/debug.c
+++ b/debug.c
@@ -726,8 +726,8 @@ dbgprintf(char *fmt, ...)
if(bWasNL) {
if(bPrintTime) {
clock_gettime(CLOCK_REALTIME, &t);
- fprintf(stddbg, "%4.4ld.%9.9ld:", t.tv_sec % 1000, t.tv_nsec);
- if(altdbg != NULL) fprintf(altdbg, "%4.4ld.%9.9ld:", t.tv_sec % 1000, t.tv_nsec);
+ fprintf(stddbg, "%4.4ld.%9.9ld:", t.tv_sec % 10000, t.tv_nsec);
+ if(altdbg != NULL) fprintf(altdbg, "%4.4ld.%9.9ld:", t.tv_sec % 10000, t.tv_nsec);
}
fprintf(stddbg, "%s: ", pszThrdName);
if(altdbg != NULL) fprintf(altdbg, "%s: ", pszThrdName);
diff --git a/queue.c b/queue.c
index e89b35b8..53b2b7b5 100644
--- a/queue.c
+++ b/queue.c
@@ -963,17 +963,33 @@ RUNLOG;
BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
RUNLOG_VAR("%d", pThis->iQueueSize);
//old: if(pThis->iQueueSize > 0) {
- if(wtpGetCurNumWrkr(pThis->pWtpReg, LOCK_MUTEX) > 0) {
- END_MTX_PROTECTED_OPERATIONS(pThis->mut);
+ if(pThis->iQueueSize > 0) {
timeoutComp(&tTimeout, pThis->toActShutdown);
- iRetLocal = wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout);
- if(iRetLocal == RS_RET_TIMED_OUT) {
- dbgprintf("Queue 0x%lx: immediate shutdown timed out on primary queue (this is acceptable and "
- "triggers cancellation)\n", queueGetID(pThis));
- } else if(iRetLocal != RS_RET_OK) {
- dbgprintf("Queue 0x%lx: unexpected iRet state %d after trying immediate shutdown of the primary queue "
- "in disk save mode. Continuing, but results are unpredictable\n",
- queueGetID(pThis), iRetLocal);
+ if(wtpGetCurNumWrkr(pThis->pWtpReg, LOCK_MUTEX) > 0) {
+ END_MTX_PROTECTED_OPERATIONS(pThis->mut);
+ iRetLocal = wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout);
+ if(iRetLocal == RS_RET_TIMED_OUT) {
+ dbgprintf("Queue 0x%lx: immediate shutdown timed out on primary queue (this is acceptable and "
+ "triggers cancellation)\n", queueGetID(pThis));
+ } else if(iRetLocal != RS_RET_OK) {
+ dbgprintf("Queue 0x%lx: unexpected iRet state %d after trying immediate shutdown of the primary queue "
+ "in disk save mode. Continuing, but results are unpredictable\n",
+ queueGetID(pThis), iRetLocal);
+ }
+ BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
+ }
+ if(wtpGetCurNumWrkr(pThis->pWtpDA, LOCK_MUTEX) > 0) {
+ /* and now the same for the DA queue */
+ END_MTX_PROTECTED_OPERATIONS(pThis->mut);
+ iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout);
+ if(iRetLocal == RS_RET_TIMED_OUT) {
+ dbgprintf("Queue 0x%lx: immediate shutdown timed out on DA queue (this is acceptable and "
+ "triggers cancellation)\n", queueGetID(pThis));
+ } else if(iRetLocal != RS_RET_OK) {
+ dbgprintf("Queue 0x%lx: unexpected iRet state %d after trying immediate shutdown of the DA queue "
+ "in disk save mode. Continuing, but results are unpredictable\n",
+ queueGetID(pThis), iRetLocal);
+ }
}
} else {
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
@@ -1370,9 +1386,16 @@ queueRegOnWrkrShutdown(queue_t *pThis)
if(pThis->pqParent != NULL) {
RUNLOG_VAR("%p", pThis->pqParent);
RUNLOG_VAR("%p", pThis->pqParent->pWtpDA);
- assert(pThis->pqParent->pWtpDA != NULL);
- pThis->pqParent->bChildIsDone = 1; /* indicate we are done */
- wtpAdviseMaxWorkers(pThis->pqParent->pWtpDA, 1); /* reactivate DA worker (always 1) */
+ if(pThis->pqParent->pWtpDA == NULL) {
+ /* this can happen if we are not set to save on an eternal timeout. We
+ * log a warning but otherwise do nothing
+ */
+ dbgprintf("Queue 0x%lx: warning: pThis->pqParent->pWtpDA is NULL (this may be OK if the parent is not set to "
+ " bSaveOnShutdown\n", queueGetID(pThis));
+ } else {
+ pThis->pqParent->bChildIsDone = 1; /* indicate we are done */
+ wtpAdviseMaxWorkers(pThis->pqParent->pWtpDA, 1); /* reactivate DA worker (always 1) */
+ }
}
RETiRet;
@@ -1611,7 +1634,8 @@ rsRetVal queueDestruct(queue_t **ppThis)
pThis->bQueueInDestruction = 1; /* indicate we are in destruction (modifies some behaviour) */
/* shut down all workers (handles *all* of the persistence logic) */
- if(!pThis->bEnqOnly) /* in enque-only mode, we have no worker pool! */
+ //if(!pThis->bEnqOnly) /* in enque-only mode, we have no worker pool! */
+ if(!pThis->bEnqOnly && pThis->pqParent == NULL) /* in enque-only mode, we have no worker pool! */
queueShutdownWorkers(pThis);
RUNLOG;
diff --git a/srUtils.c b/srUtils.c
index 7e585ad7..9724190a 100755
--- a/srUtils.c
+++ b/srUtils.c
@@ -344,8 +344,17 @@ timeoutVal(struct timespec *pt)
assert(pt != NULL);
/* compute timeout */
clock_gettime(CLOCK_REALTIME, &t);
- iTimeout = (pt->tv_nsec - t.tv_nsec) / 1000;
+RUNLOG_VAR("%ld", pt->tv_sec);
+RUNLOG_VAR("%ld", t.tv_sec);
+RUNLOG_VAR("%ld", pt->tv_nsec);
+RUNLOG_VAR("%ld", t.tv_nsec);
+ iTimeout = (pt->tv_nsec - t.tv_nsec) / 1000000;
+RUNLOG_VAR("%ld", iTimeout);
iTimeout += (pt->tv_sec - t.tv_sec) * 1000;
+RUNLOG_VAR("%ld", iTimeout);
+
+ if(iTimeout < 0)
+ iTimeout = 0;
return iTimeout;
}
diff --git a/syslogd.c b/syslogd.c
index a1595127..26c6fb84 100644
--- a/syslogd.c
+++ b/syslogd.c
@@ -4581,7 +4581,7 @@ static rsRetVal loadBuildInModules(void)
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuetimeoutactioncompletion", 0, eCmdHdlrInt, NULL, &iMainMsgQtoActShutdown, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuetimeoutenqueue", 0, eCmdHdlrInt, NULL, &iMainMsgQtoEnq, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuetimeoutworkerthreadshutdown", 0, eCmdHdlrInt, NULL, &iMainMsgQtoWrkShutdown, NULL));
- CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuetimeoutworkerthreadminimummessages", 0, eCmdHdlrInt, NULL, &iMainMsgQWrkMinMsgs, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueueworkerthreadminimummessages", 0, eCmdHdlrInt, NULL, &iMainMsgQWrkMinMsgs, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuemaxfilesize", 0, eCmdHdlrSize, NULL, &iMainMsgQueMaxFileSize, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"repeatedmsgreduction", 0, eCmdHdlrBinary, NULL, &bReduceRepeatMsgs, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlywhenpreviousissuspended", 0, eCmdHdlrBinary, NULL, &bActExecWhenPrevSusp, NULL));