summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2008-01-09 08:25:25 +0000
committerRainer Gerhards <rgerhards@adiscon.com>2008-01-09 08:25:25 +0000
commitfbd4ecdce40d1164c7cbdd55c672a83755e95482 (patch)
treed2e75ea5c1db839157e0e216685640a2e15451f7
parent0964658cc8a276bbdfce335d08898ee4097e87dc (diff)
downloadrsyslog-fbd4ecdce40d1164c7cbdd55c672a83755e95482.tar.gz
rsyslog-fbd4ecdce40d1164c7cbdd55c672a83755e95482.tar.xz
rsyslog-fbd4ecdce40d1164c7cbdd55c672a83755e95482.zip
- implemented new GetSize() handler for config files
- implemented $MainMsgQueueMaxFileSize configuration directive
-rw-r--r--cfsysline.c92
-rw-r--r--cfsysline.h1
-rw-r--r--queue.c32
-rw-r--r--queue.h5
-rw-r--r--rsyslog.h1
-rw-r--r--syslogd.c21
6 files changed, 135 insertions, 17 deletions
diff --git a/cfsysline.c b/cfsysline.c
index ff44af68..337d362a 100644
--- a/cfsysline.c
+++ b/cfsysline.c
@@ -96,17 +96,20 @@ finalize_it:
}
-/* Parse a number from the configuration line.
- * rgerhards, 2007-07-31
+/* Parse a number from the configuration line. This functions just parses
+ * the number and does NOT call any handlers or set any values. It is just
+ * for INTERNAL USE by other parse functions!
+ * rgerhards, 2008-01-08
*/
-static rsRetVal doGetInt(uchar **pp, rsRetVal (*pSetHdlr)(void*, uid_t), void *pVal)
+static rsRetVal parseIntVal(uchar **pp, size_t *pVal)
{
uchar *p;
DEFiRet;
- int i;
+ size_t i;
assert(pp != NULL);
assert(*pp != NULL);
+ assert(pVal != NULL);
skipWhiteSpace(pp); /* skip over any whitespace */
p = *pp;
@@ -121,12 +124,35 @@ static rsRetVal doGetInt(uchar **pp, rsRetVal (*pSetHdlr)(void*, uid_t), void *p
for(i = 0 ; *p && isdigit((int) *p) ; ++p)
i = i * 10 + *p - '0';
+ *pVal = i;
+ *pp = p;
+
+finalize_it:
+ return iRet;
+}
+
+
+/* Parse a number from the configuration line.
+ * rgerhards, 2007-07-31
+ */
+static rsRetVal doGetInt(uchar **pp, rsRetVal (*pSetHdlr)(void*, uid_t), void *pVal)
+{
+ uchar *p;
+ DEFiRet;
+ size_t i;
+
+ assert(pp != NULL);
+ assert(*pp != NULL);
+
+ CHKiRet(parseIntVal(pp, &i));
+ p = *pp;
+
if(pSetHdlr == NULL) {
/* we should set value directly to var */
- *((int*)pVal) = i;
+ *((int*)pVal) = (int) i;
} else {
/* we set value via a set function */
- CHKiRet(pSetHdlr(pVal, i));
+ CHKiRet(pSetHdlr(pVal, (int) i));
}
*pp = p;
@@ -136,6 +162,57 @@ finalize_it:
}
+/* Parse a size from the configuration line. This is basically an integer
+ * syntax, but modifiers may be added after the integer (e.g. 1k to mean
+ * 1024). The size must immediately follow the number. Note that the
+ * param value must be size_t!
+ * rgerhards, 2008-01-09
+ */
+static rsRetVal doGetSize(uchar **pp, rsRetVal (*pSetHdlr)(void*, uid_t), void *pVal)
+{
+ DEFiRet;
+ size_t i;
+
+ assert(pp != NULL);
+ assert(*pp != NULL);
+
+ CHKiRet(parseIntVal(pp, &i));
+
+ /* we now check if the next character is one of our known modifiers.
+ * If so, we accept it as such. If not, we leave it alone. tera and
+ * above does not make any sense as that is above a 32-bit int value.
+ */
+ switch(**pp) {
+ /* traditional binary-based definitions */
+ case 'k': i *= 1024; ++(*pp); break;
+ case 'm': i *= 1024 * 1024; ++(*pp); break;
+ case 'g': i *= 1024 * 1024 * 1024; ++(*pp); break;
+ case 't': i *= (size_t) 1024 * 1024 * 1024 * 1024; ++(*pp); break; /* tera */
+ case 'p': i *= (size_t) 1024 * 1024 * 1024 * 1024 * 1024; ++(*pp); break; /* peta */
+ case 'e': i *= (size_t) 1024 * 1024 * 1024 * 1024 * 1024 * 1024; ++(*pp); break; /* exa */
+ /* and now the "new" 1000-based definitions */
+ case 'K': i *= 1000; ++(*pp); break;
+ case 'M': i *= 10000; ++(*pp); break;
+ case 'G': i *= 100000; ++(*pp); break;
+ case 'T': i *= 1000000; ++(*pp); break; /* tera */
+ case 'P': i *= 10000000; ++(*pp); break; /* peta */
+ case 'E': i *= 100000000; ++(*pp); break; /* exa */
+ }
+
+ /* done */
+ if(pSetHdlr == NULL) {
+ /* we should set value directly to var */
+ *((size_t*)pVal) = i;
+ } else {
+ /* we set value via a set function */
+ CHKiRet(pSetHdlr(pVal, i));
+ }
+
+finalize_it:
+ return iRet;
+}
+
+
/* Parse and interpet a $FileCreateMode and $umask line. This function
* pulls the creation mode and, if successful, stores it
* into the global variable so that the rest of rsyslogd
@@ -510,6 +587,9 @@ static rsRetVal cslchCallHdlr(cslCmdHdlr_t *pThis, uchar **ppConfLine)
case eCmdHdlrInt:
pHdlr = doGetInt;
break;
+ case eCmdHdlrSize:
+ pHdlr = doGetSize;
+ break;
case eCmdHdlrGetChar:
pHdlr = doGetChar;
break;
diff --git a/cfsysline.h b/cfsysline.h
index 5c7c643a..2aa54595 100644
--- a/cfsysline.h
+++ b/cfsysline.h
@@ -35,6 +35,7 @@ typedef enum cslCmdHdlrType {
eCmdHdlrBinary,
eCmdHdlrFileCreateMode,
eCmdHdlrInt,
+ eCmdHdlrSize,
eCmdHdlrGetChar,
eCmdHdlrGetWord
} ecslCmdHdrlType;
diff --git a/queue.c b/queue.c
index 7bd64d41..9b2bf7e2 100644
--- a/queue.c
+++ b/queue.c
@@ -380,7 +380,7 @@ static rsRetVal qConstructDisk(queue_t *pThis)
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
pThis->tVars.disk.lenSpoolDir = strlen((char*)pThis->tVars.disk.pszSpoolDir);
- pThis->tVars.disk.iMaxFileSize = 10240000; //1024 * 3; // TODO: configurable!
+ pThis->tVars.disk.iMaxFileSize = 1024 * 1024; /* default is 1 MiB */
pThis->tVars.disk.fWrite.iCurrFileNum = 1;
pThis->tVars.disk.fWrite.iCurrOffs = 0;
@@ -712,6 +712,11 @@ rsRetVal queueStart(queue_t *pThis)
int iState;
int i;
+ assert(pThis != NULL);
+
+ dbgprintf("Queue 0x%lx: type %d, maxFileSz %ld starting\n", (unsigned long) pThis, pThis->qType,
+ pThis->tVars.disk.iMaxFileSize);
+
if(pThis->qType != QUEUETYPE_DIRECT) {
if((pThis->pWorkerThreads = calloc(pThis->iNumWorkerThreads, sizeof(pthread_t))) == NULL)
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
@@ -720,8 +725,8 @@ rsRetVal queueStart(queue_t *pThis)
pThis->bDoRun = 1; /* we are NOT done (else worker would immediately terminate) */
for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
iState = pthread_create(&(pThis->pWorkerThreads[i]), NULL, queueWorker, (void*) pThis);
- dbgprintf("Worker thread %d for queue 0x%lx, type %d started with state %d.\n",
- i, (unsigned long) pThis, (int) pThis->qType, iState);
+ dbgprintf("Queue 0x%lx: Worker thread %x, index %d started with state %d.\n",
+ (unsigned long) pThis, (unsigned) pThis->pWorkerThreads[i], i, iState);
}
}
@@ -770,6 +775,27 @@ rsRetVal queueDestruct(queue_t *pThis)
}
+/* set the queue's maximum file size
+ * rgerhards, 2008-01-09
+ */
+rsRetVal
+queueSetMaxFileSize(queue_t *pThis, size_t iMaxFileSize)
+{
+ DEFiRet;
+
+ assert(pThis != 0);
+
+ if(iMaxFileSize < 1024) {
+ ABORT_FINALIZE(RS_RET_VALUE_TOO_LOW);
+ }
+
+ pThis->tVars.disk.iMaxFileSize = iMaxFileSize;
+
+finalize_it:
+ return iRet;
+}
+
+
/* enqueue a new user data element
* Enqueues the new element and awakes worker thread.
* TODO: this code still uses the "discard if queue full" approach from
diff --git a/queue.h b/queue.h
index eb8ef8f6..a2c904cc 100644
--- a/queue.h
+++ b/queue.h
@@ -35,7 +35,7 @@ typedef struct {
int fd; /* the file descriptor, -1 if closed */
uchar *pszFileName; /* name of current file (if open) */
int iCurrFileNum;/* current file number (NOT descriptor, but the number in the file name!) */
- int iCurrOffs; /* current offset */
+ size_t iCurrOffs;/* current offset */
uchar *pIOBuf; /* io Buffer */
int iBufPtrMax; /* current max Ptr in Buffer (if partial read!) */
int iBufPtr; /* pointer into current buffer */
@@ -93,7 +93,7 @@ typedef struct queue_s {
uchar *pszFilePrefix;
size_t lenFilePrefix;
int iNumberFiles; /* how many files make up the queue? */
- int iMaxFileSize; /* max size for a single queue file */
+ size_t iMaxFileSize; /* max size for a single queue file */
queueFileDescription_t fWrite; /* current file to be written */
queueFileDescription_t fRead; /* current file to be read */
} disk;
@@ -104,6 +104,7 @@ typedef struct queue_s {
rsRetVal queueDestruct(queue_t *pThis);
rsRetVal queueEnqObj(queue_t *pThis, void *pUsr);
rsRetVal queueStart(queue_t *pThis);
+rsRetVal queueSetMaxFileSize(queue_t *pThis, size_t iMaxFileSize);
rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iWorkerThreads,
int iMaxQueueSize, rsRetVal (*pConsumer)(void*));
diff --git a/rsyslog.h b/rsyslog.h
index b46b4b10..dc4a92ea 100644
--- a/rsyslog.h
+++ b/rsyslog.h
@@ -103,6 +103,7 @@ enum rsRetVal_ /** return value. All methods return this if not specified oth
RS_RET_INVALID_PROPFRAME = -2032, /**< invalid framing in serialized property */
RS_RET_NO_PROPLINE = -2033, /**< line is not a property line */
RS_RET_INVALID_TRAILER = -2034, /**< invalid trailer */
+ RS_RET_VALUE_TOO_LOW = -2035, /**< a provided value is too low */
RS_RET_OK_DELETE_LISTENTRY = 1, /**< operation successful, but callee requested the deletion of an entry (special state) */
RS_RET_TERMINATE_NOW = 2, /**< operation successful, function is requested to terminate (mostly used with threads) */
RS_RET_NO_RUN = 3, /**< operation successful, but function does not like to be executed */
diff --git a/syslogd.c b/syslogd.c
index 79a0caa0..72edb732 100644
--- a/syslogd.c
+++ b/syslogd.c
@@ -381,6 +381,7 @@ static int logEveryMsg = 0;/* no repeat message processing - read-only after st
*/
uchar *pszSpoolDirectory = NULL;/* name of rsyslog's spool directory (without trailing slash) */
uchar *pszMainMsgQFilePrefix = NULL;/* prefix for the main message queue file */
+size_t iMainMsgQueMaxFileSize = 1024*1024;
/* end global config file state variables */
static unsigned int Forwarding = 0;
@@ -519,6 +520,7 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a
pszMainMsgQFilePrefix = NULL;
}
iMainMsgQueueSize = 10000;
+ iMainMsgQueMaxFileSize = 1024 * 1024;
iMainMsgQueueNumWorkers = 1;
MainMsgQueType = QUEUETYPE_FIXED_ARRAY;
@@ -3357,6 +3359,12 @@ init(void)
fprintf(stderr, "fatal error %d: could not create message queue - rsyslogd can not run!\n", iRet);
exit(1);
}
+ /* ... set some properties ... */
+ CHKiRet_Hdlr(queueSetMaxFileSize(pMsgQueue, iMainMsgQueMaxFileSize)) {
+ logerrorInt("Invalid $MainMsgQueueMaxFileSize, error %d. Ignored, running with default setting", iRet);
+ }
+
+ /* ... and finally start the queue! */
CHKiRet_Hdlr(queueStart(pMsgQueue)) {
/* no queue is fatal, we need to give up in that case... */
fprintf(stderr, "fatal error %d: could not start message queue - rsyslogd can not run!\n", iRet);
@@ -4278,17 +4286,17 @@ dbgprintf(char *fmt, ...)
* rgerhards, 2007-06-15
*/
if(bWasNL) {
- //fprintf(stdout, "%8.8x: ", (unsigned int) pthread_self());
- fprintf(stderr, "%8.8x: ", (unsigned int) pthread_self());
+ fprintf(stdout, "%8.8x: ", (unsigned int) pthread_self());
+ //fprintf(stderr, "%8.8x: ", (unsigned int) pthread_self());
}
bWasNL = (*(fmt + strlen(fmt) - 1) == '\n') ? TRUE : FALSE;
va_start(ap, fmt);
- //vfprintf(stdout, fmt, ap);
- vfprintf(stderr, fmt, ap);
+ vfprintf(stdout, fmt, ap);
+ //vfprintf(stderr, fmt, ap);
va_end(ap);
- fflush(stderr);
- //fflush(stdout);
+ //fflush(stderr);
+ fflush(stdout);
return;
}
@@ -4496,6 +4504,7 @@ static rsRetVal loadBuildInModules(void)
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuesize", 0, eCmdHdlrInt, NULL, &iMainMsgQueueSize, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuetype", 0, eCmdHdlrGetWord, setMainMsgQueType, NULL, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueueworkerthreads", 0, eCmdHdlrInt, NULL, &iMainMsgQueueNumWorkers, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuemaxfilesize", 0, eCmdHdlrSize, NULL, &iMainMsgQueMaxFileSize, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"repeatedmsgreduction", 0, eCmdHdlrBinary, NULL, &bReduceRepeatMsgs, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlywhenpreviousissuspended", 0, eCmdHdlrBinary, NULL, &bActExecWhenPrevSusp, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionresumeinterval", 0, eCmdHdlrInt, setActionResumeInterval, NULL, NULL));