diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2008-01-25 19:25:46 +0000 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2008-01-25 19:25:46 +0000 |
commit | 87f0e9b5f91407418a43a06f39831febfbd4e3ad (patch) | |
tree | 810a4191b8cfd14a4a2a19399dbe894b16b5e6ae /wtp.c | |
parent | 167abdb5b3fa6900edd6bbdb1cc7d586896a268c (diff) | |
download | rsyslog-87f0e9b5f91407418a43a06f39831febfbd4e3ad.tar.gz rsyslog-87f0e9b5f91407418a43a06f39831febfbd4e3ad.tar.xz rsyslog-87f0e9b5f91407418a43a06f39831febfbd4e3ad.zip |
disk-assisted queue mode finally begins to look good ;)
Diffstat (limited to 'wtp.c')
-rw-r--r-- | wtp.c | 51 |
1 files changed, 24 insertions, 27 deletions
@@ -135,8 +135,6 @@ wtpDestruct(wtp_t **ppThis) int iCancelStateSave; int i; -dbgPrintAllDebugInfo(); -RUNLOG; assert(ppThis != NULL); pThis = *ppThis; ISOBJ_TYPE_assert(pThis, wtp); @@ -179,9 +177,7 @@ wtpWakeupWrkr(wtp_t *pThis) // TODO; mutex? ISOBJ_TYPE_assert(pThis, wtp); -dbgprintf("wtpWakeupWrkr 1, cond %p\n", pThis->pcondBusy); pthread_cond_signal(pThis->pcondBusy); -dbgprintf("wtpWakeupWrkr 2\n"); RETiRet; } /* wake up all worker threads. @@ -211,10 +207,8 @@ wtpProcessThrdChanges(wtp_t *pThis) ISOBJ_TYPE_assert(pThis, wtp); - RUNLOG; if(pThis->bThrdStateChanged == 0) FINALIZE; - RUNLOG; /* go through all threads */ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) { @@ -222,7 +216,6 @@ wtpProcessThrdChanges(wtp_t *pThis) } finalize_it: - RUNLOG; RETiRet; } @@ -255,6 +248,8 @@ wtpChkStopWrkr(wtp_t *pThis, int bLockMutex, int bLockUsrMutex) DEFiRet; DEFVARS_mutexProtection; + ISOBJ_TYPE_assert(pThis, wtp); + BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex); if( (pThis->wtpState == wtpState_SHUTDOWN_IMMEDIATE) || ((pThis->wtpState == wtpState_SHUTDOWN) && pThis->pfIsIdle(pThis->pUsr, bLockUsrMutex))) @@ -281,14 +276,17 @@ wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout int bTimedOut; int iCancelStateSave; -dbgPrintAllDebugInfo(); -RUNLOG_VAR("%p", pThis); -RUNLOG_VAR("%d", tShutdownCmd); ISOBJ_TYPE_assert(pThis, wtp); wtpSetState(pThis, tShutdownCmd); wtpWakeupAllWrkr(pThis); + + /* see if we need to harvest (join) any terminated threads (even in timeout case, + * some may have terminated... + */ + wtpProcessThrdChanges(pThis); +RUNLOG_VAR("%d", pThis->iCurNumWrkThrd); /* and wait for their termination */ dbgprintf("%s: waiting for mutex %p\n", wtpGetDbgHdr(pThis), &pThis->mut); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); @@ -297,6 +295,7 @@ dbgprintf("%s: waiting for mutex %p\n", wtpGetDbgHdr(pThis), &pThis->mut); pthread_setcancelstate(iCancelStateSave, NULL); bTimedOut = 0; while(pThis->iCurNumWrkThrd > 0 && !bTimedOut) { +RUNLOG_VAR("%d", pThis->iCurNumWrkThrd); dbgprintf("%s: waiting %ldms on worker thread termination, %d still running\n", wtpGetDbgHdr(pThis), timeoutVal(ptTimeout), pThis->iCurNumWrkThrd); @@ -315,7 +314,6 @@ dbgprintf("%s: waiting for mutex %p\n", wtpGetDbgHdr(pThis), &pThis->mut); */ wtpProcessThrdChanges(pThis); -dbgprintf("wtpShutdownAll exit"); RETiRet; } @@ -346,6 +344,7 @@ wtpCancelAll(wtp_t *pThis) { DEFiRet; int i; + int numCancelled = 0; // TODO: mutex?? // TODO: cancellation in wti! ISOBJ_TYPE_assert(pThis, wtp); @@ -353,19 +352,17 @@ wtpCancelAll(wtp_t *pThis) /* process any pending thread requests so that we know who actually is still running */ wtpProcessThrdChanges(pThis); -RUNLOG_VAR("%d", pThis->iNumWorkerThreads);; /* first tell the workers our request */ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) { // TODO: mutex lock! -RUNLOG_VAR("%p", pThis->pWrkr[i]); if(pThis->pWrkr[i]->tCurrCmd >= eWRKTHRD_TERMINATING) { -RUNLOG; dbgprintf("%s: canceling worker thread %d\n", wtpGetDbgHdr(pThis), i); pthread_cancel(pThis->pWrkr[i]->thrdID); + ++numCancelled; } } -RUNLOG; + dbgprintf("%s: cancelled %d worker threads\n", wtpGetDbgHdr(pThis), numCancelled); RETiRet; } @@ -380,13 +377,9 @@ wtpSetInactivityGuard(wtp_t *pThis, int bNewState, int bLockMutex) DEFiRet; DEFVARS_mutexProtection; -RUNLOG; BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex); -RUNLOG; pThis->bInactivityGuard = bNewState; -RUNLOG; END_MTX_PROTECTED_OPERATIONS(&pThis->mut); -RUNLOG; RETiRet; } @@ -403,6 +396,7 @@ wtpWrkrExecCancelCleanup(void *arg) ISOBJ_TYPE_assert(pThis, wtp); pThis->iCurNumWrkThrd--; +RUNLOG_VAR("%d", pThis->iCurNumWrkThrd); wtpSignalWrkrTermination(pThis); dbgprintf("%s: thread CANCELED with %d workers running.\n", wtpGetDbgHdr(pThis), pThis->iCurNumWrkThrd); @@ -459,6 +453,7 @@ wtpWorker(void *arg) /* the arg is actually a wti object, even though we are in pthread_cleanup_pop(0); pThis->iCurNumWrkThrd--; +RUNLOG_VAR("%d", pThis->iCurNumWrkThrd); wtpSignalWrkrTermination(pThis); dbgprintf("%s: Worker thread %lx, terminated, num workers now %d\n", @@ -488,6 +483,8 @@ wtpStartWrkr(wtp_t *pThis, int bLockMutex) BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex); pThis->iCurNumWrkThrd++; +dbgPrintAllDebugInfo(); +RUNLOG_VAR("%d", pThis->iCurNumWrkThrd); /* find free spot in thread table. If we find at least one worker that is in initialization, * we do NOT start a new one. Let's give the other one a chance, first. @@ -538,7 +535,6 @@ wtpAdviseMaxWorkers(wtp_t *pThis, int nMaxWrkr) int nMissing; /* number workers missing to run */ int i; - if(pThis == NULL) dbgPrintAllDebugInfo(); ISOBJ_TYPE_assert(pThis, wtp); dbgprintf("%s: wtpAdviseMaxWorker with %d called, currNum %d, max %d\n", wtpGetDbgHdr(pThis), nMaxWrkr, pThis->iCurNumWrkThrd, pThis->iNumWorkerThreads); @@ -547,11 +543,10 @@ dbgprintf("%s: wtpAdviseMaxWorker with %d called, currNum %d, max %d\n", wtpGetD BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, LOCK_MUTEX); + if(nMaxWrkr > pThis->iNumWorkerThreads) /* limit to configured maximum */ + nMaxWrkr = pThis->iNumWorkerThreads; + nMissing = nMaxWrkr - pThis->iCurNumWrkThrd; - if(nMissing > pThis->iNumWorkerThreads) - nMissing = pThis->iNumWorkerThreads; - else if(nMissing < 0) - nMissing = 0; if(nMissing > 0) { dbgprintf("%s: high activity - starting %d additional worker thread(s).\n", wtpGetDbgHdr(pThis), nMissing); @@ -559,9 +554,11 @@ dbgprintf("%s: wtpAdviseMaxWorker with %d called, currNum %d, max %d\n", wtpGetD for(i = 0 ; i < nMissing ; ++i) { CHKiRet(wtpStartWrkr(pThis, MUTEX_ALREADY_LOCKED)); } - } else { -dbgprintf("wtpAdviseMaxWorkers signals busy\n"); - wtpWakeupWrkr(pThis); + } else { + if(nMaxWrkr > 0) { + dbgprintf("wtpAdviseMaxWorkers signals busy\n"); + wtpWakeupWrkr(pThis); + } } |