summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author: Rainer Gerhards <rgerhards@adiscon.com> 2009-05-28 09:59:45 +0200
committer: Rainer Gerhards <rgerhards@adiscon.com> 2009-05-28 09:59:45 +0200
commit: 9517e19b6427c295e206ece9562ce70f4a6d7044 (patch)
tree: 783fa1af0c03e05f10cb3b54149b6fcba508263d
parent: d4564f8399f4362c7e79066370049f909cef996c (diff)
download: rsyslog-9517e19b6427c295e206ece9562ce70f4a6d7044.tar.gz
download: rsyslog-9517e19b6427c295e206ece9562ce70f4a6d7044.tar.xz
download: rsyslog-9517e19b6427c295e206ece9562ce70f4a6d7044.zip
preserving current changes
... in preparation for some larger changes - I need to apply some serious design changes, as the current system does not play well at all with ultra-reliable queues. Will do that in a totally new version.
-rw-r--r-- plugins/imdiag/imdiag.c | 4
-rw-r--r-- runtime/queue.c | 35
-rw-r--r-- runtime/wti.c | 13
-rw-r--r-- runtime/wtp.c | 6
-rwxr-xr-x tests/diag.sh | 2
5 files changed, 30 insertions, 30 deletions
diff --git a/plugins/imdiag/imdiag.c b/plugins/imdiag/imdiag.c
index 03396db7..cb1c1686 100644
--- a/plugins/imdiag/imdiag.c
+++ b/plugins/imdiag/imdiag.c
@@ -256,10 +256,13 @@ static rsRetVal
waitMainQEmpty(tcps_sess_t *pSess)
{
int iMsgQueueSize;
+ int iPrint = 0;
DEFiRet;
CHKiRet(diagGetMainMsgQSize(&iMsgQueueSize));
while(iMsgQueueSize > 0) {
+ if(iPrint++ % 500 == 0)
+ dbgprintf("imdiag sleeping, wait mainq drain, curr size %d\n", iMsgQueueSize);
srSleep(0,2); /* wait a little bit */
CHKiRet(diagGetMainMsgQSize(&iMsgQueueSize));
}
@@ -294,6 +297,7 @@ OnMsgReceived(tcps_sess_t *pSess, uchar *pRcv, int iLenMsg)
getFirstWord(&pszMsg, cmdBuf, sizeof(cmdBuf)/sizeof(uchar), TO_LOWERCASE);
+ dbgprintf("imdiag received command '%s'\n", cmdBuf);
if(!ustrcmp(cmdBuf, UCHAR_CONSTANT("getmainmsgqueuesize"))) {
CHKiRet(diagGetMainMsgQSize(&iMsgQueueSize));
CHKiRet(sendResponse(pSess, "%d\n", iMsgQueueSize));
diff --git a/runtime/queue.c b/runtime/queue.c
index 48d1c6e3..4405dd39 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -255,7 +255,11 @@ static inline rsRetVal qqueueAdviseMaxWorkers(qqueue_t *pThis)
/* wait until we have a fully initialized DA queue. Sometimes, we need to
- * sync with it, as we expect it for some function.
+ * sync with it, as we expect it for some function. Note that in extreme
+ * cases, the DA queue may already have started up AND terminated when we
* call this function. As such, it may validly be that DA is already shut down.
+ * So we just check if we are in init phase and then wait for full startup.
+ * If in non-DA mode, we silently return.
* rgerhards, 2008-02-27
*/
static rsRetVal
@@ -264,9 +268,8 @@ qqueueWaitDAModeInitialized(qqueue_t *pThis)
DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
- ASSERT(pThis->bRunsDA);
- while(pThis->bRunsDA != 2) {
+ while(pThis->bRunsDA == 1) {
d_pthread_cond_wait(&pThis->condDAReady, pThis->mut);
}
@@ -289,13 +292,16 @@ qqueueTurnOffDAMode(qqueue_t *pThis)
{
DEFiRet;
+RUNLOG_STR("XXX: TurnOffDAMode\n");
ISOBJ_TYPE_assert(pThis, qqueue);
ASSERT(pThis->bRunsDA);
/* at this point, we need a fully initialized DA queue. So if it isn't, we finally need
* to wait for its startup... -- rgerhards, 2008-01-25
*/
+RUNLOG;
qqueueWaitDAModeInitialized(pThis);
+RUNLOG;
/* if we need to pull any data that we still need from the (child) disk queue,
* now would be the time to do so. At present, we do not need this, but I'd like to
@@ -309,6 +315,7 @@ qqueueTurnOffDAMode(qqueue_t *pThis)
* during the lifetime of DA-mode, depending on how often the DA worker receives an
* inactivity timeout. -- rgerhards, 2008-01-25
*/
+dbgprintf("XXX: getLogicalQueueSize(pThis->pqDA): %d\n", getLogicalQueueSize(pThis->pqDA));
if(getLogicalQueueSize(pThis->pqDA) == 0) {
pThis->bRunsDA = 0; /* tell the world we are back in non-DA mode */
/* we destruct the queue object, which will also shutdown the queue worker. As the queue is empty,
@@ -1151,8 +1158,8 @@ qqueueDeq(qqueue_t *pThis, void *pUsr)
iRet = pThis->qDeq(pThis, pUsr);
ATOMIC_INC(pThis->nLogDeq);
- dbgoprint((obj_t*) pThis, "entry deleted, size now log %d, phys %d entries\n",
- getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
+// dbgoprint((obj_t*) pThis, "entry deleted, size now log %d, phys %d entries\n",
+// getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
RETiRet;
}
@@ -1259,13 +1266,12 @@ tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis)
ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */
/* instruct workers to finish ASAP, even if still work exists */
-RUNLOG_STR("setting enqOnly for main queue");
- //TODO:SetEnqOnly(pThis, 1, LOCK_MUTEX); /* start no new workers */
+ /* note that we modify bEnqOnly directly, because going through the method would
+ * startup some workers again. So this is OK here. -- rgerhards, 2009-05-28
+ */
pThis->bEnqOnly = 1;
wtpSetState(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE);
if(pThis->pqDA != NULL) {
-RUNLOG_STR("setting enqOnly for DA queue");
- //TODO:SetEnqOnly(pThis->pqDA, 1, LOCK_MUTEX);
pThis->pqDA->bEnqOnly = 1;
wtpSetState(pThis->pqDA->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE);
}
@@ -1644,7 +1650,7 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz
nDequeued = nDiscarded = 0;
while((iQueueSize = getLogicalQueueSize(pThis)) > 0 && nDequeued < pThis->iDeqBatchSize) {
-dbgprintf("DequeueConsumableElements, index %d\n", nDequeued);
+//dbgprintf("DequeueConsumableElements, index %d\n", nDequeued);
CHKiRet(qqueueDeq(pThis, &pUsr));
/* check if we should discard this element */
@@ -1945,7 +1951,7 @@ RUNLOG;
RUNLOG;
iRet = RS_RET_TERMINATE_NOW;
} else if(getPhysicalQueueSize(pThis) < pThis->iHighWtrMrk && pThis->bQueueStarted == 1) {
-RUNLOG;
+dbgprintf("XXX: queue size %d, high water mark %d\n", getPhysicalQueueSize(pThis), pThis->iHighWtrMrk);
iRet = RS_RET_TERMINATE_NOW;
}
} else {
@@ -1973,7 +1979,6 @@ ChkStopWrkrReg(qqueue_t *pThis)
return pThis->bEnqOnly || pThis->bRunsDA || (pThis->pqParent != NULL && getPhysicalQueueSize(pThis) == 0);
* TODO: remove when verified! -- rgerhards, 2009-05-26
*/
-RUNLOG;
if(pThis->bEnqOnly || pThis->bRunsDA) {
RUNLOG;
iRet = RS_RET_TERMINATE_NOW;
@@ -2314,6 +2319,7 @@ dbgprintf("after InitDA, queue log %d, phys %d\n", getLogicalQueueSize(pThis), g
if(pThis->bRunsDA != 2) {
InitDA(pThis, QUEUE_MODE_ENQONLY, LOCK_MUTEX); /* switch to DA mode */
dbgprintf("after InitDA, queue log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
+//!!! TODO !!! this probably happens when the queue becomes empty (but was not yet empty before)
RUNLOG_VAR("%d", pThis->bRunsDA);
RUNLOG_VAR("%d", pThis->pWtpDA->wtpState);
qqueueWaitDAModeInitialized(pThis); /* make sure DA mode is actually started, else we may have a race! */
@@ -2322,6 +2328,7 @@ RUNLOG_VAR("%d", pThis->pWtpDA->wtpState);
dbgoprint((obj_t*) pThis, "bSaveOnShutdown configured, infinite timeout set\n");
timeoutComp(&tTimeout, QUEUE_TIMEOUT_ETERNAL);
/* and run the primary queue's DA worker to drain the queue */
+RUNLOG;
iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN, &tTimeout);
dbgoprint((obj_t*) pThis, "end queue persistence run, iRet %d, queue size log %d, phys %d\n",
iRetLocal, getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
@@ -2339,6 +2346,7 @@ BEGINobjDestruct(qqueue) /* be sure to specify the object type also in END and C
CODESTARTobjDestruct(qqueue)
pThis->bQueueInDestruction = 1; /* indicate we are in destruction (modifies some behaviour) */
+RUNLOG_STR("XXX: queue destruct\n");
/* shut down all workers
* We do not need to shutdown workers when we are in enqueue-only mode or we are a
* direct queue - because in both cases we have none... ;)
@@ -2353,6 +2361,9 @@ CODESTARTobjDestruct(qqueue)
* so and then destruct everything. -- rgerhards, 2009-05-26
*/
//!!!! //CHKiRet(pThis->qUnDeqAll(pThis));
+dbgprintf("XXX: pre unDeq disk log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
+ CHKiRet(pThis->qUnDeqAll(pThis));
+dbgprintf("XXX: post unDeq disk log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
if(pThis->bIsDA && getPhysicalQueueSize(pThis) > 0 && pThis->bSaveOnShutdown) {
CHKiRet(DoSaveOnShutdown(pThis));
diff --git a/runtime/wti.c b/runtime/wti.c
index 75e497b8..2fb5eea2 100644
--- a/runtime/wti.c
+++ b/runtime/wti.c
@@ -392,11 +392,8 @@ wtiWorker(wti_t *pThis)
dbgSetThrdName(pThis->pszDbgHdr);
pthread_cleanup_push(wtiWorkerCancelCleanup, pThis);
-dbgprintf("XXX: worker startup\n");
- RUNLOG_STR("MUTEX lock");
BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(pWtp->pmutUsr);
pWtp->pfOnWorkerStartup(pWtp->pUsr);
- RUNLOG_STR("MUTEX release");
END_MTX_PROTECTED_OPERATIONS_UNCOND(pWtp->pmutUsr);
/* now we have our identity, on to real processing */
@@ -410,21 +407,17 @@ dbgprintf("XXX: worker startup\n");
}
wtpSetInactivityGuard(pThis->pWtp, 0, LOCK_MUTEX); /* must be set before usr mutex is locked! */
- RUNLOG_STR("MUTEX lock");
BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(pWtp->pmutUsr);
/* first check if we are in shutdown process (but evaluate a bit later) */
-RUNLOG;
terminateRet = wtpChkStopWrkr(pWtp, LOCK_MUTEX, MUTEX_ALREADY_LOCKED);
-RUNLOG_VAR("%d", terminateRet);
if(terminateRet == RS_RET_TERMINATE_NOW) {
/* we now need to free the old batch */
localRet = pWtp->pfObjProcessed(pWtp->pUsr, pThis);
- dbgoprint((obj_t*) pThis, "terminating worker because auf TERMINATE_NOW mode, del iRet %d\n",
+ dbgoprint((obj_t*) pThis, "terminating worker because of TERMINATE_NOW mode, del iRet %d\n",
localRet);
break;
}
-RUNLOG;
/* try to execute and process whatever we have */
localRet = pWtp->pfDoWork(pWtp->pUsr, pThis, iCancelStateSave);
@@ -439,10 +432,8 @@ RUNLOG;
/* we had an inactivity timeout in the last run and are still idle, so it is time to exit... */
break; /* end worker thread run */
}
- RUNLOG_STR("MUTEX lock");
BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(pWtp->pmutUsr);
doIdleProcessing(pThis, pWtp, &bInactivityTOOccured);
- RUNLOG_STR("MUTEX release");
END_MTX_PROTECTED_OPERATIONS_UNCOND(pWtp->pmutUsr);
continue; /* request next iteration */
}
@@ -451,7 +442,6 @@ RUNLOG;
}
/* if we exit the loop, the mutex is locked and must be unlocked */
- RUNLOG_STR("MUTEX release");
END_MTX_PROTECTED_OPERATIONS_UNCOND(pWtp->pmutUsr);
/* indicate termination */
@@ -459,6 +449,7 @@ RUNLOG;
d_pthread_mutex_lock(&pThis->mut);
pthread_cleanup_pop(0); /* remove cleanup handler */
+RUNLOG_STR("XXX: Worker shutdown");
pWtp->pfOnWorkerShutdown(pWtp->pUsr);
wtiSetState(pThis, eWRKTHRD_TERMINATING, 0, MUTEX_ALREADY_LOCKED);
diff --git a/runtime/wtp.c b/runtime/wtp.c
index 7786a656..a5836da3 100644
--- a/runtime/wtp.c
+++ b/runtime/wtp.c
@@ -141,7 +141,6 @@ BEGINobjDestruct(wtp) /* be sure to specify the object type also in END and CODE
int i;
CODESTARTobjDestruct(wtp)
wtpProcessThrdChanges(pThis); /* process thread changes one last time */
-RUNLOG_STR("wtpDestruct");
/* destruct workers */
for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i)
@@ -263,22 +262,17 @@ wtpChkStopWrkr(wtp_t *pThis, int bLockMutex, int bLockUsrMutex)
ISOBJ_TYPE_assert(pThis, wtp);
-RUNLOG;
BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex);
if(pThis->wtpState == wtpState_SHUTDOWN_IMMEDIATE) {
-RUNLOG;
ABORT_FINALIZE(RS_RET_TERMINATE_NOW);
} else if(pThis->wtpState == wtpState_SHUTDOWN) {
ABORT_FINALIZE(RS_RET_TERMINATE_WHEN_IDLE);
-RUNLOG;
}
-RUNLOG_VAR("%d", iRet);
/* try customer handler if one was set and we do not yet have a definite result */
if(pThis->pfChkStopWrkr != NULL) {
iRet = pThis->pfChkStopWrkr(pThis->pUsr, bLockUsrMutex);
}
-RUNLOG_VAR("%d", iRet);
finalize_it:
END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
diff --git a/tests/diag.sh b/tests/diag.sh
index 5c6a0ab8..8bf6b129 100755
--- a/tests/diag.sh
+++ b/tests/diag.sh
@@ -9,7 +9,7 @@
#valgrind="valgrind --tool=drd --log-fd=1"
#valgrind="valgrind --tool=helgrind --log-fd=1"
#set -o xtrace
-#export RSYSLOG_DEBUG="debug Xnostdout printmutexaction"
+export RSYSLOG_DEBUG="debug nostdout printmutexaction"
export RSYSLOG_DEBUGLOG="log"
case $1 in
'init') $srcdir/killrsyslog.sh # kill rsyslogd if it runs for some reason