diff options
-rw-r--r-- | runtime/queue.c | 18 | ||||
-rw-r--r-- | runtime/wti.c | 13 | ||||
-rwxr-xr-x | tests/daqueue-persist-drvr.sh | 4 |
3 files changed, 21 insertions, 14 deletions
diff --git a/runtime/queue.c b/runtime/queue.c index be169be2..1539db6d 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -1410,13 +1410,11 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch) void *pUsr; int nEnqueued = 0; rsRetVal localRet; - int iCancelStateSave; DEFiRet; ISOBJ_TYPE_assert(pThis, qqueue); assert(pBatch != NULL); - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); dbgprintf("XXX: deleteProcessedBatch total entries %d with state[0] %d\n", pBatch->nElem, pBatch->pElem[0].state); for(i = 0 ; i < pBatch->nElem ; ++i) { dbgprintf("XXX: deleteProcessedBatch delete entry %d, ptr %p, refcnt %d with state %d\n", @@ -1444,7 +1442,6 @@ dbgprintf("we deleted %d objects and enqueued %d objects\n", i-nEnqueued, nEnque pBatch->nElem = pBatch->nElemDeq = 0; /* reset batch */ - pthread_setcancelstate(iCancelStateSave, NULL); RETiRet; } @@ -1699,13 +1696,13 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti) ISOBJ_TYPE_assert(pThis, qqueue); ISOBJ_TYPE_assert(pWti, wti); - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); CHKiRet(DequeueForConsumer(pThis, pWti)); /* we now have a non-idle batch of work, so we can release the queue mutex and process it */ d_pthread_mutex_unlock(pThis->mut); - pthread_setcancelstate(iCancelStateSave, NULL); + /* at this spot, we may be cancelled */ + pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &iCancelStateSave); CHKiRet(pThis->pConsumer(pThis->pUsr, &pWti->batch, &pThis->bShutdownImmediate)); @@ -1719,6 +1716,9 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti) srSleep(pThis->iDeqSlowdown / 1000000, pThis->iDeqSlowdown % 1000000); } + /* but now cancellation is no longer permitted */ + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); + /* now we are done, but need to re-aquire the mutex */ d_pthread_mutex_lock(pThis->mut); @@ -1747,13 +1747,13 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti) ISOBJ_TYPE_assert(pThis, qqueue); ISOBJ_TYPE_assert(pWti, wti); - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); CHKiRet(DequeueForConsumer(pThis, pWti)); /* we now have a non-idle batch of work, so we can release the queue mutex and process it */ d_pthread_mutex_unlock(pThis->mut); - pthread_setcancelstate(iCancelStateSave, NULL); + /* at this spot, we may be cancelled */ + pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &iCancelStateSave); /* iterate over returned results and enqueue them in DA queue */ for(i = 0 ; i < pWti->batch.nElem && !pThis->bShutdownImmediate ; i++) { @@ -1768,6 +1768,9 @@ dbgprintf("DA consumer pushes msg '%s'\n", ((msg_t*)(pWti->batch.pElem[i].pUsrp) pWti->batch.pElem[i].state = BATCH_STATE_COMM; /* commited to other queue! */ } + /* but now cancellation is no longer permitted */ + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); + /* now we are done, but need to re-aquire the mutex */ d_pthread_mutex_lock(pThis->mut); @@ -2332,7 +2335,6 @@ finalize_it: if(pThis->qType != QUEUETYPE_DIRECT) { /* make sure at least one worker is running. */ qqueueAdviseMaxWorkers(pThis); -dbgprintf("YYY: call advise with mutex %p locked \n", pThis->mut); /* and release the mutex */ d_pthread_mutex_unlock(pThis->mut); pthread_setcancelstate(iCancelStateSave, NULL); diff --git a/runtime/wti.c b/runtime/wti.c index e5237885..aade156e 100644 --- a/runtime/wti.c +++ b/runtime/wti.c @@ -184,12 +184,11 @@ finalize_it: /* cancellation cleanup handler for queueWorker () - * Updates admin structure and frees ressources. + * Most importantly, it must bring back the batch into a consistent state. * Keep in mind that cancellation is disabled if we run into * the cancel cleanup handler (and have been cancelled). * rgerhards, 2008-01-16 */ -// TODO: REMOVE THIS FUNCTION, CURRENTLY ONLY PRESENT TO PROVIDE DEBUG OUTPUT -- rgerhards, 2009-10-14 static void wtiWorkerCancelCleanup(void *arg) { @@ -224,7 +223,6 @@ doIdleProcessing(wti_t *pThis, wtp_t *pWtp, int *pbInactivityTOOccured) if(pThis->bAlwaysRunning) { /* never shut down any started worker */ -dbgprintf("YYY/ZZZ: wti Idle wait cond busy, mutex %p\n", pWtp->pmutUsr); d_pthread_cond_wait(pWtp->pcondBusy, pWtp->pmutUsr); } else { timeoutComp(&t, pWtp->toWrkShutdown);/* get absolute timeout */ @@ -238,7 +236,11 @@ dbgprintf("YYY/ZZZ: wti Idle wait cond busy, mutex %p\n", pWtp->pmutUsr); } -/* generic worker thread framework +/* generic worker thread framework. Note that we prohibit cancellation + * during almost all times, because it can have very undesired side effects. + * However, we may need to cancel a thread if the consumer blocks for too + * long (during shutdown). So what we do is block cancellation, and every + * consumer must enable it during the periods where it is safe. */ #pragma GCC diagnostic ignored "-Wempty-body" rsRetVal @@ -248,6 +250,7 @@ wtiWorker(wti_t *pThis) int bInactivityTOOccured = 0; rsRetVal localRet; rsRetVal terminateRet; + int iCancelStateSave; DEFiRet; ISOBJ_TYPE_assert(pThis, wti); @@ -256,6 +259,7 @@ wtiWorker(wti_t *pThis) dbgSetThrdName(pThis->pszDbgHdr); pthread_cleanup_push(wtiWorkerCancelCleanup, pThis); + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); /* now we have our identity, on to real processing */ while(1) { /* loop will be broken below - need to do mutex locks */ @@ -300,6 +304,7 @@ RUNLOG_VAR("%d", terminateRet); /* indicate termination */ pthread_cleanup_pop(0); /* remove cleanup handler */ + pthread_setcancelstate(iCancelStateSave, NULL); RETiRet; } diff --git a/tests/daqueue-persist-drvr.sh b/tests/daqueue-persist-drvr.sh index c44ce6f8..7934eb2b 100755 --- a/tests/daqueue-persist-drvr.sh +++ b/tests/daqueue-persist-drvr.sh @@ -8,8 +8,8 @@ echo \[daqueue-persist-drvr.sh\]: testing memory daqueue persisting to disk, mode $1 source $srcdir/diag.sh init -export RSYSLOG_DEBUG="debug nologfuncflow nostdout noprintmutexaction" -export RSYSLOG_DEBUGLOG="log" +#export RSYSLOG_DEBUG="debug nologfuncflow nostdout noprintmutexaction" +#export RSYSLOG_DEBUGLOG="log" # prepare config echo \$MainMsgQueueType $1 > work-queuemode.conf |