1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
|
/* Definition of serial stream class (strm).
*
* A serial stream provides serial data access. In theory, serial streams
* can be implemented via a number of methods (e.g. files or in-memory
* streams). In practice, there currently only exist the file type (aka
* "driver").
*
* In practice, many stream features are bound to files. I have not yet made
* any serious effort, except for the naming of this class, to try to make
* the interfaces very generic. However, I assume that we could work much
* like in the strm class, where some properties are simply ignored when
* the wrong strm mode is selected (which would translate here to the wrong
* stream mode).
*
* Most importantly, this class provides generic input and output functions
* which can directly be used to work with the strms and file output. It
* provides useful things such as a circular file buffer and, hopefully
* at a later stage, a lazy writer. The object is also serializable and thus
* can easily be persisted. The bottom line is that it makes much sense to
* use this class wherever possible as its features may grow in the future.
*
* An important note on writing gzip format via zlib (kept anonymous
* by request):
*
* --------------------------------------------------------------------------
* We'd like to make sure the output file is in full gzip format
* (compatible with gzip -d/zcat etc). There is a flag in how the output
* is initialized within zlib to properly add the gzip wrappers to the
* output. (gzip is effectively a small metadata wrapper around raw
* zstream output.)
*
* I had written an old bit of code to do this - the documentation on
* deflateInit2() was pretty tricky to nail down on this specific feature:
*
* int deflateInit2 (z_streamp strm, int level, int method, int windowBits,
* int memLevel, int strategy);
*
* I believe "31" would be the value for the "windowBits" field that you'd
* want to try:
*
* deflateInit2(zstrmptr, 6, Z_DEFLATED, 31, 9, Z_DEFAULT_STRATEGY);
* --------------------------------------------------------------------------
*
* Copyright 2008, 2009 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of the rsyslog runtime library.
*
* The rsyslog runtime library is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* The rsyslog runtime library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with the rsyslog runtime library. If not, see <http://www.gnu.org/licenses/>.
*
* A copy of the GPL can be found in the file "COPYING" in this distribution.
* A copy of the LGPL can be found in the file "COPYING.LESSER" in this distribution.
*/
#ifndef STREAM_H_INCLUDED
#define STREAM_H_INCLUDED
#include <pthread.h>
#include "obj-types.h"
#include "glbl.h"
#include "stream.h"
#include "zlibw.h"
#include "apc.h"
/* Stream types.
 *
 * Selects what kind of file object a stream works on. The numeric values
 * are spelled out explicitly and must be kept as they are.
 */
typedef enum {
	/** read a single file */
	STREAMTYPE_FILE_SINGLE   = 0,
	/** circular files */
	STREAMTYPE_FILE_CIRCULAR = 1,
	/** monitor a (third-party) file */
	STREAMTYPE_FILE_MONITOR  = 2,
	/** file is a named pipe (so far, tested for output only) */
	STREAMTYPE_NAMED_PIPE    = 3
} strmType_t;
/* Stream operations modes.
 *
 * When extending, do NOT change existing modes! New modes must be added
 * with new numeric values; the values below must stay as they are.
 */
typedef enum {
	/* historic spelling with three M's - kept so that existing users of
	 * this name continue to compile
	 */
	STREAMMMODE_INVALID = 0,
	/* correctly spelled alias, consistent with the other STREAMMODE_*
	 * names; has the very same value as STREAMMMODE_INVALID
	 */
	STREAMMODE_INVALID = STREAMMMODE_INVALID,
	STREAMMODE_READ = 1,
	STREAMMODE_WRITE = 2,
	STREAMMODE_WRITE_TRUNC = 3,
	STREAMMODE_WRITE_APPEND = 4
} strmMode_t;
/* Number of elements in the asyncBuf[] array of strm_t (the buffers that
 * belong to the async flush support).
 * NOTE(review): the power-of-2 requirement presumably stems from the unsigned
 * wrap-around (modulo) arithmetic done on the iEnq/iDeq indexes - confirm
 * against the implementation before changing this value.
 */
