summaryrefslogtreecommitdiffstats
path: root/runtime
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2009-10-13 14:38:45 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2009-10-13 14:38:45 +0200
commit4d70c9b3e5e480d6dfa1c94506270f1f78e8ef32 (patch)
tree35e71c16c55fd3a18a9e5f47d0b027866f825e35 /runtime
parentbecc47cef625bfabf53589bb98ca10c352a4c824 (diff)
downloadrsyslog-4d70c9b3e5e480d6dfa1c94506270f1f78e8ef32.tar.gz
rsyslog-4d70c9b3e5e480d6dfa1c94506270f1f78e8ef32.tar.xz
rsyslog-4d70c9b3e5e480d6dfa1c94506270f1f78e8ef32.zip
added some debug settings plus improved shutdown sequence
... non-working version!
Diffstat (limited to 'runtime')
-rw-r--r--runtime/queue.c51
-rw-r--r--runtime/queue.h9
-rw-r--r--runtime/wti.c15
-rw-r--r--runtime/wtp.c4
4 files changed, 61 insertions, 18 deletions
diff --git a/runtime/queue.c b/runtime/queue.c
index 101052a1..00bbd15f 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -1041,7 +1041,7 @@ static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr)
batchObj.pUsrp = (obj_t*) pUsr;
singleBatch.nElem = 1; /* there always is only one in direct mode */
singleBatch.pElem = &batchObj;
- iRet = pThis->pConsumer(pThis->pUsr, &singleBatch);
+ iRet = pThis->pConsumer(pThis->pUsr, &singleBatch, &pThis->bShutdownImmediate);
objDestruct(pUsr);
RETiRet;
@@ -1180,6 +1180,9 @@ tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis)
} else {
DBGOPRINT((obj_t*) pThis, "DA queue worker shut down.\n");
}
+ }
+
+ if(pThis->pWtpDA != NULL) {
/* we also instruct the DA worker pool to shutdown ASAP. If we need it for persisting
* the queue, it is restarted at a later stage. We don't care here if a timeout happens.
*/
@@ -1210,6 +1213,7 @@ tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis)
rsRetVal iRetLocal;
DEFiRet;
+RUNLOG_STR("trying to shutdown workers within Action Timeout");
ISOBJ_TYPE_assert(pThis, qqueue);
ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */
@@ -1218,6 +1222,7 @@ tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis)
* startup some workers again. So this is OK here. -- rgerhards, 2009-05-28
*/
pThis->bEnqOnly = 1;
+ pThis->bShutdownImmediate = 1;
/* need to set this so that the DA queue begins shutdown in parallel! */
if(pThis->pqDA != NULL) {
pThis->pqDA->bEnqOnly = 1;
@@ -1247,6 +1252,9 @@ tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis)
DBGOPRINT((obj_t*) pThis, "unexpected iRet state %d after trying immediate shutdown of the DA "
"queue in disk save mode. Continuing, but results are unpredictable\n", iRetLocal);
}
+ }
+
+ if(pThis->pWtpDA != NULL) {
/* and now we need to check the DA worker itself (the one that shuffles data to the disk). This
* is necessary because we may be in a situation where the DA queue regular worker and the
* main queue worker stopped rather quickly. In this case, there is almost no time (and
@@ -1279,6 +1287,7 @@ static rsRetVal
cancelWorkers(qqueue_t *pThis)
{
rsRetVal iRetLocal;
+ struct timespec tTimeout;
DEFiRet;
/* Now queue workers should have terminated. If not, we need to cancel them as we have applied
@@ -1300,13 +1309,31 @@ cancelWorkers(qqueue_t *pThis)
DBGOPRINT((obj_t*) pThis, "unexpected iRet state %d trying to cancel DA queue worker "
"threads, continuing, but results are unpredictable\n", iRetLocal);
}
+ }
- /* finally, we cancel the main queue's DA worker pool, if it still is running. It may be
- * restarted later to persist the queue. But we stop it, because otherwise we get into
- * big trouble when resetting the logical dequeue pointer. This operation can only be
- * done when *no* worker is running. So time for a shutdown... -- rgerhards, 2009-05-28
+ /* finally, we cancel the main queue's DA worker pool, if it still is running. It may be
+ * restarted later to persist the queue. But we stop it, because otherwise we get into
+ * big trouble when resetting the logical dequeue pointer. This operation can only be
+ * done when *no* worker is running. So time for a shutdown... -- rgerhards, 2009-05-28
+ */
+ if(pThis->pWtpDA != NULL) {
+ /* but because of the potentially harsh consequences of cancelling, we try one last
+ * (and short) time to shut down the DA worker in a normal fashion. The idea here
+ * is that it may be willing to do so, but we did not yet have a task switch so
+ * that it could not terminate but will do immediately when it gets time.
+ * rgerhards, 2009-10-13
*/
- DBGOPRINT((obj_t*) pThis, "checking to see if we need to cancel the main queue's DA worker pool\n");
+ timeoutComp(&tTimeout, 50);
+ DBGOPRINT((obj_t*) pThis, "one ultimately last try for regular shutdown of main queue DA worker pool\n");
+ iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout);
+ if(iRetLocal == RS_RET_TIMED_OUT) {
+ DBGOPRINT((obj_t*) pThis, "shutdown timed out on main queue DA worker pool "
+ "- this is not good, need to cancel now...\n");
+ } else {
+ DBGOPRINT((obj_t*) pThis, "main queue DA worker pool shut down!\n");
+ }
+
+ DBGOPRINT((obj_t*) pThis, "checking to see if main queue DA worker pool needs to be cancelled\n");
iRetLocal = wtpCancelAll(pThis->pWtpDA); /* returns immediately if all threads already have terminated */
}
@@ -1349,6 +1376,7 @@ ShutdownWorkers(qqueue_t *pThis)
pThis->iLowWtrMrk = 0;
CHKiRet(tryShutdownWorkersWithinQueueTimeout(pThis));
+dbgprintf("YYY: physical queue size: %d\n", getPhysicalQueueSize(pThis));
if(getPhysicalQueueSize(pThis) > 0) {
CHKiRet(tryShutdownWorkersWithinActionTimeout(pThis));
@@ -1375,7 +1403,7 @@ finalize_it:
* to modify some parameters before the queue is actually started.
*/
rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads,
- int iMaxQueueSize, rsRetVal (*pConsumer)(void*, batch_t*))
+ int iMaxQueueSize, rsRetVal (*pConsumer)(void*, batch_t*,int*))
{
DEFiRet;
qqueue_t *pThis;
@@ -1835,7 +1863,7 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti)
/* we now have a non-idle batch of work, so we can release the queue mutex and process it */
d_pthread_mutex_unlock(pThis->mut);
- CHKiRet(pThis->pConsumer(pThis->pUsr, &pWti->batch));
+ CHKiRet(pThis->pConsumer(pThis->pUsr, &pWti->batch, &pThis->bShutdownImmediate));
/* we now need to check if we should deliberately delay processing a bit
* and, if so, do that. -- rgerhards, 2008-01-30
@@ -1880,7 +1908,7 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti)
d_pthread_mutex_unlock(pThis->mut);
/* iterate over returned results and enqueue them in DA queue */
- for(i = 0 ; i < pWti->batch.nElem ; i++) {
+ for(i = 0 ; i < pWti->batch.nElem && !pThis->bShutdownImmediate ; i++) {
/* TODO: we must add a generic "addRef" mechanism, because the disk queue enqueue destructs
* the message. So far, we simply assume we always have msg_t, what currently is always the case.
* rgerhards, 2009-05-28
@@ -1925,7 +1953,8 @@ qqueueChkStopWrkrDA(qqueue_t *pThis)
dbgprintf("XXX: terminate_NOW DA worker: queue size %d, high water mark %d\n", getPhysicalQueueSize(pThis), pThis->iHighWtrMrk);
iRet = RS_RET_TERMINATE_NOW;
RUNLOG_STR("XXX: re-start reg worker");
-qqueueAdviseMaxWorkers(pThis);
+if(!pThis->bShutdownImmediate)
+ qqueueAdviseMaxWorkers(pThis);
RUNLOG_STR("XXX: done re-start reg worker");
}
} else {
@@ -2276,8 +2305,6 @@ dbgprintf("after InitDA, queue log %d, phys %d\n", getLogicalQueueSize(pThis), g
/* destructor for the queue object */
BEGINobjDestruct(qqueue) /* be sure to specify the object type also in END and CODESTART macros! */
CODESTARTobjDestruct(qqueue)
- pThis->bQueueInDestruction = 1; /* indicate we are in destruction (modifies some behaviour) */
-
/* shut down all workers
* We do not need to shutdown workers when we are in enqueue-only mode or we are a
* direct queue - because in both cases we have none... ;)
diff --git a/runtime/queue.h b/runtime/queue.h
index 73c62b52..74bf2d31 100644
--- a/runtime/queue.h
+++ b/runtime/queue.h
@@ -59,10 +59,10 @@ typedef struct queue_s {
BEGINobjInstance;
queueType_t qType;
int nLogDeq; /* number of elements currently logically dequeued */
+ int bShutdownImmediate; /* should all workers cease processing messages? */
bool bEnqOnly; /* does queue run in enqueue-only mode (1) or not (0)? */
bool bSaveOnShutdown;/* persists everthing on shutdown (if DA!)? 1-yes, 0-no */
bool bQueueStarted; /* has queueStart() been called on this queue? 1-yes, 0-no */
- bool bQueueInDestruction;/* 1 if queue is in destruction process, 0 otherwise */
int iQueueSize; /* Current number of elements in the queue */
int iMaxQueueSize; /* how large can the queue grow? */
int iNumWorkerThreads;/* number of worker threads to use */
@@ -101,10 +101,11 @@ typedef struct queue_s {
* the user really wanted...). -- rgerhards, 2008-04-02
*/
/* end dequeue time window */
- rsRetVal (*pConsumer)(void *,batch_t*); /* user-supplied consumer function for dequeued messages */
+ rsRetVal (*pConsumer)(void *,batch_t*,int*); /* user-supplied consumer function for dequeued messages */
/* calling interface for pConsumer: arg1 is the global user pointer from this structure, arg2 is the
* user pointer array that was dequeued (actual sample: for actions, arg1 is the pAction and arg2
- * is pointer to an array of message message pointers)
+ * is pointer to an array of message message pointers), arg3 is a pointer to an interger which is zero
+ * during normal operations and one if the consumer must urgently shut down.
*/
/* type-specific handlers (set during construction) */
rsRetVal (*qConstruct)(struct queue_s *pThis);
@@ -185,7 +186,7 @@ rsRetVal qqueueStart(qqueue_t *pThis);
rsRetVal qqueueSetMaxFileSize(qqueue_t *pThis, size_t iMaxFileSize);
rsRetVal qqueueSetFilePrefix(qqueue_t *pThis, uchar *pszPrefix, size_t iLenPrefix);
rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads,
- int iMaxQueueSize, rsRetVal (*pConsumer)(void*,batch_t*));
+ int iMaxQueueSize, rsRetVal (*pConsumer)(void*,batch_t*, int*));
PROTOTYPEObjClassInit(qqueue);
PROTOTYPEpropSetMeth(qqueue, iPersistUpdCnt, int);
PROTOTYPEpropSetMeth(qqueue, bSyncQueueFiles, int);
diff --git a/runtime/wti.c b/runtime/wti.c
index 53b695b0..c3ab0aba 100644
--- a/runtime/wti.c
+++ b/runtime/wti.c
@@ -273,7 +273,9 @@ dbgprintf("YYY/ZZZ: pre lock mutex\n");
dbgprintf("YYY/ZZZ: wti locks mutex %p\n", pWtp->pmutUsr);
/* first check if we are in shutdown process (but evaluate a bit later) */
terminateRet = wtpChkStopWrkr(pWtp, MUTEX_ALREADY_LOCKED);
+RUNLOG;
if(terminateRet == RS_RET_TERMINATE_NOW) {
+RUNLOG;
/* we now need to free the old batch */
localRet = pWtp->pfObjProcessed(pWtp->pUsr, pThis);
dbgoprint((obj_t*) pThis, "terminating worker because of TERMINATE_NOW mode, del iRet %d\n",
@@ -281,6 +283,7 @@ dbgprintf("YYY/ZZZ: wti locks mutex %p\n", pWtp->pmutUsr);
d_pthread_mutex_unlock(pWtp->pmutUsr);
break;
}
+RUNLOG;
/* try to execute and process whatever we have */
/* Note that this function releases and re-aquires the mutex. The returned
@@ -290,27 +293,39 @@ dbgprintf("YYY/ZZZ: wti locks mutex %p\n", pWtp->pmutUsr);
dbgprintf("YYY/ZZZ: wti loop locked mutex %p again\n", pWtp->pmutUsr);
if(localRet == RS_RET_IDLE) {
+RUNLOG;
if(terminateRet == RS_RET_TERMINATE_WHEN_IDLE || bInactivityTOOccured) {
d_pthread_mutex_unlock(pWtp->pmutUsr);
break; /* end of loop */
}
+RUNLOG;
doIdleProcessing(pThis, pWtp, &bInactivityTOOccured);
+RUNLOG;
d_pthread_mutex_unlock(pWtp->pmutUsr);
+RUNLOG;
continue; /* request next iteration */
}
+RUNLOG;
d_pthread_mutex_unlock(pWtp->pmutUsr);
bInactivityTOOccured = 0; /* reset for next run */
}
/* indicate termination */
+RUNLOG;
d_pthread_mutex_lock(pWtp->pmutUsr);
+RUNLOG;
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
+RUNLOG;
pthread_cleanup_pop(0); /* remove cleanup handler */
+RUNLOG;
pWtp->pfOnWorkerShutdown(pWtp->pUsr);
+RUNLOG;
pthread_setcancelstate(iCancelStateSave, NULL);
+RUNLOG;
d_pthread_mutex_unlock(pWtp->pmutUsr);
+RUNLOG;
RETiRet;
}
diff --git a/runtime/wtp.c b/runtime/wtp.c
index 40d031dc..93234819 100644
--- a/runtime/wtp.c
+++ b/runtime/wtp.c
@@ -169,9 +169,9 @@ wtpWakeupAllWrkr(wtp_t *pThis)
DEFiRet;
ISOBJ_TYPE_assert(pThis, wtp);
- d_pthread_mutex_lock(pThis->pmutUsr);
+ //d_pthread_mutex_lock(pThis->pmutUsr);
pthread_cond_broadcast(pThis->pcondBusy);
- d_pthread_mutex_unlock(pThis->pmutUsr);
+ //d_pthread_mutex_unlock(pThis->pmutUsr);
RETiRet;
}