summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ChangeLog2
-rw-r--r--action.c140
-rw-r--r--action.h2
-rw-r--r--runtime/rule.c12
-rw-r--r--tools/syslogd.c2
5 files changed, 106 insertions, 52 deletions
diff --git a/ChangeLog b/ChangeLog
index 042aeae3..0d7bf02a 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,5 +1,7 @@
---------------------------------------------------------------------------
Version 5.8.2 [V5-stable] (rgerhards), 2011-06-??
+- bugfix: problems in failover action handling
+ closes: http://bugzilla.adiscon.com/show_bug.cgi?id=270 (not yet confirmed!)
- bugfix: memory leak in imtcp & subsystems under some circumstances
This leak is tied to error conditions which lead to incorrect cleanup
of some data structures. [backport from v6]
diff --git a/action.c b/action.c
index c5bd03cb..b1986c38 100644
--- a/action.c
+++ b/action.c
@@ -39,7 +39,35 @@
* - processAction
* - submitBatch
* - tryDoAction
- * -
+ * - ...
+ *
+ * MORE ON PROCESSING, QUEUES and FILTERING
+ * All filtering needs to be done BEFORE messages are enqueued to an
+ * action. In previous code, part of the filtering was done at the
+ * "remote end" of the action queue, which lead to problems in
+ * non-direct mode (because then things run asynchronously). In order
+ * to solve this problem once and for all, I have changed the code so
+ * that all filtering is done before enq, and processing on the
+ * dequeue side of action processing now always executes whatever is
+ * enqueued. This is the only way to handle things consistently and
+ * (as much as possible) in a queue-type agnostic way. However, it is
+ * a rather radical change, which I unfortunately needed to make from
+ * stable version 5.8.1 to 5.8.2. If new problems pop up, you now know
+ * what may be their cause. In any case, the way it is done now is the
+ * only correct one.
+ * A problem is that, under fortunate conditions, we use the current
+ * batch for the output system as well. This is very good from a performance
+ * point of view, but makes the distinction between enq and deq side of
+ * the queue a bit hard. The current idea is that the filter condition
+ * alone is checked at the deq side of the queue (seems to be unavoidable
+ * to do it that way), but all other complex conditons (like failover
+ * handling) go into the computation of the filter condition. For
+ * non-direct queues, we still enqueue only what is acutally necessary.
+ * Note that in this case the rest of the code must ensure that the filter
+ * is set to "true". While this is not perfect and not as simple as
+ * we would like to see it, it looks like the best way to tackle that
+ * beast.
+ * rgerhards, 2011-06-15
*
* Copyright 2007-2011 Rainer Gerhards and Adiscon GmbH.
*
@@ -611,8 +639,8 @@ static rsRetVal actionTryResume(action_t *pThis, int *pbShutdownImmediate)
}
if(Debug && (pThis->eState == ACT_STATE_RTRY ||pThis->eState == ACT_STATE_SUSP)) {
- DBGPRINTF("actionTryResume: action state: %s, next retry (if applicable): %u [now %u]\n",
- getActStateName(pThis), (unsigned) pThis->ttResumeRtry, (unsigned) ttNow);
+ DBGPRINTF("actionTryResume: action %p state: %s, next retry (if applicable): %u [now %u]\n",
+ pThis, getActStateName(pThis), (unsigned) pThis->ttResumeRtry, (unsigned) ttNow);
}
finalize_it:
@@ -932,16 +960,19 @@ tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem)
i = pBatch->iDoneUpTo; /* all messages below that index are processed */
iElemProcessed = 0;
iCommittedUpTo = i;
+dbgprintf("XXXXX: tryDoAction %p, pnElem %d, nElem %d\n", pAction, *pnElem, pBatch->nElem);
while(iElemProcessed <= *pnElem && i < pBatch->nElem) {
if(*(pBatch->pbShutdownImmediate))
ABORT_FINALIZE(RS_RET_FORCE_TERM);
+ /* NOTE: do NOT extend the filter below! Anything else must be done on the
+ * enq side of the queue (see file header comment)! -- rgerhards, 2011-06-15
+ */
if( pBatch->pElem[i].bFilterOK
- && pBatch->pElem[i].state != BATCH_STATE_DISC//) {
- && ((pAction->bExecWhenPrevSusp == 0) || pBatch->pElem[i].bPrevWasSuspended) ) {
+ && pBatch->pElem[i].state != BATCH_STATE_DISC) {
pMsg = (msg_t*) pBatch->pElem[i].pUsrp;
localRet = actionProcessMessage(pAction, pMsg, pBatch->pElem[i].staticActParams,
- pBatch->pbShutdownImmediate);
- DBGPRINTF("action call returned %d\n", localRet);
+ pBatch->pbShutdownImmediate);
+ DBGPRINTF("action %p call returned %d\n", pAction, localRet);
/* Note: we directly modify the batch object state, because we know that
* wo do not overwrite BATCH_STATE_DISC indicators!
*/
@@ -1035,6 +1066,8 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem)
bDone = 1;
} else {
/* retry with half as much. Depth is log_2 batchsize, so recursion is not too deep */
+ DBGPRINTF("submitBatch recursing trying to find and exclude the culprit "
+ "for iRet %d\n", localRet);
submitBatch(pAction, pBatch, nElem / 2);
submitBatch(pAction, pBatch, nElem - (nElem / 2));
bDone = 1;
@@ -1224,11 +1257,13 @@ doSubmitToActionQ(action_t *pAction, msg_t *pMsg)
/* This function builds up a batch of messages to be (later)
* submitted to the action queue.
- * Note: this function is also called from syslogd itself as part of its
- * flush processing. If so, pBatch will be NULL and idxBtch undefined.
+ * Important: this function MUST not be called with messages that are to
+ * be discarded due to their "prevWasSuspended" state. It will not check for
+ * this and submit all messages to the queue for execution. So these must
+ * be filtered out before calling us (what is done currently!).
*/
rsRetVal
-actionWriteToAction(action_t *pAction, batch_t *pBatch, int idxBtch)
+actionWriteToAction(action_t *pAction)
{
msg_t *pMsgSave; /* to save current message pointer, necessary to restore
it in case it needs to be updated (e.g. repeated msgs) */
@@ -1325,35 +1360,7 @@ actionWriteToAction(action_t *pAction, batch_t *pBatch, int idxBtch)
/* When we reach this point, we have a valid, non-disabled action.
* So let's enqueue our message for execution. -- rgerhards, 2007-07-24
*/
- if( pBatch != NULL
- && (pAction->bExecWhenPrevSusp == 1 && pBatch->pElem[idxBtch].bPrevWasSuspended)) {
- /* in that case, we need to create a special batch which reflects the
- * suspended state. Otherwise, that information would be dropped inside
- * the queue engine. TODO: in later releases (v6?) create a better
- * solution than what we do here. However, for v5 this sounds much too
- * intrusive. -- rgerhardsm, 2011-03-16
- * (Code is copied over from queue.c and slightly modified)
- */
- batch_t singleBatch;
- batch_obj_t batchObj;
- int i;
- memset(&batchObj, 0, sizeof(batch_obj_t));
- memset(&singleBatch, 0, sizeof(batch_t));
- batchObj.state = BATCH_STATE_RDY;
- batchObj.pUsrp = (obj_t*) pAction->f_pMsg;
- batchObj.bPrevWasSuspended = 1;
- batchObj.bFilterOK = 1;
- singleBatch.nElem = 1; /* there always is only one in direct mode */
- singleBatch.pElem = &batchObj;
-
- iRet = qqueueEnqObjDirectBatch(pAction->pQueue, &singleBatch);
-
- for(i = 0 ; i < CONF_OMOD_NUMSTRINGS_MAXSIZE ; ++i) {
- free(batchObj.staticActStrings[i]);
- }
- } else { /* standard case, just submit */
- iRet = doSubmitToActionQ(pAction, pAction->f_pMsg);
- }
+ iRet = doSubmitToActionQ(pAction, pAction->f_pMsg);
if(iRet == RS_RET_OK)
pAction->f_prevcount = 0; /* message processed, so we start a new cycle */
@@ -1413,7 +1420,7 @@ doActionCallAction(action_t *pAction, batch_t *pBatch, int idxBtch)
* isolated messages), but back off so we'll flush less often in the future.
*/
if(getActNow(pAction) > REPEATTIME(pAction)) {
- iRet = actionWriteToAction(pAction, pBatch, idxBtch);
+ iRet = actionWriteToAction(pAction);
BACKOFF(pAction);
}
} else {/* new message, save it */
@@ -1422,7 +1429,7 @@ doActionCallAction(action_t *pAction, batch_t *pBatch, int idxBtch)
*/
if(pAction->f_pMsg != NULL) {
if(pAction->f_prevcount > 0)
- actionWriteToAction(pAction, pBatch, idxBtch);
+ actionWriteToAction(pAction);
/* we do not care about iRet above - I think it's right but if we have
* some troubles, you know where to look at ;) -- rgerhards, 2007-08-01
*/
@@ -1430,7 +1437,7 @@ doActionCallAction(action_t *pAction, batch_t *pBatch, int idxBtch)
}
pAction->f_pMsg = MsgAddRef(pMsg);
/* call the output driver */
- iRet = actionWriteToAction(pAction, pBatch, idxBtch);
+ iRet = actionWriteToAction(pAction);
}
finalize_it:
@@ -1528,16 +1535,51 @@ static rsRetVal
doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch)
{
int i;
+ sbool bNeedSubmit;
DEFiRet;
+ /* TODO
+ ich arbeite an dieser funktion, es müssen die verscheidenen modi geprüft werden. ausserdem
+ muss geschaut werden, in welche anderen funktionen die neue Funktionalität noch eingebaut
+ werden muss, bzw. ob man das an zentralerer stelle machen kann. Am besten die gesamte
+ filter evaluation nochmal druchgehen (also das füllen des arrays).
+ */
+
DBGPRINTF("Called action(Batch), logging to %s\n", module.GetStateName(pAction->pMod));
- if(pAction->pQueue->qType == QUEUETYPE_DIRECT)
- iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch);
- else { /* in this case, we do single submits to the queue.
+ /* if necessary, take care of failover cases. We do this by simply
+ * changing the filter setting, which is perfectly legal.
+ */
+ if(pAction->pQueue->qType == QUEUETYPE_DIRECT) {
+ /* note: for direct mode, we need to adjust the filter property. For non-direct
+ * this is not necessary, because in that case we enqueue only what actually needs
+ * to be processed.
+ */
+ if(pAction->bExecWhenPrevSusp) {
+ bNeedSubmit = 0;
+ for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
+ if(!pBatch->pElem[i].bPrevWasSuspended) {
+ DBGPRINTF("action enq stage: change bFilterOK to 0 due to "
+ "failover case in elem %d\n", i);
+ pBatch->pElem[i].bFilterOK = 0;
+ }
+ if(pBatch->pElem[i].bFilterOK)
+ bNeedSubmit = 1;
+ }
+ if(bNeedSubmit) {
+ iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch);
+ } else {
+ DBGPRINTF("no need to submit batch, all bFilterOK==0\n");
+ }
+ } else {
+ iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch);
+ }
+ } else { /* in this case, we do single submits to the queue.
* TODO: optimize this, we may do at least a multi-submit!
*/
for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
- if(pBatch->pElem[i].bFilterOK) {
+ if( pBatch->pElem[i].bFilterOK
+ && pBatch->pElem[i].state != BATCH_STATE_DISC
+ && (pAction->bExecWhenPrevSusp == 0 || pBatch->pElem[i].bPrevWasSuspended == 1)) {
doSubmitToActionQ(pAction, (msg_t*)(pBatch->pElem[i].pUsrp));
}
}
@@ -1558,8 +1600,12 @@ helperSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch)
int i;
DEFiRet;
- DBGPRINTF("Called action(complex case), logging to %s\n", module.GetStateName(pAction->pMod));
+ DBGPRINTF("Called action %p (complex case), logging to %s\n",
+ pAction, module.GetStateName(pAction->pMod));
for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
+ DBGPRINTF("action %p: filterOK:%d state:%d execWhenPrev:%d prevWasSusp:%d\n",
+ pAction, pBatch->pElem[i].bFilterOK, pBatch->pElem[i].state,
+ pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended);
if( pBatch->pElem[i].bFilterOK
&& pBatch->pElem[i].state != BATCH_STATE_DISC
&& ((pAction->bExecWhenPrevSusp == 0) || pBatch->pElem[i].bPrevWasSuspended) ) {
diff --git a/action.h b/action.h
index 0ab8062a..bae64d31 100644
--- a/action.h
+++ b/action.h
@@ -100,7 +100,7 @@ rsRetVal actionDestruct(action_t *pThis);
rsRetVal actionDbgPrint(action_t *pThis);
rsRetVal actionSetGlobalResumeInterval(int iNewVal);
rsRetVal actionDoAction(action_t *pAction);
-rsRetVal actionWriteToAction(action_t *pAction, batch_t *pBatch, int idxBtch);
+rsRetVal actionWriteToAction(action_t *pAction);
rsRetVal actionCallHUPHdlr(action_t *pAction);
rsRetVal actionClassInit(void);
rsRetVal addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringRequest_t *pOMSR, int bSuspended);
diff --git a/runtime/rule.c b/runtime/rule.c
index d5f18e71..19239d61 100644
--- a/runtime/rule.c
+++ b/runtime/rule.c
@@ -266,6 +266,7 @@ static rsRetVal
processBatch(rule_t *pThis, batch_t *pBatch)
{
int i;
+ rsRetVal localRet;
DEFiRet;
ISOBJ_TYPE_assert(pThis, rule);
@@ -273,9 +274,14 @@ processBatch(rule_t *pThis, batch_t *pBatch)
/* first check the filters and reset status variables */
for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
- CHKiRet(shouldProcessThisMessage(pThis, (msg_t*)(pBatch->pElem[i].pUsrp),
- &(pBatch->pElem[i].bFilterOK)));
- // TODO: really abort on error? 2010-06-10
+ localRet = shouldProcessThisMessage(pThis, (msg_t*)(pBatch->pElem[i].pUsrp),
+ &(pBatch->pElem[i].bFilterOK));
+ if(localRet != RS_RET_OK) {
+ DBGPRINTF("processBatch: iRet %d returned from shouldProcessThisMessage, "
+ "ignoring message\n", localRet);
+
+ pBatch->pElem[i].bFilterOK = 0;
+ }
if(pBatch->pElem[i].bFilterOK) {
/* re-init only when actually needed (cache write cost!) */
pBatch->pElem[i].bPrevWasSuspended = 0;
diff --git a/tools/syslogd.c b/tools/syslogd.c
index 487ab364..096f9309 100644
--- a/tools/syslogd.c
+++ b/tools/syslogd.c
@@ -799,7 +799,7 @@ DEFFUNC_llExecFunc(flushRptdMsgsActions)
DBGPRINTF("flush %s: repeated %d times, %d sec.\n",
module.GetStateName(pAction->pMod), pAction->f_prevcount,
repeatinterval[pAction->f_repeatcount]);
- actionWriteToAction(pAction, NULL, 0);
+ actionWriteToAction(pAction);
BACKOFF(pAction);
}
UnlockObj(pAction);