diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2008-01-05 13:05:17 +0000 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2008-01-05 13:05:17 +0000 |
commit | e055d4921b9a53e9dfedc56bbff3a9b12400d34d (patch) | |
tree | 7a0969cefb4fdd6023faec7e2614d033fdb78480 /msg.c | |
parent | 800ac1889b99057f1e6670d4ce5941bda33b6773 (diff) | |
download | rsyslog-e055d4921b9a53e9dfedc56bbff3a9b12400d34d.tar.gz rsyslog-e055d4921b9a53e9dfedc56bbff3a9b12400d34d.tar.xz rsyslog-e055d4921b9a53e9dfedc56bbff3a9b12400d34d.zip |
added capability for concurrent access to the msg class. Can be dynamically
activated. If active, locking is employed.
Diffstat (limited to 'msg.c')
-rw-r--r-- | msg.c | 355 |
1 files changed, 256 insertions, 99 deletions
@@ -87,6 +87,9 @@ static syslogCODE rs_facilitynames[] = { NULL, -1 } }; +/* some forward declarations */ +static int getAPPNAMELen(msg_t *pM); + /* The following functions will support advanced output module * multithreading, once this is implemented. Currently, we * include them as hooks only. The idea is that we need to guard @@ -99,9 +102,98 @@ static syslogCODE rs_facilitynames[] = * for "set" methods, as these are called during input. Only "get" * functions that modify important structures have them. * rgerhards, 2007-07-20 + * We now support locked and non-locked operations, depending on + * the configuration of rsyslog. To support this, we use function + * pointers. Initially, we start in non-locked mode. There, all + * locking operations call into dummy functions. When locking is + * enabled, the function pointers are changed to functions doing + * actual work. We also introduced another MsgPrepareEnqueue() function + * which initializes the locking structures, if needed. This is + * necessary because internal messages during config file startup + * processing are always created in non-locking mode. So we can + * not initialize locking structures during constructions. We now + * postpone this until when the message is fully constructed and + * enqueued. Then we know the status of locking. This has a nice + * side effect, and that is that during the initial creation of + * the Msg object no locking needs to be done, which results in better + * performance. -- rgerhards, 2008-01-05 + */ +static void (*funcLock)(msg_t *pMsg); +static void (*funcUnlock)(msg_t *pMsg); +static void (*funcDeleteMutex)(msg_t *pMsg); +void (*funcMsgPrepareEnqueue)(msg_t *pMsg); +#if 1 /* This is a debug aid */ +#define MsgLock(pMsg) funcLock(pMsg) +#define MsgUnlock(pMsg) funcUnlock(pMsg) +#else +#define MsgLock(pMsg) {dbgprintf("line %d\n - ", __LINE__); funcLock(pMsg);; } +#define MsgUnlock(pMsg) {dbgprintf("line %d - ", __LINE__); funcUnlock(pMsg); } +#endif + +/* the next function is a dummy to be used by the looking functions + * when the class is not yet running in an environment where locking + * is necessary. Please note that the need to lock can (and will) change + * during a single run. Typically, this is depending on the operation mode + * of the message queues (which is operator-configurable). -- rgerhards, 2008-01-05 + */ +static void MsgLockingDummy(msg_t __attribute__((unused)) *pMsg) +{ + /* empty be design */ +} + + +/* The following function prepares a message for enqueue into the queue. This is + * where a message may be accessed by multiple threads. This implementation here + * is the version for multiple concurrent acces. It initializes the locking + * structures. + */ +static void MsgPrepareEnqueueLockingCase(msg_t *pThis) +{ + assert(pThis != NULL); + pthread_mutexattr_settype(&pThis->mutAttr, PTHREAD_MUTEX_RECURSIVE); + pthread_mutex_init(&pThis->mut, &pThis->mutAttr); +} + +/* ... and now the locking and unlocking implementations: */ +static void MsgLockLockingCase(msg_t *pThis) +{ + assert(pThis != NULL); + pthread_mutex_lock(&pThis->mut); +} + +static void MsgUnlockLockingCase(msg_t *pThis) +{ + assert(pThis != NULL); + pthread_mutex_unlock(&pThis->mut); +} + +/* delete the mutex object on message destruction (locking case) + */ +static void MsgDeleteMutexLockingCase(msg_t *pThis) +{ + assert(pThis != NULL); + pthread_mutex_destroy(&pThis->mut); +} + +/* enable multiple concurrent access on the message object + * This works on a class-wide basis and can bot be undone. + * That is, if it is once enabled, it can not be disabled during + * the same run. When this function is called, no other thread + * must manipulate message objects. Then we would have race conditions, + * but guarding against this is counter-productive because it + * would cost additional time. Plus, it would be a programming error. + * rgerhards, 2008-01-05 */ -#define MsgLock(pMsg) -#define MsgUnlock(pMsg) +rsRetVal MsgEnableThreadSafety(void) +{ + funcLock = MsgLockLockingCase; + funcUnlock = MsgUnlockLockingCase; + funcMsgPrepareEnqueue = MsgPrepareEnqueueLockingCase; + funcDeleteMutex = MsgDeleteMutexLockingCase; + return RS_RET_OK; +} + +/* end locking functions */ /* "Constructor" for a msg "object". Returns a pointer to @@ -187,6 +279,7 @@ rsRetVal MsgDestruct(msg_t * pM) rsCStrDestruct(pM->pCSPROCID); if(pM->pCSMSGID != NULL) rsCStrDestruct(pM->pCSMSGID); + funcDeleteMutex(pM); free(pM); } @@ -341,9 +434,9 @@ finalize_it: msg_t *MsgAddRef(msg_t *pM) { assert(pM != NULL); - MsgLock(); + MsgLock(pM); pM->iRefCount++; - MsgUnlock(); + MsgUnlock(pM); /* DEV debugging only! dbgprintf("MsgAddRef\t0x%x done, Ref now: %d\n", (int)pM, pM->iRefCount);*/ return(pM); } @@ -546,7 +639,7 @@ char *getPRI(msg_t *pM) if(pM == NULL) return ""; - MsgLock(); + MsgLock(pM); if(pM->pszPRI == NULL) { /* OK, we need to construct it... * we use a 5 byte buffer - as of @@ -557,7 +650,7 @@ char *getPRI(msg_t *pM) pM->iLenPRI = snprintf((char*)pM->pszPRI, 5, "%d", LOG_MAKEPRI(pM->iFacility, pM->iSeverity)); } - MsgUnlock(); + MsgUnlock(pM); return (char*)pM->pszPRI; } @@ -578,64 +671,64 @@ char *getTimeReported(msg_t *pM, enum tplFormatTypes eFmt) switch(eFmt) { case tplFmtDefault: - MsgLock(); + MsgLock(pM); if(pM->pszTIMESTAMP3164 == NULL) { if((pM->pszTIMESTAMP3164 = malloc(16)) == NULL) { glblHadMemShortage = 1; - MsgUnlock(); + MsgUnlock(pM); return ""; } formatTimestamp3164(&pM->tTIMESTAMP, pM->pszTIMESTAMP3164, 16); } - MsgUnlock(); + MsgUnlock(pM); return(pM->pszTIMESTAMP3164); case tplFmtMySQLDate: - MsgLock(); + MsgLock(pM); if(pM->pszTIMESTAMP_MySQL == NULL) { if((pM->pszTIMESTAMP_MySQL = malloc(15)) == NULL) { glblHadMemShortage = 1; - MsgUnlock(); + MsgUnlock(pM); return ""; } formatTimestampToMySQL(&pM->tTIMESTAMP, pM->pszTIMESTAMP_MySQL, 15); } - MsgUnlock(); + MsgUnlock(pM); return(pM->pszTIMESTAMP_MySQL); case tplFmtPgSQLDate: - MsgLock(); + MsgLock(pM); if(pM->pszTIMESTAMP_PgSQL == NULL) { if((pM->pszTIMESTAMP_PgSQL = malloc(21)) == NULL) { glblHadMemShortage = 1; - MsgUnlock(); + MsgUnlock(pM); return ""; } formatTimestampToPgSQL(&pM->tTIMESTAMP, pM->pszTIMESTAMP_PgSQL, 21); } - MsgUnlock(); + MsgUnlock(pM); return(pM->pszTIMESTAMP_PgSQL); case tplFmtRFC3164Date: - MsgLock(); + MsgLock(pM); if(pM->pszTIMESTAMP3164 == NULL) { if((pM->pszTIMESTAMP3164 = malloc(16)) == NULL) { glblHadMemShortage = 1; - MsgUnlock(); + MsgUnlock(pM); return ""; } formatTimestamp3164(&pM->tTIMESTAMP, pM->pszTIMESTAMP3164, 16); } - MsgUnlock(); + MsgUnlock(pM); return(pM->pszTIMESTAMP3164); case tplFmtRFC3339Date: - MsgLock(); + MsgLock(pM); if(pM->pszTIMESTAMP3339 == NULL) { if((pM->pszTIMESTAMP3339 = malloc(33)) == NULL) { glblHadMemShortage = 1; - MsgUnlock(); + MsgUnlock(pM); return ""; /* TODO: check this: can it cause a free() of constant memory?) */ } formatTimestamp3339(&pM->tTIMESTAMP, pM->pszTIMESTAMP3339, 33); } - MsgUnlock(); + MsgUnlock(pM); return(pM->pszTIMESTAMP3339); } return "INVALID eFmt OPTION!"; @@ -648,64 +741,64 @@ char *getTimeGenerated(msg_t *pM, enum tplFormatTypes eFmt) switch(eFmt) { case tplFmtDefault: - MsgLock(); + MsgLock(pM); if(pM->pszRcvdAt3164 == NULL) { if((pM->pszRcvdAt3164 = malloc(16)) == NULL) { glblHadMemShortage = 1; - MsgUnlock(); + MsgUnlock(pM); return ""; } formatTimestamp3164(&pM->tRcvdAt, pM->pszRcvdAt3164, 16); } - MsgUnlock(); + MsgUnlock(pM); return(pM->pszRcvdAt3164); case tplFmtMySQLDate: - MsgLock(); + MsgLock(pM); if(pM->pszRcvdAt_MySQL == NULL) { if((pM->pszRcvdAt_MySQL = malloc(15)) == NULL) { glblHadMemShortage = 1; - MsgUnlock(); + MsgUnlock(pM); return ""; } formatTimestampToMySQL(&pM->tRcvdAt, pM->pszRcvdAt_MySQL, 15); } - MsgUnlock(); + MsgUnlock(pM); return(pM->pszRcvdAt_MySQL); case tplFmtPgSQLDate: - MsgLock(); + MsgLock(pM); if(pM->pszRcvdAt_PgSQL == NULL) { if((pM->pszRcvdAt_PgSQL = malloc(21)) == NULL) { glblHadMemShortage = 1; - MsgUnlock(); + MsgUnlock(pM); return ""; } formatTimestampToPgSQL(&pM->tRcvdAt, pM->pszRcvdAt_PgSQL, 21); } - MsgUnlock(); + MsgUnlock(pM); return(pM->pszRcvdAt_PgSQL); case tplFmtRFC3164Date: - MsgLock(); + MsgLock(pM); if(pM->pszRcvdAt3164 == NULL) { if((pM->pszRcvdAt3164 = malloc(16)) == NULL) { glblHadMemShortage = 1; - MsgUnlock(); + MsgUnlock(pM); return ""; } formatTimestamp3164(&pM->tRcvdAt, pM->pszRcvdAt3164, 16); } - MsgUnlock(); + MsgUnlock(pM); return(pM->pszRcvdAt3164); case tplFmtRFC3339Date: - MsgLock(); + MsgLock(pM); if(pM->pszRcvdAt3339 == NULL) { if((pM->pszRcvdAt3339 = malloc(33)) == NULL) { glblHadMemShortage = 1; - MsgUnlock(); + MsgUnlock(pM); return ""; } formatTimestamp3339(&pM->tRcvdAt, pM->pszRcvdAt3339, 33); } - MsgUnlock(); + MsgUnlock(pM); return(pM->pszRcvdAt3339); } return "INVALID eFmt OPTION!"; @@ -717,14 +810,14 @@ char *getSeverity(msg_t *pM) if(pM == NULL) return ""; - MsgLock(); + MsgLock(pM); if(pM->pszSeverity == NULL) { /* we use a 2 byte buffer - can only be one digit */ - if((pM->pszSeverity = malloc(2)) == NULL) { MsgUnlock() ; return ""; } + if((pM->pszSeverity = malloc(2)) == NULL) { MsgUnlock(pM) ; return ""; } pM->iLenSeverity = snprintf((char*)pM->pszSeverity, 2, "%d", pM->iSeverity); } - MsgUnlock(); + MsgUnlock(pM); return((char*)pM->pszSeverity); } @@ -738,7 +831,7 @@ char *getSeverityStr(msg_t *pM) if(pM == NULL) return ""; - MsgLock(); + MsgLock(pM); if(pM->pszSeverityStr == NULL) { for(c = rs_prioritynames, val = pM->iSeverity; c->c_name; c++) if(c->c_val == val) { @@ -747,15 +840,15 @@ char *getSeverityStr(msg_t *pM) } if(name == NULL) { /* we use a 2 byte buffer - can only be one digit */ - if((pM->pszSeverityStr = malloc(2)) == NULL) { MsgUnlock() ; return ""; } + if((pM->pszSeverityStr = malloc(2)) == NULL) { MsgUnlock(pM) ; return ""; } pM->iLenSeverityStr = snprintf((char*)pM->pszSeverityStr, 2, "%d", pM->iSeverity); } else { - if((pM->pszSeverityStr = (uchar*) strdup(name)) == NULL) { MsgUnlock() ; return ""; } + if((pM->pszSeverityStr = (uchar*) strdup(name)) == NULL) { MsgUnlock(pM) ; return ""; } pM->iLenSeverityStr = strlen((char*)name); } } - MsgUnlock(); + MsgUnlock(pM); return((char*)pM->pszSeverityStr); } @@ -764,17 +857,17 @@ char *getFacility(msg_t *pM) if(pM == NULL) return ""; - MsgLock(); + MsgLock(pM); if(pM->pszFacility == NULL) { /* we use a 12 byte buffer - as of * syslog-protocol, facility can go * up to 2^32 -1 */ - if((pM->pszFacility = malloc(12)) == NULL) { MsgUnlock() ; return ""; } + if((pM->pszFacility = malloc(12)) == NULL) { MsgUnlock(pM) ; return ""; } pM->iLenFacility = snprintf((char*)pM->pszFacility, 12, "%d", pM->iFacility); } - MsgUnlock(); + MsgUnlock(pM); return((char*)pM->pszFacility); } @@ -787,7 +880,7 @@ char *getFacilityStr(msg_t *pM) if(pM == NULL) return ""; - MsgLock(); + MsgLock(pM); if(pM->pszFacilityStr == NULL) { for(c = rs_facilitynames, val = pM->iFacility << 3; c->c_name; c++) if(c->c_val == val) { @@ -799,80 +892,57 @@ char *getFacilityStr(msg_t *pM) * syslog-protocol, facility can go * up to 2^32 -1 */ - if((pM->pszFacilityStr = malloc(12)) == NULL) { MsgUnlock() ; return ""; } + if((pM->pszFacilityStr = malloc(12)) == NULL) { MsgUnlock(pM) ; return ""; } pM->iLenFacilityStr = snprintf((char*)pM->pszFacilityStr, 12, "%d", val >> 3); } else { - if((pM->pszFacilityStr = (uchar*)strdup(name)) == NULL) { MsgUnlock() ; return ""; } + if((pM->pszFacilityStr = (uchar*)strdup(name)) == NULL) { MsgUnlock(pM) ; return ""; } pM->iLenFacilityStr = strlen((char*)name); } } - MsgUnlock(); + MsgUnlock(pM); return((char*)pM->pszFacilityStr); } /* rgerhards 2004-11-24: set APP-NAME in msg object + * TODO: revisit msg locking code! */ rsRetVal MsgSetAPPNAME(msg_t *pMsg, char* pszAPPNAME) { + DEFiRet; assert(pMsg != NULL); + //MsgLock(pMsg); if(pMsg->pCSAPPNAME == NULL) { /* we need to obtain the object first */ - if((pMsg->pCSAPPNAME = rsCStrConstruct()) == NULL) + if((pMsg->pCSAPPNAME = rsCStrConstruct()) == NULL) { + // MsgUnlock(pMsg); return RS_RET_OBJ_CREATION_FAILED; /* best we can do... */ + } rsCStrSetAllocIncrement(pMsg->pCSAPPNAME, 128); } /* if we reach this point, we have the object */ - return rsCStrSetSzStr(pMsg->pCSAPPNAME, (uchar*) pszAPPNAME); -} + iRet = rsCStrSetSzStr(pMsg->pCSAPPNAME, (uchar*) pszAPPNAME); + //MsgUnlock(pMsg); - -/* This function tries to emulate APPNAME if it is not present. Its - * main use is when we have received a log record via legacy syslog and - * now would like to send out the same one via syslog-protocol. - */ -static void tryEmulateAPPNAME(msg_t *pM) -{ - assert(pM != NULL); - if(pM->pCSAPPNAME != NULL) - return; /* we are already done */ - - if(getProtocolVersion(pM) == 0) { - /* only then it makes sense to emulate */ - MsgSetAPPNAME(pM, getProgramName(pM)); - } -} - - -/* rgerhards, 2005-11-24 - */ -int getAPPNAMELen(msg_t *pM) -{ - assert(pM != NULL); - MsgLock(); - if(pM->pCSAPPNAME == NULL) - tryEmulateAPPNAME(pM); - MsgUnlock(); - return (pM->pCSAPPNAME == NULL) ? 0 : rsCStrLen(pM->pCSAPPNAME); + return iRet; } +static void tryEmulateAPPNAME(msg_t *pM); /* forward reference */ /* rgerhards, 2005-11-24 */ char *getAPPNAME(msg_t *pM) { assert(pM != NULL); - MsgLock(); + MsgLock(pM); if(pM->pCSAPPNAME == NULL) tryEmulateAPPNAME(pM); - MsgUnlock(); + MsgUnlock(pM); return (pM->pCSAPPNAME == NULL) ? "" : (char*) rsCStrGetSzStrNoNULL(pM->pCSAPPNAME); } - - /* rgerhards 2004-11-24: set PROCID in msg object */ rsRetVal MsgSetPROCID(msg_t *pMsg, char* pszPROCID) @@ -893,10 +963,10 @@ rsRetVal MsgSetPROCID(msg_t *pMsg, char* pszPROCID) int getPROCIDLen(msg_t *pM) { assert(pM != NULL); - MsgLock(); + MsgLock(pM); if(pM->pCSPROCID == NULL) aquirePROCIDFromTAG(pM); - MsgUnlock(); + MsgUnlock(pM); return (pM->pCSPROCID == NULL) ? 1 : rsCStrLen(pM->pCSPROCID); } @@ -905,12 +975,15 @@ int getPROCIDLen(msg_t *pM) */ char *getPROCID(msg_t *pM) { + char* pszRet; + assert(pM != NULL); - MsgLock(); + MsgLock(pM); if(pM->pCSPROCID == NULL) aquirePROCIDFromTAG(pM); - MsgUnlock(); - return (pM->pCSPROCID == NULL) ? "-" : (char*) rsCStrGetSzStrNoNULL(pM->pCSPROCID); + pszRet = (pM->pCSPROCID == NULL) ? "-" : (char*) rsCStrGetSzStrNoNULL(pM->pCSPROCID); + MsgUnlock(pM); + return pszRet; } @@ -1027,17 +1100,17 @@ char *getTAG(msg_t *pM) { char *ret; - MsgLock(); if(pM == NULL) ret = ""; else { + MsgLock(pM); tryEmulateTAG(pM); if(pM->pszTAG == NULL) ret = ""; else ret = (char*) pM->pszTAG; + MsgUnlock(pM); } - MsgUnlock(); return(ret); } @@ -1121,13 +1194,13 @@ int getProgramNameLen(msg_t *pM) int iRet; assert(pM != NULL); - MsgLock(); + MsgLock(pM); if((iRet = aquireProgramName(pM)) != RS_RET_OK) { dbgprintf("error %d returned by aquireProgramName() in getProgramNameLen()\n", iRet); - MsgUnlock(); + MsgUnlock(pM); return 0; /* best we can do (consistent wiht what getProgramName() returns) */ } - MsgUnlock(); + MsgUnlock(pM); return (pM->pCSProgName == NULL) ? 0 : rsCStrLen(pM->pCSProgName); } @@ -1136,21 +1209,100 @@ int getProgramNameLen(msg_t *pM) /* get the "programname" as sz string * rgerhards, 2005-10-19 */ -char *getProgramName(msg_t *pM) +char *getProgramName(msg_t *pM) /* this is the non-locking version for internal use */ +{ + int iRet; + char *pszRet; + + assert(pM != NULL); + MsgLock(pM); + if((iRet = aquireProgramName(pM)) != RS_RET_OK) { + dbgprintf("error %d returned by aquireProgramName() in getProgramName()\n", iRet); + pszRet = ""; /* best we can do */ + } else { + pszRet = (pM->pCSProgName == NULL) ? "" : (char*) rsCStrGetSzStrNoNULL(pM->pCSProgName); + } + + MsgUnlock(pM); + return pszRet; +} +/* The code below was an approach without PTHREAD_MUTEX_RECURSIVE + * However, it turned out to be quite complex. So far, we use recursive + * locking, which is OK from a performance point of view, especially as + * we do not anticipate that multithreading msg objects is used often. + * However, we may re-think about using non-recursive locking and I leave this + * code in here to conserve the idea. -- rgerhards, 2008-01-05 + */ +#if 0 +static char *getProgramNameNoLock(msg_t *pM) /* this is the non-locking version for internal use */ { int iRet; assert(pM != NULL); - MsgLock(); if((iRet = aquireProgramName(pM)) != RS_RET_OK) { dbgprintf("error %d returned by aquireProgramName() in getProgramName()\n", iRet); - MsgUnlock(); return ""; /* best we can do */ } - MsgUnlock(); return (pM->pCSProgName == NULL) ? "" : (char*) rsCStrGetSzStrNoNULL(pM->pCSProgName); } +char *getProgramName(msg_t *pM) /* this is the external callable version */ +{ + char *pszRet; + + MsgLock(pM); + pszRet = getProgramNameNoLock(pM); + MsgUnlock(pM); + return pszRet; +} +/* an alternative approach has been: */ +/* The macro below is used to generate external function definitions + * for such functions that may also be called internally (and thus have + * both a locking and non-locking implementation. Over time, we could + * reconsider how we handle that. -- rgerhards, 2008-01-05 + */ +#define EXT_LOCKED_FUNC(fName, ret) \ +ret fName(msg_t *pM) \ +{ \ + ret valRet; \ + MsgLock(pM); \ + valRet = fName##NoLock(pM); \ + MsgUnlock(pM); \ + return(valRet); \ +} +EXT_LOCKED_FUNC(getProgramName, char*) +/* in this approach, the external function is provided by the macro and + * needs not to be writen. + */ +#endif /* #if 0 -- saved code */ + + +/* This function tries to emulate APPNAME if it is not present. Its + * main use is when we have received a log record via legacy syslog and + * now would like to send out the same one via syslog-protocol. + */ +static void tryEmulateAPPNAME(msg_t *pM) +{ + assert(pM != NULL); + if(pM->pCSAPPNAME != NULL) + return; /* we are already done */ + + if(getProtocolVersion(pM) == 0) { + /* only then it makes sense to emulate */ + MsgSetAPPNAME(pM, getProgramName(pM)); + } +} + + +/* rgerhards, 2005-11-24 + */ +static int getAPPNAMELen(msg_t *pM) +{ + assert(pM != NULL); + if(pM->pCSAPPNAME == NULL) + tryEmulateAPPNAME(pM); + return (pM->pCSAPPNAME == NULL) ? 0 : rsCStrLen(pM->pCSAPPNAME); +} /* rgerhards 2004-11-16: set pszRcvFrom in msg object @@ -1935,6 +2087,11 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe, */ BEGINObjClassInit(Msg) OBJSetMethodHandler(objMethod_SERIALIZE, MsgSerialize); + /* initially, we have no need to lock message objects */ + funcLock = MsgLockingDummy; + funcUnlock = MsgLockingDummy; + funcDeleteMutex = MsgLockingDummy; + funcMsgPrepareEnqueue = MsgLockingDummy; ENDObjClassInit /* |