diff options
Diffstat (limited to 'plugins')
32 files changed, 4160 insertions, 657 deletions
diff --git a/plugins/cust1/cust1.c b/plugins/cust1/cust1.c deleted file mode 100644 index e69de29b..00000000 --- a/plugins/cust1/cust1.c +++ /dev/null diff --git a/plugins/imdiag/imdiag.c b/plugins/imdiag/imdiag.c index 6ea615a1..09742537 100644 --- a/plugins/imdiag/imdiag.c +++ b/plugins/imdiag/imdiag.c @@ -390,6 +390,7 @@ finalize_it: if(pOurTcpsrv != NULL) tcpsrv.Destruct(&pOurTcpsrv); } + free(pNewVal); RETiRet; } diff --git a/plugins/imfile/imfile.c b/plugins/imfile/imfile.c index 440f5991..453b6b05 100644 --- a/plugins/imfile/imfile.c +++ b/plugins/imfile/imfile.c @@ -64,7 +64,11 @@ DEFobjCurrIf(strm) DEFobjCurrIf(prop) DEFobjCurrIf(ruleset) -#define NUM_MULTISUB 1024 /* max number of submits -- TODO: make configurable */ +static int bLegacyCnfModGlobalsPermitted;/* are legacy module-global config parameters permitted? */ + +#define NUM_MULTISUB 1024 /* default max number of submits */ +#define DFLT_PollInterval 10 + typedef struct fileInfo_s { uchar *pszFileName; uchar *pszTag; @@ -81,25 +85,49 @@ typedef struct fileInfo_s { multi_submit_t multiSub; } fileInfo_t; +static struct configSettings_s { + uchar *pszFileName; + uchar *pszFileTag; + uchar *pszStateFile; + uchar *pszBindRuleset; + int iPollInterval; + int iPersistStateInterval; /* how often if state file to be persisted? (default 0->never) */ + int iFacility; /* local0 */ + int iSeverity; /* notice, as of rfc 3164 */ + int readMode; /* mode to use for ReadMultiLine call */ + int maxLinesAtOnce; /* how many lines to process in a row? */ + ruleset_t *pBindRuleset; /* ruleset to bind listener to (use system default if unspecified) */ +} cs; + +struct instanceConf_s { + uchar *pszFileName; + uchar *pszTag; + uchar *pszStateFile; + uchar *pszBindRuleset; + int nMultiSub; + int iPersistStateInterval; + int iFacility; + int iSeverity; + int readMode; + int maxLinesAtOnce; + ruleset_t *pBindRuleset; /* ruleset to bind listener to (use system default if unspecified) */ + struct instanceConf_s *next; +}; + /* forward definitions */ static rsRetVal persistStrmState(fileInfo_t *pInfo); +static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal); /* config variables */ struct modConfData_s { - EMPTY_STRUCT; + rsconf_t *pConf; /* our overall config object */ + int iPollInterval; /* number of seconds to sleep when there was no file activity */ + instanceConf_t *root, *tail; + sbool configSetViaV2Method; }; - -static uchar *pszFileName = NULL; -static uchar *pszFileTag = NULL; -static uchar *pszStateFile = NULL; -static int iPollInterval = 10; /* number of seconds to sleep when there was no file activity */ -static int iPersistStateInterval = 0; /* how often if state file to be persisted? (default 0->never) */ -static int iFacility = 128; /* local0 */ -static int iSeverity = 5; /* notice, as of rfc 3164 */ -static int readMode = 0; /* mode to use for ReadMultiLine call */ -static int maxLinesAtOnce = 10240; /* how many lines to process in a row? */ -static ruleset_t *pBindRuleset = NULL; /* ruleset to bind listener to (use system default if unspecified) */ +static modConfData_t *loadModConf = NULL;/* modConf ptr to use for the current load process */ +static modConfData_t *runModConf = NULL;/* modConf ptr to use for the current load process */ static int iFilPtr = 0; /* number of files to be monitored; pointer to next free spot during config */ #define MAX_INPUT_FILES 100 @@ -107,6 +135,37 @@ static fileInfo_t files[MAX_INPUT_FILES]; static prop_t *pInputName = NULL; /* there is only one global inputName for all messages generated by this input */ +/* module-global parameters */ +static struct cnfparamdescr modpdescr[] = { + { "pollinginterval", eCmdHdlrPositiveInt, 0 } +}; +static struct cnfparamblk modpblk = + { CNFPARAMBLK_VERSION, + sizeof(modpdescr)/sizeof(struct cnfparamdescr), + modpdescr + }; + +/* input instance parameters */ +static struct cnfparamdescr inppdescr[] = { + { "file", eCmdHdlrString, CNFPARAM_REQUIRED }, + { "statefile", eCmdHdlrString, CNFPARAM_REQUIRED }, + { "tag", eCmdHdlrString, CNFPARAM_REQUIRED }, + { "severity", eCmdHdlrSeverity, 0 }, + { "facility", eCmdHdlrFacility, 0 }, + { "ruleset", eCmdHdlrString, 0 }, + { "readmode", eCmdHdlrInt, 0 }, + { "maxlinesatonce", eCmdHdlrInt, 0 }, + { "maxsubmitatonce", eCmdHdlrInt, 0 }, + { "persiststateinterval", eCmdHdlrInt, 0 } +}; +static struct cnfparamblk inppblk = + { CNFPARAMBLK_VERSION, + sizeof(inppdescr)/sizeof(struct cnfparamdescr), + inppdescr + }; + +#include "im-helper.h" /* must be included AFTER the type definitions! */ + /* enqueue the read file line as a message. The provided string is * not freed - thuis must be done by the caller. */ @@ -268,6 +327,308 @@ finalize_it: #pragma GCC diagnostic warning "-Wempty-body" +/* create input instance, set default paramters, and + * add it to the list of instances. + */ +static rsRetVal +createInstance(instanceConf_t **pinst) +{ + instanceConf_t *inst; + DEFiRet; + CHKmalloc(inst = MALLOC(sizeof(instanceConf_t))); + inst->next = NULL; + inst->pBindRuleset = NULL; + + inst->pszBindRuleset = NULL; + inst->pszFileName = NULL; + inst->pszTag = NULL; + inst->pszStateFile = NULL; + inst->nMultiSub = NUM_MULTISUB; + inst->iSeverity = 5; + inst->iFacility = 128; + inst->maxLinesAtOnce = 10240; + inst->iPersistStateInterval = 0; + inst->readMode = 0; + + /* node created, let's add to config */ + if(loadModConf->tail == NULL) { + loadModConf->tail = loadModConf->root = inst; + } else { + loadModConf->tail->next = inst; + loadModConf->tail = inst; + } + + *pinst = inst; +finalize_it: + RETiRet; +} + + +/* add a new monitor */ +static rsRetVal addInstance(void __attribute__((unused)) *pVal, uchar *pNewVal) +{ + instanceConf_t *inst; + DEFiRet; + + if(cs.pszFileName == NULL) { + errmsg.LogError(0, RS_RET_CONFIG_ERROR, "imfile error: no file name given, file monitor can not be created"); + ABORT_FINALIZE(RS_RET_CONFIG_ERROR); + } + if(cs.pszFileTag == NULL) { + errmsg.LogError(0, RS_RET_CONFIG_ERROR, "imfile error: no tag value given , file monitor can not be created"); + ABORT_FINALIZE(RS_RET_CONFIG_ERROR); + } + if(cs.pszStateFile == NULL) { + errmsg.LogError(0, RS_RET_CONFIG_ERROR, "imfile error: not state file name given, file monitor can not be created"); + ABORT_FINALIZE(RS_RET_CONFIG_ERROR); + } + + CHKiRet(createInstance(&inst)); + if((cs.pszBindRuleset == NULL) || (cs.pszBindRuleset[0] == '\0')) { + inst->pszBindRuleset = NULL; + } else { + CHKmalloc(inst->pszBindRuleset = ustrdup(cs.pszBindRuleset)); + } + inst->pszFileName = (uchar*) strdup((char*) cs.pszFileName); + inst->pszTag = (uchar*) strdup((char*) cs.pszFileTag); + inst->pszStateFile = (uchar*) strdup((char*) cs.pszStateFile); + inst->iSeverity = cs.iSeverity; + inst->iFacility = cs.iFacility; + inst->maxLinesAtOnce = cs.maxLinesAtOnce; + inst->iPersistStateInterval = cs.iPersistStateInterval; + inst->readMode = cs.readMode; + + /* reset legacy system */ + cs.iPersistStateInterval = 0; + resetConfigVariables(NULL, NULL); /* values are both dummies */ + +finalize_it: + free(pNewVal); /* we do not need it, but we must free it! */ + RETiRet; +} + + +/* This function is called when a new listener (monitor) shall be added. */ +static inline rsRetVal +addListner(instanceConf_t *inst) +{ + DEFiRet; + fileInfo_t *pThis; + + if(iFilPtr < MAX_INPUT_FILES) { + pThis = &files[iFilPtr]; + //TODO: optimize, save strdup? + pThis->pszFileName = (uchar*) strdup((char*) inst->pszFileName); + pThis->pszTag = (uchar*) strdup((char*) inst->pszTag); + pThis->lenTag = ustrlen(pThis->pszTag); + pThis->pszStateFile = (uchar*) strdup((char*) inst->pszStateFile); + + CHKmalloc(pThis->multiSub.ppMsgs = MALLOC(inst->nMultiSub * sizeof(msg_t*))); + pThis->multiSub.maxElem = inst->nMultiSub; + pThis->multiSub.nElem = 0; + pThis->iSeverity = inst->iSeverity; + pThis->iFacility = inst->iFacility; + pThis->maxLinesAtOnce = inst->maxLinesAtOnce; + pThis->iPersistStateInterval = inst->iPersistStateInterval; + pThis->readMode = inst->readMode; + pThis->pRuleset = inst->pBindRuleset; + pThis->nRecords = 0; + } else { + errmsg.LogError(0, RS_RET_OUT_OF_DESRIPTORS, + "Too many file monitors configured - ignoring %s", + inst->pszFileName); + ABORT_FINALIZE(RS_RET_OUT_OF_DESRIPTORS); + } + ++iFilPtr; /* we got a new file to monitor */ + + resetConfigVariables(NULL, NULL); /* values are both dummies */ +finalize_it: + RETiRet; +} + + +BEGINnewInpInst + struct cnfparamvals *pvals; + instanceConf_t *inst; + int i; +CODESTARTnewInpInst + DBGPRINTF("newInpInst (imfile)\n"); + + pvals = nvlstGetParams(lst, &inppblk, NULL); + if(pvals == NULL) { + errmsg.LogError(0, RS_RET_MISSING_CNFPARAMS, + "imfile: required parameter are missing\n"); + ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); + } + + if(Debug) { + dbgprintf("input param blk in imfile:\n"); + cnfparamsPrint(&inppblk, pvals); + } + + CHKiRet(createInstance(&inst)); + + for(i = 0 ; i < inppblk.nParams ; ++i) { + if(!pvals[i].bUsed) + continue; + if(!strcmp(inppblk.descr[i].name, "file")) { + inst->pszFileName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(inppblk.descr[i].name, "statefile")) { + inst->pszStateFile = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(inppblk.descr[i].name, "tag")) { + inst->pszTag = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(inppblk.descr[i].name, "ruleset")) { + inst->pszBindRuleset = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(inppblk.descr[i].name, "severity")) { + inst->iSeverity = pvals[i].val.d.n; + } else if(!strcmp(inppblk.descr[i].name, "facility")) { + inst->iSeverity = pvals[i].val.d.n; + } else if(!strcmp(inppblk.descr[i].name, "readmode")) { + inst->readMode = pvals[i].val.d.n; + } else if(!strcmp(inppblk.descr[i].name, "maxlinesatonce")) { + inst->maxLinesAtOnce = pvals[i].val.d.n; + } else if(!strcmp(inppblk.descr[i].name, "persistStateInterval")) { + inst->iPersistStateInterval = pvals[i].val.d.n; + } else if(!strcmp(inppblk.descr[i].name, "maxsubmitatonce")) { + inst->nMultiSub = pvals[i].val.d.n; + } else { + dbgprintf("imfile: program error, non-handled " + "param '%s'\n", inppblk.descr[i].name); + } + } +finalize_it: +CODE_STD_FINALIZERnewInpInst + cnfparamvalsDestruct(pvals, &inppblk); +ENDnewInpInst + +BEGINbeginCnfLoad +CODESTARTbeginCnfLoad + loadModConf = pModConf; + pModConf->pConf = pConf; + /* init our settings */ + loadModConf->iPollInterval = DFLT_PollInterval; + loadModConf->configSetViaV2Method = 0; + bLegacyCnfModGlobalsPermitted = 1; + /* init legacy config vars */ + cs.pszFileName = NULL; + cs.pszFileTag = NULL; + cs.pszStateFile = NULL; + cs.iPollInterval = DFLT_PollInterval; + cs.iPersistStateInterval = 0; + cs.iFacility = 128; + cs.iSeverity = 5; + cs.readMode = 0; + cs.maxLinesAtOnce = 10240; + cs.pBindRuleset = NULL; +ENDbeginCnfLoad + + +BEGINsetModCnf + struct cnfparamvals *pvals = NULL; + int i; +CODESTARTsetModCnf + pvals = nvlstGetParams(lst, &modpblk, NULL); + if(pvals == NULL) { + errmsg.LogError(0, RS_RET_MISSING_CNFPARAMS, "imfile: error processing module " + "config parameters [module(...)]"); + ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); + } + + if(Debug) { + dbgprintf("module (global) param blk for imfile:\n"); + cnfparamsPrint(&modpblk, pvals); + } + + for(i = 0 ; i < modpblk.nParams ; ++i) { + if(!pvals[i].bUsed) + continue; + if(!strcmp(modpblk.descr[i].name, "pollinginterval")) { + loadModConf->iPollInterval = (int) pvals[i].val.d.n; + } else { + dbgprintf("imfile: program error, non-handled " + "param '%s' in beginCnfLoad\n", modpblk.descr[i].name); + } + } + + /* remove all of our legacy handlers, as they can not used in addition + * the the new-style config method. + */ + bLegacyCnfModGlobalsPermitted = 0; + loadModConf->configSetViaV2Method = 1; + +finalize_it: + if(pvals != NULL) + cnfparamvalsDestruct(pvals, &modpblk); +ENDsetModCnf + + +BEGINendCnfLoad +CODESTARTendCnfLoad + if(!loadModConf->configSetViaV2Method) { + /* persist module-specific settings from legacy config system */ + loadModConf->iPollInterval = cs.iPollInterval; + } + dbgprintf("imfile: polling interval is %d\n", loadModConf->iPollInterval); + + loadModConf = NULL; /* done loading */ + /* free legacy config vars */ + free(cs.pszFileName); + free(cs.pszFileTag); + free(cs.pszStateFile); +ENDendCnfLoad + + +BEGINcheckCnf + instanceConf_t *inst; +CODESTARTcheckCnf + for(inst = pModConf->root ; inst != NULL ; inst = inst->next) { + std_checkRuleset(pModConf, inst); + } + if(pModConf->root == NULL) { + errmsg.LogError(0, RS_RET_NO_LISTNERS, + "imfile: no files configured to be monitored - " + "no input will be gathered"); + iRet = RS_RET_NO_LISTNERS; + } +ENDcheckCnf + + +/* note: we do access files AFTER we have dropped privileges. This is + * intentional, user must make sure the files have the right permissions. + */ +BEGINactivateCnf + instanceConf_t *inst; +CODESTARTactivateCnf + runModConf = pModConf; + for(inst = runModConf->root ; inst != NULL ; inst = inst->next) { + addListner(inst); + } + /* if we could not set up any listners, there is no point in running... */ + if(iFilPtr == 0) { + errmsg.LogError(0, NO_ERRCODE, "imfile: no file monitors could be started, " + "input not activated.\n"); + ABORT_FINALIZE(RS_RET_NO_RUN); + } +finalize_it: +ENDactivateCnf + + +BEGINfreeCnf + instanceConf_t *inst, *del; +CODESTARTfreeCnf + for(inst = pModConf->root ; inst != NULL ; ) { + free(inst->pszBindRuleset); + free(inst->pszFileName); + free(inst->pszTag); + free(inst->pszStateFile); + del = inst; + inst = inst->next; + free(del); + } +ENDfreeCnf + + + /* This function is the cancel cleanup handler. It is called when rsyslog decides the * module must be stopped, what most probably happens during shutdown of rsyslogd. When * this function is called, the runInput() function (below) is already terminated - somewhere @@ -328,7 +689,7 @@ CODESTARTrunInput * other valid scenario. So do not remove. -- rgerhards, 2008-02-14 */ if(glbl.GetGlobalInputTermState() == 0) - srSleep(iPollInterval, 10); + srSleep(runModConf->iPollInterval, 10); } DBGPRINTF("imfile: terminating upon request of rsyslog core\n"); @@ -350,16 +711,6 @@ ENDrunInput */ BEGINwillRun CODESTARTwillRun - /* free config variables we do no longer needed */ - free(pszFileName); - free(pszFileTag); - free(pszStateFile); - - if(iFilPtr == 0) { - errmsg.LogError(0, RS_RET_NO_RUN, "No files configured to be monitored"); - ABORT_FINALIZE(RS_RET_NO_RUN); - } - /* we need to create the inputName property (only once during our lifetime) */ CHKiRet(prop.Construct(&pInputName)); CHKiRet(prop.SetString(pInputName, UCHAR_CONSTANT("imfile"), sizeof("imfile") - 1)); @@ -458,6 +809,9 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_IMOD_QUERIES +CODEqueryEtryPt_STD_CONF2_QUERIES +CODEqueryEtryPt_STD_CONF2_setModCnf_QUERIES +CODEqueryEtryPt_STD_CONF2_IMOD_QUERIES CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES ENDqueryEtryPt @@ -468,119 +822,37 @@ ENDqueryEtryPt * but in general this is not necessary. Once runInput() has been called, this * function here is never again called. */ -static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal) +static rsRetVal +resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal) { DEFiRet; - if(pszFileName != NULL) { - free(pszFileName); - pszFileName = NULL; - } - - if(pszFileTag != NULL) { - free(pszFileTag); - pszFileTag = NULL; - } - - if(pszStateFile != NULL) { - free(pszFileTag); - pszFileTag = NULL; - } - + free(cs.pszFileName); + cs.pszFileName = NULL; + free(cs.pszFileTag); + cs.pszFileTag = NULL; + free(cs.pszFileTag); + cs.pszFileTag = NULL; /* set defaults... */ - iPollInterval = 10; - iFacility = 128; /* local0 */ - iSeverity = 5; /* notice, as of rfc 3164 */ - readMode = 0; - pBindRuleset = NULL; - maxLinesAtOnce = 10240; + cs.iPollInterval = DFLT_PollInterval; + cs.iFacility = 128; /* local0 */ + cs.iSeverity = 5; /* notice, as of rfc 3164 */ + cs.readMode = 0; + cs.pBindRuleset = NULL; + cs.maxLinesAtOnce = 10240; RETiRet; } - -/* add a new monitor */ -static rsRetVal addMonitor(void __attribute__((unused)) *pVal, uchar *pNewVal) +static inline void +std_checkRuleset_genErrMsg(__attribute__((unused)) modConfData_t *modConf, instanceConf_t *inst) { - DEFiRet; - fileInfo_t *pThis; - - free(pNewVal); /* we do not need it, but we must free it! */ - - if(iFilPtr < MAX_INPUT_FILES) { - pThis = &files[iFilPtr]; - /* TODO: check for strdup() NULL return */ - if(pszFileName == NULL) { - errmsg.LogError(0, RS_RET_CONFIG_ERROR, "imfile error: no file name given, file monitor can not be created"); - ABORT_FINALIZE(RS_RET_CONFIG_ERROR); - } else { - pThis->pszFileName = (uchar*) strdup((char*) pszFileName); - } - - if(pszFileTag == NULL) { - errmsg.LogError(0, RS_RET_CONFIG_ERROR, "imfile error: no tag value given , file monitor can not be created"); - ABORT_FINALIZE(RS_RET_CONFIG_ERROR); - } else { - pThis->pszTag = (uchar*) strdup((char*) pszFileTag); - pThis->lenTag = ustrlen(pThis->pszTag); - } - - if(pszStateFile == NULL) { - errmsg.LogError(0, RS_RET_CONFIG_ERROR, "imfile error: not state file name given, file monitor can not be created"); - ABORT_FINALIZE(RS_RET_CONFIG_ERROR); - } else { - pThis->pszStateFile = (uchar*) strdup((char*) pszStateFile); - } - - CHKmalloc(pThis->multiSub.ppMsgs = MALLOC(NUM_MULTISUB * sizeof(msg_t*))); - pThis->multiSub.maxElem = NUM_MULTISUB; - pThis->multiSub.nElem = 0; - pThis->iSeverity = iSeverity; - pThis->iFacility = iFacility; - pThis->maxLinesAtOnce = maxLinesAtOnce; - pThis->iPersistStateInterval = iPersistStateInterval; - pThis->nRecords = 0; - pThis->readMode = readMode; - pThis->pRuleset = pBindRuleset; - iPersistStateInterval = 0; - } else { - errmsg.LogError(0, RS_RET_OUT_OF_DESRIPTORS, "Too many file monitors configured - ignoring this one"); - ABORT_FINALIZE(RS_RET_OUT_OF_DESRIPTORS); - } - - CHKiRet(resetConfigVariables((uchar*) "dummy", (void*) pThis)); /* values are both dummies */ - -finalize_it: - if(iRet == RS_RET_OK) - ++iFilPtr; /* we got a new file to monitor */ - - RETiRet; + errmsg.LogError(0, NO_ERRCODE, "imfile: ruleset '%s' for %s not found - " + "using default ruleset instead", inst->pszBindRuleset, + inst->pszFileName); } - -/* accept a new ruleset to bind. Checks if it exists and complains, if not */ -static rsRetVal -setRuleset(void __attribute__((unused)) *pVal, uchar *pszName) -{ - ruleset_t *pRuleset; - rsRetVal localRet; - DEFiRet; - - localRet = ruleset.GetRuleset(ourConf, &pRuleset, pszName); - if(localRet == RS_RET_NOT_FOUND) { - errmsg.LogError(0, NO_ERRCODE, "error: ruleset '%s' not found - ignored", pszName); - } - CHKiRet(localRet); - pBindRuleset = pRuleset; - DBGPRINTF("imfile current bind ruleset %p: '%s'\n", pRuleset, pszName); - -finalize_it: - free(pszName); /* no longer needed */ - RETiRet; -} - - /* modInit() is called once the module is loaded. It must perform all module-wide * initialization tasks. There are also a number of housekeeping tasks that the * framework requires. These are handled by the macros. Please note that the @@ -603,28 +875,31 @@ CODEmodInit_QueryRegCFSLineHdlr DBGPRINTF("imfile: version %s initializing\n", VERSION); CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputfilename", 0, eCmdHdlrGetWord, - NULL, &pszFileName, STD_LOADABLE_MODULE_ID)); + NULL, &cs.pszFileName, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputfiletag", 0, eCmdHdlrGetWord, - NULL, &pszFileTag, STD_LOADABLE_MODULE_ID)); + NULL, &cs.pszFileTag, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputfilestatefile", 0, eCmdHdlrGetWord, - NULL, &pszStateFile, STD_LOADABLE_MODULE_ID)); + NULL, &cs.pszStateFile, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputfileseverity", 0, eCmdHdlrSeverity, - NULL, &iSeverity, STD_LOADABLE_MODULE_ID)); + NULL, &cs.iSeverity, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputfilefacility", 0, eCmdHdlrFacility, - NULL, &iFacility, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputfilepollinterval", 0, eCmdHdlrInt, - NULL, &iPollInterval, STD_LOADABLE_MODULE_ID)); + NULL, &cs.iFacility, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputfilereadmode", 0, eCmdHdlrInt, - NULL, &readMode, STD_LOADABLE_MODULE_ID)); + NULL, &cs.readMode, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputfilemaxlinesatonce", 0, eCmdHdlrSize, - NULL, &maxLinesAtOnce, STD_LOADABLE_MODULE_ID)); + NULL, &cs.maxLinesAtOnce, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputfilepersiststateinterval", 0, eCmdHdlrInt, - NULL, &iPersistStateInterval, STD_LOADABLE_MODULE_ID)); + NULL, &cs.iPersistStateInterval, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputfilebindruleset", 0, eCmdHdlrGetWord, - setRuleset, NULL, STD_LOADABLE_MODULE_ID)); + NULL, &cs.pszBindRuleset, STD_LOADABLE_MODULE_ID)); /* that command ads a new file! */ CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputrunfilemonitor", 0, eCmdHdlrGetWord, - addMonitor, NULL, STD_LOADABLE_MODULE_ID)); + addInstance, NULL, STD_LOADABLE_MODULE_ID)); + /* module-global config params - will be disabled in configs that are loaded + * via module(...). + */ + CHKiRet(regCfSysLineHdlr2((uchar *)"inputfilepollinterval", 0, eCmdHdlrInt, + NULL, &cs.iPollInterval, STD_LOADABLE_MODULE_ID, &bLegacyCnfModGlobalsPermitted)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID)); ENDmodInit diff --git a/plugins/imklog/imklog.c b/plugins/imklog/imklog.c index f476c5ff..93323707 100644 --- a/plugins/imklog/imklog.c +++ b/plugins/imklog/imklog.c @@ -59,6 +59,7 @@ #include "net.h" #include "glbl.h" #include "prop.h" +#include "errmsg.h" #include "unicode-helper.h" MODULE_TYPE_INPUT @@ -71,22 +72,34 @@ DEFobjCurrIf(datetime) DEFobjCurrIf(glbl) DEFobjCurrIf(prop) DEFobjCurrIf(net) +DEFobjCurrIf(errmsg) /* config settings */ typedef struct configSettings_s { - int dbgPrintSymbols; /* this one is extern so the helpers can access it! */ - int symbols_twice; - int use_syscall; - int symbol_lookup; /* on recent kernels > 2.6, the kernel does this */ int bPermitNonKernel; /* permit logging of messages not having LOG_KERN facility */ int iFacilIntMsg; /* the facility to use for internal messages (set by driver) */ uchar *pszPath; - int console_log_level; + int console_log_level; /* still used for BSD */ } configSettings_t; static configSettings_t cs; static modConfData_t *loadModConf = NULL;/* modConf ptr to use for the current load process */ static modConfData_t *runModConf = NULL;/* modConf ptr to use for the current load process */ +static int bLegacyCnfModGlobalsPermitted;/* are legacy module-global config parameters permitted? */ + +/* module-global parameters */ +static struct cnfparamdescr modpdescr[] = { + { "logpath", eCmdHdlrGetWord, 0 }, + { "permitnonkernelfacility", eCmdHdlrBinary, 0 }, + { "consoleloglevel", eCmdHdlrInt, 0 }, + { "internalmsgfacility", eCmdHdlrFacility, 0 } +}; +static struct cnfparamblk modpblk = + { CNFPARAMBLK_VERSION, + sizeof(modpdescr)/sizeof(struct cnfparamdescr), + modpdescr + }; + static prop_t *pInputName = NULL; /* there is only one global inputName for all messages generated by this module */ @@ -96,10 +109,6 @@ static prop_t *pLocalHostIP = NULL; /* a pseudo-constant propterty for 127.0.0.1 static inline void initConfigSettings(void) { - cs.dbgPrintSymbols = 0; - cs.symbols_twice = 0; - cs.use_syscall = 0; - cs.symbol_lookup = 0; cs.bPermitNonKernel = 0; cs.console_log_level = -1; cs.pszPath = NULL; @@ -276,28 +285,77 @@ BEGINbeginCnfLoad CODESTARTbeginCnfLoad loadModConf = pModConf; pModConf->pConf = pConf; + /* init our settings */ + pModConf->pszPath = NULL; + pModConf->bPermitNonKernel = 0; + pModConf->console_log_level = -1; + pModConf->iFacilIntMsg = klogFacilIntMsg(); + loadModConf->configSetViaV2Method = 0; + bLegacyCnfModGlobalsPermitted = 1; /* init legacy config vars */ initConfigSettings(); ENDbeginCnfLoad +BEGINsetModCnf + struct cnfparamvals *pvals = NULL; + int i; +CODESTARTsetModCnf + pvals = nvlstGetParams(lst, &modpblk, NULL); + if(pvals == NULL) { + errmsg.LogError(0, RS_RET_MISSING_CNFPARAMS, "error processing module " + "config parameters [module(...)]"); + ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); + } + + if(Debug) { + dbgprintf("module (global) param blk for imklog:\n"); + cnfparamsPrint(&modpblk, pvals); + } + + for(i = 0 ; i < modpblk.nParams ; ++i) { + if(!pvals[i].bUsed) + continue; + if(!strcmp(modpblk.descr[i].name, "logpath")) { + loadModConf->pszPath = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(modpblk.descr[i].name, "permitnonkernelfacility")) { + loadModConf->bPermitNonKernel = (int) pvals[i].val.d.n; + } else if(!strcmp(modpblk.descr[i].name, "consoleloglevel")) { + loadModConf->console_log_level= (int) pvals[i].val.d.n; + } else if(!strcmp(modpblk.descr[i].name, "internalmsgfacility")) { + loadModConf->iFacilIntMsg = (int) pvals[i].val.d.n; + } else { + dbgprintf("imklog: program error, non-handled " + "param '%s' in beginCnfLoad\n", modpblk.descr[i].name); + } + } + + /* disable legacy module-global config directives */ + bLegacyCnfModGlobalsPermitted = 0; + loadModConf->configSetViaV2Method = 1; + +finalize_it: + if(pvals != NULL) + cnfparamvalsDestruct(pvals, &modpblk); +ENDsetModCnf + + BEGINendCnfLoad CODESTARTendCnfLoad - /* persist module-specific settings from legacy config system */ - loadModConf->dbgPrintSymbols = cs.dbgPrintSymbols; - loadModConf->symbols_twice = cs.symbols_twice; - loadModConf->use_syscall = cs.use_syscall; - loadModConf->bPermitNonKernel = cs.bPermitNonKernel; - loadModConf->iFacilIntMsg = cs.iFacilIntMsg; - loadModConf->console_log_level = cs.console_log_level; - if((cs.pszPath == NULL) || (cs.pszPath[0] == '\0')) { - loadModConf->pszPath = NULL; - if(cs.pszPath != NULL) - free(cs.pszPath); - } else { - loadModConf->pszPath = cs.pszPath; + if(!loadModConf->configSetViaV2Method) { + /* persist module-specific settings from legacy config system */ + loadModConf->bPermitNonKernel = cs.bPermitNonKernel; + loadModConf->iFacilIntMsg = cs.iFacilIntMsg; + loadModConf->console_log_level = cs.console_log_level; + if((cs.pszPath == NULL) || (cs.pszPath[0] == '\0')) { + loadModConf->pszPath = NULL; + if(cs.pszPath != NULL) + free(cs.pszPath); + } else { + loadModConf->pszPath = cs.pszPath; + } + cs.pszPath = NULL; } - cs.pszPath = NULL; loadModConf = NULL; /* done loading */ ENDendCnfLoad @@ -348,6 +406,7 @@ CODESTARTmodExit objRelease(net, CORE_COMPONENT); objRelease(datetime, CORE_COMPONENT); objRelease(prop, CORE_COMPONENT); + objRelease(errmsg, CORE_COMPONENT); ENDmodExit @@ -355,15 +414,12 @@ BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_IMOD_QUERIES CODEqueryEtryPt_STD_CONF2_QUERIES +CODEqueryEtryPt_STD_CONF2_setModCnf_QUERIES CODEqueryEtryPt_STD_CONF2_PREPRIVDROP_QUERIES ENDqueryEtryPt static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal) { - cs.dbgPrintSymbols = 0; - cs.symbols_twice = 0; - cs.use_syscall = 0; - cs.symbol_lookup = 0; cs.bPermitNonKernel = 0; if(cs.pszPath != NULL) { free(cs.pszPath); @@ -381,6 +437,7 @@ CODEmodInit_QueryRegCFSLineHdlr CHKiRet(objUse(glbl, CORE_COMPONENT)); CHKiRet(objUse(prop, CORE_COMPONENT)); CHKiRet(objUse(net, CORE_COMPONENT)); + CHKiRet(objUse(errmsg, CORE_COMPONENT)); /* we need to create the inputName property (only once during our lifetime) */ CHKiRet(prop.CreateStringProp(&pInputName, UCHAR_CONSTANT("imklog"), sizeof("imklog") - 1)); @@ -389,22 +446,22 @@ CODEmodInit_QueryRegCFSLineHdlr /* init legacy config settings */ initConfigSettings(); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"debugprintkernelsymbols", 0, eCmdHdlrBinary, - NULL, &cs.dbgPrintSymbols, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"klogpath", 0, eCmdHdlrGetWord, - NULL, &cs.pszPath, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"klogsymbollookup", 0, eCmdHdlrBinary, - NULL, &cs.symbol_lookup, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"klogsymbolstwice", 0, eCmdHdlrBinary, - NULL, &cs.symbols_twice, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"klogusesyscallinterface", 0, eCmdHdlrBinary, - NULL, &cs.use_syscall, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"klogpermitnonkernelfacility", 0, eCmdHdlrBinary, - NULL, &cs.bPermitNonKernel, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"klogconsoleloglevel", 0, eCmdHdlrInt, - NULL, &cs.console_log_level, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"kloginternalmsgfacility", 0, eCmdHdlrFacility, - NULL, &cs.iFacilIntMsg, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"debugprintkernelsymbols", 0, eCmdHdlrGoneAway, + NULL, NULL, STD_LOADABLE_MODULE_ID)); + CHKiRet(regCfSysLineHdlr2((uchar *)"klogpath", 0, eCmdHdlrGetWord, + NULL, &cs.pszPath, STD_LOADABLE_MODULE_ID, &bLegacyCnfModGlobalsPermitted)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"klogsymbollookup", 0, eCmdHdlrGoneAway, + NULL, NULL, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"klogsymbolstwice", 0, eCmdHdlrGoneAway, + NULL, NULL, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"klogusesyscallinterface", 0, eCmdHdlrGoneAway, + NULL, NULL, STD_LOADABLE_MODULE_ID)); + CHKiRet(regCfSysLineHdlr2((uchar *)"klogpermitnonkernelfacility", 0, eCmdHdlrBinary, + NULL, &cs.bPermitNonKernel, STD_LOADABLE_MODULE_ID, &bLegacyCnfModGlobalsPermitted)); + CHKiRet(regCfSysLineHdlr2((uchar *)"klogconsoleloglevel", 0, eCmdHdlrInt, + NULL, &cs.console_log_level, STD_LOADABLE_MODULE_ID, &bLegacyCnfModGlobalsPermitted)); + CHKiRet(regCfSysLineHdlr2((uchar *)"kloginternalmsgfacility", 0, eCmdHdlrFacility, + NULL, &cs.iFacilIntMsg, STD_LOADABLE_MODULE_ID, &bLegacyCnfModGlobalsPermitted)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID)); ENDmodInit diff --git a/plugins/imklog/imklog.h b/plugins/imklog/imklog.h index 795dd68c..acfb50ab 100644 --- a/plugins/imklog/imklog.h +++ b/plugins/imklog/imklog.h @@ -31,15 +31,12 @@ /* we need to have the modConf type present in all submodules */ struct modConfData_s { - int dbgPrintSymbols; - int symbols_twice; - int use_syscall; - int symbol_lookup; - int bPermitNonKernel; + rsconf_t *pConf; int iFacilIntMsg; uchar *pszPath; int console_log_level; - rsconf_t *pConf; + sbool bPermitNonKernel; + sbool configSetViaV2Method; }; /* interface to "drivers" diff --git a/plugins/imkmsg/Makefile.am b/plugins/imkmsg/Makefile.am new file mode 100644 index 00000000..87c177d2 --- /dev/null +++ b/plugins/imkmsg/Makefile.am @@ -0,0 +1,8 @@ +pkglib_LTLIBRARIES = imkmsg.la +imkmsg_la_SOURCES = imkmsg.c imkmsg.h + +imkmsg_la_SOURCES += kmsg.c + +imkmsg_la_CPPFLAGS = -I$(top_srcdir) $(PTHREADS_CFLAGS) $(RSRT_CFLAGS) +imkmsg_la_LDFLAGS = -module -avoid-version +imkmsg_la_LIBADD = diff --git a/plugins/imkmsg/imkmsg.c b/plugins/imkmsg/imkmsg.c new file mode 100644 index 00000000..2a97f82d --- /dev/null +++ b/plugins/imkmsg/imkmsg.c @@ -0,0 +1,295 @@ +/* The kernel log module. + * + * This is rsyslog Linux only module for reading structured kernel logs. + * Module is based on imklog module so it retains its structure + * and other part is currently in kmsg.c file instead of this (imkmsg.c) + * For more information see that file. + * + * To test under Linux: + * echo test1 > /dev/kmsg + * + * Copyright (C) 2008-2012 Adiscon GmbH + * + * This file is part of rsyslog. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * -or- + * see COPYING.ASL20 in the source distribution + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "config.h" +#include "rsyslog.h" +#include <stdio.h> +#include <assert.h> +#include <string.h> +#include <stdarg.h> +#include <ctype.h> +#include <stdlib.h> +#include <sys/socket.h> + +#include "dirty.h" +#include "cfsysline.h" +#include "obj.h" +#include "msg.h" +#include "module-template.h" +#include "datetime.h" +#include "imkmsg.h" +#include "net.h" +#include "glbl.h" +#include "prop.h" +#include "errmsg.h" +#include "unicode-helper.h" + +MODULE_TYPE_INPUT +MODULE_TYPE_NOKEEP +MODULE_CNFNAME("imkmsg") + +/* Module static data */ +DEF_IMOD_STATIC_DATA +DEFobjCurrIf(datetime) +DEFobjCurrIf(glbl) +DEFobjCurrIf(prop) +DEFobjCurrIf(net) +DEFobjCurrIf(errmsg) + +/* config settings */ +typedef struct configSettings_s { + int iFacilIntMsg; /* the facility to use for internal messages (set by driver) */ +} configSettings_t; +static configSettings_t cs; + +static modConfData_t *loadModConf = NULL;/* modConf ptr to use for the current load process */ +static modConfData_t *runModConf = NULL;/* modConf ptr to use for the current load process */ +static int bLegacyCnfModGlobalsPermitted;/* are legacy module-global config parameters permitted? */ + +static prop_t *pInputName = NULL; /* there is only one global inputName for all messages generated by this module */ +static prop_t *pLocalHostIP = NULL; /* a pseudo-constant propterty for 127.0.0.1 */ + +static inline void +initConfigSettings(void) +{ + cs.iFacilIntMsg = klogFacilIntMsg(); +} + + +/* enqueue the the kernel message into the message queue. + * The provided msg string is not freed - thus must be done + * by the caller. + * rgerhards, 2008-04-12 + */ +static rsRetVal +enqMsg(uchar *msg, uchar* pszTag, int iFacility, int iSeverity, struct timeval *tp, struct json_object *json) +{ + struct syslogTime st; + msg_t *pMsg; + DEFiRet; + + assert(msg != NULL); + assert(pszTag != NULL); + + if(tp == NULL) { + CHKiRet(msgConstruct(&pMsg)); + } else { + datetime.timeval2syslogTime(tp, &st); + CHKiRet(msgConstructWithTime(&pMsg, &st, tp->tv_sec)); + } + MsgSetFlowControlType(pMsg, eFLOWCTL_LIGHT_DELAY); + MsgSetInputName(pMsg, pInputName); + MsgSetRawMsgWOSize(pMsg, (char*)msg); + MsgSetMSGoffs(pMsg, 0); /* we do not have a header... */ + MsgSetRcvFrom(pMsg, glbl.GetLocalHostNameProp()); + MsgSetRcvFromIP(pMsg, pLocalHostIP); + MsgSetHOSTNAME(pMsg, glbl.GetLocalHostName(), ustrlen(glbl.GetLocalHostName())); + MsgSetTAG(pMsg, pszTag, ustrlen(pszTag)); + pMsg->iFacility = iFacility; + pMsg->iSeverity = iSeverity; + pMsg->json = json; + CHKiRet(submitMsg(pMsg)); + +finalize_it: + RETiRet; +} + + +/* log an imkmsg-internal message + * rgerhards, 2008-04-14 + */ +rsRetVal imkmsgLogIntMsg(int priority, char *fmt, ...) +{ + DEFiRet; + va_list ap; + uchar msgBuf[2048]; /* we use the same size as sysklogd to remain compatible */ + + va_start(ap, fmt); + vsnprintf((char*)msgBuf, sizeof(msgBuf) / sizeof(char), fmt, ap); + va_end(ap); + + logmsgInternal(NO_ERRCODE ,priority, msgBuf, 0); + + RETiRet; +} + + +/* log a message from /dev/kmsg + */ +rsRetVal Syslog(int priority, uchar *pMsg, struct timeval *tp, struct json_object *json) +{ + DEFiRet; + iRet = enqMsg((uchar*)pMsg, (uchar*) "kernel:", LOG_FAC(priority), LOG_PRI(priority), tp, json); + RETiRet; +} + + +/* helper for some klog drivers which need to know the MaxLine global setting. They can + * not obtain it themselfs, because they are no modules and can not query the object hander. + * It would probably be a good idea to extend the interface to support it, but so far + * we create a (sufficiently valid) work-around. -- rgerhards, 2008-11-24 + */ +int klog_getMaxLine(void) +{ + return glbl.GetMaxLine(); +} + + +BEGINrunInput +CODESTARTrunInput + /* this is an endless loop - it is terminated when the thread is + * signalled to do so. This, however, is handled by the framework, + * right into the sleep below. + */ + while(!pThrd->bShallStop) { + /* klogLogKMsg() waits for the next kernel message, obtains it + * and then submits it to the rsyslog main queue. + * rgerhards, 2008-04-09 + */ + CHKiRet(klogLogKMsg(runModConf)); + } +finalize_it: +ENDrunInput + + +BEGINbeginCnfLoad +CODESTARTbeginCnfLoad + loadModConf = pModConf; + pModConf->pConf = pConf; + /* init our settings */ + pModConf->iFacilIntMsg = klogFacilIntMsg(); + loadModConf->configSetViaV2Method = 0; + bLegacyCnfModGlobalsPermitted = 1; + /* init legacy config vars */ + initConfigSettings(); +ENDbeginCnfLoad + + +BEGINendCnfLoad +CODESTARTendCnfLoad + if(!loadModConf->configSetViaV2Method) { + /* persist module-specific settings from legacy config system */ + loadModConf->iFacilIntMsg = cs.iFacilIntMsg; + } + + loadModConf = NULL; /* done loading */ +ENDendCnfLoad + + +BEGINcheckCnf +CODESTARTcheckCnf +ENDcheckCnf + + +BEGINactivateCnfPrePrivDrop +CODESTARTactivateCnfPrePrivDrop + runModConf = pModConf; + iRet = klogWillRun(runModConf); +ENDactivateCnfPrePrivDrop + + +BEGINactivateCnf +CODESTARTactivateCnf +ENDactivateCnf + + +BEGINfreeCnf +CODESTARTfreeCnf +ENDfreeCnf + + +BEGINwillRun +CODESTARTwillRun +ENDwillRun + + +BEGINafterRun +CODESTARTafterRun + iRet = klogAfterRun(runModConf); +ENDafterRun + + +BEGINmodExit +CODESTARTmodExit + if(pInputName != NULL) + prop.Destruct(&pInputName); + if(pLocalHostIP != NULL) + prop.Destruct(&pLocalHostIP); + + /* release objects we used */ + objRelease(glbl, CORE_COMPONENT); + objRelease(net, CORE_COMPONENT); + objRelease(datetime, CORE_COMPONENT); + objRelease(prop, CORE_COMPONENT); + objRelease(errmsg, CORE_COMPONENT); +ENDmodExit + + +BEGINqueryEtryPt +CODESTARTqueryEtryPt +CODEqueryEtryPt_STD_IMOD_QUERIES +CODEqueryEtryPt_STD_CONF2_QUERIES +CODEqueryEtryPt_STD_CONF2_PREPRIVDROP_QUERIES +ENDqueryEtryPt + +static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal) +{ + cs.iFacilIntMsg = klogFacilIntMsg(); + return RS_RET_OK; +} + +BEGINmodInit() +CODESTARTmodInit + *ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */ +CODEmodInit_QueryRegCFSLineHdlr + CHKiRet(objUse(datetime, CORE_COMPONENT)); + CHKiRet(objUse(glbl, CORE_COMPONENT)); + CHKiRet(objUse(prop, CORE_COMPONENT)); + CHKiRet(objUse(net, CORE_COMPONENT)); + CHKiRet(objUse(errmsg, CORE_COMPONENT)); + + /* we need to create the inputName property (only once during our lifetime) */ + CHKiRet(prop.CreateStringProp(&pInputName, UCHAR_CONSTANT("imkmsg"), sizeof("imkmsg") - 1)); + CHKiRet(prop.CreateStringProp(&pLocalHostIP, UCHAR_CONSTANT("127.0.0.1"), sizeof("127.0.0.1") - 1)); + + /* init legacy config settings */ + initConfigSettings(); + + CHKiRet(omsdRegCFSLineHdlr((uchar *)"debugprintkernelsymbols", 0, eCmdHdlrGoneAway, + NULL, NULL, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"klogsymbollookup", 0, eCmdHdlrGoneAway, + NULL, NULL, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"klogsymbolstwice", 0, eCmdHdlrGoneAway, + NULL, NULL, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"klogusesyscallinterface", 0, eCmdHdlrGoneAway, + NULL, NULL, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, + resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID)); +ENDmodInit +/* vim:set ai: + */ diff --git a/plugins/imkmsg/imkmsg.h b/plugins/imkmsg/imkmsg.h new file mode 100644 index 00000000..220a1634 --- /dev/null +++ b/plugins/imkmsg/imkmsg.h @@ -0,0 +1,64 @@ +/* imkmsg.h + * These are the definitions for the kmsg message generation module. + * + * Copyright 2007-2012 Rainer Gerhards and Adiscon GmbH. + * + * This file is part of rsyslog. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * -or- + * see COPYING.ASL20 in the source distribution + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef IMKLOG_H_INCLUDED +#define IMKLOG_H_INCLUDED 1 + +#include "rsyslog.h" +#include "dirty.h" + +/* we need to have the modConf type present in all submodules */ +struct modConfData_s { + rsconf_t *pConf; + int iFacilIntMsg; + uchar *pszPath; + int console_log_level; + sbool bPermitNonKernel; + sbool configSetViaV2Method; +}; + +/* interface to "drivers" + * the platform specific drivers must implement these entry points. Only one + * driver may be active at any given time, thus we simply rely on the linker + * to resolve the addresses. + * rgerhards, 2008-04-09 + */ +rsRetVal klogLogKMsg(modConfData_t *pModConf); +rsRetVal klogWillRun(modConfData_t *pModConf); +rsRetVal klogAfterRun(modConfData_t *pModConf); +int klogFacilIntMsg(); + +/* the functions below may be called by the drivers */ +rsRetVal imkmsgLogIntMsg(int priority, char *fmt, ...) __attribute__((format(printf,2, 3))); +rsRetVal Syslog(int priority, uchar *msg, struct timeval *tp, struct json_object *json); + +/* prototypes */ +extern int klog_getMaxLine(void); /* work-around for klog drivers to get configured max line size */ +extern int InitKsyms(modConfData_t*); +extern void DeinitKsyms(void); +extern int InitMsyms(void); +extern void DeinitMsyms(void); +extern char * ExpandKadds(char *, char *); +extern void SetParanoiaLevel(int); + +#endif /* #ifndef IMKLOG_H_INCLUDED */ +/* vi:set ai: + */ diff --git a/plugins/imkmsg/kmsg.c b/plugins/imkmsg/kmsg.c new file mode 100644 index 00000000..9ad98da4 --- /dev/null +++ b/plugins/imkmsg/kmsg.c @@ -0,0 +1,239 @@ +/* imkmsg driver for Linux /dev/kmsg structured logging + * + * This contains Linux-specific functionality to read /dev/kmsg + * For a general overview, see head comment in imkmsg.c. + * This is heavily based on imklog bsd.c file. + * + * Copyright 2008-2012 Adiscon GmbH + * + * This file is part of rsyslog. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * -or- + * see COPYING.ASL20 in the source distribution + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifdef HAVE_CONFIG_H +# include "config.h" +#endif +#include <stdlib.h> +#include <time.h> +#include <unistd.h> +#include <fcntl.h> +#include <errno.h> +#include <string.h> +#include <ctype.h> +#include <sys/klog.h> +#include <json/json.h> + +#include "rsyslog.h" +#include "srUtils.h" +#include "debug.h" +#include "imkmsg.h" + +/* globals */ +static int fklog = -1; /* kernel log fd */ + +#ifndef _PATH_KLOG +# define _PATH_KLOG "/dev/kmsg" +#endif + +/* submit a message to imkmsg Syslog() API. In this function, we parse + * necessary information from kernel log line, and make json string + * from the rest. + */ +static void +submitSyslog(uchar *buf) +{ + long offs = 0; + struct timeval tv; + long int timestamp = 0; + struct timespec monotonic; + struct timespec realtime; + char name[1024]; + char value[1024]; + char msg[1024]; + int priority = 0; + long int sequnum = 0; + struct json_object *json = NULL, *jval; + + /* create new json object */ + json = json_object_new_object(); + + /* get priority */ + for (; isdigit(*buf); buf++) { + priority += (priority * 10) + (*buf - '0'); + } + buf++; + + /* get messages sequence number and add it to json */ + for (; isdigit(*buf); buf++) { + sequnum = (sequnum * 10) + (*buf - '0'); + } + buf++; /* skip , */ + jval = json_object_new_int(sequnum); + json_object_object_add(json, "sequnum", jval); + + /* get timestamp */ + for (; isdigit(*buf); buf++) { + timestamp += (timestamp * 10) + (*buf - '0'); + } + buf++; /* skip ; */ + + /* get message */ + offs = 0; + for (; *buf != '\n' && *buf != '\0'; buf++, offs++) { + msg[offs] = *buf; + } + msg[offs] = '\0'; + jval = json_object_new_string((char*)msg); + json_object_object_add(json, "msg", jval); + + if (*buf != '\0') /* message has appended properties, skip \n */ + buf++; + + while (strlen((char *)buf)) { + /* get name of the property */ + buf++; /* skip ' ' */ + offs = 0; + for (; *buf != '=' && *buf != ' '; buf++, offs++) { + name[offs] = *buf; + } + name[offs] = '\0'; + buf++; /* skip = or ' ' */; + + offs = 0; + for (; *buf != '\n' && *buf != '\0'; buf++, offs++) { + value[offs] = *buf; + } + value[offs] = '\0'; + if (*buf != '\0') { + buf++; /* another property, skip \n */ + } + + jval = json_object_new_string((char*)value); + json_object_object_add(json, name, jval); + } + + /* calculate timestamp */ + clock_gettime(CLOCK_MONOTONIC, &monotonic); + clock_gettime(CLOCK_REALTIME, &realtime); + tv.tv_sec = realtime.tv_sec + ((timestamp / 1000000l) - monotonic.tv_sec); + tv.tv_usec = (realtime.tv_nsec + ((timestamp / 1000000000l) - monotonic.tv_nsec)) / 1000; + + Syslog(priority, (uchar *)msg, &tv, json); +} + + +/* open the kernel log - will be called inside the willRun() imkmsg entry point + */ +rsRetVal +klogWillRun(modConfData_t *pModConf) +{ + char errmsg[2048]; + int r; + DEFiRet; + + fklog = open(_PATH_KLOG, O_RDONLY, 0); + if (fklog < 0) { + imkmsgLogIntMsg(RS_RET_ERR_OPEN_KLOG, "imkmsg: cannot open kernel log(%s): %s.", + _PATH_KLOG, rs_strerror_r(errno, errmsg, sizeof(errmsg))); + ABORT_FINALIZE(RS_RET_ERR_OPEN_KLOG); + } + + /* Set level of kernel console messaging.. */ + if(pModConf->console_log_level != -1) { + r = klogctl(8, NULL, pModConf->console_log_level); + if(r != 0) { + imkmsgLogIntMsg(LOG_WARNING, "imkmsg: cannot set console log level: %s", + rs_strerror_r(errno, errmsg, sizeof(errmsg))); + /* make sure we do not try to re-set! */ + pModConf->console_log_level = -1; + } + } + +finalize_it: + RETiRet; +} + +/* Read kernel log while data are available, each read() reads one + * record of printk buffer. + */ +static void +readkmsg(void) +{ + int i; + uchar pRcv[8096+1]; + char errmsg[2048]; + + for (;;) { + dbgprintf("imkmsg waiting for kernel log line\n"); + + /* every read() from the opened device node receives one record of the printk buffer */ + i = read(fklog, pRcv, 8096); + + if (i > 0) { + /* successful read of message of nonzero length */ + pRcv[i] = '\0'; + } else { + /* something went wrong - error or zero length message */ + if (i < 0 && errno != EINTR && errno != EAGAIN) { + /* error occured */ + imkmsgLogIntMsg(LOG_ERR, + "imkmsg: error reading kernel log - shutting down: %s", + rs_strerror_r(errno, errmsg, sizeof(errmsg))); + fklog = -1; + } + break; + } + + submitSyslog(pRcv); + } +} + + +/* to be called in the module's AfterRun entry point + * rgerhards, 2008-04-09 + */ +rsRetVal klogAfterRun(modConfData_t *pModConf) +{ + DEFiRet; + if(fklog != -1) + close(fklog); + /* Turn on logging of messages to console, but only if a log level was speficied */ + if(pModConf->console_log_level != -1) + klogctl(7, NULL, 0); + RETiRet; +} + + +/* to be called in the module's WillRun entry point, this is the main + * "message pull" mechanism. + * rgerhards, 2008-04-09 + */ +rsRetVal klogLogKMsg(modConfData_t __attribute__((unused)) *pModConf) +{ + DEFiRet; + readkmsg(); + RETiRet; +} + + +/* provide the (system-specific) default facility for internal messages + * rgerhards, 2008-04-14 + */ +int +klogFacilIntMsg(void) +{ + return LOG_SYSLOG; +} + diff --git a/plugins/immark/immark.c b/plugins/immark/immark.c index 273af021..0e946c0b 100644 --- a/plugins/immark/immark.c +++ b/plugins/immark/immark.c @@ -58,9 +58,26 @@ DEFobjCurrIf(errmsg) static int iMarkMessagePeriod = DEFAULT_MARK_PERIOD; struct modConfData_s { + rsconf_t *pConf; /* our overall config object */ int iMarkMessagePeriod; + sbool configSetViaV2Method; }; +/* module-global parameters */ +static struct cnfparamdescr modpdescr[] = { + { "interval", eCmdHdlrInt, 0 } +}; +static struct cnfparamblk modpblk = + { CNFPARAMBLK_VERSION, + sizeof(modpdescr)/sizeof(struct cnfparamdescr), + modpdescr + }; + + +static modConfData_t *loadModConf = NULL;/* modConf ptr to use for the current load process */ +static int bLegacyCnfModGlobalsPermitted;/* are legacy module-global config parameters permitted? */ + + BEGINisCompatibleWithFeature CODESTARTisCompatibleWithFeature if(eFeat == sFEATURENonCancelInputTermination) @@ -75,12 +92,57 @@ ENDafterRun BEGINbeginCnfLoad CODESTARTbeginCnfLoad + loadModConf = pModConf; + pModConf->pConf = pConf; + /* init our settings */ + pModConf->iMarkMessagePeriod = DEFAULT_MARK_PERIOD; + loadModConf->configSetViaV2Method = 0; + bLegacyCnfModGlobalsPermitted = 1; ENDbeginCnfLoad +BEGINsetModCnf + struct cnfparamvals *pvals = NULL; + int i; +CODESTARTsetModCnf + pvals = nvlstGetParams(lst, &modpblk, NULL); + if(pvals == NULL) { + errmsg.LogError(0, RS_RET_MISSING_CNFPARAMS, "error processing module " + "config parameters [module(...)]"); + ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); + } + + if(Debug) { + dbgprintf("module (global) param blk for imuxsock:\n"); + cnfparamsPrint(&modpblk, pvals); + } + + for(i = 0 ; i < modpblk.nParams ; ++i) { + if(!pvals[i].bUsed) + continue; + if(!strcmp(modpblk.descr[i].name, "interval")) { + loadModConf->iMarkMessagePeriod = (int) pvals[i].val.d.n; + } else { + dbgprintf("imuxsock: program error, non-handled " + "param '%s' in beginCnfLoad\n", modpblk.descr[i].name); + } + } + + /* disable legacy module-global config directives */ + bLegacyCnfModGlobalsPermitted = 0; + loadModConf->configSetViaV2Method = 1; + +finalize_it: + if(pvals != NULL) + cnfparamvalsDestruct(pvals, &modpblk); +ENDsetModCnf + + BEGINendCnfLoad CODESTARTendCnfLoad - pModConf->iMarkMessagePeriod = iMarkMessagePeriod; + if(!loadModConf->configSetViaV2Method) { + pModConf->iMarkMessagePeriod = iMarkMessagePeriod; + } ENDendCnfLoad @@ -97,6 +159,7 @@ ENDcheckCnf BEGINactivateCnf CODESTARTactivateCnf MarkInterval = pModConf->iMarkMessagePeriod; + DBGPRINTF("immark set MarkInterval to %d\n", MarkInterval); ENDactivateCnf @@ -150,6 +213,7 @@ BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_IMOD_QUERIES CODEqueryEtryPt_STD_CONF2_QUERIES +CODEqueryEtryPt_STD_CONF2_setModCnf_QUERIES CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES ENDqueryEtryPt @@ -167,8 +231,8 @@ CODEmodInit_QueryRegCFSLineHdlr CHKiRet(objUse(errmsg, CORE_COMPONENT)); /* legacy config handlers */ - CHKiRet(omsdRegCFSLineHdlr((uchar *)"markmessageperiod", 0, eCmdHdlrInt, NULL, - &iMarkMessagePeriod, STD_LOADABLE_MODULE_ID)); + CHKiRet(regCfSysLineHdlr2((uchar *)"markmessageperiod", 0, eCmdHdlrInt, NULL, + &iMarkMessagePeriod, STD_LOADABLE_MODULE_ID, &bLegacyCnfModGlobalsPermitted)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID)); ENDmodInit diff --git a/plugins/impstats/impstats.c b/plugins/impstats/impstats.c index 4fec8e70..62599969 100644 --- a/plugins/impstats/impstats.c +++ b/plugins/impstats/impstats.c @@ -59,6 +59,7 @@ typedef struct configSettings_s { int iFacility; int iSeverity; int bJSON; + int bCEE; } configSettings_t; struct modConfData_s { @@ -67,15 +68,39 @@ struct modConfData_s { int iFacility; int iSeverity; statsFmtType_t statsFmt; + sbool configSetViaV2Method; }; static modConfData_t *loadModConf = NULL;/* modConf ptr to use for the current load process */ static modConfData_t *runModConf = NULL;/* modConf ptr to use for the current load process */ - static configSettings_t cs; - +static int bLegacyCnfModGlobalsPermitted;/* are legacy module-global config parameters permitted? */ static prop_t *pInputName = NULL; +/* module-global parameters */ +static struct cnfparamdescr modpdescr[] = { + { "interval", eCmdHdlrInt, 0 }, + { "facility", eCmdHdlrInt, 0 }, + { "severity", eCmdHdlrInt, 0 }, + { "format", eCmdHdlrGetWord, 0 } +}; +static struct cnfparamblk modpblk = + { CNFPARAMBLK_VERSION, + sizeof(modpdescr)/sizeof(struct cnfparamdescr), + modpdescr + }; + +BEGINmodExit +CODESTARTmodExit + prop.Destruct(&pInputName); + /* release objects we used */ + objRelease(glbl, CORE_COMPONENT); + objRelease(prop, CORE_COMPONENT); + objRelease(errmsg, CORE_COMPONENT); + objRelease(statsobj, CORE_COMPONENT); +ENDmodExit + + BEGINisCompatibleWithFeature CODESTARTisCompatibleWithFeature if(eFeat == sFEATURENonCancelInputTermination) @@ -89,6 +114,7 @@ initConfigSettings(void) cs.iFacility = DEFAULT_FACILITY; cs.iSeverity = DEFAULT_SEVERITY; cs.bJSON = 0; + cs.bCEE = 0; } @@ -146,18 +172,87 @@ BEGINbeginCnfLoad CODESTARTbeginCnfLoad loadModConf = pModConf; pModConf->pConf = pConf; + /* init our settings */ + loadModConf->configSetViaV2Method = 0; + loadModConf->iStatsInterval = DEFAULT_STATS_PERIOD; + loadModConf->iFacility = DEFAULT_FACILITY; + loadModConf->iSeverity = DEFAULT_SEVERITY; + loadModConf->statsFmt = statsFmt_Legacy; + bLegacyCnfModGlobalsPermitted = 1; /* init legacy config vars */ initConfigSettings(); ENDbeginCnfLoad +BEGINsetModCnf + struct cnfparamvals *pvals = NULL; + char *mode; + int i; +CODESTARTsetModCnf + pvals = nvlstGetParams(lst, &modpblk, NULL); + if(pvals == NULL) { + errmsg.LogError(0, RS_RET_MISSING_CNFPARAMS, "error processing module " + "config parameters [module(...)]"); + ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); + } + + if(Debug) { + dbgprintf("module (global) param blk for impstats:\n"); + cnfparamsPrint(&modpblk, pvals); + } + + for(i = 0 ; i < modpblk.nParams ; ++i) { + if(!pvals[i].bUsed) + continue; + if(!strcmp(modpblk.descr[i].name, "interval")) { + loadModConf->iStatsInterval = (int) pvals[i].val.d.n; + } else if(!strcmp(modpblk.descr[i].name, "facility")) { + loadModConf->iFacility = (int) pvals[i].val.d.n; + } else if(!strcmp(modpblk.descr[i].name, "severity")) { + loadModConf->iSeverity = (int) pvals[i].val.d.n; + } else if(!strcmp(modpblk.descr[i].name, "format")) { + mode = es_str2cstr(pvals[i].val.d.estr, NULL); + if(!strcasecmp(mode, "json")) { + loadModConf->statsFmt = statsFmt_JSON; + } else if(!strcasecmp(mode, "cee")) { + loadModConf->statsFmt = statsFmt_CEE; + } else if(!strcasecmp(mode, "legacy")) { + loadModConf->statsFmt = statsFmt_Legacy; + } else { + errmsg.LogError(0, RS_RET_ERR, "impstats: invalid format %s", + mode); + } + free(mode); + } else { + dbgprintf("impstats: program error, non-handled " + "param '%s' in beginCnfLoad\n", modpblk.descr[i].name); + } + } + + loadModConf->configSetViaV2Method = 1; + bLegacyCnfModGlobalsPermitted = 0; + +finalize_it: + if(pvals != NULL) + cnfparamvalsDestruct(pvals, &modpblk); +ENDsetModCnf + + BEGINendCnfLoad CODESTARTendCnfLoad - /* persist module-specific settings from legacy config system */ - loadModConf->iStatsInterval = cs.iStatsInterval; - loadModConf->iFacility = cs.iFacility; - loadModConf->iSeverity = cs.iSeverity; - loadModConf->statsFmt = cs.bJSON ? statsFmt_JSON : statsFmt_Legacy; + if(!loadModConf->configSetViaV2Method) { + /* persist module-specific settings from legacy config system */ + loadModConf->iStatsInterval = cs.iStatsInterval; + loadModConf->iFacility = cs.iFacility; + loadModConf->iSeverity = cs.iSeverity; + if (cs.bCEE == 1) { + loadModConf->statsFmt = statsFmt_CEE; + } else if (cs.bJSON == 1) { + loadModConf->statsFmt = statsFmt_JSON; + } else { + loadModConf->statsFmt = statsFmt_Legacy; + } + } ENDendCnfLoad @@ -165,7 +260,7 @@ BEGINcheckCnf CODESTARTcheckCnf if(pModConf->iStatsInterval == 0) { errmsg.LogError(0, NO_ERRCODE, "impstats: stats interval zero not permitted, using " - "defaul of %d seconds", DEFAULT_STATS_PERIOD); + "default of %d seconds", DEFAULT_STATS_PERIOD); pModConf->iStatsInterval = DEFAULT_STATS_PERIOD; } ENDcheckCnf @@ -217,22 +312,11 @@ CODESTARTafterRun ENDafterRun -BEGINmodExit -CODESTARTmodExit - prop.Destruct(&pInputName); - /* release objects we used */ - objRelease(glbl, CORE_COMPONENT); - objRelease(prop, CORE_COMPONENT); - objRelease(errmsg, CORE_COMPONENT); - objRelease(statsobj, CORE_COMPONENT); -ENDmodExit - - - BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_IMOD_QUERIES CODEqueryEtryPt_STD_CONF2_QUERIES +CODEqueryEtryPt_STD_CONF2_setModCnf_QUERIES CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES ENDqueryEtryPt @@ -254,11 +338,12 @@ CODEmodInit_QueryRegCFSLineHdlr CHKiRet(objUse(errmsg, CORE_COMPONENT)); CHKiRet(objUse(statsobj, CORE_COMPONENT)); /* the pstatsinverval is an alias to support a previous screwed-up syntax... */ - CHKiRet(omsdRegCFSLineHdlr((uchar *)"pstatsinterval", 0, eCmdHdlrInt, NULL, &cs.iStatsInterval, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"pstatinterval", 0, eCmdHdlrInt, NULL, &cs.iStatsInterval, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"pstatfacility", 0, eCmdHdlrInt, NULL, &cs.iFacility, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"pstatseverity", 0, eCmdHdlrInt, NULL, &cs.iSeverity, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"pstatjson", 0, eCmdHdlrBinary, NULL, &cs.bJSON, STD_LOADABLE_MODULE_ID)); + CHKiRet(regCfSysLineHdlr2((uchar *)"pstatsinterval", 0, eCmdHdlrInt, NULL, &cs.iStatsInterval, STD_LOADABLE_MODULE_ID, &bLegacyCnfModGlobalsPermitted)); + CHKiRet(regCfSysLineHdlr2((uchar *)"pstatinterval", 0, eCmdHdlrInt, NULL, &cs.iStatsInterval, STD_LOADABLE_MODULE_ID, &bLegacyCnfModGlobalsPermitted)); + CHKiRet(regCfSysLineHdlr2((uchar *)"pstatfacility", 0, eCmdHdlrInt, NULL, &cs.iFacility, STD_LOADABLE_MODULE_ID, &bLegacyCnfModGlobalsPermitted)); + CHKiRet(regCfSysLineHdlr2((uchar *)"pstatseverity", 0, eCmdHdlrInt, NULL, &cs.iSeverity, STD_LOADABLE_MODULE_ID, &bLegacyCnfModGlobalsPermitted)); + CHKiRet(regCfSysLineHdlr2((uchar *)"pstatjson", 0, eCmdHdlrBinary, NULL, &cs.bJSON, STD_LOADABLE_MODULE_ID, &bLegacyCnfModGlobalsPermitted)); + CHKiRet(regCfSysLineHdlr2((uchar *)"pstatcee", 0, eCmdHdlrBinary, NULL, &cs.bCEE, STD_LOADABLE_MODULE_ID, &bLegacyCnfModGlobalsPermitted)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID)); CHKiRet(prop.Construct(&pInputName)); diff --git a/plugins/imptcp/imptcp.c b/plugins/imptcp/imptcp.c index aa1ad81e..8150fc33 100644 --- a/plugins/imptcp/imptcp.c +++ b/plugins/imptcp/imptcp.c @@ -90,6 +90,8 @@ DEFobjCurrIf(statsobj) /* forward references */ static void * wrkr(void *myself); +#define DFLT_wrkrMax 2 + /* config settings */ typedef struct configSettings_s { int bKeepAlive; /* support keep-alive packets */ @@ -127,12 +129,45 @@ struct modConfData_s { rsconf_t *pConf; /* our overall config object */ instanceConf_t *root, *tail; int wrkrMax; + sbool configSetViaV2Method; }; static modConfData_t *loadModConf = NULL;/* modConf ptr to use for the current load process */ static modConfData_t *runModConf = NULL;/* modConf ptr to use for the current load process */ +/* module-global parameters */ +static struct cnfparamdescr modpdescr[] = { + { "threads", eCmdHdlrPositiveInt, 0 } +}; +static struct cnfparamblk modpblk = + { CNFPARAMBLK_VERSION, + sizeof(modpdescr)/sizeof(struct cnfparamdescr), + modpdescr + }; + +/* input instance parameters */ +static struct cnfparamdescr inppdescr[] = { + { "port", eCmdHdlrString, CNFPARAM_REQUIRED }, /* legacy: InputTCPServerRun */ + { "address", eCmdHdlrString, 0 }, + { "name", eCmdHdlrString, 0 }, + { "ruleset", eCmdHdlrString, 0 }, + { "supportoctetcountedframing", eCmdHdlrBinary, 0 }, + { "notifyonconnectionclose", eCmdHdlrBinary, 0 }, + { "keepalive", eCmdHdlrBinary, 0 }, + { "keepalive.probes", eCmdHdlrInt, 0 }, + { "keepalive.time", eCmdHdlrInt, 0 }, + { "keepalive.interval", eCmdHdlrInt, 0 }, + { "addtlframedelimiter", eCmdHdlrInt, 0 }, +}; +static struct cnfparamblk inppblk = + { CNFPARAMBLK_VERSION, + sizeof(inppdescr)/sizeof(struct cnfparamdescr), + inppdescr + }; + #include "im-helper.h" /* must be included AFTER the type definitions! */ +static int bLegacyCnfModGlobalsPermitted;/* are legacy module-global config parameters permitted? */ + /* data elements describing our running config */ typedef struct ptcpsrv_s ptcpsrv_t; typedef struct ptcplstn_s ptcplstn_t; @@ -379,7 +414,9 @@ startupSrv(ptcpsrv_t *pSrv) #endif ) { /* TODO: check if *we* bound the socket - else we *have* an error! */ - DBGPRINTF("error %d while binding tcp socket", errno); + char errStr[1024]; + rs_strerror_r(errno, errStr, sizeof(errStr)); + dbgprintf("error %d while binding tcp socket: %s\n", errno, errStr); close(sock); sock = -1; continue; @@ -812,7 +849,7 @@ static inline void initConfigSettings(void) { cs.bEmitMsgOnClose = 0; - cs.wrkrMax = 2; + cs.wrkrMax = DFLT_wrkrMax; cs.bSuppOctetFram = 1; cs.iAddtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER; cs.pszInputName = NULL; @@ -986,7 +1023,45 @@ closeSess(ptcpsess_t *pSess) destructSess(pSess); finalize_it: - DBGPRINTF("imtcp: session on socket %d closed with iRet %d.\n", sock, iRet); + DBGPRINTF("imptcp: session on socket %d closed with iRet %d.\n", sock, iRet); + RETiRet; +} + + +/* create input instance, set default paramters, and + * add it to the list of instances. + */ +static rsRetVal +createInstance(instanceConf_t **pinst) +{ + instanceConf_t *inst; + DEFiRet; + CHKmalloc(inst = MALLOC(sizeof(instanceConf_t))); + inst->next = NULL; + + inst->pszBindPort = NULL; + inst->pszBindAddr = NULL; + inst->pszBindRuleset = NULL; + inst->pszInputName = NULL; + inst->bSuppOctetFram = 1; + inst->bKeepAlive = 0; + inst->iKeepAliveIntvl = 0; + inst->iKeepAliveProbes = 0; + inst->iKeepAliveTime = 0; + inst->bEmitMsgOnClose = 0; + inst->iAddtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER; + inst->pBindRuleset = NULL; + + /* node created, let's add to config */ + if(loadModConf->tail == NULL) { + loadModConf->tail = loadModConf->root = inst; + } else { + loadModConf->tail->next = inst; + loadModConf->tail = inst; + } + + *pinst = inst; +finalize_it: RETiRet; } @@ -1000,7 +1075,7 @@ static rsRetVal addInstance(void __attribute__((unused)) *pVal, uchar *pNewVal) instanceConf_t *inst; DEFiRet; - CHKmalloc(inst = MALLOC(sizeof(instanceConf_t))); + CHKiRet(createInstance(&inst)); if(pNewVal == NULL || *pNewVal == '\0') { errmsg.LogError(0, NO_ERRCODE, "imptcp: port number must be specified, listener ignored"); } @@ -1032,15 +1107,6 @@ static rsRetVal addInstance(void __attribute__((unused)) *pVal, uchar *pNewVal) inst->iKeepAliveTime = cs.iKeepAliveTime; inst->bEmitMsgOnClose = cs.bEmitMsgOnClose; inst->iAddtlFrameDelim = cs.iAddtlFrameDelim; - inst->next = NULL; - - /* node created, let's add to config */ - if(loadModConf->tail == NULL) { - loadModConf->tail = loadModConf->root = inst; - } else { - loadModConf->tail->next = inst; - loadModConf->tail = inst; - } finalize_it: free(pNewVal); @@ -1103,6 +1169,7 @@ startWorkerPool(void) wrkrRunning = 0; if(runModConf->wrkrMax > 16) runModConf->wrkrMax = 16; /* TODO: make dynamic? */ + DBGPRINTF("imptcp: starting worker pool, %d workers\n", runModConf->wrkrMax); pthread_mutex_init(&wrkrMut, NULL); pthread_cond_init(&wrkrIdle, NULL); for(i = 0 ; i < runModConf->wrkrMax ; ++i) { @@ -1121,6 +1188,7 @@ static inline void stopWorkerPool(void) { int i; + DBGPRINTF("imptcp: stoping worker pool\n"); for(i = 0 ; i < runModConf->wrkrMax ; ++i) { pthread_cond_signal(&wrkrInfo[i].run); /* awake wrkr if not running */ pthread_join(wrkrInfo[i].tid, NULL); @@ -1129,7 +1197,6 @@ stopWorkerPool(void) } pthread_cond_destroy(&wrkrIdle); pthread_mutex_destroy(&wrkrMut); - } @@ -1213,7 +1280,7 @@ sessActivity(ptcpsess_t *pSess) if(lenRcv > 0) { /* have data, process it */ - DBGPRINTF("imtcp: data(%d) on socket %d: %s\n", lenBuf, pSess->sock, rcvBuf); + DBGPRINTF("imptcp: data(%d) on socket %d: %s\n", lenBuf, pSess->sock, rcvBuf); CHKiRet(DataRcvd(pSess, rcvBuf, lenRcv)); } else if (lenRcv == 0) { /* session was closed, do clean-up */ @@ -1229,7 +1296,7 @@ sessActivity(ptcpsess_t *pSess) } else { if(errno == EAGAIN || errno == EWOULDBLOCK) break; - DBGPRINTF("imtcp: error on session socket %d - closed.\n", pSess->sock); + DBGPRINTF("imptcp: error on session socket %d - closed.\n", pSess->sock); closeSess(pSess); /* try clean-up by dropping session */ break; } @@ -1345,19 +1412,121 @@ wrkr(void *myself) } +BEGINnewInpInst + struct cnfparamvals *pvals; + instanceConf_t *inst; + int i; +CODESTARTnewInpInst + DBGPRINTF("newInpInst (imptcp)\n"); + + pvals = nvlstGetParams(lst, &inppblk, NULL); + if(pvals == NULL) { + errmsg.LogError(0, RS_RET_MISSING_CNFPARAMS, + "imptcp: required parameter are missing\n"); + ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); + } + + if(Debug) { + dbgprintf("input param blk in imptcp:\n"); + cnfparamsPrint(&inppblk, pvals); + } + + CHKiRet(createInstance(&inst)); + + for(i = 0 ; i < inppblk.nParams ; ++i) { + if(!pvals[i].bUsed) + continue; + if(!strcmp(inppblk.descr[i].name, "port")) { + inst->pszBindPort = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(inppblk.descr[i].name, "address")) { + inst->pszBindAddr = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(inppblk.descr[i].name, "name")) { + inst->pszInputName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(inppblk.descr[i].name, "ruleset")) { + inst->pszBindRuleset = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(inppblk.descr[i].name, "supportOctetCountedFraming")) { + inst->bSuppOctetFram = (int) pvals[i].val.d.n; + } else if(!strcmp(inppblk.descr[i].name, "keepalive")) { + inst->bKeepAlive = (int) pvals[i].val.d.n; + } else if(!strcmp(inppblk.descr[i].name, "keepalive.probes")) { + inst->iKeepAliveProbes = (int) pvals[i].val.d.n; + } else if(!strcmp(inppblk.descr[i].name, "keepalive.time")) { + inst->iKeepAliveTime = (int) pvals[i].val.d.n; + } else if(!strcmp(inppblk.descr[i].name, "keepalive.interval")) { + inst->iKeepAliveIntvl = (int) pvals[i].val.d.n; + } else if(!strcmp(inppblk.descr[i].name, "addtlframedelimiter")) { + inst->iAddtlFrameDelim = (int) pvals[i].val.d.n; + } else if(!strcmp(inppblk.descr[i].name, "notifyonconnectionclose")) { + inst->bEmitMsgOnClose = (int) pvals[i].val.d.n; + } else { + dbgprintf("imptcp: program error, non-handled " + "param '%s'\n", inppblk.descr[i].name); + } + } +finalize_it: +CODE_STD_FINALIZERnewInpInst + cnfparamvalsDestruct(pvals, &inppblk); +ENDnewInpInst + + BEGINbeginCnfLoad CODESTARTbeginCnfLoad loadModConf = pModConf; pModConf->pConf = pConf; + /* init our settings */ + loadModConf->wrkrMax = DFLT_wrkrMax; + loadModConf->configSetViaV2Method = 0; + bLegacyCnfModGlobalsPermitted = 1; /* init legacy config vars */ initConfigSettings(); ENDbeginCnfLoad +BEGINsetModCnf + struct cnfparamvals *pvals = NULL; + int i; +CODESTARTsetModCnf + pvals = nvlstGetParams(lst, &modpblk, NULL); + if(pvals == NULL) { + errmsg.LogError(0, RS_RET_MISSING_CNFPARAMS, "imptcp: error processing module " + "config parameters [module(...)]"); + ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); + } + + if(Debug) { + dbgprintf("module (global) param blk for imptcp:\n"); + cnfparamsPrint(&modpblk, pvals); + } + + for(i = 0 ; i < modpblk.nParams ; ++i) { + if(!pvals[i].bUsed) + continue; + if(!strcmp(modpblk.descr[i].name, "threads")) { + loadModConf->wrkrMax = (int) pvals[i].val.d.n; + } else { + dbgprintf("imptcp: program error, non-handled " + "param '%s' in beginCnfLoad\n", modpblk.descr[i].name); + } + } + + /* remove all of our legacy handlers, as they can not used in addition + * the the new-style config method. + */ + bLegacyCnfModGlobalsPermitted = 0; + loadModConf->configSetViaV2Method = 1; + +finalize_it: + if(pvals != NULL) + cnfparamvalsDestruct(pvals, &modpblk); +ENDsetModCnf + + BEGINendCnfLoad CODESTARTendCnfLoad - /* persist module-specific settings from legacy config system */ - loadModConf->wrkrMax = cs.wrkrMax; + if(!loadModConf->configSetViaV2Method) { + /* persist module-specific settings from legacy config system */ + loadModConf->wrkrMax = cs.wrkrMax; + } loadModConf = NULL; /* done loading */ /* free legacy config vars */ @@ -1541,7 +1710,7 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal) { cs.bEmitMsgOnClose = 0; - cs.wrkrMax = 2; + cs.wrkrMax = DFLT_wrkrMax; cs.bKeepAlive = 0; cs.iKeepAliveProbes = 0; cs.iKeepAliveTime = 0; @@ -1567,7 +1736,9 @@ BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_IMOD_QUERIES CODEqueryEtryPt_STD_CONF2_QUERIES +CODEqueryEtryPt_STD_CONF2_setModCnf_QUERIES CODEqueryEtryPt_STD_CONF2_PREPRIVDROP_QUERIES +CODEqueryEtryPt_STD_CONF2_IMOD_QUERIES CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES ENDqueryEtryPt @@ -1609,14 +1780,15 @@ CODEmodInit_QueryRegCFSLineHdlr eCmdHdlrBinary, NULL, &cs.bEmitMsgOnClose, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserveraddtlframedelimiter"), 0, eCmdHdlrInt, NULL, &cs.iAddtlFrameDelim, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverhelperthreads"), 0, eCmdHdlrInt, - NULL, &cs.wrkrMax, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverinputname"), 0, eCmdHdlrGetWord, NULL, &cs.pszInputName, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverlistenip"), 0, eCmdHdlrGetWord, NULL, &cs.lstnIP, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverbindruleset"), 0, eCmdHdlrGetWord, NULL, &cs.pszBindRuleset, STD_LOADABLE_MODULE_ID)); + /* module-global parameters */ + CHKiRet(regCfSysLineHdlr2(UCHAR_CONSTANT("inputptcpserverhelperthreads"), 0, eCmdHdlrInt, + NULL, &cs.wrkrMax, STD_LOADABLE_MODULE_ID, &bLegacyCnfModGlobalsPermitted)); CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("resetconfigvariables"), 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID)); ENDmodInit diff --git a/plugins/imrelp/imrelp.c b/plugins/imrelp/imrelp.c index f4d9331c..fe987a50 100644 --- a/plugins/imrelp/imrelp.c +++ b/plugins/imrelp/imrelp.c @@ -22,7 +22,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - #include "config.h" #include <stdlib.h> #include <assert.h> @@ -35,6 +34,7 @@ #include <netdb.h> #include <sys/types.h> #include <sys/socket.h> +#include <signal.h> #include <librelp.h> #include "rsyslog.h" #include "dirty.h" @@ -88,6 +88,17 @@ struct modConfData_s { static modConfData_t *loadModConf = NULL;/* modConf ptr to use for the current load process */ static modConfData_t *runModConf = NULL;/* modConf ptr to use for the current load process */ +/* input instance parameters */ +static struct cnfparamdescr inppdescr[] = { + { "port", eCmdHdlrString, CNFPARAM_REQUIRED } +}; +static struct cnfparamblk inppblk = + { CNFPARAMBLK_VERSION, + sizeof(inppdescr)/sizeof(struct cnfparamdescr), + inppdescr + }; + + /* ------------------------------ callbacks ------------------------------ */ @@ -114,6 +125,32 @@ onSyslogRcv(uchar *pHostname, uchar *pIP, uchar *pMsg, size_t lenMsg) /* ------------------------------ end callbacks ------------------------------ */ +/* create input instance, set default paramters, and + * add it to the list of instances. + */ +static rsRetVal +createInstance(instanceConf_t **pinst) +{ + instanceConf_t *inst; + DEFiRet; + CHKmalloc(inst = MALLOC(sizeof(instanceConf_t))); + inst->next = NULL; + + inst->pszBindPort = NULL; + + /* node created, let's add to config */ + if(loadModConf->tail == NULL) { + loadModConf->tail = loadModConf->root = inst; + } else { + loadModConf->tail->next = inst; + loadModConf->tail = inst; + } + + *pinst = inst; +finalize_it: + RETiRet; +} + /* modified to work for module, not instance (as usual) */ static inline void @@ -134,21 +171,12 @@ static rsRetVal addInstance(void __attribute__((unused)) *pVal, uchar *pNewVal) instanceConf_t *inst; DEFiRet; - CHKmalloc(inst = MALLOC(sizeof(instanceConf_t))); + CHKiRet(createInstance(&inst)); if(pNewVal == NULL || *pNewVal == '\0') { errmsg.LogError(0, NO_ERRCODE, "imrelp: port number must be specified, listener ignored"); } inst->pszBindPort = pNewVal; - inst->next = NULL; - - /* node created, let's add to config */ - if(loadModConf->tail == NULL) { - loadModConf->tail = loadModConf->root = inst; - } else { - loadModConf->tail->next = inst; - loadModConf->tail = inst; - } finalize_it: RETiRet; @@ -176,6 +204,43 @@ finalize_it: } +BEGINnewInpInst + struct cnfparamvals *pvals; + instanceConf_t *inst; + int i; +CODESTARTnewInpInst + DBGPRINTF("newInpInst (imrelp)\n"); + + pvals = nvlstGetParams(lst, &inppblk, NULL); + if(pvals == NULL) { + errmsg.LogError(0, RS_RET_MISSING_CNFPARAMS, + "imrelp: required parameter are missing\n"); + ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); + } + + if(Debug) { + dbgprintf("input param blk in imrelp:\n"); + cnfparamsPrint(&inppblk, pvals); + } + + CHKiRet(createInstance(&inst)); + + for(i = 0 ; i < inppblk.nParams ; ++i) { + if(!pvals[i].bUsed) + continue; + if(!strcmp(inppblk.descr[i].name, "port")) { + inst->pszBindPort = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else { + dbgprintf("imrelp: program error, non-handled " + "param '%s'\n", inppblk.descr[i].name); + } + } +finalize_it: +CODE_STD_FINALIZERnewInpInst + cnfparamvalsDestruct(pvals, &inppblk); +ENDnewInpInst + + BEGINbeginCnfLoad CODESTARTbeginCnfLoad loadModConf = pModConf; @@ -241,13 +306,39 @@ BEGINfreeCnf CODESTARTfreeCnf ENDfreeCnf +/* This is used to terminate the plugin. Note that the signal handler blocks + * other activity on the thread. As such, it is safe to request the stop. When + * we terminate, relpEngine is called, and it's select() loop interrupted. But + * only *after this function is done*. So we do not have a race! + */ +static void +doSIGTTIN(int __attribute__((unused)) sig) +{ + DBGPRINTF("imrelp: termination requested via SIGTTIN - telling RELP engine\n"); + relpEngineSetStop(pRelpEngine); +} + + /* This function is called to gather input. */ BEGINrunInput + sigset_t sigSet; + struct sigaction sigAct; CODESTARTrunInput - /* TODO: we must be careful to start the listener here. Currently, tcpsrv.c seems to - * do that in ConstructFinalize + /* we want to support non-cancel input termination. To do so, we must signal librelp + * when to stop. As we run on the same thread, we need to register as SIGTTIN handler, + * which will be used to put the terminating condition into librelp. */ + sigfillset(&sigSet); + pthread_sigmask(SIG_BLOCK, &sigSet, NULL); + sigemptyset(&sigSet); + sigaddset(&sigSet, SIGTTIN); + pthread_sigmask(SIG_UNBLOCK, &sigSet, NULL); + memset(&sigAct, 0, sizeof (sigAct)); + sigemptyset(&sigAct.sa_mask); + sigAct.sa_handler = doSIGTTIN; + sigaction(SIGTTIN, &sigAct, NULL); + iRet = relpEngineRun(pRelpEngine); ENDrunInput @@ -290,12 +381,20 @@ resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unus } +BEGINisCompatibleWithFeature +CODESTARTisCompatibleWithFeature + if(eFeat == sFEATURENonCancelInputTermination) + iRet = RS_RET_OK; +ENDisCompatibleWithFeature + BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_IMOD_QUERIES CODEqueryEtryPt_STD_CONF2_QUERIES CODEqueryEtryPt_STD_CONF2_PREPRIVDROP_QUERIES +CODEqueryEtryPt_STD_CONF2_IMOD_QUERIES +CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES ENDqueryEtryPt diff --git a/plugins/imtcp/imtcp.c b/plugins/imtcp/imtcp.c index 33404fee..eaf9a213 100644 --- a/plugins/imtcp/imtcp.c +++ b/plugins/imtcp/imtcp.c @@ -36,7 +36,6 @@ * * rgerhards, 2008-05-19 */ - #include "config.h" #include <stdlib.h> #include <assert.h> @@ -62,6 +61,7 @@ #include "errmsg.h" #include "tcpsrv.h" #include "ruleset.h" +#include "rainerscript.h" #include "net.h" /* for permittedPeers, may be removed when this is removed */ MODULE_TYPE_INPUT @@ -123,13 +123,50 @@ struct modConfData_s { sbool bKeepAlive; sbool bEmitMsgOnClose; /* emit an informational message on close by remote peer */ uchar *pszStrmDrvrAuthMode; /* authentication mode to use */ + struct cnfarray *permittedPeers; + sbool configSetViaV2Method; }; static modConfData_t *loadModConf = NULL;/* modConf ptr to use for the current load process */ static modConfData_t *runModConf = NULL;/* modConf ptr to use for the current load process */ +/* module-global parameters */ +static struct cnfparamdescr modpdescr[] = { + { "flowcontrol", eCmdHdlrBinary, 0 }, + { "disablelfdelimiter", eCmdHdlrBinary, 0 }, + { "octetcountedframing", eCmdHdlrBinary, 0 }, + { "notifyonconnectionclose", eCmdHdlrBinary, 0 }, + { "addtlframedelimiter", eCmdHdlrPositiveInt, 0 }, + { "maxsessions", eCmdHdlrPositiveInt, 0 }, + { "maxlistners", eCmdHdlrPositiveInt, 0 }, + { "streamdriver.mode", eCmdHdlrPositiveInt, 0 }, + { "streamdriver.authmode", eCmdHdlrString, 0 }, + { "permittedpeer", eCmdHdlrArray, 0 }, + { "keepalive", eCmdHdlrBinary, 0 } +}; +static struct cnfparamblk modpblk = + { CNFPARAMBLK_VERSION, + sizeof(modpdescr)/sizeof(struct cnfparamdescr), + modpdescr + }; + +/* input instance parameters */ +static struct cnfparamdescr inppdescr[] = { + { "port", eCmdHdlrString, CNFPARAM_REQUIRED }, /* legacy: InputTCPServerRun */ + { "name", eCmdHdlrString, 0 }, + { "ruleset", eCmdHdlrString, 0 }, + { "supportOctetCountedFraming", eCmdHdlrBinary, 0 } +}; +static struct cnfparamblk inppblk = + { CNFPARAMBLK_VERSION, + sizeof(inppdescr)/sizeof(struct cnfparamdescr), + inppdescr + }; + #include "im-helper.h" /* must be included AFTER the type definitions! */ +static int bLegacyCnfModGlobalsPermitted;/* are legacy module-global config parameters permitted? */ + /* callbacks */ /* this shall go into a specific ACL module! */ static int @@ -201,6 +238,34 @@ finalize_it: } +/* create input instance, set default paramters, and + * add it to the list of instances. + */ +static rsRetVal +createInstance(instanceConf_t **pinst) +{ + instanceConf_t *inst; + DEFiRet; + CHKmalloc(inst = MALLOC(sizeof(instanceConf_t))); + inst->next = NULL; + inst->pszBindRuleset = NULL; + inst->pszInputName = NULL; + inst->bSuppOctetFram = 1; + + /* node created, let's add to config */ + if(loadModConf->tail == NULL) { + loadModConf->tail = loadModConf->root = inst; + } else { + loadModConf->tail->next = inst; + loadModConf->tail = inst; + } + + *pinst = inst; +finalize_it: + RETiRet; +} + + /* This function is called when a new listener instace shall be added to * the current config object via the legacy config system. It just shuffles * all parameters to the listener in-memory instance. @@ -211,7 +276,7 @@ static rsRetVal addInstance(void __attribute__((unused)) *pVal, uchar *pNewVal) instanceConf_t *inst; DEFiRet; - CHKmalloc(inst = MALLOC(sizeof(instanceConf_t))); + CHKiRet(createInstance(&inst)); CHKmalloc(inst->pszBindPort = ustrdup((pNewVal == NULL || *pNewVal == '\0') ? (uchar*) "10514" : pNewVal)); @@ -226,15 +291,6 @@ static rsRetVal addInstance(void __attribute__((unused)) *pVal, uchar *pNewVal) CHKmalloc(inst->pszInputName = ustrdup(cs.pszInputName)); } inst->bSuppOctetFram = cs.bSuppOctetFram; - inst->next = NULL; - - /* node created, let's add to config */ - if(loadModConf->tail == NULL) { - loadModConf->tail = loadModConf->root = inst; - } else { - loadModConf->tail->next = inst; - loadModConf->tail = inst; - } finalize_it: free(pNewVal); @@ -288,34 +344,153 @@ finalize_it: } +BEGINnewInpInst + struct cnfparamvals *pvals; + instanceConf_t *inst; + int i; +CODESTARTnewInpInst + DBGPRINTF("newInpInst (imtcp)\n"); + + pvals = nvlstGetParams(lst, &inppblk, NULL); + if(pvals == NULL) { + errmsg.LogError(0, RS_RET_MISSING_CNFPARAMS, + "imtcp: required parameter are missing\n"); + ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); + } + + if(Debug) { + dbgprintf("input param blk in imtcp:\n"); + cnfparamsPrint(&inppblk, pvals); + } + + CHKiRet(createInstance(&inst)); + + for(i = 0 ; i < inppblk.nParams ; ++i) { + if(!pvals[i].bUsed) + continue; + if(!strcmp(inppblk.descr[i].name, "port")) { + inst->pszBindPort = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(inppblk.descr[i].name, "name")) { + inst->pszInputName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(inppblk.descr[i].name, "ruleset")) { + inst->pszBindRuleset = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(inppblk.descr[i].name, "supportOctetCountedFraming")) { + inst->bSuppOctetFram = (int) pvals[i].val.d.n; + } else { + dbgprintf("imtcp: program error, non-handled " + "param '%s'\n", inppblk.descr[i].name); + } + } +finalize_it: +CODE_STD_FINALIZERnewInpInst + cnfparamvalsDestruct(pvals, &inppblk); +ENDnewInpInst + + BEGINbeginCnfLoad CODESTARTbeginCnfLoad loadModConf = pModConf; pModConf->pConf = pConf; + /* init our settings */ + loadModConf->iTCPSessMax = 200; + loadModConf->iTCPLstnMax = 20; + loadModConf->bSuppOctetFram = 1; + loadModConf->iStrmDrvrMode = 0; + loadModConf->bUseFlowControl = 0; + loadModConf->bKeepAlive = 0; + loadModConf->bEmitMsgOnClose = 0; + loadModConf->iAddtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER; + loadModConf->bDisableLFDelim = 0; + loadModConf->pszStrmDrvrAuthMode = NULL; + loadModConf->permittedPeers = NULL; + loadModConf->configSetViaV2Method = 0; + bLegacyCnfModGlobalsPermitted = 1; /* init legacy config variables */ cs.pszStrmDrvrAuthMode = NULL; resetConfigVariables(NULL, NULL); /* dummy parameters just to fulfill interface def */ ENDbeginCnfLoad +BEGINsetModCnf + struct cnfparamvals *pvals = NULL; + int i; +CODESTARTsetModCnf + pvals = nvlstGetParams(lst, &modpblk, NULL); + if(pvals == NULL) { + errmsg.LogError(0, RS_RET_MISSING_CNFPARAMS, "imtcp: error processing module " + "config parameters [module(...)]"); + ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); + } + + if(Debug) { + dbgprintf("module (global) param blk for imtcp:\n"); + cnfparamsPrint(&modpblk, pvals); + } + + for(i = 0 ; i < modpblk.nParams ; ++i) { + if(!pvals[i].bUsed) + continue; + if(!strcmp(modpblk.descr[i].name, "flowcontrol")) { + loadModConf->bUseFlowControl = (int) pvals[i].val.d.n; + } else if(!strcmp(modpblk.descr[i].name, "disablelfdelimiter")) { + loadModConf->bDisableLFDelim = (int) pvals[i].val.d.n; + } else if(!strcmp(modpblk.descr[i].name, "octetcountedframing")) { + loadModConf->bSuppOctetFram = (int) pvals[i].val.d.n; + } else if(!strcmp(modpblk.descr[i].name, "notifyonconnectionclose")) { + loadModConf->bEmitMsgOnClose = (int) pvals[i].val.d.n; + } else if(!strcmp(modpblk.descr[i].name, "addtlframedelimiter")) { + loadModConf->iAddtlFrameDelim = (int) pvals[i].val.d.n; + } else if(!strcmp(modpblk.descr[i].name, "maxsessions")) { + loadModConf->iTCPSessMax = (int) pvals[i].val.d.n; + } else if(!strcmp(modpblk.descr[i].name, "maxlistners")) { + loadModConf->iTCPLstnMax = (int) pvals[i].val.d.n; + } else if(!strcmp(modpblk.descr[i].name, "keepalive")) { + loadModConf->bKeepAlive = (int) pvals[i].val.d.n; + } else if(!strcmp(modpblk.descr[i].name, "streamdriver.mode")) { + loadModConf->iStrmDrvrMode = (int) pvals[i].val.d.n; + } else if(!strcmp(modpblk.descr[i].name, "streamdriver.authmode")) { + loadModConf->pszStrmDrvrAuthMode = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(modpblk.descr[i].name, "permittedpeer")) { + loadModConf->permittedPeers = cnfarrayDup(pvals[i].val.d.ar); + } else { + dbgprintf("imtcp: program error, non-handled " + "param '%s' in beginCnfLoad\n", modpblk.descr[i].name); + } + } + + /* remove all of our legacy handlers, as they can not used in addition + * the the new-style config method. + */ + bLegacyCnfModGlobalsPermitted = 0; + loadModConf->configSetViaV2Method = 1; + +finalize_it: + if(pvals != NULL) + cnfparamvalsDestruct(pvals, &modpblk); +ENDsetModCnf + + BEGINendCnfLoad CODESTARTendCnfLoad - /* persist module-specific settings from legacy config system */ - pModConf->iTCPSessMax = cs.iTCPSessMax; - pModConf->iTCPLstnMax = cs.iTCPLstnMax; - pModConf->iStrmDrvrMode = cs.iStrmDrvrMode; - pModConf->bEmitMsgOnClose = cs.bEmitMsgOnClose; - pModConf->bSuppOctetFram = cs.bSuppOctetFram; - pModConf->iAddtlFrameDelim = cs.iAddtlFrameDelim; - pModConf->bDisableLFDelim = cs.bDisableLFDelim; - pModConf->bUseFlowControl = cs.bUseFlowControl; - pModConf->bKeepAlive = cs.bKeepAlive; - if((cs.pszStrmDrvrAuthMode == NULL) || (cs.pszStrmDrvrAuthMode[0] == '\0')) { - loadModConf->pszStrmDrvrAuthMode = NULL; - free(cs.pszStrmDrvrAuthMode); - } else { - loadModConf->pszStrmDrvrAuthMode = cs.pszStrmDrvrAuthMode; + if(!loadModConf->configSetViaV2Method) { + /* persist module-specific settings from legacy config system */ + pModConf->iTCPSessMax = cs.iTCPSessMax; + pModConf->iTCPLstnMax = cs.iTCPLstnMax; + pModConf->iStrmDrvrMode = cs.iStrmDrvrMode; + pModConf->bEmitMsgOnClose = cs.bEmitMsgOnClose; + pModConf->bSuppOctetFram = cs.bSuppOctetFram; + pModConf->iAddtlFrameDelim = cs.iAddtlFrameDelim; + pModConf->bDisableLFDelim = cs.bDisableLFDelim; + pModConf->bUseFlowControl = cs.bUseFlowControl; + pModConf->bKeepAlive = cs.bKeepAlive; + if((cs.pszStrmDrvrAuthMode == NULL) || (cs.pszStrmDrvrAuthMode[0] == '\0')) { + loadModConf->pszStrmDrvrAuthMode = NULL; + } else { + loadModConf->pszStrmDrvrAuthMode = cs.pszStrmDrvrAuthMode; + } } + if((cs.pszStrmDrvrAuthMode == NULL) || (cs.pszStrmDrvrAuthMode[0] == '\0')) + free(cs.pszStrmDrvrAuthMode); cs.pszStrmDrvrAuthMode = NULL; loadModConf = NULL; /* done loading */ @@ -347,8 +522,15 @@ ENDcheckCnf BEGINactivateCnfPrePrivDrop instanceConf_t *inst; + int i; CODESTARTactivateCnfPrePrivDrop runModConf = pModConf; + if(runModConf->permittedPeers != NULL) { + for(i = 0 ; i < runModConf->permittedPeers->nmemb ; ++i) { + setPermittedPeer(NULL, (uchar*) + es_str2cstr(runModConf->permittedPeers->arr[i], NULL)); + } + } for(inst = runModConf->root ; inst != NULL ; inst = inst->next) { addListner(pModConf, inst); } @@ -366,7 +548,19 @@ ENDactivateCnf BEGINfreeCnf + instanceConf_t *inst, *del; CODESTARTfreeCnf + if(runModConf->permittedPeers != NULL) { + cnfarrayContentDestruct(runModConf->permittedPeers); + free(runModConf->permittedPeers); + } + for(inst = pModConf->root ; inst != NULL ; ) { + free(inst->pszBindPort); + free(inst->pszInputName); + del = inst; + inst = inst->next; + free(del); + } ENDfreeCnf /* This function is called to gather input. @@ -442,7 +636,9 @@ BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_IMOD_QUERIES CODEqueryEtryPt_STD_CONF2_QUERIES +CODEqueryEtryPt_STD_CONF2_setModCnf_QUERIES CODEqueryEtryPt_STD_CONF2_PREPRIVDROP_QUERIES +CODEqueryEtryPt_STD_CONF2_IMOD_QUERIES CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES ENDqueryEtryPt @@ -463,36 +659,39 @@ CODEmodInit_QueryRegCFSLineHdlr /* register config file handlers */ CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputtcpserverrun"), 0, eCmdHdlrGetWord, addInstance, NULL, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputtcpserverkeepalive"), 0, eCmdHdlrBinary, - NULL, &cs.bKeepAlive, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputtcpserversupportoctetcountedframing"), 0, eCmdHdlrBinary, - NULL, &cs.bSuppOctetFram, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputtcpmaxsessions"), 0, eCmdHdlrInt, - NULL, &cs.iTCPSessMax, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputtcpmaxlisteners"), 0, eCmdHdlrInt, - NULL, &cs.iTCPLstnMax, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputtcpservernotifyonconnectionclose"), 0, eCmdHdlrBinary, - NULL, &cs.bEmitMsgOnClose, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputtcpserverstreamdrivermode"), 0, eCmdHdlrInt, - NULL, &cs.iStrmDrvrMode, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputtcpserverstreamdriverauthmode"), 0, eCmdHdlrGetWord, - NULL, &cs.pszStrmDrvrAuthMode, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputtcpserverstreamdriverpermittedpeer"), 0, eCmdHdlrGetWord, - setPermittedPeer, NULL, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputtcpserveraddtlframedelimiter"), 0, eCmdHdlrInt, - NULL, &cs.iAddtlFrameDelim, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputtcpserverdisablelfdelimiter"), 0, eCmdHdlrBinary, - NULL, &cs.bDisableLFDelim, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputtcpserverinputname"), 0, eCmdHdlrGetWord, NULL, &cs.pszInputName, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputtcpserverbindruleset"), 0, eCmdHdlrGetWord, NULL, &cs.pszBindRuleset, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputtcpflowcontrol"), 0, eCmdHdlrBinary, - NULL, &cs.bUseFlowControl, STD_LOADABLE_MODULE_ID)); + /* module-global config params - will be disabled in configs that are loaded + * via module(...). + */ + CHKiRet(regCfSysLineHdlr2(UCHAR_CONSTANT("inputtcpserverstreamdriverpermittedpeer"), 0, eCmdHdlrGetWord, + setPermittedPeer, NULL, STD_LOADABLE_MODULE_ID, &bLegacyCnfModGlobalsPermitted)); + CHKiRet(regCfSysLineHdlr2(UCHAR_CONSTANT("inputtcpserverstreamdriverauthmode"), 0, eCmdHdlrGetWord, + NULL, &cs.pszStrmDrvrAuthMode, STD_LOADABLE_MODULE_ID, &bLegacyCnfModGlobalsPermitted)); + CHKiRet(regCfSysLineHdlr2(UCHAR_CONSTANT("inputtcpserverkeepalive"), 0, eCmdHdlrBinary, + NULL, &cs.bKeepAlive, STD_LOADABLE_MODULE_ID, &bLegacyCnfModGlobalsPermitted)); + CHKiRet(regCfSysLineHdlr2(UCHAR_CONSTANT("inputtcpflowcontrol"), 0, eCmdHdlrBinary, + NULL, &cs.bUseFlowControl, STD_LOADABLE_MODULE_ID, &bLegacyCnfModGlobalsPermitted)); + CHKiRet(regCfSysLineHdlr2(UCHAR_CONSTANT("inputtcpserverdisablelfdelimiter"), 0, eCmdHdlrBinary, + NULL, &cs.bDisableLFDelim, STD_LOADABLE_MODULE_ID, &bLegacyCnfModGlobalsPermitted)); + CHKiRet(regCfSysLineHdlr2(UCHAR_CONSTANT("inputtcpserveraddtlframedelimiter"), 0, eCmdHdlrInt, + NULL, &cs.iAddtlFrameDelim, STD_LOADABLE_MODULE_ID, &bLegacyCnfModGlobalsPermitted)); + CHKiRet(regCfSysLineHdlr2(UCHAR_CONSTANT("inputtcpserversupportoctetcountedframing"), 0, eCmdHdlrBinary, + NULL, &cs.bSuppOctetFram, STD_LOADABLE_MODULE_ID, &bLegacyCnfModGlobalsPermitted)); + CHKiRet(regCfSysLineHdlr2(UCHAR_CONSTANT("inputtcpmaxsessions"), 0, eCmdHdlrInt, + NULL, &cs.iTCPSessMax, STD_LOADABLE_MODULE_ID, &bLegacyCnfModGlobalsPermitted)); + CHKiRet(regCfSysLineHdlr2(UCHAR_CONSTANT("inputtcpmaxlisteners"), 0, eCmdHdlrInt, + NULL, &cs.iTCPLstnMax, STD_LOADABLE_MODULE_ID, &bLegacyCnfModGlobalsPermitted)); + CHKiRet(regCfSysLineHdlr2(UCHAR_CONSTANT("inputtcpservernotifyonconnectionclose"), 0, eCmdHdlrBinary, + NULL, &cs.bEmitMsgOnClose, STD_LOADABLE_MODULE_ID, &bLegacyCnfModGlobalsPermitted)); + CHKiRet(regCfSysLineHdlr2(UCHAR_CONSTANT("inputtcpserverstreamdrivermode"), 0, eCmdHdlrInt, + NULL, &cs.iStrmDrvrMode, STD_LOADABLE_MODULE_ID, &bLegacyCnfModGlobalsPermitted)); + CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("resetconfigvariables"), 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID)); ENDmodInit - /* vim:set ai: */ diff --git a/plugins/imttcp/imttcp.c b/plugins/imttcp/imttcp.c index c72886b3..9bd11f77 100644 --- a/plugins/imttcp/imttcp.c +++ b/plugins/imttcp/imttcp.c @@ -365,7 +365,9 @@ createSrv(ttcpsrv_t *pSrv) #endif ) { /* TODO: check if *we* bound the socket - else we *have* an error! */ - DBGPRINTF("error %d while binding tcp socket", errno); + char errStr[1024]; + rs_strerror_r(errno, errStr, sizeof(errStr)); + dbgprintf("error %d while binding tcp socket: %s\n", errno, errStr); close(sock); sock = -1; continue; diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c index 8ab3b9be..0dda30ec 100644 --- a/plugins/imudp/imudp.c +++ b/plugins/imudp/imudp.c @@ -6,7 +6,7 @@ * * File begun on 2007-12-21 by RGerhards (extracted from syslogd.c) * - * Copyright 2007-2011 Rainer Gerhards and Adiscon GmbH. + * Copyright 2007-2012 Rainer Gerhards and Adiscon GmbH. * * This file is part of rsyslog. * @@ -80,6 +80,7 @@ static struct lstn_s { STATSCOUNTER_DEF(ctrSubmit, mutCtrSubmit) } *lcnfRoot = NULL, *lcnfLast = NULL; +static int bLegacyCnfModGlobalsPermitted;/* are legacy module-global config parameters permitted? */ static int bDoACLCheck; /* are ACL checks neeed? Cached once immediately before listener startup */ static int iMaxLine; /* maximum UDP message size supported */ static time_t ttLastDiscard = 0; /* timestamp when a message from a non-permitted sender was last discarded @@ -118,13 +119,65 @@ struct modConfData_s { int iSchedPolicy; /* scheduling policy as SCHED_xxx */ int iSchedPrio; /* scheduling priority */ int iTimeRequery; /* how often is time to be queried inside tight recv loop? 0=always */ + sbool configSetViaV2Method; }; static modConfData_t *loadModConf = NULL;/* modConf ptr to use for the current load process */ static modConfData_t *runModConf = NULL;/* modConf ptr to use for the current load process */ +/* module-global parameters */ +static struct cnfparamdescr modpdescr[] = { + { "schedulingpolicy", eCmdHdlrGetWord, 0 }, + { "schedulingpriority", eCmdHdlrInt, 0 }, + { "timerequery", eCmdHdlrInt, 0 } +}; +static struct cnfparamblk modpblk = + { CNFPARAMBLK_VERSION, + sizeof(modpdescr)/sizeof(struct cnfparamdescr), + modpdescr + }; + +/* input instance parameters */ +static struct cnfparamdescr inppdescr[] = { + { "port", eCmdHdlrArray, CNFPARAM_REQUIRED }, /* legacy: InputTCPServerRun */ + { "address", eCmdHdlrString, 0 }, + { "ruleset", eCmdHdlrString, 0 } +}; +static struct cnfparamblk inppblk = + { CNFPARAMBLK_VERSION, + sizeof(inppdescr)/sizeof(struct cnfparamdescr), + inppdescr + }; + #include "im-helper.h" /* must be included AFTER the type definitions! */ +/* create input instance, set default paramters, and + * add it to the list of instances. + */ +static rsRetVal +createInstance(instanceConf_t **pinst) +{ + instanceConf_t *inst; + DEFiRet; + CHKmalloc(inst = MALLOC(sizeof(instanceConf_t))); + inst->next = NULL; + inst->pBindRuleset = NULL; + + inst->pszBindPort = NULL; + inst->pszBindAddr = NULL; + inst->pszBindRuleset = NULL; + /* node created, let's add to config */ + if(loadModConf->tail == NULL) { + loadModConf->tail = loadModConf->root = inst; + } else { + loadModConf->tail->next = inst; + loadModConf->tail = inst; + } + + *pinst = inst; +finalize_it: + RETiRet; +} /* This function is called when a new listener instace shall be added to * the current config object via the legacy config system. It just shuffles @@ -136,7 +189,7 @@ static rsRetVal addInstance(void __attribute__((unused)) *pVal, uchar *pNewVal) instanceConf_t *inst; DEFiRet; - CHKmalloc(inst = MALLOC(sizeof(instanceConf_t))); + CHKiRet(createInstance(&inst)); CHKmalloc(inst->pszBindPort = ustrdup((pNewVal == NULL || *pNewVal == '\0') ? (uchar*) "514" : pNewVal)); if((cs.pszBindAddr == NULL) || (cs.pszBindAddr[0] == '\0')) { @@ -149,16 +202,6 @@ static rsRetVal addInstance(void __attribute__((unused)) *pVal, uchar *pNewVal) } else { CHKmalloc(inst->pszBindRuleset = ustrdup(cs.pszBindRuleset)); } - inst->pBindRuleset = NULL; - inst->next = NULL; - - /* node created, let's add to config */ - if(loadModConf->tail == NULL) { - loadModConf->tail = loadModConf->root = inst; - } else { - loadModConf->tail->next = inst; - loadModConf->tail = inst; - } finalize_it: free(pNewVal); @@ -185,52 +228,6 @@ addListner(instanceConf_t *inst) /* check which address to bind to. We could do this more compact, but have not * done so in order to make the code more readable. -- rgerhards, 2007-12-27 */ -#if 0 //<<<<<<< HEAD - - DBGPRINTF("imudp: trying to open port at %s:%s.\n", - (inst->pszBindAddr == NULL) ? (uchar*)"*" : inst->pszBindAddr, inst->pszBindPort); - - newSocks = net.create_udp_socket(inst->pszBindAddr, inst->pszBindPort, 1); - if(newSocks != NULL) { - /* we now need to add the new sockets to the existing set */ - if(udpLstnSocks == NULL) { - /* esay, we can just replace it */ - udpLstnSocks = newSocks; - CHKmalloc(udpRulesets = (ruleset_t**) MALLOC(sizeof(ruleset_t*) * (newSocks[0] + 1))); - for(iDst = 1 ; iDst <= newSocks[0] ; ++iDst) - udpRulesets[iDst] = inst->pBindRuleset; - } else { - /* we need to add them */ - tmpSocks = (int*) MALLOC(sizeof(int) * (1 + newSocks[0] + udpLstnSocks[0])); - tmpRulesets = (ruleset_t**) MALLOC(sizeof(ruleset_t*) * (1 + newSocks[0] + udpLstnSocks[0])); - if(tmpSocks == NULL || tmpRulesets == NULL) { - DBGPRINTF("out of memory trying to allocate udp listen socket array\n"); - /* in this case, we discard the new sockets but continue with what we - * already have - */ - free(newSocks); - free(tmpSocks); - free(tmpRulesets); - ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); - } else { - /* ready to copy */ - iDst = 1; - for(iSrc = 1 ; iSrc <= udpLstnSocks[0] ; ++iSrc, ++iDst) { - tmpSocks[iDst] = udpLstnSocks[iSrc]; - tmpRulesets[iDst] = udpRulesets[iSrc]; - } - for(iSrc = 1 ; iSrc <= newSocks[0] ; ++iSrc, ++iDst) { - tmpSocks[iDst] = newSocks[iSrc]; - tmpRulesets[iDst] = inst->pBindRuleset; - } - tmpSocks[0] = udpLstnSocks[0] + newSocks[0]; - free(newSocks); - free(udpLstnSocks); - udpLstnSocks = tmpSocks; - free(udpRulesets); - udpRulesets = tmpRulesets; - } -#else //======= if(inst->pszBindAddr == NULL) bindAddr = NULL; else if(inst->pszBindAddr[0] == '*' && inst->pszBindAddr[1] == '\0') @@ -270,7 +267,6 @@ addListner(instanceConf_t *inst) else { lcnfLast->next = newlcnfinfo; lcnfLast = newlcnfinfo; -#endif //>>>>>>> ef34821a2737799f48c3032b9616418e4f7fa34f } } } @@ -668,10 +664,74 @@ rsRetVal rcvMainLoop(thrdInfo_t *pThrd) #endif /* #if HAVE_EPOLL_CREATE1 */ +static inline rsRetVal +createListner(es_str_t *port, struct cnfparamvals *pvals) +{ + instanceConf_t *inst; + int i; + DEFiRet; + + CHKiRet(createInstance(&inst)); + inst->pszBindPort = (uchar*)es_str2cstr(port, NULL); + for(i = 0 ; i < inppblk.nParams ; ++i) { + if(!pvals[i].bUsed) + continue; + if(!strcmp(inppblk.descr[i].name, "port")) { + continue; /* array, handled by caller */ + } else if(!strcmp(inppblk.descr[i].name, "address")) { + inst->pszBindAddr = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(inppblk.descr[i].name, "ruleset")) { + inst->pszBindRuleset = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else { + dbgprintf("imudp: program error, non-handled " + "param '%s'\n", inppblk.descr[i].name); + } + } +finalize_it: + RETiRet; +} + + +BEGINnewInpInst + struct cnfparamvals *pvals; + int i; + int portIdx; +CODESTARTnewInpInst + DBGPRINTF("newInpInst (imudp)\n"); + + pvals = nvlstGetParams(lst, &inppblk, NULL); + if(pvals == NULL) { + errmsg.LogError(0, RS_RET_MISSING_CNFPARAMS, + "imudp: required parameter are missing\n"); + ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); + } + if(Debug) { + dbgprintf("input param blk in imudp:\n"); + cnfparamsPrint(&inppblk, pvals); + } + + portIdx = cnfparamGetIdx(&inppblk, "port"); + assert(portIdx != -1); + for(i = 0 ; i < pvals[portIdx].val.d.ar->nmemb ; ++i) { + createListner(pvals[portIdx].val.d.ar->arr[i], pvals); + } + +finalize_it: +CODE_STD_FINALIZERnewInpInst + cnfparamvalsDestruct(pvals, &inppblk); +ENDnewInpInst + + BEGINbeginCnfLoad CODESTARTbeginCnfLoad loadModConf = pModConf; pModConf->pConf = pConf; + /* init our settings */ + loadModConf->configSetViaV2Method = 0; + loadModConf->iTimeRequery = TIME_REQUERY_DFLT; + loadModConf->iSchedPrio = SCHED_PRIO_UNSET; + loadModConf->pszSchedPolicy = NULL; + bLegacyCnfModGlobalsPermitted = 1; /* init legacy config vars */ cs.pszBindRuleset = NULL; cs.pszSchedPolicy = NULL; @@ -681,21 +741,57 @@ CODESTARTbeginCnfLoad ENDbeginCnfLoad +BEGINsetModCnf + struct cnfparamvals *pvals = NULL; + int i; +CODESTARTsetModCnf + pvals = nvlstGetParams(lst, &modpblk, NULL); + if(pvals == NULL) { + errmsg.LogError(0, RS_RET_MISSING_CNFPARAMS, "imudp: error processing module " + "config parameters [module(...)]"); + ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); + } + + if(Debug) { + dbgprintf("module (global) param blk for imudp:\n"); + cnfparamsPrint(&modpblk, pvals); + } + + for(i = 0 ; i < modpblk.nParams ; ++i) { + if(!pvals[i].bUsed) + continue; + if(!strcmp(modpblk.descr[i].name, "timerequery")) { + loadModConf->iTimeRequery = (int) pvals[i].val.d.n; + } else if(!strcmp(modpblk.descr[i].name, "schedulingpriority")) { + loadModConf->iSchedPrio = (int) pvals[i].val.d.n; + } else if(!strcmp(modpblk.descr[i].name, "schedulingpolicy")) { + loadModConf->pszSchedPolicy = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else { + dbgprintf("imudp: program error, non-handled " + "param '%s' in beginCnfLoad\n", modpblk.descr[i].name); + } + } + + /* remove all of our legacy handlers, as they can not used in addition + * the the new-style config method. + */ + bLegacyCnfModGlobalsPermitted = 0; + loadModConf->configSetViaV2Method = 1; + +finalize_it: + if(pvals != NULL) + cnfparamvalsDestruct(pvals, &modpblk); +ENDsetModCnf + BEGINendCnfLoad CODESTARTendCnfLoad - /* persist module-specific settings from legacy config system - * TODO: when we add the new config system, we must decide on priority - * already-set module options should not be overwritable by the legacy - * system (though this is debatable and should at least trigger an error - * message if the equivalent legacy option is selected as well) - * rgerhards, 2011-05-04 - */ - loadModConf->iSchedPrio = cs.iSchedPrio; - loadModConf->iTimeRequery = cs.iTimeRequery; - if((cs.pszSchedPolicy == NULL) || (cs.pszSchedPolicy[0] == '\0')) { - loadModConf->pszSchedPolicy = NULL; - } else { - CHKmalloc(loadModConf->pszSchedPolicy = ustrdup(cs.pszSchedPolicy)); + if(!loadModConf->configSetViaV2Method) { + /* persist module-specific settings from legacy config system */ + loadModConf->iSchedPrio = cs.iSchedPrio; + loadModConf->iTimeRequery = cs.iTimeRequery; + if((cs.pszSchedPolicy != NULL) && (cs.pszSchedPolicy[0] != '\0')) { + CHKmalloc(loadModConf->pszSchedPolicy = ustrdup(cs.pszSchedPolicy)); + } } finalize_it: @@ -751,7 +847,16 @@ ENDactivateCnf BEGINfreeCnf + instanceConf_t *inst, *del; CODESTARTfreeCnf + for(inst = pModConf->root ; inst != NULL ; ) { + free(inst->pszBindPort); + free(inst->pszBindAddr); + free(inst->pBindRuleset); + del = inst; + inst = inst->next; + free(del); + } ENDfreeCnf /* This function is called to gather input. @@ -819,24 +924,20 @@ BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_IMOD_QUERIES CODEqueryEtryPt_STD_CONF2_QUERIES +CODEqueryEtryPt_STD_CONF2_setModCnf_QUERIES CODEqueryEtryPt_STD_CONF2_PREPRIVDROP_QUERIES +CODEqueryEtryPt_STD_CONF2_IMOD_QUERIES CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES ENDqueryEtryPt static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal) { - if(cs.pszBindAddr != NULL) { - free(cs.pszBindAddr); - cs.pszBindAddr = NULL; - } - if(cs.pszSchedPolicy != NULL) { - free(cs.pszSchedPolicy); - cs.pszSchedPolicy = NULL; - } - if(cs.pszBindRuleset != NULL) { - free(cs.pszBindRuleset); - cs.pszBindRuleset = NULL; - } + free(cs.pszBindAddr); + cs.pszBindAddr = NULL; + free(cs.pszSchedPolicy); + cs.pszSchedPolicy = NULL; + free(cs.pszBindRuleset); + cs.pszBindRuleset = NULL; cs.iSchedPrio = SCHED_PRIO_UNSET; cs.iTimeRequery = TIME_REQUERY_DFLT;/* the default is to query only every second time */ return RS_RET_OK; @@ -867,12 +968,16 @@ CODEmodInit_QueryRegCFSLineHdlr addInstance, NULL, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"udpserveraddress", 0, eCmdHdlrGetWord, NULL, &cs.pszBindAddr, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"imudpschedulingpolicy", 0, eCmdHdlrGetWord, - NULL, &cs.pszSchedPolicy, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"imudpschedulingpriority", 0, eCmdHdlrInt, - NULL, &cs.iSchedPrio, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"udpservertimerequery", 0, eCmdHdlrInt, - NULL, &cs.iTimeRequery, STD_LOADABLE_MODULE_ID)); + /* module-global config params - will be disabled in configs that are loaded + * via module(...). + */ + CHKiRet(regCfSysLineHdlr2((uchar *)"imudpschedulingpolicy", 0, eCmdHdlrGetWord, + NULL, &cs.pszSchedPolicy, STD_LOADABLE_MODULE_ID, &bLegacyCnfModGlobalsPermitted)); + CHKiRet(regCfSysLineHdlr2((uchar *)"imudpschedulingpriority", 0, eCmdHdlrInt, + NULL, &cs.iSchedPrio, STD_LOADABLE_MODULE_ID, &bLegacyCnfModGlobalsPermitted)); + CHKiRet(regCfSysLineHdlr2((uchar *)"udpservertimerequery", 0, eCmdHdlrInt, + NULL, &cs.iTimeRequery, STD_LOADABLE_MODULE_ID, &bLegacyCnfModGlobalsPermitted)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID)); ENDmodInit diff --git a/plugins/imuxsock/imuxsock.c b/plugins/imuxsock/imuxsock.c index c2dbdba2..79d8c5d0 100644 --- a/plugins/imuxsock/imuxsock.c +++ b/plugins/imuxsock/imuxsock.c @@ -6,7 +6,7 @@ * * File begun on 2007-12-20 by RGerhards (extracted from syslogd.c) * - * Copyright 2007-2011 Rainer Gerhards and Adiscon GmbH. + * Copyright 2007-2012 Rainer Gerhards and Adiscon GmbH. * * This file is part of rsyslog. * @@ -149,6 +149,7 @@ typedef struct lstn_s { sbool bCreatePath; /* auto-creation of socket directory? */ sbool bUseCreds; /* pull original creator credentials from socket */ sbool bAnnotate; /* annotate events with trusted properties */ + sbool bParseTrusted; /* parse trusted properties */ sbool bWritePid; /* write original PID into tag */ sbool bUseSysTimeStamp; /* use timestamp from system (instead of from message) */ } lstn_t; @@ -172,8 +173,10 @@ static struct configSettings_s { int bOmitLocalLogging; uchar *pLogSockName; uchar *pLogHostName; /* host name to use with this socket */ - int bUseFlowCtl; /* use flow control or not (if yes, only LIGHT is used! */ + int bUseFlowCtl; /* use flow control or not (if yes, only LIGHT is used!) */ + int bUseFlowCtlSysSock; int bIgnoreTimestamp; /* ignore timestamps present in the incoming message? */ + int bIgnoreTimestampSysSock; int bUseSysTimeStamp; /* use timestamp from system (rather than from message) */ int bUseSysTimeStampSysSock; /* same, for system log socket */ int bWritePid; /* use credentials from recvmsg() and fixup PID in TAG */ @@ -187,6 +190,7 @@ static struct configSettings_s { int ratelimitSeveritySysSock; int bAnnotate; /* annotate trusted properties */ int bAnnotateSysSock; /* same, for system log socket */ + int bParseTrusted; /* parse trusted properties */ } cs; struct instanceConf_s { @@ -201,6 +205,7 @@ struct instanceConf_s { int ratelimitBurst; /* max nbr of messages in interval */ int ratelimitSeverity; int bAnnotate; /* annotate trusted properties */ + int bParseTrusted; /* parse trusted properties */ struct instanceConf_s *next; }; @@ -211,18 +216,61 @@ struct modConfData_s { int ratelimitIntervalSysSock; int ratelimitBurstSysSock; int ratelimitSeveritySysSock; + int bAnnotateSysSock; + int bParseTrusted; + sbool bIgnoreTimestamp; /* ignore timestamps present in the incoming message? */ + sbool bUseFlowCtl; /* use flow control or not (if yes, only LIGHT is used! */ sbool bOmitLocalLogging; sbool bWritePidSysSock; - int bAnnotateSysSock; sbool bUseSysTimeStamp; + sbool configSetViaV2Method; }; static modConfData_t *loadModConf = NULL;/* modConf ptr to use for the current load process */ static modConfData_t *runModConf = NULL;/* modConf ptr to use for the current load process */ +/* module-global parameters */ +static struct cnfparamdescr modpdescr[] = { + { "syssock.use", eCmdHdlrBinary, 0 }, + { "syssock.name", eCmdHdlrGetWord, 0 }, + { "syssock.ignoretimestamp", eCmdHdlrBinary, 0 }, + { "syssock.flowcontrol", eCmdHdlrBinary, 0 }, + { "syssock.usesystimestamp", eCmdHdlrBinary, 0 }, + { "syssock.annotate", eCmdHdlrBinary, 0 }, + { "syssock.usepidfromsystem", eCmdHdlrBinary, 0 }, + { "syssock.ratelimit.interval", eCmdHdlrInt, 0 }, + { "syssock.ratelimit.burst", eCmdHdlrInt, 0 }, + { "syssock.ratelimit.severity", eCmdHdlrInt, 0 } +}; +static struct cnfparamblk modpblk = + { CNFPARAMBLK_VERSION, + sizeof(modpdescr)/sizeof(struct cnfparamdescr), + modpdescr + }; + +/* input instance parameters */ +static struct cnfparamdescr inppdescr[] = { + { "socket", eCmdHdlrString, CNFPARAM_REQUIRED }, /* legacy: addunixlistensocket */ + { "createpath", eCmdHdlrBinary, 0 }, + { "parsetrusted", eCmdHdlrBinary, 0 }, + { "hostname", eCmdHdlrString, 0 }, + { "ignoretimestamp", eCmdHdlrBinary, 0 }, + { "flowcontrol", eCmdHdlrBinary, 0 }, + { "usesystimestamp", eCmdHdlrBinary, 0 }, + { "annotate", eCmdHdlrBinary, 0 }, + { "usepidfromsystem", eCmdHdlrBinary, 0 }, + { "ratelimit.interval", eCmdHdlrInt, 0 }, + { "ratelimit.burst", eCmdHdlrInt, 0 }, + { "ratelimit.severity", eCmdHdlrInt, 0 } +}; +static struct cnfparamblk inppblk = + { CNFPARAMBLK_VERSION, + sizeof(inppdescr)/sizeof(struct cnfparamdescr), + inppdescr + }; + /* we do not use this, because we do not bind to a ruleset so far * enable when this is changed: #include "im-helper.h" */ /* must be included AFTER the type definitions! */ - static void initRatelimitState(struct rs_ratelimit_state *rs, unsigned short interval, unsigned short burst) { @@ -233,6 +281,8 @@ initRatelimitState(struct rs_ratelimit_state *rs, unsigned short interval, unsig rs->begin = 0; } +static int bLegacyCnfModGlobalsPermitted;/* are legacy module-global config parameters permitted? */ + /* ratelimiting support, modelled after the linux kernel * returns 1 if message is within rate limit and shall be @@ -289,23 +339,39 @@ finalize_it: } -/* set the timestamp ignore / not ignore option for the system - * log socket. This must be done separtely, as it is not added via a command - * but present by default. -- rgerhards, 2008-03-06 +/* create input instance, set default paramters, and + * add it to the list of instances. */ -static rsRetVal setSystemLogTimestampIgnore(void __attribute__((unused)) *pVal, int iNewVal) +static rsRetVal +createInstance(instanceConf_t **pinst) { + instanceConf_t *inst; DEFiRet; - listeners[0].flags = iNewVal ? IGNDATE : NOFLAG; - RETiRet; -} + CHKmalloc(inst = MALLOC(sizeof(instanceConf_t))); + inst->sockName = NULL; + inst->pLogHostName = NULL; + inst->ratelimitInterval = DFLT_ratelimitInterval; + inst->ratelimitBurst = DFLT_ratelimitSeverity; + inst->ratelimitSeverity = DFLT_ratelimitSeverity; + inst->bUseFlowCtl = 0; + inst->bIgnoreTimestamp = 1; + inst->bCreatePath = DFLT_bCreatePath; + inst->bUseSysTimeStamp = 1; + inst->bWritePid = 0; + inst->bAnnotate = 0; + inst->bParseTrusted = 0; + inst->next = NULL; -/* set flowcontrol for the system log socket - */ -static rsRetVal setSystemLogFlowControl(void __attribute__((unused)) *pVal, int iNewVal) -{ - DEFiRet; - listeners[0].flowCtl = iNewVal ? eFLOWCTL_LIGHT_DELAY : eFLOWCTL_NO_DELAY; + /* node created, let's add to config */ + if(loadModConf->tail == NULL) { + loadModConf->tail = loadModConf->root = inst; + } else { + loadModConf->tail->next = inst; + loadModConf->tail = inst; + } + + *pinst = inst; +finalize_it: RETiRet; } @@ -328,7 +394,7 @@ static rsRetVal addInstance(void __attribute__((unused)) *pVal, uchar *pNewVal) ABORT_FINALIZE(RS_RET_SOCKNAME_MISSING); } - CHKmalloc(inst = MALLOC(sizeof(instanceConf_t))); + CHKiRet(createInstance(&inst)); inst->sockName = pNewVal; inst->ratelimitInterval = cs.ratelimitInterval; inst->pLogHostName = cs.pLogHostName; @@ -340,16 +406,9 @@ static rsRetVal addInstance(void __attribute__((unused)) *pVal, uchar *pNewVal) inst->bUseSysTimeStamp = cs.bUseSysTimeStamp; inst->bWritePid = cs.bWritePid; inst->bAnnotate = cs.bAnnotate; + inst->bParseTrusted = cs.bParseTrusted; inst->next = NULL; - /* node created, let's add to config */ - if(loadModConf->tail == NULL) { - loadModConf->tail = loadModConf->root = inst; - } else { - loadModConf->tail->next = inst; - loadModConf->tail = inst; - } - /* some legacy conf processing */ free(cs.pLogHostName); /* reset hostname for next socket */ cs.pLogHostName = NULL; @@ -388,7 +447,7 @@ addListner(instanceConf_t *inst) if(inst->ratelimitInterval > 0) { if((listeners[nfd].ht = create_hashtable(100, hash_from_key_fn, key_equals_fn, NULL)) == NULL) { /* in this case, we simply turn off rate-limiting */ - dbgprintf("imuxsock: turning off rate limiting because we could not " + DBGPRINTF("imuxsock: turning off rate limiting because we could not " "create hash table\n"); inst->ratelimitInterval = 0; } @@ -402,6 +461,7 @@ addListner(instanceConf_t *inst) listeners[nfd].sockName = ustrdup(inst->sockName); listeners[nfd].bUseCreds = (inst->bWritePid || inst->ratelimitInterval || inst->bAnnotate) ? 1 : 0; listeners[nfd].bAnnotate = inst->bAnnotate; + listeners[nfd].bParseTrusted = inst->bParseTrusted; listeners[nfd].bWritePid = inst->bWritePid; listeners[nfd].bUseSysTimeStamp = inst->bUseSysTimeStamp; nfd++; @@ -458,7 +518,7 @@ createLogSocket(lstn_t *pLstn) if(pLstn->fd < 0 || bind(pLstn->fd, (struct sockaddr *) &sunx, SUN_LEN(&sunx)) < 0 || chmod((char*)pLstn->sockName, 0666) < 0) { errmsg.LogError(errno, NO_ERRCODE, "cannot create '%s'", pLstn->sockName); - dbgprintf("cannot create %s (%d).\n", pLstn->sockName, errno); + DBGPRINTF("cannot create %s (%d).\n", pLstn->sockName, errno); if(pLstn->fd != -1) close(pLstn->fd); pLstn->fd = -1; @@ -493,7 +553,7 @@ openLogSocket(lstn_t *pLstn) /* ok, it matches -- just use as is */ pLstn->fd = fd; - dbgprintf("imuxsock: Acquired UNIX socket '%s' (fd %d) from systemd.\n", + DBGPRINTF("imuxsock: Acquired UNIX socket '%s' (fd %d) from systemd.\n", pLstn->sockName, pLstn->fd); break; } @@ -561,7 +621,7 @@ findRatelimiter(lstn_t *pLstn, struct ucred *cred, rs_ratelimit_state_t **prl) rl = hashtable_search(pLstn->ht, &cred->pid); if(rl == NULL) { /* we need to add a new ratelimiter, process not seen before! */ - dbgprintf("imuxsock: no ratelimiter for pid %lu, creating one\n", + DBGPRINTF("imuxsock: no ratelimiter for pid %lu, creating one\n", (unsigned long) cred->pid); STATSCOUNTER_INC(ctrNumRatelimiters, mutCtrNumRatelimiters); CHKmalloc(rl = malloc(sizeof(rs_ratelimit_state_t))); @@ -628,14 +688,12 @@ getTrustedProp(struct ucred *cred, char *propName, uchar *buf, size_t lenBuf, in if((fd = open(namebuf, O_RDONLY)) == -1) { DBGPRINTF("error reading '%s'\n", namebuf); - *lenProp = 0; - FINALIZE; + ABORT_FINALIZE(RS_RET_ERR); } if((lenRead = read(fd, buf, lenBuf - 1)) == -1) { DBGPRINTF("error reading file data for '%s'\n", namebuf); - *lenProp = 0; close(fd); - FINALIZE; + ABORT_FINALIZE(RS_RET_ERR); } /* we strip after the first \n */ @@ -671,8 +729,7 @@ getTrustedExe(struct ucred *cred, uchar *buf, size_t lenBuf, int* lenProp) if((lenRead = readlink(namebuf, (char*)buf, lenBuf - 1)) == -1) { DBGPRINTF("error reading link '%s'\n", namebuf); - *lenProp = 0; - FINALIZE; + ABORT_FINALIZE(RS_RET_ERR); } buf[lenRead] = '\0'; @@ -705,6 +762,28 @@ copyescaped(uchar *dstbuf, uchar *inbuf, int inlen) } +#if 0 +/* Creates new field to be added to event + * used for SystemLogParseTrusted parsing + */ +struct ee_field * +createNewField(char *fieldname, char *value, int lenValue) { + es_str_t *newStr; + struct ee_value *newVal; + struct ee_field *newField; + + newStr = es_newStrFromBuf(value, (es_size_t) lenValue); + + newVal = ee_newValue(ctxee); + ee_setStrValue(newVal, newStr); + + newField = ee_newFieldFromNV(ctxee, fieldname, newVal); + + return newField; +} +#endif + + /* submit received message to the queue engine * We now parse the message according to expected format so that we * can also mangle it if necessary. @@ -730,6 +809,7 @@ SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred, struct tim uchar *pmsgbuf; int toffs; /* offset for trusted properties */ struct syslogTime dummyTS; + struct json_object *json = NULL, *jval; DEFiRet; /* TODO: handle format errors?? */ @@ -774,37 +854,61 @@ SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred, struct tim } else { CHKmalloc(pmsgbuf = malloc(lenRcv+4096)); } - memcpy(pmsgbuf, pRcv, lenRcv); - memcpy(pmsgbuf+lenRcv, " @[", 3); - toffs = lenRcv + 3; /* next free location */ - lenProp = snprintf((char*)propBuf, sizeof(propBuf), "_PID=%lu _UID=%lu _GID=%lu", - (long unsigned) cred->pid, (long unsigned) cred->uid, - (long unsigned) cred->gid); - memcpy(pmsgbuf+toffs, propBuf, lenProp); - toffs = toffs + lenProp; - getTrustedProp(cred, "comm", propBuf, sizeof(propBuf), &lenProp); - if(lenProp) { - memcpy(pmsgbuf+toffs, " _COMM=", 7); - memcpy(pmsgbuf+toffs+7, propBuf, lenProp); - toffs = toffs + 7 + lenProp; - } - getTrustedExe(cred, propBuf, sizeof(propBuf), &lenProp); - if(lenProp) { - memcpy(pmsgbuf+toffs, " _EXE=", 6); - memcpy(pmsgbuf+toffs+6, propBuf, lenProp); - toffs = toffs + 6 + lenProp; - } - getTrustedProp(cred, "cmdline", propBuf, sizeof(propBuf), &lenProp); - if(lenProp) { - memcpy(pmsgbuf+toffs, " _CMDLINE=", 10); - toffs = toffs + 10 + - copyescaped(pmsgbuf+toffs+10, propBuf, lenProp); + + if (pLstn->bParseTrusted) { + json = json_object_new_object(); + /* create value string, create field, and add it */ + jval = json_object_new_int(cred->pid); + json_object_object_add(json, "pid", jval); + jval = json_object_new_int(cred->uid); + json_object_object_add(json, "uid", jval); + jval = json_object_new_int(cred->gid); + json_object_object_add(json, "gid", jval); + if(getTrustedProp(cred, "comm", propBuf, sizeof(propBuf), &lenProp) == RS_RET_OK) { + jval = json_object_new_string((char*)propBuf); + json_object_object_add(json, "appname", jval); + } + if(getTrustedExe(cred, propBuf, sizeof(propBuf), &lenProp) == RS_RET_OK) { + jval = json_object_new_string((char*)propBuf); + json_object_object_add(json, "exe", jval); + } + if(getTrustedProp(cred, "cmdline", propBuf, sizeof(propBuf), &lenProp) == RS_RET_OK) { + jval = json_object_new_string((char*)propBuf); + json_object_object_add(json, "cmd", jval); + } + } else { + memcpy(pmsgbuf, pRcv, lenRcv); + memcpy(pmsgbuf+lenRcv, " @[", 3); + toffs = lenRcv + 3; /* next free location */ + lenProp = snprintf((char*)propBuf, sizeof(propBuf), "_PID=%lu _UID=%lu _GID=%lu", + (long unsigned) cred->pid, (long unsigned) cred->uid, + (long unsigned) cred->gid); + memcpy(pmsgbuf+toffs, propBuf, lenProp); + toffs = toffs + lenProp; + + if(getTrustedProp(cred, "comm", propBuf, sizeof(propBuf), &lenProp) == RS_RET_OK) { + memcpy(pmsgbuf+toffs, " _COMM=", 7); + memcpy(pmsgbuf+toffs+7, propBuf, lenProp); + toffs = toffs + 7 + lenProp; + } + if(getTrustedExe(cred, propBuf, sizeof(propBuf), &lenProp) == RS_RET_OK) { + memcpy(pmsgbuf+toffs, " _EXE=", 6); + memcpy(pmsgbuf+toffs+6, propBuf, lenProp); + toffs = toffs + 6 + lenProp; + } + if(getTrustedProp(cred, "cmdline", propBuf, sizeof(propBuf), &lenProp) == RS_RET_OK) { + memcpy(pmsgbuf+toffs, " _CMDLINE=", 10); + toffs = toffs + 10 + + copyescaped(pmsgbuf+toffs+10, propBuf, lenProp); + } + + /* finalize string */ + pmsgbuf[toffs] = ']'; + pmsgbuf[toffs+1] = '\0'; + + pRcv = pmsgbuf; + lenRcv = toffs + 1; } - /* finalize string */ - pmsgbuf[toffs] = ']'; - pmsgbuf[toffs+1] = '\0'; - pRcv = pmsgbuf; - lenRcv = toffs + 1; } /* we now create our own message object and submit it to the queue */ @@ -821,6 +925,13 @@ SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred, struct tim parse++; lenMsg--; /* '>' */ + if(json != NULL) { + /* as per lumberjack spec, these properties need to go into + * the CEE root. + */ + msgAddJSON(pMsg, (uchar*)"!", json); + } + if(ts == NULL) { if((pLstn->flags & IGNDATE)) { /* in this case, we still need to find out if we have a valid @@ -927,7 +1038,7 @@ static rsRetVal readSocket(lstn_t *pLstn) msgh.msg_iovlen = 1; iRcvd = recvmsg(pLstn->fd, &msgh, MSG_DONTWAIT); - dbgprintf("Message from UNIX socket: #%d\n", pLstn->fd); + DBGPRINTF("Message from UNIX socket: #%d\n", pLstn->fd); if(iRcvd > 0) { cred = NULL; ts = NULL; @@ -937,16 +1048,12 @@ static rsRetVal readSocket(lstn_t *pLstn) if( pLstn->bUseCreds && cm->cmsg_level == SOL_SOCKET && cm->cmsg_type == SCM_CREDENTIALS) { cred = (struct ucred*) CMSG_DATA(cm); - break; } # endif /* HAVE_SCM_CREDENTIALS */ # if HAVE_SO_TIMESTAMP if( pLstn->bUseSysTimeStamp && cm->cmsg_level == SOL_SOCKET && cm->cmsg_type == SO_TIMESTAMP) { ts = (struct timeval *)CMSG_DATA(cm); - dbgprintf("XXX: got timestamp %ld.%ld\n", - (long) ts->tv_sec, (long) ts->tv_usec); - break; } # endif /* HAVE_SO_TIMESTAMP */ } @@ -955,7 +1062,7 @@ static rsRetVal readSocket(lstn_t *pLstn) } else if(iRcvd < 0 && errno != EINTR) { char errStr[1024]; rs_strerror_r(errno, errStr, sizeof(errStr)); - dbgprintf("UNIX socket error: %d = %s.\n", errno, errStr); + DBGPRINTF("UNIX socket error: %d = %s.\n", errno, errStr); errmsg.LogError(errno, NO_ERRCODE, "imuxsock: recvfrom UNIX"); } @@ -1005,10 +1112,13 @@ activateListeners() listeners[0].ratelimitInterval = runModConf->ratelimitIntervalSysSock; listeners[0].ratelimitBurst = runModConf->ratelimitBurstSysSock; listeners[0].ratelimitSev = runModConf->ratelimitSeveritySysSock; - listeners[0].bUseCreds = (runModConf->bWritePidSysSock || runModConf->ratelimitIntervalSysSock) ? 1 : 0; + listeners[0].bUseCreds = (runModConf->bWritePidSysSock || runModConf->ratelimitIntervalSysSock || runModConf->bAnnotateSysSock) ? 1 : 0; listeners[0].bWritePid = runModConf->bWritePidSysSock; listeners[0].bAnnotate = runModConf->bAnnotateSysSock; + listeners[0].bParseTrusted = runModConf->bParseTrusted; listeners[0].bUseSysTimeStamp = runModConf->bUseSysTimeStamp; + listeners[0].flags = runModConf->bIgnoreTimestamp ? IGNDATE : NOFLAG; + listeners[0].flowCtl = runModConf->bUseFlowCtl ? eFLOWCTL_LIGHT_DELAY : eFLOWCTL_NO_DELAY; sd_fds = sd_listen_fds(0); if(sd_fds < 0) { @@ -1021,7 +1131,7 @@ activateListeners() for (i = startIndexUxLocalSockets ; i < nfd ; i++) { if(openLogSocket(&(listeners[i])) == RS_RET_OK) { ++actSocks; - dbgprintf("imuxsock: Opened UNIX socket '%s' (fd %d).\n", + DBGPRINTF("imuxsock: Opened UNIX socket '%s' (fd %d).\n", listeners[i].sockName, listeners[i].fd); } } @@ -1041,16 +1151,149 @@ BEGINbeginCnfLoad CODESTARTbeginCnfLoad loadModConf = pModConf; pModConf->pConf = pConf; + /* init our settings */ + pModConf->pLogSockName = NULL; + pModConf->bOmitLocalLogging = 0; + pModConf->bIgnoreTimestamp = 1; + pModConf->bUseFlowCtl = 0; + pModConf->bUseSysTimeStamp = 1; + pModConf->bWritePidSysSock = 0; + pModConf->bAnnotateSysSock = 0; + pModConf->bParseTrusted = 0; + pModConf->ratelimitIntervalSysSock = DFLT_ratelimitInterval; + pModConf->ratelimitBurstSysSock = DFLT_ratelimitBurst; + pModConf->ratelimitSeveritySysSock = DFLT_ratelimitSeverity; + bLegacyCnfModGlobalsPermitted = 1; /* reset legacy config vars */ resetConfigVariables(NULL, NULL); ENDbeginCnfLoad +BEGINsetModCnf + struct cnfparamvals *pvals = NULL; + int i; +CODESTARTsetModCnf + pvals = nvlstGetParams(lst, &modpblk, NULL); + if(pvals == NULL) { + errmsg.LogError(0, RS_RET_MISSING_CNFPARAMS, "error processing module " + "config parameters [module(...)]"); + ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); + } + + if(Debug) { + dbgprintf("module (global) param blk for imuxsock:\n"); + cnfparamsPrint(&modpblk, pvals); + } + + for(i = 0 ; i < modpblk.nParams ; ++i) { + if(!pvals[i].bUsed) + continue; + if(!strcmp(modpblk.descr[i].name, "syssock.use")) { + loadModConf->bOmitLocalLogging = ((int) pvals[i].val.d.n) ? 0 : 1; + } else if(!strcmp(modpblk.descr[i].name, "syssock.name")) { + loadModConf->pLogSockName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(modpblk.descr[i].name, "syssock.ignoretimestamp")) { + loadModConf->bIgnoreTimestamp = (int) pvals[i].val.d.n; + } else if(!strcmp(modpblk.descr[i].name, "syssock.flowcontrol")) { + loadModConf->bUseFlowCtl = (int) pvals[i].val.d.n; + } else if(!strcmp(modpblk.descr[i].name, "syssock.usesystimestamp")) { + loadModConf->bUseSysTimeStamp = (int) pvals[i].val.d.n; + } else if(!strcmp(modpblk.descr[i].name, "syssock.annotate")) { + loadModConf->bAnnotateSysSock = (int) pvals[i].val.d.n; + } else if(!strcmp(modpblk.descr[i].name, "syssock.usepidfromsystem")) { + loadModConf->bWritePidSysSock = (int) pvals[i].val.d.n; + } else if(!strcmp(modpblk.descr[i].name, "syssock.ratelimit.interval")) { + loadModConf->ratelimitIntervalSysSock = (int) pvals[i].val.d.n; + } else if(!strcmp(modpblk.descr[i].name, "syssock.ratelimit.burst")) { + loadModConf->ratelimitBurstSysSock = (int) pvals[i].val.d.n; + } else if(!strcmp(modpblk.descr[i].name, "syssock.ratelimit.severity")) { + loadModConf->ratelimitSeveritySysSock = (int) pvals[i].val.d.n; + } else { + dbgprintf("imuxsock: program error, non-handled " + "param '%s' in beginCnfLoad\n", modpblk.descr[i].name); + } + } + + /* disable legacy module-global config directives */ + bLegacyCnfModGlobalsPermitted = 0; + loadModConf->configSetViaV2Method = 1; + +finalize_it: + if(pvals != NULL) + cnfparamvalsDestruct(pvals, &modpblk); +ENDsetModCnf + + +BEGINnewInpInst + struct cnfparamvals *pvals; + instanceConf_t *inst; + int i; +CODESTARTnewInpInst + DBGPRINTF("newInpInst (imuxsock)\n"); + + pvals = nvlstGetParams(lst, &inppblk, NULL); + if(pvals == NULL) { + errmsg.LogError(0, RS_RET_MISSING_CNFPARAMS, + "imuxsock: required parameter are missing\n"); + ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); + } + + if(Debug) { + dbgprintf("input param blk in imuxsock:\n"); + cnfparamsPrint(&inppblk, pvals); + } + + CHKiRet(createInstance(&inst)); + + for(i = 0 ; i < inppblk.nParams ; ++i) { + if(!pvals[i].bUsed) + continue; + if(!strcmp(inppblk.descr[i].name, "socket")) { + inst->sockName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(modpblk.descr[i].name, "createpath")) { + inst->bCreatePath = (int) pvals[i].val.d.n; + } else if(!strcmp(modpblk.descr[i].name, "parsetrusted")) { + inst->bParseTrusted = (int) pvals[i].val.d.n; + } else if(!strcmp(modpblk.descr[i].name, "hostname")) { + inst->pLogHostName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(modpblk.descr[i].name, "ignoretimestamp")) { + inst->bIgnoreTimestamp = (int) pvals[i].val.d.n; + } else if(!strcmp(modpblk.descr[i].name, "flowcontrol")) { + inst->bUseFlowCtl = (int) pvals[i].val.d.n; + } else if(!strcmp(modpblk.descr[i].name, "usesystimestamp")) { + inst->bUseSysTimeStamp = (int) pvals[i].val.d.n; + } else if(!strcmp(modpblk.descr[i].name, "annotate")) { + inst->bAnnotate = (int) pvals[i].val.d.n; + } else if(!strcmp(modpblk.descr[i].name, "usepidfromsystem")) { + inst->bWritePid = (int) pvals[i].val.d.n; + } else if(!strcmp(modpblk.descr[i].name, "ratelimit.interval")) { + inst->ratelimitInterval = (int) pvals[i].val.d.n; + } else if(!strcmp(modpblk.descr[i].name, "ratelimit.burst")) { + inst->ratelimitBurst = (int) pvals[i].val.d.n; + } else if(!strcmp(modpblk.descr[i].name, "ratelimit.severity")) { + inst->ratelimitSeverity = (int) pvals[i].val.d.n; + } else { + dbgprintf("imuxsock: program error, non-handled " + "param '%s'\n", inppblk.descr[i].name); + } + } +finalize_it: +CODE_STD_FINALIZERnewInpInst + cnfparamvalsDestruct(pvals, &inppblk); +ENDnewInpInst + + BEGINendCnfLoad CODESTARTendCnfLoad - /* persist module-specific settings from legacy config system */ - loadModConf->bOmitLocalLogging = cs.bOmitLocalLogging; - loadModConf->pLogSockName = cs.pLogSockName; + if(!loadModConf->configSetViaV2Method) { + /* persist module-specific settings from legacy config system */ + loadModConf->bOmitLocalLogging = cs.bOmitLocalLogging; + loadModConf->pLogSockName = cs.pLogSockName; + loadModConf->bIgnoreTimestamp = cs.bIgnoreTimestampSysSock; + loadModConf->bUseFlowCtl = cs.bUseFlowCtlSysSock; + loadModConf->bAnnotateSysSock = cs.bAnnotateSysSock; + loadModConf->bParseTrusted = cs.bParseTrusted; + } loadModConf = NULL; /* done loading */ /* free legacy config vars */ @@ -1083,8 +1326,16 @@ ENDactivateCnf BEGINfreeCnf + instanceConf_t *inst, *del; CODESTARTfreeCnf free(pModConf->pLogSockName); + for(inst = pModConf->root ; inst != NULL ; ) { + free(inst->sockName); + free(inst->pLogHostName); + del = inst; + inst = inst->next; + free(del); + } ENDfreeCnf @@ -1215,7 +1466,9 @@ BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_IMOD_QUERIES CODEqueryEtryPt_STD_CONF2_QUERIES +CODEqueryEtryPt_STD_CONF2_setModCnf_QUERIES CODEqueryEtryPt_STD_CONF2_PREPRIVDROP_QUERIES +CODEqueryEtryPt_STD_CONF2_IMOD_QUERIES CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES ENDqueryEtryPt @@ -1227,13 +1480,16 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a cs.bOmitLocalLogging = 0; cs.pLogHostName = NULL; cs.bIgnoreTimestamp = 1; + cs.bIgnoreTimestampSysSock = 1; cs.bUseFlowCtl = 0; + cs.bUseFlowCtlSysSock = 0; cs.bUseSysTimeStamp = 1; cs.bUseSysTimeStampSysSock = 1; cs.bWritePid = 0; cs.bWritePidSysSock = 0; cs.bAnnotate = 0; cs.bAnnotateSysSock = 0; + cs.bParseTrusted = 0; cs.bCreatePath = DFLT_bCreatePath; cs.ratelimitInterval = DFLT_ratelimitInterval; cs.ratelimitIntervalSysSock = DFLT_ratelimitInterval; @@ -1259,7 +1515,7 @@ CODEmodInit_QueryRegCFSLineHdlr CHKiRet(objUse(datetime, CORE_COMPONENT)); CHKiRet(objUse(parser, CORE_COMPONENT)); - dbgprintf("imuxsock version %s initializing\n", PACKAGE_VERSION); + DBGPRINTF("imuxsock version %s initializing\n", PACKAGE_VERSION); /* init legacy config vars */ cs.pLogSockName = NULL; @@ -1286,6 +1542,7 @@ CODEmodInit_QueryRegCFSLineHdlr listeners[0].bParseHost = 0; listeners[0].bUseCreds = 0; listeners[0].bAnnotate = 0; + listeners[0].bParseTrusted = 0; listeners[0].bCreatePath = 0; listeners[0].bUseSysTimeStamp = 1; @@ -1301,12 +1558,8 @@ CODEmodInit_QueryRegCFSLineHdlr CHKiRet(prop.ConstructFinalize(pLocalHostIP)); /* register config file handlers */ - CHKiRet(omsdRegCFSLineHdlr((uchar *)"omitlocallogging", 0, eCmdHdlrBinary, - NULL, &cs.bOmitLocalLogging, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputunixlistensocketignoremsgtimestamp", 0, eCmdHdlrBinary, NULL, &cs.bIgnoreTimestamp, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"systemlogsocketname", 0, eCmdHdlrGetWord, - NULL, &cs.pLogSockName, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputunixlistensockethostname", 0, eCmdHdlrGetWord, NULL, &cs.pLogHostName, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputunixlistensocketflowcontrol", 0, eCmdHdlrBinary, @@ -1335,22 +1588,28 @@ CODEmodInit_QueryRegCFSLineHdlr * for that. We should revisit all of that once we have the new config format... * rgerhards, 2008-03-06 */ - CHKiRet(omsdRegCFSLineHdlr((uchar *)"systemlogsocketignoremsgtimestamp", 0, eCmdHdlrBinary, - setSystemLogTimestampIgnore, NULL, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"systemlogsocketflowcontrol", 0, eCmdHdlrBinary, - setSystemLogFlowControl, NULL, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"systemlogusesystimestamp", 0, eCmdHdlrBinary, - NULL, &cs.bUseSysTimeStampSysSock, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"systemlogsocketannotate", 0, eCmdHdlrBinary, - NULL, &cs.bAnnotateSysSock, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"systemlogusepidfromsystem", 0, eCmdHdlrBinary, - NULL, &cs.bWritePidSysSock, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"systemlogratelimitinterval", 0, eCmdHdlrInt, - NULL, &cs.ratelimitIntervalSysSock, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"systemlogratelimitburst", 0, eCmdHdlrInt, - NULL, &cs.ratelimitBurstSysSock, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"systemlogratelimitseverity", 0, eCmdHdlrInt, - NULL, &cs.ratelimitSeveritySysSock, STD_LOADABLE_MODULE_ID)); + CHKiRet(regCfSysLineHdlr2((uchar *)"omitlocallogging", 0, eCmdHdlrBinary, + NULL, &cs.bOmitLocalLogging, STD_LOADABLE_MODULE_ID, &bLegacyCnfModGlobalsPermitted)); + CHKiRet(regCfSysLineHdlr2((uchar *)"systemlogsocketname", 0, eCmdHdlrGetWord, + NULL, &cs.pLogSockName, STD_LOADABLE_MODULE_ID, &bLegacyCnfModGlobalsPermitted)); + CHKiRet(regCfSysLineHdlr2((uchar *)"systemlogsocketignoremsgtimestamp", 0, eCmdHdlrBinary, + NULL, &cs.bIgnoreTimestampSysSock, STD_LOADABLE_MODULE_ID, &bLegacyCnfModGlobalsPermitted)); + CHKiRet(regCfSysLineHdlr2((uchar *)"systemlogsocketflowcontrol", 0, eCmdHdlrBinary, + NULL, &cs.bUseFlowCtlSysSock, STD_LOADABLE_MODULE_ID, &bLegacyCnfModGlobalsPermitted)); + CHKiRet(regCfSysLineHdlr2((uchar *)"systemlogusesystimestamp", 0, eCmdHdlrBinary, + NULL, &cs.bUseSysTimeStampSysSock, STD_LOADABLE_MODULE_ID, &bLegacyCnfModGlobalsPermitted)); + CHKiRet(regCfSysLineHdlr2((uchar *)"systemlogsocketannotate", 0, eCmdHdlrBinary, + NULL, &cs.bAnnotateSysSock, STD_LOADABLE_MODULE_ID, &bLegacyCnfModGlobalsPermitted)); + CHKiRet(regCfSysLineHdlr2((uchar *)"systemlogparsetrusted", 0, eCmdHdlrBinary, + NULL, &cs.bParseTrusted, STD_LOADABLE_MODULE_ID, &bLegacyCnfModGlobalsPermitted)); + CHKiRet(regCfSysLineHdlr2((uchar *)"systemlogusepidfromsystem", 0, eCmdHdlrBinary, + NULL, &cs.bWritePidSysSock, STD_LOADABLE_MODULE_ID, &bLegacyCnfModGlobalsPermitted)); + CHKiRet(regCfSysLineHdlr2((uchar *)"systemlogratelimitinterval", 0, eCmdHdlrInt, + NULL, &cs.ratelimitIntervalSysSock, STD_LOADABLE_MODULE_ID, &bLegacyCnfModGlobalsPermitted)); + CHKiRet(regCfSysLineHdlr2((uchar *)"systemlogratelimitburst", 0, eCmdHdlrInt, + NULL, &cs.ratelimitBurstSysSock, STD_LOADABLE_MODULE_ID, &bLegacyCnfModGlobalsPermitted)); + CHKiRet(regCfSysLineHdlr2((uchar *)"systemlogratelimitseverity", 0, eCmdHdlrInt, + NULL, &cs.ratelimitSeveritySysSock, STD_LOADABLE_MODULE_ID, &bLegacyCnfModGlobalsPermitted)); /* support statistics gathering */ CHKiRet(statsobj.Construct(&modStats)); diff --git a/plugins/imzmq3/Makefile.am b/plugins/imzmq3/Makefile.am new file mode 100644 index 00000000..f9c84e5d --- /dev/null +++ b/plugins/imzmq3/Makefile.am @@ -0,0 +1,8 @@ +pkglib_LTLIBRARIES = imzmq3.la + +imzmq3_la_SOURCES = imzmq3.c +imzmq3_la_CPPFLAGS = $(RSRT_CFLAGS) $(PTHREADS_CFLAGS) $(CZMQ_CFLAGS) +imzmq3_la_LDFLAGS = -module -avoid-version +imzmq3_la_LIBADD = $(CZMQ_LIBS) + +EXTRA_DIST = diff --git a/plugins/imzmq3/README b/plugins/imzmq3/README new file mode 100644 index 00000000..88653b83 --- /dev/null +++ b/plugins/imzmq3/README @@ -0,0 +1,24 @@ +ZeroMQ 3.x Input Plugin + +Building this plugin: +Requires libzmq and libczmq. First, install libzmq from the HEAD on github: +http://github.com/zeromq/libzmq. You can clone the repository, build, then +install it. The directions for doing so are there in the readme. Then, do +the same for libczmq: http://github.com/zeromq/czmq. At some point, the 3.1 +version of libzmq will be released, and a supporting version of libczmq. +At that time, you could simply download and install the tarballs instead of +using git to clone the repositories. Those tarballs (when available) can +be found at http://download.zeromq.org. As of this writing (5/31/2012), the +most recent version of czmq (1.1.0) and libzmq (3.1.0-beta) will not compile +properly. + +Imzmq3 allows you to push data into rsyslog from a zeromq socket. The example +below binds a SUB socket to port 7172, and then any messages with the topic +"foo" will be pushed into rsyslog. + +Example Rsyslog.conf snippet: +------------------------------------------------------------------------------- + +$InputZmq3ServerRun action=BIND,type=SUB,description=tcp://*:7172,subscribe=foo + +------------------------------------------------------------------------------- diff --git a/plugins/imzmq3/imzmq3.c b/plugins/imzmq3/imzmq3.c new file mode 100644 index 00000000..dc1d64d3 --- /dev/null +++ b/plugins/imzmq3/imzmq3.c @@ -0,0 +1,657 @@ +/* imzmq3.c + * + * This input plugin enables rsyslog to read messages from a ZeroMQ + * queue. + * + * Copyright 2012 Talksum, Inc. + * + * This program is free software: you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * as published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this program. If not, see + * <http://www.gnu.org/licenses/>. + * + * Author: David Kelly + * <davidk@talksum.com> + */ + +#include <assert.h> +#include <errno.h> +#include <stdlib.h> +#include <string.h> +#include <unistd.h> + +#include "rsyslog.h" + +#include "cfsysline.h" +#include "config.h" +#include "dirty.h" +#include "errmsg.h" +#include "glbl.h" +#include "module-template.h" +#include "msg.h" +#include "net.h" +#include "parser.h" +#include "prop.h" +#include "ruleset.h" +#include "srUtils.h" +#include "unicode-helper.h" + +#include <czmq.h> + +MODULE_TYPE_INPUT +MODULE_TYPE_NOKEEP + +/* convienent symbols to denote a socket we want to bind + * vs one we want to just connect to + */ +#define ACTION_CONNECT 1 +#define ACTION_BIND 2 + +/* Module static data */ +DEF_IMOD_STATIC_DATA +DEFobjCurrIf(errmsg) +DEFobjCurrIf(glbl) +DEFobjCurrIf(prop) +DEFobjCurrIf(ruleset) + + +/* ---------------------------------------------------------------------------- + * structs to describe sockets + */ +typedef struct _socket_type { + char* name; + int type; +} socket_type; + +/* more overkill, but seems nice to be consistent.*/ +typedef struct _socket_action { + char* name; + int action; +} socket_action; + +typedef struct _poller_data { + ruleset_t* ruleset; + thrdInfo_t* thread; +} poller_data; + +typedef struct _socket_info { + int type; + int action; + char* description; + int sndHWM; /* if you want more than 2^32 messages, */ + int rcvHWM; /* then pass in 0 (the default). */ + char* identity; + char** subscriptions; + ruleset_t* ruleset; + int sndBuf; + int rcvBuf; + int linger; + int backlog; + int sndTimeout; + int rcvTimeout; + int maxMsgSize; + int rate; + int recoveryIVL; + int multicastHops; + int reconnectIVL; + int reconnectIVLMax; + int ipv4Only; + int affinity; + +} socket_info; + + +/* ---------------------------------------------------------------------------- + * Static definitions/initializations. + */ +static socket_info* s_socketInfo = NULL; +static size_t s_nitems = 0; +static prop_t * s_namep = NULL; +static zloop_t* s_zloop = NULL; +static int s_io_threads = 1; +static zctx_t* s_context = NULL; +static ruleset_t* s_ruleset = NULL; +static socket_type socketTypes[] = { + {"SUB", ZMQ_SUB }, + {"PULL", ZMQ_PULL }, + {"XSUB", ZMQ_XSUB } +}; + +static socket_action socketActions[] = { + {"BIND", ACTION_BIND}, + {"CONNECT", ACTION_CONNECT}, +}; + + +/* ---------------------------------------------------------------------------- + * Helper functions + */ + +/* get the name of a socket type, return the ZMQ_XXX type + or -1 if not a supported type (see above) +*/ +static int getSocketType(char* name) { + int type = -1; + uint i; + + /* match name with known socket type */ + for(i=0; i<sizeof(socketTypes)/sizeof(socket_type); ++i) { + if( !strcmp(socketTypes[i].name, name) ) { + type = socketTypes[i].type; + break; + } + } + + /* whine if no match was found. */ + if (type == -1) + errmsg.LogError(0, NO_ERRCODE, "unknown type %s",name); + + return type; +} + + +static int getSocketAction(char* name) { + int action = -1; + uint i; + + /* match name with known socket action */ + for(i=0; i < sizeof(socketActions)/sizeof(socket_action); ++i) { + if(!strcmp(socketActions[i].name, name)) { + action = socketActions[i].action; + break; + } + } + + /* whine if no matching action was found */ + if (action == -1) + errmsg.LogError(0, NO_ERRCODE, "unknown action %s",name); + + return action; +} + + +static void setDefaults(socket_info* info) { + info->type = ZMQ_SUB; + info->action = ACTION_BIND; + info->description = NULL; + info->sndHWM = 0; + info->rcvHWM = 0; + info->identity = NULL; + info->subscriptions = NULL; + info->ruleset = NULL; + info->sndBuf = -1; + info->rcvBuf = -1; + info->linger = -1; + info->backlog = -1; + info->sndTimeout = -1; + info->rcvTimeout = -1; + info->maxMsgSize = -1; + info->rate = -1; + info->recoveryIVL = -1; + info->multicastHops = -1; + info->reconnectIVL = -1; + info->reconnectIVLMax = -1; + info->ipv4Only = -1; + info->affinity = -1; + +}; + + +/* The config string should look like: + * "action=AAA,type=TTT,description=DDD,sndHWM=SSS,rcvHWM=RRR,subscribe='xxx',subscribe='yyy'" + * + */ +static rsRetVal parseConfig(char* config, socket_info* info) { + int nsubs = 0; + + char* binding; + char* ptr1; + for (binding = strtok_r(config, ",", &ptr1); + binding != NULL; + binding = strtok_r(NULL, ",", &ptr1)) { + + /* Each binding looks like foo=bar */ + char * sep = strchr(binding, '='); + if (sep == NULL) + { + errmsg.LogError(0, NO_ERRCODE, + "Invalid argument format %s, ignoring ...", + binding); + continue; + } + + /* Replace '=' with '\0'. */ + *sep = '\0'; + + char * val = sep + 1; + + if (strcmp(binding, "action") == 0) { + info->action = getSocketAction(val); + } else if (strcmp(binding, "type") == 0) { + info->type = getSocketType(val); + } else if (strcmp(binding, "description") == 0) { + info->description = strdup(val); + } else if (strcmp(binding, "sndHWM") == 0) { + info->sndHWM = atoi(val); + } else if (strcmp(binding, "rcvHWM") == 0) { + info->sndHWM = atoi(val); + } else if (strcmp(binding, "subscribe") == 0) { + /* Add the subscription value to the list.*/ + char * substr = NULL; + substr = strdup(val); + info->subscriptions = realloc(info->subscriptions, sizeof(char *) * nsubs + 1); + info->subscriptions[nsubs] = substr; + ++nsubs; + } else if (strcmp(binding, "sndBuf") == 0) { + info->sndBuf = atoi(val); + } else if (strcmp(binding, "rcvBuf") == 0) { + info->rcvBuf = atoi(val); + } else if (strcmp(binding, "linger") == 0) { + info->linger = atoi(val); + } else if (strcmp(binding, "backlog") == 0) { + info->backlog = atoi(val); + } else if (strcmp(binding, "sndTimeout") == 0) { + info->sndTimeout = atoi(val); + } else if (strcmp(binding, "rcvTimeout") == 0) { + info->rcvTimeout = atoi(val); + } else if (strcmp(binding, "maxMsgSize") == 0) { + info->maxMsgSize = atoi(val); + } else if (strcmp(binding, "rate") == 0) { + info->rate = atoi(val); + } else if (strcmp(binding, "recoveryIVL") == 0) { + info->recoveryIVL = atoi(val); + } else if (strcmp(binding, "multicastHops") == 0) { + info->multicastHops = atoi(val); + } else if (strcmp(binding, "reconnectIVL") == 0) { + info->reconnectIVL = atoi(val); + } else if (strcmp(binding, "reconnectIVLMax") == 0) { + info->reconnectIVLMax = atoi(val); + } else if (strcmp(binding, "ipv4Only") == 0) { + info->ipv4Only = atoi(val); + } else if (strcmp(binding, "affinity") == 0) { + info->affinity = atoi(val); + } else { + errmsg.LogError(0, NO_ERRCODE, "Unknown argument %s", binding); + return RS_RET_INVALID_PARAMS; + } + } + + return RS_RET_OK; +} + +static rsRetVal validateConfig(socket_info* info) { + + if (info->type == -1) { + errmsg.LogError(0, RS_RET_INVALID_PARAMS, + "you entered an invalid type"); + return RS_RET_INVALID_PARAMS; + } + if (info->action == -1) { + errmsg.LogError(0, RS_RET_INVALID_PARAMS, + "you entered an invalid action"); + return RS_RET_INVALID_PARAMS; + } + if (info->description == NULL) { + errmsg.LogError(0, RS_RET_INVALID_PARAMS, + "you didn't enter a description"); + return RS_RET_INVALID_PARAMS; + } + if(info->type == ZMQ_SUB && info->subscriptions == NULL) { + errmsg.LogError(0, RS_RET_INVALID_PARAMS, + "SUB sockets need at least one subscription"); + return RS_RET_INVALID_PARAMS; + } + if(info->type != ZMQ_SUB && info->subscriptions != NULL) { + errmsg.LogError(0, RS_RET_INVALID_PARAMS, + "only SUB sockets can have subscriptions"); + return RS_RET_INVALID_PARAMS; + } + return RS_RET_OK; +} + +static rsRetVal createContext() { + if (s_context == NULL) { + errmsg.LogError(0, NO_ERRCODE, "creating zctx."); + s_context = zctx_new(); + + if (s_context == NULL) { + errmsg.LogError(0, RS_RET_INVALID_PARAMS, + "zctx_new failed: %s", + strerror(errno)); + /* DK: really should do better than invalid params...*/ + return RS_RET_INVALID_PARAMS; + } + + if (s_io_threads > 1) { + errmsg.LogError(0, NO_ERRCODE, "setting io worker threads to %d", s_io_threads); + zctx_set_iothreads(s_context, s_io_threads); + } + } + return RS_RET_OK; +} + +static rsRetVal createSocket(socket_info* info, void** sock) { + size_t ii; + int rv; + + *sock = zsocket_new(s_context, info->type); + if (!sock) { + errmsg.LogError(0, + RS_RET_INVALID_PARAMS, + "zsocket_new failed: %s, for type %d", + strerror(errno),info->type); + /* DK: invalid params seems right here */ + return RS_RET_INVALID_PARAMS; + } + + /* Set options *before* the connect/bind. */ + if (info->identity) zsocket_set_identity(*sock, info->identity); + if (info->sndBuf > -1) zsocket_set_sndbuf(*sock, info->sndBuf); + if (info->rcvBuf > -1) zsocket_set_rcvbuf(*sock, info->rcvBuf); + if (info->linger > -1) zsocket_set_linger(*sock, info->linger); + if (info->backlog > -1) zsocket_set_backlog(*sock, info->backlog); + if (info->sndTimeout > -1) zsocket_set_sndtimeo(*sock, info->sndTimeout); + if (info->rcvTimeout > -1) zsocket_set_rcvtimeo(*sock, info->rcvTimeout); + if (info->maxMsgSize > -1) zsocket_set_maxmsgsize(*sock, info->maxMsgSize); + if (info->rate > -1) zsocket_set_rate(*sock, info->rate); + if (info->recoveryIVL > -1) zsocket_set_recovery_ivl(*sock, info->recoveryIVL); + if (info->multicastHops > -1) zsocket_set_multicast_hops(*sock, info->multicastHops); + if (info->reconnectIVL > -1) zsocket_set_reconnect_ivl(*sock, info->reconnectIVL); + if (info->reconnectIVLMax > -1) zsocket_set_reconnect_ivl_max(*sock, info->reconnectIVLMax); + if (info->ipv4Only > -1) zsocket_set_ipv4only(*sock, info->ipv4Only); + if (info->affinity > -1) zsocket_set_affinity(*sock, info->affinity); + + /* since HWM have defaults, we always set them. No return codes to check, either.*/ + zsocket_set_sndhwm(*sock, info->sndHWM); + zsocket_set_rcvhwm(*sock, info->rcvHWM); + + /* Set subscriptions.*/ + for (ii = 0; ii < sizeof(info->subscriptions)/sizeof(char*); ++ii) + zsocket_set_subscribe(*sock, info->subscriptions[ii]); + + + + /* Do the bind/connect... */ + if (info->action==ACTION_CONNECT) { + rv = zsocket_connect(*sock, info->description); + if (rv < 0) { + errmsg.LogError(0, + RS_RET_INVALID_PARAMS, + "zmq_connect using %s failed: %s", + info->description, strerror(errno)); + return RS_RET_INVALID_PARAMS; + } + } else { + rv = zsocket_bind(*sock, info->description); + if (rv <= 0) { + errmsg.LogError(0, + RS_RET_INVALID_PARAMS, + "zmq_bind using %s failed: %s", + info->description, strerror(errno)); + return RS_RET_INVALID_PARAMS; + } + } + return RS_RET_OK; +} + +/* ---------------------------------------------------------------------------- + * Module endpoints + */ + +/* accept a new ruleset to bind. Checks if it exists and complains, if not. Note + * that this makes the assumption that after the bind ruleset is called in the config, + * another call will be made to add an endpoint. +*/ +static rsRetVal +set_ruleset(void __attribute__((unused)) *pVal, uchar *pszName) { + ruleset_t* ruleset_ptr; + rsRetVal localRet; + DEFiRet; + + localRet = ruleset.GetRuleset(ourConf, &ruleset_ptr, pszName); + if(localRet == RS_RET_NOT_FOUND) { + errmsg.LogError(0, NO_ERRCODE, "error: " + "ruleset '%s' not found - ignored", pszName); + } + CHKiRet(localRet); + s_ruleset = ruleset_ptr; + DBGPRINTF("imzmq3 current bind ruleset '%s'\n", pszName); + +finalize_it: + free(pszName); /* no longer needed */ + RETiRet; +} + +/* add an actual endpoint + */ +static rsRetVal add_endpoint(void __attribute__((unused)) * oldp, uchar * valp) { + DEFiRet; + + /* increment number of items and store old num items, as it will be handy.*/ + size_t idx = s_nitems++; + + /* allocate a new socket_info array to accomidate this new endpoint*/ + socket_info* tmpSocketInfo; + CHKmalloc(tmpSocketInfo = (socket_info*)MALLOC(sizeof(socket_info) * s_nitems)); + + /* copy existing socket_info across into new array, if any, and free old storage*/ + if(idx) { + memcpy(tmpSocketInfo, s_socketInfo, sizeof(socket_info) * idx); + free(s_socketInfo); + } + + /* set the static to hold the new array */ + s_socketInfo = tmpSocketInfo; + + /* point to the new one */ + socket_info* sockInfo = &s_socketInfo[idx]; + + /* set defaults for the new socket info */ + setDefaults(sockInfo); + + /* Make a writeable copy of the string so we can use strtok + in the parseConfig call */ + char * copy = NULL; + CHKmalloc(copy = strdup((char *) valp)); + + /* parse the config string */ + CHKiRet(parseConfig(copy, sockInfo)); + + /* validate it */ + CHKiRet(validateConfig(sockInfo)); + + /* bind to the current ruleset (if any)*/ + sockInfo->ruleset = s_ruleset; + +finalize_it: + free(valp); /* in any case, this is no longer needed */ + RETiRet; +} + + +static int handlePoll(zloop_t __attribute__((unused)) * loop, zmq_pollitem_t *poller, void* pd) { + msg_t* logmsg; + poller_data* pollerData = (poller_data*)pd; + + char* buf = zstr_recv(poller->socket); + if (msgConstruct(&logmsg) == RS_RET_OK) { + MsgSetRawMsg(logmsg, buf, strlen(buf)); + MsgSetInputName(logmsg, s_namep); + MsgSetFlowControlType(logmsg, eFLOWCTL_NO_DELAY); + MsgSetRuleset(logmsg, pollerData->ruleset); + logmsg->msgFlags = NEEDS_PARSING; + submitMsg(logmsg); + } + + /* gotta free the string returned from zstr_recv() */ + free(buf); + + if( pollerData->thread->bShallStop == TRUE) { + /* a handler that returns -1 will terminate the + czmq reactor loop + */ + return -1; + } + + return 0; +} + +/* called when runInput is called by rsyslog + */ +static rsRetVal rcv_loop(thrdInfo_t* pThrd){ + size_t i; + int rv; + zmq_pollitem_t* items; + poller_data* pollerData; + + DEFiRet; + + /* create the context*/ + CHKiRet(createContext()); + + /* create the poll items*/ + CHKmalloc(items = (zmq_pollitem_t*)MALLOC(sizeof(zmq_pollitem_t)*s_nitems)); + + /* create poller data (stuff to pass into the zmq closure called when we get a message)*/ + CHKmalloc(pollerData = (poller_data*)MALLOC(sizeof(poller_data)*s_nitems)); + + /* loop through and initialize the poll items and poller_data arrays...*/ + for(i=0; i<s_nitems;++i) { + /* create the socket, update items.*/ + createSocket(&s_socketInfo[i], &items[i].socket); + items[i].events = ZMQ_POLLIN; + + /* now update the poller_data for this item */ + pollerData[i].thread = pThrd; + pollerData[i].ruleset = s_socketInfo[i].ruleset; + } + + s_zloop = zloop_new(); + for(i=0; i<s_nitems; ++i) { + + rv = zloop_poller(s_zloop, &items[i], handlePoll, &pollerData[i]); + if (rv) { + errmsg.LogError(0, NO_ERRCODE, "imzmq3: zloop_poller failed for item %zu", i); + } + } + zloop_start(s_zloop); + zloop_destroy(&s_zloop); + finalize_it: + for(i=0; i< s_nitems; ++i) { + zsocket_destroy(s_context, items[i].socket); + } + + zctx_destroy(&s_context); + + free(items); + RETiRet; +} + +/* ---------------------------------------------------------------------------- + * input module functions + */ + +BEGINrunInput +CODESTARTrunInput + iRet = rcv_loop(pThrd); + RETiRet; +ENDrunInput + + +/* initialize and return if will run or not */ +BEGINwillRun +CODESTARTwillRun + /* we need to create the inputName property (only once during our + lifetime) */ + CHKiRet(prop.Construct(&s_namep)); + CHKiRet(prop.SetString(s_namep, + UCHAR_CONSTANT("imzmq3"), + sizeof("imzmq3") - 1)); + CHKiRet(prop.ConstructFinalize(s_namep)); + +/* If there are no endpoints this is pointless ...*/ + if (s_nitems == 0) + ABORT_FINALIZE(RS_RET_NO_RUN); + +finalize_it: +ENDwillRun + + +BEGINafterRun +CODESTARTafterRun + /* do cleanup here */ + if(s_namep != NULL) + prop.Destruct(&s_namep); +ENDafterRun + + +BEGINmodExit +CODESTARTmodExit + /* release what we no longer need */ + objRelease(errmsg, CORE_COMPONENT); + objRelease(glbl, CORE_COMPONENT); + objRelease(prop, CORE_COMPONENT); + objRelease(ruleset, CORE_COMPONENT); +ENDmodExit + + +BEGINisCompatibleWithFeature +CODESTARTisCompatibleWithFeature + if(eFeat == sFEATURENonCancelInputTermination) + iRet = RS_RET_OK; +ENDisCompatibleWithFeature + + +BEGINqueryEtryPt +CODESTARTqueryEtryPt +CODEqueryEtryPt_STD_IMOD_QUERIES +CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES +ENDqueryEtryPt + +static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, + void __attribute__((unused)) *pVal) { + return RS_RET_OK; +} +static rsRetVal setGlobalWorkerThreads(uchar __attribute__((unused)) *pp, int val) { + errmsg.LogError(0, NO_ERRCODE, "setGlobalWorkerThreads called with %d",val); + s_io_threads = val; + return RS_RET_OK; +} + +BEGINmodInit() +CODESTARTmodInit + /* we only support the current interface specification */ + *ipIFVersProvided = CURR_MOD_IF_VERSION; +CODEmodInit_QueryRegCFSLineHdlr + CHKiRet(objUse(errmsg, CORE_COMPONENT)); + CHKiRet(objUse(glbl, CORE_COMPONENT)); + CHKiRet(objUse(prop, CORE_COMPONENT)); + CHKiRet(objUse(ruleset, CORE_COMPONENT)); + + /* register config file handlers */ + CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputzmq3serverbindruleset", + 0, eCmdHdlrGetWord, + set_ruleset, NULL, + STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputzmq3serverrun", + 0, eCmdHdlrGetWord, + add_endpoint, NULL, + STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", + 1, eCmdHdlrCustomHandler, + resetConfigVariables, NULL, + STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputzmq3globalWorkerThreads", + 1, eCmdHdlrInt, + setGlobalWorkerThreads, NULL, + STD_LOADABLE_MODULE_ID)); +ENDmodInit diff --git a/plugins/mmaudit/mmaudit.c b/plugins/mmaudit/mmaudit.c index fcefd013..4934312b 100644 --- a/plugins/mmaudit/mmaudit.c +++ b/plugins/mmaudit/mmaudit.c @@ -67,7 +67,6 @@ DEFobjCurrIf(errmsg); DEF_OMOD_STATIC_DATA typedef struct _instanceData { - ee_ctx ctxee; /**< context to be used for libee */ } instanceData; typedef struct configSettings_s { @@ -93,7 +92,6 @@ ENDisCompatibleWithFeature BEGINfreeInstance CODESTARTfreeInstance - ee_exitCtx(pData->ctxee); ENDfreeInstance @@ -169,17 +167,20 @@ finalize_it: /* parse the audit record and create libee structure */ static rsRetVal -audit_parse(instanceData *pData, uchar *buf, struct ee_event **event) +audit_parse(uchar *buf, struct json_object **jsonRoot) { - es_str_t *estr; + struct json_object *json; + struct json_object *jval; char name[1024]; char val[1024]; DEFiRet; - *event = ee_newEvent(pData->ctxee); - if(event == NULL) { + *jsonRoot = json_object_new_object(); + if(*jsonRoot == NULL) { ABORT_FINALIZE(RS_RET_ERR); } + json = json_object_new_object(); + json_object_object_add(*jsonRoot, "data", json); while(*buf) { //dbgprintf("audit_parse, buf: '%s'\n", buf); @@ -189,10 +190,8 @@ audit_parse(instanceData *pData, uchar *buf, struct ee_event **event) } ++buf; CHKiRet(parseValue(&buf, val, sizeof(val))); - - estr = es_newStrFromCStr(val, strlen(val)); - ee_addStrFieldToEvent(*event, name, estr); - es_deleteStr(estr); + jval = json_object_new_string(val); + json_object_object_add(json, name, jval); dbgprintf("mmaudit: parsed %s=%s\n", name, val); } @@ -206,9 +205,10 @@ BEGINdoAction msg_t *pMsg; uchar *buf; int typeID; - struct ee_event *event; + struct json_object *jsonRoot; + struct json_object *json; + struct json_object *jval; int i; - es_str_t *estr; char auditID[1024]; int bSuccess = 0; CODESTARTdoAction @@ -252,48 +252,24 @@ dbgprintf("mmaudit: msg is '%s'\n", buf); } buf += 2; -dbgprintf("mmaudit: cookie found, type %d, auditID '%s', rest of message: '%s'\n", typeID, auditID, buf); - audit_parse(pData, buf, &event); - if(event == NULL) { + audit_parse(buf, &jsonRoot); + if(jsonRoot == NULL) { DBGPRINTF("mmaudit: audit parse error, assuming no " "audit message: '%s'\n", buf); FINALIZE; } /* we now need to shuffle the "outer" properties into that stream */ - estr = es_newStrFromCStr(auditID, strlen(auditID)); - ee_addStrFieldToEvent(event, "audithdr.auditid", estr); - es_deleteStr(estr); - - /* we abuse auditID a bit to save space... (TODO: change!) */ - snprintf(auditID, sizeof(auditID), "%d", typeID); - estr = es_newStrFromCStr(auditID, strlen(auditID)); - ee_addStrFieldToEvent(event, "audithdr.type", estr); - es_deleteStr(estr); - - /* TODO: in the long term, we need to think about merging & different - name spaces (probably best to add the newly-obtained event as a child to - the existing event...) - */ - if(pMsg->event != NULL) { - ee_deleteEvent(pMsg->event); - } - pMsg->event = event; + json = json_object_new_object(); + json_object_object_add(jsonRoot, "hdr", json); + jval = json_object_new_string(auditID); + json_object_object_add(json, "auditid", jval); + jval = json_object_new_int(typeID); + json_object_object_add(json, "type", jval); + + msgAddJSON(pMsg, (uchar*)"!audit", jsonRoot); bSuccess = 1; -#if 1 - /***DEBUG***/ // TODO: remove after initial testing - 2010-12-01 - { - char *cstr; - es_str_t *str; - ee_fmtEventToJSON(pMsg->event, &str); - cstr = es_str2cstr(str, NULL); - dbgprintf("mmaudit generated: %s\n", cstr); - free(cstr); - es_deleteStr(str); - } - /***END DEBUG***/ -#endif finalize_it: MsgSetParseSuccess(pMsg, bSuccess); ENDdoAction @@ -318,13 +294,6 @@ CODE_STD_STRING_REQUESTparseSelectorAct(1) * the format specified (if any) is always ignored. */ CHKiRet(cflineParseTemplateName(&p, *ppOMSR, 0, OMSR_TPL_AS_MSG, (uchar*) "RSYSLOG_FileFormat")); - - /* finally build the instance */ - if((pData->ctxee = ee_initCtx()) == NULL) { - errmsg.LogError(0, RS_RET_NO_RULESET, "error: could not initialize libee ctx, cannot " - "activate action"); - ABORT_FINALIZE(RS_RET_ERR_LIBEE_INIT); - } CODE_STD_FINALIZERparseSelectorAct ENDparseSelectorAct diff --git a/plugins/mmjsonparse/mmjsonparse.c b/plugins/mmjsonparse/mmjsonparse.c index 111ecc2f..da5cfb51 100644 --- a/plugins/mmjsonparse/mmjsonparse.c +++ b/plugins/mmjsonparse/mmjsonparse.c @@ -36,7 +36,7 @@ #include <unistd.h> #include <ctype.h> #include <libestr.h> -#include <libee/libee.h> +#include <json/json.h> #include "conf.h" #include "syslogd-types.h" #include "template.h" @@ -59,13 +59,9 @@ DEFobjCurrIf(errmsg); DEF_OMOD_STATIC_DATA typedef struct _instanceData { - ee_ctx ctxee; /**< context to be used for libee */ + struct json_tokener *tokener; } instanceData; -typedef struct configSettings_s { - int dummy; /* remove when the first real parameter is needed */ -} configSettings_t; -static configSettings_t cs; BEGINinitConfVars /* (re)set config variables to default values */ CODESTARTinitConfVars @@ -85,7 +81,8 @@ ENDisCompatibleWithFeature BEGINfreeInstance CODESTARTfreeInstance - ee_exitCtx(pData->ctxee); + if(pData->tokener != NULL) + json_tokener_free(pData->tokener); ENDfreeInstance @@ -99,13 +96,56 @@ BEGINtryResume CODESTARTtryResume ENDtryResume + +static rsRetVal +processJSON(instanceData *pData, msg_t *pMsg, char *buf, size_t lenBuf) +{ + struct json_object *json; + const char *errMsg; + DEFiRet; + + dbgprintf("mmjsonparse: toParse: '%s'\n", buf); + json_tokener_reset(pData->tokener); + + json = json_tokener_parse_ex(pData->tokener, buf, lenBuf); + if(Debug) { + errMsg = NULL; + if(json == NULL) { + enum json_tokener_error err; + + err = pData->tokener->err; + if(err != json_tokener_continue) + errMsg = json_tokener_errors[err]; + else + errMsg = "Unterminated input"; + } else if((size_t)pData->tokener->char_offset < lenBuf) + errMsg = "Extra characters after JSON object"; + else if(!json_object_is_type(json, json_type_object)) + errMsg = "JSON value is not an object"; + if(errMsg != NULL) { + dbgprintf("mmjsonparse: Error parsing JSON '%s': %s\n", + buf, errMsg); + } + } + if(json == NULL + || ((size_t)pData->tokener->char_offset < lenBuf) + || (!json_object_is_type(json, json_type_object))) { + ABORT_FINALIZE(RS_RET_NO_CEE_MSG); + } + + msgAddJSON(pMsg, (uchar*)"!", json); +finalize_it: + RETiRet; +} + #define COOKIE "@cee:" #define LEN_COOKIE (sizeof(COOKIE)-1) BEGINdoAction msg_t *pMsg; uchar *buf; - struct ee_event *event; int bSuccess = 0; + struct json_object *jval; + struct json_object *json; CODESTARTdoAction pMsg = (msg_t*) ppString[0]; /* note that we can performance-optimize the interface, but this also @@ -114,47 +154,25 @@ CODESTARTdoAction */ buf = getMSG(pMsg); -dbgprintf("mmjsonparse: msg is '%s'\n", buf); while(*buf && isspace(*buf)) { ++buf; } if(*buf == '\0' || strncmp((char*)buf, COOKIE, LEN_COOKIE)) { DBGPRINTF("mmjsonparse: no JSON cookie: '%s'\n", buf); - FINALIZE; + ABORT_FINALIZE(RS_RET_NO_CEE_MSG); } buf += LEN_COOKIE; -dbgprintf("mmjsonparse: cookie found, rest of message: '%s'\n", buf); - event = ee_newEventFromJSON(pData->ctxee, (char*)buf); - if(event == NULL) { - DBGPRINTF("mmjsonparse: JSON parse error, assuming no " - "JSON-enhanced message: '%s'\n", buf); - FINALIZE; - } - /* TODO: in the long term, we need to think about merging & different - name spaces (probably best to add the newly-obtained event as a child to - the existing event...) - */ - if(pMsg->event != NULL) { - ee_deleteEvent(pMsg->event); - } - pMsg->event = event; + CHKiRet(processJSON(pData, pMsg, (char*) buf, strlen((char*)buf))); bSuccess = 1; - -#if 1 - /***DEBUG***/ // TODO: remove after initial testing - 2010-12-01 - { - char *cstr; - es_str_t *str; - ee_fmtEventToJSON(pMsg->event, &str); - cstr = es_str2cstr(str, NULL); - dbgprintf("mmjsonparse generated: %s\n", cstr); - free(cstr); - es_deleteStr(str); - } - /***END DEBUG***/ -#endif finalize_it: + if(iRet == RS_RET_NO_CEE_MSG) { + /* add buf as msg */ + json = json_object_new_object(); + jval = json_object_new_string((char*)buf); + json_object_object_add(json, "msg", jval); + msgAddJSON(pMsg, (uchar*)"!", json); + } MsgSetParseSuccess(pMsg, bSuccess); ENDdoAction @@ -180,10 +198,11 @@ CODE_STD_STRING_REQUESTparseSelectorAct(1) CHKiRet(cflineParseTemplateName(&p, *ppOMSR, 0, OMSR_TPL_AS_MSG, (uchar*) "RSYSLOG_FileFormat")); /* finally build the instance */ - if((pData->ctxee = ee_initCtx()) == NULL) { - errmsg.LogError(0, RS_RET_NO_RULESET, "error: could not initialize libee ctx, cannot " - "activate action"); - ABORT_FINALIZE(RS_RET_ERR_LIBEE_INIT); + pData->tokener = json_tokener_new(); + if(pData->tokener == NULL) { + errmsg.LogError(0, RS_RET_ERR, "error: could not create json " + "tokener, cannot activate action"); + ABORT_FINALIZE(RS_RET_ERR); } CODE_STD_FINALIZERparseSelectorAct ENDparseSelectorAct diff --git a/plugins/mmnormalize/mmnormalize.c b/plugins/mmnormalize/mmnormalize.c index c5b290f4..bf0b9ce6 100644 --- a/plugins/mmnormalize/mmnormalize.c +++ b/plugins/mmnormalize/mmnormalize.c @@ -4,9 +4,12 @@ * * NOTE: read comments in module-template.h for details on the calling interface! * + * TODO: check if we can replace libee via JSON system - currently that part + * is pretty inefficient... rgerhards, 2012-08-27 + * * File begun on 2010-01-01 by RGerhards * - * Copyright 2010 Rainer Gerhards and Adiscon GmbH. + * Copyright 2010-2012 Rainer Gerhards and Adiscon GmbH. * * This file is part of rsyslog. * @@ -37,6 +40,7 @@ #include <unistd.h> #include <libestr.h> #include <libee/libee.h> +#include <json/json.h> #include <liblognorm.h> #include "conf.h" #include "syslogd-types.h" @@ -108,8 +112,12 @@ BEGINdoAction msg_t *pMsg; es_str_t *str; uchar *buf; + char *cstrJSON; int len; int r; + struct ee_event *event = NULL; + struct json_tokener *tokener; + struct json_object *json; CODESTARTdoAction pMsg = (msg_t*) ppString[0]; /* note that we can performance-optimize the interface, but this also @@ -123,7 +131,7 @@ CODESTARTdoAction len = getMSGLen(pMsg); } str = es_newStrFromCStr((char*)buf, len); - r = ln_normalize(pData->ctxln, str, &pMsg->event); + r = ln_normalize(pData->ctxln, str, &event); if(r != 0) { DBGPRINTF("error %d during ln_normalize\n", r); MsgSetParseSuccess(pMsg, 0); @@ -131,16 +139,20 @@ CODESTARTdoAction MsgSetParseSuccess(pMsg, 1); } es_deleteStr(str); - /***DEBUG***/ // TODO: remove after initial testing - 2010-12-01 - { - char *cstr; - ee_fmtEventToJSON(pMsg->event, &str); - cstr = es_str2cstr(str, NULL); - dbgprintf("mmnormalize generated: %s\n", cstr); - free(cstr); - es_deleteStr(str); - } - /***END DEBUG***/ + + /* reformat to our json data struct */ + // TODO: this is all extremly ineffcient! + ee_fmtEventToJSON(event, &str); + cstrJSON = es_str2cstr(str, NULL); + dbgprintf("mmnormalize generated: %s\n", cstrJSON); + + tokener = json_tokener_new(); + json = json_tokener_parse_ex(tokener, cstrJSON, strlen((char*)cstrJSON)); + json_tokener_free(tokener); + msgAddJSON(pMsg, (uchar*)"!", json); + + free(cstrJSON); + es_deleteStr(str); ENDdoAction diff --git a/plugins/omhdfs/omhdfs.c b/plugins/omhdfs/omhdfs.c index cd14d03c..f8a7e739 100644 --- a/plugins/omhdfs/omhdfs.c +++ b/plugins/omhdfs/omhdfs.c @@ -67,8 +67,8 @@ typedef struct configSettings_s { uchar *dfltTplName; /* default template name to use */ int hdfsPort; } configSettings_t; +static configSettings_t cs; -SCOPING_SUPPORT; /* must be set AFTER configSettings_t is defined */ BEGINinitConfVars /* (re)set config variables to default values */ CODESTARTinitConfVars diff --git a/plugins/ommongodb/README b/plugins/ommongodb/README index 7581131a..ad4a8ea2 100644 --- a/plugins/ommongodb/README +++ b/plugins/ommongodb/README @@ -6,20 +6,13 @@ configuration: in your /etc/rsyslog.conf, together with other modules: $ModLoad ommongodb # provides mongodb support +*.* action(type="ommongodb" db="..." collection="..." template="...") -then in your /etc/rsyslog.d (check your distribution way to organize the configuration..) you create a file 10-mongodb.conf with the following content: +Note: if no template is specified, a default schema will be used. That schema +contains proper data types. However, if a template is specified, only strings +are supported. This is a restriction of the rsyslog v6 core engine. This +changed in v7. -*.* action(type="ommongodb" db="..." collection="...") - -Note: currently templates are not supported. Ommongodb will pick a default -schema and use the message object content for that (templateless). - - -TODO -we must ensure that the collection is a capped collection -refactor my code :-) - -email Victor Pereira <victor.pereira@bigrails.com> -twitter twitter.com/vpereira - -part of this doc by Rainer Gerhards <rgerhards@adiscon.com> +If templates are used, it is suggested to use list-based templates. Constants +can ONLY be inserted with list-based templates, as only these provide the +capability to specify a field name (outname parameter). diff --git a/plugins/ommongodb/ommongodb.c b/plugins/ommongodb/ommongodb.c index aa20b33f..2c65f275 100644 --- a/plugins/ommongodb/ommongodb.c +++ b/plugins/ommongodb/ommongodb.c @@ -30,8 +30,12 @@ #include <errno.h> #include <assert.h> #include <signal.h> +#include <stdint.h> #include <time.h> #include <mongo.h> +#include <json/json.h> +/* For struct json_object_iter, should not be necessary in future versions */ +#include <json/json_object_private.h> #include "rsyslog.h" #include "conf.h" @@ -42,6 +46,7 @@ #include "datetime.h" #include "errmsg.h" #include "cfsysline.h" +#include "unicode-helper.h" MODULE_TYPE_OUTPUT MODULE_TYPE_NOKEEP @@ -54,6 +59,7 @@ DEFobjCurrIf(datetime) typedef struct _instanceData { mongo_sync_connection *conn; + struct json_tokener *json_tokener; /* only if (tplName != NULL) */ uchar *server; int port; uchar *db; @@ -108,11 +114,14 @@ static void closeMongoDB(instanceData *pData) BEGINfreeInstance CODESTARTfreeInstance closeMongoDB(pData); + if (pData->json_tokener != NULL) + json_tokener_free(pData->json_tokener); free(pData->server); free(pData->db); free(pData->collection); free(pData->uid); free(pData->pwd); + free(pData->dbNcoll); free(pData->tplName); ENDfreeInstance @@ -120,6 +129,7 @@ ENDfreeInstance BEGINdbgPrintInstInfo CODESTARTdbgPrintInstInfo /* nothing special here */ + (void)pData; ENDdbgPrintInstInfo @@ -139,6 +149,8 @@ reportMongoError(instanceData *pData) errmsg.LogError(0, RS_RET_ERR, "ommongodb: we had an error, but can " "not obtain specifics"); } +#else + (void)pData; #endif } @@ -202,29 +214,24 @@ i10pow(int exp) } return r; } -/* write to mongodb in MSG passing mode, that is without a template. +/* Return a BSON document when an user hasn't specified a template. * In this mode, we use the standard document format, which is somewhat * aligned to cee (as described in project lumberjack). Note that this is * a moving target, so we may run out of sync (and stay so to retain * backward compatibility, which we consider pretty important). */ -rsRetVal writeMongoDB_msg(msg_t *pMsg, instanceData *pData) +static bson * +getDefaultBSON(msg_t *pMsg) { bson *doc = NULL; - uchar *procid; short unsigned procid_free; size_t procid_len; - uchar *tag; short unsigned tag_free; size_t tag_len; - uchar *pid; short unsigned pid_free; size_t pid_len; - uchar *sys; short unsigned sys_free; size_t sys_len; - uchar *msg; short unsigned msg_free; size_t msg_len; + uchar *procid; short unsigned procid_free; rs_size_t procid_len; + uchar *tag; short unsigned tag_free; rs_size_t tag_len; + uchar *pid; short unsigned pid_free; rs_size_t pid_len; + uchar *sys; short unsigned sys_free; rs_size_t sys_len; + uchar *msg; short unsigned msg_free; rs_size_t msg_len; int severity, facil; gint64 ts_gen, ts_rcv; /* timestamps: generated, received */ int secfrac; - DEFiRet; - - /* see if we are ready to proceed */ - if(pData->conn == NULL) { - CHKiRet(initMongoDB(pData, 0)); - } procid = MsgGetProp(pMsg, NULL, PROP_PROGRAMNAME, NULL, &procid_len, &procid_free); tag = MsgGetProp(pMsg, NULL, PROP_SYSLOGTAG, NULL, &tag_len, &tag_free); @@ -276,22 +283,129 @@ dbgprintf("ommongodb: secfrac is %d, precision %d\n", pMsg->tTIMESTAMP.secfrac, if(sys_free) free(sys); if(msg_free) free(msg); - if(doc == NULL) { - reportMongoError(pData); - dbgprintf("ommongodb: error creating BSON doc\n"); - ABORT_FINALIZE(RS_RET_SUSPENDED); - } + if(doc == NULL) + return doc; bson_finish(doc); - if(!mongo_sync_cmd_insert(pData->conn, (char*)pData->dbNcoll, doc, NULL)) { - reportMongoError(pData); - dbgprintf("ommongodb: insert error\n"); - ABORT_FINALIZE(RS_RET_SUSPENDED); + return doc; +} + +static bson *BSONFromJSONArray(struct json_object *json); +static bson *BSONFromJSONObject(struct json_object *json); + +/* Append a BSON variant of json to doc using name. Return TRUE on success */ +static gboolean +BSONAppendJSONObject(bson *doc, const gchar *name, struct json_object *json) +{ + switch(json != NULL ? json_object_get_type(json) : json_type_null) { + case json_type_null: + return bson_append_null(doc, name); + case json_type_boolean: + return bson_append_boolean(doc, name, + json_object_get_boolean(json)); + case json_type_double: + return bson_append_double(doc, name, + json_object_get_double(json)); + case json_type_int: { + int64_t i; + + /* FIXME: the future version will have get_int64 */ + i = json_object_get_int(json); + if (i >= INT32_MIN && i <= INT32_MAX) + return bson_append_int32(doc, name, i); + else + return bson_append_int64(doc, name, i); + } + case json_type_object: { + bson *sub; + gboolean ok; + + sub = BSONFromJSONObject(json); + if (sub == NULL) + return FALSE; + ok = bson_append_document(doc, name, sub); + bson_free(sub); + return ok; + } + case json_type_array: { + bson *sub; + gboolean ok; + + sub = BSONFromJSONArray(json); + if (sub == NULL) + return FALSE; + ok = bson_append_document(doc, name, sub); + bson_free(sub); + return ok; } + case json_type_string: + return bson_append_string(doc, name, + json_object_get_string(json), -1); -finalize_it: + default: + return FALSE; + } +} + +/* Return a BSON variant of json, which must be a json_type_array */ +static bson * +BSONFromJSONArray(struct json_object *json) +{ + /* Way more than necessary */ + bson *doc = NULL; + size_t i, array_len; + + doc = bson_new(); + if(doc == NULL) + goto error; + + array_len = json_object_array_length(json); + for (i = 0; i < array_len; i++) { + char buf[sizeof(size_t) * CHAR_BIT + 1]; + + if ((size_t)snprintf(buf, sizeof(buf), "%zu", i) >= sizeof(buf)) + goto error; + if (BSONAppendJSONObject(doc, buf, + json_object_array_get_idx(json, i)) + == FALSE) + goto error; + } + + if(bson_finish(doc) == FALSE) + goto error; + + return doc; + +error: if(doc != NULL) bson_free(doc); - RETiRet; + return NULL; +} + +/* Return a BSON variant of json, which must be a json_type_object */ +static bson * +BSONFromJSONObject(struct json_object *json) +{ + bson *doc = NULL; + struct json_object_iter it; + + doc = bson_new(); + if(doc == NULL) + goto error; + + json_object_object_foreachC(json, it) { + if (BSONAppendJSONObject(doc, it.key, it.val) == FALSE) + goto error; + } + + if(bson_finish(doc) == FALSE) + goto error; + + return doc; + +error: + if(doc != NULL) + bson_free(doc); + return NULL; } BEGINtryResume @@ -302,10 +416,32 @@ CODESTARTtryResume ENDtryResume BEGINdoAction + bson *doc = NULL; CODESTARTdoAction + /* see if we are ready to proceed */ + if(pData->conn == NULL) { + CHKiRet(initMongoDB(pData, 0)); + } + if(pData->tplName == NULL) { - iRet = writeMongoDB_msg((msg_t*)ppString[0], pData); + doc = getDefaultBSON((msg_t*)ppString[0]); + } else { + doc = BSONFromJSONObject((struct json_object *)ppString[0]); + } + if(doc == NULL) { + dbgprintf("ommongodb: error creating BSON doc\n"); + /* FIXME: is this a correct return code? */ + ABORT_FINALIZE(RS_RET_ERR); } + if(!mongo_sync_cmd_insert(pData->conn, (char*)pData->dbNcoll, doc, NULL)) { + reportMongoError(pData); + dbgprintf("ommongodb: insert error\n"); + ABORT_FINALIZE(RS_RET_SUSPENDED); + } + +finalize_it: + if(doc != NULL) + bson_free(doc); ENDdoAction @@ -360,12 +496,9 @@ CODESTARTnewActInst if(pData->tplName == NULL) { CHKiRet(OMSRsetEntry(*ppOMSR, 0, NULL, OMSR_TPL_AS_MSG)); } else { - errmsg.LogError(0, RS_RET_LEGA_ACT_NOT_SUPPORTED, - "ommongodb: templates are not supported in this version"); - ABORT_FINALIZE(RS_RET_ERR); - CHKiRet(OMSRsetEntry(*ppOMSR, 0, - (uchar*) strdup((char*) pData->tplName), - OMSR_TPL_AS_ARRAY)); + CHKiRet(OMSRsetEntry(*ppOMSR, 0, ustrdup(pData->tplName), + OMSR_TPL_AS_JSON)); + CHKmalloc(pData->json_tokener = json_tokener_new()); } if(pData->db == NULL) @@ -417,6 +550,10 @@ CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES ENDqueryEtryPt BEGINmodInit() + rsRetVal localRet; + rsRetVal (*pomsrGetSupportedTplOpts)(unsigned long *pOpts); + unsigned long opts; + int bJSONPassingSupported; CODESTARTmodInit *ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */ CODEmodInit_QueryRegCFSLineHdlr @@ -424,5 +561,22 @@ CODEmodInit_QueryRegCFSLineHdlr CHKiRet(objUse(datetime, CORE_COMPONENT)); INITChkCoreFeature(bCoreSupportsBatching, CORE_FEATURE_BATCHING); DBGPRINTF("ommongodb: module compiled with rsyslog version %s.\n", VERSION); - //DBGPRINTF("ommongodb: %susing transactional output interface.\n", bCoreSupportsBatching ? "" : "not "); + + /* check if the rsyslog core supports parameter passing code */ + bJSONPassingSupported = 0; + localRet = pHostQueryEtryPt((uchar*)"OMSRgetSupportedTplOpts", + &pomsrGetSupportedTplOpts); + if(localRet == RS_RET_OK) { + /* found entry point, so let's see if core supports msg passing */ + CHKiRet((*pomsrGetSupportedTplOpts)(&opts)); + if(opts & OMSR_TPL_AS_JSON) + bJSONPassingSupported = 1; + } else if(localRet != RS_RET_ENTRY_POINT_NOT_FOUND) { + ABORT_FINALIZE(localRet); /* Something else went wrong, not acceptable */ + } + if(!bJSONPassingSupported) { + DBGPRINTF("ommongodb: JSON-passing is not supported by rsyslog core, " + "can not continue.\n"); + ABORT_FINALIZE(RS_RET_NO_JSON_PASSING); + } ENDmodInit diff --git a/plugins/omruleset/omruleset.c b/plugins/omruleset/omruleset.c index 67aee97e..6c770c94 100644 --- a/plugins/omruleset/omruleset.c +++ b/plugins/omruleset/omruleset.c @@ -165,6 +165,9 @@ CODE_STD_STRING_REQUESTparseSelectorAct(1) p += sizeof(":omruleset:") - 1; /* eat indicator sequence (-1 because of '\0'!) */ CHKiRet(createInstance(&pData)); + errmsg.LogError(0, RS_RET_DEPRECATED, "warning: omruleset is deprecated, consider " + "using the 'call' statement instead"); + /* check if a non-standard template is to be applied */ if(*(p-1) == ';') --p; @@ -237,6 +240,9 @@ CODEmodInit_QueryRegCFSLineHdlr CHKiRet(objUse(ruleset, CORE_COMPONENT)); CHKiRet(objUse(errmsg, CORE_COMPONENT)); + errmsg.LogError(0, RS_RET_DEPRECATED, "warning: omruleset is deprecated, consider " + "using the 'call' statement instead"); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"actionomrulesetrulesetname", 0, eCmdHdlrGetWord, setRuleset, NULL, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, diff --git a/plugins/omudpspoof/omudpspoof.c b/plugins/omudpspoof/omudpspoof.c index 1db2f7f0..531a0dcf 100644 --- a/plugins/omudpspoof/omudpspoof.c +++ b/plugins/omudpspoof/omudpspoof.c @@ -107,23 +107,41 @@ typedef struct _instanceData { #define DFLT_SOURCE_PORT_END 42000 typedef struct configSettings_s { - uchar *pszTplName; /* name of the default template to use */ + uchar *tplName; /* name of the default template to use */ uchar *pszSourceNameTemplate; /* name of the template containing the spoofing address */ uchar *pszTargetHost; uchar *pszTargetPort; - int iCompressionLevel; /* zlib compressionlevel, the usual values */ int iSourcePortStart; int iSourcePortEnd; } configSettings_t; static configSettings_t cs; +/* module-global parameters */ +static struct cnfparamdescr modpdescr[] = { + { "template", eCmdHdlrGetWord, 0 }, +}; +static struct cnfparamblk modpblk = + { CNFPARAMBLK_VERSION, + sizeof(modpdescr)/sizeof(struct cnfparamdescr), + modpdescr + }; + +struct modConfData_s { + rsconf_t *pConf; /* our overall config object */ + uchar *tplName; /* default template */ +}; + +static modConfData_t *loadModConf = NULL;/* modConf ptr to use for the current load process */ +static modConfData_t *runModConf = NULL;/* modConf ptr to use for the current exec process */ + + + BEGINinitConfVars /* (re)set config variables to default values */ CODESTARTinitConfVars - cs.pszTplName = NULL; + cs.tplName = NULL; cs.pszSourceNameTemplate = NULL; cs.pszTargetHost = NULL; cs.pszTargetPort = NULL; - cs.iCompressionLevel = 0; cs.iSourcePortStart = DFLT_SOURCE_PORT_START; cs.iSourcePortEnd = DFLT_SOURCE_PORT_END; ENDinitConfVars @@ -138,6 +156,44 @@ pthread_mutex_t mutLibnet; static rsRetVal doTryResume(instanceData *pData); +/* this function gets the default template. It coordinates action between + * old-style and new-style configuration parts. + */ +static inline uchar* +getDfltTpl(void) +{ + if(loadModConf != NULL && loadModConf->tplName != NULL) + return loadModConf->tplName; + else if(cs.tplName == NULL) + return (uchar*)"RSYSLOG_FileFormat"; + else + return cs.tplName; +} + + +/* set the default template to be used + * This is a module-global parameter, and as such needs special handling. It needs to + * be coordinated with values set via the v2 config system (rsyslog v6+). What we do + * is we do not permit this directive after the v2 config system has been used to set + * the parameter. + */ +rsRetVal +setLegacyDfltTpl(void __attribute__((unused)) *pVal, uchar* newVal) +{ + DEFiRet; + + if(loadModConf != NULL && loadModConf->tplName != NULL) { + free(newVal); + errmsg.LogError(0, RS_RET_ERR, "omudpspoof default template already set via module " + "global parameter - can no longer be changed"); + ABORT_FINALIZE(RS_RET_ERR); + } + free(cs.tplName); + cs.tplName = newVal; +finalize_it: + RETiRet; +} + /* Close the UDP sockets. * rgerhards, 2009-05-29 */ @@ -167,6 +223,72 @@ static inline uchar *getFwdPt(instanceData *pData) } +BEGINbeginCnfLoad +CODESTARTbeginCnfLoad + loadModConf = pModConf; + pModConf->pConf = pConf; + pModConf->tplName = NULL; +ENDbeginCnfLoad + +BEGINsetModCnf + struct cnfparamvals *pvals = NULL; + int i; +CODESTARTsetModCnf + pvals = nvlstGetParams(lst, &modpblk, NULL); + if(pvals == NULL) { + errmsg.LogError(0, RS_RET_MISSING_CNFPARAMS, "error processing module " + "config parameters [module(...)]"); + ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); + } + + if(Debug) { + dbgprintf("module (global) param blk for omudpspoof:\n"); + cnfparamsPrint(&modpblk, pvals); + } + + for(i = 0 ; i < modpblk.nParams ; ++i) { + if(!pvals[i].bUsed) + continue; + if(!strcmp(modpblk.descr[i].name, "template")) { + loadModConf->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + if(cs.tplName != NULL) { + errmsg.LogError(0, RS_RET_DUP_PARAM, "omudpspoof: warning: default template " + "was already set via legacy directive - may lead to inconsistent " + "results."); + } + } else { + dbgprintf("omudpspoof: program error, non-handled " + "param '%s' in beginCnfLoad\n", modpblk.descr[i].name); + } + } +finalize_it: + if(pvals != NULL) + cnfparamvalsDestruct(pvals, &modpblk); +ENDsetModCnf + +BEGINendCnfLoad +CODESTARTendCnfLoad + loadModConf = NULL; /* done loading */ + /* free legacy config vars */ + free(cs.tplName); + cs.tplName = NULL; +ENDendCnfLoad + +BEGINcheckCnf +CODESTARTcheckCnf +ENDcheckCnf + +BEGINactivateCnf +CODESTARTactivateCnf + runModConf = pModConf; +ENDactivateCnf + +BEGINfreeCnf +CODESTARTfreeCnf + free(pModConf->tplName); +ENDfreeCnf + + BEGINcreateInstance CODESTARTcreateInstance ENDcreateInstance @@ -421,13 +543,12 @@ CODE_STD_STRING_REQUESTparseSelectorAct(2) else CHKmalloc(pData->port = ustrdup(cs.pszTargetPort)); CHKiRet(OMSRsetEntry(*ppOMSR, 1, ustrdup(sourceTpl), OMSR_NO_RQD_TPL_OPTS)); - pData->compressionLevel = cs.iCompressionLevel; pData->sourcePort = pData->sourcePortStart = cs.iSourcePortStart; pData->sourcePortEnd = cs.iSourcePortEnd; /* process template */ CHKiRet(cflineParseTemplateName(&p, *ppOMSR, 0, OMSR_NO_RQD_TPL_OPTS, - (cs.pszTplName == NULL) ? (uchar*)"RSYSLOG_TraditionalForwardFormat" : cs.pszTplName)); + (cs.tplName == NULL) ? (uchar*)"RSYSLOG_TraditionalForwardFormat" : cs.tplName)); CODE_STD_FINALIZERparseSelectorAct ENDparseSelectorAct @@ -439,8 +560,8 @@ ENDparseSelectorAct static void freeConfigVars(void) { - free(cs.pszTplName); - cs.pszTplName = NULL; + free(cs.tplName); + cs.tplName = NULL; free(cs.pszTargetHost); cs.pszTargetHost = NULL; free(cs.pszTargetPort); @@ -464,6 +585,8 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_CONF2_QUERIES +CODEqueryEtryPt_STD_CONF2_setModCnf_QUERIES ENDqueryEtryPt @@ -474,7 +597,6 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a { freeConfigVars(); /* we now must reset all non-string values */ - cs.iCompressionLevel = 0; cs.iSourcePortStart = DFLT_SOURCE_PORT_START; cs.iSourcePortEnd = DFLT_SOURCE_PORT_END; return RS_RET_OK; @@ -504,13 +626,12 @@ CODEmodInit_QueryRegCFSLineHdlr } pthread_mutex_init(&mutLibnet, NULL); - CHKiRet(regCfSysLineHdlr((uchar *)"actionomudpspoofdefaulttemplate", 0, eCmdHdlrGetWord, NULL, &cs.pszTplName, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionomudpspoofdefaulttemplate", 0, eCmdHdlrGetWord, setLegacyDfltTpl, NULL, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionomudpspoofsourcenametemplate", 0, eCmdHdlrGetWord, NULL, &cs.pszSourceNameTemplate, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionomudpspooftargethost", 0, eCmdHdlrGetWord, NULL, &cs.pszTargetHost, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionomudpspooftargetport", 0, eCmdHdlrGetWord, NULL, &cs.pszTargetPort, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionomudpspoofsourceportstart", 0, eCmdHdlrInt, NULL, &cs.iSourcePortStart, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionomudpspoofsourceportend", 0, eCmdHdlrInt, NULL, &cs.iSourcePortEnd, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionomudpcompressionlevel", 0, eCmdHdlrInt, NULL, &cs.iCompressionLevel, NULL)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID)); ENDmodInit diff --git a/plugins/omuxsock/omuxsock.c b/plugins/omuxsock/omuxsock.c index cf27c93c..583b9f94 100644 --- a/plugins/omuxsock/omuxsock.c +++ b/plugins/omuxsock/omuxsock.c @@ -71,6 +71,26 @@ typedef struct configSettings_s { } configSettings_t; static configSettings_t cs; +/* module-global parameters */ +static struct cnfparamdescr modpdescr[] = { + { "template", eCmdHdlrGetWord, 0 }, +}; +static struct cnfparamblk modpblk = + { CNFPARAMBLK_VERSION, + sizeof(modpdescr)/sizeof(struct cnfparamdescr), + modpdescr + }; + +struct modConfData_s { + rsconf_t *pConf; /* our overall config object */ + uchar *tplName; /* default template */ +}; + +static modConfData_t *loadModConf = NULL;/* modConf ptr to use for the current load process */ +static modConfData_t *runModConf = NULL;/* modConf ptr to use for the current exec process */ + + + BEGINinitConfVars /* (re)set config variables to default values */ CODESTARTinitConfVars cs.tplName = NULL; @@ -80,8 +100,45 @@ ENDinitConfVars static rsRetVal doTryResume(instanceData *pData); -/* Close socket. + +/* this function gets the default template. It coordinates action between + * old-style and new-style configuration parts. + */ +static inline uchar* +getDfltTpl(void) +{ + if(loadModConf != NULL && loadModConf->tplName != NULL) + return loadModConf->tplName; + else if(cs.tplName == NULL) + return (uchar*)"RSYSLOG_TraditionalForwardFormat"; + else + return cs.tplName; +} + +/* set the default template to be used + * This is a module-global parameter, and as such needs special handling. It needs to + * be coordinated with values set via the v2 config system (rsyslog v6+). What we do + * is we do not permit this directive after the v2 config system has been used to set + * the parameter. */ +rsRetVal +setLegacyDfltTpl(void __attribute__((unused)) *pVal, uchar* newVal) +{ + DEFiRet; + + if(loadModConf != NULL && loadModConf->tplName != NULL) { + free(newVal); + errmsg.LogError(0, RS_RET_ERR, "omuxsock default template already set via module " + "global parameter - can no longer be changed"); + ABORT_FINALIZE(RS_RET_ERR); + } + free(cs.tplName); + cs.tplName = newVal; +finalize_it: + RETiRet; +} + + static inline rsRetVal closeSocket(instanceData *pData) { @@ -96,6 +153,72 @@ pData->bIsConnected = 0; // TODO: remove this variable altogether + +BEGINbeginCnfLoad +CODESTARTbeginCnfLoad + loadModConf = pModConf; + pModConf->pConf = pConf; + pModConf->tplName = NULL; +ENDbeginCnfLoad + +BEGINsetModCnf + struct cnfparamvals *pvals = NULL; + int i; +CODESTARTsetModCnf + pvals = nvlstGetParams(lst, &modpblk, NULL); + if(pvals == NULL) { + errmsg.LogError(0, RS_RET_MISSING_CNFPARAMS, "error processing module " + "config parameters [module(...)]"); + ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); + } + + if(Debug) { + dbgprintf("module (global) param blk for omuxsock:\n"); + cnfparamsPrint(&modpblk, pvals); + } + + for(i = 0 ; i < modpblk.nParams ; ++i) { + if(!pvals[i].bUsed) + continue; + if(!strcmp(modpblk.descr[i].name, "template")) { + loadModConf->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + if(cs.tplName != NULL) { + errmsg.LogError(0, RS_RET_DUP_PARAM, "omuxsock: warning: default template " + "was already set via legacy directive - may lead to inconsistent " + "results."); + } + } else { + dbgprintf("omuxsock: program error, non-handled " + "param '%s' in beginCnfLoad\n", modpblk.descr[i].name); + } + } +finalize_it: + if(pvals != NULL) + cnfparamvalsDestruct(pvals, &modpblk); +ENDsetModCnf + +BEGINendCnfLoad +CODESTARTendCnfLoad + loadModConf = NULL; /* done loading */ + /* free legacy config vars */ + free(cs.tplName); + cs.tplName = NULL; +ENDendCnfLoad + +BEGINcheckCnf +CODESTARTcheckCnf +ENDcheckCnf + +BEGINactivateCnf +CODESTARTactivateCnf + runModConf = pModConf; +ENDactivateCnf + +BEGINfreeCnf +CODESTARTfreeCnf + free(pModConf->tplName); +ENDfreeCnf + BEGINcreateInstance CODESTARTcreateInstance pData->sock = INVLD_SOCK; @@ -250,8 +373,7 @@ CODE_STD_STRING_REQUESTparseSelectorAct(1) /* check if a non-standard template is to be applied */ if(*(p-1) == ';') --p; - CHKiRet(cflineParseTemplateName(&p, *ppOMSR, 0, 0, cs.tplName == NULL ? UCHAR_CONSTANT("RSYSLOG_TraditionalForwardFormat") - : cs.tplName )); + CHKiRet(cflineParseTemplateName(&p, *ppOMSR, 0, 0, getDfltTpl())); if(cs.sockName == NULL) { errmsg.LogError(0, RS_RET_NO_SOCK_CONFIGURED, "No output socket configured for omuxsock\n"); @@ -291,6 +413,8 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_CONF2_QUERIES +CODEqueryEtryPt_STD_CONF2_setModCnf_QUERIES ENDqueryEtryPt @@ -312,7 +436,7 @@ CODEmodInit_QueryRegCFSLineHdlr CHKiRet(objUse(glbl, CORE_COMPONENT)); CHKiRet(objUse(errmsg, CORE_COMPONENT)); - CHKiRet(regCfSysLineHdlr((uchar *)"omuxsockdefaulttemplate", 0, eCmdHdlrGetWord, NULL, &cs.tplName, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"omuxsockdefaulttemplate", 0, eCmdHdlrGetWord, setLegacyDfltTpl, NULL, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"omuxsocksocket", 0, eCmdHdlrGetWord, NULL, &cs.sockName, NULL)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID)); ENDmodInit diff --git a/plugins/omzmq3/Makefile.am b/plugins/omzmq3/Makefile.am new file mode 100644 index 00000000..92cd7586 --- /dev/null +++ b/plugins/omzmq3/Makefile.am @@ -0,0 +1,8 @@ +pkglib_LTLIBRARIES = omzmq3.la + +omzmq3_la_SOURCES = omzmq3.c +omzmq3_la_CPPFLAGS = $(RSRT_CFLAGS) $(PTHREADS_CFLAGS) $(CZMQ_CFLAGS) +omzmq3_la_LDFLAGS = -module -avoid-version +omzmq3_la_LIBADD = $(CZMQ_LIBS) + +EXTRA_DIST = diff --git a/plugins/omzmq3/README b/plugins/omzmq3/README new file mode 100644 index 00000000..ccc96c74 --- /dev/null +++ b/plugins/omzmq3/README @@ -0,0 +1,25 @@ +ZeroMQ 3.x Output Plugin + +Building this plugin: +Requires libzmq and libczmq. First, install libzmq from the HEAD on github: +http://github.com/zeromq/libzmq. You can clone the repository, build, then +install it. The directions for doing so are there in the readme. Then, do +the same for libczmq: http://github.com/zeromq/czmq. At some point, the 3.1 +version of libzmq will be released, and a supporting version of libczmq. +At that time, you could simply download and install the tarballs instead of +using git to clone the repositories. Those tarballs (when available) can +be found at http://download.zeromq.org. As of this writing (5/31/2012), the +most recent version of czmq (1.1.0) and libzmq (3.1.0-beta) will not compile +properly. + +Omzmq3 allows you to push data out of rsyslog from a zeromq socket. The example +below binds a PUB socket to port 7171, and any message fitting the criteria will +be output to the zmq socket. + +Example Rsyslog.conf snippet (NOTE: v6 format): +------------------------------------------------------------------------------- +if $msg then { + action(type="omzmq3", sockType="PUB", action="BIND", + description="tcp://*:7172) +} +------------------------------------------------------------------------------- diff --git a/plugins/omzmq3/omzmq3.c b/plugins/omzmq3/omzmq3.c new file mode 100644 index 00000000..e13011fb --- /dev/null +++ b/plugins/omzmq3/omzmq3.c @@ -0,0 +1,462 @@ +/* omzmq3.c + * Copyright 2012 Talksum, Inc + * Using the czmq interface to zeromq, we output + * to a zmq socket. + + +* +* This program is free software: you can redistribute it and/or +* modify it under the terms of the GNU Lesser General Public License +* as published by the Free Software Foundation, either version 3 of +* the License, or (at your option) any later version. +* +* This program is distributed in the hope that it will be useful, but +* WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +* Lesser General Public License for more details. +* +* You should have received a copy of the GNU Lesser General Public +* License along with this program. If not, see +* <http://www.gnu.org/licenses/>. +* +* Author: David Kelly +* <davidk@talksum.com> +*/ + + +#include "config.h" +#include "rsyslog.h" +#include <stdio.h> +#include <stdarg.h> +#include <stdlib.h> +#include <string.h> +#include <assert.h> +#include <signal.h> +#include <errno.h> +#include <unistd.h> +#include "conf.h" +#include "syslogd-types.h" +#include "srUtils.h" +#include "template.h" +#include "module-template.h" +#include "errmsg.h" +#include "cfsysline.h" + +#include <czmq.h> + +MODULE_TYPE_OUTPUT +MODULE_TYPE_NOKEEP +MODULE_CNFNAME("omzmq3") + +DEF_OMOD_STATIC_DATA +DEFobjCurrIf(errmsg) + +/* convienent symbols to denote a socket we want to bind + vs one we want to just connect to +*/ +#define ACTION_CONNECT 1 +#define ACTION_BIND 2 + + +/* ---------------------------------------------------------------------------- + * structs to describe sockets + */ +struct socket_type { + char* name; + int type; +}; + +/* more overkill, but seems nice to be consistent. */ +struct socket_action { + char* name; + int action; +}; + +typedef struct _instanceData { + void* socket; + uchar* description; + int type; + int action; + int sndHWM; + int rcvHWM; + uchar* identity; + int sndBuf; + int rcvBuf; + int linger; + int backlog; + int sndTimeout; + int rcvTimeout; + int maxMsgSize; + int rate; + int recoveryIVL; + int multicastHops; + int reconnectIVL; + int reconnectIVLMax; + int ipv4Only; + int affinity; + uchar* tplName; +} instanceData; + + +/* ---------------------------------------------------------------------------- + * Static definitions/initializations + */ + +/* only 1 zctx for all the sockets, with an adjustable number of + worker threads which may be useful if we use affinity in particular + sockets +*/ +static zctx_t* s_context = NULL; +static int s_workerThreads = -1; + +static struct socket_type types[] = { + {"PUB", ZMQ_PUB }, + {"PUSH", ZMQ_PUSH }, + {"XPUB", ZMQ_XPUB } +}; + +static struct socket_action actions[] = { + {"BIND", ACTION_BIND}, + {"CONNECT", ACTION_CONNECT}, +}; + +static struct cnfparamdescr actpdescr[] = { + { "description", eCmdHdlrGetWord, 0 }, + { "sockType", eCmdHdlrGetWord, 0 }, + { "action", eCmdHdlrGetWord, 0 }, + { "sndHWM", eCmdHdlrInt, 0 }, + { "rcvHWM", eCmdHdlrInt, 0 }, + { "identity", eCmdHdlrGetWord, 0 }, + { "sndBuf", eCmdHdlrInt, 0 }, + { "rcvBuf", eCmdHdlrInt, 0 }, + { "linger", eCmdHdlrInt, 0 }, + { "backlog", eCmdHdlrInt, 0 }, + { "sndTimeout", eCmdHdlrInt, 0 }, + { "rcvTimeout", eCmdHdlrInt, 0 }, + { "maxMsgSize", eCmdHdlrInt, 0 }, + { "rate", eCmdHdlrInt, 0 }, + { "recoveryIVL", eCmdHdlrInt, 0 }, + { "multicastHops", eCmdHdlrInt, 0 }, + { "reconnectIVL", eCmdHdlrInt, 0 }, + { "reconnectIVLMax", eCmdHdlrInt, 0 }, + { "ipv4Only", eCmdHdlrInt, 0 }, + { "affinity", eCmdHdlrInt, 0 }, + { "globalWorkerThreads", eCmdHdlrInt, 0 }, + { "template", eCmdHdlrGetWord, 1 } +}; + +static struct cnfparamblk actpblk = { + CNFPARAMBLK_VERSION, + sizeof(actpdescr)/sizeof(struct cnfparamdescr), + actpdescr +}; + +/* ---------------------------------------------------------------------------- + * Helper Functions + */ + +/* get the name of a socket type, return the ZMQ_XXX type + or -1 if not a supported type (see above) +*/ +int getSocketType(char* name) { + int type = -1; + uint i; + for(i=0; i<sizeof(types)/sizeof(struct socket_type); ++i) { + if( !strcmp(types[i].name, name) ) { + type = types[i].type; + break; + } + } + return type; +} + + +static int getSocketAction(char* name) { + int action = -1; + uint i; + for(i=0; i < sizeof(actions)/sizeof(struct socket_action); ++i) { + if(!strcmp(actions[i].name, name)) { + action = actions[i].action; + break; + } + } + return action; +} + +/* closeZMQ will destroy the context and + * associated socket + */ +static void closeZMQ(instanceData* pData) { + errmsg.LogError(0, NO_ERRCODE, "closeZMQ called"); + if(s_context && pData->socket) { + if(pData->socket != NULL) { + zsocket_destroy(s_context, pData->socket); + } + } +} + + +static rsRetVal initZMQ(instanceData* pData) { + DEFiRet; + + /* create the context if necessary. */ + if (NULL == s_context) { + s_context = zctx_new(); + if (s_workerThreads > 0) zctx_set_iothreads(s_context, s_workerThreads); + } + + pData->socket = zsocket_new(s_context, pData->type); + + /* ALWAYS set the HWM as the zmq3 default is 1000 and we default + to 0 (infinity) */ + zsocket_set_rcvhwm(pData->socket, pData->rcvHWM); + zsocket_set_sndhwm(pData->socket, pData->sndHWM); + + /* use czmq defaults for these, unless set to non-default values */ + if(pData->identity) zsocket_set_identity(pData->socket, (char*)pData->identity); + if(pData->sndBuf > -1) zsocket_set_sndbuf(pData->socket, pData->sndBuf); + if(pData->rcvBuf > -1) zsocket_set_sndbuf(pData->socket, pData->rcvBuf); + if(pData->linger > -1) zsocket_set_linger(pData->socket, pData->linger); + if(pData->backlog > -1) zsocket_set_backlog(pData->socket, pData->backlog); + if(pData->sndTimeout > -1) zsocket_set_sndtimeo(pData->socket, pData->sndTimeout); + if(pData->rcvTimeout > -1) zsocket_set_rcvtimeo(pData->socket, pData->rcvTimeout); + if(pData->maxMsgSize > -1) zsocket_set_maxmsgsize(pData->socket, pData->maxMsgSize); + if(pData->rate > -1) zsocket_set_rate(pData->socket, pData->rate); + if(pData->recoveryIVL > -1) zsocket_set_recovery_ivl(pData->socket, pData->recoveryIVL); + if(pData->multicastHops > -1) zsocket_set_multicast_hops(pData->socket, pData->multicastHops); + if(pData->reconnectIVL > -1) zsocket_set_reconnect_ivl(pData->socket, pData->reconnectIVL); + if(pData->reconnectIVLMax > -1) zsocket_set_reconnect_ivl_max(pData->socket, pData->reconnectIVLMax); + if(pData->ipv4Only > -1) zsocket_set_ipv4only(pData->socket, pData->ipv4Only); + if(pData->affinity != 1) zsocket_set_affinity(pData->socket, pData->affinity); + + /* bind or connect to it */ + if (pData->action == ACTION_BIND) { + /* bind asserts, so no need to test return val here + which isn't the greatest api -- oh well */ + zsocket_bind(pData->socket, (char*)pData->description); + } else { + if(zsocket_connect(pData->socket, (char*)pData->description) == -1) { + errmsg.LogError(0, RS_RET_SUSPENDED, "omzmq3: connect failed!"); + ABORT_FINALIZE(RS_RET_SUSPENDED); + } + } + finalize_it: + RETiRet; +} + +rsRetVal writeZMQ(uchar* msg, instanceData* pData) { + DEFiRet; + + /* initialize if necessary */ + if(NULL == pData->socket) + CHKiRet(initZMQ(pData)); + + /* send it */ + int result = zstr_send(pData->socket, (char*)msg); + + /* whine if things went wrong */ + if (result == -1) { + errmsg.LogError(0, NO_ERRCODE, "omzmq3: send of %s failed with return %d", msg, result); + ABORT_FINALIZE(RS_RET_ERR); + } + finalize_it: + RETiRet; +} + +static inline void +setInstParamDefaults(instanceData* pData) { + pData->description = (uchar*)"tcp://*:7171"; + pData->socket = NULL; + pData->tplName = NULL; + pData->type = ZMQ_PUB; + pData->action = ACTION_BIND; + pData->sndHWM = 0; /*unlimited*/ + pData->rcvHWM = 0; /*unlimited*/ + pData->identity = NULL; + pData->sndBuf = -1; + pData->rcvBuf = -1; + pData->linger = -1; + pData->backlog = -1; + pData->sndTimeout = -1; + pData->rcvTimeout = -1; + pData->maxMsgSize = -1; + pData->rate = -1; + pData->recoveryIVL = -1; + pData->multicastHops = -1; + pData->reconnectIVL = -1; + pData->reconnectIVLMax = -1; + pData->ipv4Only = -1; + pData->affinity = 1; +} + + +/* ---------------------------------------------------------------------------- + * Output Module Functions + */ + +BEGINcreateInstance +CODESTARTcreateInstance +ENDcreateInstance + +BEGINisCompatibleWithFeature +CODESTARTisCompatibleWithFeature + if(eFeat == sFEATURERepeatedMsgReduction) + iRet = RS_RET_OK; +ENDisCompatibleWithFeature + + +BEGINdbgPrintInstInfo +CODESTARTdbgPrintInstInfo +ENDdbgPrintInstInfo + +BEGINfreeInstance +CODESTARTfreeInstance + closeZMQ(pData); + free(pData->description); + free(pData->tplName); +ENDfreeInstance + +BEGINtryResume +CODESTARTtryResume + if(NULL == pData->socket) + iRet = initZMQ(pData); +ENDtryResume + +BEGINdoAction +CODESTARTdoAction +iRet = writeZMQ(ppString[0], pData); +ENDdoAction + + +BEGINnewActInst + struct cnfparamvals *pvals; + int i; +CODESTARTnewActInst +if((pvals = nvlstGetParams(lst, &actpblk, NULL)) == NULL) { + ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); + } + +CHKiRet(createInstance(&pData)); +setInstParamDefaults(pData); + +CODE_STD_STRING_REQUESTparseSelectorAct(1) +for(i = 0 ; i < actpblk.nParams ; ++i) { + if(!pvals[i].bUsed) + continue; + if(!strcmp(actpblk.descr[i].name, "description")) { + pData->description = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "template")) { + pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "sockType")){ + pData->type = getSocketType(es_str2cstr(pvals[i].val.d.estr, NULL)); + } else if(!strcmp(actpblk.descr[i].name, "action")){ + pData->action = getSocketAction(es_str2cstr(pvals[i].val.d.estr, NULL)); + } else if(!strcmp(actpblk.descr[i].name, "sndHWM")) { + pData->sndHWM = (int) pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "rcvHWM")) { + pData->rcvHWM = (int) pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "identity")){ + pData->identity = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "sndBuf")) { + pData->sndBuf = (int) pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "rcvBuf")) { + pData->rcvBuf = (int) pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "linger")) { + pData->linger = (int) pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "backlog")) { + pData->backlog = (int) pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "sndTimeout")) { + pData->sndTimeout = (int) pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "rcvTimeout")) { + pData->rcvTimeout = (int) pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "maxMsgSize")) { + pData->maxMsgSize = (int) pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "rate")) { + pData->rate = (int) pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "recoveryIVL")) { + pData->recoveryIVL = (int) pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "multicastHops")) { + pData->multicastHops = (int) pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "reconnectIVL")) { + pData->reconnectIVL = (int) pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "reconnectIVLMax")) { + pData->reconnectIVLMax = (int) pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "ipv4Only")) { + pData->ipv4Only = (int) pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "affinity")) { + pData->affinity = (int) pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "globalWorkerThreads")) { + s_workerThreads = (int) pvals[i].val.d.n; + } else { + errmsg.LogError(0, NO_ERRCODE, "omzmq3: program error, non-handled " + "param '%s'\n", actpblk.descr[i].name); + } + } + +if(pData->tplName == NULL) { + CHKiRet(OMSRsetEntry(*ppOMSR, 0, NULL, OMSR_TPL_AS_MSG)); + } else { + CHKiRet(OMSRsetEntry(*ppOMSR, 0, (uchar*)pData->tplName, OMSR_NO_RQD_TPL_OPTS)); + } + +if(pData->type == -1) { + errmsg.LogError(0, RS_RET_CONFIG_ERROR, "omzmq3: unknown socket type."); + ABORT_FINALIZE(RS_RET_CONFIG_ERROR); + } +if(pData->action == -1) { + errmsg.LogError(0, RS_RET_CONFIG_ERROR, "omzmq3: unknown socket action"); + ABORT_FINALIZE(RS_RET_CONFIG_ERROR); + } + + +CODE_STD_FINALIZERnewActInst + cnfparamvalsDestruct(pvals, &actpblk); +ENDnewActInst + +BEGINparseSelectorAct +CODESTARTparseSelectorAct + +/* tell the engine we only want one template string */ +CODE_STD_STRING_REQUESTparseSelectorAct(1) + if(!strncmp((char*) p, ":omzmq3:", sizeof(":omzmq3:") - 1)) + errmsg.LogError(0, RS_RET_LEGA_ACT_NOT_SUPPORTED, + "omzmq3 supports only v6 config format, use: " + "action(type=\"omzmq3\" serverport=...)"); + ABORT_FINALIZE(RS_RET_CONFLINE_UNPROCESSED); +CODE_STD_FINALIZERparseSelectorAct +ENDparseSelectorAct + +BEGINinitConfVars /* (re)set config variables to defaults */ +CODESTARTinitConfVars +s_workerThreads = -1; +ENDinitConfVars + +BEGINmodExit +CODESTARTmodExit +if(NULL != s_context) { + zctx_destroy(&s_context); + s_context=NULL; + } +ENDmodExit + + +BEGINqueryEtryPt +CODESTARTqueryEtryPt +CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES +ENDqueryEtryPt + +BEGINmodInit() +CODESTARTmodInit + *ipIFVersProvided = CURR_MOD_IF_VERSION; /* only supports rsyslog 6 configs */ +CODEmodInit_QueryRegCFSLineHdlr + CHKiRet(objUse(errmsg, CORE_COMPONENT)); + INITChkCoreFeature(bCoreSupportsBatching, CORE_FEATURE_BATCHING); + DBGPRINTF("omzmq3: module compiled with rsyslog version %s.\n", VERSION); + +INITLegCnfVars +CHKiRet(omsdRegCFSLineHdlr((uchar *)"omzmq3workerthreads", 0, eCmdHdlrInt, NULL, &s_workerThreads, STD_LOADABLE_MODULE_ID)); +ENDmodInit + + + |