summaryrefslogtreecommitdiffstats
path: root/plugins
diff options
context:
space:
mode:
Diffstat (limited to 'plugins')
-rw-r--r--plugins/imfile/imfile.c23
-rw-r--r--plugins/imklog/bsd.c6
-rw-r--r--plugins/imklog/imklog.c52
-rw-r--r--plugins/imklog/imklog.h4
-rw-r--r--plugins/imklog/linux.c113
-rw-r--r--plugins/imklog/solaris.c68
-rw-r--r--plugins/imptcp/imptcp.c105
-rw-r--r--plugins/imtcp/imtcp.c10
-rw-r--r--plugins/imuxsock/imuxsock.c246
-rw-r--r--plugins/mmsnmptrapd/mmsnmptrapd.c2
-rw-r--r--plugins/omelasticsearch/Makefile.am8
-rw-r--r--plugins/omelasticsearch/omelasticsearch.c270
-rw-r--r--plugins/omhdfs/javaenv.sh14
-rw-r--r--plugins/omhdfs/omhdfs.c85
14 files changed, 861 insertions, 145 deletions
diff --git a/plugins/imfile/imfile.c b/plugins/imfile/imfile.c
index ba8318df..632bf390 100644
--- a/plugins/imfile/imfile.c
+++ b/plugins/imfile/imfile.c
@@ -63,6 +63,7 @@ DEFobjCurrIf(strm)
DEFobjCurrIf(prop)
DEFobjCurrIf(ruleset)
+#define NUM_MULTISUB 1024 /* max number of submits -- TODO: make configurable */
typedef struct fileInfo_s {
uchar *pszFileName;
uchar *pszTag;
@@ -70,11 +71,13 @@ typedef struct fileInfo_s {
uchar *pszStateFile; /* file in which state between runs is to be stored */
int iFacility;
int iSeverity;
+ int maxLinesAtOnce;
int nRecords; /**< How many records did we process before persisting the stream? */
int iPersistStateInterval; /**< how often should state be persisted? (0=on close only) */
strm_t *pStrm; /* its stream (NULL if not assigned) */
int readMode; /* which mode to use in ReadMulteLine call? */
ruleset_t *pRuleset; /* ruleset to bind listener to (use system default if unspecified) */
+ multi_submit_t multiSub;
} fileInfo_t;
@@ -90,6 +93,7 @@ static int iPersistStateInterval = 0; /* how often if state file to be persisted
static int iFacility = 128; /* local0 */
static int iSeverity = 5; /* notice, as of rfc 3164 */
static int readMode = 0; /* mode to use for ReadMultiLine call */
+static int maxLinesAtOnce = 10240; /* how many lines to process in a row? */
static ruleset_t *pBindRuleset = NULL; /* ruleset to bind listener to (use system default if unspecified) */
static int iFilPtr = 0; /* number of files to be monitored; pointer to next free spot during config */
@@ -121,7 +125,9 @@ static rsRetVal enqLine(fileInfo_t *pInfo, cstr_t *cstrLine)
pMsg->iFacility = LOG_FAC(pInfo->iFacility);
pMsg->iSeverity = LOG_PRI(pInfo->iSeverity);
MsgSetRuleset(pMsg, pInfo->pRuleset);
- CHKiRet(submitMsg(pMsg));
+ pInfo->multiSub.ppMsgs[pInfo->multiSub.nElem++] = pMsg;
+ if(pInfo->multiSub.nElem == pInfo->multiSub.maxElem)
+ CHKiRet(multiSubmitMsg(&pInfo->multiSub));
finalize_it:
RETiRet;
}
@@ -205,6 +211,7 @@ static void pollFileCancelCleanup(void *pArg)
static rsRetVal pollFile(fileInfo_t *pThis, int *pbHadFileData)
{
cstr_t *pCStr = NULL;
+ int nProcessed = 0;
DEFiRet;
ASSERT(pbHadFileData != NULL);
@@ -219,7 +226,10 @@ static rsRetVal pollFile(fileInfo_t *pThis, int *pbHadFileData)
/* loop below will be exited when strmReadLine() returns EOF */
while(glbl.GetGlobalInputTermState() == 0) {
+ if(pThis->maxLinesAtOnce != 0 && nProcessed >= pThis->maxLinesAtOnce)
+ break;
CHKiRet(strm.ReadLine(pThis->pStrm, &pCStr, pThis->readMode));
+ ++nProcessed;
*pbHadFileData = 1; /* this is just a flag, so set it and forget it */
CHKiRet(enqLine(pThis, pCStr)); /* process line */
rsCStrDestruct(&pCStr); /* discard string (must be done by us!) */
@@ -230,6 +240,10 @@ static rsRetVal pollFile(fileInfo_t *pThis, int *pbHadFileData)
}
finalize_it:
+ if(pThis->multiSub.nElem > 0) {
+ /* submit everything that was not yet submitted */
+ CHKiRet(multiSubmitMsg(&pThis->multiSub));
+ }
; /*EMPTY STATEMENT - needed to keep compiler happy - see below! */
/* Note: the problem above is that pthread:cleanup_pop() is a macro which
* evaluates to something like "} while(0);". So the code would become
@@ -474,6 +488,7 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a
iSeverity = 5; /* notice, as of rfc 3164 */
readMode = 0;
pBindRuleset = NULL;
+ maxLinesAtOnce = 10240;
RETiRet;
}
@@ -512,8 +527,12 @@ static rsRetVal addMonitor(void __attribute__((unused)) *pVal, uchar *pNewVal)
pThis->pszStateFile = (uchar*) strdup((char*) pszStateFile);
}
+ CHKmalloc(pThis->multiSub.ppMsgs = MALLOC(NUM_MULTISUB * sizeof(msg_t*)));
+ pThis->multiSub.maxElem = NUM_MULTISUB;
+ pThis->multiSub.nElem = 0;
pThis->iSeverity = iSeverity;
pThis->iFacility = iFacility;
+ pThis->maxLinesAtOnce = maxLinesAtOnce;
pThis->iPersistStateInterval = iPersistStateInterval;
pThis->nRecords = 0;
pThis->readMode = readMode;
@@ -591,6 +610,8 @@ CODEmodInit_QueryRegCFSLineHdlr
NULL, &iPollInterval, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputfilereadmode", 0, eCmdHdlrInt,
NULL, &readMode, STD_LOADABLE_MODULE_ID));
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputfilemaxlinesatonce", 0, eCmdHdlrSize,
+ NULL, &maxLinesAtOnce, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputfilepersiststateinterval", 0, eCmdHdlrInt,
NULL, &iPersistStateInterval, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputfilebindruleset", 0, eCmdHdlrGetWord,
diff --git a/plugins/imklog/bsd.c b/plugins/imklog/bsd.c
index 0a4c7cd4..930bbd11 100644
--- a/plugins/imklog/bsd.c
+++ b/plugins/imklog/bsd.c
@@ -155,18 +155,18 @@ readklog(void)
for (p = (char*)pRcv; (q = strchr(p, '\n')) != NULL; p = q + 1) {
*q = '\0';
- Syslog(LOG_INFO, (uchar*) p);
+ Syslog(LOG_INFO, (uchar*) p, NULL);
}
len = strlen(p);
if (len >= iMaxLine - 1) {
- Syslog(LOG_INFO, (uchar*)p);
+ Syslog(LOG_INFO, (uchar*)p, NULL);
len = 0;
}
if (len > 0)
memmove(pRcv, p, len + 1);
}
if (len > 0)
- Syslog(LOG_INFO, pRcv);
+ Syslog(LOG_INFO, pRcv, NULL);
if(pRcv != NULL && (size_t) iMaxLine >= sizeof(bufRcv) - 1)
free(pRcv);
diff --git a/plugins/imklog/imklog.c b/plugins/imklog/imklog.c
index 69c8cd1a..cb28e68e 100644
--- a/plugins/imklog/imklog.c
+++ b/plugins/imklog/imklog.c
@@ -18,7 +18,10 @@
* Please note that this file replaces the klogd daemon that was
* also present in pre-v3 versions of rsyslog.
*
- * Copyright (C) 2008, 2009 by Rainer Gerhards and Adiscon GmbH
+ * To test under Linux:
+ * echo test1 > /dev/kmsg
+ *
+ * Copyright (C) 2008-2011 by Rainer Gerhards and Adiscon GmbH
*
* This file is part of rsyslog.
*
@@ -93,15 +96,21 @@ static prop_t *pLocalHostIP = NULL; /* a pseudo-constant propterty for 127.0.0.1
* rgerhards, 2008-04-12
*/
static rsRetVal
-enqMsg(uchar *msg, uchar* pszTag, int iFacility, int iSeverity)
+enqMsg(uchar *msg, uchar* pszTag, int iFacility, int iSeverity, struct timeval *tp)
{
- DEFiRet;
+ struct syslogTime st;
msg_t *pMsg;
+ DEFiRet;
assert(msg != NULL);
assert(pszTag != NULL);
- CHKiRet(msgConstruct(&pMsg));
+ if(tp == NULL) {
+ CHKiRet(msgConstruct(&pMsg));
+ } else {
+ datetime.timeval2syslogTime(tp, &st);
+ CHKiRet(msgConstructWithTime(&pMsg, &st, tp->tv_sec));
+ }
MsgSetFlowControlType(pMsg, eFLOWCTL_LIGHT_DELAY);
MsgSetInputName(pMsg, pInputName);
MsgSetRawMsgWOSize(pMsg, (char*)msg);
@@ -174,31 +183,48 @@ rsRetVal imklogLogIntMsg(int priority, char *fmt, ...)
va_end(ap);
iRet = enqMsg((uchar*)pLogMsg, (uchar*) ((iFacilIntMsg == LOG_KERN) ? "kernel:" : "imklog:"),
- iFacilIntMsg, LOG_PRI(priority));
+ iFacilIntMsg, LOG_PRI(priority), NULL);
RETiRet;
}
-/* log a kernel message
+/* log a kernel message. If tp is non-NULL, it contains the message creation
+ * time to use.
* rgerhards, 2008-04-14
*/
-rsRetVal Syslog(int priority, uchar *pMsg)
+rsRetVal Syslog(int priority, uchar *pMsg, struct timeval *tp)
{
- DEFiRet;
+ int pri = -1;
rsRetVal localRet;
+ DEFiRet;
- /* Output using syslog */
- localRet = parsePRI(&pMsg, &priority);
- if(localRet != RS_RET_INVALID_PRI && localRet != RS_RET_OK)
- FINALIZE;
+ /* then check if we have two PRIs. This can happen in case of systemd,
+ * in which case the second PRI is the rigth one.
+ * TODO: added kernel timestamp support to this PoC. -- rgerhards, 2011-03-18
+ */
+ if(pMsg[3] == '<') { /* could be a pri... */
+ uchar *pMsgTmp = pMsg + 3;
+ localRet = parsePRI(&pMsgTmp, &pri);
+ if(localRet == RS_RET_OK && pri >= 8 && pri <= 192) {
+ /* *this* is our PRI */
+ DBGPRINTF("imklog detected secondary PRI in klog msg\n");
+ pMsg = pMsgTmp;
+ priority = pri;
+ }
+ }
+ if(pri == -1) {
+ localRet = parsePRI(&pMsg, &priority);
+ if(localRet != RS_RET_INVALID_PRI && localRet != RS_RET_OK)
+ FINALIZE;
+ }
/* if we don't get the pri, we use whatever we were supplied */
/* ignore non-kernel messages if not permitted */
if(bPermitNonKernel == 0 && LOG_FAC(priority) != LOG_KERN)
FINALIZE; /* silently ignore */
- iRet = enqMsg((uchar*)pMsg, (uchar*) "kernel:", LOG_FAC(priority), LOG_PRI(priority));
+ iRet = enqMsg((uchar*)pMsg, (uchar*) "kernel:", LOG_FAC(priority), LOG_PRI(priority), tp);
finalize_it:
RETiRet;
diff --git a/plugins/imklog/imklog.h b/plugins/imklog/imklog.h
index c183026d..39bdeb5c 100644
--- a/plugins/imklog/imklog.h
+++ b/plugins/imklog/imklog.h
@@ -5,7 +5,7 @@
* Major change: 2008-04-09: switched to a driver interface for
* several platforms
*
- * Copyright 2007-2008 Rainer Gerhards and Adiscon GmbH.
+ * Copyright 2007-2011 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of rsyslog.
*
@@ -56,7 +56,7 @@ extern uchar *pszPath;
/* the functions below may be called by the drivers */
rsRetVal imklogLogIntMsg(int priority, char *fmt, ...) __attribute__((format(printf,2, 3)));
-rsRetVal Syslog(int priority, uchar *msg);
+rsRetVal Syslog(int priority, uchar *msg, struct timeval *tp);
/* prototypes */
extern int klog_getMaxLine(void); /* work-around for klog drivers to get configured max line size */
diff --git a/plugins/imklog/linux.c b/plugins/imklog/linux.c
index 727708a5..b44619f5 100644
--- a/plugins/imklog/linux.c
+++ b/plugins/imklog/linux.c
@@ -28,6 +28,8 @@
#include "rsyslog.h"
#include <stdlib.h>
#include <stdio.h>
+#include <ctype.h>
+#include <time.h>
#include <assert.h>
#include <signal.h>
#include <string.h>
@@ -181,6 +183,93 @@ static int copyin( uchar *line, int space,
return(i);
}
+
+/* submit a message to imklog Syslog() API. In this function, we check if
+ * a kernel timestamp is present and, if so, extract and strip it.
+ * Note: this is an extra processing step. We should revisit the whole
+ * idea in v6 and remove all that old stuff that we do not longer need
+ * (like symbol resolution). <-- TODO
+ * Special thanks to Lennart Poettering for suggesting on how to convert
+ * the kernel timestamp to a realtime timestamp. This method depends on
+ * the fact the the kernel timestamp is written using the monotonic clock.
+ * Shall that change (very unlikely), this code must be changed as well. Note
+ * that due to the way we generate the delta, we are unable to write the
+ * absolutely correc timestamp (system call overhead of the clock calls
+ * prevents us from doing so). However, the difference is very minor.
+ * rgerhards, 201106-24
+ */
+static void
+submitSyslog(int pri, uchar *buf)
+{
+ long secs;
+ long nsecs;
+ long secOffs;
+ long nsecOffs;
+ unsigned i;
+ unsigned bufsize;
+ struct timespec monotonic, realtime;
+ struct timeval tv;
+ struct timeval *tp = NULL;
+
+ if(buf[3] != '[')
+ goto done;
+ DBGPRINTF("imklog: kernel timestamp detected, extracting it\n");
+
+ /* we now try to parse the timestamp. iff it parses, we assume
+ * it is a timestamp. Otherwise we know for sure it is no ts ;)
+ */
+ i = 4; /* first digit after '[' */
+ secs = 0;
+ while(buf[i] && isdigit(buf[i])) {
+ secs = secs * 10 + buf[i] - '0';
+ ++i;
+ }
+ if(buf[i] != '.') {
+ DBGPRINTF("no dot --> no kernel timestamp\n");
+ goto done; /* no TS! */
+ }
+
+ ++i; /* skip dot */
+ nsecs = 0;
+ while(buf[i] && isdigit(buf[i])) {
+ nsecs = nsecs * 10 + buf[i] - '0';
+ ++i;
+ }
+ if(buf[i] != ']') {
+ DBGPRINTF("no trailing ']' --> no kernel timestamp\n");
+ goto done; /* no TS! */
+ }
+ ++i; /* skip ']' */
+
+ /* we have a timestamp */
+ DBGPRINTF("kernel timestamp is %ld %ld\n", secs, nsecs);
+ bufsize= strlen((char*)buf);
+ memcpy(buf+3, buf+i, bufsize - i + 1);
+
+ clock_gettime(CLOCK_MONOTONIC, &monotonic);
+ clock_gettime(CLOCK_REALTIME, &realtime);
+ secOffs = realtime.tv_sec - monotonic.tv_sec;
+ nsecOffs = realtime.tv_nsec - monotonic.tv_nsec;
+ if(nsecOffs < 0) {
+ secOffs--;
+ nsecOffs += 1000000000l;
+ }
+
+ nsecs +=nsecOffs;
+ if(nsecs > 999999999l) {
+ secs++;
+ nsecs -= 1000000000l;
+ }
+ secs += secOffs;
+ tv.tv_sec = secs;
+ tv.tv_usec = nsecs / 1000;
+ tp = &tv;
+
+done:
+ Syslog(pri, buf, tp);
+}
+
+
/*
* Messages are separated by "\n". Messages longer than
* LOG_LINE_LENGTH are broken up.
@@ -235,7 +324,7 @@ static void LogLine(char *ptr, int len)
//dbgprintf("Line buffer full:\n");
//dbgprintf("\tLine: %s\n", line);
- Syslog(LOG_INFO, line_buff);
+ submitSyslog(LOG_INFO, line_buff);
line = line_buff;
space = sizeof(line_buff)-1;
parse_state = PARSING_TEXT;
@@ -254,28 +343,24 @@ static void LogLine(char *ptr, int len)
space -= delta;
len -= delta;
- if( space == 0 || len == 0 )
- {
+ if( space == 0 || len == 0 ) {
break; /* full line_buff or end of input buffer */
}
- if( *ptr == '\0' ) /* zero byte */
- {
+ if( *ptr == '\0' ) /* zero byte */ {
ptr++; /* skip zero byte */
space -= 1;
len -= 1;
-
break;
}
- if( *ptr == '\n' ) /* newline */
- {
+ if( *ptr == '\n' ) /* newline */ {
ptr++; /* skip newline */
space -= 1;
len -= 1;
*line = 0; /* force null terminator */
- Syslog(LOG_INFO, line_buff);
+ submitSyslog(LOG_INFO, line_buff);
line = line_buff;
space = sizeof(line_buff)-1;
if (symbols_twice) {
@@ -285,9 +370,7 @@ static void LogLine(char *ptr, int len)
skip_symbol_lookup = 1;
ptr = save_ptr;
len = save_len;
- }
- else
- {
+ } else {
skip_symbol_lookup = 0;
save_ptr = ptr;
save_len = len;
@@ -295,8 +378,7 @@ static void LogLine(char *ptr, int len)
}
break;
}
- if( *ptr == '[' ) /* possible kernel symbol */
- {
+ if( *ptr == '[' ) /* possible kernel symbol */ {
*line++ = *ptr++;
space -= 1;
len -= 1;
@@ -310,8 +392,7 @@ static void LogLine(char *ptr, int len)
break;
case PARSING_SYMSTART:
- if( *ptr != '<' )
- {
+ if( *ptr != '<' ) {
parse_state = PARSING_TEXT; /* not a symbol */
break;
}
diff --git a/plugins/imklog/solaris.c b/plugins/imklog/solaris.c
index 8a6d5af1..0a169cdd 100644
--- a/plugins/imklog/solaris.c
+++ b/plugins/imklog/solaris.c
@@ -80,74 +80,6 @@ klogWillRun(void)
}
-#if 0
-/* Read /dev/klog while data are available, split into lines.
- * Contrary to standard BSD syslogd, we do a blocking read. We can
- * afford this as imklog is running on its own threads. So if we have
- * a single file, it really doesn't matter if we wait inside a 1-file
- * select or the read() directly.
- */
-static void
-readklog(void)
-{
- char *p, *q;
- int len, i;
- int iMaxLine;
- uchar bufRcv[4096+1];
- uchar *pRcv = NULL; /* receive buffer */
-
- iMaxLine = klog_getMaxLine();
-
- /* we optimize performance: if iMaxLine is below 4K (which it is in almost all
- * cases, we use a fixed buffer on the stack. Only if it is higher, heap memory
- * is used. We could use alloca() to achive a similar aspect, but there are so
- * many issues with alloca() that I do not want to take that route.
- * rgerhards, 2008-09-02
- */
- if((size_t) iMaxLine < sizeof(bufRcv) - 1) {
- pRcv = bufRcv;
- } else {
- if((pRcv = (uchar*) malloc(sizeof(uchar) * (iMaxLine + 1))) == NULL)
- iMaxLine = sizeof(bufRcv) - 1; /* better this than noting */
- }
-
- len = 0;
- for (;;) {
- dbgprintf("----------imklog(BSD) waiting for kernel log line\n");
- i = read(fklog, pRcv + len, iMaxLine - len);
- if (i > 0) {
- pRcv[i + len] = '\0';
- } else {
- if (i < 0 && errno != EINTR && errno != EAGAIN) {
- imklogLogIntMsg(LOG_ERR,
- "imklog error %d reading kernel log - shutting down imklog",
- errno);
- fklog = -1;
- }
- break;
- }
-
- for(p = pRcv; (q = strchr(p, '\n')) != NULL; p = q + 1) {
- *q = '\0';
- Syslog(LOG_INFO, (uchar*) p);
- }
- len = strlen(p);
- if (len >= iMaxLine - 1) {
- Syslog(LOG_INFO, (uchar*)p);
- len = 0;
- }
- if (len > 0)
- memmove(pRcv, p, len + 1);
- }
- if (len > 0)
- Syslog(LOG_INFO, pRcv);
-
- if(pRcv != NULL && (size_t) iMaxLine >= sizeof(bufRcv) - 1)
- free(pRcv);
-}
-#endif
-
-
/* to be called in the module's AfterRun entry point
* rgerhards, 2008-04-09
*/
diff --git a/plugins/imptcp/imptcp.c b/plugins/imptcp/imptcp.c
index 103da210..a735b0b2 100644
--- a/plugins/imptcp/imptcp.c
+++ b/plugins/imptcp/imptcp.c
@@ -49,6 +49,7 @@
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/epoll.h>
+#include <netinet/tcp.h>
#if HAVE_FCNTL_H
#include <fcntl.h>
#endif
@@ -88,6 +89,10 @@ DEFobjCurrIf(statsobj)
/* config settings */
typedef struct configSettings_s {
+ int bKeepAlive; /* support keep-alive packets */
+ int iKeepAliveIntvl;
+ int iKeepAliveProbes;
+ int iKeepAliveTime;
int bEmitMsgOnClose; /* emit an informational message on close by remote peer */
int iAddtlFrameDelim; /* addtl frame delimiter, e.g. for netscreen, default none */
uchar *pszInputName; /* value for inputname property, NULL is OK and handled by core engine */
@@ -111,13 +116,17 @@ struct ptcpsrv_s {
ptcpsrv_t *pNext; /* linked list maintenance */
uchar *port; /* Port to listen to */
uchar *lstnIP; /* which IP we should listen on? */
- int bEmitMsgOnClose;
int iAddtlFrameDelim;
+ int iKeepAliveIntvl;
+ int iKeepAliveProbes;
+ int iKeepAliveTime;
uchar *pszInputName;
prop_t *pInputName; /* InputName in (fast to process) property format */
ruleset_t *pRuleset;
ptcplstn_t *pLstn; /* root of our listeners */
ptcpsess_t *pSess; /* root of our sessions */
+ sbool bKeepAlive; /* support keep-alive packets */
+ sbool bEmitMsgOnClose;
};
/* the ptcp session object. Describes a single active session.
@@ -434,12 +443,80 @@ finalize_it:
}
+/* Enable KEEPALIVE handling on the socket. */
+static inline rsRetVal
+EnableKeepAlive(ptcplstn_t *pLstn, int sock)
+{
+ int ret;
+ int optval;
+ socklen_t optlen;
+ DEFiRet;
+
+ optval = 1;
+ optlen = sizeof(optval);
+ ret = setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, &optval, optlen);
+ if(ret < 0) {
+ dbgprintf("EnableKeepAlive socket call returns error %d\n", ret);
+ ABORT_FINALIZE(RS_RET_ERR);
+ }
+
+# if defined(TCP_KEEPCNT)
+ if(pLstn->pSrv->iKeepAliveProbes > 0) {
+ optval = pLstn->pSrv->iKeepAliveProbes;
+ optlen = sizeof(optval);
+ ret = setsockopt(sock, SOL_TCP, TCP_KEEPCNT, &optval, optlen);
+ } else {
+ ret = 0;
+ }
+# else
+ ret = -1;
+# endif
+ if(ret < 0) {
+ errmsg.LogError(ret, NO_ERRCODE, "imptcp cannot set keepalive probes - ignored");
+ }
+
+# if defined(TCP_KEEPCNT)
+ if(pLstn->pSrv->iKeepAliveTime > 0) {
+ optval = pLstn->pSrv->iKeepAliveTime;
+ optlen = sizeof(optval);
+ ret = setsockopt(sock, SOL_TCP, TCP_KEEPIDLE, &optval, optlen);
+ } else {
+ ret = 0;
+ }
+# else
+ ret = -1;
+# endif
+ if(ret < 0) {
+ errmsg.LogError(ret, NO_ERRCODE, "imptcp cannot set keepalive time - ignored");
+ }
+
+# if defined(TCP_KEEPCNT)
+ if(pLstn->pSrv->iKeepAliveIntvl > 0) {
+ optval = pLstn->pSrv->iKeepAliveIntvl;
+ optlen = sizeof(optval);
+ ret = setsockopt(sock, SOL_TCP, TCP_KEEPINTVL, &optval, optlen);
+ } else {
+ ret = 0;
+ }
+# else
+ ret = -1;
+# endif
+ if(ret < 0) {
+ errmsg.LogError(errno, NO_ERRCODE, "imptcp cannot set keepalive intvl - ignored");
+ }
+
+ dbgprintf("KEEPALIVE enabled for socket %d\n", sock);
+
+finalize_it:
+ RETiRet;
+}
+
/* accept an incoming connection request
* rgerhards, 2008-04-22
*/
static rsRetVal
-AcceptConnReq(int sock, int *newSock, prop_t **peerName, prop_t **peerIP)
+AcceptConnReq(ptcplstn_t *pLstn, int *newSock, prop_t **peerName, prop_t **peerIP)
{
int sockflags;
struct sockaddr_storage addr;
@@ -448,13 +525,17 @@ AcceptConnReq(int sock, int *newSock, prop_t **peerName, prop_t **peerIP)
DEFiRet;
- iNewSock = accept(sock, (struct sockaddr*) &addr, &addrlen);
+ iNewSock = accept(pLstn->sock, (struct sockaddr*) &addr, &addrlen);
if(iNewSock < 0) {
if(errno == EAGAIN || errno == EWOULDBLOCK)
ABORT_FINALIZE(RS_RET_NO_MORE_DATA);
ABORT_FINALIZE(RS_RET_ACCEPT_ERR);
}
+ if(pLstn->pSrv->bKeepAlive)
+ EnableKeepAlive(pLstn, iNewSock);/* we ignore errors, best to do! */
+
+
CHKiRet(getPeerNames(peerName, peerIP, (struct sockaddr*) &addr));
/* set the new socket to non-blocking IO */
@@ -904,6 +985,10 @@ static rsRetVal addTCPListener(void __attribute__((unused)) *pVal, uchar *pNewVa
CHKmalloc(pSrv = malloc(sizeof(ptcpsrv_t)));
pSrv->pSess = NULL;
pSrv->pLstn = NULL;
+ pSrv->bKeepAlive = cs.bKeepAlive;
+ pSrv->iKeepAliveIntvl = cs.iKeepAliveTime;
+ pSrv->iKeepAliveProbes = cs.iKeepAliveProbes;
+ pSrv->iKeepAliveTime = cs.iKeepAliveTime;
pSrv->bEmitMsgOnClose = cs.bEmitMsgOnClose;
pSrv->port = pNewVal;
pSrv->iAddtlFrameDelim = cs.iAddtlFrameDelim;
@@ -967,7 +1052,7 @@ lstnActivity(ptcplstn_t *pLstn)
DBGPRINTF("imptcp: new connection on listen socket %d\n", pLstn->sock);
while(1) {
- localRet = AcceptConnReq(pLstn->sock, &newSock, &peerName, &peerIP);
+ localRet = AcceptConnReq(pLstn, &newSock, &peerName, &peerIP);
if(localRet == RS_RET_NO_MORE_DATA)
break;
CHKiRet(localRet);
@@ -1169,6 +1254,10 @@ static rsRetVal
resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal)
{
cs.bEmitMsgOnClose = 0;
+ cs.bKeepAlive = 0;
+ cs.iKeepAliveProbes = 0;
+ cs.iKeepAliveTime = 0;
+ cs.iKeepAliveIntvl = 0;
cs.iAddtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER;
free(cs.pszInputName);
cs.pszInputName = NULL;
@@ -1202,6 +1291,14 @@ CODEmodInit_QueryRegCFSLineHdlr
/* register config file handlers */
CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverrun"), 0, eCmdHdlrGetWord,
addTCPListener, NULL, STD_LOADABLE_MODULE_ID));
+ CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverkeepalive"), 0, eCmdHdlrBinary,
+ NULL, &cs.bKeepAlive, STD_LOADABLE_MODULE_ID));
+ CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverkeepalive_probes"), 0, eCmdHdlrInt,
+ NULL, &cs.iKeepAliveProbes, STD_LOADABLE_MODULE_ID));
+ CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverkeepalive_time"), 0, eCmdHdlrInt,
+ NULL, &cs.iKeepAliveTime, STD_LOADABLE_MODULE_ID));
+ CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverkeepalive_intvl"), 0, eCmdHdlrInt,
+ NULL, &cs.iKeepAliveIntvl, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpservernotifyonconnectionclose"), 0,
eCmdHdlrBinary, NULL, &cs.bEmitMsgOnClose, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserveraddtlframedelimiter"), 0, eCmdHdlrInt,
diff --git a/plugins/imtcp/imtcp.c b/plugins/imtcp/imtcp.c
index 6ab39477..e9961af7 100644
--- a/plugins/imtcp/imtcp.c
+++ b/plugins/imtcp/imtcp.c
@@ -82,12 +82,14 @@ static permittedPeers_t *pPermPeersRoot = NULL;
/* config settings */
+static int bKeepAlive = 0; /* support keep-alive packets */
static int iTCPSessMax = 200; /* max number of sessions */
static int iTCPLstnMax = 20; /* max number of sessions */
static int iStrmDrvrMode = 0; /* mode for stream driver, driver-dependent (0 mostly means plain tcp) */
static int bEmitMsgOnClose = 0; /* emit an informational message on close by remote peer */
static int iAddtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER; /* addtl frame delimiter, e.g. for netscreen, default none */
static int bDisableLFDelim = 0; /* disbale standard LF delimiter */
+static int bUseFlowControl = 1; /* use flow control, what means indicate ourselfs a "light delayable" */
static uchar *pszStrmDrvrAuthMode = NULL; /* authentication mode to use */
static uchar *pszInputName = NULL; /* value for inputname property, NULL is OK and handled by core engine */
static ruleset_t *pBindRuleset = NULL; /* ruleset to bind listener to (use system default if unspecified) */
@@ -191,6 +193,7 @@ static rsRetVal addTCPListener(void __attribute__((unused)) *pVal, uchar *pNewVa
if(pOurTcpsrv == NULL) {
CHKiRet(tcpsrv.Construct(&pOurTcpsrv));
+ CHKiRet(tcpsrv.SetKeepAlive(pOurTcpsrv, bKeepAlive));
CHKiRet(tcpsrv.SetSessMax(pOurTcpsrv, iTCPSessMax));
CHKiRet(tcpsrv.SetLstnMax(pOurTcpsrv, iTCPLstnMax));
CHKiRet(tcpsrv.SetCBIsPermittedHost(pOurTcpsrv, isPermittedHost));
@@ -199,6 +202,7 @@ static rsRetVal addTCPListener(void __attribute__((unused)) *pVal, uchar *pNewVa
CHKiRet(tcpsrv.SetCBOnRegularClose(pOurTcpsrv, onRegularClose));
CHKiRet(tcpsrv.SetCBOnErrClose(pOurTcpsrv, onErrClose));
CHKiRet(tcpsrv.SetDrvrMode(pOurTcpsrv, iStrmDrvrMode));
+ CHKiRet(tcpsrv.SetUseFlowControl(pOurTcpsrv, bUseFlowControl));
CHKiRet(tcpsrv.SetAddtlFrameDelim(pOurTcpsrv, iAddtlFrameDelim));
CHKiRet(tcpsrv.SetbDisableLFDelim(pOurTcpsrv, bDisableLFDelim));
CHKiRet(tcpsrv.SetNotificationOnRemoteClose(pOurTcpsrv, bEmitMsgOnClose));
@@ -287,8 +291,10 @@ static rsRetVal
resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal)
{
iTCPSessMax = 200;
+ bKeepAlive = 0;
iTCPLstnMax = 20;
iStrmDrvrMode = 0;
+ bUseFlowControl = 0;
bEmitMsgOnClose = 0;
iAddtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER;
bDisableLFDelim = 0;
@@ -324,6 +330,8 @@ CODEmodInit_QueryRegCFSLineHdlr
/* register config file handlers */
CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputtcpserverrun"), 0, eCmdHdlrGetWord,
addTCPListener, NULL, STD_LOADABLE_MODULE_ID));
+ CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputtcpserverkeepalive"), 0, eCmdHdlrBinary,
+ NULL, &bKeepAlive, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputtcpmaxsessions"), 0, eCmdHdlrInt,
NULL, &iTCPSessMax, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputtcpmaxlisteners"), 0, eCmdHdlrInt,
@@ -344,6 +352,8 @@ CODEmodInit_QueryRegCFSLineHdlr
eCmdHdlrGetWord, NULL, &pszInputName, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputtcpserverbindruleset"), 0,
eCmdHdlrGetWord, setRuleset, NULL, STD_LOADABLE_MODULE_ID));
+ CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputtcpflowcontrol"), 0,
+ eCmdHdlrBinary, NULL, &bUseFlowControl, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("resetconfigvariables"), 1, eCmdHdlrCustomHandler,
resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID));
ENDmodInit
diff --git a/plugins/imuxsock/imuxsock.c b/plugins/imuxsock/imuxsock.c
index feddb20c..403173e1 100644
--- a/plugins/imuxsock/imuxsock.c
+++ b/plugins/imuxsock/imuxsock.c
@@ -34,6 +34,7 @@
#include <string.h>
#include <errno.h>
#include <unistd.h>
+#include <fcntl.h>
#include <sys/stat.h>
#include <sys/un.h>
#include <sys/socket.h>
@@ -135,7 +136,9 @@ typedef struct lstn_s {
sbool bParseHost; /* should parser parse host name? read-only after startup */
sbool bCreatePath; /* auto-creation of socket directory? */
sbool bUseCreds; /* pull original creator credentials from socket */
+ sbool bAnnotate; /* annotate events with trusted properties */
sbool bWritePid; /* write original PID into tag */
+ sbool bUseSysTimeStamp; /* use timestamp from system (instead of from message) */
} lstn_t;
static lstn_t listeners[MAXFUNIX];
@@ -156,9 +159,13 @@ static int bUseFlowCtl = 0; /* use flow control or not (if yes, only LIGHT is u
static int bIgnoreTimestamp = 1; /* ignore timestamps present in the incoming message? */
static int bWritePid = 0; /* use credentials from recvmsg() and fixup PID in TAG */
static int bWritePidSysSock = 0; /* use credentials from recvmsg() and fixup PID in TAG */
+static int bUseSysTimeStamp = 1; /* use timestamp from system (rather than from message) */
+static int bUseSysTimeStampSysSock = 1; /* same, for system log socket */
+static int bAnnotate = 0; /* annotate trusted properties */
+static int bAnnotateSysSock = 0; /* same, for system log socket */
#define DFLT_bCreatePath 0
static int bCreatePath = DFLT_bCreatePath; /* auto-create socket path? */
-#define DFLT_ratelimitInterval 5
+#define DFLT_ratelimitInterval 0
static int ratelimitInterval = DFLT_ratelimitInterval; /* interval in seconds, 0 = off */
static int ratelimitIntervalSysSock = DFLT_ratelimitInterval;
#define DFLT_ratelimitBurst 200
@@ -300,8 +307,10 @@ addLstnSocketName(void __attribute__((unused)) *pVal, uchar *pNewVal)
listeners[nfd].flags = bIgnoreTimestamp ? IGNDATE : NOFLAG;
listeners[nfd].bCreatePath = bCreatePath;
listeners[nfd].sockName = pNewVal;
- listeners[nfd].bUseCreds = (bWritePid || ratelimitInterval) ? 1 : 0;
+ listeners[nfd].bUseCreds = (bWritePid || ratelimitInterval || bAnnotate) ? 1 : 0;
+ listeners[nfd].bAnnotate = bAnnotate;
listeners[nfd].bWritePid = bWritePid;
+ listeners[nfd].bUseSysTimeStamp = bUseSysTimeStamp;
nfd++;
} else {
errmsg.LogError(0, NO_ERRCODE, "Out of unix socket name descriptors, ignoring %s\n",
@@ -415,9 +424,14 @@ openLogSocket(lstn_t *pLstn)
errmsg.LogError(errno, NO_ERRCODE, "set SCM_CREDENTIALS failed on '%s'", pLstn->sockName);
pLstn->bUseCreds = 0;
}
+// TODO: move to its own #if
+ if(setsockopt(pLstn->fd, SOL_SOCKET, SO_TIMESTAMP, &one, sizeof(one)) != 0) {
+ errmsg.LogError(errno, NO_ERRCODE, "set SO_TIMESTAMP failed on '%s'", pLstn->sockName);
+ }
}
# else /* HAVE_SCM_CREDENTIALS */
pLstn->bUseCreds = 0;
+ pLstn->bAnnotate = 0;
# endif /* HAVE_SCM_CREDENTIALS */
finalize_it:
@@ -500,12 +514,109 @@ fixPID(uchar *bufTAG, int *lenTag, struct ucred *cred)
}
+/* Get an "trusted property" from the system. Returns an empty string if the
+ * property can not be obtained. Inspired by similiar functionality inside
+ * journald. Currently works with Linux /proc filesystem, only.
+ */
+static rsRetVal
+getTrustedProp(struct ucred *cred, char *propName, uchar *buf, size_t lenBuf, int *lenProp)
+{
+ int fd;
+ int i;
+ int lenRead;
+ char namebuf[1024];
+ DEFiRet;
+
+ if(snprintf(namebuf, sizeof(namebuf), "/proc/%lu/%s", (long unsigned) cred->pid,
+ propName) >= (int) sizeof(namebuf)) {
+ ABORT_FINALIZE(RS_RET_ERR);
+ }
+
+ if((fd = open(namebuf, O_RDONLY)) == -1) {
+ DBGPRINTF("error reading '%s'\n", namebuf);
+ *lenProp = 0;
+ FINALIZE;
+ }
+ if((lenRead = read(fd, buf, lenBuf - 1)) == -1) {
+ DBGPRINTF("error reading file data for '%s'\n", namebuf);
+ *lenProp = 0;
+ close(fd);
+ FINALIZE;
+ }
+
+ /* we strip after the first \n */
+ for(i = 0 ; i < lenRead ; ++i) {
+ if(buf[i] == '\n')
+ break;
+ else if(iscntrl(buf[i]))
+ buf[i] = ' ';
+ }
+ buf[i] = '\0';
+ *lenProp = i;
+
+ close(fd);
+
+finalize_it:
+ RETiRet;
+}
+
+
+/* read the exe trusted property path (so far, /proc fs only)
+ */
+static rsRetVal
+getTrustedExe(struct ucred *cred, uchar *buf, size_t lenBuf, int* lenProp)
+{
+ int lenRead;
+ char namebuf[1024];
+ DEFiRet;
+
+ if(snprintf(namebuf, sizeof(namebuf), "/proc/%lu/exe", (long unsigned) cred->pid)
+ >= (int) sizeof(namebuf)) {
+ ABORT_FINALIZE(RS_RET_ERR);
+ }
+
+ if((lenRead = readlink(namebuf, (char*)buf, lenBuf - 1)) == -1) {
+ DBGPRINTF("error reading link '%s'\n", namebuf);
+ *lenProp = 0;
+ FINALIZE;
+ }
+
+ buf[lenRead] = '\0';
+ *lenProp = lenRead;
+
+finalize_it:
+ RETiRet;
+}
+
+
+/* copy a trusted property in escaped mode. That is, the property can contain
+ * any character and so it must be properly quoted AND escaped.
+ * It is assumed the output buffer is large enough. Returns the number of
+ * characters added.
+ */
+static inline int
+copyescaped(uchar *dstbuf, uchar *inbuf, int inlen)
+{
+ int iDst, iSrc;
+
+ *dstbuf = '"';
+ for(iDst=1, iSrc=0 ; iSrc < inlen ; ++iDst, ++iSrc) {
+ if(inbuf[iSrc] == '"' || inbuf[iSrc] == '\\') {
+ dstbuf[iDst++] = '\\';
+ }
+ dstbuf[iDst] = inbuf[iSrc];
+ }
+ dstbuf[iDst++] = '"';
+ return iDst;
+}
+
+
/* submit received message to the queue engine
* We now parse the message according to expected format so that we
* can also mangle it if necessary.
*/
static inline rsRetVal
-SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred)
+SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred, struct timeval *ts)
{
msg_t *pMsg;
int lenMsg;
@@ -519,6 +630,12 @@ SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred)
struct syslogTime st;
time_t tt;
rs_ratelimit_state_t *ratelimiter = NULL;
+ int lenProp;
+ uchar propBuf[1024];
+ uchar msgbuf[8192];
+ uchar *pmsgbuf;
+ int toffs; /* offset for trusted properties */
+ struct syslogTime dummyTS;
DEFiRet;
/* TODO: handle format errors?? */
@@ -544,12 +661,58 @@ SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred)
findRatelimiter(pLstn, cred, &ratelimiter); /* ignore error, better so than others... */
}
- datetime.getCurrTime(&st, &tt);
+ if(ts == NULL) {
+ datetime.getCurrTime(&st, &tt);
+ } else {
+ datetime.timeval2syslogTime(ts, &st);
+ tt = ts->tv_sec;
+ }
+
if(ratelimiter != NULL && !withinRatelimit(ratelimiter, tt, cred->pid)) {
STATSCOUNTER_INC(ctrLostRatelimit, mutCtrLostRatelimit);
FINALIZE;
}
+ /* created trusted properties */
+ if(cred != NULL && pLstn->bAnnotate) {
+ if((unsigned) (lenRcv + 4096) < sizeof(msgbuf)) {
+ pmsgbuf = msgbuf;
+ } else {
+ CHKmalloc(pmsgbuf = malloc(lenRcv+4096));
+ }
+ memcpy(pmsgbuf, pRcv, lenRcv);
+ memcpy(pmsgbuf+lenRcv, " @[", 3);
+ toffs = lenRcv + 3; /* next free location */
+ lenProp = snprintf((char*)propBuf, sizeof(propBuf), "_PID=%lu _UID=%lu _GID=%lu",
+ (long unsigned) cred->pid, (long unsigned) cred->uid,
+ (long unsigned) cred->gid);
+ memcpy(pmsgbuf+toffs, propBuf, lenProp);
+ toffs = toffs + lenProp;
+ getTrustedProp(cred, "comm", propBuf, sizeof(propBuf), &lenProp);
+ if(lenProp) {
+ memcpy(pmsgbuf+toffs, " _COMM=", 7);
+ memcpy(pmsgbuf+toffs+7, propBuf, lenProp);
+ toffs = toffs + 7 + lenProp;
+ }
+ getTrustedExe(cred, propBuf, sizeof(propBuf), &lenProp);
+ if(lenProp) {
+ memcpy(pmsgbuf+toffs, " _EXE=", 6);
+ memcpy(pmsgbuf+toffs+6, propBuf, lenProp);
+ toffs = toffs + 6 + lenProp;
+ }
+ getTrustedProp(cred, "cmdline", propBuf, sizeof(propBuf), &lenProp);
+ if(lenProp) {
+ memcpy(pmsgbuf+toffs, " _CMDLINE=", 10);
+ toffs = toffs + 10 +
+ copyescaped(pmsgbuf+toffs+10, propBuf, lenProp);
+ }
+ /* finalize string */
+ pmsgbuf[toffs] = ']';
+ pmsgbuf[toffs+1] = '\0';
+ pRcv = pmsgbuf;
+ lenRcv = toffs + 1;
+ }
+
/* we now create our own message object and submit it to the queue */
CHKiRet(msgConstructWithTime(&pMsg, &st, tt));
MsgSetRawMsg(pMsg, (char*)pRcv, lenRcv);
@@ -564,15 +727,27 @@ SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred)
parse++; lenMsg--; /* '>' */
- if((pLstn->flags & IGNDATE)) {
- /* in this case, we still need to find out if we have a valid
- * datestamp or not .. and advance the parse pointer accordingly.
- */
- struct syslogTime dummy;
- datetime.ParseTIMESTAMP3164(&dummy, &parse, &lenMsg);
- } else {
- if(datetime.ParseTIMESTAMP3164(&(pMsg->tTIMESTAMP), &parse, &lenMsg) != RS_RET_OK) {
- DBGPRINTF("we have a problem, invalid timestamp in msg!\n");
+ if(ts == NULL) {
+ if((pLstn->flags & IGNDATE)) {
+ /* in this case, we still need to find out if we have a valid
+ * datestamp or not .. and advance the parse pointer accordingly.
+ */
+ datetime.ParseTIMESTAMP3164(&dummyTS, &parse, &lenMsg);
+ } else {
+ if(datetime.ParseTIMESTAMP3164(&(pMsg->tTIMESTAMP), &parse, &lenMsg) != RS_RET_OK) {
+ DBGPRINTF("we have a problem, invalid timestamp in msg!\n");
+ }
+ }
+ } else { /* if we pulled the time from the system, we need to update the message text */
+ uchar *tmpParse = parse; /* just to check correctness of TS */
+ if(datetime.ParseTIMESTAMP3164(&dummyTS, &tmpParse, &lenMsg) == RS_RET_OK) {
+ /* We modify the message only if it contained a valid timestamp,
+ * otherwise we do not touch it at all. */
+ datetime.formatTimestamp3164(&st, (char*)parse, 0);
+ parse[15] = ' '; /* re-write \0 from fromatTimestamp3164 by SP */
+ /* update "counters" to reflect processed timestamp */
+ parse += 16;
+ lenMsg -= 16;
}
}
@@ -624,6 +799,7 @@ static rsRetVal readSocket(lstn_t *pLstn)
struct cmsghdr *cm;
# endif
struct ucred *cred;
+ struct timeval *ts;
uchar bufRcv[4096+1];
char aux[128];
uchar *pRcv = NULL; /* receive buffer */
@@ -662,21 +838,28 @@ static rsRetVal readSocket(lstn_t *pLstn)
dbgprintf("Message from UNIX socket: #%d\n", pLstn->fd);
if(iRcvd > 0) {
cred = NULL;
-# if HAVE_SCM_CREDENTIALS
- if(pLstn->bUseCreds) {
- dbgprintf("XXX: pre CM loop, length of control message %d\n", (int) msgh.msg_controllen);
- for (cm = CMSG_FIRSTHDR(&msgh); cm; cm = CMSG_NXTHDR(&msgh, cm)) {
- dbgprintf("XXX: in CM loop, %d, %d\n", cm->cmsg_level, cm->cmsg_type);
- if (cm->cmsg_level == SOL_SOCKET && cm->cmsg_type == SCM_CREDENTIALS) {
+ ts = NULL;
+ if(pLstn->bUseCreds || pLstn->bUseSysTimeStamp) {
+ for(cm = CMSG_FIRSTHDR(&msgh); cm; cm = CMSG_NXTHDR(&msgh, cm)) {
+# if HAVE_SCM_CREDENTIALS
+ if( pLstn->bUseCreds
+ && cm->cmsg_level == SOL_SOCKET && cm->cmsg_type == SCM_CREDENTIALS) {
cred = (struct ucred*) CMSG_DATA(cm);
- dbgprintf("XXX: got credentials pid %d\n", (int) cred->pid);
break;
}
+# endif /* HAVE_SCM_CREDENTIALS */
+# if HAVE_SO_TIMESTAMP
+ if( pLstn->bUseSysTimeStamp
+ && cm->cmsg_level == SOL_SOCKET && cm->cmsg_type == SO_TIMESTAMP) {
+ ts = (struct timeval *)CMSG_DATA(cm);
+ dbgprintf("XXX: got timestamp %ld.%ld\n",
+ (long) ts->tv_sec, (long) ts->tv_usec);
+ break;
+ }
+# endif /* HAVE_SO_TIMESTAMP */
}
- dbgprintf("XXX: post CM loop\n");
}
-# endif /* HAVE_SCM_CREDENTIALS */
- CHKiRet(SubmitMsg(pRcv, iRcvd, pLstn, cred));
+ CHKiRet(SubmitMsg(pRcv, iRcvd, pLstn, cred, ts));
} else if(iRcvd < 0 && errno != EINTR) {
char errStr[1024];
rs_strerror_r(errno, errStr, sizeof(errStr));
@@ -786,8 +969,10 @@ CODESTARTwillRun
listeners[0].ratelimitInterval = ratelimitIntervalSysSock;
listeners[0].ratelimitBurst = ratelimitBurstSysSock;
listeners[0].ratelimitSev = ratelimitSeveritySysSock;
- listeners[0].bUseCreds = (bWritePidSysSock || ratelimitIntervalSysSock) ? 1 : 0;
+ listeners[0].bUseCreds = (bWritePidSysSock || ratelimitIntervalSysSock || bAnnotateSysSock) ? 1 : 0;
listeners[0].bWritePid = bWritePidSysSock;
+ listeners[0].bAnnotate = bAnnotateSysSock;
+ listeners[0].bUseSysTimeStamp = bUseSysTimeStampSysSock;
sd_fds = sd_listen_fds(0);
if (sd_fds < 0) {
@@ -830,7 +1015,6 @@ CODESTARTafterRun
/* Clean-up files. */
for(i = startIndexUxLocalSockets; i < nfd; i++)
if (listeners[i].sockName && listeners[i].fd != -1) {
-
/* If systemd passed us a socket it is systemd's job to clean it up.
* Do not unlink it -- we will get same socket (node) from systemd
* e.g. on restart again.
@@ -900,6 +1084,8 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a
bUseFlowCtl = 0;
bWritePid = 0;
bWritePidSysSock = 0;
+ bUseSysTimeStamp = 1;
+ bUseSysTimeStampSysSock = 1;
bCreatePath = DFLT_bCreatePath;
ratelimitInterval = DFLT_ratelimitInterval;
ratelimitIntervalSysSock = DFLT_ratelimitInterval;
@@ -934,7 +1120,9 @@ CODEmodInit_QueryRegCFSLineHdlr
listeners[0].fd = -1;
listeners[0].bParseHost = 0;
listeners[0].bUseCreds = 0;
+ listeners[0].bAnnotate = 0;
listeners[0].bCreatePath = 0;
+ listeners[0].bUseSysTimeStamp = 1;
/* initialize socket names */
for(i = 1 ; i < MAXFUNIX ; ++i) {
@@ -962,10 +1150,14 @@ CODEmodInit_QueryRegCFSLineHdlr
NULL, &pLogHostName, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputunixlistensocketflowcontrol", 0, eCmdHdlrBinary,
NULL, &bUseFlowCtl, STD_LOADABLE_MODULE_ID));
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputunixlistensocketannotate", 0, eCmdHdlrBinary,
+ NULL, &bAnnotate, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputunixlistensocketcreatepath", 0, eCmdHdlrBinary,
NULL, &bCreatePath, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputunixlistensocketusepidfromsystem", 0, eCmdHdlrBinary,
NULL, &bWritePid, STD_LOADABLE_MODULE_ID));
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputunixlistensocketusesystimestamp", 0, eCmdHdlrBinary,
+ NULL, &bUseSysTimeStamp, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"addunixlistensocket", 0, eCmdHdlrGetWord,
addLstnSocketName, NULL, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"imuxsockratelimitinterval", 0, eCmdHdlrInt,
@@ -986,6 +1178,10 @@ CODEmodInit_QueryRegCFSLineHdlr
setSystemLogTimestampIgnore, NULL, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"systemlogsocketflowcontrol", 0, eCmdHdlrBinary,
setSystemLogFlowControl, NULL, STD_LOADABLE_MODULE_ID));
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"systemlogusesystimestamp", 0, eCmdHdlrBinary,
+ NULL, &bUseSysTimeStampSysSock, STD_LOADABLE_MODULE_ID));
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"systemlogsocketannotate", 0, eCmdHdlrBinary,
+ NULL, &bAnnotateSysSock, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"systemlogusepidfromsystem", 0, eCmdHdlrBinary,
NULL, &bWritePidSysSock, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"systemlogratelimitinterval", 0, eCmdHdlrInt,
diff --git a/plugins/mmsnmptrapd/mmsnmptrapd.c b/plugins/mmsnmptrapd/mmsnmptrapd.c
index 767829d6..b78046ee 100644
--- a/plugins/mmsnmptrapd/mmsnmptrapd.c
+++ b/plugins/mmsnmptrapd/mmsnmptrapd.c
@@ -418,7 +418,7 @@ CODEmodInit_QueryRegCFSLineHdlr
cs.pszTagName = NULL;
cs.pszSeverityMapping = NULL;
- CHKiRet(omsdRegCFSLineHdlr((uchar *)"mmsnmptrapdtag", 0, eCmdHdlrInt,
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"mmsnmptrapdtag", 0, eCmdHdlrGetWord,
NULL, &cs.pszTagName, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"mmsnmptrapdseveritymapping", 0, eCmdHdlrGetWord,
NULL, &cs.pszSeverityMapping, STD_LOADABLE_MODULE_ID));
diff --git a/plugins/omelasticsearch/Makefile.am b/plugins/omelasticsearch/Makefile.am
new file mode 100644
index 00000000..a574c72f
--- /dev/null
+++ b/plugins/omelasticsearch/Makefile.am
@@ -0,0 +1,8 @@
+pkglib_LTLIBRARIES = omelasticsearch.la
+
+omelasticsearch_la_SOURCES = omelasticsearch.c
+omelasticsearch_la_CPPFLAGS = $(RSRT_CFLAGS) $(PTHREADS_CFLAGS)
+omelasticsearch_la_LDFLAGS = -module -avoid-version
+omelasticsearch_la_LIBADD = $(CURL_LIBS)
+
+EXTRA_DIST =
diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c
new file mode 100644
index 00000000..3bec1838
--- /dev/null
+++ b/plugins/omelasticsearch/omelasticsearch.c
@@ -0,0 +1,270 @@
+/* omelasticsearch.c
+ * This is the http://www.elasticsearch.org/ output module.
+ *
+ * NOTE: read comments in module-template.h for more specifics!
+ *
+ * Copyright 2011 Nathan Scott.
+ * Copyright 2009 Rainer Gerhards and Adiscon GmbH.
+ *
+ * This file is part of rsyslog.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * -or-
+ * see COPYING.ASL20 in the source distribution
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "config.h"
+#include "rsyslog.h"
+#include <stdio.h>
+#include <stdarg.h>
+#include <stdlib.h>
+#include <string.h>
+#include <curl/curl.h>
+#include <curl/types.h>
+#include <curl/easy.h>
+#include <assert.h>
+#include <signal.h>
+#include <errno.h>
+#include <time.h>
+#include "conf.h"
+#include "syslogd-types.h"
+#include "srUtils.h"
+#include "template.h"
+#include "module-template.h"
+#include "errmsg.h"
+#include "statsobj.h"
+#include "cfsysline.h"
+
+MODULE_TYPE_OUTPUT
+MODULE_TYPE_NOKEEP
+
+/* internal structures */
+DEF_OMOD_STATIC_DATA
+DEFobjCurrIf(errmsg)
+DEFobjCurrIf(statsobj)
+
+statsobj_t *indexStats;
+STATSCOUNTER_DEF(indexConFail, mutIndexConFail)
+STATSCOUNTER_DEF(indexSubmit, mutIndexSubmit)
+STATSCOUNTER_DEF(indexFailed, mutIndexFailed)
+STATSCOUNTER_DEF(indexSuccess, mutIndexSuccess)
+
+/* REST API for elasticsearch hits this URL:
+ * http://<hostName>:<restPort>/<searchIndex>/<searchType>
+ */
+typedef struct curl_slist HEADER;
+typedef struct _instanceData {
+ CURL *curlHandle; /* libcurl session handle */
+ HEADER *postHeader; /* json POST request info */
+} instanceData;
+
+/* config variables */
+static int restPort = 9200;
+static char *hostName = "localhost";
+static char *searchIndex = "system";
+static char *searchType = "events";
+
+BEGINcreateInstance
+CODESTARTcreateInstance
+ENDcreateInstance
+
+BEGINisCompatibleWithFeature
+CODESTARTisCompatibleWithFeature
+ if(eFeat == sFEATURERepeatedMsgReduction)
+ iRet = RS_RET_OK;
+ENDisCompatibleWithFeature
+
+BEGINfreeInstance
+CODESTARTfreeInstance
+ if (pData->postHeader) {
+ curl_slist_free_all(pData->postHeader);
+ pData->postHeader = NULL;
+ }
+ if (pData->curlHandle) {
+ curl_easy_cleanup(pData->curlHandle);
+ pData->curlHandle = NULL;
+ }
+ENDfreeInstance
+
+BEGINdbgPrintInstInfo
+CODESTARTdbgPrintInstInfo
+ENDdbgPrintInstInfo
+
+BEGINtryResume
+CODESTARTtryResume
+ENDtryResume
+
+rsRetVal
+curlPost(instanceData *instance, uchar *message)
+{
+ CURLcode code;
+ CURL *curl = instance->curlHandle;
+ int length = strlen((char *)message);
+
+ curl_easy_setopt(curl, CURLOPT_WRITEDATA, (char *)message);
+ curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (char *)message);
+ curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, length);
+ code = curl_easy_perform(curl);
+ switch (code) {
+ case CURLE_COULDNT_RESOLVE_HOST:
+ case CURLE_COULDNT_RESOLVE_PROXY:
+ case CURLE_COULDNT_CONNECT:
+ case CURLE_WRITE_ERROR:
+ STATSCOUNTER_INC(indexConFail, mutIndexConFail);
+ return RS_RET_SUSPENDED;
+ default:
+ STATSCOUNTER_INC(indexSubmit, mutIndexSubmit);
+ return RS_RET_OK;
+ }
+}
+
+BEGINdoAction
+CODESTARTdoAction
+ CHKiRet(curlPost(pData, ppString[0]));
+finalize_it:
+ENDdoAction
+
+/* elasticsearch POST result string ... useful for debugging */
+size_t
+curlResult(void *ptr, size_t size, size_t nmemb, void *userdata)
+{
+ unsigned int i;
+ char *p = (char *)ptr;
+ char *jsonData = (char *)userdata;
+ static char ok[] = "{\"ok\":true,";
+
+ ASSERT(size == 1);
+
+ if (size == 1 &&
+ nmemb > sizeof(ok)-1 &&
+ strncmp(p, ok, sizeof(ok)-1) == 0) {
+ STATSCOUNTER_INC(indexSuccess, mutIndexSuccess);
+ } else {
+ STATSCOUNTER_INC(indexFailed, mutIndexFailed);
+ if (Debug) {
+ DBGPRINTF("omelasticsearch request: %s\n", jsonData);
+ DBGPRINTF("omelasticsearch result: ");
+ for (i = 0; i < nmemb; i++)
+ DBGPRINTF("%c", p[i]);
+ DBGPRINTF("\n");
+ }
+ }
+ return size * nmemb;
+}
+
+static rsRetVal
+curlSetup(instanceData *instance)
+{
+ char restURL[2048]; /* libcurl makes a copy, using the stack here is OK */
+ HEADER *header;
+ CURL *handle;
+
+ handle = curl_easy_init();
+ if (handle == NULL) {
+ return RS_RET_OBJ_CREATION_FAILED;
+ }
+
+ snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s",
+ hostName, restPort, searchIndex, searchType);
+ header = curl_slist_append(NULL, "Content-Type: text/json; charset=utf-8");
+
+ curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, curlResult);
+ curl_easy_setopt(handle, CURLOPT_HTTPHEADER, header);
+ curl_easy_setopt(handle, CURLOPT_URL, restURL);
+ curl_easy_setopt(handle, CURLOPT_POST, 1);
+
+ instance->curlHandle = handle;
+ instance->postHeader = header;
+
+ DBGPRINTF("omelasticsearch setup, using REST URL: %s\n", restURL);
+ return RS_RET_OK;
+}
+
+BEGINparseSelectorAct
+CODESTARTparseSelectorAct
+CODE_STD_STRING_REQUESTparseSelectorAct(1)
+ if(strncmp((char*) p, ":omelasticsearch:", sizeof(":omelasticsearch:") - 1)) {
+ ABORT_FINALIZE(RS_RET_CONFLINE_UNPROCESSED);
+ }
+ p += sizeof(":omelasticsearch:") - 1; /* eat indicator sequence (-1 because of '\0'!) */
+ CHKiRet(createInstance(&pData));
+
+ /* check if a non-standard template is to be applied */
+ if(*(p-1) == ';')
+ --p;
+ CHKiRet(cflineParseTemplateName(&p, *ppOMSR, 0, OMSR_NO_RQD_TPL_OPTS, (uchar*) " StdJSONFmt"));
+
+ /* all good, we can now initialise our private data */
+ CHKiRet(curlSetup(pData));
+CODE_STD_FINALIZERparseSelectorAct
+ENDparseSelectorAct
+
+BEGINmodExit
+CODESTARTmodExit
+ curl_global_cleanup();
+ statsobj.Destruct(&indexStats);
+ objRelease(errmsg, CORE_COMPONENT);
+ objRelease(statsobj, CORE_COMPONENT);
+ENDmodExit
+
+BEGINqueryEtryPt
+CODESTARTqueryEtryPt
+CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES
+ENDqueryEtryPt
+
+static rsRetVal
+resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal)
+{
+ DEFiRet;
+ restPort = 9200;
+ hostName = "localhost";
+ searchIndex = "system";
+ searchType = "events";
+ RETiRet;
+}
+
+BEGINmodInit()
+CODESTARTmodInit
+ *ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */
+CODEmodInit_QueryRegCFSLineHdlr
+ CHKiRet(objUse(errmsg, CORE_COMPONENT));
+ CHKiRet(objUse(statsobj, CORE_COMPONENT));
+
+ /* register config file handlers */
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"elasticsearchindex", 0, eCmdHdlrGetWord, NULL, &searchIndex, STD_LOADABLE_MODULE_ID));
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"elasticsearchtype", 0, eCmdHdlrGetWord, NULL, &searchType, STD_LOADABLE_MODULE_ID));
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"elasticsearchhost", 0, eCmdHdlrGetWord, NULL, &hostName, STD_LOADABLE_MODULE_ID));
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"elasticsearchport", 0, eCmdHdlrInt, NULL, &restPort, STD_LOADABLE_MODULE_ID));
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID));
+
+ if (curl_global_init(CURL_GLOBAL_ALL) != 0) {
+ errmsg.LogError(0, RS_RET_OBJ_CREATION_FAILED, "CURL fail. -elasticsearch indexing disabled");
+ ABORT_FINALIZE(RS_RET_OBJ_CREATION_FAILED);
+ }
+
+ /* support statistics gathering */
+ CHKiRet(statsobj.Construct(&indexStats));
+ CHKiRet(statsobj.SetName(indexStats, (uchar *)"elasticsearch"));
+ CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"connfail",
+ ctrType_IntCtr, &indexConFail));
+ CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"submits",
+ ctrType_IntCtr, &indexSubmit));
+ CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"failed",
+ ctrType_IntCtr, &indexFailed));
+ CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"success",
+ ctrType_IntCtr, &indexSuccess));
+ CHKiRet(statsobj.ConstructFinalize(indexStats));
+ENDmodInit
+
+/* vi:set ai:
+ */
diff --git a/plugins/omhdfs/javaenv.sh b/plugins/omhdfs/javaenv.sh
new file mode 100644
index 00000000..d07a8473
--- /dev/null
+++ b/plugins/omhdfs/javaenv.sh
@@ -0,0 +1,14 @@
+# This is a sample file for environment settings on Fedora 13
+# that made me compile & run omhdfs. I really *hate* the way
+# java uses environment variables... Hopefully this file will
+# help building and testing omhdfs in the future (there is also
+# some more information in the rsyslog wiki).
+# rgerhards, 2011-03-11
+# this now works, but don't ask my why ;)
+#export JAVA_HOME=/usr/java/jdk1.6.0_21/bin/java
+export PATH=/usr/java/jdk1.6.0_21/bin:$PATH
+export JAVA_INCLUDES="-I/usr/java/jdk1.6.0_21/include -I/usr/java/jdk1.6.0_21/include/linux"
+export JAVA_LIBS="-L/usr/java/jdk1.6.0_21/jre/lib/i386 -ljava -ljvm -lverify"
+export HADOOP_HOME=/usr/lib/hadoop
+export CLASSPATH=/usr/lib/jvm/java-6-sun/lib:/usr/lib/hadoop/lib:/usr/lib/hadoop/hadoop-ant-0.20.2+320.jar:/usr/lib/hadoop/hadoop-core-0.20.2+320.jar:/usr/lib/hadoop/hadoop-examples-0.20.2+320.jar:/usr/lib/hadoop/hadoop-test-0.20.2+320.jar:/usr/lib/hadoop/hadoop-tools-0.20.2+320.jar/usr/lib/hadoop/lib/commons-cli-1.2.jar:/usr/lib/hadoop/lib/commons-codec-1.3.jar:/usr/lib/hadoop/lib/commons-el-1.0.jar:/usr/lib/hadoop/lib/commons-httpclient-3.0.1.jar:/usr/lib/hadoop/lib/commons-logging-1.0.4.jar:/usr/lib/hadoop/lib/commons-logging-api-1.0.4.jar:/usr/lib/hadoop/lib/commons-net-1.4.1.jar:/usr/lib/hadoop/lib/core-3.1.1.jar:/usr/lib/hadoop/lib/hadoop-fairscheduler-0.20.2+320.jar:/usr/lib/hadoop/lib/hadoop-scribe-log4j-0.20.2+320.jar:/usr/lib/hadoop/lib/hsqldb-1.8.0.10.jar:/usr/lib/hadoop/lib/hsqldb.jar:/usr/lib/hadoop/lib/jackson-core-asl-1.0.1.jar:/usr/lib/hadoop/lib/jackson-mapper-asl-1.0.1.jar:/usr/lib/hadoop/lib/jasper-compiler-5.5.12.jar:/usr/lib/hadoop/lib/jasper-runtime-5.5.12.jar:/usr/lib/hadoop/lib/jets3t-0.6.1.jar:/usr/lib/hadoop/lib/jetty-6.1.14.jar:/usr/lib/hadoop/lib/jetty-util-6.1.14.jar:/usr/lib/hadoop/lib/junit-4.5.jar:/usr/lib/hadoop/lib/kfs-0.2.2.jar:/usr/lib/hadoop/lib/libfb303.jar:/usr/lib/hadoop/lib/libthrift.jar:/usr/lib/hadoop/lib/log4j-1.2.15.jar:/usr/lib/hadoop/lib/mockito-all-1.8.2.jar:/usr/lib/hadoop/lib/mysql-connector-java-5.0.8-bin.jar:/usr/lib/hadoop/lib/oro-2.0.8.jar:/usr/lib/hadoop/lib/servlet-api-2.5-6.1.14.jar:/usr/lib/hadoop/lib/slf4j-api-1.4.3.jar:/usr/lib/hadoop/lib/slf4j-log4j12-1.4.3.jar:/usr/lib/hadoop/lib/xmlenc-0.52.jar:/etc/hadoop/conf
+###export CLASSPATH="/usr/lib/hadoop/hadoop-0.20.2+320-ant.jar: /usr/lib/hadoop/hadoop-0.20.2+320-core.jar: /usr/lib/hadoop/hadoop-0.20.2+320-examples.jar: /usr/lib/hadoop/hadoop-0.20.2+320-test.jar: /usr/lib/hadoop/hadoop-0.20.2+320-tools.jar: /usr/lib/hadoop/hadoop-ant-0.20.2+320.jar: /usr/lib/hadoop/hadoop-core-0.20.2+320.jar: /usr/lib/hadoop/hadoop-examples-0.20.2+320.jar: /usr/lib/hadoop/hadoop-test-0.20.2+320.jar: /usr/lib/hadoop/hadoop-tools-0.20.2+320.jar:/usr/lib/hadoop/lib: /usr/lib/hadoop/lib/commons-cli-1.2.jar: /usr/lib/hadoop/lib/commons-codec-1.3.jar: /usr/lib/hadoop/lib/commons-el-1.0.jar: /usr/lib/hadoop/lib/commons-httpclient-3.0.1.jar: /usr/lib/hadoop/lib/commons-logging-1.0.4.jar: /usr/lib/hadoop/lib/commons-logging-api-1.0.4.jar: /usr/lib/hadoop/lib/commons-net-1.4.1.jar: /usr/lib/hadoop/lib/core-3.1.1.jar: /usr/lib/hadoop/lib/hadoop-fairscheduler-0.20.2+320.jar: /usr/lib/hadoop/lib/hadoop-scribe-log4j-0.20.2+320.jar: /usr/lib/hadoop/lib/hsqldb-1.8.0.10.jar: /usr/lib/hadoop/lib/hsqldb.jar: /usr/lib/hadoop/lib/jackson-core-asl-1.0.1.jar: /usr/lib/hadoop/lib/jackson-mapper-asl-1.0.1.jar: /usr/lib/hadoop/lib/jasper-compiler-5.5.12.jar: /usr/lib/hadoop/lib/jasper-runtime-5.5.12.jar: /usr/lib/hadoop/lib/jets3t-0.6.1.jar: /usr/lib/hadoop/lib/jetty-6.1.14.jar: /usr/lib/hadoop/lib/jetty-util-6.1.14.jar: /usr/lib/hadoop/lib/junit-4.5.jar: /usr/lib/hadoop/lib/kfs-0.2.2.jar: /usr/lib/hadoop/lib/libfb303.jar: /usr/lib/hadoop/lib/libthrift.jar: /usr/lib/hadoop/lib/log4j-1.2.15.jar: /usr/lib/hadoop/lib/mockito-all-1.8.2.jar: /usr/lib/hadoop/lib/mysql-connector-java-5.0.8-bin.jar: /usr/lib/hadoop/lib/oro-2.0.8.jar: /usr/lib/hadoop/lib/servlet-api-2.5-6.1.14.jar: /usr/lib/hadoop/lib/slf4j-api-1.4.3.jar: /usr/lib/hadoop/lib/slf4j-log4j12-1.4.3.jar: /usr/lib/hadoop/lib/xmlenc-0.52.jar:/etc/hadoop/conf:/usr/lib/hadoop/lib"
diff --git a/plugins/omhdfs/omhdfs.c b/plugins/omhdfs/omhdfs.c
index 8b72747f..76128a4e 100644
--- a/plugins/omhdfs/omhdfs.c
+++ b/plugins/omhdfs/omhdfs.c
@@ -80,6 +80,8 @@ typedef struct {
typedef struct _instanceData {
file_t *pFile;
+ uchar ioBuf[64*1024];
+ unsigned offsBuf;
} instanceData;
/* forward definitions (down here, need data types) */
@@ -260,7 +262,8 @@ fileOpen(file_t *pFile)
if(errno == ENOENT) {
DBGPRINTF("omhdfs: ENOENT trying to append to '%s', now trying create\n",
pFile->name);
- pFile->fh = hdfsOpenFile(pFile->fs, (char*)pFile->name, O_WRONLY|O_CREAT, 0, 0, 0);
+ pFile->fh = hdfsOpenFile(pFile->fs,
+ (char*)pFile->name, O_WRONLY|O_CREAT, 0, 0, 0);
}
}
if(pFile->fh == NULL) {
@@ -275,12 +278,15 @@ finalize_it:
}
+/* Note: lenWrite is reset to zero on successful write! */
static inline rsRetVal
-fileWrite(file_t *pFile, uchar *buf)
+fileWrite(file_t *pFile, uchar *buf, size_t *lenWrite)
{
- size_t lenWrite;
DEFiRet;
+ if(*lenWrite == 0)
+ FINALIZE;
+
if(pFile->nUsers > 1)
d_pthread_mutex_lock(&pFile->mut);
@@ -294,18 +300,18 @@ fileWrite(file_t *pFile, uchar *buf)
}
}
- lenWrite = strlen((char*) buf);
- tSize num_written_bytes = hdfsWrite(pFile->fs, pFile->fh, buf, lenWrite);
- if((unsigned) num_written_bytes != lenWrite) {
- errmsg.LogError(errno, RS_RET_ERR_HDFS_WRITE, "omhdfs: failed to write %s, expected %lu bytes, "
- "written %lu\n", pFile->name, (unsigned long) lenWrite,
+dbgprintf("XXXXX: omhdfs writing %u bytes\n", *lenWrite);
+ tSize num_written_bytes = hdfsWrite(pFile->fs, pFile->fh, buf, *lenWrite);
+ if((unsigned) num_written_bytes != *lenWrite) {
+ errmsg.LogError(errno, RS_RET_ERR_HDFS_WRITE,
+ "omhdfs: failed to write %s, expected %lu bytes, "
+ "written %lu\n", pFile->name, (unsigned long) *lenWrite,
(unsigned long) num_written_bytes);
ABORT_FINALIZE(RS_RET_SUSPENDED);
}
+ *lenWrite = 0;
finalize_it:
- if(pFile->nUsers > 1)
- d_pthread_mutex_unlock(&pFile->mut);
RETiRet;
}
@@ -333,6 +339,40 @@ finalize_it:
/* ---END FILE OBJECT---------------------------------------------------- */
+/* This adds data to the output buffer and performs an actual write
+ * if the new data does not fit into the buffer. Note that we never write
+ * partial data records. Other actions may write into the same file, and if
+ * we would write partial records, data could become severely mixed up.
+ * Note that we must check of some new data arrived is large than our
+ * buffer. In that case, the new data will written with its own
+ * write operation.
+ */
+static inline rsRetVal
+addData(instanceData *pData, uchar *buf)
+{
+ unsigned len;
+ DEFiRet;
+
+ len = strlen((char*)buf);
+ if(pData->offsBuf + len < sizeof(pData->ioBuf)) {
+ /* new data fits into remaining buffer */
+ memcpy((char*) pData->ioBuf + pData->offsBuf, buf, len);
+ pData->offsBuf += len;
+ } else {
+dbgprintf("XXXXX: not enough room, need to flush\n");
+ CHKiRet(fileWrite(pData->pFile, pData->ioBuf, &pData->offsBuf));
+ if(len >= sizeof(pData->ioBuf)) {
+ CHKiRet(fileWrite(pData->pFile, buf, &len));
+ } else {
+ memcpy((char*) pData->ioBuf + pData->offsBuf, buf, len);
+ pData->offsBuf += len;
+ }
+ }
+
+ iRet = RS_RET_DEFER_COMMIT;
+finalize_it:
+ RETiRet;
+}
BEGINcreateInstance
CODESTARTcreateInstance
@@ -358,13 +398,31 @@ CODESTARTtryResume
}
ENDtryResume
+
+BEGINbeginTransaction
+CODESTARTbeginTransaction
+dbgprintf("omhdfs: beginTransaction\n");
+ENDbeginTransaction
+
+
BEGINdoAction
CODESTARTdoAction
- DBGPRINTF("omuxsock: action to to write to %s\n", pData->pFile->name);
- iRet = fileWrite(pData->pFile, ppString[0]);
+ DBGPRINTF("omhdfs: action to to write to %s\n", pData->pFile->name);
+ iRet = addData(pData, ppString[0]);
+dbgprintf("omhdfs: done doAction\n");
ENDdoAction
+BEGINendTransaction
+CODESTARTendTransaction
+dbgprintf("omhdfs: endTransaction\n");
+ if(pData->offsBuf != 0) {
+ DBGPRINTF("omhdfs: data unwritten at end of transaction, persisting...\n");
+ iRet = fileWrite(pData->pFile, pData->ioBuf, &pData->offsBuf);
+ }
+ENDendTransaction
+
+
BEGINparseSelectorAct
file_t *pFile;
int r;
@@ -409,6 +467,7 @@ CODESTARTparseSelectorAct
}
fileObjAddUser(pFile);
pData->pFile = pFile;
+ pData->offsBuf = 0;
CODE_STD_FINALIZERparseSelectorAct
ENDparseSelectorAct
@@ -455,6 +514,7 @@ ENDmodExit
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_TXIF_OMOD_QUERIES /* we support the transactional interface! */
CODEqueryEtryPt_doHUP
ENDqueryEtryPt
@@ -472,5 +532,6 @@ CODEmodInit_QueryRegCFSLineHdlr
CHKiRet(regCfSysLineHdlr((uchar *)"omhdfsport", 0, eCmdHdlrInt, NULL, &hdfsPort, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"omhdfsdefaulttemplate", 0, eCmdHdlrGetWord, NULL, &dfltTplName, NULL));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID));
+ DBGPRINTF("omhdfs: module compiled with rsyslog version %s.\n", VERSION);
CODEmodInit_QueryRegCFSLineHdlr
ENDmodInit