diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2008-01-03 17:37:28 +0000 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2008-01-03 17:37:28 +0000 |
commit | b95b5ab28407b75467c6cff63359cba9a0a3bd70 (patch) | |
tree | e5ec5564b97690f1a278f1d693f7390d5818623d | |
parent | 64de2f0d2e0dd61dc9703a4ffd62f41f5cf42caa (diff) | |
download | rsyslog-b95b5ab28407b75467c6cff63359cba9a0a3bd70.tar.gz rsyslog-b95b5ab28407b75467c6cff63359cba9a0a3bd70.tar.xz rsyslog-b95b5ab28407b75467c6cff63359cba9a0a3bd70.zip |
begun working on disk queueing (not completed, do not use this mode!)
-rw-r--r-- | queue.c | 109 | ||||
-rw-r--r-- | queue.h | 15 | ||||
-rwxr-xr-x | srUtils.c | 51 | ||||
-rwxr-xr-x | srUtils.h | 2 | ||||
-rw-r--r-- | syslogd.c | 3 |
5 files changed, 170 insertions, 10 deletions
@@ -30,10 +30,14 @@ #include <assert.h> #include <signal.h> #include <pthread.h> +#include <fcntl.h> +#include <unistd.h> +#include <errno.h> #include "rsyslog.h" #include "syslogd.h" #include "queue.h" +#include "srUtils.h" /* static data */ @@ -179,6 +183,72 @@ rsRetVal qDelLinkedList(queue_t *pThis, void **ppUsr) return iRet; } + +/* -------------------- disk -------------------- */ + +rsRetVal qConstructDisk(queue_t *pThis) +{ + DEFiRet; + uchar *pszFile; + + assert(pThis != NULL); + + if((pThis->tVars.disk.pszSpoolDir = (uchar*) strdup((char*)pszSpoolDirectory)) == NULL) + ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); + + pThis->tVars.disk.lenSpoolDir = strlen((char*)pThis->tVars.disk.pszSpoolDir); + + /* now open the file */ + CHKiRet(genFileName(&pszFile, pThis->tVars.disk.pszSpoolDir, pThis->tVars.disk.lenSpoolDir, + (uchar*) "mainq", 5, 1, (uchar*) "qf", 2)); + + dbgprintf("Queue 0x%lx: opening file '%s'\n", pThis, pszFile); + + pThis->tVars.disk.fd = open((char*)pszFile, O_RDWR|O_CREAT, 0600); + dbgprintf("opened file %d\n", pThis->tVars.disk.fd); + +finalize_it: + if(pThis->tVars.disk.pszSpoolDir != NULL) + free(pThis->tVars.disk.pszSpoolDir); + + return iRet; +} + + +rsRetVal qDestructDisk(queue_t *pThis) +{ + DEFiRet; + + assert(pThis != NULL); + + close(pThis->tVars.disk.fd); + + return iRet; +} + +rsRetVal qAddDisk(queue_t *pThis, void* pUsr) +{ + DEFiRet; + int i; + + assert(pThis != NULL); + dbgprintf("writing to file %d\n", pThis->tVars.disk.fd); + i = write(pThis->tVars.disk.fd, "entry\n", 6); + dbgprintf("write wrote %d bytes, errno: %d, err %s\n", i, errno, strerror(errno)); + +finalize_it: + return iRet; +} + +rsRetVal qDelDisk(queue_t *pThis, void **ppUsr) +{ + DEFiRet; + + iRet = RS_RET_ERR; + + return iRet; +} + /* --------------- end type-specific handlers -------------------- */ @@ -207,12 +277,18 @@ queueDel(queue_t *pThis, void *pUsr) DEFiRet; assert(pThis != NULL); - CHKiRet(pThis->qDel(pThis, pUsr)); + + /* we do NOT abort if we encounter an error, because otherwise the queue + * will not be decremented, what will most probably result in an endless loop. + * If we decrement, however, we may lose a message. But that is better than + * losing the whole process because it loops... -- rgerhards, 2008-01-03 + */ + iRet = pThis->qDel(pThis, pUsr); --pThis->iQueueSize; - dbgprintf("Queue 0x%lx: entry deleted, size now %d entries\n", (unsigned long) pThis, pThis->iQueueSize); + dbgprintf("Queue 0x%lx: entry deleted, state %d, size now %d entries\n", + (unsigned long) pThis, iRet, pThis->iQueueSize); -finalize_it: return iRet; } @@ -230,6 +306,7 @@ finalize_it: static void * queueWorker(void *arg) { + DEFiRet; queue_t *pThis = (queue_t*) arg; void *pUsr; sigset_t sigSet; @@ -247,12 +324,22 @@ queueWorker(void *arg) } if(pThis->iQueueSize > 0) { /* dequeue element (still protected from mutex) */ - queueDel(pThis, &pUsr); + iRet = queueDel(pThis, &pUsr); pthread_mutex_unlock(pThis->mut); pthread_cond_signal (pThis->notFull); - /* do actual processing (the lengthy part, runs in parallel) */ - dbgprintf("Worker for queue 0x%lx is running...\n", (unsigned long) pThis); - pThis->pConsumer(pUsr); + /* do actual processing (the lengthy part, runs in parallel) + * If we had a problem while dequeing, we do not call the consumer, + * but we otherwise ignore it. This is in the hopes that it will be + * self-healing. Howerver, this is really not a good thing. + * rgerhards, 2008-01-03 + */ + if(iRet == RS_RET_OK) { + dbgprintf("Worker for queue 0x%lx is running...\n", (unsigned long) pThis); + pThis->pConsumer(pUsr); + } else { + dbgprintf("Queue 0x%lx: error %d dequeueing element - ignoring, but strange things " + "may happen\n", (unsigned long) pThis, iRet); + } } else { /* the mutex must be unlocked in any case (important for termination) */ pthread_mutex_unlock(pThis->mut); } @@ -276,7 +363,7 @@ rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iMaxQueueSize, assert(ppThis != NULL); assert(pConsumer != NULL); - if((pThis = (queue_t *)malloc(sizeof(queue_t))) == NULL) { + if((pThis = (queue_t *)calloc(1, sizeof(queue_t))) == NULL) { ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); } @@ -306,6 +393,12 @@ rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iMaxQueueSize, pThis->qAdd = qAddLinkedList; pThis->qDel = qDelLinkedList; break; + case QUEUETYPE_DISK: + pThis->qConstruct = qConstructDisk; + pThis->qDestruct = qDestructDisk; + pThis->qAdd = qAddDisk; + pThis->qDel = qDelDisk; + break; } /* call type-specific constructor */ @@ -28,7 +28,8 @@ /* queue types */ typedef enum { QUEUETYPE_FIXED_ARRAY = 0,/* a simple queue made out of a fixed (initially malloced) array fast but memoryhog */ - QUEUETYPE_LINKEDLIST = 1 /* linked list used as buffer, lower fixed memory overhead but slower */ + QUEUETYPE_LINKEDLIST = 1, /* linked list used as buffer, lower fixed memory overhead but slower */ + QUEUETYPE_DISK = 2 /* disk files used as buffer */ } queueType_t; /* list member definition for linked list types of queues: */ @@ -64,6 +65,18 @@ typedef struct queue_s { qLinkedList_t *pRoot; qLinkedList_t *pLast; } linklist; + struct { + rsRetVal (*serializer)(uchar **ppOutBuf, size_t *lenBuf, void *pUsr); + rsRetVal (*deSerializer)(void *ppUsr, uchar *ppBuf, size_t lenBuf); + uchar *pszSpoolDir; + size_t lenSpoolDir; + uchar *pszFilePrefix; + size_t lenFilePrefix; + int iCurrFileNum; /* number of file currently processed */ + int fd; /* current file descriptor */ + long iWritePos; /* next write position offset */ + long iReadPos; /* next read position offset */ + } disk; } tVars; } queue_t; @@ -7,7 +7,7 @@ * \date 2003-09-09 * Coding begun. * - * Copyright 2003-2007 Rainer Gerhards and Adiscon GmbH. + * Copyright 2003-2008 Rainer Gerhards and Adiscon GmbH. * * This file is part of rsyslog. * @@ -238,6 +238,55 @@ void skipWhiteSpace(uchar **pp) } +/* generate a file name from four parts: + * <directory name>/<prefix>-<number>.<type> + * If number is negative, it is not used. If any of the strings is + * NULL, an empty string is used instead. Length must be provided. + * rgerhards, 2008-01-03 + */ +rsRetVal genFileName(uchar **ppName, uchar *pDirName, size_t lenDirName, + uchar *pPrefix, size_t lenPrefix, long lNum, uchar *pType, size_t lenType) +{ + DEFiRet; + uchar *pName; + uchar *pNameWork; + size_t lenName; + uchar szBuf[128]; /* buffer for number */ + size_t lenBuf; + + if(lNum < 0) { + szBuf[0] = '\0'; + lenBuf = 0; + } else { + lenBuf = snprintf((char*)szBuf, sizeof(szBuf), "-%ld", lNum); + } + + lenName = lenDirName + 1 + lenPrefix + lenBuf + 1 + lenType + 1; /* last +1 for \0 char! */ + if((pName = malloc(sizeof(uchar) * lenName)) == NULL) + ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); + + /* got memory, now construct string */ + memcpy(pName, pDirName, lenDirName); + pNameWork = pName + lenDirName; + *pNameWork++ = '/'; + memcpy(pNameWork, pPrefix, lenPrefix); + pNameWork += lenPrefix; + if(lenBuf > 0) { + memcpy(pNameWork, szBuf, lenBuf); + pNameWork += lenBuf; + } + *pNameWork++ = '.'; + memcpy(pNameWork, pType, lenType); + pNameWork += lenType; + *pNameWork = '\0'; + + *ppName = pName; + +finalize_it: + return iRet; +} + + /* * vi:set ai: */ @@ -64,4 +64,6 @@ int makeFileParentDirs(uchar *szFile, size_t lenFile, mode_t mode, uid_t uid, gi int execProg(uchar *program, int wait, uchar *arg); void skipWhiteSpace(uchar **pp); +rsRetVal genFileName(uchar **ppName, uchar *pDirName, size_t lenDirName, + uchar *pPrefix, size_t lenPrefix, long lNum, uchar *pType, size_t lenType); #endif @@ -4205,6 +4205,9 @@ static rsRetVal setMainMsgQueType(void __attribute__((unused)) *pVal, uchar *psz } else if (!strcasecmp((char *) pszType, "linkedlist")) { MainMsgQueType = QUEUETYPE_LINKEDLIST; dbgprintf("main message queue type set to LINKEDLIST\n"); + } else if (!strcasecmp((char *) pszType, "disk")) { + MainMsgQueType = QUEUETYPE_DISK; + dbgprintf("main message queue type set to DISK\n"); } else { logerrorSz("unknown mainmessagequeuetype parameter: %s", (char *) pszType); iRet = RS_RET_INVALID_PARAMS; |