summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--runtime/debug.h2
-rw-r--r--runtime/queue.c314
-rw-r--r--runtime/rsyslog.h1
-rw-r--r--runtime/srUtils.h12
-rw-r--r--runtime/wti.c30
-rw-r--r--runtime/wti.h4
-rw-r--r--runtime/wtp.c15
-rw-r--r--runtime/wtp.h2
-rwxr-xr-xtests/daqueue-persist-drvr.sh3
-rwxr-xr-xtests/diag.sh7
10 files changed, 275 insertions, 115 deletions
diff --git a/runtime/debug.h b/runtime/debug.h
index 1375493d..03702250 100644
--- a/runtime/debug.h
+++ b/runtime/debug.h
@@ -135,7 +135,7 @@ void dbgPrintAllDebugInfo(void);
/* debug aides */
//#ifdef RTINST
-#if 0 // temporarily removed for helgrind
+#if 1 // temporarily removed for helgrind
#define d_pthread_mutex_lock(x) dbgMutexLock(x, pdbgFuncDB, __LINE__, dbgCALLStaCK_POP_POINT )
#define d_pthread_mutex_trylock(x) dbgMutexTryLock(x, pdbgFuncDB, __LINE__, dbgCALLStaCK_POP_POINT )
#define d_pthread_mutex_unlock(x) dbgMutexUnlock(x, pdbgFuncDB, __LINE__, dbgCALLStaCK_POP_POINT )
diff --git a/runtime/queue.c b/runtime/queue.c
index 539cf4ec..48d1c6e3 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -66,7 +66,7 @@ DEFobjCurrIf(glbl)
/* forward-definitions */
static rsRetVal qqueueChkPersist(qqueue_t *pThis, int nUpdates);
-static rsRetVal qqueueSetEnqOnly(qqueue_t *pThis, int bEnqOnly, int bLockMutex);
+static rsRetVal SetEnqOnly(qqueue_t *pThis, int bEnqOnly, int bLockMutex);
static rsRetVal RateLimiter(qqueue_t *pThis);
static int qqueueChkStopWrkrDA(qqueue_t *pThis);
static rsRetVal GetDeqBatchSize(qqueue_t *pThis, int *pVal);
@@ -205,7 +205,7 @@ static inline void queueDrain(qqueue_t *pThis)
ASSERT(pThis != NULL);
BEGINfunc
- dbgoprint((obj_t*) pThis, "queue will lose %d messages, destroying...\n", pThis->iQueueSize);
+ dbgoprint((obj_t*) pThis, "queue (type %d) will lose %d messages, destroying...\n", pThis->qType, pThis->iQueueSize);
/* iQueueSize is not decremented by qDel(), so we need to do it ourselves */
while(pThis->iQueueSize-- > 0) {
pThis->qDeq(pThis, &pUsr);
@@ -366,7 +366,7 @@ qqueueChkIsDA(qqueue_t *pThis)
* rgerhards, 2008-01-15
*/
static rsRetVal
-qqueueStartDA(qqueue_t *pThis)
+StartDA(qqueue_t *pThis)
{
DEFiRet;
uchar pszDAQName[128];
@@ -396,7 +396,7 @@ qqueueStartDA(qqueue_t *pThis)
CHKiRet(qqueueSetiPersistUpdCnt(pThis->pqDA, pThis->iPersistUpdCnt));
CHKiRet(qqueueSettoActShutdown(pThis->pqDA, pThis->toActShutdown));
CHKiRet(qqueueSettoEnq(pThis->pqDA, pThis->toEnq));
- CHKiRet(qqueueSetEnqOnly(pThis->pqDA, pThis->bDAEnqOnly, MUTEX_ALREADY_LOCKED));
+ CHKiRet(SetEnqOnly(pThis->pqDA, pThis->bDAEnqOnly, MUTEX_ALREADY_LOCKED));
CHKiRet(qqueueSetiDeqtWinFromHr(pThis->pqDA, pThis->iDeqtWinFromHr));
CHKiRet(qqueueSetiDeqtWinToHr(pThis->pqDA, pThis->iDeqtWinToHr));
CHKiRet(qqueueSetiHighWtrMrk(pThis->pqDA, 0));
@@ -450,8 +450,8 @@ finalize_it:
* If this function fails (should not happen), DA mode is not turned on.
* rgerhards, 2008-01-16
*/
-static inline rsRetVal
-qqueueInitDA(qqueue_t *pThis, int bEnqOnly, int bLockMutex)
+static rsRetVal
+InitDA(qqueue_t *pThis, int bEnqOnly, int bLockMutex)
{
DEFiRet;
DEFVARS_mutexProtection;
@@ -464,16 +464,17 @@ qqueueInitDA(qqueue_t *pThis, int bEnqOnly, int bLockMutex)
* is intentional. We assume that when we need it once, we may also need it on another
* occasion. Ressources used are quite minimal when no worker is running.
* rgerhards, 2008-01-24
+ * NOTE: this is the DA worker *pool*, not the DA queue!
*/
if(pThis->pWtpDA == NULL) {
- lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s:DA", obj.GetName((obj_t*) pThis));
+ lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s:DAwpool", obj.GetName((obj_t*) pThis));
CHKiRet(wtpConstruct (&pThis->pWtpDA));
CHKiRet(wtpSetDbgHdr (pThis->pWtpDA, pszBuf, lenBuf));
CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueChkStopWrkrDA));
CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize));
CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, wtp_t*)) qqueueIsIdleDA));
CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti, int)) ConsumerDA));
- CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) qqueueStartDA));
+ CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) StartDA));
CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) qqueueTurnOffDAMode));
CHKiRet(wtpSetpmutUsr (pThis->pWtpDA, pThis->mut));
CHKiRet(wtpSetpcondBusy (pThis->pWtpDA, &pThis->notEmpty));
@@ -493,6 +494,7 @@ qqueueInitDA(qqueue_t *pThis, int bEnqOnly, int bLockMutex)
* until the next enqueue request.
*/
wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* DA queues alsways have just one worker max */
+RUNLOG_VAR("%d", pThis->bRunsDA);
finalize_it:
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
@@ -536,7 +538,7 @@ qqueueChkStrtDA(qqueue_t *pThis)
*/
dbgoprint((obj_t*) pThis, "%d entries - passed high water mark for disk-assisted mode, initiating...\n",
getPhysicalQueueSize(pThis));
- qqueueInitDA(pThis, QUEUE_MODE_ENQDEQ, MUTEX_ALREADY_LOCKED); /* initiate DA mode */
+ InitDA(pThis, QUEUE_MODE_ENQDEQ, MUTEX_ALREADY_LOCKED); /* initiate DA mode */
}
finalize_it:
@@ -706,7 +708,6 @@ static rsRetVal qAddLinkedList(qqueue_t *pThis, void* pUsr)
if(pThis->tVars.linklist.pDeqRoot == NULL) {
pThis->tVars.linklist.pDeqRoot = pEntry;
}
-RUNLOG_VAR("%p", pThis->tVars.linklist.pDeqRoot);
finalize_it:
RETiRet;
@@ -1157,25 +1158,14 @@ qqueueDeq(qqueue_t *pThis, void *pUsr)
}
-/* This function shuts down all worker threads and waits until they
- * have terminated. If they timeout, they are cancelled.
- * rgerhards, 2008-01-24
- * Please note that this function shuts down BOTH the parent AND the child queue
- * in DA case. This is necessary because their timeouts are tightly coupled. Most
- * importantly, the timeouts would be applied twice (or logic be extremely
- * complex) if each would have its own shutdown. The function does not self check
- * this condition - the caller must make sure it is not called with a parent.
- * rgerhards, 2009-05-26: we do NO logner persist the queue here if bSaveOnShutdown
- * is set. This must be handled by the caller. Not doing that cleans up the queue
- * shutdown considerably. Also, older engines had a potential hang condition when
- * the DA queue was already started and the DA worker configured for infinite
- * retries and the action was during retry processing. This was a design issue,
- * which is solved as of now. Note that the shutdown now may take a little bit
- * longer, because we no longer can persist the queue in parallel to waiting
- * on worker timeouts.
+/* Try to terminate queue worker threads within the regular shutdown interval.
+ * Both the regular and the DA queue (if it exists) are waited for, but on the same timeout.
+ * After this function returns, the workers must either be finished or some force
+ * to finish them must be applied.
+ * rgerhards, 2009-05-27
*/
static rsRetVal
-ShutdownWorkers(qqueue_t *pThis)
+tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis)
{
DEFVARS_mutexProtection;
struct timespec tTimeout;
@@ -1185,16 +1175,6 @@ ShutdownWorkers(qqueue_t *pThis)
ISOBJ_TYPE_assert(pThis, qqueue);
ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */
- dbgoprint((obj_t*) pThis, "initiating worker thread shutdown sequence\n");
-
- /* we reduce the low water mark in any case. This is not absolutely necessary, but
- * it is useful because we enable DA mode at several spots below and so we do not need
- * to think about the low water mark each time.
- */
- pThis->iHighWtrMrk = 1; /* if we do not do this, the DA queue will not stop! */
- pThis->iLowWtrMrk = 0;
-
- /* first try to shutdown the queue within the regular shutdown period */
BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
if(getPhysicalQueueSize(pThis) > 0) {
if(pThis->bRunsDA) {
@@ -1206,6 +1186,11 @@ ShutdownWorkers(qqueue_t *pThis)
}
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
+ /* at this stage, we need to have the DA worker properly initialized and running (if there is one) */
+ if(pThis->bRunsDA) {
+ qqueueWaitDAModeInitialized(pThis);
+ }
+
/* Now wait for the queue's workers to shut down. Note that we run into the code even if we just found
* out there are no active workers - that doesn't matter: the wtp knows about that and so will
* return immediately.
@@ -1228,74 +1213,109 @@ ShutdownWorkers(qqueue_t *pThis)
if(iRetLocal == RS_RET_TIMED_OUT) {
dbgoprint((obj_t*) pThis, "regular shutdown timed out on primary queue (this is OK)\n");
} else {
- /* OK, the regular queue is now shut down. So we can now wait for the DA queue (if running DA) */
dbgoprint((obj_t*) pThis, "regular queue workers shut down.\n");
- BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
- if(pThis->bRunsDA) {
- END_MTX_PROTECTED_OPERATIONS(pThis->mut);
- dbgoprint((obj_t*) pThis, "we have a DA queue (0x%lx), requesting its shutdown.\n",
- qqueueGetID(pThis->pqDA));
- /* we use the same absolute timeout as above, so we do not use more than the configured
- * timeout interval!
- */
- dbgoprint((obj_t*) pThis, "trying shutdown of DA workers\n");
- iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN, &tTimeout);
- if(iRetLocal == RS_RET_TIMED_OUT) {
- dbgoprint((obj_t*) pThis, "shutdown timed out on DA queue (this is OK)\n");
- }
+ }
+
+ /* OK, the worker for the regular queue is processed, on to the DA queue regular worker. */
+ BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
+ if(pThis->bRunsDA) {
+ END_MTX_PROTECTED_OPERATIONS(pThis->mut);
+ dbgoprint((obj_t*) pThis, "we have a DA queue (0x%lx), requesting its shutdown.\n",
+ qqueueGetID(pThis->pqDA));
+ /* we use the same absolute timeout as above, so we do not use more than the configured
+ * timeout interval!
+ */
+ dbgoprint((obj_t*) pThis, "trying shutdown of regular worker of DA queue\n");
+ iRetLocal = wtpShutdownAll(pThis->pqDA->pWtpReg, wtpState_SHUTDOWN, &tTimeout);
+ if(iRetLocal == RS_RET_TIMED_OUT) {
+ dbgoprint((obj_t*) pThis, "shutdown timed out on DA queue worker (this is OK)\n");
} else {
- END_MTX_PROTECTED_OPERATIONS(pThis->mut);
+ dbgoprint((obj_t*) pThis, "DA queue worker shut down.\n");
}
+ } else {
+ END_MTX_PROTECTED_OPERATIONS(pThis->mut);
}
- /* when we reach this point, both queues are either empty or the regular queue shutdown timeout
- * has expired. We must set the primary queue to SHUTDOWN_IMMEDIATE, as it shall now terminate
- * as soon as its consumer is done. In particular, it does no longer need try to empty the queue.
- */
- wtpSetState(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE); /* set primary queue to shutdown only */
+ RETiRet;
+}
- /* at this stage, we need to have the DA worker properly initialized and running (if there is one) */
- if(pThis->bRunsDA) {
- qqueueWaitDAModeInitialized(pThis);
- wtpSetState(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE); /* also stop DA queue */
+
+/* Try to shut down regular and DA queue workers, within the action timeout
+ * period. Note that the main queue DA worker is still unaffected (and may shuffle
+ * data to the disk queue while we terminate the other workers). Not finishing
+ * processing all messages is now OK (but they may be preserved later, depending
+ * on bSaveOnShutdown setting).
+ * rgerhards, 2009-05-27
+ */
+static rsRetVal
+tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis)
+{
+ DEFVARS_mutexProtection;
+ struct timespec tTimeout;
+ rsRetVal iRetLocal;
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, qqueue);
+ ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */
+
+ /* instruct workers to finish ASAP, even if still work exists */
+RUNLOG_STR("setting enqOnly for main queue");
+ //TODO:SetEnqOnly(pThis, 1, LOCK_MUTEX); /* start no new workers */
+ pThis->bEnqOnly = 1;
+ wtpSetState(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE);
+ if(pThis->pqDA != NULL) {
+RUNLOG_STR("setting enqOnly for DA queue");
+ //TODO:SetEnqOnly(pThis->pqDA, 1, LOCK_MUTEX);
+ pThis->pqDA->bEnqOnly = 1;
+ wtpSetState(pThis->pqDA->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE);
}
/* now give the queue workers a last chance to gracefully shut down (based on action timeout setting) */
+ timeoutComp(&tTimeout, pThis->toActShutdown);
BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
- if(getPhysicalQueueSize(pThis) > 0) {
- timeoutComp(&tTimeout, pThis->toActShutdown);
- if(wtpGetCurNumWrkr(pThis->pWtpReg, LOCK_MUTEX) > 0) {
- END_MTX_PROTECTED_OPERATIONS(pThis->mut);
- dbgoprint((obj_t*) pThis, "trying immediate shutdown of regular workers\n");
- iRetLocal = wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout);
- if(iRetLocal == RS_RET_TIMED_OUT) {
- dbgoprint((obj_t*) pThis, "immediate shutdown timed out on primary queue (this is acceptable and "
- "triggers cancellation)\n");
- } else if(iRetLocal != RS_RET_OK) {
- dbgoprint((obj_t*) pThis, "unexpected iRet state %d after trying immediate shutdown of the primary queue "
- "in disk save mode. Continuing, but results are unpredictable\n", iRetLocal);
- }
- /* we need to re-aquire the mutex for the next check in this case! */
- BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
- }
- if(pThis->bRunsDA && wtpGetCurNumWrkr(pThis->pWtpDA, LOCK_MUTEX) > 0) {
- /* and now the same for the DA queue */
- END_MTX_PROTECTED_OPERATIONS(pThis->mut);
- dbgoprint((obj_t*) pThis, "trying immediate shutdown of DA workers\n");
- iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout);
- if(iRetLocal == RS_RET_TIMED_OUT) {
- dbgoprint((obj_t*) pThis, "immediate shutdown timed out on DA queue (this is acceptable and "
- "triggers cancellation)\n");
- } else if(iRetLocal != RS_RET_OK) {
- dbgoprint((obj_t*) pThis, "unexpected iRet state %d after trying immediate shutdown of the DA queue "
- "in disk save mode. Continuing, but results are unpredictable\n", iRetLocal);
- }
- } else {
- END_MTX_PROTECTED_OPERATIONS(pThis->mut);
+ if(wtpGetCurNumWrkr(pThis->pWtpReg, LOCK_MUTEX) > 0) {
+ END_MTX_PROTECTED_OPERATIONS(pThis->mut);
+ dbgoprint((obj_t*) pThis, "trying immediate shutdown of regular workers\n");
+ iRetLocal = wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout);
+ if(iRetLocal == RS_RET_TIMED_OUT) {
+ dbgoprint((obj_t*) pThis, "immediate shutdown timed out on primary queue (this is acceptable and "
+ "triggers cancellation)\n");
+ } else if(iRetLocal != RS_RET_OK) {
+ dbgoprint((obj_t*) pThis, "unexpected iRet state %d after trying immediate shutdown of the primary queue "
+ "in disk save mode. Continuing, but results are unpredictable\n", iRetLocal);
}
- } else {
+ /* we need to re-acquire the mutex for the next check in this case! */
+ BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX);
+ }
+
+ if(pThis->bRunsDA && wtpGetCurNumWrkr(pThis->pqDA->pWtpReg, LOCK_MUTEX) > 0) {
+ /* and now the same for the DA queue */
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
+ dbgoprint((obj_t*) pThis, "trying immediate shutdown of DA queue workers\n");
+ iRetLocal = wtpShutdownAll(pThis->pqDA->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout);
+ if(iRetLocal == RS_RET_TIMED_OUT) {
+ dbgoprint((obj_t*) pThis, "immediate shutdown timed out on DA queue (this is acceptable and "
+ "triggers cancellation)\n");
+ } else if(iRetLocal != RS_RET_OK) {
+ dbgoprint((obj_t*) pThis, "unexpected iRet state %d after trying immediate shutdown of the DA queue "
+ "in disk save mode. Continuing, but results are unpredictable\n", iRetLocal);
+ }
}
+ END_MTX_PROTECTED_OPERATIONS(pThis->mut);
+
+ RETiRet;
+}
+
+
+/* This function cancels all remaining regular workers for both the main and the DA
+ * queue. The main queue's DA worker pool continues to run (if it exists and is active).
+ * rgerhards, 2009-05-29
+ */
+static rsRetVal
+cancelWorkers(qqueue_t *pThis)
+{
+ rsRetVal iRetLocal;
+ DEFiRet;
/* Now queue workers should have terminated. If not, we need to cancel them as we have applied
* all timeout setting. If any worker in any queue still executes, its consumer is possibly
@@ -1318,13 +1338,60 @@ ShutdownWorkers(qqueue_t *pThis)
}
}
+ RETiRet;
+}
+
+
+/* This function shuts down all worker threads and waits until they
+ * have terminated. If they timeout, they are cancelled.
+ * rgerhards, 2008-01-24
+ * Please note that this function shuts down BOTH the parent AND the child queue
+ * in DA case. This is necessary because their timeouts are tightly coupled. Most
+ * importantly, the timeouts would be applied twice (or logic be extremely
+ * complex) if each would have its own shutdown. The function does not self check
+ * this condition - the caller must make sure it is not called with a parent.
+ * rgerhards, 2009-05-26: we do NO longer persist the queue here if bSaveOnShutdown
+ * is set. This must be handled by the caller. Not doing that cleans up the queue
+ * shutdown considerably. Also, older engines had a potential hang condition when
+ * the DA queue was already started and the DA worker configured for infinite
+ * retries and the action was during retry processing. This was a design issue,
+ * which is solved as of now. Note that the shutdown now may take a little bit
+ * longer, because we no longer can persist the queue in parallel to waiting
+ * on worker timeouts.
+ */
+static rsRetVal
+ShutdownWorkers(qqueue_t *pThis)
+{
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, qqueue);
+ ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */
+
+ dbgoprint((obj_t*) pThis, "initiating worker thread shutdown sequence\n");
+
+ /* we reduce the low water mark in any case. This is not absolutely necessary, but
+ * it is useful because we enable DA mode at several spots below and so we do not need
+ * to think about the low water mark each time.
+ */
+ pThis->iHighWtrMrk = 1; /* if we do not do this, the DA queue will not stop! */
+ pThis->iLowWtrMrk = 0;
+
+ CHKiRet(tryShutdownWorkersWithinQueueTimeout(pThis));
+
+ if(getPhysicalQueueSize(pThis) > 0) {
+ CHKiRet(tryShutdownWorkersWithinActionTimeout(pThis));
+ }
+
+ CHKiRet(cancelWorkers(pThis));
+
/* ... finally ... all worker threads have terminated :-)
* Well, more precisely, they *are in termination*. Some cancel cleanup handlers
- * may still be running.
+ * may still be running. Note that the main queue's DA worker may still be running.
*/
dbgoprint((obj_t*) pThis, "worker threads terminated, remaining queue size log %d, phys %d.\n",
getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
+finalize_it:
RETiRet;
}
@@ -1498,8 +1565,8 @@ DeleteBatchFromQStore(qqueue_t *pThis, batch_t *pBatch)
ISOBJ_TYPE_assert(pThis, qqueue);
assert(pBatch != NULL);
- pTdl = tdlPeek(pThis);
- if(pTdl == NULL) {
+ pTdl = tdlPeek(pThis); /* get current head element */
+ if(pTdl == NULL) { /* to-delete list empty */
DoDeleteBatchFromQStore(pThis, pBatch->nElemDeq);
} else if(pBatch->deqID == pThis->deqIDDel) {
deqIDDel = pThis->deqIDDel;
@@ -1512,6 +1579,7 @@ DeleteBatchFromQStore(qqueue_t *pThis, batch_t *pBatch)
}
} else {
/* can not delete, insert into to-delete list */
+ dbgprintf("not at head of to-delete list, enqueue %d\n", (int) pBatch->deqID);
CHKiRet(tdlAdd(pThis, pBatch->deqID, pBatch->nElemDeq));
}
@@ -1769,6 +1837,27 @@ finalize_it:
RETiRet;
}
+
+/* This is called when a batch is processed and the worker does not
+ * ask for another batch (e.g. because it is to be terminated)
+ * rgerhards, 2009-05-27
+ */
+static rsRetVal
+batchProcessedReg(qqueue_t *pThis, wti_t *pWti)
+{
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, qqueue);
+ ISOBJ_TYPE_assert(pWti, wti);
+dbgprintf("XXX: batchProcessedReg deletes %d records\n", pWti->batch.nElemDeq);
+
+ DeleteProcessedBatch(pThis, &pWti->batch);
+ qqueueChkPersist(pThis, pWti->batch.nElemDeq);
+
+ RETiRet;
+}
+
+
/* This is the queue consumer in the regular (non-DA) case. It is
* protected by the queue mutex, but MUST release it as soon as possible.
* rgerhards, 2008-01-21
@@ -1844,7 +1933,8 @@ qqueueChkStopWrkrDA(qqueue_t *pThis)
DEFiRet;
if(pThis->bEnqOnly) {
- iRet = RS_RET_TERMINATE_NOW;
+ iRet = RS_RET_TERMINATE_WHEN_IDLE;
+RUNLOG;
} else {
if(pThis->bRunsDA) {
ASSERT(pThis->pqDA != NULL);
@@ -1852,11 +1942,14 @@ qqueueChkStopWrkrDA(qqueue_t *pThis)
&& pThis->pqDA->sizeOnDiskMax > 0
&& pThis->pqDA->tVars.disk.sizeOnDisk > pThis->pqDA->sizeOnDiskMax) {
/* this queue can never grow, so we can give up... */
+RUNLOG;
iRet = RS_RET_TERMINATE_NOW;
} else if(getPhysicalQueueSize(pThis) < pThis->iHighWtrMrk && pThis->bQueueStarted == 1) {
+RUNLOG;
iRet = RS_RET_TERMINATE_NOW;
}
} else {
+RUNLOG;
iRet = RS_RET_TERMINATE_NOW;
}
}
@@ -1880,10 +1973,14 @@ ChkStopWrkrReg(qqueue_t *pThis)
return pThis->bEnqOnly || pThis->bRunsDA || (pThis->pqParent != NULL && getPhysicalQueueSize(pThis) == 0);
* TODO: remove when verified! -- rgerhards, 2009-05-26
*/
- if(pThis->bEnqOnly || pThis->bRunsDA)
+RUNLOG;
+ if(pThis->bEnqOnly || pThis->bRunsDA) {
+RUNLOG;
iRet = RS_RET_TERMINATE_NOW;
- else if(pThis->pqParent != NULL)
+ } else if(pThis->pqParent != NULL) {
+RUNLOG;
iRet = RS_RET_TERMINATE_WHEN_IDLE;
+ }
RETiRet;
}
@@ -2039,6 +2136,7 @@ rsRetVal qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize));
CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, wtp_t*)) IsIdleReg));
CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti, int)) ConsumerReg));
+ CHKiRet(wtpSetpfObjProcessed (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, wti_t *pWti)) batchProcessedReg));
CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RegOnWrkrStartup));
CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RegOnWrkrShutdown));
CHKiRet(wtpSetpmutUsr (pThis->pWtpReg, pThis->mut));
@@ -2056,7 +2154,7 @@ rsRetVal qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
iRetLocal = qqueueHaveQIF(pThis);
if(iRetLocal == RS_RET_OK) {
dbgoprint((obj_t*) pThis, "on-disk queue present, needs to be reloaded\n");
- qqueueInitDA(pThis, QUEUE_MODE_ENQDEQ, LOCK_MUTEX); /* initiate DA mode */
+ InitDA(pThis, QUEUE_MODE_ENQDEQ, LOCK_MUTEX); /* initiate DA mode */
bInitialized = 1; /* we are done */
} else {
/* TODO: use logerror? -- rgerhards, 2008-01-16 */
@@ -2212,9 +2310,16 @@ DoSaveOnShutdown(qqueue_t *pThis)
ISOBJ_TYPE_assert(pThis, qqueue);
- qqueueInitDA(pThis, QUEUE_MODE_ENQONLY, LOCK_MUTEX); /* switch to DA mode */
+dbgprintf("after InitDA, queue log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
+ if(pThis->bRunsDA != 2) {
+ InitDA(pThis, QUEUE_MODE_ENQONLY, LOCK_MUTEX); /* switch to DA mode */
+dbgprintf("after InitDA, queue log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
+RUNLOG_VAR("%d", pThis->bRunsDA);
+RUNLOG_VAR("%d", pThis->pWtpDA->wtpState);
+ qqueueWaitDAModeInitialized(pThis); /* make sure DA mode is actually started, else we may have a race! */
+ }
/* make sure we do not timeout before we are done */
- dbgoprint((obj_t*) pThis, "bSaveOnShutdown configured, eternal timeout set\n");
+ dbgoprint((obj_t*) pThis, "bSaveOnShutdown configured, infinite timeout set\n");
timeoutComp(&tTimeout, QUEUE_TIMEOUT_ETERNAL);
/* and run the primary queue's DA worker to drain the queue */
iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN, &tTimeout);
@@ -2247,7 +2352,7 @@ CODESTARTobjDestruct(qqueue)
* we need to reset the logical dequeue pointer, persist the queue if configured to do
* so and then destruct everything. -- rgerhards, 2009-05-26
*/
- CHKiRet(pThis->qUnDeqAll(pThis));
+//!!!! //CHKiRet(pThis->qUnDeqAll(pThis));
if(pThis->bIsDA && getPhysicalQueueSize(pThis) > 0 && pThis->bSaveOnShutdown) {
CHKiRet(DoSaveOnShutdown(pThis));
@@ -2470,7 +2575,7 @@ finalize_it:
* rgerhards, 2008-01-16
*/
static rsRetVal
-qqueueSetEnqOnly(qqueue_t *pThis, int bEnqOnly, int bLockMutex)
+SetEnqOnly(qqueue_t *pThis, int bEnqOnly, int bLockMutex)
{
DEFiRet;
DEFVARS_mutexProtection;
@@ -2495,13 +2600,16 @@ qqueueSetEnqOnly(qqueue_t *pThis, int bEnqOnly, int bLockMutex)
dbgoprint((obj_t*) pThis, "switching to enqueue-only mode, terminating all worker threads\n");
if(pThis->pWtpReg != NULL)
wtpWakeupAllWrkr(pThis->pWtpReg);
+RUNLOG;
if(pThis->pWtpDA != NULL)
wtpWakeupAllWrkr(pThis->pWtpDA);
+RUNLOG;
} else {
/* switch back to regular mode */
ABORT_FINALIZE(RS_RET_NOT_IMPLEMENTED); /* we don't need this so far... */
}
}
+RUNLOG;
pThis->bEnqOnly = bEnqOnly;
diff --git a/runtime/rsyslog.h b/runtime/rsyslog.h
index a43c0327..58346d03 100644
--- a/runtime/rsyslog.h
+++ b/runtime/rsyslog.h
@@ -89,6 +89,7 @@ typedef struct nsd_gsspi_s nsd_gsspi_t;
typedef struct nsd_nss_s nsd_nss_t;
typedef struct nsdsel_ptcp_s nsdsel_ptcp_t;
typedef struct nsdsel_gtls_s nsdsel_gtls_t;
+typedef struct wti_s wti_t;
typedef obj_t nsd_t;
typedef obj_t nsdsel_t;
typedef struct msg msg_t;
diff --git a/runtime/srUtils.h b/runtime/srUtils.h
index bfce4cbb..288e9dd7 100644
--- a/runtime/srUtils.h
+++ b/runtime/srUtils.h
@@ -117,11 +117,23 @@ int getSubString(uchar **ppSrc, char *pDst, size_t DstSize, char cSep);
if(bMustLock == LOCK_MUTEX) { \
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); \
d_pthread_mutex_lock(mut); \
+ assert(bLockedOpIsLocked == 0); \
bLockedOpIsLocked = 1; \
}
#define END_MTX_PROTECTED_OPERATIONS(mut) \
if(bLockedOpIsLocked) { \
d_pthread_mutex_unlock(mut); \
+ bLockedOpIsLocked = 0; \
pthread_setcancelstate(iCancelStateSave, NULL); \
}
+/* The unconditional versions of the macro always lock the mutex. They are preferred in
+ * complex scenarios, where the simple ones might get mixed up by multiple calls.
+ */
+#define BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(mut) \
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); \
+ d_pthread_mutex_lock(mut);
+#define END_MTX_PROTECTED_OPERATIONS_UNCOND(mut) \
+ d_pthread_mutex_unlock(mut); \
+ pthread_setcancelstate(iCancelStateSave, NULL);
+
#endif
diff --git a/runtime/wti.c b/runtime/wti.c
index 3b6bf1b9..75e497b8 100644
--- a/runtime/wti.c
+++ b/runtime/wti.c
@@ -343,6 +343,7 @@ doIdleProcessing(wti_t *pThis, wtp_t *pWtp, int *pbInactivityTOOccured)
{
struct timespec t;
+ BEGINfunc
DBGPRINTF("%s: worker IDLE, waiting for work.\n", wtiGetDbgHdr(pThis));
pWtp->pfOnIdle(pWtp->pUsr, MUTEX_ALREADY_LOCKED);
@@ -356,6 +357,7 @@ doIdleProcessing(wti_t *pThis, wtp_t *pWtp, int *pbInactivityTOOccured)
*pbInactivityTOOccured = 1; /* indicate we had a timeout */
}
}
+ ENDfunc
}
@@ -390,9 +392,12 @@ wtiWorker(wti_t *pThis)
dbgSetThrdName(pThis->pszDbgHdr);
pthread_cleanup_push(wtiWorkerCancelCleanup, pThis);
- BEGIN_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr, LOCK_MUTEX);
+dbgprintf("XXX: worker startup\n");
+ RUNLOG_STR("MUTEX lock");
+ BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(pWtp->pmutUsr);
pWtp->pfOnWorkerStartup(pWtp->pUsr);
- END_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr);
+ RUNLOG_STR("MUTEX release");
+ END_MTX_PROTECTED_OPERATIONS_UNCOND(pWtp->pmutUsr);
/* now we have our identity, on to real processing */
while(1) { /* loop will be broken below - need to do mutex locks */
@@ -405,17 +410,25 @@ wtiWorker(wti_t *pThis)
}
wtpSetInactivityGuard(pThis->pWtp, 0, LOCK_MUTEX); /* must be set before usr mutex is locked! */
- BEGIN_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr, LOCK_MUTEX);
+ RUNLOG_STR("MUTEX lock");
+ BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(pWtp->pmutUsr);
/* first check if we are in shutdown process (but evaluate a bit later) */
+RUNLOG;
terminateRet = wtpChkStopWrkr(pWtp, LOCK_MUTEX, MUTEX_ALREADY_LOCKED);
+RUNLOG_VAR("%d", terminateRet);
if(terminateRet == RS_RET_TERMINATE_NOW) {
- // TODO: we need to free the old batch! -- rgerhards, 2009-05-26 MULTI
+ /* we now need to free the old batch */
+ localRet = pWtp->pfObjProcessed(pWtp->pUsr, pThis);
+ dbgoprint((obj_t*) pThis, "terminating worker because auf TERMINATE_NOW mode, del iRet %d\n",
+ localRet);
break;
}
+RUNLOG;
/* try to execute and process whatever we have */
localRet = pWtp->pfDoWork(pWtp->pUsr, pThis, iCancelStateSave);
+ /* This function must and does RELEASE the MUTEX! */
if(localRet == RS_RET_IDLE) {
if(terminateRet == RS_RET_TERMINATE_WHEN_IDLE) {
@@ -426,17 +439,20 @@ wtiWorker(wti_t *pThis)
/* we had an inactivity timeout in the last run and are still idle, so it is time to exit... */
break; /* end worker thread run */
}
+ RUNLOG_STR("MUTEX lock");
+ BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(pWtp->pmutUsr);
doIdleProcessing(pThis, pWtp, &bInactivityTOOccured);
- END_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr);
+ RUNLOG_STR("MUTEX release");
+ END_MTX_PROTECTED_OPERATIONS_UNCOND(pWtp->pmutUsr);
continue; /* request next iteration */
}
- END_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr);
bInactivityTOOccured = 0; /* reset for next run */
}
/* if we exit the loop, the mutex is locked and must be unlocked */
- END_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr);
+ RUNLOG_STR("MUTEX release");
+ END_MTX_PROTECTED_OPERATIONS_UNCOND(pWtp->pmutUsr);
/* indicate termination */
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
diff --git a/runtime/wti.h b/runtime/wti.h
index 0990941e..17038ec5 100644
--- a/runtime/wti.h
+++ b/runtime/wti.h
@@ -31,7 +31,7 @@
/* the worker thread instance class */
-typedef struct wti_s {
+struct wti_s {
BEGINobjInstance;
int bOptimizeUniProc; /* cache for the equally-named global setting, pulled at time of queue creation */
pthread_t thrdID; /* thread ID */
@@ -42,7 +42,7 @@ typedef struct wti_s {
int bShutdownRqtd; /* shutdown for this thread requested? 0 - no , 1 - yes */
batch_t batch; /* pointer to an object array meaningful for current user pointer (e.g. queue pUsr data elemt) */
uchar *pszDbgHdr; /* header string for debug messages */
-} wti_t;
+};
/* some symbolic constants for easier reference */
diff --git a/runtime/wtp.c b/runtime/wtp.c
index 41fcd8d9..7786a656 100644
--- a/runtime/wtp.c
+++ b/runtime/wtp.c
@@ -91,10 +91,12 @@ BEGINobjConstruct(wtp) /* be sure to specify the object type also in END macro!
pThis->pfGetDeqBatchSize = NotImplementedDummy;
pThis->pfIsIdle = NotImplementedDummy;
pThis->pfDoWork = NotImplementedDummy;
+ pThis->pfObjProcessed = NotImplementedDummy;
pThis->pfOnIdle = NotImplementedDummy;
pThis->pfOnWorkerCancel = NotImplementedDummy;
pThis->pfOnWorkerStartup = NotImplementedDummy;
pThis->pfOnWorkerShutdown = NotImplementedDummy;
+dbgprintf("XXX: wtpConstruct: %d\n", pThis->wtpState);
ENDobjConstruct(wtp)
@@ -139,6 +141,7 @@ BEGINobjDestruct(wtp) /* be sure to specify the object type also in END and CODE
int i;
CODESTARTobjDestruct(wtp)
wtpProcessThrdChanges(pThis); /* process thread changes one last time */
+RUNLOG_STR("wtpDestruct");
/* destruct workers */
for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i)
@@ -260,17 +263,22 @@ wtpChkStopWrkr(wtp_t *pThis, int bLockMutex, int bLockUsrMutex)
ISOBJ_TYPE_assert(pThis, wtp);
+RUNLOG;
BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex);
if(pThis->wtpState == wtpState_SHUTDOWN_IMMEDIATE) {
+RUNLOG;
ABORT_FINALIZE(RS_RET_TERMINATE_NOW);
} else if(pThis->wtpState == wtpState_SHUTDOWN) {
ABORT_FINALIZE(RS_RET_TERMINATE_WHEN_IDLE);
+RUNLOG;
}
+RUNLOG_VAR("%d", iRet);
/* try customer handler if one was set and we do not yet have a definite result */
if(pThis->pfChkStopWrkr != NULL) {
iRet = pThis->pfChkStopWrkr(pThis->pUsr, bLockUsrMutex);
}
+RUNLOG_VAR("%d", iRet);
finalize_it:
END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
@@ -292,13 +300,17 @@ wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout
ISOBJ_TYPE_assert(pThis, wtp);
+dbgprintf("XXX:10 wtp %p, state %d\n", pThis, pThis->wtpState);
wtpSetState(pThis, tShutdownCmd);
+dbgprintf("XXX:20 wtp %p, state %d\n", pThis, pThis->wtpState);
wtpWakeupAllWrkr(pThis);
+dbgprintf("XXX:30 wtp %p, state %d\n", pThis, pThis->wtpState);
/* see if we need to harvest (join) any terminated threads (even in timeout case,
* some may have terminated...
*/
wtpProcessThrdChanges(pThis);
+dbgprintf("XXX:40 wtp %p, state %d\n", pThis, pThis->wtpState);
/* and wait for their termination */
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
@@ -306,7 +318,9 @@ wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout
pthread_cleanup_push(mutexCancelCleanup, &pThis->mut);
pthread_setcancelstate(iCancelStateSave, NULL);
bTimedOut = 0;
+dbgprintf("XXX:50 wtp %p, state %d\n", pThis, pThis->wtpState);
while(pThis->iCurNumWrkThrd > 0 && !bTimedOut) {
+dbgprintf("XXX:60 wtp %p, state %d\n", pThis, pThis->wtpState);
dbgprintf("%s: waiting %ldms on worker thread termination, %d still running\n",
wtpGetDbgHdr(pThis), timeoutVal(ptTimeout), pThis->iCurNumWrkThrd);
@@ -581,6 +595,7 @@ DEFpropSetMethFP(wtp, pfRateLimiter, rsRetVal(*pVal)(void*))
DEFpropSetMethFP(wtp, pfGetDeqBatchSize, rsRetVal(*pVal)(void*, int*))
DEFpropSetMethFP(wtp, pfIsIdle, rsRetVal(*pVal)(void*, wtp_t*))
DEFpropSetMethFP(wtp, pfDoWork, rsRetVal(*pVal)(void*, void*, int))
+DEFpropSetMethFP(wtp, pfObjProcessed, rsRetVal(*pVal)(void*, wti_t*))
DEFpropSetMethFP(wtp, pfOnIdle, rsRetVal(*pVal)(void*, int))
DEFpropSetMethFP(wtp, pfOnWorkerCancel, rsRetVal(*pVal)(void*, void*))
DEFpropSetMethFP(wtp, pfOnWorkerStartup, rsRetVal(*pVal)(void*))
diff --git a/runtime/wtp.h b/runtime/wtp.h
index d9d582af..e1180177 100644
--- a/runtime/wtp.h
+++ b/runtime/wtp.h
@@ -72,6 +72,7 @@ struct wtp_s {
pthread_cond_t *pcondBusy; /* condition the user will signal "busy again, keep runing" on (awakes worker) */
rsRetVal (*pfChkStopWrkr)(void *pUsr, int);
rsRetVal (*pfGetDeqBatchSize)(void *pUsr, int*); /* obtains max dequeue count from queue config */
+ rsRetVal (*pfObjProcessed)(void *pUsr, wti_t *pWti); /* indicate user object is processed */
rsRetVal (*pfRateLimiter)(void *pUsr);
rsRetVal (*pfIsIdle)(void *pUsr, wtp_t *pWtp);
rsRetVal (*pfDoWork)(void *pUsr, void *pWti, int);
@@ -108,6 +109,7 @@ PROTOTYPEpropSetMethFP(wtp, pfRateLimiter, rsRetVal(*pVal)(void*));
PROTOTYPEpropSetMethFP(wtp, pfGetDeqBatchSize, rsRetVal(*pVal)(void*, int*));
PROTOTYPEpropSetMethFP(wtp, pfIsIdle, rsRetVal(*pVal)(void*, wtp_t*));
PROTOTYPEpropSetMethFP(wtp, pfDoWork, rsRetVal(*pVal)(void*, void*, int));
+PROTOTYPEpropSetMethFP(wtp, pfObjProcessed, rsRetVal(*pVal)(void*, wti_t*));
PROTOTYPEpropSetMethFP(wtp, pfOnIdle, rsRetVal(*pVal)(void*, int));
PROTOTYPEpropSetMethFP(wtp, pfOnWorkerCancel, rsRetVal(*pVal)(void*,void*));
PROTOTYPEpropSetMethFP(wtp, pfOnWorkerStartup, rsRetVal(*pVal)(void*));
diff --git a/tests/daqueue-persist-drvr.sh b/tests/daqueue-persist-drvr.sh
index 11058110..0ec76b47 100755
--- a/tests/daqueue-persist-drvr.sh
+++ b/tests/daqueue-persist-drvr.sh
@@ -19,6 +19,9 @@ $srcdir/diag.sh shutdown-immediate
$srcdir/diag.sh wait-shutdown
source $srcdir/diag.sh check-mainq-spool
+echo DEBUG EXIT!
+#exit
+
# restart engine and have rest processed
#remove delay
echo "#" > work-delay.conf
diff --git a/tests/diag.sh b/tests/diag.sh
index 01b335ee..5c6a0ab8 100755
--- a/tests/diag.sh
+++ b/tests/diag.sh
@@ -5,8 +5,11 @@
# not always able to convey back states to the upper-level test driver
# begun 2009-05-27 by rgerhards
# This file is part of the rsyslog project, released under GPLv3
+#valgrind="valgrind --log-fd=1"
+#valgrind="valgrind --tool=drd --log-fd=1"
+#valgrind="valgrind --tool=helgrind --log-fd=1"
#set -o xtrace
-export RSYSLOG_DEBUG="debug nostdout"
+#export RSYSLOG_DEBUG="debug Xnostdout printmutexaction"
export RSYSLOG_DEBUGLOG="log"
case $1 in
'init') $srcdir/killrsyslog.sh # kill rsyslogd if it runs for some reason
@@ -22,7 +25,7 @@ case $1 in
;;
'startup') # start rsyslogd with default params. $2 is the config file name to use
# returns only after successful startup
- valgrind ../tools/rsyslogd -c4 -u2 -n -irsyslog.pid -M../runtime/.libs:../.libs -f$srcdir/testsuites/$2 &
+ $valgrind ../tools/rsyslogd -c4 -u2 -n -irsyslog.pid -M../runtime/.libs:../.libs -f$srcdir/testsuites/$2 &
$srcdir/diag.sh wait-startup
;;
'wait-startup') # wait for rsyslogd startup