summaryrefslogtreecommitdiffstats
path: root/runtime/queue.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2009-10-26 18:53:01 +0100
committerRainer Gerhards <rgerhards@adiscon.com>2009-10-26 18:53:01 +0100
commitb585a4e90940e9f4d2d288d462d1c273ae5ffa09 (patch)
treee51e04234399bd7991f0d3cfbe04835ae2a945b7 /runtime/queue.c
parent5e4f9d5e65d7c0db640f275d5cfb05371b697776 (diff)
downloadrsyslog-b585a4e90940e9f4d2d288d462d1c273ae5ffa09.tar.gz
rsyslog-b585a4e90940e9f4d2d288d462d1c273ae5ffa09.tar.xz
rsyslog-b585a4e90940e9f4d2d288d462d1c273ae5ffa09.zip
addressed some race issues during queue shutdown
these occured in very unusual scenarios where we had a DA-queue running in parallel and very lengthy actions. Then, in some situations, the shutdown could hang. The code needs some addition lab time, but is believed to be much better than any previous version.
Diffstat (limited to 'runtime/queue.c')
-rw-r--r--runtime/queue.c34
1 files changed, 20 insertions, 14 deletions
diff --git a/runtime/queue.c b/runtime/queue.c
index b4f00446..d9942365 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -1070,6 +1070,11 @@ RUNLOG_STR("trying to shutdown workers within Action Timeout");
/* instruct workers to finish ASAP, even if still work exists */
pThis->bEnqOnly = 1;
pThis->bShutdownImmediate = 1;
+ /* now DA queue */
+ if(pThis->bIsDA) {
+ pThis->pqDA->bEnqOnly = 1;
+ pThis->pqDA->bShutdownImmediate = 1;
+ }
// TODO: make sure we have at minimum a 10ms timeout - workers deserve a chance...
/* now give the queue workers a last chance to gracefully shut down (based on action timeout setting) */
@@ -1356,14 +1361,10 @@ dbgprintf("delete batch from store, new sizes: log %d, phys %d\n", getLogicalQue
/* remove messages from the physical queue store that are fully processed. This is
- * controlled via the to-delete list. We can only delete those elements, that are
- * at the current physical tail of the queue. If the batch is from another position,
- * we schedule it for deletion, but actual deletion will happen at a later call
- * of this function here. We always delete as much as possible, which includes
- * picking up things from the to-delete list.
+ * controlled via the to-delete list.
*/
static inline rsRetVal
-DeleteBatchFromQStore(qqueue_t *pThis, batch_t *pBatch, int nDeleted)
+DeleteBatchFromQStore(qqueue_t *pThis, batch_t *pBatch)
{
toDeleteLst_t *pTdl;
qDeqID deqIDDel;
@@ -1371,11 +1372,10 @@ DeleteBatchFromQStore(qqueue_t *pThis, batch_t *pBatch, int nDeleted)
ISOBJ_TYPE_assert(pThis, qqueue);
assert(pBatch != NULL);
- assert(nDeleted > 0);
pTdl = tdlPeek(pThis); /* get current head element */
if(pTdl == NULL) { /* to-delete list empty */
- DoDeleteBatchFromQStore(pThis, nDeleted);
+ DoDeleteBatchFromQStore(pThis, pBatch->nElem);
} else if(pBatch->deqID == pThis->deqIDDel) {
deqIDDel = pThis->deqIDDel;
pTdl = tdlPeek(pThis);
@@ -1385,10 +1385,12 @@ DeleteBatchFromQStore(qqueue_t *pThis, batch_t *pBatch, int nDeleted)
++deqIDDel;
pTdl = tdlPeek(pThis);
}
+ /* old entries deleted, now delete current ones... */
+ DoDeleteBatchFromQStore(pThis, pBatch->nElem);
} else {
/* can not delete, insert into to-delete list */
dbgprintf("not at head of to-delete list, enqueue %d\n", (int) pBatch->deqID);
- CHKiRet(tdlAdd(pThis, pBatch->deqID, nDeleted));
+ CHKiRet(tdlAdd(pThis, pBatch->deqID, pBatch->nElem));
}
finalize_it:
@@ -1408,20 +1410,23 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch)
void *pUsr;
int nEnqueued = 0;
rsRetVal localRet;
+ int iCancelStateSave;
DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
assert(pBatch != NULL);
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
dbgprintf("XXX: deleteProcessedBatch total entries %d with state[0] %d\n", pBatch->nElem, pBatch->pElem[0].state);
for(i = 0 ; i < pBatch->nElem ; ++i) {
-dbgprintf("XXX: deleteProcessedBatch delete entry %d with state %d\n", i, pBatch->pElem[i].state);
+dbgprintf("XXX: deleteProcessedBatch delete entry %d, ptr %p, refcnt %d with state %d\n",
+i, pBatch->pElem[i].pUsrp, ((msg_t*)pBatch->pElem[i].pUsrp)->iRefCount, pBatch->pElem[i].state);
pUsr = pBatch->pElem[i].pUsrp;
if( pBatch->pElem[i].state == BATCH_STATE_RDY
|| pBatch->pElem[i].state == BATCH_STATE_SUB) {
-RUNLOG_STR("we need to requeue the entry");
localRet = doEnqSingleObj(pThis, eFLOWCTL_NO_DELAY,
(obj_t*)MsgAddRef((msg_t*) pUsr));
+dbgprintf("we need to requeue the entry, refcnt now %d\n", ((msg_t*) pUsr)->iRefCount);
++nEnqueued;
if(localRet != RS_RET_OK) {
DBGPRINTF("error %d re-enqueuing unprocessed data element - discarded\n", localRet);
@@ -1435,11 +1440,11 @@ dbgprintf("we deleted %d objects and enqueued %d objects\n", i-nEnqueued, nEnque
if(nEnqueued > 0)
qqueueChkPersist(pThis, nEnqueued);
- if(i > 0)
- iRet = DeleteBatchFromQStore(pThis, pBatch, i);
+ iRet = DeleteBatchFromQStore(pThis, pBatch);
pBatch->nElem = pBatch->nElemDeq = 0; /* reset batch */
+ pthread_setcancelstate(iCancelStateSave, NULL);
RETiRet;
}
@@ -2072,7 +2077,8 @@ CODESTARTobjDestruct(qqueue)
* we need to reset the logical dequeue pointer, persist the queue if configured to do
* so and then destruct everything. -- rgerhards, 2009-05-26
*/
- CHKiRet(pThis->qUnDeqAll(pThis));
+RUNLOG_STR("XXX: NOT undequeueing entries!");
+ //CHKiRet(pThis->qUnDeqAll(pThis));
if(pThis->bIsDA && getPhysicalQueueSize(pThis) > 0 && pThis->bSaveOnShutdown) {
CHKiRet(DoSaveOnShutdown(pThis));