summaryrefslogtreecommitdiffstats
path: root/queue.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2008-01-07 10:41:22 +0000
committerRainer Gerhards <rgerhards@adiscon.com>2008-01-07 10:41:22 +0000
commit2c5b4f3c3d79190367595ccf84ae90f50666ddbf (patch)
treeb974a9e48124518234246de01a537308ef8c0d61 /queue.c
parentcfbb74e7a59c93c3816b431ffb25adf2032c5ef0 (diff)
downloadrsyslog-2c5b4f3c3d79190367595ccf84ae90f50666ddbf.tar.gz
rsyslog-2c5b4f3c3d79190367595ccf84ae90f50666ddbf.tar.xz
rsyslog-2c5b4f3c3d79190367595ccf84ae90f50666ddbf.zip
implemented buffered read calls for the queue file
Diffstat (limited to 'queue.c')
-rw-r--r--queue.c124
1 files changed, 104 insertions, 20 deletions
diff --git a/queue.c b/queue.c
index 8b06c882..292938c5 100644
--- a/queue.c
+++ b/queue.c
@@ -54,7 +54,7 @@
*/
/* -------------------- fixed array -------------------- */
-rsRetVal qConstructFixedArray(queue_t *pThis)
+static rsRetVal qConstructFixedArray(queue_t *pThis)
{
DEFiRet;
@@ -72,7 +72,7 @@ finalize_it:
}
-rsRetVal qDestructFixedArray(queue_t *pThis)
+static rsRetVal qDestructFixedArray(queue_t *pThis)
{
DEFiRet;
@@ -84,7 +84,7 @@ rsRetVal qDestructFixedArray(queue_t *pThis)
return iRet;
}
-rsRetVal qAddFixedArray(queue_t *pThis, void* in)
+static rsRetVal qAddFixedArray(queue_t *pThis, void* in)
{
DEFiRet;
@@ -97,7 +97,7 @@ rsRetVal qAddFixedArray(queue_t *pThis, void* in)
return iRet;
}
-rsRetVal qDelFixedArray(queue_t *pThis, void **out)
+static rsRetVal qDelFixedArray(queue_t *pThis, void **out)
{
DEFiRet;
@@ -113,7 +113,7 @@ rsRetVal qDelFixedArray(queue_t *pThis, void **out)
/* -------------------- linked list -------------------- */
-rsRetVal qConstructLinkedList(queue_t *pThis)
+static rsRetVal qConstructLinkedList(queue_t *pThis)
{
DEFiRet;
@@ -126,7 +126,7 @@ rsRetVal qConstructLinkedList(queue_t *pThis)
}
-rsRetVal qDestructLinkedList(queue_t *pThis)
+static rsRetVal qDestructLinkedList(queue_t *pThis)
{
DEFiRet;
@@ -141,7 +141,7 @@ rsRetVal qDestructLinkedList(queue_t *pThis)
return iRet;
}
-rsRetVal qAddLinkedList(queue_t *pThis, void* pUsr)
+static rsRetVal qAddLinkedList(queue_t *pThis, void* pUsr)
{
DEFiRet;
qLinkedList_t *pEntry;
@@ -165,7 +165,7 @@ finalize_it:
return iRet;
}
-rsRetVal qDelLinkedList(queue_t *pThis, void **ppUsr)
+static rsRetVal qDelLinkedList(queue_t *pThis, void **ppUsr)
{
DEFiRet;
qLinkedList_t *pEntry;
@@ -194,7 +194,7 @@ rsRetVal qDelLinkedList(queue_t *pThis, void **ppUsr)
/* open a queue file */
-rsRetVal qDiskOpenFile(queue_t *pThis, queueFileDescription_t *pFile, int flags, mode_t mode)
+static rsRetVal qDiskOpenFile(queue_t *pThis, queueFileDescription_t *pFile, int flags, mode_t mode)
{
DEFiRet;
uchar *pszFile = NULL;
@@ -219,7 +219,7 @@ finalize_it:
/* close a queue file */
-rsRetVal qDiskCloseFile(queue_t *pThis, queueFileDescription_t *pFile)
+static rsRetVal qDiskCloseFile(queue_t *pThis, queueFileDescription_t *pFile)
{
DEFiRet;
@@ -235,7 +235,7 @@ rsRetVal qDiskCloseFile(queue_t *pThis, queueFileDescription_t *pFile)
/* switch to next queue file */
-rsRetVal qDiskNextFile(queue_t *pThis, queueFileDescription_t *pFile)
+static rsRetVal qDiskNextFile(queue_t *pThis, queueFileDescription_t *pFile)
{
DEFiRet;
@@ -254,9 +254,88 @@ finalize_it:
}
+/*** buffered read functions for queue files ***/
+
+/* logically "read" a character from a file. What actually happens is that
+ * data is taken from the buffer. Only if the buffer is full, data is read
+ * directly from file. In that case, a read is performed blockwise.
+ * rgerhards, 2008-01-07
+ * NOTE: needs to be enhanced to support sticking with a queue entry (if not
+ * deleted).
+ */
+static rsRetVal qDiskReadChar(queueFileDescription_t *pFile, uchar *pC)
+{
+ DEFiRet;
+ uchar c;
+
+ assert(pFile != NULL);
+ assert(pC != NULL);
+
+ if(pFile->pIOBuf == NULL) { /* TODO: maybe we should move that to file open... */
+ if((pFile->pIOBuf = (uchar*) malloc(sizeof(uchar) * qFILE_IOBUF_SIZE )) == NULL)
+ ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
+ pFile->iBufPtrMax = 0; /* results in immediate read request */
+ }
+
+ /* do we need to obtain a new buffer */
+ if(pFile->iBufPtr >= pFile->iBufPtrMax) {
+ /* read */
+ pFile->iBufPtrMax = read(pFile->fd, pFile->pIOBuf, qFILE_IOBUF_SIZE);
+ dbgprintf("Read %d bytes from file %d\n", pFile->iBufPtrMax, pFile->fd);
+ if(pFile->iBufPtrMax == 0)
+ ABORT_FINALIZE(RS_RET_EOF);
+ else if(pFile->iBufPtrMax < 0)
+ ABORT_FINALIZE(RS_RET_IO_ERROR);
+ /* if we reach this point, we had a good read */
+ pFile->iBufPtr = 0;
+ }
+
+ *pC = pFile->pIOBuf[pFile->iBufPtr++];
+
+finalize_it:
+ return iRet;
+}
+
+/* read a line from a queue file. A line is terminated by LF. The LF is read, but it
+ * is not returned in the buffer (it is discared). The caller is responsible for
+ * destruction of the returned CStr object!
+ * rgerhards, 2008-01-07
+ */
+static rsRetVal qDiskReadLine(queueFileDescription_t *pFile, rsCStrObj **ppCStr)
+{
+ DEFiRet;
+ uchar c;
+ rsCStrObj *pCStr = NULL;
+
+ assert(pFile != NULL);
+ assert(ppCStr != NULL);
+
+ if((pCStr = rsCStrConstruct()) == NULL)
+ ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
+
+ /* now read the line */
+ CHKiRet(qDiskReadChar(pFile, &c));
+ while(c != '\n') {
+ CHKiRet(rsCStrAppendChar(pCStr, c));
+ CHKiRet(qDiskReadChar(pFile, &c));
+ }
+ CHKiRet(rsCStrFinish(pCStr));
+ *ppCStr = pCStr;
+
+finalize_it:
+ if(iRet != RS_RET_OK && pCStr != NULL)
+ rsCStrDestruct(pCStr);
+
+ return iRet;
+}
+
+
+/*** end buffered read functions for queue files ***/
+
+
/* now come the disk mode queue driver functions */
-rsRetVal qConstructDisk(queue_t *pThis)
+static rsRetVal qConstructDisk(queue_t *pThis)
{
DEFiRet;
@@ -266,7 +345,7 @@ rsRetVal qConstructDisk(queue_t *pThis)
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
pThis->tVars.disk.lenSpoolDir = strlen((char*)pThis->tVars.disk.pszSpoolDir);
- pThis->tVars.disk.iMaxFileSize = 1024; // TODO: configurable!
+ pThis->tVars.disk.iMaxFileSize = 1024 * 3; // TODO: configurable!
pThis->tVars.disk.fWrite.iCurrFileNum = 1;
pThis->tVars.disk.fWrite.iCurrOffs = 0;
@@ -281,7 +360,7 @@ finalize_it:
}
-rsRetVal qDestructDisk(queue_t *pThis)
+static rsRetVal qDestructDisk(queue_t *pThis)
{
DEFiRet;
@@ -298,7 +377,7 @@ rsRetVal qDestructDisk(queue_t *pThis)
return iRet;
}
-rsRetVal qAddDisk(queue_t *pThis, void* pUsr)
+static rsRetVal qAddDisk(queue_t *pThis, void* pUsr)
{
DEFiRet;
int iWritten;
@@ -323,7 +402,7 @@ finalize_it:
return iRet;
}
-rsRetVal qDelDisk(queue_t __attribute__((unused)) *pThis, void __attribute__((unused)) **ppUsr)
+static rsRetVal qDelDisk(queue_t __attribute__((unused)) *pThis, void __attribute__((unused)) **ppUsr)
{
DEFiRet;
@@ -333,6 +412,10 @@ rsRetVal qDelDisk(queue_t __attribute__((unused)) *pThis, void __attribute__((un
CHKiRet(qDiskOpenFile(pThis, &pThis->tVars.disk.fRead, O_RDONLY, 0600)); // TODO: open modes!
/* read here */
+ rsCStrObj *pCStr = NULL;
+ CHKiRet(qDiskReadLine(&pThis->tVars.disk.fRead, &pCStr));
+ dbgprintf("qDelDisk read line '%s'\n", rsCStrGetSzStr(pCStr));
+ rsCStrDestruct(pCStr);
/* de-serialize here */
/* switch to next file when EOF is reached. We may also delete the last file in that case.
@@ -340,6 +423,7 @@ rsRetVal qDelDisk(queue_t __attribute__((unused)) *pThis, void __attribute__((un
if(pThis->tVars.disk.fWrite.iCurrOffs >= pThis->tVars.disk.iMaxFileSize)
CHKiRet(qDiskNextFile(pThis, &pThis->tVars.disk.fWrite));
*/
+
iRet = RS_RET_ERR;
finalize_it:
@@ -347,18 +431,18 @@ finalize_it:
}
/* -------------------- direct (no queueing) -------------------- */
-rsRetVal qConstructDirect(queue_t __attribute__((unused)) *pThis)
+static rsRetVal qConstructDirect(queue_t __attribute__((unused)) *pThis)
{
return RS_RET_OK;
}
-rsRetVal qDestructDirect(queue_t __attribute__((unused)) *pThis)
+static rsRetVal qDestructDirect(queue_t __attribute__((unused)) *pThis)
{
return RS_RET_OK;
}
-rsRetVal qAddDirect(queue_t *pThis, void* pUsr)
+static rsRetVal qAddDirect(queue_t *pThis, void* pUsr)
{
DEFiRet;
rsRetVal iRetLocal;
@@ -378,7 +462,7 @@ rsRetVal qAddDirect(queue_t *pThis, void* pUsr)
return iRet;
}
-rsRetVal qDelDirect(queue_t __attribute__((unused)) *pThis, __attribute__((unused)) void **out)
+static rsRetVal qDelDirect(queue_t __attribute__((unused)) *pThis, __attribute__((unused)) void **out)
{
return RS_RET_OK;
}