summaryrefslogtreecommitdiffstats
path: root/plugins/omhdfs/omhdfs.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2010-09-30 13:55:26 +0000
committerRainer Gerhards <rgerhards@adiscon.com>2010-09-30 13:55:26 +0000
commit7ac7ad166b82034eea4a37c1937ca5ddd618ec45 (patch)
tree91c4467d8e595577eb2133318ec4ba465707edbe /plugins/omhdfs/omhdfs.c
parent11617d8ab5192c12e3b33cd9c08ac32f1d334a85 (diff)
downloadrsyslog-7ac7ad166b82034eea4a37c1937ca5ddd618ec45.tar.gz
rsyslog-7ac7ad166b82034eea4a37c1937ca5ddd618ec45.tar.xz
rsyslog-7ac7ad166b82034eea4a37c1937ca5ddd618ec45.zip
omhdfs: added "real" libhdfs code, now actually works on hdfs
very crude implementation, but probably good enough to gather some early performance data and experience with the module. No real error handling done, if something breaks, the whole thing will be blown up ;)
Diffstat (limited to 'plugins/omhdfs/omhdfs.c')
-rw-r--r--plugins/omhdfs/omhdfs.c62
1 files changed, 47 insertions, 15 deletions
diff --git a/plugins/omhdfs/omhdfs.c b/plugins/omhdfs/omhdfs.c
index 42e5dc8a..fcdff6e5 100644
--- a/plugins/omhdfs/omhdfs.c
+++ b/plugins/omhdfs/omhdfs.c
@@ -28,7 +28,7 @@
* (and hopefully troubleshooting) especially in cases when no
* hdfs environment is available.
*/
-#define USE_REGULAR_FS 1
+//#define USE_REGULAR_FS 1
#include "config.h"
#include "rsyslog.h"
@@ -62,6 +62,8 @@ DEF_OMOD_STATIC_DATA
/* globals for default values */
static uchar *fileName = NULL;
+static uchar *hdfsHost = NULL;
+int hdfsPort = 0;
/* end globals for default values */
typedef struct _instanceData {
@@ -71,6 +73,8 @@ typedef struct _instanceData {
# else
hdfsFS fs;
hdfsFile fd;
+ const char *hdfsHost;
+ tPort hdfsPort;
# endif
} instanceData;
@@ -85,8 +89,8 @@ ENDisCompatibleWithFeature
BEGINdbgPrintInstInfo
CODESTARTdbgPrintInstInfo
printf("omhdfs: file:%s", pData->fileName);
- if (pData->fd == -1)
- printf(" (unused)");
+ //if (pData->fd == -1)
+ //printf(" (unused)");
ENDdbgPrintInstInfo
@@ -139,22 +143,41 @@ static void prepareFile(instanceData *pData, uchar *newFileName)
}
#endif
-static void prepareFile(instanceData *pData, uchar *newFileName)
+static void
+prepareFile(instanceData *pData, uchar *newFileName)
{
# if USE_REGULAR_FS
pData->fd = open((char*) newFileName, O_WRONLY|O_APPEND|O_CREAT|O_NOCTTY, 0666);
# else
- pData->fs = hdfsConnect("default", 0);
- pData->fd = hdfsOpenFile(fs, newFileName, O_WRONLY|O_CREAT, 0, 0, 0);
+ dbgprintf("omhdfs: try to connect to host '%s' at port %d\n",
+ pData->hdfsHost, pData->hdfsPort);
+ pData->fs = hdfsConnect(pData->hdfsHost, pData->hdfsPort);
+ if(pData->fs == NULL) {
+ dbgprintf("omhdfs: error can not connect to hdfs\n");
+ }
+ pData->fd = hdfsOpenFile(pData->fs, (char*)newFileName, O_WRONLY|O_APPEND, 0, 0, 0);
+ if(pData->fd == NULL) {
+ /* maybe the file does not exist, so we try to create it now.
+ * Note that we can not use hdfsExists() because of a deficit in
+ * it: https://issues.apache.org/jira/browse/HDFS-1154
+ * As of my testing, libhdfs at least seems to return ENOENT if
+ * the file does not exist.
+ */
+ if(errno == ENOENT) {
+ dbgprintf("omhdfs: ENOENT trying to append to '%s', now trying create\n",
+ newFileName);
+ pData->fd = hdfsOpenFile(pData->fs, (char*)newFileName, O_WRONLY|O_CREAT, 0, 0, 0);
+ }
+ }
if(!pData->fd) {
- dbgprintf(stderr, "Failed to open %s for writing!\n", newFileName);
+ dbgprintf("omhdfs: failed to open %s for writing!\n", newFileName);
// TODO: suspend/error report
}
# endif
}
-static rsRetVal writeFile(uchar **ppString, unsigned iMsgOpts, instanceData *pData)
+static rsRetVal writeFile(uchar **ppString, instanceData *pData)
{
size_t lenWrite;
DEFiRet;
@@ -176,15 +199,14 @@ static rsRetVal writeFile(uchar **ppString, unsigned iMsgOpts, instanceData *pDa
//logerror((char*) pData->f_fname);
}
# else
- lenWrite = strlen(char*ppString[0]);
+ lenWrite = strlen((char*) ppString[0]);
tSize num_written_bytes = hdfsWrite(pData->fs, pData->fd, ppString[0], lenWrite);
- if(num_written_bytes != lenWrite) {
- dbgprintf("Failed to write %s, expected %lu bytes, written %lu\n", pData->fileName,
- lenWrite, num_written_bytes);
+ if((unsigned) num_written_bytes != lenWrite) {
+ dbgprintf("omhdfs: failed to write %s, expected %lu bytes, written %lu\n", pData->fileName,
+ lenWrite, (unsigned long) num_written_bytes);
// TODO: suspend/error report
}
# endif
-else { dbgprintf("omhdfs has successfully written to file\n"); }
RETiRet;
}
@@ -192,7 +214,7 @@ else { dbgprintf("omhdfs has successfully written to file\n"); }
BEGINcreateInstance
CODESTARTcreateInstance
- pData->fd = -1;
+ //pData->fd = -1;
ENDcreateInstance
@@ -214,7 +236,7 @@ ENDtryResume
BEGINdoAction
CODESTARTdoAction
dbgprintf(" (%s)\n", pData->fileName);
- iRet = writeFile(ppString, iMsgOpts, pData);
+ iRet = writeFile(ppString, pData);
ENDdoAction
@@ -241,6 +263,12 @@ CODESTARTparseSelectorAct
// TODO: check for NULL filename
CHKmalloc(pData->fileName = ustrdup(fileName));
+ if(hdfsHost == NULL) {
+ pData->hdfsHost = "default";
+ } else {
+ CHKmalloc(pData->hdfsHost = strdup((char*)hdfsHost));
+ }
+ pData->hdfsPort = hdfsPort;
prepareFile(pData, pData->fileName);
@@ -271,6 +299,8 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a
fDirCreateMode = 0700;
bCreateDirs = 1;
*/
+ hdfsHost = NULL;
+ hdfsPort = 0;
return RS_RET_OK;
}
@@ -291,6 +321,8 @@ CODESTARTmodInit
*ipIFVersProvided = CURR_MOD_IF_VERSION;
CODEmodInit_QueryRegCFSLineHdlr
CHKiRet(omsdRegCFSLineHdlr((uchar *)"omhdfsfilename", 0, eCmdHdlrGetWord, NULL, &fileName, NULL));
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"omhdfshost", 0, eCmdHdlrGetWord, NULL, &hdfsHost, NULL));
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"omhdfsport", 0, eCmdHdlrInt, NULL, &hdfsPort, NULL));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID));
CODEmodInit_QueryRegCFSLineHdlr
ENDmodInit