summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--runtime/debug.c51
-rw-r--r--runtime/queue.c51
-rw-r--r--runtime/rsyslog.h4
-rw-r--r--runtime/wti.c76
-rw-r--r--runtime/wtp.c7
-rw-r--r--runtime/wtp.h8
6 files changed, 129 insertions, 68 deletions
diff --git a/runtime/debug.c b/runtime/debug.c
index 4f45a1e3..248c5ea3 100644
--- a/runtime/debug.c
+++ b/runtime/debug.c
@@ -828,13 +828,12 @@ sigsegvHdlr(int signum)
abort();
}
-#if 1
-#pragma GCC diagnostic ignored "-Wempty-body"
-/* write the debug message. This is a helper to dbgprintf and dbgoprint which
- * contains common code. added 2008-09-26 rgerhards
+/* actually write the debug message. This is a separate fuction because the cleanup_push/_pop
+ * interface otherwise is unsafe to use (generates compiler warnings at least).
+ * 2009-05-20 rgerhards
*/
-static void
-dbgprint(obj_t *pObj, char *pszMsg, size_t lenMsg)
+static inline void
+do_dbgprint(uchar *pszObjName, char *pszMsg, size_t lenMsg)
{
static pthread_t ptLastThrdID = 0;
static int bWasNL = 0;
@@ -842,20 +841,6 @@ dbgprint(obj_t *pObj, char *pszMsg, size_t lenMsg)
char pszWriteBuf[1024];
size_t lenWriteBuf;
struct timespec t;
- uchar *pszObjName = NULL;
-
- /* we must get the object name before we lock the mutex, because the object
- * potentially calls back into us. If we locked the mutex, we would deadlock
- * ourselfs. On the other hand, the GetName call needs not to be protected, as
- * this thread has a valid reference. If such an object is deleted by another
- * thread, we are in much more trouble than just for dbgprint(). -- rgerhards, 2008-09-26
- */
- if(pObj != NULL) {
- pszObjName = obj.GetName(pObj);
- }
-
- pthread_mutex_lock(&mutdbgprint);
- pthread_cleanup_push(dbgMutexCancelCleanupHdlr, &mutdbgprint);
/* The bWasNL handler does not really work. It works if no thread
* switching occurs during non-NL messages. Else, things are messed
@@ -903,11 +888,35 @@ dbgprint(obj_t *pObj, char *pszMsg, size_t lenMsg)
if(altdbg != -1) write(altdbg, pszMsg, lenMsg);
bWasNL = (pszMsg[lenMsg - 1] == '\n') ? 1 : 0;
+}
+
+#pragma GCC diagnostic ignored "-Wempty-body"
+/* write the debug message. This is a helper to dbgprintf and dbgoprint which
+ * contains common code. added 2008-09-26 rgerhards
+ */
+static void
+dbgprint(obj_t *pObj, char *pszMsg, size_t lenMsg)
+{
+ uchar *pszObjName = NULL;
+
+ /* we must get the object name before we lock the mutex, because the object
+ * potentially calls back into us. If we locked the mutex, we would deadlock
+ * ourselfs. On the other hand, the GetName call needs not to be protected, as
+ * this thread has a valid reference. If such an object is deleted by another
+ * thread, we are in much more trouble than just for dbgprint(). -- rgerhards, 2008-09-26
+ */
+ if(pObj != NULL) {
+ pszObjName = obj.GetName(pObj);
+ }
+
+ pthread_mutex_lock(&mutdbgprint);
+ pthread_cleanup_push(dbgMutexCancelCleanupHdlr, &mutdbgprint);
+
+ do_dbgprint(pszObjName, pszMsg, lenMsg);
pthread_cleanup_pop(1);
}
#pragma GCC diagnostic warning "-Wempty-body"
-#endif
/* print some debug output when an object is given
* This is mostly a copy of dbgprintf, but I do not know how to combine it
diff --git a/runtime/queue.c b/runtime/queue.c
index 3118bb19..0019297b 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -469,7 +469,7 @@ qqueueInitDA(qqueue_t *pThis, int bEnqOnly, int bLockMutex)
CHKiRet(wtpSetDbgHdr (pThis->pWtpDA, pszBuf, lenBuf));
CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueChkStopWrkrDA));
CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize));
- CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueIsIdleDA));
+ CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, wtp_t*)) qqueueIsIdleDA));
CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti, int)) ConsumerDA));
CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) qqueueStartDA));
CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) qqueueTurnOffDAMode));
@@ -1257,7 +1257,7 @@ static rsRetVal qqueueShutdownWorkers(qqueue_t *pThis)
/* we need to re-aquire the mutex for the next check in this case! */
BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
}
- if(pThis->bIsDA && wtpGetCurNumWrkr(pThis->pWtpDA, LOCK_MUTEX) > 0) {
+ if(pThis->bRunsDA && wtpGetCurNumWrkr(pThis->pWtpDA, LOCK_MUTEX) > 0) {
/* and now the same for the DA queue */
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
dbgoprint((obj_t*) pThis, "trying immediate shutdown of DA workers\n");
@@ -1548,18 +1548,19 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz
{
int nDequeued;
int nDiscarded;
+ int nDeleted;
int iQueueSize;
void *pUsr;
rsRetVal localRet;
DEFiRet;
+ nDeleted = pWti->batch.nElemDeq;
DeleteProcessedBatch(pThis, &pWti->batch);
nDequeued = nDiscarded = 0;
- do {
+ while((iQueueSize = getLogicalQueueSize(pThis)) > 0 && nDequeued < pThis->iDeqBatchSize) {
dbgprintf("DequeueConsumableElements, index %d\n", nDequeued);
CHKiRet(qqueueDeq(pThis, &pUsr));
- iQueueSize = getLogicalQueueSize(pThis);
/* check if we should discard this element */
localRet = qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pThis->bRunsDA, pUsr);
@@ -1574,9 +1575,10 @@ dbgprintf("DequeueConsumableElements, index %d\n", nDequeued);
pWti->batch.pElem[nDequeued].pUsrp = pUsr;
pWti->batch.pElem[nDequeued].state = BATCH_STATE_RDY;
++nDequeued;
- } while(iQueueSize > 0 && nDequeued < pThis->iDeqBatchSize);
+ }
- qqueueChkPersist(pThis, nDequeued); /* it is sufficient to persist only when the bulk of work is done */
+ /* it is sufficient to persist only when the bulk of work is done */
+ qqueueChkPersist(pThis, nDequeued+nDiscarded+nDeleted);
pWti->batch.nElem = nDequeued;
pWti->batch.nElemDeq = nDequeued + nDiscarded;
@@ -1618,6 +1620,7 @@ DequeueConsumable(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
pthread_cond_broadcast(&pThis->belowLightDlyWtrMrk);
}
+ // TODO: MULTI: check physical queue size!
pthread_cond_signal(&pThis->notFull);
d_pthread_mutex_unlock(pThis->mut);
pthread_setcancelstate(iCancelStateSave, NULL);
@@ -1727,6 +1730,27 @@ RateLimiter(qqueue_t *pThis)
}
+/* This dequeues the next batch and checks if the queue is empty. If it is
+ * empty, return RS_RET_IDLE. That will trigger termination of the function
+ * and tell the upper layer caller to initiate idle processing.
+ * rgerhards, 2009-05-20
+ */
+static inline rsRetVal
+DequeueForConsumer(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
+{
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, qqueue);
+ ISOBJ_TYPE_assert(pWti, wti);
+
+ CHKiRet(DequeueConsumable(pThis, pWti, iCancelStateSave));
+
+ if(pWti->batch.nElem == 0)
+ ABORT_FINALIZE(RS_RET_IDLE);
+
+finalize_it:
+ RETiRet;
+}
/* This is the queue consumer in the regular (non-DA) case. It is
* protected by the queue mutex, but MUST release it as soon as possible.
@@ -1740,7 +1764,7 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
ISOBJ_TYPE_assert(pThis, qqueue);
ISOBJ_TYPE_assert(pWti, wti);
- CHKiRet(DequeueConsumable(pThis, pWti, iCancelStateSave));
+ CHKiRet(DequeueForConsumer(pThis, pWti, iCancelStateSave));
CHKiRet(pThis->pConsumer(pThis->pUsr, &pWti->batch));
/* we now need to check if we should deliberately delay processing a bit
@@ -1754,6 +1778,7 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
}
finalize_it:
+dbgprintf("XXX: regular consumer finished, iret=%d, szlog %d sz phys %d\n", iRet, getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
RETiRet;
}
@@ -1776,7 +1801,7 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
ISOBJ_TYPE_assert(pThis, qqueue);
ISOBJ_TYPE_assert(pWti, wti);
- CHKiRet(DequeueConsumable(pThis, pWti, iCancelStateSave));
+ CHKiRet(DequeueForConsumer(pThis, pWti, iCancelStateSave));
/* iterate over returned results and enqueue them in DA queue */
for(i = 0 ; i < pWti->batch.nElem ; i++)
CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, pWti->batch.pElem[i].pUsrp));
@@ -1855,6 +1880,8 @@ GetDeqBatchSize(qqueue_t *pThis, int *pVal)
RETiRet;
}
+
+/* common function for the idle functions that deletes the last batch if TODO MULTI */
/* must only be called when the queue mutex is locked, else results
* are not stable! DA queue version
*/
@@ -1990,7 +2017,7 @@ rsRetVal qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
CHKiRet(wtpSetpfRateLimiter (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RateLimiter));
CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) ChkStopWrkrReg));
CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize));
- CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) IsIdleReg));
+ CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, wtp_t*)) IsIdleReg));
CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti, int)) ConsumerReg));
CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RegOnWrkrStartup));
CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RegOnWrkrShutdown));
@@ -2133,7 +2160,10 @@ static rsRetVal qqueueChkPersist(qqueue_t *pThis, int nUpdates)
DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
- assert(nUpdates > 0);
+ assert(nUpdates >= 0);
+
+ if(nUpdates == 0)
+ FINALIZE;
pThis->iUpdsSincePersist += nUpdates;
if(pThis->iPersistUpdCnt && pThis->iUpdsSincePersist >= pThis->iPersistUpdCnt) {
@@ -2141,6 +2171,7 @@ static rsRetVal qqueueChkPersist(qqueue_t *pThis, int nUpdates)
pThis->iUpdsSincePersist = 0;
}
+finalize_it:
RETiRet;
}
diff --git a/runtime/rsyslog.h b/runtime/rsyslog.h
index 3dd84ef6..01bbbb29 100644
--- a/runtime/rsyslog.h
+++ b/runtime/rsyslog.h
@@ -102,6 +102,7 @@ typedef struct tcpsrv_s tcpsrv_t;
typedef struct vmstk_s vmstk_t;
typedef struct batch_obj_s batch_obj_t;
typedef struct batch_s batch_t;
+typedef struct wtp_s wtp_t;
typedef rsRetVal (*prsf_t)(struct vmstk_s*, int); /* pointer to a RainerScript function */
typedef uint64 qDeqID; /* queue Dequeue order ID. 32 bits is considered dangerously few */
@@ -290,10 +291,11 @@ enum rsRetVal_ /** return value. All methods return this if not specified oth
RS_RET_SYSVAR_NOT_FOUND = 1001, /**< system variable could not be found (maybe misspelled) */
/* some generic error/status codes */
+ RS_RET_OK = 0, /**< operation successful */
RS_RET_OK_DELETE_LISTENTRY = 1, /**< operation successful, but callee requested the deletion of an entry (special state) */
RS_RET_TERMINATE_NOW = 2, /**< operation successful, function is requested to terminate (mostly used with threads) */
RS_RET_NO_RUN = 3, /**< operation successful, but function does not like to be executed */
- RS_RET_OK = 0 /**< operation successful */
+ RS_RET_IDLE = 4 /**< operation successful, but callee is idle (e.g. because queue is empty) */
};
/* some helpful macros to work with srRetVals.
diff --git a/runtime/wti.c b/runtime/wti.c
index 8a7ee657..1be008df 100644
--- a/runtime/wti.c
+++ b/runtime/wti.c
@@ -333,6 +333,32 @@ wtiWorkerCancelCleanup(void *arg)
}
+/* wait for queue to become non-empty or timeout
+ * helper to wtiWorker
+ * IMPORTANT: mutex must be locked when this code is called!
+ * rgerhards, 2009-05-20
+ */
+static inline void
+doIdleProcessing(wti_t *pThis, wtp_t *pWtp, int *pbInactivityTOOccured)
+{
+ struct timespec t;
+
+ DBGPRINTF("%s: worker IDLE, waiting for work.\n", wtiGetDbgHdr(pThis));
+ pWtp->pfOnIdle(pWtp->pUsr, MUTEX_ALREADY_LOCKED);
+
+ if(pWtp->toWrkShutdown == -1) {
+ /* never shut down any started worker */
+ d_pthread_cond_wait(pWtp->pcondBusy, pWtp->pmutUsr);
+ } else {
+ timeoutComp(&t, pWtp->toWrkShutdown);/* get absolute timeout */
+ if(d_pthread_cond_timedwait(pWtp->pcondBusy, pWtp->pmutUsr, &t) != 0) {
+ DBGPRINTF("%s: inactivity timeout, worker terminating...\n", wtiGetDbgHdr(pThis));
+ *pbInactivityTOOccured = 1; /* indicate we had a timeout */
+ }
+ }
+}
+
+
/* generic worker thread framework
*
* Some special comments below, so that they do not clutter the main function code:
@@ -341,16 +367,20 @@ wtiWorkerCancelCleanup(void *arg)
* Now make sure we can get canceled - it is not specified if pthread_setcancelstate() is
* a cancellation point in itself. As we run most of the time without cancel enabled, I fear
* we may never get cancelled if we do not create a cancellation point ourselfs.
+ * Note on rate-limiters:
+ * If we have a rate-limiter set for this worker pool, let's call it. Please
+ * keep in mind that the rate-limiter may hold us for an extended period
+ * of time. -- rgerhards, 2008-04-02
*/
#pragma GCC diagnostic ignored "-Wempty-body"
rsRetVal
wtiWorker(wti_t *pThis)
{
- DEFiRet;
DEFVARS_mutexProtection;
- struct timespec t;
wtp_t *pWtp; /* our worker thread pool */
int bInactivityTOOccured = 0;
+ rsRetVal localRet;
+ DEFiRet;
ISOBJ_TYPE_assert(pThis, wti);
pWtp = pThis->pWtp; /* shortcut */
@@ -369,48 +399,38 @@ wtiWorker(wti_t *pThis)
wtpProcessThrdChanges(pWtp);
pthread_testcancel(); /* see big comment in function header */
- /* if we have a rate-limiter set for this worker pool, let's call it. Please
- * keep in mind that the rate-limiter may hold us for an extended period
- * of time. -- rgerhards, 2008-04-02
- */
- if(pWtp->pfRateLimiter != NULL) {
+ if(pWtp->pfRateLimiter != NULL) { /* call rate-limiter, if defined */
pWtp->pfRateLimiter(pWtp->pUsr);
}
wtpSetInactivityGuard(pThis->pWtp, 0, LOCK_MUTEX); /* must be set before usr mutex is locked! */
BEGIN_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr, LOCK_MUTEX);
- if( (bInactivityTOOccured && pWtp->pfIsIdle(pWtp->pUsr, MUTEX_ALREADY_LOCKED))
- || wtpChkStopWrkr(pWtp, LOCK_MUTEX, MUTEX_ALREADY_LOCKED)) {
- END_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr);
+ /* first check if we are in shutdown process */
+ if(wtpChkStopWrkr(pWtp, LOCK_MUTEX, MUTEX_ALREADY_LOCKED)) {
break; /* end worker thread run */
}
- bInactivityTOOccured = 0; /* reset for next run */
- /* if we reach this point, we are still protected by the mutex */
-
- if(pWtp->pfIsIdle(pWtp->pUsr, MUTEX_ALREADY_LOCKED)) {
- DBGPRINTF("%s: worker IDLE, waiting for work.\n", wtiGetDbgHdr(pThis));
- pWtp->pfOnIdle(pWtp->pUsr, MUTEX_ALREADY_LOCKED);
-
- if(pWtp->toWrkShutdown == -1) {
- /* never shut down any started worker */
- d_pthread_cond_wait(pWtp->pcondBusy, pWtp->pmutUsr);
- } else {
- timeoutComp(&t, pWtp->toWrkShutdown);/* get absolute timeout */
- if(d_pthread_cond_timedwait(pWtp->pcondBusy, pWtp->pmutUsr, &t) != 0) {
- DBGPRINTF("%s: inactivity timeout, worker terminating...\n", wtiGetDbgHdr(pThis));
- bInactivityTOOccured = 1; /* indicate we had a timeout */
- }
+ /* try to execute and process whatever we have */
+ localRet = pWtp->pfDoWork(pWtp->pUsr, pThis, iCancelStateSave);
+
+ if(localRet == RS_RET_IDLE) {
+ if(bInactivityTOOccured) {
+ /* we had an inactivity timeout in the last run and are still idle, so it is time to exit... */
+ break; /* end worker thread run */
}
+ doIdleProcessing(pThis, pWtp, &bInactivityTOOccured);
END_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr);
continue; /* request next iteration */
}
+ END_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr);
- /* if we reach this point, we have a non-empty queue (and are still protected by mutex) */
- pWtp->pfDoWork(pWtp->pUsr, pThis, iCancelStateSave);
+ bInactivityTOOccured = 0; /* reset for next run */
}
+ /* if we exit the loop, the mutex is locked and must be unlocked */
+ END_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr);
+
/* indicate termination */
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
d_pthread_mutex_lock(&pThis->mut);
diff --git a/runtime/wtp.c b/runtime/wtp.c
index dab59562..40a9095b 100644
--- a/runtime/wtp.c
+++ b/runtime/wtp.c
@@ -248,7 +248,6 @@ wtpSetState(wtp_t *pThis, wtpState_t iNewState)
/* check if the worker shall shutdown (1 = yes, 0 = no)
- * TODO: check if we can use atomic operations to enhance performance
* Note: there may be two mutexes locked, the bLockUsrMutex is the one in our "user"
* (e.g. the queue clas)
* rgerhards, 2008-01-21
@@ -263,14 +262,14 @@ wtpChkStopWrkr(wtp_t *pThis, int bLockMutex, int bLockUsrMutex)
BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex);
if( (pThis->wtpState == wtpState_SHUTDOWN_IMMEDIATE)
- || ((pThis->wtpState == wtpState_SHUTDOWN) && pThis->pfIsIdle(pThis->pUsr, bLockUsrMutex)))
+ || ((pThis->wtpState == wtpState_SHUTDOWN) && pThis->pfIsIdle(pThis->pUsr, pThis)))
iRet = RS_RET_TERMINATE_NOW;
- END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
/* try customer handler if one was set and we do not yet have a definite result */
if(iRet == RS_RET_OK && pThis->pfChkStopWrkr != NULL) {
iRet = pThis->pfChkStopWrkr(pThis->pUsr, bLockUsrMutex);
}
+ END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
RETiRet;
}
@@ -577,7 +576,7 @@ DEFpropSetMethPTR(wtp, pcondBusy, pthread_cond_t)
DEFpropSetMethFP(wtp, pfChkStopWrkr, rsRetVal(*pVal)(void*, int))
DEFpropSetMethFP(wtp, pfRateLimiter, rsRetVal(*pVal)(void*))
DEFpropSetMethFP(wtp, pfGetDeqBatchSize, rsRetVal(*pVal)(void*, int*))
-DEFpropSetMethFP(wtp, pfIsIdle, rsRetVal(*pVal)(void*, int))
+DEFpropSetMethFP(wtp, pfIsIdle, rsRetVal(*pVal)(void*, wtp_t*))
DEFpropSetMethFP(wtp, pfDoWork, rsRetVal(*pVal)(void*, void*, int))
DEFpropSetMethFP(wtp, pfOnIdle, rsRetVal(*pVal)(void*, int))
DEFpropSetMethFP(wtp, pfOnWorkerCancel, rsRetVal(*pVal)(void*, void*))
diff --git a/runtime/wtp.h b/runtime/wtp.h
index 88bd9197..d9d582af 100644
--- a/runtime/wtp.h
+++ b/runtime/wtp.h
@@ -50,7 +50,7 @@ typedef enum {
/* the worker thread pool (wtp) object */
-typedef struct wtp_s {
+struct wtp_s {
BEGINobjInstance;
int bOptimizeUniProc; /* cache for the equally-named global setting, pulled at time of queue creation */
wtpState_t wtpState;
@@ -73,7 +73,7 @@ typedef struct wtp_s {
rsRetVal (*pfChkStopWrkr)(void *pUsr, int);
rsRetVal (*pfGetDeqBatchSize)(void *pUsr, int*); /* obtains max dequeue count from queue config */
rsRetVal (*pfRateLimiter)(void *pUsr);
- rsRetVal (*pfIsIdle)(void *pUsr, int);
+ rsRetVal (*pfIsIdle)(void *pUsr, wtp_t *pWtp);
rsRetVal (*pfDoWork)(void *pUsr, void *pWti, int);
rsRetVal (*pfOnIdle)(void *pUsr, int);
rsRetVal (*pfOnWorkerCancel)(void *pUsr, void*pWti);
@@ -81,7 +81,7 @@ typedef struct wtp_s {
rsRetVal (*pfOnWorkerShutdown)(void *pUsr);
/* end user objects */
uchar *pszDbgHdr; /* header string for debug messages */
-} wtp_t;
+};
/* some symbolic constants for easier reference */
@@ -106,7 +106,7 @@ PROTOTYPEObjClassInit(wtp);
PROTOTYPEpropSetMethFP(wtp, pfChkStopWrkr, rsRetVal(*pVal)(void*, int));
PROTOTYPEpropSetMethFP(wtp, pfRateLimiter, rsRetVal(*pVal)(void*));
PROTOTYPEpropSetMethFP(wtp, pfGetDeqBatchSize, rsRetVal(*pVal)(void*, int*));
-PROTOTYPEpropSetMethFP(wtp, pfIsIdle, rsRetVal(*pVal)(void*, int));
+PROTOTYPEpropSetMethFP(wtp, pfIsIdle, rsRetVal(*pVal)(void*, wtp_t*));
PROTOTYPEpropSetMethFP(wtp, pfDoWork, rsRetVal(*pVal)(void*, void*, int));
PROTOTYPEpropSetMethFP(wtp, pfOnIdle, rsRetVal(*pVal)(void*, int));
PROTOTYPEpropSetMethFP(wtp, pfOnWorkerCancel, rsRetVal(*pVal)(void*,void*));