summaryrefslogtreecommitdiffstats
path: root/runtime
diff options
context:
space:
mode:
Diffstat (limited to 'runtime')
-rw-r--r--runtime/Makefile.am2
-rw-r--r--runtime/atomic.h2
-rw-r--r--runtime/datetime.c31
-rw-r--r--runtime/datetime.h2
-rw-r--r--runtime/debug.c5
-rw-r--r--runtime/debug.h2
-rw-r--r--runtime/glbl.c10
-rw-r--r--runtime/glbl.h2
-rw-r--r--runtime/module-template.h31
-rw-r--r--runtime/modules.c5
-rw-r--r--runtime/modules.h6
-rw-r--r--runtime/msg.c91
-rw-r--r--runtime/msg.h29
-rw-r--r--runtime/parser.c314
-rw-r--r--runtime/parser.h30
-rw-r--r--runtime/queue.c29
-rw-r--r--runtime/queue.h1
-rw-r--r--runtime/rsyslog.h1
-rw-r--r--runtime/srutils.c4
-rw-r--r--runtime/sysvar.c2
-rw-r--r--runtime/wti.c21
-rw-r--r--runtime/wti.h1
-rw-r--r--runtime/wtp.c18
-rw-r--r--runtime/wtp.h1
24 files changed, 593 insertions, 47 deletions
diff --git a/runtime/Makefile.am b/runtime/Makefile.am
index 8dd8ad12..5e5d08ea 100644
--- a/runtime/Makefile.am
+++ b/runtime/Makefile.am
@@ -16,6 +16,8 @@ librsyslog_la_SOURCES = \
glbl.c \
conf.c \
conf.h \
+ parser.h \
+ parser.c \
msg.c \
msg.h \
linkedlist.c \
diff --git a/runtime/atomic.h b/runtime/atomic.h
index 2dbe7f52..7ad8e2e4 100644
--- a/runtime/atomic.h
+++ b/runtime/atomic.h
@@ -42,12 +42,14 @@
*/
#ifdef HAVE_ATOMIC_BUILTINS
# define ATOMIC_INC(data) ((void) __sync_fetch_and_add(&(data), 1))
+# define ATOMIC_DEC(data) ((void) __sync_sub_and_fetch(&(data), 1))
# define ATOMIC_DEC_AND_FETCH(data) __sync_sub_and_fetch(&(data), 1)
# define ATOMIC_FETCH_32BIT(data) ((unsigned) __sync_fetch_and_and(&(data), 0xffffffff))
# define ATOMIC_STORE_1_TO_32BIT(data) __sync_lock_test_and_set(&(data), 1)
#else
# warning "atomic builtins not available, using nul operations"
# define ATOMIC_INC(data) (++(data))
+# define ATOMIC_DEC(data) (--(data))
# define ATOMIC_DEC_AND_FETCH(data) (--(data))
# define ATOMIC_FETCH_32BIT(data) (data)
# define ATOMIC_STORE_1_TO_32BIT(data) (data) = 1
diff --git a/runtime/datetime.c b/runtime/datetime.c
index 20ca6191..676f76d5 100644
--- a/runtime/datetime.c
+++ b/runtime/datetime.c
@@ -62,9 +62,14 @@ DEFobjCurrIf(errmsg)
* most portable and removes the need for additional structures
* (but I have to admit it is somewhat "bulky";)).
*
- * Obviously, all caller-provided pointers must not be NULL...
+ * Obviously, *t must not be NULL...
+ *
+ * rgerhards, 2008-10-07: added ttSeconds to provide a way to
+ * obtain the second-resolution UNIX timestamp. This is needed
+ * in some situations to minimize time() calls (namely when doing
+ * output processing). This can be left NULL if not needed.
*/
-static void getCurrTime(struct syslogTime *t)
+static void getCurrTime(struct syslogTime *t, time_t *ttSeconds)
{
struct timeval tp;
struct tm *tm;
@@ -83,6 +88,9 @@ static void getCurrTime(struct syslogTime *t)
# else
gettimeofday(&tp, NULL);
# endif
+ if(ttSeconds != NULL)
+ *ttSeconds = tp.tv_sec;
+
tm = localtime_r((time_t*) &(tp.tv_sec), &tmBuf);
t->year = tm->tm_year + 1900;
@@ -304,6 +312,7 @@ ParseTIMESTAMP3164(struct syslogTime *pTime, char** ppszTS)
/* variables to temporarily hold time information while we parse */
int month;
int day;
+ int year = 0; /* 0 means no year provided */
int hour; /* 24 hour clock */
int minute;
int second;
@@ -464,7 +473,23 @@ ParseTIMESTAMP3164(struct syslogTime *pTime, char** ppszTS)
if(*pszTS++ != ' ')
ABORT_FINALIZE(RS_RET_INVLD_TIME);
+
+ /* time part */
hour = srSLMGParseInt32(&pszTS);
+ if(hour > 1970 && hour < 2100) {
+ /* if so, we assume this actually is a year. This is a format found
+ * e.g. in Cisco devices.
+ * (if you read this 2100+ trying to fix a bug, congratulate myself
+ * to how long the code survived - me no longer ;)) -- rgerhards, 2008-11-18
+ */
+ year = hour;
+
+ /* re-query the hour, this time it must be valid */
+ if(*pszTS++ != ' ')
+ ABORT_FINALIZE(RS_RET_INVLD_TIME);
+ hour = srSLMGParseInt32(&pszTS);
+ }
+
if(hour < 0 || hour > 23)
ABORT_FINALIZE(RS_RET_INVLD_TIME);
@@ -494,6 +519,8 @@ ParseTIMESTAMP3164(struct syslogTime *pTime, char** ppszTS)
*ppszTS = pszTS; /* provide updated parse position back to caller */
pTime->timeType = 1;
pTime->month = month;
+ if(year > 0)
+ pTime->year = year; /* persist year if detected */
pTime->day = day;
pTime->hour = hour;
pTime->minute = minute;
diff --git a/runtime/datetime.h b/runtime/datetime.h
index 2210af02..0739588d 100644
--- a/runtime/datetime.h
+++ b/runtime/datetime.h
@@ -35,7 +35,7 @@ typedef struct datetime_s {
/* interfaces */
BEGINinterface(datetime) /* name must also be changed in ENDinterface macro! */
- void (*getCurrTime)(struct syslogTime *t);
+ void (*getCurrTime)(struct syslogTime *t, time_t *ttSeconds);
rsRetVal (*ParseTIMESTAMP3339)(struct syslogTime *pTime, char** ppszTS);
rsRetVal (*ParseTIMESTAMP3164)(struct syslogTime *pTime, char** pszTS);
int (*formatTimestampToMySQL)(struct syslogTime *ts, char* pDst, size_t iLenDst);
diff --git a/runtime/debug.c b/runtime/debug.c
index b0bf76ea..102d5d0f 100644
--- a/runtime/debug.c
+++ b/runtime/debug.c
@@ -1,3 +1,4 @@
+#include <sys/syscall.h>
/* debug.c
*
* This file proides debug and run time error analysis support. Some of the
@@ -817,7 +818,9 @@ dbgprint(obj_t *pObj, char *pszMsg, size_t lenMsg)
if(stddbg != -1) write(stddbg, pszWriteBuf, lenWriteBuf);
if(altdbg != -1) write(altdbg, pszWriteBuf, lenWriteBuf);
}
- lenWriteBuf = snprintf(pszWriteBuf, sizeof(pszWriteBuf), "%s: ", pszThrdName);
+
+ // old, reenable TODO lenWriteBuf = snprintf(pszWriteBuf, sizeof(pszWriteBuf), "%s: ", pszThrdName);
+ lenWriteBuf = snprintf(pszWriteBuf, sizeof(pszWriteBuf), "{%ld}%s: ", (long) syscall(SYS_gettid), pszThrdName);
if(stddbg != -1) write(stddbg, pszWriteBuf, lenWriteBuf);
if(altdbg != -1) write(altdbg, pszWriteBuf, lenWriteBuf);
/* print object name header if we have an object */
diff --git a/runtime/debug.h b/runtime/debug.h
index d9d576b5..59b3e776 100644
--- a/runtime/debug.h
+++ b/runtime/debug.h
@@ -100,6 +100,8 @@ void dbgSetThrdName(uchar *pszName);
void dbgPrintAllDebugInfo(void);
/* macros */
+#define DBGPRINTF(...) if(Debug) { dbgprintf(__VA_ARGS__); }
+#define DBGOPRINT(...) if(Debug) { dbgoprint(__VA_ARGS__); }
#ifdef RTINST
# define BEGINfunc static dbgFuncDB_t *pdbgFuncDB; int dbgCALLStaCK_POP_POINT = dbgEntrFunc(&pdbgFuncDB, __FILE__, __func__, __LINE__);
# define ENDfunc dbgExitFunc(pdbgFuncDB, dbgCALLStaCK_POP_POINT, RS_RET_NO_IRET);
diff --git a/runtime/glbl.c b/runtime/glbl.c
index 1114fcd3..2a6bfb11 100644
--- a/runtime/glbl.c
+++ b/runtime/glbl.c
@@ -51,6 +51,8 @@ DEFobjStaticHelpers
* class...
*/
static uchar *pszWorkDir = NULL;
+static int bOptimizeUniProc = 1; /* enable uniprocessor optimizations */
+static int bHUPisRestart = 1; /* should SIGHUP cause a full system restart? */
static int iMaxLine = 2048; /* maximum length of a syslog message */
static int iDefPFFamily = PF_UNSPEC; /* protocol family (IPv4, IPv6 or both) */
static int bDropMalPTRMsgs = 0;/* Drop messages which have malicious PTR records during DNS lookup */
@@ -85,6 +87,8 @@ static dataType Get##nameFunc(void) \
return(nameVar); \
}
+SIMP_PROP(OptimizeUniProc, bOptimizeUniProc, int)
+SIMP_PROP(HUPisRestart, bHUPisRestart, int)
SIMP_PROP(MaxLine, iMaxLine, int)
SIMP_PROP(DefPFFamily, iDefPFFamily, int) /* note that in the future we may check the family argument */
SIMP_PROP(DropMalPTRMsgs, bDropMalPTRMsgs, int)
@@ -173,6 +177,8 @@ CODESTARTobjQueryInterface(glbl)
pIf->Get##name = Get##name; \
pIf->Set##name = Set##name;
SIMP_PROP(MaxLine);
+ SIMP_PROP(OptimizeUniProc);
+ SIMP_PROP(HUPisRestart);
SIMP_PROP(DefPFFamily);
SIMP_PROP(DropMalPTRMsgs);
SIMP_PROP(Option_DisallowWarning);
@@ -216,6 +222,8 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a
pszWorkDir = NULL;
}
bDropMalPTRMsgs = 0;
+ bOptimizeUniProc = 1;
+ bHUPisRestart = 1;
return RS_RET_OK;
}
@@ -235,6 +243,8 @@ BEGINAbstractObjClassInit(glbl, 1, OBJ_IS_CORE_MODULE) /* class, version */
CHKiRet(regCfSysLineHdlr((uchar *)"defaultnetstreamdrivercafile", 0, eCmdHdlrGetWord, NULL, &pszDfltNetstrmDrvrCAF, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"defaultnetstreamdriverkeyfile", 0, eCmdHdlrGetWord, NULL, &pszDfltNetstrmDrvrKeyFile, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"defaultnetstreamdrivercertfile", 0, eCmdHdlrGetWord, NULL, &pszDfltNetstrmDrvrCertFile, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"optimizeforuniprocessor", 0, eCmdHdlrBinary, NULL, &bOptimizeUniProc, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"hupisrestart", 0, eCmdHdlrBinary, NULL, &bHUPisRestart, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, NULL));
ENDObjClassInit(glbl)
diff --git a/runtime/glbl.h b/runtime/glbl.h
index 0c83bdd5..44e41e3e 100644
--- a/runtime/glbl.h
+++ b/runtime/glbl.h
@@ -41,6 +41,8 @@ BEGINinterface(glbl) /* name must also be changed in ENDinterface macro! */
dataType (*Get##name)(void); \
rsRetVal (*Set##name)(dataType);
SIMP_PROP(MaxLine, int)
+ SIMP_PROP(OptimizeUniProc, int)
+ SIMP_PROP(HUPisRestart, int)
SIMP_PROP(DefPFFamily, int)
SIMP_PROP(DropMalPTRMsgs, int)
SIMP_PROP(Option_DisallowWarning, int)
diff --git a/runtime/module-template.h b/runtime/module-template.h
index eb39b587..6f7d877c 100644
--- a/runtime/module-template.h
+++ b/runtime/module-template.h
@@ -481,6 +481,33 @@ static rsRetVal afterRun(void)\
}
-/*
- * vi:set ai:
+/* doHUP()
+ * This function is optional. Currently, it is available to output plugins
+ * only, but may be made available to other types of plugins in the future.
+ * A plugin does not need to define this entry point. If if does, it gets
+ * called when a non-restart type of HUP is done. A plugin should register
+ * this function so that it can close files, connection or other ressources
+ * on HUP - if it can be assume the user wanted to do this as a part of HUP
+ * processing. Note that the name "HUP" has historical reasons, it stems back
+ * to the infamous SIGHUP which was sent to restart a syslogd. We still retain
+ * that legacy, but may move this to a different signal.
+ * rgerhards, 2008-10-22
+ */
+#define CODEqueryEtryPt_doHUP \
+ else if(!strcmp((char*) name, "doHUP")) {\
+ *pEtryPoint = doHUP;\
+ }
+#define BEGINdoHUP \
+static rsRetVal doHUP(instanceData __attribute__((unused)) *pData)\
+{\
+ DEFiRet;
+
+#define CODESTARTdoHUP
+
+#define ENDdoHUP \
+ RETiRet;\
+}
+
+
+/* vim:set ai:
*/
diff --git a/runtime/modules.c b/runtime/modules.c
index d5730ede..169d234b 100644
--- a/runtime/modules.c
+++ b/runtime/modules.c
@@ -347,6 +347,7 @@ static rsRetVal
doModInit(rsRetVal (*modInit)(int, int*, rsRetVal(**)(), rsRetVal(*)(), modInfo_t*), uchar *name, void *pModHdlr)
{
DEFiRet;
+ rsRetVal localRet;
modInfo_t *pNew = NULL;
rsRetVal (*modGetType)(eModType_t *pType);
@@ -391,6 +392,10 @@ doModInit(rsRetVal (*modInit)(int, int*, rsRetVal(**)(), rsRetVal(*)(), modInfo_
CHKiRet((*pNew->modQueryEtryPt)((uchar*)"parseSelectorAct", &pNew->mod.om.parseSelectorAct));
CHKiRet((*pNew->modQueryEtryPt)((uchar*)"isCompatibleWithFeature", &pNew->isCompatibleWithFeature));
CHKiRet((*pNew->modQueryEtryPt)((uchar*)"tryResume", &pNew->tryResume));
+ /* try load optional interfaces */
+ localRet = (*pNew->modQueryEtryPt)((uchar*)"doHUP", &pNew->doHUP);
+ if(localRet != RS_RET_OK && localRet != RS_RET_MODULE_ENTRY_POINT_NOT_FOUND)
+ ABORT_FINALIZE(localRet);
break;
case eMOD_LIB:
break;
diff --git a/runtime/modules.h b/runtime/modules.h
index 7d34bcf7..372529ee 100644
--- a/runtime/modules.h
+++ b/runtime/modules.h
@@ -44,8 +44,11 @@
* rgerhards, 2008-03-04
* version 3 adds modInfo_t ptr to call of modInit -- rgerhards, 2008-03-10
* version 4 removes needUDPSocket OM callback -- rgerhards, 2008-03-22
+ * version 5 changes the way parsing works for input modules. This is
+ * an important change, parseAndSubmitMessage() goes away. Other
+ * module types are not affected. -- rgerhards, 2008-10-09
*/
-#define CURR_MOD_IF_VERSION 4
+#define CURR_MOD_IF_VERSION 5
typedef enum eModType_ {
eMOD_IN, /* input module */
@@ -88,6 +91,7 @@ typedef struct modInfo_s {
rsRetVal (*tryResume)(void*);/* called to see if module actin can be resumed now */
rsRetVal (*modExit)(void); /* called before termination or module unload */
rsRetVal (*modGetID)(void **); /* get its unique ID from module */
+ rsRetVal (*doHUP)(void *); /* non-restart type HUP handler */
/* below: parse a configuration line - return if processed
* or not. If not, must be parsed to next module.
*/
diff --git a/runtime/msg.c b/runtime/msg.c
index 3073fc5f..2e2d41ad 100644
--- a/runtime/msg.c
+++ b/runtime/msg.c
@@ -140,8 +140,8 @@ void (*funcMsgPrepareEnqueue)(msg_t *pMsg);
#define MsgLock(pMsg) funcLock(pMsg)
#define MsgUnlock(pMsg) funcUnlock(pMsg)
#else
-#define MsgLock(pMsg) {dbgprintf("line %d\n - ", __LINE__); funcLock(pMsg);; }
-#define MsgUnlock(pMsg) {dbgprintf("line %d - ", __LINE__); funcUnlock(pMsg); }
+#define MsgLock(pMsg) {dbgprintf("MsgLock line %d\n - ", __LINE__); funcLock(pMsg);; }
+#define MsgUnlock(pMsg) {dbgprintf("MsgUnlock line %d - ", __LINE__); funcUnlock(pMsg); }
#endif
/* the next function is a dummy to be used by the looking functions
@@ -239,12 +239,21 @@ rsRetVal MsgEnableThreadSafety(void)
/* end locking functions */
-/* "Constructor" for a msg "object". Returns a pointer to
+/* This is common code for all Constructors. It is defined in an
+ * inline'able function so that we can save a function call in the
+ * actual constructors (otherwise, the msgConstruct would need
+ * to call msgConstructWithTime(), which would require a
+ * function call). Now, both can use this inline function. This
+ * enables us to be optimal, but still have the code just once.
* the new object or NULL if no such object could be allocated.
* An object constructed via this function should only be destroyed
- * via "msgDestruct()".
+ * via "msgDestruct()". This constructor does not query system time
+ * itself but rather uses a user-supplied value. This enables the caller
+ * to do some tricks to save processing time (done, for example, in the
+ * udp input).
+ * rgerhards, 2008-10-06
*/
-rsRetVal msgConstruct(msg_t **ppThis)
+static inline rsRetVal msgBaseConstruct(msg_t **ppThis)
{
DEFiRet;
msg_t *pM;
@@ -257,21 +266,59 @@ rsRetVal msgConstruct(msg_t **ppThis)
pM->iRefCount = 1;
pM->iSeverity = -1;
pM->iFacility = -1;
+ objConstructSetObjInfo(pM);
+
+ /* DEV debugging only! dbgprintf("msgConstruct\t0x%x, ref 1\n", (int)pM);*/
+
+ *ppThis = pM;
+
+finalize_it:
+ RETiRet;
+}
+
+
+/* "Constructor" for a msg "object". Returns a pointer to
+ * the new object or NULL if no such object could be allocated.
+ * An object constructed via this function should only be destroyed
+ * via "msgDestruct()". This constructor does not query system time
+ * itself but rather uses a user-supplied value. This enables the caller
+ * to do some tricks to save processing time (done, for example, in the
+ * udp input).
+ * rgerhards, 2008-10-06
+ */
+rsRetVal msgConstructWithTime(msg_t **ppThis, struct syslogTime *stTime, time_t ttGenTime)
+{
+ DEFiRet;
+
+ CHKiRet(msgBaseConstruct(ppThis));
+ (*ppThis)->ttGenTime = ttGenTime;
+ memcpy(&(*ppThis)->tRcvdAt, stTime, sizeof(struct syslogTime));
+ memcpy(&(*ppThis)->tTIMESTAMP, stTime, sizeof(struct syslogTime));
+
+finalize_it:
+ RETiRet;
+}
+
+
+/* "Constructor" for a msg "object". Returns a pointer to
+ * the new object or NULL if no such object could be allocated.
+ * An object constructed via this function should only be destroyed
+ * via "msgDestruct()". This constructor, for historical reasons,
+ * also sets the two timestamps to the current time.
+ */
+rsRetVal msgConstruct(msg_t **ppThis)
+{
+ DEFiRet;
+ CHKiRet(msgBaseConstruct(ppThis));
/* we initialize both timestamps to contain the current time, so that they
* are consistent. Also, this saves us from doing any further time calls just
* to obtain a timestamp. The memcpy() should not really make a difference,
* especially as I think there is no codepath currently where it would not be
* required (after I have cleaned up the pathes ;)). -- rgerhards, 2008-10-02
*/
- datetime.getCurrTime(&(pM->tRcvdAt));
- memcpy(&pM->tTIMESTAMP, &pM->tRcvdAt, sizeof(struct syslogTime));
-
- objConstructSetObjInfo(pM);
-
- /* DEV debugging only! dbgprintf("msgConstruct\t0x%x, ref 1\n", (int)pM);*/
-
- *ppThis = pM;
+ datetime.getCurrTime(&((*ppThis)->tRcvdAt), &((*ppThis)->ttGenTime));
+ memcpy(&(*ppThis)->tTIMESTAMP, &(*ppThis)->tRcvdAt, sizeof(struct syslogTime));
finalize_it:
RETiRet;
@@ -396,7 +443,7 @@ msg_t* MsgDup(msg_t* pOld)
assert(pOld != NULL);
BEGINfunc
- if(msgConstruct(&pNew) != RS_RET_OK) {
+ if(msgConstructWithTime(&pNew, &pOld->tTIMESTAMP, pOld->ttGenTime) != RS_RET_OK) {
return NULL;
}
@@ -407,8 +454,7 @@ msg_t* MsgDup(msg_t* pOld)
pNew->bParseHOSTNAME = pOld->bParseHOSTNAME;
pNew->msgFlags = pOld->msgFlags;
pNew->iProtocolVersion = pOld->iProtocolVersion;
- memcpy(&pNew->tRcvdAt, &pOld->tRcvdAt, sizeof(struct syslogTime));
- memcpy(&pNew->tTIMESTAMP, &pOld->tTIMESTAMP, sizeof(struct syslogTime));
+ pNew->ttGenTime = pOld->ttGenTime;
tmpCOPYSZ(Severity);
tmpCOPYSZ(SeverityStr);
tmpCOPYSZ(Facility);
@@ -462,6 +508,7 @@ static rsRetVal MsgSerialize(msg_t *pThis, strm_t *pStrm)
objSerializeSCALAR(pStrm, iSeverity, SHORT);
objSerializeSCALAR(pStrm, iFacility, SHORT);
objSerializeSCALAR(pStrm, msgFlags, INT);
+ objSerializeSCALAR(pStrm, ttGenTime, INT);
objSerializeSCALAR(pStrm, tRcvdAt, SYSLOGTIME);
objSerializeSCALAR(pStrm, tTIMESTAMP, SYSLOGTIME);
@@ -731,6 +778,7 @@ int getPRIi(msg_t *pM)
char *getTimeReported(msg_t *pM, enum tplFormatTypes eFmt)
{
+ BEGINfunc
if(pM == NULL)
return "";
@@ -802,11 +850,13 @@ char *getTimeReported(msg_t *pM, enum tplFormatTypes eFmt)
MsgUnlock(pM);
return(pM->pszTIMESTAMP_SecFrac);
}
+ ENDfunc
return "INVALID eFmt OPTION!";
}
char *getTimeGenerated(msg_t *pM, enum tplFormatTypes eFmt)
{
+ BEGINfunc
if(pM == NULL)
return "";
@@ -878,6 +928,7 @@ char *getTimeGenerated(msg_t *pM, enum tplFormatTypes eFmt)
MsgUnlock(pM);
return(pM->pszRcvdAt_SecFrac);
}
+ ENDfunc
return "INVALID eFmt OPTION!";
}
@@ -1623,7 +1674,7 @@ static uchar *getNOW(eNOWType eNow)
return NULL;
}
- datetime.getCurrTime(&t);
+ datetime.getCurrTime(&t, NULL);
switch(eNow) {
case NOW_NOW:
snprintf((char*) pBuf, tmpBUFSIZE, "%4.4d-%2.2d-%2.2d", t.year, t.month, t.day);
@@ -2439,6 +2490,8 @@ rsRetVal MsgSetProperty(msg_t *pThis, var_t *pProp)
MsgSetPROCID(pThis, (char*) rsCStrGetSzStrNoNULL(pProp->val.pStr));
} else if(isProp("pCSMSGID")) {
MsgSetMSGID(pThis, (char*) rsCStrGetSzStrNoNULL(pProp->val.pStr));
+ } else if(isProp("ttGenTime")) {
+ pThis->ttGenTime = pProp->val.num;
} else if(isProp("tRcvdAt")) {
memcpy(&pThis->tRcvdAt, &pProp->val.vSyslogTime, sizeof(struct syslogTime));
} else if(isProp("tTIMESTAMP")) {
@@ -2500,7 +2553,5 @@ BEGINObjClassInit(msg, 1, OBJ_IS_CORE_MODULE)
funcDeleteMutex = MsgLockingDummy;
funcMsgPrepareEnqueue = MsgLockingDummy;
ENDObjClassInit(msg)
-
-/*
- * vi:set ai:
+/* vim:set ai:
*/
diff --git a/runtime/msg.h b/runtime/msg.h
index 21cb2c64..d98111a8 100644
--- a/runtime/msg.h
+++ b/runtime/msg.h
@@ -52,7 +52,9 @@ struct msg {
BEGINobjInstance; /* Data to implement generic object - MUST be the first data element! */
pthread_mutexattr_t mutAttr;
pthread_mutex_t mut;
- int iRefCount; /* reference counter (0 = unused) */
+ flowControl_t flowCtlType; /**< type of flow control we can apply, for enqueueing, needs not to be persisted because
+ once data has entered the queue, this property is no longer needed. */
+ short iRefCount; /* reference counter (0 = unused) */
short bParseHOSTNAME; /* should the hostname be parsed from the message? */
/* background: the hostname is not present on "regular" messages
* received via UNIX domain sockets from the same machine. However,
@@ -60,8 +62,6 @@ struct msg {
* sockets. All in all, the parser would need parse templates, that would
* resolve all these issues... rgerhards, 2005-10-06
*/
- flowControl_t flowCtlType; /**< type of flow control we can apply, for enqueueing, needs not to be persisted because
- once data has entered the queue, this property is no longer needed. */
short iSeverity; /* the severity 0..7 */
uchar *pszSeverity; /* severity as string... */
int iLenSeverity; /* ... and its length. */
@@ -99,6 +99,13 @@ struct msg {
cstr_t *pCSAPPNAME; /* APP-NAME */
cstr_t *pCSPROCID; /* PROCID */
cstr_t *pCSMSGID; /* MSGID */
+ time_t ttGenTime; /* time msg object was generated, same as tRcvdAt, but a Unix timestamp.
+ While this field looks redundant, it is required because a Unix timestamp
+ is used at later processing stages (namely in the output arena). Thanks to
+ the subleties of how time is defined, there is no reliable way to reconstruct
+ the Unix timestamp from the syslogTime fields (in practice, we may be close
+ enough to reliable, but I prefer to leave the subtle things to the OS, where
+ it obviously is solved in way or another...). */
struct syslogTime tRcvdAt;/* time the message entered this program */
char *pszRcvdAt3164; /* time as RFC3164 formatted string (always 15 charcters) */
char *pszRcvdAt3339; /* time as RFC3164 formatted string (32 charcters at most) */
@@ -114,11 +121,24 @@ struct msg {
int msgFlags; /* flags associated with this message */
};
+
+/* message flags (msgFlags), not an enum for historical reasons
+ */
+#define NOFLAG 0x000 /* no flag is set (to be used when a flag must be specified and none is required) */
+#define INTERNAL_MSG 0x001 /* msg generated by logmsgInternal() --> special handling */
+/* 0x002 not used because it was previously a known value - rgerhards, 2008-10-09 */
+#define IGNDATE 0x004 /* ignore, if given, date in message and use date of reception as msg date */
+#define MARK 0x008 /* this message is a mark */
+#define NEEDS_PARSING 0x010 /* raw message, must be parsed before processing can be done */
+#define PARSE_HOSTNAME 0x020 /* parse the hostname during message parsing */
+
+
/* function prototypes
*/
PROTOTYPEObjClassInit(msg);
char* getProgramName(msg_t*);
rsRetVal msgConstruct(msg_t **ppThis);
+rsRetVal msgConstructWithTime(msg_t **ppThis, struct syslogTime *stTime, time_t ttGenTime);
rsRetVal msgDestruct(msg_t **ppM);
msg_t* MsgDup(msg_t* pOld);
msg_t *MsgAddRef(msg_t *pM);
@@ -180,6 +200,5 @@ extern void (*funcMsgPrepareEnqueue)(msg_t *pMsg);
#define MsgPrepareEnqueue(pMsg) funcMsgPrepareEnqueue(pMsg)
#endif /* #ifndef MSG_H_INCLUDED */
-/*
- * vi:set ai:
+/* vim:set ai:
*/
diff --git a/runtime/parser.c b/runtime/parser.c
new file mode 100644
index 00000000..ec2a28c7
--- /dev/null
+++ b/runtime/parser.c
@@ -0,0 +1,314 @@
+/* parser.c
+ * This module contains functions for message parsers. It still needs to be
+ * converted into an object (and much extended).
+ *
+ * Module begun 2008-10-09 by Rainer Gerhards (based on previous code from syslogd.c)
+ *
+ * Copyright 2008 Rainer Gerhards and Adiscon GmbH.
+ *
+ * This file is part of the rsyslog runtime library.
+ *
+ * The rsyslog runtime library is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * The rsyslog runtime library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with the rsyslog runtime library. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * A copy of the GPL can be found in the file "COPYING" in this distribution.
+ * A copy of the LGPL can be found in the file "COPYING.LESSER" in this distribution.
+ */
+#include "config.h"
+#include <stdlib.h>
+#include <ctype.h>
+#include <assert.h>
+#ifdef USE_NETZIP
+#include <zlib.h>
+#endif
+
+#include "rsyslog.h"
+#include "dirty.h"
+#include "msg.h"
+#include "obj.h"
+#include "errmsg.h"
+
+/* some defines */
+#define DEFUPRI (LOG_USER|LOG_NOTICE)
+
+/* definitions for objects we access */
+DEFobjStaticHelpers
+DEFobjCurrIf(glbl)
+DEFobjCurrIf(errmsg)
+
+/* static data */
+
+
+/* this is a dummy class init
+ */
+rsRetVal parserClassInit(void)
+{
+ DEFiRet;
+
+ /* request objects we use */
+ CHKiRet(objGetObjInterface(&obj)); /* this provides the root pointer for all other queries */
+ CHKiRet(objUse(glbl, CORE_COMPONENT));
+ CHKiRet(objUse(errmsg, CORE_COMPONENT));
+// TODO: free components! see action.c
+finalize_it:
+ RETiRet;
+}
+
+
+/* uncompress a received message if it is compressed.
+ * pMsg->pszRawMsg buffer is updated.
+ * rgerhards, 2008-10-09
+ */
+static inline rsRetVal uncompressMessage(msg_t *pMsg)
+{
+ DEFiRet;
+# ifdef USE_NETZIP
+ uchar *deflateBuf = NULL;
+ uLongf iLenDefBuf;
+ uchar *pszMsg;
+ size_t lenMsg;
+
+ assert(pMsg != NULL);
+ pszMsg = pMsg->pszRawMsg;
+ lenMsg = pMsg->iLenRawMsg;
+
+ /* we first need to check if we have a compressed record. If so,
+ * we must decompress it.
+ */
+ if(lenMsg > 0 && *pszMsg == 'z') { /* compressed data present? (do NOT change order if conditions!) */
+ /* we have compressed data, so let's deflate it. We support a maximum
+ * message size of iMaxLine. If it is larger, an error message is logged
+ * and the message is dropped. We do NOT try to decompress larger messages
+ * as such might be used for denial of service. It might happen to later
+ * builds that such functionality be added as an optional, operator-configurable
+ * feature.
+ */
+ int ret;
+ iLenDefBuf = glbl.GetMaxLine();
+ CHKmalloc(deflateBuf = malloc(sizeof(uchar) * (iLenDefBuf + 1)));
+ ret = uncompress((uchar *) deflateBuf, &iLenDefBuf, (uchar *) pszMsg+1, lenMsg-1);
+ DBGPRINTF("Compressed message uncompressed with status %d, length: new %ld, old %d.\n",
+ ret, (long) iLenDefBuf, (int) (lenMsg-1));
+ /* Now check if the uncompression worked. If not, there is not much we can do. In
+ * that case, we log an error message but ignore the message itself. Storing the
+ * compressed text is dangerous, as it contains control characters. So we do
+ * not do this. If someone would like to have a copy, this code here could be
+ * modified to do a hex-dump of the buffer in question. We do not include
+ * this functionality right now.
+ * rgerhards, 2006-12-07
+ */
+ if(ret != Z_OK) {
+ errmsg.LogError(0, NO_ERRCODE, "Uncompression of a message failed with return code %d "
+ "- enable debug logging if you need further information. "
+ "Message ignored.", ret);
+ FINALIZE; /* unconditional exit, nothing left to do... */
+ }
+ free(pMsg->pszRawMsg);
+ pMsg->pszRawMsg = deflateBuf;
+ pMsg->iLenRawMsg = iLenDefBuf;
+ deflateBuf = NULL; /* logically "freed" - caller is now responsible */
+ }
+finalize_it:
+ if(deflateBuf != NULL)
+ free(deflateBuf);
+
+# else /* ifdef USE_NETZIP */
+
+ /* in this case, we still need to check if the message is compressed. If so, we must
+ * tell the user we can not accept it.
+ */
+ if(pMsg->iLenRawMsg > 0 && *pMsg->pszRawMsg == 'z') {
+ errmsg.LogError(0, NO_ERRCODE, "Received a compressed message, but rsyslogd does not have compression "
+ "support enabled. The message will be ignored.");
+ ABORT_FINALIZE(RS_RET_NO_ZIP);
+ }
+
+finalize_it:
+# endif /* ifdef USE_NETZIP */
+
+ RETiRet;
+}
+
+
+/* sanitize a received message
+ * if a message gets to large during sanitization, it is truncated. This is
+ * as specified in the upcoming syslog RFC series.
+ * rgerhards, 2008-10-09
+ * We check if we have a NUL character at the very end of the
+ * message. This seems to be a frequent problem with a number of senders.
+ * So I have now decided to drop these NULs. However, if they are intentional,
+ * that may cause us some problems, e.g. with syslog-sign. On the other hand,
+ * current code always has problems with intentional NULs (as it needs to escape
+ * them to prevent problems with the C string libraries), so that does not
+ * really matter. Just to be on the save side, we'll log destruction of such
+ * NULs in the debug log.
+ * rgerhards, 2007-09-14
+ */
+static inline rsRetVal
+sanitizeMessage(msg_t *pMsg)
+{
+ DEFiRet;
+ uchar *pszMsg;
+ uchar *pDst; /* destination for copy job */
+ size_t lenMsg;
+ size_t iSrc;
+ size_t iDst;
+ size_t iMaxLine;
+
+ assert(pMsg != NULL);
+
+# ifdef USE_NETZIP
+ CHKiRet(uncompressMessage(pMsg));
+# endif
+
+ pszMsg = pMsg->pszRawMsg;
+ lenMsg = pMsg->iLenRawMsg;
+
+ /* remove NUL character at end of message (see comment in function header) */
+ if(pszMsg[lenMsg-1] == '\0') {
+ DBGPRINTF("dropped NUL at very end of message\n");
+ lenMsg--;
+ }
+
+ /* then we check if we need to drop trailing LFs, which often make
+ * their way into syslog messages unintentionally. In order to remain
+ * compatible to recent IETF developments, we allow the user to
+ * turn on/off this handling. rgerhards, 2007-07-23
+ */
+ if(bDropTrailingLF && pszMsg[lenMsg-1] == '\n') {
+ DBGPRINTF("dropped LF at very end of message (DropTrailingLF is set)\n");
+ lenMsg--;
+ }
+
+ /* now copy over the message and sanitize it */
+ /* TODO: can we get cheaper memory alloc? {alloca()?}*/
+ iMaxLine = glbl.GetMaxLine();
+ CHKmalloc(pDst = malloc(sizeof(uchar) * (iMaxLine + 1)));
+ iSrc = iDst = 0;
+ while(iSrc < lenMsg && iDst < iMaxLine) {
+ if(pszMsg[iSrc] == '\0') { /* guard against \0 characters... */
+ /* changed to the sequence (somewhat) proposed in
+ * draft-ietf-syslog-protocol-19. rgerhards, 2006-11-30
+ */
+ if(iDst + 3 < iMaxLine) { /* do we have space? */
+ pDst[iDst++] = cCCEscapeChar;
+ pDst[iDst++] = '0';
+ pDst[iDst++] = '0';
+ pDst[iDst++] = '0';
+ } /* if we do not have space, we simply ignore the '\0'... */
+ /* log an error? Very questionable... rgerhards, 2006-11-30 */
+ /* decided: we do not log an error, it won't help... rger, 2007-06-21 */
+ } else if(bEscapeCCOnRcv && iscntrl((int) pszMsg[iSrc])) {
+ /* we are configured to escape control characters. Please note
+ * that this most probably break non-western character sets like
+ * Japanese, Korean or Chinese. rgerhards, 2007-07-17
+ * Note: sysklogd logs octal values only for DEL and CCs above 127.
+ * For others, it logs ^n where n is the control char converted to an
+ * alphabet character. We like consistency and thus escape it to octal
+ * in all cases. If someone complains, we may change the mode. At least
+ * we known now what's going on.
+ * rgerhards, 2007-07-17
+ */
+ if(iDst + 3 < iMaxLine) { /* do we have space? */
+ pDst[iDst++] = cCCEscapeChar;
+ pDst[iDst++] = '0' + ((pszMsg[iSrc] & 0300) >> 6);
+ pDst[iDst++] = '0' + ((pszMsg[iSrc] & 0070) >> 3);
+ pDst[iDst++] = '0' + ((pszMsg[iSrc] & 0007));
+ } /* again, if we do not have space, we ignore the char - see comment at '\0' */
+ } else {
+ pDst[iDst++] = pszMsg[iSrc];
+ }
+ ++iSrc;
+ }
+ pDst[iDst] = '\0'; /* space *is* reserved for this! */
+
+ /* we have a sanitized string. Let's save it now */
+ free(pMsg->pszRawMsg);
+ if((pMsg->pszRawMsg = malloc((iDst+1) * sizeof(uchar))) == NULL) {
+ /* when we get no new buffer, we use what we already have ;) */
+ pMsg->pszRawMsg = pDst;
+ } else {
+ /* trim buffer */
+ memcpy(pMsg->pszRawMsg, pDst, iDst+1);
+ free(pDst); /* too big! */
+ pMsg->iLenRawMsg = iDst;
+ }
+
+finalize_it:
+ RETiRet;
+}
+
+/* Parse a received message. The object's rawmsg property is taken and
+ * parsed according to the relevant standards. This can later be
+ * extended to support configured parsers.
+ * rgerhards, 2008-10-09
+ */
+rsRetVal parseMsg(msg_t *pMsg)
+{
+ DEFiRet;
+ uchar *msg;
+ int pri;
+
+ CHKiRet(sanitizeMessage(pMsg));
+
+ /* we needed to sanitize first, because we otherwise do not have a C-string we can print... */
+ DBGPRINTF("msg parser: flags %x, from '%s', msg %s\n", pMsg->msgFlags, pMsg->pszRcvFrom, pMsg->pszRawMsg);
+
+ /* pull PRI */
+ pri = DEFUPRI;
+ msg = pMsg->pszRawMsg;
+ if(*msg == '<') {
+ pri = 0;
+ while(isdigit((int) *++msg)) {
+ pri = 10 * pri + (*msg - '0');
+ }
+ if(*msg == '>')
+ ++msg;
+ if(pri & ~(LOG_FACMASK|LOG_PRIMASK))
+ pri = DEFUPRI;
+ }
+ pMsg->iFacility = LOG_FAC(pri);
+ pMsg->iSeverity = LOG_PRI(pri);
+ MsgSetUxTradMsg(pMsg, (char*) msg);
+
+ if(pMsg->bParseHOSTNAME == 0)
+ MsgSetHOSTNAME(pMsg, (char*) pMsg->pszRcvFrom);
+
+ /* rger 2005-11-24 (happy thanksgiving!): we now need to check if we have
+ * a traditional syslog message or one formatted according to syslog-protocol.
+ * We need to apply different parsers depending on that. We use the
+ * -protocol VERSION field for the detection.
+ */
+ if(msg[0] == '1' && msg[1] == ' ') {
+ dbgprintf("Message has syslog-protocol format.\n");
+ setProtocolVersion(pMsg, 1);
+ if(parseRFCSyslogMsg(pMsg, pMsg->msgFlags) == 1) {
+ msgDestruct(&pMsg);
+ ABORT_FINALIZE(RS_RET_ERR); // TODO: we need to handle these cases!
+ }
+ } else { /* we have legacy syslog */
+ dbgprintf("Message has legacy syslog format.\n");
+ setProtocolVersion(pMsg, 0);
+ if(parseLegacySyslogMsg(pMsg, pMsg->msgFlags) == 1) {
+ msgDestruct(&pMsg);
+ ABORT_FINALIZE(RS_RET_ERR); // TODO: we need to handle these cases!
+ }
+ }
+
+ /* finalize message object */
+ pMsg->msgFlags &= ~NEEDS_PARSING; /* this message is now parsed */
+ MsgPrepareEnqueue(pMsg); /* "historical" name - preparese for multi-threading */
+
+finalize_it:
+ RETiRet;
+}
diff --git a/runtime/parser.h b/runtime/parser.h
new file mode 100644
index 00000000..cec9c083
--- /dev/null
+++ b/runtime/parser.h
@@ -0,0 +1,30 @@
+/* header for parser.c
+ * This is not yet an object, but contains all those code necessary to
+ * parse syslog messages.
+ *
+ * Copyright 2008 Rainer Gerhards and Adiscon GmbH.
+ *
+ * This file is part of the rsyslog runtime library.
+ *
+ * The rsyslog runtime library is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * The rsyslog runtime library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with the rsyslog runtime library. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * A copy of the LGPL can be found in the file "COPYING.LESSER" in this distribution.
+ */
+#ifndef INCLUDED_PARSE_H
+#define INCLUDED_PARSE_H
+
+extern rsRetVal parserClassInit(void);
+extern rsRetVal parseMsg(msg_t*);
+
+#endif /* #ifndef INCLUDED_PARSE_H */
diff --git a/runtime/queue.c b/runtime/queue.c
index 93b33967..a3e29a96 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -49,6 +49,7 @@
#include "obj.h"
#include "wtp.h"
#include "wti.h"
+#include "atomic.h"
/* static data */
DEFobjStaticHelpers
@@ -1015,7 +1016,7 @@ queueAdd(queue_t *pThis, void *pUsr)
CHKiRet(pThis->qAdd(pThis, pUsr));
if(pThis->qType != QUEUETYPE_DIRECT) {
- ++pThis->iQueueSize;
+ ATOMIC_INC(pThis->iQueueSize);
dbgoprint((obj_t*) pThis, "entry added, size now %d entries\n", pThis->iQueueSize);
}
@@ -1044,7 +1045,7 @@ queueDel(queue_t *pThis, void *pUsr)
iRet = queueGetUngottenObj(pThis, (obj_t**) pUsr);
} else {
iRet = pThis->qDel(pThis, pUsr);
- --pThis->iQueueSize;
+ ATOMIC_DEC(pThis->iQueueSize);
}
dbgoprint((obj_t*) pThis, "entry deleted, state %d, size now %d entries\n",
@@ -1284,6 +1285,7 @@ rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iWorkerThreads,
/* we have an object, so let's fill the properties */
objConstructSetObjInfo(pThis);
+ pThis->bOptimizeUniProc = glbl.GetOptimizeUniProc();
if((pThis->pszSpoolDir = (uchar*) strdup((char*)glbl.GetWorkDir())) == NULL)
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
@@ -1532,8 +1534,6 @@ queueRateLimiter(queue_t *pThis)
ISOBJ_TYPE_assert(pThis, queue);
- dbgoprint((obj_t*) pThis, "entering rate limiter\n");
-
iDelay = 0;
if(pThis->iDeqtWinToHr != 25) { /* 25 means disabled */
/* time calls are expensive, so only do them when needed */
@@ -2120,6 +2120,15 @@ queueEnqObj(queue_t *pThis, flowControl_t flowCtlType, void *pUsr)
ISOBJ_TYPE_assert(pThis, queue);
+ /* first check if we need to discard this message (which will cause CHKiRet() to exit)
+ * rgerhards, 2008-10-07: It is OK to do this outside of mutex protection. The iQueueSize
+ * and bRunsDA parameters may not reflect the correct settings here, but they are
+ * "good enough" in the sense that they can be used to drive the decision. Valgrind's
+ * threading tools may point this access to be an error, but this is done
+ * intentional. I do not see this causes problems to us.
+ */
+ CHKiRet(queueChkDiscardMsg(pThis, pThis->iQueueSize, pThis->bRunsDA, pUsr));
+
/* Please note that this function is not cancel-safe and consequently
* sets the calling thread's cancelibility state to PTHREAD_CANCEL_DISABLE
* during its execution. If that is not done, race conditions occur if the
@@ -2131,9 +2140,6 @@ queueEnqObj(queue_t *pThis, flowControl_t flowCtlType, void *pUsr)
d_pthread_mutex_lock(pThis->mut);
}
- /* first check if we need to discard this message (which will cause CHKiRet() to exit) */
- CHKiRet(queueChkDiscardMsg(pThis, pThis->iQueueSize, pThis->bRunsDA, pUsr));
-
/* then check if we need to add an assistance disk queue */
if(pThis->bIsDA)
CHKiRet(queueChkStrtDA(pThis));
@@ -2196,10 +2202,17 @@ finalize_it:
if(pThis->qType != QUEUETYPE_DIRECT) {
/* make sure at least one worker is running. */
queueAdviseMaxWorkers(pThis);
- dbgoprint((obj_t*) pThis, "EnqueueMsg advised worker start\n");
/* and release the mutex */
d_pthread_mutex_unlock(pThis->mut);
pthread_setcancelstate(iCancelStateSave, NULL);
+ dbgoprint((obj_t*) pThis, "EnqueueMsg advised worker start\n");
+ /* the following pthread_yield is experimental, but brought us performance
+ * benefit. For details, please see http://kb.monitorware.com/post14216.html#p14216
+ * rgerhards, 2008-10-09
+ * but this is only true for uniprocessors, so we guard it with an optimize flag -- rgerhards, 2008-10-22
+ */
+ if(pThis->bOptimizeUniProc)
+ pthread_yield();
}
RETiRet;
diff --git a/runtime/queue.h b/runtime/queue.h
index 9e75b31b..a2dd594f 100644
--- a/runtime/queue.h
+++ b/runtime/queue.h
@@ -58,6 +58,7 @@ typedef struct qWrkThrd_s {
typedef struct queue_s {
BEGINobjInstance;
queueType_t qType;
+ int bOptimizeUniProc; /* cache for the equally-named global setting, pulled at time of queue creation */
int bEnqOnly; /* does queue run in enqueue-only mode (1) or not (0)? */
int bSaveOnShutdown;/* persists everthing on shutdown (if DA!)? 1-yes, 0-no */
int bQueueStarted; /* has queueStart() been called on this queue? 1-yes, 0-no */
diff --git a/runtime/rsyslog.h b/runtime/rsyslog.h
index 70af8c4f..00290ee5 100644
--- a/runtime/rsyslog.h
+++ b/runtime/rsyslog.h
@@ -252,6 +252,7 @@ enum rsRetVal_ /** return value. All methods return this if not specified oth
RS_RET_QUEUE_FULL = -2105, /**< queue is full, operation could not be completed */
RS_RET_ACCEPT_ERR = -2106, /**< error during accept() system call */
RS_RET_INVLD_TIME = -2107, /**< invalid timestamp (e.g. could not be parsed) */
+ RS_RET_NO_ZIP = -2108, /**< ZIP functionality is not present */
RS_RET_CODE_ERR = -2109, /**< program code (internal) error */
/* RainerScript error messages (range 1000.. 1999) */
diff --git a/runtime/srutils.c b/runtime/srutils.c
index 97cc3252..1280e40d 100644
--- a/runtime/srutils.c
+++ b/runtime/srutils.c
@@ -371,6 +371,7 @@ int getNumberDigits(long lNum)
rsRetVal
timeoutComp(struct timespec *pt, long iTimeout)
{
+ BEGINfunc
assert(pt != NULL);
/* compute timeout */
clock_gettime(CLOCK_REALTIME, pt);
@@ -379,6 +380,7 @@ timeoutComp(struct timespec *pt, long iTimeout)
pt->tv_nsec -= 1000000000;
}
pt->tv_sec += iTimeout / 1000;
+ ENDfunc
return RS_RET_OK; /* so far, this is static... */
}
@@ -393,6 +395,7 @@ timeoutVal(struct timespec *pt)
{
struct timespec t;
long iTimeout;
+ BEGINfunc
assert(pt != NULL);
/* compute timeout */
@@ -403,6 +406,7 @@ timeoutVal(struct timespec *pt)
if(iTimeout < 0)
iTimeout = 0;
+ ENDfunc
return iTimeout;
}
diff --git a/runtime/sysvar.c b/runtime/sysvar.c
index 5eec8f67..c102d1f5 100644
--- a/runtime/sysvar.c
+++ b/runtime/sysvar.c
@@ -84,7 +84,7 @@ getNOW(eNOWType eNow, cstr_t **ppStr)
uchar szBuf[16];
struct syslogTime t;
- datetime.getCurrTime(&t);
+ datetime.getCurrTime(&t, NULL);
switch(eNow) {
case NOW_NOW:
snprintf((char*) szBuf, sizeof(szBuf)/sizeof(uchar), "%4.4d-%2.2d-%2.2d", t.year, t.month, t.day);
diff --git a/runtime/wti.c b/runtime/wti.c
index 365b25d5..e8a77474 100644
--- a/runtime/wti.c
+++ b/runtime/wti.c
@@ -45,9 +45,11 @@
#include "wtp.h"
#include "wti.h"
#include "obj.h"
+#include "glbl.h"
/* static data */
DEFobjStaticHelpers
+DEFobjCurrIf(glbl)
/* forward-definitions */
@@ -202,6 +204,7 @@ ENDobjDestruct(wti)
/* Standard-Constructor for the wti object
*/
BEGINobjConstruct(wti) /* be sure to specify the object type also in END macro! */
+ pThis->bOptimizeUniProc = glbl.GetOptimizeUniProc();
pthread_cond_init(&pThis->condExitDone, NULL);
pthread_mutex_init(&pThis->mut, NULL);
ENDobjConstruct(wti)
@@ -299,7 +302,7 @@ wtiWorkerCancelCleanup(void *arg)
pWtp = pThis->pWtp;
ISOBJ_TYPE_assert(pWtp, wtp);
- dbgprintf("%s: cancelation cleanup handler called.\n", wtiGetDbgHdr(pThis));
+ DBGPRINTF("%s: cancelation cleanup handler called.\n", wtiGetDbgHdr(pThis));
/* call user supplied handler (that one e.g. requeues the element) */
pWtp->pfOnWorkerCancel(pThis->pWtp->pUsr, pThis->pUsrp);
@@ -367,7 +370,8 @@ wtiWorker(wti_t *pThis)
wtpProcessThrdChanges(pWtp);
pthread_testcancel(); /* see big comment in function header */
# if !defined(__hpux) /* pthread_yield is missing there! */
- pthread_yield(); /* see big comment in function header */
+ if(pThis->bOptimizeUniProc)
+ pthread_yield(); /* see big comment in function header */
# endif
/* if we have a rate-limiter set for this worker pool, let's call it. Please
@@ -391,7 +395,7 @@ wtiWorker(wti_t *pThis)
/* if we reach this point, we are still protected by the mutex */
if(pWtp->pfIsIdle(pWtp->pUsr, MUTEX_ALREADY_LOCKED)) {
- dbgprintf("%s: worker IDLE, waiting for work.\n", wtiGetDbgHdr(pThis));
+ DBGPRINTF("%s: worker IDLE, waiting for work.\n", wtiGetDbgHdr(pThis));
pWtp->pfOnIdle(pWtp->pUsr, MUTEX_ALREADY_LOCKED);
if(pWtp->toWrkShutdown == -1) {
@@ -400,7 +404,7 @@ wtiWorker(wti_t *pThis)
} else {
timeoutComp(&t, pWtp->toWrkShutdown);/* get absolute timeout */
if(d_pthread_cond_timedwait(pWtp->pcondBusy, pWtp->pmutUsr, &t) != 0) {
- dbgprintf("%s: inactivity timeout, worker terminating...\n", wtiGetDbgHdr(pThis));
+ DBGPRINTF("%s: inactivity timeout, worker terminating...\n", wtiGetDbgHdr(pThis));
bInactivityTOOccured = 1; /* indicate we had a timeout */
}
}
@@ -466,6 +470,14 @@ finalize_it:
/* dummy */
rsRetVal wtiQueryInterface(void) { return RS_RET_NOT_IMPLEMENTED; }
+/* exit our class
+ */
+BEGINObjClassExit(wti, OBJ_IS_CORE_MODULE) /* CHANGE class also in END MACRO! */
+CODESTARTObjClassExit(nsdsel_gtls)
+ /* release objects we no longer need */
+ objRelease(glbl, CORE_COMPONENT);
+ENDObjClassExit(wti)
+
/* Initialize the wti class. Must be called as the very first method
* before anything else is called inside this class.
@@ -473,6 +485,7 @@ rsRetVal wtiQueryInterface(void) { return RS_RET_NOT_IMPLEMENTED; }
*/
BEGINObjClassInit(wti, 1, OBJ_IS_CORE_MODULE) /* one is the object version (most important for persisting) */
/* request objects we use */
+ CHKiRet(objUse(glbl, CORE_COMPONENT));
ENDObjClassInit(wti)
/*
diff --git a/runtime/wti.h b/runtime/wti.h
index 0cd6744e..6b60b833 100644
--- a/runtime/wti.h
+++ b/runtime/wti.h
@@ -31,6 +31,7 @@
/* the worker thread instance class */
typedef struct wti_s {
BEGINobjInstance;
+ int bOptimizeUniProc; /* cache for the equally-named global setting, pulled at time of queue creation */
pthread_t thrdID; /* thread ID */
qWrkCmd_t tCurrCmd; /* current command to be carried out by worker */
obj_t *pUsrp; /* pointer to an object meaningful for current user pointer (e.g. queue pUsr data elemt) */
diff --git a/runtime/wtp.c b/runtime/wtp.c
index 734c8d57..06173e04 100644
--- a/runtime/wtp.c
+++ b/runtime/wtp.c
@@ -46,9 +46,11 @@
#include "wtp.h"
#include "wti.h"
#include "obj.h"
+#include "glbl.h"
/* static data */
DEFobjStaticHelpers
+DEFobjCurrIf(glbl)
/* forward-definitions */
@@ -75,6 +77,7 @@ static rsRetVal NotImplementedDummy() { return RS_RET_OK; }
/* Standard-Constructor for the wtp object
*/
BEGINobjConstruct(wtp) /* be sure to specify the object type also in END macro! */
+ pThis->bOptimizeUniProc = glbl.GetOptimizeUniProc();
pthread_mutex_init(&pThis->mut, NULL);
pthread_cond_init(&pThis->condThrdTrm, NULL);
/* set all function pointers to "not implemented" dummy so that we can safely call them */
@@ -456,7 +459,7 @@ wtpStartWrkr(wtp_t *pThis, int bLockMutex)
ISOBJ_TYPE_assert(pThis, wtp);
- wtpProcessThrdChanges(pThis);
+ wtpProcessThrdChanges(pThis); // TODO: Performance: this causes a lot of FUTEX calls
BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex);
@@ -484,7 +487,8 @@ wtpStartWrkr(wtp_t *pThis, int bLockMutex)
* hold the queue's mutex, but at least it has a chance to start on a single-CPU system.
*/
# if !defined(__hpux) /* pthread_yield is missing there! */
- pthread_yield();
+ if(pThis->bOptimizeUniProc)
+ pthread_yield();
# endif
/* indicate we just started a worker and would like to see it running */
@@ -617,12 +621,22 @@ finalize_it:
/* dummy */
rsRetVal wtpQueryInterface(void) { return RS_RET_NOT_IMPLEMENTED; }
+/* exit our class
+ */
+BEGINObjClassExit(wtp, OBJ_IS_CORE_MODULE) /* CHANGE class also in END MACRO! */
+CODESTARTObjClassExit(nsdsel_gtls)
+ /* release objects we no longer need */
+ objRelease(glbl, CORE_COMPONENT);
+ENDObjClassExit(wtp)
+
+
/* Initialize the stream class. Must be called as the very first method
* before anything else is called inside this class.
* rgerhards, 2008-01-09
*/
BEGINObjClassInit(wtp, 1, OBJ_IS_CORE_MODULE)
/* request objects we use */
+ CHKiRet(objUse(glbl, CORE_COMPONENT));
ENDObjClassInit(wtp)
/*
diff --git a/runtime/wtp.h b/runtime/wtp.h
index 13ebe536..88d88afd 100644
--- a/runtime/wtp.h
+++ b/runtime/wtp.h
@@ -52,6 +52,7 @@ typedef enum {
/* the worker thread pool (wtp) object */
typedef struct wtp_s {
BEGINobjInstance;
+ int bOptimizeUniProc; /* cache for the equally-named global setting, pulled at time of queue creation */
wtpState_t wtpState;
int iNumWorkerThreads;/* number of worker threads to use */
int iCurNumWrkThrd;/* current number of active worker threads */