diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2012-05-25 08:03:19 +0200 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2012-05-25 08:03:19 +0200 |
commit | c7ca67a37586164a028c52a4d0cd9328b09e8697 (patch) | |
tree | 76ff8d4492ad3565ad81e74f45b18e9c2db8dd9d /plugins/omelasticsearch | |
parent | a4c0d08c78ef27891b192973174b997a0c7c4aa1 (diff) | |
download | rsyslog-c7ca67a37586164a028c52a4d0cd9328b09e8697.tar.gz rsyslog-c7ca67a37586164a028c52a4d0cd9328b09e8697.tar.xz rsyslog-c7ca67a37586164a028c52a4d0cd9328b09e8697.zip |
omelasticsearch: test commit, first shot at bulk interface
This obviously does not work correctly. So expect problems if you
set bulkmode="on".
Diffstat (limited to 'plugins/omelasticsearch')
-rw-r--r-- | plugins/omelasticsearch/omelasticsearch.c | 96 |
1 files changed, 85 insertions, 11 deletions
diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index 7642d603..1705c605 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -76,6 +76,11 @@ typedef struct _instanceData { sbool dynSrchType; sbool bulkmode; sbool asyncRepl; + struct { + es_str_t *data; + uchar *currTpl1; + uchar *currTpl2; + } batch; CURL *curlHandle; /* libcurl session handle */ HEADER *postHeader; /* json POST request info */ } instanceData; @@ -160,7 +165,9 @@ setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2) char authBuf[1024]; uchar *searchIndex; uchar *searchType; + uchar *bulkpart; + bulkpart = (pData->bulkmode) ? "/_bulk" : ""; if(pData->dynSrchIdx) { searchIndex = tpl1; if(pData->dynSrchType) @@ -176,18 +183,19 @@ setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2) } if(pData->asyncRepl) { if(pData->timeout != NULL) { - snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s?" + snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s%s?" "replication=async&timeout=%s", pData->server, pData->port, searchIndex, searchType, - pData->timeout); + bulkpart, pData->timeout); } else { - snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s?" + snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s%s?" "replication=async", - pData->server, pData->port, searchIndex, searchType); + pData->server, pData->port, searchIndex, searchType, + bulkpart); } } else { - snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s", - pData->server, pData->port, searchIndex, searchType); + snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s%s", + pData->server, pData->port, searchIndex, searchType, bulkpart); } curl_easy_setopt(pData->curlHandle, CURLOPT_URL, restURL); @@ -202,12 +210,44 @@ setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2) return RS_RET_OK; } + +/* this method does not directly submit but builds a batch instead. It + * may submit, if we have dynamic index/type and the current type or + * index changes. + */ +static rsRetVal +buildBatch(instanceData *pData, uchar *message, uchar *tpl1, uchar *tpl2) +{ + int length = strlen((char *)message); + int r; + DEFiRet; + +#if 0 + if(instance->dynSrchIdx || instance->dynSrchType) + CHKiRet(setCurlURL(instance, tpl1, tpl2)); +#endif + + //if(pData->batch.currTpl1 == NULL + r = es_addBuf(&pData->batch.data, "{\"index\":", sizeof("{\"index\":")-1); + if(r == 0) r = es_addBuf(&pData->batch.data, (char*)message, length); + if(r == 0) r = es_addBuf(&pData->batch.data, "}\n", sizeof("}\n")-1); + + if(r == 0) r = es_addBuf(&pData->batch.data, "{ \"field1\" : \"value1\" }\n", sizeof("{ \"field1\" : \"value1\" }\n")-1); + if(r != 0) { + DBGPRINTF("omelasticsearch: growing batch failed with code %d\n", r); + ABORT_FINALIZE(RS_RET_ERR); + } + iRet = RS_RET_DEFER_COMMIT; + +finalize_it: + RETiRet; +} + static rsRetVal -curlPost(instanceData *instance, uchar *message, uchar *tpl1, uchar *tpl2) +curlPost(instanceData *instance, uchar *message, int msglen, uchar *tpl1, uchar *tpl2) { CURLcode code; CURL *curl = instance->curlHandle; - int length = strlen((char *)message); DEFiRet; if(instance->dynSrchIdx || instance->dynSrchType) @@ -215,8 +255,9 @@ curlPost(instanceData *instance, uchar *message, uchar *tpl1, uchar *tpl2) curl_easy_setopt(curl, CURLOPT_WRITEDATA, (char *)message); curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (char *)message); - curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, length); + curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, msglen); code = curl_easy_perform(curl); +dbgprintf("curl_easy_perform result: %d\n", code); switch (code) { case CURLE_COULDNT_RESOLVE_HOST: case CURLE_COULDNT_RESOLVE_PROXY: @@ -235,25 +276,43 @@ finalize_it: BEGINbeginTransaction CODESTARTbeginTransaction dbgprintf("omelasticsearch: beginTransaction\n"); + if(!pData->bulkmode) { + FINALIZE; + } + + es_emptyStr(pData->batch.data); +finalize_it: ENDbeginTransaction BEGINdoAction CODESTARTdoAction - CHKiRet(curlPost(pData, ppString[0], ppString[1], ppString[2])); + if(pData->bulkmode) { + CHKiRet(buildBatch(pData, ppString[0], ppString[1], ppString[2])); + } else { + CHKiRet(curlPost(pData, ppString[0], strlen((char*)ppString[0]), + ppString[1], ppString[2])); + } finalize_it: +dbgprintf("omelasticsearch: result doAction: %d\n", iRet); ENDdoAction BEGINendTransaction + char *cstr; CODESTARTendTransaction -dbgprintf("elasticsearch: endTransaction\n"); + cstr = es_str2cstr(pData->batch.data, NULL); + dbgprintf("elasticsearch: endTransaction, batch: '%s'\n", cstr); + CHKiRet(curlPost(pData, (uchar*) cstr, strlen(cstr), + NULL, NULL)); #if 0 if(pData->offsSndBuf != 0) { iRet = TCPSendBuf(pData, pData->sndBuf, pData->offsSndBuf); pData->offsSndBuf = 0; } #endif +finalize_it: + free(cstr); ENDendTransaction /* elasticsearch POST result string ... useful for debugging */ @@ -266,6 +325,11 @@ curlResult(void *ptr, size_t size, size_t nmemb, void *userdata) static char ok[] = "{\"ok\":true,"; ASSERT(size == 1); +DBGPRINTF("omelasticsearch request: %s\n", jsonData); +DBGPRINTF("omelasticsearch result: "); +for (i = 0; i < nmemb; i++) + DBGPRINTF("%c", p[i]); +DBGPRINTF("\n"); if (size == 1 && nmemb > sizeof(ok)-1 && @@ -399,6 +463,16 @@ CODESTARTnewActInst ABORT_FINALIZE(RS_RET_LEGA_ACT_NOT_SUPPORTED); } + if(pData->bulkmode) { + pData->batch.currTpl1 = NULL; + pData->batch.currTpl2 = NULL; + if((pData->batch.data = es_newStr(1024)) == NULL) { + DBGPRINTF("omelasticsearch: error creating batch string " + "turned off bulk mode\n"); + pData->bulkmode = 0; /* at least it works */ + } + } + iNumTpls = 1; if(pData->dynSrchIdx) ++iNumTpls; if(pData->dynSrchType) ++iNumTpls; |