summaryrefslogtreecommitdiffstats
path: root/runtime/wti.c
diff options
context:
space:
mode:
Diffstat (limited to 'runtime/wti.c')
-rw-r--r--runtime/wti.c63
1 files changed, 22 insertions, 41 deletions
diff --git a/runtime/wti.c b/runtime/wti.c
index 544bffa7..abdf4add 100644
--- a/runtime/wti.c
+++ b/runtime/wti.c
@@ -41,7 +41,6 @@
#ifdef OS_SOLARIS
# include <sched.h>
-# define pthread_yield() sched_yield()
#endif
#include "rsyslog.h"
@@ -51,6 +50,7 @@
#include "wti.h"
#include "obj.h"
#include "glbl.h"
+#include "atomic.h"
/* static data */
DEFobjStaticHelpers
@@ -106,6 +106,7 @@ rsRetVal
wtiSetState(wti_t *pThis, qWrkCmd_t tCmd, int bActiveOnly, int bLockMutex)
{
DEFiRet;
+ qWrkCmd_t tCurrCmd;
DEFVARS_mutexProtection;
ISOBJ_TYPE_assert(pThis, wti);
@@ -113,13 +114,14 @@ wtiSetState(wti_t *pThis, qWrkCmd_t tCmd, int bActiveOnly, int bLockMutex)
BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex);
+ tCurrCmd = pThis->tCurrCmd;
/* all worker states must be followed sequentially, only termination can be set in any state */
- if( (bActiveOnly && (pThis->tCurrCmd < eWRKTHRD_RUN_CREATED))
- || (pThis->tCurrCmd > tCmd && !(tCmd == eWRKTHRD_TERMINATING || tCmd == eWRKTHRD_STOPPED))) {
- dbgprintf("%s: command %d can not be accepted in current %d processing state - ignored\n",
- wtiGetDbgHdr(pThis), tCmd, pThis->tCurrCmd);
+ if( (bActiveOnly && (tCurrCmd < eWRKTHRD_RUN_CREATED))
+ || (tCurrCmd > tCmd && !(tCmd == eWRKTHRD_TERMINATING || tCmd == eWRKTHRD_STOPPED))) {
+ DBGPRINTF("%s: command %d can not be accepted in current %d processing state - ignored\n",
+ wtiGetDbgHdr(pThis), tCmd, tCurrCmd);
} else {
- dbgprintf("%s: receiving command %d\n", wtiGetDbgHdr(pThis), tCmd);
+ DBGPRINTF("%s: receiving command %d\n", wtiGetDbgHdr(pThis), tCmd);
/* we could replace this with a simple if, but we leave the switch in in case we need
* to add something at a later stage. -- rgerhards, 2008-09-30
*/
@@ -143,7 +145,12 @@ wtiSetState(wti_t *pThis, qWrkCmd_t tCmd, int bActiveOnly, int bLockMutex)
/* DO NOTHING */
break;
}
- pThis->tCurrCmd = tCmd; /* apply the new state */
+ /* apply the new state */
+ unsigned val = ATOMIC_CAS_VAL(pThis->tCurrCmd, tCurrCmd, tCmd);
+ if(val != tCurrCmd) {
+ DBGPRINTF("wtiSetState PROBLEM, tCurrCmd %d overwritten with %d, wanted to set %d\n", tCurrCmd, val, tCmd);
+ }
+
}
END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
@@ -151,7 +158,7 @@ wtiSetState(wti_t *pThis, qWrkCmd_t tCmd, int bActiveOnly, int bLockMutex)
}
-/* Cancel the thread. If the thread is already cancelled or termination,
+/* Cancel the thread. If the thread is already cancelled or terminated,
* we do not again cancel it. But it is save and legal to call wtiCancelThrd() in
* such situations.
* rgerhards, 2008-02-26
@@ -165,11 +172,13 @@ wtiCancelThrd(wti_t *pThis)
d_pthread_mutex_lock(&pThis->mut);
+ wtiProcessThrdChanges(pThis, MUTEX_ALREADY_LOCKED); /* process state change, so that we have current state vars */
+
if(pThis->tCurrCmd >= eWRKTHRD_TERMINATING) {
- dbgoprint((obj_t*) pThis, "canceling worker thread\n");
+ dbgoprint((obj_t*) pThis, "canceling worker thread, curr stat %d\n", pThis->tCurrCmd);
pthread_cancel(pThis->thrdID);
wtiSetState(pThis, eWRKTHRD_TERMINATING, 0, MUTEX_ALREADY_LOCKED);
- pThis->pWtp->bThrdStateChanged = 1; /* indicate change, so harverster will be called */
+ ATOMIC_STORE_1_TO_INT(pThis->pWtp->bThrdStateChanged); /* indicate change, so harverster will be called */
}
d_pthread_mutex_unlock(&pThis->mut);
@@ -201,15 +210,13 @@ CODESTARTobjDestruct(wti)
pthread_cond_destroy(&pThis->condExitDone);
pthread_mutex_destroy(&pThis->mut);
- if(pThis->pszDbgHdr != NULL)
- free(pThis->pszDbgHdr);
+ free(pThis->pszDbgHdr);
ENDobjDestruct(wti)
/* Standard-Constructor for the wti object
*/
BEGINobjConstruct(wti) /* be sure to specify the object type also in END macro! */
- pThis->bOptimizeUniProc = glbl.GetOptimizeUniProc();
pthread_cond_init(&pThis->condExitDone, NULL);
pthread_mutex_init(&pThis->mut, NULL);
ENDobjConstruct(wti)
@@ -320,7 +327,7 @@ wtiWorkerCancelCleanup(void *arg)
d_pthread_mutex_lock(&pWtp->mut);
wtiSetState(pThis, eWRKTHRD_TERMINATING, 0, MUTEX_ALREADY_LOCKED);
/* TODO: sync access? I currently think it is NOT needed -- rgerhards, 2008-01-28 */
- pWtp->bThrdStateChanged = 1; /* indicate change, so harverster will be called */
+ ATOMIC_STORE_1_TO_INT(pWtp->bThrdStateChanged); /* indicate change, so harverster will be called */
d_pthread_mutex_unlock(&pWtp->mut);
pthread_setcancelstate(iCancelStateSave, NULL);
@@ -329,27 +336,6 @@ wtiWorkerCancelCleanup(void *arg)
/* generic worker thread framework
- *
- * Some special comments below, so that they do not clutter the main function code:
- *
- * On the use of pthread_testcancel():
- * Now make sure we can get canceled - it is not specified if pthread_setcancelstate() is
- * a cancellation point in itself. As we run most of the time without cancel enabled, I fear
- * we may never get cancelled if we do not create a cancellation point ourselfs.
- *
- * On the use of pthread_yield():
- * We yield to give the other threads a chance to obtain the mutex. If we do not
- * do that, this thread may very well aquire the mutex again before another thread
- * has even a chance to run. The reason is that mutex operations are free to be
- * implemented in the quickest possible way (and they typically are!). That is, the
- * mutex lock/unlock most probably just does an atomic memory swap and does not necessarily
- * schedule other threads waiting on the same mutex. That can lead to the same thread
- * aquiring the mutex ever and ever again while all others are starving for it. We
- * have exactly seen this behaviour when we deliberately introduced a long-running
- * test action which basically did a sleep. I understand that with real actions the
- * likelihood of this starvation condition is very low - but it could still happen
- * and would be very hard to debug. The yield() is a sure fix, its performance overhead
- * should be well accepted given the above facts. -- rgerhards, 2008-01-10
*/
#pragma GCC diagnostic ignored "-Wempty-body"
rsRetVal
@@ -377,11 +363,6 @@ wtiWorker(wti_t *pThis)
while(1) { /* loop will be broken below - need to do mutex locks */
/* process any pending thread requests */
wtpProcessThrdChanges(pWtp);
- pthread_testcancel(); /* see big comment in function header */
-# if !defined(__hpux) /* pthread_yield is missing there! */
- if(pThis->bOptimizeUniProc)
- pthread_yield(); /* see big comment in function header */
-# endif
/* if we have a rate-limiter set for this worker pool, let's call it. Please
* keep in mind that the rate-limiter may hold us for an extended period
@@ -433,7 +414,7 @@ wtiWorker(wti_t *pThis)
pWtp->pfOnWorkerShutdown(pWtp->pUsr);
wtiSetState(pThis, eWRKTHRD_TERMINATING, 0, MUTEX_ALREADY_LOCKED);
- pWtp->bThrdStateChanged = 1; /* indicate change, so harverster will be called */
+ ATOMIC_STORE_1_TO_INT(pWtp->bThrdStateChanged); /* indicate change, so harverster will be called */
d_pthread_mutex_unlock(&pThis->mut);
pthread_setcancelstate(iCancelStateSave, NULL);