summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ChangeLog4
-rw-r--r--plugins/imudp/imudp.c1
-rw-r--r--runtime/queue.c22
-rw-r--r--runtime/wti.c15
-rw-r--r--runtime/wtp.c3
-rw-r--r--tcps_sess.c1
-rwxr-xr-xtests/arrayqueue.sh3
-rwxr-xr-xtests/da-mainmsg-q.sh2
-rwxr-xr-xtests/daqueue-persist-drvr.sh1
-rwxr-xr-xtests/diag.sh1
-rwxr-xr-xtests/linkedlistqueue.sh3
-rw-r--r--tests/nettester.c28
12 files changed, 76 insertions, 8 deletions
diff --git a/ChangeLog b/ChangeLog
index 1066b734..2ffc67a1 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -3,6 +3,10 @@ Version 5.1.6 [v5-beta] (rgerhards), 2009-09-??
- feature imports from v4.5.6
- bugfix: potential race condition when queue worker threads were
terminated
+- bugfix: solved potential (temporary) stall of messages when the queue was
+ almost empty and few new data added (caused testbench to sometimes hang!)
+- fixed some race condition in testbench
+- added more elaborate diagnostics to parts of the testbench
- bugfixes imported from 4.5.4:
* bugfix: potential segfault in stream writer on destruction
* bugfix: potential race in object loader (obj.c) during use/release
diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c
index b9db1875..a393cf96 100644
--- a/plugins/imudp/imudp.c
+++ b/plugins/imudp/imudp.c
@@ -256,6 +256,7 @@ processSocket(int fd, struct sockaddr_storage *frominetPrev, int *pbIsPermitted,
pMsg->bParseHOSTNAME = 1;
MsgSetRcvFromStr(pMsg, fromHost, ustrlen(fromHost), &propFromHost);
CHKiRet(MsgSetRcvFromIPStr(pMsg, fromHostIP, ustrlen(fromHostIP), &propFromHostIP));
+dbgprintf("XXX: submitting msg to queue\n");
CHKiRet(submitMsg(pMsg));
}
}
diff --git a/runtime/queue.c b/runtime/queue.c
index 96ebd6d5..101052a1 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -1669,7 +1669,6 @@ DequeueConsumable(qqueue_t *pThis, wti_t *pWti)
// TODO: MULTI: check physical queue size?
pthread_cond_signal(&pThis->notFull);
- d_pthread_mutex_unlock(pThis->mut);
/* WE ARE NO LONGER PROTECTED BY THE MUTEX */
if(iRet != RS_RET_OK && iRet != RS_RET_DISCARDMSG) {
@@ -1776,9 +1775,7 @@ RateLimiter(qqueue_t *pThis)
}
-/* This dequeues the next batch and checks if the queue is empty. If it is
- * empty, return RS_RET_IDLE. That will trigger termination of the function
- * and tell the upper layer caller to initiate idle processing.
+/* This dequeues the next batch.
* rgerhards, 2009-05-20
*/
static inline rsRetVal
@@ -1789,11 +1786,13 @@ DequeueForConsumer(qqueue_t *pThis, wti_t *pWti)
ISOBJ_TYPE_assert(pThis, qqueue);
ISOBJ_TYPE_assert(pWti, wti);
+dbgprintf("YYY: deqeueu for consumer");
CHKiRet(DequeueConsumable(pThis, pWti));
if(pWti->batch.nElem == 0)
ABORT_FINALIZE(RS_RET_IDLE);
+
finalize_it:
RETiRet;
}
@@ -1832,6 +1831,10 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti)
ISOBJ_TYPE_assert(pWti, wti);
CHKiRet(DequeueForConsumer(pThis, pWti));
+
+ /* we now have a non-idle batch of work, so we can release the queue mutex and process it */
+ d_pthread_mutex_unlock(pThis->mut);
+
CHKiRet(pThis->pConsumer(pThis->pUsr, &pWti->batch));
/* we now need to check if we should deliberately delay processing a bit
@@ -1844,6 +1847,9 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti)
srSleep(pThis->iDeqSlowdown / 1000000, pThis->iDeqSlowdown % 1000000);
}
+ /* now we are done, but need to re-aquire the mutex */
+ d_pthread_mutex_lock(pThis->mut);
+
finalize_it:
dbgprintf("XXX: regular consumer finished, iret=%d, szlog %d sz phys %d\n", iRet, getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
RETiRet;
@@ -1869,6 +1875,10 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti)
ISOBJ_TYPE_assert(pWti, wti);
CHKiRet(DequeueForConsumer(pThis, pWti));
+
+ /* we now have a non-idle batch of work, so we can release the queue mutex and process it */
+ d_pthread_mutex_unlock(pThis->mut);
+
/* iterate over returned results and enqueue them in DA queue */
for(i = 0 ; i < pWti->batch.nElem ; i++) {
/* TODO: we must add a generic "addRef" mechanism, because the disk queue enqueue destructs
@@ -1878,6 +1888,9 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti)
CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, (obj_t*)MsgAddRef((msg_t*)(pWti->batch.pElem[i].pUsrp))));
}
+ /* now we are done, but need to re-aquire the mutex */
+ d_pthread_mutex_lock(pThis->mut);
+
finalize_it:
DBGOPRINT((obj_t*) pThis, "DAConsumer returns with iRet %d\n", iRet);
RETiRet;
@@ -2531,6 +2544,7 @@ finalize_it:
if(pThis->qType != QUEUETYPE_DIRECT) {
/* make sure at least one worker is running. */
qqueueAdviseMaxWorkers(pThis);
+dbgprintf("YYY: call advise with mutex %p locked \n", pThis->mut);
/* and release the mutex */
d_pthread_mutex_unlock(pThis->mut);
pthread_setcancelstate(iCancelStateSave, NULL);
diff --git a/runtime/wti.c b/runtime/wti.c
index e624899b..53b695b0 100644
--- a/runtime/wti.c
+++ b/runtime/wti.c
@@ -134,6 +134,7 @@ wtiCancelThrd(wti_t *pThis)
pthread_cancel(pThis->thrdID);
/* now wait until the thread terminates... */
while(wtiGetState(pThis)) {
+//fprintf(stderr, "sleep loop for getState\n");
srSleep(0, 10000);
}
}
@@ -223,9 +224,9 @@ doIdleProcessing(wti_t *pThis, wtp_t *pWtp, int *pbInactivityTOOccured)
pWtp->pfOnIdle(pWtp->pUsr, MUTEX_ALREADY_LOCKED);
- d_pthread_mutex_lock(pWtp->pmutUsr);
if(pThis->bAlwaysRunning) {
/* never shut down any started worker */
+dbgprintf("YYY/ZZZ: wti Idle wait cond busy, mutex %p\n", pWtp->pmutUsr);
d_pthread_cond_wait(pWtp->pcondBusy, pWtp->pmutUsr);
} else {
timeoutComp(&t, pWtp->toWrkShutdown);/* get absolute timeout */
@@ -234,7 +235,6 @@ doIdleProcessing(wti_t *pThis, wtp_t *pWtp, int *pbInactivityTOOccured)
*pbInactivityTOOccured = 1; /* indicate we had a timeout */
}
}
- d_pthread_mutex_unlock(pWtp->pmutUsr);
ENDfunc
}
@@ -267,8 +267,10 @@ wtiWorker(wti_t *pThis)
pWtp->pfRateLimiter(pWtp->pUsr);
}
+dbgprintf("YYY/ZZZ: pre lock mutex\n");
d_pthread_mutex_lock(pWtp->pmutUsr);
+dbgprintf("YYY/ZZZ: wti locks mutex %p\n", pWtp->pmutUsr);
/* first check if we are in shutdown process (but evaluate a bit later) */
terminateRet = wtpChkStopWrkr(pWtp, MUTEX_ALREADY_LOCKED);
if(terminateRet == RS_RET_TERMINATE_NOW) {
@@ -281,17 +283,24 @@ wtiWorker(wti_t *pThis)
}
/* try to execute and process whatever we have */
- /* This function must and does RELEASE the MUTEX! */
+ /* Note that this function releases and re-aquires the mutex. The returned
+ * information on idle state must be processed before releasing the mutex again.
+ */
localRet = pWtp->pfDoWork(pWtp->pUsr, pThis);
+dbgprintf("YYY/ZZZ: wti loop locked mutex %p again\n", pWtp->pmutUsr);
if(localRet == RS_RET_IDLE) {
if(terminateRet == RS_RET_TERMINATE_WHEN_IDLE || bInactivityTOOccured) {
+ d_pthread_mutex_unlock(pWtp->pmutUsr);
break; /* end of loop */
}
doIdleProcessing(pThis, pWtp, &bInactivityTOOccured);
+ d_pthread_mutex_unlock(pWtp->pmutUsr);
continue; /* request next iteration */
}
+ d_pthread_mutex_unlock(pWtp->pmutUsr);
+
bInactivityTOOccured = 0; /* reset for next run */
}
diff --git a/runtime/wtp.c b/runtime/wtp.c
index 4524e0c3..40d031dc 100644
--- a/runtime/wtp.c
+++ b/runtime/wtp.c
@@ -413,6 +413,7 @@ wtpAdviseMaxWorkers(wtp_t *pThis, int nMaxWrkr)
ISOBJ_TYPE_assert(pThis, wtp);
+int nMaxWrkrTmp = nMaxWrkr;
if(nMaxWrkr == 0)
FINALIZE;
@@ -420,6 +421,7 @@ wtpAdviseMaxWorkers(wtp_t *pThis, int nMaxWrkr)
nMaxWrkr = pThis->iNumWorkerThreads;
nMissing = nMaxWrkr - ATOMIC_FETCH_32BIT(pThis->iCurNumWrkThrd);
+dbgprintf("wtpAdviseMaxWorkers, nmax: %d, curr %d, missing %d\n", nMaxWrkrTmp, pThis->iNumWorkerThreads, nMissing);
if(nMissing > 0) {
DBGPRINTF("%s: high activity - starting %d additional worker thread(s).\n", wtpGetDbgHdr(pThis), nMissing);
@@ -428,6 +430,7 @@ wtpAdviseMaxWorkers(wtp_t *pThis, int nMaxWrkr)
CHKiRet(wtpStartWrkr(pThis));
}
} else {
+dbgprintf("YYY: adivse signal cond busy");
pthread_cond_signal(pThis->pcondBusy);
}
diff --git a/tcps_sess.c b/tcps_sess.c
index 8d307380..09861ab9 100644
--- a/tcps_sess.c
+++ b/tcps_sess.c
@@ -256,6 +256,7 @@ defaultDoSubmitMessage(tcps_sess_t *pThis, struct syslogTime *stTime, time_t ttG
CHKiRet(MsgSetRcvFromIP(pMsg, pThis->fromHostIP));
MsgSetRuleset(pMsg, pThis->pLstnInfo->pRuleset);
+dbgprintf("YYY: submitting msg to queue\n");
if(pMultiSub == NULL) {
CHKiRet(submitMsg(pMsg));
} else {
diff --git a/tests/arrayqueue.sh b/tests/arrayqueue.sh
index 01fc133b..58fd24ae 100755
--- a/tests/arrayqueue.sh
+++ b/tests/arrayqueue.sh
@@ -10,5 +10,8 @@ source $srcdir/diag.sh injectmsg 0 40000
# terminate *now* (don't wait for queue to drain!)
kill `cat rsyslog.pid`
+
+# now wait until rsyslog.pid is gone (and the process finished)
+source $srcdir/diag.sh wait-shutdown
source $srcdir/diag.sh seq-check 0 39999
source $srcdir/diag.sh exit
diff --git a/tests/da-mainmsg-q.sh b/tests/da-mainmsg-q.sh
index 6ec2f3a9..d502fca3 100755
--- a/tests/da-mainmsg-q.sh
+++ b/tests/da-mainmsg-q.sh
@@ -7,7 +7,7 @@
# check everything recovers from DA mode correctly.
# added 2009-04-22 by Rgerhards
# This file is part of the rsyslog project, released under GPLv3
-echo "testing main message queue in DA mode (going to disk)"
+echo "[da-mainmsg-q.sh]: testing main message queue in DA mode (going to disk)"
source $srcdir/diag.sh init
source $srcdir/diag.sh startup da-mainmsg-q.conf
diff --git a/tests/daqueue-persist-drvr.sh b/tests/daqueue-persist-drvr.sh
index 7b6ec6dd..d95991fc 100755
--- a/tests/daqueue-persist-drvr.sh
+++ b/tests/daqueue-persist-drvr.sh
@@ -26,5 +26,6 @@ source $srcdir/diag.sh check-mainq-spool
echo "#" > work-delay.conf
source $srcdir/diag.sh startup queue-persist.conf
source $srcdir/diag.sh shutdown-when-empty # shut down rsyslogd when done processing messages
+$srcdir/diag.sh wait-shutdown
source $srcdir/diag.sh seq-check 0 4999
source $srcdir/diag.sh exit
diff --git a/tests/diag.sh b/tests/diag.sh
index 56810c4f..d8ba43b8 100755
--- a/tests/diag.sh
+++ b/tests/diag.sh
@@ -43,6 +43,7 @@ case $1 in
then
echo "ABORT! core file exists, starting interactive shell"
bash
+ exit 1
fi
;;
'wait-queueempty') # wait for main message queue to be empty
diff --git a/tests/linkedlistqueue.sh b/tests/linkedlistqueue.sh
index 9570ed2b..72c2a403 100755
--- a/tests/linkedlistqueue.sh
+++ b/tests/linkedlistqueue.sh
@@ -10,5 +10,8 @@ source $srcdir/diag.sh injectmsg 0 40000
# terminate *now* (don't wait for queue to drain)
kill `cat rsyslog.pid`
+
+# now wait until rsyslog.pid is gone (and the process finished)
+source $srcdir/diag.sh wait-shutdown
source $srcdir/diag.sh seq-check 0 39999
source $srcdir/diag.sh exit
diff --git a/tests/nettester.c b/tests/nettester.c
index 71641745..2838b919 100644
--- a/tests/nettester.c
+++ b/tests/nettester.c
@@ -47,6 +47,7 @@
#include <signal.h>
#include <netinet/in.h>
#include <getopt.h>
+#include <errno.h>
#define EXIT_FAILURE 1
#define INVALID_SOCKET -1
@@ -90,6 +91,7 @@ void readLine(int fd, char *ln)
if(verbose)
fprintf(stderr, "begin readLine\n");
lenRead = read(fd, &c, 1);
+
while(lenRead == 1 && c != '\n') {
if(c == '\0') {
*ln = c;
@@ -102,6 +104,11 @@ void readLine(int fd, char *ln)
}
*ln = '\0';
+ if(lenRead < 0) {
+ printf("read from rsyslogd returned with error '%s' - aborting test\n", strerror(errno));
+ exit(1);
+ }
+
if(verbose)
fprintf(stderr, "end readLine, val read '%s'\n", orig);
}
@@ -308,6 +315,10 @@ processTestFile(int fd, char *pszFileName)
/* pull response from server and then check if it meets our expectation */
readLine(fd, buf);
+ if(strlen(buf) == 0) {
+ printf("something went wrong - read a zero-length string from rsyslogd");
+ exit(1);
+ }
if(strcmp(expected, buf)) {
++iFailed;
printf("\nExpected Response:\n'%s'\nActual Response:\n'%s'\n",
@@ -372,11 +383,24 @@ doTests(int fd, char *files)
return(iFailed);
}
+
+/* indicate that our child has died (where it is not permitted to!).
+ */
+void childDied(__attribute__((unused)) int sig)
+{
+ printf("ERROR: child died unexpectedly (maybe a segfault?)!\n");
+ exit(1);
+}
+
+
/* cleanup */
void doAtExit(void)
{
int status;
+ /* disarm died-child handler */
+ signal(SIGCHLD, SIG_IGN);
+
if(rsyslogdPid != 0) {
kill(rsyslogdPid, SIGTERM);
waitpid(rsyslogdPid, &status, 0); /* wait until instance terminates */
@@ -457,6 +481,9 @@ int main(int argc, char *argv[])
}
fclose(fp);
+ /* arm died-child handler */
+ signal(SIGCHLD, childDied);
+
/* start to be tested rsyslogd */
openPipe(testSuite, &rsyslogdPid, &fd);
readLine(fd, buf);
@@ -467,5 +494,6 @@ int main(int argc, char *argv[])
ret = 1;
if(verbose) printf("End of nettester run (%d).\n", ret);
+
exit(ret);
}