diff options
Diffstat (limited to 'runtime/queue.h')
-rw-r--r-- | runtime/queue.h | 14 |
1 files changed, 8 insertions, 6 deletions
diff --git a/runtime/queue.h b/runtime/queue.h index a267862d..8a60254b 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -54,6 +54,7 @@ typedef struct qWrkThrd_s { pthread_mutex_t mut; } qWrkThrd_t; /* type for queue worker threads */ + /* the queue object */ typedef struct queue_s { BEGINobjInstance; @@ -84,6 +85,7 @@ typedef struct queue_s { int toActShutdown; /* timeout for long-running action shutdown in ms */ int toWrkShutdown; /* timeout for idle workers in ms, -1 means indefinite (0 is immediate) */ int toEnq; /* enqueue timeout */ + int iDeqBatchSize; /* max number of elements that shall be dequeued at once */ /* rate limiting settings (will be expanded) */ int iDeqSlowdown; /* slow down dequeue by specified nbr of microseconds */ /* end rate limiting */ @@ -97,12 +99,11 @@ typedef struct queue_s { * applied to detect user configuration errors (and tell me how should we detect what * the user really wanted...). -- rgerhards, 2008-04-02 */ - /* ane dequeue time window */ - rsRetVal (*pConsumer)(void *,void*); /* user-supplied consumer function for dequeued messages */ + /* end dequeue time window */ + rsRetVal (*pConsumer)(void *,aUsrp_t*); /* user-supplied consumer function for dequeued messages */ /* calling interface for pConsumer: arg1 is the global user pointer from this structure, arg2 is the - * user pointer that was dequeued (actual sample: for actions, arg1 is the pAction and arg2 is pointer - * to message) - * rgerhards, 2008-01-28 + * user pointer array that was dequeued (actual sample: for actions, arg1 is the pAction and arg2 + * is pointer to an array of message message pointers) */ /* type-specific handlers (set during construction) */ rsRetVal (*qConstruct)(struct queue_s *pThis); @@ -183,7 +184,7 @@ rsRetVal qqueueStart(qqueue_t *pThis); rsRetVal qqueueSetMaxFileSize(qqueue_t *pThis, size_t iMaxFileSize); rsRetVal qqueueSetFilePrefix(qqueue_t *pThis, uchar *pszPrefix, size_t iLenPrefix); rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads, - int iMaxQueueSize, rsRetVal (*pConsumer)(void*,void*)); + int iMaxQueueSize, rsRetVal (*pConsumer)(void*,aUsrp_t*)); PROTOTYPEObjClassInit(qqueue); PROTOTYPEpropSetMeth(qqueue, iPersistUpdCnt, int); PROTOTYPEpropSetMeth(qqueue, iDeqtWinFromHr, int); @@ -201,6 +202,7 @@ PROTOTYPEpropSetMeth(qqueue, bSaveOnShutdown, int); PROTOTYPEpropSetMeth(qqueue, pUsr, void*); PROTOTYPEpropSetMeth(qqueue, iDeqSlowdown, int); PROTOTYPEpropSetMeth(qqueue, sizeOnDiskMax, int64); +PROTOTYPEpropSetMeth(qqueue, iDeqBatchSize, int); #define qqueueGetID(pThis) ((unsigned long) pThis) #endif /* #ifndef QUEUE_H_INCLUDED */ |