summaryrefslogtreecommitdiffstats
path: root/server/parser
diff options
context:
space:
mode:
authorDavid Sommerseth <davids@redhat.com>2009-10-21 10:31:56 +0200
committerDavid Sommerseth <davids@redhat.com>2009-10-21 10:31:56 +0200
commit143167cc63b7c809abcb048ec4454d8c5cebcb42 (patch)
treec7252293fbaaa4349f76f7ddf1ea8afd610b3312 /server/parser
parent6e90b1e99b02a9cac45686c63ddeeae044466aca (diff)
downloadrteval-143167cc63b7c809abcb048ec4454d8c5cebcb42.tar.gz
rteval-143167cc63b7c809abcb048ec4454d8c5cebcb42.tar.xz
rteval-143167cc63b7c809abcb048ec4454d8c5cebcb42.zip
Implemented database LISTEN via db_wait_notification()
This introduces async waiting for report parsing. Commit 2584a3c36c97c757dc80108b898eede52b91dc44 introduced sending a NOTIFY whenever an INSERT is done into the submissionqueue table. With the db_wait_notification() the process_submission_queue() function will sleep until a a notification is received. The LISTEN, UNLISTEN and NOTIFY SQL commands are PostgreSQL dependent. Other database implementations may use what that database supports, or just simply do a sleep() to change the behaviour to a polling model.
Diffstat (limited to 'server/parser')
-rw-r--r--server/parser/pgsql.c80
-rw-r--r--server/parser/pgsql.h1
-rw-r--r--server/parser/rteval_parserd.c16
3 files changed, 92 insertions, 5 deletions
diff --git a/server/parser/pgsql.c b/server/parser/pgsql.c
index 595df97..168bdaa 100644
--- a/server/parser/pgsql.c
+++ b/server/parser/pgsql.c
@@ -21,6 +21,7 @@
#include <string.h>
#include <pthread.h>
#include <assert.h>
+#include <errno.h>
#include <libpq-fe.h>
@@ -379,6 +380,84 @@ int db_rollback(dbconn *dbc) {
/**
+ * This function blocks until a notification is received from the database
+ *
+ * @param dbc Database connection
+ * @param listenfor Name to be used when calling LISTEN
+ *
+ * @return Returns 1 on successful waiting, otherwise -1
+ */
+int db_wait_notification(dbconn *dbc, const int *shutdown, const char *listenfor) {
+ int sock, ret = 0;
+ PGresult *dbres = NULL;
+ PGnotify *notify = NULL;
+ fd_set input_mask;
+ char *sql = NULL;
+
+ sql = malloc_nullsafe(strlen_nullsafe(listenfor) + 12);
+ assert( sql != NULL );
+
+ // Initiate listening
+ sprintf(sql, "LISTEN %s\n", listenfor);
+ dbres = PQexec((PGconn *) dbc, sql);
+ if( PQresultStatus(dbres) != PGRES_COMMAND_OK ) {
+ fprintf(stderr, "** ERROR ** SQL %s\n", PQresultErrorMessage(dbres));
+ free_nullsafe(sql);
+ PQclear(dbres);
+ return -1;
+ }
+
+ // Start listening and waiting
+ while( ret == 0 ) {
+ sock = PQsocket((PGconn *) dbc);
+ if (sock < 0) {
+ // shouldn't happen
+ ret = -1;
+ break;
+ }
+
+ // Wait for something to happen on the database socket
+ FD_ZERO(&input_mask);
+ FD_SET(sock, &input_mask);
+ if (select(sock + 1, &input_mask, NULL, NULL, NULL) < 0) {
+ // If the shutdown flag is set, select() will fail due to a signal. Only
+ // report errors if we're not shutting down, or else exit normally with
+ // successful waiting.
+ if( *shutdown == 0 ) {
+ fprintf(stderr, "** ERROR ** select() failed: %s\n", strerror(errno));
+ ret = -1;
+ } else {
+ ret = 1;
+ }
+ break;
+ }
+
+ // Process the event
+ PQconsumeInput((PGconn *) dbc);
+ while ((notify = PQnotifies((PGconn *) dbc)) != NULL) {
+ // If a notification was received, inform and exit with success.
+ fprintf(stderr, "** INFO ** Received notfication from pid %d\n",
+ notify->be_pid);
+ PQfreemem(notify);
+ ret = 1;
+ break;
+ }
+ }
+
+ // Stop listening when we exit
+ sprintf(sql, "UNLISTEN %s\n", listenfor);
+ dbres = PQexec((PGconn *) dbc, sql);
+ if( PQresultStatus(dbres) != PGRES_COMMAND_OK ) {
+ fprintf(stderr, "** ERROR ** SQL %s\n", PQresultErrorMessage(dbres));
+ free_nullsafe(sql);
+ ret = -1;
+ }
+ PQclear(dbres);
+ return ret;
+}
+
+
+/**
* Retrive the first available submitted report
*
* @param dbc Database connection
@@ -391,6 +470,7 @@ parseJob_t *db_get_submissionqueue_job(dbconn *dbc, pthread_mutex_t *mtx) {
parseJob_t *job = NULL;
PGresult *res = NULL;
char sql[4098];
+
job = (parseJob_t *) malloc_nullsafe(sizeof(parseJob_t));
if( !job ) {
fprintf(stderr, "** ERROR ** Failed to allocate memory for a new parsing job\n");
diff --git a/server/parser/pgsql.h b/server/parser/pgsql.h
index d4ea2c9..4238e5b 100644
--- a/server/parser/pgsql.h
+++ b/server/parser/pgsql.h
@@ -48,6 +48,7 @@ int db_commit(dbconn *dbc);
int db_rollback(dbconn *dbc);
/* rteval specific database functions */
+int db_wait_notification(dbconn *dbc, const int *shutdown, const char *listenfor);
parseJob_t *db_get_submissionqueue_job(dbconn *dbc, pthread_mutex_t *mtx);
int db_update_submissionqueue(dbconn *dbc, unsigned int submid, int status);
int db_register_system(dbconn *dbc, xsltStylesheet *xslt, xmlDoc *summaryxml);
diff --git a/server/parser/rteval_parserd.c b/server/parser/rteval_parserd.c
index 92cabfa..c14e1a6 100644
--- a/server/parser/rteval_parserd.c
+++ b/server/parser/rteval_parserd.c
@@ -125,12 +125,17 @@ int process_submission_queue(dbconn *dbc, mqd_t msgq) {
if( !job ) {
fprintf(stderr, "** ERROR ** Failed to get submission queue job - shutting down\n");
shutdown = 1;
- rc = 11;
- break;
+ rc = 1;
+ goto exit;
}
if( job->status == jbNONE ) {
free_nullsafe(job);
- sleep(15);
+ if( db_wait_notification(dbc, &shutdown, "rteval_submq") < 1 ) {
+ fprintf(stderr, "** ERROR ** Failed to wait for DB notification\n");
+ shutdown = 1;
+ rc = 1;
+ goto exit;
+ }
continue;
}
@@ -145,16 +150,17 @@ int process_submission_queue(dbconn *dbc, mqd_t msgq) {
fprintf(stderr, "** ERROR ** Could not send parse job to the queue\n");
shutdown = 1;
rc = 2;
- break;
+ goto exit;
} else if( errno == EAGAIN ) {
fprintf(stderr,
- "** ERROR ** Message queue filled up. "
+ "** WARNING ** Message queue filled up. "
"Will not add new messages to queue for the next 60 seconds\n");
sleep(60);
}
} while( (errno == EAGAIN) );
free_nullsafe(job);
}
+ exit:
return rc;
}