summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ChangeLog4
-rw-r--r--action.c3
-rw-r--r--plugins/imdiag/imdiag.c3
-rw-r--r--runtime/obj.c8
-rw-r--r--runtime/queue.c509
-rw-r--r--runtime/wti.c34
-rw-r--r--runtime/wtp.c35
-rw-r--r--runtime/wtp.h10
-rw-r--r--tests/Makefile.am2
-rwxr-xr-xtests/arrayqueue.sh4
-rwxr-xr-xtests/daqueue-persist-drvr.sh9
-rwxr-xr-xtests/inputname.sh2
-rw-r--r--tools/syslogd.c9
-rw-r--r--tools/syslogd.h1
14 files changed, 141 insertions, 492 deletions
diff --git a/ChangeLog b/ChangeLog
index a92ceff5..1ef7ee12 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,5 +1,9 @@
---------------------------------------------------------------------------
Version 5.3.2 [DEVEL] (rgerhards), 2009-10-??
+- simplified and thus speeded up the queue engine, also fixed some
+ potential race conditions (in very unusual shutdown conditions)
+ along the way. The threading model has seriously changes, so there may
+ be some regressions.
- enhanced omfile to support transactional interface. This will increase
performance in many cases.
- added multi-ruleset support to imudp
diff --git a/action.c b/action.c
index dddb0e01..5bd175e5 100644
--- a/action.c
+++ b/action.c
@@ -934,7 +934,8 @@ processAction(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate)
CHKiRet(localRet);
/* this must be moved away - up into the dequeue part of the queue, I guess, but that's for another day */
- for(i = 0 ; i < pBatch->nElem && !*pbShutdownImmediate ; i++) {
+ //for(i = 0 ; i < pBatch->nElem && !*pbShutdownImmediate ; i++) {
+ for(i = 0 ; i < pBatch->nElem ; i++) {
pMsg = (msg_t*) pBatch->pElem[i].pUsrp;
}
iRet = finishBatch(pAction);
diff --git a/plugins/imdiag/imdiag.c b/plugins/imdiag/imdiag.c
index 4359cda1..cabdf157 100644
--- a/plugins/imdiag/imdiag.c
+++ b/plugins/imdiag/imdiag.c
@@ -263,6 +263,9 @@ waitMainQEmpty(tcps_sess_t *pSess)
CHKiRet(diagGetMainMsgQSize(&iMsgQueueSize));
while(iMsgQueueSize > 0) {
+ /* DEV DEBUG ONLY if(iPrint++ % 500)
+ printf("imdiag: main msg queue size: %d\n", iMsgQueueSize);
+ */
if(iPrint++ % 500 == 0)
dbgprintf("imdiag sleeping, wait mainq drain, curr size %d\n", iMsgQueueSize);
srSleep(0,2); /* wait a little bit */
diff --git a/runtime/obj.c b/runtime/obj.c
index aebea332..3692b957 100644
--- a/runtime/obj.c
+++ b/runtime/obj.c
@@ -1129,7 +1129,7 @@ UseObj(char *srcFile, uchar *pObjName, uchar *pObjFile, interface_t *pIf)
/* DEV debug only: dbgprintf("source file %s requests object '%s', ifIsLoaded %d\n", srcFile, pObjName, pIf->ifIsLoaded); */
- d_pthread_mutex_lock(&mutObjGlobalOp);
+ pthread_mutex_lock(&mutObjGlobalOp);
if(pIf->ifIsLoaded == 1) {
ABORT_FINALIZE(RS_RET_OK); /* we are already set */
@@ -1170,7 +1170,7 @@ UseObj(char *srcFile, uchar *pObjName, uchar *pObjFile, interface_t *pIf)
pIf->ifIsLoaded = 1; /* we are happy */
finalize_it:
- d_pthread_mutex_unlock(&mutObjGlobalOp);
+ pthread_mutex_unlock(&mutObjGlobalOp);
if(pStr != NULL)
rsCStrDestruct(&pStr);
@@ -1193,7 +1193,7 @@ ReleaseObj(char *srcFile, uchar *pObjName, uchar *pObjFile, interface_t *pIf)
/* dev debug only dbgprintf("source file %s releasing object '%s', ifIsLoaded %d\n", srcFile, pObjName, pIf->ifIsLoaded); */
- d_pthread_mutex_lock(&mutObjGlobalOp);
+ pthread_mutex_lock(&mutObjGlobalOp);
if(pObjFile == NULL)
FINALIZE; /* if it is not a lodable module, we do not need to do anything... */
@@ -1214,7 +1214,7 @@ ReleaseObj(char *srcFile, uchar *pObjName, uchar *pObjFile, interface_t *pIf)
pIf->ifIsLoaded = 0; /* indicated "no longer valid" */
finalize_it:
- d_pthread_mutex_unlock(&mutObjGlobalOp);
+ pthread_mutex_unlock(&mutObjGlobalOp);
if(pStr != NULL)
rsCStrDestruct(&pStr);
diff --git a/runtime/queue.c b/runtime/queue.c
index 00bbd15f..dacf1f13 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -55,6 +55,7 @@
#include "wti.h"
#include "msg.h"
#include "atomic.h"
+#include "unicode-helper.h"
#include "msg.h" /* TODO: remove once we remove MsgAddRef() call */
#ifdef OS_SOLARIS
@@ -68,11 +69,9 @@ DEFobjCurrIf(strm)
/* forward-definitions */
static rsRetVal qqueueChkPersist(qqueue_t *pThis, int nUpdates);
-static rsRetVal SetEnqOnly(qqueue_t *pThis, int bEnqOnly, int bLockMutex);
static rsRetVal RateLimiter(qqueue_t *pThis);
static int qqueueChkStopWrkrDA(qqueue_t *pThis);
static rsRetVal GetDeqBatchSize(qqueue_t *pThis, int *pVal);
-static int qqueueIsIdleDA(qqueue_t *pThis);
static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti);
static rsRetVal batchProcessed(qqueue_t *pThis, wti_t *pWti);
@@ -228,7 +227,8 @@ static inline void queueDrain(qqueue_t *pThis)
* this point in time. The mutex must be locked when
* ths function is called. -- rgerhards, 2008-01-25
*/
-static inline rsRetVal qqueueAdviseMaxWorkers(qqueue_t *pThis)
+static inline rsRetVal
+qqueueAdviseMaxWorkers(qqueue_t *pThis)
{
DEFiRet;
int iMaxWorkers;
@@ -236,48 +236,20 @@ static inline rsRetVal qqueueAdviseMaxWorkers(qqueue_t *pThis)
ISOBJ_TYPE_assert(pThis, qqueue);
if(!pThis->bEnqOnly) {
- if(pThis->bRunsDA) {
- /* if we have not yet reached the high water mark, there is no need to start a
- * worker. -- rgerhards, 2008-01-26
- */
- if(getLogicalQueueSize(pThis) >= pThis->iHighWtrMrk || pThis->bQueueStarted == 0) {
- wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* disk queues have always one worker */
- }
- }
- /* regular workers always run */
- if(pThis->qType == QUEUETYPE_DISK || pThis->iMinMsgsPerWrkr == 0) {
- iMaxWorkers = 1;
+dbgprintf("AdviseMaxWorkers: log Queue Size: %d, high water mark %d\n",
+ getLogicalQueueSize(pThis) , pThis->iHighWtrMrk);
+ if(pThis->bIsDA && getLogicalQueueSize(pThis) >= pThis->iHighWtrMrk) {
+ wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* disk queues have always one worker */
} else {
- iMaxWorkers = getLogicalQueueSize(pThis) / pThis->iMinMsgsPerWrkr + 1;
+ if(getLogicalQueueSize(pThis) == 0) {
+ iMaxWorkers = 0;
+ } else if(pThis->qType == QUEUETYPE_DISK || pThis->iMinMsgsPerWrkr == 0) {
+ iMaxWorkers = 1;
+ } else {
+ iMaxWorkers = getLogicalQueueSize(pThis) / pThis->iMinMsgsPerWrkr + 1;
+ }
+ wtpAdviseMaxWorkers(pThis->pWtpReg, iMaxWorkers);
}
- wtpAdviseMaxWorkers(pThis->pWtpReg, iMaxWorkers); /* disk queues have always one worker */
- }
-
- RETiRet;
-}
-
-
-/* Destruct DA queue. This is the last part of DA-to-normal-mode
- * transistion. This is called asynchronously and some time quite a
- * while after the actual transistion. The key point is that we need to
- * do it at some later time, because we need to destruct the DA queue. That,
- * however, can not be done in a thread that has been signalled
- * This is to be called when we revert back to our own queue.
- * This function must be called with the queue mutex locked (the wti
- * class ensures this).
- * rgerhards, 2008-01-15
- */
-static rsRetVal
-TurnOffDAMode(qqueue_t *pThis)
-{
- DEFiRet;
-
- ISOBJ_TYPE_assert(pThis, qqueue);
- ASSERT(pThis->bRunsDA);
- if(getLogicalQueueSize(pThis->pqDA) == 0) {
- pThis->bRunsDA = 0; /* tell the world we are back in non-DA mode */
- DBGOPRINT((obj_t*) pThis, "disk-assistance has been turned off, disk queue was empty (iRet %d)\n",
- iRet);
}
RETiRet;
@@ -348,33 +320,18 @@ StartDA(qqueue_t *pThis)
CHKiRet(qqueueSetbSyncQueueFiles(pThis->pqDA, pThis->bSyncQueueFiles));
CHKiRet(qqueueSettoActShutdown(pThis->pqDA, pThis->toActShutdown));
CHKiRet(qqueueSettoEnq(pThis->pqDA, pThis->toEnq));
- CHKiRet(SetEnqOnly(pThis->pqDA, pThis->bDAEnqOnly, MUTEX_ALREADY_LOCKED));
CHKiRet(qqueueSetiDeqtWinFromHr(pThis->pqDA, pThis->iDeqtWinFromHr));
CHKiRet(qqueueSetiDeqtWinToHr(pThis->pqDA, pThis->iDeqtWinToHr));
+ CHKiRet(qqueueSettoQShutdown(pThis->pqDA, pThis->toQShutdown));
CHKiRet(qqueueSetiHighWtrMrk(pThis->pqDA, 0));
CHKiRet(qqueueSetiDiscardMrk(pThis->pqDA, 0));
- // experimental: XXX
- CHKiRet(qqueueSettoWrkShutdown(pThis->pqDA, 0));
-
- if(pThis->toQShutdown == 0) {
- CHKiRet(qqueueSettoQShutdown(pThis->pqDA, 0)); /* if the user really wants... */
- } else {
- /* we use the shortest possible shutdown (0 is endless!) because when we run on disk AND
- * have an obviously large backlog, we can't finish it in any case. So there is no point
- * in holding shutdown longer than necessary. -- rgerhards, 2008-01-15
- */
- CHKiRet(qqueueSettoQShutdown(pThis->pqDA, 1));
- }
-
iRet = qqueueStart(pThis->pqDA);
/* file not found is expected, that means it is no previous QIF available */
if(iRet != RS_RET_OK && iRet != RS_RET_FILE_NOT_FOUND)
FINALIZE; /* something is wrong */
- //pthread_cond_broadcast(&pThis->condDAReady); /* signal we are now initialized and ready to go ;) */
-
- DBGOPRINT((obj_t*) pThis, "is now running in disk assisted mode, disk queue 0x%lx\n",
+ DBGOPRINT((obj_t*) pThis, "DA queue initialized, disk queue 0x%lx\n",
qqueueGetID(pThis->pqDA));
finalize_it:
@@ -412,91 +369,35 @@ InitDA(qqueue_t *pThis, int bEnqOnly, int bLockMutex)
* rgerhards, 2008-01-24
* NOTE: this is the DA worker *pool*, not the DA queue!
*/
- if(pThis->pWtpDA == NULL) {
- lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s:DAwpool", obj.GetName((obj_t*) pThis));
- CHKiRet(wtpConstruct (&pThis->pWtpDA));
- CHKiRet(wtpSetDbgHdr (pThis->pWtpDA, pszBuf, lenBuf));
- CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueChkStopWrkrDA));
- CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize));
- CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, wtp_t*)) qqueueIsIdleDA));
- CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti)) ConsumerDA));
- CHKiRet(wtpSetpfObjProcessed (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, wti_t *pWti)) batchProcessed));
- CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) TurnOffDAMode));
- CHKiRet(wtpSetpmutUsr (pThis->pWtpDA, pThis->mut));
- CHKiRet(wtpSetpcondBusy (pThis->pWtpDA, &pThis->notEmpty));
- CHKiRet(wtpSetiNumWorkerThreads (pThis->pWtpDA, 1));
- CHKiRet(wtpSettoWrkShutdown (pThis->pWtpDA, pThis->toWrkShutdown));
- CHKiRet(wtpSetpUsr (pThis->pWtpDA, pThis));
- CHKiRet(wtpConstructFinalize (pThis->pWtpDA));
- }
+ lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s:DAwpool", obj.GetName((obj_t*) pThis));
+ CHKiRet(wtpConstruct (&pThis->pWtpDA));
+ CHKiRet(wtpSetDbgHdr (pThis->pWtpDA, pszBuf, lenBuf));
+ CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueChkStopWrkrDA));
+ CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize));
+ CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti)) ConsumerDA));
+ CHKiRet(wtpSetpfObjProcessed (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, wti_t *pWti)) batchProcessed));
+ CHKiRet(wtpSetpmutUsr (pThis->pWtpDA, pThis->mut));
+ CHKiRet(wtpSetpcondBusy (pThis->pWtpDA, &pThis->notEmpty));
+ CHKiRet(wtpSetiNumWorkerThreads (pThis->pWtpDA, 1));
+ CHKiRet(wtpSettoWrkShutdown (pThis->pWtpDA, pThis->toWrkShutdown));
+ CHKiRet(wtpSetpUsr (pThis->pWtpDA, pThis));
+ CHKiRet(wtpConstructFinalize (pThis->pWtpDA));
/* if we reach this point, we have a "good" DA worker pool */
- /* indicate we now run in DA mode - this is reset by the DA worker if it fails */
- pThis->bDAEnqOnly = bEnqOnly;
-
/* now construct the actual queue (if it does not already exist) */
if(pThis->pqDA == NULL) {
CHKiRet(StartDA(pThis));
}
+ pThis->bEnqOnly = bEnqOnly; // TODO: I think this is not needed, but first clean up shutdown processing!
pThis->bRunsDA = 1;
- /* now we must now adivse the wtp that we need one worker. If none is yet active,
- * that will also start one up. If we forgot that step, everything would be stalled
- * until the next enqueue request.
- */
- wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* DA queues always have just one worker max */
-
finalize_it:
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
RETiRet;
}
-/* check if we need to start disk assisted mode and send some signals to
- * keep it running if we are already in it. It also checks if DA mode is
- * partially initialized, in which case it waits for initialization to
- * complete.
- * rgerhards, 2008-01-14
- */
-static rsRetVal
-ChkStrtDA(qqueue_t *pThis)
-{
- DEFiRet;
-
- ISOBJ_TYPE_assert(pThis, qqueue);
-
- /* if we do not hit the high water mark, we have nothing to do */
- if(getPhysicalQueueSize(pThis) != pThis->iHighWtrMrk)
- ABORT_FINALIZE(RS_RET_OK);
-
- if(pThis->bRunsDA) {
- /* then we need to signal that we are at the high water mark again. If that happens
- * on our way down the queue, that doesn't matter, because then nobody is waiting
- * on the condition variable.
- * (Remember that a DA queue stops draining the queue once it has reached the low
- * water mark and restarts it when the high water mark is reached again - this is
- * what this code here is responsible for. Please note that all workers may have been
- * terminated due to the inactivity timeout, thus we need to advise the pool that
- * we need at least one).
- */
- DBGOPRINT((obj_t*) pThis, "%d entries - passed high water mark in DA mode, send notify\n",
- getPhysicalQueueSize(pThis));
- qqueueAdviseMaxWorkers(pThis);
- } else {
- /* this is the case when we are currently not running in DA mode. So it is time
- * to turn it back on.
- */
- DBGOPRINT((obj_t*) pThis, "%d entries - passed high water mark for disk-assisted mode, initiating...\n",
- getPhysicalQueueSize(pThis));
- InitDA(pThis, QUEUE_MODE_ENQDEQ, MUTEX_ALREADY_LOCKED); /* initiate DA mode */
- }
-
-finalize_it:
- RETiRet;
-}
-
-
/* --------------- end code for disk-assisted queue modes -------------------- */
@@ -733,44 +634,6 @@ finalize_it:
}
-/* This method checks if we have a QIF file for the current queue (no matter of
- * queue mode). Returns RS_RET_OK if we have a QIF file or an error status otherwise.
- * rgerhards, 2008-01-15
- */
-static rsRetVal
-qqueueHaveQIF(qqueue_t *pThis)
-{
- DEFiRet;
- uchar pszQIFNam[MAXFNAME];
- size_t lenQIFNam;
- struct stat stat_buf;
-
- ISOBJ_TYPE_assert(pThis, qqueue);
-
- if(pThis->pszFilePrefix == NULL)
- ABORT_FINALIZE(RS_RET_NO_FILEPREFIX);
-
- /* Construct file name */
- lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), "%s/%s.qi",
- (char*) glbl.GetWorkDir(), (char*)pThis->pszFilePrefix);
-
- /* check if the file exists */
- if(stat((char*) pszQIFNam, &stat_buf) == -1) {
- if(errno == ENOENT) {
- DBGOPRINT((obj_t*) pThis, "no .qi file found\n");
- ABORT_FINALIZE(RS_RET_FILE_NOT_FOUND);
- } else {
- DBGOPRINT((obj_t*) pThis, "error %d trying to access .qi file\n", errno);
- ABORT_FINALIZE(RS_RET_IO_ERROR);
- }
- }
- /* If we reach this point, we have a .qi file */
-
-finalize_it:
- RETiRet;
-}
-
-
/* The method loads the persistent queue information.
* rgerhards, 2008-01-11
*/
@@ -1112,15 +975,17 @@ qqueueDeq(qqueue_t *pThis, void **ppUsr)
}
-/* Try to terminate queue worker threads within the regular shutdown interval.
- * Both the regular and DA queue (if it exists) is waited for, but on the same timeout.
- * After this function returns, the workers must either be finished or some force
- * to finish them must be applied.
- * This function also instructs the DA worker pool (if it exists) to terminate. This is done
- * in preparation of final queue shutdown.
- * rgerhards, 2009-05-27
+/* Try to shut down regular and DA queue workers, within the queue timeout
+ * period. That means processing continues as usual. This is the expected
+ * usual case, where during shutdown those messages remaining are being
+ * processed. At this point, it is acceptable that the queue can not be
+ * fully depleted, that case is handled in the next step. During this phase,
+ * we first shut down the main queue DA worker to prevent new data to arrive
+ * at the DA queue, and then we ask the regular workers of both the Regular
+ * and DA queue to try complete processing.
+ * rgerhards, 2009-10-14
*/
-static rsRetVal
+static inline rsRetVal
tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis)
{
struct timespec tTimeout;
@@ -1130,30 +995,26 @@ tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis)
ISOBJ_TYPE_assert(pThis, qqueue);
ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */
- d_pthread_mutex_lock(pThis->mut); /* some workers may be running in parallel! */
- if(getPhysicalQueueSize(pThis) > 0) {
- if(pThis->bRunsDA) {
- /* We may have waited on the low water mark. As it may have changed, we
- * see if we reactivate the worker.
- */
- wtpAdviseMaxWorkers(pThis->pWtpDA, 1);
- }
+ if(pThis->bIsDA) {
+ /* We need to lock the mutex, as otherwise we may have a race that prevents
+ * us from awaking the DA worker. */
+ d_pthread_mutex_lock(pThis->mut);
+
+ /* tell regular queue DA worker to stop shuffling messages to DA queue... */
+ pThis->pqDA->bEnqOnly = 1;
+ wtpSetState(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE);
+ wtpAdviseMaxWorkers(pThis->pWtpDA, 1);
+ DBGOPRINT((obj_t*) pThis, "awoke DA worker, told it to shut down.\n");
+
+ /* also tell the DA queue worker to shut down, so that it already knows... */
+ wtpSetState(pThis->pqDA->pWtpReg, wtpState_SHUTDOWN);
+ wtpAdviseMaxWorkers(pThis->pqDA->pWtpReg, 1); /* awake its lone worker */
+ DBGOPRINT((obj_t*) pThis, "awoke DA queue regular worker, told it to shut down when done.\n");
+
+ d_pthread_mutex_unlock(pThis->mut);
}
- d_pthread_mutex_unlock(pThis->mut);
- /* Now wait for the queue's workers to shut down. Note that we run into the code even if we just found
- * out there are no active workers - that doesn't matter: the wtp knows about that and so will
- * return immediately.
- * We do not yet care about the DA worker - that will be handled down later in the process.
- * Note that we must not request shutdown right now - that may introduce a race: if the regular queue
- * still runs DA assisted and the DA worker gets scheduled first, it will terminate itself (if the DA
- * queue happens to be empty at that instant). Then the regular worker enqueues messages, what will lead
- * to a restart of the worker. Of course, everything will continue to run, but in a bit sub-optimal way
- * (from a performance point of view). So we don't do anything right now. The DA queue will continue to
- * process messages and shutdown itself in any case if there is nothing to do. So we don't loose anything
- * by not requesting shutdown now.
- * rgerhards, 2008-01-25
- */
+
/* first calculate absolute timeout - we need the absolute value here, because we need to coordinate
* shutdown of both the regular and DA queue on *the same* timeout.
*/
@@ -1182,29 +1043,17 @@ tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis)
}
}
- if(pThis->pWtpDA != NULL) {
- /* we also instruct the DA worker pool to shutdown ASAP. If we need it for persisting
- * the queue, it is restarted at a later stage. We don't care here if a timeout happens.
- */
- DBGOPRINT((obj_t*) pThis, "trying shutdown of main queue DA worker pool\n");
- iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout);
- if(iRetLocal == RS_RET_TIMED_OUT) {
- DBGOPRINT((obj_t*) pThis, "shutdown timed out on main queue DA worker pool (this is OK)\n");
- } else {
- DBGOPRINT((obj_t*) pThis, "main queue DA worker pool shut down on first try.\n");
- }
- }
-
RETiRet;
}
/* Try to shut down regular and DA queue workers, within the action timeout
- * period. Note that the main queue DA worker is still unaffected (and may shuffle
- * data to the disk queue while we terminate the other workers). Not finishing
- * processing all messages is now OK (but they may be preserved later, depending
- * on bSaveOnShutdown setting).
- * rgerhards, 2009-05-27
+ * period. This aborts processing, but at the end of the current action, in
+ * a well-defined manner. During this phase, we terminate all three worker
+ * pools, including the regular queue DA worker if it not yet has terminated.
+ * Not finishing processing all messages is OK (and expected) at this stage
+ * (they may be preserved later, depending * on bSaveOnShutdown setting).
+ * rgerhards, 2009-10-14
*/
static rsRetVal
tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis)
@@ -1218,17 +1067,10 @@ RUNLOG_STR("trying to shutdown workers within Action Timeout");
ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */
/* instruct workers to finish ASAP, even if still work exists */
- /* note that we modify bEnqOnly directly, because going through the method would
- * startup some workers again. So this is OK here. -- rgerhards, 2009-05-28
- */
pThis->bEnqOnly = 1;
pThis->bShutdownImmediate = 1;
- /* need to set this so that the DA queue begins shutdown in parallel! */
- if(pThis->pqDA != NULL) {
- pThis->pqDA->bEnqOnly = 1;
- wtpSetState(pThis->pqDA->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE);
- }
+// TODO: make sure we have at minimum a 10ms timeout - workers deserve a chance...
/* now give the queue workers a last chance to gracefully shut down (based on action timeout setting) */
timeoutComp(&tTimeout, pThis->toActShutdown);
DBGOPRINT((obj_t*) pThis, "trying immediate shutdown of regular workers (if any)\n");
@@ -1252,20 +1094,14 @@ RUNLOG_STR("trying to shutdown workers within Action Timeout");
DBGOPRINT((obj_t*) pThis, "unexpected iRet state %d after trying immediate shutdown of the DA "
"queue in disk save mode. Continuing, but results are unpredictable\n", iRetLocal);
}
- }
- if(pThis->pWtpDA != NULL) {
- /* and now we need to check the DA worker itself (the one that shuffles data to the disk). This
- * is necessary because we may be in a situation where the DA queue regular worker and the
- * main queue worker stopped rather quickly. In this case, there is almost no time (and
- * probably no thread switch!) between the point where we instructed the main queue DA
- * worker to shutdown and this code location. In consequence, it may not even have
- * noticed that it should should down, less acutally done this. So we provide it with a
- * fixed 100ms timeout to try complete its work, what usually should be sufficient.
- * rgerhards, 2009-10-06
+ /* and now we need to terminate the DA worker itself. We always grant it a 100ms timeout,
+ * which should be sufficient and usually not be required (it is expected to have finished
+ * long before while we were processing the queue timeout in shutdown phase 1).
+ * rgerhards, 2009-10-14
*/
timeoutComp(&tTimeout, 100);
- DBGOPRINT((obj_t*) pThis, "last try for regular shutdown of main queue DA worker pool\n");
+ DBGOPRINT((obj_t*) pThis, "trying regular shutdown of main queue DA worker pool\n");
iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout);
if(iRetLocal == RS_RET_TIMED_OUT) {
DBGOPRINT((obj_t*) pThis, "shutdown timed out on main queue DA worker pool "
@@ -1280,14 +1116,13 @@ RUNLOG_STR("trying to shutdown workers within Action Timeout");
/* This function cancels all remaining regular workers for both the main and the DA
- * queue. The main queue's DA worker pool continues to run (if it exists and is active).
+ * queue.
* rgerhards, 2009-05-29
*/
static rsRetVal
cancelWorkers(qqueue_t *pThis)
{
rsRetVal iRetLocal;
- struct timespec tTimeout;
DEFiRet;
/* Now queue workers should have terminated. If not, we need to cancel them as we have applied
@@ -1309,30 +1144,12 @@ cancelWorkers(qqueue_t *pThis)
DBGOPRINT((obj_t*) pThis, "unexpected iRet state %d trying to cancel DA queue worker "
"threads, continuing, but results are unpredictable\n", iRetLocal);
}
- }
- /* finally, we cancel the main queue's DA worker pool, if it still is running. It may be
- * restarted later to persist the queue. But we stop it, because otherwise we get into
- * big trouble when resetting the logical dequeue pointer. This operation can only be
- * done when *no* worker is running. So time for a shutdown... -- rgerhards, 2009-05-28
- */
- if(pThis->pWtpDA != NULL) {
- /* but because of the potentially harsh consequences of cancelling, we try one last
- * (and short) time to shut down the DA worker in a normal fashion. The idea here
- * is that it may be willing to do so, but we did not yet have a task switch so
- * that it could not terminate but will do immediately when it gets time.
- * rgerhards, 2009-10-13
+ /* finally, we cancel the main queue's DA worker pool, if it still is running. It may be
+ * restarted later to persist the queue. But we stop it, because otherwise we get into
+ * big trouble when resetting the logical dequeue pointer. This operation can only be
+ * done when *no* worker is running. So time for a shutdown... -- rgerhards, 2009-05-28
*/
- timeoutComp(&tTimeout, 50);
- DBGOPRINT((obj_t*) pThis, "one ultimately last try for regular shutdown of main queue DA worker pool\n");
- iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout);
- if(iRetLocal == RS_RET_TIMED_OUT) {
- DBGOPRINT((obj_t*) pThis, "shutdown timed out on main queue DA worker pool "
- "- this is not good, need to cancel now...\n");
- } else {
- DBGOPRINT((obj_t*) pThis, "main queue DA worker pool shut down!\n");
- }
-
DBGOPRINT((obj_t*) pThis, "checking to see if main queue DA worker pool needs to be cancelled\n");
iRetLocal = wtpCancelAll(pThis->pWtpDA); /* returns immediately if all threads already have terminated */
}
@@ -1368,13 +1185,6 @@ ShutdownWorkers(qqueue_t *pThis)
DBGOPRINT((obj_t*) pThis, "initiating worker thread shutdown sequence\n");
- /* we reduce the low water mark in any case. This is not absolutely necessary, but
- * it is useful because we enable DA mode at several spots below and so we do not need
- * to think about the low water mark each time.
- */
- pThis->iHighWtrMrk = 1; /* if we do not do this, the DA queue will not stop! */
- pThis->iLowWtrMrk = 0;
-
CHKiRet(tryShutdownWorkersWithinQueueTimeout(pThis));
dbgprintf("YYY: physical queue size: %d\n", getPhysicalQueueSize(pThis));
@@ -1412,9 +1222,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
ASSERT(pConsumer != NULL);
ASSERT(iWorkerThreads >= 0);
- if((pThis = (qqueue_t *)calloc(1, sizeof(qqueue_t))) == NULL) {
- ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
- }
+ CHKmalloc(pThis = (qqueue_t *)calloc(1, sizeof(qqueue_t)));
/* we have an object, so let's fill the properties */
objConstructSetObjInfo(pThis);
@@ -1425,7 +1233,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
pThis->iFullDlyMrk = iMaxQueueSize - (iMaxQueueSize / 100) * 3; /* default 97% */
pThis->iLightDlyMrk = iMaxQueueSize - (iMaxQueueSize / 100) * 30; /* default 70% */
- pThis->lenSpoolDir = strlen((char*)pThis->pszSpoolDir);
+ pThis->lenSpoolDir = ustrlen(pThis->pszSpoolDir);
pThis->iMaxFileSize = 1024 * 1024; /* default is 1 MiB */
pThis->iQueueSize = 0;
pThis->nLogDeq = 0;
@@ -1814,7 +1622,7 @@ DequeueForConsumer(qqueue_t *pThis, wti_t *pWti)
ISOBJ_TYPE_assert(pThis, qqueue);
ISOBJ_TYPE_assert(pWti, wti);
-dbgprintf("YYY: deqeueu for consumer");
+dbgprintf("YYY: dequeue for consumer\n");
CHKiRet(DequeueConsumable(pThis, pWti));
if(pWti->batch.nElem == 0)
@@ -1908,11 +1716,13 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti)
d_pthread_mutex_unlock(pThis->mut);
/* iterate over returned results and enqueue them in DA queue */
- for(i = 0 ; i < pWti->batch.nElem && !pThis->bShutdownImmediate ; i++) {
+ //for(i = 0 ; i < pWti->batch.nElem && !pThis->bShutdownImmediate ; i++) {
+ for(i = 0 ; i < pWti->batch.nElem ; i++) {
/* TODO: we must add a generic "addRef" mechanism, because the disk queue enqueue destructs
* the message. So far, we simply assume we always have msg_t, what currently is always the case.
* rgerhards, 2009-05-28
*/
+dbgprintf("DA consumer pushes msg '%s'\n", ((msg_t*)(pWti->batch.pElem[i].pUsrp))->pszRawMsg);
CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, (obj_t*)MsgAddRef((msg_t*)(pWti->batch.pElem[i].pUsrp))));
}
@@ -1941,6 +1751,7 @@ qqueueChkStopWrkrDA(qqueue_t *pThis)
if(pThis->bEnqOnly) {
iRet = RS_RET_TERMINATE_WHEN_IDLE;
+#if 0
} else {
if(pThis->bRunsDA) {
ASSERT(pThis->pqDA != NULL);
@@ -1961,6 +1772,7 @@ RUNLOG_STR("XXX: done re-start reg worker");
// experimental iRet = RS_RET_TERMINATE_NOW;
;
}
+#endif
}
RETiRet;
@@ -1997,60 +1809,12 @@ GetDeqBatchSize(qqueue_t *pThis, int *pVal)
DEFiRet;
assert(pVal != NULL);
*pVal = pThis->iDeqBatchSize;
-if(pThis->pqParent != NULL)
+if(pThis->pqParent != NULL) // TODO: check why we actually do this!
*pVal = 16;
RETiRet;
}
-/* must only be called when the queue mutex is locked, else results
- * are not stable! DA worker version (pThis *is* the *main* queue, not DA!)
- */
-static int
-qqueueIsIdleDA(qqueue_t *pThis)
-{
- return(getPhysicalQueueSize(pThis) <= pThis->iLowWtrMrk);
-}
-/* must only be called when the queue mutex is locked, else results
- * are not stable! Regular worker version.
- */
-static int
-IsIdleReg(qqueue_t *pThis)
-{
- return(getPhysicalQueueSize(pThis) == 0);
-}
-
-
-/* This function is called when a worker thread for the regular queue is shut down.
- * If we are the primary queue, this is not really interesting to us. If, however,
- * we are the DA (child) queue, that means the DA queue is empty. In that case, we
- * need to signal the parent queue's DA worker, so that it can terminate DA mode.
- * rgerhards, 2008-01-26
- * rgerhards, 2008-02-27: HOWEVER, in a shutdown condition, it may be that the parent's worker thread pool
- * has already been terminated and destructed. This *is* a legal condition and happens
- * from time to time in practice. So we need to signal only if there still is a
- * parent DA worker queue. Please keep in mind that the the parent's DA worker
- * pool is DIFFERENT from our (DA queue) regular worker pool. So when the parent's
- * pWtpDA is destructed, there can still be some of our (DAq/wtp) threads be running.
- * I am telling this, because I, too, always get confused by those...
- */
-static rsRetVal
-RegOnWrkrShutdown(qqueue_t *pThis)
-{
- DEFiRet;
-
- ISOBJ_TYPE_assert(pThis, qqueue);
-
- if(pThis->pqParent != NULL) {
- if(pThis->pqParent->pWtpDA != NULL) { /* see comment in function header from 2008-02-27 */
- wtpAdviseMaxWorkers(pThis->pqParent->pWtpDA, 1); /* reactivate DA worker (always 1) */
- }
- }
-
- RETiRet;
-}
-
-
/* start up the queue - it must have been constructed and parameters defined
* before.
*/
@@ -2058,8 +1822,6 @@ rsRetVal
qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
{
DEFiRet;
- rsRetVal iRetLocal;
- int bInitialized = 0; /* is queue already initialized? */
uchar pszBuf[64];
size_t lenBuf;
@@ -2101,8 +1863,7 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
if(pThis->qType == QUEUETYPE_DIRECT)
FINALIZE; /* with direct queues, we are already finished... */
- /* create worker thread pools for regular operation. The DA pool is created on an as-needed
- * basis, which potentially means never under most circumstances.
+ /* create worker thread pools for regular and DA operation.
*/
lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s:Reg", obj.GetName((obj_t*) pThis));
CHKiRet(wtpConstruct (&pThis->pWtpReg));
@@ -2110,10 +1871,8 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
CHKiRet(wtpSetpfRateLimiter (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RateLimiter));
CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) ChkStopWrkrReg));
CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize));
- CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, wtp_t*)) IsIdleReg));
CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti)) ConsumerReg));
CHKiRet(wtpSetpfObjProcessed (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, wti_t *pWti)) batchProcessed));
- CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RegOnWrkrShutdown));
CHKiRet(wtpSetpmutUsr (pThis->pWtpReg, pThis->mut));
CHKiRet(wtpSetpcondBusy (pThis->pWtpReg, &pThis->notEmpty));
CHKiRet(wtpSetiNumWorkerThreads (pThis->pWtpReg, pThis->iNumWorkerThreads));
@@ -2121,27 +1880,11 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
CHKiRet(wtpSetpUsr (pThis->pWtpReg, pThis));
CHKiRet(wtpConstructFinalize (pThis->pWtpReg));
- /* initialize worker thread instances */
- if(pThis->bIsDA) {
- /* If we are disk-assisted, we need to check if there is a QIF file
- * which we need to load. -- rgerhards, 2008-01-15
- */
- iRetLocal = qqueueHaveQIF(pThis);
- if(iRetLocal == RS_RET_OK) {
- DBGOPRINT((obj_t*) pThis, "on-disk queue present, needs to be reloaded\n");
- InitDA(pThis, QUEUE_MODE_ENQDEQ, LOCK_MUTEX); /* initiate DA mode */
- bInitialized = 1; /* we are done */
- } else {
- /* TODO: use logerror? -- rgerhards, 2008-01-16 */
- DBGOPRINT((obj_t*) pThis, "error %d trying to access on-disk queue files, starting without them. "
- "Some data may be lost\n", iRetLocal);
- }
- }
+ /* set up DA system if we have a disk-assisted queue */
+ if(pThis->bIsDA)
+ InitDA(pThis, QUEUE_MODE_ENQDEQ, LOCK_MUTEX); /* initiate DA mode */
- if(Debug && !bInitialized) {
- DBGOPRINT((obj_t*) pThis, "queue starts up without (loading) any DA disk state (this is normal for the DA "
- "queue itself!)\n");
- }
+ DBGOPRINT((obj_t*) pThis, "queue finished initialization\n");
/* if the queue already contains data, we need to start the correct number of worker threads. This can be
* the case when a disk queue has been loaded. If we did not start it here, it would never start.
@@ -2284,10 +2027,16 @@ DoSaveOnShutdown(qqueue_t *pThis)
ISOBJ_TYPE_assert(pThis, qqueue);
- InitDA(pThis, QUEUE_MODE_ENQONLY, LOCK_MUTEX); /* switch to DA mode */
-dbgprintf("after InitDA, queue log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
- /* make sure we do not timeout before we are done */
- DBGOPRINT((obj_t*) pThis, "bSaveOnShutdown configured, infinite timeout set\n");
+ /* we reduce the low water mark, otherwise the DA worker would terminate when
+ * it is reached.
+ */
+ DBGOPRINT((obj_t*) pThis, "bSaveOnShutdown set, restarting DA worker...\n");
+ pThis->bShutdownImmediate = 0; /* would termiante the DA worker! */
+ pThis->iLowWtrMrk = 0;
+ wtpSetState(pThis->pWtpDA, wtpState_SHUTDOWN); /* shutdown worker (only) when done (was _IMMEDIATE!) */
+ wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* restart DA worker */
+
+ DBGOPRINT((obj_t*) pThis, "waiting for DA worker to terminate...\n");
timeoutComp(&tTimeout, QUEUE_TIMEOUT_ETERNAL);
/* and run the primary queue's DA worker to drain the queue */
iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN, &tTimeout);
@@ -2442,10 +2191,6 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr)
*/
CHKiRet(qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pThis->bRunsDA, pUsr));
- /* then check if we need to add an assistance disk queue */
- if(pThis->bIsDA)
- CHKiRet(ChkStrtDA(pThis));
-
/* handle flow control
* There are two different flow control mechanisms: basic and advanced flow control.
* Basic flow control has always been implemented and protects the queue structures
@@ -2489,6 +2234,7 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr)
&& pThis->tVars.disk.sizeOnDisk > pThis->sizeOnDiskMax)) {
DBGOPRINT((obj_t*) pThis, "enqueueMsg: queue FULL - waiting to drain.\n");
timeoutComp(&t, pThis->toEnq);
+// TODO : handle enqOnly => discard!
if(pthread_cond_timedwait(&pThis->notFull, pThis->mut, &t) != 0) {
DBGOPRINT((obj_t*) pThis, "enqueueMsg: cond timeout, dropping message!\n");
objDestruct(pUsr);
@@ -2527,7 +2273,6 @@ qqueueMultiEnqObj(qqueue_t *pThis, multi_submit_t *pMultiSub)
}
for(i = 0 ; i < pMultiSub->nElem ; ++i) {
-dbgprintf("queueMultiEnq: %d\n", i);
CHKiRet(doEnqSingleObj(pThis, pMultiSub->ppMsgs[i]->flowCtlType, (void*)pMultiSub->ppMsgs[i]));
}
@@ -2582,58 +2327,6 @@ dbgprintf("YYY: call advise with mutex %p locked \n", pThis->mut);
}
-/* set queue mode to enqueue only or not
- * There is one subtle issue: this method may be called during queue
- * construction or while it is running. In the former case, the queue
- * mutex does not yet exist (it is NULL), while in the later case it
- * must be locked. The function detects the state and operates as
- * required.
- * rgerhards, 2008-01-16
- */
-static rsRetVal
-SetEnqOnly(qqueue_t *pThis, int bEnqOnly, int bLockMutex)
-{
- DEFiRet;
- DEFVARS_mutexProtection;
-
- ISOBJ_TYPE_assert(pThis, qqueue);
-
- /* for simplicity, we do one big mutex lock. This method is extremely seldom
- * called, so that doesn't matter... -- rgerhards, 2008-01-16
- */
- if(pThis->mut != NULL) {
- BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, bLockMutex);
- }
-
- if(bEnqOnly == pThis->bEnqOnly)
- FINALIZE; /* no change, nothing to do */
-
- if(pThis->bQueueStarted) {
- /* we need to adjust queue operation only if we are not during initial param setup */
- if(bEnqOnly == 1) {
- /* switch to enqueue-only mode */
- /* this means we need to terminate all workers - that's it... */
- DBGOPRINT((obj_t*) pThis, "switching to enqueue-only mode, terminating all worker threads\n");
- if(pThis->pWtpReg != NULL)
- wtpWakeupAllWrkr(pThis->pWtpReg);
- if(pThis->pWtpDA != NULL)
- wtpWakeupAllWrkr(pThis->pWtpDA);
- } else {
- /* switch back to regular mode */
- ABORT_FINALIZE(RS_RET_NOT_IMPLEMENTED); /* we don't need this so far... */
- }
- }
-
- pThis->bEnqOnly = bEnqOnly;
-
-finalize_it:
- if(pThis->mut != NULL) {
- END_MTX_PROTECTED_OPERATIONS(pThis->mut);
- }
- RETiRet;
-}
-
-
/* some simple object access methods */
DEFpropSetMeth(qqueue, bSyncQueueFiles, int)
DEFpropSetMeth(qqueue, iPersistUpdCnt, int)
diff --git a/runtime/wti.c b/runtime/wti.c
index c3ab0aba..24988cbe 100644
--- a/runtime/wti.c
+++ b/runtime/wti.c
@@ -189,6 +189,7 @@ finalize_it:
* the cancel cleanup handler (and have been cancelled).
* rgerhards, 2008-01-16
*/
+// TODO: REMOVE THIS FUNCTION, CURRENTLY ONLY PRESENT TO PROVIDE DEBUG OUTPUT -- rgerhards, 2009-10-14
static void
wtiWorkerCancelCleanup(void *arg)
{
@@ -202,9 +203,6 @@ wtiWorkerCancelCleanup(void *arg)
DBGPRINTF("%s: cancelation cleanup handler called.\n", wtiGetDbgHdr(pThis));
- /* call user supplied handler */
- pWtp->pfOnWorkerCancel(pThis->pWtp->pUsr, pThis->batch.pElem[0].pUsrp);
-
ENDfunc
}
@@ -222,8 +220,6 @@ doIdleProcessing(wti_t *pThis, wtp_t *pWtp, int *pbInactivityTOOccured)
BEGINfunc
DBGPRINTF("%s: worker IDLE, waiting for work.\n", wtiGetDbgHdr(pThis));
- pWtp->pfOnIdle(pWtp->pUsr, MUTEX_ALREADY_LOCKED);
-
if(pThis->bAlwaysRunning) {
/* never shut down any started worker */
dbgprintf("YYY/ZZZ: wti Idle wait cond busy, mutex %p\n", pWtp->pmutUsr);
@@ -235,6 +231,7 @@ dbgprintf("YYY/ZZZ: wti Idle wait cond busy, mutex %p\n", pWtp->pmutUsr);
*pbInactivityTOOccured = 1; /* indicate we had a timeout */
}
}
+ dbgoprint((obj_t*) pThis, "worker awoke from idle processing\n");
ENDfunc
}
@@ -249,7 +246,6 @@ wtiWorker(wti_t *pThis)
int bInactivityTOOccured = 0;
rsRetVal localRet;
rsRetVal terminateRet;
- int iCancelStateSave;
DEFiRet;
ISOBJ_TYPE_assert(pThis, wti);
@@ -259,23 +255,18 @@ wtiWorker(wti_t *pThis)
dbgSetThrdName(pThis->pszDbgHdr);
pthread_cleanup_push(wtiWorkerCancelCleanup, pThis);
- pWtp->pfOnWorkerStartup(pWtp->pUsr);
-
/* now we have our identity, on to real processing */
while(1) { /* loop will be broken below - need to do mutex locks */
if(pWtp->pfRateLimiter != NULL) { /* call rate-limiter, if defined */
pWtp->pfRateLimiter(pWtp->pUsr);
}
-dbgprintf("YYY/ZZZ: pre lock mutex\n");
d_pthread_mutex_lock(pWtp->pmutUsr);
-dbgprintf("YYY/ZZZ: wti locks mutex %p\n", pWtp->pmutUsr);
/* first check if we are in shutdown process (but evaluate a bit later) */
terminateRet = wtpChkStopWrkr(pWtp, MUTEX_ALREADY_LOCKED);
-RUNLOG;
+RUNLOG_VAR("%d", terminateRet);
if(terminateRet == RS_RET_TERMINATE_NOW) {
-RUNLOG;
/* we now need to free the old batch */
localRet = pWtp->pfObjProcessed(pWtp->pUsr, pThis);
dbgoprint((obj_t*) pThis, "terminating worker because of TERMINATE_NOW mode, del iRet %d\n",
@@ -283,7 +274,6 @@ RUNLOG;
d_pthread_mutex_unlock(pWtp->pmutUsr);
break;
}
-RUNLOG;
/* try to execute and process whatever we have */
/* Note that this function releases and re-aquires the mutex. The returned
@@ -291,41 +281,23 @@ RUNLOG;
*/
localRet = pWtp->pfDoWork(pWtp->pUsr, pThis);
-dbgprintf("YYY/ZZZ: wti loop locked mutex %p again\n", pWtp->pmutUsr);
if(localRet == RS_RET_IDLE) {
-RUNLOG;
if(terminateRet == RS_RET_TERMINATE_WHEN_IDLE || bInactivityTOOccured) {
d_pthread_mutex_unlock(pWtp->pmutUsr);
break; /* end of loop */
}
-RUNLOG;
doIdleProcessing(pThis, pWtp, &bInactivityTOOccured);
-RUNLOG;
d_pthread_mutex_unlock(pWtp->pmutUsr);
-RUNLOG;
continue; /* request next iteration */
}
-RUNLOG;
d_pthread_mutex_unlock(pWtp->pmutUsr);
bInactivityTOOccured = 0; /* reset for next run */
}
/* indicate termination */
-RUNLOG;
- d_pthread_mutex_lock(pWtp->pmutUsr);
-RUNLOG;
- pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
-RUNLOG;
pthread_cleanup_pop(0); /* remove cleanup handler */
-RUNLOG;
- pWtp->pfOnWorkerShutdown(pWtp->pUsr);
-RUNLOG;
- pthread_setcancelstate(iCancelStateSave, NULL);
-RUNLOG;
- d_pthread_mutex_unlock(pWtp->pmutUsr);
-RUNLOG;
RETiRet;
}
diff --git a/runtime/wtp.c b/runtime/wtp.c
index 93234819..3e76bb56 100644
--- a/runtime/wtp.c
+++ b/runtime/wtp.c
@@ -94,13 +94,8 @@ BEGINobjConstruct(wtp) /* be sure to specify the object type also in END macro!
/* set all function pointers to "not implemented" dummy so that we can safely call them */
pThis->pfChkStopWrkr = NotImplementedDummy;
pThis->pfGetDeqBatchSize = NotImplementedDummy;
- pThis->pfIsIdle = NotImplementedDummy;
pThis->pfDoWork = NotImplementedDummy;
pThis->pfObjProcessed = NotImplementedDummy;
- pThis->pfOnIdle = NotImplementedDummy;
- pThis->pfOnWorkerCancel = NotImplementedDummy;
- pThis->pfOnWorkerStartup = NotImplementedDummy;
- pThis->pfOnWorkerShutdown = NotImplementedDummy;
ENDobjConstruct(wtp)
@@ -160,22 +155,6 @@ CODESTARTobjDestruct(wtp)
ENDobjDestruct(wtp)
-/* wake up all worker threads.
- * rgerhards, 2008-01-16
- */
-rsRetVal
-wtpWakeupAllWrkr(wtp_t *pThis)
-{
- DEFiRet;
-
- ISOBJ_TYPE_assert(pThis, wtp);
- //d_pthread_mutex_lock(pThis->pmutUsr);
- pthread_cond_broadcast(pThis->pcondBusy);
- //d_pthread_mutex_unlock(pThis->pmutUsr);
- RETiRet;
-}
-
-
/* Sent a specific state for the worker thread pool. -- rgerhards, 2008-01-21
* We do not need to do atomic instructions as set operations are only
* called when terminating the pool, and then in strict sequence. So we
@@ -211,8 +190,10 @@ wtpChkStopWrkr(wtp_t *pThis, int bLockUsrMutex)
wtpState = ATOMIC_FETCH_32BIT(pThis->wtpState);
if(wtpState == wtpState_SHUTDOWN_IMMEDIATE) {
+RUNLOG_STR("WWW: ChkStopWrkr returns TERMINATE_NOW");
ABORT_FINALIZE(RS_RET_TERMINATE_NOW);
} else if(wtpState == wtpState_SHUTDOWN) {
+RUNLOG_STR("WWW: ChkStopWrkr returns TERMINATE_WHEN_IDLE");
ABORT_FINALIZE(RS_RET_TERMINATE_WHEN_IDLE);
}
@@ -241,8 +222,11 @@ wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout
ISOBJ_TYPE_assert(pThis, wtp);
+ /* lock mutex to prevent races (may otherwise happen during idle processing and such...) */
+ d_pthread_mutex_lock(pThis->pmutUsr);
wtpSetState(pThis, tShutdownCmd);
- wtpWakeupAllWrkr(pThis);
+ pthread_cond_broadcast(pThis->pcondBusy); /* wake up all workers */
+ d_pthread_mutex_unlock(pThis->pmutUsr);
/* wait for worker thread termination */
d_pthread_mutex_lock(&pThis->mutWtp);
@@ -430,7 +414,7 @@ dbgprintf("wtpAdviseMaxWorkers, nmax: %d, curr %d, missing %d\n", nMaxWrkrTmp, p
CHKiRet(wtpStartWrkr(pThis));
}
} else {
-dbgprintf("YYY: adivse signal cond busy");
+dbgprintf("YYY: wtpAdviseMaxWorkers, sufficient workers, just doing adivse signal cond busy\n");
pthread_cond_signal(pThis->pcondBusy);
}
@@ -450,13 +434,8 @@ DEFpropSetMethPTR(wtp, pcondBusy, pthread_cond_t)
DEFpropSetMethFP(wtp, pfChkStopWrkr, rsRetVal(*pVal)(void*, int))
DEFpropSetMethFP(wtp, pfRateLimiter, rsRetVal(*pVal)(void*))
DEFpropSetMethFP(wtp, pfGetDeqBatchSize, rsRetVal(*pVal)(void*, int*))
-DEFpropSetMethFP(wtp, pfIsIdle, rsRetVal(*pVal)(void*, wtp_t*))
DEFpropSetMethFP(wtp, pfDoWork, rsRetVal(*pVal)(void*, void*))
DEFpropSetMethFP(wtp, pfObjProcessed, rsRetVal(*pVal)(void*, wti_t*))
-DEFpropSetMethFP(wtp, pfOnIdle, rsRetVal(*pVal)(void*, int))
-DEFpropSetMethFP(wtp, pfOnWorkerCancel, rsRetVal(*pVal)(void*, void*))
-DEFpropSetMethFP(wtp, pfOnWorkerStartup, rsRetVal(*pVal)(void*))
-DEFpropSetMethFP(wtp, pfOnWorkerShutdown, rsRetVal(*pVal)(void*))
/* set the debug header message
diff --git a/runtime/wtp.h b/runtime/wtp.h
index 0505b91c..05c02a8c 100644
--- a/runtime/wtp.h
+++ b/runtime/wtp.h
@@ -62,12 +62,7 @@ struct wtp_s {
rsRetVal (*pfGetDeqBatchSize)(void *pUsr, int*); /* obtains max dequeue count from queue config */
rsRetVal (*pfObjProcessed)(void *pUsr, wti_t *pWti); /* indicate user object is processed */
rsRetVal (*pfRateLimiter)(void *pUsr);
- rsRetVal (*pfIsIdle)(void *pUsr, wtp_t *pWtp);
rsRetVal (*pfDoWork)(void *pUsr, void *pWti);
- rsRetVal (*pfOnIdle)(void *pUsr, int);
- rsRetVal (*pfOnWorkerCancel)(void *pUsr, void*pWti);
- rsRetVal (*pfOnWorkerStartup)(void *pUsr);
- rsRetVal (*pfOnWorkerShutdown)(void *pUsr);
/* end user objects */
uchar *pszDbgHdr; /* header string for debug messages */
};
@@ -91,13 +86,8 @@ PROTOTYPEObjClassInit(wtp);
PROTOTYPEpropSetMethFP(wtp, pfChkStopWrkr, rsRetVal(*pVal)(void*, int));
PROTOTYPEpropSetMethFP(wtp, pfRateLimiter, rsRetVal(*pVal)(void*));
PROTOTYPEpropSetMethFP(wtp, pfGetDeqBatchSize, rsRetVal(*pVal)(void*, int*));
-PROTOTYPEpropSetMethFP(wtp, pfIsIdle, rsRetVal(*pVal)(void*, wtp_t*));
PROTOTYPEpropSetMethFP(wtp, pfDoWork, rsRetVal(*pVal)(void*, void*));
PROTOTYPEpropSetMethFP(wtp, pfObjProcessed, rsRetVal(*pVal)(void*, wti_t*));
-PROTOTYPEpropSetMethFP(wtp, pfOnIdle, rsRetVal(*pVal)(void*, int));
-PROTOTYPEpropSetMethFP(wtp, pfOnWorkerCancel, rsRetVal(*pVal)(void*,void*));
-PROTOTYPEpropSetMethFP(wtp, pfOnWorkerStartup, rsRetVal(*pVal)(void*));
-PROTOTYPEpropSetMethFP(wtp, pfOnWorkerShutdown, rsRetVal(*pVal)(void*));
PROTOTYPEpropSetMeth(wtp, toWrkShutdown, long);
PROTOTYPEpropSetMeth(wtp, wtpState, wtpState_t);
PROTOTYPEpropSetMeth(wtp, iMaxWorkerThreads, int);
diff --git a/tests/Makefile.am b/tests/Makefile.am
index 7f12f633..c31e9eaa 100644
--- a/tests/Makefile.am
+++ b/tests/Makefile.am
@@ -7,7 +7,7 @@ TESTS = $(TESTRUNS) cfg.sh \
da-mainmsg-q.sh \
validation-run.sh \
imtcp-multiport.sh \
- #daqueue-persist.sh \
+ daqueue-persist.sh \
diskqueue.sh \
diskqueue-fsync.sh \
manytcp.sh \
diff --git a/tests/arrayqueue.sh b/tests/arrayqueue.sh
index 58fd24ae..baf303f3 100755
--- a/tests/arrayqueue.sh
+++ b/tests/arrayqueue.sh
@@ -1,7 +1,7 @@
# Test for fixedArray queue mode
# added 2009-05-20 by rgerhards
# This file is part of the rsyslog project, released under GPLv3
-echo testing queue fixedArray queue mode
+echo \[arrayqueue.sh\]: testing queue fixedArray queue mode
source $srcdir/diag.sh init
source $srcdir/diag.sh startup arrayqueue.conf
@@ -13,5 +13,5 @@ kill `cat rsyslog.pid`
# now wait until rsyslog.pid is gone (and the process finished)
source $srcdir/diag.sh wait-shutdown
-source $srcdir/diag.sh seq-check 0 39999
+source $srcdir/diag.sh seq-check 39999
source $srcdir/diag.sh exit
diff --git a/tests/daqueue-persist-drvr.sh b/tests/daqueue-persist-drvr.sh
index 69db73ba..f5937541 100755
--- a/tests/daqueue-persist-drvr.sh
+++ b/tests/daqueue-persist-drvr.sh
@@ -12,6 +12,9 @@ source $srcdir/diag.sh init
echo \$MainMsgQueueType $1 > work-queuemode.conf
echo "*.* :omtesting:sleep 0 1000" > work-delay.conf
+#export RSYSLOG_DEBUG="debug nostdout noprintmutexaction"
+#export RSYSLOG_DEBUGLOG="log0"
+
# inject 10000 msgs, so that DO hit the high watermark
source $srcdir/diag.sh startup queue-persist.conf
source $srcdir/diag.sh injectmsg 0 10000
@@ -23,8 +26,8 @@ echo "Enter phase 2, rsyslogd restart"
#exit
-export RSYSLOG_DEBUG="debug nostdout printmutexaction"
-export RSYSLOG_DEBUGLOG="log"
+#export RSYSLOG_DEBUG="debug nostdout noprintmutexaction"
+#export RSYSLOG_DEBUGLOG="log"
#valgrind="valgrind --tool=helgrind --log-fd=1"
# restart engine and have rest processed
@@ -33,5 +36,5 @@ echo "#" > work-delay.conf
source $srcdir/diag.sh startup queue-persist.conf
source $srcdir/diag.sh shutdown-when-empty # shut down rsyslogd when done processing messages
$srcdir/diag.sh wait-shutdown
-source $srcdir/diag.sh seq-check 0 4999
+source $srcdir/diag.sh seq-check 0 99999
source $srcdir/diag.sh exit
diff --git a/tests/inputname.sh b/tests/inputname.sh
index e1a58517..71f11c1e 100755
--- a/tests/inputname.sh
+++ b/tests/inputname.sh
@@ -1,4 +1,4 @@
-echo testing $InputTCPServerInputName directive
+echo \[inputname.sh\]: testing $InputTCPServerInputName directive
$srcdir/killrsyslog.sh # kill rsyslogd if it runs for some reason
echo port 12514
diff --git a/tools/syslogd.c b/tools/syslogd.c
index 36a46175..89b90dec 100644
--- a/tools/syslogd.c
+++ b/tools/syslogd.c
@@ -158,6 +158,7 @@ DEFobjCurrIf(net) /* TODO: make go away! */
/* forward definitions */
static rsRetVal GlobalClassExit(void);
+static void logmsg(msg_t *pMsg, int flags);
#ifndef _PATH_LOGCONF
@@ -400,13 +401,16 @@ static int usage(void)
*/
/* return back the approximate current number of messages in the main message queue
+ * This number includes the messages that reside in an associated DA queue (if
+ * it exists) -- rgerhards, 2009-10-14
*/
rsRetVal
diagGetMainMsgQSize(int *piSize)
{
DEFiRet;
assert(piSize != NULL);
- *piSize = pMsgQueue->iQueueSize;
+ *piSize = (pMsgQueue->bIsDA) ? pMsgQueue->pqDA->iQueueSize : 0;
+ *piSize += pMsgQueue->iQueueSize;
RETiRet;
}
@@ -584,6 +588,7 @@ logmsgInternal(int iErr, int pri, uchar *msg, int flags)
MsgSetHOSTNAME(pMsg, glbl.GetLocalHostName(), ustrlen(glbl.GetLocalHostName()));
MsgSetRcvFrom(pMsg, glbl.GetLocalHostNameProp());
MsgSetRcvFromIP(pMsg, pLocalHostIP);
+ MsgSetMSGoffs(pMsg, 0);
/* check if we have an error code associated and, if so,
* adjust the tag. -- rgerhards, 2008-06-27
*/
@@ -1079,7 +1084,7 @@ multiSubmitMsg(multi_submit_t *pMultiSub)
* potential for misinterpretation, which we simply can not solve under the
* circumstances given.
*/
-void
+static void
logmsg(msg_t *pMsg, int flags)
{
char *msg;
diff --git a/tools/syslogd.h b/tools/syslogd.h
index 3dfdbe2b..c3b99f9d 100644
--- a/tools/syslogd.h
+++ b/tools/syslogd.h
@@ -32,7 +32,6 @@
/* the following prototypes should go away once we have an input
* module interface -- rgerhards, 2007-12-12
*/
-void logmsg(msg_t *pMsg, int flags);
extern int NoHops;
extern int send_to_all;
extern int Debug;