diff options
Diffstat (limited to 'plugins')
-rw-r--r-- | plugins/imklog/imklog.c | 143 | ||||
-rw-r--r-- | plugins/imklog/imklog.h | 9 | ||||
-rw-r--r-- | plugins/immark/immark.c | 70 | ||||
-rw-r--r-- | plugins/impstats/impstats.c | 135 | ||||
-rw-r--r-- | plugins/imrelp/imrelp.c | 39 | ||||
-rw-r--r-- | plugins/imtcp/imtcp.c | 9 | ||||
-rw-r--r-- | plugins/imudp/imudp.c | 157 | ||||
-rw-r--r-- | plugins/imuxsock/imuxsock.c | 359 | ||||
-rw-r--r-- | plugins/imzmq3/Makefile.am | 8 | ||||
-rw-r--r-- | plugins/imzmq3/README | 24 | ||||
-rw-r--r-- | plugins/imzmq3/imzmq3.c | 657 | ||||
-rw-r--r-- | plugins/omhdfs/omhdfs.c | 2 | ||||
-rw-r--r-- | plugins/omudpspoof/omudpspoof.c | 143 | ||||
-rw-r--r-- | plugins/omuxsock/omuxsock.c | 132 | ||||
-rw-r--r-- | plugins/omzmq3/Makefile.am | 8 | ||||
-rw-r--r-- | plugins/omzmq3/README | 25 | ||||
-rw-r--r-- | plugins/omzmq3/omzmq3.c | 462 |
17 files changed, 2129 insertions, 253 deletions
diff --git a/plugins/imklog/imklog.c b/plugins/imklog/imklog.c index f476c5ff..93323707 100644 --- a/plugins/imklog/imklog.c +++ b/plugins/imklog/imklog.c @@ -59,6 +59,7 @@ #include "net.h" #include "glbl.h" #include "prop.h" +#include "errmsg.h" #include "unicode-helper.h" MODULE_TYPE_INPUT @@ -71,22 +72,34 @@ DEFobjCurrIf(datetime) DEFobjCurrIf(glbl) DEFobjCurrIf(prop) DEFobjCurrIf(net) +DEFobjCurrIf(errmsg) /* config settings */ typedef struct configSettings_s { - int dbgPrintSymbols; /* this one is extern so the helpers can access it! */ - int symbols_twice; - int use_syscall; - int symbol_lookup; /* on recent kernels > 2.6, the kernel does this */ int bPermitNonKernel; /* permit logging of messages not having LOG_KERN facility */ int iFacilIntMsg; /* the facility to use for internal messages (set by driver) */ uchar *pszPath; - int console_log_level; + int console_log_level; /* still used for BSD */ } configSettings_t; static configSettings_t cs; static modConfData_t *loadModConf = NULL;/* modConf ptr to use for the current load process */ static modConfData_t *runModConf = NULL;/* modConf ptr to use for the current load process */ +static int bLegacyCnfModGlobalsPermitted;/* are legacy module-global config parameters permitted? */ + +/* module-global parameters */ +static struct cnfparamdescr modpdescr[] = { + { "logpath", eCmdHdlrGetWord, 0 }, + { "permitnonkernelfacility", eCmdHdlrBinary, 0 }, + { "consoleloglevel", eCmdHdlrInt, 0 }, + { "internalmsgfacility", eCmdHdlrFacility, 0 } +}; +static struct cnfparamblk modpblk = + { CNFPARAMBLK_VERSION, + sizeof(modpdescr)/sizeof(struct cnfparamdescr), + modpdescr + }; + static prop_t *pInputName = NULL; /* there is only one global inputName for all messages generated by this module */ @@ -96,10 +109,6 @@ static prop_t *pLocalHostIP = NULL; /* a pseudo-constant propterty for 127.0.0.1 static inline void initConfigSettings(void) { - cs.dbgPrintSymbols = 0; - cs.symbols_twice = 0; - cs.use_syscall = 0; - cs.symbol_lookup = 0; cs.bPermitNonKernel = 0; cs.console_log_level = -1; cs.pszPath = NULL; @@ -276,28 +285,77 @@ BEGINbeginCnfLoad CODESTARTbeginCnfLoad loadModConf = pModConf; pModConf->pConf = pConf; + /* init our settings */ + pModConf->pszPath = NULL; + pModConf->bPermitNonKernel = 0; + pModConf->console_log_level = -1; + pModConf->iFacilIntMsg = klogFacilIntMsg(); + loadModConf->configSetViaV2Method = 0; + bLegacyCnfModGlobalsPermitted = 1; /* init legacy config vars */ initConfigSettings(); ENDbeginCnfLoad +BEGINsetModCnf + struct cnfparamvals *pvals = NULL; + int i; +CODESTARTsetModCnf + pvals = nvlstGetParams(lst, &modpblk, NULL); + if(pvals == NULL) { + errmsg.LogError(0, RS_RET_MISSING_CNFPARAMS, "error processing module " + "config parameters [module(...)]"); + ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); + } + + if(Debug) { + dbgprintf("module (global) param blk for imklog:\n"); + cnfparamsPrint(&modpblk, pvals); + } + + for(i = 0 ; i < modpblk.nParams ; ++i) { + if(!pvals[i].bUsed) + continue; + if(!strcmp(modpblk.descr[i].name, "logpath")) { + loadModConf->pszPath = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(modpblk.descr[i].name, "permitnonkernelfacility")) { + loadModConf->bPermitNonKernel = (int) pvals[i].val.d.n; + } else if(!strcmp(modpblk.descr[i].name, "consoleloglevel")) { + loadModConf->console_log_level= (int) pvals[i].val.d.n; + } else if(!strcmp(modpblk.descr[i].name, "internalmsgfacility")) { + loadModConf->iFacilIntMsg = (int) pvals[i].val.d.n; + } else { + dbgprintf("imklog: program error, non-handled " + "param '%s' in beginCnfLoad\n", modpblk.descr[i].name); + } + } + + /* disable legacy module-global config directives */ + bLegacyCnfModGlobalsPermitted = 0; + loadModConf->configSetViaV2Method = 1; + +finalize_it: + if(pvals != NULL) + cnfparamvalsDestruct(pvals, &modpblk); +ENDsetModCnf + + BEGINendCnfLoad CODESTARTendCnfLoad - /* persist module-specific settings from legacy config system */ - loadModConf->dbgPrintSymbols = cs.dbgPrintSymbols; - loadModConf->symbols_twice = cs.symbols_twice; - loadModConf->use_syscall = cs.use_syscall; - loadModConf->bPermitNonKernel = cs.bPermitNonKernel; - loadModConf->iFacilIntMsg = cs.iFacilIntMsg; - loadModConf->console_log_level = cs.console_log_level; - if((cs.pszPath == NULL) || (cs.pszPath[0] == '\0')) { - loadModConf->pszPath = NULL; - if(cs.pszPath != NULL) - free(cs.pszPath); - } else { - loadModConf->pszPath = cs.pszPath; + if(!loadModConf->configSetViaV2Method) { + /* persist module-specific settings from legacy config system */ + loadModConf->bPermitNonKernel = cs.bPermitNonKernel; + loadModConf->iFacilIntMsg = cs.iFacilIntMsg; + loadModConf->console_log_level = cs.console_log_level; + if((cs.pszPath == NULL) || (cs.pszPath[0] == '\0')) { + loadModConf->pszPath = NULL; + if(cs.pszPath != NULL) + free(cs.pszPath); + } else { + loadModConf->pszPath = cs.pszPath; + } + cs.pszPath = NULL; } - cs.pszPath = NULL; loadModConf = NULL; /* done loading */ ENDendCnfLoad @@ -348,6 +406,7 @@ CODESTARTmodExit objRelease(net, CORE_COMPONENT); objRelease(datetime, CORE_COMPONENT); objRelease(prop, CORE_COMPONENT); + objRelease(errmsg, CORE_COMPONENT); ENDmodExit @@ -355,15 +414,12 @@ BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_IMOD_QUERIES CODEqueryEtryPt_STD_CONF2_QUERIES +CODEqueryEtryPt_STD_CONF2_setModCnf_QUERIES CODEqueryEtryPt_STD_CONF2_PREPRIVDROP_QUERIES ENDqueryEtryPt static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal) { - cs.dbgPrintSymbols = 0; - cs.symbols_twice = 0; - cs.use_syscall = 0; - cs.symbol_lookup = 0; cs.bPermitNonKernel = 0; if(cs.pszPath != NULL) { free(cs.pszPath); @@ -381,6 +437,7 @@ CODEmodInit_QueryRegCFSLineHdlr CHKiRet(objUse(glbl, CORE_COMPONENT)); CHKiRet(objUse(prop, CORE_COMPONENT)); CHKiRet(objUse(net, CORE_COMPONENT)); + CHKiRet(objUse(errmsg, CORE_COMPONENT)); /* we need to create the inputName property (only once during our lifetime) */ CHKiRet(prop.CreateStringProp(&pInputName, UCHAR_CONSTANT("imklog"), sizeof("imklog") - 1)); @@ -389,22 +446,22 @@ CODEmodInit_QueryRegCFSLineHdlr /* init legacy config settings */ initConfigSettings(); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"debugprintkernelsymbols", 0, eCmdHdlrBinary, - NULL, &cs.dbgPrintSymbols, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"klogpath", 0, eCmdHdlrGetWord, - NULL, &cs.pszPath, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"klogsymbollookup", 0, eCmdHdlrBinary, - NULL, &cs.symbol_lookup, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"klogsymbolstwice", 0, eCmdHdlrBinary, - NULL, &cs.symbols_twice, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"klogusesyscallinterface", 0, eCmdHdlrBinary, - NULL, &cs.use_syscall, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"klogpermitnonkernelfacility", 0, eCmdHdlrBinary, - NULL, &cs.bPermitNonKernel, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"klogconsoleloglevel", 0, eCmdHdlrInt, - NULL, &cs.console_log_level, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"kloginternalmsgfacility", 0, eCmdHdlrFacility, - NULL, &cs.iFacilIntMsg, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"debugprintkernelsymbols", 0, eCmdHdlrGoneAway, + NULL, NULL, STD_LOADABLE_MODULE_ID)); + CHKiRet(regCfSysLineHdlr2((uchar *)"klogpath", 0, eCmdHdlrGetWord, + NULL, &cs.pszPath, STD_LOADABLE_MODULE_ID, &bLegacyCnfModGlobalsPermitted)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"klogsymbollookup", 0, eCmdHdlrGoneAway, + NULL, NULL, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"klogsymbolstwice", 0, eCmdHdlrGoneAway, + NULL, NULL, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"klogusesyscallinterface", 0, eCmdHdlrGoneAway, + NULL, NULL, STD_LOADABLE_MODULE_ID)); + CHKiRet(regCfSysLineHdlr2((uchar *)"klogpermitnonkernelfacility", 0, eCmdHdlrBinary, + NULL, &cs.bPermitNonKernel, STD_LOADABLE_MODULE_ID, &bLegacyCnfModGlobalsPermitted)); + CHKiRet(regCfSysLineHdlr2((uchar *)"klogconsoleloglevel", 0, eCmdHdlrInt, + NULL, &cs.console_log_level, STD_LOADABLE_MODULE_ID, &bLegacyCnfModGlobalsPermitted)); + CHKiRet(regCfSysLineHdlr2((uchar *)"kloginternalmsgfacility", 0, eCmdHdlrFacility, + NULL, &cs.iFacilIntMsg, STD_LOADABLE_MODULE_ID, &bLegacyCnfModGlobalsPermitted)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID)); ENDmodInit diff --git a/plugins/imklog/imklog.h b/plugins/imklog/imklog.h index 795dd68c..acfb50ab 100644 --- a/plugins/imklog/imklog.h +++ b/plugins/imklog/imklog.h @@ -31,15 +31,12 @@ /* we need to have the modConf type present in all submodules */ struct modConfData_s { - int dbgPrintSymbols; - int symbols_twice; - int use_syscall; - int symbol_lookup; - int bPermitNonKernel; + rsconf_t *pConf; int iFacilIntMsg; uchar *pszPath; int console_log_level; - rsconf_t *pConf; + sbool bPermitNonKernel; + sbool configSetViaV2Method; }; /* interface to "drivers" diff --git a/plugins/immark/immark.c b/plugins/immark/immark.c index 273af021..0e946c0b 100644 --- a/plugins/immark/immark.c +++ b/plugins/immark/immark.c @@ -58,9 +58,26 @@ DEFobjCurrIf(errmsg) static int iMarkMessagePeriod = DEFAULT_MARK_PERIOD; struct modConfData_s { + rsconf_t *pConf; /* our overall config object */ int iMarkMessagePeriod; + sbool configSetViaV2Method; }; +/* module-global parameters */ +static struct cnfparamdescr modpdescr[] = { + { "interval", eCmdHdlrInt, 0 } +}; +static struct cnfparamblk modpblk = + { CNFPARAMBLK_VERSION, + sizeof(modpdescr)/sizeof(struct cnfparamdescr), + modpdescr + }; + + +static modConfData_t *loadModConf = NULL;/* modConf ptr to use for the current load process */ +static int bLegacyCnfModGlobalsPermitted;/* are legacy module-global config parameters permitted? */ + + BEGINisCompatibleWithFeature CODESTARTisCompatibleWithFeature if(eFeat == sFEATURENonCancelInputTermination) @@ -75,12 +92,57 @@ ENDafterRun BEGINbeginCnfLoad CODESTARTbeginCnfLoad + loadModConf = pModConf; + pModConf->pConf = pConf; + /* init our settings */ + pModConf->iMarkMessagePeriod = DEFAULT_MARK_PERIOD; + loadModConf->configSetViaV2Method = 0; + bLegacyCnfModGlobalsPermitted = 1; ENDbeginCnfLoad +BEGINsetModCnf + struct cnfparamvals *pvals = NULL; + int i; +CODESTARTsetModCnf + pvals = nvlstGetParams(lst, &modpblk, NULL); + if(pvals == NULL) { + errmsg.LogError(0, RS_RET_MISSING_CNFPARAMS, "error processing module " + "config parameters [module(...)]"); + ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); + } + + if(Debug) { + dbgprintf("module (global) param blk for imuxsock:\n"); + cnfparamsPrint(&modpblk, pvals); + } + + for(i = 0 ; i < modpblk.nParams ; ++i) { + if(!pvals[i].bUsed) + continue; + if(!strcmp(modpblk.descr[i].name, "interval")) { + loadModConf->iMarkMessagePeriod = (int) pvals[i].val.d.n; + } else { + dbgprintf("imuxsock: program error, non-handled " + "param '%s' in beginCnfLoad\n", modpblk.descr[i].name); + } + } + + /* disable legacy module-global config directives */ + bLegacyCnfModGlobalsPermitted = 0; + loadModConf->configSetViaV2Method = 1; + +finalize_it: + if(pvals != NULL) + cnfparamvalsDestruct(pvals, &modpblk); +ENDsetModCnf + + BEGINendCnfLoad CODESTARTendCnfLoad - pModConf->iMarkMessagePeriod = iMarkMessagePeriod; + if(!loadModConf->configSetViaV2Method) { + pModConf->iMarkMessagePeriod = iMarkMessagePeriod; + } ENDendCnfLoad @@ -97,6 +159,7 @@ ENDcheckCnf BEGINactivateCnf CODESTARTactivateCnf MarkInterval = pModConf->iMarkMessagePeriod; + DBGPRINTF("immark set MarkInterval to %d\n", MarkInterval); ENDactivateCnf @@ -150,6 +213,7 @@ BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_IMOD_QUERIES CODEqueryEtryPt_STD_CONF2_QUERIES +CODEqueryEtryPt_STD_CONF2_setModCnf_QUERIES CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES ENDqueryEtryPt @@ -167,8 +231,8 @@ CODEmodInit_QueryRegCFSLineHdlr CHKiRet(objUse(errmsg, CORE_COMPONENT)); /* legacy config handlers */ - CHKiRet(omsdRegCFSLineHdlr((uchar *)"markmessageperiod", 0, eCmdHdlrInt, NULL, - &iMarkMessagePeriod, STD_LOADABLE_MODULE_ID)); + CHKiRet(regCfSysLineHdlr2((uchar *)"markmessageperiod", 0, eCmdHdlrInt, NULL, + &iMarkMessagePeriod, STD_LOADABLE_MODULE_ID, &bLegacyCnfModGlobalsPermitted)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID)); ENDmodInit diff --git a/plugins/impstats/impstats.c b/plugins/impstats/impstats.c index 4fec8e70..62599969 100644 --- a/plugins/impstats/impstats.c +++ b/plugins/impstats/impstats.c @@ -59,6 +59,7 @@ typedef struct configSettings_s { int iFacility; int iSeverity; int bJSON; + int bCEE; } configSettings_t; struct modConfData_s { @@ -67,15 +68,39 @@ struct modConfData_s { int iFacility; int iSeverity; statsFmtType_t statsFmt; + sbool configSetViaV2Method; }; static modConfData_t *loadModConf = NULL;/* modConf ptr to use for the current load process */ static modConfData_t *runModConf = NULL;/* modConf ptr to use for the current load process */ - static configSettings_t cs; - +static int bLegacyCnfModGlobalsPermitted;/* are legacy module-global config parameters permitted? */ static prop_t *pInputName = NULL; +/* module-global parameters */ +static struct cnfparamdescr modpdescr[] = { + { "interval", eCmdHdlrInt, 0 }, + { "facility", eCmdHdlrInt, 0 }, + { "severity", eCmdHdlrInt, 0 }, + { "format", eCmdHdlrGetWord, 0 } +}; +static struct cnfparamblk modpblk = + { CNFPARAMBLK_VERSION, + sizeof(modpdescr)/sizeof(struct cnfparamdescr), + modpdescr + }; + +BEGINmodExit +CODESTARTmodExit + prop.Destruct(&pInputName); + /* release objects we used */ + objRelease(glbl, CORE_COMPONENT); + objRelease(prop, CORE_COMPONENT); + objRelease(errmsg, CORE_COMPONENT); + objRelease(statsobj, CORE_COMPONENT); +ENDmodExit + + BEGINisCompatibleWithFeature CODESTARTisCompatibleWithFeature if(eFeat == sFEATURENonCancelInputTermination) @@ -89,6 +114,7 @@ initConfigSettings(void) cs.iFacility = DEFAULT_FACILITY; cs.iSeverity = DEFAULT_SEVERITY; cs.bJSON = 0; + cs.bCEE = 0; } @@ -146,18 +172,87 @@ BEGINbeginCnfLoad CODESTARTbeginCnfLoad loadModConf = pModConf; pModConf->pConf = pConf; + /* init our settings */ + loadModConf->configSetViaV2Method = 0; + loadModConf->iStatsInterval = DEFAULT_STATS_PERIOD; + loadModConf->iFacility = DEFAULT_FACILITY; + loadModConf->iSeverity = DEFAULT_SEVERITY; + loadModConf->statsFmt = statsFmt_Legacy; + bLegacyCnfModGlobalsPermitted = 1; /* init legacy config vars */ initConfigSettings(); ENDbeginCnfLoad +BEGINsetModCnf + struct cnfparamvals *pvals = NULL; + char *mode; + int i; +CODESTARTsetModCnf + pvals = nvlstGetParams(lst, &modpblk, NULL); + if(pvals == NULL) { + errmsg.LogError(0, RS_RET_MISSING_CNFPARAMS, "error processing module " + "config parameters [module(...)]"); + ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); + } + + if(Debug) { + dbgprintf("module (global) param blk for impstats:\n"); + cnfparamsPrint(&modpblk, pvals); + } + + for(i = 0 ; i < modpblk.nParams ; ++i) { + if(!pvals[i].bUsed) + continue; + if(!strcmp(modpblk.descr[i].name, "interval")) { + loadModConf->iStatsInterval = (int) pvals[i].val.d.n; + } else if(!strcmp(modpblk.descr[i].name, "facility")) { + loadModConf->iFacility = (int) pvals[i].val.d.n; + } else if(!strcmp(modpblk.descr[i].name, "severity")) { + loadModConf->iSeverity = (int) pvals[i].val.d.n; + } else if(!strcmp(modpblk.descr[i].name, "format")) { + mode = es_str2cstr(pvals[i].val.d.estr, NULL); + if(!strcasecmp(mode, "json")) { + loadModConf->statsFmt = statsFmt_JSON; + } else if(!strcasecmp(mode, "cee")) { + loadModConf->statsFmt = statsFmt_CEE; + } else if(!strcasecmp(mode, "legacy")) { + loadModConf->statsFmt = statsFmt_Legacy; + } else { + errmsg.LogError(0, RS_RET_ERR, "impstats: invalid format %s", + mode); + } + free(mode); + } else { + dbgprintf("impstats: program error, non-handled " + "param '%s' in beginCnfLoad\n", modpblk.descr[i].name); + } + } + + loadModConf->configSetViaV2Method = 1; + bLegacyCnfModGlobalsPermitted = 0; + +finalize_it: + if(pvals != NULL) + cnfparamvalsDestruct(pvals, &modpblk); +ENDsetModCnf + + BEGINendCnfLoad CODESTARTendCnfLoad - /* persist module-specific settings from legacy config system */ - loadModConf->iStatsInterval = cs.iStatsInterval; - loadModConf->iFacility = cs.iFacility; - loadModConf->iSeverity = cs.iSeverity; - loadModConf->statsFmt = cs.bJSON ? statsFmt_JSON : statsFmt_Legacy; + if(!loadModConf->configSetViaV2Method) { + /* persist module-specific settings from legacy config system */ + loadModConf->iStatsInterval = cs.iStatsInterval; + loadModConf->iFacility = cs.iFacility; + loadModConf->iSeverity = cs.iSeverity; + if (cs.bCEE == 1) { + loadModConf->statsFmt = statsFmt_CEE; + } else if (cs.bJSON == 1) { + loadModConf->statsFmt = statsFmt_JSON; + } else { + loadModConf->statsFmt = statsFmt_Legacy; + } + } ENDendCnfLoad @@ -165,7 +260,7 @@ BEGINcheckCnf CODESTARTcheckCnf if(pModConf->iStatsInterval == 0) { errmsg.LogError(0, NO_ERRCODE, "impstats: stats interval zero not permitted, using " - "defaul of %d seconds", DEFAULT_STATS_PERIOD); + "default of %d seconds", DEFAULT_STATS_PERIOD); pModConf->iStatsInterval = DEFAULT_STATS_PERIOD; } ENDcheckCnf @@ -217,22 +312,11 @@ CODESTARTafterRun ENDafterRun -BEGINmodExit -CODESTARTmodExit - prop.Destruct(&pInputName); - /* release objects we used */ - objRelease(glbl, CORE_COMPONENT); - objRelease(prop, CORE_COMPONENT); - objRelease(errmsg, CORE_COMPONENT); - objRelease(statsobj, CORE_COMPONENT); -ENDmodExit - - - BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_IMOD_QUERIES CODEqueryEtryPt_STD_CONF2_QUERIES +CODEqueryEtryPt_STD_CONF2_setModCnf_QUERIES CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES ENDqueryEtryPt @@ -254,11 +338,12 @@ CODEmodInit_QueryRegCFSLineHdlr CHKiRet(objUse(errmsg, CORE_COMPONENT)); CHKiRet(objUse(statsobj, CORE_COMPONENT)); /* the pstatsinverval is an alias to support a previous screwed-up syntax... */ - CHKiRet(omsdRegCFSLineHdlr((uchar *)"pstatsinterval", 0, eCmdHdlrInt, NULL, &cs.iStatsInterval, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"pstatinterval", 0, eCmdHdlrInt, NULL, &cs.iStatsInterval, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"pstatfacility", 0, eCmdHdlrInt, NULL, &cs.iFacility, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"pstatseverity", 0, eCmdHdlrInt, NULL, &cs.iSeverity, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"pstatjson", 0, eCmdHdlrBinary, NULL, &cs.bJSON, STD_LOADABLE_MODULE_ID)); + CHKiRet(regCfSysLineHdlr2((uchar *)"pstatsinterval", 0, eCmdHdlrInt, NULL, &cs.iStatsInterval, STD_LOADABLE_MODULE_ID, &bLegacyCnfModGlobalsPermitted)); + CHKiRet(regCfSysLineHdlr2((uchar *)"pstatinterval", 0, eCmdHdlrInt, NULL, &cs.iStatsInterval, STD_LOADABLE_MODULE_ID, &bLegacyCnfModGlobalsPermitted)); + CHKiRet(regCfSysLineHdlr2((uchar *)"pstatfacility", 0, eCmdHdlrInt, NULL, &cs.iFacility, STD_LOADABLE_MODULE_ID, &bLegacyCnfModGlobalsPermitted)); + CHKiRet(regCfSysLineHdlr2((uchar *)"pstatseverity", 0, eCmdHdlrInt, NULL, &cs.iSeverity, STD_LOADABLE_MODULE_ID, &bLegacyCnfModGlobalsPermitted)); + CHKiRet(regCfSysLineHdlr2((uchar *)"pstatjson", 0, eCmdHdlrBinary, NULL, &cs.bJSON, STD_LOADABLE_MODULE_ID, &bLegacyCnfModGlobalsPermitted)); + CHKiRet(regCfSysLineHdlr2((uchar *)"pstatcee", 0, eCmdHdlrBinary, NULL, &cs.bCEE, STD_LOADABLE_MODULE_ID, &bLegacyCnfModGlobalsPermitted)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID)); CHKiRet(prop.Construct(&pInputName)); diff --git a/plugins/imrelp/imrelp.c b/plugins/imrelp/imrelp.c index 99fabd18..f6040b21 100644 --- a/plugins/imrelp/imrelp.c +++ b/plugins/imrelp/imrelp.c @@ -22,7 +22,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - #include "config.h" #include <stdlib.h> #include <assert.h> @@ -35,6 +34,7 @@ #include <netdb.h> #include <sys/types.h> #include <sys/socket.h> +#include <signal.h> #include <librelp.h> #include "rsyslog.h" #include "dirty.h" @@ -236,13 +236,39 @@ BEGINfreeCnf CODESTARTfreeCnf ENDfreeCnf +/* This is used to terminate the plugin. Note that the signal handler blocks + * other activity on the thread. As such, it is safe to request the stop. When + * we terminate, relpEngine is called, and it's select() loop interrupted. But + * only *after this function is done*. So we do not have a race! + */ +static void +doSIGTTIN(int __attribute__((unused)) sig) +{ + DBGPRINTF("imrelp: termination requested via SIGTTIN - telling RELP engine\n"); + relpEngineSetStop(pRelpEngine); +} + + /* This function is called to gather input. */ BEGINrunInput + sigset_t sigSet; + struct sigaction sigAct; CODESTARTrunInput - /* TODO: we must be careful to start the listener here. Currently, tcpsrv.c seems to - * do that in ConstructFinalize + /* we want to support non-cancel input termination. To do so, we must signal librelp + * when to stop. As we run on the same thread, we need to register as SIGTTIN handler, + * which will be used to put the terminating condition into librelp. */ + sigfillset(&sigSet); + pthread_sigmask(SIG_BLOCK, &sigSet, NULL); + sigemptyset(&sigSet); + sigaddset(&sigSet, SIGTTIN); + pthread_sigmask(SIG_UNBLOCK, &sigSet, NULL); + memset(&sigAct, 0, sizeof (sigAct)); + sigemptyset(&sigAct.sa_mask); + sigAct.sa_handler = doSIGTTIN; + sigaction(SIGTTIN, &sigAct, NULL); + iRet = relpEngineRun(pRelpEngine); ENDrunInput @@ -284,12 +310,19 @@ resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unus } +BEGINisCompatibleWithFeature +CODESTARTisCompatibleWithFeature + if(eFeat == sFEATURENonCancelInputTermination) + iRet = RS_RET_OK; +ENDisCompatibleWithFeature + BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_IMOD_QUERIES CODEqueryEtryPt_STD_CONF2_QUERIES CODEqueryEtryPt_STD_CONF2_PREPRIVDROP_QUERIES +CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES ENDqueryEtryPt diff --git a/plugins/imtcp/imtcp.c b/plugins/imtcp/imtcp.c index 33404fee..a3365d44 100644 --- a/plugins/imtcp/imtcp.c +++ b/plugins/imtcp/imtcp.c @@ -366,7 +366,16 @@ ENDactivateCnf BEGINfreeCnf + instanceConf_t *inst, *del; CODESTARTfreeCnf + for(inst = pModConf->root ; inst != NULL ; ) { + free(inst->pszBindPort); + free(inst->pBindRuleset); + free(inst->pszInputName); + del = inst; + inst = inst->next; + free(del); + } ENDfreeCnf /* This function is called to gather input. diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c index 8ab3b9be..9c92ddde 100644 --- a/plugins/imudp/imudp.c +++ b/plugins/imudp/imudp.c @@ -6,7 +6,7 @@ * * File begun on 2007-12-21 by RGerhards (extracted from syslogd.c) * - * Copyright 2007-2011 Rainer Gerhards and Adiscon GmbH. + * Copyright 2007-2012 Rainer Gerhards and Adiscon GmbH. * * This file is part of rsyslog. * @@ -80,6 +80,7 @@ static struct lstn_s { STATSCOUNTER_DEF(ctrSubmit, mutCtrSubmit) } *lcnfRoot = NULL, *lcnfLast = NULL; +static int bLegacyCnfModGlobalsPermitted;/* are legacy module-global config parameters permitted? */ static int bDoACLCheck; /* are ACL checks neeed? Cached once immediately before listener startup */ static int iMaxLine; /* maximum UDP message size supported */ static time_t ttLastDiscard = 0; /* timestamp when a message from a non-permitted sender was last discarded @@ -118,10 +119,23 @@ struct modConfData_s { int iSchedPolicy; /* scheduling policy as SCHED_xxx */ int iSchedPrio; /* scheduling priority */ int iTimeRequery; /* how often is time to be queried inside tight recv loop? 0=always */ + sbool configSetViaV2Method; }; static modConfData_t *loadModConf = NULL;/* modConf ptr to use for the current load process */ static modConfData_t *runModConf = NULL;/* modConf ptr to use for the current load process */ +/* module-global parameters */ +static struct cnfparamdescr modpdescr[] = { + { "schedulingpolicy", eCmdHdlrGetWord, 0 }, + { "schedulingpriority", eCmdHdlrInt, 0 }, + { "timerequery", eCmdHdlrInt, 0 } +}; +static struct cnfparamblk modpblk = + { CNFPARAMBLK_VERSION, + sizeof(modpdescr)/sizeof(struct cnfparamdescr), + modpdescr + }; + #include "im-helper.h" /* must be included AFTER the type definitions! */ @@ -185,52 +199,6 @@ addListner(instanceConf_t *inst) /* check which address to bind to. We could do this more compact, but have not * done so in order to make the code more readable. -- rgerhards, 2007-12-27 */ -#if 0 //<<<<<<< HEAD - - DBGPRINTF("imudp: trying to open port at %s:%s.\n", - (inst->pszBindAddr == NULL) ? (uchar*)"*" : inst->pszBindAddr, inst->pszBindPort); - - newSocks = net.create_udp_socket(inst->pszBindAddr, inst->pszBindPort, 1); - if(newSocks != NULL) { - /* we now need to add the new sockets to the existing set */ - if(udpLstnSocks == NULL) { - /* esay, we can just replace it */ - udpLstnSocks = newSocks; - CHKmalloc(udpRulesets = (ruleset_t**) MALLOC(sizeof(ruleset_t*) * (newSocks[0] + 1))); - for(iDst = 1 ; iDst <= newSocks[0] ; ++iDst) - udpRulesets[iDst] = inst->pBindRuleset; - } else { - /* we need to add them */ - tmpSocks = (int*) MALLOC(sizeof(int) * (1 + newSocks[0] + udpLstnSocks[0])); - tmpRulesets = (ruleset_t**) MALLOC(sizeof(ruleset_t*) * (1 + newSocks[0] + udpLstnSocks[0])); - if(tmpSocks == NULL || tmpRulesets == NULL) { - DBGPRINTF("out of memory trying to allocate udp listen socket array\n"); - /* in this case, we discard the new sockets but continue with what we - * already have - */ - free(newSocks); - free(tmpSocks); - free(tmpRulesets); - ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); - } else { - /* ready to copy */ - iDst = 1; - for(iSrc = 1 ; iSrc <= udpLstnSocks[0] ; ++iSrc, ++iDst) { - tmpSocks[iDst] = udpLstnSocks[iSrc]; - tmpRulesets[iDst] = udpRulesets[iSrc]; - } - for(iSrc = 1 ; iSrc <= newSocks[0] ; ++iSrc, ++iDst) { - tmpSocks[iDst] = newSocks[iSrc]; - tmpRulesets[iDst] = inst->pBindRuleset; - } - tmpSocks[0] = udpLstnSocks[0] + newSocks[0]; - free(newSocks); - free(udpLstnSocks); - udpLstnSocks = tmpSocks; - free(udpRulesets); - udpRulesets = tmpRulesets; - } -#else //======= if(inst->pszBindAddr == NULL) bindAddr = NULL; else if(inst->pszBindAddr[0] == '*' && inst->pszBindAddr[1] == '\0') @@ -270,7 +238,6 @@ addListner(instanceConf_t *inst) else { lcnfLast->next = newlcnfinfo; lcnfLast = newlcnfinfo; -#endif //>>>>>>> ef34821a2737799f48c3032b9616418e4f7fa34f } } } @@ -672,6 +639,12 @@ BEGINbeginCnfLoad CODESTARTbeginCnfLoad loadModConf = pModConf; pModConf->pConf = pConf; + /* init our settings */ + loadModConf->configSetViaV2Method = 0; + loadModConf->iTimeRequery = TIME_REQUERY_DFLT; + loadModConf->iSchedPrio = SCHED_PRIO_UNSET; + loadModConf->pszSchedPolicy = NULL; + bLegacyCnfModGlobalsPermitted = 1; /* init legacy config vars */ cs.pszBindRuleset = NULL; cs.pszSchedPolicy = NULL; @@ -681,21 +654,57 @@ CODESTARTbeginCnfLoad ENDbeginCnfLoad +BEGINsetModCnf + struct cnfparamvals *pvals = NULL; + int i; +CODESTARTsetModCnf + pvals = nvlstGetParams(lst, &modpblk, NULL); + if(pvals == NULL) { + errmsg.LogError(0, RS_RET_MISSING_CNFPARAMS, "error processing module " + "config parameters [module(...)]"); + ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); + } + + if(Debug) { + dbgprintf("module (global) param blk for impstats:\n"); + cnfparamsPrint(&modpblk, pvals); + } + + for(i = 0 ; i < modpblk.nParams ; ++i) { + if(!pvals[i].bUsed) + continue; + if(!strcmp(modpblk.descr[i].name, "timerequery")) { + loadModConf->iTimeRequery = (int) pvals[i].val.d.n; + } else if(!strcmp(modpblk.descr[i].name, "schedulingpriority")) { + loadModConf->iSchedPrio = (int) pvals[i].val.d.n; + } else if(!strcmp(modpblk.descr[i].name, "schedulingpolicy")) { + loadModConf->pszSchedPolicy = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else { + dbgprintf("impstats: program error, non-handled " + "param '%s' in beginCnfLoad\n", modpblk.descr[i].name); + } + } + + /* remove all of our legacy handlers, as they can not used in addition + * the the new-style config method. + */ + bLegacyCnfModGlobalsPermitted = 0; + loadModConf->configSetViaV2Method = 1; + +finalize_it: + if(pvals != NULL) + cnfparamvalsDestruct(pvals, &modpblk); +ENDsetModCnf + BEGINendCnfLoad CODESTARTendCnfLoad - /* persist module-specific settings from legacy config system - * TODO: when we add the new config system, we must decide on priority - * already-set module options should not be overwritable by the legacy - * system (though this is debatable and should at least trigger an error - * message if the equivalent legacy option is selected as well) - * rgerhards, 2011-05-04 - */ - loadModConf->iSchedPrio = cs.iSchedPrio; - loadModConf->iTimeRequery = cs.iTimeRequery; - if((cs.pszSchedPolicy == NULL) || (cs.pszSchedPolicy[0] == '\0')) { - loadModConf->pszSchedPolicy = NULL; - } else { - CHKmalloc(loadModConf->pszSchedPolicy = ustrdup(cs.pszSchedPolicy)); + if(!loadModConf->configSetViaV2Method) { + /* persist module-specific settings from legacy config system */ + loadModConf->iSchedPrio = cs.iSchedPrio; + loadModConf->iTimeRequery = cs.iTimeRequery; + if((cs.pszSchedPolicy != NULL) && (cs.pszSchedPolicy[0] != '\0')) { + CHKmalloc(loadModConf->pszSchedPolicy = ustrdup(cs.pszSchedPolicy)); + } } finalize_it: @@ -751,7 +760,16 @@ ENDactivateCnf BEGINfreeCnf + instanceConf_t *inst, *del; CODESTARTfreeCnf + for(inst = pModConf->root ; inst != NULL ; ) { + free(inst->pszBindPort); + free(inst->pszBindAddr); + free(inst->pBindRuleset); + del = inst; + inst = inst->next; + free(del); + } ENDfreeCnf /* This function is called to gather input. @@ -819,6 +837,7 @@ BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_IMOD_QUERIES CODEqueryEtryPt_STD_CONF2_QUERIES +CODEqueryEtryPt_STD_CONF2_setModCnf_QUERIES CODEqueryEtryPt_STD_CONF2_PREPRIVDROP_QUERIES CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES ENDqueryEtryPt @@ -867,12 +886,16 @@ CODEmodInit_QueryRegCFSLineHdlr addInstance, NULL, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"udpserveraddress", 0, eCmdHdlrGetWord, NULL, &cs.pszBindAddr, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"imudpschedulingpolicy", 0, eCmdHdlrGetWord, - NULL, &cs.pszSchedPolicy, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"imudpschedulingpriority", 0, eCmdHdlrInt, - NULL, &cs.iSchedPrio, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"udpservertimerequery", 0, eCmdHdlrInt, - NULL, &cs.iTimeRequery, STD_LOADABLE_MODULE_ID)); + /* module-global config params - will be disabled in configs that are loaded + * via module(...). + */ + CHKiRet(regCfSysLineHdlr2((uchar *)"imudpschedulingpolicy", 0, eCmdHdlrGetWord, + NULL, &cs.pszSchedPolicy, STD_LOADABLE_MODULE_ID, &bLegacyCnfModGlobalsPermitted)); + CHKiRet(regCfSysLineHdlr2((uchar *)"imudpschedulingpriority", 0, eCmdHdlrInt, + NULL, &cs.iSchedPrio, STD_LOADABLE_MODULE_ID, &bLegacyCnfModGlobalsPermitted)); + CHKiRet(regCfSysLineHdlr2((uchar *)"udpservertimerequery", 0, eCmdHdlrInt, + NULL, &cs.iTimeRequery, STD_LOADABLE_MODULE_ID, &bLegacyCnfModGlobalsPermitted)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID)); ENDmodInit diff --git a/plugins/imuxsock/imuxsock.c b/plugins/imuxsock/imuxsock.c index fe04c8f2..57b2a70a 100644 --- a/plugins/imuxsock/imuxsock.c +++ b/plugins/imuxsock/imuxsock.c @@ -6,7 +6,7 @@ * * File begun on 2007-12-20 by RGerhards (extracted from syslogd.c) * - * Copyright 2007-2011 Rainer Gerhards and Adiscon GmbH. + * Copyright 2007-2012 Rainer Gerhards and Adiscon GmbH. * * This file is part of rsyslog. * @@ -149,6 +149,7 @@ typedef struct lstn_s { sbool bCreatePath; /* auto-creation of socket directory? */ sbool bUseCreds; /* pull original creator credentials from socket */ sbool bAnnotate; /* annotate events with trusted properties */ + sbool bParseTrusted; /* parse trusted properties */ sbool bWritePid; /* write original PID into tag */ sbool bUseSysTimeStamp; /* use timestamp from system (instead of from message) */ } lstn_t; @@ -163,6 +164,8 @@ static int startIndexUxLocalSockets; /* process fd from that index on (used to static int nfd = 1; /* number of Unix sockets open / read-only after startup */ static int sd_fds = 0; /* number of systemd activated sockets */ +static ee_ctx ctxee = NULL; /* library context */ + /* config vars for legacy config system */ #define DFLT_bCreatePath 0 #define DFLT_ratelimitInterval 0 @@ -172,8 +175,10 @@ static struct configSettings_s { int bOmitLocalLogging; uchar *pLogSockName; uchar *pLogHostName; /* host name to use with this socket */ - int bUseFlowCtl; /* use flow control or not (if yes, only LIGHT is used! */ + int bUseFlowCtl; /* use flow control or not (if yes, only LIGHT is used!) */ + int bUseFlowCtlSysSock; int bIgnoreTimestamp; /* ignore timestamps present in the incoming message? */ + int bIgnoreTimestampSysSock; int bUseSysTimeStamp; /* use timestamp from system (rather than from message) */ int bUseSysTimeStampSysSock; /* same, for system log socket */ int bWritePid; /* use credentials from recvmsg() and fixup PID in TAG */ @@ -187,6 +192,7 @@ static struct configSettings_s { int ratelimitSeveritySysSock; int bAnnotate; /* annotate trusted properties */ int bAnnotateSysSock; /* same, for system log socket */ + int bParseTrusted; /* parse trusted properties */ } cs; struct instanceConf_s { @@ -201,6 +207,7 @@ struct instanceConf_s { int ratelimitBurst; /* max nbr of messages in interval */ int ratelimitSeverity; int bAnnotate; /* annotate trusted properties */ + int bParseTrusted; /* parse trusted properties */ struct instanceConf_s *next; }; @@ -211,14 +218,37 @@ struct modConfData_s { int ratelimitIntervalSysSock; int ratelimitBurstSysSock; int ratelimitSeveritySysSock; + int bAnnotateSysSock; + int bParseTrusted; + sbool bIgnoreTimestamp; /* ignore timestamps present in the incoming message? */ + sbool bUseFlowCtl; /* use flow control or not (if yes, only LIGHT is used! */ sbool bOmitLocalLogging; sbool bWritePidSysSock; - int bAnnotateSysSock; sbool bUseSysTimeStamp; + sbool configSetViaV2Method; }; static modConfData_t *loadModConf = NULL;/* modConf ptr to use for the current load process */ static modConfData_t *runModConf = NULL;/* modConf ptr to use for the current load process */ +/* module-global parameters */ +static struct cnfparamdescr modpdescr[] = { + { "syssock.use", eCmdHdlrBinary, 0 }, + { "syssock.name", eCmdHdlrGetWord, 0 }, + { "syssock.ignoretimestamp", eCmdHdlrBinary, 0 }, + { "syssock.flowcontrol", eCmdHdlrBinary, 0 }, + { "syssock.usesystimestamp", eCmdHdlrBinary, 0 }, + { "syssock.annotate", eCmdHdlrBinary, 0 }, + { "syssock.usepidfromsystem", eCmdHdlrBinary, 0 }, + { "syssock.ratelimit.interval", eCmdHdlrInt, 0 }, + { "syssock.ratelimit.burst", eCmdHdlrInt, 0 }, + { "syssock.ratelimit.severity", eCmdHdlrInt, 0 } +}; +static struct cnfparamblk modpblk = + { CNFPARAMBLK_VERSION, + sizeof(modpdescr)/sizeof(struct cnfparamdescr), + modpdescr + }; + /* we do not use this, because we do not bind to a ruleset so far * enable when this is changed: #include "im-helper.h" */ /* must be included AFTER the type definitions! */ @@ -233,6 +263,8 @@ initRatelimitState(struct rs_ratelimit_state *rs, unsigned short interval, unsig rs->begin = 0; } +static int bLegacyCnfModGlobalsPermitted;/* are legacy module-global config parameters permitted? */ + /* ratelimiting support, modelled after the linux kernel * returns 1 if message is within rate limit and shall be @@ -289,27 +321,6 @@ finalize_it: } -/* set the timestamp ignore / not ignore option for the system - * log socket. This must be done separtely, as it is not added via a command - * but present by default. -- rgerhards, 2008-03-06 - */ -static rsRetVal setSystemLogTimestampIgnore(void __attribute__((unused)) *pVal, int iNewVal) -{ - DEFiRet; - listeners[0].flags = iNewVal ? IGNDATE : NOFLAG; - RETiRet; -} - -/* set flowcontrol for the system log socket - */ -static rsRetVal setSystemLogFlowControl(void __attribute__((unused)) *pVal, int iNewVal) -{ - DEFiRet; - listeners[0].flowCtl = iNewVal ? eFLOWCTL_LIGHT_DELAY : eFLOWCTL_NO_DELAY; - RETiRet; -} - - /* This function is called when a new listen socket instace shall be added to * the current config object via the legacy config system. It just shuffles * all parameters to the listener in-memory instance. @@ -340,6 +351,7 @@ static rsRetVal addInstance(void __attribute__((unused)) *pVal, uchar *pNewVal) inst->bUseSysTimeStamp = cs.bUseSysTimeStamp; inst->bWritePid = cs.bWritePid; inst->bAnnotate = cs.bAnnotate; + inst->bParseTrusted = cs.bParseTrusted; inst->next = NULL; /* node created, let's add to config */ @@ -388,7 +400,7 @@ addListner(instanceConf_t *inst) if(inst->ratelimitInterval > 0) { if((listeners[nfd].ht = create_hashtable(100, hash_from_key_fn, key_equals_fn, NULL)) == NULL) { /* in this case, we simply turn off rate-limiting */ - dbgprintf("imuxsock: turning off rate limiting because we could not " + DBGPRINTF("imuxsock: turning off rate limiting because we could not " "create hash table\n"); inst->ratelimitInterval = 0; } @@ -402,6 +414,7 @@ addListner(instanceConf_t *inst) listeners[nfd].sockName = ustrdup(inst->sockName); listeners[nfd].bUseCreds = (inst->bWritePid || inst->ratelimitInterval || inst->bAnnotate) ? 1 : 0; listeners[nfd].bAnnotate = inst->bAnnotate; + listeners[nfd].bParseTrusted = inst->bParseTrusted; listeners[nfd].bWritePid = inst->bWritePid; listeners[nfd].bUseSysTimeStamp = inst->bUseSysTimeStamp; nfd++; @@ -458,7 +471,7 @@ createLogSocket(lstn_t *pLstn) if(pLstn->fd < 0 || bind(pLstn->fd, (struct sockaddr *) &sunx, SUN_LEN(&sunx)) < 0 || chmod((char*)pLstn->sockName, 0666) < 0) { errmsg.LogError(errno, NO_ERRCODE, "cannot create '%s'", pLstn->sockName); - dbgprintf("cannot create %s (%d).\n", pLstn->sockName, errno); + DBGPRINTF("cannot create %s (%d).\n", pLstn->sockName, errno); if(pLstn->fd != -1) close(pLstn->fd); pLstn->fd = -1; @@ -491,7 +504,7 @@ openLogSocket(lstn_t *pLstn) /* ok, it matches -- just use as is */ pLstn->fd = fd; - dbgprintf("imuxsock: Acquired UNIX socket '%s' (fd %d) from systemd.\n", + DBGPRINTF("imuxsock: Acquired UNIX socket '%s' (fd %d) from systemd.\n", pLstn->sockName, pLstn->fd); break; } @@ -563,7 +576,7 @@ findRatelimiter(lstn_t *pLstn, struct ucred *cred, rs_ratelimit_state_t **prl) rl = hashtable_search(pLstn->ht, &cred->pid); if(rl == NULL) { /* we need to add a new ratelimiter, process not seen before! */ - dbgprintf("imuxsock: no ratelimiter for pid %lu, creating one\n", + DBGPRINTF("imuxsock: no ratelimiter for pid %lu, creating one\n", (unsigned long) cred->pid); STATSCOUNTER_INC(ctrNumRatelimiters, mutCtrNumRatelimiters); CHKmalloc(rl = malloc(sizeof(rs_ratelimit_state_t))); @@ -707,6 +720,26 @@ copyescaped(uchar *dstbuf, uchar *inbuf, int inlen) } +/* Creates new field to be added to event + * used for SystemLogParseTrusted parsing + */ +struct ee_field * +createNewField(char *fieldname, char *value, int lenValue) { + es_str_t *newStr; + struct ee_value *newVal; + struct ee_field *newField; + + newStr = es_newStrFromBuf(value, (es_size_t) lenValue); + + newVal = ee_newValue(ctxee); + ee_setStrValue(newVal, newStr); + + newField = ee_newFieldFromNV(ctxee, fieldname, newVal); + + return newField; +} + + /* submit received message to the queue engine * We now parse the message according to expected format so that we * can also mangle it if necessary. @@ -732,6 +765,7 @@ SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred, struct tim uchar *pmsgbuf; int toffs; /* offset for trusted properties */ struct syslogTime dummyTS; + struct ee_event *event = NULL; DEFiRet; /* TODO: handle format errors?? */ @@ -776,37 +810,82 @@ SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred, struct tim } else { CHKmalloc(pmsgbuf = malloc(lenRcv+4096)); } - memcpy(pmsgbuf, pRcv, lenRcv); - memcpy(pmsgbuf+lenRcv, " @[", 3); - toffs = lenRcv + 3; /* next free location */ - lenProp = snprintf((char*)propBuf, sizeof(propBuf), "_PID=%lu _UID=%lu _GID=%lu", - (long unsigned) cred->pid, (long unsigned) cred->uid, - (long unsigned) cred->gid); - memcpy(pmsgbuf+toffs, propBuf, lenProp); - toffs = toffs + lenProp; - getTrustedProp(cred, "comm", propBuf, sizeof(propBuf), &lenProp); - if(lenProp) { - memcpy(pmsgbuf+toffs, " _COMM=", 7); - memcpy(pmsgbuf+toffs+7, propBuf, lenProp); - toffs = toffs + 7 + lenProp; - } - getTrustedExe(cred, propBuf, sizeof(propBuf), &lenProp); - if(lenProp) { - memcpy(pmsgbuf+toffs, " _EXE=", 6); - memcpy(pmsgbuf+toffs+6, propBuf, lenProp); - toffs = toffs + 6 + lenProp; - } - getTrustedProp(cred, "cmdline", propBuf, sizeof(propBuf), &lenProp); - if(lenProp) { - memcpy(pmsgbuf+toffs, " _CMDLINE=", 10); - toffs = toffs + 10 + - copyescaped(pmsgbuf+toffs+10, propBuf, lenProp); + + if (pLstn->bParseTrusted) { + struct ee_field *newField; + + if(ctxee == NULL) { + if((ctxee = ee_initCtx()) == NULL) { + errmsg.LogError(0, RS_RET_NO_RULESET, "error: could not initialize libee ctx, cannot " + "activate action"); + ABORT_FINALIZE(RS_RET_ERR_LIBEE_INIT); + } + } + + event = ee_newEvent(ctxee); + + /* create value string, create field, and add it to event */ + lenProp = snprintf((char *)propBuf, sizeof(propBuf), "%lu", (long unsigned) cred->pid); + newField = createNewField("pid", (char *)propBuf, lenProp); + ee_addFieldToEvent(event, newField); + + lenProp = snprintf((char *)propBuf, sizeof(propBuf), "%lu", (long unsigned) cred->uid); + newField = createNewField("uid", (char *)propBuf, lenProp); + ee_addFieldToEvent(event, newField); + + lenProp = snprintf((char *)propBuf, sizeof(propBuf), "%lu", (long unsigned) cred->gid); + newField = createNewField("gid", (char *)propBuf, lenProp); + ee_addFieldToEvent(event, newField); + + getTrustedProp(cred, "comm", propBuf, sizeof(propBuf), &lenProp); + newField = createNewField("appname", (char *)propBuf, lenProp); + ee_addFieldToEvent(event, newField); + + getTrustedExe(cred, propBuf, sizeof(propBuf), &lenProp); + newField = createNewField("exe", (char *)propBuf, lenProp); + ee_addFieldToEvent(event, newField); + + getTrustedProp(cred, "cmdline", propBuf, sizeof(propBuf), &lenProp); + newField = createNewField("cmd", (char *)propBuf, lenProp); + ee_addFieldToEvent(event, newField); + + } else { + + memcpy(pmsgbuf, pRcv, lenRcv); + memcpy(pmsgbuf+lenRcv, " @[", 3); + toffs = lenRcv + 3; /* next free location */ + lenProp = snprintf((char*)propBuf, sizeof(propBuf), "_PID=%lu _UID=%lu _GID=%lu", + (long unsigned) cred->pid, (long unsigned) cred->uid, + (long unsigned) cred->gid); + memcpy(pmsgbuf+toffs, propBuf, lenProp); + toffs = toffs + lenProp; + + getTrustedProp(cred, "comm", propBuf, sizeof(propBuf), &lenProp); + if(lenProp) { + memcpy(pmsgbuf+toffs, " _COMM=", 7); + memcpy(pmsgbuf+toffs+7, propBuf, lenProp); + toffs = toffs + 7 + lenProp; + } + getTrustedExe(cred, propBuf, sizeof(propBuf), &lenProp); + if(lenProp) { + memcpy(pmsgbuf+toffs, " _EXE=", 6); + memcpy(pmsgbuf+toffs+6, propBuf, lenProp); + toffs = toffs + 6 + lenProp; + } + getTrustedProp(cred, "cmdline", propBuf, sizeof(propBuf), &lenProp); + if(lenProp) { + memcpy(pmsgbuf+toffs, " _CMDLINE=", 9); + toffs = toffs + 9 + + copyescaped(pmsgbuf+toffs+9, propBuf, lenProp); + } + + /* finalize string */ + pmsgbuf[toffs] = ']'; + pmsgbuf[toffs+1] = '\0'; + + pRcv = pmsgbuf; + lenRcv = toffs + 1; } - /* finalize string */ - pmsgbuf[toffs] = ']'; - pmsgbuf[toffs+1] = '\0'; - pRcv = pmsgbuf; - lenRcv = toffs + 1; } /* we now create our own message object and submit it to the queue */ @@ -823,6 +902,14 @@ SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred, struct tim parse++; lenMsg--; /* '>' */ + /* event is saved to pMsg */ + if(pMsg->event != NULL) { + ee_deleteEvent(pMsg->event); + } + if (event != NULL) { + pMsg->event = event; + } + if(ts == NULL) { if((pLstn->flags & IGNDATE)) { /* in this case, we still need to find out if we have a valid @@ -935,7 +1022,7 @@ static rsRetVal readSocket(lstn_t *pLstn) msgh.msg_iovlen = 1; iRcvd = recvmsg(pLstn->fd, &msgh, MSG_DONTWAIT); - dbgprintf("Message from UNIX socket: #%d\n", pLstn->fd); + DBGPRINTF("Message from UNIX socket: #%d\n", pLstn->fd); if(iRcvd > 0) { cred = NULL; ts = NULL; @@ -945,16 +1032,12 @@ static rsRetVal readSocket(lstn_t *pLstn) if( pLstn->bUseCreds && cm->cmsg_level == SOL_SOCKET && cm->cmsg_type == SCM_CREDENTIALS) { cred = (struct ucred*) CMSG_DATA(cm); - break; } # endif /* HAVE_SCM_CREDENTIALS */ # if HAVE_SO_TIMESTAMP if( pLstn->bUseSysTimeStamp && cm->cmsg_level == SOL_SOCKET && cm->cmsg_type == SO_TIMESTAMP) { ts = (struct timeval *)CMSG_DATA(cm); - dbgprintf("XXX: got timestamp %ld.%ld\n", - (long) ts->tv_sec, (long) ts->tv_usec); - break; } # endif /* HAVE_SO_TIMESTAMP */ } @@ -963,7 +1046,7 @@ static rsRetVal readSocket(lstn_t *pLstn) } else if(iRcvd < 0 && errno != EINTR) { char errStr[1024]; rs_strerror_r(errno, errStr, sizeof(errStr)); - dbgprintf("UNIX socket error: %d = %s.\n", errno, errStr); + DBGPRINTF("UNIX socket error: %d = %s.\n", errno, errStr); errmsg.LogError(errno, NO_ERRCODE, "imuxsock: recvfrom UNIX"); } @@ -1013,10 +1096,13 @@ activateListeners() listeners[0].ratelimitInterval = runModConf->ratelimitIntervalSysSock; listeners[0].ratelimitBurst = runModConf->ratelimitBurstSysSock; listeners[0].ratelimitSev = runModConf->ratelimitSeveritySysSock; - listeners[0].bUseCreds = (runModConf->bWritePidSysSock || runModConf->ratelimitIntervalSysSock) ? 1 : 0; + listeners[0].bUseCreds = (runModConf->bWritePidSysSock || runModConf->ratelimitIntervalSysSock || runModConf->bAnnotateSysSock) ? 1 : 0; listeners[0].bWritePid = runModConf->bWritePidSysSock; listeners[0].bAnnotate = runModConf->bAnnotateSysSock; + listeners[0].bParseTrusted = runModConf->bParseTrusted; listeners[0].bUseSysTimeStamp = runModConf->bUseSysTimeStamp; + listeners[0].flags = runModConf->bIgnoreTimestamp ? IGNDATE : NOFLAG; + listeners[0].flowCtl = runModConf->bUseFlowCtl ? eFLOWCTL_LIGHT_DELAY : eFLOWCTL_NO_DELAY; sd_fds = sd_listen_fds(0); if(sd_fds < 0) { @@ -1029,7 +1115,7 @@ activateListeners() for (i = startIndexUxLocalSockets ; i < nfd ; i++) { if(openLogSocket(&(listeners[i])) == RS_RET_OK) { ++actSocks; - dbgprintf("imuxsock: Opened UNIX socket '%s' (fd %d).\n", + DBGPRINTF("imuxsock: Opened UNIX socket '%s' (fd %d).\n", listeners[i].sockName, listeners[i].fd); } } @@ -1049,16 +1135,90 @@ BEGINbeginCnfLoad CODESTARTbeginCnfLoad loadModConf = pModConf; pModConf->pConf = pConf; + /* init our settings */ + pModConf->pLogSockName = NULL; + pModConf->bOmitLocalLogging = 0; + pModConf->bIgnoreTimestamp = 1; + pModConf->bUseFlowCtl = 0; + pModConf->bUseSysTimeStamp = 1; + pModConf->bWritePidSysSock = 0; + pModConf->bAnnotateSysSock = 0; + pModConf->bParseTrusted = 0; + pModConf->ratelimitIntervalSysSock = DFLT_ratelimitInterval; + pModConf->ratelimitBurstSysSock = DFLT_ratelimitBurst; + pModConf->ratelimitSeveritySysSock = DFLT_ratelimitSeverity; + bLegacyCnfModGlobalsPermitted = 1; /* reset legacy config vars */ resetConfigVariables(NULL, NULL); ENDbeginCnfLoad +BEGINsetModCnf + struct cnfparamvals *pvals = NULL; + int i; +CODESTARTsetModCnf + pvals = nvlstGetParams(lst, &modpblk, NULL); + if(pvals == NULL) { + errmsg.LogError(0, RS_RET_MISSING_CNFPARAMS, "error processing module " + "config parameters [module(...)]"); + ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); + } + + if(Debug) { + dbgprintf("module (global) param blk for imuxsock:\n"); + cnfparamsPrint(&modpblk, pvals); + } + + for(i = 0 ; i < modpblk.nParams ; ++i) { + if(!pvals[i].bUsed) + continue; + if(!strcmp(modpblk.descr[i].name, "syssock.use")) { + loadModConf->bOmitLocalLogging = ((int) pvals[i].val.d.n) ? 0 : 1; + } else if(!strcmp(modpblk.descr[i].name, "syssock.name")) { + loadModConf->pLogSockName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(modpblk.descr[i].name, "syssock.ignoretimestamp")) { + loadModConf->bIgnoreTimestamp = (int) pvals[i].val.d.n; + } else if(!strcmp(modpblk.descr[i].name, "syssock.flowcontrol")) { + loadModConf->bUseFlowCtl = (int) pvals[i].val.d.n; + } else if(!strcmp(modpblk.descr[i].name, "syssock.usesystimestamp")) { + loadModConf->bUseSysTimeStamp = (int) pvals[i].val.d.n; + } else if(!strcmp(modpblk.descr[i].name, "syssock.annotate")) { + loadModConf->bAnnotateSysSock = (int) pvals[i].val.d.n; + } else if(!strcmp(modpblk.descr[i].name, "syssock.usepidfromsystem")) { + loadModConf->bWritePidSysSock = (int) pvals[i].val.d.n; + } else if(!strcmp(modpblk.descr[i].name, "syssock.ratelimit.interval")) { + loadModConf->ratelimitIntervalSysSock = (int) pvals[i].val.d.n; + } else if(!strcmp(modpblk.descr[i].name, "syssock.ratelimit.burst")) { + loadModConf->ratelimitBurstSysSock = (int) pvals[i].val.d.n; + } else if(!strcmp(modpblk.descr[i].name, "syssock.ratelimit.severity")) { + loadModConf->ratelimitSeveritySysSock = (int) pvals[i].val.d.n; + } else { + dbgprintf("imuxsock: program error, non-handled " + "param '%s' in beginCnfLoad\n", modpblk.descr[i].name); + } + } + + /* disable legacy module-global config directives */ + bLegacyCnfModGlobalsPermitted = 0; + loadModConf->configSetViaV2Method = 1; + +finalize_it: + if(pvals != NULL) + cnfparamvalsDestruct(pvals, &modpblk); +ENDsetModCnf + + BEGINendCnfLoad CODESTARTendCnfLoad - /* persist module-specific settings from legacy config system */ - loadModConf->bOmitLocalLogging = cs.bOmitLocalLogging; - loadModConf->pLogSockName = cs.pLogSockName; + if(!loadModConf->configSetViaV2Method) { + /* persist module-specific settings from legacy config system */ + loadModConf->bOmitLocalLogging = cs.bOmitLocalLogging; + loadModConf->pLogSockName = cs.pLogSockName; + loadModConf->bIgnoreTimestamp = cs.bIgnoreTimestampSysSock; + loadModConf->bUseFlowCtl = cs.bUseFlowCtlSysSock; + loadModConf->bAnnotateSysSock = cs.bAnnotateSysSock; + loadModConf->bParseTrusted = cs.bParseTrusted; + } loadModConf = NULL; /* done loading */ /* free legacy config vars */ @@ -1091,8 +1251,16 @@ ENDactivateCnf BEGINfreeCnf + instanceConf_t *inst, *del; CODESTARTfreeCnf free(pModConf->pLogSockName); + for(inst = pModConf->root ; inst != NULL ; ) { + free(inst->sockName); + free(inst->pLogHostName); + del = inst; + inst = inst->next; + free(del); + } ENDfreeCnf @@ -1193,6 +1361,10 @@ CODESTARTafterRun discardLogSockets(); nfd = 1; + if(ctxee != NULL) { + ee_exitCtx(ctxee); + ctxee = NULL; + } ENDafterRun @@ -1223,6 +1395,7 @@ BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_IMOD_QUERIES CODEqueryEtryPt_STD_CONF2_QUERIES +CODEqueryEtryPt_STD_CONF2_setModCnf_QUERIES CODEqueryEtryPt_STD_CONF2_PREPRIVDROP_QUERIES CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES ENDqueryEtryPt @@ -1235,13 +1408,16 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a cs.bOmitLocalLogging = 0; cs.pLogHostName = NULL; cs.bIgnoreTimestamp = 1; + cs.bIgnoreTimestampSysSock = 1; cs.bUseFlowCtl = 0; + cs.bUseFlowCtlSysSock = 0; cs.bUseSysTimeStamp = 1; cs.bUseSysTimeStampSysSock = 1; cs.bWritePid = 0; cs.bWritePidSysSock = 0; cs.bAnnotate = 0; cs.bAnnotateSysSock = 0; + cs.bParseTrusted = 0; cs.bCreatePath = DFLT_bCreatePath; cs.ratelimitInterval = DFLT_ratelimitInterval; cs.ratelimitIntervalSysSock = DFLT_ratelimitInterval; @@ -1267,7 +1443,7 @@ CODEmodInit_QueryRegCFSLineHdlr CHKiRet(objUse(datetime, CORE_COMPONENT)); CHKiRet(objUse(parser, CORE_COMPONENT)); - dbgprintf("imuxsock version %s initializing\n", PACKAGE_VERSION); + DBGPRINTF("imuxsock version %s initializing\n", PACKAGE_VERSION); /* init legacy config vars */ cs.pLogSockName = NULL; @@ -1294,6 +1470,7 @@ CODEmodInit_QueryRegCFSLineHdlr listeners[0].bParseHost = 0; listeners[0].bUseCreds = 0; listeners[0].bAnnotate = 0; + listeners[0].bParseTrusted = 0; listeners[0].bCreatePath = 0; listeners[0].bUseSysTimeStamp = 1; @@ -1309,12 +1486,8 @@ CODEmodInit_QueryRegCFSLineHdlr CHKiRet(prop.ConstructFinalize(pLocalHostIP)); /* register config file handlers */ - CHKiRet(omsdRegCFSLineHdlr((uchar *)"omitlocallogging", 0, eCmdHdlrBinary, - NULL, &cs.bOmitLocalLogging, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputunixlistensocketignoremsgtimestamp", 0, eCmdHdlrBinary, NULL, &cs.bIgnoreTimestamp, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"systemlogsocketname", 0, eCmdHdlrGetWord, - NULL, &cs.pLogSockName, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputunixlistensockethostname", 0, eCmdHdlrGetWord, NULL, &cs.pLogHostName, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputunixlistensocketflowcontrol", 0, eCmdHdlrBinary, @@ -1343,22 +1516,28 @@ CODEmodInit_QueryRegCFSLineHdlr * for that. We should revisit all of that once we have the new config format... * rgerhards, 2008-03-06 */ - CHKiRet(omsdRegCFSLineHdlr((uchar *)"systemlogsocketignoremsgtimestamp", 0, eCmdHdlrBinary, - setSystemLogTimestampIgnore, NULL, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"systemlogsocketflowcontrol", 0, eCmdHdlrBinary, - setSystemLogFlowControl, NULL, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"systemlogusesystimestamp", 0, eCmdHdlrBinary, - NULL, &cs.bUseSysTimeStampSysSock, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"systemlogsocketannotate", 0, eCmdHdlrBinary, - NULL, &cs.bAnnotateSysSock, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"systemlogusepidfromsystem", 0, eCmdHdlrBinary, - NULL, &cs.bWritePidSysSock, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"systemlogratelimitinterval", 0, eCmdHdlrInt, - NULL, &cs.ratelimitIntervalSysSock, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"systemlogratelimitburst", 0, eCmdHdlrInt, - NULL, &cs.ratelimitBurstSysSock, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"systemlogratelimitseverity", 0, eCmdHdlrInt, - NULL, &cs.ratelimitSeveritySysSock, STD_LOADABLE_MODULE_ID)); + CHKiRet(regCfSysLineHdlr2((uchar *)"omitlocallogging", 0, eCmdHdlrBinary, + NULL, &cs.bOmitLocalLogging, STD_LOADABLE_MODULE_ID, &bLegacyCnfModGlobalsPermitted)); + CHKiRet(regCfSysLineHdlr2((uchar *)"systemlogsocketname", 0, eCmdHdlrGetWord, + NULL, &cs.pLogSockName, STD_LOADABLE_MODULE_ID, &bLegacyCnfModGlobalsPermitted)); + CHKiRet(regCfSysLineHdlr2((uchar *)"systemlogsocketignoremsgtimestamp", 0, eCmdHdlrBinary, + NULL, &cs.bIgnoreTimestampSysSock, STD_LOADABLE_MODULE_ID, &bLegacyCnfModGlobalsPermitted)); + CHKiRet(regCfSysLineHdlr2((uchar *)"systemlogsocketflowcontrol", 0, eCmdHdlrBinary, + NULL, &cs.bUseFlowCtlSysSock, STD_LOADABLE_MODULE_ID, &bLegacyCnfModGlobalsPermitted)); + CHKiRet(regCfSysLineHdlr2((uchar *)"systemlogusesystimestamp", 0, eCmdHdlrBinary, + NULL, &cs.bUseSysTimeStampSysSock, STD_LOADABLE_MODULE_ID, &bLegacyCnfModGlobalsPermitted)); + CHKiRet(regCfSysLineHdlr2((uchar *)"systemlogsocketannotate", 0, eCmdHdlrBinary, + NULL, &cs.bAnnotateSysSock, STD_LOADABLE_MODULE_ID, &bLegacyCnfModGlobalsPermitted)); + CHKiRet(regCfSysLineHdlr2((uchar *)"systemlogparsetrusted", 0, eCmdHdlrBinary, + NULL, &cs.bParseTrusted, STD_LOADABLE_MODULE_ID, &bLegacyCnfModGlobalsPermitted)); + CHKiRet(regCfSysLineHdlr2((uchar *)"systemlogusepidfromsystem", 0, eCmdHdlrBinary, + NULL, &cs.bWritePidSysSock, STD_LOADABLE_MODULE_ID, &bLegacyCnfModGlobalsPermitted)); + CHKiRet(regCfSysLineHdlr2((uchar *)"systemlogratelimitinterval", 0, eCmdHdlrInt, + NULL, &cs.ratelimitIntervalSysSock, STD_LOADABLE_MODULE_ID, &bLegacyCnfModGlobalsPermitted)); + CHKiRet(regCfSysLineHdlr2((uchar *)"systemlogratelimitburst", 0, eCmdHdlrInt, + NULL, &cs.ratelimitBurstSysSock, STD_LOADABLE_MODULE_ID, &bLegacyCnfModGlobalsPermitted)); + CHKiRet(regCfSysLineHdlr2((uchar *)"systemlogratelimitseverity", 0, eCmdHdlrInt, + NULL, &cs.ratelimitSeveritySysSock, STD_LOADABLE_MODULE_ID, &bLegacyCnfModGlobalsPermitted)); /* support statistics gathering */ CHKiRet(statsobj.Construct(&modStats)); diff --git a/plugins/imzmq3/Makefile.am b/plugins/imzmq3/Makefile.am new file mode 100644 index 00000000..f9c84e5d --- /dev/null +++ b/plugins/imzmq3/Makefile.am @@ -0,0 +1,8 @@ +pkglib_LTLIBRARIES = imzmq3.la + +imzmq3_la_SOURCES = imzmq3.c +imzmq3_la_CPPFLAGS = $(RSRT_CFLAGS) $(PTHREADS_CFLAGS) $(CZMQ_CFLAGS) +imzmq3_la_LDFLAGS = -module -avoid-version +imzmq3_la_LIBADD = $(CZMQ_LIBS) + +EXTRA_DIST = diff --git a/plugins/imzmq3/README b/plugins/imzmq3/README new file mode 100644 index 00000000..88653b83 --- /dev/null +++ b/plugins/imzmq3/README @@ -0,0 +1,24 @@ +ZeroMQ 3.x Input Plugin + +Building this plugin: +Requires libzmq and libczmq. First, install libzmq from the HEAD on github: +http://github.com/zeromq/libzmq. You can clone the repository, build, then +install it. The directions for doing so are there in the readme. Then, do +the same for libczmq: http://github.com/zeromq/czmq. At some point, the 3.1 +version of libzmq will be released, and a supporting version of libczmq. +At that time, you could simply download and install the tarballs instead of +using git to clone the repositories. Those tarballs (when available) can +be found at http://download.zeromq.org. As of this writing (5/31/2012), the +most recent version of czmq (1.1.0) and libzmq (3.1.0-beta) will not compile +properly. + +Imzmq3 allows you to push data into rsyslog from a zeromq socket. The example +below binds a SUB socket to port 7172, and then any messages with the topic +"foo" will be pushed into rsyslog. + +Example Rsyslog.conf snippet: +------------------------------------------------------------------------------- + +$InputZmq3ServerRun action=BIND,type=SUB,description=tcp://*:7172,subscribe=foo + +------------------------------------------------------------------------------- diff --git a/plugins/imzmq3/imzmq3.c b/plugins/imzmq3/imzmq3.c new file mode 100644 index 00000000..dc1d64d3 --- /dev/null +++ b/plugins/imzmq3/imzmq3.c @@ -0,0 +1,657 @@ +/* imzmq3.c + * + * This input plugin enables rsyslog to read messages from a ZeroMQ + * queue. + * + * Copyright 2012 Talksum, Inc. + * + * This program is free software: you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * as published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this program. If not, see + * <http://www.gnu.org/licenses/>. + * + * Author: David Kelly + * <davidk@talksum.com> + */ + +#include <assert.h> +#include <errno.h> +#include <stdlib.h> +#include <string.h> +#include <unistd.h> + +#include "rsyslog.h" + +#include "cfsysline.h" +#include "config.h" +#include "dirty.h" +#include "errmsg.h" +#include "glbl.h" +#include "module-template.h" +#include "msg.h" +#include "net.h" +#include "parser.h" +#include "prop.h" +#include "ruleset.h" +#include "srUtils.h" +#include "unicode-helper.h" + +#include <czmq.h> + +MODULE_TYPE_INPUT +MODULE_TYPE_NOKEEP + +/* convienent symbols to denote a socket we want to bind + * vs one we want to just connect to + */ +#define ACTION_CONNECT 1 +#define ACTION_BIND 2 + +/* Module static data */ +DEF_IMOD_STATIC_DATA +DEFobjCurrIf(errmsg) +DEFobjCurrIf(glbl) +DEFobjCurrIf(prop) +DEFobjCurrIf(ruleset) + + +/* ---------------------------------------------------------------------------- + * structs to describe sockets + */ +typedef struct _socket_type { + char* name; + int type; +} socket_type; + +/* more overkill, but seems nice to be consistent.*/ +typedef struct _socket_action { + char* name; + int action; +} socket_action; + +typedef struct _poller_data { + ruleset_t* ruleset; + thrdInfo_t* thread; +} poller_data; + +typedef struct _socket_info { + int type; + int action; + char* description; + int sndHWM; /* if you want more than 2^32 messages, */ + int rcvHWM; /* then pass in 0 (the default). */ + char* identity; + char** subscriptions; + ruleset_t* ruleset; + int sndBuf; + int rcvBuf; + int linger; + int backlog; + int sndTimeout; + int rcvTimeout; + int maxMsgSize; + int rate; + int recoveryIVL; + int multicastHops; + int reconnectIVL; + int reconnectIVLMax; + int ipv4Only; + int affinity; + +} socket_info; + + +/* ---------------------------------------------------------------------------- + * Static definitions/initializations. + */ +static socket_info* s_socketInfo = NULL; +static size_t s_nitems = 0; +static prop_t * s_namep = NULL; +static zloop_t* s_zloop = NULL; +static int s_io_threads = 1; +static zctx_t* s_context = NULL; +static ruleset_t* s_ruleset = NULL; +static socket_type socketTypes[] = { + {"SUB", ZMQ_SUB }, + {"PULL", ZMQ_PULL }, + {"XSUB", ZMQ_XSUB } +}; + +static socket_action socketActions[] = { + {"BIND", ACTION_BIND}, + {"CONNECT", ACTION_CONNECT}, +}; + + +/* ---------------------------------------------------------------------------- + * Helper functions + */ + +/* get the name of a socket type, return the ZMQ_XXX type + or -1 if not a supported type (see above) +*/ +static int getSocketType(char* name) { + int type = -1; + uint i; + + /* match name with known socket type */ + for(i=0; i<sizeof(socketTypes)/sizeof(socket_type); ++i) { + if( !strcmp(socketTypes[i].name, name) ) { + type = socketTypes[i].type; + break; + } + } + + /* whine if no match was found. */ + if (type == -1) + errmsg.LogError(0, NO_ERRCODE, "unknown type %s",name); + + return type; +} + + +static int getSocketAction(char* name) { + int action = -1; + uint i; + + /* match name with known socket action */ + for(i=0; i < sizeof(socketActions)/sizeof(socket_action); ++i) { + if(!strcmp(socketActions[i].name, name)) { + action = socketActions[i].action; + break; + } + } + + /* whine if no matching action was found */ + if (action == -1) + errmsg.LogError(0, NO_ERRCODE, "unknown action %s",name); + + return action; +} + + +static void setDefaults(socket_info* info) { + info->type = ZMQ_SUB; + info->action = ACTION_BIND; + info->description = NULL; + info->sndHWM = 0; + info->rcvHWM = 0; + info->identity = NULL; + info->subscriptions = NULL; + info->ruleset = NULL; + info->sndBuf = -1; + info->rcvBuf = -1; + info->linger = -1; + info->backlog = -1; + info->sndTimeout = -1; + info->rcvTimeout = -1; + info->maxMsgSize = -1; + info->rate = -1; + info->recoveryIVL = -1; + info->multicastHops = -1; + info->reconnectIVL = -1; + info->reconnectIVLMax = -1; + info->ipv4Only = -1; + info->affinity = -1; + +}; + + +/* The config string should look like: + * "action=AAA,type=TTT,description=DDD,sndHWM=SSS,rcvHWM=RRR,subscribe='xxx',subscribe='yyy'" + * + */ +static rsRetVal parseConfig(char* config, socket_info* info) { + int nsubs = 0; + + char* binding; + char* ptr1; + for (binding = strtok_r(config, ",", &ptr1); + binding != NULL; + binding = strtok_r(NULL, ",", &ptr1)) { + + /* Each binding looks like foo=bar */ + char * sep = strchr(binding, '='); + if (sep == NULL) + { + errmsg.LogError(0, NO_ERRCODE, + "Invalid argument format %s, ignoring ...", + binding); + continue; + } + + /* Replace '=' with '\0'. */ + *sep = '\0'; + + char * val = sep + 1; + + if (strcmp(binding, "action") == 0) { + info->action = getSocketAction(val); + } else if (strcmp(binding, "type") == 0) { + info->type = getSocketType(val); + } else if (strcmp(binding, "description") == 0) { + info->description = strdup(val); + } else if (strcmp(binding, "sndHWM") == 0) { + info->sndHWM = atoi(val); + } else if (strcmp(binding, "rcvHWM") == 0) { + info->sndHWM = atoi(val); + } else if (strcmp(binding, "subscribe") == 0) { + /* Add the subscription value to the list.*/ + char * substr = NULL; + substr = strdup(val); + info->subscriptions = realloc(info->subscriptions, sizeof(char *) * nsubs + 1); + info->subscriptions[nsubs] = substr; + ++nsubs; + } else if (strcmp(binding, "sndBuf") == 0) { + info->sndBuf = atoi(val); + } else if (strcmp(binding, "rcvBuf") == 0) { + info->rcvBuf = atoi(val); + } else if (strcmp(binding, "linger") == 0) { + info->linger = atoi(val); + } else if (strcmp(binding, "backlog") == 0) { + info->backlog = atoi(val); + } else if (strcmp(binding, "sndTimeout") == 0) { + info->sndTimeout = atoi(val); + } else if (strcmp(binding, "rcvTimeout") == 0) { + info->rcvTimeout = atoi(val); + } else if (strcmp(binding, "maxMsgSize") == 0) { + info->maxMsgSize = atoi(val); + } else if (strcmp(binding, "rate") == 0) { + info->rate = atoi(val); + } else if (strcmp(binding, "recoveryIVL") == 0) { + info->recoveryIVL = atoi(val); + } else if (strcmp(binding, "multicastHops") == 0) { + info->multicastHops = atoi(val); + } else if (strcmp(binding, "reconnectIVL") == 0) { + info->reconnectIVL = atoi(val); + } else if (strcmp(binding, "reconnectIVLMax") == 0) { + info->reconnectIVLMax = atoi(val); + } else if (strcmp(binding, "ipv4Only") == 0) { + info->ipv4Only = atoi(val); + } else if (strcmp(binding, "affinity") == 0) { + info->affinity = atoi(val); + } else { + errmsg.LogError(0, NO_ERRCODE, "Unknown argument %s", binding); + return RS_RET_INVALID_PARAMS; + } + } + + return RS_RET_OK; +} + +static rsRetVal validateConfig(socket_info* info) { + + if (info->type == -1) { + errmsg.LogError(0, RS_RET_INVALID_PARAMS, + "you entered an invalid type"); + return RS_RET_INVALID_PARAMS; + } + if (info->action == -1) { + errmsg.LogError(0, RS_RET_INVALID_PARAMS, + "you entered an invalid action"); + return RS_RET_INVALID_PARAMS; + } + if (info->description == NULL) { + errmsg.LogError(0, RS_RET_INVALID_PARAMS, + "you didn't enter a description"); + return RS_RET_INVALID_PARAMS; + } + if(info->type == ZMQ_SUB && info->subscriptions == NULL) { + errmsg.LogError(0, RS_RET_INVALID_PARAMS, + "SUB sockets need at least one subscription"); + return RS_RET_INVALID_PARAMS; + } + if(info->type != ZMQ_SUB && info->subscriptions != NULL) { + errmsg.LogError(0, RS_RET_INVALID_PARAMS, + "only SUB sockets can have subscriptions"); + return RS_RET_INVALID_PARAMS; + } + return RS_RET_OK; +} + +static rsRetVal createContext() { + if (s_context == NULL) { + errmsg.LogError(0, NO_ERRCODE, "creating zctx."); + s_context = zctx_new(); + + if (s_context == NULL) { + errmsg.LogError(0, RS_RET_INVALID_PARAMS, + "zctx_new failed: %s", + strerror(errno)); + /* DK: really should do better than invalid params...*/ + return RS_RET_INVALID_PARAMS; + } + + if (s_io_threads > 1) { + errmsg.LogError(0, NO_ERRCODE, "setting io worker threads to %d", s_io_threads); + zctx_set_iothreads(s_context, s_io_threads); + } + } + return RS_RET_OK; +} + +static rsRetVal createSocket(socket_info* info, void** sock) { + size_t ii; + int rv; + + *sock = zsocket_new(s_context, info->type); + if (!sock) { + errmsg.LogError(0, + RS_RET_INVALID_PARAMS, + "zsocket_new failed: %s, for type %d", + strerror(errno),info->type); + /* DK: invalid params seems right here */ + return RS_RET_INVALID_PARAMS; + } + + /* Set options *before* the connect/bind. */ + if (info->identity) zsocket_set_identity(*sock, info->identity); + if (info->sndBuf > -1) zsocket_set_sndbuf(*sock, info->sndBuf); + if (info->rcvBuf > -1) zsocket_set_rcvbuf(*sock, info->rcvBuf); + if (info->linger > -1) zsocket_set_linger(*sock, info->linger); + if (info->backlog > -1) zsocket_set_backlog(*sock, info->backlog); + if (info->sndTimeout > -1) zsocket_set_sndtimeo(*sock, info->sndTimeout); + if (info->rcvTimeout > -1) zsocket_set_rcvtimeo(*sock, info->rcvTimeout); + if (info->maxMsgSize > -1) zsocket_set_maxmsgsize(*sock, info->maxMsgSize); + if (info->rate > -1) zsocket_set_rate(*sock, info->rate); + if (info->recoveryIVL > -1) zsocket_set_recovery_ivl(*sock, info->recoveryIVL); + if (info->multicastHops > -1) zsocket_set_multicast_hops(*sock, info->multicastHops); + if (info->reconnectIVL > -1) zsocket_set_reconnect_ivl(*sock, info->reconnectIVL); + if (info->reconnectIVLMax > -1) zsocket_set_reconnect_ivl_max(*sock, info->reconnectIVLMax); + if (info->ipv4Only > -1) zsocket_set_ipv4only(*sock, info->ipv4Only); + if (info->affinity > -1) zsocket_set_affinity(*sock, info->affinity); + + /* since HWM have defaults, we always set them. No return codes to check, either.*/ + zsocket_set_sndhwm(*sock, info->sndHWM); + zsocket_set_rcvhwm(*sock, info->rcvHWM); + + /* Set subscriptions.*/ + for (ii = 0; ii < sizeof(info->subscriptions)/sizeof(char*); ++ii) + zsocket_set_subscribe(*sock, info->subscriptions[ii]); + + + + /* Do the bind/connect... */ + if (info->action==ACTION_CONNECT) { + rv = zsocket_connect(*sock, info->description); + if (rv < 0) { + errmsg.LogError(0, + RS_RET_INVALID_PARAMS, + "zmq_connect using %s failed: %s", + info->description, strerror(errno)); + return RS_RET_INVALID_PARAMS; + } + } else { + rv = zsocket_bind(*sock, info->description); + if (rv <= 0) { + errmsg.LogError(0, + RS_RET_INVALID_PARAMS, + "zmq_bind using %s failed: %s", + info->description, strerror(errno)); + return RS_RET_INVALID_PARAMS; + } + } + return RS_RET_OK; +} + +/* ---------------------------------------------------------------------------- + * Module endpoints + */ + +/* accept a new ruleset to bind. Checks if it exists and complains, if not. Note + * that this makes the assumption that after the bind ruleset is called in the config, + * another call will be made to add an endpoint. +*/ +static rsRetVal +set_ruleset(void __attribute__((unused)) *pVal, uchar *pszName) { + ruleset_t* ruleset_ptr; + rsRetVal localRet; + DEFiRet; + + localRet = ruleset.GetRuleset(ourConf, &ruleset_ptr, pszName); + if(localRet == RS_RET_NOT_FOUND) { + errmsg.LogError(0, NO_ERRCODE, "error: " + "ruleset '%s' not found - ignored", pszName); + } + CHKiRet(localRet); + s_ruleset = ruleset_ptr; + DBGPRINTF("imzmq3 current bind ruleset '%s'\n", pszName); + +finalize_it: + free(pszName); /* no longer needed */ + RETiRet; +} + +/* add an actual endpoint + */ +static rsRetVal add_endpoint(void __attribute__((unused)) * oldp, uchar * valp) { + DEFiRet; + + /* increment number of items and store old num items, as it will be handy.*/ + size_t idx = s_nitems++; + + /* allocate a new socket_info array to accomidate this new endpoint*/ + socket_info* tmpSocketInfo; + CHKmalloc(tmpSocketInfo = (socket_info*)MALLOC(sizeof(socket_info) * s_nitems)); + + /* copy existing socket_info across into new array, if any, and free old storage*/ + if(idx) { + memcpy(tmpSocketInfo, s_socketInfo, sizeof(socket_info) * idx); + free(s_socketInfo); + } + + /* set the static to hold the new array */ + s_socketInfo = tmpSocketInfo; + + /* point to the new one */ + socket_info* sockInfo = &s_socketInfo[idx]; + + /* set defaults for the new socket info */ + setDefaults(sockInfo); + + /* Make a writeable copy of the string so we can use strtok + in the parseConfig call */ + char * copy = NULL; + CHKmalloc(copy = strdup((char *) valp)); + + /* parse the config string */ + CHKiRet(parseConfig(copy, sockInfo)); + + /* validate it */ + CHKiRet(validateConfig(sockInfo)); + + /* bind to the current ruleset (if any)*/ + sockInfo->ruleset = s_ruleset; + +finalize_it: + free(valp); /* in any case, this is no longer needed */ + RETiRet; +} + + +static int handlePoll(zloop_t __attribute__((unused)) * loop, zmq_pollitem_t *poller, void* pd) { + msg_t* logmsg; + poller_data* pollerData = (poller_data*)pd; + + char* buf = zstr_recv(poller->socket); + if (msgConstruct(&logmsg) == RS_RET_OK) { + MsgSetRawMsg(logmsg, buf, strlen(buf)); + MsgSetInputName(logmsg, s_namep); + MsgSetFlowControlType(logmsg, eFLOWCTL_NO_DELAY); + MsgSetRuleset(logmsg, pollerData->ruleset); + logmsg->msgFlags = NEEDS_PARSING; + submitMsg(logmsg); + } + + /* gotta free the string returned from zstr_recv() */ + free(buf); + + if( pollerData->thread->bShallStop == TRUE) { + /* a handler that returns -1 will terminate the + czmq reactor loop + */ + return -1; + } + + return 0; +} + +/* called when runInput is called by rsyslog + */ +static rsRetVal rcv_loop(thrdInfo_t* pThrd){ + size_t i; + int rv; + zmq_pollitem_t* items; + poller_data* pollerData; + + DEFiRet; + + /* create the context*/ + CHKiRet(createContext()); + + /* create the poll items*/ + CHKmalloc(items = (zmq_pollitem_t*)MALLOC(sizeof(zmq_pollitem_t)*s_nitems)); + + /* create poller data (stuff to pass into the zmq closure called when we get a message)*/ + CHKmalloc(pollerData = (poller_data*)MALLOC(sizeof(poller_data)*s_nitems)); + + /* loop through and initialize the poll items and poller_data arrays...*/ + for(i=0; i<s_nitems;++i) { + /* create the socket, update items.*/ + createSocket(&s_socketInfo[i], &items[i].socket); + items[i].events = ZMQ_POLLIN; + + /* now update the poller_data for this item */ + pollerData[i].thread = pThrd; + pollerData[i].ruleset = s_socketInfo[i].ruleset; + } + + s_zloop = zloop_new(); + for(i=0; i<s_nitems; ++i) { + + rv = zloop_poller(s_zloop, &items[i], handlePoll, &pollerData[i]); + if (rv) { + errmsg.LogError(0, NO_ERRCODE, "imzmq3: zloop_poller failed for item %zu", i); + } + } + zloop_start(s_zloop); + zloop_destroy(&s_zloop); + finalize_it: + for(i=0; i< s_nitems; ++i) { + zsocket_destroy(s_context, items[i].socket); + } + + zctx_destroy(&s_context); + + free(items); + RETiRet; +} + +/* ---------------------------------------------------------------------------- + * input module functions + */ + +BEGINrunInput +CODESTARTrunInput + iRet = rcv_loop(pThrd); + RETiRet; +ENDrunInput + + +/* initialize and return if will run or not */ +BEGINwillRun +CODESTARTwillRun + /* we need to create the inputName property (only once during our + lifetime) */ + CHKiRet(prop.Construct(&s_namep)); + CHKiRet(prop.SetString(s_namep, + UCHAR_CONSTANT("imzmq3"), + sizeof("imzmq3") - 1)); + CHKiRet(prop.ConstructFinalize(s_namep)); + +/* If there are no endpoints this is pointless ...*/ + if (s_nitems == 0) + ABORT_FINALIZE(RS_RET_NO_RUN); + +finalize_it: +ENDwillRun + + +BEGINafterRun +CODESTARTafterRun + /* do cleanup here */ + if(s_namep != NULL) + prop.Destruct(&s_namep); +ENDafterRun + + +BEGINmodExit +CODESTARTmodExit + /* release what we no longer need */ + objRelease(errmsg, CORE_COMPONENT); + objRelease(glbl, CORE_COMPONENT); + objRelease(prop, CORE_COMPONENT); + objRelease(ruleset, CORE_COMPONENT); +ENDmodExit + + +BEGINisCompatibleWithFeature +CODESTARTisCompatibleWithFeature + if(eFeat == sFEATURENonCancelInputTermination) + iRet = RS_RET_OK; +ENDisCompatibleWithFeature + + +BEGINqueryEtryPt +CODESTARTqueryEtryPt +CODEqueryEtryPt_STD_IMOD_QUERIES +CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES +ENDqueryEtryPt + +static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, + void __attribute__((unused)) *pVal) { + return RS_RET_OK; +} +static rsRetVal setGlobalWorkerThreads(uchar __attribute__((unused)) *pp, int val) { + errmsg.LogError(0, NO_ERRCODE, "setGlobalWorkerThreads called with %d",val); + s_io_threads = val; + return RS_RET_OK; +} + +BEGINmodInit() +CODESTARTmodInit + /* we only support the current interface specification */ + *ipIFVersProvided = CURR_MOD_IF_VERSION; +CODEmodInit_QueryRegCFSLineHdlr + CHKiRet(objUse(errmsg, CORE_COMPONENT)); + CHKiRet(objUse(glbl, CORE_COMPONENT)); + CHKiRet(objUse(prop, CORE_COMPONENT)); + CHKiRet(objUse(ruleset, CORE_COMPONENT)); + + /* register config file handlers */ + CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputzmq3serverbindruleset", + 0, eCmdHdlrGetWord, + set_ruleset, NULL, + STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputzmq3serverrun", + 0, eCmdHdlrGetWord, + add_endpoint, NULL, + STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", + 1, eCmdHdlrCustomHandler, + resetConfigVariables, NULL, + STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputzmq3globalWorkerThreads", + 1, eCmdHdlrInt, + setGlobalWorkerThreads, NULL, + STD_LOADABLE_MODULE_ID)); +ENDmodInit diff --git a/plugins/omhdfs/omhdfs.c b/plugins/omhdfs/omhdfs.c index cd14d03c..f8a7e739 100644 --- a/plugins/omhdfs/omhdfs.c +++ b/plugins/omhdfs/omhdfs.c @@ -67,8 +67,8 @@ typedef struct configSettings_s { uchar *dfltTplName; /* default template name to use */ int hdfsPort; } configSettings_t; +static configSettings_t cs; -SCOPING_SUPPORT; /* must be set AFTER configSettings_t is defined */ BEGINinitConfVars /* (re)set config variables to default values */ CODESTARTinitConfVars diff --git a/plugins/omudpspoof/omudpspoof.c b/plugins/omudpspoof/omudpspoof.c index 1db2f7f0..531a0dcf 100644 --- a/plugins/omudpspoof/omudpspoof.c +++ b/plugins/omudpspoof/omudpspoof.c @@ -107,23 +107,41 @@ typedef struct _instanceData { #define DFLT_SOURCE_PORT_END 42000 typedef struct configSettings_s { - uchar *pszTplName; /* name of the default template to use */ + uchar *tplName; /* name of the default template to use */ uchar *pszSourceNameTemplate; /* name of the template containing the spoofing address */ uchar *pszTargetHost; uchar *pszTargetPort; - int iCompressionLevel; /* zlib compressionlevel, the usual values */ int iSourcePortStart; int iSourcePortEnd; } configSettings_t; static configSettings_t cs; +/* module-global parameters */ +static struct cnfparamdescr modpdescr[] = { + { "template", eCmdHdlrGetWord, 0 }, +}; +static struct cnfparamblk modpblk = + { CNFPARAMBLK_VERSION, + sizeof(modpdescr)/sizeof(struct cnfparamdescr), + modpdescr + }; + +struct modConfData_s { + rsconf_t *pConf; /* our overall config object */ + uchar *tplName; /* default template */ +}; + +static modConfData_t *loadModConf = NULL;/* modConf ptr to use for the current load process */ +static modConfData_t *runModConf = NULL;/* modConf ptr to use for the current exec process */ + + + BEGINinitConfVars /* (re)set config variables to default values */ CODESTARTinitConfVars - cs.pszTplName = NULL; + cs.tplName = NULL; cs.pszSourceNameTemplate = NULL; cs.pszTargetHost = NULL; cs.pszTargetPort = NULL; - cs.iCompressionLevel = 0; cs.iSourcePortStart = DFLT_SOURCE_PORT_START; cs.iSourcePortEnd = DFLT_SOURCE_PORT_END; ENDinitConfVars @@ -138,6 +156,44 @@ pthread_mutex_t mutLibnet; static rsRetVal doTryResume(instanceData *pData); +/* this function gets the default template. It coordinates action between + * old-style and new-style configuration parts. + */ +static inline uchar* +getDfltTpl(void) +{ + if(loadModConf != NULL && loadModConf->tplName != NULL) + return loadModConf->tplName; + else if(cs.tplName == NULL) + return (uchar*)"RSYSLOG_FileFormat"; + else + return cs.tplName; +} + + +/* set the default template to be used + * This is a module-global parameter, and as such needs special handling. It needs to + * be coordinated with values set via the v2 config system (rsyslog v6+). What we do + * is we do not permit this directive after the v2 config system has been used to set + * the parameter. + */ +rsRetVal +setLegacyDfltTpl(void __attribute__((unused)) *pVal, uchar* newVal) +{ + DEFiRet; + + if(loadModConf != NULL && loadModConf->tplName != NULL) { + free(newVal); + errmsg.LogError(0, RS_RET_ERR, "omudpspoof default template already set via module " + "global parameter - can no longer be changed"); + ABORT_FINALIZE(RS_RET_ERR); + } + free(cs.tplName); + cs.tplName = newVal; +finalize_it: + RETiRet; +} + /* Close the UDP sockets. * rgerhards, 2009-05-29 */ @@ -167,6 +223,72 @@ static inline uchar *getFwdPt(instanceData *pData) } +BEGINbeginCnfLoad +CODESTARTbeginCnfLoad + loadModConf = pModConf; + pModConf->pConf = pConf; + pModConf->tplName = NULL; +ENDbeginCnfLoad + +BEGINsetModCnf + struct cnfparamvals *pvals = NULL; + int i; +CODESTARTsetModCnf + pvals = nvlstGetParams(lst, &modpblk, NULL); + if(pvals == NULL) { + errmsg.LogError(0, RS_RET_MISSING_CNFPARAMS, "error processing module " + "config parameters [module(...)]"); + ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); + } + + if(Debug) { + dbgprintf("module (global) param blk for omudpspoof:\n"); + cnfparamsPrint(&modpblk, pvals); + } + + for(i = 0 ; i < modpblk.nParams ; ++i) { + if(!pvals[i].bUsed) + continue; + if(!strcmp(modpblk.descr[i].name, "template")) { + loadModConf->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + if(cs.tplName != NULL) { + errmsg.LogError(0, RS_RET_DUP_PARAM, "omudpspoof: warning: default template " + "was already set via legacy directive - may lead to inconsistent " + "results."); + } + } else { + dbgprintf("omudpspoof: program error, non-handled " + "param '%s' in beginCnfLoad\n", modpblk.descr[i].name); + } + } +finalize_it: + if(pvals != NULL) + cnfparamvalsDestruct(pvals, &modpblk); +ENDsetModCnf + +BEGINendCnfLoad +CODESTARTendCnfLoad + loadModConf = NULL; /* done loading */ + /* free legacy config vars */ + free(cs.tplName); + cs.tplName = NULL; +ENDendCnfLoad + +BEGINcheckCnf +CODESTARTcheckCnf +ENDcheckCnf + +BEGINactivateCnf +CODESTARTactivateCnf + runModConf = pModConf; +ENDactivateCnf + +BEGINfreeCnf +CODESTARTfreeCnf + free(pModConf->tplName); +ENDfreeCnf + + BEGINcreateInstance CODESTARTcreateInstance ENDcreateInstance @@ -421,13 +543,12 @@ CODE_STD_STRING_REQUESTparseSelectorAct(2) else CHKmalloc(pData->port = ustrdup(cs.pszTargetPort)); CHKiRet(OMSRsetEntry(*ppOMSR, 1, ustrdup(sourceTpl), OMSR_NO_RQD_TPL_OPTS)); - pData->compressionLevel = cs.iCompressionLevel; pData->sourcePort = pData->sourcePortStart = cs.iSourcePortStart; pData->sourcePortEnd = cs.iSourcePortEnd; /* process template */ CHKiRet(cflineParseTemplateName(&p, *ppOMSR, 0, OMSR_NO_RQD_TPL_OPTS, - (cs.pszTplName == NULL) ? (uchar*)"RSYSLOG_TraditionalForwardFormat" : cs.pszTplName)); + (cs.tplName == NULL) ? (uchar*)"RSYSLOG_TraditionalForwardFormat" : cs.tplName)); CODE_STD_FINALIZERparseSelectorAct ENDparseSelectorAct @@ -439,8 +560,8 @@ ENDparseSelectorAct static void freeConfigVars(void) { - free(cs.pszTplName); - cs.pszTplName = NULL; + free(cs.tplName); + cs.tplName = NULL; free(cs.pszTargetHost); cs.pszTargetHost = NULL; free(cs.pszTargetPort); @@ -464,6 +585,8 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_CONF2_QUERIES +CODEqueryEtryPt_STD_CONF2_setModCnf_QUERIES ENDqueryEtryPt @@ -474,7 +597,6 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a { freeConfigVars(); /* we now must reset all non-string values */ - cs.iCompressionLevel = 0; cs.iSourcePortStart = DFLT_SOURCE_PORT_START; cs.iSourcePortEnd = DFLT_SOURCE_PORT_END; return RS_RET_OK; @@ -504,13 +626,12 @@ CODEmodInit_QueryRegCFSLineHdlr } pthread_mutex_init(&mutLibnet, NULL); - CHKiRet(regCfSysLineHdlr((uchar *)"actionomudpspoofdefaulttemplate", 0, eCmdHdlrGetWord, NULL, &cs.pszTplName, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionomudpspoofdefaulttemplate", 0, eCmdHdlrGetWord, setLegacyDfltTpl, NULL, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionomudpspoofsourcenametemplate", 0, eCmdHdlrGetWord, NULL, &cs.pszSourceNameTemplate, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionomudpspooftargethost", 0, eCmdHdlrGetWord, NULL, &cs.pszTargetHost, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionomudpspooftargetport", 0, eCmdHdlrGetWord, NULL, &cs.pszTargetPort, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionomudpspoofsourceportstart", 0, eCmdHdlrInt, NULL, &cs.iSourcePortStart, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionomudpspoofsourceportend", 0, eCmdHdlrInt, NULL, &cs.iSourcePortEnd, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionomudpcompressionlevel", 0, eCmdHdlrInt, NULL, &cs.iCompressionLevel, NULL)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID)); ENDmodInit diff --git a/plugins/omuxsock/omuxsock.c b/plugins/omuxsock/omuxsock.c index cf27c93c..583b9f94 100644 --- a/plugins/omuxsock/omuxsock.c +++ b/plugins/omuxsock/omuxsock.c @@ -71,6 +71,26 @@ typedef struct configSettings_s { } configSettings_t; static configSettings_t cs; +/* module-global parameters */ +static struct cnfparamdescr modpdescr[] = { + { "template", eCmdHdlrGetWord, 0 }, +}; +static struct cnfparamblk modpblk = + { CNFPARAMBLK_VERSION, + sizeof(modpdescr)/sizeof(struct cnfparamdescr), + modpdescr + }; + +struct modConfData_s { + rsconf_t *pConf; /* our overall config object */ + uchar *tplName; /* default template */ +}; + +static modConfData_t *loadModConf = NULL;/* modConf ptr to use for the current load process */ +static modConfData_t *runModConf = NULL;/* modConf ptr to use for the current exec process */ + + + BEGINinitConfVars /* (re)set config variables to default values */ CODESTARTinitConfVars cs.tplName = NULL; @@ -80,8 +100,45 @@ ENDinitConfVars static rsRetVal doTryResume(instanceData *pData); -/* Close socket. + +/* this function gets the default template. It coordinates action between + * old-style and new-style configuration parts. + */ +static inline uchar* +getDfltTpl(void) +{ + if(loadModConf != NULL && loadModConf->tplName != NULL) + return loadModConf->tplName; + else if(cs.tplName == NULL) + return (uchar*)"RSYSLOG_TraditionalForwardFormat"; + else + return cs.tplName; +} + +/* set the default template to be used + * This is a module-global parameter, and as such needs special handling. It needs to + * be coordinated with values set via the v2 config system (rsyslog v6+). What we do + * is we do not permit this directive after the v2 config system has been used to set + * the parameter. */ +rsRetVal +setLegacyDfltTpl(void __attribute__((unused)) *pVal, uchar* newVal) +{ + DEFiRet; + + if(loadModConf != NULL && loadModConf->tplName != NULL) { + free(newVal); + errmsg.LogError(0, RS_RET_ERR, "omuxsock default template already set via module " + "global parameter - can no longer be changed"); + ABORT_FINALIZE(RS_RET_ERR); + } + free(cs.tplName); + cs.tplName = newVal; +finalize_it: + RETiRet; +} + + static inline rsRetVal closeSocket(instanceData *pData) { @@ -96,6 +153,72 @@ pData->bIsConnected = 0; // TODO: remove this variable altogether + +BEGINbeginCnfLoad +CODESTARTbeginCnfLoad + loadModConf = pModConf; + pModConf->pConf = pConf; + pModConf->tplName = NULL; +ENDbeginCnfLoad + +BEGINsetModCnf + struct cnfparamvals *pvals = NULL; + int i; +CODESTARTsetModCnf + pvals = nvlstGetParams(lst, &modpblk, NULL); + if(pvals == NULL) { + errmsg.LogError(0, RS_RET_MISSING_CNFPARAMS, "error processing module " + "config parameters [module(...)]"); + ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); + } + + if(Debug) { + dbgprintf("module (global) param blk for omuxsock:\n"); + cnfparamsPrint(&modpblk, pvals); + } + + for(i = 0 ; i < modpblk.nParams ; ++i) { + if(!pvals[i].bUsed) + continue; + if(!strcmp(modpblk.descr[i].name, "template")) { + loadModConf->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + if(cs.tplName != NULL) { + errmsg.LogError(0, RS_RET_DUP_PARAM, "omuxsock: warning: default template " + "was already set via legacy directive - may lead to inconsistent " + "results."); + } + } else { + dbgprintf("omuxsock: program error, non-handled " + "param '%s' in beginCnfLoad\n", modpblk.descr[i].name); + } + } +finalize_it: + if(pvals != NULL) + cnfparamvalsDestruct(pvals, &modpblk); +ENDsetModCnf + +BEGINendCnfLoad +CODESTARTendCnfLoad + loadModConf = NULL; /* done loading */ + /* free legacy config vars */ + free(cs.tplName); + cs.tplName = NULL; +ENDendCnfLoad + +BEGINcheckCnf +CODESTARTcheckCnf +ENDcheckCnf + +BEGINactivateCnf +CODESTARTactivateCnf + runModConf = pModConf; +ENDactivateCnf + +BEGINfreeCnf +CODESTARTfreeCnf + free(pModConf->tplName); +ENDfreeCnf + BEGINcreateInstance CODESTARTcreateInstance pData->sock = INVLD_SOCK; @@ -250,8 +373,7 @@ CODE_STD_STRING_REQUESTparseSelectorAct(1) /* check if a non-standard template is to be applied */ if(*(p-1) == ';') --p; - CHKiRet(cflineParseTemplateName(&p, *ppOMSR, 0, 0, cs.tplName == NULL ? UCHAR_CONSTANT("RSYSLOG_TraditionalForwardFormat") - : cs.tplName )); + CHKiRet(cflineParseTemplateName(&p, *ppOMSR, 0, 0, getDfltTpl())); if(cs.sockName == NULL) { errmsg.LogError(0, RS_RET_NO_SOCK_CONFIGURED, "No output socket configured for omuxsock\n"); @@ -291,6 +413,8 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_CONF2_QUERIES +CODEqueryEtryPt_STD_CONF2_setModCnf_QUERIES ENDqueryEtryPt @@ -312,7 +436,7 @@ CODEmodInit_QueryRegCFSLineHdlr CHKiRet(objUse(glbl, CORE_COMPONENT)); CHKiRet(objUse(errmsg, CORE_COMPONENT)); - CHKiRet(regCfSysLineHdlr((uchar *)"omuxsockdefaulttemplate", 0, eCmdHdlrGetWord, NULL, &cs.tplName, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"omuxsockdefaulttemplate", 0, eCmdHdlrGetWord, setLegacyDfltTpl, NULL, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"omuxsocksocket", 0, eCmdHdlrGetWord, NULL, &cs.sockName, NULL)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID)); ENDmodInit diff --git a/plugins/omzmq3/Makefile.am b/plugins/omzmq3/Makefile.am new file mode 100644 index 00000000..92cd7586 --- /dev/null +++ b/plugins/omzmq3/Makefile.am @@ -0,0 +1,8 @@ +pkglib_LTLIBRARIES = omzmq3.la + +omzmq3_la_SOURCES = omzmq3.c +omzmq3_la_CPPFLAGS = $(RSRT_CFLAGS) $(PTHREADS_CFLAGS) $(CZMQ_CFLAGS) +omzmq3_la_LDFLAGS = -module -avoid-version +omzmq3_la_LIBADD = $(CZMQ_LIBS) + +EXTRA_DIST = diff --git a/plugins/omzmq3/README b/plugins/omzmq3/README new file mode 100644 index 00000000..ccc96c74 --- /dev/null +++ b/plugins/omzmq3/README @@ -0,0 +1,25 @@ +ZeroMQ 3.x Output Plugin + +Building this plugin: +Requires libzmq and libczmq. First, install libzmq from the HEAD on github: +http://github.com/zeromq/libzmq. You can clone the repository, build, then +install it. The directions for doing so are there in the readme. Then, do +the same for libczmq: http://github.com/zeromq/czmq. At some point, the 3.1 +version of libzmq will be released, and a supporting version of libczmq. +At that time, you could simply download and install the tarballs instead of +using git to clone the repositories. Those tarballs (when available) can +be found at http://download.zeromq.org. As of this writing (5/31/2012), the +most recent version of czmq (1.1.0) and libzmq (3.1.0-beta) will not compile +properly. + +Omzmq3 allows you to push data out of rsyslog from a zeromq socket. The example +below binds a PUB socket to port 7171, and any message fitting the criteria will +be output to the zmq socket. + +Example Rsyslog.conf snippet (NOTE: v6 format): +------------------------------------------------------------------------------- +if $msg then { + action(type="omzmq3", sockType="PUB", action="BIND", + description="tcp://*:7172) +} +------------------------------------------------------------------------------- diff --git a/plugins/omzmq3/omzmq3.c b/plugins/omzmq3/omzmq3.c new file mode 100644 index 00000000..e13011fb --- /dev/null +++ b/plugins/omzmq3/omzmq3.c @@ -0,0 +1,462 @@ +/* omzmq3.c + * Copyright 2012 Talksum, Inc + * Using the czmq interface to zeromq, we output + * to a zmq socket. + + +* +* This program is free software: you can redistribute it and/or +* modify it under the terms of the GNU Lesser General Public License +* as published by the Free Software Foundation, either version 3 of +* the License, or (at your option) any later version. +* +* This program is distributed in the hope that it will be useful, but +* WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +* Lesser General Public License for more details. +* +* You should have received a copy of the GNU Lesser General Public +* License along with this program. If not, see +* <http://www.gnu.org/licenses/>. +* +* Author: David Kelly +* <davidk@talksum.com> +*/ + + +#include "config.h" +#include "rsyslog.h" +#include <stdio.h> +#include <stdarg.h> +#include <stdlib.h> +#include <string.h> +#include <assert.h> +#include <signal.h> +#include <errno.h> +#include <unistd.h> +#include "conf.h" +#include "syslogd-types.h" +#include "srUtils.h" +#include "template.h" +#include "module-template.h" +#include "errmsg.h" +#include "cfsysline.h" + +#include <czmq.h> + +MODULE_TYPE_OUTPUT +MODULE_TYPE_NOKEEP +MODULE_CNFNAME("omzmq3") + +DEF_OMOD_STATIC_DATA +DEFobjCurrIf(errmsg) + +/* convienent symbols to denote a socket we want to bind + vs one we want to just connect to +*/ +#define ACTION_CONNECT 1 +#define ACTION_BIND 2 + + +/* ---------------------------------------------------------------------------- + * structs to describe sockets + */ +struct socket_type { + char* name; + int type; +}; + +/* more overkill, but seems nice to be consistent. */ +struct socket_action { + char* name; + int action; +}; + +typedef struct _instanceData { + void* socket; + uchar* description; + int type; + int action; + int sndHWM; + int rcvHWM; + uchar* identity; + int sndBuf; + int rcvBuf; + int linger; + int backlog; + int sndTimeout; + int rcvTimeout; + int maxMsgSize; + int rate; + int recoveryIVL; + int multicastHops; + int reconnectIVL; + int reconnectIVLMax; + int ipv4Only; + int affinity; + uchar* tplName; +} instanceData; + + +/* ---------------------------------------------------------------------------- + * Static definitions/initializations + */ + +/* only 1 zctx for all the sockets, with an adjustable number of + worker threads which may be useful if we use affinity in particular + sockets +*/ +static zctx_t* s_context = NULL; +static int s_workerThreads = -1; + +static struct socket_type types[] = { + {"PUB", ZMQ_PUB }, + {"PUSH", ZMQ_PUSH }, + {"XPUB", ZMQ_XPUB } +}; + +static struct socket_action actions[] = { + {"BIND", ACTION_BIND}, + {"CONNECT", ACTION_CONNECT}, +}; + +static struct cnfparamdescr actpdescr[] = { + { "description", eCmdHdlrGetWord, 0 }, + { "sockType", eCmdHdlrGetWord, 0 }, + { "action", eCmdHdlrGetWord, 0 }, + { "sndHWM", eCmdHdlrInt, 0 }, + { "rcvHWM", eCmdHdlrInt, 0 }, + { "identity", eCmdHdlrGetWord, 0 }, + { "sndBuf", eCmdHdlrInt, 0 }, + { "rcvBuf", eCmdHdlrInt, 0 }, + { "linger", eCmdHdlrInt, 0 }, + { "backlog", eCmdHdlrInt, 0 }, + { "sndTimeout", eCmdHdlrInt, 0 }, + { "rcvTimeout", eCmdHdlrInt, 0 }, + { "maxMsgSize", eCmdHdlrInt, 0 }, + { "rate", eCmdHdlrInt, 0 }, + { "recoveryIVL", eCmdHdlrInt, 0 }, + { "multicastHops", eCmdHdlrInt, 0 }, + { "reconnectIVL", eCmdHdlrInt, 0 }, + { "reconnectIVLMax", eCmdHdlrInt, 0 }, + { "ipv4Only", eCmdHdlrInt, 0 }, + { "affinity", eCmdHdlrInt, 0 }, + { "globalWorkerThreads", eCmdHdlrInt, 0 }, + { "template", eCmdHdlrGetWord, 1 } +}; + +static struct cnfparamblk actpblk = { + CNFPARAMBLK_VERSION, + sizeof(actpdescr)/sizeof(struct cnfparamdescr), + actpdescr +}; + +/* ---------------------------------------------------------------------------- + * Helper Functions + */ + +/* get the name of a socket type, return the ZMQ_XXX type + or -1 if not a supported type (see above) +*/ +int getSocketType(char* name) { + int type = -1; + uint i; + for(i=0; i<sizeof(types)/sizeof(struct socket_type); ++i) { + if( !strcmp(types[i].name, name) ) { + type = types[i].type; + break; + } + } + return type; +} + + +static int getSocketAction(char* name) { + int action = -1; + uint i; + for(i=0; i < sizeof(actions)/sizeof(struct socket_action); ++i) { + if(!strcmp(actions[i].name, name)) { + action = actions[i].action; + break; + } + } + return action; +} + +/* closeZMQ will destroy the context and + * associated socket + */ +static void closeZMQ(instanceData* pData) { + errmsg.LogError(0, NO_ERRCODE, "closeZMQ called"); + if(s_context && pData->socket) { + if(pData->socket != NULL) { + zsocket_destroy(s_context, pData->socket); + } + } +} + + +static rsRetVal initZMQ(instanceData* pData) { + DEFiRet; + + /* create the context if necessary. */ + if (NULL == s_context) { + s_context = zctx_new(); + if (s_workerThreads > 0) zctx_set_iothreads(s_context, s_workerThreads); + } + + pData->socket = zsocket_new(s_context, pData->type); + + /* ALWAYS set the HWM as the zmq3 default is 1000 and we default + to 0 (infinity) */ + zsocket_set_rcvhwm(pData->socket, pData->rcvHWM); + zsocket_set_sndhwm(pData->socket, pData->sndHWM); + + /* use czmq defaults for these, unless set to non-default values */ + if(pData->identity) zsocket_set_identity(pData->socket, (char*)pData->identity); + if(pData->sndBuf > -1) zsocket_set_sndbuf(pData->socket, pData->sndBuf); + if(pData->rcvBuf > -1) zsocket_set_sndbuf(pData->socket, pData->rcvBuf); + if(pData->linger > -1) zsocket_set_linger(pData->socket, pData->linger); + if(pData->backlog > -1) zsocket_set_backlog(pData->socket, pData->backlog); + if(pData->sndTimeout > -1) zsocket_set_sndtimeo(pData->socket, pData->sndTimeout); + if(pData->rcvTimeout > -1) zsocket_set_rcvtimeo(pData->socket, pData->rcvTimeout); + if(pData->maxMsgSize > -1) zsocket_set_maxmsgsize(pData->socket, pData->maxMsgSize); + if(pData->rate > -1) zsocket_set_rate(pData->socket, pData->rate); + if(pData->recoveryIVL > -1) zsocket_set_recovery_ivl(pData->socket, pData->recoveryIVL); + if(pData->multicastHops > -1) zsocket_set_multicast_hops(pData->socket, pData->multicastHops); + if(pData->reconnectIVL > -1) zsocket_set_reconnect_ivl(pData->socket, pData->reconnectIVL); + if(pData->reconnectIVLMax > -1) zsocket_set_reconnect_ivl_max(pData->socket, pData->reconnectIVLMax); + if(pData->ipv4Only > -1) zsocket_set_ipv4only(pData->socket, pData->ipv4Only); + if(pData->affinity != 1) zsocket_set_affinity(pData->socket, pData->affinity); + + /* bind or connect to it */ + if (pData->action == ACTION_BIND) { + /* bind asserts, so no need to test return val here + which isn't the greatest api -- oh well */ + zsocket_bind(pData->socket, (char*)pData->description); + } else { + if(zsocket_connect(pData->socket, (char*)pData->description) == -1) { + errmsg.LogError(0, RS_RET_SUSPENDED, "omzmq3: connect failed!"); + ABORT_FINALIZE(RS_RET_SUSPENDED); + } + } + finalize_it: + RETiRet; +} + +rsRetVal writeZMQ(uchar* msg, instanceData* pData) { + DEFiRet; + + /* initialize if necessary */ + if(NULL == pData->socket) + CHKiRet(initZMQ(pData)); + + /* send it */ + int result = zstr_send(pData->socket, (char*)msg); + + /* whine if things went wrong */ + if (result == -1) { + errmsg.LogError(0, NO_ERRCODE, "omzmq3: send of %s failed with return %d", msg, result); + ABORT_FINALIZE(RS_RET_ERR); + } + finalize_it: + RETiRet; +} + +static inline void +setInstParamDefaults(instanceData* pData) { + pData->description = (uchar*)"tcp://*:7171"; + pData->socket = NULL; + pData->tplName = NULL; + pData->type = ZMQ_PUB; + pData->action = ACTION_BIND; + pData->sndHWM = 0; /*unlimited*/ + pData->rcvHWM = 0; /*unlimited*/ + pData->identity = NULL; + pData->sndBuf = -1; + pData->rcvBuf = -1; + pData->linger = -1; + pData->backlog = -1; + pData->sndTimeout = -1; + pData->rcvTimeout = -1; + pData->maxMsgSize = -1; + pData->rate = -1; + pData->recoveryIVL = -1; + pData->multicastHops = -1; + pData->reconnectIVL = -1; + pData->reconnectIVLMax = -1; + pData->ipv4Only = -1; + pData->affinity = 1; +} + + +/* ---------------------------------------------------------------------------- + * Output Module Functions + */ + +BEGINcreateInstance +CODESTARTcreateInstance +ENDcreateInstance + +BEGINisCompatibleWithFeature +CODESTARTisCompatibleWithFeature + if(eFeat == sFEATURERepeatedMsgReduction) + iRet = RS_RET_OK; +ENDisCompatibleWithFeature + + +BEGINdbgPrintInstInfo +CODESTARTdbgPrintInstInfo +ENDdbgPrintInstInfo + +BEGINfreeInstance +CODESTARTfreeInstance + closeZMQ(pData); + free(pData->description); + free(pData->tplName); +ENDfreeInstance + +BEGINtryResume +CODESTARTtryResume + if(NULL == pData->socket) + iRet = initZMQ(pData); +ENDtryResume + +BEGINdoAction +CODESTARTdoAction +iRet = writeZMQ(ppString[0], pData); +ENDdoAction + + +BEGINnewActInst + struct cnfparamvals *pvals; + int i; +CODESTARTnewActInst +if((pvals = nvlstGetParams(lst, &actpblk, NULL)) == NULL) { + ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); + } + +CHKiRet(createInstance(&pData)); +setInstParamDefaults(pData); + +CODE_STD_STRING_REQUESTparseSelectorAct(1) +for(i = 0 ; i < actpblk.nParams ; ++i) { + if(!pvals[i].bUsed) + continue; + if(!strcmp(actpblk.descr[i].name, "description")) { + pData->description = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "template")) { + pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "sockType")){ + pData->type = getSocketType(es_str2cstr(pvals[i].val.d.estr, NULL)); + } else if(!strcmp(actpblk.descr[i].name, "action")){ + pData->action = getSocketAction(es_str2cstr(pvals[i].val.d.estr, NULL)); + } else if(!strcmp(actpblk.descr[i].name, "sndHWM")) { + pData->sndHWM = (int) pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "rcvHWM")) { + pData->rcvHWM = (int) pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "identity")){ + pData->identity = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "sndBuf")) { + pData->sndBuf = (int) pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "rcvBuf")) { + pData->rcvBuf = (int) pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "linger")) { + pData->linger = (int) pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "backlog")) { + pData->backlog = (int) pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "sndTimeout")) { + pData->sndTimeout = (int) pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "rcvTimeout")) { + pData->rcvTimeout = (int) pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "maxMsgSize")) { + pData->maxMsgSize = (int) pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "rate")) { + pData->rate = (int) pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "recoveryIVL")) { + pData->recoveryIVL = (int) pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "multicastHops")) { + pData->multicastHops = (int) pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "reconnectIVL")) { + pData->reconnectIVL = (int) pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "reconnectIVLMax")) { + pData->reconnectIVLMax = (int) pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "ipv4Only")) { + pData->ipv4Only = (int) pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "affinity")) { + pData->affinity = (int) pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "globalWorkerThreads")) { + s_workerThreads = (int) pvals[i].val.d.n; + } else { + errmsg.LogError(0, NO_ERRCODE, "omzmq3: program error, non-handled " + "param '%s'\n", actpblk.descr[i].name); + } + } + +if(pData->tplName == NULL) { + CHKiRet(OMSRsetEntry(*ppOMSR, 0, NULL, OMSR_TPL_AS_MSG)); + } else { + CHKiRet(OMSRsetEntry(*ppOMSR, 0, (uchar*)pData->tplName, OMSR_NO_RQD_TPL_OPTS)); + } + +if(pData->type == -1) { + errmsg.LogError(0, RS_RET_CONFIG_ERROR, "omzmq3: unknown socket type."); + ABORT_FINALIZE(RS_RET_CONFIG_ERROR); + } +if(pData->action == -1) { + errmsg.LogError(0, RS_RET_CONFIG_ERROR, "omzmq3: unknown socket action"); + ABORT_FINALIZE(RS_RET_CONFIG_ERROR); + } + + +CODE_STD_FINALIZERnewActInst + cnfparamvalsDestruct(pvals, &actpblk); +ENDnewActInst + +BEGINparseSelectorAct +CODESTARTparseSelectorAct + +/* tell the engine we only want one template string */ +CODE_STD_STRING_REQUESTparseSelectorAct(1) + if(!strncmp((char*) p, ":omzmq3:", sizeof(":omzmq3:") - 1)) + errmsg.LogError(0, RS_RET_LEGA_ACT_NOT_SUPPORTED, + "omzmq3 supports only v6 config format, use: " + "action(type=\"omzmq3\" serverport=...)"); + ABORT_FINALIZE(RS_RET_CONFLINE_UNPROCESSED); +CODE_STD_FINALIZERparseSelectorAct +ENDparseSelectorAct + +BEGINinitConfVars /* (re)set config variables to defaults */ +CODESTARTinitConfVars +s_workerThreads = -1; +ENDinitConfVars + +BEGINmodExit +CODESTARTmodExit +if(NULL != s_context) { + zctx_destroy(&s_context); + s_context=NULL; + } +ENDmodExit + + +BEGINqueryEtryPt +CODESTARTqueryEtryPt +CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES +ENDqueryEtryPt + +BEGINmodInit() +CODESTARTmodInit + *ipIFVersProvided = CURR_MOD_IF_VERSION; /* only supports rsyslog 6 configs */ +CODEmodInit_QueryRegCFSLineHdlr + CHKiRet(objUse(errmsg, CORE_COMPONENT)); + INITChkCoreFeature(bCoreSupportsBatching, CORE_FEATURE_BATCHING); + DBGPRINTF("omzmq3: module compiled with rsyslog version %s.\n", VERSION); + +INITLegCnfVars +CHKiRet(omsdRegCFSLineHdlr((uchar *)"omzmq3workerthreads", 0, eCmdHdlrInt, NULL, &s_workerThreads, STD_LOADABLE_MODULE_ID)); +ENDmodInit + + + |