From ebaf375ed108b14c5a5a3af62067df988506bfec Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 9 Jul 2012 17:28:40 +0200 Subject: omelasticsearch: better debug instrumentation --- plugins/omelasticsearch/omelasticsearch.c | 48 +++++++++++++++++++++---------- 1 file changed, 33 insertions(+), 15 deletions(-) (limited to 'plugins') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index d8db7307..c18c1c52 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -143,7 +143,7 @@ CODESTARTdbgPrintInstInfo dbgprintf("\tserver='%s'\n", pData->server); dbgprintf("\tserverport=%d\n", pData->port); dbgprintf("\tuid='%s'\n", pData->uid == NULL ? (uchar*)"(not configured)" : pData->uid); - dbgprintf("\tpwd=(%s configured)\n", pData->pwd == NULL ? "not " : ""); + dbgprintf("\tpwd=(%sconfigured)\n", pData->pwd == NULL ? "not " : ""); dbgprintf("\tsearch index='%s'\n", pData->searchIndex); dbgprintf("\tsearch index='%s'\n", pData->searchType); dbgprintf("\ttimeout='%s'\n", pData->timeout); @@ -155,6 +155,7 @@ ENDdbgPrintInstInfo BEGINtryResume CODESTARTtryResume + DBGPRINTF("omelasticsearch: tryResume called\n"); ENDtryResume @@ -188,7 +189,9 @@ setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2) uchar *searchIndex; uchar *searchType; es_str_t *url; + int rLocal; int r; + DEFiRet; getIndexAndType(pData, tpl1, tpl2, &searchIndex, &searchType); url = es_newStr(128); @@ -218,17 +221,23 @@ setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2) restURL = es_str2cstr(url, NULL); curl_easy_setopt(pData->curlHandle, CURLOPT_URL, restURL); es_deleteStr(url); + DBGPRINTF("omelasticsearch: using REST URL: '%s'\n", restURL); free(restURL); if(pData->uid != NULL) { - snprintf(authBuf, sizeof(authBuf), "%s:%s", pData->uid, - (pData->pwd == NULL) ? "" : (char*)pData->pwd); - //TODO: create better code, check errors! + rLocal = snprintf(authBuf, sizeof(authBuf), "%s:%s", pData->uid, + (pData->pwd == NULL) ? "" : (char*)pData->pwd); + if(rLocal != (int) es_strlen(url)) { + errmsg.LogError(0, RS_RET_ERR, "omelasticsearch: snprintf failed " + "when trying to build auth string (return %d)\n", + rLocal); + ABORT_FINALIZE(RS_RET_ERR); + } curl_easy_setopt(pData->curlHandle, CURLOPT_USERPWD, authBuf); curl_easy_setopt(pData->curlHandle, CURLOPT_PROXYAUTH, CURLAUTH_ANY); } - DBGPRINTF("omelasticsearch: using REST URL: '%s'\n", restURL); - return RS_RET_OK; +finalize_it: + RETiRet; } @@ -248,7 +257,6 @@ buildBatch(instanceData *pData, uchar *message, uchar *tpl1, uchar *tpl2) # define META_TYPE "\",\"_type\":\"" # define META_END "\"}}\n" -#warning TODO: use dynamic index/type! getIndexAndType(pData, tpl1, tpl2, &searchIndex, &searchType); r = es_addBuf(&pData->batch.data, META_STRT, sizeof(META_STRT)-1); if(r == 0) r = es_addBuf(&pData->batch.data, (char*)searchIndex, @@ -282,13 +290,18 @@ curlPost(instanceData *instance, uchar *message, int msglen, uchar *tpl1, uchar curl_easy_setopt(curl, CURLOPT_WRITEDATA, (char *)message); curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (char *)message); curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, msglen); +dbgprintf("omelasticsearch: do curl_easy_perform()\n"); code = curl_easy_perform(curl); +DBGPRINTF("omelasticsearch: curl_easy_perform() returned %lld\n", (long long) code); switch (code) { case CURLE_COULDNT_RESOLVE_HOST: case CURLE_COULDNT_RESOLVE_PROXY: case CURLE_COULDNT_CONNECT: case CURLE_WRITE_ERROR: STATSCOUNTER_INC(indexConFail, mutIndexConFail); + DBGPRINTF("omelasticsearch: we are suspending ourselfs due " + "to failure %lld of curl_easy_perform()\n", + (long long) code); return RS_RET_SUSPENDED; default: STATSCOUNTER_INC(indexSubmit, mutIndexSubmit); @@ -315,22 +328,25 @@ CODESTARTdoAction if(pData->bulkmode) { CHKiRet(buildBatch(pData, ppString[0], ppString[1], ppString[2])); } else { +dbgprintf("omelasticsearch: doAction calling curlPost\n"); CHKiRet(curlPost(pData, ppString[0], strlen((char*)ppString[0]), ppString[1], ppString[2])); } finalize_it: -dbgprintf("omelasticsearch: result doAction: %d\n", iRet); +dbgprintf("omelasticsearch: result doAction: %d (bulkmode %d)\n", iRet, pData->bulkmode); ENDdoAction BEGINendTransaction char *cstr; CODESTARTendTransaction +dbgprintf("omelasticsearch: endTransaction init\n"); cstr = es_str2cstr(pData->batch.data, NULL); - dbgprintf("elasticsearch: endTransaction, batch: '%s'\n", cstr); + dbgprintf("omelasticsearch: endTransaction, batch: '%s'\n", cstr); CHKiRet(curlPost(pData, (uchar*) cstr, strlen(cstr), NULL, NULL)); finalize_it: free(cstr); +dbgprintf("omelasticsearch: endTransaction done with %d\n", iRet); ENDendTransaction /* elasticsearch POST result string ... useful for debugging */ @@ -353,11 +369,13 @@ DBGPRINTF("\n"); nmemb > sizeof(ok)-1 && strncmp(p, ok, sizeof(ok)-1) == 0) { STATSCOUNTER_INC(indexSuccess, mutIndexSuccess); +dbgprintf("omelasticsearch ok\n"); } else { +dbgprintf("omelasticsearch fail\n"); STATSCOUNTER_INC(indexFailed, mutIndexFailed); if (Debug) { - DBGPRINTF("omelasticsearch request: %s\n", jsonData); - DBGPRINTF("omelasticsearch result: "); + DBGPRINTF("omelasticsearch (fail) request: %s\n", jsonData); + DBGPRINTF("omelasticsearch (fail) result: "); for (i = 0; i < nmemb; i++) DBGPRINTF("%c", p[i]); DBGPRINTF("\n"); @@ -470,16 +488,16 @@ CODESTARTnewActInst ABORT_FINALIZE(RS_RET_UID_MISSING); } if(pData->dynSrchIdx && pData->searchIndex == NULL) { - errmsg.LogError(0, RS_RET_LEGA_ACT_NOT_SUPPORTED, + errmsg.LogError(0, RS_RET_CONFIG_ERROR, "omelasticsearch: requested dynamic search index, but no " "name for index template given - action definition invalid"); - ABORT_FINALIZE(RS_RET_LEGA_ACT_NOT_SUPPORTED); + ABORT_FINALIZE(RS_RET_CONFIG_ERROR); } if(pData->dynSrchType && pData->searchType == NULL) { - errmsg.LogError(0, RS_RET_LEGA_ACT_NOT_SUPPORTED, + errmsg.LogError(0, RS_RET_CONFIG_ERROR, "omelasticsearch: requested dynamic search type, but no " "name for type template given - action definition invalid"); - ABORT_FINALIZE(RS_RET_LEGA_ACT_NOT_SUPPORTED); + ABORT_FINALIZE(RS_RET_CONFIG_ERROR); } if(pData->bulkmode) { -- cgit From 4d453967cbff1f09becab38a2ad10b05df476eaf Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 10 Jul 2012 12:10:34 +0200 Subject: omelasticsearch: implement retry via request to es server homepage --- plugins/omelasticsearch/omelasticsearch.c | 70 ++++++++++++++++++++++++++----- 1 file changed, 60 insertions(+), 10 deletions(-) (limited to 'plugins') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index c18c1c52..a1f3b8ab 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -153,9 +153,66 @@ CODESTARTdbgPrintInstInfo dbgprintf("\tbulkmode=%d\n", pData->bulkmode); ENDdbgPrintInstInfo + +/* Build basic URL part, which includes hostname and port as follows: + * http://hostname:port/ + * Newly creates an estr for this purpose. + */ +static rsRetVal +setBaseURL(instanceData *pData, es_str_t **url) +{ + char portBuf[64]; + int r; + DEFiRet; + + *url = es_newStr(128); + snprintf(portBuf, sizeof(portBuf), "%d", pData->port); + r = es_addBuf(url, "http://", sizeof("http://")-1); + if(r == 0) r = es_addBuf(url, (char*)pData->server, strlen((char*)pData->server)); + if(r == 0) r = es_addChar(url, ':'); + if(r == 0) r = es_addBuf(url, portBuf, strlen(portBuf)); + if(r == 0) r = es_addChar(url, '/'); + RETiRet; +} + + +static inline rsRetVal +checkConn(instanceData *pData) +{ + es_str_t *url; + CURL *curl = NULL; + CURLcode res; + char *cstr; + DEFiRet; + + setBaseURL(pData, &url); + curl = curl_easy_init(); + if(curl == NULL) { + DBGPRINTF("omelasticsearch: checkConn() curl_easy_init() failed\n"); + ABORT_FINALIZE(RS_RET_SUSPENDED); + } + cstr = es_str2cstr(url, NULL); + curl_easy_setopt(curl, CURLOPT_URL, cstr); + free(cstr); + + res = curl_easy_perform(curl); + if(res != CURLE_OK) { + dbgprintf("omelasticsearch: checkConn() curl_easy_perform() " + "failed: %s\n", curl_easy_strerror(res)); + ABORT_FINALIZE(RS_RET_SUSPENDED); + } + +finalize_it: + if(curl != NULL) + curl_easy_cleanup(curl); + RETiRet; +} + + BEGINtryResume CODESTARTtryResume DBGPRINTF("omelasticsearch: tryResume called\n"); + iRet = checkConn(pData); ENDtryResume @@ -184,7 +241,6 @@ static rsRetVal setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2) { char authBuf[1024]; - char portBuf[64]; char *restURL; uchar *searchIndex; uchar *searchType; @@ -194,18 +250,12 @@ setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2) DEFiRet; getIndexAndType(pData, tpl1, tpl2, &searchIndex, &searchType); - url = es_newStr(128); - snprintf(portBuf, sizeof(portBuf), "%d", pData->port); + setBaseURL(pData, &url); - r = es_addBuf(&url, "http://", sizeof("http://")-1); - if(r == 0) r = es_addBuf(&url, (char*)pData->server, strlen((char*)pData->server)); - if(r == 0) r = es_addChar(&url, ':'); - if(r == 0) r = es_addBuf(&url, portBuf, strlen(portBuf)); - if(r == 0) r = es_addChar(&url, '/'); if(pData->bulkmode) { - if(r == 0) r = es_addBuf(&url, "_bulk", sizeof("_bulk")-1); + r = es_addBuf(&url, "_bulk", sizeof("_bulk")-1); } else { - if(r == 0) r = es_addBuf(&url, (char*)searchIndex, ustrlen(searchIndex)); + r = es_addBuf(&url, (char*)searchIndex, ustrlen(searchIndex)); if(r == 0) r = es_addChar(&url, '/'); if(r == 0) r = es_addBuf(&url, (char*)searchType, ustrlen(searchType)); } -- cgit From 68056a6128b9ebc8d65791b2647030d36c73f014 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 10 Jul 2012 17:15:03 +0200 Subject: omelasticsearch: support for parameters parent & dynparent added --- plugins/omelasticsearch/omelasticsearch.c | 115 ++++++++++++++++++++++++------ 1 file changed, 95 insertions(+), 20 deletions(-) (limited to 'plugins') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index a1f3b8ab..5ddb66da 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -70,10 +70,12 @@ typedef struct _instanceData { uchar *pwd; uchar *searchIndex; uchar *searchType; + uchar *parent; uchar *tplName; uchar *timeout; sbool dynSrchIdx; sbool dynSrchType; + sbool dynParent; sbool bulkmode; sbool asyncRepl; struct { @@ -95,8 +97,10 @@ static struct cnfparamdescr actpdescr[] = { { "pwd", eCmdHdlrGetWord, 0 }, { "searchindex", eCmdHdlrGetWord, 0 }, { "searchtype", eCmdHdlrGetWord, 0 }, + { "parent", eCmdHdlrGetWord, 0 }, { "dynsearchindex", eCmdHdlrBinary, 0 }, { "dynsearchtype", eCmdHdlrBinary, 0 }, + { "dynparent", eCmdHdlrBinary, 0 }, { "bulkmode", eCmdHdlrBinary, 0 }, { "asyncrepl", eCmdHdlrBinary, 0 }, { "timeout", eCmdHdlrGetWord, 0 }, @@ -133,6 +137,7 @@ CODESTARTfreeInstance free(pData->pwd); free(pData->searchIndex); free(pData->searchType); + free(pData->parent); free(pData->tplName); ENDfreeInstance @@ -146,9 +151,11 @@ CODESTARTdbgPrintInstInfo dbgprintf("\tpwd=(%sconfigured)\n", pData->pwd == NULL ? "not " : ""); dbgprintf("\tsearch index='%s'\n", pData->searchIndex); dbgprintf("\tsearch index='%s'\n", pData->searchType); + dbgprintf("\tparent='%s'\n", pData->parent); dbgprintf("\ttimeout='%s'\n", pData->timeout); dbgprintf("\tdynamic search index=%d\n", pData->dynSrchIdx); dbgprintf("\tdynamic search type=%d\n", pData->dynSrchType); + dbgprintf("\tdynamic parent=%d\n", pData->dynParent); dbgprintf("\tasync replication=%d\n", pData->asyncRepl); dbgprintf("\tbulkmode=%d\n", pData->bulkmode); ENDdbgPrintInstInfo @@ -197,10 +204,11 @@ checkConn(instanceData *pData) res = curl_easy_perform(curl); if(res != CURLE_OK) { - dbgprintf("omelasticsearch: checkConn() curl_easy_perform() " + DBGPRINTF("omelasticsearch: checkConn() curl_easy_perform() " "failed: %s\n", curl_easy_strerror(res)); ABORT_FINALIZE(RS_RET_SUSPENDED); } + DBGPRINTF("omelasticsearch: checkConn() completed with success\n"); finalize_it: if(curl != NULL) @@ -218,38 +226,62 @@ ENDtryResume /* get the current index and type for this message */ static inline void -getIndexAndType(instanceData *pData, uchar *tpl1, uchar *tpl2, uchar **srchIndex, - uchar **srchType) +getIndexTypeAndParent(instanceData *pData, uchar *tpl1, uchar *tpl2, uchar *tpl3, + uchar **srchIndex, uchar **srchType, uchar **parent) { if(pData->dynSrchIdx) { *srchIndex = tpl1; - if(pData->dynSrchType) + if(pData->dynSrchType) { *srchType = tpl2; - else + if(pData->dynParent) { + *parent = tpl3; + } else { + *parent = pData->parent; + } + } else { *srchType = pData->searchType; + if(pData->dynParent) { + *parent = tpl2; + } else { + *parent = pData->parent; + } + } } else { *srchIndex = pData->searchIndex; - if(pData->dynSrchType) + if(pData->dynSrchType) { *srchType = tpl1; - else + if(pData->dynParent) { + *parent = tpl2; + } else { + *parent = pData->parent; + } + } else { *srchType = pData->searchType; + if(pData->dynParent) { + *parent = tpl1; + } else { + *parent = pData->parent; + } + } } } static rsRetVal -setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2) +setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2, uchar *tpl3) { char authBuf[1024]; char *restURL; uchar *searchIndex; uchar *searchType; + uchar *parent; es_str_t *url; int rLocal; int r; DEFiRet; - getIndexAndType(pData, tpl1, tpl2, &searchIndex, &searchType); + getIndexTypeAndParent(pData, tpl1, tpl2, tpl3, + &searchIndex, &searchType, &parent); setBaseURL(pData, &url); if(pData->bulkmode) { @@ -267,6 +299,11 @@ setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2) if(pData->timeout != NULL) { if(r == 0) r = es_addBuf(&url, "timeout=", sizeof("timeout=")-1); if(r == 0) r = es_addBuf(&url, (char*)pData->timeout, ustrlen(pData->timeout)); + if(r == 0) r = es_addChar(&url, '&'); + } + if(parent != NULL) { + if(r == 0) r = es_addBuf(&url, "parent=", sizeof("parent=")-1); + if(r == 0) r = es_addBuf(&url, (char*)parent, ustrlen(parent)); } restURL = es_str2cstr(url, NULL); curl_easy_setopt(pData->curlHandle, CURLOPT_URL, restURL); @@ -296,24 +333,29 @@ finalize_it: * index changes. */ static rsRetVal -buildBatch(instanceData *pData, uchar *message, uchar *tpl1, uchar *tpl2) +buildBatch(instanceData *pData, uchar *message, uchar *tpl1, uchar *tpl2, uchar *tpl3) { int length = strlen((char *)message); int r; uchar *searchIndex; uchar *searchType; + uchar *parent; DEFiRet; # define META_STRT "{\"index\":{\"_index\": \"" # define META_TYPE "\",\"_type\":\"" +# define META_PARENT "\",\"_parent\":\"" # define META_END "\"}}\n" - getIndexAndType(pData, tpl1, tpl2, &searchIndex, &searchType); + getIndexTypeAndParent(pData, tpl1, tpl2, tpl3, + &searchIndex, &searchType, &parent); r = es_addBuf(&pData->batch.data, META_STRT, sizeof(META_STRT)-1); if(r == 0) r = es_addBuf(&pData->batch.data, (char*)searchIndex, ustrlen(searchIndex)); if(r == 0) r = es_addBuf(&pData->batch.data, META_TYPE, sizeof(META_TYPE)-1); if(r == 0) r = es_addBuf(&pData->batch.data, (char*)searchType, ustrlen(searchType)); + if(r == 0) r = es_addBuf(&pData->batch.data, META_PARENT, sizeof(META_PARENT)-1); + if(r == 0) r = es_addBuf(&pData->batch.data, (char*)parent, ustrlen(parent)); if(r == 0) r = es_addBuf(&pData->batch.data, META_END, sizeof(META_END)-1); if(r == 0) r = es_addBuf(&pData->batch.data, (char*)message, length); if(r == 0) r = es_addBuf(&pData->batch.data, "\n", sizeof("\n")-1); @@ -328,14 +370,15 @@ finalize_it: } static rsRetVal -curlPost(instanceData *instance, uchar *message, int msglen, uchar *tpl1, uchar *tpl2) +curlPost(instanceData *instance, uchar *message, int msglen, + uchar *tpl1, uchar *tpl2, uchar *tpl3) { CURLcode code; CURL *curl = instance->curlHandle; DEFiRet; - if(instance->dynSrchIdx || instance->dynSrchType) - CHKiRet(setCurlURL(instance, tpl1, tpl2)); + if(instance->dynSrchIdx || instance->dynSrchType || instance->dynParent) + CHKiRet(setCurlURL(instance, tpl1, tpl2, tpl3)); curl_easy_setopt(curl, CURLOPT_WRITEDATA, (char *)message); curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (char *)message); @@ -376,11 +419,11 @@ ENDbeginTransaction BEGINdoAction CODESTARTdoAction if(pData->bulkmode) { - CHKiRet(buildBatch(pData, ppString[0], ppString[1], ppString[2])); + CHKiRet(buildBatch(pData, ppString[0], ppString[1], ppString[2], ppString[3])); } else { dbgprintf("omelasticsearch: doAction calling curlPost\n"); CHKiRet(curlPost(pData, ppString[0], strlen((char*)ppString[0]), - ppString[1], ppString[2])); + ppString[1], ppString[2], ppString[3])); } finalize_it: dbgprintf("omelasticsearch: result doAction: %d (bulkmode %d)\n", iRet, pData->bulkmode); @@ -393,7 +436,7 @@ CODESTARTendTransaction dbgprintf("omelasticsearch: endTransaction init\n"); cstr = es_str2cstr(pData->batch.data, NULL); dbgprintf("omelasticsearch: endTransaction, batch: '%s'\n", cstr); - CHKiRet(curlPost(pData, (uchar*) cstr, strlen(cstr), NULL, NULL)); + CHKiRet(curlPost(pData, (uchar*) cstr, strlen(cstr), NULL, NULL, NULL)); finalize_it: free(cstr); dbgprintf("omelasticsearch: endTransaction done with %d\n", iRet); @@ -455,13 +498,14 @@ curlSetup(instanceData *pData) pData->curlHandle = handle; pData->postHeader = header; - if(pData->bulkmode || (pData->dynSrchIdx == 0 && pData->dynSrchType == 0)) { + if( pData->bulkmode + || (pData->dynSrchIdx == 0 && pData->dynSrchType == 0 && pData->dynParent == 0)) { /* in this case, we know no tpls are involved in the request-->NULL OK! */ - setCurlURL(pData, NULL, NULL); + setCurlURL(pData, NULL, NULL, NULL); } if(Debug) { - if(pData->dynSrchIdx == 0 && pData->dynSrchType == 0) + if(pData->dynSrchIdx == 0 && pData->dynSrchType == 0 && pData->dynParent == 0) dbgprintf("omelasticsearch setup, using static REST URL\n"); else dbgprintf("omelasticsearch setup, we have a dynamic REST URL\n"); @@ -478,9 +522,11 @@ setInstParamDefaults(instanceData *pData) pData->pwd = NULL; pData->searchIndex = NULL; pData->searchType = NULL; + pData->parent = NULL; pData->timeout = NULL; pData->dynSrchIdx = 0; pData->dynSrchType = 0; + pData->dynParent = 0; pData->asyncRepl = 0; pData->bulkmode = 0; pData->tplName = NULL; @@ -513,10 +559,14 @@ CODESTARTnewActInst pData->searchIndex = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(actpblk.descr[i].name, "searchtype")) { pData->searchType = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "parent")) { + pData->parent = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(actpblk.descr[i].name, "dynsearchindex")) { pData->dynSrchIdx = pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "dynsearchtype")) { pData->dynSrchType = pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "dynparent")) { + pData->dynParent = pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "bulkmode")) { pData->bulkmode = pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "timeout")) { @@ -549,6 +599,12 @@ CODESTARTnewActInst "name for type template given - action definition invalid"); ABORT_FINALIZE(RS_RET_CONFIG_ERROR); } + if(pData->dynParent && pData->parent == NULL) { + errmsg.LogError(0, RS_RET_CONFIG_ERROR, + "omelasticsearch: requested dynamic parent, but no " + "name for parent template given - action definition invalid"); + ABORT_FINALIZE(RS_RET_CONFIG_ERROR); + } if(pData->bulkmode) { pData->batch.currTpl1 = NULL; @@ -563,6 +619,7 @@ CODESTARTnewActInst iNumTpls = 1; if(pData->dynSrchIdx) ++iNumTpls; if(pData->dynSrchType) ++iNumTpls; + if(pData->dynParent) ++iNumTpls; DBGPRINTF("omelasticsearch: requesting %d templates\n", iNumTpls); CODE_STD_STRING_REQUESTparseSelectorAct(iNumTpls) @@ -581,11 +638,29 @@ CODESTARTnewActInst if(pData->dynSrchType) { CHKiRet(OMSRsetEntry(*ppOMSR, 2, ustrdup(pData->searchType), OMSR_NO_RQD_TPL_OPTS)); + if(pData->dynParent) { + CHKiRet(OMSRsetEntry(*ppOMSR, 3, ustrdup(pData->parent), + OMSR_NO_RQD_TPL_OPTS)); + } + } else { + if(pData->dynParent) { + CHKiRet(OMSRsetEntry(*ppOMSR, 2, ustrdup(pData->parent), + OMSR_NO_RQD_TPL_OPTS)); + } } } else { if(pData->dynSrchType) { CHKiRet(OMSRsetEntry(*ppOMSR, 1, ustrdup(pData->searchType), OMSR_NO_RQD_TPL_OPTS)); + if(pData->dynParent) { + CHKiRet(OMSRsetEntry(*ppOMSR, 2, ustrdup(pData->parent), + OMSR_NO_RQD_TPL_OPTS)); + } + } else { + if(pData->dynParent) { + CHKiRet(OMSRsetEntry(*ppOMSR, 1, ustrdup(pData->parent), + OMSR_NO_RQD_TPL_OPTS)); + } } } -- cgit From afe402d2418d181a1fd5f469ec4c4cf5c394d8c5 Mon Sep 17 00:00:00 2001 From: Andre Lorbach Date: Wed, 11 Jul 2012 03:08:32 -0700 Subject: bugfix: imtcp aborted when more than 2 connections were used. Incremented pthread stack size to 4MB for imtcp, imptcp and imttcp closes: http://bugzilla.adiscon.com/show_bug.cgi?id=342 --- plugins/imptcp/imptcp.c | 2 +- plugins/imttcp/imttcp.c | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) (limited to 'plugins') diff --git a/plugins/imptcp/imptcp.c b/plugins/imptcp/imptcp.c index 6961a696..b63e7ca3 100644 --- a/plugins/imptcp/imptcp.c +++ b/plugins/imptcp/imptcp.c @@ -1587,7 +1587,7 @@ CODEmodInit_QueryRegCFSLineHdlr /* initialize "read-only" thread attributes */ pthread_attr_init(&wrkrThrdAttr); - pthread_attr_setstacksize(&wrkrThrdAttr, 2048*1024); + pthread_attr_setstacksize(&wrkrThrdAttr, 4096*1024); /* init legacy config settings */ initConfigSettings(); diff --git a/plugins/imttcp/imttcp.c b/plugins/imttcp/imttcp.c index 5ed714fa..c72886b3 100644 --- a/plugins/imttcp/imttcp.c +++ b/plugins/imttcp/imttcp.c @@ -1127,7 +1127,7 @@ CODEmodInit_QueryRegCFSLineHdlr /* initialize "read-only" thread attributes */ pthread_attr_init(&sessThrdAttr); pthread_attr_setdetachstate(&sessThrdAttr, PTHREAD_CREATE_DETACHED); - pthread_attr_setstacksize(&sessThrdAttr, 200*1024); + pthread_attr_setstacksize(&sessThrdAttr, 4096*1024); /* register config file handlers */ CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputttcpserverrun"), 0, eCmdHdlrGetWord, -- cgit From fdbc4cb666b4fc92562ece1fba97227c40237e04 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 11 Jul 2012 17:12:34 +0200 Subject: omelasticsearch: regression from "parent" feature could case aborts this was not present in any released version --- plugins/omelasticsearch/omelasticsearch.c | 49 ++++++++++++++++--------------- 1 file changed, 26 insertions(+), 23 deletions(-) (limited to 'plugins') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index 5ddb66da..f77caeca 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -226,22 +226,22 @@ ENDtryResume /* get the current index and type for this message */ static inline void -getIndexTypeAndParent(instanceData *pData, uchar *tpl1, uchar *tpl2, uchar *tpl3, +getIndexTypeAndParent(instanceData *pData, uchar **tpls, uchar **srchIndex, uchar **srchType, uchar **parent) { if(pData->dynSrchIdx) { - *srchIndex = tpl1; + *srchIndex = tpls[1]; if(pData->dynSrchType) { - *srchType = tpl2; + *srchType = tpls[2]; if(pData->dynParent) { - *parent = tpl3; + *parent = tpls[3]; } else { *parent = pData->parent; } } else { *srchType = pData->searchType; if(pData->dynParent) { - *parent = tpl2; + *parent = tpls[2]; } else { *parent = pData->parent; } @@ -249,16 +249,16 @@ getIndexTypeAndParent(instanceData *pData, uchar *tpl1, uchar *tpl2, uchar *tpl3 } else { *srchIndex = pData->searchIndex; if(pData->dynSrchType) { - *srchType = tpl1; + *srchType = tpls[1]; if(pData->dynParent) { - *parent = tpl2; + *parent = tpls[2]; } else { *parent = pData->parent; } } else { *srchType = pData->searchType; if(pData->dynParent) { - *parent = tpl1; + *parent = tpls[1]; } else { *parent = pData->parent; } @@ -268,7 +268,7 @@ getIndexTypeAndParent(instanceData *pData, uchar *tpl1, uchar *tpl2, uchar *tpl3 static rsRetVal -setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2, uchar *tpl3) +setCurlURL(instanceData *pData, uchar **tpls) { char authBuf[1024]; char *restURL; @@ -280,13 +280,13 @@ setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2, uchar *tpl3) int r; DEFiRet; - getIndexTypeAndParent(pData, tpl1, tpl2, tpl3, - &searchIndex, &searchType, &parent); setBaseURL(pData, &url); if(pData->bulkmode) { r = es_addBuf(&url, "_bulk", sizeof("_bulk")-1); + parent = NULL; } else { + getIndexTypeAndParent(pData, tpls, &searchIndex, &searchType, &parent); r = es_addBuf(&url, (char*)searchIndex, ustrlen(searchIndex)); if(r == 0) r = es_addChar(&url, '/'); if(r == 0) r = es_addBuf(&url, (char*)searchType, ustrlen(searchType)); @@ -333,7 +333,7 @@ finalize_it: * index changes. */ static rsRetVal -buildBatch(instanceData *pData, uchar *message, uchar *tpl1, uchar *tpl2, uchar *tpl3) +buildBatch(instanceData *pData, uchar *message, uchar **tpls) { int length = strlen((char *)message); int r; @@ -346,16 +346,20 @@ buildBatch(instanceData *pData, uchar *message, uchar *tpl1, uchar *tpl2, uchar # define META_PARENT "\",\"_parent\":\"" # define META_END "\"}}\n" - getIndexTypeAndParent(pData, tpl1, tpl2, tpl3, - &searchIndex, &searchType, &parent); + getIndexTypeAndParent(pData, tpls, &searchIndex, &searchType, &parent); +dbgprintf("AAA: searchIndex: '%s'\n", searchIndex); +dbgprintf("AAA: searchType: '%s'\n", searchType); +dbgprintf("AAA: parent: '%s'\n", parent); r = es_addBuf(&pData->batch.data, META_STRT, sizeof(META_STRT)-1); if(r == 0) r = es_addBuf(&pData->batch.data, (char*)searchIndex, ustrlen(searchIndex)); if(r == 0) r = es_addBuf(&pData->batch.data, META_TYPE, sizeof(META_TYPE)-1); if(r == 0) r = es_addBuf(&pData->batch.data, (char*)searchType, ustrlen(searchType)); - if(r == 0) r = es_addBuf(&pData->batch.data, META_PARENT, sizeof(META_PARENT)-1); - if(r == 0) r = es_addBuf(&pData->batch.data, (char*)parent, ustrlen(parent)); + if(parent != NULL) { + if(r == 0) r = es_addBuf(&pData->batch.data, META_PARENT, sizeof(META_PARENT)-1); + if(r == 0) r = es_addBuf(&pData->batch.data, (char*)parent, ustrlen(parent)); + } if(r == 0) r = es_addBuf(&pData->batch.data, META_END, sizeof(META_END)-1); if(r == 0) r = es_addBuf(&pData->batch.data, (char*)message, length); if(r == 0) r = es_addBuf(&pData->batch.data, "\n", sizeof("\n")-1); @@ -370,15 +374,14 @@ finalize_it: } static rsRetVal -curlPost(instanceData *instance, uchar *message, int msglen, - uchar *tpl1, uchar *tpl2, uchar *tpl3) +curlPost(instanceData *instance, uchar *message, int msglen, uchar **tpls) { CURLcode code; CURL *curl = instance->curlHandle; DEFiRet; if(instance->dynSrchIdx || instance->dynSrchType || instance->dynParent) - CHKiRet(setCurlURL(instance, tpl1, tpl2, tpl3)); + CHKiRet(setCurlURL(instance, tpls)); curl_easy_setopt(curl, CURLOPT_WRITEDATA, (char *)message); curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (char *)message); @@ -419,11 +422,11 @@ ENDbeginTransaction BEGINdoAction CODESTARTdoAction if(pData->bulkmode) { - CHKiRet(buildBatch(pData, ppString[0], ppString[1], ppString[2], ppString[3])); + CHKiRet(buildBatch(pData, ppString[0], ppString)); } else { dbgprintf("omelasticsearch: doAction calling curlPost\n"); CHKiRet(curlPost(pData, ppString[0], strlen((char*)ppString[0]), - ppString[1], ppString[2], ppString[3])); + ppString)); } finalize_it: dbgprintf("omelasticsearch: result doAction: %d (bulkmode %d)\n", iRet, pData->bulkmode); @@ -436,7 +439,7 @@ CODESTARTendTransaction dbgprintf("omelasticsearch: endTransaction init\n"); cstr = es_str2cstr(pData->batch.data, NULL); dbgprintf("omelasticsearch: endTransaction, batch: '%s'\n", cstr); - CHKiRet(curlPost(pData, (uchar*) cstr, strlen(cstr), NULL, NULL, NULL)); + CHKiRet(curlPost(pData, (uchar*) cstr, strlen(cstr), NULL)); finalize_it: free(cstr); dbgprintf("omelasticsearch: endTransaction done with %d\n", iRet); @@ -501,7 +504,7 @@ curlSetup(instanceData *pData) if( pData->bulkmode || (pData->dynSrchIdx == 0 && pData->dynSrchType == 0 && pData->dynParent == 0)) { /* in this case, we know no tpls are involved in the request-->NULL OK! */ - setCurlURL(pData, NULL, NULL, NULL); + setCurlURL(pData, NULL); } if(Debug) { -- cgit From 686270440c601d5a4e3eac246397f60248889f5f Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 24 Jul 2012 12:40:13 +0200 Subject: bugfix: imptcp aborted when $InputPTCPServerBindRuleset was used --- plugins/imptcp/imptcp.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'plugins') diff --git a/plugins/imptcp/imptcp.c b/plugins/imptcp/imptcp.c index b63e7ca3..aa1ad81e 100644 --- a/plugins/imptcp/imptcp.c +++ b/plugins/imptcp/imptcp.c @@ -1616,7 +1616,7 @@ CODEmodInit_QueryRegCFSLineHdlr CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverlistenip"), 0, eCmdHdlrGetWord, NULL, &cs.lstnIP, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverbindruleset"), 0, - eCmdHdlrGetWord, NULL, cs.pszBindRuleset, STD_LOADABLE_MODULE_ID)); + eCmdHdlrGetWord, NULL, &cs.pszBindRuleset, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("resetconfigvariables"), 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID)); ENDmodInit -- cgit