summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2010-10-05 14:52:23 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2010-10-05 14:52:23 +0200
commit1dbb85d7ba2e135593944afdaaa69aeb4fd49302 (patch)
tree40d81f1a2a6bb211157add77d5bf36acfce78022
parent6ea4a38079cb1508c103930e327fa92ada97176f (diff)
parenta627ac99ba2c3404ca926a19fb06cbd6f43b53c8 (diff)
downloadrsyslog-1dbb85d7ba2e135593944afdaaa69aeb4fd49302.tar.gz
rsyslog-1dbb85d7ba2e135593944afdaaa69aeb4fd49302.tar.xz
rsyslog-1dbb85d7ba2e135593944afdaaa69aeb4fd49302.zip
Merge branch 'v5-devel-hdfs' into v5-devel
-rw-r--r--ChangeLog3
-rw-r--r--Makefile.am4
-rw-r--r--configure.ac19
-rw-r--r--doc/omhdfs.html69
-rw-r--r--doc/rsyslog_conf_modules.html1
-rw-r--r--plugins/imuxsock/imuxsock.c4
-rw-r--r--plugins/omhdfs/Makefile.am6
-rw-r--r--plugins/omhdfs/omhdfs.c475
-rw-r--r--runtime/Makefile.am6
-rw-r--r--runtime/hashtable.c (renamed from runtime/hashtable/hashtable.c)42
-rw-r--r--runtime/hashtable.h7
-rw-r--r--runtime/hashtable_itr.c (renamed from runtime/hashtable/hashtable_itr.c)2
-rw-r--r--runtime/hashtable_itr.h (renamed from runtime/hashtable/hashtable_itr.h)4
-rw-r--r--runtime/hashtable_private.h (renamed from runtime/hashtable/hashtable_private.h)1
-rw-r--r--runtime/rsyslog.h3
15 files changed, 636 insertions, 10 deletions
diff --git a/ChangeLog b/ChangeLog
index 055f40f4..3bb3d882 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,4 +1,7 @@
---------------------------------------------------------------------------
+Version 5.7.2 [V5-DEVEL] (rgerhards), 2010-09-??
+- support for Hadoop's HDFS added (via omhdfs)
+---------------------------------------------------------------------------
Version 5.7.1 [V5-DEVEL] (rgerhards), 2010-09-??
- imuxsock now optionally use SCM_CREDENTIALS to pull the pid from the log
socket itself
diff --git a/Makefile.am b/Makefile.am
index f3dca447..54b68153 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -147,6 +147,10 @@ if ENABLE_OMUXSOCK
SUBDIRS += plugins/omuxsock
endif
+if ENABLE_OMHDFS
+SUBDIRS += plugins/omhdfs
+endif
+
if ENABLE_OMTEMPLATE
SUBDIRS += plugins/omtemplate
endif
diff --git a/configure.ac b/configure.ac
index 16454d00..7254b99d 100644
--- a/configure.ac
+++ b/configure.ac
@@ -1013,6 +1013,23 @@ AM_CONDITIONAL(ENABLE_OMTEMPLATE, test x$enable_omtemplate = xyes)
# end of copy template - be sure to search for omtemplate to find everything!
+# settings for the omhdfs;
+AC_ARG_ENABLE(omhdfs,
+ [AS_HELP_STRING([--enable-omhdfs],[Compiles omhdfs template module @<:@default=no@:>@])],
+ [case "${enableval}" in
+ yes) enable_omhdfs="yes" ;;
+ no) enable_omhdfs="no" ;;
+ *) AC_MSG_ERROR(bad value ${enableval} for --enable-omhdfs) ;;
+ esac],
+ [enable_omhdfs=no]
+)
+#
+# you may want to do some library checks here - see snmp, mysql, pgsql modules
+# for samples
+#
+AM_CONDITIONAL(ENABLE_OMHDFS, test x$enable_omhdfs = xyes)
+
+
AC_CONFIG_FILES([Makefile \
runtime/Makefile \
tools/Makefile \
@@ -1026,6 +1043,7 @@ AC_CONFIG_FILES([Makefile \
plugins/imklog/Makefile \
plugins/imtemplate/Makefile \
plugins/omtemplate/Makefile \
+ plugins/omhdfs/Makefile \
plugins/omprog/Makefile \
plugins/omstdout/Makefile \
plugins/pmrfc3164sd/Makefile \
@@ -1080,6 +1098,7 @@ echo "---{ output plugins }---"
echo " Mail support enabled: $enable_mail"
echo " omprog module will be compiled: $enable_omprog"
echo " omstdout module will be compiled: $enable_omstdout"
+echo " omhdfs module will be compiled: $enable_omhdfs"
echo " omruleset module will be compiled: $enable_omruleset"
echo " omdbalerting module will be compiled: $enable_omdbalerting"
echo " omudpspoof module will be compiled: $enable_omudpspoof"
diff --git a/doc/omhdfs.html b/doc/omhdfs.html
new file mode 100644
index 00000000..3849f167
--- /dev/null
+++ b/doc/omhdfs.html
@@ -0,0 +1,69 @@
+<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN">
+<html><head><title>rsyslog output module for HDFS (omhdfs)</title>
+<a href="features.html">back</a>
+</head>
+<body>
+<h1>Unix sockets Output Module (omhdfs)</h1>
+<p><b>Module Name:&nbsp;&nbsp;&nbsp; omhdfs</b></p>
+<p><b>Available since:&nbsp;&nbsp;&nbsp;</b> 5.7.2</p>
+<p><b>Author: </b>Rainer Gerhards &lt;rgerhards@adiscon.com&gt;</p>
+<p><b>Description</b>:</p>
+<p>This module supports writing message into files on Hadoop's HDFS
+file system.
+<p><b>Configuration Directives</b>:</p>
+<ul>
+<li><b>$OMHDFSFileName</b> [name]<br>
+The name of the file to which the output data shall be written.
+</li>
+<li><b>$OMHDFSHost</b> [name]<br>
+Name or IP address of the HDFS host to connect to.
+</li>
+<li><b>$OMHDFSPort</b> [name]<br>
+Port on which to connect to the HDFS host.
+</li>
+<li><b>$OMHDFSDefaultTemplate</b> [name]<br>
+Default template to be used when none is specified. This saves the work of
+specifying the same template ever and ever again. Of course, the default
+template can be overwritten via the usual method.
+</li>
+</ul>
+<b>Caveats/Known Bugs:</b>
+<p>Building omhdfs is a challenge because we could not yet find out how
+to integrate Java properly into the autotools build process. The issue is
+that HDFS is written in Java and libhdfs uses JNI to talk to it. That requires
+that various system-specific environment options and pathes be set correctly. At
+this point, we leave this to the user. If someone know how to do it better,
+please drop us a line!
+<ul>
+<li>In order to build, you need to set these environment variables BEFORE running
+./configure:
+<ul>
+<li>JAVA_INCLUDES - must have all include pathes that are needed to build
+JNI C programms, including the -I options necessary for gcc. An example is<br>
+# export JAVA_INCLUDES="-I/usr/lib/jvm/java-1.6.0-openjdk-1.6.0.0.x86_64/include -I/usr/lib/jvm/java-1.6.0-openjdk-1.6.0.0.x86_64/include/linux"
+<li>JAVA_LIBS - must have all library pathes that are needed to build
+JNI C programms, including the -l/-L options necessary for gcc. An example is<br>
+# export export JAVA_LIBS="-L/usr/java/jdk1.6.0_21/jre/lib/amd64 -L/usr/java/jdk1.6.0_21/jre/lib/amd64/server -ljava -ljvm -lverify"
+</ul>
+
+<li>As of HDFS architecture, you must make sure that all relevant environment
+variables (the usual Java stuff and HADOOP's home directory) are properly set.
+<li>As it looks, libhdfs makes Java throw exceptions to stdout. There is no
+known work-around for this (and it usually should not case any troubles.
+</ul>
+<p><b>Sample:</b></p>
+<p>
+</p>
+<textarea rows="4" cols="80">$ModLoad omhdfs
+
+$OMHDFSFileName /var/log/logfile
+*.* :omhdfs:
+</textarea>
+[<a href="manual.html">manual index</a>] [<a href="http://www.rsyslog.com/">rsyslog site</a>]</p>
+<p><font size="2">This documentation is part of the <a href="http://www.rsyslog.com/">rsyslog</a>
+project.<br>
+Copyright &copy; 2010 by <a href="http://www.gerhards.net/rainer">Rainer Gerhards</a> and
+<a href="http://www.adiscon.com/">Adiscon</a>.
+Released under the GNU GPL version 3 or higher.</font></p>
+
+</body></html>
diff --git a/doc/rsyslog_conf_modules.html b/doc/rsyslog_conf_modules.html
index 85899954..74aa319c 100644
--- a/doc/rsyslog_conf_modules.html
+++ b/doc/rsyslog_conf_modules.html
@@ -66,6 +66,7 @@ permits rsyslog to alert folks by mail if something important happens</li>
<li><a href="omoracle.html">omoracle</a> - output module for Oracle (native OCI interface)</li>
<li><a href="omudpspoof.html">omudpspoof</a> - output module sending UDP syslog messages with a spoofed address</li>
<li><a href="omuxsock.html">omuxsock</a> - output module Unix domain sockets</li>
+<li><a href="omhdfs.html">omhdfs</a> - output module for Hadoop's HDFS file system</li>
</ul>
<a name="pm"></a><h2>Parser Modules</h2>
diff --git a/plugins/imuxsock/imuxsock.c b/plugins/imuxsock/imuxsock.c
index 75a6b00b..41bff4f9 100644
--- a/plugins/imuxsock/imuxsock.c
+++ b/plugins/imuxsock/imuxsock.c
@@ -279,7 +279,7 @@ addLstnSocketName(void __attribute__((unused)) *pVal, uchar *pNewVal)
}
CHKiRet(prop.ConstructFinalize(listeners[nfd].hostName));
if(ratelimitInterval > 0) {
- if((listeners[nfd].ht = create_hashtable(1000, hash_from_key_fn, key_equals_fn)) == NULL) {
+ if((listeners[nfd].ht = create_hashtable(1000, hash_from_key_fn, key_equals_fn, NULL)) == NULL) {
/* in this case, we simply turn of rate-limiting */
dbgprintf("imuxsock: turning off rate limiting because we could not "
"create hash table\n");
@@ -749,7 +749,7 @@ CODESTARTwillRun
if(pLogSockName != NULL)
listeners[0].sockName = pLogSockName;
if(ratelimitIntervalSysSock > 0) {
- if((listeners[0].ht = create_hashtable(1000, hash_from_key_fn, key_equals_fn)) == NULL) {
+ if((listeners[0].ht = create_hashtable(1000, hash_from_key_fn, key_equals_fn, NULL)) == NULL) {
/* in this case, we simply turn of rate-limiting */
dbgprintf("imuxsock: turning off rate limiting because we could not "
"create hash table\n");
diff --git a/plugins/omhdfs/Makefile.am b/plugins/omhdfs/Makefile.am
new file mode 100644
index 00000000..95e6b102
--- /dev/null
+++ b/plugins/omhdfs/Makefile.am
@@ -0,0 +1,6 @@
+pkglib_LTLIBRARIES = omhdfs.la
+
+omhdfs_la_SOURCES = omhdfs.c
+omhdfs_la_CPPFLAGS = -I$(top_srcdir) $(PTHREADS_CFLAGS) $(RSRT_CFLAGS) $(JAVA_INCLUDES)
+omhdfs_la_LDFLAGS = -module -avoid-version -lhdfs $(JAVA_LIBS)
+omhdfs_la_LIBADD = $(RSRT_LIBS)
diff --git a/plugins/omhdfs/omhdfs.c b/plugins/omhdfs/omhdfs.c
new file mode 100644
index 00000000..9705b7fd
--- /dev/null
+++ b/plugins/omhdfs/omhdfs.c
@@ -0,0 +1,475 @@
+/* omhdfs.c
+ * This is an output module to support Hadoop's HDFS.
+ *
+ * NOTE: read comments in module-template.h to understand how this file
+ * works!
+ *
+ * Copyright 2010 Rainer Gerhards and Adiscon GmbH.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+ *
+ * A copy of the GPL can be found in the file "COPYING" in this distribution.
+ */
+
+#include "config.h"
+#include "rsyslog.h"
+#include <stdio.h>
+#include <stdarg.h>
+#include <stdlib.h>
+#include <string.h>
+#include <time.h>
+#include <assert.h>
+#include <errno.h>
+#include <ctype.h>
+#include <unistd.h>
+#include <sys/file.h>
+#include <pthread.h>
+#include <hdfs.h>
+
+#include "syslogd-types.h"
+#include "srUtils.h"
+#include "template.h"
+#include "conf.h"
+#include "cfsysline.h"
+#include "module-template.h"
+#include "unicode-helper.h"
+#include "errmsg.h"
+#include "hashtable.h"
+#include "hashtable_itr.h"
+
+MODULE_TYPE_OUTPUT
+
+/* internal structures
+ */
+DEF_OMOD_STATIC_DATA
+DEFobjCurrIf(errmsg)
+
+/* global data */
+static struct hashtable *files; /* holds all file objects that we know */
+
+/* globals for default values */
+static uchar *fileName = NULL;
+static uchar *hdfsHost = NULL;
+static uchar *dfltTplName = NULL; /* default template name to use */
+int hdfsPort = 0;
+/* end globals for default values */
+
+typedef struct {
+ uchar *name;
+ hdfsFS fs;
+ hdfsFile fh;
+ const char *hdfsHost;
+ tPort hdfsPort;
+ int nUsers;
+ pthread_mutex_t mut;
+} file_t;
+
+
+typedef struct _instanceData {
+ file_t *pFile;
+} instanceData;
+
+/* forward definitions (down here, need data types) */
+static inline rsRetVal fileClose(file_t *pFile);
+
+BEGINisCompatibleWithFeature
+CODESTARTisCompatibleWithFeature
+ if(eFeat == sFEATURERepeatedMsgReduction)
+ iRet = RS_RET_OK;
+ENDisCompatibleWithFeature
+
+
+BEGINdbgPrintInstInfo
+CODESTARTdbgPrintInstInfo
+ printf("omhdfs: file:%s", pData->pFile->name);
+ENDdbgPrintInstInfo
+
+
+/* note that hdfsFileExists() does not work, so we did our
+ * own function to see if a pathname exists. Returns 0 if the
+ * file does not exists, something else otherwise. Note that
+ * we can also check a directroy (if that matters...)
+ */
+static int
+HDFSFileExists(hdfsFS fs, uchar *name)
+{
+ int r;
+ hdfsFileInfo *info;
+
+ info = hdfsGetPathInfo(fs, (char*)name);
+ /* if things go wrong, we assume it is because the file
+ * does not exist. We do not get too much information...
+ */
+ if(info == NULL) {
+ r = 0;
+ } else {
+ r = 1;
+ hdfsFreeFileInfo(info, 1);
+ }
+ return r;
+}
+
+static inline rsRetVal
+HDFSmkdir(hdfsFS fs, uchar *name)
+{
+ DEFiRet;
+ if(hdfsCreateDirectory(fs, (char*)name) == -1)
+ ABORT_FINALIZE(RS_RET_ERR);
+
+finalize_it:
+ RETiRet;
+}
+
+
+/* ---BEGIN FILE OBJECT---------------------------------------------------- */
+/* This code handles the "file object". This is split from the actual
+ * instance data, because several instances may write into the same file.
+ * If so, we need to use a single object, and also synchronize their writes.
+ * So we keep the file object separately, and just stick a reference into
+ * the instance data.
+ */
+
+static inline rsRetVal
+fileObjConstruct(file_t **ppFile)
+{
+ file_t *pFile;
+ DEFiRet;
+
+ CHKmalloc(pFile = malloc(sizeof(file_t)));
+ pFile->name = NULL;
+ pFile->hdfsHost = NULL;
+ pFile->fh = NULL;
+ pFile->nUsers = 0;
+
+ *ppFile = pFile;
+finalize_it:
+ RETiRet;
+}
+
+static inline void
+fileObjAddUser(file_t *pFile)
+{
+ /* init mutex only when second user is added */
+ ++pFile->nUsers;
+ if(pFile->nUsers == 2)
+ pthread_mutex_init(&pFile->mut, NULL);
+ DBGPRINTF("omhdfs: file %s now being used by %d actions\n", pFile->name, pFile->nUsers);
+}
+
+static rsRetVal
+fileObjDestruct(file_t **ppFile)
+{
+ file_t *pFile = *ppFile;
+ if(pFile->nUsers > 1)
+ pthread_mutex_destroy(&pFile->mut);
+ fileClose(pFile);
+ free(pFile->name);
+ free((char*)pFile->hdfsHost);
+ free(pFile->fh);
+
+ return RS_RET_OK;
+}
+
+
+/* check, and potentially create, all names inside a path */
+static rsRetVal
+filePrepare(file_t *pFile)
+{
+ uchar *p;
+ uchar *pszWork;
+ size_t len;
+ DEFiRet;
+
+ if(HDFSFileExists(pFile->fs, pFile->name))
+ FINALIZE;
+
+ /* file does not exist, create it (and eventually parent directories */
+ if(1) { // check if bCreateDirs
+ len = ustrlen(pFile->name) + 1;
+ CHKmalloc(pszWork = MALLOC(sizeof(uchar) * len));
+ memcpy(pszWork, pFile->name, len);
+ for(p = pszWork+1 ; *p ; p++)
+ if(*p == '/') {
+ /* temporarily terminate string, create dir and go on */
+ *p = '\0';
+ if(!HDFSFileExists(pFile->fs, pszWork)) {
+ CHKiRet(HDFSmkdir(pFile->fs, pszWork));
+ }
+ *p = '/';
+ }
+ free(pszWork);
+ return 0;
+ }
+
+finalize_it:
+ RETiRet;
+}
+
+
+/* this function is to be used as destructor for the
+ * hash table code.
+ */
+static void
+fileObjDestruct4Hashtable(void *ptr)
+{
+ file_t *pFile = (file_t*) ptr;
+ fileObjDestruct(&pFile);
+}
+
+
+static inline rsRetVal
+fileOpen(file_t *pFile)
+{
+ DEFiRet;
+
+ assert(pFile->fh == NULL);
+ if(pFile->nUsers > 1)
+ d_pthread_mutex_lock(&pFile->mut);
+
+ DBGPRINTF("omhdfs: try to connect to HDFS at host '%s', port %d\n",
+ pFile->hdfsHost, pFile->hdfsPort);
+ pFile->fs = hdfsConnect(pFile->hdfsHost, pFile->hdfsPort);
+ if(pFile->fs == NULL) {
+ DBGPRINTF("omhdfs: error can not connect to hdfs\n");
+ ABORT_FINALIZE(RS_RET_SUSPENDED);
+ }
+
+ CHKiRet(filePrepare(pFile));
+
+ pFile->fh = hdfsOpenFile(pFile->fs, (char*)pFile->name, O_WRONLY|O_APPEND, 0, 0, 0);
+ if(pFile->fh == NULL) {
+ /* maybe the file does not exist, so we try to create it now.
+ * Note that we can not use hdfsExists() because of a deficit in
+ * it: https://issues.apache.org/jira/browse/HDFS-1154
+ * As of my testing, libhdfs at least seems to return ENOENT if
+ * the file does not exist.
+ */
+ if(errno == ENOENT) {
+ DBGPRINTF("omhdfs: ENOENT trying to append to '%s', now trying create\n",
+ pFile->name);
+ pFile->fh = hdfsOpenFile(pFile->fs, (char*)pFile->name, O_WRONLY|O_CREAT, 0, 0, 0);
+ }
+ }
+ if(pFile->fh == NULL) {
+ DBGPRINTF("omhdfs: failed to open %s for writing!\n", pFile->name);
+ ABORT_FINALIZE(RS_RET_SUSPENDED);
+ }
+
+finalize_it:
+ if(pFile->nUsers > 1)
+ d_pthread_mutex_unlock(&pFile->mut);
+ RETiRet;
+}
+
+
+static inline rsRetVal
+fileWrite(file_t *pFile, uchar *buf)
+{
+ size_t lenWrite;
+ DEFiRet;
+
+ if(pFile->nUsers > 1)
+ d_pthread_mutex_lock(&pFile->mut);
+
+ /* open file if not open. This must be done *here* and while mutex-protected
+ * because of HUP handling (which is async to normal processing!).
+ */
+ if(pFile->fh == NULL) {
+ fileOpen(pFile);
+ if(pFile->fh == NULL) {
+ ABORT_FINALIZE(RS_RET_SUSPENDED);
+ }
+ }
+
+ lenWrite = strlen((char*) buf);
+ tSize num_written_bytes = hdfsWrite(pFile->fs, pFile->fh, buf, lenWrite);
+ if((unsigned) num_written_bytes != lenWrite) {
+ errmsg.LogError(errno, RS_RET_ERR_HDFS_WRITE, "omhdfs: failed to write %s, expected %lu bytes, "
+ "written %lu\n", pFile->name, (unsigned long) lenWrite,
+ (unsigned long) num_written_bytes);
+ ABORT_FINALIZE(RS_RET_SUSPENDED);
+ }
+
+finalize_it:
+ if(pFile->nUsers > 1)
+ d_pthread_mutex_unlock(&pFile->mut);
+ RETiRet;
+}
+
+
+static inline rsRetVal
+fileClose(file_t *pFile)
+{
+ DEFiRet;
+
+ if(pFile->fh == NULL)
+ FINALIZE;
+
+ if(pFile->nUsers > 1)
+ d_pthread_mutex_lock(&pFile->mut);
+
+ hdfsCloseFile(pFile->fs, pFile->fh);
+ pFile->fh = NULL;
+
+ if(pFile->nUsers > 1)
+ d_pthread_mutex_unlock(&pFile->mut);
+
+finalize_it:
+ RETiRet;
+}
+
+/* ---END FILE OBJECT---------------------------------------------------- */
+
+
+BEGINcreateInstance
+CODESTARTcreateInstance
+ pData->pFile = NULL;
+ENDcreateInstance
+
+
+BEGINfreeInstance
+CODESTARTfreeInstance
+ if(pData->pFile != NULL)
+ fileObjDestruct(&pData->pFile);
+ENDfreeInstance
+
+
+BEGINtryResume
+CODESTARTtryResume
+ fileClose(pData->pFile);
+ fileOpen(pData->pFile);
+ if(pData->pFile->fh == NULL){
+ dbgprintf("omhdfs: tried to resume file %s, but still no luck...\n",
+ pData->pFile->name);
+ iRet = RS_RET_SUSPENDED;
+ }
+ENDtryResume
+
+BEGINdoAction
+CODESTARTdoAction
+ DBGPRINTF("omuxsock: action to to write to %s\n", pData->pFile->name);
+ iRet = fileWrite(pData->pFile, ppString[0]);
+ENDdoAction
+
+
+BEGINparseSelectorAct
+ file_t *pFile;
+ int r;
+ uchar *keybuf;
+CODESTARTparseSelectorAct
+
+ /* first check if this config line is actually for us */
+ if(strncmp((char*) p, ":omhdfs:", sizeof(":omhdfs:") - 1)) {
+ ABORT_FINALIZE(RS_RET_CONFLINE_UNPROCESSED);
+ }
+
+ /* ok, if we reach this point, we have something for us */
+ p += sizeof(":omhdfs:") - 1; /* eat indicator sequence (-1 because of '\0'!) */
+ CHKiRet(createInstance(&pData));
+ CODE_STD_STRING_REQUESTparseSelectorAct(1)
+ CHKiRet(cflineParseTemplateName(&p, *ppOMSR, 0, 0,
+ (dfltTplName == NULL) ? (uchar*)"RSYSLOG_FileFormat" : dfltTplName));
+
+ if(fileName == NULL) {
+ errmsg.LogError(0, RS_RET_ERR_HDFS_OPEN, "omhdfs: no file name specified, can not continue");
+ ABORT_FINALIZE(RS_RET_FILE_NOT_SPECIFIED);
+ }
+
+ pFile = hashtable_search(files, fileName);
+ if(pFile == NULL) {
+ /* we need a new file object, this one not seen before */
+ CHKiRet(fileObjConstruct(&pFile));
+ CHKmalloc(pFile->name = fileName);
+ CHKmalloc(keybuf = ustrdup(fileName));
+ fileName = NULL; /* re-set, data passed to file object */
+ CHKmalloc(pFile->hdfsHost = strdup((hdfsHost == NULL) ? "default" : (char*) hdfsHost));
+ pFile->hdfsPort = hdfsPort;
+ fileOpen(pFile);
+ if(pFile->fh == NULL){
+ errmsg.LogError(0, RS_RET_ERR_HDFS_OPEN, "omhdfs: failed to open %s - "
+ "retrying later", pFile->name);
+ iRet = RS_RET_SUSPENDED;
+ }
+ r = hashtable_insert(files, keybuf, pFile);
+ if(r == 0)
+ ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
+ }
+ fileObjAddUser(pFile);
+ pData->pFile = pFile;
+
+CODE_STD_FINALIZERparseSelectorAct
+ENDparseSelectorAct
+
+
+BEGINdoHUP
+ file_t *pFile;
+ struct hashtable_itr *itr;
+CODESTARTdoHUP
+ DBGPRINTF("omhdfs: HUP received (file count %d)\n", hashtable_count(files));
+ /* Iterator constructor only returns a valid iterator if
+ * the hashtable is not empty */
+ itr = hashtable_iterator(files);
+ if(hashtable_count(files) > 0)
+ {
+ do {
+ pFile = (file_t *) hashtable_iterator_value(itr);
+ fileClose(pFile);
+ DBGPRINTF("omhdfs: HUP, closing file %s\n", pFile->name);
+ } while (hashtable_iterator_advance(itr));
+ }
+ENDdoHUP
+
+
+/* Reset config variables for this module to default values.
+ * rgerhards, 2007-07-17
+ */
+static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal)
+{
+ hdfsHost = NULL;
+ hdfsPort = 0;
+ return RS_RET_OK;
+}
+
+
+BEGINmodExit
+CODESTARTmodExit
+ objRelease(errmsg, CORE_COMPONENT);
+ if(files != NULL)
+ hashtable_destroy(files, 1); /* 1 => free all values automatically */
+ENDmodExit
+
+
+BEGINqueryEtryPt
+CODESTARTqueryEtryPt
+CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_doHUP
+ENDqueryEtryPt
+
+
+BEGINmodInit()
+CODESTARTmodInit
+ *ipIFVersProvided = CURR_MOD_IF_VERSION;
+CODEmodInit_QueryRegCFSLineHdlr
+ CHKiRet(objUse(errmsg, CORE_COMPONENT));
+ CHKmalloc(files = create_hashtable(20, hash_from_string, key_equals_string,
+ fileObjDestruct4Hashtable));
+
+ CHKiRet(regCfSysLineHdlr((uchar *)"omhdfsfilename", 0, eCmdHdlrGetWord, NULL, &fileName, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"omhdfshost", 0, eCmdHdlrGetWord, NULL, &hdfsHost, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"omhdfsport", 0, eCmdHdlrInt, NULL, &hdfsPort, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"omhdfsdefaulttemplate", 0, eCmdHdlrGetWord, NULL, &dfltTplName, NULL));
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID));
+CODEmodInit_QueryRegCFSLineHdlr
+ENDmodInit
diff --git a/runtime/Makefile.am b/runtime/Makefile.am
index 5a0c4437..93817e75 100644
--- a/runtime/Makefile.am
+++ b/runtime/Makefile.am
@@ -95,9 +95,11 @@ librsyslog_la_SOURCES = \
../parse.c \
../parse.h \
\
- hashtable/hashtable.c \
+ hashtable.c \
hashtable.h \
- hashtable/hashtable_private.h \
+ hashtable_itr.c \
+ hashtable_itr.h \
+ hashtable_private.h \
\
../outchannel.c \
../outchannel.h \
diff --git a/runtime/hashtable/hashtable.c b/runtime/hashtable.c
index a10e3bc6..41fc60fe 100644
--- a/runtime/hashtable/hashtable.c
+++ b/runtime/hashtable.c
@@ -29,7 +29,7 @@ const float max_load_factor = 0.65;
struct hashtable *
create_hashtable(unsigned int minsize,
unsigned int (*hashf) (void*),
- int (*eqf) (void*,void*))
+ int (*eqf) (void*,void*), void (*dest)(void*))
{
struct hashtable *h;
unsigned int pindex, size = primes[0];
@@ -49,6 +49,7 @@ create_hashtable(unsigned int minsize,
h->entrycount = 0;
h->hashfn = hashf;
h->eqfn = eqf;
+ h->dest = dest;
h->loadlimit = (unsigned int) ceil(size * max_load_factor);
return h;
}
@@ -225,7 +226,16 @@ hashtable_destroy(struct hashtable *h, int free_values)
{
e = table[i];
while (NULL != e)
- { f = e; e = e->next; freekey(f->k); free(f->v); free(f); }
+ {
+ f = e;
+ e = e->next;
+ freekey(f->k);
+ if(h->dest == NULL)
+ free(f->v);
+ else
+ h->dest(f->v);
+ free(f);
+ }
}
}
else
@@ -241,6 +251,34 @@ hashtable_destroy(struct hashtable *h, int free_values)
free(h);
}
+/* some generic hash functions */
+
+/* one provided by Aaaron Wiebe based on perl's hashng algorithm
+ * (so probably pretty generic). Not for excessively large strings!
+ */
+unsigned int
+hash_from_string(void *k)
+{
+ int len;
+ char *rkey = (char*) k;
+ unsigned hashval = 1;
+
+ len = (int) strlen(rkey);
+ while (len--)
+ hashval = hashval * 33 + *rkey++;
+
+ return hashval;
+}
+
+
+int
+key_equals_string(void *key1, void *key2)
+{
+ /* we must return true IF the keys are equal! */
+ return !strcmp(key1, key2);
+}
+
+
/*
* Copyright (c) 2002, Christopher Clark
* All rights reserved.
diff --git a/runtime/hashtable.h b/runtime/hashtable.h
index b90781ab..f777ad0b 100644
--- a/runtime/hashtable.h
+++ b/runtime/hashtable.h
@@ -68,13 +68,14 @@ struct hashtable;
* @param minsize minimum initial size of hashtable
* @param hashfunction function for hashing keys
* @param key_eq_fn function for determining key equality
+ * @param dest destructor for value entries (NULL -> use free())
* @return newly created hashtable or NULL on failure
*/
struct hashtable *
create_hashtable(unsigned int minsize,
unsigned int (*hashfunction) (void*),
- int (*key_eq_fn) (void*,void*));
+ int (*key_eq_fn) (void*,void*), void (*dest) (void*));
/*****************************************************************************
* hashtable_insert
@@ -196,4 +197,6 @@ hashtable_destroy(struct hashtable *h, int free_values);
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
* NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-*/
+ */
+unsigned int hash_from_string(void *k) ;
+int key_equals_string(void *key1, void *key2);
diff --git a/runtime/hashtable/hashtable_itr.c b/runtime/hashtable_itr.c
index 5dced841..967287f1 100644
--- a/runtime/hashtable/hashtable_itr.c
+++ b/runtime/hashtable_itr.c
@@ -38,6 +38,7 @@ hashtable_iterator(struct hashtable *h)
/* key - return the key of the (key,value) pair at the current position */
/* value - return the value of the (key,value) pair at the current position */
+#if 0 /* these are now inline functions! */
void *
hashtable_iterator_key(struct hashtable_itr *i)
{ return i->e->k; }
@@ -45,6 +46,7 @@ hashtable_iterator_key(struct hashtable_itr *i)
void *
hashtable_iterator_value(struct hashtable_itr *i)
{ return i->e->v; }
+#endif
/*****************************************************************************/
/* advance - advance the iterator to the next element
diff --git a/runtime/hashtable/hashtable_itr.h b/runtime/hashtable_itr.h
index eea699a7..1c206b6e 100644
--- a/runtime/hashtable/hashtable_itr.h
+++ b/runtime/hashtable_itr.h
@@ -28,7 +28,7 @@ hashtable_iterator(struct hashtable *h);
/* hashtable_iterator_key
* - return the value of the (key,value) pair at the current position */
-extern inline void *
+static inline void *
hashtable_iterator_key(struct hashtable_itr *i)
{
return i->e->k;
@@ -37,7 +37,7 @@ hashtable_iterator_key(struct hashtable_itr *i)
/*****************************************************************************/
/* value - return the value of the (key,value) pair at the current position */
-extern inline void *
+static inline void *
hashtable_iterator_value(struct hashtable_itr *i)
{
return i->e->v;
diff --git a/runtime/hashtable/hashtable_private.h b/runtime/hashtable_private.h
index 3e95f600..10b82da4 100644
--- a/runtime/hashtable/hashtable_private.h
+++ b/runtime/hashtable_private.h
@@ -21,6 +21,7 @@ struct hashtable {
unsigned int primeindex;
unsigned int (*hashfn) (void *k);
int (*eqfn) (void *k1, void *k2);
+ void (*dest) (void *v); /* destructor for values, if NULL use free() */
};
/*****************************************************************************/
diff --git a/runtime/rsyslog.h b/runtime/rsyslog.h
index 43203378..7ccc9cb8 100644
--- a/runtime/rsyslog.h
+++ b/runtime/rsyslog.h
@@ -458,6 +458,9 @@ enum rsRetVal_ /** return value. All methods return this if not specified oth
RS_RET_INTERNAL_ERROR = -2175, /**< rsyslogd internal error, unexpected code path reached */
RS_RET_ERR_CRE_AFUX = -2176, /**< error creating AF_UNIX socket (and binding it) */
RS_RET_RATE_LIMITED = -2177, /**< some messages discarded due to exceeding a rate limit */
+ RS_RET_ERR_HDFS_WRITE = -2178, /**< error writing to HDFS */
+ RS_RET_ERR_HDFS_OPEN = -2179, /**< error during hdfsOpen (e.g. file does not exist) */
+ RS_RET_FILE_NOT_SPECIFIED = -2180, /**< file name not configured where this was required */
/* RainerScript error messages (range 1000.. 1999) */
RS_RET_SYSVAR_NOT_FOUND = 1001, /**< system variable could not be found (maybe misspelled) */