summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2009-07-08 15:23:25 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2009-07-08 15:23:25 +0200
commitcf906392c3039ffa2526316b44ff28bd1d899061 (patch)
tree09c83a8866c917207b7d7130de1c88590c5f2b64
parent8e4ad77e54c9d9272cef41f71d94ae277965711e (diff)
parent5221a1e42e16c8c39b48a4a1a18ee6322c38cd17 (diff)
downloadrsyslog-cf906392c3039ffa2526316b44ff28bd1d899061.tar.gz
rsyslog-cf906392c3039ffa2526316b44ff28bd1d899061.tar.xz
rsyslog-cf906392c3039ffa2526316b44ff28bd1d899061.zip
Merge branch 'v4-devel'
Conflicts: runtime/debug.h runtime/stream.c
-rw-r--r--runtime/debug.c4
-rw-r--r--runtime/debug.h1
-rw-r--r--runtime/queue.c133
-rw-r--r--runtime/stream.c444
-rw-r--r--runtime/stream.h19
-rw-r--r--runtime/wtp.c7
-rwxr-xr-xtests/diag.sh2
-rwxr-xr-xtests/killrsyslog.sh2
-rw-r--r--tests/nettester.c5
-rw-r--r--tests/tcpflood.c19
-rw-r--r--tools/omfile.c7
-rw-r--r--tools/syslogd.c6
12 files changed, 397 insertions, 252 deletions
diff --git a/runtime/debug.c b/runtime/debug.c
index 248c5ea3..fb751efb 100644
--- a/runtime/debug.c
+++ b/runtime/debug.c
@@ -732,6 +732,8 @@ static void dbgGetThrdName(char *pszBuf, size_t lenBuf, pthread_t thrd, int bInc
*/
void dbgSetThrdName(uchar *pszName)
{
+return;
+
dbgThrdInfo_t *pThrd = dbgGetThrdInfo();
if(pThrd->pszThrdName != NULL)
free(pThrd->pszThrdName);
@@ -776,7 +778,7 @@ static void dbgCallStackPrint(dbgThrdInfo_t *pThrd)
/* print all threads call stacks
*/
-static void dbgCallStackPrintAll(void)
+void dbgCallStackPrintAll(void)
{
dbgThrdInfo_t *pThrd;
/* stack info */
diff --git a/runtime/debug.h b/runtime/debug.h
index 8b66d784..dcbfb930 100644
--- a/runtime/debug.h
+++ b/runtime/debug.h
@@ -135,7 +135,6 @@ void dbgPrintAllDebugInfo(void);
/* debug aides */
#ifdef RTINST
-//#if 0 // temporarily removed for helgrind
#define d_pthread_mutex_lock(x) dbgMutexLock(x, pdbgFuncDB, __LINE__, dbgCALLStaCK_POP_POINT )
#define d_pthread_mutex_trylock(x) dbgMutexTryLock(x, pdbgFuncDB, __LINE__, dbgCALLStaCK_POP_POINT )
#define d_pthread_mutex_unlock(x) dbgMutexUnlock(x, pdbgFuncDB, __LINE__, dbgCALLStaCK_POP_POINT )
diff --git a/runtime/queue.c b/runtime/queue.c
index 2568717b..a4d16132 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -2460,105 +2460,6 @@ finalize_it:
}
-/* enqueue a new user data element
- * Enqueues the new element and awakes worker thread.
- */
-rsRetVal
-qqueueEnqObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr)
-{
- DEFiRet;
- int iCancelStateSave;
- struct timespec t;
-
- ISOBJ_TYPE_assert(pThis, qqueue);
-
- /* Please note that this function is not cancel-safe and consequently
- * sets the calling thread's cancelibility state to PTHREAD_CANCEL_DISABLE
- * during its execution. If that is not done, race conditions occur if the
- * thread is canceled (most important use case is input module termination).
- * rgerhards, 2008-01-08
- */
- if(pThis->qType != QUEUETYPE_DIRECT) {
- pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
- d_pthread_mutex_lock(pThis->mut);
- }
-
- /* first check if we need to discard this message (which will cause CHKiRet() to exit)
- */
- CHKiRet(qqueueChkDiscardMsg(pThis, getPhysicalQueueSize(pThis), pThis->bRunsDA, pUsr));
-
- /* then check if we need to add an assistance disk queue */
- if(pThis->bIsDA)
- CHKiRet(ChkStrtDA(pThis));
-
- /* handle flow control
- * There are two different flow control mechanisms: basic and advanced flow control.
- * Basic flow control has always been implemented and protects the queue structures
- * in that it makes sure no more data is enqueued than the queue is configured to
- * support. Enhanced flow control is being added today. There are some sources which
- * can easily be stopped, e.g. a file reader. This is the case because it is unlikely
- * that blocking those sources will have negative effects (after all, the file is
- * continued to be written). Other sources can somewhat be blocked (e.g. the kernel
- * log reader or the local log stream reader): in general, nothing is lost if messages
- * from these sources are not picked up immediately. HOWEVER, they can not block for
- * an extended period of time, as this either causes message loss or - even worse - some
- * other bad effects (e.g. unresponsive system in respect to the main system log socket).
- * Finally, there are some (few) sources which can not be blocked at all. UDP syslog is
- * a prime example. If a UDP message is not received, it is simply lost. So we can't
- * do anything against UDP sockets that come in too fast. The core idea of advanced
- * flow control is that we take into account the different natures of the sources and
- * select flow control mechanisms that fit these needs. This also means, in the end
- * result, that non-blockable sources like UDP syslog receive priority in the system.
- * It's a side effect, but a good one ;) -- rgerhards, 2008-03-14
- */
- if(flowCtlType == eFLOWCTL_FULL_DELAY) {
- while(getPhysicalQueueSize(pThis) >= pThis->iFullDlyMrk) {
- dbgoprint((obj_t*) pThis, "enqueueMsg: FullDelay mark reached for full delayable message - blocking.\n");
- pthread_cond_wait(&pThis->belowFullDlyWtrMrk, pThis->mut); /* TODO error check? But what do then? */
- }
- } else if(flowCtlType == eFLOWCTL_LIGHT_DELAY) {
- if(getPhysicalQueueSize(pThis) >= pThis->iLightDlyMrk) {
- dbgoprint((obj_t*) pThis, "enqueueMsg: LightDelay mark reached for light delayable message - blocking a bit.\n");
- timeoutComp(&t, 1000); /* 1000 millisconds = 1 second TODO: make configurable */
- pthread_cond_timedwait(&pThis->belowLightDlyWtrMrk, pThis->mut, &t); /* TODO error check? But what do then? */
- }
- }
-
- /* from our regular flow control settings, we are now ready to enqueue the object.
- * However, we now need to do a check if the queue permits to add more data. If that
- * is not the case, basic flow control enters the field, which means we wait for
- * the queue to become ready or drop the new message. -- rgerhards, 2008-03-14
- */
- while( (pThis->iMaxQueueSize > 0 && getPhysicalQueueSize(pThis) >= pThis->iMaxQueueSize)
- || (pThis->qType == QUEUETYPE_DISK && pThis->sizeOnDiskMax != 0
- && pThis->tVars.disk.sizeOnDisk > pThis->sizeOnDiskMax)) {
- dbgoprint((obj_t*) pThis, "enqueueMsg: queue FULL - waiting to drain.\n");
- timeoutComp(&t, pThis->toEnq);
- if(pthread_cond_timedwait(&pThis->notFull, pThis->mut, &t) != 0) {
- dbgoprint((obj_t*) pThis, "enqueueMsg: cond timeout, dropping message!\n");
- objDestruct(pUsr);
- ABORT_FINALIZE(RS_RET_QUEUE_FULL);
- }
- }
-
- /* and finally enqueue the message */
- CHKiRet(qqueueAdd(pThis, pUsr));
- qqueueChkPersist(pThis, 1);
-
-finalize_it:
- if(pThis->qType != QUEUETYPE_DIRECT) {
- /* make sure at least one worker is running. */
- qqueueAdviseMaxWorkers(pThis);
- /* and release the mutex */
- d_pthread_mutex_unlock(pThis->mut);
- pthread_setcancelstate(iCancelStateSave, NULL);
- dbgoprint((obj_t*) pThis, "EnqueueMsg advised worker start\n");
- }
-
- RETiRet;
-}
-
-
/* enqueue a single data object. This currently is a helper to qqueueMultiEnqObj.
* Note that the queue mutex MUST already be locked when this function is called.
* rgerhards, 2009-06-16
@@ -2678,6 +2579,40 @@ finalize_it:
}
+/* enqueue a new user data element
+ * Enqueues the new element and awakes worker thread.
+ */
+rsRetVal
+qqueueEnqObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr)
+{
+ DEFiRet;
+ int iCancelStateSave;
+
+ ISOBJ_TYPE_assert(pThis, qqueue);
+
+ if(pThis->qType != QUEUETYPE_DIRECT) {
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
+ d_pthread_mutex_lock(pThis->mut);
+ }
+
+ CHKiRet(doEnqSingleObj(pThis, flowCtlType, pUsr));
+
+ qqueueChkPersist(pThis, 1);
+
+finalize_it:
+ if(pThis->qType != QUEUETYPE_DIRECT) {
+ /* make sure at least one worker is running. */
+ qqueueAdviseMaxWorkers(pThis);
+ /* and release the mutex */
+ d_pthread_mutex_unlock(pThis->mut);
+ pthread_setcancelstate(iCancelStateSave, NULL);
+ dbgoprint((obj_t*) pThis, "EnqueueMsg advised worker start\n");
+ }
+
+ RETiRet;
+}
+
+
/* set queue mode to enqueue only or not
* There is one subtle issue: this method may be called during queue
* construction or while it is running. In the former case, the queue
diff --git a/runtime/stream.c b/runtime/stream.c
index 67941062..4f611a62 100644
--- a/runtime/stream.c
+++ b/runtime/stream.c
@@ -48,39 +48,25 @@
#include "stream.h"
#include "unicode-helper.h"
#include "module-template.h"
-#include "apc.h"
+#include <sys/prctl.h>
+
+#define inline
/* static data */
DEFobjStaticHelpers
DEFobjCurrIf(zlibw)
-DEFobjCurrIf(apc)
/* forward definitions */
static rsRetVal strmFlush(strm_t *pThis);
static rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf);
static rsRetVal strmCloseFile(strm_t *pThis);
+static void *asyncWriterThread(void *pPtr);
+static rsRetVal doZipWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf);
+static rsRetVal strmPhysWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf);
/* methods */
-/* async flush apc handler
- */
-static void
-flushApc(void *param1, void __attribute__((unused)) *param2)
-{
- DEFVARS_mutexProtection_uncond;
- strm_t *pThis = (strm_t*) param1;
- ISOBJ_TYPE_assert(pThis, strm);
-
- BEGINfunc
- BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(&pThis->mut);
- strmFlush(pThis);
- pThis->apcRequested = 0;
- END_MTX_PROTECTED_OPERATIONS_UNCOND(&pThis->mut);
- ENDfunc
-}
-
-
/* Try to resolve a size limit situation. This is used to support custom-file size handlers
* for omfile. It first runs the command, and then checks if we are still above the size
* treshold. Note that this works only with single file names, NOT with circular names.
@@ -141,10 +127,11 @@ resolveFileSizeLimit(strm_t *pThis, uchar *pszCurrFName)
finalize_it:
if(iRet != RS_RET_OK) {
- if(iRet == RS_RET_SIZELIMITCMD_DIDNT_RESOLVE)
- dbgprintf("file size limit cmd for file '%s' did no resolve situation\n", pszCurrFName);
- else
- dbgprintf("file size limit cmd for file '%s' failed with code %d.\n", pszCurrFName, iRet);
+ if(iRet == RS_RET_SIZELIMITCMD_DIDNT_RESOLVE) {
+ DBGPRINTF("file size limit cmd for file '%s' did no resolve situation\n", pszCurrFName);
+ } else {
+ DBGPRINTF("file size limit cmd for file '%s' failed with code %d.\n", pszCurrFName, iRet);
+ }
pThis->bDisabled = 1;
}
@@ -282,6 +269,23 @@ finalize_it:
}
+/* wait for the output writer thread to be done. This must be called before actions
+ * that require data to be persisted. May be called in non-async mode and is a null
+ * operation than. Must be called with the mutex locked.
+ */
+static inline void
+strmWaitAsyncWriterDone(strm_t *pThis)
+{
+ BEGINfunc
+ if(pThis->bAsyncWrite) {
+ /* awake writer thread and make it write out everything */
+ pthread_cond_signal(&pThis->notEmpty);
+ d_pthread_cond_wait(&pThis->isEmpty, &pThis->mut);
+ }
+ ENDfunc
+}
+
+
/* close a strm file
* Note that the bDeleteOnClose flag is honored. If it is set, the file will be
* deleted after close. This is in support for the qRead thread.
@@ -294,8 +298,15 @@ static rsRetVal strmCloseFile(strm_t *pThis)
ASSERT(pThis->fd != -1);
dbgoprint((obj_t*) pThis, "file %d closing\n", pThis->fd);
- if(pThis->tOperationsMode != STREAMMODE_READ)
- strmFlush(pThis);
+ if(!pThis->bInClose && pThis->tOperationsMode != STREAMMODE_READ) {
+ pThis->bInClose = 1;
+ if(pThis->bAsyncWrite) {
+ strmFlush(pThis);
+ } else {
+ strmWaitAsyncWriterDone(pThis);
+ }
+ pThis->bInClose = 0;
+ }
close(pThis->fd);
pThis->fd = -1;
@@ -571,11 +582,11 @@ ENDobjConstruct(strm)
static rsRetVal strmConstructFinalize(strm_t *pThis)
{
rsRetVal localRet;
+ int i;
DEFiRet;
ASSERT(pThis != NULL);
- CHKmalloc(pThis->pIOBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize));
pThis->iBufPtrMax = 0; /* results in immediate read request */
if(pThis->iZipLevel) { /* do we need a zip buf? */
localRet = objUse(zlibw, LM_ZLIBW_FILENAME);
@@ -603,9 +614,28 @@ static rsRetVal strmConstructFinalize(strm_t *pThis)
}
}
- /* if we should call flush apc's, we need a mutex */
+ /* if we have a flush interval, we need to do async writes in any case */
if(pThis->iFlushInterval != 0) {
+ pThis->bAsyncWrite = 1;
+ }
+
+ /* if we work asynchronously, we need a couple of synchronization objects */
+ if(pThis->bAsyncWrite) {
pthread_mutex_init(&pThis->mut, 0);
+ pthread_cond_init(&pThis->notFull, 0);
+ pthread_cond_init(&pThis->notEmpty, 0);
+ pthread_cond_init(&pThis->isEmpty, 0);
+ pThis->iCnt = pThis->iEnq = pThis->iDeq = 0;
+ for(i = 0 ; i < STREAM_ASYNC_NUMBUFS ; ++i) {
+ CHKmalloc(pThis->asyncBuf[i].pBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize));
+ }
+ pThis->pIOBuf = pThis->asyncBuf[0].pBuf;
+ pThis->bStopWriter = 0;
+ if(pthread_create(&pThis->writerThreadID, NULL, asyncWriterThread, pThis) != 0)
+ DBGPRINTF("ERROR: stream %p cold not create writer thread\n", pThis);
+ } else {
+ /* we work synchronously, so we need to alloc a fixed pIOBuf */
+ CHKmalloc(pThis->pIOBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize));
}
finalize_it:
@@ -613,12 +643,33 @@ finalize_it:
}
+/* stop the writer thread (we MUST be runnnig asynchronously when this method
+ * is called!). Note that the mutex must be locked! -- rgerhards, 2009-07-06
+ */
+static inline void
+stopWriter(strm_t *pThis)
+{
+ BEGINfunc
+ pThis->bStopWriter = 1;
+ pthread_cond_signal(&pThis->notEmpty);
+ d_pthread_mutex_unlock(&pThis->mut);
+ pthread_join(pThis->writerThreadID, NULL);
+ ENDfunc
+}
+
+
/* destructor for the strm object */
BEGINobjDestruct(strm) /* be sure to specify the object type also in END and CODESTART macros! */
+ int i;
CODESTARTobjDestruct(strm)
+ if(pThis->bAsyncWrite)
+ /* Note: mutex will be unlocked in stopWriter! */
+ d_pthread_mutex_lock(&pThis->mut);
+
if(pThis->tOperationsMode != STREAMMODE_READ)
strmFlush(pThis);
+dbgprintf("XXX: destruct stream %p\n", pThis);
/* ... then free resources */
if(pThis->fd != -1)
strmCloseFile(pThis);
@@ -627,16 +678,23 @@ CODESTARTobjDestruct(strm)
objRelease(zlibw, LM_ZLIBW_FILENAME);
}
- if(pThis->iFlushInterval != 0) {
- // TODO: check if there is an apc and remove it!
- pthread_mutex_destroy(&pThis->mut);
- }
-
free(pThis->pszDir);
- free(pThis->pIOBuf);
free(pThis->pZipBuf);
free(pThis->pszCurrFName);
free(pThis->pszFName);
+
+ if(pThis->bAsyncWrite) {
+ stopWriter(pThis);
+ pthread_mutex_destroy(&pThis->mut);
+ pthread_cond_destroy(&pThis->notFull);
+ pthread_cond_destroy(&pThis->notEmpty);
+ pthread_cond_destroy(&pThis->isEmpty);
+ for(i = 0 ; i < STREAM_ASYNC_NUMBUFS ; ++i) {
+ free(pThis->asyncBuf[i].pBuf);
+ }
+ } else {
+ free(pThis->pIOBuf);
+ }
ENDobjDestruct(strm)
@@ -652,6 +710,9 @@ static rsRetVal strmCheckNextOutputFile(strm_t *pThis)
if(pThis->fd == -1)
FINALIZE;
+ /* wait for output to be empty, so that our counts are correct */
+ strmWaitAsyncWriterDone(pThis);
+
if(pThis->iCurrOffs >= pThis->iMaxFileSize) {
dbgoprint((obj_t*) pThis, "max file size %ld reached for %d, now %ld - starting new file\n",
(long) pThis->iMaxFileSize, pThis->fd, (long) pThis->iCurrOffs);
@@ -732,12 +793,164 @@ doWriteCall(strm_t *pThis, uchar *pBuf, size_t *pLenBuf)
pWriteBuf += iWritten;
} while(lenBuf > 0); /* Warning: do..while()! */
+ dbgoprint((obj_t*) pThis, "file %d write wrote %d bytes\n", pThis->fd, (int) iWritten);
+
finalize_it:
*pLenBuf = iTotalWritten;
RETiRet;
}
+
+/* write memory buffer to a stream object.
+ */
+static inline rsRetVal
+doWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf)
+{
+ DEFiRet;
+
+ ASSERT(pThis != NULL);
+
+ if(pThis->iZipLevel) {
+ CHKiRet(doZipWrite(pThis, pBuf, lenBuf));
+ } else {
+ /* write without zipping */
+ CHKiRet(strmPhysWrite(pThis, pBuf, lenBuf));
+ }
+
+finalize_it:
+ RETiRet;
+}
+
+
+/* This function is called to "do" an async write call, what primarily means that
+ * the data is handed over to the writer thread (which will then do the actual write
+ * in parallel). Note that the stream mutex has already been locked by the
+ * strmWrite...() calls. Also note that we always have only a single producer,
+ * so we can simply serially assign the next free buffer to it and be sure that
+ * the very some producer comes back in sequence to submit the then-filled buffers.
+ * This also enables us to timout on partially written buffers. -- rgerhards, 2009-07-06
+ */
+static inline rsRetVal
+doAsyncWriteInternal(strm_t *pThis, size_t lenBuf)
+{
+ DEFiRet;
+ ISOBJ_TYPE_assert(pThis, strm);
+
+dbgprintf("XXX: doAsyncWriteInternal: strm %p, len %ld\n", pThis, (long) lenBuf);
+ while(pThis->iCnt >= STREAM_ASYNC_NUMBUFS)
+ d_pthread_cond_wait(&pThis->notFull, &pThis->mut);
+
+ pThis->asyncBuf[pThis->iEnq % STREAM_ASYNC_NUMBUFS].lenBuf = lenBuf;
+ pThis->pIOBuf = pThis->asyncBuf[++pThis->iEnq % STREAM_ASYNC_NUMBUFS].pBuf;
+
+ pThis->bDoTimedWait = 0; /* everything written, no need to timeout partial buffer writes */
+ if(++pThis->iCnt == 1)
+ pthread_cond_signal(&pThis->notEmpty);
+
+finalize_it:
+ RETiRet;
+}
+
+
+/* schedule writing to the stream. Depending on our concurrency settings,
+ * this either directly writes to the stream or schedules writing via
+ * the background thread. -- rgerhards, 2009-07-07
+ */
+static rsRetVal
+strmSchedWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
+{
+ DEFiRet;
+
+ ASSERT(pThis != NULL);
+
+ if(pThis->bAsyncWrite) {
+ CHKiRet(doAsyncWriteInternal(pThis, lenBuf));
+ } else {
+ CHKiRet(doWriteInternal(pThis, pBuf, lenBuf));
+ }
+
+ pThis->iBufPtr = 0; /* we are at the begin of a new buffer */
+
+finalize_it:
+ RETiRet;
+}
+
+
+
+/* This is the writer thread for asynchronous mode.
+ * -- rgerhards, 2009-07-06
+ */
+static void*
+asyncWriterThread(void *pPtr)
+{
+ int iDeq;
+ struct timespec t;
+ bool bTimedOut = 0;
+ strm_t *pThis = (strm_t*) pPtr;
+ ISOBJ_TYPE_assert(pThis, strm);
+
+ BEGINfunc
+ if(prctl(PR_SET_NAME, "rs:asyn strmwr", 0, 0, 0) != 0) {
+ DBGPRINTF("prctl failed, not setting thread name for '%s'\n", "stream writer");
+ }
+
+ while(1) { /* loop broken inside */
+ d_pthread_mutex_lock(&pThis->mut);
+ while(pThis->iCnt == 0) {
+ if(pThis->bStopWriter) {
+ pthread_cond_broadcast(&pThis->isEmpty);
+ d_pthread_mutex_unlock(&pThis->mut);
+ goto finalize_it; /* break main loop */
+ }
+ if(bTimedOut && pThis->iBufPtr > 0) {
+RUNLOG_STR("XXX: we had a timeout in stream writer");
+ /* if we timed out, we need to flush pending data */
+ strmFlush(pThis);
+ bTimedOut = 0;
+ continue; /* now we should have data */
+ }
+ bTimedOut = 0;
+ timeoutComp(&t, pThis->iFlushInterval * 2000); /* *1000 millisconds */
+ if(pThis->bDoTimedWait) {
+ if(pthread_cond_timedwait(&pThis->notEmpty, &pThis->mut, &t) != 0) {
+ int err = errno;
+ if(err == ETIMEDOUT) {
+ bTimedOut = 1;
+ } else {
+ bTimedOut = 1;
+ char errStr[1024];
+ rs_strerror_r(err, errStr, sizeof(errStr));
+ DBGPRINTF("stream async writer timeout with error (%d): %s - ignoring\n",
+ err, errStr);
+ }
+ }
+ } else {
+ d_pthread_cond_wait(&pThis->notEmpty, &pThis->mut);
+ }
+ }
+
+ bTimedOut = 0; /* we may have timed out, but there *is* work to do... */
+
+ iDeq = pThis->iDeq++ % STREAM_ASYNC_NUMBUFS;
+ doWriteInternal(pThis, pThis->asyncBuf[iDeq].pBuf, pThis->asyncBuf[iDeq].lenBuf);
+ // TODO: error check????? 2009-07-06
+
+ --pThis->iCnt;
+ if(pThis->iCnt < STREAM_ASYNC_NUMBUFS) {
+ pthread_cond_signal(&pThis->notFull);
+ if(pThis->iCnt == 0)
+ pthread_cond_broadcast(&pThis->isEmpty);
+ }
+ d_pthread_mutex_unlock(&pThis->mut);
+ }
+
+finalize_it:
+ ENDfunc
+ return NULL; /* to keep pthreads happy */
+}
+
+
/* sync the file to disk, so that any unwritten data is persisted. This
* also syncs the directory and thus makes sure that the file survives
* fatal failure. Note that we do NOT return an error status if the
@@ -791,9 +1004,7 @@ strmPhysWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
iWritten = lenBuf;
CHKiRet(doWriteCall(pThis, pBuf, &iWritten));
- dbgoprint((obj_t*) pThis, "file %d write wrote %d bytes\n", pThis->fd, (int) iWritten);
- pThis->iBufPtr = 0;
pThis->iCurrOffs += iWritten;
/* update user counter, if provided */
if(pThis->pUsrWCntr != NULL)
@@ -841,7 +1052,7 @@ doZipWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
/* see note in file header for the params we use with deflateInit2() */
zRet = zlibw.DeflateInit2(&zstrm, pThis->iZipLevel, Z_DEFLATED, 31, 9, Z_DEFAULT_STRATEGY);
if(zRet != Z_OK) {
- dbgprintf("error %d returned from zlib/deflateInit2()\n", zRet);
+ DBGPRINTF("error %d returned from zlib/deflateInit2()\n", zRet);
ABORT_FINALIZE(RS_RET_ZLIB_ERR);
}
@@ -851,11 +1062,11 @@ doZipWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
/* run deflate() on input until output buffer not full, finish
compression if all of source has been read in */
do {
- dbgprintf("in deflate() loop, avail_in %d, total_in %ld\n", zstrm.avail_in, zstrm.total_in);
+ DBGPRINTF("in deflate() loop, avail_in %d, total_in %ld\n", zstrm.avail_in, zstrm.total_in);
zstrm.avail_out = pThis->sIOBufSize;
zstrm.next_out = pThis->pZipBuf;
zRet = zlibw.Deflate(&zstrm, Z_FINISH); /* no bad return value */
- dbgprintf("after deflate, ret %d, avail_out %d\n", zRet, zstrm.avail_out);
+ DBGPRINTF("after deflate, ret %d, avail_out %d\n", zRet, zstrm.avail_out);
assert(zRet != Z_STREAM_ERROR); /* state not clobbered */
CHKiRet(strmPhysWrite(pThis, (uchar*)pThis->pZipBuf, pThis->sIOBufSize - zstrm.avail_out));
} while (zstrm.avail_out == 0);
@@ -864,7 +1075,7 @@ doZipWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
zRet = zlibw.DeflateEnd(&zstrm);
if(zRet != Z_OK) {
- dbgprintf("error %d returned from zlib/deflateEnd()\n", zRet);
+ DBGPRINTF("error %d returned from zlib/deflateEnd()\n", zRet);
ABORT_FINALIZE(RS_RET_ZLIB_ERR);
}
@@ -873,33 +1084,6 @@ finalize_it:
}
-/* write memory buffer to a stream object.
- * To support direct writes of large objects, this method may be called
- * with a buffer pointing to some region other than the stream buffer itself.
- * However, in that case the stream buffer must be empty (strmFlush() has to
- * be called before), because we would otherwise mess up with the sequence
- * inside the stream. -- rgerhards, 2008-01-10
- */
-static rsRetVal
-strmWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf)
-{
- DEFiRet;
-
- ASSERT(pThis != NULL);
- ASSERT(pBuf == pThis->pIOBuf || pThis->iBufPtr == 0);
-
- if(pThis->iZipLevel) {
- CHKiRet(doZipWrite(pThis, pBuf, lenBuf));
- } else {
- /* write without zipping */
- CHKiRet(strmPhysWrite(pThis, pBuf, lenBuf));
- }
-
-finalize_it:
- RETiRet;
-}
-
-
/* flush stream output buffer to persistent storage. This can be called at any time
* and is automatically called when the output buffer is full.
* rgerhards, 2008-01-10
@@ -913,7 +1097,7 @@ strmFlush(strm_t *pThis)
dbgoprint((obj_t*) pThis, "file %d flush, buflen %ld\n", pThis->fd, (long) pThis->iBufPtr);
if(pThis->tOperationsMode != STREAMMODE_READ && pThis->iBufPtr > 0) {
- iRet = strmWriteInternal(pThis, pThis->pIOBuf, pThis->iBufPtr);
+ iRet = strmSchedWrite(pThis, pThis->pIOBuf, pThis->iBufPtr);
}
RETiRet;
@@ -966,6 +1150,12 @@ static rsRetVal strmWriteChar(strm_t *pThis, uchar c)
ASSERT(pThis != NULL);
+ if(pThis->bAsyncWrite)
+ d_pthread_mutex_lock(&pThis->mut);
+
+ if(pThis->bDisabled)
+ ABORT_FINALIZE(RS_RET_STREAM_DISABLED);
+
/* if the buffer is full, we need to flush before we can write */
if(pThis->iBufPtr == pThis->sIOBufSize) {
CHKiRet(strmFlush(pThis));
@@ -975,11 +1165,18 @@ static rsRetVal strmWriteChar(strm_t *pThis, uchar c)
pThis->iBufPtr++;
finalize_it:
+ if(pThis->bAsyncWrite)
+ d_pthread_mutex_unlock(&pThis->mut);
+
RETiRet;
}
-/* write an integer value (actually a long) to a stream object */
+/* write an integer value (actually a long) to a stream object
+ * Note that we do not need to lock the mutex here, because we call
+ * strmWrite(), which does the lock (aka: we must not lock it, else we
+ * would run into a recursive lock, resulting in a deadlock!)
+ */
static rsRetVal strmWriteLong(strm_t *pThis, long i)
{
DEFiRet;
@@ -995,89 +1192,65 @@ finalize_it:
}
-/* schedule an Apc flush request.
- * rgerhards, 2009-06-15
- */
-static inline rsRetVal
-scheduleFlushRequest(strm_t *pThis)
-{
- apc_t *pApc;
- DEFiRet;
-
- if(!pThis->apcRequested) {
- /* we do an request only if none is yet pending */
- pThis->apcRequested = 1;
- // TODO: find similar thing later CHKiRet(apc.CancelApc(pThis->apcID));
-dbgprintf("XXX: requesting to add apc!\n");
- CHKiRet(apc.Construct(&pApc));
- CHKiRet(apc.SetProcedure(pApc, (void (*)(void*, void*))flushApc));
- CHKiRet(apc.SetParam1(pApc, pThis));
- CHKiRet(apc.ConstructFinalize(pApc, &pThis->apcID));
- }
-
-finalize_it:
- RETiRet;
-}
-
-
-/* write memory buffer to a stream object
+/* write memory buffer to a stream object.
+ * process the data in chunks and copy it over to our buffer. The caller-provided data
+ * may theoritically be larger than our buffer. In that case, we do multiple copies. One
+ * may argue if it were more efficient to write out the caller-provided buffer in that case
+ * and earlier versions of rsyslog did this. However, this introduces a lot of complexity
+ * inside the buffered writer and potential performance bottlenecks when trying to solve
+ * it. Now keep in mind that we actually do (almost?) never have a case where the
+ * caller-provided buffer is larger than our one. So instead of optimizing a case
+ * which normally does not exist, we expect some degradation in its case but make us
+ * perform better in the regular cases. -- rgerhards, 2009-07-07
*/
static rsRetVal
strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
{
- DEFVARS_mutexProtection_uncond;
DEFiRet;
- size_t iPartial;
+ size_t iWrite;
+ size_t iOffset;
ASSERT(pThis != NULL);
ASSERT(pBuf != NULL);
-dbgprintf("strmWrite(%p, '%65.65s', %ld);, disabled %d, sizelim %ld, size %lld\n", pThis, pBuf,lenBuf, pThis->bDisabled, pThis->iSizeLimit, pThis->iCurrOffs);
+//DBGPRINTF("strmWrite(%p, '%65.65s', %ld);, disabled %d, sizelim %ld, size %lld\n", pThis, pBuf,lenBuf, pThis->bDisabled, pThis->iSizeLimit, pThis->iCurrOffs);
+ if(pThis->bAsyncWrite)
+ d_pthread_mutex_lock(&pThis->mut);
+
if(pThis->bDisabled)
ABORT_FINALIZE(RS_RET_STREAM_DISABLED);
-RUNLOG_VAR("%d", pThis->iFlushInterval);
- if(pThis->iFlushInterval != 0) {
- BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(&pThis->mut);
- }
-
- /* check if the to-be-written data is larger than our buffer size */
- if(lenBuf >= pThis->sIOBufSize) {
- /* it is - so we do a direct write, that is most efficient.
- * TODO: is it really? think about disk block sizes!
- */
- CHKiRet(strmFlush(pThis)); /* we need to flush first!!! */
- CHKiRet(strmWriteInternal(pThis, pBuf, lenBuf));
- } else {
- /* data fits into a buffer - we just need to see if it
- * fits into the current buffer...
- */
- if(pThis->iBufPtr + lenBuf > pThis->sIOBufSize) {
- /* nope, so we must split it */
- iPartial = pThis->sIOBufSize - pThis->iBufPtr; /* this fits in current buf */
- if(iPartial > 0) { /* the buffer was exactly full, can not write anything! */
- memcpy(pThis->pIOBuf + pThis->iBufPtr, pBuf, iPartial);
- pThis->iBufPtr += iPartial;
- }
+ iOffset = 0;
+ do {
+ if(pThis->iBufPtr == pThis->sIOBufSize) {
CHKiRet(strmFlush(pThis)); /* get a new buffer for rest of data */
- memcpy(pThis->pIOBuf, pBuf + iPartial, lenBuf - iPartial);
- pThis->iBufPtr = lenBuf - iPartial;
- } else {
- /* we have space, so we simply copy over the string */
- memcpy(pThis->pIOBuf + pThis->iBufPtr, pBuf, lenBuf);
- pThis->iBufPtr += lenBuf;
}
- }
-
- /* we ignore the outcome of scheduleFlushRequest(), as we will write the data always at
- * termination. For Zip mode, it could be fatal if we write after each record.
+ iWrite = pThis->sIOBufSize - pThis->iBufPtr; /* this fits in current buf */
+ if(iWrite > lenBuf)
+ iWrite = lenBuf;
+ memcpy(pThis->pIOBuf + pThis->iBufPtr, pBuf + iOffset, iWrite);
+ pThis->iBufPtr += iWrite;
+ iOffset += iWrite;
+ lenBuf -= iWrite;
+ } while(lenBuf > 0);
+
+ /* now check if the buffer right at the end of the write is full and, if so,
+ * write it. This seems more natural than waiting (hours?) for the next message...
*/
- if(pThis->iFlushInterval != 0)
- scheduleFlushRequest(pThis);
+ if(pThis->iBufPtr == pThis->sIOBufSize) {
+ CHKiRet(strmFlush(pThis)); /* get a new buffer for rest of data */
+ }
finalize_it:
- if(pThis->iFlushInterval != 0) {
- END_MTX_PROTECTED_OPERATIONS_UNCOND(&pThis->mut);
+ if(pThis->bAsyncWrite) {
+ if(pThis->bDoTimedWait == 0) {
+ /* we potentially have a partial buffer, so re-activate the
+ * writer thread that it can set and pick up timeouts.
+ */
+ pThis->bDoTimedWait = 1;
+ pthread_cond_signal(&pThis->notEmpty);
+ }
+ d_pthread_mutex_unlock(&pThis->mut);
}
RETiRet;
@@ -1433,7 +1606,6 @@ ENDobjQueryInterface(strm)
*/
BEGINObjClassInit(strm, 1, OBJ_IS_CORE_MODULE)
/* request objects we use */
- CHKiRet(objUse(apc, CORE_COMPONENT));
OBJSetMethodHandler(objMethod_SERIALIZE, strmSerialize);
OBJSetMethodHandler(objMethod_SETPROPERTY, strmSetProperty);
diff --git a/runtime/stream.h b/runtime/stream.h
index e8f4acf9..c251e5c4 100644
--- a/runtime/stream.h
+++ b/runtime/stream.h
@@ -87,6 +87,7 @@ typedef enum { /* when extending, do NOT change existing modes! */
STREAMMODE_WRITE_APPEND = 4
} strmMode_t;
+#define STREAM_ASYNC_NUMBUFS 2 /* must be a power of 2 -- TODO: make configurable */
/* The strm_t data structure */
typedef struct strm_s {
BEGINobjInstance; /* Data to implement generic object - MUST be the first data element! */
@@ -112,17 +113,32 @@ typedef struct strm_s {
int fd; /* the file descriptor, -1 if closed */
int fdDir; /* the directory's descriptor, in case bSync is requested (-1 if closed) */
uchar *pszCurrFName; /* name of current file (if open) */
- uchar *pIOBuf; /* io Buffer */
+ uchar *pIOBuf; /* the iobuffer currently in use to gather data */
size_t iBufPtrMax; /* current max Ptr in Buffer (if partial read!) */
size_t iBufPtr; /* pointer into current buffer */
int iUngetC; /* char set via UngetChar() call or -1 if none set */
bool bInRecord; /* if 1, indicates that we are currently writing a not-yet complete record */
+ bool bInClose; /* used to break "deadly close loops", tells us we are already inside a close */
int iZipLevel; /* zip level (0..9). If 0, zip is completely disabled */
Bytef *pZipBuf;
/* support for async flush procesing */
+ bool bAsyncWrite; /* do asynchronous writes (always if a flush interval is given) */
+ bool bStopWriter; /* shall writer thread terminate? */
+ bool bDoTimedWait; /* instruct writer thread to do a times wait to support flush timeouts */
int iFlushInterval; /* flush in which interval - 0, no flushing */
apc_id_t apcID; /* id of current Apc request (used for cancelling) */
pthread_mutex_t mut;/* mutex for flush in async mode */
+ pthread_cond_t notFull;
+ pthread_cond_t notEmpty;
+ pthread_cond_t isEmpty;
+ short iEnq;
+ short iDeq;
+ short iCnt; /* current nbr of elements in buffer */
+ struct {
+ uchar *pBuf;
+ size_t lenBuf;
+ } asyncBuf[STREAM_ASYNC_NUMBUFS];
+ pthread_t writerThreadID;
int apcRequested; /* is an apc Requested? */
/* support for omfile size-limiting commands, special counters, NOT persisted! */
off_t iSizeLimit; /* file size limit, 0 = no limit */
@@ -130,6 +146,7 @@ typedef struct strm_s {
bool bIsTTY; /* is this a tty file? */
} strm_t;
+
/* interfaces */
BEGINinterface(strm) /* name must also be changed in ENDinterface macro! */
rsRetVal (*Construct)(strm_t **ppThis);
diff --git a/runtime/wtp.c b/runtime/wtp.c
index dc9cbb75..81e4446d 100644
--- a/runtime/wtp.c
+++ b/runtime/wtp.c
@@ -53,6 +53,7 @@
#include "wtp.h"
#include "wti.h"
#include "obj.h"
+#include "unicode-helper.h"
#include "glbl.h"
/* static data */
@@ -426,6 +427,8 @@ wtpWrkrExecCancelCleanup(void *arg)
static void *
wtpWorker(void *arg) /* the arg is actually a wti object, even though we are in wtp! */
{
+ uchar *pszDbgHdr;
+ uchar thrdName[32] = "rs:";
DEFiRet;
DEFVARS_mutexProtection;
wti_t *pWti = (wti_t*) arg;
@@ -440,7 +443,9 @@ wtpWorker(void *arg) /* the arg is actually a wti object, even though we are in
pthread_sigmask(SIG_BLOCK, &sigSet, NULL);
/* set thread name - we ignore if the call fails, has no harsh consequences... */
- if(prctl(PR_SET_NAME, wtpGetDbgHdr(pThis), 0, 0, 0) != 0) {
+ pszDbgHdr = wtpGetDbgHdr(pThis);
+ ustrncpy(thrdName+3, pszDbgHdr, 20);
+ if(prctl(PR_SET_NAME, thrdName, 0, 0, 0) != 0) {
DBGPRINTF("prctl failed, not setting thread name for '%s'\n", wtpGetDbgHdr(pThis));
}
diff --git a/tests/diag.sh b/tests/diag.sh
index 2a9d0ee3..299c5d71 100755
--- a/tests/diag.sh
+++ b/tests/diag.sh
@@ -9,7 +9,7 @@
#valgrind="valgrind --tool=drd --log-fd=1"
#valgrind="valgrind --tool=helgrind --log-fd=1"
#set -o xtrace
-#export RSYSLOG_DEBUG="debug nostdout noprintmutexaction"
+#export RSYSLOG_DEBUG="debug nostdout printmutexaction"
#export RSYSLOG_DEBUGLOG="log"
case $1 in
'init') $srcdir/killrsyslog.sh # kill rsyslogd if it runs for some reason
diff --git a/tests/killrsyslog.sh b/tests/killrsyslog.sh
index b1be757b..c9b6e0ac 100755
--- a/tests/killrsyslog.sh
+++ b/tests/killrsyslog.sh
@@ -2,6 +2,6 @@
if [ -e "rsyslog.pid" ]
then
echo rsyslog.pid exists, trying to shut down rsyslogd process `cat rsyslog.pid`.
- kill `cat rsyslog.pid`
+ kill -9 `cat rsyslog.pid`
sleep 1
fi
diff --git a/tests/nettester.c b/tests/nettester.c
index dbfb4db3..73abc46e 100644
--- a/tests/nettester.c
+++ b/tests/nettester.c
@@ -128,12 +128,13 @@ tcpSend(char *buf, int lenBuf)
if(connect(sock, (struct sockaddr*)&addr, sizeof(addr)) == 0) {
break;
} else {
- if(retries++ == 30) {
+ if(retries++ == 50) {
++iFailed;
fprintf(stderr, "connect() failed\n");
return(1);
} else {
- usleep(100);
+ fprintf(stderr, "connect() failed, retry %d\n", retries);
+ usleep(100000); /* ms = 1000 us! */
}
}
}
diff --git a/tests/tcpflood.c b/tests/tcpflood.c
index 2ca796ca..0439e33e 100644
--- a/tests/tcpflood.c
+++ b/tests/tcpflood.c
@@ -61,6 +61,7 @@ int openConn(int *fd)
{
int sock;
struct sockaddr_in addr;
+ int retries = 0;
if((sock=socket(AF_INET, SOCK_STREAM, 0))==-1) {
perror("socket()");
@@ -74,11 +75,19 @@ int openConn(int *fd)
fprintf(stderr, "inet_aton() failed\n");
return(1);
}
- if(connect(sock, (struct sockaddr*)&addr, sizeof(addr)) != 0) {
- perror("connect()");
- fprintf(stderr, "connect() failed\n");
- return(1);
- }
+ while(1) { /* loop broken inside */
+ if(connect(sock, (struct sockaddr*)&addr, sizeof(addr)) == 0) {
+ break;
+ } else {
+ if(retries++ == 50) {
+ perror("connect()");
+ fprintf(stderr, "connect() failed\n");
+ return(1);
+ } else {
+ usleep(100000); /* ms = 1000 us! */
+ }
+ }
+ }
*fd = sock;
return 0;
diff --git a/tools/omfile.c b/tools/omfile.c
index 82944a96..bb12b4b6 100644
--- a/tools/omfile.c
+++ b/tools/omfile.c
@@ -401,12 +401,17 @@ prepareFile(instanceData *pData, uchar *newFileName)
CHKiRet(strm.SetDir(pData->pStrm, szDirName, ustrlen(szDirName)));
CHKiRet(strm.SetiZipLevel(pData->pStrm, pData->iZipLevel));
CHKiRet(strm.SetsIOBufSize(pData->pStrm, (size_t) pData->iIOBufSize));
- CHKiRet(strm.SetiFlushInterval(pData->pStrm, pData->iFlushInterval));
CHKiRet(strm.SettOperationsMode(pData->pStrm, STREAMMODE_WRITE_APPEND));
CHKiRet(strm.SettOpenMode(pData->pStrm, fCreateMode));
CHKiRet(strm.SetbSync(pData->pStrm, pData->bSyncFile));
CHKiRet(strm.SetsType(pData->pStrm, STREAMTYPE_FILE_SINGLE));
CHKiRet(strm.SetiSizeLimit(pData->pStrm, pData->iSizeLimit));
+ /* set the flush interval only if we actually use it - otherwise it will activate
+ * async processing, which is a real performance waste if we do not do buffered
+ * writes! -- rgerhards, 2009-07-06
+ */
+ if(!pData->bFlushOnTXEnd)
+ CHKiRet(strm.SetiFlushInterval(pData->pStrm, pData->iFlushInterval));
if(pData->pszSizeLimitCmd != NULL)
CHKiRet(strm.SetpszSizeLimitCmd(pData->pStrm, ustrdup(pData->pszSizeLimitCmd)));
CHKiRet(strm.ConstructFinalize(pData->pStrm));
diff --git a/tools/syslogd.c b/tools/syslogd.c
index e3824d9e..e4daff54 100644
--- a/tools/syslogd.c
+++ b/tools/syslogd.c
@@ -2793,8 +2793,8 @@ mainloop(void)
* powertop, for example). In that case, we primarily wait for a signal,
* but a once-a-day wakeup should be quite acceptable. -- rgerhards, 2008-06-09
*/
- //tvSelectTimeout.tv_sec = (bReduceRepeatMsgs == 1) ? TIMERINTVL : 86400 /*1 day*/;
- tvSelectTimeout.tv_sec = TIMERINTVL; /* TODO: change this back to the above code when we have a better solution for apc */
+ tvSelectTimeout.tv_sec = (bReduceRepeatMsgs == 1) ? TIMERINTVL : 86400 /*1 day*/;
+ //tvSelectTimeout.tv_sec = TIMERINTVL; /* TODO: change this back to the above code when we have a better solution for apc */
tvSelectTimeout.tv_usec = 0;
select(1, NULL, NULL, NULL, &tvSelectTimeout);
if(bFinished)
@@ -2829,7 +2829,7 @@ mainloop(void)
bHadHUP = 0;
continue;
}
- execScheduled(); /* handle Apc calls (if any) */
+ // TODO: remove execScheduled(); /* handle Apc calls (if any) */
}
ENDfunc
}