summaryrefslogtreecommitdiffstats
path: root/runtime
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2009-04-21 16:54:05 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2009-04-21 16:54:05 +0200
commit88caccecf8dd8beaf46915df05241a44f7d635f6 (patch)
tree7d6db5808b3bdd2453f041fef5c808b8fab36119 /runtime
parent6e410a76f64d74fec03de27a6ca1f3f996844917 (diff)
parent8e536c5b25ca1a7106f541149cf0d76bdf9237da (diff)
downloadrsyslog-88caccecf8dd8beaf46915df05241a44f7d635f6.tar.gz
rsyslog-88caccecf8dd8beaf46915df05241a44f7d635f6.tar.xz
rsyslog-88caccecf8dd8beaf46915df05241a44f7d635f6.zip
Merge branch 'master' into beta
Conflicts: ChangeLog configure.ac doc/manual.html
Diffstat (limited to 'runtime')
-rw-r--r--runtime/Makefile.am2
-rw-r--r--runtime/atomic.h2
-rw-r--r--runtime/cfsysline.c4
-rw-r--r--runtime/conf.c13
-rw-r--r--runtime/ctok.c17
-rw-r--r--runtime/datetime.c31
-rw-r--r--runtime/datetime.h2
-rw-r--r--runtime/debug.c5
-rw-r--r--runtime/debug.h2
-rw-r--r--runtime/expr.c63
-rw-r--r--runtime/glbl.c42
-rw-r--r--runtime/glbl.h7
-rw-r--r--runtime/module-template.h31
-rw-r--r--runtime/modules.c13
-rw-r--r--runtime/modules.h6
-rw-r--r--runtime/msg.c127
-rw-r--r--runtime/msg.h29
-rw-r--r--runtime/net.c9
-rw-r--r--runtime/objomsr.c18
-rw-r--r--runtime/objomsr.h6
-rw-r--r--runtime/parser.c315
-rw-r--r--runtime/parser.h30
-rw-r--r--runtime/queue.c452
-rw-r--r--runtime/queue.h51
-rw-r--r--runtime/rsyslog.c2
-rw-r--r--runtime/rsyslog.h10
-rw-r--r--runtime/srutils.c6
-rw-r--r--runtime/stringbuf.c50
-rw-r--r--runtime/stringbuf.h3
-rw-r--r--runtime/sysvar.c2
-rw-r--r--runtime/vm.c29
-rw-r--r--runtime/vmop.c60
-rw-r--r--runtime/vmop.h33
-rw-r--r--runtime/vmprg.c2
-rw-r--r--runtime/wti.c26
-rw-r--r--runtime/wti.h1
-rw-r--r--runtime/wtp.c23
-rw-r--r--runtime/wtp.h1
38 files changed, 1183 insertions, 342 deletions
diff --git a/runtime/Makefile.am b/runtime/Makefile.am
index 2e6c4041..2f0a1aa0 100644
--- a/runtime/Makefile.am
+++ b/runtime/Makefile.am
@@ -16,6 +16,8 @@ librsyslog_la_SOURCES = \
glbl.c \
conf.c \
conf.h \
+ parser.h \
+ parser.c \
msg.c \
msg.h \
linkedlist.c \
diff --git a/runtime/atomic.h b/runtime/atomic.h
index 10dbac90..fdf64214 100644
--- a/runtime/atomic.h
+++ b/runtime/atomic.h
@@ -42,6 +42,7 @@
*/
#ifdef HAVE_ATOMIC_BUILTINS
# define ATOMIC_INC(data) ((void) __sync_fetch_and_add(&(data), 1))
+# define ATOMIC_DEC(data) ((void) __sync_sub_and_fetch(&(data), 1))
# define ATOMIC_DEC_AND_FETCH(data) __sync_sub_and_fetch(&(data), 1)
# define ATOMIC_FETCH_32BIT(data) ((unsigned) __sync_fetch_and_and(&(data), 0xffffffff))
# define ATOMIC_STORE_1_TO_32BIT(data) __sync_lock_test_and_set(&(data), 1)
@@ -55,6 +56,7 @@
*/
# warning "atomic builtins not available, using nul operations - rsyslogd will probably be racy!"
# define ATOMIC_INC(data) (++(data))
+# define ATOMIC_DEC(data) (--(data))
# define ATOMIC_DEC_AND_FETCH(data) (--(data))
# define ATOMIC_FETCH_32BIT(data) (data)
# define ATOMIC_STORE_1_TO_32BIT(data) (data) = 1
diff --git a/runtime/cfsysline.c b/runtime/cfsysline.c
index c4490b48..e1e4a6a4 100644
--- a/runtime/cfsysline.c
+++ b/runtime/cfsysline.c
@@ -364,7 +364,7 @@ static rsRetVal doGetGID(uchar **pp, rsRetVal (*pSetHdlr)(void*, uid_t), void *p
/* we set value via a set function */
CHKiRet(pSetHdlr(pVal, pgBuf->gr_gid));
}
- dbgprintf("gid %d obtained for group '%s'\n", pgBuf->gr_gid, szName);
+ dbgprintf("gid %d obtained for group '%s'\n", (int) pgBuf->gr_gid, szName);
}
skipWhiteSpace(pp); /* skip over any whitespace */
@@ -406,7 +406,7 @@ static rsRetVal doGetUID(uchar **pp, rsRetVal (*pSetHdlr)(void*, uid_t), void *p
/* we set value via a set function */
CHKiRet(pSetHdlr(pVal, ppwBuf->pw_uid));
}
- dbgprintf("uid %d obtained for user '%s'\n", ppwBuf->pw_uid, szName);
+ dbgprintf("uid %d obtained for user '%s'\n", (int) ppwBuf->pw_uid, szName);
}
skipWhiteSpace(pp); /* skip over any whitespace */
diff --git a/runtime/conf.c b/runtime/conf.c
index f71d5669..ede15cc7 100644
--- a/runtime/conf.c
+++ b/runtime/conf.c
@@ -46,7 +46,9 @@
#include <glob.h>
#include <sys/types.h>
#ifdef HAVE_LIBGEN_H
-# include <libgen.h>
+# ifndef OS_SOLARIS
+# include <libgen.h>
+# endif
#endif
#include "rsyslog.h"
@@ -68,6 +70,9 @@
#include "ctok.h"
#include "ctok_token.h"
+#ifdef OS_SOLARIS
+# define NAME_MAX MAXNAMELEN
+#endif
/* forward definitions */
static rsRetVal cfline(uchar *line, selector_t **pfCurr);
@@ -789,6 +794,10 @@ dbgprintf("calling expression parser, pp %p ('%s')\n", *pline, *pline);
CHKiRet(ctok.Getpp(tok, pline));
CHKiRet(ctok.Destruct(&tok));
+ /* debug support - print vmprg after construction (uncomment to use) */
+ /* vmprgDebugPrint(f->f_filterData.f_expr->pVmprg); */
+ vmprgDebugPrint(f->f_filterData.f_expr->pVmprg);
+
/* we now need to skip whitespace to the action part, else we confuse
* the legacy rsyslog conf parser. -- rgerhards, 2008-02-25
*/
@@ -873,6 +882,8 @@ static rsRetVal cflineProcessPropFilter(uchar **pline, register selector_t *f)
f->f_filterData.prop.operation = FIOP_STARTSWITH;
} else if(!rsCStrOffsetSzStrCmp(pCSCompOp, iOffset, (unsigned char*) "regex", 5)) {
f->f_filterData.prop.operation = FIOP_REGEX;
+ } else if(!rsCStrOffsetSzStrCmp(pCSCompOp, iOffset, (unsigned char*) "ereregex", 8)) {
+ f->f_filterData.prop.operation = FIOP_EREREGEX;
} else {
errmsg.LogError(0, NO_ERRCODE, "error: invalid compare operation '%s' - ignoring selector",
(char*) rsCStrGetSzStrNoNULL(pCSCompOp));
diff --git a/runtime/ctok.c b/runtime/ctok.c
index ceab15bd..d2cd8bbd 100644
--- a/runtime/ctok.c
+++ b/runtime/ctok.c
@@ -259,12 +259,17 @@ ctokGetVar(ctok_t *pThis, ctok_token_t *pToken)
}
CHKiRet(rsCStrConstruct(&pstrVal));
- /* this loop is quite simple, a variable name is terminated by whitespace. */
- while(!isspace(c)) {
+ /* this loop is quite simple, a variable name is terminated when a non-supported
+ * character is detected. Note that we currently permit a numerical digit as the
+ * first char, which is not permitted by ABNF. -- rgerhards, 2009-03-10
+ */
+ while(isalpha(c) || isdigit(c) || (c == '_') || (c == '-')) {
CHKiRet(rsCStrAppendChar(pstrVal, tolower(c)));
CHKiRet(ctokGetCharFromStream(pThis, &c));
}
- CHKiRet(rsCStrFinish(pStrB));
+ CHKiRet(ctokUngetCharFromStream(pThis, c)); /* put not processed char back */
+
+ CHKiRet(rsCStrFinish(pstrVal));
CHKiRet(var.SetString(pToken->pVar, pstrVal));
pstrVal = NULL;
@@ -389,6 +394,7 @@ ctokGetToken(ctok_t *pThis, ctok_token_t **ppToken)
uchar c;
uchar szWord[128];
int bRetry = 0; /* retry parse? Only needed for inline comments... */
+ cstr_t *pstrVal;
ISOBJ_TYPE_assert(pThis, ctok);
ASSERT(ppToken != NULL);
@@ -512,7 +518,10 @@ ctokGetToken(ctok_t *pThis, ctok_token_t **ppToken)
/* push c back, higher level parser needs it */
CHKiRet(ctokUngetCharFromStream(pThis, c));
pToken->tok = ctok_FUNCTION;
- /* TODO: fill function name */
+ /* fill function name */
+ CHKiRet(rsCStrConstruct(&pstrVal));
+ CHKiRet(rsCStrSetSzStr(pstrVal, szWord));
+ CHKiRet(var.SetString(pToken->pVar, pstrVal));
} else { /* give up... */
dbgprintf("parser has an invalid word (token) '%s'\n", szWord);
pToken->tok = ctok_INVALID;
diff --git a/runtime/datetime.c b/runtime/datetime.c
index 20ca6191..deb66eb4 100644
--- a/runtime/datetime.c
+++ b/runtime/datetime.c
@@ -62,9 +62,14 @@ DEFobjCurrIf(errmsg)
* most portable and removes the need for additional structures
* (but I have to admit it is somewhat "bulky";)).
*
- * Obviously, all caller-provided pointers must not be NULL...
+ * Obviously, *t must not be NULL...
+ *
+ * rgerhards, 2008-10-07: added ttSeconds to provide a way to
+ * obtain the second-resolution UNIX timestamp. This is needed
+ * in some situations to minimize time() calls (namely when doing
+ * output processing). This can be left NULL if not needed.
*/
-static void getCurrTime(struct syslogTime *t)
+static void getCurrTime(struct syslogTime *t, time_t *ttSeconds)
{
struct timeval tp;
struct tm *tm;
@@ -83,6 +88,9 @@ static void getCurrTime(struct syslogTime *t)
# else
gettimeofday(&tp, NULL);
# endif
+ if(ttSeconds != NULL)
+ *ttSeconds = tp.tv_sec;
+
tm = localtime_r((time_t*) &(tp.tv_sec), &tmBuf);
t->year = tm->tm_year + 1900;
@@ -304,6 +312,7 @@ ParseTIMESTAMP3164(struct syslogTime *pTime, char** ppszTS)
/* variables to temporarily hold time information while we parse */
int month;
int day;
+ int year = 0; /* 0 means no year provided */
int hour; /* 24 hour clock */
int minute;
int second;
@@ -464,7 +473,23 @@ ParseTIMESTAMP3164(struct syslogTime *pTime, char** ppszTS)
if(*pszTS++ != ' ')
ABORT_FINALIZE(RS_RET_INVLD_TIME);
+
+ /* time part */
hour = srSLMGParseInt32(&pszTS);
+ if(hour > 1970 && hour < 2100) {
+ /* if so, we assume this actually is a year. This is a format found
+ * e.g. in Cisco devices.
+ * (if you read this 2100+ trying to fix a bug, congratulate me
+ * to how long the code survived - me no longer ;)) -- rgerhards, 2008-11-18
+ */
+ year = hour;
+
+ /* re-query the hour, this time it must be valid */
+ if(*pszTS++ != ' ')
+ ABORT_FINALIZE(RS_RET_INVLD_TIME);
+ hour = srSLMGParseInt32(&pszTS);
+ }
+
if(hour < 0 || hour > 23)
ABORT_FINALIZE(RS_RET_INVLD_TIME);
@@ -494,6 +519,8 @@ ParseTIMESTAMP3164(struct syslogTime *pTime, char** ppszTS)
*ppszTS = pszTS; /* provide updated parse position back to caller */
pTime->timeType = 1;
pTime->month = month;
+ if(year > 0)
+ pTime->year = year; /* persist year if detected */
pTime->day = day;
pTime->hour = hour;
pTime->minute = minute;
diff --git a/runtime/datetime.h b/runtime/datetime.h
index 2210af02..0739588d 100644
--- a/runtime/datetime.h
+++ b/runtime/datetime.h
@@ -35,7 +35,7 @@ typedef struct datetime_s {
/* interfaces */
BEGINinterface(datetime) /* name must also be changed in ENDinterface macro! */
- void (*getCurrTime)(struct syslogTime *t);
+ void (*getCurrTime)(struct syslogTime *t, time_t *ttSeconds);
rsRetVal (*ParseTIMESTAMP3339)(struct syslogTime *pTime, char** ppszTS);
rsRetVal (*ParseTIMESTAMP3164)(struct syslogTime *pTime, char** pszTS);
int (*formatTimestampToMySQL)(struct syslogTime *ts, char* pDst, size_t iLenDst);
diff --git a/runtime/debug.c b/runtime/debug.c
index 62e2a687..4ee90226 100644
--- a/runtime/debug.c
+++ b/runtime/debug.c
@@ -1,3 +1,4 @@
+#include <sys/syscall.h>
/* debug.c
*
* This file proides debug and run time error analysis support. Some of the
@@ -886,7 +887,9 @@ dbgprint(obj_t *pObj, char *pszMsg, size_t lenMsg)
if(stddbg != -1) write(stddbg, pszWriteBuf, lenWriteBuf);
if(altdbg != -1) write(altdbg, pszWriteBuf, lenWriteBuf);
}
+
lenWriteBuf = snprintf(pszWriteBuf, sizeof(pszWriteBuf), "%s: ", pszThrdName);
+ // use for testing: lenWriteBuf = snprintf(pszWriteBuf, sizeof(pszWriteBuf), "{%ld}%s: ", (long) syscall(SYS_gettid), pszThrdName);
if(stddbg != -1) write(stddbg, pszWriteBuf, lenWriteBuf);
if(altdbg != -1) write(altdbg, pszWriteBuf, lenWriteBuf);
/* print object name header if we have an object */
@@ -1340,7 +1343,7 @@ rsRetVal dbgClassInit(void)
if(pszAltDbgFileName != NULL) {
/* we have a secondary file, so let's open it) */
- if((altdbg = open(pszAltDbgFileName, O_WRONLY|O_CREAT|O_TRUNC|O_NOCTTY, S_IRUSR|S_IWUSR)) == -1) {
+ if((altdbg = open(pszAltDbgFileName, O_WRONLY|O_CREAT|O_TRUNC|O_NOCTTY|O_CLOEXEC, S_IRUSR|S_IWUSR)) == -1) {
fprintf(stderr, "alternate debug file could not be opened, ignoring. Error: %s\n", strerror(errno));
}
}
diff --git a/runtime/debug.h b/runtime/debug.h
index 43836024..1375493d 100644
--- a/runtime/debug.h
+++ b/runtime/debug.h
@@ -101,6 +101,8 @@ void dbgSetThrdName(uchar *pszName);
void dbgPrintAllDebugInfo(void);
/* macros */
+#define DBGPRINTF(...) if(Debug) { dbgprintf(__VA_ARGS__); }
+#define DBGOPRINT(...) if(Debug) { dbgoprint(__VA_ARGS__); }
#ifdef RTINST
# define BEGINfunc static dbgFuncDB_t *pdbgFuncDB; int dbgCALLStaCK_POP_POINT = dbgEntrFunc(&pdbgFuncDB, __FILE__, __func__, __LINE__);
# define ENDfunc dbgExitFunc(pdbgFuncDB, dbgCALLStaCK_POP_POINT, RS_RET_NO_IRET);
diff --git a/runtime/expr.c b/runtime/expr.c
index ee5b9e2c..38ed1c68 100644
--- a/runtime/expr.c
+++ b/runtime/expr.c
@@ -55,11 +55,63 @@ DEFobjCurrIf(ctok)
* rgerhards, 2008-02-19
*/
-/* forward definiton - thanks to recursive ABNF, we can not avoid at least one ;) */
+/* forward definition - thanks to recursive ABNF, we can not avoid at least one ;) */
static rsRetVal expr(expr_t *pThis, ctok_t *tok);
static rsRetVal
+function(expr_t *pThis, ctok_t *tok)
+{
+ DEFiRet;
+ ctok_token_t *pToken = NULL;
+ int iNumArgs = 0;
+ var_t *pVar;
+
+ ISOBJ_TYPE_assert(pThis, expr);
+ ISOBJ_TYPE_assert(tok, ctok);
+
+ CHKiRet(ctok.GetToken(tok, &pToken));
+ /* note: pToken is destructed in finalize_it */
+
+ if(pToken->tok == ctok_LPAREN) {
+ CHKiRet(ctok_token.Destruct(&pToken)); /* token processed, "eat" it */
+ CHKiRet(ctok.GetToken(tok, &pToken)); /* get next one */
+ } else
+ ABORT_FINALIZE(RS_RET_FUNC_NO_LPAREN);
+
+ /* we first push all the params on the stack. Then we call the function */
+ while(pToken->tok != ctok_RPAREN) {
+ ++iNumArgs;
+ CHKiRet(ctok.UngetToken(tok, pToken)); /* not for us, so let others process it */
+ CHKiRet(expr(pThis, tok));
+ CHKiRet(ctok.GetToken(tok, &pToken)); /* get next one, needed for while() check */
+ if(pToken->tok == ctok_COMMA) {
+ CHKiRet(ctok_token.Destruct(&pToken)); /* token processed, "eat" it */
+ CHKiRet(ctok.GetToken(tok, &pToken)); /* get next one */
+ if(pToken->tok == ctok_RPAREN) {
+ ABORT_FINALIZE(RS_RET_FUNC_MISSING_EXPR);
+ }
+ }
+ }
+
+
+ /* now push number of arguments - this must be on top of the stack */
+ CHKiRet(var.Construct(&pVar));
+ CHKiRet(var.ConstructFinalize(pVar));
+ CHKiRet(var.SetNumber(pVar, iNumArgs));
+ CHKiRet(vmprg.AddVarOperation(pThis->pVmprg, opcode_PUSHCONSTANT, pVar)); /* add to program */
+
+
+finalize_it:
+ if(pToken != NULL) {
+ ctok_token.Destruct(&pToken); /* "eat" processed token */
+ }
+
+ RETiRet;
+}
+
+
+static rsRetVal
terminal(expr_t *pThis, ctok_t *tok)
{
DEFiRet;
@@ -85,8 +137,12 @@ terminal(expr_t *pThis, ctok_t *tok)
break;
case ctok_FUNCTION:
dbgoprint((obj_t*) pThis, "function\n");
- /* TODO: vm: call - well, need to implement that first */
- ABORT_FINALIZE(RS_RET_NOT_IMPLEMENTED);
+ CHKiRet(function(pThis, tok)); /* this creates the stack call frame */
+ /* ... but we place the call instruction onto the stack ourselfs (because
+ * we have all relevant information)
+ */
+ CHKiRet(ctok_token.UnlinkVar(pToken, &pVar));
+ CHKiRet(vmprg.AddVarOperation(pThis->pVmprg, opcode_FUNC_CALL, pVar)); /* add to program */
break;
case ctok_MSGVAR:
dbgoprint((obj_t*) pThis, "MSGVAR\n");
@@ -406,6 +462,7 @@ ENDobjQueryInterface(expr)
*/
BEGINObjClassInit(expr, 1, OBJ_IS_CORE_MODULE) /* class, version */
/* request objects we use */
+ CHKiRet(objUse(var, CORE_COMPONENT));
CHKiRet(objUse(vmprg, CORE_COMPONENT));
CHKiRet(objUse(var, CORE_COMPONENT));
CHKiRet(objUse(ctok_token, CORE_COMPONENT));
diff --git a/runtime/glbl.c b/runtime/glbl.c
index 1114fcd3..28f14320 100644
--- a/runtime/glbl.c
+++ b/runtime/glbl.c
@@ -51,12 +51,16 @@ DEFobjStaticHelpers
* class...
*/
static uchar *pszWorkDir = NULL;
+static int bOptimizeUniProc = 1; /* enable uniprocessor optimizations */
+static int bHUPisRestart = 1; /* should SIGHUP cause a full system restart? */
+static int bPreserveFQDN = 0; /* should FQDNs always be preserved? */
static int iMaxLine = 2048; /* maximum length of a syslog message */
static int iDefPFFamily = PF_UNSPEC; /* protocol family (IPv4, IPv6 or both) */
static int bDropMalPTRMsgs = 0;/* Drop messages which have malicious PTR records during DNS lookup */
static int option_DisallowWarning = 1; /* complain if message from disallowed sender is received */
static int bDisableDNS = 0; /* don't look up IP addresses of remote messages */
static uchar *LocalHostName = NULL;/* our hostname - read-only after startup */
+static uchar *LocalFQDNName = NULL;/* our hostname as FQDN - read-only after startup */
static uchar *LocalDomain; /* our local domain name - read-only after startup */
static char **StripDomains = NULL;/* these domains may be stripped before writing logs - r/o after s.u., never touched by init */
static char **LocalHosts = NULL;/* these hosts are logged with their hostname - read-only after startup, never touched by init */
@@ -85,6 +89,9 @@ static dataType Get##nameFunc(void) \
return(nameVar); \
}
+SIMP_PROP(OptimizeUniProc, bOptimizeUniProc, int)
+SIMP_PROP(PreserveFQDN, bPreserveFQDN, int)
+SIMP_PROP(HUPisRestart, bHUPisRestart, int)
SIMP_PROP(MaxLine, iMaxLine, int)
SIMP_PROP(DefPFFamily, iDefPFFamily, int) /* note that in the future we may check the family argument */
SIMP_PROP(DropMalPTRMsgs, bDropMalPTRMsgs, int)
@@ -94,6 +101,7 @@ SIMP_PROP(LocalDomain, LocalDomain, uchar*)
SIMP_PROP(StripDomains, StripDomains, char**)
SIMP_PROP(LocalHosts, LocalHosts, char**)
+SIMP_PROP_SET(LocalFQDNName, LocalFQDNName, uchar*)
SIMP_PROP_SET(LocalHostName, LocalHostName, uchar*)
SIMP_PROP_SET(DfltNetstrmDrvr, pszDfltNetstrmDrvr, uchar*) /* TODO: use custom function which frees existing value */
SIMP_PROP_SET(DfltNetstrmDrvrCAF, pszDfltNetstrmDrvrCAF, uchar*) /* TODO: use custom function which frees existing value */
@@ -110,7 +118,27 @@ SIMP_PROP_SET(DfltNetstrmDrvrCertFile, pszDfltNetstrmDrvrCertFile, uchar*) /* TO
static uchar*
GetLocalHostName(void)
{
- return(LocalHostName == NULL ? (uchar*) "[localhost]" : LocalHostName);
+ uchar *pszRet;
+
+ if(LocalHostName == NULL)
+ pszRet = (uchar*) "[localhost]";
+ else {
+ if(GetPreserveFQDN() == 1)
+ pszRet = LocalFQDNName;
+ else
+ pszRet = LocalHostName;
+ }
+ return(pszRet);
+}
+
+
+/* return the current localhost name as FQDN (requires FQDN to be set)
+ * TODO: we should set the FQDN ourselfs in here!
+ */
+static uchar*
+GetLocalFQDNName(void)
+{
+ return(LocalFQDNName == NULL ? (uchar*) "[localhost]" : LocalFQDNName);
}
@@ -173,10 +201,14 @@ CODESTARTobjQueryInterface(glbl)
pIf->Get##name = Get##name; \
pIf->Set##name = Set##name;
SIMP_PROP(MaxLine);
+ SIMP_PROP(OptimizeUniProc);
+ SIMP_PROP(PreserveFQDN);
+ SIMP_PROP(HUPisRestart);
SIMP_PROP(DefPFFamily);
SIMP_PROP(DropMalPTRMsgs);
SIMP_PROP(Option_DisallowWarning);
SIMP_PROP(DisableDNS);
+ SIMP_PROP(LocalFQDNName)
SIMP_PROP(LocalHostName)
SIMP_PROP(LocalDomain)
SIMP_PROP(StripDomains)
@@ -216,6 +248,9 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a
pszWorkDir = NULL;
}
bDropMalPTRMsgs = 0;
+ bOptimizeUniProc = 1;
+ bHUPisRestart = 1;
+ bPreserveFQDN = 0;
return RS_RET_OK;
}
@@ -235,6 +270,9 @@ BEGINAbstractObjClassInit(glbl, 1, OBJ_IS_CORE_MODULE) /* class, version */
CHKiRet(regCfSysLineHdlr((uchar *)"defaultnetstreamdrivercafile", 0, eCmdHdlrGetWord, NULL, &pszDfltNetstrmDrvrCAF, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"defaultnetstreamdriverkeyfile", 0, eCmdHdlrGetWord, NULL, &pszDfltNetstrmDrvrKeyFile, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"defaultnetstreamdrivercertfile", 0, eCmdHdlrGetWord, NULL, &pszDfltNetstrmDrvrCertFile, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"optimizeforuniprocessor", 0, eCmdHdlrBinary, NULL, &bOptimizeUniProc, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"hupisrestart", 0, eCmdHdlrBinary, NULL, &bHUPisRestart, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"preservefqdn", 0, eCmdHdlrBinary, NULL, &bPreserveFQDN, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, NULL));
ENDObjClassInit(glbl)
@@ -255,6 +293,8 @@ BEGINObjClassExit(glbl, OBJ_IS_CORE_MODULE) /* class, version */
free(pszWorkDir);
if(LocalHostName != NULL)
free(LocalHostName);
+ if(LocalFQDNName != NULL)
+ free(LocalFQDNName);
ENDObjClassExit(glbl)
/* vi:set ai:
diff --git a/runtime/glbl.h b/runtime/glbl.h
index 0c83bdd5..5bdf4f57 100644
--- a/runtime/glbl.h
+++ b/runtime/glbl.h
@@ -41,10 +41,14 @@ BEGINinterface(glbl) /* name must also be changed in ENDinterface macro! */
dataType (*Get##name)(void); \
rsRetVal (*Set##name)(dataType);
SIMP_PROP(MaxLine, int)
+ SIMP_PROP(OptimizeUniProc, int)
+ SIMP_PROP(HUPisRestart, int)
+ SIMP_PROP(PreserveFQDN, int)
SIMP_PROP(DefPFFamily, int)
SIMP_PROP(DropMalPTRMsgs, int)
SIMP_PROP(Option_DisallowWarning, int)
SIMP_PROP(DisableDNS, int)
+ SIMP_PROP(LocalFQDNName, uchar*)
SIMP_PROP(LocalHostName, uchar*)
SIMP_PROP(LocalDomain, uchar*)
SIMP_PROP(StripDomains, char**)
@@ -55,7 +59,8 @@ BEGINinterface(glbl) /* name must also be changed in ENDinterface macro! */
SIMP_PROP(DfltNetstrmDrvrCertFile, uchar*)
#undef SIMP_PROP
ENDinterface(glbl)
-#define glblCURR_IF_VERSION 1 /* increment whenever you change the interface structure! */
+#define glblCURR_IF_VERSION 2 /* increment whenever you change the interface structure! */
+/* version 2 had PreserveFQDN added - rgerhards, 2008-12-08 */
/* the remaining prototypes */
PROTOTYPEObj(glbl);
diff --git a/runtime/module-template.h b/runtime/module-template.h
index eb39b587..6f7d877c 100644
--- a/runtime/module-template.h
+++ b/runtime/module-template.h
@@ -481,6 +481,33 @@ static rsRetVal afterRun(void)\
}
-/*
- * vi:set ai:
+/* doHUP()
+ * This function is optional. Currently, it is available to output plugins
+ * only, but may be made available to other types of plugins in the future.
+ * A plugin does not need to define this entry point. If if does, it gets
+ * called when a non-restart type of HUP is done. A plugin should register
+ * this function so that it can close files, connection or other ressources
+ * on HUP - if it can be assume the user wanted to do this as a part of HUP
+ * processing. Note that the name "HUP" has historical reasons, it stems back
+ * to the infamous SIGHUP which was sent to restart a syslogd. We still retain
+ * that legacy, but may move this to a different signal.
+ * rgerhards, 2008-10-22
+ */
+#define CODEqueryEtryPt_doHUP \
+ else if(!strcmp((char*) name, "doHUP")) {\
+ *pEtryPoint = doHUP;\
+ }
+#define BEGINdoHUP \
+static rsRetVal doHUP(instanceData __attribute__((unused)) *pData)\
+{\
+ DEFiRet;
+
+#define CODESTARTdoHUP
+
+#define ENDdoHUP \
+ RETiRet;\
+}
+
+
+/* vim:set ai:
*/
diff --git a/runtime/modules.c b/runtime/modules.c
index d5730ede..9fdb48e7 100644
--- a/runtime/modules.c
+++ b/runtime/modules.c
@@ -49,6 +49,10 @@
#include <unistd.h>
#include <sys/file.h>
+#ifdef OS_SOLARIS
+# define PATH_MAX MAXPATHLEN
+#endif
+
#include "cfsysline.h"
#include "modules.h"
#include "errmsg.h"
@@ -221,6 +225,8 @@ static rsRetVal queryHostEtryPt(uchar *name, rsRetVal (**pEtryPoint)())
*pEtryPoint = regCfSysLineHdlr;
} else if(!strcmp((char*) name, "objGetObjInterface")) {
*pEtryPoint = objGetObjInterface;
+ } else if(!strcmp((char*) name, "OMSRgetSupportedTplOpts")) {
+ *pEtryPoint = OMSRgetSupportedTplOpts;
} else {
*pEtryPoint = NULL; /* to be on the safe side */
ABORT_FINALIZE(RS_RET_ENTRY_POINT_NOT_FOUND);
@@ -347,6 +353,7 @@ static rsRetVal
doModInit(rsRetVal (*modInit)(int, int*, rsRetVal(**)(), rsRetVal(*)(), modInfo_t*), uchar *name, void *pModHdlr)
{
DEFiRet;
+ rsRetVal localRet;
modInfo_t *pNew = NULL;
rsRetVal (*modGetType)(eModType_t *pType);
@@ -391,6 +398,10 @@ doModInit(rsRetVal (*modInit)(int, int*, rsRetVal(**)(), rsRetVal(*)(), modInfo_
CHKiRet((*pNew->modQueryEtryPt)((uchar*)"parseSelectorAct", &pNew->mod.om.parseSelectorAct));
CHKiRet((*pNew->modQueryEtryPt)((uchar*)"isCompatibleWithFeature", &pNew->isCompatibleWithFeature));
CHKiRet((*pNew->modQueryEtryPt)((uchar*)"tryResume", &pNew->tryResume));
+ /* try load optional interfaces */
+ localRet = (*pNew->modQueryEtryPt)((uchar*)"doHUP", &pNew->doHUP);
+ if(localRet != RS_RET_OK && localRet != RS_RET_MODULE_ENTRY_POINT_NOT_FOUND)
+ ABORT_FINALIZE(localRet);
break;
case eMOD_LIB:
break;
@@ -599,7 +610,7 @@ Load(uchar *pModName)
iLoadCnt = 0;
do {
/* now build our load module name */
- if(*pModName == '/') {
+ if(*pModName == '/' || *pModName == '.') {
*szPath = '\0'; /* we do not need to append the path - its already in the module name */
iPathLen = 0;
} else {
diff --git a/runtime/modules.h b/runtime/modules.h
index 7d34bcf7..372529ee 100644
--- a/runtime/modules.h
+++ b/runtime/modules.h
@@ -44,8 +44,11 @@
* rgerhards, 2008-03-04
* version 3 adds modInfo_t ptr to call of modInit -- rgerhards, 2008-03-10
* version 4 removes needUDPSocket OM callback -- rgerhards, 2008-03-22
+ * version 5 changes the way parsing works for input modules. This is
+ * an important change, parseAndSubmitMessage() goes away. Other
+ * module types are not affected. -- rgerhards, 2008-10-09
*/
-#define CURR_MOD_IF_VERSION 4
+#define CURR_MOD_IF_VERSION 5
typedef enum eModType_ {
eMOD_IN, /* input module */
@@ -88,6 +91,7 @@ typedef struct modInfo_s {
rsRetVal (*tryResume)(void*);/* called to see if module actin can be resumed now */
rsRetVal (*modExit)(void); /* called before termination or module unload */
rsRetVal (*modGetID)(void **); /* get its unique ID from module */
+ rsRetVal (*doHUP)(void *); /* non-restart type HUP handler */
/* below: parse a configuration line - return if processed
* or not. If not, must be parsed to next module.
*/
diff --git a/runtime/msg.c b/runtime/msg.c
index 2b58eb88..9165e8d6 100644
--- a/runtime/msg.c
+++ b/runtime/msg.c
@@ -140,8 +140,8 @@ void (*funcMsgPrepareEnqueue)(msg_t *pMsg);
#define MsgLock(pMsg) funcLock(pMsg)
#define MsgUnlock(pMsg) funcUnlock(pMsg)
#else
-#define MsgLock(pMsg) {dbgprintf("line %d\n - ", __LINE__); funcLock(pMsg);; }
-#define MsgUnlock(pMsg) {dbgprintf("line %d - ", __LINE__); funcUnlock(pMsg); }
+#define MsgLock(pMsg) {dbgprintf("MsgLock line %d\n - ", __LINE__); funcLock(pMsg);; }
+#define MsgUnlock(pMsg) {dbgprintf("MsgUnlock line %d - ", __LINE__); funcUnlock(pMsg); }
#endif
/* the next function is a dummy to be used by the looking functions
@@ -242,12 +242,21 @@ rsRetVal MsgEnableThreadSafety(void)
/* end locking functions */
-/* "Constructor" for a msg "object". Returns a pointer to
+/* This is common code for all Constructors. It is defined in an
+ * inline'able function so that we can save a function call in the
+ * actual constructors (otherwise, the msgConstruct would need
+ * to call msgConstructWithTime(), which would require a
+ * function call). Now, both can use this inline function. This
+ * enables us to be optimal, but still have the code just once.
* the new object or NULL if no such object could be allocated.
* An object constructed via this function should only be destroyed
- * via "msgDestruct()".
+ * via "msgDestruct()". This constructor does not query system time
+ * itself but rather uses a user-supplied value. This enables the caller
+ * to do some tricks to save processing time (done, for example, in the
+ * udp input).
+ * rgerhards, 2008-10-06
*/
-rsRetVal msgConstruct(msg_t **ppThis)
+static inline rsRetVal msgBaseConstruct(msg_t **ppThis)
{
DEFiRet;
msg_t *pM;
@@ -260,21 +269,59 @@ rsRetVal msgConstruct(msg_t **ppThis)
pM->iRefCount = 1;
pM->iSeverity = -1;
pM->iFacility = -1;
+ objConstructSetObjInfo(pM);
+
+ /* DEV debugging only! dbgprintf("msgConstruct\t0x%x, ref 1\n", (int)pM);*/
+
+ *ppThis = pM;
+
+finalize_it:
+ RETiRet;
+}
+
+
+/* "Constructor" for a msg "object". Returns a pointer to
+ * the new object or NULL if no such object could be allocated.
+ * An object constructed via this function should only be destroyed
+ * via "msgDestruct()". This constructor does not query system time
+ * itself but rather uses a user-supplied value. This enables the caller
+ * to do some tricks to save processing time (done, for example, in the
+ * udp input).
+ * rgerhards, 2008-10-06
+ */
+rsRetVal msgConstructWithTime(msg_t **ppThis, struct syslogTime *stTime, time_t ttGenTime)
+{
+ DEFiRet;
+
+ CHKiRet(msgBaseConstruct(ppThis));
+ (*ppThis)->ttGenTime = ttGenTime;
+ memcpy(&(*ppThis)->tRcvdAt, stTime, sizeof(struct syslogTime));
+ memcpy(&(*ppThis)->tTIMESTAMP, stTime, sizeof(struct syslogTime));
+
+finalize_it:
+ RETiRet;
+}
+
+
+/* "Constructor" for a msg "object". Returns a pointer to
+ * the new object or NULL if no such object could be allocated.
+ * An object constructed via this function should only be destroyed
+ * via "msgDestruct()". This constructor, for historical reasons,
+ * also sets the two timestamps to the current time.
+ */
+rsRetVal msgConstruct(msg_t **ppThis)
+{
+ DEFiRet;
+ CHKiRet(msgBaseConstruct(ppThis));
/* we initialize both timestamps to contain the current time, so that they
* are consistent. Also, this saves us from doing any further time calls just
* to obtain a timestamp. The memcpy() should not really make a difference,
* especially as I think there is no codepath currently where it would not be
* required (after I have cleaned up the pathes ;)). -- rgerhards, 2008-10-02
*/
- datetime.getCurrTime(&(pM->tRcvdAt));
- memcpy(&pM->tTIMESTAMP, &pM->tRcvdAt, sizeof(struct syslogTime));
-
- objConstructSetObjInfo(pM);
-
- /* DEV debugging only! dbgprintf("msgConstruct\t0x%x, ref 1\n", (int)pM);*/
-
- *ppThis = pM;
+ datetime.getCurrTime(&((*ppThis)->tRcvdAt), &((*ppThis)->ttGenTime));
+ memcpy(&(*ppThis)->tTIMESTAMP, &(*ppThis)->tRcvdAt, sizeof(struct syslogTime));
finalize_it:
RETiRet;
@@ -400,7 +447,7 @@ msg_t* MsgDup(msg_t* pOld)
assert(pOld != NULL);
BEGINfunc
- if(msgConstruct(&pNew) != RS_RET_OK) {
+ if(msgConstructWithTime(&pNew, &pOld->tTIMESTAMP, pOld->ttGenTime) != RS_RET_OK) {
return NULL;
}
@@ -411,8 +458,7 @@ msg_t* MsgDup(msg_t* pOld)
pNew->bParseHOSTNAME = pOld->bParseHOSTNAME;
pNew->msgFlags = pOld->msgFlags;
pNew->iProtocolVersion = pOld->iProtocolVersion;
- memcpy(&pNew->tRcvdAt, &pOld->tRcvdAt, sizeof(struct syslogTime));
- memcpy(&pNew->tTIMESTAMP, &pOld->tTIMESTAMP, sizeof(struct syslogTime));
+ pNew->ttGenTime = pOld->ttGenTime;
tmpCOPYSZ(Severity);
tmpCOPYSZ(SeverityStr);
tmpCOPYSZ(Facility);
@@ -466,6 +512,7 @@ static rsRetVal MsgSerialize(msg_t *pThis, strm_t *pStrm)
objSerializeSCALAR(pStrm, iSeverity, SHORT);
objSerializeSCALAR(pStrm, iFacility, SHORT);
objSerializeSCALAR(pStrm, msgFlags, INT);
+ objSerializeSCALAR(pStrm, ttGenTime, INT);
objSerializeSCALAR(pStrm, tRcvdAt, SYSLOGTIME);
objSerializeSCALAR(pStrm, tTIMESTAMP, SYSLOGTIME);
@@ -702,6 +749,7 @@ char *getMSG(msg_t *pM)
char *getPRI(msg_t *pM)
{
int pri;
+ BEGINfunc
if(pM == NULL)
return "";
@@ -721,6 +769,7 @@ char *getPRI(msg_t *pM)
}
MsgUnlock(pM);
+ ENDfunc
return (char*)pM->pszPRI;
}
@@ -735,6 +784,7 @@ int getPRIi(msg_t *pM)
char *getTimeReported(msg_t *pM, enum tplFormatTypes eFmt)
{
+ BEGINfunc
if(pM == NULL)
return "";
@@ -806,11 +856,13 @@ char *getTimeReported(msg_t *pM, enum tplFormatTypes eFmt)
MsgUnlock(pM);
return(pM->pszTIMESTAMP_SecFrac);
}
+ ENDfunc
return "INVALID eFmt OPTION!";
}
char *getTimeGenerated(msg_t *pM, enum tplFormatTypes eFmt)
{
+ BEGINfunc
if(pM == NULL)
return "";
@@ -882,6 +934,7 @@ char *getTimeGenerated(msg_t *pM, enum tplFormatTypes eFmt)
MsgUnlock(pM);
return(pM->pszRcvdAt_SecFrac);
}
+ ENDfunc
return "INVALID eFmt OPTION!";
}
@@ -1629,7 +1682,7 @@ static uchar *getNOW(eNOWType eNow)
return NULL;
}
- datetime.getCurrTime(&t);
+ datetime.getCurrTime(&t, NULL);
switch(eNow) {
case NOW_NOW:
snprintf((char*) pBuf, tmpBUFSIZE, "%4.4d-%2.2d-%2.2d", t.year, t.month, t.day);
@@ -2354,6 +2407,40 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
}
}
+ /* finally, we need to check if the property should be formatted in CSV
+ * format (we use RFC 4180, and always use double quotes). As of this writing,
+ * this should be the last action carried out on the property, but in the
+ * future there may be reasons to change that. -- rgerhards, 2009-04-02
+ */
+ if(pTpe->data.field.options.bCSV) {
+ /* we need to obtain a private copy, as we need to at least add the double quotes */
+ int iBufLen = strlen(pRes);
+ char *pBStart;
+ char *pDst;
+ char *pSrc;
+ /* the malloc may be optimized, we currently use the worst case... */
+ pBStart = pDst = malloc((2 * iBufLen + 3) * sizeof(char));
+ if(pDst == NULL) {
+ if(*pbMustBeFreed == 1)
+ free(pRes);
+ *pbMustBeFreed = 0;
+ return "**OUT OF MEMORY**";
+ }
+ pSrc = pRes;
+ *pDst++ = '"'; /* starting quote */
+ while(*pSrc) {
+ if(*pSrc == '"')
+ *pDst++ = '"'; /* need to add double double quote (see RFC4180) */
+ *pDst++ = *pSrc++;
+ }
+ *pDst++ = '"'; /* ending quote */
+ *pDst = '\0';
+ if(*pbMustBeFreed == 1)
+ free(pRes);
+ pRes = pBStart;
+ *pbMustBeFreed = 1;
+ }
+
/*dbgprintf("MsgGetProp(\"%s\"): \"%s\"\n", pName, pRes); only for verbose debug logging */
return(pRes);
}
@@ -2445,6 +2532,8 @@ rsRetVal MsgSetProperty(msg_t *pThis, var_t *pProp)
MsgSetPROCID(pThis, (char*) rsCStrGetSzStrNoNULL(pProp->val.pStr));
} else if(isProp("pCSMSGID")) {
MsgSetMSGID(pThis, (char*) rsCStrGetSzStrNoNULL(pProp->val.pStr));
+ } else if(isProp("ttGenTime")) {
+ pThis->ttGenTime = pProp->val.num;
} else if(isProp("tRcvdAt")) {
memcpy(&pThis->tRcvdAt, &pProp->val.vSyslogTime, sizeof(struct syslogTime));
} else if(isProp("tTIMESTAMP")) {
@@ -2506,7 +2595,5 @@ BEGINObjClassInit(msg, 1, OBJ_IS_CORE_MODULE)
funcDeleteMutex = MsgLockingDummy;
funcMsgPrepareEnqueue = MsgLockingDummy;
ENDObjClassInit(msg)
-
-/*
- * vi:set ai:
+/* vim:set ai:
*/
diff --git a/runtime/msg.h b/runtime/msg.h
index 1bad9c66..c8350626 100644
--- a/runtime/msg.h
+++ b/runtime/msg.h
@@ -53,7 +53,9 @@ struct msg {
pthread_mutexattr_t mutAttr;
short bDoLock; /* use the mutex? */
pthread_mutex_t mut;
- int iRefCount; /* reference counter (0 = unused) */
+ flowControl_t flowCtlType; /**< type of flow control we can apply, for enqueueing, needs not to be persisted because
+ once data has entered the queue, this property is no longer needed. */
+ short iRefCount; /* reference counter (0 = unused) */
short bParseHOSTNAME; /* should the hostname be parsed from the message? */
/* background: the hostname is not present on "regular" messages
* received via UNIX domain sockets from the same machine. However,
@@ -61,8 +63,6 @@ short bDoLock; /* use the mutex? */
* sockets. All in all, the parser would need parse templates, that would
* resolve all these issues... rgerhards, 2005-10-06
*/
- flowControl_t flowCtlType; /**< type of flow control we can apply, for enqueueing, needs not to be persisted because
- once data has entered the queue, this property is no longer needed. */
short iSeverity; /* the severity 0..7 */
uchar *pszSeverity; /* severity as string... */
int iLenSeverity; /* ... and its length. */
@@ -100,6 +100,13 @@ short bDoLock; /* use the mutex? */
cstr_t *pCSAPPNAME; /* APP-NAME */
cstr_t *pCSPROCID; /* PROCID */
cstr_t *pCSMSGID; /* MSGID */
+ time_t ttGenTime; /* time msg object was generated, same as tRcvdAt, but a Unix timestamp.
+ While this field looks redundant, it is required because a Unix timestamp
+ is used at later processing stages (namely in the output arena). Thanks to
+ the subleties of how time is defined, there is no reliable way to reconstruct
+ the Unix timestamp from the syslogTime fields (in practice, we may be close
+ enough to reliable, but I prefer to leave the subtle things to the OS, where
+ it obviously is solved in way or another...). */
struct syslogTime tRcvdAt;/* time the message entered this program */
char *pszRcvdAt3164; /* time as RFC3164 formatted string (always 15 charcters) */
char *pszRcvdAt3339; /* time as RFC3164 formatted string (32 charcters at most) */
@@ -115,11 +122,24 @@ short bDoLock; /* use the mutex? */
int msgFlags; /* flags associated with this message */
};
+
+/* message flags (msgFlags), not an enum for historical reasons
+ */
+#define NOFLAG 0x000 /* no flag is set (to be used when a flag must be specified and none is required) */
+#define INTERNAL_MSG 0x001 /* msg generated by logmsgInternal() --> special handling */
+/* 0x002 not used because it was previously a known value - rgerhards, 2008-10-09 */
+#define IGNDATE 0x004 /* ignore, if given, date in message and use date of reception as msg date */
+#define MARK 0x008 /* this message is a mark */
+#define NEEDS_PARSING 0x010 /* raw message, must be parsed before processing can be done */
+#define PARSE_HOSTNAME 0x020 /* parse the hostname during message parsing */
+
+
/* function prototypes
*/
PROTOTYPEObjClassInit(msg);
char* getProgramName(msg_t*);
rsRetVal msgConstruct(msg_t **ppThis);
+rsRetVal msgConstructWithTime(msg_t **ppThis, struct syslogTime *stTime, time_t ttGenTime);
rsRetVal msgDestruct(msg_t **ppM);
msg_t* MsgDup(msg_t* pOld);
msg_t *MsgAddRef(msg_t *pM);
@@ -181,6 +201,5 @@ extern void (*funcMsgPrepareEnqueue)(msg_t *pMsg);
#define MsgPrepareEnqueue(pMsg) funcMsgPrepareEnqueue(pMsg)
#endif /* #ifndef MSG_H_INCLUDED */
-/*
- * vi:set ai:
+/* vim:set ai:
*/
diff --git a/runtime/net.c b/runtime/net.c
index f289ecc5..db2d7e37 100644
--- a/runtime/net.c
+++ b/runtime/net.c
@@ -63,6 +63,11 @@
#include "errmsg.h"
#include "net.h"
+#ifdef OS_SOLARIS
+# define s6_addr32 _S6_un._S6_u32
+ typedef unsigned int u_int32_t;
+#endif
+
MODULE_TYPE_LIB
/* static data */
@@ -1233,7 +1238,9 @@ rsRetVal cvthname(struct sockaddr_storage *f, uchar *pszHost, uchar *pszHostFQDN
* make this in option in the long term. (rgerhards, 2007-09-11)
*/
strcpy((char*)pszHost, (char*)pszHostFQDN);
- if ((p = (uchar*) strchr((char*)pszHost, '.'))) { /* find start of domain name "machine.example.com" */
+ if( (glbl.GetPreserveFQDN() == 0)
+ && (p = (uchar*) strchr((char*)pszHost, '.'))) { /* find start of domain name "machine.example.com" */
+ strcmp((char*)(p + 1), (char*)glbl.GetLocalDomain());
if(strcmp((char*)(p + 1), (char*)glbl.GetLocalDomain()) == 0) {
*p = '\0'; /* simply terminate the string */
} else {
diff --git a/runtime/objomsr.c b/runtime/objomsr.c
index 21d284f3..8dddc4b8 100644
--- a/runtime/objomsr.c
+++ b/runtime/objomsr.c
@@ -141,5 +141,23 @@ int OMSRgetEntry(omodStringRequest_t *pThis, int iEntry, uchar **ppTplName, int
return RS_RET_OK;
}
+
+
+/* return the full set of template options that are supported by this version of
+ * OMSR. They are returned in an unsigned long value. The caller can mask that
+ * value to check on the option he is interested in.
+ * Note that this interface was added in 4.1.6, so a plugin must obtain a pointer
+ * to this interface via queryHostEtryPt().
+ * rgerhards, 2009-04-03
+ */
+rsRetVal
+OMSRgetSupportedTplOpts(unsigned long *pOpts)
+{
+ DEFiRet;
+ assert(pOpts != NULL);
+ *pOpts = OMSR_RQD_TPL_OPT_SQL | OMSR_TPL_AS_ARRAY;
+ RETiRet;
+}
+
/* vim:set ai:
*/
diff --git a/runtime/objomsr.h b/runtime/objomsr.h
index 2255e4f3..75ad0fb8 100644
--- a/runtime/objomsr.h
+++ b/runtime/objomsr.h
@@ -1,6 +1,6 @@
/* Definition of the omsr (omodStringRequest) object.
*
- * Copyright 2007 Rainer Gerhards and Adiscon GmbH.
+ * Copyright 2007, 2009 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of the rsyslog runtime library.
*
@@ -27,7 +27,8 @@
/* define flags for required template options */
#define OMSR_NO_RQD_TPL_OPTS 0
#define OMSR_RQD_TPL_OPT_SQL 1
-/* next option is 2, 4, 8, ... */
+#define OMSR_TPL_AS_ARRAY 2 /* introduced in 4.1.6, 2009-04-03 */
+/* next option is 4, 8, 16, ... */
struct omodStringRequest_s { /* strings requested by output module for doAction() */
int iNumEntries; /* number of array entries for data elements below */
@@ -40,6 +41,7 @@ typedef struct omodStringRequest_s omodStringRequest_t;
rsRetVal OMSRdestruct(omodStringRequest_t *pThis);
rsRetVal OMSRconstruct(omodStringRequest_t **ppThis, int iNumEntries);
rsRetVal OMSRsetEntry(omodStringRequest_t *pThis, int iEntry, uchar *pTplName, int iTplOpts);
+rsRetVal OMSRgetSupportedTplOpts(unsigned long *pOpts);
int OMSRgetEntryCount(omodStringRequest_t *pThis);
int OMSRgetEntry(omodStringRequest_t *pThis, int iEntry, uchar **ppTplName, int *piTplOpts);
diff --git a/runtime/parser.c b/runtime/parser.c
new file mode 100644
index 00000000..b4ab0a3e
--- /dev/null
+++ b/runtime/parser.c
@@ -0,0 +1,315 @@
+/* parser.c
+ * This module contains functions for message parsers. It still needs to be
+ * converted into an object (and much extended).
+ *
+ * Module begun 2008-10-09 by Rainer Gerhards (based on previous code from syslogd.c)
+ *
+ * Copyright 2008 Rainer Gerhards and Adiscon GmbH.
+ *
+ * This file is part of the rsyslog runtime library.
+ *
+ * The rsyslog runtime library is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * The rsyslog runtime library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with the rsyslog runtime library. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * A copy of the GPL can be found in the file "COPYING" in this distribution.
+ * A copy of the LGPL can be found in the file "COPYING.LESSER" in this distribution.
+ */
+#include "config.h"
+#include <stdlib.h>
+#include <ctype.h>
+#include <string.h>
+#include <assert.h>
+#ifdef USE_NETZIP
+#include <zlib.h>
+#endif
+
+#include "rsyslog.h"
+#include "dirty.h"
+#include "msg.h"
+#include "obj.h"
+#include "errmsg.h"
+
+/* some defines */
+#define DEFUPRI (LOG_USER|LOG_NOTICE)
+
+/* definitions for objects we access */
+DEFobjStaticHelpers
+DEFobjCurrIf(glbl)
+DEFobjCurrIf(errmsg)
+
+/* static data */
+
+
+/* this is a dummy class init
+ */
+rsRetVal parserClassInit(void)
+{
+ DEFiRet;
+
+ /* request objects we use */
+ CHKiRet(objGetObjInterface(&obj)); /* this provides the root pointer for all other queries */
+ CHKiRet(objUse(glbl, CORE_COMPONENT));
+ CHKiRet(objUse(errmsg, CORE_COMPONENT));
+// TODO: free components! see action.c
+finalize_it:
+ RETiRet;
+}
+
+
+/* uncompress a received message if it is compressed.
+ * pMsg->pszRawMsg buffer is updated.
+ * rgerhards, 2008-10-09
+ */
+static inline rsRetVal uncompressMessage(msg_t *pMsg)
+{
+ DEFiRet;
+# ifdef USE_NETZIP
+ uchar *deflateBuf = NULL;
+ uLongf iLenDefBuf;
+ uchar *pszMsg;
+ size_t lenMsg;
+
+ assert(pMsg != NULL);
+ pszMsg = pMsg->pszRawMsg;
+ lenMsg = pMsg->iLenRawMsg;
+
+ /* we first need to check if we have a compressed record. If so,
+ * we must decompress it.
+ */
+ if(lenMsg > 0 && *pszMsg == 'z') { /* compressed data present? (do NOT change order if conditions!) */
+ /* we have compressed data, so let's deflate it. We support a maximum
+ * message size of iMaxLine. If it is larger, an error message is logged
+ * and the message is dropped. We do NOT try to decompress larger messages
+ * as such might be used for denial of service. It might happen to later
+ * builds that such functionality be added as an optional, operator-configurable
+ * feature.
+ */
+ int ret;
+ iLenDefBuf = glbl.GetMaxLine();
+ CHKmalloc(deflateBuf = malloc(sizeof(uchar) * (iLenDefBuf + 1)));
+ ret = uncompress((uchar *) deflateBuf, &iLenDefBuf, (uchar *) pszMsg+1, lenMsg-1);
+ DBGPRINTF("Compressed message uncompressed with status %d, length: new %ld, old %d.\n",
+ ret, (long) iLenDefBuf, (int) (lenMsg-1));
+ /* Now check if the uncompression worked. If not, there is not much we can do. In
+ * that case, we log an error message but ignore the message itself. Storing the
+ * compressed text is dangerous, as it contains control characters. So we do
+ * not do this. If someone would like to have a copy, this code here could be
+ * modified to do a hex-dump of the buffer in question. We do not include
+ * this functionality right now.
+ * rgerhards, 2006-12-07
+ */
+ if(ret != Z_OK) {
+ errmsg.LogError(0, NO_ERRCODE, "Uncompression of a message failed with return code %d "
+ "- enable debug logging if you need further information. "
+ "Message ignored.", ret);
+ FINALIZE; /* unconditional exit, nothing left to do... */
+ }
+ free(pMsg->pszRawMsg);
+ pMsg->pszRawMsg = deflateBuf;
+ pMsg->iLenRawMsg = iLenDefBuf;
+ deflateBuf = NULL; /* logically "freed" - caller is now responsible */
+ }
+finalize_it:
+ if(deflateBuf != NULL)
+ free(deflateBuf);
+
+# else /* ifdef USE_NETZIP */
+
+ /* in this case, we still need to check if the message is compressed. If so, we must
+ * tell the user we can not accept it.
+ */
+ if(pMsg->iLenRawMsg > 0 && *pMsg->pszRawMsg == 'z') {
+ errmsg.LogError(0, NO_ERRCODE, "Received a compressed message, but rsyslogd does not have compression "
+ "support enabled. The message will be ignored.");
+ ABORT_FINALIZE(RS_RET_NO_ZIP);
+ }
+
+finalize_it:
+# endif /* ifdef USE_NETZIP */
+
+ RETiRet;
+}
+
+
+/* sanitize a received message
+ * if a message gets to large during sanitization, it is truncated. This is
+ * as specified in the upcoming syslog RFC series.
+ * rgerhards, 2008-10-09
+ * We check if we have a NUL character at the very end of the
+ * message. This seems to be a frequent problem with a number of senders.
+ * So I have now decided to drop these NULs. However, if they are intentional,
+ * that may cause us some problems, e.g. with syslog-sign. On the other hand,
+ * current code always has problems with intentional NULs (as it needs to escape
+ * them to prevent problems with the C string libraries), so that does not
+ * really matter. Just to be on the save side, we'll log destruction of such
+ * NULs in the debug log.
+ * rgerhards, 2007-09-14
+ */
+static inline rsRetVal
+sanitizeMessage(msg_t *pMsg)
+{
+ DEFiRet;
+ uchar *pszMsg;
+ uchar *pDst; /* destination for copy job */
+ size_t lenMsg;
+ size_t iSrc;
+ size_t iDst;
+ size_t iMaxLine;
+
+ assert(pMsg != NULL);
+
+# ifdef USE_NETZIP
+ CHKiRet(uncompressMessage(pMsg));
+# endif
+
+ pszMsg = pMsg->pszRawMsg;
+ lenMsg = pMsg->iLenRawMsg;
+
+ /* remove NUL character at end of message (see comment in function header) */
+ if(pszMsg[lenMsg-1] == '\0') {
+ DBGPRINTF("dropped NUL at very end of message\n");
+ lenMsg--;
+ }
+
+ /* then we check if we need to drop trailing LFs, which often make
+ * their way into syslog messages unintentionally. In order to remain
+ * compatible to recent IETF developments, we allow the user to
+ * turn on/off this handling. rgerhards, 2007-07-23
+ */
+ if(bDropTrailingLF && pszMsg[lenMsg-1] == '\n') {
+ DBGPRINTF("dropped LF at very end of message (DropTrailingLF is set)\n");
+ lenMsg--;
+ }
+
+ /* now copy over the message and sanitize it */
+ /* TODO: can we get cheaper memory alloc? {alloca()?}*/
+ iMaxLine = glbl.GetMaxLine();
+ CHKmalloc(pDst = malloc(sizeof(uchar) * (iMaxLine + 1)));
+ iSrc = iDst = 0;
+ while(iSrc < lenMsg && iDst < iMaxLine) {
+ if(pszMsg[iSrc] == '\0') { /* guard against \0 characters... */
+ /* changed to the sequence (somewhat) proposed in
+ * draft-ietf-syslog-protocol-19. rgerhards, 2006-11-30
+ */
+ if(iDst + 3 < iMaxLine) { /* do we have space? */
+ pDst[iDst++] = cCCEscapeChar;
+ pDst[iDst++] = '0';
+ pDst[iDst++] = '0';
+ pDst[iDst++] = '0';
+ } /* if we do not have space, we simply ignore the '\0'... */
+ /* log an error? Very questionable... rgerhards, 2006-11-30 */
+ /* decided: we do not log an error, it won't help... rger, 2007-06-21 */
+ } else if(bEscapeCCOnRcv && iscntrl((int) pszMsg[iSrc])) {
+ /* we are configured to escape control characters. Please note
+ * that this most probably break non-western character sets like
+ * Japanese, Korean or Chinese. rgerhards, 2007-07-17
+ * Note: sysklogd logs octal values only for DEL and CCs above 127.
+ * For others, it logs ^n where n is the control char converted to an
+ * alphabet character. We like consistency and thus escape it to octal
+ * in all cases. If someone complains, we may change the mode. At least
+ * we known now what's going on.
+ * rgerhards, 2007-07-17
+ */
+ if(iDst + 3 < iMaxLine) { /* do we have space? */
+ pDst[iDst++] = cCCEscapeChar;
+ pDst[iDst++] = '0' + ((pszMsg[iSrc] & 0300) >> 6);
+ pDst[iDst++] = '0' + ((pszMsg[iSrc] & 0070) >> 3);
+ pDst[iDst++] = '0' + ((pszMsg[iSrc] & 0007));
+ } /* again, if we do not have space, we ignore the char - see comment at '\0' */
+ } else {
+ pDst[iDst++] = pszMsg[iSrc];
+ }
+ ++iSrc;
+ }
+ pDst[iDst] = '\0'; /* space *is* reserved for this! */
+
+ /* we have a sanitized string. Let's save it now */
+ free(pMsg->pszRawMsg);
+ if((pMsg->pszRawMsg = malloc((iDst+1) * sizeof(uchar))) == NULL) {
+ /* when we get no new buffer, we use what we already have ;) */
+ pMsg->pszRawMsg = pDst;
+ } else {
+ /* trim buffer */
+ memcpy(pMsg->pszRawMsg, pDst, iDst+1);
+ free(pDst); /* too big! */
+ pMsg->iLenRawMsg = iDst;
+ }
+
+finalize_it:
+ RETiRet;
+}
+
+/* Parse a received message. The object's rawmsg property is taken and
+ * parsed according to the relevant standards. This can later be
+ * extended to support configured parsers.
+ * rgerhards, 2008-10-09
+ */
+rsRetVal parseMsg(msg_t *pMsg)
+{
+ DEFiRet;
+ uchar *msg;
+ int pri;
+
+ CHKiRet(sanitizeMessage(pMsg));
+
+ /* we needed to sanitize first, because we otherwise do not have a C-string we can print... */
+ DBGPRINTF("msg parser: flags %x, from '%s', msg '%s'\n", pMsg->msgFlags, pMsg->pszRcvFrom, pMsg->pszRawMsg);
+
+ /* pull PRI */
+ pri = DEFUPRI;
+ msg = pMsg->pszRawMsg;
+ if(*msg == '<') {
+ pri = 0;
+ while(isdigit((int) *++msg)) {
+ pri = 10 * pri + (*msg - '0');
+ }
+ if(*msg == '>')
+ ++msg;
+ if(pri & ~(LOG_FACMASK|LOG_PRIMASK))
+ pri = DEFUPRI;
+ }
+ pMsg->iFacility = LOG_FAC(pri);
+ pMsg->iSeverity = LOG_PRI(pri);
+ MsgSetUxTradMsg(pMsg, (char*) msg);
+
+ if(pMsg->bParseHOSTNAME == 0)
+ MsgSetHOSTNAME(pMsg, (char*) pMsg->pszRcvFrom);
+
+ /* rger 2005-11-24 (happy thanksgiving!): we now need to check if we have
+ * a traditional syslog message or one formatted according to syslog-protocol.
+ * We need to apply different parsers depending on that. We use the
+ * -protocol VERSION field for the detection.
+ */
+ if(msg[0] == '1' && msg[1] == ' ') {
+ dbgprintf("Message has syslog-protocol format.\n");
+ setProtocolVersion(pMsg, 1);
+ if(parseRFCSyslogMsg(pMsg, pMsg->msgFlags) == 1) {
+ msgDestruct(&pMsg);
+ ABORT_FINALIZE(RS_RET_ERR); // TODO: we need to handle these cases!
+ }
+ } else { /* we have legacy syslog */
+ dbgprintf("Message has legacy syslog format.\n");
+ setProtocolVersion(pMsg, 0);
+ if(parseLegacySyslogMsg(pMsg, pMsg->msgFlags) == 1) {
+ msgDestruct(&pMsg);
+ ABORT_FINALIZE(RS_RET_ERR); // TODO: we need to handle these cases!
+ }
+ }
+
+ /* finalize message object */
+ pMsg->msgFlags &= ~NEEDS_PARSING; /* this message is now parsed */
+ MsgPrepareEnqueue(pMsg); /* "historical" name - preparese for multi-threading */
+
+finalize_it:
+ RETiRet;
+}
diff --git a/runtime/parser.h b/runtime/parser.h
new file mode 100644
index 00000000..cec9c083
--- /dev/null
+++ b/runtime/parser.h
@@ -0,0 +1,30 @@
+/* header for parser.c
+ * This is not yet an object, but contains all those code necessary to
+ * parse syslog messages.
+ *
+ * Copyright 2008 Rainer Gerhards and Adiscon GmbH.
+ *
+ * This file is part of the rsyslog runtime library.
+ *
+ * The rsyslog runtime library is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * The rsyslog runtime library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with the rsyslog runtime library. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * A copy of the LGPL can be found in the file "COPYING.LESSER" in this distribution.
+ */
+#ifndef INCLUDED_PARSE_H
+#define INCLUDED_PARSE_H
+
+extern rsRetVal parserClassInit(void);
+extern rsRetVal parseMsg(msg_t*);
+
+#endif /* #ifndef INCLUDED_PARSE_H */
diff --git a/runtime/queue.c b/runtime/queue.c
index 93b33967..c4a0fad2 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -49,20 +49,26 @@
#include "obj.h"
#include "wtp.h"
#include "wti.h"
+#include "atomic.h"
+
+#ifdef OS_SOLARIS
+# include <sched.h>
+# define pthread_yield() sched_yield()
+#endif
/* static data */
DEFobjStaticHelpers
DEFobjCurrIf(glbl)
/* forward-definitions */
-rsRetVal queueChkPersist(queue_t *pThis);
-static rsRetVal queueSetEnqOnly(queue_t *pThis, int bEnqOnly, int bLockMutex);
-static rsRetVal queueRateLimiter(queue_t *pThis);
-static int queueChkStopWrkrDA(queue_t *pThis);
-static int queueIsIdleDA(queue_t *pThis);
-static rsRetVal queueConsumerDA(queue_t *pThis, wti_t *pWti, int iCancelStateSave);
-static rsRetVal queueConsumerCancelCleanup(void *arg1, void *arg2);
-static rsRetVal queueUngetObj(queue_t *pThis, obj_t *pUsr, int bLockMutex);
+rsRetVal qqueueChkPersist(qqueue_t *pThis);
+static rsRetVal qqueueSetEnqOnly(qqueue_t *pThis, int bEnqOnly, int bLockMutex);
+static rsRetVal qqueueRateLimiter(qqueue_t *pThis);
+static int qqueueChkStopWrkrDA(qqueue_t *pThis);
+static int qqueueIsIdleDA(qqueue_t *pThis);
+static rsRetVal qqueueConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave);
+static rsRetVal qqueueConsumerCancelCleanup(void *arg1, void *arg2);
+static rsRetVal qqueueUngetObj(qqueue_t *pThis, obj_t *pUsr, int bLockMutex);
/* some constants for queuePersist () */
#define QUEUE_CHECKPOINT 1
@@ -76,7 +82,7 @@ static rsRetVal queueUngetObj(queue_t *pThis, obj_t *pUsr, int bLockMutex);
* rgerhards, 2008-01-29
*/
static inline int
-queueGetOverallQueueSize(queue_t *pThis)
+qqueueGetOverallQueueSize(qqueue_t *pThis)
{
#if 0 /* leave a bit in for debugging -- rgerhards, 2008-01-30 */
BEGINfunc
@@ -95,7 +101,7 @@ ENDfunc
* This function returns void, as it makes no sense to communicate an error back, even if
* it happens.
*/
-static inline void queueDrain(queue_t *pThis)
+static inline void queueDrain(qqueue_t *pThis)
{
void *pUsr;
@@ -118,26 +124,26 @@ static inline void queueDrain(queue_t *pThis)
* this point in time. The mutex must be locked when
* ths function is called. -- rgerhards, 2008-01-25
*/
-static inline rsRetVal queueAdviseMaxWorkers(queue_t *pThis)
+static inline rsRetVal qqueueAdviseMaxWorkers(qqueue_t *pThis)
{
DEFiRet;
int iMaxWorkers;
- ISOBJ_TYPE_assert(pThis, queue);
+ ISOBJ_TYPE_assert(pThis, qqueue);
if(!pThis->bEnqOnly) {
if(pThis->bRunsDA) {
/* if we have not yet reached the high water mark, there is no need to start a
* worker. -- rgerhards, 2008-01-26
*/
- if(queueGetOverallQueueSize(pThis) >= pThis->iHighWtrMrk || pThis->bQueueStarted == 0) {
+ if(qqueueGetOverallQueueSize(pThis) >= pThis->iHighWtrMrk || pThis->bQueueStarted == 0) {
wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* disk queues have always one worker */
}
} else {
if(pThis->qType == QUEUETYPE_DISK || pThis->iMinMsgsPerWrkr == 0) {
iMaxWorkers = 1;
} else {
- iMaxWorkers = queueGetOverallQueueSize(pThis) / pThis->iMinMsgsPerWrkr + 1;
+ iMaxWorkers = qqueueGetOverallQueueSize(pThis) / pThis->iMinMsgsPerWrkr + 1;
}
wtpAdviseMaxWorkers(pThis->pWtpReg, iMaxWorkers); /* disk queues have always one worker */
}
@@ -152,11 +158,11 @@ static inline rsRetVal queueAdviseMaxWorkers(queue_t *pThis)
* rgerhards, 2008-02-27
*/
static rsRetVal
-queueWaitDAModeInitialized(queue_t *pThis)
+qqueueWaitDAModeInitialized(qqueue_t *pThis)
{
DEFiRet;
- ISOBJ_TYPE_assert(pThis, queue);
+ ISOBJ_TYPE_assert(pThis, qqueue);
ASSERT(pThis->bRunsDA);
while(pThis->bRunsDA != 2) {
@@ -178,17 +184,17 @@ queueWaitDAModeInitialized(queue_t *pThis)
* rgerhards, 2008-01-15
*/
static rsRetVal
-queueTurnOffDAMode(queue_t *pThis)
+qqueueTurnOffDAMode(qqueue_t *pThis)
{
DEFiRet;
- ISOBJ_TYPE_assert(pThis, queue);
+ ISOBJ_TYPE_assert(pThis, qqueue);
ASSERT(pThis->bRunsDA);
/* at this point, we need a fully initialized DA queue. So if it isn't, we finally need
* to wait for its startup... -- rgerhards, 2008-01-25
*/
- queueWaitDAModeInitialized(pThis);
+ qqueueWaitDAModeInitialized(pThis);
/* if we need to pull any data that we still need from the (child) disk queue,
* now would be the time to do so. At present, we do not need this, but I'd like to
@@ -207,15 +213,15 @@ queueTurnOffDAMode(queue_t *pThis)
/* we destruct the queue object, which will also shutdown the queue worker. As the queue is empty,
* this will be quick.
*/
- queueDestruct(&pThis->pqDA); /* and now we are ready to destruct the DA queue */
+ qqueueDestruct(&pThis->pqDA); /* and now we are ready to destruct the DA queue */
dbgoprint((obj_t*) pThis, "disk-assistance has been turned off, disk queue was empty (iRet %d)\n",
iRet);
/* now we need to check if the regular queue has some messages. This may be the case
* when it is waiting that the high water mark is reached again. If so, we need to start up
* a regular worker. -- rgerhards, 2008-01-26
*/
- if(queueGetOverallQueueSize(pThis) > 0) {
- queueAdviseMaxWorkers(pThis);
+ if(qqueueGetOverallQueueSize(pThis) > 0) {
+ qqueueAdviseMaxWorkers(pThis);
}
}
@@ -231,11 +237,11 @@ queueTurnOffDAMode(queue_t *pThis)
* rgerhards, 2008-01-14
*/
static rsRetVal
-queueChkIsDA(queue_t *pThis)
+qqueueChkIsDA(qqueue_t *pThis)
{
DEFiRet;
- ISOBJ_TYPE_assert(pThis, queue);
+ ISOBJ_TYPE_assert(pThis, qqueue);
if(pThis->pszFilePrefix != NULL) {
pThis->bIsDA = 1;
dbgoprint((obj_t*) pThis, "is disk-assisted, disk will be used on demand\n");
@@ -259,18 +265,18 @@ queueChkIsDA(queue_t *pThis)
* rgerhards, 2008-01-15
*/
static rsRetVal
-queueStartDA(queue_t *pThis)
+qqueueStartDA(qqueue_t *pThis)
{
DEFiRet;
uchar pszDAQName[128];
- ISOBJ_TYPE_assert(pThis, queue);
+ ISOBJ_TYPE_assert(pThis, qqueue);
if(pThis->bRunsDA == 2) /* check if already in (fully initialized) DA mode... */
FINALIZE; /* ... then we are already done! */
/* create message queue */
- CHKiRet(queueConstruct(&pThis->pqDA, QUEUETYPE_DISK , 1, 0, pThis->pConsumer));
+ CHKiRet(qqueueConstruct(&pThis->pqDA, QUEUETYPE_DISK , 1, 0, pThis->pConsumer));
/* give it a name */
snprintf((char*) pszDAQName, sizeof(pszDAQName)/sizeof(uchar), "%s[DA]", obj.GetName((obj_t*) pThis));
@@ -281,30 +287,30 @@ queueStartDA(queue_t *pThis)
*/
pThis->pqDA->pqParent = pThis;
- CHKiRet(queueSetpUsr(pThis->pqDA, pThis->pUsr));
- CHKiRet(queueSetsizeOnDiskMax(pThis->pqDA, pThis->sizeOnDiskMax));
- CHKiRet(queueSetiDeqSlowdown(pThis->pqDA, pThis->iDeqSlowdown));
- CHKiRet(queueSetMaxFileSize(pThis->pqDA, pThis->iMaxFileSize));
- CHKiRet(queueSetFilePrefix(pThis->pqDA, pThis->pszFilePrefix, pThis->lenFilePrefix));
- CHKiRet(queueSetiPersistUpdCnt(pThis->pqDA, pThis->iPersistUpdCnt));
- CHKiRet(queueSettoActShutdown(pThis->pqDA, pThis->toActShutdown));
- CHKiRet(queueSettoEnq(pThis->pqDA, pThis->toEnq));
- CHKiRet(queueSetEnqOnly(pThis->pqDA, pThis->bDAEnqOnly, MUTEX_ALREADY_LOCKED));
- CHKiRet(queueSetiDeqtWinFromHr(pThis->pqDA, pThis->iDeqtWinFromHr));
- CHKiRet(queueSetiDeqtWinToHr(pThis->pqDA, pThis->iDeqtWinToHr));
- CHKiRet(queueSetiHighWtrMrk(pThis->pqDA, 0));
- CHKiRet(queueSetiDiscardMrk(pThis->pqDA, 0));
+ CHKiRet(qqueueSetpUsr(pThis->pqDA, pThis->pUsr));
+ CHKiRet(qqueueSetsizeOnDiskMax(pThis->pqDA, pThis->sizeOnDiskMax));
+ CHKiRet(qqueueSetiDeqSlowdown(pThis->pqDA, pThis->iDeqSlowdown));
+ CHKiRet(qqueueSetMaxFileSize(pThis->pqDA, pThis->iMaxFileSize));
+ CHKiRet(qqueueSetFilePrefix(pThis->pqDA, pThis->pszFilePrefix, pThis->lenFilePrefix));
+ CHKiRet(qqueueSetiPersistUpdCnt(pThis->pqDA, pThis->iPersistUpdCnt));
+ CHKiRet(qqueueSettoActShutdown(pThis->pqDA, pThis->toActShutdown));
+ CHKiRet(qqueueSettoEnq(pThis->pqDA, pThis->toEnq));
+ CHKiRet(qqueueSetEnqOnly(pThis->pqDA, pThis->bDAEnqOnly, MUTEX_ALREADY_LOCKED));
+ CHKiRet(qqueueSetiDeqtWinFromHr(pThis->pqDA, pThis->iDeqtWinFromHr));
+ CHKiRet(qqueueSetiDeqtWinToHr(pThis->pqDA, pThis->iDeqtWinToHr));
+ CHKiRet(qqueueSetiHighWtrMrk(pThis->pqDA, 0));
+ CHKiRet(qqueueSetiDiscardMrk(pThis->pqDA, 0));
if(pThis->toQShutdown == 0) {
- CHKiRet(queueSettoQShutdown(pThis->pqDA, 0)); /* if the user really wants... */
+ CHKiRet(qqueueSettoQShutdown(pThis->pqDA, 0)); /* if the user really wants... */
} else {
/* we use the shortest possible shutdown (0 is endless!) because when we run on disk AND
* have an obviously large backlog, we can't finish it in any case. So there is no point
* in holding shutdown longer than necessary. -- rgerhards, 2008-01-15
*/
- CHKiRet(queueSettoQShutdown(pThis->pqDA, 1));
+ CHKiRet(qqueueSettoQShutdown(pThis->pqDA, 1));
}
- iRet = queueStart(pThis->pqDA);
+ iRet = qqueueStart(pThis->pqDA);
/* file not found is expected, that means it is no previous QIF available */
if(iRet != RS_RET_OK && iRet != RS_RET_FILE_NOT_FOUND)
FINALIZE; /* something is wrong */
@@ -322,12 +328,12 @@ queueStartDA(queue_t *pThis)
pthread_cond_broadcast(&pThis->condDAReady); /* signal we are now initialized and ready to go ;) */
dbgoprint((obj_t*) pThis, "is now running in disk assisted mode, disk queue 0x%lx\n",
- queueGetID(pThis->pqDA));
+ qqueueGetID(pThis->pqDA));
finalize_it:
if(iRet != RS_RET_OK) {
if(pThis->pqDA != NULL) {
- queueDestruct(&pThis->pqDA);
+ qqueueDestruct(&pThis->pqDA);
}
dbgoprint((obj_t*) pThis, "error %d creating disk queue - giving up.\n", iRet);
pThis->bIsDA = 0;
@@ -344,7 +350,7 @@ finalize_it:
* rgerhards, 2008-01-16
*/
static inline rsRetVal
-queueInitDA(queue_t *pThis, int bEnqOnly, int bLockMutex)
+qqueueInitDA(qqueue_t *pThis, int bEnqOnly, int bLockMutex)
{
DEFiRet;
DEFVARS_mutexProtection;
@@ -362,12 +368,12 @@ queueInitDA(queue_t *pThis, int bEnqOnly, int bLockMutex)
lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s:DA", obj.GetName((obj_t*) pThis));
CHKiRet(wtpConstruct (&pThis->pWtpDA));
CHKiRet(wtpSetDbgHdr (pThis->pWtpDA, pszBuf, lenBuf));
- CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) queueChkStopWrkrDA));
- CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) queueIsIdleDA));
- CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti, int)) queueConsumerDA));
- CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void*pWti)) queueConsumerCancelCleanup));
- CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) queueStartDA));
- CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) queueTurnOffDAMode));
+ CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueChkStopWrkrDA));
+ CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueIsIdleDA));
+ CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti, int)) qqueueConsumerDA));
+ CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void*pWti)) qqueueConsumerCancelCleanup));
+ CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) qqueueStartDA));
+ CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) qqueueTurnOffDAMode));
CHKiRet(wtpSetpmutUsr (pThis->pWtpDA, pThis->mut));
CHKiRet(wtpSetpcondBusy (pThis->pWtpDA, &pThis->notEmpty));
CHKiRet(wtpSetiNumWorkerThreads (pThis->pWtpDA, 1));
@@ -400,14 +406,14 @@ finalize_it:
* rgerhards, 2008-01-14
*/
static inline rsRetVal
-queueChkStrtDA(queue_t *pThis)
+qqueueChkStrtDA(qqueue_t *pThis)
{
DEFiRet;
- ISOBJ_TYPE_assert(pThis, queue);
+ ISOBJ_TYPE_assert(pThis, qqueue);
/* if we do not hit the high water mark, we have nothing to do */
- if(queueGetOverallQueueSize(pThis) != pThis->iHighWtrMrk)
+ if(qqueueGetOverallQueueSize(pThis) != pThis->iHighWtrMrk)
ABORT_FINALIZE(RS_RET_OK);
if(pThis->bRunsDA) {
@@ -421,15 +427,15 @@ queueChkStrtDA(queue_t *pThis)
* we need at least one).
*/
dbgoprint((obj_t*) pThis, "%d entries - passed high water mark in DA mode, send notify\n",
- queueGetOverallQueueSize(pThis));
- queueAdviseMaxWorkers(pThis);
+ qqueueGetOverallQueueSize(pThis));
+ qqueueAdviseMaxWorkers(pThis);
} else {
/* this is the case when we are currently not running in DA mode. So it is time
* to turn it back on.
*/
dbgoprint((obj_t*) pThis, "%d entries - passed high water mark for disk-assisted mode, initiating...\n",
- queueGetOverallQueueSize(pThis));
- queueInitDA(pThis, QUEUE_MODE_ENQDEQ, MUTEX_ALREADY_LOCKED); /* initiate DA mode */
+ qqueueGetOverallQueueSize(pThis));
+ qqueueInitDA(pThis, QUEUE_MODE_ENQDEQ, MUTEX_ALREADY_LOCKED); /* initiate DA mode */
}
finalize_it:
@@ -447,7 +453,7 @@ finalize_it:
*/
/* -------------------- fixed array -------------------- */
-static rsRetVal qConstructFixedArray(queue_t *pThis)
+static rsRetVal qConstructFixedArray(qqueue_t *pThis)
{
DEFiRet;
@@ -463,14 +469,14 @@ static rsRetVal qConstructFixedArray(queue_t *pThis)
pThis->tVars.farray.head = 0;
pThis->tVars.farray.tail = 0;
- queueChkIsDA(pThis);
+ qqueueChkIsDA(pThis);
finalize_it:
RETiRet;
}
-static rsRetVal qDestructFixedArray(queue_t *pThis)
+static rsRetVal qDestructFixedArray(qqueue_t *pThis)
{
DEFiRet;
@@ -485,7 +491,7 @@ static rsRetVal qDestructFixedArray(queue_t *pThis)
}
-static rsRetVal qAddFixedArray(queue_t *pThis, void* in)
+static rsRetVal qAddFixedArray(qqueue_t *pThis, void* in)
{
DEFiRet;
@@ -498,7 +504,7 @@ static rsRetVal qAddFixedArray(queue_t *pThis, void* in)
RETiRet;
}
-static rsRetVal qDelFixedArray(queue_t *pThis, void **out)
+static rsRetVal qDelFixedArray(qqueue_t *pThis, void **out)
{
DEFiRet;
@@ -517,7 +523,7 @@ static rsRetVal qDelFixedArray(queue_t *pThis, void **out)
/* first some generic functions which are also used for the unget linked list */
-static inline rsRetVal queueAddLinkedList(qLinkedList_t **ppRoot, qLinkedList_t **ppLast, void* pUsr)
+static inline rsRetVal qqueueAddLinkedList(qLinkedList_t **ppRoot, qLinkedList_t **ppLast, void* pUsr)
{
DEFiRet;
qLinkedList_t *pEntry;
@@ -543,7 +549,7 @@ finalize_it:
RETiRet;
}
-static inline rsRetVal queueDelLinkedList(qLinkedList_t **ppRoot, qLinkedList_t **ppLast, obj_t **ppUsr)
+static inline rsRetVal qqueueDelLinkedList(qLinkedList_t **ppRoot, qLinkedList_t **ppLast, obj_t **ppUsr)
{
DEFiRet;
qLinkedList_t *pEntry;
@@ -570,7 +576,7 @@ static inline rsRetVal queueDelLinkedList(qLinkedList_t **ppRoot, qLinkedList_t
/* end generic functions which are also used for the unget linked list */
-static rsRetVal qConstructLinkedList(queue_t *pThis)
+static rsRetVal qConstructLinkedList(qqueue_t *pThis)
{
DEFiRet;
@@ -579,13 +585,13 @@ static rsRetVal qConstructLinkedList(queue_t *pThis)
pThis->tVars.linklist.pRoot = 0;
pThis->tVars.linklist.pLast = 0;
- queueChkIsDA(pThis);
+ qqueueChkIsDA(pThis);
RETiRet;
}
-static rsRetVal qDestructLinkedList(queue_t __attribute__((unused)) *pThis)
+static rsRetVal qDestructLinkedList(qqueue_t __attribute__((unused)) *pThis)
{
DEFiRet;
@@ -598,11 +604,11 @@ static rsRetVal qDestructLinkedList(queue_t __attribute__((unused)) *pThis)
RETiRet;
}
-static rsRetVal qAddLinkedList(queue_t *pThis, void* pUsr)
+static rsRetVal qAddLinkedList(qqueue_t *pThis, void* pUsr)
{
DEFiRet;
- iRet = queueAddLinkedList(&pThis->tVars.linklist.pRoot, &pThis->tVars.linklist.pLast, pUsr);
+ iRet = qqueueAddLinkedList(&pThis->tVars.linklist.pRoot, &pThis->tVars.linklist.pLast, pUsr);
#if 0
qLinkedList_t *pEntry;
@@ -626,10 +632,10 @@ finalize_it:
RETiRet;
}
-static rsRetVal qDelLinkedList(queue_t *pThis, obj_t **ppUsr)
+static rsRetVal qDelLinkedList(qqueue_t *pThis, obj_t **ppUsr)
{
DEFiRet;
- iRet = queueDelLinkedList(&pThis->tVars.linklist.pRoot, &pThis->tVars.linklist.pLast, ppUsr);
+ iRet = qqueueDelLinkedList(&pThis->tVars.linklist.pRoot, &pThis->tVars.linklist.pLast, ppUsr);
#if 0
qLinkedList_t *pEntry;
@@ -656,11 +662,11 @@ static rsRetVal qDelLinkedList(queue_t *pThis, obj_t **ppUsr)
static rsRetVal
-queueLoadPersStrmInfoFixup(strm_t *pStrm, queue_t __attribute__((unused)) *pThis)
+qqueueLoadPersStrmInfoFixup(strm_t *pStrm, qqueue_t __attribute__((unused)) *pThis)
{
DEFiRet;
ISOBJ_TYPE_assert(pStrm, strm);
- ISOBJ_TYPE_assert(pThis, queue);
+ ISOBJ_TYPE_assert(pThis, qqueue);
CHKiRet(strmSetDir(pStrm, glbl.GetWorkDir(), strlen((char*)glbl.GetWorkDir())));
finalize_it:
RETiRet;
@@ -672,14 +678,14 @@ finalize_it:
* rgerhards, 2008-01-15
*/
static rsRetVal
-queueHaveQIF(queue_t *pThis)
+qqueueHaveQIF(qqueue_t *pThis)
{
DEFiRet;
uchar pszQIFNam[MAXFNAME];
size_t lenQIFNam;
struct stat stat_buf;
- ISOBJ_TYPE_assert(pThis, queue);
+ ISOBJ_TYPE_assert(pThis, qqueue);
if(pThis->pszFilePrefix == NULL)
ABORT_FINALIZE(RS_RET_NO_FILEPREFIX);
@@ -709,7 +715,7 @@ finalize_it:
* rgerhards, 2008-01-11
*/
static rsRetVal
-queueTryLoadPersistedInfo(queue_t *pThis)
+qqueueTryLoadPersistedInfo(qqueue_t *pThis)
{
DEFiRet;
strm_t *psQIF = NULL;
@@ -719,7 +725,7 @@ queueTryLoadPersistedInfo(queue_t *pThis)
int iUngottenObjs;
obj_t *pUsr;
- ISOBJ_TYPE_assert(pThis, queue);
+ ISOBJ_TYPE_assert(pThis, qqueue);
/* Construct file name */
lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), "%s/%s.qi",
@@ -754,15 +760,15 @@ queueTryLoadPersistedInfo(queue_t *pThis)
while(iUngottenObjs > 0) {
/* fill the queue from disk */
CHKiRet(obj.Deserialize((void*) &pUsr, (uchar*)"msg", psQIF, NULL, NULL));
- queueUngetObj(pThis, pUsr, MUTEX_ALREADY_LOCKED);
+ qqueueUngetObj(pThis, pUsr, MUTEX_ALREADY_LOCKED);
--iUngottenObjs; /* one less */
}
/* and now the stream objects (some order as when persisted!) */
CHKiRet(obj.Deserialize(&pThis->tVars.disk.pWrite, (uchar*) "strm", psQIF,
- (rsRetVal(*)(obj_t*,void*))queueLoadPersStrmInfoFixup, pThis));
+ (rsRetVal(*)(obj_t*,void*))qqueueLoadPersStrmInfoFixup, pThis));
CHKiRet(obj.Deserialize(&pThis->tVars.disk.pRead, (uchar*) "strm", psQIF,
- (rsRetVal(*)(obj_t*,void*))queueLoadPersStrmInfoFixup, pThis));
+ (rsRetVal(*)(obj_t*,void*))qqueueLoadPersStrmInfoFixup, pThis));
CHKiRet(strmSeekCurrOffs(pThis->tVars.disk.pWrite));
CHKiRet(strmSeekCurrOffs(pThis->tVars.disk.pRead));
@@ -792,7 +798,7 @@ finalize_it:
* allowed file size at this point - that should be a config setting...
* rgerhards, 2008-01-10
*/
-static rsRetVal qConstructDisk(queue_t *pThis)
+static rsRetVal qConstructDisk(qqueue_t *pThis)
{
DEFiRet;
int bRestarted = 0;
@@ -800,7 +806,7 @@ static rsRetVal qConstructDisk(queue_t *pThis)
ASSERT(pThis != NULL);
/* and now check if there is some persistent information that needs to be read in */
- iRet = queueTryLoadPersistedInfo(pThis);
+ iRet = qqueueTryLoadPersistedInfo(pThis);
if(iRet == RS_RET_OK)
bRestarted = 1;
else if(iRet != RS_RET_FILE_NOT_FOUND)
@@ -842,7 +848,7 @@ finalize_it:
}
-static rsRetVal qDestructDisk(queue_t *pThis)
+static rsRetVal qDestructDisk(qqueue_t *pThis)
{
DEFiRet;
@@ -854,7 +860,7 @@ static rsRetVal qDestructDisk(queue_t *pThis)
RETiRet;
}
-static rsRetVal qAddDisk(queue_t *pThis, void* pUsr)
+static rsRetVal qAddDisk(qqueue_t *pThis, void* pUsr)
{
DEFiRet;
number_t nWriteCount;
@@ -881,7 +887,7 @@ finalize_it:
RETiRet;
}
-static rsRetVal qDelDisk(queue_t *pThis, void **ppUsr)
+static rsRetVal qDelDisk(qqueue_t *pThis, void **ppUsr)
{
DEFiRet;
@@ -912,18 +918,18 @@ finalize_it:
}
/* -------------------- direct (no queueing) -------------------- */
-static rsRetVal qConstructDirect(queue_t __attribute__((unused)) *pThis)
+static rsRetVal qConstructDirect(qqueue_t __attribute__((unused)) *pThis)
{
return RS_RET_OK;
}
-static rsRetVal qDestructDirect(queue_t __attribute__((unused)) *pThis)
+static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis)
{
return RS_RET_OK;
}
-static rsRetVal qAddDirect(queue_t *pThis, void* pUsr)
+static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr)
{
DEFiRet;
@@ -940,7 +946,7 @@ static rsRetVal qAddDirect(queue_t *pThis, void* pUsr)
RETiRet;
}
-static rsRetVal qDelDirect(queue_t __attribute__((unused)) *pThis, __attribute__((unused)) void **out)
+static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis, __attribute__((unused)) void **out)
{
return RS_RET_OK;
}
@@ -955,12 +961,12 @@ static rsRetVal qDelDirect(queue_t __attribute__((unused)) *pThis, __attribute__
* rgerhards, 2008-01-20
*/
static rsRetVal
-queueUngetObj(queue_t *pThis, obj_t *pUsr, int bLockMutex)
+qqueueUngetObj(qqueue_t *pThis, obj_t *pUsr, int bLockMutex)
{
DEFiRet;
DEFVARS_mutexProtection;
- ISOBJ_TYPE_assert(pThis, queue);
+ ISOBJ_TYPE_assert(pThis, qqueue);
ISOBJ_assert(pUsr); /* TODO: we aborted right at this place at least 3 times -- race? 2008-02-28, -03-10, -03-15
The second time I noticed it the queue was in destruction with NO worker threads
running. The pUsr ptr was totally off and provided no clue what it may be pointing
@@ -969,7 +975,7 @@ queueUngetObj(queue_t *pThis, obj_t *pUsr, int bLockMutex)
dbgoprint((obj_t*) pThis, "ungetting user object %s\n", obj.GetName(pUsr));
BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, bLockMutex);
- iRet = queueAddLinkedList(&pThis->pUngetRoot, &pThis->pUngetLast, pUsr);
+ iRet = qqueueAddLinkedList(&pThis->pUngetRoot, &pThis->pUngetLast, pUsr);
++pThis->iUngottenObjs; /* indicate one more */
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
@@ -985,14 +991,14 @@ queueUngetObj(queue_t *pThis, obj_t *pUsr, int bLockMutex)
* rgerhards, 2008-01-29
*/
static rsRetVal
-queueGetUngottenObj(queue_t *pThis, obj_t **ppUsr)
+qqueueGetUngottenObj(qqueue_t *pThis, obj_t **ppUsr)
{
DEFiRet;
- ISOBJ_TYPE_assert(pThis, queue);
+ ISOBJ_TYPE_assert(pThis, qqueue);
ASSERT(ppUsr != NULL);
- iRet = queueDelLinkedList(&pThis->pUngetRoot, &pThis->pUngetLast, ppUsr);
+ iRet = qqueueDelLinkedList(&pThis->pUngetRoot, &pThis->pUngetLast, ppUsr);
--pThis->iUngottenObjs; /* indicate one less */
dbgoprint((obj_t*) pThis, "dequeued ungotten user object %s\n", obj.GetName(*ppUsr));
@@ -1006,7 +1012,7 @@ queueGetUngottenObj(queue_t *pThis, obj_t **ppUsr)
* things truely different. -- rgerhards, 2008-02-12
*/
static rsRetVal
-queueAdd(queue_t *pThis, void *pUsr)
+qqueueAdd(qqueue_t *pThis, void *pUsr)
{
DEFiRet;
@@ -1015,7 +1021,7 @@ queueAdd(queue_t *pThis, void *pUsr)
CHKiRet(pThis->qAdd(pThis, pUsr));
if(pThis->qType != QUEUETYPE_DIRECT) {
- ++pThis->iQueueSize;
+ ATOMIC_INC(pThis->iQueueSize);
dbgoprint((obj_t*) pThis, "entry added, size now %d entries\n", pThis->iQueueSize);
}
@@ -1029,7 +1035,7 @@ finalize_it:
* ungotten list and, if so, dequeue it first.
*/
static rsRetVal
-queueDel(queue_t *pThis, void *pUsr)
+qqueueDel(qqueue_t *pThis, void *pUsr)
{
DEFiRet;
@@ -1041,10 +1047,10 @@ queueDel(queue_t *pThis, void *pUsr)
* losing the whole process because it loops... -- rgerhards, 2008-01-03
*/
if(pThis->iUngottenObjs > 0) {
- iRet = queueGetUngottenObj(pThis, (obj_t**) pUsr);
+ iRet = qqueueGetUngottenObj(pThis, (obj_t**) pUsr);
} else {
iRet = pThis->qDel(pThis, pUsr);
- --pThis->iQueueSize;
+ ATOMIC_DEC(pThis->iQueueSize);
}
dbgoprint((obj_t*) pThis, "entry deleted, state %d, size now %d entries\n",
@@ -1065,14 +1071,14 @@ queueDel(queue_t *pThis, void *pUsr)
* complex) if each would have its own shutdown. The function does not self check
* this condition - the caller must make sure it is not called with a parent.
*/
-static rsRetVal queueShutdownWorkers(queue_t *pThis)
+static rsRetVal qqueueShutdownWorkers(qqueue_t *pThis)
{
DEFiRet;
DEFVARS_mutexProtection;
struct timespec tTimeout;
rsRetVal iRetLocal;
- ISOBJ_TYPE_assert(pThis, queue);
+ ISOBJ_TYPE_assert(pThis, qqueue);
ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */
dbgoprint((obj_t*) pThis, "initiating worker thread shutdown sequence\n");
@@ -1086,7 +1092,7 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
/* first try to shutdown the queue within the regular shutdown period */
BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
- if(queueGetOverallQueueSize(pThis) > 0) {
+ if(qqueueGetOverallQueueSize(pThis) > 0) {
if(pThis->bRunsDA) {
/* We may have waited on the low water mark. As it may have changed, we
* see if we reactivate the worker.
@@ -1124,7 +1130,7 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
if(pThis->bRunsDA) {
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
dbgoprint((obj_t*) pThis, "we have a DA queue (0x%lx), requesting its shutdown.\n",
- queueGetID(pThis->pqDA));
+ qqueueGetID(pThis->pqDA));
/* we use the same absolute timeout as above, so we do not use more than the configured
* timeout interval!
*/
@@ -1153,19 +1159,19 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
/* at this stage, we need to have the DA worker properly initialized and running (if there is one) */
if(pThis->bRunsDA)
- queueWaitDAModeInitialized(pThis);
+ qqueueWaitDAModeInitialized(pThis);
BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
/* optimize parameters for shutdown of DA-enabled queues */
- if(pThis->bIsDA && queueGetOverallQueueSize(pThis) > 0 && pThis->bSaveOnShutdown) {
+ if(pThis->bIsDA && qqueueGetOverallQueueSize(pThis) > 0 && pThis->bSaveOnShutdown) {
/* switch to enqueue-only mode so that no more actions happen */
if(pThis->bRunsDA == 0) {
- queueInitDA(pThis, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* switch to DA mode */
+ qqueueInitDA(pThis, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* switch to DA mode */
} else {
/* TODO: RACE: we may reach this point when the DA worker has been initialized (state 1)
* but is not yet running (state 2). In this case, pThis->pqDA is NULL! rgerhards, 2008-02-27
*/
- queueSetEnqOnly(pThis->pqDA, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* switch to enqueue-only mode */
+ qqueueSetEnqOnly(pThis->pqDA, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* switch to enqueue-only mode */
}
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
/* make sure we do not timeout before we are done */
@@ -1187,7 +1193,7 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
* they will automatically terminate as there no longer is any message left to process.
*/
BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
- if(queueGetOverallQueueSize(pThis) > 0) {
+ if(qqueueGetOverallQueueSize(pThis) > 0) {
timeoutComp(&tTimeout, pThis->toActShutdown);
if(wtpGetCurNumWrkr(pThis->pWtpReg, LOCK_MUTEX) > 0) {
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
@@ -1256,7 +1262,7 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
* Well, more precisely, they *are in termination*. Some cancel cleanup handlers
* may still be running.
*/
- dbgoprint((obj_t*) pThis, "worker threads terminated, remaining queue size %d.\n", queueGetOverallQueueSize(pThis));
+ dbgoprint((obj_t*) pThis, "worker threads terminated, remaining queue size %d.\n", qqueueGetOverallQueueSize(pThis));
RETiRet;
}
@@ -1268,22 +1274,23 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
* is done by queueStart(). The reason is that we want to give the caller a chance
* to modify some parameters before the queue is actually started.
*/
-rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iWorkerThreads,
+rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads,
int iMaxQueueSize, rsRetVal (*pConsumer)(void*,void*))
{
DEFiRet;
- queue_t *pThis;
+ qqueue_t *pThis;
ASSERT(ppThis != NULL);
ASSERT(pConsumer != NULL);
ASSERT(iWorkerThreads >= 0);
- if((pThis = (queue_t *)calloc(1, sizeof(queue_t))) == NULL) {
+ if((pThis = (qqueue_t *)calloc(1, sizeof(qqueue_t))) == NULL) {
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
}
/* we have an object, so let's fill the properties */
objConstructSetObjInfo(pThis);
+ pThis->bOptimizeUniProc = glbl.GetOptimizeUniProc();
if((pThis->pszSpoolDir = (uchar*) strdup((char*)glbl.GetWorkDir())) == NULL)
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
@@ -1314,7 +1321,7 @@ rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iWorkerThreads,
pThis->qConstruct = qConstructLinkedList;
pThis->qDestruct = qDestructLinkedList;
pThis->qAdd = qAddLinkedList;
- pThis->qDel = (rsRetVal (*)(queue_t*,void**)) qDelLinkedList;
+ pThis->qDel = (rsRetVal (*)(qqueue_t*,void**)) qDelLinkedList;
break;
case QUEUETYPE_DISK:
pThis->qConstruct = qConstructDisk;
@@ -1341,25 +1348,25 @@ finalize_it:
/* cancellation cleanup handler for queueWorker ()
* Updates admin structure and frees ressources.
* Params:
- * arg1 - user pointer (in this case a queue_t)
+ * arg1 - user pointer (in this case a qqueue_t)
* arg2 - user data pointer (in this case a queue data element, any object [queue's pUsr ptr!])
* Note that arg2 may be NULL, in which case no dequeued but unprocessed pUsr exists!
* rgerhards, 2008-01-16
*/
static rsRetVal
-queueConsumerCancelCleanup(void *arg1, void *arg2)
+qqueueConsumerCancelCleanup(void *arg1, void *arg2)
{
DEFiRet;
- queue_t *pThis = (queue_t*) arg1;
+ qqueue_t *pThis = (qqueue_t*) arg1;
obj_t *pUsr = (obj_t*) arg2;
- ISOBJ_TYPE_assert(pThis, queue);
+ ISOBJ_TYPE_assert(pThis, qqueue);
if(pUsr != NULL) {
/* make sure the data element is not lost */
dbgoprint((obj_t*) pThis, "cancelation cleanup handler consumer called, we need to unget one user data element\n");
- CHKiRet(queueUngetObj(pThis, pUsr, LOCK_MUTEX));
+ CHKiRet(qqueueUngetObj(pThis, pUsr, LOCK_MUTEX));
}
finalize_it:
@@ -1381,13 +1388,13 @@ finalize_it:
* the return state!
* rgerhards, 2008-01-24
*/
-static int queueChkDiscardMsg(queue_t *pThis, int iQueueSize, int bRunsDA, void *pUsr)
+static int qqueueChkDiscardMsg(qqueue_t *pThis, int iQueueSize, int bRunsDA, void *pUsr)
{
DEFiRet;
rsRetVal iRetLocal;
int iSeverity;
- ISOBJ_TYPE_assert(pThis, queue);
+ ISOBJ_TYPE_assert(pThis, qqueue);
ISOBJ_assert(pUsr);
if(pThis->iDiscardMrk > 0 && iQueueSize >= pThis->iDiscardMrk && bRunsDA == 0) {
@@ -1412,7 +1419,7 @@ finalize_it:
* rgerhards, 2008-10-21
*/
static rsRetVal
-queueDequeueConsumable(queue_t *pThis, wti_t *pWti, int iCancelStateSave)
+qqueueDequeueConsumable(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
{
DEFiRet;
void *pUsr;
@@ -1420,9 +1427,9 @@ queueDequeueConsumable(queue_t *pThis, wti_t *pWti, int iCancelStateSave)
int bRunsDA; /* cache for early mutex release */
/* dequeue element (still protected from mutex) */
- iRet = queueDel(pThis, &pUsr);
- queueChkPersist(pThis);
- iQueueSize = queueGetOverallQueueSize(pThis); /* cache this for after mutex release */
+ iRet = qqueueDel(pThis, &pUsr);
+ qqueueChkPersist(pThis);
+ iQueueSize = qqueueGetOverallQueueSize(pThis); /* cache this for after mutex release */
bRunsDA = pThis->bRunsDA; /* cache this for after mutex release */
/* We now need to save the user pointer for the cancel cleanup handler, BUT ONLY
@@ -1473,7 +1480,7 @@ queueDequeueConsumable(queue_t *pThis, wti_t *pWti, int iCancelStateSave)
* provide real-time creation of spool files.
* Note: It is OK to use the cached iQueueSize here, because it does not hurt if it is slightly wrong.
*/
- CHKiRet(queueChkDiscardMsg(pThis, iQueueSize, bRunsDA, pUsr));
+ CHKiRet(qqueueChkDiscardMsg(pThis, iQueueSize, bRunsDA, pUsr));
finalize_it:
if(iRet != RS_RET_OK && iRet != RS_RET_DISCARDMSG) {
@@ -1522,7 +1529,7 @@ finalize_it:
* but you get the idea from the code above.
*/
static rsRetVal
-queueRateLimiter(queue_t *pThis)
+qqueueRateLimiter(qqueue_t *pThis)
{
DEFiRet;
int iDelay;
@@ -1530,9 +1537,7 @@ queueRateLimiter(queue_t *pThis)
time_t tCurr;
struct tm m;
- ISOBJ_TYPE_assert(pThis, queue);
-
- dbgoprint((obj_t*) pThis, "entering rate limiter\n");
+ ISOBJ_TYPE_assert(pThis, qqueue);
iDelay = 0;
if(pThis->iDeqtWinToHr != 25) { /* 25 means disabled */
@@ -1587,14 +1592,14 @@ queueRateLimiter(queue_t *pThis)
* rgerhards, 2008-01-21
*/
static rsRetVal
-queueConsumerReg(queue_t *pThis, wti_t *pWti, int iCancelStateSave)
+qqueueConsumerReg(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
{
DEFiRet;
- ISOBJ_TYPE_assert(pThis, queue);
+ ISOBJ_TYPE_assert(pThis, qqueue);
ISOBJ_TYPE_assert(pWti, wti);
- CHKiRet(queueDequeueConsumable(pThis, pWti, iCancelStateSave));
+ CHKiRet(qqueueDequeueConsumable(pThis, pWti, iCancelStateSave));
CHKiRet(pThis->pConsumer(pThis->pUsr, pWti->pUsrp));
/* we now need to check if we should deliberately delay processing a bit
@@ -1621,15 +1626,15 @@ finalize_it:
* rgerhards, 2008-01-14
*/
static rsRetVal
-queueConsumerDA(queue_t *pThis, wti_t *pWti, int iCancelStateSave)
+qqueueConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
{
DEFiRet;
- ISOBJ_TYPE_assert(pThis, queue);
+ ISOBJ_TYPE_assert(pThis, qqueue);
ISOBJ_TYPE_assert(pWti, wti);
- CHKiRet(queueDequeueConsumable(pThis, pWti, iCancelStateSave));
- CHKiRet(queueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, pWti->pUsrp));
+ CHKiRet(qqueueDequeueConsumable(pThis, pWti, iCancelStateSave));
+ CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, pWti->pUsrp));
finalize_it:
dbgoprint((obj_t*) pThis, "DAConsumer returns with iRet %d\n", iRet);
@@ -1645,7 +1650,7 @@ finalize_it:
* the DA queue
*/
static int
-queueChkStopWrkrDA(queue_t *pThis)
+qqueueChkStopWrkrDA(qqueue_t *pThis)
{
/* if our queue is in destruction, we drain to the DA queue and so we shall not terminate
* until we have done so.
@@ -1664,7 +1669,7 @@ queueChkStopWrkrDA(queue_t *pThis)
&& pThis->pqDA->tVars.disk.sizeOnDisk > pThis->pqDA->sizeOnDiskMax) {
/* this queue can never grow, so we can give up... */
bStopWrkr = 1;
- } else if(queueGetOverallQueueSize(pThis) < pThis->iHighWtrMrk && pThis->bQueueStarted == 1) {
+ } else if(qqueueGetOverallQueueSize(pThis) < pThis->iHighWtrMrk && pThis->bQueueStarted == 1) {
bStopWrkr = 1;
} else {
bStopWrkr = 0;
@@ -1687,9 +1692,9 @@ queueChkStopWrkrDA(queue_t *pThis)
* the DA queue
*/
static int
-queueChkStopWrkrReg(queue_t *pThis)
+qqueueChkStopWrkrReg(qqueue_t *pThis)
{
- return pThis->bEnqOnly || pThis->bRunsDA || (pThis->pqParent != NULL && queueGetOverallQueueSize(pThis) == 0);
+ return pThis->bEnqOnly || pThis->bRunsDA || (pThis->pqParent != NULL && qqueueGetOverallQueueSize(pThis) == 0);
}
@@ -1697,26 +1702,26 @@ queueChkStopWrkrReg(queue_t *pThis)
* are not stable! DA queue version
*/
static int
-queueIsIdleDA(queue_t *pThis)
+qqueueIsIdleDA(qqueue_t *pThis)
{
/* remember: iQueueSize is the DA queue size, not the main queue! */
/* TODO: I think we need just a single function for DA and non-DA mode - but I leave it for now as is */
- return(queueGetOverallQueueSize(pThis) == 0 || (pThis->bRunsDA && queueGetOverallQueueSize(pThis) <= pThis->iLowWtrMrk));
+ return(qqueueGetOverallQueueSize(pThis) == 0 || (pThis->bRunsDA && qqueueGetOverallQueueSize(pThis) <= pThis->iLowWtrMrk));
}
/* must only be called when the queue mutex is locked, else results
* are not stable! Regular queue version
*/
static int
-queueIsIdleReg(queue_t *pThis)
+qqueueIsIdleReg(qqueue_t *pThis)
{
#if 0 /* enable for performance testing */
int ret;
- ret = queueGetOverallQueueSize(pThis) == 0 || (pThis->bRunsDA && queueGetOverallQueueSize(pThis) <= pThis->iLowWtrMrk);
+ ret = qqueueGetOverallQueueSize(pThis) == 0 || (pThis->bRunsDA && qqueueGetOverallQueueSize(pThis) <= pThis->iLowWtrMrk);
if(ret) fprintf(stderr, "queue is idle\n");
return ret;
#else
/* regular code! */
- return(queueGetOverallQueueSize(pThis) == 0 || (pThis->bRunsDA && queueGetOverallQueueSize(pThis) <= pThis->iLowWtrMrk));
+ return(qqueueGetOverallQueueSize(pThis) == 0 || (pThis->bRunsDA && qqueueGetOverallQueueSize(pThis) <= pThis->iLowWtrMrk));
#endif
}
@@ -1735,11 +1740,11 @@ queueIsIdleReg(queue_t *pThis)
* I am telling this, because I, too, always get confused by those...
*/
static rsRetVal
-queueRegOnWrkrShutdown(queue_t *pThis)
+qqueueRegOnWrkrShutdown(qqueue_t *pThis)
{
DEFiRet;
- ISOBJ_TYPE_assert(pThis, queue);
+ ISOBJ_TYPE_assert(pThis, qqueue);
if(pThis->pqParent != NULL) {
pThis->pqParent->bChildIsDone = 1; /* indicate we are done */
@@ -1756,11 +1761,11 @@ queueRegOnWrkrShutdown(queue_t *pThis)
* hook to indicate in the parent queue (if we are a child) that we are not done yet.
*/
static rsRetVal
-queueRegOnWrkrStartup(queue_t *pThis)
+qqueueRegOnWrkrStartup(qqueue_t *pThis)
{
DEFiRet;
- ISOBJ_TYPE_assert(pThis, queue);
+ ISOBJ_TYPE_assert(pThis, qqueue);
if(pThis->pqParent != NULL) {
pThis->pqParent->bChildIsDone = 0;
@@ -1773,7 +1778,7 @@ queueRegOnWrkrStartup(queue_t *pThis)
/* start up the queue - it must have been constructed and parameters defined
* before.
*/
-rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */
+rsRetVal qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
{
DEFiRet;
rsRetVal iRetLocal;
@@ -1811,7 +1816,7 @@ rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */
dbgoprint((obj_t*) pThis, "type %d, enq-only %d, disk assisted %d, maxFileSz %lld, qsize %d, child %d starting\n",
pThis->qType, pThis->bEnqOnly, pThis->bIsDA, pThis->iMaxFileSize,
- queueGetOverallQueueSize(pThis), pThis->pqParent == NULL ? 0 : 1);
+ qqueueGetOverallQueueSize(pThis), pThis->pqParent == NULL ? 0 : 1);
if(pThis->qType == QUEUETYPE_DIRECT)
FINALIZE; /* with direct queues, we are already finished... */
@@ -1822,13 +1827,13 @@ rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */
lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s:Reg", obj.GetName((obj_t*) pThis));
CHKiRet(wtpConstruct (&pThis->pWtpReg));
CHKiRet(wtpSetDbgHdr (pThis->pWtpReg, pszBuf, lenBuf));
- CHKiRet(wtpSetpfRateLimiter (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) queueRateLimiter));
- CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) queueChkStopWrkrReg));
- CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) queueIsIdleReg));
- CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti, int)) queueConsumerReg));
- CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void*pWti))queueConsumerCancelCleanup));
- CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) queueRegOnWrkrStartup));
- CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) queueRegOnWrkrShutdown));
+ CHKiRet(wtpSetpfRateLimiter (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) qqueueRateLimiter));
+ CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) qqueueChkStopWrkrReg));
+ CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) qqueueIsIdleReg));
+ CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti, int)) qqueueConsumerReg));
+ CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void*pWti))qqueueConsumerCancelCleanup));
+ CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) qqueueRegOnWrkrStartup));
+ CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) qqueueRegOnWrkrShutdown));
CHKiRet(wtpSetpmutUsr (pThis->pWtpReg, pThis->mut));
CHKiRet(wtpSetpcondBusy (pThis->pWtpReg, &pThis->notEmpty));
CHKiRet(wtpSetiNumWorkerThreads (pThis->pWtpReg, pThis->iNumWorkerThreads));
@@ -1841,10 +1846,10 @@ rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */
/* If we are disk-assisted, we need to check if there is a QIF file
* which we need to load. -- rgerhards, 2008-01-15
*/
- iRetLocal = queueHaveQIF(pThis);
+ iRetLocal = qqueueHaveQIF(pThis);
if(iRetLocal == RS_RET_OK) {
dbgoprint((obj_t*) pThis, "on-disk queue present, needs to be reloaded\n");
- queueInitDA(pThis, QUEUE_MODE_ENQDEQ, LOCK_MUTEX); /* initiate DA mode */
+ qqueueInitDA(pThis, QUEUE_MODE_ENQDEQ, LOCK_MUTEX); /* initiate DA mode */
bInitialized = 1; /* we are done */
} else {
/* TODO: use logerror? -- rgerhards, 2008-01-16 */
@@ -1861,7 +1866,7 @@ rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */
/* if the queue already contains data, we need to start the correct number of worker threads. This can be
* the case when a disk queue has been loaded. If we did not start it here, it would never start.
*/
- queueAdviseMaxWorkers(pThis);
+ qqueueAdviseMaxWorkers(pThis);
pThis->bQueueStarted = 1;
finalize_it:
@@ -1876,7 +1881,7 @@ finalize_it:
* and 0 otherwise.
* rgerhards, 2008-01-10
*/
-static rsRetVal queuePersist(queue_t *pThis, int bIsCheckpoint)
+static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint)
{
DEFiRet;
strm_t *psQIF = NULL; /* Queue Info File */
@@ -1887,7 +1892,7 @@ static rsRetVal queuePersist(queue_t *pThis, int bIsCheckpoint)
ASSERT(pThis != NULL);
if(pThis->qType != QUEUETYPE_DISK) {
- if(queueGetOverallQueueSize(pThis) > 0) {
+ if(qqueueGetOverallQueueSize(pThis) > 0) {
/* This error code is OK, but we will probably not implement this any time
* The reason is that persistence happens via DA queues. But I would like to
* leave the code as is, as we so have a hook in case we need one.
@@ -1898,13 +1903,13 @@ static rsRetVal queuePersist(queue_t *pThis, int bIsCheckpoint)
FINALIZE; /* if the queue is empty, we are happy and done... */
}
- dbgoprint((obj_t*) pThis, "persisting queue to disk, %d entries...\n", queueGetOverallQueueSize(pThis));
+ dbgoprint((obj_t*) pThis, "persisting queue to disk, %d entries...\n", qqueueGetOverallQueueSize(pThis));
/* Construct file name */
lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), "%s/%s.qi",
(char*) glbl.GetWorkDir(), (char*)pThis->pszFilePrefix);
- if((bIsCheckpoint != QUEUE_CHECKPOINT) && (queueGetOverallQueueSize(pThis) == 0)) {
+ if((bIsCheckpoint != QUEUE_CHECKPOINT) && (qqueueGetOverallQueueSize(pThis) == 0)) {
if(pThis->bNeedDelQIF) {
unlink((char*)pszQIFNam);
pThis->bNeedDelQIF = 0;
@@ -1938,7 +1943,7 @@ static rsRetVal queuePersist(queue_t *pThis, int bIsCheckpoint)
* to the regular files. -- rgerhards, 2008-01-29
*/
while(pThis->iUngottenObjs > 0) {
- CHKiRet(queueGetUngottenObj(pThis, &pUsr));
+ CHKiRet(qqueueGetUngottenObj(pThis, &pUsr));
CHKiRet((objSerialize(pUsr))(pUsr, psQIF));
objDestruct(pUsr);
}
@@ -1972,14 +1977,14 @@ finalize_it:
* abide to our regular call interface)...
* rgerhards, 2008-01-13
*/
-rsRetVal queueChkPersist(queue_t *pThis)
+rsRetVal qqueueChkPersist(qqueue_t *pThis)
{
DEFiRet;
- ISOBJ_TYPE_assert(pThis, queue);
+ ISOBJ_TYPE_assert(pThis, qqueue);
if(pThis->iPersistUpdCnt && ++pThis->iUpdsSincePersist >= pThis->iPersistUpdCnt) {
- queuePersist(pThis, QUEUE_CHECKPOINT);
+ qqueuePersist(pThis, QUEUE_CHECKPOINT);
pThis->iUpdsSincePersist = 0;
}
@@ -1988,8 +1993,8 @@ rsRetVal queueChkPersist(queue_t *pThis)
/* destructor for the queue object */
-BEGINobjDestruct(queue) /* be sure to specify the object type also in END and CODESTART macros! */
-CODESTARTobjDestruct(queue)
+BEGINobjDestruct(qqueue) /* be sure to specify the object type also in END and CODESTART macros! */
+CODESTARTobjDestruct(qqueue)
pThis->bQueueInDestruction = 1; /* indicate we are in destruction (modifies some behaviour) */
/* shut down all workers (handles *all* of the persistence logic)
@@ -1999,7 +2004,7 @@ CODESTARTobjDestruct(queue)
* with a child! -- rgerhards, 2008-01-28
*/
if(pThis->qType != QUEUETYPE_DIRECT && !pThis->bEnqOnly && pThis->pqParent == NULL)
- queueShutdownWorkers(pThis);
+ qqueueShutdownWorkers(pThis);
/* finally destruct our (regular) worker thread pool
* Note: currently pWtpReg is never NULL, but if we optimize our logic, this may happen,
@@ -2024,7 +2029,7 @@ CODESTARTobjDestruct(queue)
wtpDestruct(&pThis->pWtpDA);
}
if(pThis->pqDA != NULL) {
- queueDestruct(&pThis->pqDA);
+ qqueueDestruct(&pThis->pqDA);
}
/* persist the queue (we always do that - queuePersits() does cleanup if the queue is empty)
@@ -2034,7 +2039,7 @@ CODESTARTobjDestruct(queue)
* disk queues and DA mode. Anyhow, it doesn't hurt to know that we could extend it here
* if need arises (what I doubt...) -- rgerhards, 2008-01-25
*/
- CHKiRet_Hdlr(queuePersist(pThis, QUEUE_NO_CHECKPOINT)) {
+ CHKiRet_Hdlr(qqueuePersist(pThis, QUEUE_NO_CHECKPOINT)) {
dbgoprint((obj_t*) pThis, "error %d persisting queue - data lost!\n", iRet);
}
@@ -2059,7 +2064,7 @@ CODESTARTobjDestruct(queue)
if(pThis->pszSpoolDir != NULL)
free(pThis->pszSpoolDir);
-ENDobjDestruct(queue)
+ENDobjDestruct(qqueue)
/* set the queue's file prefix
@@ -2068,7 +2073,7 @@ ENDobjDestruct(queue)
* rgerhards, 2008-01-09
*/
rsRetVal
-queueSetFilePrefix(queue_t *pThis, uchar *pszPrefix, size_t iLenPrefix)
+qqueueSetFilePrefix(qqueue_t *pThis, uchar *pszPrefix, size_t iLenPrefix)
{
DEFiRet;
@@ -2091,11 +2096,11 @@ finalize_it:
* rgerhards, 2008-01-09
*/
rsRetVal
-queueSetMaxFileSize(queue_t *pThis, size_t iMaxFileSize)
+qqueueSetMaxFileSize(qqueue_t *pThis, size_t iMaxFileSize)
{
DEFiRet;
- ISOBJ_TYPE_assert(pThis, queue);
+ ISOBJ_TYPE_assert(pThis, qqueue);
if(iMaxFileSize < 1024) {
ABORT_FINALIZE(RS_RET_VALUE_TOO_LOW);
@@ -2112,13 +2117,22 @@ finalize_it:
* Enqueues the new element and awakes worker thread.
*/
rsRetVal
-queueEnqObj(queue_t *pThis, flowControl_t flowCtlType, void *pUsr)
+qqueueEnqObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr)
{
DEFiRet;
int iCancelStateSave;
struct timespec t;
- ISOBJ_TYPE_assert(pThis, queue);
+ ISOBJ_TYPE_assert(pThis, qqueue);
+
+ /* first check if we need to discard this message (which will cause CHKiRet() to exit)
+ * rgerhards, 2008-10-07: It is OK to do this outside of mutex protection. The iQueueSize
+ * and bRunsDA parameters may not reflect the correct settings here, but they are
+ * "good enough" in the sense that they can be used to drive the decision. Valgrind's
+ * threading tools may point this access to be an error, but this is done
+ * intentional. I do not see this causes problems to us.
+ */
+ CHKiRet(qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pThis->bRunsDA, pUsr));
/* Please note that this function is not cancel-safe and consequently
* sets the calling thread's cancelibility state to PTHREAD_CANCEL_DISABLE
@@ -2131,12 +2145,9 @@ queueEnqObj(queue_t *pThis, flowControl_t flowCtlType, void *pUsr)
d_pthread_mutex_lock(pThis->mut);
}
- /* first check if we need to discard this message (which will cause CHKiRet() to exit) */
- CHKiRet(queueChkDiscardMsg(pThis, pThis->iQueueSize, pThis->bRunsDA, pUsr));
-
/* then check if we need to add an assistance disk queue */
if(pThis->bIsDA)
- CHKiRet(queueChkStrtDA(pThis));
+ CHKiRet(qqueueChkStrtDA(pThis));
/* handle flow control
* There are two different flow control mechanisms: basic and advanced flow control.
@@ -2189,17 +2200,24 @@ queueEnqObj(queue_t *pThis, flowControl_t flowCtlType, void *pUsr)
}
/* and finally enqueue the message */
- CHKiRet(queueAdd(pThis, pUsr));
- queueChkPersist(pThis);
+ CHKiRet(qqueueAdd(pThis, pUsr));
+ qqueueChkPersist(pThis);
finalize_it:
if(pThis->qType != QUEUETYPE_DIRECT) {
/* make sure at least one worker is running. */
- queueAdviseMaxWorkers(pThis);
- dbgoprint((obj_t*) pThis, "EnqueueMsg advised worker start\n");
+ qqueueAdviseMaxWorkers(pThis);
/* and release the mutex */
d_pthread_mutex_unlock(pThis->mut);
pthread_setcancelstate(iCancelStateSave, NULL);
+ dbgoprint((obj_t*) pThis, "EnqueueMsg advised worker start\n");
+ /* the following pthread_yield is experimental, but brought us performance
+ * benefit. For details, please see http://kb.monitorware.com/post14216.html#p14216
+ * rgerhards, 2008-10-09
+ * but this is only true for uniprocessors, so we guard it with an optimize flag -- rgerhards, 2008-10-22
+ */
+ if(pThis->bOptimizeUniProc)
+ pthread_yield();
}
RETiRet;
@@ -2215,12 +2233,12 @@ finalize_it:
* rgerhards, 2008-01-16
*/
static rsRetVal
-queueSetEnqOnly(queue_t *pThis, int bEnqOnly, int bLockMutex)
+qqueueSetEnqOnly(qqueue_t *pThis, int bEnqOnly, int bLockMutex)
{
DEFiRet;
DEFVARS_mutexProtection;
- ISOBJ_TYPE_assert(pThis, queue);
+ ISOBJ_TYPE_assert(pThis, qqueue);
/* for simplicity, we do one big mutex lock. This method is extremely seldom
* called, so that doesn't matter... -- rgerhards, 2008-01-16
@@ -2259,24 +2277,24 @@ finalize_it:
/* some simple object access methods */
-DEFpropSetMeth(queue, iPersistUpdCnt, int)
-DEFpropSetMeth(queue, iDeqtWinFromHr, int)
-DEFpropSetMeth(queue, iDeqtWinToHr, int)
-DEFpropSetMeth(queue, toQShutdown, long)
-DEFpropSetMeth(queue, toActShutdown, long)
-DEFpropSetMeth(queue, toWrkShutdown, long)
-DEFpropSetMeth(queue, toEnq, long)
-DEFpropSetMeth(queue, iHighWtrMrk, int)
-DEFpropSetMeth(queue, iLowWtrMrk, int)
-DEFpropSetMeth(queue, iDiscardMrk, int)
-DEFpropSetMeth(queue, iFullDlyMrk, int)
-DEFpropSetMeth(queue, iDiscardSeverity, int)
-DEFpropSetMeth(queue, bIsDA, int)
-DEFpropSetMeth(queue, iMinMsgsPerWrkr, int)
-DEFpropSetMeth(queue, bSaveOnShutdown, int)
-DEFpropSetMeth(queue, pUsr, void*)
-DEFpropSetMeth(queue, iDeqSlowdown, int)
-DEFpropSetMeth(queue, sizeOnDiskMax, int64)
+DEFpropSetMeth(qqueue, iPersistUpdCnt, int)
+DEFpropSetMeth(qqueue, iDeqtWinFromHr, int)
+DEFpropSetMeth(qqueue, iDeqtWinToHr, int)
+DEFpropSetMeth(qqueue, toQShutdown, long)
+DEFpropSetMeth(qqueue, toActShutdown, long)
+DEFpropSetMeth(qqueue, toWrkShutdown, long)
+DEFpropSetMeth(qqueue, toEnq, long)
+DEFpropSetMeth(qqueue, iHighWtrMrk, int)
+DEFpropSetMeth(qqueue, iLowWtrMrk, int)
+DEFpropSetMeth(qqueue, iDiscardMrk, int)
+DEFpropSetMeth(qqueue, iFullDlyMrk, int)
+DEFpropSetMeth(qqueue, iDiscardSeverity, int)
+DEFpropSetMeth(qqueue, bIsDA, int)
+DEFpropSetMeth(qqueue, iMinMsgsPerWrkr, int)
+DEFpropSetMeth(qqueue, bSaveOnShutdown, int)
+DEFpropSetMeth(qqueue, pUsr, void*)
+DEFpropSetMeth(qqueue, iDeqSlowdown, int)
+DEFpropSetMeth(qqueue, sizeOnDiskMax, int64)
/* This function can be used as a generic way to set properties. Only the subset
@@ -2285,11 +2303,11 @@ DEFpropSetMeth(queue, sizeOnDiskMax, int64)
* rgerhards, 2008-01-11
*/
#define isProp(name) !rsCStrSzStrCmp(pProp->pcsName, (uchar*) name, sizeof(name) - 1)
-static rsRetVal queueSetProperty(queue_t *pThis, var_t *pProp)
+static rsRetVal qqueueSetProperty(qqueue_t *pThis, var_t *pProp)
{
DEFiRet;
- ISOBJ_TYPE_assert(pThis, queue);
+ ISOBJ_TYPE_assert(pThis, qqueue);
ASSERT(pProp != NULL);
if(isProp("iQueueSize")) {
@@ -2311,19 +2329,19 @@ finalize_it:
#undef isProp
/* dummy */
-rsRetVal queueQueryInterface(void) { return RS_RET_NOT_IMPLEMENTED; }
+rsRetVal qqueueQueryInterface(void) { return RS_RET_NOT_IMPLEMENTED; }
/* Initialize the stream class. Must be called as the very first method
* before anything else is called inside this class.
* rgerhards, 2008-01-09
*/
-BEGINObjClassInit(queue, 1, OBJ_IS_CORE_MODULE)
+BEGINObjClassInit(qqueue, 1, OBJ_IS_CORE_MODULE)
/* request objects we use */
CHKiRet(objUse(glbl, CORE_COMPONENT));
/* now set our own handlers */
- OBJSetMethodHandler(objMethod_SETPROPERTY, queueSetProperty);
-ENDObjClassInit(queue)
+ OBJSetMethodHandler(objMethod_SETPROPERTY, qqueueSetProperty);
+ENDObjClassInit(qqueue)
/* vi:set ai:
*/
diff --git a/runtime/queue.h b/runtime/queue.h
index 9e75b31b..a267862d 100644
--- a/runtime/queue.h
+++ b/runtime/queue.h
@@ -58,6 +58,7 @@ typedef struct qWrkThrd_s {
typedef struct queue_s {
BEGINobjInstance;
queueType_t qType;
+ int bOptimizeUniProc; /* cache for the equally-named global setting, pulled at time of queue creation */
int bEnqOnly; /* does queue run in enqueue-only mode (1) or not (0)? */
int bSaveOnShutdown;/* persists everthing on shutdown (if DA!)? 1-yes, 0-no */
int bQueueStarted; /* has queueStart() been called on this queue? 1-yes, 0-no */
@@ -159,7 +160,7 @@ typedef struct queue_s {
strm_t *pRead; /* current file to be read */
} disk;
} tVars;
-} queue_t;
+} qqueue_t;
/* some symbolic constants for easier reference */
#define QUEUE_MODE_ENQDEQ 0
@@ -176,30 +177,30 @@ typedef struct queue_s {
#define QUEUE_TIMEOUT_ETERNAL 24 * 60 * 60 * 1000
/* prototypes */
-rsRetVal queueDestruct(queue_t **ppThis);
-rsRetVal queueEnqObj(queue_t *pThis, flowControl_t flwCtlType, void *pUsr);
-rsRetVal queueStart(queue_t *pThis);
-rsRetVal queueSetMaxFileSize(queue_t *pThis, size_t iMaxFileSize);
-rsRetVal queueSetFilePrefix(queue_t *pThis, uchar *pszPrefix, size_t iLenPrefix);
-rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iWorkerThreads,
+rsRetVal qqueueDestruct(qqueue_t **ppThis);
+rsRetVal qqueueEnqObj(qqueue_t *pThis, flowControl_t flwCtlType, void *pUsr);
+rsRetVal qqueueStart(qqueue_t *pThis);
+rsRetVal qqueueSetMaxFileSize(qqueue_t *pThis, size_t iMaxFileSize);
+rsRetVal qqueueSetFilePrefix(qqueue_t *pThis, uchar *pszPrefix, size_t iLenPrefix);
+rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads,
int iMaxQueueSize, rsRetVal (*pConsumer)(void*,void*));
-PROTOTYPEObjClassInit(queue);
-PROTOTYPEpropSetMeth(queue, iPersistUpdCnt, int);
-PROTOTYPEpropSetMeth(queue, iDeqtWinFromHr, int);
-PROTOTYPEpropSetMeth(queue, iDeqtWinToHr, int);
-PROTOTYPEpropSetMeth(queue, toQShutdown, long);
-PROTOTYPEpropSetMeth(queue, toActShutdown, long);
-PROTOTYPEpropSetMeth(queue, toWrkShutdown, long);
-PROTOTYPEpropSetMeth(queue, toEnq, long);
-PROTOTYPEpropSetMeth(queue, iHighWtrMrk, int);
-PROTOTYPEpropSetMeth(queue, iLowWtrMrk, int);
-PROTOTYPEpropSetMeth(queue, iDiscardMrk, int);
-PROTOTYPEpropSetMeth(queue, iDiscardSeverity, int);
-PROTOTYPEpropSetMeth(queue, iMinMsgsPerWrkr, int);
-PROTOTYPEpropSetMeth(queue, bSaveOnShutdown, int);
-PROTOTYPEpropSetMeth(queue, pUsr, void*);
-PROTOTYPEpropSetMeth(queue, iDeqSlowdown, int);
-PROTOTYPEpropSetMeth(queue, sizeOnDiskMax, int64);
-#define queueGetID(pThis) ((unsigned long) pThis)
+PROTOTYPEObjClassInit(qqueue);
+PROTOTYPEpropSetMeth(qqueue, iPersistUpdCnt, int);
+PROTOTYPEpropSetMeth(qqueue, iDeqtWinFromHr, int);
+PROTOTYPEpropSetMeth(qqueue, iDeqtWinToHr, int);
+PROTOTYPEpropSetMeth(qqueue, toQShutdown, long);
+PROTOTYPEpropSetMeth(qqueue, toActShutdown, long);
+PROTOTYPEpropSetMeth(qqueue, toWrkShutdown, long);
+PROTOTYPEpropSetMeth(qqueue, toEnq, long);
+PROTOTYPEpropSetMeth(qqueue, iHighWtrMrk, int);
+PROTOTYPEpropSetMeth(qqueue, iLowWtrMrk, int);
+PROTOTYPEpropSetMeth(qqueue, iDiscardMrk, int);
+PROTOTYPEpropSetMeth(qqueue, iDiscardSeverity, int);
+PROTOTYPEpropSetMeth(qqueue, iMinMsgsPerWrkr, int);
+PROTOTYPEpropSetMeth(qqueue, bSaveOnShutdown, int);
+PROTOTYPEpropSetMeth(qqueue, pUsr, void*);
+PROTOTYPEpropSetMeth(qqueue, iDeqSlowdown, int);
+PROTOTYPEpropSetMeth(qqueue, sizeOnDiskMax, int64);
+#define qqueueGetID(pThis) ((unsigned long) pThis)
#endif /* #ifndef QUEUE_H_INCLUDED */
diff --git a/runtime/rsyslog.c b/runtime/rsyslog.c
index 54db12c2..8df100a1 100644
--- a/runtime/rsyslog.c
+++ b/runtime/rsyslog.c
@@ -157,7 +157,7 @@ rsrtInit(char **ppErrObj, obj_if_t *pObjIF)
if(ppErrObj != NULL) *ppErrObj = "wtp";
CHKiRet(wtpClassInit(NULL));
if(ppErrObj != NULL) *ppErrObj = "queue";
- CHKiRet(queueClassInit(NULL));
+ CHKiRet(qqueueClassInit(NULL));
if(ppErrObj != NULL) *ppErrObj = "vmstk";
CHKiRet(vmstkClassInit(NULL));
if(ppErrObj != NULL) *ppErrObj = "sysvar";
diff --git a/runtime/rsyslog.h b/runtime/rsyslog.h
index 70af8c4f..032d8c04 100644
--- a/runtime/rsyslog.h
+++ b/runtime/rsyslog.h
@@ -252,7 +252,12 @@ enum rsRetVal_ /** return value. All methods return this if not specified oth
RS_RET_QUEUE_FULL = -2105, /**< queue is full, operation could not be completed */
RS_RET_ACCEPT_ERR = -2106, /**< error during accept() system call */
RS_RET_INVLD_TIME = -2107, /**< invalid timestamp (e.g. could not be parsed) */
+ RS_RET_NO_ZIP = -2108, /**< ZIP functionality is not present */
RS_RET_CODE_ERR = -2109, /**< program code (internal) error */
+ RS_RET_FUNC_NO_LPAREN = -2110, /**< left parenthesis missing after function call (rainerscript) */
+ RS_RET_FUNC_MISSING_EXPR = -2111, /**< no expression after comma in function call (rainerscript) */
+ RS_RET_INVLD_NBR_ARGUMENTS = -2112, /**< invalid number of arguments for function call (rainerscript) */
+ RS_RET_INVLD_FUNC = -2113, /**< invalid function name for function call (rainerscript) */
/* RainerScript error messages (range 1000.. 1999) */
RS_RET_SYSVAR_NOT_FOUND = 1001, /**< system variable could not be found (maybe misspelled) */
@@ -336,6 +341,11 @@ typedef enum rsObjectID rsObjID;
# define __attribute__(x) /*NOTHING*/
#endif
+#ifndef O_CLOEXEC
+/* of course, this limits the functionality... */
+# define O_CLOEXEC 0
+#endif
+
/* The following prototype is convenient, even though it may not be the 100% correct place.. -- rgerhards 2008-01-07 */
void dbgprintf(char *, ...) __attribute__((format(printf, 1, 2)));
diff --git a/runtime/srutils.c b/runtime/srutils.c
index 97cc3252..d01ca20d 100644
--- a/runtime/srutils.c
+++ b/runtime/srutils.c
@@ -371,6 +371,7 @@ int getNumberDigits(long lNum)
rsRetVal
timeoutComp(struct timespec *pt, long iTimeout)
{
+ BEGINfunc
assert(pt != NULL);
/* compute timeout */
clock_gettime(CLOCK_REALTIME, pt);
@@ -379,6 +380,7 @@ timeoutComp(struct timespec *pt, long iTimeout)
pt->tv_nsec -= 1000000000;
}
pt->tv_sec += iTimeout / 1000;
+ ENDfunc
return RS_RET_OK; /* so far, this is static... */
}
@@ -393,6 +395,7 @@ timeoutVal(struct timespec *pt)
{
struct timespec t;
long iTimeout;
+ BEGINfunc
assert(pt != NULL);
/* compute timeout */
@@ -403,6 +406,7 @@ timeoutVal(struct timespec *pt)
if(iTimeout < 0)
iTimeout = 0;
+ ENDfunc
return iTimeout;
}
@@ -454,7 +458,7 @@ srSleep(int iSeconds, int iuSeconds)
* Added 2008-01-30
*/
char *rs_strerror_r(int errnum, char *buf, size_t buflen) {
-#ifdef __hpux
+#ifndef HAVE_STRERROR_R
char *pszErr;
pszErr = strerror(errnum);
snprintf(buf, buflen, "%s", pszErr);
diff --git a/runtime/stringbuf.c b/runtime/stringbuf.c
index 63b42348..07256fab 100644
--- a/runtime/stringbuf.c
+++ b/runtime/stringbuf.c
@@ -694,6 +694,7 @@ int rsCStrCaseInsensitveStartsWithSzStr(cstr_t *pCS1, uchar *psz, size_t iLenSz)
return -1; /* pCS1 is less then psz */
}
+
/* check if a CStr object matches a regex.
* msamia@redhat.com 2007-07-12
* @return returns 0 if matched
@@ -701,25 +702,54 @@ int rsCStrCaseInsensitveStartsWithSzStr(cstr_t *pCS1, uchar *psz, size_t iLenSz)
* rgerhards, 2007-07-16: bug is no real bug, because rsyslogd ensures there
* never is a \0 *inside* a property string.
* Note that the function returns -1 if regexp functionality is not available.
- * TODO: change calling interface! -- rgerhards, 2008-03-07
+ * rgerhards: 2009-03-04: ERE support added, via parameter iType: 0 - BRE, 1 - ERE
+ * Arnaud Cornet/rgerhards: 2009-04-02: performance improvement by caching compiled regex
+ * If a caller does not need the cached version, it must still provide memory for it
+ * and must call rsCStrRegexDestruct() afterwards.
*/
-int rsCStrSzStrMatchRegex(cstr_t *pCS1, uchar *psz)
+rsRetVal rsCStrSzStrMatchRegex(cstr_t *pCS1, uchar *psz, int iType, void *rc)
{
- regex_t preq;
+ regex_t **cache = (regex_t**) rc;
int ret;
+ DEFiRet;
- BEGINfunc
+ assert(pCS1 != NULL);
+ assert(psz != NULL);
+ assert(cache != NULL);
if(objUse(regexp, LM_REGEXP_FILENAME) == RS_RET_OK) {
- regexp.regcomp(&preq, (char*) rsCStrGetSzStr(pCS1), 0);
- ret = regexp.regexec(&preq, (char*) psz, 0, NULL, 0);
- regexp.regfree(&preq);
+ if (*cache == NULL) {
+ *cache = calloc(sizeof(regex_t), 1);
+ regexp.regcomp(*cache, (char*) rsCStrGetSzStr(pCS1), (iType == 1 ? REG_EXTENDED : 0) | REG_NOSUB);
+ }
+ ret = regexp.regexec(*cache, (char*) psz, 0, NULL, 0);
+ if(ret != 0)
+ ABORT_FINALIZE(RS_RET_NOT_FOUND);
} else {
- ret = 1; /* simulate "not found" */
+ ABORT_FINALIZE(RS_RET_NOT_FOUND);
}
- ENDfunc
- return ret;
+finalize_it:
+ RETiRet;
+}
+
+
+/* free a cached compiled regex
+ * Caller must provide a pointer to a buffer that was created by
+ * rsCStrSzStrMatchRegexCache()
+ */
+void rsCStrRegexDestruct(void *rc)
+{
+ regex_t **cache = rc;
+
+ assert(cache != NULL);
+ assert(*cache != NULL);
+
+ if(objUse(regexp, LM_REGEXP_FILENAME) == RS_RET_OK) {
+ regexp.regfree(*cache);
+ free(*cache);
+ *cache = NULL;
+ }
}
diff --git a/runtime/stringbuf.h b/runtime/stringbuf.h
index c1966449..684133bb 100644
--- a/runtime/stringbuf.h
+++ b/runtime/stringbuf.h
@@ -136,7 +136,8 @@ int rsCStrCaseInsensitiveLocateInSzStr(cstr_t *pThis, uchar *sz);
int rsCStrStartsWithSzStr(cstr_t *pCS1, uchar *psz, size_t iLenSz);
int rsCStrCaseInsensitveStartsWithSzStr(cstr_t *pCS1, uchar *psz, size_t iLenSz);
int rsCStrSzStrStartsWithCStr(cstr_t *pCS1, uchar *psz, size_t iLenSz);
-int rsCStrSzStrMatchRegex(cstr_t *pCS1, uchar *psz);
+rsRetVal rsCStrSzStrMatchRegex(cstr_t *pCS1, uchar *psz, int iType, void *cache);
+void rsCStrRegexDestruct(void *rc);
rsRetVal rsCStrConvertToNumber(cstr_t *pStr, number_t *pNumber);
rsRetVal rsCStrConvertToBool(cstr_t *pStr, number_t *pBool);
rsRetVal rsCStrAppendCStr(cstr_t *pThis, cstr_t *pstrAppend);
diff --git a/runtime/sysvar.c b/runtime/sysvar.c
index 5eec8f67..c102d1f5 100644
--- a/runtime/sysvar.c
+++ b/runtime/sysvar.c
@@ -84,7 +84,7 @@ getNOW(eNOWType eNow, cstr_t **ppStr)
uchar szBuf[16];
struct syslogTime t;
- datetime.getCurrTime(&t);
+ datetime.getCurrTime(&t, NULL);
switch(eNow) {
case NOW_NOW:
snprintf((char*) szBuf, sizeof(szBuf)/sizeof(uchar), "%4.4d-%2.2d-%2.2d", t.year, t.month, t.day);
diff --git a/runtime/vm.c b/runtime/vm.c
index bc6c3dd2..a25476c2 100644
--- a/runtime/vm.c
+++ b/runtime/vm.c
@@ -331,6 +331,34 @@ finalize_it:
ENDop(PUSHSYSVAR)
+/* The function call operation is only very roughly implemented. While the plumbing
+ * to reach this instruction is fine, the instruction itself currently supports only
+ * functions with a single argument AND with a name that we know.
+ * TODO: later, we can add here the real logic, that involves looking up function
+ * names, loading them dynamically ... and all that...
+ * implementation begun 2009-03-10 by rgerhards
+ */
+BEGINop(FUNC_CALL) /* remember to set the instruction also in the ENDop macro! */
+ var_t *numOperands;
+ var_t *operand1;
+ int iStrlen;
+CODESTARTop(FUNC_CALL)
+ vmstk.PopNumber(pThis->pStk, &numOperands);
+ if(numOperands->val.num != 1)
+ ABORT_FINALIZE(RS_RET_INVLD_NBR_ARGUMENTS);
+ vmstk.PopString(pThis->pStk, &operand1); /* guess there's just one ;) */
+ if(!rsCStrSzStrCmp(pOp->operand.pVar->val.pStr, (uchar*) "strlen", 6)) { /* only one supported so far ;) */
+RUNLOG_VAR("%s", rsCStrGetSzStr(operand1->val.pStr));
+ iStrlen = strlen((char*) rsCStrGetSzStr(operand1->val.pStr));
+RUNLOG_VAR("%d", iStrlen);
+ } else
+ ABORT_FINALIZE(RS_RET_INVLD_FUNC);
+ PUSHRESULTop(operand1, iStrlen); // TODO: dummy, FIXME
+ var.Destruct(&numOperands); /* no longer needed */
+finalize_it:
+ENDop(FUNC_CALL)
+
+
/* ------------------------------ end instruction set implementation ------------------------------ */
@@ -412,6 +440,7 @@ execProg(vm_t *pThis, vmprg_t *pProg)
doOP(DIV);
doOP(MOD);
doOP(UNARY_MINUS);
+ doOP(FUNC_CALL);
default:
ABORT_FINALIZE(RS_RET_INVALID_VMOP);
dbgoprint((obj_t*) pThis, "invalid instruction %d in vmprg\n", pCurrOp->opcode);
diff --git a/runtime/vmop.c b/runtime/vmop.c
index fcacb15b..a343481e 100644
--- a/runtime/vmop.c
+++ b/runtime/vmop.c
@@ -61,24 +61,25 @@ rsRetVal vmopConstructFinalize(vmop_t __attribute__((unused)) *pThis)
/* destructor for the vmop object */
BEGINobjDestruct(vmop) /* be sure to specify the object type also in END and CODESTART macros! */
CODESTARTobjDestruct(vmop)
- if( pThis->opcode == opcode_PUSHSYSVAR
- || pThis->opcode == opcode_PUSHMSGVAR
- || pThis->opcode == opcode_PUSHCONSTANT) {
- if(pThis->operand.pVar != NULL)
- var.Destruct(&pThis->operand.pVar);
- }
+ if(pThis->operand.pVar != NULL)
+ var.Destruct(&pThis->operand.pVar);
ENDobjDestruct(vmop)
/* DebugPrint support for the vmop object */
BEGINobjDebugPrint(vmop) /* be sure to specify the object type also in END and CODESTART macros! */
uchar *pOpcodeName;
+ cstr_t *pStrVar;
CODESTARTobjDebugPrint(vmop)
vmopOpcode2Str(pThis, &pOpcodeName);
- dbgoprint((obj_t*) pThis, "opcode: %d\t(%s), next %p, var in next line\n", (int) pThis->opcode, pOpcodeName,
- pThis->pNext);
- if(pThis->operand.pVar != NULL)
- var.DebugPrint(pThis->operand.pVar);
+ CHKiRet(rsCStrConstruct(&pStrVar));
+ CHKiRet(rsCStrFinish(&pStrVar));
+ if(pThis->operand.pVar != NULL) {
+ CHKiRet(var.Obj2Str(pThis->operand.pVar, pStrVar));
+ }
+ dbgoprint((obj_t*) pThis, "%.12s\t%s\n", pOpcodeName, rsCStrGetSzStrNoNULL(pStrVar));
+ rsCStrDestruct(&pStrVar);
+finalize_it:
ENDobjDebugPrint(vmop)
@@ -158,37 +159,37 @@ vmopOpcode2Str(vmop_t *pThis, uchar **ppName)
*ppName = (uchar*) "and";
break;
case opcode_PLUS:
- *ppName = (uchar*) "+";
+ *ppName = (uchar*) "add";
break;
case opcode_MINUS:
- *ppName = (uchar*) "-";
+ *ppName = (uchar*) "sub";
break;
case opcode_TIMES:
- *ppName = (uchar*) "*";
+ *ppName = (uchar*) "mul";
break;
case opcode_DIV:
- *ppName = (uchar*) "/";
+ *ppName = (uchar*) "div";
break;
case opcode_MOD:
- *ppName = (uchar*) "%";
+ *ppName = (uchar*) "mod";
break;
case opcode_NOT:
*ppName = (uchar*) "not";
break;
case opcode_CMP_EQ:
- *ppName = (uchar*) "==";
+ *ppName = (uchar*) "cmp_==";
break;
case opcode_CMP_NEQ:
- *ppName = (uchar*) "!=";
+ *ppName = (uchar*) "cmp_!=";
break;
case opcode_CMP_LT:
- *ppName = (uchar*) "<";
+ *ppName = (uchar*) "cmp_<";
break;
case opcode_CMP_GT:
- *ppName = (uchar*) ">";
+ *ppName = (uchar*) "cmp_>";
break;
case opcode_CMP_LTEQ:
- *ppName = (uchar*) "<=";
+ *ppName = (uchar*) "cmp_<=";
break;
case opcode_CMP_CONTAINS:
*ppName = (uchar*) "contains";
@@ -197,28 +198,31 @@ vmopOpcode2Str(vmop_t *pThis, uchar **ppName)
*ppName = (uchar*) "startswith";
break;
case opcode_CMP_GTEQ:
- *ppName = (uchar*) ">=";
+ *ppName = (uchar*) "cmp_>=";
break;
case opcode_PUSHSYSVAR:
- *ppName = (uchar*) "PUSHSYSVAR";
+ *ppName = (uchar*) "push_sysvar";
break;
case opcode_PUSHMSGVAR:
- *ppName = (uchar*) "PUSHMSGVAR";
+ *ppName = (uchar*) "push_msgvar";
break;
case opcode_PUSHCONSTANT:
- *ppName = (uchar*) "PUSHCONSTANT";
+ *ppName = (uchar*) "push_const";
break;
case opcode_POP:
- *ppName = (uchar*) "POP";
+ *ppName = (uchar*) "pop";
break;
case opcode_UNARY_MINUS:
- *ppName = (uchar*) "UNARY_MINUS";
+ *ppName = (uchar*) "unary_minus";
break;
case opcode_STRADD:
- *ppName = (uchar*) "STRADD";
+ *ppName = (uchar*) "strconcat";
+ break;
+ case opcode_FUNC_CALL:
+ *ppName = (uchar*) "func_call";
break;
default:
- *ppName = (uchar*) "INVALID opcode";
+ *ppName = (uchar*) "!invalid_opcode!";
break;
}
diff --git a/runtime/vmop.h b/runtime/vmop.h
index c3d5d5f4..938b08fd 100644
--- a/runtime/vmop.h
+++ b/runtime/vmop.h
@@ -59,17 +59,44 @@ typedef enum { /* do NOT start at 0 to detect uninitialized types after calloc(
opcode_PUSHMSGVAR = 1002, /* requires var operand */
opcode_PUSHCONSTANT = 1003, /* requires var operand */
opcode_UNARY_MINUS = 1010,
- opcode_END_PROG = 1011
+ opcode_FUNC_CALL = 1012,
+ opcode_END_PROG = 2000
} opcode_t;
+/* Additional doc, operation specific
+
+ FUNC_CALL
+ All parameter passing is via the stack. Parameters are placed onto the stack in reverse order,
+ that means the last parameter is on top of the stack, the first at the bottom location.
+ At the actual top of the stack is the number of parameters. This permits functions to be
+ called with variable number of arguments. The function itself is responsible for poping
+ the right number of parameters of the stack and complaining if the number is incorrect.
+ On exit, a single return value must be pushed onto the stack. The FUNC_CALL operation
+ is generic. Its pVar argument contains the function name string (TODO: very slow, make
+ faster in later releases).
+
+ Sample Function call: sampleFunc(p1, p2, p3) ; returns number 4711 (sample)
+ Stacklayout on entry (order is top to bottom):
+ 3
+ p3
+ p2
+ p1
+ ... other vars ...
+
+ Stack on exit
+ 4711
+ ... other vars ...
+
+ */
+
+
/* the vmop object */
typedef struct vmop_s {
BEGINobjInstance; /* Data to implement generic object - MUST be the first data element! */
opcode_t opcode;
union {
- var_t *pVar;
- /* TODO: add function pointer */
+ var_t *pVar; /* for function call, this is the name (string) of function to be called */
} operand;
struct vmop_s *pNext; /* next operation or NULL, if end of program (logically this belongs to vmprg) */
} vmop_t;
diff --git a/runtime/vmprg.c b/runtime/vmprg.c
index 705e6948..75915025 100644
--- a/runtime/vmprg.c
+++ b/runtime/vmprg.c
@@ -74,7 +74,7 @@ ENDobjDestruct(vmprg)
BEGINobjDebugPrint(vmprg) /* be sure to specify the object type also in END and CODESTART macros! */
vmop_t *pOp;
CODESTARTobjDebugPrint(vmprg)
- dbgoprint((obj_t*) pThis, "program contents:\n");
+ dbgoprint((obj_t*) pThis, "VM Program:\n");
for(pOp = pThis->vmopRoot ; pOp != NULL ; pOp = pOp->pNext) {
vmop.DebugPrint(pOp);
}
diff --git a/runtime/wti.c b/runtime/wti.c
index 343a7227..544bffa7 100644
--- a/runtime/wti.c
+++ b/runtime/wti.c
@@ -39,15 +39,22 @@
#include <pthread.h>
#include <errno.h>
+#ifdef OS_SOLARIS
+# include <sched.h>
+# define pthread_yield() sched_yield()
+#endif
+
#include "rsyslog.h"
#include "stringbuf.h"
#include "srUtils.h"
#include "wtp.h"
#include "wti.h"
#include "obj.h"
+#include "glbl.h"
/* static data */
DEFobjStaticHelpers
+DEFobjCurrIf(glbl)
/* forward-definitions */
@@ -202,6 +209,7 @@ ENDobjDestruct(wti)
/* Standard-Constructor for the wti object
*/
BEGINobjConstruct(wti) /* be sure to specify the object type also in END macro! */
+ pThis->bOptimizeUniProc = glbl.GetOptimizeUniProc();
pthread_cond_init(&pThis->condExitDone, NULL);
pthread_mutex_init(&pThis->mut, NULL);
ENDobjConstruct(wti)
@@ -303,7 +311,7 @@ wtiWorkerCancelCleanup(void *arg)
pWtp = pThis->pWtp;
ISOBJ_TYPE_assert(pWtp, wtp);
- dbgprintf("%s: cancelation cleanup handler called.\n", wtiGetDbgHdr(pThis));
+ DBGPRINTF("%s: cancelation cleanup handler called.\n", wtiGetDbgHdr(pThis));
/* call user supplied handler (that one e.g. requeues the element) */
pWtp->pfOnWorkerCancel(pThis->pWtp->pUsr, pThis->pUsrp);
@@ -371,7 +379,8 @@ wtiWorker(wti_t *pThis)
wtpProcessThrdChanges(pWtp);
pthread_testcancel(); /* see big comment in function header */
# if !defined(__hpux) /* pthread_yield is missing there! */
- pthread_yield(); /* see big comment in function header */
+ if(pThis->bOptimizeUniProc)
+ pthread_yield(); /* see big comment in function header */
# endif
/* if we have a rate-limiter set for this worker pool, let's call it. Please
@@ -395,7 +404,7 @@ wtiWorker(wti_t *pThis)
/* if we reach this point, we are still protected by the mutex */
if(pWtp->pfIsIdle(pWtp->pUsr, MUTEX_ALREADY_LOCKED)) {
- dbgprintf("%s: worker IDLE, waiting for work.\n", wtiGetDbgHdr(pThis));
+ DBGPRINTF("%s: worker IDLE, waiting for work.\n", wtiGetDbgHdr(pThis));
pWtp->pfOnIdle(pWtp->pUsr, MUTEX_ALREADY_LOCKED);
if(pWtp->toWrkShutdown == -1) {
@@ -404,7 +413,7 @@ wtiWorker(wti_t *pThis)
} else {
timeoutComp(&t, pWtp->toWrkShutdown);/* get absolute timeout */
if(d_pthread_cond_timedwait(pWtp->pcondBusy, pWtp->pmutUsr, &t) != 0) {
- dbgprintf("%s: inactivity timeout, worker terminating...\n", wtiGetDbgHdr(pThis));
+ DBGPRINTF("%s: inactivity timeout, worker terminating...\n", wtiGetDbgHdr(pThis));
bInactivityTOOccured = 1; /* indicate we had a timeout */
}
}
@@ -470,6 +479,14 @@ finalize_it:
/* dummy */
rsRetVal wtiQueryInterface(void) { return RS_RET_NOT_IMPLEMENTED; }
+/* exit our class
+ */
+BEGINObjClassExit(wti, OBJ_IS_CORE_MODULE) /* CHANGE class also in END MACRO! */
+CODESTARTObjClassExit(nsdsel_gtls)
+ /* release objects we no longer need */
+ objRelease(glbl, CORE_COMPONENT);
+ENDObjClassExit(wti)
+
/* Initialize the wti class. Must be called as the very first method
* before anything else is called inside this class.
@@ -477,6 +494,7 @@ rsRetVal wtiQueryInterface(void) { return RS_RET_NOT_IMPLEMENTED; }
*/
BEGINObjClassInit(wti, 1, OBJ_IS_CORE_MODULE) /* one is the object version (most important for persisting) */
/* request objects we use */
+ CHKiRet(objUse(glbl, CORE_COMPONENT));
ENDObjClassInit(wti)
/*
diff --git a/runtime/wti.h b/runtime/wti.h
index 0cd6744e..6b60b833 100644
--- a/runtime/wti.h
+++ b/runtime/wti.h
@@ -31,6 +31,7 @@
/* the worker thread instance class */
typedef struct wti_s {
BEGINobjInstance;
+ int bOptimizeUniProc; /* cache for the equally-named global setting, pulled at time of queue creation */
pthread_t thrdID; /* thread ID */
qWrkCmd_t tCurrCmd; /* current command to be carried out by worker */
obj_t *pUsrp; /* pointer to an object meaningful for current user pointer (e.g. queue pUsr data elemt) */
diff --git a/runtime/wtp.c b/runtime/wtp.c
index 3beae271..04eb974f 100644
--- a/runtime/wtp.c
+++ b/runtime/wtp.c
@@ -40,15 +40,22 @@
#include <unistd.h>
#include <errno.h>
+#ifdef OS_SOLARIS
+# include <sched.h>
+# define pthread_yield() sched_yield()
+#endif
+
#include "rsyslog.h"
#include "stringbuf.h"
#include "srUtils.h"
#include "wtp.h"
#include "wti.h"
#include "obj.h"
+#include "glbl.h"
/* static data */
DEFobjStaticHelpers
+DEFobjCurrIf(glbl)
/* forward-definitions */
@@ -75,6 +82,7 @@ static rsRetVal NotImplementedDummy() { return RS_RET_OK; }
/* Standard-Constructor for the wtp object
*/
BEGINobjConstruct(wtp) /* be sure to specify the object type also in END macro! */
+ pThis->bOptimizeUniProc = glbl.GetOptimizeUniProc();
pthread_mutex_init(&pThis->mut, NULL);
pthread_mutex_init(&pThis->mutThrdShutdwn, NULL);
pthread_cond_init(&pThis->condThrdTrm, NULL);
@@ -478,7 +486,7 @@ wtpStartWrkr(wtp_t *pThis, int bLockMutex)
ISOBJ_TYPE_assert(pThis, wtp);
- wtpProcessThrdChanges(pThis);
+ wtpProcessThrdChanges(pThis); // TODO: Performance: this causes a lot of FUTEX calls
BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex);
@@ -506,7 +514,8 @@ wtpStartWrkr(wtp_t *pThis, int bLockMutex)
* hold the queue's mutex, but at least it has a chance to start on a single-CPU system.
*/
# if !defined(__hpux) /* pthread_yield is missing there! */
- pthread_yield();
+ if(pThis->bOptimizeUniProc)
+ pthread_yield();
# endif
/* indicate we just started a worker and would like to see it running */
@@ -639,12 +648,22 @@ finalize_it:
/* dummy */
rsRetVal wtpQueryInterface(void) { return RS_RET_NOT_IMPLEMENTED; }
+/* exit our class
+ */
+BEGINObjClassExit(wtp, OBJ_IS_CORE_MODULE) /* CHANGE class also in END MACRO! */
+CODESTARTObjClassExit(nsdsel_gtls)
+ /* release objects we no longer need */
+ objRelease(glbl, CORE_COMPONENT);
+ENDObjClassExit(wtp)
+
+
/* Initialize the stream class. Must be called as the very first method
* before anything else is called inside this class.
* rgerhards, 2008-01-09
*/
BEGINObjClassInit(wtp, 1, OBJ_IS_CORE_MODULE)
/* request objects we use */
+ CHKiRet(objUse(glbl, CORE_COMPONENT));
ENDObjClassInit(wtp)
/*
diff --git a/runtime/wtp.h b/runtime/wtp.h
index 0f21ac11..b9cb07c5 100644
--- a/runtime/wtp.h
+++ b/runtime/wtp.h
@@ -52,6 +52,7 @@ typedef enum {
/* the worker thread pool (wtp) object */
typedef struct wtp_s {
BEGINobjInstance;
+ int bOptimizeUniProc; /* cache for the equally-named global setting, pulled at time of queue creation */
wtpState_t wtpState;
int iNumWorkerThreads;/* number of worker threads to use */
int iCurNumWrkThrd;/* current number of active worker threads */