summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--runtime/queue.c18
-rw-r--r--runtime/wti.c13
-rwxr-xr-xtests/daqueue-persist-drvr.sh4
3 files changed, 21 insertions, 14 deletions
diff --git a/runtime/queue.c b/runtime/queue.c
index be169be2..1539db6d 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -1410,13 +1410,11 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch)
void *pUsr;
int nEnqueued = 0;
rsRetVal localRet;
- int iCancelStateSave;
DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
assert(pBatch != NULL);
- pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
dbgprintf("XXX: deleteProcessedBatch total entries %d with state[0] %d\n", pBatch->nElem, pBatch->pElem[0].state);
for(i = 0 ; i < pBatch->nElem ; ++i) {
dbgprintf("XXX: deleteProcessedBatch delete entry %d, ptr %p, refcnt %d with state %d\n",
@@ -1444,7 +1442,6 @@ dbgprintf("we deleted %d objects and enqueued %d objects\n", i-nEnqueued, nEnque
pBatch->nElem = pBatch->nElemDeq = 0; /* reset batch */
- pthread_setcancelstate(iCancelStateSave, NULL);
RETiRet;
}
@@ -1699,13 +1696,13 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti)
ISOBJ_TYPE_assert(pThis, qqueue);
ISOBJ_TYPE_assert(pWti, wti);
- pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
CHKiRet(DequeueForConsumer(pThis, pWti));
/* we now have a non-idle batch of work, so we can release the queue mutex and process it */
d_pthread_mutex_unlock(pThis->mut);
- pthread_setcancelstate(iCancelStateSave, NULL);
+ /* at this spot, we may be cancelled */
+ pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &iCancelStateSave);
CHKiRet(pThis->pConsumer(pThis->pUsr, &pWti->batch, &pThis->bShutdownImmediate));
@@ -1719,6 +1716,9 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti)
srSleep(pThis->iDeqSlowdown / 1000000, pThis->iDeqSlowdown % 1000000);
}
+ /* but now cancellation is no longer permitted */
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
+
/* now we are done, but need to re-aquire the mutex */
d_pthread_mutex_lock(pThis->mut);
@@ -1747,13 +1747,13 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti)
ISOBJ_TYPE_assert(pThis, qqueue);
ISOBJ_TYPE_assert(pWti, wti);
- pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
CHKiRet(DequeueForConsumer(pThis, pWti));
/* we now have a non-idle batch of work, so we can release the queue mutex and process it */
d_pthread_mutex_unlock(pThis->mut);
- pthread_setcancelstate(iCancelStateSave, NULL);
+ /* at this spot, we may be cancelled */
+ pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &iCancelStateSave);
/* iterate over returned results and enqueue them in DA queue */
for(i = 0 ; i < pWti->batch.nElem && !pThis->bShutdownImmediate ; i++) {
@@ -1768,6 +1768,9 @@ dbgprintf("DA consumer pushes msg '%s'\n", ((msg_t*)(pWti->batch.pElem[i].pUsrp)
pWti->batch.pElem[i].state = BATCH_STATE_COMM; /* commited to other queue! */
}
+ /* but now cancellation is no longer permitted */
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
+
/* now we are done, but need to re-aquire the mutex */
d_pthread_mutex_lock(pThis->mut);
@@ -2332,7 +2335,6 @@ finalize_it:
if(pThis->qType != QUEUETYPE_DIRECT) {
/* make sure at least one worker is running. */
qqueueAdviseMaxWorkers(pThis);
-dbgprintf("YYY: call advise with mutex %p locked \n", pThis->mut);
/* and release the mutex */
d_pthread_mutex_unlock(pThis->mut);
pthread_setcancelstate(iCancelStateSave, NULL);
diff --git a/runtime/wti.c b/runtime/wti.c
index e5237885..aade156e 100644
--- a/runtime/wti.c
+++ b/runtime/wti.c
@@ -184,12 +184,11 @@ finalize_it:
/* cancellation cleanup handler for queueWorker ()
- * Updates admin structure and frees ressources.
+ * Most importantly, it must bring back the batch into a consistent state.
* Keep in mind that cancellation is disabled if we run into
* the cancel cleanup handler (and have been cancelled).
* rgerhards, 2008-01-16
*/
-// TODO: REMOVE THIS FUNCTION, CURRENTLY ONLY PRESENT TO PROVIDE DEBUG OUTPUT -- rgerhards, 2009-10-14
static void
wtiWorkerCancelCleanup(void *arg)
{
@@ -224,7 +223,6 @@ doIdleProcessing(wti_t *pThis, wtp_t *pWtp, int *pbInactivityTOOccured)
if(pThis->bAlwaysRunning) {
/* never shut down any started worker */
-dbgprintf("YYY/ZZZ: wti Idle wait cond busy, mutex %p\n", pWtp->pmutUsr);
d_pthread_cond_wait(pWtp->pcondBusy, pWtp->pmutUsr);
} else {
timeoutComp(&t, pWtp->toWrkShutdown);/* get absolute timeout */
@@ -238,7 +236,11 @@ dbgprintf("YYY/ZZZ: wti Idle wait cond busy, mutex %p\n", pWtp->pmutUsr);
}
-/* generic worker thread framework
+/* generic worker thread framework. Note that we prohibit cancellation
+ * during almost all times, because it can have very undesired side effects.
+ * However, we may need to cancel a thread if the consumer blocks for too
+ * long (during shutdown). So what we do is block cancellation, and every
+ * consumer must enable it during the periods where it is safe.
*/
#pragma GCC diagnostic ignored "-Wempty-body"
rsRetVal
@@ -248,6 +250,7 @@ wtiWorker(wti_t *pThis)
int bInactivityTOOccured = 0;
rsRetVal localRet;
rsRetVal terminateRet;
+ int iCancelStateSave;
DEFiRet;
ISOBJ_TYPE_assert(pThis, wti);
@@ -256,6 +259,7 @@ wtiWorker(wti_t *pThis)
dbgSetThrdName(pThis->pszDbgHdr);
pthread_cleanup_push(wtiWorkerCancelCleanup, pThis);
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
/* now we have our identity, on to real processing */
while(1) { /* loop will be broken below - need to do mutex locks */
@@ -300,6 +304,7 @@ RUNLOG_VAR("%d", terminateRet);
/* indicate termination */
pthread_cleanup_pop(0); /* remove cleanup handler */
+ pthread_setcancelstate(iCancelStateSave, NULL);
RETiRet;
}
diff --git a/tests/daqueue-persist-drvr.sh b/tests/daqueue-persist-drvr.sh
index c44ce6f8..7934eb2b 100755
--- a/tests/daqueue-persist-drvr.sh
+++ b/tests/daqueue-persist-drvr.sh
@@ -8,8 +8,8 @@
echo \[daqueue-persist-drvr.sh\]: testing memory daqueue persisting to disk, mode $1
source $srcdir/diag.sh init
-export RSYSLOG_DEBUG="debug nologfuncflow nostdout noprintmutexaction"
-export RSYSLOG_DEBUGLOG="log"
+#export RSYSLOG_DEBUG="debug nologfuncflow nostdout noprintmutexaction"
+#export RSYSLOG_DEBUGLOG="log"
# prepare config
echo \$MainMsgQueueType $1 > work-queuemode.conf