summaryrefslogtreecommitdiffstats
path: root/runtime/queue.h
diff options
context:
space:
mode:
Diffstat (limited to 'runtime/queue.h')
-rw-r--r--runtime/queue.h21
1 files changed, 7 insertions, 14 deletions
diff --git a/runtime/queue.h b/runtime/queue.h
index 73c62b52..93573dae 100644
--- a/runtime/queue.h
+++ b/runtime/queue.h
@@ -55,14 +55,14 @@ typedef struct qLinkedList_S {
/* the queue object */
-typedef struct queue_s {
+struct queue_s {
BEGINobjInstance;
queueType_t qType;
int nLogDeq; /* number of elements currently logically dequeued */
+ int bShutdownImmediate; /* should all workers cease processing messages? */
bool bEnqOnly; /* does queue run in enqueue-only mode (1) or not (0)? */
bool bSaveOnShutdown;/* persists everthing on shutdown (if DA!)? 1-yes, 0-no */
bool bQueueStarted; /* has queueStart() been called on this queue? 1-yes, 0-no */
- bool bQueueInDestruction;/* 1 if queue is in destruction process, 0 otherwise */
int iQueueSize; /* Current number of elements in the queue */
int iMaxQueueSize; /* how large can the queue grow? */
int iNumWorkerThreads;/* number of worker threads to use */
@@ -101,10 +101,11 @@ typedef struct queue_s {
* the user really wanted...). -- rgerhards, 2008-04-02
*/
/* end dequeue time window */
- rsRetVal (*pConsumer)(void *,batch_t*); /* user-supplied consumer function for dequeued messages */
+ rsRetVal (*pConsumer)(void *,batch_t*,int*); /* user-supplied consumer function for dequeued messages */
/* calling interface for pConsumer: arg1 is the global user pointer from this structure, arg2 is the
* user pointer array that was dequeued (actual sample: for actions, arg1 is the pAction and arg2
- * is pointer to an array of message message pointers)
+ * is pointer to an array of message message pointers), arg3 is a pointer to an interger which is zero
+ * during normal operations and one if the consumer must urgently shut down.
*/
/* type-specific handlers (set during construction) */
rsRetVal (*qConstruct)(struct queue_s *pThis);
@@ -112,7 +113,6 @@ typedef struct queue_s {
rsRetVal (*qAdd)(struct queue_s *pThis, void *pUsr);
rsRetVal (*qDeq)(struct queue_s *pThis, void **ppUsr);
rsRetVal (*qDel)(struct queue_s *pThis);
- rsRetVal (*qUnDeqAll)(struct queue_s *pThis);
/* end type-specific handler */
/* synchronization variables */
pthread_mutex_t mutThrdMgmt; /* mutex for the queue's thread management */
@@ -138,7 +138,6 @@ typedef struct queue_s {
qDeqID deqIDAdd; /* next dequeue ID to use during add to queue store */
qDeqID deqIDDel; /* queue store delete position */
int bIsDA; /* is this queue disk assisted? */
- int bRunsDA; /* is this queue actually *running* disk assisted? */
struct queue_s *pqDA; /* queue for disk-assisted modes */
struct queue_s *pqParent;/* pointer to the parent (if this is a child queue) */
int bDAEnqOnly; /* EnqOnly setting for DA queue */
@@ -161,14 +160,8 @@ typedef struct queue_s {
strm_t *pReadDel; /* current file for deleting */
} disk;
} tVars;
-} qqueue_t;
-
-/* some symbolic constants for easier reference */
-#define QUEUE_MODE_ENQDEQ 0
-#define QUEUE_MODE_ENQONLY 1
+};
-#define QUEUE_IDX_DA_WORKER 0 /* index for the DA worker (fixed) */
-#define QUEUE_PTR_DA_WORKER(x) (&((pThis)->pWrkThrds[0]))
/* the define below is an "eternal" timeout for the timeout settings which require a value.
* It is one day, which is not really eternal, but comes close to it if we think about
@@ -185,7 +178,7 @@ rsRetVal qqueueStart(qqueue_t *pThis);
rsRetVal qqueueSetMaxFileSize(qqueue_t *pThis, size_t iMaxFileSize);
rsRetVal qqueueSetFilePrefix(qqueue_t *pThis, uchar *pszPrefix, size_t iLenPrefix);
rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads,
- int iMaxQueueSize, rsRetVal (*pConsumer)(void*,batch_t*));
+ int iMaxQueueSize, rsRetVal (*pConsumer)(void*,batch_t*, int*));
PROTOTYPEObjClassInit(qqueue);
PROTOTYPEpropSetMeth(qqueue, iPersistUpdCnt, int);
PROTOTYPEpropSetMeth(qqueue, bSyncQueueFiles, int);