diff options
Diffstat (limited to 'plugins')
-rw-r--r-- | plugins/imfile/imfile.c | 23 | ||||
-rw-r--r-- | plugins/imklog/bsd.c | 6 | ||||
-rw-r--r-- | plugins/imklog/imklog.c | 52 | ||||
-rw-r--r-- | plugins/imklog/imklog.h | 4 | ||||
-rw-r--r-- | plugins/imklog/linux.c | 113 | ||||
-rw-r--r-- | plugins/imklog/solaris.c | 68 | ||||
-rw-r--r-- | plugins/imptcp/imptcp.c | 105 | ||||
-rw-r--r-- | plugins/imtcp/imtcp.c | 10 | ||||
-rw-r--r-- | plugins/imuxsock/imuxsock.c | 246 | ||||
-rw-r--r-- | plugins/mmsnmptrapd/mmsnmptrapd.c | 2 | ||||
-rw-r--r-- | plugins/omelasticsearch/Makefile.am | 8 | ||||
-rw-r--r-- | plugins/omelasticsearch/omelasticsearch.c | 270 | ||||
-rw-r--r-- | plugins/omhdfs/javaenv.sh | 14 | ||||
-rw-r--r-- | plugins/omhdfs/omhdfs.c | 85 |
14 files changed, 861 insertions, 145 deletions
diff --git a/plugins/imfile/imfile.c b/plugins/imfile/imfile.c index ba8318df..632bf390 100644 --- a/plugins/imfile/imfile.c +++ b/plugins/imfile/imfile.c @@ -63,6 +63,7 @@ DEFobjCurrIf(strm) DEFobjCurrIf(prop) DEFobjCurrIf(ruleset) +#define NUM_MULTISUB 1024 /* max number of submits -- TODO: make configurable */ typedef struct fileInfo_s { uchar *pszFileName; uchar *pszTag; @@ -70,11 +71,13 @@ typedef struct fileInfo_s { uchar *pszStateFile; /* file in which state between runs is to be stored */ int iFacility; int iSeverity; + int maxLinesAtOnce; int nRecords; /**< How many records did we process before persisting the stream? */ int iPersistStateInterval; /**< how often should state be persisted? (0=on close only) */ strm_t *pStrm; /* its stream (NULL if not assigned) */ int readMode; /* which mode to use in ReadMulteLine call? */ ruleset_t *pRuleset; /* ruleset to bind listener to (use system default if unspecified) */ + multi_submit_t multiSub; } fileInfo_t; @@ -90,6 +93,7 @@ static int iPersistStateInterval = 0; /* how often if state file to be persisted static int iFacility = 128; /* local0 */ static int iSeverity = 5; /* notice, as of rfc 3164 */ static int readMode = 0; /* mode to use for ReadMultiLine call */ +static int maxLinesAtOnce = 10240; /* how many lines to process in a row? */ static ruleset_t *pBindRuleset = NULL; /* ruleset to bind listener to (use system default if unspecified) */ static int iFilPtr = 0; /* number of files to be monitored; pointer to next free spot during config */ @@ -121,7 +125,9 @@ static rsRetVal enqLine(fileInfo_t *pInfo, cstr_t *cstrLine) pMsg->iFacility = LOG_FAC(pInfo->iFacility); pMsg->iSeverity = LOG_PRI(pInfo->iSeverity); MsgSetRuleset(pMsg, pInfo->pRuleset); - CHKiRet(submitMsg(pMsg)); + pInfo->multiSub.ppMsgs[pInfo->multiSub.nElem++] = pMsg; + if(pInfo->multiSub.nElem == pInfo->multiSub.maxElem) + CHKiRet(multiSubmitMsg(&pInfo->multiSub)); finalize_it: RETiRet; } @@ -205,6 +211,7 @@ static void pollFileCancelCleanup(void *pArg) static rsRetVal pollFile(fileInfo_t *pThis, int *pbHadFileData) { cstr_t *pCStr = NULL; + int nProcessed = 0; DEFiRet; ASSERT(pbHadFileData != NULL); @@ -219,7 +226,10 @@ static rsRetVal pollFile(fileInfo_t *pThis, int *pbHadFileData) /* loop below will be exited when strmReadLine() returns EOF */ while(glbl.GetGlobalInputTermState() == 0) { + if(pThis->maxLinesAtOnce != 0 && nProcessed >= pThis->maxLinesAtOnce) + break; CHKiRet(strm.ReadLine(pThis->pStrm, &pCStr, pThis->readMode)); + ++nProcessed; *pbHadFileData = 1; /* this is just a flag, so set it and forget it */ CHKiRet(enqLine(pThis, pCStr)); /* process line */ rsCStrDestruct(&pCStr); /* discard string (must be done by us!) */ @@ -230,6 +240,10 @@ static rsRetVal pollFile(fileInfo_t *pThis, int *pbHadFileData) } finalize_it: + if(pThis->multiSub.nElem > 0) { + /* submit everything that was not yet submitted */ + CHKiRet(multiSubmitMsg(&pThis->multiSub)); + } ; /*EMPTY STATEMENT - needed to keep compiler happy - see below! */ /* Note: the problem above is that pthread:cleanup_pop() is a macro which * evaluates to something like "} while(0);". So the code would become @@ -474,6 +488,7 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a iSeverity = 5; /* notice, as of rfc 3164 */ readMode = 0; pBindRuleset = NULL; + maxLinesAtOnce = 10240; RETiRet; } @@ -512,8 +527,12 @@ static rsRetVal addMonitor(void __attribute__((unused)) *pVal, uchar *pNewVal) pThis->pszStateFile = (uchar*) strdup((char*) pszStateFile); } + CHKmalloc(pThis->multiSub.ppMsgs = MALLOC(NUM_MULTISUB * sizeof(msg_t*))); + pThis->multiSub.maxElem = NUM_MULTISUB; + pThis->multiSub.nElem = 0; pThis->iSeverity = iSeverity; pThis->iFacility = iFacility; + pThis->maxLinesAtOnce = maxLinesAtOnce; pThis->iPersistStateInterval = iPersistStateInterval; pThis->nRecords = 0; pThis->readMode = readMode; @@ -591,6 +610,8 @@ CODEmodInit_QueryRegCFSLineHdlr NULL, &iPollInterval, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputfilereadmode", 0, eCmdHdlrInt, NULL, &readMode, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputfilemaxlinesatonce", 0, eCmdHdlrSize, + NULL, &maxLinesAtOnce, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputfilepersiststateinterval", 0, eCmdHdlrInt, NULL, &iPersistStateInterval, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputfilebindruleset", 0, eCmdHdlrGetWord, diff --git a/plugins/imklog/bsd.c b/plugins/imklog/bsd.c index 0a4c7cd4..930bbd11 100644 --- a/plugins/imklog/bsd.c +++ b/plugins/imklog/bsd.c @@ -155,18 +155,18 @@ readklog(void) for (p = (char*)pRcv; (q = strchr(p, '\n')) != NULL; p = q + 1) { *q = '\0'; - Syslog(LOG_INFO, (uchar*) p); + Syslog(LOG_INFO, (uchar*) p, NULL); } len = strlen(p); if (len >= iMaxLine - 1) { - Syslog(LOG_INFO, (uchar*)p); + Syslog(LOG_INFO, (uchar*)p, NULL); len = 0; } if (len > 0) memmove(pRcv, p, len + 1); } if (len > 0) - Syslog(LOG_INFO, pRcv); + Syslog(LOG_INFO, pRcv, NULL); if(pRcv != NULL && (size_t) iMaxLine >= sizeof(bufRcv) - 1) free(pRcv); diff --git a/plugins/imklog/imklog.c b/plugins/imklog/imklog.c index 69c8cd1a..cb28e68e 100644 --- a/plugins/imklog/imklog.c +++ b/plugins/imklog/imklog.c @@ -18,7 +18,10 @@ * Please note that this file replaces the klogd daemon that was * also present in pre-v3 versions of rsyslog. * - * Copyright (C) 2008, 2009 by Rainer Gerhards and Adiscon GmbH + * To test under Linux: + * echo test1 > /dev/kmsg + * + * Copyright (C) 2008-2011 by Rainer Gerhards and Adiscon GmbH * * This file is part of rsyslog. * @@ -93,15 +96,21 @@ static prop_t *pLocalHostIP = NULL; /* a pseudo-constant propterty for 127.0.0.1 * rgerhards, 2008-04-12 */ static rsRetVal -enqMsg(uchar *msg, uchar* pszTag, int iFacility, int iSeverity) +enqMsg(uchar *msg, uchar* pszTag, int iFacility, int iSeverity, struct timeval *tp) { - DEFiRet; + struct syslogTime st; msg_t *pMsg; + DEFiRet; assert(msg != NULL); assert(pszTag != NULL); - CHKiRet(msgConstruct(&pMsg)); + if(tp == NULL) { + CHKiRet(msgConstruct(&pMsg)); + } else { + datetime.timeval2syslogTime(tp, &st); + CHKiRet(msgConstructWithTime(&pMsg, &st, tp->tv_sec)); + } MsgSetFlowControlType(pMsg, eFLOWCTL_LIGHT_DELAY); MsgSetInputName(pMsg, pInputName); MsgSetRawMsgWOSize(pMsg, (char*)msg); @@ -174,31 +183,48 @@ rsRetVal imklogLogIntMsg(int priority, char *fmt, ...) va_end(ap); iRet = enqMsg((uchar*)pLogMsg, (uchar*) ((iFacilIntMsg == LOG_KERN) ? "kernel:" : "imklog:"), - iFacilIntMsg, LOG_PRI(priority)); + iFacilIntMsg, LOG_PRI(priority), NULL); RETiRet; } -/* log a kernel message +/* log a kernel message. If tp is non-NULL, it contains the message creation + * time to use. * rgerhards, 2008-04-14 */ -rsRetVal Syslog(int priority, uchar *pMsg) +rsRetVal Syslog(int priority, uchar *pMsg, struct timeval *tp) { - DEFiRet; + int pri = -1; rsRetVal localRet; + DEFiRet; - /* Output using syslog */ - localRet = parsePRI(&pMsg, &priority); - if(localRet != RS_RET_INVALID_PRI && localRet != RS_RET_OK) - FINALIZE; + /* then check if we have two PRIs. This can happen in case of systemd, + * in which case the second PRI is the rigth one. + * TODO: added kernel timestamp support to this PoC. -- rgerhards, 2011-03-18 + */ + if(pMsg[3] == '<') { /* could be a pri... */ + uchar *pMsgTmp = pMsg + 3; + localRet = parsePRI(&pMsgTmp, &pri); + if(localRet == RS_RET_OK && pri >= 8 && pri <= 192) { + /* *this* is our PRI */ + DBGPRINTF("imklog detected secondary PRI in klog msg\n"); + pMsg = pMsgTmp; + priority = pri; + } + } + if(pri == -1) { + localRet = parsePRI(&pMsg, &priority); + if(localRet != RS_RET_INVALID_PRI && localRet != RS_RET_OK) + FINALIZE; + } /* if we don't get the pri, we use whatever we were supplied */ /* ignore non-kernel messages if not permitted */ if(bPermitNonKernel == 0 && LOG_FAC(priority) != LOG_KERN) FINALIZE; /* silently ignore */ - iRet = enqMsg((uchar*)pMsg, (uchar*) "kernel:", LOG_FAC(priority), LOG_PRI(priority)); + iRet = enqMsg((uchar*)pMsg, (uchar*) "kernel:", LOG_FAC(priority), LOG_PRI(priority), tp); finalize_it: RETiRet; diff --git a/plugins/imklog/imklog.h b/plugins/imklog/imklog.h index c183026d..39bdeb5c 100644 --- a/plugins/imklog/imklog.h +++ b/plugins/imklog/imklog.h @@ -5,7 +5,7 @@ * Major change: 2008-04-09: switched to a driver interface for * several platforms * - * Copyright 2007-2008 Rainer Gerhards and Adiscon GmbH. + * Copyright 2007-2011 Rainer Gerhards and Adiscon GmbH. * * This file is part of rsyslog. * @@ -56,7 +56,7 @@ extern uchar *pszPath; /* the functions below may be called by the drivers */ rsRetVal imklogLogIntMsg(int priority, char *fmt, ...) __attribute__((format(printf,2, 3))); -rsRetVal Syslog(int priority, uchar *msg); +rsRetVal Syslog(int priority, uchar *msg, struct timeval *tp); /* prototypes */ extern int klog_getMaxLine(void); /* work-around for klog drivers to get configured max line size */ diff --git a/plugins/imklog/linux.c b/plugins/imklog/linux.c index 727708a5..b44619f5 100644 --- a/plugins/imklog/linux.c +++ b/plugins/imklog/linux.c @@ -28,6 +28,8 @@ #include "rsyslog.h" #include <stdlib.h> #include <stdio.h> +#include <ctype.h> +#include <time.h> #include <assert.h> #include <signal.h> #include <string.h> @@ -181,6 +183,93 @@ static int copyin( uchar *line, int space, return(i); } + +/* submit a message to imklog Syslog() API. In this function, we check if + * a kernel timestamp is present and, if so, extract and strip it. + * Note: this is an extra processing step. We should revisit the whole + * idea in v6 and remove all that old stuff that we do not longer need + * (like symbol resolution). <-- TODO + * Special thanks to Lennart Poettering for suggesting on how to convert + * the kernel timestamp to a realtime timestamp. This method depends on + * the fact the the kernel timestamp is written using the monotonic clock. + * Shall that change (very unlikely), this code must be changed as well. Note + * that due to the way we generate the delta, we are unable to write the + * absolutely correc timestamp (system call overhead of the clock calls + * prevents us from doing so). However, the difference is very minor. + * rgerhards, 201106-24 + */ +static void +submitSyslog(int pri, uchar *buf) +{ + long secs; + long nsecs; + long secOffs; + long nsecOffs; + unsigned i; + unsigned bufsize; + struct timespec monotonic, realtime; + struct timeval tv; + struct timeval *tp = NULL; + + if(buf[3] != '[') + goto done; + DBGPRINTF("imklog: kernel timestamp detected, extracting it\n"); + + /* we now try to parse the timestamp. iff it parses, we assume + * it is a timestamp. Otherwise we know for sure it is no ts ;) + */ + i = 4; /* first digit after '[' */ + secs = 0; + while(buf[i] && isdigit(buf[i])) { + secs = secs * 10 + buf[i] - '0'; + ++i; + } + if(buf[i] != '.') { + DBGPRINTF("no dot --> no kernel timestamp\n"); + goto done; /* no TS! */ + } + + ++i; /* skip dot */ + nsecs = 0; + while(buf[i] && isdigit(buf[i])) { + nsecs = nsecs * 10 + buf[i] - '0'; + ++i; + } + if(buf[i] != ']') { + DBGPRINTF("no trailing ']' --> no kernel timestamp\n"); + goto done; /* no TS! */ + } + ++i; /* skip ']' */ + + /* we have a timestamp */ + DBGPRINTF("kernel timestamp is %ld %ld\n", secs, nsecs); + bufsize= strlen((char*)buf); + memcpy(buf+3, buf+i, bufsize - i + 1); + + clock_gettime(CLOCK_MONOTONIC, &monotonic); + clock_gettime(CLOCK_REALTIME, &realtime); + secOffs = realtime.tv_sec - monotonic.tv_sec; + nsecOffs = realtime.tv_nsec - monotonic.tv_nsec; + if(nsecOffs < 0) { + secOffs--; + nsecOffs += 1000000000l; + } + + nsecs +=nsecOffs; + if(nsecs > 999999999l) { + secs++; + nsecs -= 1000000000l; + } + secs += secOffs; + tv.tv_sec = secs; + tv.tv_usec = nsecs / 1000; + tp = &tv; + +done: + Syslog(pri, buf, tp); +} + + /* * Messages are separated by "\n". Messages longer than * LOG_LINE_LENGTH are broken up. @@ -235,7 +324,7 @@ static void LogLine(char *ptr, int len) //dbgprintf("Line buffer full:\n"); //dbgprintf("\tLine: %s\n", line); - Syslog(LOG_INFO, line_buff); + submitSyslog(LOG_INFO, line_buff); line = line_buff; space = sizeof(line_buff)-1; parse_state = PARSING_TEXT; @@ -254,28 +343,24 @@ static void LogLine(char *ptr, int len) space -= delta; len -= delta; - if( space == 0 || len == 0 ) - { + if( space == 0 || len == 0 ) { break; /* full line_buff or end of input buffer */ } - if( *ptr == '\0' ) /* zero byte */ - { + if( *ptr == '\0' ) /* zero byte */ { ptr++; /* skip zero byte */ space -= 1; len -= 1; - break; } - if( *ptr == '\n' ) /* newline */ - { + if( *ptr == '\n' ) /* newline */ { ptr++; /* skip newline */ space -= 1; len -= 1; *line = 0; /* force null terminator */ - Syslog(LOG_INFO, line_buff); + submitSyslog(LOG_INFO, line_buff); line = line_buff; space = sizeof(line_buff)-1; if (symbols_twice) { @@ -285,9 +370,7 @@ static void LogLine(char *ptr, int len) skip_symbol_lookup = 1; ptr = save_ptr; len = save_len; - } - else - { + } else { skip_symbol_lookup = 0; save_ptr = ptr; save_len = len; @@ -295,8 +378,7 @@ static void LogLine(char *ptr, int len) } break; } - if( *ptr == '[' ) /* possible kernel symbol */ - { + if( *ptr == '[' ) /* possible kernel symbol */ { *line++ = *ptr++; space -= 1; len -= 1; @@ -310,8 +392,7 @@ static void LogLine(char *ptr, int len) break; case PARSING_SYMSTART: - if( *ptr != '<' ) - { + if( *ptr != '<' ) { parse_state = PARSING_TEXT; /* not a symbol */ break; } diff --git a/plugins/imklog/solaris.c b/plugins/imklog/solaris.c index 8a6d5af1..0a169cdd 100644 --- a/plugins/imklog/solaris.c +++ b/plugins/imklog/solaris.c @@ -80,74 +80,6 @@ klogWillRun(void) } -#if 0 -/* Read /dev/klog while data are available, split into lines. - * Contrary to standard BSD syslogd, we do a blocking read. We can - * afford this as imklog is running on its own threads. So if we have - * a single file, it really doesn't matter if we wait inside a 1-file - * select or the read() directly. - */ -static void -readklog(void) -{ - char *p, *q; - int len, i; - int iMaxLine; - uchar bufRcv[4096+1]; - uchar *pRcv = NULL; /* receive buffer */ - - iMaxLine = klog_getMaxLine(); - - /* we optimize performance: if iMaxLine is below 4K (which it is in almost all - * cases, we use a fixed buffer on the stack. Only if it is higher, heap memory - * is used. We could use alloca() to achive a similar aspect, but there are so - * many issues with alloca() that I do not want to take that route. - * rgerhards, 2008-09-02 - */ - if((size_t) iMaxLine < sizeof(bufRcv) - 1) { - pRcv = bufRcv; - } else { - if((pRcv = (uchar*) malloc(sizeof(uchar) * (iMaxLine + 1))) == NULL) - iMaxLine = sizeof(bufRcv) - 1; /* better this than noting */ - } - - len = 0; - for (;;) { - dbgprintf("----------imklog(BSD) waiting for kernel log line\n"); - i = read(fklog, pRcv + len, iMaxLine - len); - if (i > 0) { - pRcv[i + len] = '\0'; - } else { - if (i < 0 && errno != EINTR && errno != EAGAIN) { - imklogLogIntMsg(LOG_ERR, - "imklog error %d reading kernel log - shutting down imklog", - errno); - fklog = -1; - } - break; - } - - for(p = pRcv; (q = strchr(p, '\n')) != NULL; p = q + 1) { - *q = '\0'; - Syslog(LOG_INFO, (uchar*) p); - } - len = strlen(p); - if (len >= iMaxLine - 1) { - Syslog(LOG_INFO, (uchar*)p); - len = 0; - } - if (len > 0) - memmove(pRcv, p, len + 1); - } - if (len > 0) - Syslog(LOG_INFO, pRcv); - - if(pRcv != NULL && (size_t) iMaxLine >= sizeof(bufRcv) - 1) - free(pRcv); -} -#endif - - /* to be called in the module's AfterRun entry point * rgerhards, 2008-04-09 */ diff --git a/plugins/imptcp/imptcp.c b/plugins/imptcp/imptcp.c index 103da210..a735b0b2 100644 --- a/plugins/imptcp/imptcp.c +++ b/plugins/imptcp/imptcp.c @@ -49,6 +49,7 @@ #include <sys/types.h> #include <sys/socket.h> #include <sys/epoll.h> +#include <netinet/tcp.h> #if HAVE_FCNTL_H #include <fcntl.h> #endif @@ -88,6 +89,10 @@ DEFobjCurrIf(statsobj) /* config settings */ typedef struct configSettings_s { + int bKeepAlive; /* support keep-alive packets */ + int iKeepAliveIntvl; + int iKeepAliveProbes; + int iKeepAliveTime; int bEmitMsgOnClose; /* emit an informational message on close by remote peer */ int iAddtlFrameDelim; /* addtl frame delimiter, e.g. for netscreen, default none */ uchar *pszInputName; /* value for inputname property, NULL is OK and handled by core engine */ @@ -111,13 +116,17 @@ struct ptcpsrv_s { ptcpsrv_t *pNext; /* linked list maintenance */ uchar *port; /* Port to listen to */ uchar *lstnIP; /* which IP we should listen on? */ - int bEmitMsgOnClose; int iAddtlFrameDelim; + int iKeepAliveIntvl; + int iKeepAliveProbes; + int iKeepAliveTime; uchar *pszInputName; prop_t *pInputName; /* InputName in (fast to process) property format */ ruleset_t *pRuleset; ptcplstn_t *pLstn; /* root of our listeners */ ptcpsess_t *pSess; /* root of our sessions */ + sbool bKeepAlive; /* support keep-alive packets */ + sbool bEmitMsgOnClose; }; /* the ptcp session object. Describes a single active session. @@ -434,12 +443,80 @@ finalize_it: } +/* Enable KEEPALIVE handling on the socket. */ +static inline rsRetVal +EnableKeepAlive(ptcplstn_t *pLstn, int sock) +{ + int ret; + int optval; + socklen_t optlen; + DEFiRet; + + optval = 1; + optlen = sizeof(optval); + ret = setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, &optval, optlen); + if(ret < 0) { + dbgprintf("EnableKeepAlive socket call returns error %d\n", ret); + ABORT_FINALIZE(RS_RET_ERR); + } + +# if defined(TCP_KEEPCNT) + if(pLstn->pSrv->iKeepAliveProbes > 0) { + optval = pLstn->pSrv->iKeepAliveProbes; + optlen = sizeof(optval); + ret = setsockopt(sock, SOL_TCP, TCP_KEEPCNT, &optval, optlen); + } else { + ret = 0; + } +# else + ret = -1; +# endif + if(ret < 0) { + errmsg.LogError(ret, NO_ERRCODE, "imptcp cannot set keepalive probes - ignored"); + } + +# if defined(TCP_KEEPCNT) + if(pLstn->pSrv->iKeepAliveTime > 0) { + optval = pLstn->pSrv->iKeepAliveTime; + optlen = sizeof(optval); + ret = setsockopt(sock, SOL_TCP, TCP_KEEPIDLE, &optval, optlen); + } else { + ret = 0; + } +# else + ret = -1; +# endif + if(ret < 0) { + errmsg.LogError(ret, NO_ERRCODE, "imptcp cannot set keepalive time - ignored"); + } + +# if defined(TCP_KEEPCNT) + if(pLstn->pSrv->iKeepAliveIntvl > 0) { + optval = pLstn->pSrv->iKeepAliveIntvl; + optlen = sizeof(optval); + ret = setsockopt(sock, SOL_TCP, TCP_KEEPINTVL, &optval, optlen); + } else { + ret = 0; + } +# else + ret = -1; +# endif + if(ret < 0) { + errmsg.LogError(errno, NO_ERRCODE, "imptcp cannot set keepalive intvl - ignored"); + } + + dbgprintf("KEEPALIVE enabled for socket %d\n", sock); + +finalize_it: + RETiRet; +} + /* accept an incoming connection request * rgerhards, 2008-04-22 */ static rsRetVal -AcceptConnReq(int sock, int *newSock, prop_t **peerName, prop_t **peerIP) +AcceptConnReq(ptcplstn_t *pLstn, int *newSock, prop_t **peerName, prop_t **peerIP) { int sockflags; struct sockaddr_storage addr; @@ -448,13 +525,17 @@ AcceptConnReq(int sock, int *newSock, prop_t **peerName, prop_t **peerIP) DEFiRet; - iNewSock = accept(sock, (struct sockaddr*) &addr, &addrlen); + iNewSock = accept(pLstn->sock, (struct sockaddr*) &addr, &addrlen); if(iNewSock < 0) { if(errno == EAGAIN || errno == EWOULDBLOCK) ABORT_FINALIZE(RS_RET_NO_MORE_DATA); ABORT_FINALIZE(RS_RET_ACCEPT_ERR); } + if(pLstn->pSrv->bKeepAlive) + EnableKeepAlive(pLstn, iNewSock);/* we ignore errors, best to do! */ + + CHKiRet(getPeerNames(peerName, peerIP, (struct sockaddr*) &addr)); /* set the new socket to non-blocking IO */ @@ -904,6 +985,10 @@ static rsRetVal addTCPListener(void __attribute__((unused)) *pVal, uchar *pNewVa CHKmalloc(pSrv = malloc(sizeof(ptcpsrv_t))); pSrv->pSess = NULL; pSrv->pLstn = NULL; + pSrv->bKeepAlive = cs.bKeepAlive; + pSrv->iKeepAliveIntvl = cs.iKeepAliveTime; + pSrv->iKeepAliveProbes = cs.iKeepAliveProbes; + pSrv->iKeepAliveTime = cs.iKeepAliveTime; pSrv->bEmitMsgOnClose = cs.bEmitMsgOnClose; pSrv->port = pNewVal; pSrv->iAddtlFrameDelim = cs.iAddtlFrameDelim; @@ -967,7 +1052,7 @@ lstnActivity(ptcplstn_t *pLstn) DBGPRINTF("imptcp: new connection on listen socket %d\n", pLstn->sock); while(1) { - localRet = AcceptConnReq(pLstn->sock, &newSock, &peerName, &peerIP); + localRet = AcceptConnReq(pLstn, &newSock, &peerName, &peerIP); if(localRet == RS_RET_NO_MORE_DATA) break; CHKiRet(localRet); @@ -1169,6 +1254,10 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal) { cs.bEmitMsgOnClose = 0; + cs.bKeepAlive = 0; + cs.iKeepAliveProbes = 0; + cs.iKeepAliveTime = 0; + cs.iKeepAliveIntvl = 0; cs.iAddtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER; free(cs.pszInputName); cs.pszInputName = NULL; @@ -1202,6 +1291,14 @@ CODEmodInit_QueryRegCFSLineHdlr /* register config file handlers */ CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverrun"), 0, eCmdHdlrGetWord, addTCPListener, NULL, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverkeepalive"), 0, eCmdHdlrBinary, + NULL, &cs.bKeepAlive, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverkeepalive_probes"), 0, eCmdHdlrInt, + NULL, &cs.iKeepAliveProbes, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverkeepalive_time"), 0, eCmdHdlrInt, + NULL, &cs.iKeepAliveTime, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverkeepalive_intvl"), 0, eCmdHdlrInt, + NULL, &cs.iKeepAliveIntvl, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpservernotifyonconnectionclose"), 0, eCmdHdlrBinary, NULL, &cs.bEmitMsgOnClose, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserveraddtlframedelimiter"), 0, eCmdHdlrInt, diff --git a/plugins/imtcp/imtcp.c b/plugins/imtcp/imtcp.c index 6ab39477..e9961af7 100644 --- a/plugins/imtcp/imtcp.c +++ b/plugins/imtcp/imtcp.c @@ -82,12 +82,14 @@ static permittedPeers_t *pPermPeersRoot = NULL; /* config settings */ +static int bKeepAlive = 0; /* support keep-alive packets */ static int iTCPSessMax = 200; /* max number of sessions */ static int iTCPLstnMax = 20; /* max number of sessions */ static int iStrmDrvrMode = 0; /* mode for stream driver, driver-dependent (0 mostly means plain tcp) */ static int bEmitMsgOnClose = 0; /* emit an informational message on close by remote peer */ static int iAddtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER; /* addtl frame delimiter, e.g. for netscreen, default none */ static int bDisableLFDelim = 0; /* disbale standard LF delimiter */ +static int bUseFlowControl = 1; /* use flow control, what means indicate ourselfs a "light delayable" */ static uchar *pszStrmDrvrAuthMode = NULL; /* authentication mode to use */ static uchar *pszInputName = NULL; /* value for inputname property, NULL is OK and handled by core engine */ static ruleset_t *pBindRuleset = NULL; /* ruleset to bind listener to (use system default if unspecified) */ @@ -191,6 +193,7 @@ static rsRetVal addTCPListener(void __attribute__((unused)) *pVal, uchar *pNewVa if(pOurTcpsrv == NULL) { CHKiRet(tcpsrv.Construct(&pOurTcpsrv)); + CHKiRet(tcpsrv.SetKeepAlive(pOurTcpsrv, bKeepAlive)); CHKiRet(tcpsrv.SetSessMax(pOurTcpsrv, iTCPSessMax)); CHKiRet(tcpsrv.SetLstnMax(pOurTcpsrv, iTCPLstnMax)); CHKiRet(tcpsrv.SetCBIsPermittedHost(pOurTcpsrv, isPermittedHost)); @@ -199,6 +202,7 @@ static rsRetVal addTCPListener(void __attribute__((unused)) *pVal, uchar *pNewVa CHKiRet(tcpsrv.SetCBOnRegularClose(pOurTcpsrv, onRegularClose)); CHKiRet(tcpsrv.SetCBOnErrClose(pOurTcpsrv, onErrClose)); CHKiRet(tcpsrv.SetDrvrMode(pOurTcpsrv, iStrmDrvrMode)); + CHKiRet(tcpsrv.SetUseFlowControl(pOurTcpsrv, bUseFlowControl)); CHKiRet(tcpsrv.SetAddtlFrameDelim(pOurTcpsrv, iAddtlFrameDelim)); CHKiRet(tcpsrv.SetbDisableLFDelim(pOurTcpsrv, bDisableLFDelim)); CHKiRet(tcpsrv.SetNotificationOnRemoteClose(pOurTcpsrv, bEmitMsgOnClose)); @@ -287,8 +291,10 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal) { iTCPSessMax = 200; + bKeepAlive = 0; iTCPLstnMax = 20; iStrmDrvrMode = 0; + bUseFlowControl = 0; bEmitMsgOnClose = 0; iAddtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER; bDisableLFDelim = 0; @@ -324,6 +330,8 @@ CODEmodInit_QueryRegCFSLineHdlr /* register config file handlers */ CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputtcpserverrun"), 0, eCmdHdlrGetWord, addTCPListener, NULL, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputtcpserverkeepalive"), 0, eCmdHdlrBinary, + NULL, &bKeepAlive, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputtcpmaxsessions"), 0, eCmdHdlrInt, NULL, &iTCPSessMax, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputtcpmaxlisteners"), 0, eCmdHdlrInt, @@ -344,6 +352,8 @@ CODEmodInit_QueryRegCFSLineHdlr eCmdHdlrGetWord, NULL, &pszInputName, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputtcpserverbindruleset"), 0, eCmdHdlrGetWord, setRuleset, NULL, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputtcpflowcontrol"), 0, + eCmdHdlrBinary, NULL, &bUseFlowControl, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("resetconfigvariables"), 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID)); ENDmodInit diff --git a/plugins/imuxsock/imuxsock.c b/plugins/imuxsock/imuxsock.c index feddb20c..403173e1 100644 --- a/plugins/imuxsock/imuxsock.c +++ b/plugins/imuxsock/imuxsock.c @@ -34,6 +34,7 @@ #include <string.h> #include <errno.h> #include <unistd.h> +#include <fcntl.h> #include <sys/stat.h> #include <sys/un.h> #include <sys/socket.h> @@ -135,7 +136,9 @@ typedef struct lstn_s { sbool bParseHost; /* should parser parse host name? read-only after startup */ sbool bCreatePath; /* auto-creation of socket directory? */ sbool bUseCreds; /* pull original creator credentials from socket */ + sbool bAnnotate; /* annotate events with trusted properties */ sbool bWritePid; /* write original PID into tag */ + sbool bUseSysTimeStamp; /* use timestamp from system (instead of from message) */ } lstn_t; static lstn_t listeners[MAXFUNIX]; @@ -156,9 +159,13 @@ static int bUseFlowCtl = 0; /* use flow control or not (if yes, only LIGHT is u static int bIgnoreTimestamp = 1; /* ignore timestamps present in the incoming message? */ static int bWritePid = 0; /* use credentials from recvmsg() and fixup PID in TAG */ static int bWritePidSysSock = 0; /* use credentials from recvmsg() and fixup PID in TAG */ +static int bUseSysTimeStamp = 1; /* use timestamp from system (rather than from message) */ +static int bUseSysTimeStampSysSock = 1; /* same, for system log socket */ +static int bAnnotate = 0; /* annotate trusted properties */ +static int bAnnotateSysSock = 0; /* same, for system log socket */ #define DFLT_bCreatePath 0 static int bCreatePath = DFLT_bCreatePath; /* auto-create socket path? */ -#define DFLT_ratelimitInterval 5 +#define DFLT_ratelimitInterval 0 static int ratelimitInterval = DFLT_ratelimitInterval; /* interval in seconds, 0 = off */ static int ratelimitIntervalSysSock = DFLT_ratelimitInterval; #define DFLT_ratelimitBurst 200 @@ -300,8 +307,10 @@ addLstnSocketName(void __attribute__((unused)) *pVal, uchar *pNewVal) listeners[nfd].flags = bIgnoreTimestamp ? IGNDATE : NOFLAG; listeners[nfd].bCreatePath = bCreatePath; listeners[nfd].sockName = pNewVal; - listeners[nfd].bUseCreds = (bWritePid || ratelimitInterval) ? 1 : 0; + listeners[nfd].bUseCreds = (bWritePid || ratelimitInterval || bAnnotate) ? 1 : 0; + listeners[nfd].bAnnotate = bAnnotate; listeners[nfd].bWritePid = bWritePid; + listeners[nfd].bUseSysTimeStamp = bUseSysTimeStamp; nfd++; } else { errmsg.LogError(0, NO_ERRCODE, "Out of unix socket name descriptors, ignoring %s\n", @@ -415,9 +424,14 @@ openLogSocket(lstn_t *pLstn) errmsg.LogError(errno, NO_ERRCODE, "set SCM_CREDENTIALS failed on '%s'", pLstn->sockName); pLstn->bUseCreds = 0; } +// TODO: move to its own #if + if(setsockopt(pLstn->fd, SOL_SOCKET, SO_TIMESTAMP, &one, sizeof(one)) != 0) { + errmsg.LogError(errno, NO_ERRCODE, "set SO_TIMESTAMP failed on '%s'", pLstn->sockName); + } } # else /* HAVE_SCM_CREDENTIALS */ pLstn->bUseCreds = 0; + pLstn->bAnnotate = 0; # endif /* HAVE_SCM_CREDENTIALS */ finalize_it: @@ -500,12 +514,109 @@ fixPID(uchar *bufTAG, int *lenTag, struct ucred *cred) } +/* Get an "trusted property" from the system. Returns an empty string if the + * property can not be obtained. Inspired by similiar functionality inside + * journald. Currently works with Linux /proc filesystem, only. + */ +static rsRetVal +getTrustedProp(struct ucred *cred, char *propName, uchar *buf, size_t lenBuf, int *lenProp) +{ + int fd; + int i; + int lenRead; + char namebuf[1024]; + DEFiRet; + + if(snprintf(namebuf, sizeof(namebuf), "/proc/%lu/%s", (long unsigned) cred->pid, + propName) >= (int) sizeof(namebuf)) { + ABORT_FINALIZE(RS_RET_ERR); + } + + if((fd = open(namebuf, O_RDONLY)) == -1) { + DBGPRINTF("error reading '%s'\n", namebuf); + *lenProp = 0; + FINALIZE; + } + if((lenRead = read(fd, buf, lenBuf - 1)) == -1) { + DBGPRINTF("error reading file data for '%s'\n", namebuf); + *lenProp = 0; + close(fd); + FINALIZE; + } + + /* we strip after the first \n */ + for(i = 0 ; i < lenRead ; ++i) { + if(buf[i] == '\n') + break; + else if(iscntrl(buf[i])) + buf[i] = ' '; + } + buf[i] = '\0'; + *lenProp = i; + + close(fd); + +finalize_it: + RETiRet; +} + + +/* read the exe trusted property path (so far, /proc fs only) + */ +static rsRetVal +getTrustedExe(struct ucred *cred, uchar *buf, size_t lenBuf, int* lenProp) +{ + int lenRead; + char namebuf[1024]; + DEFiRet; + + if(snprintf(namebuf, sizeof(namebuf), "/proc/%lu/exe", (long unsigned) cred->pid) + >= (int) sizeof(namebuf)) { + ABORT_FINALIZE(RS_RET_ERR); + } + + if((lenRead = readlink(namebuf, (char*)buf, lenBuf - 1)) == -1) { + DBGPRINTF("error reading link '%s'\n", namebuf); + *lenProp = 0; + FINALIZE; + } + + buf[lenRead] = '\0'; + *lenProp = lenRead; + +finalize_it: + RETiRet; +} + + +/* copy a trusted property in escaped mode. That is, the property can contain + * any character and so it must be properly quoted AND escaped. + * It is assumed the output buffer is large enough. Returns the number of + * characters added. + */ +static inline int +copyescaped(uchar *dstbuf, uchar *inbuf, int inlen) +{ + int iDst, iSrc; + + *dstbuf = '"'; + for(iDst=1, iSrc=0 ; iSrc < inlen ; ++iDst, ++iSrc) { + if(inbuf[iSrc] == '"' || inbuf[iSrc] == '\\') { + dstbuf[iDst++] = '\\'; + } + dstbuf[iDst] = inbuf[iSrc]; + } + dstbuf[iDst++] = '"'; + return iDst; +} + + /* submit received message to the queue engine * We now parse the message according to expected format so that we * can also mangle it if necessary. */ static inline rsRetVal -SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred) +SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred, struct timeval *ts) { msg_t *pMsg; int lenMsg; @@ -519,6 +630,12 @@ SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred) struct syslogTime st; time_t tt; rs_ratelimit_state_t *ratelimiter = NULL; + int lenProp; + uchar propBuf[1024]; + uchar msgbuf[8192]; + uchar *pmsgbuf; + int toffs; /* offset for trusted properties */ + struct syslogTime dummyTS; DEFiRet; /* TODO: handle format errors?? */ @@ -544,12 +661,58 @@ SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred) findRatelimiter(pLstn, cred, &ratelimiter); /* ignore error, better so than others... */ } - datetime.getCurrTime(&st, &tt); + if(ts == NULL) { + datetime.getCurrTime(&st, &tt); + } else { + datetime.timeval2syslogTime(ts, &st); + tt = ts->tv_sec; + } + if(ratelimiter != NULL && !withinRatelimit(ratelimiter, tt, cred->pid)) { STATSCOUNTER_INC(ctrLostRatelimit, mutCtrLostRatelimit); FINALIZE; } + /* created trusted properties */ + if(cred != NULL && pLstn->bAnnotate) { + if((unsigned) (lenRcv + 4096) < sizeof(msgbuf)) { + pmsgbuf = msgbuf; + } else { + CHKmalloc(pmsgbuf = malloc(lenRcv+4096)); + } + memcpy(pmsgbuf, pRcv, lenRcv); + memcpy(pmsgbuf+lenRcv, " @[", 3); + toffs = lenRcv + 3; /* next free location */ + lenProp = snprintf((char*)propBuf, sizeof(propBuf), "_PID=%lu _UID=%lu _GID=%lu", + (long unsigned) cred->pid, (long unsigned) cred->uid, + (long unsigned) cred->gid); + memcpy(pmsgbuf+toffs, propBuf, lenProp); + toffs = toffs + lenProp; + getTrustedProp(cred, "comm", propBuf, sizeof(propBuf), &lenProp); + if(lenProp) { + memcpy(pmsgbuf+toffs, " _COMM=", 7); + memcpy(pmsgbuf+toffs+7, propBuf, lenProp); + toffs = toffs + 7 + lenProp; + } + getTrustedExe(cred, propBuf, sizeof(propBuf), &lenProp); + if(lenProp) { + memcpy(pmsgbuf+toffs, " _EXE=", 6); + memcpy(pmsgbuf+toffs+6, propBuf, lenProp); + toffs = toffs + 6 + lenProp; + } + getTrustedProp(cred, "cmdline", propBuf, sizeof(propBuf), &lenProp); + if(lenProp) { + memcpy(pmsgbuf+toffs, " _CMDLINE=", 10); + toffs = toffs + 10 + + copyescaped(pmsgbuf+toffs+10, propBuf, lenProp); + } + /* finalize string */ + pmsgbuf[toffs] = ']'; + pmsgbuf[toffs+1] = '\0'; + pRcv = pmsgbuf; + lenRcv = toffs + 1; + } + /* we now create our own message object and submit it to the queue */ CHKiRet(msgConstructWithTime(&pMsg, &st, tt)); MsgSetRawMsg(pMsg, (char*)pRcv, lenRcv); @@ -564,15 +727,27 @@ SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred) parse++; lenMsg--; /* '>' */ - if((pLstn->flags & IGNDATE)) { - /* in this case, we still need to find out if we have a valid - * datestamp or not .. and advance the parse pointer accordingly. - */ - struct syslogTime dummy; - datetime.ParseTIMESTAMP3164(&dummy, &parse, &lenMsg); - } else { - if(datetime.ParseTIMESTAMP3164(&(pMsg->tTIMESTAMP), &parse, &lenMsg) != RS_RET_OK) { - DBGPRINTF("we have a problem, invalid timestamp in msg!\n"); + if(ts == NULL) { + if((pLstn->flags & IGNDATE)) { + /* in this case, we still need to find out if we have a valid + * datestamp or not .. and advance the parse pointer accordingly. + */ + datetime.ParseTIMESTAMP3164(&dummyTS, &parse, &lenMsg); + } else { + if(datetime.ParseTIMESTAMP3164(&(pMsg->tTIMESTAMP), &parse, &lenMsg) != RS_RET_OK) { + DBGPRINTF("we have a problem, invalid timestamp in msg!\n"); + } + } + } else { /* if we pulled the time from the system, we need to update the message text */ + uchar *tmpParse = parse; /* just to check correctness of TS */ + if(datetime.ParseTIMESTAMP3164(&dummyTS, &tmpParse, &lenMsg) == RS_RET_OK) { + /* We modify the message only if it contained a valid timestamp, + * otherwise we do not touch it at all. */ + datetime.formatTimestamp3164(&st, (char*)parse, 0); + parse[15] = ' '; /* re-write \0 from fromatTimestamp3164 by SP */ + /* update "counters" to reflect processed timestamp */ + parse += 16; + lenMsg -= 16; } } @@ -624,6 +799,7 @@ static rsRetVal readSocket(lstn_t *pLstn) struct cmsghdr *cm; # endif struct ucred *cred; + struct timeval *ts; uchar bufRcv[4096+1]; char aux[128]; uchar *pRcv = NULL; /* receive buffer */ @@ -662,21 +838,28 @@ static rsRetVal readSocket(lstn_t *pLstn) dbgprintf("Message from UNIX socket: #%d\n", pLstn->fd); if(iRcvd > 0) { cred = NULL; -# if HAVE_SCM_CREDENTIALS - if(pLstn->bUseCreds) { - dbgprintf("XXX: pre CM loop, length of control message %d\n", (int) msgh.msg_controllen); - for (cm = CMSG_FIRSTHDR(&msgh); cm; cm = CMSG_NXTHDR(&msgh, cm)) { - dbgprintf("XXX: in CM loop, %d, %d\n", cm->cmsg_level, cm->cmsg_type); - if (cm->cmsg_level == SOL_SOCKET && cm->cmsg_type == SCM_CREDENTIALS) { + ts = NULL; + if(pLstn->bUseCreds || pLstn->bUseSysTimeStamp) { + for(cm = CMSG_FIRSTHDR(&msgh); cm; cm = CMSG_NXTHDR(&msgh, cm)) { +# if HAVE_SCM_CREDENTIALS + if( pLstn->bUseCreds + && cm->cmsg_level == SOL_SOCKET && cm->cmsg_type == SCM_CREDENTIALS) { cred = (struct ucred*) CMSG_DATA(cm); - dbgprintf("XXX: got credentials pid %d\n", (int) cred->pid); break; } +# endif /* HAVE_SCM_CREDENTIALS */ +# if HAVE_SO_TIMESTAMP + if( pLstn->bUseSysTimeStamp + && cm->cmsg_level == SOL_SOCKET && cm->cmsg_type == SO_TIMESTAMP) { + ts = (struct timeval *)CMSG_DATA(cm); + dbgprintf("XXX: got timestamp %ld.%ld\n", + (long) ts->tv_sec, (long) ts->tv_usec); + break; + } +# endif /* HAVE_SO_TIMESTAMP */ } - dbgprintf("XXX: post CM loop\n"); } -# endif /* HAVE_SCM_CREDENTIALS */ - CHKiRet(SubmitMsg(pRcv, iRcvd, pLstn, cred)); + CHKiRet(SubmitMsg(pRcv, iRcvd, pLstn, cred, ts)); } else if(iRcvd < 0 && errno != EINTR) { char errStr[1024]; rs_strerror_r(errno, errStr, sizeof(errStr)); @@ -786,8 +969,10 @@ CODESTARTwillRun listeners[0].ratelimitInterval = ratelimitIntervalSysSock; listeners[0].ratelimitBurst = ratelimitBurstSysSock; listeners[0].ratelimitSev = ratelimitSeveritySysSock; - listeners[0].bUseCreds = (bWritePidSysSock || ratelimitIntervalSysSock) ? 1 : 0; + listeners[0].bUseCreds = (bWritePidSysSock || ratelimitIntervalSysSock || bAnnotateSysSock) ? 1 : 0; listeners[0].bWritePid = bWritePidSysSock; + listeners[0].bAnnotate = bAnnotateSysSock; + listeners[0].bUseSysTimeStamp = bUseSysTimeStampSysSock; sd_fds = sd_listen_fds(0); if (sd_fds < 0) { @@ -830,7 +1015,6 @@ CODESTARTafterRun /* Clean-up files. */ for(i = startIndexUxLocalSockets; i < nfd; i++) if (listeners[i].sockName && listeners[i].fd != -1) { - /* If systemd passed us a socket it is systemd's job to clean it up. * Do not unlink it -- we will get same socket (node) from systemd * e.g. on restart again. @@ -900,6 +1084,8 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a bUseFlowCtl = 0; bWritePid = 0; bWritePidSysSock = 0; + bUseSysTimeStamp = 1; + bUseSysTimeStampSysSock = 1; bCreatePath = DFLT_bCreatePath; ratelimitInterval = DFLT_ratelimitInterval; ratelimitIntervalSysSock = DFLT_ratelimitInterval; @@ -934,7 +1120,9 @@ CODEmodInit_QueryRegCFSLineHdlr listeners[0].fd = -1; listeners[0].bParseHost = 0; listeners[0].bUseCreds = 0; + listeners[0].bAnnotate = 0; listeners[0].bCreatePath = 0; + listeners[0].bUseSysTimeStamp = 1; /* initialize socket names */ for(i = 1 ; i < MAXFUNIX ; ++i) { @@ -962,10 +1150,14 @@ CODEmodInit_QueryRegCFSLineHdlr NULL, &pLogHostName, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputunixlistensocketflowcontrol", 0, eCmdHdlrBinary, NULL, &bUseFlowCtl, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputunixlistensocketannotate", 0, eCmdHdlrBinary, + NULL, &bAnnotate, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputunixlistensocketcreatepath", 0, eCmdHdlrBinary, NULL, &bCreatePath, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputunixlistensocketusepidfromsystem", 0, eCmdHdlrBinary, NULL, &bWritePid, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputunixlistensocketusesystimestamp", 0, eCmdHdlrBinary, + NULL, &bUseSysTimeStamp, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"addunixlistensocket", 0, eCmdHdlrGetWord, addLstnSocketName, NULL, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"imuxsockratelimitinterval", 0, eCmdHdlrInt, @@ -986,6 +1178,10 @@ CODEmodInit_QueryRegCFSLineHdlr setSystemLogTimestampIgnore, NULL, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"systemlogsocketflowcontrol", 0, eCmdHdlrBinary, setSystemLogFlowControl, NULL, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"systemlogusesystimestamp", 0, eCmdHdlrBinary, + NULL, &bUseSysTimeStampSysSock, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"systemlogsocketannotate", 0, eCmdHdlrBinary, + NULL, &bAnnotateSysSock, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"systemlogusepidfromsystem", 0, eCmdHdlrBinary, NULL, &bWritePidSysSock, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"systemlogratelimitinterval", 0, eCmdHdlrInt, diff --git a/plugins/mmsnmptrapd/mmsnmptrapd.c b/plugins/mmsnmptrapd/mmsnmptrapd.c index 767829d6..b78046ee 100644 --- a/plugins/mmsnmptrapd/mmsnmptrapd.c +++ b/plugins/mmsnmptrapd/mmsnmptrapd.c @@ -418,7 +418,7 @@ CODEmodInit_QueryRegCFSLineHdlr cs.pszTagName = NULL; cs.pszSeverityMapping = NULL; - CHKiRet(omsdRegCFSLineHdlr((uchar *)"mmsnmptrapdtag", 0, eCmdHdlrInt, + CHKiRet(omsdRegCFSLineHdlr((uchar *)"mmsnmptrapdtag", 0, eCmdHdlrGetWord, NULL, &cs.pszTagName, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"mmsnmptrapdseveritymapping", 0, eCmdHdlrGetWord, NULL, &cs.pszSeverityMapping, STD_LOADABLE_MODULE_ID)); diff --git a/plugins/omelasticsearch/Makefile.am b/plugins/omelasticsearch/Makefile.am new file mode 100644 index 00000000..a574c72f --- /dev/null +++ b/plugins/omelasticsearch/Makefile.am @@ -0,0 +1,8 @@ +pkglib_LTLIBRARIES = omelasticsearch.la + +omelasticsearch_la_SOURCES = omelasticsearch.c +omelasticsearch_la_CPPFLAGS = $(RSRT_CFLAGS) $(PTHREADS_CFLAGS) +omelasticsearch_la_LDFLAGS = -module -avoid-version +omelasticsearch_la_LIBADD = $(CURL_LIBS) + +EXTRA_DIST = diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c new file mode 100644 index 00000000..3bec1838 --- /dev/null +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -0,0 +1,270 @@ +/* omelasticsearch.c + * This is the http://www.elasticsearch.org/ output module. + * + * NOTE: read comments in module-template.h for more specifics! + * + * Copyright 2011 Nathan Scott. + * Copyright 2009 Rainer Gerhards and Adiscon GmbH. + * + * This file is part of rsyslog. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * -or- + * see COPYING.ASL20 in the source distribution + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "config.h" +#include "rsyslog.h" +#include <stdio.h> +#include <stdarg.h> +#include <stdlib.h> +#include <string.h> +#include <curl/curl.h> +#include <curl/types.h> +#include <curl/easy.h> +#include <assert.h> +#include <signal.h> +#include <errno.h> +#include <time.h> +#include "conf.h" +#include "syslogd-types.h" +#include "srUtils.h" +#include "template.h" +#include "module-template.h" +#include "errmsg.h" +#include "statsobj.h" +#include "cfsysline.h" + +MODULE_TYPE_OUTPUT +MODULE_TYPE_NOKEEP + +/* internal structures */ +DEF_OMOD_STATIC_DATA +DEFobjCurrIf(errmsg) +DEFobjCurrIf(statsobj) + +statsobj_t *indexStats; +STATSCOUNTER_DEF(indexConFail, mutIndexConFail) +STATSCOUNTER_DEF(indexSubmit, mutIndexSubmit) +STATSCOUNTER_DEF(indexFailed, mutIndexFailed) +STATSCOUNTER_DEF(indexSuccess, mutIndexSuccess) + +/* REST API for elasticsearch hits this URL: + * http://<hostName>:<restPort>/<searchIndex>/<searchType> + */ +typedef struct curl_slist HEADER; +typedef struct _instanceData { + CURL *curlHandle; /* libcurl session handle */ + HEADER *postHeader; /* json POST request info */ +} instanceData; + +/* config variables */ +static int restPort = 9200; +static char *hostName = "localhost"; +static char *searchIndex = "system"; +static char *searchType = "events"; + +BEGINcreateInstance +CODESTARTcreateInstance +ENDcreateInstance + +BEGINisCompatibleWithFeature +CODESTARTisCompatibleWithFeature + if(eFeat == sFEATURERepeatedMsgReduction) + iRet = RS_RET_OK; +ENDisCompatibleWithFeature + +BEGINfreeInstance +CODESTARTfreeInstance + if (pData->postHeader) { + curl_slist_free_all(pData->postHeader); + pData->postHeader = NULL; + } + if (pData->curlHandle) { + curl_easy_cleanup(pData->curlHandle); + pData->curlHandle = NULL; + } +ENDfreeInstance + +BEGINdbgPrintInstInfo +CODESTARTdbgPrintInstInfo +ENDdbgPrintInstInfo + +BEGINtryResume +CODESTARTtryResume +ENDtryResume + +rsRetVal +curlPost(instanceData *instance, uchar *message) +{ + CURLcode code; + CURL *curl = instance->curlHandle; + int length = strlen((char *)message); + + curl_easy_setopt(curl, CURLOPT_WRITEDATA, (char *)message); + curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (char *)message); + curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, length); + code = curl_easy_perform(curl); + switch (code) { + case CURLE_COULDNT_RESOLVE_HOST: + case CURLE_COULDNT_RESOLVE_PROXY: + case CURLE_COULDNT_CONNECT: + case CURLE_WRITE_ERROR: + STATSCOUNTER_INC(indexConFail, mutIndexConFail); + return RS_RET_SUSPENDED; + default: + STATSCOUNTER_INC(indexSubmit, mutIndexSubmit); + return RS_RET_OK; + } +} + +BEGINdoAction +CODESTARTdoAction + CHKiRet(curlPost(pData, ppString[0])); +finalize_it: +ENDdoAction + +/* elasticsearch POST result string ... useful for debugging */ +size_t +curlResult(void *ptr, size_t size, size_t nmemb, void *userdata) +{ + unsigned int i; + char *p = (char *)ptr; + char *jsonData = (char *)userdata; + static char ok[] = "{\"ok\":true,"; + + ASSERT(size == 1); + + if (size == 1 && + nmemb > sizeof(ok)-1 && + strncmp(p, ok, sizeof(ok)-1) == 0) { + STATSCOUNTER_INC(indexSuccess, mutIndexSuccess); + } else { + STATSCOUNTER_INC(indexFailed, mutIndexFailed); + if (Debug) { + DBGPRINTF("omelasticsearch request: %s\n", jsonData); + DBGPRINTF("omelasticsearch result: "); + for (i = 0; i < nmemb; i++) + DBGPRINTF("%c", p[i]); + DBGPRINTF("\n"); + } + } + return size * nmemb; +} + +static rsRetVal +curlSetup(instanceData *instance) +{ + char restURL[2048]; /* libcurl makes a copy, using the stack here is OK */ + HEADER *header; + CURL *handle; + + handle = curl_easy_init(); + if (handle == NULL) { + return RS_RET_OBJ_CREATION_FAILED; + } + + snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s", + hostName, restPort, searchIndex, searchType); + header = curl_slist_append(NULL, "Content-Type: text/json; charset=utf-8"); + + curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, curlResult); + curl_easy_setopt(handle, CURLOPT_HTTPHEADER, header); + curl_easy_setopt(handle, CURLOPT_URL, restURL); + curl_easy_setopt(handle, CURLOPT_POST, 1); + + instance->curlHandle = handle; + instance->postHeader = header; + + DBGPRINTF("omelasticsearch setup, using REST URL: %s\n", restURL); + return RS_RET_OK; +} + +BEGINparseSelectorAct +CODESTARTparseSelectorAct +CODE_STD_STRING_REQUESTparseSelectorAct(1) + if(strncmp((char*) p, ":omelasticsearch:", sizeof(":omelasticsearch:") - 1)) { + ABORT_FINALIZE(RS_RET_CONFLINE_UNPROCESSED); + } + p += sizeof(":omelasticsearch:") - 1; /* eat indicator sequence (-1 because of '\0'!) */ + CHKiRet(createInstance(&pData)); + + /* check if a non-standard template is to be applied */ + if(*(p-1) == ';') + --p; + CHKiRet(cflineParseTemplateName(&p, *ppOMSR, 0, OMSR_NO_RQD_TPL_OPTS, (uchar*) " StdJSONFmt")); + + /* all good, we can now initialise our private data */ + CHKiRet(curlSetup(pData)); +CODE_STD_FINALIZERparseSelectorAct +ENDparseSelectorAct + +BEGINmodExit +CODESTARTmodExit + curl_global_cleanup(); + statsobj.Destruct(&indexStats); + objRelease(errmsg, CORE_COMPONENT); + objRelease(statsobj, CORE_COMPONENT); +ENDmodExit + +BEGINqueryEtryPt +CODESTARTqueryEtryPt +CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES +ENDqueryEtryPt + +static rsRetVal +resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal) +{ + DEFiRet; + restPort = 9200; + hostName = "localhost"; + searchIndex = "system"; + searchType = "events"; + RETiRet; +} + +BEGINmodInit() +CODESTARTmodInit + *ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */ +CODEmodInit_QueryRegCFSLineHdlr + CHKiRet(objUse(errmsg, CORE_COMPONENT)); + CHKiRet(objUse(statsobj, CORE_COMPONENT)); + + /* register config file handlers */ + CHKiRet(omsdRegCFSLineHdlr((uchar *)"elasticsearchindex", 0, eCmdHdlrGetWord, NULL, &searchIndex, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"elasticsearchtype", 0, eCmdHdlrGetWord, NULL, &searchType, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"elasticsearchhost", 0, eCmdHdlrGetWord, NULL, &hostName, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"elasticsearchport", 0, eCmdHdlrInt, NULL, &restPort, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID)); + + if (curl_global_init(CURL_GLOBAL_ALL) != 0) { + errmsg.LogError(0, RS_RET_OBJ_CREATION_FAILED, "CURL fail. -elasticsearch indexing disabled"); + ABORT_FINALIZE(RS_RET_OBJ_CREATION_FAILED); + } + + /* support statistics gathering */ + CHKiRet(statsobj.Construct(&indexStats)); + CHKiRet(statsobj.SetName(indexStats, (uchar *)"elasticsearch")); + CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"connfail", + ctrType_IntCtr, &indexConFail)); + CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"submits", + ctrType_IntCtr, &indexSubmit)); + CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"failed", + ctrType_IntCtr, &indexFailed)); + CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"success", + ctrType_IntCtr, &indexSuccess)); + CHKiRet(statsobj.ConstructFinalize(indexStats)); +ENDmodInit + +/* vi:set ai: + */ diff --git a/plugins/omhdfs/javaenv.sh b/plugins/omhdfs/javaenv.sh new file mode 100644 index 00000000..d07a8473 --- /dev/null +++ b/plugins/omhdfs/javaenv.sh @@ -0,0 +1,14 @@ +# This is a sample file for environment settings on Fedora 13 +# that made me compile & run omhdfs. I really *hate* the way +# java uses environment variables... Hopefully this file will +# help building and testing omhdfs in the future (there is also +# some more information in the rsyslog wiki). +# rgerhards, 2011-03-11 +# this now works, but don't ask my why ;) +#export JAVA_HOME=/usr/java/jdk1.6.0_21/bin/java +export PATH=/usr/java/jdk1.6.0_21/bin:$PATH +export JAVA_INCLUDES="-I/usr/java/jdk1.6.0_21/include -I/usr/java/jdk1.6.0_21/include/linux" +export JAVA_LIBS="-L/usr/java/jdk1.6.0_21/jre/lib/i386 -ljava -ljvm -lverify" +export HADOOP_HOME=/usr/lib/hadoop +export CLASSPATH=/usr/lib/jvm/java-6-sun/lib:/usr/lib/hadoop/lib:/usr/lib/hadoop/hadoop-ant-0.20.2+320.jar:/usr/lib/hadoop/hadoop-core-0.20.2+320.jar:/usr/lib/hadoop/hadoop-examples-0.20.2+320.jar:/usr/lib/hadoop/hadoop-test-0.20.2+320.jar:/usr/lib/hadoop/hadoop-tools-0.20.2+320.jar/usr/lib/hadoop/lib/commons-cli-1.2.jar:/usr/lib/hadoop/lib/commons-codec-1.3.jar:/usr/lib/hadoop/lib/commons-el-1.0.jar:/usr/lib/hadoop/lib/commons-httpclient-3.0.1.jar:/usr/lib/hadoop/lib/commons-logging-1.0.4.jar:/usr/lib/hadoop/lib/commons-logging-api-1.0.4.jar:/usr/lib/hadoop/lib/commons-net-1.4.1.jar:/usr/lib/hadoop/lib/core-3.1.1.jar:/usr/lib/hadoop/lib/hadoop-fairscheduler-0.20.2+320.jar:/usr/lib/hadoop/lib/hadoop-scribe-log4j-0.20.2+320.jar:/usr/lib/hadoop/lib/hsqldb-1.8.0.10.jar:/usr/lib/hadoop/lib/hsqldb.jar:/usr/lib/hadoop/lib/jackson-core-asl-1.0.1.jar:/usr/lib/hadoop/lib/jackson-mapper-asl-1.0.1.jar:/usr/lib/hadoop/lib/jasper-compiler-5.5.12.jar:/usr/lib/hadoop/lib/jasper-runtime-5.5.12.jar:/usr/lib/hadoop/lib/jets3t-0.6.1.jar:/usr/lib/hadoop/lib/jetty-6.1.14.jar:/usr/lib/hadoop/lib/jetty-util-6.1.14.jar:/usr/lib/hadoop/lib/junit-4.5.jar:/usr/lib/hadoop/lib/kfs-0.2.2.jar:/usr/lib/hadoop/lib/libfb303.jar:/usr/lib/hadoop/lib/libthrift.jar:/usr/lib/hadoop/lib/log4j-1.2.15.jar:/usr/lib/hadoop/lib/mockito-all-1.8.2.jar:/usr/lib/hadoop/lib/mysql-connector-java-5.0.8-bin.jar:/usr/lib/hadoop/lib/oro-2.0.8.jar:/usr/lib/hadoop/lib/servlet-api-2.5-6.1.14.jar:/usr/lib/hadoop/lib/slf4j-api-1.4.3.jar:/usr/lib/hadoop/lib/slf4j-log4j12-1.4.3.jar:/usr/lib/hadoop/lib/xmlenc-0.52.jar:/etc/hadoop/conf +###export CLASSPATH="/usr/lib/hadoop/hadoop-0.20.2+320-ant.jar: /usr/lib/hadoop/hadoop-0.20.2+320-core.jar: /usr/lib/hadoop/hadoop-0.20.2+320-examples.jar: /usr/lib/hadoop/hadoop-0.20.2+320-test.jar: /usr/lib/hadoop/hadoop-0.20.2+320-tools.jar: /usr/lib/hadoop/hadoop-ant-0.20.2+320.jar: /usr/lib/hadoop/hadoop-core-0.20.2+320.jar: /usr/lib/hadoop/hadoop-examples-0.20.2+320.jar: /usr/lib/hadoop/hadoop-test-0.20.2+320.jar: /usr/lib/hadoop/hadoop-tools-0.20.2+320.jar:/usr/lib/hadoop/lib: /usr/lib/hadoop/lib/commons-cli-1.2.jar: /usr/lib/hadoop/lib/commons-codec-1.3.jar: /usr/lib/hadoop/lib/commons-el-1.0.jar: /usr/lib/hadoop/lib/commons-httpclient-3.0.1.jar: /usr/lib/hadoop/lib/commons-logging-1.0.4.jar: /usr/lib/hadoop/lib/commons-logging-api-1.0.4.jar: /usr/lib/hadoop/lib/commons-net-1.4.1.jar: /usr/lib/hadoop/lib/core-3.1.1.jar: /usr/lib/hadoop/lib/hadoop-fairscheduler-0.20.2+320.jar: /usr/lib/hadoop/lib/hadoop-scribe-log4j-0.20.2+320.jar: /usr/lib/hadoop/lib/hsqldb-1.8.0.10.jar: /usr/lib/hadoop/lib/hsqldb.jar: /usr/lib/hadoop/lib/jackson-core-asl-1.0.1.jar: /usr/lib/hadoop/lib/jackson-mapper-asl-1.0.1.jar: /usr/lib/hadoop/lib/jasper-compiler-5.5.12.jar: /usr/lib/hadoop/lib/jasper-runtime-5.5.12.jar: /usr/lib/hadoop/lib/jets3t-0.6.1.jar: /usr/lib/hadoop/lib/jetty-6.1.14.jar: /usr/lib/hadoop/lib/jetty-util-6.1.14.jar: /usr/lib/hadoop/lib/junit-4.5.jar: /usr/lib/hadoop/lib/kfs-0.2.2.jar: /usr/lib/hadoop/lib/libfb303.jar: /usr/lib/hadoop/lib/libthrift.jar: /usr/lib/hadoop/lib/log4j-1.2.15.jar: /usr/lib/hadoop/lib/mockito-all-1.8.2.jar: /usr/lib/hadoop/lib/mysql-connector-java-5.0.8-bin.jar: /usr/lib/hadoop/lib/oro-2.0.8.jar: /usr/lib/hadoop/lib/servlet-api-2.5-6.1.14.jar: /usr/lib/hadoop/lib/slf4j-api-1.4.3.jar: /usr/lib/hadoop/lib/slf4j-log4j12-1.4.3.jar: /usr/lib/hadoop/lib/xmlenc-0.52.jar:/etc/hadoop/conf:/usr/lib/hadoop/lib" diff --git a/plugins/omhdfs/omhdfs.c b/plugins/omhdfs/omhdfs.c index 8b72747f..76128a4e 100644 --- a/plugins/omhdfs/omhdfs.c +++ b/plugins/omhdfs/omhdfs.c @@ -80,6 +80,8 @@ typedef struct { typedef struct _instanceData { file_t *pFile; + uchar ioBuf[64*1024]; + unsigned offsBuf; } instanceData; /* forward definitions (down here, need data types) */ @@ -260,7 +262,8 @@ fileOpen(file_t *pFile) if(errno == ENOENT) { DBGPRINTF("omhdfs: ENOENT trying to append to '%s', now trying create\n", pFile->name); - pFile->fh = hdfsOpenFile(pFile->fs, (char*)pFile->name, O_WRONLY|O_CREAT, 0, 0, 0); + pFile->fh = hdfsOpenFile(pFile->fs, + (char*)pFile->name, O_WRONLY|O_CREAT, 0, 0, 0); } } if(pFile->fh == NULL) { @@ -275,12 +278,15 @@ finalize_it: } +/* Note: lenWrite is reset to zero on successful write! */ static inline rsRetVal -fileWrite(file_t *pFile, uchar *buf) +fileWrite(file_t *pFile, uchar *buf, size_t *lenWrite) { - size_t lenWrite; DEFiRet; + if(*lenWrite == 0) + FINALIZE; + if(pFile->nUsers > 1) d_pthread_mutex_lock(&pFile->mut); @@ -294,18 +300,18 @@ fileWrite(file_t *pFile, uchar *buf) } } - lenWrite = strlen((char*) buf); - tSize num_written_bytes = hdfsWrite(pFile->fs, pFile->fh, buf, lenWrite); - if((unsigned) num_written_bytes != lenWrite) { - errmsg.LogError(errno, RS_RET_ERR_HDFS_WRITE, "omhdfs: failed to write %s, expected %lu bytes, " - "written %lu\n", pFile->name, (unsigned long) lenWrite, +dbgprintf("XXXXX: omhdfs writing %u bytes\n", *lenWrite); + tSize num_written_bytes = hdfsWrite(pFile->fs, pFile->fh, buf, *lenWrite); + if((unsigned) num_written_bytes != *lenWrite) { + errmsg.LogError(errno, RS_RET_ERR_HDFS_WRITE, + "omhdfs: failed to write %s, expected %lu bytes, " + "written %lu\n", pFile->name, (unsigned long) *lenWrite, (unsigned long) num_written_bytes); ABORT_FINALIZE(RS_RET_SUSPENDED); } + *lenWrite = 0; finalize_it: - if(pFile->nUsers > 1) - d_pthread_mutex_unlock(&pFile->mut); RETiRet; } @@ -333,6 +339,40 @@ finalize_it: /* ---END FILE OBJECT---------------------------------------------------- */ +/* This adds data to the output buffer and performs an actual write + * if the new data does not fit into the buffer. Note that we never write + * partial data records. Other actions may write into the same file, and if + * we would write partial records, data could become severely mixed up. + * Note that we must check of some new data arrived is large than our + * buffer. In that case, the new data will written with its own + * write operation. + */ +static inline rsRetVal +addData(instanceData *pData, uchar *buf) +{ + unsigned len; + DEFiRet; + + len = strlen((char*)buf); + if(pData->offsBuf + len < sizeof(pData->ioBuf)) { + /* new data fits into remaining buffer */ + memcpy((char*) pData->ioBuf + pData->offsBuf, buf, len); + pData->offsBuf += len; + } else { +dbgprintf("XXXXX: not enough room, need to flush\n"); + CHKiRet(fileWrite(pData->pFile, pData->ioBuf, &pData->offsBuf)); + if(len >= sizeof(pData->ioBuf)) { + CHKiRet(fileWrite(pData->pFile, buf, &len)); + } else { + memcpy((char*) pData->ioBuf + pData->offsBuf, buf, len); + pData->offsBuf += len; + } + } + + iRet = RS_RET_DEFER_COMMIT; +finalize_it: + RETiRet; +} BEGINcreateInstance CODESTARTcreateInstance @@ -358,13 +398,31 @@ CODESTARTtryResume } ENDtryResume + +BEGINbeginTransaction +CODESTARTbeginTransaction +dbgprintf("omhdfs: beginTransaction\n"); +ENDbeginTransaction + + BEGINdoAction CODESTARTdoAction - DBGPRINTF("omuxsock: action to to write to %s\n", pData->pFile->name); - iRet = fileWrite(pData->pFile, ppString[0]); + DBGPRINTF("omhdfs: action to to write to %s\n", pData->pFile->name); + iRet = addData(pData, ppString[0]); +dbgprintf("omhdfs: done doAction\n"); ENDdoAction +BEGINendTransaction +CODESTARTendTransaction +dbgprintf("omhdfs: endTransaction\n"); + if(pData->offsBuf != 0) { + DBGPRINTF("omhdfs: data unwritten at end of transaction, persisting...\n"); + iRet = fileWrite(pData->pFile, pData->ioBuf, &pData->offsBuf); + } +ENDendTransaction + + BEGINparseSelectorAct file_t *pFile; int r; @@ -409,6 +467,7 @@ CODESTARTparseSelectorAct } fileObjAddUser(pFile); pData->pFile = pFile; + pData->offsBuf = 0; CODE_STD_FINALIZERparseSelectorAct ENDparseSelectorAct @@ -455,6 +514,7 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_TXIF_OMOD_QUERIES /* we support the transactional interface! */ CODEqueryEtryPt_doHUP ENDqueryEtryPt @@ -472,5 +532,6 @@ CODEmodInit_QueryRegCFSLineHdlr CHKiRet(regCfSysLineHdlr((uchar *)"omhdfsport", 0, eCmdHdlrInt, NULL, &hdfsPort, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"omhdfsdefaulttemplate", 0, eCmdHdlrGetWord, NULL, &dfltTplName, NULL)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID)); + DBGPRINTF("omhdfs: module compiled with rsyslog version %s.\n", VERSION); CODEmodInit_QueryRegCFSLineHdlr ENDmodInit |