summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2008-01-09 13:27:07 +0000
committerRainer Gerhards <rgerhards@adiscon.com>2008-01-09 13:27:07 +0000
commitfd8c6452c8a4d51d39eb511046fca09391138a22 (patch)
treeff95b43b91df5bf2e68c690e38002f30ede5df1c
parent2146e340706a9de2be02761b7ad7c28034fb91f3 (diff)
downloadrsyslog-fd8c6452c8a4d51d39eb511046fca09391138a22.tar.gz
rsyslog-fd8c6452c8a4d51d39eb511046fca09391138a22.tar.xz
rsyslog-fd8c6452c8a4d51d39eb511046fca09391138a22.zip
created a generic stream class (for file access)
-rw-r--r--Makefile.am2
-rw-r--r--obj.h39
-rw-r--r--queue.c11
-rw-r--r--stream.c332
-rw-r--r--stream.h83
-rw-r--r--syslogd.c2
6 files changed, 455 insertions, 14 deletions
diff --git a/Makefile.am b/Makefile.am
index d5170c1f..2cad2365 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -21,6 +21,8 @@ rsyslogd_SOURCES = \
liblogging-stub.h \
threads.c \
threads.h \
+ stream.c \
+ stream.h \
queue.c \
queue.h \
sync.c \
diff --git a/obj.h b/obj.h
index 51f2217d..0b97718e 100644
--- a/obj.h
+++ b/obj.h
@@ -3,6 +3,24 @@
* This module relies heavily on preprocessor macros in order to
* provide fast execution time AND ease of use.
*
+ * Each object that uses this base class MUST provide a constructor with
+ * the following interface:
+ *
+ * Destruct(pThis);
+ *
+ * A constructor is not necessary (except for some features, e.g. de-serialization).
+ * If it is provided, it is a three-part constructor (to handle all cases with a
+ * generic interface):
+ *
+ * Construct(&pThis);
+ * SetProperty(pThis, property_t *);
+ * ConstructFinalize(pThis);
+ *
+ * SetProperty() and ConstructFinalize() may also be called on an object
+ * instance which has been Construct()'ed outside of this module.
+ *
+ * pThis always references to a pointer of the object.
+ *
* Copyright 2008 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of rsyslog.
@@ -53,10 +71,11 @@ typedef struct {
/* object Types/IDs */
typedef enum { /* IDs of known object "types/classes" */
- objNull = 0, /* no valid object (we do not start at zero so we can detect calloc()) */
- objMsg = 1
+ OBJNull = 0, /* no valid object (we do not start at zero so we can detect calloc()) */
+ OBJMsg = 1,
+ OBJstrm = 2
} objID_t;
-#define OBJ_NUM_IDS 2
+#define OBJ_NUM_IDS 3
typedef enum { /* IDs of base methods supported by all objects - used for jump table, so
* they must start at zero and be incremented. -- rgerahrds, 2008-01-04
@@ -96,6 +115,16 @@ typedef struct serialStore_s {
/* macros */
+/* the following one is a helper that prevents us from writing the
+ * ever-same code at the end of Construct()
+ */
+#define OBJCONSTRUCT_CHECK_SUCCESS_AND_CLEANUP \
+ if(iRet == RS_RET_OK) { \
+ *ppThis = pThis; \
+ } else { \
+ if(pThis != NULL) \
+ free(pThis); \
+ }
#define objSerializeSCALAR(propName, propType) \
CHKiRet(objSerializeProp(pCStr, (uchar*) #propName, PROPTYPE_##propType, (void*) &pThis->propName));
#define objSerializePTR(propName, propType) \
@@ -115,11 +144,11 @@ typedef struct serialStore_s {
rsRetVal objName##ClassInit(void) \
{ \
DEFiRet; \
- CHKiRet(objInfoConstruct(&pObjInfoOBJ, obj##objName, (uchar*) #objName, objVers, \
+ CHKiRet(objInfoConstruct(&pObjInfoOBJ, OBJ##objName, (uchar*) #objName, objVers, \
(rsRetVal (*)(void*))objName##Construct, (rsRetVal (*)(void*))objName##Destruct));
#define ENDObjClassInit(objName) \
- objRegisterObj(obj##objName, pObjInfoOBJ); \
+ objRegisterObj(OBJ##objName, pObjInfoOBJ); \
finalize_it: \
return iRet; \
}
diff --git a/queue.c b/queue.c
index ceb4e3ab..b2a910fc 100644
--- a/queue.c
+++ b/queue.c
@@ -1,4 +1,3 @@
-#include <stdio.h>
// TODO: peekmsg() on first entry, with new/inprogress/deleted entry, destruction in
// call consumer state. Facilitates retaining messages in queue until action could
// be called!
@@ -460,7 +459,7 @@ static rsRetVal qDelDisk(queue_t *pThis, void **ppUsr)
if(pThis->tVars.disk.fRead.fd == -1)
CHKiRet(qDiskOpenFile(pThis, &pThis->tVars.disk.fRead, O_RDONLY, 0600)); // TODO: open modes!
- iRet = objDeserialize((void*) &pMsg, objMsg, &serialStore);
+ iRet = objDeserialize((void*) &pMsg, OBJMsg, &serialStore);
if(iRet == RS_RET_OK)
bRun = 0; /* we are done */
else if(iRet == RS_RET_EOF) {
@@ -696,13 +695,7 @@ rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iWorkerThreads,
CHKiRet(pThis->qConstruct(pThis));
finalize_it:
- if(iRet == RS_RET_OK) {
- *ppThis = pThis;
- } else {
- if(pThis != NULL)
- free(pThis);
- }
-
+ OBJCONSTRUCT_CHECK_SUCCESS_AND_CLEANUP
return iRet;
}
diff --git a/stream.c b/stream.c
new file mode 100644
index 00000000..7f99ab9a
--- /dev/null
+++ b/stream.c
@@ -0,0 +1,332 @@
+/* The serial stream class.
+ *
+ * A serial stream provides serial data access. In theory, serial streams
+ * can be implemented via a number of methods (e.g. files or in-memory
+ * streams). In practice, there currently only exist the file type (aka
+ * "driver").
+ *
+ * File begun on 2008-01-09 by RGerhards
+ *
+ * Copyright 2008 Rainer Gerhards and Adiscon GmbH.
+ *
+ * This file is part of rsyslog.
+ *
+ * Rsyslog is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Rsyslog is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Rsyslog. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * A copy of the GPL can be found in the file "COPYING" in this distribution.
+ */
+#include "config.h"
+
+#include <stdlib.h>
+#include <string.h>
+#include <assert.h>
+#include <signal.h>
+#include <pthread.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <errno.h>
+
+#include "rsyslog.h"
+#include "syslogd.h"
+#include "stringbuf.h"
+#include "srUtils.h"
+#include "obj.h"
+#include "stream.h"
+
+/* static data */
+DEFobjStaticHelpers
+
+
+/* methods */
+
+/* first, we define type-specific handlers. The provide a generic functionality,
+ * but for this specific type of strm. The mapping to these handlers happens during
+ * strm construction. Later on, handlers are called by pointers present in the
+ * strm instance object.
+ */
+
+/* open a strm file */
+static rsRetVal strmOpenFile(strm_t *pThis, int flags, mode_t mode)
+{
+ DEFiRet;
+
+ assert(pThis != NULL);
+
+ if(pThis->pszFilePrefix == NULL)
+ ABORT_FINALIZE(RS_RET_FILE_PREFIX_MISSING);
+
+ CHKiRet(genFileName(&pThis->pszCurrFName, pThis->pszDir, pThis->lenDir,
+ pThis->pszFilePrefix, pThis->lenFilePrefix, pThis->iCurrFNum, (uchar*) "qf", 2));
+
+ pThis->fd = open((char*)pThis->pszCurrFName, flags, mode); // TODO: open modes!
+ pThis->iCurrOffs = 0;
+
+ dbgprintf("Stream 0x%lx: opened file '%s' for %d as %d\n", (unsigned long) pThis, pThis->pszCurrFName, flags, pThis->fd);
+
+finalize_it:
+ return iRet;
+}
+
+
+/* close a strm file
+ * Note that the bDeleteOnClose flag is honored. If it is set, the file will be
+ * deleted after close. This is in support for the qRead thread.
+ */
+static rsRetVal strmCloseFile(strm_t *pThis)
+{
+ DEFiRet;
+
+ assert(pThis != NULL);
+ dbgprintf("Stream 0x%lx: closing file %d\n", (unsigned long) pThis, pThis->fd);
+
+ close(pThis->fd); // TODO: error check
+ pThis->fd = -1;
+
+ if(pThis->bDeleteOnClose) {
+ unlink((char*) pThis->pszCurrFName); // TODO: check returncode
+ }
+
+ if(pThis->pszCurrFName != NULL) {
+ free(pThis->pszCurrFName); /* no longer needed in any case (just for open) */
+ pThis->pszCurrFName = NULL;
+ }
+
+ return iRet;
+}
+
+
+/* switch to next strm file */
+static rsRetVal strmNextFile(strm_t *pThis)
+{
+ DEFiRet;
+
+ assert(pThis != NULL);
+ CHKiRet(strmCloseFile(pThis));
+
+ /* we do modulo 1,000,000 so that the file number is always at most 6 digits. If we have a million
+ * or more strm files, something is awfully wrong and it is OK if we run into problems in that
+ * situation ;) -- rgerhards, 2008-01-09
+ */
+ pThis->iCurrFNum = (pThis->iCurrFNum + 1) % 1000000;
+
+finalize_it:
+ return iRet;
+}
+
+
+/*** buffered read functions for strm files ***/
+
+/* logically "read" a character from a file. What actually happens is that
+ * data is taken from the buffer. Only if the buffer is full, data is read
+ * directly from file. In that case, a read is performed blockwise.
+ * rgerhards, 2008-01-07
+ * NOTE: needs to be enhanced to support sticking with a strm entry (if not
+ * deleted).
+ */
+static rsRetVal strmReadChar(strm_t *pThis, uchar *pC)
+{
+ DEFiRet;
+
+ assert(pThis != NULL);
+ assert(pC != NULL);
+
+//dbgprintf("strmRead index %d, max %d\n", pThis->iBufPtr, pThis->iBufPtrMax);
+ if(pThis->pIOBuf == NULL) { /* TODO: maybe we should move that to file open... */
+ if((pThis->pIOBuf = (uchar*) malloc(sizeof(uchar) * STRM_IOBUF_SIZE )) == NULL)
+ ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
+ pThis->iBufPtrMax = 0; /* results in immediate read request */
+ }
+
+ if(pThis->iUngetC != -1) { /* do we have an "unread" char that we need to provide? */
+ *pC = pThis->iUngetC;
+ pThis->iUngetC = -1;
+ ABORT_FINALIZE(RS_RET_OK);
+ }
+
+ /* do we need to obtain a new buffer */
+ if(pThis->iBufPtr >= pThis->iBufPtrMax) {
+ /* read */
+ pThis->iBufPtrMax = read(pThis->fd, pThis->pIOBuf, STRM_IOBUF_SIZE);
+ dbgprintf("strmReadChar read %d bytes from file %d\n", pThis->iBufPtrMax, pThis->fd);
+ if(pThis->iBufPtrMax == 0)
+ ABORT_FINALIZE(RS_RET_EOF);
+ else if(pThis->iBufPtrMax < 0)
+ ABORT_FINALIZE(RS_RET_IO_ERROR);
+ /* if we reach this point, we had a good read */
+ pThis->iBufPtr = 0;
+ }
+
+ *pC = pThis->pIOBuf[pThis->iBufPtr++];
+
+finalize_it:
+ return iRet;
+}
+
+
+/* unget a single character just like ungetc(). As with that call, there is only a single
+ * character buffering capability.
+ * rgerhards, 2008-01-07
+ */
+static rsRetVal strmUnreadChar(strm_t *pThis, uchar c)
+{
+ assert(pThis != NULL);
+ assert(pThis->iUngetC == -1);
+ pThis->iUngetC = c;
+
+ return RS_RET_OK;
+}
+
+#if 0
+/* we have commented out the code below because we would like to preserve it. It
+ * is currently not needed, but may be useful if we implemented a bufferred file
+ * class.
+ * rgerhards, 2008-01-07
+ */
+/* read a line from a strm file. A line is terminated by LF. The LF is read, but it
+ * is not returned in the buffer (it is discared). The caller is responsible for
+ * destruction of the returned CStr object!
+ * rgerhards, 2008-01-07
+ */
+static rsRetVal strmReadLine(strm_t *pThis, rsCStrObj **ppCStr)
+{
+ DEFiRet;
+ uchar c;
+ rsCStrObj *pCStr = NULL;
+
+ assert(pThis != NULL);
+ assert(ppCStr != NULL);
+
+ if((pCStr = rsCStrConstruct()) == NULL)
+ ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
+
+ /* now read the line */
+ CHKiRet(strmReadChar(pThis, &c));
+ while(c != '\n') {
+ CHKiRet(rsCStrAppendChar(pCStr, c));
+ CHKiRet(strmReadChar(pThis, &c));
+ }
+ CHKiRet(rsCStrFinish(pCStr));
+ *ppCStr = pCStr;
+
+finalize_it:
+ if(iRet != RS_RET_OK && pCStr != NULL)
+ rsCStrDestruct(pCStr);
+
+ return iRet;
+}
+
+#endif /* #if 0 - saved code */
+
+/*** end buffered read functions for strm files ***/
+
+
+/* --------------- end type-specific handlers -------------------- */
+
+
+/* Constructor for the strm object
+ */
+rsRetVal strmConstruct(strm_t **ppThis)
+{
+ DEFiRet;
+ strm_t *pThis;
+
+ assert(ppThis != NULL);
+
+ if((pThis = (strm_t *)calloc(1, sizeof(strm_t))) == NULL) {
+ ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
+ }
+
+ /* we have an object, so let's fill all properties that must not be 0 (we did calloc()!) */
+ pThis->iCurrFNum = 1;
+ pThis->fd = -1;
+ pThis->iUngetC = -1;
+ pThis->sType = STREAMTYPE_FILE;
+
+finalize_it:
+ OBJCONSTRUCT_CHECK_SUCCESS_AND_CLEANUP
+ return iRet;
+}
+
+
+/* ConstructionFinalizer - currently provided just to comply to the interface
+ * definiton. -- rgerhards, 2008-01-09
+ */
+rsRetVal strmConstructFinalize(strm_t __attribute__((unused)) *pThis)
+{
+ return RS_RET_OK;
+}
+
+
+/* destructor for the strm object */
+rsRetVal strmDestruct(strm_t *pThis)
+{
+ DEFiRet;
+
+ assert(pThis != NULL);
+
+ /* ... then free resources */
+ if(pThis->fd != -1)
+ strmCloseFile(pThis);
+
+ if(pThis->pszDir != NULL)
+ free(pThis->pszDir);
+
+ /* and finally delete the strm objet itself */
+ free(pThis);
+
+ return iRet;
+}
+
+
+/* write memory buffer to a stream object
+ */
+static rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
+{
+ DEFiRet;
+ int iWritten;
+
+ assert(pThis != NULL);
+ assert(pBuf != NULL);
+
+ if(pThis->fd == -1)
+ CHKiRet(strmOpenFile(pThis, O_RDWR|O_CREAT|O_TRUNC, 0600)); // TODO: open modes!
+
+ iWritten = write(pThis->fd, pBuf, lenBuf);
+ dbgprintf("Stream 0x%lx: write wrote %d bytes, errno: %d, err %s\n", (unsigned long) pThis,
+ iWritten, errno, strerror(errno));
+ /* TODO: handle error case -- rgerhards, 2008-01-07 */
+
+ pThis->iCurrOffs += iWritten;
+ if(pThis->iCurrOffs >= pThis->iMaxFileSize)
+ CHKiRet(strmNextFile(pThis));
+
+finalize_it:
+ return iRet;
+}
+
+
+
+
+/* Initialize the stream class. Must be called as the very first method
+ * before anything else is called inside this class.
+ * rgerhards, 2008-01-09
+ */
+BEGINObjClassInit(strm, 1)
+ //OBJSetMethodHandler(objMethod_SERIALIZE, strmSerialize);
+ //OBJSetMethodHandler(objMethod_SETPROPERTY, strmSetProperty);
+ OBJSetMethodHandler(objMethod_CONSTRUCTION_FINALIZER, strmConstructFinalize);
+ENDObjClassInit(strm)
+/*
+ * vi:set ai:
+ */
diff --git a/stream.h b/stream.h
new file mode 100644
index 00000000..5c9451cd
--- /dev/null
+++ b/stream.h
@@ -0,0 +1,83 @@
+/* Definition of serial stream class (strm).
+ *
+ * A serial stream provides serial data access. In theory, serial streams
+ * can be implemented via a number of methods (e.g. files or in-memory
+ * streams). In practice, there currently only exist the file type (aka
+ * "driver").
+ *
+ * In practice, many stream features are bound to files. I have not yet made
+ * any serious effort, except for the naming of this class, to try to make
+ * the interfaces very generic. However, I assume that we could work much
+ * like in the strm class, where some properties are simply ignored when
+ * the wrong strm mode is selected (which would translate here to the wrong
+ * stream mode).
+ *
+ * Most importantly, this class provides generic input and output functions
+ * which can directly be used to work with the strms and file output. It
+ * provides such useful things like a circular file buffer and, hopefully
+ * at a later stage, a lazy writer. The object is also seriazable and thus
+ * can easily be persistet. The bottom line is that it makes much sense to
+ * use this class whereever possible as its features may grow in the future.
+ *
+ * Copyright 2008 Rainer Gerhards and Adiscon GmbH.
+ *
+ * This file is part of rsyslog.
+ *
+ * Rsyslog is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Rsyslog is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Rsyslog. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * A copy of the GPL can be found in the file "COPYING" in this distribution.
+ */
+
+#ifndef STREAM_H_INCLUDED
+#define STREAM_H_INCLUDED
+
+#include <pthread.h>
+#include "obj.h"
+#include "stream.h"
+
+/* stream types */
+typedef enum {
+ STREAMTYPE_FILE = 0
+} strmType_t;
+
+/* The strm_t data structure */
+typedef struct {
+ BEGINobjInstance; /* Data to implement generic object - MUST be the first data element! */
+ strmType_t sType;
+ int fd; /* the file descriptor, -1 if closed */
+ uchar *pszCurrFName; /* name of current file (if open) */
+ int iCurrFNum;/* current file number (NOT descriptor, but the number in the file name!) */
+ uchar *pszDir; /* Directory */
+ int lenDir;
+ uchar *pszFilePrefix; /* prefix for generated filenames */
+ int lenFilePrefix;
+ size_t iCurrOffs;/* current offset */
+ uchar *pIOBuf; /* io Buffer */
+ int iBufPtrMax; /* current max Ptr in Buffer (if partial read!) */
+ int iBufPtr; /* pointer into current buffer */
+ int iUngetC; /* char set via UngetChar() call or -1 if none set */
+ int iFlagsOpenOS;
+ int iModeOpenOS;
+ size_t iMaxFileSize;/* maximum size a file may grow to */
+ int bDeleteOnClose; /* set to 1 to auto-delete on close -- be careful with that setting! */
+} strm_t;
+#define STRM_IOBUF_SIZE 4096 /* size of the IO buffer */
+
+/* prototypes */
+rsRetVal strmDestruct(strm_t *pThis);
+rsRetVal strmSetMaxFileSize(strm_t *pThis, size_t iMaxFileSize);
+rsRetVal strmSetFilePrefix(strm_t *pThis, uchar *pszPrefix, size_t iLenPrefix);
+PROTOTYPEObjClassInit(strm);
+
+#endif /* #ifndef STREAM_H_INCLUDED */
diff --git a/syslogd.c b/syslogd.c
index 64297f42..d624e678 100644
--- a/syslogd.c
+++ b/syslogd.c
@@ -176,6 +176,7 @@
#include "omdiscard.h"
#include "threads.h"
#include "queue.h"
+#include "stream.h"
/* We define our own set of syslog defintions so that we
* do not need to rely on (possibly different) implementations.
@@ -4651,6 +4652,7 @@ static rsRetVal InitGlobalClasses(void)
CHKiRet(objClassInit()); /* *THIS* *MUST* always be the first class initilizere called! */
CHKiRet(MsgClassInit());
+ CHKiRet(strmClassInit());
finalize_it:
return iRet;