diff options
-rw-r--r-- | runtime/queue.c | 34 | ||||
-rw-r--r-- | runtime/wti.c | 2 | ||||
-rw-r--r-- | runtime/wtp.c | 48 | ||||
-rwxr-xr-x | tests/daqueue-persist-drvr.sh | 4 |
4 files changed, 64 insertions, 24 deletions
diff --git a/runtime/queue.c b/runtime/queue.c index b4f00446..d9942365 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -1070,6 +1070,11 @@ RUNLOG_STR("trying to shutdown workers within Action Timeout"); /* instruct workers to finish ASAP, even if still work exists */ pThis->bEnqOnly = 1; pThis->bShutdownImmediate = 1; + /* now DA queue */ + if(pThis->bIsDA) { + pThis->pqDA->bEnqOnly = 1; + pThis->pqDA->bShutdownImmediate = 1; + } // TODO: make sure we have at minimum a 10ms timeout - workers deserve a chance... /* now give the queue workers a last chance to gracefully shut down (based on action timeout setting) */ @@ -1356,14 +1361,10 @@ dbgprintf("delete batch from store, new sizes: log %d, phys %d\n", getLogicalQue /* remove messages from the physical queue store that are fully processed. This is - * controlled via the to-delete list. We can only delete those elements, that are - * at the current physical tail of the queue. If the batch is from another position, - * we schedule it for deletion, but actual deletion will happen at a later call - * of this function here. We always delete as much as possible, which includes - * picking up things from the to-delete list. + * controlled via the to-delete list. */ static inline rsRetVal -DeleteBatchFromQStore(qqueue_t *pThis, batch_t *pBatch, int nDeleted) +DeleteBatchFromQStore(qqueue_t *pThis, batch_t *pBatch) { toDeleteLst_t *pTdl; qDeqID deqIDDel; @@ -1371,11 +1372,10 @@ DeleteBatchFromQStore(qqueue_t *pThis, batch_t *pBatch, int nDeleted) ISOBJ_TYPE_assert(pThis, qqueue); assert(pBatch != NULL); - assert(nDeleted > 0); pTdl = tdlPeek(pThis); /* get current head element */ if(pTdl == NULL) { /* to-delete list empty */ - DoDeleteBatchFromQStore(pThis, nDeleted); + DoDeleteBatchFromQStore(pThis, pBatch->nElem); } else if(pBatch->deqID == pThis->deqIDDel) { deqIDDel = pThis->deqIDDel; pTdl = tdlPeek(pThis); @@ -1385,10 +1385,12 @@ DeleteBatchFromQStore(qqueue_t *pThis, batch_t *pBatch, int nDeleted) ++deqIDDel; pTdl = tdlPeek(pThis); } + /* old entries deleted, now delete current ones... */ + DoDeleteBatchFromQStore(pThis, pBatch->nElem); } else { /* can not delete, insert into to-delete list */ dbgprintf("not at head of to-delete list, enqueue %d\n", (int) pBatch->deqID); - CHKiRet(tdlAdd(pThis, pBatch->deqID, nDeleted)); + CHKiRet(tdlAdd(pThis, pBatch->deqID, pBatch->nElem)); } finalize_it: @@ -1408,20 +1410,23 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch) void *pUsr; int nEnqueued = 0; rsRetVal localRet; + int iCancelStateSave; DEFiRet; ISOBJ_TYPE_assert(pThis, qqueue); assert(pBatch != NULL); + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); dbgprintf("XXX: deleteProcessedBatch total entries %d with state[0] %d\n", pBatch->nElem, pBatch->pElem[0].state); for(i = 0 ; i < pBatch->nElem ; ++i) { -dbgprintf("XXX: deleteProcessedBatch delete entry %d with state %d\n", i, pBatch->pElem[i].state); +dbgprintf("XXX: deleteProcessedBatch delete entry %d, ptr %p, refcnt %d with state %d\n", +i, pBatch->pElem[i].pUsrp, ((msg_t*)pBatch->pElem[i].pUsrp)->iRefCount, pBatch->pElem[i].state); pUsr = pBatch->pElem[i].pUsrp; if( pBatch->pElem[i].state == BATCH_STATE_RDY || pBatch->pElem[i].state == BATCH_STATE_SUB) { -RUNLOG_STR("we need to requeue the entry"); localRet = doEnqSingleObj(pThis, eFLOWCTL_NO_DELAY, (obj_t*)MsgAddRef((msg_t*) pUsr)); +dbgprintf("we need to requeue the entry, refcnt now %d\n", ((msg_t*) pUsr)->iRefCount); ++nEnqueued; if(localRet != RS_RET_OK) { DBGPRINTF("error %d re-enqueuing unprocessed data element - discarded\n", localRet); @@ -1435,11 +1440,11 @@ dbgprintf("we deleted %d objects and enqueued %d objects\n", i-nEnqueued, nEnque if(nEnqueued > 0) qqueueChkPersist(pThis, nEnqueued); - if(i > 0) - iRet = DeleteBatchFromQStore(pThis, pBatch, i); + iRet = DeleteBatchFromQStore(pThis, pBatch); pBatch->nElem = pBatch->nElemDeq = 0; /* reset batch */ + pthread_setcancelstate(iCancelStateSave, NULL); RETiRet; } @@ -2072,7 +2077,8 @@ CODESTARTobjDestruct(qqueue) * we need to reset the logical dequeue pointer, persist the queue if configured to do * so and then destruct everything. -- rgerhards, 2009-05-26 */ - CHKiRet(pThis->qUnDeqAll(pThis)); +RUNLOG_STR("XXX: NOT undequeueing entries!"); + //CHKiRet(pThis->qUnDeqAll(pThis)); if(pThis->bIsDA && getPhysicalQueueSize(pThis) > 0 && pThis->bSaveOnShutdown) { CHKiRet(DoSaveOnShutdown(pThis)); diff --git a/runtime/wti.c b/runtime/wti.c index 3d98b4c4..e5237885 100644 --- a/runtime/wti.c +++ b/runtime/wti.c @@ -202,6 +202,8 @@ wtiWorkerCancelCleanup(void *arg) ISOBJ_TYPE_assert(pWtp, wtp); DBGPRINTF("%s: cancelation cleanup handler called.\n", wtiGetDbgHdr(pThis)); + pWtp->pfObjProcessed(pWtp->pUsr, pThis); + DBGPRINTF("%s: done cancelation cleanup handler.\n", wtiGetDbgHdr(pThis)); ENDfunc } diff --git a/runtime/wtp.c b/runtime/wtp.c index 08cf5c3d..e075e5b8 100644 --- a/runtime/wtp.c +++ b/runtime/wtp.c @@ -271,14 +271,14 @@ wtpCancelAll(wtp_t *pThis) } -/* cancellation cleanup handler for executing worker decrements the worker counter. - * This is also called when the the worker is normally shut down. - * rgerhards, 2009-07-20 +/* this function contains shared code for both regular worker shutdown as + * well as shutdown via cancellation. We can not simply use pthread_cleanup_pop(1) + * as this introduces a race in the debug system (RETiRet system). + * rgerhards, 2009-10-26 */ -static void -wtpWrkrExecCancelCleanup(void *arg) +static inline void +wtpWrkrExecCleanup(wti_t *pWti) { - wti_t *pWti = (wti_t*) arg; wtp_t *pThis; BEGINfunc @@ -293,11 +293,37 @@ wtpWrkrExecCancelCleanup(void *arg) DBGPRINTF("%s: Worker thread %lx, terminated, num workers now %d\n", wtpGetDbgHdr(pThis), (unsigned long) pWti, ATOMIC_FETCH_32BIT(pThis->iCurNumWrkThrd)); - pthread_cond_broadcast(&pThis->condThrdTrm); /* activate anyone waiting on thread shutdown */ ENDfunc } +/* cancellation cleanup handler for executing worker decrements the worker counter. + * rgerhards, 2009-07-20 + */ +static void +wtpWrkrExecCancelCleanup(void *arg) +{ + wti_t *pWti = (wti_t*) arg; + wtp_t *pThis; + + BEGINfunc + ISOBJ_TYPE_assert(pWti, wti); + pThis = pWti->pWtp; + ISOBJ_TYPE_assert(pThis, wtp); + DBGPRINTF("%s: Worker thread %lx requested to be cancelled.\n", + wtpGetDbgHdr(pThis), (unsigned long) pWti); + + wtpWrkrExecCleanup(pWti); + + ENDfunc + /* NOTE: we must call ENDfunc FIRST, because otherwise the schedule may activate the main + * thread after the broadcast, which could destroy the debug class, resulting in a potential + * segfault. So we need to do the broadcast as actually the last action in our processing + */ + pthread_cond_broadcast(&pThis->condThrdTrm); /* activate anyone waiting on thread shutdown */ +} + + /* wtp worker shell. This is started and calls into the actual * wti worker. * rgerhards, 2008-01-21 @@ -331,9 +357,15 @@ wtpWorker(void *arg) /* the arg is actually a wti object, even though we are in pthread_cleanup_push(wtpWrkrExecCancelCleanup, pWti); wtiWorker(pWti); - pthread_cleanup_pop(1); + pthread_cleanup_pop(0); + wtpWrkrExecCleanup(pWti); ENDfunc + /* NOTE: we must call ENDfunc FIRST, because otherwise the schedule may activate the main + * thread after the broadcast, which could destroy the debug class, resulting in a potential + * segfault. So we need to do the broadcast as actually the last action in our processing + */ + pthread_cond_broadcast(&pThis->condThrdTrm); /* activate anyone waiting on thread shutdown */ pthread_exit(0); } #pragma GCC diagnostic warning "-Wempty-body" diff --git a/tests/daqueue-persist-drvr.sh b/tests/daqueue-persist-drvr.sh index 7934eb2b..c44ce6f8 100755 --- a/tests/daqueue-persist-drvr.sh +++ b/tests/daqueue-persist-drvr.sh @@ -8,8 +8,8 @@ echo \[daqueue-persist-drvr.sh\]: testing memory daqueue persisting to disk, mode $1 source $srcdir/diag.sh init -#export RSYSLOG_DEBUG="debug nologfuncflow nostdout noprintmutexaction" -#export RSYSLOG_DEBUGLOG="log" +export RSYSLOG_DEBUG="debug nologfuncflow nostdout noprintmutexaction" +export RSYSLOG_DEBUGLOG="log" # prepare config echo \$MainMsgQueueType $1 > work-queuemode.conf |