/* * This file is part of rasdaman community. * * Rasdaman community is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * Rasdaman community is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with rasdaman community. If not, see . * * Copyright 2003, 2004, 2005, 2006, 2007, 2008, 2009 Peter Baumann / rasdaman GmbH. * * For more information please see * or contact Peter Baumann via . / /** * SOURCE: rasmgr_comm_nb.cc * * MODULE: rasmgr * CLASS: IOSelector, NbJob, NbServerComm * * PURPOSE: * Performs reliable, non-blocking HTTP communication. used by the master rasmgr * Hierarchy: NbServerComm uses NbJob uses IOSelector * IOSelector maintains a set of read and write file descriptors (ie, sockets) * plus a timeout value common to all of them. * NbJob maintains message streams allowing partial (piecewise) read/write. * NbServerComm bundles all this, abstracting from the particular socket used. * * COMMENTS: * none * */ #include #include #include #include #include "rasmgr_comm_nb.hh" #include "debug.hh" /************************************************************************* * * CLASS: IOSelector * * PURPOSE: * IOSelector maintains a set of read and write file descriptors to be watched * plus a timeout value common to all of them. * ATTENTION: This class does not really care about socket status! * Exceptions: * - waitForRequest() performs blocking wait (w timeout) for incoming request on the sets. * - closeForcedAllSockets() closes all open sockets in the read and write set (should go to NbJob where bind() etc is performed) * * CHANGE HISTORY (append further entries): [see module header] * * COMMENTS: * - this class is (almost) internal to NbJob * - when adding sockets there is no check for valid socket, duplicate insertion, etc * - when clearing a socket from a set, there is no check whether it was really in there * ***********************************************************************/ IOSelector::IOSelector() { ENTER( "IOSelector::IOSelector: enter." ); FD_ZERO(& watchReadFdSet); FD_ZERO(& watchWriteFdSet); FD_ZERO(& watchExceptFdSet); tvptr = NULL; LEAVE( "IOSelector::IOSelector: leave." ); } void IOSelector::setTimeout(int sec,int millisec) { ENTER( "IOSelector::setTimeout: enter. timeout=" << sec << "." << millisec << " secs." ); tvinit.tv_sec=sec; tvinit.tv_usec=millisec * 1000; tvptr=&tv; // yes, yes, &tv LEAVE( "IOSelector::setTimeout: leave." ); } void IOSelector::disableTimeout() { ENTER( "IOSelector::disableTimeout: enter." ); tvptr=NULL; LEAVE( "IOSelector::disableTimeout: leave." ); } // add socket to the socket set watched for incoming requests void IOSelector::setReadSocket(int socket) { ENTER( "IOSelector::setReadSocket: enter. add to watchReadSet: " << socket ); FD_SET(socket,&watchReadFdSet); LEAVE( "IOSelector::setReadSocket: leave." ); } // remove socket from read socket set void IOSelector::clearReadSocket(int socket) { ENTER( "IOSelector::clearReadSocket: enter. remove from watchReadSet: " << socket ); FD_CLR(socket,&watchReadFdSet); LEAVE( "IOSelector::clearReadSocket: leave." ); } // add socket to the socket set watched for outgoing requests void IOSelector::setWriteSocket(int socket) { ENTER( "IOSelector::setWriteSocket: enter. add to watchWriteSet: " << socket ); FD_SET(socket,&watchWriteFdSet); LEAVE( "IOSelector::setWriteSocket: leave." ); } // remove socket from write socket set void IOSelector::clearWriteSocket(int socket) { ENTER( "IOSelector::clearWriteSocket: enter. remove from watchWriteSet: " << socket ); FD_CLR(socket,&watchWriteFdSet); LEAVE( "IOSelector::clearWriteSocket: leave." ); } // result = outcome of select() request: // > 0: number of active sockets // ==0: timeout // < 0: error // preconditions: // - read set, write set contains valid sockets // - bind() on each socket in the read and write set int IOSelector::waitForRequest() { int result; ENTER( "IOSelector::waitForRequest: enter." ); resultReadFdSet =watchReadFdSet; resultWriteFdSet=watchWriteFdSet; // error unused // tv has to be reloaded every time; if tvptr is NULL it doesn't matter tv.tv_sec = tvinit.tv_sec; tv.tv_usec = tvinit.tv_usec; TALK( "IOSelector::waitForRequest: timeout=" << tv.tv_sec << "sec " << tv.tv_usec << "microsec." ); result = select(FD_SETSIZE,&resultReadFdSet,&resultWriteFdSet,NULL,tvptr); if (result < 0) { TALK( "IOSelector::waitForRequest: select error: " << strerror(errno) ); } else // if ( result == 0) { TALK( "IOSelector::waitForRequest: select() successful, result=" << result ); } #if 0 // unsuccessful try else // (result > 0) { TALK( "IOSelector::waitForRequest: select() successful, returned " << result ); int isPending = 0; for (int i=0; i 0) { pselector->clearReadSocket(socket); pselector->clearWriteSocket(socket); int result = close(socket); int tempErrno = errno; TALK( "NbJob::clearConnection: close() on socket " << socket << " returned " << result ); if (result != 0) { TALK( "NbJob::clearConnection: error closing socket: " << strerror(tempErrno) ); } socket = -1; bigError = false; } LEAVE( "NbJob::clearConnection: leave." ); } // returns true if the current job is too old bool NbJob::processJobTimeout() { bool result = (messageReadyTime + timeOutInterv > currentTime) ? false:true; TALK( "NbJob::processJobTimeout: result=" << result ); return result; } // on timeout, reset all buffers but don't close socket bool NbJob::cleanUpIfTimeout() { ENTER( "NbJob::cleanUpIfTimeout: enter." ); bool result = (socket < 0 ) || (lastActionTime + timeOutInterv > currentTime) ? false : true; if (result==true) { TALK("NbJob::cleanUpIfTimeout: client timeout on socket " << socket); closeConnection(); } LEAVE( "NbJob::cleanUpIfTimeout: leave. result=" << result ); return result; } // reset all that's necessary, then do an accept() to wait [timeout] for incoming calls // upon success, set socket to the descriptor returned by accept() // preconditions: // - socket initialised with bind() etc. // Note: accept() works like an "open" in that it creates a separate socket which must be closed explicitly! NbJob::acceptStatus NbJob::acceptConnection(int listenSocket) { ENTER( "NbJob::acceptConnection: enter. listenSocket=" << listenSocket ); if(socket>0) { bool result = cleanUpIfTimeout(); if(result == false) { LEAVE("NbJob::acceptConnection: leave. cleanUpIfIimeout() returned: no timeout yet."); return acs_Iambusy; // free again } } markAction(); clearInputBuffer(); inputBuffer = new char[maxInputLength]; if(inputBuffer == NULL) { LEAVE("NbJob::acceptConnection: leave. out of memory."); return acs_outofmem; } struct sockaddr_in internetAddress; r_Socklen_t size=sizeof(sockaddr_in); // extract the first pending request from the socket queue // NB: accept() clones the socket. errno = 0; // replace accept() code with a simple FD_ISSET() search #ifndef NEVER_AGAIN // accept() here serves to clone the socket. // there shouldn't be any wait because of the select() call just passed via waitForRequest() socket=accept(listenSocket,(struct sockaddr*)&internetAddress,&size); int saveerrno=errno; TALK("NbJob::acceptConnection: accept() with socket " << listenSocket << " returned " << socket << ", sin_port=" << htons(internetAddress.sin_port) << ", requestor=" << inet_ntoa(internetAddress.sin_addr) ); if(socket<0) { if(saveerrno==EAGAIN) { LEAVE("NbJob::acceptConnection: leave. no pending connections"); } else { LEAVE("NbJob::acceptConnection: leave. accept error " << strerror(saveerrno) ); } return acs_nopending; } // several flags, such as non-blocking, are not inherited accross accept() - see manual int val = fcntl(socket,F_GETFL,0); val |= O_NONBLOCK; fcntl(socket,F_SETFL,val); pselector->setReadSocket(socket); #else // !NEVER_AGAIN -- but currently inactive. // int activeSocket = pselector->someWaitingSocket(); // basically we should always have at least one pending request because a select() came before if (activeSocket == 0) { TALK( "NbJob::acceptConnection: found NO active socket." ); } else { TALK( "NbJob::acceptConnection: found active socket " << activeSocket ); } # endif // NEVER_AGAIN LEAVE( "NbJob::acceptConnection: leave. acs_accepted, cloned socket is " << socket); return acs_accepted; } // acceptConnection() // this method got the body of reset() which (being redundant) has been dropped -- PB 2003-jun-04 void NbJob::closeConnection() { ENTER( "NbJob::closeConnection: enter." ); clearConnection(); clearInputBuffer(); clearOutputBuffer(); LEAVE( "NbJob::closeConnection: leave." ); } // PB 2003-may-29: to fix bug that connections cloned by accept() remain open, so the number grows infinitely // ...but this doesn't work yet, don't use it... void NbJob::closeSocket() { ENTER( "NbJob::closeSocket: enter. socket=" << socket ); int result = close(socket); if (result != 0) { TALK( "NbJob::closeSocket: error closing socket: " << strerror(errno) ); } socket = -1; LEAVE( "NbJob::closeSocket: leave." ); } void NbJob::markAction() { lastActionTime=currentTime; } int NbJob::getSocket() { return socket; } bool NbJob::isMessageOK() { return (nextReadPos > 1 && inputBuffer[nextReadPos - 1] == messageTerminator) ? true:false; } bool NbJob::wasError() { return bigError; } // read from socket into input buffer as much as fits in or until NULL terminator // returns true iff some bytes could be written bool NbJob::readPartialMessage() { bool messOK; ENTER("NbJob::readPartialMessage: enter." ); markAction(); errno = 0; int nbytes = read(socket,inputBuffer + nextReadPos,maxInputLength - nextReadPos); TALK("NbJob::readPartialMessage: read() with socket=" << socket << " returned " << nbytes ); if(nbytes) // wrote some bytes { TALK( "NbJob::readPartialMessage: read socket("<setWriteSocket(socket); pselector->clearReadSocket(socket); // sa fie } LEAVE("NbJob::initSendAnswer: leave. result=" << result ); return result; } // write current contents of output buffer to socket // result == true iff writing went fine bool NbJob::writePartialMessage() { bool result = false; ENTER("NbJob::writePartialMessage: enter." ); markAction(); errno = 0; int nbytes = write(socket,outputBuffer + nextWritePos,answerLength - nextWritePos); TALK("NbJob::writePartialMessage: write() with socket=" << socket << " returned " << nbytes ); if(nbytes) { TALK("NbJob::writePartialMessage: write to socket=" << socket << ", " << nbytes << " bytes to pos=" << nextWritePos << ", answerLength=" << answerLength ); nextWritePos += nbytes; if(nextWritePos == answerLength) // everything written? { TALK("NbJob::writePartialMessage: write completed."); // closeConnection(); // was here, now shifted up the hierarchy -- PB 2003-jun-10 result = true; } } else { int saveerrno=errno; switch(saveerrno) { case EINTR: TALK("NbJob::writePartialMessage: EINTR, retry please"); break; case EAGAIN: TALK("NbJob::writePartialMessage: EAGAIN, retry please"); break; case 0: TALK("NbJob::writePartialMessage: premature client hang up."); bigError=true; break; default: TALK("NbJob::writePartialMessage: error "<< strerror(saveerrno) ); bigError = true; break; } } LEAVE("NbJob::writePartialMessage: leave. result=" << result ); return result; } // is socket still open? (actually; wrong name) bool NbJob::isOperationPending() { bool result = socket > 0 ? true:false; TALK("NbJob::isOperationPending (i.e.: socket open) -> " << result ); return result; } void NbJob::printStatus() { TALK( "NbJob::printStatus: socket=" << socket << ", isRead=" << (int) pselector->isReadSocket(socket) << ", isWrite=" << (int) pselector->isWriteSocket(socket) ); } //################################################################################################ NbServerComm::NbServerComm() { listenSocket = -1; maxJobs = 0; job = 0; mypid = getpid(); } void NbServerComm::initJobs(int maxJobs) { ENTER( "NbServerComm::initJobs: enter. maxJobs=" << maxJobs ); this->maxJobs = maxJobs; job = new NbJob[maxJobs]; for(int i=0;ip_proto); if(listenSocket < 0) { TALK( "NbServerComm::initListenSocket: socket error: " << strerror(errno) ); exitbyerror("socket"); } // make the socket nonblocking int val =fcntl(listenSocket,F_GETFL,0); val|=O_NONBLOCK; fcntl(listenSocket,F_SETFL,val); val =fcntl(listenSocket,F_GETFL,0); if(val & O_NONBLOCK) TALK("NbServerComm::initListenSocket: socket " << listenSocket << " is nonblocking" ); #ifdef SO_REUSEADDR val = 1; int len = sizeof( val ); if(setsockopt( listenSocket, SOL_SOCKET, SO_REUSEADDR, (char*)&val, len )) { TALK( "NbServerComm::initListenSocket: cannot set address reusable using setsockopt: " << strerror(errno) ); } #endif int sockResult = bind(listenSocket,(sockaddr*)&name,sizeof(name)); TALK( "NbServerComm::initListenSocket: bind() with socket=" << listenSocket << ", name.port=" << name.sin_port << " returned " << sockResult ); if (sockResult < 0) { TALK( "NbServerComm::initListenSocket: cannot set address reusable using bind: " << strerror(errno) ); exitbyerror("bind"); } int queuesize=SOMAXCONN; // the maximum number allowed by SO!! sockResult = listen(listenSocket,queuesize); TALK("NbServerComm::initListenSocket: listen() with socket=" << listenSocket << ", queuesize=" << queuesize << " returned " << sockResult ); if(sockResult < 0) { TALK( "NbServerComm::initListenSocket: listen error: " << strerror(errno) ); exitbyerror("listen"); } selector.setReadSocket(listenSocket); // add this socket to the read watch list LEAVE( "NbServerComm::initListenSocket: leave." ); return true; } void NbServerComm::closeListenSocket() { ENTER( "NbServerComm::closeListenSocket: enter." ); if(listenSocket>0) { selector.clearReadSocket(listenSocket); TALK( "NbServerComm::closeListenSocket: closing socket " << listenSocket ); int result = close(listenSocket); if (result < 0) { TALK( "NbServerComm::closeListenSocket: error closing socket: " << strerror(errno) ); } listenSocket = -1; } LEAVE( "NbServerComm::closeListenSocket: leave." ); } void NbServerComm::shouldExit() { ENTER( "NbServerComm::shouldExit: enter." ); exitRequest=true; LEAVE( "NbServerComm::shouldExit: leave." ); } bool NbServerComm::mayExit() { ENTER( "NbServerComm::mayExit: enter." ); bool result = true; if(exitRequest==false) result = false; else { closeListenSocket(); // we don't accept requests any more for(int i=0;i0) // active job pending? { if(selector.isWriteSocket(socket)) { TALK( "flushing write job " << i << ", socket " << socket ); bool result = job[i].writePartialMessage(); if (result == true) { TALK( "job done, closing connection " << i << ", socket " << socket ); job[i].closeConnection(); // was in writePartialMessage() -- PB 2003-jun-10 } else { TALK( "connection " << i << "write error, socket " << socket ); } } else { TALK( "job " << i << ": socket not writable, nothing to do." ); } } } // for LEAVE( "NbServerComm::dispatchWriteRequest: leave." ); } // look through all read jobs and load msg buffers // NB: as opposed to write, here is no closeSocket! why?? void NbServerComm::dispatchReadRequest() { ENTER( "NbServerComm::dispatchReadRequest: enter." ); int i; for(i=0;i0) { if(selector.isReadSocket(socket)) { TALK( "NbServerComm::dispatchReadRequest: flush reading job " << i << ", socket " << socket << " -- NO CLOSE!?!" ); // result code was not queried, added it -- PB 2004-jul-16 bool allOk = job[i].readPartialMessage(); if (allOk) { TALK( "connection " << i << " done reading, socket " << socket ); // no close here, connection used for writing afterwards } else // could not read { TALK( "connection " << i << " read error, socket " << socket ); } } } } // for LEAVE( "NbServerComm::dispatchReadRequest: leave." ); } void NbServerComm::connectNewClients() { ENTER( "NbServerComm::connectNewClients: enter." ); // why only for read sockets? because we process _incoming_ requests if(selector.isReadSocket(listenSocket)) { for(int i=0;i