From b95b5ab28407b75467c6cff63359cba9a0a3bd70 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 3 Jan 2008 17:37:28 +0000 Subject: begun working on disk queueing (not completed, do not use this mode!) --- queue.c | 109 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 101 insertions(+), 8 deletions(-) (limited to 'queue.c') diff --git a/queue.c b/queue.c index f4dc992c..bf0d1261 100644 --- a/queue.c +++ b/queue.c @@ -30,10 +30,14 @@ #include #include #include +#include +#include +#include #include "rsyslog.h" #include "syslogd.h" #include "queue.h" +#include "srUtils.h" /* static data */ @@ -179,6 +183,72 @@ rsRetVal qDelLinkedList(queue_t *pThis, void **ppUsr) return iRet; } + +/* -------------------- disk -------------------- */ + +rsRetVal qConstructDisk(queue_t *pThis) +{ + DEFiRet; + uchar *pszFile; + + assert(pThis != NULL); + + if((pThis->tVars.disk.pszSpoolDir = (uchar*) strdup((char*)pszSpoolDirectory)) == NULL) + ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); + + pThis->tVars.disk.lenSpoolDir = strlen((char*)pThis->tVars.disk.pszSpoolDir); + + /* now open the file */ + CHKiRet(genFileName(&pszFile, pThis->tVars.disk.pszSpoolDir, pThis->tVars.disk.lenSpoolDir, + (uchar*) "mainq", 5, 1, (uchar*) "qf", 2)); + + dbgprintf("Queue 0x%lx: opening file '%s'\n", pThis, pszFile); + + pThis->tVars.disk.fd = open((char*)pszFile, O_RDWR|O_CREAT, 0600); + dbgprintf("opened file %d\n", pThis->tVars.disk.fd); + +finalize_it: + if(pThis->tVars.disk.pszSpoolDir != NULL) + free(pThis->tVars.disk.pszSpoolDir); + + return iRet; +} + + +rsRetVal qDestructDisk(queue_t *pThis) +{ + DEFiRet; + + assert(pThis != NULL); + + close(pThis->tVars.disk.fd); + + return iRet; +} + +rsRetVal qAddDisk(queue_t *pThis, void* pUsr) +{ + DEFiRet; + int i; + + assert(pThis != NULL); + dbgprintf("writing to file %d\n", pThis->tVars.disk.fd); + i = write(pThis->tVars.disk.fd, "entry\n", 6); + dbgprintf("write wrote %d bytes, errno: %d, err %s\n", i, errno, strerror(errno)); + +finalize_it: + return iRet; +} + +rsRetVal qDelDisk(queue_t *pThis, void **ppUsr) +{ + DEFiRet; + + iRet = RS_RET_ERR; + + return iRet; +} + /* --------------- end type-specific handlers -------------------- */ @@ -207,12 +277,18 @@ queueDel(queue_t *pThis, void *pUsr) DEFiRet; assert(pThis != NULL); - CHKiRet(pThis->qDel(pThis, pUsr)); + + /* we do NOT abort if we encounter an error, because otherwise the queue + * will not be decremented, what will most probably result in an endless loop. + * If we decrement, however, we may lose a message. But that is better than + * losing the whole process because it loops... -- rgerhards, 2008-01-03 + */ + iRet = pThis->qDel(pThis, pUsr); --pThis->iQueueSize; - dbgprintf("Queue 0x%lx: entry deleted, size now %d entries\n", (unsigned long) pThis, pThis->iQueueSize); + dbgprintf("Queue 0x%lx: entry deleted, state %d, size now %d entries\n", + (unsigned long) pThis, iRet, pThis->iQueueSize); -finalize_it: return iRet; } @@ -230,6 +306,7 @@ finalize_it: static void * queueWorker(void *arg) { + DEFiRet; queue_t *pThis = (queue_t*) arg; void *pUsr; sigset_t sigSet; @@ -247,12 +324,22 @@ queueWorker(void *arg) } if(pThis->iQueueSize > 0) { /* dequeue element (still protected from mutex) */ - queueDel(pThis, &pUsr); + iRet = queueDel(pThis, &pUsr); pthread_mutex_unlock(pThis->mut); pthread_cond_signal (pThis->notFull); - /* do actual processing (the lengthy part, runs in parallel) */ - dbgprintf("Worker for queue 0x%lx is running...\n", (unsigned long) pThis); - pThis->pConsumer(pUsr); + /* do actual processing (the lengthy part, runs in parallel) + * If we had a problem while dequeing, we do not call the consumer, + * but we otherwise ignore it. This is in the hopes that it will be + * self-healing. Howerver, this is really not a good thing. + * rgerhards, 2008-01-03 + */ + if(iRet == RS_RET_OK) { + dbgprintf("Worker for queue 0x%lx is running...\n", (unsigned long) pThis); + pThis->pConsumer(pUsr); + } else { + dbgprintf("Queue 0x%lx: error %d dequeueing element - ignoring, but strange things " + "may happen\n", (unsigned long) pThis, iRet); + } } else { /* the mutex must be unlocked in any case (important for termination) */ pthread_mutex_unlock(pThis->mut); } @@ -276,7 +363,7 @@ rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iMaxQueueSize, assert(ppThis != NULL); assert(pConsumer != NULL); - if((pThis = (queue_t *)malloc(sizeof(queue_t))) == NULL) { + if((pThis = (queue_t *)calloc(1, sizeof(queue_t))) == NULL) { ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); } @@ -306,6 +393,12 @@ rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iMaxQueueSize, pThis->qAdd = qAddLinkedList; pThis->qDel = qDelLinkedList; break; + case QUEUETYPE_DISK: + pThis->qConstruct = qConstructDisk; + pThis->qDestruct = qDestructDisk; + pThis->qAdd = qAddDisk; + pThis->qDel = qDelDisk; + break; } /* call type-specific constructor */ -- cgit