summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--runtime/queue.c3
-rw-r--r--runtime/queue.h11
-rw-r--r--runtime/wti.c125
-rw-r--r--runtime/wti.h14
-rw-r--r--runtime/wtp.c64
-rw-r--r--runtime/wtp.h13
6 files changed, 54 insertions, 176 deletions
diff --git a/runtime/queue.c b/runtime/queue.c
index 37ec3663..a2bb4c1d 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -312,7 +312,6 @@ TurnOffDAMode(qqueue_t *pThis)
* during the lifetime of DA-mode, depending on how often the DA worker receives an
* inactivity timeout. -- rgerhards, 2008-01-25
*/
-dbgprintf("XXX: getLogicalQueueSize(pThis->pqDA): %d\n", getLogicalQueueSize(pThis->pqDA));
if(getLogicalQueueSize(pThis->pqDA) == 0) {
pThis->bRunsDA = 0; /* tell the world we are back in non-DA mode */
/* we destruct the queue object, which will also shutdown the queue worker. As the queue is empty,
@@ -1270,7 +1269,7 @@ tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis)
* startup some workers again. So this is OK here. -- rgerhards, 2009-05-28
*/
pThis->bEnqOnly = 1;
- wtpSetState(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE);
+ /* need to set this so that the DA queue begins shutdown in parallel! */
if(pThis->pqDA != NULL) {
pThis->pqDA->bEnqOnly = 1;
wtpSetState(pThis->pqDA->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE);
diff --git a/runtime/queue.h b/runtime/queue.h
index e873c456..7b10e5dd 100644
--- a/runtime/queue.h
+++ b/runtime/queue.h
@@ -54,17 +54,6 @@ typedef struct qLinkedList_S {
} qLinkedList_t;
-typedef struct qWrkThrd_s {
- pthread_t thrdID; /* thread ID */
- qWrkCmd_t tCurrCmd; /* current command to be carried out by worker */
- obj_t *pUsr; /* current user object being processed (or NULL if none) */
- struct queue_s *pQueue; /* my queue (important if only the work thread instance is passed! */
- int iThrd; /* my worker thread array index */
- pthread_cond_t condInitDone; /* signaled when the thread startup is done (once per thread existance) */
- pthread_mutex_t mut;
-} qWrkThrd_t; /* type for queue worker threads */
-
-
/* the queue object */
typedef struct queue_s {
BEGINobjInstance;
diff --git a/runtime/wti.c b/runtime/wti.c
index c536e545..b6a09c65 100644
--- a/runtime/wti.c
+++ b/runtime/wti.c
@@ -76,74 +76,32 @@ wtiGetDbgHdr(wti_t *pThis)
}
-/* get the current worker state. For simplicity and speed, we have
- * NOT used our regular calling interface this time. I hope that won't
- * bite in the long term... -- rgerhards, 2008-01-17
- * TODO: may be performance optimized by atomic operations
+/* return the current worker processing state. For the sake of
+ * simplicity, we do not use the iRet interface. -- rgerhards, 2009-07-17
*/
-qWrkCmd_t
-wtiGetState(wti_t *pThis, int bLockMutex)
+bool
+wtiGetState(wti_t *pThis)
{
- DEFVARS_mutexProtection;
- qWrkCmd_t tCmd;
-
- BEGINfunc
- ISOBJ_TYPE_assert(pThis, wti);
-
- BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex);
- tCmd = pThis->tCurrCmd;
- END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
-
- ENDfunc
- return tCmd;
+ return pThis->bIsRunning;
}
-/* send a command to a specific thread
- * rgerhards, 2008-01-20
+/* Set status (thread is running or not), actually an property of
+ * use for wtp, but we need to have it per thread instance (thus it
+ * is inside wti). -- rgerhards, 2009-07-17
*/
rsRetVal
-wtiSetState(wti_t *pThis, qWrkCmd_t tCmd, int bLockMutex)
+wtiSetState(wti_t *pThis, bool bNewVal)
{
- DEFiRet;
- qWrkCmd_t tCurrCmd;
- DEFVARS_mutexProtection;
-
ISOBJ_TYPE_assert(pThis, wti);
- assert(tCmd <= eWRKTHRD_SHUTDOWN_IMMEDIATE);
-
- BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex);
-
- tCurrCmd = pThis->tCurrCmd;
- /* all worker states must be followed sequentially, only termination can be set in any state */
- if(tCurrCmd > tCmd && !(tCmd == eWRKTHRD_STOPPED)) {
- DBGPRINTF("%s: command %d can not be accepted in current %d processing state - ignored\n",
- wtiGetDbgHdr(pThis), tCmd, tCurrCmd);
- } else {
- DBGPRINTF("%s: receiving command %d\n", wtiGetDbgHdr(pThis), tCmd);
- /* we could replace this with a simple if, but we leave the switch in in case we need
- * to add something at a later stage. -- rgerhards, 2008-09-30
- */
- if(tCmd == eWRKTHRD_STOPPED) {
- dbgprintf("%s: worker almost stopped, assuming it has\n", wtiGetDbgHdr(pThis));
- pThis->thrdID = 0; /* invalidate the thread ID so that we do not accidently find reused ones */
- }
- /* apply the new state */
-dbgprintf("worker terminator will write stateval %d\n", tCmd);
- unsigned val = ATOMIC_CAS_VAL(pThis->tCurrCmd, tCurrCmd, tCmd);
- if(val != tCurrCmd) {
- DBGPRINTF("wtiSetState PROBLEM, tCurrCmd %d overwritten with %d, wanted to set %d\n", tCurrCmd, val, tCmd);
- }
- }
-
- END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
- RETiRet;
+ pThis->bIsRunning = bNewVal;
+ return RS_RET_OK;
}
-/* Cancel the thread. If the thread is already cancelled or terminated,
- * we do not again cancel it. But it is save and legal to call wtiCancelThrd() in
- * such situations.
+/* Cancel the thread. If the thread is not running. But it is save and legal to
+ * call wtiCancelThrd() in such situations.
+ * IMPORTANT: WTP mutex must be locked while this function is called!
* rgerhards, 2008-02-26
*/
rsRetVal
@@ -153,17 +111,11 @@ wtiCancelThrd(wti_t *pThis)
ISOBJ_TYPE_assert(pThis, wti);
- d_pthread_mutex_lock(&pThis->mut);
-
- if(pThis->tCurrCmd != eWRKTHRD_STOPPED) {
- dbgoprint((obj_t*) pThis, "canceling worker thread, curr stat %d\n", pThis->tCurrCmd);
+ if(pThis->bIsRunning) {
+ dbgoprint((obj_t*) pThis, "canceling worker thread\n");
pthread_cancel(pThis->thrdID);
- /* TODO: check: the following check should automatically be done by cancel cleanup handler! 2009-07-08 rgerhards */
- wtiSetState(pThis, eWRKTHRD_STOPPED, MUTEX_ALREADY_LOCKED);
}
- d_pthread_mutex_unlock(&pThis->mut);
-
RETiRet;
}
@@ -171,14 +123,7 @@ wtiCancelThrd(wti_t *pThis)
/* Destructor */
BEGINobjDestruct(wti) /* be sure to specify the object type also in END and CODESTART macros! */
CODESTARTobjDestruct(wti)
- if(Debug && wtiGetState(pThis, MUTEX_ALREADY_LOCKED) != eWRKTHRD_STOPPED) {
- dbgprintf("%s: WARNING: worker %p shall be destructed but is still running (might be OK) - ignoring\n",
- wtiGetDbgHdr(pThis), pThis);
- }
-
/* actual destruction */
- pthread_mutex_destroy(&pThis->mut);
-
free(pThis->batch.pElem);
free(pThis->pszDbgHdr);
ENDobjDestruct(wti)
@@ -187,7 +132,6 @@ ENDobjDestruct(wti)
/* Standard-Constructor for the wti object
*/
BEGINobjConstruct(wti) /* be sure to specify the object type also in END macro! */
- pthread_mutex_init(&pThis->mut, NULL);
ENDobjConstruct(wti)
@@ -205,7 +149,7 @@ wtiConstructFinalize(wti_t *pThis)
dbgprintf("%s: finalizing construction of worker instance data\n", wtiGetDbgHdr(pThis));
/* initialize our thread instance descriptor */
- pThis->tCurrCmd = eWRKTHRD_STOPPED;
+ pThis->bIsRunning = FALSE;
/* we now alloc the array for user pointers. We obtain the max from the queue itself. */
CHKiRet(pThis->pWtp->pfGetDeqBatchSize(pThis->pWtp->pUsr, &iDeqBatchSize));
@@ -238,17 +182,13 @@ wtiWorkerCancelCleanup(void *arg)
/* call user supplied handler (that one e.g. requeues the element) */
pWtp->pfOnWorkerCancel(pThis->pWtp->pUsr, pThis->batch.pElem[0].pUsrp);
- d_pthread_mutex_lock(&pWtp->mut);
- wtiSetState(pThis, eWRKTHRD_STOPPED, MUTEX_ALREADY_LOCKED);
- /* TODO: sync access? I currently think it is NOT needed -- rgerhards, 2008-01-28 */
- d_pthread_mutex_unlock(&pWtp->mut);
ENDfunc
}
/* wait for queue to become non-empty or timeout
- * helper to wtiWorker
- * IMPORTANT: mutex must be locked when this code is called!
+ * helper to wtiWorker. Note the the predicate is
+ * re-tested by the caller, so it is OK to NOT do it here.
* rgerhards, 2009-05-20
*/
static inline void
@@ -258,8 +198,10 @@ doIdleProcessing(wti_t *pThis, wtp_t *pWtp, int *pbInactivityTOOccured)
BEGINfunc
DBGPRINTF("%s: worker IDLE, waiting for work.\n", wtiGetDbgHdr(pThis));
+
pWtp->pfOnIdle(pWtp->pUsr, MUTEX_ALREADY_LOCKED);
+ d_pthread_mutex_lock(pWtp->pmutUsr);
if(pWtp->toWrkShutdown == -1) {
/* never shut down any started worker */
d_pthread_cond_wait(pWtp->pcondBusy, pWtp->pmutUsr);
@@ -270,6 +212,7 @@ doIdleProcessing(wti_t *pThis, wtp_t *pWtp, int *pbInactivityTOOccured)
*pbInactivityTOOccured = 1; /* indicate we had a timeout */
}
}
+ d_pthread_mutex_unlock(pWtp->pmutUsr);
ENDfunc
}
@@ -284,7 +227,6 @@ wtiWorker(wti_t *pThis)
int bInactivityTOOccured = 0;
rsRetVal localRet;
rsRetVal terminateRet;
- bool bMutexIsLocked;
int iCancelStateSave;
DEFiRet;
@@ -305,9 +247,7 @@ wtiWorker(wti_t *pThis)
pWtp->pfRateLimiter(pWtp->pUsr);
}
- wtpSetInactivityGuard(pThis->pWtp, 0, LOCK_MUTEX); /* must be set before usr mutex is locked! */
d_pthread_mutex_lock(pWtp->pmutUsr);
- bMutexIsLocked = TRUE;
/* first check if we are in shutdown process (but evaluate a bit later) */
terminateRet = wtpChkStopWrkr(pWtp, LOCK_MUTEX, MUTEX_ALREADY_LOCKED);
@@ -316,46 +256,29 @@ wtiWorker(wti_t *pThis)
localRet = pWtp->pfObjProcessed(pWtp->pUsr, pThis);
dbgoprint((obj_t*) pThis, "terminating worker because of TERMINATE_NOW mode, del iRet %d\n",
localRet);
+ d_pthread_mutex_unlock(pWtp->pmutUsr);
break;
}
/* try to execute and process whatever we have */
/* This function must and does RELEASE the MUTEX! */
localRet = pWtp->pfDoWork(pWtp->pUsr, pThis);
- bMutexIsLocked = FALSE;
if(localRet == RS_RET_IDLE) {
- if(terminateRet == RS_RET_TERMINATE_WHEN_IDLE) {
+ if(terminateRet == RS_RET_TERMINATE_WHEN_IDLE || bInactivityTOOccured) {
break; /* end of loop */
}
-
- if(bInactivityTOOccured) {
- /* we had an inactivity timeout in the last run and are still idle, so it is time to exit... */
- break; /* end worker thread run */
- }
- d_pthread_mutex_lock(pWtp->pmutUsr);
doIdleProcessing(pThis, pWtp, &bInactivityTOOccured);
- d_pthread_mutex_unlock(pWtp->pmutUsr);
continue; /* request next iteration */
}
bInactivityTOOccured = 0; /* reset for next run */
}
- /* if we exit the loop, the mutex may be locked and, if so, must be unlocked */
- if(bMutexIsLocked) {
- d_pthread_mutex_unlock(pWtp->pmutUsr);
- }
-
/* indicate termination */
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
- d_pthread_mutex_lock(&pThis->mut);
pthread_cleanup_pop(0); /* remove cleanup handler */
-
pWtp->pfOnWorkerShutdown(pWtp->pUsr);
-
- wtiSetState(pThis, eWRKTHRD_STOPPED, MUTEX_ALREADY_LOCKED);
- d_pthread_mutex_unlock(&pThis->mut);
pthread_setcancelstate(iCancelStateSave, NULL);
RETiRet;
diff --git a/runtime/wti.h b/runtime/wti.h
index 56901165..67d89a2f 100644
--- a/runtime/wti.h
+++ b/runtime/wti.h
@@ -33,29 +33,23 @@
/* the worker thread instance class */
struct wti_s {
BEGINobjInstance;
- pthread_t thrdID; /* thread ID */
- qWrkCmd_t tCurrCmd; /* current command to be carried out by worker */
+ pthread_t thrdID; /* thread ID */
+ bool bIsRunning; /* is this thread currently running? */
wtp_t *pWtp; /* my worker thread pool (important if only the work thread instance is passed! */
- pthread_mutex_t mut;
batch_t batch; /* pointer to an object array meaningful for current user pointer (e.g. queue pUsr data elemt) */
- bool bShutdownRqtd; /* shutdown for this thread requested? 0 - no , 1 - yes */
uchar *pszDbgHdr; /* header string for debug messages */
};
-/* some symbolic constants for easier reference */
-
/* prototypes */
rsRetVal wtiConstruct(wti_t **ppThis);
rsRetVal wtiConstructFinalize(wti_t *pThis);
rsRetVal wtiDestruct(wti_t **ppThis);
rsRetVal wtiWorker(wti_t *pThis);
-rsRetVal wtiProcessThrdChanges(wti_t *pThis, int bLockMutex);
rsRetVal wtiSetDbgHdr(wti_t *pThis, uchar *pszMsg, size_t lenMsg);
-rsRetVal wtiSetState(wti_t *pThis, qWrkCmd_t tCmd, int bLockMutex);
-rsRetVal wtiJoinThrd(wti_t *pThis);
rsRetVal wtiCancelThrd(wti_t *pThis);
-qWrkCmd_t wtiGetState(wti_t *pThis, int bLockMutex);
+rsRetVal wtiSetState(wti_t *pThis, bool bNew);
+bool wtiGetState(wti_t *pThis);
PROTOTYPEObjClassInit(wti);
PROTOTYPEpropSetMeth(wti, pszDbgHdr, uchar*);
PROTOTYPEpropSetMeth(wti, pWtp, wtp_t*);
diff --git a/runtime/wtp.c b/runtime/wtp.c
index beeaf01c..93e87987 100644
--- a/runtime/wtp.c
+++ b/runtime/wtp.c
@@ -8,7 +8,7 @@
* (and in the web doc set on http://www.rsyslog.com/doc). Be sure to read it
* if you are getting aquainted to the object.
*
- * Copyright 2008 Rainer Gerhards and Adiscon GmbH.
+ * Copyright 2008,2009 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of the rsyslog runtime library.
*
@@ -99,7 +99,6 @@ BEGINobjConstruct(wtp) /* be sure to specify the object type also in END macro!
pThis->pfOnWorkerCancel = NotImplementedDummy;
pThis->pfOnWorkerStartup = NotImplementedDummy;
pThis->pfOnWorkerShutdown = NotImplementedDummy;
-dbgprintf("XXX: wtpConstruct: %d\n", pThis->wtpState);
ENDobjConstruct(wtp)
@@ -285,11 +284,12 @@ wtpCancelAll(wtp_t *pThis)
ISOBJ_TYPE_assert(pThis, wtp);
+ d_pthread_mutex_lock(&pThis->mut);
/* go through all workers and cancel those that are active */
for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
- dbgprintf("%s: try canceling worker thread %d\n", wtpGetDbgHdr(pThis), i);
wtiCancelThrd(pThis->pWrkr[i]);
}
+ d_pthread_mutex_unlock(&pThis->mut);
RETiRet;
}
@@ -320,13 +320,24 @@ wtpSetInactivityGuard(wtp_t *pThis, int bNewState, int bLockMutex)
static void
wtpWrkrExecCancelCleanup(void *arg)
{
- wtp_t *pThis = (wtp_t*) arg;
+ wti_t *pWti = (wti_t*) arg;
+ wtp_t *pThis;
BEGINfunc
+ ISOBJ_TYPE_assert(pWti, wti);
+ pThis = pWti->pWtp;
ISOBJ_TYPE_assert(pThis, wtp);
+
+ // TODO: the mutex_lock is dangerous, if we are cancelled within some function
+ // that already has the mutex locked...
+ d_pthread_mutex_lock(&pThis->mut);
pThis->iCurNumWrkThrd--;
+ wtiSetState(pWti, WRKTHRD_STOPPED);
pthread_cond_signal(&pThis->condThrdTrm); /* activate anyone waiting on thread shutdown */
- dbgprintf("%s: thread CANCELED with %d workers running.\n", wtpGetDbgHdr(pThis), pThis->iCurNumWrkThrd);
+ d_pthread_mutex_unlock(&pThis->mut);
+
+ DBGPRINTF("%s: Worker thread %lx, terminated, num workers now %d\n",
+ wtpGetDbgHdr(pThis), (unsigned long) pWti, pThis->iCurNumWrkThrd);
ENDfunc
}
@@ -341,11 +352,11 @@ wtpWorker(void *arg) /* the arg is actually a wti object, even though we are in
{
uchar *pszDbgHdr;
uchar thrdName[32] = "rs:";
- DEFiRet;
wti_t *pWti = (wti_t*) arg;
wtp_t *pThis;
sigset_t sigSet;
+ BEGINfunc
ISOBJ_TYPE_assert(pWti, wti);
pThis = pWti->pWtp;
ISOBJ_TYPE_assert(pThis, wtp);
@@ -362,36 +373,9 @@ wtpWorker(void *arg) /* the arg is actually a wti object, even though we are in
}
# endif
- d_pthread_mutex_lock(&pThis->mut);
- pthread_cleanup_push(wtpWrkrExecCancelCleanup, pThis);
-
- /* change to RUNNING state. We need to check if we actually should still run,
- * because someone may have requested us to shut down even before we got a chance to do
- * our init. That would be a bad race... -- rgerhards, 2008-01-16
- */
- wtiSetState(pWti, eWRKTHRD_RUNNING, MUTEX_ALREADY_LOCKED); /* we are running now! */
-
- do {
- d_pthread_mutex_unlock(&pThis->mut);
-
- iRet = wtiWorker(pWti); /* just to make sure: this is NOT protected by the mutex! */
-
- d_pthread_mutex_lock(&pThis->mut);
- } while(pThis->iCurNumWrkThrd == 1 && pThis->bInactivityGuard == 1);
- /* inactivity guard prevents shutdown of all workers while one should be running due to race
- * condition. It can lead to one more worker running than desired, but that is acceptable. After
- * all, that worker will shutdown itself due to inactivity timeout. If, however, none were running
- * when one was required, processing could come to a halt. -- rgerhards, 2008-01-21
- */
-
- pthread_cleanup_pop(0);
- pThis->iCurNumWrkThrd--;
- pthread_cond_signal(&pThis->condThrdTrm); /* activate anyone waiting on thread shutdown */
-
- dbgprintf("%s: Worker thread %lx, terminated, num workers now %d\n",
- wtpGetDbgHdr(pThis), (unsigned long) pWti, pThis->iCurNumWrkThrd);
-
- d_pthread_mutex_unlock(&pThis->mut);
+ pthread_cleanup_push(wtpWrkrExecCancelCleanup, pWti);
+ wtiWorker(pWti);
+ pthread_cleanup_pop(1);
ENDfunc
pthread_exit(0);
@@ -418,7 +402,7 @@ wtpStartWrkr(wtp_t *pThis, int bLockMutex)
/* find free spot in thread table. */
for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
- if(wtiGetState(pThis->pWrkr[i], LOCK_MUTEX) == eWRKTHRD_STOPPED) {
+ if(wtiGetState(pThis->pWrkr[i]) == WRKTHRD_STOPPED) {
break;
}
}
@@ -427,7 +411,7 @@ wtpStartWrkr(wtp_t *pThis, int bLockMutex)
ABORT_FINALIZE(RS_RET_NO_MORE_THREADS);
pWti = pThis->pWrkr[i];
- wtiSetState(pWti, eWRKTHRD_RUN_CREATED, LOCK_MUTEX);
+ wtiSetState(pWti, WRKTHRD_RUNNING);
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
iState = pthread_create(&(pWti->thrdID), &attr, wtpWorker, (void*) pWti);
@@ -435,9 +419,6 @@ wtpStartWrkr(wtp_t *pThis, int bLockMutex)
dbgprintf("%s: started with state %d, num workers now %d\n",
wtpGetDbgHdr(pThis), iState, pThis->iCurNumWrkThrd);
- /* indicate we just started a worker and would like to see it running */
- wtpSetInactivityGuard(pThis, 1, MUTEX_ALREADY_LOCKED);
-
finalize_it:
END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
RETiRet;
@@ -479,7 +460,6 @@ wtpAdviseMaxWorkers(wtp_t *pThis, int nMaxWrkr)
}
} else {
if(nMaxWrkr > 0) {
- dbgprintf("wtpAdviseMaxWorkers signals busy\n");
wtpWakeupWrkr(pThis);
}
}
diff --git a/runtime/wtp.h b/runtime/wtp.h
index c933f337..cff6d3f9 100644
--- a/runtime/wtp.h
+++ b/runtime/wtp.h
@@ -27,16 +27,9 @@
#include <pthread.h>
#include "obj.h"
-/* commands and states for worker threads. */
-typedef enum {
- eWRKTHRD_STOPPED = 0, /* worker thread is not running (either actually never ran or was shut down) */
- /* ALL active states MUST be numerically higher than eWRKTHRD_TERMINATED and NONE must be lower! */
- eWRKTHRD_RUN_CREATED = 2,/* worker thread has been created, but is not fully running (prob. not yet scheduled) */
- eWRKTHRD_RUNNING = 4, /* worker thread is up and running and shall continue to do so */
- eWRKTHRD_SHUTDOWN = 5, /* worker thread is running but shall terminate when wtp is empty */
- eWRKTHRD_SHUTDOWN_IMMEDIATE = 6/* worker thread is running but shall terminate even if wtp is full */
- /* SHUTDOWN_IMMEDIATE MUST always be the numerically highest state! */
-} qWrkCmd_t;
+/* states for worker threads. */
+#define WRKTHRD_STOPPED FALSE
+#define WRKTHRD_RUNNING TRUE
/* possible states of a worker thread pool */