diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2010-06-23 12:48:27 +0200 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2010-06-23 12:48:27 +0200 |
commit | f48128f34a17aae7e7b9405fe32b396db45443ca (patch) | |
tree | a122a65d4a2495570b5d8662fd75a1104cb0c521 /runtime | |
parent | d6a3b08f56cc608bc832251cd49e86e71d9574dc (diff) | |
download | rsyslog-f48128f34a17aae7e7b9405fe32b396db45443ca.tar.gz rsyslog-f48128f34a17aae7e7b9405fe32b396db45443ca.tar.xz rsyslog-f48128f34a17aae7e7b9405fe32b396db45443ca.zip |
fixed a couple of regressions
by implementing some code that was missing so far ;) as well as
finding some real bugs. I also did some general cleanup, removing
debug strings and such. This code should be fairly OK to use, except
when "exec only when previous action was suspended" is used -- this is
NOT yet re-implemented in the tuned engine.
Diffstat (limited to 'runtime')
-rw-r--r-- | runtime/nspoll.c | 3 | ||||
-rw-r--r-- | runtime/queue.c | 7 | ||||
-rw-r--r-- | runtime/rule.c | 17 | ||||
-rw-r--r-- | runtime/ruleset.c | 11 | ||||
-rw-r--r-- | runtime/stream.c | 5 |
5 files changed, 9 insertions, 34 deletions
diff --git a/runtime/nspoll.c b/runtime/nspoll.c index f287cd4e..64927280 100644 --- a/runtime/nspoll.c +++ b/runtime/nspoll.c @@ -79,7 +79,6 @@ loadDrvr(nspoll_t *pThis) * about this hack, but for the time being it is efficient and clean * enough. -- rgerhards, 2008-04-18 */ -RUNLOG_VAR("%s", szDrvrName+2); CHKiRet(obj.UseObj(__FILE__, szDrvrName+2, DONT_LOAD_LIB, (void*) &pThis->Drvr)); finalize_it: @@ -120,11 +119,9 @@ ConstructFinalize(nspoll_t *pThis) { DEFiRet; ISOBJ_TYPE_assert(pThis, nspoll); -RUNLOG_STR("trying to load epoll driver\n"); CHKiRet(loadDrvr(pThis)); CHKiRet(pThis->Drvr.Construct(&pThis->pDrvrData)); finalize_it: -dbgprintf("XXX: done trying to load epoll driver, state %d\n", iRet); RETiRet; } diff --git a/runtime/queue.c b/runtime/queue.c index 5e9c67ca..60d17086 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -852,8 +852,11 @@ static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr) * We use our knowledge about the batch_t structure below, but without that, we * pay a too-large performance toll... -- rgerhards, 2009-04-22 */ + memset(&batchObj, 0, sizeof(batch_obj_t)); + memset(&singleBatch, 0, sizeof(batch_t)); batchObj.state = BATCH_STATE_RDY; batchObj.pUsrp = (obj_t*) pUsr; + batchObj.bFilterOK = 1; singleBatch.nElem = 1; /* there always is only one in direct mode */ singleBatch.pElem = &batchObj; iRet = pThis->pConsumer(pThis->pUsr, &singleBatch, &pThis->bShutdownImmediate); @@ -862,7 +865,9 @@ static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr) RETiRet; } -/*** EXPERIMENTAL ***/ +/* "enqueue" a batch in direct mode. This is a shortcut which saves all the overhead + * otherwise incured. -- rgerhards, ~2010-06-23 + */ rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch) { DEFiRet; diff --git a/runtime/rule.c b/runtime/rule.c index 453e631d..c3c974bf 100644 --- a/runtime/rule.c +++ b/runtime/rule.c @@ -101,6 +101,7 @@ DEFFUNC_llExecFunc(processBatchDoActions) assert(pAction != NULL); +#warning execonly when prev suspended functionality missing! #if 0 // TODO: move this to the action object if((pAction->bExecWhenPrevSusp == 1) && (pDoActData->bPrevWasSuspended == 0)) { dbgprintf("not calling action because the previous one is not suspended\n"); @@ -108,20 +109,8 @@ DEFFUNC_llExecFunc(processBatchDoActions) } #endif -#if 1 - // NEW (potentially): - iRetMod = doSubmitToActionQBatch(pAction, (batch_t*) pParam); -#else - // old code -- milestone check - int i; - for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { -dbgprintf("ZZZ: inside processBatchDoActions, processing elem %d/%d\n", i, batchNumMsgs(pBatch)); - if(pBatch->pElem[i].bFilterOK) { - iRetMod = pAction->submitToActQ(pAction, (msg_t*)(pBatch->pElem[i].pUsrp)); - } - } -#endif - //end old code + iRetMod = pAction->submitToActQ(pAction, pBatch); + #if 0 // TODO: this must be done inside the action as well! if(iRetMod == RS_RET_DISCARDMSG) { ABORT_FINALIZE(RS_RET_DISCARDMSG); diff --git a/runtime/ruleset.c b/runtime/ruleset.c index 31c2e1a7..8d6a1c2f 100644 --- a/runtime/ruleset.c +++ b/runtime/ruleset.c @@ -175,7 +175,6 @@ processBatchMultiRuleset(batch_t *pBatch) CHKiRet(batchInit(&snglRuleBatch, pBatch->nElem)); snglRuleBatch.pbShutdownImmediate = pBatch->pbShutdownImmediate; -dbgprintf("ZZZ: multi-ruleset batch of %d elements must be processed\n", pBatch->nElem); while(1) { /* loop broken inside */ /* search for first unprocessed element */ for(iStart = 0 ; iStart < pBatch->nElem && pBatch->pElem[iStart].state == BATCH_STATE_DISC ; ++iStart) @@ -189,7 +188,6 @@ dbgprintf("ZZZ: multi-ruleset batch of %d elements must be processed\n", pBatch- iNew = 0; for(i = iStart ; i < pBatch->nElem ; ++i) { if(batchElemGetRuleset(pBatch, i) == currRuleset) { -dbgprintf("ZZZ: proc elem %d:'%s'\n", i, ((msg_t*)(pBatch->pElem[i].pUsrp))->szRawMsg+15); batchCopyElem(&(snglRuleBatch.pElem[iNew++]), &(pBatch->pElem[i])); /* We indicate the element also as done, so it will not be processed again */ pBatch->pElem[i].state = BATCH_STATE_DISC; @@ -199,15 +197,6 @@ dbgprintf("ZZZ: proc elem %d:'%s'\n", i, ((msg_t*)(pBatch->pElem[i].pUsrp))->szR batchSetSingleRuleset(&snglRuleBatch, 1); /* process temp batch */ processBatch(&snglRuleBatch); - -#if 0 -for(i = iStart ; i < pBatch->nElem ; ++i) { - dbgprintf("ZZZ: after partial execution item %d state %d\n", i, pBatch->pElem[i].state); -} -//dbgprintf("ZZZ: search item %d: state %d, bFilterOK %d, IsValid %d, msg:%s\n", -//iStart, pBatch->pElem[iStart].state, pBatch->pElem[iStart].bFilterOK, batchIsValidElem(pBatch, iStart), -//((msg_t*)(pBatch->pElem[iStart].pUsrp))->szRawMsg+40); -#endif } batchFree(&snglRuleBatch); diff --git a/runtime/stream.c b/runtime/stream.c index e0b97f9f..b4295762 100644 --- a/runtime/stream.c +++ b/runtime/stream.c @@ -937,7 +937,6 @@ asyncWriterThread(void *pPtr) while(1) { /* loop broken inside */ d_pthread_mutex_lock(&pThis->mut); -dbgprintf("XXX: asyncWriterThread iterating %s\n", pThis->pszFName); while(pThis->iCnt == 0) { if(pThis->bStopWriter) { pthread_cond_broadcast(&pThis->isEmpty); @@ -953,7 +952,6 @@ dbgprintf("XXX: asyncWriterThread iterating %s\n", pThis->pszFName); bTimedOut = 0; timeoutComp(&t, pThis->iFlushInterval * 2000); /* *1000 millisconds */ // TODO: check the 2000?!? if(pThis->bDoTimedWait) { -dbgprintf("asyncWriter thread going to timeout sleep\n"); if(pthread_cond_timedwait(&pThis->notEmpty, &pThis->mut, &t) != 0) { int err = errno; if(err == ETIMEDOUT) { @@ -967,16 +965,13 @@ dbgprintf("asyncWriter thread going to timeout sleep\n"); } } } else { -dbgprintf("asyncWriter thread going to eternal sleep\n"); d_pthread_cond_wait(&pThis->notEmpty, &pThis->mut); } -dbgprintf("asyncWriter woke up\n"); } bTimedOut = 0; /* we may have timed out, but there *is* work to do... */ iDeq = pThis->iDeq++ % STREAM_ASYNC_NUMBUFS; -dbgprintf("asyncWriter writes data\n"); doWriteInternal(pThis, pThis->asyncBuf[iDeq].pBuf, pThis->asyncBuf[iDeq].lenBuf); // TODO: error check????? 2009-07-06 |