summaryrefslogtreecommitdiffstats
path: root/action.c
diff options
context:
space:
mode:
Diffstat (limited to 'action.c')
-rw-r--r--action.c149
1 files changed, 119 insertions, 30 deletions
diff --git a/action.c b/action.c
index d41449d0..4312c377 100644
--- a/action.c
+++ b/action.c
@@ -4,7 +4,44 @@
*
* File begun on 2007-08-06 by RGerhards (extracted from syslogd.c)
*
- * Copyright 2007-2010 Rainer Gerhards and Adiscon GmbH.
+ * Some notes on processing (this hopefully makes it easier to find
+ * the right code in question): For performance reasons, this module
+ * uses different methods of message submission based on the user-selected
+ * configuration. This code is similar, but can not be abstracted because
+ * of the performanse-affecting differences in it. As such, it is often
+ * necessary to triple-check that everything works well in *all* modes.
+ * The different modes (and calling sequence) are:
+ *
+ * if set iExecEveryNthOccur > 1 || f_ReduceRepeated || iSecsExecOnceInterval
+ * - doSubmitToActionQComplexBatch
+ * - helperSubmitToActionQComplexBatch
+ * - doActionCallAction
+ * handles duplicate message processing, but in essence calls
+ * - actionWriteToAction
+ * - qqueueEnqObj
+ * (now queue engine processing)
+ * if(pThis->bWriteAllMarkMsgs == FALSE) - this is the DEFAULT
+ * - doSubmitToActionQNotAllMarkBatch
+ * - doSubmitToActionQBatch (and from here like in the else case below!)
+ * else
+ * - doSubmitToActionQBatch
+ * - doSubmitToActionQ
+ * - qqueueEnqObj
+ * (now queue engine processing)
+ *
+ * Note that bWriteAllMakrMsgs on or off creates almost the same processing.
+ * The difference ist that if WriteAllMarkMsgs is not set, we need to
+ * preprocess the batch and drop mark messages which are not yet due for
+ * writing.
+ *
+ * After dequeue, processing is as follows:
+ * - processBatchMain
+ * - processAction
+ * - submitBatch
+ * - tryDoAction
+ * -
+ *
+ * Copyright 2007-2011 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of rsyslog.
*
@@ -710,6 +747,7 @@ static rsRetVal releaseBatch(action_t *pAction, batch_t *pBatch)
ASSERT(pAction != NULL);
for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
+dbgprintf("XXXX: releaseBatch %d: bPrevWasSuspended %d\n", i, pBatch->pElem[i].bPrevWasSuspended);
pElem = &(pBatch->pElem[i]);
if(pElem->bFilterOK && pElem->state != BATCH_STATE_DISC) {
switch(pAction->eParamPassing) {
@@ -897,8 +935,9 @@ tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem)
while(iElemProcessed <= *pnElem && i < pBatch->nElem) {
if(*(pBatch->pbShutdownImmediate))
ABORT_FINALIZE(RS_RET_FORCE_TERM);
+dbgprintf("XXXX:tryDoAction %d: bExecWhenPrevSusp: %d, bPrevWasSuspended %d ptr %p\n", i, pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended, &(pBatch->pElem[i].bPrevWasSuspended));
if( pBatch->pElem[i].bFilterOK
- && pBatch->pElem[i].state != BATCH_STATE_DISC
+ && pBatch->pElem[i].state != BATCH_STATE_DISC//) {
&& ((pAction->bExecWhenPrevSusp == 0) || pBatch->pElem[i].bPrevWasSuspended) ) {
pMsg = (msg_t*) pBatch->pElem[i].pUsrp;
localRet = actionProcessMessage(pAction, pMsg, pBatch->pElem[i].staticActParams);
@@ -962,6 +1001,7 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem)
bDone = 0;
do {
localRet = tryDoAction(pAction, pBatch, &nElem);
+dbgprintf("XXXX: tryDoAction returns %d\n", localRet);
if(localRet == RS_RET_FORCE_TERM) {
ABORT_FINALIZE(RS_RET_FORCE_TERM);
}
@@ -981,12 +1021,14 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem)
} else if(localRet == RS_RET_SUSPENDED) {
; /* do nothing, this will retry the full batch */
} else if(localRet == RS_RET_ACTION_FAILED) {
+dbgprintf("XXXX: in ACTION_FAILED branch, doneUpTo %d\n", pBatch->iDoneUpTo);
/* in this case, everything not yet committed is BAD */
for(i = pBatch->iDoneUpTo ; i < wasDoneTo + nElem ; ++i) {
if( pBatch->pElem[i].state != BATCH_STATE_DISC
&& pBatch->pElem[i].state != BATCH_STATE_COMM ) {
pBatch->pElem[i].state = BATCH_STATE_BAD;
pBatch->pElem[i].bPrevWasSuspended = 1;
+dbgprintf("XXXX: setting susps for item %d ptr %p\n", i, &(pBatch->pElem[i].bPrevWasSuspended));
}
}
bDone = 1;
@@ -1029,6 +1071,7 @@ prepareBatch(action_t *pAction, batch_t *pBatch)
pBatch->iDoneUpTo = 0;
for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
+dbgprintf("XXXX: prepareBatch %d: bPrevWasSuspended %d\n", i, pBatch->pElem[i].bPrevWasSuspended);
pElem = &(pBatch->pElem[i]);
if(pElem->bFilterOK && pElem->state != BATCH_STATE_DISC) {
pElem->state = BATCH_STATE_RDY;
@@ -1069,6 +1112,7 @@ processBatchMain(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate)
DEFiRet;
assert(pBatch != NULL);
+dbgprintf("XXXX: running processBatchMain\n");
pbShutdownImmdtSave = pBatch->pbShutdownImmediate;
pBatch->pbShutdownImmediate = pbShutdownImmediate;
@@ -1098,6 +1142,7 @@ processBatchMain(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate)
iRet = localRet;
finalize_it:
+dbgprintf("XXXX: exiting processBatchMain\n");
pBatch->pbShutdownImmediate = pbShutdownImmdtSave;
RETiRet;
}
@@ -1163,12 +1208,33 @@ static rsRetVal setActionQueType(void __attribute__((unused)) *pVal, uchar *pszT
}
+/* This submits the message to the action queue in case we do NOT need to handle repeat
+ * message processing. That case permits us to gain lots of freedom during processing
+ * and thus speed. This is also utilized to submit messages in complex case once
+ * the complex logic has been applied ;)
+ * rgerhards, 2010-06-08
+ */
+static inline rsRetVal
+doSubmitToActionQ(action_t *pAction, msg_t *pMsg)
+{
+ DEFiRet;
+
+ if(pAction->pQueue->qType == QUEUETYPE_DIRECT)
+ iRet = qqueueEnqObjDirect(pAction->pQueue, (void*) MsgAddRef(pMsg));
+ else
+ iRet = qqueueEnqObj(pAction->pQueue, pMsg->flowCtlType, (void*) MsgAddRef(pMsg));
+
+ RETiRet;
+}
+
+
/* This function builds up a batch of messages to be (later)
* submitted to the action queue.
*/
rsRetVal
-actionWriteToAction(action_t *pAction)
+actionWriteToAction(action_t *pAction, batch_t *pBatch, int idxBtch)
{
+dbgprintf("XXXX: enter actionWriteToAction idx %d\n", idxBtch);
msg_t *pMsgSave; /* to save current message pointer, necessary to restore
it in case it needs to be updated (e.g. repeated msgs) */
DEFiRet;
@@ -1238,7 +1304,7 @@ actionWriteToAction(action_t *pAction)
pAction->f_pMsg = pMsg; /* use the new msg (pointer will be restored below) */
}
- DBGPRINTF("Called action, logging to %s\n", module.GetStateName(pAction->pMod));
+ DBGPRINTF("Called action(complex case), logging to %s\n", module.GetStateName(pAction->pMod));
/* now check if we need to drop the message because otherwise the action would be too
* frequently called. -- rgerhards, 2008-04-08
@@ -1256,15 +1322,45 @@ actionWriteToAction(action_t *pAction)
FINALIZE;
}
- /* we use reception time, not dequeue time - this is considered more appropriate and also faster ;) -- rgerhards, 2008-09-17 */
+ /* we use reception time, not dequeue time - this is considered more appropriate and also faster ;)
+ * rgerhards, 2008-09-17 */
pAction->tLastExec = getActNow(pAction); /* re-init time flags */
pAction->f_time = pAction->f_pMsg->ttGenTime;
+dbgprintf("XXXX:actionWriteToAction %d: bExecWhenPrevSusp: %d, bPrevWasSuspended %d ptr %p\n", idxBtch, pAction->bExecWhenPrevSusp, pBatch->pElem[idxBtch].bPrevWasSuspended, &(pBatch->pElem[idxBtch].bPrevWasSuspended));
/* When we reach this point, we have a valid, non-disabled action.
* So let's enqueue our message for execution. -- rgerhards, 2007-07-24
*/
- iRet = qqueueEnqObj(pAction->pQueue, pAction->f_pMsg->flowCtlType, (void*) MsgAddRef(pAction->f_pMsg));
+ if(pAction->bExecWhenPrevSusp == 1 && pBatch->pElem[idxBtch].bPrevWasSuspended) {
+ /* in that case, we need to create a special batch which reflects the
+ * suspended state. Otherwise, that information would be dropped inside
+ * the queue engine. TODO: in later releases (v6?) create a better
+ * solution than what we do here. However, for v5 this sounds much too
+ * intrusive. -- rgerhardsm, 2011-03-16
+ * (Code is copied over from queue.c and slightly modified)
+ */
+ batch_t singleBatch;
+ batch_obj_t batchObj;
+ int i;
+ memset(&batchObj, 0, sizeof(batch_obj_t));
+ memset(&singleBatch, 0, sizeof(batch_t));
+ batchObj.state = BATCH_STATE_RDY;
+ batchObj.pUsrp = (obj_t*) pAction->f_pMsg;
+ batchObj.bPrevWasSuspended = 1;
+ batchObj.bFilterOK = 1;
+ singleBatch.nElem = 1; /* there always is only one in direct mode */
+ singleBatch.pElem = &batchObj;
+
+ iRet = qqueueEnqObjDirectBatch(pAction->pQueue, &singleBatch);
+
+ for(i = 0 ; i < CONF_OMOD_NUMSTRINGS_MAXSIZE ; ++i) {
+ free(batchObj.staticActStrings[i]);
+ }
+ } else { /* standard case, just submit */
+ iRet = doSubmitToActionQ(pAction, pAction->f_pMsg);
+ }
+dbgprintf("XXXX: action iRet %d\n", iRet);
if(iRet == RS_RET_OK)
pAction->f_prevcount = 0; /* message processed, so we start a new cycle */
@@ -1291,10 +1387,13 @@ finalize_it:
* pthread_cleanup_push() POSIX macro...
*/
static inline rsRetVal
-doActionCallAction(action_t *pAction, msg_t *pMsg)
+doActionCallAction(action_t *pAction, batch_t *pBatch, int idxBtch)
{
+ msg_t *pMsg;
DEFiRet;
+dbgprintf("XXXX: enter doActionCallAction\n");
+ pMsg = (msg_t*)(pBatch->pElem[idxBtch].pUsrp);
pAction->tActNow = -1; /* we do not yet know our current time (clear prev. value) */
/* don't output marks to recently written outputs */
@@ -1321,7 +1420,7 @@ doActionCallAction(action_t *pAction, msg_t *pMsg)
* isolated messages), but back off so we'll flush less often in the future.
*/
if(getActNow(pAction) > REPEATTIME(pAction)) {
- iRet = actionWriteToAction(pAction);
+ iRet = actionWriteToAction(pAction, pBatch, idxBtch);
BACKOFF(pAction);
}
} else {/* new message, save it */
@@ -1330,7 +1429,7 @@ doActionCallAction(action_t *pAction, msg_t *pMsg)
*/
if(pAction->f_pMsg != NULL) {
if(pAction->f_prevcount > 0)
- actionWriteToAction(pAction);
+ actionWriteToAction(pAction, pBatch, idxBtch);
/* we do not care about iRet above - I think it's right but if we have
* some troubles, you know where to look at ;) -- rgerhards, 2007-08-01
*/
@@ -1338,33 +1437,21 @@ doActionCallAction(action_t *pAction, msg_t *pMsg)
}
pAction->f_pMsg = MsgAddRef(pMsg);
/* call the output driver */
- iRet = actionWriteToAction(pAction);
+ iRet = actionWriteToAction(pAction, pBatch, idxBtch);
}
finalize_it:
- RETiRet;
-}
-
-/* This submits the message to the action queue in case we do NOT need to handle repeat
- * message processing. That case permits us to gain lots of freedom during processing
- * and thus speed.
- * rgerhards, 2010-06-08
- */
-static inline rsRetVal
-doSubmitToActionQ(action_t *pAction, msg_t *pMsg)
-{
- DEFiRet;
-
- if(pAction->pQueue->qType == QUEUETYPE_DIRECT)
- iRet = qqueueEnqObjDirect(pAction->pQueue, (void*) MsgAddRef(pMsg));
- else
- iRet = qqueueEnqObj(pAction->pQueue, pMsg->flowCtlType, (void*) MsgAddRef(pMsg));
+ /* we need to update the batch to handle failover processing correctly */
+ if(iRet == RS_RET_OK) {
+ pBatch->pElem[idxBtch].bPrevWasSuspended = 0;
+ } else if(iRet == RS_RET_ACTION_FAILED) {
+ pBatch->pElem[idxBtch].bPrevWasSuspended = 1;
+ }
RETiRet;
}
-
/* This submits the message to the action queue in case where we need to handle
* bWriteAllMarkMessage == FALSE only. Note that we use a non-blocking CAS loop
* for the synchronization. Here, we just modify the filter condition to be false when
@@ -1480,9 +1567,11 @@ helperSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch)
DBGPRINTF("Called action(complex case), logging to %s\n", module.GetStateName(pAction->pMod));
for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
+dbgprintf("XXXX: helper complex %d: bExecWhenPrevSusp: %d, bPrevWasSuspended %d ptr %p\n", i, pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended, &(pBatch->pElem[i].bPrevWasSuspended));
if( pBatch->pElem[i].bFilterOK
- && pBatch->pElem[i].state != BATCH_STATE_DISC) {
- doActionCallAction(pAction, (msg_t*)(pBatch->pElem[i].pUsrp));
+ && pBatch->pElem[i].state != BATCH_STATE_DISC
+ && ((pAction->bExecWhenPrevSusp == 0) || pBatch->pElem[i].bPrevWasSuspended) ) {
+ doActionCallAction(pAction, pBatch, i);
}
}