summaryrefslogtreecommitdiffstats
path: root/runtime/queue.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2010-05-18 17:53:12 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2010-05-18 17:53:12 +0200
commit7574e70df4c6796878d3b753275f01b5f0d65ade (patch)
treed79350f92fa93c08258836213d3f01f475946b32 /runtime/queue.c
parentaf8582e50914cfc719be1a1a80eeb81030d611c5 (diff)
downloadrsyslog-7574e70df4c6796878d3b753275f01b5f0d65ade.tar.gz
rsyslog-7574e70df4c6796878d3b753275f01b5f0d65ade.tar.xz
rsyslog-7574e70df4c6796878d3b753275f01b5f0d65ade.zip
fixed race conditions during queue shutdown (DA case, disks active)
Diffstat (limited to 'runtime/queue.c')
-rw-r--r--runtime/queue.c18
1 files changed, 16 insertions, 2 deletions
diff --git a/runtime/queue.c b/runtime/queue.c
index 4f0d36b9..b6c30278 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -769,8 +769,8 @@ static rsRetVal qAddDisk(qqueue_t *pThis, void* pUsr)
*/
objDestruct(pUsr);
- DBGOPRINT((obj_t*) pThis, "write wrote %lld octets to disk, queue disk size now %lld octets\n",
- nWriteCount, pThis->tVars.disk.sizeOnDisk);
+ DBGOPRINT((obj_t*) pThis, "write wrote %lld octets to disk, queue disk size now %lld octets, EnqOnly:%d\n",
+ nWriteCount, pThis->tVars.disk.sizeOnDisk, pThis->bEnqOnly);
finalize_it:
RETiRet;
@@ -944,6 +944,7 @@ tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis)
d_pthread_mutex_lock(pThis->mut);
/* tell regular queue DA worker to stop shuffling messages to DA queue... */
+ DBGOPRINT((obj_t*) pThis, "setting EnqOnly mode for DA worker\n");
pThis->pqDA->bEnqOnly = 1;
wtpSetState(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE);
wtpAdviseMaxWorkers(pThis->pWtpDA, 1);
@@ -1010,6 +1011,7 @@ RUNLOG_STR("trying to shutdown workers within Action Timeout");
ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */
/* instruct workers to finish ASAP, even if still work exists */
+ DBGOPRINT((obj_t*) pThis, "setting EnqOnly mode\n");
pThis->bEnqOnly = 1;
pThis->bShutdownImmediate = 1;
/* now DA queue */
@@ -1356,6 +1358,7 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch)
assert(pBatch != NULL);
for(i = 0 ; i < pBatch->nElem ; ++i) {
+dbgprintf("XXX: enqueueing data element %d of %d\n", i, pBatch->nElem);
pUsr = pBatch->pElem[i].pUsrp;
if( pBatch->pElem[i].state == BATCH_STATE_RDY
|| pBatch->pElem[i].state == BATCH_STATE_SUB) {
@@ -1600,6 +1603,12 @@ finalize_it:
/* This is called when a batch is processed and the worker does not
* ask for another batch (e.g. because it is to be terminated)
+ * Note that we must not be terminated while we delete a processed
+ * batch. Otherwise, we may not complete it, and then the cancel
+ * handler also tries to delete the batch. But then it finds some of
+ * the messages already destructed. This was a bug we have seen, especially
+ * with disk mode, where a delete takes rather long. Anyhow, the coneptual
+ * problem exists in all queue modes.
* rgerhards, 2009-05-27
*/
static rsRetVal
@@ -1610,8 +1619,12 @@ batchProcessed(qqueue_t *pThis, wti_t *pWti)
ISOBJ_TYPE_assert(pThis, qqueue);
ISOBJ_TYPE_assert(pWti, wti);
+ int iCancelStateSave;
+ /* at this spot, we must not be cancelled */
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
DeleteProcessedBatch(pThis, &pWti->batch);
qqueueChkPersist(pThis, pWti->batch.nElemDeq);
+ pthread_setcancelstate(iCancelStateSave, NULL);
RETiRet;
}
@@ -2136,6 +2149,7 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr)
/* first check if we need to discard this message (which will cause CHKiRet() to exit)
*/
CHKiRet(qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pUsr));
+//dbgCallStackPrintAll();
/* handle flow control
* There are two different flow control mechanisms: basic and advanced flow control.