summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--action.c4
-rw-r--r--outchannel.c2
-rw-r--r--runtime/msg.c271
-rw-r--r--runtime/msg.h8
-rw-r--r--runtime/parser.c1
-rw-r--r--runtime/rsyslog.h5
-rw-r--r--runtime/rule.c7
-rw-r--r--runtime/srUtils.h2
-rw-r--r--runtime/stringbuf.c45
-rw-r--r--runtime/stringbuf.h45
-rw-r--r--template.c1
-rw-r--r--tests/Makefile.am6
-rw-r--r--tests/testsuites/threadingmq.conf20
-rw-r--r--tests/testsuites/threadingmqaq.conf24
-rwxr-xr-xtests/threadingmq.sh15
-rwxr-xr-xtests/threadingmqaq.sh15
16 files changed, 231 insertions, 240 deletions
diff --git a/action.c b/action.c
index d43028b8..352f7f9c 100644
--- a/action.c
+++ b/action.c
@@ -1182,8 +1182,8 @@ doActionCallAction(action_t *pAction, msg_t *pMsg)
(pMsg->msgFlags & MARK) == 0 && getMSGLen(pMsg) == getMSGLen(pAction->f_pMsg) &&
!strcmp(getMSG(pMsg), getMSG(pAction->f_pMsg)) &&
!strcmp(getHOSTNAME(pMsg), getHOSTNAME(pAction->f_pMsg)) &&
- !strcmp(getPROCID(pMsg), getPROCID(pAction->f_pMsg)) &&
- !strcmp(getAPPNAME(pMsg), getAPPNAME(pAction->f_pMsg))) {
+ !strcmp(getPROCID(pMsg, LOCK_MUTEX), getPROCID(pAction->f_pMsg, LOCK_MUTEX)) &&
+ !strcmp(getAPPNAME(pMsg, LOCK_MUTEX), getAPPNAME(pAction->f_pMsg, LOCK_MUTEX))) {
pAction->f_prevcount++;
dbgprintf("msg repeated %d times, %ld sec of %d.\n",
pAction->f_prevcount, (long) getActNow(pAction) - pAction->f_time,
diff --git a/outchannel.c b/outchannel.c
index 4f8abb32..74c18218 100644
--- a/outchannel.c
+++ b/outchannel.c
@@ -106,7 +106,6 @@ static rsRetVal get_Field(uchar **pp, uchar **pField)
p = *pp;
CHKiRet(cstrConstruct(&pStrB));
- rsCStrSetAllocIncrement(pStrB, 32);
/* copy the field */
while(*p && *p != ' ' && *p != ',') {
@@ -175,7 +174,6 @@ static inline rsRetVal get_restOfLine(uchar **pp, uchar **pBuf)
p = *pp;
CHKiRet(cstrConstruct(&pStrB));
- rsCStrSetAllocIncrement(pStrB, 32);
/* copy the field */
while(*p) {
diff --git a/runtime/msg.c b/runtime/msg.c
index 75933d68..3f69dbcb 100644
--- a/runtime/msg.c
+++ b/runtime/msg.c
@@ -271,8 +271,13 @@ static char *syslog_number_names[24] = { "0", "1", "2", "3", "4", "5", "6", "7",
"15", "16", "17", "18", "19", "20", "21", "22", "23" };
/* some forward declarations */
-static int getAPPNAMELen(msg_t *pM);
-static int getProtocolVersion(msg_t *pM);
+static int getAPPNAMELen(msg_t *pM, bool bLockMutex);
+
+static inline int getProtocolVersion(msg_t *pM)
+{
+ return(pM->iProtocolVersion);
+}
+
/* The following functions will support advanced output module
* multithreading, once this is implemented. Currently, we
@@ -334,33 +339,9 @@ static void MsgLockingDummy(msg_t __attribute__((unused)) *pMsg)
*/
static void MsgPrepareEnqueueLockingCase(msg_t *pThis)
{
- int iErr;
- pthread_mutexattr_t mutAttr;
BEGINfunc
assert(pThis != NULL);
- iErr = pthread_mutexattr_init(&mutAttr);
- if(iErr != 0) {
- dbgprintf("error initializing mutex attribute in %s:%d, trying to continue\n",
- __FILE__, __LINE__);
- }
- iErr = pthread_mutexattr_settype(&mutAttr, PTHREAD_MUTEX_RECURSIVE);
- if(iErr != 0) {
- dbgprintf("ERROR setting mutex attribute to recursive in %s:%d, trying to continue "
- "but we will probably either abort or hang soon\n",
- __FILE__, __LINE__);
- /* TODO: it makes very little sense to continue here,
- * but it requires an iRet interface to gracefully shut
- * down. We should do that over time. -- rgerhards, 2008-07-14
- */
- }
- pthread_mutex_init(&pThis->mut, &mutAttr);
-
- /* we do no longer need the attribute. According to the
- * POSIX spec, we can destroy it without affecting the
- * initialized mutex (that used the attribute).
- * rgerhards, 2008-07-14
- */
- pthread_mutexattr_destroy(&mutAttr);
+ pthread_mutex_init(&pThis->mut, NULL);
pThis->bDoLock = 1;
ENDfunc
}
@@ -809,6 +790,7 @@ msg_t *MsgAddRef(msg_t *pM)
* can obtain a PROCID. Take in mind that not every legacy syslog message
* actually has a PROCID.
* rgerhards, 2005-11-24
+ * THIS MUST be called with the message lock locked.
*/
static rsRetVal aquirePROCIDFromTAG(msg_t *pM)
{
@@ -837,7 +819,6 @@ static rsRetVal aquirePROCIDFromTAG(msg_t *pM)
/* now obtain the PROCID string... */
CHKiRet(cstrConstruct(&pM->pCSPROCID));
- rsCStrSetAllocIncrement(pM->pCSPROCID, 16);
while((i < pM->iLenTAG) && (pszTag[i] != ']')) {
CHKiRet(cstrAppendChar(pM->pCSPROCID, pszTag[i]));
++i;
@@ -874,7 +855,8 @@ finalize_it:
* The program name is not parsed by default, because it is infrequently-used.
* If it is needed, this function should be called first. It checks if it is
* already set and extracts it, if not.
- * A message object must be provided, else a crash will occur.
+ *
+ * IMPORTANT: A locked message object must be provided, else a crash will occur.
* rgerhards, 2005-10-19
*/
static rsRetVal aquireProgramName(msg_t *pM)
@@ -885,12 +867,9 @@ static rsRetVal aquireProgramName(msg_t *pM)
assert(pM != NULL);
if(pM->pCSProgName == NULL) {
- /* ok, we do not yet have it. So let's parse the TAG
- * to obtain it.
- */
+ /* ok, we do not yet have it. So let's parse the TAG to obtain it. */
pszTag = (uchar*) ((pM->iLenTAG < CONF_TAG_BUFSIZE) ? pM->TAG.szBuf : pM->TAG.pszTAG);
CHKiRet(cstrConstruct(&pM->pCSProgName));
- rsCStrSetAllocIncrement(pM->pCSProgName, 33);
for( i = 0
; (i < pM->iLenTAG) && isprint((int) pszTag[i])
&& (pszTag[i] != '\0') && (pszTag[i] != ':')
@@ -917,12 +896,6 @@ void setProtocolVersion(msg_t *pM, int iNewVersion)
pM->iProtocolVersion = iNewVersion;
}
-static int getProtocolVersion(msg_t *pM)
-{
- assert(pM != NULL);
- return(pM->iProtocolVersion);
-}
-
/* note: string is taken from constant pool, do NOT free */
char *getProtocolVersionString(msg_t *pM)
{
@@ -1233,7 +1206,10 @@ MsgSetAfterPRIOffs(msg_t *pMsg, short offs)
/* rgerhards 2004-11-24: set APP-NAME in msg object
- * TODO: revisit msg locking code!
+ * This is not locked, because it either is called during message
+ * construction (where we need no locking) or later as part of a function
+ * which already obtained the lock. So in general, this function here must
+ * only be called when it it safe to do so without it aquiring a lock.
*/
rsRetVal MsgSetAPPNAME(msg_t *pMsg, char* pszAPPNAME)
{
@@ -1242,7 +1218,6 @@ rsRetVal MsgSetAPPNAME(msg_t *pMsg, char* pszAPPNAME)
if(pMsg->pCSAPPNAME == NULL) {
/* we need to obtain the object first */
CHKiRet(rsCStrConstruct(&pMsg->pCSAPPNAME));
- rsCStrSetAllocIncrement(pMsg->pCSAPPNAME, 128);
}
/* if we reach this point, we have the object */
iRet = rsCStrSetSzStr(pMsg->pCSAPPNAME, (uchar*) pszAPPNAME);
@@ -1252,20 +1227,6 @@ finalize_it:
}
-static void tryEmulateAPPNAME(msg_t *pM); /* forward reference */
-/* rgerhards, 2005-11-24
- */
-char *getAPPNAME(msg_t *pM)
-{
- assert(pM != NULL);
- MsgLock(pM);
- if(pM->pCSAPPNAME == NULL)
- tryEmulateAPPNAME(pM);
- MsgUnlock(pM);
- return (pM->pCSAPPNAME == NULL) ? "" : (char*) rsCStrGetSzStrNoNULL(pM->pCSAPPNAME);
-}
-
-
/* rgerhards 2004-11-24: set PROCID in msg object
*/
rsRetVal MsgSetPROCID(msg_t *pMsg, char* pszPROCID)
@@ -1284,33 +1245,44 @@ finalize_it:
RETiRet;
}
+
+/* check if we have a procid, and, if not, try to aquire/emulate it.
+ * This must be called WITHOUT the message lock being held.
+ * rgerhards, 2009-06-26
+ */
+static inline void preparePROCID(msg_t *pM, bool bLockMutex)
+{
+ if(pM->pCSPROCID == NULL) {
+ if(bLockMutex == LOCK_MUTEX)
+ MsgLock(pM);
+ /* re-query, things may have changed in the mean time... */
+ if(pM->pCSPROCID == NULL)
+ aquirePROCIDFromTAG(pM);
+ if(bLockMutex == LOCK_MUTEX)
+ MsgUnlock(pM);
+ }
+}
+
+
+#if 0
/* rgerhards, 2005-11-24
*/
-static inline int getPROCIDLen(msg_t *pM)
+static inline int getPROCIDLen(msg_t *pM, bool bLockMutex)
{
assert(pM != NULL);
- MsgLock(pM);
- if(pM->pCSPROCID == NULL)
- aquirePROCIDFromTAG(pM);
- MsgUnlock(pM);
+ preparePROCID(pM, bLockMutex);
return (pM->pCSPROCID == NULL) ? 1 : rsCStrLen(pM->pCSPROCID);
}
+#endif
/* rgerhards, 2005-11-24
*/
-char *getPROCID(msg_t *pM)
+char *getPROCID(msg_t *pM, bool bLockMutex)
{
- char* pszRet;
-
ISOBJ_TYPE_assert(pM, msg);
- MsgLock(pM);
- if(pM->pCSPROCID == NULL)
- aquirePROCIDFromTAG(pM);
- pszRet = (pM->pCSPROCID == NULL) ? "-" : (char*) cstrGetSzStrNoNULL(pM->pCSPROCID);
- //pszRet = (pM->pCSPROCID == NULL) ? "-" : (char*) rsCStrGetSzStrNoNULL(pM->pCSPROCID);
- MsgUnlock(pM);
- return pszRet;
+ preparePROCID(pM, bLockMutex);
+ return (pM->pCSPROCID == NULL) ? "-" : (char*) cstrGetSzStrNoNULL(pM->pCSPROCID);
}
@@ -1323,7 +1295,6 @@ rsRetVal MsgSetMSGID(msg_t *pMsg, char* pszMSGID)
if(pMsg->pCSMSGID == NULL) {
/* we need to obtain the object first */
CHKiRet(rsCStrConstruct(&pMsg->pCSMSGID));
- rsCStrSetAllocIncrement(pMsg->pCSMSGID, 128);
}
/* if we reach this point, we have the object */
iRet = rsCStrSetSzStr(pMsg->pCSMSGID, (uchar*) pszMSGID);
@@ -1386,26 +1357,31 @@ void MsgSetTAG(msg_t *pMsg, uchar* pszBuf, size_t lenBuf)
* if there is a TAG and, if not, if it can emulate it.
* rgerhards, 2005-11-24
*/
-static inline void tryEmulateTAG(msg_t *pM)
+static inline void tryEmulateTAG(msg_t *pM, bool bLockMutex)
{
size_t lenTAG;
uchar bufTAG[CONF_TAG_MAXSIZE];
assert(pM != NULL);
+ if(bLockMutex == LOCK_MUTEX)
+ MsgLock(pM);
if(pM->iLenTAG > 0)
return; /* done, no need to emulate */
if(getProtocolVersion(pM) == 1) {
- if(!strcmp(getPROCID(pM), "-")) {
+ if(!strcmp(getPROCID(pM, MUTEX_ALREADY_LOCKED), "-")) {
/* no process ID, use APP-NAME only */
- MsgSetTAG(pM, (uchar*) getAPPNAME(pM), getAPPNAMELen(pM));
+ MsgSetTAG(pM, (uchar*) getAPPNAME(pM, MUTEX_ALREADY_LOCKED), getAPPNAMELen(pM, MUTEX_ALREADY_LOCKED));
} else {
/* now we can try to emulate */
- lenTAG = snprintf((char*)bufTAG, CONF_TAG_MAXSIZE, "%s[%s]", getAPPNAME(pM), getPROCID(pM));
+ lenTAG = snprintf((char*)bufTAG, CONF_TAG_MAXSIZE, "%s[%s]",
+ getAPPNAME(pM, MUTEX_ALREADY_LOCKED), getPROCID(pM, MUTEX_ALREADY_LOCKED));
bufTAG[32] = '\0'; /* just to make sure... */
MsgSetTAG(pM, bufTAG, lenTAG);
}
}
+ if(bLockMutex == LOCK_MUTEX)
+ MsgUnlock(pM);
}
@@ -1416,13 +1392,12 @@ static inline char *getTAG(msg_t *pM)
if(pM == NULL)
ret = "";
else {
- MsgLock(pM);
- tryEmulateTAG(pM);
+ if(pM->iLenTAG == 0)
+ tryEmulateTAG(pM, LOCK_MUTEX);
if(pM->iLenTAG == 0)
ret = "";
else
ret = (char*) ((pM->iLenTAG < CONF_TAG_BUFSIZE) ? pM->TAG.szBuf : pM->TAG.pszTAG);
- MsgUnlock(pM);
}
return(ret);
}
@@ -1496,7 +1471,6 @@ rsRetVal MsgSetStructuredData(msg_t *pMsg, char* pszStrucData)
if(pMsg->pCSStrucData == NULL) {
/* we need to obtain the object first */
CHKiRet(rsCStrConstruct(&pMsg->pCSStrucData));
- rsCStrSetAllocIncrement(pMsg->pCSStrucData, 128);
}
/* if we reach this point, we have the object */
iRet = rsCStrSetSzStr(pMsg->pCSStrucData, (uchar*) pszStrucData);
@@ -1525,101 +1499,50 @@ static inline char *getStructuredData(msg_t *pM)
}
-
-/* get the length of the "programname" sz string
- * rgerhards, 2005-10-19
+/* check if we have a ProgramName, and, if not, try to aquire/emulate it.
+ * rgerhards, 2009-06-26
*/
-int getProgramNameLen(msg_t *pM)
+static inline void prepareProgramName(msg_t *pM, bool bLockMutex)
{
- int iRet;
+ if(pM->pCSProgName == NULL) {
+ if(bLockMutex == LOCK_MUTEX)
+ MsgLock(pM);
- assert(pM != NULL);
- MsgLock(pM);
- if((iRet = aquireProgramName(pM)) != RS_RET_OK) {
- dbgprintf("error %d returned by aquireProgramName() in getProgramNameLen()\n", iRet);
- MsgUnlock(pM);
- return 0; /* best we can do (consistent wiht what getProgramName() returns) */
- }
- MsgUnlock(pM);
+ /* re-query as things might have changed during locking */
+ if(pM->pCSProgName == NULL)
+ aquireProgramName(pM);
- return (pM->pCSProgName == NULL) ? 0 : rsCStrLen(pM->pCSProgName);
+ if(bLockMutex == LOCK_MUTEX)
+ MsgUnlock(pM);
+ }
}
-/* get the "programname" as sz string
+/* get the length of the "programname" sz string
* rgerhards, 2005-10-19
*/
-char *getProgramName(msg_t *pM) /* this is the non-locking version for internal use */
+int getProgramNameLen(msg_t *pM, bool bLockMutex)
{
- int iRet;
- char *pszRet;
-
assert(pM != NULL);
- MsgLock(pM);
- if((iRet = aquireProgramName(pM)) != RS_RET_OK) {
- dbgprintf("error %d returned by aquireProgramName() in getProgramName()\n", iRet);
- pszRet = ""; /* best we can do */
- } else {
- pszRet = (pM->pCSProgName == NULL) ? "" : (char*) rsCStrGetSzStrNoNULL(pM->pCSProgName);
- }
-
- MsgUnlock(pM);
- return pszRet;
+ prepareProgramName(pM, bLockMutex);
+ return (pM->pCSProgName == NULL) ? 0 : rsCStrLen(pM->pCSProgName);
}
-/* The code below was an approach without PTHREAD_MUTEX_RECURSIVE
- * However, it turned out to be quite complex. So far, we use recursive
- * locking, which is OK from a performance point of view, especially as
- * we do not anticipate that multithreading msg objects is used often.
- * However, we may re-think about using non-recursive locking and I leave this
- * code in here to conserve the idea. -- rgerhards, 2008-01-05
- */
-#if 0
-static char *getProgramNameNoLock(msg_t *pM) /* this is the non-locking version for internal use */
-{
- int iRet;
- assert(pM != NULL);
- if((iRet = aquireProgramName(pM)) != RS_RET_OK) {
- dbgprintf("error %d returned by aquireProgramName() in getProgramName()\n", iRet);
- return ""; /* best we can do */
- }
- return (pM->pCSProgName == NULL) ? "" : (char*) rsCStrGetSzStrNoNULL(pM->pCSProgName);
-}
-char *getProgramName(msg_t *pM) /* this is the external callable version */
+/* get the "programname" as sz string
+ * rgerhards, 2005-10-19
+ */
+char *getProgramName(msg_t *pM, bool bLockMutex)
{
- char *pszRet;
-
- MsgLock(pM);
- pszRet = getProgramNameNoLock(pM);
- MsgUnlock(pM);
- return pszRet;
+ prepareProgramName(pM, bLockMutex);
+ return (pM->pCSProgName == NULL) ? "" : (char*) rsCStrGetSzStrNoNULL(pM->pCSProgName);
}
-/* an alternative approach has been: */
-/* The macro below is used to generate external function definitions
- * for such functions that may also be called internally (and thus have
- * both a locking and non-locking implementation. Over time, we could
- * reconsider how we handle that. -- rgerhards, 2008-01-05
- */
-#define EXT_LOCKED_FUNC(fName, ret) \
-ret fName(msg_t *pM) \
-{ \
- ret valRet; \
- MsgLock(pM); \
- valRet = fName##NoLock(pM); \
- MsgUnlock(pM); \
- return(valRet); \
-}
-EXT_LOCKED_FUNC(getProgramName, char*)
-/* in this approach, the external function is provided by the macro and
- * needs not to be writen.
- */
-#endif /* #if 0 -- saved code */
/* This function tries to emulate APPNAME if it is not present. Its
* main use is when we have received a log record via legacy syslog and
* now would like to send out the same one via syslog-protocol.
+ * MUST be called with the Msg Lock locked!
*/
static void tryEmulateAPPNAME(msg_t *pM)
{
@@ -1629,18 +1552,46 @@ static void tryEmulateAPPNAME(msg_t *pM)
if(getProtocolVersion(pM) == 0) {
/* only then it makes sense to emulate */
- MsgSetAPPNAME(pM, getProgramName(pM));
+ MsgSetAPPNAME(pM, getProgramName(pM, MUTEX_ALREADY_LOCKED));
+ }
+}
+
+
+
+/* check if we have a APPNAME, and, if not, try to aquire/emulate it.
+ * This must be called WITHOUT the message lock being held.
+ * rgerhards, 2009-06-26
+ */
+static inline void prepareAPPNAME(msg_t *pM, bool bLockMutex)
+{
+ if(pM->pCSAPPNAME == NULL) {
+ if(bLockMutex == LOCK_MUTEX)
+ MsgLock(pM);
+
+ /* re-query as things might have changed during locking */
+ if(pM->pCSAPPNAME == NULL)
+ tryEmulateAPPNAME(pM);
+
+ if(bLockMutex == LOCK_MUTEX)
+ MsgUnlock(pM);
}
}
+/* rgerhards, 2005-11-24
+ */
+char *getAPPNAME(msg_t *pM, bool bLockMutex)
+{
+ assert(pM != NULL);
+ prepareAPPNAME(pM, bLockMutex);
+ return (pM->pCSAPPNAME == NULL) ? "" : (char*) rsCStrGetSzStrNoNULL(pM->pCSAPPNAME);
+}
/* rgerhards, 2005-11-24
*/
-static int getAPPNAMELen(msg_t *pM)
+static int getAPPNAMELen(msg_t *pM, bool bLockMutex)
{
assert(pM != NULL);
- if(pM->pCSAPPNAME == NULL)
- tryEmulateAPPNAME(pM);
+ prepareAPPNAME(pM, bLockMutex);
return (pM->pCSAPPNAME == NULL) ? 0 : rsCStrLen(pM->pCSAPPNAME);
}
@@ -1991,15 +1942,15 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
} else if(!strcmp((char*) pName, "timegenerated")) {
pRes = getTimeGenerated(pMsg, pTpe->data.field.eDateFormat);
} else if(!strcmp((char*) pName, "programname")) {
- pRes = getProgramName(pMsg);
+ pRes = getProgramName(pMsg, LOCK_MUTEX);
} else if(!strcmp((char*) pName, "protocol-version")) {
pRes = getProtocolVersionString(pMsg);
} else if(!strcmp((char*) pName, "structured-data")) {
pRes = getStructuredData(pMsg);
} else if(!strcmp((char*) pName, "app-name")) {
- pRes = getAPPNAME(pMsg);
+ pRes = getAPPNAME(pMsg, LOCK_MUTEX);
} else if(!strcmp((char*) pName, "procid")) {
- pRes = getPROCID(pMsg);
+ pRes = getPROCID(pMsg, LOCK_MUTEX);
} else if(!strcmp((char*) pName, "msgid")) {
pRes = getMSGID(pMsg);
/* here start system properties (those, that do not relate to the message itself */
diff --git a/runtime/msg.h b/runtime/msg.h
index 4bfc1e3f..56ba5b80 100644
--- a/runtime/msg.h
+++ b/runtime/msg.h
@@ -168,14 +168,14 @@ rsRetVal MsgEnableThreadSafety(void);
/* TODO: remove these five (so far used in action.c) */
char *getMSG(msg_t *pM);
char *getHOSTNAME(msg_t *pM);
-char *getPROCID(msg_t *pM);
-char *getAPPNAME(msg_t *pM);
+char *getPROCID(msg_t *pM, bool bLockMutex);
+char *getAPPNAME(msg_t *pM, bool bLockMutex);
int getMSGLen(msg_t *pM);
char *getHOSTNAME(msg_t *pM);
int getHOSTNAMELen(msg_t *pM);
-char *getProgramName(msg_t *pM);
-int getProgramNameLen(msg_t *pM);
+char *getProgramName(msg_t *pM, bool bLockMutex);
+int getProgramNameLen(msg_t *pM, bool bLockMutex);
uchar *getRcvFrom(msg_t *pM);
/* The MsgPrepareEnqueue() function is a macro for performance reasons.
diff --git a/runtime/parser.c b/runtime/parser.c
index 0a27c982..d4ca7673 100644
--- a/runtime/parser.c
+++ b/runtime/parser.c
@@ -307,7 +307,6 @@ rsRetVal parseMsg(msg_t *pMsg)
/* finalize message object */
pMsg->msgFlags &= ~NEEDS_PARSING; /* this message is now parsed */
- MsgPrepareEnqueue(pMsg); /* "historical" name - preparese for multi-threading */
finalize_it:
RETiRet;
diff --git a/runtime/rsyslog.h b/runtime/rsyslog.h
index a55b867f..09c3a066 100644
--- a/runtime/rsyslog.h
+++ b/runtime/rsyslog.h
@@ -430,9 +430,8 @@ typedef enum rsObjectID rsObjID;
#endif
/* some constants */
-// TODO: do we really need them - if not, delete -- rgerhards, 2009-06-10
-#define IGNORE_ERROR_CODES 1
-#define ABORT_ON_ERROR 0
+#define MUTEX_ALREADY_LOCKED 0
+#define LOCK_MUTEX 1
/* The following prototype is convenient, even though it may not be the 100% correct place.. -- rgerhards 2008-01-07 */
void dbgprintf(char *, ...) __attribute__((format(printf, 1, 2)));
diff --git a/runtime/rule.c b/runtime/rule.c
index 12494543..221368e6 100644
--- a/runtime/rule.c
+++ b/runtime/rule.c
@@ -147,13 +147,14 @@ shouldProcessThisMessage(rule_t *pRule, msg_t *pMsg, int *bProcessMsg)
offset = 1;
}
}
- if(!rsCStrOffsetSzStrCmp(pRule->pCSProgNameComp, offset, (uchar*) getProgramName(pMsg), getProgramNameLen(pMsg)))
+ if(!rsCStrOffsetSzStrCmp(pRule->pCSProgNameComp, offset,
+ (uchar*) getProgramName(pMsg, LOCK_MUTEX), getProgramNameLen(pMsg, LOCK_MUTEX)))
bEqv = 1;
if((!bEqv && !bInv) || (bEqv && bInv)) {
/* not equal or inverted selection, so we are already done... */
- dbgprintf("programname filter '%s' does not match '%s'\n",
- rsCStrGetSzStrNoNULL(pRule->pCSProgNameComp), getProgramName(pMsg));
+ DBGPRINTF("programname filter '%s' does not match '%s'\n",
+ rsCStrGetSzStrNoNULL(pRule->pCSProgNameComp), getProgramName(pMsg, LOCK_MUTEX));
FINALIZE;
}
}
diff --git a/runtime/srUtils.h b/runtime/srUtils.h
index 9d6360e7..6d5a784b 100644
--- a/runtime/srUtils.h
+++ b/runtime/srUtils.h
@@ -109,8 +109,6 @@ rsRetVal getFileSize(uchar *pszName, off_t *pSize);
#define mutex_cancelsafe_unlock(mut) pthread_cleanup_pop(1)
/* some useful constants */
-#define MUTEX_ALREADY_LOCKED 0
-#define LOCK_MUTEX 1
#define DEFVARS_mutexProtection\
int iCancelStateSave; \
int bLockedOpIsLocked=0
diff --git a/runtime/stringbuf.c b/runtime/stringbuf.c
index d3ddf33a..f3824831 100644
--- a/runtime/stringbuf.c
+++ b/runtime/stringbuf.c
@@ -71,7 +71,6 @@ rsRetVal cstrConstruct(cstr_t **ppThis)
pThis->pszBuf = NULL;
pThis->iBufSize = 0;
pThis->iStrLen = 0;
- pThis->iAllocIncrement = RS_STRINGBUF_ALLOC_INCREMENT;
*ppThis = pThis;
finalize_it:
@@ -154,7 +153,7 @@ void rsCStrDestruct(cstr_t **ppThis)
* rgerhards, 2008-01-07
* changed to utilized realloc() -- rgerhards, 2009-06-16
*/
-static rsRetVal
+rsRetVal
rsCStrExtendBuf(cstr_t *pThis, size_t iMinNeeded)
{
uchar *pNewBuf;
@@ -162,16 +161,16 @@ rsCStrExtendBuf(cstr_t *pThis, size_t iMinNeeded)
DEFiRet;
/* first compute the new size needed */
- if(iMinNeeded > pThis->iAllocIncrement) {
- /* we allocate "n" iAllocIncrements. Usually, that should
+ if(iMinNeeded > RS_STRINGBUF_ALLOC_INCREMENT) {
+ /* we allocate "n" ALLOC_INCREMENTs. Usually, that should
* leave some room after the absolutely needed one. It also
* reduces memory fragmentation. Note that all of this are
* integer operations (very important to understand what is
* going on)! Parenthesis are for better readibility.
*/
- iNewSize = ((iMinNeeded / pThis->iAllocIncrement) + 1) * pThis->iAllocIncrement;
+ iNewSize = (iMinNeeded / RS_STRINGBUF_ALLOC_INCREMENT + 1) * RS_STRINGBUF_ALLOC_INCREMENT;
} else {
- iNewSize = pThis->iBufSize + pThis->iAllocIncrement;
+ iNewSize = pThis->iBufSize + RS_STRINGBUF_ALLOC_INCREMENT;
}
iNewSize += pThis->iBufSize; /* add current size */
@@ -246,33 +245,10 @@ finalize_it:
}
-/* Append a character to the current string object. This may only be done until
- * cstrFinalize() is called.
- * rgerhards, 2009-06-16
- */
-rsRetVal cstrAppendChar(cstr_t *pThis, uchar c)
-{
- DEFiRet;
-
- rsCHECKVALIDOBJECT(pThis, OIDrsCStr);
-
- if(pThis->iStrLen >= pThis->iBufSize) {
- CHKiRet(rsCStrExtendBuf(pThis, 1)); /* need more memory! */
- }
-
- /* ok, when we reach this, we have sufficient memory */
- *(pThis->pBuf + pThis->iStrLen++) = c;
-
-finalize_it:
- RETiRet;
-}
-
-
/* Sets the string object to the classigal sz-string provided.
* Any previously stored vlaue is discarded. If a NULL pointer
* the the new value (pszNew) is provided, an empty string is
- * created (this is NOT an error!). Property iAllocIncrement is
- * not modified by this function.
+ * created (this is NOT an error!).
* rgerhards, 2005-10-18
*/
rsRetVal rsCStrSetSzStr(cstr_t *pThis, uchar *pszNew)
@@ -290,7 +266,6 @@ rsRetVal rsCStrSetSzStr(cstr_t *pThis, uchar *pszNew)
pThis->iStrLen = strlen((char*)pszNew);
pThis->iBufSize = pThis->iStrLen;
pThis->pszBuf = NULL;
- /* iAllocIncrement is NOT modified! */
/* now save the new value */
if((pThis->pBuf = (uchar*) malloc(sizeof(uchar) * pThis->iStrLen)) == NULL) {
@@ -472,14 +447,6 @@ finalize_it:
}
-void rsCStrSetAllocIncrement(cstr_t *pThis, int iNewIncrement)
-{
- rsCHECKVALIDOBJECT(pThis, OIDrsCStr);
- assert(iNewIncrement > 0);
- pThis->iAllocIncrement = iNewIncrement;
-}
-
-
/* return the length of the current string
* 2005-09-09 rgerhards
* Please note: this is only a function in a debug build.
diff --git a/runtime/stringbuf.h b/runtime/stringbuf.h
index 9d2e7865..4fbd9a9b 100644
--- a/runtime/stringbuf.h
+++ b/runtime/stringbuf.h
@@ -35,6 +35,7 @@
#ifndef _STRINGBUF_H_INCLUDED__
#define _STRINGBUF_H_INCLUDED__ 1
+
/**
* The dynamic string buffer object.
*/
@@ -47,7 +48,6 @@ typedef struct cstr_s
uchar *pszBuf; /**< pointer to the sz version of the string (after it has been created )*/
size_t iBufSize; /**< current maximum size of the string buffer */
size_t iStrLen; /**< length of the string in characters. */
- unsigned short iAllocIncrement; /**< the amount of bytes the string should be expanded if it needs to */
bool bIsForeignBuf; /**< is pBuf a buffer provided by someone else? */
} cstr_t;
@@ -66,13 +66,28 @@ rsRetVal rsCStrConstructFromCStr(cstr_t **ppThis, cstr_t *pFrom);
void rsCStrDestruct(cstr_t **ppThis);
#define cstrDestruct(x) rsCStrDestruct((x))
-/**
- * Append a character to an existing string. If necessary, the
- * method expands the string buffer.
- *
- * \param c Character to append to string.
+
+/* Append a character to the current string object. This may only be done until
+ * cstrFinalize() is called.
+ * rgerhards, 2009-06-16
*/
-rsRetVal cstrAppendChar(cstr_t *pThis, uchar c);
+rsRetVal rsCStrExtendBuf(cstr_t *pThis, size_t iMinNeeded); /* our helper, NOT a public interface! */
+static inline rsRetVal cstrAppendChar(cstr_t *pThis, uchar c)
+{
+ rsRetVal iRet;
+
+ rsCHECKVALIDOBJECT(pThis, OIDrsCStr);
+
+ if(pThis->iStrLen >= pThis->iBufSize) {
+ CHKiRet(rsCStrExtendBuf(pThis, 1)); /* need more memory! */
+ }
+
+ /* ok, when we reach this, we have sufficient memory */
+ *(pThis->pBuf + pThis->iStrLen++) = c;
+
+finalize_it:
+ return iRet;
+}
/**
* Truncate "n" number of characters from the end of the
@@ -101,22 +116,6 @@ rsRetVal rsCStrAppendStr(cstr_t *pThis, uchar* psz);
*/
rsRetVal rsCStrAppendStrWithLen(cstr_t *pThis, uchar* psz, size_t iStrLen);
-/**
- * Set a new allocation incremet. This will influence
- * the allocation the next time the string will be expanded.
- * It can be set and changed at any time. If done immediately
- * after custructing the StrB object, this will also be
- * the inital allocation.
- *
- * \param iNewIncrement The new increment size
- *
- * \note It is possible to use a very low increment, e.g. 1 byte.
- * This can generate a considerable overhead. We highly
- * advise not to use an increment below 32 bytes, except
- * if you are very well aware why you are doing it ;)
- */
-void rsCStrSetAllocIncrement(cstr_t *pThis, int iNewIncrement);
-#define rsCStrGetAllocIncrement(pThis) ((pThis)->iAllocIncrement)
/**
* Append an integer to the string. No special formatting is
diff --git a/template.c b/template.c
index 43a54d98..704f0b19 100644
--- a/template.c
+++ b/template.c
@@ -373,7 +373,6 @@ static int do_Constant(unsigned char **pp, struct template *pTpl)
if(cstrConstruct(&pStrB) != RS_RET_OK)
return 1;
- rsCStrSetAllocIncrement(pStrB, 32);
/* process the message and expand escapes
* (additional escapes can be added here if needed)
*/
diff --git a/tests/Makefile.am b/tests/Makefile.am
index f6bb0587..cbbca074 100644
--- a/tests/Makefile.am
+++ b/tests/Makefile.am
@@ -18,6 +18,8 @@ TESTS += omod-if-array.sh \
parsertest.sh \
timestamp.sh \
inputname.sh \
+ threadingmq.sh \
+ threadingmqaq.sh \
fieldtest.sh
endif
@@ -116,6 +118,10 @@ EXTRA_DIST= 1.rstest 2.rstest 3.rstest err1.rstest \
queue-persist.sh \
queue-persist-drvr.sh \
testsuites/queue-persist.conf \
+ threadingmq.sh \
+ testsuites/threadingmq.conf \
+ threadingmqaq.sh \
+ testsuites/threadingmqaq.conf \
DiagTalker.java \
cfg.sh
diff --git a/tests/testsuites/threadingmq.conf b/tests/testsuites/threadingmq.conf
new file mode 100644
index 00000000..aa5197bb
--- /dev/null
+++ b/tests/testsuites/threadingmq.conf
@@ -0,0 +1,20 @@
+# Threading test, we run a tcp flood to via an
+# engine instructed to use multiple threads
+# rgerhards, 2009-06-26
+$IncludeConfig diag-common.conf
+
+$ModLoad ../plugins/imtcp/.libs/imtcp
+$MainMsgQueueTimeoutShutdown 10000
+$MaxOpenFiles 2000
+$InputTCPMaxSessions 1100
+$InputTCPServerRun 13514
+
+$MainMsgQueueWorkerThreadMinimumMessages 10
+$MainMsgQueueWorkerThreads 5
+
+$template outfmt,"%msg:F,58:2%\n"
+$template dynfile,"rsyslog.out.log" # trick to use relative path names!
+# write quickly to the output file:
+$OMFileFlushOnTXEnd off
+$OMFileIOBufferSize 256k
+:msg, contains, "msgnum:" ?dynfile;outfmt
diff --git a/tests/testsuites/threadingmqaq.conf b/tests/testsuites/threadingmqaq.conf
new file mode 100644
index 00000000..f1bb72df
--- /dev/null
+++ b/tests/testsuites/threadingmqaq.conf
@@ -0,0 +1,24 @@
+# Threading test, we run a tcp flood to via an
+# engine instructed to use multiple threads
+# rgerhards, 2009-06-26
+$IncludeConfig diag-common.conf
+
+$ModLoad ../plugins/imtcp/.libs/imtcp
+$MainMsgQueueTimeoutShutdown 10000
+$MaxOpenFiles 2000
+$InputTCPMaxSessions 1100
+$InputTCPServerRun 13514
+
+$MainMsgQueueWorkerThreadMinimumMessages 10
+$MainMsgQueueWorkerThreads 5
+
+$template outfmt,"%msg:F,58:2%\n"
+$template dynfile,"rsyslog.out.log" # trick to use relative path names!
+# write quickly to the output file:
+$OMFileFlushOnTXEnd off
+$OMFileIOBufferSize 256k
+# This time, also run the action queue detached
+$ActionQueueWorkerThreadMinimumMessages 10
+$ActionQueueWorkerThreads 5
+$ActionQueueType LinkedList
+:msg, contains, "msgnum:" ?dynfile;outfmt
diff --git a/tests/threadingmq.sh b/tests/threadingmq.sh
new file mode 100755
index 00000000..5c29ec60
--- /dev/null
+++ b/tests/threadingmq.sh
@@ -0,0 +1,15 @@
+# test many concurrent tcp connections
+# we send 100,000 messages in the hopes that his puts at least a little bit
+# of pressure on the threading subsystem. To really prove it, we would need to
+# push messages for several minutes, but that takes too long during the
+# automatted tests (hint: do this manually after suspect changes). Thankfully,
+# in practice many threading bugs result in an abort rather quickly and these
+# should be covered by this test here.
+# rgerhards, 2009-06-26
+echo TEST: threadingmq.sh - main queue concurrency
+source $srcdir/diag.sh init
+source $srcdir/diag.sh startup threadingmq.conf
+source $srcdir/diag.sh tcpflood 127.0.0.1 13514 2 100000
+source $srcdir/diag.sh shutdown-when-empty # shut down rsyslogd when done processing messages
+source $srcdir/diag.sh seq-check 0 99999
+source $srcdir/diag.sh exit
diff --git a/tests/threadingmqaq.sh b/tests/threadingmqaq.sh
new file mode 100755
index 00000000..009551fd
--- /dev/null
+++ b/tests/threadingmqaq.sh
@@ -0,0 +1,15 @@
+# test many concurrent tcp connections
+# we send 100,000 messages in the hopes that his puts at least a little bit
+# of pressure on the threading subsystem. To really prove it, we would need to
+# push messages for several minutes, but that takes too long during the
+# automatted tests (hint: do this manually after suspect changes). Thankfully,
+# in practice many threading bugs result in an abort rather quickly and these
+# should be covered by this test here.
+# rgerhards, 2009-06-26
+echo TEST: threadingmqaq.sh - main/action queue concurrency
+source $srcdir/diag.sh init
+source $srcdir/diag.sh startup threadingmqaq.conf
+source $srcdir/diag.sh tcpflood 127.0.0.1 13514 2 100000
+source $srcdir/diag.sh shutdown-when-empty # shut down rsyslogd when done processing messages
+source $srcdir/diag.sh seq-check 0 99999
+source $srcdir/diag.sh exit