diff options
Diffstat (limited to 'plugins')
-rw-r--r-- | plugins/imdiag/imdiag.c | 3 | ||||
-rw-r--r-- | plugins/imfile/imfile.c | 23 | ||||
-rw-r--r-- | plugins/imgssapi/imgssapi.c | 2 | ||||
-rw-r--r-- | plugins/imklog/Makefile.am | 3 | ||||
-rw-r--r-- | plugins/imklog/bsd.c | 244 | ||||
-rw-r--r-- | plugins/imklog/imklog.c | 58 | ||||
-rw-r--r-- | plugins/imklog/imklog.h | 2 | ||||
-rw-r--r-- | plugins/imklog/ksym.c | 832 | ||||
-rw-r--r-- | plugins/imklog/ksym_mod.c | 485 | ||||
-rw-r--r-- | plugins/imklog/ksyms.h | 37 | ||||
-rw-r--r-- | plugins/imklog/linux.c | 542 | ||||
-rw-r--r-- | plugins/imklog/module.h | 35 | ||||
-rw-r--r-- | plugins/imklog/solaris.c | 68 | ||||
-rw-r--r-- | plugins/impstats/impstats.c | 8 | ||||
-rw-r--r-- | plugins/imptcp/imptcp.c | 179 | ||||
-rw-r--r-- | plugins/imtcp/imtcp.c | 16 | ||||
-rw-r--r-- | plugins/imudp/imudp.c | 174 | ||||
-rw-r--r-- | plugins/imuxsock/imuxsock.c | 253 | ||||
-rw-r--r-- | plugins/mmsnmptrapd/mmsnmptrapd.c | 2 | ||||
-rw-r--r-- | plugins/omelasticsearch/Makefile.am | 8 | ||||
-rw-r--r-- | plugins/omelasticsearch/omelasticsearch.c | 270 | ||||
-rw-r--r-- | plugins/omhdfs/javaenv.sh | 14 | ||||
-rw-r--r-- | plugins/omhdfs/omhdfs.c | 85 |
23 files changed, 1095 insertions, 2248 deletions
diff --git a/plugins/imdiag/imdiag.c b/plugins/imdiag/imdiag.c index b3468921..bcbbab09 100644 --- a/plugins/imdiag/imdiag.c +++ b/plugins/imdiag/imdiag.c @@ -376,7 +376,8 @@ static rsRetVal addTCPListener(void __attribute__((unused)) *pVal, uchar *pNewVa /* initialized, now add socket */ CHKiRet(tcpsrv.SetInputName(pOurTcpsrv, pszInputName == NULL ? UCHAR_CONSTANT("imdiag") : pszInputName)); - tcpsrv.configureTCPListen(pOurTcpsrv, pNewVal); + /* we support octect-cuunted frame (constant 1 below) */ + tcpsrv.configureTCPListen(pOurTcpsrv, pNewVal, 1); finalize_it: if(iRet != RS_RET_OK) { diff --git a/plugins/imfile/imfile.c b/plugins/imfile/imfile.c index ba8318df..632bf390 100644 --- a/plugins/imfile/imfile.c +++ b/plugins/imfile/imfile.c @@ -63,6 +63,7 @@ DEFobjCurrIf(strm) DEFobjCurrIf(prop) DEFobjCurrIf(ruleset) +#define NUM_MULTISUB 1024 /* max number of submits -- TODO: make configurable */ typedef struct fileInfo_s { uchar *pszFileName; uchar *pszTag; @@ -70,11 +71,13 @@ typedef struct fileInfo_s { uchar *pszStateFile; /* file in which state between runs is to be stored */ int iFacility; int iSeverity; + int maxLinesAtOnce; int nRecords; /**< How many records did we process before persisting the stream? */ int iPersistStateInterval; /**< how often should state be persisted? (0=on close only) */ strm_t *pStrm; /* its stream (NULL if not assigned) */ int readMode; /* which mode to use in ReadMulteLine call? */ ruleset_t *pRuleset; /* ruleset to bind listener to (use system default if unspecified) */ + multi_submit_t multiSub; } fileInfo_t; @@ -90,6 +93,7 @@ static int iPersistStateInterval = 0; /* how often if state file to be persisted static int iFacility = 128; /* local0 */ static int iSeverity = 5; /* notice, as of rfc 3164 */ static int readMode = 0; /* mode to use for ReadMultiLine call */ +static int maxLinesAtOnce = 10240; /* how many lines to process in a row? */ static ruleset_t *pBindRuleset = NULL; /* ruleset to bind listener to (use system default if unspecified) */ static int iFilPtr = 0; /* number of files to be monitored; pointer to next free spot during config */ @@ -121,7 +125,9 @@ static rsRetVal enqLine(fileInfo_t *pInfo, cstr_t *cstrLine) pMsg->iFacility = LOG_FAC(pInfo->iFacility); pMsg->iSeverity = LOG_PRI(pInfo->iSeverity); MsgSetRuleset(pMsg, pInfo->pRuleset); - CHKiRet(submitMsg(pMsg)); + pInfo->multiSub.ppMsgs[pInfo->multiSub.nElem++] = pMsg; + if(pInfo->multiSub.nElem == pInfo->multiSub.maxElem) + CHKiRet(multiSubmitMsg(&pInfo->multiSub)); finalize_it: RETiRet; } @@ -205,6 +211,7 @@ static void pollFileCancelCleanup(void *pArg) static rsRetVal pollFile(fileInfo_t *pThis, int *pbHadFileData) { cstr_t *pCStr = NULL; + int nProcessed = 0; DEFiRet; ASSERT(pbHadFileData != NULL); @@ -219,7 +226,10 @@ static rsRetVal pollFile(fileInfo_t *pThis, int *pbHadFileData) /* loop below will be exited when strmReadLine() returns EOF */ while(glbl.GetGlobalInputTermState() == 0) { + if(pThis->maxLinesAtOnce != 0 && nProcessed >= pThis->maxLinesAtOnce) + break; CHKiRet(strm.ReadLine(pThis->pStrm, &pCStr, pThis->readMode)); + ++nProcessed; *pbHadFileData = 1; /* this is just a flag, so set it and forget it */ CHKiRet(enqLine(pThis, pCStr)); /* process line */ rsCStrDestruct(&pCStr); /* discard string (must be done by us!) */ @@ -230,6 +240,10 @@ static rsRetVal pollFile(fileInfo_t *pThis, int *pbHadFileData) } finalize_it: + if(pThis->multiSub.nElem > 0) { + /* submit everything that was not yet submitted */ + CHKiRet(multiSubmitMsg(&pThis->multiSub)); + } ; /*EMPTY STATEMENT - needed to keep compiler happy - see below! */ /* Note: the problem above is that pthread:cleanup_pop() is a macro which * evaluates to something like "} while(0);". So the code would become @@ -474,6 +488,7 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a iSeverity = 5; /* notice, as of rfc 3164 */ readMode = 0; pBindRuleset = NULL; + maxLinesAtOnce = 10240; RETiRet; } @@ -512,8 +527,12 @@ static rsRetVal addMonitor(void __attribute__((unused)) *pVal, uchar *pNewVal) pThis->pszStateFile = (uchar*) strdup((char*) pszStateFile); } + CHKmalloc(pThis->multiSub.ppMsgs = MALLOC(NUM_MULTISUB * sizeof(msg_t*))); + pThis->multiSub.maxElem = NUM_MULTISUB; + pThis->multiSub.nElem = 0; pThis->iSeverity = iSeverity; pThis->iFacility = iFacility; + pThis->maxLinesAtOnce = maxLinesAtOnce; pThis->iPersistStateInterval = iPersistStateInterval; pThis->nRecords = 0; pThis->readMode = readMode; @@ -591,6 +610,8 @@ CODEmodInit_QueryRegCFSLineHdlr NULL, &iPollInterval, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputfilereadmode", 0, eCmdHdlrInt, NULL, &readMode, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputfilemaxlinesatonce", 0, eCmdHdlrSize, + NULL, &maxLinesAtOnce, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputfilepersiststateinterval", 0, eCmdHdlrInt, NULL, &iPersistStateInterval, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputfilebindruleset", 0, eCmdHdlrGetWord, diff --git a/plugins/imgssapi/imgssapi.c b/plugins/imgssapi/imgssapi.c index 446795d6..cc969aa5 100644 --- a/plugins/imgssapi/imgssapi.c +++ b/plugins/imgssapi/imgssapi.c @@ -335,7 +335,7 @@ addGSSListener(void __attribute__((unused)) *pVal, uchar *pNewVal) CHKiRet(tcpsrv.SetCBOnRegularClose(pOurTcpsrv, onRegularClose)); CHKiRet(tcpsrv.SetCBOnErrClose(pOurTcpsrv, onErrClose)); CHKiRet(tcpsrv.SetInputName(pOurTcpsrv, UCHAR_CONSTANT("imgssapi"))); - tcpsrv.configureTCPListen(pOurTcpsrv, pNewVal); + tcpsrv.configureTCPListen(pOurTcpsrv, pNewVal, 1); CHKiRet(tcpsrv.ConstructFinalize(pOurTcpsrv)); } diff --git a/plugins/imklog/Makefile.am b/plugins/imklog/Makefile.am index 5d4d0465..7d0d37c9 100644 --- a/plugins/imklog/Makefile.am +++ b/plugins/imklog/Makefile.am @@ -1,5 +1,4 @@ pkglib_LTLIBRARIES = imklog.la - imklog_la_SOURCES = imklog.c imklog.h # select klog "driver" @@ -8,7 +7,7 @@ imklog_la_SOURCES += bsd.c endif if ENABLE_IMKLOG_LINUX -imklog_la_SOURCES += linux.c module.h ksym.c ksyms.h ksym_mod.c +imklog_la_SOURCES += bsd.c endif imklog_la_CPPFLAGS = -I$(top_srcdir) $(PTHREADS_CFLAGS) $(RSRT_CFLAGS) diff --git a/plugins/imklog/bsd.c b/plugins/imklog/bsd.c index 0a4c7cd4..eaf8e5ca 100644 --- a/plugins/imklog/bsd.c +++ b/plugins/imklog/bsd.c @@ -1,69 +1,30 @@ -/* klog for BSD, based on the FreeBSD syslogd implementation. +/* combined imklog driver for BSD and Linux * * This contains OS-specific functionality to read the BSD - * kernel log. For a general overview, see head comment in - * imklog.c. + * or Linux kernel log. For a general overview, see head comment in + * imklog.c. This started out as the BSD-specific drivers, but it + * turned out that on modern Linux the implementation details + * are very small, and so we use a single driver for both OS's with + * a little help of conditional compilation. * - * Copyright (C) 2008 by Rainer Gerhards for the modifications of - * the original FreeBSD sources. - * - * I would like to express my gratitude to those folks which - * layed an important foundation for rsyslog to build on. + * Copyright 2008-2012 Adiscon GmbH * * This file is part of rsyslog. * - * Rsyslog is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * Rsyslog is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with Rsyslog. If not, see <http://www.gnu.org/licenses/>. - * - * A copy of the GPL can be found in the file "COPYING" in this distribution. - * - * This file is based on earlier work included in the FreeBSD sources. We - * integrated it into the rsyslog project. The copyright below applies, and - * I also reproduce the original license under which we aquired the code: - * - * Copyright (c) 1983, 1988, 1993, 1994 - * The Regents of the University of California. All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions - * are met: - * 1. Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * 2. Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * 4. Neither the name of the University nor the names of its contributors - * may be used to endorse or promote products derived from this software - * without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND - * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE - * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL - * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS - * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) - * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT - * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY - * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF - * SUCH DAMAGE. - * - * If you would like to use the code under the BSD license, you should - * aquire your own copy of BSD's syslogd, from which we have taken it. The - * code in this file is modified and may only be used under the terms of - * the GPLv3+ as specified above. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * -or- + * see COPYING.ASL20 in the source distribution + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ - #ifdef HAVE_CONFIG_H # include "config.h" #endif @@ -72,18 +33,124 @@ #include <fcntl.h> #include <errno.h> #include <string.h> +#include <ctype.h> +#ifdef OS_LINUX +# include <sys/klog.h> +#endif #include "rsyslog.h" -#include "imklog.h" +#include "srUtils.h" #include "debug.h" +#include "imklog.h" /* globals */ -static int fklog = -1; /* /dev/klog */ +static int fklog = -1; /* kernel log fd */ #ifndef _PATH_KLOG -# define _PATH_KLOG "/dev/klog" +# ifdef OS_LINUX +# define _PATH_KLOG "/proc/kmsg" +# else +# define _PATH_KLOG "/dev/klog" +# endif #endif + +#ifdef OS_LINUX +/* submit a message to imklog Syslog() API. In this function, we check if + * a kernel timestamp is present and, if so, extract and strip it. + * Note: this is an extra processing step. We should revisit the whole + * idea in v6 and remove all that old stuff that we do not longer need + * (like symbol resolution). <-- TODO + * Note that this is heavily Linux specific and thus is not compiled or + * used for BSD. + * Special thanks to Lennart Poettering for suggesting on how to convert + * the kernel timestamp to a realtime timestamp. This method depends on + * the fact the the kernel timestamp is written using the monotonic clock. + * Shall that change (very unlikely), this code must be changed as well. Note + * that due to the way we generate the delta, we are unable to write the + * absolutely correct timestamp (system call overhead of the clock calls + * prevents us from doing so). However, the difference is very minor. + * rgerhards, 2011-06-24 + */ +static void +submitSyslog(int pri, uchar *buf) +{ + long secs; + long nsecs; + long secOffs; + long nsecOffs; + unsigned i; + unsigned bufsize; + struct timespec monotonic, realtime; + struct timeval tv; + struct timeval *tp = NULL; + + if(buf[3] != '[') + goto done; + DBGPRINTF("imklog: kernel timestamp detected, extracting it\n"); + + /* we now try to parse the timestamp. iff it parses, we assume + * it is a timestamp. Otherwise we know for sure it is no ts ;) + */ + i = 4; /* first digit after '[' */ + secs = 0; + while(buf[i] && isdigit(buf[i])) { + secs = secs * 10 + buf[i] - '0'; + ++i; + } + if(buf[i] != '.') { + DBGPRINTF("no dot --> no kernel timestamp\n"); + goto done; /* no TS! */ + } + + ++i; /* skip dot */ + nsecs = 0; + while(buf[i] && isdigit(buf[i])) { + nsecs = nsecs * 10 + buf[i] - '0'; + ++i; + } + if(buf[i] != ']') { + DBGPRINTF("no trailing ']' --> no kernel timestamp\n"); + goto done; /* no TS! */ + } + ++i; /* skip ']' */ + + /* we have a timestamp */ + DBGPRINTF("kernel timestamp is %ld %ld\n", secs, nsecs); + bufsize= strlen((char*)buf); + memcpy(buf+3, buf+i, bufsize - i + 1); + + clock_gettime(CLOCK_MONOTONIC, &monotonic); + clock_gettime(CLOCK_REALTIME, &realtime); + secOffs = realtime.tv_sec - monotonic.tv_sec; + nsecOffs = realtime.tv_nsec - monotonic.tv_nsec; + if(nsecOffs < 0) { + secOffs--; + nsecOffs += 1000000000l; + } + + nsecs +=nsecOffs; + if(nsecs > 999999999l) { + secs++; + nsecs -= 1000000000l; + } + secs += secOffs; + tv.tv_sec = secs; + tv.tv_usec = nsecs / 1000; + tp = &tv; + +done: + Syslog(pri, buf, tp); +} +#else /* now comes the BSD "code" (just a shim) */ +static void +submitSyslog(int pri, uchar *buf) +{ + Syslog(pri, buf, NULL); +} +#endif /* #ifdef LINUX */ + + static uchar *GetPath(void) { return pszPath ? pszPath : (uchar*) _PATH_KLOG; @@ -95,23 +162,36 @@ static uchar *GetPath(void) rsRetVal klogWillRun(void) { + char errmsg[2048]; + int r; DEFiRet; fklog = open((char*)GetPath(), O_RDONLY, 0); if (fklog < 0) { - dbgprintf("can't open %s (%d)\n", GetPath(), errno); - iRet = RS_RET_ERR; // TODO: better error code + imklogLogIntMsg(RS_RET_ERR_OPEN_KLOG, "imklog: cannot open kernel log(%s): %s.", + GetPath(), rs_strerror_r(errno, errmsg, sizeof(errmsg))); + ABORT_FINALIZE(RS_RET_ERR_OPEN_KLOG); + } + +# ifdef OS_LINUX + /* Set level of kernel console messaging.. */ + if(console_log_level != -1) { + r = klogctl(8, NULL, console_log_level); + if(r != 0) { + imklogLogIntMsg(LOG_WARNING, "imklog: cannot set console log level: %s", + rs_strerror_r(errno, errmsg, sizeof(errmsg))); + /* make sure we do not try to re-set! */ + console_log_level = -1; + } } +# endif /* #ifdef OS_LINUX */ +finalize_it: RETiRet; } -/* Read /dev/klog while data are available, split into lines. - * Contrary to standard BSD syslogd, we do a blocking read. We can - * afford this as imklog is running on its own threads. So if we have - * a single file, it really doesn't matter if we wait inside a 1-file - * select or the read() directly. +/* Read kernel log while data are available, split into lines. */ static void readklog(void) @@ -119,13 +199,14 @@ readklog(void) char *p, *q; int len, i; int iMaxLine; - uchar bufRcv[4096+1]; + uchar bufRcv[128*1024+1]; + char errmsg[2048]; uchar *pRcv = NULL; /* receive buffer */ iMaxLine = klog_getMaxLine(); - /* we optimize performance: if iMaxLine is below 4K (which it is in almost all - * cases, we use a fixed buffer on the stack. Only if it is higher, heap memory + /* we optimize performance: if iMaxLine is below our fixed size buffer (which + * usually is sufficiently large), we use this buffer. if it is higher, heap memory * is used. We could use alloca() to achive a similar aspect, but there are so * many issues with alloca() that I do not want to take that route. * rgerhards, 2008-09-02 @@ -139,15 +220,15 @@ readklog(void) len = 0; for (;;) { - dbgprintf("----------imklog(BSD) waiting for kernel log line\n"); + dbgprintf("imklog(BSD/Linux) waiting for kernel log line\n"); i = read(fklog, pRcv + len, iMaxLine - len); if (i > 0) { pRcv[i + len] = '\0'; } else { if (i < 0 && errno != EINTR && errno != EAGAIN) { imklogLogIntMsg(LOG_ERR, - "imklog error %d reading kernel log - shutting down imklog", - errno); + "imklog: error reading kernel log - shutting down: %s", + rs_strerror_r(errno, errmsg, sizeof(errmsg))); fklog = -1; } break; @@ -155,18 +236,18 @@ readklog(void) for (p = (char*)pRcv; (q = strchr(p, '\n')) != NULL; p = q + 1) { *q = '\0'; - Syslog(LOG_INFO, (uchar*) p); + submitSyslog(LOG_INFO, (uchar*) p); } len = strlen(p); if (len >= iMaxLine - 1) { - Syslog(LOG_INFO, (uchar*)p); + submitSyslog(LOG_INFO, (uchar*)p); len = 0; } - if (len > 0) + if(len > 0) memmove(pRcv, p, len + 1); } if (len > 0) - Syslog(LOG_INFO, pRcv); + submitSyslog(LOG_INFO, pRcv); if(pRcv != NULL && (size_t) iMaxLine >= sizeof(bufRcv) - 1) free(pRcv); @@ -181,6 +262,11 @@ rsRetVal klogAfterRun(void) DEFiRet; if(fklog != -1) close(fklog); +# ifdef OS_LINUX + /* Turn on logging of messages to console, but only if a log level was speficied */ + if(console_log_level != -1) + klogctl(7, NULL, 0); +# endif RETiRet; } diff --git a/plugins/imklog/imklog.c b/plugins/imklog/imklog.c index 16adbc21..aa500084 100644 --- a/plugins/imklog/imklog.c +++ b/plugins/imklog/imklog.c @@ -18,6 +18,9 @@ * Please note that this file replaces the klogd daemon that was * also present in pre-v3 versions of rsyslog. * + * To test under Linux: + * echo test1 > /dev/kmsg + * * Copyright (C) 2008-2012 Adiscon GmbH * * This file is part of rsyslog. @@ -44,6 +47,7 @@ #include <stdarg.h> #include <ctype.h> #include <stdlib.h> +#include <sys/socket.h> #include "dirty.h" #include "cfsysline.h" @@ -52,6 +56,7 @@ #include "module-template.h" #include "datetime.h" #include "imklog.h" +#include "net.h" #include "glbl.h" #include "prop.h" #include "unicode-helper.h" @@ -64,6 +69,7 @@ DEF_IMOD_STATIC_DATA DEFobjCurrIf(datetime) DEFobjCurrIf(glbl) DEFobjCurrIf(prop) +DEFobjCurrIf(net) /* configuration settings */ int dbgPrintSymbols = 0; /* this one is extern so the helpers can access it! */ @@ -92,15 +98,21 @@ static prop_t *pLocalHostIP = NULL; /* a pseudo-constant propterty for 127.0.0.1 * rgerhards, 2008-04-12 */ static rsRetVal -enqMsg(uchar *msg, uchar* pszTag, int iFacility, int iSeverity) +enqMsg(uchar *msg, uchar* pszTag, int iFacility, int iSeverity, struct timeval *tp) { - DEFiRet; + struct syslogTime st; msg_t *pMsg; + DEFiRet; assert(msg != NULL); assert(pszTag != NULL); - CHKiRet(msgConstruct(&pMsg)); + if(tp == NULL) { + CHKiRet(msgConstruct(&pMsg)); + } else { + datetime.timeval2syslogTime(tp, &st); + CHKiRet(msgConstructWithTime(&pMsg, &st, tp->tv_sec)); + } MsgSetFlowControlType(pMsg, eFLOWCTL_LIGHT_DELAY); MsgSetInputName(pMsg, pInputName); MsgSetRawMsgWOSize(pMsg, (char*)msg); @@ -173,31 +185,47 @@ rsRetVal imklogLogIntMsg(int priority, char *fmt, ...) va_end(ap); iRet = enqMsg((uchar*)pLogMsg, (uchar*) ((iFacilIntMsg == LOG_KERN) ? "kernel:" : "imklog:"), - iFacilIntMsg, LOG_PRI(priority)); + iFacilIntMsg, LOG_PRI(priority), NULL); RETiRet; } -/* log a kernel message +/* log a kernel message. If tp is non-NULL, it contains the message creation + * time to use. * rgerhards, 2008-04-14 */ -rsRetVal Syslog(int priority, uchar *pMsg) +rsRetVal Syslog(int priority, uchar *pMsg, struct timeval *tp) { - DEFiRet; + int pri = -1; rsRetVal localRet; + DEFiRet; - /* Output using syslog */ - localRet = parsePRI(&pMsg, &priority); - if(localRet != RS_RET_INVALID_PRI && localRet != RS_RET_OK) - FINALIZE; + /* then check if we have two PRIs. This can happen in case of systemd, + * in which case the second PRI is the right one. + */ + if(pMsg[3] == '<' || (pMsg[3] == ' ' && pMsg[4] == '<')) { /* could be a pri... */ + uchar *pMsgTmp = pMsg + ((pMsg[3] == '<') ? 3 : 4); + localRet = parsePRI(&pMsgTmp, &pri); + if(localRet == RS_RET_OK && pri >= 8 && pri <= 192) { + /* *this* is our PRI */ + DBGPRINTF("imklog detected secondary PRI(%d) in klog msg\n", pri); + pMsg = pMsgTmp; + priority = pri; + } + } + if(pri == -1) { + localRet = parsePRI(&pMsg, &priority); + if(localRet != RS_RET_INVALID_PRI && localRet != RS_RET_OK) + FINALIZE; + } /* if we don't get the pri, we use whatever we were supplied */ /* ignore non-kernel messages if not permitted */ if(bPermitNonKernel == 0 && LOG_FAC(priority) != LOG_KERN) FINALIZE; /* silently ignore */ - iRet = enqMsg((uchar*)pMsg, (uchar*) "kernel:", LOG_FAC(priority), LOG_PRI(priority)); + iRet = enqMsg((uchar*)pMsg, (uchar*) "kernel:", LOG_FAC(priority), LOG_PRI(priority), tp); finalize_it: RETiRet; @@ -236,7 +264,7 @@ BEGINwillRun CODESTARTwillRun /* we need to create the inputName property (only once during our lifetime) */ CHKiRet(prop.CreateStringProp(&pInputName, UCHAR_CONSTANT("imklog"), sizeof("imklog") - 1)); - CHKiRet(prop.CreateStringProp(&pLocalHostIP, UCHAR_CONSTANT("127.0.0.1"), sizeof("127.0.0.1") - 1)); + pLocalHostIP = glbl.GetLocalHostIP(); iRet = klogWillRun(); finalize_it: @@ -249,8 +277,6 @@ CODESTARTafterRun if(pInputName != NULL) prop.Destruct(&pInputName); - if(pLocalHostIP != NULL) - prop.Destruct(&pLocalHostIP); ENDafterRun @@ -258,6 +284,7 @@ BEGINmodExit CODESTARTmodExit /* release objects we used */ objRelease(glbl, CORE_COMPONENT); + objRelease(net, CORE_COMPONENT); objRelease(datetime, CORE_COMPONENT); objRelease(prop, CORE_COMPONENT); if(pszPath != NULL) @@ -293,6 +320,7 @@ CODEmodInit_QueryRegCFSLineHdlr CHKiRet(objUse(datetime, CORE_COMPONENT)); CHKiRet(objUse(glbl, CORE_COMPONENT)); CHKiRet(objUse(prop, CORE_COMPONENT)); + CHKiRet(objUse(net, CORE_COMPONENT)); iFacilIntMsg = klogFacilIntMsg(); diff --git a/plugins/imklog/imklog.h b/plugins/imklog/imklog.h index 22f10053..7a060df4 100644 --- a/plugins/imklog/imklog.h +++ b/plugins/imklog/imklog.h @@ -55,7 +55,7 @@ extern uchar *pszPath; /* the functions below may be called by the drivers */ rsRetVal imklogLogIntMsg(int priority, char *fmt, ...) __attribute__((format(printf,2, 3))); -rsRetVal Syslog(int priority, uchar *msg); +rsRetVal Syslog(int priority, uchar *msg, struct timeval *tp); /* prototypes */ extern int klog_getMaxLine(void); /* work-around for klog drivers to get configured max line size */ diff --git a/plugins/imklog/ksym.c b/plugins/imklog/ksym.c deleted file mode 100644 index ebaec011..00000000 --- a/plugins/imklog/ksym.c +++ /dev/null @@ -1,832 +0,0 @@ -/* ksym.c - functions for kernel address->symbol translation - * Copyright (c) 1995, 1996 Dr. G.W. Wettstein <greg@wind.rmcc.com> - * Copyright (c) 1996 Enjellic Systems Development - * Copyright (c) 1998-2007 Martin Schulze <joey@infodrom.org> - * Copyright (C) 2007-2008 Rainer Gerhards <rgerhards@adiscon.com> - * - * This file is part of rsyslog. - * - * Rsyslog is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * Rsyslog is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with Rsyslog. If not, see <http://www.gnu.org/licenses/>. - * - * A copy of the GPL can be found in the file "COPYING" in this distribution. -*/ - -/* - * This file contains functions which handle the translation of kernel - * numeric addresses into symbols for the klogd utility. - * - * Sat Oct 28 09:00:14 CDT 1995: Dr. Wettstein - * Initial Version. - * - * Fri Nov 24 12:50:52 CST 1995: Dr. Wettstein - * Added VERBOSE_DEBUGGING define to make debugging output more - * manageable. - * - * Added support for verification of the loaded kernel symbols. If - * no version information can be be found in the mapfile a warning - * message is issued but translation will still take place. This - * will be the default case if kernel versions < 1.3.43 are used. - * - * If the symbols in the mapfile are of the same version as the kernel - * that is running an informative message is issued. If the symbols - * in the mapfile do not match the current kernel version a warning - * message is issued and translation is disabled. - * - * Wed Dec 6 16:14:11 CST 1995: Dr. Wettstein - * Added /boot/System.map to the list of symbol maps to search for. - * Also made this map the first item in the search list. I am open - * to CONSTRUCTIVE suggestions for any additions or corrections to - * the list of symbol maps to search for. Be forewarned that the - * list in use is the consensus agreement between myself, Linus and - * some package distributers. It is a given that no list will suit - * everyone's taste. If you have rabid concerns about the list - * please feel free to edit the system_maps array and compile your - * own binaries. - * - * Added support for searching of the list of symbol maps. This - * allows support for access to multiple symbol maps. The theory - * behind this is that a production kernel may have a system map in - * /boot/System.map. If a test kernel is booted this system map - * would be skipped in favor of one found in /usr/src/linux. - * - * Thu Jan 18 11:18:31 CST 1996: Dr. Wettstein - * Added patch from beta-testers to allow for reading of both - * ELF and a.out map files. - * - * Wed Aug 21 09:15:49 CDT 1996: Dr. Wettstein - * Reloading of kernel module symbols is now turned on by the - * SetParanoiaLevel function. The default behavior is to NOT reload - * the kernel module symbols when a protection fault is detected. - * - * Added support for freeing of the current kernel module symbols. - * This was necessary to support reloading of the kernel module symbols. - * - * When a matching static symbol table is loaded the kernel version - * number is printed. - * - * Mon Jun 9 17:12:42 CST 1997: Martin Schulze - * Added #1 and #2 to some error messages in order to being able - * to divide them (ulmo@Q.Net) - * - * Fri Jun 13 10:50:23 CST 1997: Martin Schulze - * Changed definition of LookupSymbol to non-static because it is - * used in klogd.c, too. - * - * Fri Jan 9 23:00:08 CET 1998: Martin Schulze <joey@infodrom.north.de> - * Fixed bug that caused klogd to die if there is no System.map available. - * - * Sun 29 Mar 18:14:07 BST 1998: Mark Simon Phillips <M.S.Phillips@nortel.co.uk> - * Switched to fgets() as gets() is not buffer overrun secure. - * - * Mon Apr 13 18:18:45 CEST 1998: Martin Schulze <joey@infodrom.north.de> - * Modified loop for detecting the correct system map. Now it won't - * stop if a file has been found but doesn't contain the correct map. - * Special thanks go go Mark Simon Phillips for the hint. - * - * Mon Oct 12 00:42:30 CEST 1998: Martin Schulze <joey@infodrom.north.de> - * Modified CheckVersion() - * . Use shift to decode the kernel version - * . Compare integers of kernel version - * . extract major.minor.patch from utsname.release via sscanf() - * The reason lays in possible use of kernel flavours which - * modify utsname.release but no the Version_ symbol. - * - * Sun Feb 21 22:27:49 EST 1999: Keith Owens <kaos@ocs.com.au> - * Fixed bug that caused klogd to die if there is no sym_array available. - * - * Tue Sep 12 23:48:12 CEST 2000: Martin Schulze <joey@infodrom.ffis.de> - * Close symbol file in InitKsyms() when an error occurred. - */ - - -/* Includes. */ -#include "config.h" -#include <stdio.h> -#include <stdlib.h> -#include <sys/utsname.h> -#include <ctype.h> -#include <stdarg.h> -#include <string.h> -#include <syslog.h> -#include "imklog.h" -#include "ksyms.h" -#include "module.h" -#include "debug.h" - - -int num_syms = 0; -static int i_am_paranoid = 0; -static char vstring[12]; -static struct sym_table *sym_array = (struct sym_table *) 0; - -static char *system_maps[] = -{ - "/boot/System.map", - "/System.map", - NULL -}; - - -/* Function prototypes. */ -static char *FindSymbolFile(void); -static int AddSymbol(unsigned long, char*); -static void FreeSymbols(void); -static int CheckVersion(char *); -static int CheckMapVersion(char *); - - -/************************************************************************* - * Function: InitKsyms - * - * Purpose: This function is responsible for initializing and loading - * the data tables used by the kernel address translations. - * - * Arguements: (char *) mapfile - * - * mapfile:-> A pointer to a complete path - * specification of the file containing - * the kernel map to use. - * - * Return: int - * - * A boolean style context is returned. The return value will - * be true if initialization was successful. False if not. - **************************************************************************/ -extern int InitKsyms(char *mapfile) -{ - auto char type, - sym[512]; - - auto int version = 0; - - auto unsigned long int address; - - auto FILE *sym_file; - - BEGINfunc - - /* Check and make sure that we are starting with a clean slate. */ - if ( num_syms > 0 ) - FreeSymbols(); - - - /* Search for and open the file containing the kernel symbols. */ - if ( mapfile != NULL ) { - if ( (sym_file = fopen(mapfile, "r")) == NULL ) - { - imklogLogIntMsg(LOG_WARNING, "Cannot open map file: %s.", mapfile); - return(0); - } - } else { - if ( (mapfile = FindSymbolFile()) == NULL ) { - imklogLogIntMsg(LOG_WARNING, "Cannot find map file."); - dbgprintf("Cannot find map file.\n"); - return(0); - } - - if ( (sym_file = fopen(mapfile, "r")) == NULL ) { - imklogLogIntMsg(LOG_WARNING, "Cannot open map file."); - dbgprintf("Cannot open map file.\n"); - return(0); - } - } - - - /* Read the kernel symbol table file and add entries for each - * line. I suspect that the use of fscanf is not really in vogue - * but it was quick and dirty and IMHO suitable for fixed format - * data such as this. If anybody doesn't agree with this please - * e-mail me a diff containing a parser with suitable political - * correctness -- GW. - */ - while ( !feof(sym_file) ) { - if ( fscanf(sym_file, "%lx %c %s\n", &address, &type, sym) != 3 ) { - imklogLogIntMsg(LOG_ERR, "Error in symbol table input (#1)."); - fclose(sym_file); - return(0); - } - if(dbgPrintSymbols) - dbgprintf("Address: %lx, Type: %c, Symbol: %s\n", address, type, sym); - - if ( AddSymbol(address, sym) == 0 ) { - imklogLogIntMsg(LOG_ERR, "Error adding symbol - %s.", sym); - fclose(sym_file); - return(0); - } - - if ( version == 0 ) - version = CheckVersion(sym); - } - - - imklogLogIntMsg(LOG_INFO, "Loaded %d symbols from %s.", num_syms, mapfile); - switch(version) { - case -1: - imklogLogIntMsg(LOG_WARNING, "Symbols do not match kernel version."); - num_syms = 0; - break; - - case 0: - imklogLogIntMsg(LOG_WARNING, "Cannot verify that symbols match kernel version."); - break; - - case 1: - imklogLogIntMsg(LOG_INFO, "Symbols match kernel version %s.", vstring); - break; - } - - fclose(sym_file); - ENDfunc - return(1); -} - - -extern void DeinitKsyms(void) -{ - FreeSymbols(); -} - - -/************************************************************************** - * Function: FindSymbolFile - * - * Purpose: This function is responsible for encapsulating the search - * for a valid symbol file. Encapsulating the search for - * the map file in this function allows an intelligent search - * process to be implemented. - * - * The list of symbol files will be searched until either a - * symbol file is found whose version matches the currently - * executing kernel or the end of the list is encountered. If - * the end of the list is encountered the first available - * symbol file is returned to the caller. - * - * This strategy allows klogd to locate valid symbol files - * for both a production and an experimental kernel. For - * example a map for a production kernel could be installed - * in /boot. If an experimental kernel is loaded the map - * in /boot will be skipped and the map in /usr/src/linux would - * be used if its version number matches the executing kernel. - * - * Arguements: None specified. - * - * Return: char * - * - * If a valid system map cannot be located a null pointer - * is returned to the caller. - * - * If the search is succesful a pointer is returned to the - * caller which points to the name of the file containing - * the symbol table to be used. - **************************************************************************/ -static char *FindSymbolFile(void) -{ - auto char *file = NULL, - **mf = system_maps; - auto struct utsname utsname; - static char mysymfile[100]; - auto FILE *sym_file = NULL; - BEGINfunc - - if(uname(&utsname) < 0) { - imklogLogIntMsg(LOG_ERR, "Cannot get kernel version information."); - return(0); - } - - dbgprintf("Searching for symbol map.\n"); - - for(mf = system_maps; *mf != NULL && file == NULL; ++mf) { - snprintf(mysymfile, sizeof(mysymfile), "%s-%s", *mf, utsname.release); - dbgprintf("Trying %s.\n", mysymfile); - if((sym_file = fopen(mysymfile, "r")) != NULL) { - if(CheckMapVersion(mysymfile) == 1) - file = mysymfile; - fclose(sym_file); - } - if(sym_file == NULL || file == NULL) { - sprintf (mysymfile, "%s", *mf); - dbgprintf("Trying %s.\n", mysymfile); - if((sym_file = fopen(mysymfile, "r")) != NULL ) { - if (CheckMapVersion(mysymfile) == 1) - file = mysymfile; - fclose(sym_file); - } - } - } - - /* At this stage of the game we are at the end of the symbol tables. */ - dbgprintf("End of search list encountered.\n"); - ENDfunc - return(file); -} - - -/************************************************************************** - * Function: CheckVersion - * - * Purpose: This function is responsible for determining whether or - * the system map being loaded matches the version of the - * currently running kernel. - * - * The kernel version is checked by examing a variable which - * is of the form: _Version_66347 (a.out) or Version_66437 (ELF). - * - * The suffix of this variable is the current kernel version - * of the kernel encoded in base 256. For example the - * above variable would be decoded as: - * - * (66347 = 1*65536 + 3*256 + 43 = 1.3.43) - * - * (Insert appropriate deities here) help us if Linus ever - * needs more than 255 patch levels to get a kernel out the - * door... :-) - * - * Arguements: (char *) version - * - * version:-> A pointer to the string which - * is to be decoded as a kernel - * version variable. - * - * Return: int - * - * -1:-> The currently running kernel version does - * not match this version string. - * - * 0:-> The string is not a kernel version variable. - * - * 1:-> The executing kernel is of the same version - * as the version string. - **************************************************************************/ -static int CheckVersion(char *version) -{ - auto int vnum, - major, - minor, - patch; - int kvnum; - auto struct utsname utsname; - - static char *prefix = { "Version_" }; - - - /* Early return if there is no hope. */ - if ( strncmp(version, prefix, strlen(prefix)) == 0 /* ELF */ || - (*version == '_' && - strncmp(++version, prefix, strlen(prefix)) == 0 ) /* a.out */ ) - ; - else - return(0); - - - /* Since the symbol looks like a kernel version we can start - * things out by decoding the version string into its component - * parts. - */ - vnum = atoi(version + strlen(prefix)); - patch = vnum & 0x000000FF; - minor = (vnum >> 8) & 0x000000FF; - major = (vnum >> 16) & 0x000000FF; - dbgprintf("Version string = %s, Major = %d, Minor = %d, Patch = %d.\n", version + - strlen(prefix), major, minor, patch); - sprintf(vstring, "%d.%d.%d", major, minor, patch); - - /* We should now have the version string in the vstring variable in - * the same format that it is stored in by the kernel. We now - * ask the kernel for its version information and compare the two - * values to determine if our system map matches the kernel - * version level. - */ - if ( uname(&utsname) < 0 ) { - imklogLogIntMsg(LOG_ERR, "Cannot get kernel version information."); - return(0); - } - dbgprintf("Comparing kernel %s with symbol table %s.\n", utsname.release, vstring); - - if ( sscanf (utsname.release, "%d.%d.%d", &major, &minor, &patch) < 3 ) { - imklogLogIntMsg(LOG_ERR, "Kernel send bogus release string `%s'.", utsname.release); - return(0); - } - - /* Compute the version code from data sent by the kernel */ - kvnum = (major << 16) | (minor << 8) | patch; - - /* Failure. */ - if ( vnum != kvnum ) - return(-1); - - /* Success. */ - return(1); -} - - -/************************************************************************** - * Function: CheckMapVersion - * - * Purpose: This function is responsible for determining whether or - * the system map being loaded matches the version of the - * currently running kernel. It uses CheckVersion as - * backend. - * - * Arguements: (char *) fname - * - * fname:-> A pointer to the string which - * references the system map file to - * be used. - * - * Return: int - * - * -1:-> The currently running kernel version does - * not match the version in the given file. - * - * 0:-> No system map file or no version information. - * - * 1:-> The executing kernel is of the same version - * as the version of the map file. - **************************************************************************/ -static int CheckMapVersion(char *fname) -{ - int version; - FILE *sym_file; - auto unsigned long int address; - auto char type, - sym[512]; - - if ( (sym_file = fopen(fname, "r")) != NULL ) { - /* - * At this point a map file was successfully opened. We - * now need to search this file and look for version - * information. - */ - imklogLogIntMsg(LOG_INFO, "Inspecting %s", fname); - - version = 0; - while ( !feof(sym_file) && (version == 0) ) { - if ( fscanf(sym_file, "%lx %c %s\n", &address, &type, sym) != 3 ) { - imklogLogIntMsg(LOG_ERR, "Error in symbol table input (#2)."); - fclose(sym_file); - return(0); - } - if(dbgPrintSymbols) - dbgprintf("Address: %lx, Type: %c, Symbol: %s\n", address, type, sym); - version = CheckVersion(sym); - } - fclose(sym_file); - - switch ( version ) { - case -1: - imklogLogIntMsg(LOG_ERR, "Symbol table has incorrect version number.\n"); - break; - case 0: - dbgprintf("No version information found.\n"); - break; - case 1: - dbgprintf("Found table with matching version number.\n"); - break; - } - - return(version); - } - - return(0); -} - - -/************************************************************************** - * Function: AddSymbol - * - * Purpose: This function is responsible for adding a symbol name - * and its address to the symbol table. - * - * Arguements: (unsigned long) address, (char *) symbol - * - * Return: int - * - * A boolean value is assumed. True if the addition is - * successful. False if not. - **************************************************************************/ -static int AddSymbol(unsigned long address, char *symbol) -{ - /* Allocate the the symbol table entry. */ - sym_array = (struct sym_table *) realloc(sym_array, (num_syms+1) * - sizeof(struct sym_table)); - if ( sym_array == (struct sym_table *) 0 ) - return(0); - - /* Then the space for the symbol. */ - sym_array[num_syms].name = (char *) MALLOC(strlen(symbol)*sizeof(char) + 1); - if ( sym_array[num_syms].name == NULL ) - return(0); - - sym_array[num_syms].value = address; - strcpy(sym_array[num_syms].name, symbol); - ++num_syms; - return(1); -} - - -/************************************************************************** - * Function: LookupSymbol - * - * Purpose: Find the symbol which is related to the given kernel - * address. - * - * Arguements: (long int) value, (struct symbol *) sym - * - * value:-> The address to be located. - * - * sym:-> A pointer to a structure which will be - * loaded with the symbol's parameters. - * - * Return: (char *) - * - * If a match cannot be found a diagnostic string is printed. - * If a match is found the pointer to the symbolic name most - * closely matching the address is returned. - **************************************************************************/ -char * LookupSymbol(unsigned long value, struct symbol *sym) -{ - auto int lp; - - auto char *last; - auto char *name; - - struct symbol ksym, msym; - - if (!sym_array) - return(NULL); - - last = sym_array[0].name; - ksym.offset = 0; - ksym.size = 0; - if ( value < sym_array[0].value ) - return(NULL); - - for(lp = 0; lp <= num_syms; ++lp) { - if ( sym_array[lp].value > value ) { - ksym.offset = value - sym_array[lp-1].value; - ksym.size = sym_array[lp].value - \ - sym_array[lp-1].value; - break; - } - last = sym_array[lp].name; - } - - name = LookupModuleSymbol(value, &msym); - - if ( ksym.offset == 0 && msym.offset == 0 ) { - return(NULL); - } - - if ( ksym.offset == 0 || msym.offset < 0 || - (ksym.offset > 0 && ksym.offset < msym.offset) ) { - sym->offset = ksym.offset; - sym->size = ksym.size; - return(last); - } else { - sym->offset = msym.offset; - sym->size = msym.size; - return(name); - } - - - return(NULL); -} - -/************************************************************************** - * Function: FreeSymbols - * - * Purpose: This function is responsible for freeing all memory which - * has been allocated to hold the static symbol table. It - * also initializes the symbol count and in general prepares - * for a re-read of a static symbol table. - * - * Arguements: void - * - * Return: void - **************************************************************************/ -static void FreeSymbols(void) -{ - auto int lp; - - /* Free each piece of memory allocated for symbol names. */ - for(lp= 0; lp < num_syms; ++lp) - free(sym_array[lp].name); - - /* Whack the entire array and initialize everything. */ - free(sym_array); - sym_array = (struct sym_table *) 0; - num_syms = 0; - - return; -} - - -/************************************************************************** - * Function: LogExpanded - * - * Purpose: This function is responsible for logging a kernel message - * line after all potential numeric kernel addresses have - * been resolved symolically. - * - * Arguements: (char *) line, (char *) el - * - * line:-> A pointer to the buffer containing the kernel - * message to be expanded and logged. - * - * el:-> A pointer to the buffer into which the expanded - * kernel line will be written. - * - * Return: void - **************************************************************************/ -extern char *ExpandKadds(char *line, char *el) -{ - auto char *kp, - *sl = line, - *elp = el, - *symbol; - char num[15]; - auto unsigned long int value; - auto struct symbol sym; - - sym.offset = 0; - sym.size = 0; - - /* - * This is as handy a place to put this as anyplace. - * - * Since the insertion of kernel modules can occur in a somewhat - * dynamic fashion we need some mechanism to insure that the - * kernel symbol tables get read just prior to when they are - * needed. - * - * To accomplish this we look for the Oops string and use its - * presence as a signal to load the module symbols. - * - * This is not the best solution of course, especially if the - * kernel is rapidly going out to lunch. What really needs to - * be done is to somehow generate a callback from the - * kernel whenever a module is loaded or unloaded. I am - * open for patches. - */ - if ( i_am_paranoid && - (strstr(line, "Oops:") != NULL) && !InitMsyms() ) - imklogLogIntMsg(LOG_WARNING, "Cannot load kernel module symbols.\n"); - - - /* - * Early return if there do not appear to be any kernel - * messages in this line. - */ - if ( (num_syms == 0) || - (kp = strstr(line, "[<")) == NULL ) { -#ifdef __sparc__ - if (num_syms) { - /* On SPARC, register dumps do not have the [< >] characters in it. - */ - static struct sparc_tests { - char *str; - int len; - } tests[] = { { "PC: ", 4 }, - { " o7: ", 5 }, - { " ret_pc: ", 9 }, - { " i7: ", 5 }, - { "Caller[", 7 } - }; - int i, j, ndigits; - char *kp2; - for (i = 0; i < 5; i++) { - kp = strstr(line, tests[i].str); - if (!kp) continue; - kp2 = kp + tests[i].len; - if (!isxdigit(*kp2)) continue; - for (ndigits = 1; isxdigit(kp2[ndigits]); ndigits++); - if (ndigits != 8 && ndigits != 16) continue; - /* On sparc64, all kernel addresses are in first 4GB */ - if (ndigits == 16) { - if (strncmp (kp2, "00000000", 8)) continue; - kp2 += 8; - } - if (!i) { - char *kp3; - if (ndigits == 16 && kp > line && kp[-1L] != 'T') continue; - kp3 = kp2 + 8; - if (ndigits == 16) { - if (strncmp (kp3, " TNPC: 00000000", 15) || !isxdigit(kp3[15])) - continue; - kp3 += 15; - } else { - if (strncmp (kp3, " NPC: ", 6) || !isxdigit(kp3[6])) - continue; - kp3 += 6; - } - for (j = 0; isxdigit(kp3[j]); j++); - if (j != 8) continue; - strncpy(elp, line, kp2 + 8 - line); - elp += kp2 + 8 - line; - value = strtol(kp2, (char **) 0, 16); - if ( (symbol = LookupSymbol(value, &sym)) ) { - if (sym.size) - elp += sprintf(elp, " (%s+%d/%d)", symbol, sym.offset, sym.size); - else - elp += sprintf(elp, " (%s)", symbol); - } - strncpy(elp, kp2 + 8, kp3 - kp2); - elp += kp3 - kp2; - value = strtol(kp3, (char **) 0, 16); - if ( (symbol = LookupSymbol(value, &sym)) ) { - if (sym.size) - elp += sprintf(elp, " (%s+%d/%d)", symbol, sym.offset, sym.size); - else - elp += sprintf(elp, " (%s)", symbol); - } - strcpy(elp, kp3 + 8); - } else { - strncpy(elp, line, kp2 + 8 - line); - elp += kp2 + 8 - line; - value = strtol(kp2, (char **) 0, 16); - if ( (symbol = LookupSymbol(value, &sym)) ) { - if (sym.size) - elp += sprintf(elp, " (%s+%d/%d)", symbol, sym.offset, sym.size); - else - elp += sprintf(elp, " (%s)", symbol); - } - strcpy(elp, kp2 + 8); - } - return el; - } - } -#endif - strcpy(el, line); - return(el); - } - - /* Loop through and expand all kernel messages. */ - do { - while ( sl < kp+1 ) - *elp++ = *sl++; - - /* Now poised at a kernel delimiter. */ - if ( (kp = strstr(sl, ">]")) == NULL ) { - strcpy(el, sl); - return(el); - } - strncpy(num,sl+1,kp-sl-1); - num[kp-sl-1] = '\0'; - value = strtoul(num, (char **) 0, 16); - if ( (symbol = LookupSymbol(value, &sym)) == NULL ) - symbol = sl; - - strcat(elp, symbol); - elp += strlen(symbol); - dbgprintf("Symbol: %s = %lx = %s, %x/%d\n", sl+1, value, - (sym.size==0) ? symbol+1 : symbol, sym.offset, sym.size); - - value = 2; - if ( sym.size != 0 ) { - --value; - ++kp; - elp += sprintf(elp, "+0x%x/0x%02x", sym.offset, sym.size); - } - strncat(elp, kp, value); - elp += value; - sl = kp + value; - if ( (kp = strstr(sl, "[<")) == NULL ) - strcat(elp, sl); - } - while ( kp != NULL); - - dbgprintf("Expanded line: %s\n", el); - return(el); -} - - -/************************************************************************** - * Function: SetParanoiaLevel - * - * Purpose: This function is an interface function for setting the - * mode of loadable module symbol lookups. Probably overkill - * but it does slay another global variable. - * - * Arguements: (int) level - * - * level:-> The amount of paranoia which is to be - * present when resolving kernel exceptions. - * Return: void - **************************************************************************/ -extern void SetParanoiaLevel(int level) -{ - i_am_paranoid = level; - return; -} - diff --git a/plugins/imklog/ksym_mod.c b/plugins/imklog/ksym_mod.c deleted file mode 100644 index 82978892..00000000 --- a/plugins/imklog/ksym_mod.c +++ /dev/null @@ -1,485 +0,0 @@ -/* ksym_mod.c - functions for building symbol lookup tables for klogd - * Copyright (c) 1995, 1996 Dr. G.W. Wettstein <greg@wind.rmcc.com> - * Copyright (c) 1996 Enjellic Systems Development - * Copyright (c) 1998-2007 Martin Schulze <joey@infodrom.org> - * Copyright (C) 2007-2009 Rainer Gerhards <rgerhards@adiscon.com> - * - * This file is part of rsyslog. - * - * Rsyslog is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * Rsyslog is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with Rsyslog. If not, see <http://www.gnu.org/licenses/>. - * - * A copy of the GPL can be found in the file "COPYING" in this distribution. -*/ - -/* - * This file implements functions which are useful for building - * a symbol lookup table based on the in kernel symbol table - * maintained by the Linux kernel. - * - * Proper logging of kernel panics generated by loadable modules - * tends to be difficult. Since the modules are loaded dynamically - * their addresses are not known at kernel load time. A general - * protection fault (Oops) cannot be properly deciphered with - * classic methods using the static symbol map produced at link time. - * - * One solution to this problem is to have klogd attempt to translate - * addresses from module when the fault occurs. By referencing the - * the kernel symbol table proper resolution of these symbols is made - * possible. - * - * At least that is the plan. - * - * Wed Aug 21 09:20:09 CDT 1996: Dr. Wettstein - * The situation where no module support has been compiled into a - * kernel is now detected. An informative message is output indicating - * that the kernel has no loadable module support whenever kernel - * module symbols are loaded. - * - * An informative message is printed indicating the number of kernel - * modules and the number of symbols loaded from these modules. - * - * Sun Jun 15 16:23:29 MET DST 1997: Michael Alan Dorman - * Some more glibc patches made by <mdorman@debian.org>. - * - * Sat Jan 10 15:00:18 CET 1998: Martin Schulze <joey@infodrom.north.de> - * Fixed problem with klogd not being able to be built on a kernel - * newer than 2.1.18. It was caused by modified structures - * inside the kernel that were included. I have worked in a - * patch from Alessandro Suardi <asuardi@uninetcom.it>. - * - * Sun Jan 25 20:57:34 CET 1998: Martin Schulze <joey@infodrom.north.de> - * Another patch for Linux/alpha by Christopher C Chimelis - * <chris@classnet.med.miami.edu>. - * - * Thu Mar 19 23:39:29 CET 1998: Manuel Rodrigues <pmanuel@cindy.fe.up.pt> - * Changed lseek() to llseek() in order to support > 2GB address - * space which provided by kernels > 2.1.70. - * - * Mon Apr 13 18:18:45 CEST 1998: Martin Schulze <joey@infodrom.north.de> - * Removed <sys/module.h> as it's no longer part of recent glibc - * versions. Added prototyp for llseek() which has been - * forgotton in <unistd.h> from glibc. Added more log - * information if problems occurred while reading a system map - * file, by submission from Mark Simon Phillips <M.S.Phillips@nortel.co.uk>. - * - * Sun Jan 3 18:38:03 CET 1999: Martin Schulze <joey@infodrom.north.de> - * Corrected return value of AddModule if /dev/kmem can't be - * loaded. This will prevent klogd from segfaulting if /dev/kmem - * is not available. Patch from Topi Miettinen <tom@medialab.sonera.net>. - * - * Tue Sep 12 23:11:13 CEST 2000: Martin Schulze <joey@infodrom.ffis.de> - * Changed llseek() to lseek64() in order to skip a libc warning. - */ - -/* Includes. */ -#include "config.h" -#include <stdio.h> -#include <stdlib.h> -#include <ctype.h> -#include <unistd.h> -#include <signal.h> -#include <string.h> -#include <errno.h> -#include <sys/fcntl.h> -#include <sys/stat.h> -#if !defined(__GLIBC__) -#include <linux/time.h> -#include <linux/module.h> -#else /* __GLIBC__ */ -#include "module.h" -#endif /* __GLIBC__ */ -#include <stdarg.h> -#include <paths.h> -#include <linux/version.h> - -#include "rsyslog.h" -#include "imklog.h" -#include "ksyms.h" -#include "debug.h" - -#define KSYMS "/proc/kallsyms" - -static int num_modules = 0; -struct Module *sym_array_modules = (struct Module *) NULL; - -static int have_modules = 0; - - -/* Function prototypes. */ -static void FreeModules(void); -static int AddSymbol(const char *); -struct Module *AddModule(const char *); -static int symsort(const void *, const void *); - -/* Imported from ksym.c */ -extern int num_syms; - - -/************************************************************************** - * Function: InitMsyms - * - * Purpose: This function is responsible for building a symbol - * table which can be used to resolve addresses for - * loadable modules. - * - * Arguements: Void - * - * Return: A boolean return value is assumed. - * - * A false value indicates that something went wrong. - * - * True if loading is successful. - **************************************************************************/ -extern int InitMsyms(void) -{ - - auto int rtn, - tmp; - FILE *ksyms; - char buf[128]; - char *p; - - /* Initialize the kernel module symbol table. */ - FreeModules(); - - ksyms = fopen(KSYMS, "r"); - - if ( ksyms == NULL ) { - if ( errno == ENOENT ) - imklogLogIntMsg(LOG_INFO, "No module symbols loaded - " - "kernel modules not enabled.\n"); - else - imklogLogIntMsg(LOG_ERR, "Error loading kernel symbols " \ - "- %s\n", strerror(errno)); - return(0); - } - - dbgprintf("Loading kernel module symbols - Source: %s\n", KSYMS); - - while ( fgets(buf, sizeof(buf), ksyms) != NULL ) { - if (num_syms > 0 && index(buf, '[') == NULL) - continue; - - p = index(buf, ' '); - - if ( p == NULL ) - continue; - - if ( buf[strlen(buf)-1] == '\n' ) - buf[strlen(buf)-1] = '\0'; - /* overlong lines will be ignored above */ - - AddSymbol(buf); - } - - if(ksyms != NULL) - fclose(ksyms); - - have_modules = 1; - - /* Sort the symbol tables in each module. */ - for (rtn = tmp = 0; tmp < num_modules; ++tmp) { - rtn += sym_array_modules[tmp].num_syms; - if ( sym_array_modules[tmp].num_syms < 2 ) - continue; - qsort(sym_array_modules[tmp].sym_array, \ - sym_array_modules[tmp].num_syms, \ - sizeof(struct sym_table), symsort); - } - - if ( rtn == 0 ) - imklogLogIntMsg(LOG_INFO, "No module symbols loaded."); - else - imklogLogIntMsg(LOG_INFO, "Loaded %d %s from %d module%s", rtn, \ - (rtn == 1) ? "symbol" : "symbols", \ - num_modules, (num_modules == 1) ? "." : "s."); - - return(1); -} - - -static int symsort(const void *p1, const void *p2) -{ - auto const struct sym_table *sym1 = p1, - *sym2 = p2; - - if ( sym1->value < sym2->value ) - return(-1); - if ( sym1->value == sym2->value ) - return(0); - return(1); -} - - -extern void DeinitMsyms(void) -{ - FreeModules(); -} - - -/************************************************************************** - * Function: FreeModules - * - * Purpose: This function is used to free all memory which has been - * allocated for the modules and their symbols. - * - * Arguements: None specified. - * - * Return: void - **************************************************************************/ -static void FreeModules() -{ - auto int nmods, - nsyms; - auto struct Module *mp; - - /* Check to see if the module symbol tables need to be cleared. */ - have_modules = 0; - if ( num_modules == 0 ) - return; - - if ( sym_array_modules == NULL ) - return; - - for (nmods = 0; nmods < num_modules; ++nmods) { - mp = &sym_array_modules[nmods]; - if ( mp->num_syms == 0 ) - continue; - - for (nsyms= 0; nsyms < mp->num_syms; ++nsyms) - free(mp->sym_array[nsyms].name); - free(mp->sym_array); - if ( mp->name != NULL ) - free(mp->name); - } - - free(sym_array_modules); - sym_array_modules = (struct Module *) NULL; - num_modules = 0; - return; -} - - -/************************************************************************** - * Function: AddModule - * - * Purpose: This function is responsible for adding a module to - * the list of currently loaded modules. - * - * Arguments: (const char *) module - * - * module:-> The name of the module. - * - * Return: struct Module * - **************************************************************************/ - -struct Module *AddModule(module) - const char *module; -{ - struct Module *mp; - - if ( num_modules == 0 ) { - sym_array_modules = (struct Module *)MALLOC(sizeof(struct Module)); - - if ( sym_array_modules == NULL ) - { - imklogLogIntMsg(LOG_WARNING, "Cannot allocate Module array.\n"); - return NULL; - } - mp = sym_array_modules; - } else { - /* Allocate space for the module. */ - mp = (struct Module *) \ - realloc(sym_array_modules, \ - (num_modules+1) * sizeof(struct Module)); - - if ( mp == NULL ) - { - imklogLogIntMsg(LOG_WARNING, "Cannot allocate Module array.\n"); - return NULL; - } - - sym_array_modules = mp; - mp = &sym_array_modules[num_modules]; - } - - num_modules++; - mp->sym_array = NULL; - mp->num_syms = 0; - - if ( module != NULL ) - mp->name = strdup(module); - else - mp->name = NULL; - - return mp; -} - - -/************************************************************************** - * Function: AddSymbol - * - * Purpose: This function is responsible for adding a symbol name - * and its address to the symbol table. - * - * Arguements: const char * - * - * Return: int - * - * A boolean value is assumed. True if the addition is - * successful. False if not. - **************************************************************************/ -static int AddSymbol(line) - const char *line; -{ - char *module; - unsigned long address; - char *p; - static char *lastmodule = NULL; - struct Module *mp; - - module = index(line, '['); - - if ( module != NULL ) { - p = index(module, ']'); - if ( p != NULL ) - *p = '\0'; - p = module++; - while ( isspace(*(--p)) ) - /*SKIP*/; - *(++p) = '\0'; - } - - p = index(line, ' '); - - if ( p == NULL ) - return(0); - - *p = '\0'; - - address = strtoul(line, (char **) 0, 16); - - p += 3; - - if ( num_modules == 0 || - ( lastmodule == NULL && module != NULL ) || - ( module == NULL && lastmodule != NULL) || - ( module != NULL && strcmp(module, lastmodule))) { - mp = AddModule(module); - - if ( mp == NULL ) - return(0); - } else - mp = &sym_array_modules[num_modules-1]; - - lastmodule = mp->name; - - /* Allocate space for the symbol table entry. */ - mp->sym_array = (struct sym_table *) realloc(mp->sym_array, \ - (mp->num_syms+1) * sizeof(struct sym_table)); - - if ( mp->sym_array == (struct sym_table *) NULL ) - return(0); - - mp->sym_array[mp->num_syms].name = strdup(p); - if ( mp->sym_array[mp->num_syms].name == (char *) NULL ) - return(0); - - /* Stuff interesting information into the module. */ - mp->sym_array[mp->num_syms].value = address; - ++mp->num_syms; - - return(1); -} - - - -/************************************************************************** - * Function: LookupModuleSymbol - * - * Purpose: Find the symbol which is related to the given address from - * a kernel module. - * - * Arguements: (long int) value, (struct symbol *) sym - * - * value:-> The address to be located. - * - * sym:-> A pointer to a structure which will be - * loaded with the symbol's parameters. - * - * Return: (char *) - * - * If a match cannot be found a diagnostic string is printed. - * If a match is found the pointer to the symbolic name most - * closely matching the address is returned. - * - * TODO: We are using int values for the offset, but longs for the value - * values. This may create some trouble in the future (on 64 Bit OS?). - * Anyhow, I have not changed this, because we do not seem to have any - * issue and my understanding of this code is limited (and I don't see - * need to invest more time to dig much deeper). - * rgerhards, 2009-04-17 - **************************************************************************/ -extern char * LookupModuleSymbol(value, sym) - unsigned long value; - struct symbol *sym; -{ - int nmod, nsym; - struct sym_table *last; - struct Module *mp; - static char ret[100]; - - sym->size = 0; - sym->offset = 0; - if ( num_modules == 0 ) - return((char *) 0); - - for (nmod = 0; nmod < num_modules; ++nmod) { - mp = &sym_array_modules[nmod]; - - /* Run through the list of symbols in this module and - * see if the address can be resolved. - */ - for(nsym = 1, last = &mp->sym_array[0]; - nsym < mp->num_syms; - ++nsym) { - if ( mp->sym_array[nsym].value > value ) - { - if ( sym->size == 0 || - (int) (value - last->value) < sym->offset || - ( (sym->offset == (int) (value - last->value)) && - (int) (mp->sym_array[nsym].value-last->value) < sym->size ) ) - { - sym->offset = value - last->value; - sym->size = mp->sym_array[nsym].value - last->value; - ret[sizeof(ret)-1] = '\0'; - if ( mp->name == NULL ) - snprintf(ret, sizeof(ret)-1, - "%s", last->name); - else - snprintf(ret, sizeof(ret)-1, - "%s:%s", mp->name, last->name); - } - break; - } - last = &mp->sym_array[nsym]; - } - } - - if ( sym->size > 0 ) - return(ret); - - /* It has been a hopeless exercise. */ - return(NULL); -} diff --git a/plugins/imklog/ksyms.h b/plugins/imklog/ksyms.h deleted file mode 100644 index a168947b..00000000 --- a/plugins/imklog/ksyms.h +++ /dev/null @@ -1,37 +0,0 @@ -/* ksym.h - Definitions for symbol table utilities. - * Copyright (c) 1995, 1996 Dr. G.W. Wettstein <greg@wind.rmcc.com> - * Copyright (c) 1996 Enjellic Systems Development - * Copyright (c) 2004-7 Martin Schulze <joey@infodrom.org> - * Copyright (c) 2007-2009 Rainer Gerhards <rgerhards@adiscon.com> - * - * This file is part of rsyslog. - * - * Rsyslog is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * Rsyslog is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with Rsyslog. If not, see <http://www.gnu.org/licenses/>. - * - * A copy of the GPL can be found in the file "COPYING" in this distribution. - */ - -/* Variables, structures and type definitions static to this module. */ - -struct symbol -{ - uchar *name; - int size; - int offset; -}; - - -/* Function prototypes. */ -extern char * LookupSymbol(unsigned long, struct symbol *); -extern char * LookupModuleSymbol(unsigned long int, struct symbol *); diff --git a/plugins/imklog/linux.c b/plugins/imklog/linux.c deleted file mode 100644 index 727708a5..00000000 --- a/plugins/imklog/linux.c +++ /dev/null @@ -1,542 +0,0 @@ -/* klog for linux, based on the FreeBSD syslogd implementation. - * - * This contains OS-specific functionality to read the BSD - * kernel log. For a general overview, see head comment in - * imklog.c. - * - * This file heavily borrows from the klogd daemon provided by - * the sysklogd project. Many thanks for this piece of software. - * - * This file is part of rsyslog. - * - * Rsyslog is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * Rsyslog is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with Rsyslog. If not, see <http://www.gnu.org/licenses/>. - * - * A copy of the GPL can be found in the file "COPYING" in this distribution. -*/ -#include "config.h" -#include "rsyslog.h" -#include <stdlib.h> -#include <stdio.h> -#include <assert.h> -#include <signal.h> -#include <string.h> -#include <pthread.h> -#include "cfsysline.h" -#include "template.h" -#include "msg.h" -#include "module-template.h" -#include "imklog.h" -#include "unicode-helper.h" - - -/* Includes. */ -#include <unistd.h> -#include <errno.h> -#include <sys/fcntl.h> -#include <sys/stat.h> - -#if HAVE_TIME_H -# include <time.h> -#endif - -#include <stdarg.h> -#include <paths.h> -#include "ksyms.h" - -#define __LIBRARY__ -#include <unistd.h> - - -#if !defined(__GLIBC__) -# define __NR_ksyslog __NR_syslog -_syscall3(int,ksyslog,int, type, char *, buf, int, len); -#else -#include <sys/klog.h> -#define ksyslog klogctl -#endif - - - -#ifndef _PATH_KLOG -#define _PATH_KLOG "/proc/kmsg" -#endif - -#define LOG_BUFFER_SIZE 4096 -#define LOG_LINE_LENGTH 1000 - -static int kmsg; -static char log_buffer[LOG_BUFFER_SIZE]; - -static enum LOGSRC {none, proc, kernel} logsrc; - - -/* Function prototypes. */ -extern int ksyslog(int type, char *buf, int len); - - -static uchar *GetPath(void) -{ - return pszPath ? pszPath : UCHAR_CONSTANT(_PATH_KLOG); -} - -static void CloseLogSrc(void) -{ - /* Turn on logging of messages to console, but only if a log level was speficied */ - if(console_log_level != -1) - ksyslog(7, NULL, 0); - - /* Shutdown the log sources. */ - switch(logsrc) { - case kernel: - ksyslog(0, NULL, 0); - imklogLogIntMsg(LOG_INFO, "Kernel logging (ksyslog) stopped."); - break; - case proc: - close(kmsg); - imklogLogIntMsg(LOG_INFO, "Kernel logging (proc) stopped."); - break; - case none: - break; - } - - return; -} - - -static enum LOGSRC GetKernelLogSrc(void) -{ - auto struct stat sb; - - /* Set level of kernel console messaging.. */ - if ( (console_log_level != -1) && - (ksyslog(8, NULL, console_log_level) < 0) && - (errno == EINVAL) ) - { - /* - * An invalid arguement error probably indicates that - * a pre-0.14 kernel is being run. At this point we - * issue an error message and simply shut-off console - * logging completely. - */ - imklogLogIntMsg(LOG_WARNING, "Cannot set console log level - disabling " - "console output."); - } - - /* - * First do a stat to determine whether or not the proc based - * file system is available to get kernel messages from. - */ - if ( use_syscall || - ((stat((char*)GetPath(), &sb) < 0) && (errno == ENOENT)) ) - { - /* Initialize kernel logging. */ - ksyslog(1, NULL, 0); - imklogLogIntMsg(LOG_INFO, "imklog %s, log source = ksyslog " - "started.", VERSION); - return(kernel); - } - - if ( (kmsg = open((char*)GetPath(), O_RDONLY|O_CLOEXEC)) < 0 ) - { - imklogLogIntMsg(LOG_ERR, "imklog: Cannot open proc file system, %d.\n", errno); - ksyslog(7, NULL, 0); - return(none); - } - - imklogLogIntMsg(LOG_INFO, "imklog %s, log source = %s started.", VERSION, GetPath()); - return(proc); -} - - -/* Copy characters from ptr to line until a char in the delim - * string is encountered or until min( space, len ) chars have - * been copied. - * - * Returns the actual number of chars copied. - */ -static int copyin( uchar *line, int space, - const char *ptr, int len, - const char *delim ) -{ - auto int i; - auto int count; - - count = len < space ? len : space; - - for(i=0; i<count && !strchr(delim, *ptr); i++ ) { - *line++ = *ptr++; - } - - return(i); -} - -/* - * Messages are separated by "\n". Messages longer than - * LOG_LINE_LENGTH are broken up. - * - * Kernel symbols show up in the input buffer as : "[<aaaaaa>]", - * where "aaaaaa" is the address. These are replaced with - * "[symbolname+offset/size]" in the output line - symbolname, - * offset, and size come from the kernel symbol table. - * - * If a kernel symbol happens to fall at the end of a message close - * in length to LOG_LINE_LENGTH, the symbol will not be expanded. - * (This should never happen, since the kernel should never generate - * messages that long. - * - * To preserve the original addresses, lines containing kernel symbols - * are output twice. Once with the symbols converted and again with the - * original text. Just in case somebody wants to run their own Oops - * analysis on the syslog, e.g. ksymoops. - */ -static void LogLine(char *ptr, int len) -{ - enum parse_state_enum { - PARSING_TEXT, - PARSING_SYMSTART, /* at < */ - PARSING_SYMBOL, - PARSING_SYMEND /* at ] */ - }; - - static uchar line_buff[LOG_LINE_LENGTH]; - - static uchar *line =line_buff; - static enum parse_state_enum parse_state = PARSING_TEXT; - static int space = sizeof(line_buff)-1; - - static uchar *sym_start; /* points at the '<' of a symbol */ - - auto int delta = 0; /* number of chars copied */ - auto int symbols_expanded = 0; /* 1 if symbols were expanded */ - auto int skip_symbol_lookup = 0; /* skip symbol lookup on this pass */ - auto char *save_ptr = ptr; /* save start of input line */ - auto int save_len = len; /* save length at start of input line */ - - while( len > 0 ) - { - if( space == 0 ) /* line buffer is full */ - { - /* - ** Line too long. Start a new line. - */ - *line = 0; /* force null terminator */ - - //dbgprintf("Line buffer full:\n"); - //dbgprintf("\tLine: %s\n", line); - - Syslog(LOG_INFO, line_buff); - line = line_buff; - space = sizeof(line_buff)-1; - parse_state = PARSING_TEXT; - symbols_expanded = 0; - skip_symbol_lookup = 0; - save_ptr = ptr; - save_len = len; - } - - switch( parse_state ) - { - case PARSING_TEXT: - delta = copyin(line, space, ptr, len, "\n[" ); - line += delta; - ptr += delta; - space -= delta; - len -= delta; - - if( space == 0 || len == 0 ) - { - break; /* full line_buff or end of input buffer */ - } - - if( *ptr == '\0' ) /* zero byte */ - { - ptr++; /* skip zero byte */ - space -= 1; - len -= 1; - - break; - } - - if( *ptr == '\n' ) /* newline */ - { - ptr++; /* skip newline */ - space -= 1; - len -= 1; - - *line = 0; /* force null terminator */ - Syslog(LOG_INFO, line_buff); - line = line_buff; - space = sizeof(line_buff)-1; - if (symbols_twice) { - if (symbols_expanded) { - /* reprint this line without symbol lookup */ - symbols_expanded = 0; - skip_symbol_lookup = 1; - ptr = save_ptr; - len = save_len; - } - else - { - skip_symbol_lookup = 0; - save_ptr = ptr; - save_len = len; - } - } - break; - } - if( *ptr == '[' ) /* possible kernel symbol */ - { - *line++ = *ptr++; - space -= 1; - len -= 1; - if (!skip_symbol_lookup) - parse_state = PARSING_SYMSTART; /* at < */ - break; - } - /* Now that line_buff is no longer fed to *printf as format - * string, '%'s are no longer "dangerous". - */ - break; - - case PARSING_SYMSTART: - if( *ptr != '<' ) - { - parse_state = PARSING_TEXT; /* not a symbol */ - break; - } - - /* - ** Save this character for now. If this turns out to - ** be a valid symbol, this char will be replaced later. - ** If not, we'll just leave it there. - */ - - sym_start = line; /* this will point at the '<' */ - - *line++ = *ptr++; - space -= 1; - len -= 1; - parse_state = PARSING_SYMBOL; /* symbol... */ - break; - - case PARSING_SYMBOL: - delta = copyin( line, space, ptr, len, ">\n[" ); - line += delta; - ptr += delta; - space -= delta; - len -= delta; - if( space == 0 || len == 0 ) - { - break; /* full line_buff or end of input buffer */ - } - if( *ptr != '>' ) - { - parse_state = PARSING_TEXT; - break; - } - - *line++ = *ptr++; /* copy the '>' */ - space -= 1; - len -= 1; - - parse_state = PARSING_SYMEND; - - break; - - case PARSING_SYMEND: - if( *ptr != ']' ) - { - parse_state = PARSING_TEXT; /* not a symbol */ - break; - } - - /* - ** It's really a symbol! Replace address with the - ** symbol text. - */ - { - auto int sym_space; - - unsigned long value; - auto struct symbol sym; - auto char *symbol; - - *(line-1) = 0; /* null terminate the address string */ - value = strtoul((char*)(sym_start+1), (char **) 0, 16); - *(line-1) = '>'; /* put back delim */ - - if ( !symbol_lookup || (symbol = LookupSymbol(value, &sym)) == (char *)0 ) - { - parse_state = PARSING_TEXT; - break; - } - - /* - ** verify there is room in the line buffer - */ - sym_space = space + ( line - sym_start ); - if( (unsigned) sym_space < strlen(symbol) + 30 ) /*(30 should be overkill)*/ - { - parse_state = PARSING_TEXT; /* not enough space */ - break; - } - - // TODO: sprintf!!!! - delta = sprintf( (char*) sym_start, "%s+%d/%d]", - symbol, sym.offset, sym.size ); - - space = sym_space + delta; - line = sym_start + delta; - symbols_expanded = 1; - } - ptr++; - len--; - parse_state = PARSING_TEXT; - break; - - default: /* Can't get here! */ - parse_state = PARSING_TEXT; - - } - } - - return; -} - - -static void LogKernelLine(void) -{ - auto int rdcnt; - - /* - * Zero-fill the log buffer. This should cure a multitude of - * problems with klogd logging the tail end of the message buffer - * which will contain old messages. Then read the kernel log - * messages into this fresh buffer. - */ - memset(log_buffer, '\0', sizeof(log_buffer)); - if ( (rdcnt = ksyslog(2, log_buffer, sizeof(log_buffer)-1)) < 0 ) - { - if(errno == EINTR) - return; - imklogLogIntMsg(LOG_ERR, "imklog Error return from sys_sycall: %d\n", errno); - } - else - LogLine(log_buffer, rdcnt); - return; -} - - -static void LogProcLine(void) -{ - auto int rdcnt; - - /* - * Zero-fill the log buffer. This should cure a multitude of - * problems with klogd logging the tail end of the message buffer - * which will contain old messages. Then read the kernel messages - * from the message pseudo-file into this fresh buffer. - */ - memset(log_buffer, '\0', sizeof(log_buffer)); - if ( (rdcnt = read(kmsg, log_buffer, sizeof(log_buffer)-1)) < 0 ) { - if ( errno == EINTR ) - return; - imklogLogIntMsg(LOG_ERR, "Cannot read proc file system: %d - %s.", errno, strerror(errno)); - } else { - LogLine(log_buffer, rdcnt); - } - - return; -} - - -/* to be called in the module's WillRun entry point - * rgerhards, 2008-04-09 - */ -rsRetVal klogLogKMsg(void) -{ - DEFiRet; - switch(logsrc) { - case kernel: - LogKernelLine(); - break; - case proc: - LogProcLine(); - break; - case none: - /* TODO: We need to handle this case here somewhat more intelligent - * This is now at least partly done - code should never reach this point - * as willRun() already checked for the "none" status -- rgerhards, 2007-12-17 - */ - pause(); - break; - } - RETiRet; -} - - -/* to be called in the module's WillRun entry point - * rgerhards, 2008-04-09 - */ -rsRetVal klogWillRun(void) -{ - DEFiRet; - /* Initialize this module. If that fails, we tell the engine we don't like to run */ - /* Determine where kernel logging information is to come from. */ - logsrc = GetKernelLogSrc(); - if(logsrc == none) { - iRet = RS_RET_NO_KERNEL_LOGSRC; - } else { - if (symbol_lookup) { - symbol_lookup = (InitKsyms(symfile) == 1); - symbol_lookup |= InitMsyms(); - if (symbol_lookup == 0) { - imklogLogIntMsg(LOG_WARNING, "cannot find any symbols, turning off symbol lookups"); - } - } - } - - RETiRet; -} - - -/* to be called in the module's AfterRun entry point - * rgerhards, 2008-04-09 - */ -rsRetVal klogAfterRun(void) -{ - DEFiRet; - /* cleanup here */ - if(logsrc != none) - CloseLogSrc(); - - DeinitKsyms(); - DeinitMsyms(); - - RETiRet; -} - - -/* provide the (system-specific) default facility for internal messages - * rgerhards, 2008-04-14 - */ -int -klogFacilIntMsg(void) -{ - return LOG_KERN; -} - - -/* vi:set ai: - */ diff --git a/plugins/imklog/module.h b/plugins/imklog/module.h deleted file mode 100644 index 38a26fea..00000000 --- a/plugins/imklog/module.h +++ /dev/null @@ -1,35 +0,0 @@ -/* module.h - Miscellaneous module definitions - * Copyright (c) 1996 Richard Henderson <rth@tamu.edu> - * Copyright (c) 2004-7 Martin Schulze <joey@infodrom.org> - * Copyright (c) 2007-2008 Rainer Gerhards <rgerhards@adiscon.com> - * - * This file is part of rsyslog. - * - * Rsyslog is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * Rsyslog is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with Rsyslog. If not, see <http://www.gnu.org/licenses/>. - * - * A copy of the GPL can be found in the file "COPYING" in this distribution. - */ -struct sym_table -{ - unsigned long value; - char *name; -}; - -struct Module -{ - struct sym_table *sym_array; - int num_syms; - - char *name; -}; diff --git a/plugins/imklog/solaris.c b/plugins/imklog/solaris.c index 8a6d5af1..0a169cdd 100644 --- a/plugins/imklog/solaris.c +++ b/plugins/imklog/solaris.c @@ -80,74 +80,6 @@ klogWillRun(void) } -#if 0 -/* Read /dev/klog while data are available, split into lines. - * Contrary to standard BSD syslogd, we do a blocking read. We can - * afford this as imklog is running on its own threads. So if we have - * a single file, it really doesn't matter if we wait inside a 1-file - * select or the read() directly. - */ -static void -readklog(void) -{ - char *p, *q; - int len, i; - int iMaxLine; - uchar bufRcv[4096+1]; - uchar *pRcv = NULL; /* receive buffer */ - - iMaxLine = klog_getMaxLine(); - - /* we optimize performance: if iMaxLine is below 4K (which it is in almost all - * cases, we use a fixed buffer on the stack. Only if it is higher, heap memory - * is used. We could use alloca() to achive a similar aspect, but there are so - * many issues with alloca() that I do not want to take that route. - * rgerhards, 2008-09-02 - */ - if((size_t) iMaxLine < sizeof(bufRcv) - 1) { - pRcv = bufRcv; - } else { - if((pRcv = (uchar*) malloc(sizeof(uchar) * (iMaxLine + 1))) == NULL) - iMaxLine = sizeof(bufRcv) - 1; /* better this than noting */ - } - - len = 0; - for (;;) { - dbgprintf("----------imklog(BSD) waiting for kernel log line\n"); - i = read(fklog, pRcv + len, iMaxLine - len); - if (i > 0) { - pRcv[i + len] = '\0'; - } else { - if (i < 0 && errno != EINTR && errno != EAGAIN) { - imklogLogIntMsg(LOG_ERR, - "imklog error %d reading kernel log - shutting down imklog", - errno); - fklog = -1; - } - break; - } - - for(p = pRcv; (q = strchr(p, '\n')) != NULL; p = q + 1) { - *q = '\0'; - Syslog(LOG_INFO, (uchar*) p); - } - len = strlen(p); - if (len >= iMaxLine - 1) { - Syslog(LOG_INFO, (uchar*)p); - len = 0; - } - if (len > 0) - memmove(pRcv, p, len + 1); - } - if (len > 0) - Syslog(LOG_INFO, pRcv); - - if(pRcv != NULL && (size_t) iMaxLine >= sizeof(bufRcv) - 1) - free(pRcv); -} -#endif - - /* to be called in the module's AfterRun entry point * rgerhards, 2008-04-09 */ diff --git a/plugins/impstats/impstats.c b/plugins/impstats/impstats.c index 3012136c..ba497e01 100644 --- a/plugins/impstats/impstats.c +++ b/plugins/impstats/impstats.c @@ -62,7 +62,6 @@ typedef struct configSettings_s { static configSettings_t cs; static prop_t *pInputName = NULL; -static prop_t *pLocalHostIP = NULL; BEGINisCompatibleWithFeature CODESTARTisCompatibleWithFeature @@ -92,7 +91,7 @@ doSubmitMsg(uchar *line) MsgSetRawMsgWOSize(pMsg, (char*)line); MsgSetHOSTNAME(pMsg, glbl.GetLocalHostName(), ustrlen(glbl.GetLocalHostName())); MsgSetRcvFrom(pMsg, glbl.GetLocalHostNameProp()); - MsgSetRcvFromIP(pMsg, pLocalHostIP); + MsgSetRcvFromIP(pMsg, glbl.GetLocalHostIP()); MsgSetMSGoffs(pMsg, 0); MsgSetTAG(pMsg, UCHAR_CONSTANT("rsyslogd-pstats:"), sizeof("rsyslogd-pstats:") - 1); pMsg->iFacility = cs.iFacility; @@ -169,7 +168,6 @@ ENDafterRun BEGINmodExit CODESTARTmodExit prop.Destruct(&pInputName); - prop.Destruct(&pLocalHostIP); /* release objects we used */ objRelease(glbl, CORE_COMPONENT); objRelease(prop, CORE_COMPONENT); @@ -212,10 +210,6 @@ CODEmodInit_QueryRegCFSLineHdlr CHKiRet(prop.Construct(&pInputName)); CHKiRet(prop.SetString(pInputName, UCHAR_CONSTANT("impstats"), sizeof("impstats") - 1)); CHKiRet(prop.ConstructFinalize(pInputName)); - - CHKiRet(prop.Construct(&pLocalHostIP)); - CHKiRet(prop.SetString(pLocalHostIP, UCHAR_CONSTANT("127.0.0.1"), sizeof("127.0.0.1") - 1)); - CHKiRet(prop.ConstructFinalize(pLocalHostIP)); ENDmodInit /* vi:set ai: */ diff --git a/plugins/imptcp/imptcp.c b/plugins/imptcp/imptcp.c index 91bdf8b3..008f7920 100644 --- a/plugins/imptcp/imptcp.c +++ b/plugins/imptcp/imptcp.c @@ -49,6 +49,7 @@ #include <sys/types.h> #include <sys/socket.h> #include <sys/epoll.h> +#include <netinet/tcp.h> #if HAVE_FCNTL_H #include <fcntl.h> #endif @@ -65,6 +66,7 @@ #include "datetime.h" #include "ruleset.h" #include "msg.h" +#include "statsobj.h" #include "net.h" /* for permittedPeers, may be removed when this is removed */ /* the define is from tcpsrv.h, we need to find a new (but easier!!!) abstraction layer some time ... */ @@ -82,12 +84,17 @@ DEFobjCurrIf(prop) DEFobjCurrIf(datetime) DEFobjCurrIf(errmsg) DEFobjCurrIf(ruleset) - +DEFobjCurrIf(statsobj) /* config settings */ typedef struct configSettings_s { + int bKeepAlive; /* support keep-alive packets */ + int iKeepAliveIntvl; + int iKeepAliveProbes; + int iKeepAliveTime; int bEmitMsgOnClose; /* emit an informational message on close by remote peer */ + int bSuppOctetFram; /* support octet-counted framing? */ int iAddtlFrameDelim; /* addtl frame delimiter, e.g. for netscreen, default none */ uchar *pszInputName; /* value for inputname property, NULL is OK and handled by core engine */ uchar *lstnIP; /* which IP we should listen on? */ @@ -110,26 +117,33 @@ struct ptcpsrv_s { ptcpsrv_t *pNext; /* linked list maintenance */ uchar *port; /* Port to listen to */ uchar *lstnIP; /* which IP we should listen on? */ - int bEmitMsgOnClose; int iAddtlFrameDelim; + int iKeepAliveIntvl; + int iKeepAliveProbes; + int iKeepAliveTime; uchar *pszInputName; prop_t *pInputName; /* InputName in (fast to process) property format */ ruleset_t *pRuleset; ptcplstn_t *pLstn; /* root of our listeners */ ptcpsess_t *pSess; /* root of our sessions */ + sbool bKeepAlive; /* support keep-alive packets */ + sbool bEmitMsgOnClose; + sbool bSuppOctetFram; }; /* the ptcp session object. Describes a single active session. * includes support for doubly-linked list. */ struct ptcpsess_s { - ptcpsrv_t *pSrv; /* our server */ +// ptcpsrv_t *pSrv; /* our server TODO: check remove! */ + ptcplstn_t *pLstn; /* our listener */ ptcpsess_t *prev, *next; int sock; epolld_t *epd; //--- from tcps_sess.h int iMsg; /* index of next char to store in msg */ int bAtStrtOfFram; /* are we at the very beginning of a new frame? */ + sbool bSuppOctetFram; /**< copy from listener, to speed up access */ enum { eAtStrtFram, eInOctetCnt, @@ -150,7 +164,10 @@ struct ptcplstn_s { ptcpsrv_t *pSrv; /* our server */ ptcplstn_t *prev, *next; int sock; + sbool bSuppOctetFram; epolld_t *epd; + statsobj_t *stats; /* listener stats */ + STATSCOUNTER_DEF(ctrSubmit, mutCtrSubmit) }; @@ -188,7 +205,7 @@ static char rcvBuf[128*1024]; /* forward definitions */ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal); -static rsRetVal addLstn(ptcpsrv_t *pSrv, int sock); +static rsRetVal addLstn(ptcpsrv_t *pSrv, int sock, int isIPv6); /* some simple constructors/destructors */ @@ -233,6 +250,7 @@ startupSrv(ptcpsrv_t *pSrv) int sockflags; struct addrinfo hints, *res = NULL, *r; uchar *lstnIP; + int isIPv6 = 0; lstnIP = pSrv->lstnIP == NULL ? UCHAR_CONSTANT("") : pSrv->lstnIP; @@ -265,8 +283,9 @@ startupSrv(ptcpsrv_t *pSrv) continue; } -#ifdef IPV6_V6ONLY if(r->ai_family == AF_INET6) { + isIPv6 = 1; +#ifdef IPV6_V6ONLY int iOn = 1; if(setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, (char *)&iOn, sizeof (iOn)) < 0) { @@ -274,8 +293,10 @@ startupSrv(ptcpsrv_t *pSrv) sock = -1; continue; } - } #endif + } else { + isIPv6 = 0; + } if(setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char *) &on, sizeof(on)) < 0 ) { DBGPRINTF("error %d setting tcp socket option\n", errno); close(sock); @@ -337,7 +358,7 @@ startupSrv(ptcpsrv_t *pSrv) /* if we reach this point, we were able to obtain a valid socket, so we can * create our listener object. -- rgerhards, 2010-08-10 */ - CHKiRet(addLstn(pSrv, sock)); + CHKiRet(addLstn(pSrv, sock, isIPv6)); ++numSocks; } @@ -428,12 +449,80 @@ finalize_it: } +/* Enable KEEPALIVE handling on the socket. */ +static inline rsRetVal +EnableKeepAlive(ptcplstn_t *pLstn, int sock) +{ + int ret; + int optval; + socklen_t optlen; + DEFiRet; + + optval = 1; + optlen = sizeof(optval); + ret = setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, &optval, optlen); + if(ret < 0) { + dbgprintf("EnableKeepAlive socket call returns error %d\n", ret); + ABORT_FINALIZE(RS_RET_ERR); + } + +# if defined(TCP_KEEPCNT) + if(pLstn->pSrv->iKeepAliveProbes > 0) { + optval = pLstn->pSrv->iKeepAliveProbes; + optlen = sizeof(optval); + ret = setsockopt(sock, SOL_TCP, TCP_KEEPCNT, &optval, optlen); + } else { + ret = 0; + } +# else + ret = -1; +# endif + if(ret < 0) { + errmsg.LogError(ret, NO_ERRCODE, "imptcp cannot set keepalive probes - ignored"); + } + +# if defined(TCP_KEEPCNT) + if(pLstn->pSrv->iKeepAliveTime > 0) { + optval = pLstn->pSrv->iKeepAliveTime; + optlen = sizeof(optval); + ret = setsockopt(sock, SOL_TCP, TCP_KEEPIDLE, &optval, optlen); + } else { + ret = 0; + } +# else + ret = -1; +# endif + if(ret < 0) { + errmsg.LogError(ret, NO_ERRCODE, "imptcp cannot set keepalive time - ignored"); + } + +# if defined(TCP_KEEPCNT) + if(pLstn->pSrv->iKeepAliveIntvl > 0) { + optval = pLstn->pSrv->iKeepAliveIntvl; + optlen = sizeof(optval); + ret = setsockopt(sock, SOL_TCP, TCP_KEEPINTVL, &optval, optlen); + } else { + ret = 0; + } +# else + ret = -1; +# endif + if(ret < 0) { + errmsg.LogError(errno, NO_ERRCODE, "imptcp cannot set keepalive intvl - ignored"); + } + + dbgprintf("KEEPALIVE enabled for socket %d\n", sock); + +finalize_it: + RETiRet; +} + /* accept an incoming connection request * rgerhards, 2008-04-22 */ static rsRetVal -AcceptConnReq(int sock, int *newSock, prop_t **peerName, prop_t **peerIP) +AcceptConnReq(ptcplstn_t *pLstn, int *newSock, prop_t **peerName, prop_t **peerIP) { int sockflags; struct sockaddr_storage addr; @@ -442,13 +531,17 @@ AcceptConnReq(int sock, int *newSock, prop_t **peerName, prop_t **peerIP) DEFiRet; - iNewSock = accept(sock, (struct sockaddr*) &addr, &addrlen); + iNewSock = accept(pLstn->sock, (struct sockaddr*) &addr, &addrlen); if(iNewSock < 0) { if(errno == EAGAIN || errno == EWOULDBLOCK) ABORT_FINALIZE(RS_RET_NO_MORE_DATA); ABORT_FINALIZE(RS_RET_ACCEPT_ERR); } + if(pLstn->pSrv->bKeepAlive) + EnableKeepAlive(pLstn, iNewSock);/* we ignore errors, best to do! */ + + CHKiRet(getPeerNames(peerName, peerIP, (struct sockaddr*) &addr)); /* set the new socket to non-blocking IO */ @@ -492,22 +585,25 @@ static rsRetVal doSubmitMsg(ptcpsess_t *pThis, struct syslogTime *stTime, time_t ttGenTime, multi_submit_t *pMultiSub) { msg_t *pMsg; + ptcpsrv_t *pSrv; DEFiRet; if(pThis->iMsg == 0) { DBGPRINTF("discarding zero-sized message\n"); FINALIZE; } + pSrv = pThis->pLstn->pSrv; /* we now create our own message object and submit it to the queue */ CHKiRet(msgConstructWithTime(&pMsg, stTime, ttGenTime)); MsgSetRawMsg(pMsg, (char*)pThis->pMsg, pThis->iMsg); - MsgSetInputName(pMsg, pThis->pSrv->pInputName); + MsgSetInputName(pMsg, pSrv->pInputName); MsgSetFlowControlType(pMsg, eFLOWCTL_LIGHT_DELAY); pMsg->msgFlags = NEEDS_PARSING | PARSE_HOSTNAME; MsgSetRcvFrom(pMsg, pThis->peerName); CHKiRet(MsgSetRcvFromIP(pMsg, pThis->peerIP)); - MsgSetRuleset(pMsg, pThis->pSrv->pRuleset); + MsgSetRuleset(pMsg, pSrv->pRuleset); + STATSCOUNTER_INC(pThis->pLstn->ctrSubmit, pThis->pLstn->mutCtrSubmit); if(pMultiSub == NULL) { CHKiRet(submitMsg(pMsg)); @@ -540,7 +636,7 @@ processDataRcvd(ptcpsess_t *pThis, char c, struct syslogTime *stTime, time_t ttG DEFiRet; if(pThis->inputState == eAtStrtFram) { - if(isdigit((int) c)) { + if(pThis->bSuppOctetFram && isdigit((int) c)) { pThis->inputState = eInOctetCnt; pThis->iOctetsRemain = 0; pThis->eFraming = TCP_FRAMING_OCTET_COUNTING; @@ -589,7 +685,8 @@ processDataRcvd(ptcpsess_t *pThis, char c, struct syslogTime *stTime, time_t ttG } if(( (c == '\n') - || ((pThis->pSrv->iAddtlFrameDelim != TCPSRV_NO_ADDTL_DELIMITER) && (c == pThis->pSrv->iAddtlFrameDelim)) + || ((pThis->pLstn->pSrv->iAddtlFrameDelim != TCPSRV_NO_ADDTL_DELIMITER) + && (c == pThis->pLstn->pSrv->iAddtlFrameDelim)) ) && pThis->eFraming == TCP_FRAMING_OCTET_STUFFING) { /* record delimiter? */ doSubmitMsg(pThis, stTime, ttGenTime, pMultiSub); pThis->inputState = eAtStrtFram; @@ -678,6 +775,7 @@ static inline void initConfigSettings(void) { cs.bEmitMsgOnClose = 0; + cs.bSuppOctetFram = 1; cs.iAddtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER; cs.pszInputName = NULL; cs.pRuleset = NULL; @@ -693,7 +791,7 @@ addEPollSock(epolld_type_t typ, void *ptr, int sock, epolld_t **pEpd) DEFiRet; epolld_t *epd = NULL; - CHKmalloc(epd = malloc(sizeof(epolld_t))); + CHKmalloc(epd = calloc(sizeof(epolld_t), 1)); epd->typ = typ; epd->ptr = ptr; *pEpd = epd; @@ -747,14 +845,27 @@ finalize_it: /* add a listener to the server */ static rsRetVal -addLstn(ptcpsrv_t *pSrv, int sock) +addLstn(ptcpsrv_t *pSrv, int sock, int isIPv6) { DEFiRet; ptcplstn_t *pLstn; + uchar statname[64]; CHKmalloc(pLstn = malloc(sizeof(ptcplstn_t))); pLstn->pSrv = pSrv; + pLstn->bSuppOctetFram = pSrv->bSuppOctetFram; pLstn->sock = sock; + /* support statistics gathering */ + CHKiRet(statsobj.Construct(&(pLstn->stats))); + snprintf((char*)statname, sizeof(statname), "imptcp(%s/%s/%s)", + (pSrv->lstnIP == NULL) ? "*" : (char*)pSrv->lstnIP, pSrv->port, + isIPv6 ? "IPv6" : "IPv4"); + statname[sizeof(statname)-1] = '\0'; /* just to be on the save side... */ + CHKiRet(statsobj.SetName(pLstn->stats, statname)); + STATSCOUNTER_INIT(pLstn->ctrSubmit, pLstn->mutCtrSubmit); + CHKiRet(statsobj.AddCounter(pLstn->stats, UCHAR_CONSTANT("submitted"), + ctrType_IntCtr, &(pLstn->ctrSubmit))); + CHKiRet(statsobj.ConstructFinalize(pLstn->stats)); /* add to start of server's listener list */ pLstn->prev = NULL; @@ -773,15 +884,17 @@ finalize_it: /* add a session to the server */ static rsRetVal -addSess(ptcpsrv_t *pSrv, int sock, prop_t *peerName, prop_t *peerIP) +addSess(ptcplstn_t *pLstn, int sock, prop_t *peerName, prop_t *peerIP) { DEFiRet; ptcpsess_t *pSess = NULL; + ptcpsrv_t *pSrv = pLstn->pSrv; CHKmalloc(pSess = malloc(sizeof(ptcpsess_t))); CHKmalloc(pSess->pMsg = malloc(iMaxLine * sizeof(uchar))); - pSess->pSrv = pSrv; + pSess->pLstn = pLstn; pSess->sock = sock; + pSess->bSuppOctetFram = pLstn->bSuppOctetFram; pSess->inputState = eAtStrtFram; pSess->iMsg = 0; pSess->bAtStrtOfFram = 1; @@ -824,7 +937,7 @@ closeSess(ptcpsess_t *pSess) pSess->next->prev = pSess->prev; if(pSess->prev == NULL) { /* need to update root! */ - pSess->pSrv->pSess = pSess->next; + pSess->pLstn->pSrv->pSess = pSess->next; } else { pSess->prev->next = pSess->next; } @@ -882,6 +995,11 @@ static rsRetVal addTCPListener(void __attribute__((unused)) *pVal, uchar *pNewVa CHKmalloc(pSrv = malloc(sizeof(ptcpsrv_t))); pSrv->pSess = NULL; pSrv->pLstn = NULL; + pSrv->bKeepAlive = cs.bKeepAlive; + pSrv->iKeepAliveIntvl = cs.iKeepAliveTime; + pSrv->iKeepAliveProbes = cs.iKeepAliveProbes; + pSrv->iKeepAliveTime = cs.iKeepAliveTime; + pSrv->bSuppOctetFram = cs.bSuppOctetFram; pSrv->bEmitMsgOnClose = cs.bEmitMsgOnClose; pSrv->port = pNewVal; pSrv->iAddtlFrameDelim = cs.iAddtlFrameDelim; @@ -945,11 +1063,11 @@ lstnActivity(ptcplstn_t *pLstn) DBGPRINTF("imptcp: new connection on listen socket %d\n", pLstn->sock); while(1) { - localRet = AcceptConnReq(pLstn->sock, &newSock, &peerName, &peerIP); + localRet = AcceptConnReq(pLstn, &newSock, &peerName, &peerIP); if(localRet == RS_RET_NO_MORE_DATA) break; CHKiRet(localRet); - CHKiRet(addSess(pLstn->pSrv, newSock, peerName, peerIP)); + CHKiRet(addSess(pLstn, newSock, peerName, peerIP)); } finalize_it: @@ -979,7 +1097,7 @@ sessActivity(ptcpsess_t *pSess) CHKiRet(DataRcvd(pSess, rcvBuf, lenRcv)); } else if (lenRcv == 0) { /* session was closed, do clean-up */ - if(pSess->pSrv->bEmitMsgOnClose) { + if(pSess->pLstn->pSrv->bEmitMsgOnClose) { uchar *peerName; int lenPeer; prop.GetString(pSess->peerName, &peerName, &lenPeer); @@ -1085,6 +1203,8 @@ shutdownSrv(ptcpsrv_t *pSrv) pLstn = pSrv->pLstn; while(pLstn != NULL) { close(pLstn->sock); + statsobj.Destruct(&(pLstn->stats)); + /* now unlink listner */ lstnDel = pLstn; pLstn = pLstn->next; DBGPRINTF("imptcp shutdown listen socket %d\n", lstnDel->sock); @@ -1132,6 +1252,7 @@ CODESTARTmodExit /* release objects we used */ objRelease(glbl, CORE_COMPONENT); + objRelease(statsobj, CORE_COMPONENT); objRelease(prop, CORE_COMPONENT); objRelease(net, LM_NET_FILENAME); objRelease(datetime, CORE_COMPONENT); @@ -1144,6 +1265,11 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal) { cs.bEmitMsgOnClose = 0; + cs.bKeepAlive = 0; + cs.iKeepAliveProbes = 0; + cs.iKeepAliveTime = 0; + cs.iKeepAliveIntvl = 0; + cs.bSuppOctetFram = 1; cs.iAddtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER; free(cs.pszInputName); cs.pszInputName = NULL; @@ -1167,6 +1293,7 @@ CODEmodInit_QueryRegCFSLineHdlr initConfigSettings(); /* request objects we use */ CHKiRet(objUse(glbl, CORE_COMPONENT)); + CHKiRet(objUse(statsobj, CORE_COMPONENT)); CHKiRet(objUse(prop, CORE_COMPONENT)); CHKiRet(objUse(net, LM_NET_FILENAME)); CHKiRet(objUse(errmsg, CORE_COMPONENT)); @@ -1176,6 +1303,16 @@ CODEmodInit_QueryRegCFSLineHdlr /* register config file handlers */ CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverrun"), 0, eCmdHdlrGetWord, addTCPListener, NULL, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverkeepalive"), 0, eCmdHdlrBinary, + NULL, &cs.bKeepAlive, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverkeepalive_probes"), 0, eCmdHdlrInt, + NULL, &cs.iKeepAliveProbes, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverkeepalive_time"), 0, eCmdHdlrInt, + NULL, &cs.iKeepAliveTime, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverkeepalive_intvl"), 0, eCmdHdlrInt, + NULL, &cs.iKeepAliveIntvl, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserversupportoctetcountedframing"), 0, eCmdHdlrBinary, + NULL, &cs.bSuppOctetFram, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpservernotifyonconnectionclose"), 0, eCmdHdlrBinary, NULL, &cs.bEmitMsgOnClose, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserveraddtlframedelimiter"), 0, eCmdHdlrInt, diff --git a/plugins/imtcp/imtcp.c b/plugins/imtcp/imtcp.c index 6ab39477..59dbd449 100644 --- a/plugins/imtcp/imtcp.c +++ b/plugins/imtcp/imtcp.c @@ -82,12 +82,15 @@ static permittedPeers_t *pPermPeersRoot = NULL; /* config settings */ +static int bKeepAlive = 0; /* support keep-alive packets */ static int iTCPSessMax = 200; /* max number of sessions */ +static int bSuppOctetFram = 1; /* octet counted TCP framing supported? */ static int iTCPLstnMax = 20; /* max number of sessions */ static int iStrmDrvrMode = 0; /* mode for stream driver, driver-dependent (0 mostly means plain tcp) */ static int bEmitMsgOnClose = 0; /* emit an informational message on close by remote peer */ static int iAddtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER; /* addtl frame delimiter, e.g. for netscreen, default none */ static int bDisableLFDelim = 0; /* disbale standard LF delimiter */ +static int bUseFlowControl = 1; /* use flow control, what means indicate ourselfs a "light delayable" */ static uchar *pszStrmDrvrAuthMode = NULL; /* authentication mode to use */ static uchar *pszInputName = NULL; /* value for inputname property, NULL is OK and handled by core engine */ static ruleset_t *pBindRuleset = NULL; /* ruleset to bind listener to (use system default if unspecified) */ @@ -191,6 +194,7 @@ static rsRetVal addTCPListener(void __attribute__((unused)) *pVal, uchar *pNewVa if(pOurTcpsrv == NULL) { CHKiRet(tcpsrv.Construct(&pOurTcpsrv)); + CHKiRet(tcpsrv.SetKeepAlive(pOurTcpsrv, bKeepAlive)); CHKiRet(tcpsrv.SetSessMax(pOurTcpsrv, iTCPSessMax)); CHKiRet(tcpsrv.SetLstnMax(pOurTcpsrv, iTCPLstnMax)); CHKiRet(tcpsrv.SetCBIsPermittedHost(pOurTcpsrv, isPermittedHost)); @@ -199,6 +203,7 @@ static rsRetVal addTCPListener(void __attribute__((unused)) *pVal, uchar *pNewVa CHKiRet(tcpsrv.SetCBOnRegularClose(pOurTcpsrv, onRegularClose)); CHKiRet(tcpsrv.SetCBOnErrClose(pOurTcpsrv, onErrClose)); CHKiRet(tcpsrv.SetDrvrMode(pOurTcpsrv, iStrmDrvrMode)); + CHKiRet(tcpsrv.SetUseFlowControl(pOurTcpsrv, bUseFlowControl)); CHKiRet(tcpsrv.SetAddtlFrameDelim(pOurTcpsrv, iAddtlFrameDelim)); CHKiRet(tcpsrv.SetbDisableLFDelim(pOurTcpsrv, bDisableLFDelim)); CHKiRet(tcpsrv.SetNotificationOnRemoteClose(pOurTcpsrv, bEmitMsgOnClose)); @@ -215,7 +220,7 @@ static rsRetVal addTCPListener(void __attribute__((unused)) *pVal, uchar *pNewVa CHKiRet(tcpsrv.SetRuleset(pOurTcpsrv, pBindRuleset)); CHKiRet(tcpsrv.SetInputName(pOurTcpsrv, pszInputName == NULL ? UCHAR_CONSTANT("imtcp") : pszInputName)); - tcpsrv.configureTCPListen(pOurTcpsrv, pNewVal); + tcpsrv.configureTCPListen(pOurTcpsrv, pNewVal, bSuppOctetFram); finalize_it: if(iRet != RS_RET_OK) { @@ -287,8 +292,11 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal) { iTCPSessMax = 200; + bKeepAlive = 0; + bSuppOctetFram = 1; iTCPLstnMax = 20; iStrmDrvrMode = 0; + bUseFlowControl = 0; bEmitMsgOnClose = 0; iAddtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER; bDisableLFDelim = 0; @@ -324,6 +332,10 @@ CODEmodInit_QueryRegCFSLineHdlr /* register config file handlers */ CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputtcpserverrun"), 0, eCmdHdlrGetWord, addTCPListener, NULL, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputtcpserverkeepalive"), 0, eCmdHdlrBinary, + NULL, &bKeepAlive, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputtcpserversupportoctetcountedframing"), 0, eCmdHdlrBinary, + NULL, &bSuppOctetFram, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputtcpmaxsessions"), 0, eCmdHdlrInt, NULL, &iTCPSessMax, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputtcpmaxlisteners"), 0, eCmdHdlrInt, @@ -344,6 +356,8 @@ CODEmodInit_QueryRegCFSLineHdlr eCmdHdlrGetWord, NULL, &pszInputName, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputtcpserverbindruleset"), 0, eCmdHdlrGetWord, setRuleset, NULL, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputtcpflowcontrol"), 0, + eCmdHdlrBinary, NULL, &bUseFlowControl, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("resetconfigvariables"), 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID)); ENDmodInit diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c index a5002591..46631e97 100644 --- a/plugins/imudp/imudp.c +++ b/plugins/imudp/imudp.c @@ -6,7 +6,7 @@ * * File begun on 2007-12-21 by RGerhards (extracted from syslogd.c) * - * Copyright 2007-2009 Rainer Gerhards and Adiscon GmbH. + * Copyright 2007-2011 Rainer Gerhards and Adiscon GmbH. * * This file is part of rsyslog. * @@ -26,6 +26,7 @@ * A copy of the GPL can be found in the file "COPYING" in this distribution. */ #include "config.h" +#include <stdio.h> #include <stdlib.h> #include <assert.h> #include <string.h> @@ -51,6 +52,7 @@ #include "datetime.h" #include "prop.h" #include "ruleset.h" +#include "statsobj.h" #include "unicode-helper.h" MODULE_TYPE_INPUT @@ -66,6 +68,16 @@ DEFobjCurrIf(net) DEFobjCurrIf(datetime) DEFobjCurrIf(prop) DEFobjCurrIf(ruleset) +DEFobjCurrIf(statsobj) + + +static struct lstn_s { + struct lstn_s *next; + int sock; /* socket */ + ruleset_t *pRuleset; /* bound ruleset */ + statsobj_t *stats; /* listener stats */ + STATSCOUNTER_DEF(ctrSubmit, mutCtrSubmit) +} *lcnfRoot = NULL, *lcnfLast = NULL; static int bDoACLCheck; /* are ACL checks neeed? Cached once immediately before listener startup */ static int iMaxLine; /* maximum UDP message size supported */ @@ -73,9 +85,7 @@ static time_t ttLastDiscard = 0; /* timestamp when a message from a non-permitte * This shall prevent remote DoS when the "discard on disallowed sender" * message is configured to be logged on occurance of such a case. */ -static int *udpLstnSocks = NULL; /* Internet datagram sockets, first element is nbr of elements - * read-only after init(), but beware of restart! */ -static ruleset_t **udpRulesets = NULL; /* ruleset to be used with sockets in question (entry 0 is empty) */ + static uchar *pszBindAddr = NULL; /* IP to bind socket to */ static uchar *pRcvBuf = NULL; /* receive buffer (for a single packet). We use a global and alloc * it so that we can check available memory in willRun() and request @@ -190,9 +200,11 @@ static rsRetVal addListner(void __attribute__((unused)) *pVal, uchar *pNewVal) DEFiRet; uchar *bindAddr; int *newSocks; - int *tmpSocks; - int iSrc, iDst; - ruleset_t **tmpRulesets; + int iSrc; + struct lstn_s *newlcnfinfo; + uchar *bindName; + uchar *port; + uchar statname[64]; /* check which address to bind to. We could do this more compact, but have not * done so in order to make the code more readable. -- rgerhards, 2007-12-27 @@ -203,55 +215,46 @@ static rsRetVal addListner(void __attribute__((unused)) *pVal, uchar *pNewVal) bindAddr = NULL; else bindAddr = pszBindAddr; + bindName = (bindAddr == NULL) ? (uchar*)"*" : bindAddr; - DBGPRINTF("Trying to open syslog UDP ports at %s:%s.\n", - (bindAddr == NULL) ? (uchar*)"*" : bindAddr, pNewVal); + DBGPRINTF("Trying to open syslog UDP ports at %s:%s.\n", bindName, pNewVal); - newSocks = net.create_udp_socket(bindAddr, (pNewVal == NULL || *pNewVal == '\0') ? (uchar*) "514" : pNewVal, 1); + port = (pNewVal == NULL || *pNewVal == '\0') ? (uchar*) "514" : pNewVal; + newSocks = net.create_udp_socket(bindAddr, port, 1); if(newSocks != NULL) { /* we now need to add the new sockets to the existing set */ - if(udpLstnSocks == NULL) { - /* esay, we can just replace it */ - udpLstnSocks = newSocks; - CHKmalloc(udpRulesets = (ruleset_t**) MALLOC(sizeof(ruleset_t*) * (newSocks[0] + 1))); - for(iDst = 1 ; iDst <= newSocks[0] ; ++iDst) - udpRulesets[iDst] = pBindRuleset; - } else { - /* we need to add them */ - tmpSocks = (int*) MALLOC(sizeof(int) * (1 + newSocks[0] + udpLstnSocks[0])); - tmpRulesets = (ruleset_t**) MALLOC(sizeof(ruleset_t*) * (1 + newSocks[0] + udpLstnSocks[0])); - if(tmpSocks == NULL || tmpRulesets == NULL) { - DBGPRINTF("out of memory trying to allocate udp listen socket array\n"); - /* in this case, we discard the new sockets but continue with what we - * already have - */ - free(newSocks); - free(tmpSocks); - free(tmpRulesets); - ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); - } else { - /* ready to copy */ - iDst = 1; - for(iSrc = 1 ; iSrc <= udpLstnSocks[0] ; ++iSrc, ++iDst) { - tmpSocks[iDst] = udpLstnSocks[iSrc]; - tmpRulesets[iDst] = udpRulesets[iSrc]; - } - for(iSrc = 1 ; iSrc <= newSocks[0] ; ++iSrc, ++iDst) { - tmpSocks[iDst] = newSocks[iSrc]; - tmpRulesets[iDst] = pBindRuleset; - } - tmpSocks[0] = udpLstnSocks[0] + newSocks[0]; - free(newSocks); - free(udpLstnSocks); - udpLstnSocks = tmpSocks; - free(udpRulesets); - udpRulesets = tmpRulesets; + /* ready to copy */ + for(iSrc = 1 ; iSrc <= newSocks[0] ; ++iSrc) { + CHKmalloc(newlcnfinfo = (struct lstn_s*) MALLOC(sizeof(struct lstn_s))); + newlcnfinfo->next = NULL; + newlcnfinfo->sock = newSocks[iSrc]; + newlcnfinfo->pRuleset = pBindRuleset; + /* support statistics gathering */ + CHKiRet(statsobj.Construct(&(newlcnfinfo->stats))); + snprintf((char*)statname, sizeof(statname), "imudp(%s:%s)", bindName, port); + statname[sizeof(statname)-1] = '\0'; /* just to be on the save side... */ + CHKiRet(statsobj.SetName(newlcnfinfo->stats, statname)); + STATSCOUNTER_INIT(newlcnfinfo->ctrSubmit, newlcnfinfo->mutCtrSubmit); + CHKiRet(statsobj.AddCounter(newlcnfinfo->stats, UCHAR_CONSTANT("submitted"), + ctrType_IntCtr, &(newlcnfinfo->ctrSubmit))); + CHKiRet(statsobj.ConstructFinalize(newlcnfinfo->stats)); + /* link to list. Order must be preserved to take care for + * conflicting matches. + */ + if(lcnfRoot == NULL) + lcnfRoot = newlcnfinfo; + if(lcnfLast == NULL) + lcnfLast = newlcnfinfo; + else { + lcnfLast->next = newlcnfinfo; + lcnfLast = newlcnfinfo; } } } finalize_it: free(pNewVal); /* in any case, this is no longer needed */ + free(newSocks); RETiRet; } @@ -294,8 +297,7 @@ finalize_it: * on scheduling order. -- rgerhards, 2008-10-02 */ static inline rsRetVal -processSocket(thrdInfo_t *pThrd, int fd, struct sockaddr_storage *frominetPrev, int *pbIsPermitted, - ruleset_t *pRuleset) +processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *frominetPrev, int *pbIsPermitted) { DEFiRet; int iNbrTimeUsed; @@ -315,7 +317,7 @@ processSocket(thrdInfo_t *pThrd, int fd, struct sockaddr_storage *frominetPrev, if(pThrd->bShallStop == TRUE) ABORT_FINALIZE(RS_RET_FORCE_TERM); socklen = sizeof(struct sockaddr_storage); - lenRcvBuf = recvfrom(fd, (char*) pRcvBuf, iMaxLine, 0, (struct sockaddr *)&frominet, &socklen); + lenRcvBuf = recvfrom(lstn->sock, (char*) pRcvBuf, iMaxLine, 0, (struct sockaddr *)&frominet, &socklen); if(lenRcvBuf < 0) { if(errno != EINTR && errno != EAGAIN) { rs_strerror_r(errno, errStr, sizeof(errStr)); @@ -360,7 +362,7 @@ processSocket(thrdInfo_t *pThrd, int fd, struct sockaddr_storage *frominetPrev, *pbIsPermitted = 1; /* no check -> everything permitted */ } - DBGPRINTF("recv(%d,%d),acl:%d,msg:%s\n", fd, (int) lenRcvBuf, *pbIsPermitted, pRcvBuf); + DBGPRINTF("recv(%d,%d),acl:%d,msg:%s\n", lstn->sock, (int) lenRcvBuf, *pbIsPermitted, pRcvBuf); if(*pbIsPermitted != 0) { if((iTimeRequery == 0) || (iNbrTimeUsed++ % iTimeRequery) == 0) { @@ -370,13 +372,14 @@ processSocket(thrdInfo_t *pThrd, int fd, struct sockaddr_storage *frominetPrev, CHKiRet(msgConstructWithTime(&pMsg, &stTime, ttGenTime)); MsgSetRawMsg(pMsg, (char*)pRcvBuf, lenRcvBuf); MsgSetInputName(pMsg, pInputName); - MsgSetRuleset(pMsg, pRuleset); + MsgSetRuleset(pMsg, lstn->pRuleset); MsgSetFlowControlType(pMsg, eFLOWCTL_NO_DELAY); pMsg->msgFlags = NEEDS_PARSING | PARSE_HOSTNAME | NEEDS_DNSRESOL; if(*pbIsPermitted == 2) pMsg->msgFlags |= NEEDS_ACLCHK_U; /* request ACL check after resolution */ CHKiRet(msgSetFromSockinfo(pMsg, &frominet)); CHKiRet(submitMsg(pMsg)); + STATSCOUNTER_INC(lstn->ctrSubmit, lstn->mutCtrSubmit); } } @@ -443,6 +446,8 @@ rsRetVal rcvMainLoop(thrdInfo_t *pThrd) struct epoll_event *udpEPollEvt = NULL; struct epoll_event currEvt[NUM_EPOLL_EVENTS]; char errStr[1024]; + struct lstn_s *lstn; + int nLstn; /* start "name caching" algo by making sure the previous system indicator * is invalidated. @@ -451,7 +456,11 @@ rsRetVal rcvMainLoop(thrdInfo_t *pThrd) bIsPermitted = 0; memset(&frominetPrev, 0, sizeof(frominetPrev)); - CHKmalloc(udpEPollEvt = calloc(udpLstnSocks[0], sizeof(struct epoll_event))); + /* count num listeners -- do it here in order to avoid inconsistency */ + nLstn = 0; + for(lstn = lcnfRoot ; lstn != NULL ; lstn = lstn->next) + ++nLstn; + CHKmalloc(udpEPollEvt = calloc(nLstn, sizeof(struct epoll_event))); #if defined(EPOLL_CLOEXEC) && defined(HAVE_EPOLL_CREATE1) DBGPRINTF("imudp uses epoll_create1()\n"); @@ -471,16 +480,18 @@ rsRetVal rcvMainLoop(thrdInfo_t *pThrd) /* fill the epoll set - we need to do this only once, as the set * can not change dyamically. */ - for (i = 0; i < *udpLstnSocks; i++) { - if (udpLstnSocks[i+1] != -1) { + i = 0; + for(lstn = lcnfRoot ; lstn != NULL ; lstn = lstn->next) { + if(lstn->sock != -1) { udpEPollEvt[i].events = EPOLLIN | EPOLLET; - udpEPollEvt[i].data.u64 = i+1; - if(epoll_ctl(efd, EPOLL_CTL_ADD, udpLstnSocks[i+1], &(udpEPollEvt[i])) < 0) { + udpEPollEvt[i].data.u64 = (long long unsigned) lstn; + if(epoll_ctl(efd, EPOLL_CTL_ADD, lstn->sock, &(udpEPollEvt[i])) < 0) { rs_strerror_r(errno, errStr, sizeof(errStr)); errmsg.LogError(errno, NO_ERRCODE, "epoll_ctrl failed on fd %d with %s\n", - udpLstnSocks[i+1], errStr); + lstn->sock, errStr); } } + i++; } while(1) { @@ -492,8 +503,7 @@ rsRetVal rcvMainLoop(thrdInfo_t *pThrd) break; /* terminate input! */ for(i = 0 ; i < nfds ; ++i) { - processSocket(pThrd, udpLstnSocks[currEvt[i].data.u64], &frominetPrev, &bIsPermitted, - udpRulesets[currEvt[i].data.u64]); + processSocket(pThrd, (struct lstn_s*)currEvt[i].data.u64, &frominetPrev, &bIsPermitted); } } @@ -510,10 +520,10 @@ rsRetVal rcvMainLoop(thrdInfo_t *pThrd) DEFiRet; int maxfds; int nfds; - int i; fd_set readfds; struct sockaddr_storage frominetPrev; int bIsPermitted; + struct lstn_s *lstn; /* start "name caching" algo by making sure the previous system indicator * is invalidated. @@ -524,22 +534,18 @@ rsRetVal rcvMainLoop(thrdInfo_t *pThrd) DBGPRINTF("imudp uses select()\n"); while(1) { - /* Add the Unix Domain Sockets to the list of read - * descriptors. - * rgerhards 2005-08-01: we must now check if there are - * any local sockets to listen to at all. If the -o option - * is given without -a, we do not need to listen at all.. + /* Add the Unix Domain Sockets to the list of read descriptors. */ maxfds = 0; FD_ZERO(&readfds); /* Add the UDP listen sockets to the list of read descriptors. */ - for (i = 0; i < *udpLstnSocks; i++) { - if (udpLstnSocks[i+1] != -1) { + for(lstn = lcnfRoot ; lstn != NULL ; lstn = lstn->next) { + if (lstn->sock != -1) { if(Debug) - net.debugListenInfo(udpLstnSocks[i+1], "UDP"); - FD_SET(udpLstnSocks[i+1], &readfds); - if(udpLstnSocks[i+1]>maxfds) maxfds=udpLstnSocks[i+1]; + net.debugListenInfo(lstn->sock, "UDP"); + FD_SET(lstn->sock, &readfds); + if(lstn->sock>maxfds) maxfds=lstn->sock; } } if(Debug) { @@ -555,10 +561,9 @@ rsRetVal rcvMainLoop(thrdInfo_t *pThrd) if(glbl.GetGlobalInputTermState() == 1) break; /* terminate input! */ - for(i = 0; nfds && i < *udpLstnSocks; i++) { - if(FD_ISSET(udpLstnSocks[i+1], &readfds)) { - processSocket(pThrd, udpLstnSocks[i+1], &frominetPrev, &bIsPermitted, - udpRulesets[i+1]); + for(lstn = lcnfRoot ; nfds && lstn != NULL ; lstn = lstn->next) { + if(FD_ISSET(lstn->sock, &readfds)) { + processSocket(pThrd, lstn, &frominetPrev, &bIsPermitted); --nfds; /* indicate we have processed one descriptor */ } } @@ -570,7 +575,7 @@ rsRetVal rcvMainLoop(thrdInfo_t *pThrd) #endif /* #if HAVE_EPOLL_CREATE1 */ /* This function is called to gather input. - * Note that udpLstnSocks must be non-NULL because otherwise we would not have + * Note that sock must be non-NULL because otherwise we would not have * indicated that we want to run (or we have a programming error ;)). -- rgerhards, 2008-10-02 */ BEGINrunInput @@ -591,8 +596,10 @@ CODESTARTwillRun net.HasRestrictions(UCHAR_CONSTANT("UDP"), &bDoACLCheck); /* UDP */ /* if we could not set up any listners, there is no point in running... */ - if(udpLstnSocks == NULL) + if(lcnfRoot == NULL) { + DBGPRINTF("imudp: no listeners configured, will not run\n"); ABORT_FINALIZE(RS_RET_NO_RUN); + } iMaxLine = glbl.GetMaxLine(); @@ -602,15 +609,18 @@ ENDwillRun BEGINafterRun + struct lstn_s *lstn, *lstnDel; CODESTARTafterRun /* do cleanup here */ net.clearAllowedSenders((uchar*)"UDP"); - if(udpLstnSocks != NULL) { - net.closeUDPListenSockets(udpLstnSocks); - udpLstnSocks = NULL; - free(udpRulesets); - udpRulesets = NULL; + for(lstn = lcnfRoot ; lstn != NULL ; ) { + statsobj.Destruct(&(lstn->stats)); + close(lstn->sock); + lstnDel = lstn; + lstn = lstn->next; + free(lstnDel); } + lcnfRoot = lcnfLast = NULL; if(pRcvBuf != NULL) { free(pRcvBuf); pRcvBuf = NULL; @@ -625,6 +635,7 @@ CODESTARTmodExit /* release what we no longer need */ objRelease(errmsg, CORE_COMPONENT); objRelease(glbl, CORE_COMPONENT); + objRelease(statsobj, CORE_COMPONENT); objRelease(datetime, CORE_COMPONENT); objRelease(prop, CORE_COMPONENT); objRelease(ruleset, CORE_COMPONENT); @@ -662,6 +673,7 @@ CODESTARTmodInit CODEmodInit_QueryRegCFSLineHdlr CHKiRet(objUse(errmsg, CORE_COMPONENT)); CHKiRet(objUse(glbl, CORE_COMPONENT)); + CHKiRet(objUse(statsobj, CORE_COMPONENT)); CHKiRet(objUse(datetime, CORE_COMPONENT)); CHKiRet(objUse(prop, CORE_COMPONENT)); CHKiRet(objUse(ruleset, CORE_COMPONENT)); diff --git a/plugins/imuxsock/imuxsock.c b/plugins/imuxsock/imuxsock.c index 9ad24213..ea54e79b 100644 --- a/plugins/imuxsock/imuxsock.c +++ b/plugins/imuxsock/imuxsock.c @@ -34,6 +34,7 @@ #include <string.h> #include <errno.h> #include <unistd.h> +#include <fcntl.h> #include <sys/stat.h> #include <sys/un.h> #include <sys/socket.h> @@ -89,6 +90,7 @@ DEF_IMOD_STATIC_DATA DEFobjCurrIf(errmsg) DEFobjCurrIf(glbl) DEFobjCurrIf(prop) +DEFobjCurrIf(net) DEFobjCurrIf(parser) DEFobjCurrIf(datetime) DEFobjCurrIf(statsobj) @@ -141,7 +143,9 @@ typedef struct lstn_s { sbool bParseHost; /* should parser parse host name? read-only after startup */ sbool bCreatePath; /* auto-creation of socket directory? */ sbool bUseCreds; /* pull original creator credentials from socket */ + sbool bAnnotate; /* annotate events with trusted properties */ sbool bWritePid; /* write original PID into tag */ + sbool bUseSysTimeStamp; /* use timestamp from system (instead of from message) */ } lstn_t; static lstn_t listeners[MAXFUNIX]; @@ -162,9 +166,13 @@ static int bUseFlowCtl = 0; /* use flow control or not (if yes, only LIGHT is u static int bIgnoreTimestamp = 1; /* ignore timestamps present in the incoming message? */ static int bWritePid = 0; /* use credentials from recvmsg() and fixup PID in TAG */ static int bWritePidSysSock = 0; /* use credentials from recvmsg() and fixup PID in TAG */ +static int bUseSysTimeStamp = 1; /* use timestamp from system (rather than from message) */ +static int bUseSysTimeStampSysSock = 1; /* same, for system log socket */ +static int bAnnotate = 0; /* annotate trusted properties */ +static int bAnnotateSysSock = 0; /* same, for system log socket */ #define DFLT_bCreatePath 0 static int bCreatePath = DFLT_bCreatePath; /* auto-create socket path? */ -#define DFLT_ratelimitInterval 5 +#define DFLT_ratelimitInterval 0 static int ratelimitInterval = DFLT_ratelimitInterval; /* interval in seconds, 0 = off */ static int ratelimitIntervalSysSock = DFLT_ratelimitInterval; #define DFLT_ratelimitBurst 200 @@ -306,8 +314,10 @@ addLstnSocketName(void __attribute__((unused)) *pVal, uchar *pNewVal) listeners[nfd].flags = bIgnoreTimestamp ? IGNDATE : NOFLAG; listeners[nfd].bCreatePath = bCreatePath; listeners[nfd].sockName = pNewVal; - listeners[nfd].bUseCreds = (bWritePid || ratelimitInterval) ? 1 : 0; + listeners[nfd].bUseCreds = (bWritePid || ratelimitInterval || bAnnotate) ? 1 : 0; + listeners[nfd].bAnnotate = bAnnotate; listeners[nfd].bWritePid = bWritePid; + listeners[nfd].bUseSysTimeStamp = bUseSysTimeStamp; nfd++; } else { errmsg.LogError(0, NO_ERRCODE, "Out of unix socket name descriptors, ignoring %s\n", @@ -421,9 +431,14 @@ openLogSocket(lstn_t *pLstn) errmsg.LogError(errno, NO_ERRCODE, "set SCM_CREDENTIALS failed on '%s'", pLstn->sockName); pLstn->bUseCreds = 0; } +// TODO: move to its own #if + if(setsockopt(pLstn->fd, SOL_SOCKET, SO_TIMESTAMP, &one, sizeof(one)) != 0) { + errmsg.LogError(errno, NO_ERRCODE, "set SO_TIMESTAMP failed on '%s'", pLstn->sockName); + } } # else /* HAVE_SCM_CREDENTIALS */ pLstn->bUseCreds = 0; + pLstn->bAnnotate = 0; # endif /* HAVE_SCM_CREDENTIALS */ finalize_it: @@ -506,12 +521,109 @@ fixPID(uchar *bufTAG, int *lenTag, struct ucred *cred) } +/* Get an "trusted property" from the system. Returns an empty string if the + * property can not be obtained. Inspired by similiar functionality inside + * journald. Currently works with Linux /proc filesystem, only. + */ +static rsRetVal +getTrustedProp(struct ucred *cred, char *propName, uchar *buf, size_t lenBuf, int *lenProp) +{ + int fd; + int i; + int lenRead; + char namebuf[1024]; + DEFiRet; + + if(snprintf(namebuf, sizeof(namebuf), "/proc/%lu/%s", (long unsigned) cred->pid, + propName) >= (int) sizeof(namebuf)) { + ABORT_FINALIZE(RS_RET_ERR); + } + + if((fd = open(namebuf, O_RDONLY)) == -1) { + DBGPRINTF("error reading '%s'\n", namebuf); + *lenProp = 0; + FINALIZE; + } + if((lenRead = read(fd, buf, lenBuf - 1)) == -1) { + DBGPRINTF("error reading file data for '%s'\n", namebuf); + *lenProp = 0; + close(fd); + FINALIZE; + } + + /* we strip after the first \n */ + for(i = 0 ; i < lenRead ; ++i) { + if(buf[i] == '\n') + break; + else if(iscntrl(buf[i])) + buf[i] = ' '; + } + buf[i] = '\0'; + *lenProp = i; + + close(fd); + +finalize_it: + RETiRet; +} + + +/* read the exe trusted property path (so far, /proc fs only) + */ +static rsRetVal +getTrustedExe(struct ucred *cred, uchar *buf, size_t lenBuf, int* lenProp) +{ + int lenRead; + char namebuf[1024]; + DEFiRet; + + if(snprintf(namebuf, sizeof(namebuf), "/proc/%lu/exe", (long unsigned) cred->pid) + >= (int) sizeof(namebuf)) { + ABORT_FINALIZE(RS_RET_ERR); + } + + if((lenRead = readlink(namebuf, (char*)buf, lenBuf - 1)) == -1) { + DBGPRINTF("error reading link '%s'\n", namebuf); + *lenProp = 0; + FINALIZE; + } + + buf[lenRead] = '\0'; + *lenProp = lenRead; + +finalize_it: + RETiRet; +} + + +/* copy a trusted property in escaped mode. That is, the property can contain + * any character and so it must be properly quoted AND escaped. + * It is assumed the output buffer is large enough. Returns the number of + * characters added. + */ +static inline int +copyescaped(uchar *dstbuf, uchar *inbuf, int inlen) +{ + int iDst, iSrc; + + *dstbuf = '"'; + for(iDst=1, iSrc=0 ; iSrc < inlen ; ++iDst, ++iSrc) { + if(inbuf[iSrc] == '"' || inbuf[iSrc] == '\\') { + dstbuf[iDst++] = '\\'; + } + dstbuf[iDst] = inbuf[iSrc]; + } + dstbuf[iDst++] = '"'; + return iDst; +} + + /* submit received message to the queue engine * We now parse the message according to expected format so that we * can also mangle it if necessary. */ static inline rsRetVal -SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred) +SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred, struct timeval *ts) { msg_t *pMsg; int lenMsg; @@ -525,6 +637,12 @@ SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred) struct syslogTime st; time_t tt; rs_ratelimit_state_t *ratelimiter = NULL; + int lenProp; + uchar propBuf[1024]; + uchar msgbuf[8192]; + uchar *pmsgbuf; + int toffs; /* offset for trusted properties */ + struct syslogTime dummyTS; DEFiRet; /* TODO: handle format errors?? */ @@ -550,12 +668,58 @@ SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred) findRatelimiter(pLstn, cred, &ratelimiter); /* ignore error, better so than others... */ } - datetime.getCurrTime(&st, &tt); + if(ts == NULL) { + datetime.getCurrTime(&st, &tt); + } else { + datetime.timeval2syslogTime(ts, &st); + tt = ts->tv_sec; + } + if(ratelimiter != NULL && !withinRatelimit(ratelimiter, tt, cred->pid)) { STATSCOUNTER_INC(ctrLostRatelimit, mutCtrLostRatelimit); FINALIZE; } + /* created trusted properties */ + if(cred != NULL && pLstn->bAnnotate) { + if((unsigned) (lenRcv + 4096) < sizeof(msgbuf)) { + pmsgbuf = msgbuf; + } else { + CHKmalloc(pmsgbuf = malloc(lenRcv+4096)); + } + memcpy(pmsgbuf, pRcv, lenRcv); + memcpy(pmsgbuf+lenRcv, " @[", 3); + toffs = lenRcv + 3; /* next free location */ + lenProp = snprintf((char*)propBuf, sizeof(propBuf), "_PID=%lu _UID=%lu _GID=%lu", + (long unsigned) cred->pid, (long unsigned) cred->uid, + (long unsigned) cred->gid); + memcpy(pmsgbuf+toffs, propBuf, lenProp); + toffs = toffs + lenProp; + getTrustedProp(cred, "comm", propBuf, sizeof(propBuf), &lenProp); + if(lenProp) { + memcpy(pmsgbuf+toffs, " _COMM=", 7); + memcpy(pmsgbuf+toffs+7, propBuf, lenProp); + toffs = toffs + 7 + lenProp; + } + getTrustedExe(cred, propBuf, sizeof(propBuf), &lenProp); + if(lenProp) { + memcpy(pmsgbuf+toffs, " _EXE=", 6); + memcpy(pmsgbuf+toffs+6, propBuf, lenProp); + toffs = toffs + 6 + lenProp; + } + getTrustedProp(cred, "cmdline", propBuf, sizeof(propBuf), &lenProp); + if(lenProp) { + memcpy(pmsgbuf+toffs, " _CMDLINE=", 10); + toffs = toffs + 10 + + copyescaped(pmsgbuf+toffs+10, propBuf, lenProp); + } + /* finalize string */ + pmsgbuf[toffs] = ']'; + pmsgbuf[toffs+1] = '\0'; + pRcv = pmsgbuf; + lenRcv = toffs + 1; + } + /* we now create our own message object and submit it to the queue */ CHKiRet(msgConstructWithTime(&pMsg, &st, tt)); MsgSetRawMsg(pMsg, (char*)pRcv, lenRcv); @@ -570,15 +734,27 @@ SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred) parse++; lenMsg--; /* '>' */ - if((pLstn->flags & IGNDATE)) { - /* in this case, we still need to find out if we have a valid - * datestamp or not .. and advance the parse pointer accordingly. - */ - struct syslogTime dummy; - datetime.ParseTIMESTAMP3164(&dummy, &parse, &lenMsg); - } else { - if(datetime.ParseTIMESTAMP3164(&(pMsg->tTIMESTAMP), &parse, &lenMsg) != RS_RET_OK) { - DBGPRINTF("we have a problem, invalid timestamp in msg!\n"); + if(ts == NULL) { + if((pLstn->flags & IGNDATE)) { + /* in this case, we still need to find out if we have a valid + * datestamp or not .. and advance the parse pointer accordingly. + */ + datetime.ParseTIMESTAMP3164(&dummyTS, &parse, &lenMsg); + } else { + if(datetime.ParseTIMESTAMP3164(&(pMsg->tTIMESTAMP), &parse, &lenMsg) != RS_RET_OK) { + DBGPRINTF("we have a problem, invalid timestamp in msg!\n"); + } + } + } else { /* if we pulled the time from the system, we need to update the message text */ + uchar *tmpParse = parse; /* just to check correctness of TS */ + if(datetime.ParseTIMESTAMP3164(&dummyTS, &tmpParse, &lenMsg) == RS_RET_OK) { + /* We modify the message only if it contained a valid timestamp, + * otherwise we do not touch it at all. */ + datetime.formatTimestamp3164(&st, (char*)parse, 0); + parse[15] = ' '; /* re-write \0 from fromatTimestamp3164 by SP */ + /* update "counters" to reflect processed timestamp */ + parse += 16; + lenMsg -= 16; } } @@ -630,6 +806,7 @@ static rsRetVal readSocket(lstn_t *pLstn) struct cmsghdr *cm; # endif struct ucred *cred; + struct timeval *ts; uchar bufRcv[4096+1]; char aux[128]; uchar *pRcv = NULL; /* receive buffer */ @@ -668,21 +845,28 @@ static rsRetVal readSocket(lstn_t *pLstn) dbgprintf("Message from UNIX socket: #%d\n", pLstn->fd); if(iRcvd > 0) { cred = NULL; -# if HAVE_SCM_CREDENTIALS - if(pLstn->bUseCreds) { - dbgprintf("XXX: pre CM loop, length of control message %d\n", (int) msgh.msg_controllen); - for (cm = CMSG_FIRSTHDR(&msgh); cm; cm = CMSG_NXTHDR(&msgh, cm)) { - dbgprintf("XXX: in CM loop, %d, %d\n", cm->cmsg_level, cm->cmsg_type); - if (cm->cmsg_level == SOL_SOCKET && cm->cmsg_type == SCM_CREDENTIALS) { + ts = NULL; + if(pLstn->bUseCreds || pLstn->bUseSysTimeStamp) { + for(cm = CMSG_FIRSTHDR(&msgh); cm; cm = CMSG_NXTHDR(&msgh, cm)) { +# if HAVE_SCM_CREDENTIALS + if( pLstn->bUseCreds + && cm->cmsg_level == SOL_SOCKET && cm->cmsg_type == SCM_CREDENTIALS) { cred = (struct ucred*) CMSG_DATA(cm); - dbgprintf("XXX: got credentials pid %d\n", (int) cred->pid); break; } +# endif /* HAVE_SCM_CREDENTIALS */ +# if HAVE_SO_TIMESTAMP + if( pLstn->bUseSysTimeStamp + && cm->cmsg_level == SOL_SOCKET && cm->cmsg_type == SO_TIMESTAMP) { + ts = (struct timeval *)CMSG_DATA(cm); + dbgprintf("XXX: got timestamp %ld.%ld\n", + (long) ts->tv_sec, (long) ts->tv_usec); + break; + } +# endif /* HAVE_SO_TIMESTAMP */ } - dbgprintf("XXX: post CM loop\n"); } -# endif /* HAVE_SCM_CREDENTIALS */ - CHKiRet(SubmitMsg(pRcv, iRcvd, pLstn, cred)); + CHKiRet(SubmitMsg(pRcv, iRcvd, pLstn, cred, ts)); } else if(iRcvd < 0 && errno != EINTR) { char errStr[1024]; rs_strerror_r(errno, errStr, sizeof(errStr)); @@ -784,7 +968,7 @@ CODESTARTwillRun else if(sd_booted()) { struct stat st; if(stat(SYSTEMD_JOURNAL, &st) != -1 && S_ISDIR(st.st_mode)) { - listeners[0].sockName = (uchar*) SYSTEMD_PATH_LOG; + listeners[0].sockName = (uchar*)SYSTEMD_PATH_LOG; } } if(ratelimitIntervalSysSock > 0) { @@ -798,8 +982,10 @@ CODESTARTwillRun listeners[0].ratelimitInterval = ratelimitIntervalSysSock; listeners[0].ratelimitBurst = ratelimitBurstSysSock; listeners[0].ratelimitSev = ratelimitSeveritySysSock; - listeners[0].bUseCreds = (bWritePidSysSock || ratelimitIntervalSysSock) ? 1 : 0; + listeners[0].bUseCreds = (bWritePidSysSock || ratelimitIntervalSysSock || bAnnotateSysSock) ? 1 : 0; listeners[0].bWritePid = bWritePidSysSock; + listeners[0].bAnnotate = bAnnotateSysSock; + listeners[0].bUseSysTimeStamp = bUseSysTimeStampSysSock; sd_fds = sd_listen_fds(0); if (sd_fds < 0) { @@ -826,6 +1012,8 @@ CODESTARTwillRun CHKiRet(prop.SetString(pInputName, UCHAR_CONSTANT("imuxsock"), sizeof("imuxsock") - 1)); CHKiRet(prop.ConstructFinalize(pInputName)); + pLocalHostIP = glbl.GetLocalHostIP(); + finalize_it: ENDwillRun @@ -842,7 +1030,6 @@ CODESTARTafterRun /* Clean-up files. */ for(i = startIndexUxLocalSockets; i < nfd; i++) if (listeners[i].sockName && listeners[i].fd != -1) { - /* If systemd passed us a socket it is systemd's job to clean it up. * Do not unlink it -- we will get same socket (node) from systemd * e.g. on restart again. @@ -911,6 +1098,8 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a bUseFlowCtl = 0; bWritePid = 0; bWritePidSysSock = 0; + bUseSysTimeStamp = 1; + bUseSysTimeStampSysSock = 1; bCreatePath = DFLT_bCreatePath; ratelimitInterval = DFLT_ratelimitInterval; ratelimitIntervalSysSock = DFLT_ratelimitInterval; @@ -930,6 +1119,7 @@ CODESTARTmodInit CODEmodInit_QueryRegCFSLineHdlr CHKiRet(objUse(errmsg, CORE_COMPONENT)); CHKiRet(objUse(glbl, CORE_COMPONENT)); + CHKiRet(objUse(net, CORE_COMPONENT)); CHKiRet(objUse(prop, CORE_COMPONENT)); CHKiRet(objUse(statsobj, CORE_COMPONENT)); CHKiRet(objUse(datetime, CORE_COMPONENT)); @@ -945,7 +1135,9 @@ CODEmodInit_QueryRegCFSLineHdlr listeners[0].fd = -1; listeners[0].bParseHost = 0; listeners[0].bUseCreds = 0; + listeners[0].bAnnotate = 0; listeners[0].bCreatePath = 0; + listeners[0].bUseSysTimeStamp = 1; /* initialize socket names */ for(i = 1 ; i < MAXFUNIX ; ++i) { @@ -953,6 +1145,7 @@ CODEmodInit_QueryRegCFSLineHdlr listeners[i].fd = -1; } + /* now init listen socket zero, the local log socket */ CHKiRet(prop.Construct(&pLocalHostIP)); CHKiRet(prop.SetString(pLocalHostIP, UCHAR_CONSTANT("127.0.0.1"), sizeof("127.0.0.1") - 1)); CHKiRet(prop.ConstructFinalize(pLocalHostIP)); @@ -968,10 +1161,14 @@ CODEmodInit_QueryRegCFSLineHdlr NULL, &pLogHostName, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputunixlistensocketflowcontrol", 0, eCmdHdlrBinary, NULL, &bUseFlowCtl, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputunixlistensocketannotate", 0, eCmdHdlrBinary, + NULL, &bAnnotate, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputunixlistensocketcreatepath", 0, eCmdHdlrBinary, NULL, &bCreatePath, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputunixlistensocketusepidfromsystem", 0, eCmdHdlrBinary, NULL, &bWritePid, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputunixlistensocketusesystimestamp", 0, eCmdHdlrBinary, + NULL, &bUseSysTimeStamp, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"addunixlistensocket", 0, eCmdHdlrGetWord, addLstnSocketName, NULL, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"imuxsockratelimitinterval", 0, eCmdHdlrInt, @@ -992,6 +1189,10 @@ CODEmodInit_QueryRegCFSLineHdlr setSystemLogTimestampIgnore, NULL, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"systemlogsocketflowcontrol", 0, eCmdHdlrBinary, setSystemLogFlowControl, NULL, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"systemlogusesystimestamp", 0, eCmdHdlrBinary, + NULL, &bUseSysTimeStampSysSock, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"systemlogsocketannotate", 0, eCmdHdlrBinary, + NULL, &bAnnotateSysSock, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"systemlogusepidfromsystem", 0, eCmdHdlrBinary, NULL, &bWritePidSysSock, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"systemlogratelimitinterval", 0, eCmdHdlrInt, diff --git a/plugins/mmsnmptrapd/mmsnmptrapd.c b/plugins/mmsnmptrapd/mmsnmptrapd.c index 767829d6..b78046ee 100644 --- a/plugins/mmsnmptrapd/mmsnmptrapd.c +++ b/plugins/mmsnmptrapd/mmsnmptrapd.c @@ -418,7 +418,7 @@ CODEmodInit_QueryRegCFSLineHdlr cs.pszTagName = NULL; cs.pszSeverityMapping = NULL; - CHKiRet(omsdRegCFSLineHdlr((uchar *)"mmsnmptrapdtag", 0, eCmdHdlrInt, + CHKiRet(omsdRegCFSLineHdlr((uchar *)"mmsnmptrapdtag", 0, eCmdHdlrGetWord, NULL, &cs.pszTagName, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"mmsnmptrapdseveritymapping", 0, eCmdHdlrGetWord, NULL, &cs.pszSeverityMapping, STD_LOADABLE_MODULE_ID)); diff --git a/plugins/omelasticsearch/Makefile.am b/plugins/omelasticsearch/Makefile.am new file mode 100644 index 00000000..a574c72f --- /dev/null +++ b/plugins/omelasticsearch/Makefile.am @@ -0,0 +1,8 @@ +pkglib_LTLIBRARIES = omelasticsearch.la + +omelasticsearch_la_SOURCES = omelasticsearch.c +omelasticsearch_la_CPPFLAGS = $(RSRT_CFLAGS) $(PTHREADS_CFLAGS) +omelasticsearch_la_LDFLAGS = -module -avoid-version +omelasticsearch_la_LIBADD = $(CURL_LIBS) + +EXTRA_DIST = diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c new file mode 100644 index 00000000..3bec1838 --- /dev/null +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -0,0 +1,270 @@ +/* omelasticsearch.c + * This is the http://www.elasticsearch.org/ output module. + * + * NOTE: read comments in module-template.h for more specifics! + * + * Copyright 2011 Nathan Scott. + * Copyright 2009 Rainer Gerhards and Adiscon GmbH. + * + * This file is part of rsyslog. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * -or- + * see COPYING.ASL20 in the source distribution + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "config.h" +#include "rsyslog.h" +#include <stdio.h> +#include <stdarg.h> +#include <stdlib.h> +#include <string.h> +#include <curl/curl.h> +#include <curl/types.h> +#include <curl/easy.h> +#include <assert.h> +#include <signal.h> +#include <errno.h> +#include <time.h> +#include "conf.h" +#include "syslogd-types.h" +#include "srUtils.h" +#include "template.h" +#include "module-template.h" +#include "errmsg.h" +#include "statsobj.h" +#include "cfsysline.h" + +MODULE_TYPE_OUTPUT +MODULE_TYPE_NOKEEP + +/* internal structures */ +DEF_OMOD_STATIC_DATA +DEFobjCurrIf(errmsg) +DEFobjCurrIf(statsobj) + +statsobj_t *indexStats; +STATSCOUNTER_DEF(indexConFail, mutIndexConFail) +STATSCOUNTER_DEF(indexSubmit, mutIndexSubmit) +STATSCOUNTER_DEF(indexFailed, mutIndexFailed) +STATSCOUNTER_DEF(indexSuccess, mutIndexSuccess) + +/* REST API for elasticsearch hits this URL: + * http://<hostName>:<restPort>/<searchIndex>/<searchType> + */ +typedef struct curl_slist HEADER; +typedef struct _instanceData { + CURL *curlHandle; /* libcurl session handle */ + HEADER *postHeader; /* json POST request info */ +} instanceData; + +/* config variables */ +static int restPort = 9200; +static char *hostName = "localhost"; +static char *searchIndex = "system"; +static char *searchType = "events"; + +BEGINcreateInstance +CODESTARTcreateInstance +ENDcreateInstance + +BEGINisCompatibleWithFeature +CODESTARTisCompatibleWithFeature + if(eFeat == sFEATURERepeatedMsgReduction) + iRet = RS_RET_OK; +ENDisCompatibleWithFeature + +BEGINfreeInstance +CODESTARTfreeInstance + if (pData->postHeader) { + curl_slist_free_all(pData->postHeader); + pData->postHeader = NULL; + } + if (pData->curlHandle) { + curl_easy_cleanup(pData->curlHandle); + pData->curlHandle = NULL; + } +ENDfreeInstance + +BEGINdbgPrintInstInfo +CODESTARTdbgPrintInstInfo +ENDdbgPrintInstInfo + +BEGINtryResume +CODESTARTtryResume +ENDtryResume + +rsRetVal +curlPost(instanceData *instance, uchar *message) +{ + CURLcode code; + CURL *curl = instance->curlHandle; + int length = strlen((char *)message); + + curl_easy_setopt(curl, CURLOPT_WRITEDATA, (char *)message); + curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (char *)message); + curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, length); + code = curl_easy_perform(curl); + switch (code) { + case CURLE_COULDNT_RESOLVE_HOST: + case CURLE_COULDNT_RESOLVE_PROXY: + case CURLE_COULDNT_CONNECT: + case CURLE_WRITE_ERROR: + STATSCOUNTER_INC(indexConFail, mutIndexConFail); + return RS_RET_SUSPENDED; + default: + STATSCOUNTER_INC(indexSubmit, mutIndexSubmit); + return RS_RET_OK; + } +} + +BEGINdoAction +CODESTARTdoAction + CHKiRet(curlPost(pData, ppString[0])); +finalize_it: +ENDdoAction + +/* elasticsearch POST result string ... useful for debugging */ +size_t +curlResult(void *ptr, size_t size, size_t nmemb, void *userdata) +{ + unsigned int i; + char *p = (char *)ptr; + char *jsonData = (char *)userdata; + static char ok[] = "{\"ok\":true,"; + + ASSERT(size == 1); + + if (size == 1 && + nmemb > sizeof(ok)-1 && + strncmp(p, ok, sizeof(ok)-1) == 0) { + STATSCOUNTER_INC(indexSuccess, mutIndexSuccess); + } else { + STATSCOUNTER_INC(indexFailed, mutIndexFailed); + if (Debug) { + DBGPRINTF("omelasticsearch request: %s\n", jsonData); + DBGPRINTF("omelasticsearch result: "); + for (i = 0; i < nmemb; i++) + DBGPRINTF("%c", p[i]); + DBGPRINTF("\n"); + } + } + return size * nmemb; +} + +static rsRetVal +curlSetup(instanceData *instance) +{ + char restURL[2048]; /* libcurl makes a copy, using the stack here is OK */ + HEADER *header; + CURL *handle; + + handle = curl_easy_init(); + if (handle == NULL) { + return RS_RET_OBJ_CREATION_FAILED; + } + + snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s", + hostName, restPort, searchIndex, searchType); + header = curl_slist_append(NULL, "Content-Type: text/json; charset=utf-8"); + + curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, curlResult); + curl_easy_setopt(handle, CURLOPT_HTTPHEADER, header); + curl_easy_setopt(handle, CURLOPT_URL, restURL); + curl_easy_setopt(handle, CURLOPT_POST, 1); + + instance->curlHandle = handle; + instance->postHeader = header; + + DBGPRINTF("omelasticsearch setup, using REST URL: %s\n", restURL); + return RS_RET_OK; +} + +BEGINparseSelectorAct +CODESTARTparseSelectorAct +CODE_STD_STRING_REQUESTparseSelectorAct(1) + if(strncmp((char*) p, ":omelasticsearch:", sizeof(":omelasticsearch:") - 1)) { + ABORT_FINALIZE(RS_RET_CONFLINE_UNPROCESSED); + } + p += sizeof(":omelasticsearch:") - 1; /* eat indicator sequence (-1 because of '\0'!) */ + CHKiRet(createInstance(&pData)); + + /* check if a non-standard template is to be applied */ + if(*(p-1) == ';') + --p; + CHKiRet(cflineParseTemplateName(&p, *ppOMSR, 0, OMSR_NO_RQD_TPL_OPTS, (uchar*) " StdJSONFmt")); + + /* all good, we can now initialise our private data */ + CHKiRet(curlSetup(pData)); +CODE_STD_FINALIZERparseSelectorAct +ENDparseSelectorAct + +BEGINmodExit +CODESTARTmodExit + curl_global_cleanup(); + statsobj.Destruct(&indexStats); + objRelease(errmsg, CORE_COMPONENT); + objRelease(statsobj, CORE_COMPONENT); +ENDmodExit + +BEGINqueryEtryPt +CODESTARTqueryEtryPt +CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES +ENDqueryEtryPt + +static rsRetVal +resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal) +{ + DEFiRet; + restPort = 9200; + hostName = "localhost"; + searchIndex = "system"; + searchType = "events"; + RETiRet; +} + +BEGINmodInit() +CODESTARTmodInit + *ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */ +CODEmodInit_QueryRegCFSLineHdlr + CHKiRet(objUse(errmsg, CORE_COMPONENT)); + CHKiRet(objUse(statsobj, CORE_COMPONENT)); + + /* register config file handlers */ + CHKiRet(omsdRegCFSLineHdlr((uchar *)"elasticsearchindex", 0, eCmdHdlrGetWord, NULL, &searchIndex, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"elasticsearchtype", 0, eCmdHdlrGetWord, NULL, &searchType, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"elasticsearchhost", 0, eCmdHdlrGetWord, NULL, &hostName, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"elasticsearchport", 0, eCmdHdlrInt, NULL, &restPort, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID)); + + if (curl_global_init(CURL_GLOBAL_ALL) != 0) { + errmsg.LogError(0, RS_RET_OBJ_CREATION_FAILED, "CURL fail. -elasticsearch indexing disabled"); + ABORT_FINALIZE(RS_RET_OBJ_CREATION_FAILED); + } + + /* support statistics gathering */ + CHKiRet(statsobj.Construct(&indexStats)); + CHKiRet(statsobj.SetName(indexStats, (uchar *)"elasticsearch")); + CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"connfail", + ctrType_IntCtr, &indexConFail)); + CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"submits", + ctrType_IntCtr, &indexSubmit)); + CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"failed", + ctrType_IntCtr, &indexFailed)); + CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"success", + ctrType_IntCtr, &indexSuccess)); + CHKiRet(statsobj.ConstructFinalize(indexStats)); +ENDmodInit + +/* vi:set ai: + */ diff --git a/plugins/omhdfs/javaenv.sh b/plugins/omhdfs/javaenv.sh new file mode 100644 index 00000000..d07a8473 --- /dev/null +++ b/plugins/omhdfs/javaenv.sh @@ -0,0 +1,14 @@ +# This is a sample file for environment settings on Fedora 13 +# that made me compile & run omhdfs. I really *hate* the way +# java uses environment variables... Hopefully this file will +# help building and testing omhdfs in the future (there is also +# some more information in the rsyslog wiki). +# rgerhards, 2011-03-11 +# this now works, but don't ask my why ;) +#export JAVA_HOME=/usr/java/jdk1.6.0_21/bin/java +export PATH=/usr/java/jdk1.6.0_21/bin:$PATH +export JAVA_INCLUDES="-I/usr/java/jdk1.6.0_21/include -I/usr/java/jdk1.6.0_21/include/linux" +export JAVA_LIBS="-L/usr/java/jdk1.6.0_21/jre/lib/i386 -ljava -ljvm -lverify" +export HADOOP_HOME=/usr/lib/hadoop +export CLASSPATH=/usr/lib/jvm/java-6-sun/lib:/usr/lib/hadoop/lib:/usr/lib/hadoop/hadoop-ant-0.20.2+320.jar:/usr/lib/hadoop/hadoop-core-0.20.2+320.jar:/usr/lib/hadoop/hadoop-examples-0.20.2+320.jar:/usr/lib/hadoop/hadoop-test-0.20.2+320.jar:/usr/lib/hadoop/hadoop-tools-0.20.2+320.jar/usr/lib/hadoop/lib/commons-cli-1.2.jar:/usr/lib/hadoop/lib/commons-codec-1.3.jar:/usr/lib/hadoop/lib/commons-el-1.0.jar:/usr/lib/hadoop/lib/commons-httpclient-3.0.1.jar:/usr/lib/hadoop/lib/commons-logging-1.0.4.jar:/usr/lib/hadoop/lib/commons-logging-api-1.0.4.jar:/usr/lib/hadoop/lib/commons-net-1.4.1.jar:/usr/lib/hadoop/lib/core-3.1.1.jar:/usr/lib/hadoop/lib/hadoop-fairscheduler-0.20.2+320.jar:/usr/lib/hadoop/lib/hadoop-scribe-log4j-0.20.2+320.jar:/usr/lib/hadoop/lib/hsqldb-1.8.0.10.jar:/usr/lib/hadoop/lib/hsqldb.jar:/usr/lib/hadoop/lib/jackson-core-asl-1.0.1.jar:/usr/lib/hadoop/lib/jackson-mapper-asl-1.0.1.jar:/usr/lib/hadoop/lib/jasper-compiler-5.5.12.jar:/usr/lib/hadoop/lib/jasper-runtime-5.5.12.jar:/usr/lib/hadoop/lib/jets3t-0.6.1.jar:/usr/lib/hadoop/lib/jetty-6.1.14.jar:/usr/lib/hadoop/lib/jetty-util-6.1.14.jar:/usr/lib/hadoop/lib/junit-4.5.jar:/usr/lib/hadoop/lib/kfs-0.2.2.jar:/usr/lib/hadoop/lib/libfb303.jar:/usr/lib/hadoop/lib/libthrift.jar:/usr/lib/hadoop/lib/log4j-1.2.15.jar:/usr/lib/hadoop/lib/mockito-all-1.8.2.jar:/usr/lib/hadoop/lib/mysql-connector-java-5.0.8-bin.jar:/usr/lib/hadoop/lib/oro-2.0.8.jar:/usr/lib/hadoop/lib/servlet-api-2.5-6.1.14.jar:/usr/lib/hadoop/lib/slf4j-api-1.4.3.jar:/usr/lib/hadoop/lib/slf4j-log4j12-1.4.3.jar:/usr/lib/hadoop/lib/xmlenc-0.52.jar:/etc/hadoop/conf +###export CLASSPATH="/usr/lib/hadoop/hadoop-0.20.2+320-ant.jar: /usr/lib/hadoop/hadoop-0.20.2+320-core.jar: /usr/lib/hadoop/hadoop-0.20.2+320-examples.jar: /usr/lib/hadoop/hadoop-0.20.2+320-test.jar: /usr/lib/hadoop/hadoop-0.20.2+320-tools.jar: /usr/lib/hadoop/hadoop-ant-0.20.2+320.jar: /usr/lib/hadoop/hadoop-core-0.20.2+320.jar: /usr/lib/hadoop/hadoop-examples-0.20.2+320.jar: /usr/lib/hadoop/hadoop-test-0.20.2+320.jar: /usr/lib/hadoop/hadoop-tools-0.20.2+320.jar:/usr/lib/hadoop/lib: /usr/lib/hadoop/lib/commons-cli-1.2.jar: /usr/lib/hadoop/lib/commons-codec-1.3.jar: /usr/lib/hadoop/lib/commons-el-1.0.jar: /usr/lib/hadoop/lib/commons-httpclient-3.0.1.jar: /usr/lib/hadoop/lib/commons-logging-1.0.4.jar: /usr/lib/hadoop/lib/commons-logging-api-1.0.4.jar: /usr/lib/hadoop/lib/commons-net-1.4.1.jar: /usr/lib/hadoop/lib/core-3.1.1.jar: /usr/lib/hadoop/lib/hadoop-fairscheduler-0.20.2+320.jar: /usr/lib/hadoop/lib/hadoop-scribe-log4j-0.20.2+320.jar: /usr/lib/hadoop/lib/hsqldb-1.8.0.10.jar: /usr/lib/hadoop/lib/hsqldb.jar: /usr/lib/hadoop/lib/jackson-core-asl-1.0.1.jar: /usr/lib/hadoop/lib/jackson-mapper-asl-1.0.1.jar: /usr/lib/hadoop/lib/jasper-compiler-5.5.12.jar: /usr/lib/hadoop/lib/jasper-runtime-5.5.12.jar: /usr/lib/hadoop/lib/jets3t-0.6.1.jar: /usr/lib/hadoop/lib/jetty-6.1.14.jar: /usr/lib/hadoop/lib/jetty-util-6.1.14.jar: /usr/lib/hadoop/lib/junit-4.5.jar: /usr/lib/hadoop/lib/kfs-0.2.2.jar: /usr/lib/hadoop/lib/libfb303.jar: /usr/lib/hadoop/lib/libthrift.jar: /usr/lib/hadoop/lib/log4j-1.2.15.jar: /usr/lib/hadoop/lib/mockito-all-1.8.2.jar: /usr/lib/hadoop/lib/mysql-connector-java-5.0.8-bin.jar: /usr/lib/hadoop/lib/oro-2.0.8.jar: /usr/lib/hadoop/lib/servlet-api-2.5-6.1.14.jar: /usr/lib/hadoop/lib/slf4j-api-1.4.3.jar: /usr/lib/hadoop/lib/slf4j-log4j12-1.4.3.jar: /usr/lib/hadoop/lib/xmlenc-0.52.jar:/etc/hadoop/conf:/usr/lib/hadoop/lib" diff --git a/plugins/omhdfs/omhdfs.c b/plugins/omhdfs/omhdfs.c index 8b72747f..76128a4e 100644 --- a/plugins/omhdfs/omhdfs.c +++ b/plugins/omhdfs/omhdfs.c @@ -80,6 +80,8 @@ typedef struct { typedef struct _instanceData { file_t *pFile; + uchar ioBuf[64*1024]; + unsigned offsBuf; } instanceData; /* forward definitions (down here, need data types) */ @@ -260,7 +262,8 @@ fileOpen(file_t *pFile) if(errno == ENOENT) { DBGPRINTF("omhdfs: ENOENT trying to append to '%s', now trying create\n", pFile->name); - pFile->fh = hdfsOpenFile(pFile->fs, (char*)pFile->name, O_WRONLY|O_CREAT, 0, 0, 0); + pFile->fh = hdfsOpenFile(pFile->fs, + (char*)pFile->name, O_WRONLY|O_CREAT, 0, 0, 0); } } if(pFile->fh == NULL) { @@ -275,12 +278,15 @@ finalize_it: } +/* Note: lenWrite is reset to zero on successful write! */ static inline rsRetVal -fileWrite(file_t *pFile, uchar *buf) +fileWrite(file_t *pFile, uchar *buf, size_t *lenWrite) { - size_t lenWrite; DEFiRet; + if(*lenWrite == 0) + FINALIZE; + if(pFile->nUsers > 1) d_pthread_mutex_lock(&pFile->mut); @@ -294,18 +300,18 @@ fileWrite(file_t *pFile, uchar *buf) } } - lenWrite = strlen((char*) buf); - tSize num_written_bytes = hdfsWrite(pFile->fs, pFile->fh, buf, lenWrite); - if((unsigned) num_written_bytes != lenWrite) { - errmsg.LogError(errno, RS_RET_ERR_HDFS_WRITE, "omhdfs: failed to write %s, expected %lu bytes, " - "written %lu\n", pFile->name, (unsigned long) lenWrite, +dbgprintf("XXXXX: omhdfs writing %u bytes\n", *lenWrite); + tSize num_written_bytes = hdfsWrite(pFile->fs, pFile->fh, buf, *lenWrite); + if((unsigned) num_written_bytes != *lenWrite) { + errmsg.LogError(errno, RS_RET_ERR_HDFS_WRITE, + "omhdfs: failed to write %s, expected %lu bytes, " + "written %lu\n", pFile->name, (unsigned long) *lenWrite, (unsigned long) num_written_bytes); ABORT_FINALIZE(RS_RET_SUSPENDED); } + *lenWrite = 0; finalize_it: - if(pFile->nUsers > 1) - d_pthread_mutex_unlock(&pFile->mut); RETiRet; } @@ -333,6 +339,40 @@ finalize_it: /* ---END FILE OBJECT---------------------------------------------------- */ +/* This adds data to the output buffer and performs an actual write + * if the new data does not fit into the buffer. Note that we never write + * partial data records. Other actions may write into the same file, and if + * we would write partial records, data could become severely mixed up. + * Note that we must check of some new data arrived is large than our + * buffer. In that case, the new data will written with its own + * write operation. + */ +static inline rsRetVal +addData(instanceData *pData, uchar *buf) +{ + unsigned len; + DEFiRet; + + len = strlen((char*)buf); + if(pData->offsBuf + len < sizeof(pData->ioBuf)) { + /* new data fits into remaining buffer */ + memcpy((char*) pData->ioBuf + pData->offsBuf, buf, len); + pData->offsBuf += len; + } else { +dbgprintf("XXXXX: not enough room, need to flush\n"); + CHKiRet(fileWrite(pData->pFile, pData->ioBuf, &pData->offsBuf)); + if(len >= sizeof(pData->ioBuf)) { + CHKiRet(fileWrite(pData->pFile, buf, &len)); + } else { + memcpy((char*) pData->ioBuf + pData->offsBuf, buf, len); + pData->offsBuf += len; + } + } + + iRet = RS_RET_DEFER_COMMIT; +finalize_it: + RETiRet; +} BEGINcreateInstance CODESTARTcreateInstance @@ -358,13 +398,31 @@ CODESTARTtryResume } ENDtryResume + +BEGINbeginTransaction +CODESTARTbeginTransaction +dbgprintf("omhdfs: beginTransaction\n"); +ENDbeginTransaction + + BEGINdoAction CODESTARTdoAction - DBGPRINTF("omuxsock: action to to write to %s\n", pData->pFile->name); - iRet = fileWrite(pData->pFile, ppString[0]); + DBGPRINTF("omhdfs: action to to write to %s\n", pData->pFile->name); + iRet = addData(pData, ppString[0]); +dbgprintf("omhdfs: done doAction\n"); ENDdoAction +BEGINendTransaction +CODESTARTendTransaction +dbgprintf("omhdfs: endTransaction\n"); + if(pData->offsBuf != 0) { + DBGPRINTF("omhdfs: data unwritten at end of transaction, persisting...\n"); + iRet = fileWrite(pData->pFile, pData->ioBuf, &pData->offsBuf); + } +ENDendTransaction + + BEGINparseSelectorAct file_t *pFile; int r; @@ -409,6 +467,7 @@ CODESTARTparseSelectorAct } fileObjAddUser(pFile); pData->pFile = pFile; + pData->offsBuf = 0; CODE_STD_FINALIZERparseSelectorAct ENDparseSelectorAct @@ -455,6 +514,7 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_TXIF_OMOD_QUERIES /* we support the transactional interface! */ CODEqueryEtryPt_doHUP ENDqueryEtryPt @@ -472,5 +532,6 @@ CODEmodInit_QueryRegCFSLineHdlr CHKiRet(regCfSysLineHdlr((uchar *)"omhdfsport", 0, eCmdHdlrInt, NULL, &hdfsPort, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"omhdfsdefaulttemplate", 0, eCmdHdlrGetWord, NULL, &dfltTplName, NULL)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID)); + DBGPRINTF("omhdfs: module compiled with rsyslog version %s.\n", VERSION); CODEmodInit_QueryRegCFSLineHdlr ENDmodInit |