summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2009-10-26 18:53:01 +0100
committerRainer Gerhards <rgerhards@adiscon.com>2009-10-26 18:53:01 +0100
commitb585a4e90940e9f4d2d288d462d1c273ae5ffa09 (patch)
treee51e04234399bd7991f0d3cfbe04835ae2a945b7
parent5e4f9d5e65d7c0db640f275d5cfb05371b697776 (diff)
downloadrsyslog-b585a4e90940e9f4d2d288d462d1c273ae5ffa09.tar.gz
rsyslog-b585a4e90940e9f4d2d288d462d1c273ae5ffa09.tar.xz
rsyslog-b585a4e90940e9f4d2d288d462d1c273ae5ffa09.zip
addressed some race issues during queue shutdown
these occured in very unusual scenarios where we had a DA-queue running in parallel and very lengthy actions. Then, in some situations, the shutdown could hang. The code needs some addition lab time, but is believed to be much better than any previous version.
-rw-r--r--runtime/queue.c34
-rw-r--r--runtime/wti.c2
-rw-r--r--runtime/wtp.c48
-rwxr-xr-xtests/daqueue-persist-drvr.sh4
4 files changed, 64 insertions, 24 deletions
diff --git a/runtime/queue.c b/runtime/queue.c
index b4f00446..d9942365 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -1070,6 +1070,11 @@ RUNLOG_STR("trying to shutdown workers within Action Timeout");
/* instruct workers to finish ASAP, even if still work exists */
pThis->bEnqOnly = 1;
pThis->bShutdownImmediate = 1;
+ /* now DA queue */
+ if(pThis->bIsDA) {
+ pThis->pqDA->bEnqOnly = 1;
+ pThis->pqDA->bShutdownImmediate = 1;
+ }
// TODO: make sure we have at minimum a 10ms timeout - workers deserve a chance...
/* now give the queue workers a last chance to gracefully shut down (based on action timeout setting) */
@@ -1356,14 +1361,10 @@ dbgprintf("delete batch from store, new sizes: log %d, phys %d\n", getLogicalQue
/* remove messages from the physical queue store that are fully processed. This is
- * controlled via the to-delete list. We can only delete those elements, that are
- * at the current physical tail of the queue. If the batch is from another position,
- * we schedule it for deletion, but actual deletion will happen at a later call
- * of this function here. We always delete as much as possible, which includes
- * picking up things from the to-delete list.
+ * controlled via the to-delete list.
*/
static inline rsRetVal
-DeleteBatchFromQStore(qqueue_t *pThis, batch_t *pBatch, int nDeleted)
+DeleteBatchFromQStore(qqueue_t *pThis, batch_t *pBatch)
{
toDeleteLst_t *pTdl;
qDeqID deqIDDel;
@@ -1371,11 +1372,10 @@ DeleteBatchFromQStore(qqueue_t *pThis, batch_t *pBatch, int nDeleted)
ISOBJ_TYPE_assert(pThis, qqueue);
assert(pBatch != NULL);
- assert(nDeleted > 0);
pTdl = tdlPeek(pThis); /* get current head element */
if(pTdl == NULL) { /* to-delete list empty */
- DoDeleteBatchFromQStore(pThis, nDeleted);
+ DoDeleteBatchFromQStore(pThis, pBatch->nElem);
} else if(pBatch->deqID == pThis->deqIDDel) {
deqIDDel = pThis->deqIDDel;
pTdl = tdlPeek(pThis);
@@ -1385,10 +1385,12 @@ DeleteBatchFromQStore(qqueue_t *pThis, batch_t *pBatch, int nDeleted)
++deqIDDel;
pTdl = tdlPeek(pThis);
}
+ /* old entries deleted, now delete current ones... */
+ DoDeleteBatchFromQStore(pThis, pBatch->nElem);
} else {
/* can not delete, insert into to-delete list */
dbgprintf("not at head of to-delete list, enqueue %d\n", (int) pBatch->deqID);
- CHKiRet(tdlAdd(pThis, pBatch->deqID, nDeleted));
+ CHKiRet(tdlAdd(pThis, pBatch->deqID, pBatch->nElem));
}
finalize_it:
@@ -1408,20 +1410,23 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch)
void *pUsr;
int nEnqueued = 0;
rsRetVal localRet;
+ int iCancelStateSave;
DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
assert(pBatch != NULL);
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
dbgprintf("XXX: deleteProcessedBatch total entries %d with state[0] %d\n", pBatch->nElem, pBatch->pElem[0].state);
for(i = 0 ; i < pBatch->nElem ; ++i) {
-dbgprintf("XXX: deleteProcessedBatch delete entry %d with state %d\n", i, pBatch->pElem[i].state);
+dbgprintf("XXX: deleteProcessedBatch delete entry %d, ptr %p, refcnt %d with state %d\n",
+i, pBatch->pElem[i].pUsrp, ((msg_t*)pBatch->pElem[i].pUsrp)->iRefCount, pBatch->pElem[i].state);
pUsr = pBatch->pElem[i].pUsrp;
if( pBatch->pElem[i].state == BATCH_STATE_RDY
|| pBatch->pElem[i].state == BATCH_STATE_SUB) {
-RUNLOG_STR("we need to requeue the entry");
localRet = doEnqSingleObj(pThis, eFLOWCTL_NO_DELAY,
(obj_t*)MsgAddRef((msg_t*) pUsr));
+dbgprintf("we need to requeue the entry, refcnt now %d\n", ((msg_t*) pUsr)->iRefCount);
++nEnqueued;
if(localRet != RS_RET_OK) {
DBGPRINTF("error %d re-enqueuing unprocessed data element - discarded\n", localRet);
@@ -1435,11 +1440,11 @@ dbgprintf("we deleted %d objects and enqueued %d objects\n", i-nEnqueued, nEnque
if(nEnqueued > 0)
qqueueChkPersist(pThis, nEnqueued);
- if(i > 0)
- iRet = DeleteBatchFromQStore(pThis, pBatch, i);
+ iRet = DeleteBatchFromQStore(pThis, pBatch);
pBatch->nElem = pBatch->nElemDeq = 0; /* reset batch */
+ pthread_setcancelstate(iCancelStateSave, NULL);
RETiRet;
}
@@ -2072,7 +2077,8 @@ CODESTARTobjDestruct(qqueue)
* we need to reset the logical dequeue pointer, persist the queue if configured to do
* so and then destruct everything. -- rgerhards, 2009-05-26
*/
- CHKiRet(pThis->qUnDeqAll(pThis));
+RUNLOG_STR("XXX: NOT undequeueing entries!");
+ //CHKiRet(pThis->qUnDeqAll(pThis));
if(pThis->bIsDA && getPhysicalQueueSize(pThis) > 0 && pThis->bSaveOnShutdown) {
CHKiRet(DoSaveOnShutdown(pThis));
diff --git a/runtime/wti.c b/runtime/wti.c
index 3d98b4c4..e5237885 100644
--- a/runtime/wti.c
+++ b/runtime/wti.c
@@ -202,6 +202,8 @@ wtiWorkerCancelCleanup(void *arg)
ISOBJ_TYPE_assert(pWtp, wtp);
DBGPRINTF("%s: cancelation cleanup handler called.\n", wtiGetDbgHdr(pThis));
+ pWtp->pfObjProcessed(pWtp->pUsr, pThis);
+ DBGPRINTF("%s: done cancelation cleanup handler.\n", wtiGetDbgHdr(pThis));
ENDfunc
}
diff --git a/runtime/wtp.c b/runtime/wtp.c
index 08cf5c3d..e075e5b8 100644
--- a/runtime/wtp.c
+++ b/runtime/wtp.c
@@ -271,14 +271,14 @@ wtpCancelAll(wtp_t *pThis)
}
-/* cancellation cleanup handler for executing worker decrements the worker counter.
- * This is also called when the the worker is normally shut down.
- * rgerhards, 2009-07-20
+/* this function contains shared code for both regular worker shutdown as
+ * well as shutdown via cancellation. We can not simply use pthread_cleanup_pop(1)
+ * as this introduces a race in the debug system (RETiRet system).
+ * rgerhards, 2009-10-26
*/
-static void
-wtpWrkrExecCancelCleanup(void *arg)
+static inline void
+wtpWrkrExecCleanup(wti_t *pWti)
{
- wti_t *pWti = (wti_t*) arg;
wtp_t *pThis;
BEGINfunc
@@ -293,11 +293,37 @@ wtpWrkrExecCancelCleanup(void *arg)
DBGPRINTF("%s: Worker thread %lx, terminated, num workers now %d\n",
wtpGetDbgHdr(pThis), (unsigned long) pWti, ATOMIC_FETCH_32BIT(pThis->iCurNumWrkThrd));
- pthread_cond_broadcast(&pThis->condThrdTrm); /* activate anyone waiting on thread shutdown */
ENDfunc
}
+/* cancellation cleanup handler for executing worker decrements the worker counter.
+ * rgerhards, 2009-07-20
+ */
+static void
+wtpWrkrExecCancelCleanup(void *arg)
+{
+ wti_t *pWti = (wti_t*) arg;
+ wtp_t *pThis;
+
+ BEGINfunc
+ ISOBJ_TYPE_assert(pWti, wti);
+ pThis = pWti->pWtp;
+ ISOBJ_TYPE_assert(pThis, wtp);
+ DBGPRINTF("%s: Worker thread %lx requested to be cancelled.\n",
+ wtpGetDbgHdr(pThis), (unsigned long) pWti);
+
+ wtpWrkrExecCleanup(pWti);
+
+ ENDfunc
+ /* NOTE: we must call ENDfunc FIRST, because otherwise the schedule may activate the main
+ * thread after the broadcast, which could destroy the debug class, resulting in a potential
+ * segfault. So we need to do the broadcast as actually the last action in our processing
+ */
+ pthread_cond_broadcast(&pThis->condThrdTrm); /* activate anyone waiting on thread shutdown */
+}
+
+
/* wtp worker shell. This is started and calls into the actual
* wti worker.
* rgerhards, 2008-01-21
@@ -331,9 +357,15 @@ wtpWorker(void *arg) /* the arg is actually a wti object, even though we are in
pthread_cleanup_push(wtpWrkrExecCancelCleanup, pWti);
wtiWorker(pWti);
- pthread_cleanup_pop(1);
+ pthread_cleanup_pop(0);
+ wtpWrkrExecCleanup(pWti);
ENDfunc
+ /* NOTE: we must call ENDfunc FIRST, because otherwise the schedule may activate the main
+ * thread after the broadcast, which could destroy the debug class, resulting in a potential
+ * segfault. So we need to do the broadcast as actually the last action in our processing
+ */
+ pthread_cond_broadcast(&pThis->condThrdTrm); /* activate anyone waiting on thread shutdown */
pthread_exit(0);
}
#pragma GCC diagnostic warning "-Wempty-body"
diff --git a/tests/daqueue-persist-drvr.sh b/tests/daqueue-persist-drvr.sh
index 7934eb2b..c44ce6f8 100755
--- a/tests/daqueue-persist-drvr.sh
+++ b/tests/daqueue-persist-drvr.sh
@@ -8,8 +8,8 @@
echo \[daqueue-persist-drvr.sh\]: testing memory daqueue persisting to disk, mode $1
source $srcdir/diag.sh init
-#export RSYSLOG_DEBUG="debug nologfuncflow nostdout noprintmutexaction"
-#export RSYSLOG_DEBUGLOG="log"
+export RSYSLOG_DEBUG="debug nologfuncflow nostdout noprintmutexaction"
+export RSYSLOG_DEBUGLOG="log"
# prepare config
echo \$MainMsgQueueType $1 > work-queuemode.conf