summaryrefslogtreecommitdiffstats
path: root/runtime/stpd
diff options
context:
space:
mode:
authortrz <trz>2005-06-21 14:13:01 +0000
committertrz <trz>2005-06-21 14:13:01 +0000
commitaac3ed25e8dc7355b5f28fae2878f644df14ef7d (patch)
treec2d284ccffb7dd9a82b000bd9c7152e5abbf160f /runtime/stpd
parentbd2b1e6816b486d5c85a4693f0b3579df4376ed5 (diff)
downloadsystemtap-steved-aac3ed25e8dc7355b5f28fae2878f644df14ef7d.tar.gz
systemtap-steved-aac3ed25e8dc7355b5f28fae2878f644df14ef7d.tar.xz
systemtap-steved-aac3ed25e8dc7355b5f28fae2878f644df14ef7d.zip
Added merging/sorting of per-cpu data, transport config/selection by probe, etc
Diffstat (limited to 'runtime/stpd')
-rw-r--r--runtime/stpd/ChangeLog17
-rw-r--r--runtime/stpd/librelay.c639
-rw-r--r--runtime/stpd/librelay.h12
-rw-r--r--runtime/stpd/stpd.c48
4 files changed, 478 insertions, 238 deletions
diff --git a/runtime/stpd/ChangeLog b/runtime/stpd/ChangeLog
index 421b27c6..2d77c405 100644
--- a/runtime/stpd/ChangeLog
+++ b/runtime/stpd/ChangeLog
@@ -1,6 +1,21 @@
+2005-06-20 Tom Zanussi <zanussi@us.ibm.com>
+
+ * librelay.c: Large refactoring, important changes are
+ added transport_mode command, for relayfs transport
+ display results only when probe completes and/or write
+ output file, merge, sort and delete the per-cpu files
+ in postprocessing, refactor so that relayfs files aren't
+ created until transport command received, removed sigalrm,
+ read the final subbuffers on exit
+
+ * stpd.c: Remove all command-line args except for -p
+ and -q as well as all code related to buffer sizes.
+
+ * librelay.h: Add transport mode command and struct.
+
2005-05-16 Martin Hunt <hunt@redhat.com>
* librelay.c (sigproc): If STP_EXIT send fails, keep retrying
every 10ms.
(init_stp): Don't set n_subbufs and subbuf_size params.
-
+
diff --git a/runtime/stpd/librelay.c b/runtime/stpd/librelay.c
index a1e8562f..60a513b7 100644
--- a/runtime/stpd/librelay.c
+++ b/runtime/stpd/librelay.c
@@ -37,29 +37,48 @@
#include <sys/socket.h>
#include <linux/types.h>
#include <linux/netlink.h>
+#include <linux/limits.h>
#include "librelay.h"
/* maximum number of CPUs we can handle - change if more */
#define NR_CPUS 256
- /* internal variables */
-static unsigned subbuf_size;
-static unsigned n_subbufs;
+ /* relayfs parameters */
+static struct params
+{
+ unsigned subbuf_size;
+ unsigned n_subbufs;
+ char relay_filebase[256];
+} params;
+
+/* temporary per-cpu output written here, filebase0...N */
+static char *percpu_tmpfilebase = "stpd_cpu";
+
+/* temporary merged/sorted output written here */
+static char *tmpfile_name = "sorted.tmp";
+
+/* probe output written here */
+static char *outfile_name = "probe.out";
+
+/* internal variables */
+static int transport_mode;
static int ncpus;
static int print_totals;
-static int logging;
+static int final_cpus_processed;
+static int exiting;
+/* per-cpu data */
static int relay_file[NR_CPUS];
-static int out_file[NR_CPUS];
+static int percpu_tmpfile[NR_CPUS];
static char *relay_buffer[NR_CPUS];
+static pthread_t reader[NR_CPUS];
- /* netlink control channel */
+/* netlink control channel */
static int control_channel;
/* flags */
extern int print_only;
extern int quiet;
-extern int streaming;
/* used to communicate with kernel over control channel */
@@ -76,12 +95,25 @@ struct consumed_info
unsigned consumed;
};
-struct app_msg
+struct transport_info
+{
+ int transport_mode;
+ unsigned subbuf_size;
+ unsigned n_subbufs;
+};
+
+struct buf_msg
{
struct nlmsghdr nlh;
struct buf_info info;
};
+struct transport_msg
+{
+ struct nlmsghdr nlh;
+ struct transport_info info;
+};
+
static char *recvbuf[8192];
/* per-cpu buffer info */
@@ -104,6 +136,19 @@ static char *color[] = {
};
/**
+ * streaming - is the current transport mode streaming or not?
+ *
+ * Returns 1 if in streaming mode, 0 otherwise.
+ */
+static int streaming(void)
+{
+ if (transport_mode == STP_TRANSPORT_NETLINK)
+ return 1;
+
+ return 0;
+}
+
+/**
* send_request - send request to kernel over control channel
* @type: the relay-app command id
* @data: pointer to the data to be sent
@@ -118,7 +163,7 @@ int send_request(int type, void *data, int len)
req = (struct nlmsghdr *)malloc(NLMSG_SPACE(len));
if (req == 0) {
- fprintf(stderr, "send_request malloc failed\n");
+ fprintf(stderr, "WARNING: send_request malloc failed\n");
return -1;
}
memset(req, 0, NLMSG_SPACE(len));
@@ -142,7 +187,7 @@ static int open_control_channel()
channel = socket(AF_NETLINK, SOCK_RAW, NETLINK_USERSOCK);
if (channel < 0) {
- printf("socket() failed\n");
+ fprintf(stderr, "ERROR: socket() failed\n");
return channel;
}
@@ -152,13 +197,159 @@ static int open_control_channel()
snl.nl_groups = 0;
if (bind (channel, (struct sockaddr *) &snl, sizeof snl))
- printf("bind() failed\n");
+ fprintf(stderr, "ERROR: bind() failed\n");
return channel;
}
/**
- * process_subbufs - write ready subbufs to disk and/or screen
+ * summarize - print a summary if applicable
+ */
+static void summarize(void)
+{
+ int i;
+
+ if (streaming())
+ return;
+
+ printf("summary:\n");
+ for (i = 0; i < ncpus; i++) {
+ printf("%s cpu %u:\n", color[i % 4], i);
+ printf(" %u sub-buffers processed\n",
+ status[i].info.consumed);
+ printf(" %u max backlog\n", status[i].max_backlog);
+ }
+}
+
+/**
+ * close_relayfs_files - close and munmap buffer and open output file
+ */
+static void close_relayfs_files(int cpu)
+{
+ size_t total_bufsize = params.subbuf_size * params.n_subbufs;
+
+ munmap(relay_buffer[cpu], total_bufsize);
+ close(relay_file[cpu]);
+ close(percpu_tmpfile[cpu]);
+}
+
+/**
+ * close_all_relayfs_files - close and munmap buffers and output files
+ */
+static void close_all_relayfs_files(void)
+{
+ int i;
+
+ if (!streaming()) {
+ for (i = 0; i < ncpus; i++)
+ close_relayfs_files(i);
+ }
+}
+
+/**
+ * open_relayfs_files - open and mmap buffer and open output file
+ */
+static int open_relayfs_files(int cpu, const char *relay_filebase)
+{
+ size_t total_bufsize;
+ char tmp[PATH_MAX];
+
+ memset(&status[cpu], 0, sizeof(struct buf_status));
+ pthread_mutex_init(&status[cpu].ready_mutex, NULL);
+ pthread_cond_init(&status[cpu].ready_cond, NULL);
+ status[cpu].info.cpu = cpu;
+
+ sprintf(tmp, "%s%d", relay_filebase, cpu);
+ relay_file[cpu] = open(tmp, O_RDONLY | O_NONBLOCK);
+ if (relay_file[cpu] < 0) {
+ fprintf(stderr, "ERROR: couldn't open relayfs file %s: errcode = %s\n", tmp, strerror(errno));
+ return -1;
+ }
+
+ sprintf(tmp, "%s%d", percpu_tmpfilebase, cpu);
+ if((percpu_tmpfile[cpu] = open(tmp, O_CREAT | O_RDWR | O_TRUNC,
+ S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH)) < 0) {
+ fprintf(stderr, "ERROR: Couldn't open output file %s: errcode = %s\n", tmp, strerror(errno));
+ close(relay_file[cpu]);
+ return -1;
+ }
+
+ total_bufsize = params.subbuf_size * params.n_subbufs;
+ relay_buffer[cpu] = mmap(NULL, total_bufsize, PROT_READ,
+ MAP_PRIVATE | MAP_POPULATE, relay_file[cpu],
+ 0);
+ if(relay_buffer[cpu] == MAP_FAILED)
+ {
+ fprintf(stderr, "ERROR: couldn't mmap relay file, total_bufsize (%d) = subbuf_size (%d) * n_subbufs(%d), error = %s \n", (int)total_bufsize, (int)params.subbuf_size, (int)params.n_subbufs, strerror(errno));
+ close(relay_file[cpu]);
+ close(percpu_tmpfile[cpu]);
+ return -1;
+ }
+
+ return 0;
+}
+
+/**
+ * delete_percpu_files - delete temporary per-cpu output files
+ *
+ * Returns 0 if successful, -1 otherwise.
+ */
+static int delete_percpu_files(void)
+{
+ int i;
+ char tmp[PATH_MAX];
+
+ for (i = 0; i < ncpus; i++) {
+ sprintf(tmp, "%s%d", percpu_tmpfilebase, i);
+ if (unlink(tmp) < 0) {
+ fprintf(stderr, "ERROR: couldn't unlink percpu file %s: errcode = %s\n", tmp, strerror(errno));
+ return -1;
+ }
+ }
+ return 0;
+}
+
+/**
+ * kill_percpu_threads - kill per-cpu threads 0->n-1
+ * @n: number of threads to kill
+ *
+ * Returns number of threads killed.
+ */
+static int kill_percpu_threads(int n)
+{
+ int i, killed = 0;
+
+ for (i = 0; i < n; i++) {
+ if (pthread_cancel(reader[i]) == 0)
+ killed++;
+ }
+ if (killed != n)
+ fprintf(stderr, "WARNING: couldn't kill all per-cpu threads: %d killed, %d total\n", killed, n);
+
+ return killed;
+}
+
+/**
+ * request_last_buffers - request end-of-trace last buffer processing
+ *
+ * Returns 0 if successful, negative otherwise
+ */
+static int request_last_buffers(void)
+{
+ int i;
+
+ for (i = 0; i < ncpus; i++) {
+ if (send_request(STP_BUF_INFO, &status[i].info, sizeof(struct buf_info)) < 0) {
+ fprintf(stderr, "WARNING: couldn't request last buffers for cpu %d\n", i);
+ return -1;
+ }
+ }
+
+ return 0;
+}
+
+/**
+ * process_subbufs - write ready subbufs to disk
*/
static int process_subbufs(struct buf_info *info)
{
@@ -169,31 +360,21 @@ static int process_subbufs(struct buf_info *info)
unsigned padding;
subbufs_ready = info->produced - info->consumed;
- start_subbuf = info->consumed % n_subbufs;
+ start_subbuf = info->consumed % params.n_subbufs;
end_subbuf = start_subbuf + subbufs_ready;
- if (!quiet)
- fputs ( color[cpu % 4], stdout);
-
for (i = start_subbuf; i < end_subbuf; i++) {
- subbuf_idx = i % n_subbufs;
- subbuf_ptr = relay_buffer[cpu] + subbuf_idx * subbuf_size;
+ subbuf_idx = i % params.n_subbufs;
+ subbuf_ptr = relay_buffer[cpu] + subbuf_idx * params.subbuf_size;
padding = *((unsigned *)subbuf_ptr);
subbuf_ptr += sizeof(padding);
- len = (subbuf_size - sizeof(padding)) - padding;
+ len = (params.subbuf_size - sizeof(padding)) - padding;
- if (!print_only)
+ if (write(percpu_tmpfile[cpu], subbuf_ptr, len) < 0)
{
- if (write(out_file[cpu], subbuf_ptr, len) < 0)
- {
- printf("Couldn't write to output file for cpu %d, exiting: errcode = %d: %s\n", cpu, errno, strerror(errno));
- exit(1);
- }
+ fprintf(stderr, "ERROR: couldn't write to output file for cpu %d, exiting: errcode = %d: %s\n", cpu, errno, strerror(errno));
+ exit(1);
}
-
- if (!quiet)
- fwrite (subbuf_ptr, len, 1, stdout);
-
subbufs_consumed++;
}
@@ -217,24 +398,24 @@ static void *reader_thread(void *data)
rc = poll(&pollfd, 1, -1);
if (rc < 0) {
if (errno != EINTR) {
- printf("poll error: %s\n",strerror(errno));
+ fprintf(stderr, "ERROR: poll error: %s\n",strerror(errno));
exit(1);
}
- printf("poll warning: %s\n",strerror(errno));
+ fprintf(stderr, "WARNING: poll warning: %s\n",strerror(errno));
rc = 0;
}
-
send_request(STP_BUF_INFO, &status[cpu].info,
sizeof(struct buf_info));
+
if (status[cpu].info.produced == status[cpu].info.consumed)
pthread_cond_wait(&status[cpu].ready_cond,
&status[cpu].ready_mutex);
pthread_mutex_unlock(&status[cpu].ready_mutex);
-
+
subbufs_consumed = process_subbufs(&status[cpu].info);
if (subbufs_consumed) {
- if (subbufs_consumed == n_subbufs)
- fprintf(stderr, "cpu %ld buffer full. Consider using a larger buffer size.\n", cpu);
+ if (subbufs_consumed == params.n_subbufs)
+ fprintf(stderr, "WARNING: cpu %ld buffer full. Consider using a larger buffer size.\n", cpu);
if (subbufs_consumed > status[cpu].max_backlog)
status[cpu].max_backlog = subbufs_consumed;
status[cpu].info.consumed += subbufs_consumed;
@@ -247,135 +428,62 @@ static void *reader_thread(void *data)
} while (1);
}
-static void summarize(void)
-{
- int i;
-
- if (streaming)
- return;
-
- printf("summary:\n");
- for (i = 0; i < ncpus; i++) {
- printf("%s cpu %u:\n", color[i % 4], i);
- printf(" %u sub-buffers processed\n",
- status[i].info.consumed);
- printf(" %u max backlog\n", status[i].max_backlog);
- }
-}
-
-/**
- * close_files - close and munmap buffer and open output file
- */
-static void close_files(int cpu)
-{
- size_t total_bufsize = subbuf_size * n_subbufs;
-
- munmap(relay_buffer[cpu], total_bufsize);
- close(relay_file[cpu]);
- if (!print_only) close(out_file[cpu]);
-}
-
-static void close_all_files(void)
-{
- int i;
- close(control_channel);
- for (i = 0; i < ncpus; i++)
- close_files(i);
-}
-
-static void sigalarm(int signum)
-{
- if (print_totals)
- summarize();
- close_all_files();
- exit(0);
-}
-
-static void sigproc(int signum)
-{
- while (send_request(STP_EXIT, NULL, 0) < 0)
- usleep (10000);
-}
-
/**
- * open_files - open and mmap buffer and open output file
+ * init_relayfs - create files and threads for relayfs processing
+ *
+ * Returns 0 if successful, negative otherwise
*/
-static int open_files(int cpu, const char *relay_filebase,
- const char *out_filebase)
+int init_relayfs(void)
{
- size_t total_bufsize;
- char tmp[4096];
-
- memset(&status[cpu], 0, sizeof(struct buf_status));
- pthread_mutex_init(&status[cpu].ready_mutex, NULL);
- pthread_cond_init(&status[cpu].ready_cond, NULL);
- status[cpu].info.cpu = cpu;
-
- sprintf(tmp, "%s%d", relay_filebase, cpu);
- relay_file[cpu] = open(tmp, O_RDONLY | O_NONBLOCK);
- if (relay_file[cpu] < 0) {
- printf("Couldn't open relayfs file %s: errcode = %s\n",
- tmp, strerror(errno));
- return -1;
- }
-
- if (!print_only) {
- sprintf(tmp, "%s%d", out_filebase, cpu);
- if((out_file[cpu] = open(tmp, O_CREAT | O_RDWR | O_TRUNC,
- S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH)) < 0) {
- printf("Couldn't open output file %s: errcode = %s\n",
- tmp, strerror(errno));
- close(relay_file[cpu]);
- return -1;
+ int i, j;
+
+ for (i = 0; i < ncpus; i++) {
+ if (open_relayfs_files(i, params.relay_filebase) < 0) {
+ fprintf(stderr, "ERROR: couldn't open relayfs files, cpu = %d\n", i);
+ goto err;
+ }
+ /* create a thread for each per-cpu buffer */
+ if (pthread_create(&reader[i], NULL, reader_thread, (void *)i) < 0) {
+ close_relayfs_files(i);
+ fprintf(stderr, "ERROR: Couldn't create reader thread, cpu = %d\n", i);
+ goto err;
}
}
- total_bufsize = subbuf_size * n_subbufs;
- relay_buffer[cpu] = mmap(NULL, total_bufsize, PROT_READ,
- MAP_PRIVATE | MAP_POPULATE, relay_file[cpu],
- 0);
- if(relay_buffer[cpu] == MAP_FAILED)
- {
- printf("Couldn't mmap relay file, total_bufsize (%d) = subbuf_size (%d) * n_subbufs(%d), error = %s \n", (int)total_bufsize, (int)subbuf_size, (int)n_subbufs, strerror(errno));
- close(relay_file[cpu]);
- if (!print_only) close(out_file[cpu]);
- return -1;
- }
-
+ printf("Using channel with %u sub-buffers of size %u.\n",
+ params.n_subbufs, params.subbuf_size);
+
return 0;
+err:
+ for (j = 0; j < i; j++)
+ close_relayfs_files(j);
+ kill_percpu_threads(i);
+
+ return -1;
}
/**
* init_stp - initialize the app
+ * @modname: the name of the stp module to load
* @relay_filebase: full path of base name of the per-cpu relayfs files
- * @out_filebase: base name of the per-cpu files data will be written to
- * @sub_buf_size: relayfs sub-buffer size of channel to be created
- * @n_sub_bufs: relayfs number of sub-buffers of channel to be created
* @print_summary: boolean, print summary or not at end of run
*
* Returns 0 on success, negative otherwise.
*/
int init_stp(const char *modname,
const char *relay_filebase,
- const char *out_filebase,
- unsigned sub_buf_size,
- unsigned n_sub_bufs,
int print_summary)
{
- int i;
int daemon_pid;
char buf[1024];
ncpus = sysconf(_SC_NPROCESSORS_ONLN);
- subbuf_size = sub_buf_size;
- n_subbufs = n_sub_bufs;
print_totals = print_summary;
-
daemon_pid = getpid();
sprintf(buf, "insmod %s pid=%d", modname, daemon_pid);
if (system(buf)) {
- printf("Couldn't insmod probe module %s\n", modname);
+ fprintf(stderr, "ERROR, couldn't insmod probe module %s\n", modname);
return -1;
}
@@ -383,48 +491,161 @@ int init_stp(const char *modname,
if (control_channel < 0)
return -1;
- if (streaming)
- return 0;
+ if (relay_filebase)
+ strcpy(params.relay_filebase, relay_filebase);
- for (i = 0; i < ncpus; i++) {
- if (open_files(i, relay_filebase, out_filebase) < 0) {
- printf("Couldn't open files\n");
- close (control_channel);
- return -1;
+ return 0;
+}
+
+/* length of timestamp in output field 0 - TODO: make binary, variable */
+#define TIMESTAMP_SIZE 11
+
+/**
+ * write_output - either to screen, outfile or both
+ */
+static int write_output(void)
+{
+ struct stat sbuf;
+ int i, len, err = 0;
+ const char *subbuf_ptr;
+ int tmpfile;
+ FILE *outfile = NULL;
+ char *tmpbuf;
+ const char *rec_txt = subbuf_ptr + i;
+ const char * rec_cnt;
+
+ tmpfile = open(tmpfile_name, O_RDONLY);
+ if (tmpfile < 0) {
+ fprintf(stderr, "ERROR: couldn't open tmp file %s: errcode = %s\n", tmpfile_name, strerror(errno));
+ return -1;
+ }
+
+ if (fstat(tmpfile, &sbuf)) {
+ fprintf(stderr, "ERROR: couldn't stat tmp file %s: errcode = %s\n", tmpfile_name, strerror(errno));
+ return -1;
+ }
+ len = sbuf.st_size;
+ if (len <= TIMESTAMP_SIZE)
+ goto rm_tmp;
+
+ tmpbuf = mmap(NULL, len, PROT_READ, MAP_PRIVATE, tmpfile, 0);
+ if(tmpbuf == MAP_FAILED)
+ {
+ close(tmpfile);
+ fprintf(stderr, "ERROR: couldn't mmap tmp file %s: errcode = %s\n", tmpfile_name, strerror(errno));
+ err = -1;
+ goto rm_tmp;
+ }
+ subbuf_ptr = tmpbuf;
+ i = TIMESTAMP_SIZE;
+ rec_txt = subbuf_ptr + i;
+
+ if (!print_only) {
+ outfile = fopen(outfile_name, "w");
+ if (!outfile) {
+ fprintf(stderr, "ERROR: couldn't open output file %s: errcode = %s\n", outfile_name, strerror(errno));
+ err = -1;
+ goto unmap_rm_tmp;
+ }
+ }
+
+ while (i < len) {
+ if (subbuf_ptr[i] == '\0') {
+ rec_cnt = rec_txt - TIMESTAMP_SIZE;
+ if (!quiet) {
+#if 0
+ fwrite(rec_cnt, TIMESTAMP_SIZE, 1, stdout);
+#endif
+ fwrite(rec_txt, (subbuf_ptr + i) - rec_txt, 1, stdout);
+ }
+ if (!print_only) {
+#if 0
+ fwrite(rec_cnt, TIMESTAMP_SIZE, 1, outfile);
+#endif
+ fwrite(rec_txt, (subbuf_ptr + i) - rec_txt, 1, outfile);
+ }
+ rec_txt = subbuf_ptr + i + 1 + TIMESTAMP_SIZE;
+ i += TIMESTAMP_SIZE;
}
+ i++;
}
+ if (!print_only)
+ fclose(outfile);
+unmap_rm_tmp:
+ munmap(tmpbuf, len);
+rm_tmp:
+ close(tmpfile);
+ if (unlink(tmpfile_name) < 0) {
+ fprintf(stderr, "Couldn't unlink tmp file %s: errcode = %s\n",
+ tmpfile_name, strerror(errno));
+ err = -1;
+ }
+
+ return err;
+}
+
+/**
+ * sort_output - merge and sort per-cpu output
+ *
+ * TODO: replace this with our own sort
+ */
+static int sort_output(void)
+{
+ char tmpbuf[PATH_MAX];
+
+ sprintf(tmpbuf, "sort -z -b -n -o %s %s*", tmpfile_name, percpu_tmpfilebase);
+ system(tmpbuf);
+ if (system(tmpbuf)) {
+ fprintf(stderr, "ERROR: couldn't sort output: %s failed. No output will be written.\n", tmpbuf);
+ return -1;
+ }
+
return 0;
}
/**
+ * postprocess_and_exit - postprocess the output and exit
+ */
+static void postprocess_and_exit(void)
+{
+ if (print_totals)
+ summarize();
+
+ if (!streaming()) {
+ if (sort_output() == 0)
+ write_output();
+ close_all_relayfs_files();
+ delete_percpu_files();
+ }
+
+ exit(0);
+}
+
+static void sigproc(int signum)
+{
+ while (send_request(STP_EXIT, NULL, 0) < 0)
+ usleep (10000);
+}
+
+/**
* stp_main_loop - loop forever reading data
*/
int stp_main_loop(void)
{
- pthread_t thread;
- int cpu, nb;
- long i;
- struct app_msg *msg;
+ int cpu, nb, rc;
+ struct buf_msg *buf_msg;
+ struct transport_msg *transport_msg;
unsigned short *ptr;
+ unsigned subbufs_consumed;
char tmpbuf[128];
-
+
signal(SIGINT, sigproc);
signal(SIGTERM, sigproc);
- signal(SIGALRM, sigalarm);
-
- if (!streaming) {
- for (i = 0; i < ncpus; i++) {
- /* create a thread for each per-cpu buffer */
- if (pthread_create(&thread, NULL, reader_thread, (void *)i) < 0) {
- printf("Couldn't create thread\n");
- return -1;
- }
- }
- }
- logging = 1;
-
+ send_request(STP_TRANSPORT_MODE, &transport_mode,
+ sizeof(transport_mode));
+
while (1) { /* handle messages from control channel */
nb = recv(control_channel, recvbuf, sizeof(recvbuf), 0);
struct nlmsghdr *nlh = (struct nlmsghdr *)recvbuf;
@@ -432,46 +653,86 @@ int stp_main_loop(void)
if (nb < 0) {
if (errno == EINTR)
continue;
- printf("recv() failed\n");
+ fprintf(stderr, "WARNING: recv() failed\n");
} else if (nb == 0)
- printf("unexpected EOF on netlink socket\n");
+ fprintf(stderr, "WARNING: unexpected EOF on netlink socket\n");
if (!NLMSG_OK(nlh, nb)) {
- printf("netlink message not ok, nb = %d\n", nb);
+ fprintf(stderr, "WARNING: netlink message not ok, nb = %d\n", nb);
+ continue;
+ }
+ if (!transport_mode &&
+ nlh->nlmsg_type != STP_TRANSPORT_MODE &&
+ nlh->nlmsg_type != STP_EXIT)
+ {
+ fprintf(stderr, "WARNING: invalid stp command: no transport\n");
continue;
}
switch (nlh->nlmsg_type) {
case STP_BUF_INFO:
- msg = (struct app_msg *)nlh;
- cpu = msg->info.cpu;
- memcpy(&status[cpu].info, &msg->info, sizeof (struct buf_info));
- pthread_mutex_lock(&status[cpu].ready_mutex);
- if (status[cpu].info.produced > status[cpu].info.consumed)
- pthread_cond_signal(&status[cpu].ready_cond);
- pthread_mutex_unlock(&status[cpu].ready_mutex);
- break;
+ buf_msg = (struct buf_msg *)nlh;
+ cpu = buf_msg->info.cpu;
+ memcpy(&status[cpu].info, &buf_msg->info, sizeof (struct buf_info));
+ if (exiting) {
+ final_cpus_processed++;
+ subbufs_consumed = process_subbufs(&status[cpu].info);
+ status[cpu].info.consumed += subbufs_consumed;
+ if (final_cpus_processed >= ncpus)
+ postprocess_and_exit();
+ break;
+ }
+ pthread_mutex_lock(&status[cpu].ready_mutex);
+ if (status[cpu].info.produced > status[cpu].info.consumed)
+ pthread_cond_signal(&status[cpu].ready_cond);
+ pthread_mutex_unlock(&status[cpu].ready_mutex);
+ break;
+ case STP_TRANSPORT_MODE:
+ transport_msg = (struct transport_msg *)nlh;
+ transport_mode = transport_msg->info.transport_mode;
+ params.subbuf_size = transport_msg->info.subbuf_size;
+ params.n_subbufs = transport_msg->info.n_subbufs;
+ if (!streaming()) {
+ rc = init_relayfs();
+ if (rc < 0) {
+ close(control_channel);
+ fprintf(stderr, "ERROR: couldn't init relayfs, exiting\n");
+ exit(1);
+ }
+ }
+ break;
case STP_REALTIME_DATA:
- ptr = NLMSG_DATA(nlh);
- fputs ( color[5], stdout);
- fputs ((char *)ptr, stdout);
- break;
+ ptr = NLMSG_DATA(nlh);
+ fputs ( color[5], stdout);
+ fputs ((char *)ptr, stdout);
+ break;
case STP_EXIT:
- /* module asks us to unload it and exit */
- ptr = NLMSG_DATA(nlh);
- /* FIXME. overflow check */
- strcpy (tmpbuf, "/sbin/rmmod ");
- strcpy (tmpbuf + strlen(tmpbuf), (char *)ptr);
+ if (!streaming() && !exiting) {
+ exiting = 1;
+ kill_percpu_threads(ncpus);
+ if (request_last_buffers() < 0)
+ exit(1);
+ }
+ /* module asks us to unload it and exit */
+ ptr = NLMSG_DATA(nlh);
+ /* FIXME. overflow check */
+ strcpy (tmpbuf, "/sbin/rmmod ");
+ strcpy (tmpbuf + strlen(tmpbuf), (char *)ptr);
#if 0
- printf ("Executing \"system %s\"\n", tmpbuf);
+ printf ("Executing \"system %s\"\n", tmpbuf);
#endif
- system (tmpbuf);
- if (print_totals)
- summarize();
- if (!streaming)
- close_all_files();
- exit(0);
- break;
+ if (system(tmpbuf)) {
+ fprintf(stderr, "ERROR: couldn't rmmod probe module %s. No output will be written.\n", (char *)ptr);
+ close_all_relayfs_files();
+ delete_percpu_files();
+ exit(1);
+ }
+ if (!streaming() && (final_cpus_processed < ncpus))
+ break;
+
+ close(control_channel);
+ postprocess_and_exit();
+ break;
default:
- fprintf(stderr, "WARNING: ignored netlink message of type %d\n", (nlh->nlmsg_type));
+ fprintf(stderr, "WARNING: ignored netlink message of type %d\n", (nlh->nlmsg_type));
}
}
return 0;
diff --git a/runtime/stpd/librelay.h b/runtime/stpd/librelay.h
index 033976d9..59273d8e 100644
--- a/runtime/stpd/librelay.h
+++ b/runtime/stpd/librelay.h
@@ -4,18 +4,22 @@ enum
STP_BUF_INFO = 1,
STP_SUBBUFS_CONSUMED,
STP_REALTIME_DATA,
+ STP_TRANSPORT_MODE,
STP_EXIT,
};
+/* SystemTap transport options */
+enum
+{
+ STP_TRANSPORT_NETLINK = 1,
+ STP_TRANSPORT_RELAYFS
+};
+
/*
* stp external API functions
*/
extern int init_stp(const char *modname,
const char *relay_filebase,
- const char *out_filebase,
- unsigned sub_buf_size,
- unsigned n_sub_bufs,
int print_summary);
-
extern int stp_main_loop(void);
extern int send_request(int type, void *data, int len);
diff --git a/runtime/stpd/stpd.c b/runtime/stpd/stpd.c
index 881556cc..c31019d7 100644
--- a/runtime/stpd/stpd.c
+++ b/runtime/stpd/stpd.c
@@ -25,65 +25,39 @@
#include <unistd.h>
#include "librelay.h"
- /* packet logging output written here, filebase0...N */
-static char *stpd_outfilebase = "stpd_cpu";
-
-#define DEFAULT_SUBBUF_SIZE (262144)
-#define DEFAULT_N_SUBBUFS (4)
-static unsigned subbuf_size = DEFAULT_SUBBUF_SIZE;
-static unsigned n_subbufs = DEFAULT_N_SUBBUFS;
-
extern char *optarg;
extern int optopt;
extern int optind;
int print_only = 0;
int quiet = 0;
-int streaming = 1;
+int transport_mode = 0;
/* relayfs base file name */
static char stpd_filebase[1024];
static void usage(char *prog)
{
- fprintf(stderr, "%s [-p] [-q] [-b subbuf_size -n n_subbufs] kmod-name\n", prog);
+ fprintf(stderr, "%s [-p] [-q] kmod-name\n", prog);
fprintf(stderr, "-p Print only. Don't log to files.\n");
fprintf(stderr, "-q Quiet. Don't display trace to stdout.\n");
- fprintf(stderr, "-r Use relayfs for buffering i.e. non-streaming mode.\n");
- fprintf(stderr, "-b subbuf_size (default is %d)\n", DEFAULT_SUBBUF_SIZE);
- fprintf(stderr, "-n subbufs (default is %d)\n", DEFAULT_N_SUBBUFS);
exit(1);
}
int main(int argc, char **argv)
{
int c;
- unsigned opt_subbuf_size = 0;
- unsigned opt_n_subbufs = 0;
char *modname = NULL;
- while ((c = getopt(argc, argv, "b:n:pqr")) != EOF)
+ while ((c = getopt(argc, argv, "pq")) != EOF)
{
switch (c) {
- case 'b':
- opt_subbuf_size = (unsigned)atoi(optarg);
- if (!opt_subbuf_size)
- usage(argv[0]);
- break;
- case 'n':
- opt_n_subbufs = (unsigned)atoi(optarg);
- if (!opt_n_subbufs)
- usage(argv[0]);
- break;
case 'p':
print_only = 1;
break;
case 'q':
quiet = 1;
break;
- case 'r':
- streaming = 0;
- break;
default:
usage(argv[0]);
}
@@ -101,27 +75,13 @@ int main(int argc, char **argv)
fprintf (stderr, "Cannot do \"-p\" and \"-q\" both.\n");
usage(argv[0]);
}
-
- if ((opt_n_subbufs && !opt_subbuf_size) ||
- (!opt_n_subbufs && opt_subbuf_size))
- usage(argv[0]);
-
- if (opt_n_subbufs && opt_n_subbufs) {
- subbuf_size = opt_subbuf_size;
- n_subbufs = opt_n_subbufs;
- }
sprintf(stpd_filebase, "/mnt/relay/%d/cpu", getpid());
- if (init_stp(modname, stpd_filebase, stpd_outfilebase,
- subbuf_size, n_subbufs, 1)) {
+ if (init_stp(modname, stpd_filebase, 1)) {
fprintf(stderr, "Couldn't initialize stpd. Exiting.\n");
exit(1);
}
- if (!streaming)
- printf("Creating channel with %u sub-buffers of size %u.\n",
- n_subbufs, subbuf_size);
-
if (quiet)
printf("Logging... Press Control-C to stop.\n");
else