From c77519ab7b1fe246039bfdd99dbf6f17c44af449 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 28 Jan 2008 17:39:46 +0000 Subject: - implemented the $ActionResumeRetryCount config directive - added queue between main queue and action executor (currently works in "direct" mode only, else crashes) - added $ActionQueueFilename config directive - added $ActionQueueSize config directive - added $ActionQueueHighWaterMark config directive - added $ActionQueueLowWaterMark config directive - added $ActionQueueDiscardMark config directive - added $ActionQueueDiscardSeverity config directive - added $ActionQueueCheckpointInterval config directive - added $ActionQueueType config directive - added $ActionQueueWorkerThreads config directive - added $ActionQueueTimeoutshutdown config directive - added $ActionQueueTimeoutActionCompletion config directive - added $ActionQueueTimeoutenQueue config directive - added $ActionQueueTimeoutworkerThreadShutdown config directive - added $ActionQueueWorkerThreadMinimumMessages config directive - added $ActionQueueMaxFileSize config directive - added $ActionQueueSaveonShutdown config directive --- ChangeLog | 16 +++++++ action.c | 149 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++- action.h | 6 ++- queue.c | 7 +-- queue.h | 11 ++++- syslogd.c | 30 ++++++++++--- 6 files changed, 206 insertions(+), 13 deletions(-) diff --git a/ChangeLog b/ChangeLog index 0d9cf9ec..8e3047d3 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,6 +1,22 @@ --------------------------------------------------------------------------- Version 3.10.4 (rgerhards), 2008-01-?? - implemented the $ActionResumeRetryCount config directive +- added $ActionQueueFilename config directive +- added $ActionQueueSize config directive +- added $ActionQueueHighWaterMark config directive +- added $ActionQueueLowWaterMark config directive +- added $ActionQueueDiscardMark config directive +- added $ActionQueueDiscardSeverity config directive +- added $ActionQueueCheckpointInterval config directive +- added $ActionQueueType config directive +- added $ActionQueueWorkerThreads config directive +- added $ActionQueueTimeoutshutdown config directive +- added $ActionQueueTimeoutActionCompletion config directive +- added $ActionQueueTimeoutenQueue config directive +- added $ActionQueueTimeoutworkerThreadShutdown config directive +- added $ActionQueueWorkerThreadMinimumMessages config directive +- added $ActionQueueMaxFileSize config directive +- added $ActionQueueSaveonShutdown config directive --------------------------------------------------------------------------- Version 3.10.3 (rgerhards), 2008-01-28 - fixed a bug with standard template definitions (not a big deal) - thanks diff --git a/action.c b/action.c index 2154fba2..8a269e23 100644 --- a/action.c +++ b/action.c @@ -29,6 +29,8 @@ #include #include #include +#include +#include #include #include "syslogd.h" @@ -36,13 +38,35 @@ #include "action.h" #include "modules.h" #include "sync.h" +#include "cfsysline.h" #include "srUtils.h" +/* forward definitions */ +rsRetVal actionCallDoAction(action_t *pAction, msg_t *pMsg); + /* object static data (once for all instances) */ static int glbliActionResumeInterval = 30; int glbliActionResumeRetryCount = 0; /* how often should suspended actions be retried? */ +/* main message queue and its configuration parameters */ +static queueType_t ActionQueType = QUEUETYPE_DIRECT; /* type of the main message queue above */ +static int iActionQueueSize = 10000; /* size of the main message queue above */ +static int iActionQHighWtrMark = 8000; /* high water mark for disk-assisted queues */ +static int iActionQLowWtrMark = 2000; /* low water mark for disk-assisted queues */ +static int iActionQDiscardMark = 9800; /* begin to discard messages */ +static int iActionQDiscardSeverity = 4; /* discard warning and above */ +static int iActionQueueNumWorkers = 1; /* number of worker threads for the mm queue above */ +static uchar *pszActionQFName = NULL; /* prefix for the main message queue file */ +static size_t iActionQueMaxFileSize = 1024*1024; +static int iActionQPersistUpdCnt = 0; /* persist queue info every n updates */ +static int iActionQtoQShutdown = 0; /* queue shutdown */ +static int iActionQtoActShutdown = 1000; /* action shutdown (in phase 2) */ +static int iActionQtoEnq = 2000; /* timeout for queue enque */ +static int iActionQtoWrkShutdown = 60000; /* timeout for worker thread shutdown */ +static int iActionQWrkMinMsgs = 100; /* minimum messages per worker needed to start a new one */ +static int bActionQSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */ + /* destructs an action descriptor object * rgerhards, 2007-08-01 */ @@ -91,6 +115,55 @@ finalize_it: } +/* action construction finalizer + */ +rsRetVal +actionConstructFinalize(action_t *pThis) +{ + DEFiRet; + + assert(pThis != NULL); + + /* create queue */ + CHKiRet(queueConstruct(&pThis->pQueue, ActionQueType, 1, 10, (rsRetVal (*)(void*,void*))actionCallDoAction)); + + + /* ... set some properties ... */ +# define setQPROP(func, directive, data) \ + CHKiRet_Hdlr(func(pThis->pQueue, data)) { \ + logerrorInt("Invalid " #directive ", error %d. Ignored, running with default setting", iRet); \ + } +# define setQPROPstr(func, directive, data) \ + CHKiRet_Hdlr(func(pThis->pQueue, data, (data == NULL)? 0 : strlen((char*) data))) { \ + logerrorInt("Invalid " #directive ", error %d. Ignored, running with default setting", iRet); \ + } + + queueSetpUsr(pThis->pQueue, pThis); + setQPROP(queueSetMaxFileSize, "$ActionQueueFileSize", iActionQueMaxFileSize); + setQPROPstr(queueSetFilePrefix, "$ActionQueueFileName", pszActionQFName); + setQPROP(queueSetiPersistUpdCnt, "$ActionQueueCheckpointInterval", iActionQPersistUpdCnt); + setQPROP(queueSettoQShutdown, "$ActionQueueTimeoutShutdown", iActionQtoQShutdown ); + setQPROP(queueSettoActShutdown, "$ActionQueueTimeoutActionCompletion", iActionQtoActShutdown); + setQPROP(queueSettoWrkShutdown, "$ActionQueueTimeoutWorkerThreadShutdown", iActionQtoWrkShutdown); + setQPROP(queueSettoEnq, "$ActionQueueTimeoutEnqueue", iActionQtoEnq); + setQPROP(queueSetiHighWtrMrk, "$ActionQueueHighWaterMark", iActionQHighWtrMark); + setQPROP(queueSetiLowWtrMrk, "$ActionQueueLowWaterMark", iActionQLowWtrMark); + setQPROP(queueSetiDiscardMrk, "$ActionQueueDiscardMark", iActionQDiscardMark); + setQPROP(queueSetiDiscardSeverity, "$ActionQueueDiscardSeverity", iActionQDiscardSeverity); + setQPROP(queueSetiMinMsgsPerWrkr, "$ActionQueueWorkerThreadMinimumMessages", iActionQWrkMinMsgs); + setQPROP(queueSetbSaveOnShutdown, "$ActionQueueSaveOnShutdown", bActionQSaveOnShutdown); + +# undef setQPROP +# undef setQPROPstr + + CHKiRet(queueStart(pThis->pQueue)); + dbgprintf("Action %p: queue %p created\n", pThis, pThis->pQueue); + +finalize_it: + RETiRet; +} + + /* set an action back to active state -- rgerhards, 2007-08-02 */ static rsRetVal actionResume(action_t *pThis) @@ -194,11 +267,23 @@ rsRetVal actionDbgPrint(action_t *pThis) } +/* schedule the message for processing + * rgerhards, 2008-01-28 + */ +rsRetVal +actionDoAction(action_t *pAction) +{ + DEFiRet; + iRet = queueEnqObj(pAction->pQueue, (void*) pAction->f_pMsg); + RETiRet; +} + + /* call the DoAction output plugin entry point * rgerhards, 2008-01-28 */ rsRetVal -actionCallDoAction(action_t *pAction) +actionCallDoAction(action_t *pAction, msg_t *pMsg) { DEFiRet; int iRetries; @@ -209,7 +294,7 @@ actionCallDoAction(action_t *pAction) /* here we must loop to process all requested strings */ for(i = 0 ; i < pAction->iNumTpls ; ++i) { - CHKiRet(tplToString(pAction->ppTpl[i], pAction->f_pMsg, &pAction->ppMsgs[i])); + CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &pAction->ppMsgs[i])); } iRetries = 0; @@ -256,6 +341,66 @@ finalize_it: RETiRet; } +/* set the action message queue mode + * TODO: probably move this into queue object, merge with MainMsgQueue! + * rgerhards, 2008-01-28 + */ +static rsRetVal setActionQueType(void __attribute__((unused)) *pVal, uchar *pszType) +{ + DEFiRet; + + if (!strcasecmp((char *) pszType, "fixedarray")) { + ActionQueType = QUEUETYPE_FIXED_ARRAY; + dbgprintf("action queue type set to FIXED_ARRAY\n"); + } else if (!strcasecmp((char *) pszType, "linkedlist")) { + ActionQueType = QUEUETYPE_LINKEDLIST; + dbgprintf("action queue type set to LINKEDLIST\n"); + } else if (!strcasecmp((char *) pszType, "disk")) { + ActionQueType = QUEUETYPE_DISK; + dbgprintf("action queue type set to DISK\n"); + } else if (!strcasecmp((char *) pszType, "direct")) { + ActionQueType = QUEUETYPE_DIRECT; + dbgprintf("action queue type set to DIRECT (no queueing at all)\n"); + } else { + logerrorSz("unknown actionqueue parameter: %s", (char *) pszType); + iRet = RS_RET_INVALID_PARAMS; + } + free(pszType); /* no longer needed */ + + RETiRet; +} + + + +/* add our cfsysline handlers + * rgerhards, 2008-01-28 + */ +rsRetVal +actionAddCfSysLineHdrl(void) +{ + DEFiRet; + + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuefilename", 0, eCmdHdlrGetWord, NULL, &pszActionQFName, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesize", 0, eCmdHdlrInt, NULL, &iActionQueueSize, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuehighwatermark", 0, eCmdHdlrInt, NULL, &iActionQHighWtrMark, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuelowwatermark", 0, eCmdHdlrInt, NULL, &iActionQLowWtrMark, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuediscardmark", 0, eCmdHdlrInt, NULL, &iActionQDiscardMark, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuediscardseverity", 0, eCmdHdlrInt, NULL, &iActionQDiscardSeverity, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuecheckpointinterval", 0, eCmdHdlrInt, NULL, &iActionQPersistUpdCnt, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetype", 0, eCmdHdlrGetWord, setActionQueType, NULL, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueueworkerthreads", 0, eCmdHdlrInt, NULL, &iActionQueueNumWorkers, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetimeoutshutdown", 0, eCmdHdlrInt, NULL, &iActionQtoQShutdown, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetimeoutactioncompletion", 0, eCmdHdlrInt, NULL, &iActionQtoActShutdown, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetimeoutenqueue", 0, eCmdHdlrInt, NULL, &iActionQtoEnq, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetimeoutworkerthreadshutdown", 0, eCmdHdlrInt, NULL, &iActionQtoWrkShutdown, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueueworkerthreadminimummessages", 0, eCmdHdlrInt, NULL, &iActionQWrkMinMsgs, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuemaxfilesize", 0, eCmdHdlrSize, NULL, &iActionQueMaxFileSize, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesaveonshutdown", 0, eCmdHdlrBinary, NULL, &bActionQSaveOnShutdown, NULL)); + +finalize_it: + RETiRet; +} + /* * vi:set ai: diff --git a/action.h b/action.h index 0355b834..e725ae19 100644 --- a/action.h +++ b/action.h @@ -27,6 +27,7 @@ #include "syslogd-types.h" #include "sync.h" +#include "queue.h" /* external data - this is to be removed when we change the action * object interface (will happen some time..., at latest when the @@ -60,6 +61,7 @@ struct action_s { * content later). This is preserved after the message has been * processed - it is also used to detect duplicates. */ + queue_t *pQueue; /* action queue */ SYNC_OBJ_TOOL; /* required for mutex support */ }; typedef struct action_s action_t; @@ -68,12 +70,14 @@ typedef struct action_s action_t; /* function prototypes */ rsRetVal actionConstruct(action_t **ppThis); +rsRetVal actionConstructFinalize(action_t *pThis); rsRetVal actionDestruct(action_t *pThis); +rsRetVal actionAddCfSysLineHdrl(void); rsRetVal actionTryResume(action_t *pThis); rsRetVal actionSuspend(action_t *pThis); rsRetVal actionDbgPrint(action_t *pThis); rsRetVal actionSetGlobalResumeInterval(int iNewVal); -rsRetVal actionCallDoAction(action_t *pAction); +rsRetVal actionDoAction(action_t *pAction); #if 1 #define actionIsSuspended(pThis) ((pThis)->bSuspended == 1) diff --git a/queue.c b/queue.c index a89c7de9..7651957a 100644 --- a/queue.c +++ b/queue.c @@ -753,7 +753,7 @@ static rsRetVal qAddDirect(queue_t *pThis, void* pUsr) assert(pThis != NULL); /* calling the consumer is quite different here than it is from a worker thread */ - iRetLocal = pThis->pConsumer(pUsr); + iRetLocal = pThis->pConsumer(pThis->pUsr, pUsr); if(iRetLocal != RS_RET_OK) dbgprintf("Queue 0x%lx: Consumer returned iRet %d\n", queueGetID(pThis), iRetLocal); @@ -1047,7 +1047,7 @@ RUNLOG_VAR("%d", pThis->toQShutdown); * to modify some parameters before the queue is actually started. */ rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iWorkerThreads, - int iMaxQueueSize, rsRetVal (*pConsumer)(void*)) + int iMaxQueueSize, rsRetVal (*pConsumer)(void*,void*)) { DEFiRet; queue_t *pThis; @@ -1240,7 +1240,7 @@ queueConsumerReg(queue_t *pThis, wti_t *pWti, int iCancelStateSave) ISOBJ_TYPE_assert(pWti, wti); CHKiRet(queueDequeueConsumable(pThis, pWti, iCancelStateSave)); - CHKiRet(pThis->pConsumer(pWti->pUsrp)); + CHKiRet(pThis->pConsumer(pThis->pUsr, pWti->pUsrp)); finalize_it: dbgprintf("Queue %p: regular consumer returns %d\n", pThis, iRet); @@ -1877,6 +1877,7 @@ DEFpropSetMeth(queue, iDiscardSeverity, int); DEFpropSetMeth(queue, bIsDA, int); DEFpropSetMeth(queue, iMinMsgsPerWrkr, int); DEFpropSetMeth(queue, bSaveOnShutdown, int); +DEFpropSetMeth(queue, pUsr, void*); /* This function can be used as a generic way to set properties. Only the subset diff --git a/queue.h b/queue.h index a05c6341..e48d9796 100644 --- a/queue.h +++ b/queue.h @@ -68,6 +68,7 @@ typedef struct queue_s { int iMinMsgsPerWrkr;/* minimum nbr of msgs per worker thread, if more, a new worker is started until max wrkrs */ wtp_t *pWtpDA; wtp_t *pWtpReg; + void *pUsr; /* a global, user-supplied pointer. Is passed back to consumer. */ int iUpdsSincePersist;/* nbr of queue updates since the last persist call */ int iPersistUpdCnt; /* persits queue info after this nbr of updates - 0 -> persist only on shutdown */ int iHighWtrMrk; /* high water mark for disk-assisted memory queues */ @@ -79,7 +80,12 @@ typedef struct queue_s { int toActShutdown; /* timeout for long-running action shutdown in ms */ int toWrkShutdown; /* timeout for idle workers in ms, -1 means indefinite (0 is immediate) */ int toEnq; /* enqueue timeout */ - rsRetVal (*pConsumer)(void *); /* user-supplied consumer function for dequeued messages */ + rsRetVal (*pConsumer)(void *,void*); /* user-supplied consumer function for dequeued messages */ + /* calling interface for pConsumer: arg1 is the global user pointer from this structure, arg2 is the + * user pointer that was dequeued (actual sample: for actions, arg1 is the pAction and arg2 is pointer + * to message) + * rgerhards, 2008-01-28 + */ /* type-specific handlers (set during construction) */ rsRetVal (*qConstruct)(struct queue_s *pThis); rsRetVal (*qDestruct)(struct queue_s *pThis); @@ -148,7 +154,7 @@ rsRetVal queueStart(queue_t *pThis); rsRetVal queueSetMaxFileSize(queue_t *pThis, size_t iMaxFileSize); rsRetVal queueSetFilePrefix(queue_t *pThis, uchar *pszPrefix, size_t iLenPrefix); rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iWorkerThreads, - int iMaxQueueSize, rsRetVal (*pConsumer)(void*)); + int iMaxQueueSize, rsRetVal (*pConsumer)(void*,void*)); PROTOTYPEObjClassInit(queue); PROTOTYPEpropSetMeth(queue, iPersistUpdCnt, int); PROTOTYPEpropSetMeth(queue, toQShutdown, long); @@ -161,6 +167,7 @@ PROTOTYPEpropSetMeth(queue, iDiscardMrk, int); PROTOTYPEpropSetMeth(queue, iDiscardSeverity, int); PROTOTYPEpropSetMeth(queue, iMinMsgsPerWrkr, int); PROTOTYPEpropSetMeth(queue, bSaveOnShutdown, int); +PROTOTYPEpropSetMeth(queue, pUsr, void*); #define queueGetID(pThis) ((unsigned long) pThis) #endif /* #ifndef QUEUE_H_INCLUDED */ diff --git a/syslogd.c b/syslogd.c index d60abade..c9ba1c53 100644 --- a/syslogd.c +++ b/syslogd.c @@ -1848,7 +1848,7 @@ processMsg(msg_t *pMsg) * Please note: the message object is destructed by the queue itself! */ static rsRetVal -msgConsumer(void *pUsr) +msgConsumer(void __attribute__((unused)) *notNeeded, void *pUsr) { DEFiRet; msg_t *pMsg = (msg_t*) pUsr; @@ -2394,7 +2394,7 @@ fprintlog(action_t *pAction) /* When we reach this point, we have a valid, non-disabled action. * So let's execute it. -- rgerhards, 2007-07-24 */ - iRet = actionCallDoAction(pAction); + iRet = actionDoAction(pAction); finalize_it: if(pMsgSave != NULL) { @@ -3993,6 +3993,10 @@ rsRetVal addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStr if(bSuspended) actionSuspend(pAction); + CHKiRet(actionConstructFinalize(pAction)); + + /* TODO: if we exit here, we have a memory leak... */ + *ppAction = pAction; /* finally store the action pointer */ finalize_it: @@ -4558,11 +4562,11 @@ static rsRetVal loadBuildInModules(void) CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuetimeoutworkerthreadshutdown", 0, eCmdHdlrInt, NULL, &iMainMsgQtoWrkShutdown, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueueworkerthreadminimummessages", 0, eCmdHdlrInt, NULL, &iMainMsgQWrkMinMsgs, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuemaxfilesize", 0, eCmdHdlrSize, NULL, &iMainMsgQueMaxFileSize, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuesaveonshutdown", 0, eCmdHdlrBinary, NULL, &bMainMsgQSaveOnShutdown, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"repeatedmsgreduction", 0, eCmdHdlrBinary, NULL, &bReduceRepeatMsgs, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlywhenpreviousissuspended", 0, eCmdHdlrBinary, NULL, &bActExecWhenPrevSusp, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionresumeinterval", 0, eCmdHdlrInt, setActionResumeInterval, NULL, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"controlcharacterescapeprefix", 0, eCmdHdlrGetChar, NULL, &cCCEscapeChar, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuesaveonshutdown", 0, eCmdHdlrBinary, NULL, &bMainMsgQSaveOnShutdown, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"escapecontrolcharactersonreceive", 0, eCmdHdlrBinary, NULL, &bEscapeCCOnRcv, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"dropmsgswithmaliciousdnsptrrecords", 0, eCmdHdlrBinary, NULL, &bDropMalPTRMsgs, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"droptrailinglfonreception", 0, eCmdHdlrBinary, NULL, &bDropTrailingLF, NULL)); @@ -4579,6 +4583,11 @@ static rsRetVal loadBuildInModules(void) CHKiRet(regCfSysLineHdlr((uchar *)"moddir", 0, eCmdHdlrGetWord, NULL, &pModDir, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, NULL)); + /* now add other modules handlers (we should work on that to be able to do it in ClassInit(), but so far + * that is not possible). -- rgerhards, 2008-01-28 + */ + CHKiRet(actionAddCfSysLineHdrl()); + finalize_it: RETiRet; } @@ -4713,9 +4722,8 @@ finalize_it: /* This is the main entry point into rsyslogd. Over time, we should try to * modularize it a bit more... */ -int main(int argc, char **argv) +int realMain(int argc, char **argv) { -dbgClassInit(); DEFiRet; register int i; @@ -5000,6 +5008,18 @@ finalize_it: } +/* This is the main entry point into rsyslogd. This must be a function in its own + * right in order to intialize the debug system in a portable way (otherwise we would + * need to have a statement before variable definitions. + * rgerhards, 20080-01-28 + */ +int main(int argc, char **argv) +{ + dbgClassInit(); + return realMain(argc, argv); +} + + /* * vi:set ai: */ -- cgit