summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2009-07-20 11:52:05 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2009-07-20 11:52:05 +0200
commit4ec7b9d9ec12d91dde3d030bdaf87cfdd6b5d81d (patch)
treea08d9a3b839f40bb5d26d06f5b2b54787277faf9
parentef70e6174d4b373a601b73757ca19bb0f7dd6502 (diff)
downloadrsyslog-4ec7b9d9ec12d91dde3d030bdaf87cfdd6b5d81d.tar.gz
rsyslog-4ec7b9d9ec12d91dde3d030bdaf87cfdd6b5d81d.tar.xz
rsyslog-4ec7b9d9ec12d91dde3d030bdaf87cfdd6b5d81d.zip
enhanced worker thread pool by atomic ops
... greater performance and was able to remove a potential troublespot in a cancel cleanup handler.
-rw-r--r--runtime/wti.c12
-rw-r--r--runtime/wti.h2
-rw-r--r--runtime/wtp.c41
3 files changed, 23 insertions, 32 deletions
diff --git a/runtime/wti.c b/runtime/wti.c
index 1d8f075f..91c63ffe 100644
--- a/runtime/wti.c
+++ b/runtime/wti.c
@@ -82,7 +82,7 @@ wtiGetDbgHdr(wti_t *pThis)
bool
wtiGetState(wti_t *pThis)
{
- return pThis->bIsRunning;
+ return ATOMIC_FETCH_32BIT(pThis->bIsRunning);
}
@@ -105,14 +105,16 @@ rsRetVal
wtiSetState(wti_t *pThis, bool bNewVal)
{
ISOBJ_TYPE_assert(pThis, wti);
- pThis->bIsRunning = bNewVal;
+ if(bNewVal)
+ ATOMIC_STORE_1_TO_INT(pThis->bIsRunning);
+ else
+ ATOMIC_STORE_0_TO_INT(pThis->bIsRunning);
return RS_RET_OK;
}
/* Cancel the thread. If the thread is not running. But it is save and legal to
* call wtiCancelThrd() in such situations.
- * IMPORTANT: WTP mutex must be locked while this function is called!
* rgerhards, 2008-02-26
*/
rsRetVal
@@ -122,7 +124,7 @@ wtiCancelThrd(wti_t *pThis)
ISOBJ_TYPE_assert(pThis, wti);
- if(pThis->bIsRunning) {
+ if(wtiGetState(pThis)) {
dbgoprint((obj_t*) pThis, "canceling worker thread\n");
pthread_cancel(pThis->thrdID);
}
@@ -159,7 +161,7 @@ wtiConstructFinalize(wti_t *pThis)
dbgprintf("%s: finalizing construction of worker instance data\n", wtiGetDbgHdr(pThis));
- /* initialize our thread instance descriptor */
+ /* initialize our thread instance descriptor (no concurrency here) */
pThis->bIsRunning = FALSE;
/* we now alloc the array for user pointers. We obtain the max from the queue itself. */
diff --git a/runtime/wti.h b/runtime/wti.h
index cd408bde..f466a053 100644
--- a/runtime/wti.h
+++ b/runtime/wti.h
@@ -34,8 +34,8 @@
struct wti_s {
BEGINobjInstance;
pthread_t thrdID; /* thread ID */
+ int bIsRunning; /* is this thread currently running? (must be int for atomic op!) */
bool bAlwaysRunning; /* should this thread always run? */
- bool bIsRunning; /* is this thread currently running? */
wtp_t *pWtp; /* my worker thread pool (important if only the work thread instance is passed! */
batch_t batch; /* pointer to an object array meaningful for current user pointer (e.g. queue pUsr data elemt) */
uchar *pszDbgHdr; /* header string for debug messages */
diff --git a/runtime/wtp.c b/runtime/wtp.c
index e8bc5120..23726802 100644
--- a/runtime/wtp.c
+++ b/runtime/wtp.c
@@ -116,7 +116,7 @@ wtpConstructFinalize(wtp_t *pThis)
ISOBJ_TYPE_assert(pThis, wtp);
- dbgprintf("%s: finalizing construction of worker thread pool\n", wtpGetDbgHdr(pThis));
+ DBGPRINTF("%s: finalizing construction of worker thread pool\n", wtpGetDbgHdr(pThis));
/* alloc and construct workers - this can only be done in finalizer as we previously do
* not know the max number of workers
*/
@@ -246,11 +246,11 @@ wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout
pthread_cleanup_push(mutexCancelCleanup, &pThis->mutWtp);
bTimedOut = 0;
while(pThis->iCurNumWrkThrd > 0 && !bTimedOut) {
- dbgprintf("%s: waiting %ldms on worker thread termination, %d still running\n",
- wtpGetDbgHdr(pThis), timeoutVal(ptTimeout), pThis->iCurNumWrkThrd);
+ DBGPRINTF("%s: waiting %ldms on worker thread termination, %d still running\n",
+ wtpGetDbgHdr(pThis), timeoutVal(ptTimeout), ATOMIC_FETCH_32BIT(pThis->iCurNumWrkThrd));
if(d_pthread_cond_timedwait(&pThis->condThrdTrm, &pThis->mutWtp, ptTimeout) != 0) {
- dbgprintf("%s: timeout waiting on worker thread termination\n", wtpGetDbgHdr(pThis));
+ DBGPRINTF("%s: timeout waiting on worker thread termination\n", wtpGetDbgHdr(pThis));
bTimedOut = 1; /* we exit the loop on timeout */
}
}
@@ -275,12 +275,10 @@ wtpCancelAll(wtp_t *pThis)
ISOBJ_TYPE_assert(pThis, wtp);
- d_pthread_mutex_lock(&pThis->mutWtp);
/* go through all workers and cancel those that are active */
for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
wtiCancelThrd(pThis->pWrkr[i]);
}
- d_pthread_mutex_unlock(&pThis->mutWtp);
RETiRet;
}
@@ -301,16 +299,12 @@ wtpWrkrExecCancelCleanup(void *arg)
pThis = pWti->pWtp;
ISOBJ_TYPE_assert(pThis, wtp);
- // TODO: the mutex_lock is dangerous, if we are cancelled within some function
- // that already has the mutex locked...
- d_pthread_mutex_lock(&pThis->mutWtp);
wtiSetState(pWti, WRKTHRD_STOPPED);
- pThis->iCurNumWrkThrd--;
- pthread_cond_signal(&pThis->condThrdTrm); /* activate anyone waiting on thread shutdown */
- d_pthread_mutex_unlock(&pThis->mutWtp);
+ ATOMIC_DEC(pThis->iCurNumWrkThrd);
+ pthread_cond_broadcast(&pThis->condThrdTrm); /* activate anyone waiting on thread shutdown */
DBGPRINTF("%s: Worker thread %lx, terminated, num workers now %d\n",
- wtpGetDbgHdr(pThis), (unsigned long) pWti, pThis->iCurNumWrkThrd);
+ wtpGetDbgHdr(pThis), (unsigned long) pWti, ATOMIC_FETCH_32BIT(pThis->iCurNumWrkThrd));
ENDfunc
}
@@ -391,10 +385,10 @@ wtpStartWrkr(wtp_t *pThis, int bLockMutex)
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
iState = pthread_create(&(pWti->thrdID), &attr, wtpWorker, (void*) pWti);
pthread_attr_destroy(&attr); /* TODO: we could globally reuse such an attribute 2009-07-08 */
- pThis->iCurNumWrkThrd++; /* we got one more! */
+ ATOMIC_INC(pThis->iCurNumWrkThrd); /* we got one more! */
- dbgprintf("%s: started with state %d, num workers now %d\n",
- wtpGetDbgHdr(pThis), iState, pThis->iCurNumWrkThrd);
+ DBGPRINTF("%s: started with state %d, num workers now %d\n",
+ wtpGetDbgHdr(pThis), iState, ATOMIC_FETCH_32BIT(pThis->iCurNumWrkThrd));
finalize_it:
END_MTX_PROTECTED_OPERATIONS(&pThis->mutWtp);
@@ -422,28 +416,23 @@ wtpAdviseMaxWorkers(wtp_t *pThis, int nMaxWrkr)
if(nMaxWrkr == 0)
FINALIZE;
- d_pthread_mutex_lock(&pThis->mutWtp);
-
if(nMaxWrkr > pThis->iNumWorkerThreads) /* limit to configured maximum */
nMaxWrkr = pThis->iNumWorkerThreads;
- nMissing = nMaxWrkr - pThis->iCurNumWrkThrd;
+ nMissing = nMaxWrkr - ATOMIC_FETCH_32BIT(pThis->iCurNumWrkThrd);
if(nMissing > 0) {
- dbgprintf("%s: high activity - starting %d additional worker thread(s).\n", wtpGetDbgHdr(pThis), nMissing);
+ DBGPRINTF("%s: high activity - starting %d additional worker thread(s).\n", wtpGetDbgHdr(pThis), nMissing);
/* start the rqtd nbr of workers */
for(i = 0 ; i < nMissing ; ++i) {
- CHKiRet(wtpStartWrkr(pThis, MUTEX_ALREADY_LOCKED));
- }
- } else {
- if(nMaxWrkr > 0) {
- pthread_cond_signal(pThis->pcondBusy);
+ CHKiRet(wtpStartWrkr(pThis, LOCK_MUTEX));
}
+ } else {
+ pthread_cond_signal(pThis->pcondBusy);
}
finalize_it:
- d_pthread_mutex_unlock(&pThis->mutWtp);
RETiRet;
}