/*
* This file is part of rasdaman community.
*
* Rasdaman community is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Rasdaman community is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with rasdaman community. If not, see .
*
* Copyright 2003, 2004, 2005, 2006, 2007, 2008, 2009 Peter Baumann /
rasdaman GmbH.
*
* For more information please see
* or contact Peter Baumann via .
*/
/****************************************************************************
*
*
* COMMENTS:
* - FIXME: uses assert() !!! -- PB 2003-nov-22
*
****************************************************************************/
#include
#include
#ifdef AFTERV52
#include
#endif
#include "debug.hh"
#include "raslib/rminit.hh" // for RNP_COMM_TIMEOUT
using namespace rnp;
RnpClientJob::RnpClientJob() throw()
{
}
void RnpClientJob::init(CommBuffer* transmitterBuffer, RnpBaseClientComm* newClientComm) throw()
{
ENTER( "RnpClientJob::init" );
if ( ! ( transmitterBuffer != 0 ) )
{
TALK( "RnpClientJob::init(): warning: assert will fire." );
}
assert(transmitterBuffer != 0);
if ( ! ( newClientComm != 0 ) )
{
TALK( "RnpClientJob::init(): warning: assert will fire." );
}
assert(newClientComm != 0);
rnpReceiver.reset();
answerOk = false;
currentBufferPtr = transmitterBuffer;
clientCommPtr = newClientComm;
invalidFormat = false;
status=wks_notdefined;
LEAVE( "RnpClientJob::init" );
}
void RnpClientJob::clearAnswerBuffer() throw()
{
rnpReceiver.reset();
}
void RnpClientJob::resetState() throw()
{
ENTER( "RnpClientJob::resetState" );
clearConnection();
clientCommPtr->jobIsReady();
status = wks_notdefined;
LEAVE( "RnpClientJob::resetState" );
}
void RnpClientJob::processRequest() throw()
{
ENTER( "RnpClientJob::processRequest" );
answerOk = true;
invalidFormat = false;
resetState();
LEAVE( "RnpClientJob::processRequest" );
}
bool RnpClientJob::validateMessage() throw()
{
ENTER( "RnpClientJob::validateMessage()" );
bool validated = rnpReceiver.validateMessage();
currentBufferPtr = rnpReceiver.getCurrentBuffer();
if( validated == true)
{
status=wks_processing;
LEAVE( "RnpClientJob::validateMessage() -> true" );
return true;
}
if(rnpReceiver.isDiscarding())
{
TALK( "RnpClientJob::validateMessage - discarding message" );
resetState();
answerOk = false;
invalidFormat = true;
}
answerOk = false;
LEAVE( "RnpClientJob::validateMessage() -> false" );
return false;
}
void RnpClientJob::executeOnWriteReady() throw()
{
ENTER( "RnpClientJob::executeOnWriteReady()" );
rnpReceiver.reset();
currentBufferPtr->freeBuffer();
currentBufferPtr = rnpReceiver.getCurrentBuffer();
readyToReadAnswer();
LEAVE( "RnpClientJob::executeOnWriteReady()" );
}
void RnpClientJob::specificCleanUpOnTimeout() throw()
{
ENTER( "RnpClientJob::specificCleanUpOnTimeout()" );
answerOk = false;
resetState();
LEAVE( "RnpClientJob::specificCleanUpOnTimeout()" );
}
void RnpClientJob::executeOnReadError() throw()
{
ENTER( "RnpClientJob::executeOnReadError()" );
answerOk = false;
resetState();
LEAVE( "RnpClientJob::executeOnReadError()" );
}
void RnpClientJob::executeOnWriteError() throw()
{
ENTER( "RnpClientJob::executeOnWriteError()" );
answerOk = false;
resetState();
LEAVE( "RnpClientJob::executeOnWriteError()" );
}
CommBuffer* RnpClientJob::getAnswerBuffer() throw()
{
return rnpReceiver.getMessageBuffer();
}
bool RnpClientJob::isAnswerOk() throw()
{
return answerOk;
}
bool RnpClientJob::isInvalidFormat() throw()
{
return invalidFormat;
}
//###################################################
RnpBaseClientComm::RnpBaseClientComm(RnpQuark theServerType, RnpTransport::CarrierProtocol theProtocol) throw()
{
ENTER( "RnpBaseClientComm::RnpBaseClientComm( serverType="< 0 ) )
{
TALK( "RnpBaseClientComm::RnpBaseClientComm(): warning: assert will fire." );
}
assert(theServerPort > 0);
serverHost = theServerHost;
serverPort = theServerPort;
serverType = theServerType;
carrierProtocol = theProtocol;
initDefaultCommunication();
maxRetry = 0; // # of RE-tries -- PB 2005-aug-31
LEAVE( "RnpBaseClientComm::RnpBaseClientComm()" );
}
RnpBaseClientComm::~RnpBaseClientComm() throw()
{
}
void RnpBaseClientComm::setConnectionParameters(const char* theServerHost, int theServerPort) throw()
{
ENTER( "RnpBaseClientComm::setConnectionParameters( server="< 0 ) )
{
TALK( "RnpBaseClientComm::setConnectionParameters(): warning: assert will fire." );
}
assert(theServerPort > 0);
serverHost = theServerHost;
serverPort = theServerPort;
LEAVE( "RnpBaseClientComm::setConnectionParameters()" );
}
void RnpBaseClientComm::setCarrierProtocol(RnpTransport::CarrierProtocol theProtocol) throw()
{
carrierProtocol = theProtocol;
}
RnpTransport::CarrierProtocol RnpBaseClientComm::getCarrierProtocol() throw()
{
return carrierProtocol;
}
void RnpBaseClientComm::initDefaultCommunication() throw()
{
ENTER( "RnpBaseClientComm::initDefaultCommunication()" );
communicatorPtr = &internalCommunicator;
communicatorPtr->initJobs(1);
communicatorPtr->setTimeout(RNP_COMM_TIMEOUT,0); // defined in raslib/rminit.hh -- PB 2005-sep-09
communicatorPtr->attachJob(clientJob);
// not necessary? transmitterBuffer.allocate(RNP_DEFAULTBUFFERSIZE);
LEAVE( "RnpBaseClientComm::initDefaultCommunication()" );
}
void RnpBaseClientComm::jobIsReady() throw()
{
ENTER( "RnpBaseClientComm::jobIsReady()" );
communicatorPtr->shouldExit();
LEAVE( "RnpBaseClientComm::jobIsReady()" );
}
void RnpBaseClientComm::startRequest(RnpQuark command, int transmitterBufferSize)
{
ENTER( "RnpBaseClientComm::startRequest( command="< 0 ) )
{
TALK( "RnpBaseClientComm::sendRequestGetAnswer(): warning: assert will fire." );
}
assert(serverPort > 0);
encoder.endFragment();
encoder.endMessage();
bool connected = false;
for (int retry = 0; retry < maxRetry+1 && !connected; retry++) // NB: first attempt + RE-tries! -- PB 2005-aug-31
{
connected = clientJob.connectToServer(serverHost,serverPort);
}
if(connected == false)
{
#ifdef AFTERV52
LEAVE( "RnpBaseClientComm::sendMessageGetAnswer(): exception - cannot connect to server "< false" );
return false;
}
communicatorPtr->runClient();
if(clientJob.isAnswerOk()== false)
{
#ifdef AFTERV52
LEAVE( "RnpBaseClientComm::sendMessageGetAnswer(): exception - answer not OK" );
if(clientJob.isInvalidFormat()) throw RnpInvalidFormatException();
else throw RnpIOException(clientJob.getErrno());
#endif
LEAVE( "RnpBaseClientComm::sendMessageGetAnswer(): -> false" );
return false;
}
CommBuffer* receiverBuffer = clientJob.getAnswerBuffer();
decoder.decode(receiverBuffer);
decoder.getFirstFragment();
LEAVE( "RnpBaseClientComm::sendMessageGetAnswer() -> true");
return true;
}
bool RnpBaseClientComm::checkForExceptions()
{
if(decoder.getFragmentType() != Rnp::fgt_Error) return false;
return true;
}
void RnpBaseClientComm::clearAnswer() throw()
{
clientJob.clearAnswerBuffer();
}
void RnpBaseClientComm::setMaxRetry(unsigned int newMaxRetry)
{
maxRetry = newMaxRetry;
}
unsigned int RnpBaseClientComm::getMaxRetry()
{
return maxRetry;
}
//############# Server side ################################################
//#######################################################################
//#######################################################################
RnpServerJob::RnpServerJob() throw()
{
}
void RnpServerJob::init(RnpBaseServerComm* theServerComm) throw()
{
if ( ! ( theServerComm != 0 ) )
{
TALK( "RnpServerJob::init(): warning: assert will fire." );
}
assert(theServerComm != 0);
rnpReceiver.reset();
currentBufferPtr = rnpReceiver.getCurrentBuffer();
serverCommPtr = theServerComm;
status=wks_accepting;
}
void RnpServerJob::processRequest() throw()
{
serverCommPtr->processRequest(currentBufferPtr, &transmiterBuffer, rnpReceiver.getCarrierProtocol(), this);
rnpReceiver.reset();
currentBufferPtr = &transmiterBuffer;
readyToWriteAnswer();
}
bool RnpServerJob::validateMessage() throw()
{
bool validated = false;
if(rnpReceiver.validateMessage() == true)
{
status=wks_processing;
validated = true;
}
if(rnpReceiver.isDiscarding())
{
resetJob();
validated = false;
}
currentBufferPtr = rnpReceiver.getCurrentBuffer();
return validated;
}
void RnpServerJob::executeOnWriteReady() throw()
{
resetJob();
}
void RnpServerJob::executeOnAccept() throw()
{
}
void RnpServerJob::specificCleanUpOnTimeout() throw()
{
// initial era gol, dar...
// clearConnection face cine apeleaza: NbJob::cleanUpIfTimeout()
rnpReceiver.reset();
transmiterBuffer.freeBuffer();
currentBufferPtr = rnpReceiver.getCurrentBuffer();
currentBufferPtr->clearToRead();
status=wks_accepting;
}
void RnpServerJob::executeOnReadError() throw()
{
resetJob();
}
void RnpServerJob::executeOnWriteError() throw()
{
resetJob();
}
void RnpServerJob::resetJob() throw()
{
clearConnection();
rnpReceiver.reset();
transmiterBuffer.freeBuffer();
currentBufferPtr = rnpReceiver.getCurrentBuffer();
currentBufferPtr->clearToRead();
status=wks_accepting;
}
//###################################################
RnpBaseServerComm::RnpBaseServerComm() throw()
{
nrServerJobs = 1;
transmitterBufferSize = RNP_DEFAULTBUFFERSIZE;
communicator = NULL;
}
RnpBaseServerComm::~RnpBaseServerComm() throw()
{
disconnectFromCommunicator();
}
bool RnpBaseServerComm::setServerJobs(int nrOfServerJobs) throw()
{
if(communicator != 0 ) return false;
nrServerJobs = nrOfServerJobs;
return true;
}
int RnpBaseServerComm::countServerJobs() throw()
{
return nrServerJobs;
}
void RnpBaseServerComm::connectToCommunicator(NbCommunicator &theCommunicator)
{ // throws whatever 'new' throws
if ( ! ( communicator == NULL ) )
{
TALK( "RnpServerJob::init(): warning: assert will fire." );
}
assert(communicator == NULL);
communicator = &theCommunicator;
for(int i=0; iinit(this);
communicator->attachJob(*job);
serverJob.push_back(job);
}
}
bool RnpBaseServerComm::disconnectFromCommunicator() throw()
{
if(communicator == NULL) return false;
for(int i=0; ideattachJob(*(serverJob[i]));
delete serverJob[i];
}
serverJob.clear();
communicator = NULL;
return true;
}
RnpServerJob* RnpBaseServerComm::createJob()
{
return new RnpServerJob;
}
void RnpBaseServerComm::setTransmitterBufferSize(int nSize) throw()
{
transmitterBufferSize = nSize;
}
int RnpBaseServerComm::getTransmitterBufferSize() throw()
{
return transmitterBufferSize;
}
void RnpBaseServerComm::processRequest(CommBuffer *receiverBuffer, CommBuffer *transmiterBuffer, RnpTransport::CarrierProtocol protocol, RnpServerJob *callingJob) throw()
{
// use 'callingJob' to get info about the client (hostaddress, etc)
decoder.decode(receiverBuffer);
RnpQuark destServerType = decoder.getDestinationServerType();
Rnp::Endianness desEndianness = decoder.getDesiredEndianness();
// test if servertype matches!
transmiterBuffer->allocate(transmitterBufferSize);
transmiterBuffer->clearToRead();
encoder.setBuffer(transmiterBuffer);
encoder.setFinalEndianness(desEndianness);
encoder.startAnswer(destServerType, protocol);
decoder.getFirstFragment();
bool wasError = false;
for(int fragment=0; fragment < decoder.countFragments(); fragment++)
{
if(wasError == false)
{
try
{
decodeFragment();
}
catch(...)
{
wasError = true;
answerUnknownError();
}
}
else
{
discardFragment();
}
decoder.getNextFragment();
}
encoder.endMessage();
}
const char* RnpBaseServerComm::getNextAsString(RnpQuark parameterType) const
{
decoder.getNextParameter();
//if(decoder.getParameterType != parameterType) throw something
return decoder.getDataAsString();
}
int RnpBaseServerComm::getNextAsInteger(RnpQuark parameterType) const
{
decoder.getNextParameter();
//if(decoder.getParameterType != parameterType) throw something
return decoder.getDataAsInteger();
}
float RnpBaseServerComm::getNextAsFloat(RnpQuark parameterType) const
{
decoder.getNextParameter();
//if(decoder.getParameterType != parameterType) throw something
return decoder.getDataAsFloat();
}
double RnpBaseServerComm::getNextAsDouble(RnpQuark parameterType) const
{
decoder.getNextParameter();
//if(decoder.getParameterType != parameterType) throw something
return decoder.getDataAsDouble();
}
const void* RnpBaseServerComm::getNextAsOpaque(RnpQuark parameterType) const
{
decoder.getNextParameter();
//if(decoder.getParameterType != parameterType) throw something
return decoder.getDataAsOpaque();
}
int RnpBaseServerComm::getCurrentParameterLength() const throw()
{
return decoder.getDataLength();
}
void RnpBaseServerComm::answerSTLException(exception &ex) throw()
{
encoder.startFragment(Rnp::fgt_Error, decoder.getCommand());
encoder.addInt32Parameter(Rnp::ert_StlException, 0);
encoder.addStringParameter(Rnp::erp_whatValue, ex.what());
encoder.endFragment();
}
void RnpBaseServerComm::answerUnknownError() throw()
{
encoder.startFragment(Rnp::fgt_Error, decoder.getCommand());
encoder.addInt32Parameter(Rnp::ert_Unknown, 0);
encoder.endFragment();
}
void RnpBaseServerComm::discardFragment() throw()
{
encoder.startFragment(Rnp::fgt_DiscardedRequest, decoder.getCommand());
encoder.endFragment();
}
void RnpBaseServerComm::startOkAnswer() throw()
{
encoder.startFragment(Rnp::fgt_OkAnswer, decoder.getCommand());
}
void RnpBaseServerComm::endOkAnswer() throw()
{
encoder.endFragment();
}
void RnpBaseServerComm::communicatorShouldExit() throw()
{
if ( ! ( communicator != NULL ) )
{
TALK( "RnpServerJob::init(): warning: assert will fire." );
}
assert(communicator != NULL);
communicator->shouldExit();
}