summaryrefslogtreecommitdiffstats
path: root/queue.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2008-01-08 08:45:24 +0000
committerRainer Gerhards <rgerhards@adiscon.com>2008-01-08 08:45:24 +0000
commit6d4bd34517643505dab731fec16d3afeba2169ab (patch)
tree8815be565a7c07e9050d7a0de80282e884edcd18 /queue.c
parentc44de2807a899521c8542321d91e3074f3c40086 (diff)
downloadrsyslog-6d4bd34517643505dab731fec16d3afeba2169ab.tar.gz
rsyslog-6d4bd34517643505dab731fec16d3afeba2169ab.tar.xz
rsyslog-6d4bd34517643505dab731fec16d3afeba2169ab.zip
implemented queue disk reader to switch to multiple files
Diffstat (limited to 'queue.c')
-rw-r--r--queue.c59
1 files changed, 30 insertions, 29 deletions
diff --git a/queue.c b/queue.c
index 29b5e43e..1c371cd9 100644
--- a/queue.c
+++ b/queue.c
@@ -198,23 +198,20 @@ static rsRetVal qDelLinkedList(queue_t *pThis, void **ppUsr)
static rsRetVal qDiskOpenFile(queue_t *pThis, queueFileDescription_t *pFile, int flags, mode_t mode)
{
DEFiRet;
- uchar *pszFile = NULL;
assert(pThis != NULL);
assert(pFile != NULL);
/* now open the write file */
- CHKiRet(genFileName(&pszFile, pThis->tVars.disk.pszSpoolDir, pThis->tVars.disk.lenSpoolDir,
+ CHKiRet(genFileName(&pFile->pszFileName, pThis->tVars.disk.pszSpoolDir, pThis->tVars.disk.lenSpoolDir,
(uchar*) "mainq", 5, pFile->iCurrFileNum, (uchar*) "qf", 2));
- pFile->fd = open((char*)pszFile, flags, mode); // TODO: open modes!
+ pFile->fd = open((char*)pFile->pszFileName, flags, mode); // TODO: open modes!
pFile->iCurrOffs = 0;
- dbgprintf("Queue 0x%lx: opened file '%s' for %d as %d\n", (unsigned long) pThis, pszFile, flags, pFile->fd);
+ dbgprintf("Queue 0x%lx: opened file '%s' for %d as %d\n", (unsigned long) pThis, pFile->pszFileName, flags, pFile->fd);
finalize_it:
- if(pszFile != NULL)
- free(pszFile); /* no longer needed in any case (just for open) */
-
+dbgprintf("qDiskOpen iRet %d\n", iRet);
return iRet;
}
@@ -231,6 +228,11 @@ static rsRetVal qDiskCloseFile(queue_t *pThis, queueFileDescription_t *pFile)
close(pFile->fd); // TODO: error check
pFile->fd = -1;
+ if(pFile->pszFileName != NULL) {
+ free(pFile->pszFileName); /* no longer needed in any case (just for open) */
+ pFile->pszFileName = NULL;
+ }
+
return iRet;
}
@@ -271,7 +273,7 @@ static rsRetVal qDiskReadChar(queueFileDescription_t *pFile, uchar *pC)
assert(pFile != NULL);
assert(pC != NULL);
-dbgprintf("qDiskRead index %d, max %d\n", pFile->iBufPtr, pFile->iBufPtrMax);
+//dbgprintf("qDiskRead index %d, max %d\n", pFile->iBufPtr, pFile->iBufPtrMax);
if(pFile->pIOBuf == NULL) { /* TODO: maybe we should move that to file open... */
if((pFile->pIOBuf = (uchar*) malloc(sizeof(uchar) * qFILE_IOBUF_SIZE )) == NULL)
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
@@ -437,33 +439,32 @@ static rsRetVal qDelDisk(queue_t *pThis, void **ppUsr)
DEFiRet;
msg_t *pMsg = NULL;
serialStore_t serialStore;
+ int bRun;
assert(pThis != NULL);
- if(pThis->tVars.disk.fRead.fd == -1)
- CHKiRet(qDiskOpenFile(pThis, &pThis->tVars.disk.fRead, O_RDONLY, 0600)); // TODO: open modes!
-
- /* de-serialize object from file */
-retry:
+ /* de-serialize object from file
+ * We need to try at least twice because we may run into EOF and need
+ * to switch files.
+ */
serialStore.pUsr = &pThis->tVars.disk.fRead;
serialStore.funcGetChar = (rsRetVal (*)(void*, uchar*)) qDiskReadChar;
serialStore.funcUngetChar = (rsRetVal (*)(void*, uchar)) qDiskUnreadChar;
- iRet= objDeserialize((void*) &pMsg, objMsg, &serialStore);
-
- if(iRet == RS_RET_OK)
- ;
- else if(iRet == RS_RET_EOF) {
-dbgprintf("EOF!\n");
- CHKiRet(qDiskNextFile(pThis, &pThis->tVars.disk.fRead));
- goto retry;
- } else
- FINALIZE;
- /* switch to next file when EOF is reached. We may also delete the last file in that case.
- pThis->tVars.disk.fWrite.iCurrOffs += iWritten;
- if(pThis->tVars.disk.fWrite.iCurrOffs >= pThis->tVars.disk.iMaxFileSize)
- CHKiRet(qDiskNextFile(pThis, &pThis->tVars.disk.fWrite));
- */
-dbgprintf("got object %lx\n", (unsigned long) pMsg);
+ bRun = 1;
+ while(bRun) {
+ /* first check if we need to (re)open the file (we may have switched to a new one!) */
+ if(pThis->tVars.disk.fRead.fd == -1)
+ CHKiRet(qDiskOpenFile(pThis, &pThis->tVars.disk.fRead, O_RDONLY, 0600)); // TODO: open modes!
+
+ iRet = objDeserialize((void*) &pMsg, objMsg, &serialStore);
+ if(iRet == RS_RET_OK)
+ bRun = 0; /* we are done */
+ else if(iRet == RS_RET_EOF) {
+ dbgprintf("Queue 0x%lx: EOF on file %d\n", (unsigned long) pThis, pThis->tVars.disk.fRead.fd);
+ CHKiRet(qDiskNextFile(pThis, &pThis->tVars.disk.fRead));
+ } else
+ FINALIZE;
+ }
*ppUsr = (void*) pMsg;