summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--runtime/stpd/ChangeLog5
-rw-r--r--runtime/stpd/librelay.c314
-rw-r--r--runtime/stpd/stpd.c25
3 files changed, 131 insertions, 213 deletions
diff --git a/runtime/stpd/ChangeLog b/runtime/stpd/ChangeLog
index b3206bed..c1f1530e 100644
--- a/runtime/stpd/ChangeLog
+++ b/runtime/stpd/ChangeLog
@@ -1,3 +1,8 @@
+2005-08-19 Martin Hunt <hunt@redhat.com>
+
+ * stpd.c (main): Simplify buffer size code.
+ * librelay.c: Major changes to support procfs instead of netlink.
+
2005-08-03 Tom Zanussi <trz@us.ibm.com>
* librelay.c: Track subbuf info requests/replies
diff --git a/runtime/stpd/librelay.c b/runtime/stpd/librelay.c
index 98875e49..fabbd145 100644
--- a/runtime/stpd/librelay.c
+++ b/runtime/stpd/librelay.c
@@ -19,6 +19,9 @@
* Copyright (C) Red Hat Inc, 2005
*
*/
+
+#define USE_PROCFS 1
+
#include <ctype.h>
#include <stdio.h>
#include <stdlib.h>
@@ -36,7 +39,6 @@
#include <pthread.h>
#include <sys/socket.h>
#include <linux/types.h>
-#include <linux/netlink.h>
#include <linux/limits.h>
#include "librelay.h"
@@ -54,6 +56,11 @@ static struct params
/* temporary per-cpu output written here, filebase0...N */
static char *percpu_tmpfilebase = "stpd_cpu";
+#ifdef USE_PROCFS
+static char proc_filebase[128];
+static int proc_file[NR_CPUS];
+#endif
+
/* probe output written here */
static char *outfile_name = "probe.out";
@@ -70,37 +77,14 @@ static char *relay_buffer[NR_CPUS];
static pthread_t reader[NR_CPUS];
static int pending_info[NR_CPUS];
-/* netlink control channel */
+/* control channel */
static int control_channel;
/* flags */
extern int print_only, quiet, merge, verbose;
-extern unsigned int opt_subbuf_size;
-extern unsigned int opt_n_subbufs;
+extern unsigned int buffer_size;
extern char *modname;
-/* used to communicate with kernel over control channel */
-
-struct buf_msg
-{
- struct nlmsghdr nlh;
- struct buf_info info;
-};
-
-struct transport_msg
-{
- struct nlmsghdr nlh;
- struct transport_info info;
-};
-
-struct start_msg
-{
- struct nlmsghdr nlh;
- struct transport_start ts;
-};
-
-static char *recvbuf[8192];
-
/* per-cpu buffer info */
static struct buf_status
{
@@ -117,7 +101,7 @@ static struct buf_status
*/
static int streaming(void)
{
- if (transport_mode == STP_TRANSPORT_NETLINK)
+ if (transport_mode == STP_TRANSPORT_PROC)
return 1;
return 0;
@@ -133,67 +117,13 @@ static int streaming(void)
*/
int send_request(int type, void *data, int len)
{
- struct nlmsghdr *req;
- int err, trylimit = 50;
- req = (struct nlmsghdr *)malloc(NLMSG_SPACE(len));
- if (req == 0) {
- fprintf(stderr, "WARNING: send_request malloc failed\n");
- return -1;
- }
- memset(req, 0, NLMSG_SPACE(len));
- req->nlmsg_len = NLMSG_LENGTH(len);
- req->nlmsg_type = type;
- req->nlmsg_flags = NLM_F_REQUEST;
- req->nlmsg_pid = getpid();
- memcpy(NLMSG_DATA(req), data, len);
- while ((err = send(control_channel, req, req->nlmsg_len, MSG_DONTWAIT)) < 0 && trylimit-- )
- usleep (5000);
- return err;
-}
-
-#if 0
-static unsigned int get_rmem(void)
-{
- char buf[32];
- FILE *fp = fopen ("/proc/sys/net/core/rmem_max", "r");
- if (fp == NULL)
- return 0;
- if (fgets (buf, 32, fp) == NULL) {
- fclose(fp);
- return 0;
- }
- fclose(fp);
- return atoi(buf);
+ char buf[1024];
+ memcpy(buf, &type, 4);
+ memcpy(&buf[4],data,len);
+ return write(control_channel, buf, len+4);
}
-#endif
-
-/**
- * open_control_channel - create netlink channel
- */
-static int open_control_channel()
-{
- struct sockaddr_nl snl;
- int channel, rcvsize = 512*1024;
- channel = socket(AF_NETLINK, SOCK_RAW, NETLINK_USERSOCK);
- if (channel < 0) {
- fprintf(stderr, "ERROR: socket() failed\n");
- return channel;
- }
-
- if (setsockopt(channel, SOL_SOCKET, SO_RCVBUF, &rcvsize, sizeof(int)))
- fprintf(stderr, "WARNING: failed to set socket receive buffer to %d\n", rcvsize);
- memset(&snl, 0, sizeof snl);
- snl.nl_family = AF_NETLINK;
- snl.nl_pid = getpid();
- snl.nl_groups = 0;
-
- if (bind (channel, (struct sockaddr *) &snl, sizeof snl))
- fprintf(stderr, "ERROR: bind() failed\n");
-
- return channel;
-}
/**
* summarize - print a summary if applicable
@@ -214,13 +144,24 @@ static void summarize(void)
}
}
+#ifdef USE_PROCFS
+static void close_proc_files()
+{
+ int i;
+ for (i = 0; i < ncpus; i++)
+ close(proc_file[i]);
+}
+#else
+static void close_proc_files() {}
+#endif
+
/**
* close_relayfs_files - close and munmap buffer and open output file
*/
static void close_relayfs_files(int cpu)
{
size_t total_bufsize = params.subbuf_size * params.n_subbufs;
-
+
munmap(relay_buffer[cpu], total_bufsize);
close(relay_file[cpu]);
fclose(percpu_tmpfile[cpu]);
@@ -259,7 +200,17 @@ static int open_relayfs_files(int cpu, const char *relay_filebase)
return -1;
}
- sprintf(tmp, "%s%d", percpu_tmpfilebase, cpu);
+#ifdef USE_PROCFS
+ sprintf(tmp, "%s/%d", proc_filebase, cpu);
+ //printf("Opening %s.\n", tmp);
+ proc_file[cpu] = open(tmp, O_RDWR | O_NONBLOCK);
+ if (proc_file[cpu] < 0) {
+ fprintf(stderr, "ERROR: couldn't open proc file %s: errcode = %s\n", tmp, strerror(errno));
+ return -1;
+ }
+#endif
+
+ sprintf(tmp, "%s%d", percpu_tmpfilebase, cpu);
if((percpu_tmpfile[cpu] = fopen(tmp, "w+")) == NULL) {
fprintf(stderr, "ERROR: Couldn't open output file %s: errcode = %s\n", tmp, strerror(errno));
close(relay_file[cpu]);
@@ -328,6 +279,7 @@ static int kill_percpu_threads(int n)
*/
static int request_last_buffers(void)
{
+#if 0
int cpu;
for (cpu = 0; cpu < ncpus; cpu++) {
if (send_request(STP_BUF_INFO, &cpu, sizeof(cpu)) < 0) {
@@ -336,6 +288,7 @@ static int request_last_buffers(void)
}
pending_info[cpu]++;
}
+#endif
return 0;
}
@@ -383,9 +336,10 @@ static void *reader_thread(void *data)
struct consumed_info consumed_info;
unsigned subbufs_consumed;
+ pollfd.fd = relay_file[cpu];
+ pollfd.events = POLLIN;
+
do {
- pollfd.fd = relay_file[cpu];
- pollfd.events = POLLIN;
rc = poll(&pollfd, 1, -1);
if (rc < 0) {
if (errno != EINTR) {
@@ -396,18 +350,7 @@ static void *reader_thread(void *data)
rc = 0;
}
- if (send_request(STP_BUF_INFO, &cpu, sizeof(cpu)) < 0)
- fprintf(stderr, "WARNING: info request failed for cpu %d\n", cpu);
- else
- pending_info[cpu]++;
-
- pthread_mutex_lock(&status[cpu].ready_mutex);
- if (status[cpu].info.produced == status[cpu].info.consumed)
- pthread_cond_wait(&status[cpu].ready_cond,
- &status[cpu].ready_mutex);
- pthread_mutex_unlock(&status[cpu].ready_mutex);
- // printf ("produced:%d consumed:%d\n", status[cpu].info.produced,status[cpu].info.consumed);
-
+ rc = read (proc_file[cpu], &status[cpu].info, sizeof(struct buf_info));
subbufs_consumed = process_subbufs(&status[cpu].info);
if (subbufs_consumed) {
if (subbufs_consumed > status[cpu].max_backlog)
@@ -415,9 +358,7 @@ static void *reader_thread(void *data)
status[cpu].info.consumed += subbufs_consumed;
consumed_info.cpu = cpu;
consumed_info.consumed = subbufs_consumed;
- send_request(STP_SUBBUFS_CONSUMED,
- &consumed_info,
- sizeof(struct consumed_info));
+ write (proc_file[cpu], &consumed_info, sizeof(struct consumed_info));
}
} while (1);
}
@@ -430,6 +371,7 @@ static void *reader_thread(void *data)
int init_relayfs(void)
{
int i, j;
+ //printf("initializing relayfs\n");
for (i = 0; i < ncpus; i++) {
if (open_relayfs_files(i, params.relay_filebase) < 0) {
@@ -444,7 +386,7 @@ int init_relayfs(void)
}
}
- if (print_totals)
+ if (print_totals && verbose)
printf("Using channel with %u sub-buffers of size %u.\n",
params.n_subbufs, params.subbuf_size);
@@ -490,21 +432,32 @@ int init_stp(const char *relay_filebase, int print_summary)
exit(-1);
}
if (WIFEXITED(rstatus) && WEXITSTATUS(rstatus)) {
- perror ("insmod");
fprintf(stderr, "ERROR, couldn't insmod probe module %s\n", modname);
return -1;
}
- control_channel = open_control_channel();
- if (control_channel < 0)
- return -1;
-
if (relay_filebase)
strcpy(params.relay_filebase, relay_filebase);
+
+#ifdef USE_PROCFS
+ sprintf (proc_filebase, "/proc/systemtap/%s", modname);
+ char *ptr = index(proc_filebase,'.');
+ if (ptr)
+ *ptr = 0;
+
+ sprintf(buf, "%s/cmd", proc_filebase);
+ //printf("Opening %s\n", buf);
+ control_channel = open(buf, O_RDWR);
+ if (control_channel < 0) {
+ fprintf(stderr, "ERROR: couldn't open control channel %s: errcode = %s\n", buf, strerror(errno));
+ return -1;
+ }
+#endif
/* now send TRANSPORT_INFO */
- ti.subbuf_size = opt_subbuf_size;
- ti.n_subbufs = opt_n_subbufs;
+ ti.buf_size = buffer_size;
+ ti.subbuf_size = 0;
+ ti.n_subbufs = 0;
ti.target = 0; // FIXME. not implemented yet
send_request(STP_TRANSPORT_INFO, &ti, sizeof(ti));
@@ -603,25 +556,6 @@ static int info_pending(void)
return 0;
}
-/**
- * postprocess_and_exit - postprocess the output and exit
- */
-static void postprocess_and_exit(void)
-{
- if (print_totals)
- summarize();
-
- if (transport_mode == STP_TRANSPORT_RELAYFS && merge) {
- close_all_relayfs_files();
- merge_output();
- delete_percpu_files();
- }
-
- close(control_channel);
-
- exit(0);
-}
-
static void cleanup_and_exit (int closed)
{
char tmpbuf[128];
@@ -631,12 +565,32 @@ static void cleanup_and_exit (int closed)
exiting = 1;
+ //printf ("CLEANUP AND EXIT closed=%d mode=%d\n", closed, transport_mode);
+
if (transport_mode == STP_TRANSPORT_RELAYFS) {
kill_percpu_threads(ncpus);
if (request_last_buffers() < 0)
exit(1);
}
+ close_proc_files();
+
+
+ if ( transport_mode == STP_TRANSPORT_RELAYFS && info_pending())
+ return;
+
+ if (print_totals && verbose)
+ summarize();
+
+ if (transport_mode == STP_TRANSPORT_RELAYFS && merge) {
+ close_all_relayfs_files();
+ merge_output();
+ delete_percpu_files();
+ }
+
+ //printf("closing control channel\n");
+ close(control_channel);
+
if (!closed) {
/* FIXME. overflow check */
strcpy (tmpbuf, "/sbin/rmmod ");
@@ -644,16 +598,10 @@ static void cleanup_and_exit (int closed)
if (system(tmpbuf)) {
fprintf(stderr, "ERROR: couldn't rmmod probe module %s. No output will be written.\n",
modname);
- close_all_relayfs_files();
- delete_percpu_files();
exit(1);
}
}
-
- if ( transport_mode == STP_TRANSPORT_RELAYFS && info_pending())
- return;
-
- postprocess_and_exit();
+ exit(0);
}
static void sigproc(int signum __attribute__((unused)))
@@ -664,70 +612,52 @@ static void sigproc(int signum __attribute__((unused)))
/**
* stp_main_loop - loop forever reading data
*/
+static char recvbuf[8192];
+
int stp_main_loop(void)
{
- int cpu, nb, rc;
+ int nb, rc;
struct transport_start ts;
-
- unsigned short *ptr;
- unsigned subbufs_consumed;
+ void *data;
+ int type;
signal(SIGINT, sigproc);
signal(SIGTERM, sigproc);
while (1) { /* handle messages from control channel */
- nb = recv(control_channel, recvbuf, sizeof(recvbuf), 0);
- struct nlmsghdr *nlh = (struct nlmsghdr *)recvbuf;
- if (nb < 0) {
- if (errno == EINTR)
- continue;
- fprintf(stderr, "WARNING: recv() failed\n");
- } else if (nb == 0)
- fprintf(stderr, "WARNING: unexpected EOF on netlink socket\n");
- if (!NLMSG_OK(nlh, (unsigned int) nb)) {
- fprintf(stderr, "WARNING: netlink message not ok, nb = %d\n", nb);
+ nb = read(control_channel, recvbuf, sizeof(recvbuf));
+ if (nb <= 0) {
+ perror("recv");
+ fprintf(stderr, "WARNING: unexpected EOF. nb=%d\n", nb);
continue;
}
- if (!transport_mode &&
- nlh->nlmsg_type != STP_TRANSPORT_INFO &&
- nlh->nlmsg_type != STP_EXIT)
- {
+ // printf("read %d bytes\n", nb);
+
+ type = *(int *)recvbuf;
+ data = (void *)(recvbuf + sizeof(int));
+
+ if (!transport_mode && type != STP_TRANSPORT_INFO && type != STP_EXIT) {
fprintf(stderr, "WARNING: invalid stp command: no transport\n");
continue;
}
- switch (nlh->nlmsg_type) {
- case STP_BUF_INFO:
- {
- struct buf_msg *buf_msg = (struct buf_msg *)nlh;
- cpu = buf_msg->info.cpu;
- status[cpu].info.produced = buf_msg->info.produced;
- pending_info[cpu]--;
-
- if (exiting) {
- subbufs_consumed = process_subbufs(&status[cpu].info);
- status[cpu].info.consumed += subbufs_consumed;
- if (!info_pending())
- postprocess_and_exit();
- break;
- }
- pthread_mutex_lock(&status[cpu].ready_mutex);
- if (status[cpu].info.produced > status[cpu].info.consumed)
- pthread_cond_signal(&status[cpu].ready_cond);
- pthread_mutex_unlock(&status[cpu].ready_mutex);
- break;
- }
+
+ switch (type) {
case STP_TRANSPORT_INFO:
{
- struct transport_msg *transport_msg = (struct transport_msg *)nlh;
- transport_mode = transport_msg->info.transport_mode;
- params.subbuf_size = transport_msg->info.subbuf_size;
- params.n_subbufs = transport_msg->info.n_subbufs;
- /*
- printf ("TRANSPORT_INFO recvd: %d bufs of %d bytes. mode=%d\n",
- params.n_subbufs,
- params.subbuf_size,
- transport_mode);
- */
+ struct transport_info *info = (struct transport_info *)data;
+
+ transport_mode = info->transport_mode;
+ params.subbuf_size = info->subbuf_size;
+ params.n_subbufs = info->n_subbufs;
+#if 0
+ if (transport_mode == STP_TRANSPORT_RELAYFS)
+ printf ("TRANSPORT_INFO recvd: RELAYFS %d bufs of %d bytes.\n",
+ params.n_subbufs,
+ params.subbuf_size);
+ else
+ printf ("TRANSPORT_INFO recvd: PROC with %d Mbyte buffers.\n",
+ info->buf_size);
+#endif
if (!streaming()) {
rc = init_relayfs();
if (rc < 0) {
@@ -741,27 +671,25 @@ int stp_main_loop(void)
break;
}
case STP_REALTIME_DATA:
- ptr = NLMSG_DATA(nlh);
- fputs ((char *)ptr, stdout);
+ fputs ((char *)data, stdout);
break;
case STP_EXIT:
{
/* module asks us to unload it and exit */
- int *closed = (int *)NLMSG_DATA(nlh);
+ int *closed = (int *)data;
cleanup_and_exit(*closed);
break;
}
case STP_START:
{
/* we only get this if probe_start() errors */
- struct start_msg *s = (struct start_msg *)nlh;
- struct transport_start *t = &s->ts;
+ struct transport_start *t = (struct transport_start *)data;
fprintf(stderr, "probe_start() returned %d\nExiting...\n", t->pid);
cleanup_and_exit(0);
break;
}
default:
- fprintf(stderr, "WARNING: ignored netlink message of type %d\n", (nlh->nlmsg_type));
+ fprintf(stderr, "WARNING: ignored netlink message of type %d\n", (type));
}
}
return 0;
diff --git a/runtime/stpd/stpd.c b/runtime/stpd/stpd.c
index 49b3951a..db0f328e 100644
--- a/runtime/stpd/stpd.c
+++ b/runtime/stpd/stpd.c
@@ -35,8 +35,6 @@ int print_only = 0;
int quiet = 0;
int merge = 1;
int verbose = 0;
-unsigned int opt_subbuf_size = 0;
-unsigned int opt_n_subbufs = 0;
unsigned int buffer_size = 0;
char *modname = NULL;
@@ -85,25 +83,14 @@ int main(int argc, char **argv)
break;
case 'b':
{
- char *ptr;
int size = (unsigned)atoi(optarg);
if (!size)
usage(argv[0]);
- ptr = index (optarg, 'x');
- if (ptr) {
- ptr++;
- opt_subbuf_size = (unsigned)atoi(ptr);
- printf("subbuf_size = %d\n", opt_subbuf_size);
- opt_n_subbufs = size;
- } else {
- if (size > 64) {
- fprintf(stderr, "Maximum buffer size is 64 (MB)\n");
- exit(1);
- }
- buffer_size = size * 1024 * 1024;
- opt_subbuf_size = ((size >> 2) + 1) * 65536;
- opt_n_subbufs = buffer_size / opt_subbuf_size;
+ if (size > 64) {
+ fprintf(stderr, "Maximum buffer size is 64 (MB)\n");
+ exit(1);
}
+ buffer_size = size;
break;
}
default:
@@ -114,8 +101,6 @@ int main(int argc, char **argv)
if (verbose) {
if (buffer_size)
printf ("Using a buffer of %u bytes.\n", buffer_size);
- else if (opt_n_subbufs)
- printf ("Using %u subbufs of %u bytes.\n", opt_n_subbufs, opt_subbuf_size);
}
if (optind < argc)
@@ -151,7 +136,7 @@ int main(int argc, char **argv)
sprintf(stpd_filebase, "/mnt/relay/%d/cpu", getpid());
if (init_stp(stpd_filebase, !quiet)) {
- fprintf(stderr, "Couldn't initialize stpd. Exiting.\n");
+ //fprintf(stderr, "Couldn't initialize stpd. Exiting.\n");
exit(1);
}