summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--wti.c230
-rw-r--r--wti.h3
2 files changed, 185 insertions, 48 deletions
diff --git a/wti.c b/wti.c
index ac324bcc..0e33b60f 100644
--- a/wti.c
+++ b/wti.c
@@ -50,7 +50,6 @@
DEFobjStaticHelpers
/* forward-definitions */
-static void *wtiWorker(void *arg);
/* methods */
@@ -72,13 +71,21 @@ wtiGetDbgHdr(wti_t *pThis)
/* get the current worker state. For simplicity and speed, we have
* NOT used our regular calling interface this time. I hope that won't
* bite in the long term... -- rgerhards, 2008-01-17
+ * TODO: may be performance optimized by atomic operations
*/
static inline qWrkCmd_t
-wtiGetState(wti_t *pThis)
+wtiGetState(wti_t *pThis, int bLockMutex)
{
+ DEFVARS_mutexProtection;
+ qWrkCmd_t tCmd;
+
ISOBJ_TYPE_assert(pThis, wti);
- // TODO: lock mutex?
- return pThis->tCurrCmd;
+
+ BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex);
+ tCmd = pThis->tCurrCmd;
+ END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
+
+ return tCmd;
}
@@ -88,11 +95,10 @@ wtiGetState(wti_t *pThis)
* in an active state. -- rgerhards, 2008-01-20
*/
rsRetVal
-wtiSetState(wti_t *pThis, qWrkCmd_t tCmd, bActiveOnly)
+wtiSetState(wti_t *pThis, qWrkCmd_t tCmd, int bActiveOnly)
{
DEFiRet;
DEFVARS_mutex_cancelsafeLock;
- int iState;
ISOBJ_TYPE_assert(pThis, wti);
assert(tCmd <= eWRKTHRD_SHUTDOWN_IMMEDIATE);
@@ -108,9 +114,6 @@ wtiSetState(wti_t *pThis, qWrkCmd_t tCmd, bActiveOnly)
dbgprintf("%s: receiving command %d\n", wtiGetDbgHdr(pThis), tCmd);
switch(tCmd) {
case eWRKTHRD_RUN_CREATED:
- assert(pThis->tCurrCmd < eWRKTHRD_RUN_CREATED);
- iState = pthread_create(&(pThis->thrdID), NULL, wtiWorker, (void*) pThis);
- dbgprintf("wti: Worker thread %s, started with state %d.\n", wtiGetDbgHdr(pThis), iState);
break;
case eWRKTHRD_TERMINATING:
/* TODO: re-enable meaningful debug msg! (via function callback?)
@@ -135,7 +138,7 @@ wtiSetState(wti_t *pThis, qWrkCmd_t tCmd, bActiveOnly)
}
mutex_cancelsafe_unlock(&pThis->mut);
- return iRet;
+ RETiRet;
}
@@ -167,7 +170,7 @@ rsRetVal wtiDestruct(wti_t **ppThis)
/* back to normal */
pthread_setcancelstate(iCancelStateSave, NULL);
- return iRet;
+ RETiRet;
}
@@ -197,28 +200,6 @@ wtiConstructFinalize(wti_t *pThis)
}
-/* Waits until the specified worker thread
- * changed to full running state (aka has started up).
- * rgerhards, 2008-01-17
- */
-static inline rsRetVal
-wtiWaitStartup(wti_t *pThis)
-{
- DEFVARS_mutex_cancelsafeLock;
- ISOBJ_TYPE_assert(pThis, wti);
-
- mutex_cancelsafe_lock(&pThis->mut);
- if((pThis->tCurrCmd == eWRKTHRD_RUN_CREATED) || (pThis->tCurrCmd == eWRKTHRD_RUN_CREATED)) {
- dbgprintf("wti: waiting on worker thread %s startup\n", wtiGetDbgHdr(pThis));
- pthread_cond_wait(&pThis->condInitDone, &pThis->mut);
-dbgprintf("worker startup done!\n");
- }
- mutex_cancelsafe_unlock(&pThis->mut);
-
- return RS_RET_OK;
-}
-
-
/* join a specific worker thread
* we do not lock the mutex, because join will sync anyways...
*/
@@ -230,42 +211,199 @@ wtiJoinThrd(wti_t *pThis)
ISOBJ_TYPE_assert(pThis, wti);
dbgprintf("wti: waiting for worker %s termination, current state %d\n", wtiGetDbgHdr(pThis), pThis->tCurrCmd);
pthread_join(pThis->thrdID, NULL);
- wtiSetState(pThis, eWRKTHRD_STOPPED); /* back to virgin... */
+ wtiSetState(pThis, eWRKTHRD_STOPPED, 0); /* back to virgin... */
pThis->thrdID = 0; /* invalidate the thread ID so that we do not accidently find reused ones */
dbgprintf("wti: worker %s has stopped\n", wtiGetDbgHdr(pThis));
- return iRet;
+ RETiRet;
+}
+
+/* check if we had a worker thread changes and, if so, act
+ * on it. At a minimum, terminated threads are harvested (joined).
+ */
+rsRetVal
+wtiProcessThrdChanges(wti_t *pThis, int bLockMutex)
+{
+ DEFiRet;
+ DEFVARS_mutexProtection;
+
+ ISOBJ_TYPE_assert(pThis, wti);
+
+ BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex);
+ switch(pThis->tCurrCmd) {
+ case eWRKTHRD_TERMINATING:
+ iRet = wtiJoinThrd(pThis);
+ break;
+ /* these cases just to satisfy the compiler, we do not act an them: */
+ case eWRKTHRD_STOPPED:
+ case eWRKTHRD_RUN_CREATED:
+ case eWRKTHRD_RUN_INIT:
+ case eWRKTHRD_RUNNING:
+ case eWRKTHRD_SHUTDOWN:
+ case eWRKTHRD_SHUTDOWN_IMMEDIATE:
+ /* DO NOTHING */
+ break;
+ }
+ END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
+
+ RETiRet;
}
-static void *
-wtiWorker(void *arg)
+/* cancellation cleanup handler for queueWorker ()
+ * Updates admin structure and frees ressources.
+ * rgerhards, 2008-01-16
+ */
+static void
+wtiWorkerCancelCleanup(void *arg)
{
wti_t *pThis = (wti_t*) arg;
+ wtp_t *pWtp;
+ int iCancelStateSave;
ISOBJ_TYPE_assert(pThis, wti);
+ pWtp = pThis->pWtp;
+ ISOBJ_TYPE_assert(pWtp, wtp);
- // TODO: add logic!
- //
- pthread_exit(0);
+ dbgprintf("%s: cancelation cleanup handler called.\n", wtiGetDbgHdr(pThis));
+
+ /* call user supplied handler (that one e.g. requeues the element) */
+ pWtp->pfOnWorkerCancel(pThis->pWtp->pUsr);
+
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
+ d_pthread_mutex_lock(&pWtp->mut);
+ wtiSetState(pThis, eWRKTHRD_TERMINATING, 0);
+ // TODO: sync access!
+ pWtp->bThrdStateChanged = 1; /* indicate change, so harverster will be called */
+
+ pthread_cond_signal(&pWtp->condThrdTrm); /* activate anyone waiting on thread shutdown */
+ d_pthread_mutex_unlock(&pWtp->mut);
+ pthread_setcancelstate(iCancelStateSave, NULL);
}
-/* Starts a worker thread (on a specific index [i]!)
+
+/* generic worker thread framework
+ *
+ * Some special comments below, so that they do not clutter the main function code:
+ *
+ * On the use of pthread_testcancel():
+ * Now make sure we can get canceled - it is not specified if pthread_setcancelstate() is
+ * a cancellation point in itself. As we run most of the time without cancel enabled, I fear
+ * we may never get cancelled if we do not create a cancellation point ourselfs.
+ *
+ * On the use of pthread_yield():
+ * We yield to give the other threads a chance to obtain the mutex. If we do not
+ * do that, this thread may very well aquire the mutex again before another thread
+ * has even a chance to run. The reason is that mutex operations are free to be
+ * implemented in the quickest possible way (and they typically are!). That is, the
+ * mutex lock/unlock most probably just does an atomic memory swap and does not necessarily
+ * schedule other threads waiting on the same mutex. That can lead to the same thread
+ * aquiring the mutex ever and ever again while all others are starving for it. We
+ * have exactly seen this behaviour when we deliberately introduced a long-running
+ * test action which basically did a sleep. I understand that with real actions the
+ * likelihood of this starvation condition is very low - but it could still happen
+ * and would be very hard to debug. The yield() is a sure fix, its performance overhead
+ * should be well accepted given the above facts. -- rgerhards, 2008-01-10
*/
rsRetVal
-wtiStart(wti_t *pThis)
+wtiWorker(wti_t *pThis)
{
DEFiRet;
+ DEFVARS_mutexProtection;
+ struct timespec t;
+ wtp_t *pWtp; /* our worker thread pool */
+ int bInactivityTOOccured = 0;
ISOBJ_TYPE_assert(pThis, wti);
- wtiSetState(pThis, eWRKTHRD_RUN_CREATED);
+ pWtp = pThis->pWtp; /* shortcut */
+ ISOBJ_TYPE_assert(pWtp, wtp);
+
+ pthread_cleanup_push(wtiWorkerCancelCleanup, pThis);
+
+ /* now we have our identity, on to real processing */
+ while(1) { /* loop will be broken below - need to do mutex locks */
+dbgprintf("%s: start worker run, queue cmd currently %d\n", wtiGetDbgHdr(pThis), pThis->tCurrCmd);
+ /* process any pending thread requests */
+ wtpProcessThrdChanges(pWtp);
+ pthread_testcancel(); /* see big comment in function header */
+ pthread_yield(); /* see big comment in function header */
+
+ wtpSetInactivityGuard(pThis->pWtp, 0, LOCK_MUTEX); /* must be set before usr mutex is locked! */
+ BEGIN_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr, LOCK_MUTEX);
+
+ if( (bInactivityTOOccured && pWtp->pfIsIdle(pWtp->pUsr, MUTEX_ALREADY_LOCKED))
+ || wtpChkStopWrkr(pWtp, LOCK_MUTEX, MUTEX_ALREADY_LOCKED)) {
+ END_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr);
+ break; /* end worker thread run */
+ }
+ bInactivityTOOccured = 0; /* reset for next run */
+
+ /* if we reach this point, we are still protected by the mutex */
+
+ if(pWtp->pfIsIdle(pWtp->pUsr, MUTEX_ALREADY_LOCKED)) {
+ dbgprintf("%s: worker IDLE, waiting for work.\n", wtiGetDbgHdr(pThis));
+ pWtp->pfOnIdle(pWtp->pUsr, MUTEX_ALREADY_LOCKED);
+
+ dbgprintf("%s: pre condwait ->notEmpty, worker shutdown %d\n",
+ wtiGetDbgHdr(pThis), pThis->pWtp->toWrkShutdown); // DEL
+ if(pWtp->toWrkShutdown == -1) {
+ dbgprintf("worker never times out!\n"); // DEL
+ /* never shut down any started worker */
+ pthread_cond_wait(pWtp->pcondBusy, pWtp->pmutUsr);
+ } else {
+ timeoutComp(&t, pWtp->toWrkShutdown);/* get absolute timeout */
+ if(pthread_cond_timedwait(pWtp->pcondBusy, pWtp->pmutUsr, &t) != 0) {
+ dbgprintf("%s: inactivity timeout, worker terminating...\n", wtiGetDbgHdr(pThis));
+ bInactivityTOOccured = 1; /* indicate we had a timeout */
+ }
+ }
+ dbgprintf("%s: post condwait ->notEmpty\n", wtiGetDbgHdr(pThis)); // DEL
+ END_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr);
+ continue; /* request next iteration */
+ }
+
+ /* if we reach this point, we have a non-empty queue (and are still protected by mutex) */
+ pWtp->pfDoWork(pThis, iCancelStateSave);
+
+ /* TODO: move this above into one of the chck Term functions */
+ //if(Debug && (qWrkrGetState(pWrkrInst) == eWRKTHRD_SHUTDOWN) && pThis->iQueueSize > 0)
+ // dbgprintf("%s: worker does not yet terminate because it still has "
+ // " %d messages to process.\n", wtiGetDbgHdr(pThis), pThis->iQueueSize);
+ }
+
+ /* indicate termination */
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
+dbgprintf("%s: worker waiting for mutex\n", wtiGetDbgHdr(pThis));
+ d_pthread_mutex_lock(&pThis->mut);
+ pthread_cleanup_pop(0); /* remove cleanup handler */
+
+ // TODO: I think we no longer need that - but check!
+#if 0
+ /* if we ever need finalize_it, here would be the place for it! */
+ if(qWrkrGetState(pWrkrInst) == eWRKTHRD_SHUTDOWN ||
+ qWrkrGetState(pWrkrInst) == eWRKTHRD_SHUTDOWN_IMMEDIATE ||
+ qWrkrGetState(pWrkrInst) == eWRKTHRD_RUN_INIT ||
+ qWrkrGetState(pWrkrInst) == eWRKTHRD_RUN_CREATED) {
+ /* in shutdown case, we need to flag termination. All other commands
+ * have a meaning to the thread harvester, so we can not overwrite them
+ */
+dbgprintf("%s: setting termination state\n", wtiGetDbgHdr(pThis));
+ wtiSetState(pWrkrInst, eWRKTHRD_TERMINATING, 0);
+ }
+#else
+ wtiSetState(pThis, eWRKTHRD_TERMINATING, 0);
+#endif
+ // TODO: call, mutex:
+ pWtp->bThrdStateChanged = 1; /* indicate change, so harverster will be called */
+ pthread_cond_signal(&pWtp->condThrdTrm); /* activate anyone waiting on thread shutdown */
+ d_pthread_mutex_unlock(&pThis->mut);
+ pthread_setcancelstate(iCancelStateSave, NULL);
- return iRet;
+ RETiRet;
}
/* some simple object access methods */
-DEFpropSetMeth(wti, toShutdown, int);
/* set the debug header message
* The passed-in string is duplicated. So if the caller does not need
@@ -294,7 +432,7 @@ wtiSetDbgHdr(wti_t *pThis, uchar *pszMsg, size_t lenMsg)
memcpy(pThis->pszDbgHdr, pszMsg, lenMsg + 1); /* always think about the \0! */
finalize_it:
- return iRet;
+ RETiRet;
}
diff --git a/wti.h b/wti.h
index fbf1f150..4b028f73 100644
--- a/wti.h
+++ b/wti.h
@@ -34,7 +34,6 @@ typedef struct wti_s {
qWrkCmd_t tCurrCmd; /* current command to be carried out by worker */
obj_t *pUsr; /* current user object being processed (or NULL if none) */
wtp_t *pWtp; /* my worker thread pool (important if only the work thread instance is passed! */
- int toShutdown; /* shutdown timeout, used when idle */
pthread_cond_t condInitDone; /* signaled when the thread startup is done (once per thread existance) */
pthread_mutex_t mut;
uchar *pszDbgHdr; /* header string for debug messages */
@@ -47,9 +46,9 @@ typedef struct wti_s {
rsRetVal wtiConstruct(wti_t **ppThis);
rsRetVal wtiConstructFinalize(wti_t *pThis);
rsRetVal wtiDestruct(wti_t **ppThis);
+rsRetVal wtiWorker(wti_t *pThis);
PROTOTYPEObjClassInit(wti);
PROTOTYPEpropSetMeth(wti, pszDbgHdr, uchar*);
-PROTOTYPEpropSetMeth(wti, toShutdown, int);
#define wtiGetID(pThis) ((unsigned long) pThis)
#endif /* #ifndef WTI_H_INCLUDED */