diff options
Diffstat (limited to 'queue.c')
-rw-r--r-- | queue.c | 139 |
1 files changed, 135 insertions, 4 deletions
@@ -72,7 +72,6 @@ rsRetVal qDestructFixedArray(queue_t *pThis) if(pThis->tVars.farray.pBuf != NULL) free(pThis->tVars.farray.pBuf); - free (pThis); return iRet; } @@ -109,8 +108,131 @@ rsRetVal qDelFixedArray(queue_t *pThis, void **out) return iRet; } + + +/* -------------------- linked list -------------------- */ +rsRetVal qConstructLinkedList(queue_t *pThis) +{ + DEFiRet; + + assert(pThis != NULL); + + pThis->tVars.linklist.pRoot = 0; + pThis->tVars.linklist.pLast = 0; + + return iRet; +} + + +rsRetVal qDestructLinkedList(queue_t *pThis) +{ + DEFiRet; + + assert(pThis != NULL); + + /* with the linked list type, there is nothing to do here. The + * reason is that the Destructor is only called after all entries + * have bene taken off the queue. In this case, there is nothing + * dynamic left with the linked list. + */ + + return iRet; +} + +rsRetVal qAddLinkedList(queue_t *pThis, void* pUsr) +{ + DEFiRet; + qLinkedList_t *pEntry; + + assert(pThis != NULL); + if((pEntry = (qLinkedList_t*) malloc(sizeof(qLinkedList_t))) == NULL) { + ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); + } + + pEntry->pNext = NULL; + pEntry->pUsr = pUsr; + + if(pThis->tVars.linklist.pRoot == NULL) { + pThis->tVars.linklist.pRoot = pThis->tVars.linklist.pLast = pEntry; + } else { + pThis->tVars.linklist.pLast->pNext = pEntry; + pThis->tVars.linklist.pLast = pEntry; + } + +finalize_it: + return iRet; +} + +rsRetVal qDelLinkedList(queue_t *pThis, void **ppUsr) +{ + DEFiRet; + qLinkedList_t *pEntry; + + assert(pThis != NULL); + assert(pThis->tVars.linklist.pRoot != NULL); + + pEntry = pThis->tVars.linklist.pRoot; + *ppUsr = pEntry->pUsr; + + if(pThis->tVars.linklist.pRoot == pThis->tVars.linklist.pLast) { + pThis->tVars.linklist.pRoot = NULL; + pThis->tVars.linklist.pLast = NULL; + } else { + pThis->tVars.linklist.pRoot = pEntry->pNext; + } + free(pEntry); + + return iRet; +} + /* --------------- end type-specific handlers -------------------- */ + +/* generic code to add a queue entry */ +static rsRetVal +queueAdd(queue_t *pThis, void *pUsr) +{ + DEFiRet; + + assert(pThis != NULL); + CHKiRet(pThis->qAdd(pThis, pUsr)); + + ++pThis->iQueueSize; + + if(pThis->iQueueSize >= pThis->iMaxQueueSize) + pThis->full = 1; + pThis->empty = 0; + + dbgprintf("Queue 0x%lx: entry added, size now %d entries\n", (unsigned long) pThis, pThis->iQueueSize); + +finalize_it: + return iRet; +} + + +/* generic code to remove a queue entry */ +static rsRetVal +queueDel(queue_t *pThis, void *pUsr) +{ + DEFiRet; + + assert(pThis != NULL); + CHKiRet(pThis->qDel(pThis, pUsr)); + --pThis->iQueueSize; + + if(pThis->iQueueSize == 0) + pThis->empty = 1; + pThis->full = 0; + + dbgprintf("Queue 0x%lx: entry deleted, size now %d entries\n", (unsigned long) pThis, pThis->iQueueSize); + +finalize_it: + return iRet; +} + + + + /* Each queue has one associated worker (consumer) thread. It will pull * the message from the queue and pass it to a user-defined function. * This function was provided on construction. It MUST be thread-safe. @@ -139,7 +261,7 @@ queueWorker(void *arg) } if(!pThis->empty) { /* dequeue element (still protected from mutex) */ - pThis->qDel(pThis, &pUsr); + queueDel(pThis, &pUsr); pthread_mutex_unlock(pThis->mut); pthread_cond_signal (pThis->notFull); /* do actual processing (the lengthy part, runs in parallel) */ @@ -167,13 +289,13 @@ rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iMaxQueueSize, assert(ppThis != NULL); assert(pConsumer != NULL); -dbgprintf("queueConstruct in \n"); if((pThis = (queue_t *)malloc(sizeof(queue_t))) == NULL) { ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); } /* we have an object, so let's fill the properties */ + pThis->iQueueSize = 0; pThis->iMaxQueueSize = iMaxQueueSize; pThis->pConsumer = pConsumer; pThis->empty = 1; @@ -194,6 +316,12 @@ dbgprintf("queueConstruct in \n"); pThis->qAdd = qAddFixedArray; pThis->qDel = qDelFixedArray; break; + case QUEUETYPE_LINKEDLIST: + pThis->qConstruct = qConstructLinkedList; + pThis->qDestruct = qDestructLinkedList; + pThis->qAdd = qAddLinkedList; + pThis->qDel = qDelLinkedList; + break; } /* call type-specific constructor */ @@ -243,6 +371,9 @@ rsRetVal queueDestruct(queue_t *pThis) /* type-specific destructor */ iRet = pThis->qDestruct(pThis); + /* and finally delete the queue objet itself */ + free(pThis); + return iRet; } @@ -277,7 +408,7 @@ queueEnqObj(queue_t *pThis, void *pUsr) ABORT_FINALIZE(RS_RET_QUEUE_FULL); } } - CHKiRet(pThis->qAdd(pThis, pUsr)); + CHKiRet(queueAdd(pThis, pUsr)); finalize_it: /* now activate the worker thread */ |