summaryrefslogtreecommitdiffstats
path: root/action.c
diff options
context:
space:
mode:
Diffstat (limited to 'action.c')
-rw-r--r--action.c186
1 files changed, 140 insertions, 46 deletions
diff --git a/action.c b/action.c
index dc6cdaaf..c8117b78 100644
--- a/action.c
+++ b/action.c
@@ -585,7 +585,7 @@ finalize_it:
* depending on its current state.
* rgerhards, 2009-05-07
*/
-static rsRetVal actionPrepare(action_t *pThis)
+static inline rsRetVal actionPrepare(action_t *pThis)
{
DEFiRet;
@@ -665,7 +665,6 @@ static rsRetVal prepareDoActionParams(action_t *pAction, msg_t *pMsg, uchar **pp
for(i = 0 ; i < pAction->iNumTpls ; ++i) {
switch(pAction->eParamPassing) {
case ACT_STRING_PASSING:
- //CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(((uchar**)pAction->ppMsgs)[i]), &(pAction->lenMsgs[i])));
CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(ppMsgs[i]), &lenMsgs[i]));
break;
case ACT_ARRAY_PASSING:
@@ -726,29 +725,23 @@ static rsRetVal cleanupDoActionParams(action_t *pAction, uchar **ppMsgs)
* rgerhards, 2008-01-28
*/
rsRetVal
-actionCallDoAction(action_t *pThis, msg_t *pMsg)
+actionCallDoAction(action_t *pThis, msg_t *pMsg, void *actParams)
{
- uchar *ppMsgs[10];
- size_t lenMsgs[10];
int i;
DEFiRet;
ASSERT(pThis != NULL);
ISOBJ_TYPE_assert(pMsg, msg);
- for(i = 0 ; i < 10 ; ++ i) {
- ppMsgs[i] = NULL;
- lenMsgs[i] = 0;
- }
-
DBGPRINTF("entering actionCalldoAction(), state: %s\n", getActStateName(pThis));
- CHKiRet(prepareDoActionParams(pThis, pMsg, ppMsgs, lenMsgs));
+ //CHKiRet(prepareDoActionParams(pThis, pMsg, ppMsgs, lenMsgs));
pThis->bHadAutoCommit = 0;
#if 1
//d_pthread_mutex_lock(&pThis->mutActExec);
//pthread_cleanup_push(mutexCancelCleanup, &pThis->mutActExec);
- iRet = pThis->pMod->mod.om.doAction(ppMsgs, pMsg->msgFlags, pThis->pModData);
+ // original: iRet = pThis->pMod->mod.om.doAction(ppMsgs, pMsg->msgFlags, pThis->pModData);
+ iRet = pThis->pMod->mod.om.doAction(actParams, pMsg->msgFlags, pThis->pModData);
//pthread_cleanup_pop(1); /* unlock mutex */
//iRet = pThis->pMod->mod.om.doAction(pThis->ppMsgs, pMsg->msgFlags, pThis->pModData);
#else
@@ -783,6 +776,7 @@ iRet = RS_RET_OK;
finalize_it:
+#if 0 // THIS NEEDS TO BE DONE TO THE BATCH!
switch(pThis->eParamPassing) {
case ACT_STRING_PASSING:
for(i = 0 ; i < 10 ; ++i)
@@ -795,6 +789,7 @@ finalize_it:
/* nothing to do in that case */
break;
}
+#endif
RETiRet;
@@ -805,8 +800,8 @@ finalize_it:
* this readies the action and then calls doAction()
* rgerhards, 2008-01-28
*/
-rsRetVal
-actionProcessMessage(action_t *pThis, msg_t *pMsg)
+static inline rsRetVal
+actionProcessMessage(action_t *pThis, msg_t *pMsg, void *actParams)
{
DEFiRet;
@@ -815,7 +810,7 @@ actionProcessMessage(action_t *pThis, msg_t *pMsg)
CHKiRet(actionPrepare(pThis));
if(pThis->eState == ACT_STATE_ITX)
- CHKiRet(actionCallDoAction(pThis, pMsg));
+ CHKiRet(actionCallDoAction(pThis, pMsg, actParams));
iRet = getReturnCode(pThis);
finalize_it:
@@ -835,8 +830,11 @@ finishBatch(action_t *pThis, batch_t *pBatch)
ASSERT(pThis != NULL);
- if(pThis->eState == ACT_STATE_RDY)
+dbgprintf("ZZZ: finishBatch called, eState %d\n", pThis->eState);
+ if(pThis->eState == ACT_STATE_RDY) {
+ /* we just need to flag the batch as commited */
FINALIZE; /* nothing to do */
+ }
CHKiRet(actionPrepare(pThis));
if(pThis->eState == ACT_STATE_ITX) {
@@ -846,7 +844,8 @@ finishBatch(action_t *pThis, batch_t *pBatch)
actionCommitted(pThis);
/* flag messages as committed */
for(i = 0 ; i < pBatch->nElem ; ++i) {
- pBatch->pElem[i].state = BATCH_STATE_COMM;
+ batchSetElemState(pBatch, i, BATCH_STATE_COMM);
+dbgprintf("ZZZ: finishBatch commits element %d\n", i);
}
break;
case RS_RET_SUSPENDED:
@@ -881,8 +880,8 @@ finalize_it:
/* try to submit a partial batch of elements.
* rgerhards, 2009-05-12
*/
-static rsRetVal
-tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem, int *pbShutdownImmediate)
+static inline rsRetVal
+tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem)
{
int i;
int iElemProcessed;
@@ -894,17 +893,22 @@ tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem, int *pbShutdownImme
assert(pBatch != NULL);
assert(pnElem != NULL);
+dbgprintf("ZZZ1: tryDoAction, nElem %d, iDoneUpto %d\n", *pnElem, pBatch->iDoneUpTo);
i = pBatch->iDoneUpTo; /* all messages below that index are processed */
iElemProcessed = 0;
iCommittedUpTo = i;
- pAction->pbShutdownImmediate = pbShutdownImmediate;
while(iElemProcessed <= *pnElem && i < pBatch->nElem) {
- if(*pbShutdownImmediate)
+ if(*(pBatch->pbShutdownImmediate))
ABORT_FINALIZE(RS_RET_FORCE_TERM);
- pMsg = (msg_t*) pBatch->pElem[i].pUsrp;
- if(pBatch->pElem[i].state != BATCH_STATE_DISC) {
- localRet = actionProcessMessage(pAction, pMsg);
+dbgprintf("ZZZ1: tryDoAction loop %d: filter %d, state %d\n", i, pBatch->pElem[i].bFilterOK, pBatch->pElem[i].state);
+ if(pBatch->pElem[i].bFilterOK && pBatch->pElem[i].state != BATCH_STATE_DISC) {
+dbgprintf("ZZZ1: trying to execute\n");
+ pMsg = (msg_t*) pBatch->pElem[i].pUsrp;
+ localRet = actionProcessMessage(pAction, pMsg, pBatch->pElem[i].staticActParams);
DBGPRINTF("action call returned %d\n", localRet);
+ /* Note: we directly modify the batch object state, because we know that
+ * wo do not overwrite DISC indicators!
+ */
if(localRet == RS_RET_OK) {
/* mark messages as committed */
while(iCommittedUpTo <= i) {
@@ -931,9 +935,7 @@ tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem, int *pbShutdownImme
}
finalize_it:
- if(pBatch->nElem == 1 && pBatch->pElem[0].state == BATCH_STATE_DISC) {
- iRet = RS_RET_DISCARDMSG;
- } else if(pBatch->iDoneUpTo != iCommittedUpTo) {
+ if(pBatch->iDoneUpTo != iCommittedUpTo) {
*pnElem += iCommittedUpTo - pBatch->iDoneUpTo;
pBatch->iDoneUpTo = iCommittedUpTo;
}
@@ -944,21 +946,24 @@ finalize_it:
/* submit a batch for actual action processing.
* The first nElem elements are processed. This function calls itself
* recursively if it needs to handle errors.
+ * Note: we don't need the number of the first message to be processed as a parameter,
+ * because this is kept track of inside the batch itself (iDoneUpTo).
* rgerhards, 2009-05-12
*/
static rsRetVal
-submitBatch(action_t *pAction, batch_t *pBatch, int nElem, int *pbShutdownImmediate)
+submitBatch(action_t *pAction, batch_t *pBatch, int nElem)
{
int i;
int bDone;
rsRetVal localRet;
DEFiRet;
+dbgprintf("ZZZ1: submitBatch, nElem %d\n", nElem);
assert(pBatch != NULL);
bDone = 0;
do {
- localRet = tryDoAction(pAction, pBatch, &nElem, pbShutdownImmediate);
+ localRet = tryDoAction(pAction, pBatch, &nElem);
if(localRet == RS_RET_FORCE_TERM) {
ABORT_FINALIZE(RS_RET_FORCE_TERM);
}
@@ -968,38 +973,36 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem, int *pbShutdownImmedi
/* try commit transaction, once done, we can simply do so as if
* that return state was returned from tryDoAction().
*/
- localRet = finishBatch(pAction, pBatch); // TODO: careful, do we need the elem counter?
+ localRet = finishBatch(pAction, pBatch);
}
if( localRet == RS_RET_OK
|| localRet == RS_RET_PREVIOUS_COMMITTED
|| localRet == RS_RET_DEFER_COMMIT) {
bDone = 1;
- } else if(localRet == RS_RET_DISCARDMSG) {
- iRet = RS_RET_DISCARDMSG; /* TODO: verify this sequence -- rgerhards, 2009-07-30 */
- bDone = 1;
} else if(localRet == RS_RET_SUSPENDED) {
; /* do nothing, this will retry the full batch */
} else if(localRet == RS_RET_ACTION_FAILED) {
/* in this case, the whole batch can not be processed */
for(i = 0 ; i < nElem ; ++i) {
- pBatch->pElem[pBatch->iDoneUpTo++].state = BATCH_STATE_BAD;
+dbgprintf("ZZZ2: setting batch state for item %d\n", i);
+ batchSetElemState(pBatch, i, BATCH_STATE_BAD);
}
bDone = 1;
} else {
if(nElem == 1) {
- pBatch->pElem[pBatch->iDoneUpTo++].state = BATCH_STATE_BAD;
+ batchSetElemState(pBatch, i, BATCH_STATE_BAD);
bDone = 1;
} else {
/* retry with half as much. Depth is log_2 batchsize, so recursion is not too deep */
- submitBatch(pAction, pBatch, nElem / 2, pbShutdownImmediate);
- submitBatch(pAction, pBatch, nElem - (nElem / 2), pbShutdownImmediate);
+ submitBatch(pAction, pBatch, nElem / 2);
+ submitBatch(pAction, pBatch, nElem - (nElem / 2));
bDone = 1;
}
}
- } while(!bDone && !*pbShutdownImmediate); /* do .. while()! */
+ } while(!bDone && !*(pBatch->pbShutdownImmediate)); /* do .. while()! */
- if(*pbShutdownImmediate)
+ if(*(pBatch->pbShutdownImmediate))
ABORT_FINALIZE(RS_RET_FORCE_TERM);
finalize_it:
@@ -1007,17 +1010,46 @@ finalize_it:
}
+
+/* The following function prepares a batch for processing, that it is
+ * reinitializes batch states, generates strings and does everything else
+ * that needs to be done in order to make the batch ready for submission to
+ * the actual output module. Note that we look at the precomputed
+ * filter OK condition and process only those messages, that actually matched
+ * the filter.
+ * rgerhards, 2010-06-14
+ */
+static inline rsRetVal
+prepareBatch(action_t *pAction, batch_t *pBatch)
+{
+ int i;
+ batch_obj_t *pElem;
+ DEFiRet;
+
+ pBatch->iDoneUpTo = 0;
+ for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
+ pElem = &(pBatch->pElem[i]);
+ if(pElem->bFilterOK && pElem->state != BATCH_STATE_DISC) {
+ pElem->state = BATCH_STATE_RDY;
+ prepareDoActionParams(pAction, (msg_t*) pElem->pUsrp,
+ (uchar**) &(pElem->staticActParams), pElem->staticLenParams);
+ }
+ }
+ RETiRet;
+}
+
+
/* receive a batch and process it. This includes retry handling.
* rgerhards, 2009-05-12
*/
-static rsRetVal
-processAction(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate)
+static inline rsRetVal
+processAction(action_t *pAction, batch_t *pBatch)
{
DEFiRet;
assert(pBatch != NULL);
- pBatch->iDoneUpTo = 0;
- CHKiRet(submitBatch(pAction, pBatch, pBatch->nElem, pbShutdownImmediate));
+dbgprintf("ZZZ1: processAction\n");
+ CHKiRet(submitBatch(pAction, pBatch, pBatch->nElem));
iRet = finishBatch(pAction, pBatch);
finalize_it:
@@ -1033,10 +1065,18 @@ finalize_it:
static rsRetVal
processBatchMain(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate)
{
+ int *pbShutdownImmdtSave;
DEFiRet;
assert(pBatch != NULL);
+dbgprintf("ZZZ1: processBatchMain\n");
+ pbShutdownImmdtSave = pBatch->pbShutdownImmediate;
+ pBatch->pbShutdownImmediate = pbShutdownImmediate;
+ pAction->pbShutdownImmediate = pBatch->pbShutdownImmediate;
+ CHKiRet(prepareBatch(pAction, pBatch));
+
+dbgprintf("ZZZ1: processBatchMain\n");
/* We now must guard the output module against execution by multiple threads. The
* plugin interface specifies that output modules must not be thread-safe (except
* if they notify us they are - functionality not yet implemented...).
@@ -1045,10 +1085,19 @@ processBatchMain(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate)
d_pthread_mutex_lock(&pAction->mutActExec);
pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec);
- iRet = processAction(pAction, pBatch, pbShutdownImmediate);
+ iRet = processAction(pAction, pBatch);
+ //
+ // DEBUG
+ int i;
+ for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) {
+dbgprintf("ZZZ: after processBatchMain item %d: filter %d, state %d\n", i, pBatch->pElem[i].bFilterOK, pBatch->pElem[i].state);
+ }
+ // END DEBUG
pthread_cleanup_pop(1); /* unlock mutex */
+finalize_it:
+ pBatch->pbShutdownImmediate = pbShutdownImmdtSave;
RETiRet;
}
#pragma GCC diagnostic warning "-Wempty-body"
@@ -1323,7 +1372,11 @@ doSubmitToActionQNotAllMark(action_t *pAction, msg_t *pMsg)
}
DBGPRINTF("Called action, logging to %s\n", module.GetStateName(pAction->pMod));
- iRet = qqueueEnqObj(pAction->pQueue, pMsg->flowCtlType, (void*) MsgAddRef(pMsg));
+ if(pAction->pQueue->qType == QUEUETYPE_DIRECT)
+ iRet = qqueueEnqObjDirect(pAction->pQueue, (void*) MsgAddRef(pMsg));
+ else
+ iRet = qqueueEnqObj(pAction->pQueue, pMsg->flowCtlType, (void*) MsgAddRef(pMsg));
+ //iRet = qqueueEnqObj(pAction->pQueue, pMsg->flowCtlType, (void*) MsgAddRef(pMsg));
finalize_it:
RETiRet;
@@ -1341,7 +1394,48 @@ doSubmitToActionQ(action_t *pAction, msg_t *pMsg)
DEFiRet;
DBGPRINTF("Called action, logging to %s\n", module.GetStateName(pAction->pMod));
- iRet = qqueueEnqObj(pAction->pQueue, pMsg->flowCtlType, (void*) MsgAddRef(pMsg));
+ if(pAction->pQueue->qType == QUEUETYPE_DIRECT)
+ iRet = qqueueEnqObjDirect(pAction->pQueue, (void*) MsgAddRef(pMsg));
+ else
+ iRet = qqueueEnqObj(pAction->pQueue, pMsg->flowCtlType, (void*) MsgAddRef(pMsg));
+
+ RETiRet;
+}
+
+
+//*** EXPERIMENTAL ***/
+/* This submits the message to the action queue in case we do NOT need to handle repeat
+ * message processing. That case permits us to gain lots of freedom during processing
+ * and thus speed.
+ * rgerhards, 2010-06-08
+ */
+rsRetVal
+doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch)
+{
+ int i;
+ DEFiRet;
+
+ DBGPRINTF("Called action(Batch), logging to %s\n", module.GetStateName(pAction->pMod));
+ if(pAction->pQueue->qType == QUEUETYPE_DIRECT)
+ iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch);
+ else { /* in this case, we do single submits to the queue.
+ * TODO: optimize this, we may do at least a multi-submit!
+ */
+ for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
+ if(pBatch->pElem[i].bFilterOK) {
+dbgprintf("ZZZ: submitToActQ item %d:%s\n", i, ((msg_t*)(pBatch->pElem[i].pUsrp))->szRawMsg+15);
+ pAction->submitToActQ(pAction, (msg_t*)(pBatch->pElem[i].pUsrp));
+ }
+ }
+
+ // DEBUG
+ for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
+ if(pBatch->pElem[i].bFilterOK) {
+dbgprintf("ZZZ: batch state after processing item %d: %d\n", i, pBatch->pElem[i].state);
+ }
+ }
+ // END DEBUG
+ }
RETiRet;
}