summaryrefslogtreecommitdiffstats
path: root/runtime/wtp.c
diff options
context:
space:
mode:
Diffstat (limited to 'runtime/wtp.c')
-rw-r--r--runtime/wtp.c444
1 files changed, 152 insertions, 292 deletions
diff --git a/runtime/wtp.c b/runtime/wtp.c
index b4fd2e04..e615fb19 100644
--- a/runtime/wtp.c
+++ b/runtime/wtp.c
@@ -8,7 +8,7 @@
* (and in the web doc set on http://www.rsyslog.com/doc). Be sure to read it
* if you are getting aquainted to the object.
*
- * Copyright 2008 Rainer Gerhards and Adiscon GmbH.
+ * Copyright 2008,2009 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of the rsyslog runtime library.
*
@@ -44,9 +44,10 @@
# include <sys/prctl.h>
#endif
-#ifdef OS_SOLARIS
-# include <sched.h>
-#endif
+/// TODO: check on solaris if this is any longer needed - I don't think so - rgerhards, 2009-09-20
+//#ifdef OS_SOLARIS
+//# include <sched.h>
+//#endif
#include "rsyslog.h"
#include "stringbuf.h"
@@ -82,22 +83,27 @@ wtpGetDbgHdr(wtp_t *pThis)
/* Not implemented dummy function for constructor */
-static rsRetVal NotImplementedDummy() { return RS_RET_OK; }
+static rsRetVal NotImplementedDummy() { return RS_RET_NOT_IMPLEMENTED; }
/* Standard-Constructor for the wtp object
*/
BEGINobjConstruct(wtp) /* be sure to specify the object type also in END macro! */
- pthread_mutex_init(&pThis->mut, NULL);
- pthread_mutex_init(&pThis->mutThrdShutdwn, NULL);
+ pthread_mutex_init(&pThis->mutWtp, NULL);
pthread_cond_init(&pThis->condThrdTrm, NULL);
+ pthread_attr_init(&pThis->attrThrd);
+ /* Set thread scheduling policy to default */
+#ifdef HAVE_PTHREAD_SETSCHEDPARAM
+ pthread_attr_setschedpolicy(&pThis->attrThrd, default_thr_sched_policy);
+ pthread_attr_setschedparam(&pThis->attrThrd, &default_sched_param);
+ pthread_attr_setinheritsched(&pThis->attrThrd, PTHREAD_EXPLICIT_SCHED);
+#endif
+ pthread_attr_setdetachstate(&pThis->attrThrd, PTHREAD_CREATE_DETACHED);
/* set all function pointers to "not implemented" dummy so that we can safely call them */
pThis->pfChkStopWrkr = NotImplementedDummy;
- pThis->pfIsIdle = NotImplementedDummy;
+ pThis->pfGetDeqBatchSize = NotImplementedDummy;
pThis->pfDoWork = NotImplementedDummy;
- pThis->pfOnIdle = NotImplementedDummy;
- pThis->pfOnWorkerCancel = NotImplementedDummy;
- pThis->pfOnWorkerStartup = NotImplementedDummy;
- pThis->pfOnWorkerShutdown = NotImplementedDummy;
- INIT_ATOMIC_HELPER_MUT(pThis->mutThrdStateChanged);
+ pThis->pfObjProcessed = NotImplementedDummy;
+ INIT_ATOMIC_HELPER_MUT(pThis->mutCurNumWrkThrd);
+ INIT_ATOMIC_HELPER_MUT(pThis->mutWtpState);
ENDobjConstruct(wtp)
@@ -115,13 +121,12 @@ wtpConstructFinalize(wtp_t *pThis)
ISOBJ_TYPE_assert(pThis, wtp);
- dbgprintf("%s: finalizing construction of worker thread pool\n", wtpGetDbgHdr(pThis));
+ DBGPRINTF("%s: finalizing construction of worker thread pool\n", wtpGetDbgHdr(pThis));
/* alloc and construct workers - this can only be done in finalizer as we previously do
* not know the max number of workers
*/
- if((pThis->pWrkr = malloc(sizeof(wti_t*) * pThis->iNumWorkerThreads)) == NULL)
- ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
-
+ CHKmalloc(pThis->pWrkr = MALLOC(sizeof(wti_t*) * pThis->iNumWorkerThreads));
+
for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
CHKiRet(wtiConstruct(&pThis->pWrkr[i]));
pWti = pThis->pWrkr[i];
@@ -141,8 +146,6 @@ finalize_it:
BEGINobjDestruct(wtp) /* be sure to specify the object type also in END and CODESTART macros! */
int i;
CODESTARTobjDestruct(wtp)
- wtpProcessThrdChanges(pThis); /* process thread changes one last time */
-
/* destruct workers */
for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i)
wtiDestruct(&pThis->pWrkr[i]);
@@ -152,149 +155,70 @@ CODESTARTobjDestruct(wtp)
/* actual destruction */
pthread_cond_destroy(&pThis->condThrdTrm);
- pthread_mutex_destroy(&pThis->mut);
- pthread_mutex_destroy(&pThis->mutThrdShutdwn);
- DESTROY_ATOMIC_HELPER_MUT(pThis->mutThrdStateChanged);
+ pthread_mutex_destroy(&pThis->mutWtp);
+ pthread_attr_destroy(&pThis->attrThrd);
+ DESTROY_ATOMIC_HELPER_MUT(pThis->mutCurNumWrkThrd);
+ DESTROY_ATOMIC_HELPER_MUT(pThis->mutWtpState);
free(pThis->pszDbgHdr);
ENDobjDestruct(wtp)
-/* wake up at least one worker thread.
- * rgerhards, 2008-01-20
- */
-rsRetVal
-wtpWakeupWrkr(wtp_t *pThis)
-{
- DEFiRet;
-
- /* TODO; mutex? I think not needed, as we do not need predictable exec order -- rgerhards, 2008-01-28 */
- ISOBJ_TYPE_assert(pThis, wtp);
- pthread_cond_signal(pThis->pcondBusy);
- RETiRet;
-}
-
-/* wake up all worker threads.
- * rgerhards, 2008-01-16
- */
-rsRetVal
-wtpWakeupAllWrkr(wtp_t *pThis)
-{
- DEFiRet;
-
- ISOBJ_TYPE_assert(pThis, wtp);
- pthread_cond_broadcast(pThis->pcondBusy);
- RETiRet;
-}
-
-
-/* set the bThrdStateChanged in an atomic way. Note that
- * val may only be 0 or 1.
- */
-void
-wtpSetThrdStateChanged(wtp_t *pThis, int val)
-{
- if(val == 0) {
- ATOMIC_STORE_0_TO_INT(&pThis->bThrdStateChanged, pThis->mutThrdStateChanged);
- } else {
- ATOMIC_STORE_1_TO_INT(&pThis->bThrdStateChanged, pThis->mutThrdStateChanged);
- }
-}
-
-
-/* check if we had any worker thread changes and, if so, act
- * on them. At a minimum, terminated threads are harvested (joined).
- * This function MUST NEVER block on the queue mutex!
- */
-rsRetVal
-wtpProcessThrdChanges(wtp_t *pThis)
-{
- DEFiRet;
- int i;
-
- ISOBJ_TYPE_assert(pThis, wtp);
-
- if(pThis->bThrdStateChanged == 0)
- FINALIZE;
-
- if(d_pthread_mutex_trylock(&(pThis->mutThrdShutdwn)) != 0) {
- /* another thread is already in the loop */
- FINALIZE;
- }
-
- /* Note: there is a left-over potential race condition below:
- * pThis->bThrdStateChanged may be re-set by another thread while
- * we work on it and thus the loop may terminate too early. However,
- * there are no really bad effects from that so I perfer - for this
- * version - to live with the problem as is. Not a good idea to
- * introduce that large change into the stable branch without very
- * good reason. -- rgerhards, 2009-04-02
- */
- do {
- /* reset the change marker */
- wtpSetThrdStateChanged(pThis, 0);
- /* go through all threads */
- for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
- wtiProcessThrdChanges(pThis->pWrkr[i], LOCK_MUTEX);
- }
- /* restart if another change occured while we were processing the changes */
- } while(pThis->bThrdStateChanged != 0);
-
- d_pthread_mutex_unlock(&(pThis->mutThrdShutdwn));
-
-finalize_it:
- RETiRet;
-}
-
-
-/* Sent a specific state for the worker thread pool.
- * rgerhards, 2008-01-21
+/* Sent a specific state for the worker thread pool. -- rgerhards, 2008-01-21
+ * We do not need to do atomic instructions as set operations are only
+ * called when terminating the pool, and then in strict sequence. So we
+ * can never overwrite each other. On the other hand, it also doesn't
+ * matter if the read operation obtains an older value, as we then simply
+ * do one more iteration, what is perfectly legal (during shutdown
+ * they are awoken in any case). -- rgerhards, 2009-07-20
*/
rsRetVal
wtpSetState(wtp_t *pThis, wtpState_t iNewState)
{
- DEFiRet;
-
ISOBJ_TYPE_assert(pThis, wtp);
- pThis->wtpState = iNewState;
- /* TODO: must wakeup workers? seen to be not needed -- rgerhards, 2008-01-28 */
-
- RETiRet;
+ pThis->wtpState = iNewState; // TODO: do we need a mutex here? 2010-04-26
+ return RS_RET_OK;
}
/* check if the worker shall shutdown (1 = yes, 0 = no)
- * TODO: check if we can use atomic operations to enhance performance
* Note: there may be two mutexes locked, the bLockUsrMutex is the one in our "user"
* (e.g. the queue clas)
* rgerhards, 2008-01-21
*/
rsRetVal
-wtpChkStopWrkr(wtp_t *pThis, int bLockMutex, int bLockUsrMutex)
+wtpChkStopWrkr(wtp_t *pThis, int bLockUsrMutex)
{
DEFiRet;
- DEFVARS_mutexProtection;
+ wtpState_t wtpState;
ISOBJ_TYPE_assert(pThis, wtp);
+ /* we need a consistent value, but it doesn't really matter if it is changed
+ * right after the fetch - then we simply do one more iteration in the worker
+ */
+ wtpState = (wtpState_t) ATOMIC_FETCH_32BIT((int*)&pThis->wtpState, &pThis->mutWtpState);
- BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex);
- if( (pThis->wtpState == wtpState_SHUTDOWN_IMMEDIATE)
- || ((pThis->wtpState == wtpState_SHUTDOWN) && pThis->pfIsIdle(pThis->pUsr, bLockUsrMutex)))
- iRet = RS_RET_TERMINATE_NOW;
- END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
+ if(wtpState == wtpState_SHUTDOWN_IMMEDIATE) {
+ ABORT_FINALIZE(RS_RET_TERMINATE_NOW);
+ } else if(wtpState == wtpState_SHUTDOWN) {
+ ABORT_FINALIZE(RS_RET_TERMINATE_WHEN_IDLE);
+ }
/* try customer handler if one was set and we do not yet have a definite result */
- if(iRet == RS_RET_OK && pThis->pfChkStopWrkr != NULL) {
+ if(pThis->pfChkStopWrkr != NULL) {
iRet = pThis->pfChkStopWrkr(pThis->pUsr, bLockUsrMutex);
}
+finalize_it:
RETiRet;
}
#pragma GCC diagnostic ignored "-Wempty-body"
/* Send a shutdown command to all workers and see if they terminate.
- * A timeout may be specified.
+ * A timeout may be specified. This function may also be called with
+ * the current number of workers being 0, in which case it does not
+ * shut down any worker.
* rgerhards, 2008-01-14
*/
rsRetVal
@@ -302,72 +226,50 @@ wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout
{
DEFiRet;
int bTimedOut;
- int iCancelStateSave;
+ int i;
ISOBJ_TYPE_assert(pThis, wtp);
+ /* lock mutex to prevent races (may otherwise happen during idle processing and such...) */
+ d_pthread_mutex_lock(pThis->pmutUsr);
wtpSetState(pThis, tShutdownCmd);
- wtpWakeupAllWrkr(pThis);
+ pthread_cond_broadcast(pThis->pcondBusy); /* wake up all workers */
+ /* awake workers in retry loop */
+ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
+ wtiWakeupThrd(pThis->pWrkr[i]);
+ }
+ d_pthread_mutex_unlock(pThis->pmutUsr);
- /* see if we need to harvest (join) any terminated threads (even in timeout case,
- * some may have terminated...
- */
- wtpProcessThrdChanges(pThis);
-
- /* and wait for their termination */
- pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
- d_pthread_mutex_lock(&pThis->mut);
- pthread_cleanup_push(mutexCancelCleanup, &pThis->mut);
- pthread_setcancelstate(iCancelStateSave, NULL);
+ /* wait for worker thread termination */
+ d_pthread_mutex_lock(&pThis->mutWtp);
+ pthread_cleanup_push(mutexCancelCleanup, &pThis->mutWtp);
bTimedOut = 0;
while(pThis->iCurNumWrkThrd > 0 && !bTimedOut) {
- dbgprintf("%s: waiting %ldms on worker thread termination, %d still running\n",
- wtpGetDbgHdr(pThis), timeoutVal(ptTimeout), pThis->iCurNumWrkThrd);
+ DBGPRINTF("%s: waiting %ldms on worker thread termination, %d still running\n",
+ wtpGetDbgHdr(pThis), timeoutVal(ptTimeout),
+ ATOMIC_FETCH_32BIT(&pThis->iCurNumWrkThrd, &pThis->mutCurNumWrkThrd));
- if(d_pthread_cond_timedwait(&pThis->condThrdTrm, &pThis->mut, ptTimeout) != 0) {
- dbgprintf("%s: timeout waiting on worker thread termination\n", wtpGetDbgHdr(pThis));
+ if(d_pthread_cond_timedwait(&pThis->condThrdTrm, &pThis->mutWtp, ptTimeout) != 0) {
+ DBGPRINTF("%s: timeout waiting on worker thread termination\n", wtpGetDbgHdr(pThis));
bTimedOut = 1; /* we exit the loop on timeout */
}
+
+ /* awake workers in retry loop */
+ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
+ wtiWakeupThrd(pThis->pWrkr[i]);
+ }
+
}
pthread_cleanup_pop(1);
if(bTimedOut)
iRet = RS_RET_TIMED_OUT;
- /* see if we need to harvest (join) any terminated threads (even in timeout case,
- * some may have terminated...
- */
- wtpProcessThrdChanges(pThis);
-
RETiRet;
}
#pragma GCC diagnostic warning "-Wempty-body"
-/* indicate that a thread has terminated and awake anyone waiting on it
- * rgerhards, 2008-01-23
- */
-rsRetVal wtpSignalWrkrTermination(wtp_t *pThis)
-{
- DEFiRet;
- /* I leave the mutex code here out as it gives us deadlocks. I think it is not really
- * needed and we are on the safe side. I leave this comment in if practice proves us
- * wrong. The whole thing should be removed after half a year or year if we see there
- * actually is no issue (or revisit it from a theoretical POV).
- * rgerhards, 2008-01-28
- * revisited 2008-09-30, still a bit unclear, leave in
- */
- /*TODO: mutex or not mutex, that's the question ;)DEFVARS_mutexProtection;*/
-
- ISOBJ_TYPE_assert(pThis, wtp);
-
- /*BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, LOCK_MUTEX);*/
- pthread_cond_signal(&pThis->condThrdTrm); /* activate anyone waiting on thread shutdown */
- /*END_MTX_PROTECTED_OPERATIONS(&pThis->mut);*/
- RETiRet;
-}
-
-
/* Unconditionally cancel all running worker threads.
* rgerhards, 2008-01-14
*/
@@ -379,12 +281,8 @@ wtpCancelAll(wtp_t *pThis)
ISOBJ_TYPE_assert(pThis, wtp);
- /* process any pending thread requests so that we know who actually is still running */
- wtpProcessThrdChanges(pThis);
-
/* go through all workers and cancel those that are active */
for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
- dbgprintf("%s: try canceling worker thread %d\n", wtpGetDbgHdr(pThis), i);
wtiCancelThrd(pThis->pWrkr[i]);
}
@@ -392,40 +290,57 @@ wtpCancelAll(wtp_t *pThis)
}
-
-/* Set the Inactivity Guard
- * rgerhards, 2008-01-21
+/* this function contains shared code for both regular worker shutdown as
+ * well as shutdown via cancellation. We can not simply use pthread_cleanup_pop(1)
+ * as this introduces a race in the debug system (RETiRet system).
+ * rgerhards, 2009-10-26
*/
-rsRetVal
-wtpSetInactivityGuard(wtp_t *pThis, int bNewState, int bLockMutex)
+static inline void
+wtpWrkrExecCleanup(wti_t *pWti)
{
- DEFiRet;
- DEFVARS_mutexProtection;
+ wtp_t *pThis;
- BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex);
- pThis->bInactivityGuard = bNewState;
- END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
+ BEGINfunc
+ ISOBJ_TYPE_assert(pWti, wti);
+ pThis = pWti->pWtp;
+ ISOBJ_TYPE_assert(pThis, wtp);
- RETiRet;
+ /* the order of the next two statements is important! */
+ wtiSetState(pWti, WRKTHRD_STOPPED);
+ ATOMIC_DEC(&pThis->iCurNumWrkThrd, &pThis->mutCurNumWrkThrd);
+
+ DBGPRINTF("%s: Worker thread %lx, terminated, um workers now %d\n",
+ wtpGetDbgHdr(pThis), (unsigned long) pWti,
+ ATOMIC_FETCH_32BIT(&pThis->iCurNumWrkThrd, &pThis->mutCurNumWrkThrd));
+
+ ENDfunc
}
-/* cancellation cleanup handler for executing worker
- * decrements the worker counter
- * rgerhards, 2008-01-20
+/* cancellation cleanup handler for executing worker decrements the worker counter.
+ * rgerhards, 2009-07-20
*/
-void
+static void
wtpWrkrExecCancelCleanup(void *arg)
{
- wtp_t *pThis = (wtp_t*) arg;
+ wti_t *pWti = (wti_t*) arg;
+ wtp_t *pThis;
BEGINfunc
+ ISOBJ_TYPE_assert(pWti, wti);
+ pThis = pWti->pWtp;
ISOBJ_TYPE_assert(pThis, wtp);
- pThis->iCurNumWrkThrd--;
- wtpSignalWrkrTermination(pThis);
+ DBGPRINTF("%s: Worker thread %lx requested to be cancelled.\n",
+ wtpGetDbgHdr(pThis), (unsigned long) pWti);
+
+ wtpWrkrExecCleanup(pWti);
- dbgprintf("%s: thread CANCELED with %d workers running.\n", wtpGetDbgHdr(pThis), pThis->iCurNumWrkThrd);
ENDfunc
+ /* NOTE: we must call ENDfunc FIRST, because otherwise the schedule may activate the main
+ * thread after the broadcast, which could destroy the debug class, resulting in a potential
+ * segfault. So we need to do the broadcast as actually the last action in our processing
+ */
+ pthread_cond_broadcast(&pThis->condThrdTrm); /* activate anyone waiting on thread shutdown */
}
@@ -437,8 +352,6 @@ wtpWrkrExecCancelCleanup(void *arg)
static void *
wtpWorker(void *arg) /* the arg is actually a wti object, even though we are in wtp! */
{
- DEFiRet;
- DEFVARS_mutexProtection;
wti_t *pWti = (wti_t*) arg;
wtp_t *pThis;
sigset_t sigSet;
@@ -447,13 +360,20 @@ wtpWorker(void *arg) /* the arg is actually a wti object, even though we are in
uchar thrdName[32] = "rs:";
# endif
+ BEGINfunc
ISOBJ_TYPE_assert(pWti, wti);
pThis = pWti->pWtp;
ISOBJ_TYPE_assert(pThis, wtp);
+ /* block all signals */
sigfillset(&sigSet);
pthread_sigmask(SIG_BLOCK, &sigSet, NULL);
+ /* but ignore SIGTTN, which we (ab)use to signal the thread to shutdown -- rgerhards, 2009-07-20 */
+ sigemptyset(&sigSet);
+ sigaddset(&sigSet, SIGTTIN);
+ pthread_sigmask(SIG_UNBLOCK, &sigSet, NULL);
+
# if HAVE_PRCTL && defined PR_SET_NAME
/* set thread name - we ignore if the call fails, has no harsh consequences... */
pszDbgHdr = wtpGetDbgHdr(pThis);
@@ -463,41 +383,17 @@ wtpWorker(void *arg) /* the arg is actually a wti object, even though we are in
}
# endif
- BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, LOCK_MUTEX);
-
- /* do some late initialization */
-
- pthread_cleanup_push(wtpWrkrExecCancelCleanup, pThis);
-
- /* finally change to RUNNING state. We need to check if we actually should still run,
- * because someone may have requested us to shut down even before we got a chance to do
- * our init. That would be a bad race... -- rgerhards, 2008-01-16
- */
- wtiSetState(pWti, eWRKTHRD_RUNNING, 0, MUTEX_ALREADY_LOCKED); /* we are running now! */
-
- do {
- END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
-
- wtiWorker(pWti); /* just to make sure: this is NOT protected by the mutex! */
-
- BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, LOCK_MUTEX);
- } while(pThis->iCurNumWrkThrd == 1 && pThis->bInactivityGuard == 1);
- /* inactivity guard prevents shutdown of all workers while one should be running due to race
- * condition. It can lead to one more worker running than desired, but that is acceptable. After
- * all, that worker will shutdown itself due to inactivity timeout. If, however, none were running
- * when one was required, processing could come to a halt. -- rgerhards, 2008-01-21
- */
-
+ pthread_cleanup_push(wtpWrkrExecCancelCleanup, pWti);
+ wtiWorker(pWti);
pthread_cleanup_pop(0);
- pThis->iCurNumWrkThrd--;
- wtpSignalWrkrTermination(pThis);
-
- dbgprintf("%s: Worker thread %lx, terminated, num workers now %d\n",
- wtpGetDbgHdr(pThis), (unsigned long) pWti, pThis->iCurNumWrkThrd);
-
- END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
+ wtpWrkrExecCleanup(pWti);
ENDfunc
+ /* NOTE: we must call ENDfunc FIRST, because otherwise the schedule may activate the main
+ * thread after the broadcast, which could destroy the debug class, resulting in a potential
+ * segfault. So we need to do the broadcast as actually the last action in our processing
+ */
+ pthread_cond_broadcast(&pThis->condThrdTrm); /* activate anyone waiting on thread shutdown */
pthread_exit(0);
}
#pragma GCC diagnostic warning "-Wempty-body"
@@ -505,27 +401,20 @@ wtpWorker(void *arg) /* the arg is actually a wti object, even though we are in
/* start a new worker */
static rsRetVal
-wtpStartWrkr(wtp_t *pThis, int bLockMutex)
+wtpStartWrkr(wtp_t *pThis)
{
- DEFiRet;
- DEFVARS_mutexProtection;
wti_t *pWti;
int i;
int iState;
+ DEFiRet;
ISOBJ_TYPE_assert(pThis, wtp);
- wtpProcessThrdChanges(pThis); // TODO: Performance: this causes a lot of FUTEX calls
+ d_pthread_mutex_lock(&pThis->mutWtp);
- BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex);
-
- pThis->iCurNumWrkThrd++;
-
- /* find free spot in thread table. If we find at least one worker that is in initialization,
- * we do NOT start a new one. Let's give the other one a chance, first.
- */
+ /* find free spot in thread table. */
for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
- if(wtiGetState(pThis->pWrkr[i], LOCK_MUTEX) == eWRKTHRD_STOPPED) {
+ if(wtiGetState(pThis->pWrkr[i]) == WRKTHRD_STOPPED) {
break;
}
}
@@ -533,17 +422,21 @@ wtpStartWrkr(wtp_t *pThis, int bLockMutex)
if(i == pThis->iNumWorkerThreads)
ABORT_FINALIZE(RS_RET_NO_MORE_THREADS);
+ if(i == 0 || pThis->toWrkShutdown == -1) {
+ wtiSetAlwaysRunning(pThis->pWrkr[i]);
+ }
+
pWti = pThis->pWrkr[i];
- wtiSetState(pWti, eWRKTHRD_RUN_CREATED, 0, LOCK_MUTEX);
- iState = pthread_create(&(pWti->thrdID), NULL, wtpWorker, (void*) pWti);
- dbgprintf("%s: started with state %d, num workers now %d\n",
- wtpGetDbgHdr(pThis), iState, pThis->iCurNumWrkThrd);
+ wtiSetState(pWti, WRKTHRD_RUNNING);
+ iState = pthread_create(&(pWti->thrdID), &pThis->attrThrd, wtpWorker, (void*) pWti);
+ ATOMIC_INC(&pThis->iCurNumWrkThrd, &pThis->mutCurNumWrkThrd); /* we got one more! */
- /* indicate we just started a worker and would like to see it running */
- wtpSetInactivityGuard(pThis, 1, MUTEX_ALREADY_LOCKED);
+ DBGPRINTF("%s: started with state %d, num workers now %d\n",
+ wtpGetDbgHdr(pThis), iState,
+ ATOMIC_FETCH_32BIT(&pThis->iCurNumWrkThrd, &pThis->mutCurNumWrkThrd));
finalize_it:
- END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
+ d_pthread_mutex_unlock(&pThis->mutWtp);
RETiRet;
}
@@ -560,7 +453,6 @@ rsRetVal
wtpAdviseMaxWorkers(wtp_t *pThis, int nMaxWrkr)
{
DEFiRet;
- DEFVARS_mutexProtection;
int nMissing; /* number workers missing to run */
int i;
@@ -569,29 +461,24 @@ wtpAdviseMaxWorkers(wtp_t *pThis, int nMaxWrkr)
if(nMaxWrkr == 0)
FINALIZE;
- BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, LOCK_MUTEX);
-
if(nMaxWrkr > pThis->iNumWorkerThreads) /* limit to configured maximum */
nMaxWrkr = pThis->iNumWorkerThreads;
- nMissing = nMaxWrkr - pThis->iCurNumWrkThrd;
+ nMissing = nMaxWrkr - ATOMIC_FETCH_32BIT(&pThis->iCurNumWrkThrd, &pThis->mutCurNumWrkThrd);
if(nMissing > 0) {
- dbgprintf("%s: high activity - starting %d additional worker thread(s).\n", wtpGetDbgHdr(pThis), nMissing);
+ DBGPRINTF("%s: high activity - starting %d additional worker thread(s).\n",
+ wtpGetDbgHdr(pThis), nMissing);
/* start the rqtd nbr of workers */
for(i = 0 ; i < nMissing ; ++i) {
- CHKiRet(wtpStartWrkr(pThis, MUTEX_ALREADY_LOCKED));
- }
- } else {
- if(nMaxWrkr > 0) {
- dbgprintf("wtpAdviseMaxWorkers signals busy\n");
- wtpWakeupWrkr(pThis);
+ CHKiRet(wtpStartWrkr(pThis));
}
+ } else {
+ pthread_cond_signal(pThis->pcondBusy);
}
finalize_it:
- END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
RETiRet;
}
@@ -605,35 +492,9 @@ DEFpropSetMethPTR(wtp, pmutUsr, pthread_mutex_t)
DEFpropSetMethPTR(wtp, pcondBusy, pthread_cond_t)
DEFpropSetMethFP(wtp, pfChkStopWrkr, rsRetVal(*pVal)(void*, int))
DEFpropSetMethFP(wtp, pfRateLimiter, rsRetVal(*pVal)(void*))
-DEFpropSetMethFP(wtp, pfIsIdle, rsRetVal(*pVal)(void*, int))
-DEFpropSetMethFP(wtp, pfDoWork, rsRetVal(*pVal)(void*, void*, int))
-DEFpropSetMethFP(wtp, pfOnIdle, rsRetVal(*pVal)(void*, int))
-DEFpropSetMethFP(wtp, pfOnWorkerCancel, rsRetVal(*pVal)(void*, void*))
-DEFpropSetMethFP(wtp, pfOnWorkerStartup, rsRetVal(*pVal)(void*))
-DEFpropSetMethFP(wtp, pfOnWorkerShutdown, rsRetVal(*pVal)(void*))
-
-
-/* return the current number of worker threads.
- * TODO: atomic operation would bring a nice performance
- * enhancemcent
- * rgerhards, 2008-01-27
- */
-int
-wtpGetCurNumWrkr(wtp_t *pThis, int bLockMutex)
-{
- DEFVARS_mutexProtection;
- int iNumWrkr;
-
- BEGINfunc
- ISOBJ_TYPE_assert(pThis, wtp);
-
- BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex);
- iNumWrkr = pThis->iCurNumWrkThrd;
- END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
-
- ENDfunc
- return iNumWrkr;
-}
+DEFpropSetMethFP(wtp, pfGetDeqBatchSize, rsRetVal(*pVal)(void*, int*))
+DEFpropSetMethFP(wtp, pfDoWork, rsRetVal(*pVal)(void*, void*))
+DEFpropSetMethFP(wtp, pfObjProcessed, rsRetVal(*pVal)(void*, wti_t*))
/* set the debug header message
@@ -657,7 +518,7 @@ wtpSetDbgHdr(wtp_t *pThis, uchar *pszMsg, size_t lenMsg)
pThis->pszDbgHdr = NULL;
}
- if((pThis->pszDbgHdr = malloc(sizeof(uchar) * lenMsg + 1)) == NULL)
+ if((pThis->pszDbgHdr = MALLOC(sizeof(uchar) * lenMsg + 1)) == NULL)
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
memcpy(pThis->pszDbgHdr, pszMsg, lenMsg + 1); /* always think about the \0! */
@@ -687,6 +548,5 @@ BEGINObjClassInit(wtp, 1, OBJ_IS_CORE_MODULE)
CHKiRet(objUse(glbl, CORE_COMPONENT));
ENDObjClassInit(wtp)
-/*
- * vi:set ai:
+/* vi:set ai:
*/