From 8653db699648321da785b3d5514fb67a7184411c Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 4 Apr 2011 11:16:15 +0200 Subject: omfwd: speeded up tcp forwarding by reducing number of API calls --- tools/omfwd.c | 77 ++++++++++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 66 insertions(+), 11 deletions(-) (limited to 'tools/omfwd.c') diff --git a/tools/omfwd.c b/tools/omfwd.c index 7e2b7f89..dc9bfd48 100644 --- a/tools/omfwd.c +++ b/tools/omfwd.c @@ -97,6 +97,8 @@ typedef struct _instanceData { # define FORW_TCP 1 /* following fields for TCP-based delivery */ tcpclt_t *pTCPClt; /* our tcpclt object */ + uchar sndBuf[16*1024]; /* this is intensionally fixed -- see no good reason to make configurable */ + unsigned offsSndBuf; /* next free spot in send buffer */ } instanceData; /* config data */ @@ -181,6 +183,7 @@ DestructTCPInstanceData(instanceData *pData) BEGINcreateInstance CODESTARTcreateInstance + pData->offsSndBuf = 0; ENDcreateInstance @@ -290,21 +293,24 @@ finalize_it: /* CODE FOR SENDING TCP MESSAGES */ -/* Send a frame via plain TCP protocol - * rgerhards, 2007-12-28 +/* Send a buffer via TCP. Usually, this is used to send the current + * send buffer, but if a message is larger than the buffer, we need to + * have the capability to send the message buffer directly. + * rgerhards, 2011-04-04 */ -static rsRetVal TCPSendFrame(void *pvData, char *msg, size_t len) +static rsRetVal +TCPSendBuf(instanceData *pData, uchar *buf, unsigned len) { DEFiRet; ssize_t lenSend; - instanceData *pData = (instanceData *) pvData; lenSend = len; +dbgprintf("omfwd: XXXX: pData %p, pNetStrm %p\n", pData, pData->pNetstrm); netstrm.CheckConnection(pData->pNetstrm); /* hack for plain tcp syslog - see ptcp driver for details */ - CHKiRet(netstrm.Send(pData->pNetstrm, (uchar*)msg, &lenSend)); - dbgprintf("TCP sent %ld bytes, requested %ld\n", (long) lenSend, (long) len); + CHKiRet(netstrm.Send(pData->pNetstrm, buf, &lenSend)); + DBGPRINTF("omfwd: TCP sent %ld bytes, requested %u\n", (long) lenSend, len); - if(lenSend != (ssize_t) len) { + if(lenSend != len) { /* no real error, could "just" not send everything... * For the time being, we ignore this... * rgerhards, 2005-10-25 @@ -319,6 +325,38 @@ finalize_it: } +/* Add frame to send buffer (or send, if requried) + */ +static rsRetVal TCPSendFrame(void *pvData, char *msg, size_t len) +{ + DEFiRet; + instanceData *pData = (instanceData *) pvData; + + DBGPRINTF("omfwd: add %u bytes to send buffer (curr offs %u)\n", + (unsigned) len, pData->offsSndBuf); + if(pData->offsSndBuf != 0 && pData->offsSndBuf + len >= sizeof(pData->sndBuf)) { + /* no buffer space left, need to commit previous records */ + CHKiRet(TCPSendBuf(pData, pData->sndBuf, pData->offsSndBuf)); + pData->offsSndBuf = 0; + iRet = RS_RET_PREVIOUS_COMMITTED; + } + + /* check if the message is too large to fit into buffer */ + if(len > sizeof(pData->sndBuf)) { + CHKiRet(TCPSendBuf(pData, (uchar*)msg, len)); + ABORT_FINALIZE(RS_RET_OK); /* committed everything so far */ + } + + /* we now know the buffer has enough free space */ + memcpy(pData->sndBuf + pData->offsSndBuf, msg, len); + pData->offsSndBuf += len; + iRet = RS_RET_DEFER_COMMIT; + +finalize_it: + RETiRet; +} + + /* This function is called immediately before a send retry is attempted. * It shall clean up whatever makes sense. * rgerhards, 2007-12-28 @@ -428,6 +466,13 @@ CODESTARTtryResume iRet = doTryResume(pData); ENDtryResume + +BEGINbeginTransaction +CODESTARTbeginTransaction +dbgprintf("omfwd: beginTransaction\n"); +ENDbeginTransaction + + BEGINdoAction char *psz = NULL; /* temporary buffering */ register unsigned l; @@ -494,9 +539,8 @@ CODESTARTdoAction CHKiRet(UDPSend(pData, psz, l)); } else { /* forward via TCP */ - rsRetVal ret; - ret = tcpclt.Send(pData->pTCPClt, pData, psz, l); - if(ret != RS_RET_OK) { + iRet = tcpclt.Send(pData->pTCPClt, pData, psz, l); + if(iRet != RS_RET_OK && iRet != RS_RET_DEFER_COMMIT && iRet != RS_RET_PREVIOUS_COMMITTED) { /* error! */ dbgprintf("error forwarding via tcp, suspending\n"); DestructTCPInstanceData(pData); @@ -513,6 +557,17 @@ finalize_it: ENDdoAction +BEGINendTransaction +CODESTARTendTransaction +dbgprintf("omfwd: endTransaction, offsSndBuf %u\n", pData->offsSndBuf); + if(pData->offsSndBuf != 0) { + iRet = TCPSendBuf(pData, pData->sndBuf, pData->offsSndBuf); + pData->offsSndBuf = 0; + } +ENDendTransaction + + + /* This function loads TCP support, if not already loaded. It will be called * during config processing. To server ressources, TCP support will only * be loaded if it actually is used. -- rgerhard, 2008-04-17 @@ -529,7 +584,6 @@ finalize_it: RETiRet; } - BEGINparseSelectorAct uchar *q; int i; @@ -736,6 +790,7 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_TXIF_OMOD_QUERIES /* we support the transactional interface! */ ENDqueryEtryPt -- cgit