/* nsdpoll_ptcp.c
*
* An implementation of the nsd epoll() interface for plain tcp sockets.
*
* Copyright 2009 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of the rsyslog runtime library.
*
* The rsyslog runtime library is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* The rsyslog runtime library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with the rsyslog runtime library. If not, see <http://www.gnu.org/licenses/>.
*
* A copy of the GPL can be found in the file "COPYING" in this distribution.
* A copy of the LGPL can be found in the file "COPYING.LESSER" in this distribution.
*/
#include "config.h"
#ifdef HAVE_EPOLL_CREATE /* this module requires epoll! */
#include <stdlib.h>
#include <assert.h>
#include <errno.h>
#include <string.h>
#include <unistd.h>
#if HAVE_SYS_EPOLL_H
#	include <sys/epoll.h>
#endif
#include "rsyslog.h"
#include "module-template.h"
#include "obj.h"
#include "errmsg.h"
#include "srUtils.h"
#include "nspoll.h"
#include "nsd_ptcp.h"
#include "nsdpoll_ptcp.h"
/* static data */
/* NOTE(review): presumably declares the per-file helpers of the rsyslog object
 * framework (e.g. the "obj" interface used by objUse/objRelease) - confirm in obj.h.
 */
DEFobjStaticHelpers
/* interface handles for the errmsg and glbl objects; both are requested in the
 * class init code and released again in the class exit code of this file.
 */
DEFobjCurrIf(errmsg)
DEFobjCurrIf(glbl)
/* -START------------------------- helpers for event list ------------------------------------ */
/* Allocate a new event list entry for id/pUsr/pSock and push it onto the head
 * of the list. The fd is assumed NOT to be present already; this is not checked.
 * On success the new entry is handed back via *pEvtLst; if the allocation fails,
 * the list is left untouched and *pEvtLst is not written.
 *
 * The epoll event mask is built from mode (NSDPOLL_IN -> EPOLLIN,
 * NSDPOLL_OUT -> EPOLLOUT) and the entry's own address is stored as the epoll
 * user data, so that the entry can be recovered from a returned event.
 *
 * We currently need level-triggered mode, because the upper layers do not work
 * in parallel. In edge-triggered mode we might miss a notification when new
 * data arrives after everything present has been read. To use ET mode the upper
 * peers would have to start a new wait immediately, before processing the data
 * read. That requires a more elaborate redesign and is postponed until the
 * current, more simplistic mode has been proven OK in practice.
 * TODO: at some time we should be able to use EPOLLET.
 * rgerhards, 2009-11-18
 */
static inline rsRetVal
addEvent(nsdpoll_ptcp_t *pThis, int id, void *pUsr, int mode, nsd_ptcp_t *pSock, nsdpoll_epollevt_lst_t **pEvtLst) {
	nsdpoll_epollevt_lst_t *pEntry;
	DEFiRet;

	CHKmalloc(pEntry = malloc(sizeof(*pEntry)));

	pEntry->id = id;
	pEntry->pUsr = pUsr;
	pEntry->pSock = pSock;
	/* the entry itself is the cookie epoll hands back to us */
	pEntry->event.data.u64 = (uint64) pEntry;
	pEntry->event.events = ((mode & NSDPOLL_IN) ? EPOLLIN : 0)
			     | ((mode & NSDPOLL_OUT) ? EPOLLOUT : 0);

	/* insert at list head */
	pthread_mutex_lock(&pThis->mutEvtLst);
	pEntry->pNext = pThis->pRoot;
	pThis->pRoot = pEntry;
	pthread_mutex_unlock(&pThis->mutEvtLst);

	*pEvtLst = pEntry;

finalize_it:
	RETiRet;
}
/* Search the event list for the entry matching both id and pUsr and take it
 * out of the list. The entry is NOT freed; it is returned via *ppEvtLst so that
 * the caller can still use it. If no entry matches, RS_RET_NOT_FOUND is
 * returned and *ppEvtLst is not written. The list mutex is held during the
 * whole operation.
 * rgerhards, 2009-11-23
 */
