summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2008-01-03 14:57:23 +0000
committerRainer Gerhards <rgerhards@adiscon.com>2008-01-03 14:57:23 +0000
commitf7a15abfba79a9da1f125d48e75b3bb68ebe0e9e (patch)
treedb901e43604c22cc5367c02d1a88acdd82e02193
parentfc761b9fdc5486a0072ee63d8b438f562127d057 (diff)
downloadrsyslog-f7a15abfba79a9da1f125d48e75b3bb68ebe0e9e.tar.gz
rsyslog-f7a15abfba79a9da1f125d48e75b3bb68ebe0e9e.tar.xz
rsyslog-f7a15abfba79a9da1f125d48e75b3bb68ebe0e9e.zip
added capability to use a linked list for queuing to the queue class
-rw-r--r--queue.c139
-rw-r--r--queue.h11
-rw-r--r--syslogd.c3
3 files changed, 148 insertions, 5 deletions
diff --git a/queue.c b/queue.c
index c5aff7c5..ac776d2c 100644
--- a/queue.c
+++ b/queue.c
@@ -72,7 +72,6 @@ rsRetVal qDestructFixedArray(queue_t *pThis)
if(pThis->tVars.farray.pBuf != NULL)
free(pThis->tVars.farray.pBuf);
- free (pThis);
return iRet;
}
@@ -109,8 +108,131 @@ rsRetVal qDelFixedArray(queue_t *pThis, void **out)
return iRet;
}
+
+
+/* -------------------- linked list -------------------- */
+rsRetVal qConstructLinkedList(queue_t *pThis)
+{
+ DEFiRet;
+
+ assert(pThis != NULL);
+
+ pThis->tVars.linklist.pRoot = 0;
+ pThis->tVars.linklist.pLast = 0;
+
+ return iRet;
+}
+
+
+rsRetVal qDestructLinkedList(queue_t *pThis)
+{
+ DEFiRet;
+
+ assert(pThis != NULL);
+
+ /* with the linked list type, there is nothing to do here. The
+ * reason is that the Destructor is only called after all entries
+ * have bene taken off the queue. In this case, there is nothing
+ * dynamic left with the linked list.
+ */
+
+ return iRet;
+}
+
+rsRetVal qAddLinkedList(queue_t *pThis, void* pUsr)
+{
+ DEFiRet;
+ qLinkedList_t *pEntry;
+
+ assert(pThis != NULL);
+ if((pEntry = (qLinkedList_t*) malloc(sizeof(qLinkedList_t))) == NULL) {
+ ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
+ }
+
+ pEntry->pNext = NULL;
+ pEntry->pUsr = pUsr;
+
+ if(pThis->tVars.linklist.pRoot == NULL) {
+ pThis->tVars.linklist.pRoot = pThis->tVars.linklist.pLast = pEntry;
+ } else {
+ pThis->tVars.linklist.pLast->pNext = pEntry;
+ pThis->tVars.linklist.pLast = pEntry;
+ }
+
+finalize_it:
+ return iRet;
+}
+
+rsRetVal qDelLinkedList(queue_t *pThis, void **ppUsr)
+{
+ DEFiRet;
+ qLinkedList_t *pEntry;
+
+ assert(pThis != NULL);
+ assert(pThis->tVars.linklist.pRoot != NULL);
+
+ pEntry = pThis->tVars.linklist.pRoot;
+ *ppUsr = pEntry->pUsr;
+
+ if(pThis->tVars.linklist.pRoot == pThis->tVars.linklist.pLast) {
+ pThis->tVars.linklist.pRoot = NULL;
+ pThis->tVars.linklist.pLast = NULL;
+ } else {
+ pThis->tVars.linklist.pRoot = pEntry->pNext;
+ }
+ free(pEntry);
+
+ return iRet;
+}
+
/* --------------- end type-specific handlers -------------------- */
+
+/* generic code to add a queue entry */
+static rsRetVal
+queueAdd(queue_t *pThis, void *pUsr)
+{
+ DEFiRet;
+
+ assert(pThis != NULL);
+ CHKiRet(pThis->qAdd(pThis, pUsr));
+
+ ++pThis->iQueueSize;
+
+ if(pThis->iQueueSize >= pThis->iMaxQueueSize)
+ pThis->full = 1;
+ pThis->empty = 0;
+
+ dbgprintf("Queue 0x%lx: entry added, size now %d entries\n", (unsigned long) pThis, pThis->iQueueSize);
+
+finalize_it:
+ return iRet;
+}
+
+
+/* generic code to remove a queue entry */
+static rsRetVal
+queueDel(queue_t *pThis, void *pUsr)
+{
+ DEFiRet;
+
+ assert(pThis != NULL);
+ CHKiRet(pThis->qDel(pThis, pUsr));
+ --pThis->iQueueSize;
+
+ if(pThis->iQueueSize == 0)
+ pThis->empty = 1;
+ pThis->full = 0;
+
+ dbgprintf("Queue 0x%lx: entry deleted, size now %d entries\n", (unsigned long) pThis, pThis->iQueueSize);
+
+finalize_it:
+ return iRet;
+}
+
+
+
+
/* Each queue has one associated worker (consumer) thread. It will pull
* the message from the queue and pass it to a user-defined function.
* This function was provided on construction. It MUST be thread-safe.
@@ -139,7 +261,7 @@ queueWorker(void *arg)
}
if(!pThis->empty) {
/* dequeue element (still protected from mutex) */
- pThis->qDel(pThis, &pUsr);
+ queueDel(pThis, &pUsr);
pthread_mutex_unlock(pThis->mut);
pthread_cond_signal (pThis->notFull);
/* do actual processing (the lengthy part, runs in parallel) */
@@ -167,13 +289,13 @@ rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iMaxQueueSize,
assert(ppThis != NULL);
assert(pConsumer != NULL);
-dbgprintf("queueConstruct in \n");
if((pThis = (queue_t *)malloc(sizeof(queue_t))) == NULL) {
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
}
/* we have an object, so let's fill the properties */
+ pThis->iQueueSize = 0;
pThis->iMaxQueueSize = iMaxQueueSize;
pThis->pConsumer = pConsumer;
pThis->empty = 1;
@@ -194,6 +316,12 @@ dbgprintf("queueConstruct in \n");
pThis->qAdd = qAddFixedArray;
pThis->qDel = qDelFixedArray;
break;
+ case QUEUETYPE_LINKEDLIST:
+ pThis->qConstruct = qConstructLinkedList;
+ pThis->qDestruct = qDestructLinkedList;
+ pThis->qAdd = qAddLinkedList;
+ pThis->qDel = qDelLinkedList;
+ break;
}
/* call type-specific constructor */
@@ -243,6 +371,9 @@ rsRetVal queueDestruct(queue_t *pThis)
/* type-specific destructor */
iRet = pThis->qDestruct(pThis);
+ /* and finally delete the queue objet itself */
+ free(pThis);
+
return iRet;
}
@@ -277,7 +408,7 @@ queueEnqObj(queue_t *pThis, void *pUsr)
ABORT_FINALIZE(RS_RET_QUEUE_FULL);
}
}
- CHKiRet(pThis->qAdd(pThis, pUsr));
+ CHKiRet(queueAdd(pThis, pUsr));
finalize_it:
/* now activate the worker thread */
diff --git a/queue.h b/queue.h
index 0021dd08..730a5c50 100644
--- a/queue.h
+++ b/queue.h
@@ -31,9 +31,16 @@ typedef enum {
QUEUETYPE_LINKEDLIST,/* linked list used as buffer, lower fixed memory overhead but slower */
} queueType_t;
+/* list member definition for linked list types of queues: */
+typedef struct qLinkedList_S {
+ struct qLinkedList_S *pNext;
+ void *pUsr;
+} qLinkedList_t;
+
/* the queue object */
typedef struct queue_s {
queueType_t qType;
+ int iQueueSize; /* Current number of elements in the queue */
int iMaxQueueSize; /* how large can the queue grow? */
pthread_t thrdWorker; /* ID of the worker thread associated with this queue */
int bDoRun; /* 1 - run queue, 0 - shutdown of queue requested */
@@ -54,6 +61,10 @@ typedef struct queue_s {
long head, tail;
void** pBuf; /* the queued user data structure */
} farray;
+ struct {
+ qLinkedList_t *pRoot;
+ qLinkedList_t *pLast;
+ } linklist;
} tVars;
} queue_t;
diff --git a/syslogd.c b/syslogd.c
index 898a0bfb..257f1a91 100644
--- a/syslogd.c
+++ b/syslogd.c
@@ -3339,7 +3339,8 @@ init(void)
}
/* create message queue */
- CHKiRet_Hdlr(queueConstruct(&pMsgQueue, QUEUETYPE_FIXED_ARRAY, iMainMsgQueueSize, msgConsumer)) {
+ //CHKiRet_Hdlr(queueConstruct(&pMsgQueue, QUEUETYPE_FIXED_ARRAY, iMainMsgQueueSize, msgConsumer)) {
+ CHKiRet_Hdlr(queueConstruct(&pMsgQueue, QUEUETYPE_LINKEDLIST, iMainMsgQueueSize, msgConsumer)) {
/* no queue is fatal, we need to give up in that case... */
fprintf(stderr, "fatal error %d: could not create message queue - rsyslogd can not run!\n", iRet);
exit(1);