diff options
Diffstat (limited to 'runtime/wti.c')
-rw-r--r-- | runtime/wti.c | 145 |
1 files changed, 83 insertions, 62 deletions
diff --git a/runtime/wti.c b/runtime/wti.c index 544bffa7..465dc3e1 100644 --- a/runtime/wti.c +++ b/runtime/wti.c @@ -39,10 +39,10 @@ #include <pthread.h> #include <errno.h> -#ifdef OS_SOLARIS -# include <sched.h> -# define pthread_yield() sched_yield() -#endif +/// TODO: check on solaris if this is any longer needed - I don't think so - rgerhards, 2009-09-20 +//#ifdef OS_SOLARIS +//# include <sched.h> +//#endif #include "rsyslog.h" #include "stringbuf.h" @@ -201,8 +201,8 @@ CODESTARTobjDestruct(wti) pthread_cond_destroy(&pThis->condExitDone); pthread_mutex_destroy(&pThis->mut); - if(pThis->pszDbgHdr != NULL) - free(pThis->pszDbgHdr); + free(pThis->batch.pElem); + free(pThis->pszDbgHdr); ENDobjDestruct(wti) @@ -222,15 +222,20 @@ rsRetVal wtiConstructFinalize(wti_t *pThis) { DEFiRet; + int iDeqBatchSize; ISOBJ_TYPE_assert(pThis, wti); dbgprintf("%s: finalizing construction of worker instance data\n", wtiGetDbgHdr(pThis)); /* initialize our thread instance descriptor */ - pThis->pUsrp = NULL; pThis->tCurrCmd = eWRKTHRD_STOPPED; + /* we now alloc the array for user pointers. We obtain the max from the queue itself. */ + CHKiRet(pThis->pWtp->pfGetDeqBatchSize(pThis->pWtp->pUsr, &iDeqBatchSize)); + CHKmalloc(pThis->batch.pElem = calloc((size_t)iDeqBatchSize, sizeof(batch_obj_t))); + +finalize_it: RETiRet; } @@ -314,7 +319,7 @@ wtiWorkerCancelCleanup(void *arg) DBGPRINTF("%s: cancelation cleanup handler called.\n", wtiGetDbgHdr(pThis)); /* call user supplied handler (that one e.g. requeues the element) */ - pWtp->pfOnWorkerCancel(pThis->pWtp->pUsr, pThis->pUsrp); + pWtp->pfOnWorkerCancel(pThis->pWtp->pUsr, pThis->batch.pElem[0].pUsrp); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); d_pthread_mutex_lock(&pWtp->mut); @@ -328,6 +333,34 @@ wtiWorkerCancelCleanup(void *arg) } +/* wait for queue to become non-empty or timeout + * helper to wtiWorker + * IMPORTANT: mutex must be locked when this code is called! + * rgerhards, 2009-05-20 + */ +static inline void +doIdleProcessing(wti_t *pThis, wtp_t *pWtp, int *pbInactivityTOOccured) +{ + struct timespec t; + + BEGINfunc + DBGPRINTF("%s: worker IDLE, waiting for work.\n", wtiGetDbgHdr(pThis)); + pWtp->pfOnIdle(pWtp->pUsr, MUTEX_ALREADY_LOCKED); + + if(pWtp->toWrkShutdown == -1) { + /* never shut down any started worker */ + d_pthread_cond_wait(pWtp->pcondBusy, pWtp->pmutUsr); + } else { + timeoutComp(&t, pWtp->toWrkShutdown);/* get absolute timeout */ + if(d_pthread_cond_timedwait(pWtp->pcondBusy, pWtp->pmutUsr, &t) != 0) { + DBGPRINTF("%s: inactivity timeout, worker terminating...\n", wtiGetDbgHdr(pThis)); + *pbInactivityTOOccured = 1; /* indicate we had a timeout */ + } + } + ENDfunc +} + + /* generic worker thread framework * * Some special comments below, so that they do not clutter the main function code: @@ -336,100 +369,88 @@ wtiWorkerCancelCleanup(void *arg) * Now make sure we can get canceled - it is not specified if pthread_setcancelstate() is * a cancellation point in itself. As we run most of the time without cancel enabled, I fear * we may never get cancelled if we do not create a cancellation point ourselfs. - * - * On the use of pthread_yield(): - * We yield to give the other threads a chance to obtain the mutex. If we do not - * do that, this thread may very well aquire the mutex again before another thread - * has even a chance to run. The reason is that mutex operations are free to be - * implemented in the quickest possible way (and they typically are!). That is, the - * mutex lock/unlock most probably just does an atomic memory swap and does not necessarily - * schedule other threads waiting on the same mutex. That can lead to the same thread - * aquiring the mutex ever and ever again while all others are starving for it. We - * have exactly seen this behaviour when we deliberately introduced a long-running - * test action which basically did a sleep. I understand that with real actions the - * likelihood of this starvation condition is very low - but it could still happen - * and would be very hard to debug. The yield() is a sure fix, its performance overhead - * should be well accepted given the above facts. -- rgerhards, 2008-01-10 + * Note on rate-limiters: + * If we have a rate-limiter set for this worker pool, let's call it. Please + * keep in mind that the rate-limiter may hold us for an extended period + * of time. -- rgerhards, 2008-04-02 */ #pragma GCC diagnostic ignored "-Wempty-body" rsRetVal wtiWorker(wti_t *pThis) { - DEFiRet; DEFVARS_mutexProtection; - struct timespec t; wtp_t *pWtp; /* our worker thread pool */ int bInactivityTOOccured = 0; + rsRetVal localRet; + rsRetVal terminateRet; + DEFiRet; ISOBJ_TYPE_assert(pThis, wti); pWtp = pThis->pWtp; /* shortcut */ ISOBJ_TYPE_assert(pWtp, wtp); dbgSetThrdName(pThis->pszDbgHdr); - pThis->pUsrp = NULL; pthread_cleanup_push(wtiWorkerCancelCleanup, pThis); - BEGIN_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr, LOCK_MUTEX); + // TODO: if we have a problem, enable again! pThis->batch.nElemDeq = 0; /* re-init dequeue count */ + BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(pWtp->pmutUsr); pWtp->pfOnWorkerStartup(pWtp->pUsr); - END_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr); + END_MTX_PROTECTED_OPERATIONS_UNCOND(pWtp->pmutUsr); /* now we have our identity, on to real processing */ while(1) { /* loop will be broken below - need to do mutex locks */ /* process any pending thread requests */ wtpProcessThrdChanges(pWtp); pthread_testcancel(); /* see big comment in function header */ -# if !defined(__hpux) /* pthread_yield is missing there! */ - if(pThis->bOptimizeUniProc) - pthread_yield(); /* see big comment in function header */ -# endif - - /* if we have a rate-limiter set for this worker pool, let's call it. Please - * keep in mind that the rate-limiter may hold us for an extended period - * of time. -- rgerhards, 2008-04-02 - */ - if(pWtp->pfRateLimiter != NULL) { + + if(pWtp->pfRateLimiter != NULL) { /* call rate-limiter, if defined */ pWtp->pfRateLimiter(pWtp->pUsr); } wtpSetInactivityGuard(pThis->pWtp, 0, LOCK_MUTEX); /* must be set before usr mutex is locked! */ - BEGIN_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr, LOCK_MUTEX); - - if( (bInactivityTOOccured && pWtp->pfIsIdle(pWtp->pUsr, MUTEX_ALREADY_LOCKED)) - || wtpChkStopWrkr(pWtp, LOCK_MUTEX, MUTEX_ALREADY_LOCKED)) { - END_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr); - break; /* end worker thread run */ + BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(pWtp->pmutUsr); + + /* first check if we are in shutdown process (but evaluate a bit later) */ + terminateRet = wtpChkStopWrkr(pWtp, LOCK_MUTEX, MUTEX_ALREADY_LOCKED); + if(terminateRet == RS_RET_TERMINATE_NOW) { + /* we now need to free the old batch */ + localRet = pWtp->pfObjProcessed(pWtp->pUsr, pThis); + dbgoprint((obj_t*) pThis, "terminating worker because of TERMINATE_NOW mode, del iRet %d\n", + localRet); + break; } - bInactivityTOOccured = 0; /* reset for next run */ - /* if we reach this point, we are still protected by the mutex */ - - if(pWtp->pfIsIdle(pWtp->pUsr, MUTEX_ALREADY_LOCKED)) { - DBGPRINTF("%s: worker IDLE, waiting for work.\n", wtiGetDbgHdr(pThis)); - pWtp->pfOnIdle(pWtp->pUsr, MUTEX_ALREADY_LOCKED); - - if(pWtp->toWrkShutdown == -1) { - /* never shut down any started worker */ - d_pthread_cond_wait(pWtp->pcondBusy, pWtp->pmutUsr); - } else { - timeoutComp(&t, pWtp->toWrkShutdown);/* get absolute timeout */ - if(d_pthread_cond_timedwait(pWtp->pcondBusy, pWtp->pmutUsr, &t) != 0) { - DBGPRINTF("%s: inactivity timeout, worker terminating...\n", wtiGetDbgHdr(pThis)); - bInactivityTOOccured = 1; /* indicate we had a timeout */ - } + /* try to execute and process whatever we have */ + localRet = pWtp->pfDoWork(pWtp->pUsr, pThis, iCancelStateSave); + /* This function must and does RELEASE the MUTEX! */ + + if(localRet == RS_RET_IDLE) { + if(terminateRet == RS_RET_TERMINATE_WHEN_IDLE) { + break; /* end of loop */ } - END_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr); + + if(bInactivityTOOccured) { + /* we had an inactivity timeout in the last run and are still idle, so it is time to exit... */ + break; /* end worker thread run */ + } + BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(pWtp->pmutUsr); + doIdleProcessing(pThis, pWtp, &bInactivityTOOccured); + END_MTX_PROTECTED_OPERATIONS_UNCOND(pWtp->pmutUsr); continue; /* request next iteration */ } - /* if we reach this point, we have a non-empty queue (and are still protected by mutex) */ - pWtp->pfDoWork(pWtp->pUsr, pThis, iCancelStateSave); + bInactivityTOOccured = 0; /* reset for next run */ } + /* if we exit the loop, the mutex is locked and must be unlocked */ + END_MTX_PROTECTED_OPERATIONS_UNCOND(pWtp->pmutUsr); + /* indicate termination */ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); d_pthread_mutex_lock(&pThis->mut); pthread_cleanup_pop(0); /* remove cleanup handler */ +RUNLOG_STR("XXX: Worker shutdown"); pWtp->pfOnWorkerShutdown(pWtp->pUsr); wtiSetState(pThis, eWRKTHRD_TERMINATING, 0, MUTEX_ALREADY_LOCKED); |