diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2011-04-04 11:16:15 +0200 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2011-04-04 11:16:15 +0200 |
commit | 8653db699648321da785b3d5514fb67a7184411c (patch) | |
tree | cae9ef4d4f833b27967dc73d0be0b30cfe183299 | |
parent | a2a3c58c6c0d0f4e2cbe053979ed4feb32298ac5 (diff) | |
download | rsyslog-8653db699648321da785b3d5514fb67a7184411c.tar.gz rsyslog-8653db699648321da785b3d5514fb67a7184411c.tar.xz rsyslog-8653db699648321da785b3d5514fb67a7184411c.zip |
omfwd: speeded up tcp forwarding by reducing number of API calls
-rw-r--r-- | ChangeLog | 1 | ||||
-rw-r--r-- | tcpclt.c | 2 | ||||
-rw-r--r-- | tools/omfwd.c | 77 |
3 files changed, 68 insertions, 12 deletions
@@ -1,5 +1,6 @@ --------------------------------------------------------------------------- Version 6.1.7 [DEVEL] (rgerhards), 2011-03-?? +- speeded up tcp forwarding by reducing number of API calls - somewhat improved documentation index --------------------------------------------------------------------------- Version 6.1.6 [DEVEL] (rgerhards), 2011-03-14 @@ -308,7 +308,7 @@ Send(tcpclt_t *pThis, void *pData, char *msg, size_t len) CHKiRet(pThis->initFunc(pData)); iRet = pThis->sendFunc(pData, msg, len); - if(iRet == RS_RET_OK) { + if(iRet == RS_RET_OK || iRet == RS_RET_DEFER_COMMIT || iRet == RS_RET_PREVIOUS_COMMITTED) { /* we are done, we also use this as indication that the previous * message was succesfully received (it's not always the case, but its at * least our best shot at it -- rgerhards, 2008-03-12 diff --git a/tools/omfwd.c b/tools/omfwd.c index 7e2b7f89..dc9bfd48 100644 --- a/tools/omfwd.c +++ b/tools/omfwd.c @@ -97,6 +97,8 @@ typedef struct _instanceData { # define FORW_TCP 1 /* following fields for TCP-based delivery */ tcpclt_t *pTCPClt; /* our tcpclt object */ + uchar sndBuf[16*1024]; /* this is intensionally fixed -- see no good reason to make configurable */ + unsigned offsSndBuf; /* next free spot in send buffer */ } instanceData; /* config data */ @@ -181,6 +183,7 @@ DestructTCPInstanceData(instanceData *pData) BEGINcreateInstance CODESTARTcreateInstance + pData->offsSndBuf = 0; ENDcreateInstance @@ -290,21 +293,24 @@ finalize_it: /* CODE FOR SENDING TCP MESSAGES */ -/* Send a frame via plain TCP protocol - * rgerhards, 2007-12-28 +/* Send a buffer via TCP. Usually, this is used to send the current + * send buffer, but if a message is larger than the buffer, we need to + * have the capability to send the message buffer directly. + * rgerhards, 2011-04-04 */ -static rsRetVal TCPSendFrame(void *pvData, char *msg, size_t len) +static rsRetVal +TCPSendBuf(instanceData *pData, uchar *buf, unsigned len) { DEFiRet; ssize_t lenSend; - instanceData *pData = (instanceData *) pvData; lenSend = len; +dbgprintf("omfwd: XXXX: pData %p, pNetStrm %p\n", pData, pData->pNetstrm); netstrm.CheckConnection(pData->pNetstrm); /* hack for plain tcp syslog - see ptcp driver for details */ - CHKiRet(netstrm.Send(pData->pNetstrm, (uchar*)msg, &lenSend)); - dbgprintf("TCP sent %ld bytes, requested %ld\n", (long) lenSend, (long) len); + CHKiRet(netstrm.Send(pData->pNetstrm, buf, &lenSend)); + DBGPRINTF("omfwd: TCP sent %ld bytes, requested %u\n", (long) lenSend, len); - if(lenSend != (ssize_t) len) { + if(lenSend != len) { /* no real error, could "just" not send everything... * For the time being, we ignore this... * rgerhards, 2005-10-25 @@ -319,6 +325,38 @@ finalize_it: } +/* Add frame to send buffer (or send, if requried) + */ +static rsRetVal TCPSendFrame(void *pvData, char *msg, size_t len) +{ + DEFiRet; + instanceData *pData = (instanceData *) pvData; + + DBGPRINTF("omfwd: add %u bytes to send buffer (curr offs %u)\n", + (unsigned) len, pData->offsSndBuf); + if(pData->offsSndBuf != 0 && pData->offsSndBuf + len >= sizeof(pData->sndBuf)) { + /* no buffer space left, need to commit previous records */ + CHKiRet(TCPSendBuf(pData, pData->sndBuf, pData->offsSndBuf)); + pData->offsSndBuf = 0; + iRet = RS_RET_PREVIOUS_COMMITTED; + } + + /* check if the message is too large to fit into buffer */ + if(len > sizeof(pData->sndBuf)) { + CHKiRet(TCPSendBuf(pData, (uchar*)msg, len)); + ABORT_FINALIZE(RS_RET_OK); /* committed everything so far */ + } + + /* we now know the buffer has enough free space */ + memcpy(pData->sndBuf + pData->offsSndBuf, msg, len); + pData->offsSndBuf += len; + iRet = RS_RET_DEFER_COMMIT; + +finalize_it: + RETiRet; +} + + /* This function is called immediately before a send retry is attempted. * It shall clean up whatever makes sense. * rgerhards, 2007-12-28 @@ -428,6 +466,13 @@ CODESTARTtryResume iRet = doTryResume(pData); ENDtryResume + +BEGINbeginTransaction +CODESTARTbeginTransaction +dbgprintf("omfwd: beginTransaction\n"); +ENDbeginTransaction + + BEGINdoAction char *psz = NULL; /* temporary buffering */ register unsigned l; @@ -494,9 +539,8 @@ CODESTARTdoAction CHKiRet(UDPSend(pData, psz, l)); } else { /* forward via TCP */ - rsRetVal ret; - ret = tcpclt.Send(pData->pTCPClt, pData, psz, l); - if(ret != RS_RET_OK) { + iRet = tcpclt.Send(pData->pTCPClt, pData, psz, l); + if(iRet != RS_RET_OK && iRet != RS_RET_DEFER_COMMIT && iRet != RS_RET_PREVIOUS_COMMITTED) { /* error! */ dbgprintf("error forwarding via tcp, suspending\n"); DestructTCPInstanceData(pData); @@ -513,6 +557,17 @@ finalize_it: ENDdoAction +BEGINendTransaction +CODESTARTendTransaction +dbgprintf("omfwd: endTransaction, offsSndBuf %u\n", pData->offsSndBuf); + if(pData->offsSndBuf != 0) { + iRet = TCPSendBuf(pData, pData->sndBuf, pData->offsSndBuf); + pData->offsSndBuf = 0; + } +ENDendTransaction + + + /* This function loads TCP support, if not already loaded. It will be called * during config processing. To server ressources, TCP support will only * be loaded if it actually is used. -- rgerhard, 2008-04-17 @@ -529,7 +584,6 @@ finalize_it: RETiRet; } - BEGINparseSelectorAct uchar *q; int i; @@ -736,6 +790,7 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_TXIF_OMOD_QUERIES /* we support the transactional interface! */ ENDqueryEtryPt |