summaryrefslogtreecommitdiffstats
path: root/plugins/omelasticsearch
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2012-05-25 08:03:19 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2012-05-25 08:03:19 +0200
commitc7ca67a37586164a028c52a4d0cd9328b09e8697 (patch)
tree76ff8d4492ad3565ad81e74f45b18e9c2db8dd9d /plugins/omelasticsearch
parenta4c0d08c78ef27891b192973174b997a0c7c4aa1 (diff)
downloadrsyslog-c7ca67a37586164a028c52a4d0cd9328b09e8697.tar.gz
rsyslog-c7ca67a37586164a028c52a4d0cd9328b09e8697.tar.xz
rsyslog-c7ca67a37586164a028c52a4d0cd9328b09e8697.zip
omelasticsearch: test commit, first shot at bulk interface
This obviously does not work correctly. So expect problems if you set bulkmode="on".
Diffstat (limited to 'plugins/omelasticsearch')
-rw-r--r--plugins/omelasticsearch/omelasticsearch.c96
1 files changed, 85 insertions, 11 deletions
diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c
index 7642d603..1705c605 100644
--- a/plugins/omelasticsearch/omelasticsearch.c
+++ b/plugins/omelasticsearch/omelasticsearch.c
@@ -76,6 +76,11 @@ typedef struct _instanceData {
sbool dynSrchType;
sbool bulkmode;
sbool asyncRepl;
+ struct {
+ es_str_t *data;
+ uchar *currTpl1;
+ uchar *currTpl2;
+ } batch;
CURL *curlHandle; /* libcurl session handle */
HEADER *postHeader; /* json POST request info */
} instanceData;
@@ -160,7 +165,9 @@ setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2)
char authBuf[1024];
uchar *searchIndex;
uchar *searchType;
+ uchar *bulkpart;
+ bulkpart = (pData->bulkmode) ? "/_bulk" : "";
if(pData->dynSrchIdx) {
searchIndex = tpl1;
if(pData->dynSrchType)
@@ -176,18 +183,19 @@ setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2)
}
if(pData->asyncRepl) {
if(pData->timeout != NULL) {
- snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s?"
+ snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s%s?"
"replication=async&timeout=%s",
pData->server, pData->port, searchIndex, searchType,
- pData->timeout);
+ bulkpart, pData->timeout);
} else {
- snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s?"
+ snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s%s?"
"replication=async",
- pData->server, pData->port, searchIndex, searchType);
+ pData->server, pData->port, searchIndex, searchType,
+ bulkpart);
}
} else {
- snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s",
- pData->server, pData->port, searchIndex, searchType);
+ snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s%s",
+ pData->server, pData->port, searchIndex, searchType, bulkpart);
}
curl_easy_setopt(pData->curlHandle, CURLOPT_URL, restURL);
@@ -202,12 +210,44 @@ setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2)
return RS_RET_OK;
}
+
+/* this method does not directly submit but builds a batch instead. It
+ * may submit, if we have dynamic index/type and the current type or
+ * index changes.
+ */
+static rsRetVal
+buildBatch(instanceData *pData, uchar *message, uchar *tpl1, uchar *tpl2)
+{
+ int length = strlen((char *)message);
+ int r;
+ DEFiRet;
+
+#if 0
+ if(instance->dynSrchIdx || instance->dynSrchType)
+ CHKiRet(setCurlURL(instance, tpl1, tpl2));
+#endif
+
+ //if(pData->batch.currTpl1 == NULL
+ r = es_addBuf(&pData->batch.data, "{\"index\":", sizeof("{\"index\":")-1);
+ if(r == 0) r = es_addBuf(&pData->batch.data, (char*)message, length);
+ if(r == 0) r = es_addBuf(&pData->batch.data, "}\n", sizeof("}\n")-1);
+
+ if(r == 0) r = es_addBuf(&pData->batch.data, "{ \"field1\" : \"value1\" }\n", sizeof("{ \"field1\" : \"value1\" }\n")-1);
+ if(r != 0) {
+ DBGPRINTF("omelasticsearch: growing batch failed with code %d\n", r);
+ ABORT_FINALIZE(RS_RET_ERR);
+ }
+ iRet = RS_RET_DEFER_COMMIT;
+
+finalize_it:
+ RETiRet;
+}
+
static rsRetVal
-curlPost(instanceData *instance, uchar *message, uchar *tpl1, uchar *tpl2)
+curlPost(instanceData *instance, uchar *message, int msglen, uchar *tpl1, uchar *tpl2)
{
CURLcode code;
CURL *curl = instance->curlHandle;
- int length = strlen((char *)message);
DEFiRet;
if(instance->dynSrchIdx || instance->dynSrchType)
@@ -215,8 +255,9 @@ curlPost(instanceData *instance, uchar *message, uchar *tpl1, uchar *tpl2)
curl_easy_setopt(curl, CURLOPT_WRITEDATA, (char *)message);
curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (char *)message);
- curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, length);
+ curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, msglen);
code = curl_easy_perform(curl);
+dbgprintf("curl_easy_perform result: %d\n", code);
switch (code) {
case CURLE_COULDNT_RESOLVE_HOST:
case CURLE_COULDNT_RESOLVE_PROXY:
@@ -235,25 +276,43 @@ finalize_it:
BEGINbeginTransaction
CODESTARTbeginTransaction
dbgprintf("omelasticsearch: beginTransaction\n");
+ if(!pData->bulkmode) {
+ FINALIZE;
+ }
+
+ es_emptyStr(pData->batch.data);
+finalize_it:
ENDbeginTransaction
BEGINdoAction
CODESTARTdoAction
- CHKiRet(curlPost(pData, ppString[0], ppString[1], ppString[2]));
+ if(pData->bulkmode) {
+ CHKiRet(buildBatch(pData, ppString[0], ppString[1], ppString[2]));
+ } else {
+ CHKiRet(curlPost(pData, ppString[0], strlen((char*)ppString[0]),
+ ppString[1], ppString[2]));
+ }
finalize_it:
+dbgprintf("omelasticsearch: result doAction: %d\n", iRet);
ENDdoAction
BEGINendTransaction
+ char *cstr;
CODESTARTendTransaction
-dbgprintf("elasticsearch: endTransaction\n");
+ cstr = es_str2cstr(pData->batch.data, NULL);
+ dbgprintf("elasticsearch: endTransaction, batch: '%s'\n", cstr);
+ CHKiRet(curlPost(pData, (uchar*) cstr, strlen(cstr),
+ NULL, NULL));
#if 0
if(pData->offsSndBuf != 0) {
iRet = TCPSendBuf(pData, pData->sndBuf, pData->offsSndBuf);
pData->offsSndBuf = 0;
}
#endif
+finalize_it:
+ free(cstr);
ENDendTransaction
/* elasticsearch POST result string ... useful for debugging */
@@ -266,6 +325,11 @@ curlResult(void *ptr, size_t size, size_t nmemb, void *userdata)
static char ok[] = "{\"ok\":true,";
ASSERT(size == 1);
+DBGPRINTF("omelasticsearch request: %s\n", jsonData);
+DBGPRINTF("omelasticsearch result: ");
+for (i = 0; i < nmemb; i++)
+ DBGPRINTF("%c", p[i]);
+DBGPRINTF("\n");
if (size == 1 &&
nmemb > sizeof(ok)-1 &&
@@ -399,6 +463,16 @@ CODESTARTnewActInst
ABORT_FINALIZE(RS_RET_LEGA_ACT_NOT_SUPPORTED);
}
+ if(pData->bulkmode) {
+ pData->batch.currTpl1 = NULL;
+ pData->batch.currTpl2 = NULL;
+ if((pData->batch.data = es_newStr(1024)) == NULL) {
+ DBGPRINTF("omelasticsearch: error creating batch string "
+ "turned off bulk mode\n");
+ pData->bulkmode = 0; /* at least it works */
+ }
+ }
+
iNumTpls = 1;
if(pData->dynSrchIdx) ++iNumTpls;
if(pData->dynSrchType) ++iNumTpls;