summaryrefslogtreecommitdiffstats
path: root/queue.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2008-01-27 11:38:26 +0000
committerRainer Gerhards <rgerhards@adiscon.com>2008-01-27 11:38:26 +0000
commitcd6501b3417262a4377b3944d386f72a26c82864 (patch)
treecc6013d1deefbb54a374a0c9016eb9d054b766c1 /queue.c
parent0c6c9dfe8a6da1553e39167600222cb6ab7e4b8b (diff)
downloadrsyslog-cd6501b3417262a4377b3944d386f72a26c82864.tar.gz
rsyslog-cd6501b3417262a4377b3944d386f72a26c82864.tar.xz
rsyslog-cd6501b3417262a4377b3944d386f72a26c82864.zip
reduced number of unnecessary wakeups of DA worker thread when high water
mark is not yet reached
Diffstat (limited to 'queue.c')
-rw-r--r--queue.c112
1 files changed, 67 insertions, 45 deletions
diff --git a/queue.c b/queue.c
index ae0796c1..ac891584 100644
--- a/queue.c
+++ b/queue.c
@@ -69,7 +69,6 @@ static int queueChkStopWrkrDA(queue_t *pThis);
static int queueIsIdleDA(queue_t *pThis);
static rsRetVal queueConsumerDA(queue_t *pThis, wti_t *pWti, int iCancelStateSave);
static rsRetVal queueConsumerCancelCleanup(void *arg1, void *arg2);
-static inline rsRetVal queueAdviseMaxWorkers(queue_t *pThis);
/* methods */
@@ -78,6 +77,45 @@ static inline rsRetVal queueAdviseMaxWorkers(queue_t *pThis);
/* --------------- code for disk-assisted (DA) queue modes -------------------- */
+/* returns the number of workers that should be advised at
+ * this point in time. The mutex must be locked when
+ * ths function is called. -- rgerhards, 2008-01-25
+ */
+static inline rsRetVal queueAdviseMaxWorkers(queue_t *pThis)
+{
+ DEFiRet;
+ int iMaxWorkers;
+
+ ISOBJ_TYPE_assert(pThis, queue);
+
+RUNLOG_VAR("%d", pThis->bEnqOnly);
+ if(!pThis->bEnqOnly) {
+RUNLOG_VAR("%d", pThis->bRunsDA);
+ if(pThis->bRunsDA) {
+RUNLOG_VAR("%d", pThis->bQueueStarted);
+RUNLOG_VAR("%d", pThis->iQueueSize);
+RUNLOG_VAR("%d", pThis->iHighWtrMrk);
+ /* if we have not yet reached the high water mark, there is no need to start a
+ * worker. -- rgerhards, 2008-01-26
+ */
+ if(pThis->iQueueSize >= pThis->iHighWtrMrk || pThis->bQueueStarted == 0) {
+RUNLOG;
+ wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* disk queues have always one worker */
+ }
+ } else {
+ if(pThis->qType == QUEUETYPE_DISK || pThis->iMinMsgsPerWrkr == 0) {
+ iMaxWorkers = 1;
+ } else {
+ iMaxWorkers = pThis->iQueueSize / pThis->iMinMsgsPerWrkr + 1;
+ }
+ wtpAdviseMaxWorkers(pThis->pWtpReg, iMaxWorkers); /* disk queues have always one worker */
+ }
+ }
+
+ RETiRet;
+}
+
+
/* Destruct DA queue. This is the last part of DA-to-normal-mode
* transistion. This is called asynchronously and some time quite a
* while after the actual transistion. The key point is that we need to
@@ -141,41 +179,6 @@ dbgprintf("Queue 0x%lx: disk-assistance being been turned off, bEnqOnly %d, bQue
}
-
-/* returns the number of workers that should be advised at
- * this point in time. The mutex must be locked when
- * ths function is called. -- rgerhards, 2008-01-25
- */
-static inline rsRetVal queueAdviseMaxWorkers(queue_t *pThis)
-{
- DEFiRet;
- int iMaxWorkers;
-
- ISOBJ_TYPE_assert(pThis, queue);
-
-RUNLOG_VAR("%d", pThis->bEnqOnly);
- if(!pThis->bEnqOnly) {
- if(pThis->bRunsDA) {
- /* if we have not yet reached the high water mark, there is no need to start a
- * worker. -- rgerhards, 2008-01-26
- */
- // WRONG: if(pThis->iQueueSize >= pThis->iHighWtrMrk) {
- wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* disk queues have always one worker */
- //}
- } else {
- if(pThis->qType == QUEUETYPE_DISK || pThis->iMinMsgsPerWrkr == 0) {
- iMaxWorkers = 1;
- } else {
- iMaxWorkers = pThis->iQueueSize / pThis->iMinMsgsPerWrkr + 1;
- }
- wtpAdviseMaxWorkers(pThis->pWtpReg, iMaxWorkers); /* disk queues have always one worker */
- }
- }
-
- RETiRet;
-}
-
-
/* check if we run in disk-assisted mode and record that
* setting for easy (and quick!) access in the future. This
* function must only be called from constructors and only
@@ -337,7 +340,7 @@ queueInitDA(queue_t *pThis, int bEnqOnly, int bLockMutex)
* that will also start one up. If we forgot that step, everything would be stalled
* until the next enqueue request.
*/
- queueAdviseMaxWorkers(pThis);
+ wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* DA queues alsways have just one worker max */
finalize_it:
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
@@ -1267,16 +1270,30 @@ queueChkStopWrkrDA(queue_t *pThis)
/* if our queue is in destruction, we drain to the DA queue and so we shall not terminate
* until we have done so.
*/
- int bStopWrkr=
- pThis->bEnqOnly
- || !pThis->bRunsDA
- || (pThis->pqDA != NULL && pThis->pqDA->iQueueSize == 0 && pThis->bQueueInDestruction);
+ int bStopWrkr;
+
+ BEGINfunc
+
+ if(pThis->bEnqOnly) {
+ bStopWrkr = 1;
+ } else {
+ if(pThis->bRunsDA) {
+ if(pThis->iQueueSize < pThis->iHighWtrMrk && pThis->bQueueStarted == 1) {
+ bStopWrkr = 1;
+ } else {
+ bStopWrkr = 0;
+ }
+ } else {
+ bStopWrkr = 1;
+ }
+ }
+
RUNLOG_VAR("%d", bStopWrkr);
- return pThis->bEnqOnly
- || !pThis->bRunsDA
- ;
- // || (pThis->pqDA != NULL && pThis->pqDA->iQueueSize == 0 && pThis->bQueueInDestruction);
+ ENDfunc
+ return bStopWrkr;
}
+
+
/* must only be called when the queue mutex is locked, else results
* are not stable!
* If we are a child, we have done our duty when the queue is empty. In that case,
@@ -1450,6 +1467,11 @@ RUNLOG_VAR("%d", bInitialized);
* the case when a disk queue has been loaded. If we did not start it here, it would never start.
*/
queueAdviseMaxWorkers(pThis);
+#if 0
+ if(!pThis->bEnqOnly && pThis->bRunsDA) {
+ wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* there is always just one DA worker! */
+ }
+#endif
pThis->bQueueStarted = 1;