From d8a1489f545179591abced679ba24831d85ca224 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Fri, 1 Oct 2010 11:29:50 +0200 Subject: omhdfs: cleanup and lots of improvement now things look much better, also done some prep in order to support a file cache (we need this for multiple selectors writing to the same file). --- plugins/omhdfs/omhdfs.c | 247 +++++++++++++++++++++++++++++------------------- 1 file changed, 150 insertions(+), 97 deletions(-) (limited to 'plugins/omhdfs/omhdfs.c') diff --git a/plugins/omhdfs/omhdfs.c b/plugins/omhdfs/omhdfs.c index fcdff6e5..71080585 100644 --- a/plugins/omhdfs/omhdfs.c +++ b/plugins/omhdfs/omhdfs.c @@ -23,13 +23,6 @@ * A copy of the GPL can be found in the file "COPYING" in this distribution. */ -/* this is kind of a hack, if defined, it instructs omhdfs to use - * the regular (non-hdfs) file system calls. This eases development - * (and hopefully troubleshooting) especially in cases when no - * hdfs environment is available. - */ -//#define USE_REGULAR_FS 1 - #include "config.h" #include "rsyslog.h" #include @@ -42,6 +35,8 @@ #include #include #include +#include +#include #include "syslogd-types.h" #include "srUtils.h" @@ -50,15 +45,14 @@ #include "cfsysline.h" #include "module-template.h" #include "unicode-helper.h" -#ifndef USE_REGULAR_FS -# include "hdfs.h" -#endif +#include "errmsg.h" MODULE_TYPE_OUTPUT /* internal structures */ DEF_OMOD_STATIC_DATA +DEFobjCurrIf(errmsg) /* globals for default values */ static uchar *fileName = NULL; @@ -66,18 +60,23 @@ static uchar *hdfsHost = NULL; int hdfsPort = 0; /* end globals for default values */ -typedef struct _instanceData { - uchar *fileName; -# ifdef USE_REGULAR_FS - short fd; /* file descriptor for (current) file */ -# else +typedef struct { + uchar *name; hdfsFS fs; - hdfsFile fd; + hdfsFile fh; const char *hdfsHost; tPort hdfsPort; -# endif + int nUsers; + pthread_mutex_t mut; +} file_t; + + +typedef struct _instanceData { + file_t *pFile; } instanceData; +/* forward definitions (down here, need data types) */ +static inline rsRetVal fileClose(file_t *pFile); BEGINisCompatibleWithFeature CODESTARTisCompatibleWithFeature @@ -88,9 +87,7 @@ ENDisCompatibleWithFeature BEGINdbgPrintInstInfo CODESTARTdbgPrintInstInfo - printf("omhdfs: file:%s", pData->fileName); - //if (pData->fd == -1) - //printf(" (unused)"); + printf("omhdfs: file:%s", pData->pFile->name); ENDdbgPrintInstInfo @@ -100,10 +97,10 @@ static void prepareFile(instanceData *pData, uchar *newFileName) { if(access((char*)newFileName, F_OK) == 0) { /* file already exists */ - pData->fd = open((char*) newFileName, O_WRONLY|O_APPEND|O_CREAT|O_NOCTTY, + pData->fh = open((char*) newFileName, O_WRONLY|O_APPEND|O_CREAT|O_NOCTTY, pData->fCreateMode); } else { - pData->fd = -1; + pData->fh = -1; /* file does not exist, create it (and eventually parent directories */ if(pData->bCreateDirs) { /* we fist need to create parent dirs if they are missing @@ -119,18 +116,18 @@ static void prepareFile(instanceData *pData, uchar *newFileName) /* no matter if we needed to create directories or not, we now try to create * the file. -- rgerhards, 2008-12-18 (based on patch from William Tisater) */ - pData->fd = open((char*) newFileName, O_WRONLY|O_APPEND|O_CREAT|O_NOCTTY, + pData->fh = open((char*) newFileName, O_WRONLY|O_APPEND|O_CREAT|O_NOCTTY, pData->fCreateMode); - if(pData->fd != -1) { + if(pData->fh != -1) { /* check and set uid/gid */ if(pData->fileUID != (uid_t)-1 || pData->fileGID != (gid_t) -1) { /* we need to set owner/group */ - if(fchown(pData->fd, pData->fileUID, + if(fchown(pData->fh, pData->fileUID, pData->fileGID) != 0) { if(pData->bFailOnChown) { int eSave = errno; - close(pData->fd); - pData->fd = -1; + close(pData->fh); + pData->fh = -1; errno = eSave; } /* we will silently ignore the chown() failure @@ -143,20 +140,71 @@ static void prepareFile(instanceData *pData, uchar *newFileName) } #endif -static void -prepareFile(instanceData *pData, uchar *newFileName) +/* ---BEGIN FILE OBJECT---------------------------------------------------- */ +/* This code handles the "file object". This is split from the actual + * instance data, because several instances may write into the same file. + * If so, we need to use a single object, and also synchronize their writes. + * So we keep the file object separately, and just stick a reference into + * the instance data. + */ + +static inline rsRetVal +fileObjConstruct(file_t **ppFile) +{ + file_t *pFile; + DEFiRet; + + CHKmalloc(pFile = malloc(sizeof(file_t))); + pFile->name = NULL; + pFile->hdfsHost = NULL; + pFile->fh = NULL; + pFile->nUsers = 0; + + *ppFile = pFile; +finalize_it: + RETiRet; +} + +static inline void +fileObjAddUser(file_t *pFile) +{ + /* init mutex only when second user is added */ + ++pFile->nUsers; + if(pFile->nUsers == 2) + pthread_mutex_init(&pFile->mut, NULL); +} + +static inline rsRetVal +fileObjDestruct(file_t **ppFile) +{ + file_t *pFile = *ppFile; + if(pFile->nUsers > 1) + pthread_mutex_destroy(&pFile->mut); + fileClose(pFile); + free(pFile->name); + free((char*)pFile->hdfsHost); + free(pFile->fh); + + return RS_RET_OK; +} + +static inline rsRetVal +fileOpen(file_t *pFile) { -# if USE_REGULAR_FS - pData->fd = open((char*) newFileName, O_WRONLY|O_APPEND|O_CREAT|O_NOCTTY, 0666); -# else - dbgprintf("omhdfs: try to connect to host '%s' at port %d\n", - pData->hdfsHost, pData->hdfsPort); - pData->fs = hdfsConnect(pData->hdfsHost, pData->hdfsPort); - if(pData->fs == NULL) { - dbgprintf("omhdfs: error can not connect to hdfs\n"); + DEFiRet; + + assert(pFile->fh == NULL); + if(pFile->nUsers > 1) + d_pthread_mutex_lock(&pFile->mut); + DBGPRINTF("omhdfs: try to connect to HDFS at host '%s', port %d\n", + pFile->hdfsHost, pFile->hdfsPort); + pFile->fs = hdfsConnect(pFile->hdfsHost, pFile->hdfsPort); + if(pFile->fs == NULL) { + DBGPRINTF("omhdfs: error can not connect to hdfs\n"); + ABORT_FINALIZE(RS_RET_SUSPENDED); } - pData->fd = hdfsOpenFile(pData->fs, (char*)newFileName, O_WRONLY|O_APPEND, 0, 0, 0); - if(pData->fd == NULL) { + pFile->fh = hdfsOpenFile(pFile->fs, (char*)pFile->name, O_WRONLY|O_APPEND, 0, 0, 0); + if(pFile->fh == NULL) { /* maybe the file does not exist, so we try to create it now. * Note that we can not use hdfsExists() because of a deficit in * it: https://issues.apache.org/jira/browse/HDFS-1154 @@ -164,68 +212,75 @@ prepareFile(instanceData *pData, uchar *newFileName) * the file does not exist. */ if(errno == ENOENT) { - dbgprintf("omhdfs: ENOENT trying to append to '%s', now trying create\n", - newFileName); - pData->fd = hdfsOpenFile(pData->fs, (char*)newFileName, O_WRONLY|O_CREAT, 0, 0, 0); + DBGPRINTF("omhdfs: ENOENT trying to append to '%s', now trying create\n", + pFile->name); + pFile->fh = hdfsOpenFile(pFile->fs, (char*)pFile->name, O_WRONLY|O_CREAT, 0, 0, 0); } } - if(!pData->fd) { - dbgprintf("omhdfs: failed to open %s for writing!\n", newFileName); - // TODO: suspend/error report + if(pFile->fh == NULL) { + DBGPRINTF("omhdfs: failed to open %s for writing!\n", pFile->name); + ABORT_FINALIZE(RS_RET_SUSPENDED); } -# endif +finalize_it: + if(pFile->nUsers > 1) + d_pthread_mutex_unlock(&pFile->mut); + RETiRet; } -static rsRetVal writeFile(uchar **ppString, instanceData *pData) + +static inline rsRetVal +fileWrite(file_t *pFile, uchar *buf) { size_t lenWrite; DEFiRet; -# if USE_REGULAR_FS - if (write(pData->fd, ppString[0], strlen((char*)ppString[0])) < 0) { - int e = errno; - dbgprintf("omhdfs write error!\n"); - - /* If the filesystem is filled up, just ignore - * it for now and continue writing when possible - */ - //if(pData->fileType == eTypeFILE && e == ENOSPC) - //return RS_RET_OK; - - //(void) close(pData->fd); - iRet = RS_RET_DISABLE_ACTION; - errno = e; - //logerror((char*) pData->f_fname); - } -# else - lenWrite = strlen((char*) ppString[0]); - tSize num_written_bytes = hdfsWrite(pData->fs, pData->fd, ppString[0], lenWrite); + assert(pFile->fh != NULL); + if(pFile->nUsers > 1) + d_pthread_mutex_lock(&pFile->mut); + lenWrite = strlen((char*) buf); + tSize num_written_bytes = hdfsWrite(pFile->fs, pFile->fh, buf, lenWrite); if((unsigned) num_written_bytes != lenWrite) { - dbgprintf("omhdfs: failed to write %s, expected %lu bytes, written %lu\n", pData->fileName, - lenWrite, (unsigned long) num_written_bytes); - // TODO: suspend/error report + errmsg.LogError(errno, RS_RET_ERR_HDFS_WRITE, "omhdfs: failed to write %s, expected %lu bytes, " + "written %lu\n", pFile->name, lenWrite, (unsigned long) num_written_bytes); + ABORT_FINALIZE(RS_RET_SUSPENDED); } -# endif +finalize_it: + if(pFile->nUsers > 1) + d_pthread_mutex_unlock(&pFile->mut); RETiRet; } +static inline rsRetVal +fileClose(file_t *pFile) +{ + if(pFile->nUsers > 1) + d_pthread_mutex_lock(&pFile->mut); + if(pFile->fh != NULL) { + hdfsCloseFile(pFile->fs, pFile->fh); + pFile->fh = NULL; + } + + if(pFile->nUsers > 1) + d_pthread_mutex_unlock(&pFile->mut); + + return RS_RET_OK; +} + +/* ---END FILE OBJECT---------------------------------------------------- */ + + BEGINcreateInstance CODESTARTcreateInstance - //pData->fd = -1; + pData->pFile = NULL; ENDcreateInstance BEGINfreeInstance CODESTARTfreeInstance -# ifdef USE_REGULAR_FS - if(pData->fd != -1) - close(pData->fd); -# else - hdfsCloseFile(pData->fs, pData->fd); -# endif + fileObjDestruct(&pData->pFile); ENDfreeInstance @@ -235,8 +290,8 @@ ENDtryResume BEGINdoAction CODESTARTdoAction - dbgprintf(" (%s)\n", pData->fileName); - iRet = writeFile(ppString, pData); + DBGPRINTF(" (%s)\n", pData->pFile->name); + iRet = fileWrite(pData->pFile, ppString[0]); ENDdoAction @@ -251,34 +306,29 @@ CODESTARTparseSelectorAct /* ok, if we reach this point, we have something for us */ p += sizeof(":omhdfs:") - 1; /* eat indicator sequence (-1 because of '\0'!) */ CHKiRet(createInstance(&pData)); - CODE_STD_STRING_REQUESTparseSelectorAct(1) - /* rgerhards 2004-11-17: from now, we need to have different - * processing, because after the first comma, the template name - * to use is specified. So we need to scan for the first coma first - * and then look at the rest of the line. - */ CHKiRet(cflineParseTemplateName(&p, *ppOMSR, 0, 0, (uchar*) "RSYSLOG_FileFormat")); //(pszFileDfltTplName == NULL) ? (uchar*)"RSYSLOG_FileFormat" : pszFileDfltTplName)); - // TODO: check for NULL filename - CHKmalloc(pData->fileName = ustrdup(fileName)); + if(fileName == NULL) { + errmsg.LogError(0, RS_RET_ERR_HDFS_OPEN, "omhdfs: no file name specified, can not continue"); + ABORT_FINALIZE(RS_RET_FILE_NOT_SPECIFIED); + } + + CHKiRet(fileObjConstruct(&pData->pFile)); if(hdfsHost == NULL) { - pData->hdfsHost = "default"; + CHKmalloc(pData->pFile->hdfsHost = strdup("default")); } else { - CHKmalloc(pData->hdfsHost = strdup((char*)hdfsHost)); + CHKmalloc(pData->pFile->hdfsHost = strdup((char*)hdfsHost)); } - pData->hdfsPort = hdfsPort; + pData->pFile->hdfsPort = hdfsPort; - prepareFile(pData, pData->fileName); + fileOpen(pData->pFile); -#if 0 - if ( pData->fd < 0 ){ - pData->fd = -1; - dbgprintf("Error opening log file: %s\n", pData->f_fname); - logerror((char*) pData->f_fname); + if(pData->pFile->fh == NULL){ + errmsg.LogError(0, RS_RET_ERR_HDFS_OPEN, "omhdfs: failed to open %s - retrying later", pData->pFile->name); + iRet = RS_RET_SUSPENDED; } -#endif CODE_STD_FINALIZERparseSelectorAct ENDparseSelectorAct @@ -307,6 +357,7 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a BEGINmodExit CODESTARTmodExit + objRelease(errmsg, CORE_COMPONENT); ENDmodExit @@ -320,6 +371,8 @@ BEGINmodInit() CODESTARTmodInit *ipIFVersProvided = CURR_MOD_IF_VERSION; CODEmodInit_QueryRegCFSLineHdlr + CHKiRet(objUse(errmsg, CORE_COMPONENT)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"omhdfsfilename", 0, eCmdHdlrGetWord, NULL, &fileName, NULL)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"omhdfshost", 0, eCmdHdlrGetWord, NULL, &hdfsHost, NULL)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"omhdfsport", 0, eCmdHdlrInt, NULL, &hdfsPort, NULL)); -- cgit