diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2008-01-27 17:38:20 +0000 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2008-01-27 17:38:20 +0000 |
commit | a67cd9683ec4a7091a512be705eef105f989410d (patch) | |
tree | f3b54a9fd131c2bb76922eabe923b01a1cb99364 | |
parent | 4b44a34d71b8b9bfc2574adeff4e735a8e97c876 (diff) | |
download | rsyslog-a67cd9683ec4a7091a512be705eef105f989410d.tar.gz rsyslog-a67cd9683ec4a7091a512be705eef105f989410d.tar.xz rsyslog-a67cd9683ec4a7091a512be705eef105f989410d.zip |
fixed queue termination in case bSaveOnShutdown is 0
-rw-r--r-- | debug.c | 4 | ||||
-rw-r--r-- | queue.c | 52 | ||||
-rwxr-xr-x | srUtils.c | 11 | ||||
-rw-r--r-- | syslogd.c | 2 |
4 files changed, 51 insertions, 18 deletions
@@ -726,8 +726,8 @@ dbgprintf(char *fmt, ...) if(bWasNL) { if(bPrintTime) { clock_gettime(CLOCK_REALTIME, &t); - fprintf(stddbg, "%4.4ld.%9.9ld:", t.tv_sec % 1000, t.tv_nsec); - if(altdbg != NULL) fprintf(altdbg, "%4.4ld.%9.9ld:", t.tv_sec % 1000, t.tv_nsec); + fprintf(stddbg, "%4.4ld.%9.9ld:", t.tv_sec % 10000, t.tv_nsec); + if(altdbg != NULL) fprintf(altdbg, "%4.4ld.%9.9ld:", t.tv_sec % 10000, t.tv_nsec); } fprintf(stddbg, "%s: ", pszThrdName); if(altdbg != NULL) fprintf(altdbg, "%s: ", pszThrdName); @@ -963,17 +963,33 @@ RUNLOG; BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ RUNLOG_VAR("%d", pThis->iQueueSize); //old: if(pThis->iQueueSize > 0) { - if(wtpGetCurNumWrkr(pThis->pWtpReg, LOCK_MUTEX) > 0) { - END_MTX_PROTECTED_OPERATIONS(pThis->mut); + if(pThis->iQueueSize > 0) { timeoutComp(&tTimeout, pThis->toActShutdown); - iRetLocal = wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout); - if(iRetLocal == RS_RET_TIMED_OUT) { - dbgprintf("Queue 0x%lx: immediate shutdown timed out on primary queue (this is acceptable and " - "triggers cancellation)\n", queueGetID(pThis)); - } else if(iRetLocal != RS_RET_OK) { - dbgprintf("Queue 0x%lx: unexpected iRet state %d after trying immediate shutdown of the primary queue " - "in disk save mode. Continuing, but results are unpredictable\n", - queueGetID(pThis), iRetLocal); + if(wtpGetCurNumWrkr(pThis->pWtpReg, LOCK_MUTEX) > 0) { + END_MTX_PROTECTED_OPERATIONS(pThis->mut); + iRetLocal = wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout); + if(iRetLocal == RS_RET_TIMED_OUT) { + dbgprintf("Queue 0x%lx: immediate shutdown timed out on primary queue (this is acceptable and " + "triggers cancellation)\n", queueGetID(pThis)); + } else if(iRetLocal != RS_RET_OK) { + dbgprintf("Queue 0x%lx: unexpected iRet state %d after trying immediate shutdown of the primary queue " + "in disk save mode. Continuing, but results are unpredictable\n", + queueGetID(pThis), iRetLocal); + } + BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ + } + if(wtpGetCurNumWrkr(pThis->pWtpDA, LOCK_MUTEX) > 0) { + /* and now the same for the DA queue */ + END_MTX_PROTECTED_OPERATIONS(pThis->mut); + iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout); + if(iRetLocal == RS_RET_TIMED_OUT) { + dbgprintf("Queue 0x%lx: immediate shutdown timed out on DA queue (this is acceptable and " + "triggers cancellation)\n", queueGetID(pThis)); + } else if(iRetLocal != RS_RET_OK) { + dbgprintf("Queue 0x%lx: unexpected iRet state %d after trying immediate shutdown of the DA queue " + "in disk save mode. Continuing, but results are unpredictable\n", + queueGetID(pThis), iRetLocal); + } } } else { END_MTX_PROTECTED_OPERATIONS(pThis->mut); @@ -1370,9 +1386,16 @@ queueRegOnWrkrShutdown(queue_t *pThis) if(pThis->pqParent != NULL) { RUNLOG_VAR("%p", pThis->pqParent); RUNLOG_VAR("%p", pThis->pqParent->pWtpDA); - assert(pThis->pqParent->pWtpDA != NULL); - pThis->pqParent->bChildIsDone = 1; /* indicate we are done */ - wtpAdviseMaxWorkers(pThis->pqParent->pWtpDA, 1); /* reactivate DA worker (always 1) */ + if(pThis->pqParent->pWtpDA == NULL) { + /* this can happen if we are not set to save on an eternal timeout. We + * log a warning but otherwise do nothing + */ + dbgprintf("Queue 0x%lx: warning: pThis->pqParent->pWtpDA is NULL (this may be OK if the parent is not set to " + " bSaveOnShutdown\n", queueGetID(pThis)); + } else { + pThis->pqParent->bChildIsDone = 1; /* indicate we are done */ + wtpAdviseMaxWorkers(pThis->pqParent->pWtpDA, 1); /* reactivate DA worker (always 1) */ + } } RETiRet; @@ -1611,7 +1634,8 @@ rsRetVal queueDestruct(queue_t **ppThis) pThis->bQueueInDestruction = 1; /* indicate we are in destruction (modifies some behaviour) */ /* shut down all workers (handles *all* of the persistence logic) */ - if(!pThis->bEnqOnly) /* in enque-only mode, we have no worker pool! */ + //if(!pThis->bEnqOnly) /* in enque-only mode, we have no worker pool! */ + if(!pThis->bEnqOnly && pThis->pqParent == NULL) /* in enque-only mode, we have no worker pool! */ queueShutdownWorkers(pThis); RUNLOG; @@ -344,8 +344,17 @@ timeoutVal(struct timespec *pt) assert(pt != NULL); /* compute timeout */ clock_gettime(CLOCK_REALTIME, &t); - iTimeout = (pt->tv_nsec - t.tv_nsec) / 1000; +RUNLOG_VAR("%ld", pt->tv_sec); +RUNLOG_VAR("%ld", t.tv_sec); +RUNLOG_VAR("%ld", pt->tv_nsec); +RUNLOG_VAR("%ld", t.tv_nsec); + iTimeout = (pt->tv_nsec - t.tv_nsec) / 1000000; +RUNLOG_VAR("%ld", iTimeout); iTimeout += (pt->tv_sec - t.tv_sec) * 1000; +RUNLOG_VAR("%ld", iTimeout); + + if(iTimeout < 0) + iTimeout = 0; return iTimeout; } @@ -4581,7 +4581,7 @@ static rsRetVal loadBuildInModules(void) CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuetimeoutactioncompletion", 0, eCmdHdlrInt, NULL, &iMainMsgQtoActShutdown, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuetimeoutenqueue", 0, eCmdHdlrInt, NULL, &iMainMsgQtoEnq, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuetimeoutworkerthreadshutdown", 0, eCmdHdlrInt, NULL, &iMainMsgQtoWrkShutdown, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuetimeoutworkerthreadminimummessages", 0, eCmdHdlrInt, NULL, &iMainMsgQWrkMinMsgs, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueueworkerthreadminimummessages", 0, eCmdHdlrInt, NULL, &iMainMsgQWrkMinMsgs, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuemaxfilesize", 0, eCmdHdlrSize, NULL, &iMainMsgQueMaxFileSize, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"repeatedmsgreduction", 0, eCmdHdlrBinary, NULL, &bReduceRepeatMsgs, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlywhenpreviousissuspended", 0, eCmdHdlrBinary, NULL, &bActExecWhenPrevSusp, NULL)); |