diff options
-rw-r--r-- | ChangeLog | 3 | ||||
-rw-r--r-- | debug.c | 2 | ||||
-rw-r--r-- | queue.c | 85 | ||||
-rw-r--r-- | queue.h | 10 | ||||
-rwxr-xr-x | srUtils.c | 12 | ||||
-rw-r--r-- | syslogd.c | 4 | ||||
-rw-r--r-- | wtp.c | 3 |
7 files changed, 68 insertions, 51 deletions
@@ -3,6 +3,9 @@ Version 3.10.3 (rgerhards), 2008-01-14 - fixed a bug with standard template definitions (not a big deal) - thanks to varmojfekoj for spotting it - run-time instrumentation added +- implemented the $MainMsgQueueSaveOnShutdown config directive +- implemented the $MainMsgQueueWorkerThreadMinimumMessages config directive +- implemented the $MainMsgQueueTimeoutWorkerThreadShutdown config directive --------------------------------------------------------------------------- Version 3.10.2 (rgerhards), 2008-01-14 - added the ability to keep stop rsyslogd without the need to drain @@ -57,7 +57,7 @@ static dbgThrdInfo_t *dbgGetThrdInfo(void); int Debug; /* debug flag - read-only after startup */ int debugging_on = 0; /* read-only, except on sig USR1 */ static int bLogFuncFlow = 0; /* shall the function entry and exit be logged to the debug log? */ -static int bPrintFuncDBOnExit = 0; /* shall the function entry and exit be logged to the debug log? */ +static int bPrintFuncDBOnExit = 1; /* shall the function entry and exit be logged to the debug log? */ static int bPrintMutexAction = 0; /* shall mutex calls be printed to the debug log? */ static int bPrintTime = 1; /* print a timestamp together with debug message */ static char *pszAltDbgFileName = NULL; /* if set, debug output is *also* sent to here */ @@ -1,7 +1,6 @@ -// TODO: DA worker must not wait eternal on shutdown when in enqueue only mode!:w -// - // TODO: we need to implement peek(), without it (today!) we lose one message upon - // worker cancellation! -- rgerhards, 2008-01-14 +// TODO: DA worker must not wait eternal on shutdown when in enqueue only mode! +// TODO: we need to implement peek(), without it (today!) we lose one message upon +// worker cancellation! -- rgerhards, 2008-01-14 // TODO: think about mutDA - I think it's no longer needed // TODO: start up the correct num of workers when switching to non-DA mode // TODO: "preforked" worker threads @@ -10,18 +9,18 @@ // call consumer state. Facilitates retaining messages in queue until action could // be called! /* queue.c - * - * This file implements the queue object and its several queueing methods. - * - * File begun on 2008-01-03 by RGerhards - * - * There is some in-depth documentation available in doc/dev_queue.html - * (and in the web doc set on http://www.rsyslog.com/doc). Be sure to read it - * if you are getting aquainted to the object. - * - * Copyright 2008 Rainer Gerhards and Adiscon GmbH. - * - * This file is part of rsyslog. +* +* This file implements the queue object and its several queueing methods. +* +* File begun on 2008-01-03 by RGerhards +* +* There is some in-depth documentation available in doc/dev_queue.html +* (and in the web doc set on http://www.rsyslog.com/doc). Be sure to read it +* if you are getting aquainted to the object. +* +* Copyright 2008 Rainer Gerhards and Adiscon GmbH. +* +* This file is part of rsyslog. * * Rsyslog is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -88,18 +87,12 @@ static inline rsRetVal queueAdviseMaxWorkers(queue_t *pThis) ISOBJ_TYPE_assert(pThis, queue); -RUNLOG_VAR("%d", pThis->bEnqOnly); if(!pThis->bEnqOnly) { -RUNLOG_VAR("%d", pThis->bRunsDA); if(pThis->bRunsDA) { -RUNLOG_VAR("%d", pThis->bQueueStarted); -RUNLOG_VAR("%d", pThis->iQueueSize); -RUNLOG_VAR("%d", pThis->iHighWtrMrk); /* if we have not yet reached the high water mark, there is no need to start a * worker. -- rgerhards, 2008-01-26 */ if(pThis->iQueueSize >= pThis->iHighWtrMrk || pThis->bQueueStarted == 0) { -RUNLOG; wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* disk queues have always one worker */ } } else { @@ -227,7 +220,6 @@ queueStartDA(queue_t *pThis) /* set up sync objects */ pthread_mutex_init(&pThis->mutDA, NULL); - pthread_cond_init(&pThis->condDA, NULL); /* create message queue */ dbgprintf("Queue %p: queueSTrtDA pre child queue construct,\n", pThis); @@ -842,6 +834,11 @@ queueDel(queue_t *pThis, void *pUsr) * before this function is called so that DA queues will be fully persisted to * disk (if configured to do so). * rgerhards, 2008-01-24 + * Please note that this function shuts down BOTH the parent AND the child queue + * in DA case. This is necessary because their timeouts are tightly coupled. Most + * importantly, the timeouts would be applied twice (or logic be extremely + * complex) if each would have its own shutdown. The function does not self check + * this condition - the caller must make sure it is not called with a parent. */ static rsRetVal queueShutdownWorkers(queue_t *pThis) { @@ -851,6 +848,7 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis) rsRetVal iRetLocal; ISOBJ_TYPE_assert(pThis, queue); + assert(pThis->pqParent == NULL); /* detect invalid calling sequence */ dbgprintf("Queue 0x%lx: initiating worker thread shutdown sequence\n", queueGetID(pThis)); @@ -891,6 +889,7 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis) */ RUNLOG_VAR("%d", pThis->toQShutdown); timeoutComp(&tTimeout, pThis->toQShutdown); + dbgprintf("Queue 0x%lx: trying shutdown of regular workers\n", queueGetID(pThis)); iRetLocal = wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN, &tTimeout); if(iRetLocal == RS_RET_TIMED_OUT) { dbgprintf("Queue 0x%lx: regular shutdown timed out on primary queue (this is OK)\n", queueGetID(pThis)); @@ -905,6 +904,7 @@ RUNLOG_VAR("%d", pThis->toQShutdown); /* we use the same absolute timeout as above, so we do not use more than the configured * timeout interval! */ + dbgprintf("Queue 0x%lx: trying shutdown of DA workers\n", queueGetID(pThis)); iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN, &tTimeout); if(iRetLocal == RS_RET_TIMED_OUT) { dbgprintf("Queue 0x%lx: shutdown timed out on DA queue (this is OK)\n", @@ -959,7 +959,6 @@ RUNLOG; * the queue is now empty. If regular workers are still running, and try to pull the next message, * they will automatically terminate as there no longer is any message left to process. */ - // TODO: use pWtp mutex? - guess so! BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ RUNLOG_VAR("%d", pThis->iQueueSize); //old: if(pThis->iQueueSize > 0) { @@ -967,6 +966,8 @@ RUNLOG_VAR("%d", pThis->iQueueSize); timeoutComp(&tTimeout, pThis->toActShutdown); if(wtpGetCurNumWrkr(pThis->pWtpReg, LOCK_MUTEX) > 0) { END_MTX_PROTECTED_OPERATIONS(pThis->mut); + dbgprintf("Queue 0x%lx: trying immediate shutdown of regular workers\n", queueGetID(pThis)); + // TODO: ??? cut&paste error? iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN, &tTimeout); iRetLocal = wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout); if(iRetLocal == RS_RET_TIMED_OUT) { dbgprintf("Queue 0x%lx: immediate shutdown timed out on primary queue (this is acceptable and " @@ -976,11 +977,15 @@ RUNLOG_VAR("%d", pThis->iQueueSize); "in disk save mode. Continuing, but results are unpredictable\n", queueGetID(pThis), iRetLocal); } - BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ + if(pThis->bIsDA) { + /* we need to re-aquire the mutex for the next check in this case! */ + BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ + } } - if(wtpGetCurNumWrkr(pThis->pWtpDA, LOCK_MUTEX) > 0) { + if(pThis->bIsDA && wtpGetCurNumWrkr(pThis->pWtpDA, LOCK_MUTEX) > 0) { /* and now the same for the DA queue */ END_MTX_PROTECTED_OPERATIONS(pThis->mut); + dbgprintf("Queue 0x%lx: trying immediate shutdown of DA workers\n", queueGetID(pThis)); iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout); if(iRetLocal == RS_RET_TIMED_OUT) { dbgprintf("Queue 0x%lx: immediate shutdown timed out on DA queue (this is acceptable and " @@ -1042,6 +1047,8 @@ RUNLOG_VAR("%d", pThis->iQueueSize); } } +// TODO: think about joining all workers, so that the destructors are called +// /* ... finally ... all worker threads have terminated :-) * Well, more precisely, they *are in termination*. Some cancel cleanup handlers * may still be running. @@ -1341,8 +1348,10 @@ if(pThis->pqDA != NULL) static int queueChkStopWrkrReg(queue_t *pThis) { + BEGINfunc int bStopWrkr = pThis->bEnqOnly || pThis->bRunsDA || (pThis->pqParent != NULL && pThis->iQueueSize == 0); RUNLOG_VAR("%d", bStopWrkr); + ENDfunc return pThis->bEnqOnly || pThis->bRunsDA || (pThis->pqParent != NULL && pThis->iQueueSize == 0); } @@ -1354,10 +1363,12 @@ static int queueIsIdleDA(queue_t *pThis) { /* remember: iQueueSize is the DA queue size, not the main queue! */ + BEGINfunc RUNLOG_VAR("%d", pThis->iLowWtrMrk); dbgprintf("queueIsIdleDA(%p) returns %d, qsize %d\n", pThis, pThis->iQueueSize == 0 || (pThis->bRunsDA && pThis->iQueueSize <= pThis->iLowWtrMrk), pThis->iQueueSize); //// TODO: I think we need just a single function... //return (pThis->iQueueSize == 0); + ENDfunc return (pThis->iQueueSize == 0 || (pThis->bRunsDA && pThis->iQueueSize <= pThis->iLowWtrMrk)); } /* must only be called when the queue mutex is locked, else results @@ -1386,6 +1397,10 @@ queueRegOnWrkrShutdown(queue_t *pThis) if(pThis->pqParent != NULL) { RUNLOG_VAR("%p", pThis->pqParent); RUNLOG_VAR("%p", pThis->pqParent->pWtpDA); + assert(pThis->pqParent->pWtpDA != NULL); + pThis->pqParent->bChildIsDone = 1; /* indicate we are done */ + wtpAdviseMaxWorkers(pThis->pqParent->pWtpDA, 1); /* reactivate DA worker (always 1) */ +#if 0 if(pThis->pqParent->pWtpDA == NULL) { /* this can happen if we are not set to save on an eternal timeout. We * log a warning but otherwise do nothing @@ -1396,6 +1411,7 @@ RUNLOG_VAR("%p", pThis->pqParent->pWtpDA); pThis->pqParent->bChildIsDone = 1; /* indicate we are done */ wtpAdviseMaxWorkers(pThis->pqParent->pWtpDA, 1); /* reactivate DA worker (always 1) */ } +#endif } RETiRet; @@ -1633,11 +1649,14 @@ rsRetVal queueDestruct(queue_t **ppThis) pThis->bQueueInDestruction = 1; /* indicate we are in destruction (modifies some behaviour) */ - /* shut down all workers (handles *all* of the persistence logic) */ - //if(!pThis->bEnqOnly) /* in enque-only mode, we have no worker pool! */ - if(!pThis->bEnqOnly && pThis->pqParent == NULL) /* in enque-only mode, we have no worker pool! */ + /* shut down all workers (handles *all* of the persistence logic) + * See function head comment of queueShutdownWorkers () on why we don't call it + * We also do not need to shutdown workers when we are in enqueue-only mode or we are a + * direct queue - because in both cases we have none... ;) + * with a child! -- rgerhards, 2008-01-28 + */ + if(pThis->qType != QUEUETYPE_DIRECT && !pThis->bEnqOnly && pThis->pqParent == NULL) queueShutdownWorkers(pThis); -RUNLOG; /* finally destruct our (regular) worker thread pool * Note: currently pWtpReg is never NULL, but if we optimize our logic, this may happen, @@ -1767,8 +1786,6 @@ queueEnqObj(queue_t *pThis, void *pUsr) ISOBJ_TYPE_assert(pThis, queue); -// TODO: check if queue is terminating and if so either discard message or enqeue it to the DA queue *directly* -dbgprintf("Queue %p: EnqObj() 1\n", pThis); RUNLOG_VAR("%d", pThis->bRunsDA); /* Please note that this function is not cancel-safe and consequently * sets the calling thread's cancelibility state to PTHREAD_CANCEL_DISABLE @@ -1815,7 +1832,9 @@ finalize_it: } /* make sure at least one worker is running. */ - queueAdviseMaxWorkers(pThis); + if(pThis->qType != QUEUETYPE_DIRECT) { + queueAdviseMaxWorkers(pThis); + } RETiRet; } @@ -46,7 +46,7 @@ typedef struct qLinkedList_S { typedef struct qWrkThrd_s { pthread_t thrdID; /* thread ID */ qWrkCmd_t tCurrCmd; /* current command to be carried out by worker */ - obj_t *pUsr; /* current user object being processed (or NULL if none) */ + obj_t *pUsr; /* current user object being processed (or NULL if none) */ struct queue_s *pQueue; /* my queue (important if only the work thread instance is passed! */ int iThrd; /* my worker thread array index */ pthread_cond_t condInitDone; /* signaled when the thread startup is done (once per thread existance) */ @@ -91,13 +91,6 @@ typedef struct queue_s { pthread_mutex_t *mut; /* mutex for enqueing and dequeueing messages */ pthread_cond_t notFull, notEmpty; pthread_cond_t condDAReady;/* signalled when the DA queue is fully initialized and ready for processing */ - pthread_cond_t condThrdTrm;/* signalled when threads terminate */ // TODO: no longer used? - //pthread_cond_t *condSignalOnEmpty;/* caller-provided condition to be signalled when queue is empty (DA mode!) */ - //pthread_mutex_t *mutSignalOnEmpty; /* and its associated mutex */ - //pthread_cond_t *condSignalOnEmpty2;/* another condition to be signalled on empty */ - //int bSignalOnEmpty; /* signal caller when queue is empty via xxxSignalOnEmpty cond/mut, - // 0 = do not, 1 = signal only condSignalOnEmpty, 2 = signal both condSig..*/ // TODO: no longer needed? - int bChildIsDone; /* set to 1 when the child DA queue has finished processing, 0 otherwise */ int bThrdStateChanged; /* at least one thread state has changed if 1 */ /* end sync variables */ @@ -115,7 +108,6 @@ typedef struct queue_s { int bIsDA; /* is this queue disk assisted? */ int bRunsDA; /* is this queue actually *running* disk assisted? */ pthread_mutex_t mutDA; /* mutex for low water mark algo */ - pthread_cond_t condDA; /* and its matching condition */ // TODO: no longer needed! struct queue_s *pqDA; /* queue for disk-assisted modes */ struct queue_s *pqParent;/* pointer to the parent (if this is a child queue) */ int bDAEnqOnly; /* EnqOnly setting for DA queue */ @@ -344,14 +344,14 @@ timeoutVal(struct timespec *pt) assert(pt != NULL); /* compute timeout */ clock_gettime(CLOCK_REALTIME, &t); -RUNLOG_VAR("%ld", pt->tv_sec); -RUNLOG_VAR("%ld", t.tv_sec); -RUNLOG_VAR("%ld", pt->tv_nsec); -RUNLOG_VAR("%ld", t.tv_nsec); +//RUNLOG_VAR("%ld", pt->tv_sec); +//RUNLOG_VAR("%ld", t.tv_sec); +//RUNLOG_VAR("%ld", pt->tv_nsec); +//RUNLOG_VAR("%ld", t.tv_nsec); iTimeout = (pt->tv_nsec - t.tv_nsec) / 1000000; -RUNLOG_VAR("%ld", iTimeout); +//RUNLOG_VAR("%ld", iTimeout); iTimeout += (pt->tv_sec - t.tv_sec) * 1000; -RUNLOG_VAR("%ld", iTimeout); +//RUNLOG_VAR("%ld", iTimeout); if(iTimeout < 0) iTimeout = 0; @@ -2637,7 +2637,7 @@ die(int sig) dbgprintf("Terminating main queue...\n"); queueDestruct(&pMsgQueue); pMsgQueue = NULL; - + /* Free ressources and close connections. This includes flushing any remaining * repeated msgs. */ @@ -2660,6 +2660,8 @@ die(int sig) /* de-init some modules */ modExitIminternal(); + dbgPrintAllDebugInfo(); /* this is the last spot where this can be done - below output modules are unloaded! */ + /* TODO: this would also be the right place to de-init the builtin output modules. We * do not currently do that, because the module interface does not allow for * it. This will come some time later (it's essential with loadable modules). @@ -393,13 +393,14 @@ wtpWrkrExecCancelCleanup(void *arg) { wtp_t *pThis = (wtp_t*) arg; + BEGINfunc ISOBJ_TYPE_assert(pThis, wtp); pThis->iCurNumWrkThrd--; RUNLOG_VAR("%d", pThis->iCurNumWrkThrd); wtpSignalWrkrTermination(pThis); dbgprintf("%s: thread CANCELED with %d workers running.\n", wtpGetDbgHdr(pThis), pThis->iCurNumWrkThrd); - + ENDfunc } |