diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2008-02-13 09:39:21 +0000 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2008-02-13 09:39:21 +0000 |
commit | 18bea60667f4ced5ac65e91fbd7c7a00ba22b319 (patch) | |
tree | 01513cac707247724ed93ffa9594cb3e81c77f6c /stream.c | |
parent | 7e91b2298d3e3f0cced47fa32697b2b095638489 (diff) | |
download | rsyslog-18bea60667f4ced5ac65e91fbd7c7a00ba22b319.tar.gz rsyslog-18bea60667f4ced5ac65e91fbd7c7a00ba22b319.tar.xz rsyslog-18bea60667f4ced5ac65e91fbd7c7a00ba22b319.zip |
added ability to monitor file accross rotation
Diffstat (limited to 'stream.c')
-rw-r--r-- | stream.c | 235 |
1 files changed, 161 insertions, 74 deletions
@@ -65,8 +65,8 @@ static rsRetVal strmOpenFile(strm_t *pThis) DEFiRet; int iFlags; - assert(pThis != NULL); - assert(pThis->tOperationsMode == STREAMMODE_READ || pThis->tOperationsMode == STREAMMODE_WRITE); + ASSERT(pThis != NULL); + ASSERT(pThis->tOperationsMode == STREAMMODE_READ || pThis->tOperationsMode == STREAMMODE_WRITE); if(pThis->fd != -1) ABORT_FINALIZE(RS_RET_OK); @@ -93,7 +93,7 @@ static rsRetVal strmOpenFile(strm_t *pThis) pThis->fd = open((char*)pThis->pszCurrFName, iFlags, pThis->tOpenMode); if(pThis->fd == -1) { int ierrnoSave = errno; - dbgprintf("Stream 0x%lx: open error %d, file '%s'\n", (unsigned long) pThis, errno, pThis->pszCurrFName); + dbgoprint((obj_t*) pThis, "open error %d, file '%s'\n", errno, pThis->pszCurrFName); if(ierrnoSave == ENOENT) ABORT_FINALIZE(RS_RET_FILE_NOT_FOUND); else @@ -102,9 +102,8 @@ static rsRetVal strmOpenFile(strm_t *pThis) pThis->iCurrOffs = 0; - dbgprintf("Stream 0x%lx: opened file '%s' for %s (0x%x) as %d\n", (unsigned long) pThis, - pThis->pszCurrFName, (pThis->tOperationsMode == STREAMMODE_READ) ? "READ" : "WRITE", - iFlags, pThis->fd); + dbgoprint((obj_t*) pThis, "opened file '%s' for %s (0x%x) as %d\n", pThis->pszCurrFName, + (pThis->tOperationsMode == STREAMMODE_READ) ? "READ" : "WRITE", iFlags, pThis->fd); finalize_it: RETiRet; @@ -119,9 +118,9 @@ static rsRetVal strmCloseFile(strm_t *pThis) { DEFiRet; - assert(pThis != NULL); - assert(pThis->fd != -1); - dbgprintf("Stream 0x%lx: file %d closing\n", (unsigned long) pThis, pThis->fd); + ASSERT(pThis != NULL); + ASSERT(pThis->fd != -1); + dbgoprint((obj_t*) pThis, "file %d closing\n", pThis->fd); if(pThis->tOperationsMode == STREAMMODE_WRITE) strmFlush(pThis); @@ -150,13 +149,13 @@ strmNextFile(strm_t *pThis) { DEFiRet; - assert(pThis != NULL); - assert(pThis->iMaxFiles != 0); - assert(pThis->fd != -1); + ASSERT(pThis != NULL); + ASSERT(pThis->iMaxFiles != 0); + ASSERT(pThis->fd != -1); CHKiRet(strmCloseFile(pThis)); - /* we do modulo operation to ensure we obej the iMaxFile property. This will always + /* we do modulo operation to ensure we obey the iMaxFile property. This will always * result in a file number lower than iMaxFile, so it if wraps, the name is back to * 0, which results in the first file being overwritten. Not desired for queues, so * make sure their iMaxFiles is large enough. But it is well-desired for other @@ -169,6 +168,122 @@ finalize_it: } +/* handle the eof case for monitored files. + * If we are monitoring a file, someone may have rotated it. In this case, we + * also need to close it and reopen it under the same name. + * rgerhards, 2008-02-13 + */ +static rsRetVal +strmHandleEOFMonitor(strm_t *pThis) +{ + DEFiRet; + struct stat statOpen; + struct stat statName; + + ISOBJ_TYPE_assert(pThis, strm); + /* find inodes of both current descriptor as well as file now in file + * system. If they are different, the file has been rotated (or + * otherwise rewritten). We also check the size, because the inode + * does not change if the file is truncated (this, BTW, is also a case + * where we actually loose log lines, because we can not do anything + * against truncation...). We do NOT rely on the time of last + * modificaton because that may not be available under all + * circumstances. -- rgerhards, 2008-02-13 + */ + if(fstat(pThis->fd, &statOpen) == -1) + ABORT_FINALIZE(RS_RET_IO_ERROR); + if(stat((char*) pThis->pszCurrFName, &statName) == -1) + ABORT_FINALIZE(RS_RET_IO_ERROR); +dbgoprint((obj_t*)pThis, "curr ino %d, new ino %d, curr offset %lld, new size %ld\n", statOpen.st_ino, statName.st_ino, pThis->iCurrOffs, statName.st_size); + if(statOpen.st_ino == statName.st_ino && pThis->iCurrOffs == statName.st_size) { +RUNLOG_STR("EOF"); + ABORT_FINALIZE(RS_RET_EOF); + } else { +RUNLOG_STR("file change"); + /* we had a file change! */ + CHKiRet(strmCloseFile(pThis)); + CHKiRet(strmOpenFile(pThis)); + } + +finalize_it: + RETiRet; +} + + +/* handle the EOF case of a stream + * The EOF case is somewhat complicated, as the proper action depends on the + * mode the stream is in. If there are multiple files (circular logs, most + * important use case is queue files!), we need to close the current file and + * try to open the next one. + * rgerhards, 2008-02-13 + */ +static rsRetVal +strmHandleEOF(strm_t *pThis) +{ + DEFiRet; + + ISOBJ_TYPE_assert(pThis, strm); + switch(pThis->sType) { + case STREAMTYPE_FILE_SINGLE: + ABORT_FINALIZE(RS_RET_EOF); + break; + case STREAMTYPE_FILE_CIRCULAR: + /* we have multiple files and need to switch to the next one */ + /* TODO: think about emulating EOF in this case (not yet needed) */ +#if 0 + if(pThis->iMaxFiles == 0) /* TODO: why do we need this? ;) */ + ABORT_FINALIZE(RS_RET_EOF); +#endif + dbgoprint((obj_t*) pThis, "file %d EOF\n", pThis->fd); + CHKiRet(strmNextFile(pThis)); + break; + case STREAMTYPE_FILE_MONITOR: + CHKiRet(strmHandleEOFMonitor(pThis)); + break; + } + +finalize_it: + RETiRet; +} + +/* read the next buffer from disk + * rgerhards, 2008-02-13 + */ +static rsRetVal +strmReadBuf(strm_t *pThis) +{ + DEFiRet; + int bRun; + long iLenRead; + + ISOBJ_TYPE_assert(pThis, strm); + /* We need to try read at least twice because we may run into EOF and need to switch files. */ + bRun = 1; + while(bRun) { + /* first check if we need to (re)open the file. We may have switched to a new one in + * circular mode or it may have been rewritten (rotated) if we monitor a file + * rgerhards, 2008-02-13 + */ + CHKiRet(strmOpenFile(pThis)); + iLenRead = read(pThis->fd, pThis->pIOBuf, pThis->sIOBufSize); + dbgoprint((obj_t*) pThis, "file %d read %ld bytes\n", pThis->fd, iLenRead); + if(iLenRead == 0) { + CHKiRet(strmHandleEOF(pThis)); + } else if(iLenRead < 0) + ABORT_FINALIZE(RS_RET_IO_ERROR); + else { /* good read */ + pThis->iBufPtrMax = iLenRead; + bRun = 0; /* exit loop */ + } + } + /* if we reach this point, we had a good read */ + pThis->iBufPtr = 0; + +finalize_it: + RETiRet; +} + + /* logically "read" a character from a file. What actually happens is that * data is taken from the buffer. Only if the buffer is full, data is read * directly from file. In that case, a read is performed blockwise. @@ -179,13 +294,11 @@ finalize_it: rsRetVal strmReadChar(strm_t *pThis, uchar *pC) { DEFiRet; - int bRun; - long iLenRead; - assert(pThis != NULL); - assert(pC != NULL); + ASSERT(pThis != NULL); + ASSERT(pC != NULL); - /* DEV debug only: dbgprintf("strmRead index %d, max %d\n", pThis->iBufPtr, pThis->iBufPtrMax); */ + /* DEV debug only: dbgoprint((obj_t*) pThis, "strmRead index %d, max %d\n", pThis->iBufPtr, pThis->iBufPtrMax); */ if(pThis->iUngetC != -1) { /* do we have an "unread" char that we need to provide? */ *pC = pThis->iUngetC; ++pThis->iCurrOffs; /* one more octet read */ @@ -193,39 +306,15 @@ rsRetVal strmReadChar(strm_t *pThis, uchar *pC) ABORT_FINALIZE(RS_RET_OK); } - /* do we need to obtain a new buffer */ + /* do we need to obtain a new buffer? */ if(pThis->iBufPtr >= pThis->iBufPtrMax) { - /* We need to try read at least twice because we may run into EOF and need to switch files. */ - bRun = 1; - while(bRun) { - /* first check if we need to (re)open the file (we may have switched to a new one!) */ - CHKiRet(strmOpenFile(pThis)); - iLenRead = read(pThis->fd, pThis->pIOBuf, pThis->sIOBufSize); - dbgprintf("Stream 0x%lx: file %d read %ld bytes\n", (unsigned long) pThis, - pThis->fd, iLenRead); - if(iLenRead == 0) { - if(pThis->iMaxFiles == 0) - ABORT_FINALIZE(RS_RET_EOF); - else { - /* we have multiple files and need to switch to the next one */ - /* TODO: think about emulating EOF in this case (not yet needed) */ - dbgprintf("Stream 0x%lx: file %d EOF\n", (unsigned long) pThis, pThis->fd); - CHKiRet(strmNextFile(pThis)); - } - } else if(iLenRead < 0) - ABORT_FINALIZE(RS_RET_IO_ERROR); - else { /* good read */ - pThis->iBufPtrMax = iLenRead; - bRun = 0; /* exit loop */ - } - } - /* if we reach this point, we had a good read */ - pThis->iBufPtr = 0; + CHKiRet(strmReadBuf(pThis)); } + /* if we reach this point, we have data available in the buffer */ + *pC = pThis->pIOBuf[pThis->iBufPtr++]; ++pThis->iCurrOffs; /* one more octet read */ -//dbgprintf("ReadChar: read %c, offset %d\n", *pC, pThis->iCurrOffs); finalize_it: RETiRet; @@ -238,8 +327,8 @@ finalize_it: */ rsRetVal strmUnreadChar(strm_t *pThis, uchar c) { - assert(pThis != NULL); - assert(pThis->iUngetC == -1); + ASSERT(pThis != NULL); + ASSERT(pThis->iUngetC == -1); pThis->iUngetC = c; --pThis->iCurrOffs; /* one less octet read - NOTE: this can cause problems if we got a file change and immediately do an unread and the file is on a buffer boundary and the stream is then persisted. @@ -262,8 +351,8 @@ strmReadLine(strm_t *pThis, rsCStrObj **ppCStr) uchar c; rsCStrObj *pCStr = NULL; - assert(pThis != NULL); - assert(ppCStr != NULL); + ASSERT(pThis != NULL); + ASSERT(ppCStr != NULL); if((pCStr = rsCStrConstruct()) == NULL) ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); @@ -304,7 +393,7 @@ rsRetVal strmConstructFinalize(strm_t *pThis) { DEFiRet; - assert(pThis != NULL); + ASSERT(pThis != NULL); if(pThis->pIOBuf == NULL) { /* allocate our io buffer in case we have not yet */ if((pThis->pIOBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize)) == NULL) @@ -345,8 +434,8 @@ static rsRetVal strmCheckNextOutputFile(strm_t *pThis) FINALIZE; if(pThis->iCurrOffs >= pThis->iMaxFileSize) { - dbgprintf("Stream 0x%lx: max file size %ld reached for %d, now %ld - starting new file\n", - (unsigned long) pThis, (long) pThis->iMaxFileSize, pThis->fd, (long) pThis->iCurrOffs); + dbgoprint((obj_t*) pThis, "max file size %ld reached for %d, now %ld - starting new file\n", + (long) pThis->iMaxFileSize, pThis->fd, (long) pThis->iCurrOffs); CHKiRet(strmNextFile(pThis)); } @@ -366,15 +455,14 @@ static rsRetVal strmWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf) DEFiRet; int iWritten; - assert(pThis != NULL); - assert(pBuf == pThis->pIOBuf || pThis->iBufPtr == 0); + ASSERT(pThis != NULL); + ASSERT(pBuf == pThis->pIOBuf || pThis->iBufPtr == 0); if(pThis->fd == -1) CHKiRet(strmOpenFile(pThis)); iWritten = write(pThis->fd, pBuf, lenBuf); - dbgprintf("Stream 0x%lx: file %d write wrote %d bytes, errno: %d\n", (unsigned long) pThis, - pThis->fd, iWritten, errno); + dbgoprint((obj_t*) pThis, "file %d write wrote %d bytes, errno: %d\n", pThis->fd, iWritten, errno); /* TODO: handle error case -- rgerhards, 2008-01-07 */ /* Now indicate buffer empty again. We do this in any case, because there @@ -406,8 +494,8 @@ rsRetVal strmFlush(strm_t *pThis) { DEFiRet; - assert(pThis != NULL); - dbgprintf("Stream 0x%lx: file %d flush, buflen %ld\n", (unsigned long) pThis, pThis->fd, pThis->iBufPtr); + ASSERT(pThis != NULL); + dbgoprint((obj_t*) pThis, "file %d flush, buflen %ld\n", pThis->fd, pThis->iBufPtr); if(pThis->tOperationsMode == STREAMMODE_WRITE && pThis->iBufPtr > 0) { iRet = strmWriteInternal(pThis, pThis->pIOBuf, pThis->iBufPtr); @@ -432,9 +520,8 @@ static rsRetVal strmSeek(strm_t *pThis, off_t offs) else strmFlush(pThis); int i; - dbgprintf("Stream 0x%lx: file %d seek, pos %ld\n", (unsigned long) pThis, pThis->fd, offs); + dbgoprint((obj_t*) pThis, "file %d seek, pos %ld\n", pThis->fd, offs); i = lseek(pThis->fd, offs, SEEK_SET); // TODO: check error! -dbgprintf("seek(%d, %ld): %d\n", pThis->fd, offs, i); pThis->iCurrOffs = offs; /* we are now at *this* offset */ pThis->iBufPtr = 0; /* buffer invalidated */ @@ -462,7 +549,7 @@ rsRetVal strmWriteChar(strm_t *pThis, uchar c) { DEFiRet; - assert(pThis != NULL); + ASSERT(pThis != NULL); /* if the buffer is full, we need to flush before we can write */ if(pThis->iBufPtr == pThis->sIOBufSize) { @@ -483,7 +570,7 @@ rsRetVal strmWriteLong(strm_t *pThis, long i) DEFiRet; uchar szBuf[32]; - assert(pThis != NULL); + ASSERT(pThis != NULL); CHKiRet(srUtilItoA((char*)szBuf, sizeof(szBuf), i)); CHKiRet(strmWrite(pThis, szBuf, strlen((char*)szBuf))); @@ -500,8 +587,8 @@ rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) DEFiRet; size_t iPartial; - assert(pThis != NULL); - assert(pBuf != NULL); + ASSERT(pThis != NULL); + ASSERT(pBuf != NULL); /* check if the to-be-written data is larger than our buffer size */ if(lenBuf >= pThis->sIOBufSize) { @@ -576,8 +663,8 @@ strmSetFName(strm_t *pThis, uchar *pszName, size_t iLenName) { DEFiRet; - assert(pThis != NULL); - assert(pszName != NULL); + ASSERT(pThis != NULL); + ASSERT(pszName != NULL); if(iLenName < 1) ABORT_FINALIZE(RS_RET_FILE_PREFIX_MISSING); @@ -603,8 +690,8 @@ strmSetDir(strm_t *pThis, uchar *pszDir, size_t iLenDir) { DEFiRet; - assert(pThis != NULL); - assert(pszDir != NULL); + ASSERT(pThis != NULL); + ASSERT(pszDir != NULL); if(iLenDir < 1) ABORT_FINALIZE(RS_RET_FILE_PREFIX_MISSING); @@ -644,8 +731,8 @@ finalize_it: */ rsRetVal strmRecordBegin(strm_t *pThis) { - assert(pThis != NULL); - assert(pThis->bInRecord == 0); + ASSERT(pThis != NULL); + ASSERT(pThis->bInRecord == 0); pThis->bInRecord = 1; return RS_RET_OK; } @@ -653,8 +740,8 @@ rsRetVal strmRecordBegin(strm_t *pThis) rsRetVal strmRecordEnd(strm_t *pThis) { DEFiRet; - assert(pThis != NULL); - assert(pThis->bInRecord == 1); + ASSERT(pThis != NULL); + ASSERT(pThis->bInRecord == 1); pThis->bInRecord = 0; iRet = strmCheckNextOutputFile(pThis); /* check if we need to switch files */ @@ -719,7 +806,7 @@ rsRetVal strmSetProperty(strm_t *pThis, property_t *pProp) DEFiRet; ISOBJ_TYPE_assert(pThis, strm); - assert(pProp != NULL); + ASSERT(pProp != NULL); if(isProp("sType")) { CHKiRet(strmSetsType(pThis, (strmType_t) pProp->val.vInt)); |