summaryrefslogtreecommitdiffstats
path: root/wtp.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2008-01-25 10:45:25 +0000
committerRainer Gerhards <rgerhards@adiscon.com>2008-01-25 10:45:25 +0000
commit167abdb5b3fa6900edd6bbdb1cc7d586896a268c (patch)
treebed714a9789bd3f7bd2c86039dfdd4196471b85a /wtp.c
parent5c686c8adcc473cbdbb14e4b2d736f9123210ee6 (diff)
downloadrsyslog-167abdb5b3fa6900edd6bbdb1cc7d586896a268c.tar.gz
rsyslog-167abdb5b3fa6900edd6bbdb1cc7d586896a268c.tar.xz
rsyslog-167abdb5b3fa6900edd6bbdb1cc7d586896a268c.zip
restructured queue shutdown so that the queue timeout is properly applied
before terminatiing the queue
Diffstat (limited to 'wtp.c')
-rw-r--r--wtp.c32
1 files changed, 14 insertions, 18 deletions
diff --git a/wtp.c b/wtp.c
index 4c3ea921..4133e7b4 100644
--- a/wtp.c
+++ b/wtp.c
@@ -106,7 +106,6 @@ wtpConstructFinalize(wtp_t *pThis)
/* alloc and construct workers - this can only be done in finalizer as we previously do
* not know the max number of workers
*/
-RUNLOG;
if((pThis->pWrkr = malloc(sizeof(wti_t*) * pThis->iNumWorkerThreads)) == NULL)
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
@@ -276,22 +275,19 @@ wtpChkStopWrkr(wtp_t *pThis, int bLockMutex, int bLockUsrMutex)
* rgerhards, 2008-01-14
*/
rsRetVal
-wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, long iTimeout)
+wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout)
{
DEFiRet;
int bTimedOut;
- struct timespec t;
int iCancelStateSave;
dbgPrintAllDebugInfo();
RUNLOG_VAR("%p", pThis);
-RUNLOG_VAR("%ld", iTimeout);
RUNLOG_VAR("%d", tShutdownCmd);
ISOBJ_TYPE_assert(pThis, wtp);
wtpSetState(pThis, tShutdownCmd);
wtpWakeupAllWrkr(pThis);
- timeoutComp(&t, iTimeout);/* get timeout */
/* and wait for their termination */
dbgprintf("%s: waiting for mutex %p\n", wtpGetDbgHdr(pThis), &pThis->mut);
@@ -302,9 +298,9 @@ dbgprintf("%s: waiting for mutex %p\n", wtpGetDbgHdr(pThis), &pThis->mut);
bTimedOut = 0;
while(pThis->iCurNumWrkThrd > 0 && !bTimedOut) {
dbgprintf("%s: waiting %ldms on worker thread termination, %d still running\n",
- wtpGetDbgHdr(pThis), iTimeout, pThis->iCurNumWrkThrd);
+ wtpGetDbgHdr(pThis), timeoutVal(ptTimeout), pThis->iCurNumWrkThrd);
- if(d_pthread_cond_timedwait(&pThis->condThrdTrm, &pThis->mut, &t) != 0) {
+ if(d_pthread_cond_timedwait(&pThis->condThrdTrm, &pThis->mut, ptTimeout) != 0) {
dbgprintf("%s: timeout waiting on worker thread termination\n", wtpGetDbgHdr(pThis));
bTimedOut = 1; /* we exit the loop on timeout */
}
@@ -313,6 +309,11 @@ dbgprintf("%s: waiting for mutex %p\n", wtpGetDbgHdr(pThis), &pThis->mut);
if(bTimedOut)
iRet = RS_RET_TIMED_OUT;
+
+ /* see if we need to harvest (join) any terminated threads (even in timeout case,
+ * some may have terminated...
+ */
+ wtpProcessThrdChanges(pThis);
dbgprintf("wtpShutdownAll exit");
RETiRet;
@@ -345,25 +346,25 @@ wtpCancelAll(wtp_t *pThis)
{
DEFiRet;
int i;
- // TODO: we need to implement peek(), without it (today!) we lose one message upon
- // worker cancellation! -- rgerhards, 2008-01-14
+ // TODO: mutex?? // TODO: cancellation in wti!
ISOBJ_TYPE_assert(pThis, wtp);
/* process any pending thread requests so that we know who actually is still running */
wtpProcessThrdChanges(pThis);
- /* awake the workers one more time, just to be sure */
- wtpWakeupAllWrkr(pThis);
-
+RUNLOG_VAR("%d", pThis->iNumWorkerThreads);;
/* first tell the workers our request */
- for(i = 0 ; i <= pThis->iNumWorkerThreads ; ++i) {
+ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
// TODO: mutex lock!
+RUNLOG_VAR("%p", pThis->pWrkr[i]);
if(pThis->pWrkr[i]->tCurrCmd >= eWRKTHRD_TERMINATING) {
+RUNLOG;
dbgprintf("%s: canceling worker thread %d\n", wtpGetDbgHdr(pThis), i);
pthread_cancel(pThis->pWrkr[i]->thrdID);
}
}
+RUNLOG;
RETiRet;
}
@@ -492,9 +493,6 @@ wtpStartWrkr(wtp_t *pThis, int bLockMutex)
* we do NOT start a new one. Let's give the other one a chance, first.
*/
for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
- // TODO: sync!
-RUNLOG;
-dbgprintf("%s: i %d, wti_T* %p\n", wtpGetDbgHdr(pThis), i, pThis->pWrkr[i]);
if(wtiGetState(pThis->pWrkr[i], LOCK_MUTEX) == eWRKTHRD_STOPPED) {
break;
}
@@ -510,7 +508,6 @@ dbgprintf("%s: after thrd search: i %d, max %d\n", wtpGetDbgHdr(pThis), i, pThis
dbgprintf("%s: started with state %d, num workers now %d\n",
wtpGetDbgHdr(pThis), iState, pThis->iCurNumWrkThrd);
-RUNLOG;
/* we try to give the starting worker a little boost. It won't help much as we still
* hold the queue's mutex, but at least it has a chance to start on a single-CPU system.
*/
@@ -521,7 +518,6 @@ RUNLOG;
finalize_it:
END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
-RUNLOG;
RETiRet;
}