diff options
Diffstat (limited to 'runtime/queue.c')
-rw-r--r-- | runtime/queue.c | 133 |
1 files changed, 34 insertions, 99 deletions
diff --git a/runtime/queue.c b/runtime/queue.c index 2568717b..a4d16132 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -2460,105 +2460,6 @@ finalize_it: } -/* enqueue a new user data element - * Enqueues the new element and awakes worker thread. - */ -rsRetVal -qqueueEnqObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr) -{ - DEFiRet; - int iCancelStateSave; - struct timespec t; - - ISOBJ_TYPE_assert(pThis, qqueue); - - /* Please note that this function is not cancel-safe and consequently - * sets the calling thread's cancelibility state to PTHREAD_CANCEL_DISABLE - * during its execution. If that is not done, race conditions occur if the - * thread is canceled (most important use case is input module termination). - * rgerhards, 2008-01-08 - */ - if(pThis->qType != QUEUETYPE_DIRECT) { - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); - d_pthread_mutex_lock(pThis->mut); - } - - /* first check if we need to discard this message (which will cause CHKiRet() to exit) - */ - CHKiRet(qqueueChkDiscardMsg(pThis, getPhysicalQueueSize(pThis), pThis->bRunsDA, pUsr)); - - /* then check if we need to add an assistance disk queue */ - if(pThis->bIsDA) - CHKiRet(ChkStrtDA(pThis)); - - /* handle flow control - * There are two different flow control mechanisms: basic and advanced flow control. - * Basic flow control has always been implemented and protects the queue structures - * in that it makes sure no more data is enqueued than the queue is configured to - * support. Enhanced flow control is being added today. There are some sources which - * can easily be stopped, e.g. a file reader. This is the case because it is unlikely - * that blocking those sources will have negative effects (after all, the file is - * continued to be written). Other sources can somewhat be blocked (e.g. the kernel - * log reader or the local log stream reader): in general, nothing is lost if messages - * from these sources are not picked up immediately. HOWEVER, they can not block for - * an extended period of time, as this either causes message loss or - even worse - some - * other bad effects (e.g. unresponsive system in respect to the main system log socket). - * Finally, there are some (few) sources which can not be blocked at all. UDP syslog is - * a prime example. If a UDP message is not received, it is simply lost. So we can't - * do anything against UDP sockets that come in too fast. The core idea of advanced - * flow control is that we take into account the different natures of the sources and - * select flow control mechanisms that fit these needs. This also means, in the end - * result, that non-blockable sources like UDP syslog receive priority in the system. - * It's a side effect, but a good one ;) -- rgerhards, 2008-03-14 - */ - if(flowCtlType == eFLOWCTL_FULL_DELAY) { - while(getPhysicalQueueSize(pThis) >= pThis->iFullDlyMrk) { - dbgoprint((obj_t*) pThis, "enqueueMsg: FullDelay mark reached for full delayable message - blocking.\n"); - pthread_cond_wait(&pThis->belowFullDlyWtrMrk, pThis->mut); /* TODO error check? But what do then? */ - } - } else if(flowCtlType == eFLOWCTL_LIGHT_DELAY) { - if(getPhysicalQueueSize(pThis) >= pThis->iLightDlyMrk) { - dbgoprint((obj_t*) pThis, "enqueueMsg: LightDelay mark reached for light delayable message - blocking a bit.\n"); - timeoutComp(&t, 1000); /* 1000 millisconds = 1 second TODO: make configurable */ - pthread_cond_timedwait(&pThis->belowLightDlyWtrMrk, pThis->mut, &t); /* TODO error check? But what do then? */ - } - } - - /* from our regular flow control settings, we are now ready to enqueue the object. - * However, we now need to do a check if the queue permits to add more data. If that - * is not the case, basic flow control enters the field, which means we wait for - * the queue to become ready or drop the new message. -- rgerhards, 2008-03-14 - */ - while( (pThis->iMaxQueueSize > 0 && getPhysicalQueueSize(pThis) >= pThis->iMaxQueueSize) - || (pThis->qType == QUEUETYPE_DISK && pThis->sizeOnDiskMax != 0 - && pThis->tVars.disk.sizeOnDisk > pThis->sizeOnDiskMax)) { - dbgoprint((obj_t*) pThis, "enqueueMsg: queue FULL - waiting to drain.\n"); - timeoutComp(&t, pThis->toEnq); - if(pthread_cond_timedwait(&pThis->notFull, pThis->mut, &t) != 0) { - dbgoprint((obj_t*) pThis, "enqueueMsg: cond timeout, dropping message!\n"); - objDestruct(pUsr); - ABORT_FINALIZE(RS_RET_QUEUE_FULL); - } - } - - /* and finally enqueue the message */ - CHKiRet(qqueueAdd(pThis, pUsr)); - qqueueChkPersist(pThis, 1); - -finalize_it: - if(pThis->qType != QUEUETYPE_DIRECT) { - /* make sure at least one worker is running. */ - qqueueAdviseMaxWorkers(pThis); - /* and release the mutex */ - d_pthread_mutex_unlock(pThis->mut); - pthread_setcancelstate(iCancelStateSave, NULL); - dbgoprint((obj_t*) pThis, "EnqueueMsg advised worker start\n"); - } - - RETiRet; -} - - /* enqueue a single data object. This currently is a helper to qqueueMultiEnqObj. * Note that the queue mutex MUST already be locked when this function is called. * rgerhards, 2009-06-16 @@ -2678,6 +2579,40 @@ finalize_it: } +/* enqueue a new user data element + * Enqueues the new element and awakes worker thread. + */ +rsRetVal +qqueueEnqObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr) +{ + DEFiRet; + int iCancelStateSave; + + ISOBJ_TYPE_assert(pThis, qqueue); + + if(pThis->qType != QUEUETYPE_DIRECT) { + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); + d_pthread_mutex_lock(pThis->mut); + } + + CHKiRet(doEnqSingleObj(pThis, flowCtlType, pUsr)); + + qqueueChkPersist(pThis, 1); + +finalize_it: + if(pThis->qType != QUEUETYPE_DIRECT) { + /* make sure at least one worker is running. */ + qqueueAdviseMaxWorkers(pThis); + /* and release the mutex */ + d_pthread_mutex_unlock(pThis->mut); + pthread_setcancelstate(iCancelStateSave, NULL); + dbgoprint((obj_t*) pThis, "EnqueueMsg advised worker start\n"); + } + + RETiRet; +} + + /* set queue mode to enqueue only or not * There is one subtle issue: this method may be called during queue * construction or while it is running. In the former case, the queue |