diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2009-07-20 16:45:39 +0200 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2009-07-20 16:45:39 +0200 |
commit | 1359632eef6be90b37b64fc864196c1977e68913 (patch) | |
tree | 4c58713e0e0bb2522e4e16b7df8bd3a9c14a1e1e /tcpsrv.c | |
parent | caf6b75951d087f2406bcdda21a98dc2ee3eb145 (diff) | |
download | rsyslog-1359632eef6be90b37b64fc864196c1977e68913.tar.gz rsyslog-1359632eef6be90b37b64fc864196c1977e68913.tar.xz rsyslog-1359632eef6be90b37b64fc864196c1977e68913.zip |
re-structured tcpsrv.c a bit, no real change
... but this sets stage for potential future optimizations, especially
the capability to use multiple reception threads.
Diffstat (limited to 'tcpsrv.c')
-rw-r--r-- | tcpsrv.c | 87 |
1 files changed, 49 insertions, 38 deletions
@@ -461,6 +461,53 @@ RunCancelCleanup(void *arg) } +/* process a receive request on one of the streams + * rgerhards, 2009-07-020 + */ +static rsRetVal +doReceive(tcpsrv_t *pThis, tcps_sess_t **ppSess) +{ + char buf[128*1024]; /* reception buffer - may hold a partial or multiple messages */ + ssize_t iRcvd; + DEFiRet; + + ISOBJ_TYPE_assert(pThis, tcpsrv); + dbgprintf("netstream %p with new data\n", (*ppSess)->pStrm); + + /* Receive message */ + iRet = pThis->pRcvData(*ppSess, buf, sizeof(buf), &iRcvd); + switch(iRet) { + case RS_RET_CLOSED: + pThis->pOnRegularClose(*ppSess); + tcps_sess.Destruct(ppSess); + break; + case RS_RET_RETRY: + /* we simply ignore retry - this is not an error, but we also have not received anything */ + break; + case RS_RET_OK: + /* valid data received, process it! */ + if(tcps_sess.DataRcvd(*ppSess, buf, iRcvd) != RS_RET_OK) { + /* in this case, something went awfully wrong. + * We are instructed to terminate the session. + */ + errmsg.LogError(0, NO_ERRCODE, "Tearing down TCP Session - see " + "previous messages for reason(s)\n"); + pThis->pOnErrClose(*ppSess); + tcps_sess.Destruct(ppSess); + } + break; + default: + errno = 0; + errmsg.LogError(0, iRet, "netstream session %p will be closed due to error\n", + (*ppSess)->pStrm); + pThis->pOnErrClose(*ppSess); + tcps_sess.Destruct(ppSess); + break; + } + RETiRet; +} + + /* This function is called to gather input. */ #pragma GCC diagnostic ignored "-Wempty-body" static rsRetVal @@ -473,15 +520,13 @@ Run(tcpsrv_t *pThis) int bIsReady; tcps_sess_t *pNewSess; nssel_t *pSel; - ssize_t iRcvd; ISOBJ_TYPE_assert(pThis, tcpsrv); /* this is an endless loop - it is terminated by the framework canelling * this thread. Thus, we also need to instantiate a cancel cleanup handler - * to prevent us from leaking anything. -- rgerharsd, 20080-04-24 + * to prevent us from leaking anything. -- rgerhards, 20080-04-24 */ -RUNLOG_STR("XXXX: tcp server runs\n"); pthread_cleanup_push(RunCancelCleanup, (void*) &pSel); while(1) { CHKiRet(nssel.Construct(&pSel)); @@ -502,7 +547,6 @@ RUNLOG_STR("XXXX: tcp server runs\n"); iTCPSess = TCPSessGetNxtSess(pThis, iTCPSess); } -RUNLOG_STR("XXXX: tcp server select\n"); /* wait for io to become ready */ CHKiRet(nssel.Wait(pSel, &nfds)); @@ -514,46 +558,13 @@ RUNLOG_STR("XXXX: tcp server select\n"); --nfds; /* indicate we have processed one */ } } -RUNLOG_STR("XXXX: tcp server post select\n"); /* now check the sessions */ iTCPSess = TCPSessGetNxtSess(pThis, -1); while(nfds && iTCPSess != -1) { CHKiRet(nssel.IsReady(pSel, pThis->pSessions[iTCPSess]->pStrm, NSDSEL_RD, &bIsReady, &nfds)); if(bIsReady) { - char buf[128*1024]; /* reception buffer - may hold a partial or multiple messages */ - dbgprintf("netstream %p with new data\n", pThis->pSessions[iTCPSess]->pStrm); - - /* Receive message */ - iRet = pThis->pRcvData(pThis->pSessions[iTCPSess], buf, sizeof(buf), &iRcvd); - switch(iRet) { - case RS_RET_CLOSED: - pThis->pOnRegularClose(pThis->pSessions[iTCPSess]); - tcps_sess.Destruct(&pThis->pSessions[iTCPSess]); - break; - case RS_RET_RETRY: - /* we simply ignore retry - this is not an error, but we also have not received anything */ - break; - case RS_RET_OK: - /* valid data received, process it! */ - if(tcps_sess.DataRcvd(pThis->pSessions[iTCPSess], buf, iRcvd) != RS_RET_OK) { - /* in this case, something went awfully wrong. - * We are instructed to terminate the session. - */ - errmsg.LogError(0, NO_ERRCODE, "Tearing down TCP Session %d - see " - "previous messages for reason(s)\n", iTCPSess); - pThis->pOnErrClose(pThis->pSessions[iTCPSess]); - tcps_sess.Destruct(&pThis->pSessions[iTCPSess]); - } - break; - default: - errno = 0; - errmsg.LogError(0, iRet, "netstream session %p will be closed due to error\n", - pThis->pSessions[iTCPSess]->pStrm); - pThis->pOnErrClose(pThis->pSessions[iTCPSess]); - tcps_sess.Destruct(&pThis->pSessions[iTCPSess]); - break; - } + doReceive(pThis, &pThis->pSessions[iTCPSess]); --nfds; /* indicate we have processed one */ } iTCPSess = TCPSessGetNxtSess(pThis, iTCPSess); |