From b0ea982f5802b020b9e35f37cfa3a4acacd048bb Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 15 Jan 2008 16:49:02 +0000 Subject: improved shutdown processing - in-memory queue is now drained to disk --- queue.c | 36 +++++++++++++++++++++++++++++++----- 1 file changed, 31 insertions(+), 5 deletions(-) (limited to 'queue.c') diff --git a/queue.c b/queue.c index a33c677a..c6f761e0 100644 --- a/queue.c +++ b/queue.c @@ -301,7 +301,8 @@ queueDAConsumer(queue_t *pThis, int iMyThrdIndx, void *pUsr) dbgprintf("Queue %p/w%d: queueDAConsumer, queue size %d\n", pThis, iMyThrdIndx, pThis->iQueueSize); CHKiRet(queueEnqObj(pThis->pqDA, pUsr)); - if(pThis->iQueueSize == pThis->iLowWtrMrk) { + /* we check if we reached the low water mark, but only if we are not in shutdown mode */ + if(pThis->iQueueSize == pThis->iLowWtrMrk && pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_RUN) { dbgprintf("Queue 0x%lx/w%d: %d entries - passed low water mark in DA mode, sleeping\n", queueGetID(pThis), iMyThrdIndx, pThis->iQueueSize); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); @@ -363,20 +364,42 @@ queueStrtDA(queue_t *pThis) CHKiRet(queueSetMaxFileSize(pThis->pqDA, pThis->iMaxFileSize)); CHKiRet(queueSetFilePrefix(pThis->pqDA, pThis->pszFilePrefix, pThis->lenFilePrefix)); CHKiRet(queueSetiPersistUpdCnt(pThis->pqDA, pThis->iPersistUpdCnt)); - CHKiRet(queueSettoQShutdown(pThis->pqDA, pThis->toQShutdown)); CHKiRet(queueSettoActShutdown(pThis->pqDA, pThis->toActShutdown)); CHKiRet(queueSettoEnq(pThis->pqDA, pThis->toEnq)); CHKiRet(queueSetiHighWtrMrk(pThis->pqDA, 0)); CHKiRet(queueSetiDiscardMrk(pThis->pqDA, 0)); + if(pThis->toQShutdown == 0) { + CHKiRet(queueSettoQShutdown(pThis->pqDA, 0)); /* if the user really wants... */ + } else { + /* we use the shortest possible shutdown (0 is endless!) because when we run on disk AND + * have an obviously large backlog, we can't finish it in any case. So there is no point + * in holding shutdown longer than necessary. -- rgerhards, 2008-01-15 + */ + CHKiRet(queueSettoQShutdown(pThis->pqDA, 1)); + } iRet = queueStart(pThis->pqDA); /* file not found is expected, that means it is no previous QIF available */ if(iRet != RS_RET_OK && iRet != RS_RET_FILE_NOT_FOUND) FINALIZE; /* something is wrong */ - /* tell our fellow workers to shut down */ + /* tell our fellow workers to shut down + * NOTE: we do NOT join them by intension! If we did, we would hold draining + * the queue until some potentially long-running actions are finished. Having + * the ability to immediatly drain the queue was the primary intension of + * reserving worker thread 0 for DA queues. So if we would join the other + * workers, we would screw up and do against our design goal. + */ CHKiRet(queueTellWrkThrds(pThis, 1, eWRKTHRDCMD_SHUTDOWN_IMMEDIATE)); + /* as we are right now starting DA mode because we are so busy, it is + * extremely unlikely that any worker is sleeping on empty queue. HOWEVER, + * we want to be on the safe side, and so we awake anyone that is waiting + * on one. So even if the scheduler plays badly with us, things should be + * quite well. -- rgerhards, 2008-01-15 + */ + pthread_cond_broadcast(pThis->notEmpty); + pThis->qRunsDA = QRUNS_DA; /* we are now in DA mode! */ dbgprintf("Queue 0x%lx: is now running in disk assisted mode, disk queue 0x%lx\n", @@ -889,7 +912,6 @@ queueWrkThrdTrm(queue_t *pThis, qWrkCmd_t tShutdownCmd, long iTimeout) /* awake them... */ pthread_cond_broadcast(pThis->notEmpty); -dbgprintf("queueWrkThrdTrm broadcasted notEmpty\n"); if(pThis->qRunsDA != QRUNS_REGULAR) /* if running disk-assisted, workers may wait on that condition, too */ pthread_cond_broadcast(&pThis->condDA); @@ -1079,9 +1101,13 @@ dbgprintf("CallConsumer returns %d\n", iRet); } -/* Each queue has one associated worker (consumer) thread. It will pull +/* Each queue has at least one associated worker (consumer) thread. It will pull * the message from the queue and pass it to a user-defined function. * This function was provided on construction. It MUST be thread-safe. + * Worker thread 0 is always reserved for disk-assisted mode (if the queue + * is not DA, this worker will be dormant). All other workers are for + * regular operations mode. Workers are started and stopped as need arises. + * rgerhards, 2008-01-15 */ static void * queueWorker(void *arg) -- cgit