diff options
Diffstat (limited to 'runtime')
-rw-r--r-- | runtime/queue.c | 47 |
1 files changed, 42 insertions, 5 deletions
diff --git a/runtime/queue.c b/runtime/queue.c index 9961ed4d..e968806c 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -2243,8 +2243,9 @@ finalize_it: static inline rsRetVal doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr) { - struct timespec t; DEFiRet; + int err; + struct timespec t; if(glbl.GetGlobalInputTermState()) { ABORT_FINALIZE(RS_RET_FORCE_TERM); @@ -2276,15 +2277,48 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr) * It's a side effect, but a good one ;) -- rgerhards, 2008-03-14 */ if(flowCtlType == eFLOWCTL_FULL_DELAY) { + DBGOPRINT((obj_t*) pThis, "enqueueMsg: FullDelay mark reached for full delayable message " + "- blocking.\n"); while(pThis->iQueueSize >= pThis->iFullDlyMrk) { - DBGOPRINT((obj_t*) pThis, "enqueueMsg: FullDelay mark reached for full delayable message - blocking.\n"); - pthread_cond_wait(&pThis->belowFullDlyWtrMrk, pThis->mut); + /* We have a problem during shutdown if we block eternally. In that + * case, the the input thread cannot be terminated. So we wake up + * from time to time to check for termination. + * TODO/v6(at earliest): check if we could signal the condition during + * shutdown. However, this requires new queue registries and thus is + * far to much change for a stable version (and I am still not sure it + * is worth the effort, given how seldom this situation occurs and how + * few resources the wakeups need). -- rgerhards, 2012-05-03 + * In any case, this was the old code (if we do the TODO): + * pthread_cond_wait(&pThis->belowFullDlyWtrMrk, pThis->mut); + */ + timeoutComp(&t, 1000); + err = pthread_cond_timedwait(&pThis->belowLightDlyWtrMrk, pThis->mut, &t); + if(err != 0 && err != ETIMEDOUT) { + /* Something is really wrong now. Report to debug log and abort the + * wait. That keeps us running, even though we may lose messages. + */ + DBGOPRINT((obj_t*) pThis, "potential program bug: pthread_cond_timedwait()" + "/fulldelay returned %d\n", err); + break; + + } + DBGPRINTF("wti worker in full delay timed out, checking termination...\n"); + if(glbl.GetGlobalInputTermState()) { + ABORT_FINALIZE(RS_RET_FORCE_TERM); + } } } else if(flowCtlType == eFLOWCTL_LIGHT_DELAY) { if(pThis->iQueueSize >= pThis->iLightDlyMrk) { - DBGOPRINT((obj_t*) pThis, "enqueueMsg: LightDelay mark reached for light delayable message - blocking a bit.\n"); + DBGOPRINT((obj_t*) pThis, "enqueueMsg: LightDelay mark reached for light " + "delayable message - blocking a bit.\n"); timeoutComp(&t, 1000); /* 1000 millisconds = 1 second TODO: make configurable */ - pthread_cond_timedwait(&pThis->belowLightDlyWtrMrk, pThis->mut, &t); /* TODO error check? But what do then? */ + err = pthread_cond_timedwait(&pThis->belowLightDlyWtrMrk, pThis->mut, &t); + if(err != 0 && err != ETIMEDOUT) { + /* Something is really wrong now. Report to debug log */ + DBGOPRINT((obj_t*) pThis, "potential program bug: pthread_cond_timedwait()" + "/lightdelay returned %d\n", err); + + } } } @@ -2297,6 +2331,9 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr) || (pThis->qType == QUEUETYPE_DISK && pThis->sizeOnDiskMax != 0 && pThis->tVars.disk.sizeOnDisk > pThis->sizeOnDiskMax)) { DBGOPRINT((obj_t*) pThis, "enqueueMsg: queue FULL - waiting to drain.\n"); + if(glbl.GetGlobalInputTermState()) { + ABORT_FINALIZE(RS_RET_FORCE_TERM); + } timeoutComp(&t, pThis->toEnq); STATSCOUNTER_INC(pThis->ctrFull, pThis->mutCtrFull); // TODO : handle enqOnly => discard! |