diff options
Diffstat (limited to 'runtime')
-rw-r--r-- | runtime/conf.c | 2 | ||||
-rw-r--r-- | runtime/module-template.h | 72 | ||||
-rw-r--r-- | runtime/modules.c | 50 | ||||
-rw-r--r-- | runtime/modules.h | 2 | ||||
-rw-r--r-- | runtime/queue.c | 253 | ||||
-rw-r--r-- | runtime/queue.h | 14 | ||||
-rw-r--r-- | runtime/rsyslog.h | 12 | ||||
-rw-r--r-- | runtime/wti.c | 18 | ||||
-rw-r--r-- | runtime/wti.h | 16 | ||||
-rw-r--r-- | runtime/wtp.c | 9 | ||||
-rw-r--r-- | runtime/wtp.h | 4 |
11 files changed, 303 insertions, 149 deletions
diff --git a/runtime/conf.c b/runtime/conf.c index 27ab8bb4..c776ef46 100644 --- a/runtime/conf.c +++ b/runtime/conf.c @@ -1080,7 +1080,7 @@ static rsRetVal cflineDoAction(uchar **p, action_t **ppAction) dbgprintf("module is incompatible with RepeatedMsgReduction - turned off\n"); pAction->f_ReduceRepeated = 0; } - pAction->bEnabled = 1; /* action is enabled */ + pAction->eState = ACT_STATE_RDY; /* action is enabled */ iNbrActions++; /* one more active action! */ } break; diff --git a/runtime/module-template.h b/runtime/module-template.h index 6f7d877c..3e963199 100644 --- a/runtime/module-template.h +++ b/runtime/module-template.h @@ -39,7 +39,8 @@ #define DEF_OMOD_STATIC_DATA \ DEF_MOD_STATIC_DATA \ - DEFobjCurrIf(obj) + DEFobjCurrIf(obj) \ + static __attribute__((unused)) int bCoreSupportsBatching; #define DEF_IMOD_STATIC_DATA \ DEF_MOD_STATIC_DATA \ DEFobjCurrIf(obj) @@ -160,6 +161,37 @@ static rsRetVal isCompatibleWithFeature(syslogFeature __attribute__((unused)) eF RETiRet;\ } + +/* beginTransaction() + * introduced in v4.3.3 -- rgerhards, 2009-04-27 + */ +#define BEGINbeginTransaction \ +static rsRetVal beginTransaction(instanceData __attribute__((unused)) *pData)\ +{\ + DEFiRet; + +#define CODESTARTbeginTransaction /* currently empty, but may be extended */ + +#define ENDbeginTransaction \ + RETiRet;\ +} + + +/* endTransaction() + * introduced in v4.3.3 -- rgerhards, 2009-04-27 + */ +#define BEGINendTransaction \ +static rsRetVal endTransaction(instanceData __attribute__((unused)) *pData)\ +{\ + DEFiRet; + +#define CODESTARTendTransaction /* currently empty, but may be extended */ + +#define ENDendTransaction \ + RETiRet;\ +} + + /* doAction() */ #define BEGINdoAction \ @@ -324,6 +356,18 @@ static rsRetVal queryEtryPt(uchar *name, rsRetVal (**pEtryPoint)())\ *pEtryPoint = tryResume;\ } + +/* the following definition is queryEtryPt block that must be added + * if an output module supports the transactional interface. + * rgerhards, 2009-04-27 + */ +#define CODEqueryEtryPt_TXIF_OMOD_QUERIES \ + else if(!strcmp((char*) name, "beginTransaction")) {\ + *pEtryPoint = beginTransaction;\ + } else if(!strcmp((char*) name, "endTransaction")) {\ + *pEtryPoint = endTransaction;\ + } + /* the following definition is the standard block for queryEtryPt for INPUT * modules. This can be used if no specific handling (e.g. to cover version * differences) is needed. @@ -393,6 +437,32 @@ finalize_it:\ } +/* now come some check functions, which enable a standard way of obtaining feature + * information from the core. feat is the to-be-tested feature and featVar is a + * variable that receives the result (0-not support, 1-supported). + * This must be a macro, so that it is put into the output's code. Otherwise, we + * would need to rely on a library entry point, which is what we intend to avoid ;) + * rgerhards, 2009-04-27 + */ +#define INITChkCoreFeature(featVar, feat) \ +{ \ + rsRetVal MACRO_Ret; \ + rsRetVal (*pQueryCoreFeatureSupport)(int*, unsigned); \ + int bSupportsIt; \ + featVar = 0; \ + MACRO_Ret = pHostQueryEtryPt((uchar*)"queryCoreFeatureSupport", &pQueryCoreFeatureSupport); \ + if(MACRO_Ret == RS_RET_OK) { \ + /* found entry point, so let's see if core supports it */ \ + CHKiRet((*pQueryCoreFeatureSupport)(&bSupportsIt, feat)); \ + if(bSupportsIt) \ + featVar = 1; \ + } else if(MACRO_Ret != RS_RET_ENTRY_POINT_NOT_FOUND) { \ + ABORT_FINALIZE(MACRO_Ret); /* Something else went wrong, what is not acceptable */ \ + } \ +} + + + /* definitions for host API queries */ #define CODEmodInit_QueryRegCFSLineHdlr \ CHKiRet(pHostQueryEtryPt((uchar*)"regCfSysLineHdlr", &omsdRegCFSLineHdlr)); diff --git a/runtime/modules.c b/runtime/modules.c index 9fdb48e7..bfd87a71 100644 --- a/runtime/modules.c +++ b/runtime/modules.c @@ -68,6 +68,21 @@ static modInfo_t *pLoadedModulesLast = NULL; /* tail-pointer */ uchar *pModDir = NULL; /* read-only after startup */ +/* we provide a set of dummy functions for output modules that do not support the + * transactional interface. As they do not do this, they commit each message they + * receive, and as such the dummies can always return RS_RET_OK without causing + * harm. This simplifies things as in action processing we do not need to check + * if the transactional entry points exist. + */ +static rsRetVal dummyBeginTransaction() +{ + return RS_RET_OK; +} +static rsRetVal dummyEndTransaction() +{ + return RS_RET_OK; +} + #ifdef DEBUG /* we add some home-grown support to track our users (and detect who does not free us). In * the long term, this should probably be migrated into debug.c (TODO). -- rgerhards, 2008-03-11 @@ -207,19 +222,38 @@ static void moduleDestruct(modInfo_t *pThis) } +/* This enables a module to query the core for specific features. + * rgerhards, 2009-04-22 + */ +static rsRetVal queryCoreFeatureSupport(int *pBool, unsigned uFeat) +{ + DEFiRet; + + if((pBool == NULL)) + ABORT_FINALIZE(RS_RET_PARAM_ERROR); + + *pBool = (uFeat & CORE_FEATURE_BATCHING) ? 1 : 0; + +finalize_it: + RETiRet; +} + + /* The following function is the queryEntryPoint for host-based entry points. * Modules may call it to get access to core interface functions. Please note * that utility functions can be accessed via shared libraries - at least this * is my current shool of thinking. * Please note that the implementation as a query interface allows to take * care of plug-in interface version differences. -- rgerhards, 2007-07-31 + * ... but often it better not to use a new interface. So we now add core + * functions here that a plugin may request. -- rgerhards, 2009-04-22 */ static rsRetVal queryHostEtryPt(uchar *name, rsRetVal (**pEtryPoint)()) { DEFiRet; if((name == NULL) || (pEtryPoint == NULL)) - return RS_RET_PARAM_ERROR; + ABORT_FINALIZE(RS_RET_PARAM_ERROR); if(!strcmp((char*) name, "regCfSysLineHdlr")) { *pEtryPoint = regCfSysLineHdlr; @@ -227,6 +261,8 @@ static rsRetVal queryHostEtryPt(uchar *name, rsRetVal (**pEtryPoint)()) *pEtryPoint = objGetObjInterface; } else if(!strcmp((char*) name, "OMSRgetSupportedTplOpts")) { *pEtryPoint = OMSRgetSupportedTplOpts; + } else if(!strcmp((char*) name, "queryCoreFeatureSupport")) { + *pEtryPoint = queryCoreFeatureSupport; } else { *pEtryPoint = NULL; /* to be on the safe side */ ABORT_FINALIZE(RS_RET_ENTRY_POINT_NOT_FOUND); @@ -402,6 +438,18 @@ doModInit(rsRetVal (*modInit)(int, int*, rsRetVal(**)(), rsRetVal(*)(), modInfo_ localRet = (*pNew->modQueryEtryPt)((uchar*)"doHUP", &pNew->doHUP); if(localRet != RS_RET_OK && localRet != RS_RET_MODULE_ENTRY_POINT_NOT_FOUND) ABORT_FINALIZE(localRet); + + localRet = (*pNew->modQueryEtryPt)((uchar*)"beginTransaction", &pNew->mod.om.beginTransaction); + if(localRet == RS_RET_MODULE_ENTRY_POINT_NOT_FOUND) + pNew->mod.om.beginTransaction = dummyBeginTransaction; + else if(localRet != RS_RET_OK) + ABORT_FINALIZE(localRet); + + localRet = (*pNew->modQueryEtryPt)((uchar*)"endTransaction", &pNew->mod.om.endTransaction); + if(localRet == RS_RET_MODULE_ENTRY_POINT_NOT_FOUND) + pNew->mod.om.beginTransaction = dummyEndTransaction; + else if(localRet != RS_RET_OK) + ABORT_FINALIZE(localRet); break; case eMOD_LIB: break; diff --git a/runtime/modules.h b/runtime/modules.h index 372529ee..e33bbbe1 100644 --- a/runtime/modules.h +++ b/runtime/modules.h @@ -110,7 +110,9 @@ typedef struct modInfo_s { struct {/* data for output modules */ /* below: perform the configured action */ + rsRetVal (*beginTransaction)(void*); rsRetVal (*doAction)(uchar**, unsigned, void*); + rsRetVal (*endTransaction)(void*); rsRetVal (*parseSelectorAct)(uchar**, void**,omodStringRequest_t**); } om; struct { /* data for library modules */ diff --git a/runtime/queue.c b/runtime/queue.c index 4e017e84..c2df928b 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -8,7 +8,11 @@ * (and in the web doc set on http://www.rsyslog.com/doc). Be sure to read it * if you are getting aquainted to the object. * - * Copyright 2008 Rainer Gerhards and Adiscon GmbH. + * NOTE: as of 2009-04-22, I have begin to remove the qqueue* prefix from static + * function names - this makes it really hard to read and does not provide much + * benefit, at least I (now) think so... + * + * Copyright 2008, 2009 Rainer Gerhards and Adiscon GmbH. * * This file is part of the rsyslog runtime library. * @@ -63,12 +67,13 @@ DEFobjCurrIf(glbl) /* forward-definitions */ rsRetVal qqueueChkPersist(qqueue_t *pThis); static rsRetVal qqueueSetEnqOnly(qqueue_t *pThis, int bEnqOnly, int bLockMutex); -static rsRetVal qqueueRateLimiter(qqueue_t *pThis); +static rsRetVal RateLimiter(qqueue_t *pThis); static int qqueueChkStopWrkrDA(qqueue_t *pThis); +static rsRetVal GetDeqBatchSize(qqueue_t *pThis, int *pVal); static int qqueueIsIdleDA(qqueue_t *pThis); -static rsRetVal qqueueConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave); -static rsRetVal qqueueConsumerCancelCleanup(void *arg1, void *arg2); -static rsRetVal qqueueUngetObj(qqueue_t *pThis, obj_t *pUsr, int bLockMutex); +static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave); +static rsRetVal ConsumerCancelCleanup(void *arg1, void *arg2); +static rsRetVal UngetObj(qqueue_t *pThis, obj_t *pUsr, int bLockMutex); /* some constants for queuePersist () */ #define QUEUE_CHECKPOINT 1 @@ -369,9 +374,10 @@ qqueueInitDA(qqueue_t *pThis, int bEnqOnly, int bLockMutex) CHKiRet(wtpConstruct (&pThis->pWtpDA)); CHKiRet(wtpSetDbgHdr (pThis->pWtpDA, pszBuf, lenBuf)); CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueChkStopWrkrDA)); + CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize)); CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueIsIdleDA)); - CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti, int)) qqueueConsumerDA)); - CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void*pWti)) qqueueConsumerCancelCleanup)); + CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti, int)) ConsumerDA)); + CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void*pWti)) ConsumerCancelCleanup)); CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) qqueueStartDA)); CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) qqueueTurnOffDAMode)); CHKiRet(wtpSetpmutUsr (pThis->pWtpDA, pThis->mut)); @@ -483,9 +489,7 @@ static rsRetVal qDestructFixedArray(qqueue_t *pThis) ASSERT(pThis != NULL); queueDrain(pThis); /* discard any remaining queue entries */ - - if(pThis->tVars.farray.pBuf != NULL) - free(pThis->tVars.farray.pBuf); + free(pThis->tVars.farray.pBuf); RETiRet; } @@ -607,28 +611,7 @@ static rsRetVal qDestructLinkedList(qqueue_t __attribute__((unused)) *pThis) static rsRetVal qAddLinkedList(qqueue_t *pThis, void* pUsr) { DEFiRet; - iRet = qqueueAddLinkedList(&pThis->tVars.linklist.pRoot, &pThis->tVars.linklist.pLast, pUsr); -#if 0 - qLinkedList_t *pEntry; - - ASSERT(pThis != NULL); - if((pEntry = (qLinkedList_t*) malloc(sizeof(qLinkedList_t))) == NULL) { - ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); - } - - pEntry->pNext = NULL; - pEntry->pUsr = pUsr; - - if(pThis->tVars.linklist.pRoot == NULL) { - pThis->tVars.linklist.pRoot = pThis->tVars.linklist.pLast = pEntry; - } else { - pThis->tVars.linklist.pLast->pNext = pEntry; - pThis->tVars.linklist.pLast = pEntry; - } - -finalize_it: -#endif RETiRet; } @@ -636,24 +619,6 @@ static rsRetVal qDelLinkedList(qqueue_t *pThis, obj_t **ppUsr) { DEFiRet; iRet = qqueueDelLinkedList(&pThis->tVars.linklist.pRoot, &pThis->tVars.linklist.pLast, ppUsr); -#if 0 - qLinkedList_t *pEntry; - - ASSERT(pThis != NULL); - ASSERT(pThis->tVars.linklist.pRoot != NULL); - - pEntry = pThis->tVars.linklist.pRoot; - *ppUsr = pEntry->pUsr; - - if(pThis->tVars.linklist.pRoot == pThis->tVars.linklist.pLast) { - pThis->tVars.linklist.pRoot = NULL; - pThis->tVars.linklist.pLast = NULL; - } else { - pThis->tVars.linklist.pRoot = pEntry->pNext; - } - free(pEntry); - -#endif RETiRet; } @@ -760,7 +725,7 @@ qqueueTryLoadPersistedInfo(qqueue_t *pThis) while(iUngottenObjs > 0) { /* fill the queue from disk */ CHKiRet(obj.Deserialize((void*) &pUsr, (uchar*)"msg", psQIF, NULL, NULL)); - qqueueUngetObj(pThis, pUsr, MUTEX_ALREADY_LOCKED); + UngetObj(pThis, pUsr, MUTEX_ALREADY_LOCKED); --iUngottenObjs; /* one less */ } @@ -931,6 +896,8 @@ static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis) static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr) { + aUsrp_t aUsrp; + obj_t *pMsgp; DEFiRet; ASSERT(pThis != NULL); @@ -940,8 +907,13 @@ static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr) * mode the consumer probably has a lot to convey (which get's lost in the other modes * because they are asynchronous. But direct mode is deliberately synchronous. * rgerhards, 2008-02-12 + * We use our knowledge about the aUsrp_t structure below, but without that, we + * pay a too-large performance toll... -- rgerhards, 2009-04-22 */ - iRet = pThis->pConsumer(pThis->pUsr, pUsr); + pMsgp = (obj_t*) pUsr; + aUsrp.nElem = 1; /* there always is only one in direct mode */ + aUsrp.pUsrp = &pMsgp; + iRet = pThis->pConsumer(pThis->pUsr, &aUsrp); RETiRet; } @@ -961,7 +933,7 @@ static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis, __attribute_ * rgerhards, 2008-01-20 */ static rsRetVal -qqueueUngetObj(qqueue_t *pThis, obj_t *pUsr, int bLockMutex) +UngetObj(qqueue_t *pThis, obj_t *pUsr, int bLockMutex) { DEFiRet; DEFVARS_mutexProtection; @@ -991,7 +963,7 @@ qqueueUngetObj(qqueue_t *pThis, obj_t *pUsr, int bLockMutex) * rgerhards, 2008-01-29 */ static rsRetVal -qqueueGetUngottenObj(qqueue_t *pThis, obj_t **ppUsr) +GetUngottenObj(qqueue_t *pThis, obj_t **ppUsr) { DEFiRet; @@ -1047,7 +1019,7 @@ qqueueDel(qqueue_t *pThis, void *pUsr) * losing the whole process because it loops... -- rgerhards, 2008-01-03 */ if(pThis->iUngottenObjs > 0) { - iRet = qqueueGetUngottenObj(pThis, (obj_t**) pUsr); + iRet = GetUngottenObj(pThis, (obj_t**) pUsr); } else { iRet = pThis->qDel(pThis, pUsr); ATOMIC_DEC(pThis->iQueueSize); @@ -1275,7 +1247,7 @@ static rsRetVal qqueueShutdownWorkers(qqueue_t *pThis) * to modify some parameters before the queue is actually started. */ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads, - int iMaxQueueSize, rsRetVal (*pConsumer)(void*,void*)) + int iMaxQueueSize, rsRetVal (*pConsumer)(void*,aUsrp_t*)) { DEFiRet; qqueue_t *pThis; @@ -1305,6 +1277,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread pThis->pConsumer = pConsumer; pThis->iNumWorkerThreads = iWorkerThreads; pThis->iDeqtWinToHr = 25; /* disable time-windowed dequeuing by default */ + pThis->iDeqBatchSize = 8; /* conservative default, should still provide good performance */ pThis->pszFilePrefix = NULL; pThis->qType = qType; @@ -1354,7 +1327,7 @@ finalize_it: * rgerhards, 2008-01-16 */ static rsRetVal -qqueueConsumerCancelCleanup(void *arg1, void *arg2) +ConsumerCancelCleanup(void *arg1, void *arg2) { DEFiRet; @@ -1366,7 +1339,7 @@ qqueueConsumerCancelCleanup(void *arg1, void *arg2) if(pUsr != NULL) { /* make sure the data element is not lost */ dbgoprint((obj_t*) pThis, "cancelation cleanup handler consumer called, we need to unget one user data element\n"); - CHKiRet(qqueueUngetObj(pThis, pUsr, LOCK_MUTEX)); + CHKiRet(UngetObj(pThis, pUsr, LOCK_MUTEX)); } finalize_it: @@ -1415,38 +1388,68 @@ finalize_it: } +/* dequeue as many user points as are available, until we hit the configured + * upper limit of pointers. + * This must only be called when the queue mutex is LOOKED, otherwise serious + * malfunction will happen. + */ +static inline rsRetVal +DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *iRemainingQueueSize) +{ + int nDequeued; + int iQueueSize; + void *pUsr; + rsRetVal localRet; + DEFiRet; + + nDequeued = 0; + do { +dbgprintf("DequeueConsumableElements, index %d\n", nDequeued); + CHKiRet(qqueueDel(pThis, &pUsr)); + qqueueChkPersist(pThis); /* is is questionable if we should really need to call this every time... */ + iQueueSize = qqueueGetOverallQueueSize(pThis); + + /* check if we should discard this element */ + localRet = qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pThis->bRunsDA, pUsr); + if(localRet == RS_RET_QUEUE_FULL) + continue; + else if(localRet != RS_RET_OK) + ABORT_FINALIZE(localRet); + + /* all well, use this element */ + pWti->paUsrp->pUsrp[nDequeued++] = pUsr; + } while(iQueueSize > 0 && nDequeued < pThis->iDeqBatchSize); + + //bRunsDA = pThis->bRunsDA; /* cache this for after mutex release */ + pWti->paUsrp->nElem = nDequeued; + *iRemainingQueueSize = iQueueSize; + +finalize_it: + RETiRet; +} + + /* dequeue the queued object for the queue consumers. * rgerhards, 2008-10-21 + * I made a radical change - we now dequeue multiple elements, and store these objects in + * an array of user pointers. We expect that this increases performance. + * rgerhards, 2009-04-22 */ static rsRetVal -qqueueDequeueConsumable(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave) +DequeueConsumable(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave) { DEFiRet; - void *pUsr; - int iQueueSize; - int bRunsDA; /* cache for early mutex release */ + int iQueueSize = 0; /* keep the compiler happy... */ - /* dequeue element (still protected from mutex) */ - iRet = qqueueDel(pThis, &pUsr); - qqueueChkPersist(pThis); - iQueueSize = qqueueGetOverallQueueSize(pThis); /* cache this for after mutex release */ - bRunsDA = pThis->bRunsDA; /* cache this for after mutex release */ - - /* We now need to save the user pointer for the cancel cleanup handler, BUT ONLY - * if we could successfully obtain a user pointer. Otherwise, we would bring the - * cancel cleanup handler into big troubles (and we did ;)). Note that we can - * NOT set the variable further below, as this may lead to an object leak. We - * may get cancelled before we reach that part of the code, so the only - * solution is to do it here. -- rgerhards, 2008-02-27 - */ - if(iRet == RS_RET_OK) { - pWti->pUsrp = pUsr; - } + /* dequeue element batch (still protected from mutex) */ + iRet = DequeueConsumableElements(pThis, pWti, &iQueueSize); /* awake some flow-controlled sources if we can do this right now */ /* TODO: this could be done better from a performance point of view -- do it only if * we have someone waiting for the condition (or only when we hit the watermark right * on the nail [exact value]) -- rgerhards, 2008-03-14 + * now that we dequeue batches of pointers, this is much less an issue... + * rgerhards, 2009-04-22 */ if(iQueueSize < pThis->iFullDlyMrk) { pthread_cond_broadcast(&pThis->belowFullDlyWtrMrk); @@ -1456,37 +1459,16 @@ qqueueDequeueConsumable(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave) pthread_cond_broadcast(&pThis->belowLightDlyWtrMrk); } - /* rgerhards, 2008-09-30: I reversed the order of cond_signal und mutex_unlock - * as of the pthreads recommendation on predictable scheduling behaviour. I don't see - * any problems caused by this, but I add this comment in case some will be seen - * in the next time. - */ pthread_cond_signal(&pThis->notFull); d_pthread_mutex_unlock(pThis->mut); pthread_setcancelstate(iCancelStateSave, NULL); /* WE ARE NO LONGER PROTECTED BY THE MUTEX */ - /* do actual processing (the lengthy part, runs in parallel) - * If we had a problem while dequeing, we do not call the consumer, - * but we otherwise ignore it. This is in the hopes that it will be - * self-healing. However, this is really not a good thing. - * rgerhards, 2008-01-03 - */ - if(iRet != RS_RET_OK) - FINALIZE; - - /* we are running in normal, non-disk-assisted mode do a quick check if we need to drain the queue. - * In DA mode, we do not discard any messages as we assume the disk subsystem is fast enough to - * provide real-time creation of spool files. - * Note: It is OK to use the cached iQueueSize here, because it does not hurt if it is slightly wrong. - */ - CHKiRet(qqueueChkDiscardMsg(pThis, iQueueSize, bRunsDA, pUsr)); - -finalize_it: if(iRet != RS_RET_OK && iRet != RS_RET_DISCARDMSG) { dbgoprint((obj_t*) pThis, "error %d dequeueing element - ignoring, but strange things " "may happen\n", iRet); } + RETiRet; } @@ -1529,7 +1511,7 @@ finalize_it: * but you get the idea from the code above. */ static rsRetVal -qqueueRateLimiter(qqueue_t *pThis) +RateLimiter(qqueue_t *pThis) { DEFiRet; int iDelay; @@ -1592,19 +1574,20 @@ qqueueRateLimiter(qqueue_t *pThis) * rgerhards, 2008-01-21 */ static rsRetVal -qqueueConsumerReg(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave) +ConsumerReg(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave) { DEFiRet; ISOBJ_TYPE_assert(pThis, qqueue); ISOBJ_TYPE_assert(pWti, wti); - CHKiRet(qqueueDequeueConsumable(pThis, pWti, iCancelStateSave)); - CHKiRet(pThis->pConsumer(pThis->pUsr, pWti->pUsrp)); + CHKiRet(DequeueConsumable(pThis, pWti, iCancelStateSave)); + CHKiRet(pThis->pConsumer(pThis->pUsr, pWti->paUsrp)); /* we now need to check if we should deliberately delay processing a bit * and, if so, do that. -- rgerhards, 2008-01-30 */ +//TODO: MULTIQUEUE: the following setting is no longer correct - need to think about how to do that... if(pThis->iDeqSlowdown) { dbgoprint((obj_t*) pThis, "sleeping %d microseconds as requested by config params\n", pThis->iDeqSlowdown); @@ -1616,7 +1599,7 @@ finalize_it: } -/* This is a special consumer to feed the disk-queue in disk-assited mode. +/* This is a special consumer to feed the disk-queue in disk-assisted mode. * When active, our own queue more or less acts as a memory buffer to the disk. * So this consumer just needs to drain the memory queue and submit entries * to the disk queue. The disk queue will then call the actual consumer from @@ -1626,15 +1609,18 @@ finalize_it: * rgerhards, 2008-01-14 */ static rsRetVal -qqueueConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave) +ConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave) { + int i; DEFiRet; ISOBJ_TYPE_assert(pThis, qqueue); ISOBJ_TYPE_assert(pWti, wti); - CHKiRet(qqueueDequeueConsumable(pThis, pWti, iCancelStateSave)); - CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, pWti->pUsrp)); + CHKiRet(DequeueConsumable(pThis, pWti, iCancelStateSave)); + /* iterate over returned results and enqueue them in DA queue */ + for(i = 0 ; i < pWti->paUsrp->nElem ; i++) + CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, pWti->paUsrp->pUsrp[i])); finalize_it: dbgoprint((obj_t*) pThis, "DAConsumer returns with iRet %d\n", iRet); @@ -1692,12 +1678,24 @@ qqueueChkStopWrkrDA(qqueue_t *pThis) * the DA queue */ static int -qqueueChkStopWrkrReg(qqueue_t *pThis) +ChkStooWrkrReg(qqueue_t *pThis) { return pThis->bEnqOnly || pThis->bRunsDA || (pThis->pqParent != NULL && qqueueGetOverallQueueSize(pThis) == 0); } +/* return the configured "deq max at once" interval + * rgerhards, 2009-04-22 + */ +static rsRetVal +GetDeqBatchSize(qqueue_t *pThis, int *pVal) +{ + DEFiRet; + assert(pVal != NULL); + *pVal = pThis->iDeqBatchSize; + RETiRet; +} + /* must only be called when the queue mutex is locked, else results * are not stable! DA queue version */ @@ -1712,7 +1710,7 @@ qqueueIsIdleDA(qqueue_t *pThis) * are not stable! Regular queue version */ static int -qqueueIsIdleReg(qqueue_t *pThis) +IsIdleReg(qqueue_t *pThis) { #if 0 /* enable for performance testing */ int ret; @@ -1740,7 +1738,7 @@ qqueueIsIdleReg(qqueue_t *pThis) * I am telling this, because I, too, always get confused by those... */ static rsRetVal -qqueueRegOnWrkrShutdown(qqueue_t *pThis) +RegOnWrkrShutdown(qqueue_t *pThis) { DEFiRet; @@ -1761,7 +1759,7 @@ qqueueRegOnWrkrShutdown(qqueue_t *pThis) * hook to indicate in the parent queue (if we are a child) that we are not done yet. */ static rsRetVal -qqueueRegOnWrkrStartup(qqueue_t *pThis) +RegOnWrkrStartup(qqueue_t *pThis) { DEFiRet; @@ -1815,10 +1813,10 @@ rsRetVal qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ CHKiRet(pThis->qConstruct(pThis)); /* this also sets bIsDA */ dbgoprint((obj_t*) pThis, "type %d, enq-only %d, disk assisted %d, maxFileSz %lld, qsize %d, child %d, " - "full delay %d, light delay %d starting\n", + "full delay %d, light delay %d, deq batch size %d starting\n", pThis->qType, pThis->bEnqOnly, pThis->bIsDA, pThis->iMaxFileSize, qqueueGetOverallQueueSize(pThis), pThis->pqParent == NULL ? 0 : 1, - pThis->iFullDlyMrk, pThis->iLightDlyMrk); + pThis->iFullDlyMrk, pThis->iLightDlyMrk, pThis->iDeqBatchSize); if(pThis->qType == QUEUETYPE_DIRECT) FINALIZE; /* with direct queues, we are already finished... */ @@ -1829,13 +1827,14 @@ rsRetVal qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s:Reg", obj.GetName((obj_t*) pThis)); CHKiRet(wtpConstruct (&pThis->pWtpReg)); CHKiRet(wtpSetDbgHdr (pThis->pWtpReg, pszBuf, lenBuf)); - CHKiRet(wtpSetpfRateLimiter (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) qqueueRateLimiter)); - CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) qqueueChkStopWrkrReg)); - CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) qqueueIsIdleReg)); - CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti, int)) qqueueConsumerReg)); - CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void*pWti))qqueueConsumerCancelCleanup)); - CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) qqueueRegOnWrkrStartup)); - CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) qqueueRegOnWrkrShutdown)); + CHKiRet(wtpSetpfRateLimiter (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RateLimiter)); + CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) ChkStooWrkrReg)); + CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize)); + CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) IsIdleReg)); + CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti, int)) ConsumerReg)); + CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void*pWti))ConsumerCancelCleanup)); + CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RegOnWrkrStartup)); + CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RegOnWrkrShutdown)); CHKiRet(wtpSetpmutUsr (pThis->pWtpReg, pThis->mut)); CHKiRet(wtpSetpcondBusy (pThis->pWtpReg, &pThis->notEmpty)); CHKiRet(wtpSetiNumWorkerThreads (pThis->pWtpReg, pThis->iNumWorkerThreads)); @@ -1945,7 +1944,7 @@ static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint) * to the regular files. -- rgerhards, 2008-01-29 */ while(pThis->iUngottenObjs > 0) { - CHKiRet(qqueueGetUngottenObj(pThis, &pUsr)); + CHKiRet(GetUngottenObj(pThis, &pUsr)); CHKiRet((objSerialize(pUsr))(pUsr, psQIF)); objDestruct(pUsr); } @@ -2061,11 +2060,8 @@ CODESTARTobjDestruct(qqueue) /* type-specific destructor */ iRet = pThis->qDestruct(pThis); - if(pThis->pszFilePrefix != NULL) - free(pThis->pszFilePrefix); - - if(pThis->pszSpoolDir != NULL) - free(pThis->pszSpoolDir); + free(pThis->pszFilePrefix); + free(pThis->pszSpoolDir); ENDobjDestruct(qqueue) @@ -2079,8 +2075,8 @@ qqueueSetFilePrefix(qqueue_t *pThis, uchar *pszPrefix, size_t iLenPrefix) { DEFiRet; - if(pThis->pszFilePrefix != NULL) - free(pThis->pszFilePrefix); + free(pThis->pszFilePrefix); + pThis->pszFilePrefix = NULL; if(pszPrefix == NULL) /* just unset the prefix! */ ABORT_FINALIZE(RS_RET_OK); @@ -2296,6 +2292,7 @@ DEFpropSetMeth(qqueue, iMinMsgsPerWrkr, int) DEFpropSetMeth(qqueue, bSaveOnShutdown, int) DEFpropSetMeth(qqueue, pUsr, void*) DEFpropSetMeth(qqueue, iDeqSlowdown, int) +DEFpropSetMeth(qqueue, iDeqBatchSize, int) DEFpropSetMeth(qqueue, sizeOnDiskMax, int64) diff --git a/runtime/queue.h b/runtime/queue.h index a267862d..8a60254b 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -54,6 +54,7 @@ typedef struct qWrkThrd_s { pthread_mutex_t mut; } qWrkThrd_t; /* type for queue worker threads */ + /* the queue object */ typedef struct queue_s { BEGINobjInstance; @@ -84,6 +85,7 @@ typedef struct queue_s { int toActShutdown; /* timeout for long-running action shutdown in ms */ int toWrkShutdown; /* timeout for idle workers in ms, -1 means indefinite (0 is immediate) */ int toEnq; /* enqueue timeout */ + int iDeqBatchSize; /* max number of elements that shall be dequeued at once */ /* rate limiting settings (will be expanded) */ int iDeqSlowdown; /* slow down dequeue by specified nbr of microseconds */ /* end rate limiting */ @@ -97,12 +99,11 @@ typedef struct queue_s { * applied to detect user configuration errors (and tell me how should we detect what * the user really wanted...). -- rgerhards, 2008-04-02 */ - /* ane dequeue time window */ - rsRetVal (*pConsumer)(void *,void*); /* user-supplied consumer function for dequeued messages */ + /* end dequeue time window */ + rsRetVal (*pConsumer)(void *,aUsrp_t*); /* user-supplied consumer function for dequeued messages */ /* calling interface for pConsumer: arg1 is the global user pointer from this structure, arg2 is the - * user pointer that was dequeued (actual sample: for actions, arg1 is the pAction and arg2 is pointer - * to message) - * rgerhards, 2008-01-28 + * user pointer array that was dequeued (actual sample: for actions, arg1 is the pAction and arg2 + * is pointer to an array of message message pointers) */ /* type-specific handlers (set during construction) */ rsRetVal (*qConstruct)(struct queue_s *pThis); @@ -183,7 +184,7 @@ rsRetVal qqueueStart(qqueue_t *pThis); rsRetVal qqueueSetMaxFileSize(qqueue_t *pThis, size_t iMaxFileSize); rsRetVal qqueueSetFilePrefix(qqueue_t *pThis, uchar *pszPrefix, size_t iLenPrefix); rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads, - int iMaxQueueSize, rsRetVal (*pConsumer)(void*,void*)); + int iMaxQueueSize, rsRetVal (*pConsumer)(void*,aUsrp_t*)); PROTOTYPEObjClassInit(qqueue); PROTOTYPEpropSetMeth(qqueue, iPersistUpdCnt, int); PROTOTYPEpropSetMeth(qqueue, iDeqtWinFromHr, int); @@ -201,6 +202,7 @@ PROTOTYPEpropSetMeth(qqueue, bSaveOnShutdown, int); PROTOTYPEpropSetMeth(qqueue, pUsr, void*); PROTOTYPEpropSetMeth(qqueue, iDeqSlowdown, int); PROTOTYPEpropSetMeth(qqueue, sizeOnDiskMax, int64); +PROTOTYPEpropSetMeth(qqueue, iDeqBatchSize, int); #define qqueueGetID(pThis) ((unsigned long) pThis) #endif /* #ifndef QUEUE_H_INCLUDED */ diff --git a/runtime/rsyslog.h b/runtime/rsyslog.h index fd5a5371..ee941b2b 100644 --- a/runtime/rsyslog.h +++ b/runtime/rsyslog.h @@ -58,8 +58,18 @@ #endif +/* the rsyslog core provides information about present feature to plugins + * asking it. Below are feature-test macros which must be used to query + * features. Note that this must be powers of two, so that multiple queries + * can be combined. -- rgerhards, 2009-04-27 + */ +#define CORE_FEATURE_BATCHING 1 +/*#define CORE_FEATURE_whatever 2 ... and so on ... */ + + /* define some base data types */ typedef unsigned char uchar;/* get rid of the unhandy "unsigned char" */ +typedef struct aUsrp_s aUsrp_t; typedef struct thrdInfo thrdInfo_t; typedef struct obj_s obj_t; typedef struct filed selector_t;/* TODO: this so far resides in syslogd.c, think about modularization */ @@ -267,6 +277,8 @@ enum rsRetVal_ /** return value. All methods return this if not specified oth RS_RET_ERR_FORK = -2118, /**< error during fork() */ RS_RET_ERR_WRITE_PIPE = -2119, /**< error writing to pipe */ RS_RET_RSCORE_TOO_OLD = -2120, /**< rsyslog core is too old for ... (eg this plugin) */ + RS_RET_DEFER_COMMIT = -2121, /**< output plugin status: not yet committed (an OK state!) */ + RS_RET_PREVIOUS_COMMITTED = -2122, /**< output plugin status: previous record was committed (an OK state!) */ RS_RET_FILENAME_INVALID = -2140, /**< filename invalid, not found, no access, ... */ /* RainerScript error messages (range 1000.. 1999) */ diff --git a/runtime/wti.c b/runtime/wti.c index 544bffa7..346ef7aa 100644 --- a/runtime/wti.c +++ b/runtime/wti.c @@ -201,8 +201,9 @@ CODESTARTobjDestruct(wti) pthread_cond_destroy(&pThis->condExitDone); pthread_mutex_destroy(&pThis->mut); - if(pThis->pszDbgHdr != NULL) - free(pThis->pszDbgHdr); + free(pThis->paUsrp->pUsrp); + free(pThis->paUsrp); + free(pThis->pszDbgHdr); ENDobjDestruct(wti) @@ -222,15 +223,21 @@ rsRetVal wtiConstructFinalize(wti_t *pThis) { DEFiRet; + int iDeqBatchSize; ISOBJ_TYPE_assert(pThis, wti); dbgprintf("%s: finalizing construction of worker instance data\n", wtiGetDbgHdr(pThis)); /* initialize our thread instance descriptor */ - pThis->pUsrp = NULL; pThis->tCurrCmd = eWRKTHRD_STOPPED; + /* we now alloc the array for user pointers. We obtain the max from the queue itself. */ + CHKiRet(pThis->pWtp->pfGetDeqBatchSize(pThis->pWtp->pUsr, &iDeqBatchSize)); + CHKmalloc(pThis->paUsrp = calloc(1, sizeof(aUsrp_t))); + CHKmalloc(pThis->paUsrp->pUsrp = calloc((size_t)iDeqBatchSize, sizeof(void*))); + +finalize_it: RETiRet; } @@ -314,7 +321,8 @@ wtiWorkerCancelCleanup(void *arg) DBGPRINTF("%s: cancelation cleanup handler called.\n", wtiGetDbgHdr(pThis)); /* call user supplied handler (that one e.g. requeues the element) */ - pWtp->pfOnWorkerCancel(pThis->pWtp->pUsr, pThis->pUsrp); +// MULTIQUEUE: need to change here! + pWtp->pfOnWorkerCancel(pThis->pWtp->pUsr, pThis->paUsrp->pUsrp[0]); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); d_pthread_mutex_lock(&pWtp->mut); @@ -366,7 +374,7 @@ wtiWorker(wti_t *pThis) ISOBJ_TYPE_assert(pWtp, wtp); dbgSetThrdName(pThis->pszDbgHdr); - pThis->pUsrp = NULL; + pThis->paUsrp->nElem = 0; /* flag no elements present */ // MULTIQUEUE: do we really need this any longer (cnacel handeler)? pthread_cleanup_push(wtiWorkerCancelCleanup, pThis); BEGIN_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr, LOCK_MUTEX); diff --git a/runtime/wti.h b/runtime/wti.h index 6b60b833..85c98fe6 100644 --- a/runtime/wti.h +++ b/runtime/wti.h @@ -1,6 +1,6 @@ /* Definition of the worker thread instance (wti) class. * - * Copyright 2008 Rainer Gerhards and Adiscon GmbH. + * Copyright 2008, 2009 by Rainer Gerhards and Adiscon GmbH. * * This file is part of the rsyslog runtime library. * @@ -28,13 +28,25 @@ #include "wtp.h" #include "obj.h" +/* the user pointer array object + * This object is used to dequeue multiple user pointers which are than handed over + * to processing. The size of elements is fixed after queue creation, but may be + * modified by config variables (better said: queue properties). + * rgerhards, 2009-04-22 + */ +struct aUsrp_s { + int nElem; /* actual number of element in this entry */ + obj_t **pUsrp; /* actual elements (array!) */ +}; + + /* the worker thread instance class */ typedef struct wti_s { BEGINobjInstance; int bOptimizeUniProc; /* cache for the equally-named global setting, pulled at time of queue creation */ pthread_t thrdID; /* thread ID */ qWrkCmd_t tCurrCmd; /* current command to be carried out by worker */ - obj_t *pUsrp; /* pointer to an object meaningful for current user pointer (e.g. queue pUsr data elemt) */ + aUsrp_t *paUsrp; /* pointer to an object array meaningful for current user pointer (e.g. queue pUsr data elemt) */ wtp_t *pWtp; /* my worker thread pool (important if only the work thread instance is passed! */ pthread_cond_t condExitDone; /* signaled when the thread exit is done (once per thread existance) */ pthread_mutex_t mut; diff --git a/runtime/wtp.c b/runtime/wtp.c index 04eb974f..9891a55c 100644 --- a/runtime/wtp.c +++ b/runtime/wtp.c @@ -78,7 +78,7 @@ wtpGetDbgHdr(wtp_t *pThis) /* Not implemented dummy function for constructor */ -static rsRetVal NotImplementedDummy() { return RS_RET_OK; } +static rsRetVal NotImplementedDummy() { return RS_RET_NOT_IMPLEMENTED; } /* Standard-Constructor for the wtp object */ BEGINobjConstruct(wtp) /* be sure to specify the object type also in END macro! */ @@ -88,6 +88,7 @@ BEGINobjConstruct(wtp) /* be sure to specify the object type also in END macro! pthread_cond_init(&pThis->condThrdTrm, NULL); /* set all function pointers to "not implemented" dummy so that we can safely call them */ pThis->pfChkStopWrkr = NotImplementedDummy; + pThis->pfGetDeqBatchSize = NotImplementedDummy; pThis->pfIsIdle = NotImplementedDummy; pThis->pfDoWork = NotImplementedDummy; pThis->pfOnIdle = NotImplementedDummy; @@ -117,7 +118,7 @@ wtpConstructFinalize(wtp_t *pThis) */ if((pThis->pWrkr = malloc(sizeof(wti_t*) * pThis->iNumWorkerThreads)) == NULL) ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); - + for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) { CHKiRet(wtiConstruct(&pThis->pWrkr[i])); pWti = pThis->pWrkr[i]; @@ -151,8 +152,7 @@ CODESTARTobjDestruct(wtp) pthread_mutex_destroy(&pThis->mut); pthread_mutex_destroy(&pThis->mutThrdShutdwn); - if(pThis->pszDbgHdr != NULL) - free(pThis->pszDbgHdr); + free(pThis->pszDbgHdr); ENDobjDestruct(wtp) @@ -584,6 +584,7 @@ DEFpropSetMethPTR(wtp, pmutUsr, pthread_mutex_t) DEFpropSetMethPTR(wtp, pcondBusy, pthread_cond_t) DEFpropSetMethFP(wtp, pfChkStopWrkr, rsRetVal(*pVal)(void*, int)) DEFpropSetMethFP(wtp, pfRateLimiter, rsRetVal(*pVal)(void*)) +DEFpropSetMethFP(wtp, pfGetDeqBatchSize, rsRetVal(*pVal)(void*, int*)) DEFpropSetMethFP(wtp, pfIsIdle, rsRetVal(*pVal)(void*, int)) DEFpropSetMethFP(wtp, pfDoWork, rsRetVal(*pVal)(void*, void*, int)) DEFpropSetMethFP(wtp, pfOnIdle, rsRetVal(*pVal)(void*, int)) diff --git a/runtime/wtp.h b/runtime/wtp.h index b9cb07c5..88bd9197 100644 --- a/runtime/wtp.h +++ b/runtime/wtp.h @@ -67,10 +67,11 @@ typedef struct wtp_s { int bThrdStateChanged; /* at least one thread state has changed if 1 */ /* end sync variables */ /* user objects */ - void *pUsr; /* pointer to user object */ + void *pUsr; /* pointer to user object (in this case, the queue the wtp belongs to) */ pthread_mutex_t *pmutUsr; pthread_cond_t *pcondBusy; /* condition the user will signal "busy again, keep runing" on (awakes worker) */ rsRetVal (*pfChkStopWrkr)(void *pUsr, int); + rsRetVal (*pfGetDeqBatchSize)(void *pUsr, int*); /* obtains max dequeue count from queue config */ rsRetVal (*pfRateLimiter)(void *pUsr); rsRetVal (*pfIsIdle)(void *pUsr, int); rsRetVal (*pfDoWork)(void *pUsr, void *pWti, int); @@ -104,6 +105,7 @@ int wtpGetCurNumWrkr(wtp_t *pThis, int bLockMutex); PROTOTYPEObjClassInit(wtp); PROTOTYPEpropSetMethFP(wtp, pfChkStopWrkr, rsRetVal(*pVal)(void*, int)); PROTOTYPEpropSetMethFP(wtp, pfRateLimiter, rsRetVal(*pVal)(void*)); +PROTOTYPEpropSetMethFP(wtp, pfGetDeqBatchSize, rsRetVal(*pVal)(void*, int*)); PROTOTYPEpropSetMethFP(wtp, pfIsIdle, rsRetVal(*pVal)(void*, int)); PROTOTYPEpropSetMethFP(wtp, pfDoWork, rsRetVal(*pVal)(void*, void*, int)); PROTOTYPEpropSetMethFP(wtp, pfOnIdle, rsRetVal(*pVal)(void*, int)); |