path: root/queue.h
diff options
authorRainer Gerhards <>2008-01-16 16:40:11 +0000
committerRainer Gerhards <>2008-01-16 16:40:11 +0000
commit75a8f92d5001f555606b2ddb5de30acf689e2422 (patch)
tree7db2ad786fa5589cb279c0e465b029434d0a0946 /queue.h
parent19c9b187ab29f9304adb82d9c6005c69c92b3c17 (diff)
implemented dynamic startup and shutdown of worker threads based on current
Diffstat (limited to 'queue.h')
1 files changed, 13 insertions, 8 deletions
diff --git a/queue.h b/queue.h
index 7b0970c8..03a3517b 100644
--- a/queue.h
+++ b/queue.h
@@ -60,14 +60,16 @@ typedef struct qLinkedList_S {
void *pUsr;
} qLinkedList_t;
+/* commands and states for worker threads. */
typedef enum {
- eWRKTHRDCMD_NEVER_RAN = 0, /* granted, that's more a state than a cmd - thread is dead... */
- eWRKTHRDCMD_TERMINATED = 1, /* granted, that's more a state than a cmd - thread is dead... */
- /* ALL active states MUST be numerically higher than eWRKTHRDCMD_TERMINATED and NONE must be lower! */
-} qWrkCmd_t; /* commands for queue worker threads */
+ eWRKTHRD_STOPPED = 0, /* worker thread is not running (either actually never ran or was shut down) */
+ eWRKTHRD_TERMINATED = 1,/* worker thread has shut down, but some finalzing is still needed */
+ /* ALL active states MUST be numerically higher than eWRKTHRD_TERMINATED and NONE must be lower! */
+ eWRKTHRD_RUN_INIT = 2, /* worker thread is initializing, but not yet fully running */
+ eWRKTHRD_RUNNING = 3, /* worker thread is up and running and shall continue to do so */
+ eWRKTHRD_SHUTDOWN = 4, /* worker thread is running but shall terminate when queue is empty */
+ eWRKTHRD_SHUTDOWN_IMMEDIATE = 5/* worker thread is running but shall terminate even if queue is full */
+} qWrkCmd_t;
typedef struct qWrkThrd_s {
pthread_t thrdID; /* thread ID */
@@ -82,17 +84,18 @@ typedef struct queue_s {
int iMaxQueueSize; /* how large can the queue grow? */
int iNumWorkerThreads;/* number of worker threads to use */
int iCurNumWrkThrd;/* current number of active worker threads */
+ int iMinMsgsPerWrkr;/* minimum nbr of msgs per worker thread, if more, a new worker is started until max wrkrs */
qWrkThrd_t *pWrkThrds;/* array with control structure for the worker thread(s) associated with this queue */
int iUpdsSincePersist;/* nbr of queue updates since the last persist call */
int iPersistUpdCnt; /* persits queue info after this nbr of updates - 0 -> persist only on shutdown */
int iHighWtrMrk; /* high water mark for disk-assisted memory queues */
- //int bWasBelowHighWtr;/* when running in DA mode: queue was below high wtr mrk at least once */
int iLowWtrMrk; /* low water mark for disk-assisted memory queues */
int iDiscardMrk; /* if the queue is above this mark, low-severity messages are discarded */
int iDiscardSeverity;/* messages of this severity above are discarded on too-full queue */
int bNeedDelQIF; /* does the QIF file need to be deleted when queue becomes empty? */
int toQShutdown; /* timeout for regular queue shutdown in ms */
int toActShutdown; /* timeout for long-running action shutdown in ms */
+ int toWrkShutdown; /* timeout for idle workers in ms, -1 means indefinite (0 is immediate) */
int toEnq; /* enqueue timeout */
rsRetVal (*pConsumer)(void *); /* user-supplied consumer function for dequeued messages */
/* type-specific handlers (set during construction) */
@@ -161,11 +164,13 @@ PROTOTYPEObjClassInit(queue);
PROTOTYPEpropSetMeth(queue, iPersistUpdCnt, int);
PROTOTYPEpropSetMeth(queue, toQShutdown, long);
PROTOTYPEpropSetMeth(queue, toActShutdown, long);
+PROTOTYPEpropSetMeth(queue, toWrkShutdown, long);
PROTOTYPEpropSetMeth(queue, toEnq, long);
PROTOTYPEpropSetMeth(queue, iHighWtrMrk, int);
PROTOTYPEpropSetMeth(queue, iLowWtrMrk, int);
PROTOTYPEpropSetMeth(queue, iDiscardMrk, int);
PROTOTYPEpropSetMeth(queue, iDiscardSeverity, int);
+PROTOTYPEpropSetMeth(queue, iMinMsgsPerWrkr, int);
#define queueGetID(pThis) ((unsigned long) pThis)
#endif /* #ifndef QUEUE_H_INCLUDED */