summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2009-05-20 15:12:49 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2009-05-20 15:12:49 +0200
commit9f45b80ea9ea86d516c895d97fd8670df37e319e (patch)
tree83a6ca3416c783c16801dfdc3651d52a3af6f4d1
parentad7ccabe5ec616a4bf9fda1472d8041eaf1bf815 (diff)
downloadrsyslog-9f45b80ea9ea86d516c895d97fd8670df37e319e.tar.gz
rsyslog-9f45b80ea9ea86d516c895d97fd8670df37e319e.tar.xz
rsyslog-9f45b80ea9ea86d516c895d97fd8670df37e319e.zip
free last processed message in all cases
so far, the last processed message was only freed when the next one was processed. This has been changed now. More precisely, a better algorithm has been selected for the queue worker process, which also involves less overhead than the previous one. The fix for "free last processed message" as then more or less a side-effect (easy to do) of the new algorithm.
-rw-r--r--runtime/debug.c51
-rw-r--r--runtime/queue.c51
-rw-r--r--runtime/rsyslog.h4
-rw-r--r--runtime/wti.c76
-rw-r--r--runtime/wtp.c7
-rw-r--r--runtime/wtp.h8
6 files changed, 129 insertions, 68 deletions
diff --git a/runtime/debug.c b/runtime/debug.c
index 4f45a1e3..248c5ea3 100644
--- a/runtime/debug.c
+++ b/runtime/debug.c
@@ -828,13 +828,12 @@ sigsegvHdlr(int signum)
abort();
}
-#if 1
-#pragma GCC diagnostic ignored "-Wempty-body"
-/* write the debug message. This is a helper to dbgprintf and dbgoprint which
- * contains common code. added 2008-09-26 rgerhards
+/* actually write the debug message. This is a separate fuction because the cleanup_push/_pop
+ * interface otherwise is unsafe to use (generates compiler warnings at least).
+ * 2009-05-20 rgerhards
*/
-static void
-dbgprint(obj_t *pObj, char *pszMsg, size_t lenMsg)
+static inline void
+do_dbgprint(uchar *pszObjName, char *pszMsg, size_t lenMsg)
{
static pthread_t ptLastThrdID = 0;
static int bWasNL = 0;
@@ -842,20 +841,6 @@ dbgprint(obj_t *pObj, char *pszMsg, size_t lenMsg)
char pszWriteBuf[1024];
size_t lenWriteBuf;
struct timespec t;
- uchar *pszObjName = NULL;
-
- /* we must get the object name before we lock the mutex, because the object
- * potentially calls back into us. If we locked the mutex, we would deadlock
- * ourselfs. On the other hand, the GetName call needs not to be protected, as
- * this thread has a valid reference. If such an object is deleted by another
- * thread, we are in much more trouble than just for dbgprint(). -- rgerhards, 2008-09-26
- */
- if(pObj != NULL) {
- pszObjName = obj.GetName(pObj);
- }
-
- pthread_mutex_lock(&mutdbgprint);
- pthread_cleanup_push(dbgMutexCancelCleanupHdlr, &mutdbgprint);
/* The bWasNL handler does not really work. It works if no thread
* switching occurs during non-NL messages. Else, things are messed
@@ -903,11 +888,35 @@ dbgprint(obj_t *pObj, char *pszMsg, size_t lenMsg)
if(altdbg != -1) write(altdbg, pszMsg, lenMsg);
bWasNL = (pszMsg[lenMsg - 1] == '\n') ? 1 : 0;
+}
+
+#pragma GCC diagnostic ignored "-Wempty-body"
+/* write the debug message. This is a helper to dbgprintf and dbgoprint which
+ * contains common code. added 2008-09-26 rgerhards
+ */
+static void
+dbgprint(obj_t *pObj, char *pszMsg, size_t lenMsg)
+{
+ uchar *pszObjName = NULL;
+
+ /* we must get the object name before we lock the mutex, because the object
+ * potentially calls back into us. If we locked the mutex, we would deadlock
+ * ourselfs. On the other hand, the GetName call needs not to be protected, as
+ * this thread has a valid reference. If such an object is deleted by another
+ * thread, we are in much more trouble than just for dbgprint(). -- rgerhards, 2008-09-26
+ */
+ if(pObj != NULL) {
+ pszObjName = obj.GetName(pObj);
+ }
+
+ pthread_mutex_lock(&mutdbgprint);
+ pthread_cleanup_push(dbgMutexCancelCleanupHdlr, &mutdbgprint);
+
+ do_dbgprint(pszObjName, pszMsg, lenMsg);
pthread_cleanup_pop(1);
}
#pragma GCC diagnostic warning "-Wempty-body"
-#endif
/* print some debug output when an object is given
* This is mostly a copy of dbgprintf, but I do not know how to combine it
diff --git a/runtime/queue.c b/runtime/queue.c
index 3118bb19..0019297b 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -469,7 +469,7 @@ qqueueInitDA(qqueue_t *pThis, int bEnqOnly, int bLockMutex)
CHKiRet(wtpSetDbgHdr (pThis->pWtpDA, pszBuf, lenBuf));
CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueChkStopWrkrDA));
CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize));
- CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueIsIdleDA));
+ CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, wtp_t*)) qqueueIsIdleDA));
CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti, int)) ConsumerDA));
CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) qqueueStartDA));
CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) qqueueTurnOffDAMode));
@@ -1257,7 +1257,7 @@ static rsRetVal qqueueShutdownWorkers(qqueue_t *pThis)
/* we need to re-aquire the mutex for the next check in this case! */
BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
}
- if(pThis->bIsDA && wtpGetCurNumWrkr(pThis->pWtpDA, LOCK_MUTEX) > 0) {
+ if(pThis->bRunsDA && wtpGetCurNumWrkr(pThis->pWtpDA, LOCK_MUTEX) > 0) {
/* and now the same for the DA queue */
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
dbgoprint((obj_t*) pThis, "trying immediate shutdown of DA workers\n");
@@ -1548,18 +1548,19 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz
{
int nDequeued;
int nDiscarded;
+ int nDeleted;
int iQueueSize;
void *pUsr;
rsRetVal localRet;
DEFiRet;
+ nDeleted = pWti->batch.nElemDeq;
DeleteProcessedBatch(pThis, &pWti->batch);
nDequeued = nDiscarded = 0;
- do {
+ while((iQueueSize = getLogicalQueueSize(pThis)) > 0 && nDequeued < pThis->iDeqBatchSize) {
dbgprintf("DequeueConsumableElements, index %d\n", nDequeued);
CHKiRet(qqueueDeq(pThis, &pUsr));
- iQueueSize = getLogicalQueueSize(pThis);
/* check if we should discard this element */
localRet = qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pThis->bRunsDA, pUsr);
@@ -1574,9 +1575,10 @@ dbgprintf("DequeueConsumableElements, index %d\n", nDequeued);
pWti->batch.pElem[nDequeued].pUsrp = pUsr;
pWti->batch.pElem[nDequeued].state = BATCH_STATE_RDY;
++nDequeued;
- } while(iQueueSize > 0 && nDequeued < pThis->iDeqBatchSize);
+ }
- qqueueChkPersist(pThis, nDequeued); /* it is sufficient to persist only when the bulk of work is done */
+ /* it is sufficient to persist only when the bulk of work is done */
+ qqueueChkPersist(pThis, nDequeued+nDiscarded+nDeleted);
pWti->batch.nElem = nDequeued;
pWti->batch.nElemDeq = nDequeued + nDiscarded;
@@ -1618,6 +1620,7 @@ DequeueConsumable(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
pthread_cond_broadcast(&pThis->belowLightDlyWtrMrk);
}
+ // TODO: MULTI: check physical queue size!
pthread_cond_signal(&pThis->notFull);
d_pthread_mutex_unlock(pThis->mut);
pthread_setcancelstate(iCancelStateSave, NULL);
@@ -1727,6 +1730,27 @@ RateLimiter(qqueue_t *pThis)
}
+/* This dequeues the next batch and checks if the queue is empty. If it is
+ * empty, return RS_RET_IDLE. That will trigger termination of the function
+ * and tell the upper layer caller to initiate idle processing.
+ * rgerhards, 2009-05-20
+ */
+static inline rsRetVal
+DequeueForConsumer(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
+{
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, qqueue);
+ ISOBJ_TYPE_assert(pWti, wti);
+
+ CHKiRet(DequeueConsumable(pThis, pWti, iCancelStateSave));
+
+ if(pWti->batch.nElem == 0)
+ ABORT_FINALIZE(RS_RET_IDLE);
+
+finalize_it:
+ RETiRet;
+}
/* This is the queue consumer in the regular (non-DA) case. It is
* protected by the queue mutex, but MUST release it as soon as possible.
@@ -1740,7 +1764,7 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
ISOBJ_TYPE_assert(pThis, qqueue);
ISOBJ_TYPE_assert(pWti, wti);
- CHKiRet(DequeueConsumable(pThis, pWti, iCancelStateSave));
+ CHKiRet(DequeueForConsumer(pThis, pWti, iCancelStateSave));
CHKiRet(pThis->pConsumer(pThis->pUsr, &pWti->batch));
/* we now need to check if we should deliberately delay processing a bit
@@ -1754,6 +1778,7 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
}
finalize_it:
+dbgprintf("XXX: regular consumer finished, iret=%d, szlog %d sz phys %d\n", iRet, getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
RETiRet;
}
@@ -1776,7 +1801,7 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
ISOBJ_TYPE_assert(pThis, qqueue);
ISOBJ_TYPE_assert(pWti, wti);
- CHKiRet(DequeueConsumable(pThis, pWti, iCancelStateSave));
+ CHKiRet(DequeueForConsumer(pThis, pWti, iCancelStateSave));
/* iterate over returned results and enqueue them in DA queue */
for(i = 0 ; i < pWti->batch.nElem ; i++)
CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, pWti->batch.pElem[i].pUsrp));
@@ -1855,6 +1880,8 @@ GetDeqBatchSize(qqueue_t *pThis, int *pVal)
RETiRet;
}
+
+/* common function for the idle functions that deletes the last batch if TODO MULTI */
/* must only be called when the queue mutex is locked, else results
* are not stable! DA queue version
*/
@@ -1990,7 +2017,7 @@ rsRetVal qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
CHKiRet(wtpSetpfRateLimiter (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RateLimiter));
CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) ChkStopWrkrReg));
CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize));
- CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) IsIdleReg));
+ CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, wtp_t*)) IsIdleReg));
CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti, int)) ConsumerReg));
CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RegOnWrkrStartup));
CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RegOnWrkrShutdown));
@@ -2133,7 +2160,10 @@ static rsRetVal qqueueChkPersist(qqueue_t *pThis, int nUpdates)
DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
- assert(nUpdates > 0);
+ assert(nUpdates >= 0);
+
+ if(nUpdates == 0)
+ FINALIZE;
pThis->iUpdsSincePersist += nUpdates;
if(pThis->iPersistUpdCnt && pThis->iUpdsSincePersist >= pThis->iPersistUpdCnt) {
@@ -2141,6 +2171,7 @@ static rsRetVal qqueueChkPersist(qqueue_t *pThis, int nUpdates)
pThis->iUpdsSincePersist = 0;
}
+finalize_it:
RETiRet;
}
diff --git a/runtime/rsyslog.h b/runtime/rsyslog.h
index 3dd84ef6..01bbbb29 100644
--- a/runtime/rsyslog.h
+++ b/runtime/rsyslog.h
@@ -102,6 +102,7 @@ typedef struct tcpsrv_s tcpsrv_t;
typedef struct vmstk_s vmstk_t;
typedef struct batch_obj_s batch_obj_t;
typedef struct batch_s batch_t;
+typedef struct wtp_s wtp_t;
typedef rsRetVal (*prsf_t)(struct vmstk_s*, int); /* pointer to a RainerScript function */
typedef uint64 qDeqID; /* queue Dequeue order ID. 32 bits is considered dangerously few */
@@ -290,10 +291,11 @@ enum rsRetVal_ /** return value. All methods return this if not specified oth
RS_RET_SYSVAR_NOT_FOUND = 1001, /**< system variable could not be found (maybe misspelled) */
/* some generic error/status codes */
+ RS_RET_OK = 0, /**< operation successful */
RS_RET_OK_DELETE_LISTENTRY = 1, /**< operation successful, but callee requested the deletion of an entry (special state) */
RS_RET_TERMINATE_NOW = 2, /**< operation successful, function is requested to terminate (mostly used with threads) */
RS_RET_NO_RUN = 3, /**< operation successful, but function does not like to be executed */
- RS_RET_OK = 0 /**< operation successful */
+ RS_RET_IDLE = 4 /**< operation successful, but callee is idle (e.g. because queue is empty) */
};
/* some helpful macros to work with srRetVals.
diff --git a/runtime/wti.c b/runtime/wti.c
index 8a7ee657..1be008df 100644
--- a/runtime/wti.c
+++ b/runtime/wti.c
@@ -333,6 +333,32 @@ wtiWorkerCancelCleanup(void *arg)
}
+/* wait for queue to become non-empty or timeout
+ * helper to wtiWorker
+ * IMPORTANT: mutex must be locked when this code is called!
+ * rgerhards, 2009-05-20
+ */
+static inline void
+doIdleProcessing(wti_t *pThis, wtp_t *pWtp, int *pbInactivityTOOccured)
+{
+ struct timespec t;
+
+ DBGPRINTF("%s: worker IDLE, waiting for work.\n", wtiGetDbgHdr(pThis));
+ pWtp->pfOnIdle(pWtp->pUsr, MUTEX_ALREADY_LOCKED);
+
+ if(pWtp->toWrkShutdown == -1) {
+ /* never shut down any started worker */
+ d_pthread_cond_wait(pWtp->pcondBusy, pWtp->pmutUsr);
+ } else {
+ timeoutComp(&t, pWtp->toWrkShutdown);/* get absolute timeout */
+ if(d_pthread_cond_timedwait(pWtp->pcondBusy, pWtp->pmutUsr, &t) != 0) {
+ DBGPRINTF("%s: inactivity timeout, worker terminating...\n", wtiGetDbgHdr(pThis));
+ *pbInactivityTOOccured = 1; /* indicate we had a timeout */
+ }
+ }
+}
+
+
/* generic worker thread framework
*
* Some special comments below, so that they do not clutter the main function code:
@@ -341,16 +367,20 @@ wtiWorkerCancelCleanup(void *arg)
* Now make sure we can get canceled - it is not specified if pthread_setcancelstate() is
* a cancellation point in itself. As we run most of the time without cancel enabled, I fear
* we may never get cancelled if we do not create a cancellation point ourselfs.
+ * Note on rate-limiters:
+ * If we have a rate-limiter set for this worker pool, let's call it. Please
+ * keep in mind that the rate-limiter may hold us for an extended period
+ * of time. -- rgerhards, 2008-04-02
*/
#pragma GCC diagnostic ignored "-Wempty-body"
rsRetVal
wtiWorker(wti_t *pThis)
{
- DEFiRet;
DEFVARS_mutexProtection;
- struct timespec t;
wtp_t *pWtp; /* our worker thread pool */
int bInactivityTOOccured = 0;
+ rsRetVal localRet;
+ DEFiRet;
ISOBJ_TYPE_assert(pThis, wti);
pWtp = pThis->pWtp; /* shortcut */
@@ -369,48 +399,38 @@ wtiWorker(wti_t *pThis)
wtpProcessThrdChanges(pWtp);
pthread_testcancel(); /* see big comment in function header */
- /* if we have a rate-limiter set for this worker pool, let's call it. Please
- * keep in mind that the rate-limiter may hold us for an extended period
- * of time. -- rgerhards, 2008-04-02
- */
- if(pWtp->pfRateLimiter != NULL) {
+ if(pWtp->pfRateLimiter != NULL) { /* call rate-limiter, if defined */
pWtp->pfRateLimiter(pWtp->pUsr);
}
wtpSetInactivityGuard(pThis->pWtp, 0, LOCK_MUTEX); /* must be set before usr mutex is locked! */
BEGIN_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr, LOCK_MUTEX);
- if( (bInactivityTOOccured && pWtp->pfIsIdle(pWtp->pUsr, MUTEX_ALREADY_LOCKED))
- || wtpChkStopWrkr(pWtp, LOCK_MUTEX, MUTEX_ALREADY_LOCKED)) {
- END_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr);
+ /* first check if we are in shutdown process */
+ if(wtpChkStopWrkr(pWtp, LOCK_MUTEX, MUTEX_ALREADY_LOCKED)) {
break; /* end worker thread run */
}
- bInactivityTOOccured = 0; /* reset for next run */
- /* if we reach this point, we are still protected by the mutex */
-
- if(pWtp->pfIsIdle(pWtp->pUsr, MUTEX_ALREADY_LOCKED)) {
- DBGPRINTF("%s: worker IDLE, waiting for work.\n", wtiGetDbgHdr(pThis));
- pWtp->pfOnIdle(pWtp->pUsr, MUTEX_ALREADY_LOCKED);
-
- if(pWtp->toWrkShutdown == -1) {
- /* never shut down any started worker */
- d_pthread_cond_wait(pWtp->pcondBusy, pWtp->pmutUsr);
- } else {
- timeoutComp(&t, pWtp->toWrkShutdown);/* get absolute timeout */
- if(d_pthread_cond_timedwait(pWtp->pcondBusy, pWtp->pmutUsr, &t) != 0) {
- DBGPRINTF("%s: inactivity timeout, worker terminating...\n", wtiGetDbgHdr(pThis));
- bInactivityTOOccured = 1; /* indicate we had a timeout */
- }
+ /* try to execute and process whatever we have */
+ localRet = pWtp->pfDoWork(pWtp->pUsr, pThis, iCancelStateSave);
+
+ if(localRet == RS_RET_IDLE) {
+ if(bInactivityTOOccured) {
+ /* we had an inactivity timeout in the last run and are still idle, so it is time to exit... */
+ break; /* end worker thread run */
}
+ doIdleProcessing(pThis, pWtp, &bInactivityTOOccured);
END_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr);
continue; /* request next iteration */
}
+ END_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr);
- /* if we reach this point, we have a non-empty queue (and are still protected by mutex) */
- pWtp->pfDoWork(pWtp->pUsr, pThis, iCancelStateSave);
+ bInactivityTOOccured = 0; /* reset for next run */
}
+ /* if we exit the loop, the mutex is locked and must be unlocked */
+ END_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr);
+
/* indicate termination */
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
d_pthread_mutex_lock(&pThis->mut);
diff --git a/runtime/wtp.c b/runtime/wtp.c
index dab59562..40a9095b 100644
--- a/runtime/wtp.c
+++ b/runtime/wtp.c
@@ -248,7 +248,6 @@ wtpSetState(wtp_t *pThis, wtpState_t iNewState)
/* check if the worker shall shutdown (1 = yes, 0 = no)
- * TODO: check if we can use atomic operations to enhance performance
* Note: there may be two mutexes locked, the bLockUsrMutex is the one in our "user"
* (e.g. the queue clas)
* rgerhards, 2008-01-21
@@ -263,14 +262,14 @@ wtpChkStopWrkr(wtp_t *pThis, int bLockMutex, int bLockUsrMutex)
BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex);
if( (pThis->wtpState == wtpState_SHUTDOWN_IMMEDIATE)
- || ((pThis->wtpState == wtpState_SHUTDOWN) && pThis->pfIsIdle(pThis->pUsr, bLockUsrMutex)))
+ || ((pThis->wtpState == wtpState_SHUTDOWN) && pThis->pfIsIdle(pThis->pUsr, pThis)))
iRet = RS_RET_TERMINATE_NOW;
- END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
/* try customer handler if one was set and we do not yet have a definite result */
if(iRet == RS_RET_OK && pThis->pfChkStopWrkr != NULL) {
iRet = pThis->pfChkStopWrkr(pThis->pUsr, bLockUsrMutex);
}
+ END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
RETiRet;
}
@@ -577,7 +576,7 @@ DEFpropSetMethPTR(wtp, pcondBusy, pthread_cond_t)
DEFpropSetMethFP(wtp, pfChkStopWrkr, rsRetVal(*pVal)(void*, int))
DEFpropSetMethFP(wtp, pfRateLimiter, rsRetVal(*pVal)(void*))
DEFpropSetMethFP(wtp, pfGetDeqBatchSize, rsRetVal(*pVal)(void*, int*))
-DEFpropSetMethFP(wtp, pfIsIdle, rsRetVal(*pVal)(void*, int))
+DEFpropSetMethFP(wtp, pfIsIdle, rsRetVal(*pVal)(void*, wtp_t*))
DEFpropSetMethFP(wtp, pfDoWork, rsRetVal(*pVal)(void*, void*, int))
DEFpropSetMethFP(wtp, pfOnIdle, rsRetVal(*pVal)(void*, int))
DEFpropSetMethFP(wtp, pfOnWorkerCancel, rsRetVal(*pVal)(void*, void*))
diff --git a/runtime/wtp.h b/runtime/wtp.h
index 88bd9197..d9d582af 100644
--- a/runtime/wtp.h
+++ b/runtime/wtp.h
@@ -50,7 +50,7 @@ typedef enum {
/* the worker thread pool (wtp) object */
-typedef struct wtp_s {
+struct wtp_s {
BEGINobjInstance;
int bOptimizeUniProc; /* cache for the equally-named global setting, pulled at time of queue creation */
wtpState_t wtpState;
@@ -73,7 +73,7 @@ typedef struct wtp_s {
rsRetVal (*pfChkStopWrkr)(void *pUsr, int);
rsRetVal (*pfGetDeqBatchSize)(void *pUsr, int*); /* obtains max dequeue count from queue config */
rsRetVal (*pfRateLimiter)(void *pUsr);
- rsRetVal (*pfIsIdle)(void *pUsr, int);
+ rsRetVal (*pfIsIdle)(void *pUsr, wtp_t *pWtp);
rsRetVal (*pfDoWork)(void *pUsr, void *pWti, int);
rsRetVal (*pfOnIdle)(void *pUsr, int);
rsRetVal (*pfOnWorkerCancel)(void *pUsr, void*pWti);
@@ -81,7 +81,7 @@ typedef struct wtp_s {
rsRetVal (*pfOnWorkerShutdown)(void *pUsr);
/* end user objects */
uchar *pszDbgHdr; /* header string for debug messages */
-} wtp_t;
+};
/* some symbolic constants for easier reference */
@@ -106,7 +106,7 @@ PROTOTYPEObjClassInit(wtp);
PROTOTYPEpropSetMethFP(wtp, pfChkStopWrkr, rsRetVal(*pVal)(void*, int));
PROTOTYPEpropSetMethFP(wtp, pfRateLimiter, rsRetVal(*pVal)(void*));
PROTOTYPEpropSetMethFP(wtp, pfGetDeqBatchSize, rsRetVal(*pVal)(void*, int*));
-PROTOTYPEpropSetMethFP(wtp, pfIsIdle, rsRetVal(*pVal)(void*, int));
+PROTOTYPEpropSetMethFP(wtp, pfIsIdle, rsRetVal(*pVal)(void*, wtp_t*));
PROTOTYPEpropSetMethFP(wtp, pfDoWork, rsRetVal(*pVal)(void*, void*, int));
PROTOTYPEpropSetMethFP(wtp, pfOnIdle, rsRetVal(*pVal)(void*, int));
PROTOTYPEpropSetMethFP(wtp, pfOnWorkerCancel, rsRetVal(*pVal)(void*,void*));