From f9bb245a4ae06151897a4a68d6c22ba88a6ecee8 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Fri, 13 Apr 2012 12:12:39 +0200 Subject: omelasticsearch: permit dynamic index/type parameters (just like dynafile) --- plugins/omelasticsearch/omelasticsearch.c | 123 ++++++++++++++++++++++++++---- 1 file changed, 107 insertions(+), 16 deletions(-) diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index 8965d40b..1143de06 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -43,6 +43,7 @@ #include "errmsg.h" #include "statsobj.h" #include "cfsysline.h" +#include "unicode-helper.h" MODULE_TYPE_OUTPUT MODULE_TYPE_NOKEEP @@ -69,6 +70,8 @@ typedef struct _instanceData { uchar *searchIndex; uchar *searchType; uchar *tplName; + sbool dynSrchIdx; + sbool dynSrchType; CURL *curlHandle; /* libcurl session handle */ HEADER *postHeader; /* json POST request info */ } instanceData; @@ -81,6 +84,8 @@ static struct cnfparamdescr actpdescr[] = { { "serverport", eCmdHdlrInt, 0 }, { "searchindex", eCmdHdlrGetWord, 0 }, { "searchtype", eCmdHdlrGetWord, 0 }, + { "dynsearchindex", eCmdHdlrBinary, 0 }, + { "dynsearchtype", eCmdHdlrBinary, 0 }, { "template", eCmdHdlrGetWord, 1 } }; static struct cnfparamblk actpblk = @@ -123,12 +128,44 @@ BEGINtryResume CODESTARTtryResume ENDtryResume -rsRetVal -curlPost(instanceData *instance, uchar *message) + +static rsRetVal +setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2) +{ + char restURL[2048]; /* libcurl makes a copy, using the stack here is OK */ + uchar *searchIndex; + uchar *searchType; + + if(pData->dynSrchIdx) { + searchIndex = tpl1; + if(pData->dynSrchType) + searchType = tpl2; + else + searchType = pData->searchType; + } else { + searchIndex = pData->searchIndex; + if(pData->dynSrchType) + searchType = tpl1; + else + searchType = pData->searchType; + } + snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s", + pData->server, pData->port, searchIndex, searchType); + curl_easy_setopt(pData->curlHandle, CURLOPT_URL, restURL); + DBGPRINTF("omelasticsearch: using REST URL: '%s'\n", restURL); + return RS_RET_OK; +} + +static rsRetVal +curlPost(instanceData *instance, uchar *message, uchar *tpl1, uchar *tpl2) { CURLcode code; CURL *curl = instance->curlHandle; int length = strlen((char *)message); + DEFiRet; + + if(instance->dynSrchIdx || instance->dynSrchType) + CHKiRet(setCurlURL(instance, tpl1, tpl2)); curl_easy_setopt(curl, CURLOPT_WRITEDATA, (char *)message); curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (char *)message); @@ -145,11 +182,13 @@ curlPost(instanceData *instance, uchar *message) STATSCOUNTER_INC(indexSubmit, mutIndexSubmit); return RS_RET_OK; } +finalize_it: + RETiRet; } BEGINdoAction CODESTARTdoAction - CHKiRet(curlPost(pData, ppString[0])); + CHKiRet(curlPost(pData, ppString[0], ppString[1], ppString[2])); finalize_it: ENDdoAction @@ -181,10 +220,10 @@ curlResult(void *ptr, size_t size, size_t nmemb, void *userdata) return size * nmemb; } + static rsRetVal -curlSetup(instanceData *instance) +curlSetup(instanceData *pData) { - char restURL[2048]; /* libcurl makes a copy, using the stack here is OK */ HEADER *header; CURL *handle; @@ -193,19 +232,26 @@ curlSetup(instanceData *instance) return RS_RET_OBJ_CREATION_FAILED; } - snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s", - instance->server, instance->port, instance->searchIndex, instance->searchType); header = curl_slist_append(NULL, "Content-Type: text/json; charset=utf-8"); + curl_easy_setopt(handle, CURLOPT_HTTPHEADER, header); curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, curlResult); - curl_easy_setopt(handle, CURLOPT_HTTPHEADER, header); - curl_easy_setopt(handle, CURLOPT_URL, restURL); curl_easy_setopt(handle, CURLOPT_POST, 1); - instance->curlHandle = handle; - instance->postHeader = header; + pData->curlHandle = handle; + pData->postHeader = header; - DBGPRINTF("omelasticsearch setup, using REST URL: %s\n", restURL); + if(pData->dynSrchIdx == 0 && pData->dynSrchType == 0) { + /* in this case, we know no tpls are involved --> NULL OK! */ + setCurlURL(pData, NULL, NULL); + } + + if(Debug) { + if(pData->dynSrchIdx == 0 && pData->dynSrchType == 0) + dbgprintf("omelasticsearch setup, using static REST URL\n"); + else + dbgprintf("omelasticsearch setup, we have a dynamic REST URL\n"); + } return RS_RET_OK; } @@ -216,12 +262,15 @@ setInstParamDefaults(instanceData *pData) pData->port = 9200; pData->searchIndex = NULL; pData->searchType = NULL; + pData->dynSrchIdx = 0; + pData->dynSrchType = 0; pData->tplName = NULL; } BEGINnewActInst struct cnfparamvals *pvals; int i; + int iNumTpls; CODESTARTnewActInst if((pvals = nvlstGetParams(lst, &actpblk, NULL)) == NULL) { ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); @@ -230,7 +279,6 @@ CODESTARTnewActInst CHKiRet(createInstance(&pData)); setInstParamDefaults(pData); - CODE_STD_STRING_REQUESTparseSelectorAct(1) for(i = 0 ; i < actpblk.nParams ; ++i) { if(!pvals[i].bUsed) continue; @@ -238,10 +286,14 @@ CODESTARTnewActInst pData->server = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(actpblk.descr[i].name, "serverport")) { pData->port = (int) pvals[i].val.d.n, NULL; - } else if(!strcmp(actpblk.descr[i].name, "searchIndex")) { + } else if(!strcmp(actpblk.descr[i].name, "searchindex")) { pData->searchIndex = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); - } else if(!strcmp(actpblk.descr[i].name, "searchType")) { + } else if(!strcmp(actpblk.descr[i].name, "searchtype")) { pData->searchType = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "dynsearchindex")) { + pData->dynSrchIdx = pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "dynsearchtype")) { + pData->dynSrchType = pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "template")) { pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else { @@ -249,11 +301,49 @@ CODESTARTnewActInst "param '%s'\n", actpblk.descr[i].name); } } + + if(pData->dynSrchIdx && pData->searchIndex == NULL) { + errmsg.LogError(0, RS_RET_LEGA_ACT_NOT_SUPPORTED, + "omelasticsearch: requested dynamic search index, but no " + "name for index template given - action definition invalid"); + ABORT_FINALIZE(RS_RET_LEGA_ACT_NOT_SUPPORTED); + } + if(pData->dynSrchType && pData->searchType == NULL) { + errmsg.LogError(0, RS_RET_LEGA_ACT_NOT_SUPPORTED, + "omelasticsearch: requested dynamic search type, but no " + "name for type template given - action definition invalid"); + ABORT_FINALIZE(RS_RET_LEGA_ACT_NOT_SUPPORTED); + } + + iNumTpls = 1; + if(pData->dynSrchIdx) ++iNumTpls; + if(pData->dynSrchType) ++iNumTpls; + DBGPRINTF("omelasticsearch: requesting %d templates\n", iNumTpls); + CODE_STD_STRING_REQUESTparseSelectorAct(iNumTpls) CHKiRet(OMSRsetEntry(*ppOMSR, 0, (uchar*)strdup((pData->tplName == NULL) ? " StdJSONFmt" : (char*)pData->tplName), OMSR_NO_RQD_TPL_OPTS)); + + /* we need to request additional templates. If we have a dynamic search index, + * it will always be string 1. Type may be 1 or 2, depending on whether search + * index is dynamic as well. Rule needs to be followed throughout the module. + */ + if(pData->dynSrchIdx) { + CHKiRet(OMSRsetEntry(*ppOMSR, 1, ustrdup(pData->searchIndex), + OMSR_NO_RQD_TPL_OPTS)); + if(pData->dynSrchType) { + CHKiRet(OMSRsetEntry(*ppOMSR, 2, ustrdup(pData->searchType), + OMSR_NO_RQD_TPL_OPTS)); + } + } else { + if(pData->dynSrchType) { + CHKiRet(OMSRsetEntry(*ppOMSR, 1, ustrdup(pData->searchType), + OMSR_NO_RQD_TPL_OPTS)); + } + } + if(pData->server == NULL) pData->server = (uchar*) strdup("localhost"); if(pData->searchIndex == NULL) @@ -261,9 +351,10 @@ CODESTARTnewActInst if(pData->searchType == NULL) pData->searchType = (uchar*) strdup("events"); + CHKiRet(curlSetup(pData)); + CODE_STD_FINALIZERnewActInst cnfparamvalsDestruct(pvals, &actpblk); - CHKiRet(curlSetup(pData)); ENDnewActInst -- cgit