summaryrefslogtreecommitdiffstats
path: root/server/parser/pgsql.c
diff options
context:
space:
mode:
Diffstat (limited to 'server/parser/pgsql.c')
-rw-r--r--server/parser/pgsql.c80
1 files changed, 80 insertions, 0 deletions
diff --git a/server/parser/pgsql.c b/server/parser/pgsql.c
index 595df97..168bdaa 100644
--- a/server/parser/pgsql.c
+++ b/server/parser/pgsql.c
@@ -21,6 +21,7 @@
#include <string.h>
#include <pthread.h>
#include <assert.h>
+#include <errno.h>
#include <libpq-fe.h>
@@ -379,6 +380,84 @@ int db_rollback(dbconn *dbc) {
/**
+ * This function blocks until a notification is received from the database
+ *
+ * @param dbc Database connection
+ * @param listenfor Name to be used when calling LISTEN
+ *
+ * @return Returns 1 on successful waiting, otherwise -1
+ */
+int db_wait_notification(dbconn *dbc, const int *shutdown, const char *listenfor) {
+ int sock, ret = 0;
+ PGresult *dbres = NULL;
+ PGnotify *notify = NULL;
+ fd_set input_mask;
+ char *sql = NULL;
+
+ sql = malloc_nullsafe(strlen_nullsafe(listenfor) + 12);
+ assert( sql != NULL );
+
+ // Initiate listening
+ sprintf(sql, "LISTEN %s\n", listenfor);
+ dbres = PQexec((PGconn *) dbc, sql);
+ if( PQresultStatus(dbres) != PGRES_COMMAND_OK ) {
+ fprintf(stderr, "** ERROR ** SQL %s\n", PQresultErrorMessage(dbres));
+ free_nullsafe(sql);
+ PQclear(dbres);
+ return -1;
+ }
+
+ // Start listening and waiting
+ while( ret == 0 ) {
+ sock = PQsocket((PGconn *) dbc);
+ if (sock < 0) {
+ // shouldn't happen
+ ret = -1;
+ break;
+ }
+
+ // Wait for something to happen on the database socket
+ FD_ZERO(&input_mask);
+ FD_SET(sock, &input_mask);
+ if (select(sock + 1, &input_mask, NULL, NULL, NULL) < 0) {
+ // If the shutdown flag is set, select() will fail due to a signal. Only
+ // report errors if we're not shutting down, or else exit normally with
+ // successful waiting.
+ if( *shutdown == 0 ) {
+ fprintf(stderr, "** ERROR ** select() failed: %s\n", strerror(errno));
+ ret = -1;
+ } else {
+ ret = 1;
+ }
+ break;
+ }
+
+ // Process the event
+ PQconsumeInput((PGconn *) dbc);
+ while ((notify = PQnotifies((PGconn *) dbc)) != NULL) {
+ // If a notification was received, inform and exit with success.
+ fprintf(stderr, "** INFO ** Received notfication from pid %d\n",
+ notify->be_pid);
+ PQfreemem(notify);
+ ret = 1;
+ break;
+ }
+ }
+
+ // Stop listening when we exit
+ sprintf(sql, "UNLISTEN %s\n", listenfor);
+ dbres = PQexec((PGconn *) dbc, sql);
+ if( PQresultStatus(dbres) != PGRES_COMMAND_OK ) {
+ fprintf(stderr, "** ERROR ** SQL %s\n", PQresultErrorMessage(dbres));
+ free_nullsafe(sql);
+ ret = -1;
+ }
+ PQclear(dbres);
+ return ret;
+}
+
+
+/**
* Retrive the first available submitted report
*
* @param dbc Database connection
@@ -391,6 +470,7 @@ parseJob_t *db_get_submissionqueue_job(dbconn *dbc, pthread_mutex_t *mtx) {
parseJob_t *job = NULL;
PGresult *res = NULL;
char sql[4098];
+
job = (parseJob_t *) malloc_nullsafe(sizeof(parseJob_t));
if( !job ) {
fprintf(stderr, "** ERROR ** Failed to allocate memory for a new parsing job\n");