summaryrefslogtreecommitdiffstats
path: root/runtime/stream.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2009-06-15 13:44:51 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2009-06-15 13:44:51 +0200
commit16ecb90c3ac88bb2261c31c990d88f97f1a1b32f (patch)
tree393807bddcfb330384ab7c2bd5aa32200571d495 /runtime/stream.c
parent1f874c220860d3a19fb6cfb60f0902a08639f6ab (diff)
downloadrsyslog-16ecb90c3ac88bb2261c31c990d88f97f1a1b32f.tar.gz
rsyslog-16ecb90c3ac88bb2261c31c990d88f97f1a1b32f.tar.xz
rsyslog-16ecb90c3ac88bb2261c31c990d88f97f1a1b32f.zip
omfile buffers are now synchronized after inactivity
This is the first shot at this functionality. Currently, we run off a fixed counter in the rsyslogd mainloop, which needs to be restructured. But this code works, so it is a good time for a commit.
Diffstat (limited to 'runtime/stream.c')
-rw-r--r--runtime/stream.c68
1 files changed, 68 insertions, 0 deletions
diff --git a/runtime/stream.c b/runtime/stream.c
index c8672aa2..773da319 100644
--- a/runtime/stream.c
+++ b/runtime/stream.c
@@ -39,6 +39,7 @@
#include <unistd.h>
#include <sys/stat.h> /* required for HP UX */
#include <errno.h>
+#include <pthread.h>
#include "rsyslog.h"
#include "stringbuf.h"
@@ -47,10 +48,12 @@
#include "stream.h"
#include "unicode-helper.h"
#include "module-template.h"
+#include "apc.h"
/* static data */
DEFobjStaticHelpers
DEFobjCurrIf(zlibw)
+DEFobjCurrIf(apc)
/* forward definitions */
static rsRetVal strmFlush(strm_t *pThis);
@@ -60,6 +63,20 @@ static rsRetVal strmCloseFile(strm_t *pThis);
/* methods */
+/* async flush apc handler
+ */
+static void
+flushApc(void *param1, void __attribute__((unused)) *param2)
+{
+ DEFVARS_mutexProtection_uncond;
+ strm_t *pThis = (strm_t*) param1;
+ ISOBJ_TYPE_assert(pThis, strm);
+
+ BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(&pThis->mut);
+ strmFlush(pThis);
+ END_MTX_PROTECTED_OPERATIONS_UNCOND(&pThis->mut);
+}
+
/* Try to resolve a size limit situation. This is used to support custom-file size handlers
* for omfile. It first runs the command, and then checks if we are still above the size
@@ -583,6 +600,11 @@ static rsRetVal strmConstructFinalize(strm_t *pThis)
}
}
+ /* if we should call flush apc's, we need a mutex */
+ if(pThis->iFlushInterval != 0) {
+ pthread_mutex_init(&pThis->mut, 0);
+ }
+
finalize_it:
RETiRet;
}
@@ -602,6 +624,11 @@ CODESTARTobjDestruct(strm)
objRelease(zlibw, LM_ZLIBW_FILENAME);
}
+ if(pThis->iFlushInterval != 0) {
+ // TODO: check if there is an apc and remove it!
+ pthread_mutex_destroy(&pThis->mut);
+ }
+
free(pThis->pszDir);
free(pThis->pIOBuf);
free(pThis->pZipBuf);
@@ -965,11 +992,33 @@ finalize_it:
}
+/* schedule an Apc flush request.
+ * rgerhards, 2009-06-15
+ */
+static inline rsRetVal
+scheduleFlushRequest(strm_t *pThis)
+{
+ apc_t *pApc;
+ DEFiRet;
+
+ CHKiRet(apc.CancelApc(pThis->apcID));
+dbgprintf("XXX: requesting to add apc!\n");
+ CHKiRet(apc.Construct(&pApc));
+ CHKiRet(apc.SetProcedure(pApc, (void (*)(void*, void*))flushApc));
+ CHKiRet(apc.SetParam1(pApc, pThis));
+ CHKiRet(apc.ConstructFinalize(pApc, &pThis->apcID));
+
+finalize_it:
+ RETiRet;
+}
+
+
/* write memory buffer to a stream object
*/
static rsRetVal
strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
{
+ DEFVARS_mutexProtection_uncond;
DEFiRet;
size_t iPartial;
@@ -980,6 +1029,11 @@ dbgprintf("strmWrite(%p, '%65.65s', %ld);, disabled %d, sizelim %ld, size %lld\n
if(pThis->bDisabled)
ABORT_FINALIZE(RS_RET_STREAM_DISABLED);
+RUNLOG_VAR("%d", pThis->iFlushInterval);
+ if(pThis->iFlushInterval != 0) {
+ BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(&pThis->mut);
+ }
+
/* check if the to-be-written data is larger than our buffer size */
if(lenBuf >= pThis->sIOBufSize) {
/* it is - so we do a direct write, that is most efficient.
@@ -1008,7 +1062,17 @@ dbgprintf("strmWrite(%p, '%65.65s', %ld);, disabled %d, sizelim %ld, size %lld\n
}
}
+ /* we ignore the outcome of scheduleFlushRequest(), as we will write the data always at
+ * termination. For Zip mode, it could be fatal if we write after each record.
+ */
+ if(pThis->iFlushInterval != 0)
+ scheduleFlushRequest(pThis);
+
finalize_it:
+ if(pThis->iFlushInterval != 0) {
+ END_MTX_PROTECTED_OPERATIONS_UNCOND(&pThis->mut);
+ }
+
RETiRet;
}
@@ -1025,6 +1089,7 @@ DEFpropSetMeth(strm, iZipLevel, int)
DEFpropSetMeth(strm, bSync, int)
DEFpropSetMeth(strm, sIOBufSize, size_t)
DEFpropSetMeth(strm, iSizeLimit, off_t)
+DEFpropSetMeth(strm, iFlushInterval, int)
DEFpropSetMeth(strm, pszSizeLimitCmd, uchar*)
static rsRetVal strmSetiMaxFiles(strm_t *pThis, int iNewVal)
@@ -1308,6 +1373,7 @@ CODESTARTobjQueryInterface(strm)
pIf->SetbSync = strmSetbSync;
pIf->SetsIOBufSize = strmSetsIOBufSize;
pIf->SetiSizeLimit = strmSetiSizeLimit;
+ pIf->SetiFlushInterval = strmSetiFlushInterval;
pIf->SetpszSizeLimitCmd = strmSetpszSizeLimitCmd;
finalize_it:
ENDobjQueryInterface(strm)
@@ -1319,6 +1385,8 @@ ENDobjQueryInterface(strm)
*/
BEGINObjClassInit(strm, 1, OBJ_IS_CORE_MODULE)
/* request objects we use */
+ CHKiRet(objUse(apc, CORE_COMPONENT));
+
OBJSetMethodHandler(objMethod_SERIALIZE, strmSerialize);
OBJSetMethodHandler(objMethod_SETPROPERTY, strmSetProperty);
OBJSetMethodHandler(objMethod_CONSTRUCTION_FINALIZER, strmConstructFinalize);