summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2009-07-08 18:12:05 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2009-07-08 18:12:05 +0200
commit4cfbc7254a49503a3c980a1b20264915d6b3b807 (patch)
tree29c6fb1789588760b8c68b48ba2617b90262ac52
parent9f286c0c4c21128c66305166ae379d3f7b07f673 (diff)
downloadrsyslog-4cfbc7254a49503a3c980a1b20264915d6b3b807.tar.gz
rsyslog-4cfbc7254a49503a3c980a1b20264915d6b3b807.tar.xz
rsyslog-4cfbc7254a49503a3c980a1b20264915d6b3b807.zip
simplified worker thread handling
based on now working with detached threads. This is probably the biggest patch in this series and with large bug potential.
-rw-r--r--runtime/wti.c114
-rw-r--r--runtime/wti.h2
-rw-r--r--runtime/wtp.c68
-rw-r--r--runtime/wtp.h2
-rwxr-xr-xtests/daqueue-persist.sh1
5 files changed, 16 insertions, 171 deletions
diff --git a/runtime/wti.c b/runtime/wti.c
index 93c66028..f6a32c34 100644
--- a/runtime/wti.c
+++ b/runtime/wti.c
@@ -118,7 +118,7 @@ wtiSetState(wti_t *pThis, qWrkCmd_t tCmd, int bActiveOnly, int bLockMutex)
tCurrCmd = pThis->tCurrCmd;
/* all worker states must be followed sequentially, only termination can be set in any state */
if( (bActiveOnly && (tCurrCmd < eWRKTHRD_RUN_CREATED))
- || (tCurrCmd > tCmd && !(tCmd == eWRKTHRD_TERMINATING || tCmd == eWRKTHRD_STOPPED))) {
+ || (tCurrCmd > tCmd && !(tCmd == eWRKTHRD_STOPPED))) {
DBGPRINTF("%s: command %d can not be accepted in current %d processing state - ignored\n",
wtiGetDbgHdr(pThis), tCmd, tCurrCmd);
} else {
@@ -126,25 +126,9 @@ wtiSetState(wti_t *pThis, qWrkCmd_t tCmd, int bActiveOnly, int bLockMutex)
/* we could replace this with a simple if, but we leave the switch in in case we need
* to add something at a later stage. -- rgerhards, 2008-09-30
*/
- switch(tCmd) {
- case eWRKTHRD_TERMINATING:
- /* TODO: re-enable meaningful debug msg! (via function callback?)
- dbgprintf("%s: thread terminating with %d entries left in queue, %d workers running.\n",
- wtiGetDbgHdr(pThis->pQueue), pThis->pQueue->iQueueSize,
- pThis->pQueue->iCurNumWrkThrd);
- */
- pthread_cond_signal(&pThis->condExitDone);
- dbgprintf("%s: worker terminating\n", wtiGetDbgHdr(pThis));
- break;
- /* these cases just to satisfy the compiler, we do (yet) not act an them: */
- case eWRKTHRD_RUNNING:
- case eWRKTHRD_STOPPED:
- case eWRKTHRD_RUN_CREATED:
- case eWRKTHRD_RUN_INIT:
- case eWRKTHRD_SHUTDOWN:
- case eWRKTHRD_SHUTDOWN_IMMEDIATE:
- /* DO NOTHING */
- break;
+ if(tCmd == eWRKTHRD_STOPPED) {
+ dbgprintf("%s: worker almost stopped, assuming it has\n", wtiGetDbgHdr(pThis));
+ pThis->thrdID = 0; /* invalidate the thread ID so that we do not accidently find reused ones */
}
/* apply the new state */
dbgprintf("worker terminator will write stateval %d\n", tCmd);
@@ -152,7 +136,6 @@ dbgprintf("worker terminator will write stateval %d\n", tCmd);
if(val != tCurrCmd) {
DBGPRINTF("wtiSetState PROBLEM, tCurrCmd %d overwritten with %d, wanted to set %d\n", tCurrCmd, val, tCmd);
}
-//dbgprintf("worker terminator has written stateval %d\n", tCmd);
}
END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
@@ -174,12 +157,11 @@ wtiCancelThrd(wti_t *pThis)
d_pthread_mutex_lock(&pThis->mut);
- wtiProcessThrdChanges(pThis, MUTEX_ALREADY_LOCKED); /* process state change, so that we have current state vars */
-
- if(pThis->tCurrCmd >= eWRKTHRD_TERMINATING) {
+ if(pThis->tCurrCmd != eWRKTHRD_STOPPED) {
dbgoprint((obj_t*) pThis, "canceling worker thread, curr stat %d\n", pThis->tCurrCmd);
pthread_cancel(pThis->thrdID);
- wtiSetState(pThis, eWRKTHRD_TERMINATING, 0, MUTEX_ALREADY_LOCKED);
+ /* TODO: check: the following check should automatically be done by cancel cleanup handler! 2009-07-08 rgerhards */
+ wtiSetState(pThis, eWRKTHRD_STOPPED, 0, MUTEX_ALREADY_LOCKED);
ATOMIC_STORE_1_TO_INT(pThis->pWtp->bThrdStateChanged); /* indicate change, so harverster will be called */
}
@@ -192,24 +174,12 @@ wtiCancelThrd(wti_t *pThis)
/* Destructor */
BEGINobjDestruct(wti) /* be sure to specify the object type also in END and CODESTART macros! */
CODESTARTobjDestruct(wti)
- /* if we reach this point, we must make sure the associated worker has terminated. It is
- * the callers duty to make sure the worker already knows it shall terminate.
- * TODO: is it *really* the caller's duty? ...mmmhhhh.... smells bad... rgerhards, 2008-01-25
- */
- wtiProcessThrdChanges(pThis, LOCK_MUTEX); /* process state change one last time */
-
- d_pthread_mutex_lock(&pThis->mut);
- if(wtiGetState(pThis, MUTEX_ALREADY_LOCKED) != eWRKTHRD_STOPPED) {
- dbgprintf("%s: WARNING: worker %p shall be destructed but is still running (might be OK) - joining it\n",
+ if(Debug && wtiGetState(pThis, MUTEX_ALREADY_LOCKED) != eWRKTHRD_STOPPED) {
+ dbgprintf("%s: WARNING: worker %p shall be destructed but is still running (might be OK) - ignoring\n",
wtiGetDbgHdr(pThis), pThis);
- /* let's hope the caller actually instructed it to shutdown... */
- pthread_cond_wait(&pThis->condExitDone, &pThis->mut);
- wtiJoinThrd(pThis);
}
- d_pthread_mutex_unlock(&pThis->mut);
/* actual destruction */
- pthread_cond_destroy(&pThis->condExitDone);
pthread_mutex_destroy(&pThis->mut);
free(pThis->batch.pElem);
@@ -220,7 +190,6 @@ ENDobjDestruct(wti)
/* Standard-Constructor for the wti object
*/
BEGINobjConstruct(wti) /* be sure to specify the object type also in END macro! */
- pthread_cond_init(&pThis->condExitDone, NULL);
pthread_mutex_init(&pThis->mut, NULL);
ENDobjConstruct(wti)
@@ -250,66 +219,6 @@ finalize_it:
}
-/* join a specific worker thread
- * we do not lock the mutex, because join will sync anyways...
- */
-rsRetVal
-wtiJoinThrd(wti_t *pThis)
-{
- DEFiRet;
-
- ISOBJ_TYPE_assert(pThis, wti);
- dbgprintf("waiting for worker %s termination, current state %d\n", wtiGetDbgHdr(pThis), pThis->tCurrCmd);
- if (pThis->thrdID == 0) {
- dbgprintf("worker %s was already stopped\n", wtiGetDbgHdr(pThis));
- } else {
- //pthread_join(pThis->thrdID, NULL);
- wtiSetState(pThis, eWRKTHRD_STOPPED, 0, MUTEX_ALREADY_LOCKED); /* back to virgin... */
- pThis->thrdID = 0; /* invalidate the thread ID so that we do not accidently find reused ones */
- dbgprintf("worker %s has stopped\n", wtiGetDbgHdr(pThis));
- }
-
- RETiRet;
-}
-
-/* check if we had a worker thread changes and, if so, act
- * on it. At a minimum, terminated threads are harvested (joined).
- */
-rsRetVal
-wtiProcessThrdChanges(wti_t *pThis, int bLockMutex)
-{
- DEFiRet;
- DEFVARS_mutexProtection;
-
- ISOBJ_TYPE_assert(pThis, wti);
-
- BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex);
- switch(pThis->tCurrCmd) {
- case eWRKTHRD_TERMINATING:
- /* we need to at least temporarily release the mutex, because otherwise
- * we may deadlock with the thread we intend to join (it aquires the mutex
- * during termination processing). -- rgerhards, 2008-02-26
- */
- END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
- iRet = wtiJoinThrd(pThis);
- BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex);
- break;
- /* these cases just to satisfy the compiler, we do not act an them: */
- case eWRKTHRD_STOPPED:
- case eWRKTHRD_RUN_CREATED:
- case eWRKTHRD_RUN_INIT:
- case eWRKTHRD_RUNNING:
- case eWRKTHRD_SHUTDOWN:
- case eWRKTHRD_SHUTDOWN_IMMEDIATE:
- /* DO NOTHING */
- break;
- }
- END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
-
- RETiRet;
-}
-
-
/* cancellation cleanup handler for queueWorker ()
* Updates admin structure and frees ressources.
* rgerhards, 2008-01-16
@@ -333,7 +242,7 @@ wtiWorkerCancelCleanup(void *arg)
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
d_pthread_mutex_lock(&pWtp->mut);
- wtiSetState(pThis, eWRKTHRD_TERMINATING, 0, MUTEX_ALREADY_LOCKED);
+ wtiSetState(pThis, eWRKTHRD_STOPPED, 0, MUTEX_ALREADY_LOCKED);
/* TODO: sync access? I currently think it is NOT needed -- rgerhards, 2008-01-28 */
ATOMIC_STORE_1_TO_INT(pWtp->bThrdStateChanged); /* indicate change, so harverster will be called */
@@ -399,7 +308,6 @@ wtiWorker(wti_t *pThis)
/* now we have our identity, on to real processing */
while(1) { /* loop will be broken below - need to do mutex locks */
/* process any pending thread requests */
- // wtpProcessThrdChanges(pWtp);
if(pWtp->pfRateLimiter != NULL) { /* call rate-limiter, if defined */
pWtp->pfRateLimiter(pWtp->pUsr);
@@ -455,7 +363,7 @@ wtiWorker(wti_t *pThis)
RUNLOG_STR("XXX: Worker shutdown");
pWtp->pfOnWorkerShutdown(pWtp->pUsr);
- wtiSetState(pThis, eWRKTHRD_TERMINATING, 0, MUTEX_ALREADY_LOCKED);
+ wtiSetState(pThis, eWRKTHRD_STOPPED, 0, MUTEX_ALREADY_LOCKED);
ATOMIC_STORE_1_TO_INT(pWtp->bThrdStateChanged); /* indicate change, so harverster will be called */
d_pthread_mutex_unlock(&pThis->mut);
pthread_setcancelstate(iCancelStateSave, NULL);
diff --git a/runtime/wti.h b/runtime/wti.h
index 2acd3cf6..5b7e5baf 100644
--- a/runtime/wti.h
+++ b/runtime/wti.h
@@ -36,7 +36,7 @@ struct wti_s {
pthread_t thrdID; /* thread ID */
qWrkCmd_t tCurrCmd; /* current command to be carried out by worker */
wtp_t *pWtp; /* my worker thread pool (important if only the work thread instance is passed! */
- pthread_cond_t condExitDone; /* signaled when the thread exit is done (once per thread existance) */
+ //pthread_cond_t condExitDone; /* signaled when the thread exit is done (once per thread existance) */
pthread_mutex_t mut;
batch_t batch; /* pointer to an object array meaningful for current user pointer (e.g. queue pUsr data elemt) */
bool bShutdownRqtd; /* shutdown for this thread requested? 0 - no , 1 - yes */
diff --git a/runtime/wtp.c b/runtime/wtp.c
index 59553984..b27ce9f0 100644
--- a/runtime/wtp.c
+++ b/runtime/wtp.c
@@ -142,8 +142,6 @@ finalize_it:
BEGINobjDestruct(wtp) /* be sure to specify the object type also in END and CODESTART macros! */
int i;
CODESTARTobjDestruct(wtp)
- wtpProcessThrdChanges(pThis); /* process thread changes one last time */
-
/* destruct workers */
for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i)
wtiDestruct(&pThis->pWrkr[i]);
@@ -190,51 +188,6 @@ wtpWakeupAllWrkr(wtp_t *pThis)
}
-/* check if we had any worker thread changes and, if so, act
- * on them. At a minimum, terminated threads are harvested (joined).
- * This function MUST NEVER block on the queue mutex!
- */
-rsRetVal
-wtpProcessThrdChanges(wtp_t *pThis)
-{
- DEFiRet;
- int i;
-
- ISOBJ_TYPE_assert(pThis, wtp);
-
- if(pThis->bThrdStateChanged == 0)
- FINALIZE;
-
- if(d_pthread_mutex_trylock(&(pThis->mutThrdShutdwn)) != 0) {
- /* another thread is already in the loop */
- FINALIZE;
- }
-
- /* Note: there is a left-over potential race condition below:
- * pThis->bThrdStateChanged may be re-set by another thread while
- * we work on it and thus the loop may terminate too early. However,
- * there are no really bad effects from that so I perfer - for this
- * version - to live with the problem as is. Not a good idea to
- * introduce that large change into the stable branch without very
- * good reason. -- rgerhards, 2009-04-02
- */
- do {
- /* reset the change marker */
- ATOMIC_STORE_0_TO_INT(pThis->bThrdStateChanged);
- /* go through all threads */
- for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
- wtiProcessThrdChanges(pThis->pWrkr[i], LOCK_MUTEX);
- }
- /* restart if another change occured while we were processing the changes */
- } while(pThis->bThrdStateChanged != 0);
-
- d_pthread_mutex_unlock(&(pThis->mutThrdShutdwn));
-
-finalize_it:
- RETiRet;
-}
-
-
/* Sent a specific state for the worker thread pool.
* rgerhards, 2008-01-21
*/
@@ -299,12 +252,7 @@ wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout
wtpSetState(pThis, tShutdownCmd);
wtpWakeupAllWrkr(pThis);
- /* see if we need to harvest (join) any terminated threads (even in timeout case,
- * some may have terminated...
- */
- wtpProcessThrdChanges(pThis);
-
- /* and wait for their termination */
+ /* wait for worker thread termination */
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
d_pthread_mutex_lock(&pThis->mut);
pthread_cleanup_push(mutexCancelCleanup, &pThis->mut);
@@ -324,11 +272,6 @@ wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout
if(bTimedOut)
iRet = RS_RET_TIMED_OUT;
- /* see if we need to harvest (join) any terminated threads (even in timeout case,
- * some may have terminated...
- */
- wtpProcessThrdChanges(pThis);
-
RETiRet;
}
#pragma GCC diagnostic warning "-Wempty-body"
@@ -369,9 +312,6 @@ wtpCancelAll(wtp_t *pThis)
ISOBJ_TYPE_assert(pThis, wtp);
- /* process any pending thread requests so that we know who actually is still running */
- wtpProcessThrdChanges(pThis);
-
/* go through all workers and cancel those that are active */
for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
dbgprintf("%s: try canceling worker thread %d\n", wtpGetDbgHdr(pThis), i);
@@ -502,15 +442,11 @@ wtpStartWrkr(wtp_t *pThis, int bLockMutex)
ISOBJ_TYPE_assert(pThis, wtp);
- wtpProcessThrdChanges(pThis); // TODO: Performance: this causes a lot of FUTEX calls
-
BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex);
pThis->iCurNumWrkThrd++;
- /* find free spot in thread table. If we find at least one worker that is in initialization,
- * we do NOT start a new one. Let's give the other one a chance, first.
- */
+ /* find free spot in thread table. */
for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
if(wtiGetState(pThis->pWrkr[i], LOCK_MUTEX) == eWRKTHRD_STOPPED) {
break;
diff --git a/runtime/wtp.h b/runtime/wtp.h
index ef61d7e6..03bc538f 100644
--- a/runtime/wtp.h
+++ b/runtime/wtp.h
@@ -30,7 +30,7 @@
/* commands and states for worker threads. */
typedef enum {
eWRKTHRD_STOPPED = 0, /* worker thread is not running (either actually never ran or was shut down) */
- eWRKTHRD_TERMINATING = 1,/* worker thread has shut down, but some finalzing is still needed */
+ //eWRKTHRD_TERMINATING = 1,/* worker thread has shut down, but some finalzing is still needed */
/* ALL active states MUST be numerically higher than eWRKTHRD_TERMINATED and NONE must be lower! */
eWRKTHRD_RUN_CREATED = 2,/* worker thread has been created, but not yet begun initialization (prob. not yet scheduled) */
eWRKTHRD_RUN_INIT = 3, /* worker thread is initializing, but not yet fully running */
diff --git a/tests/daqueue-persist.sh b/tests/daqueue-persist.sh
index e071e413..ff81c987 100755
--- a/tests/daqueue-persist.sh
+++ b/tests/daqueue-persist.sh
@@ -2,6 +2,7 @@
# to carry out multiple tests with different queue modes
# added 2009-05-27 by Rgerhards
# This file is part of the rsyslog project, released under GPLv3
+echo TEST: daqueue-persist.sh
source $srcdir/daqueue-persist-drvr.sh LinkedList
source $srcdir/daqueue-persist-drvr.sh FixedArray
# the disk test should not fail, however, the config is extreme and using