diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2008-01-21 13:25:01 +0000 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2008-01-21 13:25:01 +0000 |
commit | c876b04da21a40e9cfe6588d89c15c226504d26e (patch) | |
tree | 970feddd0b73c0f7012c1a741faca62df16e38ce /wti.c | |
parent | f553ede5d992fd30ac66297c68b9d79b933693e4 (diff) | |
download | rsyslog-c876b04da21a40e9cfe6588d89c15c226504d26e.tar.gz rsyslog-c876b04da21a40e9cfe6588d89c15c226504d26e.tar.xz rsyslog-c876b04da21a40e9cfe6588d89c15c226504d26e.zip |
continued implementing wti class
Diffstat (limited to 'wti.c')
-rw-r--r-- | wti.c | 230 |
1 files changed, 184 insertions, 46 deletions
@@ -50,7 +50,6 @@ DEFobjStaticHelpers /* forward-definitions */ -static void *wtiWorker(void *arg); /* methods */ @@ -72,13 +71,21 @@ wtiGetDbgHdr(wti_t *pThis) /* get the current worker state. For simplicity and speed, we have * NOT used our regular calling interface this time. I hope that won't * bite in the long term... -- rgerhards, 2008-01-17 + * TODO: may be performance optimized by atomic operations */ static inline qWrkCmd_t -wtiGetState(wti_t *pThis) +wtiGetState(wti_t *pThis, int bLockMutex) { + DEFVARS_mutexProtection; + qWrkCmd_t tCmd; + ISOBJ_TYPE_assert(pThis, wti); - // TODO: lock mutex? - return pThis->tCurrCmd; + + BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex); + tCmd = pThis->tCurrCmd; + END_MTX_PROTECTED_OPERATIONS(&pThis->mut); + + return tCmd; } @@ -88,11 +95,10 @@ wtiGetState(wti_t *pThis) * in an active state. -- rgerhards, 2008-01-20 */ rsRetVal -wtiSetState(wti_t *pThis, qWrkCmd_t tCmd, bActiveOnly) +wtiSetState(wti_t *pThis, qWrkCmd_t tCmd, int bActiveOnly) { DEFiRet; DEFVARS_mutex_cancelsafeLock; - int iState; ISOBJ_TYPE_assert(pThis, wti); assert(tCmd <= eWRKTHRD_SHUTDOWN_IMMEDIATE); @@ -108,9 +114,6 @@ wtiSetState(wti_t *pThis, qWrkCmd_t tCmd, bActiveOnly) dbgprintf("%s: receiving command %d\n", wtiGetDbgHdr(pThis), tCmd); switch(tCmd) { case eWRKTHRD_RUN_CREATED: - assert(pThis->tCurrCmd < eWRKTHRD_RUN_CREATED); - iState = pthread_create(&(pThis->thrdID), NULL, wtiWorker, (void*) pThis); - dbgprintf("wti: Worker thread %s, started with state %d.\n", wtiGetDbgHdr(pThis), iState); break; case eWRKTHRD_TERMINATING: /* TODO: re-enable meaningful debug msg! (via function callback?) @@ -135,7 +138,7 @@ wtiSetState(wti_t *pThis, qWrkCmd_t tCmd, bActiveOnly) } mutex_cancelsafe_unlock(&pThis->mut); - return iRet; + RETiRet; } @@ -167,7 +170,7 @@ rsRetVal wtiDestruct(wti_t **ppThis) /* back to normal */ pthread_setcancelstate(iCancelStateSave, NULL); - return iRet; + RETiRet; } @@ -197,28 +200,6 @@ wtiConstructFinalize(wti_t *pThis) } -/* Waits until the specified worker thread - * changed to full running state (aka has started up). - * rgerhards, 2008-01-17 - */ -static inline rsRetVal -wtiWaitStartup(wti_t *pThis) -{ - DEFVARS_mutex_cancelsafeLock; - ISOBJ_TYPE_assert(pThis, wti); - - mutex_cancelsafe_lock(&pThis->mut); - if((pThis->tCurrCmd == eWRKTHRD_RUN_CREATED) || (pThis->tCurrCmd == eWRKTHRD_RUN_CREATED)) { - dbgprintf("wti: waiting on worker thread %s startup\n", wtiGetDbgHdr(pThis)); - pthread_cond_wait(&pThis->condInitDone, &pThis->mut); -dbgprintf("worker startup done!\n"); - } - mutex_cancelsafe_unlock(&pThis->mut); - - return RS_RET_OK; -} - - /* join a specific worker thread * we do not lock the mutex, because join will sync anyways... */ @@ -230,42 +211,199 @@ wtiJoinThrd(wti_t *pThis) ISOBJ_TYPE_assert(pThis, wti); dbgprintf("wti: waiting for worker %s termination, current state %d\n", wtiGetDbgHdr(pThis), pThis->tCurrCmd); pthread_join(pThis->thrdID, NULL); - wtiSetState(pThis, eWRKTHRD_STOPPED); /* back to virgin... */ + wtiSetState(pThis, eWRKTHRD_STOPPED, 0); /* back to virgin... */ pThis->thrdID = 0; /* invalidate the thread ID so that we do not accidently find reused ones */ dbgprintf("wti: worker %s has stopped\n", wtiGetDbgHdr(pThis)); - return iRet; + RETiRet; +} + +/* check if we had a worker thread changes and, if so, act + * on it. At a minimum, terminated threads are harvested (joined). + */ +rsRetVal +wtiProcessThrdChanges(wti_t *pThis, int bLockMutex) +{ + DEFiRet; + DEFVARS_mutexProtection; + + ISOBJ_TYPE_assert(pThis, wti); + + BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex); + switch(pThis->tCurrCmd) { + case eWRKTHRD_TERMINATING: + iRet = wtiJoinThrd(pThis); + break; + /* these cases just to satisfy the compiler, we do not act an them: */ + case eWRKTHRD_STOPPED: + case eWRKTHRD_RUN_CREATED: + case eWRKTHRD_RUN_INIT: + case eWRKTHRD_RUNNING: + case eWRKTHRD_SHUTDOWN: + case eWRKTHRD_SHUTDOWN_IMMEDIATE: + /* DO NOTHING */ + break; + } + END_MTX_PROTECTED_OPERATIONS(&pThis->mut); + + RETiRet; } -static void * -wtiWorker(void *arg) +/* cancellation cleanup handler for queueWorker () + * Updates admin structure and frees ressources. + * rgerhards, 2008-01-16 + */ +static void +wtiWorkerCancelCleanup(void *arg) { wti_t *pThis = (wti_t*) arg; + wtp_t *pWtp; + int iCancelStateSave; ISOBJ_TYPE_assert(pThis, wti); + pWtp = pThis->pWtp; + ISOBJ_TYPE_assert(pWtp, wtp); - // TODO: add logic! - // - pthread_exit(0); + dbgprintf("%s: cancelation cleanup handler called.\n", wtiGetDbgHdr(pThis)); + + /* call user supplied handler (that one e.g. requeues the element) */ + pWtp->pfOnWorkerCancel(pThis->pWtp->pUsr); + + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); + d_pthread_mutex_lock(&pWtp->mut); + wtiSetState(pThis, eWRKTHRD_TERMINATING, 0); + // TODO: sync access! + pWtp->bThrdStateChanged = 1; /* indicate change, so harverster will be called */ + + pthread_cond_signal(&pWtp->condThrdTrm); /* activate anyone waiting on thread shutdown */ + d_pthread_mutex_unlock(&pWtp->mut); + pthread_setcancelstate(iCancelStateSave, NULL); } -/* Starts a worker thread (on a specific index [i]!) + +/* generic worker thread framework + * + * Some special comments below, so that they do not clutter the main function code: + * + * On the use of pthread_testcancel(): + * Now make sure we can get canceled - it is not specified if pthread_setcancelstate() is + * a cancellation point in itself. As we run most of the time without cancel enabled, I fear + * we may never get cancelled if we do not create a cancellation point ourselfs. + * + * On the use of pthread_yield(): + * We yield to give the other threads a chance to obtain the mutex. If we do not + * do that, this thread may very well aquire the mutex again before another thread + * has even a chance to run. The reason is that mutex operations are free to be + * implemented in the quickest possible way (and they typically are!). That is, the + * mutex lock/unlock most probably just does an atomic memory swap and does not necessarily + * schedule other threads waiting on the same mutex. That can lead to the same thread + * aquiring the mutex ever and ever again while all others are starving for it. We + * have exactly seen this behaviour when we deliberately introduced a long-running + * test action which basically did a sleep. I understand that with real actions the + * likelihood of this starvation condition is very low - but it could still happen + * and would be very hard to debug. The yield() is a sure fix, its performance overhead + * should be well accepted given the above facts. -- rgerhards, 2008-01-10 */ rsRetVal -wtiStart(wti_t *pThis) +wtiWorker(wti_t *pThis) { DEFiRet; + DEFVARS_mutexProtection; + struct timespec t; + wtp_t *pWtp; /* our worker thread pool */ + int bInactivityTOOccured = 0; ISOBJ_TYPE_assert(pThis, wti); - wtiSetState(pThis, eWRKTHRD_RUN_CREATED); + pWtp = pThis->pWtp; /* shortcut */ + ISOBJ_TYPE_assert(pWtp, wtp); + + pthread_cleanup_push(wtiWorkerCancelCleanup, pThis); + + /* now we have our identity, on to real processing */ + while(1) { /* loop will be broken below - need to do mutex locks */ +dbgprintf("%s: start worker run, queue cmd currently %d\n", wtiGetDbgHdr(pThis), pThis->tCurrCmd); + /* process any pending thread requests */ + wtpProcessThrdChanges(pWtp); + pthread_testcancel(); /* see big comment in function header */ + pthread_yield(); /* see big comment in function header */ + + wtpSetInactivityGuard(pThis->pWtp, 0, LOCK_MUTEX); /* must be set before usr mutex is locked! */ + BEGIN_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr, LOCK_MUTEX); + + if( (bInactivityTOOccured && pWtp->pfIsIdle(pWtp->pUsr, MUTEX_ALREADY_LOCKED)) + || wtpChkStopWrkr(pWtp, LOCK_MUTEX, MUTEX_ALREADY_LOCKED)) { + END_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr); + break; /* end worker thread run */ + } + bInactivityTOOccured = 0; /* reset for next run */ + + /* if we reach this point, we are still protected by the mutex */ + + if(pWtp->pfIsIdle(pWtp->pUsr, MUTEX_ALREADY_LOCKED)) { + dbgprintf("%s: worker IDLE, waiting for work.\n", wtiGetDbgHdr(pThis)); + pWtp->pfOnIdle(pWtp->pUsr, MUTEX_ALREADY_LOCKED); + + dbgprintf("%s: pre condwait ->notEmpty, worker shutdown %d\n", + wtiGetDbgHdr(pThis), pThis->pWtp->toWrkShutdown); // DEL + if(pWtp->toWrkShutdown == -1) { + dbgprintf("worker never times out!\n"); // DEL + /* never shut down any started worker */ + pthread_cond_wait(pWtp->pcondBusy, pWtp->pmutUsr); + } else { + timeoutComp(&t, pWtp->toWrkShutdown);/* get absolute timeout */ + if(pthread_cond_timedwait(pWtp->pcondBusy, pWtp->pmutUsr, &t) != 0) { + dbgprintf("%s: inactivity timeout, worker terminating...\n", wtiGetDbgHdr(pThis)); + bInactivityTOOccured = 1; /* indicate we had a timeout */ + } + } + dbgprintf("%s: post condwait ->notEmpty\n", wtiGetDbgHdr(pThis)); // DEL + END_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr); + continue; /* request next iteration */ + } + + /* if we reach this point, we have a non-empty queue (and are still protected by mutex) */ + pWtp->pfDoWork(pThis, iCancelStateSave); + + /* TODO: move this above into one of the chck Term functions */ + //if(Debug && (qWrkrGetState(pWrkrInst) == eWRKTHRD_SHUTDOWN) && pThis->iQueueSize > 0) + // dbgprintf("%s: worker does not yet terminate because it still has " + // " %d messages to process.\n", wtiGetDbgHdr(pThis), pThis->iQueueSize); + } + + /* indicate termination */ + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); +dbgprintf("%s: worker waiting for mutex\n", wtiGetDbgHdr(pThis)); + d_pthread_mutex_lock(&pThis->mut); + pthread_cleanup_pop(0); /* remove cleanup handler */ + + // TODO: I think we no longer need that - but check! +#if 0 + /* if we ever need finalize_it, here would be the place for it! */ + if(qWrkrGetState(pWrkrInst) == eWRKTHRD_SHUTDOWN || + qWrkrGetState(pWrkrInst) == eWRKTHRD_SHUTDOWN_IMMEDIATE || + qWrkrGetState(pWrkrInst) == eWRKTHRD_RUN_INIT || + qWrkrGetState(pWrkrInst) == eWRKTHRD_RUN_CREATED) { + /* in shutdown case, we need to flag termination. All other commands + * have a meaning to the thread harvester, so we can not overwrite them + */ +dbgprintf("%s: setting termination state\n", wtiGetDbgHdr(pThis)); + wtiSetState(pWrkrInst, eWRKTHRD_TERMINATING, 0); + } +#else + wtiSetState(pThis, eWRKTHRD_TERMINATING, 0); +#endif + // TODO: call, mutex: + pWtp->bThrdStateChanged = 1; /* indicate change, so harverster will be called */ + pthread_cond_signal(&pWtp->condThrdTrm); /* activate anyone waiting on thread shutdown */ + d_pthread_mutex_unlock(&pThis->mut); + pthread_setcancelstate(iCancelStateSave, NULL); - return iRet; + RETiRet; } /* some simple object access methods */ -DEFpropSetMeth(wti, toShutdown, int); /* set the debug header message * The passed-in string is duplicated. So if the caller does not need @@ -294,7 +432,7 @@ wtiSetDbgHdr(wti_t *pThis, uchar *pszMsg, size_t lenMsg) memcpy(pThis->pszDbgHdr, pszMsg, lenMsg + 1); /* always think about the \0! */ finalize_it: - return iRet; + RETiRet; } |