summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--debug.c3
-rw-r--r--queue.c256
-rwxr-xr-xsrUtils.c34
-rwxr-xr-xsrUtils.h3
-rw-r--r--wti.c12
-rw-r--r--wtp.c32
-rw-r--r--wtp.h2
7 files changed, 222 insertions, 120 deletions
diff --git a/debug.c b/debug.c
index e473504e..a3aa2b35 100644
--- a/debug.c
+++ b/debug.c
@@ -55,6 +55,7 @@ static int bPrintFuncDBOnExit = 0; /* shall the function entry and exit be logge
static char *pszAltDbgFileName = "/home/rger/proj/rsyslog/log"; /* if set, debug output is *also* sent to here */
static FILE *altdbg; /* and the handle for alternate debug output */
static FILE *stddbg;
+static dbgFuncDB_t pCurrFunc;
/* list of all known FuncDBs. We use a special list, because it must only be single-linked. As
@@ -653,7 +654,7 @@ sigsegvHdlr(int signum)
signame = "";
}
- dbgprintf("Signal %d%s occured, execution must be terminated %d.\n", signum, signame, SIGSEGV);
+ dbgprintf("\n\n\n\nSignal %d%s occured, execution must be terminated %d.\n\n\n\n", signum, signame, SIGSEGV);
dbgCallStackPrintAll();
diff --git a/queue.c b/queue.c
index c3666db2..320b3385 100644
--- a/queue.c
+++ b/queue.c
@@ -1,3 +1,5 @@
+ // TODO: we need to implement peek(), without it (today!) we lose one message upon
+ // worker cancellation! -- rgerhards, 2008-01-14
// TODO: think about mutDA - I think it's no longer needed
// TODO: start up the correct num of workers when switching to non-DA mode
// TODO: "preforked" worker threads
@@ -776,81 +778,157 @@ queueDel(queue_t *pThis, void *pUsr)
static rsRetVal queueShutdownWorkers(queue_t *pThis)
{
DEFiRet;
+ DEFVARS_mutexProtection;
int i;
+ struct timespec tTimeout;
+ rsRetVal iRetLocal;
ISOBJ_TYPE_assert(pThis, queue);
dbgprintf("Queue 0x%lx: initiating worker thread shutdown sequence\n", queueGetID(pThis));
- /* even if the timeout count is set to 0 (run endless), we still call the queueWrkThrdTrm(). This
- * is necessary so that all threads get sent the termination command. With a timeout of 0, however,
- * the function returns immediate with RS_RET_TIMED_OUT. We catch that state and accept it as
- * good.
- */
- wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN, pThis->toQShutdown);
- if(iRet == RS_RET_TIMED_OUT) {
- if(pThis->toQShutdown == 0) {
- iRet = RS_RET_OK;
- } else {
- /* OK, we now need to try force the shutdown */
- dbgprintf("Queue 0x%lx: regular worker shutdown timed out, now trying immediate\n",
- queueGetID(pThis));
- iRet = wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE, pThis->toActShutdown);
+ // TODO: reminder, delte after testing: do we need to modify the high wtr mark? I dont' think so 2008-01-25
+ /* first try to shutdown the queue within the regular shutdown period */
+ BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
+ if(pThis->iQueueSize > 0) {
+ if(pThis->bRunsDA) {
+ /* worker threads may be inactive after reaching low water
+ * mark. Lower the mark and react workers.
+ */
+ pThis->iLowWtrMrk = 0;
+ wtpAdviseMaxWorkers(pThis->pWtpReg, 1);
}
}
+ END_MTX_PROTECTED_OPERATIONS(pThis->mut);
- if(iRet != RS_RET_OK) { /* this is true on actual error on first try or timeout and error on second */
- /* still didn't work out - so we now need to cancel the workers */
- dbgprintf("Queue 0x%lx: worker threads could not be shutdown, now canceling them\n", (unsigned long) pThis);
- iRet = wtpCancelAll(pThis->pWtpReg);
- }
-
- // TODO: do it just once but right ;)
- if(pThis->pWtpDA != NULL) {
- wtpShutdownAll(pThis->pWtpDA, pThis->toQShutdown, pThis->toQShutdown);
- if(iRet == RS_RET_TIMED_OUT) {
- if(pThis->toQShutdown == 0) {
- iRet = RS_RET_OK;
- } else {
- /* OK, we now need to try force the shutdown */
- dbgprintf("Queue 0x%lx: regular worker shutdown timed out, now trying immediate\n",
+ /* Now wait for the queue's workers to shut down. Note that we run into the code even if we just found
+ * out there are no active workers - that doesn't matter: the wtp knows about that and so will
+ * return immediately.
+ * We do not yet care about the DA worker - that will be handled down later in the process.
+ * Note that we must not request shutdown right now - that may introduce a race: if the regular queue
+ * still runs DA assisted and the DA worker gets scheduled first, it will terminate itself (if the DA
+ * queue happens to be empty at that instant). Then the regular worker enqueues messages, what will lead
+ * to a restart of the worker. Of course, everything will continue to run, but in a bit sub-optimal way
+ * (from a performance point of view). So we don't do anything right now. The DA queue will continue to
+ * process messages and shutdown itself in any case if there is nothing to do. So we don't loose anything
+ * by not requesting shutdown now.
+ * rgerhards, 2008-01-25
+ */
+ /* first calculate absolute timeout - we need the absolute value here, because we need to coordinate
+ * shutdown of both the regular and DA queue on *the same* timeout.
+ */
+ timeoutComp(&tTimeout, pThis->toQShutdown);
+ iRetLocal = wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN, &tTimeout);
+ if(iRetLocal == RS_RET_TIMED_OUT) {
+ dbgprintf("Queue 0x%lx: regular shutdown timed out on primary queue (this is OK)\n", queueGetID(pThis));
+ } else {
+ /* OK, the regular queue is now shut down. So we can now wait for the DA queue (if running DA) */
+ dbgprintf("Queue 0x%lx: regular queue workers shut down.\n", queueGetID(pThis));
+ BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
+ if(pThis->bRunsDA) {
+ END_MTX_PROTECTED_OPERATIONS(pThis->mut);
+ dbgprintf("Queue 0x%lx: we have a DA queue (0x%lx), requesting its shutdown.\n",
+ queueGetID(pThis), queueGetID(pThis->pqDA));
+ /* we use the same absolute timeout as above, so we do not use more than the configured
+ * timeout interval!
+ */
+ iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN, &tTimeout);
+ if(iRetLocal == RS_RET_TIMED_OUT) {
+ dbgprintf("Queue 0x%lx: shutdown timed out on DA queue (this is OK)\n",
queueGetID(pThis));
- iRet = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE, pThis->toActShutdown);
}
+ } else {
+ END_MTX_PROTECTED_OPERATIONS(pThis->mut);
}
+ }
- if(iRet != RS_RET_OK) { /* this is true on actual error on first try or timeout and error on second */
- /* still didn't work out - so we now need to cancel the workers */
- dbgprintf("Queue 0x%lx: worker threads could not be shutdown, now canceling them\n", (unsigned long) pThis);
- iRet = wtpCancelAll(pThis->pWtpDA);
+ /* when we reach this point, both queues are either empty or the regular queue shutdown timeout
+ * has expired. Now we need to check if we areconfigured to not loose messages. If so, we need
+ * to persist the queue to disk (this is only possible if the queue is DA-enabled).
+ */
+ BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
+ /* optimize parameters for shutdown of DA-enabled queues */
+ if(pThis->bIsDA && pThis->iQueueSize > 0 && pThis->bSaveOnShutdown) {
+ /* switch to enqueue-only mode so that no more actions happen */
+ if(pThis->bRunsDA == 0) {
+ queueInitDA(pThis, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* switch to DA mode */
+ } else {
+ queueSetEnqOnly(pThis->pqDA, QUEUE_MODE_ENQONLY); /* switch to enqueue-only mode */
}
+ END_MTX_PROTECTED_OPERATIONS(pThis->mut);
+ /* make sure we do not timeout before we are done */
+ dbgprintf("Queue 0x%lx: bSaveOnShutdown configured, eternal timeout set\n", queueGetID(pThis));
+ timeoutComp(&tTimeout, QUEUE_TIMEOUT_ETERNAL);
+ /* and run the primary's queue worker to drain the queue */
+ iRetLocal = wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN, &tTimeout);
+ if(iRetLocal != RS_RET_OK) {
+ dbgprintf("Queue 0x%lx: unexpected iRet state %d after trying to shut down primary queue in disk save mode, "
+ "continuing, but results are unpredictable\n",
+ queueGetID(pThis), iRetLocal);
+ }
+ } else {
+ END_MTX_PROTECTED_OPERATIONS(pThis->mut);
}
-
-
- /* finally join the threads
- * In case of a cancellation, this may actually take some time. This is also
- * needed to clean up the thread descriptors, even with a regular termination.
- * And, most importantly, this is needed if we have an indifitite termination
- * time set (timeout == 0)! -- rgerhards, 2008-01-14
+
+ /* now the primary queue is either empty, persisted to disk - or set to loose messages. So we
+ * can now request immediate shutdown of any remaining workers.
*/
-#if 0 // totally wrong, we must implement something along these lines in wtp!
-RUNLOG;
- for(i = 0 ; i <= pThis->iNumWorkerThreads ; ++i) {
- if(pThis->pWtpReg->pWrkr[i]->tCurrCmd != eWRKTHRD_STOPPED) {
- wtiJoinThrd(pThis->pWtpReg->pWrkr[i]);
+ BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
+ if(pThis->iQueueSize > 0) {
+ END_MTX_PROTECTED_OPERATIONS(pThis->mut);
+ timeoutComp(&tTimeout, QUEUE_TIMEOUT_ETERNAL);
+ iRetLocal = wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout);
+ if(iRetLocal != RS_RET_OK) {
+ dbgprintf("Queue 0x%lx: unexpected iRet state %d after trying immediate shutdown of the primary queue "
+ "in disk save mode. Continuing, but results are unpredictable\n",
+ queueGetID(pThis), iRetLocal);
}
+ } else {
+ END_MTX_PROTECTED_OPERATIONS(pThis->mut);
}
-RUNLOG;
- if(pThis->pWtpDA != NULL) {
- for(i = 0 ; i <= pThis->iNumWorkerThreads ; ++i) {
- if(pThis->pWtpDA->pWrkr[i]->tCurrCmd != eWRKTHRD_STOPPED) {
- wtiJoinThrd(pThis->pWtpDA->pWrkr[i]);
- }
+ /* Now queue workers should have terminated. If not, we need to cancel them as we have applied
+ * all timeout setting. If any worker in any queue still executes, its consumer is possibly
+ * long-running and cancelling is the only way to get rid of it. Note that the
+ * cancellation handler will probably re-queue a user pointer, so the queue's enqueue
+ * function is still needed (what is no problem as we do not yet destroy the queue - but I
+ * thought it's a good idea to mention that fact). -- rgerhards, 2008-01-25
+ */
+ BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
+ if(pThis->iQueueSize > 0) {
+ END_MTX_PROTECTED_OPERATIONS(pThis->mut);
+ dbgprintf("Queue 0x%lx: primary queue worker threads could not be shutdown, now canceling them\n",
+ queueGetID(pThis));
+ iRetLocal = wtpCancelAll(pThis->pWtpReg);
+ if(iRetLocal != RS_RET_OK) {
+ dbgprintf("Queue 0x%lx: unexpected iRet state %d trying to cancel primary queue worker "
+ "threads, continuing, but results are unpredictable\n",
+ queueGetID(pThis), iRetLocal);
+ }
+ } else {
+ END_MTX_PROTECTED_OPERATIONS(pThis->mut);
+ }
+
+ /* ... and now the DA queue, if it exists (should always be after the primary one) */
+ BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
+ if(pThis->pqDA != NULL && pThis->pqDA->iQueueSize > 0) {
+ END_MTX_PROTECTED_OPERATIONS(pThis->mut);
+ dbgprintf("Queue 0x%lx: DA worker threads could not be shutdown, now canceling them\n",
+ queueGetID(pThis));
+ iRetLocal = wtpCancelAll(pThis->pWtpReg);
+ if(iRetLocal != RS_RET_OK) {
+ dbgprintf("Queue 0x%lx: unexpected iRet state %d trying to cancel DA queue worker "
+ "threads, continuing, but results are unpredictable\n",
+ queueGetID(pThis), iRetLocal);
}
+ } else {
+ END_MTX_PROTECTED_OPERATIONS(pThis->mut);
}
-#endif
+ /* ... finally ... all worker threads have terminated :-)
+ * Well, more precisely, they *are in termination*. Some cancel cleanup handlers
+ * may still be running.
+ */
dbgprintf("Queue 0x%lx: worker threads terminated, remaining queue size %d.\n",
queueGetID(pThis), pThis->iQueueSize);
@@ -1112,20 +1190,21 @@ queueChkStopWrkrReg(queue_t *pThis)
/* must only be called when the queue mutex is locked, else results
- * are not stable! DA version
+ * are not stable! DA queue version
*/
static int
queueIsIdleDA(queue_t *pThis)
{
- return (pThis->iQueueSize == 0 || (pThis->bRunsDA && pThis->iQueueSize <= pThis->iLowWtrMrk));
+ /* remember: iQueueSize is the DA queue size, not the main queue! */
+ return (pThis->iQueueSize == 0);
}
/* must only be called when the queue mutex is locked, else results
- * are not stable! Regular version
+ * are not stable! Regular queue version
*/
static int
queueIsIdleReg(queue_t *pThis)
{
- return (pThis->iQueueSize == 0);
+ return (pThis->iQueueSize == 0 || (pThis->bRunsDA && pThis->iQueueSize <= pThis->iLowWtrMrk));
}
@@ -1322,7 +1401,6 @@ rsRetVal queueDestruct(queue_t **ppThis)
{
DEFiRet;
queue_t *pThis;
- DEFVARS_mutexProtection;
assert(ppThis != NULL);
pThis = *ppThis;
@@ -1332,55 +1410,37 @@ pThis->bSaveOnShutdown = 1; // TODO: Test remove
pThis->bQueueInDestruction = 1; /* indicate we are in destruction (modifies some behaviour) */
- /* we do not need to take care of any messages left in queue if we are in enqueue only mode */
- if(!pThis->bEnqOnly) {
- /* in regular mode, need look at termination */
- BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
- /* optimize parameters for shutdown of DA-enabled queues */
- if(pThis->bIsDA && pThis->iQueueSize > 0) { // TODO: atomic iQueueSize!
- dbgprintf("IsDA queue, modifying params for draining\n");
- pThis->iHighWtrMrk = 1; /* make sure we drain */
- pThis->iLowWtrMrk = 0; /* disable low water mark algo */
- if(pThis->bRunsDA == 0) {
- if(pThis->iQueueSize > 0) {
- queueInitDA(pThis, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* initiate DA mode */
- }
- } else {
- queueSetEnqOnly(pThis->pqDA, QUEUE_MODE_ENQONLY); /* turn on enqueue-only mode */
- /* worker may have been waited on low water mark, reactivate */
- wtpAdviseMaxWorkers(pThis->pWtpDA, 1);
- }
- if(pThis->bSaveOnShutdown) {
- dbgprintf("bSaveOnShutdown set, eternal timeout set\n");
- pThis->toQShutdown = QUEUE_TIMEOUT_ETERNAL;
- }
- END_MTX_PROTECTED_OPERATIONS(pThis->mut);
- }
- }
+ /* shut down all workers (handles *all* of the persistence logic) */
+ queueShutdownWorkers(pThis);
- /* at this point, the queue is either empty with all workers being idle (or deact) or the queue
- * is full and all workers are running. We now need to wait for everyone to become idle.
- */
+ /* finally destruct our (regular) worker thread pool */
if(pThis->qType != QUEUETYPE_DIRECT) {
- queueShutdownWorkers(pThis);
+ wtpDestruct(&pThis->pWtpReg);
}
- /* if still running DA, terminate disk queue (note that the DA queue is NULL if it was never used) */
- if(pThis->bRunsDA && pThis->pqDA != NULL)
+ /* Now check if we actually have a DA queue and, if so, destruct it.
+ * Note that the wtp must be destructed first, it may be in cancel cleanup handler
+ * *right now* and actually *need* to access the queue object to persist some final
+ * data (re-queueing case). So we need to destruct the wtp first, which will make
+ * sure all workers have terminated.
+ */
+ if(pThis->pWtpDA != NULL) {
+ wtpDestruct(&pThis->pWtpDA);
queueDestruct(&pThis->pqDA);
+ }
- /* persist the queue (we always do that - queuePersits() does cleanup if the queue is empty) */
+ /* persist the queue (we always do that - queuePersits() does cleanup if the queue is empty)
+ * This handler is most important for disk queues, it will finally persist the necessary
+ * on-disk structures. In theory, other queueing modes may implement their other (non-DA)
+ * methods of persisting a queue between runs, but in practice all of this is done via
+ * disk queues and DA mode. Anyhow, it doesn't hurt to know that we could extend it here
+ * if need arises (what I doubt...) -- rgerhards, 2008-01-25
+ */
CHKiRet_Hdlr(queuePersist(pThis)) {
dbgprintf("Queue 0x%lx: error %d persisting queue - data lost!\n", (unsigned long) pThis, iRet);
}
- /* ... then free resources */
- if(pThis->qType != QUEUETYPE_DIRECT) {
- wtpDestruct(&pThis->pWtpReg);
- if(pThis->pWtpDA != NULL)
- wtpDestruct(&pThis->pWtpDA);
- }
-
+ /* finally, clean up some simple things... */
if(pThis->pqParent == NULL) {
/* if we are not a child, we allocated our own mutex, which we now need to destroy */
pthread_mutex_destroy(pThis->mut);
@@ -1389,6 +1449,7 @@ pThis->bSaveOnShutdown = 1; // TODO: Test remove
pthread_mutex_destroy(&pThis->mutThrdMgmt);
pthread_cond_destroy(&pThis->notFull);
pthread_cond_destroy(&pThis->notEmpty);
+
/* type-specific destructor */
iRet = pThis->qDestruct(pThis);
@@ -1467,6 +1528,7 @@ queueEnqObj(queue_t *pThis, void *pUsr)
ISOBJ_TYPE_assert(pThis, queue);
+// TODO: check if queue is terminating and if so either discard message or enqeue it to the DA queue *directly*
dbgprintf("Queue %p: EnqObj() 1\n", pThis);
/* Please note that this function is not cancel-safe and consequently
* sets the calling thread's cancelibility state to PTHREAD_CANCEL_DISABLE
diff --git a/srUtils.c b/srUtils.c
index 590e7eff..4c64ce89 100755
--- a/srUtils.c
+++ b/srUtils.c
@@ -316,7 +316,7 @@ int getNumberDigits(long lNum)
* rgerhards, 2008-01-14
*/
rsRetVal
-timeoutComp(struct timespec *pt, int iTimeout)
+timeoutComp(struct timespec *pt, long iTimeout)
{
assert(pt != NULL);
/* compute timeout */
@@ -331,6 +331,38 @@ timeoutComp(struct timespec *pt, int iTimeout)
}
+/* This function is kind of the reverse of timeoutComp() - it takes an absolute
+ * timeout value and computes how far this is in the future. If the value is already
+ * in the past, 0 is returned. The return value is in ms.
+ * rgerhards, 2008-01-25
+ */
+long
+timeoutVal(struct timespec *pt)
+{
+ struct timespec t;
+ long iTimeout;
+
+ assert(pt != NULL);
+ /* compute timeout */
+ clock_gettime(CLOCK_REALTIME, &t);
+ if(pt->tv_sec < t.tv_sec) {
+ iTimeout = 0; /* in the past! */
+ } else if(pt->tv_sec == t.tv_sec) {
+ if(pt->tv_nsec < t.tv_nsec) {
+ iTimeout = 0; /* in the past! */
+ } else {
+ iTimeout = (pt->tv_nsec - t.tv_nsec) / 1000;
+ }
+ } else {
+ iTimeout = pt->tv_sec - t.tv_nsec;
+ iTimeout += 1000 - (pt->tv_nsec / 1000);
+ iTimeout += t.tv_nsec / 1000;
+ }
+
+ return iTimeout;
+}
+
+
/* cancellation cleanup handler - frees provided mutex
* rgerhards, 2008-01-14
*/
diff --git a/srUtils.h b/srUtils.h
index 66af63d1..d0d34f37 100755
--- a/srUtils.h
+++ b/srUtils.h
@@ -66,7 +66,8 @@ void skipWhiteSpace(uchar **pp);
rsRetVal genFileName(uchar **ppName, uchar *pDirName, size_t lenDirName, uchar *pFName,
size_t lenFName, long lNum, int lNumDigits);
int getNumberDigits(long lNum);
-rsRetVal timeoutComp(struct timespec *pt, int iTimeout);
+rsRetVal timeoutComp(struct timespec *pt, long iTimeout);
+long timeoutVal(struct timespec *pt);
void mutexCancelCleanup(void *arg);
/* mutex operations */
diff --git a/wti.c b/wti.c
index 045330c6..2e1dd548 100644
--- a/wti.c
+++ b/wti.c
@@ -159,6 +159,16 @@ rsRetVal wtiDestruct(wti_t **ppThis)
/* we can not be canceled, that would have a myriad of side-effects */
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
+ /* if we reach this point, we must make sure the associated worker has terminated. It is
+ * the callers duty to make sure the worker has already terminated.
+ * TODO: is it *really* the caller's duty? ...mmmhhhh.... smells bad... rgerhards, 2008-01-25
+ */
+ wtiProcessThrdChanges(pThis, LOCK_MUTEX); /* process state change one last time */
+
+ d_pthread_mutex_lock(&pThis->mut);
+ assert(wtiGetState(pThis, MUTEX_ALREADY_LOCKED) <= eWRKTHRD_TERMINATING); // I knew it smelled bad...
+ d_pthread_mutex_unlock(&pThis->mut);
+
/* actual destruction */
pthread_cond_destroy(&pThis->condInitDone);
pthread_mutex_destroy(&pThis->mut);
@@ -166,7 +176,7 @@ rsRetVal wtiDestruct(wti_t **ppThis)
if(pThis->pszDbgHdr != NULL)
free(pThis->pszDbgHdr);
- /* and finally delete the queue objet itself */
+ /* and finally delete the wti object itself */
free(pThis);
*ppThis = NULL;
diff --git a/wtp.c b/wtp.c
index 4c3ea921..4133e7b4 100644
--- a/wtp.c
+++ b/wtp.c
@@ -106,7 +106,6 @@ wtpConstructFinalize(wtp_t *pThis)
/* alloc and construct workers - this can only be done in finalizer as we previously do
* not know the max number of workers
*/
-RUNLOG;
if((pThis->pWrkr = malloc(sizeof(wti_t*) * pThis->iNumWorkerThreads)) == NULL)
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
@@ -276,22 +275,19 @@ wtpChkStopWrkr(wtp_t *pThis, int bLockMutex, int bLockUsrMutex)
* rgerhards, 2008-01-14
*/
rsRetVal
-wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, long iTimeout)
+wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout)
{
DEFiRet;
int bTimedOut;
- struct timespec t;
int iCancelStateSave;
dbgPrintAllDebugInfo();
RUNLOG_VAR("%p", pThis);
-RUNLOG_VAR("%ld", iTimeout);
RUNLOG_VAR("%d", tShutdownCmd);
ISOBJ_TYPE_assert(pThis, wtp);
wtpSetState(pThis, tShutdownCmd);
wtpWakeupAllWrkr(pThis);
- timeoutComp(&t, iTimeout);/* get timeout */
/* and wait for their termination */
dbgprintf("%s: waiting for mutex %p\n", wtpGetDbgHdr(pThis), &pThis->mut);
@@ -302,9 +298,9 @@ dbgprintf("%s: waiting for mutex %p\n", wtpGetDbgHdr(pThis), &pThis->mut);
bTimedOut = 0;
while(pThis->iCurNumWrkThrd > 0 && !bTimedOut) {
dbgprintf("%s: waiting %ldms on worker thread termination, %d still running\n",
- wtpGetDbgHdr(pThis), iTimeout, pThis->iCurNumWrkThrd);
+ wtpGetDbgHdr(pThis), timeoutVal(ptTimeout), pThis->iCurNumWrkThrd);
- if(d_pthread_cond_timedwait(&pThis->condThrdTrm, &pThis->mut, &t) != 0) {
+ if(d_pthread_cond_timedwait(&pThis->condThrdTrm, &pThis->mut, ptTimeout) != 0) {
dbgprintf("%s: timeout waiting on worker thread termination\n", wtpGetDbgHdr(pThis));
bTimedOut = 1; /* we exit the loop on timeout */
}
@@ -313,6 +309,11 @@ dbgprintf("%s: waiting for mutex %p\n", wtpGetDbgHdr(pThis), &pThis->mut);
if(bTimedOut)
iRet = RS_RET_TIMED_OUT;
+
+ /* see if we need to harvest (join) any terminated threads (even in timeout case,
+ * some may have terminated...
+ */
+ wtpProcessThrdChanges(pThis);
dbgprintf("wtpShutdownAll exit");
RETiRet;
@@ -345,25 +346,25 @@ wtpCancelAll(wtp_t *pThis)
{
DEFiRet;
int i;
- // TODO: we need to implement peek(), without it (today!) we lose one message upon
- // worker cancellation! -- rgerhards, 2008-01-14
+ // TODO: mutex?? // TODO: cancellation in wti!
ISOBJ_TYPE_assert(pThis, wtp);
/* process any pending thread requests so that we know who actually is still running */
wtpProcessThrdChanges(pThis);
- /* awake the workers one more time, just to be sure */
- wtpWakeupAllWrkr(pThis);
-
+RUNLOG_VAR("%d", pThis->iNumWorkerThreads);;
/* first tell the workers our request */
- for(i = 0 ; i <= pThis->iNumWorkerThreads ; ++i) {
+ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
// TODO: mutex lock!
+RUNLOG_VAR("%p", pThis->pWrkr[i]);
if(pThis->pWrkr[i]->tCurrCmd >= eWRKTHRD_TERMINATING) {
+RUNLOG;
dbgprintf("%s: canceling worker thread %d\n", wtpGetDbgHdr(pThis), i);
pthread_cancel(pThis->pWrkr[i]->thrdID);
}
}
+RUNLOG;
RETiRet;
}
@@ -492,9 +493,6 @@ wtpStartWrkr(wtp_t *pThis, int bLockMutex)
* we do NOT start a new one. Let's give the other one a chance, first.
*/
for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
- // TODO: sync!
-RUNLOG;
-dbgprintf("%s: i %d, wti_T* %p\n", wtpGetDbgHdr(pThis), i, pThis->pWrkr[i]);
if(wtiGetState(pThis->pWrkr[i], LOCK_MUTEX) == eWRKTHRD_STOPPED) {
break;
}
@@ -510,7 +508,6 @@ dbgprintf("%s: after thrd search: i %d, max %d\n", wtpGetDbgHdr(pThis), i, pThis
dbgprintf("%s: started with state %d, num workers now %d\n",
wtpGetDbgHdr(pThis), iState, pThis->iCurNumWrkThrd);
-RUNLOG;
/* we try to give the starting worker a little boost. It won't help much as we still
* hold the queue's mutex, but at least it has a chance to start on a single-CPU system.
*/
@@ -521,7 +518,6 @@ RUNLOG;
finalize_it:
END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
-RUNLOG;
RETiRet;
}
diff --git a/wtp.h b/wtp.h
index 58fc8a5f..37cbd7e9 100644
--- a/wtp.h
+++ b/wtp.h
@@ -92,10 +92,10 @@ rsRetVal wtpChkStopWrkr(wtp_t *pThis, int bLockMutex, int bLockUsrMutex);
rsRetVal wtpSetState(wtp_t *pThis, wtpState_t iNewState);
rsRetVal wtpWakeupWrkr(wtp_t *pThis);
rsRetVal wtpWakeupAllWrkr(wtp_t *pThis);
-rsRetVal wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, long iTimeout);
rsRetVal wtpCancelAll(wtp_t *pThis);
rsRetVal wtpSetDbgHdr(wtp_t *pThis, uchar *pszMsg, size_t lenMsg);
rsRetVal wtpSignalWrkrTermination(wtp_t *pWtp);
+rsRetVal wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout);
PROTOTYPEObjClassInit(wtp);
PROTOTYPEpropSetMethFP(wtp, pfChkStopWrkr, rsRetVal(*pVal)(void*, int));
PROTOTYPEpropSetMethFP(wtp, pfIsIdle, rsRetVal(*pVal)(void*, int));