diff options
| author | David Sommerseth <davids@redhat.com> | 2009-10-21 10:31:56 +0200 |
|---|---|---|
| committer | David Sommerseth <davids@redhat.com> | 2009-10-21 10:31:56 +0200 |
| commit | 143167cc63b7c809abcb048ec4454d8c5cebcb42 (patch) | |
| tree | c7252293fbaaa4349f76f7ddf1ea8afd610b3312 /server/parser | |
| parent | 6e90b1e99b02a9cac45686c63ddeeae044466aca (diff) | |
| download | rteval-143167cc63b7c809abcb048ec4454d8c5cebcb42.tar.gz rteval-143167cc63b7c809abcb048ec4454d8c5cebcb42.tar.xz rteval-143167cc63b7c809abcb048ec4454d8c5cebcb42.zip | |
Implemented database LISTEN via db_wait_notification()
This introduces async waiting for report parsing. Commit
2584a3c36c97c757dc80108b898eede52b91dc44 introduced sending
a NOTIFY whenever an INSERT is done into the submissionqueue table.
With the db_wait_notification() the process_submission_queue() function
will sleep until a a notification is received.
The LISTEN, UNLISTEN and NOTIFY SQL commands are PostgreSQL dependent.
Other database implementations may use what that database supports, or
just simply do a sleep() to change the behaviour to a polling model.
Diffstat (limited to 'server/parser')
| -rw-r--r-- | server/parser/pgsql.c | 80 | ||||
| -rw-r--r-- | server/parser/pgsql.h | 1 | ||||
| -rw-r--r-- | server/parser/rteval_parserd.c | 16 |
3 files changed, 92 insertions, 5 deletions
diff --git a/server/parser/pgsql.c b/server/parser/pgsql.c index 595df97..168bdaa 100644 --- a/server/parser/pgsql.c +++ b/server/parser/pgsql.c @@ -21,6 +21,7 @@ #include <string.h> #include <pthread.h> #include <assert.h> +#include <errno.h> #include <libpq-fe.h> @@ -379,6 +380,84 @@ int db_rollback(dbconn *dbc) { /** + * This function blocks until a notification is received from the database + * + * @param dbc Database connection + * @param listenfor Name to be used when calling LISTEN + * + * @return Returns 1 on successful waiting, otherwise -1 + */ +int db_wait_notification(dbconn *dbc, const int *shutdown, const char *listenfor) { + int sock, ret = 0; + PGresult *dbres = NULL; + PGnotify *notify = NULL; + fd_set input_mask; + char *sql = NULL; + + sql = malloc_nullsafe(strlen_nullsafe(listenfor) + 12); + assert( sql != NULL ); + + // Initiate listening + sprintf(sql, "LISTEN %s\n", listenfor); + dbres = PQexec((PGconn *) dbc, sql); + if( PQresultStatus(dbres) != PGRES_COMMAND_OK ) { + fprintf(stderr, "** ERROR ** SQL %s\n", PQresultErrorMessage(dbres)); + free_nullsafe(sql); + PQclear(dbres); + return -1; + } + + // Start listening and waiting + while( ret == 0 ) { + sock = PQsocket((PGconn *) dbc); + if (sock < 0) { + // shouldn't happen + ret = -1; + break; + } + + // Wait for something to happen on the database socket + FD_ZERO(&input_mask); + FD_SET(sock, &input_mask); + if (select(sock + 1, &input_mask, NULL, NULL, NULL) < 0) { + // If the shutdown flag is set, select() will fail due to a signal. Only + // report errors if we're not shutting down, or else exit normally with + // successful waiting. + if( *shutdown == 0 ) { + fprintf(stderr, "** ERROR ** select() failed: %s\n", strerror(errno)); + ret = -1; + } else { + ret = 1; + } + break; + } + + // Process the event + PQconsumeInput((PGconn *) dbc); + while ((notify = PQnotifies((PGconn *) dbc)) != NULL) { + // If a notification was received, inform and exit with success. + fprintf(stderr, "** INFO ** Received notfication from pid %d\n", + notify->be_pid); + PQfreemem(notify); + ret = 1; + break; + } + } + + // Stop listening when we exit + sprintf(sql, "UNLISTEN %s\n", listenfor); + dbres = PQexec((PGconn *) dbc, sql); + if( PQresultStatus(dbres) != PGRES_COMMAND_OK ) { + fprintf(stderr, "** ERROR ** SQL %s\n", PQresultErrorMessage(dbres)); + free_nullsafe(sql); + ret = -1; + } + PQclear(dbres); + return ret; +} + + +/** * Retrive the first available submitted report * * @param dbc Database connection @@ -391,6 +470,7 @@ parseJob_t *db_get_submissionqueue_job(dbconn *dbc, pthread_mutex_t *mtx) { parseJob_t *job = NULL; PGresult *res = NULL; char sql[4098]; + job = (parseJob_t *) malloc_nullsafe(sizeof(parseJob_t)); if( !job ) { fprintf(stderr, "** ERROR ** Failed to allocate memory for a new parsing job\n"); diff --git a/server/parser/pgsql.h b/server/parser/pgsql.h index d4ea2c9..4238e5b 100644 --- a/server/parser/pgsql.h +++ b/server/parser/pgsql.h @@ -48,6 +48,7 @@ int db_commit(dbconn *dbc); int db_rollback(dbconn *dbc); /* rteval specific database functions */ +int db_wait_notification(dbconn *dbc, const int *shutdown, const char *listenfor); parseJob_t *db_get_submissionqueue_job(dbconn *dbc, pthread_mutex_t *mtx); int db_update_submissionqueue(dbconn *dbc, unsigned int submid, int status); int db_register_system(dbconn *dbc, xsltStylesheet *xslt, xmlDoc *summaryxml); diff --git a/server/parser/rteval_parserd.c b/server/parser/rteval_parserd.c index 92cabfa..c14e1a6 100644 --- a/server/parser/rteval_parserd.c +++ b/server/parser/rteval_parserd.c @@ -125,12 +125,17 @@ int process_submission_queue(dbconn *dbc, mqd_t msgq) { if( !job ) { fprintf(stderr, "** ERROR ** Failed to get submission queue job - shutting down\n"); shutdown = 1; - rc = 11; - break; + rc = 1; + goto exit; } if( job->status == jbNONE ) { free_nullsafe(job); - sleep(15); + if( db_wait_notification(dbc, &shutdown, "rteval_submq") < 1 ) { + fprintf(stderr, "** ERROR ** Failed to wait for DB notification\n"); + shutdown = 1; + rc = 1; + goto exit; + } continue; } @@ -145,16 +150,17 @@ int process_submission_queue(dbconn *dbc, mqd_t msgq) { fprintf(stderr, "** ERROR ** Could not send parse job to the queue\n"); shutdown = 1; rc = 2; - break; + goto exit; } else if( errno == EAGAIN ) { fprintf(stderr, - "** ERROR ** Message queue filled up. " + "** WARNING ** Message queue filled up. " "Will not add new messages to queue for the next 60 seconds\n"); sleep(60); } } while( (errno == EAGAIN) ); free_nullsafe(job); } + exit: return rc; } |
