summaryrefslogtreecommitdiffstats
path: root/wti.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2008-01-24 17:55:09 +0000
committerRainer Gerhards <rgerhards@adiscon.com>2008-01-24 17:55:09 +0000
commit5c686c8adcc473cbdbb14e4b2d736f9123210ee6 (patch)
treeeb83fbca0d98ac4948b6d9ca22d8a0e4828815a9 /wti.c
parent76782c240db52c81825c907c40c31ca8b48218de (diff)
downloadrsyslog-5c686c8adcc473cbdbb14e4b2d736f9123210ee6.tar.gz
rsyslog-5c686c8adcc473cbdbb14e4b2d736f9123210ee6.tar.xz
rsyslog-5c686c8adcc473cbdbb14e4b2d736f9123210ee6.zip
redesigned queue to utilize helper classes for threading support. This is
finally in a running state for regular (non disk-assisted) queues, with a minor nit at shutdown. So I can finally commit the work again to CVS...
Diffstat (limited to 'wti.c')
-rw-r--r--wti.c53
1 files changed, 33 insertions, 20 deletions
diff --git a/wti.c b/wti.c
index 0e33b60f..045330c6 100644
--- a/wti.c
+++ b/wti.c
@@ -73,18 +73,20 @@ wtiGetDbgHdr(wti_t *pThis)
* bite in the long term... -- rgerhards, 2008-01-17
* TODO: may be performance optimized by atomic operations
*/
-static inline qWrkCmd_t
+qWrkCmd_t
wtiGetState(wti_t *pThis, int bLockMutex)
{
DEFVARS_mutexProtection;
qWrkCmd_t tCmd;
+ BEGINfunc
ISOBJ_TYPE_assert(pThis, wti);
BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex);
tCmd = pThis->tCurrCmd;
END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
+ ENDfunc
return tCmd;
}
@@ -95,19 +97,20 @@ wtiGetState(wti_t *pThis, int bLockMutex)
* in an active state. -- rgerhards, 2008-01-20
*/
rsRetVal
-wtiSetState(wti_t *pThis, qWrkCmd_t tCmd, int bActiveOnly)
+wtiSetState(wti_t *pThis, qWrkCmd_t tCmd, int bActiveOnly, int bLockMutex)
{
DEFiRet;
- DEFVARS_mutex_cancelsafeLock;
+ DEFVARS_mutexProtection;
ISOBJ_TYPE_assert(pThis, wti);
assert(tCmd <= eWRKTHRD_SHUTDOWN_IMMEDIATE);
- mutex_cancelsafe_lock(&pThis->mut);
+ BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex);
+RUNLOG_VAR("%d", bActiveOnly);
/* all worker states must be followed sequentially, only termination can be set in any state */
if( (bActiveOnly && (pThis->tCurrCmd < eWRKTHRD_RUN_CREATED))
- || (pThis->tCurrCmd > tCmd && tCmd != eWRKTHRD_TERMINATING)) {
+ || (pThis->tCurrCmd > tCmd && !(tCmd == eWRKTHRD_TERMINATING || tCmd == eWRKTHRD_STOPPED))) {
dbgprintf("%s: command %d can not be accepted in current %d processing state - ignored\n",
wtiGetDbgHdr(pThis), tCmd, pThis->tCurrCmd);
} else {
@@ -137,7 +140,7 @@ wtiSetState(wti_t *pThis, qWrkCmd_t tCmd, int bActiveOnly)
pThis->tCurrCmd = tCmd; /* apply the new state */
}
- mutex_cancelsafe_unlock(&pThis->mut);
+ END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
RETiRet;
}
@@ -188,15 +191,17 @@ ENDobjConstruct(wti)
rsRetVal
wtiConstructFinalize(wti_t *pThis)
{
+ DEFiRet;
+
ISOBJ_TYPE_assert(pThis, wti);
dbgprintf("%s: finalizing construction of worker instance data\n", wtiGetDbgHdr(pThis));
/* initialize our thread instance descriptor */
- pThis->pUsr = NULL;
+ pThis->pUsrp = NULL;
pThis->tCurrCmd = eWRKTHRD_STOPPED;
- return RS_RET_OK;
+ RETiRet;
}
@@ -211,7 +216,9 @@ wtiJoinThrd(wti_t *pThis)
ISOBJ_TYPE_assert(pThis, wti);
dbgprintf("wti: waiting for worker %s termination, current state %d\n", wtiGetDbgHdr(pThis), pThis->tCurrCmd);
pthread_join(pThis->thrdID, NULL);
- wtiSetState(pThis, eWRKTHRD_STOPPED, 0); /* back to virgin... */
+RUNLOG;
+ wtiSetState(pThis, eWRKTHRD_STOPPED, 0, MUTEX_ALREADY_LOCKED); /* back to virgin... */
+RUNLOG;
pThis->thrdID = 0; /* invalidate the thread ID so that we do not accidently find reused ones */
dbgprintf("wti: worker %s has stopped\n", wtiGetDbgHdr(pThis));
@@ -261,6 +268,7 @@ wtiWorkerCancelCleanup(void *arg)
wtp_t *pWtp;
int iCancelStateSave;
+ BEGINfunc
ISOBJ_TYPE_assert(pThis, wti);
pWtp = pThis->pWtp;
ISOBJ_TYPE_assert(pWtp, wtp);
@@ -268,17 +276,17 @@ wtiWorkerCancelCleanup(void *arg)
dbgprintf("%s: cancelation cleanup handler called.\n", wtiGetDbgHdr(pThis));
/* call user supplied handler (that one e.g. requeues the element) */
- pWtp->pfOnWorkerCancel(pThis->pWtp->pUsr);
+ pWtp->pfOnWorkerCancel(pThis->pWtp->pUsr, pThis->pUsrp);
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
d_pthread_mutex_lock(&pWtp->mut);
- wtiSetState(pThis, eWRKTHRD_TERMINATING, 0);
+ wtiSetState(pThis, eWRKTHRD_TERMINATING, 0, MUTEX_ALREADY_LOCKED);
// TODO: sync access!
pWtp->bThrdStateChanged = 1; /* indicate change, so harverster will be called */
- pthread_cond_signal(&pWtp->condThrdTrm); /* activate anyone waiting on thread shutdown */
d_pthread_mutex_unlock(&pWtp->mut);
pthread_setcancelstate(iCancelStateSave, NULL);
+ ENDfunc
}
@@ -318,8 +326,11 @@ wtiWorker(wti_t *pThis)
pWtp = pThis->pWtp; /* shortcut */
ISOBJ_TYPE_assert(pWtp, wtp);
+ dbgSetThrdName(pThis->pszDbgHdr);
pthread_cleanup_push(wtiWorkerCancelCleanup, pThis);
+ pWtp->pfOnWorkerStartup(pWtp->pUsr);
+
/* now we have our identity, on to real processing */
while(1) { /* loop will be broken below - need to do mutex locks */
dbgprintf("%s: start worker run, queue cmd currently %d\n", wtiGetDbgHdr(pThis), pThis->tCurrCmd);
@@ -349,21 +360,22 @@ dbgprintf("%s: start worker run, queue cmd currently %d\n", wtiGetDbgHdr(pThis),
if(pWtp->toWrkShutdown == -1) {
dbgprintf("worker never times out!\n"); // DEL
/* never shut down any started worker */
- pthread_cond_wait(pWtp->pcondBusy, pWtp->pmutUsr);
+ d_pthread_cond_wait(pWtp->pcondBusy, pWtp->pmutUsr);
} else {
timeoutComp(&t, pWtp->toWrkShutdown);/* get absolute timeout */
- if(pthread_cond_timedwait(pWtp->pcondBusy, pWtp->pmutUsr, &t) != 0) {
+ if(d_pthread_cond_timedwait(pWtp->pcondBusy, pWtp->pmutUsr, &t) != 0) {
dbgprintf("%s: inactivity timeout, worker terminating...\n", wtiGetDbgHdr(pThis));
bInactivityTOOccured = 1; /* indicate we had a timeout */
}
}
- dbgprintf("%s: post condwait ->notEmpty\n", wtiGetDbgHdr(pThis)); // DEL
+ dbgprintf("%s: post condwait ->Busy or timeout\n", wtiGetDbgHdr(pThis)); // DEL
END_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr);
continue; /* request next iteration */
}
/* if we reach this point, we have a non-empty queue (and are still protected by mutex) */
- pWtp->pfDoWork(pThis, iCancelStateSave);
+ dbgprintf("%s: calling consumer\n", wtiGetDbgHdr(pThis));
+ pWtp->pfDoWork(pWtp->pUsr, pThis, iCancelStateSave);
/* TODO: move this above into one of the chck Term functions */
//if(Debug && (qWrkrGetState(pWrkrInst) == eWRKTHRD_SHUTDOWN) && pThis->iQueueSize > 0)
@@ -371,6 +383,8 @@ dbgprintf("%s: start worker run, queue cmd currently %d\n", wtiGetDbgHdr(pThis),
// " %d messages to process.\n", wtiGetDbgHdr(pThis), pThis->iQueueSize);
}
+ pWtp->pfOnWorkerShutdown(pWtp->pUsr);
+
/* indicate termination */
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
dbgprintf("%s: worker waiting for mutex\n", wtiGetDbgHdr(pThis));
@@ -391,11 +405,9 @@ dbgprintf("%s: setting termination state\n", wtiGetDbgHdr(pThis));
wtiSetState(pWrkrInst, eWRKTHRD_TERMINATING, 0);
}
#else
- wtiSetState(pThis, eWRKTHRD_TERMINATING, 0);
+ wtiSetState(pThis, eWRKTHRD_TERMINATING, 0, MUTEX_ALREADY_LOCKED);
#endif
- // TODO: call, mutex:
pWtp->bThrdStateChanged = 1; /* indicate change, so harverster will be called */
- pthread_cond_signal(&pWtp->condThrdTrm); /* activate anyone waiting on thread shutdown */
d_pthread_mutex_unlock(&pThis->mut);
pthread_setcancelstate(iCancelStateSave, NULL);
@@ -404,6 +416,7 @@ dbgprintf("%s: setting termination state\n", wtiGetDbgHdr(pThis));
/* some simple object access methods */
+DEFpropSetMeth(wti, pWtp, wtp_t*);
/* set the debug header message
* The passed-in string is duplicated. So if the caller does not need
@@ -441,7 +454,7 @@ finalize_it:
* rgerhards, 2008-01-09
*/
BEGINObjClassInit(wti, 1) /* one is the object version (most important for persisting) */
-ENDObjClassInit(queue)
+ENDObjClassInit(wti)
/*
* vi:set ai: