summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--runtime/queue.c126
-rw-r--r--tcps_sess.c2
-rw-r--r--tools/syslogd.c2
3 files changed, 128 insertions, 2 deletions
diff --git a/runtime/queue.c b/runtime/queue.c
index aa8e6c21..dbf59210 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -49,6 +49,7 @@
#include "obj.h"
#include "wtp.h"
#include "wti.h"
+#include "msg.h"
#include "atomic.h"
#ifdef OS_SOLARIS
@@ -2230,6 +2231,131 @@ finalize_it:
}
+/* enqueue a single data object. This currently is a helper to qqueueMultiEnqObj.
+ * Note that the queue mutex MUST already be locked when this function is called.
+ * rgerhards, 2009-06-16
+ */
+static inline rsRetVal
+doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr)
+{
+ DEFiRet;
+ struct timespec t;
+
+ /* first check if we need to discard this message (which will cause CHKiRet() to exit)
+ */
+ CHKiRet(qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pThis->bRunsDA, pUsr));
+
+ /* then check if we need to add an assistance disk queue */
+ if(pThis->bIsDA)
+ CHKiRet(qqueueChkStrtDA(pThis));
+
+ /* handle flow control
+ * There are two different flow control mechanisms: basic and advanced flow control.
+ * Basic flow control has always been implemented and protects the queue structures
+ * in that it makes sure no more data is enqueued than the queue is configured to
+ * support. Enhanced flow control is being added today. There are some sources which
+ * can easily be stopped, e.g. a file reader. This is the case because it is unlikely
+ * that blocking those sources will have negative effects (after all, the file is
+ * continued to be written). Other sources can somewhat be blocked (e.g. the kernel
+ * log reader or the local log stream reader): in general, nothing is lost if messages
+ * from these sources are not picked up immediately. HOWEVER, they can not block for
+ * an extended period of time, as this either causes message loss or - even worse - some
+ * other bad effects (e.g. unresponsive system in respect to the main system log socket).
+ * Finally, there are some (few) sources which can not be blocked at all. UDP syslog is
+ * a prime example. If a UDP message is not received, it is simply lost. So we can't
+ * do anything against UDP sockets that come in too fast. The core idea of advanced
+ * flow control is that we take into account the different natures of the sources and
+ * select flow control mechanisms that fit these needs. This also means, in the end
+ * result, that non-blockable sources like UDP syslog receive priority in the system.
+ * It's a side effect, but a good one ;) -- rgerhards, 2008-03-14
+ */
+ if(flowCtlType == eFLOWCTL_FULL_DELAY) {
+ while(pThis->iQueueSize >= pThis->iFullDlyMrk) {
+ dbgoprint((obj_t*) pThis, "enqueueMsg: FullDelay mark reached for full delayable message - blocking.\n");
+ pthread_cond_wait(&pThis->belowFullDlyWtrMrk, pThis->mut); /* TODO error check? But what do then? */
+ }
+ } else if(flowCtlType == eFLOWCTL_LIGHT_DELAY) {
+ if(pThis->iQueueSize >= pThis->iLightDlyMrk) {
+ dbgoprint((obj_t*) pThis, "enqueueMsg: LightDelay mark reached for light delayable message - blocking a bit.\n");
+ timeoutComp(&t, 1000); /* 1000 millisconds = 1 second TODO: make configurable */
+ pthread_cond_timedwait(&pThis->belowLightDlyWtrMrk, pThis->mut, &t); /* TODO error check? But what do then? */
+ }
+ }
+
+ /* from our regular flow control settings, we are now ready to enqueue the object.
+ * However, we now need to do a check if the queue permits to add more data. If that
+ * is not the case, basic flow control enters the field, which means we wait for
+ * the queue to become ready or drop the new message. -- rgerhards, 2008-03-14
+ */
+ while( (pThis->iMaxQueueSize > 0 && pThis->iQueueSize >= pThis->iMaxQueueSize)
+ || (pThis->qType == QUEUETYPE_DISK && pThis->sizeOnDiskMax != 0
+ && pThis->tVars.disk.sizeOnDisk > pThis->sizeOnDiskMax)) {
+ dbgoprint((obj_t*) pThis, "enqueueMsg: queue FULL - waiting to drain.\n");
+ timeoutComp(&t, pThis->toEnq);
+ if(pthread_cond_timedwait(&pThis->notFull, pThis->mut, &t) != 0) {
+ dbgoprint((obj_t*) pThis, "enqueueMsg: cond timeout, dropping message!\n");
+ objDestruct(pUsr);
+ ABORT_FINALIZE(RS_RET_QUEUE_FULL);
+ }
+ }
+
+ /* and finally enqueue the message */
+ CHKiRet(qqueueAdd(pThis, pUsr));
+ qqueueChkPersist(pThis); // TODO: optimize, do in outer function!
+
+finalize_it:
+ RETiRet;
+}
+
+/* enqueue multiple user data elements at once. The aim is to provide a faster interface
+ * for object submission. Uses the multi_submit_t helper object.
+ * Please note that this function is not cancel-safe and consequently
+ * sets the calling thread's cancelibility state to PTHREAD_CANCEL_DISABLE
+ * during its execution. If that is not done, race conditions occur if the
+ * thread is canceled (most important use case is input module termination).
+ * rgerhards, 2009-06-16
+ */
+rsRetVal
+qqueueMultiEnqObj(qqueue_t *pThis, multi_submit_t *pMultiSub)
+{
+ int iCancelStateSave;
+ int i;
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, qqueue);
+ assert(pMultiSub != NULL);
+
+ if(pThis->qType != QUEUETYPE_DIRECT) {
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
+ d_pthread_mutex_lock(pThis->mut);
+ }
+
+ for(i = 0 ; i < pMultiSub->nElem ; ++i) {
+dbgprintf("queueMultiEnq: %d\n", i);
+ CHKiRet(doEnqSingleObj(pThis, pMultiSub->ppMsgs[i]->flowCtlType, (void*)pMultiSub->ppMsgs[i]));
+ }
+
+finalize_it:
+ if(pThis->qType != QUEUETYPE_DIRECT) {
+ /* make sure at least one worker is running. */
+ qqueueAdviseMaxWorkers(pThis);
+ /* and release the mutex */
+ d_pthread_mutex_unlock(pThis->mut);
+ pthread_setcancelstate(iCancelStateSave, NULL);
+ dbgoprint((obj_t*) pThis, "MultiEnqObj advised worker start\n");
+ /* the following pthread_yield is experimental, but brought us performance
+ * benefit. For details, please see http://kb.monitorware.com/post14216.html#p14216
+ * rgerhards, 2008-10-09
+ * but this is only true for uniprocessors, so we guard it with an optimize flag -- rgerhards, 2008-10-22
+ */
+ if(pThis->bOptimizeUniProc)
+ pthread_yield();
+ }
+
+ RETiRet;
+}
+
+
/* set queue mode to enqueue only or not
* There is one subtle issue: this method may be called during queue
* construction or while it is running. In the former case, the queue
diff --git a/tcps_sess.c b/tcps_sess.c
index 379faeab..9353571c 100644
--- a/tcps_sess.c
+++ b/tcps_sess.c
@@ -441,7 +441,7 @@ processDataRcvd(tcps_sess_t *pThis, char c, struct syslogTime *stTime, time_t tt
* this *is* the *correct* reception step for all the data we received, because
* we have just received a bunch of data! -- rgerhards, 2009-06-16
*/
-#define NUM_MULTISUB 128
+#define NUM_MULTISUB 1024
static rsRetVal
DataRcvd(tcps_sess_t *pThis, char *pData, size_t iLen)
{
diff --git a/tools/syslogd.c b/tools/syslogd.c
index 4d2839a1..e3cd54f7 100644
--- a/tools/syslogd.c
+++ b/tools/syslogd.c
@@ -1379,9 +1379,9 @@ multiSubmitMsg(multi_submit_t *pMultiSub)
for(i = 0 ; i < pMultiSub->nElem ; ++i) {
dbgprintf("multiSubmitMsg, index %d\n", i);
MsgPrepareEnqueue(pMultiSub->ppMsgs[i]);
- qqueueEnqObj(pMsgQueue, pMultiSub->ppMsgs[i]->flowCtlType, (void*) pMultiSub->ppMsgs[i]);
}
+ iRet = qqueueMultiEnqObj(pMsgQueue, pMultiSub);
pMultiSub->nElem = 0;
RETiRet;