summaryrefslogtreecommitdiffstats
path: root/plugins
diff options
context:
space:
mode:
Diffstat (limited to 'plugins')
-rw-r--r--plugins/omhdfs/Makefile.am4
-rw-r--r--plugins/omhdfs/omhdfs.c62
2 files changed, 49 insertions, 17 deletions
diff --git a/plugins/omhdfs/Makefile.am b/plugins/omhdfs/Makefile.am
index 329e6816..23fcf75a 100644
--- a/plugins/omhdfs/Makefile.am
+++ b/plugins/omhdfs/Makefile.am
@@ -1,6 +1,6 @@
pkglib_LTLIBRARIES = omhdfs.la
omhdfs_la_SOURCES = omhdfs.c
-omhdfs_la_CPPFLAGS = -I$(top_srcdir) $(PTHREADS_CFLAGS) $(RSRT_CFLAGS)
-omhdfs_la_LDFLAGS = -module -avoid-version
+omhdfs_la_CPPFLAGS = -I$(top_srcdir) $(PTHREADS_CFLAGS) $(RSRT_CFLAGS) -I/usr/lib/jvm/java-6-sun/include -I/usr/lib/jvm/java-6-sun/include/linux
+omhdfs_la_LDFLAGS = -module -avoid-version -lhdfs -L/usr/lib/jvm/java-6-sun-1.6.0.20/jre/lib/amd64 -L/usr/lib/jvm/java-6-sun-1.6.0.20/jre/lib/amd64/server -ljava -ljvm -lverify
omhdfs_la_LIBADD =
diff --git a/plugins/omhdfs/omhdfs.c b/plugins/omhdfs/omhdfs.c
index 42e5dc8a..fcdff6e5 100644
--- a/plugins/omhdfs/omhdfs.c
+++ b/plugins/omhdfs/omhdfs.c
@@ -28,7 +28,7 @@
* (and hopefully troubleshooting) especially in cases when no
* hdfs environment is available.
*/
-#define USE_REGULAR_FS 1
+//#define USE_REGULAR_FS 1
#include "config.h"
#include "rsyslog.h"
@@ -62,6 +62,8 @@ DEF_OMOD_STATIC_DATA
/* globals for default values */
static uchar *fileName = NULL;
+static uchar *hdfsHost = NULL;
+int hdfsPort = 0;
/* end globals for default values */
typedef struct _instanceData {
@@ -71,6 +73,8 @@ typedef struct _instanceData {
# else
hdfsFS fs;
hdfsFile fd;
+ const char *hdfsHost;
+ tPort hdfsPort;
# endif
} instanceData;
@@ -85,8 +89,8 @@ ENDisCompatibleWithFeature
BEGINdbgPrintInstInfo
CODESTARTdbgPrintInstInfo
printf("omhdfs: file:%s", pData->fileName);
- if (pData->fd == -1)
- printf(" (unused)");
+ //if (pData->fd == -1)
+ //printf(" (unused)");
ENDdbgPrintInstInfo
@@ -139,22 +143,41 @@ static void prepareFile(instanceData *pData, uchar *newFileName)
}
#endif
-static void prepareFile(instanceData *pData, uchar *newFileName)
+static void
+prepareFile(instanceData *pData, uchar *newFileName)
{
# if USE_REGULAR_FS
pData->fd = open((char*) newFileName, O_WRONLY|O_APPEND|O_CREAT|O_NOCTTY, 0666);
# else
- pData->fs = hdfsConnect("default", 0);
- pData->fd = hdfsOpenFile(fs, newFileName, O_WRONLY|O_CREAT, 0, 0, 0);
+ dbgprintf("omhdfs: try to connect to host '%s' at port %d\n",
+ pData->hdfsHost, pData->hdfsPort);
+ pData->fs = hdfsConnect(pData->hdfsHost, pData->hdfsPort);
+ if(pData->fs == NULL) {
+ dbgprintf("omhdfs: error can not connect to hdfs\n");
+ }
+ pData->fd = hdfsOpenFile(pData->fs, (char*)newFileName, O_WRONLY|O_APPEND, 0, 0, 0);
+ if(pData->fd == NULL) {
+ /* maybe the file does not exist, so we try to create it now.
+ * Note that we can not use hdfsExists() because of a deficit in
+ * it: https://issues.apache.org/jira/browse/HDFS-1154
+ * As of my testing, libhdfs at least seems to return ENOENT if
+ * the file does not exist.
+ */
+ if(errno == ENOENT) {
+ dbgprintf("omhdfs: ENOENT trying to append to '%s', now trying create\n",
+ newFileName);
+ pData->fd = hdfsOpenFile(pData->fs, (char*)newFileName, O_WRONLY|O_CREAT, 0, 0, 0);
+ }
+ }
if(!pData->fd) {
- dbgprintf(stderr, "Failed to open %s for writing!\n", newFileName);
+ dbgprintf("omhdfs: failed to open %s for writing!\n", newFileName);
// TODO: suspend/error report
}
# endif
}
-static rsRetVal writeFile(uchar **ppString, unsigned iMsgOpts, instanceData *pData)
+static rsRetVal writeFile(uchar **ppString, instanceData *pData)
{
size_t lenWrite;
DEFiRet;
@@ -176,15 +199,14 @@ static rsRetVal writeFile(uchar **ppString, unsigned iMsgOpts, instanceData *pDa
//logerror((char*) pData->f_fname);
}
# else
- lenWrite = strlen(char*ppString[0]);
+ lenWrite = strlen((char*) ppString[0]);
tSize num_written_bytes = hdfsWrite(pData->fs, pData->fd, ppString[0], lenWrite);
- if(num_written_bytes != lenWrite) {
- dbgprintf("Failed to write %s, expected %lu bytes, written %lu\n", pData->fileName,
- lenWrite, num_written_bytes);
+ if((unsigned) num_written_bytes != lenWrite) {
+ dbgprintf("omhdfs: failed to write %s, expected %lu bytes, written %lu\n", pData->fileName,
+ lenWrite, (unsigned long) num_written_bytes);
// TODO: suspend/error report
}
# endif
-else { dbgprintf("omhdfs has successfully written to file\n"); }
RETiRet;
}
@@ -192,7 +214,7 @@ else { dbgprintf("omhdfs has successfully written to file\n"); }
BEGINcreateInstance
CODESTARTcreateInstance
- pData->fd = -1;
+ //pData->fd = -1;
ENDcreateInstance
@@ -214,7 +236,7 @@ ENDtryResume
BEGINdoAction
CODESTARTdoAction
dbgprintf(" (%s)\n", pData->fileName);
- iRet = writeFile(ppString, iMsgOpts, pData);
+ iRet = writeFile(ppString, pData);
ENDdoAction
@@ -241,6 +263,12 @@ CODESTARTparseSelectorAct
// TODO: check for NULL filename
CHKmalloc(pData->fileName = ustrdup(fileName));
+ if(hdfsHost == NULL) {
+ pData->hdfsHost = "default";
+ } else {
+ CHKmalloc(pData->hdfsHost = strdup((char*)hdfsHost));
+ }
+ pData->hdfsPort = hdfsPort;
prepareFile(pData, pData->fileName);
@@ -271,6 +299,8 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a
fDirCreateMode = 0700;
bCreateDirs = 1;
*/
+ hdfsHost = NULL;
+ hdfsPort = 0;
return RS_RET_OK;
}
@@ -291,6 +321,8 @@ CODESTARTmodInit
*ipIFVersProvided = CURR_MOD_IF_VERSION;
CODEmodInit_QueryRegCFSLineHdlr
CHKiRet(omsdRegCFSLineHdlr((uchar *)"omhdfsfilename", 0, eCmdHdlrGetWord, NULL, &fileName, NULL));
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"omhdfshost", 0, eCmdHdlrGetWord, NULL, &hdfsHost, NULL));
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"omhdfsport", 0, eCmdHdlrInt, NULL, &hdfsPort, NULL));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID));
CODEmodInit_QueryRegCFSLineHdlr
ENDmodInit