summaryrefslogtreecommitdiffstats
path: root/runtime/stream.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2009-07-06 16:38:09 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2009-07-06 16:38:09 +0200
commite3040285dbf0854443bc2443e0de5ac59f6f839e (patch)
tree557eae59b899f2311a9a8ba80ea32e465fff3e9a /runtime/stream.c
parent7fdeac0bdcaad3525f203ae5dc1fa7636078e37f (diff)
downloadrsyslog-e3040285dbf0854443bc2443e0de5ac59f6f839e.tar.gz
rsyslog-e3040285dbf0854443bc2443e0de5ac59f6f839e.tar.xz
rsyslog-e3040285dbf0854443bc2443e0de5ac59f6f839e.zip
first shot at asynchronous stream writer with timeout capability
... seems to work on quick testing, but needs a far more testing and improvement. Good milestone commit.
Diffstat (limited to 'runtime/stream.c')
-rw-r--r--runtime/stream.c216
1 files changed, 146 insertions, 70 deletions
diff --git a/runtime/stream.c b/runtime/stream.c
index 00c726d9..a9f4803f 100644
--- a/runtime/stream.c
+++ b/runtime/stream.c
@@ -48,37 +48,21 @@
#include "stream.h"
#include "unicode-helper.h"
#include "module-template.h"
-#include "apc.h"
+#include <sys/prctl.h>
/* static data */
DEFobjStaticHelpers
DEFobjCurrIf(zlibw)
-DEFobjCurrIf(apc)
/* forward definitions */
static rsRetVal strmFlush(strm_t *pThis);
static rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf);
static rsRetVal strmCloseFile(strm_t *pThis);
+static void *asyncWriterThread(void *pPtr);
/* methods */
-/* async flush apc handler
- */
-static void
-flushApc(void *param1, void __attribute__((unused)) *param2)
-{
- DEFVARS_mutexProtection_uncond;
- strm_t *pThis = (strm_t*) param1;
- ISOBJ_TYPE_assert(pThis, strm);
-
- BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(&pThis->mut);
- strmFlush(pThis);
- pThis->apcRequested = 0;
- END_MTX_PROTECTED_OPERATIONS_UNCOND(&pThis->mut);
-}
-
-
/* Try to resolve a size limit situation. This is used to support custom-file size handlers
* for omfile. It first runs the command, and then checks if we are still above the size
* treshold. Note that this works only with single file names, NOT with circular names.
@@ -569,11 +553,11 @@ ENDobjConstruct(strm)
static rsRetVal strmConstructFinalize(strm_t *pThis)
{
rsRetVal localRet;
+ int i;
DEFiRet;
ASSERT(pThis != NULL);
- CHKmalloc(pThis->pIOBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize));
pThis->iBufPtrMax = 0; /* results in immediate read request */
if(pThis->iZipLevel) { /* do we need a zip buf? */
localRet = objUse(zlibw, LM_ZLIBW_FILENAME);
@@ -601,9 +585,33 @@ static rsRetVal strmConstructFinalize(strm_t *pThis)
}
}
- /* if we should call flush apc's, we need a mutex */
+dbgprintf("TTT: before checks: iFlushInterval %d, bAsyncWrite %d\n", pThis->iFlushInterval, pThis->bAsyncWrite);
+ /* if we have a flush interval, we need to do async writes in any case */
if(pThis->iFlushInterval != 0) {
+ pThis->bAsyncWrite = 1;
+ }
+dbgprintf("TTT: after checks: iFlushInterval %d, bAsyncWrite %d\n", pThis->iFlushInterval, pThis->bAsyncWrite);
+
+ /* if we work asynchronously, we need a couple of synchronization objects */
+ if(pThis->bAsyncWrite) {
pthread_mutex_init(&pThis->mut, 0);
+ pthread_cond_init(&pThis->notFull, 0);
+ pthread_cond_init(&pThis->notEmpty, 0);
+ pthread_cond_init(&pThis->isEmpty, 0);
+ pThis->iCnt = pThis->iEnq = pThis->iDeq = 0;
+ for(i = 0 ; i < STREAM_ASYNC_NUMBUFS ; ++i) {
+ CHKmalloc(pThis->asyncBuf[i].pBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize));
+ }
+ //pThis->pIOBuf = pThis->ioBuf[0];
+ pThis->bStopWriter = 0;
+ // TODO: detached thread?
+ if(pthread_create(&pThis->writerThreadID, NULL, asyncWriterThread, pThis) != 0)
+ dbgprintf("ERROR: stream %p cold not create writer thread\n", pThis);
+ // TODO: remove that below later!
+ CHKmalloc(pThis->pIOBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize));
+ } else {
+ /* we work synchronously, so we need to alloc a fixed pIOBuf */
+ CHKmalloc(pThis->pIOBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize));
}
finalize_it:
@@ -611,8 +619,26 @@ finalize_it:
}
+/* stop the writer thread (we MUST be runnnig asynchronously when this method
+ * is called!) -- rgerhards, 2009-07-06
+ */
+static inline void
+stopWriter(strm_t *pThis)
+{
+ BEGINfunc
+ d_pthread_mutex_lock(&pThis->mut);
+ pThis->bStopWriter = 1;
+ pthread_cond_signal(&pThis->notEmpty);
+ d_pthread_cond_wait(&pThis->isEmpty, &pThis->mut);
+ d_pthread_mutex_unlock(&pThis->mut);
+ pthread_join(pThis->writerThreadID, NULL);
+ ENDfunc
+}
+
+
/* destructor for the strm object */
BEGINobjDestruct(strm) /* be sure to specify the object type also in END and CODESTART macros! */
+ int i;
CODESTARTobjDestruct(strm)
if(pThis->tOperationsMode != STREAMMODE_READ)
strmFlush(pThis);
@@ -625,16 +651,23 @@ CODESTARTobjDestruct(strm)
objRelease(zlibw, LM_ZLIBW_FILENAME);
}
- if(pThis->iFlushInterval != 0) {
- // TODO: check if there is an apc and remove it!
- pthread_mutex_destroy(&pThis->mut);
- }
-
free(pThis->pszDir);
- free(pThis->pIOBuf);
free(pThis->pZipBuf);
free(pThis->pszCurrFName);
free(pThis->pszFName);
+
+ if(pThis->bAsyncWrite) {
+ stopWriter(pThis);
+ pthread_mutex_destroy(&pThis->mut);
+ pthread_cond_destroy(&pThis->notFull);
+ pthread_cond_destroy(&pThis->notEmpty);
+ pthread_cond_destroy(&pThis->isEmpty);
+ for(i = 0 ; i < STREAM_ASYNC_NUMBUFS ; ++i) {
+ free(pThis->asyncBuf[i].pBuf);
+ }
+ } else {
+ free(pThis->pIOBuf);
+ }
ENDobjDestruct(strm)
@@ -730,11 +763,93 @@ doWriteCall(strm_t *pThis, uchar *pBuf, size_t *pLenBuf)
pWriteBuf += iWritten;
} while(lenBuf > 0); /* Warning: do..while()! */
+ dbgoprint((obj_t*) pThis, "file %d write wrote %d bytes\n", pThis->fd, (int) iWritten);
+
finalize_it:
*pLenBuf = iTotalWritten;
RETiRet;
}
+#include <stdio.h>
+
+/* This is the writer thread for asynchronous mode.
+ * -- rgerhards, 2009-07-06
+ */
+static void*
+asyncWriterThread(void *pPtr)
+{
+ int iDeq;
+ size_t iWritten;
+ strm_t *pThis = (strm_t*) pPtr;
+ ISOBJ_TYPE_assert(pThis, strm);
+
+ BEGINfunc
+ if(prctl(PR_SET_NAME, "rs:asyn strmwr", 0, 0, 0) != 0) {
+ DBGPRINTF("prctl failed, not setting thread name for '%s'\n", "stream writer");
+ }
+
+fprintf(stderr, "async stream writer thread started\n");fflush(stderr);
+dbgprintf("TTT: writer thread startup\n");
+ while(1) { /* loop broken inside */
+ d_pthread_mutex_lock(&pThis->mut);
+ while(pThis->iCnt == 0) {
+dbgprintf("TTT: writer thread empty queue, stopWriter=%d\n", pThis->bStopWriter);
+ if(pThis->bStopWriter) {
+ pthread_cond_signal(&pThis->isEmpty);
+ d_pthread_mutex_unlock(&pThis->mut);
+ goto finalize_it; /* break main loop */
+ }
+ d_pthread_cond_wait(&pThis->notEmpty, &pThis->mut);
+ }
+
+ iDeq = pThis->iDeq++ % STREAM_ASYNC_NUMBUFS;
+ iWritten = pThis->asyncBuf[iDeq].lenBuf;
+ doWriteCall(pThis, pThis->asyncBuf[iDeq].pBuf, &iWritten);
+ // TODO: error check!!!!! 2009-07-06
+
+ --pThis->iCnt;
+ if(pThis->iCnt < STREAM_ASYNC_NUMBUFS) {
+ pthread_cond_signal(&pThis->notFull);
+ }
+ d_pthread_mutex_unlock(&pThis->mut);
+ }
+
+finalize_it:
+dbgprintf("TTT: writer thread shutdown\n");
+ ENDfunc
+ return NULL; /* to keep pthreads happy */
+}
+
+
+/* This function is called to "do" an async write call, what primarily means that
+ * the data is handed over to the writer thread (which will then do the actual write
+ * in parallel. -- rgerhards, 2009-07-06
+ */
+static inline rsRetVal
+doAsyncWriteCall(strm_t *pThis, uchar *pBuf, size_t *pLenBuf)
+{
+ int iEnq;
+ DEFiRet;
+ ISOBJ_TYPE_assert(pThis, strm);
+
+ d_pthread_mutex_lock(&pThis->mut);
+ while(pThis->iCnt >= STREAM_ASYNC_NUMBUFS)
+ d_pthread_cond_wait(&pThis->notFull, &pThis->mut);
+
+ iEnq = pThis->iEnq++ % STREAM_ASYNC_NUMBUFS;
+ // TODO: optimize, memcopy only for getting it initially going!
+ //pThis->asyncBuf[iEnq].pBuf = pBuf;
+ memcpy(pThis->asyncBuf[iEnq].pBuf, pBuf, *pLenBuf);
+ pThis->asyncBuf[iEnq].lenBuf = *pLenBuf;
+
+ if(++pThis->iCnt == 1)
+ pthread_cond_signal(&pThis->notEmpty);
+ d_pthread_mutex_unlock(&pThis->mut);
+
+finalize_it:
+ RETiRet;
+}
+
/* sync the file to disk, so that any unwritten data is persisted. This
* also syncs the directory and thus makes sure that the file survives
@@ -788,8 +903,11 @@ strmPhysWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
CHKiRet(strmOpenFile(pThis));
iWritten = lenBuf;
- CHKiRet(doWriteCall(pThis, pBuf, &iWritten));
- dbgoprint((obj_t*) pThis, "file %d write wrote %d bytes\n", pThis->fd, (int) iWritten);
+ if(pThis->bAsyncWrite) {
+ CHKiRet(doAsyncWriteCall(pThis, pBuf, &iWritten));
+ } else {
+ CHKiRet(doWriteCall(pThis, pBuf, &iWritten));
+ }
pThis->iBufPtr = 0;
pThis->iCurrOffs += iWritten;
@@ -993,37 +1111,11 @@ finalize_it:
}
-/* schedule an Apc flush request.
- * rgerhards, 2009-06-15
- */
-static inline rsRetVal
-scheduleFlushRequest(strm_t *pThis)
-{
- apc_t *pApc;
- DEFiRet;
-
- if(!pThis->apcRequested) {
- /* we do an request only if none is yet pending */
- pThis->apcRequested = 1;
- // TODO: find similar thing later CHKiRet(apc.CancelApc(pThis->apcID));
-dbgprintf("XXX: requesting to add apc!\n");
- CHKiRet(apc.Construct(&pApc));
- CHKiRet(apc.SetProcedure(pApc, (void (*)(void*, void*))flushApc));
- CHKiRet(apc.SetParam1(pApc, pThis));
- CHKiRet(apc.ConstructFinalize(pApc, &pThis->apcID));
- }
-
-finalize_it:
- RETiRet;
-}
-
-
/* write memory buffer to a stream object
*/
static rsRetVal
strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
{
- DEFVARS_mutexProtection_uncond;
DEFiRet;
size_t iPartial;
@@ -1034,11 +1126,6 @@ dbgprintf("strmWrite(%p, '%65.65s', %ld);, disabled %d, sizelim %ld, size %lld\n
if(pThis->bDisabled)
ABORT_FINALIZE(RS_RET_STREAM_DISABLED);
-RUNLOG_VAR("%d", pThis->iFlushInterval);
- if(pThis->iFlushInterval != 0) {
- BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(&pThis->mut);
- }
-
/* check if the to-be-written data is larger than our buffer size */
if(lenBuf >= pThis->sIOBufSize) {
/* it is - so we do a direct write, that is most efficient.
@@ -1067,17 +1154,7 @@ RUNLOG_VAR("%d", pThis->iFlushInterval);
}
}
- /* we ignore the outcome of scheduleFlushRequest(), as we will write the data always at
- * termination. For Zip mode, it could be fatal if we write after each record.
- */
- if(pThis->iFlushInterval != 0)
- scheduleFlushRequest(pThis);
-
finalize_it:
- if(pThis->iFlushInterval != 0) {
- END_MTX_PROTECTED_OPERATIONS_UNCOND(&pThis->mut);
- }
-
RETiRet;
}
@@ -1390,7 +1467,6 @@ ENDobjQueryInterface(strm)
*/
BEGINObjClassInit(strm, 1, OBJ_IS_CORE_MODULE)
/* request objects we use */
- CHKiRet(objUse(apc, CORE_COMPONENT));
OBJSetMethodHandler(objMethod_SERIALIZE, strmSerialize);
OBJSetMethodHandler(objMethod_SETPROPERTY, strmSetProperty);