diff options
-rw-r--r-- | plugins/omtesting/omtesting.c | 1 | ||||
-rw-r--r-- | queue.c | 93 | ||||
-rw-r--r-- | queue.h | 3 | ||||
-rw-r--r-- | syslogd.c | 57 |
4 files changed, 91 insertions, 63 deletions
diff --git a/plugins/omtesting/omtesting.c b/plugins/omtesting/omtesting.c index 8fec66ac..3781eb76 100644 --- a/plugins/omtesting/omtesting.c +++ b/plugins/omtesting/omtesting.c @@ -97,6 +97,7 @@ CODESTARTdoAction tvSelectTimeout.tv_sec = pData->iWaitSeconds; tvSelectTimeout.tv_usec = pData->iWaitUSeconds; /* milli seconds */ select(0, NULL, NULL, NULL, &tvSelectTimeout); + //dbgprintf(":omtesting: end doAction(), iRet %d\n", iRet); ENDdoAction @@ -336,39 +336,39 @@ queueDel(queue_t *pThis, void *pUsr) } - -/* Worker thread management function carried out each time - * the main worker is awoken. - */ -static rsRetVal queueManageWorkers(queue_t *pThis) -{ - DEFiRet; - - return iRet; -} - - /* Worker thread management function carried out when the main * worker is about to terminate. */ -static rsRetVal queueManageWorkersOnShutdown(queue_t *pThis) +static rsRetVal queueShutdownWorkers(queue_t *pThis) { DEFiRet; int i; + qWrkCmd_t tShutdownCmd; - /* ask all other workers to terminate */ - for(i = 1 ; i < pThis->iNumWorkerThreads ; ++i) - pThis->pWrkThrds[i].tCurrCmd = eWRKTHRDCMD_SHUTDOWN; +pThis->bImmediateShutdown = 1; /*testing */ + assert(pThis != NULL); - /* awake those that sleep */ + /* select shutdown mode */ + tShutdownCmd = (pThis->bImmediateShutdown) ? eWRKTHRDCMD_SHUTDOWN_IMMEDIATE : eWRKTHRDCMD_SHUTDOWN; + + dbgprintf("Queue 0x%lx: initiating worker thread shutdown sequence, mode %d...\n", + (unsigned long) pThis, (int) tShutdownCmd); + + /* tell all workers to terminate */ + for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) + pThis->pWrkThrds[i].tCurrCmd = tShutdownCmd; + + /* awake them... 
*/ pthread_cond_broadcast(pThis->notEmpty); /* and wait for their termination */ - for(i = 1 ; i < pThis->iNumWorkerThreads ; ++i) { -dbgprintf("WrkShutdown joining thread %d\n", i); + for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) { pthread_join(pThis->pWrkThrds[i].thrdID, NULL); } + dbgprintf("Queue 0x%lx: worker threads terminated, remaining queue size %d.\n", + (unsigned long) pThis, pThis->iQueueSize); + return iRet; } @@ -398,7 +398,6 @@ queueWorker(void *arg) void *pUsr; sigset_t sigSet; int iMyThrdIndx; /* index for this thread in queue thread table */ - int bIsAdmin; /* does this thread have admin chores? */ assert(pThis != NULL); @@ -411,14 +410,12 @@ queueWorker(void *arg) break; assert(pThis->pWrkThrds[iMyThrdIndx].thrdID == pthread_self()); - bIsAdmin = (iMyThrdIndx == 0 && pThis->iNumWorkerThreads > 1) ? 1 : 0; - dbgprintf("Queue 0x%lx/w%d: worker thread startup (isAdmin=%d).\n", (unsigned long) pThis, iMyThrdIndx, bIsAdmin); + dbgprintf("Queue 0x%lx/w%d: worker thread startup.\n", (unsigned long) pThis, iMyThrdIndx); /* now we have our identity, on to real processing */ - while(pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_RUN || !pThis->iQueueSize == 0) { - if(bIsAdmin) /* main worker must do special chores */ - queueManageWorkers(pThis); - + while(pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_RUN + || (pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_SHUTDOWN && pThis->iQueueSize > 0)) { +dbgprintf("worker %d runs, cmd %d\n", iMyThrdIndx, pThis->pWrkThrds[iMyThrdIndx].tCurrCmd); pthread_mutex_lock(pThis->mut); while (pThis->iQueueSize == 0 && pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_RUN) { dbgprintf("Queue 0x%lx/w%d: queue EMPTY, waiting for next message.\n", @@ -441,6 +438,8 @@ queueWorker(void *arg) dbgprintf("Queue 0x%lx/w%d: worker executes consumer...\n", (unsigned long) pThis, iMyThrdIndx); iRetLocal = pThis->pConsumer(pUsr); + dbgprintf("Queue 0x%lx/w%d: worker: consumer returnd %d\n", + (unsigned long) 
pThis, iMyThrdIndx, iRetLocal); if(iRetLocal != RS_RET_OK) dbgprintf("Queue 0x%lx/w%d: Consumer returned iRet %d\n", (unsigned long) pThis, iMyThrdIndx, iRetLocal); @@ -451,16 +450,29 @@ queueWorker(void *arg) } else { /* the mutex must be unlocked in any case (important for termination) */ pthread_mutex_unlock(pThis->mut); } + + /* We now yield to give the other threads a chance to obtain the mutex. If we do not + * do that, this thread may very well acquire the mutex again before another thread + * has even a chance to run. The reason is that mutex operations are free to be + * implemented in the quickest possible way (and they typically are!). That is, the + * mutex lock/unlock most probably just does an atomic memory swap and does not necessarily + * schedule other threads waiting on the same mutex. That can lead to the same thread + * acquiring the mutex over and over again while all others are starving for it. We + * have seen exactly this behaviour when we deliberately introduced a long-running + * test action which basically did a sleep. I understand that with real actions the + * likelihood of this starvation condition is very low - but it could still happen + * and would be very hard to debug. The yield() is a sure fix, its performance overhead + * should be well accepted given the above facts. 
-- rgerhards, 2008-01-10 + */ + pthread_yield(); - if(Debug && !(pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_RUN) && pThis->iQueueSize > 0) + if(Debug && (pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_SHUTDOWN) && pThis->iQueueSize > 0) dbgprintf("Queue 0x%lx/w%d: worker does not yet terminate because it still has " - "messages to process.\n", (unsigned long) pThis, iMyThrdIndx); + " %d messages to process.\n", (unsigned long) pThis, iMyThrdIndx, pThis->iQueueSize); } - if(bIsAdmin) /* main worker must do special chores */ - queueManageWorkersOnShutdown(pThis); - - dbgprintf("Queue 0x%lx/w%d: worker thread terminates.\n", (unsigned long) pThis, iMyThrdIndx); + dbgprintf("Queue 0x%lx/w%d: worker thread terminates with %d entries left in queue.\n", + (unsigned long) pThis, iMyThrdIndx, pThis->iQueueSize); pThis->pWrkThrds[iMyThrdIndx].tCurrCmd = eWRKTHRDCMD_TERMINATED; /* indicate termination */ pthread_exit(0); } @@ -569,7 +581,6 @@ rsRetVal queueStart(queue_t *pThis) ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); /* fire up the worker threads */ - pThis->bDoRun = 1; /* we are NOT done (else worker would immediately terminate) */ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) { pThis->pWrkThrds[i].tCurrCmd = eWRKTHRDCMD_RUN; iState = pthread_create(&(pThis->pWrkThrds[i].thrdID), NULL, queueWorker, (void*) pThis); @@ -589,21 +600,11 @@ rsRetVal queueDestruct(queue_t *pThis) assert(pThis != NULL); + /* first, terminate worker threads */ if(pThis->pWrkThrds != NULL) { - /* first stop the worker thread */ - dbgprintf("Initiating worker thread shutdown sequence for queue 0x%lx...\n", (unsigned long) pThis); - pThis->bDoRun = 0; -pThis->bImmediateShutdown = 1; /*testing */ - /* request all threads to terminate */ - /* We instruct worker 0 to shutdown, which in turn will terminate all other - * threads (if any exist) -- rgerhards, 2008-01-10 - */ - pThis->pWrkThrds[0].tCurrCmd = eWRKTHRDCMD_SHUTDOWN; - /* we must broadcast, because we can not specifically awake 
worker 0 */ - pthread_cond_broadcast(pThis->notEmpty); - pthread_join(pThis->pWrkThrds[0].thrdID, NULL); + queueShutdownWorkers(pThis); free(pThis->pWrkThrds); - dbgprintf("Worker threads for queue 0x%lx terminated.\n", (unsigned long) pThis); + pThis->pWrkThrds = NULL; } /* ... then free resources */ @@ -69,7 +69,7 @@ typedef enum { typedef struct qWrkThrd_s { pthread_t thrdID; /* thread ID */ - qWrkCmd_t tCurrCmd; /* current command to be carried out by worker */ + volatile qWrkCmd_t tCurrCmd; /* current command to be carried out by worker */ } qWrkThrd_t; /* type for queue worker threads */ /* the queue object */ @@ -79,7 +79,6 @@ typedef struct queue_s { int iMaxQueueSize; /* how large can the queue grow? */ int iNumWorkerThreads;/* number of worker threads to use */ qWrkThrd_t *pWrkThrds;/* array with control structure for the worker thread(s) associated with this queue */ - int bDoRun; /* 1 - run queue, 0 - shutdown of queue requested */ int bImmediateShutdown;/* on shutdown, drain the queue --> 0 / do NOT drain the queue --> 1 */ rsRetVal (*pConsumer)(void *); /* user-supplied consumer function for dequeued messages */ /* type-specific handlers (set during construction) */ @@ -552,7 +552,7 @@ static void debug_switch(); static rsRetVal cfline(uchar *line, selector_t **pfCurr); static int decode(uchar *name, struct code *codetab); static void sighup_handler(); -static void die(int sig); +//static void die(int sig); static void freeSelectors(void); static rsRetVal processConfFile(uchar *pConfFile); static rsRetVal selectorAddList(selector_t *f); @@ -2594,8 +2594,26 @@ die(int sig) { char buf[256]; + dbgprintf("exiting on signal %d\n", sig); + + /* IMPORTANT: we should close the inputs first, and THEN send our termination + * message. If we do it the other way around, logmsgInternal() may block on + * a full queue and the inputs still fill up that queue. 
Depending on the + * scheduling order, we may end up with logmsgInternal being held for a quite + * long time. When the inputs are terminated first, that should not happen + * because the queue is drained in parallel. The situation could only become + * an issue with extremely long running actions in a queue full environment. + * However, such actions are at least considered poorly written, if not + * outright wrong. So we do not care about this very remote problem. + * rgerhards, 2008-01-11 + */ + + /* close the inputs */ + dbgprintf("Terminating input threads...\n"); + thrdTerminateAll(); /* TODO: inputs only, please */ + + /* and THEN send the termination log message (see long comment above) */ if (sig) { - dbgprintf("exiting on signal %d\n", sig); (void) snprintf(buf, sizeof(buf) / sizeof(char), " [origin software=\"rsyslogd\" " "swVersion=\"" VERSION \ "\" x-pid=\"%d\"]" " exiting on signal %d.", @@ -2604,16 +2622,14 @@ die(int sig) logmsgInternal(LOG_SYSLOG|LOG_INFO, buf, ADDDATE); } - /* close the inputs */ - dbgprintf("Terminating input threads...\n"); - thrdTerminateAll(); /* TODO: inputs only, please */ - - /* drain queue and stop worker thread */ + /* drain queue (if configured so) and stop main queue worker thread pool */ dbgprintf("Terminating main queue...\n"); queueDestruct(pMsgQueue); pMsgQueue = NULL; - /* Free ressources and close connections */ + /* Free ressources and close connections. This includes flushing any remaining + * repeated msgs. + */ dbgprintf("Terminating outputs...\n"); freeSelectors(); @@ -2650,7 +2666,6 @@ die(int sig) */ unregCfSysLineHdlrs(); - /* clean up auxiliary data */ if(pModDir != NULL) free(pModDir); @@ -3270,7 +3285,7 @@ init(void) char bufStartUpMsg[512]; struct sigaction sigAct; - thrdTerminateAll(); /* stop all running threads - TODO: reconsider location! */ + thrdTerminateAll(); /* stop all running input threads - TODO: reconsider location! 
*/ /* initialize some static variables */ pDfltHostnameCmp = NULL; @@ -3366,7 +3381,6 @@ init(void) fprintf(stderr, "fatal error %d: could not create message queue - rsyslogd can not run!\n", iRet); exit(1); } -dbgprintf("queue 1 \n"); /* ... set some properties ... */ # define setQPROP(func, directive, data) \ CHKiRet_Hdlr(func(pMsgQueue, data)) { \ @@ -3378,21 +3392,18 @@ dbgprintf("queue 1 \n"); } setQPROP(queueSetMaxFileSize, "$MainMsgQueueFileSize", iMainMsgQueMaxFileSize); -dbgprintf("queue 2 \n"); setQPROPstr(queueSetFilePrefix, "$MainMsgQueueFileName", (pszMainMsgQFName == NULL ? (uchar*) "mainq" : pszMainMsgQFName)); # undef setQPROP # undef setQPROPstr -dbgprintf("try start queue \n"); /* ... and finally start the queue! */ CHKiRet_Hdlr(queueStart(pMsgQueue)) { /* no queue is fatal, we need to give up in that case... */ fprintf(stderr, "fatal error %d: could not start message queue - rsyslogd can not run!\n", iRet); exit(1); } -dbgprintf("queue running\n"); Initialized = 1; bHaveMainQueue = (MainMsgQueType == QUEUETYPE_DIRECT) ? 0 : 1; @@ -3405,7 +3416,6 @@ dbgprintf("queue running\n"); */ startInputModules(); -dbgprintf("inputs running\n"); if(Debug) { dbgPrintInitInfo(); } @@ -4427,12 +4437,29 @@ mainloop(void) tvSelectTimeout.tv_sec = TIMERINTVL; tvSelectTimeout.tv_usec = 0; select(1, NULL, NULL, NULL, &tvSelectTimeout); + if(bFinished) + break; /* exit as quickly as possible - see long comment below */ /* If we received a HUP signal, we call doFlushRptdMsgs() a bit early. This * doesn't matter, because doFlushRptdMsgs() checks timestamps. What may happen, * however, is that the too-early call may lead to a bit too-late output * of "last message repeated n times" messages. But that is quite acceptable. * rgerhards, 2007-12-21 + * ... and just to explain, we flush here because that is exactly what the mainloop + * shall do - provide a periodic interval in which not-yet-flushed messages will + * be flushed. 
Be careful, there is a potential race condition: doFlushRptdMsgs() + * needs to acquire a lock on the action objects. If, however, long-running consumers + * cause the main queue worker threads to lock them for a long time, we may run into + * a starvation condition, resulting in the mainloop being held on lock for an extended + * period of time. That, in turn, could lead to unresponsiveness to termination + * requests. It is especially important that the bFinished flag is checked before + * doFlushRptdMsgs() is called (I know because I ran into that situation). I am + * not yet sure if the remaining probability window of a termination-related + * problem is large enough to justify changing the code - I would consider it + * extremely unlikely that the problem ever occurs in practice. Fixing it would + * require not only a lot of effort but would cost considerable performance. So + * for the time being, I think the remaining risk can be accepted. + * rgerhards, 2008-01-10 */ doFlushRptdMsgs(); |