summaryrefslogtreecommitdiffstats
path: root/queue.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2008-01-26 17:30:51 +0000
committerRainer Gerhards <rgerhards@adiscon.com>2008-01-26 17:30:51 +0000
commit439f19ad38f5dcc3282d450a2a15cea3238fc754 (patch)
tree1cc92b98f533a92de5fc97c528497e27b588714f /queue.c
parent1fb59126b0f9facb88cbbdc0331035f7e9dd80a7 (diff)
downloadrsyslog-439f19ad38f5dcc3282d450a2a15cea3238fc754.tar.gz
rsyslog-439f19ad38f5dcc3282d450a2a15cea3238fc754.tar.xz
rsyslog-439f19ad38f5dcc3282d450a2a15cea3238fc754.zip
partially fixed bug that caused rsyslogd to stall processing enqueued
messages after turning off DA mode and before any new message were arrived (if a new message arrived, everything went back to normal, so it was a temporary halt)
Diffstat (limited to 'queue.c')
-rw-r--r--queue.c62
1 files changed, 53 insertions, 9 deletions
diff --git a/queue.c b/queue.c
index 9cfb6c3b..45ce771d 100644
--- a/queue.c
+++ b/queue.c
@@ -67,6 +67,7 @@ static int queueChkStopWrkrDA(queue_t *pThis);
static int queueIsIdleDA(queue_t *pThis);
static rsRetVal queueConsumerDA(queue_t *pThis, wti_t *pWti, int iCancelStateSave);
static rsRetVal queueConsumerCancelCleanup(void *arg1, void *arg2);
+static inline rsRetVal queueAdviseMaxWorkers(queue_t *pThis);
/* methods */
@@ -125,6 +126,13 @@ dbgprintf("Queue 0x%lx: disk-assistance being been turned off, bEnqOnly %d, bQue
queueDestruct(&pThis->pqDA); /* and now we are ready to destruct the DA queue */
dbgprintf("Queue 0x%lx: disk-assistance has been turned off, disk queue was empty (iRet %d)\n",
queueGetID(pThis), iRet);
+ /* now we need to check if the regular queue has some messages. This may be the case
+ * when it is waiting that the high water mark is reached again. If so, we need to start up
+ * a regular worker. -- rgerhards, 2008-01-26
+ */
+ if(pThis->iQueueSize > 0) {
+ queueAdviseMaxWorkers(pThis);
+ }
}
RETiRet;
@@ -1229,22 +1237,40 @@ dbgprintf("DAConsumer returns with iRet %d\n", iRet);
/* must only be called when the queue mutex is locked, else results
* are not stable!
- * Version when running in DA mode.
+ * If we are a child, we have done our duty when the queue is empty. In that case,
+ * we can terminate.
+ * Version for the DA worker thread. NOTE: the pThis->bRunsDA is different from
+ * the DA queue
*/
static int
queueChkStopWrkrDA(queue_t *pThis)
{
- return pThis->bEnqOnly || !pThis->bRunsDA;
+ /* if our queue is in destruction, we drain to the DA queue and so we shall not terminate
+ * until we have done so.
+ */
+ int bStopWrkr=
+ pThis->bEnqOnly
+ || !pThis->bRunsDA
+ || (pThis->pqDA != NULL && pThis->pqDA->iQueueSize == 0 && pThis->bQueueInDestruction);
+RUNLOG_VAR("%d", bStopWrkr);
+ return pThis->bEnqOnly
+ || !pThis->bRunsDA
+ ;
+ // || (pThis->pqDA != NULL && pThis->pqDA->iQueueSize == 0 && pThis->bQueueInDestruction);
}
-
/* must only be called when the queue mutex is locked, else results
* are not stable!
- * Version when running in non-DA mode.
+ * If we are a child, we have done our duty when the queue is empty. In that case,
+ * we can terminate.
+ * Version for the regular worker thread. NOTE: the pThis->bRunsDA is different from
+ * the DA queue
*/
static int
queueChkStopWrkrReg(queue_t *pThis)
{
- return pThis->bEnqOnly || pThis->bRunsDA;
+ int bStopWrkr = pThis->bEnqOnly || pThis->bRunsDA || (pThis->pqParent != NULL && pThis->iQueueSize == 0);
+RUNLOG_VAR("%d", bStopWrkr);
+ return pThis->bEnqOnly || pThis->bRunsDA || (pThis->pqParent != NULL && pThis->iQueueSize == 0);
}
@@ -1271,6 +1297,26 @@ queueIsIdleReg(queue_t *pThis)
}
+/* This function is called when a worker thread for the regular queue is shut down.
+ * If we are the primary queue, this is not really interesting to us. If, however,
+ * we are the DA (child) queue, that means the DA queue is empty. In that case, we
+ * need to signal the parent queue's DA worker, so that it can terminate DA mode.
+ * rgerhards, 2008-01-26
+ */
+static rsRetVal
+queueRegOnWrkrShutdown(queue_t *pThis)
+{
+ DEFiRet;
+
+ if(pThis->pqParent != NULL) {
+ assert(pThis->pqParent->pWtpDA != NULL);
+ wtpAdviseMaxWorkers(pThis->pqParent->pWtpDA, 1); /* reactivate DA worker (always 1) */
+ }
+
+ RETiRet;
+}
+
+
/* start up the queue - it must have been constructed and parameters defined
* before.
*/
@@ -1322,6 +1368,7 @@ dbgprintf("Queue %p: post mutexes, mut %p\n", pThis, pThis->mut);
CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, queueIsIdleReg));
CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, queueConsumerReg));
CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpReg, queueConsumerCancelCleanup));
+ CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpReg, queueRegOnWrkrShutdown));
CHKiRet(wtpSetpmutUsr (pThis->pWtpReg, pThis->mut));
CHKiRet(wtpSetpcondBusy (pThis->pWtpReg, &pThis->notEmpty));
CHKiRet(wtpSetiNumWorkerThreads (pThis->pWtpReg, pThis->iNumWorkerThreads));
@@ -1475,8 +1522,6 @@ rsRetVal queueDestruct(queue_t **ppThis)
pThis = *ppThis;
ISOBJ_TYPE_assert(pThis, queue);
-pThis->bSaveOnShutdown = 1; // TODO: Test remove
-
pThis->bQueueInDestruction = 1; /* indicate we are in destruction (modifies some behaviour) */
/* shut down all workers (handles *all* of the persistence logic) */
@@ -1731,6 +1776,7 @@ DEFpropSetMeth(queue, iDiscardMrk, int);
DEFpropSetMeth(queue, iDiscardSeverity, int);
DEFpropSetMeth(queue, bIsDA, int);
DEFpropSetMeth(queue, iMinMsgsPerWrkr, int);
+DEFpropSetMeth(queue, bSaveOnShutdown, int);
/* This function can be used as a generic way to set properties. Only the subset
@@ -1764,8 +1810,6 @@ finalize_it:
BEGINObjClassInit(queue, 1)
OBJSetMethodHandler(objMethod_SETPROPERTY, queueSetProperty);
//OBJSetMethodHandler(objMethod_CONSTRUCTION_FINALIZER, strmConstructFinalize);
-//fprintf(stdout, "queueChkStopWrkrReg: %p\n", queueChkStopWrkrReg);
-//fprintf(stdout, "queueChkStopWrkrDA: %p\n", queueChkStopWrkrDA);
ENDObjClassInit(queue)
/*