diff options
Diffstat (limited to 'runtime')
-rw-r--r-- | runtime/conf.c | 48 | ||||
-rw-r--r-- | runtime/datetime.c | 77 | ||||
-rw-r--r-- | runtime/datetime.h | 5 | ||||
-rw-r--r-- | runtime/module-template.h | 2 | ||||
-rw-r--r-- | runtime/modules.c | 80 | ||||
-rw-r--r-- | runtime/nsd_gtls.c | 8 | ||||
-rw-r--r-- | runtime/parser.c | 9 | ||||
-rw-r--r-- | runtime/queue.c | 64 | ||||
-rw-r--r-- | runtime/rsyslog.h | 2 | ||||
-rw-r--r-- | runtime/strmsrv.c | 2 | ||||
-rw-r--r-- | runtime/wti.c | 5 |
11 files changed, 206 insertions, 96 deletions
diff --git a/runtime/conf.c b/runtime/conf.c index 4bd18bf5..46a89281 100644 --- a/runtime/conf.c +++ b/runtime/conf.c @@ -399,6 +399,7 @@ processConfFile(uchar *pConfFile) uchar cbuf[CFGLNSIZ]; uchar *cline; int i; + rsRetVal localRet; int bHadAnError = 0; uchar *pszOrgLine = NULL; size_t lenLine; @@ -460,16 +461,20 @@ processConfFile(uchar *pConfFile) /* we now have the complete line, and are positioned at the first non-whitespace * character. So let's process it */ - if(cfline(cbuf, &pCurrRule) != RS_RET_OK) { + if((localRet = cfline(cbuf, &pCurrRule)) != RS_RET_OK) { /* we log a message, but otherwise ignore the error. After all, the next * line can be correct. -- rgerhards, 2007-08-02 */ uchar szErrLoc[MAXFNAME + 64]; - dbgprintf("config line NOT successfully processed\n"); + if(localRet != RS_RET_OK_WARN) { + dbgprintf("config line NOT successfully processed\n"); + bHadAnError = 1; + } snprintf((char*)szErrLoc, sizeof(szErrLoc) / sizeof(uchar), "%s, line %d", pConfFile, iLnNbr); - errmsg.LogError(0, NO_ERRCODE, "the last error occured in %s:\"%s\"", (char*)szErrLoc, (char*)pszOrgLine); - bHadAnError = 1; + errmsg.LogError(0, NO_ERRCODE, "the last %s occured in %s:\"%s\"", + (localRet == RS_RET_OK_WARN) ? "warning" : "error", + (char*)szErrLoc, (char*)pszOrgLine); } } @@ -651,17 +656,14 @@ static rsRetVal cflineProcessTradPRIFilter(uchar **pline, register rule_t *pRule for (bp=buf; *(bp+1); bp++) *bp=*(bp+1); *bp='\0'; - } - else { + } else { ignorepri = 0; } - if ( *buf == '=' ) - { + if ( *buf == '=' ) { singlpri = 1; pri = decodeSyslogName(&buf[1], syslogPriNames); } - else { - singlpri = 0; + else { singlpri = 0; pri = decodeSyslogName(buf, syslogPriNames); } @@ -689,17 +691,13 @@ static rsRetVal cflineProcessTradPRIFilter(uchar **pline, register rule_t *pRule pRule->f_filterData.f_pmask[i] &= ~(1<<pri); else pRule->f_filterData.f_pmask[i] |= (1<<pri); - } - else - { + } else { if ( pri == TABLE_ALLPRI ) { if ( ignorepri ) pRule->f_filterData.f_pmask[i] = TABLE_NOPRI; else pRule->f_filterData.f_pmask[i] = TABLE_ALLPRI; - } - else - { + } else { if ( ignorepri ) for (i2= 0; i2 <= pri; ++i2) pRule->f_filterData.f_pmask[i] &= ~(1<<i2); @@ -1085,6 +1083,7 @@ static rsRetVal cflineDoAction(uchar **p, action_t **ppAction) omodStringRequest_t *pOMSR; action_t *pAction = NULL; void *pModData; + int bHadWarning = 0; ASSERT(p != NULL); ASSERT(ppAction != NULL); @@ -1100,6 +1099,10 @@ static rsRetVal cflineDoAction(uchar **p, action_t **ppAction) pOMSR = NULL; iRet = pMod->mod.om.parseSelectorAct(p, &pModData, &pOMSR); dbgprintf("tried selector action for %s: %d\n", module.GetName(pMod), iRet); + if(iRet == RS_RET_OK_WARN) { + bHadWarning = 1; + iRet = RS_RET_OK; + } if(iRet == RS_RET_OK || iRet == RS_RET_SUSPENDED) { if((iRet = addAction(&pAction, pMod, pModData, pOMSR, (iRet == RS_RET_SUSPENDED)? 1 : 0)) == RS_RET_OK) { /* now check if the module is compatible with select features */ @@ -1128,6 +1131,8 @@ static rsRetVal cflineDoAction(uchar **p, action_t **ppAction) } *ppAction = pAction; + if(iRet == RS_RET_OK && bHadWarning) + iRet = RS_RET_OK_WARN; RETiRet; } @@ -1142,6 +1147,8 @@ cflineClassic(uchar *p, rule_t **ppRule) { DEFiRet; action_t *pAction; + rsRetVal localRet; + int bHadWarning = 0; /* lines starting with '&' have no new filters and just add * new actions to the currently processed selector. @@ -1168,10 +1175,17 @@ cflineClassic(uchar *p, rule_t **ppRule) CHKiRet(cflineDoFilter(&p, *ppRule)); /* pull filters */ } - CHKiRet(cflineDoAction(&p, &pAction)); + localRet = cflineDoAction(&p, &pAction); + if(localRet == RS_RET_OK_WARN) { + bHadWarning = 1; + } else { + CHKiRet(localRet); + } CHKiRet(llAppend(&(*ppRule)->llActList, NULL, (void*) pAction)); finalize_it: + if(iRet == RS_RET_OK && bHadWarning) + iRet = RS_RET_OK_WARN; RETiRet; } diff --git a/runtime/datetime.c b/runtime/datetime.c index 679ce0b4..85cbab84 100644 --- a/runtime/datetime.c +++ b/runtime/datetime.c @@ -53,6 +53,47 @@ static const int tenPowers[6] = { 1, 10, 100, 1000, 10000, 100000 }; /* ------------------------------ methods ------------------------------ */ +/** + * Convert struct timeval to syslog_time + */ +void +timeval2syslogTime(struct timeval *tp, struct syslogTime *t) +{ + struct tm *tm; + struct tm tmBuf; + long lBias; + + tm = localtime_r((time_t*) &(tp->tv_sec), &tmBuf); + + t->year = tm->tm_year + 1900; + t->month = tm->tm_mon + 1; + t->day = tm->tm_mday; + t->hour = tm->tm_hour; + t->minute = tm->tm_min; + t->second = tm->tm_sec; + t->secfrac = tp->tv_usec; + t->secfracPrecision = 6; + +# if __sun + /* Solaris uses a different method of exporting the time zone. + * It is UTC - localtime, which is the opposite sign of mins east of GMT. + */ + lBias = -(daylight ? altzone : timezone); +# elif defined(__hpux) + lBias = tz.tz_dsttime ? - tz.tz_minuteswest : 0; +# else + lBias = tm->tm_gmtoff; +# endif + if(lBias < 0) { + t->OffsetMode = '-'; + lBias *= -1; + } else + t->OffsetMode = '+'; + t->OffsetHour = lBias / 3600; + t->OffsetMinute = (lBias % 3600) / 60; + t->timeType = TIME_TYPE_RFC5424; /* we have a high precision timestamp */ +} + /** * Get the current date/time in the best resolution the operating * system has to offer (well, actually at most down to the milli- @@ -72,9 +113,6 @@ static const int tenPowers[6] = { 1, 10, 100, 1000, 10000, 100000 }; static void getCurrTime(struct syslogTime *t, time_t *ttSeconds) { struct timeval tp; - struct tm *tm; - struct tm tmBuf; - long lBias; # if defined(__hpux) struct timezone tz; # endif @@ -91,37 +129,7 @@ static void getCurrTime(struct syslogTime *t, time_t *ttSeconds) if(ttSeconds != NULL) *ttSeconds = tp.tv_sec; - tm = localtime_r((time_t*) &(tp.tv_sec), &tmBuf); - - t->year = tm->tm_year + 1900; - t->month = tm->tm_mon + 1; - t->day = tm->tm_mday; - t->hour = tm->tm_hour; - t->minute = tm->tm_min; - t->second = tm->tm_sec; - t->secfrac = tp.tv_usec; - t->secfracPrecision = 6; - -# if __sun - /* Solaris uses a different method of exporting the time zone. - * It is UTC - localtime, which is the opposite sign of mins east of GMT. - */ - lBias = -(daylight ? altzone : timezone); -# elif defined(__hpux) - lBias = tz.tz_dsttime ? - tz.tz_minuteswest : 0; -# else - lBias = tm->tm_gmtoff; -# endif - if(lBias < 0) - { - t->OffsetMode = '-'; - lBias *= -1; - } - else - t->OffsetMode = '+'; - t->OffsetHour = lBias / 3600; - t->OffsetMinute = (lBias % 3600) / 60; - t->timeType = TIME_TYPE_RFC5424; /* we have a high precision timestamp */ + timeval2syslogTime(&tp, t); } @@ -859,6 +867,7 @@ CODESTARTobjQueryInterface(datetime) */ pIf->getCurrTime = getCurrTime; pIf->GetTime = getTime; + pIf->timeval2syslogTime = timeval2syslogTime; pIf->ParseTIMESTAMP3339 = ParseTIMESTAMP3339; pIf->ParseTIMESTAMP3164 = ParseTIMESTAMP3164; pIf->formatTimestampToMySQL = formatTimestampToMySQL; diff --git a/runtime/datetime.h b/runtime/datetime.h index 7fcd273b..acf54df5 100644 --- a/runtime/datetime.h +++ b/runtime/datetime.h @@ -42,8 +42,10 @@ BEGINinterface(datetime) /* name must also be changed in ENDinterface macro! */ int (*formatTimestampSecFrac)(struct syslogTime *ts, char* pBuf); /* v3, 2009-11-12 */ time_t (*GetTime)(time_t *ttSeconds); + /* v6, 2011-06-20 */ + void (*timeval2syslogTime)(struct timeval *tp, struct syslogTime *t); ENDinterface(datetime) -#define datetimeCURR_IF_VERSION 5 /* increment whenever you change the interface structure! */ +#define datetimeCURR_IF_VERSION 6 /* increment whenever you change the interface structure! */ /* interface changes: * 1 - initial version * 2 - not compatible to 1 - bugfix required ParseTIMESTAMP3164 to accept char ** as @@ -52,6 +54,7 @@ ENDinterface(datetime) * 3 - taken by v5 branch! * 4 - formatTimestamp3164 takes a third int parameter * 5 - merge of versions 3 + 4 (2010-03-09) + * 6 - see above */ /* prototypes */ diff --git a/runtime/module-template.h b/runtime/module-template.h index e21d6157..63bf9a10 100644 --- a/runtime/module-template.h +++ b/runtime/module-template.h @@ -275,7 +275,7 @@ static rsRetVal parseSelectorAct(uchar **pp, void **ppModData, omodStringRequest #define CODE_STD_FINALIZERparseSelectorAct \ finalize_it:\ - if(iRet == RS_RET_OK || iRet == RS_RET_SUSPENDED) {\ + if(iRet == RS_RET_OK || iRet == RS_RET_OK_WARN || iRet == RS_RET_SUSPENDED) {\ *ppModData = pData;\ *pp = p;\ } else {\ diff --git a/runtime/modules.c b/runtime/modules.c index 4541bddf..6a32b2e8 100644 --- a/runtime/modules.c +++ b/runtime/modules.c @@ -767,7 +767,6 @@ Load(uchar *pModName) DEFiRet; size_t iPathLen, iModNameLen; - uchar szPath[PATH_MAX]; uchar *pModNameCmp; int bHasExtension; void *pModHdlr, *pModInit; @@ -775,13 +774,25 @@ Load(uchar *pModName) uchar *pModDirCurr, *pModDirNext; int iLoadCnt; struct dlhandle_s *pHandle = NULL; +# ifdef PATH_MAX + uchar pathBuf[PATH_MAX+1]; +# else + uchar pathBuf[4096]; +# endif + uchar *pPathBuf = pathBuf; + size_t lenPathBuf = sizeof(pathBuf); assert(pModName != NULL); dbgprintf("Requested to load module '%s'\n", pModName); + iModNameLen = strlen((char*)pModName); + /* overhead for a full path is potentially 1 byte for a slash, + * three bytes for ".so" and one byte for '\0'. + */ +# define PATHBUF_OVERHEAD 1 + iModNameLen + 3 + 1 + pthread_mutex_lock(&mutLoadUnload); - iModNameLen = strlen((char *) pModName); if(iModNameLen > 3 && !strcmp((char *) pModName + iModNameLen - 3, ".so")) { iModNameLen -= 3; bHasExtension = TRUE; @@ -802,13 +813,19 @@ Load(uchar *pModName) pModDirNext = NULL; pModHdlr = NULL; iLoadCnt = 0; - do { - /* now build our load module name */ + do { /* now build our load module name */ if(*pModName == '/' || *pModName == '.') { - *szPath = '\0'; /* we do not need to append the path - its already in the module name */ + if(lenPathBuf < PATHBUF_OVERHEAD) { + if(pPathBuf != pathBuf) /* already malloc()ed memory? */ + free(pPathBuf); + /* we always alloc enough memory for everything we potentiall need to add */ + lenPathBuf = PATHBUF_OVERHEAD; + CHKmalloc(pPathBuf = malloc(sizeof(char)*lenPathBuf)); + } + *pPathBuf = '\0'; /* we do not need to append the path - its already in the module name */ iPathLen = 0; } else { - *szPath = '\0'; + *pPathBuf = '\0'; iPathLen = strlen((char *)pModDirCurr); pModDirNext = (uchar *)strchr((char *)pModDirCurr, ':'); @@ -821,30 +838,27 @@ Load(uchar *pModName) continue; } break; - } else if(iPathLen > sizeof(szPath) - 1) { - errmsg.LogError(0, NO_ERRCODE, "could not load module '%s', module path too long\n", pModName); - ABORT_FINALIZE(RS_RET_MODULE_LOAD_ERR_PATHLEN); + } else if(iPathLen > lenPathBuf - PATHBUF_OVERHEAD) { + if(pPathBuf != pathBuf) /* already malloc()ed memory? */ + free(pPathBuf); + /* we always alloc enough memory for everything we potentiall need to add */ + lenPathBuf = iPathLen + PATHBUF_OVERHEAD; + CHKmalloc(pPathBuf = malloc(sizeof(char)*lenPathBuf)); } - strncat((char *) szPath, (char *)pModDirCurr, iPathLen); - iPathLen = strlen((char*) szPath); + memcpy((char *) pPathBuf, (char *)pModDirCurr, iPathLen); + if((pPathBuf[iPathLen - 1] != '/')) { + /* we have space, made sure in previous check */ + pPathBuf[iPathLen++] = '/'; + } + pPathBuf[iPathLen] = '\0'; if(pModDirNext) pModDirCurr = pModDirNext + 1; - - if((szPath[iPathLen - 1] != '/')) { - if((iPathLen <= sizeof(szPath) - 2)) { - szPath[iPathLen++] = '/'; - szPath[iPathLen] = '\0'; - } else { - errmsg.LogError(0, RS_RET_MODULE_LOAD_ERR_PATHLEN, "could not load module '%s', path too long\n", pModName); - ABORT_FINALIZE(RS_RET_MODULE_LOAD_ERR_PATHLEN); - } - } } /* ... add actual name ... */ - strncat((char *) szPath, (char *) pModName, sizeof(szPath) - iPathLen - 1); + strncat((char *) pPathBuf, (char *) pModName, lenPathBuf - iPathLen - 1); /* now see if we have an extension and, if not, append ".so" */ if(!bHasExtension) { @@ -853,17 +867,12 @@ Load(uchar *pModName) * algo over time... -- rgerhards, 2008-03-05 */ /* ... so now add the extension */ - strncat((char *) szPath, ".so", sizeof(szPath) - strlen((char*) szPath) - 1); + strncat((char *) pPathBuf, ".so", lenPathBuf - strlen((char*) pPathBuf) - 1); iPathLen += 3; } - if(iPathLen + strlen((char*) pModName) >= sizeof(szPath)) { - errmsg.LogError(0, RS_RET_MODULE_LOAD_ERR_PATHLEN, "could not load module '%s', path too long\n", pModName); - ABORT_FINALIZE(RS_RET_MODULE_LOAD_ERR_PATHLEN); - } - /* complete load path constructed, so ... GO! */ - dbgprintf("loading module '%s'\n", szPath); + dbgprintf("loading module '%s'\n", pPathBuf); /* see if we have this one already */ for (pHandle = pHandles; pHandle; pHandle = pHandle->next) { @@ -875,7 +884,7 @@ Load(uchar *pModName) /* not found, try to dynamically link it */ if (!pModHdlr) { - pModHdlr = dlopen((char *) szPath, RTLD_NOW); + pModHdlr = dlopen((char *) pPathBuf, RTLD_NOW); } iLoadCnt++; @@ -884,25 +893,28 @@ Load(uchar *pModName) if(!pModHdlr) { if(iLoadCnt) { - errmsg.LogError(0, RS_RET_MODULE_LOAD_ERR_DLOPEN, "could not load module '%s', dlopen: %s\n", szPath, dlerror()); + errmsg.LogError(0, RS_RET_MODULE_LOAD_ERR_DLOPEN, "could not load module '%s', dlopen: %s\n", + pPathBuf, dlerror()); } else { - errmsg.LogError(0, NO_ERRCODE, "could not load module '%s', ModDir was '%s'\n", szPath, + errmsg.LogError(0, NO_ERRCODE, "could not load module '%s', ModDir was '%s'\n", pPathBuf, ((pModDir == NULL) ? _PATH_MODDIR : (char *)pModDir)); } ABORT_FINALIZE(RS_RET_MODULE_LOAD_ERR_DLOPEN); } if(!(pModInit = dlsym(pModHdlr, "modInit"))) { - errmsg.LogError(0, RS_RET_MODULE_LOAD_ERR_NO_INIT, "could not load module '%s', dlsym: %s\n", szPath, dlerror()); + errmsg.LogError(0, RS_RET_MODULE_LOAD_ERR_NO_INIT, "could not load module '%s', dlsym: %s\n", pPathBuf, dlerror()); dlclose(pModHdlr); ABORT_FINALIZE(RS_RET_MODULE_LOAD_ERR_NO_INIT); } if((iRet = doModInit(pModInit, (uchar*) pModName, pModHdlr)) != RS_RET_OK) { - errmsg.LogError(0, RS_RET_MODULE_LOAD_ERR_INIT_FAILED, "could not load module '%s', rsyslog error %d\n", szPath, iRet); + errmsg.LogError(0, RS_RET_MODULE_LOAD_ERR_INIT_FAILED, "could not load module '%s', rsyslog error %d\n", pPathBuf, iRet); dlclose(pModHdlr); ABORT_FINALIZE(RS_RET_MODULE_LOAD_ERR_INIT_FAILED); } finalize_it: + if(pPathBuf != pathBuf) /* used malloc()ed memory? */ + free(pPathBuf); pthread_mutex_unlock(&mutLoadUnload); RETiRet; } diff --git a/runtime/nsd_gtls.c b/runtime/nsd_gtls.c index 036e8290..e1192aaf 100644 --- a/runtime/nsd_gtls.c +++ b/runtime/nsd_gtls.c @@ -29,7 +29,9 @@ #include <string.h> #include <gnutls/gnutls.h> #include <gnutls/x509.h> -#include <gcrypt.h> +#if GNUTLS_VERSION_NUMBER <= 0x020b00 +# include <gcrypt.h> +#endif #include <errno.h> #include <sys/stat.h> #include <unistd.h> @@ -54,7 +56,9 @@ #define CRLFILE "crl.pem" +#if GNUTLS_VERSION_NUMBER <= 0x020b00 GCRY_THREAD_OPTION_PTHREAD_IMPL; +#endif MODULE_TYPE_LIB MODULE_TYPE_KEEP @@ -567,7 +571,9 @@ gtlsGlblInit(void) DEFiRet; /* gcry_control must be called first, so that the thread system is correctly set up */ + #if GNUTLS_VERSION_NUMBER <= 0x020b00 gcry_control (GCRYCTL_SET_THREAD_CBS, &gcry_threads_pthread); + #endif CHKgnutls(gnutls_global_init()); /* X509 stuff */ diff --git a/runtime/parser.c b/runtime/parser.c index b385c54b..300db1e0 100644 --- a/runtime/parser.c +++ b/runtime/parser.c @@ -60,6 +60,7 @@ DEFobjCurrIf(ruleset) /* config data */ static uchar cCCEscapeChar = '#';/* character to be used to start an escape sequence for control chars */ static int bEscapeCCOnRcv = 1; /* escape control characters on reception: 0 - no, 1 - yes */ +static int bSpaceLFOnRcv = 0; /* replace newlines with spaces on reception: 0 - no, 1 - yes */ static int bEscape8BitChars = 0; /* escape characters > 127 on reception: 0 - no, 1 - yes */ static int bEscapeTab = 1; /* escape tab control character when doing CC escapes: 0 - no, 1 - yes */ static int bDropTrailingLF = 1; /* drop trailing LF's on reception? */ @@ -354,9 +355,13 @@ SanitizeMsg(msg_t *pMsg) int bNeedSanitize = 0; for(iSrc = 0 ; iSrc < lenMsg ; iSrc++) { if(iscntrl(pszMsg[iSrc])) { + if(bSpaceLFOnRcv && pszMsg[iSrc] == '\n') + pszMsg[iSrc] = ' '; + else if(pszMsg[iSrc] == '\0' || bEscapeCCOnRcv) { bNeedSanitize = 1; - break; + if (!bSpaceLFOnRcv) + break; } } else if(pszMsg[iSrc] > 127 && bEscape8BitChars) { bNeedSanitize = 1; @@ -645,6 +650,7 @@ resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unus { cCCEscapeChar = '#'; bEscapeCCOnRcv = 1; /* default is to escape control characters */ + bSpaceLFOnRcv = 0; bEscape8BitChars = 0; /* default is to escape control characters */ bEscapeTab = 1; /* default is to escape control characters */ bDropTrailingLF = 1; /* default is to drop trailing LF's on reception */ @@ -698,6 +704,7 @@ BEGINObjClassInit(parser, 1, OBJ_IS_CORE_MODULE) /* class, version */ CHKiRet(regCfSysLineHdlr((uchar *)"controlcharacterescapeprefix", 0, eCmdHdlrGetChar, NULL, &cCCEscapeChar, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"droptrailinglfonreception", 0, eCmdHdlrBinary, NULL, &bDropTrailingLF, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"escapecontrolcharactersonreceive", 0, eCmdHdlrBinary, NULL, &bEscapeCCOnRcv, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"spacelfonreceive", 0, eCmdHdlrBinary, NULL, &bSpaceLFOnRcv, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"escape8bitcharactersonreceive", 0, eCmdHdlrBinary, NULL, &bEscape8BitChars, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"escapecontrolcharactertab", 0, eCmdHdlrBinary, NULL, &bEscapeTab, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, NULL)); diff --git a/runtime/queue.c b/runtime/queue.c index 0b717289..5df748c2 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -83,6 +83,11 @@ static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti); static rsRetVal batchProcessed(qqueue_t *pThis, wti_t *pWti); static rsRetVal qqueueMultiEnqObjNonDirect(qqueue_t *pThis, multi_submit_t *pMultiSub); static rsRetVal qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub); +static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr); +static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis); +static rsRetVal qConstructDirect(qqueue_t __attribute__((unused)) *pThis); +static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis); +static rsRetVal qDestructDisk(qqueue_t *pThis); /* some constants for queuePersist () */ #define QUEUE_CHECKPOINT 1 @@ -592,6 +597,47 @@ static rsRetVal qDelLinkedList(qqueue_t *pThis) /* -------------------- disk -------------------- */ +/* The following function is used to "save" ourself from being killed by + * a fatally failed disk queue. A fatal failure is, for example, if no + * data can be read or written. In that case, the disk support is disabled, + * with all on-disk structures kept as-is as much as possible. Instead, the + * queue is switched to direct mode, so that at least + * some processing can happen. Of course, this may still have lots of + * undesired side-effects, but is probably better than aborting the + * syslogd. Note that this function *must* succeed in one way or another, as + * we can not recover from failure here. But it may emit different return + * states, which can trigger different processing in the higher layers. + * rgerhards, 2011-05-03 + */ +static inline rsRetVal +queueSwitchToEmergencyMode(qqueue_t *pThis, rsRetVal initiatingError) +{ + pThis->iQueueSize = 0; + pThis->nLogDeq = 0; + qDestructDisk(pThis); /* free disk structures */ + + pThis->qType = QUEUETYPE_DIRECT; + pThis->qConstruct = qConstructDirect; + pThis->qDestruct = qDestructDirect; + pThis->qAdd = qAddDirect; + pThis->qDel = qDelDirect; + pThis->MultiEnq = qqueueMultiEnqObjDirect; + if(pThis->pqParent != NULL) { + DBGOPRINT((obj_t*) pThis, "DA queue is in emergency mode, disabling DA in parent\n"); + pThis->pqParent->bIsDA = 0; + pThis->pqParent->pqDA = NULL; + /* This may have undesired side effects, not sure if I really evaluated + * all. So you know where to look at if you come to this point during + * troubleshooting ;) -- rgerhards, 2011-05-03 + */ + } + + errmsg.LogError(0, initiatingError, "fatal error on disk queue '%s', emergency switch to direct mode", + obj.GetName((obj_t*) pThis)); + return RS_RET_ERR_QUEUE_EMERGENCY; +} + + static rsRetVal qqueueLoadPersStrmInfoFixup(strm_t *pStrm, qqueue_t __attribute__((unused)) *pThis) { @@ -794,10 +840,7 @@ finalize_it: static rsRetVal qDeqDisk(qqueue_t *pThis, void **ppUsr) { DEFiRet; - - CHKiRet(obj.Deserialize(ppUsr, (uchar*) "msg", pThis->tVars.disk.pReadDeq, NULL, NULL)); - -finalize_it: + iRet = obj.Deserialize(ppUsr, (uchar*) "msg", pThis->tVars.disk.pReadDeq, NULL, NULL); RETiRet; } @@ -1694,7 +1737,18 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti) ISOBJ_TYPE_assert(pThis, qqueue); ISOBJ_TYPE_assert(pWti, wti); - CHKiRet(DequeueForConsumer(pThis, pWti)); + iRet = DequeueForConsumer(pThis, pWti); + if(iRet == RS_RET_FILE_NOT_FOUND) { + /* This is a fatal condition and means the queue is almost unusable */ + d_pthread_mutex_unlock(pThis->mut); + DBGOPRINT((obj_t*) pThis, "got 'file not found' error %d, queue defunct\n", iRet); + iRet = queueSwitchToEmergencyMode(pThis, iRet); + // TODO: think about what to return as iRet -- keep RS_RET_FILE_NOT_FOUND? + d_pthread_mutex_lock(pThis->mut); + } + if (iRet != RS_RET_OK) { + FINALIZE; + } /* we now have a non-idle batch of work, so we can release the queue mutex and process it */ d_pthread_mutex_unlock(pThis->mut); diff --git a/runtime/rsyslog.h b/runtime/rsyslog.h index 55df4d11..adcd4656 100644 --- a/runtime/rsyslog.h +++ b/runtime/rsyslog.h @@ -343,8 +343,10 @@ enum rsRetVal_ /** return value. All methods return this if not specified oth RS_RET_FILE_NOT_SPECIFIED = -2180, /**< file name not configured where this was required */ RS_RET_ERR_WRKDIR = -2181, /**< problems with the rsyslog working directory */ RS_RET_WRN_WRKDIR = -2182, /**< correctable problems with the rsyslog working directory */ + RS_RET_ERR_QUEUE_EMERGENCY = -2183, /**< some fatal error caused queue to switch to emergency mode */ RS_RET_OUTDATED_STMT = -2184, /**< some outdated statement/functionality is being used in conf file */ RS_RET_MISSING_WHITESPACE = -2185, /**< whitespace is missing in some config construct */ + RS_RET_OK_WARN = -2186, /**< config part: everything was OK, but a warning message was emitted */ RS_RET_CONF_RQRD_PARAM_MISSING = -2208,/**< required parameter in config object is missing */ /* RainerScript error messages (range 1000.. 1999) */ diff --git a/runtime/strmsrv.c b/runtime/strmsrv.c index 0de18e7f..8310e832 100644 --- a/runtime/strmsrv.c +++ b/runtime/strmsrv.c @@ -765,7 +765,7 @@ static rsRetVal SetKeepAlive(strmsrv_t *pThis, int iVal) { DEFiRet; - dbgprintf("keep-alive set to %d\n", iVal); + dbgprintf("strmsrv: keep-alive set to %d\n", iVal); pThis->bUseKeepAlive = iVal; RETiRet; } diff --git a/runtime/wti.c b/runtime/wti.c index 69da2e9f..e44086af 100644 --- a/runtime/wti.c +++ b/runtime/wti.c @@ -312,7 +312,10 @@ wtiWorker(wti_t *pThis) */ localRet = pWtp->pfDoWork(pWtp->pUsr, pThis); - if(localRet == RS_RET_IDLE) { + if(localRet == RS_RET_ERR_QUEUE_EMERGENCY) { + d_pthread_mutex_unlock(pWtp->pmutUsr); + break; /* end of loop */ + } else if(localRet == RS_RET_IDLE) { if(terminateRet == RS_RET_TERMINATE_WHEN_IDLE || bInactivityTOOccured) { d_pthread_mutex_unlock(pWtp->pmutUsr); dbgoprint((obj_t*) pThis, "terminating worker terminateRet=%d, bInactivityTOOccured=%d\n", |