From 09d3320ac258a3d78e02b29fc3806043af4fbf65 Mon Sep 17 00:00:00 2001
From: Rainer Gerhards
Date: Mon, 2 Jul 2012 15:55:02 +0200
Subject: preparing for 6.3.12
---
ChangeLog | 4 +---
configure.ac | 2 +-
doc/manual.html | 2 +-
3 files changed, 3 insertions(+), 5 deletions(-)
diff --git a/ChangeLog b/ChangeLog
index 6ba46438..09664a88 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,12 +1,10 @@
---------------------------------------------------------------------------
-Version 6.3.13 [BETA] 2012-06-??
+Version 6.3.12 [BETA] 2012-07-02
- support for elasticsearch via omelasticsearch added
Note that this module has been tested quite well by a number of folks,
and this is why we merge in new functionality in a late beta stage.
Even if problems would exist, only users of omelasticsearch would
experience them, making it a pretty safe addition.
----------------------------------------------------------------------------
-Version 6.3.12 [BETA] 2012-06-18
- bugfix: $ActionName was not properly honored
Thanks to Abby Edwards for alerting us
---------------------------------------------------------------------------
diff --git a/configure.ac b/configure.ac
index 3c068d0d..8b5e3bd0 100644
--- a/configure.ac
+++ b/configure.ac
@@ -2,7 +2,7 @@
# Process this file with autoconf to produce a configure script.
AC_PREREQ(2.61)
-AC_INIT([rsyslog],[6.3.11],[rsyslog@lists.adiscon.com])
+AC_INIT([rsyslog],[6.3.12],[rsyslog@lists.adiscon.com])
AM_INIT_AUTOMAKE
m4_ifdef([AM_SILENT_RULES], [AM_SILENT_RULES([yes])])
diff --git a/doc/manual.html b/doc/manual.html
index 1bc8f1f7..12667161 100644
--- a/doc/manual.html
+++ b/doc/manual.html
@@ -19,7 +19,7 @@ rsyslog support available directly from the source!
Please visit the rsyslog sponsor's page
to honor the project sponsors or become one yourself! We are very grateful for any help towards the
project goals.
-This documentation is for version 6.3.11 (beta branch) of rsyslog.
+
This documentation is for version 6.3.12 (beta branch) of rsyslog.
Visit the rsyslog status page
to obtain current version information and project status.
If you like rsyslog, you might
--
cgit
From 3dd44b02d6251c519b04c3147425622603dd4754 Mon Sep 17 00:00:00 2001
From: Rainer Gerhards
Date: Thu, 5 Jul 2012 17:15:07 +0200
Subject: debug log: emit (some) action queue parameters to debug log
---
action.c | 4 +---
runtime/queue.c | 26 ++++++++++++++++++++++++++
runtime/queue.h | 1 +
3 files changed, 28 insertions(+), 3 deletions(-)
diff --git a/action.c b/action.c
index f33fece9..5db6d735 100644
--- a/action.c
+++ b/action.c
@@ -478,9 +478,7 @@ actionConstructFinalize(action_t *pThis, struct cnfparamvals *queueParams)
# undef setQPROP
# undef setQPROPstr
- dbgoprint((obj_t*) pThis->pQueue, "save on shutdown %d, max disk space allowed %lld\n",
- cs.bActionQSaveOnShutdown, cs.iActionQueMaxDiskSpace);
-
+ qqueueDbgPrint(pThis->pQueue);
DBGPRINTF("Action %p: queue %p created\n", pThis, pThis->pQueue);
diff --git a/runtime/queue.c b/runtime/queue.c
index a2f80d29..6b85f013 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -225,6 +225,32 @@ finalize_it:
/* methods */
+void
+qqueueDbgPrint(qqueue_t *pThis)
+{
+ dbgoprint((obj_t*) pThis, "size %d messages.\n", pThis->iMaxQueueSize);
+ dbgoprint((obj_t*) pThis, "worker threads: %d, wThread shutdown: %d, Perists every %d updates.\n",
+ pThis->iNumWorkerThreads, pThis->toWrkShutdown, pThis->iPersistUpdCnt);
+ dbgoprint((obj_t*) pThis, "queue timeouts: shutdown: %d, action completion shutdown: %d, enq: %d\n",
+ pThis->toQShutdown, pThis->toActShutdown, pThis->toEnq);
+ dbgoprint((obj_t*) pThis, "queue watermarks: high: %d, low: %d, discard: %d, discard-severity: %d\n",
+ pThis->iHighWtrMrk, pThis->iLowWtrMrk,
+ pThis->iDiscardMrk, pThis->iDiscardSeverity);
+ dbgoprint((obj_t*) pThis, "queue save on shutdown %d, max disk space allowed %lld\n",
+ pThis->bSaveOnShutdown, pThis->sizeOnDiskMax);
+ dbgoprint((obj_t*) pThis, "dequeueBatchSize: %d\n", pThis->iDeqBatchSize);
+ /* TODO: add
+ iActionRetryCount = 0;
+ iActionRetryInterval = 30000;
+ static int iMainMsgQtoWrkMinMsgs = 100;
+ static int iMainMsgQbSaveOnShutdown = 1;
+ iMainMsgQueMaxDiskSpace = 0;
+ setQPROP(qqueueSetiMinMsgsPerWrkr, "$MainMsgQueueWorkerThreadMinimumMessages", 100);
+ setQPROP(qqueueSetbSaveOnShutdown, "$MainMsgQueueSaveOnShutdown", 1);
+ */
+}
+
+
/* get the physical queue size. Must only be called
* while mutex is locked!
* rgerhards, 2008-01-29
diff --git a/runtime/queue.h b/runtime/queue.h
index 3841615a..edb770c6 100644
--- a/runtime/queue.h
+++ b/runtime/queue.h
@@ -195,6 +195,7 @@ rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch);
rsRetVal qqueueDoCnfParams(struct nvlst *lst, struct cnfparamvals **ppvals);
rsRetVal qqueueApplyCnfParam(qqueue_t *pThis, struct cnfparamvals *pvals);
void qqueueSetDefaultsActionQueue(qqueue_t *pThis);
+void qqueueDbgPrint(qqueue_t *pThis);
PROTOTYPEObjClassInit(qqueue);
PROTOTYPEpropSetMeth(qqueue, iPersistUpdCnt, int);
--
cgit
From 00ada18a67892d5d5f4197cb5180cb6453571b4d Mon Sep 17 00:00:00 2001
From: Rainer Gerhards
Date: Thu, 5 Jul 2012 17:53:44 +0200
Subject: debug log: cleaned up & streamlined queue param output
---
runtime/queue.c | 63 +++++++++++++++++++++++++++++++++++++++------------------
1 file changed, 43 insertions(+), 20 deletions(-)
diff --git a/runtime/queue.c b/runtime/queue.c
index 6b85f013..5d69da24 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -224,30 +224,53 @@ finalize_it:
/* methods */
+static inline char *
+getQueueTypeName(queueType_t t)
+{
+ char *r;
+
+ switch(t) {
+ case QUEUETYPE_FIXED_ARRAY:
+ r = "FixedArray";
+ case QUEUETYPE_LINKEDLIST:
+ r = "LinkedList";
+ case QUEUETYPE_DISK:
+ r = "Disk";
+ case QUEUETYPE_DIRECT:
+ r = "Direct";
+ }
+ return r;
+}
void
qqueueDbgPrint(qqueue_t *pThis)
{
- dbgoprint((obj_t*) pThis, "size %d messages.\n", pThis->iMaxQueueSize);
- dbgoprint((obj_t*) pThis, "worker threads: %d, wThread shutdown: %d, Perists every %d updates.\n",
- pThis->iNumWorkerThreads, pThis->toWrkShutdown, pThis->iPersistUpdCnt);
- dbgoprint((obj_t*) pThis, "queue timeouts: shutdown: %d, action completion shutdown: %d, enq: %d\n",
- pThis->toQShutdown, pThis->toActShutdown, pThis->toEnq);
- dbgoprint((obj_t*) pThis, "queue watermarks: high: %d, low: %d, discard: %d, discard-severity: %d\n",
- pThis->iHighWtrMrk, pThis->iLowWtrMrk,
- pThis->iDiscardMrk, pThis->iDiscardSeverity);
- dbgoprint((obj_t*) pThis, "queue save on shutdown %d, max disk space allowed %lld\n",
- pThis->bSaveOnShutdown, pThis->sizeOnDiskMax);
- dbgoprint((obj_t*) pThis, "dequeueBatchSize: %d\n", pThis->iDeqBatchSize);
- /* TODO: add
- iActionRetryCount = 0;
- iActionRetryInterval = 30000;
- static int iMainMsgQtoWrkMinMsgs = 100;
- static int iMainMsgQbSaveOnShutdown = 1;
- iMainMsgQueMaxDiskSpace = 0;
- setQPROP(qqueueSetiMinMsgsPerWrkr, "$MainMsgQueueWorkerThreadMinimumMessages", 100);
- setQPROP(qqueueSetbSaveOnShutdown, "$MainMsgQueueSaveOnShutdown", 1);
- */
+ dbgoprint((obj_t*) pThis, "parameter dump:\n");
+ dbgoprint((obj_t*) pThis, "queue.filename '%s'\n",
+ (pThis->pszFilePrefix == NULL) ? "[NONE]" : (char*)pThis->pszFilePrefix);
+ dbgoprint((obj_t*) pThis, "queue.size: %d\n", pThis->iMaxQueueSize);
+ dbgoprint((obj_t*) pThis, "queue.dequeuebatchsize: %d\n", pThis->iDeqBatchSize);
+ dbgoprint((obj_t*) pThis, "queue.maxdiskspace: %lld\n", pThis->iMaxFileSize);
+ dbgoprint((obj_t*) pThis, "queue.highwatermark: %d\n", pThis->iHighWtrMrk);
+ dbgoprint((obj_t*) pThis, "queue.lowwatermark: %d\n", pThis->iLowWtrMrk);
+ dbgoprint((obj_t*) pThis, "queue.fulldelaymark: %d\n", pThis->iFullDlyMrk);
+ dbgoprint((obj_t*) pThis, "queue.lightdelaymark: %d\n", pThis->iLightDlyMrk);
+ dbgoprint((obj_t*) pThis, "queue.discardmark: %d\n", pThis->iDiscardMrk);
+ dbgoprint((obj_t*) pThis, "queue.discardseverity: %d\n", pThis->iDiscardSeverity);
+ dbgoprint((obj_t*) pThis, "queue.checkpointinterval: %d\n", pThis->iPersistUpdCnt);
+ dbgoprint((obj_t*) pThis, "queue.syncqueuefiles: %d\n", pThis->bSyncQueueFiles);
+ dbgoprint((obj_t*) pThis, "queue.type: %d [%s]\n", pThis->qType, getQueueTypeName(pThis->qType));
+ dbgoprint((obj_t*) pThis, "queue.workerthreads: %d\n", pThis->iNumWorkerThreads);
+ dbgoprint((obj_t*) pThis, "queue.timeoutshutdown: %d\n", pThis->toQShutdown);
+ dbgoprint((obj_t*) pThis, "queue.timeoutactioncompletion: %d\n", pThis->toActShutdown);
+ dbgoprint((obj_t*) pThis, "queue.timeoutenqueue: %d\n", pThis->toEnq);
+ dbgoprint((obj_t*) pThis, "queue.timeoutworkerthreadshutdown: %d\n", pThis->toWrkShutdown);
+ dbgoprint((obj_t*) pThis, "queue.workerthreadminimummessages: %d\n", pThis->iMinMsgsPerWrkr);
+ dbgoprint((obj_t*) pThis, "queue.maxfilesize: %lld\n", pThis->iMaxFileSize);
+ dbgoprint((obj_t*) pThis, "queue.saveonshutdown: %d\n", pThis->bSaveOnShutdown);
+ dbgoprint((obj_t*) pThis, "queue.dequeueslowdown: %d\n", pThis->iDeqSlowdown);
+ dbgoprint((obj_t*) pThis, "queue.dequeuetimebegin: %d\n", pThis->iDeqtWinFromHr);
+ dbgoprint((obj_t*) pThis, "queuedequeuetimend.: %d\n", pThis->iDeqtWinToHr);
}
--
cgit
From ebaf375ed108b14c5a5a3af62067df988506bfec Mon Sep 17 00:00:00 2001
From: Rainer Gerhards
Date: Mon, 9 Jul 2012 17:28:40 +0200
Subject: omelasticsearch: better debug instrumentation
---
plugins/omelasticsearch/omelasticsearch.c | 48 +++++++++++++++++++++----------
runtime/msg.c | 2 --
2 files changed, 33 insertions(+), 17 deletions(-)
diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c
index d8db7307..c18c1c52 100644
--- a/plugins/omelasticsearch/omelasticsearch.c
+++ b/plugins/omelasticsearch/omelasticsearch.c
@@ -143,7 +143,7 @@ CODESTARTdbgPrintInstInfo
dbgprintf("\tserver='%s'\n", pData->server);
dbgprintf("\tserverport=%d\n", pData->port);
dbgprintf("\tuid='%s'\n", pData->uid == NULL ? (uchar*)"(not configured)" : pData->uid);
- dbgprintf("\tpwd=(%s configured)\n", pData->pwd == NULL ? "not " : "");
+ dbgprintf("\tpwd=(%sconfigured)\n", pData->pwd == NULL ? "not " : "");
dbgprintf("\tsearch index='%s'\n", pData->searchIndex);
dbgprintf("\tsearch index='%s'\n", pData->searchType);
dbgprintf("\ttimeout='%s'\n", pData->timeout);
@@ -155,6 +155,7 @@ ENDdbgPrintInstInfo
BEGINtryResume
CODESTARTtryResume
+ DBGPRINTF("omelasticsearch: tryResume called\n");
ENDtryResume
@@ -188,7 +189,9 @@ setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2)
uchar *searchIndex;
uchar *searchType;
es_str_t *url;
+ int rLocal;
int r;
+ DEFiRet;
getIndexAndType(pData, tpl1, tpl2, &searchIndex, &searchType);
url = es_newStr(128);
@@ -218,17 +221,23 @@ setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2)
restURL = es_str2cstr(url, NULL);
curl_easy_setopt(pData->curlHandle, CURLOPT_URL, restURL);
es_deleteStr(url);
+ DBGPRINTF("omelasticsearch: using REST URL: '%s'\n", restURL);
free(restURL);
if(pData->uid != NULL) {
- snprintf(authBuf, sizeof(authBuf), "%s:%s", pData->uid,
- (pData->pwd == NULL) ? "" : (char*)pData->pwd);
- //TODO: create better code, check errors!
+ rLocal = snprintf(authBuf, sizeof(authBuf), "%s:%s", pData->uid,
+ (pData->pwd == NULL) ? "" : (char*)pData->pwd);
+ if(rLocal != (int) es_strlen(url)) {
+ errmsg.LogError(0, RS_RET_ERR, "omelasticsearch: snprintf failed "
+ "when trying to build auth string (return %d)\n",
+ rLocal);
+ ABORT_FINALIZE(RS_RET_ERR);
+ }
curl_easy_setopt(pData->curlHandle, CURLOPT_USERPWD, authBuf);
curl_easy_setopt(pData->curlHandle, CURLOPT_PROXYAUTH, CURLAUTH_ANY);
}
- DBGPRINTF("omelasticsearch: using REST URL: '%s'\n", restURL);
- return RS_RET_OK;
+finalize_it:
+ RETiRet;
}
@@ -248,7 +257,6 @@ buildBatch(instanceData *pData, uchar *message, uchar *tpl1, uchar *tpl2)
# define META_TYPE "\",\"_type\":\""
# define META_END "\"}}\n"
-#warning TODO: use dynamic index/type!
getIndexAndType(pData, tpl1, tpl2, &searchIndex, &searchType);
r = es_addBuf(&pData->batch.data, META_STRT, sizeof(META_STRT)-1);
if(r == 0) r = es_addBuf(&pData->batch.data, (char*)searchIndex,
@@ -282,13 +290,18 @@ curlPost(instanceData *instance, uchar *message, int msglen, uchar *tpl1, uchar
curl_easy_setopt(curl, CURLOPT_WRITEDATA, (char *)message);
curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (char *)message);
curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, msglen);
+dbgprintf("omelasticsearch: do curl_easy_perform()\n");
code = curl_easy_perform(curl);
+DBGPRINTF("omelasticsearch: curl_easy_perform() returned %lld\n", (long long) code);
switch (code) {
case CURLE_COULDNT_RESOLVE_HOST:
case CURLE_COULDNT_RESOLVE_PROXY:
case CURLE_COULDNT_CONNECT:
case CURLE_WRITE_ERROR:
STATSCOUNTER_INC(indexConFail, mutIndexConFail);
+ DBGPRINTF("omelasticsearch: we are suspending ourselfs due "
+ "to failure %lld of curl_easy_perform()\n",
+ (long long) code);
return RS_RET_SUSPENDED;
default:
STATSCOUNTER_INC(indexSubmit, mutIndexSubmit);
@@ -315,22 +328,25 @@ CODESTARTdoAction
if(pData->bulkmode) {
CHKiRet(buildBatch(pData, ppString[0], ppString[1], ppString[2]));
} else {
+dbgprintf("omelasticsearch: doAction calling curlPost\n");
CHKiRet(curlPost(pData, ppString[0], strlen((char*)ppString[0]),
ppString[1], ppString[2]));
}
finalize_it:
-dbgprintf("omelasticsearch: result doAction: %d\n", iRet);
+dbgprintf("omelasticsearch: result doAction: %d (bulkmode %d)\n", iRet, pData->bulkmode);
ENDdoAction
BEGINendTransaction
char *cstr;
CODESTARTendTransaction
+dbgprintf("omelasticsearch: endTransaction init\n");
cstr = es_str2cstr(pData->batch.data, NULL);
- dbgprintf("elasticsearch: endTransaction, batch: '%s'\n", cstr);
+ dbgprintf("omelasticsearch: endTransaction, batch: '%s'\n", cstr);
CHKiRet(curlPost(pData, (uchar*) cstr, strlen(cstr), NULL, NULL));
finalize_it:
free(cstr);
+dbgprintf("omelasticsearch: endTransaction done with %d\n", iRet);
ENDendTransaction
/* elasticsearch POST result string ... useful for debugging */
@@ -353,11 +369,13 @@ DBGPRINTF("\n");
nmemb > sizeof(ok)-1 &&
strncmp(p, ok, sizeof(ok)-1) == 0) {
STATSCOUNTER_INC(indexSuccess, mutIndexSuccess);
+dbgprintf("omelasticsearch ok\n");
} else {
+dbgprintf("omelasticsearch fail\n");
STATSCOUNTER_INC(indexFailed, mutIndexFailed);
if (Debug) {
- DBGPRINTF("omelasticsearch request: %s\n", jsonData);
- DBGPRINTF("omelasticsearch result: ");
+ DBGPRINTF("omelasticsearch (fail) request: %s\n", jsonData);
+ DBGPRINTF("omelasticsearch (fail) result: ");
for (i = 0; i < nmemb; i++)
DBGPRINTF("%c", p[i]);
DBGPRINTF("\n");
@@ -470,16 +488,16 @@ CODESTARTnewActInst
ABORT_FINALIZE(RS_RET_UID_MISSING);
}
if(pData->dynSrchIdx && pData->searchIndex == NULL) {
- errmsg.LogError(0, RS_RET_LEGA_ACT_NOT_SUPPORTED,
+ errmsg.LogError(0, RS_RET_CONFIG_ERROR,
"omelasticsearch: requested dynamic search index, but no "
"name for index template given - action definition invalid");
- ABORT_FINALIZE(RS_RET_LEGA_ACT_NOT_SUPPORTED);
+ ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
}
if(pData->dynSrchType && pData->searchType == NULL) {
- errmsg.LogError(0, RS_RET_LEGA_ACT_NOT_SUPPORTED,
+ errmsg.LogError(0, RS_RET_CONFIG_ERROR,
"omelasticsearch: requested dynamic search type, but no "
"name for type template given - action definition invalid");
- ABORT_FINALIZE(RS_RET_LEGA_ACT_NOT_SUPPORTED);
+ ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
}
if(pData->bulkmode) {
diff --git a/runtime/msg.c b/runtime/msg.c
index a7df6928..99874317 100644
--- a/runtime/msg.c
+++ b/runtime/msg.c
@@ -3206,7 +3206,6 @@ uchar *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
}
}
-dbgprintf("prop repl 4, pRes='%s', len %d\n", pRes, bufLen);
/* Take care of spurious characters to make the property safe
* for a path definition
*/
@@ -3375,7 +3374,6 @@ dbgprintf("prop repl 4, pRes='%s', len %d\n", pRes, bufLen);
bufLen = ustrlen(pRes);
*pPropLen = bufLen;
-dbgprintf("end prop repl, pRes='%s', len %d\n", pRes, bufLen);
ENDfunc
return(pRes);
}
--
cgit
From 4d453967cbff1f09becab38a2ad10b05df476eaf Mon Sep 17 00:00:00 2001
From: Rainer Gerhards
Date: Tue, 10 Jul 2012 12:10:34 +0200
Subject: omelasticsearch: implement retry via request to es server homepage
---
plugins/omelasticsearch/omelasticsearch.c | 70 ++++++++++++++++++++++++++-----
1 file changed, 60 insertions(+), 10 deletions(-)
diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c
index c18c1c52..a1f3b8ab 100644
--- a/plugins/omelasticsearch/omelasticsearch.c
+++ b/plugins/omelasticsearch/omelasticsearch.c
@@ -153,9 +153,66 @@ CODESTARTdbgPrintInstInfo
dbgprintf("\tbulkmode=%d\n", pData->bulkmode);
ENDdbgPrintInstInfo
+
+/* Build basic URL part, which includes hostname and port as follows:
+ * http://hostname:port/
+ * Newly creates an estr for this purpose.
+ */
+static rsRetVal
+setBaseURL(instanceData *pData, es_str_t **url)
+{
+ char portBuf[64];
+ int r;
+ DEFiRet;
+
+ *url = es_newStr(128);
+ snprintf(portBuf, sizeof(portBuf), "%d", pData->port);
+ r = es_addBuf(url, "http://", sizeof("http://")-1);
+ if(r == 0) r = es_addBuf(url, (char*)pData->server, strlen((char*)pData->server));
+ if(r == 0) r = es_addChar(url, ':');
+ if(r == 0) r = es_addBuf(url, portBuf, strlen(portBuf));
+ if(r == 0) r = es_addChar(url, '/');
+ RETiRet;
+}
+
+
+static inline rsRetVal
+checkConn(instanceData *pData)
+{
+ es_str_t *url;
+ CURL *curl = NULL;
+ CURLcode res;
+ char *cstr;
+ DEFiRet;
+
+ setBaseURL(pData, &url);
+ curl = curl_easy_init();
+ if(curl == NULL) {
+ DBGPRINTF("omelasticsearch: checkConn() curl_easy_init() failed\n");
+ ABORT_FINALIZE(RS_RET_SUSPENDED);
+ }
+ cstr = es_str2cstr(url, NULL);
+ curl_easy_setopt(curl, CURLOPT_URL, cstr);
+ free(cstr);
+
+ res = curl_easy_perform(curl);
+ if(res != CURLE_OK) {
+ dbgprintf("omelasticsearch: checkConn() curl_easy_perform() "
+ "failed: %s\n", curl_easy_strerror(res));
+ ABORT_FINALIZE(RS_RET_SUSPENDED);
+ }
+
+finalize_it:
+ if(curl != NULL)
+ curl_easy_cleanup(curl);
+ RETiRet;
+}
+
+
BEGINtryResume
CODESTARTtryResume
DBGPRINTF("omelasticsearch: tryResume called\n");
+ iRet = checkConn(pData);
ENDtryResume
@@ -184,7 +241,6 @@ static rsRetVal
setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2)
{
char authBuf[1024];
- char portBuf[64];
char *restURL;
uchar *searchIndex;
uchar *searchType;
@@ -194,18 +250,12 @@ setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2)
DEFiRet;
getIndexAndType(pData, tpl1, tpl2, &searchIndex, &searchType);
- url = es_newStr(128);
- snprintf(portBuf, sizeof(portBuf), "%d", pData->port);
+ setBaseURL(pData, &url);
- r = es_addBuf(&url, "http://", sizeof("http://")-1);
- if(r == 0) r = es_addBuf(&url, (char*)pData->server, strlen((char*)pData->server));
- if(r == 0) r = es_addChar(&url, ':');
- if(r == 0) r = es_addBuf(&url, portBuf, strlen(portBuf));
- if(r == 0) r = es_addChar(&url, '/');
if(pData->bulkmode) {
- if(r == 0) r = es_addBuf(&url, "_bulk", sizeof("_bulk")-1);
+ r = es_addBuf(&url, "_bulk", sizeof("_bulk")-1);
} else {
- if(r == 0) r = es_addBuf(&url, (char*)searchIndex, ustrlen(searchIndex));
+ r = es_addBuf(&url, (char*)searchIndex, ustrlen(searchIndex));
if(r == 0) r = es_addChar(&url, '/');
if(r == 0) r = es_addBuf(&url, (char*)searchType, ustrlen(searchType));
}
--
cgit
From 68056a6128b9ebc8d65791b2647030d36c73f014 Mon Sep 17 00:00:00 2001
From: Rainer Gerhards
Date: Tue, 10 Jul 2012 17:15:03 +0200
Subject: omelasticsearch: support for parameters parent & dynparent added
---
ChangeLog | 3 +
plugins/omelasticsearch/omelasticsearch.c | 115 ++++++++++++++++++++++++------
2 files changed, 98 insertions(+), 20 deletions(-)
diff --git a/ChangeLog b/ChangeLog
index 09664a88..6219f04b 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,4 +1,7 @@
---------------------------------------------------------------------------
+Version 6.3.13 [BETA] 2012-07-??
+- omelasticsearch: support for parameters parent & dynparent added
+---------------------------------------------------------------------------
Version 6.3.12 [BETA] 2012-07-02
- support for elasticsearch via omelasticsearch added
Note that this module has been tested quite well by a number of folks,
diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c
index a1f3b8ab..5ddb66da 100644
--- a/plugins/omelasticsearch/omelasticsearch.c
+++ b/plugins/omelasticsearch/omelasticsearch.c
@@ -70,10 +70,12 @@ typedef struct _instanceData {
uchar *pwd;
uchar *searchIndex;
uchar *searchType;
+ uchar *parent;
uchar *tplName;
uchar *timeout;
sbool dynSrchIdx;
sbool dynSrchType;
+ sbool dynParent;
sbool bulkmode;
sbool asyncRepl;
struct {
@@ -95,8 +97,10 @@ static struct cnfparamdescr actpdescr[] = {
{ "pwd", eCmdHdlrGetWord, 0 },
{ "searchindex", eCmdHdlrGetWord, 0 },
{ "searchtype", eCmdHdlrGetWord, 0 },
+ { "parent", eCmdHdlrGetWord, 0 },
{ "dynsearchindex", eCmdHdlrBinary, 0 },
{ "dynsearchtype", eCmdHdlrBinary, 0 },
+ { "dynparent", eCmdHdlrBinary, 0 },
{ "bulkmode", eCmdHdlrBinary, 0 },
{ "asyncrepl", eCmdHdlrBinary, 0 },
{ "timeout", eCmdHdlrGetWord, 0 },
@@ -133,6 +137,7 @@ CODESTARTfreeInstance
free(pData->pwd);
free(pData->searchIndex);
free(pData->searchType);
+ free(pData->parent);
free(pData->tplName);
ENDfreeInstance
@@ -146,9 +151,11 @@ CODESTARTdbgPrintInstInfo
dbgprintf("\tpwd=(%sconfigured)\n", pData->pwd == NULL ? "not " : "");
dbgprintf("\tsearch index='%s'\n", pData->searchIndex);
dbgprintf("\tsearch index='%s'\n", pData->searchType);
+ dbgprintf("\tparent='%s'\n", pData->parent);
dbgprintf("\ttimeout='%s'\n", pData->timeout);
dbgprintf("\tdynamic search index=%d\n", pData->dynSrchIdx);
dbgprintf("\tdynamic search type=%d\n", pData->dynSrchType);
+ dbgprintf("\tdynamic parent=%d\n", pData->dynParent);
dbgprintf("\tasync replication=%d\n", pData->asyncRepl);
dbgprintf("\tbulkmode=%d\n", pData->bulkmode);
ENDdbgPrintInstInfo
@@ -197,10 +204,11 @@ checkConn(instanceData *pData)
res = curl_easy_perform(curl);
if(res != CURLE_OK) {
- dbgprintf("omelasticsearch: checkConn() curl_easy_perform() "
+ DBGPRINTF("omelasticsearch: checkConn() curl_easy_perform() "
"failed: %s\n", curl_easy_strerror(res));
ABORT_FINALIZE(RS_RET_SUSPENDED);
}
+ DBGPRINTF("omelasticsearch: checkConn() completed with success\n");
finalize_it:
if(curl != NULL)
@@ -218,38 +226,62 @@ ENDtryResume
/* get the current index and type for this message */
static inline void
-getIndexAndType(instanceData *pData, uchar *tpl1, uchar *tpl2, uchar **srchIndex,
- uchar **srchType)
+getIndexTypeAndParent(instanceData *pData, uchar *tpl1, uchar *tpl2, uchar *tpl3,
+ uchar **srchIndex, uchar **srchType, uchar **parent)
{
if(pData->dynSrchIdx) {
*srchIndex = tpl1;
- if(pData->dynSrchType)
+ if(pData->dynSrchType) {
*srchType = tpl2;
- else
+ if(pData->dynParent) {
+ *parent = tpl3;
+ } else {
+ *parent = pData->parent;
+ }
+ } else {
*srchType = pData->searchType;
+ if(pData->dynParent) {
+ *parent = tpl2;
+ } else {
+ *parent = pData->parent;
+ }
+ }
} else {
*srchIndex = pData->searchIndex;
- if(pData->dynSrchType)
+ if(pData->dynSrchType) {
*srchType = tpl1;
- else
+ if(pData->dynParent) {
+ *parent = tpl2;
+ } else {
+ *parent = pData->parent;
+ }
+ } else {
*srchType = pData->searchType;
+ if(pData->dynParent) {
+ *parent = tpl1;
+ } else {
+ *parent = pData->parent;
+ }
+ }
}
}
static rsRetVal
-setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2)
+setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2, uchar *tpl3)
{
char authBuf[1024];
char *restURL;
uchar *searchIndex;
uchar *searchType;
+ uchar *parent;
es_str_t *url;
int rLocal;
int r;
DEFiRet;
- getIndexAndType(pData, tpl1, tpl2, &searchIndex, &searchType);
+ getIndexTypeAndParent(pData, tpl1, tpl2, tpl3,
+ &searchIndex, &searchType, &parent);
setBaseURL(pData, &url);
if(pData->bulkmode) {
@@ -267,6 +299,11 @@ setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2)
if(pData->timeout != NULL) {
if(r == 0) r = es_addBuf(&url, "timeout=", sizeof("timeout=")-1);
if(r == 0) r = es_addBuf(&url, (char*)pData->timeout, ustrlen(pData->timeout));
+ if(r == 0) r = es_addChar(&url, '&');
+ }
+ if(parent != NULL) {
+ if(r == 0) r = es_addBuf(&url, "parent=", sizeof("parent=")-1);
+ if(r == 0) r = es_addBuf(&url, (char*)parent, ustrlen(parent));
}
restURL = es_str2cstr(url, NULL);
curl_easy_setopt(pData->curlHandle, CURLOPT_URL, restURL);
@@ -296,24 +333,29 @@ finalize_it:
* index changes.
*/
static rsRetVal
-buildBatch(instanceData *pData, uchar *message, uchar *tpl1, uchar *tpl2)
+buildBatch(instanceData *pData, uchar *message, uchar *tpl1, uchar *tpl2, uchar *tpl3)
{
int length = strlen((char *)message);
int r;
uchar *searchIndex;
uchar *searchType;
+ uchar *parent;
DEFiRet;
# define META_STRT "{\"index\":{\"_index\": \""
# define META_TYPE "\",\"_type\":\""
+# define META_PARENT "\",\"_parent\":\""
# define META_END "\"}}\n"
- getIndexAndType(pData, tpl1, tpl2, &searchIndex, &searchType);
+ getIndexTypeAndParent(pData, tpl1, tpl2, tpl3,
+ &searchIndex, &searchType, &parent);
r = es_addBuf(&pData->batch.data, META_STRT, sizeof(META_STRT)-1);
if(r == 0) r = es_addBuf(&pData->batch.data, (char*)searchIndex,
ustrlen(searchIndex));
if(r == 0) r = es_addBuf(&pData->batch.data, META_TYPE, sizeof(META_TYPE)-1);
if(r == 0) r = es_addBuf(&pData->batch.data, (char*)searchType,
ustrlen(searchType));
+ if(r == 0) r = es_addBuf(&pData->batch.data, META_PARENT, sizeof(META_PARENT)-1);
+ if(r == 0) r = es_addBuf(&pData->batch.data, (char*)parent, ustrlen(parent));
if(r == 0) r = es_addBuf(&pData->batch.data, META_END, sizeof(META_END)-1);
if(r == 0) r = es_addBuf(&pData->batch.data, (char*)message, length);
if(r == 0) r = es_addBuf(&pData->batch.data, "\n", sizeof("\n")-1);
@@ -328,14 +370,15 @@ finalize_it:
}
static rsRetVal
-curlPost(instanceData *instance, uchar *message, int msglen, uchar *tpl1, uchar *tpl2)
+curlPost(instanceData *instance, uchar *message, int msglen,
+ uchar *tpl1, uchar *tpl2, uchar *tpl3)
{
CURLcode code;
CURL *curl = instance->curlHandle;
DEFiRet;
- if(instance->dynSrchIdx || instance->dynSrchType)
- CHKiRet(setCurlURL(instance, tpl1, tpl2));
+ if(instance->dynSrchIdx || instance->dynSrchType || instance->dynParent)
+ CHKiRet(setCurlURL(instance, tpl1, tpl2, tpl3));
curl_easy_setopt(curl, CURLOPT_WRITEDATA, (char *)message);
curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (char *)message);
@@ -376,11 +419,11 @@ ENDbeginTransaction
BEGINdoAction
CODESTARTdoAction
if(pData->bulkmode) {
- CHKiRet(buildBatch(pData, ppString[0], ppString[1], ppString[2]));
+ CHKiRet(buildBatch(pData, ppString[0], ppString[1], ppString[2], ppString[3]));
} else {
dbgprintf("omelasticsearch: doAction calling curlPost\n");
CHKiRet(curlPost(pData, ppString[0], strlen((char*)ppString[0]),
- ppString[1], ppString[2]));
+ ppString[1], ppString[2], ppString[3]));
}
finalize_it:
dbgprintf("omelasticsearch: result doAction: %d (bulkmode %d)\n", iRet, pData->bulkmode);
@@ -393,7 +436,7 @@ CODESTARTendTransaction
dbgprintf("omelasticsearch: endTransaction init\n");
cstr = es_str2cstr(pData->batch.data, NULL);
dbgprintf("omelasticsearch: endTransaction, batch: '%s'\n", cstr);
- CHKiRet(curlPost(pData, (uchar*) cstr, strlen(cstr), NULL, NULL));
+ CHKiRet(curlPost(pData, (uchar*) cstr, strlen(cstr), NULL, NULL, NULL));
finalize_it:
free(cstr);
dbgprintf("omelasticsearch: endTransaction done with %d\n", iRet);
@@ -455,13 +498,14 @@ curlSetup(instanceData *pData)
pData->curlHandle = handle;
pData->postHeader = header;
- if(pData->bulkmode || (pData->dynSrchIdx == 0 && pData->dynSrchType == 0)) {
+ if( pData->bulkmode
+ || (pData->dynSrchIdx == 0 && pData->dynSrchType == 0 && pData->dynParent == 0)) {
/* in this case, we know no tpls are involved in the request-->NULL OK! */
- setCurlURL(pData, NULL, NULL);
+ setCurlURL(pData, NULL, NULL, NULL);
}
if(Debug) {
- if(pData->dynSrchIdx == 0 && pData->dynSrchType == 0)
+ if(pData->dynSrchIdx == 0 && pData->dynSrchType == 0 && pData->dynParent == 0)
dbgprintf("omelasticsearch setup, using static REST URL\n");
else
dbgprintf("omelasticsearch setup, we have a dynamic REST URL\n");
@@ -478,9 +522,11 @@ setInstParamDefaults(instanceData *pData)
pData->pwd = NULL;
pData->searchIndex = NULL;
pData->searchType = NULL;
+ pData->parent = NULL;
pData->timeout = NULL;
pData->dynSrchIdx = 0;
pData->dynSrchType = 0;
+ pData->dynParent = 0;
pData->asyncRepl = 0;
pData->bulkmode = 0;
pData->tplName = NULL;
@@ -513,10 +559,14 @@ CODESTARTnewActInst
pData->searchIndex = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else if(!strcmp(actpblk.descr[i].name, "searchtype")) {
pData->searchType = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(actpblk.descr[i].name, "parent")) {
+ pData->parent = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else if(!strcmp(actpblk.descr[i].name, "dynsearchindex")) {
pData->dynSrchIdx = pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "dynsearchtype")) {
pData->dynSrchType = pvals[i].val.d.n;
+ } else if(!strcmp(actpblk.descr[i].name, "dynparent")) {
+ pData->dynParent = pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "bulkmode")) {
pData->bulkmode = pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "timeout")) {
@@ -549,6 +599,12 @@ CODESTARTnewActInst
"name for type template given - action definition invalid");
ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
}
+ if(pData->dynParent && pData->parent == NULL) {
+ errmsg.LogError(0, RS_RET_CONFIG_ERROR,
+ "omelasticsearch: requested dynamic parent, but no "
+ "name for parent template given - action definition invalid");
+ ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
+ }
if(pData->bulkmode) {
pData->batch.currTpl1 = NULL;
@@ -563,6 +619,7 @@ CODESTARTnewActInst
iNumTpls = 1;
if(pData->dynSrchIdx) ++iNumTpls;
if(pData->dynSrchType) ++iNumTpls;
+ if(pData->dynParent) ++iNumTpls;
DBGPRINTF("omelasticsearch: requesting %d templates\n", iNumTpls);
CODE_STD_STRING_REQUESTparseSelectorAct(iNumTpls)
@@ -581,11 +638,29 @@ CODESTARTnewActInst
if(pData->dynSrchType) {
CHKiRet(OMSRsetEntry(*ppOMSR, 2, ustrdup(pData->searchType),
OMSR_NO_RQD_TPL_OPTS));
+ if(pData->dynParent) {
+ CHKiRet(OMSRsetEntry(*ppOMSR, 3, ustrdup(pData->parent),
+ OMSR_NO_RQD_TPL_OPTS));
+ }
+ } else {
+ if(pData->dynParent) {
+ CHKiRet(OMSRsetEntry(*ppOMSR, 2, ustrdup(pData->parent),
+ OMSR_NO_RQD_TPL_OPTS));
+ }
}
} else {
if(pData->dynSrchType) {
CHKiRet(OMSRsetEntry(*ppOMSR, 1, ustrdup(pData->searchType),
OMSR_NO_RQD_TPL_OPTS));
+ if(pData->dynParent) {
+ CHKiRet(OMSRsetEntry(*ppOMSR, 2, ustrdup(pData->parent),
+ OMSR_NO_RQD_TPL_OPTS));
+ }
+ } else {
+ if(pData->dynParent) {
+ CHKiRet(OMSRsetEntry(*ppOMSR, 1, ustrdup(pData->parent),
+ OMSR_NO_RQD_TPL_OPTS));
+ }
}
}
--
cgit
From afe402d2418d181a1fd5f469ec4c4cf5c394d8c5 Mon Sep 17 00:00:00 2001
From: Andre Lorbach
Date: Wed, 11 Jul 2012 03:08:32 -0700
Subject: bugfix: imtcp aborted when more than 2 connections were used.
Incremented pthread stack size to 4MB for imtcp, imptcp and imttcp
closes: http://bugzilla.adiscon.com/show_bug.cgi?id=342
---
ChangeLog | 3 +++
plugins/imptcp/imptcp.c | 2 +-
plugins/imttcp/imttcp.c | 2 +-
tcpsrv.c | 2 +-
4 files changed, 6 insertions(+), 3 deletions(-)
diff --git a/ChangeLog b/ChangeLog
index 6219f04b..34dec2d7 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,6 +1,9 @@
---------------------------------------------------------------------------
Version 6.3.13 [BETA] 2012-07-??
- omelasticsearch: support for parameters parent & dynparent added
+- bugfix: imtcp aborted when more than 2 connections were used.
+ Incremented pthread stack size to 4MB for imtcp, imptcp and imttcp
+ closes: http://bugzilla.adiscon.com/show_bug.cgi?id=342
---------------------------------------------------------------------------
Version 6.3.12 [BETA] 2012-07-02
- support for elasticsearch via omelasticsearch added
diff --git a/plugins/imptcp/imptcp.c b/plugins/imptcp/imptcp.c
index 6961a696..b63e7ca3 100644
--- a/plugins/imptcp/imptcp.c
+++ b/plugins/imptcp/imptcp.c
@@ -1587,7 +1587,7 @@ CODEmodInit_QueryRegCFSLineHdlr
/* initialize "read-only" thread attributes */
pthread_attr_init(&wrkrThrdAttr);
- pthread_attr_setstacksize(&wrkrThrdAttr, 2048*1024);
+ pthread_attr_setstacksize(&wrkrThrdAttr, 4096*1024);
/* init legacy config settings */
initConfigSettings();
diff --git a/plugins/imttcp/imttcp.c b/plugins/imttcp/imttcp.c
index 5ed714fa..c72886b3 100644
--- a/plugins/imttcp/imttcp.c
+++ b/plugins/imttcp/imttcp.c
@@ -1127,7 +1127,7 @@ CODEmodInit_QueryRegCFSLineHdlr
/* initialize "read-only" thread attributes */
pthread_attr_init(&sessThrdAttr);
pthread_attr_setdetachstate(&sessThrdAttr, PTHREAD_CREATE_DETACHED);
- pthread_attr_setstacksize(&sessThrdAttr, 200*1024);
+ pthread_attr_setstacksize(&sessThrdAttr, 4096*1024);
/* register config file handlers */
CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputttcpserverrun"), 0, eCmdHdlrGetWord,
diff --git a/tcpsrv.c b/tcpsrv.c
index 0e6e13d2..e5fe0d71 100644
--- a/tcpsrv.c
+++ b/tcpsrv.c
@@ -1335,7 +1335,7 @@ startWorkerPool(void)
wrkrRunning = 0;
pthread_cond_init(&wrkrIdle, NULL);
pthread_attr_init(&sessThrdAttr);
- pthread_attr_setstacksize(&sessThrdAttr, 200*1024);
+ pthread_attr_setstacksize(&sessThrdAttr, 4096*1024);
for(i = 0 ; i < wrkrMax ; ++i) {
/* init worker info structure! */
pthread_cond_init(&wrkrInfo[i].run, NULL);
--
cgit
From fdbc4cb666b4fc92562ece1fba97227c40237e04 Mon Sep 17 00:00:00 2001
From: Rainer Gerhards
Date: Wed, 11 Jul 2012 17:12:34 +0200
Subject: omelasticsearch: regression from "parent" feature could case aborts
this was not present in any released version
---
plugins/omelasticsearch/omelasticsearch.c | 49 ++++++++++++++++---------------
1 file changed, 26 insertions(+), 23 deletions(-)
diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c
index 5ddb66da..f77caeca 100644
--- a/plugins/omelasticsearch/omelasticsearch.c
+++ b/plugins/omelasticsearch/omelasticsearch.c
@@ -226,22 +226,22 @@ ENDtryResume
/* get the current index and type for this message */
static inline void
-getIndexTypeAndParent(instanceData *pData, uchar *tpl1, uchar *tpl2, uchar *tpl3,
+getIndexTypeAndParent(instanceData *pData, uchar **tpls,
uchar **srchIndex, uchar **srchType, uchar **parent)
{
if(pData->dynSrchIdx) {
- *srchIndex = tpl1;
+ *srchIndex = tpls[1];
if(pData->dynSrchType) {
- *srchType = tpl2;
+ *srchType = tpls[2];
if(pData->dynParent) {
- *parent = tpl3;
+ *parent = tpls[3];
} else {
*parent = pData->parent;
}
} else {
*srchType = pData->searchType;
if(pData->dynParent) {
- *parent = tpl2;
+ *parent = tpls[2];
} else {
*parent = pData->parent;
}
@@ -249,16 +249,16 @@ getIndexTypeAndParent(instanceData *pData, uchar *tpl1, uchar *tpl2, uchar *tpl3
} else {
*srchIndex = pData->searchIndex;
if(pData->dynSrchType) {
- *srchType = tpl1;
+ *srchType = tpls[1];
if(pData->dynParent) {
- *parent = tpl2;
+ *parent = tpls[2];
} else {
*parent = pData->parent;
}
} else {
*srchType = pData->searchType;
if(pData->dynParent) {
- *parent = tpl1;
+ *parent = tpls[1];
} else {
*parent = pData->parent;
}
@@ -268,7 +268,7 @@ getIndexTypeAndParent(instanceData *pData, uchar *tpl1, uchar *tpl2, uchar *tpl3
static rsRetVal
-setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2, uchar *tpl3)
+setCurlURL(instanceData *pData, uchar **tpls)
{
char authBuf[1024];
char *restURL;
@@ -280,13 +280,13 @@ setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2, uchar *tpl3)
int r;
DEFiRet;
- getIndexTypeAndParent(pData, tpl1, tpl2, tpl3,
- &searchIndex, &searchType, &parent);
setBaseURL(pData, &url);
if(pData->bulkmode) {
r = es_addBuf(&url, "_bulk", sizeof("_bulk")-1);
+ parent = NULL;
} else {
+ getIndexTypeAndParent(pData, tpls, &searchIndex, &searchType, &parent);
r = es_addBuf(&url, (char*)searchIndex, ustrlen(searchIndex));
if(r == 0) r = es_addChar(&url, '/');
if(r == 0) r = es_addBuf(&url, (char*)searchType, ustrlen(searchType));
@@ -333,7 +333,7 @@ finalize_it:
* index changes.
*/
static rsRetVal
-buildBatch(instanceData *pData, uchar *message, uchar *tpl1, uchar *tpl2, uchar *tpl3)
+buildBatch(instanceData *pData, uchar *message, uchar **tpls)
{
int length = strlen((char *)message);
int r;
@@ -346,16 +346,20 @@ buildBatch(instanceData *pData, uchar *message, uchar *tpl1, uchar *tpl2, uchar
# define META_PARENT "\",\"_parent\":\""
# define META_END "\"}}\n"
- getIndexTypeAndParent(pData, tpl1, tpl2, tpl3,
- &searchIndex, &searchType, &parent);
+ getIndexTypeAndParent(pData, tpls, &searchIndex, &searchType, &parent);
+dbgprintf("AAA: searchIndex: '%s'\n", searchIndex);
+dbgprintf("AAA: searchType: '%s'\n", searchType);
+dbgprintf("AAA: parent: '%s'\n", parent);
r = es_addBuf(&pData->batch.data, META_STRT, sizeof(META_STRT)-1);
if(r == 0) r = es_addBuf(&pData->batch.data, (char*)searchIndex,
ustrlen(searchIndex));
if(r == 0) r = es_addBuf(&pData->batch.data, META_TYPE, sizeof(META_TYPE)-1);
if(r == 0) r = es_addBuf(&pData->batch.data, (char*)searchType,
ustrlen(searchType));
- if(r == 0) r = es_addBuf(&pData->batch.data, META_PARENT, sizeof(META_PARENT)-1);
- if(r == 0) r = es_addBuf(&pData->batch.data, (char*)parent, ustrlen(parent));
+ if(parent != NULL) {
+ if(r == 0) r = es_addBuf(&pData->batch.data, META_PARENT, sizeof(META_PARENT)-1);
+ if(r == 0) r = es_addBuf(&pData->batch.data, (char*)parent, ustrlen(parent));
+ }
if(r == 0) r = es_addBuf(&pData->batch.data, META_END, sizeof(META_END)-1);
if(r == 0) r = es_addBuf(&pData->batch.data, (char*)message, length);
if(r == 0) r = es_addBuf(&pData->batch.data, "\n", sizeof("\n")-1);
@@ -370,15 +374,14 @@ finalize_it:
}
static rsRetVal
-curlPost(instanceData *instance, uchar *message, int msglen,
- uchar *tpl1, uchar *tpl2, uchar *tpl3)
+curlPost(instanceData *instance, uchar *message, int msglen, uchar **tpls)
{
CURLcode code;
CURL *curl = instance->curlHandle;
DEFiRet;
if(instance->dynSrchIdx || instance->dynSrchType || instance->dynParent)
- CHKiRet(setCurlURL(instance, tpl1, tpl2, tpl3));
+ CHKiRet(setCurlURL(instance, tpls));
curl_easy_setopt(curl, CURLOPT_WRITEDATA, (char *)message);
curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (char *)message);
@@ -419,11 +422,11 @@ ENDbeginTransaction
BEGINdoAction
CODESTARTdoAction
if(pData->bulkmode) {
- CHKiRet(buildBatch(pData, ppString[0], ppString[1], ppString[2], ppString[3]));
+ CHKiRet(buildBatch(pData, ppString[0], ppString));
} else {
dbgprintf("omelasticsearch: doAction calling curlPost\n");
CHKiRet(curlPost(pData, ppString[0], strlen((char*)ppString[0]),
- ppString[1], ppString[2], ppString[3]));
+ ppString));
}
finalize_it:
dbgprintf("omelasticsearch: result doAction: %d (bulkmode %d)\n", iRet, pData->bulkmode);
@@ -436,7 +439,7 @@ CODESTARTendTransaction
dbgprintf("omelasticsearch: endTransaction init\n");
cstr = es_str2cstr(pData->batch.data, NULL);
dbgprintf("omelasticsearch: endTransaction, batch: '%s'\n", cstr);
- CHKiRet(curlPost(pData, (uchar*) cstr, strlen(cstr), NULL, NULL, NULL));
+ CHKiRet(curlPost(pData, (uchar*) cstr, strlen(cstr), NULL));
finalize_it:
free(cstr);
dbgprintf("omelasticsearch: endTransaction done with %d\n", iRet);
@@ -501,7 +504,7 @@ curlSetup(instanceData *pData)
if( pData->bulkmode
|| (pData->dynSrchIdx == 0 && pData->dynSrchType == 0 && pData->dynParent == 0)) {
/* in this case, we know no tpls are involved in the request-->NULL OK! */
- setCurlURL(pData, NULL, NULL, NULL);
+ setCurlURL(pData, NULL);
}
if(Debug) {
--
cgit
From 686270440c601d5a4e3eac246397f60248889f5f Mon Sep 17 00:00:00 2001
From: Rainer Gerhards
Date: Tue, 24 Jul 2012 12:40:13 +0200
Subject: bugfix: imptcp aborted when $InputPTCPServerBindRuleset was used
---
ChangeLog | 1 +
plugins/imptcp/imptcp.c | 2 +-
2 files changed, 2 insertions(+), 1 deletion(-)
diff --git a/ChangeLog b/ChangeLog
index 0dd34961..ded398e9 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -4,6 +4,7 @@ Version 6.3.13 [BETA] 2012-07-??
- bugfix: imtcp aborted when more than 2 connections were used.
Incremented pthread stack size to 4MB for imtcp, imptcp and imttcp
closes: http://bugzilla.adiscon.com/show_bug.cgi?id=342
+- bugfix: imptcp aborted when $InputPTCPServerBindRuleset was used
---------------------------------------------------------------------------
Version 6.3.12 [BETA] 2012-07-02
- support for elasticsearch via omelasticsearch added
diff --git a/plugins/imptcp/imptcp.c b/plugins/imptcp/imptcp.c
index b63e7ca3..aa1ad81e 100644
--- a/plugins/imptcp/imptcp.c
+++ b/plugins/imptcp/imptcp.c
@@ -1616,7 +1616,7 @@ CODEmodInit_QueryRegCFSLineHdlr
CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverlistenip"), 0,
eCmdHdlrGetWord, NULL, &cs.lstnIP, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverbindruleset"), 0,
- eCmdHdlrGetWord, NULL, cs.pszBindRuleset, STD_LOADABLE_MODULE_ID));
+ eCmdHdlrGetWord, NULL, &cs.pszBindRuleset, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("resetconfigvariables"), 1, eCmdHdlrCustomHandler,
resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID));
ENDmodInit
--
cgit