diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2010-05-18 17:53:12 +0200 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2010-05-18 17:53:12 +0200 |
commit | 7574e70df4c6796878d3b753275f01b5f0d65ade (patch) | |
tree | d79350f92fa93c08258836213d3f01f475946b32 /runtime | |
parent | af8582e50914cfc719be1a1a80eeb81030d611c5 (diff) | |
download | rsyslog-7574e70df4c6796878d3b753275f01b5f0d65ade.tar.gz rsyslog-7574e70df4c6796878d3b753275f01b5f0d65ade.tar.xz rsyslog-7574e70df4c6796878d3b753275f01b5f0d65ade.zip |
fixed race conditions during queue shutdown (DA case, disks active)
Diffstat (limited to 'runtime')
-rw-r--r-- | runtime/queue.c | 18 | ||||
-rw-r--r-- | runtime/wti.c | 12 | ||||
-rw-r--r-- | runtime/wti.h | 1 | ||||
-rw-r--r-- | runtime/wtp.c | 17 |
4 files changed, 33 insertions, 15 deletions
diff --git a/runtime/queue.c b/runtime/queue.c index 4f0d36b9..b6c30278 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -769,8 +769,8 @@ static rsRetVal qAddDisk(qqueue_t *pThis, void* pUsr) */ objDestruct(pUsr); - DBGOPRINT((obj_t*) pThis, "write wrote %lld octets to disk, queue disk size now %lld octets\n", - nWriteCount, pThis->tVars.disk.sizeOnDisk); + DBGOPRINT((obj_t*) pThis, "write wrote %lld octets to disk, queue disk size now %lld octets, EnqOnly:%d\n", + nWriteCount, pThis->tVars.disk.sizeOnDisk, pThis->bEnqOnly); finalize_it: RETiRet; @@ -944,6 +944,7 @@ tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis) d_pthread_mutex_lock(pThis->mut); /* tell regular queue DA worker to stop shuffling messages to DA queue... */ + DBGOPRINT((obj_t*) pThis, "setting EnqOnly mode for DA worker\n"); pThis->pqDA->bEnqOnly = 1; wtpSetState(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE); wtpAdviseMaxWorkers(pThis->pWtpDA, 1); @@ -1010,6 +1011,7 @@ RUNLOG_STR("trying to shutdown workers within Action Timeout"); ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */ /* instruct workers to finish ASAP, even if still work exists */ + DBGOPRINT((obj_t*) pThis, "setting EnqOnly mode\n"); pThis->bEnqOnly = 1; pThis->bShutdownImmediate = 1; /* now DA queue */ @@ -1356,6 +1358,7 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch) assert(pBatch != NULL); for(i = 0 ; i < pBatch->nElem ; ++i) { +dbgprintf("XXX: enqueueing data element %d of %d\n", i, pBatch->nElem); pUsr = pBatch->pElem[i].pUsrp; if( pBatch->pElem[i].state == BATCH_STATE_RDY || pBatch->pElem[i].state == BATCH_STATE_SUB) { @@ -1600,6 +1603,12 @@ finalize_it: /* This is called when a batch is processed and the worker does not * ask for another batch (e.g. because it is to be terminated) + * Note that we must not be terminated while we delete a processed + * batch. Otherwise, we may not complete it, and then the cancel + * handler also tries to delete the batch. 
But then it finds some of + * the messages already destructed. This was a bug we have seen, especially + * with disk mode, where a delete takes rather long. Anyhow, the conceptual + * problem exists in all queue modes. * rgerhards, 2009-05-27 */ static rsRetVal @@ -1610,8 +1619,12 @@ batchProcessed(qqueue_t *pThis, wti_t *pWti) ISOBJ_TYPE_assert(pThis, qqueue); ISOBJ_TYPE_assert(pWti, wti); + int iCancelStateSave; + /* at this spot, we must not be cancelled */ + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); DeleteProcessedBatch(pThis, &pWti->batch); qqueueChkPersist(pThis, pWti->batch.nElemDeq); + pthread_setcancelstate(iCancelStateSave, NULL); RETiRet; } @@ -2136,6 +2149,7 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr) /* first check if we need to discard this message (which will cause CHKiRet() to exit) */ CHKiRet(qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pUsr)); +//dbgCallStackPrintAll(); /* handle flow control * There are two different flow control mechanisms: basic and advanced flow control. 
diff --git a/runtime/wti.c b/runtime/wti.c index 2dfc2d3f..307f1af1 100644 --- a/runtime/wti.c +++ b/runtime/wti.c @@ -123,9 +123,7 @@ wtiWakeupThrd(wti_t *pThis) if(wtiGetState(pThis)) { /* we first try the cooperative "cancel" interface */ pthread_kill(pThis->thrdID, SIGTTIN); - dbgprintf("sent SIGTTIN to worker thread %u, giving it a chance to terminate\n", (unsigned) pThis->thrdID); - srSleep(0, 10000); - dbgprintf("cooperative worker termination failed, using cancellation...\n"); + dbgprintf("sent SIGTTIN to worker thread 0x%x\n", (unsigned) pThis->thrdID); } RETiRet; @@ -151,13 +149,13 @@ wtiCancelThrd(wti_t *pThis) if(wtiGetState(pThis)) { /* we first try the cooperative "cancel" interface */ -#if 0 pthread_kill(pThis->thrdID, SIGTTIN); - dbgprintf("sent SIGTTIN to worker thread %u, giving it a chance to terminate\n", (unsigned) pThis->thrdID); + dbgprintf("sent SIGTTIN to worker thread 0x%x, giving it a chance to terminate\n", (unsigned) pThis->thrdID); srSleep(0, 10000); - dbgprintf("cooperative worker termination failed, using cancellation...\n"); -#endif + } + if(wtiGetState(pThis)) { + dbgprintf("cooperative worker termination failed, using cancellation...\n"); dbgoprint((obj_t*) pThis, "canceling worker thread\n"); pthread_cancel(pThis->thrdID); /* now wait until the thread terminates... 
*/ diff --git a/runtime/wti.h b/runtime/wti.h index ab575427..51ece4ef 100644 --- a/runtime/wti.h +++ b/runtime/wti.h @@ -52,6 +52,7 @@ rsRetVal wtiSetDbgHdr(wti_t *pThis, uchar *pszMsg, size_t lenMsg); rsRetVal wtiCancelThrd(wti_t *pThis); rsRetVal wtiSetAlwaysRunning(wti_t *pThis); rsRetVal wtiSetState(wti_t *pThis, sbool bNew); +rsRetVal wtiWakeupThrd(wti_t *pThis); sbool wtiGetState(wti_t *pThis); PROTOTYPEObjClassInit(wti); PROTOTYPEpropSetMeth(wti, pszDbgHdr, uchar*); diff --git a/runtime/wtp.c b/runtime/wtp.c index 65155efc..ece80911 100644 --- a/runtime/wtp.c +++ b/runtime/wtp.c @@ -228,6 +228,10 @@ wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout d_pthread_mutex_lock(pThis->pmutUsr); wtpSetState(pThis, tShutdownCmd); pthread_cond_broadcast(pThis->pcondBusy); /* wake up all workers */ + /* awake workers in retry loop */ + for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) { + wtiWakeupThrd(pThis->pWrkr[i]); + } d_pthread_mutex_unlock(pThis->pmutUsr); /* wait for worker thread termination */ @@ -235,11 +239,6 @@ wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout pthread_cleanup_push(mutexCancelCleanup, &pThis->mutWtp); bTimedOut = 0; while(pThis->iCurNumWrkThrd > 0 && !bTimedOut) { - -for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) { - wtiWakeupThrd(pThis->pWrkr[i]); -} - DBGPRINTF("%s: waiting %ldms on worker thread termination, %d still running\n", wtpGetDbgHdr(pThis), timeoutVal(ptTimeout), ATOMIC_FETCH_32BIT(&pThis->iCurNumWrkThrd, &pThis->mutCurNumWrkThrd)); @@ -248,6 +247,12 @@ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) { DBGPRINTF("%s: timeout waiting on worker thread termination\n", wtpGetDbgHdr(pThis)); bTimedOut = 1; /* we exit the loop on timeout */ } + + /* awake workers in retry loop */ + for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) { + wtiWakeupThrd(pThis->pWrkr[i]); + } + } pthread_cleanup_pop(1); @@ -298,7 +303,7 @@ wtpWrkrExecCleanup(wti_t *pWti) wtiSetState(pWti, 
WRKTHRD_STOPPED); ATOMIC_DEC(&pThis->iCurNumWrkThrd, &pThis->mutCurNumWrkThrd); - DBGPRINTF("%s: Worker thread %lx, terminated, num workers now %d\n", + DBGPRINTF("%s: Worker thread %lx, terminated, um workers now %d\n", wtpGetDbgHdr(pThis), (unsigned long) pWti, ATOMIC_FETCH_32BIT(&pThis->iCurNumWrkThrd, &pThis->mutCurNumWrkThrd)); |