summaryrefslogtreecommitdiffstats
path: root/action.c
diff options
context:
space:
mode:
Diffstat (limited to 'action.c')
-rw-r--r--action.c577
1 files changed, 421 insertions, 156 deletions
diff --git a/action.c b/action.c
index 34ca4fc0..d12d182a 100644
--- a/action.c
+++ b/action.c
@@ -8,7 +8,7 @@
* the right code in question): For performance reasons, this module
* uses different methods of message submission based on the user-selected
* configuration. This code is similar, but can not be abstracted because
- * of the performanse-affecting differences in it. As such, it is often
+ * of the performance-affecting differences in it. As such, it is often
* necessary to triple-check that everything works well in *all* modes.
* The different modes (and calling sequence) are:
*
@@ -103,15 +103,16 @@
#include "template.h"
#include "action.h"
#include "modules.h"
-#include "sync.h"
#include "cfsysline.h"
#include "srUtils.h"
#include "errmsg.h"
#include "batch.h"
#include "wti.h"
+#include "rsconf.h"
#include "datetime.h"
#include "unicode-helper.h"
#include "atomic.h"
+#include "ruleset.h"
#include "statsobj.h"
#define NO_TIME_PROVIDED 0 /* indicate we do not provide any cached time */
@@ -129,39 +130,46 @@ DEFobjCurrIf(datetime)
DEFobjCurrIf(module)
DEFobjCurrIf(errmsg)
DEFobjCurrIf(statsobj)
-
-static int iActExecOnceInterval = 0; /* execute action once every nn seconds */
-static int iActExecEveryNthOccur = 0; /* execute action every n-th occurence (0,1=always) */
-static time_t iActExecEveryNthOccurTO = 0; /* timeout for n-occurence setting (in seconds, 0=never) */
-static int glbliActionResumeInterval = 30;
-int glbliActionResumeRetryCount = 0; /* how often should suspended actions be retried? */
-static int bActionRepMsgHasMsg = 0; /* last messsage repeated... has msg fragment in it */
-
-static int bActionWriteAllMarkMsgs = FALSE; /* should all mark messages be unconditionally written? */
-static uchar *pszActionName; /* short name for the action */
-/* action queue and its configuration parameters */
-static queueType_t ActionQueType = QUEUETYPE_DIRECT; /* type of the main message queue above */
-static int iActionQueueSize = 1000; /* size of the main message queue above */
-static int iActionQueueDeqBatchSize = 16; /* batch size for action queues */
-static int iActionQHighWtrMark = 800; /* high water mark for disk-assisted queues */
-static int iActionQLowWtrMark = 200; /* low water mark for disk-assisted queues */
-static int iActionQDiscardMark = 9800; /* begin to discard messages */
-static int iActionQDiscardSeverity = 8; /* by default, discard nothing to prevent unintentional loss */
-static int iActionQueueNumWorkers = 1; /* number of worker threads for the mm queue above */
-static uchar *pszActionQFName = NULL; /* prefix for the main message queue file */
-static int64 iActionQueMaxFileSize = 1024*1024;
-static int iActionQPersistUpdCnt = 0; /* persist queue info every n updates */
-static int bActionQSyncQeueFiles = 0; /* sync queue files */
-static int iActionQtoQShutdown = 0; /* queue shutdown */
-static int iActionQtoActShutdown = 1000; /* action shutdown (in phase 2) */
-static int iActionQtoEnq = 2000; /* timeout for queue enque */
-static int iActionQtoWrkShutdown = 60000; /* timeout for worker thread shutdown */
-static int iActionQWrkMinMsgs = 100; /* minimum messages per worker needed to start a new one */
-static int bActionQSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */
-static int64 iActionQueMaxDiskSpace = 0; /* max disk space allocated 0 ==> unlimited */
-static int iActionQueueDeqSlowdown = 0; /* dequeue slowdown (simple rate limiting) */
-static int iActionQueueDeqtWinFromHr = 0; /* hour begin of time frame when queue is to be dequeued */
-static int iActionQueueDeqtWinToHr = 25; /* hour begin of time frame when queue is to be dequeued */
+DEFobjCurrIf(ruleset)
+
+
+typedef struct configSettings_s {
+ int bActExecWhenPrevSusp; /* execute action only when previous one was suspended? */
+ int bActionWriteAllMarkMsgs; /* should all mark messages be unconditionally written? */
+ int iActExecOnceInterval; /* execute action once every nn seconds */
+ int iActExecEveryNthOccur; /* execute action every n-th occurence (0,1=always) */
+ time_t iActExecEveryNthOccurTO; /* timeout for n-occurence setting (in seconds, 0=never) */
+ int glbliActionResumeInterval;
+ int glbliActionResumeRetryCount; /* how often should suspended actions be retried? */
+ int bActionRepMsgHasMsg; /* last messsage repeated... has msg fragment in it */
+ uchar *pszActionName; /* short name for the action */
+ /* action queue and its configuration parameters */
+ queueType_t ActionQueType; /* type of the main message queue above */
+ int iActionQueueSize; /* size of the main message queue above */
+ int iActionQueueDeqBatchSize; /* batch size for action queues */
+ int iActionQHighWtrMark; /* high water mark for disk-assisted queues */
+ int iActionQLowWtrMark; /* low water mark for disk-assisted queues */
+ int iActionQDiscardMark; /* begin to discard messages */
+ int iActionQDiscardSeverity; /* by default, discard nothing to prevent unintentional loss */
+ int iActionQueueNumWorkers; /* number of worker threads for the mm queue above */
+ uchar *pszActionQFName; /* prefix for the main message queue file */
+ int64 iActionQueMaxFileSize;
+ int iActionQPersistUpdCnt; /* persist queue info every n updates */
+ int bActionQSyncQeueFiles; /* sync queue files */
+ int iActionQtoQShutdown; /* queue shutdown */
+ int iActionQtoActShutdown; /* action shutdown (in phase 2) */
+ int iActionQtoEnq; /* timeout for queue enque */
+ int iActionQtoWrkShutdown; /* timeout for worker thread shutdown */
+ int iActionQWrkMinMsgs; /* minimum messages per worker needed to start a new one */
+ int bActionQSaveOnShutdown; /* save queue on shutdown (when DA enabled)? */
+ int64 iActionQueMaxDiskSpace; /* max disk space allocated 0 ==> unlimited */
+ int iActionQueueDeqSlowdown; /* dequeue slowdown (simple rate limiting) */
+ int iActionQueueDeqtWinFromHr; /* hour begin of time frame when queue is to be dequeued */
+ int iActionQueueDeqtWinToHr; /* hour begin of time frame when queue is to be dequeued */
+} configSettings_t;
+
+configSettings_t cs; /* our current config settings */
+configSettings_t cs_save; /* our saved (scope!) config settings */
/* the counter below counts actions created. It is used to obtain unique IDs for the action. They
* should not be relied on for any long-term activity (e.g. disk queue names!), but they are nice
@@ -171,6 +179,25 @@ static int iActionQueueDeqtWinToHr = 25; /* hour begin of time frame when queu
*/
static int iActionNbr = 0;
+/* tables for interfacing with the v6 config system */
+static struct cnfparamdescr cnfparamdescr[] = {
+ { "name", eCmdHdlrGetWord, 0 }, /* legacy: actionname */
+ { "type", eCmdHdlrString, CNFPARAM_REQUIRED }, /* legacy: actionname */
+ { "action.writeallmarkmessages", eCmdHdlrBinary, 0 }, /* legacy: actionwriteallmarkmessages */
+ { "action.execonlyeverynthtime", eCmdHdlrInt, 0 }, /* legacy: actionexeconlyeverynthtime */
+ { "action.execonlyeverynthtimetimeout", eCmdHdlrInt, 0 }, /* legacy: actionexeconlyeverynthtimetimeout */
+ { "action.execonlyonceeveryinterval", eCmdHdlrInt, 0 }, /* legacy: actionexeconlyonceeveryinterval */
+ { "action.execonlywhenpreviousissuspended", eCmdHdlrInt, 0 }, /* legacy: actionexeconlywhenpreviousissuspended */
+ { "action.repeatedmsgcontainsoriginalmsg", eCmdHdlrBinary, 0 }, /* legacy: repeatedmsgcontainsoriginalmsg */
+ { "action.resumeretrycount", eCmdHdlrInt, 0 }, /* legacy: actionresumeretrycount */
+ { "action.resumeinterval", eCmdHdlrInt, 0 }
+};
+static struct cnfparamblk pblk =
+ { CNFPARAMBLK_VERSION,
+ sizeof(cnfparamdescr)/sizeof(struct cnfparamdescr),
+ cnfparamdescr
+ };
+
/* ------------------------------ methods ------------------------------ */
/* This function returns the "current" time for this action. Current time
@@ -222,32 +249,32 @@ actionResetQueueParams(void)
{
DEFiRet;
- ActionQueType = QUEUETYPE_DIRECT; /* type of the main message queue above */
- iActionQueueSize = 1000; /* size of the main message queue above */
- iActionQueueDeqBatchSize = 16; /* default batch size */
- iActionQHighWtrMark = 800; /* high water mark for disk-assisted queues */
- iActionQLowWtrMark = 200; /* low water mark for disk-assisted queues */
- iActionQDiscardMark = 9800; /* begin to discard messages */
- iActionQDiscardSeverity = 8; /* discard warning and above */
- iActionQueueNumWorkers = 1; /* number of worker threads for the mm queue above */
- iActionQueMaxFileSize = 1024*1024;
- iActionQPersistUpdCnt = 0; /* persist queue info every n updates */
- bActionQSyncQeueFiles = 0;
- iActionQtoQShutdown = 0; /* queue shutdown */
- iActionQtoActShutdown = 1000; /* action shutdown (in phase 2) */
- iActionQtoEnq = 2000; /* timeout for queue enque */
- iActionQtoWrkShutdown = 60000; /* timeout for worker thread shutdown */
- iActionQWrkMinMsgs = 100; /* minimum messages per worker needed to start a new one */
- bActionQSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */
- iActionQueMaxDiskSpace = 0;
- iActionQueueDeqSlowdown = 0;
- iActionQueueDeqtWinFromHr = 0;
- iActionQueueDeqtWinToHr = 25; /* 25 disables time windowed dequeuing */
-
- glbliActionResumeRetryCount = 0; /* I guess it is smart to reset this one, too */
-
- d_free(pszActionQFName);
- pszActionQFName = NULL; /* prefix for the main message queue file */
+ cs.ActionQueType = QUEUETYPE_DIRECT; /* type of the main message queue above */
+ cs.iActionQueueSize = 1000; /* size of the main message queue above */
+ cs.iActionQueueDeqBatchSize = 16; /* default batch size */
+ cs.iActionQHighWtrMark = 800; /* high water mark for disk-assisted queues */
+ cs.iActionQLowWtrMark = 200; /* low water mark for disk-assisted queues */
+ cs.iActionQDiscardMark = 9800; /* begin to discard messages */
+ cs.iActionQDiscardSeverity = 8; /* discard warning and above */
+ cs.iActionQueueNumWorkers = 1; /* number of worker threads for the mm queue above */
+ cs.iActionQueMaxFileSize = 1024*1024;
+ cs.iActionQPersistUpdCnt = 0; /* persist queue info every n updates */
+ cs.bActionQSyncQeueFiles = 0;
+ cs.iActionQtoQShutdown = 0; /* queue shutdown */
+ cs.iActionQtoActShutdown = 1000; /* action shutdown (in phase 2) */
+ cs.iActionQtoEnq = 2000; /* timeout for queue enque */
+ cs.iActionQtoWrkShutdown = 60000; /* timeout for worker thread shutdown */
+ cs.iActionQWrkMinMsgs = 100; /* minimum messages per worker needed to start a new one */
+ cs.bActionQSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */
+ cs.iActionQueMaxDiskSpace = 0;
+ cs.iActionQueueDeqSlowdown = 0;
+ cs.iActionQueueDeqtWinFromHr = 0;
+ cs.iActionQueueDeqtWinToHr = 25; /* 25 disables time windowed dequeuing */
+
+ cs.glbliActionResumeRetryCount = 0; /* I guess it is smart to reset this one, too */
+
+ d_free(cs.pszActionQFName);
+ cs.pszActionQFName = NULL; /* prefix for the main message queue file */
RETiRet;
}
@@ -277,7 +304,7 @@ rsRetVal actionDestruct(action_t *pThis)
if(pThis->f_pMsg != NULL)
msgDestruct(&pThis->f_pMsg);
- SYNC_OBJ_TOOL_EXIT(pThis);
+ pthread_mutex_destroy(&pThis->mutAction);
pthread_mutex_destroy(&pThis->mutActExec);
d_free(pThis->pszName);
d_free(pThis->ppTpl);
@@ -290,6 +317,8 @@ rsRetVal actionDestruct(action_t *pThis)
/* create a new action descriptor object
* rgerhards, 2007-08-01
+ * Note that it is vital to set proper initial values as the v6 config
+ * system depends on these!
*/
rsRetVal actionConstruct(action_t **ppThis)
{
@@ -298,13 +327,24 @@ rsRetVal actionConstruct(action_t **ppThis)
ASSERT(ppThis != NULL);
+ if(cs.pszActionName != NULL) {
+ free(cs.pszActionName);
+ cs.pszActionName = NULL;
+ }
CHKmalloc(pThis = (action_t*) calloc(1, sizeof(action_t)));
- pThis->iResumeInterval = glbliActionResumeInterval;
- pThis->iResumeRetryCount = glbliActionResumeRetryCount;
+ pThis->iResumeInterval = 30;
+ pThis->iResumeRetryCount = 0;
+ pThis->pszName = NULL;
+ pThis->bWriteAllMarkMsgs = FALSE;
+ pThis->iExecEveryNthOccur = 0;
+ pThis->iExecEveryNthOccurTO = 0;
+ pThis->iSecsExecOnceInterval = 0;
+ pThis->bExecWhenPrevSusp = 0;
+ pThis->bRepMsgHasMsg = 0;
pThis->tLastOccur = datetime.GetTime(NULL); /* done once per action on startup only */
pthread_mutex_init(&pThis->mutActExec, NULL);
+ pthread_mutex_init(&pThis->mutAction, NULL);
INIT_ATOMIC_HELPER_MUT(pThis->mutCAS);
- SYNC_OBJ_TOOL_INIT(pThis);
/* indicate we have a new action */
++iActionNbr;
@@ -318,7 +358,7 @@ finalize_it:
/* action construction finalizer
*/
rsRetVal
-actionConstructFinalize(action_t *pThis)
+actionConstructFinalize(action_t *pThis, struct cnfparamvals *queueParams)
{
DEFiRet;
uchar pszAName[64]; /* friendly name of our action */
@@ -330,7 +370,7 @@ actionConstructFinalize(action_t *pThis)
snprintf((char*) pszAName, sizeof(pszAName)/sizeof(uchar), "action %d", iActionNbr);
} else {
ustrncpy(pszAName, pThis->pszName, sizeof(pszAName));
- pszAName[63] = '\0'; /* to be on the save side */
+ pszAName[sizeof(pszAName)-1] = '\0'; /* to be on the save side */
}
/* support statistics gathering */
@@ -388,7 +428,7 @@ actionConstructFinalize(action_t *pThis)
* msg object thread safety in this case (this costs a bit performance and thus
* is not enabled by default. -- rgerhards, 2008-02-20
*/
- if(ActionQueType != QUEUETYPE_DIRECT)
+ if(cs.ActionQueType != QUEUETYPE_DIRECT)
MsgEnableThreadSafety();
/* create queue */
@@ -397,49 +437,55 @@ actionConstructFinalize(action_t *pThis)
* to be run on multiple threads. So far, this is forbidden by the interface
* spec. -- rgerhards, 2008-01-30
*/
- CHKiRet(qqueueConstruct(&pThis->pQueue, ActionQueType, 1, iActionQueueSize,
+ CHKiRet(qqueueConstruct(&pThis->pQueue, cs.ActionQueType, 1, cs.iActionQueueSize,
(rsRetVal (*)(void*, batch_t*, int*))processBatchMain));
obj.SetName((obj_t*) pThis->pQueue, pszAName);
+ qqueueSetpUsr(pThis->pQueue, pThis);
- /* ... set some properties ... */
-# define setQPROP(func, directive, data) \
- CHKiRet_Hdlr(func(pThis->pQueue, data)) { \
- errmsg.LogError(0, NO_ERRCODE, "Invalid " #directive ", error %d. Ignored, running with default setting", iRet); \
- }
-# define setQPROPstr(func, directive, data) \
- CHKiRet_Hdlr(func(pThis->pQueue, data, (data == NULL)? 0 : strlen((char*) data))) { \
- errmsg.LogError(0, NO_ERRCODE, "Invalid " #directive ", error %d. Ignored, running with default setting", iRet); \
+ if(queueParams == NULL) { /* use legacy params? */
+ /* ... set some properties ... */
+# define setQPROP(func, directive, data) \
+ CHKiRet_Hdlr(func(pThis->pQueue, data)) { \
+ errmsg.LogError(0, NO_ERRCODE, "Invalid " #directive ", \
+ error %d. Ignored, running with default setting", iRet); \
+ }
+# define setQPROPstr(func, directive, data) \
+ CHKiRet_Hdlr(func(pThis->pQueue, data, (data == NULL)? 0 : strlen((char*) data))) { \
+ errmsg.LogError(0, NO_ERRCODE, "Invalid " #directive ", \
+ error %d. Ignored, running with default setting", iRet); \
+ }
+ setQPROP(qqueueSetsizeOnDiskMax, "$ActionQueueMaxDiskSpace", cs.iActionQueMaxDiskSpace);
+ setQPROP(qqueueSetiDeqBatchSize, "$ActionQueueDequeueBatchSize", cs.iActionQueueDeqBatchSize);
+ setQPROP(qqueueSetMaxFileSize, "$ActionQueueFileSize", cs.iActionQueMaxFileSize);
+ setQPROPstr(qqueueSetFilePrefix, "$ActionQueueFileName", cs.pszActionQFName);
+ setQPROP(qqueueSetiPersistUpdCnt, "$ActionQueueCheckpointInterval", cs.iActionQPersistUpdCnt);
+ setQPROP(qqueueSetbSyncQueueFiles, "$ActionQueueSyncQueueFiles", cs.bActionQSyncQeueFiles);
+ setQPROP(qqueueSettoQShutdown, "$ActionQueueTimeoutShutdown", cs.iActionQtoQShutdown );
+ setQPROP(qqueueSettoActShutdown, "$ActionQueueTimeoutActionCompletion", cs.iActionQtoActShutdown);
+ setQPROP(qqueueSettoWrkShutdown, "$ActionQueueWorkerTimeoutThreadShutdown", cs.iActionQtoWrkShutdown);
+ setQPROP(qqueueSettoEnq, "$ActionQueueTimeoutEnqueue", cs.iActionQtoEnq);
+ setQPROP(qqueueSetiHighWtrMrk, "$ActionQueueHighWaterMark", cs.iActionQHighWtrMark);
+ setQPROP(qqueueSetiLowWtrMrk, "$ActionQueueLowWaterMark", cs.iActionQLowWtrMark);
+ setQPROP(qqueueSetiDiscardMrk, "$ActionQueueDiscardMark", cs.iActionQDiscardMark);
+ setQPROP(qqueueSetiDiscardSeverity, "$ActionQueueDiscardSeverity", cs.iActionQDiscardSeverity);
+ setQPROP(qqueueSetiMinMsgsPerWrkr, "$ActionQueueWorkerThreadMinimumMessages", cs.iActionQWrkMinMsgs);
+ setQPROP(qqueueSetbSaveOnShutdown, "$ActionQueueSaveOnShutdown", cs.bActionQSaveOnShutdown);
+ setQPROP(qqueueSetiDeqSlowdown, "$ActionQueueDequeueSlowdown", cs.iActionQueueDeqSlowdown);
+ setQPROP(qqueueSetiDeqtWinFromHr, "$ActionQueueDequeueTimeBegin", cs.iActionQueueDeqtWinFromHr);
+ setQPROP(qqueueSetiDeqtWinToHr, "$ActionQueueDequeueTimeEnd", cs.iActionQueueDeqtWinToHr);
+ } else {
+ /* we have v6-style config params */
+ qqueueSetDefaultsActionQueue(pThis->pQueue);
+ qqueueApplyCnfParam(pThis->pQueue, queueParams);
}
- qqueueSetpUsr(pThis->pQueue, pThis);
- setQPROP(qqueueSetsizeOnDiskMax, "$ActionQueueMaxDiskSpace", iActionQueMaxDiskSpace);
- setQPROP(qqueueSetiDeqBatchSize, "$ActionQueueDequeueBatchSize", iActionQueueDeqBatchSize);
- setQPROP(qqueueSetMaxFileSize, "$ActionQueueFileSize", iActionQueMaxFileSize);
- setQPROPstr(qqueueSetFilePrefix, "$ActionQueueFileName", pszActionQFName);
- setQPROP(qqueueSetiPersistUpdCnt, "$ActionQueueCheckpointInterval", iActionQPersistUpdCnt);
- setQPROP(qqueueSetbSyncQueueFiles, "$ActionQueueSyncQueueFiles", bActionQSyncQeueFiles);
- setQPROP(qqueueSettoQShutdown, "$ActionQueueTimeoutShutdown", iActionQtoQShutdown );
- setQPROP(qqueueSettoActShutdown, "$ActionQueueTimeoutActionCompletion", iActionQtoActShutdown);
- setQPROP(qqueueSettoWrkShutdown, "$ActionQueueWorkerTimeoutThreadShutdown", iActionQtoWrkShutdown);
- setQPROP(qqueueSettoEnq, "$ActionQueueTimeoutEnqueue", iActionQtoEnq);
- setQPROP(qqueueSetiHighWtrMrk, "$ActionQueueHighWaterMark", iActionQHighWtrMark);
- setQPROP(qqueueSetiLowWtrMrk, "$ActionQueueLowWaterMark", iActionQLowWtrMark);
- setQPROP(qqueueSetiDiscardMrk, "$ActionQueueDiscardMark", iActionQDiscardMark);
- setQPROP(qqueueSetiDiscardSeverity, "$ActionQueueDiscardSeverity", iActionQDiscardSeverity);
- setQPROP(qqueueSetiMinMsgsPerWrkr, "$ActionQueueWorkerThreadMinimumMessages", iActionQWrkMinMsgs);
- setQPROP(qqueueSetbSaveOnShutdown, "$ActionQueueSaveOnShutdown", bActionQSaveOnShutdown);
- setQPROP(qqueueSetiDeqSlowdown, "$ActionQueueDequeueSlowdown", iActionQueueDeqSlowdown);
- setQPROP(qqueueSetiDeqtWinFromHr, "$ActionQueueDequeueTimeBegin", iActionQueueDeqtWinFromHr);
- setQPROP(qqueueSetiDeqtWinToHr, "$ActionQueueDequeueTimeEnd", iActionQueueDeqtWinToHr);
-
# undef setQPROP
# undef setQPROPstr
dbgoprint((obj_t*) pThis->pQueue, "save on shutdown %d, max disk space allowed %lld\n",
- bActionQSaveOnShutdown, iActionQueMaxDiskSpace);
+ cs.bActionQSaveOnShutdown, cs.iActionQueMaxDiskSpace);
- CHKiRet(qqueueStart(pThis->pQueue));
DBGPRINTF("Action %p: queue %p created\n", pThis, pThis->pQueue);
/* and now reset the queue params (see comment in its function header!) */
@@ -455,7 +501,7 @@ finalize_it:
*/
rsRetVal actionSetGlobalResumeInterval(int iNewVal)
{
- glbliActionResumeInterval = iNewVal;
+ cs.glbliActionResumeInterval = iNewVal;
return RS_RET_OK;
}
@@ -997,7 +1043,7 @@ tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem)
i = pBatch->iDoneUpTo; /* all messages below that index are processed */
iElemProcessed = 0;
iCommittedUpTo = i;
-dbgprintf("XXXXX: tryDoAction %p, pnElem %d, nElem %d\n", pAction, *pnElem, pBatch->nElem);
+ DBGPRINTF("tryDoAction %p, pnElem %d, nElem %d\n", pAction, *pnElem, pBatch->nElem);
while(iElemProcessed <= *pnElem && i < pBatch->nElem) {
if(*(pBatch->pbShutdownImmediate))
ABORT_FINALIZE(RS_RET_FORCE_TERM);
@@ -1267,16 +1313,16 @@ static rsRetVal setActionQueType(void __attribute__((unused)) *pVal, uchar *pszT
DEFiRet;
if (!strcasecmp((char *) pszType, "fixedarray")) {
- ActionQueType = QUEUETYPE_FIXED_ARRAY;
+ cs.ActionQueType = QUEUETYPE_FIXED_ARRAY;
DBGPRINTF("action queue type set to FIXED_ARRAY\n");
} else if (!strcasecmp((char *) pszType, "linkedlist")) {
- ActionQueType = QUEUETYPE_LINKEDLIST;
+ cs.ActionQueType = QUEUETYPE_LINKEDLIST;
DBGPRINTF("action queue type set to LINKEDLIST\n");
} else if (!strcasecmp((char *) pszType, "disk")) {
- ActionQueType = QUEUETYPE_DISK;
+ cs.ActionQueType = QUEUETYPE_DISK;
DBGPRINTF("action queue type set to DISK\n");
} else if (!strcasecmp((char *) pszType, "direct")) {
- ActionQueType = QUEUETYPE_DIRECT;
+ cs.ActionQueType = QUEUETYPE_DIRECT;
DBGPRINTF("action queue type set to DIRECT (no queueing at all)\n");
} else {
errmsg.LogError(0, RS_RET_INVALID_PARAMS, "unknown actionqueue parameter: %s", (char *) pszType);
@@ -1505,6 +1551,33 @@ finalize_it:
}
+/* helper to activateActions, it activates a specific action.
+ */
+DEFFUNC_llExecFunc(doActivateActions)
+{
+ action_t *pThis = (action_t*) pData;
+ BEGINfunc
+ qqueueStart(pThis->pQueue);
+ DBGPRINTF("Action %p: queue %p started\n", pThis, pThis->pQueue);
+ ENDfunc
+ return RS_RET_OK; /* we ignore errors, we can not do anything either way */
+}
+
+
+/* This function "activates" the action after privileges have been dropped. Currently,
+ * this means that the queues are started.
+ * rgerhards, 2011-05-02
+ */
+rsRetVal
+activateActions(void)
+{
+ DEFiRet;
+ iRet = ruleset.IterateAllActions(ourConf, doActivateActions, NULL);
+ RETiRet;
+}
+
+
+
/* This submits the message to the action queue in case where we need to handle
* bWriteAllMarkMessage == FALSE only. Note that we use a non-blocking CAS loop
* for the synchronization. Here, we just modify the filter condition to be false when
@@ -1726,23 +1799,68 @@ doSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch)
{
DEFiRet;
- LockObj(pAction);
- pthread_cleanup_push(mutexCancelCleanup, pAction->Sync_mut);
+ d_pthread_mutex_lock(&pAction->mutAction);
+ pthread_cleanup_push(mutexCancelCleanup, &pAction->mutAction);
iRet = helperSubmitToActionQComplexBatch(pAction, pBatch);
- UnlockObj(pAction);
+ d_pthread_mutex_unlock(&pAction->mutAction);
pthread_cleanup_pop(0); /* remove mutex cleanup handler */
RETiRet;
}
#pragma GCC diagnostic warning "-Wempty-body"
+
+/* apply all params from param block to action. This supports the v6 config system.
+ * Defaults must have been set appropriately during action construct!
+ * rgerhards, 2011-08-01
+ */
+rsRetVal
+actionApplyCnfParam(action_t *pAction, struct cnfparamvals *pvals)
+{
+ int i;
+ for(i = 0 ; i < pblk.nParams ; ++i) {
+ if(!pvals[i].bUsed)
+ continue;
+ if(!strcmp(pblk.descr[i].name, "name")) {
+ pAction->pszName = (uchar*) es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(pblk.descr[i].name, "type")) {
+ continue; /* this is handled seperately during module select! */
+ } else if(!strcmp(pblk.descr[i].name, "action.writeallmarkmessages")) {
+ pAction->bWriteAllMarkMsgs = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "action.execonlyeverynthtime")) {
+ pAction->iExecEveryNthOccur = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "action.execonlyeverynthtimetimeout")) {
+ pAction->iExecEveryNthOccurTO = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "action.execonlyonceeveryinterval")) {
+ pAction->iSecsExecOnceInterval = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "action.execonlywhenpreviousissuspended")) {
+ pAction->bExecWhenPrevSusp = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "action.repeatedmsgcontainsoriginalmsg")) {
+ pAction->bRepMsgHasMsg = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "action.resumeretrycount")) {
+ pAction->iResumeRetryCount = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "action.resumeinterval")) {
+ pAction->iResumeInterval = pvals[i].val.d.n;
+ } else {
+ dbgprintf("action: program error, non-handled "
+ "param '%s'\n", pblk.descr[i].name);
+ }
+ }
+ cnfparamvalsDestruct(pvals, &pblk);
+ return RS_RET_OK;
+}
+
+
+
/* add an Action to the current selector
* The pOMSR is freed, as it is not needed after this function.
* Note: this function pulls global data that specifies action config state.
* rgerhards, 2007-07-27
*/
rsRetVal
-addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringRequest_t *pOMSR, int bSuspended)
+addAction(action_t **ppAction, modInfo_t *pMod, void *pModData,
+ omodStringRequest_t *pOMSR, struct cnfparamvals *actParams,
+ struct cnfparamvals *queueParams, int bSuspended)
{
DEFiRet;
int i;
@@ -1754,22 +1872,28 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringReques
assert(ppAction != NULL);
assert(pMod != NULL);
assert(pOMSR != NULL);
- DBGPRINTF("Module %s processed this config line.\n", module.GetName(pMod));
+ DBGPRINTF("Module %s processes this action.\n", module.GetName(pMod));
CHKiRet(actionConstruct(&pAction)); /* create action object first */
pAction->pMod = pMod;
pAction->pModData = pModData;
- pAction->pszName = pszActionName;
- pszActionName = NULL; /* free again! */
- pAction->bWriteAllMarkMsgs = bActionWriteAllMarkMsgs;
- bActionWriteAllMarkMsgs = FALSE; /* reset */
- pAction->bExecWhenPrevSusp = bActExecWhenPrevSusp;
- pAction->iSecsExecOnceInterval = iActExecOnceInterval;
- pAction->iExecEveryNthOccur = iActExecEveryNthOccur;
- pAction->iExecEveryNthOccurTO = iActExecEveryNthOccurTO;
- pAction->bRepMsgHasMsg = bActionRepMsgHasMsg;
- iActExecEveryNthOccur = 0; /* auto-reset */
- iActExecEveryNthOccurTO = 0; /* auto-reset */
+ if(actParams == NULL) { /* use legacy systemn */
+ pAction->pszName = cs.pszActionName;
+ pAction->iResumeInterval = cs.glbliActionResumeInterval;
+ pAction->iResumeRetryCount = cs.glbliActionResumeRetryCount;
+ pAction->bWriteAllMarkMsgs = cs.bActionWriteAllMarkMsgs;
+ pAction->bExecWhenPrevSusp = cs.bActExecWhenPrevSusp;
+ pAction->iSecsExecOnceInterval = cs.iActExecOnceInterval;
+ pAction->iExecEveryNthOccur = cs.iActExecEveryNthOccur;
+ pAction->iExecEveryNthOccurTO = cs.iActExecEveryNthOccurTO;
+ pAction->bRepMsgHasMsg = cs.bActionRepMsgHasMsg;
+ cs.iActExecEveryNthOccur = 0; /* auto-reset */
+ cs.iActExecEveryNthOccurTO = 0; /* auto-reset */
+ cs.bActionWriteAllMarkMsgs = FALSE; /* auto-reset */
+ cs.pszActionName = NULL; /* free again! */
+ } else {
+ actionApplyCnfParam(pAction, actParams);
+ }
/* check if we can obtain the template pointers - TODO: move to separate function? */
pAction->iNumTpls = OMSRgetEntryCount(pOMSR);
@@ -1788,7 +1912,7 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringReques
/* Ok, we got everything, so it now is time to look up the template
* (Hint: templates MUST be defined before they are used!)
*/
- if((pAction->ppTpl[i] = tplFind((char*)pTplName, strlen((char*)pTplName))) == NULL) {
+ if((pAction->ppTpl[i] = tplFind(ourConf, (char*)pTplName, strlen((char*)pTplName))) == NULL) {
snprintf(errMsg, sizeof(errMsg) / sizeof(char),
" Could not find template '%s' - action disabled\n",
pTplName);
@@ -1820,9 +1944,9 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringReques
pAction->pMod = pMod;
pAction->pModData = pModData;
/* now check if the module is compatible with select features */
- if(pMod->isCompatibleWithFeature(sFEATURERepeatedMsgReduction) == RS_RET_OK)
- pAction->f_ReduceRepeated = bReduceRepeatMsgs;
- else {
+ if(pMod->isCompatibleWithFeature(sFEATURERepeatedMsgReduction) == RS_RET_OK) {
+ pAction->f_ReduceRepeated = loadConf->globals.bReduceRepeatMsgs;
+ } else {
DBGPRINTF("module is incompatible with RepeatedMsgReduction - turned off\n");
pAction->f_ReduceRepeated = 0;
}
@@ -1831,7 +1955,7 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringReques
if(bSuspended)
actionSuspend(pAction, datetime.GetTime(NULL)); /* "good" time call, only during init and unavoidable */
- CHKiRet(actionConstructFinalize(pAction));
+ CHKiRet(actionConstructFinalize(pAction, queueParams));
/* TODO: if we exit here, we have a memory leak... */
@@ -1857,11 +1981,147 @@ finalize_it:
static rsRetVal
resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal)
{
- iActExecOnceInterval = 0;
+ cs.iActExecOnceInterval = 0;
+ cs.bActExecWhenPrevSusp = 0;
return RS_RET_OK;
}
+/* initialize (current) config variables.
+ * Used at program start and when a new scope is created.
+ */
+static inline void
+initConfigVariables(void)
+{
+ cs.bActionWriteAllMarkMsgs = FALSE;
+ cs.glbliActionResumeRetryCount = 0;
+ cs.bActExecWhenPrevSusp = 0;
+ cs.iActExecOnceInterval = 0;
+ cs.iActExecEveryNthOccur = 0;
+ cs.iActExecEveryNthOccurTO = 0;
+ cs.glbliActionResumeInterval = 30;
+ cs.glbliActionResumeRetryCount = 0;
+ cs.bActionRepMsgHasMsg = 0;
+ if(cs.pszActionName != NULL) {
+ free(cs.pszActionName);
+ cs.pszActionName = NULL;
+ }
+ actionResetQueueParams();
+}
+
+
+/* save our config and create a new scope. Note that things are messed up if
+ * this is called while the config is already saved (we currently do not
+ * have a stack as the design is we need none!
+ * rgerhards, 2010-07-23
+ */
+rsRetVal
+actionNewScope(void)
+{
+ DEFiRet;
+ memcpy(&cs_save, &cs, sizeof(cs));
+ initConfigVariables();
+ RETiRet;
+}
+
+
+/* restore previously saved scope.
+ * rgerhards, 2010-07-23
+ */
+rsRetVal
+actionRestoreScope(void)
+{
+ DEFiRet;
+ memcpy(&cs, &cs_save, sizeof(cs));
+ RETiRet;
+}
+
+
+
+rsRetVal
+actionNewInst(struct nvlst *lst, action_t **ppAction)
+{
+ struct cnfparamvals *paramvals;
+ struct cnfparamvals *queueParams;
+ modInfo_t *pMod;
+ uchar *cnfModName = NULL;
+ omodStringRequest_t *pOMSR;
+ void *pModData;
+ action_t *pAction;
+ int typeIdx;
+ DEFiRet;
+
+ paramvals = nvlstGetParams(lst, &pblk, NULL);
+ if(paramvals == NULL) {
+ ABORT_FINALIZE(RS_RET_ERR);
+ }
+ dbgprintf("action param blk after actionNewInst:\n");
+ cnfparamsPrint(&pblk, paramvals);
+ typeIdx = cnfparamGetIdx(&pblk, "type");
+ if(paramvals[typeIdx].bUsed == 0) {
+ errmsg.LogError(0, RS_RET_CONF_RQRD_PARAM_MISSING, "action type missing");
+ ABORT_FINALIZE(RS_RET_CONF_RQRD_PARAM_MISSING); // TODO: move this into rainerscript handlers
+ }
+ cnfModName = (uchar*)es_str2cstr(paramvals[cnfparamGetIdx(&pblk, ("type"))].val.d.estr, NULL);
+ if((pMod = module.FindWithCnfName(loadConf, cnfModName, eMOD_OUT)) == NULL) {
+ errmsg.LogError(0, RS_RET_MOD_UNKNOWN, "module name '%s' is unknown", cnfModName);
+ ABORT_FINALIZE(RS_RET_MOD_UNKNOWN);
+ }
+ iRet = pMod->mod.om.newActInst(cnfModName, lst, &pModData, &pOMSR);
+ // TODO: check if RS_RET_SUSPENDED is still valid in v6!
+ if(iRet != RS_RET_OK && iRet != RS_RET_SUSPENDED) {
+ FINALIZE; /* iRet is already set to error state */
+ }
+
+ qqueueDoCnfParams(lst, &queueParams);
+
+ if((iRet = addAction(&pAction, pMod, pModData, pOMSR, paramvals, queueParams,
+ (iRet == RS_RET_SUSPENDED)? 1 : 0)) == RS_RET_OK) {
+ /* now check if the module is compatible with select features */
+ if(pMod->isCompatibleWithFeature(sFEATURERepeatedMsgReduction) == RS_RET_OK)
+ pAction->f_ReduceRepeated = loadConf->globals.bReduceRepeatMsgs;
+ else {
+ DBGPRINTF("module is incompatible with RepeatedMsgReduction - turned off\n");
+ pAction->f_ReduceRepeated = 0;
+ }
+ pAction->eState = ACT_STATE_RDY; /* action is enabled */
+ loadConf->actions.nbrActions++; /* one more active action! */
+ }
+ *ppAction = pAction;
+
+finalize_it:
+ free(cnfModName);
+ cnfparamvalsDestruct(paramvals, &pblk);
+ RETiRet;
+}
+
+
+/* Process a rsyslog v6 action config object (the now-primary config method).
+ * rgerhards, 2011-07-19
+ */
+rsRetVal
+actionProcessCnf(struct cnfobj *o)
+{
+ DEFiRet;
+#if 0 /* we need to check if we actually need this functionality -- later! */
+// This is for STAND-ALONE actions at the conf file TOP level
+ struct cnfparamvals *paramvals;
+
+ paramvals = nvlstGetParams(o->nvlst, &pblk, NULL);
+ if(paramvals == NULL) {
+ iRet = RS_RET_ERR;
+ goto finalize_it;
+ }
+ DBGPRINTF("action param blk after actionProcessCnf:\n");
+ cnfparamsPrint(&pblk, paramvals);
+
+ /* now find module to activate */
+finalize_it:
+#endif
+ RETiRet;
+}
+
+
/* TODO: we are not yet a real object, the ClassInit here just looks like it is..
*/
rsRetVal actionClassInit(void)
@@ -1873,37 +2133,42 @@ rsRetVal actionClassInit(void)
CHKiRet(objUse(module, CORE_COMPONENT));
CHKiRet(objUse(errmsg, CORE_COMPONENT));
CHKiRet(objUse(statsobj, CORE_COMPONENT));
-
- CHKiRet(regCfSysLineHdlr((uchar *)"actionname", 0, eCmdHdlrGetWord, NULL, &pszActionName, NULL));
- CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuefilename", 0, eCmdHdlrGetWord, NULL, &pszActionQFName, NULL));
- CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesize", 0, eCmdHdlrInt, NULL, &iActionQueueSize, NULL));
- CHKiRet(regCfSysLineHdlr((uchar *)"actionwriteallmarkmessages", 0, eCmdHdlrBinary, NULL, &bActionWriteAllMarkMsgs, NULL));
- CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeuebatchsize", 0, eCmdHdlrInt, NULL, &iActionQueueDeqBatchSize, NULL));
- CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuemaxdiskspace", 0, eCmdHdlrSize, NULL, &iActionQueMaxDiskSpace, NULL));
- CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuehighwatermark", 0, eCmdHdlrInt, NULL, &iActionQHighWtrMark, NULL));
- CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuelowwatermark", 0, eCmdHdlrInt, NULL, &iActionQLowWtrMark, NULL));
- CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuediscardmark", 0, eCmdHdlrInt, NULL, &iActionQDiscardMark, NULL));
- CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuediscardseverity", 0, eCmdHdlrInt, NULL, &iActionQDiscardSeverity, NULL));
- CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuecheckpointinterval", 0, eCmdHdlrInt, NULL, &iActionQPersistUpdCnt, NULL));
- CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesyncqueuefiles", 0, eCmdHdlrBinary, NULL, &bActionQSyncQeueFiles, NULL));
+ CHKiRet(objUse(ruleset, CORE_COMPONENT));
+
+ CHKiRet(regCfSysLineHdlr((uchar *)"actionname", 0, eCmdHdlrGetWord, NULL, &cs.pszActionName, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuefilename", 0, eCmdHdlrGetWord, NULL, &cs.pszActionQFName, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesize", 0, eCmdHdlrInt, NULL, &cs.iActionQueueSize, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"actionwriteallmarkmessages", 0, eCmdHdlrBinary, NULL, &cs.bActionWriteAllMarkMsgs, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeuebatchsize", 0, eCmdHdlrInt, NULL, &cs.iActionQueueDeqBatchSize, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuemaxdiskspace", 0, eCmdHdlrSize, NULL, &cs.iActionQueMaxDiskSpace, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuehighwatermark", 0, eCmdHdlrInt, NULL, &cs.iActionQHighWtrMark, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuelowwatermark", 0, eCmdHdlrInt, NULL, &cs.iActionQLowWtrMark, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuediscardmark", 0, eCmdHdlrInt, NULL, &cs.iActionQDiscardMark, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuediscardseverity", 0, eCmdHdlrInt, NULL, &cs.iActionQDiscardSeverity, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuecheckpointinterval", 0, eCmdHdlrInt, NULL, &cs.iActionQPersistUpdCnt, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesyncqueuefiles", 0, eCmdHdlrBinary, NULL, &cs.bActionQSyncQeueFiles, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetype", 0, eCmdHdlrGetWord, setActionQueType, NULL, NULL));
- CHKiRet(regCfSysLineHdlr((uchar *)"actionqueueworkerthreads", 0, eCmdHdlrInt, NULL, &iActionQueueNumWorkers, NULL));
- CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetimeoutshutdown", 0, eCmdHdlrInt, NULL, &iActionQtoQShutdown, NULL));
- CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetimeoutactioncompletion", 0, eCmdHdlrInt, NULL, &iActionQtoActShutdown, NULL));
- CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetimeoutenqueue", 0, eCmdHdlrInt, NULL, &iActionQtoEnq, NULL));
- CHKiRet(regCfSysLineHdlr((uchar *)"actionqueueworkertimeoutthreadshutdown", 0, eCmdHdlrInt, NULL, &iActionQtoWrkShutdown, NULL));
- CHKiRet(regCfSysLineHdlr((uchar *)"actionqueueworkerthreadminimummessages", 0, eCmdHdlrInt, NULL, &iActionQWrkMinMsgs, NULL));
- CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuemaxfilesize", 0, eCmdHdlrSize, NULL, &iActionQueMaxFileSize, NULL));
- CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesaveonshutdown", 0, eCmdHdlrBinary, NULL, &bActionQSaveOnShutdown, NULL));
- CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeueslowdown", 0, eCmdHdlrInt, NULL, &iActionQueueDeqSlowdown, NULL));
- CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeuetimebegin", 0, eCmdHdlrInt, NULL, &iActionQueueDeqtWinFromHr, NULL));
- CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeuetimeend", 0, eCmdHdlrInt, NULL, &iActionQueueDeqtWinToHr, NULL));
- CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlyeverynthtime", 0, eCmdHdlrInt, NULL, &iActExecEveryNthOccur, NULL));
- CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlyeverynthtimetimeout", 0, eCmdHdlrInt, NULL, &iActExecEveryNthOccurTO, NULL));
- CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlyonceeveryinterval", 0, eCmdHdlrInt, NULL, &iActExecOnceInterval, NULL));
- CHKiRet(regCfSysLineHdlr((uchar *)"repeatedmsgcontainsoriginalmsg", 0, eCmdHdlrBinary, NULL, &bActionRepMsgHasMsg, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"actionqueueworkerthreads", 0, eCmdHdlrInt, NULL, &cs.iActionQueueNumWorkers, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetimeoutshutdown", 0, eCmdHdlrInt, NULL, &cs.iActionQtoQShutdown, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetimeoutactioncompletion", 0, eCmdHdlrInt, NULL, &cs.iActionQtoActShutdown, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetimeoutenqueue", 0, eCmdHdlrInt, NULL, &cs.iActionQtoEnq, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"actionqueueworkertimeoutthreadshutdown", 0, eCmdHdlrInt, NULL, &cs.iActionQtoWrkShutdown, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"actionqueueworkerthreadminimummessages", 0, eCmdHdlrInt, NULL, &cs.iActionQWrkMinMsgs, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuemaxfilesize", 0, eCmdHdlrSize, NULL, &cs.iActionQueMaxFileSize, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesaveonshutdown", 0, eCmdHdlrBinary, NULL, &cs.bActionQSaveOnShutdown, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeueslowdown", 0, eCmdHdlrInt, NULL, &cs.iActionQueueDeqSlowdown, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeuetimebegin", 0, eCmdHdlrInt, NULL, &cs.iActionQueueDeqtWinFromHr, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeuetimeend", 0, eCmdHdlrInt, NULL, &cs.iActionQueueDeqtWinToHr, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlyeverynthtime", 0, eCmdHdlrInt, NULL, &cs.iActExecEveryNthOccur, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlyeverynthtimetimeout", 0, eCmdHdlrInt, NULL, &cs.iActExecEveryNthOccurTO, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlyonceeveryinterval", 0, eCmdHdlrInt, NULL, &cs.iActExecOnceInterval, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"repeatedmsgcontainsoriginalmsg", 0, eCmdHdlrBinary, NULL, &cs.bActionRepMsgHasMsg, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlywhenpreviousissuspended", 0, eCmdHdlrBinary, NULL, &cs.bActExecWhenPrevSusp, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"actionresumeretrycount", 0, eCmdHdlrInt, NULL, &cs.glbliActionResumeRetryCount, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, NULL));
+ initConfigVariables(); /* first-time init of config setings */
+
finalize_it:
RETiRet;
}