From e3040285dbf0854443bc2443e0de5ac59f6f839e Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 6 Jul 2009 16:38:09 +0200 Subject: first shot at asynchronous stream writer with timeout capability ... seems to work on quick testing, but needs a far more testing and improvement. Good milestone commit. --- runtime/apc.c | 2 + runtime/debug.h | 4 +- runtime/stream.c | 216 ++++++++++++++++++++++++++++++++++----------------- runtime/stream.h | 17 +++- runtime/wtp.c | 6 +- tests/diag.sh | 2 +- tests/killrsyslog.sh | 2 +- tests/nettester.c | 5 +- tests/tcpflood.c | 19 +++-- tests/threadingmq.sh | 4 +- tools/omfile.c | 7 +- tools/syslogd.c | 6 +- 12 files changed, 201 insertions(+), 89 deletions(-) diff --git a/runtime/apc.c b/runtime/apc.c index 5919191d..bc330e39 100644 --- a/runtime/apc.c +++ b/runtime/apc.c @@ -335,9 +335,11 @@ CancelApc(apc_id_t id) { DEFVARS_mutexProtection_uncond; + BEGINfunc BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(&listMutex); deleteApc(id); END_MTX_PROTECTED_OPERATIONS_UNCOND(&listMutex); + ENDfunc return RS_RET_OK; } diff --git a/runtime/debug.h b/runtime/debug.h index 1375493d..8b66d784 100644 --- a/runtime/debug.h +++ b/runtime/debug.h @@ -134,8 +134,8 @@ void dbgPrintAllDebugInfo(void); /* debug aides */ -//#ifdef RTINST -#if 0 // temporarily removed for helgrind +#ifdef RTINST +//#if 0 // temporarily removed for helgrind #define d_pthread_mutex_lock(x) dbgMutexLock(x, pdbgFuncDB, __LINE__, dbgCALLStaCK_POP_POINT ) #define d_pthread_mutex_trylock(x) dbgMutexTryLock(x, pdbgFuncDB, __LINE__, dbgCALLStaCK_POP_POINT ) #define d_pthread_mutex_unlock(x) dbgMutexUnlock(x, pdbgFuncDB, __LINE__, dbgCALLStaCK_POP_POINT ) diff --git a/runtime/stream.c b/runtime/stream.c index 00c726d9..a9f4803f 100644 --- a/runtime/stream.c +++ b/runtime/stream.c @@ -48,37 +48,21 @@ #include "stream.h" #include "unicode-helper.h" #include "module-template.h" -#include "apc.h" +#include /* static data */ DEFobjStaticHelpers DEFobjCurrIf(zlibw) -DEFobjCurrIf(apc) /* forward definitions */ static rsRetVal strmFlush(strm_t *pThis); static rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf); static rsRetVal strmCloseFile(strm_t *pThis); +static void *asyncWriterThread(void *pPtr); /* methods */ -/* async flush apc handler - */ -static void -flushApc(void *param1, void __attribute__((unused)) *param2) -{ - DEFVARS_mutexProtection_uncond; - strm_t *pThis = (strm_t*) param1; - ISOBJ_TYPE_assert(pThis, strm); - - BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(&pThis->mut); - strmFlush(pThis); - pThis->apcRequested = 0; - END_MTX_PROTECTED_OPERATIONS_UNCOND(&pThis->mut); -} - - /* Try to resolve a size limit situation. This is used to support custom-file size handlers * for omfile. It first runs the command, and then checks if we are still above the size * treshold. Note that this works only with single file names, NOT with circular names. @@ -569,11 +553,11 @@ ENDobjConstruct(strm) static rsRetVal strmConstructFinalize(strm_t *pThis) { rsRetVal localRet; + int i; DEFiRet; ASSERT(pThis != NULL); - CHKmalloc(pThis->pIOBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize)); pThis->iBufPtrMax = 0; /* results in immediate read request */ if(pThis->iZipLevel) { /* do we need a zip buf? */ localRet = objUse(zlibw, LM_ZLIBW_FILENAME); @@ -601,9 +585,33 @@ static rsRetVal strmConstructFinalize(strm_t *pThis) } } - /* if we should call flush apc's, we need a mutex */ +dbgprintf("TTT: before checks: iFlushInterval %d, bAsyncWrite %d\n", pThis->iFlushInterval, pThis->bAsyncWrite); + /* if we have a flush interval, we need to do async writes in any case */ if(pThis->iFlushInterval != 0) { + pThis->bAsyncWrite = 1; + } +dbgprintf("TTT: after checks: iFlushInterval %d, bAsyncWrite %d\n", pThis->iFlushInterval, pThis->bAsyncWrite); + + /* if we work asynchronously, we need a couple of synchronization objects */ + if(pThis->bAsyncWrite) { pthread_mutex_init(&pThis->mut, 0); + pthread_cond_init(&pThis->notFull, 0); + pthread_cond_init(&pThis->notEmpty, 0); + pthread_cond_init(&pThis->isEmpty, 0); + pThis->iCnt = pThis->iEnq = pThis->iDeq = 0; + for(i = 0 ; i < STREAM_ASYNC_NUMBUFS ; ++i) { + CHKmalloc(pThis->asyncBuf[i].pBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize)); + } + //pThis->pIOBuf = pThis->ioBuf[0]; + pThis->bStopWriter = 0; + // TODO: detached thread? + if(pthread_create(&pThis->writerThreadID, NULL, asyncWriterThread, pThis) != 0) + dbgprintf("ERROR: stream %p cold not create writer thread\n", pThis); + // TODO: remove that below later! + CHKmalloc(pThis->pIOBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize)); + } else { + /* we work synchronously, so we need to alloc a fixed pIOBuf */ + CHKmalloc(pThis->pIOBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize)); } finalize_it: @@ -611,8 +619,26 @@ finalize_it: } +/* stop the writer thread (we MUST be runnnig asynchronously when this method + * is called!) -- rgerhards, 2009-07-06 + */ +static inline void +stopWriter(strm_t *pThis) +{ + BEGINfunc + d_pthread_mutex_lock(&pThis->mut); + pThis->bStopWriter = 1; + pthread_cond_signal(&pThis->notEmpty); + d_pthread_cond_wait(&pThis->isEmpty, &pThis->mut); + d_pthread_mutex_unlock(&pThis->mut); + pthread_join(pThis->writerThreadID, NULL); + ENDfunc +} + + /* destructor for the strm object */ BEGINobjDestruct(strm) /* be sure to specify the object type also in END and CODESTART macros! */ + int i; CODESTARTobjDestruct(strm) if(pThis->tOperationsMode != STREAMMODE_READ) strmFlush(pThis); @@ -625,16 +651,23 @@ CODESTARTobjDestruct(strm) objRelease(zlibw, LM_ZLIBW_FILENAME); } - if(pThis->iFlushInterval != 0) { - // TODO: check if there is an apc and remove it! - pthread_mutex_destroy(&pThis->mut); - } - free(pThis->pszDir); - free(pThis->pIOBuf); free(pThis->pZipBuf); free(pThis->pszCurrFName); free(pThis->pszFName); + + if(pThis->bAsyncWrite) { + stopWriter(pThis); + pthread_mutex_destroy(&pThis->mut); + pthread_cond_destroy(&pThis->notFull); + pthread_cond_destroy(&pThis->notEmpty); + pthread_cond_destroy(&pThis->isEmpty); + for(i = 0 ; i < STREAM_ASYNC_NUMBUFS ; ++i) { + free(pThis->asyncBuf[i].pBuf); + } + } else { + free(pThis->pIOBuf); + } ENDobjDestruct(strm) @@ -730,11 +763,93 @@ doWriteCall(strm_t *pThis, uchar *pBuf, size_t *pLenBuf) pWriteBuf += iWritten; } while(lenBuf > 0); /* Warning: do..while()! */ + dbgoprint((obj_t*) pThis, "file %d write wrote %d bytes\n", pThis->fd, (int) iWritten); + finalize_it: *pLenBuf = iTotalWritten; RETiRet; } +#include + +/* This is the writer thread for asynchronous mode. + * -- rgerhards, 2009-07-06 + */ +static void* +asyncWriterThread(void *pPtr) +{ + int iDeq; + size_t iWritten; + strm_t *pThis = (strm_t*) pPtr; + ISOBJ_TYPE_assert(pThis, strm); + + BEGINfunc + if(prctl(PR_SET_NAME, "rs:asyn strmwr", 0, 0, 0) != 0) { + DBGPRINTF("prctl failed, not setting thread name for '%s'\n", "stream writer"); + } + +fprintf(stderr, "async stream writer thread started\n");fflush(stderr); +dbgprintf("TTT: writer thread startup\n"); + while(1) { /* loop broken inside */ + d_pthread_mutex_lock(&pThis->mut); + while(pThis->iCnt == 0) { +dbgprintf("TTT: writer thread empty queue, stopWriter=%d\n", pThis->bStopWriter); + if(pThis->bStopWriter) { + pthread_cond_signal(&pThis->isEmpty); + d_pthread_mutex_unlock(&pThis->mut); + goto finalize_it; /* break main loop */ + } + d_pthread_cond_wait(&pThis->notEmpty, &pThis->mut); + } + + iDeq = pThis->iDeq++ % STREAM_ASYNC_NUMBUFS; + iWritten = pThis->asyncBuf[iDeq].lenBuf; + doWriteCall(pThis, pThis->asyncBuf[iDeq].pBuf, &iWritten); + // TODO: error check!!!!! 2009-07-06 + + --pThis->iCnt; + if(pThis->iCnt < STREAM_ASYNC_NUMBUFS) { + pthread_cond_signal(&pThis->notFull); + } + d_pthread_mutex_unlock(&pThis->mut); + } + +finalize_it: +dbgprintf("TTT: writer thread shutdown\n"); + ENDfunc + return NULL; /* to keep pthreads happy */ +} + + +/* This function is called to "do" an async write call, what primarily means that + * the data is handed over to the writer thread (which will then do the actual write + * in parallel. -- rgerhards, 2009-07-06 + */ +static inline rsRetVal +doAsyncWriteCall(strm_t *pThis, uchar *pBuf, size_t *pLenBuf) +{ + int iEnq; + DEFiRet; + ISOBJ_TYPE_assert(pThis, strm); + + d_pthread_mutex_lock(&pThis->mut); + while(pThis->iCnt >= STREAM_ASYNC_NUMBUFS) + d_pthread_cond_wait(&pThis->notFull, &pThis->mut); + + iEnq = pThis->iEnq++ % STREAM_ASYNC_NUMBUFS; + // TODO: optimize, memcopy only for getting it initially going! + //pThis->asyncBuf[iEnq].pBuf = pBuf; + memcpy(pThis->asyncBuf[iEnq].pBuf, pBuf, *pLenBuf); + pThis->asyncBuf[iEnq].lenBuf = *pLenBuf; + + if(++pThis->iCnt == 1) + pthread_cond_signal(&pThis->notEmpty); + d_pthread_mutex_unlock(&pThis->mut); + +finalize_it: + RETiRet; +} + /* sync the file to disk, so that any unwritten data is persisted. This * also syncs the directory and thus makes sure that the file survives @@ -788,8 +903,11 @@ strmPhysWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) CHKiRet(strmOpenFile(pThis)); iWritten = lenBuf; - CHKiRet(doWriteCall(pThis, pBuf, &iWritten)); - dbgoprint((obj_t*) pThis, "file %d write wrote %d bytes\n", pThis->fd, (int) iWritten); + if(pThis->bAsyncWrite) { + CHKiRet(doAsyncWriteCall(pThis, pBuf, &iWritten)); + } else { + CHKiRet(doWriteCall(pThis, pBuf, &iWritten)); + } pThis->iBufPtr = 0; pThis->iCurrOffs += iWritten; @@ -993,37 +1111,11 @@ finalize_it: } -/* schedule an Apc flush request. - * rgerhards, 2009-06-15 - */ -static inline rsRetVal -scheduleFlushRequest(strm_t *pThis) -{ - apc_t *pApc; - DEFiRet; - - if(!pThis->apcRequested) { - /* we do an request only if none is yet pending */ - pThis->apcRequested = 1; - // TODO: find similar thing later CHKiRet(apc.CancelApc(pThis->apcID)); -dbgprintf("XXX: requesting to add apc!\n"); - CHKiRet(apc.Construct(&pApc)); - CHKiRet(apc.SetProcedure(pApc, (void (*)(void*, void*))flushApc)); - CHKiRet(apc.SetParam1(pApc, pThis)); - CHKiRet(apc.ConstructFinalize(pApc, &pThis->apcID)); - } - -finalize_it: - RETiRet; -} - - /* write memory buffer to a stream object */ static rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) { - DEFVARS_mutexProtection_uncond; DEFiRet; size_t iPartial; @@ -1034,11 +1126,6 @@ dbgprintf("strmWrite(%p, '%65.65s', %ld);, disabled %d, sizelim %ld, size %lld\n if(pThis->bDisabled) ABORT_FINALIZE(RS_RET_STREAM_DISABLED); -RUNLOG_VAR("%d", pThis->iFlushInterval); - if(pThis->iFlushInterval != 0) { - BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(&pThis->mut); - } - /* check if the to-be-written data is larger than our buffer size */ if(lenBuf >= pThis->sIOBufSize) { /* it is - so we do a direct write, that is most efficient. @@ -1067,17 +1154,7 @@ RUNLOG_VAR("%d", pThis->iFlushInterval); } } - /* we ignore the outcome of scheduleFlushRequest(), as we will write the data always at - * termination. For Zip mode, it could be fatal if we write after each record. - */ - if(pThis->iFlushInterval != 0) - scheduleFlushRequest(pThis); - finalize_it: - if(pThis->iFlushInterval != 0) { - END_MTX_PROTECTED_OPERATIONS_UNCOND(&pThis->mut); - } - RETiRet; } @@ -1390,7 +1467,6 @@ ENDobjQueryInterface(strm) */ BEGINObjClassInit(strm, 1, OBJ_IS_CORE_MODULE) /* request objects we use */ - CHKiRet(objUse(apc, CORE_COMPONENT)); OBJSetMethodHandler(objMethod_SERIALIZE, strmSerialize); OBJSetMethodHandler(objMethod_SETPROPERTY, strmSetProperty); diff --git a/runtime/stream.h b/runtime/stream.h index ac003c7b..2c1ac255 100644 --- a/runtime/stream.h +++ b/runtime/stream.h @@ -87,6 +87,7 @@ typedef enum { /* when extending, do NOT change existing modes! */ STREAMMODE_WRITE_APPEND = 4 } strmMode_t; +#define STREAM_ASYNC_NUMBUFS 2 /* must be a power of 2 -- TODO: make configurable */ /* The strm_t data structure */ typedef struct strm_s { BEGINobjInstance; /* Data to implement generic object - MUST be the first data element! */ @@ -112,7 +113,7 @@ typedef struct strm_s { int fd; /* the file descriptor, -1 if closed */ int fdDir; /* the directory's descriptor, in case bSync is requested (-1 if closed) */ uchar *pszCurrFName; /* name of current file (if open) */ - uchar *pIOBuf; /* io Buffer */ + uchar *pIOBuf; /* the iobuffer currently in use to gather data */ size_t iBufPtrMax; /* current max Ptr in Buffer (if partial read!) */ size_t iBufPtr; /* pointer into current buffer */ int iUngetC; /* char set via UngetChar() call or -1 if none set */ @@ -120,9 +121,22 @@ typedef struct strm_s { int iZipLevel; /* zip level (0..9). If 0, zip is completely disabled */ Bytef *pZipBuf; /* support for async flush procesing */ + bool bAsyncWrite; /* do asynchronous writes (always if a flush interval is given) */ + bool bStopWriter; /* shall writer thread terminate? */ int iFlushInterval; /* flush in which interval - 0, no flushing */ apc_id_t apcID; /* id of current Apc request (used for cancelling) */ pthread_mutex_t mut;/* mutex for flush in async mode */ + pthread_cond_t notFull; + pthread_cond_t notEmpty; + pthread_cond_t isEmpty; + short iEnq; + short iDeq; + short iCnt; /* current nbr of elements in buffer */ + struct { + uchar *pBuf; + size_t lenBuf; + } asyncBuf[STREAM_ASYNC_NUMBUFS]; + pthread_t writerThreadID; int apcRequested; /* is an apc Requested? */ /* support for omfile size-limiting commands, special counters, NOT persisted! */ off_t iSizeLimit; /* file size limit, 0 = no limit */ @@ -130,6 +144,7 @@ typedef struct strm_s { bool bIsTTY; /* is this a tty file? */ } strm_t; + /* interfaces */ BEGINinterface(strm) /* name must also be changed in ENDinterface macro! */ rsRetVal (*Construct)(strm_t **ppThis); diff --git a/runtime/wtp.c b/runtime/wtp.c index 02662cde..e37ebddf 100644 --- a/runtime/wtp.c +++ b/runtime/wtp.c @@ -421,6 +421,8 @@ wtpWrkrExecCancelCleanup(void *arg) static void * wtpWorker(void *arg) /* the arg is actually a wti object, even though we are in wtp! */ { + uchar *pszDbgHdr; + uchar thrdName[32] = "rs:"; DEFiRet; DEFVARS_mutexProtection; wti_t *pWti = (wti_t*) arg; @@ -435,7 +437,9 @@ wtpWorker(void *arg) /* the arg is actually a wti object, even though we are in pthread_sigmask(SIG_BLOCK, &sigSet, NULL); /* set thread name - we ignore if the call fails, has no harsh consequences... */ - if(prctl(PR_SET_NAME, wtpGetDbgHdr(pThis), 0, 0, 0) != 0) { + pszDbgHdr = wtpGetDbgHdr(pThis); + strncpy(thrdName+3, pszDbgHdr, 20); + if(prctl(PR_SET_NAME, thrdName, 0, 0, 0) != 0) { DBGPRINTF("prctl failed, not setting thread name for '%s'\n", wtpGetDbgHdr(pThis)); } diff --git a/tests/diag.sh b/tests/diag.sh index 2a9d0ee3..299c5d71 100755 --- a/tests/diag.sh +++ b/tests/diag.sh @@ -9,7 +9,7 @@ #valgrind="valgrind --tool=drd --log-fd=1" #valgrind="valgrind --tool=helgrind --log-fd=1" #set -o xtrace -#export RSYSLOG_DEBUG="debug nostdout noprintmutexaction" +#export RSYSLOG_DEBUG="debug nostdout printmutexaction" #export RSYSLOG_DEBUGLOG="log" case $1 in 'init') $srcdir/killrsyslog.sh # kill rsyslogd if it runs for some reason diff --git a/tests/killrsyslog.sh b/tests/killrsyslog.sh index b1be757b..c9b6e0ac 100755 --- a/tests/killrsyslog.sh +++ b/tests/killrsyslog.sh @@ -2,6 +2,6 @@ if [ -e "rsyslog.pid" ] then echo rsyslog.pid exists, trying to shut down rsyslogd process `cat rsyslog.pid`. - kill `cat rsyslog.pid` + kill -9 `cat rsyslog.pid` sleep 1 fi diff --git a/tests/nettester.c b/tests/nettester.c index dbfb4db3..73abc46e 100644 --- a/tests/nettester.c +++ b/tests/nettester.c @@ -128,12 +128,13 @@ tcpSend(char *buf, int lenBuf) if(connect(sock, (struct sockaddr*)&addr, sizeof(addr)) == 0) { break; } else { - if(retries++ == 30) { + if(retries++ == 50) { ++iFailed; fprintf(stderr, "connect() failed\n"); return(1); } else { - usleep(100); + fprintf(stderr, "connect() failed, retry %d\n", retries); + usleep(100000); /* ms = 1000 us! */ } } } diff --git a/tests/tcpflood.c b/tests/tcpflood.c index 2ca796ca..0439e33e 100644 --- a/tests/tcpflood.c +++ b/tests/tcpflood.c @@ -61,6 +61,7 @@ int openConn(int *fd) { int sock; struct sockaddr_in addr; + int retries = 0; if((sock=socket(AF_INET, SOCK_STREAM, 0))==-1) { perror("socket()"); @@ -74,11 +75,19 @@ int openConn(int *fd) fprintf(stderr, "inet_aton() failed\n"); return(1); } - if(connect(sock, (struct sockaddr*)&addr, sizeof(addr)) != 0) { - perror("connect()"); - fprintf(stderr, "connect() failed\n"); - return(1); - } + while(1) { /* loop broken inside */ + if(connect(sock, (struct sockaddr*)&addr, sizeof(addr)) == 0) { + break; + } else { + if(retries++ == 50) { + perror("connect()"); + fprintf(stderr, "connect() failed\n"); + return(1); + } else { + usleep(100000); /* ms = 1000 us! */ + } + } + } *fd = sock; return 0; diff --git a/tests/threadingmq.sh b/tests/threadingmq.sh index 5c29ec60..3680df5f 100755 --- a/tests/threadingmq.sh +++ b/tests/threadingmq.sh @@ -9,7 +9,7 @@ echo TEST: threadingmq.sh - main queue concurrency source $srcdir/diag.sh init source $srcdir/diag.sh startup threadingmq.conf -source $srcdir/diag.sh tcpflood 127.0.0.1 13514 2 100000 +source $srcdir/diag.sh tcpflood 127.0.0.1 13514 2 10000000 source $srcdir/diag.sh shutdown-when-empty # shut down rsyslogd when done processing messages -source $srcdir/diag.sh seq-check 0 99999 +source $srcdir/diag.sh seq-check 0 9999999 source $srcdir/diag.sh exit diff --git a/tools/omfile.c b/tools/omfile.c index 82944a96..bb12b4b6 100644 --- a/tools/omfile.c +++ b/tools/omfile.c @@ -401,12 +401,17 @@ prepareFile(instanceData *pData, uchar *newFileName) CHKiRet(strm.SetDir(pData->pStrm, szDirName, ustrlen(szDirName))); CHKiRet(strm.SetiZipLevel(pData->pStrm, pData->iZipLevel)); CHKiRet(strm.SetsIOBufSize(pData->pStrm, (size_t) pData->iIOBufSize)); - CHKiRet(strm.SetiFlushInterval(pData->pStrm, pData->iFlushInterval)); CHKiRet(strm.SettOperationsMode(pData->pStrm, STREAMMODE_WRITE_APPEND)); CHKiRet(strm.SettOpenMode(pData->pStrm, fCreateMode)); CHKiRet(strm.SetbSync(pData->pStrm, pData->bSyncFile)); CHKiRet(strm.SetsType(pData->pStrm, STREAMTYPE_FILE_SINGLE)); CHKiRet(strm.SetiSizeLimit(pData->pStrm, pData->iSizeLimit)); + /* set the flush interval only if we actually use it - otherwise it will activate + * async processing, which is a real performance waste if we do not do buffered + * writes! -- rgerhards, 2009-07-06 + */ + if(!pData->bFlushOnTXEnd) + CHKiRet(strm.SetiFlushInterval(pData->pStrm, pData->iFlushInterval)); if(pData->pszSizeLimitCmd != NULL) CHKiRet(strm.SetpszSizeLimitCmd(pData->pStrm, ustrdup(pData->pszSizeLimitCmd))); CHKiRet(strm.ConstructFinalize(pData->pStrm)); diff --git a/tools/syslogd.c b/tools/syslogd.c index f7253a8e..f79410d7 100644 --- a/tools/syslogd.c +++ b/tools/syslogd.c @@ -2548,8 +2548,8 @@ mainloop(void) * powertop, for example). In that case, we primarily wait for a signal, * but a once-a-day wakeup should be quite acceptable. -- rgerhards, 2008-06-09 */ - //tvSelectTimeout.tv_sec = (bReduceRepeatMsgs == 1) ? TIMERINTVL : 86400 /*1 day*/; - tvSelectTimeout.tv_sec = TIMERINTVL; /* TODO: change this back to the above code when we have a better solution for apc */ + tvSelectTimeout.tv_sec = (bReduceRepeatMsgs == 1) ? TIMERINTVL : 86400 /*1 day*/; + //tvSelectTimeout.tv_sec = TIMERINTVL; /* TODO: change this back to the above code when we have a better solution for apc */ tvSelectTimeout.tv_usec = 0; select(1, NULL, NULL, NULL, &tvSelectTimeout); if(bFinished) @@ -2584,7 +2584,7 @@ mainloop(void) bHadHUP = 0; continue; } - execScheduled(); /* handle Apc calls (if any) */ + // TODO: remove execScheduled(); /* handle Apc calls (if any) */ } ENDfunc } -- cgit From 8e76a0521bee36e02e8bce2e97fa3d2aa67130da Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 6 Jul 2009 19:28:22 +0200 Subject: some minor cleanup --- runtime/stream.c | 32 ++++++++++++++++---------------- runtime/wtp.c | 3 ++- 2 files changed, 18 insertions(+), 17 deletions(-) diff --git a/runtime/stream.c b/runtime/stream.c index a9f4803f..2bb19caf 100644 --- a/runtime/stream.c +++ b/runtime/stream.c @@ -123,10 +123,11 @@ resolveFileSizeLimit(strm_t *pThis, uchar *pszCurrFName) finalize_it: if(iRet != RS_RET_OK) { - if(iRet == RS_RET_SIZELIMITCMD_DIDNT_RESOLVE) - dbgprintf("file size limit cmd for file '%s' did no resolve situation\n", pszCurrFName); - else - dbgprintf("file size limit cmd for file '%s' failed with code %d.\n", pszCurrFName, iRet); + if(iRet == RS_RET_SIZELIMITCMD_DIDNT_RESOLVE) { + DBGPRINTF("file size limit cmd for file '%s' did no resolve situation\n", pszCurrFName); + } else { + DBGPRINTF("file size limit cmd for file '%s' failed with code %d.\n", pszCurrFName, iRet); + } pThis->bDisabled = 1; } @@ -585,12 +586,12 @@ static rsRetVal strmConstructFinalize(strm_t *pThis) } } -dbgprintf("TTT: before checks: iFlushInterval %d, bAsyncWrite %d\n", pThis->iFlushInterval, pThis->bAsyncWrite); +DBGPRINTF("TTT: before checks: iFlushInterval %d, bAsyncWrite %d\n", pThis->iFlushInterval, pThis->bAsyncWrite); /* if we have a flush interval, we need to do async writes in any case */ if(pThis->iFlushInterval != 0) { pThis->bAsyncWrite = 1; } -dbgprintf("TTT: after checks: iFlushInterval %d, bAsyncWrite %d\n", pThis->iFlushInterval, pThis->bAsyncWrite); +DBGPRINTF("TTT: after checks: iFlushInterval %d, bAsyncWrite %d\n", pThis->iFlushInterval, pThis->bAsyncWrite); /* if we work asynchronously, we need a couple of synchronization objects */ if(pThis->bAsyncWrite) { @@ -604,9 +605,8 @@ dbgprintf("TTT: after checks: iFlushInterval %d, bAsyncWrite %d\n", pThis->iFlus } //pThis->pIOBuf = pThis->ioBuf[0]; pThis->bStopWriter = 0; - // TODO: detached thread? if(pthread_create(&pThis->writerThreadID, NULL, asyncWriterThread, pThis) != 0) - dbgprintf("ERROR: stream %p cold not create writer thread\n", pThis); + DBGPRINTF("ERROR: stream %p cold not create writer thread\n", pThis); // TODO: remove that below later! CHKmalloc(pThis->pIOBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize)); } else { @@ -789,11 +789,11 @@ asyncWriterThread(void *pPtr) } fprintf(stderr, "async stream writer thread started\n");fflush(stderr); -dbgprintf("TTT: writer thread startup\n"); +DBGPRINTF("TTT: writer thread startup\n"); while(1) { /* loop broken inside */ d_pthread_mutex_lock(&pThis->mut); while(pThis->iCnt == 0) { -dbgprintf("TTT: writer thread empty queue, stopWriter=%d\n", pThis->bStopWriter); +DBGPRINTF("TTT: writer thread empty queue, stopWriter=%d\n", pThis->bStopWriter); if(pThis->bStopWriter) { pthread_cond_signal(&pThis->isEmpty); d_pthread_mutex_unlock(&pThis->mut); @@ -815,7 +815,7 @@ dbgprintf("TTT: writer thread empty queue, stopWriter=%d\n", pThis->bStopWriter) } finalize_it: -dbgprintf("TTT: writer thread shutdown\n"); +DBGPRINTF("TTT: writer thread shutdown\n"); ENDfunc return NULL; /* to keep pthreads happy */ } @@ -957,7 +957,7 @@ doZipWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) /* see note in file header for the params we use with deflateInit2() */ zRet = zlibw.DeflateInit2(&zstrm, pThis->iZipLevel, Z_DEFLATED, 31, 9, Z_DEFAULT_STRATEGY); if(zRet != Z_OK) { - dbgprintf("error %d returned from zlib/deflateInit2()\n", zRet); + DBGPRINTF("error %d returned from zlib/deflateInit2()\n", zRet); ABORT_FINALIZE(RS_RET_ZLIB_ERR); } @@ -967,11 +967,11 @@ doZipWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) /* run deflate() on input until output buffer not full, finish compression if all of source has been read in */ do { - dbgprintf("in deflate() loop, avail_in %d, total_in %ld\n", zstrm.avail_in, zstrm.total_in); + DBGPRINTF("in deflate() loop, avail_in %d, total_in %ld\n", zstrm.avail_in, zstrm.total_in); zstrm.avail_out = pThis->sIOBufSize; zstrm.next_out = pThis->pZipBuf; zRet = zlibw.Deflate(&zstrm, Z_FINISH); /* no bad return value */ - dbgprintf("after deflate, ret %d, avail_out %d\n", zRet, zstrm.avail_out); + DBGPRINTF("after deflate, ret %d, avail_out %d\n", zRet, zstrm.avail_out); assert(zRet != Z_STREAM_ERROR); /* state not clobbered */ CHKiRet(strmPhysWrite(pThis, (uchar*)pThis->pZipBuf, pThis->sIOBufSize - zstrm.avail_out)); } while (zstrm.avail_out == 0); @@ -980,7 +980,7 @@ doZipWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) zRet = zlibw.DeflateEnd(&zstrm); if(zRet != Z_OK) { - dbgprintf("error %d returned from zlib/deflateEnd()\n", zRet); + DBGPRINTF("error %d returned from zlib/deflateEnd()\n", zRet); ABORT_FINALIZE(RS_RET_ZLIB_ERR); } @@ -1122,7 +1122,7 @@ strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) ASSERT(pThis != NULL); ASSERT(pBuf != NULL); -dbgprintf("strmWrite(%p, '%65.65s', %ld);, disabled %d, sizelim %ld, size %lld\n", pThis, pBuf,lenBuf, pThis->bDisabled, pThis->iSizeLimit, pThis->iCurrOffs); +DBGPRINTF("strmWrite(%p, '%65.65s', %ld);, disabled %d, sizelim %ld, size %lld\n", pThis, pBuf,lenBuf, pThis->bDisabled, pThis->iSizeLimit, pThis->iCurrOffs); if(pThis->bDisabled) ABORT_FINALIZE(RS_RET_STREAM_DISABLED); diff --git a/runtime/wtp.c b/runtime/wtp.c index e37ebddf..ba1f94b3 100644 --- a/runtime/wtp.c +++ b/runtime/wtp.c @@ -53,6 +53,7 @@ #include "wtp.h" #include "wti.h" #include "obj.h" +#include "unicode-helper.h" #include "glbl.h" /* static data */ @@ -438,7 +439,7 @@ wtpWorker(void *arg) /* the arg is actually a wti object, even though we are in /* set thread name - we ignore if the call fails, has no harsh consequences... */ pszDbgHdr = wtpGetDbgHdr(pThis); - strncpy(thrdName+3, pszDbgHdr, 20); + ustrncpy(thrdName+3, pszDbgHdr, 20); if(prctl(PR_SET_NAME, thrdName, 0, 0, 0) != 0) { DBGPRINTF("prctl failed, not setting thread name for '%s'\n", wtpGetDbgHdr(pThis)); } -- cgit From 53b055b6aabd87fa096edf70a6e58eea6c87f38b Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 6 Jul 2009 19:44:53 +0200 Subject: moved zip part to writer thread ... this is necessary in preparation for the final solution (we need to have a "unified" writer). If it causes worse performance to have the zip writher togehter with the synchronous write, we may do an async write... --- runtime/stream.c | 158 +++++++++++++++++++++++++++++---------------------- tests/threadingmq.sh | 4 +- 2 files changed, 91 insertions(+), 71 deletions(-) diff --git a/runtime/stream.c b/runtime/stream.c index 2bb19caf..dfb43891 100644 --- a/runtime/stream.c +++ b/runtime/stream.c @@ -59,6 +59,8 @@ static rsRetVal strmFlush(strm_t *pThis); static rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf); static rsRetVal strmCloseFile(strm_t *pThis); static void *asyncWriterThread(void *pPtr); +static rsRetVal doZipWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf); +static rsRetVal strmPhysWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf); /* methods */ @@ -586,12 +588,10 @@ static rsRetVal strmConstructFinalize(strm_t *pThis) } } -DBGPRINTF("TTT: before checks: iFlushInterval %d, bAsyncWrite %d\n", pThis->iFlushInterval, pThis->bAsyncWrite); /* if we have a flush interval, we need to do async writes in any case */ if(pThis->iFlushInterval != 0) { pThis->bAsyncWrite = 1; } -DBGPRINTF("TTT: after checks: iFlushInterval %d, bAsyncWrite %d\n", pThis->iFlushInterval, pThis->bAsyncWrite); /* if we work asynchronously, we need a couple of synchronization objects */ if(pThis->bAsyncWrite) { @@ -770,7 +770,88 @@ finalize_it: RETiRet; } -#include + + +/* write memory buffer to a stream object. + * To support direct writes of large objects, this method may be called + * with a buffer pointing to some region other than the stream buffer itself. + * However, in that case the stream buffer must be empty (strmFlush() has to + * be called before), because we would otherwise mess up with the sequence + * inside the stream. -- rgerhards, 2008-01-10 + */ +static inline rsRetVal +doWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf) +{ + DEFiRet; + + ASSERT(pThis != NULL); + ASSERT(pBuf == pThis->pIOBuf || pThis->iBufPtr == 0); + + if(pThis->iZipLevel) { + CHKiRet(doZipWrite(pThis, pBuf, lenBuf)); + } else { + /* write without zipping */ + CHKiRet(strmPhysWrite(pThis, pBuf, lenBuf)); + } + +finalize_it: + RETiRet; +} + + +/* This function is called to "do" an async write call, what primarily means that + * the data is handed over to the writer thread (which will then do the actual write + * in parallel. -- rgerhards, 2009-07-06 + */ +static inline rsRetVal +doAsyncWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf) +{ + int iEnq; + DEFiRet; + ISOBJ_TYPE_assert(pThis, strm); + + d_pthread_mutex_lock(&pThis->mut); + while(pThis->iCnt >= STREAM_ASYNC_NUMBUFS) + d_pthread_cond_wait(&pThis->notFull, &pThis->mut); + + iEnq = pThis->iEnq++ % STREAM_ASYNC_NUMBUFS; + // TODO: optimize, memcopy only for getting it initially going! + //pThis->asyncBuf[iEnq].pBuf = pBuf; + memcpy(pThis->asyncBuf[iEnq].pBuf, pBuf, lenBuf); + pThis->asyncBuf[iEnq].lenBuf = lenBuf; + + if(++pThis->iCnt == 1) + pthread_cond_signal(&pThis->notEmpty); + d_pthread_mutex_unlock(&pThis->mut); + +finalize_it: + RETiRet; +} + + +/* schedule writing to the stream. Depending on our concurrency settings, + * this either directly writes to the stream or schedules writing via + * the background thread. -- rgerhards, 2009-07-07 + */ +static rsRetVal +strmSchedWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) +{ + DEFiRet; + + ASSERT(pThis != NULL); + ASSERT(pBuf == pThis->pIOBuf || pThis->iBufPtr == 0); + + if(pThis->bAsyncWrite) { + CHKiRet(doAsyncWriteInternal(pThis, pBuf, lenBuf)); + } else { + CHKiRet(doWriteInternal(pThis, pBuf, lenBuf)); + } + +finalize_it: + RETiRet; +} + + /* This is the writer thread for asynchronous mode. * -- rgerhards, 2009-07-06 @@ -788,7 +869,6 @@ asyncWriterThread(void *pPtr) DBGPRINTF("prctl failed, not setting thread name for '%s'\n", "stream writer"); } -fprintf(stderr, "async stream writer thread started\n");fflush(stderr); DBGPRINTF("TTT: writer thread startup\n"); while(1) { /* loop broken inside */ d_pthread_mutex_lock(&pThis->mut); @@ -804,7 +884,8 @@ DBGPRINTF("TTT: writer thread empty queue, stopWriter=%d\n", pThis->bStopWriter) iDeq = pThis->iDeq++ % STREAM_ASYNC_NUMBUFS; iWritten = pThis->asyncBuf[iDeq].lenBuf; - doWriteCall(pThis, pThis->asyncBuf[iDeq].pBuf, &iWritten); + doWriteInternal(pThis, pThis->asyncBuf[iDeq].pBuf, iWritten); + // doWriteCall(pThis, pThis->asyncBuf[iDeq].pBuf, &iWritten); // TODO: error check!!!!! 2009-07-06 --pThis->iCnt; @@ -821,36 +902,6 @@ DBGPRINTF("TTT: writer thread shutdown\n"); } -/* This function is called to "do" an async write call, what primarily means that - * the data is handed over to the writer thread (which will then do the actual write - * in parallel. -- rgerhards, 2009-07-06 - */ -static inline rsRetVal -doAsyncWriteCall(strm_t *pThis, uchar *pBuf, size_t *pLenBuf) -{ - int iEnq; - DEFiRet; - ISOBJ_TYPE_assert(pThis, strm); - - d_pthread_mutex_lock(&pThis->mut); - while(pThis->iCnt >= STREAM_ASYNC_NUMBUFS) - d_pthread_cond_wait(&pThis->notFull, &pThis->mut); - - iEnq = pThis->iEnq++ % STREAM_ASYNC_NUMBUFS; - // TODO: optimize, memcopy only for getting it initially going! - //pThis->asyncBuf[iEnq].pBuf = pBuf; - memcpy(pThis->asyncBuf[iEnq].pBuf, pBuf, *pLenBuf); - pThis->asyncBuf[iEnq].lenBuf = *pLenBuf; - - if(++pThis->iCnt == 1) - pthread_cond_signal(&pThis->notEmpty); - d_pthread_mutex_unlock(&pThis->mut); - -finalize_it: - RETiRet; -} - - /* sync the file to disk, so that any unwritten data is persisted. This * also syncs the directory and thus makes sure that the file survives * fatal failure. Note that we do NOT return an error status if the @@ -903,11 +954,7 @@ strmPhysWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) CHKiRet(strmOpenFile(pThis)); iWritten = lenBuf; - if(pThis->bAsyncWrite) { - CHKiRet(doAsyncWriteCall(pThis, pBuf, &iWritten)); - } else { - CHKiRet(doWriteCall(pThis, pBuf, &iWritten)); - } + CHKiRet(doWriteCall(pThis, pBuf, &iWritten)); pThis->iBufPtr = 0; pThis->iCurrOffs += iWritten; @@ -989,33 +1036,6 @@ finalize_it: } -/* write memory buffer to a stream object. - * To support direct writes of large objects, this method may be called - * with a buffer pointing to some region other than the stream buffer itself. - * However, in that case the stream buffer must be empty (strmFlush() has to - * be called before), because we would otherwise mess up with the sequence - * inside the stream. -- rgerhards, 2008-01-10 - */ -static rsRetVal -strmWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf) -{ - DEFiRet; - - ASSERT(pThis != NULL); - ASSERT(pBuf == pThis->pIOBuf || pThis->iBufPtr == 0); - - if(pThis->iZipLevel) { - CHKiRet(doZipWrite(pThis, pBuf, lenBuf)); - } else { - /* write without zipping */ - CHKiRet(strmPhysWrite(pThis, pBuf, lenBuf)); - } - -finalize_it: - RETiRet; -} - - /* flush stream output buffer to persistent storage. This can be called at any time * and is automatically called when the output buffer is full. * rgerhards, 2008-01-10 @@ -1029,7 +1049,7 @@ strmFlush(strm_t *pThis) dbgoprint((obj_t*) pThis, "file %d flush, buflen %ld\n", pThis->fd, (long) pThis->iBufPtr); if(pThis->tOperationsMode != STREAMMODE_READ && pThis->iBufPtr > 0) { - iRet = strmWriteInternal(pThis, pThis->pIOBuf, pThis->iBufPtr); + iRet = strmSchedWrite(pThis, pThis->pIOBuf, pThis->iBufPtr); } RETiRet; @@ -1132,7 +1152,7 @@ DBGPRINTF("strmWrite(%p, '%65.65s', %ld);, disabled %d, sizelim %ld, size %lld\n * TODO: is it really? think about disk block sizes! */ CHKiRet(strmFlush(pThis)); /* we need to flush first!!! */ - CHKiRet(strmWriteInternal(pThis, pBuf, lenBuf)); + CHKiRet(strmSchedWrite(pThis, pBuf, lenBuf)); } else { /* data fits into a buffer - we just need to see if it * fits into the current buffer... diff --git a/tests/threadingmq.sh b/tests/threadingmq.sh index 3680df5f..5c29ec60 100755 --- a/tests/threadingmq.sh +++ b/tests/threadingmq.sh @@ -9,7 +9,7 @@ echo TEST: threadingmq.sh - main queue concurrency source $srcdir/diag.sh init source $srcdir/diag.sh startup threadingmq.conf -source $srcdir/diag.sh tcpflood 127.0.0.1 13514 2 10000000 +source $srcdir/diag.sh tcpflood 127.0.0.1 13514 2 100000 source $srcdir/diag.sh shutdown-when-empty # shut down rsyslogd when done processing messages -source $srcdir/diag.sh seq-check 0 9999999 +source $srcdir/diag.sh seq-check 0 99999 source $srcdir/diag.sh exit -- cgit From f27b5cc2535f0b1763e1963304546b611cd3c26a Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 7 Jul 2009 08:08:22 +0200 Subject: moved locking primitives --- runtime/stream.c | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/runtime/stream.c b/runtime/stream.c index dfb43891..d25f8b02 100644 --- a/runtime/stream.c +++ b/runtime/stream.c @@ -801,7 +801,8 @@ finalize_it: /* This function is called to "do" an async write call, what primarily means that * the data is handed over to the writer thread (which will then do the actual write - * in parallel. -- rgerhards, 2009-07-06 + * in parallel). Note that the stream mutex has already been locked by the + * strmWrite...() calls. -- rgerhards, 2009-07-06 */ static inline rsRetVal doAsyncWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf) @@ -810,7 +811,6 @@ doAsyncWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf) DEFiRet; ISOBJ_TYPE_assert(pThis, strm); - d_pthread_mutex_lock(&pThis->mut); while(pThis->iCnt >= STREAM_ASYNC_NUMBUFS) d_pthread_cond_wait(&pThis->notFull, &pThis->mut); @@ -1102,6 +1102,9 @@ static rsRetVal strmWriteChar(strm_t *pThis, uchar c) ASSERT(pThis != NULL); + if(pThis->bAsyncWrite) + d_pthread_mutex_lock(&pThis->mut); + /* if the buffer is full, we need to flush before we can write */ if(pThis->iBufPtr == pThis->sIOBufSize) { CHKiRet(strmFlush(pThis)); @@ -1111,11 +1114,18 @@ static rsRetVal strmWriteChar(strm_t *pThis, uchar c) pThis->iBufPtr++; finalize_it: + if(pThis->bAsyncWrite) + d_pthread_mutex_unlock(&pThis->mut); + RETiRet; } -/* write an integer value (actually a long) to a stream object */ +/* write an integer value (actually a long) to a stream object + * Note that we do not need to lock the mutex here, because we call + * strmWrite(), which does the lock (aka: we must not lock it, else we + * would run into a recursive lock, resulting in a deadlock!) + */ static rsRetVal strmWriteLong(strm_t *pThis, long i) { DEFiRet; @@ -1143,6 +1153,9 @@ strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) ASSERT(pBuf != NULL); DBGPRINTF("strmWrite(%p, '%65.65s', %ld);, disabled %d, sizelim %ld, size %lld\n", pThis, pBuf,lenBuf, pThis->bDisabled, pThis->iSizeLimit, pThis->iCurrOffs); + if(pThis->bAsyncWrite) + d_pthread_mutex_lock(&pThis->mut); + if(pThis->bDisabled) ABORT_FINALIZE(RS_RET_STREAM_DISABLED); @@ -1175,6 +1188,9 @@ DBGPRINTF("strmWrite(%p, '%65.65s', %ld);, disabled %d, sizelim %ld, size %lld\n } finalize_it: + if(pThis->bAsyncWrite) + d_pthread_mutex_unlock(&pThis->mut); + RETiRet; } -- cgit From 4e9deb5b88129a397d23b55e3954c6f1c259e466 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 7 Jul 2009 08:33:22 +0200 Subject: stream now uses a singular buffer strucuture for writing --- runtime/stream.c | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/runtime/stream.c b/runtime/stream.c index d25f8b02..6924d527 100644 --- a/runtime/stream.c +++ b/runtime/stream.c @@ -603,12 +603,12 @@ static rsRetVal strmConstructFinalize(strm_t *pThis) for(i = 0 ; i < STREAM_ASYNC_NUMBUFS ; ++i) { CHKmalloc(pThis->asyncBuf[i].pBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize)); } - //pThis->pIOBuf = pThis->ioBuf[0]; + pThis->pIOBuf = pThis->asyncBuf[0].pBuf; pThis->bStopWriter = 0; if(pthread_create(&pThis->writerThreadID, NULL, asyncWriterThread, pThis) != 0) DBGPRINTF("ERROR: stream %p cold not create writer thread\n", pThis); // TODO: remove that below later! - CHKmalloc(pThis->pIOBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize)); + //CHKmalloc(pThis->pIOBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize)); } else { /* we work synchronously, so we need to alloc a fixed pIOBuf */ CHKmalloc(pThis->pIOBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize)); @@ -802,7 +802,10 @@ finalize_it: /* This function is called to "do" an async write call, what primarily means that * the data is handed over to the writer thread (which will then do the actual write * in parallel). Note that the stream mutex has already been locked by the - * strmWrite...() calls. -- rgerhards, 2009-07-06 + * strmWrite...() calls. Also note that we always have only a single producer, + * so we can simply serially assign the next free buffer to it and be sure that + * the very some producer comes back in sequence to submit the then-filled buffers. + * This also enables us to timout on partially written buffers. -- rgerhards, 2009-07-06 */ static inline rsRetVal doAsyncWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf) @@ -814,11 +817,12 @@ doAsyncWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf) while(pThis->iCnt >= STREAM_ASYNC_NUMBUFS) d_pthread_cond_wait(&pThis->notFull, &pThis->mut); - iEnq = pThis->iEnq++ % STREAM_ASYNC_NUMBUFS; + //iEnq = pThis->iEnq++ % STREAM_ASYNC_NUMBUFS; // TODO: optimize, memcopy only for getting it initially going! //pThis->asyncBuf[iEnq].pBuf = pBuf; - memcpy(pThis->asyncBuf[iEnq].pBuf, pBuf, lenBuf); - pThis->asyncBuf[iEnq].lenBuf = lenBuf; + //memcpy(pThis->asyncBuf[iEnq].pBuf, pBuf, lenBuf); + pThis->asyncBuf[pThis->iEnq % STREAM_ASYNC_NUMBUFS].lenBuf = lenBuf; + pThis->pIOBuf = pThis->asyncBuf[pThis->iEnq++ % STREAM_ASYNC_NUMBUFS].pBuf; if(++pThis->iCnt == 1) pthread_cond_signal(&pThis->notEmpty); @@ -1164,6 +1168,8 @@ DBGPRINTF("strmWrite(%p, '%65.65s', %ld);, disabled %d, sizelim %ld, size %lld\n /* it is - so we do a direct write, that is most efficient. * TODO: is it really? think about disk block sizes! */ +printf("error: we should not have reached this code!\n"); + abort(); CHKiRet(strmFlush(pThis)); /* we need to flush first!!! */ CHKiRet(strmSchedWrite(pThis, pBuf, lenBuf)); } else { -- cgit From 53aa68fc7d78279ab1af88de253a6134fc7ef00d Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 7 Jul 2009 09:16:14 +0200 Subject: clean solution for "writing" arbrietary-size user buffers to a stream --- runtime/stream.c | 70 +++++++++++++++++++++++--------------------------------- 1 file changed, 28 insertions(+), 42 deletions(-) diff --git a/runtime/stream.c b/runtime/stream.c index 6924d527..92122215 100644 --- a/runtime/stream.c +++ b/runtime/stream.c @@ -607,8 +607,6 @@ static rsRetVal strmConstructFinalize(strm_t *pThis) pThis->bStopWriter = 0; if(pthread_create(&pThis->writerThreadID, NULL, asyncWriterThread, pThis) != 0) DBGPRINTF("ERROR: stream %p cold not create writer thread\n", pThis); - // TODO: remove that below later! - //CHKmalloc(pThis->pIOBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize)); } else { /* we work synchronously, so we need to alloc a fixed pIOBuf */ CHKmalloc(pThis->pIOBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize)); @@ -808,19 +806,14 @@ finalize_it: * This also enables us to timout on partially written buffers. -- rgerhards, 2009-07-06 */ static inline rsRetVal -doAsyncWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf) +doAsyncWriteInternal(strm_t *pThis, size_t lenBuf) { - int iEnq; DEFiRet; ISOBJ_TYPE_assert(pThis, strm); while(pThis->iCnt >= STREAM_ASYNC_NUMBUFS) d_pthread_cond_wait(&pThis->notFull, &pThis->mut); - //iEnq = pThis->iEnq++ % STREAM_ASYNC_NUMBUFS; - // TODO: optimize, memcopy only for getting it initially going! - //pThis->asyncBuf[iEnq].pBuf = pBuf; - //memcpy(pThis->asyncBuf[iEnq].pBuf, pBuf, lenBuf); pThis->asyncBuf[pThis->iEnq % STREAM_ASYNC_NUMBUFS].lenBuf = lenBuf; pThis->pIOBuf = pThis->asyncBuf[pThis->iEnq++ % STREAM_ASYNC_NUMBUFS].pBuf; @@ -846,7 +839,7 @@ strmSchedWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) ASSERT(pBuf == pThis->pIOBuf || pThis->iBufPtr == 0); if(pThis->bAsyncWrite) { - CHKiRet(doAsyncWriteInternal(pThis, pBuf, lenBuf)); + CHKiRet(doAsyncWriteInternal(pThis, lenBuf)); } else { CHKiRet(doWriteInternal(pThis, pBuf, lenBuf)); } @@ -873,11 +866,9 @@ asyncWriterThread(void *pPtr) DBGPRINTF("prctl failed, not setting thread name for '%s'\n", "stream writer"); } -DBGPRINTF("TTT: writer thread startup\n"); while(1) { /* loop broken inside */ d_pthread_mutex_lock(&pThis->mut); while(pThis->iCnt == 0) { -DBGPRINTF("TTT: writer thread empty queue, stopWriter=%d\n", pThis->bStopWriter); if(pThis->bStopWriter) { pthread_cond_signal(&pThis->isEmpty); d_pthread_mutex_unlock(&pThis->mut); @@ -889,7 +880,6 @@ DBGPRINTF("TTT: writer thread empty queue, stopWriter=%d\n", pThis->bStopWriter) iDeq = pThis->iDeq++ % STREAM_ASYNC_NUMBUFS; iWritten = pThis->asyncBuf[iDeq].lenBuf; doWriteInternal(pThis, pThis->asyncBuf[iDeq].pBuf, iWritten); - // doWriteCall(pThis, pThis->asyncBuf[iDeq].pBuf, &iWritten); // TODO: error check!!!!! 2009-07-06 --pThis->iCnt; @@ -900,7 +890,6 @@ DBGPRINTF("TTT: writer thread empty queue, stopWriter=%d\n", pThis->bStopWriter) } finalize_it: -DBGPRINTF("TTT: writer thread shutdown\n"); ENDfunc return NULL; /* to keep pthreads happy */ } @@ -1109,6 +1098,9 @@ static rsRetVal strmWriteChar(strm_t *pThis, uchar c) if(pThis->bAsyncWrite) d_pthread_mutex_lock(&pThis->mut); + if(pThis->bDisabled) + ABORT_FINALIZE(RS_RET_STREAM_DISABLED); + /* if the buffer is full, we need to flush before we can write */ if(pThis->iBufPtr == pThis->sIOBufSize) { CHKiRet(strmFlush(pThis)); @@ -1145,13 +1137,23 @@ finalize_it: } -/* write memory buffer to a stream object +/* write memory buffer to a stream object. + * process the data in chunks and copy it over to our buffer. The caller-provided data + * may theoritically be larger than our buffer. In that case, we do multiple copies. One + * may argue if it were more efficient to write out the caller-provided buffer in that case + * and earlier versions of rsyslog did this. However, this introduces a lot of complexity + * inside the buffered writer and potential performance bottlenecks when trying to solve + * it. Now keep in mind that we actually do (almost?) never have a case where the + * caller-provided buffer is larger than our one. So instead of optimizing a case + * which normally does not exist, we expect some degradation in its case but make us + * perform better in the regular cases. -- rgerhards, 2009-07-07 */ static rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) { DEFiRet; - size_t iPartial; + size_t iWrite; + size_t iOffset; ASSERT(pThis != NULL); ASSERT(pBuf != NULL); @@ -1163,35 +1165,19 @@ DBGPRINTF("strmWrite(%p, '%65.65s', %ld);, disabled %d, sizelim %ld, size %lld\n if(pThis->bDisabled) ABORT_FINALIZE(RS_RET_STREAM_DISABLED); - /* check if the to-be-written data is larger than our buffer size */ - if(lenBuf >= pThis->sIOBufSize) { - /* it is - so we do a direct write, that is most efficient. - * TODO: is it really? think about disk block sizes! - */ -printf("error: we should not have reached this code!\n"); - abort(); - CHKiRet(strmFlush(pThis)); /* we need to flush first!!! */ - CHKiRet(strmSchedWrite(pThis, pBuf, lenBuf)); - } else { - /* data fits into a buffer - we just need to see if it - * fits into the current buffer... - */ - if(pThis->iBufPtr + lenBuf > pThis->sIOBufSize) { - /* nope, so we must split it */ - iPartial = pThis->sIOBufSize - pThis->iBufPtr; /* this fits in current buf */ - if(iPartial > 0) { /* the buffer was exactly full, can not write anything! */ - memcpy(pThis->pIOBuf + pThis->iBufPtr, pBuf, iPartial); - pThis->iBufPtr += iPartial; - } + iOffset = 0; + do { + if(pThis->iBufPtr == pThis->sIOBufSize) { CHKiRet(strmFlush(pThis)); /* get a new buffer for rest of data */ - memcpy(pThis->pIOBuf, pBuf + iPartial, lenBuf - iPartial); - pThis->iBufPtr = lenBuf - iPartial; - } else { - /* we have space, so we simply copy over the string */ - memcpy(pThis->pIOBuf + pThis->iBufPtr, pBuf, lenBuf); - pThis->iBufPtr += lenBuf; } - } + iWrite = pThis->sIOBufSize - pThis->iBufPtr; /* this fits in current buf */ + if(iWrite > lenBuf) + iWrite = lenBuf; + memcpy(pThis->pIOBuf + pThis->iBufPtr, pBuf + iOffset, iWrite); + pThis->iBufPtr += iWrite; + iOffset += iWrite; + lenBuf -= iWrite; + } while(lenBuf > 0); finalize_it: if(pThis->bAsyncWrite) -- cgit From f53aa966e1fad03c478de342f5a878e57405de13 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 7 Jul 2009 12:09:41 +0200 Subject: solved a race condition --- runtime/debug.c | 2 ++ runtime/debug.h | 1 - runtime/stream.c | 63 +++++++++++++++++++++++++++++++++++++++++++------------- tests/diag.sh | 2 +- 4 files changed, 52 insertions(+), 16 deletions(-) diff --git a/runtime/debug.c b/runtime/debug.c index 4ee90226..ded1c218 100644 --- a/runtime/debug.c +++ b/runtime/debug.c @@ -732,6 +732,8 @@ static void dbgGetThrdName(char *pszBuf, size_t lenBuf, pthread_t thrd, int bInc */ void dbgSetThrdName(uchar *pszName) { +return; + dbgThrdInfo_t *pThrd = dbgGetThrdInfo(); if(pThrd->pszThrdName != NULL) free(pThrd->pszThrdName); diff --git a/runtime/debug.h b/runtime/debug.h index 8b66d784..dcbfb930 100644 --- a/runtime/debug.h +++ b/runtime/debug.h @@ -135,7 +135,6 @@ void dbgPrintAllDebugInfo(void); /* debug aides */ #ifdef RTINST -//#if 0 // temporarily removed for helgrind #define d_pthread_mutex_lock(x) dbgMutexLock(x, pdbgFuncDB, __LINE__, dbgCALLStaCK_POP_POINT ) #define d_pthread_mutex_trylock(x) dbgMutexTryLock(x, pdbgFuncDB, __LINE__, dbgCALLStaCK_POP_POINT ) #define d_pthread_mutex_unlock(x) dbgMutexUnlock(x, pdbgFuncDB, __LINE__, dbgCALLStaCK_POP_POINT ) diff --git a/runtime/stream.c b/runtime/stream.c index 92122215..eae36796 100644 --- a/runtime/stream.c +++ b/runtime/stream.c @@ -50,6 +50,8 @@ #include "module-template.h" #include +#define inline + /* static data */ DEFobjStaticHelpers DEFobjCurrIf(zlibw) @@ -267,6 +269,21 @@ finalize_it: } +/* wait for the output writer thread to be done. This must be called before actions + * that require data to be persisted. May be called in non-async mode and is a null + * operation than. Must be called with the mutex locked. + */ +static inline void +strmWaitAsyncWriterDone(strm_t *pThis) +{ + if(pThis->bAsyncWrite) { + /* awake writer thread and make it write out everything */ + pthread_cond_signal(&pThis->notEmpty); + d_pthread_cond_wait(&pThis->isEmpty, &pThis->mut); + } +} + + /* close a strm file * Note that the bDeleteOnClose flag is honored. If it is set, the file will be * deleted after close. This is in support for the qRead thread. @@ -282,6 +299,8 @@ static rsRetVal strmCloseFile(strm_t *pThis) if(pThis->tOperationsMode != STREAMMODE_READ) strmFlush(pThis); + strmWaitAsyncWriterDone(pThis); + close(pThis->fd); pThis->fd = -1; @@ -618,16 +637,14 @@ finalize_it: /* stop the writer thread (we MUST be runnnig asynchronously when this method - * is called!) -- rgerhards, 2009-07-06 + * is called!). Note that the mutex must be locked! -- rgerhards, 2009-07-06 */ static inline void stopWriter(strm_t *pThis) { BEGINfunc - d_pthread_mutex_lock(&pThis->mut); pThis->bStopWriter = 1; pthread_cond_signal(&pThis->notEmpty); - d_pthread_cond_wait(&pThis->isEmpty, &pThis->mut); d_pthread_mutex_unlock(&pThis->mut); pthread_join(pThis->writerThreadID, NULL); ENDfunc @@ -638,9 +655,14 @@ stopWriter(strm_t *pThis) BEGINobjDestruct(strm) /* be sure to specify the object type also in END and CODESTART macros! */ int i; CODESTARTobjDestruct(strm) + if(pThis->bAsyncWrite) + /* Note: mutex will be unlocked in stopWriter! */ + d_pthread_mutex_lock(&pThis->mut); + if(pThis->tOperationsMode != STREAMMODE_READ) strmFlush(pThis); +dbgprintf("XXX: destruct stream %p\n", pThis); /* ... then free resources */ if(pThis->fd != -1) strmCloseFile(pThis); @@ -681,6 +703,9 @@ static rsRetVal strmCheckNextOutputFile(strm_t *pThis) if(pThis->fd == -1) FINALIZE; + /* wait for output to be empty, so that our counts are correct */ + strmWaitAsyncWriterDone(pThis); + if(pThis->iCurrOffs >= pThis->iMaxFileSize) { dbgoprint((obj_t*) pThis, "max file size %ld reached for %d, now %ld - starting new file\n", (long) pThis->iMaxFileSize, pThis->fd, (long) pThis->iCurrOffs); @@ -783,7 +808,6 @@ doWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf) DEFiRet; ASSERT(pThis != NULL); - ASSERT(pBuf == pThis->pIOBuf || pThis->iBufPtr == 0); if(pThis->iZipLevel) { CHKiRet(doZipWrite(pThis, pBuf, lenBuf)); @@ -811,15 +835,15 @@ doAsyncWriteInternal(strm_t *pThis, size_t lenBuf) DEFiRet; ISOBJ_TYPE_assert(pThis, strm); +dbgprintf("XXX: doAsyncWriteInternal: strm %p, len %ld\n", pThis, (long) lenBuf); while(pThis->iCnt >= STREAM_ASYNC_NUMBUFS) d_pthread_cond_wait(&pThis->notFull, &pThis->mut); pThis->asyncBuf[pThis->iEnq % STREAM_ASYNC_NUMBUFS].lenBuf = lenBuf; - pThis->pIOBuf = pThis->asyncBuf[pThis->iEnq++ % STREAM_ASYNC_NUMBUFS].pBuf; + pThis->pIOBuf = pThis->asyncBuf[++pThis->iEnq % STREAM_ASYNC_NUMBUFS].pBuf; if(++pThis->iCnt == 1) pthread_cond_signal(&pThis->notEmpty); - d_pthread_mutex_unlock(&pThis->mut); finalize_it: RETiRet; @@ -836,7 +860,6 @@ strmSchedWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) DEFiRet; ASSERT(pThis != NULL); - ASSERT(pBuf == pThis->pIOBuf || pThis->iBufPtr == 0); if(pThis->bAsyncWrite) { CHKiRet(doAsyncWriteInternal(pThis, lenBuf)); @@ -844,6 +867,8 @@ strmSchedWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) CHKiRet(doWriteInternal(pThis, pBuf, lenBuf)); } + pThis->iBufPtr = 0; /* we are at the begin of a new buffer */ + finalize_it: RETiRet; } @@ -857,7 +882,6 @@ static void* asyncWriterThread(void *pPtr) { int iDeq; - size_t iWritten; strm_t *pThis = (strm_t*) pPtr; ISOBJ_TYPE_assert(pThis, strm); @@ -870,7 +894,7 @@ asyncWriterThread(void *pPtr) d_pthread_mutex_lock(&pThis->mut); while(pThis->iCnt == 0) { if(pThis->bStopWriter) { - pthread_cond_signal(&pThis->isEmpty); + pthread_cond_broadcast(&pThis->isEmpty); d_pthread_mutex_unlock(&pThis->mut); goto finalize_it; /* break main loop */ } @@ -878,13 +902,14 @@ asyncWriterThread(void *pPtr) } iDeq = pThis->iDeq++ % STREAM_ASYNC_NUMBUFS; - iWritten = pThis->asyncBuf[iDeq].lenBuf; - doWriteInternal(pThis, pThis->asyncBuf[iDeq].pBuf, iWritten); - // TODO: error check!!!!! 2009-07-06 + doWriteInternal(pThis, pThis->asyncBuf[iDeq].pBuf, pThis->asyncBuf[iDeq].lenBuf); + // TODO: error check????? 2009-07-06 --pThis->iCnt; if(pThis->iCnt < STREAM_ASYNC_NUMBUFS) { pthread_cond_signal(&pThis->notFull); + if(pThis->iCnt == 0) + pthread_cond_broadcast(&pThis->isEmpty); } d_pthread_mutex_unlock(&pThis->mut); } @@ -949,7 +974,6 @@ strmPhysWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) iWritten = lenBuf; CHKiRet(doWriteCall(pThis, pBuf, &iWritten)); - pThis->iBufPtr = 0; pThis->iCurrOffs += iWritten; /* update user counter, if provided */ if(pThis->pUsrWCntr != NULL) @@ -1158,7 +1182,7 @@ strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) ASSERT(pThis != NULL); ASSERT(pBuf != NULL); -DBGPRINTF("strmWrite(%p, '%65.65s', %ld);, disabled %d, sizelim %ld, size %lld\n", pThis, pBuf,lenBuf, pThis->bDisabled, pThis->iSizeLimit, pThis->iCurrOffs); +//DBGPRINTF("strmWrite(%p, '%65.65s', %ld);, disabled %d, sizelim %ld, size %lld\n", pThis, pBuf,lenBuf, pThis->bDisabled, pThis->iSizeLimit, pThis->iCurrOffs); if(pThis->bAsyncWrite) d_pthread_mutex_lock(&pThis->mut); @@ -1167,9 +1191,11 @@ DBGPRINTF("strmWrite(%p, '%65.65s', %ld);, disabled %d, sizelim %ld, size %lld\n iOffset = 0; do { +//dbgprintf("XXX: enter write loop\n"); if(pThis->iBufPtr == pThis->sIOBufSize) { CHKiRet(strmFlush(pThis)); /* get a new buffer for rest of data */ } +//dbgprintf("XXX: post strmFlush\n"); iWrite = pThis->sIOBufSize - pThis->iBufPtr; /* this fits in current buf */ if(iWrite > lenBuf) iWrite = lenBuf; @@ -1177,8 +1203,17 @@ DBGPRINTF("strmWrite(%p, '%65.65s', %ld);, disabled %d, sizelim %ld, size %lld\n pThis->iBufPtr += iWrite; iOffset += iWrite; lenBuf -= iWrite; +//dbgprintf("XXX: after write , iBufPtr %d, iOffset %d, len %d, unwritten: '%20.20s'\n", pThis->iBufPtr, iOffset, lenBuf, pBuf + iOffset); } while(lenBuf > 0); + /* now check if the buffer right at the end of the write is full and, if so, + * write it. This seems more natural than waiting (hours?) for the next message... + */ + if(pThis->iBufPtr == pThis->sIOBufSize) { + CHKiRet(strmFlush(pThis)); /* get a new buffer for rest of data */ + } +//dbgprintf("XXX: done write loop, iBufPtr %d, iOffset %d, len %d\n", pThis->iBufPtr, iOffset, lenBuf); + finalize_it: if(pThis->bAsyncWrite) d_pthread_mutex_unlock(&pThis->mut); diff --git a/tests/diag.sh b/tests/diag.sh index 299c5d71..1514474c 100755 --- a/tests/diag.sh +++ b/tests/diag.sh @@ -5,7 +5,7 @@ # not always able to convey back states to the upper-level test driver # begun 2009-05-27 by rgerhards # This file is part of the rsyslog project, released under GPLv3 -#valgrind="valgrind --log-fd=1" +valgrind="valgrind --log-fd=1" #valgrind="valgrind --tool=drd --log-fd=1" #valgrind="valgrind --tool=helgrind --log-fd=1" #set -o xtrace -- cgit From 26227091faac8c3cc9bc282eb4e4fc408635f8d2 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 7 Jul 2009 17:18:51 +0200 Subject: fixed a bug introduced today that lead to an abort in queue disk mode --- runtime/debug.c | 2 +- runtime/stream.c | 21 ++++++++++++--------- runtime/stream.h | 1 + tests/diag.sh | 2 +- 4 files changed, 15 insertions(+), 11 deletions(-) diff --git a/runtime/debug.c b/runtime/debug.c index ded1c218..807fd3f7 100644 --- a/runtime/debug.c +++ b/runtime/debug.c @@ -778,7 +778,7 @@ static void dbgCallStackPrint(dbgThrdInfo_t *pThrd) /* print all threads call stacks */ -static void dbgCallStackPrintAll(void) +void dbgCallStackPrintAll(void) { dbgThrdInfo_t *pThrd; /* stack info */ diff --git a/runtime/stream.c b/runtime/stream.c index eae36796..9f363257 100644 --- a/runtime/stream.c +++ b/runtime/stream.c @@ -276,11 +276,13 @@ finalize_it: static inline void strmWaitAsyncWriterDone(strm_t *pThis) { + BEGINfunc if(pThis->bAsyncWrite) { /* awake writer thread and make it write out everything */ pthread_cond_signal(&pThis->notEmpty); d_pthread_cond_wait(&pThis->isEmpty, &pThis->mut); } + ENDfunc } @@ -296,10 +298,16 @@ static rsRetVal strmCloseFile(strm_t *pThis) ASSERT(pThis->fd != -1); dbgoprint((obj_t*) pThis, "file %d closing\n", pThis->fd); - if(pThis->tOperationsMode != STREAMMODE_READ) - strmFlush(pThis); - - strmWaitAsyncWriterDone(pThis); + dbgCallStackPrintAll(); + if(!pThis->bInClose && pThis->tOperationsMode != STREAMMODE_READ) { + pThis->bInClose = 1; + if(pThis->bAsyncWrite) { + strmFlush(pThis); + } else { + strmWaitAsyncWriterDone(pThis); + } + pThis->bInClose = 0; + } close(pThis->fd); pThis->fd = -1; @@ -796,11 +804,6 @@ finalize_it: /* write memory buffer to a stream object. - * To support direct writes of large objects, this method may be called - * with a buffer pointing to some region other than the stream buffer itself. - * However, in that case the stream buffer must be empty (strmFlush() has to - * be called before), because we would otherwise mess up with the sequence - * inside the stream. -- rgerhards, 2008-01-10 */ static inline rsRetVal doWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf) diff --git a/runtime/stream.h b/runtime/stream.h index 2c1ac255..1efd29b5 100644 --- a/runtime/stream.h +++ b/runtime/stream.h @@ -118,6 +118,7 @@ typedef struct strm_s { size_t iBufPtr; /* pointer into current buffer */ int iUngetC; /* char set via UngetChar() call or -1 if none set */ bool bInRecord; /* if 1, indicates that we are currently writing a not-yet complete record */ + bool bInClose; /* used to break "deadly close loops", tells us we are already inside a close */ int iZipLevel; /* zip level (0..9). If 0, zip is completely disabled */ Bytef *pZipBuf; /* support for async flush procesing */ diff --git a/tests/diag.sh b/tests/diag.sh index 1514474c..299c5d71 100755 --- a/tests/diag.sh +++ b/tests/diag.sh @@ -5,7 +5,7 @@ # not always able to convey back states to the upper-level test driver # begun 2009-05-27 by rgerhards # This file is part of the rsyslog project, released under GPLv3 -valgrind="valgrind --log-fd=1" +#valgrind="valgrind --log-fd=1" #valgrind="valgrind --tool=drd --log-fd=1" #valgrind="valgrind --tool=helgrind --log-fd=1" #set -o xtrace -- cgit From 5221a1e42e16c8c39b48a4a1a18ee6322c38cd17 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 7 Jul 2009 18:33:00 +0200 Subject: added capability to write incomplete buffers after an inactivity timeout for the stream class and thus finally activating omfile's timeout capability in a useful way without polling and too-high performance overhead. --- runtime/stream.c | 46 +++++++++++++++++++++++++++++++++++++++------- runtime/stream.h | 1 + 2 files changed, 40 insertions(+), 7 deletions(-) diff --git a/runtime/stream.c b/runtime/stream.c index 9f363257..a0571a61 100644 --- a/runtime/stream.c +++ b/runtime/stream.c @@ -298,7 +298,6 @@ static rsRetVal strmCloseFile(strm_t *pThis) ASSERT(pThis->fd != -1); dbgoprint((obj_t*) pThis, "file %d closing\n", pThis->fd); - dbgCallStackPrintAll(); if(!pThis->bInClose && pThis->tOperationsMode != STREAMMODE_READ) { pThis->bInClose = 1; if(pThis->bAsyncWrite) { @@ -845,6 +844,7 @@ dbgprintf("XXX: doAsyncWriteInternal: strm %p, len %ld\n", pThis, (long) lenBuf) pThis->asyncBuf[pThis->iEnq % STREAM_ASYNC_NUMBUFS].lenBuf = lenBuf; pThis->pIOBuf = pThis->asyncBuf[++pThis->iEnq % STREAM_ASYNC_NUMBUFS].pBuf; + pThis->bDoTimedWait = 0; /* everything written, no need to timeout partial buffer writes */ if(++pThis->iCnt == 1) pthread_cond_signal(&pThis->notEmpty); @@ -885,6 +885,8 @@ static void* asyncWriterThread(void *pPtr) { int iDeq; + struct timespec t; + bool bTimedOut = 0; strm_t *pThis = (strm_t*) pPtr; ISOBJ_TYPE_assert(pThis, strm); @@ -901,9 +903,35 @@ asyncWriterThread(void *pPtr) d_pthread_mutex_unlock(&pThis->mut); goto finalize_it; /* break main loop */ } - d_pthread_cond_wait(&pThis->notEmpty, &pThis->mut); + if(bTimedOut && pThis->iBufPtr > 0) { +RUNLOG_STR("XXX: we had a timeout in stream writer"); + /* if we timed out, we need to flush pending data */ + strmFlush(pThis); + bTimedOut = 0; + continue; /* now we should have data */ + } + bTimedOut = 0; + timeoutComp(&t, pThis->iFlushInterval * 2000); /* *1000 millisconds */ + if(pThis->bDoTimedWait) { + if(pthread_cond_timedwait(&pThis->notEmpty, &pThis->mut, &t) != 0) { + int err = errno; + if(err == ETIMEDOUT) { + bTimedOut = 1; + } else { + bTimedOut = 1; + char errStr[1024]; + rs_strerror_r(err, errStr, sizeof(errStr)); + DBGPRINTF("stream async writer timeout with error (%d): %s - ignoring\n", + err, errStr); + } + } + } else { + d_pthread_cond_wait(&pThis->notEmpty, &pThis->mut); + } } + bTimedOut = 0; /* we may have timed out, but there *is* work to do... */ + iDeq = pThis->iDeq++ % STREAM_ASYNC_NUMBUFS; doWriteInternal(pThis, pThis->asyncBuf[iDeq].pBuf, pThis->asyncBuf[iDeq].lenBuf); // TODO: error check????? 2009-07-06 @@ -1194,11 +1222,9 @@ strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) iOffset = 0; do { -//dbgprintf("XXX: enter write loop\n"); if(pThis->iBufPtr == pThis->sIOBufSize) { CHKiRet(strmFlush(pThis)); /* get a new buffer for rest of data */ } -//dbgprintf("XXX: post strmFlush\n"); iWrite = pThis->sIOBufSize - pThis->iBufPtr; /* this fits in current buf */ if(iWrite > lenBuf) iWrite = lenBuf; @@ -1206,7 +1232,6 @@ strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) pThis->iBufPtr += iWrite; iOffset += iWrite; lenBuf -= iWrite; -//dbgprintf("XXX: after write , iBufPtr %d, iOffset %d, len %d, unwritten: '%20.20s'\n", pThis->iBufPtr, iOffset, lenBuf, pBuf + iOffset); } while(lenBuf > 0); /* now check if the buffer right at the end of the write is full and, if so, @@ -1215,11 +1240,18 @@ strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) if(pThis->iBufPtr == pThis->sIOBufSize) { CHKiRet(strmFlush(pThis)); /* get a new buffer for rest of data */ } -//dbgprintf("XXX: done write loop, iBufPtr %d, iOffset %d, len %d\n", pThis->iBufPtr, iOffset, lenBuf); finalize_it: - if(pThis->bAsyncWrite) + if(pThis->bAsyncWrite) { + if(pThis->bDoTimedWait == 0) { + /* we potentially have a partial buffer, so re-activate the + * writer thread that it can set and pick up timeouts. + */ + pThis->bDoTimedWait = 1; + pthread_cond_signal(&pThis->notEmpty); + } d_pthread_mutex_unlock(&pThis->mut); + } RETiRet; } diff --git a/runtime/stream.h b/runtime/stream.h index 1efd29b5..cb368835 100644 --- a/runtime/stream.h +++ b/runtime/stream.h @@ -124,6 +124,7 @@ typedef struct strm_s { /* support for async flush procesing */ bool bAsyncWrite; /* do asynchronous writes (always if a flush interval is given) */ bool bStopWriter; /* shall writer thread terminate? */ + bool bDoTimedWait; /* instruct writer thread to do a times wait to support flush timeouts */ int iFlushInterval; /* flush in which interval - 0, no flushing */ apc_id_t apcID; /* id of current Apc request (used for cancelling) */ pthread_mutex_t mut;/* mutex for flush in async mode */ -- cgit From 6fde78cb744b22eb5790d43297acab249ca0e7fa Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 9 Jul 2009 17:19:21 +0200 Subject: small performance improvement and cleanup optimized substring processing, should bring a small enhancement when forwarding with the default forwarding templates. Also did some uchar cleanup in msg.c (thus so many changes, in reality they are few...). --- action.c | 3 +- runtime/msg.c | 319 ++++++++++++++++++++++++++++++--------------------------- runtime/msg.h | 4 +- runtime/rule.c | 7 +- 4 files changed, 174 insertions(+), 159 deletions(-) diff --git a/action.c b/action.c index f21feea6..bcb23659 100644 --- a/action.c +++ b/action.c @@ -43,6 +43,7 @@ #include "srUtils.h" #include "errmsg.h" #include "datetime.h" +#include "unicode-helper.h" /* forward definitions */ rsRetVal actionCallDoAction(action_t *pAction, msg_t *pMsg); @@ -780,7 +781,7 @@ doActionCallAction(action_t *pAction, msg_t *pMsg) /* suppress duplicate messages */ if ((pAction->f_ReduceRepeated == 1) && pAction->f_pMsg != NULL && (pMsg->msgFlags & MARK) == 0 && getMSGLen(pMsg) == getMSGLen(pAction->f_pMsg) && - !strcmp(getMSG(pMsg), getMSG(pAction->f_pMsg)) && + !ustrcmp(getMSG(pMsg), getMSG(pAction->f_pMsg)) && !strcmp(getHOSTNAME(pMsg), getHOSTNAME(pAction->f_pMsg)) && !strcmp(getPROCID(pMsg, LOCK_MUTEX), getPROCID(pAction->f_pMsg, LOCK_MUTEX)) && !strcmp(getAPPNAME(pMsg, LOCK_MUTEX), getAPPNAME(pAction->f_pMsg, LOCK_MUTEX))) { diff --git a/runtime/msg.c b/runtime/msg.c index d29da560..de298871 100644 --- a/runtime/msg.c +++ b/runtime/msg.c @@ -1159,16 +1159,16 @@ int getMSGLen(msg_t *pM) return((pM == NULL) ? 0 : pM->iLenMSG); } -char *getMSG(msg_t *pM) +uchar *getMSG(msg_t *pM) { - char *ret; + uchar *ret; if(pM == NULL) - ret = ""; + ret = UCHAR_CONSTANT(""); else { if(pM->offMSG == -1) - ret = ""; + ret = UCHAR_CONSTANT(""); else - ret = (char*)(pM->pszRawMsg + pM->offMSG); + ret = pM->pszRawMsg + pM->offMSG; } return ret; } @@ -1613,21 +1613,23 @@ static inline void tryEmulateTAG(msg_t *pM, bool bLockMutex) } -static inline char *getTAG(msg_t *pM) +static inline void +getTAG(msg_t *pM, uchar **ppBuf, int *piLen) { - char *ret; - - if(pM == NULL) - ret = ""; - else { + if(pM == NULL) { + *ppBuf = UCHAR_CONSTANT(""); + *piLen = 0; + } else { if(pM->iLenTAG == 0) tryEmulateTAG(pM, LOCK_MUTEX); - if(pM->iLenTAG == 0) - ret = ""; - else - ret = (char*) ((pM->iLenTAG < CONF_TAG_BUFSIZE) ? pM->TAG.szBuf : pM->TAG.pszTAG); + if(pM->iLenTAG == 0) { + *ppBuf = UCHAR_CONSTANT(""); + *piLen = 0; + } else { + *ppBuf = (pM->iLenTAG < CONF_TAG_BUFSIZE) ? pM->TAG.szBuf : pM->TAG.pszTAG; + *piLen = pM->iLenTAG; + } } - return(ret); } @@ -2130,14 +2132,14 @@ static uchar *getNOW(eNOWType eNow) * be used in selector line processing. * rgerhards 2005-09-15 */ -char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe, +uchar *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe, propid_t propID, size_t *pPropLen, unsigned short *pbMustBeFreed) { - char *pRes; /* result pointer */ + uchar *pRes; /* result pointer */ int bufLen = -1; /* length of string or -1, if not known */ - char *pBufStart; - char *pBuf; + uchar *pBufStart; + uchar *pBuf; int iLen; short iOffs; @@ -2159,16 +2161,16 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe, bufLen = getMSGLen(pMsg); break; case PROP_TIMESTAMP: - pRes = getTimeReported(pMsg, pTpe->data.field.eDateFormat); + pRes = (uchar*)getTimeReported(pMsg, pTpe->data.field.eDateFormat); break; case PROP_HOSTNAME: - pRes = getHOSTNAME(pMsg); + pRes = (uchar*)getHOSTNAME(pMsg); break; case PROP_SYSLOGTAG: - pRes = getTAG(pMsg); + getTAG(pMsg, &pRes, &bufLen); break; case PROP_RAWMSG: - pRes = getRawMsg(pMsg); + pRes = (uchar*)getRawMsg(pMsg); break; /* enable this, if someone actually uses UxTradMsg, delete after some time has * passed and nobody complained -- rgerhards, 2009-06-16 @@ -2177,120 +2179,120 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe, break; */ case PROP_INPUTNAME: - getInputName(pMsg, ((uchar**) &pRes), &bufLen); + getInputName(pMsg, &pRes, &bufLen); break; case PROP_FROMHOST: - pRes = (char*) getRcvFrom(pMsg); + pRes = getRcvFrom(pMsg); break; case PROP_FROMHOST_IP: - pRes = (char*) getRcvFromIP(pMsg); + pRes = getRcvFromIP(pMsg); break; case PROP_PRI: - pRes = getPRI(pMsg); + pRes = (uchar*)getPRI(pMsg); break; case PROP_PRI_TEXT: - pBuf = malloc(20 * sizeof(char)); + pBuf = malloc(20 * sizeof(uchar)); if(pBuf == NULL) { *pbMustBeFreed = 0; - return "**OUT OF MEMORY**"; + return UCHAR_CONSTANT("**OUT OF MEMORY**"); } else { *pbMustBeFreed = 1; - pRes = textpri(pBuf, 20, getPRIi(pMsg)); + pRes = (uchar*)textpri((char*)pBuf, 20, getPRIi(pMsg)); } break; case PROP_IUT: - pRes = "1"; /* always 1 for syslog messages (a MonitorWare thing;)) */ + pRes = UCHAR_CONSTANT("1"); /* always 1 for syslog messages (a MonitorWare thing;)) */ break; case PROP_SYSLOGFACILITY: - pRes = getFacility(pMsg); + pRes = (uchar*)getFacility(pMsg); break; case PROP_SYSLOGFACILITY_TEXT: - pRes = getFacilityStr(pMsg); + pRes = (uchar*)getFacilityStr(pMsg); break; case PROP_SYSLOGSEVERITY: - pRes = getSeverity(pMsg); + pRes = (uchar*)getSeverity(pMsg); break; case PROP_SYSLOGSEVERITY_TEXT: - pRes = getSeverityStr(pMsg); + pRes = (uchar*)getSeverityStr(pMsg); break; case PROP_TIMEGENERATED: - pRes = getTimeGenerated(pMsg, pTpe->data.field.eDateFormat); + pRes = (uchar*)getTimeGenerated(pMsg, pTpe->data.field.eDateFormat); break; case PROP_PROGRAMNAME: - pRes = getProgramName(pMsg, LOCK_MUTEX); + pRes = (uchar*)getProgramName(pMsg, LOCK_MUTEX); break; case PROP_PROTOCOL_VERSION: - pRes = getProtocolVersionString(pMsg); + pRes = (uchar*)getProtocolVersionString(pMsg); break; case PROP_STRUCTURED_DATA: - pRes = getStructuredData(pMsg); + pRes = (uchar*)getStructuredData(pMsg); break; case PROP_APP_NAME: - pRes = getAPPNAME(pMsg, LOCK_MUTEX); + pRes = (uchar*)getAPPNAME(pMsg, LOCK_MUTEX); break; case PROP_PROCID: - pRes = getPROCID(pMsg, LOCK_MUTEX); + pRes = (uchar*)getPROCID(pMsg, LOCK_MUTEX); break; case PROP_MSGID: - pRes = getMSGID(pMsg); + pRes = (uchar*)getMSGID(pMsg); break; case PROP_SYS_NOW: - if((pRes = (char*) getNOW(NOW_NOW)) == NULL) { - return "***OUT OF MEMORY***"; + if((pRes = getNOW(NOW_NOW)) == NULL) { + return UCHAR_CONSTANT("**OUT OF MEMORY**"); } else *pbMustBeFreed = 1; /* all of these functions allocate dyn. memory */ break; case PROP_SYS_YEAR: - if((pRes = (char*) getNOW(NOW_YEAR)) == NULL) { - return "***OUT OF MEMORY***"; + if((pRes = getNOW(NOW_YEAR)) == NULL) { + return UCHAR_CONSTANT("**OUT OF MEMORY**"); } else *pbMustBeFreed = 1; /* all of these functions allocate dyn. memory */ break; case PROP_SYS_MONTH: - if((pRes = (char*) getNOW(NOW_MONTH)) == NULL) { - return "***OUT OF MEMORY***"; + if((pRes = getNOW(NOW_MONTH)) == NULL) { + return UCHAR_CONSTANT("**OUT OF MEMORY**"); } else *pbMustBeFreed = 1; /* all of these functions allocate dyn. memory */ break; case PROP_SYS_DAY: - if((pRes = (char*) getNOW(NOW_DAY)) == NULL) { - return "***OUT OF MEMORY***"; + if((pRes = getNOW(NOW_DAY)) == NULL) { + return UCHAR_CONSTANT("**OUT OF MEMORY**"); } else *pbMustBeFreed = 1; /* all of these functions allocate dyn. memory */ break; case PROP_SYS_HOUR: - if((pRes = (char*) getNOW(NOW_HOUR)) == NULL) { - return "***OUT OF MEMORY***"; + if((pRes = getNOW(NOW_HOUR)) == NULL) { + return UCHAR_CONSTANT("**OUT OF MEMORY**"); } else *pbMustBeFreed = 1; /* all of these functions allocate dyn. memory */ break; case PROP_SYS_HHOUR: - if((pRes = (char*) getNOW(NOW_HHOUR)) == NULL) { - return "***OUT OF MEMORY***"; + if((pRes = getNOW(NOW_HHOUR)) == NULL) { + return UCHAR_CONSTANT("**OUT OF MEMORY**"); } else *pbMustBeFreed = 1; /* all of these functions allocate dyn. memory */ break; case PROP_SYS_QHOUR: - if((pRes = (char*) getNOW(NOW_QHOUR)) == NULL) { - return "***OUT OF MEMORY***"; + if((pRes = getNOW(NOW_QHOUR)) == NULL) { + return UCHAR_CONSTANT("**OUT OF MEMORY**"); } else *pbMustBeFreed = 1; /* all of these functions allocate dyn. memory */ break; case PROP_SYS_MINUTE: - if((pRes = (char*) getNOW(NOW_MINUTE)) == NULL) { - return "***OUT OF MEMORY***"; + if((pRes = getNOW(NOW_MINUTE)) == NULL) { + return UCHAR_CONSTANT("**OUT OF MEMORY**"); } else *pbMustBeFreed = 1; /* all of these functions allocate dyn. memory */ break; case PROP_SYS_MYHOSTNAME: - pRes = (char*) glbl.GetLocalHostName(); + pRes = glbl.GetLocalHostName(); break; default: /* there is no point in continuing, we may even otherwise render the * error message unreadable. rgerhards, 2007-07-10 */ dbgprintf("invalid property id: '%d'\n", propID); - return "**INVALID PROPERTY NAME**"; + return UCHAR_CONSTANT("**INVALID PROPERTY NAME**"); } @@ -2310,8 +2312,8 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe, */ if(pTpe->data.field.has_fields == 1) { size_t iCurrFld; - char *pFld; - char *pFldEnd; + uchar *pFld; + uchar *pFldEnd; /* first, skip to the field in question. The field separator * is always one character and is stored in the template entry. */ @@ -2349,7 +2351,7 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe, if(*pbMustBeFreed == 1) free(pRes); *pbMustBeFreed = 0; - return "**OUT OF MEMORY**"; + return UCHAR_CONSTANT("**OUT OF MEMORY**"); } /* now copy */ memcpy(pBuf, pFld, iLen); @@ -2366,12 +2368,12 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe, if(*pbMustBeFreed == 1) free(pRes); *pbMustBeFreed = 0; - return "**FIELD NOT FOUND**"; + return UCHAR_CONSTANT("**FIELD NOT FOUND**"); } } else if(pTpe->data.field.iFromPos != 0 || pTpe->data.field.iToPos != 0) { /* we need to obtain a private copy */ int iFrom, iTo; - char *pSb; + uchar *pSb; iFrom = pTpe->data.field.iFromPos; iTo = pTpe->data.field.iToPos; /* need to zero-base to and from (they are 1-based!) */ @@ -2379,44 +2381,55 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe, --iFrom; if(iTo > 0) --iTo; - iLen = iTo - iFrom + 1; /* the +1 is for an actual char, NOT \0! */ - pBufStart = pBuf = malloc((iLen + 1) * sizeof(char)); - if(pBuf == NULL) { - if(*pbMustBeFreed == 1) - free(pRes); - *pbMustBeFreed = 0; - return "**OUT OF MEMORY**"; - } - pSb = pRes; - if(iFrom) { - /* skip to the start of the substring (can't do pointer arithmetic - * because the whole string might be smaller!!) - */ - while(*pSb && iFrom) { - --iFrom; + if(bufLen == -1) + bufLen = ustrlen(pRes); + if(iFrom == 0 && iTo >= bufLen) { + /* in this case, the requested string is a superset of what we already have, + * so there is no need to do any processing. This is a frequent case for size-limited + * fields like TAG in the default forwarding template (so it is a useful optimization + * to check for this condition ;)). -- rgerhards, 2009-07-09 + */ + ; /*DO NOTHING*/ + } else { + iLen = iTo - iFrom + 1; /* the +1 is for an actual char, NOT \0! */ + pBufStart = pBuf = malloc((iLen + 1) * sizeof(char)); + if(pBuf == NULL) { + if(*pbMustBeFreed == 1) + free(pRes); + *pbMustBeFreed = 0; + return UCHAR_CONSTANT("**OUT OF MEMORY**"); + } + pSb = pRes; + if(iFrom) { + /* skip to the start of the substring (can't do pointer arithmetic + * because the whole string might be smaller!!) + */ + while(*pSb && iFrom) { + --iFrom; + ++pSb; + } + } + /* OK, we are at the begin - now let's copy... */ + bufLen = iLen; + while(*pSb && iLen) { + *pBuf++ = *pSb; ++pSb; + --iLen; } + *pBuf = '\0'; + bufLen -= iLen; /* subtract remaining length if the string was smaller! */ + if(*pbMustBeFreed == 1) + free(pRes); + pRes = pBufStart; + *pbMustBeFreed = 1; } - /* OK, we are at the begin - now let's copy... */ - bufLen = iLen; - while(*pSb && iLen) { - *pBuf++ = *pSb; - ++pSb; - --iLen; - } - *pBuf = '\0'; - bufLen -= iLen; /* subtract remaining length if the string was smaller! */ - if(*pbMustBeFreed == 1) - free(pRes); - pRes = pBufStart; - *pbMustBeFreed = 1; #ifdef FEATURE_REGEXP } else { /* Check for regular expressions */ if (pTpe->data.field.has_regex != 0) { if (pTpe->data.field.has_regex == 2) /* Could not compile regex before! */ - return "**NO MATCH** **BAD REGULAR EXPRESSION**"; + return UCHAR_CONSTANT("**NO MATCH** **BAD REGULAR EXPRESSION**"); dbgprintf("string to match for regex is: %s\n", pRes); @@ -2429,7 +2442,7 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe, */ while(!bFound) { int iREstat; - iREstat = regexp.regexec(&pTpe->data.field.re, pRes + iOffs, nmatch, pmatch, 0); + iREstat = regexp.regexec(&pTpe->data.field.re, (char*)(pRes + iOffs), nmatch, pmatch, 0); dbgprintf("regexec return is %d\n", iREstat); if(iREstat == 0) { if(pmatch[0].rm_so == -1) { @@ -2457,11 +2470,11 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe, *pbMustBeFreed = 0; } if(pTpe->data.field.nomatchAction == TPL_REGEX_NOMATCH_USE_DFLTSTR) - return "**NO MATCH**"; + return UCHAR_CONSTANT("**NO MATCH**"); else if(pTpe->data.field.nomatchAction == TPL_REGEX_NOMATCH_USE_ZERO) - return "0"; + return UCHAR_CONSTANT("0"); else - return ""; + return UCHAR_CONSTANT(""); } } else { /* Match- but did it match the one we wanted? */ @@ -2473,24 +2486,24 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe, *pbMustBeFreed = 0; } if(pTpe->data.field.nomatchAction == TPL_REGEX_NOMATCH_USE_DFLTSTR) - return "**NO MATCH**"; + return UCHAR_CONSTANT("**NO MATCH**"); else - return ""; + return UCHAR_CONSTANT(""); } } /* OK, we have a usable match - we now need to malloc pB */ int iLenBuf; - char *pB; + uchar *pB; iLenBuf = pmatch[pTpe->data.field.iSubMatchToUse].rm_eo - pmatch[pTpe->data.field.iSubMatchToUse].rm_so; - pB = (char *) malloc((iLenBuf + 1) * sizeof(char)); + pB = malloc((iLenBuf + 1) * sizeof(uchar)); if (pB == NULL) { if (*pbMustBeFreed == 1) free(pRes); *pbMustBeFreed = 0; - return "**OUT OF MEMORY ALLOCATING pBuf**"; + return UCHAR_CONSTANT("**OUT OF MEMORY**"); } /* Lets copy the matched substring to the buffer */ @@ -2513,7 +2526,7 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe, free(pRes); *pbMustBeFreed = 0; } - return "***REGEXP NOT AVAILABLE***"; + return UCHAR_CONSTANT("***REGEXP NOT AVAILABLE***"); } } #endif /* #ifdef FEATURE_REGEXP */ @@ -2525,7 +2538,7 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe, uchar cFirst = *pRes; /* save first char */ if(*pbMustBeFreed == 1) free(pRes); - pRes = (cFirst == ' ') ? "" : " "; + pRes = (cFirst == ' ') ? UCHAR_CONSTANT("") : UCHAR_CONSTANT(" "); bufLen = (cFirst == ' ') ? 0 : 1; *pbMustBeFreed = 0; } @@ -2537,21 +2550,21 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe, if(pTpe->data.field.eCaseConv != tplCaseConvNo) { /* we need to obtain a private copy */ if(bufLen == -1) - bufLen = strlen(pRes); - char *pBStart; - char *pB; - char *pSrc; + bufLen = ustrlen(pRes); + uchar *pBStart; + uchar *pB; + uchar *pSrc; pBStart = pB = malloc((bufLen + 1) * sizeof(char)); if(pB == NULL) { if(*pbMustBeFreed == 1) free(pRes); *pbMustBeFreed = 0; - return "**OUT OF MEMORY**"; + return UCHAR_CONSTANT("**OUT OF MEMORY**"); } pSrc = pRes; while(*pSrc) { *pB++ = (pTpe->data.field.eCaseConv == tplCaseConvUpper) ? - (char)toupper((int)*pSrc) : (char)tolower((int)*pSrc); + (uchar)toupper((int)*pSrc) : (uchar)tolower((int)*pSrc); /* currently only these two exist */ ++pSrc; } @@ -2575,10 +2588,10 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe, */ if(pTpe->data.field.options.bDropCC) { int iLenBuf = 0; - char *pSrc = pRes; - char *pDstStart; - char *pDst; - char bDropped = 0; + uchar *pSrc = pRes; + uchar *pDstStart; + uchar *pDst; + uchar bDropped = 0; while(*pSrc) { if(!iscntrl((int) *pSrc++)) @@ -2593,7 +2606,7 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe, if(*pbMustBeFreed == 1) free(pRes); *pbMustBeFreed = 0; - return "**OUT OF MEMORY**"; + return UCHAR_CONSTANT("**OUT OF MEMORY**"); } for(pSrc = pRes; *pSrc; pSrc++) { if(!iscntrl((int) *pSrc)) @@ -2607,9 +2620,9 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe, *pbMustBeFreed = 1; } } else if(pTpe->data.field.options.bSpaceCC) { - char *pSrc; - char *pDstStart; - char *pDst; + uchar *pSrc; + uchar *pDstStart; + uchar *pDst; if(*pbMustBeFreed == 1) { /* in this case, we already work on dynamic @@ -2623,13 +2636,13 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe, } } else { if(bufLen == -1) - bufLen = strlen(pRes); + bufLen = ustrlen(pRes); pDst = pDstStart = malloc(bufLen + 1); if(pDst == NULL) { if(*pbMustBeFreed == 1) free(pRes); *pbMustBeFreed = 0; - return "**OUT OF MEMORY**"; + return UCHAR_CONSTANT("**OUT OF MEMORY**"); } for(pSrc = pRes; *pSrc; pSrc++) { if(iscntrl((int) *pSrc)) @@ -2649,7 +2662,7 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe, */ int iNumCC = 0; int iLenBuf = 0; - char *pB; + uchar *pB; for(pB = pRes ; *pB ; ++pB) { ++iLenBuf; @@ -2659,21 +2672,21 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe, if(iNumCC > 0) { /* if 0, there is nothing to escape, so we are done */ /* OK, let's do the escaping... */ - char *pBStart; - char szCCEsc[8]; /* buffer for escape sequence */ + uchar *pBStart; + uchar szCCEsc[8]; /* buffer for escape sequence */ int i; iLenBuf += iNumCC * 4; - pBStart = pB = malloc((iLenBuf + 1) * sizeof(char)); + pBStart = pB = malloc((iLenBuf + 1) * sizeof(uchar)); if(pB == NULL) { if(*pbMustBeFreed == 1) free(pRes); *pbMustBeFreed = 0; - return "**OUT OF MEMORY**"; + return UCHAR_CONSTANT("**OUT OF MEMORY**"); } while(*pRes) { if(iscntrl((int) *pRes)) { - snprintf(szCCEsc, sizeof(szCCEsc), "#%3.3d", *pRes); + snprintf((char*)szCCEsc, sizeof(szCCEsc), "#%3.3d", *pRes); for(i = 0 ; i < 4 ; ++i) *pB++ = szCCEsc[i]; } else { @@ -2697,10 +2710,10 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe, if(pTpe->data.field.options.bSecPathDrop || pTpe->data.field.options.bSecPathReplace) { if(pTpe->data.field.options.bSecPathDrop) { int iLenBuf = 0; - char *pSrc = pRes; - char *pDstStart; - char *pDst; - char bDropped = 0; + uchar *pSrc = pRes; + uchar *pDstStart; + uchar *pDst; + uchar bDropped = 0; while(*pSrc) { if(*pSrc++ != '/') @@ -2715,7 +2728,7 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe, if(*pbMustBeFreed == 1) free(pRes); *pbMustBeFreed = 0; - return "**OUT OF MEMORY**"; + return UCHAR_CONSTANT("**OUT OF MEMORY**"); } for(pSrc = pRes; *pSrc; pSrc++) { if(*pSrc != '/') @@ -2729,9 +2742,9 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe, *pbMustBeFreed = 1; } } else { - char *pSrc; - char *pDstStart; - char *pDst; + uchar *pSrc; + uchar *pDstStart; + uchar *pDst; if(*pbMustBeFreed == 1) { /* here, again, we can modify the string as we already obtained @@ -2745,13 +2758,13 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe, } } else { if(bufLen == -1) - bufLen = strlen(pRes); + bufLen = ustrlen(pRes); pDst = pDstStart = malloc(bufLen + 1); if(pDst == NULL) { if(*pbMustBeFreed == 1) free(pRes); *pbMustBeFreed = 0; - return "**OUT OF MEMORY**"; + return UCHAR_CONSTANT("**OUT OF MEMORY**"); } for(pSrc = pRes; *pSrc; pSrc++) { if(*pSrc == '/') @@ -2771,19 +2784,19 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe, /* check for "." and ".." (note the parenthesis in the if condition!) */ if((*pRes == '.') && (*(pRes + 1) == '\0' || (*(pRes + 1) == '.' && *(pRes + 2) == '\0'))) { - char *pTmp = pRes; + uchar *pTmp = pRes; if(*(pRes + 1) == '\0') - pRes = "_"; + pRes = UCHAR_CONSTANT("_"); else - pRes = "_.";; + pRes = UCHAR_CONSTANT("_.");; if(*pbMustBeFreed == 1) free(pTmp); *pbMustBeFreed = 0; } else if(*pRes == '\0') { if(*pbMustBeFreed == 1) free(pRes); - pRes = "_"; + pRes = UCHAR_CONSTANT("_"); bufLen = 1; *pbMustBeFreed = 0; } @@ -2794,19 +2807,19 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe, */ if(pTpe->data.field.options.bDropLastLF && !pTpe->data.field.options.bEscapeCC) { int iLn; - char *pB; + uchar *pB; if(bufLen == -1) - bufLen = strlen(pRes); + bufLen = ustrlen(pRes); iLn = bufLen; if(iLn > 0 && *(pRes + iLn - 1) == '\n') { /* we have a LF! */ /* check if we need to obtain a private copy */ if(*pbMustBeFreed == 0) { /* ok, original copy, need a private one */ - pB = malloc((iLn + 1) * sizeof(char)); + pB = malloc((iLn + 1) * sizeof(uchar)); if(pB == NULL) { *pbMustBeFreed = 0; - return "**OUT OF MEMORY**"; + return UCHAR_CONSTANT("**OUT OF MEMORY**"); } memcpy(pB, pRes, iLn - 1); pRes = pB; @@ -2825,19 +2838,19 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe, if(pTpe->data.field.options.bCSV) { /* we need to obtain a private copy, as we need to at least add the double quotes */ int iBufLen; - char *pBStart; - char *pDst; - char *pSrc; + uchar *pBStart; + uchar *pDst; + uchar *pSrc; if(bufLen == -1) - bufLen = strlen(pRes); + bufLen = ustrlen(pRes); iBufLen = bufLen; /* the malloc may be optimized, we currently use the worst case... */ - pBStart = pDst = malloc((2 * iBufLen + 3) * sizeof(char)); + pBStart = pDst = malloc((2 * iBufLen + 3) * sizeof(uchar)); if(pDst == NULL) { if(*pbMustBeFreed == 1) free(pRes); *pbMustBeFreed = 0; - return "**OUT OF MEMORY**"; + return UCHAR_CONSTANT("**OUT OF MEMORY**"); } pSrc = pRes; *pDst++ = '"'; /* starting quote */ @@ -2856,7 +2869,7 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe, } if(bufLen == -1) - bufLen = strlen(pRes); + bufLen = ustrlen(pRes); *pPropLen = bufLen; ENDfunc diff --git a/runtime/msg.h b/runtime/msg.h index c20fb005..0b346f7b 100644 --- a/runtime/msg.h +++ b/runtime/msg.h @@ -159,7 +159,7 @@ void MsgSetMSGoffs(msg_t *pMsg, short offs); void MsgSetRawMsgWOSize(msg_t *pMsg, char* pszRawMsg); void MsgSetRawMsg(msg_t *pMsg, char* pszRawMsg, size_t lenMsg); rsRetVal MsgReplaceMSG(msg_t *pThis, uchar* pszMSG, int lenMSG); -char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe, +uchar *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe, propid_t propID, size_t *pPropLen, unsigned short *pbMustBeFreed); char *textpri(char *pRes, size_t pResLen, int pri); rsRetVal msgGetMsgVar(msg_t *pThis, cstr_t *pstrPropName, var_t **ppVar); @@ -168,7 +168,7 @@ uchar *getRcvFrom(msg_t *pM); /* TODO: remove these five (so far used in action.c) */ -char *getMSG(msg_t *pM); +uchar *getMSG(msg_t *pM); char *getHOSTNAME(msg_t *pM); char *getPROCID(msg_t *pM, bool bLockMutex); char *getAPPNAME(msg_t *pM, bool bLockMutex); diff --git a/runtime/rule.c b/runtime/rule.c index 3a257a90..182d616a 100644 --- a/runtime/rule.c +++ b/runtime/rule.c @@ -39,6 +39,7 @@ #include "vm.h" #include "var.h" #include "srUtils.h" +#include "unicode-helper.h" #include "dirty.h" /* for getFIOPName */ /* static data */ @@ -104,7 +105,7 @@ shouldProcessThisMessage(rule_t *pRule, msg_t *pMsg, int *bProcessMsg) { DEFiRet; unsigned short pbMustBeFreed; - char *pszPropVal; + uchar *pszPropVal; int bRet = 0; size_t propLen; vm_t *pVM = NULL; @@ -189,12 +190,12 @@ shouldProcessThisMessage(rule_t *pRule, msg_t *pMsg, int *bProcessMsg) break; case FIOP_ISEQUAL: if(rsCStrSzStrCmp(pRule->f_filterData.prop.pCSCompValue, - (uchar*) pszPropVal, strlen(pszPropVal)) == 0) + pszPropVal, ustrlen(pszPropVal)) == 0) bRet = 1; /* process message! */ break; case FIOP_STARTSWITH: if(rsCStrSzStrStartsWithCStr(pRule->f_filterData.prop.pCSCompValue, - (uchar*) pszPropVal, strlen(pszPropVal)) == 0) + pszPropVal, ustrlen(pszPropVal)) == 0) bRet = 1; /* process message! */ break; case FIOP_REGEX: -- cgit From 935a9eef5770a4a298d1ccefab59e3863210fc68 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 14 Jul 2009 16:57:49 +0200 Subject: added tcp output rebinding option. added tcp output rebinding option. needs some more testing and doc --- tcpclt.c | 14 ++++++++++++++ tcpclt.h | 6 +++++- tools/omfwd.c | 6 ++++++ 3 files changed, 25 insertions(+), 1 deletion(-) diff --git a/tcpclt.c b/tcpclt.c index c53f00f7..617aaef6 100644 --- a/tcpclt.c +++ b/tcpclt.c @@ -297,6 +297,12 @@ Send(tcpclt_t *pThis, void *pData, char *msg, size_t len) CHKiRet(TCPSendBldFrame(pThis, &msg, &len, &bMsgMustBeFreed)); + if(pThis->iRebindInterval > 0 && ++pThis->iNumMsgs == pThis->iRebindInterval) { + /* we need to rebind, and use the retry logic for this*/ + CHKiRet(pThis->prepRetryFunc(pData)); /* try to recover */ + pThis->iNumMsgs = 0; + } + while(!bDone) { /* loop is broken when send succeeds or error occurs */ CHKiRet(pThis->initFunc(pData)); iRet = pThis->sendFunc(pData, msg, len); @@ -388,6 +394,13 @@ SetFraming(tcpclt_t *pThis, TCPFRAMINGMODE framing) pThis->tcp_framing = framing; RETiRet; } +static rsRetVal +SetRebindInterval(tcpclt_t *pThis, int iRebindInterval) +{ + DEFiRet; + pThis->iRebindInterval = iRebindInterval; + RETiRet; +} /* Standard-Constructor @@ -445,6 +458,7 @@ CODESTARTobjQueryInterface(tcpclt) pIf->SetSendFrame = SetSendFrame; pIf->SetSendPrepRetry = SetSendPrepRetry; pIf->SetFraming = SetFraming; + pIf->SetRebindInterval = SetRebindInterval; finalize_it: ENDobjQueryInterface(tcpclt) diff --git a/tcpclt.h b/tcpclt.h index 1d704044..5a8eba75 100644 --- a/tcpclt.h +++ b/tcpclt.h @@ -36,6 +36,8 @@ typedef struct tcpclt_s { short bResendLastOnRecon; /* should the last message be resent on a successful reconnect? */ size_t lenPrevMsg; /* session specific callbacks */ + int iRebindInterval; /* how often should the send socket be rebound? */ + int iNumMsgs; /* number of messages during current "rebind session" */ rsRetVal (*initFunc)(void*); rsRetVal (*sendFunc)(void*, char*, size_t); rsRetVal (*prepRetryFunc)(void*); @@ -55,8 +57,10 @@ BEGINinterface(tcpclt) /* name must also be changed in ENDinterface macro! */ rsRetVal (*SetSendFrame)(tcpclt_t*, rsRetVal (*)(void*, char*, size_t)); rsRetVal (*SetSendPrepRetry)(tcpclt_t*, rsRetVal (*)(void*)); rsRetVal (*SetFraming)(tcpclt_t*, TCPFRAMINGMODE framing); + /* v3, 2009-07-14*/ + rsRetVal (*SetRebindInterval)(tcpclt_t*, int iRebindInterval); ENDinterface(tcpclt) -#define tcpcltCURR_IF_VERSION 2 /* increment whenever you change the interface structure! */ +#define tcpcltCURR_IF_VERSION 3 /* increment whenever you change the interface structure! */ /* prototypes */ diff --git a/tools/omfwd.c b/tools/omfwd.c index d207cce5..fe65f515 100644 --- a/tools/omfwd.c +++ b/tools/omfwd.c @@ -90,6 +90,7 @@ typedef struct _instanceData { char *port; int protocol; int iUDPRebindInterval; /* rebind interval */ + int iTCPRebindInterval; /* rebind interval */ int nXmit; /* number of transmissions since last (re-)bind */ # define FORW_UDP 0 # define FORW_TCP 1 @@ -104,6 +105,7 @@ static short iStrmDrvrMode = 0; /* mode for stream driver, driver-dependent (0 m static short bResendLastOnRecon = 0; /* should the last message be re-sent on a successful reconnect? */ static uchar *pszStrmDrvrAuthMode = NULL; /* authentication mode to use */ static int iUDPRebindInterval = 0; /* support for automatic re-binding (load balancers!). 0 - no rebind */ +static int iTCPRebindInterval = 0; /* support for automatic re-binding (load balancers!). 0 - no rebind */ static permittedPeers_t *pPermPeers = NULL; @@ -643,6 +645,7 @@ CODE_STD_STRING_REQUESTparseSelectorAct(1) /* copy over config data as needed */ pData->iUDPRebindInterval = iUDPRebindInterval; + pData->iTCPRebindInterval = iTCPRebindInterval; /* process template */ CHKiRet(cflineParseTemplateName(&p, *ppOMSR, 0, OMSR_NO_RQD_TPL_OPTS, @@ -657,6 +660,7 @@ CODE_STD_STRING_REQUESTparseSelectorAct(1) CHKiRet(tcpclt.SetSendFrame(pData->pTCPClt, TCPSendFrame)); CHKiRet(tcpclt.SetSendPrepRetry(pData->pTCPClt, TCPSendPrepRetry)); CHKiRet(tcpclt.SetFraming(pData->pTCPClt, tcp_framing)); + CHKiRet(tcpclt.SetRebindInterval(pData->pTCPClt, pData->iTCPRebindInterval)); pData->iStrmDrvrMode = iStrmDrvrMode; if(pszStrmDrvr != NULL) CHKmalloc(pData->pszStrmDrvr = (uchar*)strdup((char*)pszStrmDrvr)); @@ -728,6 +732,7 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a iStrmDrvrMode = 0; bResendLastOnRecon = 0; iUDPRebindInterval = 0; + iTCPRebindInterval = 0; return RS_RET_OK; } @@ -742,6 +747,7 @@ CODEmodInit_QueryRegCFSLineHdlr CHKiRet(objUse(net,LM_NET_FILENAME)); CHKiRet(regCfSysLineHdlr((uchar *)"actionforwarddefaulttemplate", 0, eCmdHdlrGetWord, NULL, &pszTplName, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionsendtcprebindinterval", 0, eCmdHdlrInt, NULL, &iTCPRebindInterval, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionsendudprebindinterval", 0, eCmdHdlrInt, NULL, &iUDPRebindInterval, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionsendstreamdriver", 0, eCmdHdlrGetWord, NULL, &pszStrmDrvr, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionsendstreamdrivermode", 0, eCmdHdlrInt, NULL, &iStrmDrvrMode, NULL)); -- cgit From 095a20360ab547f37bf4fd8b099fdf0b03a5c86f Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 14 Jul 2009 19:53:04 +0200 Subject: added ability for the TCP output action to "rebind" its send socket ...after sending n messages (actually, it re-opens the connection, the name is used because this is a concept very similiar to $ActionUDPRebindInterval). New config directive $ActionSendTCPRebindInterval added for the purpose. By default, rebinding is disabled. This is considered# useful for load balancers. --- ChangeLog | 6 ++++++ doc/rsyslog_conf_global.html | 7 +++++++ 2 files changed, 13 insertions(+) diff --git a/ChangeLog b/ChangeLog index 9aca45f7..b0c6a05b 100644 --- a/ChangeLog +++ b/ChangeLog @@ -14,6 +14,12 @@ Version 4.5.1 [DEVEL] (rgerhards), 2009-07-?? - bugfix: message could be truncated after TAG, often when forwarding This was a result of an internal processing error if maximum field sizes had been specified in the property replacer. +- added ability for the TCP output action to "rebind" its send socket after + sending n messages (actually, it re-opens the connection, the name is + used because this is a concept very similiar to $ActionUDPRebindInterval). + New config directive $ActionSendTCPRebindInterval added for the purpose. + By default, rebinding is disabled. This is considered useful for load + balancers. - testbench improvements --------------------------------------------------------------------------- Version 4.5.0 [DEVEL] (rgerhards), 2009-07-02 diff --git a/doc/rsyslog_conf_global.html b/doc/rsyslog_conf_global.html index 03842758..577eb1aa 100644 --- a/doc/rsyslog_conf_global.html +++ b/doc/rsyslog_conf_global.html @@ -96,6 +96,13 @@ default 60000 (1 minute)] (driver-specific)
  • $ActionSendStreamDriverAuthMode <mode>,  authentication mode to use with the stream driver (driver-specific)
  • $ActionSendStreamDriverPermittedPeer <ID>,  accepted fingerprint (SHA1) or name of remote peer (driver-specific) - directive may go away!
  • +
  • $ActionSendTCPRebindInterval nbr- [available since 4.5.1] - instructs the TCP send +action to close and re-open the connection to the remote host every nbr of messages sent. +Zero, the default, means that no such processing is done. This directive is useful for +use with load-balancers. Note that there is some performance overhead associated with it, +so it is advisable to not too often "rebind" the connection (what +"too often" actually means depends on your configuration, a rule of thumb is +that it should be not be much more often than once per second).
  • $ActionSendUDPRebindInterval nbr- [available since 4.3.2] - instructs the UDP send action to rebind the send socket every nbr of messages sent. Zero, the default, means that no rebind is done. This directive is useful for use with load-balancers.
  • -- cgit From 07b076ddcfed4ea9b447d0be574d1dcdb799bc2f Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 15 Jul 2009 11:33:49 +0200 Subject: final touches for 4.5.1 --- ChangeLog | 2 +- configure.ac | 2 +- doc/manual.html | 6 +++--- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/ChangeLog b/ChangeLog index a935f42f..6a6d1d92 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,5 +1,5 @@ --------------------------------------------------------------------------- -Version 4.5.1 [DEVEL] (rgerhards), 2009-07-?? +Version 4.5.1 [DEVEL] (rgerhards), 2009-07-15 - CONFIG CHANGE: $HUPisRestart default is now "off". We are doing this to support removal of restart-type HUP in v5. - bugfix: fromhost-ip was sometimes truncated diff --git a/configure.ac b/configure.ac index 6d22566b..dc1aab3a 100644 --- a/configure.ac +++ b/configure.ac @@ -2,7 +2,7 @@ # Process this file with autoconf to produce a configure script. AC_PREREQ(2.61) -AC_INIT([rsyslog],[4.5.0],[rsyslog@lists.adiscon.com]) +AC_INIT([rsyslog],[4.5.1],[rsyslog@lists.adiscon.com]) AM_INIT_AUTOMAKE AC_CONFIG_SRCDIR([ChangeLog]) AC_CONFIG_MACRO_DIR([m4]) diff --git a/doc/manual.html b/doc/manual.html index d473f485..0fb91e1d 100644 --- a/doc/manual.html +++ b/doc/manual.html @@ -19,9 +19,9 @@ rsyslog support available directly from the source!

    Please visit the rsyslog sponsor's page to honor the project sponsors or become one yourself! We are very grateful for any help towards the project goals.

    -

    This documentation is for version 4.5.0 (beta branch) of rsyslog. -Visit the rsyslog status page to obtain current -version information and project status. +

    This documentation is for version 4.5.1 (beta branch) of rsyslog. +Visit the rsyslog status page +to obtain current version information and project status.

    If you like rsyslog, you might want to lend us a helping hand. It doesn't require a lot of time - even a single mouse click helps. Learn how to help the rsyslog project. -- cgit From 8f06095880c73adb5147b348a4bac428d4863169 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 15 Jul 2009 14:08:04 +0200 Subject: documenting imported bugfix v4-beta was so far unreleased, so it was a bug only here --- ChangeLog | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/ChangeLog b/ChangeLog index 6a6d1d92..f5f3940d 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,4 +1,10 @@ --------------------------------------------------------------------------- +Version 4.5.2 [DEVEL] (rgerhards), 2009-07-?? +- bugfix: memory leak with some input modules. Those inputs that + use parseAndSubmitMsg() leak two small memory blocks with every message. + Typically, those process only relatively few messages, so the issue + does most probably not have any effect in practice. +--------------------------------------------------------------------------- Version 4.5.1 [DEVEL] (rgerhards), 2009-07-15 - CONFIG CHANGE: $HUPisRestart default is now "off". We are doing this to support removal of restart-type HUP in v5. -- cgit From 1d0806b9e3c6e83443c8daa9da8f25bd4df75f9b Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 16 Jul 2009 13:51:52 +0200 Subject: calls to prctl() need to be based on configure results (cross-platform issue) This is for another prctl() call, not present in the beta version (looks like it would make sense to stick these into a utility function) --- runtime/stream.c | 7 +++++-- runtime/wti.c | 1 - 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/runtime/stream.c b/runtime/stream.c index a0571a61..605a9771 100644 --- a/runtime/stream.c +++ b/runtime/stream.c @@ -48,7 +48,9 @@ #include "stream.h" #include "unicode-helper.h" #include "module-template.h" -#include +#if HAVE_SYS_PRCTL_H +# include +#endif #define inline @@ -891,9 +893,11 @@ asyncWriterThread(void *pPtr) ISOBJ_TYPE_assert(pThis, strm); BEGINfunc +# if HAVE_PRCTL && defined PR_SET_NAME if(prctl(PR_SET_NAME, "rs:asyn strmwr", 0, 0, 0) != 0) { DBGPRINTF("prctl failed, not setting thread name for '%s'\n", "stream writer"); } +#endif while(1) { /* loop broken inside */ d_pthread_mutex_lock(&pThis->mut); @@ -904,7 +908,6 @@ asyncWriterThread(void *pPtr) goto finalize_it; /* break main loop */ } if(bTimedOut && pThis->iBufPtr > 0) { -RUNLOG_STR("XXX: we had a timeout in stream writer"); /* if we timed out, we need to flush pending data */ strmFlush(pThis); bTimedOut = 0; diff --git a/runtime/wti.c b/runtime/wti.c index 156d8116..abdf4add 100644 --- a/runtime/wti.c +++ b/runtime/wti.c @@ -41,7 +41,6 @@ #ifdef OS_SOLARIS # include -# define pthread_yield() sched_yield() #endif #include "rsyslog.h" -- cgit From aa10f7a16415112c014c6c628f2f25f4eb4beaa2 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 17 Aug 2009 14:44:42 +0200 Subject: legacy syslog parser changed so that it now accepts date stamps in wrong case. Some devices seem to create them and I do not see any harm in supporting that. --- ChangeLog | 3 ++ runtime/datetime.c | 56 ++++++++++++++++++++++--------------- tests/testsuites/upcase-date.parse1 | 4 +++ 3 files changed, 41 insertions(+), 22 deletions(-) create mode 100644 tests/testsuites/upcase-date.parse1 diff --git a/ChangeLog b/ChangeLog index f5f3940d..10d3da0e 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,5 +1,8 @@ --------------------------------------------------------------------------- Version 4.5.2 [DEVEL] (rgerhards), 2009-07-?? +- legacy syslog parser changed so that it now accepts date stamps in + wrong case. Some devices seem to create them and I do not see any harm + in supporting that. - bugfix: memory leak with some input modules. Those inputs that use parseAndSubmitMsg() leak two small memory blocks with every message. Typically, those process only relatively few messages, so the issue diff --git a/runtime/datetime.c b/runtime/datetime.c index 2db1d3c5..dfa56b4f 100644 --- a/runtime/datetime.c +++ b/runtime/datetime.c @@ -335,6 +335,10 @@ ParseTIMESTAMP3164(struct syslogTime *pTime, uchar** ppszTS) * We will use this for parsing, as it probably is the * fastest way to parse it. * + * 2009-08-17: we now do case-insensitive comparisons, as some devices obviously do not + * obey to the RFC-specified case. As we need to guess in any case, we can ignore case + * in the first place -- rgerhards + * * 2005-07-18, well sometimes it pays to be a bit more verbose, even in C... * Fixed a bug that lead to invalid detection of the data. The issue was that * we had an if(++pszTS == 'x') inside of some of the consturcts below. However, @@ -346,20 +350,21 @@ ParseTIMESTAMP3164(struct syslogTime *pTime, uchar** ppszTS) */ switch(*pszTS++) { + case 'j': case 'J': - if(*pszTS == 'a') { + if(*pszTS == 'a' || *pszTS == 'A') { ++pszTS; - if(*pszTS == 'n') { + if(*pszTS == 'n' || *pszTS == 'N') { ++pszTS; month = 1; } else ABORT_FINALIZE(RS_RET_INVLD_TIME); - } else if(*pszTS == 'u') { + } else if(*pszTS == 'u' || *pszTS == 'U') { ++pszTS; - if(*pszTS == 'n') { + if(*pszTS == 'n' || *pszTS == 'N') { ++pszTS; month = 6; - } else if(*pszTS == 'l') { + } else if(*pszTS == 'l' || *pszTS == 'L') { ++pszTS; month = 7; } else @@ -367,10 +372,11 @@ ParseTIMESTAMP3164(struct syslogTime *pTime, uchar** ppszTS) } else ABORT_FINALIZE(RS_RET_INVLD_TIME); break; + case 'f': case 'F': - if(*pszTS == 'e') { + if(*pszTS == 'e' || *pszTS == 'E') { ++pszTS; - if(*pszTS == 'b') { + if(*pszTS == 'b' || *pszTS == 'B') { ++pszTS; month = 2; } else @@ -378,13 +384,14 @@ ParseTIMESTAMP3164(struct syslogTime *pTime, uchar** ppszTS) } else ABORT_FINALIZE(RS_RET_INVLD_TIME); break; + case 'm': case 'M': - if(*pszTS == 'a') { + if(*pszTS == 'a' || *pszTS == 'A') { ++pszTS; - if(*pszTS == 'r') { + if(*pszTS == 'r' || *pszTS == 'R') { ++pszTS; month = 3; - } else if(*pszTS == 'y') { + } else if(*pszTS == 'y' || *pszTS == 'Y') { ++pszTS; month = 5; } else @@ -392,17 +399,18 @@ ParseTIMESTAMP3164(struct syslogTime *pTime, uchar** ppszTS) } else ABORT_FINALIZE(RS_RET_INVLD_TIME); break; + case 'a': case 'A': - if(*pszTS == 'p') { + if(*pszTS == 'p' || *pszTS == 'P') { ++pszTS; - if(*pszTS == 'r') { + if(*pszTS == 'r' || *pszTS == 'R') { ++pszTS; month = 4; } else ABORT_FINALIZE(RS_RET_INVLD_TIME); - } else if(*pszTS == 'u') { + } else if(*pszTS == 'u' || *pszTS == 'U') { ++pszTS; - if(*pszTS == 'g') { + if(*pszTS == 'g' || *pszTS == 'G') { ++pszTS; month = 8; } else @@ -410,10 +418,11 @@ ParseTIMESTAMP3164(struct syslogTime *pTime, uchar** ppszTS) } else ABORT_FINALIZE(RS_RET_INVLD_TIME); break; + case 's': case 'S': - if(*pszTS == 'e') { + if(*pszTS == 'e' || *pszTS == 'E') { ++pszTS; - if(*pszTS == 'p') { + if(*pszTS == 'p' || *pszTS == 'P') { ++pszTS; month = 9; } else @@ -421,10 +430,11 @@ ParseTIMESTAMP3164(struct syslogTime *pTime, uchar** ppszTS) } else ABORT_FINALIZE(RS_RET_INVLD_TIME); break; + case 'o': case 'O': - if(*pszTS == 'c') { + if(*pszTS == 'c' || *pszTS == 'C') { ++pszTS; - if(*pszTS == 't') { + if(*pszTS == 't' || *pszTS == 'T') { ++pszTS; month = 10; } else @@ -432,10 +442,11 @@ ParseTIMESTAMP3164(struct syslogTime *pTime, uchar** ppszTS) } else ABORT_FINALIZE(RS_RET_INVLD_TIME); break; + case 'n': case 'N': - if(*pszTS == 'o') { + if(*pszTS == 'o' || *pszTS == 'O') { ++pszTS; - if(*pszTS == 'v') { + if(*pszTS == 'v' || *pszTS == 'V') { ++pszTS; month = 11; } else @@ -443,10 +454,11 @@ ParseTIMESTAMP3164(struct syslogTime *pTime, uchar** ppszTS) } else ABORT_FINALIZE(RS_RET_INVLD_TIME); break; + case 'd': case 'D': - if(*pszTS == 'e') { + if(*pszTS == 'e' || *pszTS == 'E') { ++pszTS; - if(*pszTS == 'c') { + if(*pszTS == 'c' || *pszTS == 'C') { ++pszTS; month = 12; } else diff --git a/tests/testsuites/upcase-date.parse1 b/tests/testsuites/upcase-date.parse1 new file mode 100644 index 00000000..2d21222a --- /dev/null +++ b/tests/testsuites/upcase-date.parse1 @@ -0,0 +1,4 @@ +<6>AUG 10 22:18:24 2009 netips-warden2-p [audit] user=[*SMS] src=192.168.11.11 iface=5 access=9 Update State Reset +6,kern,info,Aug 10 22:18:24,2009,,, netips-warden2-p [audit] user=[*SMS] src=192.168.11.11 iface=5 access=9 Update State Reset +#Example from RFC3164, section 5.4 +#Only the first two lines are important, you may place anything behind them! -- cgit From 9cab1036d23fdb59437ee6fcd82f041c555de094 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 17 Aug 2009 16:23:37 +0200 Subject: bugfix: if tcp listen port could not be created, no error message was emitted --- ChangeLog | 2 ++ doc/imtcp.html | 5 +++-- tcpsrv.c | 8 ++++++-- 3 files changed, 11 insertions(+), 4 deletions(-) diff --git a/ChangeLog b/ChangeLog index 10d3da0e..cf9d30ca 100644 --- a/ChangeLog +++ b/ChangeLog @@ -7,6 +7,8 @@ Version 4.5.2 [DEVEL] (rgerhards), 2009-07-?? use parseAndSubmitMsg() leak two small memory blocks with every message. Typically, those process only relatively few messages, so the issue does most probably not have any effect in practice. +- bugfix: if tcp listen port could not be created, no error message was + emitted --------------------------------------------------------------------------- Version 4.5.1 [DEVEL] (rgerhards), 2009-07-15 - CONFIG CHANGE: $HUPisRestart default is now "off". We are doing this diff --git a/doc/imtcp.html b/doc/imtcp.html index 9ea7efa1..34bfbfcf 100644 --- a/doc/imtcp.html +++ b/doc/imtcp.html @@ -43,8 +43,9 @@ can be found at the Cisco tcp page.

  • $InputTCPServerRun <port>
    Starts a TCP server on selected port
  • -
    • $InputTCPMaxSessions <number>
    -Sets the maximum number of sessions supported
  • $InputTCPServerStreamDriverMode <number>
    +
  • $InputTCPMaxSessions <number>
    +Sets the maximum number of sessions supported
  • +
  • $InputTCPServerStreamDriverMode <number>
    Sets the driver mode for the currently selected
    network stream driver. <number> is driver specifc.
  • $InputTCPServerInputName <name>
    Sets a name for the inputname property. If no name is set "imtcp" is used by default. Setting a diff --git a/tcpsrv.c b/tcpsrv.c index e8ea2b98..6f9ca108 100644 --- a/tcpsrv.c +++ b/tcpsrv.c @@ -327,15 +327,19 @@ finalize_it: static rsRetVal create_tcp_socket(tcpsrv_t *pThis) { - tcpLstnPortList_t *pEntry; DEFiRet; + rsRetVal localRet; + tcpLstnPortList_t *pEntry; ISOBJ_TYPE_assert(pThis, tcpsrv); /* init all configured ports */ pEntry = pThis->pLstnPorts; while(pEntry != NULL) { - CHKiRet(initTCPListener(pThis, pEntry)); + localRet = initTCPListener(pThis, pEntry); + if(localRet != RS_RET_OK) { + errmsg.LogError(0, localRet, "Could not create tcp listener, ignoring port %s.", pEntry->pszPort); + } pEntry = pEntry->pNext; } -- cgit From 56b781e5bb1ea08b76d5dcc1d5e5eab10a40a4c6 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 17 Aug 2009 17:18:19 +0200 Subject: added $InputTCPMaxListeners directive permits to specify how many TCP servers shall be possible (default is 20). --- ChangeLog | 2 ++ doc/imtcp.html | 7 +++++-- plugins/imtcp/imtcp.c | 5 +++++ tcpsrv.c | 39 ++++++++++++++++++++++++++------------- tcpsrv.h | 7 +++++-- 5 files changed, 43 insertions(+), 17 deletions(-) diff --git a/ChangeLog b/ChangeLog index cf9d30ca..b8e884e3 100644 --- a/ChangeLog +++ b/ChangeLog @@ -3,6 +3,8 @@ Version 4.5.2 [DEVEL] (rgerhards), 2009-07-?? - legacy syslog parser changed so that it now accepts date stamps in wrong case. Some devices seem to create them and I do not see any harm in supporting that. +- added $InputTCPMaxListeners directive - permits to specify how many + TCP servers shall be possible (default is 20). - bugfix: memory leak with some input modules. Those inputs that use parseAndSubmitMsg() leak two small memory blocks with every message. Typically, those process only relatively few messages, so the issue diff --git a/doc/imtcp.html b/doc/imtcp.html index 34bfbfcf..5217634e 100644 --- a/doc/imtcp.html +++ b/doc/imtcp.html @@ -43,8 +43,10 @@ can be found at the Cisco tcp page.
  • $InputTCPServerRun <port>
    Starts a TCP server on selected port
  • +
  • $InputTCPMaxListeners <number>
    +Sets the maximum number of listeners (server ports) supported. Default is 20. This must be set before the first $InputTCPServerRun directive.
  • $InputTCPMaxSessions <number>
    -Sets the maximum number of sessions supported
  • +Sets the maximum number of sessions supported. Default is 200. This must be set before the first $InputTCPServerRun directive
  • $InputTCPServerStreamDriverMode <number>
    Sets the driver mode for the currently selected
    network stream driver. <number> is driver specifc.
  • $InputTCPServerInputName <name>
    @@ -52,7 +54,8 @@ Sets a name for the inputname property. If no name is set "imtcp" is used by def name is not strictly necessary, but can be useful to apply filtering based on which input the message was received from.
  • $InputTCPServerStreamDriverAuthMode <mode-string>
    -Sets the authentication mode for the currently selected network stream driver. <mode-string> is driver specifc.
  • $InputTCPServerStreamDriverPermittedPeer <id-string>
    +Sets the authentication mode for the currently selected network stream driver. <mode-string> is driver specifc.
  • +
  • $InputTCPServerStreamDriverPermittedPeer <id-string>
    Sets permitted peer IDs. Only these peers are able to connect to the listener. <id-string> semantics depend on the currently selected AuthMode and  network stream driver. PermittedPeers may not be set in anonymous modes.
  • diff --git a/plugins/imtcp/imtcp.c b/plugins/imtcp/imtcp.c index e1f513c8..01c4bc33 100644 --- a/plugins/imtcp/imtcp.c +++ b/plugins/imtcp/imtcp.c @@ -82,6 +82,7 @@ static permittedPeers_t *pPermPeersRoot = NULL; /* config settings */ static int iTCPSessMax = 200; /* max number of sessions */ +static int iTCPLstnMax = 20; /* max number of sessions */ static int iStrmDrvrMode = 0; /* mode for stream driver, driver-dependent (0 mostly means plain tcp) */ static int iAddtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER; /* addtl frame delimiter, e.g. for netscreen, default none */ static uchar *pszStrmDrvrAuthMode = NULL; /* authentication mode to use */ @@ -188,6 +189,7 @@ static rsRetVal addTCPListener(void __attribute__((unused)) *pVal, uchar *pNewVa if(pOurTcpsrv == NULL) { CHKiRet(tcpsrv.Construct(&pOurTcpsrv)); CHKiRet(tcpsrv.SetSessMax(pOurTcpsrv, iTCPSessMax)); + CHKiRet(tcpsrv.SetLstnMax(pOurTcpsrv, iTCPLstnMax)); CHKiRet(tcpsrv.SetCBIsPermittedHost(pOurTcpsrv, isPermittedHost)); CHKiRet(tcpsrv.SetCBRcvData(pOurTcpsrv, doRcvData)); CHKiRet(tcpsrv.SetCBOpenLstnSocks(pOurTcpsrv, doOpenLstnSocks)); @@ -273,6 +275,7 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal) { iTCPSessMax = 200; + iTCPLstnMax = 20; iStrmDrvrMode = 0; iAddtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER; free(pszInputName); @@ -308,6 +311,8 @@ CODEmodInit_QueryRegCFSLineHdlr addTCPListener, NULL, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputtcpmaxsessions"), 0, eCmdHdlrInt, NULL, &iTCPSessMax, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputtcpmaxlisteners"), 0, eCmdHdlrInt, + NULL, &iTCPLstnMax, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputtcpserverstreamdrivermode"), 0, eCmdHdlrInt, NULL, &iStrmDrvrMode, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputtcpserverstreamdriverauthmode"), 0, diff --git a/tcpsrv.c b/tcpsrv.c index 6f9ca108..0be723d1 100644 --- a/tcpsrv.c +++ b/tcpsrv.c @@ -261,7 +261,7 @@ static void deinit_tcp_listener(tcpsrv_t *pThis) } /* finally close our listen streams */ - for(i = 0 ; i < pThis->iLstnMax ; ++i) { + for(i = 0 ; i < pThis->iLstnCurr ; ++i) { netstrm.Destruct(pThis->ppLstn + i); } } @@ -280,12 +280,12 @@ addTcpLstn(void *pUsr, netstrm_t *pLstn) ISOBJ_TYPE_assert(pThis, tcpsrv); ISOBJ_TYPE_assert(pLstn, netstrm); - if(pThis->iLstnMax >= TCPLSTN_MAX_DEFAULT) + if(pThis->iLstnCurr >= pThis->iLstnMax) ABORT_FINALIZE(RS_RET_MAX_LSTN_REACHED); - pThis->ppLstn[pThis->iLstnMax] = pLstn; - pThis->ppLstnPort[pThis->iLstnMax] = pPortList; - ++pThis->iLstnMax; + pThis->ppLstn[pThis->iLstnCurr] = pLstn; + pThis->ppLstnPort[pThis->iLstnCurr] = pPortList; + ++pThis->iLstnCurr; finalize_it: RETiRet; @@ -485,7 +485,6 @@ Run(tcpsrv_t *pThis) * this thread. Thus, we also need to instantiate a cancel cleanup handler * to prevent us from leaking anything. -- rgerharsd, 20080-04-24 */ -RUNLOG_STR("XXXX: tcp server runs\n"); pthread_cleanup_push(RunCancelCleanup, (void*) &pSel); while(1) { CHKiRet(nssel.Construct(&pSel)); @@ -493,7 +492,7 @@ RUNLOG_STR("XXXX: tcp server runs\n"); CHKiRet(nssel.ConstructFinalize(pSel)); /* Add the TCP listen sockets to the list of read descriptors. */ - for(i = 0 ; i < pThis->iLstnMax ; ++i) { + for(i = 0 ; i < pThis->iLstnCurr ; ++i) { CHKiRet(nssel.Add(pSel, pThis->ppLstn[i], NSDSEL_RD)); } @@ -506,11 +505,10 @@ RUNLOG_STR("XXXX: tcp server runs\n"); iTCPSess = TCPSessGetNxtSess(pThis, iTCPSess); } -RUNLOG_STR("XXXX: tcp server select\n"); /* wait for io to become ready */ CHKiRet(nssel.Wait(pSel, &nfds)); - for(i = 0 ; i < pThis->iLstnMax ; ++i) { + for(i = 0 ; i < pThis->iLstnCurr ; ++i) { CHKiRet(nssel.IsReady(pSel, pThis->ppLstn[i], NSDSEL_RD, &bIsReady, &nfds)); if(bIsReady) { dbgprintf("New connect on NSD %p.\n", pThis->ppLstn[i]); @@ -518,7 +516,6 @@ RUNLOG_STR("XXXX: tcp server select\n"); --nfds; /* indicate we have processed one */ } } -RUNLOG_STR("XXXX: tcp server post select\n"); /* now check the sessions */ iTCPSess = TCPSessGetNxtSess(pThis, -1); @@ -582,7 +579,8 @@ finalize_it: /* this is a very special case - this time only we do not exit the /* Standard-Constructor */ BEGINobjConstruct(tcpsrv) /* be sure to specify the object type also in END macro! */ - pThis->iSessMax = TCPSESS_MAX_DEFAULT; /* TODO: useful default ;) */ + pThis->iSessMax = TCPSESS_MAX_DEFAULT; + pThis->iLstnMax = TCPLSTN_MAX_DEFAULT; pThis->addtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER; pThis->OnMsgReceive = NULL; ENDobjConstruct(tcpsrv) @@ -606,8 +604,8 @@ tcpsrvConstructFinalize(tcpsrv_t *pThis) CHKiRet(netstrms.ConstructFinalize(pThis->pNS)); /* set up listeners */ - CHKmalloc(pThis->ppLstn = calloc(TCPLSTN_MAX_DEFAULT, sizeof(netstrm_t*))); - CHKmalloc(pThis->ppLstnPort = calloc(TCPLSTN_MAX_DEFAULT, sizeof(tcpLstnPortList_t*))); + CHKmalloc(pThis->ppLstn = calloc(pThis->iLstnMax, sizeof(netstrm_t*))); + CHKmalloc(pThis->ppLstnPort = calloc(pThis->iLstnMax, sizeof(tcpLstnPortList_t*))); iRet = pThis->OpenLstnSocks(pThis); finalize_it: @@ -824,6 +822,20 @@ SetDrvrPermPeers(tcpsrv_t *pThis, permittedPeers_t *pPermPeers) * -------------------------------------------------------------------------- */ +/* set max number of listeners + * this must be called before ConstructFinalize, or it will have no effect! + * rgerhards, 2009-08-17 + */ +static rsRetVal +SetLstnMax(tcpsrv_t *pThis, int iMax) +{ + DEFiRet; + ISOBJ_TYPE_assert(pThis, tcpsrv); + pThis->iLstnMax = iMax; + RETiRet; +} + + /* set max number of sessions * this must be called before ConstructFinalize, or it will have no effect! * rgerhards, 2009-04-09 @@ -866,6 +878,7 @@ CODESTARTobjQueryInterface(tcpsrv) pIf->SetInputName = SetInputName; pIf->SetAddtlFrameDelim = SetAddtlFrameDelim; pIf->SetSessMax = SetSessMax; + pIf->SetLstnMax = SetLstnMax; pIf->SetDrvrMode = SetDrvrMode; pIf->SetDrvrAuthMode = SetDrvrAuthMode; pIf->SetDrvrPermPeers = SetDrvrPermPeers; diff --git a/tcpsrv.h b/tcpsrv.h index 70682398..64065aab 100644 --- a/tcpsrv.h +++ b/tcpsrv.h @@ -54,9 +54,10 @@ struct tcpsrv_s { uchar *pszInputName; /**< value to be used as input name */ ruleset_t *pRuleset; /**< ruleset to bind to */ permittedPeers_t *pPermPeers;/**< driver's permitted peers */ - int iLstnMax; /**< max nbr of listeners currently supported */ + int iLstnCurr; /**< max nbr of listeners currently supported */ netstrm_t **ppLstn; /**< our netstream listners */ tcpLstnPortList_t **ppLstnPort; /**< pointer to relevant listen port description */ + int iLstnMax; /**< max number of listners supported */ int iSessMax; /**< max number of sessions supported */ tcpLstnPortList_t *pLstnPorts; /**< head pointer for listen ports */ int addtlFrameDelim; /**< additional frame delimiter for plain TCP syslog framing (e.g. to handle NetScreen) */ @@ -111,8 +112,10 @@ BEGINinterface(tcpsrv) /* name must also be changed in ENDinterface macro! */ /* added v6 */ rsRetVal (*SetOnMsgReceive)(tcpsrv_t *pThis, rsRetVal (*OnMsgReceive)(tcps_sess_t*, uchar*, int)); /* 2009-05-24 */ rsRetVal (*SetRuleset)(tcpsrv_t *pThis, ruleset_t*); /* 2009-06-12 */ + /* added v7 */ + rsRetVal (*SetLstnMax)(tcpsrv_t *pThis, int iMaxLstn); /* 2009-08-17 */ ENDinterface(tcpsrv) -#define tcpsrvCURR_IF_VERSION 6 /* increment whenever you change the interface structure! */ +#define tcpsrvCURR_IF_VERSION 7 /* increment whenever you change the interface structure! */ /* change for v4: * - SetAddtlFrameDelim() added -- rgerhards, 2008-12-10 * - SetInputName() added -- rgerhards, 2008-12-10 -- cgit From bfc3eaf23cae0ef8685fc25b71e701e2c4690509 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 18 Aug 2009 18:48:18 +0200 Subject: bugfix: potential segfault in output file writer (omfile) In async write mode, we use modular arithmetic to index the output buffer array. However, the counter variables accidently were signed, thus resulting in negative indizes after integer overflow. That in turn could lead to segfaults, but was depending on the memory layout of the instance in question (which in turn depended on a number of variables, like compile settings but also configuration). The counters are now unsigned (as they always should have been) and so the dangling mis-indexing does no longer happen. This bug potentially affected all installations, even if only some may actually have seen a segfault. --- ChangeLog | 10 ++++++++++ runtime/stream.c | 1 + runtime/stream.h | 4 ++-- 3 files changed, 13 insertions(+), 2 deletions(-) diff --git a/ChangeLog b/ChangeLog index b8e884e3..d1ba8617 100644 --- a/ChangeLog +++ b/ChangeLog @@ -11,6 +11,16 @@ Version 4.5.2 [DEVEL] (rgerhards), 2009-07-?? does most probably not have any effect in practice. - bugfix: if tcp listen port could not be created, no error message was emitted +- bugfix: potential segfault in output file writer (omfile) + In async write mode, we use modular arithmetic to index the output + buffer array. However, the counter variables accidently were signed, + thus resulting in negative indizes after integer overflow. That in turn + could lead to segfaults, but was depending on the memory layout of + the instance in question (which in turn depended on a number of + variables, like compile settings but also configuration). The counters + are now unsigned (as they always should have been) and so the dangling + mis-indexing does no longer happen. This bug potentially affected all + installations, even if only some may actually have seen a segfault. --------------------------------------------------------------------------- Version 4.5.1 [DEVEL] (rgerhards), 2009-07-15 - CONFIG CHANGE: $HUPisRestart default is now "off". We are doing this diff --git a/runtime/stream.c b/runtime/stream.c index 605a9771..a6ed70fe 100644 --- a/runtime/stream.c +++ b/runtime/stream.c @@ -833,6 +833,7 @@ finalize_it: * the very some producer comes back in sequence to submit the then-filled buffers. * This also enables us to timout on partially written buffers. -- rgerhards, 2009-07-06 */ +//#include static inline rsRetVal doAsyncWriteInternal(strm_t *pThis, size_t lenBuf) { diff --git a/runtime/stream.h b/runtime/stream.h index cb368835..64ffb6e1 100644 --- a/runtime/stream.h +++ b/runtime/stream.h @@ -131,8 +131,8 @@ typedef struct strm_s { pthread_cond_t notFull; pthread_cond_t notEmpty; pthread_cond_t isEmpty; - short iEnq; - short iDeq; + unsigned short iEnq; /* this MUST be unsigned as we use module arithmetic (else invalid indexing happens!) */ + unsigned short iDeq; /* this MUST be unsigned as we use module arithmetic (else invalid indexing happens!) */ short iCnt; /* current nbr of elements in buffer */ struct { uchar *pBuf; -- cgit From 9bb9181572d445dd300546113fc617eb549866ba Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 18 Aug 2009 19:08:44 +0200 Subject: very minor cleanup --- runtime/stream.c | 1 - 1 file changed, 1 deletion(-) diff --git a/runtime/stream.c b/runtime/stream.c index a6ed70fe..605a9771 100644 --- a/runtime/stream.c +++ b/runtime/stream.c @@ -833,7 +833,6 @@ finalize_it: * the very some producer comes back in sequence to submit the then-filled buffers. * This also enables us to timout on partially written buffers. -- rgerhards, 2009-07-06 */ -//#include static inline rsRetVal doAsyncWriteInternal(strm_t *pThis, size_t lenBuf) { -- cgit From daa76ad94428599336ddafdd6854dc0b71356180 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 20 Aug 2009 14:22:33 +0200 Subject: bugfix: hostnames with dashes in them were incorrectly treated as malformed ... thus causing them to be treated as TAG (this was a regression introduced from the "rfc3164 strict" change in 4.5.0). --- ChangeLog | 3 +++ tools/syslogd.c | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/ChangeLog b/ChangeLog index d1ba8617..0ab127bb 100644 --- a/ChangeLog +++ b/ChangeLog @@ -21,6 +21,9 @@ Version 4.5.2 [DEVEL] (rgerhards), 2009-07-?? are now unsigned (as they always should have been) and so the dangling mis-indexing does no longer happen. This bug potentially affected all installations, even if only some may actually have seen a segfault. +- bugfix: hostnames with dashes in them were incorrectly treated as + malformed, thus causing them to be treated as TAG (this was a regression + introduced from the "rfc3164 strict" change in 4.5.0). --------------------------------------------------------------------------- Version 4.5.1 [DEVEL] (rgerhards), 2009-07-15 - CONFIG CHANGE: $HUPisRestart default is now "off". We are doing this diff --git a/tools/syslogd.c b/tools/syslogd.c index 3846e6a9..88588621 100644 --- a/tools/syslogd.c +++ b/tools/syslogd.c @@ -1218,7 +1218,7 @@ int parseLegacySyslogMsg(msg_t *pMsg, int flags) if(flags & PARSE_HOSTNAME) { i = 0; while((isalnum(p2parse[i]) || p2parse[i] == '.' || p2parse[i] == '.' - || p2parse[i] == '_') && i < CONF_TAG_MAXSIZE) { + || p2parse[i] == '_' || p2parse[i] == '-') && i < CONF_TAG_MAXSIZE) { bufParseHOSTNAME[i] = p2parse[i]; ++i; } -- cgit