summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--dirty.h13
-rw-r--r--plugins/immark/immark.c1
-rw-r--r--plugins/imrelp/imrelp.c5
-rw-r--r--plugins/imudp/imudp.c4
-rw-r--r--plugins/imuxsock/imuxsock.c4
-rw-r--r--runtime/atomic.h2
-rw-r--r--runtime/debug.h1
-rw-r--r--runtime/msg.c1
-rw-r--r--runtime/msg.h13
-rw-r--r--runtime/parser.c3
-rw-r--r--runtime/queue.c5
-rw-r--r--tcps_sess.c11
-rw-r--r--tools/syslogd.c21
13 files changed, 46 insertions, 38 deletions
diff --git a/dirty.h b/dirty.h
index f5d0415e..4b13adcd 100644
--- a/dirty.h
+++ b/dirty.h
@@ -27,20 +27,9 @@
#ifndef DIRTY_H_INCLUDED
#define DIRTY_H_INCLUDED 1
-/* Flags to logmsg().
- */
-#define NOFLAG 0x000 /* no flag is set (to be used when a flag must be specified and none is required) */
-#define INTERNAL_MSG 0x001 /* msg generated by logmsgInternal() --> special handling */
-/* NO LONGER USED: #define SYNC_FILE 0x002 / * do fsync on file after printing */
-#define IGNDATE 0x004 /* ignore, if given, date in message and use date of reception as msg date */
-#define MARK 0x008 /* this message is a mark */
-
-#define MSG_PARSE_HOSTNAME 1
-#define MSG_DONT_PARSE_HOSTNAME 0
-
rsRetVal submitMsg(msg_t *pMsg);
rsRetVal logmsgInternal(int iErr, int pri, uchar *msg, int flags);
-rsRetVal parseAndSubmitMessage(uchar *hname, uchar *hnameIP, uchar *msg, int len, int bParseHost, int flags, flowControl_t flowCtlTypeu, uchar *pszInputName, struct syslogTime *stTime, time_t ttGenTime);
+rsRetVal parseAndSubmitMessage(uchar *hname, uchar *hnameIP, uchar *msg, int len, int flags, flowControl_t flowCtlTypeu, uchar *pszInputName, struct syslogTime *stTime, time_t ttGenTime);
/* TODO: the following 2 need to go in conf obj interface... */
rsRetVal cflineParseTemplateName(uchar** pp, omodStringRequest_t *pOMSR, int iEntry, int iTplOpts, uchar *dfltTplName);
diff --git a/plugins/immark/immark.c b/plugins/immark/immark.c
index 323da3fe..8504f872 100644
--- a/plugins/immark/immark.c
+++ b/plugins/immark/immark.c
@@ -41,6 +41,7 @@
#include "cfsysline.h"
#include "module-template.h"
#include "errmsg.h"
+#include "msg.h"
MODULE_TYPE_INPUT
diff --git a/plugins/imrelp/imrelp.c b/plugins/imrelp/imrelp.c
index 4515acd7..2255e643 100644
--- a/plugins/imrelp/imrelp.c
+++ b/plugins/imrelp/imrelp.c
@@ -42,6 +42,7 @@
#include "cfsysline.h"
#include "module-template.h"
#include "net.h"
+#include "msg.h"
MODULE_TYPE_INPUT
@@ -83,8 +84,8 @@ static relpRetVal
onSyslogRcv(uchar *pHostname, uchar __attribute__((unused)) *pIP, uchar *pMsg, size_t lenMsg)
{
DEFiRet;
- parseAndSubmitMessage(pHostname, (uchar*) "[unset]", pMsg, lenMsg, MSG_PARSE_HOSTNAME,
- NOFLAG, eFLOWCTL_LIGHT_DELAY, (uchar*)"imrelp", NULL, 0);
+ parseAndSubmitMessage(pHostname, (uchar*) "[unset]", pMsg, lenMsg, PARSE_HOSTNAME,
+ eFLOWCTL_LIGHT_DELAY, (uchar*)"imrelp", NULL, 0);
RETiRet;
}
diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c
index 114b433c..a49378cf 100644
--- a/plugins/imudp/imudp.c
+++ b/plugins/imudp/imudp.c
@@ -207,12 +207,10 @@ processSocket(int fd, struct sockaddr_storage *frominetPrev, int *pbIsPermitted,
/* first trim the buffer to what we have actually received */
CHKmalloc(pMsg->pszRawMsg = malloc(sizeof(uchar)* lenRcvBuf));
memcpy(pMsg->pszRawMsg, pRcvBuf, lenRcvBuf);
- pMsg->bIsParsed = 0; /* indicate message needs to be parsed */
pMsg->iLenRawMsg = lenRcvBuf;
MsgSetInputName(pMsg, "imudp");
MsgSetFlowControlType(pMsg, eFLOWCTL_NO_DELAY);
- pMsg->bParseHOSTNAME = MSG_PARSE_HOSTNAME;
- pMsg->msgFlags = NOFLAG;
+ pMsg->msgFlags = NEEDS_PARSING | PARSE_HOSTNAME;
MsgSetRcvFrom(pMsg, (char*)fromHost);
CHKiRet(MsgSetRcvFromIP(pMsg, fromHostIP));
CHKiRet(submitMsg(pMsg));
diff --git a/plugins/imuxsock/imuxsock.c b/plugins/imuxsock/imuxsock.c
index efa0365d..1d88a2b5 100644
--- a/plugins/imuxsock/imuxsock.c
+++ b/plugins/imuxsock/imuxsock.c
@@ -42,6 +42,7 @@
#include "errmsg.h"
#include "net.h"
#include "glbl.h"
+#include "msg.h"
MODULE_TYPE_INPUT
@@ -221,7 +222,8 @@ static rsRetVal readSocket(int fd, int iSock)
if (iRcvd > 0) {
parseAndSubmitMessage(funixHName[iSock] == NULL ? glbl.GetLocalHostName() : funixHName[iSock],
(uchar*)"127.0.0.1", pRcv,
- iRcvd, funixParseHost[iSock], funixFlags[iSock], funixFlowCtl[iSock], (uchar*)"imuxsock", NULL, 0);
+ iRcvd, funixParseHost[iSock] ? (funixFlags[iSock] | PARSE_HOSTNAME) : funixFlags[iSock],
+ funixFlowCtl[iSock], (uchar*)"imuxsock", NULL, 0);
} else if (iRcvd < 0 && errno != EINTR) {
char errStr[1024];
rs_strerror_r(errno, errStr, sizeof(errStr));
diff --git a/runtime/atomic.h b/runtime/atomic.h
index 2dbe7f52..7ad8e2e4 100644
--- a/runtime/atomic.h
+++ b/runtime/atomic.h
@@ -42,12 +42,14 @@
*/
#ifdef HAVE_ATOMIC_BUILTINS
# define ATOMIC_INC(data) ((void) __sync_fetch_and_add(&(data), 1))
+# define ATOMIC_DEC(data) ((void) __sync_sub_and_fetch(&(data), 1))
# define ATOMIC_DEC_AND_FETCH(data) __sync_sub_and_fetch(&(data), 1)
# define ATOMIC_FETCH_32BIT(data) ((unsigned) __sync_fetch_and_and(&(data), 0xffffffff))
# define ATOMIC_STORE_1_TO_32BIT(data) __sync_lock_test_and_set(&(data), 1)
#else
# warning "atomic builtins not available, using nul operations"
# define ATOMIC_INC(data) (++(data))
+# define ATOMIC_DEC(data) (--(data))
# define ATOMIC_DEC_AND_FETCH(data) (--(data))
# define ATOMIC_FETCH_32BIT(data) (data)
# define ATOMIC_STORE_1_TO_32BIT(data) (data) = 1
diff --git a/runtime/debug.h b/runtime/debug.h
index 7ac29765..59b3e776 100644
--- a/runtime/debug.h
+++ b/runtime/debug.h
@@ -101,6 +101,7 @@ void dbgPrintAllDebugInfo(void);
/* macros */
#define DBGPRINTF(...) if(Debug) { dbgprintf(__VA_ARGS__); }
+#define DBGOPRINT(...) if(Debug) { dbgoprint(__VA_ARGS__); }
#ifdef RTINST
# define BEGINfunc static dbgFuncDB_t *pdbgFuncDB; int dbgCALLStaCK_POP_POINT = dbgEntrFunc(&pdbgFuncDB, __FILE__, __func__, __LINE__);
# define ENDfunc dbgExitFunc(pdbgFuncDB, dbgCALLStaCK_POP_POINT, RS_RET_NO_IRET);
diff --git a/runtime/msg.c b/runtime/msg.c
index e52c9e3f..c030fa45 100644
--- a/runtime/msg.c
+++ b/runtime/msg.c
@@ -264,7 +264,6 @@ static inline rsRetVal msgBaseConstruct(msg_t **ppThis)
/* initialize members that are non-zero */
pM->iRefCount = 1;
- pM->bIsParsed = 1; /* first we assume this is parsed. If not, input must re-set to 0 */
pM->iSeverity = -1;
pM->iFacility = -1;
objConstructSetObjInfo(pM);
diff --git a/runtime/msg.h b/runtime/msg.h
index 98635f85..d98111a8 100644
--- a/runtime/msg.h
+++ b/runtime/msg.h
@@ -55,7 +55,6 @@ struct msg {
flowControl_t flowCtlType; /**< type of flow control we can apply, for enqueueing, needs not to be persisted because
once data has entered the queue, this property is no longer needed. */
short iRefCount; /* reference counter (0 = unused) */
- short bIsParsed; /* is message parsed? (0=no, 1=yes), 0 means parser needs to be called */
short bParseHOSTNAME; /* should the hostname be parsed from the message? */
/* background: the hostname is not present on "regular" messages
* received via UNIX domain sockets from the same machine. However,
@@ -122,6 +121,18 @@ struct msg {
int msgFlags; /* flags associated with this message */
};
+
+/* message flags (msgFlags), not an enum for historical reasons
+ */
+#define NOFLAG 0x000 /* no flag is set (to be used when a flag must be specified and none is required) */
+#define INTERNAL_MSG 0x001 /* msg generated by logmsgInternal() --> special handling */
+/* 0x002 not used because it was previously a known value - rgerhards, 2008-10-09 */
+#define IGNDATE 0x004 /* ignore, if given, date in message and use date of reception as msg date */
+#define MARK 0x008 /* this message is a mark */
+#define NEEDS_PARSING 0x010 /* raw message, must be parsed before processing can be done */
+#define PARSE_HOSTNAME 0x020 /* parse the hostname during message parsing */
+
+
/* function prototypes
*/
PROTOTYPEObjClassInit(msg);
diff --git a/runtime/parser.c b/runtime/parser.c
index 8c4272a0..fbdeebeb 100644
--- a/runtime/parser.c
+++ b/runtime/parser.c
@@ -41,7 +41,6 @@
/* some defines */
#define DEFUPRI (LOG_USER|LOG_NOTICE)
-#warning "msg object must be updated with new property for persisting the queue!"
/* definitions for objects we access */
DEFobjStaticHelpers
DEFobjCurrIf(glbl)
@@ -306,7 +305,7 @@ rsRetVal parseMsg(msg_t *pMsg)
}
/* finalize message object */
- pMsg->bIsParsed = 1; /* this message is now parsed */
+ pMsg->msgFlags &= ~NEEDS_PARSING; /* this message is now parsed */
MsgPrepareEnqueue(pMsg); /* "historical" name - preparese for multi-threading */
finalize_it:
diff --git a/runtime/queue.c b/runtime/queue.c
index b0043ef5..42b8137d 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -49,6 +49,7 @@
#include "obj.h"
#include "wtp.h"
#include "wti.h"
+#include "atomic.h"
/* static data */
DEFobjStaticHelpers
@@ -996,7 +997,7 @@ queueAdd(queue_t *pThis, void *pUsr)
CHKiRet(pThis->qAdd(pThis, pUsr));
if(pThis->qType != QUEUETYPE_DIRECT) {
- ++pThis->iQueueSize;
+ ATOMIC_INC(pThis->iQueueSize);
dbgoprint((obj_t*) pThis, "entry added, size now %d entries\n", pThis->iQueueSize);
}
@@ -1025,7 +1026,7 @@ queueDel(queue_t *pThis, void *pUsr)
iRet = queueGetUngottenObj(pThis, (obj_t**) pUsr);
} else {
iRet = pThis->qDel(pThis, pUsr);
- --pThis->iQueueSize;
+ ATOMIC_DEC(pThis->iQueueSize);
}
dbgoprint((obj_t*) pThis, "entry deleted, state %d, size now %d entries\n",
diff --git a/tcps_sess.c b/tcps_sess.c
index 13644f45..e8bef5b1 100644
--- a/tcps_sess.c
+++ b/tcps_sess.c
@@ -42,6 +42,7 @@
#include "obj.h"
#include "errmsg.h"
#include "netstrm.h"
+#include "msg.h"
/* static data */
@@ -229,8 +230,8 @@ PrepareClose(tcps_sess_t *pThis)
* this case.
*/
dbgprintf("Extra data at end of stream in legacy syslog/tcp message - processing\n");
- parseAndSubmitMessage(pThis->fromHost, pThis->fromHostIP, pThis->pMsg, pThis->iMsg, MSG_PARSE_HOSTNAME,
- NOFLAG, eFLOWCTL_LIGHT_DELAY, NULL, NULL, 0); /* TODO: add real InputName */
+ parseAndSubmitMessage(pThis->fromHost, pThis->fromHostIP, pThis->pMsg, pThis->iMsg,
+ PARSE_HOSTNAME, eFLOWCTL_LIGHT_DELAY, NULL, NULL, 0); /* TODO: add real InputName */
pThis->bAtStrtOfFram = 1;
}
@@ -313,7 +314,7 @@ processDataRcvd(tcps_sess_t *pThis, char c)
/* emergency, we now need to flush, no matter if we are at end of message or not... */
dbgprintf("error: message received is larger than max msg size, we split it\n");
parseAndSubmitMessage(pThis->fromHost, pThis->fromHostIP, pThis->pMsg, pThis->iMsg,
- MSG_PARSE_HOSTNAME, NOFLAG, eFLOWCTL_LIGHT_DELAY, NULL, NULL, 0); /* TODO: add real InputName */
+ PARSE_HOSTNAME, eFLOWCTL_LIGHT_DELAY, NULL, NULL, 0); /* TODO: add real InputName */
pThis->iMsg = 0;
/* we might think if it is better to ignore the rest of the
* message than to treat it as a new one. Maybe this is a good
@@ -324,7 +325,7 @@ processDataRcvd(tcps_sess_t *pThis, char c)
if(c == '\n' && pThis->eFraming == TCP_FRAMING_OCTET_STUFFING) { /* record delemiter? */
parseAndSubmitMessage(pThis->fromHost, pThis->fromHostIP, pThis->pMsg, pThis->iMsg,
- MSG_PARSE_HOSTNAME, NOFLAG, eFLOWCTL_LIGHT_DELAY, NULL, NULL, 0); /* TODO: add real InputName */
+ PARSE_HOSTNAME, eFLOWCTL_LIGHT_DELAY, NULL, NULL, 0); /* TODO: add real InputName */
pThis->iMsg = 0;
pThis->inputState = eAtStrtFram;
} else {
@@ -343,7 +344,7 @@ processDataRcvd(tcps_sess_t *pThis, char c)
if(pThis->iOctetsRemain < 1) {
/* we have end of frame! */
parseAndSubmitMessage(pThis->fromHost, pThis->fromHostIP, pThis->pMsg, pThis->iMsg,
- MSG_PARSE_HOSTNAME, NOFLAG, eFLOWCTL_LIGHT_DELAY, NULL, NULL, 0); /* TODO: add real InputName */
+ PARSE_HOSTNAME, eFLOWCTL_LIGHT_DELAY, NULL, NULL, 0); /* TODO: add real InputName */
pThis->iMsg = 0;
pThis->inputState = eAtStrtFram;
}
diff --git a/tools/syslogd.c b/tools/syslogd.c
index a45942fa..13696955 100644
--- a/tools/syslogd.c
+++ b/tools/syslogd.c
@@ -588,8 +588,11 @@ void untty(void)
* a timestamp that is to be used as timegenerated instead of the current system time.
* This is meant to facilitate performance optimization. Some inputs support such modes.
* If stTime is NULL, the current system time is used.
+ *
+ * rgerhards, 2008-10-09:
+ * interface change: bParseHostname removed, now in flags
*/
-static inline rsRetVal printline(uchar *hname, uchar *hnameIP, uchar *msg, int bParseHost, int flags, flowControl_t flowCtlType,
+static inline rsRetVal printline(uchar *hname, uchar *hnameIP, uchar *msg, int flags, flowControl_t flowCtlType,
uchar *pszInputName, struct syslogTime *stTime, time_t ttGenTime)
{
DEFiRet;
@@ -603,13 +606,11 @@ static inline rsRetVal printline(uchar *hname, uchar *hnameIP, uchar *msg, int b
} else {
CHKiRet(msgConstructWithTime(&pMsg, stTime, ttGenTime));
}
- pMsg->bIsParsed = 1; /* this is a hack until this function can be removed TODO: do it soon (rgerhards, 2008-10-09)! */
if(pszInputName != NULL)
MsgSetInputName(pMsg, (char*) pszInputName);
MsgSetFlowControlType(pMsg, flowCtlType);
MsgSetRawMsg(pMsg, (char*)msg);
- pMsg->bParseHOSTNAME = bParseHost;
/* test for special codes */
pri = DEFUPRI;
p = msg;
@@ -634,7 +635,7 @@ static inline rsRetVal printline(uchar *hname, uchar *hnameIP, uchar *msg, int b
* the message was received from (that, for obvious reasons,
* being the local host). rgerhards 2004-11-16
*/
- if(bParseHost == 0)
+ if((pMsg->msgFlags & PARSE_HOSTNAME) == 0)
MsgSetHOSTNAME(pMsg, (char*)hname);
MsgSetRcvFrom(pMsg, (char*)hname);
CHKiRet(MsgSetRcvFromIP(pMsg, hnameIP));
@@ -702,9 +703,12 @@ finalize_it:
* a timestamp that is to be used as timegenerated instead of the current system time.
* This is meant to facilitate performance optimization. Some inputs support such modes.
* If stTime is NULL, the current system time is used.
+ *
+ * rgerhards, 2008-10-09:
+ * interface change: bParseHostname removed, now in flags
*/
rsRetVal
-parseAndSubmitMessage(uchar *hname, uchar *hnameIP, uchar *msg, int len, int bParseHost, int flags, flowControl_t flowCtlType,
+parseAndSubmitMessage(uchar *hname, uchar *hnameIP, uchar *msg, int len, int flags, flowControl_t flowCtlType,
uchar *pszInputName, struct syslogTime *stTime, time_t ttGenTime)
{
DEFiRet;
@@ -816,7 +820,7 @@ parseAndSubmitMessage(uchar *hname, uchar *hnameIP, uchar *msg, int len, int bPa
*/
if(iMsg == iMaxLine) {
*(pMsg + iMsg) = '\0'; /* space *is* reserved for this! */
- printline(hname, hnameIP, tmpline, bParseHost, flags, flowCtlType, pszInputName, stTime, ttGenTime);
+ printline(hname, hnameIP, tmpline, flags, flowCtlType, pszInputName, stTime, ttGenTime);
} else {
/* This case in theory never can happen. If it happens, we have
* a logic error. I am checking for it, because if I would not,
@@ -868,7 +872,7 @@ parseAndSubmitMessage(uchar *hname, uchar *hnameIP, uchar *msg, int len, int bPa
*(pMsg + iMsg) = '\0'; /* space *is* reserved for this! */
/* typically, we should end up here! */
- printline(hname, hnameIP, tmpline, bParseHost, flags, flowCtlType, pszInputName, stTime, ttGenTime);
+ printline(hname, hnameIP, tmpline, flags, flowCtlType, pszInputName, stTime, ttGenTime);
finalize_it:
if(tmpline != NULL)
@@ -1192,8 +1196,7 @@ msgConsumer(void __attribute__((unused)) *notNeeded, void *pUsr)
assert(pMsg != NULL);
-RUNLOG_VAR("%d", pMsg->bIsParsed);
- if(pMsg->bIsParsed == 0) {
+ if((pMsg->msgFlags & NEEDS_PARSING) != 0) {
parseMsg(pMsg);
}
processMsg(pMsg);