diff options
-rw-r--r-- | runtime/queue.c | 3 | ||||
-rw-r--r-- | runtime/queue.h | 11 | ||||
-rw-r--r-- | runtime/wti.c | 125 | ||||
-rw-r--r-- | runtime/wti.h | 14 | ||||
-rw-r--r-- | runtime/wtp.c | 64 | ||||
-rw-r--r-- | runtime/wtp.h | 13 |
6 files changed, 54 insertions, 176 deletions
diff --git a/runtime/queue.c b/runtime/queue.c index 37ec3663..a2bb4c1d 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -312,7 +312,6 @@ TurnOffDAMode(qqueue_t *pThis) * during the lifetime of DA-mode, depending on how often the DA worker receives an * inactivity timeout. -- rgerhards, 2008-01-25 */ -dbgprintf("XXX: getLogicalQueueSize(pThis->pqDA): %d\n", getLogicalQueueSize(pThis->pqDA)); if(getLogicalQueueSize(pThis->pqDA) == 0) { pThis->bRunsDA = 0; /* tell the world we are back in non-DA mode */ /* we destruct the queue object, which will also shutdown the queue worker. As the queue is empty, @@ -1270,7 +1269,7 @@ tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis) * startup some workers again. So this is OK here. -- rgerhards, 2009-05-28 */ pThis->bEnqOnly = 1; - wtpSetState(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE); + /* need to set this so that the DA queue begins shutdown in parallel! */ if(pThis->pqDA != NULL) { pThis->pqDA->bEnqOnly = 1; wtpSetState(pThis->pqDA->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE); diff --git a/runtime/queue.h b/runtime/queue.h index e873c456..7b10e5dd 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -54,17 +54,6 @@ typedef struct qLinkedList_S { } qLinkedList_t; -typedef struct qWrkThrd_s { - pthread_t thrdID; /* thread ID */ - qWrkCmd_t tCurrCmd; /* current command to be carried out by worker */ - obj_t *pUsr; /* current user object being processed (or NULL if none) */ - struct queue_s *pQueue; /* my queue (important if only the work thread instance is passed! */ - int iThrd; /* my worker thread array index */ - pthread_cond_t condInitDone; /* signaled when the thread startup is done (once per thread existance) */ - pthread_mutex_t mut; -} qWrkThrd_t; /* type for queue worker threads */ - - /* the queue object */ typedef struct queue_s { BEGINobjInstance; diff --git a/runtime/wti.c b/runtime/wti.c index c536e545..b6a09c65 100644 --- a/runtime/wti.c +++ b/runtime/wti.c @@ -76,74 +76,32 @@ wtiGetDbgHdr(wti_t *pThis) } -/* get the current worker state. For simplicity and speed, we have - * NOT used our regular calling interface this time. I hope that won't - * bite in the long term... -- rgerhards, 2008-01-17 - * TODO: may be performance optimized by atomic operations +/* return the current worker processing state. For the sake of + * simplicity, we do not use the iRet interface. -- rgerhards, 2009-07-17 */ -qWrkCmd_t -wtiGetState(wti_t *pThis, int bLockMutex) +bool +wtiGetState(wti_t *pThis) { - DEFVARS_mutexProtection; - qWrkCmd_t tCmd; - - BEGINfunc - ISOBJ_TYPE_assert(pThis, wti); - - BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex); - tCmd = pThis->tCurrCmd; - END_MTX_PROTECTED_OPERATIONS(&pThis->mut); - - ENDfunc - return tCmd; + return pThis->bIsRunning; } -/* send a command to a specific thread - * rgerhards, 2008-01-20 +/* Set status (thread is running or not), actually an property of + * use for wtp, but we need to have it per thread instance (thus it + * is inside wti). -- rgerhards, 2009-07-17 */ rsRetVal -wtiSetState(wti_t *pThis, qWrkCmd_t tCmd, int bLockMutex) +wtiSetState(wti_t *pThis, bool bNewVal) { - DEFiRet; - qWrkCmd_t tCurrCmd; - DEFVARS_mutexProtection; - ISOBJ_TYPE_assert(pThis, wti); - assert(tCmd <= eWRKTHRD_SHUTDOWN_IMMEDIATE); - - BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex); - - tCurrCmd = pThis->tCurrCmd; - /* all worker states must be followed sequentially, only termination can be set in any state */ - if(tCurrCmd > tCmd && !(tCmd == eWRKTHRD_STOPPED)) { - DBGPRINTF("%s: command %d can not be accepted in current %d processing state - ignored\n", - wtiGetDbgHdr(pThis), tCmd, tCurrCmd); - } else { - DBGPRINTF("%s: receiving command %d\n", wtiGetDbgHdr(pThis), tCmd); - /* we could replace this with a simple if, but we leave the switch in in case we need - * to add something at a later stage. -- rgerhards, 2008-09-30 - */ - if(tCmd == eWRKTHRD_STOPPED) { - dbgprintf("%s: worker almost stopped, assuming it has\n", wtiGetDbgHdr(pThis)); - pThis->thrdID = 0; /* invalidate the thread ID so that we do not accidently find reused ones */ - } - /* apply the new state */ -dbgprintf("worker terminator will write stateval %d\n", tCmd); - unsigned val = ATOMIC_CAS_VAL(pThis->tCurrCmd, tCurrCmd, tCmd); - if(val != tCurrCmd) { - DBGPRINTF("wtiSetState PROBLEM, tCurrCmd %d overwritten with %d, wanted to set %d\n", tCurrCmd, val, tCmd); - } - } - - END_MTX_PROTECTED_OPERATIONS(&pThis->mut); - RETiRet; + pThis->bIsRunning = bNewVal; + return RS_RET_OK; } -/* Cancel the thread. If the thread is already cancelled or terminated, - * we do not again cancel it. But it is save and legal to call wtiCancelThrd() in - * such situations. +/* Cancel the thread. If the thread is not running. But it is save and legal to + * call wtiCancelThrd() in such situations. + * IMPORTANT: WTP mutex must be locked while this function is called! * rgerhards, 2008-02-26 */ rsRetVal @@ -153,17 +111,11 @@ wtiCancelThrd(wti_t *pThis) ISOBJ_TYPE_assert(pThis, wti); - d_pthread_mutex_lock(&pThis->mut); - - if(pThis->tCurrCmd != eWRKTHRD_STOPPED) { - dbgoprint((obj_t*) pThis, "canceling worker thread, curr stat %d\n", pThis->tCurrCmd); + if(pThis->bIsRunning) { + dbgoprint((obj_t*) pThis, "canceling worker thread\n"); pthread_cancel(pThis->thrdID); - /* TODO: check: the following check should automatically be done by cancel cleanup handler! 2009-07-08 rgerhards */ - wtiSetState(pThis, eWRKTHRD_STOPPED, MUTEX_ALREADY_LOCKED); } - d_pthread_mutex_unlock(&pThis->mut); - RETiRet; } @@ -171,14 +123,7 @@ wtiCancelThrd(wti_t *pThis) /* Destructor */ BEGINobjDestruct(wti) /* be sure to specify the object type also in END and CODESTART macros! */ CODESTARTobjDestruct(wti) - if(Debug && wtiGetState(pThis, MUTEX_ALREADY_LOCKED) != eWRKTHRD_STOPPED) { - dbgprintf("%s: WARNING: worker %p shall be destructed but is still running (might be OK) - ignoring\n", - wtiGetDbgHdr(pThis), pThis); - } - /* actual destruction */ - pthread_mutex_destroy(&pThis->mut); - free(pThis->batch.pElem); free(pThis->pszDbgHdr); ENDobjDestruct(wti) @@ -187,7 +132,6 @@ ENDobjDestruct(wti) /* Standard-Constructor for the wti object */ BEGINobjConstruct(wti) /* be sure to specify the object type also in END macro! */ - pthread_mutex_init(&pThis->mut, NULL); ENDobjConstruct(wti) @@ -205,7 +149,7 @@ wtiConstructFinalize(wti_t *pThis) dbgprintf("%s: finalizing construction of worker instance data\n", wtiGetDbgHdr(pThis)); /* initialize our thread instance descriptor */ - pThis->tCurrCmd = eWRKTHRD_STOPPED; + pThis->bIsRunning = FALSE; /* we now alloc the array for user pointers. We obtain the max from the queue itself. */ CHKiRet(pThis->pWtp->pfGetDeqBatchSize(pThis->pWtp->pUsr, &iDeqBatchSize)); @@ -238,17 +182,13 @@ wtiWorkerCancelCleanup(void *arg) /* call user supplied handler (that one e.g. requeues the element) */ pWtp->pfOnWorkerCancel(pThis->pWtp->pUsr, pThis->batch.pElem[0].pUsrp); - d_pthread_mutex_lock(&pWtp->mut); - wtiSetState(pThis, eWRKTHRD_STOPPED, MUTEX_ALREADY_LOCKED); - /* TODO: sync access? I currently think it is NOT needed -- rgerhards, 2008-01-28 */ - d_pthread_mutex_unlock(&pWtp->mut); ENDfunc } /* wait for queue to become non-empty or timeout - * helper to wtiWorker - * IMPORTANT: mutex must be locked when this code is called! + * helper to wtiWorker. Note the the predicate is + * re-tested by the caller, so it is OK to NOT do it here. * rgerhards, 2009-05-20 */ static inline void @@ -258,8 +198,10 @@ doIdleProcessing(wti_t *pThis, wtp_t *pWtp, int *pbInactivityTOOccured) BEGINfunc DBGPRINTF("%s: worker IDLE, waiting for work.\n", wtiGetDbgHdr(pThis)); + pWtp->pfOnIdle(pWtp->pUsr, MUTEX_ALREADY_LOCKED); + d_pthread_mutex_lock(pWtp->pmutUsr); if(pWtp->toWrkShutdown == -1) { /* never shut down any started worker */ d_pthread_cond_wait(pWtp->pcondBusy, pWtp->pmutUsr); @@ -270,6 +212,7 @@ doIdleProcessing(wti_t *pThis, wtp_t *pWtp, int *pbInactivityTOOccured) *pbInactivityTOOccured = 1; /* indicate we had a timeout */ } } + d_pthread_mutex_unlock(pWtp->pmutUsr); ENDfunc } @@ -284,7 +227,6 @@ wtiWorker(wti_t *pThis) int bInactivityTOOccured = 0; rsRetVal localRet; rsRetVal terminateRet; - bool bMutexIsLocked; int iCancelStateSave; DEFiRet; @@ -305,9 +247,7 @@ wtiWorker(wti_t *pThis) pWtp->pfRateLimiter(pWtp->pUsr); } - wtpSetInactivityGuard(pThis->pWtp, 0, LOCK_MUTEX); /* must be set before usr mutex is locked! */ d_pthread_mutex_lock(pWtp->pmutUsr); - bMutexIsLocked = TRUE; /* first check if we are in shutdown process (but evaluate a bit later) */ terminateRet = wtpChkStopWrkr(pWtp, LOCK_MUTEX, MUTEX_ALREADY_LOCKED); @@ -316,46 +256,29 @@ wtiWorker(wti_t *pThis) localRet = pWtp->pfObjProcessed(pWtp->pUsr, pThis); dbgoprint((obj_t*) pThis, "terminating worker because of TERMINATE_NOW mode, del iRet %d\n", localRet); + d_pthread_mutex_unlock(pWtp->pmutUsr); break; } /* try to execute and process whatever we have */ /* This function must and does RELEASE the MUTEX! */ localRet = pWtp->pfDoWork(pWtp->pUsr, pThis); - bMutexIsLocked = FALSE; if(localRet == RS_RET_IDLE) { - if(terminateRet == RS_RET_TERMINATE_WHEN_IDLE) { + if(terminateRet == RS_RET_TERMINATE_WHEN_IDLE || bInactivityTOOccured) { break; /* end of loop */ } - - if(bInactivityTOOccured) { - /* we had an inactivity timeout in the last run and are still idle, so it is time to exit... */ - break; /* end worker thread run */ - } - d_pthread_mutex_lock(pWtp->pmutUsr); doIdleProcessing(pThis, pWtp, &bInactivityTOOccured); - d_pthread_mutex_unlock(pWtp->pmutUsr); continue; /* request next iteration */ } bInactivityTOOccured = 0; /* reset for next run */ } - /* if we exit the loop, the mutex may be locked and, if so, must be unlocked */ - if(bMutexIsLocked) { - d_pthread_mutex_unlock(pWtp->pmutUsr); - } - /* indicate termination */ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); - d_pthread_mutex_lock(&pThis->mut); pthread_cleanup_pop(0); /* remove cleanup handler */ - pWtp->pfOnWorkerShutdown(pWtp->pUsr); - - wtiSetState(pThis, eWRKTHRD_STOPPED, MUTEX_ALREADY_LOCKED); - d_pthread_mutex_unlock(&pThis->mut); pthread_setcancelstate(iCancelStateSave, NULL); RETiRet; diff --git a/runtime/wti.h b/runtime/wti.h index 56901165..67d89a2f 100644 --- a/runtime/wti.h +++ b/runtime/wti.h @@ -33,29 +33,23 @@ /* the worker thread instance class */ struct wti_s { BEGINobjInstance; - pthread_t thrdID; /* thread ID */ - qWrkCmd_t tCurrCmd; /* current command to be carried out by worker */ + pthread_t thrdID; /* thread ID */ + bool bIsRunning; /* is this thread currently running? */ wtp_t *pWtp; /* my worker thread pool (important if only the work thread instance is passed! */ - pthread_mutex_t mut; batch_t batch; /* pointer to an object array meaningful for current user pointer (e.g. queue pUsr data elemt) */ - bool bShutdownRqtd; /* shutdown for this thread requested? 0 - no , 1 - yes */ uchar *pszDbgHdr; /* header string for debug messages */ }; -/* some symbolic constants for easier reference */ - /* prototypes */ rsRetVal wtiConstruct(wti_t **ppThis); rsRetVal wtiConstructFinalize(wti_t *pThis); rsRetVal wtiDestruct(wti_t **ppThis); rsRetVal wtiWorker(wti_t *pThis); -rsRetVal wtiProcessThrdChanges(wti_t *pThis, int bLockMutex); rsRetVal wtiSetDbgHdr(wti_t *pThis, uchar *pszMsg, size_t lenMsg); -rsRetVal wtiSetState(wti_t *pThis, qWrkCmd_t tCmd, int bLockMutex); -rsRetVal wtiJoinThrd(wti_t *pThis); rsRetVal wtiCancelThrd(wti_t *pThis); -qWrkCmd_t wtiGetState(wti_t *pThis, int bLockMutex); +rsRetVal wtiSetState(wti_t *pThis, bool bNew); +bool wtiGetState(wti_t *pThis); PROTOTYPEObjClassInit(wti); PROTOTYPEpropSetMeth(wti, pszDbgHdr, uchar*); PROTOTYPEpropSetMeth(wti, pWtp, wtp_t*); diff --git a/runtime/wtp.c b/runtime/wtp.c index beeaf01c..93e87987 100644 --- a/runtime/wtp.c +++ b/runtime/wtp.c @@ -8,7 +8,7 @@ * (and in the web doc set on http://www.rsyslog.com/doc). Be sure to read it * if you are getting aquainted to the object. * - * Copyright 2008 Rainer Gerhards and Adiscon GmbH. + * Copyright 2008,2009 Rainer Gerhards and Adiscon GmbH. * * This file is part of the rsyslog runtime library. * @@ -99,7 +99,6 @@ BEGINobjConstruct(wtp) /* be sure to specify the object type also in END macro! pThis->pfOnWorkerCancel = NotImplementedDummy; pThis->pfOnWorkerStartup = NotImplementedDummy; pThis->pfOnWorkerShutdown = NotImplementedDummy; -dbgprintf("XXX: wtpConstruct: %d\n", pThis->wtpState); ENDobjConstruct(wtp) @@ -285,11 +284,12 @@ wtpCancelAll(wtp_t *pThis) ISOBJ_TYPE_assert(pThis, wtp); + d_pthread_mutex_lock(&pThis->mut); /* go through all workers and cancel those that are active */ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) { - dbgprintf("%s: try canceling worker thread %d\n", wtpGetDbgHdr(pThis), i); wtiCancelThrd(pThis->pWrkr[i]); } + d_pthread_mutex_unlock(&pThis->mut); RETiRet; } @@ -320,13 +320,24 @@ wtpSetInactivityGuard(wtp_t *pThis, int bNewState, int bLockMutex) static void wtpWrkrExecCancelCleanup(void *arg) { - wtp_t *pThis = (wtp_t*) arg; + wti_t *pWti = (wti_t*) arg; + wtp_t *pThis; BEGINfunc + ISOBJ_TYPE_assert(pWti, wti); + pThis = pWti->pWtp; ISOBJ_TYPE_assert(pThis, wtp); + + // TODO: the mutex_lock is dangerous, if we are cancelled within some function + // that already has the mutex locked... + d_pthread_mutex_lock(&pThis->mut); pThis->iCurNumWrkThrd--; + wtiSetState(pWti, WRKTHRD_STOPPED); pthread_cond_signal(&pThis->condThrdTrm); /* activate anyone waiting on thread shutdown */ - dbgprintf("%s: thread CANCELED with %d workers running.\n", wtpGetDbgHdr(pThis), pThis->iCurNumWrkThrd); + d_pthread_mutex_unlock(&pThis->mut); + + DBGPRINTF("%s: Worker thread %lx, terminated, num workers now %d\n", + wtpGetDbgHdr(pThis), (unsigned long) pWti, pThis->iCurNumWrkThrd); ENDfunc } @@ -341,11 +352,11 @@ wtpWorker(void *arg) /* the arg is actually a wti object, even though we are in { uchar *pszDbgHdr; uchar thrdName[32] = "rs:"; - DEFiRet; wti_t *pWti = (wti_t*) arg; wtp_t *pThis; sigset_t sigSet; + BEGINfunc ISOBJ_TYPE_assert(pWti, wti); pThis = pWti->pWtp; ISOBJ_TYPE_assert(pThis, wtp); @@ -362,36 +373,9 @@ wtpWorker(void *arg) /* the arg is actually a wti object, even though we are in } # endif - d_pthread_mutex_lock(&pThis->mut); - pthread_cleanup_push(wtpWrkrExecCancelCleanup, pThis); - - /* change to RUNNING state. We need to check if we actually should still run, - * because someone may have requested us to shut down even before we got a chance to do - * our init. That would be a bad race... -- rgerhards, 2008-01-16 - */ - wtiSetState(pWti, eWRKTHRD_RUNNING, MUTEX_ALREADY_LOCKED); /* we are running now! */ - - do { - d_pthread_mutex_unlock(&pThis->mut); - - iRet = wtiWorker(pWti); /* just to make sure: this is NOT protected by the mutex! */ - - d_pthread_mutex_lock(&pThis->mut); - } while(pThis->iCurNumWrkThrd == 1 && pThis->bInactivityGuard == 1); - /* inactivity guard prevents shutdown of all workers while one should be running due to race - * condition. It can lead to one more worker running than desired, but that is acceptable. After - * all, that worker will shutdown itself due to inactivity timeout. If, however, none were running - * when one was required, processing could come to a halt. -- rgerhards, 2008-01-21 - */ - - pthread_cleanup_pop(0); - pThis->iCurNumWrkThrd--; - pthread_cond_signal(&pThis->condThrdTrm); /* activate anyone waiting on thread shutdown */ - - dbgprintf("%s: Worker thread %lx, terminated, num workers now %d\n", - wtpGetDbgHdr(pThis), (unsigned long) pWti, pThis->iCurNumWrkThrd); - - d_pthread_mutex_unlock(&pThis->mut); + pthread_cleanup_push(wtpWrkrExecCancelCleanup, pWti); + wtiWorker(pWti); + pthread_cleanup_pop(1); ENDfunc pthread_exit(0); @@ -418,7 +402,7 @@ wtpStartWrkr(wtp_t *pThis, int bLockMutex) /* find free spot in thread table. */ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) { - if(wtiGetState(pThis->pWrkr[i], LOCK_MUTEX) == eWRKTHRD_STOPPED) { + if(wtiGetState(pThis->pWrkr[i]) == WRKTHRD_STOPPED) { break; } } @@ -427,7 +411,7 @@ wtpStartWrkr(wtp_t *pThis, int bLockMutex) ABORT_FINALIZE(RS_RET_NO_MORE_THREADS); pWti = pThis->pWrkr[i]; - wtiSetState(pWti, eWRKTHRD_RUN_CREATED, LOCK_MUTEX); + wtiSetState(pWti, WRKTHRD_RUNNING); pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); iState = pthread_create(&(pWti->thrdID), &attr, wtpWorker, (void*) pWti); @@ -435,9 +419,6 @@ wtpStartWrkr(wtp_t *pThis, int bLockMutex) dbgprintf("%s: started with state %d, num workers now %d\n", wtpGetDbgHdr(pThis), iState, pThis->iCurNumWrkThrd); - /* indicate we just started a worker and would like to see it running */ - wtpSetInactivityGuard(pThis, 1, MUTEX_ALREADY_LOCKED); - finalize_it: END_MTX_PROTECTED_OPERATIONS(&pThis->mut); RETiRet; @@ -479,7 +460,6 @@ wtpAdviseMaxWorkers(wtp_t *pThis, int nMaxWrkr) } } else { if(nMaxWrkr > 0) { - dbgprintf("wtpAdviseMaxWorkers signals busy\n"); wtpWakeupWrkr(pThis); } } diff --git a/runtime/wtp.h b/runtime/wtp.h index c933f337..cff6d3f9 100644 --- a/runtime/wtp.h +++ b/runtime/wtp.h @@ -27,16 +27,9 @@ #include <pthread.h> #include "obj.h" -/* commands and states for worker threads. */ -typedef enum { - eWRKTHRD_STOPPED = 0, /* worker thread is not running (either actually never ran or was shut down) */ - /* ALL active states MUST be numerically higher than eWRKTHRD_TERMINATED and NONE must be lower! */ - eWRKTHRD_RUN_CREATED = 2,/* worker thread has been created, but is not fully running (prob. not yet scheduled) */ - eWRKTHRD_RUNNING = 4, /* worker thread is up and running and shall continue to do so */ - eWRKTHRD_SHUTDOWN = 5, /* worker thread is running but shall terminate when wtp is empty */ - eWRKTHRD_SHUTDOWN_IMMEDIATE = 6/* worker thread is running but shall terminate even if wtp is full */ - /* SHUTDOWN_IMMEDIATE MUST always be the numerically highest state! */ -} qWrkCmd_t; +/* states for worker threads. */ +#define WRKTHRD_STOPPED FALSE +#define WRKTHRD_RUNNING TRUE /* possible states of a worker thread pool */ |