#define STREAM_ASYNC_NUMBUFS 2 /* must be a power of 2 -- TODO: make configurable */
/* The strm_t data structure.
 *
 * Holds the complete state of one stream object. The members are grouped
 * into: descriptive properties, dynamic properties (valid only while the
 * file is open and not to be persisted), support for async flush processing
 * and support for the omfile size-limiting commands.
 *
 * The order of the members must not be changed carelessly: the generic
 * object data MUST remain the first element.
 */
typedef struct strm_s {
BEGINobjInstance; /* Data to implement generic object - MUST be the first data element! */
strmType_t sType; /* type of this stream (one of the strmType_t values) */
/* descriptive properties */
int iCurrFNum;/* current file number (NOT descriptor, but the number in the file name!) */
uchar *pszFName; /* prefix for generated filenames */
int lenFName; /* NOTE(review): presumably the length of pszFName - confirm */
strmMode_t tOperationsMode; /* operations mode (one of the strmMode_t values) */
mode_t tOpenMode; /* NOTE(review): presumably the file mode used when opening/creating the file - confirm */
int64 iMaxFileSize;/* maximum size a file may grow to */
int iMaxFiles; /* maximum number of files if a circular mode is in use */
int iFileNumDigits;/* min number of digits to use in file number (only in circular mode) */
sbool bDeleteOnClose; /* set to 1 to auto-delete on close -- be careful with that setting! */
int64 iCurrOffs;/* current offset */
/* NULL or a user-provided counter that receives the number of bytes
 * written since the last CntrSet()
 */
int64 *pUsrWCntr;
/* dynamic properties, valid only during file open, not to be persisted */
sbool bDisabled; /* should file no longer be written to? (currently set only if omfile file size limit fails) */
sbool bSync; /* sync this file after every write? */
size_t sIOBufSize;/* size of IO buffer */
uchar *pszDir; /* Directory */
int lenDir; /* NOTE(review): presumably the length of pszDir - confirm */
int fd; /* the file descriptor, -1 if closed */
int fdDir; /* the directory's descriptor, in case bSync is requested (-1 if closed) */
uchar *pszCurrFName; /* name of current file (if open) */
uchar *pIOBuf; /* the iobuffer currently in use to gather data */
size_t iBufPtrMax; /* current max Ptr in Buffer (if partial read!) */
size_t iBufPtr; /* pointer into current buffer */
int iUngetC; /* char set via UngetChar() call or -1 if none set */
sbool bInRecord; /* if 1, indicates that we are currently writing a not-yet complete record */
int iZipLevel; /* zip level (0..9). If 0, zip is completely disabled */
Bytef *pZipBuf; /* NOTE(review): presumably the work buffer for zlib compression - confirm */
/* support for async flush processing */
sbool bAsyncWrite; /* do asynchronous writes (always if a flush interval is given) */
sbool bStopWriter; /* shall writer thread terminate? */
sbool bDoTimedWait; /* instruct writer thread to do a timed wait to support flush timeouts */
int iFlushInterval; /* flush in which interval - 0, no flushing */
apc_id_t apcID; /* id of current Apc request (used for cancelling) */
pthread_mutex_t mut;/* mutex for flush in async mode */
/* condition variables belonging to the async support
 * NOTE(review): exact signalling protocol is not visible in this header -
 * see the implementation for who waits on / signals each of them
 */
pthread_cond_t notFull;
pthread_cond_t notEmpty;
pthread_cond_t isEmpty;
unsigned short iEnq; /* this MUST be unsigned as we use modulo arithmetic (else invalid indexing happens!) */
unsigned short iDeq; /* this MUST be unsigned as we use modulo arithmetic (else invalid indexing happens!) */
short iCnt; /* current nbr of elements in buffer */
/* the STREAM_ASYNC_NUMBUFS async buffers, each described by a pointer
 * and a length
 */
struct {
uchar *pBuf;
size_t lenBuf;
} asyncBuf[STREAM_ASYNC_NUMBUFS];
pthread_t writerThreadID; /* thread ID of the writer thread */
int apcRequested; /* is an apc Requested? */
/* support for omfile size-limiting commands, special counters, NOT persisted! */
off_t iSizeLimit; /* file size limit, 0 = no limit */
uchar *pszSizeLimitCmd; /* command to carry out when size limit is reached */
sbool bIsTTY; /* is this a tty file? */
} strm_t;
/* interfaces
 *
 * Declares the interface of the strm class. Every entry between
 * BEGINinterface and ENDinterface is a function pointer that returns an
 * rsRetVal and (except for Construct, which receives the address of the
 * object pointer to fill) takes the stream object as its first parameter.
 * The BEGINinterface / ENDinterface / INTERFACEpropSetMeth macros are
 * defined outside of this file; their expansion is not shown here.
 *
 * Whenever this structure is changed, strmCURR_IF_VERSION must be
 * incremented.
 */
