summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2012-07-10 17:15:03 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2012-07-10 17:15:03 +0200
commit68056a6128b9ebc8d65791b2647030d36c73f014 (patch)
tree7460f65e1e4a0379c2e586bcb62bc4a2c0963e88
parent4d453967cbff1f09becab38a2ad10b05df476eaf (diff)
downloadrsyslog-68056a6128b9ebc8d65791b2647030d36c73f014.tar.gz
rsyslog-68056a6128b9ebc8d65791b2647030d36c73f014.tar.xz
rsyslog-68056a6128b9ebc8d65791b2647030d36c73f014.zip
omelasticsearch: support for parameters parent & dynparent added
-rw-r--r--ChangeLog3
-rw-r--r--plugins/omelasticsearch/omelasticsearch.c115
2 files changed, 98 insertions, 20 deletions
diff --git a/ChangeLog b/ChangeLog
index 09664a88..6219f04b 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,4 +1,7 @@
---------------------------------------------------------------------------
+Version 6.3.13 [BETA] 2012-07-??
+- omelasticsearch: support for parameters parent & dynparent added
+---------------------------------------------------------------------------
Version 6.3.12 [BETA] 2012-07-02
- support for elasticsearch via omelasticsearch added
Note that this module has been tested quite well by a number of folks,
diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c
index a1f3b8ab..5ddb66da 100644
--- a/plugins/omelasticsearch/omelasticsearch.c
+++ b/plugins/omelasticsearch/omelasticsearch.c
@@ -70,10 +70,12 @@ typedef struct _instanceData {
uchar *pwd;
uchar *searchIndex;
uchar *searchType;
+ uchar *parent;
uchar *tplName;
uchar *timeout;
sbool dynSrchIdx;
sbool dynSrchType;
+ sbool dynParent;
sbool bulkmode;
sbool asyncRepl;
struct {
@@ -95,8 +97,10 @@ static struct cnfparamdescr actpdescr[] = {
{ "pwd", eCmdHdlrGetWord, 0 },
{ "searchindex", eCmdHdlrGetWord, 0 },
{ "searchtype", eCmdHdlrGetWord, 0 },
+ { "parent", eCmdHdlrGetWord, 0 },
{ "dynsearchindex", eCmdHdlrBinary, 0 },
{ "dynsearchtype", eCmdHdlrBinary, 0 },
+ { "dynparent", eCmdHdlrBinary, 0 },
{ "bulkmode", eCmdHdlrBinary, 0 },
{ "asyncrepl", eCmdHdlrBinary, 0 },
{ "timeout", eCmdHdlrGetWord, 0 },
@@ -133,6 +137,7 @@ CODESTARTfreeInstance
free(pData->pwd);
free(pData->searchIndex);
free(pData->searchType);
+ free(pData->parent);
free(pData->tplName);
ENDfreeInstance
@@ -146,9 +151,11 @@ CODESTARTdbgPrintInstInfo
dbgprintf("\tpwd=(%sconfigured)\n", pData->pwd == NULL ? "not " : "");
dbgprintf("\tsearch index='%s'\n", pData->searchIndex);
dbgprintf("\tsearch index='%s'\n", pData->searchType);
+ dbgprintf("\tparent='%s'\n", pData->parent);
dbgprintf("\ttimeout='%s'\n", pData->timeout);
dbgprintf("\tdynamic search index=%d\n", pData->dynSrchIdx);
dbgprintf("\tdynamic search type=%d\n", pData->dynSrchType);
+ dbgprintf("\tdynamic parent=%d\n", pData->dynParent);
dbgprintf("\tasync replication=%d\n", pData->asyncRepl);
dbgprintf("\tbulkmode=%d\n", pData->bulkmode);
ENDdbgPrintInstInfo
@@ -197,10 +204,11 @@ checkConn(instanceData *pData)
res = curl_easy_perform(curl);
if(res != CURLE_OK) {
- dbgprintf("omelasticsearch: checkConn() curl_easy_perform() "
+ DBGPRINTF("omelasticsearch: checkConn() curl_easy_perform() "
"failed: %s\n", curl_easy_strerror(res));
ABORT_FINALIZE(RS_RET_SUSPENDED);
}
+ DBGPRINTF("omelasticsearch: checkConn() completed with success\n");
finalize_it:
if(curl != NULL)
@@ -218,38 +226,62 @@ ENDtryResume
/* get the current index and type for this message */
static inline void
-getIndexAndType(instanceData *pData, uchar *tpl1, uchar *tpl2, uchar **srchIndex,
- uchar **srchType)
+getIndexTypeAndParent(instanceData *pData, uchar *tpl1, uchar *tpl2, uchar *tpl3,
+ uchar **srchIndex, uchar **srchType, uchar **parent)
{
if(pData->dynSrchIdx) {
*srchIndex = tpl1;
- if(pData->dynSrchType)
+ if(pData->dynSrchType) {
*srchType = tpl2;
- else
+ if(pData->dynParent) {
+ *parent = tpl3;
+ } else {
+ *parent = pData->parent;
+ }
+ } else {
*srchType = pData->searchType;
+ if(pData->dynParent) {
+ *parent = tpl2;
+ } else {
+ *parent = pData->parent;
+ }
+ }
} else {
*srchIndex = pData->searchIndex;
- if(pData->dynSrchType)
+ if(pData->dynSrchType) {
*srchType = tpl1;
- else
+ if(pData->dynParent) {
+ *parent = tpl2;
+ } else {
+ *parent = pData->parent;
+ }
+ } else {
*srchType = pData->searchType;
+ if(pData->dynParent) {
+ *parent = tpl1;
+ } else {
+ *parent = pData->parent;
+ }
+ }
}
}
static rsRetVal
-setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2)
+setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2, uchar *tpl3)
{
char authBuf[1024];
char *restURL;
uchar *searchIndex;
uchar *searchType;
+ uchar *parent;
es_str_t *url;
int rLocal;
int r;
DEFiRet;
- getIndexAndType(pData, tpl1, tpl2, &searchIndex, &searchType);
+ getIndexTypeAndParent(pData, tpl1, tpl2, tpl3,
+ &searchIndex, &searchType, &parent);
setBaseURL(pData, &url);
if(pData->bulkmode) {
@@ -267,6 +299,11 @@ setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2)
if(pData->timeout != NULL) {
if(r == 0) r = es_addBuf(&url, "timeout=", sizeof("timeout=")-1);
if(r == 0) r = es_addBuf(&url, (char*)pData->timeout, ustrlen(pData->timeout));
+ if(r == 0) r = es_addChar(&url, '&');
+ }
+ if(parent != NULL) {
+ if(r == 0) r = es_addBuf(&url, "parent=", sizeof("parent=")-1);
+ if(r == 0) r = es_addBuf(&url, (char*)parent, ustrlen(parent));
}
restURL = es_str2cstr(url, NULL);
curl_easy_setopt(pData->curlHandle, CURLOPT_URL, restURL);
@@ -296,24 +333,29 @@ finalize_it:
* index changes.
*/
static rsRetVal
-buildBatch(instanceData *pData, uchar *message, uchar *tpl1, uchar *tpl2)
+buildBatch(instanceData *pData, uchar *message, uchar *tpl1, uchar *tpl2, uchar *tpl3)
{
int length = strlen((char *)message);
int r;
uchar *searchIndex;
uchar *searchType;
+ uchar *parent;
DEFiRet;
# define META_STRT "{\"index\":{\"_index\": \""
# define META_TYPE "\",\"_type\":\""
+# define META_PARENT "\",\"_parent\":\""
# define META_END "\"}}\n"
- getIndexAndType(pData, tpl1, tpl2, &searchIndex, &searchType);
+ getIndexTypeAndParent(pData, tpl1, tpl2, tpl3,
+ &searchIndex, &searchType, &parent);
r = es_addBuf(&pData->batch.data, META_STRT, sizeof(META_STRT)-1);
if(r == 0) r = es_addBuf(&pData->batch.data, (char*)searchIndex,
ustrlen(searchIndex));
if(r == 0) r = es_addBuf(&pData->batch.data, META_TYPE, sizeof(META_TYPE)-1);
if(r == 0) r = es_addBuf(&pData->batch.data, (char*)searchType,
ustrlen(searchType));
+ if(r == 0) r = es_addBuf(&pData->batch.data, META_PARENT, sizeof(META_PARENT)-1);
+ if(r == 0) r = es_addBuf(&pData->batch.data, (char*)parent, ustrlen(parent));
if(r == 0) r = es_addBuf(&pData->batch.data, META_END, sizeof(META_END)-1);
if(r == 0) r = es_addBuf(&pData->batch.data, (char*)message, length);
if(r == 0) r = es_addBuf(&pData->batch.data, "\n", sizeof("\n")-1);
@@ -328,14 +370,15 @@ finalize_it:
}
static rsRetVal
-curlPost(instanceData *instance, uchar *message, int msglen, uchar *tpl1, uchar *tpl2)
+curlPost(instanceData *instance, uchar *message, int msglen,
+ uchar *tpl1, uchar *tpl2, uchar *tpl3)
{
CURLcode code;
CURL *curl = instance->curlHandle;
DEFiRet;
- if(instance->dynSrchIdx || instance->dynSrchType)
- CHKiRet(setCurlURL(instance, tpl1, tpl2));
+ if(instance->dynSrchIdx || instance->dynSrchType || instance->dynParent)
+ CHKiRet(setCurlURL(instance, tpl1, tpl2, tpl3));
curl_easy_setopt(curl, CURLOPT_WRITEDATA, (char *)message);
curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (char *)message);
@@ -376,11 +419,11 @@ ENDbeginTransaction
BEGINdoAction
CODESTARTdoAction
if(pData->bulkmode) {
- CHKiRet(buildBatch(pData, ppString[0], ppString[1], ppString[2]));
+ CHKiRet(buildBatch(pData, ppString[0], ppString[1], ppString[2], ppString[3]));
} else {
dbgprintf("omelasticsearch: doAction calling curlPost\n");
CHKiRet(curlPost(pData, ppString[0], strlen((char*)ppString[0]),
- ppString[1], ppString[2]));
+ ppString[1], ppString[2], ppString[3]));
}
finalize_it:
dbgprintf("omelasticsearch: result doAction: %d (bulkmode %d)\n", iRet, pData->bulkmode);
@@ -393,7 +436,7 @@ CODESTARTendTransaction
dbgprintf("omelasticsearch: endTransaction init\n");
cstr = es_str2cstr(pData->batch.data, NULL);
dbgprintf("omelasticsearch: endTransaction, batch: '%s'\n", cstr);
- CHKiRet(curlPost(pData, (uchar*) cstr, strlen(cstr), NULL, NULL));
+ CHKiRet(curlPost(pData, (uchar*) cstr, strlen(cstr), NULL, NULL, NULL));
finalize_it:
free(cstr);
dbgprintf("omelasticsearch: endTransaction done with %d\n", iRet);
@@ -455,13 +498,14 @@ curlSetup(instanceData *pData)
pData->curlHandle = handle;
pData->postHeader = header;
- if(pData->bulkmode || (pData->dynSrchIdx == 0 && pData->dynSrchType == 0)) {
+ if( pData->bulkmode
+ || (pData->dynSrchIdx == 0 && pData->dynSrchType == 0 && pData->dynParent == 0)) {
/* in this case, we know no tpls are involved in the request-->NULL OK! */
- setCurlURL(pData, NULL, NULL);
+ setCurlURL(pData, NULL, NULL, NULL);
}
if(Debug) {
- if(pData->dynSrchIdx == 0 && pData->dynSrchType == 0)
+ if(pData->dynSrchIdx == 0 && pData->dynSrchType == 0 && pData->dynParent == 0)
dbgprintf("omelasticsearch setup, using static REST URL\n");
else
dbgprintf("omelasticsearch setup, we have a dynamic REST URL\n");
@@ -478,9 +522,11 @@ setInstParamDefaults(instanceData *pData)
pData->pwd = NULL;
pData->searchIndex = NULL;
pData->searchType = NULL;
+ pData->parent = NULL;
pData->timeout = NULL;
pData->dynSrchIdx = 0;
pData->dynSrchType = 0;
+ pData->dynParent = 0;
pData->asyncRepl = 0;
pData->bulkmode = 0;
pData->tplName = NULL;
@@ -513,10 +559,14 @@ CODESTARTnewActInst
pData->searchIndex = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else if(!strcmp(actpblk.descr[i].name, "searchtype")) {
pData->searchType = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(actpblk.descr[i].name, "parent")) {
+ pData->parent = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else if(!strcmp(actpblk.descr[i].name, "dynsearchindex")) {
pData->dynSrchIdx = pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "dynsearchtype")) {
pData->dynSrchType = pvals[i].val.d.n;
+ } else if(!strcmp(actpblk.descr[i].name, "dynparent")) {
+ pData->dynParent = pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "bulkmode")) {
pData->bulkmode = pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "timeout")) {
@@ -549,6 +599,12 @@ CODESTARTnewActInst
"name for type template given - action definition invalid");
ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
}
+ if(pData->dynParent && pData->parent == NULL) {
+ errmsg.LogError(0, RS_RET_CONFIG_ERROR,
+ "omelasticsearch: requested dynamic parent, but no "
+ "name for parent template given - action definition invalid");
+ ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
+ }
if(pData->bulkmode) {
pData->batch.currTpl1 = NULL;
@@ -563,6 +619,7 @@ CODESTARTnewActInst
iNumTpls = 1;
if(pData->dynSrchIdx) ++iNumTpls;
if(pData->dynSrchType) ++iNumTpls;
+ if(pData->dynParent) ++iNumTpls;
DBGPRINTF("omelasticsearch: requesting %d templates\n", iNumTpls);
CODE_STD_STRING_REQUESTparseSelectorAct(iNumTpls)
@@ -581,11 +638,29 @@ CODESTARTnewActInst
if(pData->dynSrchType) {
CHKiRet(OMSRsetEntry(*ppOMSR, 2, ustrdup(pData->searchType),
OMSR_NO_RQD_TPL_OPTS));
+ if(pData->dynParent) {
+ CHKiRet(OMSRsetEntry(*ppOMSR, 3, ustrdup(pData->parent),
+ OMSR_NO_RQD_TPL_OPTS));
+ }
+ } else {
+ if(pData->dynParent) {
+ CHKiRet(OMSRsetEntry(*ppOMSR, 2, ustrdup(pData->parent),
+ OMSR_NO_RQD_TPL_OPTS));
+ }
}
} else {
if(pData->dynSrchType) {
CHKiRet(OMSRsetEntry(*ppOMSR, 1, ustrdup(pData->searchType),
OMSR_NO_RQD_TPL_OPTS));
+ if(pData->dynParent) {
+ CHKiRet(OMSRsetEntry(*ppOMSR, 2, ustrdup(pData->parent),
+ OMSR_NO_RQD_TPL_OPTS));
+ }
+ } else {
+ if(pData->dynParent) {
+ CHKiRet(OMSRsetEntry(*ppOMSR, 1, ustrdup(pData->parent),
+ OMSR_NO_RQD_TPL_OPTS));
+ }
}
}