diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2009-10-27 10:00:23 +0100 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2009-10-27 10:00:23 +0100 |
commit | 24cd5aee4720a98e321b69d2d9b5948348abd571 (patch) | |
tree | eae0e6eea3bc9fde8ef1bcd86d68e128ae8fafc5 /runtime/queue.c | |
parent | a5cddbdbce76f14b4216aa74698bbc168ca5409f (diff) | |
download | rsyslog-24cd5aee4720a98e321b69d2d9b5948348abd571.tar.gz rsyslog-24cd5aee4720a98e321b69d2d9b5948348abd571.tar.xz rsyslog-24cd5aee4720a98e321b69d2d9b5948348abd571.zip |
fixed race condition during queue shutdown
Problems could happen if the queue worker needed to be cancelled
and this cancellation happened inside queue-code (including
wtp, wti). We have now solved this by disabling cancellation while
in this code and only enabling it when working inside the user consumer.
This exactly matches the use case for which cancellation may be needed.
Diffstat (limited to 'runtime/queue.c')
-rw-r--r-- | runtime/queue.c | 18 |
1 files changed, 10 insertions, 8 deletions
diff --git a/runtime/queue.c b/runtime/queue.c index be169be2..1539db6d 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -1410,13 +1410,11 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch) void *pUsr; int nEnqueued = 0; rsRetVal localRet; - int iCancelStateSave; DEFiRet; ISOBJ_TYPE_assert(pThis, qqueue); assert(pBatch != NULL); - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); dbgprintf("XXX: deleteProcessedBatch total entries %d with state[0] %d\n", pBatch->nElem, pBatch->pElem[0].state); for(i = 0 ; i < pBatch->nElem ; ++i) { dbgprintf("XXX: deleteProcessedBatch delete entry %d, ptr %p, refcnt %d with state %d\n", @@ -1444,7 +1442,6 @@ dbgprintf("we deleted %d objects and enqueued %d objects\n", i-nEnqueued, nEnque pBatch->nElem = pBatch->nElemDeq = 0; /* reset batch */ - pthread_setcancelstate(iCancelStateSave, NULL); RETiRet; } @@ -1699,13 +1696,13 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti) ISOBJ_TYPE_assert(pThis, qqueue); ISOBJ_TYPE_assert(pWti, wti); - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); CHKiRet(DequeueForConsumer(pThis, pWti)); /* we now have a non-idle batch of work, so we can release the queue mutex and process it */ d_pthread_mutex_unlock(pThis->mut); - pthread_setcancelstate(iCancelStateSave, NULL); + /* at this spot, we may be cancelled */ + pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &iCancelStateSave); CHKiRet(pThis->pConsumer(pThis->pUsr, &pWti->batch, &pThis->bShutdownImmediate)); @@ -1719,6 +1716,9 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti) srSleep(pThis->iDeqSlowdown / 1000000, pThis->iDeqSlowdown % 1000000); } + /* but now cancellation is no longer permitted */ + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); + /* now we are done, but need to re-aquire the mutex */ d_pthread_mutex_lock(pThis->mut); @@ -1747,13 +1747,13 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti) ISOBJ_TYPE_assert(pThis, qqueue); ISOBJ_TYPE_assert(pWti, wti); - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); CHKiRet(DequeueForConsumer(pThis, pWti)); /* we now have a non-idle batch of work, so we can release the queue mutex and process it */ d_pthread_mutex_unlock(pThis->mut); - pthread_setcancelstate(iCancelStateSave, NULL); + /* at this spot, we may be cancelled */ + pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &iCancelStateSave); /* iterate over returned results and enqueue them in DA queue */ for(i = 0 ; i < pWti->batch.nElem && !pThis->bShutdownImmediate ; i++) { @@ -1768,6 +1768,9 @@ dbgprintf("DA consumer pushes msg '%s'\n", ((msg_t*)(pWti->batch.pElem[i].pUsrp) pWti->batch.pElem[i].state = BATCH_STATE_COMM; /* commited to other queue! */ } + /* but now cancellation is no longer permitted */ + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); + /* now we are done, but need to re-aquire the mutex */ d_pthread_mutex_lock(pThis->mut); @@ -2332,7 +2335,6 @@ finalize_it: if(pThis->qType != QUEUETYPE_DIRECT) { /* make sure at least one worker is running. */ qqueueAdviseMaxWorkers(pThis); -dbgprintf("YYY: call advise with mutex %p locked \n", pThis->mut); /* and release the mutex */ d_pthread_mutex_unlock(pThis->mut); pthread_setcancelstate(iCancelStateSave, NULL); |