summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
-rw-r--r-- runtime/queue.c 34
-rw-r--r-- runtime/wti.c 2
-rw-r--r-- runtime/wtp.c 48
-rwxr-xr-x tests/daqueue-persist-drvr.sh 4
4 files changed, 64 insertions, 24 deletions
diff --git a/runtime/queue.c b/runtime/queue.c
index b4f00446..d9942365 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -1070,6 +1070,11 @@ RUNLOG_STR("trying to shutdown workers within Action Timeout");
/* instruct workers to finish ASAP, even if still work exists */
pThis->bEnqOnly = 1;
pThis->bShutdownImmediate = 1;
+ /* now DA queue */
+ if(pThis->bIsDA) {
+ pThis->pqDA->bEnqOnly = 1;
+ pThis->pqDA->bShutdownImmediate = 1;
+ }
// TODO: make sure we have at minimum a 10ms timeout - workers deserve a chance...
/* now give the queue workers a last chance to gracefully shut down (based on action timeout setting) */
@@ -1356,14 +1361,10 @@ dbgprintf("delete batch from store, new sizes: log %d, phys %d\n", getLogicalQue
/* remove messages from the physical queue store that are fully processed. This is
- * controlled via the to-delete list. We can only delete those elements, that are
- * at the current physical tail of the queue. If the batch is from another position,
- * we schedule it for deletion, but actual deletion will happen at a later call
- * of this function here. We always delete as much as possible, which includes
- * picking up things from the to-delete list.
+ * controlled via the to-delete list.
*/
static inline rsRetVal
-DeleteBatchFromQStore(qqueue_t *pThis, batch_t *pBatch, int nDeleted)
+DeleteBatchFromQStore(qqueue_t *pThis, batch_t *pBatch)
{
toDeleteLst_t *pTdl;
qDeqID deqIDDel;
@@ -1371,11 +1372,10 @@ DeleteBatchFromQStore(qqueue_t *pThis, batch_t *pBatch, int nDeleted)
ISOBJ_TYPE_assert(pThis, qqueue);
assert(pBatch != NULL);
- assert(nDeleted > 0);
pTdl = tdlPeek(pThis); /* get current head element */
if(pTdl == NULL) { /* to-delete list empty */
- DoDeleteBatchFromQStore(pThis, nDeleted);
+ DoDeleteBatchFromQStore(pThis, pBatch->nElem);
} else if(pBatch->deqID == pThis->deqIDDel) {
deqIDDel = pThis->deqIDDel;
pTdl = tdlPeek(pThis);
@@ -1385,10 +1385,12 @@ DeleteBatchFromQStore(qqueue_t *pThis, batch_t *pBatch, int nDeleted)
++deqIDDel;
pTdl = tdlPeek(pThis);
}
+ /* old entries deleted, now delete current ones... */
+ DoDeleteBatchFromQStore(pThis, pBatch->nElem);
} else {
/* can not delete, insert into to-delete list */
dbgprintf("not at head of to-delete list, enqueue %d\n", (int) pBatch->deqID);
- CHKiRet(tdlAdd(pThis, pBatch->deqID, nDeleted));
+ CHKiRet(tdlAdd(pThis, pBatch->deqID, pBatch->nElem));
}
finalize_it:
@@ -1408,20 +1410,23 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch)
void *pUsr;
int nEnqueued = 0;
rsRetVal localRet;
+ int iCancelStateSave;
DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
assert(pBatch != NULL);
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
dbgprintf("XXX: deleteProcessedBatch total entries %d with state[0] %d\n", pBatch->nElem, pBatch->pElem[0].state);
for(i = 0 ; i < pBatch->nElem ; ++i) {
-dbgprintf("XXX: deleteProcessedBatch delete entry %d with state %d\n", i, pBatch->pElem[i].state);
+dbgprintf("XXX: deleteProcessedBatch delete entry %d, ptr %p, refcnt %d with state %d\n",
+i, pBatch->pElem[i].pUsrp, ((msg_t*)pBatch->pElem[i].pUsrp)->iRefCount, pBatch->pElem[i].state);
pUsr = pBatch->pElem[i].pUsrp;
if( pBatch->pElem[i].state == BATCH_STATE_RDY
|| pBatch->pElem[i].state == BATCH_STATE_SUB) {
-RUNLOG_STR("we need to requeue the entry");
localRet = doEnqSingleObj(pThis, eFLOWCTL_NO_DELAY,
(obj_t*)MsgAddRef((msg_t*) pUsr));
+dbgprintf("we need to requeue the entry, refcnt now %d\n", ((msg_t*) pUsr)->iRefCount);
++nEnqueued;
if(localRet != RS_RET_OK) {
DBGPRINTF("error %d re-enqueuing unprocessed data element - discarded\n", localRet);
@@ -1435,11 +1440,11 @@ dbgprintf("we deleted %d objects and enqueued %d objects\n", i-nEnqueued, nEnque
if(nEnqueued > 0)
qqueueChkPersist(pThis, nEnqueued);
- if(i > 0)
- iRet = DeleteBatchFromQStore(pThis, pBatch, i);
+ iRet = DeleteBatchFromQStore(pThis, pBatch);
pBatch->nElem = pBatch->nElemDeq = 0; /* reset batch */
+ pthread_setcancelstate(iCancelStateSave, NULL);
RETiRet;
}
@@ -2072,7 +2077,8 @@ CODESTARTobjDestruct(qqueue)
* we need to reset the logical dequeue pointer, persist the queue if configured to do
* so and then destruct everything. -- rgerhards, 2009-05-26
*/
- CHKiRet(pThis->qUnDeqAll(pThis));
+RUNLOG_STR("XXX: NOT undequeueing entries!");
+ //CHKiRet(pThis->qUnDeqAll(pThis));
if(pThis->bIsDA && getPhysicalQueueSize(pThis) > 0 && pThis->bSaveOnShutdown) {
CHKiRet(DoSaveOnShutdown(pThis));
diff --git a/runtime/wti.c b/runtime/wti.c
index 3d98b4c4..e5237885 100644
--- a/runtime/wti.c
+++ b/runtime/wti.c
@@ -202,6 +202,8 @@ wtiWorkerCancelCleanup(void *arg)
ISOBJ_TYPE_assert(pWtp, wtp);
DBGPRINTF("%s: cancelation cleanup handler called.\n", wtiGetDbgHdr(pThis));
+ pWtp->pfObjProcessed(pWtp->pUsr, pThis);
+ DBGPRINTF("%s: done cancelation cleanup handler.\n", wtiGetDbgHdr(pThis));
ENDfunc
}
diff --git a/runtime/wtp.c b/runtime/wtp.c
index 08cf5c3d..e075e5b8 100644
--- a/runtime/wtp.c
+++ b/runtime/wtp.c
@@ -271,14 +271,14 @@ wtpCancelAll(wtp_t *pThis)
}
-/* cancellation cleanup handler for executing worker decrements the worker counter.
- * This is also called when the the worker is normally shut down.
- * rgerhards, 2009-07-20
+/* this function contains shared code for both regular worker shutdown as
+ * well as shutdown via cancellation. We can not simply use pthread_cleanup_pop(1)
+ * as this introduces a race in the debug system (RETiRet system).
+ * rgerhards, 2009-10-26
*/
-static void
-wtpWrkrExecCancelCleanup(void *arg)
+static inline void
+wtpWrkrExecCleanup(wti_t *pWti)
{
- wti_t *pWti = (wti_t*) arg;
wtp_t *pThis;
BEGINfunc
@@ -293,11 +293,37 @@ wtpWrkrExecCancelCleanup(void *arg)
DBGPRINTF("%s: Worker thread %lx, terminated, num workers now %d\n",
wtpGetDbgHdr(pThis), (unsigned long) pWti, ATOMIC_FETCH_32BIT(pThis->iCurNumWrkThrd));
- pthread_cond_broadcast(&pThis->condThrdTrm); /* activate anyone waiting on thread shutdown */
ENDfunc
}
+/* cancellation cleanup handler for executing worker decrements the worker counter.
+ * rgerhards, 2009-07-20
+ */
+static void
+wtpWrkrExecCancelCleanup(void *arg)
+{
+ wti_t *pWti = (wti_t*) arg;
+ wtp_t *pThis;
+
+ BEGINfunc
+ ISOBJ_TYPE_assert(pWti, wti);
+ pThis = pWti->pWtp;
+ ISOBJ_TYPE_assert(pThis, wtp);
+ DBGPRINTF("%s: Worker thread %lx requested to be cancelled.\n",
+ wtpGetDbgHdr(pThis), (unsigned long) pWti);
+
+ wtpWrkrExecCleanup(pWti);
+
+ ENDfunc
+ /* NOTE: we must call ENDfunc FIRST, because otherwise the scheduler may activate the main
+ * thread after the broadcast, which could destroy the debug class, resulting in a potential
+ * segfault. So we need to do the broadcast as actually the last action in our processing
+ */
+ pthread_cond_broadcast(&pThis->condThrdTrm); /* activate anyone waiting on thread shutdown */
+}
+
+
/* wtp worker shell. This is started and calls into the actual
* wti worker.
* rgerhards, 2008-01-21
@@ -331,9 +357,15 @@ wtpWorker(void *arg) /* the arg is actually a wti object, even though we are in
pthread_cleanup_push(wtpWrkrExecCancelCleanup, pWti);
wtiWorker(pWti);
- pthread_cleanup_pop(1);
+ pthread_cleanup_pop(0);
+ wtpWrkrExecCleanup(pWti);
ENDfunc
+ /* NOTE: we must call ENDfunc FIRST, because otherwise the scheduler may activate the main
+ * thread after the broadcast, which could destroy the debug class, resulting in a potential
+ * segfault. So we need to do the broadcast as actually the last action in our processing
+ */
+ pthread_cond_broadcast(&pThis->condThrdTrm); /* activate anyone waiting on thread shutdown */
pthread_exit(0);
}
#pragma GCC diagnostic warning "-Wempty-body"
diff --git a/tests/daqueue-persist-drvr.sh b/tests/daqueue-persist-drvr.sh
index 7934eb2b..c44ce6f8 100755
--- a/tests/daqueue-persist-drvr.sh
+++ b/tests/daqueue-persist-drvr.sh
@@ -8,8 +8,8 @@
echo \[daqueue-persist-drvr.sh\]: testing memory daqueue persisting to disk, mode $1
source $srcdir/diag.sh init
-#export RSYSLOG_DEBUG="debug nologfuncflow nostdout noprintmutexaction"
-#export RSYSLOG_DEBUGLOG="log"
+export RSYSLOG_DEBUG="debug nologfuncflow nostdout noprintmutexaction"
+export RSYSLOG_DEBUGLOG="log"
# prepare config
echo \$MainMsgQueueType $1 > work-queuemode.conf