diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2008-01-25 10:45:25 +0000 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2008-01-25 10:45:25 +0000 |
commit | 167abdb5b3fa6900edd6bbdb1cc7d586896a268c (patch) | |
tree | bed714a9789bd3f7bd2c86039dfdd4196471b85a /queue.c | |
parent | 5c686c8adcc473cbdbb14e4b2d736f9123210ee6 (diff) | |
download | rsyslog-167abdb5b3fa6900edd6bbdb1cc7d586896a268c.tar.gz rsyslog-167abdb5b3fa6900edd6bbdb1cc7d586896a268c.tar.xz rsyslog-167abdb5b3fa6900edd6bbdb1cc7d586896a268c.zip |
restructured queue shutdown so that the queue timeout is properly applied
before terminatiing the queue
Diffstat (limited to 'queue.c')
-rw-r--r-- | queue.c | 256 |
1 files changed, 159 insertions, 97 deletions
@@ -1,3 +1,5 @@ + // TODO: we need to implement peek(), without it (today!) we lose one message upon + // worker cancellation! -- rgerhards, 2008-01-14 // TODO: think about mutDA - I think it's no longer needed // TODO: start up the correct num of workers when switching to non-DA mode // TODO: "preforked" worker threads @@ -776,81 +778,157 @@ queueDel(queue_t *pThis, void *pUsr) static rsRetVal queueShutdownWorkers(queue_t *pThis) { DEFiRet; + DEFVARS_mutexProtection; int i; + struct timespec tTimeout; + rsRetVal iRetLocal; ISOBJ_TYPE_assert(pThis, queue); dbgprintf("Queue 0x%lx: initiating worker thread shutdown sequence\n", queueGetID(pThis)); - /* even if the timeout count is set to 0 (run endless), we still call the queueWrkThrdTrm(). This - * is necessary so that all threads get sent the termination command. With a timeout of 0, however, - * the function returns immediate with RS_RET_TIMED_OUT. We catch that state and accept it as - * good. - */ - wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN, pThis->toQShutdown); - if(iRet == RS_RET_TIMED_OUT) { - if(pThis->toQShutdown == 0) { - iRet = RS_RET_OK; - } else { - /* OK, we now need to try force the shutdown */ - dbgprintf("Queue 0x%lx: regular worker shutdown timed out, now trying immediate\n", - queueGetID(pThis)); - iRet = wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE, pThis->toActShutdown); + // TODO: reminder, delte after testing: do we need to modify the high wtr mark? I dont' think so 2008-01-25 + /* first try to shutdown the queue within the regular shutdown period */ + BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ + if(pThis->iQueueSize > 0) { + if(pThis->bRunsDA) { + /* worker threads may be inactive after reaching low water + * mark. Lower the mark and react workers. + */ + pThis->iLowWtrMrk = 0; + wtpAdviseMaxWorkers(pThis->pWtpReg, 1); } } + END_MTX_PROTECTED_OPERATIONS(pThis->mut); - if(iRet != RS_RET_OK) { /* this is true on actual error on first try or timeout and error on second */ - /* still didn't work out - so we now need to cancel the workers */ - dbgprintf("Queue 0x%lx: worker threads could not be shutdown, now canceling them\n", (unsigned long) pThis); - iRet = wtpCancelAll(pThis->pWtpReg); - } - - // TODO: do it just once but right ;) - if(pThis->pWtpDA != NULL) { - wtpShutdownAll(pThis->pWtpDA, pThis->toQShutdown, pThis->toQShutdown); - if(iRet == RS_RET_TIMED_OUT) { - if(pThis->toQShutdown == 0) { - iRet = RS_RET_OK; - } else { - /* OK, we now need to try force the shutdown */ - dbgprintf("Queue 0x%lx: regular worker shutdown timed out, now trying immediate\n", + /* Now wait for the queue's workers to shut down. Note that we run into the code even if we just found + * out there are no active workers - that doesn't matter: the wtp knows about that and so will + * return immediately. + * We do not yet care about the DA worker - that will be handled down later in the process. + * Note that we must not request shutdown right now - that may introduce a race: if the regular queue + * still runs DA assisted and the DA worker gets scheduled first, it will terminate itself (if the DA + * queue happens to be empty at that instant). Then the regular worker enqueues messages, what will lead + * to a restart of the worker. Of course, everything will continue to run, but in a bit sub-optimal way + * (from a performance point of view). So we don't do anything right now. The DA queue will continue to + * process messages and shutdown itself in any case if there is nothing to do. So we don't loose anything + * by not requesting shutdown now. + * rgerhards, 2008-01-25 + */ + /* first calculate absolute timeout - we need the absolute value here, because we need to coordinate + * shutdown of both the regular and DA queue on *the same* timeout. + */ + timeoutComp(&tTimeout, pThis->toQShutdown); + iRetLocal = wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN, &tTimeout); + if(iRetLocal == RS_RET_TIMED_OUT) { + dbgprintf("Queue 0x%lx: regular shutdown timed out on primary queue (this is OK)\n", queueGetID(pThis)); + } else { + /* OK, the regular queue is now shut down. So we can now wait for the DA queue (if running DA) */ + dbgprintf("Queue 0x%lx: regular queue workers shut down.\n", queueGetID(pThis)); + BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ + if(pThis->bRunsDA) { + END_MTX_PROTECTED_OPERATIONS(pThis->mut); + dbgprintf("Queue 0x%lx: we have a DA queue (0x%lx), requesting its shutdown.\n", + queueGetID(pThis), queueGetID(pThis->pqDA)); + /* we use the same absolute timeout as above, so we do not use more than the configured + * timeout interval! + */ + iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN, &tTimeout); + if(iRetLocal == RS_RET_TIMED_OUT) { + dbgprintf("Queue 0x%lx: shutdown timed out on DA queue (this is OK)\n", queueGetID(pThis)); - iRet = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE, pThis->toActShutdown); } + } else { + END_MTX_PROTECTED_OPERATIONS(pThis->mut); } + } - if(iRet != RS_RET_OK) { /* this is true on actual error on first try or timeout and error on second */ - /* still didn't work out - so we now need to cancel the workers */ - dbgprintf("Queue 0x%lx: worker threads could not be shutdown, now canceling them\n", (unsigned long) pThis); - iRet = wtpCancelAll(pThis->pWtpDA); + /* when we reach this point, both queues are either empty or the regular queue shutdown timeout + * has expired. Now we need to check if we areconfigured to not loose messages. If so, we need + * to persist the queue to disk (this is only possible if the queue is DA-enabled). + */ + BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ + /* optimize parameters for shutdown of DA-enabled queues */ + if(pThis->bIsDA && pThis->iQueueSize > 0 && pThis->bSaveOnShutdown) { + /* switch to enqueue-only mode so that no more actions happen */ + if(pThis->bRunsDA == 0) { + queueInitDA(pThis, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* switch to DA mode */ + } else { + queueSetEnqOnly(pThis->pqDA, QUEUE_MODE_ENQONLY); /* switch to enqueue-only mode */ } + END_MTX_PROTECTED_OPERATIONS(pThis->mut); + /* make sure we do not timeout before we are done */ + dbgprintf("Queue 0x%lx: bSaveOnShutdown configured, eternal timeout set\n", queueGetID(pThis)); + timeoutComp(&tTimeout, QUEUE_TIMEOUT_ETERNAL); + /* and run the primary's queue worker to drain the queue */ + iRetLocal = wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN, &tTimeout); + if(iRetLocal != RS_RET_OK) { + dbgprintf("Queue 0x%lx: unexpected iRet state %d after trying to shut down primary queue in disk save mode, " + "continuing, but results are unpredictable\n", + queueGetID(pThis), iRetLocal); + } + } else { + END_MTX_PROTECTED_OPERATIONS(pThis->mut); } - - - /* finally join the threads - * In case of a cancellation, this may actually take some time. This is also - * needed to clean up the thread descriptors, even with a regular termination. - * And, most importantly, this is needed if we have an indifitite termination - * time set (timeout == 0)! -- rgerhards, 2008-01-14 + + /* now the primary queue is either empty, persisted to disk - or set to loose messages. So we + * can now request immediate shutdown of any remaining workers. */ -#if 0 // totally wrong, we must implement something along these lines in wtp! -RUNLOG; - for(i = 0 ; i <= pThis->iNumWorkerThreads ; ++i) { - if(pThis->pWtpReg->pWrkr[i]->tCurrCmd != eWRKTHRD_STOPPED) { - wtiJoinThrd(pThis->pWtpReg->pWrkr[i]); + BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ + if(pThis->iQueueSize > 0) { + END_MTX_PROTECTED_OPERATIONS(pThis->mut); + timeoutComp(&tTimeout, QUEUE_TIMEOUT_ETERNAL); + iRetLocal = wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout); + if(iRetLocal != RS_RET_OK) { + dbgprintf("Queue 0x%lx: unexpected iRet state %d after trying immediate shutdown of the primary queue " + "in disk save mode. Continuing, but results are unpredictable\n", + queueGetID(pThis), iRetLocal); } + } else { + END_MTX_PROTECTED_OPERATIONS(pThis->mut); } -RUNLOG; - if(pThis->pWtpDA != NULL) { - for(i = 0 ; i <= pThis->iNumWorkerThreads ; ++i) { - if(pThis->pWtpDA->pWrkr[i]->tCurrCmd != eWRKTHRD_STOPPED) { - wtiJoinThrd(pThis->pWtpDA->pWrkr[i]); - } + /* Now queue workers should have terminated. If not, we need to cancel them as we have applied + * all timeout setting. If any worker in any queue still executes, its consumer is possibly + * long-running and cancelling is the only way to get rid of it. Note that the + * cancellation handler will probably re-queue a user pointer, so the queue's enqueue + * function is still needed (what is no problem as we do not yet destroy the queue - but I + * thought it's a good idea to mention that fact). -- rgerhards, 2008-01-25 + */ + BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ + if(pThis->iQueueSize > 0) { + END_MTX_PROTECTED_OPERATIONS(pThis->mut); + dbgprintf("Queue 0x%lx: primary queue worker threads could not be shutdown, now canceling them\n", + queueGetID(pThis)); + iRetLocal = wtpCancelAll(pThis->pWtpReg); + if(iRetLocal != RS_RET_OK) { + dbgprintf("Queue 0x%lx: unexpected iRet state %d trying to cancel primary queue worker " + "threads, continuing, but results are unpredictable\n", + queueGetID(pThis), iRetLocal); + } + } else { + END_MTX_PROTECTED_OPERATIONS(pThis->mut); + } + + /* ... and now the DA queue, if it exists (should always be after the primary one) */ + BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ + if(pThis->pqDA != NULL && pThis->pqDA->iQueueSize > 0) { + END_MTX_PROTECTED_OPERATIONS(pThis->mut); + dbgprintf("Queue 0x%lx: DA worker threads could not be shutdown, now canceling them\n", + queueGetID(pThis)); + iRetLocal = wtpCancelAll(pThis->pWtpReg); + if(iRetLocal != RS_RET_OK) { + dbgprintf("Queue 0x%lx: unexpected iRet state %d trying to cancel DA queue worker " + "threads, continuing, but results are unpredictable\n", + queueGetID(pThis), iRetLocal); } + } else { + END_MTX_PROTECTED_OPERATIONS(pThis->mut); } -#endif + /* ... finally ... all worker threads have terminated :-) + * Well, more precisely, they *are in termination*. Some cancel cleanup handlers + * may still be running. + */ dbgprintf("Queue 0x%lx: worker threads terminated, remaining queue size %d.\n", queueGetID(pThis), pThis->iQueueSize); @@ -1112,20 +1190,21 @@ queueChkStopWrkrReg(queue_t *pThis) /* must only be called when the queue mutex is locked, else results - * are not stable! DA version + * are not stable! DA queue version */ static int queueIsIdleDA(queue_t *pThis) { - return (pThis->iQueueSize == 0 || (pThis->bRunsDA && pThis->iQueueSize <= pThis->iLowWtrMrk)); + /* remember: iQueueSize is the DA queue size, not the main queue! */ + return (pThis->iQueueSize == 0); } /* must only be called when the queue mutex is locked, else results - * are not stable! Regular version + * are not stable! Regular queue version */ static int queueIsIdleReg(queue_t *pThis) { - return (pThis->iQueueSize == 0); + return (pThis->iQueueSize == 0 || (pThis->bRunsDA && pThis->iQueueSize <= pThis->iLowWtrMrk)); } @@ -1322,7 +1401,6 @@ rsRetVal queueDestruct(queue_t **ppThis) { DEFiRet; queue_t *pThis; - DEFVARS_mutexProtection; assert(ppThis != NULL); pThis = *ppThis; @@ -1332,55 +1410,37 @@ pThis->bSaveOnShutdown = 1; // TODO: Test remove pThis->bQueueInDestruction = 1; /* indicate we are in destruction (modifies some behaviour) */ - /* we do not need to take care of any messages left in queue if we are in enqueue only mode */ - if(!pThis->bEnqOnly) { - /* in regular mode, need look at termination */ - BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ - /* optimize parameters for shutdown of DA-enabled queues */ - if(pThis->bIsDA && pThis->iQueueSize > 0) { // TODO: atomic iQueueSize! - dbgprintf("IsDA queue, modifying params for draining\n"); - pThis->iHighWtrMrk = 1; /* make sure we drain */ - pThis->iLowWtrMrk = 0; /* disable low water mark algo */ - if(pThis->bRunsDA == 0) { - if(pThis->iQueueSize > 0) { - queueInitDA(pThis, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* initiate DA mode */ - } - } else { - queueSetEnqOnly(pThis->pqDA, QUEUE_MODE_ENQONLY); /* turn on enqueue-only mode */ - /* worker may have been waited on low water mark, reactivate */ - wtpAdviseMaxWorkers(pThis->pWtpDA, 1); - } - if(pThis->bSaveOnShutdown) { - dbgprintf("bSaveOnShutdown set, eternal timeout set\n"); - pThis->toQShutdown = QUEUE_TIMEOUT_ETERNAL; - } - END_MTX_PROTECTED_OPERATIONS(pThis->mut); - } - } + /* shut down all workers (handles *all* of the persistence logic) */ + queueShutdownWorkers(pThis); - /* at this point, the queue is either empty with all workers being idle (or deact) or the queue - * is full and all workers are running. We now need to wait for everyone to become idle. - */ + /* finally destruct our (regular) worker thread pool */ if(pThis->qType != QUEUETYPE_DIRECT) { - queueShutdownWorkers(pThis); + wtpDestruct(&pThis->pWtpReg); } - /* if still running DA, terminate disk queue (note that the DA queue is NULL if it was never used) */ - if(pThis->bRunsDA && pThis->pqDA != NULL) + /* Now check if we actually have a DA queue and, if so, destruct it. + * Note that the wtp must be destructed first, it may be in cancel cleanup handler + * *right now* and actually *need* to access the queue object to persist some final + * data (re-queueing case). So we need to destruct the wtp first, which will make + * sure all workers have terminated. + */ + if(pThis->pWtpDA != NULL) { + wtpDestruct(&pThis->pWtpDA); queueDestruct(&pThis->pqDA); + } - /* persist the queue (we always do that - queuePersits() does cleanup if the queue is empty) */ + /* persist the queue (we always do that - queuePersits() does cleanup if the queue is empty) + * This handler is most important for disk queues, it will finally persist the necessary + * on-disk structures. In theory, other queueing modes may implement their other (non-DA) + * methods of persisting a queue between runs, but in practice all of this is done via + * disk queues and DA mode. Anyhow, it doesn't hurt to know that we could extend it here + * if need arises (what I doubt...) -- rgerhards, 2008-01-25 + */ CHKiRet_Hdlr(queuePersist(pThis)) { dbgprintf("Queue 0x%lx: error %d persisting queue - data lost!\n", (unsigned long) pThis, iRet); } - /* ... then free resources */ - if(pThis->qType != QUEUETYPE_DIRECT) { - wtpDestruct(&pThis->pWtpReg); - if(pThis->pWtpDA != NULL) - wtpDestruct(&pThis->pWtpDA); - } - + /* finally, clean up some simple things... */ if(pThis->pqParent == NULL) { /* if we are not a child, we allocated our own mutex, which we now need to destroy */ pthread_mutex_destroy(pThis->mut); @@ -1389,6 +1449,7 @@ pThis->bSaveOnShutdown = 1; // TODO: Test remove pthread_mutex_destroy(&pThis->mutThrdMgmt); pthread_cond_destroy(&pThis->notFull); pthread_cond_destroy(&pThis->notEmpty); + /* type-specific destructor */ iRet = pThis->qDestruct(pThis); @@ -1467,6 +1528,7 @@ queueEnqObj(queue_t *pThis, void *pUsr) ISOBJ_TYPE_assert(pThis, queue); +// TODO: check if queue is terminating and if so either discard message or enqeue it to the DA queue *directly* dbgprintf("Queue %p: EnqObj() 1\n", pThis); /* Please note that this function is not cancel-safe and consequently * sets the calling thread's cancelibility state to PTHREAD_CANCEL_DISABLE |