diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2010-06-10 10:18:59 +0200 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2010-06-10 10:18:59 +0200 |
commit | d630bc742f2f0b6a29c745bba743ecb8a03033c6 (patch) | |
tree | 4c8a70641fb757808c2fd0cb5a0c257cb2c0eeb6 /runtime | |
parent | 559cb84a79a9848ce1415569158928478991108c (diff) | |
parent | 8fbcea483710faae468ecf0ba706adc7e60ed41d (diff) | |
download | rsyslog-d630bc742f2f0b6a29c745bba743ecb8a03033c6.tar.gz rsyslog-d630bc742f2f0b6a29c745bba743ecb8a03033c6.tar.xz rsyslog-d630bc742f2f0b6a29c745bba743ecb8a03033c6.zip |
Merge branch 'concurrent-output' into tmp
Diffstat (limited to 'runtime')
-rw-r--r-- | runtime/atomic.h | 2 | ||||
-rw-r--r-- | runtime/queue.c | 55 | ||||
-rw-r--r-- | runtime/queue.h | 4 | ||||
-rw-r--r-- | runtime/rule.c | 3 |
4 files changed, 43 insertions, 21 deletions
diff --git a/runtime/atomic.h b/runtime/atomic.h index e5fafe04..da0852fa 100644 --- a/runtime/atomic.h +++ b/runtime/atomic.h @@ -50,7 +50,7 @@ # define ATOMIC_STORE_0_TO_INT(data, phlpmut) __sync_fetch_and_and(data, 0) # define ATOMIC_STORE_1_TO_INT(data, phlpmut) __sync_fetch_and_or(data, 1) # define ATOMIC_STORE_INT_TO_INT(data, val) __sync_fetch_and_or(&(data), (val)) -# define ATOMIC_CAS(data, oldVal, newVal) __sync_bool_compare_and_swap(&(data), (oldVal), (newVal)); +# define ATOMIC_CAS(data, oldVal, newVal, phlpmut) __sync_bool_compare_and_swap(data, (oldVal), (newVal)) # define ATOMIC_CAS_VAL(data, oldVal, newVal, phlpmut) __sync_val_compare_and_swap(data, (oldVal), (newVal)); /* functions below are not needed if we have atomics */ diff --git a/runtime/queue.c b/runtime/queue.c index b6c30278..d437d590 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -79,6 +79,8 @@ static int qqueueChkStopWrkrDA(qqueue_t *pThis); static rsRetVal GetDeqBatchSize(qqueue_t *pThis, int *pVal); static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti); static rsRetVal batchProcessed(qqueue_t *pThis, wti_t *pWti); +static rsRetVal qqueueMultiEnqObjNonDirect(qqueue_t *pThis, multi_submit_t *pMultiSub); +static rsRetVal qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub); /* some constants for queuePersist () */ #define QUEUE_CHECKPOINT 1 @@ -1203,6 +1205,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread pThis->qAdd = qAddFixedArray; pThis->qDeq = qDeqFixedArray; pThis->qDel = qDelFixedArray; + pThis->MultiEnq = qqueueMultiEnqObjNonDirect; break; case QUEUETYPE_LINKEDLIST: pThis->qConstruct = qConstructLinkedList; @@ -1210,6 +1213,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread pThis->qAdd = qAddLinkedList; pThis->qDeq = (rsRetVal (*)(qqueue_t*,void**)) qDeqLinkedList; pThis->qDel = (rsRetVal (*)(qqueue_t*)) qDelLinkedList; + pThis->MultiEnq = qqueueMultiEnqObjNonDirect; break; case QUEUETYPE_DISK: pThis->qConstruct = qConstructDisk; @@ -1217,6 +1221,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread pThis->qAdd = qAddDisk; pThis->qDeq = qDeqDisk; pThis->qDel = qDelDisk; + pThis->MultiEnq = qqueueMultiEnqObjNonDirect; /* special handling */ pThis->iNumWorkerThreads = 1; /* we need exactly one worker */ break; @@ -1225,6 +1230,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread pThis->qDestruct = qDestructDirect; pThis->qAdd = qAddDirect; pThis->qDel = qDelDirect; + pThis->MultiEnq = qqueueMultiEnqObjDirect; break; } @@ -1709,7 +1715,6 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti) * the message. So far, we simply assume we always have msg_t, what currently is always the case. * rgerhards, 2009-05-28 */ -dbgprintf("DA consumer pushes msg '%s'\n", ((msg_t*)(pWti->batch.pElem[i].pUsrp))->pszRawMsg); CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, (obj_t*)MsgAddRef((msg_t*)(pWti->batch.pElem[i].pUsrp)))); pWti->batch.pElem[i].state = BATCH_STATE_COMM; /* commited to other queue! */ @@ -2149,7 +2154,6 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr) /* first check if we need to discard this message (which will cause CHKiRet() to exit) */ CHKiRet(qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pUsr)); -//dbgCallStackPrintAll(); /* handle flow control * There are two different flow control mechanisms: basic and advanced flow control. @@ -2209,6 +2213,7 @@ finalize_it: RETiRet; } +/* ------------------------------ multi-enqueue functions ------------------------------ */ /* enqueue multiple user data elements at once. The aim is to provide a faster interface * for object submission. Uses the multi_submit_t helper object. * Please note that this function is not cancel-safe and consequently @@ -2216,9 +2221,12 @@ finalize_it: * during its execution. If that is not done, race conditions occur if the * thread is canceled (most important use case is input module termination). * rgerhards, 2009-06-16 + * Note: there now exists multiple different functions implementing specially + * optimized algorithms for different config cases. -- rgerhards, 2010-06-09 */ -rsRetVal -qqueueMultiEnqObj(qqueue_t *pThis, multi_submit_t *pMultiSub) +/* now the function for all modes but direct */ +static rsRetVal +qqueueMultiEnqObjNonDirect(qqueue_t *pThis, multi_submit_t *pMultiSub) { int iCancelStateSave; int i; @@ -2227,29 +2235,42 @@ qqueueMultiEnqObj(qqueue_t *pThis, multi_submit_t *pMultiSub) ISOBJ_TYPE_assert(pThis, qqueue); assert(pMultiSub != NULL); - if(pThis->qType != QUEUETYPE_DIRECT) { - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); - d_pthread_mutex_lock(pThis->mut); - } - + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); + d_pthread_mutex_lock(pThis->mut); for(i = 0 ; i < pMultiSub->nElem ; ++i) { CHKiRet(doEnqSingleObj(pThis, pMultiSub->ppMsgs[i]->flowCtlType, (void*)pMultiSub->ppMsgs[i])); } - qqueueChkPersist(pThis, pMultiSub->nElem); finalize_it: - if(pThis->qType != QUEUETYPE_DIRECT) { - /* make sure at least one worker is running. */ - qqueueAdviseMaxWorkers(pThis); - /* and release the mutex */ - d_pthread_mutex_unlock(pThis->mut); - pthread_setcancelstate(iCancelStateSave, NULL); - DBGOPRINT((obj_t*) pThis, "MultiEnqObj advised worker start\n"); + /* make sure at least one worker is running. */ + qqueueAdviseMaxWorkers(pThis); + /* and release the mutex */ + d_pthread_mutex_unlock(pThis->mut); + pthread_setcancelstate(iCancelStateSave, NULL); + DBGOPRINT((obj_t*) pThis, "MultiEnqObj advised worker start\n"); + + RETiRet; +} + +/* now, the same function, but for direct mode */ +static rsRetVal +qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub) +{ + int i; + DEFiRet; + + ISOBJ_TYPE_assert(pThis, qqueue); + assert(pMultiSub != NULL); + + for(i = 0 ; i < pMultiSub->nElem ; ++i) { + CHKiRet(qAddDirect(pThis, (void*)pMultiSub->ppMsgs[i])); } +finalize_it: RETiRet; } +/* ------------------------------ END multi-enqueue functions ------------------------------ */ /* enqueue a new user data element diff --git a/runtime/queue.h b/runtime/queue.h index 8ede6922..33b21c9a 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -114,6 +114,9 @@ struct queue_s { rsRetVal (*qDeq)(struct queue_s *pThis, void **ppUsr); rsRetVal (*qDel)(struct queue_s *pThis); /* end type-specific handler */ + /* public entry points (set during construction, permit to set best algorithm for params selected) */ + rsRetVal (*MultiEnq)(qqueue_t *pThis, multi_submit_t *pMultiSub); + /* end public entry points */ /* synchronization variables */ pthread_mutex_t mutThrdMgmt; /* mutex for the queue's thread management */ pthread_mutex_t *mut; /* mutex for enqueing and dequeueing messages */ @@ -174,7 +177,6 @@ struct queue_s { /* prototypes */ rsRetVal qqueueDestruct(qqueue_t **ppThis); -rsRetVal qqueueMultiEnqObj(qqueue_t *pThis, multi_submit_t *pMultiSub); rsRetVal qqueueEnqObj(qqueue_t *pThis, flowControl_t flwCtlType, void *pUsr); rsRetVal qqueueStart(qqueue_t *pThis); rsRetVal qqueueSetMaxFileSize(qqueue_t *pThis, size_t iMaxFileSize); diff --git a/runtime/rule.c b/runtime/rule.c index 65ad071e..3b98d7d1 100644 --- a/runtime/rule.c +++ b/runtime/rule.c @@ -110,7 +110,7 @@ DEFFUNC_llExecFunc(processMsgDoActions) ABORT_FINALIZE(RS_RET_OK); } - iRetMod = actionCallAction(pAction, pDoActData->pMsg); + iRetMod = pAction->submitToActQ(pAction, pDoActData->pMsg); if(iRetMod == RS_RET_DISCARDMSG) { ABORT_FINALIZE(RS_RET_DISCARDMSG); } else if(iRetMod == RS_RET_SUSPENDED) { @@ -166,7 +166,6 @@ shouldProcessThisMessage(rule_t *pRule, msg_t *pMsg, int *bProcessMsg) } } -RUNLOG_VAR("%p", pRule->pCSProgNameComp); if(pRule->pCSProgNameComp != NULL) { int bInv = 0, bEqv = 0, offset = 0; if(*(rsCStrGetSzStrNoNULL(pRule->pCSProgNameComp)) == '-') { |