From 6cc46b15d953e1cd766f3f9f29011e740e51ca6c Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 30 Jan 2008 15:37:23 +0000 Subject: - implemented simple output rate limiting - addded $ActionQueueDequeueSlowdown config directive - addded $MainMsgQueueDequeueSlowdown config directive - bugfix: MsgDup() did not work with new base object data structure --- queue.c | 11 +++++++++++ 1 file changed, 11 insertions(+) (limited to 'queue.c') diff --git a/queue.c b/queue.c index 0fef44e6..d829b9bd 100644 --- a/queue.c +++ b/queue.c @@ -235,6 +235,7 @@ queueStartDA(queue_t *pThis) pThis->pqDA->pqParent = pThis; CHKiRet(queueSetpUsr(pThis->pqDA, pThis->pUsr)); + CHKiRet(queueSetiDeqSlowdown(pThis->pqDA, pThis->iDeqSlowdown)); CHKiRet(queueSetMaxFileSize(pThis->pqDA, pThis->iMaxFileSize)); CHKiRet(queueSetFilePrefix(pThis->pqDA, pThis->pszFilePrefix, pThis->lenFilePrefix)); CHKiRet(queueSetiPersistUpdCnt(pThis->pqDA, pThis->iPersistUpdCnt)); @@ -1379,6 +1380,15 @@ queueConsumerReg(queue_t *pThis, wti_t *pWti, int iCancelStateSave) CHKiRet(queueDequeueConsumable(pThis, pWti, iCancelStateSave)); CHKiRet(pThis->pConsumer(pThis->pUsr, pWti->pUsrp)); + /* we now need to check if we should deliberately delay processing a bit + * and, if so, do that. -- rgerhards, 2008-01-30 + */ + if(pThis->iDeqSlowdown) { + dbgoprint((obj_t*) pThis, "sleeping %d microseconds as requested by config params\n", + pThis->iDeqSlowdown); + srSleep(pThis->iDeqSlowdown / 1000000, pThis->iDeqSlowdown % 1000000); + } + finalize_it: dbgoprint((obj_t*) pThis, "regular consumer returns %d\n", iRet); RETiRet; @@ -1985,6 +1995,7 @@ DEFpropSetMeth(queue, bIsDA, int); DEFpropSetMeth(queue, iMinMsgsPerWrkr, int); DEFpropSetMeth(queue, bSaveOnShutdown, int); DEFpropSetMeth(queue, pUsr, void*); +DEFpropSetMeth(queue, iDeqSlowdown, int); /* This function can be used as a generic way to set properties. Only the subset -- cgit