summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2011-01-10 12:54:21 +0100
committerRainer Gerhards <rgerhards@adiscon.com>2011-01-10 12:54:21 +0100
commitf7c20920046ebcb94eadadf1ebad97b634a12a2d (patch)
tree0cf704f462fa7394842beb3a496627915d863426
parent58c486b36b27d30444b621876846052e239f8c50 (diff)
parent7742b2182fc017ca4c9fcfe3b26f4c8d68a9bd58 (diff)
downloadrsyslog-f7c20920046ebcb94eadadf1ebad97b634a12a2d.tar.gz
rsyslog-f7c20920046ebcb94eadadf1ebad97b634a12a2d.tar.xz
rsyslog-f7c20920046ebcb94eadadf1ebad97b634a12a2d.zip
Merge branch 'v5.6.2-newimudp' into v5-devel-newimudp
Conflicts: plugins/imudp/imudp.c threads.c
-rw-r--r--configure.ac30
-rw-r--r--plugins/imudp/Makefile.am2
-rw-r--r--plugins/imudp/imudp.c137
-rw-r--r--runtime/rsyslog.c18
-rw-r--r--runtime/rsyslog.h7
-rw-r--r--runtime/stream.c8
-rw-r--r--runtime/wtp.c6
-rw-r--r--threads.c8
8 files changed, 212 insertions, 4 deletions
diff --git a/configure.ac b/configure.ac
index b165afaa..e66e6a4c 100644
--- a/configure.ac
+++ b/configure.ac
@@ -274,6 +274,36 @@ if test "x$enable_pthreads" != "xno"; then
)
fi
+AC_CHECK_FUNCS(
+ [pthread_setschedparam],
+ [
+ rsyslog_have_pthread_setschedparam=yes
+ ],
+ [
+ rsyslog_have_pthread_setschedparam=no
+ ]
+)
+AC_CHECK_HEADERS(
+ [sched.h],
+ [
+ rsyslog_have_sched_h=yes
+ ],
+ [
+ rsyslog_have_sched_h=no
+ ]
+)
+if test "$rsyslog_have_pthread_setschedparam" = "yes" -a "$rsyslog_have_sched_h" = "yes"; then
+ save_LIBS=$LIBS
+ LIBS=
+ AC_SEARCH_LIBS(sched_get_priority_max, rt)
+ if test "x$ac_cv_search" != "xno"; then
+ AC_CHECK_FUNCS(sched_get_priority_max)
+ fi
+ IMUDP_LIBS=$LIBS
+ AC_SUBST(IMUDP_LIBS)
+ LIBS=$save_LIBS
+fi
+
# klog
AC_ARG_ENABLE(klog,
diff --git a/plugins/imudp/Makefile.am b/plugins/imudp/Makefile.am
index 517b1287..bc64b8c8 100644
--- a/plugins/imudp/Makefile.am
+++ b/plugins/imudp/Makefile.am
@@ -3,4 +3,4 @@ pkglib_LTLIBRARIES = imudp.la
imudp_la_SOURCES = imudp.c
imudp_la_CPPFLAGS = -I$(top_srcdir) $(PTHREADS_CFLAGS) $(RSRT_CFLAGS)
imudp_la_LDFLAGS = -module -avoid-version
-imudp_la_LIBADD =
+imudp_la_LIBADD = $(IMUDP_LIBS)
diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c
index ce0856be..ad39ead0 100644
--- a/plugins/imudp/imudp.c
+++ b/plugins/imudp/imudp.c
@@ -35,6 +35,9 @@
#if HAVE_SYS_EPOLL_H
# include <sys/epoll.h>
#endif
+#ifdef HAVE_SCHED_H
+# include <sched.h>
+#endif
#include "rsyslog.h"
#include "dirty.h"
#include "net.h"
@@ -78,12 +81,103 @@ static uchar *pRcvBuf = NULL; /* receive buffer (for a single packet). We use a
* termination if we can not get it. -- rgerhards, 2007-12-27
*/
static prop_t *pInputName = NULL; /* our inputName currently is always "imudp", and this will hold it */
-static ruleset_t *pBindRuleset = NULL; /* ruleset to bind listener to (use system default if unspecified) */
+static uchar *pszSchedPolicy = NULL; /* scheduling policy string */
+static int iSchedPolicy; /* scheduling policy as SCHED_xxx */
+static int iSchedPrio; /* scheduling priority */
+static int seen_iSchedPrio = 0; /* have we seen scheduling priority in the config file? */
+static ruleset_t *pBindRuleset = NULL; /* ruleset to bind listener to (use system default if unspecified) */
#define TIME_REQUERY_DFLT 2
static int iTimeRequery = TIME_REQUERY_DFLT;/* how often is time to be queried inside tight recv loop? 0=always */
/* config settings */
+static rsRetVal check_scheduling_priority(int report_error)
+{
+ DEFiRet;
+
+#ifdef HAVE_SCHED_GET_PRIORITY_MAX
+ if (iSchedPrio < sched_get_priority_min(iSchedPolicy) ||
+ iSchedPrio > sched_get_priority_max(iSchedPolicy)) {
+ if (report_error)
+ errmsg.LogError(errno, NO_ERRCODE,
+ "imudp: scheduling priority %d out of range (%d - %d)"
+ " for scheduling policy '%s' - ignoring settings",
+ iSchedPrio,
+ sched_get_priority_min(iSchedPolicy),
+ sched_get_priority_max(iSchedPolicy),
+ pszSchedPolicy);
+ ABORT_FINALIZE(RS_RET_VALIDATION_RUN);
+ }
+#endif
+
+finalize_it:
+ RETiRet;
+}
+
+/* Set scheduling priority in the supplied variable (will be iSchedPrio)
+ * and record that we have seen the directive (in seen_iSchedPrio).
+ */
+static rsRetVal set_scheduling_priority(void *pVal, int value)
+{
+ DEFiRet;
+
+ if (seen_iSchedPrio) {
+ errmsg.LogError(0, NO_ERRCODE, "directive already seen");
+ ABORT_FINALIZE(RS_RET_VALIDATION_RUN);
+ }
+ *(int *)pVal = value;
+ seen_iSchedPrio = 1;
+ if (pszSchedPolicy != NULL)
+ CHKiRet(check_scheduling_priority(1));
+
+finalize_it:
+ RETiRet;
+}
+
+/* Set scheduling policy in iSchedPolicy */
+static rsRetVal set_scheduling_policy(void *pVal, uchar *pNewVal)
+{
+ int have_sched_policy = 0;
+ DEFiRet;
+
+ if (pszSchedPolicy != NULL) {
+ errmsg.LogError(0, NO_ERRCODE, "directive already seen");
+ ABORT_FINALIZE(RS_RET_VALIDATION_RUN);
+ }
+ *((uchar**)pVal) = pNewVal; /* pVal is pszSchedPolicy */
+ if (0) { /* trick to use conditional compilation */
+#ifdef SCHED_FIFO
+ } else if (!strcasecmp((char*)pszSchedPolicy, "fifo")) {
+ iSchedPolicy = SCHED_FIFO;
+ have_sched_policy = 1;
+#endif
+#ifdef SCHED_RR
+ } else if (!strcasecmp((char*)pszSchedPolicy, "rr")) {
+ iSchedPolicy = SCHED_RR;
+ have_sched_policy = 1;
+#endif
+#ifdef SCHED_OTHER
+ } else if (!strcasecmp((char*)pszSchedPolicy, "other")) {
+ iSchedPolicy = SCHED_OTHER;
+ have_sched_policy = 1;
+#endif
+ } else {
+ errmsg.LogError(errno, NO_ERRCODE,
+ "imudp: invalid scheduling policy '%s' "
+ "- ignoring setting", pszSchedPolicy);
+ }
+ if (have_sched_policy == 0) {
+ free(pszSchedPolicy);
+ pszSchedPolicy = NULL;
+ ABORT_FINALIZE(RS_RET_VALIDATION_RUN);
+ }
+ if (seen_iSchedPrio)
+ CHKiRet(check_scheduling_priority(1));
+
+finalize_it:
+ RETiRet;
+}
+
/* This function is called when a new listener shall be added. It takes
* the configured parameters, tries to bind the socket and, if that
@@ -294,6 +388,41 @@ finalize_it:
RETiRet;
}
+static void set_thread_schedparam(void)
+{
+ struct sched_param sparam;
+
+ if (pszSchedPolicy != NULL && seen_iSchedPrio == 0) {
+ errmsg.LogError(0, NO_ERRCODE,
+ "imudp: scheduling policy set, but without priority - ignoring settings");
+ } else if (pszSchedPolicy == NULL && seen_iSchedPrio != 0) {
+ errmsg.LogError(0, NO_ERRCODE,
+ "imudp: scheduling priority set, but without policy - ignoring settings");
+ } else if (pszSchedPolicy != NULL && seen_iSchedPrio != 0 &&
+ check_scheduling_priority(0) == 0) {
+#ifndef HAVE_PTHREAD_SETSCHEDPARAM
+ errmsg.LogError(0, NO_ERRCODE,
+ "imudp: cannot set thread scheduling policy, "
+ "pthread_setschedparam() not available");
+#else
+ int err;
+
+ memset(&sparam, 0, sizeof sparam);
+ sparam.sched_priority = iSchedPrio;
+ dbgprintf("imudp trying to set sched policy to '%s', prio %d\n",
+ pszSchedPolicy, iSchedPrio);
+ err = pthread_setschedparam(pthread_self(), iSchedPolicy, &sparam);
+ if (err != 0) {
+ errmsg.LogError(err, NO_ERRCODE, "imudp: pthread_setschedparam() failed");
+ }
+#endif
+ }
+
+ if (pszSchedPolicy != NULL) {
+ free(pszSchedPolicy);
+ pszSchedPolicy = NULL;
+ }
+}
/* This function implements the main reception loop. Depending on the environment,
* we either use the traditional (but slower) select() or the Linux-specific epoll()
@@ -317,6 +446,7 @@ rsRetVal rcvMainLoop(thrdInfo_t *pThrd)
/* start "name caching" algo by making sure the previous system indicator
* is invalidated.
*/
+ set_thread_schedparam();
bIsPermitted = 0;
memset(&frominetPrev, 0, sizeof(frominetPrev));
@@ -384,6 +514,7 @@ rsRetVal rcvMainLoop(thrdInfo_t *pThrd)
/* start "name caching" algo by making sure the previous system indicator
* is invalidated.
*/
+ set_thread_schedparam();
bIsPermitted = 0;
memset(&frominetPrev, 0, sizeof(frominetPrev));
DBGPRINTF("imudp uses select()\n");
@@ -539,6 +670,10 @@ CODEmodInit_QueryRegCFSLineHdlr
addListner, NULL, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"udpserveraddress", 0, eCmdHdlrGetWord,
NULL, &pszBindAddr, STD_LOADABLE_MODULE_ID));
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"imudpschedulingpolicy", 0, eCmdHdlrGetWord,
+ &set_scheduling_policy, &pszSchedPolicy, STD_LOADABLE_MODULE_ID));
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"imudpschedulingpriority", 0, eCmdHdlrInt,
+ &set_scheduling_priority, &iSchedPrio, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"udpservertimerequery", 0, eCmdHdlrInt,
NULL, &iTimeRequery, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler,
diff --git a/runtime/rsyslog.c b/runtime/rsyslog.c
index 8baa2b59..bdb1c9ff 100644
--- a/runtime/rsyslog.c
+++ b/runtime/rsyslog.c
@@ -85,6 +85,12 @@
#include "statsobj.h"
#include "atomic.h"
+#ifdef HAVE_PTHREAD_SETSCHEDPARAM
+struct sched_param default_sched_param;
+pthread_attr_t default_thread_attr;
+int default_thr_sched_policy;
+#endif
+
/* forward definitions */
static rsRetVal dfltErrLogger(int, uchar *errMsg);
@@ -139,6 +145,18 @@ rsrtInit(char **ppErrObj, obj_if_t *pObjIF)
if(iRefCount == 0) {
/* init runtime only if not yet done */
+#ifdef HAVE_PTHREAD_SETSCHEDPARAM
+ CHKiRet(pthread_getschedparam(pthread_self(),
+ &default_thr_sched_policy,
+ &default_sched_param));
+ CHKiRet(pthread_attr_init(&default_thread_attr));
+ CHKiRet(pthread_attr_setschedpolicy(&default_thread_attr,
+ default_thr_sched_policy));
+ CHKiRet(pthread_attr_setschedparam(&default_thread_attr,
+ &default_sched_param));
+ CHKiRet(pthread_attr_setinheritsched(&default_thread_attr,
+ PTHREAD_EXPLICIT_SCHED));
+#endif
if(ppErrObj != NULL) *ppErrObj = "obj";
CHKiRet(objClassInit(NULL)); /* *THIS* *MUST* always be the first class initilizer being called! */
CHKiRet(objGetObjInterface(pObjIF)); /* this provides the root pointer for all other queries */
diff --git a/runtime/rsyslog.h b/runtime/rsyslog.h
index b47ef82e..0402f159 100644
--- a/runtime/rsyslog.h
+++ b/runtime/rsyslog.h
@@ -25,6 +25,7 @@
*/
#ifndef INCLUDED_RSYSLOG_H
#define INCLUDED_RSYSLOG_H
+#include <pthread.h>
#include "typedefs.h"
/* ############################################################# *
@@ -415,6 +416,12 @@ typedef enum rsObjectID rsObjID;
#define RSFREEOBJ(x) {(x)->OID = OIDrsFreed; free(x);}
#endif
+#ifdef HAVE_PTHREAD_SETSCHEDPARAM
+extern struct sched_param default_sched_param;
+extern pthread_attr_t default_thread_attr;
+extern int default_thr_sched_policy;
+#endif
+
/* for the time being, we do our own portability handling here. It
* looks like autotools either does not yet support checks for it, or
diff --git a/runtime/stream.c b/runtime/stream.c
index 260b59ef..658aba11 100644
--- a/runtime/stream.c
+++ b/runtime/stream.c
@@ -669,7 +669,13 @@ static rsRetVal strmConstructFinalize(strm_t *pThis)
}
pThis->pIOBuf = pThis->asyncBuf[0].pBuf;
pThis->bStopWriter = 0;
- if(pthread_create(&pThis->writerThreadID, NULL, asyncWriterThread, pThis) != 0)
+ if(pthread_create(&pThis->writerThreadID,
+#ifdef HAVE_PTHREAD_SETSCHEDPARAM
+ &default_thread_attr,
+#else
+ NULL,
+#endif
+ asyncWriterThread, pThis) != 0)
DBGPRINTF("ERROR: stream %p cold not create writer thread\n", pThis);
} else {
/* we work synchronously, so we need to alloc a fixed pIOBuf */
diff --git a/runtime/wtp.c b/runtime/wtp.c
index ece80911..e615fb19 100644
--- a/runtime/wtp.c
+++ b/runtime/wtp.c
@@ -90,6 +90,12 @@ BEGINobjConstruct(wtp) /* be sure to specify the object type also in END macro!
pthread_mutex_init(&pThis->mutWtp, NULL);
pthread_cond_init(&pThis->condThrdTrm, NULL);
pthread_attr_init(&pThis->attrThrd);
+ /* Set thread scheduling policy to default */
+#ifdef HAVE_PTHREAD_SETSCHEDPARAM
+ pthread_attr_setschedpolicy(&pThis->attrThrd, default_thr_sched_policy);
+ pthread_attr_setschedparam(&pThis->attrThrd, &default_sched_param);
+ pthread_attr_setinheritsched(&pThis->attrThrd, PTHREAD_EXPLICIT_SCHED);
+#endif
pthread_attr_setdetachstate(&pThis->attrThrd, PTHREAD_CREATE_DETACHED);
/* set all function pointers to "not implemented" dummy so that we can safely call them */
pThis->pfChkStopWrkr = NotImplementedDummy;
diff --git a/threads.c b/threads.c
index 04ccb8d3..d4e14527 100644
--- a/threads.c
+++ b/threads.c
@@ -219,7 +219,13 @@ rsRetVal thrdCreate(rsRetVal (*thrdMain)(thrdInfo_t*), rsRetVal(*afterRun)(thrdI
pThis->pUsrThrdMain = thrdMain;
pThis->pAfterRun = afterRun;
pThis->bNeedsCancel = bNeedsCancel;
- pthread_create(&pThis->thrdID, NULL, thrdStarter, pThis);
+ pthread_create(&pThis->thrdID,
+#ifdef HAVE_PTHREAD_SETSCHEDPARAM
+ &default_thread_attr,
+#else
+ NULL,
+#endif
+ thrdStarter, pThis);
CHKiRet(llAppend(&llThrds, NULL, pThis));
finalize_it: