diff options
-rw-r--r-- | configure.ac | 30 | ||||
-rw-r--r-- | plugins/imudp/Makefile.am | 2 | ||||
-rw-r--r-- | plugins/imudp/imudp.c | 135 | ||||
-rw-r--r-- | runtime/rsyslog.c | 18 | ||||
-rw-r--r-- | runtime/rsyslog.h | 7 | ||||
-rw-r--r-- | runtime/stream.c | 8 | ||||
-rw-r--r-- | runtime/wtp.c | 6 | ||||
-rw-r--r-- | threads.c | 8 |
8 files changed, 211 insertions, 3 deletions
diff --git a/configure.ac b/configure.ac index 22365513..02d7c342 100644 --- a/configure.ac +++ b/configure.ac @@ -269,6 +269,36 @@ if test "x$enable_pthreads" != "xno"; then ) fi +AC_CHECK_FUNCS( + [pthread_setschedparam], + [ + rsyslog_have_pthread_setschedparam=yes + ], + [ + rsyslog_have_pthread_setschedparam=no + ] +) +AC_CHECK_HEADERS( + [sched.h], + [ + rsyslog_have_sched_h=yes + ], + [ + rsyslog_have_sched_h=no + ] +) +if test "$rsyslog_have_pthread_setschedparam" = "yes" -a "$rsyslog_have_sched_h" = "yes"; then + save_LIBS=$LIBS + LIBS= + AC_SEARCH_LIBS(sched_get_priority_max, rt) + if test "x$ac_cv_search" != "xno"; then + AC_CHECK_FUNCS(sched_get_priority_max) + fi + IMUDP_LIBS=$LIBS + AC_SUBST(IMUDP_LIBS) + LIBS=$save_LIBS +fi + # klog AC_ARG_ENABLE(klog, diff --git a/plugins/imudp/Makefile.am b/plugins/imudp/Makefile.am index 517b1287..bc64b8c8 100644 --- a/plugins/imudp/Makefile.am +++ b/plugins/imudp/Makefile.am @@ -3,4 +3,4 @@ pkglib_LTLIBRARIES = imudp.la imudp_la_SOURCES = imudp.c imudp_la_CPPFLAGS = -I$(top_srcdir) $(PTHREADS_CFLAGS) $(RSRT_CFLAGS) imudp_la_LDFLAGS = -module -avoid-version -imudp_la_LIBADD = +imudp_la_LIBADD = $(IMUDP_LIBS) diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c index 99b69731..ad39ead0 100644 --- a/plugins/imudp/imudp.c +++ b/plugins/imudp/imudp.c @@ -35,6 +35,9 @@ #if HAVE_SYS_EPOLL_H # include <sys/epoll.h> #endif +#ifdef HAVE_SCHED_H +# include <sched.h> +#endif #include "rsyslog.h" #include "dirty.h" #include "net.h" @@ -78,12 +81,103 @@ static uchar *pRcvBuf = NULL; /* receive buffer (for a single packet). We use a * termination if we can not get it. -- rgerhards, 2007-12-27 */ static prop_t *pInputName = NULL; /* our inputName currently is always "imudp", and this will hold it */ +static uchar *pszSchedPolicy = NULL; /* scheduling policy string */ +static int iSchedPolicy; /* scheduling policy as SCHED_xxx */ +static int iSchedPrio; /* scheduling priority */ +static int seen_iSchedPrio = 0; /* have we seen scheduling priority in the config file? */ static ruleset_t *pBindRuleset = NULL; /* ruleset to bind listener to (use system default if unspecified) */ #define TIME_REQUERY_DFLT 2 static int iTimeRequery = TIME_REQUERY_DFLT;/* how often is time to be queried inside tight recv loop? 0=always */ /* config settings */ +static rsRetVal check_scheduling_priority(int report_error) +{ + DEFiRet; + +#ifdef HAVE_SCHED_GET_PRIORITY_MAX + if (iSchedPrio < sched_get_priority_min(iSchedPolicy) || + iSchedPrio > sched_get_priority_max(iSchedPolicy)) { + if (report_error) + errmsg.LogError(errno, NO_ERRCODE, + "imudp: scheduling priority %d out of range (%d - %d)" + " for scheduling policy '%s' - ignoring settings", + iSchedPrio, + sched_get_priority_min(iSchedPolicy), + sched_get_priority_max(iSchedPolicy), + pszSchedPolicy); + ABORT_FINALIZE(RS_RET_VALIDATION_RUN); + } +#endif + +finalize_it: + RETiRet; +} + +/* Set scheduling priority in the supplied variable (will be iSchedPrio) + * and record that we have seen the directive (in seen_iSchedPrio). + */ +static rsRetVal set_scheduling_priority(void *pVal, int value) +{ + DEFiRet; + + if (seen_iSchedPrio) { + errmsg.LogError(0, NO_ERRCODE, "directive already seen"); + ABORT_FINALIZE(RS_RET_VALIDATION_RUN); + } + *(int *)pVal = value; + seen_iSchedPrio = 1; + if (pszSchedPolicy != NULL) + CHKiRet(check_scheduling_priority(1)); + +finalize_it: + RETiRet; +} + +/* Set scheduling policy in iSchedPolicy */ +static rsRetVal set_scheduling_policy(void *pVal, uchar *pNewVal) +{ + int have_sched_policy = 0; + DEFiRet; + + if (pszSchedPolicy != NULL) { + errmsg.LogError(0, NO_ERRCODE, "directive already seen"); + ABORT_FINALIZE(RS_RET_VALIDATION_RUN); + } + *((uchar**)pVal) = pNewVal; /* pVal is pszSchedPolicy */ + if (0) { /* trick to use conditional compilation */ +#ifdef SCHED_FIFO + } else if (!strcasecmp((char*)pszSchedPolicy, "fifo")) { + iSchedPolicy = SCHED_FIFO; + have_sched_policy = 1; +#endif +#ifdef SCHED_RR + } else if (!strcasecmp((char*)pszSchedPolicy, "rr")) { + iSchedPolicy = SCHED_RR; + have_sched_policy = 1; +#endif +#ifdef SCHED_OTHER + } else if (!strcasecmp((char*)pszSchedPolicy, "other")) { + iSchedPolicy = SCHED_OTHER; + have_sched_policy = 1; +#endif + } else { + errmsg.LogError(errno, NO_ERRCODE, + "imudp: invalid scheduling policy '%s' " + "- ignoring setting", pszSchedPolicy); + } + if (have_sched_policy == 0) { + free(pszSchedPolicy); + pszSchedPolicy = NULL; + ABORT_FINALIZE(RS_RET_VALIDATION_RUN); + } + if (seen_iSchedPrio) + CHKiRet(check_scheduling_priority(1)); + +finalize_it: + RETiRet; +} + /* This function is called when a new listener shall be added. It takes * the configured parameters, tries to bind the socket and, if that @@ -294,6 +388,41 @@ finalize_it: RETiRet; } +static void set_thread_schedparam(void) +{ + struct sched_param sparam; + + if (pszSchedPolicy != NULL && seen_iSchedPrio == 0) { + errmsg.LogError(0, NO_ERRCODE, + "imudp: scheduling policy set, but without priority - ignoring settings"); + } else if (pszSchedPolicy == NULL && seen_iSchedPrio != 0) { + errmsg.LogError(0, NO_ERRCODE, + "imudp: scheduling priority set, but without policy - ignoring settings"); + } else if (pszSchedPolicy != NULL && seen_iSchedPrio != 0 && + check_scheduling_priority(0) == 0) { +#ifndef HAVE_PTHREAD_SETSCHEDPARAM + errmsg.LogError(0, NO_ERRCODE, + "imudp: cannot set thread scheduling policy, " + "pthread_setschedparam() not available"); +#else + int err; + + memset(&sparam, 0, sizeof sparam); + sparam.sched_priority = iSchedPrio; + dbgprintf("imudp trying to set sched policy to '%s', prio %d\n", + pszSchedPolicy, iSchedPrio); + err = pthread_setschedparam(pthread_self(), iSchedPolicy, &sparam); + if (err != 0) { + errmsg.LogError(err, NO_ERRCODE, "imudp: pthread_setschedparam() failed"); + } +#endif + } + + if (pszSchedPolicy != NULL) { + free(pszSchedPolicy); + pszSchedPolicy = NULL; + } +} /* This function implements the main reception loop. Depending on the environment, * we either use the traditional (but slower) select() or the Linux-specific epoll() @@ -317,6 +446,7 @@ rsRetVal rcvMainLoop(thrdInfo_t *pThrd) /* start "name caching" algo by making sure the previous system indicator * is invalidated. */ + set_thread_schedparam(); bIsPermitted = 0; memset(&frominetPrev, 0, sizeof(frominetPrev)); @@ -384,6 +514,7 @@ rsRetVal rcvMainLoop(thrdInfo_t *pThrd) /* start "name caching" algo by making sure the previous system indicator * is invalidated. */ + set_thread_schedparam(); bIsPermitted = 0; memset(&frominetPrev, 0, sizeof(frominetPrev)); DBGPRINTF("imudp uses select()\n"); @@ -539,6 +670,10 @@ CODEmodInit_QueryRegCFSLineHdlr addListner, NULL, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"udpserveraddress", 0, eCmdHdlrGetWord, NULL, &pszBindAddr, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"imudpschedulingpolicy", 0, eCmdHdlrGetWord, + &set_scheduling_policy, &pszSchedPolicy, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"imudpschedulingpriority", 0, eCmdHdlrInt, + &set_scheduling_priority, &iSchedPrio, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"udpservertimerequery", 0, eCmdHdlrInt, NULL, &iTimeRequery, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, diff --git a/runtime/rsyslog.c b/runtime/rsyslog.c index a9794840..2722fd6c 100644 --- a/runtime/rsyslog.c +++ b/runtime/rsyslog.c @@ -84,6 +84,12 @@ #include "strgen.h" #include "atomic.h" +#ifdef HAVE_PTHREAD_SETSCHEDPARAM +struct sched_param default_sched_param; +pthread_attr_t default_thread_attr; +int default_thr_sched_policy; +#endif + /* forward definitions */ static rsRetVal dfltErrLogger(int, uchar *errMsg); @@ -138,6 +144,18 @@ rsrtInit(char **ppErrObj, obj_if_t *pObjIF) if(iRefCount == 0) { /* init runtime only if not yet done */ +#ifdef HAVE_PTHREAD_SETSCHEDPARAM + CHKiRet(pthread_getschedparam(pthread_self(), + &default_thr_sched_policy, + &default_sched_param)); + CHKiRet(pthread_attr_init(&default_thread_attr)); + CHKiRet(pthread_attr_setschedpolicy(&default_thread_attr, + default_thr_sched_policy)); + CHKiRet(pthread_attr_setschedparam(&default_thread_attr, + &default_sched_param)); + CHKiRet(pthread_attr_setinheritsched(&default_thread_attr, + PTHREAD_EXPLICIT_SCHED)); +#endif if(ppErrObj != NULL) *ppErrObj = "obj"; CHKiRet(objClassInit(NULL)); /* *THIS* *MUST* always be the first class initilizer being called! */ CHKiRet(objGetObjInterface(pObjIF)); /* this provides the root pointer for all other queries */ diff --git a/runtime/rsyslog.h b/runtime/rsyslog.h index 3e6ac4af..0742a2ed 100644 --- a/runtime/rsyslog.h +++ b/runtime/rsyslog.h @@ -25,6 +25,7 @@ */ #ifndef INCLUDED_RSYSLOG_H #define INCLUDED_RSYSLOG_H +#include <pthread.h> #include "typedefs.h" /* ############################################################# * @@ -411,6 +412,12 @@ typedef enum rsObjectID rsObjID; #define RSFREEOBJ(x) {(x)->OID = OIDrsFreed; free(x);} #endif +#ifdef HAVE_PTHREAD_SETSCHEDPARAM +extern struct sched_param default_sched_param; +extern pthread_attr_t default_thread_attr; +extern int default_thr_sched_policy; +#endif + /* for the time being, we do our own portability handling here. It * looks like autotools either does not yet support checks for it, or diff --git a/runtime/stream.c b/runtime/stream.c index 260b59ef..658aba11 100644 --- a/runtime/stream.c +++ b/runtime/stream.c @@ -669,7 +669,13 @@ static rsRetVal strmConstructFinalize(strm_t *pThis) } pThis->pIOBuf = pThis->asyncBuf[0].pBuf; pThis->bStopWriter = 0; - if(pthread_create(&pThis->writerThreadID, NULL, asyncWriterThread, pThis) != 0) + if(pthread_create(&pThis->writerThreadID, +#ifdef HAVE_PTHREAD_SETSCHEDPARAM + &default_thread_attr, +#else + NULL, +#endif + asyncWriterThread, pThis) != 0) DBGPRINTF("ERROR: stream %p cold not create writer thread\n", pThis); } else { /* we work synchronously, so we need to alloc a fixed pIOBuf */ diff --git a/runtime/wtp.c b/runtime/wtp.c index ece80911..e615fb19 100644 --- a/runtime/wtp.c +++ b/runtime/wtp.c @@ -90,6 +90,12 @@ BEGINobjConstruct(wtp) /* be sure to specify the object type also in END macro! pthread_mutex_init(&pThis->mutWtp, NULL); pthread_cond_init(&pThis->condThrdTrm, NULL); pthread_attr_init(&pThis->attrThrd); + /* Set thread scheduling policy to default */ +#ifdef HAVE_PTHREAD_SETSCHEDPARAM + pthread_attr_setschedpolicy(&pThis->attrThrd, default_thr_sched_policy); + pthread_attr_setschedparam(&pThis->attrThrd, &default_sched_param); + pthread_attr_setinheritsched(&pThis->attrThrd, PTHREAD_EXPLICIT_SCHED); +#endif pthread_attr_setdetachstate(&pThis->attrThrd, PTHREAD_CREATE_DETACHED); /* set all function pointers to "not implemented" dummy so that we can safely call them */ pThis->pfChkStopWrkr = NotImplementedDummy; @@ -220,7 +220,13 @@ rsRetVal thrdCreate(rsRetVal (*thrdMain)(thrdInfo_t*), rsRetVal(*afterRun)(thrdI pThis->pUsrThrdMain = thrdMain; pThis->pAfterRun = afterRun; pThis->bNeedsCancel = bNeedsCancel; - i = pthread_create(&pThis->thrdID, NULL, thrdStarter, pThis); + i = pthread_create(&pThis->thrdID, +#ifdef HAVE_PTHREAD_SETSCHEDPARAM + &default_thread_attr, +#else + NULL, +#endif + thrdStarter, pThis); CHKiRet(llAppend(&llThrds, NULL, pThis)); finalize_it: |