diff options
-rw-r--r-- | cfsysline.c | 92 | ||||
-rw-r--r-- | cfsysline.h | 1 | ||||
-rw-r--r-- | queue.c | 32 | ||||
-rw-r--r-- | queue.h | 5 | ||||
-rw-r--r-- | rsyslog.h | 1 | ||||
-rw-r--r-- | syslogd.c | 21 |
6 files changed, 135 insertions, 17 deletions
diff --git a/cfsysline.c b/cfsysline.c index ff44af68..337d362a 100644 --- a/cfsysline.c +++ b/cfsysline.c @@ -96,17 +96,20 @@ finalize_it: } -/* Parse a number from the configuration line. - * rgerhards, 2007-07-31 +/* Parse a number from the configuration line. This functions just parses + * the number and does NOT call any handlers or set any values. It is just + * for INTERNAL USE by other parse functions! + * rgerhards, 2008-01-08 */ -static rsRetVal doGetInt(uchar **pp, rsRetVal (*pSetHdlr)(void*, uid_t), void *pVal) +static rsRetVal parseIntVal(uchar **pp, size_t *pVal) { uchar *p; DEFiRet; - int i; + size_t i; assert(pp != NULL); assert(*pp != NULL); + assert(pVal != NULL); skipWhiteSpace(pp); /* skip over any whitespace */ p = *pp; @@ -121,12 +124,35 @@ static rsRetVal doGetInt(uchar **pp, rsRetVal (*pSetHdlr)(void*, uid_t), void *p for(i = 0 ; *p && isdigit((int) *p) ; ++p) i = i * 10 + *p - '0'; + *pVal = i; + *pp = p; + +finalize_it: + return iRet; +} + + +/* Parse a number from the configuration line. + * rgerhards, 2007-07-31 + */ +static rsRetVal doGetInt(uchar **pp, rsRetVal (*pSetHdlr)(void*, uid_t), void *pVal) +{ + uchar *p; + DEFiRet; + size_t i; + + assert(pp != NULL); + assert(*pp != NULL); + + CHKiRet(parseIntVal(pp, &i)); + p = *pp; + if(pSetHdlr == NULL) { /* we should set value directly to var */ - *((int*)pVal) = i; + *((int*)pVal) = (int) i; } else { /* we set value via a set function */ - CHKiRet(pSetHdlr(pVal, i)); + CHKiRet(pSetHdlr(pVal, (int) i)); } *pp = p; @@ -136,6 +162,57 @@ finalize_it: } +/* Parse a size from the configuration line. This is basically an integer + * syntax, but modifiers may be added after the integer (e.g. 1k to mean + * 1024). The size must immediately follow the number. Note that the + * param value must be size_t! + * rgerhards, 2008-01-09 + */ +static rsRetVal doGetSize(uchar **pp, rsRetVal (*pSetHdlr)(void*, uid_t), void *pVal) +{ + DEFiRet; + size_t i; + + assert(pp != NULL); + assert(*pp != NULL); + + CHKiRet(parseIntVal(pp, &i)); + + /* we now check if the next character is one of our known modifiers. + * If so, we accept it as such. If not, we leave it alone. tera and + * above does not make any sense as that is above a 32-bit int value. + */ + switch(**pp) { + /* traditional binary-based definitions */ + case 'k': i *= 1024; ++(*pp); break; + case 'm': i *= 1024 * 1024; ++(*pp); break; + case 'g': i *= 1024 * 1024 * 1024; ++(*pp); break; + case 't': i *= (size_t) 1024 * 1024 * 1024 * 1024; ++(*pp); break; /* tera */ + case 'p': i *= (size_t) 1024 * 1024 * 1024 * 1024 * 1024; ++(*pp); break; /* peta */ + case 'e': i *= (size_t) 1024 * 1024 * 1024 * 1024 * 1024 * 1024; ++(*pp); break; /* exa */ + /* and now the "new" 1000-based definitions */ + case 'K': i *= 1000; ++(*pp); break; + case 'M': i *= 10000; ++(*pp); break; + case 'G': i *= 100000; ++(*pp); break; + case 'T': i *= 1000000; ++(*pp); break; /* tera */ + case 'P': i *= 10000000; ++(*pp); break; /* peta */ + case 'E': i *= 100000000; ++(*pp); break; /* exa */ + } + + /* done */ + if(pSetHdlr == NULL) { + /* we should set value directly to var */ + *((size_t*)pVal) = i; + } else { + /* we set value via a set function */ + CHKiRet(pSetHdlr(pVal, i)); + } + +finalize_it: + return iRet; +} + + /* Parse and interpet a $FileCreateMode and $umask line. This function * pulls the creation mode and, if successful, stores it * into the global variable so that the rest of rsyslogd @@ -510,6 +587,9 @@ static rsRetVal cslchCallHdlr(cslCmdHdlr_t *pThis, uchar **ppConfLine) case eCmdHdlrInt: pHdlr = doGetInt; break; + case eCmdHdlrSize: + pHdlr = doGetSize; + break; case eCmdHdlrGetChar: pHdlr = doGetChar; break; diff --git a/cfsysline.h b/cfsysline.h index 5c7c643a..2aa54595 100644 --- a/cfsysline.h +++ b/cfsysline.h @@ -35,6 +35,7 @@ typedef enum cslCmdHdlrType { eCmdHdlrBinary, eCmdHdlrFileCreateMode, eCmdHdlrInt, + eCmdHdlrSize, eCmdHdlrGetChar, eCmdHdlrGetWord } ecslCmdHdrlType; @@ -380,7 +380,7 @@ static rsRetVal qConstructDisk(queue_t *pThis) ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); pThis->tVars.disk.lenSpoolDir = strlen((char*)pThis->tVars.disk.pszSpoolDir); - pThis->tVars.disk.iMaxFileSize = 10240000; //1024 * 3; // TODO: configurable! + pThis->tVars.disk.iMaxFileSize = 1024 * 1024; /* default is 1 MiB */ pThis->tVars.disk.fWrite.iCurrFileNum = 1; pThis->tVars.disk.fWrite.iCurrOffs = 0; @@ -712,6 +712,11 @@ rsRetVal queueStart(queue_t *pThis) int iState; int i; + assert(pThis != NULL); + + dbgprintf("Queue 0x%lx: type %d, maxFileSz %ld starting\n", (unsigned long) pThis, pThis->qType, + pThis->tVars.disk.iMaxFileSize); + if(pThis->qType != QUEUETYPE_DIRECT) { if((pThis->pWorkerThreads = calloc(pThis->iNumWorkerThreads, sizeof(pthread_t))) == NULL) ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); @@ -720,8 +725,8 @@ rsRetVal queueStart(queue_t *pThis) pThis->bDoRun = 1; /* we are NOT done (else worker would immediately terminate) */ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) { iState = pthread_create(&(pThis->pWorkerThreads[i]), NULL, queueWorker, (void*) pThis); - dbgprintf("Worker thread %d for queue 0x%lx, type %d started with state %d.\n", - i, (unsigned long) pThis, (int) pThis->qType, iState); + dbgprintf("Queue 0x%lx: Worker thread %x, index %d started with state %d.\n", + (unsigned long) pThis, (unsigned) pThis->pWorkerThreads[i], i, iState); } } @@ -770,6 +775,27 @@ rsRetVal queueDestruct(queue_t *pThis) } +/* set the queue's maximum file size + * rgerhards, 2008-01-09 + */ +rsRetVal +queueSetMaxFileSize(queue_t *pThis, size_t iMaxFileSize) +{ + DEFiRet; + + assert(pThis != 0); + + if(iMaxFileSize < 1024) { + ABORT_FINALIZE(RS_RET_VALUE_TOO_LOW); + } + + pThis->tVars.disk.iMaxFileSize = iMaxFileSize; + +finalize_it: + return iRet; +} + + /* enqueue a new user data element * Enqueues the new element and awakes worker thread. * TODO: this code still uses the "discard if queue full" approach from @@ -35,7 +35,7 @@ typedef struct { int fd; /* the file descriptor, -1 if closed */ uchar *pszFileName; /* name of current file (if open) */ int iCurrFileNum;/* current file number (NOT descriptor, but the number in the file name!) */ - int iCurrOffs; /* current offset */ + size_t iCurrOffs;/* current offset */ uchar *pIOBuf; /* io Buffer */ int iBufPtrMax; /* current max Ptr in Buffer (if partial read!) */ int iBufPtr; /* pointer into current buffer */ @@ -93,7 +93,7 @@ typedef struct queue_s { uchar *pszFilePrefix; size_t lenFilePrefix; int iNumberFiles; /* how many files make up the queue? */ - int iMaxFileSize; /* max size for a single queue file */ + size_t iMaxFileSize; /* max size for a single queue file */ queueFileDescription_t fWrite; /* current file to be written */ queueFileDescription_t fRead; /* current file to be read */ } disk; @@ -104,6 +104,7 @@ typedef struct queue_s { rsRetVal queueDestruct(queue_t *pThis); rsRetVal queueEnqObj(queue_t *pThis, void *pUsr); rsRetVal queueStart(queue_t *pThis); +rsRetVal queueSetMaxFileSize(queue_t *pThis, size_t iMaxFileSize); rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iWorkerThreads, int iMaxQueueSize, rsRetVal (*pConsumer)(void*)); @@ -103,6 +103,7 @@ enum rsRetVal_ /** return value. All methods return this if not specified oth RS_RET_INVALID_PROPFRAME = -2032, /**< invalid framing in serialized property */ RS_RET_NO_PROPLINE = -2033, /**< line is not a property line */ RS_RET_INVALID_TRAILER = -2034, /**< invalid trailer */ + RS_RET_VALUE_TOO_LOW = -2035, /**< a provided value is too low */ RS_RET_OK_DELETE_LISTENTRY = 1, /**< operation successful, but callee requested the deletion of an entry (special state) */ RS_RET_TERMINATE_NOW = 2, /**< operation successful, function is requested to terminate (mostly used with threads) */ RS_RET_NO_RUN = 3, /**< operation successful, but function does not like to be executed */ @@ -381,6 +381,7 @@ static int logEveryMsg = 0;/* no repeat message processing - read-only after st */ uchar *pszSpoolDirectory = NULL;/* name of rsyslog's spool directory (without trailing slash) */ uchar *pszMainMsgQFilePrefix = NULL;/* prefix for the main message queue file */ +size_t iMainMsgQueMaxFileSize = 1024*1024; /* end global config file state variables */ static unsigned int Forwarding = 0; @@ -519,6 +520,7 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a pszMainMsgQFilePrefix = NULL; } iMainMsgQueueSize = 10000; + iMainMsgQueMaxFileSize = 1024 * 1024; iMainMsgQueueNumWorkers = 1; MainMsgQueType = QUEUETYPE_FIXED_ARRAY; @@ -3357,6 +3359,12 @@ init(void) fprintf(stderr, "fatal error %d: could not create message queue - rsyslogd can not run!\n", iRet); exit(1); } + /* ... set some properties ... */ + CHKiRet_Hdlr(queueSetMaxFileSize(pMsgQueue, iMainMsgQueMaxFileSize)) { + logerrorInt("Invalid $MainMsgQueueMaxFileSize, error %d. Ignored, running with default setting", iRet); + } + + /* ... and finally start the queue! */ CHKiRet_Hdlr(queueStart(pMsgQueue)) { /* no queue is fatal, we need to give up in that case... */ fprintf(stderr, "fatal error %d: could not start message queue - rsyslogd can not run!\n", iRet); @@ -4278,17 +4286,17 @@ dbgprintf(char *fmt, ...) * rgerhards, 2007-06-15 */ if(bWasNL) { - //fprintf(stdout, "%8.8x: ", (unsigned int) pthread_self()); - fprintf(stderr, "%8.8x: ", (unsigned int) pthread_self()); + fprintf(stdout, "%8.8x: ", (unsigned int) pthread_self()); + //fprintf(stderr, "%8.8x: ", (unsigned int) pthread_self()); } bWasNL = (*(fmt + strlen(fmt) - 1) == '\n') ? TRUE : FALSE; va_start(ap, fmt); - //vfprintf(stdout, fmt, ap); - vfprintf(stderr, fmt, ap); + vfprintf(stdout, fmt, ap); + //vfprintf(stderr, fmt, ap); va_end(ap); - fflush(stderr); - //fflush(stdout); + //fflush(stderr); + fflush(stdout); return; } @@ -4496,6 +4504,7 @@ static rsRetVal loadBuildInModules(void) CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuesize", 0, eCmdHdlrInt, NULL, &iMainMsgQueueSize, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuetype", 0, eCmdHdlrGetWord, setMainMsgQueType, NULL, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueueworkerthreads", 0, eCmdHdlrInt, NULL, &iMainMsgQueueNumWorkers, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuemaxfilesize", 0, eCmdHdlrSize, NULL, &iMainMsgQueMaxFileSize, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"repeatedmsgreduction", 0, eCmdHdlrBinary, NULL, &bReduceRepeatMsgs, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlywhenpreviousissuspended", 0, eCmdHdlrBinary, NULL, &bActExecWhenPrevSusp, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionresumeinterval", 0, eCmdHdlrInt, setActionResumeInterval, NULL, NULL)); |