summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ChangeLog2
-rw-r--r--action.c2
-rw-r--r--doc/rsyslog_ng_comparison.html6
-rw-r--r--msg.c19
-rw-r--r--msg.h3
-rw-r--r--plugins/imfile/imfile.c1
-rw-r--r--queue.c86
-rw-r--r--queue.h6
-rw-r--r--rsyslog.h10
-rw-r--r--syslogd.c4
10 files changed, 128 insertions, 11 deletions
diff --git a/ChangeLog b/ChangeLog
index 0a5218da..53a2f716 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,5 +1,7 @@
---------------------------------------------------------------------------
Version 3.12.3 (rgerhards), 2008-03-??
+- added advanced flow control for congestion cases (mode depending on message
+ source and its capablity to be delayed without bad side effects)
- bugfix: $ModDir should not be reset on $ResetConfig - this can cause a lot
of confusion and there is no real good reason to do so. Also conflicts with
the new -M option and environment setting.
diff --git a/action.c b/action.c
index b4ec5651..0f4c1db6 100644
--- a/action.c
+++ b/action.c
@@ -541,7 +541,7 @@ actionWriteToAction(action_t *pAction)
/* When we reach this point, we have a valid, non-disabled action.
* So let's enqueue our message for execution. -- rgerhards, 2007-07-24
*/
- iRet = queueEnqObj(pAction->pQueue, (void*) MsgAddRef(pAction->f_pMsg));
+ iRet = queueEnqObj(pAction->pQueue, pAction->f_pMsg->flowCtlType, (void*) MsgAddRef(pAction->f_pMsg));
if(iRet == RS_RET_OK)
pAction->f_prevcount = 0; /* message processed, so we start a new cycle */
diff --git a/doc/rsyslog_ng_comparison.html b/doc/rsyslog_ng_comparison.html
index e6dfdd5f..547501af 100644
--- a/doc/rsyslog_ng_comparison.html
+++ b/doc/rsyslog_ng_comparison.html
@@ -469,10 +469,8 @@ system stress</td>
<tr>
<td height="43" valign="top">flow control
(slow down message reception when system is busy)</td>
-<td height="43" valign="top">limited (TCP
-Window, delay on queue full)</td>
-<td height="43" valign="top">yes (limited,
-too? "stops accepting messages")</td>
+<td height="43" valign="top">yes (advanced, with multiple ways to slow down inputs depending on individual input capabilities, based on watermarks)</td>
+<td height="43" valign="top">yes (limited? "stops accepting messages")</td>
</tr>
<tr>
<td valign="top">rewriting messages</td>
diff --git a/msg.c b/msg.c
index f99fd853..d4291a66 100644
--- a/msg.c
+++ b/msg.c
@@ -904,6 +904,25 @@ char *getFacilityStr(msg_t *pM)
}
+/* set flow control state (if not called, the default - NO_DELAY - is used)
+ * This needs no locking because it is only done while the object is
+ * not fully constructed (which also means you must not call this
+ * method after the msg has been handed over to a queue).
+ * rgerhards, 2008-03-14
+ */
+rsRetVal
+MsgSetFlowControlType(msg_t *pMsg, flowControl_t eFlowCtl)
+{
+ DEFiRet;
+ assert(pMsg != NULL);
+ assert(eFlowCtl == eFLOWCTL_NO_DELAY || eFlowCtl == eFLOWCTL_LIGHT_DELAY || eFlowCtl == eFLOWCTL_FULL_DELAY);
+
+ pMsg->flowCtlType = eFlowCtl;
+
+ RETiRet;
+}
+
+
/* rgerhards 2004-11-24: set APP-NAME in msg object
* TODO: revisit msg locking code!
*/
diff --git a/msg.h b/msg.h
index dd12b77c..61feaddb 100644
--- a/msg.h
+++ b/msg.h
@@ -59,6 +59,8 @@ struct msg {
* sockets. All in all, the parser would need parse templates, that would
* resolve all these issues... rgerhards, 2005-10-06
*/
+ flowControl_t flowCtlType; /**< type of flow control we can apply, for enqueueing, needs not to be persisted because
+ once data has entered the queue, this property is no longer needed. */
short iSeverity; /* the severity 0..7 */
uchar *pszSeverity; /* severity as string... */
int iLenSeverity; /* ... and its length. */
@@ -137,6 +139,7 @@ char *getPROCID(msg_t *pM);
rsRetVal MsgSetMSGID(msg_t *pMsg, char* pszMSGID);
void MsgAssignTAG(msg_t *pMsg, uchar *pBuf);
void MsgSetTAG(msg_t *pMsg, char* pszTAG);
+rsRetVal MsgSetFlowControlType(msg_t *pMsg, flowControl_t eFlowCtl);
char *getTAG(msg_t *pM);
int getHOSTNAMELen(msg_t *pM);
char *getHOSTNAME(msg_t *pM);
diff --git a/plugins/imfile/imfile.c b/plugins/imfile/imfile.c
index b431fbbc..91f90cc3 100644
--- a/plugins/imfile/imfile.c
+++ b/plugins/imfile/imfile.c
@@ -90,6 +90,7 @@ static rsRetVal enqLine(fileInfo_t *pInfo, cstr_t *cstrLine)
}
CHKiRet(msgConstruct(&pMsg));
+ MsgSetFlowControlType(pMsg, eFLOWCTL_FULL_DELAY);
MsgSetUxTradMsg(pMsg, (char*)rsCStrGetSzStr(cstrLine));
MsgSetRawMsg(pMsg, (char*)rsCStrGetSzStr(cstrLine));
MsgSetMSG(pMsg, (char*)rsCStrGetSzStr(cstrLine));
diff --git a/queue.c b/queue.c
index 859bff4f..1eded530 100644
--- a/queue.c
+++ b/queue.c
@@ -1260,6 +1260,10 @@ rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iWorkerThreads,
if((pThis->pszSpoolDir = (uchar*) strdup((char*)glblGetWorkDir())) == NULL)
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
+ /* set some water marks so that we have useful defaults if none are set specifically */
+ pThis->iFullDlyMrk = (iMaxQueueSize < 100) ? iMaxQueueSize : 100; /* 100 should be far sufficient */
+ pThis->iLightDlyMrk = iMaxQueueSize - (iMaxQueueSize / 100) * 70; /* default 70% */
+
pThis->lenSpoolDir = strlen((char*)pThis->pszSpoolDir);
pThis->iMaxFileSize = 1024 * 1024; /* default is 1 MiB */
pThis->iQueueSize = 0;
@@ -1404,6 +1408,21 @@ queueDequeueConsumable(queue_t *pThis, wti_t *pWti, int iCancelStateSave)
pWti->pUsrp = pUsr;
}
+ /* awake some flow-controlled sources if we can do this right now */
+ /* TODO: this could be done better from a performance point of view -- do it only if
+ * we have someone waiting for the condition (or only when we hit the watermark right
+ * on the nail [exact value]) -- rgerhards, 2008-03-14
+ */
+ if(iQueueSize < pThis->iFullDlyMrk) {
+dbgoprint((obj_t*) pThis, "queue size %d below FullDlyMrk %d\n", iQueueSize, pThis->iFullDlyMrk);
+ pthread_cond_broadcast(&pThis->belowFullDlyWtrMrk);
+ }
+
+ if(iQueueSize < pThis->iLightDlyMrk) {
+dbgoprint((obj_t*) pThis, "queue size %d below LightDlyMrk %d\n", iQueueSize, pThis->iLightDlyMrk);
+ pthread_cond_broadcast(&pThis->belowLightDlyWtrMrk);
+ }
+
d_pthread_mutex_unlock(pThis->mut);
pthread_cond_signal(&pThis->notFull);
pthread_setcancelstate(iCancelStateSave, NULL);
@@ -1481,7 +1500,7 @@ queueConsumerDA(queue_t *pThis, wti_t *pWti, int iCancelStateSave)
ISOBJ_TYPE_assert(pWti, wti);
CHKiRet(queueDequeueConsumable(pThis, pWti, iCancelStateSave));
- CHKiRet(queueEnqObj(pThis->pqDA, pWti->pUsrp));
+ CHKiRet(queueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, pWti->pUsrp));
finalize_it:
dbgoprint((obj_t*) pThis, "DAConsumer returns with iRet %d\n", iRet);
@@ -1635,6 +1654,10 @@ rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */
ASSERT(pThis != NULL);
+ /* we need to do a quick check if our water marks are set plausible. If not,
+ * we correct the most important shortcomings. TODO: do that!!!! -- rgerhards, 2008-03-14
+ */
+
/* finalize some initializations that could not yet be done because it is
* influenced by properties which might have been set after queueConstruct ()
*/
@@ -1651,6 +1674,8 @@ rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */
pthread_cond_init (&pThis->condDAReady, NULL);
pthread_cond_init (&pThis->notFull, NULL);
pthread_cond_init (&pThis->notEmpty, NULL);
+ pthread_cond_init (&pThis->belowFullDlyWtrMrk, NULL);
+ pthread_cond_init (&pThis->belowLightDlyWtrMrk, NULL);
/* call type-specific constructor */
CHKiRet(pThis->qConstruct(pThis)); /* this also sets bIsDA */
@@ -1893,6 +1918,8 @@ CODESTARTobjDestruct(queue)
pthread_cond_destroy(&pThis->condDAReady);
pthread_cond_destroy(&pThis->notFull);
pthread_cond_destroy(&pThis->notEmpty);
+ pthread_cond_destroy(&pThis->belowFullDlyWtrMrk);
+ pthread_cond_destroy(&pThis->belowLightDlyWtrMrk);
/* type-specific destructor */
iRet = pThis->qDestruct(pThis);
@@ -1956,7 +1983,7 @@ finalize_it:
* rgerhards, 2008-01-03
*/
rsRetVal
-queueEnqObj(queue_t *pThis, void *pUsr)
+queueEnqObj(queue_t *pThis, flowControl_t flowCtlType, void *pUsr)
{
DEFiRet;
int iCancelStateSave;
@@ -1984,8 +2011,59 @@ queueEnqObj(queue_t *pThis, void *pUsr)
CHKiRet(queueChkStrtDA(pThis));
+ /* handle flow control
+ * There are two different flow control mechanisms: basic and advanced flow control.
+ * Basic flow control has always been implemented and protects the queue structures
+ * in that it makes sure no more data is enqueued than the queue is configured to
+ * support. Enhanced flow control is being added today. There are some sources which
+ * can easily be stopped, e.g. a file reader. This is the case because it is unlikely
+ * that blocking those sources will have negative effects (after all, the file is
+ * continued to be written). Other sources can somewhat be blocked (e.g. the kernel
+ * log reader or the local log stream reader): in general, nothing is lost if messages
+ * from these sources are not picked up immediately. HOWEVER, they can not block for
+ * an extended period of time, as this either causes message loss or - even worse - some
+ * other bad effects (e.g. unresponsive system in respect to the main system log socket).
+ * Finally, there are some (few) sources which can not be blocked at all. UDP syslog is
+ * a prime example. If a UDP message is not received, it is simply lost. So we can't
+ * do anything against UDP sockets that come in too fast. The core idea of advanced
+ * flow control is that we take into account the different natures of the sources and
+ * select flow control mechanisms that fit these needs. This also means, in the end
+ * result, that non-blockable sources like UDP syslog receive priority in the system.
+ * It's a side effect, but a good one ;) -- rgerhards, 2008-03-14
+ */
+dbgprintf("enqueueMsg: flowctl mode: %d, queue size %d, FullDlyMrk %d\n", flowCtlType, pThis->iQueueSize, pThis->iFullDlyMrk);
+ if(flowCtlType == eFLOWCTL_FULL_DELAY) {
+ while(pThis->iQueueSize >= pThis->iFullDlyMrk) {
+ dbgoprint((obj_t*) pThis, "enqueueMsg: FullDelay mark reached for full delayble message - blocking.\n");
+ pthread_cond_wait(&pThis->belowFullDlyWtrMrk, pThis->mut); /* TODO error check? But what do then? */
+ }
+ } else if(flowCtlType == eFLOWCTL_LIGHT_DELAY) {
+ while(pThis->iQueueSize >= pThis->iLightDlyMrk) {
+ dbgoprint((obj_t*) pThis, "enqueueMsg: LightDelay mark reached for light delayble message - blocking a bit.\n");
+ timeoutComp(&t, 1000); /* 1000 millisconds = 1 second TODO: make configurable */
+ pthread_cond_timedwait(&pThis->belowLightDlyWtrMrk, pThis->mut, &t); /* TODO error check? But what do then? */
+ }
+ }
+
+ /* from our regular flow control settings, we are now ready to enqueue the object.
+ * However, we now need to do a check if the queue permits to add more data. If that
+ * is not the case, basic flow control enters the field, which means we wait for
+ * the queue to become ready or drop the new message. -- rgerhards, 2008-03-14
+ */
+ while( (pThis->iMaxQueueSize > 0 && pThis->iQueueSize >= pThis->iMaxQueueSize)
+ || (pThis->qType == QUEUETYPE_DISK && pThis->sizeOnDiskMax != 0
+ && pThis->tVars.disk.sizeOnDisk > pThis->sizeOnDiskMax)) {
+ dbgoprint((obj_t*) pThis, "enqueueMsg: queue FULL - waiting to drain.\n");
+ timeoutComp(&t, pThis->toEnq);
+ if(pthread_cond_timedwait(&pThis->notFull, pThis->mut, &t) != 0) {
+ dbgoprint((obj_t*) pThis, "enqueueMsg: cond timeout, dropping message!\n");
+ objDestruct(pUsr);
+ ABORT_FINALIZE(RS_RET_QUEUE_FULL);
+ }
+ }
+
+#if 0 // previous code, remove when done with advanced flow control
/* wait for the queue to be ready... */
- //while(pThis->iMaxQueueSize > 0 && pThis->iQueueSize >= pThis->iMaxQueueSize) {
while( (pThis->iMaxQueueSize > 0 && pThis->iQueueSize >= pThis->iMaxQueueSize)
|| (pThis->qType == QUEUETYPE_DISK && pThis->sizeOnDiskMax != 0
&& pThis->tVars.disk.sizeOnDisk > pThis->sizeOnDiskMax)) {
@@ -1997,6 +2075,7 @@ queueEnqObj(queue_t *pThis, void *pUsr)
ABORT_FINALIZE(RS_RET_QUEUE_FULL);
}
}
+#endif
/* and finally enqueue the message */
CHKiRet(queueAdd(pThis, pUsr));
@@ -2080,6 +2159,7 @@ DEFpropSetMeth(queue, toEnq, long);
DEFpropSetMeth(queue, iHighWtrMrk, int);
DEFpropSetMeth(queue, iLowWtrMrk, int);
DEFpropSetMeth(queue, iDiscardMrk, int);
+DEFpropSetMeth(queue, iFullDlyMrk, int);
DEFpropSetMeth(queue, iDiscardSeverity, int);
DEFpropSetMeth(queue, bIsDA, int);
DEFpropSetMeth(queue, iMinMsgsPerWrkr, int);
diff --git a/queue.h b/queue.h
index 959c3b17..bc09fbd8 100644
--- a/queue.h
+++ b/queue.h
@@ -74,6 +74,8 @@ typedef struct queue_s {
int iHighWtrMrk; /* high water mark for disk-assisted memory queues */
int iLowWtrMrk; /* low water mark for disk-assisted memory queues */
int iDiscardMrk; /* if the queue is above this mark, low-severity messages are discarded */
+ int iFullDlyMrk; /* if the queue is above this mark, FULL_DELAYable message are put on hold */
+ int iLightDlyMrk; /* if the queue is above this mark, LIGHT_DELAYable message are put on hold */
int iDiscardSeverity;/* messages of this severity above are discarded on too-full queue */
int bNeedDelQIF; /* does the QIF file need to be deleted when queue becomes empty? */
int toQShutdown; /* timeout for regular queue shutdown in ms */
@@ -99,6 +101,8 @@ typedef struct queue_s {
pthread_mutex_t mutThrdMgmt; /* mutex for the queue's thread management */
pthread_mutex_t *mut; /* mutex for enqueing and dequeueing messages */
pthread_cond_t notFull, notEmpty;
+ pthread_cond_t belowFullDlyWtrMrk; /* below eFLOWCTL_FULL_DELAY watermark */
+ pthread_cond_t belowLightDlyWtrMrk; /* below eFLOWCTL_FULL_DELAY watermark */
pthread_cond_t condDAReady;/* signalled when the DA queue is fully initialized and ready for processing */
int bChildIsDone; /* set to 1 when the child DA queue has finished processing, 0 otherwise */
int bThrdStateChanged; /* at least one thread state has changed if 1 */
@@ -161,7 +165,7 @@ typedef struct queue_s {
/* prototypes */
rsRetVal queueDestruct(queue_t **ppThis);
-rsRetVal queueEnqObj(queue_t *pThis, void *pUsr);
+rsRetVal queueEnqObj(queue_t *pThis, flowControl_t flwCtlType, void *pUsr);
rsRetVal queueStart(queue_t *pThis);
rsRetVal queueSetMaxFileSize(queue_t *pThis, size_t iMaxFileSize);
rsRetVal queueSetFilePrefix(queue_t *pThis, uchar *pszPrefix, size_t iLenPrefix);
diff --git a/rsyslog.h b/rsyslog.h
index 76c6d849..01329aaf 100644
--- a/rsyslog.h
+++ b/rsyslog.h
@@ -54,6 +54,16 @@ typedef unsigned int u_int32_t; /* TODO: is this correct? */
typedef int socklen_t;
#endif
+/* settings for flow control
+ * TODO: is there a better place for them? -- rgerhards, 2008-03-14
+ */
+typedef enum {
+ eFLOWCTL_NO_DELAY = 0, /**< UDP and other non-delayable sources */
+ eFLOWCTL_LIGHT_DELAY = 1, /**< some light delay possible, but no extended period of time */
+ eFLOWCTL_FULL_DELAY = 2 /**< delay possible for extended period of time */
+} flowControl_t;
+
+
/* The error codes below are orginally "borrowed" from
* liblogging. As such, we reserve values up to -2999
* just in case we need to borrow something more ;)
diff --git a/syslogd.c b/syslogd.c
index e50e4727..6ecf5865 100644
--- a/syslogd.c
+++ b/syslogd.c
@@ -1543,7 +1543,7 @@ submitMsg(msg_t *pMsg)
ISOBJ_TYPE_assert(pMsg, msg);
MsgPrepareEnqueue(pMsg);
- queueEnqObj(pMsgQueue, (void*) pMsg);
+ queueEnqObj(pMsgQueue, pMsg->flowCtlType, (void*) pMsg);
RETiRet;
}
@@ -1640,7 +1640,7 @@ logmsg(msg_t *pMsg, int flags)
pMsg->msgFlags = flags;
MsgPrepareEnqueue(pMsg);
- queueEnqObj(pMsgQueue, (void*) pMsg);
+ queueEnqObj(pMsgQueue, pMsg->flowCtlType, (void*) pMsg);
ENDfunc
}