diff options
-rw-r--r-- | Makefile.am | 2 | ||||
-rw-r--r-- | obj.h | 39 | ||||
-rw-r--r-- | queue.c | 11 | ||||
-rw-r--r-- | stream.c | 332 | ||||
-rw-r--r-- | stream.h | 83 | ||||
-rw-r--r-- | syslogd.c | 2 |
6 files changed, 455 insertions, 14 deletions
diff --git a/Makefile.am b/Makefile.am index d5170c1f..2cad2365 100644 --- a/Makefile.am +++ b/Makefile.am @@ -21,6 +21,8 @@ rsyslogd_SOURCES = \ liblogging-stub.h \ threads.c \ threads.h \ + stream.c \ + stream.h \ queue.c \ queue.h \ sync.c \ @@ -3,6 +3,24 @@ * This module relies heavily on preprocessor macros in order to * provide fast execution time AND ease of use. * + * Each object that uses this base class MUST provide a constructor with + * the following interface: + * + * Destruct(pThis); + * + * A constructor is not necessary (except for some features, e.g. de-serialization). + * If it is provided, it is a three-part constructor (to handle all cases with a + * generic interface): + * + * Construct(&pThis); + * SetProperty(pThis, property_t *); + * ConstructFinalize(pThis); + * + * SetProperty() and ConstructFinalize() may also be called on an object + * instance which has been Construct()'ed outside of this module. + * + * pThis always references to a pointer of the object. + * * Copyright 2008 Rainer Gerhards and Adiscon GmbH. * * This file is part of rsyslog. @@ -53,10 +71,11 @@ typedef struct { /* object Types/IDs */ typedef enum { /* IDs of known object "types/classes" */ - objNull = 0, /* no valid object (we do not start at zero so we can detect calloc()) */ - objMsg = 1 + OBJNull = 0, /* no valid object (we do not start at zero so we can detect calloc()) */ + OBJMsg = 1, + OBJstrm = 2 } objID_t; -#define OBJ_NUM_IDS 2 +#define OBJ_NUM_IDS 3 typedef enum { /* IDs of base methods supported by all objects - used for jump table, so * they must start at zero and be incremented. -- rgerahrds, 2008-01-04 @@ -96,6 +115,16 @@ typedef struct serialStore_s { /* macros */ +/* the following one is a helper that prevents us from writing the + * ever-same code at the end of Construct() + */ +#define OBJCONSTRUCT_CHECK_SUCCESS_AND_CLEANUP \ + if(iRet == RS_RET_OK) { \ + *ppThis = pThis; \ + } else { \ + if(pThis != NULL) \ + free(pThis); \ + } #define objSerializeSCALAR(propName, propType) \ CHKiRet(objSerializeProp(pCStr, (uchar*) #propName, PROPTYPE_##propType, (void*) &pThis->propName)); #define objSerializePTR(propName, propType) \ @@ -115,11 +144,11 @@ typedef struct serialStore_s { rsRetVal objName##ClassInit(void) \ { \ DEFiRet; \ - CHKiRet(objInfoConstruct(&pObjInfoOBJ, obj##objName, (uchar*) #objName, objVers, \ + CHKiRet(objInfoConstruct(&pObjInfoOBJ, OBJ##objName, (uchar*) #objName, objVers, \ (rsRetVal (*)(void*))objName##Construct, (rsRetVal (*)(void*))objName##Destruct)); #define ENDObjClassInit(objName) \ - objRegisterObj(obj##objName, pObjInfoOBJ); \ + objRegisterObj(OBJ##objName, pObjInfoOBJ); \ finalize_it: \ return iRet; \ } @@ -1,4 +1,3 @@ -#include <stdio.h> // TODO: peekmsg() on first entry, with new/inprogress/deleted entry, destruction in // call consumer state. Facilitates retaining messages in queue until action could // be called! @@ -460,7 +459,7 @@ static rsRetVal qDelDisk(queue_t *pThis, void **ppUsr) if(pThis->tVars.disk.fRead.fd == -1) CHKiRet(qDiskOpenFile(pThis, &pThis->tVars.disk.fRead, O_RDONLY, 0600)); // TODO: open modes! - iRet = objDeserialize((void*) &pMsg, objMsg, &serialStore); + iRet = objDeserialize((void*) &pMsg, OBJMsg, &serialStore); if(iRet == RS_RET_OK) bRun = 0; /* we are done */ else if(iRet == RS_RET_EOF) { @@ -696,13 +695,7 @@ rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iWorkerThreads, CHKiRet(pThis->qConstruct(pThis)); finalize_it: - if(iRet == RS_RET_OK) { - *ppThis = pThis; - } else { - if(pThis != NULL) - free(pThis); - } - + OBJCONSTRUCT_CHECK_SUCCESS_AND_CLEANUP return iRet; } diff --git a/stream.c b/stream.c new file mode 100644 index 00000000..7f99ab9a --- /dev/null +++ b/stream.c @@ -0,0 +1,332 @@ +/* The serial stream class. + * + * A serial stream provides serial data access. In theory, serial streams + * can be implemented via a number of methods (e.g. files or in-memory + * streams). In practice, there currently only exist the file type (aka + * "driver"). + * + * File begun on 2008-01-09 by RGerhards + * + * Copyright 2008 Rainer Gerhards and Adiscon GmbH. + * + * This file is part of rsyslog. + * + * Rsyslog is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Rsyslog is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Rsyslog. If not, see <http://www.gnu.org/licenses/>. + * + * A copy of the GPL can be found in the file "COPYING" in this distribution. + */ +#include "config.h" + +#include <stdlib.h> +#include <string.h> +#include <assert.h> +#include <signal.h> +#include <pthread.h> +#include <fcntl.h> +#include <unistd.h> +#include <errno.h> + +#include "rsyslog.h" +#include "syslogd.h" +#include "stringbuf.h" +#include "srUtils.h" +#include "obj.h" +#include "stream.h" + +/* static data */ +DEFobjStaticHelpers + + +/* methods */ + +/* first, we define type-specific handlers. The provide a generic functionality, + * but for this specific type of strm. The mapping to these handlers happens during + * strm construction. Later on, handlers are called by pointers present in the + * strm instance object. + */ + +/* open a strm file */ +static rsRetVal strmOpenFile(strm_t *pThis, int flags, mode_t mode) +{ + DEFiRet; + + assert(pThis != NULL); + + if(pThis->pszFilePrefix == NULL) + ABORT_FINALIZE(RS_RET_FILE_PREFIX_MISSING); + + CHKiRet(genFileName(&pThis->pszCurrFName, pThis->pszDir, pThis->lenDir, + pThis->pszFilePrefix, pThis->lenFilePrefix, pThis->iCurrFNum, (uchar*) "qf", 2)); + + pThis->fd = open((char*)pThis->pszCurrFName, flags, mode); // TODO: open modes! + pThis->iCurrOffs = 0; + + dbgprintf("Stream 0x%lx: opened file '%s' for %d as %d\n", (unsigned long) pThis, pThis->pszCurrFName, flags, pThis->fd); + +finalize_it: + return iRet; +} + + +/* close a strm file + * Note that the bDeleteOnClose flag is honored. If it is set, the file will be + * deleted after close. This is in support for the qRead thread. + */ +static rsRetVal strmCloseFile(strm_t *pThis) +{ + DEFiRet; + + assert(pThis != NULL); + dbgprintf("Stream 0x%lx: closing file %d\n", (unsigned long) pThis, pThis->fd); + + close(pThis->fd); // TODO: error check + pThis->fd = -1; + + if(pThis->bDeleteOnClose) { + unlink((char*) pThis->pszCurrFName); // TODO: check returncode + } + + if(pThis->pszCurrFName != NULL) { + free(pThis->pszCurrFName); /* no longer needed in any case (just for open) */ + pThis->pszCurrFName = NULL; + } + + return iRet; +} + + +/* switch to next strm file */ +static rsRetVal strmNextFile(strm_t *pThis) +{ + DEFiRet; + + assert(pThis != NULL); + CHKiRet(strmCloseFile(pThis)); + + /* we do modulo 1,000,000 so that the file number is always at most 6 digits. If we have a million + * or more strm files, something is awfully wrong and it is OK if we run into problems in that + * situation ;) -- rgerhards, 2008-01-09 + */ + pThis->iCurrFNum = (pThis->iCurrFNum + 1) % 1000000; + +finalize_it: + return iRet; +} + + +/*** buffered read functions for strm files ***/ + +/* logically "read" a character from a file. What actually happens is that + * data is taken from the buffer. Only if the buffer is full, data is read + * directly from file. In that case, a read is performed blockwise. + * rgerhards, 2008-01-07 + * NOTE: needs to be enhanced to support sticking with a strm entry (if not + * deleted). + */ +static rsRetVal strmReadChar(strm_t *pThis, uchar *pC) +{ + DEFiRet; + + assert(pThis != NULL); + assert(pC != NULL); + +//dbgprintf("strmRead index %d, max %d\n", pThis->iBufPtr, pThis->iBufPtrMax); + if(pThis->pIOBuf == NULL) { /* TODO: maybe we should move that to file open... */ + if((pThis->pIOBuf = (uchar*) malloc(sizeof(uchar) * STRM_IOBUF_SIZE )) == NULL) + ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); + pThis->iBufPtrMax = 0; /* results in immediate read request */ + } + + if(pThis->iUngetC != -1) { /* do we have an "unread" char that we need to provide? */ + *pC = pThis->iUngetC; + pThis->iUngetC = -1; + ABORT_FINALIZE(RS_RET_OK); + } + + /* do we need to obtain a new buffer */ + if(pThis->iBufPtr >= pThis->iBufPtrMax) { + /* read */ + pThis->iBufPtrMax = read(pThis->fd, pThis->pIOBuf, STRM_IOBUF_SIZE); + dbgprintf("strmReadChar read %d bytes from file %d\n", pThis->iBufPtrMax, pThis->fd); + if(pThis->iBufPtrMax == 0) + ABORT_FINALIZE(RS_RET_EOF); + else if(pThis->iBufPtrMax < 0) + ABORT_FINALIZE(RS_RET_IO_ERROR); + /* if we reach this point, we had a good read */ + pThis->iBufPtr = 0; + } + + *pC = pThis->pIOBuf[pThis->iBufPtr++]; + +finalize_it: + return iRet; +} + + +/* unget a single character just like ungetc(). As with that call, there is only a single + * character buffering capability. + * rgerhards, 2008-01-07 + */ +static rsRetVal strmUnreadChar(strm_t *pThis, uchar c) +{ + assert(pThis != NULL); + assert(pThis->iUngetC == -1); + pThis->iUngetC = c; + + return RS_RET_OK; +} + +#if 0 +/* we have commented out the code below because we would like to preserve it. It + * is currently not needed, but may be useful if we implemented a bufferred file + * class. + * rgerhards, 2008-01-07 + */ +/* read a line from a strm file. A line is terminated by LF. The LF is read, but it + * is not returned in the buffer (it is discared). The caller is responsible for + * destruction of the returned CStr object! + * rgerhards, 2008-01-07 + */ +static rsRetVal strmReadLine(strm_t *pThis, rsCStrObj **ppCStr) +{ + DEFiRet; + uchar c; + rsCStrObj *pCStr = NULL; + + assert(pThis != NULL); + assert(ppCStr != NULL); + + if((pCStr = rsCStrConstruct()) == NULL) + ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); + + /* now read the line */ + CHKiRet(strmReadChar(pThis, &c)); + while(c != '\n') { + CHKiRet(rsCStrAppendChar(pCStr, c)); + CHKiRet(strmReadChar(pThis, &c)); + } + CHKiRet(rsCStrFinish(pCStr)); + *ppCStr = pCStr; + +finalize_it: + if(iRet != RS_RET_OK && pCStr != NULL) + rsCStrDestruct(pCStr); + + return iRet; +} + +#endif /* #if 0 - saved code */ + +/*** end buffered read functions for strm files ***/ + + +/* --------------- end type-specific handlers -------------------- */ + + +/* Constructor for the strm object + */ +rsRetVal strmConstruct(strm_t **ppThis) +{ + DEFiRet; + strm_t *pThis; + + assert(ppThis != NULL); + + if((pThis = (strm_t *)calloc(1, sizeof(strm_t))) == NULL) { + ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); + } + + /* we have an object, so let's fill all properties that must not be 0 (we did calloc()!) */ + pThis->iCurrFNum = 1; + pThis->fd = -1; + pThis->iUngetC = -1; + pThis->sType = STREAMTYPE_FILE; + +finalize_it: + OBJCONSTRUCT_CHECK_SUCCESS_AND_CLEANUP + return iRet; +} + + +/* ConstructionFinalizer - currently provided just to comply to the interface + * definiton. -- rgerhards, 2008-01-09 + */ +rsRetVal strmConstructFinalize(strm_t __attribute__((unused)) *pThis) +{ + return RS_RET_OK; +} + + +/* destructor for the strm object */ +rsRetVal strmDestruct(strm_t *pThis) +{ + DEFiRet; + + assert(pThis != NULL); + + /* ... then free resources */ + if(pThis->fd != -1) + strmCloseFile(pThis); + + if(pThis->pszDir != NULL) + free(pThis->pszDir); + + /* and finally delete the strm objet itself */ + free(pThis); + + return iRet; +} + + +/* write memory buffer to a stream object + */ +static rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) +{ + DEFiRet; + int iWritten; + + assert(pThis != NULL); + assert(pBuf != NULL); + + if(pThis->fd == -1) + CHKiRet(strmOpenFile(pThis, O_RDWR|O_CREAT|O_TRUNC, 0600)); // TODO: open modes! + + iWritten = write(pThis->fd, pBuf, lenBuf); + dbgprintf("Stream 0x%lx: write wrote %d bytes, errno: %d, err %s\n", (unsigned long) pThis, + iWritten, errno, strerror(errno)); + /* TODO: handle error case -- rgerhards, 2008-01-07 */ + + pThis->iCurrOffs += iWritten; + if(pThis->iCurrOffs >= pThis->iMaxFileSize) + CHKiRet(strmNextFile(pThis)); + +finalize_it: + return iRet; +} + + + + +/* Initialize the stream class. Must be called as the very first method + * before anything else is called inside this class. + * rgerhards, 2008-01-09 + */ +BEGINObjClassInit(strm, 1) + //OBJSetMethodHandler(objMethod_SERIALIZE, strmSerialize); + //OBJSetMethodHandler(objMethod_SETPROPERTY, strmSetProperty); + OBJSetMethodHandler(objMethod_CONSTRUCTION_FINALIZER, strmConstructFinalize); +ENDObjClassInit(strm) +/* + * vi:set ai: + */ diff --git a/stream.h b/stream.h new file mode 100644 index 00000000..5c9451cd --- /dev/null +++ b/stream.h @@ -0,0 +1,83 @@ +/* Definition of serial stream class (strm). + * + * A serial stream provides serial data access. In theory, serial streams + * can be implemented via a number of methods (e.g. files or in-memory + * streams). In practice, there currently only exist the file type (aka + * "driver"). + * + * In practice, many stream features are bound to files. I have not yet made + * any serious effort, except for the naming of this class, to try to make + * the interfaces very generic. However, I assume that we could work much + * like in the strm class, where some properties are simply ignored when + * the wrong strm mode is selected (which would translate here to the wrong + * stream mode). + * + * Most importantly, this class provides generic input and output functions + * which can directly be used to work with the strms and file output. It + * provides such useful things like a circular file buffer and, hopefully + * at a later stage, a lazy writer. The object is also seriazable and thus + * can easily be persistet. The bottom line is that it makes much sense to + * use this class whereever possible as its features may grow in the future. + * + * Copyright 2008 Rainer Gerhards and Adiscon GmbH. + * + * This file is part of rsyslog. + * + * Rsyslog is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Rsyslog is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Rsyslog. If not, see <http://www.gnu.org/licenses/>. + * + * A copy of the GPL can be found in the file "COPYING" in this distribution. + */ + +#ifndef STREAM_H_INCLUDED +#define STREAM_H_INCLUDED + +#include <pthread.h> +#include "obj.h" +#include "stream.h" + +/* stream types */ +typedef enum { + STREAMTYPE_FILE = 0 +} strmType_t; + +/* The strm_t data structure */ +typedef struct { + BEGINobjInstance; /* Data to implement generic object - MUST be the first data element! */ + strmType_t sType; + int fd; /* the file descriptor, -1 if closed */ + uchar *pszCurrFName; /* name of current file (if open) */ + int iCurrFNum;/* current file number (NOT descriptor, but the number in the file name!) */ + uchar *pszDir; /* Directory */ + int lenDir; + uchar *pszFilePrefix; /* prefix for generated filenames */ + int lenFilePrefix; + size_t iCurrOffs;/* current offset */ + uchar *pIOBuf; /* io Buffer */ + int iBufPtrMax; /* current max Ptr in Buffer (if partial read!) */ + int iBufPtr; /* pointer into current buffer */ + int iUngetC; /* char set via UngetChar() call or -1 if none set */ + int iFlagsOpenOS; + int iModeOpenOS; + size_t iMaxFileSize;/* maximum size a file may grow to */ + int bDeleteOnClose; /* set to 1 to auto-delete on close -- be careful with that setting! */ +} strm_t; +#define STRM_IOBUF_SIZE 4096 /* size of the IO buffer */ + +/* prototypes */ +rsRetVal strmDestruct(strm_t *pThis); +rsRetVal strmSetMaxFileSize(strm_t *pThis, size_t iMaxFileSize); +rsRetVal strmSetFilePrefix(strm_t *pThis, uchar *pszPrefix, size_t iLenPrefix); +PROTOTYPEObjClassInit(strm); + +#endif /* #ifndef STREAM_H_INCLUDED */ @@ -176,6 +176,7 @@ #include "omdiscard.h" #include "threads.h" #include "queue.h" +#include "stream.h" /* We define our own set of syslog defintions so that we * do not need to rely on (possibly different) implementations. @@ -4651,6 +4652,7 @@ static rsRetVal InitGlobalClasses(void) CHKiRet(objClassInit()); /* *THIS* *MUST* always be the first class initilizere called! */ CHKiRet(MsgClassInit()); + CHKiRet(strmClassInit()); finalize_it: return iRet; |