summaryrefslogtreecommitdiffstats
path: root/runtime/queue.h
diff options
context:
space:
mode:
Diffstat (limited to 'runtime/queue.h')
-rw-r--r--runtime/queue.h48
1 files changed, 31 insertions, 17 deletions
diff --git a/runtime/queue.h b/runtime/queue.h
index 1d82d8d9..e873c456 100644
--- a/runtime/queue.h
+++ b/runtime/queue.h
@@ -27,8 +27,18 @@
#include <pthread.h>
#include "obj.h"
#include "wtp.h"
+#include "batch.h"
#include "stream.h"
+/* support for the toDelete list */
+typedef struct toDeleteLst_s toDeleteLst_t;
+struct toDeleteLst_s {
+ qDeqID deqID;
+ int nElemDeq; /* numbe of elements that were dequeued and as such must now be discarded */
+ struct toDeleteLst_s *pNext;
+};
+
+
/* queue types */
typedef enum {
QUEUETYPE_FIXED_ARRAY = 0,/* a simple queue made out of a fixed (initially malloced) array fast but memoryhog */
@@ -54,10 +64,12 @@ typedef struct qWrkThrd_s {
pthread_mutex_t mut;
} qWrkThrd_t; /* type for queue worker threads */
+
/* the queue object */
typedef struct queue_s {
BEGINobjInstance;
queueType_t qType;
+ int nLogDeq; /* number of elements currently logically dequeued */
bool bEnqOnly; /* does queue run in enqueue-only mode (1) or not (0)? */
bool bSaveOnShutdown;/* persists everthing on shutdown (if DA!)? 1-yes, 0-no */
bool bQueueStarted; /* has queueStart() been called on this queue? 1-yes, 0-no */
@@ -83,7 +95,9 @@ typedef struct queue_s {
int toQShutdown; /* timeout for regular queue shutdown in ms */
int toActShutdown; /* timeout for long-running action shutdown in ms */
int toWrkShutdown; /* timeout for idle workers in ms, -1 means indefinite (0 is immediate) */
+ toDeleteLst_t *toDeleteLst;/* this queue's to-delete list */
int toEnq; /* enqueue timeout */
+ int iDeqBatchSize; /* max number of elements that shall be dequeued at once */
/* rate limiting settings (will be expanded) */
int iDeqSlowdown; /* slow down dequeue by specified nbr of microseconds */
/* end rate limiting */
@@ -97,18 +111,19 @@ typedef struct queue_s {
* applied to detect user configuration errors (and tell me how should we detect what
* the user really wanted...). -- rgerhards, 2008-04-02
*/
- /* ane dequeue time window */
- rsRetVal (*pConsumer)(void *,void*); /* user-supplied consumer function for dequeued messages */
+ /* end dequeue time window */
+ rsRetVal (*pConsumer)(void *,batch_t*); /* user-supplied consumer function for dequeued messages */
/* calling interface for pConsumer: arg1 is the global user pointer from this structure, arg2 is the
- * user pointer that was dequeued (actual sample: for actions, arg1 is the pAction and arg2 is pointer
- * to message)
- * rgerhards, 2008-01-28
+ * user pointer array that was dequeued (actual sample: for actions, arg1 is the pAction and arg2
+ * is pointer to an array of message message pointers)
*/
/* type-specific handlers (set during construction) */
rsRetVal (*qConstruct)(struct queue_s *pThis);
rsRetVal (*qDestruct)(struct queue_s *pThis);
rsRetVal (*qAdd)(struct queue_s *pThis, void *pUsr);
- rsRetVal (*qDel)(struct queue_s *pThis, void **ppUsr);
+ rsRetVal (*qDeq)(struct queue_s *pThis, void **ppUsr);
+ rsRetVal (*qDel)(struct queue_s *pThis);
+ rsRetVal (*qUnDeqAll)(struct queue_s *pThis);
/* end type-specific handler */
/* synchronization variables */
pthread_mutex_t mutThrdMgmt; /* mutex for the queue's thread management */
@@ -132,32 +147,30 @@ typedef struct queue_s {
int iNumberFiles; /* how many files make up the queue? */
int64 iMaxFileSize; /* max size for a single queue file */
int64 sizeOnDiskMax; /* maximum size on disk allowed */
+ qDeqID deqIDAdd; /* next dequeue ID to use during add to queue store */
+ qDeqID deqIDDel; /* queue store delete position */
int bIsDA; /* is this queue disk assisted? */
int bRunsDA; /* is this queue actually *running* disk assisted? */
struct queue_s *pqDA; /* queue for disk-assisted modes */
struct queue_s *pqParent;/* pointer to the parent (if this is a child queue) */
int bDAEnqOnly; /* EnqOnly setting for DA queue */
- /* some data elements for the queueUngetObj() functionality. This list should always be short
- * and is always kept in memory
- */
- qLinkedList_t *pUngetRoot;
- qLinkedList_t *pUngetLast;
- int iUngottenObjs; /* number of objects currently in the "ungotten" list */
/* now follow queueing mode specific data elements */
union { /* different data elements based on queue type (qType) */
struct {
- long head, tail;
+ long deqhead, head, tail;
void** pBuf; /* the queued user data structure */
} farray;
struct {
- qLinkedList_t *pRoot;
+ qLinkedList_t *pDeqRoot;
+ qLinkedList_t *pDelRoot;
qLinkedList_t *pLast;
} linklist;
struct {
int64 sizeOnDisk; /* current amount of disk space used */
int64 bytesRead; /* number of bytes read from current (undeleted!) file */
- strm_t *pWrite; /* current file to be written */
- strm_t *pRead; /* current file to be read */
+ strm_t *pWrite; /* current file to be written */
+ strm_t *pReadDeq; /* current file for dequeueing */
+ strm_t *pReadDel; /* current file for deleting */
} disk;
} tVars;
} qqueue_t;
@@ -184,7 +197,7 @@ rsRetVal qqueueStart(qqueue_t *pThis);
rsRetVal qqueueSetMaxFileSize(qqueue_t *pThis, size_t iMaxFileSize);
rsRetVal qqueueSetFilePrefix(qqueue_t *pThis, uchar *pszPrefix, size_t iLenPrefix);
rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads,
- int iMaxQueueSize, rsRetVal (*pConsumer)(void*,void*));
+ int iMaxQueueSize, rsRetVal (*pConsumer)(void*,batch_t*));
PROTOTYPEObjClassInit(qqueue);
PROTOTYPEpropSetMeth(qqueue, iPersistUpdCnt, int);
PROTOTYPEpropSetMeth(qqueue, bSyncQueueFiles, int);
@@ -203,6 +216,7 @@ PROTOTYPEpropSetMeth(qqueue, bSaveOnShutdown, int);
PROTOTYPEpropSetMeth(qqueue, pUsr, void*);
PROTOTYPEpropSetMeth(qqueue, iDeqSlowdown, int);
PROTOTYPEpropSetMeth(qqueue, sizeOnDiskMax, int64);
+PROTOTYPEpropSetMeth(qqueue, iDeqBatchSize, int);
#define qqueueGetID(pThis) ((unsigned long) pThis)
#endif /* #ifndef QUEUE_H_INCLUDED */