diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2010-09-30 13:55:26 +0000 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2010-09-30 13:55:26 +0000 |
commit | 7ac7ad166b82034eea4a37c1937ca5ddd618ec45 (patch) | |
tree | 91c4467d8e595577eb2133318ec4ba465707edbe /plugins/omhdfs/omhdfs.c | |
parent | 11617d8ab5192c12e3b33cd9c08ac32f1d334a85 (diff) | |
download | rsyslog-7ac7ad166b82034eea4a37c1937ca5ddd618ec45.tar.gz rsyslog-7ac7ad166b82034eea4a37c1937ca5ddd618ec45.tar.xz rsyslog-7ac7ad166b82034eea4a37c1937ca5ddd618ec45.zip |
omhdfs: added "real" libhdfs code, now actually works on hdfs
very crude implementation, but probably good enough to gather some
early performance data and experience with the module. No real
error handling done, if something breaks, the whole thing will
be blown up ;)
Diffstat (limited to 'plugins/omhdfs/omhdfs.c')
-rw-r--r-- | plugins/omhdfs/omhdfs.c | 62 |
1 files changed, 47 insertions, 15 deletions
diff --git a/plugins/omhdfs/omhdfs.c b/plugins/omhdfs/omhdfs.c index 42e5dc8a..fcdff6e5 100644 --- a/plugins/omhdfs/omhdfs.c +++ b/plugins/omhdfs/omhdfs.c @@ -28,7 +28,7 @@ * (and hopefully troubleshooting) especially in cases when no * hdfs environment is available. */ -#define USE_REGULAR_FS 1 +//#define USE_REGULAR_FS 1 #include "config.h" #include "rsyslog.h" @@ -62,6 +62,8 @@ DEF_OMOD_STATIC_DATA /* globals for default values */ static uchar *fileName = NULL; +static uchar *hdfsHost = NULL; +int hdfsPort = 0; /* end globals for default values */ typedef struct _instanceData { @@ -71,6 +73,8 @@ typedef struct _instanceData { # else hdfsFS fs; hdfsFile fd; + const char *hdfsHost; + tPort hdfsPort; # endif } instanceData; @@ -85,8 +89,8 @@ ENDisCompatibleWithFeature BEGINdbgPrintInstInfo CODESTARTdbgPrintInstInfo printf("omhdfs: file:%s", pData->fileName); - if (pData->fd == -1) - printf(" (unused)"); + //if (pData->fd == -1) + //printf(" (unused)"); ENDdbgPrintInstInfo @@ -139,22 +143,41 @@ static void prepareFile(instanceData *pData, uchar *newFileName) } #endif -static void prepareFile(instanceData *pData, uchar *newFileName) +static void +prepareFile(instanceData *pData, uchar *newFileName) { # if USE_REGULAR_FS pData->fd = open((char*) newFileName, O_WRONLY|O_APPEND|O_CREAT|O_NOCTTY, 0666); # else - pData->fs = hdfsConnect("default", 0); - pData->fd = hdfsOpenFile(fs, newFileName, O_WRONLY|O_CREAT, 0, 0, 0); + dbgprintf("omhdfs: try to connect to host '%s' at port %d\n", + pData->hdfsHost, pData->hdfsPort); + pData->fs = hdfsConnect(pData->hdfsHost, pData->hdfsPort); + if(pData->fs == NULL) { + dbgprintf("omhdfs: error can not connect to hdfs\n"); + } + pData->fd = hdfsOpenFile(pData->fs, (char*)newFileName, O_WRONLY|O_APPEND, 0, 0, 0); + if(pData->fd == NULL) { + /* maybe the file does not exist, so we try to create it now. + * Note that we can not use hdfsExists() because of a deficit in + * it: https://issues.apache.org/jira/browse/HDFS-1154 + * As of my testing, libhdfs at least seems to return ENOENT if + * the file does not exist. + */ + if(errno == ENOENT) { + dbgprintf("omhdfs: ENOENT trying to append to '%s', now trying create\n", + newFileName); + pData->fd = hdfsOpenFile(pData->fs, (char*)newFileName, O_WRONLY|O_CREAT, 0, 0, 0); + } + } if(!pData->fd) { - dbgprintf(stderr, "Failed to open %s for writing!\n", newFileName); + dbgprintf("omhdfs: failed to open %s for writing!\n", newFileName); // TODO: suspend/error report } # endif } -static rsRetVal writeFile(uchar **ppString, unsigned iMsgOpts, instanceData *pData) +static rsRetVal writeFile(uchar **ppString, instanceData *pData) { size_t lenWrite; DEFiRet; @@ -176,15 +199,14 @@ static rsRetVal writeFile(uchar **ppString, unsigned iMsgOpts, instanceData *pDa //logerror((char*) pData->f_fname); } # else - lenWrite = strlen(char*ppString[0]); + lenWrite = strlen((char*) ppString[0]); tSize num_written_bytes = hdfsWrite(pData->fs, pData->fd, ppString[0], lenWrite); - if(num_written_bytes != lenWrite) { - dbgprintf("Failed to write %s, expected %lu bytes, written %lu\n", pData->fileName, - lenWrite, num_written_bytes); + if((unsigned) num_written_bytes != lenWrite) { + dbgprintf("omhdfs: failed to write %s, expected %lu bytes, written %lu\n", pData->fileName, + lenWrite, (unsigned long) num_written_bytes); // TODO: suspend/error report } # endif -else { dbgprintf("omhdfs has successfully written to file\n"); } RETiRet; } @@ -192,7 +214,7 @@ else { dbgprintf("omhdfs has successfully written to file\n"); } BEGINcreateInstance CODESTARTcreateInstance - pData->fd = -1; + //pData->fd = -1; ENDcreateInstance @@ -214,7 +236,7 @@ ENDtryResume BEGINdoAction CODESTARTdoAction dbgprintf(" (%s)\n", pData->fileName); - iRet = writeFile(ppString, iMsgOpts, pData); + iRet = writeFile(ppString, pData); ENDdoAction @@ -241,6 +263,12 @@ CODESTARTparseSelectorAct // TODO: check for NULL filename CHKmalloc(pData->fileName = ustrdup(fileName)); + if(hdfsHost == NULL) { + pData->hdfsHost = "default"; + } else { + CHKmalloc(pData->hdfsHost = strdup((char*)hdfsHost)); + } + pData->hdfsPort = hdfsPort; prepareFile(pData, pData->fileName); @@ -271,6 +299,8 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a fDirCreateMode = 0700; bCreateDirs = 1; */ + hdfsHost = NULL; + hdfsPort = 0; return RS_RET_OK; } @@ -291,6 +321,8 @@ CODESTARTmodInit *ipIFVersProvided = CURR_MOD_IF_VERSION; CODEmodInit_QueryRegCFSLineHdlr CHKiRet(omsdRegCFSLineHdlr((uchar *)"omhdfsfilename", 0, eCmdHdlrGetWord, NULL, &fileName, NULL)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"omhdfshost", 0, eCmdHdlrGetWord, NULL, &hdfsHost, NULL)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"omhdfsport", 0, eCmdHdlrInt, NULL, &hdfsPort, NULL)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID)); CODEmodInit_QueryRegCFSLineHdlr ENDmodInit |