summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2008-01-03 11:14:03 +0000
committerRainer Gerhards <rgerhards@adiscon.com>2008-01-03 11:14:03 +0000
commit9e67ae041d964748755e5c9c45ebe55ff612391e (patch)
tree451c4ba0a65acd70d38147e8148e0633c8b7485d
parent48cb0a980f657fe1d6484a1322db26c753835f03 (diff)
downloadrsyslog-9e67ae041d964748755e5c9c45ebe55ff612391e.tar.gz
rsyslog-9e67ae041d964748755e5c9c45ebe55ff612391e.tar.xz
rsyslog-9e67ae041d964748755e5c9c45ebe55ff612391e.zip
implemented queue type "drivers"
-rw-r--r--queue.c117
-rw-r--r--queue.h12
2 files changed, 99 insertions, 30 deletions
diff --git a/queue.c b/queue.c
index 00292a42..2600871a 100644
--- a/queue.c
+++ b/queue.c
@@ -37,6 +37,79 @@
/* methods */
+/* first, we define type-specific handlers. The provide a generic functionality,
+ * but for this specific type of queue. The mapping to these handlers happens during
+ * queue construction. Later on, handlers are called by pointers present in the
+ * queue instance object.
+ */
+
+/* -------------------- fixed array -------------------- */
+rsRetVal qConstructFixedArray(queue_t *pThis)
+{
+ DEFiRet;
+
+ assert(pThis != NULL);
+
+ if((pThis->tVars.farray.pBuf = malloc(sizeof(void *) * pThis->iMaxQueueSize)) == NULL) {
+ ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
+ }
+
+ pThis->tVars.farray.head = 0;
+ pThis->tVars.farray.tail = 0;
+
+finalize_it:
+ return iRet;
+}
+
+
+rsRetVal qDestructFixedArray(queue_t *pThis)
+{
+ DEFiRet;
+
+ assert(pThis != NULL);
+
+ if(pThis->tVars.farray.pBuf != NULL)
+ free(pThis->tVars.farray.pBuf);
+ free (pThis);
+
+ return iRet;
+}
+
+rsRetVal qAddFixedArray(queue_t *pThis, void* in)
+{
+ DEFiRet;
+
+ assert(pThis != NULL);
+ pThis->tVars.farray.pBuf[pThis->tVars.farray.tail] = in;
+ pThis->tVars.farray.tail++;
+ if (pThis->tVars.farray.tail == pThis->iMaxQueueSize)
+ pThis->tVars.farray.tail = 0;
+ if (pThis->tVars.farray.tail == pThis->tVars.farray.head)
+ pThis->full = 1;
+ pThis->empty = 0;
+
+ return iRet;
+}
+
+rsRetVal qDelFixedArray(queue_t *pThis, void **out)
+{
+ DEFiRet;
+
+ assert(pThis != NULL);
+ *out = (void*) pThis->tVars.farray.pBuf[pThis->tVars.farray.head];
+
+ pThis->tVars.farray.head++;
+ if (pThis->tVars.farray.head == pThis->iMaxQueueSize)
+ pThis->tVars.farray.head = 0;
+ if (pThis->tVars.farray.head == pThis->tVars.farray.tail)
+ pThis->empty = 1;
+ pThis->full = 0;
+
+ return iRet;
+}
+/* --------------- end type-specific handlers -------------------- */
+
+
/* Constructor for the queue object */
rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iMaxQueueSize)
{
@@ -62,13 +135,18 @@ dbgprintf("queueConstruct in \n");
pthread_cond_init (pThis->notEmpty, NULL);
pThis->qType = qType;
- /* type-specific initialization */
- if((pThis->tVars.farray.pBuf = malloc(sizeof(void *) * pThis->iMaxQueueSize)) == NULL) {
- ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
+ /* set type-specific handlers */
+ switch(qType) {
+ case QUEUETYPE_FIXED_ARRAY:
+ pThis->qConstruct = qConstructFixedArray;
+ pThis->qDestruct = qDestructFixedArray;
+ pThis->qAdd = qAddFixedArray;
+ pThis->qDel = qDelFixedArray;
+ break;
}
- pThis->tVars.farray.head = 0;
- pThis->tVars.farray.tail = 0;
+ /* call type-specific constructor */
+ CHKiRet(pThis->qConstruct(pThis));
finalize_it:
if(iRet == RS_RET_OK) {
@@ -87,7 +165,6 @@ rsRetVal queueDestruct(queue_t *pThis)
{
DEFiRet;
-dbgprintf("queueDestruct\n");
assert(pThis != NULL);
pthread_mutex_destroy (pThis->mut);
free (pThis->mut);
@@ -96,8 +173,7 @@ dbgprintf("queueDestruct\n");
pthread_cond_destroy (pThis->notEmpty);
free (pThis->notEmpty);
/* type-specific destructor */
- free(pThis->tVars.farray.pBuf);
- free (pThis);
+ iRet = pThis->qDestruct(pThis);
return iRet;
}
@@ -122,15 +198,9 @@ rsRetVal queueAdd(queue_t *pThis, void* in)
{
DEFiRet;
-dbgprintf("queueAdd\n");
- pThis->tVars.farray.pBuf[pThis->tVars.farray.tail] = in;
- pThis->tVars.farray.tail++;
- if (pThis->tVars.farray.tail == pThis->iMaxQueueSize)
- pThis->tVars.farray.tail = 0;
- if (pThis->tVars.farray.tail == pThis->tVars.farray.head)
- pThis->full = 1;
- pThis->empty = 0;
-
+ assert(pThis != NULL);
+ CHKiRet(pThis->qAdd(pThis, in));
+finalize_it:
return iRet;
}
@@ -138,16 +208,9 @@ rsRetVal queueDel(queue_t *pThis, void **out)
{
DEFiRet;
-dbgprintf("queueDel\n");
- *out = (void*) pThis->tVars.farray.pBuf[pThis->tVars.farray.head];
-
- pThis->tVars.farray.head++;
- if (pThis->tVars.farray.head == pThis->iMaxQueueSize)
- pThis->tVars.farray.head = 0;
- if (pThis->tVars.farray.head == pThis->tVars.farray.tail)
- pThis->empty = 1;
- pThis->full = 0;
-
+ assert(pThis != NULL);
+ CHKiRet(pThis->qDel(pThis, out));
+finalize_it:
return iRet;
}
diff --git a/queue.h b/queue.h
index 1a1722a0..86499306 100644
--- a/queue.h
+++ b/queue.h
@@ -26,14 +26,20 @@
/* queue types */
typedef enum {
- QUEUETYPE_FIXED_ARRAY, /* a simple queue made out of a fixed (initially malloced) array fast but memoryhog */
- QUEUETYPE_LINKEDLIST, /* linked list used as buffer, lower fixed memory overhead but slower */
+ QUEUETYPE_FIXED_ARRAY,/* a simple queue made out of a fixed (initially malloced) array fast but memoryhog */
+ QUEUETYPE_LINKEDLIST,/* linked list used as buffer, lower fixed memory overhead but slower */
} queueType_t;
/* the queue object */
-typedef struct {
+typedef struct queue_s {
queueType_t qType;
int iMaxQueueSize; /* how large can the queue grow? */
+ /* type-specific handlers (set during construction) */
+ rsRetVal (*qConstruct)(struct queue_s *pThis);
+ rsRetVal (*qDestruct)(struct queue_s *pThis);
+ rsRetVal (*qAdd)(struct queue_s *pThis, void *pUsr);
+ rsRetVal (*qDel)(struct queue_s *pThis, void **ppUsr);
+ /* end type-specific handler */
/* synchronization variables */
pthread_mutex_t *mut;
pthread_cond_t *notFull, *notEmpty;