diff options
-rw-r--r-- | ChangeLog | 4 | ||||
-rw-r--r-- | plugins/imudp/imudp.c | 1 | ||||
-rw-r--r-- | runtime/queue.c | 22 | ||||
-rw-r--r-- | runtime/wti.c | 15 | ||||
-rw-r--r-- | runtime/wtp.c | 3 | ||||
-rw-r--r-- | tcps_sess.c | 1 | ||||
-rwxr-xr-x | tests/arrayqueue.sh | 3 | ||||
-rwxr-xr-x | tests/da-mainmsg-q.sh | 2 | ||||
-rwxr-xr-x | tests/daqueue-persist-drvr.sh | 1 | ||||
-rwxr-xr-x | tests/diag.sh | 1 | ||||
-rwxr-xr-x | tests/linkedlistqueue.sh | 3 | ||||
-rw-r--r-- | tests/nettester.c | 28 |
12 files changed, 76 insertions, 8 deletions
@@ -3,6 +3,10 @@ Version 5.1.6 [v5-beta] (rgerhards), 2009-09-?? - feature imports from v4.5.6 - bugfix: potential race condition when queue worker threads were terminated +- bugfix: solved potential (temporary) stall of messages when the queue was + almost empty and few new data added (caused testbench to sometimes hang!) +- fixed some race condition in testbench +- added more elaborate diagnostics to parts of the testbench - bugfixes imported from 4.5.4: * bugfix: potential segfault in stream writer on destruction * bugfix: potential race in object loader (obj.c) during use/release diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c index b9db1875..a393cf96 100644 --- a/plugins/imudp/imudp.c +++ b/plugins/imudp/imudp.c @@ -256,6 +256,7 @@ processSocket(int fd, struct sockaddr_storage *frominetPrev, int *pbIsPermitted, pMsg->bParseHOSTNAME = 1; MsgSetRcvFromStr(pMsg, fromHost, ustrlen(fromHost), &propFromHost); CHKiRet(MsgSetRcvFromIPStr(pMsg, fromHostIP, ustrlen(fromHostIP), &propFromHostIP)); +dbgprintf("XXX: submitting msg to queue\n"); CHKiRet(submitMsg(pMsg)); } } diff --git a/runtime/queue.c b/runtime/queue.c index 96ebd6d5..101052a1 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -1669,7 +1669,6 @@ DequeueConsumable(qqueue_t *pThis, wti_t *pWti) // TODO: MULTI: check physical queue size? pthread_cond_signal(&pThis->notFull); - d_pthread_mutex_unlock(pThis->mut); /* WE ARE NO LONGER PROTECTED BY THE MUTEX */ if(iRet != RS_RET_OK && iRet != RS_RET_DISCARDMSG) { @@ -1776,9 +1775,7 @@ RateLimiter(qqueue_t *pThis) } -/* This dequeues the next batch and checks if the queue is empty. If it is - * empty, return RS_RET_IDLE. That will trigger termination of the function - * and tell the upper layer caller to initiate idle processing. +/* This dequeues the next batch. * rgerhards, 2009-05-20 */ static inline rsRetVal @@ -1789,11 +1786,13 @@ DequeueForConsumer(qqueue_t *pThis, wti_t *pWti) ISOBJ_TYPE_assert(pThis, qqueue); ISOBJ_TYPE_assert(pWti, wti); +dbgprintf("YYY: deqeueu for consumer"); CHKiRet(DequeueConsumable(pThis, pWti)); if(pWti->batch.nElem == 0) ABORT_FINALIZE(RS_RET_IDLE); + finalize_it: RETiRet; } @@ -1832,6 +1831,10 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti) ISOBJ_TYPE_assert(pWti, wti); CHKiRet(DequeueForConsumer(pThis, pWti)); + + /* we now have a non-idle batch of work, so we can release the queue mutex and process it */ + d_pthread_mutex_unlock(pThis->mut); + CHKiRet(pThis->pConsumer(pThis->pUsr, &pWti->batch)); /* we now need to check if we should deliberately delay processing a bit @@ -1844,6 +1847,9 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti) srSleep(pThis->iDeqSlowdown / 1000000, pThis->iDeqSlowdown % 1000000); } + /* now we are done, but need to re-aquire the mutex */ + d_pthread_mutex_lock(pThis->mut); + finalize_it: dbgprintf("XXX: regular consumer finished, iret=%d, szlog %d sz phys %d\n", iRet, getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); RETiRet; @@ -1869,6 +1875,10 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti) ISOBJ_TYPE_assert(pWti, wti); CHKiRet(DequeueForConsumer(pThis, pWti)); + + /* we now have a non-idle batch of work, so we can release the queue mutex and process it */ + d_pthread_mutex_unlock(pThis->mut); + /* iterate over returned results and enqueue them in DA queue */ for(i = 0 ; i < pWti->batch.nElem ; i++) { /* TODO: we must add a generic "addRef" mechanism, because the disk queue enqueue destructs @@ -1878,6 +1888,9 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti) CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, (obj_t*)MsgAddRef((msg_t*)(pWti->batch.pElem[i].pUsrp)))); } + /* now we are done, but need to re-aquire the mutex */ + d_pthread_mutex_lock(pThis->mut); + finalize_it: DBGOPRINT((obj_t*) pThis, "DAConsumer returns with iRet %d\n", iRet); RETiRet; @@ -2531,6 +2544,7 @@ finalize_it: if(pThis->qType != QUEUETYPE_DIRECT) { /* make sure at least one worker is running. */ qqueueAdviseMaxWorkers(pThis); +dbgprintf("YYY: call advise with mutex %p locked \n", pThis->mut); /* and release the mutex */ d_pthread_mutex_unlock(pThis->mut); pthread_setcancelstate(iCancelStateSave, NULL); diff --git a/runtime/wti.c b/runtime/wti.c index e624899b..53b695b0 100644 --- a/runtime/wti.c +++ b/runtime/wti.c @@ -134,6 +134,7 @@ wtiCancelThrd(wti_t *pThis) pthread_cancel(pThis->thrdID); /* now wait until the thread terminates... */ while(wtiGetState(pThis)) { +//fprintf(stderr, "sleep loop for getState\n"); srSleep(0, 10000); } } @@ -223,9 +224,9 @@ doIdleProcessing(wti_t *pThis, wtp_t *pWtp, int *pbInactivityTOOccured) pWtp->pfOnIdle(pWtp->pUsr, MUTEX_ALREADY_LOCKED); - d_pthread_mutex_lock(pWtp->pmutUsr); if(pThis->bAlwaysRunning) { /* never shut down any started worker */ +dbgprintf("YYY/ZZZ: wti Idle wait cond busy, mutex %p\n", pWtp->pmutUsr); d_pthread_cond_wait(pWtp->pcondBusy, pWtp->pmutUsr); } else { timeoutComp(&t, pWtp->toWrkShutdown);/* get absolute timeout */ @@ -234,7 +235,6 @@ doIdleProcessing(wti_t *pThis, wtp_t *pWtp, int *pbInactivityTOOccured) *pbInactivityTOOccured = 1; /* indicate we had a timeout */ } } - d_pthread_mutex_unlock(pWtp->pmutUsr); ENDfunc } @@ -267,8 +267,10 @@ wtiWorker(wti_t *pThis) pWtp->pfRateLimiter(pWtp->pUsr); } +dbgprintf("YYY/ZZZ: pre lock mutex\n"); d_pthread_mutex_lock(pWtp->pmutUsr); +dbgprintf("YYY/ZZZ: wti locks mutex %p\n", pWtp->pmutUsr); /* first check if we are in shutdown process (but evaluate a bit later) */ terminateRet = wtpChkStopWrkr(pWtp, MUTEX_ALREADY_LOCKED); if(terminateRet == RS_RET_TERMINATE_NOW) { @@ -281,17 +283,24 @@ wtiWorker(wti_t *pThis) } /* try to execute and process whatever we have */ - /* This function must and does RELEASE the MUTEX! */ + /* Note that this function releases and re-aquires the mutex. The returned + * information on idle state must be processed before releasing the mutex again. + */ localRet = pWtp->pfDoWork(pWtp->pUsr, pThis); +dbgprintf("YYY/ZZZ: wti loop locked mutex %p again\n", pWtp->pmutUsr); if(localRet == RS_RET_IDLE) { if(terminateRet == RS_RET_TERMINATE_WHEN_IDLE || bInactivityTOOccured) { + d_pthread_mutex_unlock(pWtp->pmutUsr); break; /* end of loop */ } doIdleProcessing(pThis, pWtp, &bInactivityTOOccured); + d_pthread_mutex_unlock(pWtp->pmutUsr); continue; /* request next iteration */ } + d_pthread_mutex_unlock(pWtp->pmutUsr); + bInactivityTOOccured = 0; /* reset for next run */ } diff --git a/runtime/wtp.c b/runtime/wtp.c index 4524e0c3..40d031dc 100644 --- a/runtime/wtp.c +++ b/runtime/wtp.c @@ -413,6 +413,7 @@ wtpAdviseMaxWorkers(wtp_t *pThis, int nMaxWrkr) ISOBJ_TYPE_assert(pThis, wtp); +int nMaxWrkrTmp = nMaxWrkr; if(nMaxWrkr == 0) FINALIZE; @@ -420,6 +421,7 @@ wtpAdviseMaxWorkers(wtp_t *pThis, int nMaxWrkr) nMaxWrkr = pThis->iNumWorkerThreads; nMissing = nMaxWrkr - ATOMIC_FETCH_32BIT(pThis->iCurNumWrkThrd); +dbgprintf("wtpAdviseMaxWorkers, nmax: %d, curr %d, missing %d\n", nMaxWrkrTmp, pThis->iNumWorkerThreads, nMissing); if(nMissing > 0) { DBGPRINTF("%s: high activity - starting %d additional worker thread(s).\n", wtpGetDbgHdr(pThis), nMissing); @@ -428,6 +430,7 @@ wtpAdviseMaxWorkers(wtp_t *pThis, int nMaxWrkr) CHKiRet(wtpStartWrkr(pThis)); } } else { +dbgprintf("YYY: adivse signal cond busy"); pthread_cond_signal(pThis->pcondBusy); } diff --git a/tcps_sess.c b/tcps_sess.c index 8d307380..09861ab9 100644 --- a/tcps_sess.c +++ b/tcps_sess.c @@ -256,6 +256,7 @@ defaultDoSubmitMessage(tcps_sess_t *pThis, struct syslogTime *stTime, time_t ttG CHKiRet(MsgSetRcvFromIP(pMsg, pThis->fromHostIP)); MsgSetRuleset(pMsg, pThis->pLstnInfo->pRuleset); +dbgprintf("YYY: submitting msg to queue\n"); if(pMultiSub == NULL) { CHKiRet(submitMsg(pMsg)); } else { diff --git a/tests/arrayqueue.sh b/tests/arrayqueue.sh index 01fc133b..58fd24ae 100755 --- a/tests/arrayqueue.sh +++ b/tests/arrayqueue.sh @@ -10,5 +10,8 @@ source $srcdir/diag.sh injectmsg 0 40000 # terminate *now* (don't wait for queue to drain!) kill `cat rsyslog.pid` + +# now wait until rsyslog.pid is gone (and the process finished) +source $srcdir/diag.sh wait-shutdown source $srcdir/diag.sh seq-check 0 39999 source $srcdir/diag.sh exit diff --git a/tests/da-mainmsg-q.sh b/tests/da-mainmsg-q.sh index 6ec2f3a9..d502fca3 100755 --- a/tests/da-mainmsg-q.sh +++ b/tests/da-mainmsg-q.sh @@ -7,7 +7,7 @@ # check everything recovers from DA mode correctly. # added 2009-04-22 by Rgerhards # This file is part of the rsyslog project, released under GPLv3 -echo "testing main message queue in DA mode (going to disk)" +echo "[da-mainmsg-q.sh]: testing main message queue in DA mode (going to disk)" source $srcdir/diag.sh init source $srcdir/diag.sh startup da-mainmsg-q.conf diff --git a/tests/daqueue-persist-drvr.sh b/tests/daqueue-persist-drvr.sh index 7b6ec6dd..d95991fc 100755 --- a/tests/daqueue-persist-drvr.sh +++ b/tests/daqueue-persist-drvr.sh @@ -26,5 +26,6 @@ source $srcdir/diag.sh check-mainq-spool echo "#" > work-delay.conf source $srcdir/diag.sh startup queue-persist.conf source $srcdir/diag.sh shutdown-when-empty # shut down rsyslogd when done processing messages +$srcdir/diag.sh wait-shutdown source $srcdir/diag.sh seq-check 0 4999 source $srcdir/diag.sh exit diff --git a/tests/diag.sh b/tests/diag.sh index 56810c4f..d8ba43b8 100755 --- a/tests/diag.sh +++ b/tests/diag.sh @@ -43,6 +43,7 @@ case $1 in then echo "ABORT! core file exists, starting interactive shell" bash + exit 1 fi ;; 'wait-queueempty') # wait for main message queue to be empty diff --git a/tests/linkedlistqueue.sh b/tests/linkedlistqueue.sh index 9570ed2b..72c2a403 100755 --- a/tests/linkedlistqueue.sh +++ b/tests/linkedlistqueue.sh @@ -10,5 +10,8 @@ source $srcdir/diag.sh injectmsg 0 40000 # terminate *now* (don't wait for queue to drain) kill `cat rsyslog.pid` + +# now wait until rsyslog.pid is gone (and the process finished) +source $srcdir/diag.sh wait-shutdown source $srcdir/diag.sh seq-check 0 39999 source $srcdir/diag.sh exit diff --git a/tests/nettester.c b/tests/nettester.c index 71641745..2838b919 100644 --- a/tests/nettester.c +++ b/tests/nettester.c @@ -47,6 +47,7 @@ #include <signal.h> #include <netinet/in.h> #include <getopt.h> +#include <errno.h> #define EXIT_FAILURE 1 #define INVALID_SOCKET -1 @@ -90,6 +91,7 @@ void readLine(int fd, char *ln) if(verbose) fprintf(stderr, "begin readLine\n"); lenRead = read(fd, &c, 1); + while(lenRead == 1 && c != '\n') { if(c == '\0') { *ln = c; @@ -102,6 +104,11 @@ void readLine(int fd, char *ln) } *ln = '\0'; + if(lenRead < 0) { + printf("read from rsyslogd returned with error '%s' - aborting test\n", strerror(errno)); + exit(1); + } + if(verbose) fprintf(stderr, "end readLine, val read '%s'\n", orig); } @@ -308,6 +315,10 @@ processTestFile(int fd, char *pszFileName) /* pull response from server and then check if it meets our expectation */ readLine(fd, buf); + if(strlen(buf) == 0) { + printf("something went wrong - read a zero-length string from rsyslogd"); + exit(1); + } if(strcmp(expected, buf)) { ++iFailed; printf("\nExpected Response:\n'%s'\nActual Response:\n'%s'\n", @@ -372,11 +383,24 @@ doTests(int fd, char *files) return(iFailed); } + +/* indicate that our child has died (where it is not permitted to!). + */ +void childDied(__attribute__((unused)) int sig) +{ + printf("ERROR: child died unexpectedly (maybe a segfault?)!\n"); + exit(1); +} + + /* cleanup */ void doAtExit(void) { int status; + /* disarm died-child handler */ + signal(SIGCHLD, SIG_IGN); + if(rsyslogdPid != 0) { kill(rsyslogdPid, SIGTERM); waitpid(rsyslogdPid, &status, 0); /* wait until instance terminates */ @@ -457,6 +481,9 @@ int main(int argc, char *argv[]) } fclose(fp); + /* arm died-child handler */ + signal(SIGCHLD, childDied); + /* start to be tested rsyslogd */ openPipe(testSuite, &rsyslogdPid, &fd); readLine(fd, buf); @@ -467,5 +494,6 @@ int main(int argc, char *argv[]) ret = 1; if(verbose) printf("End of nettester run (%d).\n", ret); + exit(ret); } |