summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ChangeLog10
-rw-r--r--plugins/omhdfs/javaenv.sh14
-rw-r--r--plugins/omhdfs/omhdfs.c85
3 files changed, 97 insertions, 12 deletions
diff --git a/ChangeLog b/ChangeLog
index 1eba3db9..7e8b77a1 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,4 +1,13 @@
---------------------------------------------------------------------------
+Version 5.9.0 [V5-DEVEL] (rgerhards), 2011-03-??
+- this begins a new devel branch for v5
+- enhanced omhdfs to support batching mode. This permits to increase
+ performance, as we now call the HDFS API with much larger message
+ sizes and far more infrequently
+- bugfix: failover did not work correctly if repeated msg reduction was on
+ affected directive was: $ActionExecOnlyWhenPreviousIsSuspended on
+ closes: http://bugzilla.adiscon.com/show_bug.cgi?id=236
+---------------------------------------------------------------------------
Version 5.8.0 [V5-stable] (rgerhards), 2011-04-??
- bugfix: DA queue was never shutdown once it was started
closes: http://bugzilla.adiscon.com/show_bug.cgi?id=241
@@ -157,6 +166,7 @@ Version 5.7.0 [V5-DEVEL] (rgerhards), 2010-09-16
Version 5.6.5 [V5-STABLE] (rgerhards), 2011-03-??
- bugfix: failover did not work correctly if repeated msg reduction was on
affected directive was: $ActionExecOnlyWhenPreviousIsSuspended on
+ closes: http://bugzilla.adiscon.com/show_bug.cgi?id=236
- bugfix: omlibdbi did not use password from rsyslog.con
closes: http://bugzilla.adiscon.com/show_bug.cgi?id=203
- bugfix(kind of): tell users that config graph can currently not be
diff --git a/plugins/omhdfs/javaenv.sh b/plugins/omhdfs/javaenv.sh
new file mode 100644
index 00000000..d07a8473
--- /dev/null
+++ b/plugins/omhdfs/javaenv.sh
@@ -0,0 +1,14 @@
+# This is a sample file for environment settings on Fedora 13
+# that made me compile & run omhdfs. I really *hate* the way
+# java uses environment variables... Hopefully this file will
+# help building and testing omhdfs in the future (there is also
+# some more information in the rsyslog wiki).
+# rgerhards, 2011-03-11
+# this now works, but don't ask my why ;)
+#export JAVA_HOME=/usr/java/jdk1.6.0_21/bin/java
+export PATH=/usr/java/jdk1.6.0_21/bin:$PATH
+export JAVA_INCLUDES="-I/usr/java/jdk1.6.0_21/include -I/usr/java/jdk1.6.0_21/include/linux"
+export JAVA_LIBS="-L/usr/java/jdk1.6.0_21/jre/lib/i386 -ljava -ljvm -lverify"
+export HADOOP_HOME=/usr/lib/hadoop
+export CLASSPATH=/usr/lib/jvm/java-6-sun/lib:/usr/lib/hadoop/lib:/usr/lib/hadoop/hadoop-ant-0.20.2+320.jar:/usr/lib/hadoop/hadoop-core-0.20.2+320.jar:/usr/lib/hadoop/hadoop-examples-0.20.2+320.jar:/usr/lib/hadoop/hadoop-test-0.20.2+320.jar:/usr/lib/hadoop/hadoop-tools-0.20.2+320.jar/usr/lib/hadoop/lib/commons-cli-1.2.jar:/usr/lib/hadoop/lib/commons-codec-1.3.jar:/usr/lib/hadoop/lib/commons-el-1.0.jar:/usr/lib/hadoop/lib/commons-httpclient-3.0.1.jar:/usr/lib/hadoop/lib/commons-logging-1.0.4.jar:/usr/lib/hadoop/lib/commons-logging-api-1.0.4.jar:/usr/lib/hadoop/lib/commons-net-1.4.1.jar:/usr/lib/hadoop/lib/core-3.1.1.jar:/usr/lib/hadoop/lib/hadoop-fairscheduler-0.20.2+320.jar:/usr/lib/hadoop/lib/hadoop-scribe-log4j-0.20.2+320.jar:/usr/lib/hadoop/lib/hsqldb-1.8.0.10.jar:/usr/lib/hadoop/lib/hsqldb.jar:/usr/lib/hadoop/lib/jackson-core-asl-1.0.1.jar:/usr/lib/hadoop/lib/jackson-mapper-asl-1.0.1.jar:/usr/lib/hadoop/lib/jasper-compiler-5.5.12.jar:/usr/lib/hadoop/lib/jasper-runtime-5.5.12.jar:/usr/lib/hadoop/lib/jets3t-0.6.1.jar:/usr/lib/hadoop/lib/jetty-6.1.14.jar:/usr/lib/hadoop/lib/jetty-util-6.1.14.jar:/usr/lib/hadoop/lib/junit-4.5.jar:/usr/lib/hadoop/lib/kfs-0.2.2.jar:/usr/lib/hadoop/lib/libfb303.jar:/usr/lib/hadoop/lib/libthrift.jar:/usr/lib/hadoop/lib/log4j-1.2.15.jar:/usr/lib/hadoop/lib/mockito-all-1.8.2.jar:/usr/lib/hadoop/lib/mysql-connector-java-5.0.8-bin.jar:/usr/lib/hadoop/lib/oro-2.0.8.jar:/usr/lib/hadoop/lib/servlet-api-2.5-6.1.14.jar:/usr/lib/hadoop/lib/slf4j-api-1.4.3.jar:/usr/lib/hadoop/lib/slf4j-log4j12-1.4.3.jar:/usr/lib/hadoop/lib/xmlenc-0.52.jar:/etc/hadoop/conf
+###export CLASSPATH="/usr/lib/hadoop/hadoop-0.20.2+320-ant.jar: /usr/lib/hadoop/hadoop-0.20.2+320-core.jar: /usr/lib/hadoop/hadoop-0.20.2+320-examples.jar: /usr/lib/hadoop/hadoop-0.20.2+320-test.jar: /usr/lib/hadoop/hadoop-0.20.2+320-tools.jar: /usr/lib/hadoop/hadoop-ant-0.20.2+320.jar: /usr/lib/hadoop/hadoop-core-0.20.2+320.jar: /usr/lib/hadoop/hadoop-examples-0.20.2+320.jar: /usr/lib/hadoop/hadoop-test-0.20.2+320.jar: /usr/lib/hadoop/hadoop-tools-0.20.2+320.jar:/usr/lib/hadoop/lib: /usr/lib/hadoop/lib/commons-cli-1.2.jar: /usr/lib/hadoop/lib/commons-codec-1.3.jar: /usr/lib/hadoop/lib/commons-el-1.0.jar: /usr/lib/hadoop/lib/commons-httpclient-3.0.1.jar: /usr/lib/hadoop/lib/commons-logging-1.0.4.jar: /usr/lib/hadoop/lib/commons-logging-api-1.0.4.jar: /usr/lib/hadoop/lib/commons-net-1.4.1.jar: /usr/lib/hadoop/lib/core-3.1.1.jar: /usr/lib/hadoop/lib/hadoop-fairscheduler-0.20.2+320.jar: /usr/lib/hadoop/lib/hadoop-scribe-log4j-0.20.2+320.jar: /usr/lib/hadoop/lib/hsqldb-1.8.0.10.jar: /usr/lib/hadoop/lib/hsqldb.jar: /usr/lib/hadoop/lib/jackson-core-asl-1.0.1.jar: /usr/lib/hadoop/lib/jackson-mapper-asl-1.0.1.jar: /usr/lib/hadoop/lib/jasper-compiler-5.5.12.jar: /usr/lib/hadoop/lib/jasper-runtime-5.5.12.jar: /usr/lib/hadoop/lib/jets3t-0.6.1.jar: /usr/lib/hadoop/lib/jetty-6.1.14.jar: /usr/lib/hadoop/lib/jetty-util-6.1.14.jar: /usr/lib/hadoop/lib/junit-4.5.jar: /usr/lib/hadoop/lib/kfs-0.2.2.jar: /usr/lib/hadoop/lib/libfb303.jar: /usr/lib/hadoop/lib/libthrift.jar: /usr/lib/hadoop/lib/log4j-1.2.15.jar: /usr/lib/hadoop/lib/mockito-all-1.8.2.jar: /usr/lib/hadoop/lib/mysql-connector-java-5.0.8-bin.jar: /usr/lib/hadoop/lib/oro-2.0.8.jar: /usr/lib/hadoop/lib/servlet-api-2.5-6.1.14.jar: /usr/lib/hadoop/lib/slf4j-api-1.4.3.jar: /usr/lib/hadoop/lib/slf4j-log4j12-1.4.3.jar: /usr/lib/hadoop/lib/xmlenc-0.52.jar:/etc/hadoop/conf:/usr/lib/hadoop/lib"
diff --git a/plugins/omhdfs/omhdfs.c b/plugins/omhdfs/omhdfs.c
index 8b72747f..76128a4e 100644
--- a/plugins/omhdfs/omhdfs.c
+++ b/plugins/omhdfs/omhdfs.c
@@ -80,6 +80,8 @@ typedef struct {
typedef struct _instanceData {
file_t *pFile;
+ uchar ioBuf[64*1024];
+ unsigned offsBuf;
} instanceData;
/* forward definitions (down here, need data types) */
@@ -260,7 +262,8 @@ fileOpen(file_t *pFile)
if(errno == ENOENT) {
DBGPRINTF("omhdfs: ENOENT trying to append to '%s', now trying create\n",
pFile->name);
- pFile->fh = hdfsOpenFile(pFile->fs, (char*)pFile->name, O_WRONLY|O_CREAT, 0, 0, 0);
+ pFile->fh = hdfsOpenFile(pFile->fs,
+ (char*)pFile->name, O_WRONLY|O_CREAT, 0, 0, 0);
}
}
if(pFile->fh == NULL) {
@@ -275,12 +278,15 @@ finalize_it:
}
+/* Note: lenWrite is reset to zero on successful write! */
static inline rsRetVal
-fileWrite(file_t *pFile, uchar *buf)
+fileWrite(file_t *pFile, uchar *buf, size_t *lenWrite)
{
- size_t lenWrite;
DEFiRet;
+ if(*lenWrite == 0)
+ FINALIZE;
+
if(pFile->nUsers > 1)
d_pthread_mutex_lock(&pFile->mut);
@@ -294,18 +300,18 @@ fileWrite(file_t *pFile, uchar *buf)
}
}
- lenWrite = strlen((char*) buf);
- tSize num_written_bytes = hdfsWrite(pFile->fs, pFile->fh, buf, lenWrite);
- if((unsigned) num_written_bytes != lenWrite) {
- errmsg.LogError(errno, RS_RET_ERR_HDFS_WRITE, "omhdfs: failed to write %s, expected %lu bytes, "
- "written %lu\n", pFile->name, (unsigned long) lenWrite,
+dbgprintf("XXXXX: omhdfs writing %u bytes\n", *lenWrite);
+ tSize num_written_bytes = hdfsWrite(pFile->fs, pFile->fh, buf, *lenWrite);
+ if((unsigned) num_written_bytes != *lenWrite) {
+ errmsg.LogError(errno, RS_RET_ERR_HDFS_WRITE,
+ "omhdfs: failed to write %s, expected %lu bytes, "
+ "written %lu\n", pFile->name, (unsigned long) *lenWrite,
(unsigned long) num_written_bytes);
ABORT_FINALIZE(RS_RET_SUSPENDED);
}
+ *lenWrite = 0;
finalize_it:
- if(pFile->nUsers > 1)
- d_pthread_mutex_unlock(&pFile->mut);
RETiRet;
}
@@ -333,6 +339,40 @@ finalize_it:
/* ---END FILE OBJECT---------------------------------------------------- */
+/* This adds data to the output buffer and performs an actual write
+ * if the new data does not fit into the buffer. Note that we never write
+ * partial data records. Other actions may write into the same file, and if
+ * we would write partial records, data could become severely mixed up.
+ * Note that we must check of some new data arrived is large than our
+ * buffer. In that case, the new data will written with its own
+ * write operation.
+ */
+static inline rsRetVal
+addData(instanceData *pData, uchar *buf)
+{
+ unsigned len;
+ DEFiRet;
+
+ len = strlen((char*)buf);
+ if(pData->offsBuf + len < sizeof(pData->ioBuf)) {
+ /* new data fits into remaining buffer */
+ memcpy((char*) pData->ioBuf + pData->offsBuf, buf, len);
+ pData->offsBuf += len;
+ } else {
+dbgprintf("XXXXX: not enough room, need to flush\n");
+ CHKiRet(fileWrite(pData->pFile, pData->ioBuf, &pData->offsBuf));
+ if(len >= sizeof(pData->ioBuf)) {
+ CHKiRet(fileWrite(pData->pFile, buf, &len));
+ } else {
+ memcpy((char*) pData->ioBuf + pData->offsBuf, buf, len);
+ pData->offsBuf += len;
+ }
+ }
+
+ iRet = RS_RET_DEFER_COMMIT;
+finalize_it:
+ RETiRet;
+}
BEGINcreateInstance
CODESTARTcreateInstance
@@ -358,13 +398,31 @@ CODESTARTtryResume
}
ENDtryResume
+
+BEGINbeginTransaction
+CODESTARTbeginTransaction
+dbgprintf("omhdfs: beginTransaction\n");
+ENDbeginTransaction
+
+
BEGINdoAction
CODESTARTdoAction
- DBGPRINTF("omuxsock: action to to write to %s\n", pData->pFile->name);
- iRet = fileWrite(pData->pFile, ppString[0]);
+ DBGPRINTF("omhdfs: action to to write to %s\n", pData->pFile->name);
+ iRet = addData(pData, ppString[0]);
+dbgprintf("omhdfs: done doAction\n");
ENDdoAction
+BEGINendTransaction
+CODESTARTendTransaction
+dbgprintf("omhdfs: endTransaction\n");
+ if(pData->offsBuf != 0) {
+ DBGPRINTF("omhdfs: data unwritten at end of transaction, persisting...\n");
+ iRet = fileWrite(pData->pFile, pData->ioBuf, &pData->offsBuf);
+ }
+ENDendTransaction
+
+
BEGINparseSelectorAct
file_t *pFile;
int r;
@@ -409,6 +467,7 @@ CODESTARTparseSelectorAct
}
fileObjAddUser(pFile);
pData->pFile = pFile;
+ pData->offsBuf = 0;
CODE_STD_FINALIZERparseSelectorAct
ENDparseSelectorAct
@@ -455,6 +514,7 @@ ENDmodExit
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_TXIF_OMOD_QUERIES /* we support the transactional interface! */
CODEqueryEtryPt_doHUP
ENDqueryEtryPt
@@ -472,5 +532,6 @@ CODEmodInit_QueryRegCFSLineHdlr
CHKiRet(regCfSysLineHdlr((uchar *)"omhdfsport", 0, eCmdHdlrInt, NULL, &hdfsPort, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"omhdfsdefaulttemplate", 0, eCmdHdlrGetWord, NULL, &dfltTplName, NULL));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID));
+ DBGPRINTF("omhdfs: module compiled with rsyslog version %s.\n", VERSION);
CODEmodInit_QueryRegCFSLineHdlr
ENDmodInit