summaryrefslogtreecommitdiffstats
path: root/action.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2011-06-16 17:52:16 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2011-06-16 17:52:16 +0200
commit257b06aac8222cdea231a95cbbe659679a2d417e (patch)
tree60db3a6c8c3bcf1e9cc031cdc84d8ee7e709048d /action.c
parent90f8c7300495da9b61e4706be652c612fccc084f (diff)
downloadrsyslog-257b06aac8222cdea231a95cbbe659679a2d417e.tar.gz
rsyslog-257b06aac8222cdea231a95cbbe659679a2d417e.tar.xz
rsyslog-257b06aac8222cdea231a95cbbe659679a2d417e.zip
failover problem was not totally solved, now (hopefully ;))
I overlooked a border case, which came up on a larger testbench run.
Diffstat (limited to 'action.c')
-rw-r--r--action.c86
1 files changed, 59 insertions, 27 deletions
diff --git a/action.c b/action.c
index b1986c38..7909d8e3 100644
--- a/action.c
+++ b/action.c
@@ -1526,6 +1526,63 @@ finalize_it:
}
+/* enqueue a batch in direct mode. We have put this into its own function just to avoid
+ * cluttering the actual submit function.
+ * rgerhards, 2011-06-16
+ */
+static inline rsRetVal
+doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch)
+{
+ sbool FilterSave[1024];
+ sbool *pFilterSave;
+ sbool bNeedSubmit;
+ sbool bModifiedFilter;
+ int i;
+ DEFiRet;
+
+ if(batchNumMsgs(pBatch) <= (int) (sizeof(FilterSave)/sizeof(sbool))) {
+ pFilterSave = FilterSave;
+ } else {
+ CHKmalloc(pFilterSave = malloc(batchNumMsgs(pBatch) * sizeof(sbool)));
+ }
+
+ /* note: for direct mode, we need to adjust the filter property. For non-direct
+ * this is not necessary, because in that case we enqueue only what actually needs
+ * to be processed.
+ */
+ if(pAction->bExecWhenPrevSusp) {
+ bNeedSubmit = 0;
+ bModifiedFilter = 0;
+ for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
+ pFilterSave[i] = pBatch->pElem[i].bFilterOK;
+ if(!pBatch->pElem[i].bPrevWasSuspended) {
+ DBGPRINTF("action enq stage: change bFilterOK to 0 due to "
+ "failover case in elem %d\n", i);
+ pBatch->pElem[i].bFilterOK = 0;
+ bModifiedFilter = 1;
+ }
+ if(pBatch->pElem[i].bFilterOK)
+ bNeedSubmit = 1;
+ }
+ if(bNeedSubmit) {
+ iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch);
+ } else {
+ DBGPRINTF("no need to submit batch, all bFilterOK==0\n");
+ }
+ if(bModifiedFilter) {
+ for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) {
+ /* note: clang static code analyzer reports a false positive below */
+ pBatch->pElem[i].bFilterOK = pFilterSave[i];
+ }
+ }
+ } else {
+ iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch);
+ }
+
+finalize_it:
+ RETiRet;
+}
+
/* This submits the message to the action queue in case we do NOT need to handle repeat
* message processing. That case permits us to gain lots of freedom during processing
* and thus speed.
@@ -1535,7 +1592,6 @@ static rsRetVal
doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch)
{
int i;
- sbool bNeedSubmit;
DEFiRet;
/* TODO
@@ -1546,33 +1602,9 @@ doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch)
*/
DBGPRINTF("Called action(Batch), logging to %s\n", module.GetStateName(pAction->pMod));
- /* if necessary, take care of failover cases. We do this by simply
- * changing the filter setting, which is perfectly legal.
- */
+
if(pAction->pQueue->qType == QUEUETYPE_DIRECT) {
- /* note: for direct mode, we need to adjust the filter property. For non-direct
- * this is not necessary, because in that case we enqueue only what actually needs
- * to be processed.
- */
- if(pAction->bExecWhenPrevSusp) {
- bNeedSubmit = 0;
- for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
- if(!pBatch->pElem[i].bPrevWasSuspended) {
- DBGPRINTF("action enq stage: change bFilterOK to 0 due to "
- "failover case in elem %d\n", i);
- pBatch->pElem[i].bFilterOK = 0;
- }
- if(pBatch->pElem[i].bFilterOK)
- bNeedSubmit = 1;
- }
- if(bNeedSubmit) {
- iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch);
- } else {
- DBGPRINTF("no need to submit batch, all bFilterOK==0\n");
- }
- } else {
- iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch);
- }
+ iRet = doQueueEnqObjDirectBatch(pAction, pBatch);
} else { /* in this case, we do single submits to the queue.
* TODO: optimize this, we may do at least a multi-submit!
*/