summaryrefslogtreecommitdiffstats
path: root/queue.c
diff options
context:
space:
mode:
Diffstat (limited to 'queue.c')
-rw-r--r--queue.c109
1 files changed, 101 insertions, 8 deletions
diff --git a/queue.c b/queue.c
index f4dc992c..bf0d1261 100644
--- a/queue.c
+++ b/queue.c
@@ -30,10 +30,14 @@
#include <assert.h>
#include <signal.h>
#include <pthread.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <errno.h>
#include "rsyslog.h"
#include "syslogd.h"
#include "queue.h"
+#include "srUtils.h"
/* static data */
@@ -179,6 +183,72 @@ rsRetVal qDelLinkedList(queue_t *pThis, void **ppUsr)
return iRet;
}
+
+/* -------------------- disk -------------------- */
+
+rsRetVal qConstructDisk(queue_t *pThis)
+{
+ DEFiRet;
+ uchar *pszFile;
+
+ assert(pThis != NULL);
+
+ if((pThis->tVars.disk.pszSpoolDir = (uchar*) strdup((char*)pszSpoolDirectory)) == NULL)
+ ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
+
+ pThis->tVars.disk.lenSpoolDir = strlen((char*)pThis->tVars.disk.pszSpoolDir);
+
+ /* now open the file */
+ CHKiRet(genFileName(&pszFile, pThis->tVars.disk.pszSpoolDir, pThis->tVars.disk.lenSpoolDir,
+ (uchar*) "mainq", 5, 1, (uchar*) "qf", 2));
+
+ dbgprintf("Queue 0x%lx: opening file '%s'\n", pThis, pszFile);
+
+ pThis->tVars.disk.fd = open((char*)pszFile, O_RDWR|O_CREAT, 0600);
+ dbgprintf("opened file %d\n", pThis->tVars.disk.fd);
+
+finalize_it:
+ if(pThis->tVars.disk.pszSpoolDir != NULL)
+ free(pThis->tVars.disk.pszSpoolDir);
+
+ return iRet;
+}
+
+
+rsRetVal qDestructDisk(queue_t *pThis)
+{
+ DEFiRet;
+
+ assert(pThis != NULL);
+
+ close(pThis->tVars.disk.fd);
+
+ return iRet;
+}
+
+rsRetVal qAddDisk(queue_t *pThis, void* pUsr)
+{
+ DEFiRet;
+ int i;
+
+ assert(pThis != NULL);
+ dbgprintf("writing to file %d\n", pThis->tVars.disk.fd);
+ i = write(pThis->tVars.disk.fd, "entry\n", 6);
+ dbgprintf("write wrote %d bytes, errno: %d, err %s\n", i, errno, strerror(errno));
+
+finalize_it:
+ return iRet;
+}
+
+rsRetVal qDelDisk(queue_t *pThis, void **ppUsr)
+{
+ DEFiRet;
+
+ iRet = RS_RET_ERR;
+
+ return iRet;
+}
+
/* --------------- end type-specific handlers -------------------- */
@@ -207,12 +277,18 @@ queueDel(queue_t *pThis, void *pUsr)
DEFiRet;
assert(pThis != NULL);
- CHKiRet(pThis->qDel(pThis, pUsr));
+
+ /* we do NOT abort if we encounter an error, because otherwise the queue
+ * will not be decremented, what will most probably result in an endless loop.
+ * If we decrement, however, we may lose a message. But that is better than
+ * losing the whole process because it loops... -- rgerhards, 2008-01-03
+ */
+ iRet = pThis->qDel(pThis, pUsr);
--pThis->iQueueSize;
- dbgprintf("Queue 0x%lx: entry deleted, size now %d entries\n", (unsigned long) pThis, pThis->iQueueSize);
+ dbgprintf("Queue 0x%lx: entry deleted, state %d, size now %d entries\n",
+ (unsigned long) pThis, iRet, pThis->iQueueSize);
-finalize_it:
return iRet;
}
@@ -230,6 +306,7 @@ finalize_it:
static void *
queueWorker(void *arg)
{
+ DEFiRet;
queue_t *pThis = (queue_t*) arg;
void *pUsr;
sigset_t sigSet;
@@ -247,12 +324,22 @@ queueWorker(void *arg)
}
if(pThis->iQueueSize > 0) {
/* dequeue element (still protected from mutex) */
- queueDel(pThis, &pUsr);
+ iRet = queueDel(pThis, &pUsr);
pthread_mutex_unlock(pThis->mut);
pthread_cond_signal (pThis->notFull);
- /* do actual processing (the lengthy part, runs in parallel) */
- dbgprintf("Worker for queue 0x%lx is running...\n", (unsigned long) pThis);
- pThis->pConsumer(pUsr);
+ /* do actual processing (the lengthy part, runs in parallel)
+ * If we had a problem while dequeing, we do not call the consumer,
+ * but we otherwise ignore it. This is in the hopes that it will be
+ * self-healing. Howerver, this is really not a good thing.
+ * rgerhards, 2008-01-03
+ */
+ if(iRet == RS_RET_OK) {
+ dbgprintf("Worker for queue 0x%lx is running...\n", (unsigned long) pThis);
+ pThis->pConsumer(pUsr);
+ } else {
+ dbgprintf("Queue 0x%lx: error %d dequeueing element - ignoring, but strange things "
+ "may happen\n", (unsigned long) pThis, iRet);
+ }
} else { /* the mutex must be unlocked in any case (important for termination) */
pthread_mutex_unlock(pThis->mut);
}
@@ -276,7 +363,7 @@ rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iMaxQueueSize,
assert(ppThis != NULL);
assert(pConsumer != NULL);
- if((pThis = (queue_t *)malloc(sizeof(queue_t))) == NULL) {
+ if((pThis = (queue_t *)calloc(1, sizeof(queue_t))) == NULL) {
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
}
@@ -306,6 +393,12 @@ rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iMaxQueueSize,
pThis->qAdd = qAddLinkedList;
pThis->qDel = qDelLinkedList;
break;
+ case QUEUETYPE_DISK:
+ pThis->qConstruct = qConstructDisk;
+ pThis->qDestruct = qDestructDisk;
+ pThis->qAdd = qAddDisk;
+ pThis->qDel = qDelDisk;
+ break;
}
/* call type-specific constructor */