diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2012-07-24 12:48:39 +0200 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2012-07-24 12:48:39 +0200 |
commit | cf100be58c69e34f42b15c03b6104139d37eb47b (patch) | |
tree | c6ea5b5eb7e6b6080498ef03882aa54cda417c9b /plugins | |
parent | 87311d12771fc94e2d324e73922bd5e0b07de64e (diff) | |
parent | 686270440c601d5a4e3eac246397f60248889f5f (diff) | |
download | rsyslog-cf100be58c69e34f42b15c03b6104139d37eb47b.tar.gz rsyslog-cf100be58c69e34f42b15c03b6104139d37eb47b.tar.xz rsyslog-cf100be58c69e34f42b15c03b6104139d37eb47b.zip |
Merge branch 'beta'
Conflicts:
ChangeLog
configure.ac
Diffstat (limited to 'plugins')
-rw-r--r-- | plugins/imptcp/imptcp.c | 4 | ||||
-rw-r--r-- | plugins/imttcp/imttcp.c | 2 | ||||
-rw-r--r-- | plugins/omelasticsearch/omelasticsearch.c | 240 |
3 files changed, 196 insertions, 50 deletions
diff --git a/plugins/imptcp/imptcp.c b/plugins/imptcp/imptcp.c index 6961a696..aa1ad81e 100644 --- a/plugins/imptcp/imptcp.c +++ b/plugins/imptcp/imptcp.c @@ -1587,7 +1587,7 @@ CODEmodInit_QueryRegCFSLineHdlr /* initialize "read-only" thread attributes */ pthread_attr_init(&wrkrThrdAttr); - pthread_attr_setstacksize(&wrkrThrdAttr, 2048*1024); + pthread_attr_setstacksize(&wrkrThrdAttr, 4096*1024); /* init legacy config settings */ initConfigSettings(); @@ -1616,7 +1616,7 @@ CODEmodInit_QueryRegCFSLineHdlr CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverlistenip"), 0, eCmdHdlrGetWord, NULL, &cs.lstnIP, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverbindruleset"), 0, - eCmdHdlrGetWord, NULL, cs.pszBindRuleset, STD_LOADABLE_MODULE_ID)); + eCmdHdlrGetWord, NULL, &cs.pszBindRuleset, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("resetconfigvariables"), 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID)); ENDmodInit diff --git a/plugins/imttcp/imttcp.c b/plugins/imttcp/imttcp.c index 5ed714fa..c72886b3 100644 --- a/plugins/imttcp/imttcp.c +++ b/plugins/imttcp/imttcp.c @@ -1127,7 +1127,7 @@ CODEmodInit_QueryRegCFSLineHdlr /* initialize "read-only" thread attributes */ pthread_attr_init(&sessThrdAttr); pthread_attr_setdetachstate(&sessThrdAttr, PTHREAD_CREATE_DETACHED); - pthread_attr_setstacksize(&sessThrdAttr, 200*1024); + pthread_attr_setstacksize(&sessThrdAttr, 4096*1024); /* register config file handlers */ CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputttcpserverrun"), 0, eCmdHdlrGetWord, diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index d8db7307..f77caeca 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -70,10 +70,12 @@ typedef struct _instanceData { uchar *pwd; uchar *searchIndex; uchar *searchType; + uchar *parent; uchar *tplName; uchar *timeout; sbool dynSrchIdx; sbool dynSrchType; + sbool dynParent; sbool bulkmode; sbool asyncRepl; struct { @@ -95,8 +97,10 @@ static struct cnfparamdescr actpdescr[] = { { "pwd", eCmdHdlrGetWord, 0 }, { "searchindex", eCmdHdlrGetWord, 0 }, { "searchtype", eCmdHdlrGetWord, 0 }, + { "parent", eCmdHdlrGetWord, 0 }, { "dynsearchindex", eCmdHdlrBinary, 0 }, { "dynsearchtype", eCmdHdlrBinary, 0 }, + { "dynparent", eCmdHdlrBinary, 0 }, { "bulkmode", eCmdHdlrBinary, 0 }, { "asyncrepl", eCmdHdlrBinary, 0 }, { "timeout", eCmdHdlrGetWord, 0 }, @@ -133,6 +137,7 @@ CODESTARTfreeInstance free(pData->pwd); free(pData->searchIndex); free(pData->searchType); + free(pData->parent); free(pData->tplName); ENDfreeInstance @@ -143,66 +148,146 @@ CODESTARTdbgPrintInstInfo dbgprintf("\tserver='%s'\n", pData->server); dbgprintf("\tserverport=%d\n", pData->port); dbgprintf("\tuid='%s'\n", pData->uid == NULL ? (uchar*)"(not configured)" : pData->uid); - dbgprintf("\tpwd=(%s configured)\n", pData->pwd == NULL ? "not " : ""); + dbgprintf("\tpwd=(%sconfigured)\n", pData->pwd == NULL ? "not " : ""); dbgprintf("\tsearch index='%s'\n", pData->searchIndex); dbgprintf("\tsearch index='%s'\n", pData->searchType); + dbgprintf("\tparent='%s'\n", pData->parent); dbgprintf("\ttimeout='%s'\n", pData->timeout); dbgprintf("\tdynamic search index=%d\n", pData->dynSrchIdx); dbgprintf("\tdynamic search type=%d\n", pData->dynSrchType); + dbgprintf("\tdynamic parent=%d\n", pData->dynParent); dbgprintf("\tasync replication=%d\n", pData->asyncRepl); dbgprintf("\tbulkmode=%d\n", pData->bulkmode); ENDdbgPrintInstInfo + +/* Build basic URL part, which includes hostname and port as follows: + * http://hostname:port/ + * Newly creates an estr for this purpose. + */ +static rsRetVal +setBaseURL(instanceData *pData, es_str_t **url) +{ + char portBuf[64]; + int r; + DEFiRet; + + *url = es_newStr(128); + snprintf(portBuf, sizeof(portBuf), "%d", pData->port); + r = es_addBuf(url, "http://", sizeof("http://")-1); + if(r == 0) r = es_addBuf(url, (char*)pData->server, strlen((char*)pData->server)); + if(r == 0) r = es_addChar(url, ':'); + if(r == 0) r = es_addBuf(url, portBuf, strlen(portBuf)); + if(r == 0) r = es_addChar(url, '/'); + RETiRet; +} + + +static inline rsRetVal +checkConn(instanceData *pData) +{ + es_str_t *url; + CURL *curl = NULL; + CURLcode res; + char *cstr; + DEFiRet; + + setBaseURL(pData, &url); + curl = curl_easy_init(); + if(curl == NULL) { + DBGPRINTF("omelasticsearch: checkConn() curl_easy_init() failed\n"); + ABORT_FINALIZE(RS_RET_SUSPENDED); + } + cstr = es_str2cstr(url, NULL); + curl_easy_setopt(curl, CURLOPT_URL, cstr); + free(cstr); + + res = curl_easy_perform(curl); + if(res != CURLE_OK) { + DBGPRINTF("omelasticsearch: checkConn() curl_easy_perform() " + "failed: %s\n", curl_easy_strerror(res)); + ABORT_FINALIZE(RS_RET_SUSPENDED); + } + DBGPRINTF("omelasticsearch: checkConn() completed with success\n"); + +finalize_it: + if(curl != NULL) + curl_easy_cleanup(curl); + RETiRet; +} + + BEGINtryResume CODESTARTtryResume + DBGPRINTF("omelasticsearch: tryResume called\n"); + iRet = checkConn(pData); ENDtryResume /* get the current index and type for this message */ static inline void -getIndexAndType(instanceData *pData, uchar *tpl1, uchar *tpl2, uchar **srchIndex, - uchar **srchType) +getIndexTypeAndParent(instanceData *pData, uchar **tpls, + uchar **srchIndex, uchar **srchType, uchar **parent) { if(pData->dynSrchIdx) { - *srchIndex = tpl1; - if(pData->dynSrchType) - *srchType = tpl2; - else + *srchIndex = tpls[1]; + if(pData->dynSrchType) { + *srchType = tpls[2]; + if(pData->dynParent) { + *parent = tpls[3]; + } else { + *parent = pData->parent; + } + } else { *srchType = pData->searchType; + if(pData->dynParent) { + *parent = tpls[2]; + } else { + *parent = pData->parent; + } + } } else { *srchIndex = pData->searchIndex; - if(pData->dynSrchType) - *srchType = tpl1; - else + if(pData->dynSrchType) { + *srchType = tpls[1]; + if(pData->dynParent) { + *parent = tpls[2]; + } else { + *parent = pData->parent; + } + } else { *srchType = pData->searchType; + if(pData->dynParent) { + *parent = tpls[1]; + } else { + *parent = pData->parent; + } + } } } static rsRetVal -setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2) +setCurlURL(instanceData *pData, uchar **tpls) { char authBuf[1024]; - char portBuf[64]; char *restURL; uchar *searchIndex; uchar *searchType; + uchar *parent; es_str_t *url; + int rLocal; int r; + DEFiRet; - getIndexAndType(pData, tpl1, tpl2, &searchIndex, &searchType); - url = es_newStr(128); - snprintf(portBuf, sizeof(portBuf), "%d", pData->port); + setBaseURL(pData, &url); - r = es_addBuf(&url, "http://", sizeof("http://")-1); - if(r == 0) r = es_addBuf(&url, (char*)pData->server, strlen((char*)pData->server)); - if(r == 0) r = es_addChar(&url, ':'); - if(r == 0) r = es_addBuf(&url, portBuf, strlen(portBuf)); - if(r == 0) r = es_addChar(&url, '/'); if(pData->bulkmode) { - if(r == 0) r = es_addBuf(&url, "_bulk", sizeof("_bulk")-1); + r = es_addBuf(&url, "_bulk", sizeof("_bulk")-1); + parent = NULL; } else { - if(r == 0) r = es_addBuf(&url, (char*)searchIndex, ustrlen(searchIndex)); + getIndexTypeAndParent(pData, tpls, &searchIndex, &searchType, &parent); + r = es_addBuf(&url, (char*)searchIndex, ustrlen(searchIndex)); if(r == 0) r = es_addChar(&url, '/'); if(r == 0) r = es_addBuf(&url, (char*)searchType, ustrlen(searchType)); } @@ -214,21 +299,32 @@ setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2) if(pData->timeout != NULL) { if(r == 0) r = es_addBuf(&url, "timeout=", sizeof("timeout=")-1); if(r == 0) r = es_addBuf(&url, (char*)pData->timeout, ustrlen(pData->timeout)); + if(r == 0) r = es_addChar(&url, '&'); + } + if(parent != NULL) { + if(r == 0) r = es_addBuf(&url, "parent=", sizeof("parent=")-1); + if(r == 0) r = es_addBuf(&url, (char*)parent, ustrlen(parent)); } restURL = es_str2cstr(url, NULL); curl_easy_setopt(pData->curlHandle, CURLOPT_URL, restURL); es_deleteStr(url); + DBGPRINTF("omelasticsearch: using REST URL: '%s'\n", restURL); free(restURL); if(pData->uid != NULL) { - snprintf(authBuf, sizeof(authBuf), "%s:%s", pData->uid, - (pData->pwd == NULL) ? "" : (char*)pData->pwd); - //TODO: create better code, check errors! + rLocal = snprintf(authBuf, sizeof(authBuf), "%s:%s", pData->uid, + (pData->pwd == NULL) ? "" : (char*)pData->pwd); + if(rLocal != (int) es_strlen(url)) { + errmsg.LogError(0, RS_RET_ERR, "omelasticsearch: snprintf failed " + "when trying to build auth string (return %d)\n", + rLocal); + ABORT_FINALIZE(RS_RET_ERR); + } curl_easy_setopt(pData->curlHandle, CURLOPT_USERPWD, authBuf); curl_easy_setopt(pData->curlHandle, CURLOPT_PROXYAUTH, CURLAUTH_ANY); } - DBGPRINTF("omelasticsearch: using REST URL: '%s'\n", restURL); - return RS_RET_OK; +finalize_it: + RETiRet; } @@ -237,25 +333,33 @@ setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2) * index changes. */ static rsRetVal -buildBatch(instanceData *pData, uchar *message, uchar *tpl1, uchar *tpl2) +buildBatch(instanceData *pData, uchar *message, uchar **tpls) { int length = strlen((char *)message); int r; uchar *searchIndex; uchar *searchType; + uchar *parent; DEFiRet; # define META_STRT "{\"index\":{\"_index\": \"" # define META_TYPE "\",\"_type\":\"" +# define META_PARENT "\",\"_parent\":\"" # define META_END "\"}}\n" -#warning TODO: use dynamic index/type! - getIndexAndType(pData, tpl1, tpl2, &searchIndex, &searchType); + getIndexTypeAndParent(pData, tpls, &searchIndex, &searchType, &parent); +dbgprintf("AAA: searchIndex: '%s'\n", searchIndex); +dbgprintf("AAA: searchType: '%s'\n", searchType); +dbgprintf("AAA: parent: '%s'\n", parent); r = es_addBuf(&pData->batch.data, META_STRT, sizeof(META_STRT)-1); if(r == 0) r = es_addBuf(&pData->batch.data, (char*)searchIndex, ustrlen(searchIndex)); if(r == 0) r = es_addBuf(&pData->batch.data, META_TYPE, sizeof(META_TYPE)-1); if(r == 0) r = es_addBuf(&pData->batch.data, (char*)searchType, ustrlen(searchType)); + if(parent != NULL) { + if(r == 0) r = es_addBuf(&pData->batch.data, META_PARENT, sizeof(META_PARENT)-1); + if(r == 0) r = es_addBuf(&pData->batch.data, (char*)parent, ustrlen(parent)); + } if(r == 0) r = es_addBuf(&pData->batch.data, META_END, sizeof(META_END)-1); if(r == 0) r = es_addBuf(&pData->batch.data, (char*)message, length); if(r == 0) r = es_addBuf(&pData->batch.data, "\n", sizeof("\n")-1); @@ -270,25 +374,30 @@ finalize_it: } static rsRetVal -curlPost(instanceData *instance, uchar *message, int msglen, uchar *tpl1, uchar *tpl2) +curlPost(instanceData *instance, uchar *message, int msglen, uchar **tpls) { CURLcode code; CURL *curl = instance->curlHandle; DEFiRet; - if(instance->dynSrchIdx || instance->dynSrchType) - CHKiRet(setCurlURL(instance, tpl1, tpl2)); + if(instance->dynSrchIdx || instance->dynSrchType || instance->dynParent) + CHKiRet(setCurlURL(instance, tpls)); curl_easy_setopt(curl, CURLOPT_WRITEDATA, (char *)message); curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (char *)message); curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, msglen); +dbgprintf("omelasticsearch: do curl_easy_perform()\n"); code = curl_easy_perform(curl); +DBGPRINTF("omelasticsearch: curl_easy_perform() returned %lld\n", (long long) code); switch (code) { case CURLE_COULDNT_RESOLVE_HOST: case CURLE_COULDNT_RESOLVE_PROXY: case CURLE_COULDNT_CONNECT: case CURLE_WRITE_ERROR: STATSCOUNTER_INC(indexConFail, mutIndexConFail); + DBGPRINTF("omelasticsearch: we are suspending ourselfs due " + "to failure %lld of curl_easy_perform()\n", + (long long) code); return RS_RET_SUSPENDED; default: STATSCOUNTER_INC(indexSubmit, mutIndexSubmit); @@ -313,24 +422,27 @@ ENDbeginTransaction BEGINdoAction CODESTARTdoAction if(pData->bulkmode) { - CHKiRet(buildBatch(pData, ppString[0], ppString[1], ppString[2])); + CHKiRet(buildBatch(pData, ppString[0], ppString)); } else { +dbgprintf("omelasticsearch: doAction calling curlPost\n"); CHKiRet(curlPost(pData, ppString[0], strlen((char*)ppString[0]), - ppString[1], ppString[2])); + ppString)); } finalize_it: -dbgprintf("omelasticsearch: result doAction: %d\n", iRet); +dbgprintf("omelasticsearch: result doAction: %d (bulkmode %d)\n", iRet, pData->bulkmode); ENDdoAction BEGINendTransaction char *cstr; CODESTARTendTransaction +dbgprintf("omelasticsearch: endTransaction init\n"); cstr = es_str2cstr(pData->batch.data, NULL); - dbgprintf("elasticsearch: endTransaction, batch: '%s'\n", cstr); - CHKiRet(curlPost(pData, (uchar*) cstr, strlen(cstr), NULL, NULL)); + dbgprintf("omelasticsearch: endTransaction, batch: '%s'\n", cstr); + CHKiRet(curlPost(pData, (uchar*) cstr, strlen(cstr), NULL)); finalize_it: free(cstr); +dbgprintf("omelasticsearch: endTransaction done with %d\n", iRet); ENDendTransaction /* elasticsearch POST result string ... useful for debugging */ @@ -353,11 +465,13 @@ DBGPRINTF("\n"); nmemb > sizeof(ok)-1 && strncmp(p, ok, sizeof(ok)-1) == 0) { STATSCOUNTER_INC(indexSuccess, mutIndexSuccess); +dbgprintf("omelasticsearch ok\n"); } else { +dbgprintf("omelasticsearch fail\n"); STATSCOUNTER_INC(indexFailed, mutIndexFailed); if (Debug) { - DBGPRINTF("omelasticsearch request: %s\n", jsonData); - DBGPRINTF("omelasticsearch result: "); + DBGPRINTF("omelasticsearch (fail) request: %s\n", jsonData); + DBGPRINTF("omelasticsearch (fail) result: "); for (i = 0; i < nmemb; i++) DBGPRINTF("%c", p[i]); DBGPRINTF("\n"); @@ -387,13 +501,14 @@ curlSetup(instanceData *pData) pData->curlHandle = handle; pData->postHeader = header; - if(pData->bulkmode || (pData->dynSrchIdx == 0 && pData->dynSrchType == 0)) { + if( pData->bulkmode + || (pData->dynSrchIdx == 0 && pData->dynSrchType == 0 && pData->dynParent == 0)) { /* in this case, we know no tpls are involved in the request-->NULL OK! */ - setCurlURL(pData, NULL, NULL); + setCurlURL(pData, NULL); } if(Debug) { - if(pData->dynSrchIdx == 0 && pData->dynSrchType == 0) + if(pData->dynSrchIdx == 0 && pData->dynSrchType == 0 && pData->dynParent == 0) dbgprintf("omelasticsearch setup, using static REST URL\n"); else dbgprintf("omelasticsearch setup, we have a dynamic REST URL\n"); @@ -410,9 +525,11 @@ setInstParamDefaults(instanceData *pData) pData->pwd = NULL; pData->searchIndex = NULL; pData->searchType = NULL; + pData->parent = NULL; pData->timeout = NULL; pData->dynSrchIdx = 0; pData->dynSrchType = 0; + pData->dynParent = 0; pData->asyncRepl = 0; pData->bulkmode = 0; pData->tplName = NULL; @@ -445,10 +562,14 @@ CODESTARTnewActInst pData->searchIndex = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(actpblk.descr[i].name, "searchtype")) { pData->searchType = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "parent")) { + pData->parent = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(actpblk.descr[i].name, "dynsearchindex")) { pData->dynSrchIdx = pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "dynsearchtype")) { pData->dynSrchType = pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "dynparent")) { + pData->dynParent = pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "bulkmode")) { pData->bulkmode = pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "timeout")) { @@ -470,16 +591,22 @@ CODESTARTnewActInst ABORT_FINALIZE(RS_RET_UID_MISSING); } if(pData->dynSrchIdx && pData->searchIndex == NULL) { - errmsg.LogError(0, RS_RET_LEGA_ACT_NOT_SUPPORTED, + errmsg.LogError(0, RS_RET_CONFIG_ERROR, "omelasticsearch: requested dynamic search index, but no " "name for index template given - action definition invalid"); - ABORT_FINALIZE(RS_RET_LEGA_ACT_NOT_SUPPORTED); + ABORT_FINALIZE(RS_RET_CONFIG_ERROR); } if(pData->dynSrchType && pData->searchType == NULL) { - errmsg.LogError(0, RS_RET_LEGA_ACT_NOT_SUPPORTED, + errmsg.LogError(0, RS_RET_CONFIG_ERROR, "omelasticsearch: requested dynamic search type, but no " "name for type template given - action definition invalid"); - ABORT_FINALIZE(RS_RET_LEGA_ACT_NOT_SUPPORTED); + ABORT_FINALIZE(RS_RET_CONFIG_ERROR); + } + if(pData->dynParent && pData->parent == NULL) { + errmsg.LogError(0, RS_RET_CONFIG_ERROR, + "omelasticsearch: requested dynamic parent, but no " + "name for parent template given - action definition invalid"); + ABORT_FINALIZE(RS_RET_CONFIG_ERROR); } if(pData->bulkmode) { @@ -495,6 +622,7 @@ CODESTARTnewActInst iNumTpls = 1; if(pData->dynSrchIdx) ++iNumTpls; if(pData->dynSrchType) ++iNumTpls; + if(pData->dynParent) ++iNumTpls; DBGPRINTF("omelasticsearch: requesting %d templates\n", iNumTpls); CODE_STD_STRING_REQUESTparseSelectorAct(iNumTpls) @@ -513,11 +641,29 @@ CODESTARTnewActInst if(pData->dynSrchType) { CHKiRet(OMSRsetEntry(*ppOMSR, 2, ustrdup(pData->searchType), OMSR_NO_RQD_TPL_OPTS)); + if(pData->dynParent) { + CHKiRet(OMSRsetEntry(*ppOMSR, 3, ustrdup(pData->parent), + OMSR_NO_RQD_TPL_OPTS)); + } + } else { + if(pData->dynParent) { + CHKiRet(OMSRsetEntry(*ppOMSR, 2, ustrdup(pData->parent), + OMSR_NO_RQD_TPL_OPTS)); + } } } else { if(pData->dynSrchType) { CHKiRet(OMSRsetEntry(*ppOMSR, 1, ustrdup(pData->searchType), OMSR_NO_RQD_TPL_OPTS)); + if(pData->dynParent) { + CHKiRet(OMSRsetEntry(*ppOMSR, 2, ustrdup(pData->parent), + OMSR_NO_RQD_TPL_OPTS)); + } + } else { + if(pData->dynParent) { + CHKiRet(OMSRsetEntry(*ppOMSR, 1, ustrdup(pData->parent), + OMSR_NO_RQD_TPL_OPTS)); + } } } |