summaryrefslogtreecommitdiffstats
path: root/runtime/stpd/librelay.c
diff options
context:
space:
mode:
authortrz <trz>2005-10-14 18:35:30 +0000
committertrz <trz>2005-10-14 18:35:30 +0000
commit20b1a57d555b30478e224eec7c358ee3f7c7a6e9 (patch)
tree3905901b28bb2724a3ad037ce33f8229360ecb82 /runtime/stpd/librelay.c
parentab9c6bcec56162979d8b387b613530861a363876 (diff)
downloadsystemtap-steved-20b1a57d555b30478e224eec7c358ee3f7c7a6e9.tar.gz
systemtap-steved-20b1a57d555b30478e224eec7c358ee3f7c7a6e9.tar.xz
systemtap-steved-20b1a57d555b30478e224eec7c358ee3f7c7a6e9.zip
Fix for PR 1476
Diffstat (limited to 'runtime/stpd/librelay.c')
-rw-r--r--runtime/stpd/librelay.c35
1 files changed, 26 insertions, 9 deletions
diff --git a/runtime/stpd/librelay.c b/runtime/stpd/librelay.c
index bd550be3..b884e91d 100644
--- a/runtime/stpd/librelay.c
+++ b/runtime/stpd/librelay.c
@@ -66,6 +66,8 @@ static int transport_mode;
static int ncpus;
static int print_totals;
static int exiting;
+static int processing;
+static pthread_mutex_t processing_mutex;
/* per-cpu data */
static int relay_file[NR_CPUS];
@@ -87,8 +89,6 @@ extern char *target_cmd;
/* per-cpu buffer info */
static struct buf_status
{
- pthread_mutex_t ready_mutex;
- pthread_cond_t ready_cond;
struct buf_info info;
unsigned max_backlog; /* max # sub-buffers ready at one time */
} status[NR_CPUS];
@@ -184,8 +184,6 @@ static int open_relayfs_files(int cpu, const char *relay_filebase)
char tmp[PATH_MAX];
memset(&status[cpu], 0, sizeof(struct buf_status));
- pthread_mutex_init(&status[cpu].ready_mutex, NULL);
- pthread_cond_init(&status[cpu].ready_cond, NULL);
status[cpu].info.cpu = cpu;
sprintf(tmp, "%s%d", relay_filebase, cpu);
@@ -316,14 +314,21 @@ static void *reader_thread(void *data)
rc = poll(&pollfd, 1, -1);
if (rc < 0) {
if (errno != EINTR) {
- fprintf(stderr, "ERROR: poll error: %s\n",strerror(errno));
+ fprintf(stderr, "ERROR: poll error: %s\n",
+ strerror(errno));
exit(1);
}
- fprintf(stderr, "WARNING: poll warning: %s\n",strerror(errno));
+ fprintf(stderr, "WARNING: poll warning: %s\n",
+ strerror(errno));
rc = 0;
}
- rc = read (proc_file[cpu], &status[cpu].info, sizeof(struct buf_info));
+ rc = read(proc_file[cpu], &status[cpu].info,
+ sizeof(struct buf_info));
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
+ pthread_mutex_lock(&processing_mutex);
+ processing++;
+ pthread_mutex_unlock(&processing_mutex);
subbufs_consumed = process_subbufs(&status[cpu].info);
if (subbufs_consumed) {
if (subbufs_consumed > status[cpu].max_backlog)
@@ -334,6 +339,10 @@ static void *reader_thread(void *data)
if (write (proc_file[cpu], &consumed_info, sizeof(struct consumed_info)) < 0)
fprintf(stderr,"WARNING: writing consumed info failed.\n");
}
+ pthread_mutex_lock(&processing_mutex);
+ processing--;
+ pthread_mutex_unlock(&processing_mutex);
+ pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
} while (1);
}
@@ -553,7 +562,6 @@ static int merge_output(void)
fputc_unlocked (c, ofp);
}
if (min && ++count != min) {
- // fprintf(stderr, "got %ld. expected %ld\n", min, count);
count = min;
dropped++ ;
}
@@ -588,6 +596,14 @@ static void cleanup_and_exit (int closed)
if (transport_mode == STP_TRANSPORT_RELAYFS) {
kill_percpu_threads(ncpus);
+ while(1) {
+ pthread_mutex_lock(&processing_mutex);
+ if (!processing) {
+ pthread_mutex_unlock(&processing_mutex);
+ break;
+ }
+ pthread_mutex_unlock(&processing_mutex);
+ }
read_last_buffers();
}
@@ -622,7 +638,6 @@ static void sigproc(int signum __attribute__((unused)))
{
signal(SIGINT, sigproc);
signal(SIGTERM, sigproc);
- //fprintf(stderr, "Exiting...\n");
send_request(STP_EXIT, NULL, 0);
}
@@ -638,6 +653,8 @@ int stp_main_loop(void)
void *data;
int type;
+ pthread_mutex_init(&processing_mutex, NULL);
+
signal(SIGINT, sigproc);
signal(SIGTERM, sigproc);
signal(SIGCHLD, sigproc);