summaryrefslogtreecommitdiffstats
path: root/action.c
diff options
context:
space:
mode:
Diffstat (limited to 'action.c')
-rw-r--r--action.c65
1 files changed, 44 insertions, 21 deletions
diff --git a/action.c b/action.c
index 8a29df2e..1e02d23e 100644
--- a/action.c
+++ b/action.c
@@ -65,8 +65,9 @@ static int glbliActionResumeInterval = 30;
int glbliActionResumeRetryCount = 0; /* how often should suspended actions be retried? */
static int bActionRepMsgHasMsg = 0; /* last messsage repeated... has msg fragment in it */
+static int bActionWriteAllMarkMsgs = FALSE; /* should all mark messages be unconditionally written? */
static uchar *pszActionName; /* short name for the action */
-/* main message queue and its configuration parameters */
+/* action queue and its configuration parameters */
static queueType_t ActionQueType = QUEUETYPE_DIRECT; /* type of the main message queue above */
static int iActionQueueSize = 1000; /* size of the main message queue above */
static int iActionQueueDeqBatchSize = 16; /* batch size for action queues */
@@ -808,41 +809,50 @@ tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem)
assert(pBatch != NULL);
assert(pnElem != NULL);
+dbgprintf("XXXX: ENTER tryDoAction elt 0 state %d\n", pBatch->pElem[0].state);
i = pBatch->iDoneUpTo; /* all messages below that index are processed */
iElemProcessed = 0;
iCommittedUpTo = i;
while(iElemProcessed <= *pnElem && i < pBatch->nElem) {
pMsg = (msg_t*) pBatch->pElem[i].pUsrp;
- dbgprintf("submitBatch: i:%d, batch size %d, to process %d, pMsg: %p\n", i, pBatch->nElem, *pnElem, pMsg);//remove later!
- localRet = actionProcessMessage(pAction, pMsg);
- dbgprintf("action call returned %d\n", localRet);
- if(localRet == RS_RET_OK) {
- /* mark messages as committed */
- while(iCommittedUpTo < i) {
- pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM;
- }
- } else if(localRet == RS_RET_PREVIOUS_COMMITTED) {
- /* mark messages as committed */
- while(iCommittedUpTo < i - 1) {
- pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM;
+ dbgprintf("submitBatch: i:%d, batch size %d, to process %d, pMsg: %p, state %d\n", i, pBatch->nElem, *pnElem, pMsg, pBatch->pElem[i].state);//remove later!
+ if(pBatch->pElem[i].state != BATCH_STATE_DISC) {
+ localRet = actionProcessMessage(pAction, pMsg);
+ dbgprintf("action call returned %d\n", localRet);
+ if(localRet == RS_RET_OK) {
+ /* mark messages as committed */
+ while(iCommittedUpTo < i) {
+ pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM;
+ }
+ } else if(localRet == RS_RET_PREVIOUS_COMMITTED) {
+ /* mark messages as committed */
+ while(iCommittedUpTo < i - 1) {
+ pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM;
+ }
+ pBatch->pElem[i].state = BATCH_STATE_SUB;
+ } else if(localRet == RS_RET_PREVIOUS_COMMITTED) {
+ pBatch->pElem[i].state = BATCH_STATE_SUB;
+ } else if(localRet == RS_RET_DISCARDMSG) {
+ pBatch->pElem[i].state = BATCH_STATE_DISC;
+dbgprintf("XXXX: discardmsg! change state to _DISC: %d\n", pBatch->pElem[i].state);
+ } else {
+ iRet = localRet;
+ FINALIZE;
}
- pBatch->pElem[i].state = BATCH_STATE_SUB;
- } else if(localRet == RS_RET_PREVIOUS_COMMITTED) {
- pBatch->pElem[i].state = BATCH_STATE_SUB;
- } else {
- iRet = localRet;
- FINALIZE;
}
++i;
++iElemProcessed;
}
finalize_it:
- if(pBatch->iDoneUpTo != iCommittedUpTo) {
+ if(pBatch->nElem == 1 && pBatch->pElem[0].state == BATCH_STATE_DISC) {
+ iRet = RS_RET_DISCARDMSG;
+ } else if(pBatch->iDoneUpTo != iCommittedUpTo) {
*pnElem += iCommittedUpTo - pBatch->iDoneUpTo;
pBatch->iDoneUpTo = iCommittedUpTo;
}
+dbgprintf("XXXX: done tryDoAction elt 0 state %d, iret %d\n", pBatch->pElem[0].state, iRet);
RETiRet;
}
@@ -865,6 +875,7 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem)
bDone = 0;
do {
localRet = tryDoAction(pAction, pBatch, &nElem);
+dbgprintf("XXXX: submitBatch got state %d\n", localRet);
if( localRet == RS_RET_OK
|| localRet == RS_RET_PREVIOUS_COMMITTED
|| localRet == RS_RET_DEFER_COMMIT) {
@@ -874,10 +885,15 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem)
localRet = finishBatch(pAction);
}
+dbgprintf("XXXX: submitBatch got state %d\n", localRet);
if( localRet == RS_RET_OK
|| localRet == RS_RET_PREVIOUS_COMMITTED
|| localRet == RS_RET_DEFER_COMMIT) {
bDone = 1;
+ } else if(localRet == RS_RET_DISCARDMSG) {
+ iRet = RS_RET_DISCARDMSG; /* TODO: verify this sequence -- rgerhards, 2009-07-30 */
+ bDone = 1;
+dbgprintf("XXXX: submitBatch DONE state %d\n", localRet);
} else if(localRet == RS_RET_SUSPENDED) {
; /* do nothing, this will retry the full batch */
} else if(localRet == RS_RET_ACTION_FAILED) {
@@ -897,8 +913,10 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem)
bDone = 1;
}
}
+dbgprintf("XXXX: submitBatch pre while state %d\n", localRet);
} while(!bDone); /* do .. while()! */
+dbgprintf("XXXX: END submitBatch elt 0 state %d, iRet %d\n", pBatch->pElem[0].state, iRet);
RETiRet;
}
@@ -1134,6 +1152,7 @@ actionWriteToAction(action_t *pAction)
* So let's enqueue our message for execution. -- rgerhards, 2007-07-24
*/
iRet = qqueueEnqObj(pAction->pQueue, pAction->f_pMsg->flowCtlType, (void*) MsgAddRef(pAction->f_pMsg));
+dbgprintf("XXXX: queueEnqObj returned %d\n", iRet);
if(iRet == RS_RET_OK)
pAction->f_prevcount = 0; /* message processed, so we start a new cycle */
@@ -1168,7 +1187,8 @@ doActionCallAction(action_t *pAction, msg_t *pMsg)
pAction->tActNow = -1; /* we do not yet know our current time (clear prev. value) */
/* don't output marks to recently written outputs */
- if((pMsg->msgFlags & MARK) && (getActNow(pAction) - pAction->f_time) < MarkInterval / 2) {
+ if(pAction->bWriteAllMarkMsgs == FALSE
+ && (pMsg->msgFlags & MARK) && (getActNow(pAction) - pAction->f_time) < MarkInterval / 2) {
ABORT_FINALIZE(RS_RET_OK);
}
@@ -1260,6 +1280,7 @@ actionAddCfSysLineHdrl(void)
CHKiRet(regCfSysLineHdlr((uchar *)"actionname", 0, eCmdHdlrGetWord, NULL, &pszActionName, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuefilename", 0, eCmdHdlrGetWord, NULL, &pszActionQFName, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesize", 0, eCmdHdlrInt, NULL, &iActionQueueSize, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"actionwriteallmarkmessages", 0, eCmdHdlrBinary, NULL, &bActionWriteAllMarkMsgs, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeuebatchsize", 0, eCmdHdlrInt, NULL, &iActionQueueDeqBatchSize, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuemaxdiskspace", 0, eCmdHdlrSize, NULL, &iActionQueMaxDiskSpace, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuehighwatermark", 0, eCmdHdlrInt, NULL, &iActionQHighWtrMark, NULL));
@@ -1314,6 +1335,8 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringReques
pAction->pModData = pModData;
pAction->pszName = pszActionName;
pszActionName = NULL; /* free again! */
+ pAction->bWriteAllMarkMsgs = bActionWriteAllMarkMsgs;
+ bActionWriteAllMarkMsgs = FALSE; /* reset */
pAction->bExecWhenPrevSusp = bActExecWhenPrevSusp;
pAction->iSecsExecOnceInterval = iActExecOnceInterval;
pAction->iExecEveryNthOccur = iActExecEveryNthOccur;