summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2008-01-03 17:37:28 +0000
committerRainer Gerhards <rgerhards@adiscon.com>2008-01-03 17:37:28 +0000
commitb95b5ab28407b75467c6cff63359cba9a0a3bd70 (patch)
treee5ec5564b97690f1a278f1d693f7390d5818623d
parent64de2f0d2e0dd61dc9703a4ffd62f41f5cf42caa (diff)
downloadrsyslog-b95b5ab28407b75467c6cff63359cba9a0a3bd70.tar.gz
rsyslog-b95b5ab28407b75467c6cff63359cba9a0a3bd70.tar.xz
rsyslog-b95b5ab28407b75467c6cff63359cba9a0a3bd70.zip
begun working on disk queueing (not completed, do not use this mode!)
-rw-r--r--queue.c109
-rw-r--r--queue.h15
-rwxr-xr-xsrUtils.c51
-rwxr-xr-xsrUtils.h2
-rw-r--r--syslogd.c3
5 files changed, 170 insertions, 10 deletions
diff --git a/queue.c b/queue.c
index f4dc992c..bf0d1261 100644
--- a/queue.c
+++ b/queue.c
@@ -30,10 +30,14 @@
#include <assert.h>
#include <signal.h>
#include <pthread.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <errno.h>
#include "rsyslog.h"
#include "syslogd.h"
#include "queue.h"
+#include "srUtils.h"
/* static data */
@@ -179,6 +183,72 @@ rsRetVal qDelLinkedList(queue_t *pThis, void **ppUsr)
return iRet;
}
+
+/* -------------------- disk -------------------- */
+
+rsRetVal qConstructDisk(queue_t *pThis)
+{
+ DEFiRet;
+ uchar *pszFile;
+
+ assert(pThis != NULL);
+
+ if((pThis->tVars.disk.pszSpoolDir = (uchar*) strdup((char*)pszSpoolDirectory)) == NULL)
+ ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
+
+ pThis->tVars.disk.lenSpoolDir = strlen((char*)pThis->tVars.disk.pszSpoolDir);
+
+ /* now open the file */
+ CHKiRet(genFileName(&pszFile, pThis->tVars.disk.pszSpoolDir, pThis->tVars.disk.lenSpoolDir,
+ (uchar*) "mainq", 5, 1, (uchar*) "qf", 2));
+
+ dbgprintf("Queue 0x%lx: opening file '%s'\n", pThis, pszFile);
+
+ pThis->tVars.disk.fd = open((char*)pszFile, O_RDWR|O_CREAT, 0600);
+ dbgprintf("opened file %d\n", pThis->tVars.disk.fd);
+
+finalize_it:
+ if(pThis->tVars.disk.pszSpoolDir != NULL)
+ free(pThis->tVars.disk.pszSpoolDir);
+
+ return iRet;
+}
+
+
+rsRetVal qDestructDisk(queue_t *pThis)
+{
+ DEFiRet;
+
+ assert(pThis != NULL);
+
+ close(pThis->tVars.disk.fd);
+
+ return iRet;
+}
+
+rsRetVal qAddDisk(queue_t *pThis, void* pUsr)
+{
+ DEFiRet;
+ int i;
+
+ assert(pThis != NULL);
+ dbgprintf("writing to file %d\n", pThis->tVars.disk.fd);
+ i = write(pThis->tVars.disk.fd, "entry\n", 6);
+ dbgprintf("write wrote %d bytes, errno: %d, err %s\n", i, errno, strerror(errno));
+
+finalize_it:
+ return iRet;
+}
+
+rsRetVal qDelDisk(queue_t *pThis, void **ppUsr)
+{
+ DEFiRet;
+
+ iRet = RS_RET_ERR;
+
+ return iRet;
+}
+
/* --------------- end type-specific handlers -------------------- */
@@ -207,12 +277,18 @@ queueDel(queue_t *pThis, void *pUsr)
DEFiRet;
assert(pThis != NULL);
- CHKiRet(pThis->qDel(pThis, pUsr));
+
+ /* we do NOT abort if we encounter an error, because otherwise the queue
+ * will not be decremented, what will most probably result in an endless loop.
+ * If we decrement, however, we may lose a message. But that is better than
+ * losing the whole process because it loops... -- rgerhards, 2008-01-03
+ */
+ iRet = pThis->qDel(pThis, pUsr);
--pThis->iQueueSize;
- dbgprintf("Queue 0x%lx: entry deleted, size now %d entries\n", (unsigned long) pThis, pThis->iQueueSize);
+ dbgprintf("Queue 0x%lx: entry deleted, state %d, size now %d entries\n",
+ (unsigned long) pThis, iRet, pThis->iQueueSize);
-finalize_it:
return iRet;
}
@@ -230,6 +306,7 @@ finalize_it:
static void *
queueWorker(void *arg)
{
+ DEFiRet;
queue_t *pThis = (queue_t*) arg;
void *pUsr;
sigset_t sigSet;
@@ -247,12 +324,22 @@ queueWorker(void *arg)
}
if(pThis->iQueueSize > 0) {
/* dequeue element (still protected from mutex) */
- queueDel(pThis, &pUsr);
+ iRet = queueDel(pThis, &pUsr);
pthread_mutex_unlock(pThis->mut);
pthread_cond_signal (pThis->notFull);
- /* do actual processing (the lengthy part, runs in parallel) */
- dbgprintf("Worker for queue 0x%lx is running...\n", (unsigned long) pThis);
- pThis->pConsumer(pUsr);
+ /* do actual processing (the lengthy part, runs in parallel)
+ * If we had a problem while dequeing, we do not call the consumer,
+ * but we otherwise ignore it. This is in the hopes that it will be
+ * self-healing. Howerver, this is really not a good thing.
+ * rgerhards, 2008-01-03
+ */
+ if(iRet == RS_RET_OK) {
+ dbgprintf("Worker for queue 0x%lx is running...\n", (unsigned long) pThis);
+ pThis->pConsumer(pUsr);
+ } else {
+ dbgprintf("Queue 0x%lx: error %d dequeueing element - ignoring, but strange things "
+ "may happen\n", (unsigned long) pThis, iRet);
+ }
} else { /* the mutex must be unlocked in any case (important for termination) */
pthread_mutex_unlock(pThis->mut);
}
@@ -276,7 +363,7 @@ rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iMaxQueueSize,
assert(ppThis != NULL);
assert(pConsumer != NULL);
- if((pThis = (queue_t *)malloc(sizeof(queue_t))) == NULL) {
+ if((pThis = (queue_t *)calloc(1, sizeof(queue_t))) == NULL) {
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
}
@@ -306,6 +393,12 @@ rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iMaxQueueSize,
pThis->qAdd = qAddLinkedList;
pThis->qDel = qDelLinkedList;
break;
+ case QUEUETYPE_DISK:
+ pThis->qConstruct = qConstructDisk;
+ pThis->qDestruct = qDestructDisk;
+ pThis->qAdd = qAddDisk;
+ pThis->qDel = qDelDisk;
+ break;
}
/* call type-specific constructor */
diff --git a/queue.h b/queue.h
index d119eab7..989fe4a7 100644
--- a/queue.h
+++ b/queue.h
@@ -28,7 +28,8 @@
/* queue types */
typedef enum {
QUEUETYPE_FIXED_ARRAY = 0,/* a simple queue made out of a fixed (initially malloced) array fast but memoryhog */
- QUEUETYPE_LINKEDLIST = 1 /* linked list used as buffer, lower fixed memory overhead but slower */
+ QUEUETYPE_LINKEDLIST = 1, /* linked list used as buffer, lower fixed memory overhead but slower */
+ QUEUETYPE_DISK = 2 /* disk files used as buffer */
} queueType_t;
/* list member definition for linked list types of queues: */
@@ -64,6 +65,18 @@ typedef struct queue_s {
qLinkedList_t *pRoot;
qLinkedList_t *pLast;
} linklist;
+ struct {
+ rsRetVal (*serializer)(uchar **ppOutBuf, size_t *lenBuf, void *pUsr);
+ rsRetVal (*deSerializer)(void *ppUsr, uchar *ppBuf, size_t lenBuf);
+ uchar *pszSpoolDir;
+ size_t lenSpoolDir;
+ uchar *pszFilePrefix;
+ size_t lenFilePrefix;
+ int iCurrFileNum; /* number of file currently processed */
+ int fd; /* current file descriptor */
+ long iWritePos; /* next write position offset */
+ long iReadPos; /* next read position offset */
+ } disk;
} tVars;
} queue_t;
diff --git a/srUtils.c b/srUtils.c
index c10f58bb..a0dae0b7 100755
--- a/srUtils.c
+++ b/srUtils.c
@@ -7,7 +7,7 @@
* \date 2003-09-09
* Coding begun.
*
- * Copyright 2003-2007 Rainer Gerhards and Adiscon GmbH.
+ * Copyright 2003-2008 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of rsyslog.
*
@@ -238,6 +238,55 @@ void skipWhiteSpace(uchar **pp)
}
+/* generate a file name from four parts:
+ * <directory name>/<prefix>-<number>.<type>
+ * If number is negative, it is not used. If any of the strings is
+ * NULL, an empty string is used instead. Length must be provided.
+ * rgerhards, 2008-01-03
+ */
+rsRetVal genFileName(uchar **ppName, uchar *pDirName, size_t lenDirName,
+ uchar *pPrefix, size_t lenPrefix, long lNum, uchar *pType, size_t lenType)
+{
+ DEFiRet;
+ uchar *pName;
+ uchar *pNameWork;
+ size_t lenName;
+ uchar szBuf[128]; /* buffer for number */
+ size_t lenBuf;
+
+ if(lNum < 0) {
+ szBuf[0] = '\0';
+ lenBuf = 0;
+ } else {
+ lenBuf = snprintf((char*)szBuf, sizeof(szBuf), "-%ld", lNum);
+ }
+
+ lenName = lenDirName + 1 + lenPrefix + lenBuf + 1 + lenType + 1; /* last +1 for \0 char! */
+ if((pName = malloc(sizeof(uchar) * lenName)) == NULL)
+ ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
+
+ /* got memory, now construct string */
+ memcpy(pName, pDirName, lenDirName);
+ pNameWork = pName + lenDirName;
+ *pNameWork++ = '/';
+ memcpy(pNameWork, pPrefix, lenPrefix);
+ pNameWork += lenPrefix;
+ if(lenBuf > 0) {
+ memcpy(pNameWork, szBuf, lenBuf);
+ pNameWork += lenBuf;
+ }
+ *pNameWork++ = '.';
+ memcpy(pNameWork, pType, lenType);
+ pNameWork += lenType;
+ *pNameWork = '\0';
+
+ *ppName = pName;
+
+finalize_it:
+ return iRet;
+}
+
+
/*
* vi:set ai:
*/
diff --git a/srUtils.h b/srUtils.h
index 15cae89d..ae731dac 100755
--- a/srUtils.h
+++ b/srUtils.h
@@ -64,4 +64,6 @@ int makeFileParentDirs(uchar *szFile, size_t lenFile, mode_t mode, uid_t uid, gi
int execProg(uchar *program, int wait, uchar *arg);
void skipWhiteSpace(uchar **pp);
+rsRetVal genFileName(uchar **ppName, uchar *pDirName, size_t lenDirName,
+ uchar *pPrefix, size_t lenPrefix, long lNum, uchar *pType, size_t lenType);
#endif
diff --git a/syslogd.c b/syslogd.c
index 69763de3..a018768d 100644
--- a/syslogd.c
+++ b/syslogd.c
@@ -4205,6 +4205,9 @@ static rsRetVal setMainMsgQueType(void __attribute__((unused)) *pVal, uchar *psz
} else if (!strcasecmp((char *) pszType, "linkedlist")) {
MainMsgQueType = QUEUETYPE_LINKEDLIST;
dbgprintf("main message queue type set to LINKEDLIST\n");
+ } else if (!strcasecmp((char *) pszType, "disk")) {
+ MainMsgQueType = QUEUETYPE_DISK;
+ dbgprintf("main message queue type set to DISK\n");
} else {
logerrorSz("unknown mainmessagequeuetype parameter: %s", (char *) pszType);
iRet = RS_RET_INVALID_PARAMS;