diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2008-03-14 11:04:36 +0000 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2008-03-14 11:04:36 +0000 |
commit | fdfcb2a8f953cc91abbe628366e3f5474a101670 (patch) | |
tree | 9dae7bb09bd99a7f98cc9edb609c48ce09b07013 /queue.c | |
parent | de0665a4755140715ad9d95098f6b3a7ff713bd2 (diff) | |
download | rsyslog-fdfcb2a8f953cc91abbe628366e3f5474a101670.tar.gz rsyslog-fdfcb2a8f953cc91abbe628366e3f5474a101670.tar.xz rsyslog-fdfcb2a8f953cc91abbe628366e3f5474a101670.zip |
added advanced flow control for congestion cases (mode depending on message
source and its capablity to be delayed without bad side effects)
Diffstat (limited to 'queue.c')
-rw-r--r-- | queue.c | 86 |
1 files changed, 83 insertions, 3 deletions
@@ -1260,6 +1260,10 @@ rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iWorkerThreads, if((pThis->pszSpoolDir = (uchar*) strdup((char*)glblGetWorkDir())) == NULL) ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); + /* set some water marks so that we have useful defaults if none are set specifically */ + pThis->iFullDlyMrk = (iMaxQueueSize < 100) ? iMaxQueueSize : 100; /* 100 should be far sufficient */ + pThis->iLightDlyMrk = iMaxQueueSize - (iMaxQueueSize / 100) * 70; /* default 70% */ + pThis->lenSpoolDir = strlen((char*)pThis->pszSpoolDir); pThis->iMaxFileSize = 1024 * 1024; /* default is 1 MiB */ pThis->iQueueSize = 0; @@ -1404,6 +1408,21 @@ queueDequeueConsumable(queue_t *pThis, wti_t *pWti, int iCancelStateSave) pWti->pUsrp = pUsr; } + /* awake some flow-controlled sources if we can do this right now */ + /* TODO: this could be done better from a performance point of view -- do it only if + * we have someone waiting for the condition (or only when we hit the watermark right + * on the nail [exact value]) -- rgerhards, 2008-03-14 + */ + if(iQueueSize < pThis->iFullDlyMrk) { +dbgoprint((obj_t*) pThis, "queue size %d below FullDlyMrk %d\n", iQueueSize, pThis->iFullDlyMrk); + pthread_cond_broadcast(&pThis->belowFullDlyWtrMrk); + } + + if(iQueueSize < pThis->iLightDlyMrk) { +dbgoprint((obj_t*) pThis, "queue size %d below LightDlyMrk %d\n", iQueueSize, pThis->iLightDlyMrk); + pthread_cond_broadcast(&pThis->belowLightDlyWtrMrk); + } + d_pthread_mutex_unlock(pThis->mut); pthread_cond_signal(&pThis->notFull); pthread_setcancelstate(iCancelStateSave, NULL); @@ -1481,7 +1500,7 @@ queueConsumerDA(queue_t *pThis, wti_t *pWti, int iCancelStateSave) ISOBJ_TYPE_assert(pWti, wti); CHKiRet(queueDequeueConsumable(pThis, pWti, iCancelStateSave)); - CHKiRet(queueEnqObj(pThis->pqDA, pWti->pUsrp)); + CHKiRet(queueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, pWti->pUsrp)); finalize_it: dbgoprint((obj_t*) pThis, "DAConsumer returns with iRet %d\n", iRet); @@ -1635,6 +1654,10 @@ rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */ ASSERT(pThis != NULL); + /* we need to do a quick check if our water marks are set plausible. If not, + * we correct the most important shortcomings. TODO: do that!!!! -- rgerhards, 2008-03-14 + */ + /* finalize some initializations that could not yet be done because it is * influenced by properties which might have been set after queueConstruct () */ @@ -1651,6 +1674,8 @@ rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */ pthread_cond_init (&pThis->condDAReady, NULL); pthread_cond_init (&pThis->notFull, NULL); pthread_cond_init (&pThis->notEmpty, NULL); + pthread_cond_init (&pThis->belowFullDlyWtrMrk, NULL); + pthread_cond_init (&pThis->belowLightDlyWtrMrk, NULL); /* call type-specific constructor */ CHKiRet(pThis->qConstruct(pThis)); /* this also sets bIsDA */ @@ -1893,6 +1918,8 @@ CODESTARTobjDestruct(queue) pthread_cond_destroy(&pThis->condDAReady); pthread_cond_destroy(&pThis->notFull); pthread_cond_destroy(&pThis->notEmpty); + pthread_cond_destroy(&pThis->belowFullDlyWtrMrk); + pthread_cond_destroy(&pThis->belowLightDlyWtrMrk); /* type-specific destructor */ iRet = pThis->qDestruct(pThis); @@ -1956,7 +1983,7 @@ finalize_it: * rgerhards, 2008-01-03 */ rsRetVal -queueEnqObj(queue_t *pThis, void *pUsr) +queueEnqObj(queue_t *pThis, flowControl_t flowCtlType, void *pUsr) { DEFiRet; int iCancelStateSave; @@ -1984,8 +2011,59 @@ queueEnqObj(queue_t *pThis, void *pUsr) CHKiRet(queueChkStrtDA(pThis)); + /* handle flow control + * There are two different flow control mechanisms: basic and advanced flow control. + * Basic flow control has always been implemented and protects the queue structures + * in that it makes sure no more data is enqueued than the queue is configured to + * support. Enhanced flow control is being added today. There are some sources which + * can easily be stopped, e.g. a file reader. This is the case because it is unlikely + * that blocking those sources will have negative effects (after all, the file is + * continued to be written). Other sources can somewhat be blocked (e.g. the kernel + * log reader or the local log stream reader): in general, nothing is lost if messages + * from these sources are not picked up immediately. HOWEVER, they can not block for + * an extended period of time, as this either causes message loss or - even worse - some + * other bad effects (e.g. unresponsive system in respect to the main system log socket). + * Finally, there are some (few) sources which can not be blocked at all. UDP syslog is + * a prime example. If a UDP message is not received, it is simply lost. So we can't + * do anything against UDP sockets that come in too fast. The core idea of advanced + * flow control is that we take into account the different natures of the sources and + * select flow control mechanisms that fit these needs. This also means, in the end + * result, that non-blockable sources like UDP syslog receive priority in the system. + * It's a side effect, but a good one ;) -- rgerhards, 2008-03-14 + */ +dbgprintf("enqueueMsg: flowctl mode: %d, queue size %d, FullDlyMrk %d\n", flowCtlType, pThis->iQueueSize, pThis->iFullDlyMrk); + if(flowCtlType == eFLOWCTL_FULL_DELAY) { + while(pThis->iQueueSize >= pThis->iFullDlyMrk) { + dbgoprint((obj_t*) pThis, "enqueueMsg: FullDelay mark reached for full delayble message - blocking.\n"); + pthread_cond_wait(&pThis->belowFullDlyWtrMrk, pThis->mut); /* TODO error check? But what do then? */ + } + } else if(flowCtlType == eFLOWCTL_LIGHT_DELAY) { + while(pThis->iQueueSize >= pThis->iLightDlyMrk) { + dbgoprint((obj_t*) pThis, "enqueueMsg: LightDelay mark reached for light delayble message - blocking a bit.\n"); + timeoutComp(&t, 1000); /* 1000 millisconds = 1 second TODO: make configurable */ + pthread_cond_timedwait(&pThis->belowLightDlyWtrMrk, pThis->mut, &t); /* TODO error check? But what do then? */ + } + } + + /* from our regular flow control settings, we are now ready to enqueue the object. + * However, we now need to do a check if the queue permits to add more data. If that + * is not the case, basic flow control enters the field, which means we wait for + * the queue to become ready or drop the new message. -- rgerhards, 2008-03-14 + */ + while( (pThis->iMaxQueueSize > 0 && pThis->iQueueSize >= pThis->iMaxQueueSize) + || (pThis->qType == QUEUETYPE_DISK && pThis->sizeOnDiskMax != 0 + && pThis->tVars.disk.sizeOnDisk > pThis->sizeOnDiskMax)) { + dbgoprint((obj_t*) pThis, "enqueueMsg: queue FULL - waiting to drain.\n"); + timeoutComp(&t, pThis->toEnq); + if(pthread_cond_timedwait(&pThis->notFull, pThis->mut, &t) != 0) { + dbgoprint((obj_t*) pThis, "enqueueMsg: cond timeout, dropping message!\n"); + objDestruct(pUsr); + ABORT_FINALIZE(RS_RET_QUEUE_FULL); + } + } + +#if 0 // previous code, remove when done with advanced flow control /* wait for the queue to be ready... */ - //while(pThis->iMaxQueueSize > 0 && pThis->iQueueSize >= pThis->iMaxQueueSize) { while( (pThis->iMaxQueueSize > 0 && pThis->iQueueSize >= pThis->iMaxQueueSize) || (pThis->qType == QUEUETYPE_DISK && pThis->sizeOnDiskMax != 0 && pThis->tVars.disk.sizeOnDisk > pThis->sizeOnDiskMax)) { @@ -1997,6 +2075,7 @@ queueEnqObj(queue_t *pThis, void *pUsr) ABORT_FINALIZE(RS_RET_QUEUE_FULL); } } +#endif /* and finally enqueue the message */ CHKiRet(queueAdd(pThis, pUsr)); @@ -2080,6 +2159,7 @@ DEFpropSetMeth(queue, toEnq, long); DEFpropSetMeth(queue, iHighWtrMrk, int); DEFpropSetMeth(queue, iLowWtrMrk, int); DEFpropSetMeth(queue, iDiscardMrk, int); +DEFpropSetMeth(queue, iFullDlyMrk, int); DEFpropSetMeth(queue, iDiscardSeverity, int); DEFpropSetMeth(queue, bIsDA, int); DEFpropSetMeth(queue, iMinMsgsPerWrkr, int); |