From 4d70c9b3e5e480d6dfa1c94506270f1f78e8ef32 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 13 Oct 2009 14:38:45 +0200 Subject: added some debug settings plus improved shutdown sequence ... non-working version! --- runtime/queue.c | 51 +++++++++++++++++++++++++++++++++++++++------------ runtime/queue.h | 9 +++++---- runtime/wti.c | 15 +++++++++++++++ runtime/wtp.c | 4 ++-- 4 files changed, 61 insertions(+), 18 deletions(-) (limited to 'runtime') diff --git a/runtime/queue.c b/runtime/queue.c index 101052a1..00bbd15f 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -1041,7 +1041,7 @@ static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr) batchObj.pUsrp = (obj_t*) pUsr; singleBatch.nElem = 1; /* there always is only one in direct mode */ singleBatch.pElem = &batchObj; - iRet = pThis->pConsumer(pThis->pUsr, &singleBatch); + iRet = pThis->pConsumer(pThis->pUsr, &singleBatch, &pThis->bShutdownImmediate); objDestruct(pUsr); RETiRet; @@ -1180,6 +1180,9 @@ tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis) } else { DBGOPRINT((obj_t*) pThis, "DA queue worker shut down.\n"); } + } + + if(pThis->pWtpDA != NULL) { /* we also instruct the DA worker pool to shutdown ASAP. If we need it for persisting * the queue, it is restarted at a later stage. We don't care here if a timeout happens. */ @@ -1210,6 +1213,7 @@ tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis) rsRetVal iRetLocal; DEFiRet; +RUNLOG_STR("trying to shutdown workers within Action Timeout"); ISOBJ_TYPE_assert(pThis, qqueue); ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */ @@ -1218,6 +1222,7 @@ tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis) * startup some workers again. So this is OK here. -- rgerhards, 2009-05-28 */ pThis->bEnqOnly = 1; + pThis->bShutdownImmediate = 1; /* need to set this so that the DA queue begins shutdown in parallel! */ if(pThis->pqDA != NULL) { pThis->pqDA->bEnqOnly = 1; @@ -1247,6 +1252,9 @@ tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis) DBGOPRINT((obj_t*) pThis, "unexpected iRet state %d after trying immediate shutdown of the DA " "queue in disk save mode. Continuing, but results are unpredictable\n", iRetLocal); } + } + + if(pThis->pWtpDA != NULL) { /* and now we need to check the DA worker itself (the one that shuffles data to the disk). This * is necessary because we may be in a situation where the DA queue regular worker and the * main queue worker stopped rather quickly. In this case, there is almost no time (and @@ -1279,6 +1287,7 @@ static rsRetVal cancelWorkers(qqueue_t *pThis) { rsRetVal iRetLocal; + struct timespec tTimeout; DEFiRet; /* Now queue workers should have terminated. If not, we need to cancel them as we have applied @@ -1300,13 +1309,31 @@ cancelWorkers(qqueue_t *pThis) DBGOPRINT((obj_t*) pThis, "unexpected iRet state %d trying to cancel DA queue worker " "threads, continuing, but results are unpredictable\n", iRetLocal); } + } - /* finally, we cancel the main queue's DA worker pool, if it still is running. It may be - * restarted later to persist the queue. But we stop it, because otherwise we get into - * big trouble when resetting the logical dequeue pointer. This operation can only be - * done when *no* worker is running. So time for a shutdown... -- rgerhards, 2009-05-28 + /* finally, we cancel the main queue's DA worker pool, if it still is running. It may be + * restarted later to persist the queue. But we stop it, because otherwise we get into + * big trouble when resetting the logical dequeue pointer. This operation can only be + * done when *no* worker is running. So time for a shutdown... -- rgerhards, 2009-05-28 + */ + if(pThis->pWtpDA != NULL) { + /* but because of the potentially harsh consequences of cancelling, we try one last + * (and short) time to shut down the DA worker in a normal fashion. The idea here + * is that it may be willing to do so, but we did not yet have a task switch so + * that it could not terminate but will do immediately when it gets time. + * rgerhards, 2009-10-13 */ - DBGOPRINT((obj_t*) pThis, "checking to see if we need to cancel the main queue's DA worker pool\n"); + timeoutComp(&tTimeout, 50); + DBGOPRINT((obj_t*) pThis, "one ultimately last try for regular shutdown of main queue DA worker pool\n"); + iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout); + if(iRetLocal == RS_RET_TIMED_OUT) { + DBGOPRINT((obj_t*) pThis, "shutdown timed out on main queue DA worker pool " + "- this is not good, need to cancel now...\n"); + } else { + DBGOPRINT((obj_t*) pThis, "main queue DA worker pool shut down!\n"); + } + + DBGOPRINT((obj_t*) pThis, "checking to see if main queue DA worker pool needs to be cancelled\n"); iRetLocal = wtpCancelAll(pThis->pWtpDA); /* returns immediately if all threads already have terminated */ } @@ -1349,6 +1376,7 @@ ShutdownWorkers(qqueue_t *pThis) pThis->iLowWtrMrk = 0; CHKiRet(tryShutdownWorkersWithinQueueTimeout(pThis)); +dbgprintf("YYY: physical queue size: %d\n", getPhysicalQueueSize(pThis)); if(getPhysicalQueueSize(pThis) > 0) { CHKiRet(tryShutdownWorkersWithinActionTimeout(pThis)); @@ -1375,7 +1403,7 @@ finalize_it: * to modify some parameters before the queue is actually started. */ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads, - int iMaxQueueSize, rsRetVal (*pConsumer)(void*, batch_t*)) + int iMaxQueueSize, rsRetVal (*pConsumer)(void*, batch_t*,int*)) { DEFiRet; qqueue_t *pThis; @@ -1835,7 +1863,7 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti) /* we now have a non-idle batch of work, so we can release the queue mutex and process it */ d_pthread_mutex_unlock(pThis->mut); - CHKiRet(pThis->pConsumer(pThis->pUsr, &pWti->batch)); + CHKiRet(pThis->pConsumer(pThis->pUsr, &pWti->batch, &pThis->bShutdownImmediate)); /* we now need to check if we should deliberately delay processing a bit * and, if so, do that. -- rgerhards, 2008-01-30 @@ -1880,7 +1908,7 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti) d_pthread_mutex_unlock(pThis->mut); /* iterate over returned results and enqueue them in DA queue */ - for(i = 0 ; i < pWti->batch.nElem ; i++) { + for(i = 0 ; i < pWti->batch.nElem && !pThis->bShutdownImmediate ; i++) { /* TODO: we must add a generic "addRef" mechanism, because the disk queue enqueue destructs * the message. So far, we simply assume we always have msg_t, what currently is always the case. * rgerhards, 2009-05-28 @@ -1925,7 +1953,8 @@ qqueueChkStopWrkrDA(qqueue_t *pThis) dbgprintf("XXX: terminate_NOW DA worker: queue size %d, high water mark %d\n", getPhysicalQueueSize(pThis), pThis->iHighWtrMrk); iRet = RS_RET_TERMINATE_NOW; RUNLOG_STR("XXX: re-start reg worker"); -qqueueAdviseMaxWorkers(pThis); +if(!pThis->bShutdownImmediate) + qqueueAdviseMaxWorkers(pThis); RUNLOG_STR("XXX: done re-start reg worker"); } } else { @@ -2276,8 +2305,6 @@ dbgprintf("after InitDA, queue log %d, phys %d\n", getLogicalQueueSize(pThis), g /* destructor for the queue object */ BEGINobjDestruct(qqueue) /* be sure to specify the object type also in END and CODESTART macros! */ CODESTARTobjDestruct(qqueue) - pThis->bQueueInDestruction = 1; /* indicate we are in destruction (modifies some behaviour) */ - /* shut down all workers * We do not need to shutdown workers when we are in enqueue-only mode or we are a * direct queue - because in both cases we have none... ;) diff --git a/runtime/queue.h b/runtime/queue.h index 73c62b52..74bf2d31 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -59,10 +59,10 @@ typedef struct queue_s { BEGINobjInstance; queueType_t qType; int nLogDeq; /* number of elements currently logically dequeued */ + int bShutdownImmediate; /* should all workers cease processing messages? */ bool bEnqOnly; /* does queue run in enqueue-only mode (1) or not (0)? */ bool bSaveOnShutdown;/* persists everthing on shutdown (if DA!)? 1-yes, 0-no */ bool bQueueStarted; /* has queueStart() been called on this queue? 1-yes, 0-no */ - bool bQueueInDestruction;/* 1 if queue is in destruction process, 0 otherwise */ int iQueueSize; /* Current number of elements in the queue */ int iMaxQueueSize; /* how large can the queue grow? */ int iNumWorkerThreads;/* number of worker threads to use */ @@ -101,10 +101,11 @@ typedef struct queue_s { * the user really wanted...). -- rgerhards, 2008-04-02 */ /* end dequeue time window */ - rsRetVal (*pConsumer)(void *,batch_t*); /* user-supplied consumer function for dequeued messages */ + rsRetVal (*pConsumer)(void *,batch_t*,int*); /* user-supplied consumer function for dequeued messages */ /* calling interface for pConsumer: arg1 is the global user pointer from this structure, arg2 is the * user pointer array that was dequeued (actual sample: for actions, arg1 is the pAction and arg2 - * is pointer to an array of message message pointers) + * is pointer to an array of message message pointers), arg3 is a pointer to an interger which is zero + * during normal operations and one if the consumer must urgently shut down. */ /* type-specific handlers (set during construction) */ rsRetVal (*qConstruct)(struct queue_s *pThis); @@ -185,7 +186,7 @@ rsRetVal qqueueStart(qqueue_t *pThis); rsRetVal qqueueSetMaxFileSize(qqueue_t *pThis, size_t iMaxFileSize); rsRetVal qqueueSetFilePrefix(qqueue_t *pThis, uchar *pszPrefix, size_t iLenPrefix); rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads, - int iMaxQueueSize, rsRetVal (*pConsumer)(void*,batch_t*)); + int iMaxQueueSize, rsRetVal (*pConsumer)(void*,batch_t*, int*)); PROTOTYPEObjClassInit(qqueue); PROTOTYPEpropSetMeth(qqueue, iPersistUpdCnt, int); PROTOTYPEpropSetMeth(qqueue, bSyncQueueFiles, int); diff --git a/runtime/wti.c b/runtime/wti.c index 53b695b0..c3ab0aba 100644 --- a/runtime/wti.c +++ b/runtime/wti.c @@ -273,7 +273,9 @@ dbgprintf("YYY/ZZZ: pre lock mutex\n"); dbgprintf("YYY/ZZZ: wti locks mutex %p\n", pWtp->pmutUsr); /* first check if we are in shutdown process (but evaluate a bit later) */ terminateRet = wtpChkStopWrkr(pWtp, MUTEX_ALREADY_LOCKED); +RUNLOG; if(terminateRet == RS_RET_TERMINATE_NOW) { +RUNLOG; /* we now need to free the old batch */ localRet = pWtp->pfObjProcessed(pWtp->pUsr, pThis); dbgoprint((obj_t*) pThis, "terminating worker because of TERMINATE_NOW mode, del iRet %d\n", @@ -281,6 +283,7 @@ dbgprintf("YYY/ZZZ: wti locks mutex %p\n", pWtp->pmutUsr); d_pthread_mutex_unlock(pWtp->pmutUsr); break; } +RUNLOG; /* try to execute and process whatever we have */ /* Note that this function releases and re-aquires the mutex. The returned @@ -290,27 +293,39 @@ dbgprintf("YYY/ZZZ: wti locks mutex %p\n", pWtp->pmutUsr); dbgprintf("YYY/ZZZ: wti loop locked mutex %p again\n", pWtp->pmutUsr); if(localRet == RS_RET_IDLE) { +RUNLOG; if(terminateRet == RS_RET_TERMINATE_WHEN_IDLE || bInactivityTOOccured) { d_pthread_mutex_unlock(pWtp->pmutUsr); break; /* end of loop */ } +RUNLOG; doIdleProcessing(pThis, pWtp, &bInactivityTOOccured); +RUNLOG; d_pthread_mutex_unlock(pWtp->pmutUsr); +RUNLOG; continue; /* request next iteration */ } +RUNLOG; d_pthread_mutex_unlock(pWtp->pmutUsr); bInactivityTOOccured = 0; /* reset for next run */ } /* indicate termination */ +RUNLOG; d_pthread_mutex_lock(pWtp->pmutUsr); +RUNLOG; pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); +RUNLOG; pthread_cleanup_pop(0); /* remove cleanup handler */ +RUNLOG; pWtp->pfOnWorkerShutdown(pWtp->pUsr); +RUNLOG; pthread_setcancelstate(iCancelStateSave, NULL); +RUNLOG; d_pthread_mutex_unlock(pWtp->pmutUsr); +RUNLOG; RETiRet; } diff --git a/runtime/wtp.c b/runtime/wtp.c index 40d031dc..93234819 100644 --- a/runtime/wtp.c +++ b/runtime/wtp.c @@ -169,9 +169,9 @@ wtpWakeupAllWrkr(wtp_t *pThis) DEFiRet; ISOBJ_TYPE_assert(pThis, wtp); - d_pthread_mutex_lock(pThis->pmutUsr); + //d_pthread_mutex_lock(pThis->pmutUsr); pthread_cond_broadcast(pThis->pcondBusy); - d_pthread_mutex_unlock(pThis->pmutUsr); + //d_pthread_mutex_unlock(pThis->pmutUsr); RETiRet; } -- cgit