summaryrefslogtreecommitdiffstats
path: root/action.c
diff options
context:
space:
mode:
Diffstat (limited to 'action.c')
-rw-r--r--action.c101
1 files changed, 89 insertions, 12 deletions
diff --git a/action.c b/action.c
index 0fac6cf8..b055ebf4 100644
--- a/action.c
+++ b/action.c
@@ -280,6 +280,29 @@ actionConstructFinalize(action_t *pThis)
/* find a name for our queue */
snprintf((char*) pszQName, sizeof(pszQName)/sizeof(uchar), "action %d queue", iActionNbr);
+ /* now check if we can run the action in "firehose mode" during stage one of
+ * its processing (that is before messages are enqueued into the action q).
+ * This is only possible if some features, which require strict sequence, are
+ * not used. Thankfully, that is usually the case. The benefit of firehose
+ * mode is much faster processing (and simpler code) -- rgerhards, 2010-06-08
+ */
+ /* how about bWriteAllMarkMsgs??? */
+ if( pThis->iExecEveryNthOccur > 1
+ || pThis->f_ReduceRepeated
+ || pThis->iSecsExecOnceInterval
+ ) {
+ DBGPRINTF("info: firehose mode disabled for action because "
+ "iExecEveryNthOccur=%d, "
+ "ReduceRepeated=%d, "
+ "iSecsExecOnceInterval=%d\n",
+ pThis->iExecEveryNthOccur, pThis->f_ReduceRepeated,
+ pThis->iSecsExecOnceInterval
+ );
+ pThis->bSubmitFirehoseMode = 0;
+ } else {
+ pThis->bSubmitFirehoseMode = 1;
+ }
+
/* we need to make a safety check: if the queue is NOT in direct mode, a single
* message object may be accessed by multiple threads. As such, we need to enable
* msg object thread safety in this case (this costs a bit performance and thus
@@ -633,6 +656,7 @@ rsRetVal actionDbgPrint(action_t *pThis)
}
dbgprintf("\tState: %s\n", getActStateName(pThis));
dbgprintf("\tExec only when previous is suspended: %d\n", pThis->bExecWhenPrevSusp);
+ dbgprintf("\tFirehose mode (stage 1): %d\n", pThis->bSubmitFirehoseMode);
dbgprintf("\n");
RETiRet;
@@ -642,7 +666,7 @@ rsRetVal actionDbgPrint(action_t *pThis)
/* prepare the calling parameters for doAction()
* rgerhards, 2009-05-07
*/
-static rsRetVal prepareDoActionParams(action_t *pAction, msg_t *pMsg)
+static rsRetVal prepareDoActionParams(action_t *pAction, msg_t *pMsg, uchar **ppMsgs, size_t *lenMsgs)
{
int i;
DEFiRet;
@@ -653,7 +677,8 @@ static rsRetVal prepareDoActionParams(action_t *pAction, msg_t *pMsg)
for(i = 0 ; i < pAction->iNumTpls ; ++i) {
switch(pAction->eParamPassing) {
case ACT_STRING_PASSING:
- CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(((uchar**)pAction->ppMsgs)[i]), &(pAction->lenMsgs[i])));
+ //CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(((uchar**)pAction->ppMsgs)[i]), &(pAction->lenMsgs[i])));
+ CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(ppMsgs[i]), &lenMsgs[i]));
break;
case ACT_ARRAY_PASSING:
CHKiRet(tplToArray(pAction->ppTpl[i], pMsg, (uchar***) &(((uchar**)pAction->ppMsgs)[i])));
@@ -722,16 +747,32 @@ static rsRetVal cleanupDoActionParams(action_t *pAction)
rsRetVal
actionCallDoAction(action_t *pThis, msg_t *pMsg)
{
+ uchar *ppMsgs[10];
+ size_t lenMsgs[10];
+ int i;
DEFiRet;
ASSERT(pThis != NULL);
ISOBJ_TYPE_assert(pMsg, msg);
+ for(i = 0 ; i < 10 ; ++ i) {
+ ppMsgs[i] = NULL;
+ lenMsgs[i] = 0;
+ }
+
DBGPRINTF("entering actionCalldoAction(), state: %s\n", getActStateName(pThis));
- CHKiRet(prepareDoActionParams(pThis, pMsg));
+ CHKiRet(prepareDoActionParams(pThis, pMsg, ppMsgs, lenMsgs));
pThis->bHadAutoCommit = 0;
- iRet = pThis->pMod->mod.om.doAction(pThis->ppMsgs, pMsg->msgFlags, pThis->pModData);
+#if 1
+d_pthread_mutex_lock(&pThis->mutActExec);
+pthread_cleanup_push(mutexCancelCleanup, &pThis->mutActExec);
+ iRet = pThis->pMod->mod.om.doAction(ppMsgs, pMsg->msgFlags, pThis->pModData);
+pthread_cleanup_pop(1); /* unlock mutex */
+ //iRet = pThis->pMod->mod.om.doAction(pThis->ppMsgs, pMsg->msgFlags, pThis->pModData);
+#else
+iRet = RS_RET_OK;
+#endif
switch(iRet) {
case RS_RET_OK:
actionCommitted(pThis);
@@ -762,6 +803,9 @@ actionCallDoAction(action_t *pThis, msg_t *pMsg)
finalize_it:
cleanupDoActionParams(pThis); /* iRet ignored! */
+for(i = 0 ; i < 10 ; ++i)
+ free(ppMsgs[i]);
+
RETiRet;
}
@@ -778,7 +822,6 @@ actionProcessMessage(action_t *pThis, msg_t *pMsg)
ASSERT(pThis != NULL);
ISOBJ_TYPE_assert(pMsg, msg);
-RUNLOG_STR("inside actionProcessMsg()");
CHKiRet(actionPrepare(pThis));
if(pThis->eState == ACT_STATE_ITX)
CHKiRet(actionCallDoAction(pThis, pMsg));
@@ -1008,12 +1051,12 @@ processBatchMain(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate)
* if they notify us they are - functionality not yet implemented...).
* rgerhards, 2008-01-30
*/
- d_pthread_mutex_lock(&pAction->mutActExec);
- pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec);
+// d_pthread_mutex_lock(&pAction->mutActExec);
+// pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec);
iRet = processAction(pAction, pBatch, pbShutdownImmediate);
- pthread_cleanup_pop(1); /* unlock mutex */
+// pthread_cleanup_pop(1); /* unlock mutex */
RETiRet;
}
@@ -1274,6 +1317,40 @@ finalize_it:
}
+
+/* This submits the message to the action queue in case we do NOT need to handle repeat
+ * message processing. That case permits us to gain lots of freedom during processing
+ * and thus speed.
+ * rgerhards, 2010-06-08
+ */
+static inline rsRetVal
+doSubmitToActionQ(action_t *pAction, msg_t *pMsg)
+{
+ DEFiRet;
+
+#if 0 // TODO: we need to care about this -- after PoC 2010-06-08
+ /* don't output marks to recently written outputs */
+ if(pAction->bWriteAllMarkMsgs == FALSE
+ && (pMsg->msgFlags & MARK) && (getActNow(pAction) - pAction->f_time) < MarkInterval / 2) {
+ ABORT_FINALIZE(RS_RET_OK);
+ }
+#endif
+
+ DBGPRINTF("Called action, logging to %s\n", module.GetStateName(pAction->pMod));
+
+#if 0 // we would need this for bWriteAllMarkMsgs
+ /* we use reception time, not dequeue time - this is considered more appropriate and also faster ;) -- rgerhards, 2008-09-17 */
+ pAction->tLastExec = getActNow(pAction); /* re-init time flags */
+ pAction->f_time = pAction->f_pMsg->ttGenTime;
+
+#endif
+ iRet = qqueueEnqObj(pAction->pQueue, pMsg->flowCtlType, (void*) MsgAddRef(pMsg));
+
+ RETiRet;
+}
+
+
+
/* call the configured action. Does all necessary housekeeping.
* rgerhards, 2007-08-01
* FYI: currently, this function is only called from the queue
@@ -1293,15 +1370,15 @@ actionCallAction(action_t *pAction, msg_t *pMsg)
/* We need to lock the mutex only for repeated line processing.
* rgerhards, 2009-06-19
*/
- //if(pAction->f_ReduceRepeated == 1) {
+ if(pAction->bSubmitFirehoseMode == 1) {
+ iRet = doSubmitToActionQ(pAction, pMsg);
+ } else {
LockObj(pAction);
pthread_cleanup_push(mutexCancelCleanup, pAction->Sync_mut);
iRet = doActionCallAction(pAction, pMsg);
UnlockObj(pAction);
pthread_cleanup_pop(0); /* remove mutex cleanup handler */
- //} else {
- //iRet = doActionCallAction(pAction, pMsg);
- //}
+ }
RETiRet;
}