summaryrefslogtreecommitdiffstats
path: root/runtime
diff options
context:
space:
mode:
Diffstat (limited to 'runtime')
-rw-r--r--runtime/debug.c2
-rw-r--r--runtime/msg.c6
-rw-r--r--runtime/msg.h1
-rw-r--r--runtime/queue.c44
-rw-r--r--runtime/ruleset.c7
-rw-r--r--runtime/wti.c26
-rw-r--r--runtime/wtp.c3
7 files changed, 66 insertions, 23 deletions
diff --git a/runtime/debug.c b/runtime/debug.c
index 7c938008..476f8bf7 100644
--- a/runtime/debug.c
+++ b/runtime/debug.c
@@ -840,7 +840,7 @@ do_dbgprint(uchar *pszObjName, char *pszMsg, size_t lenMsg)
static pthread_t ptLastThrdID = 0;
static int bWasNL = 0;
char pszThrdName[64]; /* 64 is to be on the safe side, anything over 20 is bad... */
- char pszWriteBuf[1024];
+ char pszWriteBuf[32*1024];
size_t lenWriteBuf;
struct timespec t;
diff --git a/runtime/msg.c b/runtime/msg.c
index 2c1af27e..dd541337 100644
--- a/runtime/msg.c
+++ b/runtime/msg.c
@@ -632,7 +632,6 @@ static inline rsRetVal msgBaseConstruct(msg_t **ppThis)
/* initialize members in ORDER they appear in structure (think "cache line"!) */
pM->flowCtlType = 0;
pM->bDoLock = 0;
- pM->bParseHOSTNAME = 0;
pM->iRefCount = 1;
pM->iSeverity = -1;
pM->iFacility = -1;
@@ -861,7 +860,6 @@ msg_t* MsgDup(msg_t* pOld)
pNew->iRefCount = 1;
pNew->iSeverity = pOld->iSeverity;
pNew->iFacility = pOld->iFacility;
- pNew->bParseHOSTNAME = pOld->bParseHOSTNAME;
pNew->msgFlags = pOld->msgFlags;
pNew->iProtocolVersion = pOld->iProtocolVersion;
pNew->ttGenTime = pOld->ttGenTime;
@@ -935,7 +933,7 @@ msg_t* MsgDup(msg_t* pOld)
* We do not serialize the cache properties. We re-create them when needed.
* This saves us a lot of memory. Performance is no concern, as serializing
* is a so slow operation that recration of the caches does not count. Also,
- * we do not serialize bParseHOSTNAME, as this is only a helper variable
+ * we do not serialize --currently none--, as this is only a helper variable
* during msg construction - and never again used later.
* rgerhards, 2008-01-03
*/
@@ -1959,7 +1957,6 @@ void MsgSetHOSTNAME(msg_t *pThis, uchar* pszHOSTNAME, int lenHOSTNAME)
*/
void MsgSetMSGoffs(msg_t *pMsg, short offs)
{
-BEGINfunc
ISOBJ_TYPE_assert(pMsg, msg);
pMsg->offMSG = offs;
if(offs > pMsg->iLenRawMsg) {
@@ -1968,7 +1965,6 @@ BEGINfunc
} else {
pMsg->iLenMSG = pMsg->iLenRawMsg - offs;
}
-ENDfunc
}
diff --git a/runtime/msg.h b/runtime/msg.h
index b006cbec..f7d74597 100644
--- a/runtime/msg.h
+++ b/runtime/msg.h
@@ -60,7 +60,6 @@ struct msg {
once data has entered the queue, this property is no longer needed. */
pthread_mutex_t mut;
bool bDoLock; /* use the mutex? */
- bool bParseHOSTNAME; /* should the hostname be parsed from the message? */
short iRefCount; /* reference counter (0 = unused) */
/* background: the hostname is not present on "regular" messages
* received via UNIX domain sockets from the same machine. However,
diff --git a/runtime/queue.c b/runtime/queue.c
index cb14b58d..101052a1 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -1188,7 +1188,7 @@ tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis)
if(iRetLocal == RS_RET_TIMED_OUT) {
DBGOPRINT((obj_t*) pThis, "shutdown timed out on main queue DA worker pool (this is OK)\n");
} else {
- DBGOPRINT((obj_t*) pThis, "main queue DA worker pool shut down.\n");
+ DBGOPRINT((obj_t*) pThis, "main queue DA worker pool shut down on first try.\n");
}
}
@@ -1247,13 +1247,31 @@ tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis)
DBGOPRINT((obj_t*) pThis, "unexpected iRet state %d after trying immediate shutdown of the DA "
"queue in disk save mode. Continuing, but results are unpredictable\n", iRetLocal);
}
+ /* and now we need to check the DA worker itself (the one that shuffles data to the disk). This
+ * is necessary because we may be in a situation where the DA queue regular worker and the
+ * main queue worker stopped rather quickly. In this case, there is almost no time (and
+ * probably no thread switch!) between the point where we instructed the main queue DA
+ * worker to shutdown and this code location. In consequence, it may not even have
+ * noticed that it should should down, less acutally done this. So we provide it with a
+ * fixed 100ms timeout to try complete its work, what usually should be sufficient.
+ * rgerhards, 2009-10-06
+ */
+ timeoutComp(&tTimeout, 100);
+ DBGOPRINT((obj_t*) pThis, "last try for regular shutdown of main queue DA worker pool\n");
+ iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout);
+ if(iRetLocal == RS_RET_TIMED_OUT) {
+ DBGOPRINT((obj_t*) pThis, "shutdown timed out on main queue DA worker pool "
+ "(this is not good, but probably OK)\n");
+ } else {
+ DBGOPRINT((obj_t*) pThis, "main queue DA worker pool shut down.\n");
+ }
}
RETiRet;
}
-/* This function cancels all remenaing regular workers for both the main and the DA
+/* This function cancels all remaining regular workers for both the main and the DA
* queue. The main queue's DA worker pool continues to run (if it exists and is active).
* rgerhards, 2009-05-29
*/
@@ -1651,7 +1669,6 @@ DequeueConsumable(qqueue_t *pThis, wti_t *pWti)
// TODO: MULTI: check physical queue size?
pthread_cond_signal(&pThis->notFull);
- d_pthread_mutex_unlock(pThis->mut);
/* WE ARE NO LONGER PROTECTED BY THE MUTEX */
if(iRet != RS_RET_OK && iRet != RS_RET_DISCARDMSG) {
@@ -1758,9 +1775,7 @@ RateLimiter(qqueue_t *pThis)
}
-/* This dequeues the next batch and checks if the queue is empty. If it is
- * empty, return RS_RET_IDLE. That will trigger termination of the function
- * and tell the upper layer caller to initiate idle processing.
+/* This dequeues the next batch.
* rgerhards, 2009-05-20
*/
static inline rsRetVal
@@ -1771,11 +1786,13 @@ DequeueForConsumer(qqueue_t *pThis, wti_t *pWti)
ISOBJ_TYPE_assert(pThis, qqueue);
ISOBJ_TYPE_assert(pWti, wti);
+dbgprintf("YYY: deqeueu for consumer");
CHKiRet(DequeueConsumable(pThis, pWti));
if(pWti->batch.nElem == 0)
ABORT_FINALIZE(RS_RET_IDLE);
+
finalize_it:
RETiRet;
}
@@ -1814,6 +1831,10 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti)
ISOBJ_TYPE_assert(pWti, wti);
CHKiRet(DequeueForConsumer(pThis, pWti));
+
+ /* we now have a non-idle batch of work, so we can release the queue mutex and process it */
+ d_pthread_mutex_unlock(pThis->mut);
+
CHKiRet(pThis->pConsumer(pThis->pUsr, &pWti->batch));
/* we now need to check if we should deliberately delay processing a bit
@@ -1826,6 +1847,9 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti)
srSleep(pThis->iDeqSlowdown / 1000000, pThis->iDeqSlowdown % 1000000);
}
+ /* now we are done, but need to re-aquire the mutex */
+ d_pthread_mutex_lock(pThis->mut);
+
finalize_it:
dbgprintf("XXX: regular consumer finished, iret=%d, szlog %d sz phys %d\n", iRet, getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
RETiRet;
@@ -1851,6 +1875,10 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti)
ISOBJ_TYPE_assert(pWti, wti);
CHKiRet(DequeueForConsumer(pThis, pWti));
+
+ /* we now have a non-idle batch of work, so we can release the queue mutex and process it */
+ d_pthread_mutex_unlock(pThis->mut);
+
/* iterate over returned results and enqueue them in DA queue */
for(i = 0 ; i < pWti->batch.nElem ; i++) {
/* TODO: we must add a generic "addRef" mechanism, because the disk queue enqueue destructs
@@ -1860,6 +1888,9 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti)
CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, (obj_t*)MsgAddRef((msg_t*)(pWti->batch.pElem[i].pUsrp))));
}
+ /* now we are done, but need to re-aquire the mutex */
+ d_pthread_mutex_lock(pThis->mut);
+
finalize_it:
DBGOPRINT((obj_t*) pThis, "DAConsumer returns with iRet %d\n", iRet);
RETiRet;
@@ -2513,6 +2544,7 @@ finalize_it:
if(pThis->qType != QUEUETYPE_DIRECT) {
/* make sure at least one worker is running. */
qqueueAdviseMaxWorkers(pThis);
+dbgprintf("YYY: call advise with mutex %p locked \n", pThis->mut);
/* and release the mutex */
d_pthread_mutex_unlock(pThis->mut);
pthread_setcancelstate(iCancelStateSave, NULL);
diff --git a/runtime/ruleset.c b/runtime/ruleset.c
index 5ac9a8fd..0f4bc46d 100644
--- a/runtime/ruleset.c
+++ b/runtime/ruleset.c
@@ -46,8 +46,6 @@
#include "errmsg.h"
#include "unicode-helper.h"
-static rsRetVal debugPrintAll(void); // TODO: remove!
-
/* static data */
DEFobjStaticHelpers
DEFobjCurrIf(errmsg)
@@ -161,13 +159,10 @@ processMsg(msg_t *pMsg)
CHKiRet(llExecFunc(&pThis->llRules, processMsgDoRules, pMsg));
finalize_it:
-
- //if(iRet == RS_RET_DISCARDMSG)
- //iRet = RS_RET_OK;
-
RETiRet;
}
+
/* Add a new rule to the end of the current rule set. We do a number
* of checks and ignore the rule if it does not pass them.
*/
diff --git a/runtime/wti.c b/runtime/wti.c
index 9d0560dd..53b695b0 100644
--- a/runtime/wti.c
+++ b/runtime/wti.c
@@ -114,7 +114,12 @@ wtiSetState(wti_t *pThis, bool bNewVal)
/* Cancel the thread. If the thread is not running. But it is save and legal to
- * call wtiCancelThrd() in such situations.
+ * call wtiCancelThrd() in such situations. This function only returns when the
+ * thread has terminated. Else we may get race conditions all over the code...
+ * Note that when waiting for the thread to terminate, we do a busy wait, checking
+ * progress every 10ms. It is very unlikely that we will ever cancel a thread
+ * and, if so, it will only happen at the end of the rsyslog run. So doing this
+ * kind of not optimal wait is considered preferable over using condition variables.
* rgerhards, 2008-02-26
*/
rsRetVal
@@ -127,6 +132,11 @@ wtiCancelThrd(wti_t *pThis)
if(wtiGetState(pThis)) {
dbgoprint((obj_t*) pThis, "canceling worker thread\n");
pthread_cancel(pThis->thrdID);
+ /* now wait until the thread terminates... */
+ while(wtiGetState(pThis)) {
+//fprintf(stderr, "sleep loop for getState\n");
+ srSleep(0, 10000);
+ }
}
RETiRet;
@@ -214,9 +224,9 @@ doIdleProcessing(wti_t *pThis, wtp_t *pWtp, int *pbInactivityTOOccured)
pWtp->pfOnIdle(pWtp->pUsr, MUTEX_ALREADY_LOCKED);
- d_pthread_mutex_lock(pWtp->pmutUsr);
if(pThis->bAlwaysRunning) {
/* never shut down any started worker */
+dbgprintf("YYY/ZZZ: wti Idle wait cond busy, mutex %p\n", pWtp->pmutUsr);
d_pthread_cond_wait(pWtp->pcondBusy, pWtp->pmutUsr);
} else {
timeoutComp(&t, pWtp->toWrkShutdown);/* get absolute timeout */
@@ -225,7 +235,6 @@ doIdleProcessing(wti_t *pThis, wtp_t *pWtp, int *pbInactivityTOOccured)
*pbInactivityTOOccured = 1; /* indicate we had a timeout */
}
}
- d_pthread_mutex_unlock(pWtp->pmutUsr);
ENDfunc
}
@@ -258,8 +267,10 @@ wtiWorker(wti_t *pThis)
pWtp->pfRateLimiter(pWtp->pUsr);
}
+dbgprintf("YYY/ZZZ: pre lock mutex\n");
d_pthread_mutex_lock(pWtp->pmutUsr);
+dbgprintf("YYY/ZZZ: wti locks mutex %p\n", pWtp->pmutUsr);
/* first check if we are in shutdown process (but evaluate a bit later) */
terminateRet = wtpChkStopWrkr(pWtp, MUTEX_ALREADY_LOCKED);
if(terminateRet == RS_RET_TERMINATE_NOW) {
@@ -272,17 +283,24 @@ wtiWorker(wti_t *pThis)
}
/* try to execute and process whatever we have */
- /* This function must and does RELEASE the MUTEX! */
+ /* Note that this function releases and re-aquires the mutex. The returned
+ * information on idle state must be processed before releasing the mutex again.
+ */
localRet = pWtp->pfDoWork(pWtp->pUsr, pThis);
+dbgprintf("YYY/ZZZ: wti loop locked mutex %p again\n", pWtp->pmutUsr);
if(localRet == RS_RET_IDLE) {
if(terminateRet == RS_RET_TERMINATE_WHEN_IDLE || bInactivityTOOccured) {
+ d_pthread_mutex_unlock(pWtp->pmutUsr);
break; /* end of loop */
}
doIdleProcessing(pThis, pWtp, &bInactivityTOOccured);
+ d_pthread_mutex_unlock(pWtp->pmutUsr);
continue; /* request next iteration */
}
+ d_pthread_mutex_unlock(pWtp->pmutUsr);
+
bInactivityTOOccured = 0; /* reset for next run */
}
diff --git a/runtime/wtp.c b/runtime/wtp.c
index 4524e0c3..40d031dc 100644
--- a/runtime/wtp.c
+++ b/runtime/wtp.c
@@ -413,6 +413,7 @@ wtpAdviseMaxWorkers(wtp_t *pThis, int nMaxWrkr)
ISOBJ_TYPE_assert(pThis, wtp);
+int nMaxWrkrTmp = nMaxWrkr;
if(nMaxWrkr == 0)
FINALIZE;
@@ -420,6 +421,7 @@ wtpAdviseMaxWorkers(wtp_t *pThis, int nMaxWrkr)
nMaxWrkr = pThis->iNumWorkerThreads;
nMissing = nMaxWrkr - ATOMIC_FETCH_32BIT(pThis->iCurNumWrkThrd);
+dbgprintf("wtpAdviseMaxWorkers, nmax: %d, curr %d, missing %d\n", nMaxWrkrTmp, pThis->iNumWorkerThreads, nMissing);
if(nMissing > 0) {
DBGPRINTF("%s: high activity - starting %d additional worker thread(s).\n", wtpGetDbgHdr(pThis), nMissing);
@@ -428,6 +430,7 @@ wtpAdviseMaxWorkers(wtp_t *pThis, int nMaxWrkr)
CHKiRet(wtpStartWrkr(pThis));
}
} else {
+dbgprintf("YYY: adivse signal cond busy");
pthread_cond_signal(pThis->pcondBusy);
}