diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2008-01-03 14:57:23 +0000 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2008-01-03 14:57:23 +0000 |
commit | f7a15abfba79a9da1f125d48e75b3bb68ebe0e9e (patch) | |
tree | db901e43604c22cc5367c02d1a88acdd82e02193 | |
parent | fc761b9fdc5486a0072ee63d8b438f562127d057 (diff) | |
download | rsyslog-f7a15abfba79a9da1f125d48e75b3bb68ebe0e9e.tar.gz rsyslog-f7a15abfba79a9da1f125d48e75b3bb68ebe0e9e.tar.xz rsyslog-f7a15abfba79a9da1f125d48e75b3bb68ebe0e9e.zip |
added capability to use a linked list for queuing to the queue class
-rw-r--r-- | queue.c | 139 | ||||
-rw-r--r-- | queue.h | 11 | ||||
-rw-r--r-- | syslogd.c | 3 |
3 files changed, 148 insertions, 5 deletions
@@ -72,7 +72,6 @@ rsRetVal qDestructFixedArray(queue_t *pThis) if(pThis->tVars.farray.pBuf != NULL) free(pThis->tVars.farray.pBuf); - free (pThis); return iRet; } @@ -109,8 +108,131 @@ rsRetVal qDelFixedArray(queue_t *pThis, void **out) return iRet; } + + +/* -------------------- linked list -------------------- */ +rsRetVal qConstructLinkedList(queue_t *pThis) +{ + DEFiRet; + + assert(pThis != NULL); + + pThis->tVars.linklist.pRoot = 0; + pThis->tVars.linklist.pLast = 0; + + return iRet; +} + + +rsRetVal qDestructLinkedList(queue_t *pThis) +{ + DEFiRet; + + assert(pThis != NULL); + + /* with the linked list type, there is nothing to do here. The + * reason is that the Destructor is only called after all entries + * have bene taken off the queue. In this case, there is nothing + * dynamic left with the linked list. + */ + + return iRet; +} + +rsRetVal qAddLinkedList(queue_t *pThis, void* pUsr) +{ + DEFiRet; + qLinkedList_t *pEntry; + + assert(pThis != NULL); + if((pEntry = (qLinkedList_t*) malloc(sizeof(qLinkedList_t))) == NULL) { + ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); + } + + pEntry->pNext = NULL; + pEntry->pUsr = pUsr; + + if(pThis->tVars.linklist.pRoot == NULL) { + pThis->tVars.linklist.pRoot = pThis->tVars.linklist.pLast = pEntry; + } else { + pThis->tVars.linklist.pLast->pNext = pEntry; + pThis->tVars.linklist.pLast = pEntry; + } + +finalize_it: + return iRet; +} + +rsRetVal qDelLinkedList(queue_t *pThis, void **ppUsr) +{ + DEFiRet; + qLinkedList_t *pEntry; + + assert(pThis != NULL); + assert(pThis->tVars.linklist.pRoot != NULL); + + pEntry = pThis->tVars.linklist.pRoot; + *ppUsr = pEntry->pUsr; + + if(pThis->tVars.linklist.pRoot == pThis->tVars.linklist.pLast) { + pThis->tVars.linklist.pRoot = NULL; + pThis->tVars.linklist.pLast = NULL; + } else { + pThis->tVars.linklist.pRoot = pEntry->pNext; + } + free(pEntry); + + return iRet; +} + /* --------------- end type-specific handlers -------------------- */ + +/* generic code to add a queue entry */ +static rsRetVal +queueAdd(queue_t *pThis, void *pUsr) +{ + DEFiRet; + + assert(pThis != NULL); + CHKiRet(pThis->qAdd(pThis, pUsr)); + + ++pThis->iQueueSize; + + if(pThis->iQueueSize >= pThis->iMaxQueueSize) + pThis->full = 1; + pThis->empty = 0; + + dbgprintf("Queue 0x%lx: entry added, size now %d entries\n", (unsigned long) pThis, pThis->iQueueSize); + +finalize_it: + return iRet; +} + + +/* generic code to remove a queue entry */ +static rsRetVal +queueDel(queue_t *pThis, void *pUsr) +{ + DEFiRet; + + assert(pThis != NULL); + CHKiRet(pThis->qDel(pThis, pUsr)); + --pThis->iQueueSize; + + if(pThis->iQueueSize == 0) + pThis->empty = 1; + pThis->full = 0; + + dbgprintf("Queue 0x%lx: entry deleted, size now %d entries\n", (unsigned long) pThis, pThis->iQueueSize); + +finalize_it: + return iRet; +} + + + + /* Each queue has one associated worker (consumer) thread. It will pull * the message from the queue and pass it to a user-defined function. * This function was provided on construction. It MUST be thread-safe. @@ -139,7 +261,7 @@ queueWorker(void *arg) } if(!pThis->empty) { /* dequeue element (still protected from mutex) */ - pThis->qDel(pThis, &pUsr); + queueDel(pThis, &pUsr); pthread_mutex_unlock(pThis->mut); pthread_cond_signal (pThis->notFull); /* do actual processing (the lengthy part, runs in parallel) */ @@ -167,13 +289,13 @@ rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iMaxQueueSize, assert(ppThis != NULL); assert(pConsumer != NULL); -dbgprintf("queueConstruct in \n"); if((pThis = (queue_t *)malloc(sizeof(queue_t))) == NULL) { ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); } /* we have an object, so let's fill the properties */ + pThis->iQueueSize = 0; pThis->iMaxQueueSize = iMaxQueueSize; pThis->pConsumer = pConsumer; pThis->empty = 1; @@ -194,6 +316,12 @@ dbgprintf("queueConstruct in \n"); pThis->qAdd = qAddFixedArray; pThis->qDel = qDelFixedArray; break; + case QUEUETYPE_LINKEDLIST: + pThis->qConstruct = qConstructLinkedList; + pThis->qDestruct = qDestructLinkedList; + pThis->qAdd = qAddLinkedList; + pThis->qDel = qDelLinkedList; + break; } /* call type-specific constructor */ @@ -243,6 +371,9 @@ rsRetVal queueDestruct(queue_t *pThis) /* type-specific destructor */ iRet = pThis->qDestruct(pThis); + /* and finally delete the queue objet itself */ + free(pThis); + return iRet; } @@ -277,7 +408,7 @@ queueEnqObj(queue_t *pThis, void *pUsr) ABORT_FINALIZE(RS_RET_QUEUE_FULL); } } - CHKiRet(pThis->qAdd(pThis, pUsr)); + CHKiRet(queueAdd(pThis, pUsr)); finalize_it: /* now activate the worker thread */ @@ -31,9 +31,16 @@ typedef enum { QUEUETYPE_LINKEDLIST,/* linked list used as buffer, lower fixed memory overhead but slower */ } queueType_t; +/* list member definition for linked list types of queues: */ +typedef struct qLinkedList_S { + struct qLinkedList_S *pNext; + void *pUsr; +} qLinkedList_t; + /* the queue object */ typedef struct queue_s { queueType_t qType; + int iQueueSize; /* Current number of elements in the queue */ int iMaxQueueSize; /* how large can the queue grow? */ pthread_t thrdWorker; /* ID of the worker thread associated with this queue */ int bDoRun; /* 1 - run queue, 0 - shutdown of queue requested */ @@ -54,6 +61,10 @@ typedef struct queue_s { long head, tail; void** pBuf; /* the queued user data structure */ } farray; + struct { + qLinkedList_t *pRoot; + qLinkedList_t *pLast; + } linklist; } tVars; } queue_t; @@ -3339,7 +3339,8 @@ init(void) } /* create message queue */ - CHKiRet_Hdlr(queueConstruct(&pMsgQueue, QUEUETYPE_FIXED_ARRAY, iMainMsgQueueSize, msgConsumer)) { + //CHKiRet_Hdlr(queueConstruct(&pMsgQueue, QUEUETYPE_FIXED_ARRAY, iMainMsgQueueSize, msgConsumer)) { + CHKiRet_Hdlr(queueConstruct(&pMsgQueue, QUEUETYPE_LINKEDLIST, iMainMsgQueueSize, msgConsumer)) { /* no queue is fatal, we need to give up in that case... */ fprintf(stderr, "fatal error %d: could not create message queue - rsyslogd can not run!\n", iRet); exit(1); |