summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2009-07-20 08:58:03 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2009-07-20 08:58:03 +0200
commit01acb7928e4e72b08279da15d376adff9c3c3840 (patch)
tree1b176ee8f3b6e304db72386ca5a4872c1dcbdf88
parent88132c79f1e75edb2caf5d3f9ad1685834785be7 (diff)
downloadrsyslog-01acb7928e4e72b08279da15d376adff9c3c3840.tar.gz
rsyslog-01acb7928e4e72b08279da15d376adff9c3c3840.tar.xz
rsyslog-01acb7928e4e72b08279da15d376adff9c3c3840.zip
some more threading changes
... as well as some cleanup
-rw-r--r--runtime/queue.c46
-rw-r--r--runtime/wtp.c24
-rw-r--r--runtime/wtp.h1
-rwxr-xr-xtests/queue-persist.sh1
4 files changed, 28 insertions, 44 deletions
diff --git a/runtime/queue.c b/runtime/queue.c
index a2bb4c1d..0ef0174e 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -59,7 +59,6 @@
#ifdef OS_SOLARIS
# include <sched.h>
-# define pthread_yield() sched_yield()
#endif
/* static data */
@@ -1277,33 +1276,30 @@ tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis)
/* now give the queue workers a last chance to gracefully shut down (based on action timeout setting) */
timeoutComp(&tTimeout, pThis->toActShutdown);
- d_pthread_mutex_lock(pThis->mut); /* some workers may be running in parallel! */
- if(wtpGetCurNumWrkr(pThis->pWtpReg, LOCK_MUTEX) > 0) {
- d_pthread_mutex_unlock(pThis->mut);
- dbgoprint((obj_t*) pThis, "trying immediate shutdown of regular workers\n");
- iRetLocal = wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout);
- if(iRetLocal == RS_RET_TIMED_OUT) {
- dbgoprint((obj_t*) pThis, "immediate shutdown timed out on primary queue (this is acceptable and "
- "triggers cancellation)\n");
- } else if(iRetLocal != RS_RET_OK) {
- dbgoprint((obj_t*) pThis, "unexpected iRet state %d after trying immediate shutdown of the primary queue "
- "in disk save mode. Continuing, but results are unpredictable\n", iRetLocal);
- }
- /* we need to re-aquire the mutex for the next check in this case! */
- d_pthread_mutex_lock(pThis->mut);
+ dbgoprint((obj_t*) pThis, "trying immediate shutdown of regular workers (if any)\n");
+ iRetLocal = wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout);
+ if(iRetLocal == RS_RET_TIMED_OUT) {
+ dbgoprint((obj_t*) pThis, "immediate shutdown timed out on primary queue (this is acceptable and "
+ "triggers cancellation)\n");
+ } else if(iRetLocal != RS_RET_OK) {
+ dbgoprint((obj_t*) pThis, "unexpected iRet state %d after trying immediate shutdown of the primary queue "
+ "in disk save mode. Continuing, but results are unpredictable\n", iRetLocal);
}
- if(pThis->bRunsDA && wtpGetCurNumWrkr(pThis->pqDA->pWtpReg, LOCK_MUTEX) > 0) {
- /* and now the same for the DA queue */
+ d_pthread_mutex_lock(pThis->mut);
+ if(pThis->bRunsDA) {
d_pthread_mutex_unlock(pThis->mut);
- dbgoprint((obj_t*) pThis, "trying immediate shutdown of DA queue workers\n");
- iRetLocal = wtpShutdownAll(pThis->pqDA->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout);
- if(iRetLocal == RS_RET_TIMED_OUT) {
- dbgoprint((obj_t*) pThis, "immediate shutdown timed out on DA queue (this is acceptable and "
- "triggers cancellation)\n");
- } else if(iRetLocal != RS_RET_OK) {
- dbgoprint((obj_t*) pThis, "unexpected iRet state %d after trying immediate shutdown of the DA queue "
- "in disk save mode. Continuing, but results are unpredictable\n", iRetLocal);
+ if(wtpGetCurNumWrkr(pThis->pqDA->pWtpReg, LOCK_MUTEX) > 0) {
+ /* and now the same for the DA queue */
+ dbgoprint((obj_t*) pThis, "trying immediate shutdown of DA queue workers\n");
+ iRetLocal = wtpShutdownAll(pThis->pqDA->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout);
+ if(iRetLocal == RS_RET_TIMED_OUT) {
+ dbgoprint((obj_t*) pThis, "immediate shutdown timed out on DA queue (this is acceptable "
+ "and triggers cancellation)\n");
+ } else if(iRetLocal != RS_RET_OK) {
+ dbgoprint((obj_t*) pThis, "unexpected iRet state %d after trying immediate shutdown of the DA "
+ "queue in disk save mode. Continuing, but results are unpredictable\n", iRetLocal);
+ }
}
} else {
d_pthread_mutex_unlock(pThis->mut);
diff --git a/runtime/wtp.c b/runtime/wtp.c
index 46b5f4bb..4d4d0f0e 100644
--- a/runtime/wtp.c
+++ b/runtime/wtp.c
@@ -157,20 +157,6 @@ CODESTARTobjDestruct(wtp)
ENDobjDestruct(wtp)
-/* wake up at least one worker thread.
- * rgerhards, 2008-01-20
- */
-rsRetVal
-wtpWakeupWrkr(wtp_t *pThis)
-{
- DEFiRet;
-
- /* TODO; mutex? I think not needed, as we do not need predictable exec order -- rgerhards, 2008-01-28 */
- ISOBJ_TYPE_assert(pThis, wtp);
- pthread_cond_signal(pThis->pcondBusy);
- RETiRet;
-}
-
/* wake up all worker threads.
* rgerhards, 2008-01-16
*/
@@ -239,7 +225,9 @@ finalize_it:
#pragma GCC diagnostic ignored "-Wempty-body"
/* Send a shutdown command to all workers and see if they terminate.
- * A timeout may be specified.
+ * A timeout may be specified. This function may also be called with
+ * the current number of workers being 0, in which case it does not
+ * shut down any worker.
* rgerhards, 2008-01-14
*/
rsRetVal
@@ -383,8 +371,6 @@ wtpStartWrkr(wtp_t *pThis, int bLockMutex)
BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mutWtp, bLockMutex);
- pThis->iCurNumWrkThrd++;
-
/* find free spot in thread table. */
for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
if(wtiGetState(pThis->pWrkr[i]) == WRKTHRD_STOPPED) {
@@ -395,6 +381,8 @@ wtpStartWrkr(wtp_t *pThis, int bLockMutex)
if(i == pThis->iNumWorkerThreads)
ABORT_FINALIZE(RS_RET_NO_MORE_THREADS);
+ pThis->iCurNumWrkThrd++; /* we got one more! */
+
pWti = pThis->pWrkr[i];
wtiSetState(pWti, WRKTHRD_RUNNING);
pthread_attr_init(&attr);
@@ -445,7 +433,7 @@ wtpAdviseMaxWorkers(wtp_t *pThis, int nMaxWrkr)
}
} else {
if(nMaxWrkr > 0) {
- wtpWakeupWrkr(pThis);
+ pthread_cond_signal(pThis->pcondBusy);
}
}
diff --git a/runtime/wtp.h b/runtime/wtp.h
index e2dd9409..88683ea2 100644
--- a/runtime/wtp.h
+++ b/runtime/wtp.h
@@ -82,7 +82,6 @@ rsRetVal wtpAdviseMaxWorkers(wtp_t *pThis, int nMaxWrkr);
rsRetVal wtpProcessThrdChanges(wtp_t *pThis);
rsRetVal wtpChkStopWrkr(wtp_t *pThis, int bLockUsrMutex);
rsRetVal wtpSetState(wtp_t *pThis, wtpState_t iNewState);
-rsRetVal wtpWakeupWrkr(wtp_t *pThis);
rsRetVal wtpWakeupAllWrkr(wtp_t *pThis);
rsRetVal wtpCancelAll(wtp_t *pThis);
rsRetVal wtpSetDbgHdr(wtp_t *pThis, uchar *pszMsg, size_t lenMsg);
diff --git a/tests/queue-persist.sh b/tests/queue-persist.sh
index 999655b1..e05b3da3 100755
--- a/tests/queue-persist.sh
+++ b/tests/queue-persist.sh
@@ -2,6 +2,7 @@
# to carry out multiple tests with different queue modes
# added 2009-05-27 by Rgerhards
# This file is part of the rsyslog project, released under GPLv3
+echo TEST: queue-persist.sh
source $srcdir/queue-persist-drvr.sh LinkedList
source $srcdir/queue-persist-drvr.sh FixedArray
# the disk test should not fail, however, the config is extreme and using