summaryrefslogtreecommitdiffstats
path: root/runtime/wti.c
diff options
context:
space:
mode:
Diffstat (limited to 'runtime/wti.c')
-rw-r--r--runtime/wti.c145
1 files changed, 83 insertions, 62 deletions
diff --git a/runtime/wti.c b/runtime/wti.c
index 544bffa7..465dc3e1 100644
--- a/runtime/wti.c
+++ b/runtime/wti.c
@@ -39,10 +39,10 @@
#include <pthread.h>
#include <errno.h>
-#ifdef OS_SOLARIS
-# include <sched.h>
-# define pthread_yield() sched_yield()
-#endif
+/// TODO: check on solaris if this is any longer needed - I don't think so - rgerhards, 2009-09-20
+//#ifdef OS_SOLARIS
+//# include <sched.h>
+//#endif
#include "rsyslog.h"
#include "stringbuf.h"
@@ -201,8 +201,8 @@ CODESTARTobjDestruct(wti)
pthread_cond_destroy(&pThis->condExitDone);
pthread_mutex_destroy(&pThis->mut);
- if(pThis->pszDbgHdr != NULL)
- free(pThis->pszDbgHdr);
+ free(pThis->batch.pElem);
+ free(pThis->pszDbgHdr);
ENDobjDestruct(wti)
@@ -222,15 +222,20 @@ rsRetVal
wtiConstructFinalize(wti_t *pThis)
{
DEFiRet;
+ int iDeqBatchSize;
ISOBJ_TYPE_assert(pThis, wti);
dbgprintf("%s: finalizing construction of worker instance data\n", wtiGetDbgHdr(pThis));
/* initialize our thread instance descriptor */
- pThis->pUsrp = NULL;
pThis->tCurrCmd = eWRKTHRD_STOPPED;
+ /* we now alloc the array for user pointers. We obtain the max from the queue itself. */
+ CHKiRet(pThis->pWtp->pfGetDeqBatchSize(pThis->pWtp->pUsr, &iDeqBatchSize));
+ CHKmalloc(pThis->batch.pElem = calloc((size_t)iDeqBatchSize, sizeof(batch_obj_t)));
+
+finalize_it:
RETiRet;
}
@@ -314,7 +319,7 @@ wtiWorkerCancelCleanup(void *arg)
DBGPRINTF("%s: cancelation cleanup handler called.\n", wtiGetDbgHdr(pThis));
/* call user supplied handler (that one e.g. requeues the element) */
- pWtp->pfOnWorkerCancel(pThis->pWtp->pUsr, pThis->pUsrp);
+ pWtp->pfOnWorkerCancel(pThis->pWtp->pUsr, pThis->batch.pElem[0].pUsrp);
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
d_pthread_mutex_lock(&pWtp->mut);
@@ -328,6 +333,34 @@ wtiWorkerCancelCleanup(void *arg)
}
+/* wait for queue to become non-empty or timeout
+ * helper to wtiWorker
+ * IMPORTANT: mutex must be locked when this code is called!
+ * rgerhards, 2009-05-20
+ */
+static inline void
+doIdleProcessing(wti_t *pThis, wtp_t *pWtp, int *pbInactivityTOOccured)
+{
+ struct timespec t;
+
+ BEGINfunc
+ DBGPRINTF("%s: worker IDLE, waiting for work.\n", wtiGetDbgHdr(pThis));
+ pWtp->pfOnIdle(pWtp->pUsr, MUTEX_ALREADY_LOCKED);
+
+ if(pWtp->toWrkShutdown == -1) {
+ /* never shut down any started worker */
+ d_pthread_cond_wait(pWtp->pcondBusy, pWtp->pmutUsr);
+ } else {
+ timeoutComp(&t, pWtp->toWrkShutdown);/* get absolute timeout */
+ if(d_pthread_cond_timedwait(pWtp->pcondBusy, pWtp->pmutUsr, &t) != 0) {
+ DBGPRINTF("%s: inactivity timeout, worker terminating...\n", wtiGetDbgHdr(pThis));
+ *pbInactivityTOOccured = 1; /* indicate we had a timeout */
+ }
+ }
+ ENDfunc
+}
+
+
/* generic worker thread framework
*
* Some special comments below, so that they do not clutter the main function code:
@@ -336,100 +369,88 @@ wtiWorkerCancelCleanup(void *arg)
* Now make sure we can get canceled - it is not specified if pthread_setcancelstate() is
* a cancellation point in itself. As we run most of the time without cancel enabled, I fear
* we may never get cancelled if we do not create a cancellation point ourselfs.
- *
- * On the use of pthread_yield():
- * We yield to give the other threads a chance to obtain the mutex. If we do not
- * do that, this thread may very well aquire the mutex again before another thread
- * has even a chance to run. The reason is that mutex operations are free to be
- * implemented in the quickest possible way (and they typically are!). That is, the
- * mutex lock/unlock most probably just does an atomic memory swap and does not necessarily
- * schedule other threads waiting on the same mutex. That can lead to the same thread
- * aquiring the mutex ever and ever again while all others are starving for it. We
- * have exactly seen this behaviour when we deliberately introduced a long-running
- * test action which basically did a sleep. I understand that with real actions the
- * likelihood of this starvation condition is very low - but it could still happen
- * and would be very hard to debug. The yield() is a sure fix, its performance overhead
- * should be well accepted given the above facts. -- rgerhards, 2008-01-10
+ * Note on rate-limiters:
+ * If we have a rate-limiter set for this worker pool, let's call it. Please
+ * keep in mind that the rate-limiter may hold us for an extended period
+ * of time. -- rgerhards, 2008-04-02
*/
#pragma GCC diagnostic ignored "-Wempty-body"
rsRetVal
wtiWorker(wti_t *pThis)
{
- DEFiRet;
DEFVARS_mutexProtection;
- struct timespec t;
wtp_t *pWtp; /* our worker thread pool */
int bInactivityTOOccured = 0;
+ rsRetVal localRet;
+ rsRetVal terminateRet;
+ DEFiRet;
ISOBJ_TYPE_assert(pThis, wti);
pWtp = pThis->pWtp; /* shortcut */
ISOBJ_TYPE_assert(pWtp, wtp);
dbgSetThrdName(pThis->pszDbgHdr);
- pThis->pUsrp = NULL;
pthread_cleanup_push(wtiWorkerCancelCleanup, pThis);
- BEGIN_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr, LOCK_MUTEX);
+ // TODO: if we have a problem, enable again! pThis->batch.nElemDeq = 0; /* re-init dequeue count */
+ BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(pWtp->pmutUsr);
pWtp->pfOnWorkerStartup(pWtp->pUsr);
- END_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr);
+ END_MTX_PROTECTED_OPERATIONS_UNCOND(pWtp->pmutUsr);
/* now we have our identity, on to real processing */
while(1) { /* loop will be broken below - need to do mutex locks */
/* process any pending thread requests */
wtpProcessThrdChanges(pWtp);
pthread_testcancel(); /* see big comment in function header */
-# if !defined(__hpux) /* pthread_yield is missing there! */
- if(pThis->bOptimizeUniProc)
- pthread_yield(); /* see big comment in function header */
-# endif
-
- /* if we have a rate-limiter set for this worker pool, let's call it. Please
- * keep in mind that the rate-limiter may hold us for an extended period
- * of time. -- rgerhards, 2008-04-02
- */
- if(pWtp->pfRateLimiter != NULL) {
+
+ if(pWtp->pfRateLimiter != NULL) { /* call rate-limiter, if defined */
pWtp->pfRateLimiter(pWtp->pUsr);
}
wtpSetInactivityGuard(pThis->pWtp, 0, LOCK_MUTEX); /* must be set before usr mutex is locked! */
- BEGIN_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr, LOCK_MUTEX);
-
- if( (bInactivityTOOccured && pWtp->pfIsIdle(pWtp->pUsr, MUTEX_ALREADY_LOCKED))
- || wtpChkStopWrkr(pWtp, LOCK_MUTEX, MUTEX_ALREADY_LOCKED)) {
- END_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr);
- break; /* end worker thread run */
+ BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(pWtp->pmutUsr);
+
+ /* first check if we are in shutdown process (but evaluate a bit later) */
+ terminateRet = wtpChkStopWrkr(pWtp, LOCK_MUTEX, MUTEX_ALREADY_LOCKED);
+ if(terminateRet == RS_RET_TERMINATE_NOW) {
+ /* we now need to free the old batch */
+ localRet = pWtp->pfObjProcessed(pWtp->pUsr, pThis);
+ dbgoprint((obj_t*) pThis, "terminating worker because of TERMINATE_NOW mode, del iRet %d\n",
+ localRet);
+ break;
}
- bInactivityTOOccured = 0; /* reset for next run */
- /* if we reach this point, we are still protected by the mutex */
-
- if(pWtp->pfIsIdle(pWtp->pUsr, MUTEX_ALREADY_LOCKED)) {
- DBGPRINTF("%s: worker IDLE, waiting for work.\n", wtiGetDbgHdr(pThis));
- pWtp->pfOnIdle(pWtp->pUsr, MUTEX_ALREADY_LOCKED);
-
- if(pWtp->toWrkShutdown == -1) {
- /* never shut down any started worker */
- d_pthread_cond_wait(pWtp->pcondBusy, pWtp->pmutUsr);
- } else {
- timeoutComp(&t, pWtp->toWrkShutdown);/* get absolute timeout */
- if(d_pthread_cond_timedwait(pWtp->pcondBusy, pWtp->pmutUsr, &t) != 0) {
- DBGPRINTF("%s: inactivity timeout, worker terminating...\n", wtiGetDbgHdr(pThis));
- bInactivityTOOccured = 1; /* indicate we had a timeout */
- }
+ /* try to execute and process whatever we have */
+ localRet = pWtp->pfDoWork(pWtp->pUsr, pThis, iCancelStateSave);
+ /* This function must and does RELEASE the MUTEX! */
+
+ if(localRet == RS_RET_IDLE) {
+ if(terminateRet == RS_RET_TERMINATE_WHEN_IDLE) {
+ break; /* end of loop */
}
- END_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr);
+
+ if(bInactivityTOOccured) {
+ /* we had an inactivity timeout in the last run and are still idle, so it is time to exit... */
+ break; /* end worker thread run */
+ }
+ BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(pWtp->pmutUsr);
+ doIdleProcessing(pThis, pWtp, &bInactivityTOOccured);
+ END_MTX_PROTECTED_OPERATIONS_UNCOND(pWtp->pmutUsr);
continue; /* request next iteration */
}
- /* if we reach this point, we have a non-empty queue (and are still protected by mutex) */
- pWtp->pfDoWork(pWtp->pUsr, pThis, iCancelStateSave);
+ bInactivityTOOccured = 0; /* reset for next run */
}
+ /* if we exit the loop, the mutex is locked and must be unlocked */
+ END_MTX_PROTECTED_OPERATIONS_UNCOND(pWtp->pmutUsr);
+
/* indicate termination */
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
d_pthread_mutex_lock(&pThis->mut);
pthread_cleanup_pop(0); /* remove cleanup handler */
+RUNLOG_STR("XXX: Worker shutdown");
pWtp->pfOnWorkerShutdown(pWtp->pUsr);
wtiSetState(pThis, eWRKTHRD_TERMINATING, 0, MUTEX_ALREADY_LOCKED);