diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2009-06-15 13:44:51 +0200 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2009-06-15 13:44:51 +0200 |
commit | 16ecb90c3ac88bb2261c31c990d88f97f1a1b32f (patch) | |
tree | 393807bddcfb330384ab7c2bd5aa32200571d495 /runtime/stream.c | |
parent | 1f874c220860d3a19fb6cfb60f0902a08639f6ab (diff) | |
download | rsyslog-16ecb90c3ac88bb2261c31c990d88f97f1a1b32f.tar.gz rsyslog-16ecb90c3ac88bb2261c31c990d88f97f1a1b32f.tar.xz rsyslog-16ecb90c3ac88bb2261c31c990d88f97f1a1b32f.zip |
omfile buffers are now synchronized after inactivity
This is the first shot at this functionality. Currently, we run off a fixed
counter in the rsyslogd mainloop, which needs to be restructured. But this
code works, so it is a good time for a commit.
Diffstat (limited to 'runtime/stream.c')
-rw-r--r-- | runtime/stream.c | 68 |
1 files changed, 68 insertions, 0 deletions
diff --git a/runtime/stream.c b/runtime/stream.c index c8672aa2..773da319 100644 --- a/runtime/stream.c +++ b/runtime/stream.c @@ -39,6 +39,7 @@ #include <unistd.h> #include <sys/stat.h> /* required for HP UX */ #include <errno.h> +#include <pthread.h> #include "rsyslog.h" #include "stringbuf.h" @@ -47,10 +48,12 @@ #include "stream.h" #include "unicode-helper.h" #include "module-template.h" +#include "apc.h" /* static data */ DEFobjStaticHelpers DEFobjCurrIf(zlibw) +DEFobjCurrIf(apc) /* forward definitions */ static rsRetVal strmFlush(strm_t *pThis); @@ -60,6 +63,20 @@ static rsRetVal strmCloseFile(strm_t *pThis); /* methods */ +/* async flush apc handler + */ +static void +flushApc(void *param1, void __attribute__((unused)) *param2) +{ + DEFVARS_mutexProtection_uncond; + strm_t *pThis = (strm_t*) param1; + ISOBJ_TYPE_assert(pThis, strm); + + BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(&pThis->mut); + strmFlush(pThis); + END_MTX_PROTECTED_OPERATIONS_UNCOND(&pThis->mut); +} + /* Try to resolve a size limit situation. This is used to support custom-file size handlers * for omfile. It first runs the command, and then checks if we are still above the size @@ -583,6 +600,11 @@ static rsRetVal strmConstructFinalize(strm_t *pThis) } } + /* if we should call flush apc's, we need a mutex */ + if(pThis->iFlushInterval != 0) { + pthread_mutex_init(&pThis->mut, 0); + } + finalize_it: RETiRet; } @@ -602,6 +624,11 @@ CODESTARTobjDestruct(strm) objRelease(zlibw, LM_ZLIBW_FILENAME); } + if(pThis->iFlushInterval != 0) { + // TODO: check if there is an apc and remove it! + pthread_mutex_destroy(&pThis->mut); + } + free(pThis->pszDir); free(pThis->pIOBuf); free(pThis->pZipBuf); @@ -965,11 +992,33 @@ finalize_it: } +/* schedule an Apc flush request. + * rgerhards, 2009-06-15 + */ +static inline rsRetVal +scheduleFlushRequest(strm_t *pThis) +{ + apc_t *pApc; + DEFiRet; + + CHKiRet(apc.CancelApc(pThis->apcID)); +dbgprintf("XXX: requesting to add apc!\n"); + CHKiRet(apc.Construct(&pApc)); + CHKiRet(apc.SetProcedure(pApc, (void (*)(void*, void*))flushApc)); + CHKiRet(apc.SetParam1(pApc, pThis)); + CHKiRet(apc.ConstructFinalize(pApc, &pThis->apcID)); + +finalize_it: + RETiRet; +} + + /* write memory buffer to a stream object */ static rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) { + DEFVARS_mutexProtection_uncond; DEFiRet; size_t iPartial; @@ -980,6 +1029,11 @@ dbgprintf("strmWrite(%p, '%65.65s', %ld);, disabled %d, sizelim %ld, size %lld\n if(pThis->bDisabled) ABORT_FINALIZE(RS_RET_STREAM_DISABLED); +RUNLOG_VAR("%d", pThis->iFlushInterval); + if(pThis->iFlushInterval != 0) { + BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(&pThis->mut); + } + /* check if the to-be-written data is larger than our buffer size */ if(lenBuf >= pThis->sIOBufSize) { /* it is - so we do a direct write, that is most efficient. @@ -1008,7 +1062,17 @@ dbgprintf("strmWrite(%p, '%65.65s', %ld);, disabled %d, sizelim %ld, size %lld\n } } + /* we ignore the outcome of scheduleFlushRequest(), as we will write the data always at + * termination. For Zip mode, it could be fatal if we write after each record. + */ + if(pThis->iFlushInterval != 0) + scheduleFlushRequest(pThis); + finalize_it: + if(pThis->iFlushInterval != 0) { + END_MTX_PROTECTED_OPERATIONS_UNCOND(&pThis->mut); + } + RETiRet; } @@ -1025,6 +1089,7 @@ DEFpropSetMeth(strm, iZipLevel, int) DEFpropSetMeth(strm, bSync, int) DEFpropSetMeth(strm, sIOBufSize, size_t) DEFpropSetMeth(strm, iSizeLimit, off_t) +DEFpropSetMeth(strm, iFlushInterval, int) DEFpropSetMeth(strm, pszSizeLimitCmd, uchar*) static rsRetVal strmSetiMaxFiles(strm_t *pThis, int iNewVal) @@ -1308,6 +1373,7 @@ CODESTARTobjQueryInterface(strm) pIf->SetbSync = strmSetbSync; pIf->SetsIOBufSize = strmSetsIOBufSize; pIf->SetiSizeLimit = strmSetiSizeLimit; + pIf->SetiFlushInterval = strmSetiFlushInterval; pIf->SetpszSizeLimitCmd = strmSetpszSizeLimitCmd; finalize_it: ENDobjQueryInterface(strm) @@ -1319,6 +1385,8 @@ ENDobjQueryInterface(strm) */ BEGINObjClassInit(strm, 1, OBJ_IS_CORE_MODULE) /* request objects we use */ + CHKiRet(objUse(apc, CORE_COMPONENT)); + OBJSetMethodHandler(objMethod_SERIALIZE, strmSerialize); OBJSetMethodHandler(objMethod_SETPROPERTY, strmSetProperty); OBJSetMethodHandler(objMethod_CONSTRUCTION_FINALIZER, strmConstructFinalize); |