summaryrefslogtreecommitdiffstats
path: root/queue.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2008-03-14 11:04:36 +0000
committerRainer Gerhards <rgerhards@adiscon.com>2008-03-14 11:04:36 +0000
commitfdfcb2a8f953cc91abbe628366e3f5474a101670 (patch)
tree9dae7bb09bd99a7f98cc9edb609c48ce09b07013 /queue.c
parentde0665a4755140715ad9d95098f6b3a7ff713bd2 (diff)
downloadrsyslog-fdfcb2a8f953cc91abbe628366e3f5474a101670.tar.gz
rsyslog-fdfcb2a8f953cc91abbe628366e3f5474a101670.tar.xz
rsyslog-fdfcb2a8f953cc91abbe628366e3f5474a101670.zip
added advanced flow control for congestion cases (mode depending on message
source and its capablity to be delayed without bad side effects)
Diffstat (limited to 'queue.c')
-rw-r--r--queue.c86
1 files changed, 83 insertions, 3 deletions
diff --git a/queue.c b/queue.c
index 859bff4f..1eded530 100644
--- a/queue.c
+++ b/queue.c
@@ -1260,6 +1260,10 @@ rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iWorkerThreads,
if((pThis->pszSpoolDir = (uchar*) strdup((char*)glblGetWorkDir())) == NULL)
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
+ /* set some water marks so that we have useful defaults if none are set specifically */
+ pThis->iFullDlyMrk = (iMaxQueueSize < 100) ? iMaxQueueSize : 100; /* 100 should be far sufficient */
+ pThis->iLightDlyMrk = iMaxQueueSize - (iMaxQueueSize / 100) * 70; /* default 70% */
+
pThis->lenSpoolDir = strlen((char*)pThis->pszSpoolDir);
pThis->iMaxFileSize = 1024 * 1024; /* default is 1 MiB */
pThis->iQueueSize = 0;
@@ -1404,6 +1408,21 @@ queueDequeueConsumable(queue_t *pThis, wti_t *pWti, int iCancelStateSave)
pWti->pUsrp = pUsr;
}
+ /* awake some flow-controlled sources if we can do this right now */
+ /* TODO: this could be done better from a performance point of view -- do it only if
+ * we have someone waiting for the condition (or only when we hit the watermark right
+ * on the nail [exact value]) -- rgerhards, 2008-03-14
+ */
+ if(iQueueSize < pThis->iFullDlyMrk) {
+dbgoprint((obj_t*) pThis, "queue size %d below FullDlyMrk %d\n", iQueueSize, pThis->iFullDlyMrk);
+ pthread_cond_broadcast(&pThis->belowFullDlyWtrMrk);
+ }
+
+ if(iQueueSize < pThis->iLightDlyMrk) {
+dbgoprint((obj_t*) pThis, "queue size %d below LightDlyMrk %d\n", iQueueSize, pThis->iLightDlyMrk);
+ pthread_cond_broadcast(&pThis->belowLightDlyWtrMrk);
+ }
+
d_pthread_mutex_unlock(pThis->mut);
pthread_cond_signal(&pThis->notFull);
pthread_setcancelstate(iCancelStateSave, NULL);
@@ -1481,7 +1500,7 @@ queueConsumerDA(queue_t *pThis, wti_t *pWti, int iCancelStateSave)
ISOBJ_TYPE_assert(pWti, wti);
CHKiRet(queueDequeueConsumable(pThis, pWti, iCancelStateSave));
- CHKiRet(queueEnqObj(pThis->pqDA, pWti->pUsrp));
+ CHKiRet(queueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, pWti->pUsrp));
finalize_it:
dbgoprint((obj_t*) pThis, "DAConsumer returns with iRet %d\n", iRet);
@@ -1635,6 +1654,10 @@ rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */
ASSERT(pThis != NULL);
+ /* we need to do a quick check if our water marks are set plausible. If not,
+ * we correct the most important shortcomings. TODO: do that!!!! -- rgerhards, 2008-03-14
+ */
+
/* finalize some initializations that could not yet be done because it is
* influenced by properties which might have been set after queueConstruct ()
*/
@@ -1651,6 +1674,8 @@ rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */
pthread_cond_init (&pThis->condDAReady, NULL);
pthread_cond_init (&pThis->notFull, NULL);
pthread_cond_init (&pThis->notEmpty, NULL);
+ pthread_cond_init (&pThis->belowFullDlyWtrMrk, NULL);
+ pthread_cond_init (&pThis->belowLightDlyWtrMrk, NULL);
/* call type-specific constructor */
CHKiRet(pThis->qConstruct(pThis)); /* this also sets bIsDA */
@@ -1893,6 +1918,8 @@ CODESTARTobjDestruct(queue)
pthread_cond_destroy(&pThis->condDAReady);
pthread_cond_destroy(&pThis->notFull);
pthread_cond_destroy(&pThis->notEmpty);
+ pthread_cond_destroy(&pThis->belowFullDlyWtrMrk);
+ pthread_cond_destroy(&pThis->belowLightDlyWtrMrk);
/* type-specific destructor */
iRet = pThis->qDestruct(pThis);
@@ -1956,7 +1983,7 @@ finalize_it:
* rgerhards, 2008-01-03
*/
rsRetVal
-queueEnqObj(queue_t *pThis, void *pUsr)
+queueEnqObj(queue_t *pThis, flowControl_t flowCtlType, void *pUsr)
{
DEFiRet;
int iCancelStateSave;
@@ -1984,8 +2011,59 @@ queueEnqObj(queue_t *pThis, void *pUsr)
CHKiRet(queueChkStrtDA(pThis));
+ /* handle flow control
+ * There are two different flow control mechanisms: basic and advanced flow control.
+ * Basic flow control has always been implemented and protects the queue structures
+ * in that it makes sure no more data is enqueued than the queue is configured to
+ * support. Enhanced flow control is being added today. There are some sources which
+ * can easily be stopped, e.g. a file reader. This is the case because it is unlikely
+ * that blocking those sources will have negative effects (after all, the file is
+ * continued to be written). Other sources can somewhat be blocked (e.g. the kernel
+ * log reader or the local log stream reader): in general, nothing is lost if messages
+ * from these sources are not picked up immediately. HOWEVER, they can not block for
+ * an extended period of time, as this either causes message loss or - even worse - some
+ * other bad effects (e.g. unresponsive system in respect to the main system log socket).
+ * Finally, there are some (few) sources which can not be blocked at all. UDP syslog is
+ * a prime example. If a UDP message is not received, it is simply lost. So we can't
+ * do anything against UDP sockets that come in too fast. The core idea of advanced
+ * flow control is that we take into account the different natures of the sources and
+ * select flow control mechanisms that fit these needs. This also means, in the end
+ * result, that non-blockable sources like UDP syslog receive priority in the system.
+ * It's a side effect, but a good one ;) -- rgerhards, 2008-03-14
+ */
+dbgprintf("enqueueMsg: flowctl mode: %d, queue size %d, FullDlyMrk %d\n", flowCtlType, pThis->iQueueSize, pThis->iFullDlyMrk);
+ if(flowCtlType == eFLOWCTL_FULL_DELAY) {
+ while(pThis->iQueueSize >= pThis->iFullDlyMrk) {
+ dbgoprint((obj_t*) pThis, "enqueueMsg: FullDelay mark reached for full delayble message - blocking.\n");
+ pthread_cond_wait(&pThis->belowFullDlyWtrMrk, pThis->mut); /* TODO error check? But what do then? */
+ }
+ } else if(flowCtlType == eFLOWCTL_LIGHT_DELAY) {
+ while(pThis->iQueueSize >= pThis->iLightDlyMrk) {
+ dbgoprint((obj_t*) pThis, "enqueueMsg: LightDelay mark reached for light delayble message - blocking a bit.\n");
+ timeoutComp(&t, 1000); /* 1000 millisconds = 1 second TODO: make configurable */
+ pthread_cond_timedwait(&pThis->belowLightDlyWtrMrk, pThis->mut, &t); /* TODO error check? But what do then? */
+ }
+ }
+
+ /* from our regular flow control settings, we are now ready to enqueue the object.
+ * However, we now need to do a check if the queue permits to add more data. If that
+ * is not the case, basic flow control enters the field, which means we wait for
+ * the queue to become ready or drop the new message. -- rgerhards, 2008-03-14
+ */
+ while( (pThis->iMaxQueueSize > 0 && pThis->iQueueSize >= pThis->iMaxQueueSize)
+ || (pThis->qType == QUEUETYPE_DISK && pThis->sizeOnDiskMax != 0
+ && pThis->tVars.disk.sizeOnDisk > pThis->sizeOnDiskMax)) {
+ dbgoprint((obj_t*) pThis, "enqueueMsg: queue FULL - waiting to drain.\n");
+ timeoutComp(&t, pThis->toEnq);
+ if(pthread_cond_timedwait(&pThis->notFull, pThis->mut, &t) != 0) {
+ dbgoprint((obj_t*) pThis, "enqueueMsg: cond timeout, dropping message!\n");
+ objDestruct(pUsr);
+ ABORT_FINALIZE(RS_RET_QUEUE_FULL);
+ }
+ }
+
+#if 0 // previous code, remove when done with advanced flow control
/* wait for the queue to be ready... */
- //while(pThis->iMaxQueueSize > 0 && pThis->iQueueSize >= pThis->iMaxQueueSize) {
while( (pThis->iMaxQueueSize > 0 && pThis->iQueueSize >= pThis->iMaxQueueSize)
|| (pThis->qType == QUEUETYPE_DISK && pThis->sizeOnDiskMax != 0
&& pThis->tVars.disk.sizeOnDisk > pThis->sizeOnDiskMax)) {
@@ -1997,6 +2075,7 @@ queueEnqObj(queue_t *pThis, void *pUsr)
ABORT_FINALIZE(RS_RET_QUEUE_FULL);
}
}
+#endif
/* and finally enqueue the message */
CHKiRet(queueAdd(pThis, pUsr));
@@ -2080,6 +2159,7 @@ DEFpropSetMeth(queue, toEnq, long);
DEFpropSetMeth(queue, iHighWtrMrk, int);
DEFpropSetMeth(queue, iLowWtrMrk, int);
DEFpropSetMeth(queue, iDiscardMrk, int);
+DEFpropSetMeth(queue, iFullDlyMrk, int);
DEFpropSetMeth(queue, iDiscardSeverity, int);
DEFpropSetMeth(queue, bIsDA, int);
DEFpropSetMeth(queue, iMinMsgsPerWrkr, int);