diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2008-04-16 08:56:48 +0200 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2008-04-16 08:56:48 +0200 |
commit | 3af28bbd2dc4c79b242aad3b9e3f45cffe0f19d0 (patch) | |
tree | 6b132af0bfb750b080ce296faa015779e8444bd3 /wti.c | |
parent | d7f33053da7eb73a8c475956af6e3847e596c80a (diff) | |
download | rsyslog-3af28bbd2dc4c79b242aad3b9e3f45cffe0f19d0.tar.gz rsyslog-3af28bbd2dc4c79b242aad3b9e3f45cffe0f19d0.tar.xz rsyslog-3af28bbd2dc4c79b242aad3b9e3f45cffe0f19d0.zip |
moved runtime files into their own directory
Diffstat (limited to 'wti.c')
-rw-r--r-- | wti.c | 480 |
1 files changed, 0 insertions, 480 deletions
@@ -1,480 +0,0 @@ -/* wti.c - * - * This file implements the worker thread instance (wti) class. - * - * File begun on 2008-01-20 by RGerhards based on functions from the - * previous queue object class (the wti functions have been extracted) - * - * There is some in-depth documentation available in doc/dev_queue.html - * (and in the web doc set on http://www.rsyslog.com/doc). Be sure to read it - * if you are getting aquainted to the object. - * - * Copyright 2008 Rainer Gerhards and Adiscon GmbH. - * - * This file is part of the rsyslog runtime library. - * - * The rsyslog runtime library is free software: you can redistribute it and/or modify - * it under the terms of the GNU Lesser General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * The rsyslog runtime library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public License - * along with the rsyslog runtime library. If not, see <http://www.gnu.org/licenses/>. - * - * A copy of the GPL can be found in the file "COPYING" in this distribution. - * A copy of the LGPL can be found in the file "COPYING.LESSER" in this distribution. - */ -#include "config.h" - -#include <stdio.h> -#include <stdlib.h> -#include <string.h> -#include <assert.h> -#include <signal.h> -#include <pthread.h> -#include <errno.h> - -#include "rsyslog.h" -#include "syslogd.h" -#include "stringbuf.h" -#include "srUtils.h" -#include "wtp.h" -#include "wti.h" -#include "obj.h" - -/* static data */ -DEFobjStaticHelpers - -/* forward-definitions */ - -/* methods */ - -/* get the header for debug messages - * The caller must NOT free or otherwise modify the returned string! - */ -static inline uchar * -wtiGetDbgHdr(wti_t *pThis) -{ - ISOBJ_TYPE_assert(pThis, wti); - - if(pThis->pszDbgHdr == NULL) - return (uchar*) "wti"; /* should not normally happen */ - else - return pThis->pszDbgHdr; -} - - -/* get the current worker state. For simplicity and speed, we have - * NOT used our regular calling interface this time. I hope that won't - * bite in the long term... -- rgerhards, 2008-01-17 - * TODO: may be performance optimized by atomic operations - */ -qWrkCmd_t -wtiGetState(wti_t *pThis, int bLockMutex) -{ - DEFVARS_mutexProtection; - qWrkCmd_t tCmd; - - BEGINfunc - ISOBJ_TYPE_assert(pThis, wti); - - BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex); - tCmd = pThis->tCurrCmd; - END_MTX_PROTECTED_OPERATIONS(&pThis->mut); - - ENDfunc - return tCmd; -} - - -/* send a command to a specific thread - * bActiveOnly specifies if the command should be sent only when the worker is - * in an active state. -- rgerhards, 2008-01-20 - */ -rsRetVal -wtiSetState(wti_t *pThis, qWrkCmd_t tCmd, int bActiveOnly, int bLockMutex) -{ - DEFiRet; - DEFVARS_mutexProtection; - - ISOBJ_TYPE_assert(pThis, wti); - assert(tCmd <= eWRKTHRD_SHUTDOWN_IMMEDIATE); - - BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex); - - /* all worker states must be followed sequentially, only termination can be set in any state */ - if( (bActiveOnly && (pThis->tCurrCmd < eWRKTHRD_RUN_CREATED)) - || (pThis->tCurrCmd > tCmd && !(tCmd == eWRKTHRD_TERMINATING || tCmd == eWRKTHRD_STOPPED))) { - dbgprintf("%s: command %d can not be accepted in current %d processing state - ignored\n", - wtiGetDbgHdr(pThis), tCmd, pThis->tCurrCmd); - } else { - dbgprintf("%s: receiving command %d\n", wtiGetDbgHdr(pThis), tCmd); - switch(tCmd) { - case eWRKTHRD_TERMINATING: - /* TODO: re-enable meaningful debug msg! (via function callback?) - dbgprintf("%s: thread terminating with %d entries left in queue, %d workers running.\n", - wtiGetDbgHdr(pThis->pQueue), pThis->pQueue->iQueueSize, - pThis->pQueue->iCurNumWrkThrd); - */ - pthread_cond_signal(&pThis->condExitDone); - dbgprintf("%s: worker terminating\n", wtiGetDbgHdr(pThis)); - break; - case eWRKTHRD_RUNNING: - pthread_cond_signal(&pThis->condInitDone); - break; - /* these cases just to satisfy the compiler, we do (yet) not act an them: */ - case eWRKTHRD_STOPPED: - case eWRKTHRD_RUN_CREATED: - case eWRKTHRD_RUN_INIT: - case eWRKTHRD_SHUTDOWN: - case eWRKTHRD_SHUTDOWN_IMMEDIATE: - /* DO NOTHING */ - break; - } - pThis->tCurrCmd = tCmd; /* apply the new state */ - } - - END_MTX_PROTECTED_OPERATIONS(&pThis->mut); - RETiRet; -} - - -/* Cancel the thread. If the thread is already cancelled or termination, - * we do not again cancel it. But it is save and legal to call wtiCancelThrd() in - * such situations. - * rgerhards, 2008-02-26 - */ -rsRetVal -wtiCancelThrd(wti_t *pThis) -{ - DEFiRet; - - ISOBJ_TYPE_assert(pThis, wti); - - d_pthread_mutex_lock(&pThis->mut); - - if(pThis->tCurrCmd >= eWRKTHRD_TERMINATING) { - dbgoprint((obj_t*) pThis, "canceling worker thread\n"); - pthread_cancel(pThis->thrdID); - wtiSetState(pThis, eWRKTHRD_TERMINATING, 0, MUTEX_ALREADY_LOCKED); - pThis->pWtp->bThrdStateChanged = 1; /* indicate change, so harverster will be called */ - } - - d_pthread_mutex_unlock(&pThis->mut); - - RETiRet; -} - - -/* Destructor */ -BEGINobjDestruct(wti) /* be sure to specify the object type also in END and CODESTART macros! */ -CODESTARTobjDestruct(wti) - /* if we reach this point, we must make sure the associated worker has terminated. It is - * the callers duty to make sure the worker already knows it shall terminate. - * TODO: is it *really* the caller's duty? ...mmmhhhh.... smells bad... rgerhards, 2008-01-25 - */ - wtiProcessThrdChanges(pThis, LOCK_MUTEX); /* process state change one last time */ - - d_pthread_mutex_lock(&pThis->mut); - if(wtiGetState(pThis, MUTEX_ALREADY_LOCKED) != eWRKTHRD_STOPPED) { - dbgprintf("%s: WARNING: worker %p shall be destructed but is still running (might be OK) - joining it\n", - wtiGetDbgHdr(pThis), pThis); - /* let's hope the caller actually instructed it to shutdown... */ - pthread_cond_wait(&pThis->condExitDone, &pThis->mut); - wtiJoinThrd(pThis); - } - d_pthread_mutex_unlock(&pThis->mut); - - /* actual destruction */ - pthread_cond_destroy(&pThis->condInitDone); - pthread_cond_destroy(&pThis->condExitDone); - pthread_mutex_destroy(&pThis->mut); - - if(pThis->pszDbgHdr != NULL) - free(pThis->pszDbgHdr); -ENDobjDestruct(wti) - - -/* Standard-Constructor for the wti object - */ -BEGINobjConstruct(wti) /* be sure to specify the object type also in END macro! */ - pthread_cond_init(&pThis->condInitDone, NULL); - pthread_cond_init(&pThis->condExitDone, NULL); - pthread_mutex_init(&pThis->mut, NULL); -ENDobjConstruct(wti) - - -/* Construction finalizer - * rgerhards, 2008-01-17 - */ -rsRetVal -wtiConstructFinalize(wti_t *pThis) -{ - DEFiRet; - - ISOBJ_TYPE_assert(pThis, wti); - - dbgprintf("%s: finalizing construction of worker instance data\n", wtiGetDbgHdr(pThis)); - - /* initialize our thread instance descriptor */ - pThis->pUsrp = NULL; - pThis->tCurrCmd = eWRKTHRD_STOPPED; - - RETiRet; -} - - -/* join a specific worker thread - * we do not lock the mutex, because join will sync anyways... - */ -rsRetVal -wtiJoinThrd(wti_t *pThis) -{ - DEFiRet; - - ISOBJ_TYPE_assert(pThis, wti); - dbgprintf("waiting for worker %s termination, current state %d\n", wtiGetDbgHdr(pThis), pThis->tCurrCmd); - pthread_join(pThis->thrdID, NULL); - wtiSetState(pThis, eWRKTHRD_STOPPED, 0, MUTEX_ALREADY_LOCKED); /* back to virgin... */ - pThis->thrdID = 0; /* invalidate the thread ID so that we do not accidently find reused ones */ - dbgprintf("worker %s has stopped\n", wtiGetDbgHdr(pThis)); - - RETiRet; -} - -/* check if we had a worker thread changes and, if so, act - * on it. At a minimum, terminated threads are harvested (joined). - */ -rsRetVal -wtiProcessThrdChanges(wti_t *pThis, int bLockMutex) -{ - DEFiRet; - DEFVARS_mutexProtection; - - ISOBJ_TYPE_assert(pThis, wti); - - BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex); - switch(pThis->tCurrCmd) { - case eWRKTHRD_TERMINATING: - /* we need to at least temporarily release the mutex, because otherwise - * we may deadlock with the thread we intend to join (it aquires the mutex - * during termination processing). -- rgerhards, 2008-02-26 - */ - END_MTX_PROTECTED_OPERATIONS(&pThis->mut); - iRet = wtiJoinThrd(pThis); - BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex); - break; - /* these cases just to satisfy the compiler, we do not act an them: */ - case eWRKTHRD_STOPPED: - case eWRKTHRD_RUN_CREATED: - case eWRKTHRD_RUN_INIT: - case eWRKTHRD_RUNNING: - case eWRKTHRD_SHUTDOWN: - case eWRKTHRD_SHUTDOWN_IMMEDIATE: - /* DO NOTHING */ - break; - } - END_MTX_PROTECTED_OPERATIONS(&pThis->mut); - - RETiRet; -} - - -/* cancellation cleanup handler for queueWorker () - * Updates admin structure and frees ressources. - * rgerhards, 2008-01-16 - */ -static void -wtiWorkerCancelCleanup(void *arg) -{ - wti_t *pThis = (wti_t*) arg; - wtp_t *pWtp; - int iCancelStateSave; - - BEGINfunc - ISOBJ_TYPE_assert(pThis, wti); - pWtp = pThis->pWtp; - ISOBJ_TYPE_assert(pWtp, wtp); - - dbgprintf("%s: cancelation cleanup handler called.\n", wtiGetDbgHdr(pThis)); - - /* call user supplied handler (that one e.g. requeues the element) */ - pWtp->pfOnWorkerCancel(pThis->pWtp->pUsr, pThis->pUsrp); - - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); - d_pthread_mutex_lock(&pWtp->mut); - wtiSetState(pThis, eWRKTHRD_TERMINATING, 0, MUTEX_ALREADY_LOCKED); - /* TODO: sync access? I currently think it is NOT needed -- rgerhards, 2008-01-28 */ - pWtp->bThrdStateChanged = 1; /* indicate change, so harverster will be called */ - - d_pthread_mutex_unlock(&pWtp->mut); - pthread_setcancelstate(iCancelStateSave, NULL); - ENDfunc -} - - -/* generic worker thread framework - * - * Some special comments below, so that they do not clutter the main function code: - * - * On the use of pthread_testcancel(): - * Now make sure we can get canceled - it is not specified if pthread_setcancelstate() is - * a cancellation point in itself. As we run most of the time without cancel enabled, I fear - * we may never get cancelled if we do not create a cancellation point ourselfs. - * - * On the use of pthread_yield(): - * We yield to give the other threads a chance to obtain the mutex. If we do not - * do that, this thread may very well aquire the mutex again before another thread - * has even a chance to run. The reason is that mutex operations are free to be - * implemented in the quickest possible way (and they typically are!). That is, the - * mutex lock/unlock most probably just does an atomic memory swap and does not necessarily - * schedule other threads waiting on the same mutex. That can lead to the same thread - * aquiring the mutex ever and ever again while all others are starving for it. We - * have exactly seen this behaviour when we deliberately introduced a long-running - * test action which basically did a sleep. I understand that with real actions the - * likelihood of this starvation condition is very low - but it could still happen - * and would be very hard to debug. The yield() is a sure fix, its performance overhead - * should be well accepted given the above facts. -- rgerhards, 2008-01-10 - */ -rsRetVal -wtiWorker(wti_t *pThis) -{ - DEFiRet; - DEFVARS_mutexProtection; - struct timespec t; - wtp_t *pWtp; /* our worker thread pool */ - int bInactivityTOOccured = 0; - - ISOBJ_TYPE_assert(pThis, wti); - pWtp = pThis->pWtp; /* shortcut */ - ISOBJ_TYPE_assert(pWtp, wtp); - - dbgSetThrdName(pThis->pszDbgHdr); - pThis->pUsrp = NULL; - pthread_cleanup_push(wtiWorkerCancelCleanup, pThis); - - BEGIN_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr, LOCK_MUTEX); - pWtp->pfOnWorkerStartup(pWtp->pUsr); - END_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr); - - /* now we have our identity, on to real processing */ - while(1) { /* loop will be broken below - need to do mutex locks */ - /* process any pending thread requests */ - wtpProcessThrdChanges(pWtp); - pthread_testcancel(); /* see big comment in function header */ -# if !defined(__hpux) /* pthread_yield is missing there! */ - pthread_yield(); /* see big comment in function header */ -# endif - - /* if we have a rate-limiter set for this worker pool, let's call it. Please - * keep in mind that the rate-limiter may hold us for an extended period - * of time. -- rgerhards, 2008-04-02 - */ - if(pWtp->pfRateLimiter != NULL) { - pWtp->pfRateLimiter(pWtp->pUsr); - } - - wtpSetInactivityGuard(pThis->pWtp, 0, LOCK_MUTEX); /* must be set before usr mutex is locked! */ - BEGIN_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr, LOCK_MUTEX); - - if( (bInactivityTOOccured && pWtp->pfIsIdle(pWtp->pUsr, MUTEX_ALREADY_LOCKED)) - || wtpChkStopWrkr(pWtp, LOCK_MUTEX, MUTEX_ALREADY_LOCKED)) { - END_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr); - break; /* end worker thread run */ - } - bInactivityTOOccured = 0; /* reset for next run */ - - /* if we reach this point, we are still protected by the mutex */ - - if(pWtp->pfIsIdle(pWtp->pUsr, MUTEX_ALREADY_LOCKED)) { - dbgprintf("%s: worker IDLE, waiting for work.\n", wtiGetDbgHdr(pThis)); - pWtp->pfOnIdle(pWtp->pUsr, MUTEX_ALREADY_LOCKED); - - if(pWtp->toWrkShutdown == -1) { - /* never shut down any started worker */ - d_pthread_cond_wait(pWtp->pcondBusy, pWtp->pmutUsr); - } else { - timeoutComp(&t, pWtp->toWrkShutdown);/* get absolute timeout */ - if(d_pthread_cond_timedwait(pWtp->pcondBusy, pWtp->pmutUsr, &t) != 0) { - dbgprintf("%s: inactivity timeout, worker terminating...\n", wtiGetDbgHdr(pThis)); - bInactivityTOOccured = 1; /* indicate we had a timeout */ - } - } - END_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr); - continue; /* request next iteration */ - } - - /* if we reach this point, we have a non-empty queue (and are still protected by mutex) */ - pWtp->pfDoWork(pWtp->pUsr, pThis, iCancelStateSave); - } - - /* indicate termination */ - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); - d_pthread_mutex_lock(&pThis->mut); - pthread_cleanup_pop(0); /* remove cleanup handler */ - - pWtp->pfOnWorkerShutdown(pWtp->pUsr); - - wtiSetState(pThis, eWRKTHRD_TERMINATING, 0, MUTEX_ALREADY_LOCKED); - pWtp->bThrdStateChanged = 1; /* indicate change, so harverster will be called */ - d_pthread_mutex_unlock(&pThis->mut); - pthread_setcancelstate(iCancelStateSave, NULL); - - RETiRet; -} - - -/* some simple object access methods */ -DEFpropSetMeth(wti, pWtp, wtp_t*); - -/* set the debug header message - * The passed-in string is duplicated. So if the caller does not need - * it any longer, it must free it. Must be called only before object is finalized. - * rgerhards, 2008-01-09 - */ -rsRetVal -wtiSetDbgHdr(wti_t *pThis, uchar *pszMsg, size_t lenMsg) -{ - DEFiRet; - - ISOBJ_TYPE_assert(pThis, wti); - assert(pszMsg != NULL); - - if(lenMsg < 1) - ABORT_FINALIZE(RS_RET_PARAM_ERROR); - - if(pThis->pszDbgHdr != NULL) { - free(pThis->pszDbgHdr); - pThis->pszDbgHdr = NULL; - } - - if((pThis->pszDbgHdr = malloc(sizeof(uchar) * lenMsg + 1)) == NULL) - ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); - - memcpy(pThis->pszDbgHdr, pszMsg, lenMsg + 1); /* always think about the \0! */ - -finalize_it: - RETiRet; -} - - -/* dummy */ -rsRetVal wtiQueryInterface(void) { return RS_RET_NOT_IMPLEMENTED; } - - -/* Initialize the wti class. Must be called as the very first method - * before anything else is called inside this class. - * rgerhards, 2008-01-09 - */ -BEGINObjClassInit(wti, 1, OBJ_IS_CORE_MODULE) /* one is the object version (most important for persisting) */ - /* request objects we use */ -ENDObjClassInit(wti) - -/* - * vi:set ai: - */ |