summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--plugins/omelasticsearch/omelasticsearch.c262
-rw-r--r--runtime/rsconf.c2
2 files changed, 217 insertions, 47 deletions
diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c
index 2b451eb4..79a24ffe 100644
--- a/plugins/omelasticsearch/omelasticsearch.c
+++ b/plugins/omelasticsearch/omelasticsearch.c
@@ -4,7 +4,7 @@
* NOTE: read comments in module-template.h for more specifics!
*
* Copyright 2011 Nathan Scott.
- * Copyright 2009 Rainer Gerhards and Adiscon GmbH.
+ * Copyright 2009-2012 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of rsyslog.
*
@@ -42,9 +42,11 @@
#include "errmsg.h"
#include "statsobj.h"
#include "cfsysline.h"
+#include "unicode-helper.h"
MODULE_TYPE_OUTPUT
MODULE_TYPE_NOKEEP
+MODULE_CNFNAME("omelasticsearch")
/* internal structures */
DEF_OMOD_STATIC_DATA
@@ -62,15 +64,38 @@ STATSCOUNTER_DEF(indexSuccess, mutIndexSuccess)
*/
typedef struct curl_slist HEADER;
typedef struct _instanceData {
+ uchar *server;
+ int port;
+ uchar *searchIndex;
+ uchar *searchType;
+ uchar *tplName;
+ uchar *timeout;
+ sbool dynSrchIdx;
+ sbool dynSrchType;
+ sbool asyncRepl;
CURL *curlHandle; /* libcurl session handle */
HEADER *postHeader; /* json POST request info */
} instanceData;
-/* config variables */
-static int restPort = 9200;
-static char *hostName = "localhost";
-static char *searchIndex = "system";
-static char *searchType = "events";
+
+/* tables for interfacing with the v6 config system */
+/* action (instance) parameters */
+static struct cnfparamdescr actpdescr[] = {
+ { "server", eCmdHdlrGetWord, 0 },
+ { "serverport", eCmdHdlrInt, 0 },
+ { "searchindex", eCmdHdlrGetWord, 0 },
+ { "searchtype", eCmdHdlrGetWord, 0 },
+ { "dynsearchindex", eCmdHdlrBinary, 0 },
+ { "dynsearchtype", eCmdHdlrBinary, 0 },
+ { "asyncrepl", eCmdHdlrBinary, 0 },
+ { "timeout", eCmdHdlrGetWord, 0 },
+ { "template", eCmdHdlrGetWord, 1 }
+};
+static struct cnfparamblk actpblk =
+ { CNFPARAMBLK_VERSION,
+ sizeof(actpdescr)/sizeof(struct cnfparamdescr),
+ actpdescr
+ };
BEGINcreateInstance
CODESTARTcreateInstance
@@ -92,6 +117,10 @@ CODESTARTfreeInstance
curl_easy_cleanup(pData->curlHandle);
pData->curlHandle = NULL;
}
+ free(pData->server);
+ free(pData->searchIndex);
+ free(pData->searchType);
+ free(pData->tplName);
ENDfreeInstance
BEGINdbgPrintInstInfo
@@ -102,12 +131,57 @@ BEGINtryResume
CODESTARTtryResume
ENDtryResume
-rsRetVal
-curlPost(instanceData *instance, uchar *message)
+
+static rsRetVal
+setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2)
+{
+ char restURL[2048]; /* libcurl makes a copy, using the stack here is OK */
+ uchar *searchIndex;
+ uchar *searchType;
+
+ if(pData->dynSrchIdx) {
+ searchIndex = tpl1;
+ if(pData->dynSrchType)
+ searchType = tpl2;
+ else
+ searchType = pData->searchType;
+ } else {
+ searchIndex = pData->searchIndex;
+ if(pData->dynSrchType)
+ searchType = tpl1;
+ else
+ searchType = pData->searchType;
+ }
+ if(pData->asyncRepl) {
+ if(pData->timeout != NULL) {
+ snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s?"
+ "replication=async&timeout=%s",
+ pData->server, pData->port, searchIndex, searchType,
+ pData->timeout);
+ } else {
+ snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s?"
+ "replication=async",
+ pData->server, pData->port, searchIndex, searchType);
+ }
+ } else {
+ snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s",
+ pData->server, pData->port, searchIndex, searchType);
+ }
+ curl_easy_setopt(pData->curlHandle, CURLOPT_URL, restURL);
+ DBGPRINTF("omelasticsearch: using REST URL: '%s'\n", restURL);
+ return RS_RET_OK;
+}
+
+static rsRetVal
+curlPost(instanceData *instance, uchar *message, uchar *tpl1, uchar *tpl2)
{
CURLcode code;
CURL *curl = instance->curlHandle;
int length = strlen((char *)message);
+ DEFiRet;
+
+ if(instance->dynSrchIdx || instance->dynSrchType)
+ CHKiRet(setCurlURL(instance, tpl1, tpl2));
curl_easy_setopt(curl, CURLOPT_WRITEDATA, (char *)message);
curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (char *)message);
@@ -124,11 +198,13 @@ curlPost(instanceData *instance, uchar *message)
STATSCOUNTER_INC(indexSubmit, mutIndexSubmit);
return RS_RET_OK;
}
+finalize_it:
+ RETiRet;
}
BEGINdoAction
CODESTARTdoAction
- CHKiRet(curlPost(pData, ppString[0]));
+ CHKiRet(curlPost(pData, ppString[0], ppString[1], ppString[2]));
finalize_it:
ENDdoAction
@@ -160,10 +236,10 @@ curlResult(void *ptr, size_t size, size_t nmemb, void *userdata)
return size * nmemb;
}
+
static rsRetVal
-curlSetup(instanceData *instance)
+curlSetup(instanceData *pData)
{
- char restURL[2048]; /* libcurl makes a copy, using the stack here is OK */
HEADER *header;
CURL *handle;
@@ -172,41 +248,151 @@ curlSetup(instanceData *instance)
return RS_RET_OBJ_CREATION_FAILED;
}
- snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s",
- hostName, restPort, searchIndex, searchType);
header = curl_slist_append(NULL, "Content-Type: text/json; charset=utf-8");
+ curl_easy_setopt(handle, CURLOPT_HTTPHEADER, header);
curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, curlResult);
- curl_easy_setopt(handle, CURLOPT_HTTPHEADER, header);
- curl_easy_setopt(handle, CURLOPT_URL, restURL);
curl_easy_setopt(handle, CURLOPT_POST, 1);
- instance->curlHandle = handle;
- instance->postHeader = header;
+ pData->curlHandle = handle;
+ pData->postHeader = header;
+
+ if(pData->dynSrchIdx == 0 && pData->dynSrchType == 0) {
+ /* in this case, we know no tpls are involved --> NULL OK! */
+ setCurlURL(pData, NULL, NULL);
+ }
- DBGPRINTF("omelasticsearch setup, using REST URL: %s\n", restURL);
+ if(Debug) {
+ if(pData->dynSrchIdx == 0 && pData->dynSrchType == 0)
+ dbgprintf("omelasticsearch setup, using static REST URL\n");
+ else
+ dbgprintf("omelasticsearch setup, we have a dynamic REST URL\n");
+ }
return RS_RET_OK;
}
-BEGINparseSelectorAct
-CODESTARTparseSelectorAct
-CODE_STD_STRING_REQUESTparseSelectorAct(1)
- if(strncmp((char*) p, ":omelasticsearch:", sizeof(":omelasticsearch:") - 1)) {
- ABORT_FINALIZE(RS_RET_CONFLINE_UNPROCESSED);
+static inline void
+setInstParamDefaults(instanceData *pData)
+{
+ pData->server = NULL;
+ pData->port = 9200;
+ pData->searchIndex = NULL;
+ pData->searchType = NULL;
+ pData->timeout = NULL;
+ pData->dynSrchIdx = 0;
+ pData->dynSrchType = 0;
+ pData->asyncRepl = 0;
+ pData->tplName = NULL;
+}
+
+BEGINnewActInst
+ struct cnfparamvals *pvals;
+ int i;
+ int iNumTpls;
+CODESTARTnewActInst
+ if((pvals = nvlstGetParams(lst, &actpblk, NULL)) == NULL) {
+ ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
}
- p += sizeof(":omelasticsearch:") - 1; /* eat indicator sequence (-1 because of '\0'!) */
+
CHKiRet(createInstance(&pData));
+ setInstParamDefaults(pData);
+
+ for(i = 0 ; i < actpblk.nParams ; ++i) {
+ if(!pvals[i].bUsed)
+ continue;
+ if(!strcmp(actpblk.descr[i].name, "server")) {
+ pData->server = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(actpblk.descr[i].name, "serverport")) {
+ pData->port = (int) pvals[i].val.d.n, NULL;
+ } else if(!strcmp(actpblk.descr[i].name, "searchindex")) {
+ pData->searchIndex = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(actpblk.descr[i].name, "searchtype")) {
+ pData->searchType = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(actpblk.descr[i].name, "dynsearchindex")) {
+ pData->dynSrchIdx = pvals[i].val.d.n;
+ } else if(!strcmp(actpblk.descr[i].name, "dynsearchtype")) {
+ pData->dynSrchType = pvals[i].val.d.n;
+ } else if(!strcmp(actpblk.descr[i].name, "timeout")) {
+ pData->timeout = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(actpblk.descr[i].name, "asyncrepl")) {
+ pData->asyncRepl = pvals[i].val.d.n;
+ } else if(!strcmp(actpblk.descr[i].name, "template")) {
+ pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else {
+ dbgprintf("omelasticsearch: program error, non-handled "
+ "param '%s'\n", actpblk.descr[i].name);
+ }
+ }
+
+ if(pData->dynSrchIdx && pData->searchIndex == NULL) {
+ errmsg.LogError(0, RS_RET_LEGA_ACT_NOT_SUPPORTED,
+ "omelasticsearch: requested dynamic search index, but no "
+ "name for index template given - action definition invalid");
+ ABORT_FINALIZE(RS_RET_LEGA_ACT_NOT_SUPPORTED);
+ }
+ if(pData->dynSrchType && pData->searchType == NULL) {
+ errmsg.LogError(0, RS_RET_LEGA_ACT_NOT_SUPPORTED,
+ "omelasticsearch: requested dynamic search type, but no "
+ "name for type template given - action definition invalid");
+ ABORT_FINALIZE(RS_RET_LEGA_ACT_NOT_SUPPORTED);
+ }
- /* check if a non-standard template is to be applied */
- if(*(p-1) == ';')
- --p;
- CHKiRet(cflineParseTemplateName(&p, *ppOMSR, 0, OMSR_NO_RQD_TPL_OPTS, (uchar*) " StdJSONFmt"));
+ iNumTpls = 1;
+ if(pData->dynSrchIdx) ++iNumTpls;
+ if(pData->dynSrchType) ++iNumTpls;
+ DBGPRINTF("omelasticsearch: requesting %d templates\n", iNumTpls);
+ CODE_STD_STRING_REQUESTparseSelectorAct(iNumTpls)
+
+ CHKiRet(OMSRsetEntry(*ppOMSR, 0, (uchar*)strdup((pData->tplName == NULL) ?
+ " StdJSONFmt" : (char*)pData->tplName),
+ OMSR_NO_RQD_TPL_OPTS));
+
+
+ /* we need to request additional templates. If we have a dynamic search index,
+ * it will always be string 1. Type may be 1 or 2, depending on whether search
+ * index is dynamic as well. Rule needs to be followed throughout the module.
+ */
+ if(pData->dynSrchIdx) {
+ CHKiRet(OMSRsetEntry(*ppOMSR, 1, ustrdup(pData->searchIndex),
+ OMSR_NO_RQD_TPL_OPTS));
+ if(pData->dynSrchType) {
+ CHKiRet(OMSRsetEntry(*ppOMSR, 2, ustrdup(pData->searchType),
+ OMSR_NO_RQD_TPL_OPTS));
+ }
+ } else {
+ if(pData->dynSrchType) {
+ CHKiRet(OMSRsetEntry(*ppOMSR, 1, ustrdup(pData->searchType),
+ OMSR_NO_RQD_TPL_OPTS));
+ }
+ }
+
+ if(pData->server == NULL)
+ pData->server = (uchar*) strdup("localhost");
+ if(pData->searchIndex == NULL)
+ pData->searchIndex = (uchar*) strdup("system");
+ if(pData->searchType == NULL)
+ pData->searchType = (uchar*) strdup("events");
- /* all good, we can now initialise our private data */
CHKiRet(curlSetup(pData));
+
+CODE_STD_FINALIZERnewActInst
+ cnfparamvalsDestruct(pvals, &actpblk);
+ENDnewActInst
+
+
+BEGINparseSelectorAct
+CODESTARTparseSelectorAct
+CODE_STD_STRING_REQUESTparseSelectorAct(1)
+ if(!strncmp((char*) p, ":omelasticsearch:", sizeof(":omelasticsearch:") - 1)) {
+ errmsg.LogError(0, RS_RET_LEGA_ACT_NOT_SUPPORTED,
+ "omelasticsearch supports only v6 config format, use: "
+ "action(type=\"omelasticsearch\" server=...)");
+ }
+ ABORT_FINALIZE(RS_RET_CONFLINE_UNPROCESSED);
CODE_STD_FINALIZERparseSelectorAct
ENDparseSelectorAct
+
BEGINmodExit
CODESTARTmodExit
curl_global_cleanup();
@@ -219,18 +405,9 @@ BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES
+CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
ENDqueryEtryPt
-static rsRetVal
-resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal)
-{
- DEFiRet;
- restPort = 9200;
- hostName = "localhost";
- searchIndex = "system";
- searchType = "events";
- RETiRet;
-}
BEGINmodInit()
CODESTARTmodInit
@@ -239,13 +416,6 @@ CODEmodInit_QueryRegCFSLineHdlr
CHKiRet(objUse(errmsg, CORE_COMPONENT));
CHKiRet(objUse(statsobj, CORE_COMPONENT));
- /* register config file handlers */
- CHKiRet(omsdRegCFSLineHdlr((uchar *)"elasticsearchindex", 0, eCmdHdlrGetWord, NULL, &searchIndex, STD_LOADABLE_MODULE_ID));
- CHKiRet(omsdRegCFSLineHdlr((uchar *)"elasticsearchtype", 0, eCmdHdlrGetWord, NULL, &searchType, STD_LOADABLE_MODULE_ID));
- CHKiRet(omsdRegCFSLineHdlr((uchar *)"elasticsearchhost", 0, eCmdHdlrGetWord, NULL, &hostName, STD_LOADABLE_MODULE_ID));
- CHKiRet(omsdRegCFSLineHdlr((uchar *)"elasticsearchport", 0, eCmdHdlrInt, NULL, &restPort, STD_LOADABLE_MODULE_ID));
- CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID));
-
if (curl_global_init(CURL_GLOBAL_ALL) != 0) {
errmsg.LogError(0, RS_RET_OBJ_CREATION_FAILED, "CURL fail. -elasticsearch indexing disabled");
ABORT_FINALIZE(RS_RET_OBJ_CREATION_FAILED);
diff --git a/runtime/rsconf.c b/runtime/rsconf.c
index ed61e65e..ff9e1291 100644
--- a/runtime/rsconf.c
+++ b/runtime/rsconf.c
@@ -93,7 +93,7 @@ static uchar template_StdUsrMsgFmt[] = "\" %syslogtag%%msg%\n\r\"";
static uchar template_StdDBFmt[] = "\"insert into SystemEvents (Message, Facility, FromHost, Priority, DeviceReportedTime, ReceivedAt, InfoUnitID, SysLogTag) values ('%msg%', %syslogfacility%, '%HOSTNAME%', %syslogpriority%, '%timereported:::date-mysql%', '%timegenerated:::date-mysql%', %iut%, '%syslogtag%')\",SQL";
static uchar template_StdPgSQLFmt[] = "\"insert into SystemEvents (Message, Facility, FromHost, Priority, DeviceReportedTime, ReceivedAt, InfoUnitID, SysLogTag) values ('%msg%', %syslogfacility%, '%HOSTNAME%', %syslogpriority%, '%timereported:::date-pgsql%', '%timegenerated:::date-pgsql%', %iut%, '%syslogtag%')\",STDSQL";
static uchar template_spoofadr[] = "\"%fromhost-ip%\"";
-static uchar template_StdJSONFmt[] = "\"{\\\"message\\\":\\\"%msg%\\\",\\\"fromhost\\\":\\\"%HOSTNAME%\\\",\\\"facility\\\":\\\"%syslogfacility-text%\\\",\\\"priority\\\":\\\"%syslogpriority-text%\\\",\\\"timereported\\\":\\\"%timereported:::date-rfc3339%\\\",\\\"timegenerated\\\":\\\"%timegenerated:::date-rfc3339%\\\"}\",JSON";
+static uchar template_StdJSONFmt[] = "\"{\\\"message\\\":\\\"%msg:::json%\\\",\\\"fromhost\\\":\\\"%HOSTNAME:::json%\\\",\\\"facility\\\":\\\"%syslogfacility-text%\\\",\\\"priority\\\":\\\"%syslogpriority-text%\\\",\\\"timereported\\\":\\\"%timereported:::date-rfc3339%\\\",\\\"timegenerated\\\":\\\"%timegenerated:::date-rfc3339%\\\"}\"";
/* end templates */
void cnfDoCfsysline(char *ln);