/* The serial stream class. * * A serial stream provides serial data access. In theory, serial streams * can be implemented via a number of methods (e.g. files or in-memory * streams). In practice, only the file type (aka "driver") currently * exists. * * File begun on 2008-01-09 by RGerhards * * Copyright 2008 Rainer Gerhards and Adiscon GmbH. * * This file is part of rsyslog. * * Rsyslog is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * Rsyslog is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with Rsyslog. If not, see <http://www.gnu.org/licenses/>. * * A copy of the GPL can be found in the file "COPYING" in this distribution. */ #include "config.h" #include #include #include #include #include #include #include #include #include "rsyslog.h" #include "syslogd.h" #include "stringbuf.h" #include "srUtils.h" #include "obj.h" #include "stream.h" /* static data */ DEFobjStaticHelpers /* methods */ /* first, we define type-specific handlers. They provide generic functionality, * but for this specific type of strm. The mapping to these handlers happens during * strm construction. Later on, handlers are called by pointers present in the * strm instance object. */ /* open a strm file * It is OK to call this function when the stream is already open. 
In that * case, it returns immediately with RS_RET_OK */ static rsRetVal strmOpenFile(strm_t *pThis) { DEFiRet; int iFlags; assert(pThis != NULL); assert(pThis->tOperationsMode == STREAMMODE_READ || pThis->tOperationsMode == STREAMMODE_WRITE); if(pThis->fd != -1) ABORT_FINALIZE(RS_RET_OK); if(pThis->pszFilePrefix == NULL) ABORT_FINALIZE(RS_RET_FILE_PREFIX_MISSING); CHKiRet(genFileName(&pThis->pszCurrFName, pThis->pszDir, pThis->lenDir, pThis->pszFilePrefix, pThis->lenFilePrefix, pThis->iCurrFNum, pThis->iFileNumDigits)); /* compute which flags we need to provide to open */ if(pThis->tOperationsMode == STREAMMODE_READ) iFlags = O_RDONLY; else iFlags = O_WRONLY | O_TRUNC | O_CREAT | O_APPEND; pThis->fd = open((char*)pThis->pszCurrFName, iFlags, pThis->tOpenMode); pThis->iCurrOffs = 0; dbgprintf("Stream 0x%lx: opened file '%s' for %s (0x%x) as %d\n", (unsigned long) pThis, pThis->pszCurrFName, (pThis->tOperationsMode == STREAMMODE_READ) ? "READ" : "WRITE", iFlags, pThis->fd); finalize_it: return iRet; } /* close a strm file * Note that the bDeleteOnClose flag is honored. If it is set, the file will be * deleted after close. This is in support for the qRead thread. */ static rsRetVal strmCloseFile(strm_t *pThis) { DEFiRet; assert(pThis != NULL); assert(pThis->fd != -1); dbgprintf("Stream 0x%lx: closing file %d\n", (unsigned long) pThis, pThis->fd); if(pThis->tOperationsMode == STREAMMODE_WRITE) strmFlush(pThis); close(pThis->fd); // TODO: error check pThis->fd = -1; if(pThis->bDeleteOnClose) { unlink((char*) pThis->pszCurrFName); // TODO: check returncode } if(pThis->pszCurrFName != NULL) { free(pThis->pszCurrFName); /* no longer needed in any case (just for open) */ pThis->pszCurrFName = NULL; } dbgprintf("exit strmCloseFile, fd: %d\n", pThis->fd); return iRet; } /* switch to next strm file * This method must only be called if we are in a multi-file mode! 
*/ static rsRetVal strmNextFile(strm_t *pThis) { DEFiRet; dbgprintf("strmNextFile, old num %d\n", pThis->iCurrFNum); assert(pThis != NULL); assert(pThis->iMaxFiles != 0); assert(pThis->fd != -1); CHKiRet(strmCloseFile(pThis)); /* we do modulo operation to ensure we obej the iMaxFile property. This will always * result in a file number lower than iMaxFile, so it if wraps, the name is back to * 0, which results in the first file being overwritten. Not desired for queues, so * make sure their iMaxFiles is large enough. But it is well-desired for other * use cases, e.g. a circular output log file. -- rgerhards, 2008-01-10 */ pThis->iCurrFNum = (pThis->iCurrFNum + 1) % pThis->iMaxFiles; finalize_it: return iRet; } /* logically "read" a character from a file. What actually happens is that * data is taken from the buffer. Only if the buffer is full, data is read * directly from file. In that case, a read is performed blockwise. * rgerhards, 2008-01-07 * NOTE: needs to be enhanced to support sticking with a strm entry (if not * deleted). */ rsRetVal strmReadChar(strm_t *pThis, uchar *pC) { DEFiRet; int bRun; long iLenRead; assert(pThis != NULL); assert(pC != NULL); /* DEV debug only: dbgprintf("strmRead index %d, max %d\n", pThis->iBufPtr, pThis->iBufPtrMax); */ if(pThis->iUngetC != -1) { /* do we have an "unread" char that we need to provide? */ *pC = pThis->iUngetC; pThis->iUngetC = -1; ABORT_FINALIZE(RS_RET_OK); } /* do we need to obtain a new buffer */ if(pThis->iBufPtr >= pThis->iBufPtrMax) { /* We need to try read at least twice because we may run into EOF and need to switch files. */ bRun = 1; while(bRun) { /* first check if we need to (re)open the file (we may have switched to a new one!) 
*/ CHKiRet(strmOpenFile(pThis)); iLenRead = read(pThis->fd, pThis->pIOBuf, pThis->sIOBufSize); dbgprintf("Stream 0x%lx: read %ld bytes from file %d\n", (unsigned long) pThis, iLenRead, pThis->fd); if(iLenRead == 0) { if(pThis->iMaxFiles == 0) ABORT_FINALIZE(RS_RET_EOF); else { /* we have multiple files and need to switch to the next one */ /* TODO: think about emulating EOF in this case (not yet needed) */ dbgprintf("Stream 0x%lx: EOF on file %d\n", (unsigned long) pThis, pThis->fd); CHKiRet(strmNextFile(pThis)); } } else if(iLenRead < 0) ABORT_FINALIZE(RS_RET_IO_ERROR); else { /* good read */ pThis->iBufPtrMax = iLenRead; bRun = 0; /* exit loop */ } } /* if we reach this point, we had a good read */ pThis->iBufPtr = 0; } *pC = pThis->pIOBuf[pThis->iBufPtr++]; finalize_it: return iRet; } /* unget a single character just like ungetc(). As with that call, there is only a single * character buffering capability. * rgerhards, 2008-01-07 */ rsRetVal strmUnreadChar(strm_t *pThis, uchar c) { assert(pThis != NULL); assert(pThis->iUngetC == -1); pThis->iUngetC = c; return RS_RET_OK; } #if 0 /* we have commented out the code below because we would like to preserve it. It * is currently not needed, but may be useful if we implemented a bufferred file * class. NOTE: YOU MUST REVIEW THIS CODE BEFORE ACTIVATION. It may be pretty * outdated! -- rgerhards, 2008-01-10 */ /* read a line from a strm file. A line is terminated by LF. The LF is read, but it * is not returned in the buffer (it is discared). The caller is responsible for * destruction of the returned CStr object! 
* rgerhards, 2008-01-07 */ static rsRetVal strmReadLine(strm_t *pThis, rsCStrObj **ppCStr) { DEFiRet; uchar c; rsCStrObj *pCStr = NULL; assert(pThis != NULL); assert(ppCStr != NULL); if((pCStr = rsCStrConstruct()) == NULL) ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); /* now read the line */ CHKiRet(strmReadChar(pThis, &c)); while(c != '\n') { CHKiRet(rsCStrAppendChar(pCStr, c)); CHKiRet(strmReadChar(pThis, &c)); } CHKiRet(rsCStrFinish(pCStr)); *ppCStr = pCStr; finalize_it: if(iRet != RS_RET_OK && pCStr != NULL) rsCStrDestruct(pCStr); return iRet; } #endif /* #if 0 - saved code */ /* Standard-Constructor for the strm object */ BEGINobjConstruct(strm) pThis->iCurrFNum = 1; pThis->fd = -1; pThis->iUngetC = -1; pThis->sType = STREAMTYPE_FILE; pThis->sIOBufSize = glblGetIOBufSize(); pThis->tOpenMode = 0600; ENDobjConstruct(strm) /* ConstructionFinalizer * rgerhards, 2008-01-09 */ rsRetVal strmConstructFinalize(strm_t *pThis) { DEFiRet; assert(pThis != NULL); if(pThis->pIOBuf == NULL) { /* allocate our io buffer in case we have not yet */ if((pThis->pIOBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize)) == NULL) ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); pThis->iBufPtrMax = 0; /* results in immediate read request */ } finalize_it: return iRet; } /* destructor for the strm object */ rsRetVal strmDestruct(strm_t *pThis) { DEFiRet; assert(pThis != NULL); /* ... then free resources */ if(pThis->fd != -1) strmCloseFile(pThis); if(pThis->pszDir != NULL) free(pThis->pszDir); /* and finally delete the strm objet itself */ free(pThis); return iRet; } /* check if we need to open a new file (in output mode only). * The decision is based on file size AND record delimition state. * This method may also be called on a closed file, in which case * it immediately returns. 
*/ static rsRetVal strmCheckNextOutputFile(strm_t *pThis) { DEFiRet; if(pThis->fd == -1) FINALIZE; if(pThis->iCurrOffs >= pThis->iMaxFileSize) { dbgprintf("Stream 0x%lx: max file size %ld reached for %d, now %ld - starting new file\n", (unsigned long) pThis, (long) pThis->iMaxFileSize, pThis->fd, (long) pThis->iCurrOffs); CHKiRet(strmNextFile(pThis)); } finalize_it: return iRet; } /* write memory buffer to a stream object. * To support direct writes of large objects, this method may be called * with a buffer pointing to some region other than the stream buffer itself. * However, in that case the stream buffer must be empty (strmFlush() has to * be called before), because we would otherwise mess up with the sequence * inside the stream. -- rgerhards, 2008-01-10 */ static rsRetVal strmWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf) { DEFiRet; int iWritten; assert(pThis != NULL); assert(pBuf == pThis->pIOBuf || pThis->iBufPtr == 0); if(pThis->fd == -1) CHKiRet(strmOpenFile(pThis)); iWritten = write(pThis->fd, pBuf, lenBuf); dbgprintf("Stream 0x%lx: write wrote %d bytes, errno: %d, err %s\n", (unsigned long) pThis, iWritten, errno, strerror(errno)); /* TODO: handle error case -- rgerhards, 2008-01-07 */ /* Now indicate buffer empty again. We do this in any case, because there * is no way we could react more intelligently to an error during write. * This MUST be done BEFORE strCheckNextOutputFile(), otherwise we have an * endless loop. We reset the buffer pointer also in finalize_it - this is * necessary if we run into problems. Not resetting it would again cause an * endless loop. So it is better to loose some data (which also justifies * duplicating that code, too...) -- rgerhards, 2008-01-10 */ pThis->iBufPtr = 0; pThis->iCurrOffs += iWritten; CHKiRet(strmCheckNextOutputFile(pThis)); finalize_it: pThis->iBufPtr = 0; /* see comment above */ return iRet; } /* flush stream output buffer to persistent storage. 
This can be called at any time * and is automatically called when the output buffer is full. * rgerhards, 2008-01-10 */ rsRetVal strmFlush(strm_t *pThis) { DEFiRet; assert(pThis != NULL); dbgprintf("Stream 0x%lx: flush file %d, buflen %ld\n", (unsigned long) pThis, pThis->fd, pThis->iBufPtr); if(pThis->iBufPtr > 0) { iRet = strmWriteInternal(pThis, pThis->pIOBuf, pThis->iBufPtr); } return iRet; } /* write a *single* character to a stream object -- rgerhards, 2008-01-10 */ rsRetVal strmWriteChar(strm_t *pThis, uchar c) { DEFiRet; assert(pThis != NULL); /* if the buffer is full, we need to flush before we can write */ if(pThis->iBufPtr == pThis->sIOBufSize) { CHKiRet(strmFlush(pThis)); } /* we now always have space for one character, so we simply copy it */ *(pThis->pIOBuf + pThis->iBufPtr) = c; pThis->iBufPtr++; finalize_it: return iRet; } /* write an integer value (actually a long) to a stream object */ rsRetVal strmWriteLong(strm_t *pThis, long i) { DEFiRet; uchar szBuf[32]; assert(pThis != NULL); CHKiRet(srUtilItoA((char*)szBuf, sizeof(szBuf), i)); CHKiRet(strmWrite(pThis, szBuf, strlen((char*)szBuf))); finalize_it: return iRet; } /* write memory buffer to a stream object */ rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) { DEFiRet; size_t iPartial; assert(pThis != NULL); assert(pBuf != NULL); /* check if the to-be-written data is larger than our buffer size */ if(lenBuf >= pThis->sIOBufSize) { /* it is - so we do a direct write, that is most efficient. * TODO: is it really? think about disk block sizes! */ CHKiRet(strmFlush(pThis)); /* we need to flush first!!! */ CHKiRet(strmWriteInternal(pThis, pBuf, lenBuf)); } else { /* data fits into a buffer - we just need to see if it * fits into the current buffer... */ if(pThis->iBufPtr + lenBuf > pThis->sIOBufSize) { /* nope, so we must split it */ iPartial = pThis->sIOBufSize - pThis->iBufPtr; /* this fits in current buf */ if(iPartial > 0) { /* the buffer was exactly full, can not write anything! 
*/ memcpy(pThis->pIOBuf + pThis->iBufPtr, pBuf, iPartial); pThis->iBufPtr += iPartial; } CHKiRet(strmFlush(pThis)); /* get a new buffer for rest of data */ memcpy(pThis->pIOBuf, pBuf + iPartial, lenBuf - iPartial); pThis->iBufPtr = lenBuf - iPartial; } else { /* we have space, so we simply copy over the string */ memcpy(pThis->pIOBuf + pThis->iBufPtr, pBuf, lenBuf); pThis->iBufPtr += lenBuf; } } finalize_it: return iRet; } /* property set methods */ /* simple ones first */ DEFpropSetMeth(strm, bDeleteOnClose, int) DEFpropSetMeth(strm, iMaxFileSize, int) DEFpropSetMeth(strm, iFileNumDigits, int) DEFpropSetMeth(strm, tOperationsMode, int); DEFpropSetMeth(strm, tOpenMode, mode_t); rsRetVal strmSetiMaxFiles(strm_t *pThis, int iNewVal) { pThis->iMaxFiles = iNewVal; pThis->iFileNumDigits = getNumberDigits(iNewVal); return RS_RET_OK; } /* set the stream's file prefix * The passed-in string is duplicated. So if the caller does not need * it any longer, it must free it. * rgerhards, 2008-01-09 */ rsRetVal strmSetFilePrefix(strm_t *pThis, uchar *pszPrefix, size_t iLenPrefix) { DEFiRet; assert(pThis != NULL); assert(pszPrefix != NULL); if(iLenPrefix < 1) ABORT_FINALIZE(RS_RET_FILE_PREFIX_MISSING); if((pThis->pszFilePrefix = malloc(sizeof(uchar) * iLenPrefix + 1)) == NULL) ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); memcpy(pThis->pszFilePrefix, pszPrefix, iLenPrefix + 1); /* always think about the \0! */ pThis->lenFilePrefix = iLenPrefix; finalize_it: return iRet; } /* set the stream's directory * The passed-in string is duplicated. So if the caller does not need * it any longer, it must free it. * rgerhards, 2008-01-09 */ rsRetVal strmSetDir(strm_t *pThis, uchar *pszDir, size_t iLenDir) { DEFiRet; assert(pThis != NULL); assert(pszDir != NULL); if(iLenDir < 1) ABORT_FINALIZE(RS_RET_FILE_PREFIX_MISSING); if((pThis->pszDir = malloc(sizeof(uchar) * iLenDir + 1)) == NULL) ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); memcpy(pThis->pszDir, pszDir, iLenDir + 1); /* always think about the \0! 
*/ pThis->lenDir = iLenDir; finalize_it: return iRet; } /* support for data records * The stream class is able to write to multiple files. However, there are * situation (actually quite common), where a single data record should not * be split across files. This may be problematic if multiple stream write * calls are used to create the record. To support that, we provide the * bInRecord status variable. If it is set, no file spliting occurs. Once * it is set to 0, a check is done if a split is necessary and it then * happens. For a record-oriented caller, the proper sequence is: * * strmRecordBegin() * strmWrite...() * strmRecordEnd() * * Please note that records do not affect the writing of output buffers. They * are always written when full. The only thing affected is circular files * creation. So it is safe to write large records. * * IMPORTANT: RecordBegin() can not be nested! It is a programming error * if RecordBegin() is called while already in a record! * * rgerhards, 2008-01-10 */ rsRetVal strmRecordBegin(strm_t *pThis) { assert(pThis != NULL); assert(pThis->bInRecord == 0); pThis->bInRecord = 1; dbgprintf("strmRecordBegin set \n"); return RS_RET_OK; } rsRetVal strmRecordEnd(strm_t *pThis) { DEFiRet; assert(pThis != NULL); assert(pThis->bInRecord == 1); dbgprintf("strmRecordEnd in %d\n", iRet); pThis->bInRecord = 0; iRet = strmCheckNextOutputFile(pThis); /* check if we need to switch files */ dbgprintf("strmRecordEnd out %d\n", iRet); return iRet; } /* end stream record support functions */ /* Initialize the stream class. Must be called as the very first method * before anything else is called inside this class. * rgerhards, 2008-01-09 */ BEGINObjClassInit(strm, 1) //OBJSetMethodHandler(objMethod_SERIALIZE, strmSerialize); //OBJSetMethodHandler(objMethod_SETPROPERTY, strmSetProperty); OBJSetMethodHandler(objMethod_CONSTRUCTION_FINALIZER, strmConstructFinalize); ENDObjClassInit(strm) /* * vi:set ai: */