summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2008-01-28 10:48:51 +0000
committerRainer Gerhards <rgerhards@adiscon.com>2008-01-28 10:48:51 +0000
commit59b67824d6ada9e224beef8ec9a6e6e566e1fcf8 (patch)
treeb9dac56dd55c1c5593dab1fa3017fd24bf0a8d32
parent082737dec693fbb531ca9dad3ab9b638baaa3327 (diff)
downloadrsyslog-59b67824d6ada9e224beef8ec9a6e6e566e1fcf8.tar.gz
rsyslog-59b67824d6ada9e224beef8ec9a6e6e566e1fcf8.tar.xz
rsyslog-59b67824d6ada9e224beef8ec9a6e6e566e1fcf8.zip
some more testing and cleanup with the queue class (pretty stable now)
-rw-r--r--ChangeLog3
-rw-r--r--debug.c2
-rw-r--r--queue.c85
-rw-r--r--queue.h10
-rwxr-xr-xsrUtils.c12
-rw-r--r--syslogd.c4
-rw-r--r--wtp.c3
7 files changed, 68 insertions, 51 deletions
diff --git a/ChangeLog b/ChangeLog
index 27a86fd2..e7e3e1b3 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -3,6 +3,9 @@ Version 3.10.3 (rgerhards), 2008-01-14
- fixed a bug with standard template definitions (not a big deal) - thanks
to varmojfekoj for spotting it
- run-time instrumentation added
+- implemented the $MainMsgQueueSaveOnShutdown config directive
+- implemented the $MainMsgQueueWorkerThreadMinimumMessages config directive
+- implemented the $MainMsgQueueTimeoutWorkerThreadShutdown config directive
---------------------------------------------------------------------------
Version 3.10.2 (rgerhards), 2008-01-14
- added the ability to keep stop rsyslogd without the need to drain
diff --git a/debug.c b/debug.c
index 4605eec8..b1a35084 100644
--- a/debug.c
+++ b/debug.c
@@ -57,7 +57,7 @@ static dbgThrdInfo_t *dbgGetThrdInfo(void);
int Debug; /* debug flag - read-only after startup */
int debugging_on = 0; /* read-only, except on sig USR1 */
static int bLogFuncFlow = 0; /* shall the function entry and exit be logged to the debug log? */
-static int bPrintFuncDBOnExit = 0; /* shall the function entry and exit be logged to the debug log? */
+static int bPrintFuncDBOnExit = 1; /* shall the function entry and exit be logged to the debug log? */
static int bPrintMutexAction = 0; /* shall mutex calls be printed to the debug log? */
static int bPrintTime = 1; /* print a timestamp together with debug message */
static char *pszAltDbgFileName = NULL; /* if set, debug output is *also* sent to here */
diff --git a/queue.c b/queue.c
index 53b2b7b5..0e0a5a83 100644
--- a/queue.c
+++ b/queue.c
@@ -1,7 +1,6 @@
-// TODO: DA worker must not wait eternal on shutdown when in enqueue only mode!:w
-//
- // TODO: we need to implement peek(), without it (today!) we lose one message upon
- // worker cancellation! -- rgerhards, 2008-01-14
+// TODO: DA worker must not wait eternal on shutdown when in enqueue only mode!
+// TODO: we need to implement peek(), without it (today!) we lose one message upon
+// worker cancellation! -- rgerhards, 2008-01-14
// TODO: think about mutDA - I think it's no longer needed
// TODO: start up the correct num of workers when switching to non-DA mode
// TODO: "preforked" worker threads
@@ -10,18 +9,18 @@
// call consumer state. Facilitates retaining messages in queue until action could
// be called!
/* queue.c
- *
- * This file implements the queue object and its several queueing methods.
- *
- * File begun on 2008-01-03 by RGerhards
- *
- * There is some in-depth documentation available in doc/dev_queue.html
- * (and in the web doc set on http://www.rsyslog.com/doc). Be sure to read it
- * if you are getting aquainted to the object.
- *
- * Copyright 2008 Rainer Gerhards and Adiscon GmbH.
- *
- * This file is part of rsyslog.
+*
+* This file implements the queue object and its several queueing methods.
+*
+* File begun on 2008-01-03 by RGerhards
+*
+* There is some in-depth documentation available in doc/dev_queue.html
+* (and in the web doc set on http://www.rsyslog.com/doc). Be sure to read it
+* if you are getting aquainted to the object.
+*
+* Copyright 2008 Rainer Gerhards and Adiscon GmbH.
+*
+* This file is part of rsyslog.
*
* Rsyslog is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
@@ -88,18 +87,12 @@ static inline rsRetVal queueAdviseMaxWorkers(queue_t *pThis)
ISOBJ_TYPE_assert(pThis, queue);
-RUNLOG_VAR("%d", pThis->bEnqOnly);
if(!pThis->bEnqOnly) {
-RUNLOG_VAR("%d", pThis->bRunsDA);
if(pThis->bRunsDA) {
-RUNLOG_VAR("%d", pThis->bQueueStarted);
-RUNLOG_VAR("%d", pThis->iQueueSize);
-RUNLOG_VAR("%d", pThis->iHighWtrMrk);
/* if we have not yet reached the high water mark, there is no need to start a
* worker. -- rgerhards, 2008-01-26
*/
if(pThis->iQueueSize >= pThis->iHighWtrMrk || pThis->bQueueStarted == 0) {
-RUNLOG;
wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* disk queues have always one worker */
}
} else {
@@ -227,7 +220,6 @@ queueStartDA(queue_t *pThis)
/* set up sync objects */
pthread_mutex_init(&pThis->mutDA, NULL);
- pthread_cond_init(&pThis->condDA, NULL);
/* create message queue */
dbgprintf("Queue %p: queueSTrtDA pre child queue construct,\n", pThis);
@@ -842,6 +834,11 @@ queueDel(queue_t *pThis, void *pUsr)
* before this function is called so that DA queues will be fully persisted to
* disk (if configured to do so).
* rgerhards, 2008-01-24
+ * Please note that this function shuts down BOTH the parent AND the child queue
+ * in DA case. This is necessary because their timeouts are tightly coupled. Most
+ * importantly, the timeouts would be applied twice (or logic be extremely
+ * complex) if each would have its own shutdown. The function does not self check
+ * this condition - the caller must make sure it is not called with a parent.
*/
static rsRetVal queueShutdownWorkers(queue_t *pThis)
{
@@ -851,6 +848,7 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
rsRetVal iRetLocal;
ISOBJ_TYPE_assert(pThis, queue);
+ assert(pThis->pqParent == NULL); /* detect invalid calling sequence */
dbgprintf("Queue 0x%lx: initiating worker thread shutdown sequence\n", queueGetID(pThis));
@@ -891,6 +889,7 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
*/
RUNLOG_VAR("%d", pThis->toQShutdown);
timeoutComp(&tTimeout, pThis->toQShutdown);
+ dbgprintf("Queue 0x%lx: trying shutdown of regular workers\n", queueGetID(pThis));
iRetLocal = wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN, &tTimeout);
if(iRetLocal == RS_RET_TIMED_OUT) {
dbgprintf("Queue 0x%lx: regular shutdown timed out on primary queue (this is OK)\n", queueGetID(pThis));
@@ -905,6 +904,7 @@ RUNLOG_VAR("%d", pThis->toQShutdown);
/* we use the same absolute timeout as above, so we do not use more than the configured
* timeout interval!
*/
+ dbgprintf("Queue 0x%lx: trying shutdown of DA workers\n", queueGetID(pThis));
iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN, &tTimeout);
if(iRetLocal == RS_RET_TIMED_OUT) {
dbgprintf("Queue 0x%lx: shutdown timed out on DA queue (this is OK)\n",
@@ -959,7 +959,6 @@ RUNLOG;
* the queue is now empty. If regular workers are still running, and try to pull the next message,
* they will automatically terminate as there no longer is any message left to process.
*/
- // TODO: use pWtp mutex? - guess so!
BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
RUNLOG_VAR("%d", pThis->iQueueSize);
//old: if(pThis->iQueueSize > 0) {
@@ -967,6 +966,8 @@ RUNLOG_VAR("%d", pThis->iQueueSize);
timeoutComp(&tTimeout, pThis->toActShutdown);
if(wtpGetCurNumWrkr(pThis->pWtpReg, LOCK_MUTEX) > 0) {
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
+ dbgprintf("Queue 0x%lx: trying immediate shutdown of regular workers\n", queueGetID(pThis));
+ // TODO: ??? cut&paste error? iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN, &tTimeout);
iRetLocal = wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout);
if(iRetLocal == RS_RET_TIMED_OUT) {
dbgprintf("Queue 0x%lx: immediate shutdown timed out on primary queue (this is acceptable and "
@@ -976,11 +977,15 @@ RUNLOG_VAR("%d", pThis->iQueueSize);
"in disk save mode. Continuing, but results are unpredictable\n",
queueGetID(pThis), iRetLocal);
}
- BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
+ if(pThis->bIsDA) {
+ /* we need to re-aquire the mutex for the next check in this case! */
+ BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
+ }
}
- if(wtpGetCurNumWrkr(pThis->pWtpDA, LOCK_MUTEX) > 0) {
+ if(pThis->bIsDA && wtpGetCurNumWrkr(pThis->pWtpDA, LOCK_MUTEX) > 0) {
/* and now the same for the DA queue */
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
+ dbgprintf("Queue 0x%lx: trying immediate shutdown of DA workers\n", queueGetID(pThis));
iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout);
if(iRetLocal == RS_RET_TIMED_OUT) {
dbgprintf("Queue 0x%lx: immediate shutdown timed out on DA queue (this is acceptable and "
@@ -1042,6 +1047,8 @@ RUNLOG_VAR("%d", pThis->iQueueSize);
}
}
+// TODO: think about joining all workers, so that the destructors are called
+//
/* ... finally ... all worker threads have terminated :-)
* Well, more precisely, they *are in termination*. Some cancel cleanup handlers
* may still be running.
@@ -1341,8 +1348,10 @@ if(pThis->pqDA != NULL)
static int
queueChkStopWrkrReg(queue_t *pThis)
{
+ BEGINfunc
int bStopWrkr = pThis->bEnqOnly || pThis->bRunsDA || (pThis->pqParent != NULL && pThis->iQueueSize == 0);
RUNLOG_VAR("%d", bStopWrkr);
+ ENDfunc
return pThis->bEnqOnly || pThis->bRunsDA || (pThis->pqParent != NULL && pThis->iQueueSize == 0);
}
@@ -1354,10 +1363,12 @@ static int
queueIsIdleDA(queue_t *pThis)
{
/* remember: iQueueSize is the DA queue size, not the main queue! */
+ BEGINfunc
RUNLOG_VAR("%d", pThis->iLowWtrMrk);
dbgprintf("queueIsIdleDA(%p) returns %d, qsize %d\n", pThis, pThis->iQueueSize == 0 || (pThis->bRunsDA && pThis->iQueueSize <= pThis->iLowWtrMrk), pThis->iQueueSize);
//// TODO: I think we need just a single function...
//return (pThis->iQueueSize == 0);
+ ENDfunc
return (pThis->iQueueSize == 0 || (pThis->bRunsDA && pThis->iQueueSize <= pThis->iLowWtrMrk));
}
/* must only be called when the queue mutex is locked, else results
@@ -1386,6 +1397,10 @@ queueRegOnWrkrShutdown(queue_t *pThis)
if(pThis->pqParent != NULL) {
RUNLOG_VAR("%p", pThis->pqParent);
RUNLOG_VAR("%p", pThis->pqParent->pWtpDA);
+ assert(pThis->pqParent->pWtpDA != NULL);
+ pThis->pqParent->bChildIsDone = 1; /* indicate we are done */
+ wtpAdviseMaxWorkers(pThis->pqParent->pWtpDA, 1); /* reactivate DA worker (always 1) */
+#if 0
if(pThis->pqParent->pWtpDA == NULL) {
/* this can happen if we are not set to save on an eternal timeout. We
* log a warning but otherwise do nothing
@@ -1396,6 +1411,7 @@ RUNLOG_VAR("%p", pThis->pqParent->pWtpDA);
pThis->pqParent->bChildIsDone = 1; /* indicate we are done */
wtpAdviseMaxWorkers(pThis->pqParent->pWtpDA, 1); /* reactivate DA worker (always 1) */
}
+#endif
}
RETiRet;
@@ -1633,11 +1649,14 @@ rsRetVal queueDestruct(queue_t **ppThis)
pThis->bQueueInDestruction = 1; /* indicate we are in destruction (modifies some behaviour) */
- /* shut down all workers (handles *all* of the persistence logic) */
- //if(!pThis->bEnqOnly) /* in enque-only mode, we have no worker pool! */
- if(!pThis->bEnqOnly && pThis->pqParent == NULL) /* in enque-only mode, we have no worker pool! */
+ /* shut down all workers (handles *all* of the persistence logic)
+ * See function head comment of queueShutdownWorkers () on why we don't call it
+ * We also do not need to shutdown workers when we are in enqueue-only mode or we are a
+ * direct queue - because in both cases we have none... ;)
+ * with a child! -- rgerhards, 2008-01-28
+ */
+ if(pThis->qType != QUEUETYPE_DIRECT && !pThis->bEnqOnly && pThis->pqParent == NULL)
queueShutdownWorkers(pThis);
-RUNLOG;
/* finally destruct our (regular) worker thread pool
* Note: currently pWtpReg is never NULL, but if we optimize our logic, this may happen,
@@ -1767,8 +1786,6 @@ queueEnqObj(queue_t *pThis, void *pUsr)
ISOBJ_TYPE_assert(pThis, queue);
-// TODO: check if queue is terminating and if so either discard message or enqeue it to the DA queue *directly*
-dbgprintf("Queue %p: EnqObj() 1\n", pThis);
RUNLOG_VAR("%d", pThis->bRunsDA);
/* Please note that this function is not cancel-safe and consequently
* sets the calling thread's cancelibility state to PTHREAD_CANCEL_DISABLE
@@ -1815,7 +1832,9 @@ finalize_it:
}
/* make sure at least one worker is running. */
- queueAdviseMaxWorkers(pThis);
+ if(pThis->qType != QUEUETYPE_DIRECT) {
+ queueAdviseMaxWorkers(pThis);
+ }
RETiRet;
}
diff --git a/queue.h b/queue.h
index fd97596d..aadb13d7 100644
--- a/queue.h
+++ b/queue.h
@@ -46,7 +46,7 @@ typedef struct qLinkedList_S {
typedef struct qWrkThrd_s {
pthread_t thrdID; /* thread ID */
qWrkCmd_t tCurrCmd; /* current command to be carried out by worker */
- obj_t *pUsr; /* current user object being processed (or NULL if none) */
+ obj_t *pUsr; /* current user object being processed (or NULL if none) */
struct queue_s *pQueue; /* my queue (important if only the work thread instance is passed! */
int iThrd; /* my worker thread array index */
pthread_cond_t condInitDone; /* signaled when the thread startup is done (once per thread existance) */
@@ -91,13 +91,6 @@ typedef struct queue_s {
pthread_mutex_t *mut; /* mutex for enqueing and dequeueing messages */
pthread_cond_t notFull, notEmpty;
pthread_cond_t condDAReady;/* signalled when the DA queue is fully initialized and ready for processing */
- pthread_cond_t condThrdTrm;/* signalled when threads terminate */ // TODO: no longer used?
- //pthread_cond_t *condSignalOnEmpty;/* caller-provided condition to be signalled when queue is empty (DA mode!) */
- //pthread_mutex_t *mutSignalOnEmpty; /* and its associated mutex */
- //pthread_cond_t *condSignalOnEmpty2;/* another condition to be signalled on empty */
- //int bSignalOnEmpty; /* signal caller when queue is empty via xxxSignalOnEmpty cond/mut,
- // 0 = do not, 1 = signal only condSignalOnEmpty, 2 = signal both condSig..*/ // TODO: no longer needed?
-
int bChildIsDone; /* set to 1 when the child DA queue has finished processing, 0 otherwise */
int bThrdStateChanged; /* at least one thread state has changed if 1 */
/* end sync variables */
@@ -115,7 +108,6 @@ typedef struct queue_s {
int bIsDA; /* is this queue disk assisted? */
int bRunsDA; /* is this queue actually *running* disk assisted? */
pthread_mutex_t mutDA; /* mutex for low water mark algo */
- pthread_cond_t condDA; /* and its matching condition */ // TODO: no longer needed!
struct queue_s *pqDA; /* queue for disk-assisted modes */
struct queue_s *pqParent;/* pointer to the parent (if this is a child queue) */
int bDAEnqOnly; /* EnqOnly setting for DA queue */
diff --git a/srUtils.c b/srUtils.c
index 9724190a..e15456a4 100755
--- a/srUtils.c
+++ b/srUtils.c
@@ -344,14 +344,14 @@ timeoutVal(struct timespec *pt)
assert(pt != NULL);
/* compute timeout */
clock_gettime(CLOCK_REALTIME, &t);
-RUNLOG_VAR("%ld", pt->tv_sec);
-RUNLOG_VAR("%ld", t.tv_sec);
-RUNLOG_VAR("%ld", pt->tv_nsec);
-RUNLOG_VAR("%ld", t.tv_nsec);
+//RUNLOG_VAR("%ld", pt->tv_sec);
+//RUNLOG_VAR("%ld", t.tv_sec);
+//RUNLOG_VAR("%ld", pt->tv_nsec);
+//RUNLOG_VAR("%ld", t.tv_nsec);
iTimeout = (pt->tv_nsec - t.tv_nsec) / 1000000;
-RUNLOG_VAR("%ld", iTimeout);
+//RUNLOG_VAR("%ld", iTimeout);
iTimeout += (pt->tv_sec - t.tv_sec) * 1000;
-RUNLOG_VAR("%ld", iTimeout);
+//RUNLOG_VAR("%ld", iTimeout);
if(iTimeout < 0)
iTimeout = 0;
diff --git a/syslogd.c b/syslogd.c
index 26c6fb84..2e8b5af6 100644
--- a/syslogd.c
+++ b/syslogd.c
@@ -2637,7 +2637,7 @@ die(int sig)
dbgprintf("Terminating main queue...\n");
queueDestruct(&pMsgQueue);
pMsgQueue = NULL;
-
+
/* Free ressources and close connections. This includes flushing any remaining
* repeated msgs.
*/
@@ -2660,6 +2660,8 @@ die(int sig)
/* de-init some modules */
modExitIminternal();
+ dbgPrintAllDebugInfo(); /* this is the last spot where this can be done - below output modules are unloaded! */
+
/* TODO: this would also be the right place to de-init the builtin output modules. We
* do not currently do that, because the module interface does not allow for
* it. This will come some time later (it's essential with loadable modules).
diff --git a/wtp.c b/wtp.c
index 817204d8..a71301a5 100644
--- a/wtp.c
+++ b/wtp.c
@@ -393,13 +393,14 @@ wtpWrkrExecCancelCleanup(void *arg)
{
wtp_t *pThis = (wtp_t*) arg;
+ BEGINfunc
ISOBJ_TYPE_assert(pThis, wtp);
pThis->iCurNumWrkThrd--;
RUNLOG_VAR("%d", pThis->iCurNumWrkThrd);
wtpSignalWrkrTermination(pThis);
dbgprintf("%s: thread CANCELED with %d workers running.\n", wtpGetDbgHdr(pThis), pThis->iCurNumWrkThrd);
-
+ ENDfunc
}