summaryrefslogtreecommitdiffstats
path: root/runtime/wtp.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2009-07-20 11:52:05 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2009-07-20 11:52:05 +0200
commit4ec7b9d9ec12d91dde3d030bdaf87cfdd6b5d81d (patch)
treea08d9a3b839f40bb5d26d06f5b2b54787277faf9 /runtime/wtp.c
parentef70e6174d4b373a601b73757ca19bb0f7dd6502 (diff)
downloadrsyslog-4ec7b9d9ec12d91dde3d030bdaf87cfdd6b5d81d.tar.gz
rsyslog-4ec7b9d9ec12d91dde3d030bdaf87cfdd6b5d81d.tar.xz
rsyslog-4ec7b9d9ec12d91dde3d030bdaf87cfdd6b5d81d.zip
enhanced worker thread pool by atomic ops
... greater performance and was able to remove a potential troublespot in a cancel cleanup handler.
Diffstat (limited to 'runtime/wtp.c')
-rw-r--r--runtime/wtp.c41
1 files changed, 15 insertions, 26 deletions
diff --git a/runtime/wtp.c b/runtime/wtp.c
index e8bc5120..23726802 100644
--- a/runtime/wtp.c
+++ b/runtime/wtp.c
@@ -116,7 +116,7 @@ wtpConstructFinalize(wtp_t *pThis)
ISOBJ_TYPE_assert(pThis, wtp);
- dbgprintf("%s: finalizing construction of worker thread pool\n", wtpGetDbgHdr(pThis));
+ DBGPRINTF("%s: finalizing construction of worker thread pool\n", wtpGetDbgHdr(pThis));
/* alloc and construct workers - this can only be done in finalizer as we previously do
* not know the max number of workers
*/
@@ -246,11 +246,11 @@ wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout
pthread_cleanup_push(mutexCancelCleanup, &pThis->mutWtp);
bTimedOut = 0;
while(pThis->iCurNumWrkThrd > 0 && !bTimedOut) {
- dbgprintf("%s: waiting %ldms on worker thread termination, %d still running\n",
- wtpGetDbgHdr(pThis), timeoutVal(ptTimeout), pThis->iCurNumWrkThrd);
+ DBGPRINTF("%s: waiting %ldms on worker thread termination, %d still running\n",
+ wtpGetDbgHdr(pThis), timeoutVal(ptTimeout), ATOMIC_FETCH_32BIT(pThis->iCurNumWrkThrd));
if(d_pthread_cond_timedwait(&pThis->condThrdTrm, &pThis->mutWtp, ptTimeout) != 0) {
- dbgprintf("%s: timeout waiting on worker thread termination\n", wtpGetDbgHdr(pThis));
+ DBGPRINTF("%s: timeout waiting on worker thread termination\n", wtpGetDbgHdr(pThis));
bTimedOut = 1; /* we exit the loop on timeout */
}
}
@@ -275,12 +275,10 @@ wtpCancelAll(wtp_t *pThis)
ISOBJ_TYPE_assert(pThis, wtp);
- d_pthread_mutex_lock(&pThis->mutWtp);
/* go through all workers and cancel those that are active */
for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
wtiCancelThrd(pThis->pWrkr[i]);
}
- d_pthread_mutex_unlock(&pThis->mutWtp);
RETiRet;
}
@@ -301,16 +299,12 @@ wtpWrkrExecCancelCleanup(void *arg)
pThis = pWti->pWtp;
ISOBJ_TYPE_assert(pThis, wtp);
- // TODO: the mutex_lock is dangerous, if we are cancelled within some function
- // that already has the mutex locked...
- d_pthread_mutex_lock(&pThis->mutWtp);
wtiSetState(pWti, WRKTHRD_STOPPED);
- pThis->iCurNumWrkThrd--;
- pthread_cond_signal(&pThis->condThrdTrm); /* activate anyone waiting on thread shutdown */
- d_pthread_mutex_unlock(&pThis->mutWtp);
+ ATOMIC_DEC(pThis->iCurNumWrkThrd);
+ pthread_cond_broadcast(&pThis->condThrdTrm); /* activate anyone waiting on thread shutdown */
DBGPRINTF("%s: Worker thread %lx, terminated, num workers now %d\n",
- wtpGetDbgHdr(pThis), (unsigned long) pWti, pThis->iCurNumWrkThrd);
+ wtpGetDbgHdr(pThis), (unsigned long) pWti, ATOMIC_FETCH_32BIT(pThis->iCurNumWrkThrd));
ENDfunc
}
@@ -391,10 +385,10 @@ wtpStartWrkr(wtp_t *pThis, int bLockMutex)
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
iState = pthread_create(&(pWti->thrdID), &attr, wtpWorker, (void*) pWti);
pthread_attr_destroy(&attr); /* TODO: we could globally reuse such an attribute 2009-07-08 */
- pThis->iCurNumWrkThrd++; /* we got one more! */
+ ATOMIC_INC(pThis->iCurNumWrkThrd); /* we got one more! */
- dbgprintf("%s: started with state %d, num workers now %d\n",
- wtpGetDbgHdr(pThis), iState, pThis->iCurNumWrkThrd);
+ DBGPRINTF("%s: started with state %d, num workers now %d\n",
+ wtpGetDbgHdr(pThis), iState, ATOMIC_FETCH_32BIT(pThis->iCurNumWrkThrd));
finalize_it:
END_MTX_PROTECTED_OPERATIONS(&pThis->mutWtp);
@@ -422,28 +416,23 @@ wtpAdviseMaxWorkers(wtp_t *pThis, int nMaxWrkr)
if(nMaxWrkr == 0)
FINALIZE;
- d_pthread_mutex_lock(&pThis->mutWtp);
-
if(nMaxWrkr > pThis->iNumWorkerThreads) /* limit to configured maximum */
nMaxWrkr = pThis->iNumWorkerThreads;
- nMissing = nMaxWrkr - pThis->iCurNumWrkThrd;
+ nMissing = nMaxWrkr - ATOMIC_FETCH_32BIT(pThis->iCurNumWrkThrd);
if(nMissing > 0) {
- dbgprintf("%s: high activity - starting %d additional worker thread(s).\n", wtpGetDbgHdr(pThis), nMissing);
+ DBGPRINTF("%s: high activity - starting %d additional worker thread(s).\n", wtpGetDbgHdr(pThis), nMissing);
/* start the rqtd nbr of workers */
for(i = 0 ; i < nMissing ; ++i) {
- CHKiRet(wtpStartWrkr(pThis, MUTEX_ALREADY_LOCKED));
- }
- } else {
- if(nMaxWrkr > 0) {
- pthread_cond_signal(pThis->pcondBusy);
+ CHKiRet(wtpStartWrkr(pThis, LOCK_MUTEX));
}
+ } else {
+ pthread_cond_signal(pThis->pcondBusy);
}
finalize_it:
- d_pthread_mutex_unlock(&pThis->mutWtp);
RETiRet;
}