summaryrefslogtreecommitdiffstats
path: root/stream.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2008-01-13 15:47:41 +0000
committerRainer Gerhards <rgerhards@adiscon.com>2008-01-13 15:47:41 +0000
commitabdcea7d8f0eda058a6c6719cf24955878279d7c (patch)
treebab4f9dc61457f63e9a30516cf91eb8112fac1ef /stream.c
parent366060a51de60c717886636d6ef646bf1959972c (diff)
downloadrsyslog-abdcea7d8f0eda058a6c6719cf24955878279d7c.tar.gz
rsyslog-abdcea7d8f0eda058a6c6719cf24955878279d7c.tar.xz
rsyslog-abdcea7d8f0eda058a6c6719cf24955878279d7c.zip
support for reading back persistet queue information completed
Diffstat (limited to 'stream.c')
-rw-r--r--stream.c64
1 files changed, 56 insertions, 8 deletions
diff --git a/stream.c b/stream.c
index 59067041..eb9fb960 100644
--- a/stream.c
+++ b/stream.c
@@ -70,6 +70,7 @@ static rsRetVal strmOpenFile(strm_t *pThis)
if(pThis->fd != -1)
ABORT_FINALIZE(RS_RET_OK);
+dbgprintf("strmOpenFile actual open %p, iFileNumDigits: %d\n", pThis, pThis->iFileNumDigits);
if(pThis->pszFName == NULL)
ABORT_FINALIZE(RS_RET_FILE_PREFIX_MISSING);
@@ -85,12 +86,13 @@ static rsRetVal strmOpenFile(strm_t *pThis)
if(pThis->tOperationsMode == STREAMMODE_READ)
iFlags = O_RDONLY;
else
- iFlags = O_WRONLY | O_TRUNC | O_CREAT | O_APPEND;
+ //iFlags = O_WRONLY | O_TRUNC | O_CREAT | O_APPEND;
+ iFlags = O_WRONLY | O_CREAT;
pThis->fd = open((char*)pThis->pszCurrFName, iFlags, pThis->tOpenMode);
if(pThis->fd == -1) {
int ierrnoSave = errno;
- dbgprintf("Stream 0x%lx: open error %d\n", (unsigned long) pThis, errno);
+ dbgprintf("Stream 0x%lx: open error %d, file '%s'\n", (unsigned long) pThis, errno, pThis->pszCurrFName);
if(ierrnoSave == ENOENT)
ABORT_FINALIZE(RS_RET_FILE_NOT_FOUND);
else
@@ -185,6 +187,7 @@ rsRetVal strmReadChar(strm_t *pThis, uchar *pC)
/* DEV debug only: dbgprintf("strmRead index %d, max %d\n", pThis->iBufPtr, pThis->iBufPtrMax); */
if(pThis->iUngetC != -1) { /* do we have an "unread" char that we need to provide? */
*pC = pThis->iUngetC;
+ ++pThis->iCurrOffs; /* one more octet read */
pThis->iUngetC = -1;
ABORT_FINALIZE(RS_RET_OK);
}
@@ -220,6 +223,8 @@ rsRetVal strmReadChar(strm_t *pThis, uchar *pC)
}
*pC = pThis->pIOBuf[pThis->iBufPtr++];
+ ++pThis->iCurrOffs; /* one more octet read */
+//dbgprintf("ReadChar: read %c, offset %d\n", *pC, pThis->iCurrOffs);
finalize_it:
return iRet;
@@ -235,6 +240,10 @@ rsRetVal strmUnreadChar(strm_t *pThis, uchar c)
assert(pThis != NULL);
assert(pThis->iUngetC == -1);
pThis->iUngetC = c;
+ --pThis->iCurrOffs; /* one less octet read - NOTE: this can cause problems if we got a file change
+ and immediately do an unread and the file is on a buffer boundary and the stream is then persisted.
+ With the queue, this can not happen as an Unread is only done on record begin, which is never split
+ accross files. For other cases we accept the very remote risk. -- rgerhards, 2008-01-12 */
return RS_RET_OK;
}
@@ -402,6 +411,7 @@ finalize_it:
return iRet;
}
+
/* flush stream output buffer to persistent storage. This can be called at any time
* and is automatically called when the output buffer is full.
* rgerhards, 2008-01-10
@@ -413,7 +423,7 @@ rsRetVal strmFlush(strm_t *pThis)
assert(pThis != NULL);
dbgprintf("Stream 0x%lx: flush file %d, buflen %ld\n", (unsigned long) pThis, pThis->fd, pThis->iBufPtr);
- if(pThis->iBufPtr > 0) {
+ if(pThis->tOperationsMode == STREAMMODE_WRITE && pThis->iBufPtr > 0) {
iRet = strmWriteInternal(pThis, pThis->pIOBuf, pThis->iBufPtr);
}
@@ -421,6 +431,45 @@ rsRetVal strmFlush(strm_t *pThis)
}
+/* seek a stream to a specific location. Pending writes are flushed, read data
+ * is invalidated.
+ * rgerhards, 2008-01-12
+ */
+static rsRetVal strmSeek(strm_t *pThis, off_t offs)
+{
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, strm);
+
+ if(pThis->fd == -1)
+ strmOpenFile(pThis);
+ else
+ strmFlush(pThis);
+ int i;
+ dbgprintf("Stream 0x%lx: seek file %d, pos %ld\n", (unsigned long) pThis, pThis->fd, offs);
+ i = lseek(pThis->fd, offs, SEEK_SET); // TODO: check error!
+dbgprintf("seek(%d, %ld): %d\n", pThis->fd, offs, i);
+ pThis->iCurrOffs = offs; /* we are now at *this* offset */
+ pThis->iBufPtr = 0; /* buffer invalidated */
+
+ return iRet;
+}
+
+
+/* seek to current offset. This is primarily a helper to readjust the OS file
+ * pointer after a strm object has been deserialized.
+ */
+rsRetVal strmSeekCurrOffs(strm_t *pThis)
+{
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, strm);
+
+ iRet = strmSeek(pThis, pThis->iCurrOffs);
+ return iRet;
+}
+
+
/* write a *single* character to a stream object -- rgerhards, 2008-01-10
*/
rsRetVal strmWriteChar(strm_t *pThis, uchar c)
@@ -514,6 +563,7 @@ rsRetVal strmSetiMaxFiles(strm_t *pThis, int iNewVal)
{
pThis->iMaxFiles = iNewVal;
pThis->iFileNumDigits = getNumberDigits(iNewVal);
+dbgprintf("strmSetiMaxFiles %p val %d, digits %d\n", pThis, iNewVal, pThis->iFileNumDigits);
return RS_RET_OK;
}
@@ -633,14 +683,12 @@ rsRetVal strmSerialize(strm_t *pThis, strm_t *pStrm)
ISOBJ_TYPE_assert(pThis, strm);
ISOBJ_TYPE_assert(pStrm, strm);
-dbgprintf("strmSerialize 1\n");
+ strmFlush(pThis);
CHKiRet(objBeginSerialize(pStrm, (obj_t*) pThis));
-dbgprintf("strmSerialize 2\n");
objSerializeSCALAR(pStrm, iCurrFNum, INT);
objSerializePTR(pStrm, pszFName, PSZ);
objSerializeSCALAR(pStrm, iMaxFiles, INT);
- objSerializeSCALAR(pStrm, iFileNumDigits, INT);
objSerializeSCALAR(pStrm, bDeleteOnClose, INT);
i = pThis->sType;
@@ -662,11 +710,11 @@ dbgprintf("strmSerialize 2\n");
CHKiRet(objEndSerialize(pStrm));
finalize_it:
-dbgprintf("strmSerialize out %d\n", iRet);
return iRet;
}
+#include "stringbuf.h"
/* This function can be used as a generic way to set properties.
* rgerhards, 2008-01-11
@@ -698,7 +746,7 @@ rsRetVal strmSetProperty(strm_t *pThis, property_t *pProp)
} else if(isProp("iFileNumDigits")) {
CHKiRet(strmSetiFileNumDigits(pThis, pProp->val.vInt));
} else if(isProp("bDeleteOnClose")) {
- CHKiRet(strmSetiFileNumDigits(pThis, pProp->val.vInt));
+ CHKiRet(strmSetbDeleteOnClose(pThis, pProp->val.vInt));
}
finalize_it: