diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2008-01-25 10:45:25 +0000 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2008-01-25 10:45:25 +0000 |
commit | 167abdb5b3fa6900edd6bbdb1cc7d586896a268c (patch) | |
tree | bed714a9789bd3f7bd2c86039dfdd4196471b85a /wtp.c | |
parent | 5c686c8adcc473cbdbb14e4b2d736f9123210ee6 (diff) | |
download | rsyslog-167abdb5b3fa6900edd6bbdb1cc7d586896a268c.tar.gz rsyslog-167abdb5b3fa6900edd6bbdb1cc7d586896a268c.tar.xz rsyslog-167abdb5b3fa6900edd6bbdb1cc7d586896a268c.zip |
restructured queue shutdown so that the queue timeout is properly applied
before terminatiing the queue
Diffstat (limited to 'wtp.c')
-rw-r--r-- | wtp.c | 32 |
1 files changed, 14 insertions, 18 deletions
@@ -106,7 +106,6 @@ wtpConstructFinalize(wtp_t *pThis) /* alloc and construct workers - this can only be done in finalizer as we previously do * not know the max number of workers */ -RUNLOG; if((pThis->pWrkr = malloc(sizeof(wti_t*) * pThis->iNumWorkerThreads)) == NULL) ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); @@ -276,22 +275,19 @@ wtpChkStopWrkr(wtp_t *pThis, int bLockMutex, int bLockUsrMutex) * rgerhards, 2008-01-14 */ rsRetVal -wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, long iTimeout) +wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout) { DEFiRet; int bTimedOut; - struct timespec t; int iCancelStateSave; dbgPrintAllDebugInfo(); RUNLOG_VAR("%p", pThis); -RUNLOG_VAR("%ld", iTimeout); RUNLOG_VAR("%d", tShutdownCmd); ISOBJ_TYPE_assert(pThis, wtp); wtpSetState(pThis, tShutdownCmd); wtpWakeupAllWrkr(pThis); - timeoutComp(&t, iTimeout);/* get timeout */ /* and wait for their termination */ dbgprintf("%s: waiting for mutex %p\n", wtpGetDbgHdr(pThis), &pThis->mut); @@ -302,9 +298,9 @@ dbgprintf("%s: waiting for mutex %p\n", wtpGetDbgHdr(pThis), &pThis->mut); bTimedOut = 0; while(pThis->iCurNumWrkThrd > 0 && !bTimedOut) { dbgprintf("%s: waiting %ldms on worker thread termination, %d still running\n", - wtpGetDbgHdr(pThis), iTimeout, pThis->iCurNumWrkThrd); + wtpGetDbgHdr(pThis), timeoutVal(ptTimeout), pThis->iCurNumWrkThrd); - if(d_pthread_cond_timedwait(&pThis->condThrdTrm, &pThis->mut, &t) != 0) { + if(d_pthread_cond_timedwait(&pThis->condThrdTrm, &pThis->mut, ptTimeout) != 0) { dbgprintf("%s: timeout waiting on worker thread termination\n", wtpGetDbgHdr(pThis)); bTimedOut = 1; /* we exit the loop on timeout */ } @@ -313,6 +309,11 @@ dbgprintf("%s: waiting for mutex %p\n", wtpGetDbgHdr(pThis), &pThis->mut); if(bTimedOut) iRet = RS_RET_TIMED_OUT; + + /* see if we need to harvest (join) any terminated threads (even in timeout case, + * some may have terminated... + */ + wtpProcessThrdChanges(pThis); dbgprintf("wtpShutdownAll exit"); RETiRet; @@ -345,25 +346,25 @@ wtpCancelAll(wtp_t *pThis) { DEFiRet; int i; - // TODO: we need to implement peek(), without it (today!) we lose one message upon - // worker cancellation! -- rgerhards, 2008-01-14 + // TODO: mutex?? // TODO: cancellation in wti! ISOBJ_TYPE_assert(pThis, wtp); /* process any pending thread requests so that we know who actually is still running */ wtpProcessThrdChanges(pThis); - /* awake the workers one more time, just to be sure */ - wtpWakeupAllWrkr(pThis); - +RUNLOG_VAR("%d", pThis->iNumWorkerThreads);; /* first tell the workers our request */ - for(i = 0 ; i <= pThis->iNumWorkerThreads ; ++i) { + for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) { // TODO: mutex lock! +RUNLOG_VAR("%p", pThis->pWrkr[i]); if(pThis->pWrkr[i]->tCurrCmd >= eWRKTHRD_TERMINATING) { +RUNLOG; dbgprintf("%s: canceling worker thread %d\n", wtpGetDbgHdr(pThis), i); pthread_cancel(pThis->pWrkr[i]->thrdID); } } +RUNLOG; RETiRet; } @@ -492,9 +493,6 @@ wtpStartWrkr(wtp_t *pThis, int bLockMutex) * we do NOT start a new one. Let's give the other one a chance, first. */ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) { - // TODO: sync! -RUNLOG; -dbgprintf("%s: i %d, wti_T* %p\n", wtpGetDbgHdr(pThis), i, pThis->pWrkr[i]); if(wtiGetState(pThis->pWrkr[i], LOCK_MUTEX) == eWRKTHRD_STOPPED) { break; } @@ -510,7 +508,6 @@ dbgprintf("%s: after thrd search: i %d, max %d\n", wtpGetDbgHdr(pThis), i, pThis dbgprintf("%s: started with state %d, num workers now %d\n", wtpGetDbgHdr(pThis), iState, pThis->iCurNumWrkThrd); -RUNLOG; /* we try to give the starting worker a little boost. It won't help much as we still * hold the queue's mutex, but at least it has a chance to start on a single-CPU system. */ @@ -521,7 +518,6 @@ RUNLOG; finalize_it: END_MTX_PROTECTED_OPERATIONS(&pThis->mut); -RUNLOG; RETiRet; } |