static inline rsRetVal
unlinkEvent(nsdpoll_ptcp_t *pThis, int id, void *pUsr, nsdpoll_epollevt_lst_t **ppEvtLst) {
	nsdpoll_epollevt_lst_t **ppLink; /* the pointer that refers to the current entry */
	nsdpoll_epollevt_lst_t *pCur;
	DEFiRet;

	pthread_mutex_lock(&pThis->mutEvtLst);

	for(ppLink = &pThis->pRoot ; (pCur = *ppLink) != NULL ; ppLink = &pCur->pNext) {
		if(pCur->id == id && pCur->pUsr == pUsr)
			break;
	}

	if(pCur == NULL)
		ABORT_FINALIZE(RS_RET_NOT_FOUND);

	/* bypass the entry: works for the root pointer and inner links alike */
	*ppLink = pCur->pNext;
	*ppEvtLst = pCur;

finalize_it:
	pthread_mutex_unlock(&pThis->mutEvtLst);
	RETiRet;
}
/* Free the list entry *ppEvtLst and reset the caller's pointer to NULL.
 * The entry must already have been unlinked from the event list; this
 * function does not touch the list itself.
 * rgerhards, 2009-11-23
 */
static inline rsRetVal
delEvent(nsdpoll_epollevt_lst_t **ppEvtLst) {
	nsdpoll_epollevt_lst_t *pDoomed = *ppEvtLst;
	DEFiRet;

	*ppEvtLst = NULL;
	free(pDoomed);

	RETiRet;
}
/* -END--------------------------- helpers for event list ------------------------------------ */
/* Standard-Constructor
 * Creates the epoll descriptor (pThis->efd) and initializes the mutex that
 * protects the event list. If the build supports it, epoll_create1() with
 * EPOLL_CLOEXEC is tried first; epoll_create() is used when that call fails
 * with ENOSYS or when epoll_create1() is not available at build time.
 * If no descriptor can be obtained, RS_RET_IO_ERROR is set and the mutex
 * initialization below is skipped.
 */
BEGINobjConstruct(nsdpoll_ptcp) /* be sure to specify the object type also in END macro! */
#if defined(EPOLL_CLOEXEC) && defined(HAVE_EPOLL_CREATE1)
DBGPRINTF("nsdpoll_ptcp uses epoll_create1()\n");
pThis->efd = epoll_create1(EPOLL_CLOEXEC);
/* this "if" guards the block that follows the #endif: with epoll_create1()
 * support the fallback only runs on ENOSYS, without it the block below is
 * executed unconditionally.
 */
if(pThis->efd < 0 && errno == ENOSYS)
#endif
{
DBGPRINTF("nsdpoll_ptcp uses epoll_create()\n");
pThis->efd = epoll_create(100); /* size is ignored in newer kernels, but 100 is not bad... */
}
/* covers a failure of either creation call (the message text names only one of them) */
if(pThis->efd < 0) {
DBGPRINTF("epoll_create1() could not create fd\n");
ABORT_FINALIZE(RS_RET_IO_ERROR);
}
pthread_mutex_init(&pThis->mutEvtLst, NULL);
finalize_it:
ENDobjConstruct(nsdpoll_ptcp)
/* destructor for the nsdpoll_ptcp object */
BEGINobjDestruct(nsdpoll_ptcp) /* be sure to specify the object type also in END and CODESTART macros! */
nsdpoll_epollevt_lst_t *node;
nsdpoll_epollevt_lst_t *nextnode;
CODESTARTobjDestruct(nsdpoll_ptcp)
/* we check if the epoll list still holds entries. This may happen, but
* is a bit unusual.
*/
if(pThis->pRoot != NULL) {
for(node = pThis->pRoot ; node != NULL ; node = nextnode) {
nextnode = node->pNext;
dbgprintf("nsdpoll_ptcp destruct, need to destruct node %p\n", node);
delEvent(&node);
}
}
pthread_mutex_destroy(&pThis->mutEvtLst);
ENDobjDestruct(nsdpoll_ptcp)
/* Modify socket set
 * Adds the socket of pNsd to (op == NSDPOLL_ADD) or removes it from
 * (op == NSDPOLL_DEL) the epoll set of pNsdpoll. The registration is
 * identified by the id/pUsr pair; mode (NSDPOLL_IN / NSDPOLL_OUT) selects the
 * events to wait for and is only evaluated when adding.
 *
 * Returns:
 *  - whatever addEvent()/unlinkEvent() report if they fail (e.g.
 *    RS_RET_NOT_FOUND when removing an unknown id/pUsr pair)
 *  - RS_RET_ERR_EPOLL_CTL if epoll_ctl() fails while removing
 *  - RS_RET_ERR for an unknown op
 * A failing epoll_ctl() while ADDING is only logged; the list entry is kept
 * and the return state is left unchanged (behaviour kept as is for callers).
 */
