diff options
Diffstat (limited to 'action.c')
-rw-r--r-- | action.c | 1328 |
1 files changed, 1080 insertions, 248 deletions
@@ -4,7 +4,72 @@ * * File begun on 2007-08-06 by RGerhards (extracted from syslogd.c) * - * Copyright 2007-2010 Rainer Gerhards and Adiscon GmbH. + * Some notes on processing (this hopefully makes it easier to find + * the right code in question): For performance reasons, this module + * uses different methods of message submission based on the user-selected + * configuration. This code is similar, but can not be abstracted because + * of the performanse-affecting differences in it. As such, it is often + * necessary to triple-check that everything works well in *all* modes. + * The different modes (and calling sequence) are: + * + * if set iExecEveryNthOccur > 1 || f_ReduceRepeated || iSecsExecOnceInterval + * - doSubmitToActionQComplexBatch + * - helperSubmitToActionQComplexBatch + * - doActionCallAction + * handles duplicate message processing, but in essence calls + * - actionWriteToAction + * - qqueueEnqObj + * (now queue engine processing) + * if(pThis->bWriteAllMarkMsgs == FALSE) - this is the DEFAULT + * - doSubmitToActionQNotAllMarkBatch + * - doSubmitToActionQBatch (and from here like in the else case below!) + * else + * - doSubmitToActionQBatch + * - doSubmitToActionQ + * - qqueueEnqObj + * (now queue engine processing) + * + * Note that bWriteAllMakrMsgs on or off creates almost the same processing. + * The difference ist that if WriteAllMarkMsgs is not set, we need to + * preprocess the batch and drop mark messages which are not yet due for + * writing. + * + * After dequeue, processing is as follows: + * - processBatchMain + * - processAction + * - submitBatch + * - tryDoAction + * - ... + * + * MORE ON PROCESSING, QUEUES and FILTERING + * All filtering needs to be done BEFORE messages are enqueued to an + * action. In previous code, part of the filtering was done at the + * "remote end" of the action queue, which lead to problems in + * non-direct mode (because then things run asynchronously). In order + * to solve this problem once and for all, I have changed the code so + * that all filtering is done before enq, and processing on the + * dequeue side of action processing now always executes whatever is + * enqueued. This is the only way to handle things consistently and + * (as much as possible) in a queue-type agnostic way. However, it is + * a rather radical change, which I unfortunately needed to make from + * stable version 5.8.1 to 5.8.2. If new problems pop up, you now know + * what may be their cause. In any case, the way it is done now is the + * only correct one. + * A problem is that, under fortunate conditions, we use the current + * batch for the output system as well. This is very good from a performance + * point of view, but makes the distinction between enq and deq side of + * the queue a bit hard. The current idea is that the filter condition + * alone is checked at the deq side of the queue (seems to be unavoidable + * to do it that way), but all other complex conditons (like failover + * handling) go into the computation of the filter condition. For + * non-direct queues, we still enqueue only what is acutally necessary. + * Note that in this case the rest of the code must ensure that the filter + * is set to "true". While this is not perfect and not as simple as + * we would like to see it, it looks like the best way to tackle that + * beast. + * rgerhards, 2011-06-15 + * + * Copyright 2007-2011 Rainer Gerhards and Adiscon GmbH. * * This file is part of rsyslog. * @@ -42,11 +107,19 @@ #include "cfsysline.h" #include "srUtils.h" #include "errmsg.h" +#include "batch.h" +#include "wti.h" #include "datetime.h" #include "unicode-helper.h" +#include "atomic.h" + +#define NO_TIME_PROVIDED 0 /* indicate we do not provide any cached time */ /* forward definitions */ -rsRetVal actionCallDoAction(action_t *pAction, msg_t *pMsg); +static rsRetVal processBatchMain(action_t *pAction, batch_t *pBatch, int*); +static rsRetVal doSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch); +static rsRetVal doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch); +static rsRetVal doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch); /* object static data (once for all instances) */ /* TODO: make this an object! DEFobjStaticHelpers -- rgerhards, 2008-03-05 */ @@ -55,16 +128,19 @@ DEFobjCurrIf(datetime) DEFobjCurrIf(module) DEFobjCurrIf(errmsg) +static int iActExecOnceInterval = 0; /* execute action once every nn seconds */ static int iActExecEveryNthOccur = 0; /* execute action every n-th occurence (0,1=always) */ static time_t iActExecEveryNthOccurTO = 0; /* timeout for n-occurence setting (in seconds, 0=never) */ static int glbliActionResumeInterval = 30; int glbliActionResumeRetryCount = 0; /* how often should suspended actions be retried? */ static int bActionRepMsgHasMsg = 0; /* last messsage repeated... has msg fragment in it */ +static int bActionWriteAllMarkMsgs = FALSE; /* should all mark messages be unconditionally written? */ static uchar *pszActionName; /* short name for the action */ -/* main message queue and its configuration parameters */ +/* action queue and its configuration parameters */ static queueType_t ActionQueType = QUEUETYPE_DIRECT; /* type of the main message queue above */ static int iActionQueueSize = 1000; /* size of the main message queue above */ +static int iActionQueueDeqBatchSize = 16; /* batch size for action queues */ static int iActionQHighWtrMark = 800; /* high water mark for disk-assisted queues */ static int iActionQLowWtrMark = 200; /* low water mark for disk-assisted queues */ static int iActionQDiscardMark = 9800; /* begin to discard messages */ @@ -122,7 +198,7 @@ getActNow(action_t *pThis) { assert(pThis != NULL); if(pThis->tActNow == -1) { - pThis->tActNow = time(NULL); /* good time call - the only one done */ + pThis->tActNow = datetime.GetTime(NULL); /* good time call - the only one done */ if(pThis->tLastExec > pThis->tActNow) { /* if we are traveling back in time, reset tLastExec */ pThis->tLastExec = (time_t) 0; @@ -146,6 +222,7 @@ actionResetQueueParams(void) ActionQueType = QUEUETYPE_DIRECT; /* type of the main message queue above */ iActionQueueSize = 1000; /* size of the main message queue above */ + iActionQueueDeqBatchSize = 16; /* default batch size */ iActionQHighWtrMark = 800; /* high water mark for disk-assisted queues */ iActionQLowWtrMark = 200; /* low water mark for disk-assisted queues */ iActionQDiscardMark = 9800; /* begin to discard messages */ @@ -179,7 +256,6 @@ actionResetQueueParams(void) */ rsRetVal actionDestruct(action_t *pThis) { - int i; DEFiRet; ASSERT(pThis != NULL); @@ -198,32 +274,6 @@ rsRetVal actionDestruct(action_t *pThis) d_free(pThis->pszName); d_free(pThis->ppTpl); - /* message ptr cleanup */ - for(i = 0 ; i < pThis->iNumTpls ; ++i) { - if(((uchar**)pThis->ppMsgs)[i] != NULL) { - switch(pThis->eParamPassing) { - case ACT_ARRAY_PASSING: -#if 0 /* later! */ - iArr = 0; - while(((char **)pThis->ppMsgs[i])[iArr] != NULL) { - d_free(((char **)pThis->ppMsgs[i])[iArr++]); - ((char **)pThis->ppMsgs[i])[iArr++] = NULL; - } - d_free(pThis->ppMsgs[i]); - pThis->ppMsgs[i] = NULL; -#endif - break; - case ACT_STRING_PASSING: - d_free(((uchar**)pThis->ppMsgs)[i]); - break; - default: - assert(0); - } - } - } - d_free(pThis->ppMsgs); - d_free(pThis->lenMsgs); - d_free(pThis); RETiRet; @@ -243,8 +293,9 @@ rsRetVal actionConstruct(action_t **ppThis) CHKmalloc(pThis = (action_t*) calloc(1, sizeof(action_t))); pThis->iResumeInterval = glbliActionResumeInterval; pThis->iResumeRetryCount = glbliActionResumeRetryCount; - pThis->tLastOccur = time(NULL); /* done once per action on startup only */ + pThis->tLastOccur = datetime.GetTime(NULL); /* done once per action on startup only */ pthread_mutex_init(&pThis->mutActExec, NULL); + INIT_ATOMIC_HELPER_MUT(pThis->mutCAS); SYNC_OBJ_TOOL_INIT(pThis); /* indicate we have a new action */ @@ -269,6 +320,32 @@ actionConstructFinalize(action_t *pThis) /* find a name for our queue */ snprintf((char*) pszQName, sizeof(pszQName)/sizeof(uchar), "action %d queue", iActionNbr); + /* now check if we can run the action in "firehose mode" during stage one of + * its processing (that is before messages are enqueued into the action q). + * This is only possible if some features, which require strict sequence, are + * not used. Thankfully, that is usually the case. The benefit of firehose + * mode is much faster processing (and simpler code) -- rgerhards, 2010-06-08 + */ + if( pThis->iExecEveryNthOccur > 1 + || pThis->f_ReduceRepeated + || pThis->iSecsExecOnceInterval + ) { + DBGPRINTF("info: firehose mode disabled for action because " + "iExecEveryNthOccur=%d, " + "ReduceRepeated=%d, " + "iSecsExecOnceInterval=%d\n", + pThis->iExecEveryNthOccur, pThis->f_ReduceRepeated, + pThis->iSecsExecOnceInterval + ); + pThis->submitToActQ = doSubmitToActionQComplexBatch; + } else if(pThis->bWriteAllMarkMsgs == FALSE) { + /* nearly full-speed submission mode, default case */ + pThis->submitToActQ = doSubmitToActionQNotAllMarkBatch; + } else { + /* full firehose submission mode */ + pThis->submitToActQ = doSubmitToActionQBatch; + } + /* we need to make a safety check: if the queue is NOT in direct mode, a single * message object may be accessed by multiple threads. As such, we need to enable * msg object thread safety in this case (this costs a bit performance and thus @@ -283,7 +360,8 @@ actionConstructFinalize(action_t *pThis) * to be run on multiple threads. So far, this is forbidden by the interface * spec. -- rgerhards, 2008-01-30 */ - CHKiRet(qqueueConstruct(&pThis->pQueue, ActionQueType, 1, iActionQueueSize, (rsRetVal (*)(void*,void*))actionCallDoAction)); + CHKiRet(qqueueConstruct(&pThis->pQueue, ActionQueType, 1, iActionQueueSize, + (rsRetVal (*)(void*, batch_t*, int*))processBatchMain)); obj.SetName((obj_t*) pThis->pQueue, pszQName); /* ... set some properties ... */ @@ -298,6 +376,7 @@ actionConstructFinalize(action_t *pThis) qqueueSetpUsr(pThis->pQueue, pThis); setQPROP(qqueueSetsizeOnDiskMax, "$ActionQueueMaxDiskSpace", iActionQueMaxDiskSpace); + setQPROP(qqueueSetiDeqBatchSize, "$ActionQueueDequeueBatchSize", iActionQueueDeqBatchSize); setQPROP(qqueueSetMaxFileSize, "$ActionQueueFileSize", iActionQueMaxFileSize); setQPROPstr(qqueueSetFilePrefix, "$ActionQueueFileName", pszActionQFName); setQPROP(qqueueSetiPersistUpdCnt, "$ActionQueueCheckpointInterval", iActionQPersistUpdCnt); @@ -334,87 +413,272 @@ finalize_it: } -/* set an action back to active state -- rgerhards, 2007-08-02 + +/* set the global resume interval + */ +rsRetVal actionSetGlobalResumeInterval(int iNewVal) +{ + glbliActionResumeInterval = iNewVal; + return RS_RET_OK; +} + + +/* returns the action state name in human-readable form + * returned string must not be modified. + * rgerhards, 2009-05-07 + */ +static uchar *getActStateName(action_t *pThis) +{ + switch(pThis->eState) { + case ACT_STATE_RDY: + return (uchar*) "rdy"; + case ACT_STATE_ITX: + return (uchar*) "itx"; + case ACT_STATE_RTRY: + return (uchar*) "rtry"; + case ACT_STATE_SUSP: + return (uchar*) "susp"; + case ACT_STATE_DIED: + return (uchar*) "died"; + case ACT_STATE_COMM: + return (uchar*) "comm"; + default: + return (uchar*) "ERROR/UNKNWON"; + } +} + + +/* returns a suitable return code based on action state + * rgerhards, 2009-05-07 */ -static rsRetVal actionResume(action_t *pThis) +static rsRetVal getReturnCode(action_t *pThis) { DEFiRet; ASSERT(pThis != NULL); - pThis->bSuspended = 0; + switch(pThis->eState) { + case ACT_STATE_RDY: + iRet = RS_RET_OK; + break; + case ACT_STATE_ITX: + if(pThis->bHadAutoCommit) { + pThis->bHadAutoCommit = 0; /* auto-reset */ + iRet = RS_RET_PREVIOUS_COMMITTED; + } else { + iRet = RS_RET_DEFER_COMMIT; + } + break; + case ACT_STATE_RTRY: + iRet = RS_RET_SUSPENDED; + break; + case ACT_STATE_SUSP: + case ACT_STATE_DIED: + iRet = RS_RET_ACTION_FAILED; + break; + default: + DBGPRINTF("Invalid action engine state %d, program error\n", + (int) pThis->eState); + iRet = RS_RET_ERR; + break; + } RETiRet; } -/* set the global resume interval +/* set the action to a new state + * rgerhards, 2007-08-02 */ -rsRetVal actionSetGlobalResumeInterval(int iNewVal) +static inline void actionSetState(action_t *pThis, action_state_t newState) { - glbliActionResumeInterval = iNewVal; - return RS_RET_OK; + pThis->eState = newState; + DBGPRINTF("Action %p transitioned to state: %s\n", pThis, getActStateName(pThis)); } +/* Handles the transient commit state. So far, this is + * mostly a dummy... + * rgerhards, 2007-08-02 + */ +static void actionCommitted(action_t *pThis) +{ + actionSetState(pThis, ACT_STATE_RDY); +} + + +/* set action to "rtry" state. + * rgerhards, 2007-08-02 + */ +static void actionRetry(action_t *pThis) +{ + actionSetState(pThis, ACT_STATE_RTRY); + pThis->iResumeOKinRow++; +} -/* suspend an action -- rgerhards, 2007-08-02 + +/* Disable action, this means it will never again be usable + * until rsyslog is reloaded. Use only as a last resort, but + * depends on output module. + * rgerhards, 2007-08-02 */ -static rsRetVal actionSuspend(action_t *pThis, time_t tNow) +static void actionDisable(action_t *pThis) { + actionSetState(pThis, ACT_STATE_DIED); +} + + +/* Suspend action, this involves changing the acton state as well + * as setting the next retry time. + * if we have more than 10 retries, we prolong the + * retry interval. If something is really stalled, it will + * get re-tried only very, very seldom - but that saves + * CPU time. TODO: maybe a config option for that? + * rgerhards, 2007-08-02 + */ +static inline void actionSuspend(action_t *pThis, time_t ttNow) +{ + if(ttNow == NO_TIME_PROVIDED) + datetime.GetTime(&ttNow); + pThis->ttResumeRtry = ttNow + pThis->iResumeInterval * (pThis->iNbrResRtry / 10 + 1); + actionSetState(pThis, ACT_STATE_SUSP); + DBGPRINTF("earliest retry=%d\n", (int) pThis->ttResumeRtry); +} + + +/* actually do retry processing. Note that the function receives a timestamp so + * that we do not need to call the (expensive) time() API. + * Note that we do the full retry processing here, doing the configured number of + * iterations. -- rgerhards, 2009-05-07 + * We need to guard against module which always return RS_RET_OK from their tryResume() + * entry point. This is invalid, but has harsh consequences: it will cause the rsyslog + * engine to go into a tight loop. That obviously is not acceptable. As such, we track the + * count of iterations that a tryResume returning RS_RET_OK is immediately followed by + * an unsuccessful call to doAction(). If that happens more than 1,000 times, we assume + * the return acutally is a RS_RET_SUSPENDED. In order to go through the various + * resumption stages, we do this for every 1000 requests. This magic number 1000 may + * not be the most appropriate, but it should be thought of a "if nothing else helps" + * kind of facility: in the first place, the module should return a proper indication + * of its inability to recover. -- rgerhards, 2010-04-26. + */ +static inline rsRetVal +actionDoRetry(action_t *pThis, time_t ttNow, int *pbShutdownImmediate) +{ + int iRetries; + int iSleepPeriod; + int bTreatOKasSusp; DEFiRet; ASSERT(pThis != NULL); - pThis->bSuspended = 1; - pThis->ttResumeRtry = tNow + pThis->iResumeInterval; - pThis->iNbrResRtry = 0; /* tell that we did not yet retry to resume */ + iRetries = 0; + while((*pbShutdownImmediate == 0) && pThis->eState == ACT_STATE_RTRY) { + iRet = pThis->pMod->tryResume(pThis->pModData); + if((pThis->iResumeOKinRow > 999) && (pThis->iResumeOKinRow % 1000 == 0)) { + bTreatOKasSusp = 1; + pThis->iResumeOKinRow = 0; + } else { + bTreatOKasSusp = 0; + } + if((iRet == RS_RET_OK) && (!bTreatOKasSusp)) { + actionSetState(pThis, ACT_STATE_RDY); + } else if(iRet == RS_RET_SUSPENDED || bTreatOKasSusp) { + /* max retries reached? */ + if((pThis->iResumeRetryCount != -1 && iRetries >= pThis->iResumeRetryCount)) { + actionSuspend(pThis, ttNow); + } else { + ++pThis->iNbrResRtry; + ++iRetries; + iSleepPeriod = pThis->iResumeInterval; + ttNow += iSleepPeriod; /* not truly exact, but sufficiently... */ + srSleep(iSleepPeriod, 0); + if(*pbShutdownImmediate) { + ABORT_FINALIZE(RS_RET_FORCE_TERM); + } + } + } else if(iRet == RS_RET_DISABLE_ACTION) { + actionDisable(pThis); + } + } + + if(pThis->eState == ACT_STATE_RDY) { + pThis->iNbrResRtry = 0; + } + +finalize_it: RETiRet; } /* try to resume an action -- rgerhards, 2007-08-02 - * returns RS_RET_OK if resumption worked, RS_RET_SUSPEND if the - * action is still suspended. + * changed to new action state engine -- rgerhards, 2009-05-07 */ -static rsRetVal actionTryResume(action_t *pThis) +static rsRetVal actionTryResume(action_t *pThis, int *pbShutdownImmediate) { DEFiRet; - time_t ttNow; + time_t ttNow = NO_TIME_PROVIDED; ASSERT(pThis != NULL); - /* for resume handling, we must always obtain a fresh timestamp. We used - * to use the action timestamp, but in this case we will never reach a - * point where a resumption is actually tried, because the action timestamp - * is always in the past. So we can not avoid doing a fresh time() call - * here. -- rgerhards, 2009-03-18 - */ - time(&ttNow); /* cache "now" */ - - /* first check if it is time for a re-try */ - if(ttNow >= pThis->ttResumeRtry) { - iRet = pThis->pMod->tryResume(pThis->pModData); - if(iRet == RS_RET_SUSPENDED) { - /* set new tryResume time */ - ++pThis->iNbrResRtry; - /* if we have more than 10 retries, we prolong the - * retry interval. If something is really stalled, it will - * get re-tried only very, very seldom - but that saves - * CPU time. TODO: maybe a config option for that? - * rgerhards, 2007-08-02 - */ - pThis->ttResumeRtry = ttNow + pThis->iResumeInterval * (pThis->iNbrResRtry / 10 + 1); + if(pThis->eState == ACT_STATE_SUSP) { + /* if we are suspended, we need to check if the timeout expired. + * for this handling, we must always obtain a fresh timestamp. We used + * to use the action timestamp, but in this case we will never reach a + * point where a resumption is actually tried, because the action timestamp + * is always in the past. So we can not avoid doing a fresh time() call + * here. -- rgerhards, 2009-03-18 + */ + datetime.GetTime(&ttNow); /* cache "now" */ + if(ttNow >= pThis->ttResumeRtry) { + actionSetState(pThis, ACT_STATE_RTRY); /* back to retries */ } - } else { - /* it's too early, we are still suspended --> indicate this */ - iRet = RS_RET_SUSPENDED; } - if(iRet == RS_RET_OK) - actionResume(pThis); + if(pThis->eState == ACT_STATE_RTRY) { + if(ttNow == NO_TIME_PROVIDED) /* use cached result if we have it */ + datetime.GetTime(&ttNow); + CHKiRet(actionDoRetry(pThis, ttNow, pbShutdownImmediate)); + } + + if(Debug && (pThis->eState == ACT_STATE_RTRY ||pThis->eState == ACT_STATE_SUSP)) { + DBGPRINTF("actionTryResume: action %p state: %s, next retry (if applicable): %u [now %u]\n", + pThis, getActStateName(pThis), (unsigned) pThis->ttResumeRtry, (unsigned) ttNow); + } + +finalize_it: + RETiRet; +} + + +/* prepare an action for performing work. This involves trying to recover it, + * depending on its current state. + * rgerhards, 2009-05-07 + */ +static inline rsRetVal actionPrepare(action_t *pThis, int *pbShutdownImmediate) +{ + DEFiRet; + + assert(pThis != NULL); + CHKiRet(actionTryResume(pThis, pbShutdownImmediate)); - DBGPRINTF("actionTryResume: iRet: %d, next retry (if applicable): %u [now %u]\n", - iRet, (unsigned) pThis->ttResumeRtry, (unsigned) ttNow); + /* if we are now ready, we initialize the transaction and advance + * action state accordingly + */ + if(pThis->eState == ACT_STATE_RDY) { + iRet = pThis->pMod->mod.om.beginTransaction(pThis->pModData); + switch(iRet) { + case RS_RET_OK: + actionSetState(pThis, ACT_STATE_ITX); + break; + case RS_RET_SUSPENDED: + actionRetry(pThis); + break; + case RS_RET_DISABLE_ACTION: + actionDisable(pThis); + break; + default:FINALIZE; + } + } +finalize_it: RETiRet; } @@ -425,129 +689,503 @@ static rsRetVal actionTryResume(action_t *pThis) rsRetVal actionDbgPrint(action_t *pThis) { DEFiRet; + char *sz; dbgprintf("%s: ", module.GetStateName(pThis->pMod)); pThis->pMod->dbgPrintInstInfo(pThis->pModData); dbgprintf("\n\tInstance data: 0x%lx\n", (unsigned long) pThis->pModData); dbgprintf("\tRepeatedMsgReduction: %d\n", pThis->f_ReduceRepeated); dbgprintf("\tResume Interval: %d\n", pThis->iResumeInterval); - dbgprintf("\tSuspended: %d", pThis->bSuspended); - if(pThis->bSuspended) { - dbgprintf(" next retry: %u, number retries: %d", (unsigned) pThis->ttResumeRtry, pThis->iNbrResRtry); + if(pThis->eState == ACT_STATE_SUSP) { + dbgprintf("\tresume next retry: %u, number retries: %d", + (unsigned) pThis->ttResumeRtry, pThis->iNbrResRtry); } - dbgprintf("\n"); - dbgprintf("\tDisabled: %d\n", !pThis->bEnabled); + dbgprintf("\tState: %s\n", getActStateName(pThis)); dbgprintf("\tExec only when previous is suspended: %d\n", pThis->bExecWhenPrevSusp); + if(pThis->submitToActQ == doSubmitToActionQComplexBatch) { + sz = "slow, but feature-rich"; + } else if(pThis->submitToActQ == doSubmitToActionQNotAllMarkBatch) { + sz = "fast, but supports partial mark messages"; + } else if(pThis->submitToActQ == doSubmitToActionQBatch) { + sz = "firehose (fastest)"; + } else { + sz = "unknown (need to update debug display?)"; + } + dbgprintf("\tsubmission mode: %s\n", sz); dbgprintf("\n"); RETiRet; } -/* call the DoAction output plugin entry point - * rgerhards, 2008-01-28 +/* prepare the calling parameters for doAction() + * rgerhards, 2009-05-07 */ -#pragma GCC diagnostic ignored "-Wempty-body" -rsRetVal -actionCallDoAction(action_t *pAction, msg_t *pMsg) +static rsRetVal prepareDoActionParams(action_t *pAction, batch_obj_t *pElem) { - DEFiRet; - int iRetries; int i; - int iArr; - int iSleepPeriod; - int bCallAction; - int iCancelStateSave; + msg_t *pMsg; + DEFiRet; ASSERT(pAction != NULL); + ASSERT(pElem != NULL); - /* We now must guard the output module against execution by multiple threads. The - * plugin interface specifies that output modules must not be thread-safe (except - * if they notify us they are - functionality not yet implemented...). - * rgerhards, 2008-01-30 - */ - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); - d_pthread_mutex_lock(&pAction->mutActExec); - pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec); - pthread_setcancelstate(iCancelStateSave, NULL); - + pMsg = (msg_t*) pElem->pUsrp; /* here we must loop to process all requested strings */ for(i = 0 ; i < pAction->iNumTpls ; ++i) { switch(pAction->eParamPassing) { case ACT_STRING_PASSING: - CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(((uchar**)pAction->ppMsgs)[i]), &(pAction->lenMsgs[i]))); + CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(pElem->staticActStrings[i]), + &pElem->staticLenStrings[i])); + pElem->staticActParams[i] = pElem->staticActStrings[i]; break; case ACT_ARRAY_PASSING: - CHKiRet(tplToArray(pAction->ppTpl[i], pMsg, (uchar***) &(((uchar**)pAction->ppMsgs)[i]))); + CHKiRet(tplToArray(pAction->ppTpl[i], pMsg, (uchar***) &(pElem->staticActParams[i]))); + break; + case ACT_MSG_PASSING: + pElem->staticActParams[i] = (void*) pMsg; + break; + default:dbgprintf("software bug/error: unknown pAction->eParamPassing %d in prepareDoActionParams\n", + (int) pAction->eParamPassing); + assert(0); /* software bug if this happens! */ break; - default:assert(0); /* software bug if this happens! */ } } - iRetries = 0; - do { - /* on first invocation, this if should never be true. We just put it at the top - * of the loop so that processing (and code) is simplified. This code is actually - * triggered on the 2nd+ invocation. -- rgerhards, 2008-01-30 - */ - if(iRet == RS_RET_SUSPENDED) { - /* ok, this calls for our retry logic... */ - ++iRetries; - iSleepPeriod = pAction->iResumeInterval; - srSleep(iSleepPeriod, 0); - } - /* first check if we are suspended and, if so, retry */ - if(actionIsSuspended(pAction)) { - iRet = actionTryResume(pAction); - if(iRet == RS_RET_OK) - bCallAction = 1; - else - bCallAction = 0; - } else { - bCallAction = 1; - } +finalize_it: + RETiRet; +} + + +/* free a batches ressources, but not string buffers (because they will + * most probably be reused). String buffers are only deleted upon final + * destruction of the batch. + * This function here must be called only when the batch is actually no + * longer used, also not for retrying actions or such. It invalidates + * buffers. + * rgerhards, 2010-12-17 + */ +static rsRetVal releaseBatch(action_t *pAction, batch_t *pBatch) +{ + int jArr; + int i, j; + batch_obj_t *pElem; + uchar ***ppMsgs; + DEFiRet; + + ASSERT(pAction != NULL); - if(bCallAction) { - /* call configured action */ - iRet = pAction->pMod->mod.om.doAction(pAction->ppMsgs, pMsg->msgFlags, pAction->pModData); - if(iRet == RS_RET_SUSPENDED) { - DBGPRINTF("Action requested to be suspended, done that.\n"); - actionSuspend(pAction, getActNow(pAction)); + for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { + pElem = &(pBatch->pElem[i]); + if(pElem->bFilterOK && pElem->state != BATCH_STATE_DISC) { + switch(pAction->eParamPassing) { + case ACT_ARRAY_PASSING: + ppMsgs = (uchar***) pElem->staticActParams; + for(j = 0 ; j < pAction->iNumTpls ; ++j) { + if(((uchar**)ppMsgs)[j] != NULL) { + jArr = 0; + while(ppMsgs[j][jArr] != NULL) { + d_free(ppMsgs[j][jArr++]); + ppMsgs[j][jArr++] = NULL; + } + d_free(((uchar**)ppMsgs)[j]); + ((uchar**)ppMsgs)[j] = NULL; + } + } + break; + case ACT_STRING_PASSING: + case ACT_MSG_PASSING: + /* nothing to do in that case */ + /* TODO ... and yet we do something ;) This is considered not + * really needed, but I was not bold enough to remove that while + * fixing the stable. It should be removed in a devel version + * soon (I really don't see a reason why we would need it). + * rgerhards, 2010-12-16 + */ + for(j = 0 ; j < pAction->iNumTpls ; ++j) { + ((uchar**)pElem->staticActParams)[j] = NULL; + } + break; } } + } - } while(iRet == RS_RET_SUSPENDED && (pAction->iResumeRetryCount == -1 || iRetries < pAction->iResumeRetryCount)); /* do...while! */ + RETiRet; +} - if(iRet == RS_RET_DISABLE_ACTION) { - DBGPRINTF("Action requested to be disabled, done that.\n"); - pAction->bEnabled = 0; /* that's it... */ + +/* call the DoAction output plugin entry point + * rgerhards, 2008-01-28 + */ +rsRetVal +actionCallDoAction(action_t *pThis, msg_t *pMsg, void *actParams) +{ + DEFiRet; + + ASSERT(pThis != NULL); + ISOBJ_TYPE_assert(pMsg, msg); + + DBGPRINTF("entering actionCalldoAction(), state: %s\n", getActStateName(pThis)); + + pThis->bHadAutoCommit = 0; + iRet = pThis->pMod->mod.om.doAction(actParams, pMsg->msgFlags, pThis->pModData); + switch(iRet) { + case RS_RET_OK: + actionCommitted(pThis); + pThis->iResumeOKinRow = 0; /* we had a successful call! */ + break; + case RS_RET_DEFER_COMMIT: + pThis->iResumeOKinRow = 0; /* we had a successful call! */ + /* we are done, action state remains the same */ + break; + case RS_RET_PREVIOUS_COMMITTED: + /* action state remains the same, but we had a commit. */ + pThis->bHadAutoCommit = 1; + pThis->iResumeOKinRow = 0; /* we had a successful call! */ + break; + case RS_RET_SUSPENDED: + actionRetry(pThis); + break; + case RS_RET_DISABLE_ACTION: + actionDisable(pThis); + break; + default:/* permanent failure of this message - no sense in retrying. This is + * not yet handled (but easy TODO) + */ + FINALIZE; } + iRet = getReturnCode(pThis); finalize_it: - /* cleanup */ - for(i = 0 ; i < pAction->iNumTpls ; ++i) { - if(((uchar**)pAction->ppMsgs)[i] != NULL) { - switch(pAction->eParamPassing) { - case ACT_ARRAY_PASSING: - iArr = 0; - while((((uchar***)pAction->ppMsgs)[i][iArr]) != NULL) { - d_free(((uchar ***)pAction->ppMsgs)[i][iArr++]); - ((uchar ***)pAction->ppMsgs)[i][iArr++] = NULL; + RETiRet; +} + + +/* process a message + * this readies the action and then calls doAction() + * rgerhards, 2008-01-28 + */ +static inline rsRetVal +actionProcessMessage(action_t *pThis, msg_t *pMsg, void *actParams, int *pbShutdownImmediate) +{ + DEFiRet; + + ASSERT(pThis != NULL); + ISOBJ_TYPE_assert(pMsg, msg); + + CHKiRet(actionPrepare(pThis, pbShutdownImmediate)); + if(pThis->eState == ACT_STATE_ITX) + CHKiRet(actionCallDoAction(pThis, pMsg, actParams)); + + iRet = getReturnCode(pThis); +finalize_it: + RETiRet; +} + + +/* finish processing a batch. Most importantly, that means we commit if we + * need to do so. + * rgerhards, 2008-01-28 + */ +static rsRetVal +finishBatch(action_t *pThis, batch_t *pBatch) +{ + int i; + DEFiRet; + + ASSERT(pThis != NULL); + + if(pThis->eState == ACT_STATE_RDY) { + /* we just need to flag the batch as commited */ + FINALIZE; /* nothing to do */ + } + + CHKiRet(actionPrepare(pThis, pBatch->pbShutdownImmediate)); + if(pThis->eState == ACT_STATE_ITX) { + iRet = pThis->pMod->mod.om.endTransaction(pThis->pModData); + switch(iRet) { + case RS_RET_OK: + actionCommitted(pThis); + /* flag messages as committed */ + for(i = 0 ; i < pBatch->nElem ; ++i) { + batchSetElemState(pBatch, i, BATCH_STATE_COMM); + pBatch->pElem[i].bPrevWasSuspended = 0; /* we had success! */ } - d_free(((uchar**)pAction->ppMsgs)[i]); - ((uchar**)pAction->ppMsgs)[i] = NULL; break; - case ACT_STRING_PASSING: + case RS_RET_SUSPENDED: + actionRetry(pThis); break; - default: - assert(0); + case RS_RET_DISABLE_ACTION: + actionDisable(pThis); + break; + case RS_RET_DEFER_COMMIT: + DBGPRINTF("output plugin error: endTransaction() returns RS_RET_DEFER_COMMIT " + "- ignored\n"); + actionCommitted(pThis); + break; + case RS_RET_PREVIOUS_COMMITTED: + DBGPRINTF("output plugin error: endTransaction() returns RS_RET_PREVIOUS_COMMITTED " + "- ignored\n"); + actionCommitted(pThis); + break; + default:/* permanent failure of this message - no sense in retrying. This is + * not yet handled (but easy TODO) + */ + FINALIZE; + } + } + iRet = getReturnCode(pThis); + +finalize_it: + RETiRet; +} + + +/* try to submit a partial batch of elements. + * rgerhards, 2009-05-12 + */ +static inline rsRetVal +tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem) +{ + int i; + int iElemProcessed; + int iCommittedUpTo; + msg_t *pMsg; + rsRetVal localRet; + DEFiRet; + + assert(pBatch != NULL); + assert(pnElem != NULL); + + i = pBatch->iDoneUpTo; /* all messages below that index are processed */ + iElemProcessed = 0; + iCommittedUpTo = i; +dbgprintf("XXXXX: tryDoAction %p, pnElem %d, nElem %d\n", pAction, *pnElem, pBatch->nElem); + while(iElemProcessed <= *pnElem && i < pBatch->nElem) { + if(*(pBatch->pbShutdownImmediate)) + ABORT_FINALIZE(RS_RET_FORCE_TERM); + /* NOTE: do NOT extend the filter below! Anything else must be done on the + * enq side of the queue (see file header comment)! -- rgerhards, 2011-06-15 + */ + if( pBatch->pElem[i].bFilterOK + && pBatch->pElem[i].state != BATCH_STATE_DISC) { + pMsg = (msg_t*) pBatch->pElem[i].pUsrp; + localRet = actionProcessMessage(pAction, pMsg, pBatch->pElem[i].staticActParams, + pBatch->pbShutdownImmediate); + DBGPRINTF("action %p call returned %d\n", pAction, localRet); + /* Note: we directly modify the batch object state, because we know that + * wo do not overwrite BATCH_STATE_DISC indicators! + */ + if(localRet == RS_RET_OK) { + /* mark messages as committed */ + while(iCommittedUpTo <= i) { + pBatch->pElem[iCommittedUpTo].bPrevWasSuspended = 0; /* we had success! */ + batchSetElemState(pBatch, iCommittedUpTo, BATCH_STATE_COMM); + ++iCommittedUpTo; + //pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM; + } + } else if(localRet == RS_RET_PREVIOUS_COMMITTED) { + /* mark messages as committed */ + while(iCommittedUpTo < i) { + pBatch->pElem[iCommittedUpTo].bPrevWasSuspended = 0; /* we had success! */ + batchSetElemState(pBatch, iCommittedUpTo, BATCH_STATE_COMM); + ++iCommittedUpTo; + //pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM; + } + pBatch->pElem[i].state = BATCH_STATE_SUB; + } else if(localRet == RS_RET_DEFER_COMMIT) { + pBatch->pElem[i].state = BATCH_STATE_SUB; + } else if(localRet == RS_RET_DISCARDMSG) { + pBatch->pElem[i].state = BATCH_STATE_DISC; + } else { + dbgprintf("tryDoAction: unexpected error code %d[nElem %d, Commited UpTo %d], finalizing\n", + localRet, *pnElem, iCommittedUpTo); + iRet = localRet; + FINALIZE; + } + } + ++i; + ++iElemProcessed; + } + +finalize_it: + if(pBatch->iDoneUpTo != iCommittedUpTo) { + pBatch->iDoneUpTo = iCommittedUpTo; + } + RETiRet; +} + +/* debug aid */ +static void displayBatchState(batch_t *pBatch) +{ + int i; + for(i = 0 ; i < pBatch->nElem ; ++i) { + dbgprintf("XXXXX: displayBatchState2 %p[%d]: %d\n", pBatch, i, pBatch->pElem[i].state); + } +} + + +/* submit a batch for actual action processing. + * The first nElem elements are processed. This function calls itself + * recursively if it needs to handle errors. + * Note: we don't need the number of the first message to be processed as a parameter, + * because this is kept track of inside the batch itself (iDoneUpTo). + * rgerhards, 2009-05-12 + */ +static rsRetVal +submitBatch(action_t *pAction, batch_t *pBatch, int nElem) +{ + int i; + int bDone; + rsRetVal localRet; + int wasDoneTo; + DEFiRet; + + assert(pBatch != NULL); + + wasDoneTo = pBatch->iDoneUpTo; + bDone = 0; + do { + localRet = tryDoAction(pAction, pBatch, &nElem); + if(localRet == RS_RET_FORCE_TERM) { + ABORT_FINALIZE(RS_RET_FORCE_TERM); + } + if( localRet == RS_RET_OK + || localRet == RS_RET_PREVIOUS_COMMITTED + || localRet == RS_RET_DEFER_COMMIT) { + /* try commit transaction, once done, we can simply do so as if + * that return state was returned from tryDoAction(). + */ + localRet = finishBatch(pAction, pBatch); + } + + if( localRet == RS_RET_OK + || localRet == RS_RET_PREVIOUS_COMMITTED + || localRet == RS_RET_DEFER_COMMIT) { + bDone = 1; + } else if(localRet == RS_RET_SUSPENDED) { + ; /* do nothing, this will retry the full batch */ + } else if(localRet == RS_RET_ACTION_FAILED) { + /* in this case, everything not yet committed is BAD */ + for(i = pBatch->iDoneUpTo ; i < wasDoneTo + nElem ; ++i) { + if( pBatch->pElem[i].state != BATCH_STATE_DISC + && pBatch->pElem[i].state != BATCH_STATE_COMM ) { + pBatch->pElem[i].state = BATCH_STATE_BAD; + pBatch->pElem[i].bPrevWasSuspended = 1; + } + } + bDone = 1; + } else { + if(nElem == 1) { + batchSetElemState(pBatch, pBatch->iDoneUpTo, BATCH_STATE_BAD); + bDone = 1; + } else { + /* retry with half as much. Depth is log_2 batchsize, so recursion is not too deep */ + DBGPRINTF("submitBatch recursing trying to find and exclude the culprit " + "for iRet %d\n", localRet); + submitBatch(pAction, pBatch, nElem / 2); + submitBatch(pAction, pBatch, nElem - (nElem / 2)); + bDone = 1; } } + } while(!bDone && !*(pBatch->pbShutdownImmediate)); /* do .. while()! */ + + if(*(pBatch->pbShutdownImmediate)) + ABORT_FINALIZE(RS_RET_FORCE_TERM); + +finalize_it: + RETiRet; +} + + + +/* The following function prepares a batch for processing, that it is + * reinitializes batch states, generates strings and does everything else + * that needs to be done in order to make the batch ready for submission to + * the actual output module. Note that we look at the precomputed + * filter OK condition and process only those messages, that actually matched + * the filter. + * rgerhards, 2010-06-14 + */ +static inline rsRetVal +prepareBatch(action_t *pAction, batch_t *pBatch) +{ + int i; + batch_obj_t *pElem; + DEFiRet; + + pBatch->iDoneUpTo = 0; + for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { + pElem = &(pBatch->pElem[i]); + if(pElem->bFilterOK && pElem->state != BATCH_STATE_DISC) { + pElem->state = BATCH_STATE_RDY; + if(prepareDoActionParams(pAction, pElem) != RS_RET_OK) + pElem->bFilterOK = FALSE; + } } + RETiRet; +} + + +/* receive a batch and process it. This includes retry handling. + * rgerhards, 2009-05-12 + */ +static inline rsRetVal +processAction(action_t *pAction, batch_t *pBatch) +{ + DEFiRet; + + assert(pBatch != NULL); + CHKiRet(submitBatch(pAction, pBatch, pBatch->nElem)); + iRet = finishBatch(pAction, pBatch); + +finalize_it: + RETiRet; +} + + +#pragma GCC diagnostic ignored "-Wempty-body" +/* receive an array of to-process user pointers and submit them + * for processing. + * rgerhards, 2009-04-22 + */ +static rsRetVal +processBatchMain(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate) +{ + int *pbShutdownImmdtSave; + rsRetVal localRet; + DEFiRet; + + assert(pBatch != NULL); + + pbShutdownImmdtSave = pBatch->pbShutdownImmediate; + pBatch->pbShutdownImmediate = pbShutdownImmediate; + CHKiRet(prepareBatch(pAction, pBatch)); + + /* We now must guard the output module against execution by multiple threads. The + * plugin interface specifies that output modules must not be thread-safe (except + * if they notify us they are - functionality not yet implemented...). + * rgerhards, 2008-01-30 + */ + d_pthread_mutex_lock(&pAction->mutActExec); + pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec); + + iRet = processAction(pAction, pBatch); pthread_cleanup_pop(1); /* unlock mutex */ - msgDestruct(&pMsg); /* we are now finished with the message */ + /* even if processAction failed, we need to release the batch (else we + * have a memory leak). So we do this first, and then check if we need to + * return an error code. If so, the code from processAction has priority. + * rgerhards, 2010-12-17 + */ + localRet = releaseBatch(pAction, pBatch); + + if(iRet == RS_RET_OK) + iRet = localRet; + +finalize_it: + pBatch->pbShutdownImmediate = pbShutdownImmdtSave; RETiRet; } #pragma GCC diagnostic warning "-Wempty-body" @@ -563,7 +1201,6 @@ rsRetVal actionCallHUPHdlr(action_t *pAction) { DEFiRet; - int iCancelStateSave; ASSERT(pAction != NULL); DBGPRINTF("Action %p checks HUP hdlr: %p\n", pAction, pAction->pMod->doHUP); @@ -572,10 +1209,8 @@ actionCallHUPHdlr(action_t *pAction) FINALIZE; /* no HUP handler, so we are done ;) */ } - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); d_pthread_mutex_lock(&pAction->mutActExec); pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec); - pthread_setcancelstate(iCancelStateSave, NULL); CHKiRet(pAction->pMod->doHUP(pAction->pModData)); pthread_cleanup_pop(1); /* unlock mutex */ @@ -615,20 +1250,32 @@ static rsRetVal setActionQueType(void __attribute__((unused)) *pVal, uchar *pszT } -/* rgerhards 2004-11-09: fprintlog() is the actual driver for - * the output channel. It receives the channel description (f) as - * well as the message and outputs them according to the channel - * semantics. The message is typically already contained in the - * channel save buffer (f->f_prevline). This is not only the case - * when a message was already repeated but also when a new message - * arrived. - * rgerhards 2007-08-01: interface changed to use action_t - * rgerhards, 2007-12-11: please note: THIS METHOD MUST ONLY BE - * CALLED AFTER THE CALLER HAS LOCKED THE pAction OBJECT! We do - * not do this here. Failing to do so results in all kinds of - * "interesting" problems! - * RGERHARDS, 2008-01-29: - * This is now the action caller and has been renamed. +/* This submits the message to the action queue in case we do NOT need to handle repeat + * message processing. That case permits us to gain lots of freedom during processing + * and thus speed. This is also utilized to submit messages in complex case once + * the complex logic has been applied ;) + * rgerhards, 2010-06-08 + */ +static inline rsRetVal +doSubmitToActionQ(action_t *pAction, msg_t *pMsg) +{ + DEFiRet; + + if(pAction->pQueue->qType == QUEUETYPE_DIRECT) + iRet = qqueueEnqObjDirect(pAction->pQueue, (void*) MsgAddRef(pMsg)); + else + iRet = qqueueEnqObj(pAction->pQueue, pMsg->flowCtlType, (void*) MsgAddRef(pMsg)); + + RETiRet; +} + + +/* This function builds up a batch of messages to be (later) + * submitted to the action queue. + * Important: this function MUST not be called with messages that are to + * be discarded due to their "prevWasSuspended" state. It will not check for + * this and submit all messages to the queue for execution. So these must + * be filtered out before calling us (what is done currently!). */ rsRetVal actionWriteToAction(action_t *pAction) @@ -702,7 +1349,7 @@ actionWriteToAction(action_t *pAction) pAction->f_pMsg = pMsg; /* use the new msg (pointer will be restored below) */ } - DBGPRINTF("Called action, logging to %s\n", module.GetStateName(pAction->pMod)); + DBGPRINTF("Called action(complex case), logging to %s\n", module.GetStateName(pAction->pMod)); /* now check if we need to drop the message because otherwise the action would be too * frequently called. -- rgerhards, 2008-04-08 @@ -719,14 +1366,15 @@ actionWriteToAction(action_t *pAction) FINALIZE; } - /* we use reception time, not dequeue time - this is considered more appropriate and also faster ;) -- rgerhards, 2008-09-17 */ + /* we use reception time, not dequeue time - this is considered more appropriate and also faster ;) + * rgerhards, 2008-09-17 */ pAction->tLastExec = getActNow(pAction); /* re-init time flags */ pAction->f_time = pAction->f_pMsg->ttGenTime; /* When we reach this point, we have a valid, non-disabled action. * So let's enqueue our message for execution. -- rgerhards, 2007-07-24 */ - iRet = qqueueEnqObj(pAction->pQueue, pAction->f_pMsg->flowCtlType, (void*) MsgAddRef(pAction->f_pMsg)); + iRet = doSubmitToActionQ(pAction, pAction->f_pMsg); if(iRet == RS_RET_OK) pAction->f_prevcount = 0; /* message processed, so we start a new cycle */ @@ -753,28 +1401,18 @@ finalize_it: /* helper to actonCallAction, mostly needed because of this damn * pthread_cleanup_push() POSIX macro... */ -static rsRetVal -doActionCallAction(action_t *pAction, msg_t *pMsg) +static inline rsRetVal +doActionCallAction(action_t *pAction, batch_t *pBatch, int idxBtch) { + msg_t *pMsg; DEFiRet; - /* first, we need to check if this is a disabled - * entry. If so, we must not further process it. - * rgerhards 2005-09-26 - * In the future, disabled modules may be re-probed from time - * to time. They are in a perfectly legal state, except that the - * doAction method indicated that it wanted to be disabled - but - * we do not consider this is a solution for eternity... So we - * should check from time to time if affairs have improved. - * rgerhards, 2007-07-24 - */ - if(pAction->bEnabled == 0) { - ABORT_FINALIZE(RS_RET_OK); - } + pMsg = (msg_t*)(pBatch->pElem[idxBtch].pUsrp); pAction->tActNow = -1; /* we do not yet know our current time (clear prev. value) */ /* don't output marks to recently written outputs */ - if((pMsg->msgFlags & MARK) && (getActNow(pAction) - pAction->f_time) < MarkInterval / 2) { + if(pAction->bWriteAllMarkMsgs == FALSE + && (pMsg->msgFlags & MARK) && (getActNow(pAction) - pAction->f_time) < MarkInterval / 2) { ABORT_FINALIZE(RS_RET_OK); } @@ -817,79 +1455,231 @@ doActionCallAction(action_t *pAction, msg_t *pMsg) } finalize_it: + /* we need to update the batch to handle failover processing correctly */ + if(iRet == RS_RET_OK) { + pBatch->pElem[idxBtch].bPrevWasSuspended = 0; + } else if(iRet == RS_RET_ACTION_FAILED) { + pBatch->pElem[idxBtch].bPrevWasSuspended = 1; + } + RETiRet; } -/* call the configured action. Does all necessary housekeeping. - * rgerhards, 2007-08-01 - * FYI: currently, this function is only called from the queue - * consumer. So we (conceptually) run detached from the input - * threads (which also means we may run much later than when the - * message was generated). + +/* This submits the message to the action queue in case where we need to handle + * bWriteAllMarkMessage == FALSE only. Note that we use a non-blocking CAS loop + * for the synchronization. Here, we just modify the filter condition to be false when + * a mark message must not be written. However, in this case we must save the previous + * filter as we may need it in the next action (potential future optimization: check if this is + * the last action TODO). + * rgerhards, 2010-06-08 */ -#pragma GCC diagnostic ignored "-Wempty-body" -rsRetVal -actionCallAction(action_t *pAction, msg_t *pMsg) +static rsRetVal +doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch) { + time_t now = 0; + time_t lastAct; + int i; + int bModifiedFilter; + sbool FilterSave[1024]; + sbool *pFilterSave; DEFiRet; - int iCancelStateSave; - ISOBJ_TYPE_assert(pMsg, msg); - ASSERT(pAction != NULL); + if(batchNumMsgs(pBatch) <= (int) (sizeof(FilterSave)/sizeof(sbool))) { + pFilterSave = FilterSave; + } else { + CHKmalloc(pFilterSave = malloc(batchNumMsgs(pBatch) * sizeof(sbool))); + } - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); - LockObj(pAction); - pthread_cleanup_push(mutexCancelCleanup, pAction->Sync_mut); - pthread_setcancelstate(iCancelStateSave, NULL); - iRet = doActionCallAction(pAction, pMsg); - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); - UnlockObj(pAction); - pthread_cleanup_pop(0); /* remove mutex cleanup handler */ - pthread_setcancelstate(iCancelStateSave, NULL); + bModifiedFilter = 0; + for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) { + if(!pBatch->pElem[i].bFilterOK) + continue; + pFilterSave[i] = pBatch->pElem[i].bFilterOK; + if(now == 0) { + now = datetime.GetTime(NULL); /* good time call - the only one done */ + } + /* CAS loop, we write back a bit early, but that's OK... */ + /* we use reception time, not dequeue time - this is considered more appropriate and + * also faster ;) -- rgerhards, 2008-09-17 */ + do { + lastAct = pAction->f_time; + if(((msg_t*)(pBatch->pElem[i].pUsrp))->msgFlags & MARK) { + if((now - lastAct) < MarkInterval / 2) { + pBatch->pElem[i].bFilterOK = 0; + bModifiedFilter = 1; + DBGPRINTF("action was recently called, ignoring mark message\n"); + break; /* do not update timestamp for non-written mark messages */ + } + } + } while(ATOMIC_CAS_time_t(&pAction->f_time, lastAct, + ((msg_t*)(pBatch->pElem[i].pUsrp))->ttGenTime, &pAction->mutCAS) == 0); + if(pBatch->pElem[i].bFilterOK) { + DBGPRINTF("Called action(NotAllMark), processing batch[%d] via '%s'\n", + i, module.GetStateName(pAction->pMod)); + } + } + + iRet = doSubmitToActionQBatch(pAction, pBatch); + + if(bModifiedFilter) { + /* in this case, we need to restore previous state */ + for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) { + /* note: clang static code analyzer reports a false positive below */ + pBatch->pElem[i].bFilterOK = pFilterSave[i]; + } + } + +finalize_it: + if(pFilterSave != FilterSave) + free(pFilterSave); RETiRet; } -#pragma GCC diagnostic warning "-Wempty-body" -/* add our cfsysline handlers - * rgerhards, 2008-01-28 +/* enqueue a batch in direct mode. We have put this into its own function just to avoid + * cluttering the actual submit function. + * rgerhards, 2011-06-16 */ -rsRetVal -actionAddCfSysLineHdrl(void) +static inline rsRetVal +doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch) { + sbool FilterSave[1024]; + sbool *pFilterSave; + sbool bNeedSubmit; + sbool bModifiedFilter; + int i; DEFiRet; - CHKiRet(regCfSysLineHdlr((uchar *)"actionname", 0, eCmdHdlrGetWord, NULL, &pszActionName, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuefilename", 0, eCmdHdlrGetWord, NULL, &pszActionQFName, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesize", 0, eCmdHdlrInt, NULL, &iActionQueueSize, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuemaxdiskspace", 0, eCmdHdlrSize, NULL, &iActionQueMaxDiskSpace, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuehighwatermark", 0, eCmdHdlrInt, NULL, &iActionQHighWtrMark, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuelowwatermark", 0, eCmdHdlrInt, NULL, &iActionQLowWtrMark, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuediscardmark", 0, eCmdHdlrInt, NULL, &iActionQDiscardMark, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuediscardseverity", 0, eCmdHdlrInt, NULL, &iActionQDiscardSeverity, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuecheckpointinterval", 0, eCmdHdlrInt, NULL, &iActionQPersistUpdCnt, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesyncqueuefiles", 0, eCmdHdlrBinary, NULL, &bActionQSyncQeueFiles, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetype", 0, eCmdHdlrGetWord, setActionQueType, NULL, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueueworkerthreads", 0, eCmdHdlrInt, NULL, &iActionQueueNumWorkers, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetimeoutshutdown", 0, eCmdHdlrInt, NULL, &iActionQtoQShutdown, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetimeoutactioncompletion", 0, eCmdHdlrInt, NULL, &iActionQtoActShutdown, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetimeoutenqueue", 0, eCmdHdlrInt, NULL, &iActionQtoEnq, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueueworkertimeoutthreadshutdown", 0, eCmdHdlrInt, NULL, &iActionQtoWrkShutdown, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueueworkerthreadminimummessages", 0, eCmdHdlrInt, NULL, &iActionQWrkMinMsgs, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuemaxfilesize", 0, eCmdHdlrSize, NULL, &iActionQueMaxFileSize, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesaveonshutdown", 0, eCmdHdlrBinary, NULL, &bActionQSaveOnShutdown, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeueslowdown", 0, eCmdHdlrInt, NULL, &iActionQueueDeqSlowdown, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeuetimebegin", 0, eCmdHdlrInt, NULL, &iActionQueueDeqtWinFromHr, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeuetimeend", 0, eCmdHdlrInt, NULL, &iActionQueueDeqtWinToHr, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlyeverynthtime", 0, eCmdHdlrInt, NULL, &iActExecEveryNthOccur, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlyeverynthtimetimeout", 0, eCmdHdlrInt, NULL, &iActExecEveryNthOccurTO, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"repeatedmsgcontainsoriginalmsg", 0, eCmdHdlrBinary, NULL, &bActionRepMsgHasMsg, NULL)); - + if(batchNumMsgs(pBatch) <= (int) (sizeof(FilterSave)/sizeof(sbool))) { + pFilterSave = FilterSave; + } else { + CHKmalloc(pFilterSave = malloc(batchNumMsgs(pBatch) * sizeof(sbool))); + } + + /* note: for direct mode, we need to adjust the filter property. For non-direct + * this is not necessary, because in that case we enqueue only what actually needs + * to be processed. + */ + if(pAction->bExecWhenPrevSusp) { + bNeedSubmit = 0; + bModifiedFilter = 0; + for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { + pFilterSave[i] = pBatch->pElem[i].bFilterOK; + if(!pBatch->pElem[i].bPrevWasSuspended) { + DBGPRINTF("action enq stage: change bFilterOK to 0 due to " + "failover case in elem %d\n", i); + pBatch->pElem[i].bFilterOK = 0; + bModifiedFilter = 1; + } + if(pBatch->pElem[i].bFilterOK) + bNeedSubmit = 1; + DBGPRINTF("action %p[%d]: filterOK:%d state:%d execWhenPrev:%d prevWasSusp:%d\n", + pAction, i, pBatch->pElem[i].bFilterOK, pBatch->pElem[i].state, + pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended); + } + if(bNeedSubmit) { + iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch); + } else { + DBGPRINTF("no need to submit batch, all bFilterOK==0\n"); + } + if(bModifiedFilter) { + for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) { + DBGPRINTF("action %p: filterOK:%d state:%d execWhenPrev:%d prevWasSusp:%d\n", + pAction, pBatch->pElem[i].bFilterOK, pBatch->pElem[i].state, + pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended); + /* note: clang static code analyzer reports a false positive below */ + pBatch->pElem[i].bFilterOK = pFilterSave[i]; + } + } + } else { + iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch); + } + finalize_it: RETiRet; } +/* This submits the message to the action queue in case we do NOT need to handle repeat + * message processing. That case permits us to gain lots of freedom during processing + * and thus speed. + * rgerhards, 2010-06-08 + */ +static rsRetVal +doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch) +{ + int i; + DEFiRet; + + DBGPRINTF("Called action(Batch), logging to %s\n", module.GetStateName(pAction->pMod)); + + if(pAction->pQueue->qType == QUEUETYPE_DIRECT) { + iRet = doQueueEnqObjDirectBatch(pAction, pBatch); + } else {/* in this case, we do single submits to the queue. + * TODO: optimize this, we may do at least a multi-submit! + */ + for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { + DBGPRINTF("action %p: filterOK:%d state:%d execWhenPrev:%d prevWasSusp:%d\n", + pAction, pBatch->pElem[i].bFilterOK, pBatch->pElem[i].state, + pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended); + if( pBatch->pElem[i].bFilterOK + && pBatch->pElem[i].state != BATCH_STATE_DISC + && (pAction->bExecWhenPrevSusp == 0 || pBatch->pElem[i].bPrevWasSuspended == 1)) { + doSubmitToActionQ(pAction, (msg_t*)(pBatch->pElem[i].pUsrp)); + } + } + } + + RETiRet; +} + + + +/* Helper to submit a batch of actions to the engine. Note that we have rather + * complicated processing here, so we need to do this one message after another. + * rgerhards, 2010-06-23 + */ +static inline rsRetVal +helperSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch) +{ + int i; + DEFiRet; + + DBGPRINTF("Called action %p (complex case), logging to %s\n", + pAction, module.GetStateName(pAction->pMod)); + for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { + DBGPRINTF("action %p: filterOK:%d state:%d execWhenPrev:%d prevWasSusp:%d\n", + pAction, pBatch->pElem[i].bFilterOK, pBatch->pElem[i].state, + pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended); + if( pBatch->pElem[i].bFilterOK + && pBatch->pElem[i].state != BATCH_STATE_DISC + && ((pAction->bExecWhenPrevSusp == 0) || pBatch->pElem[i].bPrevWasSuspended) ) { + doActionCallAction(pAction, pBatch, i); + } + } + + RETiRet; +} + +/* Call configured action, most complex case with all features supported (and thus slow). + * rgerhards, 2010-06-08 + */ +#pragma GCC diagnostic ignored "-Wempty-body" +static rsRetVal +doSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch) +{ + DEFiRet; + + LockObj(pAction); + pthread_cleanup_push(mutexCancelCleanup, pAction->Sync_mut); + iRet = helperSubmitToActionQComplexBatch(pAction, pBatch); + UnlockObj(pAction); + pthread_cleanup_pop(0); /* remove mutex cleanup handler */ + + RETiRet; +} +#pragma GCC diagnostic warning "-Wempty-body" /* add an Action to the current selector * The pOMSR is freed, as it is not needed after this function. @@ -916,6 +1706,8 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringReques pAction->pModData = pModData; pAction->pszName = pszActionName; pszActionName = NULL; /* free again! */ + pAction->bWriteAllMarkMsgs = bActionWriteAllMarkMsgs; + bActionWriteAllMarkMsgs = FALSE; /* reset */ pAction->bExecWhenPrevSusp = bActExecWhenPrevSusp; pAction->iSecsExecOnceInterval = iActExecOnceInterval; pAction->iExecEveryNthOccur = iActExecEveryNthOccur; @@ -934,15 +1726,12 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringReques if(pAction->iNumTpls > 0) { /* we first need to create the template pointer array */ CHKmalloc(pAction->ppTpl = (struct template **)calloc(pAction->iNumTpls, sizeof(struct template *))); - CHKmalloc(pAction->ppMsgs = (uchar**) calloc(pAction->iNumTpls, sizeof(uchar *))); - CHKmalloc(pAction->lenMsgs = (size_t*) calloc(pAction->iNumTpls, sizeof(size_t))); } for(i = 0 ; i < pAction->iNumTpls ; ++i) { CHKiRet(OMSRgetEntry(pOMSR, i, &pTplName, &iTplOpts)); - /* Ok, we got everything, so it now is time to look up the - * template (Hint: templates MUST be defined before they are - * used!) + /* Ok, we got everything, so it now is time to look up the template + * (Hint: templates MUST be defined before they are used!) */ if((pAction->ppTpl[i] = tplFind((char*)pTplName, strlen((char*)pTplName))) == NULL) { snprintf(errMsg, sizeof(errMsg) / sizeof(char), @@ -964,6 +1753,8 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringReques /* set parameter-passing mode */ if(iTplOpts & OMSR_TPL_AS_ARRAY) { pAction->eParamPassing = ACT_ARRAY_PASSING; + } else if(iTplOpts & OMSR_TPL_AS_MSG) { + pAction->eParamPassing = ACT_MSG_PASSING; } else { pAction->eParamPassing = ACT_STRING_PASSING; } @@ -980,10 +1771,10 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringReques DBGPRINTF("module is incompatible with RepeatedMsgReduction - turned off\n"); pAction->f_ReduceRepeated = 0; } - pAction->bEnabled = 1; /* action is enabled */ + pAction->eState = ACT_STATE_RDY; /* action is enabled */ if(bSuspended) - actionSuspend(pAction, time(NULL)); /* "good" time call, only during init and unavoidable */ + actionSuspend(pAction, datetime.GetTime(NULL)); /* "good" time call, only during init and unavoidable */ CHKiRet(actionConstructFinalize(pAction)); @@ -1005,6 +1796,17 @@ finalize_it: } +/* Reset config variables to default values. + * rgerhards, 2009-11-12 + */ +static rsRetVal +resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal) +{ + iActExecOnceInterval = 0; + return RS_RET_OK; +} + + /* TODO: we are not yet a real object, the ClassInit here just looks like it is.. */ rsRetVal actionClassInit(void) @@ -1016,6 +1818,36 @@ rsRetVal actionClassInit(void) CHKiRet(objUse(module, CORE_COMPONENT)); CHKiRet(objUse(errmsg, CORE_COMPONENT)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionname", 0, eCmdHdlrGetWord, NULL, &pszActionName, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuefilename", 0, eCmdHdlrGetWord, NULL, &pszActionQFName, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesize", 0, eCmdHdlrInt, NULL, &iActionQueueSize, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionwriteallmarkmessages", 0, eCmdHdlrBinary, NULL, &bActionWriteAllMarkMsgs, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeuebatchsize", 0, eCmdHdlrInt, NULL, &iActionQueueDeqBatchSize, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuemaxdiskspace", 0, eCmdHdlrSize, NULL, &iActionQueMaxDiskSpace, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuehighwatermark", 0, eCmdHdlrInt, NULL, &iActionQHighWtrMark, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuelowwatermark", 0, eCmdHdlrInt, NULL, &iActionQLowWtrMark, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuediscardmark", 0, eCmdHdlrInt, NULL, &iActionQDiscardMark, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuediscardseverity", 0, eCmdHdlrInt, NULL, &iActionQDiscardSeverity, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuecheckpointinterval", 0, eCmdHdlrInt, NULL, &iActionQPersistUpdCnt, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesyncqueuefiles", 0, eCmdHdlrBinary, NULL, &bActionQSyncQeueFiles, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetype", 0, eCmdHdlrGetWord, setActionQueType, NULL, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueueworkerthreads", 0, eCmdHdlrInt, NULL, &iActionQueueNumWorkers, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetimeoutshutdown", 0, eCmdHdlrInt, NULL, &iActionQtoQShutdown, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetimeoutactioncompletion", 0, eCmdHdlrInt, NULL, &iActionQtoActShutdown, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetimeoutenqueue", 0, eCmdHdlrInt, NULL, &iActionQtoEnq, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueueworkertimeoutthreadshutdown", 0, eCmdHdlrInt, NULL, &iActionQtoWrkShutdown, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueueworkerthreadminimummessages", 0, eCmdHdlrInt, NULL, &iActionQWrkMinMsgs, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuemaxfilesize", 0, eCmdHdlrSize, NULL, &iActionQueMaxFileSize, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesaveonshutdown", 0, eCmdHdlrBinary, NULL, &bActionQSaveOnShutdown, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeueslowdown", 0, eCmdHdlrInt, NULL, &iActionQueueDeqSlowdown, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeuetimebegin", 0, eCmdHdlrInt, NULL, &iActionQueueDeqtWinFromHr, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeuetimeend", 0, eCmdHdlrInt, NULL, &iActionQueueDeqtWinToHr, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlyeverynthtime", 0, eCmdHdlrInt, NULL, &iActExecEveryNthOccur, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlyeverynthtimetimeout", 0, eCmdHdlrInt, NULL, &iActExecEveryNthOccurTO, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlyonceeveryinterval", 0, eCmdHdlrInt, NULL, &iActExecOnceInterval, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"repeatedmsgcontainsoriginalmsg", 0, eCmdHdlrBinary, NULL, &bActionRepMsgHasMsg, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, NULL)); + finalize_it: RETiRet; } |