From 9b48c4a481c64503605f25e1d0648d24f43437f1 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 2 Apr 2008 16:53:29 +0000 Subject: begun working on time-window based dequeueing (and rate limiting in general) --- queue.c | 57 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 57 insertions(+) (limited to 'queue.c') diff --git a/queue.c b/queue.c index ed720c55..bfdac204 100644 --- a/queue.c +++ b/queue.c @@ -55,6 +55,7 @@ DEFobjStaticHelpers /* forward-definitions */ rsRetVal queueChkPersist(queue_t *pThis); static rsRetVal queueSetEnqOnly(queue_t *pThis, int bEnqOnly, int bLockMutex); +static rsRetVal queueRateLimiter(queue_t *pThis); static int queueChkStopWrkrDA(queue_t *pThis); static int queueIsIdleDA(queue_t *pThis); static rsRetVal queueConsumerDA(queue_t *pThis, wti_t *pWti, int iCancelStateSave); @@ -341,6 +342,7 @@ queueInitDA(queue_t *pThis, int bEnqOnly, int bLockMutex) lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s:DA", obj.GetName((obj_t*) pThis)); CHKiRet(wtpConstruct (&pThis->pWtpDA)); CHKiRet(wtpSetDbgHdr (pThis->pWtpDA, pszBuf, lenBuf)); + CHKiRet(wtpSetpfRateLimiter (pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) queueRateLimiter)); CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) queueChkStopWrkrDA)); CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) queueIsIdleDA)); CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti, int)) queueConsumerDA)); @@ -1450,6 +1452,60 @@ finalize_it: } +/* The rate limiter - we only need one - do we? + * +* Here we may wait if a dequeue time window is defined or if we are + * rate-limited. TODO: If we do so, we should also look into the + * way new worker threads are spawned. Obviously, it doesn't make much + * sense to spawn additional worker threads when none of them can do any + * processing. However, it is deemed acceptable to allow this for an initial + * implementation of the timeframe/rate limiting feature. + * Please also note that these feature could also be implemented at the action + * level. However, that would limit them to be used together with actions. We have + * taken the broader approach, moving it right into the queue. This is even + * necessary if we want to prevent spawning of multiple unnecessary worker + * threads as described above. -- rgerhards, 2008-04-02 + * + * + * time window: tCurr is current time; tFrom is start time, tTo is end time (in mil 24h format). + * We may have tFrom = 4, tTo = 10 --> run from 4 to 10 hrs. nice and happy + * we may also have tFrom= 22, tTo = 4 -> run from 10pm to 4am, which is actually two + * windows: 0-4; 22-23:59 + * so when to run? Let's assume we have 3am + * + * if(tTo < tFrom) { + * if(tCurr < tTo [3 < 4] || tCurr > tFrom [3 > 22]) + * do work + * else + * sleep for tFrom - tCurr "hours" [22 - 5 --> 17] + * } else { + * if(tCurr >= tFrom [3 >= 4] && tCurr < tTo [3 < 10]) + * do work + * else + * sleep for tTo - tCurr "hours" [4 - 3 --> 1] + * } + * + * Bottom line: we need to check which type of window we have and need to adjust our + * logic accordingly. Of course, sleep calculations need to be done up to the minute, + * but you get the idea from the code above. + */ +static rsRetVal +queueRateLimiter(queue_t *pThis) +{ + DEFiRet; + + ISOBJ_TYPE_assert(pThis, queue); + + dbgoprint((obj_t*) pThis, "entering rate limiter\n"); + srSleep(2, 0); + +finalize_it: + dbgoprint((obj_t*) pThis, "rate limiter returns with iRet %d\n", iRet); + RETiRet; +} + + + /* This is the queue consumer in the regular (non-DA) case. It is * protected by the queue mutex, but MUST release it as soon as possible. * rgerhards, 2008-01-21 @@ -1690,6 +1746,7 @@ rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */ lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s:Reg", obj.GetName((obj_t*) pThis)); CHKiRet(wtpConstruct (&pThis->pWtpReg)); CHKiRet(wtpSetDbgHdr (pThis->pWtpReg, pszBuf, lenBuf)); + CHKiRet(wtpSetpfRateLimiter (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) queueRateLimiter)); CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) queueChkStopWrkrReg)); CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) queueIsIdleReg)); CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti, int)) queueConsumerReg)); -- cgit