summaryrefslogtreecommitdiffstats
path: root/runtime
diff options
context:
space:
mode:
author Rainer Gerhards <rgerhards@adiscon.com> 2009-10-14 11:01:21 +0200
committer Rainer Gerhards <rgerhards@adiscon.com> 2009-10-14 11:01:21 +0200
commit c5408da3d8f17691fb91282d031757ed041fec55 (patch)
tree 4f932e801fac21ddc03616168106ac65411c340b /runtime
parent 4d70c9b3e5e480d6dfa1c94506270f1f78e8ef32 (diff)
download rsyslog-c5408da3d8f17691fb91282d031757ed041fec55.tar.gz
rsyslog-c5408da3d8f17691fb91282d031757ed041fec55.tar.xz
rsyslog-c5408da3d8f17691fb91282d031757ed041fec55.zip
new queue engine - initial commit (probably not 100% working!)
Simplified and thus sped up the queue engine; also fixed some potential race conditions (in very unusual shutdown conditions) along the way. The threading model has changed seriously, so there may be some regressions. NOTE: the code passed basic tests, but there is still more work and testing to be done. This commit should be treated with care.
Diffstat (limited to 'runtime')
-rw-r--r-- runtime/obj.c 8
-rw-r--r-- runtime/queue.c 509
-rw-r--r-- runtime/wti.c 34
-rw-r--r-- runtime/wtp.c 35
-rw-r--r-- runtime/wtp.h 10
5 files changed, 115 insertions, 481 deletions
diff --git a/runtime/obj.c b/runtime/obj.c
index aebea332..3692b957 100644
--- a/runtime/obj.c
+++ b/runtime/obj.c
@@ -1129,7 +1129,7 @@ UseObj(char *srcFile, uchar *pObjName, uchar *pObjFile, interface_t *pIf)
/* DEV debug only: dbgprintf("source file %s requests object '%s', ifIsLoaded %d\n", srcFile, pObjName, pIf->ifIsLoaded); */
- d_pthread_mutex_lock(&mutObjGlobalOp);
+ pthread_mutex_lock(&mutObjGlobalOp);
if(pIf->ifIsLoaded == 1) {
ABORT_FINALIZE(RS_RET_OK); /* we are already set */
@@ -1170,7 +1170,7 @@ UseObj(char *srcFile, uchar *pObjName, uchar *pObjFile, interface_t *pIf)
pIf->ifIsLoaded = 1; /* we are happy */
finalize_it:
- d_pthread_mutex_unlock(&mutObjGlobalOp);
+ pthread_mutex_unlock(&mutObjGlobalOp);
if(pStr != NULL)
rsCStrDestruct(&pStr);
@@ -1193,7 +1193,7 @@ ReleaseObj(char *srcFile, uchar *pObjName, uchar *pObjFile, interface_t *pIf)
/* dev debug only dbgprintf("source file %s releasing object '%s', ifIsLoaded %d\n", srcFile, pObjName, pIf->ifIsLoaded); */
- d_pthread_mutex_lock(&mutObjGlobalOp);
+ pthread_mutex_lock(&mutObjGlobalOp);
if(pObjFile == NULL)
FINALIZE; /* if it is not a lodable module, we do not need to do anything... */
@@ -1214,7 +1214,7 @@ ReleaseObj(char *srcFile, uchar *pObjName, uchar *pObjFile, interface_t *pIf)
pIf->ifIsLoaded = 0; /* indicated "no longer valid" */
finalize_it:
- d_pthread_mutex_unlock(&mutObjGlobalOp);
+ pthread_mutex_unlock(&mutObjGlobalOp);
if(pStr != NULL)
rsCStrDestruct(&pStr);
diff --git a/runtime/queue.c b/runtime/queue.c
index 00bbd15f..dacf1f13 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -55,6 +55,7 @@
#include "wti.h"
#include "msg.h"
#include "atomic.h"
+#include "unicode-helper.h"
#include "msg.h" /* TODO: remove once we remove MsgAddRef() call */
#ifdef OS_SOLARIS
@@ -68,11 +69,9 @@ DEFobjCurrIf(strm)
/* forward-definitions */
static rsRetVal qqueueChkPersist(qqueue_t *pThis, int nUpdates);
-static rsRetVal SetEnqOnly(qqueue_t *pThis, int bEnqOnly, int bLockMutex);
static rsRetVal RateLimiter(qqueue_t *pThis);
static int qqueueChkStopWrkrDA(qqueue_t *pThis);
static rsRetVal GetDeqBatchSize(qqueue_t *pThis, int *pVal);
-static int qqueueIsIdleDA(qqueue_t *pThis);
static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti);
static rsRetVal batchProcessed(qqueue_t *pThis, wti_t *pWti);
@@ -228,7 +227,8 @@ static inline void queueDrain(qqueue_t *pThis)
* this point in time. The mutex must be locked when
* ths function is called. -- rgerhards, 2008-01-25
*/
-static inline rsRetVal qqueueAdviseMaxWorkers(qqueue_t *pThis)
+static inline rsRetVal
+qqueueAdviseMaxWorkers(qqueue_t *pThis)
{
DEFiRet;
int iMaxWorkers;
@@ -236,48 +236,20 @@ static inline rsRetVal qqueueAdviseMaxWorkers(qqueue_t *pThis)
ISOBJ_TYPE_assert(pThis, qqueue);
if(!pThis->bEnqOnly) {
- if(pThis->bRunsDA) {
- /* if we have not yet reached the high water mark, there is no need to start a
- * worker. -- rgerhards, 2008-01-26
- */
- if(getLogicalQueueSize(pThis) >= pThis->iHighWtrMrk || pThis->bQueueStarted == 0) {
- wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* disk queues have always one worker */
- }
- }
- /* regular workers always run */
- if(pThis->qType == QUEUETYPE_DISK || pThis->iMinMsgsPerWrkr == 0) {
- iMaxWorkers = 1;
+dbgprintf("AdviseMaxWorkers: log Queue Size: %d, high water mark %d\n",
+ getLogicalQueueSize(pThis) , pThis->iHighWtrMrk);
+ if(pThis->bIsDA && getLogicalQueueSize(pThis) >= pThis->iHighWtrMrk) {
+ wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* disk queues have always one worker */
} else {
- iMaxWorkers = getLogicalQueueSize(pThis) / pThis->iMinMsgsPerWrkr + 1;
+ if(getLogicalQueueSize(pThis) == 0) {
+ iMaxWorkers = 0;
+ } else if(pThis->qType == QUEUETYPE_DISK || pThis->iMinMsgsPerWrkr == 0) {
+ iMaxWorkers = 1;
+ } else {
+ iMaxWorkers = getLogicalQueueSize(pThis) / pThis->iMinMsgsPerWrkr + 1;
+ }
+ wtpAdviseMaxWorkers(pThis->pWtpReg, iMaxWorkers);
}
- wtpAdviseMaxWorkers(pThis->pWtpReg, iMaxWorkers); /* disk queues have always one worker */
- }
-
- RETiRet;
-}
-
-
-/* Destruct DA queue. This is the last part of DA-to-normal-mode
- * transistion. This is called asynchronously and some time quite a
- * while after the actual transistion. The key point is that we need to
- * do it at some later time, because we need to destruct the DA queue. That,
- * however, can not be done in a thread that has been signalled
- * This is to be called when we revert back to our own queue.
- * This function must be called with the queue mutex locked (the wti
- * class ensures this).
- * rgerhards, 2008-01-15
- */
-static rsRetVal
-TurnOffDAMode(qqueue_t *pThis)
-{
- DEFiRet;
-
- ISOBJ_TYPE_assert(pThis, qqueue);
- ASSERT(pThis->bRunsDA);
- if(getLogicalQueueSize(pThis->pqDA) == 0) {
- pThis->bRunsDA = 0; /* tell the world we are back in non-DA mode */
- DBGOPRINT((obj_t*) pThis, "disk-assistance has been turned off, disk queue was empty (iRet %d)\n",
- iRet);
}
RETiRet;
@@ -348,33 +320,18 @@ StartDA(qqueue_t *pThis)
CHKiRet(qqueueSetbSyncQueueFiles(pThis->pqDA, pThis->bSyncQueueFiles));
CHKiRet(qqueueSettoActShutdown(pThis->pqDA, pThis->toActShutdown));
CHKiRet(qqueueSettoEnq(pThis->pqDA, pThis->toEnq));
- CHKiRet(SetEnqOnly(pThis->pqDA, pThis->bDAEnqOnly, MUTEX_ALREADY_LOCKED));
CHKiRet(qqueueSetiDeqtWinFromHr(pThis->pqDA, pThis->iDeqtWinFromHr));
CHKiRet(qqueueSetiDeqtWinToHr(pThis->pqDA, pThis->iDeqtWinToHr));
+ CHKiRet(qqueueSettoQShutdown(pThis->pqDA, pThis->toQShutdown));
CHKiRet(qqueueSetiHighWtrMrk(pThis->pqDA, 0));
CHKiRet(qqueueSetiDiscardMrk(pThis->pqDA, 0));
- // experimental: XXX
- CHKiRet(qqueueSettoWrkShutdown(pThis->pqDA, 0));
-
- if(pThis->toQShutdown == 0) {
- CHKiRet(qqueueSettoQShutdown(pThis->pqDA, 0)); /* if the user really wants... */
- } else {
- /* we use the shortest possible shutdown (0 is endless!) because when we run on disk AND
- * have an obviously large backlog, we can't finish it in any case. So there is no point
- * in holding shutdown longer than necessary. -- rgerhards, 2008-01-15
- */
- CHKiRet(qqueueSettoQShutdown(pThis->pqDA, 1));
- }
-
iRet = qqueueStart(pThis->pqDA);
/* file not found is expected, that means it is no previous QIF available */
if(iRet != RS_RET_OK && iRet != RS_RET_FILE_NOT_FOUND)
FINALIZE; /* something is wrong */
- //pthread_cond_broadcast(&pThis->condDAReady); /* signal we are now initialized and ready to go ;) */
-
- DBGOPRINT((obj_t*) pThis, "is now running in disk assisted mode, disk queue 0x%lx\n",
+ DBGOPRINT((obj_t*) pThis, "DA queue initialized, disk queue 0x%lx\n",
qqueueGetID(pThis->pqDA));
finalize_it:
@@ -412,91 +369,35 @@ InitDA(qqueue_t *pThis, int bEnqOnly, int bLockMutex)
* rgerhards, 2008-01-24
* NOTE: this is the DA worker *pool*, not the DA queue!
*/
- if(pThis->pWtpDA == NULL) {
- lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s:DAwpool", obj.GetName((obj_t*) pThis));
- CHKiRet(wtpConstruct (&pThis->pWtpDA));
- CHKiRet(wtpSetDbgHdr (pThis->pWtpDA, pszBuf, lenBuf));
- CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueChkStopWrkrDA));
- CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize));
- CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, wtp_t*)) qqueueIsIdleDA));
- CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti)) ConsumerDA));
- CHKiRet(wtpSetpfObjProcessed (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, wti_t *pWti)) batchProcessed));
- CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) TurnOffDAMode));
- CHKiRet(wtpSetpmutUsr (pThis->pWtpDA, pThis->mut));
- CHKiRet(wtpSetpcondBusy (pThis->pWtpDA, &pThis->notEmpty));
- CHKiRet(wtpSetiNumWorkerThreads (pThis->pWtpDA, 1));
- CHKiRet(wtpSettoWrkShutdown (pThis->pWtpDA, pThis->toWrkShutdown));
- CHKiRet(wtpSetpUsr (pThis->pWtpDA, pThis));
- CHKiRet(wtpConstructFinalize (pThis->pWtpDA));
- }
+ lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s:DAwpool", obj.GetName((obj_t*) pThis));
+ CHKiRet(wtpConstruct (&pThis->pWtpDA));
+ CHKiRet(wtpSetDbgHdr (pThis->pWtpDA, pszBuf, lenBuf));
+ CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueChkStopWrkrDA));
+ CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize));
+ CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti)) ConsumerDA));
+ CHKiRet(wtpSetpfObjProcessed (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, wti_t *pWti)) batchProcessed));
+ CHKiRet(wtpSetpmutUsr (pThis->pWtpDA, pThis->mut));
+ CHKiRet(wtpSetpcondBusy (pThis->pWtpDA, &pThis->notEmpty));
+ CHKiRet(wtpSetiNumWorkerThreads (pThis->pWtpDA, 1));
+ CHKiRet(wtpSettoWrkShutdown (pThis->pWtpDA, pThis->toWrkShutdown));
+ CHKiRet(wtpSetpUsr (pThis->pWtpDA, pThis));
+ CHKiRet(wtpConstructFinalize (pThis->pWtpDA));
/* if we reach this point, we have a "good" DA worker pool */
- /* indicate we now run in DA mode - this is reset by the DA worker if it fails */
- pThis->bDAEnqOnly = bEnqOnly;
-
/* now construct the actual queue (if it does not already exist) */
if(pThis->pqDA == NULL) {
CHKiRet(StartDA(pThis));
}
+ pThis->bEnqOnly = bEnqOnly; // TODO: I think this is not needed, but first clean up shutdown processing!
pThis->bRunsDA = 1;
- /* now we must now adivse the wtp that we need one worker. If none is yet active,
- * that will also start one up. If we forgot that step, everything would be stalled
- * until the next enqueue request.
- */
- wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* DA queues always have just one worker max */
-
finalize_it:
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
RETiRet;
}
-/* check if we need to start disk assisted mode and send some signals to
- * keep it running if we are already in it. It also checks if DA mode is
- * partially initialized, in which case it waits for initialization to
- * complete.
- * rgerhards, 2008-01-14
- */
-static rsRetVal
-ChkStrtDA(qqueue_t *pThis)
-{
- DEFiRet;
-
- ISOBJ_TYPE_assert(pThis, qqueue);
-
- /* if we do not hit the high water mark, we have nothing to do */
- if(getPhysicalQueueSize(pThis) != pThis->iHighWtrMrk)
- ABORT_FINALIZE(RS_RET_OK);
-
- if(pThis->bRunsDA) {
- /* then we need to signal that we are at the high water mark again. If that happens
- * on our way down the queue, that doesn't matter, because then nobody is waiting
- * on the condition variable.
- * (Remember that a DA queue stops draining the queue once it has reached the low
- * water mark and restarts it when the high water mark is reached again - this is
- * what this code here is responsible for. Please note that all workers may have been
- * terminated due to the inactivity timeout, thus we need to advise the pool that
- * we need at least one).
- */
- DBGOPRINT((obj_t*) pThis, "%d entries - passed high water mark in DA mode, send notify\n",
- getPhysicalQueueSize(pThis));
- qqueueAdviseMaxWorkers(pThis);
- } else {
- /* this is the case when we are currently not running in DA mode. So it is time
- * to turn it back on.
- */
- DBGOPRINT((obj_t*) pThis, "%d entries - passed high water mark for disk-assisted mode, initiating...\n",
- getPhysicalQueueSize(pThis));
- InitDA(pThis, QUEUE_MODE_ENQDEQ, MUTEX_ALREADY_LOCKED); /* initiate DA mode */
- }
-
-finalize_it:
- RETiRet;
-}
-
-
/* --------------- end code for disk-assisted queue modes -------------------- */
@@ -733,44 +634,6 @@ finalize_it:
}
-/* This method checks if we have a QIF file for the current queue (no matter of
- * queue mode). Returns RS_RET_OK if we have a QIF file or an error status otherwise.
- * rgerhards, 2008-01-15
- */
-static rsRetVal
-qqueueHaveQIF(qqueue_t *pThis)
-{
- DEFiRet;
- uchar pszQIFNam[MAXFNAME];
- size_t lenQIFNam;
- struct stat stat_buf;
-
- ISOBJ_TYPE_assert(pThis, qqueue);
-
- if(pThis->pszFilePrefix == NULL)
- ABORT_FINALIZE(RS_RET_NO_FILEPREFIX);
-
- /* Construct file name */
- lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), "%s/%s.qi",
- (char*) glbl.GetWorkDir(), (char*)pThis->pszFilePrefix);
-
- /* check if the file exists */
- if(stat((char*) pszQIFNam, &stat_buf) == -1) {
- if(errno == ENOENT) {
- DBGOPRINT((obj_t*) pThis, "no .qi file found\n");
- ABORT_FINALIZE(RS_RET_FILE_NOT_FOUND);
- } else {
- DBGOPRINT((obj_t*) pThis, "error %d trying to access .qi file\n", errno);
- ABORT_FINALIZE(RS_RET_IO_ERROR);
- }
- }
- /* If we reach this point, we have a .qi file */
-
-finalize_it:
- RETiRet;
-}
-
-
/* The method loads the persistent queue information.
* rgerhards, 2008-01-11
*/
@@ -1112,15 +975,17 @@ qqueueDeq(qqueue_t *pThis, void **ppUsr)
}
-/* Try to terminate queue worker threads within the regular shutdown interval.
- * Both the regular and DA queue (if it exists) is waited for, but on the same timeout.
- * After this function returns, the workers must either be finished or some force
- * to finish them must be applied.
- * This function also instructs the DA worker pool (if it exists) to terminate. This is done
- * in preparation of final queue shutdown.
- * rgerhards, 2009-05-27
+/* Try to shut down regular and DA queue workers, within the queue timeout
+ * period. That means processing continues as usual. This is the expected
+ * usual case, where during shutdown those messages remaining are being
+ * processed. At this point, it is acceptable that the queue can not be
+ * fully depleted, that case is handled in the next step. During this phase,
+ * we first shut down the main queue DA worker to prevent new data from arriving
+ * at the DA queue, and then we ask the regular workers of both the regular
+ * and DA queue to try to complete processing.
+ * rgerhards, 2009-10-14
*/
-static rsRetVal
+static inline rsRetVal
tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis)
{
struct timespec tTimeout;
@@ -1130,30 +995,26 @@ tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis)
ISOBJ_TYPE_assert(pThis, qqueue);
ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */
- d_pthread_mutex_lock(pThis->mut); /* some workers may be running in parallel! */
- if(getPhysicalQueueSize(pThis) > 0) {
- if(pThis->bRunsDA) {
- /* We may have waited on the low water mark. As it may have changed, we
- * see if we reactivate the worker.
- */
- wtpAdviseMaxWorkers(pThis->pWtpDA, 1);
- }
+ if(pThis->bIsDA) {
+ /* We need to lock the mutex, as otherwise we may have a race that prevents
+ * us from awaking the DA worker. */
+ d_pthread_mutex_lock(pThis->mut);
+
+ /* tell regular queue DA worker to stop shuffling messages to DA queue... */
+ pThis->pqDA->bEnqOnly = 1;
+ wtpSetState(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE);
+ wtpAdviseMaxWorkers(pThis->pWtpDA, 1);
+ DBGOPRINT((obj_t*) pThis, "awoke DA worker, told it to shut down.\n");
+
+ /* also tell the DA queue worker to shut down, so that it already knows... */
+ wtpSetState(pThis->pqDA->pWtpReg, wtpState_SHUTDOWN);
+ wtpAdviseMaxWorkers(pThis->pqDA->pWtpReg, 1); /* awake its lone worker */
+ DBGOPRINT((obj_t*) pThis, "awoke DA queue regular worker, told it to shut down when done.\n");
+
+ d_pthread_mutex_unlock(pThis->mut);
}
- d_pthread_mutex_unlock(pThis->mut);
- /* Now wait for the queue's workers to shut down. Note that we run into the code even if we just found
- * out there are no active workers - that doesn't matter: the wtp knows about that and so will
- * return immediately.
- * We do not yet care about the DA worker - that will be handled down later in the process.
- * Note that we must not request shutdown right now - that may introduce a race: if the regular queue
- * still runs DA assisted and the DA worker gets scheduled first, it will terminate itself (if the DA
- * queue happens to be empty at that instant). Then the regular worker enqueues messages, what will lead
- * to a restart of the worker. Of course, everything will continue to run, but in a bit sub-optimal way
- * (from a performance point of view). So we don't do anything right now. The DA queue will continue to
- * process messages and shutdown itself in any case if there is nothing to do. So we don't loose anything
- * by not requesting shutdown now.
- * rgerhards, 2008-01-25
- */
+
/* first calculate absolute timeout - we need the absolute value here, because we need to coordinate
* shutdown of both the regular and DA queue on *the same* timeout.
*/
@@ -1182,29 +1043,17 @@ tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis)
}
}
- if(pThis->pWtpDA != NULL) {
- /* we also instruct the DA worker pool to shutdown ASAP. If we need it for persisting
- * the queue, it is restarted at a later stage. We don't care here if a timeout happens.
- */
- DBGOPRINT((obj_t*) pThis, "trying shutdown of main queue DA worker pool\n");
- iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout);
- if(iRetLocal == RS_RET_TIMED_OUT) {
- DBGOPRINT((obj_t*) pThis, "shutdown timed out on main queue DA worker pool (this is OK)\n");
- } else {
- DBGOPRINT((obj_t*) pThis, "main queue DA worker pool shut down on first try.\n");
- }
- }
-
RETiRet;
}
/* Try to shut down regular and DA queue workers, within the action timeout
- * period. Note that the main queue DA worker is still unaffected (and may shuffle
- * data to the disk queue while we terminate the other workers). Not finishing
- * processing all messages is now OK (but they may be preserved later, depending
- * on bSaveOnShutdown setting).
- * rgerhards, 2009-05-27
+ * period. This aborts processing, but at the end of the current action, in
+ * a well-defined manner. During this phase, we terminate all three worker
+ * pools, including the regular queue DA worker if it has not yet terminated.
+ * Not finishing processing all messages is OK (and expected) at this stage
+ * (they may be preserved later, depending on bSaveOnShutdown setting).
+ * rgerhards, 2009-10-14
*/
static rsRetVal
tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis)
@@ -1218,17 +1067,10 @@ RUNLOG_STR("trying to shutdown workers within Action Timeout");
ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */
/* instruct workers to finish ASAP, even if still work exists */
- /* note that we modify bEnqOnly directly, because going through the method would
- * startup some workers again. So this is OK here. -- rgerhards, 2009-05-28
- */
pThis->bEnqOnly = 1;
pThis->bShutdownImmediate = 1;
- /* need to set this so that the DA queue begins shutdown in parallel! */
- if(pThis->pqDA != NULL) {
- pThis->pqDA->bEnqOnly = 1;
- wtpSetState(pThis->pqDA->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE);
- }
+// TODO: make sure we have at minimum a 10ms timeout - workers deserve a chance...
/* now give the queue workers a last chance to gracefully shut down (based on action timeout setting) */
timeoutComp(&tTimeout, pThis->toActShutdown);
DBGOPRINT((obj_t*) pThis, "trying immediate shutdown of regular workers (if any)\n");
@@ -1252,20 +1094,14 @@ RUNLOG_STR("trying to shutdown workers within Action Timeout");
DBGOPRINT((obj_t*) pThis, "unexpected iRet state %d after trying immediate shutdown of the DA "
"queue in disk save mode. Continuing, but results are unpredictable\n", iRetLocal);
}
- }
- if(pThis->pWtpDA != NULL) {
- /* and now we need to check the DA worker itself (the one that shuffles data to the disk). This
- * is necessary because we may be in a situation where the DA queue regular worker and the
- * main queue worker stopped rather quickly. In this case, there is almost no time (and
- * probably no thread switch!) between the point where we instructed the main queue DA
- * worker to shutdown and this code location. In consequence, it may not even have
- * noticed that it should should down, less acutally done this. So we provide it with a
- * fixed 100ms timeout to try complete its work, what usually should be sufficient.
- * rgerhards, 2009-10-06
+ /* and now we need to terminate the DA worker itself. We always grant it a 100ms timeout,
+ * which should be sufficient and usually not be required (it is expected to have finished
+ * long before while we were processing the queue timeout in shutdown phase 1).
+ * rgerhards, 2009-10-14
*/
timeoutComp(&tTimeout, 100);
- DBGOPRINT((obj_t*) pThis, "last try for regular shutdown of main queue DA worker pool\n");
+ DBGOPRINT((obj_t*) pThis, "trying regular shutdown of main queue DA worker pool\n");
iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout);
if(iRetLocal == RS_RET_TIMED_OUT) {
DBGOPRINT((obj_t*) pThis, "shutdown timed out on main queue DA worker pool "
@@ -1280,14 +1116,13 @@ RUNLOG_STR("trying to shutdown workers within Action Timeout");
/* This function cancels all remaining regular workers for both the main and the DA
- * queue. The main queue's DA worker pool continues to run (if it exists and is active).
+ * queue.
* rgerhards, 2009-05-29
*/
static rsRetVal
cancelWorkers(qqueue_t *pThis)
{
rsRetVal iRetLocal;
- struct timespec tTimeout;
DEFiRet;
/* Now queue workers should have terminated. If not, we need to cancel them as we have applied
@@ -1309,30 +1144,12 @@ cancelWorkers(qqueue_t *pThis)
DBGOPRINT((obj_t*) pThis, "unexpected iRet state %d trying to cancel DA queue worker "
"threads, continuing, but results are unpredictable\n", iRetLocal);
}
- }
- /* finally, we cancel the main queue's DA worker pool, if it still is running. It may be
- * restarted later to persist the queue. But we stop it, because otherwise we get into
- * big trouble when resetting the logical dequeue pointer. This operation can only be
- * done when *no* worker is running. So time for a shutdown... -- rgerhards, 2009-05-28
- */
- if(pThis->pWtpDA != NULL) {
- /* but because of the potentially harsh consequences of cancelling, we try one last
- * (and short) time to shut down the DA worker in a normal fashion. The idea here
- * is that it may be willing to do so, but we did not yet have a task switch so
- * that it could not terminate but will do immediately when it gets time.
- * rgerhards, 2009-10-13
+ /* finally, we cancel the main queue's DA worker pool, if it still is running. It may be
+ * restarted later to persist the queue. But we stop it, because otherwise we get into
+ * big trouble when resetting the logical dequeue pointer. This operation can only be
+ * done when *no* worker is running. So time for a shutdown... -- rgerhards, 2009-05-28
*/
- timeoutComp(&tTimeout, 50);
- DBGOPRINT((obj_t*) pThis, "one ultimately last try for regular shutdown of main queue DA worker pool\n");
- iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout);
- if(iRetLocal == RS_RET_TIMED_OUT) {
- DBGOPRINT((obj_t*) pThis, "shutdown timed out on main queue DA worker pool "
- "- this is not good, need to cancel now...\n");
- } else {
- DBGOPRINT((obj_t*) pThis, "main queue DA worker pool shut down!\n");
- }
-
DBGOPRINT((obj_t*) pThis, "checking to see if main queue DA worker pool needs to be cancelled\n");
iRetLocal = wtpCancelAll(pThis->pWtpDA); /* returns immediately if all threads already have terminated */
}
@@ -1368,13 +1185,6 @@ ShutdownWorkers(qqueue_t *pThis)
DBGOPRINT((obj_t*) pThis, "initiating worker thread shutdown sequence\n");
- /* we reduce the low water mark in any case. This is not absolutely necessary, but
- * it is useful because we enable DA mode at several spots below and so we do not need
- * to think about the low water mark each time.
- */
- pThis->iHighWtrMrk = 1; /* if we do not do this, the DA queue will not stop! */
- pThis->iLowWtrMrk = 0;
-
CHKiRet(tryShutdownWorkersWithinQueueTimeout(pThis));
dbgprintf("YYY: physical queue size: %d\n", getPhysicalQueueSize(pThis));
@@ -1412,9 +1222,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
ASSERT(pConsumer != NULL);
ASSERT(iWorkerThreads >= 0);
- if((pThis = (qqueue_t *)calloc(1, sizeof(qqueue_t))) == NULL) {
- ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
- }
+ CHKmalloc(pThis = (qqueue_t *)calloc(1, sizeof(qqueue_t)));
/* we have an object, so let's fill the properties */
objConstructSetObjInfo(pThis);
@@ -1425,7 +1233,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
pThis->iFullDlyMrk = iMaxQueueSize - (iMaxQueueSize / 100) * 3; /* default 97% */
pThis->iLightDlyMrk = iMaxQueueSize - (iMaxQueueSize / 100) * 30; /* default 70% */
- pThis->lenSpoolDir = strlen((char*)pThis->pszSpoolDir);
+ pThis->lenSpoolDir = ustrlen(pThis->pszSpoolDir);
pThis->iMaxFileSize = 1024 * 1024; /* default is 1 MiB */
pThis->iQueueSize = 0;
pThis->nLogDeq = 0;
@@ -1814,7 +1622,7 @@ DequeueForConsumer(qqueue_t *pThis, wti_t *pWti)
ISOBJ_TYPE_assert(pThis, qqueue);
ISOBJ_TYPE_assert(pWti, wti);
-dbgprintf("YYY: deqeueu for consumer");
+dbgprintf("YYY: dequeue for consumer\n");
CHKiRet(DequeueConsumable(pThis, pWti));
if(pWti->batch.nElem == 0)
@@ -1908,11 +1716,13 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti)
d_pthread_mutex_unlock(pThis->mut);
/* iterate over returned results and enqueue them in DA queue */
- for(i = 0 ; i < pWti->batch.nElem && !pThis->bShutdownImmediate ; i++) {
+ //for(i = 0 ; i < pWti->batch.nElem && !pThis->bShutdownImmediate ; i++) {
+ for(i = 0 ; i < pWti->batch.nElem ; i++) {
/* TODO: we must add a generic "addRef" mechanism, because the disk queue enqueue destructs
* the message. So far, we simply assume we always have msg_t, what currently is always the case.
* rgerhards, 2009-05-28
*/
+dbgprintf("DA consumer pushes msg '%s'\n", ((msg_t*)(pWti->batch.pElem[i].pUsrp))->pszRawMsg);
CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, (obj_t*)MsgAddRef((msg_t*)(pWti->batch.pElem[i].pUsrp))));
}
@@ -1941,6 +1751,7 @@ qqueueChkStopWrkrDA(qqueue_t *pThis)
if(pThis->bEnqOnly) {
iRet = RS_RET_TERMINATE_WHEN_IDLE;
+#if 0
} else {
if(pThis->bRunsDA) {
ASSERT(pThis->pqDA != NULL);
@@ -1961,6 +1772,7 @@ RUNLOG_STR("XXX: done re-start reg worker");
// experimental iRet = RS_RET_TERMINATE_NOW;
;
}
+#endif
}
RETiRet;
@@ -1997,60 +1809,12 @@ GetDeqBatchSize(qqueue_t *pThis, int *pVal)
DEFiRet;
assert(pVal != NULL);
*pVal = pThis->iDeqBatchSize;
-if(pThis->pqParent != NULL)
+if(pThis->pqParent != NULL) // TODO: check why we actually do this!
*pVal = 16;
RETiRet;
}
-/* must only be called when the queue mutex is locked, else results
- * are not stable! DA worker version (pThis *is* the *main* queue, not DA!)
- */
-static int
-qqueueIsIdleDA(qqueue_t *pThis)
-{
- return(getPhysicalQueueSize(pThis) <= pThis->iLowWtrMrk);
-}
-/* must only be called when the queue mutex is locked, else results
- * are not stable! Regular worker version.
- */
-static int
-IsIdleReg(qqueue_t *pThis)
-{
- return(getPhysicalQueueSize(pThis) == 0);
-}
-
-
-/* This function is called when a worker thread for the regular queue is shut down.
- * If we are the primary queue, this is not really interesting to us. If, however,
- * we are the DA (child) queue, that means the DA queue is empty. In that case, we
- * need to signal the parent queue's DA worker, so that it can terminate DA mode.
- * rgerhards, 2008-01-26
- * rgerhards, 2008-02-27: HOWEVER, in a shutdown condition, it may be that the parent's worker thread pool
- * has already been terminated and destructed. This *is* a legal condition and happens
- * from time to time in practice. So we need to signal only if there still is a
- * parent DA worker queue. Please keep in mind that the the parent's DA worker
- * pool is DIFFERENT from our (DA queue) regular worker pool. So when the parent's
- * pWtpDA is destructed, there can still be some of our (DAq/wtp) threads be running.
- * I am telling this, because I, too, always get confused by those...
- */
-static rsRetVal
-RegOnWrkrShutdown(qqueue_t *pThis)
-{
- DEFiRet;
-
- ISOBJ_TYPE_assert(pThis, qqueue);
-
- if(pThis->pqParent != NULL) {
- if(pThis->pqParent->pWtpDA != NULL) { /* see comment in function header from 2008-02-27 */
- wtpAdviseMaxWorkers(pThis->pqParent->pWtpDA, 1); /* reactivate DA worker (always 1) */
- }
- }
-
- RETiRet;
-}
-
-
/* start up the queue - it must have been constructed and parameters defined
* before.
*/
@@ -2058,8 +1822,6 @@ rsRetVal
qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
{
DEFiRet;
- rsRetVal iRetLocal;
- int bInitialized = 0; /* is queue already initialized? */
uchar pszBuf[64];
size_t lenBuf;
@@ -2101,8 +1863,7 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
if(pThis->qType == QUEUETYPE_DIRECT)
FINALIZE; /* with direct queues, we are already finished... */
- /* create worker thread pools for regular operation. The DA pool is created on an as-needed
- * basis, which potentially means never under most circumstances.
+ /* create worker thread pools for regular and DA operation.
*/
lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s:Reg", obj.GetName((obj_t*) pThis));
CHKiRet(wtpConstruct (&pThis->pWtpReg));
@@ -2110,10 +1871,8 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
CHKiRet(wtpSetpfRateLimiter (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RateLimiter));
CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) ChkStopWrkrReg));
CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize));
- CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, wtp_t*)) IsIdleReg));
CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti)) ConsumerReg));
CHKiRet(wtpSetpfObjProcessed (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, wti_t *pWti)) batchProcessed));
- CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RegOnWrkrShutdown));
CHKiRet(wtpSetpmutUsr (pThis->pWtpReg, pThis->mut));
CHKiRet(wtpSetpcondBusy (pThis->pWtpReg, &pThis->notEmpty));
CHKiRet(wtpSetiNumWorkerThreads (pThis->pWtpReg, pThis->iNumWorkerThreads));
@@ -2121,27 +1880,11 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
CHKiRet(wtpSetpUsr (pThis->pWtpReg, pThis));
CHKiRet(wtpConstructFinalize (pThis->pWtpReg));
- /* initialize worker thread instances */
- if(pThis->bIsDA) {
- /* If we are disk-assisted, we need to check if there is a QIF file
- * which we need to load. -- rgerhards, 2008-01-15
- */
- iRetLocal = qqueueHaveQIF(pThis);
- if(iRetLocal == RS_RET_OK) {
- DBGOPRINT((obj_t*) pThis, "on-disk queue present, needs to be reloaded\n");
- InitDA(pThis, QUEUE_MODE_ENQDEQ, LOCK_MUTEX); /* initiate DA mode */
- bInitialized = 1; /* we are done */
- } else {
- /* TODO: use logerror? -- rgerhards, 2008-01-16 */
- DBGOPRINT((obj_t*) pThis, "error %d trying to access on-disk queue files, starting without them. "
- "Some data may be lost\n", iRetLocal);
- }
- }
+ /* set up DA system if we have a disk-assisted queue */
+ if(pThis->bIsDA)
+ InitDA(pThis, QUEUE_MODE_ENQDEQ, LOCK_MUTEX); /* initiate DA mode */
- if(Debug && !bInitialized) {
- DBGOPRINT((obj_t*) pThis, "queue starts up without (loading) any DA disk state (this is normal for the DA "
- "queue itself!)\n");
- }
+ DBGOPRINT((obj_t*) pThis, "queue finished initialization\n");
/* if the queue already contains data, we need to start the correct number of worker threads. This can be
* the case when a disk queue has been loaded. If we did not start it here, it would never start.
@@ -2284,10 +2027,16 @@ DoSaveOnShutdown(qqueue_t *pThis)
ISOBJ_TYPE_assert(pThis, qqueue);
- InitDA(pThis, QUEUE_MODE_ENQONLY, LOCK_MUTEX); /* switch to DA mode */
-dbgprintf("after InitDA, queue log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
- /* make sure we do not timeout before we are done */
- DBGOPRINT((obj_t*) pThis, "bSaveOnShutdown configured, infinite timeout set\n");
+ /* we reduce the low water mark, otherwise the DA worker would terminate when
+ * it is reached.
+ */
+ DBGOPRINT((obj_t*) pThis, "bSaveOnShutdown set, restarting DA worker...\n");
+ pThis->bShutdownImmediate = 0; /* would terminate the DA worker! */
+ pThis->iLowWtrMrk = 0;
+ wtpSetState(pThis->pWtpDA, wtpState_SHUTDOWN); /* shutdown worker (only) when done (was _IMMEDIATE!) */
+ wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* restart DA worker */
+
+ DBGOPRINT((obj_t*) pThis, "waiting for DA worker to terminate...\n");
timeoutComp(&tTimeout, QUEUE_TIMEOUT_ETERNAL);
/* and run the primary queue's DA worker to drain the queue */
iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN, &tTimeout);
@@ -2442,10 +2191,6 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr)
*/
CHKiRet(qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pThis->bRunsDA, pUsr));
- /* then check if we need to add an assistance disk queue */
- if(pThis->bIsDA)
- CHKiRet(ChkStrtDA(pThis));
-
/* handle flow control
* There are two different flow control mechanisms: basic and advanced flow control.
* Basic flow control has always been implemented and protects the queue structures
@@ -2489,6 +2234,7 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr)
&& pThis->tVars.disk.sizeOnDisk > pThis->sizeOnDiskMax)) {
DBGOPRINT((obj_t*) pThis, "enqueueMsg: queue FULL - waiting to drain.\n");
timeoutComp(&t, pThis->toEnq);
+// TODO : handle enqOnly => discard!
if(pthread_cond_timedwait(&pThis->notFull, pThis->mut, &t) != 0) {
DBGOPRINT((obj_t*) pThis, "enqueueMsg: cond timeout, dropping message!\n");
objDestruct(pUsr);
@@ -2527,7 +2273,6 @@ qqueueMultiEnqObj(qqueue_t *pThis, multi_submit_t *pMultiSub)
}
for(i = 0 ; i < pMultiSub->nElem ; ++i) {
-dbgprintf("queueMultiEnq: %d\n", i);
CHKiRet(doEnqSingleObj(pThis, pMultiSub->ppMsgs[i]->flowCtlType, (void*)pMultiSub->ppMsgs[i]));
}
@@ -2582,58 +2327,6 @@ dbgprintf("YYY: call advise with mutex %p locked \n", pThis->mut);
}
-/* set queue mode to enqueue only or not
- * There is one subtle issue: this method may be called during queue
- * construction or while it is running. In the former case, the queue
- * mutex does not yet exist (it is NULL), while in the later case it
- * must be locked. The function detects the state and operates as
- * required.
- * rgerhards, 2008-01-16
- */
-static rsRetVal
-SetEnqOnly(qqueue_t *pThis, int bEnqOnly, int bLockMutex)
-{
- DEFiRet;
- DEFVARS_mutexProtection;
-
- ISOBJ_TYPE_assert(pThis, qqueue);
-
- /* for simplicity, we do one big mutex lock. This method is extremely seldom
- * called, so that doesn't matter... -- rgerhards, 2008-01-16
- */
- if(pThis->mut != NULL) {
- BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, bLockMutex);
- }
-
- if(bEnqOnly == pThis->bEnqOnly)
- FINALIZE; /* no change, nothing to do */
-
- if(pThis->bQueueStarted) {
- /* we need to adjust queue operation only if we are not during initial param setup */
- if(bEnqOnly == 1) {
- /* switch to enqueue-only mode */
- /* this means we need to terminate all workers - that's it... */
- DBGOPRINT((obj_t*) pThis, "switching to enqueue-only mode, terminating all worker threads\n");
- if(pThis->pWtpReg != NULL)
- wtpWakeupAllWrkr(pThis->pWtpReg);
- if(pThis->pWtpDA != NULL)
- wtpWakeupAllWrkr(pThis->pWtpDA);
- } else {
- /* switch back to regular mode */
- ABORT_FINALIZE(RS_RET_NOT_IMPLEMENTED); /* we don't need this so far... */
- }
- }
-
- pThis->bEnqOnly = bEnqOnly;
-
-finalize_it:
- if(pThis->mut != NULL) {
- END_MTX_PROTECTED_OPERATIONS(pThis->mut);
- }
- RETiRet;
-}
-
-
/* some simple object access methods */
DEFpropSetMeth(qqueue, bSyncQueueFiles, int)
DEFpropSetMeth(qqueue, iPersistUpdCnt, int)
diff --git a/runtime/wti.c b/runtime/wti.c
index c3ab0aba..24988cbe 100644
--- a/runtime/wti.c
+++ b/runtime/wti.c
@@ -189,6 +189,7 @@ finalize_it:
* the cancel cleanup handler (and have been cancelled).
* rgerhards, 2008-01-16
*/
+// TODO: REMOVE THIS FUNCTION, CURRENTLY ONLY PRESENT TO PROVIDE DEBUG OUTPUT -- rgerhards, 2009-10-14
static void
wtiWorkerCancelCleanup(void *arg)
{
@@ -202,9 +203,6 @@ wtiWorkerCancelCleanup(void *arg)
DBGPRINTF("%s: cancelation cleanup handler called.\n", wtiGetDbgHdr(pThis));
- /* call user supplied handler */
- pWtp->pfOnWorkerCancel(pThis->pWtp->pUsr, pThis->batch.pElem[0].pUsrp);
-
ENDfunc
}
@@ -222,8 +220,6 @@ doIdleProcessing(wti_t *pThis, wtp_t *pWtp, int *pbInactivityTOOccured)
BEGINfunc
DBGPRINTF("%s: worker IDLE, waiting for work.\n", wtiGetDbgHdr(pThis));
- pWtp->pfOnIdle(pWtp->pUsr, MUTEX_ALREADY_LOCKED);
-
if(pThis->bAlwaysRunning) {
/* never shut down any started worker */
dbgprintf("YYY/ZZZ: wti Idle wait cond busy, mutex %p\n", pWtp->pmutUsr);
@@ -235,6 +231,7 @@ dbgprintf("YYY/ZZZ: wti Idle wait cond busy, mutex %p\n", pWtp->pmutUsr);
*pbInactivityTOOccured = 1; /* indicate we had a timeout */
}
}
+ dbgoprint((obj_t*) pThis, "worker awoke from idle processing\n");
ENDfunc
}
@@ -249,7 +246,6 @@ wtiWorker(wti_t *pThis)
int bInactivityTOOccured = 0;
rsRetVal localRet;
rsRetVal terminateRet;
- int iCancelStateSave;
DEFiRet;
ISOBJ_TYPE_assert(pThis, wti);
@@ -259,23 +255,18 @@ wtiWorker(wti_t *pThis)
dbgSetThrdName(pThis->pszDbgHdr);
pthread_cleanup_push(wtiWorkerCancelCleanup, pThis);
- pWtp->pfOnWorkerStartup(pWtp->pUsr);
-
/* now we have our identity, on to real processing */
while(1) { /* loop will be broken below - need to do mutex locks */
if(pWtp->pfRateLimiter != NULL) { /* call rate-limiter, if defined */
pWtp->pfRateLimiter(pWtp->pUsr);
}
-dbgprintf("YYY/ZZZ: pre lock mutex\n");
d_pthread_mutex_lock(pWtp->pmutUsr);
-dbgprintf("YYY/ZZZ: wti locks mutex %p\n", pWtp->pmutUsr);
/* first check if we are in shutdown process (but evaluate a bit later) */
terminateRet = wtpChkStopWrkr(pWtp, MUTEX_ALREADY_LOCKED);
-RUNLOG;
+RUNLOG_VAR("%d", terminateRet);
if(terminateRet == RS_RET_TERMINATE_NOW) {
-RUNLOG;
/* we now need to free the old batch */
localRet = pWtp->pfObjProcessed(pWtp->pUsr, pThis);
dbgoprint((obj_t*) pThis, "terminating worker because of TERMINATE_NOW mode, del iRet %d\n",
@@ -283,7 +274,6 @@ RUNLOG;
d_pthread_mutex_unlock(pWtp->pmutUsr);
break;
}
-RUNLOG;
/* try to execute and process whatever we have */
/* Note that this function releases and re-aquires the mutex. The returned
@@ -291,41 +281,23 @@ RUNLOG;
*/
localRet = pWtp->pfDoWork(pWtp->pUsr, pThis);
-dbgprintf("YYY/ZZZ: wti loop locked mutex %p again\n", pWtp->pmutUsr);
if(localRet == RS_RET_IDLE) {
-RUNLOG;
if(terminateRet == RS_RET_TERMINATE_WHEN_IDLE || bInactivityTOOccured) {
d_pthread_mutex_unlock(pWtp->pmutUsr);
break; /* end of loop */
}
-RUNLOG;
doIdleProcessing(pThis, pWtp, &bInactivityTOOccured);
-RUNLOG;
d_pthread_mutex_unlock(pWtp->pmutUsr);
-RUNLOG;
continue; /* request next iteration */
}
-RUNLOG;
d_pthread_mutex_unlock(pWtp->pmutUsr);
bInactivityTOOccured = 0; /* reset for next run */
}
/* indicate termination */
-RUNLOG;
- d_pthread_mutex_lock(pWtp->pmutUsr);
-RUNLOG;
- pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
-RUNLOG;
pthread_cleanup_pop(0); /* remove cleanup handler */
-RUNLOG;
- pWtp->pfOnWorkerShutdown(pWtp->pUsr);
-RUNLOG;
- pthread_setcancelstate(iCancelStateSave, NULL);
-RUNLOG;
- d_pthread_mutex_unlock(pWtp->pmutUsr);
-RUNLOG;
RETiRet;
}
diff --git a/runtime/wtp.c b/runtime/wtp.c
index 93234819..3e76bb56 100644
--- a/runtime/wtp.c
+++ b/runtime/wtp.c
@@ -94,13 +94,8 @@ BEGINobjConstruct(wtp) /* be sure to specify the object type also in END macro!
/* set all function pointers to "not implemented" dummy so that we can safely call them */
pThis->pfChkStopWrkr = NotImplementedDummy;
pThis->pfGetDeqBatchSize = NotImplementedDummy;
- pThis->pfIsIdle = NotImplementedDummy;
pThis->pfDoWork = NotImplementedDummy;
pThis->pfObjProcessed = NotImplementedDummy;
- pThis->pfOnIdle = NotImplementedDummy;
- pThis->pfOnWorkerCancel = NotImplementedDummy;
- pThis->pfOnWorkerStartup = NotImplementedDummy;
- pThis->pfOnWorkerShutdown = NotImplementedDummy;
ENDobjConstruct(wtp)
@@ -160,22 +155,6 @@ CODESTARTobjDestruct(wtp)
ENDobjDestruct(wtp)
-/* wake up all worker threads.
- * rgerhards, 2008-01-16
- */
-rsRetVal
-wtpWakeupAllWrkr(wtp_t *pThis)
-{
- DEFiRet;
-
- ISOBJ_TYPE_assert(pThis, wtp);
- //d_pthread_mutex_lock(pThis->pmutUsr);
- pthread_cond_broadcast(pThis->pcondBusy);
- //d_pthread_mutex_unlock(pThis->pmutUsr);
- RETiRet;
-}
-
-
/* Sent a specific state for the worker thread pool. -- rgerhards, 2008-01-21
* We do not need to do atomic instructions as set operations are only
* called when terminating the pool, and then in strict sequence. So we
@@ -211,8 +190,10 @@ wtpChkStopWrkr(wtp_t *pThis, int bLockUsrMutex)
wtpState = ATOMIC_FETCH_32BIT(pThis->wtpState);
if(wtpState == wtpState_SHUTDOWN_IMMEDIATE) {
+RUNLOG_STR("WWW: ChkStopWrkr returns TERMINATE_NOW");
ABORT_FINALIZE(RS_RET_TERMINATE_NOW);
} else if(wtpState == wtpState_SHUTDOWN) {
+RUNLOG_STR("WWW: ChkStopWrkr returns TERMINATE_WHEN_IDLE");
ABORT_FINALIZE(RS_RET_TERMINATE_WHEN_IDLE);
}
@@ -241,8 +222,11 @@ wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout
ISOBJ_TYPE_assert(pThis, wtp);
+ /* lock mutex to prevent races (may otherwise happen during idle processing and such...) */
+ d_pthread_mutex_lock(pThis->pmutUsr);
wtpSetState(pThis, tShutdownCmd);
- wtpWakeupAllWrkr(pThis);
+ pthread_cond_broadcast(pThis->pcondBusy); /* wake up all workers */
+ d_pthread_mutex_unlock(pThis->pmutUsr);
/* wait for worker thread termination */
d_pthread_mutex_lock(&pThis->mutWtp);
@@ -430,7 +414,7 @@ dbgprintf("wtpAdviseMaxWorkers, nmax: %d, curr %d, missing %d\n", nMaxWrkrTmp, p
CHKiRet(wtpStartWrkr(pThis));
}
} else {
-dbgprintf("YYY: adivse signal cond busy");
+dbgprintf("YYY: wtpAdviseMaxWorkers, sufficient workers, just doing adivse signal cond busy\n");
pthread_cond_signal(pThis->pcondBusy);
}
@@ -450,13 +434,8 @@ DEFpropSetMethPTR(wtp, pcondBusy, pthread_cond_t)
DEFpropSetMethFP(wtp, pfChkStopWrkr, rsRetVal(*pVal)(void*, int))
DEFpropSetMethFP(wtp, pfRateLimiter, rsRetVal(*pVal)(void*))
DEFpropSetMethFP(wtp, pfGetDeqBatchSize, rsRetVal(*pVal)(void*, int*))
-DEFpropSetMethFP(wtp, pfIsIdle, rsRetVal(*pVal)(void*, wtp_t*))
DEFpropSetMethFP(wtp, pfDoWork, rsRetVal(*pVal)(void*, void*))
DEFpropSetMethFP(wtp, pfObjProcessed, rsRetVal(*pVal)(void*, wti_t*))
-DEFpropSetMethFP(wtp, pfOnIdle, rsRetVal(*pVal)(void*, int))
-DEFpropSetMethFP(wtp, pfOnWorkerCancel, rsRetVal(*pVal)(void*, void*))
-DEFpropSetMethFP(wtp, pfOnWorkerStartup, rsRetVal(*pVal)(void*))
-DEFpropSetMethFP(wtp, pfOnWorkerShutdown, rsRetVal(*pVal)(void*))
/* set the debug header message
diff --git a/runtime/wtp.h b/runtime/wtp.h
index 0505b91c..05c02a8c 100644
--- a/runtime/wtp.h
+++ b/runtime/wtp.h
@@ -62,12 +62,7 @@ struct wtp_s {
rsRetVal (*pfGetDeqBatchSize)(void *pUsr, int*); /* obtains max dequeue count from queue config */
rsRetVal (*pfObjProcessed)(void *pUsr, wti_t *pWti); /* indicate user object is processed */
rsRetVal (*pfRateLimiter)(void *pUsr);
- rsRetVal (*pfIsIdle)(void *pUsr, wtp_t *pWtp);
rsRetVal (*pfDoWork)(void *pUsr, void *pWti);
- rsRetVal (*pfOnIdle)(void *pUsr, int);
- rsRetVal (*pfOnWorkerCancel)(void *pUsr, void*pWti);
- rsRetVal (*pfOnWorkerStartup)(void *pUsr);
- rsRetVal (*pfOnWorkerShutdown)(void *pUsr);
/* end user objects */
uchar *pszDbgHdr; /* header string for debug messages */
};
@@ -91,13 +86,8 @@ PROTOTYPEObjClassInit(wtp);
PROTOTYPEpropSetMethFP(wtp, pfChkStopWrkr, rsRetVal(*pVal)(void*, int));
PROTOTYPEpropSetMethFP(wtp, pfRateLimiter, rsRetVal(*pVal)(void*));
PROTOTYPEpropSetMethFP(wtp, pfGetDeqBatchSize, rsRetVal(*pVal)(void*, int*));
-PROTOTYPEpropSetMethFP(wtp, pfIsIdle, rsRetVal(*pVal)(void*, wtp_t*));
PROTOTYPEpropSetMethFP(wtp, pfDoWork, rsRetVal(*pVal)(void*, void*));
PROTOTYPEpropSetMethFP(wtp, pfObjProcessed, rsRetVal(*pVal)(void*, wti_t*));
-PROTOTYPEpropSetMethFP(wtp, pfOnIdle, rsRetVal(*pVal)(void*, int));
-PROTOTYPEpropSetMethFP(wtp, pfOnWorkerCancel, rsRetVal(*pVal)(void*,void*));
-PROTOTYPEpropSetMethFP(wtp, pfOnWorkerStartup, rsRetVal(*pVal)(void*));
-PROTOTYPEpropSetMethFP(wtp, pfOnWorkerShutdown, rsRetVal(*pVal)(void*));
PROTOTYPEpropSetMeth(wtp, toWrkShutdown, long);
PROTOTYPEpropSetMeth(wtp, wtpState, wtpState_t);
PROTOTYPEpropSetMeth(wtp, iMaxWorkerThreads, int);