summaryrefslogtreecommitdiffstats
path: root/runtime/stpd/librelay.c
diff options
context:
space:
mode:
Diffstat (limited to 'runtime/stpd/librelay.c')
-rw-r--r--runtime/stpd/librelay.c79
1 files changed, 47 insertions, 32 deletions
diff --git a/runtime/stpd/librelay.c b/runtime/stpd/librelay.c
index e348928d..13b00ebf 100644
--- a/runtime/stpd/librelay.c
+++ b/runtime/stpd/librelay.c
@@ -40,6 +40,7 @@
#include <linux/limits.h>
#include <sys/wait.h>
#include <sys/statfs.h>
+#include <stdint.h>
#include "librelay.h"
/* stp_check script */
@@ -57,6 +58,7 @@ static struct params
{
unsigned subbuf_size;
unsigned n_subbufs;
+ int merge;
char relay_filebase[256];
} params;
@@ -90,7 +92,7 @@ static pthread_t reader[NR_CPUS];
static int control_channel;
/* flags */
-extern int print_only, quiet, merge, verbose;
+extern int print_only, quiet, verbose;
extern unsigned int buffer_size;
extern char *modname;
extern char *modpath;
@@ -581,20 +583,20 @@ int init_stp(int print_summary)
/* length of timestamp in output field 0 */
-#define TIMESTAMP_SIZE (sizeof(int))
+#define TIMESTAMP_SIZE (sizeof(uint32_t))
/**
* merge_output - merge per-cpu output
*
*/
-#define MERGE_BUF_SIZE 32768
-
+#define MERGE_BUF_SIZE 16*1024
static int merge_output(void)
{
- int c, i, j, dropped=0;
- long count=0, min, num[ncpus];
- char buf[32], tmp[PATH_MAX];
+ char *buf[ncpus], tmp[PATH_MAX];
+ int i, j, dropped=0;
+ uint32_t count=0, min, num[ncpus];
FILE *ofp, *fp[ncpus];
+ uint32_t length[ncpus];
for (i = 0; i < ncpus; i++) {
sprintf (tmp, "%s%d", percpu_tmpfilebase, i);
@@ -603,10 +605,19 @@ static int merge_output(void)
fprintf (stderr, "error opening file %s.\n", tmp);
return -1;
}
- if (fread (buf, TIMESTAMP_SIZE, 1, fp[i]))
- num[i] = *((int *)buf);
- else
- num[i] = 0;
+ num[i] = 0;
+ buf[i] = malloc(MERGE_BUF_SIZE);
+ printf("buf[%d] = %p\n", i, buf[i]);
+ if (!buf[i]) {
+ fprintf(stderr,"Out of memory in merge_output(). Aborting merge.\n");
+ printf("Out of memory in merge_output(). Aborting merge.\n");
+ return -1;
+ }
+
+ if (fread_unlocked (&length[i], sizeof(uint32_t), 1, fp[i])) {
+ if (fread_unlocked (buf[i], length[i]+TIMESTAMP_SIZE, 1, fp[i]))
+ num[i] = *((uint32_t *)buf[i]);
+ }
}
if (!outfile_name)
@@ -629,34 +640,35 @@ static int merge_output(void)
}
}
- while (1) {
- c = fgetc_unlocked (fp[j]);
- if (c == 0 || c == EOF)
- break;
- if (!quiet)
- fputc_unlocked (c, stdout);
- if (!print_only)
- fputc_unlocked (c, ofp);
- }
+ if (!quiet)
+ fwrite_unlocked (buf[j]+TIMESTAMP_SIZE, length[j], 1, stdout);
+ if (!print_only)
+ fwrite_unlocked (buf[j]+TIMESTAMP_SIZE, length[j], 1, ofp);
+
if (min && ++count != min) {
count = min;
dropped++ ;
}
- if (fread (buf, TIMESTAMP_SIZE, 1, fp[j]))
- num[j] = *((int *)buf);
- else
- num[j] = 0;
+ num[j] = 0;
+ if (fread_unlocked (&length[j], sizeof(uint32_t), 1, fp[j])) {
+ printf("length[%d] = %d\n", j, length[j]);
+ printf("buf[%d] = %p\n", j, buf[j]);
+ if (fread_unlocked (buf[j], length[j]+TIMESTAMP_SIZE, 1, fp[j]))
+ num[j] = *((uint32_t *)buf[j]);
+ }
} while (min);
if (!print_only)
- fputs ("\n", ofp);
+ fwrite_unlocked ("\n", 1, 1, ofp);
for (i = 0; i < ncpus; i++)
fclose (fp[i]);
fclose (ofp);
+
if (dropped)
fprintf (stderr, "Sequence had %d drops.\n", dropped);
+
return 0;
}
@@ -697,7 +709,7 @@ static void cleanup_and_exit (int closed)
if (transport_mode == STP_TRANSPORT_RELAYFS) {
close_all_relayfs_files();
- if (merge) {
+ if (params.merge) {
merge_output();
delete_percpu_files();
}
@@ -790,13 +802,16 @@ int stp_main_loop(void)
transport_mode = info->transport_mode;
params.subbuf_size = info->subbuf_size;
params.n_subbufs = info->n_subbufs;
+ params.merge = info->merge;
#ifdef DEBUG
- if (transport_mode == STP_TRANSPORT_RELAYFS)
- fprintf (stderr, "TRANSPORT_INFO recvd: RELAYFS %d bufs of %d bytes.\n",
- params.n_subbufs,
- params.subbuf_size);
- else
- fprintf (stderr, "TRANSPORT_INFO recvd: PROC with %d Mbyte buffers.\n",
+ if (transport_mode == STP_TRANSPORT_RELAYFS) {
+ fprintf(stderr,"TRANSPORT_INFO recvd: RELAYFS %d bufs of %d bytes.\n",
+ params.n_subbufs,
+ params.subbuf_size);
+ if (params.merge)
+ fprintf(stderr,"Merge output\n");
+ } else
+ fprintf(stderr,"TRANSPORT_INFO recvd: PROC with %d Mbyte buffers.\n",
info->buf_size);
#endif
if (!streaming()) {