static rsRetVal
Ctl(nsdpoll_t *pNsdpoll, nsd_t *pNsd, int id, void *pUsr, int mode, int op) {
	nsdpoll_ptcp_t *pThis = (nsdpoll_ptcp_t*) pNsdpoll;
	nsd_ptcp_t *pSock = (nsd_ptcp_t*) pNsd;
	nsdpoll_epollevt_lst_t *pEventLst;
	int errSave;
	char errStr[512];
	DEFiRet;

	if(op == NSDPOLL_ADD) {
		dbgprintf("adding nsdpoll entry %d/%p, sock %d\n", id, pUsr, pSock->sock);
		CHKiRet(addEvent(pThis, id, pUsr, mode, pSock, &pEventLst));
		if(epoll_ctl(pThis->efd, EPOLL_CTL_ADD, pSock->sock, &pEventLst->event) < 0) {
			errSave = errno; /* save before any other call can clobber it */
			rs_strerror_r(errSave, errStr, sizeof(errStr));
			errmsg.LogError(errSave, RS_RET_ERR_EPOLL_CTL,
				"epoll_ctl failed on fd %d, id %d/%p, op %d with %s\n",
				pSock->sock, id, pUsr, op, errStr);
		}
	} else if(op == NSDPOLL_DEL) {
		dbgprintf("removing nsdpoll entry %d/%p, sock %d\n", id, pUsr, pSock->sock);
		CHKiRet(unlinkEvent(pThis, id, pUsr, &pEventLst));
		if(epoll_ctl(pThis->efd, EPOLL_CTL_DEL, pSock->sock, &pEventLst->event) < 0) {
			errSave = errno; /* save before any other call can clobber it */
			rs_strerror_r(errSave, errStr, sizeof(errStr));
			errmsg.LogError(errSave, RS_RET_ERR_EPOLL_CTL,
				"epoll_ctl failed on fd %d, id %d/%p, op %d with %s\n",
				pSock->sock, id, pUsr, op, errStr);
			/* the entry is already unlinked and thus unreachable from
			 * now on - free it here, otherwise it is leaked.
			 */
			delEvent(&pEventLst);
			ABORT_FINALIZE(RS_RET_ERR_EPOLL_CTL);
		}
		CHKiRet(delEvent(&pEventLst));
	} else {
		dbgprintf("program error: invalid NSDPOLL_mode %d - ignoring request\n", op);
		ABORT_FINALIZE(RS_RET_ERR);
	}

finalize_it:
	RETiRet;
}
/* Wait for i/o to become ready on the epoll set of pNsdpoll.
 * timeout is passed unmodified to epoll_wait(). On entry, *numEntries holds
 * the maximum number of events the caller can take; it is capped to the size
 * of the local event buffer (128) before waiting. On success, *numEntries
 * holds the number of events received and, for each of them, workset[i]
 * carries the id and the user pointer that were supplied when the
 * descriptor was added.
 * Returns RS_RET_EINTR if the wait was interrupted, RS_RET_ERR_EPOLL on any
 * other epoll_wait() error and RS_RET_TIMEOUT if no event arrived.
 * rgerhards, 2009-11-18
 */
