diff options
| author | David Sommerseth <davids@redhat.com> | 2009-10-23 15:26:10 +0200 |
|---|---|---|
| committer | David Sommerseth <davids@redhat.com> | 2009-10-23 15:26:10 +0200 |
| commit | 70c850c13dabed16a69bb735ff66e5ff2db8a669 (patch) | |
| tree | 18a14a6814cd717e090e0eda9ede3439007b55ff /server/parser | |
| parent | b32b671718dfd44ad1a72b0f4f4d84eabf6c7724 (diff) | |
| download | rteval-70c850c13dabed16a69bb735ff66e5ff2db8a669.tar.gz rteval-70c850c13dabed16a69bb735ff66e5ff2db8a669.tar.xz rteval-70c850c13dabed16a69bb735ff66e5ff2db8a669.zip | |
Get rid of the sleep() calls and use blocking POSIX MQ calls
This way, the rteval_parserd will also shutdown almost instantly
unless one of the threads are in the progress of parsing a report.
Diffstat (limited to 'server/parser')
| -rw-r--r-- | server/parser/parsethread.c | 10 | ||||
| -rw-r--r-- | server/parser/rteval_parserd.c | 43 |
2 files changed, 41 insertions, 12 deletions
diff --git a/server/parser/parsethread.c b/server/parser/parsethread.c index 11e9e5c..fbf6777 100644 --- a/server/parser/parsethread.c +++ b/server/parser/parsethread.c @@ -269,8 +269,6 @@ void *parsethread(void *thrargs) { (*(args->threadcount)) += 1; pthread_mutex_unlock(args->mtx_thrcnt); - sleep( args->id * 2 ); // Avoids most of the threads to do the polling in parallel - // Polling loop while( *(args->shutdown) == 0 ) { int len = 0; @@ -303,6 +301,11 @@ void *parsethread(void *thrargs) { pthread_exit((void *) 1); } + // Ignore whatever message if the shutdown flag is set. + if( *(args->shutdown) != 0 ) { + break; + } + // If we have a message, then process the parse job if( (errno != EAGAIN) && (len > 0) ) { int res = 0; @@ -322,9 +325,6 @@ void *parsethread(void *thrargs) { "Failed to mark submid %i as STAT_INPROG", jobinfo.submid); } - } else { - // If no message was retrieved, sleep for a little while - sleep(15); } } writelog(args->dbc->log, LOG_DEBUG, "[Thread %i] Shut down", args->id); diff --git a/server/parser/rteval_parserd.c b/server/parser/rteval_parserd.c index 5dda49c..be883b5 100644 --- a/server/parser/rteval_parserd.c +++ b/server/parser/rteval_parserd.c @@ -134,7 +134,7 @@ unsigned int get_mqueue_msg_max(LogContext *log) { int process_submission_queue(dbconn *dbc, mqd_t msgq, int *activethreads) { pthread_mutex_t mtx_submq = PTHREAD_MUTEX_INITIALIZER; parseJob_t *job = NULL; - int rc = 0; + int rc = 0, i, actthr_cp = 0; while( shutdown == 0 ) { // Check status if the worker threads @@ -199,7 +199,35 @@ int process_submission_queue(dbconn *dbc, mqd_t msgq, int *activethreads) { } while( (errno == EAGAIN) ); free_nullsafe(job); } + exit: + // Send empty messages to the threads, to make them have a look at the shutdown flag + job = (parseJob_t *) malloc_nullsafe(dbc->log, sizeof(parseJob_t)); + errno = 0; + // Need to make a copy, as *activethreads will change when threads completes shutdown + actthr_cp = *activethreads; + for( i = 0; i < actthr_cp; i++ ) { + do { + int res; + + writelog(dbc->log, LOG_DEBUG, "%s shutdown message %i of %i", + (errno == EAGAIN ? "Resending" : "Sending"), i+1, *activethreads); + errno = 0; + res = mq_send(msgq, (char *) job, sizeof(parseJob_t), 1); + if( (res < 0) && (errno != EAGAIN) ) { + writelog(dbc->log, LOG_EMERG, + "Could not send parse job to the queue. " + "Shutting down!"); + shutdown = 1; + return rc; + } else if( errno == EAGAIN ) { + writelog(dbc->log, LOG_WARNING, + "Message queue filled up. " + "Will not add new messages to queue for the next 10 seconds"); + sleep(10); + } + } while( (errno == EAGAIN) ); + } return rc; } @@ -276,7 +304,7 @@ int main(int argc, char **argv) { threadData_t **thrdata = NULL; struct mq_attr msgq_attr; mqd_t msgq = 0; - int i,rc, mq_init = 0, max_threads = 0, activethreads = 0; + int i,rc, mq_init = 0, max_threads = 0, started_threads = 0, activethreads = 0; // Initialise XML and XSLT libraries xsltInit(); @@ -327,7 +355,7 @@ int main(int argc, char **argv) { msgq_attr.mq_maxmsg = get_mqueue_msg_max(logctx); msgq_attr.mq_msgsize = sizeof(parseJob_t); msgq_attr.mq_flags = O_NONBLOCK; - msgq = mq_open("/rteval_parsequeue", O_RDWR | O_CREAT | O_NONBLOCK, 0600, &msgq_attr); + msgq = mq_open("/rteval_parsequeue", O_RDWR | O_CREAT, 0600, &msgq_attr); if( msgq < 0 ) { writelog(logctx, LOG_EMERG, "Could not open message queue: %s", strerror(errno)); @@ -406,7 +434,7 @@ int main(int argc, char **argv) { } // Setup signal catching - signal(SIGINT, sigcatch); + signal(SIGINT, sigcatch); signal(SIGTERM, sigcatch); signal(SIGHUP, SIG_IGN); signal(SIGUSR1, sigcatch); @@ -422,6 +450,7 @@ int main(int argc, char **argv) { rc = 3; goto exit; } + started_threads++; } // Main routine @@ -437,7 +466,7 @@ int main(int argc, char **argv) { // Clean up all threads for( i = 0; i < max_threads; i++ ) { // Wait for all threads to exit - if( threads && threads[i] ) { + if( (i < started_threads) && threads && threads[i] ) { void *thread_rc; int j_rc; @@ -447,9 +476,9 @@ int main(int argc, char **argv) { i, strerror(j_rc)); } pthread_attr_destroy(thread_attrs[i]); - free_nullsafe(threads[i]); - free_nullsafe(thread_attrs[i]); } + free_nullsafe(threads[i]); + free_nullsafe(thread_attrs[i]); // Disconnect threads database connection if( thrdata && thrdata[i] ) { |
