summaryrefslogtreecommitdiffstats
path: root/queue.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2008-01-25 10:45:25 +0000
committerRainer Gerhards <rgerhards@adiscon.com>2008-01-25 10:45:25 +0000
commit167abdb5b3fa6900edd6bbdb1cc7d586896a268c (patch)
treebed714a9789bd3f7bd2c86039dfdd4196471b85a /queue.c
parent5c686c8adcc473cbdbb14e4b2d736f9123210ee6 (diff)
downloadrsyslog-167abdb5b3fa6900edd6bbdb1cc7d586896a268c.tar.gz
rsyslog-167abdb5b3fa6900edd6bbdb1cc7d586896a268c.tar.xz
rsyslog-167abdb5b3fa6900edd6bbdb1cc7d586896a268c.zip
restructured queue shutdown so that the queue timeout is properly applied
before terminating the queue
Diffstat (limited to 'queue.c')
-rw-r--r--queue.c256
1 files changed, 159 insertions, 97 deletions
diff --git a/queue.c b/queue.c
index c3666db2..320b3385 100644
--- a/queue.c
+++ b/queue.c
@@ -1,3 +1,5 @@
+ // TODO: we need to implement peek(), without it (today!) we lose one message upon
+ // worker cancellation! -- rgerhards, 2008-01-14
// TODO: think about mutDA - I think it's no longer needed
// TODO: start up the correct num of workers when switching to non-DA mode
// TODO: "preforked" worker threads
@@ -776,81 +778,157 @@ queueDel(queue_t *pThis, void *pUsr)
static rsRetVal queueShutdownWorkers(queue_t *pThis)
{
DEFiRet;
+ DEFVARS_mutexProtection;
int i;
+ struct timespec tTimeout;
+ rsRetVal iRetLocal;
ISOBJ_TYPE_assert(pThis, queue);
dbgprintf("Queue 0x%lx: initiating worker thread shutdown sequence\n", queueGetID(pThis));
- /* even if the timeout count is set to 0 (run endless), we still call the queueWrkThrdTrm(). This
- * is necessary so that all threads get sent the termination command. With a timeout of 0, however,
- * the function returns immediate with RS_RET_TIMED_OUT. We catch that state and accept it as
- * good.
- */
- wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN, pThis->toQShutdown);
- if(iRet == RS_RET_TIMED_OUT) {
- if(pThis->toQShutdown == 0) {
- iRet = RS_RET_OK;
- } else {
- /* OK, we now need to try force the shutdown */
- dbgprintf("Queue 0x%lx: regular worker shutdown timed out, now trying immediate\n",
- queueGetID(pThis));
- iRet = wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE, pThis->toActShutdown);
+ // TODO: reminder, delete after testing: do we need to modify the high wtr mark? I don't think so 2008-01-25
+ /* first try to shutdown the queue within the regular shutdown period */
+ BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
+ if(pThis->iQueueSize > 0) {
+ if(pThis->bRunsDA) {
+ /* worker threads may be inactive after reaching low water
+ * mark. Lower the mark and reactivate workers.
+ */
+ pThis->iLowWtrMrk = 0;
+ wtpAdviseMaxWorkers(pThis->pWtpReg, 1);
}
}
+ END_MTX_PROTECTED_OPERATIONS(pThis->mut);
- if(iRet != RS_RET_OK) { /* this is true on actual error on first try or timeout and error on second */
- /* still didn't work out - so we now need to cancel the workers */
- dbgprintf("Queue 0x%lx: worker threads could not be shutdown, now canceling them\n", (unsigned long) pThis);
- iRet = wtpCancelAll(pThis->pWtpReg);
- }
-
- // TODO: do it just once but right ;)
- if(pThis->pWtpDA != NULL) {
- wtpShutdownAll(pThis->pWtpDA, pThis->toQShutdown, pThis->toQShutdown);
- if(iRet == RS_RET_TIMED_OUT) {
- if(pThis->toQShutdown == 0) {
- iRet = RS_RET_OK;
- } else {
- /* OK, we now need to try force the shutdown */
- dbgprintf("Queue 0x%lx: regular worker shutdown timed out, now trying immediate\n",
+ /* Now wait for the queue's workers to shut down. Note that we run into the code even if we just found
+ * out there are no active workers - that doesn't matter: the wtp knows about that and so will
+ * return immediately.
+ * We do not yet care about the DA worker - that will be handled down later in the process.
+ * Note that we must not request shutdown right now - that may introduce a race: if the regular queue
+ * still runs DA assisted and the DA worker gets scheduled first, it will terminate itself (if the DA
+ * queue happens to be empty at that instant). Then the regular worker enqueues messages, what will lead
+ * to a restart of the worker. Of course, everything will continue to run, but in a bit sub-optimal way
+ * (from a performance point of view). So we don't do anything right now. The DA queue will continue to
+ * process messages and shutdown itself in any case if there is nothing to do. So we don't lose anything
+ * by not requesting shutdown now.
+ * rgerhards, 2008-01-25
+ */
+ /* first calculate absolute timeout - we need the absolute value here, because we need to coordinate
+ * shutdown of both the regular and DA queue on *the same* timeout.
+ */
+ timeoutComp(&tTimeout, pThis->toQShutdown);
+ iRetLocal = wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN, &tTimeout);
+ if(iRetLocal == RS_RET_TIMED_OUT) {
+ dbgprintf("Queue 0x%lx: regular shutdown timed out on primary queue (this is OK)\n", queueGetID(pThis));
+ } else {
+ /* OK, the regular queue is now shut down. So we can now wait for the DA queue (if running DA) */
+ dbgprintf("Queue 0x%lx: regular queue workers shut down.\n", queueGetID(pThis));
+ BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
+ if(pThis->bRunsDA) {
+ END_MTX_PROTECTED_OPERATIONS(pThis->mut);
+ dbgprintf("Queue 0x%lx: we have a DA queue (0x%lx), requesting its shutdown.\n",
+ queueGetID(pThis), queueGetID(pThis->pqDA));
+ /* we use the same absolute timeout as above, so we do not use more than the configured
+ * timeout interval!
+ */
+ iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN, &tTimeout);
+ if(iRetLocal == RS_RET_TIMED_OUT) {
+ dbgprintf("Queue 0x%lx: shutdown timed out on DA queue (this is OK)\n",
queueGetID(pThis));
- iRet = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE, pThis->toActShutdown);
}
+ } else {
+ END_MTX_PROTECTED_OPERATIONS(pThis->mut);
}
+ }
- if(iRet != RS_RET_OK) { /* this is true on actual error on first try or timeout and error on second */
- /* still didn't work out - so we now need to cancel the workers */
- dbgprintf("Queue 0x%lx: worker threads could not be shutdown, now canceling them\n", (unsigned long) pThis);
- iRet = wtpCancelAll(pThis->pWtpDA);
+ /* when we reach this point, both queues are either empty or the regular queue shutdown timeout
+ * has expired. Now we need to check if we are configured to not lose messages. If so, we need
+ * to persist the queue to disk (this is only possible if the queue is DA-enabled).
+ */
+ BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
+ /* optimize parameters for shutdown of DA-enabled queues */
+ if(pThis->bIsDA && pThis->iQueueSize > 0 && pThis->bSaveOnShutdown) {
+ /* switch to enqueue-only mode so that no more actions happen */
+ if(pThis->bRunsDA == 0) {
+ queueInitDA(pThis, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* switch to DA mode */
+ } else {
+ queueSetEnqOnly(pThis->pqDA, QUEUE_MODE_ENQONLY); /* switch to enqueue-only mode */
}
+ END_MTX_PROTECTED_OPERATIONS(pThis->mut);
+ /* make sure we do not timeout before we are done */
+ dbgprintf("Queue 0x%lx: bSaveOnShutdown configured, eternal timeout set\n", queueGetID(pThis));
+ timeoutComp(&tTimeout, QUEUE_TIMEOUT_ETERNAL);
+ /* and run the primary's queue worker to drain the queue */
+ iRetLocal = wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN, &tTimeout);
+ if(iRetLocal != RS_RET_OK) {
+ dbgprintf("Queue 0x%lx: unexpected iRet state %d after trying to shut down primary queue in disk save mode, "
+ "continuing, but results are unpredictable\n",
+ queueGetID(pThis), iRetLocal);
+ }
+ } else {
+ END_MTX_PROTECTED_OPERATIONS(pThis->mut);
}
-
-
- /* finally join the threads
- * In case of a cancellation, this may actually take some time. This is also
- * needed to clean up the thread descriptors, even with a regular termination.
- * And, most importantly, this is needed if we have an indifitite termination
- * time set (timeout == 0)! -- rgerhards, 2008-01-14
+
+ /* now the primary queue is either empty, persisted to disk - or set to lose messages. So we
+ * can now request immediate shutdown of any remaining workers.
*/
-#if 0 // totally wrong, we must implement something along these lines in wtp!
-RUNLOG;
- for(i = 0 ; i <= pThis->iNumWorkerThreads ; ++i) {
- if(pThis->pWtpReg->pWrkr[i]->tCurrCmd != eWRKTHRD_STOPPED) {
- wtiJoinThrd(pThis->pWtpReg->pWrkr[i]);
+ BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
+ if(pThis->iQueueSize > 0) {
+ END_MTX_PROTECTED_OPERATIONS(pThis->mut);
+ timeoutComp(&tTimeout, QUEUE_TIMEOUT_ETERNAL);
+ iRetLocal = wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout);
+ if(iRetLocal != RS_RET_OK) {
+ dbgprintf("Queue 0x%lx: unexpected iRet state %d after trying immediate shutdown of the primary queue "
+ "in disk save mode. Continuing, but results are unpredictable\n",
+ queueGetID(pThis), iRetLocal);
}
+ } else {
+ END_MTX_PROTECTED_OPERATIONS(pThis->mut);
}
-RUNLOG;
- if(pThis->pWtpDA != NULL) {
- for(i = 0 ; i <= pThis->iNumWorkerThreads ; ++i) {
- if(pThis->pWtpDA->pWrkr[i]->tCurrCmd != eWRKTHRD_STOPPED) {
- wtiJoinThrd(pThis->pWtpDA->pWrkr[i]);
- }
+ /* Now queue workers should have terminated. If not, we need to cancel them as we have applied
+ * all timeout settings. If any worker in any queue still executes, its consumer is possibly
+ * long-running and cancelling is the only way to get rid of it. Note that the
+ * cancellation handler will probably re-queue a user pointer, so the queue's enqueue
+ * function is still needed (what is no problem as we do not yet destroy the queue - but I
+ * thought it's a good idea to mention that fact). -- rgerhards, 2008-01-25
+ */
+ BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
+ if(pThis->iQueueSize > 0) {
+ END_MTX_PROTECTED_OPERATIONS(pThis->mut);
+ dbgprintf("Queue 0x%lx: primary queue worker threads could not be shutdown, now canceling them\n",
+ queueGetID(pThis));
+ iRetLocal = wtpCancelAll(pThis->pWtpReg);
+ if(iRetLocal != RS_RET_OK) {
+ dbgprintf("Queue 0x%lx: unexpected iRet state %d trying to cancel primary queue worker "
+ "threads, continuing, but results are unpredictable\n",
+ queueGetID(pThis), iRetLocal);
+ }
+ } else {
+ END_MTX_PROTECTED_OPERATIONS(pThis->mut);
+ }
+
+ /* ... and now the DA queue, if it exists (should always be after the primary one) */
+ BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
+ if(pThis->pqDA != NULL && pThis->pqDA->iQueueSize > 0) {
+ END_MTX_PROTECTED_OPERATIONS(pThis->mut);
+ dbgprintf("Queue 0x%lx: DA worker threads could not be shutdown, now canceling them\n",
+ queueGetID(pThis));
+ iRetLocal = wtpCancelAll(pThis->pWtpReg);
+ if(iRetLocal != RS_RET_OK) {
+ dbgprintf("Queue 0x%lx: unexpected iRet state %d trying to cancel DA queue worker "
+ "threads, continuing, but results are unpredictable\n",
+ queueGetID(pThis), iRetLocal);
}
+ } else {
+ END_MTX_PROTECTED_OPERATIONS(pThis->mut);
}
-#endif
+ /* ... finally ... all worker threads have terminated :-)
+ * Well, more precisely, they *are in termination*. Some cancel cleanup handlers
+ * may still be running.
+ */
dbgprintf("Queue 0x%lx: worker threads terminated, remaining queue size %d.\n",
queueGetID(pThis), pThis->iQueueSize);
@@ -1112,20 +1190,21 @@ queueChkStopWrkrReg(queue_t *pThis)
/* must only be called when the queue mutex is locked, else results
- * are not stable! DA version
+ * are not stable! DA queue version
*/
static int
queueIsIdleDA(queue_t *pThis)
{
- return (pThis->iQueueSize == 0 || (pThis->bRunsDA && pThis->iQueueSize <= pThis->iLowWtrMrk));
+ /* remember: iQueueSize is the DA queue size, not the main queue! */
+ return (pThis->iQueueSize == 0);
}
/* must only be called when the queue mutex is locked, else results
- * are not stable! Regular version
+ * are not stable! Regular queue version
*/
static int
queueIsIdleReg(queue_t *pThis)
{
- return (pThis->iQueueSize == 0);
+ return (pThis->iQueueSize == 0 || (pThis->bRunsDA && pThis->iQueueSize <= pThis->iLowWtrMrk));
}
@@ -1322,7 +1401,6 @@ rsRetVal queueDestruct(queue_t **ppThis)
{
DEFiRet;
queue_t *pThis;
- DEFVARS_mutexProtection;
assert(ppThis != NULL);
pThis = *ppThis;
@@ -1332,55 +1410,37 @@ pThis->bSaveOnShutdown = 1; // TODO: Test remove
pThis->bQueueInDestruction = 1; /* indicate we are in destruction (modifies some behaviour) */
- /* we do not need to take care of any messages left in queue if we are in enqueue only mode */
- if(!pThis->bEnqOnly) {
- /* in regular mode, need look at termination */
- BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
- /* optimize parameters for shutdown of DA-enabled queues */
- if(pThis->bIsDA && pThis->iQueueSize > 0) { // TODO: atomic iQueueSize!
- dbgprintf("IsDA queue, modifying params for draining\n");
- pThis->iHighWtrMrk = 1; /* make sure we drain */
- pThis->iLowWtrMrk = 0; /* disable low water mark algo */
- if(pThis->bRunsDA == 0) {
- if(pThis->iQueueSize > 0) {
- queueInitDA(pThis, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* initiate DA mode */
- }
- } else {
- queueSetEnqOnly(pThis->pqDA, QUEUE_MODE_ENQONLY); /* turn on enqueue-only mode */
- /* worker may have been waited on low water mark, reactivate */
- wtpAdviseMaxWorkers(pThis->pWtpDA, 1);
- }
- if(pThis->bSaveOnShutdown) {
- dbgprintf("bSaveOnShutdown set, eternal timeout set\n");
- pThis->toQShutdown = QUEUE_TIMEOUT_ETERNAL;
- }
- END_MTX_PROTECTED_OPERATIONS(pThis->mut);
- }
- }
+ /* shut down all workers (handles *all* of the persistence logic) */
+ queueShutdownWorkers(pThis);
- /* at this point, the queue is either empty with all workers being idle (or deact) or the queue
- * is full and all workers are running. We now need to wait for everyone to become idle.
- */
+ /* finally destruct our (regular) worker thread pool */
if(pThis->qType != QUEUETYPE_DIRECT) {
- queueShutdownWorkers(pThis);
+ wtpDestruct(&pThis->pWtpReg);
}
- /* if still running DA, terminate disk queue (note that the DA queue is NULL if it was never used) */
- if(pThis->bRunsDA && pThis->pqDA != NULL)
+ /* Now check if we actually have a DA queue and, if so, destruct it.
+ * Note that the wtp must be destructed first, it may be in cancel cleanup handler
+ * *right now* and actually *need* to access the queue object to persist some final
+ * data (re-queueing case). So we need to destruct the wtp first, which will make
+ * sure all workers have terminated.
+ */
+ if(pThis->pWtpDA != NULL) {
+ wtpDestruct(&pThis->pWtpDA);
queueDestruct(&pThis->pqDA);
+ }
- /* persist the queue (we always do that - queuePersits() does cleanup if the queue is empty) */
+ /* persist the queue (we always do that - queuePersist() does cleanup if the queue is empty)
+ * This handler is most important for disk queues, it will finally persist the necessary
+ * on-disk structures. In theory, other queueing modes may implement their other (non-DA)
+ * methods of persisting a queue between runs, but in practice all of this is done via
+ * disk queues and DA mode. Anyhow, it doesn't hurt to know that we could extend it here
+ * if need arises (what I doubt...) -- rgerhards, 2008-01-25
+ */
CHKiRet_Hdlr(queuePersist(pThis)) {
dbgprintf("Queue 0x%lx: error %d persisting queue - data lost!\n", (unsigned long) pThis, iRet);
}
- /* ... then free resources */
- if(pThis->qType != QUEUETYPE_DIRECT) {
- wtpDestruct(&pThis->pWtpReg);
- if(pThis->pWtpDA != NULL)
- wtpDestruct(&pThis->pWtpDA);
- }
-
+ /* finally, clean up some simple things... */
if(pThis->pqParent == NULL) {
/* if we are not a child, we allocated our own mutex, which we now need to destroy */
pthread_mutex_destroy(pThis->mut);
@@ -1389,6 +1449,7 @@ pThis->bSaveOnShutdown = 1; // TODO: Test remove
pthread_mutex_destroy(&pThis->mutThrdMgmt);
pthread_cond_destroy(&pThis->notFull);
pthread_cond_destroy(&pThis->notEmpty);
+
/* type-specific destructor */
iRet = pThis->qDestruct(pThis);
@@ -1467,6 +1528,7 @@ queueEnqObj(queue_t *pThis, void *pUsr)
ISOBJ_TYPE_assert(pThis, queue);
+// TODO: check if queue is terminating and if so either discard message or enqueue it to the DA queue *directly*
dbgprintf("Queue %p: EnqObj() 1\n", pThis);
/* Please note that this function is not cancel-safe and consequently
* sets the calling thread's cancelibility state to PTHREAD_CANCEL_DISABLE