diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2009-10-27 13:53:45 +0100 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2009-10-27 13:53:45 +0100 |
commit | 6cf7fc7ec2f1b2f69fb69d8b18df8240beed3380 (patch) | |
tree | ac0d1a59327ab5ae913faa3b3eb69bacb82639b1 /action.c | |
parent | 5b3787cdc21089134b0d897075a6ce6e53780c70 (diff) | |
download | rsyslog-6cf7fc7ec2f1b2f69fb69d8b18df8240beed3380.tar.gz rsyslog-6cf7fc7ec2f1b2f69fb69d8b18df8240beed3380.tar.xz rsyslog-6cf7fc7ec2f1b2f69fb69d8b18df8240beed3380.zip |
action processing optimized for queue shutdown
Diffstat (limited to 'action.c')
-rw-r--r-- | action.c | 38 |
1 files changed, 15 insertions, 23 deletions
@@ -798,7 +798,7 @@ finalize_it: * rgerhards, 2009-05-12 */ static rsRetVal -tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem) +tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem, int *pbShutdownImmediate) { int i; int iElemProcessed; @@ -814,8 +814,9 @@ tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem) iElemProcessed = 0; iCommittedUpTo = i; while(iElemProcessed <= *pnElem && i < pBatch->nElem) { + if(*pbShutdownImmediate) + ABORT_FINALIZE(RS_RET_FORCE_TERM); pMsg = (msg_t*) pBatch->pElem[i].pUsrp; - DBGPRINTF("submitBatch: i:%d, batch size %d, to process %d, pMsg: %p, state %d\n", i, pBatch->nElem, *pnElem, pMsg, pBatch->pElem[i].state);//remove later! if(pBatch->pElem[i].state != BATCH_STATE_DISC) { localRet = actionProcessMessage(pAction, pMsg); DBGPRINTF("action call returned %d\n", localRet); @@ -838,7 +839,6 @@ tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem) iRet = localRet; FINALIZE; } -dbgprintf("XXX: submitBatch set element %d state to %d\n", i, pBatch->pElem[i].state); } ++i; ++iElemProcessed; @@ -861,7 +861,7 @@ finalize_it: * rgerhards, 2009-05-12 */ static rsRetVal -submitBatch(action_t *pAction, batch_t *pBatch, int nElem) +submitBatch(action_t *pAction, batch_t *pBatch, int nElem, int *pbShutdownImmediate) { int i; int bDone; @@ -873,7 +873,9 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem) bDone = 0; do { dbgprintf("XXX: submitBatch in loop, batch size %d\n", nElem); - localRet = tryDoAction(pAction, pBatch, &nElem); + localRet = tryDoAction(pAction, pBatch, &nElem, pbShutdownImmediate); + if(localRet == RS_RET_FORCE_TERM) + FINALIZE; if( localRet == RS_RET_OK || localRet == RS_RET_PREVIOUS_COMMITTED || localRet == RS_RET_DEFER_COMMIT) { @@ -904,13 +906,17 @@ dbgprintf("XXX: submitBatch in loop, batch size %d\n", nElem); bDone = 1; } else { /* retry with half as much. Depth is log_2 batchsize, so recursion is not too deep */ - submitBatch(pAction, pBatch, nElem / 2); - submitBatch(pAction, pBatch, nElem - (nElem / 2)); + submitBatch(pAction, pBatch, nElem / 2, pbShutdownImmediate); + submitBatch(pAction, pBatch, nElem - (nElem / 2), pbShutdownImmediate); bDone = 1; } } - } while(!bDone); /* do .. while()! */ + } while(!bDone && !*pbShutdownImmediate); /* do .. while()! */ + + if(*pbShutdownImmediate) + ABORT_FINALIZE(RS_RET_FORCE_TERM); +finalize_it: RETiRet; } @@ -921,25 +927,11 @@ dbgprintf("XXX: submitBatch in loop, batch size %d\n", nElem); static rsRetVal processAction(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate) { - int i; - msg_t *pMsg; - rsRetVal localRet; DEFiRet; assert(pBatch != NULL); - pBatch->iDoneUpTo = 0; - /* TODO: think about action batches, must be handled at upper layer! - * MULTIQUEUE - */ - localRet = submitBatch(pAction, pBatch, pBatch->nElem); - CHKiRet(localRet); - - /* this must be moved away - up into the dequeue part of the queue, I guess, but that's for another day */ - for(i = 0 ; i < pBatch->nElem && !*pbShutdownImmediate ; i++) { - //for(i = 0 ; i < pBatch->nElem ; i++) { - pMsg = (msg_t*) pBatch->pElem[i].pUsrp; - } + CHKiRet(submitBatch(pAction, pBatch, pBatch->nElem, pbShutdownImmediate)); iRet = finishBatch(pAction); finalize_it: |