summaryrefslogtreecommitdiffstats
path: root/plugins/imptcp/imptcp.c
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/imptcp/imptcp.c')
-rw-r--r--plugins/imptcp/imptcp.c214
1 files changed, 193 insertions, 21 deletions
diff --git a/plugins/imptcp/imptcp.c b/plugins/imptcp/imptcp.c
index aa1ad81e..8150fc33 100644
--- a/plugins/imptcp/imptcp.c
+++ b/plugins/imptcp/imptcp.c
@@ -90,6 +90,8 @@ DEFobjCurrIf(statsobj)
/* forward references */
static void * wrkr(void *myself);
+#define DFLT_wrkrMax 2
+
/* config settings */
typedef struct configSettings_s {
int bKeepAlive; /* support keep-alive packets */
@@ -127,12 +129,45 @@ struct modConfData_s {
rsconf_t *pConf; /* our overall config object */
instanceConf_t *root, *tail;
int wrkrMax;
+ sbool configSetViaV2Method;
};
static modConfData_t *loadModConf = NULL;/* modConf ptr to use for the current load process */
static modConfData_t *runModConf = NULL;/* modConf ptr to use for the current load process */
+/* module-global parameters */
+static struct cnfparamdescr modpdescr[] = {
+ { "threads", eCmdHdlrPositiveInt, 0 }
+};
+static struct cnfparamblk modpblk =
+ { CNFPARAMBLK_VERSION,
+ sizeof(modpdescr)/sizeof(struct cnfparamdescr),
+ modpdescr
+ };
+
+/* input instance parameters */
+static struct cnfparamdescr inppdescr[] = {
+ { "port", eCmdHdlrString, CNFPARAM_REQUIRED }, /* legacy: InputTCPServerRun */
+ { "address", eCmdHdlrString, 0 },
+ { "name", eCmdHdlrString, 0 },
+ { "ruleset", eCmdHdlrString, 0 },
+ { "supportoctetcountedframing", eCmdHdlrBinary, 0 },
+ { "notifyonconnectionclose", eCmdHdlrBinary, 0 },
+ { "keepalive", eCmdHdlrBinary, 0 },
+ { "keepalive.probes", eCmdHdlrInt, 0 },
+ { "keepalive.time", eCmdHdlrInt, 0 },
+ { "keepalive.interval", eCmdHdlrInt, 0 },
+ { "addtlframedelimiter", eCmdHdlrInt, 0 },
+};
+static struct cnfparamblk inppblk =
+ { CNFPARAMBLK_VERSION,
+ sizeof(inppdescr)/sizeof(struct cnfparamdescr),
+ inppdescr
+ };
+
#include "im-helper.h" /* must be included AFTER the type definitions! */
+static int bLegacyCnfModGlobalsPermitted;/* are legacy module-global config parameters permitted? */
+
/* data elements describing our running config */
typedef struct ptcpsrv_s ptcpsrv_t;
typedef struct ptcplstn_s ptcplstn_t;
@@ -379,7 +414,9 @@ startupSrv(ptcpsrv_t *pSrv)
#endif
) {
/* TODO: check if *we* bound the socket - else we *have* an error! */
- DBGPRINTF("error %d while binding tcp socket", errno);
+ char errStr[1024];
+ rs_strerror_r(errno, errStr, sizeof(errStr));
+ dbgprintf("error %d while binding tcp socket: %s\n", errno, errStr);
close(sock);
sock = -1;
continue;
@@ -812,7 +849,7 @@ static inline void
initConfigSettings(void)
{
cs.bEmitMsgOnClose = 0;
- cs.wrkrMax = 2;
+ cs.wrkrMax = DFLT_wrkrMax;
cs.bSuppOctetFram = 1;
cs.iAddtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER;
cs.pszInputName = NULL;
@@ -986,7 +1023,45 @@ closeSess(ptcpsess_t *pSess)
destructSess(pSess);
finalize_it:
- DBGPRINTF("imtcp: session on socket %d closed with iRet %d.\n", sock, iRet);
+ DBGPRINTF("imptcp: session on socket %d closed with iRet %d.\n", sock, iRet);
+ RETiRet;
+}
+
+
+/* create input instance, set default paramters, and
+ * add it to the list of instances.
+ */
+static rsRetVal
+createInstance(instanceConf_t **pinst)
+{
+ instanceConf_t *inst;
+ DEFiRet;
+ CHKmalloc(inst = MALLOC(sizeof(instanceConf_t)));
+ inst->next = NULL;
+
+ inst->pszBindPort = NULL;
+ inst->pszBindAddr = NULL;
+ inst->pszBindRuleset = NULL;
+ inst->pszInputName = NULL;
+ inst->bSuppOctetFram = 1;
+ inst->bKeepAlive = 0;
+ inst->iKeepAliveIntvl = 0;
+ inst->iKeepAliveProbes = 0;
+ inst->iKeepAliveTime = 0;
+ inst->bEmitMsgOnClose = 0;
+ inst->iAddtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER;
+ inst->pBindRuleset = NULL;
+
+ /* node created, let's add to config */
+ if(loadModConf->tail == NULL) {
+ loadModConf->tail = loadModConf->root = inst;
+ } else {
+ loadModConf->tail->next = inst;
+ loadModConf->tail = inst;
+ }
+
+ *pinst = inst;
+finalize_it:
RETiRet;
}
@@ -1000,7 +1075,7 @@ static rsRetVal addInstance(void __attribute__((unused)) *pVal, uchar *pNewVal)
instanceConf_t *inst;
DEFiRet;
- CHKmalloc(inst = MALLOC(sizeof(instanceConf_t)));
+ CHKiRet(createInstance(&inst));
if(pNewVal == NULL || *pNewVal == '\0') {
errmsg.LogError(0, NO_ERRCODE, "imptcp: port number must be specified, listener ignored");
}
@@ -1032,15 +1107,6 @@ static rsRetVal addInstance(void __attribute__((unused)) *pVal, uchar *pNewVal)
inst->iKeepAliveTime = cs.iKeepAliveTime;
inst->bEmitMsgOnClose = cs.bEmitMsgOnClose;
inst->iAddtlFrameDelim = cs.iAddtlFrameDelim;
- inst->next = NULL;
-
- /* node created, let's add to config */
- if(loadModConf->tail == NULL) {
- loadModConf->tail = loadModConf->root = inst;
- } else {
- loadModConf->tail->next = inst;
- loadModConf->tail = inst;
- }
finalize_it:
free(pNewVal);
@@ -1103,6 +1169,7 @@ startWorkerPool(void)
wrkrRunning = 0;
if(runModConf->wrkrMax > 16)
runModConf->wrkrMax = 16; /* TODO: make dynamic? */
+ DBGPRINTF("imptcp: starting worker pool, %d workers\n", runModConf->wrkrMax);
pthread_mutex_init(&wrkrMut, NULL);
pthread_cond_init(&wrkrIdle, NULL);
for(i = 0 ; i < runModConf->wrkrMax ; ++i) {
@@ -1121,6 +1188,7 @@ static inline void
stopWorkerPool(void)
{
int i;
+ DBGPRINTF("imptcp: stoping worker pool\n");
for(i = 0 ; i < runModConf->wrkrMax ; ++i) {
pthread_cond_signal(&wrkrInfo[i].run); /* awake wrkr if not running */
pthread_join(wrkrInfo[i].tid, NULL);
@@ -1129,7 +1197,6 @@ stopWorkerPool(void)
}
pthread_cond_destroy(&wrkrIdle);
pthread_mutex_destroy(&wrkrMut);
-
}
@@ -1213,7 +1280,7 @@ sessActivity(ptcpsess_t *pSess)
if(lenRcv > 0) {
/* have data, process it */
- DBGPRINTF("imtcp: data(%d) on socket %d: %s\n", lenBuf, pSess->sock, rcvBuf);
+ DBGPRINTF("imptcp: data(%d) on socket %d: %s\n", lenBuf, pSess->sock, rcvBuf);
CHKiRet(DataRcvd(pSess, rcvBuf, lenRcv));
} else if (lenRcv == 0) {
/* session was closed, do clean-up */
@@ -1229,7 +1296,7 @@ sessActivity(ptcpsess_t *pSess)
} else {
if(errno == EAGAIN || errno == EWOULDBLOCK)
break;
- DBGPRINTF("imtcp: error on session socket %d - closed.\n", pSess->sock);
+ DBGPRINTF("imptcp: error on session socket %d - closed.\n", pSess->sock);
closeSess(pSess); /* try clean-up by dropping session */
break;
}
@@ -1345,19 +1412,121 @@ wrkr(void *myself)
}
+BEGINnewInpInst
+ struct cnfparamvals *pvals;
+ instanceConf_t *inst;
+ int i;
+CODESTARTnewInpInst
+ DBGPRINTF("newInpInst (imptcp)\n");
+
+ pvals = nvlstGetParams(lst, &inppblk, NULL);
+ if(pvals == NULL) {
+ errmsg.LogError(0, RS_RET_MISSING_CNFPARAMS,
+ "imptcp: required parameter are missing\n");
+ ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
+ }
+
+ if(Debug) {
+ dbgprintf("input param blk in imptcp:\n");
+ cnfparamsPrint(&inppblk, pvals);
+ }
+
+ CHKiRet(createInstance(&inst));
+
+ for(i = 0 ; i < inppblk.nParams ; ++i) {
+ if(!pvals[i].bUsed)
+ continue;
+ if(!strcmp(inppblk.descr[i].name, "port")) {
+ inst->pszBindPort = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(inppblk.descr[i].name, "address")) {
+ inst->pszBindAddr = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(inppblk.descr[i].name, "name")) {
+ inst->pszInputName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(inppblk.descr[i].name, "ruleset")) {
+ inst->pszBindRuleset = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(inppblk.descr[i].name, "supportOctetCountedFraming")) {
+ inst->bSuppOctetFram = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "keepalive")) {
+ inst->bKeepAlive = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "keepalive.probes")) {
+ inst->iKeepAliveProbes = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "keepalive.time")) {
+ inst->iKeepAliveTime = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "keepalive.interval")) {
+ inst->iKeepAliveIntvl = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "addtlframedelimiter")) {
+ inst->iAddtlFrameDelim = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "notifyonconnectionclose")) {
+ inst->bEmitMsgOnClose = (int) pvals[i].val.d.n;
+ } else {
+ dbgprintf("imptcp: program error, non-handled "
+ "param '%s'\n", inppblk.descr[i].name);
+ }
+ }
+finalize_it:
+CODE_STD_FINALIZERnewInpInst
+ cnfparamvalsDestruct(pvals, &inppblk);
+ENDnewInpInst
+
+
BEGINbeginCnfLoad
CODESTARTbeginCnfLoad
loadModConf = pModConf;
pModConf->pConf = pConf;
+ /* init our settings */
+ loadModConf->wrkrMax = DFLT_wrkrMax;
+ loadModConf->configSetViaV2Method = 0;
+ bLegacyCnfModGlobalsPermitted = 1;
/* init legacy config vars */
initConfigSettings();
ENDbeginCnfLoad
+BEGINsetModCnf
+ struct cnfparamvals *pvals = NULL;
+ int i;
+CODESTARTsetModCnf
+ pvals = nvlstGetParams(lst, &modpblk, NULL);
+ if(pvals == NULL) {
+ errmsg.LogError(0, RS_RET_MISSING_CNFPARAMS, "imptcp: error processing module "
+ "config parameters [module(...)]");
+ ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
+ }
+
+ if(Debug) {
+ dbgprintf("module (global) param blk for imptcp:\n");
+ cnfparamsPrint(&modpblk, pvals);
+ }
+
+ for(i = 0 ; i < modpblk.nParams ; ++i) {
+ if(!pvals[i].bUsed)
+ continue;
+ if(!strcmp(modpblk.descr[i].name, "threads")) {
+ loadModConf->wrkrMax = (int) pvals[i].val.d.n;
+ } else {
+ dbgprintf("imptcp: program error, non-handled "
+ "param '%s' in beginCnfLoad\n", modpblk.descr[i].name);
+ }
+ }
+
+ /* remove all of our legacy handlers, as they can not used in addition
+ * the the new-style config method.
+ */
+ bLegacyCnfModGlobalsPermitted = 0;
+ loadModConf->configSetViaV2Method = 1;
+
+finalize_it:
+ if(pvals != NULL)
+ cnfparamvalsDestruct(pvals, &modpblk);
+ENDsetModCnf
+
+
BEGINendCnfLoad
CODESTARTendCnfLoad
- /* persist module-specific settings from legacy config system */
- loadModConf->wrkrMax = cs.wrkrMax;
+ if(!loadModConf->configSetViaV2Method) {
+ /* persist module-specific settings from legacy config system */
+ loadModConf->wrkrMax = cs.wrkrMax;
+ }
loadModConf = NULL; /* done loading */
/* free legacy config vars */
@@ -1541,7 +1710,7 @@ static rsRetVal
resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal)
{
cs.bEmitMsgOnClose = 0;
- cs.wrkrMax = 2;
+ cs.wrkrMax = DFLT_wrkrMax;
cs.bKeepAlive = 0;
cs.iKeepAliveProbes = 0;
cs.iKeepAliveTime = 0;
@@ -1567,7 +1736,9 @@ BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_IMOD_QUERIES
CODEqueryEtryPt_STD_CONF2_QUERIES
+CODEqueryEtryPt_STD_CONF2_setModCnf_QUERIES
CODEqueryEtryPt_STD_CONF2_PREPRIVDROP_QUERIES
+CODEqueryEtryPt_STD_CONF2_IMOD_QUERIES
CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES
ENDqueryEtryPt
@@ -1609,14 +1780,15 @@ CODEmodInit_QueryRegCFSLineHdlr
eCmdHdlrBinary, NULL, &cs.bEmitMsgOnClose, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserveraddtlframedelimiter"), 0, eCmdHdlrInt,
NULL, &cs.iAddtlFrameDelim, STD_LOADABLE_MODULE_ID));
- CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverhelperthreads"), 0, eCmdHdlrInt,
- NULL, &cs.wrkrMax, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverinputname"), 0,
eCmdHdlrGetWord, NULL, &cs.pszInputName, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverlistenip"), 0,
eCmdHdlrGetWord, NULL, &cs.lstnIP, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverbindruleset"), 0,
eCmdHdlrGetWord, NULL, &cs.pszBindRuleset, STD_LOADABLE_MODULE_ID));
+ /* module-global parameters */
+ CHKiRet(regCfSysLineHdlr2(UCHAR_CONSTANT("inputptcpserverhelperthreads"), 0, eCmdHdlrInt,
+ NULL, &cs.wrkrMax, STD_LOADABLE_MODULE_ID, &bLegacyCnfModGlobalsPermitted));
CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("resetconfigvariables"), 1, eCmdHdlrCustomHandler,
resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID));
ENDmodInit