summaryrefslogtreecommitdiffstats
path: root/runtime/stream.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2010-03-29 11:07:15 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2010-03-29 11:07:15 +0200
commit8b9cef552de558ecfd130451c4dea22676184d38 (patch)
treed3b40f3bfc34f32bffbc08668dddd6c4e1a724d3 /runtime/stream.c
parentde7726cbf0957384cc9261ac47d6bf65906739b5 (diff)
parentb6ce75cb6ce65a468f9551d98a641b407a4f2630 (diff)
downloadrsyslog-8b9cef552de558ecfd130451c4dea22676184d38.tar.gz
rsyslog-8b9cef552de558ecfd130451c4dea22676184d38.tar.xz
rsyslog-8b9cef552de558ecfd130451c4dea22676184d38.zip
Merge branch 'v4-stable' into v5-stable
Conflicts: ChangeLog configure.ac doc/manual.html runtime/debug.c runtime/stream.c tests/Makefile.am tests/diskqueue.sh tests/nettester.c tools/omfile.c
Diffstat (limited to 'runtime/stream.c')
-rw-r--r--runtime/stream.c147
1 files changed, 100 insertions, 47 deletions
diff --git a/runtime/stream.c b/runtime/stream.c
index 81f8e89b..0415c25c 100644
--- a/runtime/stream.c
+++ b/runtime/stream.c
@@ -67,7 +67,7 @@ DEFobjStaticHelpers
DEFobjCurrIf(zlibw)
/* forward definitions */
-static rsRetVal strmFlush(strm_t *pThis);
+static rsRetVal strmFlushInternal(strm_t *pThis);
static rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf);
static rsRetVal strmCloseFile(strm_t *pThis);
static void *asyncWriterThread(void *pPtr);
@@ -163,7 +163,7 @@ doSizeLimitProcessing(strm_t *pThis)
ASSERT(pThis->fd != -1);
if(pThis->iCurrOffs >= pThis->iSizeLimit) {
- /* strmClosefile() destroys the current file name, so we
+ /* strmCloseFile() destroys the current file name, so we
* need to preserve it.
*/
CHKmalloc(pszCurrFName = ustrdup(pThis->pszCurrFName));
@@ -220,7 +220,7 @@ doPhysOpen(strm_t *pThis)
char errStr[1024];
int err = errno;
rs_strerror_r(err, errStr, sizeof(errStr));
- dbgoprint((obj_t*) pThis, "open error %d, file '%s': %s\n", errno, pThis->pszCurrFName, errStr);
+ DBGOPRINT((obj_t*) pThis, "open error %d, file '%s': %s\n", errno, pThis->pszCurrFName, errStr);
if(err == ENOENT)
ABORT_FINALIZE(RS_RET_FILE_NOT_FOUND);
else
@@ -278,7 +278,7 @@ static rsRetVal strmOpenFile(strm_t *pThis)
pThis->iCurrOffs = offset;
}
- dbgoprint((obj_t*) pThis, "opened file '%s' for %s as %d\n", pThis->pszCurrFName,
+ DBGOPRINT((obj_t*) pThis, "opened file '%s' for %s as %d\n", pThis->pszCurrFName,
(pThis->tOperationsMode == STREAMMODE_READ) ? "READ" : "WRITE", pThis->fd);
finalize_it:
@@ -296,8 +296,10 @@ strmWaitAsyncWriterDone(strm_t *pThis)
BEGINfunc
if(pThis->bAsyncWrite) {
/* awake writer thread and make it write out everything */
- pthread_cond_signal(&pThis->notEmpty);
- d_pthread_cond_wait(&pThis->isEmpty, &pThis->mut);
+ while(pThis->iCnt > 0) {
+ pthread_cond_signal(&pThis->notEmpty);
+ d_pthread_cond_wait(&pThis->isEmpty, &pThis->mut);
+ }
}
ENDfunc
}
@@ -306,27 +308,33 @@ strmWaitAsyncWriterDone(strm_t *pThis)
/* close a strm file
* Note that the bDeleteOnClose flag is honored. If it is set, the file will be
* deleted after close. This is in support for the qRead thread.
+ * Note: it is valid to call this function when the physical file is closed. If so,
+ * strmCloseFile() will still check if there is any unwritten data inside buffers
+ * (this may be the case) and, if so, will open the file, write the data, and then
+ * close it again (this is done via strmFlushInternal and friends).
*/
static rsRetVal strmCloseFile(strm_t *pThis)
{
DEFiRet;
ASSERT(pThis != NULL);
- ASSERT(pThis->fd != -1);
- dbgoprint((obj_t*) pThis, "file %d closing\n", pThis->fd);
+ DBGOPRINT((obj_t*) pThis, "file %d(%s) closing\n", pThis->fd,
+ (pThis->pszFName == NULL) ? "N/A" : (char*)pThis->pszFName);
- if(!pThis->bInClose && pThis->tOperationsMode != STREAMMODE_READ) {
- pThis->bInClose = 1;
+ if(pThis->tOperationsMode != STREAMMODE_READ) {
+ strmFlushInternal(pThis);
if(pThis->bAsyncWrite) {
- strmFlush(pThis);
- } else {
strmWaitAsyncWriterDone(pThis);
}
- pThis->bInClose = 0;
}
- close(pThis->fd);
- pThis->fd = -1;
+ /* the file may already be closed (or never have opened), so guard
+ * against this. -- rgerhards, 2010-03-19
+ */
+ if(pThis->fd != -1) {
+ close(pThis->fd);
+ pThis->fd = -1;
+ }
if(pThis->fdDir != -1) {
/* close associated directory handle, if it is open */
@@ -441,7 +449,7 @@ strmHandleEOF(strm_t *pThis)
case STREAMTYPE_FILE_CIRCULAR:
/* we have multiple files and need to switch to the next one */
/* TODO: think about emulating EOF in this case (not yet needed) */
- dbgoprint((obj_t*) pThis, "file %d EOF\n", pThis->fd);
+ DBGOPRINT((obj_t*) pThis, "file %d EOF\n", pThis->fd);
CHKiRet(strmNextFile(pThis));
break;
case STREAMTYPE_FILE_MONITOR:
@@ -473,7 +481,7 @@ strmReadBuf(strm_t *pThis)
*/
CHKiRet(strmOpenFile(pThis));
iLenRead = read(pThis->fd, pThis->pIOBuf, pThis->sIOBufSize);
- dbgoprint((obj_t*) pThis, "file %d read %ld bytes\n", pThis->fd, iLenRead);
+ DBGOPRINT((obj_t*) pThis, "file %d read %ld bytes\n", pThis->fd, iLenRead);
if(iLenRead == 0) {
CHKiRet(strmHandleEOF(pThis));
} else if(iLenRead < 0)
@@ -505,7 +513,7 @@ static rsRetVal strmReadChar(strm_t *pThis, uchar *pC)
ASSERT(pThis != NULL);
ASSERT(pC != NULL);
- /* DEV debug only: dbgoprint((obj_t*) pThis, "strmRead index %d, max %d\n", pThis->iBufPtr, pThis->iBufPtrMax); */
+ /* DEV debug only: DBGOPRINT((obj_t*) pThis, "strmRead index %d, max %d\n", pThis->iBufPtr, pThis->iBufPtrMax); */
if(pThis->iUngetC != -1) { /* do we have an "unread" char that we need to provide? */
*pC = pThis->iUngetC;
++pThis->iCurrOffs; /* one more octet read */
@@ -617,11 +625,11 @@ static rsRetVal strmConstructFinalize(strm_t *pThis)
* to make sure we can write out everything with a SINGLE api call!
* We add another 128 bytes to take care of the gzip header and "all eventualities".
*/
- CHKmalloc(pThis->pZipBuf = (Bytef*) MALLOC(sizeof(uchar) * pThis->sIOBufSize + 128));
+ CHKmalloc(pThis->pZipBuf = (Bytef*) MALLOC(sizeof(uchar) * (pThis->sIOBufSize + 128)));
}
}
- /* if we are aset to sync, we must obtain a file handle to the directory for fsync() purposes */
+ /* if we are set to sync, we must obtain a file handle to the directory for fsync() purposes */
if(pThis->bSync && !pThis->bIsTTY) {
pThis->fdDir = open((char*)pThis->pszDir, O_RDONLY | O_CLOEXEC | O_NOCTTY);
if(pThis->fdDir == -1) {
@@ -633,6 +641,9 @@ static rsRetVal strmConstructFinalize(strm_t *pThis)
}
}
+ DBGPRINTF("file stream %s params: flush interval %d, async write %d\n",
+ (pThis->pszFName == NULL) ? "N/A" : (char*)pThis->pszFName,
+ pThis->iFlushInterval, pThis->bAsyncWrite);
/* if we have a flush interval, we need to do async writes in any case */
if(pThis->iFlushInterval != 0) {
pThis->bAsyncWrite = 1;
@@ -685,8 +696,10 @@ CODESTARTobjDestruct(strm)
/* Note: mutex will be unlocked in stopWriter! */
d_pthread_mutex_lock(&pThis->mut);
- if(pThis->tOperationsMode != STREAMMODE_READ)
- strmFlush(pThis);
+ /* strmClose() will handle read-only files as well as need to open
+ * files that have unwritten buffers. -- rgerhards, 2010-03-09
+ */
+ strmCloseFile(pThis);
if(pThis->bAsyncWrite) {
stopWriter(pThis);
@@ -705,14 +718,11 @@ CODESTARTobjDestruct(strm)
* IMPORTANT: we MUST free this only AFTER the ansyncWriter has been stopped, else
* we get random errors...
*/
- if(pThis->fd != -1)
- strmCloseFile(pThis);
-
free(pThis->pszDir);
free(pThis->pZipBuf);
free(pThis->pszCurrFName);
free(pThis->pszFName);
-
+ pThis->bStopWriter = 2; /* RG: use as flag for destruction */
ENDobjDestruct(strm)
@@ -732,7 +742,7 @@ static rsRetVal strmCheckNextOutputFile(strm_t *pThis)
strmWaitAsyncWriterDone(pThis);
if(pThis->iCurrOffs >= pThis->iMaxFileSize) {
- dbgoprint((obj_t*) pThis, "max file size %ld reached for %d, now %ld - starting new file\n",
+ DBGOPRINT((obj_t*) pThis, "max file size %ld reached for %d, now %ld - starting new file\n",
(long) pThis->iMaxFileSize, pThis->fd, (long) pThis->iCurrOffs);
CHKiRet(strmNextFile(pThis));
}
@@ -790,6 +800,7 @@ doWriteCall(strm_t *pThis, uchar *pBuf, size_t *pLenBuf)
if(iWritten < 0) {
char errStr[1024];
int err = errno;
+ iWritten = 0; /* we have written NO bytes! */
rs_strerror_r(err, errStr, sizeof(errStr));
DBGPRINTF("log file (%d) write error %d: %s\n", pThis->fd, err, errStr);
if(err == EINTR) {
@@ -811,7 +822,7 @@ doWriteCall(strm_t *pThis, uchar *pBuf, size_t *pLenBuf)
pWriteBuf += iWritten;
} while(lenBuf > 0); /* Warning: do..while()! */
- dbgoprint((obj_t*) pThis, "file %d write wrote %d bytes\n", pThis->fd, (int) iWritten);
+ DBGOPRINT((obj_t*) pThis, "file %d write wrote %d bytes\n", pThis->fd, (int) iWritten);
finalize_it:
*pLenBuf = iTotalWritten;
@@ -855,7 +866,8 @@ doAsyncWriteInternal(strm_t *pThis, size_t lenBuf)
DEFiRet;
ISOBJ_TYPE_assert(pThis, strm);
- while(pThis->iCnt >= STREAM_ASYNC_NUMBUFS)
+ /* the -1 below is important, because we need one buffer for the main thread! */
+ while(pThis->iCnt >= STREAM_ASYNC_NUMBUFS - 1)
d_pthread_cond_wait(&pThis->notFull, &pThis->mut);
pThis->asyncBuf[pThis->iEnq % STREAM_ASYNC_NUMBUFS].lenBuf = lenBuf;
@@ -880,13 +892,22 @@ strmSchedWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
ASSERT(pThis != NULL);
+ /* we need to reset the buffer pointer BEFORE calling the actual write
+ * function. Otherwise, in circular mode, the write function will
+ * potentially close the file, then close will flush and as the
+ * buffer pointer is nonzero, will re-call into this code here. In
+ * the end result, we than have a problem (and things are screwed
+ * up). So we reset the buffer pointer first, and all this can
+ * not happen. It is safe to do so, because that pointer is NOT
+ * used inside the write functions. -- rgerhads, 2010-03-10
+ */
+ pThis->iBufPtr = 0; /* we are at the begin of a new buffer */
if(pThis->bAsyncWrite) {
CHKiRet(doAsyncWriteInternal(pThis, lenBuf));
} else {
CHKiRet(doWriteInternal(pThis, pBuf, lenBuf));
}
- pThis->iBufPtr = 0; /* we are at the begin of a new buffer */
finalize_it:
RETiRet;
@@ -911,10 +932,11 @@ asyncWriterThread(void *pPtr)
if(prctl(PR_SET_NAME, "rs:asyn strmwr", 0, 0, 0) != 0) {
DBGPRINTF("prctl failed, not setting thread name for '%s'\n", "stream writer");
}
-#endif
+# endif
while(1) { /* loop broken inside */
d_pthread_mutex_lock(&pThis->mut);
+dbgprintf("XXX: asyncWriterThread iterating %s\n", pThis->pszFName);
while(pThis->iCnt == 0) {
if(pThis->bStopWriter) {
pthread_cond_broadcast(&pThis->isEmpty);
@@ -923,13 +945,14 @@ asyncWriterThread(void *pPtr)
}
if(bTimedOut && pThis->iBufPtr > 0) {
/* if we timed out, we need to flush pending data */
- strmFlush(pThis);
+ strmFlushInternal(pThis);
bTimedOut = 0;
continue; /* now we should have data */
}
bTimedOut = 0;
- timeoutComp(&t, pThis->iFlushInterval * 2000); /* *1000 millisconds */
+ timeoutComp(&t, pThis->iFlushInterval * 2000); /* *1000 millisconds */ // TODO: check the 2000?!?
if(pThis->bDoTimedWait) {
+dbgprintf("asyncWriter thread going to timeout sleep\n");
if(pthread_cond_timedwait(&pThis->notEmpty, &pThis->mut, &t) != 0) {
int err = errno;
if(err == ETIMEDOUT) {
@@ -943,13 +966,16 @@ asyncWriterThread(void *pPtr)
}
}
} else {
+dbgprintf("asyncWriter thread going to eternal sleep\n");
d_pthread_cond_wait(&pThis->notEmpty, &pThis->mut);
}
+dbgprintf("asyncWriter woke up\n");
}
bTimedOut = 0; /* we may have timed out, but there *is* work to do... */
iDeq = pThis->iDeq++ % STREAM_ASYNC_NUMBUFS;
+dbgprintf("asyncWriter writes data\n");
doWriteInternal(pThis, pThis->asyncBuf[iDeq].pBuf, pThis->asyncBuf[iDeq].lenBuf);
// TODO: error check????? 2009-07-06
@@ -1058,10 +1084,6 @@ finalize_it:
* add a config switch so that the user can decide the risk he is ready
* to take, but so far this is not yet implemented (not even requested ;)).
* rgerhards, 2009-06-04
- * For the time being, we take a very conservative approach and do not run this
- * method multithreaded. This is done in an effort to solve a segfault condition
- * that seems to be related to the zip code. -- rgerhards, 2009-09-22
- * TODO: make multithreaded again!
*/
static rsRetVal
doZipWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
@@ -1120,12 +1142,14 @@ finalize_it:
* rgerhards, 2008-01-10
*/
static rsRetVal
-strmFlush(strm_t *pThis)
+strmFlushInternal(strm_t *pThis)
{
DEFiRet;
ASSERT(pThis != NULL);
- dbgoprint((obj_t*) pThis, "file %d flush, buflen %ld\n", pThis->fd, (long) pThis->iBufPtr);
+ DBGOPRINT((obj_t*) pThis, "file %d(%s) flush, buflen %ld%s\n", pThis->fd,
+ (pThis->pszFName == NULL) ? "N/A" : (char*)pThis->pszFName,
+ (long) pThis->iBufPtr, (pThis->iBufPtr == 0) ? " (no need to flush)" : "");
if(pThis->tOperationsMode != STREAMMODE_READ && pThis->iBufPtr > 0) {
iRet = strmSchedWrite(pThis, pThis->pIOBuf, pThis->iBufPtr);
@@ -1135,6 +1159,31 @@ strmFlush(strm_t *pThis)
}
+/* flush stream output buffer to persistent storage. This can be called at any time
+ * and is automatically called when the output buffer is full. This function is for
+ * use by EXTERNAL callers. Do NOT use it internally. It locks the async writer
+ * mutex if ther is need to do so.
+ * rgerhards, 2010-03-18
+ */
+static rsRetVal
+strmFlush(strm_t *pThis)
+{
+ DEFiRet;
+
+ ASSERT(pThis != NULL);
+
+ if(pThis->bAsyncWrite)
+ d_pthread_mutex_lock(&pThis->mut);
+ CHKiRet(strmFlushInternal(pThis));
+
+finalize_it:
+ if(pThis->bAsyncWrite)
+ d_pthread_mutex_unlock(&pThis->mut);
+
+ RETiRet;
+}
+
+
/* seek a stream to a specific location. Pending writes are flushed, read data
* is invalidated.
* rgerhards, 2008-01-12
@@ -1148,9 +1197,9 @@ static rsRetVal strmSeek(strm_t *pThis, off_t offs)
if(pThis->fd == -1)
strmOpenFile(pThis);
else
- strmFlush(pThis);
+ strmFlushInternal(pThis);
int i;
- dbgoprint((obj_t*) pThis, "file %d seek, pos %ld\n", pThis->fd, (long) offs);
+ DBGOPRINT((obj_t*) pThis, "file %d seek, pos %ld\n", pThis->fd, (long) offs);
i = lseek(pThis->fd, offs, SEEK_SET); // TODO: check error!
pThis->iCurrOffs = offs; /* we are now at *this* offset */
pThis->iBufPtr = 0; /* buffer invalidated */
@@ -1189,7 +1238,7 @@ static rsRetVal strmWriteChar(strm_t *pThis, uchar c)
/* if the buffer is full, we need to flush before we can write */
if(pThis->iBufPtr == pThis->sIOBufSize) {
- CHKiRet(strmFlush(pThis));
+ CHKiRet(strmFlushInternal(pThis));
}
/* we now always have space for one character, so we simply copy it */
*(pThis->pIOBuf + pThis->iBufPtr) = c;
@@ -1233,6 +1282,11 @@ finalize_it:
* caller-provided buffer is larger than our one. So instead of optimizing a case
* which normally does not exist, we expect some degradation in its case but make us
* perform better in the regular cases. -- rgerhards, 2009-07-07
+ * Note: the pThis->iBufPtr == pThis->sIOBufSize logic below looks a bit like an
+ * on-off error. In fact, it is not, because iBufPtr always points to the next
+ * *free* byte in the buffer. So if it is sIOBufSize - 1, there actually is one
+ * free byte left. This came up during a code walkthrough and was considered
+ * worth nothing. -- rgerhards, 2010-03-10
*/
static rsRetVal
strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
@@ -1254,7 +1308,7 @@ strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
iOffset = 0;
do {
if(pThis->iBufPtr == pThis->sIOBufSize) {
- CHKiRet(strmFlush(pThis)); /* get a new buffer for rest of data */
+ CHKiRet(strmFlushInternal(pThis)); /* get a new buffer for rest of data */
}
iWrite = pThis->sIOBufSize - pThis->iBufPtr; /* this fits in current buf */
if(iWrite > lenBuf)
@@ -1269,7 +1323,7 @@ strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
* write it. This seems more natural than waiting (hours?) for the next message...
*/
if(pThis->iBufPtr == pThis->sIOBufSize) {
- CHKiRet(strmFlush(pThis)); /* get a new buffer for rest of data */
+ CHKiRet(strmFlushInternal(pThis)); /* get a new buffer for rest of data */
}
finalize_it:
@@ -1357,8 +1411,7 @@ strmSetDir(strm_t *pThis, uchar *pszDir, size_t iLenDir)
if(iLenDir < 1)
ABORT_FINALIZE(RS_RET_FILE_PREFIX_MISSING);
- if((pThis->pszDir = MALLOC(sizeof(uchar) * iLenDir + 1)) == NULL)
- ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
+ CHKmalloc(pThis->pszDir = MALLOC(sizeof(uchar) * (iLenDir + 1)));
memcpy(pThis->pszDir, pszDir, iLenDir + 1); /* always think about the \0! */
pThis->lenDir = iLenDir;
@@ -1429,7 +1482,7 @@ static rsRetVal strmSerialize(strm_t *pThis, strm_t *pStrm)
ISOBJ_TYPE_assert(pThis, strm);
ISOBJ_TYPE_assert(pStrm, strm);
- strmFlush(pThis);
+ strmFlushInternal(pThis);
CHKiRet(obj.BeginSerialize(pStrm, (obj_t*) pThis));
objSerializeSCALAR(pStrm, iCurrFNum, INT);