/*
 * This file is part of rasdaman community.
 *
 * Rasdaman community is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * Rasdaman community is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with rasdaman community. If not, see <http://www.gnu.org/licenses/>.
 *
 * Copyright 2003, 2004, 2005, 2006, 2007, 2008, 2009 Peter Baumann / rasdaman GmbH.
 *
 * For more information please see <http://www.rasdaman.org>
 * or contact Peter Baumann via <baumann@rasdaman.com>.
 */
/**
 * SOURCE: akgnet_nbcomm.cc
 *
 * MODULE: akg network
 * CLASS: NbJob, NbServerJob, NbClientJob, NbCommunicator
 *
 * COMMENTS:
 *   Member definitions for the non-blocking communication classes of the
 *   akg network module. Summary of what the visible code does:
 *
 *   NbJob
 *     - Two class-wide values: timeOutInterv (initialised to 30) and
 *       currentTime (initialised to 0). setCurrentTime() refreshes
 *       currentTime from time(NULL); setTimeoutInterval()/getTimeoutInterval()
 *       write/read timeOutInterv. Both are presumably in seconds - TODO confirm.
 *     - The constructor binds the job to the given FileDescriptor and starts
 *       with status wks_notdefined, no selector, no buffer and
 *       lastActionTime 0.
 *     - isOperationPending() is true for every status except wks_notdefined
 *       and wks_accepting; isAccepting()/isReading()/isWriting()/
 *       isProcessing() each test for exactly one status value.
 *     - readPartialMessage() asserts that currentBufferPtr is set, calls
 *       action() and reads from fdRef through currentBufferPtr.
 *     - The write path calls executeOnWriteReady() and returns true once
 *       getNotSendedSize() reports 0. When the transfer returns no bytes,
 *       fdRef.getErrno() is inspected; EINTR and EAGAIN are not treated as
 *       errors (the disabled trace messages say "retry please").
 *     - action() stamps lastActionTime with currentTime.
 *     - setReading()/setWriting() return false when no selector is attached;
 *       otherwise they register the descriptor with the selector for
 *       read/write, set the matching status and return true.
 *
 *   NbServerJob
 *     - Passes its serverSocket member to the NbJob base class.
 *     - initOnAttach() stores the selector pointer.
 *     - acceptConnection() asserts that currentBufferPtr is set, returns
 *       acs_Iambusy unless the status is wks_accepting, then calls action()
 *       and serverSocket.acceptFrom(listenSocket).
 *
 *   NbCommunicator
 *     - Select loop (its function header is missing from this copy): on a
 *       positive result it runs dispatchWriteRequest(), connectNewClients(),
 *       dispatchReadRequest(), processJobs() and lookForTimeout(), in that
 *       order (writes first "to increase the chance to free a client").
 *       When rasp == 0 it runs lookForTimeout() and returns false if
 *       executeOnTimeout() returns false. When rasp < 0 it logs a select
 *       error.
 *     - dispatchWriteRequest()/dispatchReadRequest() call
 *       writePartialMessage()/readPartialMessage() on each job that is in
 *       the writing/reading state and whose socket the selector reports as
 *       writable/readable.
 *     - connectNewClients() calls acceptConnection(listenSocket) on
 *       accepting jobs and stops as soon as acs_nopending is returned.
 *     - executeBeforeSelect(), executeAfterSelect() and executeOnTimeout()
 *       return true here; per their own comments a false return means the
 *       server exits immediately.
 *
 *   NbClientJob is listed above, but no NbClientJob member definition is
 *   visible in this copy (it may be part of the lost text, see below).
 *
 *   NOTE(review): this copy of the file is damaged. All line breaks have
 *   been collapsed, so each "//" comment now swallows the code that used to
 *   follow it on later lines. Every stretch of text that lay between a '<'
 *   and a later '>' is missing: the two #include targets, the stream
 *   operands inside DBTALK(...), loop headers of the form "for(i=0;i<...",
 *   and whole function headers in between. The file will not compile as it
 *   stands; restore it from the upstream rasdaman repository before making
 *   code changes.
 */
