diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2011-02-01 12:22:36 +0100 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2011-02-01 12:22:36 +0100 |
commit | 7974621502ae249a5e393dafb4f69895851b4014 (patch) | |
tree | 762a57f0f6e25d85dc1c8af2e2f553f1060b70e3 /runtime | |
parent | d2b7a55b04b24bff278953240754cc688a32f6b8 (diff) | |
parent | a1f4330a7b1ab426d7abeeacb4ff4e5994c429e6 (diff) | |
download | rsyslog-7974621502ae249a5e393dafb4f69895851b4014.tar.gz rsyslog-7974621502ae249a5e393dafb4f69895851b4014.tar.xz rsyslog-7974621502ae249a5e393dafb4f69895851b4014.zip |
Merge branch 'v5-devel'
Conflicts:
plugins/imfile/imfile.c
plugins/imudp/imudp.c
plugins/ommysql/ommysql.c
Diffstat (limited to 'runtime')
-rw-r--r-- | runtime/rsyslog.c | 18 | ||||
-rw-r--r-- | runtime/rsyslog.h | 7 | ||||
-rw-r--r-- | runtime/stream.c | 140 | ||||
-rw-r--r-- | runtime/stream.h | 5 | ||||
-rw-r--r-- | runtime/wtp.c | 6 |
5 files changed, 137 insertions, 39 deletions
diff --git a/runtime/rsyslog.c b/runtime/rsyslog.c index 8baa2b59..bdb1c9ff 100644 --- a/runtime/rsyslog.c +++ b/runtime/rsyslog.c @@ -85,6 +85,12 @@ #include "statsobj.h" #include "atomic.h" +#ifdef HAVE_PTHREAD_SETSCHEDPARAM +struct sched_param default_sched_param; +pthread_attr_t default_thread_attr; +int default_thr_sched_policy; +#endif + /* forward definitions */ static rsRetVal dfltErrLogger(int, uchar *errMsg); @@ -139,6 +145,18 @@ rsrtInit(char **ppErrObj, obj_if_t *pObjIF) if(iRefCount == 0) { /* init runtime only if not yet done */ +#ifdef HAVE_PTHREAD_SETSCHEDPARAM + CHKiRet(pthread_getschedparam(pthread_self(), + &default_thr_sched_policy, + &default_sched_param)); + CHKiRet(pthread_attr_init(&default_thread_attr)); + CHKiRet(pthread_attr_setschedpolicy(&default_thread_attr, + default_thr_sched_policy)); + CHKiRet(pthread_attr_setschedparam(&default_thread_attr, + &default_sched_param)); + CHKiRet(pthread_attr_setinheritsched(&default_thread_attr, + PTHREAD_EXPLICIT_SCHED)); +#endif if(ppErrObj != NULL) *ppErrObj = "obj"; CHKiRet(objClassInit(NULL)); /* *THIS* *MUST* always be the first class initilizer being called! */ CHKiRet(objGetObjInterface(pObjIF)); /* this provides the root pointer for all other queries */ diff --git a/runtime/rsyslog.h b/runtime/rsyslog.h index 17b20de2..be4707c4 100644 --- a/runtime/rsyslog.h +++ b/runtime/rsyslog.h @@ -25,6 +25,7 @@ */ #ifndef INCLUDED_RSYSLOG_H #define INCLUDED_RSYSLOG_H +#include <pthread.h> #include "typedefs.h" /* ############################################################# * @@ -427,6 +428,12 @@ typedef enum rsObjectID rsObjID; #define RSFREEOBJ(x) {(x)->OID = OIDrsFreed; free(x);} #endif +#ifdef HAVE_PTHREAD_SETSCHEDPARAM +extern struct sched_param default_sched_param; +extern pthread_attr_t default_thread_attr; +extern int default_thr_sched_policy; +#endif + /* for the time being, we do our own portability handling here. It * looks like autotools either does not yet support checks for it, or diff --git a/runtime/stream.c b/runtime/stream.c index 260b59ef..5f4249a8 100644 --- a/runtime/stream.c +++ b/runtime/stream.c @@ -401,6 +401,12 @@ finalize_it: * If we are monitoring a file, someone may have rotated it. In this case, we * also need to close it and reopen it under the same name. * rgerhards, 2008-02-13 + * The previous code also did a check for file truncation, in which case the + * file was considered rewritten. However, this potential border case turned + * out to be a big trouble spot on busy systems. It caused massive message + * duplication (I guess stat() can return a too-low number under some + * circumstances). So starting as of now, we only check the inode number and + * a file change is detected only if the inode changes. -- rgerhards, 2011-01-10 */ static rsRetVal strmHandleEOFMonitor(strm_t *pThis) @@ -410,23 +416,18 @@ strmHandleEOFMonitor(strm_t *pThis) struct stat statName; ISOBJ_TYPE_assert(pThis, strm); - /* find inodes of both current descriptor as well as file now in file - * system. If they are different, the file has been rotated (or - * otherwise rewritten). We also check the size, because the inode - * does not change if the file is truncated (this, BTW, is also a case - * where we actually loose log lines, because we can not do anything - * against truncation...). We do NOT rely on the time of last - * modificaton because that may not be available under all - * circumstances. -- rgerhards, 2008-02-13 - */ if(fstat(pThis->fd, &statOpen) == -1) ABORT_FINALIZE(RS_RET_IO_ERROR); if(stat((char*) pThis->pszCurrFName, &statName) == -1) ABORT_FINALIZE(RS_RET_IO_ERROR); - if(statOpen.st_ino == statName.st_ino && pThis->iCurrOffs == statName.st_size) { + DBGPRINTF("stream checking for file change on '%s', inode %u/%u", + pThis->pszCurrFName, (unsigned) statOpen.st_ino, + (unsigned) statName.st_ino); + if(statOpen.st_ino == statName.st_ino) { ABORT_FINALIZE(RS_RET_EOF); } else { /* we had a file change! */ + DBGPRINTF("we had a file change on '%s'\n", pThis->pszCurrFName); CHKiRet(strmCloseFile(pThis)); CHKiRet(strmOpenFile(pThis)); } @@ -561,39 +562,98 @@ static rsRetVal strmUnreadChar(strm_t *pThis, uchar c) return RS_RET_OK; } - -/* read a line from a strm file. A line is terminated by LF. The LF is read, but it - * is not returned in the buffer (it is discared). The caller is responsible for - * destruction of the returned CStr object! -- rgerhards, 2008-01-07 - * rgerhards, 2008-03-27: I now use the ppCStr directly, without any interim - * string pointer. The reason is that this function my be called by inputs, which - * are pthread_killed() upon termination. So if we use their native pointer, they - * can cleanup (but only then). +/* read a 'paragraph' from a strm file. + * A paragraph may be terminated by a LF, by a LFLF, or by LF<not whitespace> depending on the option set. + * The termination LF characters are read, but are + * not returned in the buffer (it is discared). The caller is responsible for + * destruction of the returned CStr object! -- dlang 2010-12-13 */ static rsRetVal -strmReadLine(strm_t *pThis, cstr_t **ppCStr) +strmReadLine(strm_t *pThis, cstr_t **ppCStr, int mode) { - DEFiRet; - uchar c; - - ASSERT(pThis != NULL); - ASSERT(ppCStr != NULL); - - CHKiRet(cstrConstruct(ppCStr)); - - /* now read the line */ - CHKiRet(strmReadChar(pThis, &c)); - while(c != '\n') { - CHKiRet(cstrAppendChar(*ppCStr, c)); - CHKiRet(strmReadChar(pThis, &c)); + /* mode = 0 single line mode (equivalent to ReadLine) + * mode = 1 LFLF mode (paragraph, blank line between entries) + * mode = 2 LF <not whitespace> mode, a log line starts at the beginning of a line, but following lines that are indented are part of the same log entry + * This modal interface is not nearly as flexible as being able to define a regex for when a new record starts, but it's also not nearly as hard (or as slow) to implement + */ + DEFiRet; + uchar c; + uchar finished; + + ASSERT(pThis != NULL); + ASSERT(ppCStr != NULL); + + CHKiRet(cstrConstruct(ppCStr)); + + /* now read the line */ + CHKiRet(strmReadChar(pThis, &c)); + if (mode == 0){ + while(c != '\n') { + CHKiRet(cstrAppendChar(*ppCStr, c)); + CHKiRet(strmReadChar(pThis, &c)); + } + CHKiRet(cstrFinalize(*ppCStr)); + } + if (mode == 1){ + finished=0; + while(finished == 0){ + if(c != '\n') { + CHKiRet(cstrAppendChar(*ppCStr, c)); + CHKiRet(strmReadChar(pThis, &c)); + } else { + if ((((*ppCStr)->iStrLen) > 0) ){ + if ((*ppCStr)->pBuf[(*ppCStr)->iStrLen -1 ] == '\n'){ + rsCStrTruncate(*ppCStr,1); /* remove the prior newline */ + finished=1; + } else { + CHKiRet(cstrAppendChar(*ppCStr, c)); + CHKiRet(strmReadChar(pThis, &c)); + } + } else { + finished=1; /* this is a blank line, a \n with nothing since the last complete record */ + } + } + } + CHKiRet(cstrFinalize(*ppCStr)); + } + if (mode == 2){ +/* indented follow-up lines */ + finished=0; + while(finished == 0){ + if ((*ppCStr)->iStrLen == 0){ + if(c != '\n') { +/* nothing in the buffer, and it's not a newline, add it to the buffer */ + CHKiRet(cstrAppendChar(*ppCStr, c)); + CHKiRet(strmReadChar(pThis, &c)); + } else { + finished=1; /* this is a blank line, a \n with nothing since the last complete record */ + } + } else { + if ((*ppCStr)->pBuf[(*ppCStr)->iStrLen -1 ] != '\n'){ +/* not the first character after a newline, add it to the buffer */ + CHKiRet(cstrAppendChar(*ppCStr, c)); + CHKiRet(strmReadChar(pThis, &c)); + } else { + if ((c == ' ') || (c == '\t')){ + CHKiRet(cstrAppendChar(*ppCStr, c)); + CHKiRet(strmReadChar(pThis, &c)); + } else { +/* clean things up by putting the character we just read back into the input buffer and removing the LF character that is currently at the end of the output string */ + CHKiRet(strmUnreadChar(pThis, c)); + rsCStrTruncate(*ppCStr,1); + finished=1; + } + } + } + } + CHKiRet(cstrFinalize(*ppCStr)); } - CHKiRet(cstrFinalize(*ppCStr)); finalize_it: - if(iRet != RS_RET_OK && *ppCStr != NULL) - cstrDestruct(ppCStr); + if(iRet != RS_RET_OK && *ppCStr != NULL) + cstrDestruct(ppCStr); - RETiRet; + RETiRet; } @@ -669,7 +729,13 @@ static rsRetVal strmConstructFinalize(strm_t *pThis) } pThis->pIOBuf = pThis->asyncBuf[0].pBuf; pThis->bStopWriter = 0; - if(pthread_create(&pThis->writerThreadID, NULL, asyncWriterThread, pThis) != 0) + if(pthread_create(&pThis->writerThreadID, +#ifdef HAVE_PTHREAD_SETSCHEDPARAM + &default_thread_attr, +#else + NULL, +#endif + asyncWriterThread, pThis) != 0) DBGPRINTF("ERROR: stream %p cold not create writer thread\n", pThis); } else { /* we work synchronously, so we need to alloc a fixed pIOBuf */ diff --git a/runtime/stream.h b/runtime/stream.h index 37e9d570..60c68cb2 100644 --- a/runtime/stream.h +++ b/runtime/stream.h @@ -156,7 +156,6 @@ BEGINinterface(strm) /* name must also be changed in ENDinterface macro! */ rsRetVal (*SetFileName)(strm_t *pThis, uchar *pszName, size_t iLenName); rsRetVal (*ReadChar)(strm_t *pThis, uchar *pC); rsRetVal (*UnreadChar)(strm_t *pThis, uchar c); - rsRetVal (*ReadLine)(strm_t *pThis, cstr_t **ppCStr); rsRetVal (*SeekCurrOffs)(strm_t *pThis); rsRetVal (*Write)(strm_t *pThis, uchar *pBuf, size_t lenBuf); rsRetVal (*WriteChar)(strm_t *pThis, uchar c); @@ -183,8 +182,10 @@ BEGINinterface(strm) /* name must also be changed in ENDinterface macro! */ INTERFACEpropSetMeth(strm, iSizeLimit, off_t); INTERFACEpropSetMeth(strm, iFlushInterval, int); INTERFACEpropSetMeth(strm, pszSizeLimitCmd, uchar*); + /* v6 added */ + rsRetVal (*ReadLine)(strm_t *pThis, cstr_t **ppCStr, int mode); ENDinterface(strm) -#define strmCURR_IF_VERSION 5 /* increment whenever you change the interface structure! */ +#define strmCURR_IF_VERSION 6 /* increment whenever you change the interface structure! */ /* prototypes */ diff --git a/runtime/wtp.c b/runtime/wtp.c index ece80911..e615fb19 100644 --- a/runtime/wtp.c +++ b/runtime/wtp.c @@ -90,6 +90,12 @@ BEGINobjConstruct(wtp) /* be sure to specify the object type also in END macro! pthread_mutex_init(&pThis->mutWtp, NULL); pthread_cond_init(&pThis->condThrdTrm, NULL); pthread_attr_init(&pThis->attrThrd); + /* Set thread scheduling policy to default */ +#ifdef HAVE_PTHREAD_SETSCHEDPARAM + pthread_attr_setschedpolicy(&pThis->attrThrd, default_thr_sched_policy); + pthread_attr_setschedparam(&pThis->attrThrd, &default_sched_param); + pthread_attr_setinheritsched(&pThis->attrThrd, PTHREAD_EXPLICIT_SCHED); +#endif pthread_attr_setdetachstate(&pThis->attrThrd, PTHREAD_CREATE_DETACHED); /* set all function pointers to "not implemented" dummy so that we can safely call them */ pThis->pfChkStopWrkr = NotImplementedDummy; |