diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2010-10-05 14:52:23 +0200 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2010-10-05 14:52:23 +0200 |
commit | 1dbb85d7ba2e135593944afdaaa69aeb4fd49302 (patch) | |
tree | 40d81f1a2a6bb211157add77d5bf36acfce78022 | |
parent | 6ea4a38079cb1508c103930e327fa92ada97176f (diff) | |
parent | a627ac99ba2c3404ca926a19fb06cbd6f43b53c8 (diff) | |
download | rsyslog-1dbb85d7ba2e135593944afdaaa69aeb4fd49302.tar.gz rsyslog-1dbb85d7ba2e135593944afdaaa69aeb4fd49302.tar.xz rsyslog-1dbb85d7ba2e135593944afdaaa69aeb4fd49302.zip |
Merge branch 'v5-devel-hdfs' into v5-devel
-rw-r--r-- | ChangeLog | 3 | ||||
-rw-r--r-- | Makefile.am | 4 | ||||
-rw-r--r-- | configure.ac | 19 | ||||
-rw-r--r-- | doc/omhdfs.html | 69 | ||||
-rw-r--r-- | doc/rsyslog_conf_modules.html | 1 | ||||
-rw-r--r-- | plugins/imuxsock/imuxsock.c | 4 | ||||
-rw-r--r-- | plugins/omhdfs/Makefile.am | 6 | ||||
-rw-r--r-- | plugins/omhdfs/omhdfs.c | 475 | ||||
-rw-r--r-- | runtime/Makefile.am | 6 | ||||
-rw-r--r-- | runtime/hashtable.c (renamed from runtime/hashtable/hashtable.c) | 42 | ||||
-rw-r--r-- | runtime/hashtable.h | 7 | ||||
-rw-r--r-- | runtime/hashtable_itr.c (renamed from runtime/hashtable/hashtable_itr.c) | 2 | ||||
-rw-r--r-- | runtime/hashtable_itr.h (renamed from runtime/hashtable/hashtable_itr.h) | 4 | ||||
-rw-r--r-- | runtime/hashtable_private.h (renamed from runtime/hashtable/hashtable_private.h) | 1 | ||||
-rw-r--r-- | runtime/rsyslog.h | 3 |
15 files changed, 636 insertions, 10 deletions
@@ -1,4 +1,7 @@ --------------------------------------------------------------------------- +Version 5.7.2 [V5-DEVEL] (rgerhards), 2010-09-?? +- support for Hadoop's HDFS added (via omhdfs) +--------------------------------------------------------------------------- Version 5.7.1 [V5-DEVEL] (rgerhards), 2010-09-?? - imuxsock now optionally use SCM_CREDENTIALS to pull the pid from the log socket itself diff --git a/Makefile.am b/Makefile.am index f3dca447..54b68153 100644 --- a/Makefile.am +++ b/Makefile.am @@ -147,6 +147,10 @@ if ENABLE_OMUXSOCK SUBDIRS += plugins/omuxsock endif +if ENABLE_OMHDFS +SUBDIRS += plugins/omhdfs +endif + if ENABLE_OMTEMPLATE SUBDIRS += plugins/omtemplate endif diff --git a/configure.ac b/configure.ac index 16454d00..7254b99d 100644 --- a/configure.ac +++ b/configure.ac @@ -1013,6 +1013,23 @@ AM_CONDITIONAL(ENABLE_OMTEMPLATE, test x$enable_omtemplate = xyes) # end of copy template - be sure to search for omtemplate to find everything! +# settings for the omhdfs; +AC_ARG_ENABLE(omhdfs, + [AS_HELP_STRING([--enable-omhdfs],[Compiles omhdfs template module @<:@default=no@:>@])], + [case "${enableval}" in + yes) enable_omhdfs="yes" ;; + no) enable_omhdfs="no" ;; + *) AC_MSG_ERROR(bad value ${enableval} for --enable-omhdfs) ;; + esac], + [enable_omhdfs=no] +) +# +# you may want to do some library checks here - see snmp, mysql, pgsql modules +# for samples +# +AM_CONDITIONAL(ENABLE_OMHDFS, test x$enable_omhdfs = xyes) + + AC_CONFIG_FILES([Makefile \ runtime/Makefile \ tools/Makefile \ @@ -1026,6 +1043,7 @@ AC_CONFIG_FILES([Makefile \ plugins/imklog/Makefile \ plugins/imtemplate/Makefile \ plugins/omtemplate/Makefile \ + plugins/omhdfs/Makefile \ plugins/omprog/Makefile \ plugins/omstdout/Makefile \ plugins/pmrfc3164sd/Makefile \ @@ -1080,6 +1098,7 @@ echo "---{ output plugins }---" echo " Mail support enabled: $enable_mail" echo " omprog module will be compiled: $enable_omprog" echo " omstdout module will be compiled: $enable_omstdout" +echo " omhdfs module will be compiled: $enable_omhdfs" echo " omruleset module will be compiled: $enable_omruleset" echo " omdbalerting module will be compiled: $enable_omdbalerting" echo " omudpspoof module will be compiled: $enable_omudpspoof" diff --git a/doc/omhdfs.html b/doc/omhdfs.html new file mode 100644 index 00000000..3849f167 --- /dev/null +++ b/doc/omhdfs.html @@ -0,0 +1,69 @@ +<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN"> +<html><head><title>rsyslog output module for HDFS (omhdfs)</title> +<a href="features.html">back</a> +</head> +<body> +<h1>Unix sockets Output Module (omhdfs)</h1> +<p><b>Module Name: omhdfs</b></p> +<p><b>Available since: </b> 5.7.2</p> +<p><b>Author: </b>Rainer Gerhards <rgerhards@adiscon.com></p> +<p><b>Description</b>:</p> +<p>This module supports writing message into files on Hadoop's HDFS +file system. +<p><b>Configuration Directives</b>:</p> +<ul> +<li><b>$OMHDFSFileName</b> [name]<br> +The name of the file to which the output data shall be written. +</li> +<li><b>$OMHDFSHost</b> [name]<br> +Name or IP address of the HDFS host to connect to. +</li> +<li><b>$OMHDFSPort</b> [name]<br> +Port on which to connect to the HDFS host. +</li> +<li><b>$OMHDFSDefaultTemplate</b> [name]<br> +Default template to be used when none is specified. This saves the work of +specifying the same template ever and ever again. Of course, the default +template can be overwritten via the usual method. +</li> +</ul> +<b>Caveats/Known Bugs:</b> +<p>Building omhdfs is a challenge because we could not yet find out how +to integrate Java properly into the autotools build process. The issue is +that HDFS is written in Java and libhdfs uses JNI to talk to it. That requires +that various system-specific environment options and pathes be set correctly. At +this point, we leave this to the user. If someone know how to do it better, +please drop us a line! +<ul> +<li>In order to build, you need to set these environment variables BEFORE running +./configure: +<ul> +<li>JAVA_INCLUDES - must have all include pathes that are needed to build +JNI C programms, including the -I options necessary for gcc. An example is<br> +# export JAVA_INCLUDES="-I/usr/lib/jvm/java-1.6.0-openjdk-1.6.0.0.x86_64/include -I/usr/lib/jvm/java-1.6.0-openjdk-1.6.0.0.x86_64/include/linux" +<li>JAVA_LIBS - must have all library pathes that are needed to build +JNI C programms, including the -l/-L options necessary for gcc. An example is<br> +# export export JAVA_LIBS="-L/usr/java/jdk1.6.0_21/jre/lib/amd64 -L/usr/java/jdk1.6.0_21/jre/lib/amd64/server -ljava -ljvm -lverify" +</ul> + +<li>As of HDFS architecture, you must make sure that all relevant environment +variables (the usual Java stuff and HADOOP's home directory) are properly set. +<li>As it looks, libhdfs makes Java throw exceptions to stdout. There is no +known work-around for this (and it usually should not case any troubles. +</ul> +<p><b>Sample:</b></p> +<p> +</p> +<textarea rows="4" cols="80">$ModLoad omhdfs + +$OMHDFSFileName /var/log/logfile +*.* :omhdfs: +</textarea> +[<a href="manual.html">manual index</a>] [<a href="http://www.rsyslog.com/">rsyslog site</a>]</p> +<p><font size="2">This documentation is part of the <a href="http://www.rsyslog.com/">rsyslog</a> +project.<br> +Copyright © 2010 by <a href="http://www.gerhards.net/rainer">Rainer Gerhards</a> and +<a href="http://www.adiscon.com/">Adiscon</a>. +Released under the GNU GPL version 3 or higher.</font></p> + +</body></html> diff --git a/doc/rsyslog_conf_modules.html b/doc/rsyslog_conf_modules.html index 85899954..74aa319c 100644 --- a/doc/rsyslog_conf_modules.html +++ b/doc/rsyslog_conf_modules.html @@ -66,6 +66,7 @@ permits rsyslog to alert folks by mail if something important happens</li> <li><a href="omoracle.html">omoracle</a> - output module for Oracle (native OCI interface)</li> <li><a href="omudpspoof.html">omudpspoof</a> - output module sending UDP syslog messages with a spoofed address</li> <li><a href="omuxsock.html">omuxsock</a> - output module Unix domain sockets</li> +<li><a href="omhdfs.html">omhdfs</a> - output module for Hadoop's HDFS file system</li> </ul> <a name="pm"></a><h2>Parser Modules</h2> diff --git a/plugins/imuxsock/imuxsock.c b/plugins/imuxsock/imuxsock.c index 75a6b00b..41bff4f9 100644 --- a/plugins/imuxsock/imuxsock.c +++ b/plugins/imuxsock/imuxsock.c @@ -279,7 +279,7 @@ addLstnSocketName(void __attribute__((unused)) *pVal, uchar *pNewVal) } CHKiRet(prop.ConstructFinalize(listeners[nfd].hostName)); if(ratelimitInterval > 0) { - if((listeners[nfd].ht = create_hashtable(1000, hash_from_key_fn, key_equals_fn)) == NULL) { + if((listeners[nfd].ht = create_hashtable(1000, hash_from_key_fn, key_equals_fn, NULL)) == NULL) { /* in this case, we simply turn of rate-limiting */ dbgprintf("imuxsock: turning off rate limiting because we could not " "create hash table\n"); @@ -749,7 +749,7 @@ CODESTARTwillRun if(pLogSockName != NULL) listeners[0].sockName = pLogSockName; if(ratelimitIntervalSysSock > 0) { - if((listeners[0].ht = create_hashtable(1000, hash_from_key_fn, key_equals_fn)) == NULL) { + if((listeners[0].ht = create_hashtable(1000, hash_from_key_fn, key_equals_fn, NULL)) == NULL) { /* in this case, we simply turn of rate-limiting */ dbgprintf("imuxsock: turning off rate limiting because we could not " "create hash table\n"); diff --git a/plugins/omhdfs/Makefile.am b/plugins/omhdfs/Makefile.am new file mode 100644 index 00000000..95e6b102 --- /dev/null +++ b/plugins/omhdfs/Makefile.am @@ -0,0 +1,6 @@ +pkglib_LTLIBRARIES = omhdfs.la + +omhdfs_la_SOURCES = omhdfs.c +omhdfs_la_CPPFLAGS = -I$(top_srcdir) $(PTHREADS_CFLAGS) $(RSRT_CFLAGS) $(JAVA_INCLUDES) +omhdfs_la_LDFLAGS = -module -avoid-version -lhdfs $(JAVA_LIBS) +omhdfs_la_LIBADD = $(RSRT_LIBS) diff --git a/plugins/omhdfs/omhdfs.c b/plugins/omhdfs/omhdfs.c new file mode 100644 index 00000000..9705b7fd --- /dev/null +++ b/plugins/omhdfs/omhdfs.c @@ -0,0 +1,475 @@ +/* omhdfs.c + * This is an output module to support Hadoop's HDFS. + * + * NOTE: read comments in module-template.h to understand how this file + * works! + * + * Copyright 2010 Rainer Gerhards and Adiscon GmbH. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + * + * A copy of the GPL can be found in the file "COPYING" in this distribution. + */ + +#include "config.h" +#include "rsyslog.h" +#include <stdio.h> +#include <stdarg.h> +#include <stdlib.h> +#include <string.h> +#include <time.h> +#include <assert.h> +#include <errno.h> +#include <ctype.h> +#include <unistd.h> +#include <sys/file.h> +#include <pthread.h> +#include <hdfs.h> + +#include "syslogd-types.h" +#include "srUtils.h" +#include "template.h" +#include "conf.h" +#include "cfsysline.h" +#include "module-template.h" +#include "unicode-helper.h" +#include "errmsg.h" +#include "hashtable.h" +#include "hashtable_itr.h" + +MODULE_TYPE_OUTPUT + +/* internal structures + */ +DEF_OMOD_STATIC_DATA +DEFobjCurrIf(errmsg) + +/* global data */ +static struct hashtable *files; /* holds all file objects that we know */ + +/* globals for default values */ +static uchar *fileName = NULL; +static uchar *hdfsHost = NULL; +static uchar *dfltTplName = NULL; /* default template name to use */ +int hdfsPort = 0; +/* end globals for default values */ + +typedef struct { + uchar *name; + hdfsFS fs; + hdfsFile fh; + const char *hdfsHost; + tPort hdfsPort; + int nUsers; + pthread_mutex_t mut; +} file_t; + + +typedef struct _instanceData { + file_t *pFile; +} instanceData; + +/* forward definitions (down here, need data types) */ +static inline rsRetVal fileClose(file_t *pFile); + +BEGINisCompatibleWithFeature +CODESTARTisCompatibleWithFeature + if(eFeat == sFEATURERepeatedMsgReduction) + iRet = RS_RET_OK; +ENDisCompatibleWithFeature + + +BEGINdbgPrintInstInfo +CODESTARTdbgPrintInstInfo + printf("omhdfs: file:%s", pData->pFile->name); +ENDdbgPrintInstInfo + + +/* note that hdfsFileExists() does not work, so we did our + * own function to see if a pathname exists. Returns 0 if the + * file does not exists, something else otherwise. Note that + * we can also check a directroy (if that matters...) + */ +static int +HDFSFileExists(hdfsFS fs, uchar *name) +{ + int r; + hdfsFileInfo *info; + + info = hdfsGetPathInfo(fs, (char*)name); + /* if things go wrong, we assume it is because the file + * does not exist. We do not get too much information... + */ + if(info == NULL) { + r = 0; + } else { + r = 1; + hdfsFreeFileInfo(info, 1); + } + return r; +} + +static inline rsRetVal +HDFSmkdir(hdfsFS fs, uchar *name) +{ + DEFiRet; + if(hdfsCreateDirectory(fs, (char*)name) == -1) + ABORT_FINALIZE(RS_RET_ERR); + +finalize_it: + RETiRet; +} + + +/* ---BEGIN FILE OBJECT---------------------------------------------------- */ +/* This code handles the "file object". This is split from the actual + * instance data, because several instances may write into the same file. + * If so, we need to use a single object, and also synchronize their writes. + * So we keep the file object separately, and just stick a reference into + * the instance data. + */ + +static inline rsRetVal +fileObjConstruct(file_t **ppFile) +{ + file_t *pFile; + DEFiRet; + + CHKmalloc(pFile = malloc(sizeof(file_t))); + pFile->name = NULL; + pFile->hdfsHost = NULL; + pFile->fh = NULL; + pFile->nUsers = 0; + + *ppFile = pFile; +finalize_it: + RETiRet; +} + +static inline void +fileObjAddUser(file_t *pFile) +{ + /* init mutex only when second user is added */ + ++pFile->nUsers; + if(pFile->nUsers == 2) + pthread_mutex_init(&pFile->mut, NULL); + DBGPRINTF("omhdfs: file %s now being used by %d actions\n", pFile->name, pFile->nUsers); +} + +static rsRetVal +fileObjDestruct(file_t **ppFile) +{ + file_t *pFile = *ppFile; + if(pFile->nUsers > 1) + pthread_mutex_destroy(&pFile->mut); + fileClose(pFile); + free(pFile->name); + free((char*)pFile->hdfsHost); + free(pFile->fh); + + return RS_RET_OK; +} + + +/* check, and potentially create, all names inside a path */ +static rsRetVal +filePrepare(file_t *pFile) +{ + uchar *p; + uchar *pszWork; + size_t len; + DEFiRet; + + if(HDFSFileExists(pFile->fs, pFile->name)) + FINALIZE; + + /* file does not exist, create it (and eventually parent directories */ + if(1) { // check if bCreateDirs + len = ustrlen(pFile->name) + 1; + CHKmalloc(pszWork = MALLOC(sizeof(uchar) * len)); + memcpy(pszWork, pFile->name, len); + for(p = pszWork+1 ; *p ; p++) + if(*p == '/') { + /* temporarily terminate string, create dir and go on */ + *p = '\0'; + if(!HDFSFileExists(pFile->fs, pszWork)) { + CHKiRet(HDFSmkdir(pFile->fs, pszWork)); + } + *p = '/'; + } + free(pszWork); + return 0; + } + +finalize_it: + RETiRet; +} + + +/* this function is to be used as destructor for the + * hash table code. + */ +static void +fileObjDestruct4Hashtable(void *ptr) +{ + file_t *pFile = (file_t*) ptr; + fileObjDestruct(&pFile); +} + + +static inline rsRetVal +fileOpen(file_t *pFile) +{ + DEFiRet; + + assert(pFile->fh == NULL); + if(pFile->nUsers > 1) + d_pthread_mutex_lock(&pFile->mut); + + DBGPRINTF("omhdfs: try to connect to HDFS at host '%s', port %d\n", + pFile->hdfsHost, pFile->hdfsPort); + pFile->fs = hdfsConnect(pFile->hdfsHost, pFile->hdfsPort); + if(pFile->fs == NULL) { + DBGPRINTF("omhdfs: error can not connect to hdfs\n"); + ABORT_FINALIZE(RS_RET_SUSPENDED); + } + + CHKiRet(filePrepare(pFile)); + + pFile->fh = hdfsOpenFile(pFile->fs, (char*)pFile->name, O_WRONLY|O_APPEND, 0, 0, 0); + if(pFile->fh == NULL) { + /* maybe the file does not exist, so we try to create it now. + * Note that we can not use hdfsExists() because of a deficit in + * it: https://issues.apache.org/jira/browse/HDFS-1154 + * As of my testing, libhdfs at least seems to return ENOENT if + * the file does not exist. + */ + if(errno == ENOENT) { + DBGPRINTF("omhdfs: ENOENT trying to append to '%s', now trying create\n", + pFile->name); + pFile->fh = hdfsOpenFile(pFile->fs, (char*)pFile->name, O_WRONLY|O_CREAT, 0, 0, 0); + } + } + if(pFile->fh == NULL) { + DBGPRINTF("omhdfs: failed to open %s for writing!\n", pFile->name); + ABORT_FINALIZE(RS_RET_SUSPENDED); + } + +finalize_it: + if(pFile->nUsers > 1) + d_pthread_mutex_unlock(&pFile->mut); + RETiRet; +} + + +static inline rsRetVal +fileWrite(file_t *pFile, uchar *buf) +{ + size_t lenWrite; + DEFiRet; + + if(pFile->nUsers > 1) + d_pthread_mutex_lock(&pFile->mut); + + /* open file if not open. This must be done *here* and while mutex-protected + * because of HUP handling (which is async to normal processing!). + */ + if(pFile->fh == NULL) { + fileOpen(pFile); + if(pFile->fh == NULL) { + ABORT_FINALIZE(RS_RET_SUSPENDED); + } + } + + lenWrite = strlen((char*) buf); + tSize num_written_bytes = hdfsWrite(pFile->fs, pFile->fh, buf, lenWrite); + if((unsigned) num_written_bytes != lenWrite) { + errmsg.LogError(errno, RS_RET_ERR_HDFS_WRITE, "omhdfs: failed to write %s, expected %lu bytes, " + "written %lu\n", pFile->name, (unsigned long) lenWrite, + (unsigned long) num_written_bytes); + ABORT_FINALIZE(RS_RET_SUSPENDED); + } + +finalize_it: + if(pFile->nUsers > 1) + d_pthread_mutex_unlock(&pFile->mut); + RETiRet; +} + + +static inline rsRetVal +fileClose(file_t *pFile) +{ + DEFiRet; + + if(pFile->fh == NULL) + FINALIZE; + + if(pFile->nUsers > 1) + d_pthread_mutex_lock(&pFile->mut); + + hdfsCloseFile(pFile->fs, pFile->fh); + pFile->fh = NULL; + + if(pFile->nUsers > 1) + d_pthread_mutex_unlock(&pFile->mut); + +finalize_it: + RETiRet; +} + +/* ---END FILE OBJECT---------------------------------------------------- */ + + +BEGINcreateInstance +CODESTARTcreateInstance + pData->pFile = NULL; +ENDcreateInstance + + +BEGINfreeInstance +CODESTARTfreeInstance + if(pData->pFile != NULL) + fileObjDestruct(&pData->pFile); +ENDfreeInstance + + +BEGINtryResume +CODESTARTtryResume + fileClose(pData->pFile); + fileOpen(pData->pFile); + if(pData->pFile->fh == NULL){ + dbgprintf("omhdfs: tried to resume file %s, but still no luck...\n", + pData->pFile->name); + iRet = RS_RET_SUSPENDED; + } +ENDtryResume + +BEGINdoAction +CODESTARTdoAction + DBGPRINTF("omuxsock: action to to write to %s\n", pData->pFile->name); + iRet = fileWrite(pData->pFile, ppString[0]); +ENDdoAction + + +BEGINparseSelectorAct + file_t *pFile; + int r; + uchar *keybuf; +CODESTARTparseSelectorAct + + /* first check if this config line is actually for us */ + if(strncmp((char*) p, ":omhdfs:", sizeof(":omhdfs:") - 1)) { + ABORT_FINALIZE(RS_RET_CONFLINE_UNPROCESSED); + } + + /* ok, if we reach this point, we have something for us */ + p += sizeof(":omhdfs:") - 1; /* eat indicator sequence (-1 because of '\0'!) */ + CHKiRet(createInstance(&pData)); + CODE_STD_STRING_REQUESTparseSelectorAct(1) + CHKiRet(cflineParseTemplateName(&p, *ppOMSR, 0, 0, + (dfltTplName == NULL) ? (uchar*)"RSYSLOG_FileFormat" : dfltTplName)); + + if(fileName == NULL) { + errmsg.LogError(0, RS_RET_ERR_HDFS_OPEN, "omhdfs: no file name specified, can not continue"); + ABORT_FINALIZE(RS_RET_FILE_NOT_SPECIFIED); + } + + pFile = hashtable_search(files, fileName); + if(pFile == NULL) { + /* we need a new file object, this one not seen before */ + CHKiRet(fileObjConstruct(&pFile)); + CHKmalloc(pFile->name = fileName); + CHKmalloc(keybuf = ustrdup(fileName)); + fileName = NULL; /* re-set, data passed to file object */ + CHKmalloc(pFile->hdfsHost = strdup((hdfsHost == NULL) ? "default" : (char*) hdfsHost)); + pFile->hdfsPort = hdfsPort; + fileOpen(pFile); + if(pFile->fh == NULL){ + errmsg.LogError(0, RS_RET_ERR_HDFS_OPEN, "omhdfs: failed to open %s - " + "retrying later", pFile->name); + iRet = RS_RET_SUSPENDED; + } + r = hashtable_insert(files, keybuf, pFile); + if(r == 0) + ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); + } + fileObjAddUser(pFile); + pData->pFile = pFile; + +CODE_STD_FINALIZERparseSelectorAct +ENDparseSelectorAct + + +BEGINdoHUP + file_t *pFile; + struct hashtable_itr *itr; +CODESTARTdoHUP + DBGPRINTF("omhdfs: HUP received (file count %d)\n", hashtable_count(files)); + /* Iterator constructor only returns a valid iterator if + * the hashtable is not empty */ + itr = hashtable_iterator(files); + if(hashtable_count(files) > 0) + { + do { + pFile = (file_t *) hashtable_iterator_value(itr); + fileClose(pFile); + DBGPRINTF("omhdfs: HUP, closing file %s\n", pFile->name); + } while (hashtable_iterator_advance(itr)); + } +ENDdoHUP + + +/* Reset config variables for this module to default values. + * rgerhards, 2007-07-17 + */ +static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal) +{ + hdfsHost = NULL; + hdfsPort = 0; + return RS_RET_OK; +} + + +BEGINmodExit +CODESTARTmodExit + objRelease(errmsg, CORE_COMPONENT); + if(files != NULL) + hashtable_destroy(files, 1); /* 1 => free all values automatically */ +ENDmodExit + + +BEGINqueryEtryPt +CODESTARTqueryEtryPt +CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_doHUP +ENDqueryEtryPt + + +BEGINmodInit() +CODESTARTmodInit + *ipIFVersProvided = CURR_MOD_IF_VERSION; +CODEmodInit_QueryRegCFSLineHdlr + CHKiRet(objUse(errmsg, CORE_COMPONENT)); + CHKmalloc(files = create_hashtable(20, hash_from_string, key_equals_string, + fileObjDestruct4Hashtable)); + + CHKiRet(regCfSysLineHdlr((uchar *)"omhdfsfilename", 0, eCmdHdlrGetWord, NULL, &fileName, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"omhdfshost", 0, eCmdHdlrGetWord, NULL, &hdfsHost, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"omhdfsport", 0, eCmdHdlrInt, NULL, &hdfsPort, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"omhdfsdefaulttemplate", 0, eCmdHdlrGetWord, NULL, &dfltTplName, NULL)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID)); +CODEmodInit_QueryRegCFSLineHdlr +ENDmodInit diff --git a/runtime/Makefile.am b/runtime/Makefile.am index 5a0c4437..93817e75 100644 --- a/runtime/Makefile.am +++ b/runtime/Makefile.am @@ -95,9 +95,11 @@ librsyslog_la_SOURCES = \ ../parse.c \ ../parse.h \ \ - hashtable/hashtable.c \ + hashtable.c \ hashtable.h \ - hashtable/hashtable_private.h \ + hashtable_itr.c \ + hashtable_itr.h \ + hashtable_private.h \ \ ../outchannel.c \ ../outchannel.h \ diff --git a/runtime/hashtable/hashtable.c b/runtime/hashtable.c index a10e3bc6..41fc60fe 100644 --- a/runtime/hashtable/hashtable.c +++ b/runtime/hashtable.c @@ -29,7 +29,7 @@ const float max_load_factor = 0.65; struct hashtable * create_hashtable(unsigned int minsize, unsigned int (*hashf) (void*), - int (*eqf) (void*,void*)) + int (*eqf) (void*,void*), void (*dest)(void*)) { struct hashtable *h; unsigned int pindex, size = primes[0]; @@ -49,6 +49,7 @@ create_hashtable(unsigned int minsize, h->entrycount = 0; h->hashfn = hashf; h->eqfn = eqf; + h->dest = dest; h->loadlimit = (unsigned int) ceil(size * max_load_factor); return h; } @@ -225,7 +226,16 @@ hashtable_destroy(struct hashtable *h, int free_values) { e = table[i]; while (NULL != e) - { f = e; e = e->next; freekey(f->k); free(f->v); free(f); } + { + f = e; + e = e->next; + freekey(f->k); + if(h->dest == NULL) + free(f->v); + else + h->dest(f->v); + free(f); + } } } else @@ -241,6 +251,34 @@ hashtable_destroy(struct hashtable *h, int free_values) free(h); } +/* some generic hash functions */ + +/* one provided by Aaaron Wiebe based on perl's hashng algorithm + * (so probably pretty generic). Not for excessively large strings! + */ +unsigned int +hash_from_string(void *k) +{ + int len; + char *rkey = (char*) k; + unsigned hashval = 1; + + len = (int) strlen(rkey); + while (len--) + hashval = hashval * 33 + *rkey++; + + return hashval; +} + + +int +key_equals_string(void *key1, void *key2) +{ + /* we must return true IF the keys are equal! */ + return !strcmp(key1, key2); +} + + /* * Copyright (c) 2002, Christopher Clark * All rights reserved. diff --git a/runtime/hashtable.h b/runtime/hashtable.h index b90781ab..f777ad0b 100644 --- a/runtime/hashtable.h +++ b/runtime/hashtable.h @@ -68,13 +68,14 @@ struct hashtable; * @param minsize minimum initial size of hashtable * @param hashfunction function for hashing keys * @param key_eq_fn function for determining key equality + * @param dest destructor for value entries (NULL -> use free()) * @return newly created hashtable or NULL on failure */ struct hashtable * create_hashtable(unsigned int minsize, unsigned int (*hashfunction) (void*), - int (*key_eq_fn) (void*,void*)); + int (*key_eq_fn) (void*,void*), void (*dest) (void*)); /***************************************************************************** * hashtable_insert @@ -196,4 +197,6 @@ hashtable_destroy(struct hashtable *h, int free_values); * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -*/ + */ +unsigned int hash_from_string(void *k) ; +int key_equals_string(void *key1, void *key2); diff --git a/runtime/hashtable/hashtable_itr.c b/runtime/hashtable_itr.c index 5dced841..967287f1 100644 --- a/runtime/hashtable/hashtable_itr.c +++ b/runtime/hashtable_itr.c @@ -38,6 +38,7 @@ hashtable_iterator(struct hashtable *h) /* key - return the key of the (key,value) pair at the current position */ /* value - return the value of the (key,value) pair at the current position */ +#if 0 /* these are now inline functions! */ void * hashtable_iterator_key(struct hashtable_itr *i) { return i->e->k; } @@ -45,6 +46,7 @@ hashtable_iterator_key(struct hashtable_itr *i) void * hashtable_iterator_value(struct hashtable_itr *i) { return i->e->v; } +#endif /*****************************************************************************/ /* advance - advance the iterator to the next element diff --git a/runtime/hashtable/hashtable_itr.h b/runtime/hashtable_itr.h index eea699a7..1c206b6e 100644 --- a/runtime/hashtable/hashtable_itr.h +++ b/runtime/hashtable_itr.h @@ -28,7 +28,7 @@ hashtable_iterator(struct hashtable *h); /* hashtable_iterator_key * - return the value of the (key,value) pair at the current position */ -extern inline void * +static inline void * hashtable_iterator_key(struct hashtable_itr *i) { return i->e->k; @@ -37,7 +37,7 @@ hashtable_iterator_key(struct hashtable_itr *i) /*****************************************************************************/ /* value - return the value of the (key,value) pair at the current position */ -extern inline void * +static inline void * hashtable_iterator_value(struct hashtable_itr *i) { return i->e->v; diff --git a/runtime/hashtable/hashtable_private.h b/runtime/hashtable_private.h index 3e95f600..10b82da4 100644 --- a/runtime/hashtable/hashtable_private.h +++ b/runtime/hashtable_private.h @@ -21,6 +21,7 @@ struct hashtable { unsigned int primeindex; unsigned int (*hashfn) (void *k); int (*eqfn) (void *k1, void *k2); + void (*dest) (void *v); /* destructor for values, if NULL use free() */ }; /*****************************************************************************/ diff --git a/runtime/rsyslog.h b/runtime/rsyslog.h index 43203378..7ccc9cb8 100644 --- a/runtime/rsyslog.h +++ b/runtime/rsyslog.h @@ -458,6 +458,9 @@ enum rsRetVal_ /** return value. All methods return this if not specified oth RS_RET_INTERNAL_ERROR = -2175, /**< rsyslogd internal error, unexpected code path reached */ RS_RET_ERR_CRE_AFUX = -2176, /**< error creating AF_UNIX socket (and binding it) */ RS_RET_RATE_LIMITED = -2177, /**< some messages discarded due to exceeding a rate limit */ + RS_RET_ERR_HDFS_WRITE = -2178, /**< error writing to HDFS */ + RS_RET_ERR_HDFS_OPEN = -2179, /**< error during hdfsOpen (e.g. file does not exist) */ + RS_RET_FILE_NOT_SPECIFIED = -2180, /**< file name not configured where this was required */ /* RainerScript error messages (range 1000.. 1999) */ RS_RET_SYSVAR_NOT_FOUND = 1001, /**< system variable could not be found (maybe misspelled) */ |