summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--plugins/omtesting/omtesting.c1
-rw-r--r--queue.c93
-rw-r--r--queue.h3
-rw-r--r--syslogd.c57
4 files changed, 91 insertions, 63 deletions
diff --git a/plugins/omtesting/omtesting.c b/plugins/omtesting/omtesting.c
index 8fec66ac..3781eb76 100644
--- a/plugins/omtesting/omtesting.c
+++ b/plugins/omtesting/omtesting.c
@@ -97,6 +97,7 @@ CODESTARTdoAction
tvSelectTimeout.tv_sec = pData->iWaitSeconds;
tvSelectTimeout.tv_usec = pData->iWaitUSeconds; /* milli seconds */
select(0, NULL, NULL, NULL, &tvSelectTimeout);
+ //dbgprintf(":omtesting: end doAction(), iRet %d\n", iRet);
ENDdoAction
diff --git a/queue.c b/queue.c
index c7602b26..bd8f9223 100644
--- a/queue.c
+++ b/queue.c
@@ -336,39 +336,39 @@ queueDel(queue_t *pThis, void *pUsr)
}
-
-/* Worker thread management function carried out each time
- * the main worker is awoken.
- */
-static rsRetVal queueManageWorkers(queue_t *pThis)
-{
- DEFiRet;
-
- return iRet;
-}
-
-
/* Worker thread management function carried out when the main
* worker is about to terminate.
*/
-static rsRetVal queueManageWorkersOnShutdown(queue_t *pThis)
+static rsRetVal queueShutdownWorkers(queue_t *pThis)
{
DEFiRet;
int i;
+ qWrkCmd_t tShutdownCmd;
- /* ask all other workers to terminate */
- for(i = 1 ; i < pThis->iNumWorkerThreads ; ++i)
- pThis->pWrkThrds[i].tCurrCmd = eWRKTHRDCMD_SHUTDOWN;
+pThis->bImmediateShutdown = 1; /*testing */
+ assert(pThis != NULL);
- /* awake those that sleep */
+ /* select shutdown mode */
+ tShutdownCmd = (pThis->bImmediateShutdown) ? eWRKTHRDCMD_SHUTDOWN_IMMEDIATE : eWRKTHRDCMD_SHUTDOWN;
+
+ dbgprintf("Queue 0x%lx: initiating worker thread shutdown sequence, mode %d...\n",
+ (unsigned long) pThis, (int) tShutdownCmd);
+
+ /* tell all workers to terminate */
+ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i)
+ pThis->pWrkThrds[i].tCurrCmd = tShutdownCmd;
+
+ /* awake them... */
pthread_cond_broadcast(pThis->notEmpty);
/* and wait for their termination */
- for(i = 1 ; i < pThis->iNumWorkerThreads ; ++i) {
-dbgprintf("WrkShutdown joining thread %d\n", i);
+ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
pthread_join(pThis->pWrkThrds[i].thrdID, NULL);
}
+ dbgprintf("Queue 0x%lx: worker threads terminated, remaining queue size %d.\n",
+ (unsigned long) pThis, pThis->iQueueSize);
+
return iRet;
}
@@ -398,7 +398,6 @@ queueWorker(void *arg)
void *pUsr;
sigset_t sigSet;
int iMyThrdIndx; /* index for this thread in queue thread table */
- int bIsAdmin; /* does this thread have admin chores? */
assert(pThis != NULL);
@@ -411,14 +410,12 @@ queueWorker(void *arg)
break;
assert(pThis->pWrkThrds[iMyThrdIndx].thrdID == pthread_self());
- bIsAdmin = (iMyThrdIndx == 0 && pThis->iNumWorkerThreads > 1) ? 1 : 0;
- dbgprintf("Queue 0x%lx/w%d: worker thread startup (isAdmin=%d).\n", (unsigned long) pThis, iMyThrdIndx, bIsAdmin);
+ dbgprintf("Queue 0x%lx/w%d: worker thread startup.\n", (unsigned long) pThis, iMyThrdIndx);
/* now we have our identity, on to real processing */
- while(pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_RUN || !pThis->iQueueSize == 0) {
- if(bIsAdmin) /* main worker must do special chores */
- queueManageWorkers(pThis);
-
+ while(pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_RUN
+ || (pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_SHUTDOWN && pThis->iQueueSize > 0)) {
+dbgprintf("worker %d runs, cmd %d\n", iMyThrdIndx, pThis->pWrkThrds[iMyThrdIndx].tCurrCmd);
pthread_mutex_lock(pThis->mut);
while (pThis->iQueueSize == 0 && pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_RUN) {
dbgprintf("Queue 0x%lx/w%d: queue EMPTY, waiting for next message.\n",
@@ -441,6 +438,8 @@ queueWorker(void *arg)
dbgprintf("Queue 0x%lx/w%d: worker executes consumer...\n",
(unsigned long) pThis, iMyThrdIndx);
iRetLocal = pThis->pConsumer(pUsr);
+ dbgprintf("Queue 0x%lx/w%d: worker: consumer returnd %d\n",
+ (unsigned long) pThis, iMyThrdIndx, iRetLocal);
if(iRetLocal != RS_RET_OK)
dbgprintf("Queue 0x%lx/w%d: Consumer returned iRet %d\n",
(unsigned long) pThis, iMyThrdIndx, iRetLocal);
@@ -451,16 +450,29 @@ queueWorker(void *arg)
} else { /* the mutex must be unlocked in any case (important for termination) */
pthread_mutex_unlock(pThis->mut);
}
+
+ /* We now yield to give the other threads a chance to obtain the mutex. If we do not
+ * do that, this thread may very well aquire the mutex again before another thread
+ * has even a chance to run. The reason is that mutex operations are free to be
+ * implemented in the quickest possible way (and they typically are!). That is, the
+ * mutex lock/unlock most probably just does an atomic memory swap and does not necessarily
+ * schedule other threads waiting on the same mutex. That can lead to the same thread
+ * aquiring the mutex ever and ever again while all others are starving for it. We
+ * have exactly seen this behaviour when we deliberately introduced a long-running
+ * test action which basically did a sleep. I understand that with real actions the
+ * likelihood of this starvation condition is very low - but it could still happen
+ * and would be very hard to debug. The yield() is a sure fix, its performance overhead
+ * should be well accepted given the above facts. -- rgerhards, 2008-01-10
+ */
+ pthread_yield();
- if(Debug && !(pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_RUN) && pThis->iQueueSize > 0)
+ if(Debug && (pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_SHUTDOWN) && pThis->iQueueSize > 0)
dbgprintf("Queue 0x%lx/w%d: worker does not yet terminate because it still has "
- "messages to process.\n", (unsigned long) pThis, iMyThrdIndx);
+ " %d messages to process.\n", (unsigned long) pThis, iMyThrdIndx, pThis->iQueueSize);
}
- if(bIsAdmin) /* main worker must do special chores */
- queueManageWorkersOnShutdown(pThis);
-
- dbgprintf("Queue 0x%lx/w%d: worker thread terminates.\n", (unsigned long) pThis, iMyThrdIndx);
+ dbgprintf("Queue 0x%lx/w%d: worker thread terminates with %d entries left in queue.\n",
+ (unsigned long) pThis, iMyThrdIndx, pThis->iQueueSize);
pThis->pWrkThrds[iMyThrdIndx].tCurrCmd = eWRKTHRDCMD_TERMINATED; /* indicate termination */
pthread_exit(0);
}
@@ -569,7 +581,6 @@ rsRetVal queueStart(queue_t *pThis)
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
/* fire up the worker threads */
- pThis->bDoRun = 1; /* we are NOT done (else worker would immediately terminate) */
for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
pThis->pWrkThrds[i].tCurrCmd = eWRKTHRDCMD_RUN;
iState = pthread_create(&(pThis->pWrkThrds[i].thrdID), NULL, queueWorker, (void*) pThis);
@@ -589,21 +600,11 @@ rsRetVal queueDestruct(queue_t *pThis)
assert(pThis != NULL);
+ /* first, terminate worker threads */
if(pThis->pWrkThrds != NULL) {
- /* first stop the worker thread */
- dbgprintf("Initiating worker thread shutdown sequence for queue 0x%lx...\n", (unsigned long) pThis);
- pThis->bDoRun = 0;
-pThis->bImmediateShutdown = 1; /*testing */
- /* request all threads to terminate */
- /* We instruct worker 0 to shutdown, which in turn will terminate all other
- * threads (if any exist) -- rgerhards, 2008-01-10
- */
- pThis->pWrkThrds[0].tCurrCmd = eWRKTHRDCMD_SHUTDOWN;
- /* we must broadcast, because we can not specifically awake worker 0 */
- pthread_cond_broadcast(pThis->notEmpty);
- pthread_join(pThis->pWrkThrds[0].thrdID, NULL);
+ queueShutdownWorkers(pThis);
free(pThis->pWrkThrds);
- dbgprintf("Worker threads for queue 0x%lx terminated.\n", (unsigned long) pThis);
+ pThis->pWrkThrds = NULL;
}
/* ... then free resources */
diff --git a/queue.h b/queue.h
index b4dd1000..07621972 100644
--- a/queue.h
+++ b/queue.h
@@ -69,7 +69,7 @@ typedef enum {
typedef struct qWrkThrd_s {
pthread_t thrdID; /* thread ID */
- qWrkCmd_t tCurrCmd; /* current command to be carried out by worker */
+ volatile qWrkCmd_t tCurrCmd; /* current command to be carried out by worker */
} qWrkThrd_t; /* type for queue worker threads */
/* the queue object */
@@ -79,7 +79,6 @@ typedef struct queue_s {
int iMaxQueueSize; /* how large can the queue grow? */
int iNumWorkerThreads;/* number of worker threads to use */
qWrkThrd_t *pWrkThrds;/* array with control structure for the worker thread(s) associated with this queue */
- int bDoRun; /* 1 - run queue, 0 - shutdown of queue requested */
int bImmediateShutdown;/* on shutdown, drain the queue --> 0 / do NOT drain the queue --> 1 */
rsRetVal (*pConsumer)(void *); /* user-supplied consumer function for dequeued messages */
/* type-specific handlers (set during construction) */
diff --git a/syslogd.c b/syslogd.c
index e1fcb056..9c39f459 100644
--- a/syslogd.c
+++ b/syslogd.c
@@ -552,7 +552,7 @@ static void debug_switch();
static rsRetVal cfline(uchar *line, selector_t **pfCurr);
static int decode(uchar *name, struct code *codetab);
static void sighup_handler();
-static void die(int sig);
+//static void die(int sig);
static void freeSelectors(void);
static rsRetVal processConfFile(uchar *pConfFile);
static rsRetVal selectorAddList(selector_t *f);
@@ -2594,8 +2594,26 @@ die(int sig)
{
char buf[256];
+ dbgprintf("exiting on signal %d\n", sig);
+
+ /* IMPORTANT: we should close the inputs first, and THEN send our termination
+ * message. If we do it the other way around, logmsgInternal() may block on
+ * a full queue and the inputs still fill up that queue. Depending on the
+ * scheduling order, we may end up with logmsgInternal being held for a quite
+ * long time. When the inputs are terminated first, that should not happen
+ * because the queue is drained in parallel. The situation could only become
+ * an issue with extremely long running actions in a queue full environment.
+ * However, such actions are at least considered poorly written, if not
+ * outright wrong. So we do not care about this very remote problem.
+ * rgerhards, 2008-01-11
+ */
+
+ /* close the inputs */
+ dbgprintf("Terminating input threads...\n");
+ thrdTerminateAll(); /* TODO: inputs only, please */
+
+ /* and THEN send the termination log message (see long comment above) */
if (sig) {
- dbgprintf("exiting on signal %d\n", sig);
(void) snprintf(buf, sizeof(buf) / sizeof(char),
" [origin software=\"rsyslogd\" " "swVersion=\"" VERSION \
"\" x-pid=\"%d\"]" " exiting on signal %d.",
@@ -2604,16 +2622,14 @@ die(int sig)
logmsgInternal(LOG_SYSLOG|LOG_INFO, buf, ADDDATE);
}
- /* close the inputs */
- dbgprintf("Terminating input threads...\n");
- thrdTerminateAll(); /* TODO: inputs only, please */
-
- /* drain queue and stop worker thread */
+ /* drain queue (if configured so) and stop main queue worker thread pool */
dbgprintf("Terminating main queue...\n");
queueDestruct(pMsgQueue);
pMsgQueue = NULL;
- /* Free ressources and close connections */
+ /* Free ressources and close connections. This includes flushing any remaining
+ * repeated msgs.
+ */
dbgprintf("Terminating outputs...\n");
freeSelectors();
@@ -2650,7 +2666,6 @@ die(int sig)
*/
unregCfSysLineHdlrs();
-
/* clean up auxiliary data */
if(pModDir != NULL)
free(pModDir);
@@ -3270,7 +3285,7 @@ init(void)
char bufStartUpMsg[512];
struct sigaction sigAct;
- thrdTerminateAll(); /* stop all running threads - TODO: reconsider location! */
+ thrdTerminateAll(); /* stop all running input threads - TODO: reconsider location! */
/* initialize some static variables */
pDfltHostnameCmp = NULL;
@@ -3366,7 +3381,6 @@ init(void)
fprintf(stderr, "fatal error %d: could not create message queue - rsyslogd can not run!\n", iRet);
exit(1);
}
-dbgprintf("queue 1 \n");
/* ... set some properties ... */
# define setQPROP(func, directive, data) \
CHKiRet_Hdlr(func(pMsgQueue, data)) { \
@@ -3378,21 +3392,18 @@ dbgprintf("queue 1 \n");
}
setQPROP(queueSetMaxFileSize, "$MainMsgQueueFileSize", iMainMsgQueMaxFileSize);
-dbgprintf("queue 2 \n");
setQPROPstr(queueSetFilePrefix, "$MainMsgQueueFileName",
(pszMainMsgQFName == NULL ? (uchar*) "mainq" : pszMainMsgQFName));
# undef setQPROP
# undef setQPROPstr
-dbgprintf("try start queue \n");
/* ... and finally start the queue! */
CHKiRet_Hdlr(queueStart(pMsgQueue)) {
/* no queue is fatal, we need to give up in that case... */
fprintf(stderr, "fatal error %d: could not start message queue - rsyslogd can not run!\n", iRet);
exit(1);
}
-dbgprintf("queue running\n");
Initialized = 1;
bHaveMainQueue = (MainMsgQueType == QUEUETYPE_DIRECT) ? 0 : 1;
@@ -3405,7 +3416,6 @@ dbgprintf("queue running\n");
*/
startInputModules();
-dbgprintf("inputs running\n");
if(Debug) {
dbgPrintInitInfo();
}
@@ -4427,12 +4437,29 @@ mainloop(void)
tvSelectTimeout.tv_sec = TIMERINTVL;
tvSelectTimeout.tv_usec = 0;
select(1, NULL, NULL, NULL, &tvSelectTimeout);
+ if(bFinished)
+ break; /* exit as quickly as possible - see long comment below */
/* If we received a HUP signal, we call doFlushRptdMsgs() a bit early. This
* doesn't matter, because doFlushRptdMsgs() checks timestamps. What may happen,
* however, is that the too-early call may lead to a bit too-late output
* of "last message repeated n times" messages. But that is quite acceptable.
* rgerhards, 2007-12-21
+ * ... and just to explain, we flush here because that is exactly what the mainloop
+ * shall do - provide a periodic interval in which not-yet-flushed messages will
+ * be flushed. Be careful, there is a potential race condition: doFlushRptdMsgs()
+ * needs to aquire a lock on the action objects. If, however, long-running consumers
+ * cause the main queue worker threads to lock them for a long time, we may receive
+ * a starvation condition, resulting in the mainloop being held on lock for an extended
+ * period of time. That, in turn, could lead to unresponsiveness to termination
+ * requests. It is especially important that the bFinished flag is checked before
+ * doFlushRptdMsgs() is called (I know because I ran into that situation). I am
+ * not yet sure if the remaining probability window of a termination-related
+ * problem is large enough to justify changing the code - I would consider it
+ * extremely unlikely that the problem ever occurs in practice. Fixing it would
+ * require not only a lot of effort but would cost considerable performance. So
+ * for the time being, I think the remaining risk can be accepted.
+ * rgerhards, 2008-01-10
*/
doFlushRptdMsgs();