summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2010-05-18 17:53:12 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2010-05-18 17:53:12 +0200
commit7574e70df4c6796878d3b753275f01b5f0d65ade (patch)
treed79350f92fa93c08258836213d3f01f475946b32
parentaf8582e50914cfc719be1a1a80eeb81030d611c5 (diff)
downloadrsyslog-7574e70df4c6796878d3b753275f01b5f0d65ade.tar.gz
rsyslog-7574e70df4c6796878d3b753275f01b5f0d65ade.tar.xz
rsyslog-7574e70df4c6796878d3b753275f01b5f0d65ade.zip
fixed race conditions during queue shutdown (DA case, disks active)
-rw-r--r--runtime/queue.c18
-rw-r--r--runtime/wti.c12
-rw-r--r--runtime/wti.h1
-rw-r--r--runtime/wtp.c17
4 files changed, 33 insertions, 15 deletions
diff --git a/runtime/queue.c b/runtime/queue.c
index 4f0d36b9..b6c30278 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -769,8 +769,8 @@ static rsRetVal qAddDisk(qqueue_t *pThis, void* pUsr)
*/
objDestruct(pUsr);
- DBGOPRINT((obj_t*) pThis, "write wrote %lld octets to disk, queue disk size now %lld octets\n",
- nWriteCount, pThis->tVars.disk.sizeOnDisk);
+ DBGOPRINT((obj_t*) pThis, "write wrote %lld octets to disk, queue disk size now %lld octets, EnqOnly:%d\n",
+ nWriteCount, pThis->tVars.disk.sizeOnDisk, pThis->bEnqOnly);
finalize_it:
RETiRet;
@@ -944,6 +944,7 @@ tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis)
d_pthread_mutex_lock(pThis->mut);
/* tell regular queue DA worker to stop shuffling messages to DA queue... */
+ DBGOPRINT((obj_t*) pThis, "setting EnqOnly mode for DA worker\n");
pThis->pqDA->bEnqOnly = 1;
wtpSetState(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE);
wtpAdviseMaxWorkers(pThis->pWtpDA, 1);
@@ -1010,6 +1011,7 @@ RUNLOG_STR("trying to shutdown workers within Action Timeout");
ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */
/* instruct workers to finish ASAP, even if still work exists */
+ DBGOPRINT((obj_t*) pThis, "setting EnqOnly mode\n");
pThis->bEnqOnly = 1;
pThis->bShutdownImmediate = 1;
/* now DA queue */
@@ -1356,6 +1358,7 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch)
assert(pBatch != NULL);
for(i = 0 ; i < pBatch->nElem ; ++i) {
+dbgprintf("XXX: enqueueing data element %d of %d\n", i, pBatch->nElem);
pUsr = pBatch->pElem[i].pUsrp;
if( pBatch->pElem[i].state == BATCH_STATE_RDY
|| pBatch->pElem[i].state == BATCH_STATE_SUB) {
@@ -1600,6 +1603,12 @@ finalize_it:
/* This is called when a batch is processed and the worker does not
* ask for another batch (e.g. because it is to be terminated)
+ * Note that we must not be terminated while we delete a processed
+ * batch. Otherwise, we may not complete it, and then the cancel
+ * handler also tries to delete the batch. But then it finds some of
+ * the messages already destructed. This was a bug we have seen, especially
+ * with disk mode, where a delete takes rather long. Anyhow, the coneptual
+ * problem exists in all queue modes.
* rgerhards, 2009-05-27
*/
static rsRetVal
@@ -1610,8 +1619,12 @@ batchProcessed(qqueue_t *pThis, wti_t *pWti)
ISOBJ_TYPE_assert(pThis, qqueue);
ISOBJ_TYPE_assert(pWti, wti);
+ int iCancelStateSave;
+ /* at this spot, we must not be cancelled */
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
DeleteProcessedBatch(pThis, &pWti->batch);
qqueueChkPersist(pThis, pWti->batch.nElemDeq);
+ pthread_setcancelstate(iCancelStateSave, NULL);
RETiRet;
}
@@ -2136,6 +2149,7 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr)
/* first check if we need to discard this message (which will cause CHKiRet() to exit)
*/
CHKiRet(qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pUsr));
+//dbgCallStackPrintAll();
/* handle flow control
* There are two different flow control mechanisms: basic and advanced flow control.
diff --git a/runtime/wti.c b/runtime/wti.c
index 2dfc2d3f..307f1af1 100644
--- a/runtime/wti.c
+++ b/runtime/wti.c
@@ -123,9 +123,7 @@ wtiWakeupThrd(wti_t *pThis)
if(wtiGetState(pThis)) {
/* we first try the cooperative "cancel" interface */
pthread_kill(pThis->thrdID, SIGTTIN);
- dbgprintf("sent SIGTTIN to worker thread %u, giving it a chance to terminate\n", (unsigned) pThis->thrdID);
- srSleep(0, 10000);
- dbgprintf("cooperative worker termination failed, using cancellation...\n");
+ dbgprintf("sent SIGTTIN to worker thread 0x%x\n", (unsigned) pThis->thrdID);
}
RETiRet;
@@ -151,13 +149,13 @@ wtiCancelThrd(wti_t *pThis)
if(wtiGetState(pThis)) {
/* we first try the cooperative "cancel" interface */
-#if 0
pthread_kill(pThis->thrdID, SIGTTIN);
- dbgprintf("sent SIGTTIN to worker thread %u, giving it a chance to terminate\n", (unsigned) pThis->thrdID);
+ dbgprintf("sent SIGTTIN to worker thread 0x%x, giving it a chance to terminate\n", (unsigned) pThis->thrdID);
srSleep(0, 10000);
- dbgprintf("cooperative worker termination failed, using cancellation...\n");
-#endif
+ }
+ if(wtiGetState(pThis)) {
+ dbgprintf("cooperative worker termination failed, using cancellation...\n");
dbgoprint((obj_t*) pThis, "canceling worker thread\n");
pthread_cancel(pThis->thrdID);
/* now wait until the thread terminates... */
diff --git a/runtime/wti.h b/runtime/wti.h
index ab575427..51ece4ef 100644
--- a/runtime/wti.h
+++ b/runtime/wti.h
@@ -52,6 +52,7 @@ rsRetVal wtiSetDbgHdr(wti_t *pThis, uchar *pszMsg, size_t lenMsg);
rsRetVal wtiCancelThrd(wti_t *pThis);
rsRetVal wtiSetAlwaysRunning(wti_t *pThis);
rsRetVal wtiSetState(wti_t *pThis, sbool bNew);
+rsRetVal wtiWakeupThrd(wti_t *pThis);
sbool wtiGetState(wti_t *pThis);
PROTOTYPEObjClassInit(wti);
PROTOTYPEpropSetMeth(wti, pszDbgHdr, uchar*);
diff --git a/runtime/wtp.c b/runtime/wtp.c
index 65155efc..ece80911 100644
--- a/runtime/wtp.c
+++ b/runtime/wtp.c
@@ -228,6 +228,10 @@ wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout
d_pthread_mutex_lock(pThis->pmutUsr);
wtpSetState(pThis, tShutdownCmd);
pthread_cond_broadcast(pThis->pcondBusy); /* wake up all workers */
+ /* awake workers in retry loop */
+ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
+ wtiWakeupThrd(pThis->pWrkr[i]);
+ }
d_pthread_mutex_unlock(pThis->pmutUsr);
/* wait for worker thread termination */
@@ -235,11 +239,6 @@ wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout
pthread_cleanup_push(mutexCancelCleanup, &pThis->mutWtp);
bTimedOut = 0;
while(pThis->iCurNumWrkThrd > 0 && !bTimedOut) {
-
-for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
- wtiWakeupThrd(pThis->pWrkr[i]);
-}
-
DBGPRINTF("%s: waiting %ldms on worker thread termination, %d still running\n",
wtpGetDbgHdr(pThis), timeoutVal(ptTimeout),
ATOMIC_FETCH_32BIT(&pThis->iCurNumWrkThrd, &pThis->mutCurNumWrkThrd));
@@ -248,6 +247,12 @@ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
DBGPRINTF("%s: timeout waiting on worker thread termination\n", wtpGetDbgHdr(pThis));
bTimedOut = 1; /* we exit the loop on timeout */
}
+
+ /* awake workers in retry loop */
+ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
+ wtiWakeupThrd(pThis->pWrkr[i]);
+ }
+
}
pthread_cleanup_pop(1);
@@ -298,7 +303,7 @@ wtpWrkrExecCleanup(wti_t *pWti)
wtiSetState(pWti, WRKTHRD_STOPPED);
ATOMIC_DEC(&pThis->iCurNumWrkThrd, &pThis->mutCurNumWrkThrd);
- DBGPRINTF("%s: Worker thread %lx, terminated, num workers now %d\n",
+ DBGPRINTF("%s: Worker thread %lx, terminated, um workers now %d\n",
wtpGetDbgHdr(pThis), (unsigned long) pWti,
ATOMIC_FETCH_32BIT(&pThis->iCurNumWrkThrd, &pThis->mutCurNumWrkThrd));