From 395660f462c62029f76b99f73bd9a424a8cf73a2 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 9 Jun 2010 14:34:35 +0200 Subject: somewhat improved direct mode queue performance ... but only for batch enqueues. This will not help much with the current code, but will play well with upcoming changes. --- runtime/queue.c | 55 ++++++++++++++++++++++++++++++++++++++----------------- runtime/queue.h | 4 +++- tools/syslogd.c | 2 +- 3 files changed, 42 insertions(+), 19 deletions(-) diff --git a/runtime/queue.c b/runtime/queue.c index b6c30278..d437d590 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -79,6 +79,8 @@ static int qqueueChkStopWrkrDA(qqueue_t *pThis); static rsRetVal GetDeqBatchSize(qqueue_t *pThis, int *pVal); static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti); static rsRetVal batchProcessed(qqueue_t *pThis, wti_t *pWti); +static rsRetVal qqueueMultiEnqObjNonDirect(qqueue_t *pThis, multi_submit_t *pMultiSub); +static rsRetVal qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub); /* some constants for queuePersist () */ #define QUEUE_CHECKPOINT 1 @@ -1203,6 +1205,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread pThis->qAdd = qAddFixedArray; pThis->qDeq = qDeqFixedArray; pThis->qDel = qDelFixedArray; + pThis->MultiEnq = qqueueMultiEnqObjNonDirect; break; case QUEUETYPE_LINKEDLIST: pThis->qConstruct = qConstructLinkedList; @@ -1210,6 +1213,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread pThis->qAdd = qAddLinkedList; pThis->qDeq = (rsRetVal (*)(qqueue_t*,void**)) qDeqLinkedList; pThis->qDel = (rsRetVal (*)(qqueue_t*)) qDelLinkedList; + pThis->MultiEnq = qqueueMultiEnqObjNonDirect; break; case QUEUETYPE_DISK: pThis->qConstruct = qConstructDisk; @@ -1217,6 +1221,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread pThis->qAdd = qAddDisk; pThis->qDeq = qDeqDisk; pThis->qDel = qDelDisk; + pThis->MultiEnq = qqueueMultiEnqObjNonDirect; /* special handling */ pThis->iNumWorkerThreads = 1; /* we need exactly one worker */ break; @@ -1225,6 +1230,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread pThis->qDestruct = qDestructDirect; pThis->qAdd = qAddDirect; pThis->qDel = qDelDirect; + pThis->MultiEnq = qqueueMultiEnqObjDirect; break; } @@ -1709,7 +1715,6 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti) * the message. So far, we simply assume we always have msg_t, what currently is always the case. * rgerhards, 2009-05-28 */ -dbgprintf("DA consumer pushes msg '%s'\n", ((msg_t*)(pWti->batch.pElem[i].pUsrp))->pszRawMsg); CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, (obj_t*)MsgAddRef((msg_t*)(pWti->batch.pElem[i].pUsrp)))); pWti->batch.pElem[i].state = BATCH_STATE_COMM; /* commited to other queue! */ @@ -2149,7 +2154,6 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr) /* first check if we need to discard this message (which will cause CHKiRet() to exit) */ CHKiRet(qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pUsr)); -//dbgCallStackPrintAll(); /* handle flow control * There are two different flow control mechanisms: basic and advanced flow control. @@ -2209,6 +2213,7 @@ finalize_it: RETiRet; } +/* ------------------------------ multi-enqueue functions ------------------------------ */ /* enqueue multiple user data elements at once. The aim is to provide a faster interface * for object submission. Uses the multi_submit_t helper object. * Please note that this function is not cancel-safe and consequently @@ -2216,9 +2221,12 @@ finalize_it: * during its execution. If that is not done, race conditions occur if the * thread is canceled (most important use case is input module termination). * rgerhards, 2009-06-16 + * Note: there now exists multiple different functions implementing specially + * optimized algorithms for different config cases. -- rgerhards, 2010-06-09 */ -rsRetVal -qqueueMultiEnqObj(qqueue_t *pThis, multi_submit_t *pMultiSub) +/* now the function for all modes but direct */ +static rsRetVal +qqueueMultiEnqObjNonDirect(qqueue_t *pThis, multi_submit_t *pMultiSub) { int iCancelStateSave; int i; @@ -2227,29 +2235,42 @@ qqueueMultiEnqObj(qqueue_t *pThis, multi_submit_t *pMultiSub) ISOBJ_TYPE_assert(pThis, qqueue); assert(pMultiSub != NULL); - if(pThis->qType != QUEUETYPE_DIRECT) { - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); - d_pthread_mutex_lock(pThis->mut); - } - + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); + d_pthread_mutex_lock(pThis->mut); for(i = 0 ; i < pMultiSub->nElem ; ++i) { CHKiRet(doEnqSingleObj(pThis, pMultiSub->ppMsgs[i]->flowCtlType, (void*)pMultiSub->ppMsgs[i])); } - qqueueChkPersist(pThis, pMultiSub->nElem); finalize_it: - if(pThis->qType != QUEUETYPE_DIRECT) { - /* make sure at least one worker is running. */ - qqueueAdviseMaxWorkers(pThis); - /* and release the mutex */ - d_pthread_mutex_unlock(pThis->mut); - pthread_setcancelstate(iCancelStateSave, NULL); - DBGOPRINT((obj_t*) pThis, "MultiEnqObj advised worker start\n"); + /* make sure at least one worker is running. */ + qqueueAdviseMaxWorkers(pThis); + /* and release the mutex */ + d_pthread_mutex_unlock(pThis->mut); + pthread_setcancelstate(iCancelStateSave, NULL); + DBGOPRINT((obj_t*) pThis, "MultiEnqObj advised worker start\n"); + + RETiRet; +} + +/* now, the same function, but for direct mode */ +static rsRetVal +qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub) +{ + int i; + DEFiRet; + + ISOBJ_TYPE_assert(pThis, qqueue); + assert(pMultiSub != NULL); + + for(i = 0 ; i < pMultiSub->nElem ; ++i) { + CHKiRet(qAddDirect(pThis, (void*)pMultiSub->ppMsgs[i])); } +finalize_it: RETiRet; } +/* ------------------------------ END multi-enqueue functions ------------------------------ */ /* enqueue a new user data element diff --git a/runtime/queue.h b/runtime/queue.h index 8ede6922..33b21c9a 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -114,6 +114,9 @@ struct queue_s { rsRetVal (*qDeq)(struct queue_s *pThis, void **ppUsr); rsRetVal (*qDel)(struct queue_s *pThis); /* end type-specific handler */ + /* public entry points (set during construction, permit to set best algorithm for params selected) */ + rsRetVal (*MultiEnq)(qqueue_t *pThis, multi_submit_t *pMultiSub); + /* end public entry points */ /* synchronization variables */ pthread_mutex_t mutThrdMgmt; /* mutex for the queue's thread management */ pthread_mutex_t *mut; /* mutex for enqueing and dequeueing messages */ @@ -174,7 +177,6 @@ struct queue_s { /* prototypes */ rsRetVal qqueueDestruct(qqueue_t **ppThis); -rsRetVal qqueueMultiEnqObj(qqueue_t *pThis, multi_submit_t *pMultiSub); rsRetVal qqueueEnqObj(qqueue_t *pThis, flowControl_t flwCtlType, void *pUsr); rsRetVal qqueueStart(qqueue_t *pThis); rsRetVal qqueueSetMaxFileSize(qqueue_t *pThis, size_t iMaxFileSize); diff --git a/tools/syslogd.c b/tools/syslogd.c index dfbd184b..94e8346d 100644 --- a/tools/syslogd.c +++ b/tools/syslogd.c @@ -735,7 +735,7 @@ multiSubmitMsg(multi_submit_t *pMultiSub) pRuleset = MsgGetRuleset(pMultiSub->ppMsgs[0]); pQueue = (pRuleset == NULL) ? pMsgQueue : ruleset.GetRulesetQueue(pRuleset); - iRet = qqueueMultiEnqObj(pQueue, pMultiSub); + iRet = pQueue->MultiEnq(pQueue, pMultiSub); pMultiSub->nElem = 0; finalize_it: -- cgit