summaryrefslogtreecommitdiffstats
path: root/runtime/queue.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2009-05-28 09:59:45 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2009-05-28 09:59:45 +0200
commit9517e19b6427c295e206ece9562ce70f4a6d7044 (patch)
tree783fa1af0c03e05f10cb3b54149b6fcba508263d /runtime/queue.c
parentd4564f8399f4362c7e79066370049f909cef996c (diff)
downloadrsyslog-9517e19b6427c295e206ece9562ce70f4a6d7044.tar.gz
rsyslog-9517e19b6427c295e206ece9562ce70f4a6d7044.tar.xz
rsyslog-9517e19b6427c295e206ece9562ce70f4a6d7044.zip
preserving current changes
... in preparation for some larger changes - I need to apply some serious design changes, as the current system does not play well at all with ultra-reliable queues. Will do that in a totally new version.
Diffstat (limited to 'runtime/queue.c')
-rw-r--r--runtime/queue.c35
1 files changed, 23 insertions, 12 deletions
diff --git a/runtime/queue.c b/runtime/queue.c
index 48d1c6e3..4405dd39 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -255,7 +255,11 @@ static inline rsRetVal qqueueAdviseMaxWorkers(qqueue_t *pThis)
/* wait until we have a fully initialized DA queue. Sometimes, we need to
- * sync with it, as we expect it for some function.
+ * sync with it, as we expect it for some function. Note that in extreme
+ * cases, the DA queue may already have started up AND terminated when we
+ * call this function. As such,it may validly be that DA is already shut down.
+ * So we just check if we are in init phase and then wait for full startup.
+ * If in non-DA mode, we silently return.
* rgerhards, 2008-02-27
*/
static rsRetVal
@@ -264,9 +268,8 @@ qqueueWaitDAModeInitialized(qqueue_t *pThis)
DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
- ASSERT(pThis->bRunsDA);
- while(pThis->bRunsDA != 2) {
+ while(pThis->bRunsDA == 1) {
d_pthread_cond_wait(&pThis->condDAReady, pThis->mut);
}
@@ -289,13 +292,16 @@ qqueueTurnOffDAMode(qqueue_t *pThis)
{
DEFiRet;
+RUNLOG_STR("XXX: TurnOffDAMode\n");
ISOBJ_TYPE_assert(pThis, qqueue);
ASSERT(pThis->bRunsDA);
/* at this point, we need a fully initialized DA queue. So if it isn't, we finally need
* to wait for its startup... -- rgerhards, 2008-01-25
*/
+RUNLOG;
qqueueWaitDAModeInitialized(pThis);
+RUNLOG;
/* if we need to pull any data that we still need from the (child) disk queue,
* now would be the time to do so. At present, we do not need this, but I'd like to
@@ -309,6 +315,7 @@ qqueueTurnOffDAMode(qqueue_t *pThis)
* during the lifetime of DA-mode, depending on how often the DA worker receives an
* inactivity timeout. -- rgerhards, 2008-01-25
*/
+dbgprintf("XXX: getLogicalQueueSize(pThis->pqDA): %d\n", getLogicalQueueSize(pThis->pqDA));
if(getLogicalQueueSize(pThis->pqDA) == 0) {
pThis->bRunsDA = 0; /* tell the world we are back in non-DA mode */
/* we destruct the queue object, which will also shutdown the queue worker. As the queue is empty,
@@ -1151,8 +1158,8 @@ qqueueDeq(qqueue_t *pThis, void *pUsr)
iRet = pThis->qDeq(pThis, pUsr);
ATOMIC_INC(pThis->nLogDeq);
- dbgoprint((obj_t*) pThis, "entry deleted, size now log %d, phys %d entries\n",
- getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
+// dbgoprint((obj_t*) pThis, "entry deleted, size now log %d, phys %d entries\n",
+// getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
RETiRet;
}
@@ -1259,13 +1266,12 @@ tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis)
ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */
/* instruct workers to finish ASAP, even if still work exists */
-RUNLOG_STR("setting enqOnly for main queue");
- //TODO:SetEnqOnly(pThis, 1, LOCK_MUTEX); /* start no new workers */
+ /* note that we modify bEnqOnly direclty, because going through the method would
+ * startup some workers again. So this is OK here. -- rgerhards, 2009-05-28
+ */
pThis->bEnqOnly = 1;
wtpSetState(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE);
if(pThis->pqDA != NULL) {
-RUNLOG_STR("setting enqOnly for DA queue");
- //TODO:SetEnqOnly(pThis->pqDA, 1, LOCK_MUTEX);
pThis->pqDA->bEnqOnly = 1;
wtpSetState(pThis->pqDA->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE);
}
@@ -1644,7 +1650,7 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz
nDequeued = nDiscarded = 0;
while((iQueueSize = getLogicalQueueSize(pThis)) > 0 && nDequeued < pThis->iDeqBatchSize) {
-dbgprintf("DequeueConsumableElements, index %d\n", nDequeued);
+//dbgprintf("DequeueConsumableElements, index %d\n", nDequeued);
CHKiRet(qqueueDeq(pThis, &pUsr));
/* check if we should discard this element */
@@ -1945,7 +1951,7 @@ RUNLOG;
RUNLOG;
iRet = RS_RET_TERMINATE_NOW;
} else if(getPhysicalQueueSize(pThis) < pThis->iHighWtrMrk && pThis->bQueueStarted == 1) {
-RUNLOG;
+dbgprintf("XXX: queue size %d, high water mark %d\n", getPhysicalQueueSize(pThis), pThis->iHighWtrMrk);
iRet = RS_RET_TERMINATE_NOW;
}
} else {
@@ -1973,7 +1979,6 @@ ChkStopWrkrReg(qqueue_t *pThis)
return pThis->bEnqOnly || pThis->bRunsDA || (pThis->pqParent != NULL && getPhysicalQueueSize(pThis) == 0);
* TODO: remove when verified! -- rgerhards, 2009-05-26
*/
-RUNLOG;
if(pThis->bEnqOnly || pThis->bRunsDA) {
RUNLOG;
iRet = RS_RET_TERMINATE_NOW;
@@ -2314,6 +2319,7 @@ dbgprintf("after InitDA, queue log %d, phys %d\n", getLogicalQueueSize(pThis), g
if(pThis->bRunsDA != 2) {
InitDA(pThis, QUEUE_MODE_ENQONLY, LOCK_MUTEX); /* switch to DA mode */
dbgprintf("after InitDA, queue log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
+//!!! TODO !!!das passiert wohl, wenn die queue empty wird! (aber es vorher noch nciht war)
RUNLOG_VAR("%d", pThis->bRunsDA);
RUNLOG_VAR("%d", pThis->pWtpDA->wtpState);
qqueueWaitDAModeInitialized(pThis); /* make sure DA mode is actually started, else we may have a race! */
@@ -2322,6 +2328,7 @@ RUNLOG_VAR("%d", pThis->pWtpDA->wtpState);
dbgoprint((obj_t*) pThis, "bSaveOnShutdown configured, infinite timeout set\n");
timeoutComp(&tTimeout, QUEUE_TIMEOUT_ETERNAL);
/* and run the primary queue's DA worker to drain the queue */
+RUNLOG;
iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN, &tTimeout);
dbgoprint((obj_t*) pThis, "end queue persistence run, iRet %d, queue size log %d, phys %d\n",
iRetLocal, getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
@@ -2339,6 +2346,7 @@ BEGINobjDestruct(qqueue) /* be sure to specify the object type also in END and C
CODESTARTobjDestruct(qqueue)
pThis->bQueueInDestruction = 1; /* indicate we are in destruction (modifies some behaviour) */
+RUNLOG_STR("XXX: queue destruct\n");
/* shut down all workers
* We do not need to shutdown workers when we are in enqueue-only mode or we are a
* direct queue - because in both cases we have none... ;)
@@ -2353,6 +2361,9 @@ CODESTARTobjDestruct(qqueue)
* so and then destruct everything. -- rgerhards, 2009-05-26
*/
//!!!! //CHKiRet(pThis->qUnDeqAll(pThis));
+dbgprintf("XXX: pre unDeq disk log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
+ CHKiRet(pThis->qUnDeqAll(pThis));
+dbgprintf("XXX: post unDeq disk log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
if(pThis->bIsDA && getPhysicalQueueSize(pThis) > 0 && pThis->bSaveOnShutdown) {
CHKiRet(DoSaveOnShutdown(pThis));