summaryrefslogtreecommitdiffstats
path: root/stream.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2008-02-13 09:39:21 +0000
committerRainer Gerhards <rgerhards@adiscon.com>2008-02-13 09:39:21 +0000
commit18bea60667f4ced5ac65e91fbd7c7a00ba22b319 (patch)
tree01513cac707247724ed93ffa9594cb3e81c77f6c /stream.c
parent7e91b2298d3e3f0cced47fa32697b2b095638489 (diff)
downloadrsyslog-18bea60667f4ced5ac65e91fbd7c7a00ba22b319.tar.gz
rsyslog-18bea60667f4ced5ac65e91fbd7c7a00ba22b319.tar.xz
rsyslog-18bea60667f4ced5ac65e91fbd7c7a00ba22b319.zip
added ability to monitor file accross rotation
Diffstat (limited to 'stream.c')
-rw-r--r--stream.c235
1 files changed, 161 insertions, 74 deletions
diff --git a/stream.c b/stream.c
index 94080358..97b679d2 100644
--- a/stream.c
+++ b/stream.c
@@ -65,8 +65,8 @@ static rsRetVal strmOpenFile(strm_t *pThis)
DEFiRet;
int iFlags;
- assert(pThis != NULL);
- assert(pThis->tOperationsMode == STREAMMODE_READ || pThis->tOperationsMode == STREAMMODE_WRITE);
+ ASSERT(pThis != NULL);
+ ASSERT(pThis->tOperationsMode == STREAMMODE_READ || pThis->tOperationsMode == STREAMMODE_WRITE);
if(pThis->fd != -1)
ABORT_FINALIZE(RS_RET_OK);
@@ -93,7 +93,7 @@ static rsRetVal strmOpenFile(strm_t *pThis)
pThis->fd = open((char*)pThis->pszCurrFName, iFlags, pThis->tOpenMode);
if(pThis->fd == -1) {
int ierrnoSave = errno;
- dbgprintf("Stream 0x%lx: open error %d, file '%s'\n", (unsigned long) pThis, errno, pThis->pszCurrFName);
+ dbgoprint((obj_t*) pThis, "open error %d, file '%s'\n", errno, pThis->pszCurrFName);
if(ierrnoSave == ENOENT)
ABORT_FINALIZE(RS_RET_FILE_NOT_FOUND);
else
@@ -102,9 +102,8 @@ static rsRetVal strmOpenFile(strm_t *pThis)
pThis->iCurrOffs = 0;
- dbgprintf("Stream 0x%lx: opened file '%s' for %s (0x%x) as %d\n", (unsigned long) pThis,
- pThis->pszCurrFName, (pThis->tOperationsMode == STREAMMODE_READ) ? "READ" : "WRITE",
- iFlags, pThis->fd);
+ dbgoprint((obj_t*) pThis, "opened file '%s' for %s (0x%x) as %d\n", pThis->pszCurrFName,
+ (pThis->tOperationsMode == STREAMMODE_READ) ? "READ" : "WRITE", iFlags, pThis->fd);
finalize_it:
RETiRet;
@@ -119,9 +118,9 @@ static rsRetVal strmCloseFile(strm_t *pThis)
{
DEFiRet;
- assert(pThis != NULL);
- assert(pThis->fd != -1);
- dbgprintf("Stream 0x%lx: file %d closing\n", (unsigned long) pThis, pThis->fd);
+ ASSERT(pThis != NULL);
+ ASSERT(pThis->fd != -1);
+ dbgoprint((obj_t*) pThis, "file %d closing\n", pThis->fd);
if(pThis->tOperationsMode == STREAMMODE_WRITE)
strmFlush(pThis);
@@ -150,13 +149,13 @@ strmNextFile(strm_t *pThis)
{
DEFiRet;
- assert(pThis != NULL);
- assert(pThis->iMaxFiles != 0);
- assert(pThis->fd != -1);
+ ASSERT(pThis != NULL);
+ ASSERT(pThis->iMaxFiles != 0);
+ ASSERT(pThis->fd != -1);
CHKiRet(strmCloseFile(pThis));
- /* we do modulo operation to ensure we obej the iMaxFile property. This will always
+ /* we do modulo operation to ensure we obey the iMaxFile property. This will always
* result in a file number lower than iMaxFile, so it if wraps, the name is back to
* 0, which results in the first file being overwritten. Not desired for queues, so
* make sure their iMaxFiles is large enough. But it is well-desired for other
@@ -169,6 +168,122 @@ finalize_it:
}
+/* handle the eof case for monitored files.
+ * If we are monitoring a file, someone may have rotated it. In this case, we
+ * also need to close it and reopen it under the same name.
+ * rgerhards, 2008-02-13
+ */
+static rsRetVal
+strmHandleEOFMonitor(strm_t *pThis)
+{
+ DEFiRet;
+ struct stat statOpen;
+ struct stat statName;
+
+ ISOBJ_TYPE_assert(pThis, strm);
+ /* find inodes of both current descriptor as well as file now in file
+ * system. If they are different, the file has been rotated (or
+ * otherwise rewritten). We also check the size, because the inode
+ * does not change if the file is truncated (this, BTW, is also a case
+ * where we actually loose log lines, because we can not do anything
+ * against truncation...). We do NOT rely on the time of last
+ * modificaton because that may not be available under all
+ * circumstances. -- rgerhards, 2008-02-13
+ */
+ if(fstat(pThis->fd, &statOpen) == -1)
+ ABORT_FINALIZE(RS_RET_IO_ERROR);
+ if(stat((char*) pThis->pszCurrFName, &statName) == -1)
+ ABORT_FINALIZE(RS_RET_IO_ERROR);
+dbgoprint((obj_t*)pThis, "curr ino %d, new ino %d, curr offset %lld, new size %ld\n", statOpen.st_ino, statName.st_ino, pThis->iCurrOffs, statName.st_size);
+ if(statOpen.st_ino == statName.st_ino && pThis->iCurrOffs == statName.st_size) {
+RUNLOG_STR("EOF");
+ ABORT_FINALIZE(RS_RET_EOF);
+ } else {
+RUNLOG_STR("file change");
+ /* we had a file change! */
+ CHKiRet(strmCloseFile(pThis));
+ CHKiRet(strmOpenFile(pThis));
+ }
+
+finalize_it:
+ RETiRet;
+}
+
+
+/* handle the EOF case of a stream
+ * The EOF case is somewhat complicated, as the proper action depends on the
+ * mode the stream is in. If there are multiple files (circular logs, most
+ * important use case is queue files!), we need to close the current file and
+ * try to open the next one.
+ * rgerhards, 2008-02-13
+ */
+static rsRetVal
+strmHandleEOF(strm_t *pThis)
+{
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, strm);
+ switch(pThis->sType) {
+ case STREAMTYPE_FILE_SINGLE:
+ ABORT_FINALIZE(RS_RET_EOF);
+ break;
+ case STREAMTYPE_FILE_CIRCULAR:
+ /* we have multiple files and need to switch to the next one */
+ /* TODO: think about emulating EOF in this case (not yet needed) */
+#if 0
+ if(pThis->iMaxFiles == 0) /* TODO: why do we need this? ;) */
+ ABORT_FINALIZE(RS_RET_EOF);
+#endif
+ dbgoprint((obj_t*) pThis, "file %d EOF\n", pThis->fd);
+ CHKiRet(strmNextFile(pThis));
+ break;
+ case STREAMTYPE_FILE_MONITOR:
+ CHKiRet(strmHandleEOFMonitor(pThis));
+ break;
+ }
+
+finalize_it:
+ RETiRet;
+}
+
+/* read the next buffer from disk
+ * rgerhards, 2008-02-13
+ */
+static rsRetVal
+strmReadBuf(strm_t *pThis)
+{
+ DEFiRet;
+ int bRun;
+ long iLenRead;
+
+ ISOBJ_TYPE_assert(pThis, strm);
+ /* We need to try read at least twice because we may run into EOF and need to switch files. */
+ bRun = 1;
+ while(bRun) {
+ /* first check if we need to (re)open the file. We may have switched to a new one in
+ * circular mode or it may have been rewritten (rotated) if we monitor a file
+ * rgerhards, 2008-02-13
+ */
+ CHKiRet(strmOpenFile(pThis));
+ iLenRead = read(pThis->fd, pThis->pIOBuf, pThis->sIOBufSize);
+ dbgoprint((obj_t*) pThis, "file %d read %ld bytes\n", pThis->fd, iLenRead);
+ if(iLenRead == 0) {
+ CHKiRet(strmHandleEOF(pThis));
+ } else if(iLenRead < 0)
+ ABORT_FINALIZE(RS_RET_IO_ERROR);
+ else { /* good read */
+ pThis->iBufPtrMax = iLenRead;
+ bRun = 0; /* exit loop */
+ }
+ }
+ /* if we reach this point, we had a good read */
+ pThis->iBufPtr = 0;
+
+finalize_it:
+ RETiRet;
+}
+
+
/* logically "read" a character from a file. What actually happens is that
* data is taken from the buffer. Only if the buffer is full, data is read
* directly from file. In that case, a read is performed blockwise.
@@ -179,13 +294,11 @@ finalize_it:
rsRetVal strmReadChar(strm_t *pThis, uchar *pC)
{
DEFiRet;
- int bRun;
- long iLenRead;
- assert(pThis != NULL);
- assert(pC != NULL);
+ ASSERT(pThis != NULL);
+ ASSERT(pC != NULL);
- /* DEV debug only: dbgprintf("strmRead index %d, max %d\n", pThis->iBufPtr, pThis->iBufPtrMax); */
+ /* DEV debug only: dbgoprint((obj_t*) pThis, "strmRead index %d, max %d\n", pThis->iBufPtr, pThis->iBufPtrMax); */
if(pThis->iUngetC != -1) { /* do we have an "unread" char that we need to provide? */
*pC = pThis->iUngetC;
++pThis->iCurrOffs; /* one more octet read */
@@ -193,39 +306,15 @@ rsRetVal strmReadChar(strm_t *pThis, uchar *pC)
ABORT_FINALIZE(RS_RET_OK);
}
- /* do we need to obtain a new buffer */
+ /* do we need to obtain a new buffer? */
if(pThis->iBufPtr >= pThis->iBufPtrMax) {
- /* We need to try read at least twice because we may run into EOF and need to switch files. */
- bRun = 1;
- while(bRun) {
- /* first check if we need to (re)open the file (we may have switched to a new one!) */
- CHKiRet(strmOpenFile(pThis));
- iLenRead = read(pThis->fd, pThis->pIOBuf, pThis->sIOBufSize);
- dbgprintf("Stream 0x%lx: file %d read %ld bytes\n", (unsigned long) pThis,
- pThis->fd, iLenRead);
- if(iLenRead == 0) {
- if(pThis->iMaxFiles == 0)
- ABORT_FINALIZE(RS_RET_EOF);
- else {
- /* we have multiple files and need to switch to the next one */
- /* TODO: think about emulating EOF in this case (not yet needed) */
- dbgprintf("Stream 0x%lx: file %d EOF\n", (unsigned long) pThis, pThis->fd);
- CHKiRet(strmNextFile(pThis));
- }
- } else if(iLenRead < 0)
- ABORT_FINALIZE(RS_RET_IO_ERROR);
- else { /* good read */
- pThis->iBufPtrMax = iLenRead;
- bRun = 0; /* exit loop */
- }
- }
- /* if we reach this point, we had a good read */
- pThis->iBufPtr = 0;
+ CHKiRet(strmReadBuf(pThis));
}
+ /* if we reach this point, we have data available in the buffer */
+
*pC = pThis->pIOBuf[pThis->iBufPtr++];
++pThis->iCurrOffs; /* one more octet read */
-//dbgprintf("ReadChar: read %c, offset %d\n", *pC, pThis->iCurrOffs);
finalize_it:
RETiRet;
@@ -238,8 +327,8 @@ finalize_it:
*/
rsRetVal strmUnreadChar(strm_t *pThis, uchar c)
{
- assert(pThis != NULL);
- assert(pThis->iUngetC == -1);
+ ASSERT(pThis != NULL);
+ ASSERT(pThis->iUngetC == -1);
pThis->iUngetC = c;
--pThis->iCurrOffs; /* one less octet read - NOTE: this can cause problems if we got a file change
and immediately do an unread and the file is on a buffer boundary and the stream is then persisted.
@@ -262,8 +351,8 @@ strmReadLine(strm_t *pThis, rsCStrObj **ppCStr)
uchar c;
rsCStrObj *pCStr = NULL;
- assert(pThis != NULL);
- assert(ppCStr != NULL);
+ ASSERT(pThis != NULL);
+ ASSERT(ppCStr != NULL);
if((pCStr = rsCStrConstruct()) == NULL)
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
@@ -304,7 +393,7 @@ rsRetVal strmConstructFinalize(strm_t *pThis)
{
DEFiRet;
- assert(pThis != NULL);
+ ASSERT(pThis != NULL);
if(pThis->pIOBuf == NULL) { /* allocate our io buffer in case we have not yet */
if((pThis->pIOBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize)) == NULL)
@@ -345,8 +434,8 @@ static rsRetVal strmCheckNextOutputFile(strm_t *pThis)
FINALIZE;
if(pThis->iCurrOffs >= pThis->iMaxFileSize) {
- dbgprintf("Stream 0x%lx: max file size %ld reached for %d, now %ld - starting new file\n",
- (unsigned long) pThis, (long) pThis->iMaxFileSize, pThis->fd, (long) pThis->iCurrOffs);
+ dbgoprint((obj_t*) pThis, "max file size %ld reached for %d, now %ld - starting new file\n",
+ (long) pThis->iMaxFileSize, pThis->fd, (long) pThis->iCurrOffs);
CHKiRet(strmNextFile(pThis));
}
@@ -366,15 +455,14 @@ static rsRetVal strmWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf)
DEFiRet;
int iWritten;
- assert(pThis != NULL);
- assert(pBuf == pThis->pIOBuf || pThis->iBufPtr == 0);
+ ASSERT(pThis != NULL);
+ ASSERT(pBuf == pThis->pIOBuf || pThis->iBufPtr == 0);
if(pThis->fd == -1)
CHKiRet(strmOpenFile(pThis));
iWritten = write(pThis->fd, pBuf, lenBuf);
- dbgprintf("Stream 0x%lx: file %d write wrote %d bytes, errno: %d\n", (unsigned long) pThis,
- pThis->fd, iWritten, errno);
+ dbgoprint((obj_t*) pThis, "file %d write wrote %d bytes, errno: %d\n", pThis->fd, iWritten, errno);
/* TODO: handle error case -- rgerhards, 2008-01-07 */
/* Now indicate buffer empty again. We do this in any case, because there
@@ -406,8 +494,8 @@ rsRetVal strmFlush(strm_t *pThis)
{
DEFiRet;
- assert(pThis != NULL);
- dbgprintf("Stream 0x%lx: file %d flush, buflen %ld\n", (unsigned long) pThis, pThis->fd, pThis->iBufPtr);
+ ASSERT(pThis != NULL);
+ dbgoprint((obj_t*) pThis, "file %d flush, buflen %ld\n", pThis->fd, pThis->iBufPtr);
if(pThis->tOperationsMode == STREAMMODE_WRITE && pThis->iBufPtr > 0) {
iRet = strmWriteInternal(pThis, pThis->pIOBuf, pThis->iBufPtr);
@@ -432,9 +520,8 @@ static rsRetVal strmSeek(strm_t *pThis, off_t offs)
else
strmFlush(pThis);
int i;
- dbgprintf("Stream 0x%lx: file %d seek, pos %ld\n", (unsigned long) pThis, pThis->fd, offs);
+ dbgoprint((obj_t*) pThis, "file %d seek, pos %ld\n", pThis->fd, offs);
i = lseek(pThis->fd, offs, SEEK_SET); // TODO: check error!
-dbgprintf("seek(%d, %ld): %d\n", pThis->fd, offs, i);
pThis->iCurrOffs = offs; /* we are now at *this* offset */
pThis->iBufPtr = 0; /* buffer invalidated */
@@ -462,7 +549,7 @@ rsRetVal strmWriteChar(strm_t *pThis, uchar c)
{
DEFiRet;
- assert(pThis != NULL);
+ ASSERT(pThis != NULL);
/* if the buffer is full, we need to flush before we can write */
if(pThis->iBufPtr == pThis->sIOBufSize) {
@@ -483,7 +570,7 @@ rsRetVal strmWriteLong(strm_t *pThis, long i)
DEFiRet;
uchar szBuf[32];
- assert(pThis != NULL);
+ ASSERT(pThis != NULL);
CHKiRet(srUtilItoA((char*)szBuf, sizeof(szBuf), i));
CHKiRet(strmWrite(pThis, szBuf, strlen((char*)szBuf)));
@@ -500,8 +587,8 @@ rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
DEFiRet;
size_t iPartial;
- assert(pThis != NULL);
- assert(pBuf != NULL);
+ ASSERT(pThis != NULL);
+ ASSERT(pBuf != NULL);
/* check if the to-be-written data is larger than our buffer size */
if(lenBuf >= pThis->sIOBufSize) {
@@ -576,8 +663,8 @@ strmSetFName(strm_t *pThis, uchar *pszName, size_t iLenName)
{
DEFiRet;
- assert(pThis != NULL);
- assert(pszName != NULL);
+ ASSERT(pThis != NULL);
+ ASSERT(pszName != NULL);
if(iLenName < 1)
ABORT_FINALIZE(RS_RET_FILE_PREFIX_MISSING);
@@ -603,8 +690,8 @@ strmSetDir(strm_t *pThis, uchar *pszDir, size_t iLenDir)
{
DEFiRet;
- assert(pThis != NULL);
- assert(pszDir != NULL);
+ ASSERT(pThis != NULL);
+ ASSERT(pszDir != NULL);
if(iLenDir < 1)
ABORT_FINALIZE(RS_RET_FILE_PREFIX_MISSING);
@@ -644,8 +731,8 @@ finalize_it:
*/
rsRetVal strmRecordBegin(strm_t *pThis)
{
- assert(pThis != NULL);
- assert(pThis->bInRecord == 0);
+ ASSERT(pThis != NULL);
+ ASSERT(pThis->bInRecord == 0);
pThis->bInRecord = 1;
return RS_RET_OK;
}
@@ -653,8 +740,8 @@ rsRetVal strmRecordBegin(strm_t *pThis)
rsRetVal strmRecordEnd(strm_t *pThis)
{
DEFiRet;
- assert(pThis != NULL);
- assert(pThis->bInRecord == 1);
+ ASSERT(pThis != NULL);
+ ASSERT(pThis->bInRecord == 1);
pThis->bInRecord = 0;
iRet = strmCheckNextOutputFile(pThis); /* check if we need to switch files */
@@ -719,7 +806,7 @@ rsRetVal strmSetProperty(strm_t *pThis, property_t *pProp)
DEFiRet;
ISOBJ_TYPE_assert(pThis, strm);
- assert(pProp != NULL);
+ ASSERT(pProp != NULL);
if(isProp("sType")) {
CHKiRet(strmSetsType(pThis, (strmType_t) pProp->val.vInt));