diff options
-rw-r--r-- | action.c | 4 | ||||
-rw-r--r-- | outchannel.c | 2 | ||||
-rw-r--r-- | runtime/msg.c | 271 | ||||
-rw-r--r-- | runtime/msg.h | 8 | ||||
-rw-r--r-- | runtime/parser.c | 1 | ||||
-rw-r--r-- | runtime/rsyslog.h | 5 | ||||
-rw-r--r-- | runtime/rule.c | 7 | ||||
-rw-r--r-- | runtime/srUtils.h | 2 | ||||
-rw-r--r-- | runtime/stringbuf.c | 45 | ||||
-rw-r--r-- | runtime/stringbuf.h | 45 | ||||
-rw-r--r-- | template.c | 1 | ||||
-rw-r--r-- | tests/Makefile.am | 6 | ||||
-rw-r--r-- | tests/testsuites/threadingmq.conf | 20 | ||||
-rw-r--r-- | tests/testsuites/threadingmqaq.conf | 24 | ||||
-rwxr-xr-x | tests/threadingmq.sh | 15 | ||||
-rwxr-xr-x | tests/threadingmqaq.sh | 15 |
16 files changed, 231 insertions, 240 deletions
@@ -1182,8 +1182,8 @@ doActionCallAction(action_t *pAction, msg_t *pMsg) (pMsg->msgFlags & MARK) == 0 && getMSGLen(pMsg) == getMSGLen(pAction->f_pMsg) && !strcmp(getMSG(pMsg), getMSG(pAction->f_pMsg)) && !strcmp(getHOSTNAME(pMsg), getHOSTNAME(pAction->f_pMsg)) && - !strcmp(getPROCID(pMsg), getPROCID(pAction->f_pMsg)) && - !strcmp(getAPPNAME(pMsg), getAPPNAME(pAction->f_pMsg))) { + !strcmp(getPROCID(pMsg, LOCK_MUTEX), getPROCID(pAction->f_pMsg, LOCK_MUTEX)) && + !strcmp(getAPPNAME(pMsg, LOCK_MUTEX), getAPPNAME(pAction->f_pMsg, LOCK_MUTEX))) { pAction->f_prevcount++; dbgprintf("msg repeated %d times, %ld sec of %d.\n", pAction->f_prevcount, (long) getActNow(pAction) - pAction->f_time, diff --git a/outchannel.c b/outchannel.c index 4f8abb32..74c18218 100644 --- a/outchannel.c +++ b/outchannel.c @@ -106,7 +106,6 @@ static rsRetVal get_Field(uchar **pp, uchar **pField) p = *pp; CHKiRet(cstrConstruct(&pStrB)); - rsCStrSetAllocIncrement(pStrB, 32); /* copy the field */ while(*p && *p != ' ' && *p != ',') { @@ -175,7 +174,6 @@ static inline rsRetVal get_restOfLine(uchar **pp, uchar **pBuf) p = *pp; CHKiRet(cstrConstruct(&pStrB)); - rsCStrSetAllocIncrement(pStrB, 32); /* copy the field */ while(*p) { diff --git a/runtime/msg.c b/runtime/msg.c index 75933d68..3f69dbcb 100644 --- a/runtime/msg.c +++ b/runtime/msg.c @@ -271,8 +271,13 @@ static char *syslog_number_names[24] = { "0", "1", "2", "3", "4", "5", "6", "7", "15", "16", "17", "18", "19", "20", "21", "22", "23" }; /* some forward declarations */ -static int getAPPNAMELen(msg_t *pM); -static int getProtocolVersion(msg_t *pM); +static int getAPPNAMELen(msg_t *pM, bool bLockMutex); + +static inline int getProtocolVersion(msg_t *pM) +{ + return(pM->iProtocolVersion); +} + /* The following functions will support advanced output module * multithreading, once this is implemented. Currently, we @@ -334,33 +339,9 @@ static void MsgLockingDummy(msg_t __attribute__((unused)) *pMsg) */ static void MsgPrepareEnqueueLockingCase(msg_t *pThis) { - int iErr; - pthread_mutexattr_t mutAttr; BEGINfunc assert(pThis != NULL); - iErr = pthread_mutexattr_init(&mutAttr); - if(iErr != 0) { - dbgprintf("error initializing mutex attribute in %s:%d, trying to continue\n", - __FILE__, __LINE__); - } - iErr = pthread_mutexattr_settype(&mutAttr, PTHREAD_MUTEX_RECURSIVE); - if(iErr != 0) { - dbgprintf("ERROR setting mutex attribute to recursive in %s:%d, trying to continue " - "but we will probably either abort or hang soon\n", - __FILE__, __LINE__); - /* TODO: it makes very little sense to continue here, - * but it requires an iRet interface to gracefully shut - * down. We should do that over time. -- rgerhards, 2008-07-14 - */ - } - pthread_mutex_init(&pThis->mut, &mutAttr); - - /* we do no longer need the attribute. According to the - * POSIX spec, we can destroy it without affecting the - * initialized mutex (that used the attribute). - * rgerhards, 2008-07-14 - */ - pthread_mutexattr_destroy(&mutAttr); + pthread_mutex_init(&pThis->mut, NULL); pThis->bDoLock = 1; ENDfunc } @@ -809,6 +790,7 @@ msg_t *MsgAddRef(msg_t *pM) * can obtain a PROCID. Take in mind that not every legacy syslog message * actually has a PROCID. * rgerhards, 2005-11-24 + * THIS MUST be called with the message lock locked. */ static rsRetVal aquirePROCIDFromTAG(msg_t *pM) { @@ -837,7 +819,6 @@ static rsRetVal aquirePROCIDFromTAG(msg_t *pM) /* now obtain the PROCID string... */ CHKiRet(cstrConstruct(&pM->pCSPROCID)); - rsCStrSetAllocIncrement(pM->pCSPROCID, 16); while((i < pM->iLenTAG) && (pszTag[i] != ']')) { CHKiRet(cstrAppendChar(pM->pCSPROCID, pszTag[i])); ++i; @@ -874,7 +855,8 @@ finalize_it: * The program name is not parsed by default, because it is infrequently-used. * If it is needed, this function should be called first. It checks if it is * already set and extracts it, if not. - * A message object must be provided, else a crash will occur. + * + * IMPORTANT: A locked message object must be provided, else a crash will occur. * rgerhards, 2005-10-19 */ static rsRetVal aquireProgramName(msg_t *pM) @@ -885,12 +867,9 @@ static rsRetVal aquireProgramName(msg_t *pM) assert(pM != NULL); if(pM->pCSProgName == NULL) { - /* ok, we do not yet have it. So let's parse the TAG - * to obtain it. - */ + /* ok, we do not yet have it. So let's parse the TAG to obtain it. */ pszTag = (uchar*) ((pM->iLenTAG < CONF_TAG_BUFSIZE) ? pM->TAG.szBuf : pM->TAG.pszTAG); CHKiRet(cstrConstruct(&pM->pCSProgName)); - rsCStrSetAllocIncrement(pM->pCSProgName, 33); for( i = 0 ; (i < pM->iLenTAG) && isprint((int) pszTag[i]) && (pszTag[i] != '\0') && (pszTag[i] != ':') @@ -917,12 +896,6 @@ void setProtocolVersion(msg_t *pM, int iNewVersion) pM->iProtocolVersion = iNewVersion; } -static int getProtocolVersion(msg_t *pM) -{ - assert(pM != NULL); - return(pM->iProtocolVersion); -} - /* note: string is taken from constant pool, do NOT free */ char *getProtocolVersionString(msg_t *pM) { @@ -1233,7 +1206,10 @@ MsgSetAfterPRIOffs(msg_t *pMsg, short offs) /* rgerhards 2004-11-24: set APP-NAME in msg object - * TODO: revisit msg locking code! + * This is not locked, because it either is called during message + * construction (where we need no locking) or later as part of a function + * which already obtained the lock. So in general, this function here must + * only be called when it it safe to do so without it aquiring a lock. */ rsRetVal MsgSetAPPNAME(msg_t *pMsg, char* pszAPPNAME) { @@ -1242,7 +1218,6 @@ rsRetVal MsgSetAPPNAME(msg_t *pMsg, char* pszAPPNAME) if(pMsg->pCSAPPNAME == NULL) { /* we need to obtain the object first */ CHKiRet(rsCStrConstruct(&pMsg->pCSAPPNAME)); - rsCStrSetAllocIncrement(pMsg->pCSAPPNAME, 128); } /* if we reach this point, we have the object */ iRet = rsCStrSetSzStr(pMsg->pCSAPPNAME, (uchar*) pszAPPNAME); @@ -1252,20 +1227,6 @@ finalize_it: } -static void tryEmulateAPPNAME(msg_t *pM); /* forward reference */ -/* rgerhards, 2005-11-24 - */ -char *getAPPNAME(msg_t *pM) -{ - assert(pM != NULL); - MsgLock(pM); - if(pM->pCSAPPNAME == NULL) - tryEmulateAPPNAME(pM); - MsgUnlock(pM); - return (pM->pCSAPPNAME == NULL) ? "" : (char*) rsCStrGetSzStrNoNULL(pM->pCSAPPNAME); -} - - /* rgerhards 2004-11-24: set PROCID in msg object */ rsRetVal MsgSetPROCID(msg_t *pMsg, char* pszPROCID) @@ -1284,33 +1245,44 @@ finalize_it: RETiRet; } + +/* check if we have a procid, and, if not, try to aquire/emulate it. + * This must be called WITHOUT the message lock being held. + * rgerhards, 2009-06-26 + */ +static inline void preparePROCID(msg_t *pM, bool bLockMutex) +{ + if(pM->pCSPROCID == NULL) { + if(bLockMutex == LOCK_MUTEX) + MsgLock(pM); + /* re-query, things may have changed in the mean time... */ + if(pM->pCSPROCID == NULL) + aquirePROCIDFromTAG(pM); + if(bLockMutex == LOCK_MUTEX) + MsgUnlock(pM); + } +} + + +#if 0 /* rgerhards, 2005-11-24 */ -static inline int getPROCIDLen(msg_t *pM) +static inline int getPROCIDLen(msg_t *pM, bool bLockMutex) { assert(pM != NULL); - MsgLock(pM); - if(pM->pCSPROCID == NULL) - aquirePROCIDFromTAG(pM); - MsgUnlock(pM); + preparePROCID(pM, bLockMutex); return (pM->pCSPROCID == NULL) ? 1 : rsCStrLen(pM->pCSPROCID); } +#endif /* rgerhards, 2005-11-24 */ -char *getPROCID(msg_t *pM) +char *getPROCID(msg_t *pM, bool bLockMutex) { - char* pszRet; - ISOBJ_TYPE_assert(pM, msg); - MsgLock(pM); - if(pM->pCSPROCID == NULL) - aquirePROCIDFromTAG(pM); - pszRet = (pM->pCSPROCID == NULL) ? "-" : (char*) cstrGetSzStrNoNULL(pM->pCSPROCID); - //pszRet = (pM->pCSPROCID == NULL) ? "-" : (char*) rsCStrGetSzStrNoNULL(pM->pCSPROCID); - MsgUnlock(pM); - return pszRet; + preparePROCID(pM, bLockMutex); + return (pM->pCSPROCID == NULL) ? "-" : (char*) cstrGetSzStrNoNULL(pM->pCSPROCID); } @@ -1323,7 +1295,6 @@ rsRetVal MsgSetMSGID(msg_t *pMsg, char* pszMSGID) if(pMsg->pCSMSGID == NULL) { /* we need to obtain the object first */ CHKiRet(rsCStrConstruct(&pMsg->pCSMSGID)); - rsCStrSetAllocIncrement(pMsg->pCSMSGID, 128); } /* if we reach this point, we have the object */ iRet = rsCStrSetSzStr(pMsg->pCSMSGID, (uchar*) pszMSGID); @@ -1386,26 +1357,31 @@ void MsgSetTAG(msg_t *pMsg, uchar* pszBuf, size_t lenBuf) * if there is a TAG and, if not, if it can emulate it. * rgerhards, 2005-11-24 */ -static inline void tryEmulateTAG(msg_t *pM) +static inline void tryEmulateTAG(msg_t *pM, bool bLockMutex) { size_t lenTAG; uchar bufTAG[CONF_TAG_MAXSIZE]; assert(pM != NULL); + if(bLockMutex == LOCK_MUTEX) + MsgLock(pM); if(pM->iLenTAG > 0) return; /* done, no need to emulate */ if(getProtocolVersion(pM) == 1) { - if(!strcmp(getPROCID(pM), "-")) { + if(!strcmp(getPROCID(pM, MUTEX_ALREADY_LOCKED), "-")) { /* no process ID, use APP-NAME only */ - MsgSetTAG(pM, (uchar*) getAPPNAME(pM), getAPPNAMELen(pM)); + MsgSetTAG(pM, (uchar*) getAPPNAME(pM, MUTEX_ALREADY_LOCKED), getAPPNAMELen(pM, MUTEX_ALREADY_LOCKED)); } else { /* now we can try to emulate */ - lenTAG = snprintf((char*)bufTAG, CONF_TAG_MAXSIZE, "%s[%s]", getAPPNAME(pM), getPROCID(pM)); + lenTAG = snprintf((char*)bufTAG, CONF_TAG_MAXSIZE, "%s[%s]", + getAPPNAME(pM, MUTEX_ALREADY_LOCKED), getPROCID(pM, MUTEX_ALREADY_LOCKED)); bufTAG[32] = '\0'; /* just to make sure... */ MsgSetTAG(pM, bufTAG, lenTAG); } } + if(bLockMutex == LOCK_MUTEX) + MsgUnlock(pM); } @@ -1416,13 +1392,12 @@ static inline char *getTAG(msg_t *pM) if(pM == NULL) ret = ""; else { - MsgLock(pM); - tryEmulateTAG(pM); + if(pM->iLenTAG == 0) + tryEmulateTAG(pM, LOCK_MUTEX); if(pM->iLenTAG == 0) ret = ""; else ret = (char*) ((pM->iLenTAG < CONF_TAG_BUFSIZE) ? pM->TAG.szBuf : pM->TAG.pszTAG); - MsgUnlock(pM); } return(ret); } @@ -1496,7 +1471,6 @@ rsRetVal MsgSetStructuredData(msg_t *pMsg, char* pszStrucData) if(pMsg->pCSStrucData == NULL) { /* we need to obtain the object first */ CHKiRet(rsCStrConstruct(&pMsg->pCSStrucData)); - rsCStrSetAllocIncrement(pMsg->pCSStrucData, 128); } /* if we reach this point, we have the object */ iRet = rsCStrSetSzStr(pMsg->pCSStrucData, (uchar*) pszStrucData); @@ -1525,101 +1499,50 @@ static inline char *getStructuredData(msg_t *pM) } - -/* get the length of the "programname" sz string - * rgerhards, 2005-10-19 +/* check if we have a ProgramName, and, if not, try to aquire/emulate it. + * rgerhards, 2009-06-26 */ -int getProgramNameLen(msg_t *pM) +static inline void prepareProgramName(msg_t *pM, bool bLockMutex) { - int iRet; + if(pM->pCSProgName == NULL) { + if(bLockMutex == LOCK_MUTEX) + MsgLock(pM); - assert(pM != NULL); - MsgLock(pM); - if((iRet = aquireProgramName(pM)) != RS_RET_OK) { - dbgprintf("error %d returned by aquireProgramName() in getProgramNameLen()\n", iRet); - MsgUnlock(pM); - return 0; /* best we can do (consistent wiht what getProgramName() returns) */ - } - MsgUnlock(pM); + /* re-query as things might have changed during locking */ + if(pM->pCSProgName == NULL) + aquireProgramName(pM); - return (pM->pCSProgName == NULL) ? 0 : rsCStrLen(pM->pCSProgName); + if(bLockMutex == LOCK_MUTEX) + MsgUnlock(pM); + } } -/* get the "programname" as sz string +/* get the length of the "programname" sz string * rgerhards, 2005-10-19 */ -char *getProgramName(msg_t *pM) /* this is the non-locking version for internal use */ +int getProgramNameLen(msg_t *pM, bool bLockMutex) { - int iRet; - char *pszRet; - assert(pM != NULL); - MsgLock(pM); - if((iRet = aquireProgramName(pM)) != RS_RET_OK) { - dbgprintf("error %d returned by aquireProgramName() in getProgramName()\n", iRet); - pszRet = ""; /* best we can do */ - } else { - pszRet = (pM->pCSProgName == NULL) ? "" : (char*) rsCStrGetSzStrNoNULL(pM->pCSProgName); - } - - MsgUnlock(pM); - return pszRet; + prepareProgramName(pM, bLockMutex); + return (pM->pCSProgName == NULL) ? 0 : rsCStrLen(pM->pCSProgName); } -/* The code below was an approach without PTHREAD_MUTEX_RECURSIVE - * However, it turned out to be quite complex. So far, we use recursive - * locking, which is OK from a performance point of view, especially as - * we do not anticipate that multithreading msg objects is used often. - * However, we may re-think about using non-recursive locking and I leave this - * code in here to conserve the idea. -- rgerhards, 2008-01-05 - */ -#if 0 -static char *getProgramNameNoLock(msg_t *pM) /* this is the non-locking version for internal use */ -{ - int iRet; - assert(pM != NULL); - if((iRet = aquireProgramName(pM)) != RS_RET_OK) { - dbgprintf("error %d returned by aquireProgramName() in getProgramName()\n", iRet); - return ""; /* best we can do */ - } - return (pM->pCSProgName == NULL) ? "" : (char*) rsCStrGetSzStrNoNULL(pM->pCSProgName); -} -char *getProgramName(msg_t *pM) /* this is the external callable version */ +/* get the "programname" as sz string + * rgerhards, 2005-10-19 + */ +char *getProgramName(msg_t *pM, bool bLockMutex) { - char *pszRet; - - MsgLock(pM); - pszRet = getProgramNameNoLock(pM); - MsgUnlock(pM); - return pszRet; + prepareProgramName(pM, bLockMutex); + return (pM->pCSProgName == NULL) ? "" : (char*) rsCStrGetSzStrNoNULL(pM->pCSProgName); } -/* an alternative approach has been: */ -/* The macro below is used to generate external function definitions - * for such functions that may also be called internally (and thus have - * both a locking and non-locking implementation. Over time, we could - * reconsider how we handle that. -- rgerhards, 2008-01-05 - */ -#define EXT_LOCKED_FUNC(fName, ret) \ -ret fName(msg_t *pM) \ -{ \ - ret valRet; \ - MsgLock(pM); \ - valRet = fName##NoLock(pM); \ - MsgUnlock(pM); \ - return(valRet); \ -} -EXT_LOCKED_FUNC(getProgramName, char*) -/* in this approach, the external function is provided by the macro and - * needs not to be writen. - */ -#endif /* #if 0 -- saved code */ /* This function tries to emulate APPNAME if it is not present. Its * main use is when we have received a log record via legacy syslog and * now would like to send out the same one via syslog-protocol. + * MUST be called with the Msg Lock locked! */ static void tryEmulateAPPNAME(msg_t *pM) { @@ -1629,18 +1552,46 @@ static void tryEmulateAPPNAME(msg_t *pM) if(getProtocolVersion(pM) == 0) { /* only then it makes sense to emulate */ - MsgSetAPPNAME(pM, getProgramName(pM)); + MsgSetAPPNAME(pM, getProgramName(pM, MUTEX_ALREADY_LOCKED)); + } +} + + + +/* check if we have a APPNAME, and, if not, try to aquire/emulate it. + * This must be called WITHOUT the message lock being held. + * rgerhards, 2009-06-26 + */ +static inline void prepareAPPNAME(msg_t *pM, bool bLockMutex) +{ + if(pM->pCSAPPNAME == NULL) { + if(bLockMutex == LOCK_MUTEX) + MsgLock(pM); + + /* re-query as things might have changed during locking */ + if(pM->pCSAPPNAME == NULL) + tryEmulateAPPNAME(pM); + + if(bLockMutex == LOCK_MUTEX) + MsgUnlock(pM); } } +/* rgerhards, 2005-11-24 + */ +char *getAPPNAME(msg_t *pM, bool bLockMutex) +{ + assert(pM != NULL); + prepareAPPNAME(pM, bLockMutex); + return (pM->pCSAPPNAME == NULL) ? "" : (char*) rsCStrGetSzStrNoNULL(pM->pCSAPPNAME); +} /* rgerhards, 2005-11-24 */ -static int getAPPNAMELen(msg_t *pM) +static int getAPPNAMELen(msg_t *pM, bool bLockMutex) { assert(pM != NULL); - if(pM->pCSAPPNAME == NULL) - tryEmulateAPPNAME(pM); + prepareAPPNAME(pM, bLockMutex); return (pM->pCSAPPNAME == NULL) ? 0 : rsCStrLen(pM->pCSAPPNAME); } @@ -1991,15 +1942,15 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe, } else if(!strcmp((char*) pName, "timegenerated")) { pRes = getTimeGenerated(pMsg, pTpe->data.field.eDateFormat); } else if(!strcmp((char*) pName, "programname")) { - pRes = getProgramName(pMsg); + pRes = getProgramName(pMsg, LOCK_MUTEX); } else if(!strcmp((char*) pName, "protocol-version")) { pRes = getProtocolVersionString(pMsg); } else if(!strcmp((char*) pName, "structured-data")) { pRes = getStructuredData(pMsg); } else if(!strcmp((char*) pName, "app-name")) { - pRes = getAPPNAME(pMsg); + pRes = getAPPNAME(pMsg, LOCK_MUTEX); } else if(!strcmp((char*) pName, "procid")) { - pRes = getPROCID(pMsg); + pRes = getPROCID(pMsg, LOCK_MUTEX); } else if(!strcmp((char*) pName, "msgid")) { pRes = getMSGID(pMsg); /* here start system properties (those, that do not relate to the message itself */ diff --git a/runtime/msg.h b/runtime/msg.h index 4bfc1e3f..56ba5b80 100644 --- a/runtime/msg.h +++ b/runtime/msg.h @@ -168,14 +168,14 @@ rsRetVal MsgEnableThreadSafety(void); /* TODO: remove these five (so far used in action.c) */ char *getMSG(msg_t *pM); char *getHOSTNAME(msg_t *pM); -char *getPROCID(msg_t *pM); -char *getAPPNAME(msg_t *pM); +char *getPROCID(msg_t *pM, bool bLockMutex); +char *getAPPNAME(msg_t *pM, bool bLockMutex); int getMSGLen(msg_t *pM); char *getHOSTNAME(msg_t *pM); int getHOSTNAMELen(msg_t *pM); -char *getProgramName(msg_t *pM); -int getProgramNameLen(msg_t *pM); +char *getProgramName(msg_t *pM, bool bLockMutex); +int getProgramNameLen(msg_t *pM, bool bLockMutex); uchar *getRcvFrom(msg_t *pM); /* The MsgPrepareEnqueue() function is a macro for performance reasons. diff --git a/runtime/parser.c b/runtime/parser.c index 0a27c982..d4ca7673 100644 --- a/runtime/parser.c +++ b/runtime/parser.c @@ -307,7 +307,6 @@ rsRetVal parseMsg(msg_t *pMsg) /* finalize message object */ pMsg->msgFlags &= ~NEEDS_PARSING; /* this message is now parsed */ - MsgPrepareEnqueue(pMsg); /* "historical" name - preparese for multi-threading */ finalize_it: RETiRet; diff --git a/runtime/rsyslog.h b/runtime/rsyslog.h index a55b867f..09c3a066 100644 --- a/runtime/rsyslog.h +++ b/runtime/rsyslog.h @@ -430,9 +430,8 @@ typedef enum rsObjectID rsObjID; #endif /* some constants */ -// TODO: do we really need them - if not, delete -- rgerhards, 2009-06-10 -#define IGNORE_ERROR_CODES 1 -#define ABORT_ON_ERROR 0 +#define MUTEX_ALREADY_LOCKED 0 +#define LOCK_MUTEX 1 /* The following prototype is convenient, even though it may not be the 100% correct place.. -- rgerhards 2008-01-07 */ void dbgprintf(char *, ...) __attribute__((format(printf, 1, 2))); diff --git a/runtime/rule.c b/runtime/rule.c index 12494543..221368e6 100644 --- a/runtime/rule.c +++ b/runtime/rule.c @@ -147,13 +147,14 @@ shouldProcessThisMessage(rule_t *pRule, msg_t *pMsg, int *bProcessMsg) offset = 1; } } - if(!rsCStrOffsetSzStrCmp(pRule->pCSProgNameComp, offset, (uchar*) getProgramName(pMsg), getProgramNameLen(pMsg))) + if(!rsCStrOffsetSzStrCmp(pRule->pCSProgNameComp, offset, + (uchar*) getProgramName(pMsg, LOCK_MUTEX), getProgramNameLen(pMsg, LOCK_MUTEX))) bEqv = 1; if((!bEqv && !bInv) || (bEqv && bInv)) { /* not equal or inverted selection, so we are already done... */ - dbgprintf("programname filter '%s' does not match '%s'\n", - rsCStrGetSzStrNoNULL(pRule->pCSProgNameComp), getProgramName(pMsg)); + DBGPRINTF("programname filter '%s' does not match '%s'\n", + rsCStrGetSzStrNoNULL(pRule->pCSProgNameComp), getProgramName(pMsg, LOCK_MUTEX)); FINALIZE; } } diff --git a/runtime/srUtils.h b/runtime/srUtils.h index 9d6360e7..6d5a784b 100644 --- a/runtime/srUtils.h +++ b/runtime/srUtils.h @@ -109,8 +109,6 @@ rsRetVal getFileSize(uchar *pszName, off_t *pSize); #define mutex_cancelsafe_unlock(mut) pthread_cleanup_pop(1) /* some useful constants */ -#define MUTEX_ALREADY_LOCKED 0 -#define LOCK_MUTEX 1 #define DEFVARS_mutexProtection\ int iCancelStateSave; \ int bLockedOpIsLocked=0 diff --git a/runtime/stringbuf.c b/runtime/stringbuf.c index d3ddf33a..f3824831 100644 --- a/runtime/stringbuf.c +++ b/runtime/stringbuf.c @@ -71,7 +71,6 @@ rsRetVal cstrConstruct(cstr_t **ppThis) pThis->pszBuf = NULL; pThis->iBufSize = 0; pThis->iStrLen = 0; - pThis->iAllocIncrement = RS_STRINGBUF_ALLOC_INCREMENT; *ppThis = pThis; finalize_it: @@ -154,7 +153,7 @@ void rsCStrDestruct(cstr_t **ppThis) * rgerhards, 2008-01-07 * changed to utilized realloc() -- rgerhards, 2009-06-16 */ -static rsRetVal +rsRetVal rsCStrExtendBuf(cstr_t *pThis, size_t iMinNeeded) { uchar *pNewBuf; @@ -162,16 +161,16 @@ rsCStrExtendBuf(cstr_t *pThis, size_t iMinNeeded) DEFiRet; /* first compute the new size needed */ - if(iMinNeeded > pThis->iAllocIncrement) { - /* we allocate "n" iAllocIncrements. Usually, that should + if(iMinNeeded > RS_STRINGBUF_ALLOC_INCREMENT) { + /* we allocate "n" ALLOC_INCREMENTs. Usually, that should * leave some room after the absolutely needed one. It also * reduces memory fragmentation. Note that all of this are * integer operations (very important to understand what is * going on)! Parenthesis are for better readibility. */ - iNewSize = ((iMinNeeded / pThis->iAllocIncrement) + 1) * pThis->iAllocIncrement; + iNewSize = (iMinNeeded / RS_STRINGBUF_ALLOC_INCREMENT + 1) * RS_STRINGBUF_ALLOC_INCREMENT; } else { - iNewSize = pThis->iBufSize + pThis->iAllocIncrement; + iNewSize = pThis->iBufSize + RS_STRINGBUF_ALLOC_INCREMENT; } iNewSize += pThis->iBufSize; /* add current size */ @@ -246,33 +245,10 @@ finalize_it: } -/* Append a character to the current string object. This may only be done until - * cstrFinalize() is called. - * rgerhards, 2009-06-16 - */ -rsRetVal cstrAppendChar(cstr_t *pThis, uchar c) -{ - DEFiRet; - - rsCHECKVALIDOBJECT(pThis, OIDrsCStr); - - if(pThis->iStrLen >= pThis->iBufSize) { - CHKiRet(rsCStrExtendBuf(pThis, 1)); /* need more memory! */ - } - - /* ok, when we reach this, we have sufficient memory */ - *(pThis->pBuf + pThis->iStrLen++) = c; - -finalize_it: - RETiRet; -} - - /* Sets the string object to the classigal sz-string provided. * Any previously stored vlaue is discarded. If a NULL pointer * the the new value (pszNew) is provided, an empty string is - * created (this is NOT an error!). Property iAllocIncrement is - * not modified by this function. + * created (this is NOT an error!). * rgerhards, 2005-10-18 */ rsRetVal rsCStrSetSzStr(cstr_t *pThis, uchar *pszNew) @@ -290,7 +266,6 @@ rsRetVal rsCStrSetSzStr(cstr_t *pThis, uchar *pszNew) pThis->iStrLen = strlen((char*)pszNew); pThis->iBufSize = pThis->iStrLen; pThis->pszBuf = NULL; - /* iAllocIncrement is NOT modified! */ /* now save the new value */ if((pThis->pBuf = (uchar*) malloc(sizeof(uchar) * pThis->iStrLen)) == NULL) { @@ -472,14 +447,6 @@ finalize_it: } -void rsCStrSetAllocIncrement(cstr_t *pThis, int iNewIncrement) -{ - rsCHECKVALIDOBJECT(pThis, OIDrsCStr); - assert(iNewIncrement > 0); - pThis->iAllocIncrement = iNewIncrement; -} - - /* return the length of the current string * 2005-09-09 rgerhards * Please note: this is only a function in a debug build. diff --git a/runtime/stringbuf.h b/runtime/stringbuf.h index 9d2e7865..4fbd9a9b 100644 --- a/runtime/stringbuf.h +++ b/runtime/stringbuf.h @@ -35,6 +35,7 @@ #ifndef _STRINGBUF_H_INCLUDED__ #define _STRINGBUF_H_INCLUDED__ 1 + /** * The dynamic string buffer object. */ @@ -47,7 +48,6 @@ typedef struct cstr_s uchar *pszBuf; /**< pointer to the sz version of the string (after it has been created )*/ size_t iBufSize; /**< current maximum size of the string buffer */ size_t iStrLen; /**< length of the string in characters. */ - unsigned short iAllocIncrement; /**< the amount of bytes the string should be expanded if it needs to */ bool bIsForeignBuf; /**< is pBuf a buffer provided by someone else? */ } cstr_t; @@ -66,13 +66,28 @@ rsRetVal rsCStrConstructFromCStr(cstr_t **ppThis, cstr_t *pFrom); void rsCStrDestruct(cstr_t **ppThis); #define cstrDestruct(x) rsCStrDestruct((x)) -/** - * Append a character to an existing string. If necessary, the - * method expands the string buffer. - * - * \param c Character to append to string. + +/* Append a character to the current string object. This may only be done until + * cstrFinalize() is called. + * rgerhards, 2009-06-16 */ -rsRetVal cstrAppendChar(cstr_t *pThis, uchar c); +rsRetVal rsCStrExtendBuf(cstr_t *pThis, size_t iMinNeeded); /* our helper, NOT a public interface! */ +static inline rsRetVal cstrAppendChar(cstr_t *pThis, uchar c) +{ + rsRetVal iRet; + + rsCHECKVALIDOBJECT(pThis, OIDrsCStr); + + if(pThis->iStrLen >= pThis->iBufSize) { + CHKiRet(rsCStrExtendBuf(pThis, 1)); /* need more memory! */ + } + + /* ok, when we reach this, we have sufficient memory */ + *(pThis->pBuf + pThis->iStrLen++) = c; + +finalize_it: + return iRet; +} /** * Truncate "n" number of characters from the end of the @@ -101,22 +116,6 @@ rsRetVal rsCStrAppendStr(cstr_t *pThis, uchar* psz); */ rsRetVal rsCStrAppendStrWithLen(cstr_t *pThis, uchar* psz, size_t iStrLen); -/** - * Set a new allocation incremet. This will influence - * the allocation the next time the string will be expanded. - * It can be set and changed at any time. If done immediately - * after custructing the StrB object, this will also be - * the inital allocation. - * - * \param iNewIncrement The new increment size - * - * \note It is possible to use a very low increment, e.g. 1 byte. - * This can generate a considerable overhead. We highly - * advise not to use an increment below 32 bytes, except - * if you are very well aware why you are doing it ;) - */ -void rsCStrSetAllocIncrement(cstr_t *pThis, int iNewIncrement); -#define rsCStrGetAllocIncrement(pThis) ((pThis)->iAllocIncrement) /** * Append an integer to the string. No special formatting is @@ -373,7 +373,6 @@ static int do_Constant(unsigned char **pp, struct template *pTpl) if(cstrConstruct(&pStrB) != RS_RET_OK) return 1; - rsCStrSetAllocIncrement(pStrB, 32); /* process the message and expand escapes * (additional escapes can be added here if needed) */ diff --git a/tests/Makefile.am b/tests/Makefile.am index f6bb0587..cbbca074 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -18,6 +18,8 @@ TESTS += omod-if-array.sh \ parsertest.sh \ timestamp.sh \ inputname.sh \ + threadingmq.sh \ + threadingmqaq.sh \ fieldtest.sh endif @@ -116,6 +118,10 @@ EXTRA_DIST= 1.rstest 2.rstest 3.rstest err1.rstest \ queue-persist.sh \ queue-persist-drvr.sh \ testsuites/queue-persist.conf \ + threadingmq.sh \ + testsuites/threadingmq.conf \ + threadingmqaq.sh \ + testsuites/threadingmqaq.conf \ DiagTalker.java \ cfg.sh diff --git a/tests/testsuites/threadingmq.conf b/tests/testsuites/threadingmq.conf new file mode 100644 index 00000000..aa5197bb --- /dev/null +++ b/tests/testsuites/threadingmq.conf @@ -0,0 +1,20 @@ +# Threading test, we run a tcp flood to via an +# engine instructed to use multiple threads +# rgerhards, 2009-06-26 +$IncludeConfig diag-common.conf + +$ModLoad ../plugins/imtcp/.libs/imtcp +$MainMsgQueueTimeoutShutdown 10000 +$MaxOpenFiles 2000 +$InputTCPMaxSessions 1100 +$InputTCPServerRun 13514 + +$MainMsgQueueWorkerThreadMinimumMessages 10 +$MainMsgQueueWorkerThreads 5 + +$template outfmt,"%msg:F,58:2%\n" +$template dynfile,"rsyslog.out.log" # trick to use relative path names! +# write quickly to the output file: +$OMFileFlushOnTXEnd off +$OMFileIOBufferSize 256k +:msg, contains, "msgnum:" ?dynfile;outfmt diff --git a/tests/testsuites/threadingmqaq.conf b/tests/testsuites/threadingmqaq.conf new file mode 100644 index 00000000..f1bb72df --- /dev/null +++ b/tests/testsuites/threadingmqaq.conf @@ -0,0 +1,24 @@ +# Threading test, we run a tcp flood to via an +# engine instructed to use multiple threads +# rgerhards, 2009-06-26 +$IncludeConfig diag-common.conf + +$ModLoad ../plugins/imtcp/.libs/imtcp +$MainMsgQueueTimeoutShutdown 10000 +$MaxOpenFiles 2000 +$InputTCPMaxSessions 1100 +$InputTCPServerRun 13514 + +$MainMsgQueueWorkerThreadMinimumMessages 10 +$MainMsgQueueWorkerThreads 5 + +$template outfmt,"%msg:F,58:2%\n" +$template dynfile,"rsyslog.out.log" # trick to use relative path names! +# write quickly to the output file: +$OMFileFlushOnTXEnd off +$OMFileIOBufferSize 256k +# This time, also run the action queue detached +$ActionQueueWorkerThreadMinimumMessages 10 +$ActionQueueWorkerThreads 5 +$ActionQueueType LinkedList +:msg, contains, "msgnum:" ?dynfile;outfmt diff --git a/tests/threadingmq.sh b/tests/threadingmq.sh new file mode 100755 index 00000000..5c29ec60 --- /dev/null +++ b/tests/threadingmq.sh @@ -0,0 +1,15 @@ +# test many concurrent tcp connections +# we send 100,000 messages in the hopes that his puts at least a little bit +# of pressure on the threading subsystem. To really prove it, we would need to +# push messages for several minutes, but that takes too long during the +# automatted tests (hint: do this manually after suspect changes). Thankfully, +# in practice many threading bugs result in an abort rather quickly and these +# should be covered by this test here. +# rgerhards, 2009-06-26 +echo TEST: threadingmq.sh - main queue concurrency +source $srcdir/diag.sh init +source $srcdir/diag.sh startup threadingmq.conf +source $srcdir/diag.sh tcpflood 127.0.0.1 13514 2 100000 +source $srcdir/diag.sh shutdown-when-empty # shut down rsyslogd when done processing messages +source $srcdir/diag.sh seq-check 0 99999 +source $srcdir/diag.sh exit diff --git a/tests/threadingmqaq.sh b/tests/threadingmqaq.sh new file mode 100755 index 00000000..009551fd --- /dev/null +++ b/tests/threadingmqaq.sh @@ -0,0 +1,15 @@ +# test many concurrent tcp connections +# we send 100,000 messages in the hopes that his puts at least a little bit +# of pressure on the threading subsystem. To really prove it, we would need to +# push messages for several minutes, but that takes too long during the +# automatted tests (hint: do this manually after suspect changes). Thankfully, +# in practice many threading bugs result in an abort rather quickly and these +# should be covered by this test here. +# rgerhards, 2009-06-26 +echo TEST: threadingmqaq.sh - main/action queue concurrency +source $srcdir/diag.sh init +source $srcdir/diag.sh startup threadingmqaq.conf +source $srcdir/diag.sh tcpflood 127.0.0.1 13514 2 100000 +source $srcdir/diag.sh shutdown-when-empty # shut down rsyslogd when done processing messages +source $srcdir/diag.sh seq-check 0 99999 +source $srcdir/diag.sh exit |