/*
* This file is part of rasdaman community.
*
* Rasdaman community is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Rasdaman community is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with rasdaman community. If not, see .
*
* Copyright 2003, 2004, 2005, 2006, 2007, 2008, 2009 Peter Baumann /
rasdaman GmbH.
*
* For more information please see
* or contact Peter Baumann via .
/
/**
* SOURCE: akgnet_nbcomm.cc
*
* MODULE: akg network
* CLASS: NbJob, NbServerJob, NbClientJob, NbCommunicator
*
* COMMENTS:
*
*/
#include
#include
//### NBJob - static members #########################
time_t akg::NbJob::timeOutInterv = 30;
time_t akg::NbJob::currentTime = 0;
void akg::NbJob::setCurrentTime() throw()
{
currentTime = time(NULL);
}
void akg::NbJob::setTimeoutInterval(time_t x) throw()
{
timeOutInterv = x;
}
time_t akg::NbJob::getTimeoutInterval() throw()
{
return timeOutInterv;
}
//####################################################
akg::NbJob::NbJob(FileDescriptor &fd) throw()
:fdRef(fd)
{
status = wks_notdefined;
selectorPtr = NULL;
currentBufferPtr = NULL;
lastActionTime = 0;
}
akg::NbJob::~NbJob() throw()
{
}
akg::NbJob::workingStatus akg::NbJob::getStatus() throw()
{
return status;
}
bool akg::NbJob::isOperationPending() throw()
{
return (status != wks_notdefined &&
status != wks_accepting) ? true:false;
}
bool akg::NbJob::isAccepting() throw()
{
return status == wks_accepting ? true:false;
}
bool akg::NbJob::isReading() throw()
{
return status == wks_reading ? true:false;
}
bool akg::NbJob::isWriting() throw()
{
return status == wks_writing ? true:false;
}
bool akg::NbJob::isProcessing() throw()
{
return status == wks_processing ? true:false;
}
bool akg::NbJob::readPartialMessage() throw()
{
assert(currentBufferPtr != NULL);
action();
int nbytes = currentBufferPtr->read(fdRef);
if(nbytes>0)
{
DBTALK("..read socket("<write(fdRef);
if(nbytes>0)
{
DBTALK("..write socket("<getNotSendedSize()==0)
{
DBTALK("Write ready");
executeOnWriteReady();
return true;
}
}
else
{int saveerrno = fdRef.getErrno();
switch(saveerrno)
{
case EINTR: //DBTALK("EINTR, retry please");
break;
case EAGAIN: //DBTALK("EAGAIN, retry please");
break;
//case 0: DBTALK("Premature partner hang up"); //?? valabil la write
// break;
default: DBTALK("Write error "< currentTime) return false;
DBTALK("Client socket "<clearRead(fdRef());
selectorPtr->clearWrite(fdRef());
fdRef.close();
}
}
void akg::NbJob::action() throw()
{ lastActionTime = currentTime;
}
int akg::NbJob::getSocket() throw()
{ return fdRef();
}
void akg::NbJob::executeOnAccept() throw()
{
}
bool akg::NbJob::setReading() throw()
{
if(selectorPtr == NULL) return false;
selectorPtr->setRead(fdRef());
status = wks_reading;
return true;
}
bool akg::NbJob::setWriting() throw()
{
if(selectorPtr == NULL) return false;
selectorPtr->setWrite(fdRef());
status = wks_writing;
return true;
}
int akg::NbJob::getErrno() throw()
{
return fdRef.getErrno();
}
//##################################################################
akg::NbServerJob::NbServerJob() throw()
:NbJob(serverSocket)
{
}
void akg::NbServerJob::initOnAttach(Selector *pSelector) throw()
{
selectorPtr = pSelector;
}
akg::NbJob::acceptStatus akg::NbServerJob::acceptConnection(ListenSocket& listenSocket) throw()
{
DBTALK("Am intrat in accepting");
assert(currentBufferPtr != NULL);
if(status != wks_accepting) return acs_Iambusy;
action();
if(serverSocket.acceptFrom(listenSocket) == false)
{
int saveerrno = serverSocket.getErrno();
if(saveerrno==EAGAIN) DBTALK("No pending connections");
else DBTALK("Accept error "< 0)
{
DBTALK("Ringing");
// first this, to increase the chance to free a client
dispatchWriteRequest();
connectNewClients();
dispatchReadRequest();
processJobs();
lookForTimeout(); // important!
}
if(rasp == 0)
{
DBTALK("Timeout");
lookForTimeout();
if(executeOnTimeout() == false) return false;
}
if(rasp < 0)
{
DBTALK("select error: "<isProcessing())
{
DBTALK("job "<processRequest();
}
}
}
void akg::NbCommunicator::lookForTimeout() throw()
{
DBTALK("Looking for timeout");
for(int i=0;icleanUpIfTimeout();
}
}
void akg::NbCommunicator::dispatchWriteRequest() throw()
{
DBTALK("Dispatch writing");
int i;
for(i=0;iisWriting())
{
DBTALK("job "<getSocket()<<" is active");
if(selector.isWrite(currentJob->getSocket()))
{
DBTALK("...and may write ");
currentJob->writePartialMessage();
}
}
}
}
void akg::NbCommunicator::dispatchReadRequest() throw()
{
DBTALK("Dispatch reading");
int i;
for(i=0;iisReading())
{
DBTALK("job "<getSocket()<<" is active");
if(selector.isRead(currentJob->getSocket()))
{
DBTALK("... and has message");
currentJob->readPartialMessage();
}
}
}
}
void akg::NbCommunicator::connectNewClients() throw()
{
DBTALK("connect listenSocket="<isAccepting())
{
// we try to connect as much pending connections as possible
status = currentJob->acceptConnection(listenSocket);
if(status == akg::NbJob::acs_nopending ) break;
// there is no pending request,
DBTALK("Connected client "<getSocket());
}
}
}
bool akg::NbCommunicator::executeBeforeSelect() throw()
{
// false means server exit immediately
return true;
}
bool akg::NbCommunicator::executeAfterSelect() throw()
{
// false means server exit immediately
return true;
}
bool akg::NbCommunicator::executeOnTimeout() throw()
{
// false means server exit immediately
return true;
}