From 184497d4cbc438d14d1dedb0b21e8b6e27990690 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 10 May 2012 14:44:26 +0200 Subject: bugfix: disk queue was not persisted on shutdown, regression of fix to http://bugzilla.adiscon.com/show_bug.cgi?id=299 The new code also handles the case of shutdown of blocking light and full delayable sources somewhat smarter and permits, assuming sufficient timouts, to persist message up to the max queue capacity. Also some nits in debug instrumentation have been fixed. --- ChangeLog | 8 +++++++- runtime/queue.c | 16 +++++----------- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/ChangeLog b/ChangeLog index 30f12f07..ea21b3ea 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,5 +1,11 @@ --------------------------------------------------------------------------- -Version 5.8.12 [V5-stable] 2012-05-03 +Version 5.8.12 [V5-stable] 2012-05-?? +- bugfix: disk queue was not persisted on shutdown, regression of fix to + http://bugzilla.adiscon.com/show_bug.cgi?id=299 + The new code also handles the case of shutdown of blocking light and + full delayable sources somewhat smarter and permits, assuming sufficient + timouts, to persist message up to the max queue capacity. Also some nits + in debug instrumentation have been fixed. - bugfix/tcpflood: sending small test files did not work correctly --------------------------------------------------------------------------- Version 5.8.11 [V5-stable] 2012-05-03 diff --git a/runtime/queue.c b/runtime/queue.c index e968806c..9d92af36 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -2247,10 +2247,6 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr) int err; struct timespec t; - if(glbl.GetGlobalInputTermState()) { - ABORT_FINALIZE(RS_RET_FORCE_TERM); - } - STATSCOUNTER_INC(pThis->ctrEnqueued, pThis->mutCtrEnqueued); /* first check if we need to discard this message (which will cause CHKiRet() to exit) */ @@ -2277,9 +2273,7 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr) * It's a side effect, but a good one ;) -- rgerhards, 2008-03-14 */ if(flowCtlType == eFLOWCTL_FULL_DELAY) { - DBGOPRINT((obj_t*) pThis, "enqueueMsg: FullDelay mark reached for full delayable message " - "- blocking.\n"); - while(pThis->iQueueSize >= pThis->iFullDlyMrk) { + while(pThis->iQueueSize >= pThis->iFullDlyMrk&& ! glbl.GetGlobalInputTermState()) { /* We have a problem during shutdown if we block eternally. In that * case, the the input thread cannot be terminated. So we wake up * from time to time to check for termination. @@ -2291,6 +2285,8 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr) * In any case, this was the old code (if we do the TODO): * pthread_cond_wait(&pThis->belowFullDlyWtrMrk, pThis->mut); */ + DBGOPRINT((obj_t*) pThis, "enqueueMsg: FullDelay mark reached for full delayable message " + "- blocking, queue size is %d.\n", pThis->iQueueSize); timeoutComp(&t, 1000); err = pthread_cond_timedwait(&pThis->belowLightDlyWtrMrk, pThis->mut, &t); if(err != 0 && err != ETIMEDOUT) { @@ -2303,11 +2299,8 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr) } DBGPRINTF("wti worker in full delay timed out, checking termination...\n"); - if(glbl.GetGlobalInputTermState()) { - ABORT_FINALIZE(RS_RET_FORCE_TERM); - } } - } else if(flowCtlType == eFLOWCTL_LIGHT_DELAY) { + } else if(flowCtlType == eFLOWCTL_LIGHT_DELAY && !glbl.GetGlobalInputTermState()) { if(pThis->iQueueSize >= pThis->iLightDlyMrk) { DBGOPRINT((obj_t*) pThis, "enqueueMsg: LightDelay mark reached for light " "delayable message - blocking a bit.\n"); @@ -2332,6 +2325,7 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr) && pThis->tVars.disk.sizeOnDisk > pThis->sizeOnDiskMax)) { DBGOPRINT((obj_t*) pThis, "enqueueMsg: queue FULL - waiting to drain.\n"); if(glbl.GetGlobalInputTermState()) { + DBGOPRINT((obj_t*) pThis, "enqueueMsg: queue FULL, discard due to FORCE_TERM.\n"); ABORT_FINALIZE(RS_RET_FORCE_TERM); } timeoutComp(&t, pThis->toEnq); -- cgit