From e3040285dbf0854443bc2443e0de5ac59f6f839e Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 6 Jul 2009 16:38:09 +0200 Subject: first shot at asynchronous stream writer with timeout capability ... seems to work on quick testing, but needs a far more testing and improvement. Good milestone commit. --- runtime/stream.h | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) (limited to 'runtime/stream.h') diff --git a/runtime/stream.h b/runtime/stream.h index ac003c7b..2c1ac255 100644 --- a/runtime/stream.h +++ b/runtime/stream.h @@ -87,6 +87,7 @@ typedef enum { /* when extending, do NOT change existing modes! */ STREAMMODE_WRITE_APPEND = 4 } strmMode_t; +#define STREAM_ASYNC_NUMBUFS 2 /* must be a power of 2 -- TODO: make configurable */ /* The strm_t data structure */ typedef struct strm_s { BEGINobjInstance; /* Data to implement generic object - MUST be the first data element! */ @@ -112,7 +113,7 @@ typedef struct strm_s { int fd; /* the file descriptor, -1 if closed */ int fdDir; /* the directory's descriptor, in case bSync is requested (-1 if closed) */ uchar *pszCurrFName; /* name of current file (if open) */ - uchar *pIOBuf; /* io Buffer */ + uchar *pIOBuf; /* the iobuffer currently in use to gather data */ size_t iBufPtrMax; /* current max Ptr in Buffer (if partial read!) */ size_t iBufPtr; /* pointer into current buffer */ int iUngetC; /* char set via UngetChar() call or -1 if none set */ @@ -120,9 +121,22 @@ typedef struct strm_s { int iZipLevel; /* zip level (0..9). If 0, zip is completely disabled */ Bytef *pZipBuf; /* support for async flush procesing */ + bool bAsyncWrite; /* do asynchronous writes (always if a flush interval is given) */ + bool bStopWriter; /* shall writer thread terminate? */ int iFlushInterval; /* flush in which interval - 0, no flushing */ apc_id_t apcID; /* id of current Apc request (used for cancelling) */ pthread_mutex_t mut;/* mutex for flush in async mode */ + pthread_cond_t notFull; + pthread_cond_t notEmpty; + pthread_cond_t isEmpty; + short iEnq; + short iDeq; + short iCnt; /* current nbr of elements in buffer */ + struct { + uchar *pBuf; + size_t lenBuf; + } asyncBuf[STREAM_ASYNC_NUMBUFS]; + pthread_t writerThreadID; int apcRequested; /* is an apc Requested? */ /* support for omfile size-limiting commands, special counters, NOT persisted! */ off_t iSizeLimit; /* file size limit, 0 = no limit */ @@ -130,6 +144,7 @@ typedef struct strm_s { bool bIsTTY; /* is this a tty file? */ } strm_t; + /* interfaces */ BEGINinterface(strm) /* name must also be changed in ENDinterface macro! */ rsRetVal (*Construct)(strm_t **ppThis); -- cgit From 26227091faac8c3cc9bc282eb4e4fc408635f8d2 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 7 Jul 2009 17:18:51 +0200 Subject: fixed a bug introduced today that lead to an abort in queue disk mode --- runtime/stream.h | 1 + 1 file changed, 1 insertion(+) (limited to 'runtime/stream.h') diff --git a/runtime/stream.h b/runtime/stream.h index 2c1ac255..1efd29b5 100644 --- a/runtime/stream.h +++ b/runtime/stream.h @@ -118,6 +118,7 @@ typedef struct strm_s { size_t iBufPtr; /* pointer into current buffer */ int iUngetC; /* char set via UngetChar() call or -1 if none set */ bool bInRecord; /* if 1, indicates that we are currently writing a not-yet complete record */ + bool bInClose; /* used to break "deadly close loops", tells us we are already inside a close */ int iZipLevel; /* zip level (0..9). If 0, zip is completely disabled */ Bytef *pZipBuf; /* support for async flush procesing */ -- cgit From 5221a1e42e16c8c39b48a4a1a18ee6322c38cd17 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 7 Jul 2009 18:33:00 +0200 Subject: added capability to write incomplete buffers after an inactivity timeout for the stream class and thus finally activating omfile's timeout capability in a useful way without polling and too-high performance overhead. --- runtime/stream.h | 1 + 1 file changed, 1 insertion(+) (limited to 'runtime/stream.h') diff --git a/runtime/stream.h b/runtime/stream.h index 1efd29b5..cb368835 100644 --- a/runtime/stream.h +++ b/runtime/stream.h @@ -124,6 +124,7 @@ typedef struct strm_s { /* support for async flush procesing */ bool bAsyncWrite; /* do asynchronous writes (always if a flush interval is given) */ bool bStopWriter; /* shall writer thread terminate? */ + bool bDoTimedWait; /* instruct writer thread to do a times wait to support flush timeouts */ int iFlushInterval; /* flush in which interval - 0, no flushing */ apc_id_t apcID; /* id of current Apc request (used for cancelling) */ pthread_mutex_t mut;/* mutex for flush in async mode */ -- cgit From bfc3eaf23cae0ef8685fc25b71e701e2c4690509 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 18 Aug 2009 18:48:18 +0200 Subject: bugfix: potential segfault in output file writer (omfile) In async write mode, we use modular arithmetic to index the output buffer array. However, the counter variables accidently were signed, thus resulting in negative indizes after integer overflow. That in turn could lead to segfaults, but was depending on the memory layout of the instance in question (which in turn depended on a number of variables, like compile settings but also configuration). The counters are now unsigned (as they always should have been) and so the dangling mis-indexing does no longer happen. This bug potentially affected all installations, even if only some may actually have seen a segfault. --- runtime/stream.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'runtime/stream.h') diff --git a/runtime/stream.h b/runtime/stream.h index cb368835..64ffb6e1 100644 --- a/runtime/stream.h +++ b/runtime/stream.h @@ -131,8 +131,8 @@ typedef struct strm_s { pthread_cond_t notFull; pthread_cond_t notEmpty; pthread_cond_t isEmpty; - short iEnq; - short iDeq; + unsigned short iEnq; /* this MUST be unsigned as we use module arithmetic (else invalid indexing happens!) */ + unsigned short iDeq; /* this MUST be unsigned as we use module arithmetic (else invalid indexing happens!) */ short iCnt; /* current nbr of elements in buffer */ struct { uchar *pBuf; -- cgit