diff options
-rw-r--r-- | ChangeLog | 2 | ||||
-rw-r--r-- | action.c | 2 | ||||
-rw-r--r-- | doc/rsyslog_ng_comparison.html | 6 | ||||
-rw-r--r-- | msg.c | 19 | ||||
-rw-r--r-- | msg.h | 3 | ||||
-rw-r--r-- | plugins/imfile/imfile.c | 1 | ||||
-rw-r--r-- | queue.c | 86 | ||||
-rw-r--r-- | queue.h | 6 | ||||
-rw-r--r-- | rsyslog.h | 10 | ||||
-rw-r--r-- | syslogd.c | 4 |
10 files changed, 128 insertions, 11 deletions
@@ -1,5 +1,7 @@ --------------------------------------------------------------------------- Version 3.12.3 (rgerhards), 2008-03-?? +- added advanced flow control for congestion cases (mode depending on message + source and its capablity to be delayed without bad side effects) - bugfix: $ModDir should not be reset on $ResetConfig - this can cause a lot of confusion and there is no real good reason to do so. Also conflicts with the new -M option and environment setting. @@ -541,7 +541,7 @@ actionWriteToAction(action_t *pAction) /* When we reach this point, we have a valid, non-disabled action. * So let's enqueue our message for execution. -- rgerhards, 2007-07-24 */ - iRet = queueEnqObj(pAction->pQueue, (void*) MsgAddRef(pAction->f_pMsg)); + iRet = queueEnqObj(pAction->pQueue, pAction->f_pMsg->flowCtlType, (void*) MsgAddRef(pAction->f_pMsg)); if(iRet == RS_RET_OK) pAction->f_prevcount = 0; /* message processed, so we start a new cycle */ diff --git a/doc/rsyslog_ng_comparison.html b/doc/rsyslog_ng_comparison.html index e6dfdd5f..547501af 100644 --- a/doc/rsyslog_ng_comparison.html +++ b/doc/rsyslog_ng_comparison.html @@ -469,10 +469,8 @@ system stress</td> <tr> <td height="43" valign="top">flow control (slow down message reception when system is busy)</td> -<td height="43" valign="top">limited (TCP -Window, delay on queue full)</td> -<td height="43" valign="top">yes (limited, -too? "stops accepting messages")</td> +<td height="43" valign="top">yes (advanced, with multiple ways to slow down inputs depending on individual input capabilities, based on watermarks)</td> +<td height="43" valign="top">yes (limited? "stops accepting messages")</td> </tr> <tr> <td valign="top">rewriting messages</td> @@ -904,6 +904,25 @@ char *getFacilityStr(msg_t *pM) } +/* set flow control state (if not called, the default - NO_DELAY - is used) + * This needs no locking because it is only done while the object is + * not fully constructed (which also means you must not call this + * method after the msg has been handed over to a queue). + * rgerhards, 2008-03-14 + */ +rsRetVal +MsgSetFlowControlType(msg_t *pMsg, flowControl_t eFlowCtl) +{ + DEFiRet; + assert(pMsg != NULL); + assert(eFlowCtl == eFLOWCTL_NO_DELAY || eFlowCtl == eFLOWCTL_LIGHT_DELAY || eFlowCtl == eFLOWCTL_FULL_DELAY); + + pMsg->flowCtlType = eFlowCtl; + + RETiRet; +} + + /* rgerhards 2004-11-24: set APP-NAME in msg object * TODO: revisit msg locking code! */ @@ -59,6 +59,8 @@ struct msg { * sockets. All in all, the parser would need parse templates, that would * resolve all these issues... rgerhards, 2005-10-06 */ + flowControl_t flowCtlType; /**< type of flow control we can apply, for enqueueing, needs not to be persisted because + once data has entered the queue, this property is no longer needed. */ short iSeverity; /* the severity 0..7 */ uchar *pszSeverity; /* severity as string... */ int iLenSeverity; /* ... and its length. */ @@ -137,6 +139,7 @@ char *getPROCID(msg_t *pM); rsRetVal MsgSetMSGID(msg_t *pMsg, char* pszMSGID); void MsgAssignTAG(msg_t *pMsg, uchar *pBuf); void MsgSetTAG(msg_t *pMsg, char* pszTAG); +rsRetVal MsgSetFlowControlType(msg_t *pMsg, flowControl_t eFlowCtl); char *getTAG(msg_t *pM); int getHOSTNAMELen(msg_t *pM); char *getHOSTNAME(msg_t *pM); diff --git a/plugins/imfile/imfile.c b/plugins/imfile/imfile.c index b431fbbc..91f90cc3 100644 --- a/plugins/imfile/imfile.c +++ b/plugins/imfile/imfile.c @@ -90,6 +90,7 @@ static rsRetVal enqLine(fileInfo_t *pInfo, cstr_t *cstrLine) } CHKiRet(msgConstruct(&pMsg)); + MsgSetFlowControlType(pMsg, eFLOWCTL_FULL_DELAY); MsgSetUxTradMsg(pMsg, (char*)rsCStrGetSzStr(cstrLine)); MsgSetRawMsg(pMsg, (char*)rsCStrGetSzStr(cstrLine)); MsgSetMSG(pMsg, (char*)rsCStrGetSzStr(cstrLine)); @@ -1260,6 +1260,10 @@ rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iWorkerThreads, if((pThis->pszSpoolDir = (uchar*) strdup((char*)glblGetWorkDir())) == NULL) ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); + /* set some water marks so that we have useful defaults if none are set specifically */ + pThis->iFullDlyMrk = (iMaxQueueSize < 100) ? iMaxQueueSize : 100; /* 100 should be far sufficient */ + pThis->iLightDlyMrk = iMaxQueueSize - (iMaxQueueSize / 100) * 70; /* default 70% */ + pThis->lenSpoolDir = strlen((char*)pThis->pszSpoolDir); pThis->iMaxFileSize = 1024 * 1024; /* default is 1 MiB */ pThis->iQueueSize = 0; @@ -1404,6 +1408,21 @@ queueDequeueConsumable(queue_t *pThis, wti_t *pWti, int iCancelStateSave) pWti->pUsrp = pUsr; } + /* awake some flow-controlled sources if we can do this right now */ + /* TODO: this could be done better from a performance point of view -- do it only if + * we have someone waiting for the condition (or only when we hit the watermark right + * on the nail [exact value]) -- rgerhards, 2008-03-14 + */ + if(iQueueSize < pThis->iFullDlyMrk) { +dbgoprint((obj_t*) pThis, "queue size %d below FullDlyMrk %d\n", iQueueSize, pThis->iFullDlyMrk); + pthread_cond_broadcast(&pThis->belowFullDlyWtrMrk); + } + + if(iQueueSize < pThis->iLightDlyMrk) { +dbgoprint((obj_t*) pThis, "queue size %d below LightDlyMrk %d\n", iQueueSize, pThis->iLightDlyMrk); + pthread_cond_broadcast(&pThis->belowLightDlyWtrMrk); + } + d_pthread_mutex_unlock(pThis->mut); pthread_cond_signal(&pThis->notFull); pthread_setcancelstate(iCancelStateSave, NULL); @@ -1481,7 +1500,7 @@ queueConsumerDA(queue_t *pThis, wti_t *pWti, int iCancelStateSave) ISOBJ_TYPE_assert(pWti, wti); CHKiRet(queueDequeueConsumable(pThis, pWti, iCancelStateSave)); - CHKiRet(queueEnqObj(pThis->pqDA, pWti->pUsrp)); + CHKiRet(queueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, pWti->pUsrp)); finalize_it: dbgoprint((obj_t*) pThis, "DAConsumer returns with iRet %d\n", iRet); @@ -1635,6 +1654,10 @@ rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */ ASSERT(pThis != NULL); + /* we need to do a quick check if our water marks are set plausible. If not, + * we correct the most important shortcomings. TODO: do that!!!! -- rgerhards, 2008-03-14 + */ + /* finalize some initializations that could not yet be done because it is * influenced by properties which might have been set after queueConstruct () */ @@ -1651,6 +1674,8 @@ rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */ pthread_cond_init (&pThis->condDAReady, NULL); pthread_cond_init (&pThis->notFull, NULL); pthread_cond_init (&pThis->notEmpty, NULL); + pthread_cond_init (&pThis->belowFullDlyWtrMrk, NULL); + pthread_cond_init (&pThis->belowLightDlyWtrMrk, NULL); /* call type-specific constructor */ CHKiRet(pThis->qConstruct(pThis)); /* this also sets bIsDA */ @@ -1893,6 +1918,8 @@ CODESTARTobjDestruct(queue) pthread_cond_destroy(&pThis->condDAReady); pthread_cond_destroy(&pThis->notFull); pthread_cond_destroy(&pThis->notEmpty); + pthread_cond_destroy(&pThis->belowFullDlyWtrMrk); + pthread_cond_destroy(&pThis->belowLightDlyWtrMrk); /* type-specific destructor */ iRet = pThis->qDestruct(pThis); @@ -1956,7 +1983,7 @@ finalize_it: * rgerhards, 2008-01-03 */ rsRetVal -queueEnqObj(queue_t *pThis, void *pUsr) +queueEnqObj(queue_t *pThis, flowControl_t flowCtlType, void *pUsr) { DEFiRet; int iCancelStateSave; @@ -1984,8 +2011,59 @@ queueEnqObj(queue_t *pThis, void *pUsr) CHKiRet(queueChkStrtDA(pThis)); + /* handle flow control + * There are two different flow control mechanisms: basic and advanced flow control. + * Basic flow control has always been implemented and protects the queue structures + * in that it makes sure no more data is enqueued than the queue is configured to + * support. Enhanced flow control is being added today. There are some sources which + * can easily be stopped, e.g. a file reader. This is the case because it is unlikely + * that blocking those sources will have negative effects (after all, the file is + * continued to be written). Other sources can somewhat be blocked (e.g. the kernel + * log reader or the local log stream reader): in general, nothing is lost if messages + * from these sources are not picked up immediately. HOWEVER, they can not block for + * an extended period of time, as this either causes message loss or - even worse - some + * other bad effects (e.g. unresponsive system in respect to the main system log socket). + * Finally, there are some (few) sources which can not be blocked at all. UDP syslog is + * a prime example. If a UDP message is not received, it is simply lost. So we can't + * do anything against UDP sockets that come in too fast. The core idea of advanced + * flow control is that we take into account the different natures of the sources and + * select flow control mechanisms that fit these needs. This also means, in the end + * result, that non-blockable sources like UDP syslog receive priority in the system. + * It's a side effect, but a good one ;) -- rgerhards, 2008-03-14 + */ +dbgprintf("enqueueMsg: flowctl mode: %d, queue size %d, FullDlyMrk %d\n", flowCtlType, pThis->iQueueSize, pThis->iFullDlyMrk); + if(flowCtlType == eFLOWCTL_FULL_DELAY) { + while(pThis->iQueueSize >= pThis->iFullDlyMrk) { + dbgoprint((obj_t*) pThis, "enqueueMsg: FullDelay mark reached for full delayble message - blocking.\n"); + pthread_cond_wait(&pThis->belowFullDlyWtrMrk, pThis->mut); /* TODO error check? But what do then? */ + } + } else if(flowCtlType == eFLOWCTL_LIGHT_DELAY) { + while(pThis->iQueueSize >= pThis->iLightDlyMrk) { + dbgoprint((obj_t*) pThis, "enqueueMsg: LightDelay mark reached for light delayble message - blocking a bit.\n"); + timeoutComp(&t, 1000); /* 1000 millisconds = 1 second TODO: make configurable */ + pthread_cond_timedwait(&pThis->belowLightDlyWtrMrk, pThis->mut, &t); /* TODO error check? But what do then? */ + } + } + + /* from our regular flow control settings, we are now ready to enqueue the object. + * However, we now need to do a check if the queue permits to add more data. If that + * is not the case, basic flow control enters the field, which means we wait for + * the queue to become ready or drop the new message. -- rgerhards, 2008-03-14 + */ + while( (pThis->iMaxQueueSize > 0 && pThis->iQueueSize >= pThis->iMaxQueueSize) + || (pThis->qType == QUEUETYPE_DISK && pThis->sizeOnDiskMax != 0 + && pThis->tVars.disk.sizeOnDisk > pThis->sizeOnDiskMax)) { + dbgoprint((obj_t*) pThis, "enqueueMsg: queue FULL - waiting to drain.\n"); + timeoutComp(&t, pThis->toEnq); + if(pthread_cond_timedwait(&pThis->notFull, pThis->mut, &t) != 0) { + dbgoprint((obj_t*) pThis, "enqueueMsg: cond timeout, dropping message!\n"); + objDestruct(pUsr); + ABORT_FINALIZE(RS_RET_QUEUE_FULL); + } + } + +#if 0 // previous code, remove when done with advanced flow control /* wait for the queue to be ready... */ - //while(pThis->iMaxQueueSize > 0 && pThis->iQueueSize >= pThis->iMaxQueueSize) { while( (pThis->iMaxQueueSize > 0 && pThis->iQueueSize >= pThis->iMaxQueueSize) || (pThis->qType == QUEUETYPE_DISK && pThis->sizeOnDiskMax != 0 && pThis->tVars.disk.sizeOnDisk > pThis->sizeOnDiskMax)) { @@ -1997,6 +2075,7 @@ queueEnqObj(queue_t *pThis, void *pUsr) ABORT_FINALIZE(RS_RET_QUEUE_FULL); } } +#endif /* and finally enqueue the message */ CHKiRet(queueAdd(pThis, pUsr)); @@ -2080,6 +2159,7 @@ DEFpropSetMeth(queue, toEnq, long); DEFpropSetMeth(queue, iHighWtrMrk, int); DEFpropSetMeth(queue, iLowWtrMrk, int); DEFpropSetMeth(queue, iDiscardMrk, int); +DEFpropSetMeth(queue, iFullDlyMrk, int); DEFpropSetMeth(queue, iDiscardSeverity, int); DEFpropSetMeth(queue, bIsDA, int); DEFpropSetMeth(queue, iMinMsgsPerWrkr, int); @@ -74,6 +74,8 @@ typedef struct queue_s { int iHighWtrMrk; /* high water mark for disk-assisted memory queues */ int iLowWtrMrk; /* low water mark for disk-assisted memory queues */ int iDiscardMrk; /* if the queue is above this mark, low-severity messages are discarded */ + int iFullDlyMrk; /* if the queue is above this mark, FULL_DELAYable message are put on hold */ + int iLightDlyMrk; /* if the queue is above this mark, LIGHT_DELAYable message are put on hold */ int iDiscardSeverity;/* messages of this severity above are discarded on too-full queue */ int bNeedDelQIF; /* does the QIF file need to be deleted when queue becomes empty? */ int toQShutdown; /* timeout for regular queue shutdown in ms */ @@ -99,6 +101,8 @@ typedef struct queue_s { pthread_mutex_t mutThrdMgmt; /* mutex for the queue's thread management */ pthread_mutex_t *mut; /* mutex for enqueing and dequeueing messages */ pthread_cond_t notFull, notEmpty; + pthread_cond_t belowFullDlyWtrMrk; /* below eFLOWCTL_FULL_DELAY watermark */ + pthread_cond_t belowLightDlyWtrMrk; /* below eFLOWCTL_FULL_DELAY watermark */ pthread_cond_t condDAReady;/* signalled when the DA queue is fully initialized and ready for processing */ int bChildIsDone; /* set to 1 when the child DA queue has finished processing, 0 otherwise */ int bThrdStateChanged; /* at least one thread state has changed if 1 */ @@ -161,7 +165,7 @@ typedef struct queue_s { /* prototypes */ rsRetVal queueDestruct(queue_t **ppThis); -rsRetVal queueEnqObj(queue_t *pThis, void *pUsr); +rsRetVal queueEnqObj(queue_t *pThis, flowControl_t flwCtlType, void *pUsr); rsRetVal queueStart(queue_t *pThis); rsRetVal queueSetMaxFileSize(queue_t *pThis, size_t iMaxFileSize); rsRetVal queueSetFilePrefix(queue_t *pThis, uchar *pszPrefix, size_t iLenPrefix); @@ -54,6 +54,16 @@ typedef unsigned int u_int32_t; /* TODO: is this correct? */ typedef int socklen_t; #endif +/* settings for flow control + * TODO: is there a better place for them? -- rgerhards, 2008-03-14 + */ +typedef enum { + eFLOWCTL_NO_DELAY = 0, /**< UDP and other non-delayable sources */ + eFLOWCTL_LIGHT_DELAY = 1, /**< some light delay possible, but no extended period of time */ + eFLOWCTL_FULL_DELAY = 2 /**< delay possible for extended period of time */ +} flowControl_t; + + /* The error codes below are orginally "borrowed" from * liblogging. As such, we reserve values up to -2999 * just in case we need to borrow something more ;) @@ -1543,7 +1543,7 @@ submitMsg(msg_t *pMsg) ISOBJ_TYPE_assert(pMsg, msg); MsgPrepareEnqueue(pMsg); - queueEnqObj(pMsgQueue, (void*) pMsg); + queueEnqObj(pMsgQueue, pMsg->flowCtlType, (void*) pMsg); RETiRet; } @@ -1640,7 +1640,7 @@ logmsg(msg_t *pMsg, int flags) pMsg->msgFlags = flags; MsgPrepareEnqueue(pMsg); - queueEnqObj(pMsgQueue, (void*) pMsg); + queueEnqObj(pMsgQueue, pMsg->flowCtlType, (void*) pMsg); ENDfunc } |