summaryrefslogtreecommitdiffstats
path: root/queue.c
diff options
context:
space:
mode:
Diffstat (limited to 'queue.c')
-rw-r--r--queue.c13
1 files changed, 11 insertions, 2 deletions
diff --git a/queue.c b/queue.c
index 8eb98d72..19e92e9c 100644
--- a/queue.c
+++ b/queue.c
@@ -235,7 +235,6 @@ rsRetVal qAddDisk(queue_t *pThis, void* pUsr)
assert(pThis != NULL);
dbgprintf("writing to file %d\n", pThis->tVars.disk.fd);
-dbgprintf("objInfo: %lx\n", (unsigned long)pUsr);
CHKiRet((objSerialize(pUsr))(pBuf, &lenBuf, pUsr)); // TODO: hier weiter machen!
i = write(pThis->tVars.disk.fd, "entry\n", 6);
dbgprintf("write wrote %d bytes, errno: %d, err %s\n", i, errno, strerror(errno));
@@ -338,8 +337,17 @@ queueWorker(void *arg)
* rgerhards, 2008-01-03
*/
if(iRet == RS_RET_OK) {
+ rsRetVal iRetLocal;
dbgprintf("Worker for queue 0x%lx is running...\n", (unsigned long) pThis);
- pThis->pConsumer(pUsr);
+ iRetLocal = pThis->pConsumer(pUsr);
+ if(iRetLocal != RS_RET_OK)
+ dbgprintf("Queue 0x%lx: Consumer returned iRet %d\n",
+ (unsigned long) pThis, iRetLocal);
+dbgprintf("QUEUE: consumer done\n");
+ iRetLocal = objDestruct(pUsr);
+ if(iRetLocal != RS_RET_OK)
+ dbgprintf("Queue 0x%lx: Destructor returned iRet %d\n",
+ (unsigned long) pThis, iRetLocal);
} else {
dbgprintf("Queue 0x%lx: error %d dequeueing element - ignoring, but strange things "
"may happen\n", (unsigned long) pThis, iRet);
@@ -487,6 +495,7 @@ queueEnqObj(queue_t *pThis, void *pUsr)
if(pthread_cond_timedwait (pThis->notFull,
pThis->mut, &t) != 0) {
dbgprintf("Queue 0x%lx: enqueueMsg: cond timeout, dropping message!\n", (unsigned long) pThis);
+ objDestruct(pUsr);
ABORT_FINALIZE(RS_RET_QUEUE_FULL);
}
}