From aa7e00d8e1a1d67fa2860623ffab75bd387faffc Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 9 Jan 2008 17:25:07 +0000 Subject: changed queue class to use stream class --- obj.h | 10 +++ queue.c | 247 +++++--------------------------------------------------------- queue.h | 5 +- rsyslog.h | 2 +- stream.c | 51 +++++++++++-- stream.h | 9 +++ 6 files changed, 85 insertions(+), 239 deletions(-) diff --git a/obj.h b/obj.h index 0b97718e..0c24c279 100644 --- a/obj.h +++ b/obj.h @@ -125,6 +125,16 @@ typedef struct serialStore_s { if(pThis != NULL) \ free(pThis); \ } + +#define DEFpropSetMeth(obj, prop, dataType)\ + rsRetVal obj##Set##prop(obj##_t *pThis, dataType pVal)\ + { \ + pThis->prop = pVal; \ + return RS_RET_OK; \ + } +#define PROTOTYPEpropSetMeth(obj, prop, dataType)\ + rsRetVal obj##Set##prop(obj##_t *pThis, dataType pVal) + #define objSerializeSCALAR(propName, propType) \ CHKiRet(objSerializeProp(pCStr, (uchar*) #propName, PROPTYPE_##propType, (void*) &pThis->propName)); #define objSerializePTR(propName, propType) \ diff --git a/queue.c b/queue.c index b2a910fc..76dbace6 100644 --- a/queue.c +++ b/queue.c @@ -189,208 +189,20 @@ static rsRetVal qDelLinkedList(queue_t *pThis, void **ppUsr) /* -------------------- disk -------------------- */ -/* first, disk-queue internal utility functions */ - - -/* open a queue file */ -static rsRetVal qDiskOpenFile(queue_t *pThis, queueFileDescription_t *pFile, int flags, mode_t mode) -{ - DEFiRet; - - assert(pThis != NULL); - assert(pFile != NULL); - - if(pThis->pszFilePrefix == NULL) - ABORT_FINALIZE(RS_RET_FILE_PREFIX_MISSING); - - /* now open the write file */ - CHKiRet(genFileName(&pFile->pszFileName, pThis->pszSpoolDir, pThis->lenSpoolDir, - pThis->pszFilePrefix, pThis->lenFilePrefix, pFile->iCurrFileNum, (uchar*) "qf", 2)); - - pFile->fd = open((char*)pFile->pszFileName, flags, mode); // TODO: open modes! - pFile->iCurrOffs = 0; - - dbgprintf("Queue 0x%lx: opened file '%s' for %d as %d\n", (unsigned long) pThis, pFile->pszFileName, flags, pFile->fd); - -finalize_it: - return iRet; -} - - -/* close a queue file - * Note that the bDeleteOnClose flag is honored. If it is set, the file will be - * deleted after close. This is in support for the qRead thread. - */ -static rsRetVal qDiskCloseFile(queue_t *pThis, queueFileDescription_t *pFile) -{ - DEFiRet; - - assert(pThis != NULL); - assert(pFile != NULL); - dbgprintf("Queue 0x%lx: closing file %d\n", (unsigned long) pThis, pFile->fd); - - close(pFile->fd); // TODO: error check - pFile->fd = -1; - - if(pFile->bDeleteOnClose) { - unlink((char*) pThis->tVars.disk.fRead.pszFileName); // TODO: check returncode - } - - if(pFile->pszFileName != NULL) { - free(pFile->pszFileName); /* no longer needed in any case (just for open) */ - pFile->pszFileName = NULL; - } - - return iRet; -} - - -/* switch to next queue file */ -static rsRetVal qDiskNextFile(queue_t *pThis, queueFileDescription_t *pFile) -{ - DEFiRet; - - assert(pThis != NULL); - assert(pFile != NULL); - CHKiRet(qDiskCloseFile(pThis, pFile)); - - /* we do modulo 1,000,000 so that the file number is always at most 6 digits. If we have a million - * or more queue files, something is awfully wrong and it is OK if we run into problems in that - * situation ;) -- rgerhards, 2008-01-09 - */ - pFile->iCurrFileNum = (pFile->iCurrFileNum + 1) % 1000000; - -finalize_it: - return iRet; -} - - -/*** buffered read functions for queue files ***/ - -/* logically "read" a character from a file. What actually happens is that - * data is taken from the buffer. Only if the buffer is full, data is read - * directly from file. In that case, a read is performed blockwise. - * rgerhards, 2008-01-07 - * NOTE: needs to be enhanced to support sticking with a queue entry (if not - * deleted). - */ -static rsRetVal qDiskReadChar(queueFileDescription_t *pFile, uchar *pC) -{ - DEFiRet; - - assert(pFile != NULL); - assert(pC != NULL); - -//dbgprintf("qDiskRead index %d, max %d\n", pFile->iBufPtr, pFile->iBufPtrMax); - if(pFile->pIOBuf == NULL) { /* TODO: maybe we should move that to file open... */ - if((pFile->pIOBuf = (uchar*) malloc(sizeof(uchar) * qFILE_IOBUF_SIZE )) == NULL) - ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); - pFile->iBufPtrMax = 0; /* results in immediate read request */ - } - - if(pFile->iUngetC != -1) { /* do we have an "unread" char that we need to provide? */ - *pC = pFile->iUngetC; - pFile->iUngetC = -1; - ABORT_FINALIZE(RS_RET_OK); - } - - /* do we need to obtain a new buffer */ - if(pFile->iBufPtr >= pFile->iBufPtrMax) { - /* read */ - pFile->iBufPtrMax = read(pFile->fd, pFile->pIOBuf, qFILE_IOBUF_SIZE); - dbgprintf("qDiskReadChar read %d bytes from file %d\n", pFile->iBufPtrMax, pFile->fd); - if(pFile->iBufPtrMax == 0) - ABORT_FINALIZE(RS_RET_EOF); - else if(pFile->iBufPtrMax < 0) - ABORT_FINALIZE(RS_RET_IO_ERROR); - /* if we reach this point, we had a good read */ - pFile->iBufPtr = 0; - } - - *pC = pFile->pIOBuf[pFile->iBufPtr++]; - -finalize_it: - return iRet; -} - - -/* unget a single character just like ungetc(). As with that call, there is only a single - * character buffering capability. - * rgerhards, 2008-01-07 - */ -static rsRetVal qDiskUnreadChar(queueFileDescription_t *pFile, uchar c) -{ - assert(pFile != NULL); - assert(pFile->iUngetC == -1); - pFile->iUngetC = c; - - return RS_RET_OK; -} - -#if 0 -/* we have commented out the code below because we would like to preserve it. It - * is currently not needed, but may be useful if we implemented a bufferred file - * class. - * rgerhards, 2008-01-07 - */ -/* read a line from a queue file. A line is terminated by LF. The LF is read, but it - * is not returned in the buffer (it is discared). The caller is responsible for - * destruction of the returned CStr object! - * rgerhards, 2008-01-07 - */ -static rsRetVal qDiskReadLine(queueFileDescription_t *pFile, rsCStrObj **ppCStr) -{ - DEFiRet; - uchar c; - rsCStrObj *pCStr = NULL; - - assert(pFile != NULL); - assert(ppCStr != NULL); - - if((pCStr = rsCStrConstruct()) == NULL) - ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); - - /* now read the line */ - CHKiRet(qDiskReadChar(pFile, &c)); - while(c != '\n') { - CHKiRet(rsCStrAppendChar(pCStr, c)); - CHKiRet(qDiskReadChar(pFile, &c)); - } - CHKiRet(rsCStrFinish(pCStr)); - *ppCStr = pCStr; - -finalize_it: - if(iRet != RS_RET_OK && pCStr != NULL) - rsCStrDestruct(pCStr); - - return iRet; -} - -#endif /* #if 0 - saved code */ - -/*** end buffered read functions for queue files ***/ - - -/* now come the disk mode queue driver functions */ - static rsRetVal qConstructDisk(queue_t *pThis) { DEFiRet; assert(pThis != NULL); - pThis->tVars.disk.fWrite.iCurrFileNum = 1; - pThis->tVars.disk.fWrite.iCurrOffs = 0; - pThis->tVars.disk.fWrite.fd = -1; - pThis->tVars.disk.fWrite.iUngetC = -1; - pThis->tVars.disk.fRead.bDeleteOnClose = 0; /* do *NOT* set this to 1! */ + CHKiRet(strmConstruct(&pThis->tVars.disk.pWrite)); + CHKiRet(strmConstructFinalize(pThis->tVars.disk.pWrite)); - pThis->tVars.disk.fRead.iCurrFileNum = 1; - pThis->tVars.disk.fRead.fd = -1; - pThis->tVars.disk.fRead.iCurrOffs = 0; - pThis->tVars.disk.fRead.iUngetC = -1; - pThis->tVars.disk.fRead.bDeleteOnClose = 1; + CHKiRet(strmConstruct(&pThis->tVars.disk.pRead)); + CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pRead, 1)); + CHKiRet(strmConstructFinalize(pThis->tVars.disk.pRead)); +finalize_it: return iRet; } @@ -401,10 +213,8 @@ static rsRetVal qDestructDisk(queue_t *pThis) assert(pThis != NULL); - if(pThis->tVars.disk.fWrite.fd != -1) - qDiskCloseFile(pThis, &pThis->tVars.disk.fWrite); - if(pThis->tVars.disk.fRead.fd != -1) - qDiskCloseFile(pThis, &pThis->tVars.disk.fRead); + strmDestruct(pThis->tVars.disk.pWrite); + strmDestruct(pThis->tVars.disk.pRead); if(pThis->pszSpoolDir != NULL) free(pThis->pszSpoolDir); @@ -415,23 +225,14 @@ static rsRetVal qDestructDisk(queue_t *pThis) static rsRetVal qAddDisk(queue_t *pThis, void* pUsr) { DEFiRet; - int iWritten; rsCStrObj *pCStr; assert(pThis != NULL); - if(pThis->tVars.disk.fWrite.fd == -1) - CHKiRet(qDiskOpenFile(pThis, &pThis->tVars.disk.fWrite, O_RDWR|O_CREAT|O_TRUNC, 0600)); // TODO: open modes! + CHKiRet(strmOpenFile(pThis->tVars.disk.pWrite, O_RDWR|O_CREAT|O_TRUNC, 0600)); // TODO: open modes! CHKiRet((objSerialize(pUsr))(pUsr, &pCStr)); - iWritten = write(pThis->tVars.disk.fWrite.fd, rsCStrGetBufBeg(pCStr), rsCStrLen(pCStr)); - dbgprintf("Queue 0x%lx: write wrote %d bytes, errno: %d, err %s\n", (unsigned long) pThis, - iWritten, errno, strerror(errno)); - /* TODO: handle error case -- rgerhards, 2008-01-07 */ - - pThis->tVars.disk.fWrite.iCurrOffs += iWritten; - if(pThis->tVars.disk.fWrite.iCurrOffs >= pThis->iMaxFileSize) - CHKiRet(qDiskNextFile(pThis, &pThis->tVars.disk.fWrite)); + CHKiRet(strmWrite(pThis->tVars.disk.pWrite, rsCStrGetBufBeg(pCStr), rsCStrLen(pCStr))); finalize_it: return iRet; @@ -450,21 +251,20 @@ static rsRetVal qDelDisk(queue_t *pThis, void **ppUsr) * We need to try at least twice because we may run into EOF and need * to switch files. */ - serialStore.pUsr = &pThis->tVars.disk.fRead; - serialStore.funcGetChar = (rsRetVal (*)(void*, uchar*)) qDiskReadChar; - serialStore.funcUngetChar = (rsRetVal (*)(void*, uchar)) qDiskUnreadChar; + serialStore.pUsr = pThis->tVars.disk.pRead; + serialStore.funcGetChar = (rsRetVal (*)(void*, uchar*)) strmReadChar; + serialStore.funcUngetChar = (rsRetVal (*)(void*, uchar)) strmUnreadChar; bRun = 1; while(bRun) { /* first check if we need to (re)open the file (we may have switched to a new one!) */ - if(pThis->tVars.disk.fRead.fd == -1) - CHKiRet(qDiskOpenFile(pThis, &pThis->tVars.disk.fRead, O_RDONLY, 0600)); // TODO: open modes! + CHKiRet(strmOpenFile(pThis->tVars.disk.pRead, O_RDONLY, 0600)); // TODO: open modes! iRet = objDeserialize((void*) &pMsg, OBJMsg, &serialStore); if(iRet == RS_RET_OK) bRun = 0; /* we are done */ else if(iRet == RS_RET_EOF) { - dbgprintf("Queue 0x%lx: EOF on file %d\n", (unsigned long) pThis, pThis->tVars.disk.fRead.fd); - CHKiRet(qDiskNextFile(pThis, &pThis->tVars.disk.fRead)); + dbgprintf("Queue 0x%lx: EOF on file %d\n", (unsigned long) pThis, pThis->tVars.disk.pRead->fd); + CHKiRet(strmNextFile(pThis->tVars.disk.pRead)); } else FINALIZE; } @@ -781,19 +581,8 @@ rsRetVal queueSetFilePrefix(queue_t *pThis, uchar *pszPrefix, size_t iLenPrefix) { DEFiRet; - - assert(pThis != NULL); - assert(pszPrefix != NULL); - - if(iLenPrefix < 1) - ABORT_FINALIZE(RS_RET_FILE_PREFIX_MISSING); - - if((pThis->pszFilePrefix = malloc(sizeof(uchar) * iLenPrefix + 1)) == NULL) - ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); - - memcpy(pThis->pszFilePrefix, pszPrefix, iLenPrefix + 1); /* always think about the \0! */ - pThis->lenFilePrefix = iLenPrefix; - + CHKiRet(strmSetFilePrefix(pThis->tVars.disk.pWrite, pszPrefix, iLenPrefix)); + CHKiRet(strmSetFilePrefix(pThis->tVars.disk.pRead, pszPrefix, iLenPrefix)); finalize_it: return iRet; } diff --git a/queue.h b/queue.h index 1cbd52b4..df4146f6 100644 --- a/queue.h +++ b/queue.h @@ -25,6 +25,7 @@ #include #include "obj.h" +#include "stream.h" /* some information about disk files used by the queue. In the long term, we may * export this settings to a separate file module - or not (if they are too @@ -100,8 +101,8 @@ typedef struct queue_s { qLinkedList_t *pLast; } linklist; struct { - queueFileDescription_t fWrite; /* current file to be written */ - queueFileDescription_t fRead; /* current file to be read */ + strm_t *pWrite; /* current file to be written */ + strm_t *pRead; /* current file to be read */ } disk; } tVars; } queue_t; diff --git a/rsyslog.h b/rsyslog.h index ff6e4eaa..0bb8371b 100644 --- a/rsyslog.h +++ b/rsyslog.h @@ -104,7 +104,7 @@ enum rsRetVal_ /** return value. All methods return this if not specified oth RS_RET_NO_PROPLINE = -2033, /**< line is not a property line */ RS_RET_INVALID_TRAILER = -2034, /**< invalid trailer */ RS_RET_VALUE_TOO_LOW = -2035, /**< a provided value is too low */ - RS_RET_FILE_PREFIX_MISSING = -2035, /**< a required file prefix (parameter?) is missing */ + RS_RET_FILE_PREFIX_MISSING = -2036, /**< a required file prefix (parameter?) is missing */ RS_RET_OK_DELETE_LISTENTRY = 1, /**< operation successful, but callee requested the deletion of an entry (special state) */ RS_RET_TERMINATE_NOW = 2, /**< operation successful, function is requested to terminate (mostly used with threads) */ RS_RET_NO_RUN = 3, /**< operation successful, but function does not like to be executed */ diff --git a/stream.c b/stream.c index 7f99ab9a..caf78bec 100644 --- a/stream.c +++ b/stream.c @@ -56,13 +56,19 @@ DEFobjStaticHelpers * strm instance object. */ -/* open a strm file */ -static rsRetVal strmOpenFile(strm_t *pThis, int flags, mode_t mode) +/* open a strm file + * It is OK to call this function when the stream is already open. In that + * case, it returns immediately with RS_RET_OK + */ +rsRetVal strmOpenFile(strm_t *pThis, int flags, mode_t mode) { DEFiRet; assert(pThis != NULL); + if(pThis->fd != -1) + ABORT_FINALIZE(RS_RET_OK); + if(pThis->pszFilePrefix == NULL) ABORT_FINALIZE(RS_RET_FILE_PREFIX_MISSING); @@ -107,7 +113,7 @@ static rsRetVal strmCloseFile(strm_t *pThis) /* switch to next strm file */ -static rsRetVal strmNextFile(strm_t *pThis) +rsRetVal strmNextFile(strm_t *pThis) { DEFiRet; @@ -134,14 +140,14 @@ finalize_it: * NOTE: needs to be enhanced to support sticking with a strm entry (if not * deleted). */ -static rsRetVal strmReadChar(strm_t *pThis, uchar *pC) +rsRetVal strmReadChar(strm_t *pThis, uchar *pC) { DEFiRet; assert(pThis != NULL); assert(pC != NULL); -//dbgprintf("strmRead index %d, max %d\n", pThis->iBufPtr, pThis->iBufPtrMax); + /* DEV debug only: dbgprintf("strmRead index %d, max %d\n", pThis->iBufPtr, pThis->iBufPtrMax); */ if(pThis->pIOBuf == NULL) { /* TODO: maybe we should move that to file open... */ if((pThis->pIOBuf = (uchar*) malloc(sizeof(uchar) * STRM_IOBUF_SIZE )) == NULL) ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); @@ -178,7 +184,7 @@ finalize_it: * character buffering capability. * rgerhards, 2008-01-07 */ -static rsRetVal strmUnreadChar(strm_t *pThis, uchar c) +rsRetVal strmUnreadChar(strm_t *pThis, uchar c) { assert(pThis != NULL); assert(pThis->iUngetC == -1); @@ -240,6 +246,7 @@ rsRetVal strmConstruct(strm_t **ppThis) { DEFiRet; strm_t *pThis; +dbgprintf("strmConstruct\n"); assert(ppThis != NULL); @@ -291,11 +298,12 @@ rsRetVal strmDestruct(strm_t *pThis) /* write memory buffer to a stream object */ -static rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) +rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) { DEFiRet; int iWritten; +dbgprintf("strmWrite()\n"); assert(pThis != NULL); assert(pBuf != NULL); @@ -316,6 +324,35 @@ finalize_it: } +/* property set methods */ +/* simple ones first */ +DEFpropSetMeth(strm, bDeleteOnClose, int) + +/* set the stream's file prefix + * The passed-in string is duplicated. So if the caller does not need + * it any longer, it must free it. + * rgerhards, 2008-01-09 + */ +rsRetVal +strmSetFilePrefix(strm_t *pThis, uchar *pszPrefix, size_t iLenPrefix) +{ + DEFiRet; + + assert(pThis != NULL); + assert(pszPrefix != NULL); + + if(iLenPrefix < 1) + ABORT_FINALIZE(RS_RET_FILE_PREFIX_MISSING); + + if((pThis->pszFilePrefix = malloc(sizeof(uchar) * iLenPrefix + 1)) == NULL) + ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); + + memcpy(pThis->pszFilePrefix, pszPrefix, iLenPrefix + 1); /* always think about the \0! */ + pThis->lenFilePrefix = iLenPrefix; + +finalize_it: + return iRet; +} /* Initialize the stream class. Must be called as the very first method diff --git a/stream.h b/stream.h index 5c9451cd..a8ace8a6 100644 --- a/stream.h +++ b/stream.h @@ -75,9 +75,18 @@ typedef struct { #define STRM_IOBUF_SIZE 4096 /* size of the IO buffer */ /* prototypes */ +rsRetVal strmConstruct(strm_t **ppThis); +rsRetVal strmConstructFinalize(strm_t __attribute__((unused)) *pThis); rsRetVal strmDestruct(strm_t *pThis); rsRetVal strmSetMaxFileSize(strm_t *pThis, size_t iMaxFileSize); rsRetVal strmSetFilePrefix(strm_t *pThis, uchar *pszPrefix, size_t iLenPrefix); +rsRetVal strmReadChar(strm_t *pThis, uchar *pC); +rsRetVal strmUnreadChar(strm_t *pThis, uchar c); +rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf); +rsRetVal strmNextFile(strm_t *pThis); +rsRetVal strmOpenFile(strm_t *pThis, int flags, mode_t mode); +rsRetVal strmSetFilePrefix(strm_t *pThis, uchar *pszPrefix, size_t iLenPrefix); PROTOTYPEObjClassInit(strm); +PROTOTYPEpropSetMeth(strm, bDeleteOnClose, int); #endif /* #ifndef STREAM_H_INCLUDED */ -- cgit