From 3885cb1e253f0b5c84f8a8b09bfc7855437391db Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Sat, 18 Dec 2010 18:41:50 +0100 Subject: enhanced tcpflood to run multithreaded --- tests/tcpflood.c | 137 +++++++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 107 insertions(+), 30 deletions(-) (limited to 'tests/tcpflood.c') diff --git a/tests/tcpflood.c b/tests/tcpflood.c index acca4452..e7288008 100644 --- a/tests/tcpflood.c +++ b/tests/tcpflood.c @@ -46,6 +46,8 @@ * Note: UDP supports a single target port, only * -W wait time between sending batches of messages, in microseconds (Default: 0) * -b number of messages within a batch (default: 100,000,000 millions) + * -Y use multiple threads, one per connection (which means 1 if one only connection + * is configured!) * * Part of the testbench for rsyslog. * @@ -80,6 +82,7 @@ #include #include #include +#include #include #include @@ -98,7 +101,7 @@ static int dynFileIDs = 0; static int extraDataLen = 0; /* amount of extra data to add to message */ static int bRandomizeExtraData = 0; /* randomize amount of extra data added */ static int numMsgsToSend; /* number of messages to send */ -static int numConnections = 1; /* number of connections to create */ +static unsigned numConnections = 1; /* number of connections to create */ static int *sockArray; /* array of sockets to use */ static int msgNum = 0; /* initial message number to start with */ static int bShowProgress = 1; /* show progress messages */ @@ -117,7 +120,18 @@ static int bStatsRecords = 0; /* generate stats records */ static int bCSVoutput = 0; /* generate output in CSV (where applicable) */ static long long batchsize = 100000000ll; static int waittime = 0; - +static int runMultithreaded = 0; /* run tests in multithreaded mode */ +static int numThrds = 1; /* number of threads to use */ + +/* the following struct provides information for a generator instance (thread) */ +struct instdata { + /* lower and upper bounds for the thread in question */ + unsigned long long lower; + unsigned long long numMsgs; /* number of messages to send */ + unsigned long long numSent; /* number of messages already sent */ + unsigned idx; /**< index of fd to be used for sending */ + pthread_t thread; /**< thread processing this instance */ +} *instarray = NULL; /* the following structure is used to gather performance data */ struct runstats { @@ -204,7 +218,7 @@ int openConn(int *fd) */ int openConnections(void) { - int i; + unsigned i; char msgBuf[128]; size_t lenMsg; @@ -242,7 +256,7 @@ int openConnections(void) */ void closeConnections(void) { - int i; + unsigned i; size_t lenMsg; struct linger ling; char msgBuf[128]; @@ -282,7 +296,7 @@ void closeConnections(void) * of constructing test messages. -- rgerhards, 2010-03-31 */ static inline void -genMsg(char *buf, size_t maxBuf, int *pLenBuf, long long *numMsgsGen) +genMsg(char *buf, size_t maxBuf, int *pLenBuf, struct instdata *inst) { int edLen; /* actual extra data length to use */ char extraData[MAX_EXTRADATA_LEN + 1]; @@ -326,7 +340,7 @@ genMsg(char *buf, size_t maxBuf, int *pLenBuf, long long *numMsgsGen) *pLenBuf = snprintf(buf, maxBuf, "%s\n", MsgToSend); } - if((*numMsgsGen)++ >= (unsigned) numMsgsToSend) + if(inst->numSent++ >= inst->numMsgs) *pLenBuf = 0; /* indicate end of run */ finalize_it: ; @@ -340,19 +354,18 @@ finalize_it: ; * last. All messages in between are sent over random connections. * Note that message numbers start at 0. */ -int sendMessages(void) +int sendMessages(struct instdata *inst) { - int i = 0; + unsigned i = 0; int socknum; int lenBuf; - int lenSend; - char *statusText; - long long numSent = 0; /* number of messages sent in this test */ + int lenSend = 0; + char *statusText = ""; char buf[MAX_EXTRADATA_LEN + 1024]; if(!bSilent) { if(dataFile == NULL) { - printf("Sending %d messages.\n", numMsgsToSend); + printf("Sending %llu messages.\n", inst->numMsgs); statusText = "messages"; } else { printf("Sending file '%s' %d times.\n", dataFile, @@ -363,15 +376,19 @@ int sendMessages(void) if(bShowProgress) printf("\r%8.8d %s sent", 0, statusText); while(1) { /* broken inside loop! */ - if(i < numConnections) - socknum = i; - else if(i >= numMsgsToSend - numConnections) - socknum = i - (numMsgsToSend - numConnections); - else { - int rnd = rand(); - socknum = rnd % numConnections; + if(runMultithreaded) { + socknum = inst->idx; + } else { + if(i < numConnections) + socknum = i; + else if(i >= inst->numMsgs - numConnections) + socknum = i - (inst->numMsgs - numConnections); + else { + int rnd = rand(); + socknum = rnd % numConnections; + } } - genMsg(buf, sizeof(buf), &lenBuf, &numSent); /* generate the message to send according to params */ + genMsg(buf, sizeof(buf), &lenBuf, inst); /* generate the message to send according to params */ if(lenBuf == 0) break; /* end of processing! */ if(transport == TP_TCP) { @@ -391,7 +408,7 @@ int sendMessages(void) fflush(stdout); perror("send test data"); printf("send() failed at socket %d, index %d, msgNum %lld\n", - sockArray[socknum], i, numSent); + sockArray[socknum], i, inst->numSent); fflush(stderr); return(1); } @@ -399,7 +416,7 @@ int sendMessages(void) if(bShowProgress) printf("\r%8.8d", i); } - if(bRandConnDrop) { + if(!runMultithreaded && bRandConnDrop) { /* if we need to randomly drop connections, see if we * are a victim */ @@ -409,7 +426,7 @@ int sendMessages(void) sockArray[socknum] = -1; } } - if(numSent % batchsize == 0) { + if(inst->numSent % batchsize == 0) { usleep(waittime); } ++msgNum; @@ -422,6 +439,66 @@ int sendMessages(void) } +/* this is the thread that starts a generator + */ +static void * +thrdStarter(void *arg) +{ + struct instdata *inst = (struct instdata*) arg; + if(sendMessages(inst) != 0) { + printf("error sending messages\n"); + } + return NULL; +} + + +/* This function initializes the actual traffic generators. The function sets up all required + * parameter blocks and starts threads. It returns when all threads are ready to run + * and the main task must just enable them. + */ +static inline void +prepareGenerators() +{ + int i; + long long msgsThrd; + long long starting = 0; + + if(runMultithreaded) { + bSilent = 1; + numThrds = numConnections; + } else { + numThrds = 1; + } + + if(instarray != NULL) { + free(instarray); + } + instarray = calloc(numThrds, sizeof(struct instdata)); + msgsThrd = numMsgsToSend / numThrds; + + for(i = 0 ; i < numThrds ; ++i) { + instarray[i].lower = starting; + instarray[i].numMsgs = msgsThrd; + instarray[i].numSent = 0; + instarray[i].idx = i; + pthread_create(&(instarray[i].thread), NULL, thrdStarter, instarray + i); + printf("started thread %x\n", (unsigned) instarray[i].thread); + starting += msgsThrd; + } +} + +/* Wait for all traffic generators to stop. + */ +static inline void +waitGenerators() +{ + int i; + for(i = 0 ; i < numThrds ; ++i) { + pthread_join(instarray[i].thread, NULL); + printf("thread %x stopped\n", (unsigned) instarray[i].thread); + } +} + /* functions related to computing statistics on the runtime of a test. This is * a separate function primarily not to mess up the test driver. * rgerhards, 2010-12-08 @@ -500,18 +577,16 @@ runTests(void) int run; stats.totalRuntime = 0; - stats.minRuntime = (unsigned long) 0xffffffffffffffff; + stats.minRuntime = (unsigned long long) 0xffffffffffffffffll; stats.maxRuntime = 0; stats.numRuns = numRuns; run = 1; while(1) { /* loop broken inside */ if(!bSilent) printf("starting run %d\n", run); + prepareGenerators(); gettimeofday(&tvStart, NULL); - if(sendMessages() != 0) { - printf("error sending messages (run %d)\n", run); - return 1; - } + waitGenerators(); endTiming(&tvStart, &stats); if(run == numRuns) break; @@ -551,7 +626,7 @@ int main(int argc, char *argv[]) setvbuf(stdout, buf, _IONBF, 48); - while((opt = getopt(argc, argv, "b:ef:F:t:p:c:C:m:i:I:P:d:Dn:M:rsBR:S:T:XW:")) != -1) { + while((opt = getopt(argc, argv, "b:ef:F:t:p:c:C:m:i:I:P:d:Dn:M:rsBR:S:T:XW:Y")) != -1) { switch (opt) { case 'b': batchsize = atoll(optarg); break; @@ -561,7 +636,7 @@ int main(int argc, char *argv[]) break; case 'n': numTargetPorts = atoi(optarg); break; - case 'c': numConnections = atoi(optarg); + case 'c': numConnections = (unsigned) atoi(optarg); break; case 'C': numFileIterations = atoi(optarg); break; @@ -617,6 +692,8 @@ int main(int argc, char *argv[]) break; case 'W': waittime = atoi(optarg); break; + case 'Y': runMultithreaded = 1; + break; default: printf("invalid option '%c' or value missing - terminating...\n", opt); exit (1); break; -- cgit