diff options
Diffstat (limited to 'runtime/wti.c')
-rw-r--r-- | runtime/wti.c | 125 |
1 files changed, 82 insertions, 43 deletions
diff --git a/runtime/wti.c b/runtime/wti.c index 9de7c365..9c137f57 100644 --- a/runtime/wti.c +++ b/runtime/wti.c @@ -39,10 +39,10 @@ #include <pthread.h> #include <errno.h> -#ifdef OS_SOLARIS -# include <sched.h> -# define pthread_yield() sched_yield() -#endif +/// TODO: check on solaris if this is any longer needed - I don't think so - rgerhards, 2009-09-20 +//#ifdef OS_SOLARIS +//# include <sched.h> +//#endif #include "rsyslog.h" #include "stringbuf.h" @@ -151,7 +151,6 @@ wtiSetState(wti_t *pThis, qWrkCmd_t tCmd, int bActiveOnly, int bLockMutex) if(val != tCurrCmd) { DBGPRINTF("wtiSetState PROBLEM, tCurrCmd %d overwritten with %d, wanted to set %d\n", tCurrCmd, val, tCmd); } - } END_MTX_PROTECTED_OPERATIONS(&pThis->mut); @@ -209,6 +208,7 @@ CODESTARTobjDestruct(wti) pthread_cond_destroy(&pThis->condExitDone); pthread_mutex_destroy(&pThis->mut); + free(pThis->batch.pElem); free(pThis->pszDbgHdr); ENDobjDestruct(wti) @@ -228,15 +228,20 @@ rsRetVal wtiConstructFinalize(wti_t *pThis) { DEFiRet; + int iDeqBatchSize; ISOBJ_TYPE_assert(pThis, wti); dbgprintf("%s: finalizing construction of worker instance data\n", wtiGetDbgHdr(pThis)); /* initialize our thread instance descriptor */ - pThis->pUsrp = NULL; pThis->tCurrCmd = eWRKTHRD_STOPPED; + /* we now alloc the array for user pointers. We obtain the max from the queue itself. */ + CHKiRet(pThis->pWtp->pfGetDeqBatchSize(pThis->pWtp->pUsr, &iDeqBatchSize)); + CHKmalloc(pThis->batch.pElem = calloc((size_t)iDeqBatchSize, sizeof(batch_obj_t))); + +finalize_it: RETiRet; } @@ -320,7 +325,7 @@ wtiWorkerCancelCleanup(void *arg) DBGPRINTF("%s: cancelation cleanup handler called.\n", wtiGetDbgHdr(pThis)); /* call user supplied handler (that one e.g. requeues the element) */ - pWtp->pfOnWorkerCancel(pThis->pWtp->pUsr, pThis->pUsrp); + pWtp->pfOnWorkerCancel(pThis->pWtp->pUsr, pThis->batch.pElem[0].pUsrp); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); d_pthread_mutex_lock(&pWtp->mut); @@ -334,75 +339,108 @@ wtiWorkerCancelCleanup(void *arg) } +/* wait for queue to become non-empty or timeout + * helper to wtiWorker + * IMPORTANT: mutex must be locked when this code is called! + * rgerhards, 2009-05-20 + */ +static inline void +doIdleProcessing(wti_t *pThis, wtp_t *pWtp, int *pbInactivityTOOccured) +{ + struct timespec t; + + BEGINfunc + DBGPRINTF("%s: worker IDLE, waiting for work.\n", wtiGetDbgHdr(pThis)); + pWtp->pfOnIdle(pWtp->pUsr, MUTEX_ALREADY_LOCKED); + + if(pWtp->toWrkShutdown == -1) { + /* never shut down any started worker */ + d_pthread_cond_wait(pWtp->pcondBusy, pWtp->pmutUsr); + } else { + timeoutComp(&t, pWtp->toWrkShutdown);/* get absolute timeout */ + if(d_pthread_cond_timedwait(pWtp->pcondBusy, pWtp->pmutUsr, &t) != 0) { + DBGPRINTF("%s: inactivity timeout, worker terminating...\n", wtiGetDbgHdr(pThis)); + *pbInactivityTOOccured = 1; /* indicate we had a timeout */ + } + } + ENDfunc +} + + /* generic worker thread framework */ #pragma GCC diagnostic ignored "-Wempty-body" rsRetVal wtiWorker(wti_t *pThis) { - DEFiRet; - DEFVARS_mutexProtection; - struct timespec t; + DEFVARS_mutexProtection_uncond; wtp_t *pWtp; /* our worker thread pool */ int bInactivityTOOccured = 0; + rsRetVal localRet; + rsRetVal terminateRet; + bool bMutexIsLocked; + DEFiRet; ISOBJ_TYPE_assert(pThis, wti); pWtp = pThis->pWtp; /* shortcut */ ISOBJ_TYPE_assert(pWtp, wtp); dbgSetThrdName(pThis->pszDbgHdr); - pThis->pUsrp = NULL; pthread_cleanup_push(wtiWorkerCancelCleanup, pThis); - BEGIN_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr, LOCK_MUTEX); + BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(pWtp->pmutUsr); pWtp->pfOnWorkerStartup(pWtp->pUsr); - END_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr); + END_MTX_PROTECTED_OPERATIONS_UNCOND(pWtp->pmutUsr); /* now we have our identity, on to real processing */ while(1) { /* loop will be broken below - need to do mutex locks */ /* process any pending thread requests */ wtpProcessThrdChanges(pWtp); - /* if we have a rate-limiter set for this worker pool, let's call it. Please - * keep in mind that the rate-limiter may hold us for an extended period - * of time. -- rgerhards, 2008-04-02 - */ - if(pWtp->pfRateLimiter != NULL) { + if(pWtp->pfRateLimiter != NULL) { /* call rate-limiter, if defined */ pWtp->pfRateLimiter(pWtp->pUsr); } wtpSetInactivityGuard(pThis->pWtp, 0, LOCK_MUTEX); /* must be set before usr mutex is locked! */ - BEGIN_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr, LOCK_MUTEX); - - if( (bInactivityTOOccured && pWtp->pfIsIdle(pWtp->pUsr, MUTEX_ALREADY_LOCKED)) - || wtpChkStopWrkr(pWtp, LOCK_MUTEX, MUTEX_ALREADY_LOCKED)) { - END_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr); - break; /* end worker thread run */ + BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(pWtp->pmutUsr); + bMutexIsLocked = TRUE; + + /* first check if we are in shutdown process (but evaluate a bit later) */ + terminateRet = wtpChkStopWrkr(pWtp, LOCK_MUTEX, MUTEX_ALREADY_LOCKED); + if(terminateRet == RS_RET_TERMINATE_NOW) { + /* we now need to free the old batch */ + localRet = pWtp->pfObjProcessed(pWtp->pUsr, pThis); + dbgoprint((obj_t*) pThis, "terminating worker because of TERMINATE_NOW mode, del iRet %d\n", + localRet); + break; } - bInactivityTOOccured = 0; /* reset for next run */ - /* if we reach this point, we are still protected by the mutex */ - - if(pWtp->pfIsIdle(pWtp->pUsr, MUTEX_ALREADY_LOCKED)) { - DBGPRINTF("%s: worker IDLE, waiting for work.\n", wtiGetDbgHdr(pThis)); - pWtp->pfOnIdle(pWtp->pUsr, MUTEX_ALREADY_LOCKED); - - if(pWtp->toWrkShutdown == -1) { - /* never shut down any started worker */ - d_pthread_cond_wait(pWtp->pcondBusy, pWtp->pmutUsr); - } else { - timeoutComp(&t, pWtp->toWrkShutdown);/* get absolute timeout */ - if(d_pthread_cond_timedwait(pWtp->pcondBusy, pWtp->pmutUsr, &t) != 0) { - DBGPRINTF("%s: inactivity timeout, worker terminating...\n", wtiGetDbgHdr(pThis)); - bInactivityTOOccured = 1; /* indicate we had a timeout */ - } + /* try to execute and process whatever we have */ + /* This function must and does RELEASE the MUTEX! */ + localRet = pWtp->pfDoWork(pWtp->pUsr, pThis, iCancelStateSave); + bMutexIsLocked = FALSE; + + if(localRet == RS_RET_IDLE) { + if(terminateRet == RS_RET_TERMINATE_WHEN_IDLE) { + break; /* end of loop */ + } + + if(bInactivityTOOccured) { + /* we had an inactivity timeout in the last run and are still idle, so it is time to exit... */ + break; /* end worker thread run */ } - END_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr); + BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(pWtp->pmutUsr); + doIdleProcessing(pThis, pWtp, &bInactivityTOOccured); + END_MTX_PROTECTED_OPERATIONS_UNCOND(pWtp->pmutUsr); continue; /* request next iteration */ } - /* if we reach this point, we have a non-empty queue (and are still protected by mutex) */ - pWtp->pfDoWork(pWtp->pUsr, pThis, iCancelStateSave); + bInactivityTOOccured = 0; /* reset for next run */ + } + + /* if we exit the loop, the mutex may be locked and, if so, must be unlocked */ + if(bMutexIsLocked) { + END_MTX_PROTECTED_OPERATIONS_UNCOND(pWtp->pmutUsr); } /* indicate termination */ @@ -410,6 +448,7 @@ wtiWorker(wti_t *pThis) d_pthread_mutex_lock(&pThis->mut); pthread_cleanup_pop(0); /* remove cleanup handler */ +RUNLOG_STR("XXX: Worker shutdown"); pWtp->pfOnWorkerShutdown(pWtp->pUsr); wtiSetState(pThis, eWRKTHRD_TERMINATING, 0, MUTEX_ALREADY_LOCKED); |