summaryrefslogtreecommitdiffstats
path: root/queue.c
diff options
context:
space:
mode:
Diffstat (limited to 'queue.c')
-rw-r--r--queue.c85
1 files changed, 52 insertions, 33 deletions
diff --git a/queue.c b/queue.c
index 53b2b7b5..0e0a5a83 100644
--- a/queue.c
+++ b/queue.c
@@ -1,7 +1,6 @@
-// TODO: DA worker must not wait eternal on shutdown when in enqueue only mode!:w
-//
- // TODO: we need to implement peek(), without it (today!) we lose one message upon
- // worker cancellation! -- rgerhards, 2008-01-14
+// TODO: DA worker must not wait eternal on shutdown when in enqueue only mode!
+// TODO: we need to implement peek(), without it (today!) we lose one message upon
+// worker cancellation! -- rgerhards, 2008-01-14
// TODO: think about mutDA - I think it's no longer needed
// TODO: start up the correct num of workers when switching to non-DA mode
// TODO: "preforked" worker threads
@@ -10,18 +9,18 @@
// call consumer state. Facilitates retaining messages in queue until action could
// be called!
/* queue.c
- *
- * This file implements the queue object and its several queueing methods.
- *
- * File begun on 2008-01-03 by RGerhards
- *
- * There is some in-depth documentation available in doc/dev_queue.html
- * (and in the web doc set on http://www.rsyslog.com/doc). Be sure to read it
- * if you are getting aquainted to the object.
- *
- * Copyright 2008 Rainer Gerhards and Adiscon GmbH.
- *
- * This file is part of rsyslog.
+*
+* This file implements the queue object and its several queueing methods.
+*
+* File begun on 2008-01-03 by RGerhards
+*
+* There is some in-depth documentation available in doc/dev_queue.html
+* (and in the web doc set on http://www.rsyslog.com/doc). Be sure to read it
+* if you are getting aquainted to the object.
+*
+* Copyright 2008 Rainer Gerhards and Adiscon GmbH.
+*
+* This file is part of rsyslog.
*
* Rsyslog is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
@@ -88,18 +87,12 @@ static inline rsRetVal queueAdviseMaxWorkers(queue_t *pThis)
ISOBJ_TYPE_assert(pThis, queue);
-RUNLOG_VAR("%d", pThis->bEnqOnly);
if(!pThis->bEnqOnly) {
-RUNLOG_VAR("%d", pThis->bRunsDA);
if(pThis->bRunsDA) {
-RUNLOG_VAR("%d", pThis->bQueueStarted);
-RUNLOG_VAR("%d", pThis->iQueueSize);
-RUNLOG_VAR("%d", pThis->iHighWtrMrk);
/* if we have not yet reached the high water mark, there is no need to start a
* worker. -- rgerhards, 2008-01-26
*/
if(pThis->iQueueSize >= pThis->iHighWtrMrk || pThis->bQueueStarted == 0) {
-RUNLOG;
wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* disk queues have always one worker */
}
} else {
@@ -227,7 +220,6 @@ queueStartDA(queue_t *pThis)
/* set up sync objects */
pthread_mutex_init(&pThis->mutDA, NULL);
- pthread_cond_init(&pThis->condDA, NULL);
/* create message queue */
dbgprintf("Queue %p: queueSTrtDA pre child queue construct,\n", pThis);
@@ -842,6 +834,11 @@ queueDel(queue_t *pThis, void *pUsr)
* before this function is called so that DA queues will be fully persisted to
* disk (if configured to do so).
* rgerhards, 2008-01-24
+ * Please note that this function shuts down BOTH the parent AND the child queue
+ * in DA case. This is necessary because their timeouts are tightly coupled. Most
+ * importantly, the timeouts would be applied twice (or logic be extremely
+ * complex) if each would have its own shutdown. The function does not self check
+ * this condition - the caller must make sure it is not called with a parent.
*/
static rsRetVal queueShutdownWorkers(queue_t *pThis)
{
@@ -851,6 +848,7 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
rsRetVal iRetLocal;
ISOBJ_TYPE_assert(pThis, queue);
+ assert(pThis->pqParent == NULL); /* detect invalid calling sequence */
dbgprintf("Queue 0x%lx: initiating worker thread shutdown sequence\n", queueGetID(pThis));
@@ -891,6 +889,7 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
*/
RUNLOG_VAR("%d", pThis->toQShutdown);
timeoutComp(&tTimeout, pThis->toQShutdown);
+ dbgprintf("Queue 0x%lx: trying shutdown of regular workers\n", queueGetID(pThis));
iRetLocal = wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN, &tTimeout);
if(iRetLocal == RS_RET_TIMED_OUT) {
dbgprintf("Queue 0x%lx: regular shutdown timed out on primary queue (this is OK)\n", queueGetID(pThis));
@@ -905,6 +904,7 @@ RUNLOG_VAR("%d", pThis->toQShutdown);
/* we use the same absolute timeout as above, so we do not use more than the configured
* timeout interval!
*/
+ dbgprintf("Queue 0x%lx: trying shutdown of DA workers\n", queueGetID(pThis));
iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN, &tTimeout);
if(iRetLocal == RS_RET_TIMED_OUT) {
dbgprintf("Queue 0x%lx: shutdown timed out on DA queue (this is OK)\n",
@@ -959,7 +959,6 @@ RUNLOG;
* the queue is now empty. If regular workers are still running, and try to pull the next message,
* they will automatically terminate as there no longer is any message left to process.
*/
- // TODO: use pWtp mutex? - guess so!
BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
RUNLOG_VAR("%d", pThis->iQueueSize);
//old: if(pThis->iQueueSize > 0) {
@@ -967,6 +966,8 @@ RUNLOG_VAR("%d", pThis->iQueueSize);
timeoutComp(&tTimeout, pThis->toActShutdown);
if(wtpGetCurNumWrkr(pThis->pWtpReg, LOCK_MUTEX) > 0) {
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
+ dbgprintf("Queue 0x%lx: trying immediate shutdown of regular workers\n", queueGetID(pThis));
+ // TODO: ??? cut&paste error? iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN, &tTimeout);
iRetLocal = wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout);
if(iRetLocal == RS_RET_TIMED_OUT) {
dbgprintf("Queue 0x%lx: immediate shutdown timed out on primary queue (this is acceptable and "
@@ -976,11 +977,15 @@ RUNLOG_VAR("%d", pThis->iQueueSize);
"in disk save mode. Continuing, but results are unpredictable\n",
queueGetID(pThis), iRetLocal);
}
- BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
+ if(pThis->bIsDA) {
+ /* we need to re-aquire the mutex for the next check in this case! */
+ BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
+ }
}
- if(wtpGetCurNumWrkr(pThis->pWtpDA, LOCK_MUTEX) > 0) {
+ if(pThis->bIsDA && wtpGetCurNumWrkr(pThis->pWtpDA, LOCK_MUTEX) > 0) {
/* and now the same for the DA queue */
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
+ dbgprintf("Queue 0x%lx: trying immediate shutdown of DA workers\n", queueGetID(pThis));
iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout);
if(iRetLocal == RS_RET_TIMED_OUT) {
dbgprintf("Queue 0x%lx: immediate shutdown timed out on DA queue (this is acceptable and "
@@ -1042,6 +1047,8 @@ RUNLOG_VAR("%d", pThis->iQueueSize);
}
}
+// TODO: think about joining all workers, so that the destructors are called
+//
/* ... finally ... all worker threads have terminated :-)
* Well, more precisely, they *are in termination*. Some cancel cleanup handlers
* may still be running.
@@ -1341,8 +1348,10 @@ if(pThis->pqDA != NULL)
static int
queueChkStopWrkrReg(queue_t *pThis)
{
+ BEGINfunc
int bStopWrkr = pThis->bEnqOnly || pThis->bRunsDA || (pThis->pqParent != NULL && pThis->iQueueSize == 0);
RUNLOG_VAR("%d", bStopWrkr);
+ ENDfunc
return pThis->bEnqOnly || pThis->bRunsDA || (pThis->pqParent != NULL && pThis->iQueueSize == 0);
}
@@ -1354,10 +1363,12 @@ static int
queueIsIdleDA(queue_t *pThis)
{
/* remember: iQueueSize is the DA queue size, not the main queue! */
+ BEGINfunc
RUNLOG_VAR("%d", pThis->iLowWtrMrk);
dbgprintf("queueIsIdleDA(%p) returns %d, qsize %d\n", pThis, pThis->iQueueSize == 0 || (pThis->bRunsDA && pThis->iQueueSize <= pThis->iLowWtrMrk), pThis->iQueueSize);
//// TODO: I think we need just a single function...
//return (pThis->iQueueSize == 0);
+ ENDfunc
return (pThis->iQueueSize == 0 || (pThis->bRunsDA && pThis->iQueueSize <= pThis->iLowWtrMrk));
}
/* must only be called when the queue mutex is locked, else results
@@ -1386,6 +1397,10 @@ queueRegOnWrkrShutdown(queue_t *pThis)
if(pThis->pqParent != NULL) {
RUNLOG_VAR("%p", pThis->pqParent);
RUNLOG_VAR("%p", pThis->pqParent->pWtpDA);
+ assert(pThis->pqParent->pWtpDA != NULL);
+ pThis->pqParent->bChildIsDone = 1; /* indicate we are done */
+ wtpAdviseMaxWorkers(pThis->pqParent->pWtpDA, 1); /* reactivate DA worker (always 1) */
+#if 0
if(pThis->pqParent->pWtpDA == NULL) {
/* this can happen if we are not set to save on an eternal timeout. We
* log a warning but otherwise do nothing
@@ -1396,6 +1411,7 @@ RUNLOG_VAR("%p", pThis->pqParent->pWtpDA);
pThis->pqParent->bChildIsDone = 1; /* indicate we are done */
wtpAdviseMaxWorkers(pThis->pqParent->pWtpDA, 1); /* reactivate DA worker (always 1) */
}
+#endif
}
RETiRet;
@@ -1633,11 +1649,14 @@ rsRetVal queueDestruct(queue_t **ppThis)
pThis->bQueueInDestruction = 1; /* indicate we are in destruction (modifies some behaviour) */
- /* shut down all workers (handles *all* of the persistence logic) */
- //if(!pThis->bEnqOnly) /* in enque-only mode, we have no worker pool! */
- if(!pThis->bEnqOnly && pThis->pqParent == NULL) /* in enque-only mode, we have no worker pool! */
+ /* shut down all workers (handles *all* of the persistence logic)
+ * See function head comment of queueShutdownWorkers () on why we don't call it
+ * We also do not need to shutdown workers when we are in enqueue-only mode or we are a
+ * direct queue - because in both cases we have none... ;)
+ * with a child! -- rgerhards, 2008-01-28
+ */
+ if(pThis->qType != QUEUETYPE_DIRECT && !pThis->bEnqOnly && pThis->pqParent == NULL)
queueShutdownWorkers(pThis);
-RUNLOG;
/* finally destruct our (regular) worker thread pool
* Note: currently pWtpReg is never NULL, but if we optimize our logic, this may happen,
@@ -1767,8 +1786,6 @@ queueEnqObj(queue_t *pThis, void *pUsr)
ISOBJ_TYPE_assert(pThis, queue);
-// TODO: check if queue is terminating and if so either discard message or enqeue it to the DA queue *directly*
-dbgprintf("Queue %p: EnqObj() 1\n", pThis);
RUNLOG_VAR("%d", pThis->bRunsDA);
/* Please note that this function is not cancel-safe and consequently
* sets the calling thread's cancelibility state to PTHREAD_CANCEL_DISABLE
@@ -1815,7 +1832,9 @@ finalize_it:
}
/* make sure at least one worker is running. */
- queueAdviseMaxWorkers(pThis);
+ if(pThis->qType != QUEUETYPE_DIRECT) {
+ queueAdviseMaxWorkers(pThis);
+ }
RETiRet;
}