/* * libstp - stpd 'library' * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. * * Copyright (C) IBM Corporation, 2005 * Copyright (C) Redhat Inc, 2005 * */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "librelay.h" /* maximum number of CPUs we can handle - change if more */ #define NR_CPUS 256 /* relayfs parameters */ static struct params { unsigned subbuf_size; unsigned n_subbufs; char relay_filebase[256]; } params; /* temporary per-cpu output written here, filebase0...N */ static char *percpu_tmpfilebase = "stpd_cpu"; /* temporary merged/sorted output written here */ static char *tmpfile_name = "sorted.tmp"; /* probe output written here */ static char *outfile_name = "probe.out"; /* internal variables */ static int transport_mode; static int ncpus; static int print_totals; static int final_cpus_processed; static int exiting; /* per-cpu data */ static int relay_file[NR_CPUS]; static int percpu_tmpfile[NR_CPUS]; static char *relay_buffer[NR_CPUS]; static pthread_t reader[NR_CPUS]; /* netlink control channel */ static int control_channel; /* flags */ extern int print_only; extern int quiet; /* used to communicate with kernel over control channel */ struct buf_info { int cpu; unsigned produced; unsigned consumed; }; struct consumed_info { int cpu; unsigned consumed; }; struct transport_info { int transport_mode; unsigned subbuf_size; unsigned n_subbufs; }; struct buf_msg { struct nlmsghdr nlh; struct buf_info info; }; struct transport_msg { struct nlmsghdr nlh; struct transport_info info; }; static char *recvbuf[8192]; /* per-cpu buffer info */ static struct buf_status { pthread_mutex_t ready_mutex; pthread_cond_t ready_cond; struct buf_info info; unsigned max_backlog; /* max # sub-buffers ready at one time */ } status[NR_CPUS]; /* colors for printing */ static char *color[] = { "\033[31m", /* red */ "\033[32m", /* green */ "\033[33m", /* yellow */ "\033[34m", /* blue */ "\033[35m", /* magenta */ "\033[36m", /* cyan */ }; /** * streaming - is the current transport mode streaming or not? * * Returns 1 if in streaming mode, 0 otherwise. */ static int streaming(void) { if (transport_mode == STP_TRANSPORT_NETLINK) return 1; return 0; } /** * send_request - send request to kernel over control channel * @type: the relay-app command id * @data: pointer to the data to be sent * @len: length of the data to be sent * * Returns 0 on success, negative otherwise. */ int send_request(int type, void *data, int len) { struct nlmsghdr *req; int err; req = (struct nlmsghdr *)malloc(NLMSG_SPACE(len)); if (req == 0) { fprintf(stderr, "WARNING: send_request malloc failed\n"); return -1; } memset(req, 0, NLMSG_SPACE(len)); req->nlmsg_len = NLMSG_LENGTH(len); req->nlmsg_type = type; req->nlmsg_flags = NLM_F_REQUEST; req->nlmsg_pid = getpid(); memcpy(NLMSG_DATA(req), data, len); err = send(control_channel, req, req->nlmsg_len, MSG_DONTWAIT); return err; } /** * open_control_channel - create netlink channel */ static int open_control_channel() { struct sockaddr_nl snl; int channel; channel = socket(AF_NETLINK, SOCK_RAW, NETLINK_USERSOCK); if (channel < 0) { fprintf(stderr, "ERROR: socket() failed\n"); return channel; } memset(&snl, 0, sizeof snl); snl.nl_family = AF_NETLINK; snl.nl_pid = getpid(); snl.nl_groups = 0; if (bind (channel, (struct sockaddr *) &snl, sizeof snl)) fprintf(stderr, "ERROR: bind() failed\n"); return channel; } /** * summarize - print a summary if applicable */ static void summarize(void) { int i; if (streaming()) return; printf("summary:\n"); for (i = 0; i < ncpus; i++) { printf("%s cpu %u:\n", color[i % 4], i); printf(" %u sub-buffers processed\n", status[i].info.consumed); printf(" %u max backlog\n", status[i].max_backlog); } } /** * close_relayfs_files - close and munmap buffer and open output file */ static void close_relayfs_files(int cpu) { size_t total_bufsize = params.subbuf_size * params.n_subbufs; munmap(relay_buffer[cpu], total_bufsize); close(relay_file[cpu]); close(percpu_tmpfile[cpu]); } /** * close_all_relayfs_files - close and munmap buffers and output files */ static void close_all_relayfs_files(void) { int i; if (!streaming()) { for (i = 0; i < ncpus; i++) close_relayfs_files(i); } } /** * open_relayfs_files - open and mmap buffer and open output file */ static int open_relayfs_files(int cpu, const char *relay_filebase) { size_t total_bufsize; char tmp[PATH_MAX]; memset(&status[cpu], 0, sizeof(struct buf_status)); pthread_mutex_init(&status[cpu].ready_mutex, NULL); pthread_cond_init(&status[cpu].ready_cond, NULL); status[cpu].info.cpu = cpu; sprintf(tmp, "%s%d", relay_filebase, cpu); relay_file[cpu] = open(tmp, O_RDONLY | O_NONBLOCK); if (relay_file[cpu] < 0) { fprintf(stderr, "ERROR: couldn't open relayfs file %s: errcode = %s\n", tmp, strerror(errno)); return -1; } sprintf(tmp, "%s%d", percpu_tmpfilebase, cpu); if((percpu_tmpfile[cpu] = open(tmp, O_CREAT | O_RDWR | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH)) < 0) { fprintf(stderr, "ERROR: Couldn't open output file %s: errcode = %s\n", tmp, strerror(errno)); close(relay_file[cpu]); return -1; } total_bufsize = params.subbuf_size * params.n_subbufs; relay_buffer[cpu] = mmap(NULL, total_bufsize, PROT_READ, MAP_PRIVATE | MAP_POPULATE, relay_file[cpu], 0); if(relay_buffer[cpu] == MAP_FAILED) { fprintf(stderr, "ERROR: couldn't mmap relay file, total_bufsize (%d) = subbuf_size (%d) * n_subbufs(%d), error = %s \n", (int)total_bufsize, (int)params.subbuf_size, (int)params.n_subbufs, strerror(errno)); close(relay_file[cpu]); close(percpu_tmpfile[cpu]); return -1; } return 0; } /** * delete_percpu_files - delete temporary per-cpu output files * * Returns 0 if successful, -1 otherwise. */ static int delete_percpu_files(void) { int i; char tmp[PATH_MAX]; for (i = 0; i < ncpus; i++) { sprintf(tmp, "%s%d", percpu_tmpfilebase, i); if (unlink(tmp) < 0) { fprintf(stderr, "ERROR: couldn't unlink percpu file %s: errcode = %s\n", tmp, strerror(errno)); return -1; } } return 0; } /** * kill_percpu_threads - kill per-cpu threads 0->n-1 * @n: number of threads to kill * * Returns number of threads killed. */ static int kill_percpu_threads(int n) { int i, killed = 0; for (i = 0; i < n; i++) { if (pthread_cancel(reader[i]) == 0) killed++; } if (killed != n) fprintf(stderr, "WARNING: couldn't kill all per-cpu threads: %d killed, %d total\n", killed, n); return killed; } /** * request_last_buffers - request end-of-trace last buffer processing * * Returns 0 if successful, negative otherwise */ static int request_last_buffers(void) { int i; for (i = 0; i < ncpus; i++) { if (send_request(STP_BUF_INFO, &status[i].info, sizeof(struct buf_info)) < 0) { fprintf(stderr, "WARNING: couldn't request last buffers for cpu %d\n", i); return -1; } } return 0; } /** * process_subbufs - write ready subbufs to disk */ static int process_subbufs(struct buf_info *info) { unsigned subbufs_ready, start_subbuf, end_subbuf, subbuf_idx; int i, len, cpu = info->cpu; char *subbuf_ptr; int subbufs_consumed = 0; unsigned padding; subbufs_ready = info->produced - info->consumed; start_subbuf = info->consumed % params.n_subbufs; end_subbuf = start_subbuf + subbufs_ready; for (i = start_subbuf; i < end_subbuf; i++) { subbuf_idx = i % params.n_subbufs; subbuf_ptr = relay_buffer[cpu] + subbuf_idx * params.subbuf_size; padding = *((unsigned *)subbuf_ptr); subbuf_ptr += sizeof(padding); len = (params.subbuf_size - sizeof(padding)) - padding; if (write(percpu_tmpfile[cpu], subbuf_ptr, len) < 0) { fprintf(stderr, "ERROR: couldn't write to output file for cpu %d, exiting: errcode = %d: %s\n", cpu, errno, strerror(errno)); exit(1); } subbufs_consumed++; } return subbufs_consumed; } /** * reader_thread - per-cpu channel buffer reader */ static void *reader_thread(void *data) { int rc; long cpu = (long)data; struct pollfd pollfd; struct consumed_info consumed_info; unsigned subbufs_consumed; do { pollfd.fd = relay_file[cpu]; pollfd.events = POLLIN; rc = poll(&pollfd, 1, -1); if (rc < 0) { if (errno != EINTR) { fprintf(stderr, "ERROR: poll error: %s\n",strerror(errno)); exit(1); } fprintf(stderr, "WARNING: poll warning: %s\n",strerror(errno)); rc = 0; } send_request(STP_BUF_INFO, &status[cpu].info, sizeof(struct buf_info)); if (status[cpu].info.produced == status[cpu].info.consumed) pthread_cond_wait(&status[cpu].ready_cond, &status[cpu].ready_mutex); pthread_mutex_unlock(&status[cpu].ready_mutex); subbufs_consumed = process_subbufs(&status[cpu].info); if (subbufs_consumed) { if (subbufs_consumed == params.n_subbufs) fprintf(stderr, "WARNING: cpu %ld buffer full. Consider using a larger buffer size.\n", cpu); if (subbufs_consumed > status[cpu].max_backlog) status[cpu].max_backlog = subbufs_consumed; status[cpu].info.consumed += subbufs_consumed; consumed_info.cpu = cpu; consumed_info.consumed = subbufs_consumed; send_request(STP_SUBBUFS_CONSUMED, &consumed_info, sizeof(struct consumed_info)); } } while (1); } /** * init_relayfs - create files and threads for relayfs processing * * Returns 0 if successful, negative otherwise */ int init_relayfs(void) { int i, j; for (i = 0; i < ncpus; i++) { if (open_relayfs_files(i, params.relay_filebase) < 0) { fprintf(stderr, "ERROR: couldn't open relayfs files, cpu = %d\n", i); goto err; } /* create a thread for each per-cpu buffer */ if (pthread_create(&reader[i], NULL, reader_thread, (void *)i) < 0) { close_relayfs_files(i); fprintf(stderr, "ERROR: Couldn't create reader thread, cpu = %d\n", i); goto err; } } printf("Using channel with %u sub-buffers of size %u.\n", params.n_subbufs, params.subbuf_size); return 0; err: for (j = 0; j < i; j++) close_relayfs_files(j); kill_percpu_threads(i); return -1; } /** * init_stp - initialize the app * @modname: the name of the stp module to load * @relay_filebase: full path of base name of the per-cpu relayfs files * @print_summary: boolean, print summary or not at end of run * * Returns 0 on success, negative otherwise. */ int init_stp(const char *modname, const char *relay_filebase, int print_summary) { int daemon_pid; char buf[1024]; ncpus = sysconf(_SC_NPROCESSORS_ONLN); print_totals = print_summary; daemon_pid = getpid(); sprintf(buf, "insmod %s pid=%d", modname, daemon_pid); if (system(buf)) { fprintf(stderr, "ERROR, couldn't insmod probe module %s\n", modname); return -1; } control_channel = open_control_channel(); if (control_channel < 0) return -1; if (relay_filebase) strcpy(params.relay_filebase, relay_filebase); return 0; } /* length of timestamp in output field 0 - TODO: make binary, variable */ #define TIMESTAMP_SIZE 11 /** * write_output - either to screen, outfile or both */ static int write_output(void) { struct stat sbuf; int i, len, err = 0; const char *subbuf_ptr; int tmpfile; FILE *outfile = NULL; char *tmpbuf; const char *rec_txt = subbuf_ptr + i; const char * rec_cnt; tmpfile = open(tmpfile_name, O_RDONLY); if (tmpfile < 0) { fprintf(stderr, "ERROR: couldn't open tmp file %s: errcode = %s\n", tmpfile_name, strerror(errno)); return -1; } if (fstat(tmpfile, &sbuf)) { fprintf(stderr, "ERROR: couldn't stat tmp file %s: errcode = %s\n", tmpfile_name, strerror(errno)); return -1; } len = sbuf.st_size; if (len <= TIMESTAMP_SIZE) goto rm_tmp; tmpbuf = mmap(NULL, len, PROT_READ, MAP_PRIVATE, tmpfile, 0); if(tmpbuf == MAP_FAILED) { close(tmpfile); fprintf(stderr, "ERROR: couldn't mmap tmp file %s: errcode = %s\n", tmpfile_name, strerror(errno)); err = -1; goto rm_tmp; } subbuf_ptr = tmpbuf; i = TIMESTAMP_SIZE; rec_txt = subbuf_ptr + i; if (!print_only) { outfile = fopen(outfile_name, "w"); if (!outfile) { fprintf(stderr, "ERROR: couldn't open output file %s: errcode = %s\n", outfile_name, strerror(errno)); err = -1; goto unmap_rm_tmp; } } while (i < len) { if (subbuf_ptr[i] == '\0') { rec_cnt = rec_txt - TIMESTAMP_SIZE; if (!quiet) { #if 0 fwrite(rec_cnt, TIMESTAMP_SIZE, 1, stdout); #endif fwrite(rec_txt, (subbuf_ptr + i) - rec_txt, 1, stdout); } if (!print_only) { #if 0 fwrite(rec_cnt, TIMESTAMP_SIZE, 1, outfile); #endif fwrite(rec_txt, (subbuf_ptr + i) - rec_txt, 1, outfile); } rec_txt = subbuf_ptr + i + 1 + TIMESTAMP_SIZE; i += TIMESTAMP_SIZE; } i++; } if (!print_only) fclose(outfile); unmap_rm_tmp: munmap(tmpbuf, len); rm_tmp: close(tmpfile); if (unlink(tmpfile_name) < 0) { fprintf(stderr, "Couldn't unlink tmp file %s: errcode = %s\n", tmpfile_name, strerror(errno)); err = -1; } return err; } /** * sort_output - merge and sort per-cpu output * * TODO: replace this with our own sort */ static int sort_output(void) { char tmpbuf[PATH_MAX]; sprintf(tmpbuf, "sort -z -b -n -o %s %s*", tmpfile_name, percpu_tmpfilebase); system(tmpbuf); if (system(tmpbuf)) { fprintf(stderr, "ERROR: couldn't sort output: %s failed. No output will be written.\n", tmpbuf); return -1; } return 0; } /** * postprocess_and_exit - postprocess the output and exit */ static void postprocess_and_exit(void) { if (print_totals) summarize(); if (!streaming()) { if (sort_output() == 0) write_output(); close_all_relayfs_files(); delete_percpu_files(); } close(control_channel); exit(0); } static void sigproc(int signum) { while (send_request(STP_EXIT, NULL, 0) < 0) usleep (10000); } /** * stp_main_loop - loop forever reading data */ int stp_main_loop(void) { int cpu, nb, rc; struct buf_msg *buf_msg; struct transport_msg *transport_msg; unsigned short *ptr; unsigned subbufs_consumed; char tmpbuf[128]; signal(SIGINT, sigproc); signal(SIGTERM, sigproc); send_request(STP_TRANSPORT_MODE, &transport_mode, sizeof(transport_mode)); while (1) { /* handle messages from control channel */ nb = recv(control_channel, recvbuf, sizeof(recvbuf), 0); struct nlmsghdr *nlh = (struct nlmsghdr *)recvbuf; if (nb < 0) { if (errno == EINTR) continue; fprintf(stderr, "WARNING: recv() failed\n"); } else if (nb == 0) fprintf(stderr, "WARNING: unexpected EOF on netlink socket\n"); if (!NLMSG_OK(nlh, nb)) { fprintf(stderr, "WARNING: netlink message not ok, nb = %d\n", nb); continue; } if (!transport_mode && nlh->nlmsg_type != STP_TRANSPORT_MODE && nlh->nlmsg_type != STP_EXIT) { fprintf(stderr, "WARNING: invalid stp command: no transport\n"); continue; } switch (nlh->nlmsg_type) { case STP_BUF_INFO: buf_msg = (struct buf_msg *)nlh; cpu = buf_msg->info.cpu; memcpy(&status[cpu].info, &buf_msg->info, sizeof (struct buf_info)); if (exiting) { final_cpus_processed++; subbufs_consumed = process_subbufs(&status[cpu].info); status[cpu].info.consumed += subbufs_consumed; if (final_cpus_processed >= ncpus) postprocess_and_exit(); break; } pthread_mutex_lock(&status[cpu].ready_mutex); if (status[cpu].info.produced > status[cpu].info.consumed) pthread_cond_signal(&status[cpu].ready_cond); pthread_mutex_unlock(&status[cpu].ready_mutex); break; case STP_TRANSPORT_MODE: transport_msg = (struct transport_msg *)nlh; transport_mode = transport_msg->info.transport_mode; params.subbuf_size = transport_msg->info.subbuf_size; params.n_subbufs = transport_msg->info.n_subbufs; if (!streaming()) { rc = init_relayfs(); if (rc < 0) { close(control_channel); fprintf(stderr, "ERROR: couldn't init relayfs, exiting\n"); exit(1); } } break; case STP_REALTIME_DATA: ptr = NLMSG_DATA(nlh); fputs ( color[5], stdout); fputs ((char *)ptr, stdout); break; case STP_EXIT: if (exiting) break; exiting = 1; if (!streaming()) { kill_percpu_threads(ncpus); if (request_last_buffers() < 0) exit(1); } /* module asks us to unload it and exit */ ptr = NLMSG_DATA(nlh); /* FIXME. overflow check */ strcpy (tmpbuf, "/sbin/rmmod "); strcpy (tmpbuf + strlen(tmpbuf), (char *)ptr); #if 0 printf ("Executing \"system %s\"\n", tmpbuf); #endif if (system(tmpbuf)) { fprintf(stderr, "ERROR: couldn't rmmod probe module %s. No output will be written.\n", (char *)ptr); close_all_relayfs_files(); delete_percpu_files(); exit(1); } if (!streaming() && (final_cpus_processed < ncpus)) break; postprocess_and_exit(); break; default: fprintf(stderr, "WARNING: ignored netlink message of type %d\n", (nlh->nlmsg_type)); } } return 0; }