diff options
Diffstat (limited to 'plugins')
-rw-r--r-- | plugins/omhdfs/Makefile.am | 4 | ||||
-rw-r--r-- | plugins/omhdfs/omhdfs.c | 62 |
2 files changed, 49 insertions, 17 deletions
diff --git a/plugins/omhdfs/Makefile.am b/plugins/omhdfs/Makefile.am index 329e6816..23fcf75a 100644 --- a/plugins/omhdfs/Makefile.am +++ b/plugins/omhdfs/Makefile.am @@ -1,6 +1,6 @@ pkglib_LTLIBRARIES = omhdfs.la omhdfs_la_SOURCES = omhdfs.c -omhdfs_la_CPPFLAGS = -I$(top_srcdir) $(PTHREADS_CFLAGS) $(RSRT_CFLAGS) -omhdfs_la_LDFLAGS = -module -avoid-version +omhdfs_la_CPPFLAGS = -I$(top_srcdir) $(PTHREADS_CFLAGS) $(RSRT_CFLAGS) -I/usr/lib/jvm/java-6-sun/include -I/usr/lib/jvm/java-6-sun/include/linux +omhdfs_la_LDFLAGS = -module -avoid-version -lhdfs -L/usr/lib/jvm/java-6-sun-1.6.0.20/jre/lib/amd64 -L/usr/lib/jvm/java-6-sun-1.6.0.20/jre/lib/amd64/server -ljava -ljvm -lverify omhdfs_la_LIBADD = diff --git a/plugins/omhdfs/omhdfs.c b/plugins/omhdfs/omhdfs.c index 42e5dc8a..fcdff6e5 100644 --- a/plugins/omhdfs/omhdfs.c +++ b/plugins/omhdfs/omhdfs.c @@ -28,7 +28,7 @@ * (and hopefully troubleshooting) especially in cases when no * hdfs environment is available. */ -#define USE_REGULAR_FS 1 +//#define USE_REGULAR_FS 1 #include "config.h" #include "rsyslog.h" @@ -62,6 +62,8 @@ DEF_OMOD_STATIC_DATA /* globals for default values */ static uchar *fileName = NULL; +static uchar *hdfsHost = NULL; +int hdfsPort = 0; /* end globals for default values */ typedef struct _instanceData { @@ -71,6 +73,8 @@ typedef struct _instanceData { # else hdfsFS fs; hdfsFile fd; + const char *hdfsHost; + tPort hdfsPort; # endif } instanceData; @@ -85,8 +89,8 @@ ENDisCompatibleWithFeature BEGINdbgPrintInstInfo CODESTARTdbgPrintInstInfo printf("omhdfs: file:%s", pData->fileName); - if (pData->fd == -1) - printf(" (unused)"); + //if (pData->fd == -1) + //printf(" (unused)"); ENDdbgPrintInstInfo @@ -139,22 +143,41 @@ static void prepareFile(instanceData *pData, uchar *newFileName) } #endif -static void prepareFile(instanceData *pData, uchar *newFileName) +static void +prepareFile(instanceData *pData, uchar *newFileName) { # if USE_REGULAR_FS pData->fd = open((char*) newFileName, O_WRONLY|O_APPEND|O_CREAT|O_NOCTTY, 0666); # else - pData->fs = hdfsConnect("default", 0); - pData->fd = hdfsOpenFile(fs, newFileName, O_WRONLY|O_CREAT, 0, 0, 0); + dbgprintf("omhdfs: try to connect to host '%s' at port %d\n", + pData->hdfsHost, pData->hdfsPort); + pData->fs = hdfsConnect(pData->hdfsHost, pData->hdfsPort); + if(pData->fs == NULL) { + dbgprintf("omhdfs: error can not connect to hdfs\n"); + } + pData->fd = hdfsOpenFile(pData->fs, (char*)newFileName, O_WRONLY|O_APPEND, 0, 0, 0); + if(pData->fd == NULL) { + /* maybe the file does not exist, so we try to create it now. + * Note that we can not use hdfsExists() because of a deficit in + * it: https://issues.apache.org/jira/browse/HDFS-1154 + * As of my testing, libhdfs at least seems to return ENOENT if + * the file does not exist. + */ + if(errno == ENOENT) { + dbgprintf("omhdfs: ENOENT trying to append to '%s', now trying create\n", + newFileName); + pData->fd = hdfsOpenFile(pData->fs, (char*)newFileName, O_WRONLY|O_CREAT, 0, 0, 0); + } + } if(!pData->fd) { - dbgprintf(stderr, "Failed to open %s for writing!\n", newFileName); + dbgprintf("omhdfs: failed to open %s for writing!\n", newFileName); // TODO: suspend/error report } # endif } -static rsRetVal writeFile(uchar **ppString, unsigned iMsgOpts, instanceData *pData) +static rsRetVal writeFile(uchar **ppString, instanceData *pData) { size_t lenWrite; DEFiRet; @@ -176,15 +199,14 @@ static rsRetVal writeFile(uchar **ppString, unsigned iMsgOpts, instanceData *pDa //logerror((char*) pData->f_fname); } # else - lenWrite = strlen(char*ppString[0]); + lenWrite = strlen((char*) ppString[0]); tSize num_written_bytes = hdfsWrite(pData->fs, pData->fd, ppString[0], lenWrite); - if(num_written_bytes != lenWrite) { - dbgprintf("Failed to write %s, expected %lu bytes, written %lu\n", pData->fileName, - lenWrite, num_written_bytes); + if((unsigned) num_written_bytes != lenWrite) { + dbgprintf("omhdfs: failed to write %s, expected %lu bytes, written %lu\n", pData->fileName, + lenWrite, (unsigned long) num_written_bytes); // TODO: suspend/error report } # endif -else { dbgprintf("omhdfs has successfully written to file\n"); } RETiRet; } @@ -192,7 +214,7 @@ else { dbgprintf("omhdfs has successfully written to file\n"); } BEGINcreateInstance CODESTARTcreateInstance - pData->fd = -1; + //pData->fd = -1; ENDcreateInstance @@ -214,7 +236,7 @@ ENDtryResume BEGINdoAction CODESTARTdoAction dbgprintf(" (%s)\n", pData->fileName); - iRet = writeFile(ppString, iMsgOpts, pData); + iRet = writeFile(ppString, pData); ENDdoAction @@ -241,6 +263,12 @@ CODESTARTparseSelectorAct // TODO: check for NULL filename CHKmalloc(pData->fileName = ustrdup(fileName)); + if(hdfsHost == NULL) { + pData->hdfsHost = "default"; + } else { + CHKmalloc(pData->hdfsHost = strdup((char*)hdfsHost)); + } + pData->hdfsPort = hdfsPort; prepareFile(pData, pData->fileName); @@ -271,6 +299,8 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a fDirCreateMode = 0700; bCreateDirs = 1; */ + hdfsHost = NULL; + hdfsPort = 0; return RS_RET_OK; } @@ -291,6 +321,8 @@ CODESTARTmodInit *ipIFVersProvided = CURR_MOD_IF_VERSION; CODEmodInit_QueryRegCFSLineHdlr CHKiRet(omsdRegCFSLineHdlr((uchar *)"omhdfsfilename", 0, eCmdHdlrGetWord, NULL, &fileName, NULL)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"omhdfshost", 0, eCmdHdlrGetWord, NULL, &hdfsHost, NULL)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"omhdfsport", 0, eCmdHdlrInt, NULL, &hdfsPort, NULL)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID)); CODEmodInit_QueryRegCFSLineHdlr ENDmodInit |