summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2010-06-09 14:34:35 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2010-06-09 14:34:35 +0200
commit395660f462c62029f76b99f73bd9a424a8cf73a2 (patch)
treedf9a6b81f4ac86867092fcf953c09760b741901c
parent11bd517465360278b270ee7c18607b4d1d97e44e (diff)
downloadrsyslog-395660f462c62029f76b99f73bd9a424a8cf73a2.tar.gz
rsyslog-395660f462c62029f76b99f73bd9a424a8cf73a2.tar.xz
rsyslog-395660f462c62029f76b99f73bd9a424a8cf73a2.zip
somewhat improved direct mode queue performance
... but only for batch enqueues. This will not help much with the current code, but will play well with upcoming changes.
-rw-r--r--runtime/queue.c55
-rw-r--r--runtime/queue.h4
-rw-r--r--tools/syslogd.c2
3 files changed, 42 insertions, 19 deletions
diff --git a/runtime/queue.c b/runtime/queue.c
index b6c30278..d437d590 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -79,6 +79,8 @@ static int qqueueChkStopWrkrDA(qqueue_t *pThis);
static rsRetVal GetDeqBatchSize(qqueue_t *pThis, int *pVal);
static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti);
static rsRetVal batchProcessed(qqueue_t *pThis, wti_t *pWti);
+static rsRetVal qqueueMultiEnqObjNonDirect(qqueue_t *pThis, multi_submit_t *pMultiSub);
+static rsRetVal qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub);
/* some constants for queuePersist () */
#define QUEUE_CHECKPOINT 1
@@ -1203,6 +1205,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
pThis->qAdd = qAddFixedArray;
pThis->qDeq = qDeqFixedArray;
pThis->qDel = qDelFixedArray;
+ pThis->MultiEnq = qqueueMultiEnqObjNonDirect;
break;
case QUEUETYPE_LINKEDLIST:
pThis->qConstruct = qConstructLinkedList;
@@ -1210,6 +1213,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
pThis->qAdd = qAddLinkedList;
pThis->qDeq = (rsRetVal (*)(qqueue_t*,void**)) qDeqLinkedList;
pThis->qDel = (rsRetVal (*)(qqueue_t*)) qDelLinkedList;
+ pThis->MultiEnq = qqueueMultiEnqObjNonDirect;
break;
case QUEUETYPE_DISK:
pThis->qConstruct = qConstructDisk;
@@ -1217,6 +1221,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
pThis->qAdd = qAddDisk;
pThis->qDeq = qDeqDisk;
pThis->qDel = qDelDisk;
+ pThis->MultiEnq = qqueueMultiEnqObjNonDirect;
/* special handling */
pThis->iNumWorkerThreads = 1; /* we need exactly one worker */
break;
@@ -1225,6 +1230,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
pThis->qDestruct = qDestructDirect;
pThis->qAdd = qAddDirect;
pThis->qDel = qDelDirect;
+ pThis->MultiEnq = qqueueMultiEnqObjDirect;
break;
}
@@ -1709,7 +1715,6 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti)
* the message. So far, we simply assume we always have msg_t, what currently is always the case.
* rgerhards, 2009-05-28
*/
-dbgprintf("DA consumer pushes msg '%s'\n", ((msg_t*)(pWti->batch.pElem[i].pUsrp))->pszRawMsg);
CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY,
(obj_t*)MsgAddRef((msg_t*)(pWti->batch.pElem[i].pUsrp))));
pWti->batch.pElem[i].state = BATCH_STATE_COMM; /* commited to other queue! */
@@ -2149,7 +2154,6 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr)
/* first check if we need to discard this message (which will cause CHKiRet() to exit)
*/
CHKiRet(qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pUsr));
-//dbgCallStackPrintAll();
/* handle flow control
* There are two different flow control mechanisms: basic and advanced flow control.
@@ -2209,6 +2213,7 @@ finalize_it:
RETiRet;
}
+/* ------------------------------ multi-enqueue functions ------------------------------ */
/* enqueue multiple user data elements at once. The aim is to provide a faster interface
* for object submission. Uses the multi_submit_t helper object.
* Please note that this function is not cancel-safe and consequently
@@ -2216,9 +2221,12 @@ finalize_it:
* during its execution. If that is not done, race conditions occur if the
* thread is canceled (most important use case is input module termination).
* rgerhards, 2009-06-16
+ * Note: there now exists multiple different functions implementing specially
+ * optimized algorithms for different config cases. -- rgerhards, 2010-06-09
*/
-rsRetVal
-qqueueMultiEnqObj(qqueue_t *pThis, multi_submit_t *pMultiSub)
+/* now the function for all modes but direct */
+static rsRetVal
+qqueueMultiEnqObjNonDirect(qqueue_t *pThis, multi_submit_t *pMultiSub)
{
int iCancelStateSave;
int i;
@@ -2227,29 +2235,42 @@ qqueueMultiEnqObj(qqueue_t *pThis, multi_submit_t *pMultiSub)
ISOBJ_TYPE_assert(pThis, qqueue);
assert(pMultiSub != NULL);
- if(pThis->qType != QUEUETYPE_DIRECT) {
- pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
- d_pthread_mutex_lock(pThis->mut);
- }
-
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
+ d_pthread_mutex_lock(pThis->mut);
for(i = 0 ; i < pMultiSub->nElem ; ++i) {
CHKiRet(doEnqSingleObj(pThis, pMultiSub->ppMsgs[i]->flowCtlType, (void*)pMultiSub->ppMsgs[i]));
}
-
qqueueChkPersist(pThis, pMultiSub->nElem);
finalize_it:
- if(pThis->qType != QUEUETYPE_DIRECT) {
- /* make sure at least one worker is running. */
- qqueueAdviseMaxWorkers(pThis);
- /* and release the mutex */
- d_pthread_mutex_unlock(pThis->mut);
- pthread_setcancelstate(iCancelStateSave, NULL);
- DBGOPRINT((obj_t*) pThis, "MultiEnqObj advised worker start\n");
+ /* make sure at least one worker is running. */
+ qqueueAdviseMaxWorkers(pThis);
+ /* and release the mutex */
+ d_pthread_mutex_unlock(pThis->mut);
+ pthread_setcancelstate(iCancelStateSave, NULL);
+ DBGOPRINT((obj_t*) pThis, "MultiEnqObj advised worker start\n");
+
+ RETiRet;
+}
+
+/* now, the same function, but for direct mode */
+static rsRetVal
+qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub)
+{
+ int i;
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, qqueue);
+ assert(pMultiSub != NULL);
+
+ for(i = 0 ; i < pMultiSub->nElem ; ++i) {
+ CHKiRet(qAddDirect(pThis, (void*)pMultiSub->ppMsgs[i]));
}
+finalize_it:
RETiRet;
}
+/* ------------------------------ END multi-enqueue functions ------------------------------ */
/* enqueue a new user data element
diff --git a/runtime/queue.h b/runtime/queue.h
index 8ede6922..33b21c9a 100644
--- a/runtime/queue.h
+++ b/runtime/queue.h
@@ -114,6 +114,9 @@ struct queue_s {
rsRetVal (*qDeq)(struct queue_s *pThis, void **ppUsr);
rsRetVal (*qDel)(struct queue_s *pThis);
/* end type-specific handler */
+ /* public entry points (set during construction, permit to set best algorithm for params selected) */
+ rsRetVal (*MultiEnq)(qqueue_t *pThis, multi_submit_t *pMultiSub);
+ /* end public entry points */
/* synchronization variables */
pthread_mutex_t mutThrdMgmt; /* mutex for the queue's thread management */
pthread_mutex_t *mut; /* mutex for enqueing and dequeueing messages */
@@ -174,7 +177,6 @@ struct queue_s {
/* prototypes */
rsRetVal qqueueDestruct(qqueue_t **ppThis);
-rsRetVal qqueueMultiEnqObj(qqueue_t *pThis, multi_submit_t *pMultiSub);
rsRetVal qqueueEnqObj(qqueue_t *pThis, flowControl_t flwCtlType, void *pUsr);
rsRetVal qqueueStart(qqueue_t *pThis);
rsRetVal qqueueSetMaxFileSize(qqueue_t *pThis, size_t iMaxFileSize);
diff --git a/tools/syslogd.c b/tools/syslogd.c
index dfbd184b..94e8346d 100644
--- a/tools/syslogd.c
+++ b/tools/syslogd.c
@@ -735,7 +735,7 @@ multiSubmitMsg(multi_submit_t *pMultiSub)
pRuleset = MsgGetRuleset(pMultiSub->ppMsgs[0]);
pQueue = (pRuleset == NULL) ? pMsgQueue : ruleset.GetRulesetQueue(pRuleset);
- iRet = qqueueMultiEnqObj(pQueue, pMultiSub);
+ iRet = pQueue->MultiEnq(pQueue, pMultiSub);
pMultiSub->nElem = 0;
finalize_it: