summaryrefslogtreecommitdiffstats
path: root/tools/omfwd.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2011-04-04 11:16:15 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2011-04-04 11:16:15 +0200
commit8653db699648321da785b3d5514fb67a7184411c (patch)
treecae9ef4d4f833b27967dc73d0be0b30cfe183299 /tools/omfwd.c
parenta2a3c58c6c0d0f4e2cbe053979ed4feb32298ac5 (diff)
downloadrsyslog-8653db699648321da785b3d5514fb67a7184411c.tar.gz
rsyslog-8653db699648321da785b3d5514fb67a7184411c.tar.xz
rsyslog-8653db699648321da785b3d5514fb67a7184411c.zip
omfwd: speeded up tcp forwarding by reducing number of API calls
Diffstat (limited to 'tools/omfwd.c')
-rw-r--r--tools/omfwd.c77
1 files changed, 66 insertions, 11 deletions
diff --git a/tools/omfwd.c b/tools/omfwd.c
index 7e2b7f89..dc9bfd48 100644
--- a/tools/omfwd.c
+++ b/tools/omfwd.c
@@ -97,6 +97,8 @@ typedef struct _instanceData {
# define FORW_TCP 1
/* following fields for TCP-based delivery */
tcpclt_t *pTCPClt; /* our tcpclt object */
+ uchar sndBuf[16*1024]; /* this is intensionally fixed -- see no good reason to make configurable */
+ unsigned offsSndBuf; /* next free spot in send buffer */
} instanceData;
/* config data */
@@ -181,6 +183,7 @@ DestructTCPInstanceData(instanceData *pData)
BEGINcreateInstance
CODESTARTcreateInstance
+ pData->offsSndBuf = 0;
ENDcreateInstance
@@ -290,21 +293,24 @@ finalize_it:
/* CODE FOR SENDING TCP MESSAGES */
-/* Send a frame via plain TCP protocol
- * rgerhards, 2007-12-28
+/* Send a buffer via TCP. Usually, this is used to send the current
+ * send buffer, but if a message is larger than the buffer, we need to
+ * have the capability to send the message buffer directly.
+ * rgerhards, 2011-04-04
*/
-static rsRetVal TCPSendFrame(void *pvData, char *msg, size_t len)
+static rsRetVal
+TCPSendBuf(instanceData *pData, uchar *buf, unsigned len)
{
DEFiRet;
ssize_t lenSend;
- instanceData *pData = (instanceData *) pvData;
lenSend = len;
+dbgprintf("omfwd: XXXX: pData %p, pNetStrm %p\n", pData, pData->pNetstrm);
netstrm.CheckConnection(pData->pNetstrm); /* hack for plain tcp syslog - see ptcp driver for details */
- CHKiRet(netstrm.Send(pData->pNetstrm, (uchar*)msg, &lenSend));
- dbgprintf("TCP sent %ld bytes, requested %ld\n", (long) lenSend, (long) len);
+ CHKiRet(netstrm.Send(pData->pNetstrm, buf, &lenSend));
+ DBGPRINTF("omfwd: TCP sent %ld bytes, requested %u\n", (long) lenSend, len);
- if(lenSend != (ssize_t) len) {
+ if(lenSend != len) {
/* no real error, could "just" not send everything...
* For the time being, we ignore this...
* rgerhards, 2005-10-25
@@ -319,6 +325,38 @@ finalize_it:
}
+/* Add frame to send buffer (or send, if requried)
+ */
+static rsRetVal TCPSendFrame(void *pvData, char *msg, size_t len)
+{
+ DEFiRet;
+ instanceData *pData = (instanceData *) pvData;
+
+ DBGPRINTF("omfwd: add %u bytes to send buffer (curr offs %u)\n",
+ (unsigned) len, pData->offsSndBuf);
+ if(pData->offsSndBuf != 0 && pData->offsSndBuf + len >= sizeof(pData->sndBuf)) {
+ /* no buffer space left, need to commit previous records */
+ CHKiRet(TCPSendBuf(pData, pData->sndBuf, pData->offsSndBuf));
+ pData->offsSndBuf = 0;
+ iRet = RS_RET_PREVIOUS_COMMITTED;
+ }
+
+ /* check if the message is too large to fit into buffer */
+ if(len > sizeof(pData->sndBuf)) {
+ CHKiRet(TCPSendBuf(pData, (uchar*)msg, len));
+ ABORT_FINALIZE(RS_RET_OK); /* committed everything so far */
+ }
+
+ /* we now know the buffer has enough free space */
+ memcpy(pData->sndBuf + pData->offsSndBuf, msg, len);
+ pData->offsSndBuf += len;
+ iRet = RS_RET_DEFER_COMMIT;
+
+finalize_it:
+ RETiRet;
+}
+
+
/* This function is called immediately before a send retry is attempted.
* It shall clean up whatever makes sense.
* rgerhards, 2007-12-28
@@ -428,6 +466,13 @@ CODESTARTtryResume
iRet = doTryResume(pData);
ENDtryResume
+
+BEGINbeginTransaction
+CODESTARTbeginTransaction
+dbgprintf("omfwd: beginTransaction\n");
+ENDbeginTransaction
+
+
BEGINdoAction
char *psz = NULL; /* temporary buffering */
register unsigned l;
@@ -494,9 +539,8 @@ CODESTARTdoAction
CHKiRet(UDPSend(pData, psz, l));
} else {
/* forward via TCP */
- rsRetVal ret;
- ret = tcpclt.Send(pData->pTCPClt, pData, psz, l);
- if(ret != RS_RET_OK) {
+ iRet = tcpclt.Send(pData->pTCPClt, pData, psz, l);
+ if(iRet != RS_RET_OK && iRet != RS_RET_DEFER_COMMIT && iRet != RS_RET_PREVIOUS_COMMITTED) {
/* error! */
dbgprintf("error forwarding via tcp, suspending\n");
DestructTCPInstanceData(pData);
@@ -513,6 +557,17 @@ finalize_it:
ENDdoAction
+BEGINendTransaction
+CODESTARTendTransaction
+dbgprintf("omfwd: endTransaction, offsSndBuf %u\n", pData->offsSndBuf);
+ if(pData->offsSndBuf != 0) {
+ iRet = TCPSendBuf(pData, pData->sndBuf, pData->offsSndBuf);
+ pData->offsSndBuf = 0;
+ }
+ENDendTransaction
+
+
+
/* This function loads TCP support, if not already loaded. It will be called
* during config processing. To server ressources, TCP support will only
* be loaded if it actually is used. -- rgerhard, 2008-04-17
@@ -529,7 +584,6 @@ finalize_it:
RETiRet;
}
-
BEGINparseSelectorAct
uchar *q;
int i;
@@ -736,6 +790,7 @@ ENDmodExit
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_TXIF_OMOD_QUERIES /* we support the transactional interface! */
ENDqueryEtryPt