summaryrefslogtreecommitdiffstats
path: root/wtp.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2008-01-24 17:55:09 +0000
committerRainer Gerhards <rgerhards@adiscon.com>2008-01-24 17:55:09 +0000
commit5c686c8adcc473cbdbb14e4b2d736f9123210ee6 (patch)
treeeb83fbca0d98ac4948b6d9ca22d8a0e4828815a9 /wtp.c
parent76782c240db52c81825c907c40c31ca8b48218de (diff)
downloadrsyslog-5c686c8adcc473cbdbb14e4b2d736f9123210ee6.tar.gz
rsyslog-5c686c8adcc473cbdbb14e4b2d736f9123210ee6.tar.xz
rsyslog-5c686c8adcc473cbdbb14e4b2d736f9123210ee6.zip
redesigned queue to utilize helper classes for threading support. This is
finally in a running state for regular (non disk-assisted) queues, with a minor nit at shutdown. So I can finally commit the work again to CVS...
Diffstat (limited to 'wtp.c')
-rw-r--r--wtp.c312
1 files changed, 169 insertions, 143 deletions
diff --git a/wtp.c b/wtp.c
index 4bc0cd4f..4c3ea921 100644
--- a/wtp.c
+++ b/wtp.c
@@ -71,52 +71,59 @@ wtpGetDbgHdr(wtp_t *pThis)
/* Not implemented dummy function for constructor */
-static rsRetVal NotImplementedDummy() { return RS_RET_NOT_IMPLEMENTED; }
+static rsRetVal NotImplementedDummy() { return RS_RET_OK; }
/* Standard-Constructor for the wtp object
*/
BEGINobjConstruct(wtp) /* be sure to specify the object type also in END macro! */
- int i;
- uchar pszBuf[64];
- size_t lenBuf;
- wti_t *pWti;
-
pthread_mutex_init(&pThis->mut, NULL);
pthread_cond_init(&pThis->condThrdTrm, NULL);
/* set all function pointers to "not implemented" dummy so that we can safely call them */
pThis->pfChkStopWrkr = NotImplementedDummy;
pThis->pfIsIdle = NotImplementedDummy;
pThis->pfDoWork = NotImplementedDummy;
- pThis->pfOnShutdownAdvise = NotImplementedDummy;
pThis->pfOnIdle = NotImplementedDummy;
pThis->pfOnWorkerCancel = NotImplementedDummy;
+ pThis->pfOnWorkerStartup = NotImplementedDummy;
+ pThis->pfOnWorkerShutdown = NotImplementedDummy;
+ENDobjConstruct(wtp)
+
+
+/* Construction finalizer
+ * rgerhards, 2008-01-17
+ */
+rsRetVal
+wtpConstructFinalize(wtp_t *pThis)
+{
+ DEFiRet;
+ int i;
+ uchar pszBuf[64];
+ size_t lenBuf;
+ wti_t *pWti;
- /* alloc and construct workers */
+ ISOBJ_TYPE_assert(pThis, wtp);
+
+ dbgprintf("%s: finalizing construction of worker thread pool\n", wtpGetDbgHdr(pThis));
+ /* alloc and construct workers - this can only be done in finalizer as we previously do
+ * not know the max number of workers
+ */
+RUNLOG;
if((pThis->pWrkr = malloc(sizeof(wti_t*) * pThis->iNumWorkerThreads)) == NULL)
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
+RUNLOG_VAR("%d", i);
+RUNLOG_VAR("%p", pThis->pWrkr[i]);
CHKiRet(wtiConstruct(&pThis->pWrkr[i]));
pWti = pThis->pWrkr[i];
lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s/w%d", wtpGetDbgHdr(pThis), i);
CHKiRet(wtiSetDbgHdr(pWti, pszBuf, lenBuf));
+ CHKiRet(wtiSetpWtp(pWti, pThis));
CHKiRet(wtiConstructFinalize(pWti));
}
-finalize_it:
-ENDobjConstruct(wtp)
-
-/* Construction finalizer
- * rgerhards, 2008-01-17
- */
-rsRetVal
-wtpConstructFinalize(wtp_t __attribute__((unused)) *pThis)
-{
- ISOBJ_TYPE_assert(pThis, wtp);
-
- dbgprintf("%s: finalizing construction of worker thread pool\n", wtpGetDbgHdr(pThis));
-
- return RS_RET_OK;
+finalize_it:
+ RETiRet;
}
@@ -129,6 +136,8 @@ wtpDestruct(wtp_t **ppThis)
int iCancelStateSave;
int i;
+dbgPrintAllDebugInfo();
+RUNLOG;
assert(ppThis != NULL);
pThis = *ppThis;
ISOBJ_TYPE_assert(pThis, wtp);
@@ -161,17 +170,31 @@ wtpDestruct(wtp_t **ppThis)
}
-/* wake up all worker threads. Param bWithDAWrk tells if the DA worker
- * is to be awaken, too. It needs special handling because it waits on
- * two different conditions depending on processing state.
+/* wake up at least one worker thread.
+ * rgerhards, 2008-01-20
+ */
+rsRetVal
+wtpWakeupWrkr(wtp_t *pThis)
+{
+ DEFiRet;
+
+ // TODO; mutex?
+ ISOBJ_TYPE_assert(pThis, wtp);
+dbgprintf("wtpWakeupWrkr 1, cond %p\n", pThis->pcondBusy);
+ pthread_cond_signal(pThis->pcondBusy);
+dbgprintf("wtpWakeupWrkr 2\n");
+ RETiRet;
+}
+/* wake up all worker threads.
* rgerhards, 2008-01-16
*/
-static inline rsRetVal
-wtpWakeupWrks(wtp_t *pThis)
+rsRetVal
+wtpWakeupAllWrkr(wtp_t *pThis)
{
DEFiRet;
ISOBJ_TYPE_assert(pThis, wtp);
+ // TODO; mutex?
pthread_cond_broadcast(pThis->pcondBusy);
RETiRet;
}
@@ -189,8 +212,10 @@ wtpProcessThrdChanges(wtp_t *pThis)
ISOBJ_TYPE_assert(pThis, wtp);
+ RUNLOG;
if(pThis->bThrdStateChanged == 0)
FINALIZE;
+ RUNLOG;
/* go through all threads */
for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
@@ -198,6 +223,7 @@ wtpProcessThrdChanges(wtp_t *pThis)
}
finalize_it:
+ RUNLOG;
RETiRet;
}
@@ -218,39 +244,68 @@ wtpSetState(wtp_t *pThis, wtpState_t iNewState)
}
-#if 0
+/* check if the worker shall shutdown (1 = yes, 0 = no)
+ * TODO: check if we can use atomic operations to enhance performance
+ * Note: there may be two mutexes locked, the bLockUsrMutex is the one in our "user"
+ * (e.g. the queue clas)
+ * rgerhards, 2008-01-21
+ */
+rsRetVal
+wtpChkStopWrkr(wtp_t *pThis, int bLockMutex, int bLockUsrMutex)
+{
+ DEFiRet;
+ DEFVARS_mutexProtection;
+
+ BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex);
+ if( (pThis->wtpState == wtpState_SHUTDOWN_IMMEDIATE)
+ || ((pThis->wtpState == wtpState_SHUTDOWN) && pThis->pfIsIdle(pThis->pUsr, bLockUsrMutex)))
+ iRet = RS_RET_TERMINATE_NOW;
+ END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
+
+ /* try customer handler if one was set and we do not yet have a definite result */
+ if(iRet == RS_RET_OK && pThis->pfChkStopWrkr != NULL) {
+ iRet = pThis->pfChkStopWrkr(pThis->pUsr, bLockUsrMutex);
+ }
+
+ RETiRet;
+}
+
+
/* Send a shutdown command to all workers and see if they terminate.
* A timeout may be specified.
* rgerhards, 2008-01-14
*/
-static rsRetVal
-wtpWrkrShutdown(wtp_t *pThis)
+rsRetVal
+wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, long iTimeout)
{
DEFiRet;
int bTimedOut;
struct timespec t;
int iCancelStateSave;
- // TODO: implement
- ISOBJ_TYPE_assert(pThis);
- queueTellActWrkThrds(pThis, 0, tShutdownCmd);/* first tell the workers our request */
- queueWakeupWrkThrds(pThis, 1); /* awake all workers including DA-worker */
- /* race: must make sure all are running! */
- queueTimeoutComp(&t, iTimeout);/* get timeout */
+dbgPrintAllDebugInfo();
+RUNLOG_VAR("%p", pThis);
+RUNLOG_VAR("%ld", iTimeout);
+RUNLOG_VAR("%d", tShutdownCmd);
+ ISOBJ_TYPE_assert(pThis, wtp);
+
+ wtpSetState(pThis, tShutdownCmd);
+ wtpWakeupAllWrkr(pThis);
+ timeoutComp(&t, iTimeout);/* get timeout */
/* and wait for their termination */
-dbgprintf("Queue %p: waiting for mutex %p\n", pThis, &pThis->mutThrdMgmt);
+dbgprintf("%s: waiting for mutex %p\n", wtpGetDbgHdr(pThis), &pThis->mut);
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
- d_pthread_mutex_lock(&pThis->mutThrdMgmt);
- pthread_cleanup_push(queueMutexCleanup, &pThis->mutThrdMgmt);
+ d_pthread_mutex_lock(&pThis->mut);
+ pthread_cleanup_push(mutexCancelCleanup, &pThis->mut);
pthread_setcancelstate(iCancelStateSave, NULL);
bTimedOut = 0;
while(pThis->iCurNumWrkThrd > 0 && !bTimedOut) {
- dbgprintf("Queue 0x%lx: waiting %ldms on worker thread termination, %d still running\n",
- queueGetID(pThis), iTimeout, pThis->iCurNumWrkThrd);
+ dbgprintf("%s: waiting %ldms on worker thread termination, %d still running\n",
+ wtpGetDbgHdr(pThis), iTimeout, pThis->iCurNumWrkThrd);
- if(pthread_cond_timedwait(&pThis->condThrdTrm, &pThis->mutThrdMgmt, &t) != 0) {
- dbgprintf("Queue 0x%lx: timeout waiting on worker thread termination\n", queueGetID(pThis));
+ if(d_pthread_cond_timedwait(&pThis->condThrdTrm, &pThis->mut, &t) != 0) {
+ dbgprintf("%s: timeout waiting on worker thread termination\n", wtpGetDbgHdr(pThis));
bTimedOut = 1; /* we exit the loop on timeout */
}
}
@@ -259,122 +314,61 @@ dbgprintf("Queue %p: waiting for mutex %p\n", pThis, &pThis->mutThrdMgmt);
if(bTimedOut)
iRet = RS_RET_TIMED_OUT;
+dbgprintf("wtpShutdownAll exit");
RETiRet;
}
-/* Unconditionally cancel all running worker threads.
- * rgerhards, 2008-01-14
+/* indicate that a thread has terminated and awake anyone waiting on it
+ * rgerhards, 2008-01-23
*/
-static rsRetVal
-wtpWrkrCancel(wtp_t *pThis)
+rsRetVal wtpSignalWrkrTermination(wtp_t *pThis)
{
DEFiRet;
- int i;
- // TODO: we need to implement peek(), without it (today!) we lose one message upon
- // worker cancellation! -- rgerhards, 2008-01-14
-
- ISOB_TYPE_assert(pThis);
- /* process any pending thread requests so that we know who actually is still running */
- queueChkWrkThrdChanges(pThis);
-
- /* awake the workers one more time, just to be sure */
- queueWakeupWrkThrds(pThis, 1); /* awake all workers including DA-worker */
+ //TODO: mutex or not mutex, that's the question ;)DEFVARS_mutexProtection;
- /* first tell the workers our request */
- for(i = 0 ; i <= pThis->iNumWorkerThreads ; ++i) {
- if(pThis->pWrkThrds[i].tCurrCmd >= eWRKTHRD_TERMINATING) {
- dbgprintf("Queue 0x%lx: canceling worker thread %d\n", queueGetID(pThis), i);
- pthread_cancel(pThis->pWrkThrds[i].thrdID);
- }
- }
+ ISOBJ_TYPE_assert(pThis, wtp);
+ //BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, LOCK_MUTEX);
+dbgprintf("signaling thread termination, cond %p\n", &pThis->condThrdTrm);
+ pthread_cond_signal(&pThis->condThrdTrm); /* activate anyone waiting on thread shutdown */
+ //END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
RETiRet;
}
-/* Worker thread management function carried out when the main
- * worker is about to terminate.
+/* Unconditionally cancel all running worker threads.
+ * rgerhards, 2008-01-14
*/
-static rsRetVal
-wtpShutdownWorkers(wtp_t *pThis)
+rsRetVal
+wtpCancelAll(wtp_t *pThis)
{
DEFiRet;
int i;
+ // TODO: we need to implement peek(), without it (today!) we lose one message upon
+ // worker cancellation! -- rgerhards, 2008-01-14
- assert(pThis != NULL);
+ ISOBJ_TYPE_assert(pThis, wtp);
- dbgprintf("Queue 0x%lx: initiating worker thread shutdown sequence\n", (unsigned long) pThis);
+ /* process any pending thread requests so that we know who actually is still running */
+ wtpProcessThrdChanges(pThis);
- ISOB_TYPE_assert(pThis);
- /* even if the timeout count is set to 0 (run endless), we still call the queueWrkThrdTrm(). This
- * is necessary so that all threads get sent the termination command. With a timeout of 0, however,
- * the function returns immediate with RS_RET_TIMED_OUT. We catch that state and accept it as
- * good.
- */
- iRet = queueWrkThrdTrm(pThis, eWRKTHRD_SHUTDOWN, pThis->toQShutdown);
- if(iRet == RS_RET_TIMED_OUT) {
- if(pThis->toQShutdown == 0) {
- iRet = RS_RET_OK;
- } else {
- /* OK, we now need to try force the shutdown */
- dbgprintf("Queue 0x%lx: regular worker shutdown timed out, now trying immediate\n",
- queueGetID(pThis));
- iRet = queueWrkThrdTrm(pThis, eWRKTHRD_SHUTDOWN_IMMEDIATE, pThis->toActShutdown);
- }
- }
+ /* awake the workers one more time, just to be sure */
+ wtpWakeupAllWrkr(pThis);
- if(iRet != RS_RET_OK) { /* this is true on actual error on first try or timeout and error on second */
- /* still didn't work out - so we now need to cancel the workers */
- dbgprintf("Queue 0x%lx: worker threads could not be shutdown, now canceling them\n", (unsigned long) pThis);
- iRet = queueWrkThrdCancel(pThis);
- }
-
- /* finally join the threads
- * In case of a cancellation, this may actually take some time. This is also
- * needed to clean up the thread descriptors, even with a regular termination.
- * And, most importantly, this is needed if we have an indifitite termination
- * time set (timeout == 0)! -- rgerhards, 2008-01-14
- */
+ /* first tell the workers our request */
for(i = 0 ; i <= pThis->iNumWorkerThreads ; ++i) {
- if(pThis->pWrkThrds[i].tCurrCmd != eWRKTHRD_STOPPED) {
- queueJoinWrkThrd(pThis, i);
+ // TODO: mutex lock!
+ if(pThis->pWrkr[i]->tCurrCmd >= eWRKTHRD_TERMINATING) {
+ dbgprintf("%s: canceling worker thread %d\n", wtpGetDbgHdr(pThis), i);
+ pthread_cancel(pThis->pWrkr[i]->thrdID);
}
}
- dbgprintf("Queue 0x%lx: worker threads terminated, remaining queue size %d.\n",
- queueGetID(pThis), pThis->iQueueSize);
-
RETiRet;
}
-#endif
-
-
-
-/* check if the worker shall shutdown
- * TODO: check if we can use atomic operations to enhance performance
- * Note: there may be two mutexes locked, the bLockUsrMutex is the one in our "user"
- * (e.g. the queue clas)
- * rgerhards, 2008-01-21
- */
-rsRetVal
-wtpChkStopWrkr(wtp_t *pThis, int bLockMutex, int bLockUsrMutex)
-{
- DEFiRet;
- DEFVARS_mutexProtection;
- BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex);
- if( (pThis->wtpState == wtpState_SHUTDOWN_IMMEDIATE)
- || ((pThis->wtpState == wtpState_SHUTDOWN) && pThis->pfIsIdle(pThis->pUsr, bLockUsrMutex)))
- iRet = RS_RET_TERMINATE_NOW;
- END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
- /* try customer handler if one was set and we do not yet have a definite result */
- if(iRet == RS_RET_OK && pThis->pfChkStopWrkr != NULL)
- iRet = pThis->pfChkStopWrkr(pThis->pUsr, bLockUsrMutex);
-
- RETiRet;
-}
/* Set the Inactivity Guard
* rgerhards, 2008-01-21
@@ -385,9 +379,13 @@ wtpSetInactivityGuard(wtp_t *pThis, int bNewState, int bLockMutex)
DEFiRet;
DEFVARS_mutexProtection;
+RUNLOG;
BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex);
+RUNLOG;
pThis->bInactivityGuard = bNewState;
+RUNLOG;
END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
+RUNLOG;
RETiRet;
}
@@ -404,6 +402,7 @@ wtpWrkrExecCancelCleanup(void *arg)
ISOBJ_TYPE_assert(pThis, wtp);
pThis->iCurNumWrkThrd--;
+ wtpSignalWrkrTermination(pThis);
dbgprintf("%s: thread CANCELED with %d workers running.\n", wtpGetDbgHdr(pThis), pThis->iCurNumWrkThrd);
@@ -415,7 +414,7 @@ wtpWrkrExecCancelCleanup(void *arg)
* rgerhards, 2008-01-21
*/
static void *
-wtpWorker(void *arg)
+wtpWorker(void *arg) /* the arg is actually a wti object, even though we are in wtp! */
{
DEFiRet;
DEFVARS_mutexProtection;
@@ -442,7 +441,7 @@ wtpWorker(void *arg)
* our init. That would be a bad race... -- rgerhards, 2008-01-16
*/
//if(qWrkrGetState(pWrkrInst) == eWRKTHRD_RUN_INIT)
- wtiSetState(pWti, eWRKTHRD_RUNNING, MUTEX_ALREADY_LOCKED); /* we are running now! */
+ wtiSetState(pWti, eWRKTHRD_RUNNING, 0, MUTEX_ALREADY_LOCKED); /* we are running now! */
do {
END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
@@ -459,12 +458,14 @@ wtpWorker(void *arg)
pthread_cleanup_pop(0);
pThis->iCurNumWrkThrd--;
+ wtpSignalWrkrTermination(pThis);
dbgprintf("%s: Worker thread %lx, terminated, num workers now %d\n",
wtpGetDbgHdr(pThis), (unsigned long) pWti, pThis->iCurNumWrkThrd);
END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
+ ENDfunc
pthread_exit(0);
}
@@ -492,7 +493,9 @@ wtpStartWrkr(wtp_t *pThis, int bLockMutex)
*/
for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
// TODO: sync!
- if(pThis->pWrkr[i]->tCurrCmd == eWRKTHRD_STOPPED) {
+RUNLOG;
+dbgprintf("%s: i %d, wti_T* %p\n", wtpGetDbgHdr(pThis), i, pThis->pWrkr[i]);
+ if(wtiGetState(pThis->pWrkr[i], LOCK_MUTEX) == eWRKTHRD_STOPPED) {
break;
}
}
@@ -502,11 +505,12 @@ dbgprintf("%s: after thrd search: i %d, max %d\n", wtpGetDbgHdr(pThis), i, pThis
ABORT_FINALIZE(RS_RET_NO_MORE_THREADS);
pWti = pThis->pWrkr[i];
- wtiSetState(pWti, eWRKTHRD_RUN_CREATED, LOCK_MUTEX); // TODO: thuink about mutex lock
+ wtiSetState(pWti, eWRKTHRD_RUN_CREATED, 0, LOCK_MUTEX);
iState = pthread_create(&(pWti->thrdID), NULL, wtpWorker, (void*) pWti);
dbgprintf("%s: started with state %d, num workers now %d\n",
wtpGetDbgHdr(pThis), iState, pThis->iCurNumWrkThrd);
+RUNLOG;
/* we try to give the starting worker a little boost. It won't help much as we still
* hold the queue's mutex, but at least it has a chance to start on a single-CPU system.
*/
@@ -517,6 +521,7 @@ dbgprintf("%s: after thrd search: i %d, max %d\n", wtpGetDbgHdr(pThis), i, pThis
finalize_it:
END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
+RUNLOG;
RETiRet;
}
@@ -524,7 +529,9 @@ finalize_it:
/* set the number of worker threads that should be running. If less than currently running,
* a new worker may be started. Please note that there is no guarantee the number of workers
* said will be running after we exit this function. It is just a hint. If the number is
- * higher than one, the "busy" condition is also signaled to awake a worker.
+ * higher than one, and no worker is started, the "busy" condition is signaled to awake a worker.
+ * So the caller can assume that there is at least one worker re-checking if there is "work to do"
+ * after this function call.
* rgerhards, 2008-01-21
*/
rsRetVal
@@ -535,8 +542,10 @@ wtpAdviseMaxWorkers(wtp_t *pThis, int nMaxWrkr)
int nMissing; /* number workers missing to run */
int i;
+ if(pThis == NULL) dbgPrintAllDebugInfo();
ISOBJ_TYPE_assert(pThis, wtp);
+dbgprintf("%s: wtpAdviseMaxWorker with %d called, currNum %d, max %d\n", wtpGetDbgHdr(pThis), nMaxWrkr, pThis->iCurNumWrkThrd, pThis->iNumWorkerThreads);
if(nMaxWrkr == 0)
FINALIZE;
@@ -548,15 +557,20 @@ wtpAdviseMaxWorkers(wtp_t *pThis, int nMaxWrkr)
else if(nMissing < 0)
nMissing = 0;
- dbgprintf("%s: high activity - starting %d additional worker thread(s).\n", wtpGetDbgHdr(pThis), nMissing);
- /* start the rqtd nbr of workers */
- for(i = 0 ; i < nMissing ; ++i) {
- CHKiRet(wtpStartWrkr(pThis, MUTEX_ALREADY_LOCKED));
+ if(nMissing > 0) {
+ dbgprintf("%s: high activity - starting %d additional worker thread(s).\n", wtpGetDbgHdr(pThis), nMissing);
+ /* start the rqtd nbr of workers */
+ for(i = 0 ; i < nMissing ; ++i) {
+ CHKiRet(wtpStartWrkr(pThis, MUTEX_ALREADY_LOCKED));
+ }
+ } else {
+dbgprintf("wtpAdviseMaxWorkers signals busy\n");
+ wtpWakeupWrkr(pThis);
}
-
- END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
+
finalize_it:
+ END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
RETiRet;
}
@@ -564,6 +578,17 @@ finalize_it:
/* some simple object access methods */
DEFpropSetMeth(wtp, toWrkShutdown, long);
DEFpropSetMeth(wtp, wtpState, wtpState_t);
+DEFpropSetMeth(wtp, iNumWorkerThreads, int);
+DEFpropSetMeth(wtp, pUsr, void*);
+DEFpropSetMethPTR(wtp, pmutUsr, pthread_mutex_t);
+DEFpropSetMethPTR(wtp, pcondBusy, pthread_cond_t);
+DEFpropSetMethFP(wtp, pfChkStopWrkr, rsRetVal(*pVal)(void*, int));
+DEFpropSetMethFP(wtp, pfIsIdle, rsRetVal(*pVal)(void*, int));
+DEFpropSetMethFP(wtp, pfDoWork, rsRetVal(*pVal)(void*, void*, int));
+DEFpropSetMethFP(wtp, pfOnIdle, rsRetVal(*pVal)(void*, int));
+DEFpropSetMethFP(wtp, pfOnWorkerCancel, rsRetVal(*pVal)(void*, void*));
+DEFpropSetMethFP(wtp, pfOnWorkerStartup, rsRetVal(*pVal)(void*));
+DEFpropSetMethFP(wtp, pfOnWorkerShutdown, rsRetVal(*pVal)(void*));
/* set the debug header message
@@ -572,11 +597,12 @@ DEFpropSetMeth(wtp, wtpState, wtpState_t);
* rgerhards, 2008-01-09
*/
rsRetVal
-wtpSetDbgHdr(wti_t *pThis, uchar *pszMsg, size_t lenMsg)
+wtpSetDbgHdr(wtp_t *pThis, uchar *pszMsg, size_t lenMsg)
{
DEFiRet;
- ISOBJ_TYPE_assert(pThis, wti);
+dbgprintf("objID: %d\n", pThis->pObjInfo->objID);
+ ISOBJ_TYPE_assert(pThis, wtp);
assert(pszMsg != NULL);
if(lenMsg < 1)