summaryrefslogtreecommitdiffstats
path: root/server/parser
diff options
context:
space:
mode:
authorDavid Sommerseth <davids@redhat.com>2009-10-23 15:26:10 +0200
committerDavid Sommerseth <davids@redhat.com>2009-10-23 15:26:10 +0200
commit70c850c13dabed16a69bb735ff66e5ff2db8a669 (patch)
tree18a14a6814cd717e090e0eda9ede3439007b55ff /server/parser
parentb32b671718dfd44ad1a72b0f4f4d84eabf6c7724 (diff)
downloadrteval-70c850c13dabed16a69bb735ff66e5ff2db8a669.tar.gz
rteval-70c850c13dabed16a69bb735ff66e5ff2db8a669.tar.xz
rteval-70c850c13dabed16a69bb735ff66e5ff2db8a669.zip
Get rid of the sleep() calls and use blocking POSIX MQ calls
This way, the rteval_parserd will also shutdown almost instantly unless one of the threads are in the progress of parsing a report.
Diffstat (limited to 'server/parser')
-rw-r--r--server/parser/parsethread.c10
-rw-r--r--server/parser/rteval_parserd.c43
2 files changed, 41 insertions, 12 deletions
diff --git a/server/parser/parsethread.c b/server/parser/parsethread.c
index 11e9e5c..fbf6777 100644
--- a/server/parser/parsethread.c
+++ b/server/parser/parsethread.c
@@ -269,8 +269,6 @@ void *parsethread(void *thrargs) {
(*(args->threadcount)) += 1;
pthread_mutex_unlock(args->mtx_thrcnt);
- sleep( args->id * 2 ); // Avoids most of the threads to do the polling in parallel
-
// Polling loop
while( *(args->shutdown) == 0 ) {
int len = 0;
@@ -303,6 +301,11 @@ void *parsethread(void *thrargs) {
pthread_exit((void *) 1);
}
+ // Ignore whatever message if the shutdown flag is set.
+ if( *(args->shutdown) != 0 ) {
+ break;
+ }
+
// If we have a message, then process the parse job
if( (errno != EAGAIN) && (len > 0) ) {
int res = 0;
@@ -322,9 +325,6 @@ void *parsethread(void *thrargs) {
"Failed to mark submid %i as STAT_INPROG",
jobinfo.submid);
}
- } else {
- // If no message was retrieved, sleep for a little while
- sleep(15);
}
}
writelog(args->dbc->log, LOG_DEBUG, "[Thread %i] Shut down", args->id);
diff --git a/server/parser/rteval_parserd.c b/server/parser/rteval_parserd.c
index 5dda49c..be883b5 100644
--- a/server/parser/rteval_parserd.c
+++ b/server/parser/rteval_parserd.c
@@ -134,7 +134,7 @@ unsigned int get_mqueue_msg_max(LogContext *log) {
int process_submission_queue(dbconn *dbc, mqd_t msgq, int *activethreads) {
pthread_mutex_t mtx_submq = PTHREAD_MUTEX_INITIALIZER;
parseJob_t *job = NULL;
- int rc = 0;
+ int rc = 0, i, actthr_cp = 0;
while( shutdown == 0 ) {
// Check status if the worker threads
@@ -199,7 +199,35 @@ int process_submission_queue(dbconn *dbc, mqd_t msgq, int *activethreads) {
} while( (errno == EAGAIN) );
free_nullsafe(job);
}
+
exit:
+ // Send empty messages to the threads, to make them have a look at the shutdown flag
+ job = (parseJob_t *) malloc_nullsafe(dbc->log, sizeof(parseJob_t));
+ errno = 0;
+ // Need to make a copy, as *activethreads will change when threads completes shutdown
+ actthr_cp = *activethreads;
+ for( i = 0; i < actthr_cp; i++ ) {
+ do {
+ int res;
+
+ writelog(dbc->log, LOG_DEBUG, "%s shutdown message %i of %i",
+ (errno == EAGAIN ? "Resending" : "Sending"), i+1, *activethreads);
+ errno = 0;
+ res = mq_send(msgq, (char *) job, sizeof(parseJob_t), 1);
+ if( (res < 0) && (errno != EAGAIN) ) {
+ writelog(dbc->log, LOG_EMERG,
+ "Could not send parse job to the queue. "
+ "Shutting down!");
+ shutdown = 1;
+ return rc;
+ } else if( errno == EAGAIN ) {
+ writelog(dbc->log, LOG_WARNING,
+ "Message queue filled up. "
+ "Will not add new messages to queue for the next 10 seconds");
+ sleep(10);
+ }
+ } while( (errno == EAGAIN) );
+ }
return rc;
}
@@ -276,7 +304,7 @@ int main(int argc, char **argv) {
threadData_t **thrdata = NULL;
struct mq_attr msgq_attr;
mqd_t msgq = 0;
- int i,rc, mq_init = 0, max_threads = 0, activethreads = 0;
+ int i,rc, mq_init = 0, max_threads = 0, started_threads = 0, activethreads = 0;
// Initialise XML and XSLT libraries
xsltInit();
@@ -327,7 +355,7 @@ int main(int argc, char **argv) {
msgq_attr.mq_maxmsg = get_mqueue_msg_max(logctx);
msgq_attr.mq_msgsize = sizeof(parseJob_t);
msgq_attr.mq_flags = O_NONBLOCK;
- msgq = mq_open("/rteval_parsequeue", O_RDWR | O_CREAT | O_NONBLOCK, 0600, &msgq_attr);
+ msgq = mq_open("/rteval_parsequeue", O_RDWR | O_CREAT, 0600, &msgq_attr);
if( msgq < 0 ) {
writelog(logctx, LOG_EMERG,
"Could not open message queue: %s", strerror(errno));
@@ -406,7 +434,7 @@ int main(int argc, char **argv) {
}
// Setup signal catching
- signal(SIGINT, sigcatch);
+ signal(SIGINT, sigcatch);
signal(SIGTERM, sigcatch);
signal(SIGHUP, SIG_IGN);
signal(SIGUSR1, sigcatch);
@@ -422,6 +450,7 @@ int main(int argc, char **argv) {
rc = 3;
goto exit;
}
+ started_threads++;
}
// Main routine
@@ -437,7 +466,7 @@ int main(int argc, char **argv) {
// Clean up all threads
for( i = 0; i < max_threads; i++ ) {
// Wait for all threads to exit
- if( threads && threads[i] ) {
+ if( (i < started_threads) && threads && threads[i] ) {
void *thread_rc;
int j_rc;
@@ -447,9 +476,9 @@ int main(int argc, char **argv) {
i, strerror(j_rc));
}
pthread_attr_destroy(thread_attrs[i]);
- free_nullsafe(threads[i]);
- free_nullsafe(thread_attrs[i]);
}
+ free_nullsafe(threads[i]);
+ free_nullsafe(thread_attrs[i]);
// Disconnect threads database connection
if( thrdata && thrdata[i] ) {