summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--obj.h10
-rw-r--r--queue.c247
-rw-r--r--queue.h5
-rw-r--r--rsyslog.h2
-rw-r--r--stream.c51
-rw-r--r--stream.h9
6 files changed, 85 insertions, 239 deletions
diff --git a/obj.h b/obj.h
index 0b97718e..0c24c279 100644
--- a/obj.h
+++ b/obj.h
@@ -125,6 +125,16 @@ typedef struct serialStore_s {
if(pThis != NULL) \
free(pThis); \
}
+
+#define DEFpropSetMeth(obj, prop, dataType)\
+ rsRetVal obj##Set##prop(obj##_t *pThis, dataType pVal)\
+ { \
+ pThis->prop = pVal; \
+ return RS_RET_OK; \
+ }
+#define PROTOTYPEpropSetMeth(obj, prop, dataType)\
+ rsRetVal obj##Set##prop(obj##_t *pThis, dataType pVal)
+
#define objSerializeSCALAR(propName, propType) \
CHKiRet(objSerializeProp(pCStr, (uchar*) #propName, PROPTYPE_##propType, (void*) &pThis->propName));
#define objSerializePTR(propName, propType) \
diff --git a/queue.c b/queue.c
index b2a910fc..76dbace6 100644
--- a/queue.c
+++ b/queue.c
@@ -189,208 +189,20 @@ static rsRetVal qDelLinkedList(queue_t *pThis, void **ppUsr)
/* -------------------- disk -------------------- */
-/* first, disk-queue internal utility functions */
-
-
-/* open a queue file */
-static rsRetVal qDiskOpenFile(queue_t *pThis, queueFileDescription_t *pFile, int flags, mode_t mode)
-{
- DEFiRet;
-
- assert(pThis != NULL);
- assert(pFile != NULL);
-
- if(pThis->pszFilePrefix == NULL)
- ABORT_FINALIZE(RS_RET_FILE_PREFIX_MISSING);
-
- /* now open the write file */
- CHKiRet(genFileName(&pFile->pszFileName, pThis->pszSpoolDir, pThis->lenSpoolDir,
- pThis->pszFilePrefix, pThis->lenFilePrefix, pFile->iCurrFileNum, (uchar*) "qf", 2));
-
- pFile->fd = open((char*)pFile->pszFileName, flags, mode); // TODO: open modes!
- pFile->iCurrOffs = 0;
-
- dbgprintf("Queue 0x%lx: opened file '%s' for %d as %d\n", (unsigned long) pThis, pFile->pszFileName, flags, pFile->fd);
-
-finalize_it:
- return iRet;
-}
-
-
-/* close a queue file
- * Note that the bDeleteOnClose flag is honored. If it is set, the file will be
- * deleted after close. This is in support for the qRead thread.
- */
-static rsRetVal qDiskCloseFile(queue_t *pThis, queueFileDescription_t *pFile)
-{
- DEFiRet;
-
- assert(pThis != NULL);
- assert(pFile != NULL);
- dbgprintf("Queue 0x%lx: closing file %d\n", (unsigned long) pThis, pFile->fd);
-
- close(pFile->fd); // TODO: error check
- pFile->fd = -1;
-
- if(pFile->bDeleteOnClose) {
- unlink((char*) pThis->tVars.disk.fRead.pszFileName); // TODO: check returncode
- }
-
- if(pFile->pszFileName != NULL) {
- free(pFile->pszFileName); /* no longer needed in any case (just for open) */
- pFile->pszFileName = NULL;
- }
-
- return iRet;
-}
-
-
-/* switch to next queue file */
-static rsRetVal qDiskNextFile(queue_t *pThis, queueFileDescription_t *pFile)
-{
- DEFiRet;
-
- assert(pThis != NULL);
- assert(pFile != NULL);
- CHKiRet(qDiskCloseFile(pThis, pFile));
-
- /* we do modulo 1,000,000 so that the file number is always at most 6 digits. If we have a million
- * or more queue files, something is awfully wrong and it is OK if we run into problems in that
- * situation ;) -- rgerhards, 2008-01-09
- */
- pFile->iCurrFileNum = (pFile->iCurrFileNum + 1) % 1000000;
-
-finalize_it:
- return iRet;
-}
-
-
-/*** buffered read functions for queue files ***/
-
-/* logically "read" a character from a file. What actually happens is that
- * data is taken from the buffer. Only if the buffer is full, data is read
- * directly from file. In that case, a read is performed blockwise.
- * rgerhards, 2008-01-07
- * NOTE: needs to be enhanced to support sticking with a queue entry (if not
- * deleted).
- */
-static rsRetVal qDiskReadChar(queueFileDescription_t *pFile, uchar *pC)
-{
- DEFiRet;
-
- assert(pFile != NULL);
- assert(pC != NULL);
-
-//dbgprintf("qDiskRead index %d, max %d\n", pFile->iBufPtr, pFile->iBufPtrMax);
- if(pFile->pIOBuf == NULL) { /* TODO: maybe we should move that to file open... */
- if((pFile->pIOBuf = (uchar*) malloc(sizeof(uchar) * qFILE_IOBUF_SIZE )) == NULL)
- ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
- pFile->iBufPtrMax = 0; /* results in immediate read request */
- }
-
- if(pFile->iUngetC != -1) { /* do we have an "unread" char that we need to provide? */
- *pC = pFile->iUngetC;
- pFile->iUngetC = -1;
- ABORT_FINALIZE(RS_RET_OK);
- }
-
- /* do we need to obtain a new buffer */
- if(pFile->iBufPtr >= pFile->iBufPtrMax) {
- /* read */
- pFile->iBufPtrMax = read(pFile->fd, pFile->pIOBuf, qFILE_IOBUF_SIZE);
- dbgprintf("qDiskReadChar read %d bytes from file %d\n", pFile->iBufPtrMax, pFile->fd);
- if(pFile->iBufPtrMax == 0)
- ABORT_FINALIZE(RS_RET_EOF);
- else if(pFile->iBufPtrMax < 0)
- ABORT_FINALIZE(RS_RET_IO_ERROR);
- /* if we reach this point, we had a good read */
- pFile->iBufPtr = 0;
- }
-
- *pC = pFile->pIOBuf[pFile->iBufPtr++];
-
-finalize_it:
- return iRet;
-}
-
-
-/* unget a single character just like ungetc(). As with that call, there is only a single
- * character buffering capability.
- * rgerhards, 2008-01-07
- */
-static rsRetVal qDiskUnreadChar(queueFileDescription_t *pFile, uchar c)
-{
- assert(pFile != NULL);
- assert(pFile->iUngetC == -1);
- pFile->iUngetC = c;
-
- return RS_RET_OK;
-}
-
-#if 0
-/* we have commented out the code below because we would like to preserve it. It
- * is currently not needed, but may be useful if we implemented a bufferred file
- * class.
- * rgerhards, 2008-01-07
- */
-/* read a line from a queue file. A line is terminated by LF. The LF is read, but it
- * is not returned in the buffer (it is discared). The caller is responsible for
- * destruction of the returned CStr object!
- * rgerhards, 2008-01-07
- */
-static rsRetVal qDiskReadLine(queueFileDescription_t *pFile, rsCStrObj **ppCStr)
-{
- DEFiRet;
- uchar c;
- rsCStrObj *pCStr = NULL;
-
- assert(pFile != NULL);
- assert(ppCStr != NULL);
-
- if((pCStr = rsCStrConstruct()) == NULL)
- ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
-
- /* now read the line */
- CHKiRet(qDiskReadChar(pFile, &c));
- while(c != '\n') {
- CHKiRet(rsCStrAppendChar(pCStr, c));
- CHKiRet(qDiskReadChar(pFile, &c));
- }
- CHKiRet(rsCStrFinish(pCStr));
- *ppCStr = pCStr;
-
-finalize_it:
- if(iRet != RS_RET_OK && pCStr != NULL)
- rsCStrDestruct(pCStr);
-
- return iRet;
-}
-
-#endif /* #if 0 - saved code */
-
-/*** end buffered read functions for queue files ***/
-
-
-/* now come the disk mode queue driver functions */
-
static rsRetVal qConstructDisk(queue_t *pThis)
{
DEFiRet;
assert(pThis != NULL);
- pThis->tVars.disk.fWrite.iCurrFileNum = 1;
- pThis->tVars.disk.fWrite.iCurrOffs = 0;
- pThis->tVars.disk.fWrite.fd = -1;
- pThis->tVars.disk.fWrite.iUngetC = -1;
- pThis->tVars.disk.fRead.bDeleteOnClose = 0; /* do *NOT* set this to 1! */
+ CHKiRet(strmConstruct(&pThis->tVars.disk.pWrite));
+ CHKiRet(strmConstructFinalize(pThis->tVars.disk.pWrite));
- pThis->tVars.disk.fRead.iCurrFileNum = 1;
- pThis->tVars.disk.fRead.fd = -1;
- pThis->tVars.disk.fRead.iCurrOffs = 0;
- pThis->tVars.disk.fRead.iUngetC = -1;
- pThis->tVars.disk.fRead.bDeleteOnClose = 1;
+ CHKiRet(strmConstruct(&pThis->tVars.disk.pRead));
+ CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pRead, 1));
+ CHKiRet(strmConstructFinalize(pThis->tVars.disk.pRead));
+finalize_it:
return iRet;
}
@@ -401,10 +213,8 @@ static rsRetVal qDestructDisk(queue_t *pThis)
assert(pThis != NULL);
- if(pThis->tVars.disk.fWrite.fd != -1)
- qDiskCloseFile(pThis, &pThis->tVars.disk.fWrite);
- if(pThis->tVars.disk.fRead.fd != -1)
- qDiskCloseFile(pThis, &pThis->tVars.disk.fRead);
+ strmDestruct(pThis->tVars.disk.pWrite);
+ strmDestruct(pThis->tVars.disk.pRead);
if(pThis->pszSpoolDir != NULL)
free(pThis->pszSpoolDir);
@@ -415,23 +225,14 @@ static rsRetVal qDestructDisk(queue_t *pThis)
static rsRetVal qAddDisk(queue_t *pThis, void* pUsr)
{
DEFiRet;
- int iWritten;
rsCStrObj *pCStr;
assert(pThis != NULL);
- if(pThis->tVars.disk.fWrite.fd == -1)
- CHKiRet(qDiskOpenFile(pThis, &pThis->tVars.disk.fWrite, O_RDWR|O_CREAT|O_TRUNC, 0600)); // TODO: open modes!
+ CHKiRet(strmOpenFile(pThis->tVars.disk.pWrite, O_RDWR|O_CREAT|O_TRUNC, 0600)); // TODO: open modes!
CHKiRet((objSerialize(pUsr))(pUsr, &pCStr));
- iWritten = write(pThis->tVars.disk.fWrite.fd, rsCStrGetBufBeg(pCStr), rsCStrLen(pCStr));
- dbgprintf("Queue 0x%lx: write wrote %d bytes, errno: %d, err %s\n", (unsigned long) pThis,
- iWritten, errno, strerror(errno));
- /* TODO: handle error case -- rgerhards, 2008-01-07 */
-
- pThis->tVars.disk.fWrite.iCurrOffs += iWritten;
- if(pThis->tVars.disk.fWrite.iCurrOffs >= pThis->iMaxFileSize)
- CHKiRet(qDiskNextFile(pThis, &pThis->tVars.disk.fWrite));
+ CHKiRet(strmWrite(pThis->tVars.disk.pWrite, rsCStrGetBufBeg(pCStr), rsCStrLen(pCStr)));
finalize_it:
return iRet;
@@ -450,21 +251,20 @@ static rsRetVal qDelDisk(queue_t *pThis, void **ppUsr)
* We need to try at least twice because we may run into EOF and need
* to switch files.
*/
- serialStore.pUsr = &pThis->tVars.disk.fRead;
- serialStore.funcGetChar = (rsRetVal (*)(void*, uchar*)) qDiskReadChar;
- serialStore.funcUngetChar = (rsRetVal (*)(void*, uchar)) qDiskUnreadChar;
+ serialStore.pUsr = pThis->tVars.disk.pRead;
+ serialStore.funcGetChar = (rsRetVal (*)(void*, uchar*)) strmReadChar;
+ serialStore.funcUngetChar = (rsRetVal (*)(void*, uchar)) strmUnreadChar;
bRun = 1;
while(bRun) {
/* first check if we need to (re)open the file (we may have switched to a new one!) */
- if(pThis->tVars.disk.fRead.fd == -1)
- CHKiRet(qDiskOpenFile(pThis, &pThis->tVars.disk.fRead, O_RDONLY, 0600)); // TODO: open modes!
+ CHKiRet(strmOpenFile(pThis->tVars.disk.pRead, O_RDONLY, 0600)); // TODO: open modes!
iRet = objDeserialize((void*) &pMsg, OBJMsg, &serialStore);
if(iRet == RS_RET_OK)
bRun = 0; /* we are done */
else if(iRet == RS_RET_EOF) {
- dbgprintf("Queue 0x%lx: EOF on file %d\n", (unsigned long) pThis, pThis->tVars.disk.fRead.fd);
- CHKiRet(qDiskNextFile(pThis, &pThis->tVars.disk.fRead));
+ dbgprintf("Queue 0x%lx: EOF on file %d\n", (unsigned long) pThis, pThis->tVars.disk.pRead->fd);
+ CHKiRet(strmNextFile(pThis->tVars.disk.pRead));
} else
FINALIZE;
}
@@ -781,19 +581,8 @@ rsRetVal
queueSetFilePrefix(queue_t *pThis, uchar *pszPrefix, size_t iLenPrefix)
{
DEFiRet;
-
- assert(pThis != NULL);
- assert(pszPrefix != NULL);
-
- if(iLenPrefix < 1)
- ABORT_FINALIZE(RS_RET_FILE_PREFIX_MISSING);
-
- if((pThis->pszFilePrefix = malloc(sizeof(uchar) * iLenPrefix + 1)) == NULL)
- ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
-
- memcpy(pThis->pszFilePrefix, pszPrefix, iLenPrefix + 1); /* always think about the \0! */
- pThis->lenFilePrefix = iLenPrefix;
-
+ CHKiRet(strmSetFilePrefix(pThis->tVars.disk.pWrite, pszPrefix, iLenPrefix));
+ CHKiRet(strmSetFilePrefix(pThis->tVars.disk.pRead, pszPrefix, iLenPrefix));
finalize_it:
return iRet;
}
diff --git a/queue.h b/queue.h
index 1cbd52b4..df4146f6 100644
--- a/queue.h
+++ b/queue.h
@@ -25,6 +25,7 @@
#include <pthread.h>
#include "obj.h"
+#include "stream.h"
/* some information about disk files used by the queue. In the long term, we may
* export this settings to a separate file module - or not (if they are too
@@ -100,8 +101,8 @@ typedef struct queue_s {
qLinkedList_t *pLast;
} linklist;
struct {
- queueFileDescription_t fWrite; /* current file to be written */
- queueFileDescription_t fRead; /* current file to be read */
+ strm_t *pWrite; /* current file to be written */
+ strm_t *pRead; /* current file to be read */
} disk;
} tVars;
} queue_t;
diff --git a/rsyslog.h b/rsyslog.h
index ff6e4eaa..0bb8371b 100644
--- a/rsyslog.h
+++ b/rsyslog.h
@@ -104,7 +104,7 @@ enum rsRetVal_ /** return value. All methods return this if not specified oth
RS_RET_NO_PROPLINE = -2033, /**< line is not a property line */
RS_RET_INVALID_TRAILER = -2034, /**< invalid trailer */
RS_RET_VALUE_TOO_LOW = -2035, /**< a provided value is too low */
- RS_RET_FILE_PREFIX_MISSING = -2035, /**< a required file prefix (parameter?) is missing */
+ RS_RET_FILE_PREFIX_MISSING = -2036, /**< a required file prefix (parameter?) is missing */
RS_RET_OK_DELETE_LISTENTRY = 1, /**< operation successful, but callee requested the deletion of an entry (special state) */
RS_RET_TERMINATE_NOW = 2, /**< operation successful, function is requested to terminate (mostly used with threads) */
RS_RET_NO_RUN = 3, /**< operation successful, but function does not like to be executed */
diff --git a/stream.c b/stream.c
index 7f99ab9a..caf78bec 100644
--- a/stream.c
+++ b/stream.c
@@ -56,13 +56,19 @@ DEFobjStaticHelpers
* strm instance object.
*/
-/* open a strm file */
-static rsRetVal strmOpenFile(strm_t *pThis, int flags, mode_t mode)
+/* open a strm file
+ * It is OK to call this function when the stream is already open. In that
+ * case, it returns immediately with RS_RET_OK
+ */
+rsRetVal strmOpenFile(strm_t *pThis, int flags, mode_t mode)
{
DEFiRet;
assert(pThis != NULL);
+ if(pThis->fd != -1)
+ ABORT_FINALIZE(RS_RET_OK);
+
if(pThis->pszFilePrefix == NULL)
ABORT_FINALIZE(RS_RET_FILE_PREFIX_MISSING);
@@ -107,7 +113,7 @@ static rsRetVal strmCloseFile(strm_t *pThis)
/* switch to next strm file */
-static rsRetVal strmNextFile(strm_t *pThis)
+rsRetVal strmNextFile(strm_t *pThis)
{
DEFiRet;
@@ -134,14 +140,14 @@ finalize_it:
* NOTE: needs to be enhanced to support sticking with a strm entry (if not
* deleted).
*/
-static rsRetVal strmReadChar(strm_t *pThis, uchar *pC)
+rsRetVal strmReadChar(strm_t *pThis, uchar *pC)
{
DEFiRet;
assert(pThis != NULL);
assert(pC != NULL);
-//dbgprintf("strmRead index %d, max %d\n", pThis->iBufPtr, pThis->iBufPtrMax);
+ /* DEV debug only: dbgprintf("strmRead index %d, max %d\n", pThis->iBufPtr, pThis->iBufPtrMax); */
if(pThis->pIOBuf == NULL) { /* TODO: maybe we should move that to file open... */
if((pThis->pIOBuf = (uchar*) malloc(sizeof(uchar) * STRM_IOBUF_SIZE )) == NULL)
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
@@ -178,7 +184,7 @@ finalize_it:
* character buffering capability.
* rgerhards, 2008-01-07
*/
-static rsRetVal strmUnreadChar(strm_t *pThis, uchar c)
+rsRetVal strmUnreadChar(strm_t *pThis, uchar c)
{
assert(pThis != NULL);
assert(pThis->iUngetC == -1);
@@ -240,6 +246,7 @@ rsRetVal strmConstruct(strm_t **ppThis)
{
DEFiRet;
strm_t *pThis;
+dbgprintf("strmConstruct\n");
assert(ppThis != NULL);
@@ -291,11 +298,12 @@ rsRetVal strmDestruct(strm_t *pThis)
/* write memory buffer to a stream object
*/
-static rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
+rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
{
DEFiRet;
int iWritten;
+dbgprintf("strmWrite()\n");
assert(pThis != NULL);
assert(pBuf != NULL);
@@ -316,6 +324,35 @@ finalize_it:
}
+/* property set methods */
+/* simple ones first */
+DEFpropSetMeth(strm, bDeleteOnClose, int)
+
+/* set the stream's file prefix
+ * The passed-in string is duplicated. So if the caller does not need
+ * it any longer, it must free it.
+ * rgerhards, 2008-01-09
+ */
+rsRetVal
+strmSetFilePrefix(strm_t *pThis, uchar *pszPrefix, size_t iLenPrefix)
+{
+ DEFiRet;
+
+ assert(pThis != NULL);
+ assert(pszPrefix != NULL);
+
+ if(iLenPrefix < 1)
+ ABORT_FINALIZE(RS_RET_FILE_PREFIX_MISSING);
+
+ if((pThis->pszFilePrefix = malloc(sizeof(uchar) * iLenPrefix + 1)) == NULL)
+ ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
+
+ memcpy(pThis->pszFilePrefix, pszPrefix, iLenPrefix + 1); /* always think about the \0! */
+ pThis->lenFilePrefix = iLenPrefix;
+
+finalize_it:
+ return iRet;
+}
/* Initialize the stream class. Must be called as the very first method
diff --git a/stream.h b/stream.h
index 5c9451cd..a8ace8a6 100644
--- a/stream.h
+++ b/stream.h
@@ -75,9 +75,18 @@ typedef struct {
#define STRM_IOBUF_SIZE 4096 /* size of the IO buffer */
/* prototypes */
+rsRetVal strmConstruct(strm_t **ppThis);
+rsRetVal strmConstructFinalize(strm_t __attribute__((unused)) *pThis);
rsRetVal strmDestruct(strm_t *pThis);
rsRetVal strmSetMaxFileSize(strm_t *pThis, size_t iMaxFileSize);
rsRetVal strmSetFilePrefix(strm_t *pThis, uchar *pszPrefix, size_t iLenPrefix);
+rsRetVal strmReadChar(strm_t *pThis, uchar *pC);
+rsRetVal strmUnreadChar(strm_t *pThis, uchar c);
+rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf);
+rsRetVal strmNextFile(strm_t *pThis);
+rsRetVal strmOpenFile(strm_t *pThis, int flags, mode_t mode);
+rsRetVal strmSetFilePrefix(strm_t *pThis, uchar *pszPrefix, size_t iLenPrefix);
PROTOTYPEObjClassInit(strm);
+PROTOTYPEpropSetMeth(strm, bDeleteOnClose, int);
#endif /* #ifndef STREAM_H_INCLUDED */