diff options
Diffstat (limited to 'action.c')
-rw-r--r-- | action.c | 549 |
1 files changed, 430 insertions, 119 deletions
@@ -42,10 +42,13 @@ #include "cfsysline.h" #include "srUtils.h" #include "errmsg.h" +#include "wti.h" #include "datetime.h" +#define NO_TIME_PROVIDED 0 /* indicate we do not provide any cached time */ + /* forward definitions */ -rsRetVal actionCallDoAction(action_t *pAction, msg_t *pMsg); +rsRetVal actionCallDoActionMULTIQUEUE(action_t *pAction, aUsrp_t*); /* object static data (once for all instances) */ /* TODO: make this an object! DEFobjStaticHelpers -- rgerhards, 2008-03-05 */ @@ -64,6 +67,7 @@ static uchar *pszActionName; /* short name for the action */ /* main message queue and its configuration parameters */ static queueType_t ActionQueType = QUEUETYPE_DIRECT; /* type of the main message queue above */ static int iActionQueueSize = 1000; /* size of the main message queue above */ +static int iActionQueueDeqBatchSize = 16; /* batch size for action queues */ static int iActionQHighWtrMark = 800; /* high water mark for disk-assisted queues */ static int iActionQLowWtrMark = 200; /* low water mark for disk-assisted queues */ static int iActionQDiscardMark = 9800; /* begin to discard messages */ @@ -144,6 +148,7 @@ actionResetQueueParams(void) ActionQueType = QUEUETYPE_DIRECT; /* type of the main message queue above */ iActionQueueSize = 1000; /* size of the main message queue above */ + iActionQueueDeqBatchSize = 16; /* default batch size */ iActionQHighWtrMark = 800; /* high water mark for disk-assisted queues */ iActionQLowWtrMark = 200; /* low water mark for disk-assisted queues */ iActionQDiscardMark = 9800; /* begin to discard messages */ @@ -255,7 +260,8 @@ actionConstructFinalize(action_t *pThis) * to be run on multiple threads. So far, this is forbidden by the interface * spec. -- rgerhards, 2008-01-30 */ - CHKiRet(qqueueConstruct(&pThis->pQueue, ActionQueType, 1, iActionQueueSize, (rsRetVal (*)(void*,void*))actionCallDoAction)); + CHKiRet(qqueueConstruct(&pThis->pQueue, ActionQueType, 1, iActionQueueSize, + (rsRetVal (*)(void*,aUsrp_t*))actionCallDoActionMULTIQUEUE)); obj.SetName((obj_t*) pThis->pQueue, pszQName); /* ... set some properties ... */ @@ -270,6 +276,7 @@ actionConstructFinalize(action_t *pThis) qqueueSetpUsr(pThis->pQueue, pThis); setQPROP(qqueueSetsizeOnDiskMax, "$ActionQueueMaxDiskSpace", iActionQueMaxDiskSpace); + setQPROP(qqueueSetiDeqBatchSize, "$ActionQueueDequeueBatchSize", iActionQueueDeqBatchSize); setQPROP(qqueueSetMaxFileSize, "$ActionQueueFileSize", iActionQueMaxFileSize); setQPROPstr(qqueueSetFilePrefix, "$ActionQueueFileName", pszActionQFName); setQPROP(qqueueSetiPersistUpdCnt, "$ActionQueueCheckpointInterval", iActionQPersistUpdCnt); @@ -305,87 +312,246 @@ finalize_it: } -/* set an action back to active state -- rgerhards, 2007-08-02 + +/* set the global resume interval + */ +rsRetVal actionSetGlobalResumeInterval(int iNewVal) +{ + glbliActionResumeInterval = iNewVal; + return RS_RET_OK; +} + + +/* returns the action state name in human-readable form + * returned string must not be modified. + * rgerhards, 2009-05-07 + */ +static uchar *getActStateName(action_t *pThis) +{ + switch(pThis->eState) { + case ACT_STATE_RDY: + return (uchar*) "rdy"; + case ACT_STATE_ITX: + return (uchar*) "itx"; + case ACT_STATE_RTRY: + return (uchar*) "rtry"; + case ACT_STATE_SUSP: + return (uchar*) "susp"; + case ACT_STATE_DIED: + return (uchar*) "died"; + case ACT_STATE_COMM: + return (uchar*) "comm"; + default: + return (uchar*) "ERROR/UNKNWON"; + } +} + + +/* returns a suitable return code based on action state + * rgerhards, 2009-05-07 */ -static rsRetVal actionResume(action_t *pThis) +static rsRetVal getReturnCode(action_t *pThis) { DEFiRet; ASSERT(pThis != NULL); - pThis->bSuspended = 0; + switch(pThis->eState) { + case ACT_STATE_RDY: + iRet = RS_RET_OK; + break; + case ACT_STATE_ITX: + if(pThis->bHadAutoCommit) { + pThis->bHadAutoCommit = 0; /* auto-reset */ + iRet = RS_RET_PREVIOUS_COMMITTED; + } else { + iRet = RS_RET_DEFER_COMMIT; + } + break; + case ACT_STATE_RTRY: + iRet = RS_RET_SUSPENDED; + break; + case ACT_STATE_SUSP: + iRet = RS_RET_SUSPENDED; + break; + case ACT_STATE_DIED: + iRet = RS_RET_DISABLE_ACTION; + break; + default: + DBGPRINTF("Invalid action engine state %d, program error\n", + (int) pThis->eState); + iRet = RS_RET_ERR; + break; + } RETiRet; } -/* set the global resume interval +/* set the action to a new state + * rgerhards, 2007-08-02 */ -rsRetVal actionSetGlobalResumeInterval(int iNewVal) +static inline void actionSetState(action_t *pThis, action_state_t newState) { - glbliActionResumeInterval = iNewVal; - return RS_RET_OK; + pThis->eState = newState; + DBGPRINTF("Action %p transitioned to state: %s\n", pThis, getActStateName(pThis)); } +/* Handles the transient commit state. So far, this is + * mostly a dummy... + * rgerhards, 2007-08-02 + */ +static void actionCommitted(action_t *pThis) +{ + actionSetState(pThis, ACT_STATE_RDY); +} -/* suspend an action -- rgerhards, 2007-08-02 + +/* set action to "rtry" state. + * rgerhards, 2007-08-02 + */ +static void actionRetry(action_t *pThis) +{ + actionSetState(pThis, ACT_STATE_RTRY); +} + + +/* Disable action, this means it will never again be usable + * until rsyslog is reloaded. Use only as a last resort, but + * depends on output module. + * rgerhards, 2007-08-02 + */ +static void actionDisable(action_t *pThis) +{ + actionSetState(pThis, ACT_STATE_DIED); +} + + +/* Suspend action, this involves changing the acton state as well + * as setting the next retry time. + * if we have more than 10 retries, we prolong the + * retry interval. If something is really stalled, it will + * get re-tried only very, very seldom - but that saves + * CPU time. TODO: maybe a config option for that? + * rgerhards, 2007-08-02 */ -static rsRetVal actionSuspend(action_t *pThis, time_t tNow) +static inline void actionSuspend(action_t *pThis, time_t ttNow) { + if(ttNow == NO_TIME_PROVIDED) + time(&ttNow); + pThis->ttResumeRtry = ttNow + pThis->iResumeInterval * (pThis->iNbrResRtry / 10 + 1); + actionSetState(pThis, ACT_STATE_SUSP); + DBGPRINTF("earliest retry=%d\n", (int) pThis->ttResumeRtry); +} + + +/* actually do retry processing. Note that the function receives a timestamp so + * that we do not need to call the (expensive) time() API. + * Note that we do the full retry processing here, doing the configured number of + * iterations. + * rgerhards, 2009-05-07 + */ +static rsRetVal actionDoRetry(action_t *pThis, time_t ttNow) +{ + int iRetries; + int iSleepPeriod; DEFiRet; ASSERT(pThis != NULL); - pThis->bSuspended = 1; - pThis->ttResumeRtry = tNow + pThis->iResumeInterval; - pThis->iNbrResRtry = 0; /* tell that we did not yet retry to resume */ + +RUNLOG_STR("actionDoRetry():"); + iRetries = 0; + while(pThis->eState == ACT_STATE_RTRY) { + iRet = pThis->pMod->tryResume(pThis->pModData); + if(iRet == RS_RET_OK) { + actionSetState(pThis, ACT_STATE_RDY); +RUNLOG_STR("tryResume succeeded"); + } else if(iRet == RS_RET_SUSPENDED) { +RUNLOG_STR("still suspended");; + /* max retries reached? */ + if((pThis->iResumeRetryCount != -1 && iRetries >= pThis->iResumeRetryCount)) { + actionSuspend(pThis, ttNow); + } else { + ++pThis->iNbrResRtry; + ++iRetries; + iSleepPeriod = pThis->iResumeInterval; + ttNow += iSleepPeriod; /* not truly exact, but sufficiently... */ + srSleep(iSleepPeriod, 0); + } + } else if(iRet == RS_RET_DISABLE_ACTION) { + actionDisable(pThis); + } + } + + if(pThis->eState == ACT_STATE_RDY) { + pThis->iNbrResRtry = 0; + } RETiRet; } /* try to resume an action -- rgerhards, 2007-08-02 - * returns RS_RET_OK if resumption worked, RS_RET_SUSPEND if the - * action is still suspended. + * changed to new action state engine -- rgerhards, 2009-05-07 */ static rsRetVal actionTryResume(action_t *pThis) { DEFiRet; - time_t ttNow; + time_t ttNow = NO_TIME_PROVIDED; ASSERT(pThis != NULL); - /* for resume handling, we must always obtain a fresh timestamp. We used - * to use the action timestamp, but in this case we will never reach a - * point where a resumption is actually tried, because the action timestamp - * is always in the past. So we can not avoid doing a fresh time() call - * here. -- rgerhards, 2009-03-18 - */ - time(&ttNow); /* cache "now" */ - - /* first check if it is time for a re-try */ - if(ttNow > pThis->ttResumeRtry) { - iRet = pThis->pMod->tryResume(pThis->pModData); - if(iRet == RS_RET_SUSPENDED) { - /* set new tryResume time */ - ++pThis->iNbrResRtry; - /* if we have more than 10 retries, we prolong the - * retry interval. If something is really stalled, it will - * get re-tried only very, very seldom - but that saves - * CPU time. TODO: maybe a config option for that? - * rgerhards, 2007-08-02 - */ - pThis->ttResumeRtry = ttNow + pThis->iResumeInterval * (pThis->iNbrResRtry / 10 + 1); +RUNLOG_STR("actionTryResume()"); + if(pThis->eState == ACT_STATE_SUSP) { + /* if we are suspended, we need to check if the timeout expired. + * for this handling, we must always obtain a fresh timestamp. We used + * to use the action timestamp, but in this case we will never reach a + * point where a resumption is actually tried, because the action timestamp + * is always in the past. So we can not avoid doing a fresh time() call + * here. -- rgerhards, 2009-03-18 + */ + time(&ttNow); /* cache "now" */ + if(ttNow > pThis->ttResumeRtry) { + actionSetState(pThis, ACT_STATE_RTRY); /* back to retries */ } - } else { - /* it's too early, we are still suspended --> indicate this */ - iRet = RS_RET_SUSPENDED; } - if(iRet == RS_RET_OK) - actionResume(pThis); + if(pThis->eState == ACT_STATE_RTRY) { + if(ttNow == NO_TIME_PROVIDED) /* use cached result if we have it */ + time(&ttNow); + CHKiRet(actionDoRetry(pThis, ttNow)); + } + + DBGPRINTF("actionTryResume: action state: %s, next retry (if applicable): %u [now %u]\n", + getActStateName(pThis), (unsigned) pThis->ttResumeRtry, (unsigned) ttNow); + +finalize_it: + RETiRet; +} + + +/* prepare an action for performing work. This involves trying to recover it, + * depending on its current state. + * rgerhards, 2009-05-07 + */ +static rsRetVal actionPrepare(action_t *pThis) +{ + DEFiRet; + +RUNLOG_STR("actionPrepare()"); + assert(pThis != NULL); + if(pThis->eState == ACT_STATE_RTRY) { + CHKiRet(actionTryResume(pThis)); + } - dbgprintf("actionTryResume: iRet: %d, next retry (if applicable): %u [now %u]\n", - iRet, (unsigned) pThis->ttResumeRtry, (unsigned) ttNow); + /* if we are now ready, we initialize the transaction and advance + * action state accordingly + */ + if(pThis->eState == ACT_STATE_RDY) { + CHKiRet(pThis->pMod->mod.om.beginTransaction(pThis->pModData)); + actionSetState(pThis, ACT_STATE_ITX); + } +finalize_it: RETiRet; } @@ -402,12 +568,11 @@ rsRetVal actionDbgPrint(action_t *pThis) dbgprintf("\n\tInstance data: 0x%lx\n", (unsigned long) pThis->pModData); dbgprintf("\tRepeatedMsgReduction: %d\n", pThis->f_ReduceRepeated); dbgprintf("\tResume Interval: %d\n", pThis->iResumeInterval); - dbgprintf("\tSuspended: %d", pThis->bSuspended); - if(pThis->bSuspended) { - dbgprintf(" next retry: %u, number retries: %d", (unsigned) pThis->ttResumeRtry, pThis->iNbrResRtry); + if(pThis->eState == ACT_STATE_SUSP) { + dbgprintf("\tresume next retry: %u, number retries: %d", + (unsigned) pThis->ttResumeRtry, pThis->iNbrResRtry); } - dbgprintf("\n"); - dbgprintf("\tDisabled: %d\n", !pThis->bEnabled); + dbgprintf("\tState: %s\n", getActStateName(pThis)); dbgprintf("\tExec only when previous is suspended: %d\n", pThis->bExecWhenPrevSusp); dbgprintf("\n"); @@ -415,24 +580,16 @@ rsRetVal actionDbgPrint(action_t *pThis) } -/* call the DoAction output plugin entry point - * rgerhards, 2008-01-28 +/* prepare the calling parameters for doAction() + * rgerhards, 2009-05-07 */ -#pragma GCC diagnostic ignored "-Wempty-body" -rsRetVal -actionCallDoAction(action_t *pAction, msg_t *pMsg) +static rsRetVal prepareDoActionParams(action_t *pAction, msg_t *pMsg, uchar ***pppMsgs) { - DEFiRet; - int iRetries; + uchar **ppMsgs = *pppMsgs; int i; - int iArr; - int iSleepPeriod; - int bCallAction; - int iCancelStateSave; - uchar **ppMsgs; /* array of message pointers for doAction */ + DEFiRet; ASSERT(pAction != NULL); - /* create the array for doAction() message pointers */ if((ppMsgs = calloc(pAction->iNumTpls, sizeof(uchar *))) == NULL) { ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); @@ -450,77 +607,230 @@ actionCallDoAction(action_t *pAction, msg_t *pMsg) default:assert(0); /* software bug if this happens! */ } } - iRetries = 0; - /* We now must guard the output module against execution by multiple threads. The - * plugin interface specifies that output modules must not be thread-safe (except - * if they notify us they are - functionality not yet implemented...). - * rgerhards, 2008-01-30 - */ - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); - d_pthread_mutex_lock(&pAction->mutActExec); - pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec); - pthread_setcancelstate(iCancelStateSave, NULL); - do { - /* on first invocation, this if should never be true. We just put it at the top - * of the loop so that processing (and code) is simplified. This code is actually - * triggered on the 2nd+ invocation. -- rgerhards, 2008-01-30 - */ - if(iRet == RS_RET_SUSPENDED) { - /* ok, this calls for our retry logic... */ - ++iRetries; - iSleepPeriod = pAction->iResumeInterval; - srSleep(iSleepPeriod, 0); - } - /* first check if we are suspended and, if so, retry */ - if(actionIsSuspended(pAction)) { - iRet = actionTryResume(pAction); - if(iRet == RS_RET_OK) - bCallAction = 1; - else - bCallAction = 0; - } else { - bCallAction = 1; - } - - if(bCallAction) { - /* call configured action */ - iRet = pAction->pMod->mod.om.doAction(ppMsgs, pMsg->msgFlags, pAction->pModData); - if(iRet == RS_RET_SUSPENDED) { - dbgprintf("Action requested to be suspended, done that.\n"); - actionSuspend(pAction, getActNow(pAction)); - } - } - } while(iRet == RS_RET_SUSPENDED && (pAction->iResumeRetryCount == -1 || iRetries < pAction->iResumeRetryCount)); /* do...while! */ +finalize_it: + *pppMsgs = ppMsgs; + RETiRet; +} - if(iRet == RS_RET_DISABLE_ACTION) { - dbgprintf("Action requested to be disabled, done that.\n"); - pAction->bEnabled = 0; /* that's it... */ - } - pthread_cleanup_pop(1); /* unlock mutex */ +/* cleanup doAction calling parameters + * rgerhards, 2009-05-07 + */ +static rsRetVal cleanupDoActionParams(action_t *pAction, uchar ***pppMsgs) +{ + uchar **ppMsgs = *pppMsgs; + int i; + int iArr; + DEFiRet; -finalize_it: - /* cleanup */ + ASSERT(pAction != NULL); for(i = 0 ; i < pAction->iNumTpls ; ++i) { if(ppMsgs[i] != NULL) { switch(pAction->eParamPassing) { case ACT_ARRAY_PASSING: iArr = 0; while(((char **)ppMsgs[i])[iArr] != NULL) - d_free(((char **)ppMsgs[i])[iArr++]); - d_free(ppMsgs[i]); + free(((char **)ppMsgs[i])[iArr++]); + free(ppMsgs[i]); break; case ACT_STRING_PASSING: - d_free(ppMsgs[i]); + free(ppMsgs[i]); break; default: assert(0); } } } - d_free(ppMsgs); - msgDestruct(&pMsg); /* we are now finished with the message */ + free(ppMsgs); + *pppMsgs = NULL; + + RETiRet; +} + + +/* call the DoAction output plugin entry point + * Performance note: we build the action parameters here in this function. That + * means we do it while we hold the action look, potentially reducing concurrency + * (especially if the action queue is run in DIRECT mode). As an alternative, we + * may generate all params for the batch as whole before aquiring the action. However, + * that requires more memory, for large batches potentially a lot of memory. So for the + * time being, I am doing it here - the performance hit should be very minor and may even + * not be a hit because we may gain CPU cache locality gains with the "fewer memory" + * approach (I'd say that is rater likely). + * rgerhards, 2008-01-28 + */ +rsRetVal +actionCallDoAction(action_t *pThis, msg_t *pMsg) +{ + uchar **ppMsgs; /* array of message pointers for doAction */ + DEFiRet; + + ASSERT(pThis != NULL); + ISOBJ_TYPE_assert(pMsg, msg); + + DBGPRINTF("entering actionCalldoAction(), state: %s\n", getActStateName(pThis)); + CHKiRet(prepareDoActionParams(pThis, pMsg, &ppMsgs)); + + pThis->bHadAutoCommit = 0; + iRet = pThis->pMod->mod.om.doAction(ppMsgs, pMsg->msgFlags, pThis->pModData); + switch(iRet) { + case RS_RET_OK: + actionCommitted(pThis); + break; + case RS_RET_DEFER_COMMIT: + /* we are done, action state remains the same */ + break; + case RS_RET_PREVIOUS_COMMITTED: + /* action state remains the same, but we had a commit. */ + pThis->bHadAutoCommit = 1; + break; + case RS_RET_SUSPENDED: + actionRetry(pThis); + break; + case RS_RET_DISABLE_ACTION: + actionDisable(pThis); + break; + default:/* permanent failure of this message - no sense in retrying. This is + * not yet handled (but easy TODO) + */ + FINALIZE; + } + iRet = getReturnCode(pThis); + +finalize_it: + cleanupDoActionParams(pThis, &ppMsgs); /* iRet ignored! */ + + RETiRet; +} + + +/* process a message + * this readies the action and then calls doAction() + * rgerhards, 2008-01-28 + */ +rsRetVal +actionProcessMessage(action_t *pThis, msg_t *pMsg) +{ + DEFiRet; + + ASSERT(pThis != NULL); + ISOBJ_TYPE_assert(pMsg, msg); + +RUNLOG_STR("inside actionProcessMsg()"); + CHKiRet(actionPrepare(pThis)); + if(pThis->eState == ACT_STATE_ITX) + CHKiRet(actionCallDoAction(pThis, pMsg)); + + iRet = getReturnCode(pThis); +finalize_it: + RETiRet; +} + + +/* finish processing a batch. Most importantly, that means we commit if we + * need to do so. + * rgerhards, 2008-01-28 + */ +static rsRetVal +finishBatch(action_t *pThis) +{ + DEFiRet; + + ASSERT(pThis != NULL); + + if(pThis->eState == ACT_STATE_RDY) + FINALIZE; /* nothing to do */ + + CHKiRet(actionPrepare(pThis)); + if(pThis->eState == ACT_STATE_ITX) { + iRet = pThis->pMod->mod.om.endTransaction(pThis->pModData); + switch(iRet) { + case RS_RET_OK: + actionCommitted(pThis); + break; + case RS_RET_SUSPENDED: + actionRetry(pThis); + break; + case RS_RET_DISABLE_ACTION: + actionDisable(pThis); + break; + case RS_RET_DEFER_COMMIT: + DBGPRINTF("output plugin error: endTransaction() returns RS_RET_DEFER_COMMIT " + "- ignored\n"); + actionCommitted(pThis); + break; + case RS_RET_PREVIOUS_COMMITTED: + DBGPRINTF("output plugin error: endTransaction() returns RS_RET_PREVIOUS_COMMITTED " + "- ignored\n"); + actionCommitted(pThis); + break; + default:/* permanent failure of this message - no sense in retrying. This is + * not yet handled (but easy TODO) + */ + FINALIZE; + } + } + iRet = getReturnCode(pThis); + +finalize_it: + RETiRet; +} + + +/* receive an array of to-process user pointers and submit them + * for processing. + * rgerhards, 2009-04-22 + */ +rsRetVal +actionCallDoActionMULTIQUEUEprocessing(action_t *pAction, aUsrp_t *paUsrp) +{ + int i; + msg_t *pMsg; + rsRetVal localRet; + DEFiRet; + + assert(paUsrp != NULL); + + for(i = 0 ; i < paUsrp->nElem ; i++) { + pMsg = (msg_t*) paUsrp->pUsrp[i]; +dbgprintf("actionCall..MULTIQUEUE: i: %d, pMsg: %p\n", i, pMsg); + localRet = actionProcessMessage(pAction, pMsg); + dbgprintf("action call returned %d\n", localRet); + msgDestruct(&pMsg); /* TODO: change: we are now finished with the message */ + CHKiRet(localRet); + } + iRet = finishBatch(pAction); + +finalize_it: + RETiRet; +} +#pragma GCC diagnostic ignored "-Wempty-body" +/* receive an array of to-process user pointers and submit them + * for processing. + * rgerhards, 2009-04-22 + */ +rsRetVal +actionCallDoActionMULTIQUEUE(action_t *pAction, aUsrp_t *paUsrp) +{ + int iCancelStateSave; + DEFiRet; + + assert(paUsrp != NULL); + + /* We now must guard the output module against execution by multiple threads. The + * plugin interface specifies that output modules must not be thread-safe (except + * if they notify us they are - functionality not yet implemented...). + * rgerhards, 2008-01-30 + */ + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); + d_pthread_mutex_lock(&pAction->mutActExec); + pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec); + pthread_setcancelstate(iCancelStateSave, NULL); + + iRet = actionCallDoActionMULTIQUEUEprocessing(pAction, paUsrp); + + pthread_cleanup_pop(1); /* unlock mutex */ RETiRet; } @@ -762,8 +1072,8 @@ actionCallAction(action_t *pAction, msg_t *pMsg) * should check from time to time if affairs have improved. * rgerhards, 2007-07-24 */ - if(pAction->bEnabled == 0) { - ABORT_FINALIZE(RS_RET_OK); + if(pAction->eState == ACT_STATE_DIED) { + ABORT_FINALIZE(RS_RET_DISABLE_ACTION); } pAction->tActNow = -1; /* we do not yet know our current time (clear prev. value) */ @@ -832,6 +1142,7 @@ actionAddCfSysLineHdrl(void) CHKiRet(regCfSysLineHdlr((uchar *)"actionname", 0, eCmdHdlrGetWord, NULL, &pszActionName, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuefilename", 0, eCmdHdlrGetWord, NULL, &pszActionQFName, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesize", 0, eCmdHdlrInt, NULL, &iActionQueueSize, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeuebatchsize", 0, eCmdHdlrInt, NULL, &iActionQueueDeqBatchSize, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuemaxdiskspace", 0, eCmdHdlrSize, NULL, &iActionQueMaxDiskSpace, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuehighwatermark", 0, eCmdHdlrInt, NULL, &iActionQHighWtrMark, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuelowwatermark", 0, eCmdHdlrInt, NULL, &iActionQLowWtrMark, NULL)); @@ -948,7 +1259,7 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringReques dbgprintf("module is incompatible with RepeatedMsgReduction - turned off\n"); pAction->f_ReduceRepeated = 0; } - pAction->bEnabled = 1; /* action is enabled */ + pAction->eState = ACT_STATE_RDY; /* action is enabled */ if(bSuspended) actionSuspend(pAction, time(NULL)); /* "good" time call, only during init and unavoidable */ |