summaryrefslogtreecommitdiffstats
path: root/queue.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2008-01-27 16:24:05 +0000
committerRainer Gerhards <rgerhards@adiscon.com>2008-01-27 16:24:05 +0000
commit4b44a34d71b8b9bfc2574adeff4e735a8e97c876 (patch)
tree3807fd8ccb94b4a3d9beb9da754bfa0bb3d46c5c /queue.c
parent7d8b1c293746d325db7f93d343a952e382da9ddd (diff)
downloadrsyslog-4b44a34d71b8b9bfc2574adeff4e735a8e97c876.tar.gz
rsyslog-4b44a34d71b8b9bfc2574adeff4e735a8e97c876.tar.xz
rsyslog-4b44a34d71b8b9bfc2574adeff4e735a8e97c876.zip
some more fixing and cleanup on the queue shutdown sequence
Diffstat (limited to 'queue.c')
-rw-r--r--queue.c72
1 files changed, 40 insertions, 32 deletions
diff --git a/queue.c b/queue.c
index d26a6e24..e89b35b8 100644
--- a/queue.c
+++ b/queue.c
@@ -889,6 +889,7 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
/* first calculate absolute timeout - we need the absolute value here, because we need to coordinate
* shutdown of both the regular and DA queue on *the same* timeout.
*/
+RUNLOG_VAR("%d", pThis->toQShutdown);
timeoutComp(&tTimeout, pThis->toQShutdown);
iRetLocal = wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN, &tTimeout);
if(iRetLocal == RS_RET_TIMED_OUT) {
@@ -926,11 +927,11 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
// TODO: what about pure disk queues and bSaveOnShutdown?
BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
/* optimize parameters for shutdown of DA-enabled queues */
-RUNLOG_VAR("%d", pThis->bSaveOnShutdown);
-RUNLOG_VAR("%d", pThis->bIsDA);
-RUNLOG_VAR("%d", pThis->iQueueSize);
+//RUNLOG_VAR("%d", pThis->bSaveOnShutdown);
+//RUNLOG_VAR("%d", pThis->bIsDA);
+//RUNLOG_VAR("%d", pThis->iQueueSize);
if(pThis->bIsDA && pThis->iQueueSize > 0 && pThis->bSaveOnShutdown) {
-RUNLOG;
+//RUNLOG;
/* switch to enqueue-only mode so that no more actions happen */
if(pThis->bRunsDA == 0) {
queueInitDA(pThis, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* switch to DA mode */
@@ -958,13 +959,18 @@ RUNLOG;
* the queue is now empty. If regular workers are still running, and try to pull the next message,
* they will automatically terminate as there no longer is any message left to process.
*/
+ // TODO: use pWtp mutex? - guess so!
BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
RUNLOG_VAR("%d", pThis->iQueueSize);
- if(pThis->iQueueSize > 0) {
+ //old: if(pThis->iQueueSize > 0) {
+ if(wtpGetCurNumWrkr(pThis->pWtpReg, LOCK_MUTEX) > 0) {
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
timeoutComp(&tTimeout, pThis->toActShutdown);
iRetLocal = wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout);
- if(iRetLocal != RS_RET_OK) {
+ if(iRetLocal == RS_RET_TIMED_OUT) {
+ dbgprintf("Queue 0x%lx: immediate shutdown timed out on primary queue (this is acceptable and "
+ "triggers cancellation)\n", queueGetID(pThis));
+ } else if(iRetLocal != RS_RET_OK) {
dbgprintf("Queue 0x%lx: unexpected iRet state %d after trying immediate shutdown of the primary queue "
"in disk save mode. Continuing, but results are unpredictable\n",
queueGetID(pThis), iRetLocal);
@@ -980,42 +986,44 @@ RUNLOG_VAR("%d", pThis->iQueueSize);
* function is still needed (what is no problem as we do not yet destroy the queue - but I
* thought it's a good idea to mention that fact). -- rgerhards, 2008-01-25
*/
- BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
-RUNLOG_VAR("%d", pThis->iQueueSize);
- if(pThis->iQueueSize > 0) {
- END_MTX_PROTECTED_OPERATIONS(pThis->mut);
- dbgprintf("Queue 0x%lx: checking to see if we need to cancel any worker threads of the primary queue\n",
- queueGetID(pThis));
- iRetLocal = wtpCancelAll(pThis->pWtpReg);
- if(iRetLocal != RS_RET_OK) {
- dbgprintf("Queue 0x%lx: unexpected iRet state %d trying to cancel primary queue worker "
- "threads, continuing, but results are unpredictable\n",
- queueGetID(pThis), iRetLocal);
- }
- } else {
- END_MTX_PROTECTED_OPERATIONS(pThis->mut);
+ dbgprintf("Queue 0x%lx: checking to see if we need to cancel any worker threads of the primary queue\n",
+ queueGetID(pThis));
+ iRetLocal = wtpCancelAll(pThis->pWtpReg); /* returns immediately if all threads already have terminated */
+ if(iRetLocal != RS_RET_OK) {
+ dbgprintf("Queue 0x%lx: unexpected iRet state %d trying to cancel primary queue worker "
+ "threads, continuing, but results are unpredictable\n",
+ queueGetID(pThis), iRetLocal);
}
+
+ /* TODO:
+ * If we cancelled some regular workers above, we need to think about where any "ungotten()" pUsr
+ * data elements need to go to. We need to make sure they are persisted. But this will be kept open
+ * until we finally code that part of the logic.
+ * To provide an early idea: the ungetObj() call should be a pointer. If running DA, it shall point
+ * to the DA queues ungetObj() and if we are running regular, it should point to the parent queues. The
+ * idea behind that logic is that if something is to be ungotten, it should normally go back to the top
+ * of the queue, which in that case is inside the DA queue... - but that idea needs to be verified once
+ * we reached that point.
+ * rgerhards, 2008-01-27
+ */
+
+
+ /* TODO: think: do we really need to do this here? Can't it happen on DA queue destruction? If we
+ * disable it, we get an assertion... I think this is OK, as we need to have a certain order and
+ * canceling the DA workers here ensures that order. But in any instant, we may have a look at this
+ * code after we have reaced the milestone. -- rgerhards, 2008-01-27
+ */
/* ... and now the DA queue, if it exists (should always be after the primary one) */
- BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
- //TODO: use right mutex!
- // was used: if(pThis->pqDA != NULL && pThis->pqDA->pWtpReg->iCurNumWrkThrd > 0) {
-if(pThis->pqDA != NULL) {
-RUNLOG_VAR("%p", pThis->pqDA->pWtpReg);
-RUNLOG_VAR("%d", pThis->pqDA->pWtpReg->iCurNumWrkThrd);
-}
- if(pThis->pqDA != NULL && pThis->pqDA->iQueueSize > 0) {
- END_MTX_PROTECTED_OPERATIONS(pThis->mut);
+ if(pThis->pqDA != NULL) {
dbgprintf("Queue 0x%lx: checking to see if we need to cancel any worker threads of the DA queue\n",
queueGetID(pThis));
- iRetLocal = wtpCancelAll(pThis->pqDA->pWtpReg);
+ iRetLocal = wtpCancelAll(pThis->pqDA->pWtpReg); /* returns immediately if all threads already have terminated */
if(iRetLocal != RS_RET_OK) {
dbgprintf("Queue 0x%lx: unexpected iRet state %d trying to cancel DA queue worker "
"threads, continuing, but results are unpredictable\n",
queueGetID(pThis), iRetLocal);
}
- } else {
- END_MTX_PROTECTED_OPERATIONS(pThis->mut);
}
/* ... finally ... all worker threads have terminated :-)