summaryrefslogtreecommitdiffstats
path: root/runtime/wtp.c
diff options
context:
space:
mode:
Diffstat (limited to 'runtime/wtp.c')
-rw-r--r--runtime/wtp.c64
1 files changed, 22 insertions, 42 deletions
diff --git a/runtime/wtp.c b/runtime/wtp.c
index beeaf01c..93e87987 100644
--- a/runtime/wtp.c
+++ b/runtime/wtp.c
@@ -8,7 +8,7 @@
* (and in the web doc set on http://www.rsyslog.com/doc). Be sure to read it
* if you are getting aquainted to the object.
*
- * Copyright 2008 Rainer Gerhards and Adiscon GmbH.
+ * Copyright 2008,2009 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of the rsyslog runtime library.
*
@@ -99,7 +99,6 @@ BEGINobjConstruct(wtp) /* be sure to specify the object type also in END macro!
pThis->pfOnWorkerCancel = NotImplementedDummy;
pThis->pfOnWorkerStartup = NotImplementedDummy;
pThis->pfOnWorkerShutdown = NotImplementedDummy;
-dbgprintf("XXX: wtpConstruct: %d\n", pThis->wtpState);
ENDobjConstruct(wtp)
@@ -285,11 +284,12 @@ wtpCancelAll(wtp_t *pThis)
ISOBJ_TYPE_assert(pThis, wtp);
+ d_pthread_mutex_lock(&pThis->mut);
/* go through all workers and cancel those that are active */
for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
- dbgprintf("%s: try canceling worker thread %d\n", wtpGetDbgHdr(pThis), i);
wtiCancelThrd(pThis->pWrkr[i]);
}
+ d_pthread_mutex_unlock(&pThis->mut);
RETiRet;
}
@@ -320,13 +320,24 @@ wtpSetInactivityGuard(wtp_t *pThis, int bNewState, int bLockMutex)
static void
wtpWrkrExecCancelCleanup(void *arg)
{
- wtp_t *pThis = (wtp_t*) arg;
+ wti_t *pWti = (wti_t*) arg;
+ wtp_t *pThis;
BEGINfunc
+ ISOBJ_TYPE_assert(pWti, wti);
+ pThis = pWti->pWtp;
ISOBJ_TYPE_assert(pThis, wtp);
+
+ // TODO: the mutex_lock is dangerous, if we are cancelled within some function
+ // that already has the mutex locked...
+ d_pthread_mutex_lock(&pThis->mut);
pThis->iCurNumWrkThrd--;
+ wtiSetState(pWti, WRKTHRD_STOPPED);
pthread_cond_signal(&pThis->condThrdTrm); /* activate anyone waiting on thread shutdown */
- dbgprintf("%s: thread CANCELED with %d workers running.\n", wtpGetDbgHdr(pThis), pThis->iCurNumWrkThrd);
+ d_pthread_mutex_unlock(&pThis->mut);
+
+ DBGPRINTF("%s: Worker thread %lx, terminated, num workers now %d\n",
+ wtpGetDbgHdr(pThis), (unsigned long) pWti, pThis->iCurNumWrkThrd);
ENDfunc
}
@@ -341,11 +352,11 @@ wtpWorker(void *arg) /* the arg is actually a wti object, even though we are in
{
uchar *pszDbgHdr;
uchar thrdName[32] = "rs:";
- DEFiRet;
wti_t *pWti = (wti_t*) arg;
wtp_t *pThis;
sigset_t sigSet;
+ BEGINfunc
ISOBJ_TYPE_assert(pWti, wti);
pThis = pWti->pWtp;
ISOBJ_TYPE_assert(pThis, wtp);
@@ -362,36 +373,9 @@ wtpWorker(void *arg) /* the arg is actually a wti object, even though we are in
}
# endif
- d_pthread_mutex_lock(&pThis->mut);
- pthread_cleanup_push(wtpWrkrExecCancelCleanup, pThis);
-
- /* change to RUNNING state. We need to check if we actually should still run,
- * because someone may have requested us to shut down even before we got a chance to do
- * our init. That would be a bad race... -- rgerhards, 2008-01-16
- */
- wtiSetState(pWti, eWRKTHRD_RUNNING, MUTEX_ALREADY_LOCKED); /* we are running now! */
-
- do {
- d_pthread_mutex_unlock(&pThis->mut);
-
- iRet = wtiWorker(pWti); /* just to make sure: this is NOT protected by the mutex! */
-
- d_pthread_mutex_lock(&pThis->mut);
- } while(pThis->iCurNumWrkThrd == 1 && pThis->bInactivityGuard == 1);
- /* inactivity guard prevents shutdown of all workers while one should be running due to race
- * condition. It can lead to one more worker running than desired, but that is acceptable. After
- * all, that worker will shutdown itself due to inactivity timeout. If, however, none were running
- * when one was required, processing could come to a halt. -- rgerhards, 2008-01-21
- */
-
- pthread_cleanup_pop(0);
- pThis->iCurNumWrkThrd--;
- pthread_cond_signal(&pThis->condThrdTrm); /* activate anyone waiting on thread shutdown */
-
- dbgprintf("%s: Worker thread %lx, terminated, num workers now %d\n",
- wtpGetDbgHdr(pThis), (unsigned long) pWti, pThis->iCurNumWrkThrd);
-
- d_pthread_mutex_unlock(&pThis->mut);
+ pthread_cleanup_push(wtpWrkrExecCancelCleanup, pWti);
+ wtiWorker(pWti);
+ pthread_cleanup_pop(1);
ENDfunc
pthread_exit(0);
@@ -418,7 +402,7 @@ wtpStartWrkr(wtp_t *pThis, int bLockMutex)
/* find free spot in thread table. */
for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
- if(wtiGetState(pThis->pWrkr[i], LOCK_MUTEX) == eWRKTHRD_STOPPED) {
+ if(wtiGetState(pThis->pWrkr[i]) == WRKTHRD_STOPPED) {
break;
}
}
@@ -427,7 +411,7 @@ wtpStartWrkr(wtp_t *pThis, int bLockMutex)
ABORT_FINALIZE(RS_RET_NO_MORE_THREADS);
pWti = pThis->pWrkr[i];
- wtiSetState(pWti, eWRKTHRD_RUN_CREATED, LOCK_MUTEX);
+ wtiSetState(pWti, WRKTHRD_RUNNING);
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
iState = pthread_create(&(pWti->thrdID), &attr, wtpWorker, (void*) pWti);
@@ -435,9 +419,6 @@ wtpStartWrkr(wtp_t *pThis, int bLockMutex)
dbgprintf("%s: started with state %d, num workers now %d\n",
wtpGetDbgHdr(pThis), iState, pThis->iCurNumWrkThrd);
- /* indicate we just started a worker and would like to see it running */
- wtpSetInactivityGuard(pThis, 1, MUTEX_ALREADY_LOCKED);
-
finalize_it:
END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
RETiRet;
@@ -479,7 +460,6 @@ wtpAdviseMaxWorkers(wtp_t *pThis, int nMaxWrkr)
}
} else {
if(nMaxWrkr > 0) {
- dbgprintf("wtpAdviseMaxWorkers signals busy\n");
wtpWakeupWrkr(pThis);
}
}