summaryrefslogtreecommitdiffstats
path: root/action.c
diff options
context:
space:
mode:
Diffstat (limited to 'action.c')
-rw-r--r--action.c142
1 files changed, 111 insertions, 31 deletions
diff --git a/action.c b/action.c
index 44d2275d..4e60ba58 100644
--- a/action.c
+++ b/action.c
@@ -4,7 +4,44 @@
*
* File begun on 2007-08-06 by RGerhards (extracted from syslogd.c)
*
- * Copyright 2007-2010 Rainer Gerhards and Adiscon GmbH.
+ * Some notes on processing (this hopefully makes it easier to find
+ * the right code in question): For performance reasons, this module
+ * uses different methods of message submission based on the user-selected
+ * configuration. This code is similar, but can not be abstracted because
+ * of the performanse-affecting differences in it. As such, it is often
+ * necessary to triple-check that everything works well in *all* modes.
+ * The different modes (and calling sequence) are:
+ *
+ * if set iExecEveryNthOccur > 1 || f_ReduceRepeated || iSecsExecOnceInterval
+ * - doSubmitToActionQComplexBatch
+ * - helperSubmitToActionQComplexBatch
+ * - doActionCallAction
+ * handles duplicate message processing, but in essence calls
+ * - actionWriteToAction
+ * - qqueueEnqObj
+ * (now queue engine processing)
+ * if(pThis->bWriteAllMarkMsgs == FALSE) - this is the DEFAULT
+ * - doSubmitToActionQNotAllMarkBatch
+ * - doSubmitToActionQBatch (and from here like in the else case below!)
+ * else
+ * - doSubmitToActionQBatch
+ * - doSubmitToActionQ
+ * - qqueueEnqObj
+ * (now queue engine processing)
+ *
+ * Note that bWriteAllMakrMsgs on or off creates almost the same processing.
+ * The difference ist that if WriteAllMarkMsgs is not set, we need to
+ * preprocess the batch and drop mark messages which are not yet due for
+ * writing.
+ *
+ * After dequeue, processing is as follows:
+ * - processBatchMain
+ * - processAction
+ * - submitBatch
+ * - tryDoAction
+ * -
+ *
+ * Copyright 2007-2011 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of rsyslog.
*
@@ -905,7 +942,7 @@ tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem)
if(*(pBatch->pbShutdownImmediate))
ABORT_FINALIZE(RS_RET_FORCE_TERM);
if( pBatch->pElem[i].bFilterOK
- && pBatch->pElem[i].state != BATCH_STATE_DISC
+ && pBatch->pElem[i].state != BATCH_STATE_DISC//) {
&& ((pAction->bExecWhenPrevSusp == 0) || pBatch->pElem[i].bPrevWasSuspended) ) {
pMsg = (msg_t*) pBatch->pElem[i].pUsrp;
localRet = actionProcessMessage(pAction, pMsg, pBatch->pElem[i].staticActParams,
@@ -1040,7 +1077,8 @@ prepareBatch(action_t *pAction, batch_t *pBatch)
pElem = &(pBatch->pElem[i]);
if(pElem->bFilterOK && pElem->state != BATCH_STATE_DISC) {
pElem->state = BATCH_STATE_RDY;
- prepareDoActionParams(pAction, pElem);
+ if(prepareDoActionParams(pAction, pElem) != RS_RET_OK)
+ pElem->bFilterOK = FALSE;
}
}
RETiRet;
@@ -1170,11 +1208,33 @@ static rsRetVal setActionQueType(void __attribute__((unused)) *pVal, uchar *pszT
}
+/* This submits the message to the action queue in case we do NOT need to handle repeat
+ * message processing. That case permits us to gain lots of freedom during processing
+ * and thus speed. This is also utilized to submit messages in complex case once
+ * the complex logic has been applied ;)
+ * rgerhards, 2010-06-08
+ */
+static inline rsRetVal
+doSubmitToActionQ(action_t *pAction, msg_t *pMsg)
+{
+ DEFiRet;
+
+ if(pAction->pQueue->qType == QUEUETYPE_DIRECT)
+ iRet = qqueueEnqObjDirect(pAction->pQueue, (void*) MsgAddRef(pMsg));
+ else
+ iRet = qqueueEnqObj(pAction->pQueue, pMsg->flowCtlType, (void*) MsgAddRef(pMsg));
+
+ RETiRet;
+}
+
+
/* This function builds up a batch of messages to be (later)
* submitted to the action queue.
+ * Note: this function is also called from syslogd itself as part of its
+ * flush processing. If so, pBatch will be NULL and idxBtch undefined.
*/
rsRetVal
-actionWriteToAction(action_t *pAction)
+actionWriteToAction(action_t *pAction, batch_t *pBatch, int idxBtch)
{
msg_t *pMsgSave; /* to save current message pointer, necessary to restore
it in case it needs to be updated (e.g. repeated msgs) */
@@ -1245,7 +1305,7 @@ actionWriteToAction(action_t *pAction)
pAction->f_pMsg = pMsg; /* use the new msg (pointer will be restored below) */
}
- DBGPRINTF("Called action, logging to %s\n", module.GetStateName(pAction->pMod));
+ DBGPRINTF("Called action(complex case), logging to %s\n", module.GetStateName(pAction->pMod));
/* now check if we need to drop the message because otherwise the action would be too
* frequently called. -- rgerhards, 2008-04-08
@@ -1263,14 +1323,43 @@ actionWriteToAction(action_t *pAction)
FINALIZE;
}
- /* we use reception time, not dequeue time - this is considered more appropriate and also faster ;) -- rgerhards, 2008-09-17 */
+ /* we use reception time, not dequeue time - this is considered more appropriate and also faster ;)
+ * rgerhards, 2008-09-17 */
pAction->tLastExec = getActNow(pAction); /* re-init time flags */
pAction->f_time = pAction->f_pMsg->ttGenTime;
/* When we reach this point, we have a valid, non-disabled action.
* So let's enqueue our message for execution. -- rgerhards, 2007-07-24
*/
- iRet = qqueueEnqObj(pAction->pQueue, pAction->f_pMsg->flowCtlType, (void*) MsgAddRef(pAction->f_pMsg));
+ if( pBatch != NULL
+ && (pAction->bExecWhenPrevSusp == 1 && pBatch->pElem[idxBtch].bPrevWasSuspended)) {
+ /* in that case, we need to create a special batch which reflects the
+ * suspended state. Otherwise, that information would be dropped inside
+ * the queue engine. TODO: in later releases (v6?) create a better
+ * solution than what we do here. However, for v5 this sounds much too
+ * intrusive. -- rgerhardsm, 2011-03-16
+ * (Code is copied over from queue.c and slightly modified)
+ */
+ batch_t singleBatch;
+ batch_obj_t batchObj;
+ int i;
+ memset(&batchObj, 0, sizeof(batch_obj_t));
+ memset(&singleBatch, 0, sizeof(batch_t));
+ batchObj.state = BATCH_STATE_RDY;
+ batchObj.pUsrp = (obj_t*) pAction->f_pMsg;
+ batchObj.bPrevWasSuspended = 1;
+ batchObj.bFilterOK = 1;
+ singleBatch.nElem = 1; /* there always is only one in direct mode */
+ singleBatch.pElem = &batchObj;
+
+ iRet = qqueueEnqObjDirectBatch(pAction->pQueue, &singleBatch);
+
+ for(i = 0 ; i < CONF_OMOD_NUMSTRINGS_MAXSIZE ; ++i) {
+ free(batchObj.staticActStrings[i]);
+ }
+ } else { /* standard case, just submit */
+ iRet = doSubmitToActionQ(pAction, pAction->f_pMsg);
+ }
if(iRet == RS_RET_OK)
pAction->f_prevcount = 0; /* message processed, so we start a new cycle */
@@ -1298,10 +1387,12 @@ finalize_it:
* pthread_cleanup_push() POSIX macro...
*/
static inline rsRetVal
-doActionCallAction(action_t *pAction, msg_t *pMsg)
+doActionCallAction(action_t *pAction, batch_t *pBatch, int idxBtch)
{
+ msg_t *pMsg;
DEFiRet;
+ pMsg = (msg_t*)(pBatch->pElem[idxBtch].pUsrp);
pAction->tActNow = -1; /* we do not yet know our current time (clear prev. value) */
/* don't output marks to recently written outputs */
@@ -1328,7 +1419,7 @@ doActionCallAction(action_t *pAction, msg_t *pMsg)
* isolated messages), but back off so we'll flush less often in the future.
*/
if(getActNow(pAction) > REPEATTIME(pAction)) {
- iRet = actionWriteToAction(pAction);
+ iRet = actionWriteToAction(pAction, pBatch, idxBtch);
BACKOFF(pAction);
}
} else {/* new message, save it */
@@ -1337,7 +1428,7 @@ doActionCallAction(action_t *pAction, msg_t *pMsg)
*/
if(pAction->f_pMsg != NULL) {
if(pAction->f_prevcount > 0)
- actionWriteToAction(pAction);
+ actionWriteToAction(pAction, pBatch, idxBtch);
/* we do not care about iRet above - I think it's right but if we have
* some troubles, you know where to look at ;) -- rgerhards, 2007-08-01
*/
@@ -1345,33 +1436,21 @@ doActionCallAction(action_t *pAction, msg_t *pMsg)
}
pAction->f_pMsg = MsgAddRef(pMsg);
/* call the output driver */
- iRet = actionWriteToAction(pAction);
+ iRet = actionWriteToAction(pAction, pBatch, idxBtch);
}
finalize_it:
- RETiRet;
-}
-
-/* This submits the message to the action queue in case we do NOT need to handle repeat
- * message processing. That case permits us to gain lots of freedom during processing
- * and thus speed.
- * rgerhards, 2010-06-08
- */
-static inline rsRetVal
-doSubmitToActionQ(action_t *pAction, msg_t *pMsg)
-{
- DEFiRet;
-
- if(pAction->pQueue->qType == QUEUETYPE_DIRECT)
- iRet = qqueueEnqObjDirect(pAction->pQueue, (void*) MsgAddRef(pMsg));
- else
- iRet = qqueueEnqObj(pAction->pQueue, pMsg->flowCtlType, (void*) MsgAddRef(pMsg));
+ /* we need to update the batch to handle failover processing correctly */
+ if(iRet == RS_RET_OK) {
+ pBatch->pElem[idxBtch].bPrevWasSuspended = 0;
+ } else if(iRet == RS_RET_ACTION_FAILED) {
+ pBatch->pElem[idxBtch].bPrevWasSuspended = 1;
+ }
RETiRet;
}
-
/* This submits the message to the action queue in case where we need to handle
* bWriteAllMarkMessage == FALSE only. Note that we use a non-blocking CAS loop
* for the synchronization. Here, we just modify the filter condition to be false when
@@ -1488,8 +1567,9 @@ helperSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch)
DBGPRINTF("Called action(complex case), logging to %s\n", module.GetStateName(pAction->pMod));
for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
if( pBatch->pElem[i].bFilterOK
- && pBatch->pElem[i].state != BATCH_STATE_DISC) {
- doActionCallAction(pAction, (msg_t*)(pBatch->pElem[i].pUsrp));
+ && pBatch->pElem[i].state != BATCH_STATE_DISC
+ && ((pAction->bExecWhenPrevSusp == 0) || pBatch->pElem[i].bPrevWasSuspended) ) {
+ doActionCallAction(pAction, pBatch, i);
}
}