summaryrefslogtreecommitdiffstats
path: root/runtime/stpd/librelay.c
diff options
context:
space:
mode:
Diffstat (limited to 'runtime/stpd/librelay.c')
-rw-r--r--runtime/stpd/librelay.c189
1 files changed, 85 insertions, 104 deletions
diff --git a/runtime/stpd/librelay.c b/runtime/stpd/librelay.c
index b3f32ff0..2868fafc 100644
--- a/runtime/stpd/librelay.c
+++ b/runtime/stpd/librelay.c
@@ -66,7 +66,7 @@ static int exiting;
/* per-cpu data */
static int relay_file[NR_CPUS];
-static int percpu_tmpfile[NR_CPUS];
+static FILE *percpu_tmpfile[NR_CPUS];
static char *relay_buffer[NR_CPUS];
static pthread_t reader[NR_CPUS];
@@ -77,29 +77,12 @@ static int control_channel;
extern int print_only;
extern int quiet;
extern int merge;
+extern unsigned int opt_subbuf_size;
+extern unsigned int opt_n_subbufs;
+extern char *modname;
/* used to communicate with kernel over control channel */
-struct buf_info
-{
- int cpu;
- unsigned produced;
- unsigned consumed;
-};
-
-struct consumed_info
-{
- int cpu;
- unsigned consumed;
-};
-
-struct transport_info
-{
- int transport_mode;
- unsigned subbuf_size;
- unsigned n_subbufs;
-};
-
struct buf_msg
{
struct nlmsghdr nlh;
@@ -112,6 +95,12 @@ struct transport_msg
struct transport_info info;
};
+struct start_msg
+{
+ struct nlmsghdr nlh;
+ struct transport_start ts;
+};
+
static char *recvbuf[8192];
/* per-cpu buffer info */
@@ -147,8 +136,7 @@ static int streaming(void)
int send_request(int type, void *data, int len)
{
struct nlmsghdr *req;
- int err;
-
+ int err, trylimit = 50;
req = (struct nlmsghdr *)malloc(NLMSG_SPACE(len));
if (req == 0) {
fprintf(stderr, "WARNING: send_request malloc failed\n");
@@ -160,7 +148,8 @@ int send_request(int type, void *data, int len)
req->nlmsg_flags = NLM_F_REQUEST;
req->nlmsg_pid = getpid();
memcpy(NLMSG_DATA(req), data, len);
- err = send(control_channel, req, req->nlmsg_len, MSG_DONTWAIT);
+ while ((err = send(control_channel, req, req->nlmsg_len, MSG_DONTWAIT)) < 0 && trylimit-- )
+ usleep (5000);
return err;
}
@@ -217,7 +206,7 @@ static void close_relayfs_files(int cpu)
munmap(relay_buffer[cpu], total_bufsize);
close(relay_file[cpu]);
- close(percpu_tmpfile[cpu]);
+ fclose(percpu_tmpfile[cpu]);
}
/**
@@ -254,8 +243,7 @@ static int open_relayfs_files(int cpu, const char *relay_filebase)
}
sprintf(tmp, "%s%d", percpu_tmpfilebase, cpu);
- if((percpu_tmpfile[cpu] = open(tmp, O_CREAT | O_RDWR | O_TRUNC,
- S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH)) < 0) {
+ if((percpu_tmpfile[cpu] = fopen(tmp, "w+")) == NULL) {
fprintf(stderr, "ERROR: Couldn't open output file %s: errcode = %s\n", tmp, strerror(errno));
close(relay_file[cpu]);
return -1;
@@ -269,7 +257,7 @@ static int open_relayfs_files(int cpu, const char *relay_filebase)
{
fprintf(stderr, "ERROR: couldn't mmap relay file, total_bufsize (%d) = subbuf_size (%d) * n_subbufs(%d), error = %s \n", (int)total_bufsize, (int)params.subbuf_size, (int)params.n_subbufs, strerror(errno));
close(relay_file[cpu]);
- close(percpu_tmpfile[cpu]);
+ fclose(percpu_tmpfile[cpu]);
return -1;
}
@@ -323,15 +311,13 @@ static int kill_percpu_threads(int n)
*/
static int request_last_buffers(void)
{
- int i;
-
- for (i = 0; i < ncpus; i++) {
- if (send_request(STP_BUF_INFO, &status[i].info, sizeof(struct buf_info)) < 0) {
- fprintf(stderr, "WARNING: couldn't request last buffers for cpu %d\n", i);
+ int cpu;
+ for (cpu = 0; cpu < ncpus; cpu++) {
+ if (send_request(STP_BUF_INFO, &cpu, sizeof(cpu)) < 0) {
+ fprintf(stderr, "WARNING: couldn't request last buffers for cpu %d\n", cpu);
return -1;
}
}
-
return 0;
}
@@ -356,11 +342,11 @@ static int process_subbufs(struct buf_info *info)
padding = *((unsigned *)subbuf_ptr);
subbuf_ptr += sizeof(padding);
len = (params.subbuf_size - sizeof(padding)) - padding;
-
- if (write(percpu_tmpfile[cpu], subbuf_ptr, len) < 0)
- {
- fprintf(stderr, "ERROR: couldn't write to output file for cpu %d, exiting: errcode = %d: %s\n", cpu, errno, strerror(errno));
- exit(1);
+ if (len) {
+ if (fwrite_unlocked (subbuf_ptr, len, 1, percpu_tmpfile[cpu]) < 0) {
+ fprintf(stderr, "ERROR: couldn't write to output file for cpu %d, exiting: errcode = %d: %s\n", cpu, errno, strerror(errno));
+ exit(1);
+ }
}
subbufs_consumed++;
}
@@ -374,7 +360,7 @@ static int process_subbufs(struct buf_info *info)
static void *reader_thread(void *data)
{
int rc;
- long cpu = (long)data;
+ int cpu = (long)data;
struct pollfd pollfd;
struct consumed_info consumed_info;
unsigned subbufs_consumed;
@@ -391,19 +377,19 @@ static void *reader_thread(void *data)
fprintf(stderr, "WARNING: poll warning: %s\n",strerror(errno));
rc = 0;
}
- send_request(STP_BUF_INFO, &status[cpu].info,
- sizeof(struct buf_info));
-
+ send_request(STP_BUF_INFO, &cpu, sizeof(cpu));
+
pthread_mutex_lock(&status[cpu].ready_mutex);
if (status[cpu].info.produced == status[cpu].info.consumed)
pthread_cond_wait(&status[cpu].ready_cond,
&status[cpu].ready_mutex);
pthread_mutex_unlock(&status[cpu].ready_mutex);
+ // printf ("produced:%d consumed:%d\n", status[cpu].info.produced,status[cpu].info.consumed);
subbufs_consumed = process_subbufs(&status[cpu].info);
if (subbufs_consumed) {
if (subbufs_consumed == params.n_subbufs)
- fprintf(stderr, "WARNING: cpu %ld buffer full. Consider using a larger buffer size.\n", cpu);
+ fprintf(stderr, "WARNING: cpu %d buffer full. Consider using a larger buffer size.\n", cpu);
if (subbufs_consumed > status[cpu].max_backlog)
status[cpu].max_backlog = subbufs_consumed;
status[cpu].info.consumed += subbufs_consumed;
@@ -452,54 +438,27 @@ err:
}
#include <sys/wait.h>
-static void cleanup_and_exit (char *mod_name);
-static int module_loaded = 0;
-static void sigchld(int signum __attribute__((unused)))
-{
- int status_code;
- pid_t pid = wait(&status_code);
- if (pid > 0 && WIFEXITED(status_code) && WEXITSTATUS(status_code))
- cleanup_and_exit("");
- signal(SIGCHLD, SIG_DFL);
- module_loaded = 1;
-
- if (quiet)
- printf("Logging... Press Control-C to stop.\n");
- else
- printf("Press Control-C to stop.\n");
-}
+static void cleanup_and_exit (int);
/**
* init_stp - initialize the app
- * @modname: the name of the stp module to load
* @relay_filebase: full path of base name of the per-cpu relayfs files
* @print_summary: boolean, print summary or not at end of run
*
* Returns 0 on success, negative otherwise.
*/
-int init_stp(const char *modname,
- const char *relay_filebase,
- int print_summary)
+int init_stp(const char *relay_filebase, int print_summary)
{
- int daemon_pid;
char buf[1024];
- pid_t pid;
+ struct transport_info ti;
ncpus = sysconf(_SC_NPROCESSORS_ONLN);
print_totals = print_summary;
- daemon_pid = getpid();
-
- sprintf(buf, "_stp_pid=%d", daemon_pid);
- signal(SIGCHLD, sigchld);
-
- if ((pid = vfork()) < 0) {
- perror ("vfork");
- exit(-1);
- } else if (pid == 0) {
- execl("/sbin/insmod", "insmod", modname, buf, NULL);
- perror ("exec");
- exit(-1);
+ sprintf(buf, "insmod %s _stp_pid=%d", modname, (int)getpid());
+ if (system(buf)) {
+ fprintf(stderr, "ERROR, couldn't insmod probe module %s\n", modname);
+ return -1;
}
control_channel = open_control_channel();
@@ -509,6 +468,12 @@ int init_stp(const char *modname,
if (relay_filebase)
strcpy(params.relay_filebase, relay_filebase);
+ /* now send TRANSPORT_INFO */
+ ti.subbuf_size = opt_subbuf_size;
+ ti.n_subbufs = opt_n_subbufs;
+ ti.target = 0; // FIXME. not implemented yet
+ send_request(STP_TRANSPORT_INFO, &ti, sizeof(ti));
+
return 0;
}
@@ -612,7 +577,7 @@ static void postprocess_and_exit(void)
exit(0);
}
-static void cleanup_and_exit (char *mod_name)
+static void cleanup_and_exit (int closed)
{
char tmpbuf[128];
@@ -627,34 +592,28 @@ static void cleanup_and_exit (char *mod_name)
exit(1);
}
- if (mod_name && *mod_name) {
+ if (!closed) {
/* FIXME. overflow check */
strcpy (tmpbuf, "/sbin/rmmod ");
- strcpy (tmpbuf + strlen(tmpbuf), mod_name);
+ strcpy (tmpbuf + strlen(tmpbuf), modname);
if (system(tmpbuf)) {
fprintf(stderr, "ERROR: couldn't rmmod probe module %s. No output will be written.\n",
- mod_name);
+ modname);
close_all_relayfs_files();
delete_percpu_files();
exit(1);
}
-
- if ( transport_mode == STP_TRANSPORT_RELAYFS && final_cpus_processed < ncpus)
- return;
}
+
+ if ( transport_mode == STP_TRANSPORT_RELAYFS && final_cpus_processed < ncpus)
+ return;
+
postprocess_and_exit();
}
static void sigproc(int signum __attribute__((unused)))
{
- if (transport_mode && module_loaded) {
- int trylimit = 50;
- while (send_request(STP_EXIT, NULL, 0) < 0 && trylimit--)
- usleep (10000);
- } else {
- close(control_channel);
- exit(0);
- }
+ send_request(STP_EXIT, NULL, 0);
}
/**
@@ -663,18 +622,17 @@ static void sigproc(int signum __attribute__((unused)))
int stp_main_loop(void)
{
int cpu, nb, rc;
- struct buf_msg *buf_msg;
- struct transport_msg *transport_msg;
+ struct transport_start ts;
+
unsigned short *ptr;
unsigned subbufs_consumed;
signal(SIGINT, sigproc);
signal(SIGTERM, sigproc);
-
+
while (1) { /* handle messages from control channel */
nb = recv(control_channel, recvbuf, sizeof(recvbuf), 0);
struct nlmsghdr *nlh = (struct nlmsghdr *)recvbuf;
-
if (nb < 0) {
if (errno == EINTR)
continue;
@@ -686,7 +644,7 @@ int stp_main_loop(void)
continue;
}
if (!transport_mode &&
- nlh->nlmsg_type != STP_TRANSPORT_MODE &&
+ nlh->nlmsg_type != STP_TRANSPORT_INFO &&
nlh->nlmsg_type != STP_EXIT)
{
fprintf(stderr, "WARNING: invalid stp command: no transport\n");
@@ -694,7 +652,8 @@ int stp_main_loop(void)
}
switch (nlh->nlmsg_type) {
case STP_BUF_INFO:
- buf_msg = (struct buf_msg *)nlh;
+ {
+ struct buf_msg *buf_msg = (struct buf_msg *)nlh;
cpu = buf_msg->info.cpu;
memcpy(&status[cpu].info, &buf_msg->info, sizeof (struct buf_info));
if (exiting) {
@@ -710,11 +669,19 @@ int stp_main_loop(void)
pthread_cond_signal(&status[cpu].ready_cond);
pthread_mutex_unlock(&status[cpu].ready_mutex);
break;
- case STP_TRANSPORT_MODE:
- transport_msg = (struct transport_msg *)nlh;
+ }
+ case STP_TRANSPORT_INFO:
+ {
+ struct transport_msg *transport_msg = (struct transport_msg *)nlh;
transport_mode = transport_msg->info.transport_mode;
params.subbuf_size = transport_msg->info.subbuf_size;
params.n_subbufs = transport_msg->info.n_subbufs;
+ /*
+ printf ("TRANSPORT_INFO recvd: %d bufs of %d bytes. mode=%d\n",
+ params.n_subbufs,
+ params.subbuf_size,
+ transport_mode);
+ */
if (!streaming()) {
rc = init_relayfs();
if (rc < 0) {
@@ -723,16 +690,30 @@ int stp_main_loop(void)
exit(1);
}
}
+ ts.pid = 0; // FIXME. not implemented yet
+ send_request(STP_START, &ts, sizeof(ts));
break;
+ }
case STP_REALTIME_DATA:
ptr = NLMSG_DATA(nlh);
fputs ((char *)ptr, stdout);
break;
- case STP_EXIT:
+ case STP_EXIT:
+ {
/* module asks us to unload it and exit */
- ptr = NLMSG_DATA(nlh);
- cleanup_and_exit((char *)ptr);
+ int *closed = (int *)NLMSG_DATA(nlh);
+ cleanup_and_exit(*closed);
+ break;
+ }
+ case STP_START:
+ {
+ /* we only get this if probe_start() errors */
+ struct start_msg *s = (struct start_msg *)nlh;
+ struct transport_start *t = &s->ts;
+ fprintf(stderr, "probe_start() returned %d\nExiting...\n", t->pid);
+ cleanup_and_exit(0);
break;
+ }
default:
fprintf(stderr, "WARNING: ignored netlink message of type %d\n", (nlh->nlmsg_type));
}