diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2008-01-27 11:38:26 +0000 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2008-01-27 11:38:26 +0000 |
commit | cd6501b3417262a4377b3944d386f72a26c82864 (patch) | |
tree | cc6013d1deefbb54a374a0c9016eb9d054b766c1 /queue.c | |
parent | 0c6c9dfe8a6da1553e39167600222cb6ab7e4b8b (diff) | |
download | rsyslog-cd6501b3417262a4377b3944d386f72a26c82864.tar.gz rsyslog-cd6501b3417262a4377b3944d386f72a26c82864.tar.xz rsyslog-cd6501b3417262a4377b3944d386f72a26c82864.zip |
reduced number of unnecessary wakeups of DA worker thread when high water
mark is not yet reached
Diffstat (limited to 'queue.c')
-rw-r--r-- | queue.c | 112 |
1 files changed, 67 insertions, 45 deletions
@@ -69,7 +69,6 @@ static int queueChkStopWrkrDA(queue_t *pThis); static int queueIsIdleDA(queue_t *pThis); static rsRetVal queueConsumerDA(queue_t *pThis, wti_t *pWti, int iCancelStateSave); static rsRetVal queueConsumerCancelCleanup(void *arg1, void *arg2); -static inline rsRetVal queueAdviseMaxWorkers(queue_t *pThis); /* methods */ @@ -78,6 +77,45 @@ static inline rsRetVal queueAdviseMaxWorkers(queue_t *pThis); /* --------------- code for disk-assisted (DA) queue modes -------------------- */ +/* returns the number of workers that should be advised at + * this point in time. The mutex must be locked when + * ths function is called. -- rgerhards, 2008-01-25 + */ +static inline rsRetVal queueAdviseMaxWorkers(queue_t *pThis) +{ + DEFiRet; + int iMaxWorkers; + + ISOBJ_TYPE_assert(pThis, queue); + +RUNLOG_VAR("%d", pThis->bEnqOnly); + if(!pThis->bEnqOnly) { +RUNLOG_VAR("%d", pThis->bRunsDA); + if(pThis->bRunsDA) { +RUNLOG_VAR("%d", pThis->bQueueStarted); +RUNLOG_VAR("%d", pThis->iQueueSize); +RUNLOG_VAR("%d", pThis->iHighWtrMrk); + /* if we have not yet reached the high water mark, there is no need to start a + * worker. -- rgerhards, 2008-01-26 + */ + if(pThis->iQueueSize >= pThis->iHighWtrMrk || pThis->bQueueStarted == 0) { +RUNLOG; + wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* disk queues have always one worker */ + } + } else { + if(pThis->qType == QUEUETYPE_DISK || pThis->iMinMsgsPerWrkr == 0) { + iMaxWorkers = 1; + } else { + iMaxWorkers = pThis->iQueueSize / pThis->iMinMsgsPerWrkr + 1; + } + wtpAdviseMaxWorkers(pThis->pWtpReg, iMaxWorkers); /* disk queues have always one worker */ + } + } + + RETiRet; +} + + /* Destruct DA queue. This is the last part of DA-to-normal-mode * transistion. This is called asynchronously and some time quite a * while after the actual transistion. The key point is that we need to @@ -141,41 +179,6 @@ dbgprintf("Queue 0x%lx: disk-assistance being been turned off, bEnqOnly %d, bQue } - -/* returns the number of workers that should be advised at - * this point in time. The mutex must be locked when - * ths function is called. -- rgerhards, 2008-01-25 - */ -static inline rsRetVal queueAdviseMaxWorkers(queue_t *pThis) -{ - DEFiRet; - int iMaxWorkers; - - ISOBJ_TYPE_assert(pThis, queue); - -RUNLOG_VAR("%d", pThis->bEnqOnly); - if(!pThis->bEnqOnly) { - if(pThis->bRunsDA) { - /* if we have not yet reached the high water mark, there is no need to start a - * worker. -- rgerhards, 2008-01-26 - */ - // WRONG: if(pThis->iQueueSize >= pThis->iHighWtrMrk) { - wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* disk queues have always one worker */ - //} - } else { - if(pThis->qType == QUEUETYPE_DISK || pThis->iMinMsgsPerWrkr == 0) { - iMaxWorkers = 1; - } else { - iMaxWorkers = pThis->iQueueSize / pThis->iMinMsgsPerWrkr + 1; - } - wtpAdviseMaxWorkers(pThis->pWtpReg, iMaxWorkers); /* disk queues have always one worker */ - } - } - - RETiRet; -} - - /* check if we run in disk-assisted mode and record that * setting for easy (and quick!) access in the future. This * function must only be called from constructors and only @@ -337,7 +340,7 @@ queueInitDA(queue_t *pThis, int bEnqOnly, int bLockMutex) * that will also start one up. If we forgot that step, everything would be stalled * until the next enqueue request. */ - queueAdviseMaxWorkers(pThis); + wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* DA queues alsways have just one worker max */ finalize_it: END_MTX_PROTECTED_OPERATIONS(pThis->mut); @@ -1267,16 +1270,30 @@ queueChkStopWrkrDA(queue_t *pThis) /* if our queue is in destruction, we drain to the DA queue and so we shall not terminate * until we have done so. */ - int bStopWrkr= - pThis->bEnqOnly - || !pThis->bRunsDA - || (pThis->pqDA != NULL && pThis->pqDA->iQueueSize == 0 && pThis->bQueueInDestruction); + int bStopWrkr; + + BEGINfunc + + if(pThis->bEnqOnly) { + bStopWrkr = 1; + } else { + if(pThis->bRunsDA) { + if(pThis->iQueueSize < pThis->iHighWtrMrk && pThis->bQueueStarted == 1) { + bStopWrkr = 1; + } else { + bStopWrkr = 0; + } + } else { + bStopWrkr = 1; + } + } + RUNLOG_VAR("%d", bStopWrkr); - return pThis->bEnqOnly - || !pThis->bRunsDA - ; - // || (pThis->pqDA != NULL && pThis->pqDA->iQueueSize == 0 && pThis->bQueueInDestruction); + ENDfunc + return bStopWrkr; } + + /* must only be called when the queue mutex is locked, else results * are not stable! * If we are a child, we have done our duty when the queue is empty. In that case, @@ -1450,6 +1467,11 @@ RUNLOG_VAR("%d", bInitialized); * the case when a disk queue has been loaded. If we did not start it here, it would never start. */ queueAdviseMaxWorkers(pThis); +#if 0 + if(!pThis->bEnqOnly && pThis->bRunsDA) { + wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* there is always just one DA worker! */ + } +#endif pThis->bQueueStarted = 1; |