diff options
Diffstat (limited to 'server/parser/rteval-parserd.c')
-rw-r--r-- | server/parser/rteval-parserd.c | 530 |
1 files changed, 530 insertions, 0 deletions
diff --git a/server/parser/rteval-parserd.c b/server/parser/rteval-parserd.c new file mode 100644 index 0000000..01287a0 --- /dev/null +++ b/server/parser/rteval-parserd.c @@ -0,0 +1,530 @@ +/* + * Copyright (C) 2009 Red Hat Inc. + * + * This application is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the Free + * Software Foundation; version 2. + * + * This application is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + */ + +/** + * @file rteval-parserd.c + * @author David Sommerseth <davids@redhat.com> + * @date Thu Oct 15 11:59:27 2009 + * + * @brief Polls the rteval.submissionqueue table for notifications + * from new inserts and sends the file to a processing thread + * + * + * + */ + +#include <stdio.h> +#include <string.h> +#include <unistd.h> +#include <signal.h> +#include <pthread.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <errno.h> +#include <assert.h> + +#include <eurephia_nullsafe.h> +#include <eurephia_values.h> +#include <configparser.h> +#include <pgsql.h> +#include <threadinfo.h> +#include <parsethread.h> +#include <argparser.h> + +#define DEFAULT_MSG_MAX 5 /**< Default size of the message queue */ +#define XMLPARSER_XSL "xmlparser.xsl" /**< rteval report parser XSLT, parses XML into database friendly data*/ + +static int shutdown = 0; /**< Variable indicating if the program should shutdown */ +static LogContext *logctx = NULL; /**< Initialsed log context, to be used by sigcatch() */ + + +/** + * Simple signal catcher. Used for SIGINT and SIGTERM signals, and will set the global shutdown + * shutdown flag. It's expected that all threads behaves properly and exits as soon as their current + * work is completed + * + * @param sig Recieved signal (not used) + */ +void sigcatch(int sig) { + switch( sig ) { + case SIGINT: + case SIGTERM: + if( shutdown == 0 ) { + shutdown = 1; + writelog(logctx, LOG_INFO, "[SIGNAL] Shutting down"); + } else { + writelog(logctx, LOG_INFO, "[SIGNAL] Shutdown in progress ... please be patient ..."); + } + break; + + case SIGUSR1: + writelog(logctx, LOG_EMERG, "[SIGNAL] Shutdown alarm from a worker thread"); + shutdown = 1; + break; + + default: + break; + } + + // re-enable signals, to avoid brute force exits. + // If brute force is needed, SIGKILL is available. + signal(sig, sigcatch); +} + + +/** + * Opens and reads /proc/sys/fs/mqueue/msg_max, to get the maximum number of allowed messages + * on POSIX MQ queues. rteval-parserd will use as much of this as possible when needed. + * + * @return Returns the system msg_max value, or DEFAULT_MSG_MAX on failure to read the setting. + */ +unsigned int get_mqueue_msg_max(LogContext *log) { + FILE *fp = NULL; + char buf[130]; + unsigned int msg_max = DEFAULT_MSG_MAX; + + fp = fopen("/proc/sys/fs/mqueue/msg_max", "r"); + if( !fp ) { + writelog(log, LOG_WARNING, + "Could not open /proc/sys/fs/mqueue/msg_max, defaulting to %i", + msg_max); + writelog(log, LOG_INFO, "%s", strerror(errno)); + return msg_max; + } + + memset(&buf, 0, 130); + if( fread(&buf, 1, 128, fp) < 1 ) { + writelog(log, LOG_WARNING, + "Could not read /proc/sys/fs/mqueue/msg_max, defaulting to %i", + msg_max); + writelog(log, LOG_INFO, "%s", strerror(errno)); + } else { + msg_max = atoi_nullsafe(buf); + if( msg_max < 1 ) { + msg_max = DEFAULT_MSG_MAX; + writelog(log, LOG_WARNING, + "Failed to parse /proc/sys/fs/mqueue/msg_max," + "defaulting to %i", msg_max); + } + } + fclose(fp); + return msg_max; +} + + +/** + * Main loop, which polls the submissionqueue table and puts jobs found here into a POSIX MQ queue + * which the worker threads will pick up. + * + * @param dbc Database connection, where to query the submission queue + * @param msgq file descriptor for the message queue + * @param activethreads Pointer to an int value containing active worker threads. Each thread updates + * this value directly, and this function should only read it. + * + * @return Returns 0 on successful run, otherwise > 0 on errors. + */ +int process_submission_queue(dbconn *dbc, mqd_t msgq, int *activethreads) { + pthread_mutex_t mtx_submq = PTHREAD_MUTEX_INITIALIZER; + parseJob_t *job = NULL; + int rc = 0, i, actthr_cp = 0; + + while( shutdown == 0 ) { + // Check status if the worker threads + // If we don't have any worker threads, shut down immediately + writelog(dbc->log, LOG_DEBUG, "Active worker threads: %i", *activethreads); + if( *activethreads < 1 ) { + writelog(dbc->log, LOG_EMERG, + "All worker threads ceased to exist. Shutting down!"); + shutdown = 1; + rc = 1; + goto exit; + } + + if( db_ping(dbc) != 1 ) { + writelog(dbc->log, LOG_EMERG, "Lost connection to database. Shutting down!"); + shutdown = 1; + rc = 1; + goto exit; + } + + // Fetch an available job + job = db_get_submissionqueue_job(dbc, &mtx_submq); + if( !job ) { + writelog(dbc->log, LOG_EMERG, + "Failed to get submission queue job. Shutting down!"); + shutdown = 1; + rc = 1; + goto exit; + } + if( job->status == jbNONE ) { + free_nullsafe(job); + if( db_wait_notification(dbc, &shutdown, "rteval_submq") < 1 ) { + writelog(dbc->log, LOG_EMERG, + "Failed to wait for DB notification. Shutting down!"); + shutdown = 1; + rc = 1; + goto exit; + } + continue; + } + + // Send the job to the queue + writelog(dbc->log, LOG_INFO, "** New job: submid %i, %s", job->submid, job->filename); + do { + int res; + + errno = 0; + res = mq_send(msgq, (char *) job, sizeof(parseJob_t), 1); + if( (res < 0) && (errno != EAGAIN) ) { + writelog(dbc->log, LOG_EMERG, + "Could not send parse job to the queue. " + "Shutting down!"); + shutdown = 1; + rc = 2; + goto exit; + } else if( errno == EAGAIN ) { + writelog(dbc->log, LOG_WARNING, + "Message queue filled up. " + "Will not add new messages to queue for the next 60 seconds"); + sleep(60); + } + } while( (errno == EAGAIN) ); + free_nullsafe(job); + } + + exit: + // Send empty messages to the threads, to make them have a look at the shutdown flag + job = (parseJob_t *) malloc_nullsafe(dbc->log, sizeof(parseJob_t)); + errno = 0; + // Need to make a copy, as *activethreads will change when threads completes shutdown + actthr_cp = *activethreads; + for( i = 0; i < actthr_cp; i++ ) { + do { + int res; + + writelog(dbc->log, LOG_DEBUG, "%s shutdown message %i of %i", + (errno == EAGAIN ? "Resending" : "Sending"), i+1, *activethreads); + errno = 0; + res = mq_send(msgq, (char *) job, sizeof(parseJob_t), 1); + if( (res < 0) && (errno != EAGAIN) ) { + writelog(dbc->log, LOG_EMERG, + "Could not send shutdown notification to the queue."); + free_nullsafe(job); + return rc; + } else if( errno == EAGAIN ) { + writelog(dbc->log, LOG_WARNING, + "Message queue filled up. " + "Will not add new messages to queue for the next 10 seconds"); + sleep(10); + } + } while( (errno == EAGAIN) ); + } + free_nullsafe(job); + return rc; +} + + +/** + * Prepares the program to be daemonised + * + * @param log Initialised log context, where log info of the process is reported + * + * @return Returns 1 on success, otherwise -1 + */ +int daemonise(LogContext *log) { + pid_t pid, sid; + int i = 0; + + if( (log->logtype == ltCONSOLE) ) { + writelog(log, LOG_EMERG, + "Cannot daemonise when logging to a console (stdout: or stderr:)"); + return -1; + } + + pid = fork(); + if (pid < 0) { + writelog(log, LOG_EMERG, "Failed to daemonise the process (fork)"); + return -1; + } else if (pid > 0) { + writelog(log, LOG_INFO, "Daemon pid: %ld", pid); + exit(EXIT_SUCCESS); + } + + umask(0); + + sid = setsid(); + if (sid < 0) { + writelog(log, LOG_EMERG, "Failed to daemonise the process (setsid)"); + return -1; + } + + if ((chdir("/")) < 0) { + writelog(log, LOG_EMERG, "Failed to daemonise the process (fork)"); + return -1; + } + + // Prepare stdin, stdout and stderr for daemon mode + close(2); + close(1); + close(0); + i = open("/dev/null", O_RDWR); /* open stdin */ + dup(i); /* stdout */ + dup(i); /* stderr */ + + writelog(log, LOG_INFO, "Daemonised successfully"); + return 1; +} + + +/** + * rtevald_parser main function. + * + * @param argc + * @param argv + * + * @return Returns the result of the process_submission_queue() function. + */ +int main(int argc, char **argv) { + eurephiaVALUES *config = NULL, *prgargs = NULL; + char xsltfile[2050], *reportdir = NULL; + xsltStylesheet *xslt = NULL; + dbconn *dbc = NULL; + pthread_t **threads = NULL; + pthread_attr_t **thread_attrs = NULL; + pthread_mutex_t mtx_sysreg = PTHREAD_MUTEX_INITIALIZER; + pthread_mutex_t mtx_thrcnt = PTHREAD_MUTEX_INITIALIZER; + threadData_t **thrdata = NULL; + struct mq_attr msgq_attr; + mqd_t msgq = 0; + int i,rc, mq_init = 0, max_threads = 0, started_threads = 0, activethreads = 0; + unsigned int max_report_size = 0; + + // Initialise XML and XSLT libraries + xsltInit(); + xmlInitParser(); + + prgargs = parse_arguments(argc, argv); + if( prgargs == NULL ) { + fprintf(stderr, "** ERROR ** Failed to parse program arguments\n"); + rc = 2; + goto exit; + } + + // Setup a log context + logctx = init_log(eGet_value(prgargs, "log"), eGet_value(prgargs, "loglevel")); + if( !logctx ) { + fprintf(stderr, "** ERROR ** Could not setup a log context\n"); + eFree_values(prgargs); + rc = 2; + goto exit; + } + + // Fetch configuration + config = read_config(logctx, prgargs, "xmlrpc_parser"); + eFree_values(prgargs); // read_config() copies prgargs into config, we don't need prgargs anymore + + // Daemonise process if requested + if( atoi_nullsafe(eGet_value(config, "daemon")) == 1 ) { + if( daemonise(logctx) < 1 ) { + rc = 3; + goto exit; + } + } + + + // Parse XSLT template + snprintf(xsltfile, 512, "%s/%s", eGet_value(config, "xsltpath"), XMLPARSER_XSL); + writelog(logctx, LOG_DEBUG, "Parsing XSLT file: %s", xsltfile); + xslt = xsltParseStylesheetFile((xmlChar *) xsltfile); + if( !xslt ) { + writelog(logctx, LOG_EMERG, "Could not parse XSLT template: %s", xsltfile); + rc = 2; + goto exit; + } + + // Open a POSIX MQ + writelog(logctx, LOG_DEBUG, "Preparing POSIX MQ queue: /rteval_parsequeue"); + memset(&msgq, 0, sizeof(mqd_t)); + msgq_attr.mq_maxmsg = get_mqueue_msg_max(logctx); + msgq_attr.mq_msgsize = sizeof(parseJob_t); + msgq_attr.mq_flags = O_NONBLOCK; + msgq = mq_open("/rteval_parsequeue", O_RDWR | O_CREAT, 0600, &msgq_attr); + if( msgq < 0 ) { + writelog(logctx, LOG_EMERG, + "Could not open message queue: %s", strerror(errno)); + rc = 2; + goto exit; + } + mq_init = 1; + + // Get the number of worker threads + max_threads = atoi_nullsafe(eGet_value(config, "threads")); + if( max_threads == 0 ) { + max_threads = 4; + } + + // Get a database connection for the main thread + dbc = db_connect(config, max_threads, logctx); + if( !dbc ) { + rc = 4; + goto exit; + } + + // Prepare all threads + threads = calloc(max_threads + 1, sizeof(pthread_t *)); + thread_attrs = calloc(max_threads + 1, sizeof(pthread_attr_t *)); + thrdata = calloc(max_threads + 1, sizeof(threadData_t *)); + assert( (threads != NULL) && (thread_attrs != NULL) && (thrdata != NULL) ); + + reportdir = eGet_value(config, "reportdir"); + writelog(logctx, LOG_INFO, "Starting %i worker threads", max_threads); + max_report_size = defaultIntValue(atoi_nullsafe(eGet_value(config, "max_report_size")), 1024*1024); + for( i = 0; i < max_threads; i++ ) { + // Prepare thread specific data + thrdata[i] = malloc_nullsafe(logctx, sizeof(threadData_t)); + if( !thrdata[i] ) { + writelog(logctx, LOG_EMERG, + "Could not allocate memory for thread data"); + rc = 2; + goto exit; + } + + // Get a database connection for the thread + thrdata[i]->dbc = db_connect(config, i, logctx); + if( !thrdata[i]->dbc ) { + writelog(logctx, LOG_EMERG, + "Could not connect to the database for thread %i", i); + rc = 2; + shutdown = 1; + goto exit; + } + + thrdata[i]->shutdown = &shutdown; + thrdata[i]->threadcount = &activethreads; + thrdata[i]->mtx_thrcnt = &mtx_thrcnt; + thrdata[i]->id = i; + thrdata[i]->msgq = msgq; + thrdata[i]->mtx_sysreg = &mtx_sysreg; + thrdata[i]->xslt = xslt; + thrdata[i]->destdir = reportdir; + thrdata[i]->max_report_size = max_report_size; + + thread_attrs[i] = malloc_nullsafe(logctx, sizeof(pthread_attr_t)); + if( !thread_attrs[i] ) { + writelog(logctx, LOG_EMERG, + "Could not allocate memory for thread attributes"); + rc = 2; + goto exit; + } + pthread_attr_init(thread_attrs[i]); + pthread_attr_setdetachstate(thread_attrs[i], PTHREAD_CREATE_JOINABLE); + + threads[i] = malloc_nullsafe(logctx, sizeof(pthread_t)); + if( !threads[i] ) { + writelog(logctx, LOG_EMERG, + "Could not allocate memory for pthread_t"); + rc = 2; + goto exit; + } + } + + // Setup signal catching + signal(SIGINT, sigcatch); + signal(SIGTERM, sigcatch); + signal(SIGHUP, SIG_IGN); + signal(SIGUSR1, sigcatch); + signal(SIGUSR2, SIG_IGN); + + // Start the threads + for( i = 0; i < max_threads; i++ ) { + int thr_rc = pthread_create(threads[i], thread_attrs[i], parsethread, thrdata[i]); + if( thr_rc < 0 ) { + writelog(logctx, LOG_EMERG, + "** ERROR ** Failed to start thread %i: %s", + i, strerror(thr_rc)); + rc = 3; + goto exit; + } + started_threads++; + } + + // Main routine + // + // checks the submission queue and puts unprocessed records on the POSIX MQ + // to be parsed by one of the threads + // + sleep(3); // Allow at least a few parser threads to settle down first before really starting + writelog(logctx, LOG_DEBUG, "Starting submission queue checker"); + rc = process_submission_queue(dbc, msgq, &activethreads); + writelog(logctx, LOG_DEBUG, "Submission queue checker shut down"); + + exit: + // Clean up all threads + for( i = 0; i < max_threads; i++ ) { + // Wait for all threads to exit + if( (i < started_threads) && threads && threads[i] ) { + void *thread_rc; + int j_rc; + + if( (j_rc = pthread_join(*threads[i], &thread_rc)) != 0 ) { + writelog(logctx, LOG_CRIT, + "Failed to join thread %i: %s", + i, strerror(j_rc)); + } + pthread_attr_destroy(thread_attrs[i]); + } + if( threads ) { + free_nullsafe(threads[i]); + } + if( thread_attrs ) { + free_nullsafe(thread_attrs[i]); + } + + // Disconnect threads database connection + if( thrdata && thrdata[i] ) { + db_disconnect(thrdata[i]->dbc); + free_nullsafe(thrdata[i]); + } + } + free_nullsafe(thrdata); + free_nullsafe(threads); + free_nullsafe(thread_attrs); + + // Close message queue + if( mq_init == 1 ) { + errno = 0; + if( mq_close(msgq) < 0 ) { + writelog(logctx, LOG_CRIT, "Failed to close message queue: %s", + strerror(errno)); + } + errno = 0; + if( mq_unlink("/rteval_parsequeue") < 0 ) { + writelog(logctx, LOG_ALERT, "Failed to remove the message queue: %s", + strerror(errno)); + } + } + + // Disconnect from database, main thread connection + db_disconnect(dbc); + + // Free up the rest + eFree_values(config); + xsltFreeStylesheet(xslt); + xmlCleanupParser(); + xsltCleanupGlobals(); + + writelog(logctx, LOG_EMERG, "rteval-parserd is stopped"); + close_log(logctx); + return rc; +} + |