author    Rainer Gerhards <rgerhards@adiscon.com>  2010-10-05 14:52:23 +0200
committer Rainer Gerhards <rgerhards@adiscon.com>  2010-10-05 14:52:23 +0200
commit    1dbb85d7ba2e135593944afdaaa69aeb4fd49302 (patch)
tree      40d81f1a2a6bb211157add77d5bf36acfce78022 /plugins
parent    6ea4a38079cb1508c103930e327fa92ada97176f (diff)
parent    a627ac99ba2c3404ca926a19fb06cbd6f43b53c8 (diff)
download  rsyslog-1dbb85d7ba2e135593944afdaaa69aeb4fd49302.tar.gz
          rsyslog-1dbb85d7ba2e135593944afdaaa69aeb4fd49302.tar.xz
          rsyslog-1dbb85d7ba2e135593944afdaaa69aeb4fd49302.zip
Merge branch 'v5-devel-hdfs' into v5-devel
Diffstat (limited to 'plugins')
-rw-r--r--  plugins/imuxsock/imuxsock.c    4
-rw-r--r--  plugins/omhdfs/Makefile.am     6
-rw-r--r--  plugins/omhdfs/omhdfs.c      475
3 files changed, 483 insertions(+), 2 deletions(-)
diff --git a/plugins/imuxsock/imuxsock.c b/plugins/imuxsock/imuxsock.c
index 75a6b00b..41bff4f9 100644
--- a/plugins/imuxsock/imuxsock.c
+++ b/plugins/imuxsock/imuxsock.c
@@ -279,7 +279,7 @@ addLstnSocketName(void __attribute__((unused)) *pVal, uchar *pNewVal)
}
CHKiRet(prop.ConstructFinalize(listeners[nfd].hostName));
if(ratelimitInterval > 0) {
- if((listeners[nfd].ht = create_hashtable(1000, hash_from_key_fn, key_equals_fn)) == NULL) {
+ if((listeners[nfd].ht = create_hashtable(1000, hash_from_key_fn, key_equals_fn, NULL)) == NULL) {
/* in this case, we simply turn of rate-limiting */
dbgprintf("imuxsock: turning off rate limiting because we could not "
"create hash table\n");
@@ -749,7 +749,7 @@ CODESTARTwillRun
if(pLogSockName != NULL)
listeners[0].sockName = pLogSockName;
if(ratelimitIntervalSysSock > 0) {
- if((listeners[0].ht = create_hashtable(1000, hash_from_key_fn, key_equals_fn)) == NULL) {
+ if((listeners[0].ht = create_hashtable(1000, hash_from_key_fn, key_equals_fn, NULL)) == NULL) {
/* in this case, we simply turn of rate-limiting */
dbgprintf("imuxsock: turning off rate limiting because we could not "
"create hash table\n");
diff --git a/plugins/omhdfs/Makefile.am b/plugins/omhdfs/Makefile.am
new file mode 100644
index 00000000..95e6b102
--- /dev/null
+++ b/plugins/omhdfs/Makefile.am
@@ -0,0 +1,6 @@
+pkglib_LTLIBRARIES = omhdfs.la
+
+omhdfs_la_SOURCES = omhdfs.c
+omhdfs_la_CPPFLAGS = -I$(top_srcdir) $(PTHREADS_CFLAGS) $(RSRT_CFLAGS) $(JAVA_INCLUDES)
+omhdfs_la_LDFLAGS = -module -avoid-version -lhdfs $(JAVA_LIBS)
+omhdfs_la_LIBADD = $(RSRT_LIBS)
diff --git a/plugins/omhdfs/omhdfs.c b/plugins/omhdfs/omhdfs.c
new file mode 100644
index 00000000..9705b7fd
--- /dev/null
+++ b/plugins/omhdfs/omhdfs.c
@@ -0,0 +1,475 @@
+/* omhdfs.c
+ * This is an output module to support Hadoop's HDFS.
+ *
+ * NOTE: read comments in module-template.h to understand how this file
+ * works!
+ *
+ * Copyright 2010 Rainer Gerhards and Adiscon GmbH.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+ *
+ * A copy of the GPL can be found in the file "COPYING" in this distribution.
+ */
+
+#include "config.h"
+#include "rsyslog.h"
+#include <stdio.h>
+#include <stdarg.h>
+#include <stdlib.h>
+#include <string.h>
+#include <time.h>
+#include <assert.h>
+#include <errno.h>
+#include <ctype.h>
+#include <unistd.h>
+#include <sys/file.h>
+#include <pthread.h>
+#include <hdfs.h>
+
+#include "syslogd-types.h"
+#include "srUtils.h"
+#include "template.h"
+#include "conf.h"
+#include "cfsysline.h"
+#include "module-template.h"
+#include "unicode-helper.h"
+#include "errmsg.h"
+#include "hashtable.h"
+#include "hashtable_itr.h"
+
+MODULE_TYPE_OUTPUT
+
+/* internal structures
+ */
+DEF_OMOD_STATIC_DATA
+DEFobjCurrIf(errmsg)
+
+/* global data */
+static struct hashtable *files; /* holds all file objects that we know */
+
+/* globals for default values */
+static uchar *fileName = NULL;
+static uchar *hdfsHost = NULL;
+static uchar *dfltTplName = NULL; /* default template name to use */
+int hdfsPort = 0;
+/* end globals for default values */
+
+typedef struct {
+ uchar *name;
+ hdfsFS fs;
+ hdfsFile fh;
+ const char *hdfsHost;
+ tPort hdfsPort;
+ int nUsers;
+ pthread_mutex_t mut;
+} file_t;
+
+
+typedef struct _instanceData {
+ file_t *pFile;
+} instanceData;
+
+/* forward definitions (down here, need data types) */
+static inline rsRetVal fileClose(file_t *pFile);
+
+BEGINisCompatibleWithFeature
+CODESTARTisCompatibleWithFeature
+ if(eFeat == sFEATURERepeatedMsgReduction)
+ iRet = RS_RET_OK;
+ENDisCompatibleWithFeature
+
+
+BEGINdbgPrintInstInfo
+CODESTARTdbgPrintInstInfo
+ printf("omhdfs: file:%s", pData->pFile->name);
+ENDdbgPrintInstInfo
+
+
+/* note that hdfsFileExists() does not work, so we wrote our
+ * own function to see if a pathname exists. Returns 0 if the
+ * file does not exist, non-zero otherwise. Note that
+ * we can also check a directory (if that matters...)
+ */
+static int
+HDFSFileExists(hdfsFS fs, uchar *name)
+{
+ int r;
+ hdfsFileInfo *info;
+
+ info = hdfsGetPathInfo(fs, (char*)name);
+ /* if things go wrong, we assume it is because the file
+ * does not exist. We do not get too much information...
+ */
+ if(info == NULL) {
+ r = 0;
+ } else {
+ r = 1;
+ hdfsFreeFileInfo(info, 1);
+ }
+ return r;
+}
+
+static inline rsRetVal
+HDFSmkdir(hdfsFS fs, uchar *name)
+{
+ DEFiRet;
+ if(hdfsCreateDirectory(fs, (char*)name) == -1)
+ ABORT_FINALIZE(RS_RET_ERR);
+
+finalize_it:
+ RETiRet;
+}
+
+
+/* ---BEGIN FILE OBJECT---------------------------------------------------- */
+/* This code handles the "file object". This is split from the actual
+ * instance data, because several instances may write into the same file.
+ * If so, we need to use a single object, and also synchronize their writes.
+ * So we keep the file object separately, and just stick a reference into
+ * the instance data.
+ */
+
+static inline rsRetVal
+fileObjConstruct(file_t **ppFile)
+{
+ file_t *pFile;
+ DEFiRet;
+
+ CHKmalloc(pFile = malloc(sizeof(file_t)));
+ pFile->name = NULL;
+ pFile->hdfsHost = NULL;
+ pFile->fh = NULL;
+ pFile->nUsers = 0;
+
+ *ppFile = pFile;
+finalize_it:
+ RETiRet;
+}
+
+static inline void
+fileObjAddUser(file_t *pFile)
+{
+ /* init mutex only when second user is added */
+ ++pFile->nUsers;
+ if(pFile->nUsers == 2)
+ pthread_mutex_init(&pFile->mut, NULL);
+ DBGPRINTF("omhdfs: file %s now being used by %d actions\n", pFile->name, pFile->nUsers);
+}
+
+static rsRetVal
+fileObjDestruct(file_t **ppFile)
+{
+ file_t *pFile = *ppFile;
+ if(pFile->nUsers > 1)
+ pthread_mutex_destroy(&pFile->mut);
+ fileClose(pFile);
+ free(pFile->name);
+ free((char*)pFile->hdfsHost);
+ free(pFile->fh);
+
+ return RS_RET_OK;
+}
+
+
+/* check, and potentially create, all names inside a path */
+static rsRetVal
+filePrepare(file_t *pFile)
+{
+ uchar *p;
+ uchar *pszWork;
+ size_t len;
+ DEFiRet;
+
+ if(HDFSFileExists(pFile->fs, pFile->name))
+ FINALIZE;
+
+	/* file does not exist, create it (and, if necessary, parent directories) */
+ if(1) { // check if bCreateDirs
+ len = ustrlen(pFile->name) + 1;
+ CHKmalloc(pszWork = MALLOC(sizeof(uchar) * len));
+ memcpy(pszWork, pFile->name, len);
+ for(p = pszWork+1 ; *p ; p++)
+ if(*p == '/') {
+ /* temporarily terminate string, create dir and go on */
+ *p = '\0';
+ if(!HDFSFileExists(pFile->fs, pszWork)) {
+ CHKiRet(HDFSmkdir(pFile->fs, pszWork));
+ }
+ *p = '/';
+ }
+ free(pszWork);
+ return 0;
+ }
+
+finalize_it:
+ RETiRet;
+}
+
+
+/* this function is to be used as destructor for the
+ * hash table code.
+ */
+static void
+fileObjDestruct4Hashtable(void *ptr)
+{
+ file_t *pFile = (file_t*) ptr;
+ fileObjDestruct(&pFile);
+}
+
+
+static inline rsRetVal
+fileOpen(file_t *pFile)
+{
+ DEFiRet;
+
+ assert(pFile->fh == NULL);
+ if(pFile->nUsers > 1)
+ d_pthread_mutex_lock(&pFile->mut);
+
+ DBGPRINTF("omhdfs: try to connect to HDFS at host '%s', port %d\n",
+ pFile->hdfsHost, pFile->hdfsPort);
+ pFile->fs = hdfsConnect(pFile->hdfsHost, pFile->hdfsPort);
+ if(pFile->fs == NULL) {
+ DBGPRINTF("omhdfs: error can not connect to hdfs\n");
+ ABORT_FINALIZE(RS_RET_SUSPENDED);
+ }
+
+ CHKiRet(filePrepare(pFile));
+
+ pFile->fh = hdfsOpenFile(pFile->fs, (char*)pFile->name, O_WRONLY|O_APPEND, 0, 0, 0);
+ if(pFile->fh == NULL) {
+		/* maybe the file does not exist, so we try to create it now.
+		 * Note that we cannot use hdfsExists() because of a defect in
+		 * it: https://issues.apache.org/jira/browse/HDFS-1154
+		 * In my testing, libhdfs at least seems to return ENOENT if
+		 * the file does not exist.
+ */
+ if(errno == ENOENT) {
+			DBGPRINTF("omhdfs: ENOENT trying to append to '%s', now trying to create it\n",
+ pFile->name);
+ pFile->fh = hdfsOpenFile(pFile->fs, (char*)pFile->name, O_WRONLY|O_CREAT, 0, 0, 0);
+ }
+ }
+ if(pFile->fh == NULL) {
+ DBGPRINTF("omhdfs: failed to open %s for writing!\n", pFile->name);
+ ABORT_FINALIZE(RS_RET_SUSPENDED);
+ }
+
+finalize_it:
+ if(pFile->nUsers > 1)
+ d_pthread_mutex_unlock(&pFile->mut);
+ RETiRet;
+}
+
+
+static inline rsRetVal
+fileWrite(file_t *pFile, uchar *buf)
+{
+ size_t lenWrite;
+ DEFiRet;
+
+ if(pFile->nUsers > 1)
+ d_pthread_mutex_lock(&pFile->mut);
+
+ /* open file if not open. This must be done *here* and while mutex-protected
+ * because of HUP handling (which is async to normal processing!).
+ */
+ if(pFile->fh == NULL) {
+ fileOpen(pFile);
+ if(pFile->fh == NULL) {
+ ABORT_FINALIZE(RS_RET_SUSPENDED);
+ }
+ }
+
+ lenWrite = strlen((char*) buf);
+ tSize num_written_bytes = hdfsWrite(pFile->fs, pFile->fh, buf, lenWrite);
+ if((unsigned) num_written_bytes != lenWrite) {
+ errmsg.LogError(errno, RS_RET_ERR_HDFS_WRITE, "omhdfs: failed to write %s, expected %lu bytes, "
+ "written %lu\n", pFile->name, (unsigned long) lenWrite,
+ (unsigned long) num_written_bytes);
+ ABORT_FINALIZE(RS_RET_SUSPENDED);
+ }
+
+finalize_it:
+ if(pFile->nUsers > 1)
+ d_pthread_mutex_unlock(&pFile->mut);
+ RETiRet;
+}
+
+
+static inline rsRetVal
+fileClose(file_t *pFile)
+{
+ DEFiRet;
+
+ if(pFile->fh == NULL)
+ FINALIZE;
+
+ if(pFile->nUsers > 1)
+ d_pthread_mutex_lock(&pFile->mut);
+
+ hdfsCloseFile(pFile->fs, pFile->fh);
+ pFile->fh = NULL;
+
+ if(pFile->nUsers > 1)
+ d_pthread_mutex_unlock(&pFile->mut);
+
+finalize_it:
+ RETiRet;
+}
+
+/* ---END FILE OBJECT---------------------------------------------------- */
+
+
+BEGINcreateInstance
+CODESTARTcreateInstance
+ pData->pFile = NULL;
+ENDcreateInstance
+
+
+BEGINfreeInstance
+CODESTARTfreeInstance
+ if(pData->pFile != NULL)
+ fileObjDestruct(&pData->pFile);
+ENDfreeInstance
+
+
+BEGINtryResume
+CODESTARTtryResume
+ fileClose(pData->pFile);
+ fileOpen(pData->pFile);
+ if(pData->pFile->fh == NULL){
+ dbgprintf("omhdfs: tried to resume file %s, but still no luck...\n",
+ pData->pFile->name);
+ iRet = RS_RET_SUSPENDED;
+ }
+ENDtryResume
+
+BEGINdoAction
+CODESTARTdoAction
+	DBGPRINTF("omhdfs: action to write to %s\n", pData->pFile->name);
+ iRet = fileWrite(pData->pFile, ppString[0]);
+ENDdoAction
+
+
+BEGINparseSelectorAct
+ file_t *pFile;
+ int r;
+ uchar *keybuf;
+CODESTARTparseSelectorAct
+
+ /* first check if this config line is actually for us */
+ if(strncmp((char*) p, ":omhdfs:", sizeof(":omhdfs:") - 1)) {
+ ABORT_FINALIZE(RS_RET_CONFLINE_UNPROCESSED);
+ }
+
+ /* ok, if we reach this point, we have something for us */
+ p += sizeof(":omhdfs:") - 1; /* eat indicator sequence (-1 because of '\0'!) */
+ CHKiRet(createInstance(&pData));
+ CODE_STD_STRING_REQUESTparseSelectorAct(1)
+ CHKiRet(cflineParseTemplateName(&p, *ppOMSR, 0, 0,
+ (dfltTplName == NULL) ? (uchar*)"RSYSLOG_FileFormat" : dfltTplName));
+
+ if(fileName == NULL) {
+ errmsg.LogError(0, RS_RET_ERR_HDFS_OPEN, "omhdfs: no file name specified, can not continue");
+ ABORT_FINALIZE(RS_RET_FILE_NOT_SPECIFIED);
+ }
+
+ pFile = hashtable_search(files, fileName);
+ if(pFile == NULL) {
+ /* we need a new file object, this one not seen before */
+ CHKiRet(fileObjConstruct(&pFile));
+ CHKmalloc(pFile->name = fileName);
+ CHKmalloc(keybuf = ustrdup(fileName));
+ fileName = NULL; /* re-set, data passed to file object */
+ CHKmalloc(pFile->hdfsHost = strdup((hdfsHost == NULL) ? "default" : (char*) hdfsHost));
+ pFile->hdfsPort = hdfsPort;
+ fileOpen(pFile);
+ if(pFile->fh == NULL){
+ errmsg.LogError(0, RS_RET_ERR_HDFS_OPEN, "omhdfs: failed to open %s - "
+ "retrying later", pFile->name);
+ iRet = RS_RET_SUSPENDED;
+ }
+ r = hashtable_insert(files, keybuf, pFile);
+ if(r == 0)
+ ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
+ }
+ fileObjAddUser(pFile);
+ pData->pFile = pFile;
+
+CODE_STD_FINALIZERparseSelectorAct
+ENDparseSelectorAct
+
+
+BEGINdoHUP
+ file_t *pFile;
+ struct hashtable_itr *itr;
+CODESTARTdoHUP
+ DBGPRINTF("omhdfs: HUP received (file count %d)\n", hashtable_count(files));
+ /* Iterator constructor only returns a valid iterator if
+ * the hashtable is not empty */
+ itr = hashtable_iterator(files);
+ if(hashtable_count(files) > 0)
+ {
+ do {
+ pFile = (file_t *) hashtable_iterator_value(itr);
+ fileClose(pFile);
+ DBGPRINTF("omhdfs: HUP, closing file %s\n", pFile->name);
+ } while (hashtable_iterator_advance(itr));
+ }
+ENDdoHUP
+
+
+/* Reset config variables for this module to default values.
+ * rgerhards, 2007-07-17
+ */
+static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal)
+{
+ hdfsHost = NULL;
+ hdfsPort = 0;
+ return RS_RET_OK;
+}
+
+
+BEGINmodExit
+CODESTARTmodExit
+ objRelease(errmsg, CORE_COMPONENT);
+ if(files != NULL)
+ hashtable_destroy(files, 1); /* 1 => free all values automatically */
+ENDmodExit
+
+
+BEGINqueryEtryPt
+CODESTARTqueryEtryPt
+CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_doHUP
+ENDqueryEtryPt
+
+
+BEGINmodInit()
+CODESTARTmodInit
+ *ipIFVersProvided = CURR_MOD_IF_VERSION;
+CODEmodInit_QueryRegCFSLineHdlr
+ CHKiRet(objUse(errmsg, CORE_COMPONENT));
+ CHKmalloc(files = create_hashtable(20, hash_from_string, key_equals_string,
+ fileObjDestruct4Hashtable));
+
+ CHKiRet(regCfSysLineHdlr((uchar *)"omhdfsfilename", 0, eCmdHdlrGetWord, NULL, &fileName, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"omhdfshost", 0, eCmdHdlrGetWord, NULL, &hdfsHost, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"omhdfsport", 0, eCmdHdlrInt, NULL, &hdfsPort, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"omhdfsdefaulttemplate", 0, eCmdHdlrGetWord, NULL, &dfltTplName, NULL));
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID));
+CODEmodInit_QueryRegCFSLineHdlr
+ENDmodInit
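
For readers unfamiliar with the libhdfs C API, the write path added above reduces to a handful of calls: hdfsConnect(), hdfsOpenFile() with O_WRONLY|O_APPEND, a fallback to O_WRONLY|O_CREAT when errno is ENOENT (the HDFS-1154 workaround noted in fileOpen()), then hdfsWrite() and hdfsCloseFile(). The following standalone sketch condenses that flow; the host "default", port 0 and the file path are placeholder values for illustration, not anything the module hard-codes beyond its "default" host fallback:

/* Standalone append-or-create sketch of the omhdfs write path.
 * Connection parameters and the file path are placeholders.
 */
#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <string.h>
#include <hdfs.h>

int writeOneLine(const char *msg)
{
	hdfsFS fs;
	hdfsFile fh;
	tSize written;

	fs = hdfsConnect("default", 0);	/* "default"/0: use the configured NameNode */
	if(fs == NULL)
		return -1;

	/* try to append; hdfsExists() is unreliable (HDFS-1154), so rely on errno */
	fh = hdfsOpenFile(fs, "/rsyslog/test.log", O_WRONLY|O_APPEND, 0, 0, 0);
	if(fh == NULL && errno == ENOENT)
		fh = hdfsOpenFile(fs, "/rsyslog/test.log", O_WRONLY|O_CREAT, 0, 0, 0);
	if(fh == NULL)
		return -1;

	written = hdfsWrite(fs, fh, (void*) msg, strlen(msg));
	if((size_t) written != strlen(msg))
		fprintf(stderr, "omhdfs sketch: short write\n");
	hdfsCloseFile(fs, fh);	/* the module keeps fs connected; only the file is closed */
	return 0;
}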