diff options
-rw-r--r-- | action.c | 1 | ||||
-rw-r--r-- | runtime/msg.h | 2 | ||||
-rw-r--r-- | runtime/queue.c | 40 | ||||
-rw-r--r-- | tools/syslogd.c | 1 |
4 files changed, 38 insertions, 6 deletions
@@ -905,7 +905,6 @@ processAction(action_t *pAction, batch_t *pBatch) /* this must be moved away - up into the dequeue part of the queue, I guess, but that's for another day */ for(i = 0 ; i < pBatch->nElem ; i++) { pMsg = (msg_t*) pBatch->pElem[i].pUsrp; - msgDestruct(&pMsg); /* TODO: change: we are now finished with the message */ } iRet = finishBatch(pAction); diff --git a/runtime/msg.h b/runtime/msg.h index c8350626..14148441 100644 --- a/runtime/msg.h +++ b/runtime/msg.h @@ -51,7 +51,7 @@ struct msg { BEGINobjInstance; /* Data to implement generic object - MUST be the first data element! */ pthread_mutexattr_t mutAttr; -short bDoLock; /* use the mutex? */ + short bDoLock; /* use the mutex? */ pthread_mutex_t mut; flowControl_t flowCtlType; /**< type of flow control we can apply, for enqueueing, needs not to be persisted because once data has entered the queue, this property is no longer needed. */ diff --git a/runtime/queue.c b/runtime/queue.c index c3a8e9d4..6bea338a 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -1390,13 +1390,47 @@ finalize_it: } +/* Delete a batch of processed user objects from the queue, which includes + * destructing the objects themself. The pointer piRemainingQueu + * rgerhards, 2009-05-13 + */ +static inline rsRetVal +DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch) +{ + int i; + void *pUsr; + DEFiRet; + + /* this is the place to destruct the old messages and pull them off the queue - MULTI-DEQUEUE */ + + ISOBJ_TYPE_assert(pThis, qqueue); + assert(pBatch != NULL); + + /* if the queue runs in DA mode, the DA worker already deleted the message. But + * in regular mode, we need to do it ourselfs. We differentiate between the two cases, + * because it is actually the easiest way to handle the destruct-Problem in a simple + * and pUsrp-Type agnostic way (else we would need an objAddRef() generic function). + */ + if(!pThis->bRunsDA) { + for(i = 0 ; i < pBatch->nElem ; ++i) { + /* TODO: pull msgs off the queue (not yet necessary) */ + pUsr = pBatch->pElem[i].pUsrp; + objDestruct(pUsr); + } + } + pBatch->nElem = 0; + + RETiRet; +} + + /* dequeue as many user points as are available, until we hit the configured * upper limit of pointers. * This must only be called when the queue mutex is LOOKED, otherwise serious * malfunction will happen. */ static inline rsRetVal -DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *iRemainingQueueSize) +DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSize) { int nDequeued; int iQueueSize; @@ -1405,6 +1439,7 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *iRemainingQueueSize DEFiRet; /* this is the place to destruct the old messages and pull them off the queue - MULTI-DEQUEUE */ + DeleteProcessedBatch(pThis, &pWti->batch); nDequeued = 0; do { @@ -1427,9 +1462,8 @@ dbgprintf("DequeueConsumableElements, index %d\n", nDequeued); qqueueChkPersist(pThis, nDequeued); /* it is sufficient to persist only when the bulk of work is done */ - //bRunsDA = pThis->bRunsDA; /* cache this for after mutex release */ pWti->batch.nElem = nDequeued; - *iRemainingQueueSize = iQueueSize; + *piRemainingQueueSize = iQueueSize; finalize_it: RETiRet; diff --git a/tools/syslogd.c b/tools/syslogd.c index cae07811..88a588e9 100644 --- a/tools/syslogd.c +++ b/tools/syslogd.c @@ -1230,7 +1230,6 @@ dbgprintf("msgConsumer..MULTIQUEUE: i: %d, pMsg: %p\n", i, pMsg); parseMsg(pMsg); } processMsg(pMsg); - msgDestruct(&pMsg); } dbgprintf("DONE msgConsumer..MULTIQUEUE:\n"); |