diff options
Diffstat (limited to 'runtime/wtp.c')
-rw-r--r-- | runtime/wtp.c | 41 |
1 files changed, 26 insertions, 15 deletions
diff --git a/runtime/wtp.c b/runtime/wtp.c index ba1f94b3..59553984 100644 --- a/runtime/wtp.c +++ b/runtime/wtp.c @@ -42,10 +42,10 @@ #include <atomic.h> #include <sys/prctl.h> -#ifdef OS_SOLARIS -# include <sched.h> -# define pthread_yield() sched_yield() -#endif +/// TODO: check on solaris if this is any longer needed - I don't think so - rgerhards, 2009-09-20 +//#ifdef OS_SOLARIS +//# include <sched.h> +//#endif #include "rsyslog.h" #include "stringbuf.h" @@ -81,7 +81,7 @@ wtpGetDbgHdr(wtp_t *pThis) /* Not implemented dummy function for constructor */ -static rsRetVal NotImplementedDummy() { return RS_RET_OK; } +static rsRetVal NotImplementedDummy() { return RS_RET_NOT_IMPLEMENTED; } /* Standard-Constructor for the wtp object */ BEGINobjConstruct(wtp) /* be sure to specify the object type also in END macro! */ @@ -90,12 +90,15 @@ BEGINobjConstruct(wtp) /* be sure to specify the object type also in END macro! pthread_cond_init(&pThis->condThrdTrm, NULL); /* set all function pointers to "not implemented" dummy so that we can safely call them */ pThis->pfChkStopWrkr = NotImplementedDummy; + pThis->pfGetDeqBatchSize = NotImplementedDummy; pThis->pfIsIdle = NotImplementedDummy; pThis->pfDoWork = NotImplementedDummy; + pThis->pfObjProcessed = NotImplementedDummy; pThis->pfOnIdle = NotImplementedDummy; pThis->pfOnWorkerCancel = NotImplementedDummy; pThis->pfOnWorkerStartup = NotImplementedDummy; pThis->pfOnWorkerShutdown = NotImplementedDummy; +dbgprintf("XXX: wtpConstruct: %d\n", pThis->wtpState); ENDobjConstruct(wtp) @@ -119,7 +122,7 @@ wtpConstructFinalize(wtp_t *pThis) */ if((pThis->pWrkr = malloc(sizeof(wti_t*) * pThis->iNumWorkerThreads)) == NULL) ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); - + for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) { CHKiRet(wtiConstruct(&pThis->pWrkr[i])); pWti = pThis->pWrkr[i]; @@ -249,7 +252,6 @@ wtpSetState(wtp_t *pThis, wtpState_t iNewState) /* check if the worker shall shutdown (1 = yes, 0 = no) - * TODO: check if we can use atomic operations to enhance performance * Note: there may be two mutexes locked, the bLockUsrMutex is the one in our "user" * (e.g. the queue clas) * rgerhards, 2008-01-21 @@ -263,16 +265,19 @@ wtpChkStopWrkr(wtp_t *pThis, int bLockMutex, int bLockUsrMutex) ISOBJ_TYPE_assert(pThis, wtp); BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex); - if( (pThis->wtpState == wtpState_SHUTDOWN_IMMEDIATE) - || ((pThis->wtpState == wtpState_SHUTDOWN) && pThis->pfIsIdle(pThis->pUsr, bLockUsrMutex))) - iRet = RS_RET_TERMINATE_NOW; - END_MTX_PROTECTED_OPERATIONS(&pThis->mut); + if(pThis->wtpState == wtpState_SHUTDOWN_IMMEDIATE) { + ABORT_FINALIZE(RS_RET_TERMINATE_NOW); + } else if(pThis->wtpState == wtpState_SHUTDOWN) { + ABORT_FINALIZE(RS_RET_TERMINATE_WHEN_IDLE); + } /* try customer handler if one was set and we do not yet have a definite result */ - if(iRet == RS_RET_OK && pThis->pfChkStopWrkr != NULL) { + if(pThis->pfChkStopWrkr != NULL) { iRet = pThis->pfChkStopWrkr(pThis->pUsr, bLockUsrMutex); } +finalize_it: + END_MTX_PROTECTED_OPERATIONS(&pThis->mut); RETiRet; } @@ -488,11 +493,12 @@ wtpWorker(void *arg) /* the arg is actually a wti object, even though we are in static rsRetVal wtpStartWrkr(wtp_t *pThis, int bLockMutex) { - DEFiRet; DEFVARS_mutexProtection; wti_t *pWti; int i; int iState; + pthread_attr_t attr; + DEFiRet; ISOBJ_TYPE_assert(pThis, wtp); @@ -516,7 +522,10 @@ wtpStartWrkr(wtp_t *pThis, int bLockMutex) pWti = pThis->pWrkr[i]; wtiSetState(pWti, eWRKTHRD_RUN_CREATED, 0, LOCK_MUTEX); - iState = pthread_create(&(pWti->thrdID), NULL, wtpWorker, (void*) pWti); + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); + iState = pthread_create(&(pWti->thrdID), &attr, wtpWorker, (void*) pWti); + pthread_attr_destroy(&attr); /* TODO: we could globally reuse such an attribute 2009-07-08 */ dbgprintf("%s: started with state %d, num workers now %d\n", wtpGetDbgHdr(pThis), iState, pThis->iCurNumWrkThrd); @@ -586,8 +595,10 @@ DEFpropSetMethPTR(wtp, pmutUsr, pthread_mutex_t) DEFpropSetMethPTR(wtp, pcondBusy, pthread_cond_t) DEFpropSetMethFP(wtp, pfChkStopWrkr, rsRetVal(*pVal)(void*, int)) DEFpropSetMethFP(wtp, pfRateLimiter, rsRetVal(*pVal)(void*)) -DEFpropSetMethFP(wtp, pfIsIdle, rsRetVal(*pVal)(void*, int)) +DEFpropSetMethFP(wtp, pfGetDeqBatchSize, rsRetVal(*pVal)(void*, int*)) +DEFpropSetMethFP(wtp, pfIsIdle, rsRetVal(*pVal)(void*, wtp_t*)) DEFpropSetMethFP(wtp, pfDoWork, rsRetVal(*pVal)(void*, void*, int)) +DEFpropSetMethFP(wtp, pfObjProcessed, rsRetVal(*pVal)(void*, wti_t*)) DEFpropSetMethFP(wtp, pfOnIdle, rsRetVal(*pVal)(void*, int)) DEFpropSetMethFP(wtp, pfOnWorkerCancel, rsRetVal(*pVal)(void*, void*)) DEFpropSetMethFP(wtp, pfOnWorkerStartup, rsRetVal(*pVal)(void*)) |