summaryrefslogtreecommitdiffstats
path: root/runtime/wti.c
diff options
context:
space:
mode:
Diffstat (limited to 'runtime/wti.c')
-rw-r--r--runtime/wti.c330
1 files changed, 102 insertions, 228 deletions
diff --git a/runtime/wti.c b/runtime/wti.c
index abdf4add..9d0560dd 100644
--- a/runtime/wti.c
+++ b/runtime/wti.c
@@ -39,9 +39,10 @@
#include <pthread.h>
#include <errno.h>
-#ifdef OS_SOLARIS
-# include <sched.h>
-#endif
+/// TODO: check on solaris if this is any longer needed - I don't think so - rgerhards, 2009-09-20
+//#ifdef OS_SOLARIS
+//# include <sched.h>
+//#endif
#include "rsyslog.h"
#include "stringbuf.h"
@@ -75,92 +76,45 @@ wtiGetDbgHdr(wti_t *pThis)
}
-/* get the current worker state. For simplicity and speed, we have
- * NOT used our regular calling interface this time. I hope that won't
- * bite in the long term... -- rgerhards, 2008-01-17
- * TODO: may be performance optimized by atomic operations
+/* return the current worker processing state. For the sake of
+ * simplicity, we do not use the iRet interface. -- rgerhards, 2009-07-17
*/
-qWrkCmd_t
-wtiGetState(wti_t *pThis, int bLockMutex)
+bool
+wtiGetState(wti_t *pThis)
{
- DEFVARS_mutexProtection;
- qWrkCmd_t tCmd;
-
- BEGINfunc
- ISOBJ_TYPE_assert(pThis, wti);
-
- BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex);
- tCmd = pThis->tCurrCmd;
- END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
-
- ENDfunc
- return tCmd;
+ return ATOMIC_FETCH_32BIT(pThis->bIsRunning);
}
-/* send a command to a specific thread
- * bActiveOnly specifies if the command should be sent only when the worker is
- * in an active state. -- rgerhards, 2008-01-20
+/* Set this thread to "always running" state (can not be unset)
+ * rgerhards, 2009-07-20
*/
rsRetVal
-wtiSetState(wti_t *pThis, qWrkCmd_t tCmd, int bActiveOnly, int bLockMutex)
+wtiSetAlwaysRunning(wti_t *pThis)
{
- DEFiRet;
- qWrkCmd_t tCurrCmd;
- DEFVARS_mutexProtection;
-
ISOBJ_TYPE_assert(pThis, wti);
- assert(tCmd <= eWRKTHRD_SHUTDOWN_IMMEDIATE);
-
- BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex);
-
- tCurrCmd = pThis->tCurrCmd;
- /* all worker states must be followed sequentially, only termination can be set in any state */
- if( (bActiveOnly && (tCurrCmd < eWRKTHRD_RUN_CREATED))
- || (tCurrCmd > tCmd && !(tCmd == eWRKTHRD_TERMINATING || tCmd == eWRKTHRD_STOPPED))) {
- DBGPRINTF("%s: command %d can not be accepted in current %d processing state - ignored\n",
- wtiGetDbgHdr(pThis), tCmd, tCurrCmd);
- } else {
- DBGPRINTF("%s: receiving command %d\n", wtiGetDbgHdr(pThis), tCmd);
- /* we could replace this with a simple if, but we leave the switch in in case we need
- * to add something at a later stage. -- rgerhards, 2008-09-30
- */
- switch(tCmd) {
- case eWRKTHRD_TERMINATING:
- /* TODO: re-enable meaningful debug msg! (via function callback?)
- dbgprintf("%s: thread terminating with %d entries left in queue, %d workers running.\n",
- wtiGetDbgHdr(pThis->pQueue), pThis->pQueue->iQueueSize,
- pThis->pQueue->iCurNumWrkThrd);
- */
- pthread_cond_signal(&pThis->condExitDone);
- dbgprintf("%s: worker terminating\n", wtiGetDbgHdr(pThis));
- break;
- /* these cases just to satisfy the compiler, we do (yet) not act an them: */
- case eWRKTHRD_RUNNING:
- case eWRKTHRD_STOPPED:
- case eWRKTHRD_RUN_CREATED:
- case eWRKTHRD_RUN_INIT:
- case eWRKTHRD_SHUTDOWN:
- case eWRKTHRD_SHUTDOWN_IMMEDIATE:
- /* DO NOTHING */
- break;
- }
- /* apply the new state */
- unsigned val = ATOMIC_CAS_VAL(pThis->tCurrCmd, tCurrCmd, tCmd);
- if(val != tCurrCmd) {
- DBGPRINTF("wtiSetState PROBLEM, tCurrCmd %d overwritten with %d, wanted to set %d\n", tCurrCmd, val, tCmd);
- }
-
- }
+ pThis->bAlwaysRunning = TRUE;
+ return RS_RET_OK;
+}
- END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
- RETiRet;
+/* Set status (thread is running or not), actually an property of
+ * use for wtp, but we need to have it per thread instance (thus it
+ * is inside wti). -- rgerhards, 2009-07-17
+ */
+rsRetVal
+wtiSetState(wti_t *pThis, bool bNewVal)
+{
+ ISOBJ_TYPE_assert(pThis, wti);
+ if(bNewVal)
+ ATOMIC_STORE_1_TO_INT(pThis->bIsRunning);
+ else
+ ATOMIC_STORE_0_TO_INT(pThis->bIsRunning);
+ return RS_RET_OK;
}
-/* Cancel the thread. If the thread is already cancelled or terminated,
- * we do not again cancel it. But it is save and legal to call wtiCancelThrd() in
- * such situations.
+/* Cancel the thread. If the thread is not running. But it is save and legal to
+ * call wtiCancelThrd() in such situations.
* rgerhards, 2008-02-26
*/
rsRetVal
@@ -170,19 +124,11 @@ wtiCancelThrd(wti_t *pThis)
ISOBJ_TYPE_assert(pThis, wti);
- d_pthread_mutex_lock(&pThis->mut);
-
- wtiProcessThrdChanges(pThis, MUTEX_ALREADY_LOCKED); /* process state change, so that we have current state vars */
-
- if(pThis->tCurrCmd >= eWRKTHRD_TERMINATING) {
- dbgoprint((obj_t*) pThis, "canceling worker thread, curr stat %d\n", pThis->tCurrCmd);
+ if(wtiGetState(pThis)) {
+ dbgoprint((obj_t*) pThis, "canceling worker thread\n");
pthread_cancel(pThis->thrdID);
- wtiSetState(pThis, eWRKTHRD_TERMINATING, 0, MUTEX_ALREADY_LOCKED);
- ATOMIC_STORE_1_TO_INT(pThis->pWtp->bThrdStateChanged); /* indicate change, so harverster will be called */
}
- d_pthread_mutex_unlock(&pThis->mut);
-
RETiRet;
}
@@ -190,26 +136,8 @@ wtiCancelThrd(wti_t *pThis)
/* Destructor */
BEGINobjDestruct(wti) /* be sure to specify the object type also in END and CODESTART macros! */
CODESTARTobjDestruct(wti)
- /* if we reach this point, we must make sure the associated worker has terminated. It is
- * the callers duty to make sure the worker already knows it shall terminate.
- * TODO: is it *really* the caller's duty? ...mmmhhhh.... smells bad... rgerhards, 2008-01-25
- */
- wtiProcessThrdChanges(pThis, LOCK_MUTEX); /* process state change one last time */
-
- d_pthread_mutex_lock(&pThis->mut);
- if(wtiGetState(pThis, MUTEX_ALREADY_LOCKED) != eWRKTHRD_STOPPED) {
- dbgprintf("%s: WARNING: worker %p shall be destructed but is still running (might be OK) - joining it\n",
- wtiGetDbgHdr(pThis), pThis);
- /* let's hope the caller actually instructed it to shutdown... */
- pthread_cond_wait(&pThis->condExitDone, &pThis->mut);
- wtiJoinThrd(pThis);
- }
- d_pthread_mutex_unlock(&pThis->mut);
-
/* actual destruction */
- pthread_cond_destroy(&pThis->condExitDone);
- pthread_mutex_destroy(&pThis->mut);
-
+ free(pThis->batch.pElem);
free(pThis->pszDbgHdr);
ENDobjDestruct(wti)
@@ -217,8 +145,6 @@ ENDobjDestruct(wti)
/* Standard-Constructor for the wti object
*/
BEGINobjConstruct(wti) /* be sure to specify the object type also in END macro! */
- pthread_cond_init(&pThis->condExitDone, NULL);
- pthread_mutex_init(&pThis->mut, NULL);
ENDobjConstruct(wti)
@@ -229,81 +155,28 @@ rsRetVal
wtiConstructFinalize(wti_t *pThis)
{
DEFiRet;
+ int iDeqBatchSize;
ISOBJ_TYPE_assert(pThis, wti);
dbgprintf("%s: finalizing construction of worker instance data\n", wtiGetDbgHdr(pThis));
- /* initialize our thread instance descriptor */
- pThis->pUsrp = NULL;
- pThis->tCurrCmd = eWRKTHRD_STOPPED;
-
- RETiRet;
-}
-
-
-/* join a specific worker thread
- * we do not lock the mutex, because join will sync anyways...
- */
-rsRetVal
-wtiJoinThrd(wti_t *pThis)
-{
- DEFiRet;
+ /* initialize our thread instance descriptor (no concurrency here) */
+ pThis->bIsRunning = FALSE;
- ISOBJ_TYPE_assert(pThis, wti);
- dbgprintf("waiting for worker %s termination, current state %d\n", wtiGetDbgHdr(pThis), pThis->tCurrCmd);
- if (pThis->thrdID == 0) {
- dbgprintf("worker %s was already stopped\n", wtiGetDbgHdr(pThis));
- } else {
- pthread_join(pThis->thrdID, NULL);
- wtiSetState(pThis, eWRKTHRD_STOPPED, 0, MUTEX_ALREADY_LOCKED); /* back to virgin... */
- pThis->thrdID = 0; /* invalidate the thread ID so that we do not accidently find reused ones */
- dbgprintf("worker %s has stopped\n", wtiGetDbgHdr(pThis));
- }
-
- RETiRet;
-}
-
-/* check if we had a worker thread changes and, if so, act
- * on it. At a minimum, terminated threads are harvested (joined).
- */
-rsRetVal
-wtiProcessThrdChanges(wti_t *pThis, int bLockMutex)
-{
- DEFiRet;
- DEFVARS_mutexProtection;
-
- ISOBJ_TYPE_assert(pThis, wti);
-
- BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex);
- switch(pThis->tCurrCmd) {
- case eWRKTHRD_TERMINATING:
- /* we need to at least temporarily release the mutex, because otherwise
- * we may deadlock with the thread we intend to join (it aquires the mutex
- * during termination processing). -- rgerhards, 2008-02-26
- */
- END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
- iRet = wtiJoinThrd(pThis);
- BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex);
- break;
- /* these cases just to satisfy the compiler, we do not act an them: */
- case eWRKTHRD_STOPPED:
- case eWRKTHRD_RUN_CREATED:
- case eWRKTHRD_RUN_INIT:
- case eWRKTHRD_RUNNING:
- case eWRKTHRD_SHUTDOWN:
- case eWRKTHRD_SHUTDOWN_IMMEDIATE:
- /* DO NOTHING */
- break;
- }
- END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
+ /* we now alloc the array for user pointers. We obtain the max from the queue itself. */
+ CHKiRet(pThis->pWtp->pfGetDeqBatchSize(pThis->pWtp->pUsr, &iDeqBatchSize));
+ CHKmalloc(pThis->batch.pElem = calloc((size_t)iDeqBatchSize, sizeof(batch_obj_t)));
+finalize_it:
RETiRet;
}
/* cancellation cleanup handler for queueWorker ()
* Updates admin structure and frees ressources.
+ * Keep in mind that cancellation is disabled if we run into
+ * the cancel cleanup handler (and have been cancelled).
* rgerhards, 2008-01-16
*/
static void
@@ -311,7 +184,6 @@ wtiWorkerCancelCleanup(void *arg)
{
wti_t *pThis = (wti_t*) arg;
wtp_t *pWtp;
- int iCancelStateSave;
BEGINfunc
ISOBJ_TYPE_assert(pThis, wti);
@@ -320,17 +192,40 @@ wtiWorkerCancelCleanup(void *arg)
DBGPRINTF("%s: cancelation cleanup handler called.\n", wtiGetDbgHdr(pThis));
- /* call user supplied handler (that one e.g. requeues the element) */
- pWtp->pfOnWorkerCancel(pThis->pWtp->pUsr, pThis->pUsrp);
+ /* call user supplied handler */
+ pWtp->pfOnWorkerCancel(pThis->pWtp->pUsr, pThis->batch.pElem[0].pUsrp);
- pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
- d_pthread_mutex_lock(&pWtp->mut);
- wtiSetState(pThis, eWRKTHRD_TERMINATING, 0, MUTEX_ALREADY_LOCKED);
- /* TODO: sync access? I currently think it is NOT needed -- rgerhards, 2008-01-28 */
- ATOMIC_STORE_1_TO_INT(pWtp->bThrdStateChanged); /* indicate change, so harverster will be called */
+ ENDfunc
+}
- d_pthread_mutex_unlock(&pWtp->mut);
- pthread_setcancelstate(iCancelStateSave, NULL);
+
+/* wait for queue to become non-empty or timeout
+ * helper to wtiWorker. Note the the predicate is
+ * re-tested by the caller, so it is OK to NOT do it here.
+ * rgerhards, 2009-05-20
+ */
+static inline void
+doIdleProcessing(wti_t *pThis, wtp_t *pWtp, int *pbInactivityTOOccured)
+{
+ struct timespec t;
+
+ BEGINfunc
+ DBGPRINTF("%s: worker IDLE, waiting for work.\n", wtiGetDbgHdr(pThis));
+
+ pWtp->pfOnIdle(pWtp->pUsr, MUTEX_ALREADY_LOCKED);
+
+ d_pthread_mutex_lock(pWtp->pmutUsr);
+ if(pThis->bAlwaysRunning) {
+ /* never shut down any started worker */
+ d_pthread_cond_wait(pWtp->pcondBusy, pWtp->pmutUsr);
+ } else {
+ timeoutComp(&t, pWtp->toWrkShutdown);/* get absolute timeout */
+ if(d_pthread_cond_timedwait(pWtp->pcondBusy, pWtp->pmutUsr, &t) != 0) {
+ DBGPRINTF("%s: inactivity timeout, worker terminating...\n", wtiGetDbgHdr(pThis));
+ *pbInactivityTOOccured = 1; /* indicate we had a timeout */
+ }
+ }
+ d_pthread_mutex_unlock(pWtp->pmutUsr);
ENDfunc
}
@@ -341,82 +236,63 @@ wtiWorkerCancelCleanup(void *arg)
rsRetVal
wtiWorker(wti_t *pThis)
{
- DEFiRet;
- DEFVARS_mutexProtection;
- struct timespec t;
wtp_t *pWtp; /* our worker thread pool */
int bInactivityTOOccured = 0;
+ rsRetVal localRet;
+ rsRetVal terminateRet;
+ int iCancelStateSave;
+ DEFiRet;
ISOBJ_TYPE_assert(pThis, wti);
pWtp = pThis->pWtp; /* shortcut */
ISOBJ_TYPE_assert(pWtp, wtp);
dbgSetThrdName(pThis->pszDbgHdr);
- pThis->pUsrp = NULL;
pthread_cleanup_push(wtiWorkerCancelCleanup, pThis);
- BEGIN_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr, LOCK_MUTEX);
pWtp->pfOnWorkerStartup(pWtp->pUsr);
- END_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr);
/* now we have our identity, on to real processing */
while(1) { /* loop will be broken below - need to do mutex locks */
- /* process any pending thread requests */
- wtpProcessThrdChanges(pWtp);
-
- /* if we have a rate-limiter set for this worker pool, let's call it. Please
- * keep in mind that the rate-limiter may hold us for an extended period
- * of time. -- rgerhards, 2008-04-02
- */
- if(pWtp->pfRateLimiter != NULL) {
+ if(pWtp->pfRateLimiter != NULL) { /* call rate-limiter, if defined */
pWtp->pfRateLimiter(pWtp->pUsr);
}
- wtpSetInactivityGuard(pThis->pWtp, 0, LOCK_MUTEX); /* must be set before usr mutex is locked! */
- BEGIN_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr, LOCK_MUTEX);
-
- if( (bInactivityTOOccured && pWtp->pfIsIdle(pWtp->pUsr, MUTEX_ALREADY_LOCKED))
- || wtpChkStopWrkr(pWtp, LOCK_MUTEX, MUTEX_ALREADY_LOCKED)) {
- END_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr);
- break; /* end worker thread run */
+ d_pthread_mutex_lock(pWtp->pmutUsr);
+
+ /* first check if we are in shutdown process (but evaluate a bit later) */
+ terminateRet = wtpChkStopWrkr(pWtp, MUTEX_ALREADY_LOCKED);
+ if(terminateRet == RS_RET_TERMINATE_NOW) {
+ /* we now need to free the old batch */
+ localRet = pWtp->pfObjProcessed(pWtp->pUsr, pThis);
+ dbgoprint((obj_t*) pThis, "terminating worker because of TERMINATE_NOW mode, del iRet %d\n",
+ localRet);
+ d_pthread_mutex_unlock(pWtp->pmutUsr);
+ break;
}
- bInactivityTOOccured = 0; /* reset for next run */
- /* if we reach this point, we are still protected by the mutex */
-
- if(pWtp->pfIsIdle(pWtp->pUsr, MUTEX_ALREADY_LOCKED)) {
- DBGPRINTF("%s: worker IDLE, waiting for work.\n", wtiGetDbgHdr(pThis));
- pWtp->pfOnIdle(pWtp->pUsr, MUTEX_ALREADY_LOCKED);
-
- if(pWtp->toWrkShutdown == -1) {
- /* never shut down any started worker */
- d_pthread_cond_wait(pWtp->pcondBusy, pWtp->pmutUsr);
- } else {
- timeoutComp(&t, pWtp->toWrkShutdown);/* get absolute timeout */
- if(d_pthread_cond_timedwait(pWtp->pcondBusy, pWtp->pmutUsr, &t) != 0) {
- DBGPRINTF("%s: inactivity timeout, worker terminating...\n", wtiGetDbgHdr(pThis));
- bInactivityTOOccured = 1; /* indicate we had a timeout */
- }
+ /* try to execute and process whatever we have */
+ /* This function must and does RELEASE the MUTEX! */
+ localRet = pWtp->pfDoWork(pWtp->pUsr, pThis);
+
+ if(localRet == RS_RET_IDLE) {
+ if(terminateRet == RS_RET_TERMINATE_WHEN_IDLE || bInactivityTOOccured) {
+ break; /* end of loop */
}
- END_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr);
+ doIdleProcessing(pThis, pWtp, &bInactivityTOOccured);
continue; /* request next iteration */
}
- /* if we reach this point, we have a non-empty queue (and are still protected by mutex) */
- pWtp->pfDoWork(pWtp->pUsr, pThis, iCancelStateSave);
+ bInactivityTOOccured = 0; /* reset for next run */
}
/* indicate termination */
+ d_pthread_mutex_lock(pWtp->pmutUsr);
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
- d_pthread_mutex_lock(&pThis->mut);
pthread_cleanup_pop(0); /* remove cleanup handler */
-
pWtp->pfOnWorkerShutdown(pWtp->pUsr);
-
- wtiSetState(pThis, eWRKTHRD_TERMINATING, 0, MUTEX_ALREADY_LOCKED);
- ATOMIC_STORE_1_TO_INT(pWtp->bThrdStateChanged); /* indicate change, so harverster will be called */
- d_pthread_mutex_unlock(&pThis->mut);
pthread_setcancelstate(iCancelStateSave, NULL);
+ d_pthread_mutex_unlock(pWtp->pmutUsr);
RETiRet;
}
@@ -444,7 +320,6 @@ wtiSetDbgHdr(wti_t *pThis, uchar *pszMsg, size_t lenMsg)
if(pThis->pszDbgHdr != NULL) {
free(pThis->pszDbgHdr);
- pThis->pszDbgHdr = NULL;
}
if((pThis->pszDbgHdr = malloc(sizeof(uchar) * lenMsg + 1)) == NULL)
@@ -478,6 +353,5 @@ BEGINObjClassInit(wti, 1, OBJ_IS_CORE_MODULE) /* one is the object version (most
CHKiRet(objUse(glbl, CORE_COMPONENT));
ENDObjClassInit(wti)
-/*
- * vi:set ai:
+/* vi:set ai:
*/