diff options
| author | David Sommerseth <davids@redhat.com> | 2009-10-21 10:31:56 +0200 |
|---|---|---|
| committer | David Sommerseth <davids@redhat.com> | 2009-10-21 10:31:56 +0200 |
| commit | 143167cc63b7c809abcb048ec4454d8c5cebcb42 (patch) | |
| tree | c7252293fbaaa4349f76f7ddf1ea8afd610b3312 /server/parser/pgsql.c | |
| parent | 6e90b1e99b02a9cac45686c63ddeeae044466aca (diff) | |
| download | rteval-143167cc63b7c809abcb048ec4454d8c5cebcb42.tar.gz rteval-143167cc63b7c809abcb048ec4454d8c5cebcb42.tar.xz rteval-143167cc63b7c809abcb048ec4454d8c5cebcb42.zip | |
Implemented database LISTEN via db_wait_notification()
This introduces async waiting for report parsing. Commit
2584a3c36c97c757dc80108b898eede52b91dc44 introduced sending
a NOTIFY whenever an INSERT is done into the submissionqueue table.
With the db_wait_notification() the process_submission_queue() function
will sleep until a a notification is received.
The LISTEN, UNLISTEN and NOTIFY SQL commands are PostgreSQL dependent.
Other database implementations may use what that database supports, or
just simply do a sleep() to change the behaviour to a polling model.
Diffstat (limited to 'server/parser/pgsql.c')
| -rw-r--r-- | server/parser/pgsql.c | 80 |
1 files changed, 80 insertions, 0 deletions
diff --git a/server/parser/pgsql.c b/server/parser/pgsql.c index 595df97..168bdaa 100644 --- a/server/parser/pgsql.c +++ b/server/parser/pgsql.c @@ -21,6 +21,7 @@ #include <string.h> #include <pthread.h> #include <assert.h> +#include <errno.h> #include <libpq-fe.h> @@ -379,6 +380,84 @@ int db_rollback(dbconn *dbc) { /** + * This function blocks until a notification is received from the database + * + * @param dbc Database connection + * @param listenfor Name to be used when calling LISTEN + * + * @return Returns 1 on successful waiting, otherwise -1 + */ +int db_wait_notification(dbconn *dbc, const int *shutdown, const char *listenfor) { + int sock, ret = 0; + PGresult *dbres = NULL; + PGnotify *notify = NULL; + fd_set input_mask; + char *sql = NULL; + + sql = malloc_nullsafe(strlen_nullsafe(listenfor) + 12); + assert( sql != NULL ); + + // Initiate listening + sprintf(sql, "LISTEN %s\n", listenfor); + dbres = PQexec((PGconn *) dbc, sql); + if( PQresultStatus(dbres) != PGRES_COMMAND_OK ) { + fprintf(stderr, "** ERROR ** SQL %s\n", PQresultErrorMessage(dbres)); + free_nullsafe(sql); + PQclear(dbres); + return -1; + } + + // Start listening and waiting + while( ret == 0 ) { + sock = PQsocket((PGconn *) dbc); + if (sock < 0) { + // shouldn't happen + ret = -1; + break; + } + + // Wait for something to happen on the database socket + FD_ZERO(&input_mask); + FD_SET(sock, &input_mask); + if (select(sock + 1, &input_mask, NULL, NULL, NULL) < 0) { + // If the shutdown flag is set, select() will fail due to a signal. Only + // report errors if we're not shutting down, or else exit normally with + // successful waiting. + if( *shutdown == 0 ) { + fprintf(stderr, "** ERROR ** select() failed: %s\n", strerror(errno)); + ret = -1; + } else { + ret = 1; + } + break; + } + + // Process the event + PQconsumeInput((PGconn *) dbc); + while ((notify = PQnotifies((PGconn *) dbc)) != NULL) { + // If a notification was received, inform and exit with success. + fprintf(stderr, "** INFO ** Received notfication from pid %d\n", + notify->be_pid); + PQfreemem(notify); + ret = 1; + break; + } + } + + // Stop listening when we exit + sprintf(sql, "UNLISTEN %s\n", listenfor); + dbres = PQexec((PGconn *) dbc, sql); + if( PQresultStatus(dbres) != PGRES_COMMAND_OK ) { + fprintf(stderr, "** ERROR ** SQL %s\n", PQresultErrorMessage(dbres)); + free_nullsafe(sql); + ret = -1; + } + PQclear(dbres); + return ret; +} + + +/** * Retrive the first available submitted report * * @param dbc Database connection @@ -391,6 +470,7 @@ parseJob_t *db_get_submissionqueue_job(dbconn *dbc, pthread_mutex_t *mtx) { parseJob_t *job = NULL; PGresult *res = NULL; char sql[4098]; + job = (parseJob_t *) malloc_nullsafe(sizeof(parseJob_t)); if( !job ) { fprintf(stderr, "** ERROR ** Failed to allocate memory for a new parsing job\n"); |
