diff options
-rw-r--r-- | plugins/omelasticsearch/omelasticsearch.c | 262 | ||||
-rw-r--r-- | runtime/rsconf.c | 2 |
2 files changed, 217 insertions, 47 deletions
diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index 2b451eb4..79a24ffe 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -4,7 +4,7 @@ * NOTE: read comments in module-template.h for more specifics! * * Copyright 2011 Nathan Scott. - * Copyright 2009 Rainer Gerhards and Adiscon GmbH. + * Copyright 2009-2012 Rainer Gerhards and Adiscon GmbH. * * This file is part of rsyslog. * @@ -42,9 +42,11 @@ #include "errmsg.h" #include "statsobj.h" #include "cfsysline.h" +#include "unicode-helper.h" MODULE_TYPE_OUTPUT MODULE_TYPE_NOKEEP +MODULE_CNFNAME("omelasticsearch") /* internal structures */ DEF_OMOD_STATIC_DATA @@ -62,15 +64,38 @@ STATSCOUNTER_DEF(indexSuccess, mutIndexSuccess) */ typedef struct curl_slist HEADER; typedef struct _instanceData { + uchar *server; + int port; + uchar *searchIndex; + uchar *searchType; + uchar *tplName; + uchar *timeout; + sbool dynSrchIdx; + sbool dynSrchType; + sbool asyncRepl; CURL *curlHandle; /* libcurl session handle */ HEADER *postHeader; /* json POST request info */ } instanceData; -/* config variables */ -static int restPort = 9200; -static char *hostName = "localhost"; -static char *searchIndex = "system"; -static char *searchType = "events"; + +/* tables for interfacing with the v6 config system */ +/* action (instance) parameters */ +static struct cnfparamdescr actpdescr[] = { + { "server", eCmdHdlrGetWord, 0 }, + { "serverport", eCmdHdlrInt, 0 }, + { "searchindex", eCmdHdlrGetWord, 0 }, + { "searchtype", eCmdHdlrGetWord, 0 }, + { "dynsearchindex", eCmdHdlrBinary, 0 }, + { "dynsearchtype", eCmdHdlrBinary, 0 }, + { "asyncrepl", eCmdHdlrBinary, 0 }, + { "timeout", eCmdHdlrGetWord, 0 }, + { "template", eCmdHdlrGetWord, 1 } +}; +static struct cnfparamblk actpblk = + { CNFPARAMBLK_VERSION, + sizeof(actpdescr)/sizeof(struct cnfparamdescr), + actpdescr + }; BEGINcreateInstance CODESTARTcreateInstance @@ -92,6 +117,10 @@ CODESTARTfreeInstance curl_easy_cleanup(pData->curlHandle); pData->curlHandle = NULL; } + free(pData->server); + free(pData->searchIndex); + free(pData->searchType); + free(pData->tplName); ENDfreeInstance BEGINdbgPrintInstInfo @@ -102,12 +131,57 @@ BEGINtryResume CODESTARTtryResume ENDtryResume -rsRetVal -curlPost(instanceData *instance, uchar *message) + +static rsRetVal +setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2) +{ + char restURL[2048]; /* libcurl makes a copy, using the stack here is OK */ + uchar *searchIndex; + uchar *searchType; + + if(pData->dynSrchIdx) { + searchIndex = tpl1; + if(pData->dynSrchType) + searchType = tpl2; + else + searchType = pData->searchType; + } else { + searchIndex = pData->searchIndex; + if(pData->dynSrchType) + searchType = tpl1; + else + searchType = pData->searchType; + } + if(pData->asyncRepl) { + if(pData->timeout != NULL) { + snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s?" + "replication=async&timeout=%s", + pData->server, pData->port, searchIndex, searchType, + pData->timeout); + } else { + snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s?" + "replication=async", + pData->server, pData->port, searchIndex, searchType); + } + } else { + snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s", + pData->server, pData->port, searchIndex, searchType); + } + curl_easy_setopt(pData->curlHandle, CURLOPT_URL, restURL); + DBGPRINTF("omelasticsearch: using REST URL: '%s'\n", restURL); + return RS_RET_OK; +} + +static rsRetVal +curlPost(instanceData *instance, uchar *message, uchar *tpl1, uchar *tpl2) { CURLcode code; CURL *curl = instance->curlHandle; int length = strlen((char *)message); + DEFiRet; + + if(instance->dynSrchIdx || instance->dynSrchType) + CHKiRet(setCurlURL(instance, tpl1, tpl2)); curl_easy_setopt(curl, CURLOPT_WRITEDATA, (char *)message); curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (char *)message); @@ -124,11 +198,13 @@ curlPost(instanceData *instance, uchar *message) STATSCOUNTER_INC(indexSubmit, mutIndexSubmit); return RS_RET_OK; } +finalize_it: + RETiRet; } BEGINdoAction CODESTARTdoAction - CHKiRet(curlPost(pData, ppString[0])); + CHKiRet(curlPost(pData, ppString[0], ppString[1], ppString[2])); finalize_it: ENDdoAction @@ -160,10 +236,10 @@ curlResult(void *ptr, size_t size, size_t nmemb, void *userdata) return size * nmemb; } + static rsRetVal -curlSetup(instanceData *instance) +curlSetup(instanceData *pData) { - char restURL[2048]; /* libcurl makes a copy, using the stack here is OK */ HEADER *header; CURL *handle; @@ -172,41 +248,151 @@ curlSetup(instanceData *instance) return RS_RET_OBJ_CREATION_FAILED; } - snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s", - hostName, restPort, searchIndex, searchType); header = curl_slist_append(NULL, "Content-Type: text/json; charset=utf-8"); + curl_easy_setopt(handle, CURLOPT_HTTPHEADER, header); curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, curlResult); - curl_easy_setopt(handle, CURLOPT_HTTPHEADER, header); - curl_easy_setopt(handle, CURLOPT_URL, restURL); curl_easy_setopt(handle, CURLOPT_POST, 1); - instance->curlHandle = handle; - instance->postHeader = header; + pData->curlHandle = handle; + pData->postHeader = header; + + if(pData->dynSrchIdx == 0 && pData->dynSrchType == 0) { + /* in this case, we know no tpls are involved --> NULL OK! */ + setCurlURL(pData, NULL, NULL); + } - DBGPRINTF("omelasticsearch setup, using REST URL: %s\n", restURL); + if(Debug) { + if(pData->dynSrchIdx == 0 && pData->dynSrchType == 0) + dbgprintf("omelasticsearch setup, using static REST URL\n"); + else + dbgprintf("omelasticsearch setup, we have a dynamic REST URL\n"); + } return RS_RET_OK; } -BEGINparseSelectorAct -CODESTARTparseSelectorAct -CODE_STD_STRING_REQUESTparseSelectorAct(1) - if(strncmp((char*) p, ":omelasticsearch:", sizeof(":omelasticsearch:") - 1)) { - ABORT_FINALIZE(RS_RET_CONFLINE_UNPROCESSED); +static inline void +setInstParamDefaults(instanceData *pData) +{ + pData->server = NULL; + pData->port = 9200; + pData->searchIndex = NULL; + pData->searchType = NULL; + pData->timeout = NULL; + pData->dynSrchIdx = 0; + pData->dynSrchType = 0; + pData->asyncRepl = 0; + pData->tplName = NULL; +} + +BEGINnewActInst + struct cnfparamvals *pvals; + int i; + int iNumTpls; +CODESTARTnewActInst + if((pvals = nvlstGetParams(lst, &actpblk, NULL)) == NULL) { + ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); } - p += sizeof(":omelasticsearch:") - 1; /* eat indicator sequence (-1 because of '\0'!) */ + CHKiRet(createInstance(&pData)); + setInstParamDefaults(pData); + + for(i = 0 ; i < actpblk.nParams ; ++i) { + if(!pvals[i].bUsed) + continue; + if(!strcmp(actpblk.descr[i].name, "server")) { + pData->server = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "serverport")) { + pData->port = (int) pvals[i].val.d.n, NULL; + } else if(!strcmp(actpblk.descr[i].name, "searchindex")) { + pData->searchIndex = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "searchtype")) { + pData->searchType = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "dynsearchindex")) { + pData->dynSrchIdx = pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "dynsearchtype")) { + pData->dynSrchType = pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "timeout")) { + pData->timeout = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "asyncrepl")) { + pData->asyncRepl = pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "template")) { + pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else { + dbgprintf("omelasticsearch: program error, non-handled " + "param '%s'\n", actpblk.descr[i].name); + } + } + + if(pData->dynSrchIdx && pData->searchIndex == NULL) { + errmsg.LogError(0, RS_RET_LEGA_ACT_NOT_SUPPORTED, + "omelasticsearch: requested dynamic search index, but no " + "name for index template given - action definition invalid"); + ABORT_FINALIZE(RS_RET_LEGA_ACT_NOT_SUPPORTED); + } + if(pData->dynSrchType && pData->searchType == NULL) { + errmsg.LogError(0, RS_RET_LEGA_ACT_NOT_SUPPORTED, + "omelasticsearch: requested dynamic search type, but no " + "name for type template given - action definition invalid"); + ABORT_FINALIZE(RS_RET_LEGA_ACT_NOT_SUPPORTED); + } - /* check if a non-standard template is to be applied */ - if(*(p-1) == ';') - --p; - CHKiRet(cflineParseTemplateName(&p, *ppOMSR, 0, OMSR_NO_RQD_TPL_OPTS, (uchar*) " StdJSONFmt")); + iNumTpls = 1; + if(pData->dynSrchIdx) ++iNumTpls; + if(pData->dynSrchType) ++iNumTpls; + DBGPRINTF("omelasticsearch: requesting %d templates\n", iNumTpls); + CODE_STD_STRING_REQUESTparseSelectorAct(iNumTpls) + + CHKiRet(OMSRsetEntry(*ppOMSR, 0, (uchar*)strdup((pData->tplName == NULL) ? + " StdJSONFmt" : (char*)pData->tplName), + OMSR_NO_RQD_TPL_OPTS)); + + + /* we need to request additional templates. If we have a dynamic search index, + * it will always be string 1. Type may be 1 or 2, depending on whether search + * index is dynamic as well. Rule needs to be followed throughout the module. + */ + if(pData->dynSrchIdx) { + CHKiRet(OMSRsetEntry(*ppOMSR, 1, ustrdup(pData->searchIndex), + OMSR_NO_RQD_TPL_OPTS)); + if(pData->dynSrchType) { + CHKiRet(OMSRsetEntry(*ppOMSR, 2, ustrdup(pData->searchType), + OMSR_NO_RQD_TPL_OPTS)); + } + } else { + if(pData->dynSrchType) { + CHKiRet(OMSRsetEntry(*ppOMSR, 1, ustrdup(pData->searchType), + OMSR_NO_RQD_TPL_OPTS)); + } + } + + if(pData->server == NULL) + pData->server = (uchar*) strdup("localhost"); + if(pData->searchIndex == NULL) + pData->searchIndex = (uchar*) strdup("system"); + if(pData->searchType == NULL) + pData->searchType = (uchar*) strdup("events"); - /* all good, we can now initialise our private data */ CHKiRet(curlSetup(pData)); + +CODE_STD_FINALIZERnewActInst + cnfparamvalsDestruct(pvals, &actpblk); +ENDnewActInst + + +BEGINparseSelectorAct +CODESTARTparseSelectorAct +CODE_STD_STRING_REQUESTparseSelectorAct(1) + if(!strncmp((char*) p, ":omelasticsearch:", sizeof(":omelasticsearch:") - 1)) { + errmsg.LogError(0, RS_RET_LEGA_ACT_NOT_SUPPORTED, + "omelasticsearch supports only v6 config format, use: " + "action(type=\"omelasticsearch\" server=...)"); + } + ABORT_FINALIZE(RS_RET_CONFLINE_UNPROCESSED); CODE_STD_FINALIZERparseSelectorAct ENDparseSelectorAct + BEGINmodExit CODESTARTmodExit curl_global_cleanup(); @@ -219,18 +405,9 @@ BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES +CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES ENDqueryEtryPt -static rsRetVal -resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal) -{ - DEFiRet; - restPort = 9200; - hostName = "localhost"; - searchIndex = "system"; - searchType = "events"; - RETiRet; -} BEGINmodInit() CODESTARTmodInit @@ -239,13 +416,6 @@ CODEmodInit_QueryRegCFSLineHdlr CHKiRet(objUse(errmsg, CORE_COMPONENT)); CHKiRet(objUse(statsobj, CORE_COMPONENT)); - /* register config file handlers */ - CHKiRet(omsdRegCFSLineHdlr((uchar *)"elasticsearchindex", 0, eCmdHdlrGetWord, NULL, &searchIndex, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"elasticsearchtype", 0, eCmdHdlrGetWord, NULL, &searchType, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"elasticsearchhost", 0, eCmdHdlrGetWord, NULL, &hostName, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"elasticsearchport", 0, eCmdHdlrInt, NULL, &restPort, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID)); - if (curl_global_init(CURL_GLOBAL_ALL) != 0) { errmsg.LogError(0, RS_RET_OBJ_CREATION_FAILED, "CURL fail. -elasticsearch indexing disabled"); ABORT_FINALIZE(RS_RET_OBJ_CREATION_FAILED); diff --git a/runtime/rsconf.c b/runtime/rsconf.c index ed61e65e..ff9e1291 100644 --- a/runtime/rsconf.c +++ b/runtime/rsconf.c @@ -93,7 +93,7 @@ static uchar template_StdUsrMsgFmt[] = "\" %syslogtag%%msg%\n\r\""; static uchar template_StdDBFmt[] = "\"insert into SystemEvents (Message, Facility, FromHost, Priority, DeviceReportedTime, ReceivedAt, InfoUnitID, SysLogTag) values ('%msg%', %syslogfacility%, '%HOSTNAME%', %syslogpriority%, '%timereported:::date-mysql%', '%timegenerated:::date-mysql%', %iut%, '%syslogtag%')\",SQL"; static uchar template_StdPgSQLFmt[] = "\"insert into SystemEvents (Message, Facility, FromHost, Priority, DeviceReportedTime, ReceivedAt, InfoUnitID, SysLogTag) values ('%msg%', %syslogfacility%, '%HOSTNAME%', %syslogpriority%, '%timereported:::date-pgsql%', '%timegenerated:::date-pgsql%', %iut%, '%syslogtag%')\",STDSQL"; static uchar template_spoofadr[] = "\"%fromhost-ip%\""; -static uchar template_StdJSONFmt[] = "\"{\\\"message\\\":\\\"%msg%\\\",\\\"fromhost\\\":\\\"%HOSTNAME%\\\",\\\"facility\\\":\\\"%syslogfacility-text%\\\",\\\"priority\\\":\\\"%syslogpriority-text%\\\",\\\"timereported\\\":\\\"%timereported:::date-rfc3339%\\\",\\\"timegenerated\\\":\\\"%timegenerated:::date-rfc3339%\\\"}\",JSON"; +static uchar template_StdJSONFmt[] = "\"{\\\"message\\\":\\\"%msg:::json%\\\",\\\"fromhost\\\":\\\"%HOSTNAME:::json%\\\",\\\"facility\\\":\\\"%syslogfacility-text%\\\",\\\"priority\\\":\\\"%syslogpriority-text%\\\",\\\"timereported\\\":\\\"%timereported:::date-rfc3339%\\\",\\\"timegenerated\\\":\\\"%timegenerated:::date-rfc3339%\\\"}\""; /* end templates */ void cnfDoCfsysline(char *ln); |