diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2012-05-25 18:55:29 +0200 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2012-05-25 18:55:29 +0200 |
commit | 5f85909b17920172121d2ff8367c8185623f1409 (patch) | |
tree | ad8e0c0e67d150d522caf72d1216d158f67510ea /plugins/omelasticsearch/omelasticsearch.c | |
parent | c7ca67a37586164a028c52a4d0cd9328b09e8697 (diff) | |
download | rsyslog-5f85909b17920172121d2ff8367c8185623f1409.tar.gz rsyslog-5f85909b17920172121d2ff8367c8185623f1409.tar.xz rsyslog-5f85909b17920172121d2ff8367c8185623f1409.zip |
omelasticsearc: milestone, bulk insert basically works
dynamic index&type is not yet used, but easy to add (did not manage
to get it in today...)
Diffstat (limited to 'plugins/omelasticsearch/omelasticsearch.c')
-rw-r--r-- | plugins/omelasticsearch/omelasticsearch.c | 115 |
1 files changed, 65 insertions, 50 deletions
diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index 1705c605..704c9950 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -158,46 +158,67 @@ CODESTARTtryResume ENDtryResume +/* get the current index and type for this message */ +static inline void +getIndexAndType(instanceData *pData, uchar *tpl1, uchar *tpl2, uchar **srchIndex, + uchar **srchType) +{ + if(pData->dynSrchIdx) { + *srchIndex = tpl1; + if(pData->dynSrchType) + *srchType = tpl2; + else + *srchType = pData->searchType; + } else { + *srchIndex = pData->searchIndex; + if(pData->dynSrchType) + *srchType = tpl1; + else + *srchType = pData->searchType; + } +} + + static rsRetVal setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2) { - char restURL[2048]; /* libcurl makes a copy, using the stack here is OK */ char authBuf[1024]; + char portBuf[64]; + char *restURL; uchar *searchIndex; uchar *searchType; - uchar *bulkpart; + es_str_t *url; + int r; - bulkpart = (pData->bulkmode) ? "/_bulk" : ""; - if(pData->dynSrchIdx) { - searchIndex = tpl1; - if(pData->dynSrchType) - searchType = tpl2; - else - searchType = pData->searchType; + getIndexAndType(pData, tpl1, tpl2, &searchIndex, &searchType); + url = es_newStr(128); + snprintf(portBuf, sizeof(portBuf), "%d", pData->port); + + r = es_addBuf(&url, "http://", sizeof("http://")-1); + if(r == 0) r = es_addBuf(&url, (char*)pData->server, strlen((char*)pData->server)); + if(r == 0) r = es_addChar(&url, ':'); + if(r == 0) r = es_addBuf(&url, portBuf, strlen(portBuf)); + if(r == 0) r = es_addChar(&url, '/'); + if(pData->bulkmode) { + if(r == 0) r = es_addBuf(&url, "_bulk", sizeof("_bulk")-1); } else { - searchIndex = pData->searchIndex; - if(pData->dynSrchType) - searchType = tpl1; - else - searchType = pData->searchType; + if(r == 0) r = es_addBuf(&url, (char*)searchIndex, ustrlen(searchIndex)); + if(r == 0) r = es_addChar(&url, '/'); + if(r == 0) r = es_addBuf(&url, (char*)searchType, ustrlen(searchType)); } + if(r == 0) r = es_addChar(&url, '?'); if(pData->asyncRepl) { - if(pData->timeout != NULL) { - snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s%s?" - "replication=async&timeout=%s", - pData->server, pData->port, searchIndex, searchType, - bulkpart, pData->timeout); - } else { - snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s%s?" - "replication=async", - pData->server, pData->port, searchIndex, searchType, - bulkpart); - } - } else { - snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s%s", - pData->server, pData->port, searchIndex, searchType, bulkpart); + if(r == 0) r = es_addBuf(&url, "replication=async&", + sizeof("replication=async&")-1); } + if(pData->timeout != NULL) { + if(r == 0) r = es_addBuf(&url, "timeout=", sizeof("timeout=")-1); + if(r == 0) r = es_addBuf(&url, (char*)pData->timeout, ustrlen(pData->timeout)); + } + restURL = es_str2cstr(url, NULL); curl_easy_setopt(pData->curlHandle, CURLOPT_URL, restURL); + es_deleteStr(url); + free(restURL); if(pData->uid != NULL) { snprintf(authBuf, sizeof(authBuf), "%s:%s", pData->uid, @@ -221,18 +242,20 @@ buildBatch(instanceData *pData, uchar *message, uchar *tpl1, uchar *tpl2) int length = strlen((char *)message); int r; DEFiRet; - -#if 0 - if(instance->dynSrchIdx || instance->dynSrchType) - CHKiRet(setCurlURL(instance, tpl1, tpl2)); -#endif - - //if(pData->batch.currTpl1 == NULL - r = es_addBuf(&pData->batch.data, "{\"index\":", sizeof("{\"index\":")-1); +# define META_STRT "{\"index\":{\"_index\": \"" +# define META_TYPE "\",\"_type\":\"" +# define META_END "\"}}\n" + +#warning TODO: use dynamic index/type! + r = es_addBuf(&pData->batch.data, META_STRT, sizeof(META_STRT)-1); + if(r == 0) r = es_addBuf(&pData->batch.data, (char*)pData->searchIndex, + ustrlen(pData->searchIndex)); + if(r == 0) r = es_addBuf(&pData->batch.data, META_TYPE, sizeof(META_TYPE)-1); + if(r == 0) r = es_addBuf(&pData->batch.data, (char*)pData->searchType, + ustrlen(pData->searchType)); + if(r == 0) r = es_addBuf(&pData->batch.data, META_END, sizeof(META_END)-1); if(r == 0) r = es_addBuf(&pData->batch.data, (char*)message, length); - if(r == 0) r = es_addBuf(&pData->batch.data, "}\n", sizeof("}\n")-1); - - if(r == 0) r = es_addBuf(&pData->batch.data, "{ \"field1\" : \"value1\" }\n", sizeof("{ \"field1\" : \"value1\" }\n")-1); + if(r == 0) r = es_addBuf(&pData->batch.data, "\n", sizeof("\n")-1); if(r != 0) { DBGPRINTF("omelasticsearch: growing batch failed with code %d\n", r); ABORT_FINALIZE(RS_RET_ERR); @@ -257,7 +280,6 @@ curlPost(instanceData *instance, uchar *message, int msglen, uchar *tpl1, uchar curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (char *)message); curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, msglen); code = curl_easy_perform(curl); -dbgprintf("curl_easy_perform result: %d\n", code); switch (code) { case CURLE_COULDNT_RESOLVE_HOST: case CURLE_COULDNT_RESOLVE_PROXY: @@ -303,14 +325,7 @@ BEGINendTransaction CODESTARTendTransaction cstr = es_str2cstr(pData->batch.data, NULL); dbgprintf("elasticsearch: endTransaction, batch: '%s'\n", cstr); - CHKiRet(curlPost(pData, (uchar*) cstr, strlen(cstr), - NULL, NULL)); -#if 0 - if(pData->offsSndBuf != 0) { - iRet = TCPSendBuf(pData, pData->sndBuf, pData->offsSndBuf); - pData->offsSndBuf = 0; - } -#endif + CHKiRet(curlPost(pData, (uchar*) cstr, strlen(cstr), NULL, NULL)); finalize_it: free(cstr); ENDendTransaction @@ -369,8 +384,8 @@ curlSetup(instanceData *pData) pData->curlHandle = handle; pData->postHeader = header; - if(pData->dynSrchIdx == 0 && pData->dynSrchType == 0) { - /* in this case, we know no tpls are involved --> NULL OK! */ + if(pData->bulkmode || (pData->dynSrchIdx == 0 && pData->dynSrchType == 0)) { + /* in this case, we know no tpls are involved in the request-->NULL OK! */ setCurlURL(pData, NULL, NULL); } |