summaryrefslogtreecommitdiffstats
path: root/runtime/queue.c
diff options
context:
space:
mode:
Diffstat (limited to 'runtime/queue.c')
-rw-r--r--runtime/queue.c133
1 files changed, 34 insertions, 99 deletions
diff --git a/runtime/queue.c b/runtime/queue.c
index 2568717b..a4d16132 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -2460,105 +2460,6 @@ finalize_it:
}
-/* enqueue a new user data element
- * Enqueues the new element and awakes worker thread.
- */
-rsRetVal
-qqueueEnqObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr)
-{
- DEFiRet;
- int iCancelStateSave;
- struct timespec t;
-
- ISOBJ_TYPE_assert(pThis, qqueue);
-
- /* Please note that this function is not cancel-safe and consequently
- * sets the calling thread's cancelibility state to PTHREAD_CANCEL_DISABLE
- * during its execution. If that is not done, race conditions occur if the
- * thread is canceled (most important use case is input module termination).
- * rgerhards, 2008-01-08
- */
- if(pThis->qType != QUEUETYPE_DIRECT) {
- pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
- d_pthread_mutex_lock(pThis->mut);
- }
-
- /* first check if we need to discard this message (which will cause CHKiRet() to exit)
- */
- CHKiRet(qqueueChkDiscardMsg(pThis, getPhysicalQueueSize(pThis), pThis->bRunsDA, pUsr));
-
- /* then check if we need to add an assistance disk queue */
- if(pThis->bIsDA)
- CHKiRet(ChkStrtDA(pThis));
-
- /* handle flow control
- * There are two different flow control mechanisms: basic and advanced flow control.
- * Basic flow control has always been implemented and protects the queue structures
- * in that it makes sure no more data is enqueued than the queue is configured to
- * support. Enhanced flow control is being added today. There are some sources which
- * can easily be stopped, e.g. a file reader. This is the case because it is unlikely
- * that blocking those sources will have negative effects (after all, the file is
- * continued to be written). Other sources can somewhat be blocked (e.g. the kernel
- * log reader or the local log stream reader): in general, nothing is lost if messages
- * from these sources are not picked up immediately. HOWEVER, they can not block for
- * an extended period of time, as this either causes message loss or - even worse - some
- * other bad effects (e.g. unresponsive system in respect to the main system log socket).
- * Finally, there are some (few) sources which can not be blocked at all. UDP syslog is
- * a prime example. If a UDP message is not received, it is simply lost. So we can't
- * do anything against UDP sockets that come in too fast. The core idea of advanced
- * flow control is that we take into account the different natures of the sources and
- * select flow control mechanisms that fit these needs. This also means, in the end
- * result, that non-blockable sources like UDP syslog receive priority in the system.
- * It's a side effect, but a good one ;) -- rgerhards, 2008-03-14
- */
- if(flowCtlType == eFLOWCTL_FULL_DELAY) {
- while(getPhysicalQueueSize(pThis) >= pThis->iFullDlyMrk) {
- dbgoprint((obj_t*) pThis, "enqueueMsg: FullDelay mark reached for full delayable message - blocking.\n");
- pthread_cond_wait(&pThis->belowFullDlyWtrMrk, pThis->mut); /* TODO error check? But what do then? */
- }
- } else if(flowCtlType == eFLOWCTL_LIGHT_DELAY) {
- if(getPhysicalQueueSize(pThis) >= pThis->iLightDlyMrk) {
- dbgoprint((obj_t*) pThis, "enqueueMsg: LightDelay mark reached for light delayable message - blocking a bit.\n");
- timeoutComp(&t, 1000); /* 1000 millisconds = 1 second TODO: make configurable */
- pthread_cond_timedwait(&pThis->belowLightDlyWtrMrk, pThis->mut, &t); /* TODO error check? But what do then? */
- }
- }
-
- /* from our regular flow control settings, we are now ready to enqueue the object.
- * However, we now need to do a check if the queue permits to add more data. If that
- * is not the case, basic flow control enters the field, which means we wait for
- * the queue to become ready or drop the new message. -- rgerhards, 2008-03-14
- */
- while( (pThis->iMaxQueueSize > 0 && getPhysicalQueueSize(pThis) >= pThis->iMaxQueueSize)
- || (pThis->qType == QUEUETYPE_DISK && pThis->sizeOnDiskMax != 0
- && pThis->tVars.disk.sizeOnDisk > pThis->sizeOnDiskMax)) {
- dbgoprint((obj_t*) pThis, "enqueueMsg: queue FULL - waiting to drain.\n");
- timeoutComp(&t, pThis->toEnq);
- if(pthread_cond_timedwait(&pThis->notFull, pThis->mut, &t) != 0) {
- dbgoprint((obj_t*) pThis, "enqueueMsg: cond timeout, dropping message!\n");
- objDestruct(pUsr);
- ABORT_FINALIZE(RS_RET_QUEUE_FULL);
- }
- }
-
- /* and finally enqueue the message */
- CHKiRet(qqueueAdd(pThis, pUsr));
- qqueueChkPersist(pThis, 1);
-
-finalize_it:
- if(pThis->qType != QUEUETYPE_DIRECT) {
- /* make sure at least one worker is running. */
- qqueueAdviseMaxWorkers(pThis);
- /* and release the mutex */
- d_pthread_mutex_unlock(pThis->mut);
- pthread_setcancelstate(iCancelStateSave, NULL);
- dbgoprint((obj_t*) pThis, "EnqueueMsg advised worker start\n");
- }
-
- RETiRet;
-}
-
-
/* enqueue a single data object. This currently is a helper to qqueueMultiEnqObj.
* Note that the queue mutex MUST already be locked when this function is called.
* rgerhards, 2009-06-16
@@ -2678,6 +2579,40 @@ finalize_it:
}
+/* enqueue a new user data element
+ * Enqueues the new element and awakes worker thread.
+ */
+rsRetVal
+qqueueEnqObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr)
+{
+ DEFiRet;
+ int iCancelStateSave;
+
+ ISOBJ_TYPE_assert(pThis, qqueue);
+
+ if(pThis->qType != QUEUETYPE_DIRECT) {
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
+ d_pthread_mutex_lock(pThis->mut);
+ }
+
+ CHKiRet(doEnqSingleObj(pThis, flowCtlType, pUsr));
+
+ qqueueChkPersist(pThis, 1);
+
+finalize_it:
+ if(pThis->qType != QUEUETYPE_DIRECT) {
+ /* make sure at least one worker is running. */
+ qqueueAdviseMaxWorkers(pThis);
+ /* and release the mutex */
+ d_pthread_mutex_unlock(pThis->mut);
+ pthread_setcancelstate(iCancelStateSave, NULL);
+ dbgoprint((obj_t*) pThis, "EnqueueMsg advised worker start\n");
+ }
+
+ RETiRet;
+}
+
+
/* set queue mode to enqueue only or not
* There is one subtle issue: this method may be called during queue
* construction or while it is running. In the former case, the queue