summaryrefslogtreecommitdiffstats
path: root/runtime
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2010-06-23 12:48:27 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2010-06-23 12:48:27 +0200
commitf48128f34a17aae7e7b9405fe32b396db45443ca (patch)
treea122a65d4a2495570b5d8662fd75a1104cb0c521 /runtime
parentd6a3b08f56cc608bc832251cd49e86e71d9574dc (diff)
downloadrsyslog-f48128f34a17aae7e7b9405fe32b396db45443ca.tar.gz
rsyslog-f48128f34a17aae7e7b9405fe32b396db45443ca.tar.xz
rsyslog-f48128f34a17aae7e7b9405fe32b396db45443ca.zip
fixed a couple of regressions
by implementing some code that was missing so far ;) as well as finding some real bugs. I also did some general cleanup, removing debug strings and such. This code should be fairly OK to use, except when "exec only when previous action was suspended" is used -- this is NOT yet re-implemented in the tuned engine.
Diffstat (limited to 'runtime')
-rw-r--r--runtime/nspoll.c3
-rw-r--r--runtime/queue.c7
-rw-r--r--runtime/rule.c17
-rw-r--r--runtime/ruleset.c11
-rw-r--r--runtime/stream.c5
5 files changed, 9 insertions, 34 deletions
diff --git a/runtime/nspoll.c b/runtime/nspoll.c
index f287cd4e..64927280 100644
--- a/runtime/nspoll.c
+++ b/runtime/nspoll.c
@@ -79,7 +79,6 @@ loadDrvr(nspoll_t *pThis)
* about this hack, but for the time being it is efficient and clean
* enough. -- rgerhards, 2008-04-18
*/
-RUNLOG_VAR("%s", szDrvrName+2);
CHKiRet(obj.UseObj(__FILE__, szDrvrName+2, DONT_LOAD_LIB, (void*) &pThis->Drvr));
finalize_it:
@@ -120,11 +119,9 @@ ConstructFinalize(nspoll_t *pThis)
{
DEFiRet;
ISOBJ_TYPE_assert(pThis, nspoll);
-RUNLOG_STR("trying to load epoll driver\n");
CHKiRet(loadDrvr(pThis));
CHKiRet(pThis->Drvr.Construct(&pThis->pDrvrData));
finalize_it:
-dbgprintf("XXX: done trying to load epoll driver, state %d\n", iRet);
RETiRet;
}
diff --git a/runtime/queue.c b/runtime/queue.c
index 5e9c67ca..60d17086 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -852,8 +852,11 @@ static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr)
* We use our knowledge about the batch_t structure below, but without that, we
* pay a too-large performance toll... -- rgerhards, 2009-04-22
*/
+ memset(&batchObj, 0, sizeof(batch_obj_t));
+ memset(&singleBatch, 0, sizeof(batch_t));
batchObj.state = BATCH_STATE_RDY;
batchObj.pUsrp = (obj_t*) pUsr;
+ batchObj.bFilterOK = 1;
singleBatch.nElem = 1; /* there always is only one in direct mode */
singleBatch.pElem = &batchObj;
iRet = pThis->pConsumer(pThis->pUsr, &singleBatch, &pThis->bShutdownImmediate);
@@ -862,7 +865,9 @@ static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr)
RETiRet;
}
-/*** EXPERIMENTAL ***/
+/* "enqueue" a batch in direct mode. This is a shortcut which saves all the overhead
+ * otherwise incured. -- rgerhards, ~2010-06-23
+ */
rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch)
{
DEFiRet;
diff --git a/runtime/rule.c b/runtime/rule.c
index 453e631d..c3c974bf 100644
--- a/runtime/rule.c
+++ b/runtime/rule.c
@@ -101,6 +101,7 @@ DEFFUNC_llExecFunc(processBatchDoActions)
assert(pAction != NULL);
+#warning execonly when prev suspended functionality missing!
#if 0 // TODO: move this to the action object
if((pAction->bExecWhenPrevSusp == 1) && (pDoActData->bPrevWasSuspended == 0)) {
dbgprintf("not calling action because the previous one is not suspended\n");
@@ -108,20 +109,8 @@ DEFFUNC_llExecFunc(processBatchDoActions)
}
#endif
-#if 1
- // NEW (potentially):
- iRetMod = doSubmitToActionQBatch(pAction, (batch_t*) pParam);
-#else
- // old code -- milestone check
- int i;
- for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
-dbgprintf("ZZZ: inside processBatchDoActions, processing elem %d/%d\n", i, batchNumMsgs(pBatch));
- if(pBatch->pElem[i].bFilterOK) {
- iRetMod = pAction->submitToActQ(pAction, (msg_t*)(pBatch->pElem[i].pUsrp));
- }
- }
-#endif
- //end old code
+ iRetMod = pAction->submitToActQ(pAction, pBatch);
+
#if 0 // TODO: this must be done inside the action as well!
if(iRetMod == RS_RET_DISCARDMSG) {
ABORT_FINALIZE(RS_RET_DISCARDMSG);
diff --git a/runtime/ruleset.c b/runtime/ruleset.c
index 31c2e1a7..8d6a1c2f 100644
--- a/runtime/ruleset.c
+++ b/runtime/ruleset.c
@@ -175,7 +175,6 @@ processBatchMultiRuleset(batch_t *pBatch)
CHKiRet(batchInit(&snglRuleBatch, pBatch->nElem));
snglRuleBatch.pbShutdownImmediate = pBatch->pbShutdownImmediate;
-dbgprintf("ZZZ: multi-ruleset batch of %d elements must be processed\n", pBatch->nElem);
while(1) { /* loop broken inside */
/* search for first unprocessed element */
for(iStart = 0 ; iStart < pBatch->nElem && pBatch->pElem[iStart].state == BATCH_STATE_DISC ; ++iStart)
@@ -189,7 +188,6 @@ dbgprintf("ZZZ: multi-ruleset batch of %d elements must be processed\n", pBatch-
iNew = 0;
for(i = iStart ; i < pBatch->nElem ; ++i) {
if(batchElemGetRuleset(pBatch, i) == currRuleset) {
-dbgprintf("ZZZ: proc elem %d:'%s'\n", i, ((msg_t*)(pBatch->pElem[i].pUsrp))->szRawMsg+15);
batchCopyElem(&(snglRuleBatch.pElem[iNew++]), &(pBatch->pElem[i]));
/* We indicate the element also as done, so it will not be processed again */
pBatch->pElem[i].state = BATCH_STATE_DISC;
@@ -199,15 +197,6 @@ dbgprintf("ZZZ: proc elem %d:'%s'\n", i, ((msg_t*)(pBatch->pElem[i].pUsrp))->szR
batchSetSingleRuleset(&snglRuleBatch, 1);
/* process temp batch */
processBatch(&snglRuleBatch);
-
-#if 0
-for(i = iStart ; i < pBatch->nElem ; ++i) {
- dbgprintf("ZZZ: after partial execution item %d state %d\n", i, pBatch->pElem[i].state);
-}
-//dbgprintf("ZZZ: search item %d: state %d, bFilterOK %d, IsValid %d, msg:%s\n",
-//iStart, pBatch->pElem[iStart].state, pBatch->pElem[iStart].bFilterOK, batchIsValidElem(pBatch, iStart),
-//((msg_t*)(pBatch->pElem[iStart].pUsrp))->szRawMsg+40);
-#endif
}
batchFree(&snglRuleBatch);
diff --git a/runtime/stream.c b/runtime/stream.c
index e0b97f9f..b4295762 100644
--- a/runtime/stream.c
+++ b/runtime/stream.c
@@ -937,7 +937,6 @@ asyncWriterThread(void *pPtr)
while(1) { /* loop broken inside */
d_pthread_mutex_lock(&pThis->mut);
-dbgprintf("XXX: asyncWriterThread iterating %s\n", pThis->pszFName);
while(pThis->iCnt == 0) {
if(pThis->bStopWriter) {
pthread_cond_broadcast(&pThis->isEmpty);
@@ -953,7 +952,6 @@ dbgprintf("XXX: asyncWriterThread iterating %s\n", pThis->pszFName);
bTimedOut = 0;
timeoutComp(&t, pThis->iFlushInterval * 2000); /* *1000 millisconds */ // TODO: check the 2000?!?
if(pThis->bDoTimedWait) {
-dbgprintf("asyncWriter thread going to timeout sleep\n");
if(pthread_cond_timedwait(&pThis->notEmpty, &pThis->mut, &t) != 0) {
int err = errno;
if(err == ETIMEDOUT) {
@@ -967,16 +965,13 @@ dbgprintf("asyncWriter thread going to timeout sleep\n");
}
}
} else {
-dbgprintf("asyncWriter thread going to eternal sleep\n");
d_pthread_cond_wait(&pThis->notEmpty, &pThis->mut);
}
-dbgprintf("asyncWriter woke up\n");
}
bTimedOut = 0; /* we may have timed out, but there *is* work to do... */
iDeq = pThis->iDeq++ % STREAM_ASYNC_NUMBUFS;
-dbgprintf("asyncWriter writes data\n");
doWriteInternal(pThis, pThis->asyncBuf[iDeq].pBuf, pThis->asyncBuf[iDeq].lenBuf);
// TODO: error check????? 2009-07-06