From 143167cc63b7c809abcb048ec4454d8c5cebcb42 Mon Sep 17 00:00:00 2001 From: David Sommerseth Date: Wed, 21 Oct 2009 10:31:56 +0200 Subject: Implemented database LISTEN via db_wait_notification() This introduces async waiting for report parsing. Commit 2584a3c36c97c757dc80108b898eede52b91dc44 introduced sending a NOTIFY whenever an INSERT is done into the submissionqueue table. With the db_wait_notification() the process_submission_queue() function will sleep until a a notification is received. The LISTEN, UNLISTEN and NOTIFY SQL commands are PostgreSQL dependent. Other database implementations may use what that database supports, or just simply do a sleep() to change the behaviour to a polling model. --- server/parser/pgsql.c | 80 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 80 insertions(+) (limited to 'server/parser/pgsql.c') diff --git a/server/parser/pgsql.c b/server/parser/pgsql.c index 595df97..168bdaa 100644 --- a/server/parser/pgsql.c +++ b/server/parser/pgsql.c @@ -21,6 +21,7 @@ #include #include #include +#include #include @@ -378,6 +379,84 @@ int db_rollback(dbconn *dbc) { } +/** + * This function blocks until a notification is received from the database + * + * @param dbc Database connection + * @param listenfor Name to be used when calling LISTEN + * + * @return Returns 1 on successful waiting, otherwise -1 + */ +int db_wait_notification(dbconn *dbc, const int *shutdown, const char *listenfor) { + int sock, ret = 0; + PGresult *dbres = NULL; + PGnotify *notify = NULL; + fd_set input_mask; + char *sql = NULL; + + sql = malloc_nullsafe(strlen_nullsafe(listenfor) + 12); + assert( sql != NULL ); + + // Initiate listening + sprintf(sql, "LISTEN %s\n", listenfor); + dbres = PQexec((PGconn *) dbc, sql); + if( PQresultStatus(dbres) != PGRES_COMMAND_OK ) { + fprintf(stderr, "** ERROR ** SQL %s\n", PQresultErrorMessage(dbres)); + free_nullsafe(sql); + PQclear(dbres); + return -1; + } + + // Start listening and waiting + while( ret == 0 ) { + sock = PQsocket((PGconn *) dbc); + if (sock < 0) { + // shouldn't happen + ret = -1; + break; + } + + // Wait for something to happen on the database socket + FD_ZERO(&input_mask); + FD_SET(sock, &input_mask); + if (select(sock + 1, &input_mask, NULL, NULL, NULL) < 0) { + // If the shutdown flag is set, select() will fail due to a signal. Only + // report errors if we're not shutting down, or else exit normally with + // successful waiting. + if( *shutdown == 0 ) { + fprintf(stderr, "** ERROR ** select() failed: %s\n", strerror(errno)); + ret = -1; + } else { + ret = 1; + } + break; + } + + // Process the event + PQconsumeInput((PGconn *) dbc); + while ((notify = PQnotifies((PGconn *) dbc)) != NULL) { + // If a notification was received, inform and exit with success. + fprintf(stderr, "** INFO ** Received notfication from pid %d\n", + notify->be_pid); + PQfreemem(notify); + ret = 1; + break; + } + } + + // Stop listening when we exit + sprintf(sql, "UNLISTEN %s\n", listenfor); + dbres = PQexec((PGconn *) dbc, sql); + if( PQresultStatus(dbres) != PGRES_COMMAND_OK ) { + fprintf(stderr, "** ERROR ** SQL %s\n", PQresultErrorMessage(dbres)); + free_nullsafe(sql); + ret = -1; + } + PQclear(dbres); + return ret; +} + + /** * Retrive the first available submitted report * @@ -391,6 +470,7 @@ parseJob_t *db_get_submissionqueue_job(dbconn *dbc, pthread_mutex_t *mtx) { parseJob_t *job = NULL; PGresult *res = NULL; char sql[4098]; + job = (parseJob_t *) malloc_nullsafe(sizeof(parseJob_t)); if( !job ) { fprintf(stderr, "** ERROR ** Failed to allocate memory for a new parsing job\n"); -- cgit