diff options
Diffstat (limited to 'runtime/stpd/librelay.c')
-rw-r--r-- | runtime/stpd/librelay.c | 62 |
1 files changed, 16 insertions, 46 deletions
diff --git a/runtime/stpd/librelay.c b/runtime/stpd/librelay.c index 33dc20fa..89c3b767 100644 --- a/runtime/stpd/librelay.c +++ b/runtime/stpd/librelay.c @@ -79,8 +79,6 @@ static int transport_mode; static int ncpus; static int print_totals; static int exiting; -static int processing; -static pthread_mutex_t processing_mutex; /* per-cpu data */ static int relay_file[NR_CPUS]; @@ -277,13 +275,23 @@ static int kill_percpu_threads(int n) if (pthread_cancel(reader[i]) == 0) killed++; } - if (killed != n) - fprintf(stderr, "WARNING: couldn't kill all per-cpu threads: %d killed, %d total\n", killed, n); return killed; } /** + * wait_for_percpu_threads - wait for all threads to exit + * @n: number of threads to wait for + */ +static void wait_for_percpu_threads(int n) +{ + int i; + + for (i = 0; i < n; i++) + pthread_join(reader[i], NULL); +} + +/** * process_subbufs - write ready subbufs to disk */ static int process_subbufs(struct buf_info *info) @@ -345,10 +353,6 @@ static void *reader_thread(void *data) rc = read(proc_file[cpu], &status[cpu].info, sizeof(struct buf_info)); - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); - pthread_mutex_lock(&processing_mutex); - processing++; - pthread_mutex_unlock(&processing_mutex); subbufs_consumed = process_subbufs(&status[cpu].info); if (subbufs_consumed) { if (subbufs_consumed > status[cpu].max_backlog) @@ -359,34 +363,11 @@ static void *reader_thread(void *data) if (write (proc_file[cpu], &consumed_info, sizeof(struct consumed_info)) < 0) fprintf(stderr,"WARNING: writing consumed info failed.\n"); } - pthread_mutex_lock(&processing_mutex); - processing--; - pthread_mutex_unlock(&processing_mutex); - pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); + if (status[cpu].info.flushing) + pthread_exit(NULL); } while (1); } -static void read_last_buffers(void) -{ - int cpu, rc; - struct consumed_info consumed_info; - unsigned subbufs_consumed; - - for (cpu = 0; cpu < ncpus; cpu++) { - rc = read (proc_file[cpu], &status[cpu].info, sizeof(struct buf_info)); - subbufs_consumed = process_subbufs(&status[cpu].info); - if (subbufs_consumed) { - if (subbufs_consumed > status[cpu].max_backlog) - status[cpu].max_backlog = subbufs_consumed; - status[cpu].info.consumed += subbufs_consumed; - consumed_info.cpu = cpu; - consumed_info.consumed = subbufs_consumed; - if (write (proc_file[cpu], &consumed_info, sizeof(struct consumed_info)) < 0) - fprintf(stderr,"WARNING: writing consumed info failed.\n"); - } - } -} - #define RELAYFS_MAGIC 0xF0B4A981 #define DEBUGFS_MAGIC 0x64626720 /** @@ -690,18 +671,8 @@ static void cleanup_and_exit (int closed) fprintf(stderr,"\nWaititing for processes to exit\n"); while(wait(NULL) > 0); - if (transport_mode == STP_TRANSPORT_RELAYFS && relay_file[0] > 0) { - kill_percpu_threads(ncpus); - while(1) { - pthread_mutex_lock(&processing_mutex); - if (!processing) { - pthread_mutex_unlock(&processing_mutex); - break; - } - pthread_mutex_unlock(&processing_mutex); - } - read_last_buffers(); - } + if (transport_mode == STP_TRANSPORT_RELAYFS && relay_file[0] > 0) + wait_for_percpu_threads(ncpus); close_proc_files(); @@ -768,7 +739,6 @@ int stp_main_loop(void) FILE *ofp = stdout; setvbuf(ofp, (char *)NULL, _IOLBF, 0); - pthread_mutex_init(&processing_mutex, NULL); signal(SIGINT, sigproc); signal(SIGTERM, sigproc); |