diff options
Diffstat (limited to 'server/parser/pgsql.c')
| -rw-r--r-- | server/parser/pgsql.c | 80 |
1 files changed, 80 insertions, 0 deletions
diff --git a/server/parser/pgsql.c b/server/parser/pgsql.c index 595df97..168bdaa 100644 --- a/server/parser/pgsql.c +++ b/server/parser/pgsql.c @@ -21,6 +21,7 @@ #include <string.h> #include <pthread.h> #include <assert.h> +#include <errno.h> #include <libpq-fe.h> @@ -379,6 +380,84 @@ int db_rollback(dbconn *dbc) { /** + * This function blocks until a notification is received from the database + * + * @param dbc Database connection + * @param listenfor Name to be used when calling LISTEN + * + * @return Returns 1 on successful waiting, otherwise -1 + */ +int db_wait_notification(dbconn *dbc, const int *shutdown, const char *listenfor) { + int sock, ret = 0; + PGresult *dbres = NULL; + PGnotify *notify = NULL; + fd_set input_mask; + char *sql = NULL; + + sql = malloc_nullsafe(strlen_nullsafe(listenfor) + 12); + assert( sql != NULL ); + + // Initiate listening + sprintf(sql, "LISTEN %s\n", listenfor); + dbres = PQexec((PGconn *) dbc, sql); + if( PQresultStatus(dbres) != PGRES_COMMAND_OK ) { + fprintf(stderr, "** ERROR ** SQL %s\n", PQresultErrorMessage(dbres)); + free_nullsafe(sql); + PQclear(dbres); + return -1; + } + + // Start listening and waiting + while( ret == 0 ) { + sock = PQsocket((PGconn *) dbc); + if (sock < 0) { + // shouldn't happen + ret = -1; + break; + } + + // Wait for something to happen on the database socket + FD_ZERO(&input_mask); + FD_SET(sock, &input_mask); + if (select(sock + 1, &input_mask, NULL, NULL, NULL) < 0) { + // If the shutdown flag is set, select() will fail due to a signal. Only + // report errors if we're not shutting down, or else exit normally with + // successful waiting. + if( *shutdown == 0 ) { + fprintf(stderr, "** ERROR ** select() failed: %s\n", strerror(errno)); + ret = -1; + } else { + ret = 1; + } + break; + } + + // Process the event + PQconsumeInput((PGconn *) dbc); + while ((notify = PQnotifies((PGconn *) dbc)) != NULL) { + // If a notification was received, inform and exit with success. + fprintf(stderr, "** INFO ** Received notfication from pid %d\n", + notify->be_pid); + PQfreemem(notify); + ret = 1; + break; + } + } + + // Stop listening when we exit + sprintf(sql, "UNLISTEN %s\n", listenfor); + dbres = PQexec((PGconn *) dbc, sql); + if( PQresultStatus(dbres) != PGRES_COMMAND_OK ) { + fprintf(stderr, "** ERROR ** SQL %s\n", PQresultErrorMessage(dbres)); + free_nullsafe(sql); + ret = -1; + } + PQclear(dbres); + return ret; +} + + +/** * Retrive the first available submitted report * * @param dbc Database connection @@ -391,6 +470,7 @@ parseJob_t *db_get_submissionqueue_job(dbconn *dbc, pthread_mutex_t *mtx) { parseJob_t *job = NULL; PGresult *res = NULL; char sql[4098]; + job = (parseJob_t *) malloc_nullsafe(sizeof(parseJob_t)); if( !job ) { fprintf(stderr, "** ERROR ** Failed to allocate memory for a new parsing job\n"); |