static rsRetVal
Wait(nsdpoll_t *pNsdpoll, int timeout, int *numEntries, nsd_epworkset_t workset[]) {
	nsdpoll_ptcp_t *pThis = (nsdpoll_ptcp_t*) pNsdpoll;
	struct epoll_event event[128];
	const int maxEvents = (int) (sizeof(event) / sizeof(event[0]));
	nsdpoll_epollevt_lst_t *pOurEvt;
	int nfds;
	int i;
	DEFiRet;

	assert(workset != NULL);

	/* never ask for more than our local buffer can hold */
	if(*numEntries > maxEvents)
		*numEntries = maxEvents;
	DBGPRINTF("doing epoll_wait for max %d events\n", *numEntries);

	nfds = epoll_wait(pThis->efd, event, *numEntries, timeout);
	if(nfds == 0)
		ABORT_FINALIZE(RS_RET_TIMEOUT);
	if(nfds == -1) {
		if(errno == EINTR)
			ABORT_FINALIZE(RS_RET_EINTR);
		DBGPRINTF("epoll() returned with error code %d\n", errno);
		ABORT_FINALIZE(RS_RET_ERR_EPOLL);
	}

	/* we got valid events, so tell the caller... */
	dbgprintf("epoll returned %d entries\n", nfds);
	for(i = 0 ; i < nfds ; ++i) {
		/* the user data of each event is the address of our list entry */
		pOurEvt = (nsdpoll_epollevt_lst_t*) event[i].data.u64;
		workset[i].id = pOurEvt->id;
		workset[i].pUsr = pOurEvt->pUsr;
		dbgprintf("epoll push ppusr[%d]: %p\n", i, pOurEvt->pUsr);
	}
	*numEntries = nfds;

finalize_it:
	RETiRet;
}
/* ------------------------------ end support for the epoll() interface ------------------------------ */
/* queryInterface function
 * Fills the caller-provided interface structure with the entry points of this
 * driver. Only requests for the current interface version are served; any
 * other version yields RS_RET_INTERFACE_NOT_SUPPORTED and leaves the
 * structure untouched.
 * NOTE(review): the version is compared against nsdCURR_IF_VERSION, not an
 * nsdpoll-specific constant - confirm this matches what requesters set.
 */
BEGINobjQueryInterface(nsdpoll_ptcp)
CODESTARTobjQueryInterface(nsdpoll_ptcp)
	/* check for current version, increment on each change */
	if(pIf->ifVersion != nsdCURR_IF_VERSION)
		ABORT_FINALIZE(RS_RET_INTERFACE_NOT_SUPPORTED);

	/* The version is the right one, so populate the interface. Support
	 * for older interface versions (backwards compatibility) would also
	 * have to be handled here - and in the check above.
	 */
	pIf->Wait = Wait;
	pIf->Ctl = Ctl;
	pIf->Destruct = (rsRetVal(*)(nsdpoll_t**)) nsdpoll_ptcpDestruct;
	pIf->Construct = (rsRetVal(*)(nsdpoll_t**)) nsdpoll_ptcpConstruct;
finalize_it:
ENDobjQueryInterface(nsdpoll_ptcp)
/* exit our class
 * Gives back the references to the glbl and errmsg objects that were
 * requested when the class was initialized.
 */
BEGINObjClassExit(nsdpoll_ptcp, OBJ_IS_CORE_MODULE) /* CHANGE class also in END MACRO! */
CODESTARTObjClassExit(nsdpoll_ptcp)
/* release objects we no longer need */
objRelease(glbl, CORE_COMPONENT);
objRelease(errmsg, CORE_COMPONENT);
ENDObjClassExit(nsdpoll_ptcp)
/* Initialize the nsdpoll_ptcp class. Must be called as the very first method
 * before anything else is called inside this class.
 * Requests the errmsg and glbl objects; if either request fails, CHKiRet
 * aborts the initialization with that error. No class-specific handlers are
 * registered.
 * rgerhards, 2008-02-19
 */
BEGINObjClassInit(nsdpoll_ptcp, 1, OBJ_IS_CORE_MODULE) /* class, version */
/* request objects we use */
CHKiRet(objUse(errmsg, CORE_COMPONENT));
CHKiRet(objUse(glbl, CORE_COMPONENT));
/* set our own handlers */
/* (none are set for this class) */
ENDObjClassInit(nsdpoll_ptcp)
#endif /* #ifdef HAVE_EPOLL_CREATE this module requires epoll! */
/* vi:set ai:
*/