summaryrefslogtreecommitdiffstats
path: root/action.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2008-01-28 17:39:46 +0000
committerRainer Gerhards <rgerhards@adiscon.com>2008-01-28 17:39:46 +0000
commitc77519ab7b1fe246039bfdd99dbf6f17c44af449 (patch)
tree24029fe44f67badcddedbd8f37cdb68f72156479 /action.c
parent7ddc2511193930c1f9aa755904cef6f3b860a670 (diff)
downloadrsyslog-c77519ab7b1fe246039bfdd99dbf6f17c44af449.tar.gz
rsyslog-c77519ab7b1fe246039bfdd99dbf6f17c44af449.tar.xz
rsyslog-c77519ab7b1fe246039bfdd99dbf6f17c44af449.zip
- implemented the $ActionResumeRetryCount config directive
- added queue between main queue and action executor (currently works in "direct" mode only, else crashes) - added $ActionQueueFilename config directive - added $ActionQueueSize config directive - added $ActionQueueHighWaterMark config directive - added $ActionQueueLowWaterMark config directive - added $ActionQueueDiscardMark config directive - added $ActionQueueDiscardSeverity config directive - added $ActionQueueCheckpointInterval config directive - added $ActionQueueType config directive - added $ActionQueueWorkerThreads config directive - added $ActionQueueTimeoutshutdown config directive - added $ActionQueueTimeoutActionCompletion config directive - added $ActionQueueTimeoutenQueue config directive - added $ActionQueueTimeoutworkerThreadShutdown config directive - added $ActionQueueWorkerThreadMinimumMessages config directive - added $ActionQueueMaxFileSize config directive - added $ActionQueueSaveonShutdown config directive
Diffstat (limited to 'action.c')
-rw-r--r--action.c149
1 files changed, 147 insertions, 2 deletions
diff --git a/action.c b/action.c
index 2154fba2..8a269e23 100644
--- a/action.c
+++ b/action.c
@@ -29,6 +29,8 @@
#include <assert.h>
#include <stdarg.h>
#include <stdlib.h>
+#include <string.h>
+#include <strings.h>
#include <time.h>
#include "syslogd.h"
@@ -36,13 +38,35 @@
#include "action.h"
#include "modules.h"
#include "sync.h"
+#include "cfsysline.h"
#include "srUtils.h"
+/* forward definitions */
+rsRetVal actionCallDoAction(action_t *pAction, msg_t *pMsg);
+
/* object static data (once for all instances) */
static int glbliActionResumeInterval = 30;
int glbliActionResumeRetryCount = 0; /* how often should suspended actions be retried? */
+/* main message queue and its configuration parameters */
+static queueType_t ActionQueType = QUEUETYPE_DIRECT; /* type of the main message queue above */
+static int iActionQueueSize = 10000; /* size of the main message queue above */
+static int iActionQHighWtrMark = 8000; /* high water mark for disk-assisted queues */
+static int iActionQLowWtrMark = 2000; /* low water mark for disk-assisted queues */
+static int iActionQDiscardMark = 9800; /* begin to discard messages */
+static int iActionQDiscardSeverity = 4; /* discard warning and above */
+static int iActionQueueNumWorkers = 1; /* number of worker threads for the mm queue above */
+static uchar *pszActionQFName = NULL; /* prefix for the main message queue file */
+static size_t iActionQueMaxFileSize = 1024*1024;
+static int iActionQPersistUpdCnt = 0; /* persist queue info every n updates */
+static int iActionQtoQShutdown = 0; /* queue shutdown */
+static int iActionQtoActShutdown = 1000; /* action shutdown (in phase 2) */
+static int iActionQtoEnq = 2000; /* timeout for queue enque */
+static int iActionQtoWrkShutdown = 60000; /* timeout for worker thread shutdown */
+static int iActionQWrkMinMsgs = 100; /* minimum messages per worker needed to start a new one */
+static int bActionQSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */
+
/* destructs an action descriptor object
* rgerhards, 2007-08-01
*/
@@ -91,6 +115,55 @@ finalize_it:
}
+/* action construction finalizer
+ */
+rsRetVal
+actionConstructFinalize(action_t *pThis)
+{
+ DEFiRet;
+
+ assert(pThis != NULL);
+
+ /* create queue */
+ CHKiRet(queueConstruct(&pThis->pQueue, ActionQueType, 1, 10, (rsRetVal (*)(void*,void*))actionCallDoAction));
+
+
+ /* ... set some properties ... */
+# define setQPROP(func, directive, data) \
+ CHKiRet_Hdlr(func(pThis->pQueue, data)) { \
+ logerrorInt("Invalid " #directive ", error %d. Ignored, running with default setting", iRet); \
+ }
+# define setQPROPstr(func, directive, data) \
+ CHKiRet_Hdlr(func(pThis->pQueue, data, (data == NULL)? 0 : strlen((char*) data))) { \
+ logerrorInt("Invalid " #directive ", error %d. Ignored, running with default setting", iRet); \
+ }
+
+ queueSetpUsr(pThis->pQueue, pThis);
+ setQPROP(queueSetMaxFileSize, "$ActionQueueFileSize", iActionQueMaxFileSize);
+ setQPROPstr(queueSetFilePrefix, "$ActionQueueFileName", pszActionQFName);
+ setQPROP(queueSetiPersistUpdCnt, "$ActionQueueCheckpointInterval", iActionQPersistUpdCnt);
+ setQPROP(queueSettoQShutdown, "$ActionQueueTimeoutShutdown", iActionQtoQShutdown );
+ setQPROP(queueSettoActShutdown, "$ActionQueueTimeoutActionCompletion", iActionQtoActShutdown);
+ setQPROP(queueSettoWrkShutdown, "$ActionQueueTimeoutWorkerThreadShutdown", iActionQtoWrkShutdown);
+ setQPROP(queueSettoEnq, "$ActionQueueTimeoutEnqueue", iActionQtoEnq);
+ setQPROP(queueSetiHighWtrMrk, "$ActionQueueHighWaterMark", iActionQHighWtrMark);
+ setQPROP(queueSetiLowWtrMrk, "$ActionQueueLowWaterMark", iActionQLowWtrMark);
+ setQPROP(queueSetiDiscardMrk, "$ActionQueueDiscardMark", iActionQDiscardMark);
+ setQPROP(queueSetiDiscardSeverity, "$ActionQueueDiscardSeverity", iActionQDiscardSeverity);
+ setQPROP(queueSetiMinMsgsPerWrkr, "$ActionQueueWorkerThreadMinimumMessages", iActionQWrkMinMsgs);
+ setQPROP(queueSetbSaveOnShutdown, "$ActionQueueSaveOnShutdown", bActionQSaveOnShutdown);
+
+# undef setQPROP
+# undef setQPROPstr
+
+ CHKiRet(queueStart(pThis->pQueue));
+ dbgprintf("Action %p: queue %p created\n", pThis, pThis->pQueue);
+
+finalize_it:
+ RETiRet;
+}
+
+
/* set an action back to active state -- rgerhards, 2007-08-02
*/
static rsRetVal actionResume(action_t *pThis)
@@ -194,11 +267,23 @@ rsRetVal actionDbgPrint(action_t *pThis)
}
+/* schedule the message for processing
+ * rgerhards, 2008-01-28
+ */
+rsRetVal
+actionDoAction(action_t *pAction)
+{
+ DEFiRet;
+ iRet = queueEnqObj(pAction->pQueue, (void*) pAction->f_pMsg);
+ RETiRet;
+}
+
+
/* call the DoAction output plugin entry point
* rgerhards, 2008-01-28
*/
rsRetVal
-actionCallDoAction(action_t *pAction)
+actionCallDoAction(action_t *pAction, msg_t *pMsg)
{
DEFiRet;
int iRetries;
@@ -209,7 +294,7 @@ actionCallDoAction(action_t *pAction)
/* here we must loop to process all requested strings */
for(i = 0 ; i < pAction->iNumTpls ; ++i) {
- CHKiRet(tplToString(pAction->ppTpl[i], pAction->f_pMsg, &pAction->ppMsgs[i]));
+ CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &pAction->ppMsgs[i]));
}
iRetries = 0;
@@ -256,6 +341,66 @@ finalize_it:
RETiRet;
}
+/* set the action message queue mode
+ * TODO: probably move this into queue object, merge with MainMsgQueue!
+ * rgerhards, 2008-01-28
+ */
+static rsRetVal setActionQueType(void __attribute__((unused)) *pVal, uchar *pszType)
+{
+ DEFiRet;
+
+ if (!strcasecmp((char *) pszType, "fixedarray")) {
+ ActionQueType = QUEUETYPE_FIXED_ARRAY;
+ dbgprintf("action queue type set to FIXED_ARRAY\n");
+ } else if (!strcasecmp((char *) pszType, "linkedlist")) {
+ ActionQueType = QUEUETYPE_LINKEDLIST;
+ dbgprintf("action queue type set to LINKEDLIST\n");
+ } else if (!strcasecmp((char *) pszType, "disk")) {
+ ActionQueType = QUEUETYPE_DISK;
+ dbgprintf("action queue type set to DISK\n");
+ } else if (!strcasecmp((char *) pszType, "direct")) {
+ ActionQueType = QUEUETYPE_DIRECT;
+ dbgprintf("action queue type set to DIRECT (no queueing at all)\n");
+ } else {
+ logerrorSz("unknown actionqueue parameter: %s", (char *) pszType);
+ iRet = RS_RET_INVALID_PARAMS;
+ }
+ free(pszType); /* no longer needed */
+
+ RETiRet;
+}
+
+
+
+/* add our cfsysline handlers
+ * rgerhards, 2008-01-28
+ */
+rsRetVal
+actionAddCfSysLineHdrl(void)
+{
+ DEFiRet;
+
+ CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuefilename", 0, eCmdHdlrGetWord, NULL, &pszActionQFName, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesize", 0, eCmdHdlrInt, NULL, &iActionQueueSize, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuehighwatermark", 0, eCmdHdlrInt, NULL, &iActionQHighWtrMark, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuelowwatermark", 0, eCmdHdlrInt, NULL, &iActionQLowWtrMark, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuediscardmark", 0, eCmdHdlrInt, NULL, &iActionQDiscardMark, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuediscardseverity", 0, eCmdHdlrInt, NULL, &iActionQDiscardSeverity, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuecheckpointinterval", 0, eCmdHdlrInt, NULL, &iActionQPersistUpdCnt, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetype", 0, eCmdHdlrGetWord, setActionQueType, NULL, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"actionqueueworkerthreads", 0, eCmdHdlrInt, NULL, &iActionQueueNumWorkers, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetimeoutshutdown", 0, eCmdHdlrInt, NULL, &iActionQtoQShutdown, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetimeoutactioncompletion", 0, eCmdHdlrInt, NULL, &iActionQtoActShutdown, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetimeoutenqueue", 0, eCmdHdlrInt, NULL, &iActionQtoEnq, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetimeoutworkerthreadshutdown", 0, eCmdHdlrInt, NULL, &iActionQtoWrkShutdown, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"actionqueueworkerthreadminimummessages", 0, eCmdHdlrInt, NULL, &iActionQWrkMinMsgs, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuemaxfilesize", 0, eCmdHdlrSize, NULL, &iActionQueMaxFileSize, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesaveonshutdown", 0, eCmdHdlrBinary, NULL, &bActionQSaveOnShutdown, NULL));
+
+finalize_it:
+ RETiRet;
+}
+
/*
* vi:set ai: