diff options
Diffstat (limited to 'runtime')
-rw-r--r-- | runtime/stpd/ChangeLog | 5 | ||||
-rw-r--r-- | runtime/stpd/librelay.c | 314 | ||||
-rw-r--r-- | runtime/stpd/stpd.c | 25 |
3 files changed, 131 insertions, 213 deletions
diff --git a/runtime/stpd/ChangeLog b/runtime/stpd/ChangeLog index b3206bed..c1f1530e 100644 --- a/runtime/stpd/ChangeLog +++ b/runtime/stpd/ChangeLog @@ -1,3 +1,8 @@ +2005-08-19 Martin Hunt <hunt@redhat.com> + + * stpd.c (main): Simplify buffer size code. + * librelay.c: Major changes to support procfs instead of netlink. + 2005-08-03 Tom Zanussi <trz@us.ibm.com> * librelay.c: Track subbuf info requests/replies diff --git a/runtime/stpd/librelay.c b/runtime/stpd/librelay.c index 98875e49..fabbd145 100644 --- a/runtime/stpd/librelay.c +++ b/runtime/stpd/librelay.c @@ -19,6 +19,9 @@ * Copyright (C) Red Hat Inc, 2005 * */ + +#define USE_PROCFS 1 + #include <ctype.h> #include <stdio.h> #include <stdlib.h> @@ -36,7 +39,6 @@ #include <pthread.h> #include <sys/socket.h> #include <linux/types.h> -#include <linux/netlink.h> #include <linux/limits.h> #include "librelay.h" @@ -54,6 +56,11 @@ static struct params /* temporary per-cpu output written here, filebase0...N */ static char *percpu_tmpfilebase = "stpd_cpu"; +#ifdef USE_PROCFS +static char proc_filebase[128]; +static int proc_file[NR_CPUS]; +#endif + /* probe output written here */ static char *outfile_name = "probe.out"; @@ -70,37 +77,14 @@ static char *relay_buffer[NR_CPUS]; static pthread_t reader[NR_CPUS]; static int pending_info[NR_CPUS]; -/* netlink control channel */ +/* control channel */ static int control_channel; /* flags */ extern int print_only, quiet, merge, verbose; -extern unsigned int opt_subbuf_size; -extern unsigned int opt_n_subbufs; +extern unsigned int buffer_size; extern char *modname; -/* used to communicate with kernel over control channel */ - -struct buf_msg -{ - struct nlmsghdr nlh; - struct buf_info info; -}; - -struct transport_msg -{ - struct nlmsghdr nlh; - struct transport_info info; -}; - -struct start_msg -{ - struct nlmsghdr nlh; - struct transport_start ts; -}; - -static char *recvbuf[8192]; - /* per-cpu buffer info */ static struct buf_status { @@ -117,7 +101,7 @@ static struct buf_status */ static int streaming(void) { - if (transport_mode == STP_TRANSPORT_NETLINK) + if (transport_mode == STP_TRANSPORT_PROC) return 1; return 0; @@ -133,67 +117,13 @@ static int streaming(void) */ int send_request(int type, void *data, int len) { - struct nlmsghdr *req; - int err, trylimit = 50; - req = (struct nlmsghdr *)malloc(NLMSG_SPACE(len)); - if (req == 0) { - fprintf(stderr, "WARNING: send_request malloc failed\n"); - return -1; - } - memset(req, 0, NLMSG_SPACE(len)); - req->nlmsg_len = NLMSG_LENGTH(len); - req->nlmsg_type = type; - req->nlmsg_flags = NLM_F_REQUEST; - req->nlmsg_pid = getpid(); - memcpy(NLMSG_DATA(req), data, len); - while ((err = send(control_channel, req, req->nlmsg_len, MSG_DONTWAIT)) < 0 && trylimit-- ) - usleep (5000); - return err; -} - -#if 0 -static unsigned int get_rmem(void) -{ - char buf[32]; - FILE *fp = fopen ("/proc/sys/net/core/rmem_max", "r"); - if (fp == NULL) - return 0; - if (fgets (buf, 32, fp) == NULL) { - fclose(fp); - return 0; - } - fclose(fp); - return atoi(buf); + char buf[1024]; + memcpy(buf, &type, 4); + memcpy(&buf[4],data,len); + return write(control_channel, buf, len+4); } -#endif - -/** - * open_control_channel - create netlink channel - */ -static int open_control_channel() -{ - struct sockaddr_nl snl; - int channel, rcvsize = 512*1024; - channel = socket(AF_NETLINK, SOCK_RAW, NETLINK_USERSOCK); - if (channel < 0) { - fprintf(stderr, "ERROR: socket() failed\n"); - return channel; - } - - if (setsockopt(channel, SOL_SOCKET, SO_RCVBUF, &rcvsize, sizeof(int))) - fprintf(stderr, "WARNING: failed to set socket receive buffer to %d\n", rcvsize); - memset(&snl, 0, sizeof snl); - snl.nl_family = AF_NETLINK; - snl.nl_pid = getpid(); - snl.nl_groups = 0; - - if (bind (channel, (struct sockaddr *) &snl, sizeof snl)) - fprintf(stderr, "ERROR: bind() failed\n"); - - return channel; -} /** * summarize - print a summary if applicable @@ -214,13 +144,24 @@ static void summarize(void) } } +#ifdef USE_PROCFS +static void close_proc_files() +{ + int i; + for (i = 0; i < ncpus; i++) + close(proc_file[i]); +} +#else +static void close_proc_files() {} +#endif + /** * close_relayfs_files - close and munmap buffer and open output file */ static void close_relayfs_files(int cpu) { size_t total_bufsize = params.subbuf_size * params.n_subbufs; - + munmap(relay_buffer[cpu], total_bufsize); close(relay_file[cpu]); fclose(percpu_tmpfile[cpu]); @@ -259,7 +200,17 @@ static int open_relayfs_files(int cpu, const char *relay_filebase) return -1; } - sprintf(tmp, "%s%d", percpu_tmpfilebase, cpu); +#ifdef USE_PROCFS + sprintf(tmp, "%s/%d", proc_filebase, cpu); + //printf("Opening %s.\n", tmp); + proc_file[cpu] = open(tmp, O_RDWR | O_NONBLOCK); + if (proc_file[cpu] < 0) { + fprintf(stderr, "ERROR: couldn't open proc file %s: errcode = %s\n", tmp, strerror(errno)); + return -1; + } +#endif + + sprintf(tmp, "%s%d", percpu_tmpfilebase, cpu); if((percpu_tmpfile[cpu] = fopen(tmp, "w+")) == NULL) { fprintf(stderr, "ERROR: Couldn't open output file %s: errcode = %s\n", tmp, strerror(errno)); close(relay_file[cpu]); @@ -328,6 +279,7 @@ static int kill_percpu_threads(int n) */ static int request_last_buffers(void) { +#if 0 int cpu; for (cpu = 0; cpu < ncpus; cpu++) { if (send_request(STP_BUF_INFO, &cpu, sizeof(cpu)) < 0) { @@ -336,6 +288,7 @@ static int request_last_buffers(void) } pending_info[cpu]++; } +#endif return 0; } @@ -383,9 +336,10 @@ static void *reader_thread(void *data) struct consumed_info consumed_info; unsigned subbufs_consumed; + pollfd.fd = relay_file[cpu]; + pollfd.events = POLLIN; + do { - pollfd.fd = relay_file[cpu]; - pollfd.events = POLLIN; rc = poll(&pollfd, 1, -1); if (rc < 0) { if (errno != EINTR) { @@ -396,18 +350,7 @@ static void *reader_thread(void *data) rc = 0; } - if (send_request(STP_BUF_INFO, &cpu, sizeof(cpu)) < 0) - fprintf(stderr, "WARNING: info request failed for cpu %d\n", cpu); - else - pending_info[cpu]++; - - pthread_mutex_lock(&status[cpu].ready_mutex); - if (status[cpu].info.produced == status[cpu].info.consumed) - pthread_cond_wait(&status[cpu].ready_cond, - &status[cpu].ready_mutex); - pthread_mutex_unlock(&status[cpu].ready_mutex); - // printf ("produced:%d consumed:%d\n", status[cpu].info.produced,status[cpu].info.consumed); - + rc = read (proc_file[cpu], &status[cpu].info, sizeof(struct buf_info)); subbufs_consumed = process_subbufs(&status[cpu].info); if (subbufs_consumed) { if (subbufs_consumed > status[cpu].max_backlog) @@ -415,9 +358,7 @@ static void *reader_thread(void *data) status[cpu].info.consumed += subbufs_consumed; consumed_info.cpu = cpu; consumed_info.consumed = subbufs_consumed; - send_request(STP_SUBBUFS_CONSUMED, - &consumed_info, - sizeof(struct consumed_info)); + write (proc_file[cpu], &consumed_info, sizeof(struct consumed_info)); } } while (1); } @@ -430,6 +371,7 @@ static void *reader_thread(void *data) int init_relayfs(void) { int i, j; + //printf("initializing relayfs\n"); for (i = 0; i < ncpus; i++) { if (open_relayfs_files(i, params.relay_filebase) < 0) { @@ -444,7 +386,7 @@ int init_relayfs(void) } } - if (print_totals) + if (print_totals && verbose) printf("Using channel with %u sub-buffers of size %u.\n", params.n_subbufs, params.subbuf_size); @@ -490,21 +432,32 @@ int init_stp(const char *relay_filebase, int print_summary) exit(-1); } if (WIFEXITED(rstatus) && WEXITSTATUS(rstatus)) { - perror ("insmod"); fprintf(stderr, "ERROR, couldn't insmod probe module %s\n", modname); return -1; } - control_channel = open_control_channel(); - if (control_channel < 0) - return -1; - if (relay_filebase) strcpy(params.relay_filebase, relay_filebase); + +#ifdef USE_PROCFS + sprintf (proc_filebase, "/proc/systemtap/%s", modname); + char *ptr = index(proc_filebase,'.'); + if (ptr) + *ptr = 0; + + sprintf(buf, "%s/cmd", proc_filebase); + //printf("Opening %s\n", buf); + control_channel = open(buf, O_RDWR); + if (control_channel < 0) { + fprintf(stderr, "ERROR: couldn't open control channel %s: errcode = %s\n", buf, strerror(errno)); + return -1; + } +#endif /* now send TRANSPORT_INFO */ - ti.subbuf_size = opt_subbuf_size; - ti.n_subbufs = opt_n_subbufs; + ti.buf_size = buffer_size; + ti.subbuf_size = 0; + ti.n_subbufs = 0; ti.target = 0; // FIXME. not implemented yet send_request(STP_TRANSPORT_INFO, &ti, sizeof(ti)); @@ -603,25 +556,6 @@ static int info_pending(void) return 0; } -/** - * postprocess_and_exit - postprocess the output and exit - */ -static void postprocess_and_exit(void) -{ - if (print_totals) - summarize(); - - if (transport_mode == STP_TRANSPORT_RELAYFS && merge) { - close_all_relayfs_files(); - merge_output(); - delete_percpu_files(); - } - - close(control_channel); - - exit(0); -} - static void cleanup_and_exit (int closed) { char tmpbuf[128]; @@ -631,12 +565,32 @@ static void cleanup_and_exit (int closed) exiting = 1; + //printf ("CLEANUP AND EXIT closed=%d mode=%d\n", closed, transport_mode); + if (transport_mode == STP_TRANSPORT_RELAYFS) { kill_percpu_threads(ncpus); if (request_last_buffers() < 0) exit(1); } + close_proc_files(); + + + if ( transport_mode == STP_TRANSPORT_RELAYFS && info_pending()) + return; + + if (print_totals && verbose) + summarize(); + + if (transport_mode == STP_TRANSPORT_RELAYFS && merge) { + close_all_relayfs_files(); + merge_output(); + delete_percpu_files(); + } + + //printf("closing control channel\n"); + close(control_channel); + if (!closed) { /* FIXME. overflow check */ strcpy (tmpbuf, "/sbin/rmmod "); @@ -644,16 +598,10 @@ static void cleanup_and_exit (int closed) if (system(tmpbuf)) { fprintf(stderr, "ERROR: couldn't rmmod probe module %s. No output will be written.\n", modname); - close_all_relayfs_files(); - delete_percpu_files(); exit(1); } } - - if ( transport_mode == STP_TRANSPORT_RELAYFS && info_pending()) - return; - - postprocess_and_exit(); + exit(0); } static void sigproc(int signum __attribute__((unused))) @@ -664,70 +612,52 @@ static void sigproc(int signum __attribute__((unused))) /** * stp_main_loop - loop forever reading data */ +static char recvbuf[8192]; + int stp_main_loop(void) { - int cpu, nb, rc; + int nb, rc; struct transport_start ts; - - unsigned short *ptr; - unsigned subbufs_consumed; + void *data; + int type; signal(SIGINT, sigproc); signal(SIGTERM, sigproc); while (1) { /* handle messages from control channel */ - nb = recv(control_channel, recvbuf, sizeof(recvbuf), 0); - struct nlmsghdr *nlh = (struct nlmsghdr *)recvbuf; - if (nb < 0) { - if (errno == EINTR) - continue; - fprintf(stderr, "WARNING: recv() failed\n"); - } else if (nb == 0) - fprintf(stderr, "WARNING: unexpected EOF on netlink socket\n"); - if (!NLMSG_OK(nlh, (unsigned int) nb)) { - fprintf(stderr, "WARNING: netlink message not ok, nb = %d\n", nb); + nb = read(control_channel, recvbuf, sizeof(recvbuf)); + if (nb <= 0) { + perror("recv"); + fprintf(stderr, "WARNING: unexpected EOF. nb=%d\n", nb); continue; } - if (!transport_mode && - nlh->nlmsg_type != STP_TRANSPORT_INFO && - nlh->nlmsg_type != STP_EXIT) - { + // printf("read %d bytes\n", nb); + + type = *(int *)recvbuf; + data = (void *)(recvbuf + sizeof(int)); + + if (!transport_mode && type != STP_TRANSPORT_INFO && type != STP_EXIT) { fprintf(stderr, "WARNING: invalid stp command: no transport\n"); continue; } - switch (nlh->nlmsg_type) { - case STP_BUF_INFO: - { - struct buf_msg *buf_msg = (struct buf_msg *)nlh; - cpu = buf_msg->info.cpu; - status[cpu].info.produced = buf_msg->info.produced; - pending_info[cpu]--; - - if (exiting) { - subbufs_consumed = process_subbufs(&status[cpu].info); - status[cpu].info.consumed += subbufs_consumed; - if (!info_pending()) - postprocess_and_exit(); - break; - } - pthread_mutex_lock(&status[cpu].ready_mutex); - if (status[cpu].info.produced > status[cpu].info.consumed) - pthread_cond_signal(&status[cpu].ready_cond); - pthread_mutex_unlock(&status[cpu].ready_mutex); - break; - } + + switch (type) { case STP_TRANSPORT_INFO: { - struct transport_msg *transport_msg = (struct transport_msg *)nlh; - transport_mode = transport_msg->info.transport_mode; - params.subbuf_size = transport_msg->info.subbuf_size; - params.n_subbufs = transport_msg->info.n_subbufs; - /* - printf ("TRANSPORT_INFO recvd: %d bufs of %d bytes. mode=%d\n", - params.n_subbufs, - params.subbuf_size, - transport_mode); - */ + struct transport_info *info = (struct transport_info *)data; + + transport_mode = info->transport_mode; + params.subbuf_size = info->subbuf_size; + params.n_subbufs = info->n_subbufs; +#if 0 + if (transport_mode == STP_TRANSPORT_RELAYFS) + printf ("TRANSPORT_INFO recvd: RELAYFS %d bufs of %d bytes.\n", + params.n_subbufs, + params.subbuf_size); + else + printf ("TRANSPORT_INFO recvd: PROC with %d Mbyte buffers.\n", + info->buf_size); +#endif if (!streaming()) { rc = init_relayfs(); if (rc < 0) { @@ -741,27 +671,25 @@ int stp_main_loop(void) break; } case STP_REALTIME_DATA: - ptr = NLMSG_DATA(nlh); - fputs ((char *)ptr, stdout); + fputs ((char *)data, stdout); break; case STP_EXIT: { /* module asks us to unload it and exit */ - int *closed = (int *)NLMSG_DATA(nlh); + int *closed = (int *)data; cleanup_and_exit(*closed); break; } case STP_START: { /* we only get this if probe_start() errors */ - struct start_msg *s = (struct start_msg *)nlh; - struct transport_start *t = &s->ts; + struct transport_start *t = (struct transport_start *)data; fprintf(stderr, "probe_start() returned %d\nExiting...\n", t->pid); cleanup_and_exit(0); break; } default: - fprintf(stderr, "WARNING: ignored netlink message of type %d\n", (nlh->nlmsg_type)); + fprintf(stderr, "WARNING: ignored netlink message of type %d\n", (type)); } } return 0; diff --git a/runtime/stpd/stpd.c b/runtime/stpd/stpd.c index 49b3951a..db0f328e 100644 --- a/runtime/stpd/stpd.c +++ b/runtime/stpd/stpd.c @@ -35,8 +35,6 @@ int print_only = 0; int quiet = 0; int merge = 1; int verbose = 0; -unsigned int opt_subbuf_size = 0; -unsigned int opt_n_subbufs = 0; unsigned int buffer_size = 0; char *modname = NULL; @@ -85,25 +83,14 @@ int main(int argc, char **argv) break; case 'b': { - char *ptr; int size = (unsigned)atoi(optarg); if (!size) usage(argv[0]); - ptr = index (optarg, 'x'); - if (ptr) { - ptr++; - opt_subbuf_size = (unsigned)atoi(ptr); - printf("subbuf_size = %d\n", opt_subbuf_size); - opt_n_subbufs = size; - } else { - if (size > 64) { - fprintf(stderr, "Maximum buffer size is 64 (MB)\n"); - exit(1); - } - buffer_size = size * 1024 * 1024; - opt_subbuf_size = ((size >> 2) + 1) * 65536; - opt_n_subbufs = buffer_size / opt_subbuf_size; + if (size > 64) { + fprintf(stderr, "Maximum buffer size is 64 (MB)\n"); + exit(1); } + buffer_size = size; break; } default: @@ -114,8 +101,6 @@ int main(int argc, char **argv) if (verbose) { if (buffer_size) printf ("Using a buffer of %u bytes.\n", buffer_size); - else if (opt_n_subbufs) - printf ("Using %u subbufs of %u bytes.\n", opt_n_subbufs, opt_subbuf_size); } if (optind < argc) @@ -151,7 +136,7 @@ int main(int argc, char **argv) sprintf(stpd_filebase, "/mnt/relay/%d/cpu", getpid()); if (init_stp(stpd_filebase, !quiet)) { - fprintf(stderr, "Couldn't initialize stpd. Exiting.\n"); + //fprintf(stderr, "Couldn't initialize stpd. Exiting.\n"); exit(1); } |