From 5c0aeae8ab1f344a022d586dc26c5d78203f7e0b Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 23 Apr 2009 12:50:07 +0200 Subject: added $MainMsgQueueDequeueBatchSize and $ActionQueueDequeueBatchSize configuration directives --- runtime/queue.c | 22 ++++++++++------------ runtime/queue.h | 3 ++- runtime/wti.c | 6 +++--- runtime/wtp.c | 4 ++-- runtime/wtp.h | 4 ++-- 5 files changed, 19 insertions(+), 20 deletions(-) (limited to 'runtime') diff --git a/runtime/queue.c b/runtime/queue.c index c5f9df81..f3d3fe71 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -69,7 +69,7 @@ rsRetVal qqueueChkPersist(qqueue_t *pThis); static rsRetVal qqueueSetEnqOnly(qqueue_t *pThis, int bEnqOnly, int bLockMutex); static rsRetVal RateLimiter(qqueue_t *pThis); static int qqueueChkStopWrkrDA(qqueue_t *pThis); -static rsRetVal GetDeqMaxAtOnce(qqueue_t *pThis, int *pVal); +static rsRetVal GetDeqBatchSize(qqueue_t *pThis, int *pVal); static int qqueueIsIdleDA(qqueue_t *pThis); static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave); static rsRetVal ConsumerCancelCleanup(void *arg1, void *arg2); @@ -375,7 +375,7 @@ qqueueInitDA(qqueue_t *pThis, int bEnqOnly, int bLockMutex) CHKiRet(wtpSetDbgHdr (pThis->pWtpDA, pszBuf, lenBuf)); CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueChkStopWrkrDA)); // MULTIQUEUE: TODO: this should be DA-specific! - CHKiRet(wtpSetpfGetDeqMaxAtOnce (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int*)) GetDeqMaxAtOnce)); + CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize)); CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueIsIdleDA)); CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti, int)) ConsumerDA)); CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void*pWti)) ConsumerCancelCleanup)); @@ -1280,7 +1280,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread pThis->pConsumer = pConsumer; pThis->iNumWorkerThreads = iWorkerThreads; pThis->iDeqtWinToHr = 25; /* disable time-windowed dequeuing by default */ - pThis->iDeqMaxAtOnce = 8; /* conservative default, should still provide good performance */ + pThis->iDeqBatchSize = 8; /* conservative default, should still provide good performance */ pThis->pszFilePrefix = NULL; pThis->qType = qType; @@ -1421,7 +1421,7 @@ dbgprintf("DequeueConsumableElements, index %d\n", nDequeued); /* all well, use this element */ pWti->paUsrp->pUsrp[nDequeued++] = pUsr; - } while(iQueueSize > 0 && nDequeued < pThis->iDeqMaxAtOnce); + } while(iQueueSize > 0 && nDequeued < pThis->iDeqBatchSize); //bRunsDA = pThis->bRunsDA; /* cache this for after mutex release */ pWti->paUsrp->nElem = nDequeued; @@ -1691,14 +1691,11 @@ ChkStooWrkrReg(qqueue_t *pThis) * rgerhards, 2009-04-22 */ static rsRetVal -GetDeqMaxAtOnce(qqueue_t *pThis, int *pVal) +GetDeqBatchSize(qqueue_t *pThis, int *pVal) { DEFiRet; assert(pVal != NULL); -RUNLOG_VAR("%d", pThis->iDeqMaxAtOnce); // MULTIQUEUE: delete this when done - - *pVal = pThis->iDeqMaxAtOnce; - + *pVal = pThis->iDeqBatchSize; RETiRet; } @@ -1819,10 +1816,10 @@ rsRetVal qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ CHKiRet(pThis->qConstruct(pThis)); /* this also sets bIsDA */ dbgoprint((obj_t*) pThis, "type %d, enq-only %d, disk assisted %d, maxFileSz %lld, qsize %d, child %d, " - "full delay %d, light delay %d starting\n", + "full delay %d, light delay %d, deq batch size %d starting\n", pThis->qType, pThis->bEnqOnly, pThis->bIsDA, pThis->iMaxFileSize, qqueueGetOverallQueueSize(pThis), pThis->pqParent == NULL ? 0 : 1, - pThis->iFullDlyMrk, pThis->iLightDlyMrk); + pThis->iFullDlyMrk, pThis->iLightDlyMrk, pThis->iDeqBatchSize); if(pThis->qType == QUEUETYPE_DIRECT) FINALIZE; /* with direct queues, we are already finished... */ @@ -1835,7 +1832,7 @@ rsRetVal qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ CHKiRet(wtpSetDbgHdr (pThis->pWtpReg, pszBuf, lenBuf)); CHKiRet(wtpSetpfRateLimiter (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RateLimiter)); CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) ChkStooWrkrReg)); - CHKiRet(wtpSetpfGetDeqMaxAtOnce (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int*)) GetDeqMaxAtOnce)); + CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize)); CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) IsIdleReg)); CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti, int)) ConsumerReg)); CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void*pWti))ConsumerCancelCleanup)); @@ -2301,6 +2298,7 @@ DEFpropSetMeth(qqueue, iMinMsgsPerWrkr, int) DEFpropSetMeth(qqueue, bSaveOnShutdown, int) DEFpropSetMeth(qqueue, pUsr, void*) DEFpropSetMeth(qqueue, iDeqSlowdown, int) +DEFpropSetMeth(qqueue, iDeqBatchSize, int) DEFpropSetMeth(qqueue, sizeOnDiskMax, int64) diff --git a/runtime/queue.h b/runtime/queue.h index 4fb57d07..8a60254b 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -85,7 +85,7 @@ typedef struct queue_s { int toActShutdown; /* timeout for long-running action shutdown in ms */ int toWrkShutdown; /* timeout for idle workers in ms, -1 means indefinite (0 is immediate) */ int toEnq; /* enqueue timeout */ - int iDeqMaxAtOnce; /* max number of elements that shall be dequeued at once */ + int iDeqBatchSize; /* max number of elements that shall be dequeued at once */ /* rate limiting settings (will be expanded) */ int iDeqSlowdown; /* slow down dequeue by specified nbr of microseconds */ /* end rate limiting */ @@ -202,6 +202,7 @@ PROTOTYPEpropSetMeth(qqueue, bSaveOnShutdown, int); PROTOTYPEpropSetMeth(qqueue, pUsr, void*); PROTOTYPEpropSetMeth(qqueue, iDeqSlowdown, int); PROTOTYPEpropSetMeth(qqueue, sizeOnDiskMax, int64); +PROTOTYPEpropSetMeth(qqueue, iDeqBatchSize, int); #define qqueueGetID(pThis) ((unsigned long) pThis) #endif /* #ifndef QUEUE_H_INCLUDED */ diff --git a/runtime/wti.c b/runtime/wti.c index f50b3894..df1ea0ed 100644 --- a/runtime/wti.c +++ b/runtime/wti.c @@ -226,7 +226,7 @@ rsRetVal wtiConstructFinalize(wti_t *pThis) { DEFiRet; - int iDeqMaxAtOnce; + int iDeqBatchSize; ISOBJ_TYPE_assert(pThis, wti); @@ -236,9 +236,9 @@ wtiConstructFinalize(wti_t *pThis) pThis->tCurrCmd = eWRKTHRD_STOPPED; /* we now alloc the array for user pointers. We obtain the max from the queue itself. */ - CHKiRet(pThis->pWtp->pfGetDeqMaxAtOnce(pThis->pWtp->pUsr, &iDeqMaxAtOnce)); + CHKiRet(pThis->pWtp->pfGetDeqBatchSize(pThis->pWtp->pUsr, &iDeqBatchSize)); CHKmalloc(pThis->paUsrp = calloc(1, sizeof(aUsrp_t))); - CHKmalloc(pThis->paUsrp->pUsrp = calloc((size_t)iDeqMaxAtOnce, sizeof(void*))); + CHKmalloc(pThis->paUsrp->pUsrp = calloc((size_t)iDeqBatchSize, sizeof(void*))); finalize_it: RETiRet; diff --git a/runtime/wtp.c b/runtime/wtp.c index e1966099..8bb55cf7 100644 --- a/runtime/wtp.c +++ b/runtime/wtp.c @@ -88,7 +88,7 @@ BEGINobjConstruct(wtp) /* be sure to specify the object type also in END macro! pthread_cond_init(&pThis->condThrdTrm, NULL); /* set all function pointers to "not implemented" dummy so that we can safely call them */ pThis->pfChkStopWrkr = NotImplementedDummy; - pThis->pfGetDeqMaxAtOnce = NotImplementedDummy; + pThis->pfGetDeqBatchSize = NotImplementedDummy; pThis->pfIsIdle = NotImplementedDummy; pThis->pfDoWork = NotImplementedDummy; pThis->pfOnIdle = NotImplementedDummy; @@ -585,7 +585,7 @@ DEFpropSetMethPTR(wtp, pmutUsr, pthread_mutex_t) DEFpropSetMethPTR(wtp, pcondBusy, pthread_cond_t) DEFpropSetMethFP(wtp, pfChkStopWrkr, rsRetVal(*pVal)(void*, int)) DEFpropSetMethFP(wtp, pfRateLimiter, rsRetVal(*pVal)(void*)) -DEFpropSetMethFP(wtp, pfGetDeqMaxAtOnce, rsRetVal(*pVal)(void*, int*)) +DEFpropSetMethFP(wtp, pfGetDeqBatchSize, rsRetVal(*pVal)(void*, int*)) DEFpropSetMethFP(wtp, pfIsIdle, rsRetVal(*pVal)(void*, int)) DEFpropSetMethFP(wtp, pfDoWork, rsRetVal(*pVal)(void*, void*, int)) DEFpropSetMethFP(wtp, pfOnIdle, rsRetVal(*pVal)(void*, int)) diff --git a/runtime/wtp.h b/runtime/wtp.h index 5894000a..88bd9197 100644 --- a/runtime/wtp.h +++ b/runtime/wtp.h @@ -71,7 +71,7 @@ typedef struct wtp_s { pthread_mutex_t *pmutUsr; pthread_cond_t *pcondBusy; /* condition the user will signal "busy again, keep runing" on (awakes worker) */ rsRetVal (*pfChkStopWrkr)(void *pUsr, int); - rsRetVal (*pfGetDeqMaxAtOnce)(void *pUsr, int*); /* obtains max dequeue count from queue config */ + rsRetVal (*pfGetDeqBatchSize)(void *pUsr, int*); /* obtains max dequeue count from queue config */ rsRetVal (*pfRateLimiter)(void *pUsr); rsRetVal (*pfIsIdle)(void *pUsr, int); rsRetVal (*pfDoWork)(void *pUsr, void *pWti, int); @@ -105,7 +105,7 @@ int wtpGetCurNumWrkr(wtp_t *pThis, int bLockMutex); PROTOTYPEObjClassInit(wtp); PROTOTYPEpropSetMethFP(wtp, pfChkStopWrkr, rsRetVal(*pVal)(void*, int)); PROTOTYPEpropSetMethFP(wtp, pfRateLimiter, rsRetVal(*pVal)(void*)); -PROTOTYPEpropSetMethFP(wtp, pfGetDeqMaxAtOnce, rsRetVal(*pVal)(void*, int*)); +PROTOTYPEpropSetMethFP(wtp, pfGetDeqBatchSize, rsRetVal(*pVal)(void*, int*)); PROTOTYPEpropSetMethFP(wtp, pfIsIdle, rsRetVal(*pVal)(void*, int)); PROTOTYPEpropSetMethFP(wtp, pfDoWork, rsRetVal(*pVal)(void*, void*, int)); PROTOTYPEpropSetMethFP(wtp, pfOnIdle, rsRetVal(*pVal)(void*, int)); -- cgit