summaryrefslogtreecommitdiffstats
path: root/runtime/stpd/librelay.c
diff options
context:
space:
mode:
Diffstat (limited to 'runtime/stpd/librelay.c')
-rw-r--r--runtime/stpd/librelay.c520
1 files changed, 520 insertions, 0 deletions
diff --git a/runtime/stpd/librelay.c b/runtime/stpd/librelay.c
new file mode 100644
index 00000000..04d1ba6b
--- /dev/null
+++ b/runtime/stpd/librelay.c
@@ -0,0 +1,520 @@
+/*
+ * librelay - relay-app user space 'library'
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+ *
+ * Copyright (C) 2005 - Tom Zanussi (zanussi@us.ibm.com), IBM Corp
+ *
+ */
+#include <ctype.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <string.h>
+#include <signal.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <sys/ioctl.h>
+#include <fcntl.h>
+#include <errno.h>
+#include <linux/fd.h>
+#include <sys/mman.h>
+#include <sys/poll.h>
+#include <pthread.h>
+#include <sys/socket.h>
+#include <linux/types.h>
+#include <linux/netlink.h>
+#include "librelay.h"
+
+/* maximum number of CPUs we can handle - change if more */
+#define NR_CPUS 256
+
+ /* internal variables */
+static unsigned subbuf_size;
+static unsigned n_subbufs;
+static int ncpus;
+static int print_totals;
+static int logging;
+
+static int relay_file[NR_CPUS];
+static int out_file[NR_CPUS];
+static char *relay_buffer[NR_CPUS];
+
+ /* netlink control channel */
+static int control_channel;
+static int nl_unit;
+
+/* flags */
+extern int print_only;
+extern int quiet;
+
+/* used to communicate with kernel over control channel */
+
+struct buf_info
+{
+ int cpu;
+ unsigned produced;
+ unsigned consumed;
+};
+
+struct consumed_info
+{
+ int cpu;
+ unsigned consumed;
+};
+
+struct channel_create_info
+{
+ unsigned subbuf_size;
+ unsigned n_subbufs;
+};
+
+struct app_msg
+{
+ struct nlmsghdr nlh;
+ struct buf_info info;
+};
+
+static char *recvbuf[8192];
+
+/* per-cpu buffer info */
+static struct buf_status
+{
+ pthread_mutex_t ready_mutex;
+ pthread_cond_t ready_cond;
+ struct buf_info info;
+ unsigned max_backlog; /* max # sub-buffers ready at one time */
+} status[NR_CPUS];
+
+/* colors for printing */
+static char *color[] = {
+ "\033[31m", /* red */
+ "\033[32m", /* green */
+ "\033[33m", /* yellow */
+ "\033[34m", /* blue */
+ "\033[35m", /* magenta */
+ "\033[36m", /* cyan */
+};
+
+
+enum
+{
+ STP_REALTIME_DATA = RELAY_APP_USERCMD_START,
+ STP_EXIT,
+ STP_DONE
+};
+
+
+/**
+ * send_request - send request to kernel over control channel
+ * @type: the relay-app command id
+ * @data: pointer to the data to be sent
+ * @len: length of the data to be sent
+ *
+ * Returns 0 on success, negative otherwise.
+ */
+int send_request(int type, void *data, int len)
+{
+ struct nlmsghdr *req;
+ int err;
+
+ req = (struct nlmsghdr *)malloc(NLMSG_SPACE(len));
+ memset(req, 0, NLMSG_SPACE(len));
+ req->nlmsg_len = NLMSG_LENGTH(len);
+ req->nlmsg_type = type;
+ req->nlmsg_flags = NLM_F_REQUEST;
+ req->nlmsg_pid = getpid();
+ memcpy(NLMSG_DATA(req), data, len);
+
+ err = send(control_channel, req, req->nlmsg_len, 0);
+#if 0
+ if (err < 0)
+ fprintf(stderr, "netlink send error\n");
+#endif
+ return err;
+}
+
+/**
+ * open_control_channel - create netlink channel
+ */
+static int open_control_channel()
+{
+ struct sockaddr_nl snl;
+ int channel;
+
+ channel = socket(AF_NETLINK, SOCK_RAW, nl_unit);
+ if (channel < 0) {
+ printf("socket() failed\n");
+ return channel;
+ }
+
+ memset(&snl, 0, sizeof snl);
+ snl.nl_family = AF_NETLINK;
+ snl.nl_pid = getpid();
+ snl.nl_groups = 0;
+
+ if (bind (channel, (struct sockaddr *) &snl, sizeof snl))
+ printf("bind() failed\n");
+
+ return channel;
+}
+
+/**
+ * process_subbufs - write ready subbufs to disk and/or screen
+ */
+
+static int process_subbufs (struct buf_info *info)
+{
+ unsigned subbufs_ready, start_subbuf, end_subbuf, subbuf_idx;
+ int i, len, cpu = info->cpu;
+ char *subbuf_ptr;
+ int subbufs_consumed = 0;
+ unsigned padding;
+
+ subbufs_ready = info->produced - info->consumed;
+ start_subbuf = info->consumed % n_subbufs;
+ end_subbuf = start_subbuf + subbufs_ready;
+
+ if (!quiet)
+ fputs ( color[cpu % 4], stdout);
+
+ for (i = start_subbuf; i < end_subbuf; i++) {
+ subbuf_idx = i % n_subbufs;
+ subbuf_ptr = relay_buffer[cpu] + subbuf_idx * subbuf_size;
+ padding = *((unsigned *)subbuf_ptr);
+ subbuf_ptr += sizeof(padding);
+ len = (subbuf_size - sizeof(padding)) - padding;
+
+ if (!print_only)
+ {
+ if (write(out_file[cpu], subbuf_ptr, len) < 0)
+ {
+ printf("Couldn't write to output file for cpu %d, exiting: errcode = %d: %s\n", cpu, errno, strerror(errno));
+ exit(1);
+ }
+ }
+
+ if (!quiet)
+ fwrite (subbuf_ptr, len, 1, stdout);
+
+ subbufs_consumed++;
+ }
+
+ return subbufs_consumed;
+}
+
+
+
+/**
+ * reader_thread - per-cpu channel buffer reader
+ */
+static void *reader_thread(void *data)
+{
+ int rc;
+ long cpu = (long)data;
+ struct pollfd pollfd;
+ struct consumed_info consumed_info;
+ unsigned subbufs_consumed;
+
+ do {
+ pollfd.fd = relay_file[cpu];
+ pollfd.events = POLLIN;
+ rc = poll(&pollfd, 1, -1);
+ if (rc < 0) {
+ if (errno != EINTR) {
+ printf("poll error: %s\n",strerror(errno));
+ exit(1);
+ }
+ printf("poll warning: %s\n",strerror(errno));
+ rc = 0;
+ }
+
+ send_request(RELAY_APP_BUF_INFO, &status[cpu].info,
+ sizeof(struct buf_info));
+ if (status[cpu].info.produced == status[cpu].info.consumed)
+ pthread_cond_wait(&status[cpu].ready_cond,
+ &status[cpu].ready_mutex);
+ pthread_mutex_unlock(&status[cpu].ready_mutex);
+
+ subbufs_consumed = process_subbufs(&status[cpu].info);
+ if (subbufs_consumed) {
+ if (subbufs_consumed == n_subbufs)
+ fprintf(stderr, "cpu %ld buffer full. Consider using a larger buffer size.\n", cpu);
+ if (subbufs_consumed > status[cpu].max_backlog)
+ status[cpu].max_backlog = subbufs_consumed;
+ status[cpu].info.consumed += subbufs_consumed;
+ consumed_info.cpu = cpu;
+ consumed_info.consumed = subbufs_consumed;
+ send_request(RELAY_APP_SUBBUFS_CONSUMED,
+ &consumed_info,
+ sizeof(struct consumed_info));
+ }
+ } while (1);
+}
+
+static void summarize(void)
+{
+ int i;
+
+ printf("summary:\n");
+ for (i = 0; i < ncpus; i++) {
+ printf("%s cpu %u:\n", color[i % 4], i);
+ printf(" %u sub-buffers processed\n",
+ status[i].info.consumed);
+ printf(" %u max backlog\n", status[i].max_backlog);
+ }
+}
+
+/**
+ * close_files - close and munmap buffer and open output file
+ */
+static void close_files(int cpu)
+{
+ size_t total_bufsize = subbuf_size * n_subbufs;
+
+ munmap(relay_buffer[cpu], total_bufsize);
+ close(relay_file[cpu]);
+ if (!print_only) close(out_file[cpu]);
+}
+
+static void close_all_files(void)
+{
+ int i;
+
+ for (i = 0; i < ncpus; i++)
+ close_files(i);
+}
+
+static void sigalarm(int signum)
+{
+ if (print_totals)
+ summarize();
+ close_all_files();
+ send_request(RELAY_APP_CHAN_DESTROY, NULL, 0);
+ exit(0);
+}
+
+static void sigproc(int signum)
+{
+ send_request(STP_EXIT, NULL, 0);
+}
+
+/**
+ * open_files - open and mmap buffer and open output file
+ */
+static int open_files(int cpu, const char *relay_filebase,
+ const char *out_filebase)
+{
+ size_t total_bufsize;
+ char tmp[4096];
+
+ memset(&status[cpu], 0, sizeof(struct buf_status));
+ pthread_mutex_init(&status[cpu].ready_mutex, NULL);
+ pthread_cond_init(&status[cpu].ready_cond, NULL);
+ status[cpu].info.cpu = cpu;
+
+ sprintf(tmp, "%s%d", relay_filebase, cpu);
+ relay_file[cpu] = open(tmp, O_RDONLY | O_NONBLOCK);
+ if (relay_file[cpu] < 0) {
+ printf("Couldn't open relayfs file %s: errcode = %s\n",
+ tmp, strerror(errno));
+ return -1;
+ }
+
+ if (!print_only) {
+ sprintf(tmp, "%s%d", out_filebase, cpu);
+ if((out_file[cpu] = open(tmp, O_CREAT | O_RDWR | O_TRUNC, S_IRUSR |
+ S_IWUSR | S_IRGRP | S_IROTH)) < 0) {
+ printf("Couldn't open output file %s: errcode = %s\n",
+ tmp, strerror(errno));
+ close(relay_file[cpu]);
+ return -1;
+ }
+ }
+
+ total_bufsize = subbuf_size * n_subbufs;
+ relay_buffer[cpu] = mmap(NULL, total_bufsize, PROT_READ,
+ MAP_PRIVATE | MAP_POPULATE, relay_file[cpu],
+ 0);
+ if(relay_buffer[cpu] == MAP_FAILED)
+ {
+ printf("Couldn't mmap relay file, total_bufsize (%d) = subbuf_size (%d) * n_subbufs(%d), error = %s \n", (int)total_bufsize, (int)subbuf_size, (int)n_subbufs, strerror(errno));
+ close(relay_file[cpu]);
+ if (!print_only) close(out_file[cpu]);
+ return -1;
+ }
+
+ return 0;
+}
+
+/**
+ * _init_relay_app - initialize the relay-app with specific netlink unit
+ * @relay_filebase: full path of base name of the per-cpu relayfs files
+ * @out_filebase: base name of the per-cpu files data will be written to
+ * @sub_buf_size: relayfs sub-buffer size of channel to be created
+ * @n_sub_bufs: relayfs number of sub-buffers of channel to be created
+ * @print_summary: boolean, print summary or not at end of run
+ * @netlink_unit: netlink unit, see netlink.h
+ *
+ * Returns 0 on success, negative otherwise.
+ *
+ * NOTE: use init_relay_app() instead if you don't need to specify a
+ * non-default netlink unit
+ */
+int _init_relay_app(const char *relay_filebase,
+ const char *out_filebase,
+ unsigned sub_buf_size,
+ unsigned n_sub_bufs,
+ int print_summary,
+ int netlink_unit)
+{
+ int i;
+ struct channel_create_info create_info;
+
+ ncpus = sysconf(_SC_NPROCESSORS_ONLN);
+ subbuf_size = sub_buf_size;
+ n_subbufs = n_sub_bufs;
+ print_totals = print_summary;
+ nl_unit = netlink_unit;
+
+ control_channel = open_control_channel();
+ if (control_channel < 0)
+ return -1;
+
+ create_info.subbuf_size = subbuf_size;
+ create_info.n_subbufs = n_subbufs;
+
+ send_request(RELAY_APP_STOP, NULL, 0); /* in case we exited badly before */
+ send_request(RELAY_APP_CHAN_CREATE, &create_info, sizeof(create_info));
+
+ for (i = 0; i < ncpus; i++) {
+ if (open_files(i, relay_filebase, out_filebase) < 0) {
+ printf("Couldn't open files\n");
+ send_request(RELAY_APP_STOP, NULL, 0);
+ send_request(RELAY_APP_CHAN_DESTROY, NULL, 0);
+ return -1;
+ }
+ }
+
+ return 0;
+}
+
+/**
+ * init_relay_app - initialize the relay-app application
+ * @relay_filebase: full path of base name of the per-cpu relayfs files
+ * @out_filebase: base name of the per-cpu files data will be written to
+ * @sub_buf_size: relayfs sub-buffer size of channel to be created
+ * @n_sub_bufs: relayfs number of sub-buffers of channel to be created
+ * @print_summary: boolean, print summary or not at end of run
+ *
+ * Returns 0 on success, negative otherwise.
+ *
+ * The relayfs channel is created as a result of this function.
+ */
+int init_relay_app(const char *relay_filebase,
+ const char *out_filebase,
+ unsigned sub_buf_size,
+ unsigned n_sub_bufs,
+ int print_summary)
+{
+ return _init_relay_app(relay_filebase,
+ out_filebase,
+ sub_buf_size,
+ n_sub_bufs,
+ print_summary,
+ NETLINK_USERSOCK);
+}
+
+/**
+ * relay_app_main_loop - loop forever reading data
+ */
+int relay_app_main_loop(void)
+{
+ pthread_t thread;
+ int cpu, nb;
+ long i;
+ struct app_msg *msg;
+ unsigned short *ptr;
+ char tmpbuf[128];
+
+ signal(SIGINT, sigproc);
+ signal(SIGTERM, sigproc);
+ signal(SIGALRM, sigalarm);
+
+ send_request(RELAY_APP_START, NULL, 0);
+
+ for (i = 0; i < ncpus; i++) {
+ /* create a thread for each per-cpu buffer */
+ if (pthread_create(&thread, NULL, reader_thread, (void *)i) < 0) {
+ printf("Couldn't create thread\n");
+ send_request(RELAY_APP_STOP, NULL, 0);
+ send_request(RELAY_APP_CHAN_DESTROY, NULL, 0);
+ return -1;
+ }
+ }
+
+ logging = 1;
+
+ while (1) { /* handle messages from control channel */
+ nb = recv(control_channel, recvbuf, sizeof(recvbuf), 0);
+ struct nlmsghdr *nlh = (struct nlmsghdr *)recvbuf;
+
+ if (nb < 0) {
+ if (errno == EINTR)
+ continue;
+ printf("recv() failed\n");
+ } else if (nb == 0)
+ printf("unexpected EOF on netlink socket\n");
+ if (!NLMSG_OK(nlh, nb)) {
+ printf("netlink message not ok, nb = %d\n", nb);
+ continue;
+ }
+ switch (nlh->nlmsg_type) {
+ case RELAY_APP_BUF_INFO:
+ case RELAY_APP_SUBBUFS_CONSUMED:
+ msg = (struct app_msg *)nlh;
+ cpu = msg->info.cpu;
+ memcpy(&status[cpu].info, &msg->info, sizeof (struct buf_info));
+ pthread_mutex_lock(&status[cpu].ready_mutex);
+ if (status[cpu].info.produced > status[cpu].info.consumed)
+ pthread_cond_signal(&status[cpu].ready_cond);
+ pthread_mutex_unlock(&status[cpu].ready_mutex);
+ break;
+ case STP_REALTIME_DATA:
+ ptr = NLMSG_DATA(nlh);
+ fputs ( color[5], stdout);
+ fputs ((char *)ptr, stdout);
+ break;
+ case STP_EXIT:
+ /* module asks us to unload it and exit */
+ ptr = NLMSG_DATA(nlh);
+ /* FIXME. overflow check */
+ strcpy (tmpbuf, "/sbin/rmmod ");
+ strcpy (tmpbuf + strlen(tmpbuf), (char *)ptr);
+ printf ("Executing \"system %s\"\n", tmpbuf);
+ system (tmpbuf);
+ break;
+ case STP_DONE:
+ if (print_totals)
+ summarize();
+ close_all_files();
+ exit(0);
+ break;
+ default:
+ fprintf(stderr, "WARNING: ignored netlink message of type %d\n", (nlh->nlmsg_type));
+ }
+ }
+ return 0;
+}