summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2012-05-25 18:55:29 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2012-05-25 18:55:29 +0200
commit5f85909b17920172121d2ff8367c8185623f1409 (patch)
treead8e0c0e67d150d522caf72d1216d158f67510ea
parentc7ca67a37586164a028c52a4d0cd9328b09e8697 (diff)
downloadrsyslog-5f85909b17920172121d2ff8367c8185623f1409.tar.gz
rsyslog-5f85909b17920172121d2ff8367c8185623f1409.tar.xz
rsyslog-5f85909b17920172121d2ff8367c8185623f1409.zip
omelasticsearc: milestone, bulk insert basically works
dynamic index&type is not yet used, but easy to add (did not manage to get it in today...)
-rw-r--r--plugins/omelasticsearch/omelasticsearch.c115
1 files changed, 65 insertions, 50 deletions
diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c
index 1705c605..704c9950 100644
--- a/plugins/omelasticsearch/omelasticsearch.c
+++ b/plugins/omelasticsearch/omelasticsearch.c
@@ -158,46 +158,67 @@ CODESTARTtryResume
ENDtryResume
+/* get the current index and type for this message */
+static inline void
+getIndexAndType(instanceData *pData, uchar *tpl1, uchar *tpl2, uchar **srchIndex,
+ uchar **srchType)
+{
+ if(pData->dynSrchIdx) {
+ *srchIndex = tpl1;
+ if(pData->dynSrchType)
+ *srchType = tpl2;
+ else
+ *srchType = pData->searchType;
+ } else {
+ *srchIndex = pData->searchIndex;
+ if(pData->dynSrchType)
+ *srchType = tpl1;
+ else
+ *srchType = pData->searchType;
+ }
+}
+
+
static rsRetVal
setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2)
{
- char restURL[2048]; /* libcurl makes a copy, using the stack here is OK */
char authBuf[1024];
+ char portBuf[64];
+ char *restURL;
uchar *searchIndex;
uchar *searchType;
- uchar *bulkpart;
+ es_str_t *url;
+ int r;
- bulkpart = (pData->bulkmode) ? "/_bulk" : "";
- if(pData->dynSrchIdx) {
- searchIndex = tpl1;
- if(pData->dynSrchType)
- searchType = tpl2;
- else
- searchType = pData->searchType;
+ getIndexAndType(pData, tpl1, tpl2, &searchIndex, &searchType);
+ url = es_newStr(128);
+ snprintf(portBuf, sizeof(portBuf), "%d", pData->port);
+
+ r = es_addBuf(&url, "http://", sizeof("http://")-1);
+ if(r == 0) r = es_addBuf(&url, (char*)pData->server, strlen((char*)pData->server));
+ if(r == 0) r = es_addChar(&url, ':');
+ if(r == 0) r = es_addBuf(&url, portBuf, strlen(portBuf));
+ if(r == 0) r = es_addChar(&url, '/');
+ if(pData->bulkmode) {
+ if(r == 0) r = es_addBuf(&url, "_bulk", sizeof("_bulk")-1);
} else {
- searchIndex = pData->searchIndex;
- if(pData->dynSrchType)
- searchType = tpl1;
- else
- searchType = pData->searchType;
+ if(r == 0) r = es_addBuf(&url, (char*)searchIndex, ustrlen(searchIndex));
+ if(r == 0) r = es_addChar(&url, '/');
+ if(r == 0) r = es_addBuf(&url, (char*)searchType, ustrlen(searchType));
}
+ if(r == 0) r = es_addChar(&url, '?');
if(pData->asyncRepl) {
- if(pData->timeout != NULL) {
- snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s%s?"
- "replication=async&timeout=%s",
- pData->server, pData->port, searchIndex, searchType,
- bulkpart, pData->timeout);
- } else {
- snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s%s?"
- "replication=async",
- pData->server, pData->port, searchIndex, searchType,
- bulkpart);
- }
- } else {
- snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s%s",
- pData->server, pData->port, searchIndex, searchType, bulkpart);
+ if(r == 0) r = es_addBuf(&url, "replication=async&",
+ sizeof("replication=async&")-1);
}
+ if(pData->timeout != NULL) {
+ if(r == 0) r = es_addBuf(&url, "timeout=", sizeof("timeout=")-1);
+ if(r == 0) r = es_addBuf(&url, (char*)pData->timeout, ustrlen(pData->timeout));
+ }
+ restURL = es_str2cstr(url, NULL);
curl_easy_setopt(pData->curlHandle, CURLOPT_URL, restURL);
+ es_deleteStr(url);
+ free(restURL);
if(pData->uid != NULL) {
snprintf(authBuf, sizeof(authBuf), "%s:%s", pData->uid,
@@ -221,18 +242,20 @@ buildBatch(instanceData *pData, uchar *message, uchar *tpl1, uchar *tpl2)
int length = strlen((char *)message);
int r;
DEFiRet;
-
-#if 0
- if(instance->dynSrchIdx || instance->dynSrchType)
- CHKiRet(setCurlURL(instance, tpl1, tpl2));
-#endif
-
- //if(pData->batch.currTpl1 == NULL
- r = es_addBuf(&pData->batch.data, "{\"index\":", sizeof("{\"index\":")-1);
+# define META_STRT "{\"index\":{\"_index\": \""
+# define META_TYPE "\",\"_type\":\""
+# define META_END "\"}}\n"
+
+#warning TODO: use dynamic index/type!
+ r = es_addBuf(&pData->batch.data, META_STRT, sizeof(META_STRT)-1);
+ if(r == 0) r = es_addBuf(&pData->batch.data, (char*)pData->searchIndex,
+ ustrlen(pData->searchIndex));
+ if(r == 0) r = es_addBuf(&pData->batch.data, META_TYPE, sizeof(META_TYPE)-1);
+ if(r == 0) r = es_addBuf(&pData->batch.data, (char*)pData->searchType,
+ ustrlen(pData->searchType));
+ if(r == 0) r = es_addBuf(&pData->batch.data, META_END, sizeof(META_END)-1);
if(r == 0) r = es_addBuf(&pData->batch.data, (char*)message, length);
- if(r == 0) r = es_addBuf(&pData->batch.data, "}\n", sizeof("}\n")-1);
-
- if(r == 0) r = es_addBuf(&pData->batch.data, "{ \"field1\" : \"value1\" }\n", sizeof("{ \"field1\" : \"value1\" }\n")-1);
+ if(r == 0) r = es_addBuf(&pData->batch.data, "\n", sizeof("\n")-1);
if(r != 0) {
DBGPRINTF("omelasticsearch: growing batch failed with code %d\n", r);
ABORT_FINALIZE(RS_RET_ERR);
@@ -257,7 +280,6 @@ curlPost(instanceData *instance, uchar *message, int msglen, uchar *tpl1, uchar
curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (char *)message);
curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, msglen);
code = curl_easy_perform(curl);
-dbgprintf("curl_easy_perform result: %d\n", code);
switch (code) {
case CURLE_COULDNT_RESOLVE_HOST:
case CURLE_COULDNT_RESOLVE_PROXY:
@@ -303,14 +325,7 @@ BEGINendTransaction
CODESTARTendTransaction
cstr = es_str2cstr(pData->batch.data, NULL);
dbgprintf("elasticsearch: endTransaction, batch: '%s'\n", cstr);
- CHKiRet(curlPost(pData, (uchar*) cstr, strlen(cstr),
- NULL, NULL));
-#if 0
- if(pData->offsSndBuf != 0) {
- iRet = TCPSendBuf(pData, pData->sndBuf, pData->offsSndBuf);
- pData->offsSndBuf = 0;
- }
-#endif
+ CHKiRet(curlPost(pData, (uchar*) cstr, strlen(cstr), NULL, NULL));
finalize_it:
free(cstr);
ENDendTransaction
@@ -369,8 +384,8 @@ curlSetup(instanceData *pData)
pData->curlHandle = handle;
pData->postHeader = header;
- if(pData->dynSrchIdx == 0 && pData->dynSrchType == 0) {
- /* in this case, we know no tpls are involved --> NULL OK! */
+ if(pData->bulkmode || (pData->dynSrchIdx == 0 && pData->dynSrchType == 0)) {
+ /* in this case, we know no tpls are involved in the request-->NULL OK! */
setCurlURL(pData, NULL, NULL);
}