diff options
Diffstat (limited to 'runtime')
-rw-r--r-- | runtime/debug.c | 2 | ||||
-rw-r--r-- | runtime/msg.c | 6 | ||||
-rw-r--r-- | runtime/msg.h | 1 | ||||
-rw-r--r-- | runtime/queue.c | 44 | ||||
-rw-r--r-- | runtime/ruleset.c | 7 | ||||
-rw-r--r-- | runtime/wti.c | 26 | ||||
-rw-r--r-- | runtime/wtp.c | 3 |
7 files changed, 66 insertions, 23 deletions
diff --git a/runtime/debug.c b/runtime/debug.c index 7c938008..476f8bf7 100644 --- a/runtime/debug.c +++ b/runtime/debug.c @@ -840,7 +840,7 @@ do_dbgprint(uchar *pszObjName, char *pszMsg, size_t lenMsg) static pthread_t ptLastThrdID = 0; static int bWasNL = 0; char pszThrdName[64]; /* 64 is to be on the safe side, anything over 20 is bad... */ - char pszWriteBuf[1024]; + char pszWriteBuf[32*1024]; size_t lenWriteBuf; struct timespec t; diff --git a/runtime/msg.c b/runtime/msg.c index 2c1af27e..dd541337 100644 --- a/runtime/msg.c +++ b/runtime/msg.c @@ -632,7 +632,6 @@ static inline rsRetVal msgBaseConstruct(msg_t **ppThis) /* initialize members in ORDER they appear in structure (think "cache line"!) */ pM->flowCtlType = 0; pM->bDoLock = 0; - pM->bParseHOSTNAME = 0; pM->iRefCount = 1; pM->iSeverity = -1; pM->iFacility = -1; @@ -861,7 +860,6 @@ msg_t* MsgDup(msg_t* pOld) pNew->iRefCount = 1; pNew->iSeverity = pOld->iSeverity; pNew->iFacility = pOld->iFacility; - pNew->bParseHOSTNAME = pOld->bParseHOSTNAME; pNew->msgFlags = pOld->msgFlags; pNew->iProtocolVersion = pOld->iProtocolVersion; pNew->ttGenTime = pOld->ttGenTime; @@ -935,7 +933,7 @@ msg_t* MsgDup(msg_t* pOld) * We do not serialize the cache properties. We re-create them when needed. * This saves us a lot of memory. Performance is no concern, as serializing * is a so slow operation that recration of the caches does not count. Also, - * we do not serialize bParseHOSTNAME, as this is only a helper variable + * we do not serialize --currently none--, as this is only a helper variable * during msg construction - and never again used later. * rgerhards, 2008-01-03 */ @@ -1959,7 +1957,6 @@ void MsgSetHOSTNAME(msg_t *pThis, uchar* pszHOSTNAME, int lenHOSTNAME) */ void MsgSetMSGoffs(msg_t *pMsg, short offs) { -BEGINfunc ISOBJ_TYPE_assert(pMsg, msg); pMsg->offMSG = offs; if(offs > pMsg->iLenRawMsg) { @@ -1968,7 +1965,6 @@ BEGINfunc } else { pMsg->iLenMSG = pMsg->iLenRawMsg - offs; } -ENDfunc } diff --git a/runtime/msg.h b/runtime/msg.h index b006cbec..f7d74597 100644 --- a/runtime/msg.h +++ b/runtime/msg.h @@ -60,7 +60,6 @@ struct msg { once data has entered the queue, this property is no longer needed. */ pthread_mutex_t mut; bool bDoLock; /* use the mutex? */ - bool bParseHOSTNAME; /* should the hostname be parsed from the message? */ short iRefCount; /* reference counter (0 = unused) */ /* background: the hostname is not present on "regular" messages * received via UNIX domain sockets from the same machine. However, diff --git a/runtime/queue.c b/runtime/queue.c index cb14b58d..101052a1 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -1188,7 +1188,7 @@ tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis) if(iRetLocal == RS_RET_TIMED_OUT) { DBGOPRINT((obj_t*) pThis, "shutdown timed out on main queue DA worker pool (this is OK)\n"); } else { - DBGOPRINT((obj_t*) pThis, "main queue DA worker pool shut down.\n"); + DBGOPRINT((obj_t*) pThis, "main queue DA worker pool shut down on first try.\n"); } } @@ -1247,13 +1247,31 @@ tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis) DBGOPRINT((obj_t*) pThis, "unexpected iRet state %d after trying immediate shutdown of the DA " "queue in disk save mode. Continuing, but results are unpredictable\n", iRetLocal); } + /* and now we need to check the DA worker itself (the one that shuffles data to the disk). This + * is necessary because we may be in a situation where the DA queue regular worker and the + * main queue worker stopped rather quickly. In this case, there is almost no time (and + * probably no thread switch!) between the point where we instructed the main queue DA + * worker to shutdown and this code location. In consequence, it may not even have + * noticed that it should should down, less acutally done this. So we provide it with a + * fixed 100ms timeout to try complete its work, what usually should be sufficient. + * rgerhards, 2009-10-06 + */ + timeoutComp(&tTimeout, 100); + DBGOPRINT((obj_t*) pThis, "last try for regular shutdown of main queue DA worker pool\n"); + iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout); + if(iRetLocal == RS_RET_TIMED_OUT) { + DBGOPRINT((obj_t*) pThis, "shutdown timed out on main queue DA worker pool " + "(this is not good, but probably OK)\n"); + } else { + DBGOPRINT((obj_t*) pThis, "main queue DA worker pool shut down.\n"); + } } RETiRet; } -/* This function cancels all remenaing regular workers for both the main and the DA +/* This function cancels all remaining regular workers for both the main and the DA * queue. The main queue's DA worker pool continues to run (if it exists and is active). * rgerhards, 2009-05-29 */ @@ -1651,7 +1669,6 @@ DequeueConsumable(qqueue_t *pThis, wti_t *pWti) // TODO: MULTI: check physical queue size? pthread_cond_signal(&pThis->notFull); - d_pthread_mutex_unlock(pThis->mut); /* WE ARE NO LONGER PROTECTED BY THE MUTEX */ if(iRet != RS_RET_OK && iRet != RS_RET_DISCARDMSG) { @@ -1758,9 +1775,7 @@ RateLimiter(qqueue_t *pThis) } -/* This dequeues the next batch and checks if the queue is empty. If it is - * empty, return RS_RET_IDLE. That will trigger termination of the function - * and tell the upper layer caller to initiate idle processing. +/* This dequeues the next batch. * rgerhards, 2009-05-20 */ static inline rsRetVal @@ -1771,11 +1786,13 @@ DequeueForConsumer(qqueue_t *pThis, wti_t *pWti) ISOBJ_TYPE_assert(pThis, qqueue); ISOBJ_TYPE_assert(pWti, wti); +dbgprintf("YYY: deqeueu for consumer"); CHKiRet(DequeueConsumable(pThis, pWti)); if(pWti->batch.nElem == 0) ABORT_FINALIZE(RS_RET_IDLE); + finalize_it: RETiRet; } @@ -1814,6 +1831,10 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti) ISOBJ_TYPE_assert(pWti, wti); CHKiRet(DequeueForConsumer(pThis, pWti)); + + /* we now have a non-idle batch of work, so we can release the queue mutex and process it */ + d_pthread_mutex_unlock(pThis->mut); + CHKiRet(pThis->pConsumer(pThis->pUsr, &pWti->batch)); /* we now need to check if we should deliberately delay processing a bit @@ -1826,6 +1847,9 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti) srSleep(pThis->iDeqSlowdown / 1000000, pThis->iDeqSlowdown % 1000000); } + /* now we are done, but need to re-aquire the mutex */ + d_pthread_mutex_lock(pThis->mut); + finalize_it: dbgprintf("XXX: regular consumer finished, iret=%d, szlog %d sz phys %d\n", iRet, getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); RETiRet; @@ -1851,6 +1875,10 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti) ISOBJ_TYPE_assert(pWti, wti); CHKiRet(DequeueForConsumer(pThis, pWti)); + + /* we now have a non-idle batch of work, so we can release the queue mutex and process it */ + d_pthread_mutex_unlock(pThis->mut); + /* iterate over returned results and enqueue them in DA queue */ for(i = 0 ; i < pWti->batch.nElem ; i++) { /* TODO: we must add a generic "addRef" mechanism, because the disk queue enqueue destructs @@ -1860,6 +1888,9 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti) CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, (obj_t*)MsgAddRef((msg_t*)(pWti->batch.pElem[i].pUsrp)))); } + /* now we are done, but need to re-aquire the mutex */ + d_pthread_mutex_lock(pThis->mut); + finalize_it: DBGOPRINT((obj_t*) pThis, "DAConsumer returns with iRet %d\n", iRet); RETiRet; @@ -2513,6 +2544,7 @@ finalize_it: if(pThis->qType != QUEUETYPE_DIRECT) { /* make sure at least one worker is running. */ qqueueAdviseMaxWorkers(pThis); +dbgprintf("YYY: call advise with mutex %p locked \n", pThis->mut); /* and release the mutex */ d_pthread_mutex_unlock(pThis->mut); pthread_setcancelstate(iCancelStateSave, NULL); diff --git a/runtime/ruleset.c b/runtime/ruleset.c index 5ac9a8fd..0f4bc46d 100644 --- a/runtime/ruleset.c +++ b/runtime/ruleset.c @@ -46,8 +46,6 @@ #include "errmsg.h" #include "unicode-helper.h" -static rsRetVal debugPrintAll(void); // TODO: remove! - /* static data */ DEFobjStaticHelpers DEFobjCurrIf(errmsg) @@ -161,13 +159,10 @@ processMsg(msg_t *pMsg) CHKiRet(llExecFunc(&pThis->llRules, processMsgDoRules, pMsg)); finalize_it: - - //if(iRet == RS_RET_DISCARDMSG) - //iRet = RS_RET_OK; - RETiRet; } + /* Add a new rule to the end of the current rule set. We do a number * of checks and ignore the rule if it does not pass them. */ diff --git a/runtime/wti.c b/runtime/wti.c index 9d0560dd..53b695b0 100644 --- a/runtime/wti.c +++ b/runtime/wti.c @@ -114,7 +114,12 @@ wtiSetState(wti_t *pThis, bool bNewVal) /* Cancel the thread. If the thread is not running. But it is save and legal to - * call wtiCancelThrd() in such situations. + * call wtiCancelThrd() in such situations. This function only returns when the + * thread has terminated. Else we may get race conditions all over the code... + * Note that when waiting for the thread to terminate, we do a busy wait, checking + * progress every 10ms. It is very unlikely that we will ever cancel a thread + * and, if so, it will only happen at the end of the rsyslog run. So doing this + * kind of not optimal wait is considered preferable over using condition variables. * rgerhards, 2008-02-26 */ rsRetVal @@ -127,6 +132,11 @@ wtiCancelThrd(wti_t *pThis) if(wtiGetState(pThis)) { dbgoprint((obj_t*) pThis, "canceling worker thread\n"); pthread_cancel(pThis->thrdID); + /* now wait until the thread terminates... */ + while(wtiGetState(pThis)) { +//fprintf(stderr, "sleep loop for getState\n"); + srSleep(0, 10000); + } } RETiRet; @@ -214,9 +224,9 @@ doIdleProcessing(wti_t *pThis, wtp_t *pWtp, int *pbInactivityTOOccured) pWtp->pfOnIdle(pWtp->pUsr, MUTEX_ALREADY_LOCKED); - d_pthread_mutex_lock(pWtp->pmutUsr); if(pThis->bAlwaysRunning) { /* never shut down any started worker */ +dbgprintf("YYY/ZZZ: wti Idle wait cond busy, mutex %p\n", pWtp->pmutUsr); d_pthread_cond_wait(pWtp->pcondBusy, pWtp->pmutUsr); } else { timeoutComp(&t, pWtp->toWrkShutdown);/* get absolute timeout */ @@ -225,7 +235,6 @@ doIdleProcessing(wti_t *pThis, wtp_t *pWtp, int *pbInactivityTOOccured) *pbInactivityTOOccured = 1; /* indicate we had a timeout */ } } - d_pthread_mutex_unlock(pWtp->pmutUsr); ENDfunc } @@ -258,8 +267,10 @@ wtiWorker(wti_t *pThis) pWtp->pfRateLimiter(pWtp->pUsr); } +dbgprintf("YYY/ZZZ: pre lock mutex\n"); d_pthread_mutex_lock(pWtp->pmutUsr); +dbgprintf("YYY/ZZZ: wti locks mutex %p\n", pWtp->pmutUsr); /* first check if we are in shutdown process (but evaluate a bit later) */ terminateRet = wtpChkStopWrkr(pWtp, MUTEX_ALREADY_LOCKED); if(terminateRet == RS_RET_TERMINATE_NOW) { @@ -272,17 +283,24 @@ wtiWorker(wti_t *pThis) } /* try to execute and process whatever we have */ - /* This function must and does RELEASE the MUTEX! */ + /* Note that this function releases and re-aquires the mutex. The returned + * information on idle state must be processed before releasing the mutex again. + */ localRet = pWtp->pfDoWork(pWtp->pUsr, pThis); +dbgprintf("YYY/ZZZ: wti loop locked mutex %p again\n", pWtp->pmutUsr); if(localRet == RS_RET_IDLE) { if(terminateRet == RS_RET_TERMINATE_WHEN_IDLE || bInactivityTOOccured) { + d_pthread_mutex_unlock(pWtp->pmutUsr); break; /* end of loop */ } doIdleProcessing(pThis, pWtp, &bInactivityTOOccured); + d_pthread_mutex_unlock(pWtp->pmutUsr); continue; /* request next iteration */ } + d_pthread_mutex_unlock(pWtp->pmutUsr); + bInactivityTOOccured = 0; /* reset for next run */ } diff --git a/runtime/wtp.c b/runtime/wtp.c index 4524e0c3..40d031dc 100644 --- a/runtime/wtp.c +++ b/runtime/wtp.c @@ -413,6 +413,7 @@ wtpAdviseMaxWorkers(wtp_t *pThis, int nMaxWrkr) ISOBJ_TYPE_assert(pThis, wtp); +int nMaxWrkrTmp = nMaxWrkr; if(nMaxWrkr == 0) FINALIZE; @@ -420,6 +421,7 @@ wtpAdviseMaxWorkers(wtp_t *pThis, int nMaxWrkr) nMaxWrkr = pThis->iNumWorkerThreads; nMissing = nMaxWrkr - ATOMIC_FETCH_32BIT(pThis->iCurNumWrkThrd); +dbgprintf("wtpAdviseMaxWorkers, nmax: %d, curr %d, missing %d\n", nMaxWrkrTmp, pThis->iNumWorkerThreads, nMissing); if(nMissing > 0) { DBGPRINTF("%s: high activity - starting %d additional worker thread(s).\n", wtpGetDbgHdr(pThis), nMissing); @@ -428,6 +430,7 @@ wtpAdviseMaxWorkers(wtp_t *pThis, int nMaxWrkr) CHKiRet(wtpStartWrkr(pThis)); } } else { +dbgprintf("YYY: adivse signal cond busy"); pthread_cond_signal(pThis->pcondBusy); } |