summaryrefslogtreecommitdiffstats
path: root/plugins/omhdfs/omhdfs.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2010-10-01 11:29:50 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2010-10-01 11:29:50 +0200
commitd8a1489f545179591abced679ba24831d85ca224 (patch)
tree361e35bc04d2f2b436321ff4eeb3cd1434aa11cb /plugins/omhdfs/omhdfs.c
parent1d609fc5ab9b5d1c89d5d7d8f321c4e8493b4437 (diff)
downloadrsyslog-d8a1489f545179591abced679ba24831d85ca224.tar.gz
rsyslog-d8a1489f545179591abced679ba24831d85ca224.tar.xz
rsyslog-d8a1489f545179591abced679ba24831d85ca224.zip
omhdfs: cleanup and lots of improvement
Things now look much better. Also did some preparatory work to support a file cache (needed when multiple selectors write to the same file).
Diffstat (limited to 'plugins/omhdfs/omhdfs.c')
-rw-r--r--plugins/omhdfs/omhdfs.c247
1 files changed, 150 insertions, 97 deletions
diff --git a/plugins/omhdfs/omhdfs.c b/plugins/omhdfs/omhdfs.c
index fcdff6e5..71080585 100644
--- a/plugins/omhdfs/omhdfs.c
+++ b/plugins/omhdfs/omhdfs.c
@@ -23,13 +23,6 @@
* A copy of the GPL can be found in the file "COPYING" in this distribution.
*/
-/* this is kind of a hack, if defined, it instructs omhdfs to use
- * the regular (non-hdfs) file system calls. This eases development
- * (and hopefully troubleshooting) especially in cases when no
- * hdfs environment is available.
- */
-//#define USE_REGULAR_FS 1
-
#include "config.h"
#include "rsyslog.h"
#include <stdio.h>
@@ -42,6 +35,8 @@
#include <ctype.h>
#include <unistd.h>
#include <sys/file.h>
+#include <pthread.h>
+#include <hdfs.h>
#include "syslogd-types.h"
#include "srUtils.h"
@@ -50,15 +45,14 @@
#include "cfsysline.h"
#include "module-template.h"
#include "unicode-helper.h"
-#ifndef USE_REGULAR_FS
-# include "hdfs.h"
-#endif
+#include "errmsg.h"
MODULE_TYPE_OUTPUT
/* internal structures
*/
DEF_OMOD_STATIC_DATA
+DEFobjCurrIf(errmsg)
/* globals for default values */
static uchar *fileName = NULL;
@@ -66,18 +60,23 @@ static uchar *hdfsHost = NULL;
int hdfsPort = 0;
/* end globals for default values */
-typedef struct _instanceData {
- uchar *fileName;
-# ifdef USE_REGULAR_FS
- short fd; /* file descriptor for (current) file */
-# else
+typedef struct {
+ uchar *name;
hdfsFS fs;
- hdfsFile fd;
+ hdfsFile fh;
const char *hdfsHost;
tPort hdfsPort;
-# endif
+ int nUsers;
+ pthread_mutex_t mut;
+} file_t;
+
+
+typedef struct _instanceData {
+ file_t *pFile;
} instanceData;
+/* forward declarations (placed down here because they need the data types above) */
+static inline rsRetVal fileClose(file_t *pFile);
BEGINisCompatibleWithFeature
CODESTARTisCompatibleWithFeature
@@ -88,9 +87,7 @@ ENDisCompatibleWithFeature
BEGINdbgPrintInstInfo
CODESTARTdbgPrintInstInfo
- printf("omhdfs: file:%s", pData->fileName);
- //if (pData->fd == -1)
- //printf(" (unused)");
+ printf("omhdfs: file:%s", pData->pFile->name);
ENDdbgPrintInstInfo
@@ -100,10 +97,10 @@ static void prepareFile(instanceData *pData, uchar *newFileName)
{
if(access((char*)newFileName, F_OK) == 0) {
/* file already exists */
- pData->fd = open((char*) newFileName, O_WRONLY|O_APPEND|O_CREAT|O_NOCTTY,
+ pData->fh = open((char*) newFileName, O_WRONLY|O_APPEND|O_CREAT|O_NOCTTY,
pData->fCreateMode);
} else {
- pData->fd = -1;
+ pData->fh = -1;
/* file does not exist, create it (and eventually parent directories */
if(pData->bCreateDirs) {
/* we fist need to create parent dirs if they are missing
@@ -119,18 +116,18 @@ static void prepareFile(instanceData *pData, uchar *newFileName)
/* no matter if we needed to create directories or not, we now try to create
* the file. -- rgerhards, 2008-12-18 (based on patch from William Tisater)
*/
- pData->fd = open((char*) newFileName, O_WRONLY|O_APPEND|O_CREAT|O_NOCTTY,
+ pData->fh = open((char*) newFileName, O_WRONLY|O_APPEND|O_CREAT|O_NOCTTY,
pData->fCreateMode);
- if(pData->fd != -1) {
+ if(pData->fh != -1) {
/* check and set uid/gid */
if(pData->fileUID != (uid_t)-1 || pData->fileGID != (gid_t) -1) {
/* we need to set owner/group */
- if(fchown(pData->fd, pData->fileUID,
+ if(fchown(pData->fh, pData->fileUID,
pData->fileGID) != 0) {
if(pData->bFailOnChown) {
int eSave = errno;
- close(pData->fd);
- pData->fd = -1;
+ close(pData->fh);
+ pData->fh = -1;
errno = eSave;
}
/* we will silently ignore the chown() failure
@@ -143,20 +140,71 @@ static void prepareFile(instanceData *pData, uchar *newFileName)
}
#endif
-static void
-prepareFile(instanceData *pData, uchar *newFileName)
+/* ---BEGIN FILE OBJECT---------------------------------------------------- */
+/* This code handles the "file object". This is split from the actual
+ * instance data, because several instances may write into the same file.
+ * If so, we need to use a single object, and also synchronize their writes.
+ * So we keep the file object separately, and just stick a reference into
+ * the instance data.
+ */
+
+/* Allocate a new file object and hand it back to the caller via *ppFile.
+ *
+ * Every member except the mutex is put into a defined "empty" state (NULL
+ * pointers, port 0, zero users). The mutex is deliberately left alone here;
+ * fileObjAddUser() initializes it once the user count reaches two.
+ *
+ * Returns the iRet value: unchanged from DEFiRet on success. If malloc()
+ * fails, CHKmalloc is expected to set an error code and jump to
+ * finalize_it, in which case *ppFile is not written.
+ */
+static inline rsRetVal
+fileObjConstruct(file_t **ppFile)
+{
+	file_t *pFile;
+	DEFiRet;
+
+	CHKmalloc(pFile = malloc(sizeof(file_t)));
+	pFile->name = NULL;
+	pFile->hdfsHost = NULL;
+	pFile->hdfsPort = 0;
+	pFile->fs = NULL;	/* not connected yet; never leave this indeterminate */
+	pFile->fh = NULL;
+	pFile->nUsers = 0;
+
+	*ppFile = pFile;
+finalize_it:
+	RETiRet;
+}
+
+/* Register one more user of the file object.
+ * The mutex is created lazily: it is initialized at the moment the user
+ * count becomes exactly two, i.e. as soon as the object is really shared.
+ */
+static inline void
+fileObjAddUser(file_t *pFile)
+{
+	const int nNow = pFile->nUsers + 1;
+
+	pFile->nUsers = nNow;
+	if(nNow == 2) {
+		pthread_mutex_init(&pFile->mut, NULL);
+	}
+}
+
+/* Destroy a file object and release everything it owns.
+ *
+ * ppFile points to the caller's reference; it is reset to NULL so that no
+ * dangling pointer is left behind. A NULL *ppFile is accepted and is a
+ * no-op: the instance may be torn down before a file object was ever
+ * constructed for it.
+ *
+ * Order matters: the file is closed first, because fileClose() takes the
+ * object's mutex when nUsers > 1. Only after that may the mutex be
+ * destroyed.
+ *
+ * The file handle is not passed to free(): it is released through
+ * hdfsCloseFile() inside fileClose(), which also resets fh to NULL.
+ * NOTE(review): the HDFS connection (fs) is not disconnected here - confirm
+ * whether that is intended.
+ *
+ * Always returns RS_RET_OK.
+ */
+static inline rsRetVal
+fileObjDestruct(file_t **ppFile)
+{
+	file_t *pFile = *ppFile;
+
+	if(pFile == NULL)
+		return RS_RET_OK;
+
+	fileClose(pFile);
+	if(pFile->nUsers > 1)
+		pthread_mutex_destroy(&pFile->mut);
+	free(pFile->name);
+	free((char*)pFile->hdfsHost);
+	free(pFile);
+	*ppFile = NULL;
+
+	return RS_RET_OK;
+}
+
+static inline rsRetVal
+fileOpen(file_t *pFile)
{
-# if USE_REGULAR_FS
- pData->fd = open((char*) newFileName, O_WRONLY|O_APPEND|O_CREAT|O_NOCTTY, 0666);
-# else
- dbgprintf("omhdfs: try to connect to host '%s' at port %d\n",
- pData->hdfsHost, pData->hdfsPort);
- pData->fs = hdfsConnect(pData->hdfsHost, pData->hdfsPort);
- if(pData->fs == NULL) {
- dbgprintf("omhdfs: error can not connect to hdfs\n");
+ DEFiRet;
+
+ assert(pFile->fh == NULL);
+ if(pFile->nUsers > 1)
+ d_pthread_mutex_lock(&pFile->mut);
+ DBGPRINTF("omhdfs: try to connect to HDFS at host '%s', port %d\n",
+ pFile->hdfsHost, pFile->hdfsPort);
+ pFile->fs = hdfsConnect(pFile->hdfsHost, pFile->hdfsPort);
+ if(pFile->fs == NULL) {
+ DBGPRINTF("omhdfs: error can not connect to hdfs\n");
+ ABORT_FINALIZE(RS_RET_SUSPENDED);
}
- pData->fd = hdfsOpenFile(pData->fs, (char*)newFileName, O_WRONLY|O_APPEND, 0, 0, 0);
- if(pData->fd == NULL) {
+ pFile->fh = hdfsOpenFile(pFile->fs, (char*)pFile->name, O_WRONLY|O_APPEND, 0, 0, 0);
+ if(pFile->fh == NULL) {
/* maybe the file does not exist, so we try to create it now.
* Note that we can not use hdfsExists() because of a deficit in
* it: https://issues.apache.org/jira/browse/HDFS-1154
@@ -164,68 +212,75 @@ prepareFile(instanceData *pData, uchar *newFileName)
* the file does not exist.
*/
if(errno == ENOENT) {
- dbgprintf("omhdfs: ENOENT trying to append to '%s', now trying create\n",
- newFileName);
- pData->fd = hdfsOpenFile(pData->fs, (char*)newFileName, O_WRONLY|O_CREAT, 0, 0, 0);
+ DBGPRINTF("omhdfs: ENOENT trying to append to '%s', now trying create\n",
+ pFile->name);
+ pFile->fh = hdfsOpenFile(pFile->fs, (char*)pFile->name, O_WRONLY|O_CREAT, 0, 0, 0);
}
}
- if(!pData->fd) {
- dbgprintf("omhdfs: failed to open %s for writing!\n", newFileName);
- // TODO: suspend/error report
+ if(pFile->fh == NULL) {
+ DBGPRINTF("omhdfs: failed to open %s for writing!\n", pFile->name);
+ ABORT_FINALIZE(RS_RET_SUSPENDED);
}
-# endif
+finalize_it:
+ if(pFile->nUsers > 1)
+ d_pthread_mutex_unlock(&pFile->mut);
+ RETiRet;
}
-static rsRetVal writeFile(uchar **ppString, instanceData *pData)
+
+static inline rsRetVal
+fileWrite(file_t *pFile, uchar *buf)
{
size_t lenWrite;
DEFiRet;
-# if USE_REGULAR_FS
- if (write(pData->fd, ppString[0], strlen((char*)ppString[0])) < 0) {
- int e = errno;
- dbgprintf("omhdfs write error!\n");
-
- /* If the filesystem is filled up, just ignore
- * it for now and continue writing when possible
- */
- //if(pData->fileType == eTypeFILE && e == ENOSPC)
- //return RS_RET_OK;
-
- //(void) close(pData->fd);
- iRet = RS_RET_DISABLE_ACTION;
- errno = e;
- //logerror((char*) pData->f_fname);
- }
-# else
- lenWrite = strlen((char*) ppString[0]);
- tSize num_written_bytes = hdfsWrite(pData->fs, pData->fd, ppString[0], lenWrite);
+ assert(pFile->fh != NULL);
+ if(pFile->nUsers > 1)
+ d_pthread_mutex_lock(&pFile->mut);
+ lenWrite = strlen((char*) buf);
+ tSize num_written_bytes = hdfsWrite(pFile->fs, pFile->fh, buf, lenWrite);
if((unsigned) num_written_bytes != lenWrite) {
- dbgprintf("omhdfs: failed to write %s, expected %lu bytes, written %lu\n", pData->fileName,
- lenWrite, (unsigned long) num_written_bytes);
- // TODO: suspend/error report
+ errmsg.LogError(errno, RS_RET_ERR_HDFS_WRITE, "omhdfs: failed to write %s, expected %lu bytes, "
+ "written %lu\n", pFile->name, lenWrite, (unsigned long) num_written_bytes);
+ ABORT_FINALIZE(RS_RET_SUSPENDED);
}
-# endif
+finalize_it:
+ if(pFile->nUsers > 1)
+ d_pthread_mutex_unlock(&pFile->mut);
RETiRet;
}
+/* Close the HDFS file handle if one is open and mark it as closed.
+ * When the object has more than one user, its mutex is held around the
+ * close operation. Always returns RS_RET_OK.
+ */
+static inline rsRetVal
+fileClose(file_t *pFile)
+{
+	const int bShared = (pFile->nUsers > 1);
+
+	if(bShared) {
+		d_pthread_mutex_lock(&pFile->mut);
+	}
+
+	if(pFile->fh != NULL) {
+		hdfsCloseFile(pFile->fs, pFile->fh);
+		pFile->fh = NULL;
+	}
+
+	if(bShared) {
+		d_pthread_mutex_unlock(&pFile->mut);
+	}
+
+	return RS_RET_OK;
+}
+
+/* ---END FILE OBJECT---------------------------------------------------- */
+
+
BEGINcreateInstance
CODESTARTcreateInstance
- //pData->fd = -1;
+ pData->pFile = NULL;
ENDcreateInstance
BEGINfreeInstance
CODESTARTfreeInstance
-# ifdef USE_REGULAR_FS
- if(pData->fd != -1)
- close(pData->fd);
-# else
- hdfsCloseFile(pData->fs, pData->fd);
-# endif
+ fileObjDestruct(&pData->pFile);
ENDfreeInstance
@@ -235,8 +290,8 @@ ENDtryResume
BEGINdoAction
CODESTARTdoAction
- dbgprintf(" (%s)\n", pData->fileName);
- iRet = writeFile(ppString, pData);
+ DBGPRINTF(" (%s)\n", pData->pFile->name);
+ iRet = fileWrite(pData->pFile, ppString[0]);
ENDdoAction
@@ -251,34 +306,29 @@ CODESTARTparseSelectorAct
/* ok, if we reach this point, we have something for us */
p += sizeof(":omhdfs:") - 1; /* eat indicator sequence (-1 because of '\0'!) */
CHKiRet(createInstance(&pData));
-
CODE_STD_STRING_REQUESTparseSelectorAct(1)
- /* rgerhards 2004-11-17: from now, we need to have different
- * processing, because after the first comma, the template name
- * to use is specified. So we need to scan for the first coma first
- * and then look at the rest of the line.
- */
CHKiRet(cflineParseTemplateName(&p, *ppOMSR, 0, 0, (uchar*) "RSYSLOG_FileFormat"));
//(pszFileDfltTplName == NULL) ? (uchar*)"RSYSLOG_FileFormat" : pszFileDfltTplName));
- // TODO: check for NULL filename
- CHKmalloc(pData->fileName = ustrdup(fileName));
+ if(fileName == NULL) {
+ errmsg.LogError(0, RS_RET_ERR_HDFS_OPEN, "omhdfs: no file name specified, can not continue");
+ ABORT_FINALIZE(RS_RET_FILE_NOT_SPECIFIED);
+ }
+
+ CHKiRet(fileObjConstruct(&pData->pFile));
if(hdfsHost == NULL) {
- pData->hdfsHost = "default";
+ CHKmalloc(pData->pFile->hdfsHost = strdup("default"));
} else {
- CHKmalloc(pData->hdfsHost = strdup((char*)hdfsHost));
+ CHKmalloc(pData->pFile->hdfsHost = strdup((char*)hdfsHost));
}
- pData->hdfsPort = hdfsPort;
+ pData->pFile->hdfsPort = hdfsPort;
- prepareFile(pData, pData->fileName);
+ fileOpen(pData->pFile);
-#if 0
- if ( pData->fd < 0 ){
- pData->fd = -1;
- dbgprintf("Error opening log file: %s\n", pData->f_fname);
- logerror((char*) pData->f_fname);
+ if(pData->pFile->fh == NULL){
+ errmsg.LogError(0, RS_RET_ERR_HDFS_OPEN, "omhdfs: failed to open %s - retrying later", pData->pFile->name);
+ iRet = RS_RET_SUSPENDED;
}
-#endif
CODE_STD_FINALIZERparseSelectorAct
ENDparseSelectorAct
@@ -307,6 +357,7 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a
BEGINmodExit
CODESTARTmodExit
+ objRelease(errmsg, CORE_COMPONENT);
ENDmodExit
@@ -320,6 +371,8 @@ BEGINmodInit()
CODESTARTmodInit
*ipIFVersProvided = CURR_MOD_IF_VERSION;
CODEmodInit_QueryRegCFSLineHdlr
+ CHKiRet(objUse(errmsg, CORE_COMPONENT));
+
CHKiRet(omsdRegCFSLineHdlr((uchar *)"omhdfsfilename", 0, eCmdHdlrGetWord, NULL, &fileName, NULL));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"omhdfshost", 0, eCmdHdlrGetWord, NULL, &hdfsHost, NULL));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"omhdfsport", 0, eCmdHdlrInt, NULL, &hdfsPort, NULL));