#include #include //### NBJob - static members ######################### time_t akg::NbJob::timeOutInterv = 30; time_t akg::NbJob::currentTime = 0; void akg::NbJob::setCurrentTime() throw() { currentTime = time(NULL); } void akg::NbJob::setTimeoutInterval(time_t x) throw() { timeOutInterv = x; } time_t akg::NbJob::getTimeoutInterval() throw() { return timeOutInterv; } //#################################################### akg::NbJob::NbJob(FileDescriptor &fd) throw() :fdRef(fd) { status = wks_notdefined; selectorPtr = NULL; currentBufferPtr = NULL; lastActionTime = 0; } akg::NbJob::~NbJob() throw() { } akg::NbJob::workingStatus akg::NbJob::getStatus() throw() { return status; } bool akg::NbJob::isOperationPending() throw() { return (status != wks_notdefined && status != wks_accepting) ? true:false; } bool akg::NbJob::isAccepting() throw() { return status == wks_accepting ? true:false; } bool akg::NbJob::isReading() throw() { return status == wks_reading ? 
true:false; } bool akg::NbJob::isWriting() throw() { return status == wks_writing ? true:false; } bool akg::NbJob::isProcessing() throw() { return status == wks_processing ? true:false; } bool akg::NbJob::readPartialMessage() throw() { assert(currentBufferPtr != NULL); action(); int nbytes = currentBufferPtr->read(fdRef); if(nbytes>0) { DBTALK("..read socket("<write(fdRef); if(nbytes>0) { DBTALK("..write socket("<getNotSendedSize()==0) { DBTALK("Write ready"); executeOnWriteReady(); return true; } } else {int saveerrno = fdRef.getErrno(); switch(saveerrno) { case EINTR: //DBTALK("EINTR, retry please"); break; case EAGAIN: //DBTALK("EAGAIN, retry please"); break; //case 0: DBTALK("Premature partner hang up"); //?? is this valid for write // break; default: DBTALK("Write error "< currentTime) return false; DBTALK("Client socket "<clearRead(fdRef()); selectorPtr->clearWrite(fdRef()); fdRef.close(); } } void akg::NbJob::action() throw() { lastActionTime = currentTime; } int akg::NbJob::getSocket() throw() { return fdRef(); } void akg::NbJob::executeOnAccept() throw() { } bool akg::NbJob::setReading() throw() { if(selectorPtr == NULL) return false; selectorPtr->setRead(fdRef()); status = wks_reading; return true; } bool akg::NbJob::setWriting() throw() { if(selectorPtr == NULL) return false; selectorPtr->setWrite(fdRef()); status = wks_writing; return true; } int akg::NbJob::getErrno() throw() { return fdRef.getErrno(); } //################################################################## akg::NbServerJob::NbServerJob() throw() :NbJob(serverSocket) { } void akg::NbServerJob::initOnAttach(Selector *pSelector) throw() { selectorPtr = pSelector; } akg::NbJob::acceptStatus akg::NbServerJob::acceptConnection(ListenSocket& listenSocket) throw() { DBTALK("Am intrat in accepting"); assert(currentBufferPtr != NULL); if(status != wks_accepting) return acs_Iambusy; action(); if(serverSocket.acceptFrom(listenSocket) == false) { int saveerrno = serverSocket.getErrno(); 
if(saveerrno==EAGAIN) DBTALK("No pending connections"); else DBTALK("Accept error "< 0) { DBTALK("Ringing"); // first this, to increase the chance to free a client dispatchWriteRequest(); connectNewClients(); dispatchReadRequest(); processJobs(); lookForTimeout(); // important! } if(rasp == 0) { DBTALK("Timeout"); lookForTimeout(); if(executeOnTimeout() == false) return false; } if(rasp < 0) { DBTALK("select error: "<isProcessing()) { DBTALK("job "<processRequest(); } } } void akg::NbCommunicator::lookForTimeout() throw() { DBTALK("Looking for timeout"); for(int i=0;icleanUpIfTimeout(); } } void akg::NbCommunicator::dispatchWriteRequest() throw() { DBTALK("Dispatch writing"); int i; for(i=0;iisWriting()) { DBTALK("job "<getSocket()<<" is active"); if(selector.isWrite(currentJob->getSocket())) { DBTALK("...and may write "); currentJob->writePartialMessage(); } } } } void akg::NbCommunicator::dispatchReadRequest() throw() { DBTALK("Dispatch reading"); int i; for(i=0;iisReading()) { DBTALK("job "<getSocket()<<" is active"); if(selector.isRead(currentJob->getSocket())) { DBTALK("... and has message"); currentJob->readPartialMessage(); } } } } void akg::NbCommunicator::connectNewClients() throw() { DBTALK("connect listenSocket="<isAccepting()) { // we try to connect as much pending connections as possible status = currentJob->acceptConnection(listenSocket); if(status == akg::NbJob::acs_nopending ) break; // there is no pending request, DBTALK("Connected client "<getSocket()); } } } bool akg::NbCommunicator::executeBeforeSelect() throw() { // false means server exit immediately return true; } bool akg::NbCommunicator::executeAfterSelect() throw() { // false means server exit immediately return true; } bool akg::NbCommunicator::executeOnTimeout() throw() { // false means server exit immediately return true; }