diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2010-04-12 09:10:19 +0200 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2010-04-12 09:10:19 +0200 |
commit | 25bc3b2e30deaee00fcf183e885378a0d64ae94c (patch) | |
tree | 9003917ec023600f4e2916a5907d5a35856b928e /runtime | |
parent | 62e00d7a1c1d0301d50e7a28cb84563d61410ecd (diff) | |
parent | 5ef852f4a3f030f61254a963b0d2dca290933e3c (diff) | |
download | rsyslog-25bc3b2e30deaee00fcf183e885378a0d64ae94c.tar.gz rsyslog-25bc3b2e30deaee00fcf183e885378a0d64ae94c.tar.xz rsyslog-25bc3b2e30deaee00fcf183e885378a0d64ae94c.zip |
Merge branch 'v4-stable-solaris' into v4-devel
Conflicts:
ChangeLog
configure.ac
doc/manual.html
tools/omfile.c
tools/syslogd.c
Diffstat (limited to 'runtime')
-rw-r--r-- | runtime/Makefile.am | 1 | ||||
-rw-r--r-- | runtime/atomic-posix-sem.c | 70 | ||||
-rw-r--r-- | runtime/atomic.h | 120 | ||||
-rw-r--r-- | runtime/cfsysline.c | 8 | ||||
-rw-r--r-- | runtime/conf.c | 8 | ||||
-rw-r--r-- | runtime/ctok.c | 37 | ||||
-rw-r--r-- | runtime/datetime.c | 25 | ||||
-rw-r--r-- | runtime/datetime.h | 6 | ||||
-rw-r--r-- | runtime/debug.c | 15 | ||||
-rw-r--r-- | runtime/msg.c | 130 | ||||
-rw-r--r-- | runtime/net.c | 4 | ||||
-rw-r--r-- | runtime/parser.c | 9 | ||||
-rw-r--r-- | runtime/rsyslog.c | 14 | ||||
-rw-r--r-- | runtime/rsyslog.h | 11 | ||||
-rw-r--r-- | runtime/rule.c | 1 | ||||
-rw-r--r-- | runtime/srutils.c | 32 | ||||
-rw-r--r-- | runtime/stream.c | 147 | ||||
-rw-r--r-- | runtime/stream.h | 1 | ||||
-rw-r--r-- | runtime/vm.c | 3 |
19 files changed, 498 insertions, 144 deletions
diff --git a/runtime/Makefile.am b/runtime/Makefile.am index c1a15198..ac006bca 100644 --- a/runtime/Makefile.am +++ b/runtime/Makefile.am @@ -9,6 +9,7 @@ librsyslog_la_SOURCES = \ rsyslog.h \ unicode-helper.h \ atomic.h \ + atomic-posix-sem.c \ syslogd-types.h \ module-template.h \ obj-types.h \ diff --git a/runtime/atomic-posix-sem.c b/runtime/atomic-posix-sem.c new file mode 100644 index 00000000..979fae02 --- /dev/null +++ b/runtime/atomic-posix-sem.c @@ -0,0 +1,70 @@ +/* atomic_posix_sem.c: This file supplies an emulation for atomic operations using + * POSIX semaphores. + * + * Copyright 2010 DResearch Digital Media Systems GmbH + * + * This file is part of the rsyslog runtime library. + * + * The rsyslog runtime library is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * The rsyslog runtime library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with the rsyslog runtime library. If not, see <http://www.gnu.org/licenses/>. + * + * A copy of the GPL can be found in the file "COPYING" in this distribution. + * A copy of the LGPL can be found in the file "COPYING.LESSER" in this distribution. + */ + +#include "config.h" +#ifndef HAVE_ATOMIC_BUILTINS +#ifdef HAVE_SEMAPHORE_H +#include <semaphore.h> +#include <errno.h> + +#include "atomic.h" +#include "rsyslog.h" +#include "srUtils.h" + +sem_t atomicSem; + +rsRetVal +atomicSemInit(void) +{ + DEFiRet; + + dbgprintf("init posix semaphore for atomics emulation\n"); + if(sem_init(&atomicSem, 0, 1) == -1) + { + char errStr[1024]; + rs_strerror_r(errno, errStr, sizeof(errStr)); + dbgprintf("init posix semaphore for atomics emulation failed: %s\n", errStr); + iRet = RS_RET_SYS_ERR; /* the right error code ??? */ + } + + RETiRet; +} + +void +atomicSemExit(void) +{ + dbgprintf("destroy posix semaphore for atomics emulation\n"); + if(sem_destroy(&atomicSem) == -1) + { + char errStr[1024]; + rs_strerror_r(errno, errStr, sizeof(errStr)); + dbgprintf("destroy posix semaphore for atomics emulation failed: %s\n", errStr); + } +} + +#endif /* HAVE_SEMAPHORE_H */ +#endif /* !defined(HAVE_ATOMIC_BUILTINS) */ + +/* vim:set ai: + */ diff --git a/runtime/atomic.h b/runtime/atomic.h index d5aaf56b..ea3be37a 100644 --- a/runtime/atomic.h +++ b/runtime/atomic.h @@ -53,6 +53,122 @@ # define ATOMIC_CAS(data, oldVal, newVal) __sync_bool_compare_and_swap(&(data), (oldVal), (newVal)); # define ATOMIC_CAS_VAL(data, oldVal, newVal) __sync_val_compare_and_swap(&(data), (oldVal), (newVal)); #else +#ifdef HAVE_SEMAPHORE_H + /* we use POSIX semaphores instead */ + +#include "rsyslog.h" +#include <semaphore.h> + +extern sem_t atomicSem; +rsRetVal atomicSemInit(void); +void atomicSemExit(void); + +#if HAVE_TYPEOF +#define my_typeof(x) typeof(x) +#else /* sorry, can't determine types, using 'int' */ +#define my_typeof(x) int +#endif + +# define ATOMIC_SUB(data, val) \ +({ \ + my_typeof(data) tmp; \ + sem_wait(&atomicSem); \ + tmp = data; \ + data -= val; \ + sem_post(&atomicSem); \ + tmp; \ +}) + +# define ATOMIC_ADD(data, val) \ +({ \ + my_typeof(data) tmp; \ + sem_wait(&atomicSem); \ + tmp = data; \ + data += val; \ + sem_post(&atomicSem); \ + tmp; \ +}) + +# define ATOMIC_INC_AND_FETCH(data) \ +({ \ + my_typeof(data) tmp; \ + sem_wait(&atomicSem); \ + tmp = data; \ + data += 1; \ + sem_post(&atomicSem); \ + tmp; \ +}) + +# define ATOMIC_INC(data) ((void) ATOMIC_INC_AND_FETCH(data)) + +# define ATOMIC_DEC_AND_FETCH(data) \ +({ \ + sem_wait(&atomicSem); \ + data -= 1; \ + sem_post(&atomicSem); \ + data; \ +}) + +# define ATOMIC_DEC(data) ((void) ATOMIC_DEC_AND_FETCH(data)) + +# define ATOMIC_FETCH_32BIT(data) ((unsigned) ATOMIC_ADD((data), 0xffffffff)) + +# define ATOMIC_STORE_1_TO_32BIT(data) \ +({ \ + my_typeof(data) tmp; \ + sem_wait(&atomicSem); \ + tmp = data; \ + data = 1; \ + sem_post(&atomicSem); \ + tmp; \ +}) + +# define ATOMIC_STORE_0_TO_INT(data) \ +({ \ + my_typeof(data) tmp; \ + sem_wait(&atomicSem); \ + tmp = data; \ + data = 0; \ + sem_post(&atomicSem); \ + tmp; \ +}) + +# define ATOMIC_STORE_1_TO_INT(data) \ +({ \ + my_typeof(data) tmp; \ + sem_wait(&atomicSem); \ + tmp = data; \ + data = 1; \ + sem_post(&atomicSem); \ + tmp; \ +}) + +# define ATOMIC_CAS(data, oldVal, newVal) \ +({ \ + int ret; \ + sem_wait(&atomicSem); \ + if(data != oldVal) ret = 0; \ + else \ + { \ + data = newVal; \ + ret = 1; \ + } \ + sem_post(&atomicSem); \ + ret; \ +}) + +# define ATOMIC_CAS_VAL(data, oldVal, newVal) \ +({ \ + sem_wait(&atomicSem); \ + if(data == oldVal) \ + { \ + data = newVal; \ + } \ + sem_post(&atomicSem); \ + data; \ +}) + +#else /* not HAVE_SEMAPHORE_H */ /* note that we gained parctical proof that theoretical problems DO occur * if we do not properly address them. See this blog post for details: * http://blog.gerhards.net/2009/01/rsyslog-data-race-analysis.html @@ -66,6 +182,10 @@ # define ATOMIC_DEC_AND_FETCH(data) (--(data)) # define ATOMIC_FETCH_32BIT(data) (data) # define ATOMIC_STORE_1_TO_32BIT(data) (data) = 1 +# define ATOMIC_STORE_1_TO_INT(data) (data) = 1 +# define ATOMIC_STORE_0_TO_INT(data) (data) = 0 +# define ATOMIC_CAS_VAL(data, oldVal, newVal) (data) = (newVal) +#endif #endif #endif /* #ifndef INCLUDED_ATOMIC_H */ diff --git a/runtime/cfsysline.c b/runtime/cfsysline.c index 184c0d87..5df8e64c 100644 --- a/runtime/cfsysline.c +++ b/runtime/cfsysline.c @@ -217,9 +217,11 @@ static rsRetVal doGetSize(uchar **pp, rsRetVal (*pSetHdlr)(void*, uid_t), void * case 'K': i *= 1000; ++(*pp); break; case 'M': i *= 1000000; ++(*pp); break; case 'G': i *= 1000000000; ++(*pp); break; - case 'T': i *= 1000000000000; ++(*pp); break; /* tera */ - case 'P': i *= 1000000000000000; ++(*pp); break; /* peta */ - case 'E': i *= 1000000000000000000; ++(*pp); break; /* exa */ + /* we need to use the multiplication below because otherwise + * the compiler gets an error during constant parsing */ + case 'T': i *= (int64) 1000 * 1000000000; ++(*pp); break; /* tera */ + case 'P': i *= (int64) 1000000 * 1000000000; ++(*pp); break; /* peta */ + case 'E': i *= (int64) 1000000000 * 1000000000; ++(*pp); break; /* exa */ } /* done */ diff --git a/runtime/conf.c b/runtime/conf.c index b92664a1..ef795237 100644 --- a/runtime/conf.c +++ b/runtime/conf.c @@ -513,7 +513,7 @@ finalize_it: rsRetVal cflineParseTemplateName(uchar** pp, omodStringRequest_t *pOMSR, int iEntry, int iTplOpts, uchar *dfltTplName) { uchar *p; - uchar *tplName; + uchar *tplName = NULL; cstr_t *pStrB; DEFiRet; @@ -550,10 +550,12 @@ rsRetVal cflineParseTemplateName(uchar** pp, omodStringRequest_t *pOMSR, int iEn CHKiRet(cstrConvSzStrAndDestruct(pStrB, &tplName, 0)); } - iRet = OMSRsetEntry(pOMSR, iEntry, tplName, iTplOpts); - if(iRet != RS_RET_OK) goto finalize_it; + CHKiRet(OMSRsetEntry(pOMSR, iEntry, tplName, iTplOpts)); finalize_it: + if(iRet != RS_RET_OK) + free(tplName); + *pp = p; RETiRet; diff --git a/runtime/ctok.c b/runtime/ctok.c index 6f5f0273..18ddaed2 100644 --- a/runtime/ctok.c +++ b/runtime/ctok.c @@ -87,11 +87,12 @@ ctokUngetCharFromStream(ctok_t *pThis, uchar __attribute__((unused)) c) } -/* get the next character from the input "stream" (currently just a in-memory - * string...) -- rgerhards, 2008-02-19 +/* get the next character from the input "stream". Note that this version + * does NOT look for comment characters as end-of-stream, so it is suitable + * when building constant strings! -- rgerhards, 2010-03-01 */ -static rsRetVal -ctokGetCharFromStream(ctok_t *pThis, uchar *pc) +static inline rsRetVal +ctokGetCharFromStreamNoComment(ctok_t *pThis, uchar *pc) { DEFiRet; @@ -99,7 +100,7 @@ ctokGetCharFromStream(ctok_t *pThis, uchar *pc) ASSERT(pc != NULL); /* end of string or begin of comment terminates the "stream" */ - if(*pThis->pp == '\0' || *pThis->pp == '#') { + if(*pThis->pp == '\0') { ABORT_FINALIZE(RS_RET_EOS); } else { *pc = *pThis->pp; @@ -111,6 +112,28 @@ finalize_it: } +/* get the next character from the input "stream" (currently just a in-memory + * string...) -- rgerhards, 2008-02-19 + */ +static rsRetVal +ctokGetCharFromStream(ctok_t *pThis, uchar *pc) +{ + DEFiRet; + + ISOBJ_TYPE_assert(pThis, ctok); + ASSERT(pc != NULL); + + CHKiRet(ctokGetCharFromStreamNoComment(pThis, pc)); + /* begin of comment terminates the "stream"! */ + if(*pc == '#') { + ABORT_FINALIZE(RS_RET_EOS); + } + +finalize_it: + RETiRet; +} + + /* skip whitespace in the input "stream". * rgerhards, 2008-02-19 */ @@ -302,7 +325,7 @@ ctokGetSimpStr(ctok_t *pThis, ctok_token_t *pToken) pToken->tok = ctok_SIMPSTR; CHKiRet(cstrConstruct(&pstrVal)); - CHKiRet(ctokGetCharFromStream(pThis, &c)); + CHKiRet(ctokGetCharFromStreamNoComment(pThis, &c)); /* while we are in escape mode (had a backslash), no sequence * terminates the loop. If outside, it is terminated by a single quote. */ @@ -317,7 +340,7 @@ ctokGetSimpStr(ctok_t *pThis, ctok_token_t *pToken) CHKiRet(cstrAppendChar(pstrVal, c)); } } - CHKiRet(ctokGetCharFromStream(pThis, &c)); + CHKiRet(ctokGetCharFromStreamNoComment(pThis, &c)); } CHKiRet(cstrFinalize(pstrVal)); diff --git a/runtime/datetime.c b/runtime/datetime.c index 6160bd7c..eff72f91 100644 --- a/runtime/datetime.c +++ b/runtime/datetime.c @@ -291,11 +291,11 @@ ParseTIMESTAMP3339(struct syslogTime *pTime, uchar** ppszTS, int *pLenStr) } /* OK, we actually have a 3339 timestamp, so let's indicated this */ - if(lenStr > 0 && *pszTS == ' ') { + if(lenStr > 0) { + if(*pszTS != ' ') /* if it is not a space, it can not be a "good" time - 2010-02-22 rgerhards */ + ABORT_FINALIZE(RS_RET_INVLD_TIME); + ++pszTS; /* just skip past it */ --lenStr; - ++pszTS; - } else { - ABORT_FINALIZE(RS_RET_INVLD_TIME); } /* we had success, so update parse pointer and caller-provided timestamp */ @@ -510,6 +510,7 @@ ParseTIMESTAMP3164(struct syslogTime *pTime, uchar** ppszTS, int *pLenStr) if(lenStr == 0 || *pszTS++ != ' ') ABORT_FINALIZE(RS_RET_INVLD_TIME); + --lenStr; /* we accept a slightly malformed timestamp when receiving. This is * we accept one-digit days @@ -565,7 +566,13 @@ ParseTIMESTAMP3164(struct syslogTime *pTime, uchar** ppszTS, int *pLenStr) * invalid format, it occurs frequently enough (e.g. with Cisco devices) * to permit it as a valid case. -- rgerhards, 2008-09-12 */ - if(lenStr == 0 || *pszTS++ == ':') { + if(lenStr > 0 && *pszTS == ':') { + ++pszTS; /* just skip past it */ + --lenStr; + } + if(lenStr > 0) { + if(*pszTS != ' ') /* if it is not a space, it can not be a "good" time - 2010-02-22 rgerhards */ + ABORT_FINALIZE(RS_RET_INVLD_TIME); ++pszTS; /* just skip past it */ --lenStr; } @@ -786,8 +793,12 @@ int formatTimestamp3339(struct syslogTime *ts, char* pBuf) * buffer that will receive the resulting string. The function * returns the size of the timestamp written in bytes (without * the string termnator). If 0 is returend, an error occured. + * rgerhards, 2010-03-05: Added support to for buggy 3164 dates, + * where a zero-digit is written instead of a space for the first + * day character if day < 10. syslog-ng seems to do that, and some + * parsing scripts (in migration cases) rely on that. */ -int formatTimestamp3164(struct syslogTime *ts, char* pBuf) +int formatTimestamp3164(struct syslogTime *ts, char* pBuf, int bBuggyDay) { static char* monthNames[12] = { "Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" }; @@ -800,7 +811,7 @@ int formatTimestamp3164(struct syslogTime *ts, char* pBuf) pBuf[2] = monthNames[(ts->month - 1) % 12][2]; pBuf[3] = ' '; iDay = (ts->day / 10) % 10; /* we need to write a space if the first digit is 0 */ - pBuf[4] = iDay ? iDay + '0' : ' '; + pBuf[4] = (bBuggyDay || iDay > 0) ? iDay + '0' : ' '; pBuf[5] = ts->day % 10 + '0'; pBuf[6] = ' '; pBuf[7] = (ts->hour / 10) % 10 + '0'; diff --git a/runtime/datetime.h b/runtime/datetime.h index 8140eb71..9dcce3c5 100644 --- a/runtime/datetime.h +++ b/runtime/datetime.h @@ -39,15 +39,17 @@ BEGINinterface(datetime) /* name must also be changed in ENDinterface macro! */ int (*formatTimestampToMySQL)(struct syslogTime *ts, char* pDst); int (*formatTimestampToPgSQL)(struct syslogTime *ts, char *pDst); int (*formatTimestamp3339)(struct syslogTime *ts, char* pBuf); - int (*formatTimestamp3164)(struct syslogTime *ts, char* pBuf); + int (*formatTimestamp3164)(struct syslogTime *ts, char* pBuf, int); int (*formatTimestampSecFrac)(struct syslogTime *ts, char* pBuf); ENDinterface(datetime) -#define datetimeCURR_IF_VERSION 2 /* increment whenever you change the interface structure! */ +#define datetimeCURR_IF_VERSION 4 /* increment whenever you change the interface structure! */ /* interface changes: * 1 - initial version * 2 - not compatible to 1 - bugfix required ParseTIMESTAMP3164 to accept char ** as * last parameter. Did not try to remain compatible as this is not something any * third-party module should call. -- rgerhards, 2008.-09-12 + * 3 - taken by v5 branch! + * 4 - formatTimestamp3164 takes a third int parameter */ /* prototypes */ diff --git a/runtime/debug.c b/runtime/debug.c index c23dec3b..c82a411d 100644 --- a/runtime/debug.c +++ b/runtime/debug.c @@ -1,4 +1,3 @@ -#include <sys/syscall.h> /* debug.c * * This file proides debug and run time error analysis support. Some of the @@ -550,6 +549,7 @@ if(pLog == NULL) { return; /* if we don't know it yet, we can not clean up... */ } #endif +#include <sys/syscall.h> /* we found the last lock entry. We now need to see from which FuncDB we need to * remove it. This is recorded inside the mutex log entry. @@ -922,7 +922,7 @@ void dbgoprint(obj_t *pObj, char *fmt, ...) { va_list ap; - char pszWriteBuf[1024]; + char pszWriteBuf[32*1024]; size_t lenWriteBuf; if(!(Debug && debugging_on)) @@ -952,7 +952,7 @@ void dbgprintf(char *fmt, ...) { va_list ap; - char pszWriteBuf[1024]; + char pszWriteBuf[20480]; size_t lenWriteBuf; if(!(Debug && debugging_on)) @@ -961,6 +961,15 @@ dbgprintf(char *fmt, ...) va_start(ap, fmt); lenWriteBuf = vsnprintf(pszWriteBuf, sizeof(pszWriteBuf), fmt, ap); va_end(ap); + if(lenWriteBuf >= sizeof(pszWriteBuf)) { + /* prevent buffer overrruns and garbagge display */ + pszWriteBuf[sizeof(pszWriteBuf) - 5] = '.'; + pszWriteBuf[sizeof(pszWriteBuf) - 4] = '.'; + pszWriteBuf[sizeof(pszWriteBuf) - 3] = '.'; + pszWriteBuf[sizeof(pszWriteBuf) - 2] = '\n'; + pszWriteBuf[sizeof(pszWriteBuf) - 1] = '\0'; + lenWriteBuf = sizeof(pszWriteBuf); + } dbgprint(NULL, pszWriteBuf, lenWriteBuf); } diff --git a/runtime/msg.c b/runtime/msg.c index b45775b6..91057f97 100644 --- a/runtime/msg.c +++ b/runtime/msg.c @@ -1171,7 +1171,7 @@ uchar *getMSG(msg_t *pM) if(pM == NULL) ret = UCHAR_CONSTANT(""); else { - if(pM->offMSG == -1) + if(pM->iLenMSG == 0) ret = UCHAR_CONSTANT(""); else ret = pM->pszRawMsg + pM->offMSG; @@ -1213,10 +1213,12 @@ static inline char *getTimeReported(msg_t *pM, enum tplFormatTypes eFmt) switch(eFmt) { case tplFmtDefault: case tplFmtRFC3164Date: + case tplFmtRFC3164BuggyDate: MsgLock(pM); if(pM->pszTIMESTAMP3164 == NULL) { pM->pszTIMESTAMP3164 = pM->pszTimestamp3164; - datetime.formatTimestamp3164(&pM->tTIMESTAMP, pM->pszTIMESTAMP3164); + datetime.formatTimestamp3164(&pM->tTIMESTAMP, pM->pszTIMESTAMP3164, + (eFmt == tplFmtRFC3164BuggyDate)); } MsgUnlock(pM); return(pM->pszTIMESTAMP3164); @@ -1279,7 +1281,7 @@ static inline char *getTimeGenerated(msg_t *pM, enum tplFormatTypes eFmt) MsgUnlock(pM); return ""; } - datetime.formatTimestamp3164(&pM->tRcvdAt, pM->pszRcvdAt3164); + datetime.formatTimestamp3164(&pM->tRcvdAt, pM->pszRcvdAt3164, 0); } MsgUnlock(pM); return(pM->pszRcvdAt3164); @@ -1306,13 +1308,15 @@ static inline char *getTimeGenerated(msg_t *pM, enum tplFormatTypes eFmt) MsgUnlock(pM); return(pM->pszRcvdAt_PgSQL); case tplFmtRFC3164Date: + case tplFmtRFC3164BuggyDate: MsgLock(pM); if(pM->pszRcvdAt3164 == NULL) { if((pM->pszRcvdAt3164 = malloc(16)) == NULL) { MsgUnlock(pM); return ""; } - datetime.formatTimestamp3164(&pM->tRcvdAt, pM->pszRcvdAt3164); + datetime.formatTimestamp3164(&pM->tRcvdAt, pM->pszRcvdAt3164, + (eFmt == tplFmtRFC3164BuggyDate)); } MsgUnlock(pM); return(pM->pszRcvdAt3164); @@ -1947,12 +1951,20 @@ void MsgSetHOSTNAME(msg_t *pThis, uchar* pszHOSTNAME, int lenHOSTNAME) /* set the offset of the MSG part into the raw msg buffer + * Note that the offset may be higher than the length of the raw message + * (exactly by one). This can happen if we have a message that does not + * contain any MSG part. */ void MsgSetMSGoffs(msg_t *pMsg, short offs) { ISOBJ_TYPE_assert(pMsg, msg); - pMsg->iLenMSG = pMsg->iLenRawMsg - offs; pMsg->offMSG = offs; + if(offs > pMsg->iLenRawMsg) { + assert(offs - 1 == pMsg->iLenRawMsg); + pMsg->iLenMSG = 0; + } else { + pMsg->iLenMSG = pMsg->iLenRawMsg - offs; + } } @@ -1986,7 +1998,8 @@ rsRetVal MsgReplaceMSG(msg_t *pThis, uchar* pszMSG, int lenMSG) pThis->pszRawMsg = bufNew; } - memcpy(pThis->pszRawMsg + pThis->offMSG, pszMSG, lenMSG); + if(lenMSG > 0) + memcpy(pThis->pszRawMsg + pThis->offMSG, pszMSG, lenMSG); pThis->pszRawMsg[lenNew] = '\0'; /* this also works with truncation! */ pThis->iLenRawMsg = lenNew; pThis->iLenMSG = lenMSG; @@ -1997,6 +2010,8 @@ finalize_it: /* set raw message in message object. Size of message is provided. + * The function makes sure that the stored rawmsg is properly + * terminated by '\0'. * rgerhards, 2009-06-16 */ void MsgSetRawMsg(msg_t *pThis, char* pszRawMsg, size_t lenMsg) @@ -2139,6 +2154,10 @@ static uchar *getNOW(eNOWType eNow) * be used in selector line processing. * rgerhards 2005-09-15 */ +/* a quick helper to save some writing: */ +#define RET_OUT_OF_MEMORY { *pbMustBeFreed = 0;\ + *pPropLen = sizeof("**OUT OF MEMORY**") - 1; \ + return(UCHAR_CONSTANT("**OUT OF MEMORY**"));} uchar *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe, propid_t propID, size_t *pPropLen, unsigned short *pbMustBeFreed) @@ -2200,8 +2219,7 @@ uchar *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe, case PROP_PRI_TEXT: pBuf = malloc(20 * sizeof(uchar)); if(pBuf == NULL) { - *pbMustBeFreed = 0; - return UCHAR_CONSTANT("**OUT OF MEMORY**"); + RET_OUT_OF_MEMORY; } else { *pbMustBeFreed = 1; pRes = (uchar*)textpri((char*)pBuf, 20, getPRIi(pMsg)); @@ -2245,49 +2263,49 @@ uchar *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe, break; case PROP_SYS_NOW: if((pRes = getNOW(NOW_NOW)) == NULL) { - return UCHAR_CONSTANT("**OUT OF MEMORY**"); + RET_OUT_OF_MEMORY; } else *pbMustBeFreed = 1; /* all of these functions allocate dyn. memory */ break; case PROP_SYS_YEAR: if((pRes = getNOW(NOW_YEAR)) == NULL) { - return UCHAR_CONSTANT("**OUT OF MEMORY**"); + RET_OUT_OF_MEMORY; } else *pbMustBeFreed = 1; /* all of these functions allocate dyn. memory */ break; case PROP_SYS_MONTH: if((pRes = getNOW(NOW_MONTH)) == NULL) { - return UCHAR_CONSTANT("**OUT OF MEMORY**"); + RET_OUT_OF_MEMORY; } else *pbMustBeFreed = 1; /* all of these functions allocate dyn. memory */ break; case PROP_SYS_DAY: if((pRes = getNOW(NOW_DAY)) == NULL) { - return UCHAR_CONSTANT("**OUT OF MEMORY**"); + RET_OUT_OF_MEMORY; } else *pbMustBeFreed = 1; /* all of these functions allocate dyn. memory */ break; case PROP_SYS_HOUR: if((pRes = getNOW(NOW_HOUR)) == NULL) { - return UCHAR_CONSTANT("**OUT OF MEMORY**"); + RET_OUT_OF_MEMORY; } else *pbMustBeFreed = 1; /* all of these functions allocate dyn. memory */ break; case PROP_SYS_HHOUR: if((pRes = getNOW(NOW_HHOUR)) == NULL) { - return UCHAR_CONSTANT("**OUT OF MEMORY**"); + RET_OUT_OF_MEMORY; } else *pbMustBeFreed = 1; /* all of these functions allocate dyn. memory */ break; case PROP_SYS_QHOUR: if((pRes = getNOW(NOW_QHOUR)) == NULL) { - return UCHAR_CONSTANT("**OUT OF MEMORY**"); + RET_OUT_OF_MEMORY; } else *pbMustBeFreed = 1; /* all of these functions allocate dyn. memory */ break; case PROP_SYS_MINUTE: if((pRes = getNOW(NOW_MINUTE)) == NULL) { - return UCHAR_CONSTANT("**OUT OF MEMORY**"); + RET_OUT_OF_MEMORY; } else *pbMustBeFreed = 1; /* all of these functions allocate dyn. memory */ break; @@ -2299,10 +2317,11 @@ uchar *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe, * error message unreadable. rgerhards, 2007-07-10 */ dbgprintf("invalid property id: '%d'\n", propID); + *pbMustBeFreed = 0; + *pPropLen = sizeof("**INVALID PROPERTY NAME**") - 1; return UCHAR_CONSTANT("**INVALID PROPERTY NAME**"); } - /* If we did not receive a template pointer, we are already done... */ if(pTpe == NULL) { return pRes; @@ -2357,8 +2376,7 @@ uchar *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe, if(pBuf == NULL) { if(*pbMustBeFreed == 1) free(pRes); - *pbMustBeFreed = 0; - return UCHAR_CONSTANT("**OUT OF MEMORY**"); + RET_OUT_OF_MEMORY; } /* now copy */ memcpy(pBuf, pFld, iLen); @@ -2375,6 +2393,7 @@ uchar *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe, if(*pbMustBeFreed == 1) free(pRes); *pbMustBeFreed = 0; + *pPropLen = sizeof("**FIELD NOT FOUND**") - 1; return UCHAR_CONSTANT("**FIELD NOT FOUND**"); } } else if(pTpe->data.field.iFromPos != 0 || pTpe->data.field.iToPos != 0) { @@ -2403,8 +2422,7 @@ uchar *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe, if(pBuf == NULL) { if(*pbMustBeFreed == 1) free(pRes); - *pbMustBeFreed = 0; - return UCHAR_CONSTANT("**OUT OF MEMORY**"); + RET_OUT_OF_MEMORY; } pSb = pRes; if(iFrom) { @@ -2434,9 +2452,15 @@ uchar *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe, } else { /* Check for regular expressions */ if (pTpe->data.field.has_regex != 0) { - if (pTpe->data.field.has_regex == 2) + if (pTpe->data.field.has_regex == 2) { /* Could not compile regex before! */ + if (*pbMustBeFreed == 1) { + free(pRes); + *pbMustBeFreed = 0; + } + *pPropLen = sizeof("**NO MATCH** **BAD REGULAR EXPRESSION**") - 1; return UCHAR_CONSTANT("**NO MATCH** **BAD REGULAR EXPRESSION**"); + } dbgprintf("string to match for regex is: %s\n", pRes); @@ -2476,12 +2500,16 @@ uchar *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe, free(pRes); *pbMustBeFreed = 0; } - if(pTpe->data.field.nomatchAction == TPL_REGEX_NOMATCH_USE_DFLTSTR) - return UCHAR_CONSTANT("**NO MATCH**"); - else if(pTpe->data.field.nomatchAction == TPL_REGEX_NOMATCH_USE_ZERO) - return UCHAR_CONSTANT("0"); - else - return UCHAR_CONSTANT(""); + if(pTpe->data.field.nomatchAction == TPL_REGEX_NOMATCH_USE_DFLTSTR) { + bufLen = sizeof("**NO MATCH**") - 1; + pRes = UCHAR_CONSTANT("**NO MATCH**"); + } else if(pTpe->data.field.nomatchAction == TPL_REGEX_NOMATCH_USE_ZERO) { + bufLen = 1; + pRes = UCHAR_CONSTANT("0"); + } else { + bufLen = 0; + pRes = UCHAR_CONSTANT(""); + } } } else { /* Match- but did it match the one we wanted? */ @@ -2492,10 +2520,16 @@ uchar *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe, free(pRes); *pbMustBeFreed = 0; } - if(pTpe->data.field.nomatchAction == TPL_REGEX_NOMATCH_USE_DFLTSTR) - return UCHAR_CONSTANT("**NO MATCH**"); - else - return UCHAR_CONSTANT(""); + if(pTpe->data.field.nomatchAction == TPL_REGEX_NOMATCH_USE_DFLTSTR) { + bufLen = sizeof("**NO MATCH**") - 1; + pRes = UCHAR_CONSTANT("**NO MATCH**"); + } else if(pTpe->data.field.nomatchAction == TPL_REGEX_NOMATCH_USE_ZERO) { + bufLen = 1; + pRes = UCHAR_CONSTANT("0"); + } else { + bufLen = 0; + pRes = UCHAR_CONSTANT(""); + } } } /* OK, we have a usable match - we now need to malloc pB */ @@ -2509,13 +2543,12 @@ uchar *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe, if (pB == NULL) { if (*pbMustBeFreed == 1) free(pRes); - *pbMustBeFreed = 0; - return UCHAR_CONSTANT("**OUT OF MEMORY**"); + RET_OUT_OF_MEMORY; } /* Lets copy the matched substring to the buffer */ memcpy(pB, pRes + iOffs + pmatch[pTpe->data.field.iSubMatchToUse].rm_so, iLenBuf); - bufLen = iLenBuf - 1; + bufLen = iLenBuf; pB[iLenBuf] = '\0';/* terminate string, did not happen before */ if (*pbMustBeFreed == 1) @@ -2533,6 +2566,7 @@ uchar *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe, free(pRes); *pbMustBeFreed = 0; } + *pPropLen = sizeof("***REGEXP NOT AVAILABLE***") - 1; return UCHAR_CONSTANT("***REGEXP NOT AVAILABLE***"); } } @@ -2565,8 +2599,7 @@ uchar *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe, if(pB == NULL) { if(*pbMustBeFreed == 1) free(pRes); - *pbMustBeFreed = 0; - return UCHAR_CONSTANT("**OUT OF MEMORY**"); + RET_OUT_OF_MEMORY; } pSrc = pRes; while(*pSrc) { @@ -2612,8 +2645,7 @@ uchar *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe, if(pDst == NULL) { if(*pbMustBeFreed == 1) free(pRes); - *pbMustBeFreed = 0; - return UCHAR_CONSTANT("**OUT OF MEMORY**"); + RET_OUT_OF_MEMORY; } for(pSrc = pRes; *pSrc; pSrc++) { if(!iscntrl((int) *pSrc)) @@ -2648,8 +2680,7 @@ uchar *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe, if(pDst == NULL) { if(*pbMustBeFreed == 1) free(pRes); - *pbMustBeFreed = 0; - return UCHAR_CONSTANT("**OUT OF MEMORY**"); + RET_OUT_OF_MEMORY; } for(pSrc = pRes; *pSrc; pSrc++) { if(iscntrl((int) *pSrc)) @@ -2688,8 +2719,7 @@ uchar *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe, if(pB == NULL) { if(*pbMustBeFreed == 1) free(pRes); - *pbMustBeFreed = 0; - return UCHAR_CONSTANT("**OUT OF MEMORY**"); + RET_OUT_OF_MEMORY; } while(*pRes) { if(iscntrl((int) *pRes)) { @@ -2734,8 +2764,7 @@ uchar *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe, if(pDst == NULL) { if(*pbMustBeFreed == 1) free(pRes); - *pbMustBeFreed = 0; - return UCHAR_CONSTANT("**OUT OF MEMORY**"); + RET_OUT_OF_MEMORY; } for(pSrc = pRes; *pSrc; pSrc++) { if(*pSrc != '/') @@ -2770,8 +2799,7 @@ uchar *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe, if(pDst == NULL) { if(*pbMustBeFreed == 1) free(pRes); - *pbMustBeFreed = 0; - return UCHAR_CONSTANT("**OUT OF MEMORY**"); + RET_OUT_OF_MEMORY; } for(pSrc = pRes; *pSrc; pSrc++) { if(*pSrc == '/') @@ -2825,8 +2853,7 @@ uchar *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe, /* ok, original copy, need a private one */ pB = malloc((iLn + 1) * sizeof(uchar)); if(pB == NULL) { - *pbMustBeFreed = 0; - return UCHAR_CONSTANT("**OUT OF MEMORY**"); + RET_OUT_OF_MEMORY; } memcpy(pB, pRes, iLn - 1); pRes = pB; @@ -2845,6 +2872,7 @@ uchar *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe, if(pTpe->data.field.options.bCSV) { /* we need to obtain a private copy, as we need to at least add the double quotes */ int iBufLen; + int i; uchar *pBStart; uchar *pDst; uchar *pSrc; @@ -2856,10 +2884,10 @@ uchar *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe, if(pDst == NULL) { if(*pbMustBeFreed == 1) free(pRes); - *pbMustBeFreed = 0; - return UCHAR_CONSTANT("**OUT OF MEMORY**"); + RET_OUT_OF_MEMORY; } pSrc = pRes; + i = 0; *pDst++ = '"'; /* starting quote */ while(*pSrc) { if(*pSrc == '"') diff --git a/runtime/net.c b/runtime/net.c index e91c8a7f..fe6eef5b 100644 --- a/runtime/net.c +++ b/runtime/net.c @@ -1010,8 +1010,8 @@ static int should_use_so_bsdcompat(void) { #ifndef OS_BSD - static int init_done; - static int so_bsdcompat_is_obsolete; + static int init_done = 0; + static int so_bsdcompat_is_obsolete = 0; if (!init_done) { struct utsname myutsname; diff --git a/runtime/parser.c b/runtime/parser.c index 466066e7..36e88ebd 100644 --- a/runtime/parser.c +++ b/runtime/parser.c @@ -176,7 +176,10 @@ sanitizeMessage(msg_t *pMsg) pszMsg = pMsg->pszRawMsg; lenMsg = pMsg->iLenRawMsg; - /* remove NUL character at end of message (see comment in function header) */ + /* remove NUL character at end of message (see comment in function header) + * Note that we do not need to add a NUL character in this case, because it + * is already present ;) + */ if(pszMsg[lenMsg-1] == '\0') { DBGPRINTF("dropped NUL at very end of message\n"); bUpdatedLen = TRUE; @@ -190,8 +193,9 @@ sanitizeMessage(msg_t *pMsg) */ if(bDropTrailingLF && pszMsg[lenMsg-1] == '\n') { DBGPRINTF("dropped LF at very end of message (DropTrailingLF is set)\n"); - bUpdatedLen = TRUE; lenMsg--; + pszMsg[lenMsg] = '\0'; + bUpdatedLen = TRUE; } /* it is much quicker to sweep over the message and see if it actually @@ -245,6 +249,7 @@ sanitizeMessage(msg_t *pMsg) } ++iSrc; } + pDst[iDst] = '\0'; MsgSetRawMsg(pMsg, (char*)pDst, iDst); /* save sanitized string */ diff --git a/runtime/rsyslog.c b/runtime/rsyslog.c index 443d0f41..5750ca76 100644 --- a/runtime/rsyslog.c +++ b/runtime/rsyslog.c @@ -80,6 +80,7 @@ #include "prop.h" #include "rule.h" #include "ruleset.h" +#include "atomic.h" /* forward definitions */ static rsRetVal dfltErrLogger(int, uchar *errMsg); @@ -139,6 +140,12 @@ rsrtInit(char **ppErrObj, obj_if_t *pObjIF) CHKiRet(objClassInit(NULL)); /* *THIS* *MUST* always be the first class initilizer being called! */ CHKiRet(objGetObjInterface(pObjIF)); /* this provides the root pointer for all other queries */ +#ifndef HAVE_ATOMIC_BUILTINS +#ifdef HAVE_SEMAPHORE_H + CHKiRet(atomicSemInit()); +#endif /* HAVE_SEMAPHORE_H */ +#endif /* !defined(HAVE_ATOMIC_BUILTINS) */ + /* initialize core classes. We must be very careful with the order of events. Some * classes use others and if we do not initialize them in the right order, we may end * up with an invalid call. The most important thing that can happen is that an error @@ -215,6 +222,13 @@ rsrtExit(void) glblClassExit(); rulesetClassExit(); ruleClassExit(); + +#ifndef HAVE_ATOMIC_BUILTINS +#ifdef HAVE_SEMAPHORE_H + atomicSemExit(); +#endif /* HAVE_SEMAPHORE_H */ +#endif /* !defined(HAVE_ATOMIC_BUILTINS) */ + objClassExit(); /* *THIS* *MUST/SHOULD?* always be the first class initilizer being called (except debug)! */ } diff --git a/runtime/rsyslog.h b/runtime/rsyslog.h index 27bea6bc..8979893a 100644 --- a/runtime/rsyslog.h +++ b/runtime/rsyslog.h @@ -35,7 +35,7 @@ * value to the fixed size of the message object. */ #define CONF_TAG_MAXSIZE 512 /* a value that is deemed far too large for any valid TAG */ -#define CONF_TAG_HOSTNAME 512 /* a value that is deemed far too large for any valid HOSTNAME */ +#define CONF_HOSTNAME_MAXSIZE 512 /* a value that is deemed far too large for any valid HOSTNAME */ #define CONF_RAWMSG_BUFSIZE 101 #define CONF_TAG_BUFSIZE 32 #define CONF_HOSTNAME_BUFSIZE 32 @@ -46,15 +46,6 @@ * # End Config Settings # * * ############################################################# */ -#ifndef NOLARGEFILE -# undef _LARGEFILE_SOURCE -# undef _LARGEFILE64_SOURCE -# undef _FILE_OFFSET_BITS -# define _LARGEFILE_SOURCE -# define _LARGEFILE64_SOURCE -# define _FILE_OFFSET_BITS 64 -#endif - /* portability: not all platforms have these defines, so we * define them here if they are missing. -- rgerhards, 2008-03-04 */ diff --git a/runtime/rule.c b/runtime/rule.c index 182d616a..4c2c9edb 100644 --- a/runtime/rule.c +++ b/runtime/rule.c @@ -164,6 +164,7 @@ shouldProcessThisMessage(rule_t *pRule, msg_t *pMsg, int *bProcessMsg) if(pRule->f_filter_type == FILTER_PRI) { /* skip messages that are incorrect priority */ +dbgprintf("testing filter, f_pmask %d\n", pRule->f_filterData.f_pmask[pMsg->iFacility]); if ( (pRule->f_filterData.f_pmask[pMsg->iFacility] == TABLE_NOPRI) || \ ((pRule->f_filterData.f_pmask[pMsg->iFacility] & (1<<pMsg->iSeverity)) == 0) ) bRet = 0; diff --git a/runtime/srutils.c b/runtime/srutils.c index c403b312..1452c9b7 100644 --- a/runtime/srutils.c +++ b/runtime/srutils.c @@ -166,10 +166,22 @@ uchar *srUtilStrDup(uchar *pOld, size_t len) /* creates a path recursively - * Return 0 on success, -1 otherwise. On failure, errno - * hold the last OS error. - * Param "mode" holds the mode that all non-existing directories - * are to be created with. + * Return 0 on success, -1 otherwise. On failure, errno * hold the last OS error. + * Param "mode" holds the mode that all non-existing directories are to be + * created with. + * Note that we have a potential race inside that code, a race that even exists + * outside of the rsyslog process (if multiple instances run, or other programs + * generate directories): If the directory does not exist, a context switch happens, + * at that moment another process creates it, then our creation on the context + * switch back fails. This actually happened in practice, and depending on the + * configuration it is even likely to happen. We can not solve this situation + * with a mutex, as that works only within out process space. So the solution + * is that we take the optimistic approach, try the creation, and if it fails + * with "already exists" we go back and do one retry of the check/create + * sequence. That should then succeed. If the directory is still not found but + * the creation fails in the similar way, we return an error on that second + * try because otherwise we would potentially run into an endless loop. + * loop. -- rgerhards, 2010-03-25 */ int makeFileParentDirs(uchar *szFile, size_t lenFile, mode_t mode, uid_t uid, gid_t gid, int bFailOnChownFail) @@ -177,6 +189,8 @@ int makeFileParentDirs(uchar *szFile, size_t lenFile, mode_t mode, uchar *p; uchar *pszWork; size_t len; + int err; + int iTry = 0; int bErr = 0; assert(szFile != NULL); @@ -190,8 +204,9 @@ int makeFileParentDirs(uchar *szFile, size_t lenFile, mode_t mode, if(*p == '/') { /* temporarily terminate string, create dir and go on */ *p = '\0'; +again: if(access((char*)pszWork, F_OK)) { - if(mkdir((char*)pszWork, mode) == 0) { + if((err = mkdir((char*)pszWork, mode)) == 0) { if(uid != (uid_t) -1 || gid != (gid_t) -1) { /* we need to set owner/group */ if(chown((char*)pszWork, uid, gid) != 0) @@ -201,8 +216,13 @@ int makeFileParentDirs(uchar *szFile, size_t lenFile, mode_t mode, * to do so. */ } - } else + } else { + if(err == EEXIST && iTry == 0) { + iTry = 1; + goto again; + } bErr = 1; + } if(bErr) { int eSave = errno; free(pszWork); diff --git a/runtime/stream.c b/runtime/stream.c index 36f44003..e8805a40 100644 --- a/runtime/stream.c +++ b/runtime/stream.c @@ -67,7 +67,7 @@ DEFobjStaticHelpers DEFobjCurrIf(zlibw) /* forward definitions */ -static rsRetVal strmFlush(strm_t *pThis); +static rsRetVal strmFlushInternal(strm_t *pThis); static rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf); static rsRetVal strmCloseFile(strm_t *pThis); static void *asyncWriterThread(void *pPtr); @@ -163,7 +163,7 @@ doSizeLimitProcessing(strm_t *pThis) ASSERT(pThis->fd != -1); if(pThis->iCurrOffs >= pThis->iSizeLimit) { - /* strmClosefile() destroys the current file name, so we + /* strmCloseFile() destroys the current file name, so we * need to preserve it. */ CHKmalloc(pszCurrFName = ustrdup(pThis->pszCurrFName)); @@ -220,7 +220,7 @@ doPhysOpen(strm_t *pThis) char errStr[1024]; int err = errno; rs_strerror_r(err, errStr, sizeof(errStr)); - dbgoprint((obj_t*) pThis, "open error %d, file '%s': %s\n", errno, pThis->pszCurrFName, errStr); + DBGOPRINT((obj_t*) pThis, "open error %d, file '%s': %s\n", errno, pThis->pszCurrFName, errStr); if(err == ENOENT) ABORT_FINALIZE(RS_RET_FILE_NOT_FOUND); else @@ -278,7 +278,7 @@ static rsRetVal strmOpenFile(strm_t *pThis) pThis->iCurrOffs = offset; } - dbgoprint((obj_t*) pThis, "opened file '%s' for %s as %d\n", pThis->pszCurrFName, + DBGOPRINT((obj_t*) pThis, "opened file '%s' for %s as %d\n", pThis->pszCurrFName, (pThis->tOperationsMode == STREAMMODE_READ) ? "READ" : "WRITE", pThis->fd); finalize_it: @@ -296,8 +296,10 @@ strmWaitAsyncWriterDone(strm_t *pThis) BEGINfunc if(pThis->bAsyncWrite) { /* awake writer thread and make it write out everything */ - pthread_cond_signal(&pThis->notEmpty); - d_pthread_cond_wait(&pThis->isEmpty, &pThis->mut); + while(pThis->iCnt > 0) { + pthread_cond_signal(&pThis->notEmpty); + d_pthread_cond_wait(&pThis->isEmpty, &pThis->mut); + } } ENDfunc } @@ -306,27 +308,33 @@ strmWaitAsyncWriterDone(strm_t *pThis) /* close a strm file * Note that the bDeleteOnClose flag is honored. If it is set, the file will be * deleted after close. This is in support for the qRead thread. + * Note: it is valid to call this function when the physical file is closed. If so, + * strmCloseFile() will still check if there is any unwritten data inside buffers + * (this may be the case) and, if so, will open the file, write the data, and then + * close it again (this is done via strmFlushInternal and friends). */ static rsRetVal strmCloseFile(strm_t *pThis) { DEFiRet; ASSERT(pThis != NULL); - ASSERT(pThis->fd != -1); - dbgoprint((obj_t*) pThis, "file %d closing\n", pThis->fd); + DBGOPRINT((obj_t*) pThis, "file %d(%s) closing\n", pThis->fd, + (pThis->pszFName == NULL) ? "N/A" : (char*)pThis->pszFName); - if(!pThis->bInClose && pThis->tOperationsMode != STREAMMODE_READ) { - pThis->bInClose = 1; + if(pThis->tOperationsMode != STREAMMODE_READ) { + strmFlushInternal(pThis); if(pThis->bAsyncWrite) { - strmFlush(pThis); - } else { strmWaitAsyncWriterDone(pThis); } - pThis->bInClose = 0; } - close(pThis->fd); - pThis->fd = -1; + /* the file may already be closed (or never have opened), so guard + * against this. -- rgerhards, 2010-03-19 + */ + if(pThis->fd != -1) { + close(pThis->fd); + pThis->fd = -1; + } if(pThis->fdDir != -1) { /* close associated directory handle, if it is open */ @@ -441,7 +449,7 @@ strmHandleEOF(strm_t *pThis) case STREAMTYPE_FILE_CIRCULAR: /* we have multiple files and need to switch to the next one */ /* TODO: think about emulating EOF in this case (not yet needed) */ - dbgoprint((obj_t*) pThis, "file %d EOF\n", pThis->fd); + DBGOPRINT((obj_t*) pThis, "file %d EOF\n", pThis->fd); CHKiRet(strmNextFile(pThis)); break; case STREAMTYPE_FILE_MONITOR: @@ -473,7 +481,7 @@ strmReadBuf(strm_t *pThis) */ CHKiRet(strmOpenFile(pThis)); iLenRead = read(pThis->fd, pThis->pIOBuf, pThis->sIOBufSize); - dbgoprint((obj_t*) pThis, "file %d read %ld bytes\n", pThis->fd, iLenRead); + DBGOPRINT((obj_t*) pThis, "file %d read %ld bytes\n", pThis->fd, iLenRead); if(iLenRead == 0) { CHKiRet(strmHandleEOF(pThis)); } else if(iLenRead < 0) @@ -505,7 +513,7 @@ static rsRetVal strmReadChar(strm_t *pThis, uchar *pC) ASSERT(pThis != NULL); ASSERT(pC != NULL); - /* DEV debug only: dbgoprint((obj_t*) pThis, "strmRead index %d, max %d\n", pThis->iBufPtr, pThis->iBufPtrMax); */ + /* DEV debug only: DBGOPRINT((obj_t*) pThis, "strmRead index %d, max %d\n", pThis->iBufPtr, pThis->iBufPtrMax); */ if(pThis->iUngetC != -1) { /* do we have an "unread" char that we need to provide? */ *pC = pThis->iUngetC; ++pThis->iCurrOffs; /* one more octet read */ @@ -617,11 +625,11 @@ static rsRetVal strmConstructFinalize(strm_t *pThis) * to make sure we can write out everything with a SINGLE api call! * We add another 128 bytes to take care of the gzip header and "all eventualities". */ - CHKmalloc(pThis->pZipBuf = (Bytef*) malloc(sizeof(uchar) * pThis->sIOBufSize + 128)); + CHKmalloc(pThis->pZipBuf = (Bytef*) malloc(sizeof(uchar) * (pThis->sIOBufSize + 128))); } } - /* if we are aset to sync, we must obtain a file handle to the directory for fsync() purposes */ + /* if we are set to sync, we must obtain a file handle to the directory for fsync() purposes */ if(pThis->bSync && !pThis->bIsTTY) { pThis->fdDir = open((char*)pThis->pszDir, O_RDONLY | O_CLOEXEC | O_NOCTTY); if(pThis->fdDir == -1) { @@ -633,6 +641,9 @@ static rsRetVal strmConstructFinalize(strm_t *pThis) } } + DBGPRINTF("file stream %s params: flush interval %d, async write %d\n", + (pThis->pszFName == NULL) ? "N/A" : (char*)pThis->pszFName, + pThis->iFlushInterval, pThis->bAsyncWrite); /* if we have a flush interval, we need to do async writes in any case */ if(pThis->iFlushInterval != 0) { pThis->bAsyncWrite = 1; @@ -685,8 +696,10 @@ CODESTARTobjDestruct(strm) /* Note: mutex will be unlocked in stopWriter! */ d_pthread_mutex_lock(&pThis->mut); - if(pThis->tOperationsMode != STREAMMODE_READ) - strmFlush(pThis); + /* strmClose() will handle read-only files as well as need to open + * files that have unwritten buffers. -- rgerhards, 2010-03-09 + */ + strmCloseFile(pThis); if(pThis->bAsyncWrite) { stopWriter(pThis); @@ -705,14 +718,11 @@ CODESTARTobjDestruct(strm) * IMPORTANT: we MUST free this only AFTER the ansyncWriter has been stopped, else * we get random errors... */ - if(pThis->fd != -1) - strmCloseFile(pThis); - free(pThis->pszDir); free(pThis->pZipBuf); free(pThis->pszCurrFName); free(pThis->pszFName); - + pThis->bStopWriter = 2; /* RG: use as flag for destruction */ ENDobjDestruct(strm) @@ -732,7 +742,7 @@ static rsRetVal strmCheckNextOutputFile(strm_t *pThis) strmWaitAsyncWriterDone(pThis); if(pThis->iCurrOffs >= pThis->iMaxFileSize) { - dbgoprint((obj_t*) pThis, "max file size %ld reached for %d, now %ld - starting new file\n", + DBGOPRINT((obj_t*) pThis, "max file size %ld reached for %d, now %ld - starting new file\n", (long) pThis->iMaxFileSize, pThis->fd, (long) pThis->iCurrOffs); CHKiRet(strmNextFile(pThis)); } @@ -790,6 +800,7 @@ doWriteCall(strm_t *pThis, uchar *pBuf, size_t *pLenBuf) if(iWritten < 0) { char errStr[1024]; int err = errno; + iWritten = 0; /* we have written NO bytes! */ rs_strerror_r(err, errStr, sizeof(errStr)); DBGPRINTF("log file (%d) write error %d: %s\n", pThis->fd, err, errStr); if(err == EINTR) { @@ -811,7 +822,7 @@ doWriteCall(strm_t *pThis, uchar *pBuf, size_t *pLenBuf) pWriteBuf += iWritten; } while(lenBuf > 0); /* Warning: do..while()! */ - dbgoprint((obj_t*) pThis, "file %d write wrote %d bytes\n", pThis->fd, (int) iWritten); + DBGOPRINT((obj_t*) pThis, "file %d write wrote %d bytes\n", pThis->fd, (int) iWritten); finalize_it: *pLenBuf = iTotalWritten; @@ -855,7 +866,8 @@ doAsyncWriteInternal(strm_t *pThis, size_t lenBuf) DEFiRet; ISOBJ_TYPE_assert(pThis, strm); - while(pThis->iCnt >= STREAM_ASYNC_NUMBUFS) + /* the -1 below is important, because we need one buffer for the main thread! */ + while(pThis->iCnt >= STREAM_ASYNC_NUMBUFS - 1) d_pthread_cond_wait(&pThis->notFull, &pThis->mut); pThis->asyncBuf[pThis->iEnq % STREAM_ASYNC_NUMBUFS].lenBuf = lenBuf; @@ -880,13 +892,22 @@ strmSchedWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) ASSERT(pThis != NULL); + /* we need to reset the buffer pointer BEFORE calling the actual write + * function. Otherwise, in circular mode, the write function will + * potentially close the file, then close will flush and as the + * buffer pointer is nonzero, will re-call into this code here. In + * the end result, we than have a problem (and things are screwed + * up). So we reset the buffer pointer first, and all this can + * not happen. It is safe to do so, because that pointer is NOT + * used inside the write functions. -- rgerhads, 2010-03-10 + */ + pThis->iBufPtr = 0; /* we are at the begin of a new buffer */ if(pThis->bAsyncWrite) { CHKiRet(doAsyncWriteInternal(pThis, lenBuf)); } else { CHKiRet(doWriteInternal(pThis, pBuf, lenBuf)); } - pThis->iBufPtr = 0; /* we are at the begin of a new buffer */ finalize_it: RETiRet; @@ -911,10 +932,11 @@ asyncWriterThread(void *pPtr) if(prctl(PR_SET_NAME, "rs:asyn strmwr", 0, 0, 0) != 0) { DBGPRINTF("prctl failed, not setting thread name for '%s'\n", "stream writer"); } -#endif +# endif while(1) { /* loop broken inside */ d_pthread_mutex_lock(&pThis->mut); +dbgprintf("XXX: asyncWriterThread iterating %s\n", pThis->pszFName); while(pThis->iCnt == 0) { if(pThis->bStopWriter) { pthread_cond_broadcast(&pThis->isEmpty); @@ -923,13 +945,14 @@ asyncWriterThread(void *pPtr) } if(bTimedOut && pThis->iBufPtr > 0) { /* if we timed out, we need to flush pending data */ - strmFlush(pThis); + strmFlushInternal(pThis); bTimedOut = 0; continue; /* now we should have data */ } bTimedOut = 0; - timeoutComp(&t, pThis->iFlushInterval * 2000); /* *1000 millisconds */ + timeoutComp(&t, pThis->iFlushInterval * 2000); /* *1000 millisconds */ // TODO: check the 2000?!? if(pThis->bDoTimedWait) { +dbgprintf("asyncWriter thread going to timeout sleep\n"); if(pthread_cond_timedwait(&pThis->notEmpty, &pThis->mut, &t) != 0) { int err = errno; if(err == ETIMEDOUT) { @@ -943,13 +966,16 @@ asyncWriterThread(void *pPtr) } } } else { +dbgprintf("asyncWriter thread going to eternal sleep\n"); d_pthread_cond_wait(&pThis->notEmpty, &pThis->mut); } +dbgprintf("asyncWriter woke up\n"); } bTimedOut = 0; /* we may have timed out, but there *is* work to do... */ iDeq = pThis->iDeq++ % STREAM_ASYNC_NUMBUFS; +dbgprintf("asyncWriter writes data\n"); doWriteInternal(pThis, pThis->asyncBuf[iDeq].pBuf, pThis->asyncBuf[iDeq].lenBuf); // TODO: error check????? 2009-07-06 @@ -1058,10 +1084,6 @@ finalize_it: * add a config switch so that the user can decide the risk he is ready * to take, but so far this is not yet implemented (not even requested ;)). * rgerhards, 2009-06-04 - * For the time being, we take a very conservative approach and do not run this - * method multithreaded. This is done in an effort to solve a segfault condition - * that seems to be related to the zip code. -- rgerhards, 2009-09-22 - * TODO: make multithreaded again! */ static rsRetVal doZipWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) @@ -1120,12 +1142,14 @@ finalize_it: * rgerhards, 2008-01-10 */ static rsRetVal -strmFlush(strm_t *pThis) +strmFlushInternal(strm_t *pThis) { DEFiRet; ASSERT(pThis != NULL); - dbgoprint((obj_t*) pThis, "file %d flush, buflen %ld\n", pThis->fd, (long) pThis->iBufPtr); + DBGOPRINT((obj_t*) pThis, "file %d(%s) flush, buflen %ld%s\n", pThis->fd, + (pThis->pszFName == NULL) ? "N/A" : (char*)pThis->pszFName, + (long) pThis->iBufPtr, (pThis->iBufPtr == 0) ? " (no need to flush)" : ""); if(pThis->tOperationsMode != STREAMMODE_READ && pThis->iBufPtr > 0) { iRet = strmSchedWrite(pThis, pThis->pIOBuf, pThis->iBufPtr); @@ -1135,6 +1159,31 @@ strmFlush(strm_t *pThis) } +/* flush stream output buffer to persistent storage. This can be called at any time + * and is automatically called when the output buffer is full. This function is for + * use by EXTERNAL callers. Do NOT use it internally. It locks the async writer + * mutex if ther is need to do so. + * rgerhards, 2010-03-18 + */ +static rsRetVal +strmFlush(strm_t *pThis) +{ + DEFiRet; + + ASSERT(pThis != NULL); + + if(pThis->bAsyncWrite) + d_pthread_mutex_lock(&pThis->mut); + CHKiRet(strmFlushInternal(pThis)); + +finalize_it: + if(pThis->bAsyncWrite) + d_pthread_mutex_unlock(&pThis->mut); + + RETiRet; +} + + /* seek a stream to a specific location. Pending writes are flushed, read data * is invalidated. * rgerhards, 2008-01-12 @@ -1148,9 +1197,9 @@ static rsRetVal strmSeek(strm_t *pThis, off_t offs) if(pThis->fd == -1) strmOpenFile(pThis); else - strmFlush(pThis); + strmFlushInternal(pThis); int i; - dbgoprint((obj_t*) pThis, "file %d seek, pos %ld\n", pThis->fd, (long) offs); + DBGOPRINT((obj_t*) pThis, "file %d seek, pos %ld\n", pThis->fd, (long) offs); i = lseek(pThis->fd, offs, SEEK_SET); // TODO: check error! pThis->iCurrOffs = offs; /* we are now at *this* offset */ pThis->iBufPtr = 0; /* buffer invalidated */ @@ -1189,7 +1238,7 @@ static rsRetVal strmWriteChar(strm_t *pThis, uchar c) /* if the buffer is full, we need to flush before we can write */ if(pThis->iBufPtr == pThis->sIOBufSize) { - CHKiRet(strmFlush(pThis)); + CHKiRet(strmFlushInternal(pThis)); } /* we now always have space for one character, so we simply copy it */ *(pThis->pIOBuf + pThis->iBufPtr) = c; @@ -1233,6 +1282,11 @@ finalize_it: * caller-provided buffer is larger than our one. So instead of optimizing a case * which normally does not exist, we expect some degradation in its case but make us * perform better in the regular cases. -- rgerhards, 2009-07-07 + * Note: the pThis->iBufPtr == pThis->sIOBufSize logic below looks a bit like an + * on-off error. In fact, it is not, because iBufPtr always points to the next + * *free* byte in the buffer. So if it is sIOBufSize - 1, there actually is one + * free byte left. This came up during a code walkthrough and was considered + * worth nothing. -- rgerhards, 2010-03-10 */ static rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) @@ -1254,7 +1308,7 @@ strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) iOffset = 0; do { if(pThis->iBufPtr == pThis->sIOBufSize) { - CHKiRet(strmFlush(pThis)); /* get a new buffer for rest of data */ + CHKiRet(strmFlushInternal(pThis)); /* get a new buffer for rest of data */ } iWrite = pThis->sIOBufSize - pThis->iBufPtr; /* this fits in current buf */ if(iWrite > lenBuf) @@ -1269,7 +1323,7 @@ strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) * write it. This seems more natural than waiting (hours?) for the next message... */ if(pThis->iBufPtr == pThis->sIOBufSize) { - CHKiRet(strmFlush(pThis)); /* get a new buffer for rest of data */ + CHKiRet(strmFlushInternal(pThis)); /* get a new buffer for rest of data */ } finalize_it: @@ -1357,8 +1411,7 @@ strmSetDir(strm_t *pThis, uchar *pszDir, size_t iLenDir) if(iLenDir < 1) ABORT_FINALIZE(RS_RET_FILE_PREFIX_MISSING); - if((pThis->pszDir = malloc(sizeof(uchar) * iLenDir + 1)) == NULL) - ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); + CHKmalloc(pThis->pszDir = malloc(sizeof(uchar) * iLenDir + 1)); memcpy(pThis->pszDir, pszDir, iLenDir + 1); /* always think about the \0! */ pThis->lenDir = iLenDir; @@ -1429,7 +1482,7 @@ static rsRetVal strmSerialize(strm_t *pThis, strm_t *pStrm) ISOBJ_TYPE_assert(pThis, strm); ISOBJ_TYPE_assert(pStrm, strm); - strmFlush(pThis); + strmFlushInternal(pThis); CHKiRet(obj.BeginSerialize(pStrm, (obj_t*) pThis)); objSerializeSCALAR(pStrm, iCurrFNum, INT); diff --git a/runtime/stream.h b/runtime/stream.h index 89175b0f..369d5a0f 100644 --- a/runtime/stream.h +++ b/runtime/stream.h @@ -119,7 +119,6 @@ typedef struct strm_s { size_t iBufPtr; /* pointer into current buffer */ int iUngetC; /* char set via UngetChar() call or -1 if none set */ bool bInRecord; /* if 1, indicates that we are currently writing a not-yet complete record */ - bool bInClose; /* used to break "deadly close loops", tells us we are already inside a close */ int iZipLevel; /* zip level (0..9). If 0, zip is completely disabled */ Bytef *pZipBuf; /* support for async flush procesing */ diff --git a/runtime/vm.c b/runtime/vm.c index a1d992c3..0ed174d1 100644 --- a/runtime/vm.c +++ b/runtime/vm.c @@ -91,6 +91,9 @@ rsfrAddFunction(uchar *szName, prsf_t rsf) funcRegRoot = pEntry; finalize_it: + if(iRet != RS_RET_OK && iRet != RS_RET_DUP_FUNC_NAME) + free(pEntry); + RETiRet; } |