summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--action.c1
-rw-r--r--runtime/msg.h2
-rw-r--r--runtime/queue.c40
-rw-r--r--tools/syslogd.c1
4 files changed, 38 insertions, 6 deletions
diff --git a/action.c b/action.c
index 0253e1b6..424cb00b 100644
--- a/action.c
+++ b/action.c
@@ -905,7 +905,6 @@ processAction(action_t *pAction, batch_t *pBatch)
/* this must be moved away - up into the dequeue part of the queue, I guess, but that's for another day */
for(i = 0 ; i < pBatch->nElem ; i++) {
pMsg = (msg_t*) pBatch->pElem[i].pUsrp;
- msgDestruct(&pMsg); /* TODO: change: we are now finished with the message */
}
iRet = finishBatch(pAction);
diff --git a/runtime/msg.h b/runtime/msg.h
index c8350626..14148441 100644
--- a/runtime/msg.h
+++ b/runtime/msg.h
@@ -51,7 +51,7 @@
struct msg {
BEGINobjInstance; /* Data to implement generic object - MUST be the first data element! */
pthread_mutexattr_t mutAttr;
-short bDoLock; /* use the mutex? */
+ short bDoLock; /* use the mutex? */
pthread_mutex_t mut;
flowControl_t flowCtlType; /**< type of flow control we can apply, for enqueueing, needs not to be persisted because
once data has entered the queue, this property is no longer needed. */
diff --git a/runtime/queue.c b/runtime/queue.c
index c3a8e9d4..6bea338a 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -1390,13 +1390,47 @@ finalize_it:
}
+/* Delete a batch of processed user objects from the queue, which includes
+ * destructing the objects themself. The pointer piRemainingQueu
+ * rgerhards, 2009-05-13
+ */
+static inline rsRetVal
+DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch)
+{
+ int i;
+ void *pUsr;
+ DEFiRet;
+
+ /* this is the place to destruct the old messages and pull them off the queue - MULTI-DEQUEUE */
+
+ ISOBJ_TYPE_assert(pThis, qqueue);
+ assert(pBatch != NULL);
+
+ /* if the queue runs in DA mode, the DA worker already deleted the message. But
+ * in regular mode, we need to do it ourselfs. We differentiate between the two cases,
+ * because it is actually the easiest way to handle the destruct-Problem in a simple
+ * and pUsrp-Type agnostic way (else we would need an objAddRef() generic function).
+ */
+ if(!pThis->bRunsDA) {
+ for(i = 0 ; i < pBatch->nElem ; ++i) {
+ /* TODO: pull msgs off the queue (not yet necessary) */
+ pUsr = pBatch->pElem[i].pUsrp;
+ objDestruct(pUsr);
+ }
+ }
+ pBatch->nElem = 0;
+
+ RETiRet;
+}
+
+
/* dequeue as many user points as are available, until we hit the configured
* upper limit of pointers.
* This must only be called when the queue mutex is LOOKED, otherwise serious
* malfunction will happen.
*/
static inline rsRetVal
-DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *iRemainingQueueSize)
+DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSize)
{
int nDequeued;
int iQueueSize;
@@ -1405,6 +1439,7 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *iRemainingQueueSize
DEFiRet;
/* this is the place to destruct the old messages and pull them off the queue - MULTI-DEQUEUE */
+ DeleteProcessedBatch(pThis, &pWti->batch);
nDequeued = 0;
do {
@@ -1427,9 +1462,8 @@ dbgprintf("DequeueConsumableElements, index %d\n", nDequeued);
qqueueChkPersist(pThis, nDequeued); /* it is sufficient to persist only when the bulk of work is done */
- //bRunsDA = pThis->bRunsDA; /* cache this for after mutex release */
pWti->batch.nElem = nDequeued;
- *iRemainingQueueSize = iQueueSize;
+ *piRemainingQueueSize = iQueueSize;
finalize_it:
RETiRet;
diff --git a/tools/syslogd.c b/tools/syslogd.c
index cae07811..88a588e9 100644
--- a/tools/syslogd.c
+++ b/tools/syslogd.c
@@ -1230,7 +1230,6 @@ dbgprintf("msgConsumer..MULTIQUEUE: i: %d, pMsg: %p\n", i, pMsg);
parseMsg(pMsg);
}
processMsg(pMsg);
- msgDestruct(&pMsg);
}
dbgprintf("DONE msgConsumer..MULTIQUEUE:\n");