BEGINinterface(strm) /* name must also be changed in ENDinterface macro! */
rsRetVal (*Construct)(strm_t **ppThis);
rsRetVal (*ConstructFinalize)(strm_t *pThis);
rsRetVal (*Destruct)(strm_t **ppThis);
rsRetVal (*SetMaxFileSize)(strm_t *pThis, int64 iMaxFileSize);
rsRetVal (*SetFileName)(strm_t *pThis, uchar *pszName, size_t iLenName);
rsRetVal (*ReadChar)(strm_t *pThis, uchar *pC);
rsRetVal (*UnreadChar)(strm_t *pThis, uchar c);
rsRetVal (*ReadLine)(strm_t *pThis, cstr_t **ppCStr);
rsRetVal (*SeekCurrOffs)(strm_t *pThis);
rsRetVal (*Write)(strm_t *pThis, uchar *pBuf, size_t lenBuf);
rsRetVal (*WriteChar)(strm_t *pThis, uchar c);
rsRetVal (*WriteLong)(strm_t *pThis, long i);
/* NOTE(review): both SetFileName (above) and SetFName exist - how they
 * differ is not visible from this header; check the implementation
 */
rsRetVal (*SetFName)(strm_t *pThis, uchar *pszPrefix, size_t iLenPrefix);
rsRetVal (*SetDir)(strm_t *pThis, uchar *pszDir, size_t iLenDir);
rsRetVal (*Flush)(strm_t *pThis);
rsRetVal (*RecordBegin)(strm_t *pThis);
rsRetVal (*RecordEnd)(strm_t *pThis);
rsRetVal (*Serialize)(strm_t *pThis, strm_t *pStrm);
rsRetVal (*GetCurrOffset)(strm_t *pThis, int64 *pOffs);
rsRetVal (*SetWCntr)(strm_t *pThis, number_t *pWCnt);
rsRetVal (*Dup)(strm_t *pThis, strm_t **ppNew);
/* property set methods: one per strm_t property, declared with the
 * property name and the type given as the third macro argument
 */
INTERFACEpropSetMeth(strm, bDeleteOnClose, int);
/* NOTE(review): declared with "int" although the strm_t member
 * iMaxFileSize and the SetMaxFileSize() entry above use int64. If the
 * macro uses this type for the value parameter, sizes that do not fit
 * into an int would be narrowed - confirm against the macro definition
 * and the implementation.
 */
INTERFACEpropSetMeth(strm, iMaxFileSize, int);
INTERFACEpropSetMeth(strm, iMaxFiles, int);
INTERFACEpropSetMeth(strm, iFileNumDigits, int);
INTERFACEpropSetMeth(strm, tOperationsMode, int);
INTERFACEpropSetMeth(strm, tOpenMode, mode_t);
INTERFACEpropSetMeth(strm, sType, strmType_t);
INTERFACEpropSetMeth(strm, iZipLevel, int);
INTERFACEpropSetMeth(strm, bSync, int);
INTERFACEpropSetMeth(strm, sIOBufSize, size_t);
INTERFACEpropSetMeth(strm, iSizeLimit, off_t);
INTERFACEpropSetMeth(strm, iFlushInterval, int);
INTERFACEpropSetMeth(strm, pszSizeLimitCmd, uchar*);
ENDinterface(strm)
/* current version number of the strm interface structure */
#define strmCURR_IF_VERSION 5 /* increment whenever you change the interface structure! */
/* prototypes
 * The declaration below is generated by the PROTOTYPEObjClassInit macro,
 * which is defined outside of this file.
 * NOTE(review): presumably this declares the class initialization function
 * of the strm class - confirm against the macro definition.
 */
PROTOTYPEObjClassInit(strm);
#endif /* #ifndef STREAM_H_INCLUDED */
|