summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2012-04-13 12:12:39 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2012-04-13 12:12:39 +0200
commitf9bb245a4ae06151897a4a68d6c22ba88a6ecee8 (patch)
tree6ad03c837e719dad6d3afbf3eef7fbc4b3f77033
parent4f043bcd3575d4e61015240cbea6fdfb92c9e4f8 (diff)
downloadrsyslog-f9bb245a4ae06151897a4a68d6c22ba88a6ecee8.tar.gz
rsyslog-f9bb245a4ae06151897a4a68d6c22ba88a6ecee8.tar.xz
rsyslog-f9bb245a4ae06151897a4a68d6c22ba88a6ecee8.zip
omelasticsearch: permit dynamic index/type parameters (just like dynafile)
-rw-r--r--plugins/omelasticsearch/omelasticsearch.c123
1 files changed, 107 insertions, 16 deletions
diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c
index 8965d40b..1143de06 100644
--- a/plugins/omelasticsearch/omelasticsearch.c
+++ b/plugins/omelasticsearch/omelasticsearch.c
@@ -43,6 +43,7 @@
#include "errmsg.h"
#include "statsobj.h"
#include "cfsysline.h"
+#include "unicode-helper.h"
MODULE_TYPE_OUTPUT
MODULE_TYPE_NOKEEP
@@ -69,6 +70,8 @@ typedef struct _instanceData {
uchar *searchIndex;
uchar *searchType;
uchar *tplName;
+ sbool dynSrchIdx;
+ sbool dynSrchType;
CURL *curlHandle; /* libcurl session handle */
HEADER *postHeader; /* json POST request info */
} instanceData;
@@ -81,6 +84,8 @@ static struct cnfparamdescr actpdescr[] = {
{ "serverport", eCmdHdlrInt, 0 },
{ "searchindex", eCmdHdlrGetWord, 0 },
{ "searchtype", eCmdHdlrGetWord, 0 },
+ { "dynsearchindex", eCmdHdlrBinary, 0 },
+ { "dynsearchtype", eCmdHdlrBinary, 0 },
{ "template", eCmdHdlrGetWord, 1 }
};
static struct cnfparamblk actpblk =
@@ -123,12 +128,44 @@ BEGINtryResume
CODESTARTtryResume
ENDtryResume
-rsRetVal
-curlPost(instanceData *instance, uchar *message)
+
+static rsRetVal
+setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2)
+{
+ char restURL[2048]; /* libcurl makes a copy, using the stack here is OK */
+ uchar *searchIndex;
+ uchar *searchType;
+
+ if(pData->dynSrchIdx) {
+ searchIndex = tpl1;
+ if(pData->dynSrchType)
+ searchType = tpl2;
+ else
+ searchType = pData->searchType;
+ } else {
+ searchIndex = pData->searchIndex;
+ if(pData->dynSrchType)
+ searchType = tpl1;
+ else
+ searchType = pData->searchType;
+ }
+ snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s",
+ pData->server, pData->port, searchIndex, searchType);
+ curl_easy_setopt(pData->curlHandle, CURLOPT_URL, restURL);
+ DBGPRINTF("omelasticsearch: using REST URL: '%s'\n", restURL);
+ return RS_RET_OK;
+}
+
+static rsRetVal
+curlPost(instanceData *instance, uchar *message, uchar *tpl1, uchar *tpl2)
{
CURLcode code;
CURL *curl = instance->curlHandle;
int length = strlen((char *)message);
+ DEFiRet;
+
+ if(instance->dynSrchIdx || instance->dynSrchType)
+ CHKiRet(setCurlURL(instance, tpl1, tpl2));
curl_easy_setopt(curl, CURLOPT_WRITEDATA, (char *)message);
curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (char *)message);
@@ -145,11 +182,13 @@ curlPost(instanceData *instance, uchar *message)
STATSCOUNTER_INC(indexSubmit, mutIndexSubmit);
return RS_RET_OK;
}
+finalize_it:
+ RETiRet;
}
BEGINdoAction
CODESTARTdoAction
- CHKiRet(curlPost(pData, ppString[0]));
+ CHKiRet(curlPost(pData, ppString[0], ppString[1], ppString[2]));
finalize_it:
ENDdoAction
@@ -181,10 +220,10 @@ curlResult(void *ptr, size_t size, size_t nmemb, void *userdata)
return size * nmemb;
}
+
static rsRetVal
-curlSetup(instanceData *instance)
+curlSetup(instanceData *pData)
{
- char restURL[2048]; /* libcurl makes a copy, using the stack here is OK */
HEADER *header;
CURL *handle;
@@ -193,19 +232,26 @@ curlSetup(instanceData *instance)
return RS_RET_OBJ_CREATION_FAILED;
}
- snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s",
- instance->server, instance->port, instance->searchIndex, instance->searchType);
header = curl_slist_append(NULL, "Content-Type: text/json; charset=utf-8");
+ curl_easy_setopt(handle, CURLOPT_HTTPHEADER, header);
curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, curlResult);
- curl_easy_setopt(handle, CURLOPT_HTTPHEADER, header);
- curl_easy_setopt(handle, CURLOPT_URL, restURL);
curl_easy_setopt(handle, CURLOPT_POST, 1);
- instance->curlHandle = handle;
- instance->postHeader = header;
+ pData->curlHandle = handle;
+ pData->postHeader = header;
- DBGPRINTF("omelasticsearch setup, using REST URL: %s\n", restURL);
+ if(pData->dynSrchIdx == 0 && pData->dynSrchType == 0) {
+ /* in this case, we know no tpls are involved --> NULL OK! */
+ setCurlURL(pData, NULL, NULL);
+ }
+
+ if(Debug) {
+ if(pData->dynSrchIdx == 0 && pData->dynSrchType == 0)
+ dbgprintf("omelasticsearch setup, using static REST URL\n");
+ else
+ dbgprintf("omelasticsearch setup, we have a dynamic REST URL\n");
+ }
return RS_RET_OK;
}
@@ -216,12 +262,15 @@ setInstParamDefaults(instanceData *pData)
pData->port = 9200;
pData->searchIndex = NULL;
pData->searchType = NULL;
+ pData->dynSrchIdx = 0;
+ pData->dynSrchType = 0;
pData->tplName = NULL;
}
BEGINnewActInst
struct cnfparamvals *pvals;
int i;
+ int iNumTpls;
CODESTARTnewActInst
if((pvals = nvlstGetParams(lst, &actpblk, NULL)) == NULL) {
ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
@@ -230,7 +279,6 @@ CODESTARTnewActInst
CHKiRet(createInstance(&pData));
setInstParamDefaults(pData);
- CODE_STD_STRING_REQUESTparseSelectorAct(1)
for(i = 0 ; i < actpblk.nParams ; ++i) {
if(!pvals[i].bUsed)
continue;
@@ -238,10 +286,14 @@ CODESTARTnewActInst
pData->server = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else if(!strcmp(actpblk.descr[i].name, "serverport")) {
pData->port = (int) pvals[i].val.d.n, NULL;
- } else if(!strcmp(actpblk.descr[i].name, "searchIndex")) {
+ } else if(!strcmp(actpblk.descr[i].name, "searchindex")) {
pData->searchIndex = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
- } else if(!strcmp(actpblk.descr[i].name, "searchType")) {
+ } else if(!strcmp(actpblk.descr[i].name, "searchtype")) {
pData->searchType = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(actpblk.descr[i].name, "dynsearchindex")) {
+ pData->dynSrchIdx = pvals[i].val.d.n;
+ } else if(!strcmp(actpblk.descr[i].name, "dynsearchtype")) {
+ pData->dynSrchType = pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "template")) {
pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else {
@@ -249,11 +301,49 @@ CODESTARTnewActInst
"param '%s'\n", actpblk.descr[i].name);
}
}
+
+ if(pData->dynSrchIdx && pData->searchIndex == NULL) {
+ errmsg.LogError(0, RS_RET_LEGA_ACT_NOT_SUPPORTED,
+ "omelasticsearch: requested dynamic search index, but no "
+ "name for index template given - action definition invalid");
+ ABORT_FINALIZE(RS_RET_LEGA_ACT_NOT_SUPPORTED);
+ }
+ if(pData->dynSrchType && pData->searchType == NULL) {
+ errmsg.LogError(0, RS_RET_LEGA_ACT_NOT_SUPPORTED,
+ "omelasticsearch: requested dynamic search type, but no "
+ "name for type template given - action definition invalid");
+ ABORT_FINALIZE(RS_RET_LEGA_ACT_NOT_SUPPORTED);
+ }
+
+ iNumTpls = 1;
+ if(pData->dynSrchIdx) ++iNumTpls;
+ if(pData->dynSrchType) ++iNumTpls;
+ DBGPRINTF("omelasticsearch: requesting %d templates\n", iNumTpls);
+ CODE_STD_STRING_REQUESTparseSelectorAct(iNumTpls)
CHKiRet(OMSRsetEntry(*ppOMSR, 0, (uchar*)strdup((pData->tplName == NULL) ?
" StdJSONFmt" : (char*)pData->tplName),
OMSR_NO_RQD_TPL_OPTS));
+
+ /* we need to request additional templates. If we have a dynamic search index,
+ * it will always be string 1. Type may be 1 or 2, depending on whether search
+ * index is dynamic as well. Rule needs to be followed throughout the module.
+ */
+ if(pData->dynSrchIdx) {
+ CHKiRet(OMSRsetEntry(*ppOMSR, 1, ustrdup(pData->searchIndex),
+ OMSR_NO_RQD_TPL_OPTS));
+ if(pData->dynSrchType) {
+ CHKiRet(OMSRsetEntry(*ppOMSR, 2, ustrdup(pData->searchType),
+ OMSR_NO_RQD_TPL_OPTS));
+ }
+ } else {
+ if(pData->dynSrchType) {
+ CHKiRet(OMSRsetEntry(*ppOMSR, 1, ustrdup(pData->searchType),
+ OMSR_NO_RQD_TPL_OPTS));
+ }
+ }
+
if(pData->server == NULL)
pData->server = (uchar*) strdup("localhost");
if(pData->searchIndex == NULL)
@@ -261,9 +351,10 @@ CODESTARTnewActInst
if(pData->searchType == NULL)
pData->searchType = (uchar*) strdup("events");
+ CHKiRet(curlSetup(pData));
+
CODE_STD_FINALIZERnewActInst
cnfparamvalsDestruct(pvals, &actpblk);
- CHKiRet(curlSetup(pData));
ENDnewActInst