summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2009-05-19 18:47:26 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2009-05-19 18:47:26 +0200
commita9c4b26d462dd3c9dbd0575a3a1acc6d8df1c3b3 (patch)
tree9efcb4cdef4b55d4385aeaa0c8719c074bb6e487
parenta4dad2009992d436ba23c2d0a4a43b483aac40fc (diff)
downloadrsyslog-a9c4b26d462dd3c9dbd0575a3a1acc6d8df1c3b3.tar.gz
rsyslog-a9c4b26d462dd3c9dbd0575a3a1acc6d8df1c3b3.tar.xz
rsyslog-a9c4b26d462dd3c9dbd0575a3a1acc6d8df1c3b3.zip
some cleanup
-rw-r--r--runtime/queue.c86
-rw-r--r--runtime/wti.c1
-rw-r--r--tools/syslogd.c3
3 files changed, 1 insertions, 89 deletions
diff --git a/runtime/queue.c b/runtime/queue.c
index 9855dac8..0f87b235 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -72,7 +72,6 @@ static int qqueueChkStopWrkrDA(qqueue_t *pThis);
static rsRetVal GetDeqBatchSize(qqueue_t *pThis, int *pVal);
static int qqueueIsIdleDA(qqueue_t *pThis);
static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave);
-static rsRetVal ConsumerCancelCleanup(void *arg1, void *arg2);
/* some constants for queuePersist () */
#define QUEUE_CHECKPOINT 1
@@ -206,7 +205,6 @@ static inline void queueDrain(qqueue_t *pThis)
ASSERT(pThis != NULL);
-// TODO: ULTRA it may be a good idea to check validitity once again
/* iQueueSize is not decremented by qDel(), so we need to do it ourselves */
while(pThis->iQueueSize-- > 0) {
pThis->qDeq(pThis, &pUsr);
@@ -473,7 +471,6 @@ qqueueInitDA(qqueue_t *pThis, int bEnqOnly, int bLockMutex)
CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize));
CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueIsIdleDA));
CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti, int)) ConsumerDA));
- CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void*pWti)) ConsumerCancelCleanup));
CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) qqueueStartDA));
CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) qqueueTurnOffDAMode));
CHKiRet(wtpSetpmutUsr (pThis->pWtpDA, pThis->mut));
@@ -613,7 +610,6 @@ static rsRetVal qDeqFixedArray(qqueue_t *pThis, void **out)
ASSERT(pThis != NULL);
*out = (void*) pThis->tVars.farray.pBuf[pThis->tVars.farray.deqhead];
-//MULTIdbgprintf("ULTRA qDeqFA, deqhead=%d head=%d, tail=%d\n", pThis->tVars.farray.deqhead, pThis->tVars.farray.head, pThis->tVars.farray.tail);
pThis->tVars.farray.deqhead++;
if (pThis->tVars.farray.deqhead == pThis->iMaxQueueSize)
pThis->tVars.farray.deqhead = 0;
@@ -627,7 +623,6 @@ static rsRetVal qDelFixedArray(qqueue_t *pThis)
ASSERT(pThis != NULL);
-//MULTIdbgprintf("ULTRA qDelFA, deqhead=%d head=%d, tail=%d\n", pThis->tVars.farray.deqhead, pThis->tVars.farray.head, pThis->tVars.farray.tail);
pThis->tVars.farray.head++;
if (pThis->tVars.farray.head == pThis->iMaxQueueSize)
pThis->tVars.farray.head = 0;
@@ -638,58 +633,6 @@ static rsRetVal qDelFixedArray(qqueue_t *pThis)
/* -------------------- linked list -------------------- */
-/* first some generic functions which are also used for the unget linked list */
-
-static inline rsRetVal qqueueAddLinkedList(qLinkedList_t **ppRoot, qLinkedList_t **ppLast, void* pUsr)
-{
- qLinkedList_t *pEntry;
- DEFiRet;
-
- ASSERT(ppRoot != NULL);
- ASSERT(ppLast != NULL);
-
- CHKmalloc((pEntry = (qLinkedList_t*) malloc(sizeof(qLinkedList_t))));
-
- pEntry->pNext = NULL;
- pEntry->pUsr = pUsr;
-
- if(*ppRoot == NULL) {
- *ppRoot = *ppLast = pEntry;
- } else {
- (*ppLast)->pNext = pEntry;
- *ppLast = pEntry;
- }
-
-finalize_it:
- RETiRet;
-}
-
-static inline rsRetVal qqueueDelLinkedList(qLinkedList_t **ppRoot, qLinkedList_t **ppLast, obj_t **ppUsr)
-{
- DEFiRet;
- qLinkedList_t *pEntry;
-
- ASSERT(ppRoot != NULL);
- ASSERT(ppLast != NULL);
- ASSERT(ppUsr != NULL);
- ASSERT(*ppRoot != NULL);
-
- pEntry = *ppRoot;
- *ppUsr = pEntry->pUsr;
-
- if(*ppRoot == *ppLast) {
- *ppRoot = NULL;
- *ppLast = NULL;
- } else {
- *ppRoot = pEntry->pNext;
- }
- free(pEntry);
-
- RETiRet;
-}
-
-/* end generic functions which are also used for the unget linked list */
-
static rsRetVal qConstructLinkedList(qqueue_t *pThis)
{
@@ -1455,33 +1398,6 @@ finalize_it:
}
-/* cancellation cleanup handler for queueWorker ()
- * Updates admin structure and frees ressources.
- * Params:
- * arg1 - user pointer (in this case a qqueue_t)
- * arg2 - user data pointer (in this case a queue data element, any object [queue's pUsr ptr!])
- * Note that arg2 may be NULL, in which case no dequeued but unprocessed pUsr exists!
- * rgerhards, 2008-01-16
- */
-static rsRetVal
-ConsumerCancelCleanup(void *arg1, void *arg2)
-{
- //TODO: looks like we no longer need it!
- /*
- DEFiRet;
-
- qqueue_t *pThis = (qqueue_t*) arg1;
- obj_t *pUsr = (obj_t*) arg2;
-
- ISOBJ_TYPE_assert(pThis, qqueue);
-
- RETiRet;
- */
- return RS_RET_OK;
-}
-
-
-
/* This function checks if the provided message shall be discarded and does so, if needed.
* In DA mode, we do not discard any messages as we assume the disk subsystem is fast enough to
* provide real-time creation of spool files.
@@ -1636,7 +1552,6 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz
rsRetVal localRet;
DEFiRet;
- /* this is the place to destruct the old messages and pull them off the queue - MULTI-DEQUEUE */
DeleteProcessedBatch(pThis, &pWti->batch);
nDequeued = 0;
@@ -2075,7 +1990,6 @@ rsRetVal qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize));
CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) IsIdleReg));
CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti, int)) ConsumerReg));
- CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void*pWti))ConsumerCancelCleanup));
CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RegOnWrkrStartup));
CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RegOnWrkrShutdown));
CHKiRet(wtpSetpmutUsr (pThis->pWtpReg, pThis->mut));
diff --git a/runtime/wti.c b/runtime/wti.c
index 7029dcfd..942f7cf1 100644
--- a/runtime/wti.c
+++ b/runtime/wti.c
@@ -319,7 +319,6 @@ wtiWorkerCancelCleanup(void *arg)
DBGPRINTF("%s: cancelation cleanup handler called.\n", wtiGetDbgHdr(pThis));
/* call user supplied handler (that one e.g. requeues the element) */
-// MULTIQUEUE: need to change here!
pWtp->pfOnWorkerCancel(pThis->pWtp->pUsr, pThis->batch.pElem[0].pUsrp);
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
diff --git a/tools/syslogd.c b/tools/syslogd.c
index 88a588e9..2bd43685 100644
--- a/tools/syslogd.c
+++ b/tools/syslogd.c
@@ -1225,13 +1225,12 @@ msgConsumer(void __attribute__((unused)) *notNeeded, batch_t *pBatch)
for(i = 0 ; i < pBatch->nElem ; i++) {
pMsg = (msg_t*) pBatch->pElem[i].pUsrp;
-dbgprintf("msgConsumer..MULTIQUEUE: i: %d, pMsg: %p\n", i, pMsg);
+ DBGPRINTF("msgConsumer processes msg %d/%d\n", i, pBatch->nElem);
if((pMsg->msgFlags & NEEDS_PARSING) != 0) {
parseMsg(pMsg);
}
processMsg(pMsg);
}
-dbgprintf("DONE msgConsumer..MULTIQUEUE:\n");
RETiRet;
}