summaryrefslogtreecommitdiffstats
path: root/queue.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2008-01-09 17:25:07 +0000
committerRainer Gerhards <rgerhards@adiscon.com>2008-01-09 17:25:07 +0000
commitaa7e00d8e1a1d67fa2860623ffab75bd387faffc (patch)
treebf8d5fbf99050a010c1918d8a3a1168cdb43fe84 /queue.c
parentfd8c6452c8a4d51d39eb511046fca09391138a22 (diff)
downloadrsyslog-aa7e00d8e1a1d67fa2860623ffab75bd387faffc.tar.gz
rsyslog-aa7e00d8e1a1d67fa2860623ffab75bd387faffc.tar.xz
rsyslog-aa7e00d8e1a1d67fa2860623ffab75bd387faffc.zip
changed queue class to use stream class
Diffstat (limited to 'queue.c')
-rw-r--r--queue.c247
1 files changed, 18 insertions, 229 deletions
diff --git a/queue.c b/queue.c
index b2a910fc..76dbace6 100644
--- a/queue.c
+++ b/queue.c
@@ -189,208 +189,20 @@ static rsRetVal qDelLinkedList(queue_t *pThis, void **ppUsr)
/* -------------------- disk -------------------- */
-/* first, disk-queue internal utility functions */
-
-
-/* open a queue file */
-static rsRetVal qDiskOpenFile(queue_t *pThis, queueFileDescription_t *pFile, int flags, mode_t mode)
-{
- DEFiRet;
-
- assert(pThis != NULL);
- assert(pFile != NULL);
-
- if(pThis->pszFilePrefix == NULL)
- ABORT_FINALIZE(RS_RET_FILE_PREFIX_MISSING);
-
- /* now open the write file */
- CHKiRet(genFileName(&pFile->pszFileName, pThis->pszSpoolDir, pThis->lenSpoolDir,
- pThis->pszFilePrefix, pThis->lenFilePrefix, pFile->iCurrFileNum, (uchar*) "qf", 2));
-
- pFile->fd = open((char*)pFile->pszFileName, flags, mode); // TODO: open modes!
- pFile->iCurrOffs = 0;
-
- dbgprintf("Queue 0x%lx: opened file '%s' for %d as %d\n", (unsigned long) pThis, pFile->pszFileName, flags, pFile->fd);
-
-finalize_it:
- return iRet;
-}
-
-
-/* close a queue file
- * Note that the bDeleteOnClose flag is honored. If it is set, the file will be
- * deleted after close. This is in support for the qRead thread.
- */
-static rsRetVal qDiskCloseFile(queue_t *pThis, queueFileDescription_t *pFile)
-{
- DEFiRet;
-
- assert(pThis != NULL);
- assert(pFile != NULL);
- dbgprintf("Queue 0x%lx: closing file %d\n", (unsigned long) pThis, pFile->fd);
-
- close(pFile->fd); // TODO: error check
- pFile->fd = -1;
-
- if(pFile->bDeleteOnClose) {
- unlink((char*) pThis->tVars.disk.fRead.pszFileName); // TODO: check returncode
- }
-
- if(pFile->pszFileName != NULL) {
- free(pFile->pszFileName); /* no longer needed in any case (just for open) */
- pFile->pszFileName = NULL;
- }
-
- return iRet;
-}
-
-
-/* switch to next queue file */
-static rsRetVal qDiskNextFile(queue_t *pThis, queueFileDescription_t *pFile)
-{
- DEFiRet;
-
- assert(pThis != NULL);
- assert(pFile != NULL);
- CHKiRet(qDiskCloseFile(pThis, pFile));
-
- /* we do modulo 1,000,000 so that the file number is always at most 6 digits. If we have a million
- * or more queue files, something is awfully wrong and it is OK if we run into problems in that
- * situation ;) -- rgerhards, 2008-01-09
- */
- pFile->iCurrFileNum = (pFile->iCurrFileNum + 1) % 1000000;
-
-finalize_it:
- return iRet;
-}
-
-
-/*** buffered read functions for queue files ***/
-
-/* logically "read" a character from a file. What actually happens is that
- * data is taken from the buffer. Only if the buffer is full, data is read
- * directly from file. In that case, a read is performed blockwise.
- * rgerhards, 2008-01-07
- * NOTE: needs to be enhanced to support sticking with a queue entry (if not
- * deleted).
- */
-static rsRetVal qDiskReadChar(queueFileDescription_t *pFile, uchar *pC)
-{
- DEFiRet;
-
- assert(pFile != NULL);
- assert(pC != NULL);
-
-//dbgprintf("qDiskRead index %d, max %d\n", pFile->iBufPtr, pFile->iBufPtrMax);
- if(pFile->pIOBuf == NULL) { /* TODO: maybe we should move that to file open... */
- if((pFile->pIOBuf = (uchar*) malloc(sizeof(uchar) * qFILE_IOBUF_SIZE )) == NULL)
- ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
- pFile->iBufPtrMax = 0; /* results in immediate read request */
- }
-
- if(pFile->iUngetC != -1) { /* do we have an "unread" char that we need to provide? */
- *pC = pFile->iUngetC;
- pFile->iUngetC = -1;
- ABORT_FINALIZE(RS_RET_OK);
- }
-
- /* do we need to obtain a new buffer */
- if(pFile->iBufPtr >= pFile->iBufPtrMax) {
- /* read */
- pFile->iBufPtrMax = read(pFile->fd, pFile->pIOBuf, qFILE_IOBUF_SIZE);
- dbgprintf("qDiskReadChar read %d bytes from file %d\n", pFile->iBufPtrMax, pFile->fd);
- if(pFile->iBufPtrMax == 0)
- ABORT_FINALIZE(RS_RET_EOF);
- else if(pFile->iBufPtrMax < 0)
- ABORT_FINALIZE(RS_RET_IO_ERROR);
- /* if we reach this point, we had a good read */
- pFile->iBufPtr = 0;
- }
-
- *pC = pFile->pIOBuf[pFile->iBufPtr++];
-
-finalize_it:
- return iRet;
-}
-
-
-/* unget a single character just like ungetc(). As with that call, there is only a single
- * character buffering capability.
- * rgerhards, 2008-01-07
- */
-static rsRetVal qDiskUnreadChar(queueFileDescription_t *pFile, uchar c)
-{
- assert(pFile != NULL);
- assert(pFile->iUngetC == -1);
- pFile->iUngetC = c;
-
- return RS_RET_OK;
-}
-
-#if 0
-/* we have commented out the code below because we would like to preserve it. It
- * is currently not needed, but may be useful if we implemented a bufferred file
- * class.
- * rgerhards, 2008-01-07
- */
-/* read a line from a queue file. A line is terminated by LF. The LF is read, but it
- * is not returned in the buffer (it is discared). The caller is responsible for
- * destruction of the returned CStr object!
- * rgerhards, 2008-01-07
- */
-static rsRetVal qDiskReadLine(queueFileDescription_t *pFile, rsCStrObj **ppCStr)
-{
- DEFiRet;
- uchar c;
- rsCStrObj *pCStr = NULL;
-
- assert(pFile != NULL);
- assert(ppCStr != NULL);
-
- if((pCStr = rsCStrConstruct()) == NULL)
- ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
-
- /* now read the line */
- CHKiRet(qDiskReadChar(pFile, &c));
- while(c != '\n') {
- CHKiRet(rsCStrAppendChar(pCStr, c));
- CHKiRet(qDiskReadChar(pFile, &c));
- }
- CHKiRet(rsCStrFinish(pCStr));
- *ppCStr = pCStr;
-
-finalize_it:
- if(iRet != RS_RET_OK && pCStr != NULL)
- rsCStrDestruct(pCStr);
-
- return iRet;
-}
-
-#endif /* #if 0 - saved code */
-
-/*** end buffered read functions for queue files ***/
-
-
-/* now come the disk mode queue driver functions */
-
static rsRetVal qConstructDisk(queue_t *pThis)
{
DEFiRet;
assert(pThis != NULL);
- pThis->tVars.disk.fWrite.iCurrFileNum = 1;
- pThis->tVars.disk.fWrite.iCurrOffs = 0;
- pThis->tVars.disk.fWrite.fd = -1;
- pThis->tVars.disk.fWrite.iUngetC = -1;
- pThis->tVars.disk.fRead.bDeleteOnClose = 0; /* do *NOT* set this to 1! */
+ CHKiRet(strmConstruct(&pThis->tVars.disk.pWrite));
+ CHKiRet(strmConstructFinalize(pThis->tVars.disk.pWrite));
- pThis->tVars.disk.fRead.iCurrFileNum = 1;
- pThis->tVars.disk.fRead.fd = -1;
- pThis->tVars.disk.fRead.iCurrOffs = 0;
- pThis->tVars.disk.fRead.iUngetC = -1;
- pThis->tVars.disk.fRead.bDeleteOnClose = 1;
+ CHKiRet(strmConstruct(&pThis->tVars.disk.pRead));
+ CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pRead, 1));
+ CHKiRet(strmConstructFinalize(pThis->tVars.disk.pRead));
+finalize_it:
return iRet;
}
@@ -401,10 +213,8 @@ static rsRetVal qDestructDisk(queue_t *pThis)
assert(pThis != NULL);
- if(pThis->tVars.disk.fWrite.fd != -1)
- qDiskCloseFile(pThis, &pThis->tVars.disk.fWrite);
- if(pThis->tVars.disk.fRead.fd != -1)
- qDiskCloseFile(pThis, &pThis->tVars.disk.fRead);
+ strmDestruct(pThis->tVars.disk.pWrite);
+ strmDestruct(pThis->tVars.disk.pRead);
if(pThis->pszSpoolDir != NULL)
free(pThis->pszSpoolDir);
@@ -415,23 +225,14 @@ static rsRetVal qDestructDisk(queue_t *pThis)
static rsRetVal qAddDisk(queue_t *pThis, void* pUsr)
{
DEFiRet;
- int iWritten;
rsCStrObj *pCStr;
assert(pThis != NULL);
- if(pThis->tVars.disk.fWrite.fd == -1)
- CHKiRet(qDiskOpenFile(pThis, &pThis->tVars.disk.fWrite, O_RDWR|O_CREAT|O_TRUNC, 0600)); // TODO: open modes!
+ CHKiRet(strmOpenFile(pThis->tVars.disk.pWrite, O_RDWR|O_CREAT|O_TRUNC, 0600)); // TODO: open modes!
CHKiRet((objSerialize(pUsr))(pUsr, &pCStr));
- iWritten = write(pThis->tVars.disk.fWrite.fd, rsCStrGetBufBeg(pCStr), rsCStrLen(pCStr));
- dbgprintf("Queue 0x%lx: write wrote %d bytes, errno: %d, err %s\n", (unsigned long) pThis,
- iWritten, errno, strerror(errno));
- /* TODO: handle error case -- rgerhards, 2008-01-07 */
-
- pThis->tVars.disk.fWrite.iCurrOffs += iWritten;
- if(pThis->tVars.disk.fWrite.iCurrOffs >= pThis->iMaxFileSize)
- CHKiRet(qDiskNextFile(pThis, &pThis->tVars.disk.fWrite));
+ CHKiRet(strmWrite(pThis->tVars.disk.pWrite, rsCStrGetBufBeg(pCStr), rsCStrLen(pCStr)));
finalize_it:
return iRet;
@@ -450,21 +251,20 @@ static rsRetVal qDelDisk(queue_t *pThis, void **ppUsr)
* We need to try at least twice because we may run into EOF and need
* to switch files.
*/
- serialStore.pUsr = &pThis->tVars.disk.fRead;
- serialStore.funcGetChar = (rsRetVal (*)(void*, uchar*)) qDiskReadChar;
- serialStore.funcUngetChar = (rsRetVal (*)(void*, uchar)) qDiskUnreadChar;
+ serialStore.pUsr = pThis->tVars.disk.pRead;
+ serialStore.funcGetChar = (rsRetVal (*)(void*, uchar*)) strmReadChar;
+ serialStore.funcUngetChar = (rsRetVal (*)(void*, uchar)) strmUnreadChar;
bRun = 1;
while(bRun) {
/* first check if we need to (re)open the file (we may have switched to a new one!) */
- if(pThis->tVars.disk.fRead.fd == -1)
- CHKiRet(qDiskOpenFile(pThis, &pThis->tVars.disk.fRead, O_RDONLY, 0600)); // TODO: open modes!
+ CHKiRet(strmOpenFile(pThis->tVars.disk.pRead, O_RDONLY, 0600)); // TODO: open modes!
iRet = objDeserialize((void*) &pMsg, OBJMsg, &serialStore);
if(iRet == RS_RET_OK)
bRun = 0; /* we are done */
else if(iRet == RS_RET_EOF) {
- dbgprintf("Queue 0x%lx: EOF on file %d\n", (unsigned long) pThis, pThis->tVars.disk.fRead.fd);
- CHKiRet(qDiskNextFile(pThis, &pThis->tVars.disk.fRead));
+ dbgprintf("Queue 0x%lx: EOF on file %d\n", (unsigned long) pThis, pThis->tVars.disk.pRead->fd);
+ CHKiRet(strmNextFile(pThis->tVars.disk.pRead));
} else
FINALIZE;
}
@@ -781,19 +581,8 @@ rsRetVal
queueSetFilePrefix(queue_t *pThis, uchar *pszPrefix, size_t iLenPrefix)
{
DEFiRet;
-
- assert(pThis != NULL);
- assert(pszPrefix != NULL);
-
- if(iLenPrefix < 1)
- ABORT_FINALIZE(RS_RET_FILE_PREFIX_MISSING);
-
- if((pThis->pszFilePrefix = malloc(sizeof(uchar) * iLenPrefix + 1)) == NULL)
- ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
-
- memcpy(pThis->pszFilePrefix, pszPrefix, iLenPrefix + 1); /* always think about the \0! */
- pThis->lenFilePrefix = iLenPrefix;
-
+ CHKiRet(strmSetFilePrefix(pThis->tVars.disk.pWrite, pszPrefix, iLenPrefix));
+ CHKiRet(strmSetFilePrefix(pThis->tVars.disk.pRead, pszPrefix, iLenPrefix));
finalize_it:
return iRet;
}