summaryrefslogtreecommitdiffstats
path: root/runtime/queue.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2009-10-13 14:38:45 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2009-10-13 14:38:45 +0200
commit4d70c9b3e5e480d6dfa1c94506270f1f78e8ef32 (patch)
tree35e71c16c55fd3a18a9e5f47d0b027866f825e35 /runtime/queue.c
parentbecc47cef625bfabf53589bb98ca10c352a4c824 (diff)
downloadrsyslog-4d70c9b3e5e480d6dfa1c94506270f1f78e8ef32.tar.gz
rsyslog-4d70c9b3e5e480d6dfa1c94506270f1f78e8ef32.tar.xz
rsyslog-4d70c9b3e5e480d6dfa1c94506270f1f78e8ef32.zip
added some debug settings plus improved shutdown sequence
... non-working version!
Diffstat (limited to 'runtime/queue.c')
-rw-r--r--runtime/queue.c51
1 files changed, 39 insertions, 12 deletions
diff --git a/runtime/queue.c b/runtime/queue.c
index 101052a1..00bbd15f 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -1041,7 +1041,7 @@ static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr)
batchObj.pUsrp = (obj_t*) pUsr;
singleBatch.nElem = 1; /* there always is only one in direct mode */
singleBatch.pElem = &batchObj;
- iRet = pThis->pConsumer(pThis->pUsr, &singleBatch);
+ iRet = pThis->pConsumer(pThis->pUsr, &singleBatch, &pThis->bShutdownImmediate);
objDestruct(pUsr);
RETiRet;
@@ -1180,6 +1180,9 @@ tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis)
} else {
DBGOPRINT((obj_t*) pThis, "DA queue worker shut down.\n");
}
+ }
+
+ if(pThis->pWtpDA != NULL) {
/* we also instruct the DA worker pool to shutdown ASAP. If we need it for persisting
* the queue, it is restarted at a later stage. We don't care here if a timeout happens.
*/
@@ -1210,6 +1213,7 @@ tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis)
rsRetVal iRetLocal;
DEFiRet;
+RUNLOG_STR("trying to shutdown workers within Action Timeout");
ISOBJ_TYPE_assert(pThis, qqueue);
ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */
@@ -1218,6 +1222,7 @@ tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis)
* startup some workers again. So this is OK here. -- rgerhards, 2009-05-28
*/
pThis->bEnqOnly = 1;
+ pThis->bShutdownImmediate = 1;
/* need to set this so that the DA queue begins shutdown in parallel! */
if(pThis->pqDA != NULL) {
pThis->pqDA->bEnqOnly = 1;
@@ -1247,6 +1252,9 @@ tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis)
DBGOPRINT((obj_t*) pThis, "unexpected iRet state %d after trying immediate shutdown of the DA "
"queue in disk save mode. Continuing, but results are unpredictable\n", iRetLocal);
}
+ }
+
+ if(pThis->pWtpDA != NULL) {
/* and now we need to check the DA worker itself (the one that shuffles data to the disk). This
* is necessary because we may be in a situation where the DA queue regular worker and the
* main queue worker stopped rather quickly. In this case, there is almost no time (and
@@ -1279,6 +1287,7 @@ static rsRetVal
cancelWorkers(qqueue_t *pThis)
{
rsRetVal iRetLocal;
+ struct timespec tTimeout;
DEFiRet;
/* Now queue workers should have terminated. If not, we need to cancel them as we have applied
@@ -1300,13 +1309,31 @@ cancelWorkers(qqueue_t *pThis)
DBGOPRINT((obj_t*) pThis, "unexpected iRet state %d trying to cancel DA queue worker "
"threads, continuing, but results are unpredictable\n", iRetLocal);
}
+ }
- /* finally, we cancel the main queue's DA worker pool, if it still is running. It may be
- * restarted later to persist the queue. But we stop it, because otherwise we get into
- * big trouble when resetting the logical dequeue pointer. This operation can only be
- * done when *no* worker is running. So time for a shutdown... -- rgerhards, 2009-05-28
+ /* finally, we cancel the main queue's DA worker pool, if it still is running. It may be
+ * restarted later to persist the queue. But we stop it, because otherwise we get into
+ * big trouble when resetting the logical dequeue pointer. This operation can only be
+ * done when *no* worker is running. So time for a shutdown... -- rgerhards, 2009-05-28
+ */
+ if(pThis->pWtpDA != NULL) {
+ /* but because of the potentially harsh consequences of cancelling, we try one last
+ * (and short) time to shut down the DA worker in a normal fashion. The idea here
+ * is that it may be willing to do so, but we did not yet have a task switch so
+ * that it could not terminate but will do immediately when it gets time.
+ * rgerhards, 2009-10-13
*/
- DBGOPRINT((obj_t*) pThis, "checking to see if we need to cancel the main queue's DA worker pool\n");
+ timeoutComp(&tTimeout, 50);
+ DBGOPRINT((obj_t*) pThis, "one ultimately last try for regular shutdown of main queue DA worker pool\n");
+ iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout);
+ if(iRetLocal == RS_RET_TIMED_OUT) {
+ DBGOPRINT((obj_t*) pThis, "shutdown timed out on main queue DA worker pool "
+ "- this is not good, need to cancel now...\n");
+ } else {
+ DBGOPRINT((obj_t*) pThis, "main queue DA worker pool shut down!\n");
+ }
+
+ DBGOPRINT((obj_t*) pThis, "checking to see if main queue DA worker pool needs to be cancelled\n");
iRetLocal = wtpCancelAll(pThis->pWtpDA); /* returns immediately if all threads already have terminated */
}
@@ -1349,6 +1376,7 @@ ShutdownWorkers(qqueue_t *pThis)
pThis->iLowWtrMrk = 0;
CHKiRet(tryShutdownWorkersWithinQueueTimeout(pThis));
+dbgprintf("YYY: physical queue size: %d\n", getPhysicalQueueSize(pThis));
if(getPhysicalQueueSize(pThis) > 0) {
CHKiRet(tryShutdownWorkersWithinActionTimeout(pThis));
@@ -1375,7 +1403,7 @@ finalize_it:
* to modify some parameters before the queue is actually started.
*/
rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads,
- int iMaxQueueSize, rsRetVal (*pConsumer)(void*, batch_t*))
+ int iMaxQueueSize, rsRetVal (*pConsumer)(void*, batch_t*,int*))
{
DEFiRet;
qqueue_t *pThis;
@@ -1835,7 +1863,7 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti)
/* we now have a non-idle batch of work, so we can release the queue mutex and process it */
d_pthread_mutex_unlock(pThis->mut);
- CHKiRet(pThis->pConsumer(pThis->pUsr, &pWti->batch));
+ CHKiRet(pThis->pConsumer(pThis->pUsr, &pWti->batch, &pThis->bShutdownImmediate));
/* we now need to check if we should deliberately delay processing a bit
* and, if so, do that. -- rgerhards, 2008-01-30
@@ -1880,7 +1908,7 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti)
d_pthread_mutex_unlock(pThis->mut);
/* iterate over returned results and enqueue them in DA queue */
- for(i = 0 ; i < pWti->batch.nElem ; i++) {
+ for(i = 0 ; i < pWti->batch.nElem && !pThis->bShutdownImmediate ; i++) {
/* TODO: we must add a generic "addRef" mechanism, because the disk queue enqueue destructs
* the message. So far, we simply assume we always have msg_t, what currently is always the case.
* rgerhards, 2009-05-28
@@ -1925,7 +1953,8 @@ qqueueChkStopWrkrDA(qqueue_t *pThis)
dbgprintf("XXX: terminate_NOW DA worker: queue size %d, high water mark %d\n", getPhysicalQueueSize(pThis), pThis->iHighWtrMrk);
iRet = RS_RET_TERMINATE_NOW;
RUNLOG_STR("XXX: re-start reg worker");
-qqueueAdviseMaxWorkers(pThis);
+if(!pThis->bShutdownImmediate)
+ qqueueAdviseMaxWorkers(pThis);
RUNLOG_STR("XXX: done re-start reg worker");
}
} else {
@@ -2276,8 +2305,6 @@ dbgprintf("after InitDA, queue log %d, phys %d\n", getLogicalQueueSize(pThis), g
/* destructor for the queue object */
BEGINobjDestruct(qqueue) /* be sure to specify the object type also in END and CODESTART macros! */
CODESTARTobjDestruct(qqueue)
- pThis->bQueueInDestruction = 1; /* indicate we are in destruction (modifies some behaviour) */
-
/* shut down all workers
* We do not need to shutdown workers when we are in enqueue-only mode or we are a
* direct queue - because in both cases we have none... ;)