From c5461b548d303e6e66e20048544814338b46efb5 Mon Sep 17 00:00:00 2001 From: Dmitri Pal Date: Tue, 25 Aug 2009 16:54:01 -0400 Subject: ELAPI sinks and providers This patch drills down to the next level of ELAPI functionality. I adds the creation and loading of the sinks. It also implements a skeleton for the first low level provider which will be capable of writing to a file. The configuration ini file is extended to define new configuration parameters and their meanings. --- common/elapi/Makefile.am | 26 +- common/elapi/elapi_internal.c | 247 ++++++++------ common/elapi/elapi_priv.h | 72 +++- common/elapi/elapi_sink.c | 500 ++++++++++++++++++++++++++++ common/elapi/elapi_sink.h | 39 +-- common/elapi/elapi_test/Makefile.am | 14 +- common/elapi/elapi_test/elapi_ut.conf | 27 +- common/elapi/providers/file/file_provider.c | 155 +++++++++ common/elapi/providers/file/file_provider.h | 79 +++++ 9 files changed, 1016 insertions(+), 143 deletions(-) create mode 100644 common/elapi/elapi_sink.c create mode 100644 common/elapi/providers/file/file_provider.c create mode 100644 common/elapi/providers/file/file_provider.h (limited to 'common') diff --git a/common/elapi/Makefile.am b/common/elapi/Makefile.am index 2ebf9e833..1fdc9c69e 100644 --- a/common/elapi/Makefile.am +++ b/common/elapi/Makefile.am @@ -7,11 +7,13 @@ APP_NAME_SIZE=@appnamesize@ SUBDIRS = elapi_test topdir=$(srcdir)/.. +prvdrdir=$(srcdir)/providers -AM_CFLAGS = -DELAPI_DEFAULT_CONFIG_DIR=\"$(DEFAULT_CONF_DIR)\" \ - -DELAPI_DEFAULT_CONFIG_APP_DIR=\"$(DEFAULT_CONF_APP_DIR)\" \ - -DELAPI_DEFAULT_APP_NAME=\"$(APP_NAME)\" \ - -DELAPI_DEFAULT_APP_NAME_SIZE=$(APP_NAME_SIZE) +AM_CFLAGS = \ + -DELAPI_DEFAULT_CONFIG_DIR=\"$(DEFAULT_CONF_DIR)\" \ + -DELAPI_DEFAULT_CONFIG_APP_DIR=\"$(DEFAULT_CONF_APP_DIR)\" \ + -DELAPI_DEFAULT_APP_NAME=\"$(APP_NAME)\" \ + -DELAPI_DEFAULT_APP_NAME_SIZE=$(APP_NAME_SIZE) if HAVE_GCC AM_CFLAGS += \ @@ -19,7 +21,8 @@ if HAVE_GCC -Wcast-align -Wwrite-strings endif -AM_CPPFLAGS = -I$(topdir) -I$(topdir)/ini -I$(topdir)/trace -I$(topdir)/collection $(TRACE_LEVEL) +AM_CPPFLAGS = -I$(topdir) -I$(topdir)/ini -I$(topdir)/trace -I$(topdir)/collection \ + -I$(topdir)/elapi/providers/file $(TRACE_LEVEL) ACLOCAL_AMFLAGS = -I m4 @@ -27,15 +30,22 @@ ACLOCAL_AMFLAGS = -I m4 pkgconfigdir = $(libdir)/pkgconfig dist_noinst_DATA = elapi.pc -# Build library -noinst_LTLIBRARIES = libelapi.la +# Build libraries +noinst_LTLIBRARIES = libprovider.la libelapi.la + +libprovider_la_SOURCES = \ + $(prvdrdir)/file/file_provider.c \ + $(prvdrdir)/file/file_provider.h + libelapi_la_SOURCES = \ elapi_event.c \ elapi_log.c \ elapi_internal.c \ elapi_event.h \ + elapi_sink.c \ elapi_priv.h \ elapi_sink.h \ elapi_log.h \ elapi_async.h \ - elapi.h + elapi.h \ + ./libprovider.la diff --git a/common/elapi/elapi_internal.c b/common/elapi/elapi_internal.c index 3007fc2e5..8b1071e8d 100644 --- a/common/elapi/elapi_internal.c +++ b/common/elapi/elapi_internal.c @@ -105,60 +105,19 @@ int elapi_tgt_free_cb(const char *target, return EOK; } - - -int elapi_sink_cb(const char *sink, - int sink_len, - int type, - void *data, - int length, - void *passed_data, - int *stop) -{ - TRACE_FLOW_STRING("elapi_sink_cb", "Entry."); - - /* FIXME THIS IS A PLACEHOLDER FUNCTION FOR NOW */ - - /* Skip header */ - if (type == COL_TYPE_COLLECTION) { - TRACE_FLOW_STRING("elapi_sink_cb - skip header", "Exit."); - return EOK; - } - - printf("Sink: %s\n", sink); - - TRACE_FLOW_STRING("elapi_sink_cb", "Exit."); - return EOK; -} - -/* Internal sink cleanup function */ -int elapi_sink_free_cb(const char *sink, - int sink_len, - int type, - void *data, - int length, - void *passed_data, - int *stop) -{ - TRACE_FLOW_STRING("elapi_sink_free_cb", "Entry."); - - /* FIXME THIS IS A PLACEHOLDER FUNCTION FOR NOW */ - - printf("Cleaning Sink: %s\n", sink); - - TRACE_FLOW_STRING("elapi_sink_free_cb", "Exit."); - return EOK; -} - /* Function to add a sink to the collection */ +/* This function belongs to this module. + * It adds sink into the collection + * of sinks inside dispatcher and puts + * reference into the target's reference list. + */ /* FIXME - other arguments might be added later */ int elapi_sink_add(struct collection_item **sink_ref, char *sink, struct elapi_dispatcher *handle) { int error = EOK; - struct elapi_sink_ctx sink_context; - struct collection_item *provider_cfg_item = NULL; + struct elapi_sink_ctx *sink_context = NULL; TRACE_FLOW_STRING("elapi_sink_add", "Entry"); @@ -183,42 +142,36 @@ int elapi_sink_add(struct collection_item **sink_ref, if (!(*sink_ref)) { TRACE_FLOW_STRING("No such sink yet, adding new sink:", sink); - /* First check if this sink is properly configured and get its provider */ - error = get_config_item(sink, - ELAPI_SINK_PROVIDER, - handle->ini_config, - &provider_cfg_item); - if (error) { - TRACE_ERROR_NUMBER("Attempt to read provider attribute returned error", error); + /* Create a sink object */ + error = elapi_sink_create(&sink_context, sink, handle->ini_config); + if (error != 0) { + TRACE_ERROR_NUMBER("Failed to add sink data as property", error); + /* If create failed there is nothing to destroy */ return error; } - /* Do we have provider? */ - if (provider_cfg_item == NULL) { - /* There is no provider - return error */ - TRACE_ERROR_STRING("Required key is missing in the configuration.", "Fatal Error!"); - return ENOENT; + /* If there was an internal error but sink is optional + * no error is returned but context is NULL. + * We need to check for this situation. + */ + if (sink_context) { + TRACE_FLOW_STRING("Loaded sink:", sink); + /* We got a valid sink so add it to the collection */ + error = col_add_binary_property_with_ref(handle->sink_list, + NULL, + sink, + (void *)(&sink_context), + sizeof(struct elapi_sink_ctx *), + sink_ref); + if (error != 0) { + TRACE_ERROR_NUMBER("Failed to add sink data as property", error); + elapi_sink_destroy(sink_context); + return error; + } } - - - /* FIXME: PLACEHOLDER - * This is the area where the actual sink is loaded. - * CODE WILL BE ADDED HERE... - */ - sink_context.async_mode = 0; - sink_context.in_queue = NULL; - sink_context.pending = NULL; - - /* We got a valid sink so add it to the collection */ - error = col_add_binary_property_with_ref(handle->sink_list, - NULL, - sink, - (void *)(&sink_context), - sizeof(struct elapi_sink_ctx), - sink_ref); - if (error != 0) { - TRACE_ERROR_NUMBER("Failed to add sink data as property", error); - return error; + else { + *sink_ref = NULL; + TRACE_FLOW_STRING("Setting sink reference to NULL", ""); } } @@ -258,6 +211,7 @@ int elapi_tgt_create(struct elapi_tgt_ctx **context, char **sinks; char **current_sink; struct collection_item *sink_ref; + unsigned count; TRACE_FLOW_STRING("elapi_tgt_create", "Entry."); @@ -281,7 +235,7 @@ int elapi_tgt_create(struct elapi_tgt_ctx **context, /* Allocate context */ target_context = (struct elapi_tgt_ctx *)malloc(sizeof(struct elapi_tgt_ctx)); if (target_context == NULL) { - TRACE_ERROR_NUMBER("Memory allocation failed. Error", target_context); + TRACE_ERROR_NUMBER("Memory allocation failed. Error", ENOMEM); return ENOMEM; } @@ -362,22 +316,45 @@ int elapi_tgt_create(struct elapi_tgt_ctx **context, return error; } - /* Add reference to it into the target object */ - error = col_add_binary_property(target_context->sink_ref_list, NULL, - *current_sink, (void *)(&sink_ref), - sizeof(struct collection_item *)); - if (error != 0) { - TRACE_ERROR_NUMBER("Failed to add sink reference", error); - elapi_tgt_destroy(target_context); - free_string_config_array(sinks); - return error; + /* It might be that is was an error wit the optional sink so + * we need to check if the reference is not NULL; + */ + if (sink_ref) { + /* Add reference to it into the target object */ + error = col_add_binary_property(target_context->sink_ref_list, NULL, + *current_sink, (void *)(&sink_ref), + sizeof(struct collection_item *)); + if (error != 0) { + TRACE_ERROR_NUMBER("Failed to add sink reference", error); + elapi_tgt_destroy(target_context); + free_string_config_array(sinks); + return error; + } + } + else { + TRACE_INFO_STRING("Sink reference is NULL.", "Skipping the sink"); } - current_sink++; } free_string_config_array(sinks); + /* Get count of the references in the list */ + error = col_get_collection_count(target_context->sink_ref_list, &count); + if (error) { + TRACE_ERROR_NUMBER("Failed to get count", error); + elapi_tgt_destroy(target_context); + return error; + } + + /* Check count */ + if (count <= 1) { + /* Nothing but header? - Bad! */ + TRACE_ERROR_NUMBER("No sinks loaded for target!", "This is a fatal error!"); + elapi_tgt_destroy(target_context); + return ENOENT; + } + *context = target_context; TRACE_FLOW_STRING("elapi_tgt_create", "Exit."); @@ -503,6 +480,54 @@ void elapi_dump_ini_err(struct collection_item *error_list) TRACE_FLOW_STRING("elapi_dump_ini_err", "Exit"); } +/****************************************************************************/ +/* Functions below are added for debugging purposes */ +/****************************************************************************/ +#ifdef ELAPI_UTEST + +void elapi_print_sink_ctx(struct elapi_sink_ctx *sink_context) +{ + /* This will not print well on 64 bit but it is just debugging + * so it is OK to have it. + */ + printf("Printing sink context using address %X\n", (uint32_t)(sink_context)); + + printf("Mode: %s\n", sink_context->async_mode ? "true" : "false"); + if (sink_context->in_queue) col_print_collection(sink_context->in_queue); + else printf("Queue is not initialized.\n"); + + if (sink_context->pending) col_print_collection(sink_context->pending); + else printf("Pending list is not initialized.\n"); + + if (sink_context->sink_cfg.provider) printf("Provider: %s\n", + sink_context->sink_cfg.provider); + else printf("Provider is not defined.\n"); + + printf("Is provider required? %s\n", ((sink_context->sink_cfg.required > 0) ? "Yes" : "No")); + printf("On error: %s\n", ((sink_context->sink_cfg.onerror == 0) ? "retry" : "fail")); + printf("Timout: %d\n", sink_context->sink_cfg.timeout); + printf("Sync configuration: %s\n", sink_context->sink_cfg.synch ? "true" : "false"); + + if (sink_context->sink_cfg.priv_ctx) printf("Private context allocated.\n"); + else printf("Private context is NULL.\n"); + + if (sink_context->sink_cfg.libhandle) printf("Lib handle is allocated.\n"); + else printf("Lib handle is NULL.\n"); + + if (sink_context->sink_cfg.ability) printf("Capability function is present\n"); + else printf("NO capability function.\n"); + + if (sink_context->sink_cfg.cpb_cb.init_cb) printf("Init callback is OK.\n"); + else printf("Init callback is missing.\n"); + + if (sink_context->sink_cfg.cpb_cb.submit_cb) printf("Submit callback is OK.\n"); + else printf("Submit callback is missing.\n"); + + if (sink_context->sink_cfg.cpb_cb.close_cb) printf("Close callback is OK.\n"); + else printf("Close callback is missing.\n"); + + +} /* Handler for printing target internals */ static int elapi_sink_ref_dbg_cb(const char *sink, @@ -525,19 +550,38 @@ static int elapi_sink_ref_dbg_cb(const char *sink, printf("\nReferenced sink name is: %s\n", col_get_item_property(sink_item, NULL)); - sink_context = (struct elapi_sink_ctx *)(col_get_item_data(sink_item)); + sink_context = *((struct elapi_sink_ctx **)(col_get_item_data(sink_item))); - printf("Mode: %s\n", sink_context->async_mode ? "true" : "false"); - if (sink_context->in_queue) col_print_collection(sink_context->in_queue); - else printf("Queue is not initialized.\n"); + elapi_print_sink_ctx(sink_context); - if (sink_context->pending) col_print_collection(sink_context->pending); - else printf("Pending list is not initialized.\n"); return EOK; } +/* Handler for printing sink internals */ +static int elapi_sink_dbg_cb(const char *sink, + int sink_len, + int type, + void *data, + int length, + void *passed_data, + int *stop) +{ + struct elapi_sink_ctx *sink_context; + /* Skip header */ + if (type == COL_TYPE_COLLECTION) { + return EOK; + } + + sink_context = *((struct elapi_sink_ctx **)(data)); + + printf("\nSink name is: %s\n", sink); + + elapi_print_sink_ctx(sink_context); + + return EOK; +} /* Handler for printing target internals */ static int elapi_tgt_dbg_cb(const char *target, @@ -569,6 +613,7 @@ static int elapi_tgt_dbg_cb(const char *target, } + /* Internal function to print dispatcher internals - useful for testing */ void elapi_print_dispatcher(struct elapi_dispatcher *handle) { @@ -604,7 +649,17 @@ void elapi_print_dispatcher(struct elapi_dispatcher *handle) elapi_tgt_dbg_cb, NULL); } + printf("\n\nDeep sink inspection:\n\n"); + if (handle->sink_list) { + (void)col_traverse_collection(handle->sink_list, + COL_TRAVERSE_ONELEVEL, + elapi_sink_dbg_cb, + NULL); + } /* FIXME: Async data... */ printf("DISPATCHER END\n\n"); + fflush(stdout); } + +#endif diff --git a/common/elapi/elapi_priv.h b/common/elapi/elapi_priv.h index eb1d11f8b..4b55f9648 100644 --- a/common/elapi/elapi_priv.h +++ b/common/elapi/elapi_priv.h @@ -24,6 +24,8 @@ #include "collection.h" #include "elapi_async.h" +#include "elapi_sink.h" + /* Classes of the collections used by ELAPI internally */ #define COL_CLASS_ELAPI_BASE 21000 #define COL_CLASS_ELAPI_EVENT COL_CLASS_ELAPI_BASE + 0 @@ -45,6 +47,28 @@ #define ELAPI_SINK_REFS "srefs" #define ELAPI_TARGET_VALUE "value" #define ELAPI_SINK_PROVIDER "provider" +#define ELAPI_SINK_REQUIRED "required" +#define ELAPI_SINK_ONERROR "onerror" +#define ELAPI_SINK_TIMEOUT "timeout" +#define ELAPI_SINK_SYNCH "synch" + +/* Default timout before dispatcher tries to revive sink. + * The actual value is configurable on per sink basis + * so I do not see a value in making this a compile time + * option (at least at the moment). + */ +#define ELAPI_SINK_DEFAULT_TIMEOUT 60 + +/* Names of embedded providers */ +#define ELAPI_EMB_PRVDR_FILE "file" +#define ELAPI_EMB_PRVDR_STDERR "stderr" +#define ELAPI_EMB_PRVDR_SYSLOG "syslog" + +/* Numbers for embedded providers */ +#define ELAPI_EMB_PRVDR_FILENUM 0 +#define ELAPI_EMB_PRVDR_STDERRNUM 1 +#define ELAPI_EMB_PRVDR_SYSLOGNUM 2 + #define ELAPI_TARGET_ALL 0xFFFF /* 65k targets should be enough */ @@ -96,21 +120,47 @@ struct elapi_tgt_ctx { */ }; +/* FIXME: Compbine with context */ +struct sink_status { + int suspended; + time_t lasttry; +}; + +/* Common configuration items for all sinks */ +struct elapi_sink_cfg { + char *provider; + uint32_t required; + uint32_t onerror; + uint32_t timeout; + uint32_t synch; + void *priv_ctx; + void *libhandle; + sink_cpb_fn ability; + struct sink_cpb cpb_cb; +}; + /* The structure that describes the sink in the dispatcher */ struct elapi_sink_ctx { - /* Inpit queue of a sink */ + /* Input queue of a sink */ struct collection_item *in_queue; /* Pending list */ struct collection_item *pending; /* FIXME: add: * sink's error status - * sink's common config data (common between all sinks) - * sink personal specific config data (config data specific to this sink) */ - /* Is this a sink or async sink */ uint32_t async_mode; + /* Sink configuration data */ + struct elapi_sink_cfg sink_cfg; +}; + +/* Generic data structure for the data output */ +struct elapi_data_out { + char *buffer; + size_t size; + size_t written; }; + /* The structure to hold a command and a result of the command execution */ struct elapi_get_sink { int action; @@ -153,6 +203,14 @@ int elapi_sink_add(struct collection_item **sink_ref, char *sink, struct elapi_dispatcher *handle); +/* Function to create a sink */ +int elapi_sink_create(struct elapi_sink_ctx **sink_ctx, + char *name, + struct collection_item *ini_config); + +/* Destroy sink */ +void elapi_sink_destroy(struct elapi_sink_ctx *context); + /* Create target object */ int elapi_tgt_create(struct elapi_tgt_ctx **context, char *target, @@ -185,8 +243,12 @@ int elapi_tgt_mklist(struct elapi_dispatcher *handle); /* Send ELAPI config errors into a file */ void elapi_dump_ini_err(struct collection_item *error_list); -/* Print dispatcher internals for testing and debugin purposes */ +#ifdef ELAPI_UTEST +/* Print dispatcher internals for testing and debugging purposes */ void elapi_print_dispatcher(struct elapi_dispatcher *handle); +/* Print sink context details */ +void elapi_print_sink_ctx(struct elapi_sink_ctx *sink_context); +#endif #endif diff --git a/common/elapi/elapi_sink.c b/common/elapi/elapi_sink.c new file mode 100644 index 000000000..3908c3adf --- /dev/null +++ b/common/elapi/elapi_sink.c @@ -0,0 +1,500 @@ +/* + ELAPI + + Module that contains functions that manipulate ELAPI sinks. + + Copyright (C) Dmitri Pal 2009 + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + You should have received a copy of the GNU General Public License + along with this program. If not, see . +*/ + +#define _GNU_SOURCE +#include /* for stat() */ +#include /* for errors */ +#include /* for memset() and other */ +#include /* for free() */ +#include /* for va_arg */ +#include /* for dlopen() */ + +#include "elapi_priv.h" +#include "ini_config.h" +#include "file_provider.h" +#include "trace.h" +#include "config.h" + +/* NOTE: Add new provider names here */ +const char *providers[] = { ELAPI_EMB_PRVDR_FILE, + ELAPI_EMB_PRVDR_STDERR, + ELAPI_EMB_PRVDR_SYSLOG, + NULL }; + + +/* This is a traverse callback for sink list */ +int elapi_sink_cb(const char *sink, + int sink_len, + int type, + void *data, + int length, + void *passed_data, + int *stop) +{ + TRACE_FLOW_STRING("elapi_sink_cb", "Entry."); + + /* FIXME THIS IS A PLACEHOLDER FUNCTION FOR NOW */ + + /* Skip header */ + if (type == COL_TYPE_COLLECTION) { + TRACE_FLOW_STRING("elapi_sink_cb - skip header", "Exit."); + return EOK; + } + + printf("Sink: %s\n", sink); + + TRACE_FLOW_STRING("elapi_sink_cb", "Exit."); + return EOK; +} + +/* Destroy sink */ +void elapi_sink_destroy(struct elapi_sink_ctx *context) +{ + TRACE_FLOW_STRING("elapi_sink_destroy", "Entry."); + +#ifdef ELAPI_UTEST + /* FIXME: Can be removeed when the interface is stable */ + /* For testing purposes print the context we are trying to free */ + elapi_print_sink_ctx(context); +#endif + + if (context) { + TRACE_INFO_STRING("Context is not null.", "Destroying sink."); + /* FIXME: Do something about pending data if any */ + /* Assume for now that we do not care about pending data */ + + /* If the private data has been allocated and close callback is there + * call a callback to clean the data and free it. + */ + if (context->sink_cfg.priv_ctx) { + TRACE_INFO_STRING("Calling provider's close function.", ""); + /* Call close function of the provider */ + context->sink_cfg.cpb_cb.close_cb(&(context->sink_cfg.priv_ctx)); + } + + /* Now if the handle of the provider is set, offload the library instance */ + if (context->sink_cfg.libhandle) { + TRACE_INFO_STRING("Offloading shared library.", ""); + dlclose(context->sink_cfg.libhandle); + context->sink_cfg.libhandle = NULL; + } + + if (context->sink_cfg.provider) { + TRACE_INFO_STRING("Cleaning provider.", ""); + free(context->sink_cfg.provider); + context->sink_cfg.provider = NULL; + } + + TRACE_INFO_STRING("Freeing context", ""); + free(context); + } + + TRACE_FLOW_STRING("elapi_sink_destroy", "Exit."); +} + +/* Internal sink cleanup function */ +int elapi_sink_free_cb(const char *sink, + int sink_len, + int type, + void *data, + int length, + void *passed_data, + int *stop) +{ + TRACE_FLOW_STRING("elapi_sink_free_cb", "Entry."); + + /* Skip header */ + if (type == COL_TYPE_COLLECTION) { + TRACE_FLOW_STRING("elapi_sink_free_cb - skip header", "Exit."); + return EOK; + } + + TRACE_INFO_STRING("Cleaning Sink:", sink); + + elapi_sink_destroy(*((struct elapi_sink_ctx **)(data))); + + TRACE_FLOW_STRING("elapi_sink_free_cb", "Exit."); + return EOK; +} + +/* Function to read sink common configuration */ +static int elapi_read_sink_cfg(struct elapi_sink_cfg *sink_cfg, + char *name, + struct collection_item *ini_config) +{ + int error = EOK; + struct collection_item *cfg_item = NULL; + const char *provider; + + TRACE_FLOW_STRING("elapi_read_sink_cfg", "Entry point"); + + /*********** Provider *************/ + + /* First check if this sink is properly configured and get its provider */ + error = get_config_item(name, + ELAPI_SINK_PROVIDER, + ini_config, + &cfg_item); + if (error) { + TRACE_ERROR_NUMBER("Attempt to read \"provider\" attribute returned error", error); + return error; + } + + /* Do we have provider? */ + if (cfg_item == NULL) { + /* There is no provider - return error */ + TRACE_ERROR_STRING("Required key is missing in the configuration.", "Fatal Error!"); + return ENOENT; + } + + /* Get provider value */ + provider = get_const_string_config_value(cfg_item, &error); + if ((error) || (!provider)) { + TRACE_ERROR_STRING("Invalid \"provider\" value", "Fatal Error!"); + return EINVAL; + } + + /* Save provider inside configuration data */ + sink_cfg->provider = strdup(provider); + if (sink_cfg->provider == NULL) { + /* Failed to save the provider value */ + TRACE_ERROR_STRING("Failed to save \"provider\" value.", "Fatal Error!"); + return ENOMEM; + } + + /*********** Required *************/ + /* Next is "required" field */ + cfg_item = NULL; + error = get_config_item(name, + ELAPI_SINK_REQUIRED, + ini_config, + &cfg_item); + if (error) { + TRACE_ERROR_NUMBER("Attempt to read \"required\" attribute returned error", error); + return error; + } + + /* Do we have "required"? */ + if (cfg_item == NULL) { + /* There is no attribute - assume default */ + TRACE_INFO_STRING("No \"required\" attribute.", "Assume optional"); + sink_cfg->required = 0; + } + else { + sink_cfg->required = (uint32_t) get_bool_config_value(cfg_item, '\0', &error); + if (error) { + TRACE_ERROR_STRING("Invalid \"required\" value", "Fatal Error!"); + return EINVAL; + } + } + + /*********** Onerror *************/ + /* Next is "onerror" field */ + cfg_item = NULL; + error = get_config_item(name, + ELAPI_SINK_ONERROR, + ini_config, + &cfg_item); + if (error) { + TRACE_ERROR_NUMBER("Attempt to read \"onerror\" attribute returned error", error); + return error; + } + + /* Do we have "required"? */ + if (cfg_item == NULL) { + /* There is no attribute - assume default */ + TRACE_INFO_STRING("No \"onerror\" attribute.", "Assume retry (0)"); + sink_cfg->onerror = 0; + } + else { + sink_cfg->onerror = (uint32_t) get_unsigned_config_value(cfg_item, 1, 0, &error); + if (error) { + TRACE_ERROR_STRING("Invalid \"onerror\" value", "Fatal Error!"); + return EINVAL; + } + } + + /*********** Timeout *************/ + /* Next is "timeout" field */ + cfg_item = NULL; + error = get_config_item(name, + ELAPI_SINK_TIMEOUT, + ini_config, + &cfg_item); + if (error) { + TRACE_ERROR_NUMBER("Attempt to read \"timeout\" attribute returned error", error); + return error; + } + + /* Do we have "required"? */ + if (cfg_item == NULL) { + /* There is no attribute - assume default */ + TRACE_INFO_STRING("No \"timeout\" attribute.", "Assume default timeout"); + sink_cfg->timeout = ELAPI_SINK_DEFAULT_TIMEOUT; + } + else { + sink_cfg->timeout = (uint32_t) get_unsigned_config_value(cfg_item, + 1, + ELAPI_SINK_DEFAULT_TIMEOUT, + &error); + if (error) { + TRACE_ERROR_STRING("Invalid \"timeout\" value", "Fatal Error!"); + return EINVAL; + } + } + + /*********** Synch *************/ + /* Next is "synch" field */ + cfg_item = NULL; + error = get_config_item(name, + ELAPI_SINK_SYNCH, + ini_config, + &cfg_item); + if (error) { + TRACE_ERROR_NUMBER("Attempt to read \"synch\" attribute returned error", error); + return error; + } + + /* Do we have "required"? */ + if (cfg_item == NULL) { + /* There is no attribute - assume default */ + TRACE_INFO_STRING("No \"synch\" attribute.", "Assume retry (0)"); + sink_cfg->synch = 0; + } + else { + sink_cfg->synch = (uint32_t) get_bool_config_value(cfg_item, '\0', &error); + if (error) { + TRACE_ERROR_STRING("Invalid \"synch\" value", "Fatal Error!"); + return EINVAL; + } + } + + TRACE_FLOW_STRING("elapi_read_sink_cfg", "Exit"); + return error; +} + +/* Function to load external sink library */ +static int elapi_load_lib(void **libhandle, sink_cpb_fn *sink_fn, char *name) +{ + char sink_lib_name[SINK_LIB_NAME_SIZE]; + sink_cpb_fn sink_symbol = NULL; + void *handle = NULL; + char *lib_error = NULL; + + TRACE_FLOW_STRING("elapi_load_lib", "Entry point"); + + if ((strlen(name) + sizeof(SINK_NAME_TEMPLATE)) >= SINK_LIB_NAME_SIZE) { + TRACE_ERROR_STRING("Provider string is too long:", name); + return EINVAL; + } + + sprintf(sink_lib_name, SINK_NAME_TEMPLATE, name); + TRACE_INFO_STRING("Name of the library to try to load:", sink_lib_name); + + /* Load library */ + handle = dlopen(sink_lib_name, RTLD_LAZY); + if (!handle) { + TRACE_ERROR_STRING("Dlopen returned error", dlerror()); + return ELIBACC; + } + + /* Clear any existing error */ + dlerror(); + /* Get addres to the main entry point */ + sink_symbol = (sink_cpb_fn)(dlsym(handle, SINK_ENTRY_POINT)); + if ((lib_error = dlerror()) != NULL) { + TRACE_ERROR_STRING("Dlsym returned error", lib_error); + dlclose(handle); + return ELIBACC; + } + + *libhandle = handle; + *sink_fn = sink_symbol; + + TRACE_FLOW_STRING("elapi_load_lib", "Exit"); + return EOK; +} + +/* Function to load sink provider */ +int elapi_sink_loader(struct elapi_sink_cfg *sink_cfg) +{ + int error = EOK; + int num = 0; + + TRACE_FLOW_STRING("elapi_sink_loader", "Entry point"); + + while (providers[num]) { + TRACE_INFO_STRING("Checking provider:", providers[num]); + if (strcasecmp(providers[num], sink_cfg->provider) == 0) break; + num++; + } + + TRACE_INFO_NUMBER("Provider number:", num); + + /* NOTE: Add provider handler into the switch below */ + switch (num) { + case ELAPI_EMB_PRVDR_FILENUM: + TRACE_INFO_STRING("Using \"file\" provider:", ""); + sink_cfg->ability = file_ability; + break; +/* FIXME: Not implemented yet + case ELAPI_EMB_PRVDR_STDERRNUM: + TRACE_INFO_STRING("Using \"stderr\" provider:", ""); + sink_cfg->ability = stderr_ability; + break; + case ELAPI_EMB_PRVDR_SYSLOGNUM: + TRACE_INFO_STRING("Using \"syslog\" provider:", ""); + sink_cfg->ability = syslog_ability; + break; +*/ + default: + /* It is an external provider */ + error = elapi_load_lib(&(sink_cfg->libhandle), &(sink_cfg->ability), sink_cfg->provider); + if (error) { + TRACE_ERROR_NUMBER("Failed to load library", error); + return error; + } + break; + } + + TRACE_FLOW_STRING("elapi_sink_loader", "Exit"); + return error; +} + + +/* Function to load sink provider */ +int elapi_load_sink(struct elapi_sink_cfg *sink_cfg, + char *name, + struct collection_item *ini_config) +{ + int error = EOK; + TRACE_FLOW_STRING("elapi_load_sink", "Entry point"); + + /* Use sink loading wrapper */ + error = elapi_sink_loader(sink_cfg); + if (error) { + TRACE_ERROR_NUMBER("Failed to load sink", error); + return error; + } + + /* Call ability function to fill in the pointers */ + sink_cfg->ability(&(sink_cfg->cpb_cb)); + + /* Make sure the callbacks are initialized */ + if ((sink_cfg->cpb_cb.init_cb == NULL) || + (sink_cfg->cpb_cb.submit_cb == NULL) || + (sink_cfg->cpb_cb.close_cb == NULL)) { + TRACE_ERROR_NUMBER("One of the callbacks is missing", + "Bad provider!"); + return EINVAL; + } + + + /* Call init entry point */ + /* NOTE: it is the responsibility of the provider + * to enforce singleton in case provider can't + * be loaded more than once like syslog for example. + */ + error = sink_cfg->cpb_cb.init_cb(&(sink_cfg->priv_ctx), + name, + ini_config); + if (error) { + TRACE_ERROR_NUMBER("Failed to initalize sink", error); + return error; + } + + TRACE_FLOW_STRING("elapi_load_sink", "Exit"); + return error; + +} + +/* Function to create a sink */ +int elapi_sink_create(struct elapi_sink_ctx **sink_ctx, + char *name, + struct collection_item *ini_config) +{ + int error = EOK; + uint32_t required; + struct elapi_sink_ctx *sink_context = NULL; + + TRACE_FLOW_STRING("elapi_sink_create", "Entry point"); + + /* Allocate context */ + sink_context = (struct elapi_sink_ctx *)malloc(sizeof(struct elapi_sink_ctx)); + if (sink_context == NULL) { + TRACE_ERROR_NUMBER("Memory allocation failed. Error", ENOMEM); + return ENOMEM; + } + + /* Initialize the allocatable items so that we can call destroy function + * in case of error. + * FIXME - add initialization here for other elements as they are added. + */ + + sink_context->async_mode = 0; + sink_context->in_queue = NULL; + sink_context->pending = NULL; + sink_context->sink_cfg.provider = NULL; + sink_context->sink_cfg.priv_ctx = NULL; + sink_context->sink_cfg.libhandle = NULL; + sink_context->sink_cfg.ability = NULL; + sink_context->sink_cfg.cpb_cb.init_cb = NULL; + sink_context->sink_cfg.cpb_cb.submit_cb = NULL; + sink_context->sink_cfg.cpb_cb.close_cb = NULL; + + /* Read common fields */ + error = elapi_read_sink_cfg(&(sink_context->sink_cfg), + name, ini_config); + if (error) { + TRACE_ERROR_NUMBER("Failed to read sink configuration", error); + elapi_sink_destroy(sink_context); + return error; + } + + TRACE_INFO_NUMBER("Address of init function", + sink_context->sink_cfg.cpb_cb.init_cb); + TRACE_INFO_NUMBER("Address of submit function", + sink_context->sink_cfg.cpb_cb.submit_cb); + TRACE_INFO_NUMBER("Address of close function", + sink_context->sink_cfg.cpb_cb.close_cb); + + /* Load sink */ + error = elapi_load_sink(&(sink_context->sink_cfg), + name, ini_config); + if (error) { + TRACE_ERROR_NUMBER("Failed to load sink", error); + required = sink_context->sink_cfg.required; + elapi_sink_destroy(sink_context); + if (required) { + TRACE_ERROR_NUMBER("Sink is required so returning error", error); + return error; + } + else { + *sink_ctx = NULL; + TRACE_FLOW_STRING("Sink is not required so OK", "Exit"); + return EOK; + } + } + + /* We are done so return the context to the caller */ + *sink_ctx = sink_context; + + TRACE_FLOW_STRING("elapi_sink_create", "Exit"); + return error; +} diff --git a/common/elapi/elapi_sink.h b/common/elapi/elapi_sink.h index 40b12a266..41a89896f 100644 --- a/common/elapi/elapi_sink.h +++ b/common/elapi/elapi_sink.h @@ -31,48 +31,29 @@ #define SINK_LIB_NAME_SIZE 100 #define SINK_ENTRY_POINT "get_sink_info" #define SINK_NAME_TEMPLATE "libelapi_sink_%s.so" -#define SINK_NEVER_RETRY -1 /* Flags related to loading sinks */ #define SINK_FLAG_NO_LIMIT 0x00000000 /* NO limits to loading and manipulating this sink - default */ -#define SINK_FLAG_LOAD_SINGLE 0x00000001 /* Only allow one instance of the sink per process */ +#define SINK_FLAG_LOAD_SINGLE 0x00000001 /* Only allow one instance of the provider per process */ -struct data_descriptor { - char *appname; - void *config; - void *internal_data; -}; /* Log facility callbacks */ -typedef int (*init_fn)(struct data_descriptor *dblock); -typedef void (*cleanup_fn)(struct data_descriptor *dblock); -typedef int (*format_fn)(struct data_descriptor *dblock, const char *format_str, struct collection_item *event); -typedef int (*submit_fn)(struct data_descriptor *dblock); -typedef void (*close_fn)(struct data_descriptor *dblock); +/* FIXME - the signatures need to take into the account async processing */ +typedef int (*init_fn)(void **priv_ctx, char *name, struct collection_item *ini_config); +typedef int (*submit_fn)(void *priv_ctx, struct collection_item *event); +typedef void (*close_fn)(void **priv_ctx); -struct sink_capability { - int retry_interval; - int flags; - int instance; +struct sink_cpb { init_fn init_cb; - cleanup_fn cleanup_cb; - format_fn format_cb; submit_fn submit_cb; close_fn close_cb; }; -/* The only open function the link can expose */ -typedef void (*capability_fn)(struct sink_capability *sink_cpb_block); +/* The only open function the sink can expose */ +typedef void (*sink_cpb_fn)(struct sink_cpb *sink_cpb_block); -struct sink_descriptor { - struct sink_capability sink_cpb_block; - struct data_descriptor dblock; - int suspended; - time_t lasttry; - void *lib_handle; -}; -/*Standard capability function */ -void get_sink_info(struct sink_capability *sink_cpb_block); +/* Standard capability function */ +void get_sink_info(struct sink_cpb *cpb_block); #endif diff --git a/common/elapi/elapi_test/Makefile.am b/common/elapi/elapi_test/Makefile.am index b16102c83..cac0ead6d 100644 --- a/common/elapi/elapi_test/Makefile.am +++ b/common/elapi/elapi_test/Makefile.am @@ -5,7 +5,8 @@ topdir=$(srcdir)/../.. AM_CFLAGS = -DELAPI_DEFAULT_CONFIG_DIR=\"$(srcdir)\" \ -DELAPI_DEFAULT_CONFIG_APP_DIR=\"$(srcdir)\" \ -DELAPI_DEFAULT_APP_NAME=\"elapi_ut\" \ - -DELAPI_DEFAULT_APP_NAME_SIZE=127 + -DELAPI_DEFAULT_APP_NAME_SIZE=127 \ + -DELAPI_UTEST if HAVE_GCC AM_CFLAGS += \ @@ -13,26 +14,31 @@ if HAVE_GCC -Wcast-align -Wwrite-strings endif -AM_CPPFLAGS = -I$(topdir) -I$(topdir)/ini -I$(topdir)/trace -I$(topdir)/collection -I$(topdir)/elapi $(TRACE_LEVEL) +AM_CPPFLAGS = -I$(topdir) -I$(topdir)/ini -I$(topdir)/trace -I$(topdir)/collection -I$(topdir)/elapi \ + -I$(topdir)/elapi/providers/file $(TRACE_LEVEL) ACLOCAL_AMFLAGS = -I m4 # Build library noinst_LTLIBRARIES = libelapi_test.la + libelapi_test_la_SOURCES = \ ../elapi_event.c \ ../elapi_log.c \ ../elapi_internal.c \ + ../elapi_sink.c \ ../elapi_event.h \ ../elapi_priv.h \ ../elapi_sink.h \ ../elapi_log.h \ ../elapi_async.h \ - ../elapi.h + ../elapi.h \ + ../providers/file/file_provider.c \ + ../providers/file/file_provider.h # Build unit test check_PROGRAMS = elapi_ut elapi_ut_SOURCES = elapi_ut.c -elapi_ut_LDADD = libelapi_test.la ../../ini/libini_config.la ../../collection/libcollection.la +elapi_ut_LDADD = libelapi_test.la ../../ini/libini_config.la ../../collection/libcollection.la -ldl TESTS = elapi_ut diff --git a/common/elapi/elapi_test/elapi_ut.conf b/common/elapi/elapi_test/elapi_ut.conf index d15a4550d..a5ad7a0b2 100644 --- a/common/elapi/elapi_test/elapi_ut.conf +++ b/common/elapi/elapi_test/elapi_ut.conf @@ -79,19 +79,44 @@ sinks = logfile, syslog ; In this case the ELAPI will try to load shared library with the name ; constructed using specified value. In the given example ; ELAPI will try to load libelapi_sink_custom_audit.so library. -; The general pattern is: libelapi_sink_.so +; The general pattern is: libelapi_sink_.so +; +; required - (optional) +; Defines whether it is a required sink? +; If not present the dispatcher will return error at load time. +; +; onerror - if the sink got an error what should dispatcher do? +; 0 - retry (default) +; 1 - disable +; +; timeout - for how long one should wait before trying to revive the sink +; default is 60 seconds +; +; synch - yes/no (default no) - a flag that forces the sink to act synchronously +; even if it can support async operations [debugfile] provider=file +required=true +onerror=1 +timeout=90 [logfile] provider=file +required=true +onerror=0 +timeout=60 [auditfile] provider=file +required=true +onerror=1 +timeout=90 [stderr] provider=stderr +synch=false [syslog] provider=syslog +synch=yes diff --git a/common/elapi/providers/file/file_provider.c b/common/elapi/providers/file/file_provider.c new file mode 100644 index 000000000..589ed5eb7 --- /dev/null +++ b/common/elapi/providers/file/file_provider.c @@ -0,0 +1,155 @@ +/* + ELAPI + + Module implements a provider for sinks based on file. + + Copyright (C) Dmitri Pal 2009 + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + You should have received a copy of the GNU General Public License + along with this program. If not, see . +*/ + +#define _GNU_SOURCE +#include /* for errors */ +#include /* for free() */ + +#include "file_provider.h" +#include "ini_config.h" +#include "trace.h" +#include "config.h" +/* FIXME: temporary for debugging */ +#include "collection_tools.h" + + +/* Function to read configuration */ +int file_read_cfg(struct file_prvdr_cfg *file_cfg, + char *name, + struct collection_item *ini_config) +{ + int error = EOK; + TRACE_FLOW_STRING("file_read_cfg", "Entry point"); + + /* FIXME: read configuration items */ + + TRACE_FLOW_STRING("file_read_cfg", "Exit"); + return error; +} + + +/* Function to create context */ +int file_create_ctx(struct file_prvdr_ctx **file_ctx, + char *name, + struct collection_item *ini_config) +{ + int error = EOK; + struct file_prvdr_ctx *ctx = NULL; + + TRACE_FLOW_STRING("file_create_ctx", "Entry point"); + + ctx = (struct file_prvdr_ctx *)malloc(sizeof(struct file_prvdr_ctx)); + if (ctx == NULL) { + TRACE_ERROR_NUMBER("Failed to allocate context", ENOMEM); + return ENOMEM; + } + + /* Init allocatable items */ + ctx->config.filename = NULL; + + /* Read configuration data */ + error = file_read_cfg(&(ctx->config), name, ini_config); + if (error) { + TRACE_ERROR_NUMBER("Error reading sink configuration", error); + free(ctx); + return error; + } + + *file_ctx = ctx; + + TRACE_FLOW_STRING("file_create_ctx", "Exit"); + return error; +} + + +/* File init function */ +int file_init(void **priv_ctx, + char *name, + struct collection_item *ini_config) +{ + int error = EOK; + TRACE_FLOW_STRING("file_init", "Entry point"); + + /* Start with creating context */ + error = file_create_ctx((struct file_prvdr_ctx **)priv_ctx, + name, + ini_config); + if (error) { + TRACE_ERROR_NUMBER("Failed to create context", error); + return error; + } + + /* Open file */ + /* FIXME: ... */ + + TRACE_FLOW_STRING("file_init", "Exit"); + return error; +} + +/* File close function */ +void file_close(void **priv_ctx) +{ + struct file_prvdr_ctx **ctx = NULL; + + TRACE_FLOW_STRING("file_close", "Entry point"); + + ctx = (struct file_prvdr_ctx **)priv_ctx; + + /* Close file */ + /* FIXME: ... */ + + /* If we allocated file name free it */ + if ((*ctx)->config.filename != NULL) { + TRACE_INFO_STRING("Freeing file name", (*ctx)->config.filename); + free((*ctx)->config.filename); + } + + /* Free and indicate that the context is freed */ + free(*ctx); + *ctx = NULL; + + TRACE_FLOW_STRING("file_close", "Exit"); +} + +/* File submit function */ +int file_submit(void *priv_ctx, struct collection_item *event) +{ + int error = EOK; + TRACE_FLOW_STRING("file_submit", "Entry point"); + + + /* FIXME: Placeholder for now */ + col_print_collection(event); + + TRACE_FLOW_STRING("file_sumbit", "Exit"); + return error; +} + + +/* This is the equivalent of the get info function */ +void file_ability(struct sink_cpb *cpb_block) +{ + TRACE_FLOW_STRING("file_ability", "Entry point"); + + cpb_block->init_cb = file_init; + cpb_block->submit_cb = file_submit; + cpb_block->close_cb = file_close; + + TRACE_FLOW_STRING("file_ability", "Exit"); +} diff --git a/common/elapi/providers/file/file_provider.h b/common/elapi/providers/file/file_provider.h new file mode 100644 index 000000000..218f69f58 --- /dev/null +++ b/common/elapi/providers/file/file_provider.h @@ -0,0 +1,79 @@ +/* + ELAPI + + Header file used internally by the "file" provider. + + Copyright (C) Dmitri Pal 2009 + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + You should have received a copy of the GNU General Public License + along with this program. If not, see . +*/ + +#ifndef ELAPI_FILE_PROVIDER_H +#define ELAPI_FILE_PROVIDER_H + +#include + +#include "elapi_sink.h" + +/* Structure that holds internal configuration of the file + * provider. + */ +struct file_prvdr_cfg { + char *filename; /* File name */ + uint32_t keepopen; /* Do we need to keep file open */ + uint32_t fsyncmode; /* How frequently data is fsynced */ + uint32_t outmode; /* Output mode */ + struct collection_item *set; /* Field set without leftovers symbol */ + uint32_t use_leftovers; /* Was there a leftover symbol */ + uint32_t jam_leftovers; /* leftovers should be serialized into one field */ + uint32_t mode_leftovers; /* Format for the leftover fields */ + uint32_t csvheader; /* Include csv header or not? */ + char csvqualifier; /* What is the qualifier? */ + char csvseparator; /* What is the separator? */ + uint32_t csvescape; /* Do we need to escape strings ? */ + char csvescchar; /* What is the escape character? */ +}; + +/* File context */ +struct file_prvdr_ctx { + struct file_prvdr_cfg config; /* Configuration */ + int outfile; /* File handle */ + /* FIXME - other things go here */ +}; + + + +/* Function to read configuration */ +int file_read_cfg(struct file_prvdr_cfg *file_cfg, + char *name, + struct collection_item *ini_config); + +/* Function to create context */ +int file_create_ctx(struct file_prvdr_ctx **file_ctx, + char *name, + struct collection_item *ini_config); + +/* File init function */ +int file_init(void **priv_ctx, + char *name, + struct collection_item *ini_config); + +/* File close function */ +void file_close(void **priv_ctx); + +/* File submit function */ +int file_submit(void *priv_ctx, struct collection_item *event); + +/* This is the equivalent of the get info function */ +void file_ability(struct sink_cpb *cpb_block); + +#endif -- cgit