From c77519ab7b1fe246039bfdd99dbf6f17c44af449 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 28 Jan 2008 17:39:46 +0000 Subject: - implemented the $ActionResumeRetryCount config directive - added queue between main queue and action executor (currently works in "direct" mode only, else crashes) - added $ActionQueueFilename config directive - added $ActionQueueSize config directive - added $ActionQueueHighWaterMark config directive - added $ActionQueueLowWaterMark config directive - added $ActionQueueDiscardMark config directive - added $ActionQueueDiscardSeverity config directive - added $ActionQueueCheckpointInterval config directive - added $ActionQueueType config directive - added $ActionQueueWorkerThreads config directive - added $ActionQueueTimeoutshutdown config directive - added $ActionQueueTimeoutActionCompletion config directive - added $ActionQueueTimeoutenQueue config directive - added $ActionQueueTimeoutworkerThreadShutdown config directive - added $ActionQueueWorkerThreadMinimumMessages config directive - added $ActionQueueMaxFileSize config directive - added $ActionQueueSaveonShutdown config directive --- action.c | 149 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 147 insertions(+), 2 deletions(-) (limited to 'action.c') diff --git a/action.c b/action.c index 2154fba2..8a269e23 100644 --- a/action.c +++ b/action.c @@ -29,6 +29,8 @@ #include #include #include +#include +#include #include #include "syslogd.h" @@ -36,13 +38,35 @@ #include "action.h" #include "modules.h" #include "sync.h" +#include "cfsysline.h" #include "srUtils.h" +/* forward definitions */ +rsRetVal actionCallDoAction(action_t *pAction, msg_t *pMsg); + /* object static data (once for all instances) */ static int glbliActionResumeInterval = 30; int glbliActionResumeRetryCount = 0; /* how often should suspended actions be retried? */ +/* main message queue and its configuration parameters */ +static queueType_t ActionQueType = QUEUETYPE_DIRECT; /* type of the main message queue above */ +static int iActionQueueSize = 10000; /* size of the main message queue above */ +static int iActionQHighWtrMark = 8000; /* high water mark for disk-assisted queues */ +static int iActionQLowWtrMark = 2000; /* low water mark for disk-assisted queues */ +static int iActionQDiscardMark = 9800; /* begin to discard messages */ +static int iActionQDiscardSeverity = 4; /* discard warning and above */ +static int iActionQueueNumWorkers = 1; /* number of worker threads for the mm queue above */ +static uchar *pszActionQFName = NULL; /* prefix for the main message queue file */ +static size_t iActionQueMaxFileSize = 1024*1024; +static int iActionQPersistUpdCnt = 0; /* persist queue info every n updates */ +static int iActionQtoQShutdown = 0; /* queue shutdown */ +static int iActionQtoActShutdown = 1000; /* action shutdown (in phase 2) */ +static int iActionQtoEnq = 2000; /* timeout for queue enque */ +static int iActionQtoWrkShutdown = 60000; /* timeout for worker thread shutdown */ +static int iActionQWrkMinMsgs = 100; /* minimum messages per worker needed to start a new one */ +static int bActionQSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */ + /* destructs an action descriptor object * rgerhards, 2007-08-01 */ @@ -91,6 +115,55 @@ finalize_it: } +/* action construction finalizer + */ +rsRetVal +actionConstructFinalize(action_t *pThis) +{ + DEFiRet; + + assert(pThis != NULL); + + /* create queue */ + CHKiRet(queueConstruct(&pThis->pQueue, ActionQueType, 1, 10, (rsRetVal (*)(void*,void*))actionCallDoAction)); + + + /* ... set some properties ... */ +# define setQPROP(func, directive, data) \ + CHKiRet_Hdlr(func(pThis->pQueue, data)) { \ + logerrorInt("Invalid " #directive ", error %d. Ignored, running with default setting", iRet); \ + } +# define setQPROPstr(func, directive, data) \ + CHKiRet_Hdlr(func(pThis->pQueue, data, (data == NULL)? 0 : strlen((char*) data))) { \ + logerrorInt("Invalid " #directive ", error %d. Ignored, running with default setting", iRet); \ + } + + queueSetpUsr(pThis->pQueue, pThis); + setQPROP(queueSetMaxFileSize, "$ActionQueueFileSize", iActionQueMaxFileSize); + setQPROPstr(queueSetFilePrefix, "$ActionQueueFileName", pszActionQFName); + setQPROP(queueSetiPersistUpdCnt, "$ActionQueueCheckpointInterval", iActionQPersistUpdCnt); + setQPROP(queueSettoQShutdown, "$ActionQueueTimeoutShutdown", iActionQtoQShutdown ); + setQPROP(queueSettoActShutdown, "$ActionQueueTimeoutActionCompletion", iActionQtoActShutdown); + setQPROP(queueSettoWrkShutdown, "$ActionQueueTimeoutWorkerThreadShutdown", iActionQtoWrkShutdown); + setQPROP(queueSettoEnq, "$ActionQueueTimeoutEnqueue", iActionQtoEnq); + setQPROP(queueSetiHighWtrMrk, "$ActionQueueHighWaterMark", iActionQHighWtrMark); + setQPROP(queueSetiLowWtrMrk, "$ActionQueueLowWaterMark", iActionQLowWtrMark); + setQPROP(queueSetiDiscardMrk, "$ActionQueueDiscardMark", iActionQDiscardMark); + setQPROP(queueSetiDiscardSeverity, "$ActionQueueDiscardSeverity", iActionQDiscardSeverity); + setQPROP(queueSetiMinMsgsPerWrkr, "$ActionQueueWorkerThreadMinimumMessages", iActionQWrkMinMsgs); + setQPROP(queueSetbSaveOnShutdown, "$ActionQueueSaveOnShutdown", bActionQSaveOnShutdown); + +# undef setQPROP +# undef setQPROPstr + + CHKiRet(queueStart(pThis->pQueue)); + dbgprintf("Action %p: queue %p created\n", pThis, pThis->pQueue); + +finalize_it: + RETiRet; +} + + /* set an action back to active state -- rgerhards, 2007-08-02 */ static rsRetVal actionResume(action_t *pThis) @@ -194,11 +267,23 @@ rsRetVal actionDbgPrint(action_t *pThis) } +/* schedule the message for processing + * rgerhards, 2008-01-28 + */ +rsRetVal +actionDoAction(action_t *pAction) +{ + DEFiRet; + iRet = queueEnqObj(pAction->pQueue, (void*) pAction->f_pMsg); + RETiRet; +} + + /* call the DoAction output plugin entry point * rgerhards, 2008-01-28 */ rsRetVal -actionCallDoAction(action_t *pAction) +actionCallDoAction(action_t *pAction, msg_t *pMsg) { DEFiRet; int iRetries; @@ -209,7 +294,7 @@ actionCallDoAction(action_t *pAction) /* here we must loop to process all requested strings */ for(i = 0 ; i < pAction->iNumTpls ; ++i) { - CHKiRet(tplToString(pAction->ppTpl[i], pAction->f_pMsg, &pAction->ppMsgs[i])); + CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &pAction->ppMsgs[i])); } iRetries = 0; @@ -256,6 +341,66 @@ finalize_it: RETiRet; } +/* set the action message queue mode + * TODO: probably move this into queue object, merge with MainMsgQueue! + * rgerhards, 2008-01-28 + */ +static rsRetVal setActionQueType(void __attribute__((unused)) *pVal, uchar *pszType) +{ + DEFiRet; + + if (!strcasecmp((char *) pszType, "fixedarray")) { + ActionQueType = QUEUETYPE_FIXED_ARRAY; + dbgprintf("action queue type set to FIXED_ARRAY\n"); + } else if (!strcasecmp((char *) pszType, "linkedlist")) { + ActionQueType = QUEUETYPE_LINKEDLIST; + dbgprintf("action queue type set to LINKEDLIST\n"); + } else if (!strcasecmp((char *) pszType, "disk")) { + ActionQueType = QUEUETYPE_DISK; + dbgprintf("action queue type set to DISK\n"); + } else if (!strcasecmp((char *) pszType, "direct")) { + ActionQueType = QUEUETYPE_DIRECT; + dbgprintf("action queue type set to DIRECT (no queueing at all)\n"); + } else { + logerrorSz("unknown actionqueue parameter: %s", (char *) pszType); + iRet = RS_RET_INVALID_PARAMS; + } + free(pszType); /* no longer needed */ + + RETiRet; +} + + + +/* add our cfsysline handlers + * rgerhards, 2008-01-28 + */ +rsRetVal +actionAddCfSysLineHdrl(void) +{ + DEFiRet; + + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuefilename", 0, eCmdHdlrGetWord, NULL, &pszActionQFName, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesize", 0, eCmdHdlrInt, NULL, &iActionQueueSize, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuehighwatermark", 0, eCmdHdlrInt, NULL, &iActionQHighWtrMark, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuelowwatermark", 0, eCmdHdlrInt, NULL, &iActionQLowWtrMark, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuediscardmark", 0, eCmdHdlrInt, NULL, &iActionQDiscardMark, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuediscardseverity", 0, eCmdHdlrInt, NULL, &iActionQDiscardSeverity, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuecheckpointinterval", 0, eCmdHdlrInt, NULL, &iActionQPersistUpdCnt, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetype", 0, eCmdHdlrGetWord, setActionQueType, NULL, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueueworkerthreads", 0, eCmdHdlrInt, NULL, &iActionQueueNumWorkers, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetimeoutshutdown", 0, eCmdHdlrInt, NULL, &iActionQtoQShutdown, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetimeoutactioncompletion", 0, eCmdHdlrInt, NULL, &iActionQtoActShutdown, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetimeoutenqueue", 0, eCmdHdlrInt, NULL, &iActionQtoEnq, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetimeoutworkerthreadshutdown", 0, eCmdHdlrInt, NULL, &iActionQtoWrkShutdown, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueueworkerthreadminimummessages", 0, eCmdHdlrInt, NULL, &iActionQWrkMinMsgs, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuemaxfilesize", 0, eCmdHdlrSize, NULL, &iActionQueMaxFileSize, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesaveonshutdown", 0, eCmdHdlrBinary, NULL, &bActionQSaveOnShutdown, NULL)); + +finalize_it: + RETiRet; +} + /* * vi:set ai: -- cgit