diff options
author | Constantin Jucovschi <cj@ubuntu.localdomain> | 2009-04-24 07:20:22 -0400 |
---|---|---|
committer | Constantin Jucovschi <cj@ubuntu.localdomain> | 2009-04-24 07:20:22 -0400 |
commit | 8f27e65bddd7d4b8515ce620fb485fdd78fcdf89 (patch) | |
tree | bd328a4dd4f92d32202241b5e3a7f36177792c5f /rnprotocol | |
download | rasdaman-upstream-8f27e65bddd7d4b8515ce620fb485fdd78fcdf89.tar.gz rasdaman-upstream-8f27e65bddd7d4b8515ce620fb485fdd78fcdf89.tar.xz rasdaman-upstream-8f27e65bddd7d4b8515ce620fb485fdd78fcdf89.zip |
Initial commitv8.0
Diffstat (limited to 'rnprotocol')
-rw-r--r-- | rnprotocol/Makefile.am | 47 | ||||
-rw-r--r-- | rnprotocol/rnpclientcomm.cc | 1712 | ||||
-rw-r--r-- | rnprotocol/rnpclientcomm.hh | 346 | ||||
-rw-r--r-- | rnprotocol/rnpclientcomm2.cc | 1226 | ||||
-rw-r--r-- | rnprotocol/rnpcommunication.cc | 702 | ||||
-rw-r--r-- | rnprotocol/rnpcommunication.hh | 345 | ||||
-rw-r--r-- | rnprotocol/rnpembedded.cc | 467 | ||||
-rw-r--r-- | rnprotocol/rnpembedded.hh | 256 | ||||
-rw-r--r-- | rnprotocol/rnprasserver.cc | 72 | ||||
-rw-r--r-- | rnprotocol/rnprasserver.hh | 124 | ||||
-rw-r--r-- | rnprotocol/rnprotocol.cc | 819 | ||||
-rw-r--r-- | rnprotocol/rnprotocol.hh | 462 | ||||
-rw-r--r-- | rnprotocol/rnpserver.cc | 134 | ||||
-rw-r--r-- | rnprotocol/rnpserver.hh | 33 | ||||
-rw-r--r-- | rnprotocol/rnpservercomm.cc | 1344 | ||||
-rw-r--r-- | rnprotocol/rnpservercomm.hh | 157 | ||||
-rw-r--r-- | rnprotocol/srvrasmgrcomm.cc | 213 | ||||
-rw-r--r-- | rnprotocol/srvrasmgrcomm.hh | 67 |
18 files changed, 8526 insertions, 0 deletions
diff --git a/rnprotocol/Makefile.am b/rnprotocol/Makefile.am new file mode 100644 index 0000000..d6a021d --- /dev/null +++ b/rnprotocol/Makefile.am @@ -0,0 +1,47 @@ +# +# This file is part of rasdaman community. +# +# Rasdaman community is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Rasdaman community is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with rasdaman community. If not, see <http://www.gnu.org/licenses/>. +# +# Copyright 2003, 2004, 2005, 2006, 2007, 2008, 2009 Peter Baumann / +# rasdaman GmbH. +# +# For more information please see <http://www.rasdaman.org> +# or contact Peter Baumann via <baumann@rasdaman.com>. +# +# MAKEFILE FOR: +# module rnprotocol +# +# COMMENTS: +# +################################################################## + +noinst_LIBRARIES=libservercomm.a libclientcomm.a + +libservercomm_a_SOURCES=rnprotocol.cc rnprotocol.hh \ + rnpembedded.cc rnpembedded.hh \ + rnpcommunication.cc rnpcommunication.hh \ + rnpserver.cc rnpserver.hh \ + srvrasmgrcomm.cc srvrasmgrcomm.hh \ + rnprasserver.cc rnprasserver.hh \ + rnpservercomm.cc rnpservercomm.hh + +libclientcomm_a_SOURCES=rnprotocol.cc rnprotocol.hh \ + rnpembedded.cc rnpembedded.hh \ + rnpcommunication.cc rnpcommunication.hh \ + rnprasserver.cc \ + rnpclientcomm.cc rnpclientcomm2.cc rnpclientcomm.hh + +CLEANFILES=core + diff --git a/rnprotocol/rnpclientcomm.cc b/rnprotocol/rnpclientcomm.cc new file mode 100644 index 0000000..fc58f4b --- /dev/null +++ b/rnprotocol/rnpclientcomm.cc @@ -0,0 +1,1712 @@ +#include "mymalloc/mymalloc.h" +/* +* This file is part of rasdaman community. +* +* Rasdaman community is free software: you can redistribute it and/or modify +* it under the terms of the GNU General Public License as published by +* the Free Software Foundation, either version 3 of the License, or +* (at your option) any later version. +* +* Rasdaman community is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU General Public License for more details. +* +* You should have received a copy of the GNU General Public License +* along with rasdaman community. If not, see <http://www.gnu.org/licenses/>. +* +* Copyright 2003, 2004, 2005, 2006, 2007, 2008, 2009 Peter Baumann / +rasdaman GmbH. +* +* For more information please see <http://www.rasdaman.org> +* or contact Peter Baumann via <baumann@rasdaman.com>. +*/ +/************************************************************* + * + * + * PURPOSE: + * + * + * COMMENTS: + * + * Contains the upper level functions, everything related to communication + * is in rnpclientcomm2 + * + ************************************************************/ + +#include <openssl/evp.h> + +#include "rnpclientcomm.hh" +#include "rasdaman.hh" +#include "debug.hh" + + +// waiting time increment upon subsecuqnet connect tries in RNP [msecs] -- PB 2005-sep-09 +const unsigned int RNP_PAUSE_INCREMENT = 100; + + +RnpClientComm::RnpClientComm( const char* nRasmgrHost, int nRasmgrPort) throw( r_Error ) +:RnpBaseClientComm(RnpRasserver::serverID, RnpTransport::crp_Rnp) +{ + ENTER( "RpcClientComm::RnpClientComm( " << nRasmgrHost << "," << nRasmgrPort << " )" ); + RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "RnpClientComm(" << nRasmgrHost << "," << nRasmgrPort << ")" ); + + clientID = -1; + + clientParams = new r_Parse_Params(); + clientParams->add("compserver", &serverCompresses, r_Parse_Params::param_type_int); + clientParams->add("exactformat", &exactFormat, r_Parse_Params::param_type_int); + + endianClient = (int)r_Endian::get_endianness(); + + rasmgrHost=(char*)nRasmgrHost; + rasmgrPort=nRasmgrPort; + serverHost[0]=0; + capability[0]=0; + strcpy(identificationString,"rasguest:8e70a429be359b6dace8b5b2500dedb0"); // this is MD5("rasguest"); + + transferFormatParams = 0; + storageFormatParams = 0; + + // experimentally disabled -- PB 2005-sep-01 + // useTurbo = true; + useTurbo = true; + TALK( "useTurbo=" << useTurbo ); + + akg::NbJob::setTimeoutInterval(3600); + + RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "RnpClientComm()" ); + LEAVE( "RpcClientComm::RnpClientComm()" ); +} + +RnpClientComm::~RnpClientComm() throw () +{ + ENTER( "RpcClientComm::~RnpClientComm()" ); + RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "~RnpClientComm()" ); + RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "~RnpClientComm()" ); + LEAVE( "RpcClientComm::~RnpClientComm()" ); +} + +bool RnpClientComm::effectivTypeIsRNP() throw() +{ + bool retval = true; + ENTER( "RpcClientComm::effectivTypeIsRNP()" ); + RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "effectivTypeIsRNP()" ); + RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "effectivTypeIsRNP() -> " << retval ); + LEAVE( "RpcClientComm::effectivTypeIsRNP() -> " << retval ); + return retval; +} + +int RnpClientComm::openDB( const char* database ) +{ + int retval = 0; + + ENTER( "RpcClientComm::openDB(" << database << ")" ); + RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "openDB(" << database << ")" ); + + strcpy(databaseName,database); + + getFreeServer(false, true); // read only, openDB + + TALK( "openDB: Connected to server: "<<serverHost<<":"<<serverPort ); + setConnectionParameters(serverHost,serverPort); + +/* was commented out, trying it... -- PB 2005-aug-31 +*/ + if(useTurbo) + { + turboOpenDB(databaseName); + } + else + { + executeConnect(); + executeOpenDB(database); + executeCloseDB(); + executeDisconnect(); + } +/* */ + + RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "openDB() -> " << retval ); + LEAVE( "RpcClientComm::openDB() -> " << retval ); + return retval; +} + +int RnpClientComm::closeDB() +{ + int retval = 0; + ENTER( "RpcClientComm::closeDB()" ); + RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "closeDB()" ); + RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "closeDB() -> " << retval ); + LEAVE( "RpcClientComm::closeDB() -> " << retval ); + return retval; +} + +int RnpClientComm::createDB( const char* name ) throw(r_Error) +{ + int retval = -1; + ENTER( "RpcClientComm::createDB( " << (name?name:"(null)") << " )" ); + RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "createDB()" ); + RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "createDB() -> " << retval ); + LEAVE( "RpcClientComm::createDB() -> " << retval ); + return retval; +} + +int RnpClientComm::destroyDB( const char* name ) throw(r_Error) +{ + int retval = -1; + ENTER( "RpcClientComm::destroyDB( " << (name?name:"(null)") << " )" ); + RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "destroyDB()" ); + RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "destroyDB() -> " << retval ); + LEAVE( "RpcClientComm::destroyDB() -> " << retval ); + return retval; +} + +int RnpClientComm::openTA( unsigned short readOnly ) throw(r_Error) +{ + int retval = 1; + ENTER( "RpcClientComm::openTA(" << readOnly << ")" ); + RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "openTA(" << readOnly << ")" ); + + bool rw = (readOnly == 0 ? true : false); + + getFreeServer(rw, false); // readwrite?, not openDB + TALK( "openTA: connected to server "<<serverHost<<":"<<serverPort ); + setConnectionParameters(serverHost,serverPort); + + if(useTurbo) + { + turboBeginTA(rw); + } + else + { + executeConnect(); + executeOpenDB(databaseName); + executeBeginTA(rw); + } + + RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "openTA() -> " << retval ); + LEAVE( "RpcClientComm::openTA() -> " << retval ); + return retval; +} + +int RnpClientComm::commitTA() throw(r_Error) +{ + int retval = 1; + + ENTER( "RpcClientComm::commitTA()" ); + RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "commitTA()" ); + + if(useTurbo) + { + turboCommitTA(); + } + else + { + executeCommitTA(); + executeCloseDB(); + executeDisconnect(); + } + + RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "commitTA() -> " << retval ); + LEAVE( "RpcClientComm::commitTA() -> " << retval ); + return retval; +} + +int RnpClientComm::abortTA() +{ + int retval = 1; + + ENTER( "RpcClientComm::abortTA()" ); + RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "abortTA()" ); + + try + { + if(useTurbo) + { + turboAbortTA(); + } + else + { + executeAbortTA(); + executeCloseDB(); + executeDisconnect(); + } + } + // make it nicer, but we are not allowed to throw anything! Later will change the declaration of the function + catch(...) + { + TALK( "RpcClientComm::abortTA(): caught & ignored exception." ); + } + + RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "abortTA() -> " << retval ); + LEAVE( "RpcClientComm::abortTA() -> " << retval ); + return retval; +} + +void RnpClientComm::insertMDD( const char* collName, r_GMarray* mar ) throw( r_Error ) +{ + ENTER( "RpcClientComm::insertMDD(" << (collName?collName:"(null)") << "," << (long) mar << ")" ); + RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "insertMDD(" << collName << "," << (long) mar << ")" ); + + checkForRwTransaction(); + + r_Minterval spatdom; + r_Bytes marBytes; + RPCMarray* rpcMarray; + r_Bytes tileSize = 0; + + // get the spatial domain of the r_GMarray + spatdom = mar->spatial_domain(); + + // determine the amount of data to be transferred + marBytes = mar->get_array_size(); + + const r_Base_Type* baseType = mar->get_base_type_schema(); + + // if the MDD is too large for being transfered as one block, it has to be + // divided in tiles + const r_Tiling* til = mar->get_storage_layout()->get_tiling(); + r_Tiling_Scheme scheme = til->get_tiling_scheme(); + if (scheme == r_NoTiling) + tileSize = RMInit::RMInit::clientTileSize; + else + //allowed because the only subclass of tiling without size is no tiling + tileSize = ((const r_Size_Tiling*)til)->get_tile_size(); + + if( RMInit::tiling && marBytes > tileSize ) + { + // initiate composition of MDD at server side + int status = executeStartInsertPersMDD(collName, mar); //rpcStatusPtr = rpcstartinsertpersmdd_1( params, binding_h ); + + switch( status ) + { + case 0: + break; // OK + case 2: + LEAVE( "RpcClientComm::insertMDD() Error: database class undefined." ); + throw r_Error( r_Error::r_Error_DatabaseClassUndefined ); + break; + case 3: + LEAVE( "RpcClientComm::insertMDD() Error: collection element type mismatch." ); + throw r_Error( r_Error::r_Error_CollectionElementTypeMismatch ); + break; + case 4: + LEAVE( "RpcClientComm::insertMDD() Error: type invalid." ); + throw r_Error( r_Error::r_Error_TypeInvalid ); + break; + default: + LEAVE( "RpcClientComm::insertMDD() Error: transfer invalid." ); + throw r_Error( r_Error::r_Error_TransferFailed ); + break; + } + + r_Set< r_GMarray* >* bagOfTiles; + + bagOfTiles = mar->get_storage_layout()->decomposeMDD( mar ); + + RMDBGMIDDLE(2, RMDebug::module_clientcomm, "RpcClientComm", "decomposing into " << bagOfTiles->cardinality() << " tiles") + + r_Iterator< r_GMarray* > iter = bagOfTiles->create_iterator(); + r_GMarray *origTile; + + for(iter.reset(); iter.not_done(); iter.advance() ) + { + origTile = *iter; + + RMDBGMIDDLE(2, RMDebug::module_clientcomm, "RpcClientComm", "inserting Tile with domain " << origTile->spatial_domain() << ", " << origTile->spatial_domain().cell_count() * origTile->get_type_length() << " bytes") + + getMarRpcRepresentation( origTile, rpcMarray, mar->get_storage_layout()->get_storage_format(), baseType ); + + status = executeInsertTile(true, rpcMarray); + + // free rpcMarray structure (rpcMarray->data.confarray_val is freed somewhere else) + freeMarRpcRepresentation( origTile, rpcMarray ); + + // delete current tile (including data block) + delete origTile; + + if( status > 0 ) + { + throw r_Error( r_Error::r_Error_TransferFailed ); + } + } + + executeEndInsertMDD(true); //rpcendinsertmdd_1( params3, binding_h ); + + // delete transient data + bagOfTiles->remove_all(); + delete bagOfTiles; + } + else // begin: MDD is transferred in one piece + { + RMDBGMIDDLE(2, RMDebug::module_clientcomm, "RpcClientComm", ", one tile" ) + + getMarRpcRepresentation( mar, rpcMarray, mar->get_storage_layout()->get_storage_format(), baseType ); + + int status = executeInsertMDD(collName, mar, rpcMarray); // rpcStatusPtr = rpcinsertmdd_1( params, binding_h ); + + freeMarRpcRepresentation( mar, rpcMarray ); + + RMDBGMIDDLE(2, RMDebug::module_clientcomm, "RpcClientComm", "ok" ) + + switch( status ) + { + case 0: + break; // OK + case 2: + LEAVE( "RpcClientComm::insertMDD(): exception, status = " << status ); + throw r_Error( r_Error::r_Error_DatabaseClassUndefined ); + break; + case 3: + LEAVE( "RpcClientComm::insertMDD(): exception, status = " << status ); + throw r_Error( r_Error::r_Error_CollectionElementTypeMismatch ); + break; + case 4: + LEAVE( "RpcClientComm::insertMDD(): exception, status = " << status ); + throw r_Error( r_Error::r_Error_TypeInvalid ); + break; + default: + LEAVE( "RpcClientComm::insertMDD(): exception, status = " << status ); + throw r_Error( r_Error::r_Error_TransferFailed ); + break; + } + + } // end: MDD i transferred in one piece + + RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "insertMDD()" ); + LEAVE( "RpcClientComm::insertMDD()" ); +} + + +//################################################################ + +r_Ref_Any RnpClientComm::getMDDByOId( const r_OId& oid ) throw( r_Error ) +{ + ENTER( "RpcClientComm::getMDDByOId(" << oid << ")" ); + RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "getMDDByOId(" << oid << ")" ); + + RMInit::logOut << "Internal error: RnpClientComm::getMDDByOId() not implemented, returning empty r_Ref_Any()." << endl; + + RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "getMDDByOId()" ); + LEAVE( "RpcClientComm::getMDDByOId()" ); + return r_Ref_Any(); +} + +void RnpClientComm::insertColl( const char* collName, const char* typeName, const r_OId& oid ) throw( r_Error ) +{ + ENTER( "RpcClientComm::insertColl(" << collName << "," << typeName << "," << oid << ")" ); + RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "insertColl(" << collName << "," << typeName << "," << oid << ")" ); + + checkForRwTransaction(); + + int status = executeInsertCollection(collName, typeName, oid); + + switch( status ) + { + case 0: + break; //OK + case 1: + LEAVE( "RpcClientComm::insertColl(): exception, status = " << status ); + throw r_Error( r_Error::r_Error_ClientUnknown ); break; + case 2: + LEAVE( "RpcClientComm::insertColl(): exception, status = " << status ); + throw r_Error( r_Error::r_Error_DatabaseClassUndefined ); break; + case 3: + LEAVE( "RpcClientComm::insertColl(): exception, status = " << status ); + throw r_Error( r_Error::r_Error_NameNotUnique ); break; + default: + LEAVE( "RpcClientComm::insertColl(): exception, status = " << status ); + throw r_Error( r_Error::r_Error_General );break; + } + + RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "insertColl()" ); + LEAVE( "RpcClientComm::insertColl()" ); +} + + +void RnpClientComm::deleteCollByName( const char* collName ) throw( r_Error ) +{ + ENTER( "RpcClientComm::deleteCollByName(" << (collName?collName:"(null)") << ")" ); + RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "deleteCollByName(" << collName << ")" ); + + checkForRwTransaction(); + + startRequest(RnpRasserver::cmd_delcollbyname); + encoder.addInt32Parameter(RnpRasserver::pmt_clientid, clientID); + TALK( "adding clientID 0x" << hex << clientID << dec ); + encoder.addStringParameter(RnpRasserver::pmt_collname, collName); + + helper012d("removeObjFromColl"); + + RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "deleteCollByName()" ); + LEAVE( "RpcClientComm::deleteCollByName()" ); +} + +void RnpClientComm::deleteObjByOId( const r_OId& oid ) throw( r_Error ) +{ + ENTER( "RpcClientComm::deleteObjByOId(" << oid << ")" ); + RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "deleteObjByOId(" << oid << ")" ); + + checkForRwTransaction(); + + startRequest(RnpRasserver::cmd_delobjbyoid); + encoder.addInt32Parameter(RnpRasserver::pmt_clientid, clientID); + TALK( "adding clientID 0x" << hex << clientID << dec ); + encoder.addStringParameter(RnpRasserver::pmt_oidstring, oid.get_string_representation()); + + helper012d("deleteObjByOId"); + + RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "deleteObjByOId()" ); + LEAVE( "RpcClientComm::deleteObjByOId()" ); +} + +void RnpClientComm::removeObjFromColl( const char* collName, const r_OId& oid ) throw ( r_Error ) +{ + ENTER( "RpcClientComm::removeObjFromColl(" << (collName?collName:"(null)") << "," << oid << ")" ); + RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "removeObjFromColl(" << collName << "," << oid << ")" ); + + checkForRwTransaction(); + + startRequest(RnpRasserver::cmd_remobjfromcoll); + encoder.addInt32Parameter(RnpRasserver::pmt_clientid, clientID); + TALK( "adding clientID 0x" << hex << clientID << dec ); + encoder.addStringParameter(RnpRasserver::pmt_collname, collName); + encoder.addStringParameter(RnpRasserver::pmt_oidstring, oid.get_string_representation()); + + helper012d("removeObjFromColl"); + + RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "removeObjFromColl()" ); + LEAVE( "RpcClientComm::removeObjFromColl()" ); +} + + +r_Ref_Any RnpClientComm::getCollByName( const char* collName ) throw( r_Error ) +{ + ENTER( "RpcClientComm::getCollByName(" << (collName?collName:"(null)") << ")" ); + RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "getCollByName(" << collName << ")" ); + + r_Ref_Any result = executeGetCollByNameOrOId ( collName, r_OId() ); + + RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "getCollByName()" ); + LEAVE( "RpcClientComm::getCollByName()" ); + return result; +} + +r_Ref_Any RnpClientComm::getCollByOId ( const r_OId& oid ) throw( r_Error ) +{ + ENTER( "RpcClientComm::getCollByOId(" << oid << ")" ); + RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "getCollByOId(" << oid << ")" ); + + r_Ref_Any result = executeGetCollByNameOrOId ( NULL, oid ); + + RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "getCollByOId()" ); + LEAVE( "RpcClientComm::getCollByOId()" ); + return result; +} + +r_Ref_Any RnpClientComm::getCollOIdsByName( const char* name ) throw( r_Error ) +{ + ENTER( "RpcClientComm::getCollOIdsByName(" << (name?name:"(null)") << ")" ); + RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "getCollOIdsByName(" << name << ")" ); + + r_Ref_Any result = executeGetCollOIdsByNameOrOId ( name, r_OId() ); + + RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "getCollOIdsByName()" ); + LEAVE( "RpcClientComm::getCollOIdsByName()" ); + return result; +} + +r_Ref_Any RnpClientComm::getCollOIdsByOId ( const r_OId& oid ) throw( r_Error ) +{ + ENTER( "RpcClientComm::getCollOIdsByOId(" << oid << ")" ); + RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "getCollOIdsByOId(" << oid << ")" ); + + r_Ref_Any result = executeGetCollOIdsByNameOrOId ( NULL, oid ); + + RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "getCollOIdsByOId()" ); + LEAVE( "RpcClientComm::getCollOIdsByOId()" ); + return result; +} + + +//------------------------------ +void RnpClientComm::executeQuery( const r_OQL_Query& query, r_Set< r_Ref_Any >& result ) throw( r_Error ) +{ + ENTER( "RpcClientComm::executeQuery(_,_)" ); + RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "executeQuery(_,_)" ); + + int status = executeExecuteQuery( query.get_query(), result ); + + switch(status) + { + case 0: + getMDDCollection( result, 1 ); + break; // 1== isQuery + case 1: + getElementCollection( result ); + break; + //case 2: nothing + default: + RMInit::logOut << "Internal error: RpcClientComm::executeQuery(): illegal status value " << status << endl; + } + + RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "executeQuery()" ); + LEAVE( "RpcClientComm::executeQuery()" ); +} + +void RnpClientComm::getMDDCollection( r_Set< r_Ref_Any >& mddColl, unsigned int isQuery ) throw(r_Error) +{ + ENTER( "RpcClientComm::getMDDCollection(_," << isQuery << ")" ); + RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "getMDDCollection(_," << isQuery << ")" ); + + unsigned short tileStatus=0; + unsigned short mddStatus = 0; + + while( mddStatus == 0 ) // repeat until all MDDs are transferred + { + r_Ref<r_GMarray> mddResult; + + // Get spatial domain of next MDD + GetMDDRes* thisResult = executeGetNextMDD(); + + mddStatus = thisResult->status; + + if( mddStatus == 2 ) + { + RMInit::logOut << "Error: getMDDCollection(...) - no transfer collection or empty transfer collection" << endl; + LEAVE( "RpcClientComm::getMDDCollection(): exception, status = " << mddStatus ); + throw r_Error( r_Error::r_Error_TransferFailed ); + } + + tileStatus = getMDDCore(mddResult, thisResult, isQuery); + + // finally, insert the r_Marray into the set + + mddColl.insert_element( mddResult, 1 ); + + free(thisResult->domain); + free(thisResult->typeName); + free(thisResult->typeStructure); + free(thisResult->oid); + delete thisResult; + + if( tileStatus == 0 ) // if this is true, we're done with this collection + break; + + } // end while( mddStatus == 0 ) + + executeEndTransfer(); + + RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "getMDDCollection()" ); + LEAVE( "RpcClientComm::getMDDCollection()" ); +} + + +// small helper for ... +void freeGetTileRes(GetTileRes *ptr) +{ + ENTER( "RpcClientComm::freeGetTileRes(_)" ); + RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "freeGetTileRes(_)" ); + + if(ptr->marray->domain) + free(ptr->marray->domain); + if(ptr->marray->data.confarray_val) + free(ptr->marray->data.confarray_val); + delete ptr->marray; + delete ptr; + + RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "freeGetTileRes()" ); + LEAVE( "RpcClientComm::freeGetTileRes(_)" ); +} + +unsigned short +RnpClientComm::getMDDCore( r_Ref< r_GMarray > &mdd, GetMDDRes *thisResult, unsigned int isQuery ) throw( r_Error ) +{ + ENTER( "RpcClientComm::getMDDCore(_,_," << isQuery << ")" ); + RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "getMDDCore(_,_," << isQuery << ")" ); + + // create r_Minterval and oid + r_Minterval mddDomain( thisResult->domain ); + r_OId rOId ( thisResult->oid ); + r_GMarray *marray; + + if( isQuery ) + marray = new( r_Database::actual_database, r_Object::transient, rOId ) r_GMarray(); + else + marray = new( r_Database::actual_database, r_Object::read , rOId ) r_GMarray(); + + marray->set_spatial_domain( mddDomain ); + marray->set_type_by_name ( thisResult->typeName ); + marray->set_type_structure( thisResult->typeStructure ); + + r_Data_Format currentFormat = (r_Data_Format)(thisResult->currentFormat); + if (r_Tile_Compression::check_data_format(currentFormat) == 1) + currentFormat = r_Array; + marray->set_current_format( currentFormat ); + + r_Data_Format decompFormat; + + const r_Base_Type *baseType = marray->get_base_type_schema(); + + // Variables needed for tile transfer + GetTileRes* tileRes=0; + unsigned short mddDim = mddDomain.dimension(); // we assume that each tile has the same dimensionality as the MDD + r_Minterval tileDomain; + r_GMarray* tile; // for temporary tile + char* memCopy; + unsigned long memCopyLen; + int tileCntr = 0; + unsigned short tileStatus = 0; + + tileStatus = 2; // call rpcgetnexttile_1 at least once + + while( tileStatus == 2 || tileStatus == 3 ) // while( for all tiles of the current MDD ) + { + tileRes = executeGetNextTile(); + + tileStatus = tileRes->status; + + if( tileStatus == 4 ) + { + freeGetTileRes(tileRes); + RMInit::logOut << "Error: rpcGetNextTile(...) - no tile to transfer or empty transfer collection" << endl; + LEAVE( "RpcClientComm::getMDDCore(): exception, status = " << tileStatus ); + throw r_Error( r_Error::r_Error_TransferFailed ); + } + + // take cellTypeLength for current MDD of the first tile + if( tileCntr == 0 ) + marray->set_type_length( tileRes->marray->cellTypeLength ); + + tileDomain = r_Minterval( tileRes->marray->domain ); + memCopyLen = tileDomain.cell_count() * marray->get_type_length(); // cell type length of the tile must be the same + if (memCopyLen < tileRes->marray->data.confarray_len) + memCopyLen = tileRes->marray->data.confarray_len; // may happen when compression expands + memCopy = new char[ memCopyLen ]; + + // create temporary tile + tile = new r_GMarray(); + tile->set_spatial_domain( tileDomain ); + tile->set_array( memCopy ); + tile->set_array_size( memCopyLen ); + tile->set_type_length( tileRes->marray->cellTypeLength ); + tileCntr++; + + // Variables needed for block transfer of a tile + unsigned long blockOffset = 0; + unsigned short subStatus = 3; + currentFormat = (r_Data_Format)(tileRes->marray->currentFormat); + + switch( tileStatus ) + { + case 3: // at least one block of the tile is left + + // Tile arrives in several blocks -> put them together + concatArrayData(tileRes->marray->data.confarray_val, tileRes->marray->data.confarray_len, memCopy, memCopyLen, blockOffset); + freeGetTileRes(tileRes); + + tileRes = executeGetNextTile();//rpcgetnexttile_1( &clientID, binding_h ); + + subStatus = tileRes->status; + + if( subStatus == 4 ) + { + freeGetTileRes(tileRes); + LEAVE( "RpcClientComm::getMDDCore(): exception, status = " << tileStatus << ", subStatus = " << subStatus ); + throw r_Error( r_Error::r_Error_TransferFailed ); + } + + concatArrayData(tileRes->marray->data.confarray_val, tileRes->marray->data.confarray_len, memCopy, memCopyLen, blockOffset); + freeGetTileRes(tileRes); + + tileStatus = subStatus; + break; + + default: // tileStatus = 0,3 last block of the current tile + + // Tile arrives as one block. + concatArrayData(tileRes->marray->data.confarray_val, tileRes->marray->data.confarray_len, memCopy, memCopyLen, blockOffset); + freeGetTileRes(tileRes); + + break; + } + + // uncompress the tile if necessary + decompFormat = doTransferDecompression( tile, baseType, currentFormat, blockOffset ); + + char* marrayData = NULL; + // Now the tile is transferred completely, insert it into current MDD + if( tileStatus < 2 && tileCntr == 1 && (tile->spatial_domain() == marray->spatial_domain())) + { + // MDD consists of just one tile that is the same size of the mdd + + // simply take the data memory of the tile + marray->set_array( tile->get_array() ); + marray->set_array_size( tile->get_array_size() ); + tile->set_array( 0 ); + } + else + { + // MDD consists of more than one tile or the tile does not cover the whole domain + + r_Bytes size = mddDomain.cell_count() * marray->get_type_length(); + + if( tileCntr == 1 ) + { + // allocate memory for the MDD + marrayData = new char[ size ]; + memset(marrayData, 0, size); + + marray->set_array( marrayData ); + } + else + marrayData = marray->get_array(); + + + // copy tile data into MDD data space (optimized, relying on the internal representation of an MDD ) + char* mddBlockPtr; + char* tileBlockPtr = tile->get_array(); + unsigned long blockCells = tileDomain[tileDomain.dimension()-1].high()-tileDomain[tileDomain.dimension()-1].low()+1; + unsigned long blockSize = blockCells * marray->get_type_length(); + unsigned long blockNo = tileDomain.cell_count() / blockCells; + + for( unsigned long blockCtr = 0; blockCtr < blockNo; blockCtr++ ) + { + mddBlockPtr = marrayData + marray->get_type_length()*mddDomain.cell_offset( tileDomain.cell_point( blockCtr * blockCells ) ); + memcpy( (void*)mddBlockPtr, (void*)tileBlockPtr, (size_t)blockSize ); + tileBlockPtr += blockSize; + } + + // former non-optimized version + // for( i=0; i<tileDomain->cell_count(); i++ ) + // (*marray)[tileDomain->cell_point( i )] = (*tile)[tileDomain->cell_point( i )]; + + marray->set_array_size( size ); + } + + // delete temporary tile + delete tile; + + } // end while( MDD is not transferred completely ) + + + mdd = r_Ref<r_GMarray>( marray->get_oid(), marray ); + + RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "getMDDCore() -> " << tileStatus ); + LEAVE( "RpcClientComm::getMDDCore() -> " << tileStatus ); + return tileStatus; +} + + +int RnpClientComm::concatArrayData( const char *source, unsigned long srcSize, char *&dest, unsigned long &destSize, unsigned long &destLevel ) +{ + ENTER( "RpcClientComm::concatArrayData( 0x" << hex << (unsigned long) source << dec << "," << srcSize << ",_,_,_ )" ); + RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "concatArrayData(" << source << "," << srcSize << ",_,_,_)" ); + + if (destLevel + srcSize > destSize) + { + // need to extend dest + unsigned long newSize = destLevel + srcSize; + char *newArray; + + // allocate a little extra if we have to extend + newSize = newSize + newSize / 16; + + // RMDBGOUT( 1, "RpcClientComm::concatArrayData(): need to extend from " << destSize << " to " << newSize ); + + if ((newArray = new char[newSize]) == NULL) + { + RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "concatArrayData() -> " << -1 ); + LEAVE( "RpcClientComm::concatArrayData() -> -1" ); + return -1; + } + + memcpy(newArray, dest, destLevel); + delete [] dest; + dest = newArray; + destSize = newSize; + } + + memcpy(dest + destLevel, source, srcSize); + destLevel += srcSize; + + RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "concatArrayData() -> " << 0 ); + LEAVE( "RpcClientComm::concatArrayData() -> 0" ); + return 0; +} + +r_Data_Format +RnpClientComm::doTransferDecompression( r_GMarray* tile, const r_Base_Type *type, r_Data_Format fmt, unsigned long size ) +{ + ENTER( "RpcClientComm::doTransferDecompression(...) tile dom:" << tile->spatial_domain() << " array size:" << tile->get_array_size() << " type size:" << tile->get_type_length()); + RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "doTransferDecompression(...) tile dom:" + << tile->spatial_domain() << " array size:" << tile->get_array_size() + << " type size:" << tile->get_type_length()); + + if (fmt != r_Array) + { + r_Tile_Compression *engine = NULL; + char *newTileData = NULL; + r_Data_Format newFormat; + + RMDBGMIDDLE( 2, RMDebug::module_clientcomm, "RpcClientComm", "doTransferDecompression(...) decompressing from " + << fmt << ", " << size << "bytes"); + + try + { + r_Storage_Man_CPP sman; + engine = r_Tile_Compression::create( fmt, tile->spatial_domain(), type ); + engine->set_storage_handler(sman); + newTileData = (char*)(engine->decompress(tile->get_array(), size)); + RMDBGMIDDLE( 2, RMDebug::module_clientcomm, "RpcClientComm", + "doTransferDecompression(...) decompression to " << engine->get_decomp_format() << " OK"); + } + catch (r_Error &err) + { + RMDBGMIDDLE( 2, RMDebug::module_clientcomm, "RpcClientComm", + "doTransferDecompression(...) decompression to " << engine->get_decomp_format() << " FAILED"); + RMInit::logOut << "RpcClientComm::doTransferDecompression() Error decompressing data, assuming raw" << endl; + } + + newFormat = engine->get_decomp_format(); + + if (newTileData != NULL) + { + delete [] tile->get_array(); + tile->set_array(newTileData); + tile->set_array_size(tile->spatial_domain().cell_count()*tile->get_type_length()); + } + else + newFormat = fmt; + + RMDBGMIDDLE( 2, RMDebug::module_clientcomm, "RpcClientComm", "doTransferDecompression(...) tile newFmt:" + << newFormat << " dom:" << tile->spatial_domain() + << " array size:" << tile->get_array_size() + << " type size:" << tile->get_type_length()); + + // ... also make sure the decoded format is really raw array data (r_Array) + if ((endianClient != endianServer) && (newFormat == r_Array)) + { + // if compression engine already handles endianness we mustn't change again + if (!engine->converts_endianness()) { + RMDBGMIDDLE( 2, RMDebug::module_clientcomm, "RpcClientComm", "doTransferDecompression(...) for " + << fmt << " endianness changed from " + << (r_Endian::r_Endianness)endianServer << " to " << (r_Endian::r_Endianness) endianClient); + changeEndianness(tile, type); + } + } + + if (engine != NULL) + delete engine; + + RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "doTransferDecompression(...) tile fmt:" << newFormat); + LEAVE( "RpcClientComm::doTransferDecompression() -> " << newFormat ); + return newFormat; + } + + if (endianClient != endianServer) + { + RMDBGMIDDLE( 2, RMDebug::module_clientcomm, "RpcClientComm", "doTransferDecompression(...) for " + << fmt << " endianness changed from " + << (r_Endian::r_Endianness)endianServer << " to " << (r_Endian::r_Endianness) endianClient); + changeEndianness(tile, type); + } + + RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "doTransferDecompression(...) tile fmt:" << r_Array); + LEAVE( "RpcClientComm::doTransferDecompression() -> " << r_Array ); + return r_Array; +} + + +void RnpClientComm::getElementCollection( r_Set< r_Ref_Any >& resultColl ) throw(r_Error) +{ + ENTER( "RpcClientComm::getElementCollection()" ); + RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "getElementCollection(_)" ); + + unsigned short rpcStatus = 0; + + TALK( "got set of type " << resultColl.get_type_structure() ); + + while( rpcStatus == 0 ) // repeat until all elements are transferred + { + GetElementRes* thisResult = executeGetNextElement(); + + rpcStatus = thisResult->status; + + if( rpcStatus == 2 ) + { + RMInit::logOut << "Error: getElementCollection(...) - no transfer collection or empty transfer collection" << endl; + LEAVE( "RpcClientComm::getElementCollection(): exception: rpcStatus = " << rpcStatus ); + throw r_Error( r_Error::r_Error_TransferFailed ); + } + // create new collection element, use type of collection resultColl + r_Ref_Any element; + const r_Type* elementType = resultColl.get_element_type_schema(); + + // convert the endianness before creating the new element! + if (endianClient != endianServer) + { + + if (endianClient == 0) + elementType->convertToBigEndian(thisResult->data.confarray_val, 1); + else + elementType->convertToLittleEndian(thisResult->data.confarray_val, 1); + } + + switch( elementType->type_id() ) + { + case r_Type::BOOL: + case r_Type::CHAR: + case r_Type::OCTET: + case r_Type::SHORT: + case r_Type::USHORT: + case r_Type::LONG: + case r_Type::ULONG: + case r_Type::FLOAT: + case r_Type::DOUBLE: + element = new r_Primitive( thisResult->data.confarray_val, (r_Primitive_Type*) elementType ); + r_Transaction::actual_transaction->add_object_list( r_Transaction::SCALAR, (void*) element ); + break; + + case r_Type::COMPLEXTYPE1: + case r_Type::COMPLEXTYPE2: + element = new r_Complex(thisResult->data.confarray_val, (r_Complex_Type *)elementType); + r_Transaction::actual_transaction->add_object_list(r_Transaction::SCALAR, (void *)element); + break; + + case r_Type::STRUCTURETYPE: + element = new r_Structure( thisResult->data.confarray_val, (r_Structure_Type*) elementType ); + r_Transaction::actual_transaction->add_object_list( r_Transaction::SCALAR, (void*) element ); + break; + + case r_Type::POINTTYPE: + { + char* stringRep = new char[thisResult->data.confarray_len+1]; + strncpy( stringRep, thisResult->data.confarray_val, thisResult->data.confarray_len ); + stringRep[thisResult->data.confarray_len] = '\0'; + + r_Point* typedElement = new r_Point( stringRep ); + element = typedElement; + r_Transaction::actual_transaction->add_object_list( r_Transaction::POINT, (void*) typedElement ); + delete [] stringRep; + } + break; + + case r_Type::SINTERVALTYPE: + { + char* stringRep = new char[thisResult->data.confarray_len+1]; + strncpy( stringRep, thisResult->data.confarray_val, thisResult->data.confarray_len ); + stringRep[thisResult->data.confarray_len] = '\0'; + + r_Sinterval* typedElement = new r_Sinterval( stringRep ); + element = typedElement; + r_Transaction::actual_transaction->add_object_list( r_Transaction::SINTERVAL, (void*) typedElement ); + delete [] stringRep; + } + break; + + case r_Type::MINTERVALTYPE: + { + char* stringRep = new char[thisResult->data.confarray_len+1]; + strncpy( stringRep, thisResult->data.confarray_val, thisResult->data.confarray_len ); + stringRep[thisResult->data.confarray_len] = '\0'; + + r_Minterval* typedElement = new r_Minterval( stringRep ); + element = typedElement; + r_Transaction::actual_transaction->add_object_list( r_Transaction::MINTERVAL, (void*) typedElement ); + delete [] stringRep; + } + break; + + case r_Type::OIDTYPE: + { + char* stringRep = new char[thisResult->data.confarray_len+1]; + strncpy( stringRep, thisResult->data.confarray_val, thisResult->data.confarray_len ); + stringRep[thisResult->data.confarray_len] = '\0'; + + r_OId* typedElement = new r_OId( stringRep ); + element = typedElement; + r_Transaction::actual_transaction->add_object_list( r_Transaction::OID, (void*) typedElement ); + delete [] stringRep; + } + break; + default: + RMDBGENTER(2, RMDebug::module_clientcomm, "RpcClientComm", "getElementCollection(...) bad element typeId" << elementType->type_id()) + break; + } + + TALK( "got an element" ); + + // insert element into result set + resultColl.insert_element( element, 1 ); + + delete[] thisResult->data.confarray_val; + delete thisResult; + } + + executeEndTransfer(); + + RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "getElementCollection()" ); + LEAVE( "RpcClientComm::getElementCollection()" ); +} + +//################################################################################## +void RnpClientComm::executeQuery( const r_OQL_Query& query ) throw( r_Error ) +{ + ENTER( "RnpClientComm::executeQuery(_)" ); + + checkForRwTransaction(); + + unsigned short status; + + // + // Send MDD constants to the server. + // + if( query.get_constants() ) + { + r_Set< r_GMarray* >* mddConstants = (r_Set< r_GMarray* >*)query.get_constants(); + + if(executeInitUpdate() < 0) // error would be nicer + { + LEAVE( "Error: RnpClientComm::executeQuery(): update query initialization failed." ); + return; + } + + r_Iterator<r_GMarray*> iter = mddConstants->create_iterator(); + + for( iter.reset(); iter.not_done(); iter++ ) + { + r_GMarray* mdd = *iter; + + const r_Base_Type* baseType = mdd->get_base_type_schema(); + + if( mdd ) + { + status = executeStartInsertTransMDD(mdd); + switch( status ) + { + case 0: + break; // OK + case 2: + LEAVE( "RnpClientComm::executeQuery(): exception, status = " << status ); + throw r_Error( r_Error::r_Error_DatabaseClassUndefined ); + break; + case 3: + LEAVE( "RnpClientComm::executeQuery(): exception, status = " << status ); + throw r_Error( r_Error::r_Error_TypeInvalid ); + break; + default: + LEAVE( "RnpClientComm::executeQuery(): exception, status = " << status ); + throw r_Error( r_Error::r_Error_TransferFailed ); + break; + } + + r_Set< r_GMarray* >* bagOfTiles; + + bagOfTiles = mdd->get_storage_layout()->decomposeMDD( mdd ); + + r_Iterator< r_GMarray* > iter2 = bagOfTiles->create_iterator(); + + for(iter2.reset(); iter2.not_done();iter2.advance()) + { + RPCMarray* rpcMarray; + + r_GMarray *origTile = *iter2; + + getMarRpcRepresentation( origTile, rpcMarray, mdd->get_storage_layout()->get_storage_format(), baseType ); + + status = executeInsertTile(false, rpcMarray); + + // free rpcMarray structure (rpcMarray->data.confarray_val is freed somewhere else) + freeMarRpcRepresentation( origTile, rpcMarray ); + + // delete current tile (including data block) + delete origTile; + + if( status > 0 ) + { + LEAVE( "RnpClientComm::executeQuery(): exception, status = " << status ); + throw r_Error( r_Error::r_Error_TransferFailed ); + } + } + + bagOfTiles->remove_all(); + delete bagOfTiles; + + executeEndInsertMDD(false); + } + } + } + + executeExecuteUpdateQuery(query.get_query()); + LEAVE( "RnpClientComm::executeQuery(_)" ); +} + + +// helper functions +void +RnpClientComm::getMarRpcRepresentation( const r_GMarray* mar, RPCMarray*& rpcMarray, + r_Data_Format initStorageFormat, + const r_Base_Type *baseType) +{ + ENTER( "RpcClientComm::getMarRpcRepresentation(...)"); + RMDBGENTER(2, RMDebug::module_clientcomm, "RpcClientComm", "getMarRpcRepresentation(...)"); + + // allocate memory for the RPCMarray data structure and assign its fields + rpcMarray = (RPCMarray*)mymalloc( sizeof(RPCMarray) ); + rpcMarray->domain = mar->spatial_domain().get_string_representation(); + rpcMarray->cellTypeLength = mar->get_type_length(); + + void* arrayData = NULL; + r_ULong arraySize=0; + + if (initStorageFormat == r_Array) + { + if (transferFormat != r_Array) + { + r_Tile_Compression *engine = NULL; + + try + { + r_Storage_Man_CPP sman; + engine = r_Tile_Compression::create(transferFormat, mar->spatial_domain(), baseType); + engine->set_storage_handler(sman); + RMDBGMIDDLE(2, RMDebug::module_clientcomm, "RpcClientComm", "getMarRpcRepresentation(...) compress with " << engine->get_name()) + if ((endianClient != endianServer) && (!engine->converts_endianness())) + { + RMDBGMIDDLE( 2, RMDebug::module_clientcomm, "RpcClientComm", "getMarRpcRepresentation(...) for " + << transferFormat << " endianness changed before compression from " << (r_Endian::r_Endianness)endianClient + << " to " << (r_Endian::r_Endianness) endianServer); + char *endianData = new char[mar->get_array_size()]; + changeEndianness(mar, endianData, baseType); + arrayData = engine->compress(endianData, arraySize, transferFormatParams); + delete [] endianData; + endianData=NULL; + } + else + { + arrayData = engine->compress(mar->get_array(), arraySize, transferFormatParams); + } + RMDBGMIDDLE(2, RMDebug::module_clientcomm, "RpcClientComm", "compression returned " << arrayData << " (" << arraySize << " bytes)") +/* void *testData = engine->decompress(arrayData, arraySize); + cout << "Decompression worked " << ((memcmp(mar->get_array(), testData, (mar->get_type_length()) * (mar->spatial_domain().cell_count())) == 0) ? "OK" : "!NOT!") << endl; + delete [] testData; +*/ + + // ForWiss: revert to uncompressed data if the compressed data is larger + // coman: and introduced a bug of endianess ... + if (arraySize > mar->get_array_size()) + { + RMInit::logOut << "RpcClientComm::getMarRpcRepresentation(...) Warning: overriding compression setting(" + << transferFormat << ") to " << r_Array + << " because compressed size( " << arraySize + << " bytes) > uncompressed size( " << mar->get_array_size() << " bytes)" << endl; + delete [] arrayData; + arrayData = NULL; + } + } + catch (r_Error &err) + { + RMInit::logOut << "RpcClientComm::getMarRpcRepresentation() Error: Unsupported data format " << transferFormat << endl; + } + if (engine != NULL) + delete engine; + } + else + { + if (endianClient != endianServer) + { + RMDBGMIDDLE( 2, RMDebug::module_clientcomm, "RpcClientComm", "getMarRpcRepresentation(...) for " + << transferFormat << " endianness changed from " + << (r_Endian::r_Endianness)endianClient << " to " << (r_Endian::r_Endianness) endianServer); + arraySize = mar->get_array_size(); + arrayData = new char[arraySize]; + changeEndianness(mar, arrayData, baseType); + } + } + } + + if (arrayData == NULL) + { + //error in compression or compression inefficient + rpcMarray->currentFormat = initStorageFormat; + rpcMarray->data.confarray_len = mar->get_array_size(); + if (endianClient != endianServer) + { + RMDBGMIDDLE( 2, RMDebug::module_clientcomm, "RpcClientComm", "getMarRpcRepresentation(...) for " + << transferFormat << "endianness changed from " + << (r_Endian::r_Endianness)endianClient << " to " << (r_Endian::r_Endianness) endianServer + << " because compression " << transferFormat << " failed" ); + arrayData = new char[arraySize]; + changeEndianness(mar, arrayData, baseType); + rpcMarray->data.confarray_val = (char*)(arrayData); + } + else + { + rpcMarray->data.confarray_val = (char*)(mar->get_array()); + } + } + else + { + if (arraySize != mar->get_array_size()) + { + RMDBGMIDDLE(2, RMDebug::module_clientcomm, "RpcClientComm", "compressed to " << (100.0*arraySize) / mar->get_array_size() << "%") + } + rpcMarray->currentFormat = transferFormat; + rpcMarray->data.confarray_len = arraySize; + rpcMarray->data.confarray_val = (char*)arrayData; + } + rpcMarray->storageFormat = storageFormat; + + RMDBGEXIT(2, RMDebug::module_clientcomm, "RpcClientComm", "getMarRpcRepresentation(...)"); + LEAVE( "RpcClientComm::getMarRpcRepresentation()"); +} + + +void +RnpClientComm::freeMarRpcRepresentation( const r_GMarray* mar, RPCMarray* rpcMarray ) +{ + ENTER( "RnpClientComm::freeMarRpcRepresentation(_,_)" ); + + if (rpcMarray->data.confarray_val != ((r_GMarray*)mar)->get_array()) + { + delete[] rpcMarray->data.confarray_val; + } + free( rpcMarray->domain ); + free( rpcMarray ); + + LEAVE( "RnpClientComm::freeMarRpcRepresentation()" ); +} + + +//######################################################################### +r_OId RnpClientComm::getNewOId( unsigned short objType ) throw(r_Error) +{ + return executeGetNewOId(objType); +} + +unsigned short RnpClientComm::getObjectType( const r_OId& oid ) throw(r_Error) +{ + return executeGetObjectType(oid); +} + +char* RnpClientComm::getTypeStructure( const char* typeName, r_Type_Type typeType ) throw(r_Error) +{ + return executeGetTypeStructure( typeName, typeType ); +} + +int RnpClientComm::setStorageFormat( r_Data_Format format, const char *formatParams) +{ + ENTER( "RnpClientComm::setStorageFormat( format=" << format << ", formatParams=" << (formatParams?formatParams:"(null)") << " )" ); + + storageFormat = format; + + if (storageFormatParams != NULL) + { + free(storageFormatParams); + storageFormatParams = NULL; + } + if (formatParams != NULL) + { + storageFormatParams = (char*)mymalloc(strlen(formatParams) + 1); + strcpy(storageFormatParams, formatParams); + // extract ``compserver'' if present + clientParams->process(storageFormatParams); + } + + int result = executeSetFormat( false, format, formatParams); + + LEAVE( "RnpClientComm::setStorageFormat() -> " << result ); + return result; +} + +int RnpClientComm::setTransferFormat( r_Data_Format format, const char* formatParams) +{ + ENTER( "RnpClientComm::setTransferFormat( format=" << format << ", formatParams=" << (formatParams?formatParams:"(null)") << " )" ); + + transferFormat = format; + + if (transferFormatParams != NULL) + { + free(transferFormatParams); + transferFormatParams = NULL; + } + if (formatParams != NULL) + { + transferFormatParams = (char*)mymalloc(strlen(formatParams)+1); + strcpy(transferFormatParams, formatParams); + // extract ``exactformat'' if present + clientParams->process(transferFormatParams); + } + + int result = executeSetFormat( true, format, formatParams); + LEAVE( "RnpClientComm::setTransferFormat() -> " << result ); + return result; +} + +const char* RnpClientComm::getExtendedErrorInfo() throw(r_Error) +{ + // This function shouldn't be called for RNP protocol + static char *errorInfo = new char[30]; + strcpy(errorInfo,"No info"); + + return errorInfo; +} + +const char* RnpClientComm::getServerName() +{ + return serverHost; +} + +void RnpClientComm::setUserIdentification(const char *userName, const char *plainTextPassword) +{ + ENTER( "RnpClientComm::setUserIdentification( userName=" << (userName?userName:"(null)") << ", plainTextPassword=" << (plainTextPassword?plainTextPassword:"(null)") << " )" ); + + char digest[33]=""; + messageDigest(plainTextPassword,digest,"MD5"); + sprintf(identificationString,"%s:%s",userName,digest); + + LEAVE( "RnpClientComm::setUserIdentification()" ); +} + +unsigned long RnpClientComm::getClientID() const +{ + return clientID; +} + +void RnpClientComm::triggerAliveSignal(){} + +void RnpClientComm::sendAliveSignal(){} + +//############# helper functions ################### +int RnpClientComm::messageDigest(const char *input,char *output,const char *mdName) +{ + + EVP_MD_CTX mdctx; + const EVP_MD *md; + unsigned int md_len, i; + unsigned char md_value[100]; + + OpenSSL_add_all_digests(); + + md = EVP_get_digestbyname(mdName); + + if(!md) + return 0; + + EVP_DigestInit(&mdctx, md); + EVP_DigestUpdate(&mdctx,input, strlen(input)); + EVP_DigestFinal(&mdctx, md_value, &md_len); + + for(i = 0; i < md_len; i++) + sprintf(output+i+i,"%02x", md_value[i]); + + return strlen(output); +} + + +void RnpClientComm::setMaxRetry(unsigned int newMaxRetry) +{ + RMInit::clientcommMaxRetry = newMaxRetry; +} + +unsigned int RnpClientComm::getMaxRetry() +{ + return RMInit::clientcommMaxRetry; +} + +// aux function: sleep incrementally, with subsecond precision +static void pause(int retryCount) +{ + // changed PB 2005-sep-09 + // was: unsigned int millisec = 50 + retryCount * 50; + unsigned int millisec = RNP_PAUSE_INCREMENT * (retryCount + 1); + // if(millisec > 1000) + // millisec = 1000; + + timeval tv; + tv.tv_sec = millisec / 1000; + tv.tv_usec = millisec * 1000; + select(0,NULL,NULL,NULL,&tv); +} + +int RnpClientComm::getFreeServer(bool readwrite, bool openDB) +{ + ENTER( "RnpClientComm::getFreeServer( readwrite=" << readwrite << ", openDB=" << openDB << " )" ); + + // to avoid nested retries, only this loop is kept, + // the one in executeGetFreeServer() has been removed -- PB 2005-aug-31 + for ( int retryCount=0; ; retryCount++ ) + { + try + { + executeGetFreeServer(readwrite, openDB); + + // if no error, we have the server, so break + break; + } + catch(r_Error &e) + { + int errorno = e.get_errorno(); + RMInit::logOut << "Error: Connection to rasdaman server failed with " << errorno << ": " << e.what() << endl; + // for these errors a retry makes sense, as long as we haven't reached the retry limit: + if ( ( errorno==801 || errorno==805 || errorno==806 ) && retryCount < RMInit::clientcommMaxRetry ) + { + TALK( " retry="<<retryCount ); + pause(retryCount); // waiting with incremental delays + } + else + { + LEAVE( "RnpClientComm::getFreeServer(): exception " << errorno << ": " << e.what() ); + throw; + } + } + } + + LEAVE( "RnpClientComm::getFreeServer() -> 1" ); + return 1; +} + +#define MAXMSG 512 +int RnpClientComm::executeGetFreeServer(bool readwrite, bool openDB) +{ + ENTER( "RnpClientComm::executeGetFreeServer( readwrite=" << readwrite << ", openDB=" << openDB << " )" ); + + static char myRasmgrID[100]=""; + if(myRasmgrID[0]==0) + { + unsigned int hostid = gethostid(); + unsigned int pid = getpid(); + sprintf(myRasmgrID,"%u:%u",hostid,pid); + } + char message[MAXMSG]; + char header[MAXMSG]; + char body[MAXMSG]; + + if (openDB) + sprintf(header,"POST getfreeserver2 HTTP/1.1\r\nAccept: text/plain\r\nUserAgent: RasClient/1.0\r\nAuthorization: ras %s\r\nContent-length:",identificationString); + else + sprintf(header,"POST getfreeserver HTTP/1.1\r\nAccept: text/plain\r\nUserAgent: RasClient/1.0\r\nAuthorization: ras %s\r\nContent-length:",identificationString); + sprintf(body,"%s RNP %s %s",databaseName,(readwrite ? "rw":"ro"), myRasmgrID); + sprintf(message,"%s %d\r\n\r\n%s",header,strlen(body)+1,body); + + struct protoent* getprotoptr = getprotobyname("tcp"); + + struct hostent *hostinfo = gethostbyname(rasmgrHost); + if(hostinfo==NULL) + { + RMInit::logOut << "Error locating rasmgr on host " << rasmgrHost <<" ("<<strerror(errno)<<')'<<endl; + LEAVE( "RnpClientComm::executeGetFreeServer(): exception, unable to contact rasmgr." ); + throw r_Error( r_Error::r_Error_ServerInvalid ); + } + + sockaddr_in internetSocketAddress; + + internetSocketAddress.sin_family=AF_INET; + internetSocketAddress.sin_port=htons(rasmgrPort); + internetSocketAddress.sin_addr=*(struct in_addr*)hostinfo->h_addr; + + int sock; + bool ok = false; + // old comment by Walter Schatz: "this has to be 5000 or so, now that counter is 120 default (later we'll make this better)" + // new comment by PB 2005-aug-31: + // - these retries + the ones in getFreeServer lead to max*max*40 = 400,000 retries! one loop should be enough. + // - there are unmotivated factors at every step (cf pause()), let's do just one wait loop; + // all loop code is commented out with prefix "NOLOOP" + // X: int retry; + // X: for(retry=0;retry<RMInit::clientcommMaxRetry * 40;retry++) + // X: { + sock=socket(PF_INET,SOCK_STREAM,getprotoptr->p_proto); + if(sock<0) + { + // X: if(retry==0) + RMInit::logOut << "Error: cannot open socket to rasmgr (" << strerror(errno) << ")." << endl; + // X: sleep(RMInit::clientcommSleep); + // X: continue; + } + + else if(connect(sock,(struct sockaddr*)&internetSocketAddress,sizeof(internetSocketAddress)) < 0) + { + // X: if(retry==0) + // X: RMInit::logOut <<"getFreeServer: Connection to rasmgr failed: "<<strerror(errno)<<endl; + close(sock); + // X: sleep(RMInit::clientcommSleep); + // X: continue; + } + + TALK( "Socket="<<sock<<" protocol(tcp)="<<getprotoptr->p_proto ); + ok = true; + // X: break; + // X:} + // X:if(retry) + // X: RMInit::logOut << "Warning: tried connecting " << retry+1 << " times " <<endl; + + if( !ok ) + { + // X: RMInit::logOut << "Error: Giving up on connecting, sorry, after this number of tries: " << retry+1 <<endl; + close(sock); + LEAVE( "RnpClientComm::executeGetFreeServer(): exception, giving up on getting a free server." ); + throw r_Error( r_Error::r_Error_ServerInvalid ); + } + + // --- from here on we have a connection --- + + //write_to_server + // TALK( "want to write this message to rasmgr: " << message ); // message is said to be 0-terminated + int nbytes=writeWholeMessage(sock,message,strlen(message)+1); + + if(nbytes<0) + { + RMInit::logOut << "Error: cannot send message to rasmgr on host " << rasmgrHost << " ("<<strerror(errno)<<')' << endl; + LEAVE( "RnpClientComm::executeGetFreeServer(): exception, error writing message to rasmgr." ); + close(sock); + throw r_Error( r_Error::r_Error_ServerInvalid ); + } + + //wait and read answer + nbytes=readWholeMessage(sock,message,MAXMSG); + close(sock); + + if(nbytes<0) + { + RMInit::logOut << "Error reading answer from rasmgr on host " << rasmgrHost <<" ("<<strerror(errno)<<')'<<endl; + LEAVE( "RnpClientComm::executeGetFreeServer(): exception, error reading answer from rasmgr." ); + throw r_Error( r_Error::r_Error_ServerInvalid ); + } + // TALK( "received this message: " << message ); // quite verbose! + + // and now, analyze answer + // first line is: HTTP/1.1 code answertext(CRLF) + char *p=strstr(message," "); //looks for the first white space to locate status-code + + int statusCode=strtoul( p, (char **)NULL, 10); + + char *pEOL=strstr(p,"\r\n"); // locate CRLF + if(!pEOL) + { + RMInit::logOut << "Error: Invalid answer from rasmgr." << endl; + LEAVE( "RnpClientComm::executeGetFreeServer(): exception, invalid answer from rasmgr." ); + throw r_Error( r_Error::r_Error_ServerInvalid ); + } + + if(statusCode==200) + { // It's OK + char *addr = strstr(message,"\r\n\r\n")+4; //looks for the address of server + + addr = strtok(addr," \r\n\t"); //isolates the RasMGR host name + + char *portString = strtok(NULL," \r\n\t"); //looks for the port, sended as string + + char *capab = strtok(NULL," \r\n\t"); + + if(portString && addr && capab) + { + strcpy(serverHost,addr); + serverPort = strtoul( portString, (char **)NULL, 0); + strcpy(capability,capab); + TALK( "RnpClientComm::executeGetFreeServer(): got server " << serverHost << ":" << serverPort << ", capability: " << capability ); + } + else + { + RMInit::logOut << "Error: Invalid answer from rasmgr." << endl; + LEAVE( "RnpClientComm::executeGetFreeServer(): exception, server invalid." ); + throw r_Error( r_Error::r_Error_ServerInvalid ); + } + + } + else + { + char *errText = strstr(message,"\r\n\r\n")+4; + RMInit::logOut << "Communication error: "<<errText<< endl; + + int errorCode = strtoul(errText, (char **)NULL, 0); + + switch(errorCode) + { + case 802: + case 803: + case 804: + LEAVE( "RnpClientComm::executeGetFreeServer(): exception, errorCode = " << errorCode ); + throw r_Error( r_Error::r_Error_AccesDenied,errorCode); + break; + case 801: + case 805: + case 806: + LEAVE( "RnpClientComm::executeGetFreeServer(): exception, errorCode = " << errorCode ); + throw r_Error( r_Error::r_Error_SystemOverloaded,errorCode); + break; + case 807: + LEAVE( "RnpClientComm::executeGetFreeServer(): exception, errorCode = " << errorCode ); + throw r_Error( r_Error::r_Error_DatabaseUnknown,errorCode); + break; + default : + LEAVE( "RnpClientComm::executeGetFreeServer(): exception, errorCode = " << errorCode ); + throw r_Error( r_Error::r_Error_General,808 ); + break; + } + } + + LEAVE( "RnpClientComm::executeGetFreeServer() -> 1" ); + return 1; +} + +int RnpClientComm::readWholeMessage(int socket,char *destBuffer,int buffSize) +{ + // we read what is comming in until we encounter a '\0' + // this is our end-sign. + int totalLength=0; + int redNow; + while(1) + { + redNow = read(socket,destBuffer+totalLength,buffSize-totalLength); + if(redNow == -1) + { + if(errno == EINTR) + continue; // read was interrupted by signal + return -1; // another error + } + totalLength+=redNow; + + if(destBuffer[totalLength-1]==0) + break; // THE END + } + + TALK( "RnpClientComm::readWholeMessage(): read " << totalLength << " bytes, buffer is: " << destBuffer ); + return totalLength; +} + +int RnpClientComm::writeWholeMessage(int socket,char *destBuffer,int buffSize) +{ + // we write the whole message, including the ending '\0', which is already in + // the buffSize provided by the caller + int totalLength=0; + int writeNow; + while(1) + { + writeNow = write(socket,destBuffer+totalLength,buffSize-totalLength); + if(writeNow == -1) + { + if(errno == EINTR) + continue; // read was interrupted by signal + return -1; // another error + } + totalLength+=writeNow; + + if( totalLength==buffSize ) + break; // THE END + } + + TALK( "RnpClientComm::writeWholeMessage(): wrote " << totalLength << " bytes, buffer is: " << destBuffer ); + return totalLength; +} + + +void RnpClientComm::checkForRwTransaction() throw (r_Error) +{ + r_Transaction *trans = r_Transaction::actual_transaction; + if( trans == 0 || trans->get_mode() == r_Transaction::read_only ) + { + TALK( "RnpClientComm::checkForRwTransaction(): throwing exception from failed TA rw check." ); + throw r_Error( r_Error::r_Error_TransactionReadOnly ); + } +} diff --git a/rnprotocol/rnpclientcomm.hh b/rnprotocol/rnpclientcomm.hh new file mode 100644 index 0000000..cd81edd --- /dev/null +++ b/rnprotocol/rnpclientcomm.hh @@ -0,0 +1,346 @@ +#ifndef RNPCLIENTCOMM_HH +#define RNPCLIENTCOMM_HH +/* +* This file is part of rasdaman community. +* +* Rasdaman community is free software: you can redistribute it and/or modify +* it under the terms of the GNU General Public License as published by +* the Free Software Foundation, either version 3 of the License, or +* (at your option) any later version. +* +* Rasdaman community is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU General Public License for more details. +* +* You should have received a copy of the GNU General Public License +* along with rasdaman community. If not, see <http://www.gnu.org/licenses/>. +* +* Copyright 2003, 2004, 2005, 2006, 2007, 2008, 2009 Peter Baumann / +rasdaman GmbH. +* +* For more information please see <http://www.rasdaman.org> +* or contact Peter Baumann via <baumann@rasdaman.com>. +*/ +/************************************************************* + * + * + * COMMENTS: + * + ************************************************************/ + +#include "clientcomm/clientcomm.hh" +#include "rnprotocol/rnpcommunication.hh" +#include "rnprotocol/rnprasserver.hh" +#include "clientcomm/rpcif.h" + +using namespace rnp; + +class RnpClientComm : public ClientComm, RnpBaseClientComm + { + public: + /// constructor getting the host name of the rasmgr host and it's listening port (default 7001). + RnpClientComm( const char* rasmgrHost, int rasmgrPort = RASMGRPORT ) throw( r_Error ); + + ~RnpClientComm() throw (); + + bool effectivTypeIsRNP() throw(); + + //@Man: Database methods + //@{ + /// + + /// open database + int openDB( const char* database ); + /// close current database + int closeDB(); + /// create a database + int createDB( const char* name ) throw(r_Error); + /// destroy a database + int destroyDB( const char* name ) throw(r_Error); + + /// + //@} + + //@Man: Transaction methods + //@{ + /// + + /// begin transaction + int openTA( unsigned short readOnly = 0 ) throw(r_Error); + /// commit current transaction + int commitTA() throw(r_Error); + /// abort current transaction + int abortTA(); + + /// + //@} + + //@Man: MDD methods + //@{ + /// + + /// inserts a MDD object in an existing MDD collection on the server + void insertMDD( const char* collName, r_GMarray* mar ) throw( r_Error ); + /// gets MDD object by oid + r_Ref_Any getMDDByOId( const r_OId& oid ) throw( r_Error ); + + /// + //@} + + //@Man: Collection methods + //@{ + /// + + /// creates an empty MDD collection on the server + void insertColl( const char* collName, const char* typeName, const r_OId& oid ) throw( r_Error ); + /// deletes an MDD collection by name + void deleteCollByName( const char* collName ) throw( r_Error ); + /// deletes an object by oid (right now, objects are collection only) + void deleteObjByOId( const r_OId& oid ) throw( r_Error ); + /// removes an object from a collection + void removeObjFromColl( const char* name, const r_OId& oid ) throw ( r_Error ); + /// gets collection by name + r_Ref_Any getCollByName( const char* name ) throw( r_Error ); + /// gets collection by oid + r_Ref_Any getCollByOId ( const r_OId& oid ) throw( r_Error ); + /// gets collection references by name + r_Ref_Any getCollOIdsByName( const char* name ) throw( r_Error ); + /// gets collection references by oid + r_Ref_Any getCollOIdsByOId ( const r_OId& oid ) throw( r_Error ); + + /// + //@} + + //@Man: Query methods + //@{ + /// + + /// query execution + void executeQuery( const r_OQL_Query& query, r_Set< r_Ref_Any >& result ) throw( r_Error ); + /*@Doc: + Executes a retrieval query of type \Ref{r_OQL_Query} and returns the result. Every + MDD object of the MDD collection is fetched from the server and inserted + in the resulting \Ref{r_Set}. + */ + + /// update execution + void executeQuery( const r_OQL_Query& query ) throw( r_Error ); + /*@Doc: + Executes an update query of type \Ref{r_OQL_Query}. + */ + + /// + //@} + + + //@Man: System methods + //@{ + /// + + /// get new oid + r_OId getNewOId( unsigned short objType ) throw(r_Error); + + /// get oid type + unsigned short getObjectType( const r_OId& oid ) throw(r_Error); + + /// get type structure + /// dallocate using delete [] + char* getTypeStructure( const char* typeName, r_Type_Type typeType ) throw(r_Error); + + /// + //@} + + /// provides read access to my clientID + unsigned long getClientID() const; + + /// set the preferred transfer format + int setTransferFormat( r_Data_Format format, const char* formatParams=NULL ); + + /// set the preferred storage format + int setStorageFormat( r_Data_Format format, const char *formatParams=NULL ); + + + /// get real server name (the dinamic one, assigned by the RasMGR) + const char* getServerName(); + + /// user identification for RasMGR + void setUserIdentification(const char *userName, const char *plainTextPassword); + + /// set maximum retry to get a server + void setMaxRetry(unsigned int newMaxRetry); + + /// get maximum retry to get a server + unsigned int getMaxRetry(); + + void setTurbo(bool turbo); + + /// + //@} + + // obsolete, but kept because of virtual functions in base class + void triggerAliveSignal(); + void sendAliveSignal(); + const char *getExtendedErrorInfo() throw(r_Error); + + void setTimeoutInterval(int seconds); + int getTimeoutInterval(); + +//#### secret, unofficial functions ########### + + r_OId createCollection(const char *collName, const char *collTypeName) throw(r_Error); + + r_OId createMDD(const char* collName, const char* mddTypeName, const char* definitionDomain, const char *tileDomain, bool rcindex = false) throw(r_Error); + + void extendMDD(r_OId mddOId, const char *stripeDomain, const char* tileDomain) throw(r_Error); + + vector<r_OId> getOIdsFromCollection( const char* name ) throw( r_Error ); + + vector<r_OId> getOIdsFromCollection( const r_OId& oid ) throw( r_Error ); + + vector<r_Minterval> getTileDomains(r_OId mddOId, const char *stripeDomain) throw( r_Error ); + + + void preloadTiles(r_OId mddOId, const char *tileDomain) throw(r_Error); + + int getTileData(r_OId mddOId, const char *tileDomain, char *&data, bool preallocated = false) throw(r_Error); + + void replaceTileData(r_OId mddOId, const char *tileDomain, const char *newData, int dataLength, const char *alfaData, int alfaLength) throw(r_Error); + +//############################################# + private: + /// client ID assigned to me by the server + int clientID; + + /// the name of the rasmgr host + char *rasmgrHost; + + /// the listening port of the rasmgr + int rasmgrPort; + + /// the name of the server host + char serverHost[100]; //can't be just a pointer, it's never stored elsewhere + + int serverPort; + + // the name of the opened database, needed because it will be opened again and again, in a hidden operation + char databaseName[100]; + + // the capability + char capability[100]; + + /// user identification string + char identificationString[100]; + + /// requests a free server from the rasmgr, retrying maxRetry times + int getFreeServer(bool readwrite, bool openDB); + + /// requests a free server from the rasmgr + int executeGetFreeServer(bool readwrite, bool openDB); + + int readWholeMessage(int socket,char *destBuffer,int buffSize); + + int writeWholeMessage(int socket,char *destBuffer,int buffSize); + + // MD5 of password + int messageDigest(const char *input,char *output,const char *mdName); + + /// internal function for client/server protocol handling of non-MDD collection transfer + void getElementCollection( r_Set< r_Ref_Any >& result ) throw(r_Error); + + /// internal function for client/server protocol handling of MDD collection transfer + void getMDDCollection( r_Set< r_Ref_Any >& result, unsigned int isQuery ) throw(r_Error); + + /// internal function for reading an MDD from the database + unsigned short getMDDCore( r_Ref<r_GMarray> &mdd, GetMDDRes *thisResult, unsigned int isQuery ) throw( r_Error ); + + /// concatenate data to an array, making sure there are no overflows (used by getMDDCore()) + int concatArrayData( const char *source, unsigned long srcSize, char *&dest, + unsigned long &destSize, unsigned long &destLevel ); + + /// do transfer decompression + r_Data_Format doTransferDecompression( r_GMarray *tile, const r_Base_Type *type, + r_Data_Format fmt, unsigned long size ); + + /// internal function for converting a \Ref{r_GMarray} into its RPC representation + void getMarRpcRepresentation( const r_GMarray* mar, RPCMarray*& rpcMarray, + r_Data_Format initStorageFormat = r_Array, + const r_Base_Type *bt = NULL); + + /// internal function for freeing data allocated by getMarRpcRepresentation() + void freeMarRpcRepresentation( const r_GMarray* mar, RPCMarray* rpcMarray ); + + /// endianness of client and server (0 means big endian) + int endianServer; + int endianClient; + + /// data format for transfer compression + r_Data_Format transferFormat; + /// storage format for inserting new tiles + r_Data_Format storageFormat; + /// transfer format parameters + char* transferFormatParams; + /// storage format parameters + char *storageFormatParams; + /// parameter object for configuration + r_Parse_Params *clientParams; + + /// policy is compress-on-server + int serverCompresses; + /// policy is exact + int exactFormat; + + // functions which really do the connection stuff + void executeConnect(); + void executeDisconnect(); + void executeOpenDB(const char*); + void executeCloseDB(); + void executeBeginTA(bool rw); + void executeCommitTA(); + void executeAbortTA(); + int executeExecuteQuery( const char* query, r_Set< r_Ref_Any >& result ) throw( r_Error ); + GetElementRes* executeGetNextElement(); + int executeEndTransfer(); + GetMDDRes* executeGetNextMDD(); + GetTileRes* executeGetNextTile(); + void executeExecuteUpdateQuery(const char *query) throw(r_Error); + int executeStartInsertTransMDD(r_GMarray* mdd); + int executeInsertTile(bool persistent, RPCMarray* tile); + void executeEndInsertMDD(bool persistent); + int executeInitUpdate(); + int executeStartInsertPersMDD( const char* collName, r_GMarray* mar ); + int executeInsertMDD(const char* collName, r_GMarray* mar, RPCMarray *rpcMarray); + int executeInsertCollection( const char* collName, const char* typeName, const r_OId& oid ); + r_Ref_Any executeGetCollByNameOrOId ( const char* collName, const r_OId& oid ) throw( r_Error ); + r_Ref_Any executeGetCollOIdsByNameOrOId ( const char* collName, const r_OId& oid ) throw( r_Error ); + int executeSetFormat( bool transferFormat, r_Data_Format format, const char* formatParams); + r_OId executeGetNewOId( unsigned short objType ) throw(r_Error); + unsigned short executeGetObjectType( const r_OId& oid ) throw(r_Error); + char* executeGetTypeStructure( const char* typeName, r_Type_Type typeType ) throw(r_Error); + + vector<r_OId> executeGetOIdsFromCollection ( const char* collName, const r_OId& oid ) throw( r_Error ); + + void turboOpenDB(const char*); + void turboBeginTA(bool rw); + void turboCommitTA(); + void turboAbortTA(); + + bool useTurbo; + + // returns only if transaction is open and rw, otherwise throws + void checkForRwTransaction() throw (r_Error); + + // varianta locala + void sendRequestGetAnswer() throw (r_Error); + + int sendAndReturnStatus() throw (r_Error); + + bool detectErrors(); + // doesn't return if there is an error + void reassemble_r_Error() throw (r_Error); + + // a very internal helper for some functions + void helper012d(const char* caller) throw (r_Error); + }; + +#endif diff --git a/rnprotocol/rnpclientcomm2.cc b/rnprotocol/rnpclientcomm2.cc new file mode 100644 index 0000000..3974bd5 --- /dev/null +++ b/rnprotocol/rnpclientcomm2.cc @@ -0,0 +1,1226 @@ +#include "mymalloc/mymalloc.h" +/* +* This file is part of rasdaman community. +* +* Rasdaman community is free software: you can redistribute it and/or modify +* it under the terms of the GNU General Public License as published by +* the Free Software Foundation, either version 3 of the License, or +* (at your option) any later version. +* +* Rasdaman community is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU General Public License for more details. +* +* You should have received a copy of the GNU General Public License +* along with rasdaman community. If not, see <http://www.gnu.org/licenses/>. +* +* Copyright 2003, 2004, 2005, 2006, 2007, 2008, 2009 Peter Baumann / +rasdaman GmbH. +* +* For more information please see <http://www.rasdaman.org> +* or contact Peter Baumann via <baumann@rasdaman.com>. +*/ +/************************************************************* + * + * + * PURPOSE + * Contains the functions which really communicate with the server + * + * + * COMMENTS: + * - return values & their meaning see servercomm.hh + * + ************************************************************/ + +#include <openssl/evp.h> + +#include "rnpclientcomm.hh" + +#include "rasodmg/transaction.hh" +#include "rasodmg/database.hh" +#include "rasodmg/iterator.hh" +#include "rasodmg/set.hh" +#include "rasodmg/ref.hh" +#include "rasodmg/storagelayout.hh" +#include "rasodmg/tiling.hh" + +#include "raslib/minterval.hh" +#include "raslib/rmdebug.hh" +#include "raslib/rminit.hh" +#include "raslib/primitivetype.hh" +#include "raslib/complextype.hh" +#include "raslib/structuretype.hh" +#include "raslib/primitive.hh" +#include "raslib/complex.hh" +#include "raslib/structure.hh" +#include "raslib/endian.hh" +#include "raslib/parseparams.hh" +// for transfer compression +#include "compression/tilecompression.hh" + +#include "debug.hh" + +void RnpClientComm::setTurbo(bool turbo) +{ + ENTER( "RpcClientComm::setTurbo(" << turbo << ")" ); + RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "setTurbo(" << turbo << ")" ); + + useTurbo = turbo; + + RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "setTurbo()" ); + LEAVE( "RpcClientComm::setTurbo()" ); +} + +void RnpClientComm::executeConnect() +{ + ENTER( "RnpClientComm::executeConnect()" ); + + startRequest(RnpRasserver::cmd_connect); + encoder.addInt32Parameter(RnpRasserver::pmt_clientid, 0); + encoder.addStringParameter(RnpRasserver::pmt_capability, capability); + TALK( "request RnpRasserver::cmd_connect with clientID 0x" << hex << clientID << dec ); + + sendRequestGetAnswer(); + + clientID = decoder.getDataAsInteger(); + TALK( "executeConnect: Connected, clientID 0x" << hex << clientID << dec << endl ); + + endianServer = decoder.getDesiredEndianness() == Rnp::bigEndian ? 0: 1; + endianClient = Rnp::detectHostEndianness() == Rnp::bigEndian ? 0: 1; + + clearAnswer(); + + LEAVE( "RnpClientComm::executeConnect()" ); +} + +void RnpClientComm::executeDisconnect() +{ + ENTER( "RnpClientComm::executeDisconnect()" ); + + startRequest(RnpRasserver::cmd_disconnect); + encoder.addInt32Parameter(RnpRasserver::pmt_clientid, clientID); + TALK( "request RnpRasserver::cmd_disconnect with clientID 0x" << hex << clientID << dec ); + + clientID = -1; + TALK( "clientID now set to 0x" << hex << clientID << dec ); + + sendRequestGetAnswer(); + clearAnswer(); + + LEAVE( "RnpClientComm::executeDisconnect()" ); +} + +void RnpClientComm::executeOpenDB(const char* lDatabaseName) +{ + ENTER( "RnpClientComm::executeOpenDB( lDatabaseName=" << (lDatabaseName?lDatabaseName:"(null)") << " )" ); + + startRequest(RnpRasserver::cmd_opendb); + encoder.addInt32Parameter(RnpRasserver::pmt_clientid, clientID); + encoder.addStringParameter(RnpRasserver::pmt_dbname, lDatabaseName); + TALK( "request RnpRasserver::cmd_opendb '" << lDatabaseName << "', with clientID 0x" << hex << clientID << dec ); + + sendRequestGetAnswer(); + clearAnswer(); + + LEAVE( "RnpClientComm::executeOpenDB()" ); +} + +void RnpClientComm::executeCloseDB() +{ + ENTER( "RnpClientComm::executeCloseDB()" ); + + startRequest(RnpRasserver::cmd_closedb); + encoder.addInt32Parameter(RnpRasserver::pmt_clientid, clientID); + TALK( "request RnpRasserver::cmd_closedb with clientID 0x" << hex << clientID << dec ); + + sendRequestGetAnswer(); + clearAnswer(); + + LEAVE( "RnpClientComm::executeCloseDB()" ); +} + +void RnpClientComm::executeBeginTA(bool rw) +{ + ENTER( "RnpClientComm::executeBeginTA( rw=" << rw << " )" ); + + startRequest(RnpRasserver::cmd_beginta); + encoder.addInt32Parameter(RnpRasserver::pmt_clientid, clientID); + encoder.addInt32Parameter(RnpRasserver::pmt_accesmode, rw); + TALK( "request RnpRasserver::cmd_beginta with rw=" << rw << ", clientID 0x" << hex << clientID << dec ); + + sendRequestGetAnswer(); + clearAnswer(); + + LEAVE( "RnpClientComm::executeBeginTA()" ); +} + +void RnpClientComm::executeCommitTA() +{ + ENTER( "RnpClientComm::executeCommitTA()" ); + + startRequest(RnpRasserver::cmd_committa); + encoder.addInt32Parameter(RnpRasserver::pmt_clientid, clientID); + TALK( "request RnpRasserver::cmd_committa with clientID 0x" << hex << clientID << dec ); + + sendRequestGetAnswer(); + clearAnswer(); + + LEAVE( "RnpClientComm::executeCommitTA()" ); +} + +void RnpClientComm::executeAbortTA() +{ + ENTER( "RnpClientComm::executeAbortTA()" ); + + startRequest(RnpRasserver::cmd_abortta); + encoder.addInt32Parameter(RnpRasserver::pmt_clientid, clientID); + TALK( "request RnpRasserver::cmd_abortta with clientID 0x" << hex << clientID << dec ); + + sendRequestGetAnswer(); + clearAnswer(); + + LEAVE( "RnpClientComm::executeAbortTA()" ); +} + +void RnpClientComm::turboOpenDB(const char *lDatabaseName) +{ + ENTER( "RnpClientComm::turboOpenDB( lDatabaseName=" << (lDatabaseName?lDatabaseName:"(null)") << " )" ); + + clientID = 0; + + startRequest(RnpRasserver::cmd_connect); + encoder.addInt32Parameter(RnpRasserver::pmt_clientid, 0); // = always clientID -- PB + encoder.addStringParameter(RnpRasserver::pmt_capability, capability); + encoder.endFragment(); + TALK( "request RnpRasserver::cmd_connect with clientID 0x0, capability '" << capability << "'" ); + + encoder.startFragment(Rnp::fgt_Command,RnpRasserver::cmd_opendb); + encoder.addInt32Parameter(RnpRasserver::pmt_clientid, clientID); + encoder.addStringParameter(RnpRasserver::pmt_dbname, lDatabaseName); + encoder.endFragment(); + TALK( "adding fragment RnpRasserver::cmd_opendb with db '" << lDatabaseName << "', clientID 0x" << hex << clientID << dec ); + + encoder.startFragment(Rnp::fgt_Command,RnpRasserver::cmd_closedb); + encoder.addInt32Parameter(RnpRasserver::pmt_clientid, clientID); + encoder.endFragment(); + TALK( "adding fragment RnpRasserver::cmd_closedb with clientID 0x" << hex << clientID << dec ); + + encoder.startFragment(Rnp::fgt_Command,RnpRasserver::cmd_disconnect); + encoder.addInt32Parameter(RnpRasserver::pmt_clientid, clientID); + TALK( "adding fragment RnpRasserver::cmd_disconnect with clientID 0x" << hex << clientID << dec ); + + sendRequestGetAnswer(); + + //clientID = decoder.getDataAsInteger(); + + clientID = -1; // we will disconnect so we force it here + TALK( "RnpClientComm::turboOpenDB(): clientID set to 0x" << hex << clientID << dec ); + + endianServer = decoder.getDesiredEndianness() == Rnp::bigEndian ? 0: 1; + endianClient = Rnp::detectHostEndianness() == Rnp::bigEndian ? 0: 1; + + // open + decoder.getNextFragment(); + detectErrors(); + // close + decoder.getNextFragment(); + detectErrors(); + // disconnect + decoder.getNextFragment(); + detectErrors(); + + clearAnswer(); + + LEAVE( "RnpClientComm::turboOpenDB()" ); +} + +void RnpClientComm::turboBeginTA(bool rw) +{ + ENTER( "RnpClientComm::turboBeginTA( rw=" << rw << " )" ); + + clientID = 0; + + startRequest(RnpRasserver::cmd_connect); + encoder.addInt32Parameter(RnpRasserver::pmt_clientid, 0); + encoder.addStringParameter(RnpRasserver::pmt_capability, capability); + encoder.endFragment(); + TALK( "request RnpRasserver::cmd_connect with clientID 0x" << hex << clientID << dec << ", capability '" << capability << "'" ); + + encoder.startFragment(Rnp::fgt_Command,RnpRasserver::cmd_opendb); + encoder.addInt32Parameter(RnpRasserver::pmt_clientid, clientID); + encoder.addStringParameter(RnpRasserver::pmt_dbname, databaseName); + encoder.endFragment(); + TALK( "adding fragment RnpRasserver::cmd_opendb '" << databaseName << "', with clientID 0x" << hex << clientID << dec ); + + encoder.startFragment(Rnp::fgt_Command,RnpRasserver::cmd_beginta); + encoder.addInt32Parameter(RnpRasserver::pmt_clientid, clientID); + encoder.addInt32Parameter(RnpRasserver::pmt_accesmode, rw); + TALK( "adding fragment RnpRasserver::cmd_beginta with rw=" << rw << ", clientID 0x" << hex << clientID << dec ); + + sendRequestGetAnswer(); + + clientID = decoder.getDataAsInteger(); + TALK( "rceived clientID 0x" << hex << clientID << dec ); + endianServer = decoder.getDesiredEndianness() == Rnp::bigEndian ? 0: 1; + endianClient = Rnp::detectHostEndianness() == Rnp::bigEndian ? 0: 1; + + // open + decoder.getNextFragment(); + detectErrors(); + + // beginTA + decoder.getNextFragment(); + detectErrors(); + + clearAnswer(); + + LEAVE( "RnpClientComm::turboBeginTA()" ); +} + +void RnpClientComm::turboCommitTA() +{ + ENTER( "RnpClientComm::turboCommitTA()" ); + + startRequest(RnpRasserver::cmd_committa); + encoder.addInt32Parameter(RnpRasserver::pmt_clientid, clientID); + encoder.endFragment(); + TALK( "request RnpRasserver::cmd_committa with clientID 0x" << hex << clientID << dec ); + + encoder.startFragment(Rnp::fgt_Command,RnpRasserver::cmd_closedb); + encoder.addInt32Parameter(RnpRasserver::pmt_clientid, clientID); + encoder.endFragment(); + TALK( "adding fragment RnpRasserver::cmd_closedb with clientID 0x" << hex << clientID << dec ); + + encoder.startFragment(Rnp::fgt_Command,RnpRasserver::cmd_disconnect); + encoder.addInt32Parameter(RnpRasserver::pmt_clientid, clientID); + TALK( "adding fragment RnpRasserver::cmd_disconnect with clientID 0x" << hex << clientID << dec ); + + sendRequestGetAnswer(); + + // close + decoder.getNextFragment(); + detectErrors(); + + // disconnect + decoder.getNextFragment(); + detectErrors(); + + clientID = -1; + TALK( "resetting: clientID 0x" << hex << clientID << dec ); + clearAnswer(); + + LEAVE( "RnpClientComm::turboCommitTA()" ); +} +void RnpClientComm::turboAbortTA() +{ + ENTER( "RnpClientComm::turboAbortTA()" ); + + startRequest(RnpRasserver::cmd_abortta); + encoder.addInt32Parameter(RnpRasserver::pmt_clientid, clientID); + encoder.endFragment(); + TALK( "request RnpRasserver::cmd_abortta with clientID 0x" << hex << clientID << dec ); + + encoder.startFragment(Rnp::fgt_Command,RnpRasserver::cmd_closedb); + encoder.addInt32Parameter(RnpRasserver::pmt_clientid, clientID); + encoder.endFragment(); + TALK( "adding fragment RnpRasserver::cmd_closedb with clientID 0x" << hex << clientID << dec ); + + encoder.startFragment(Rnp::fgt_Command,RnpRasserver::cmd_disconnect); + encoder.addInt32Parameter(RnpRasserver::pmt_clientid, clientID); + TALK( "adding fragment RnpRasserver::cmd_disconnect with clientID 0x" << hex << clientID << dec ); + + sendRequestGetAnswer(); + + // close + decoder.getNextFragment(); + detectErrors(); + + // disconnect + decoder.getNextFragment(); + detectErrors(); + + clientID = -1; + TALK( "resetting: clientID 0x" << hex << clientID << dec ); + + clearAnswer(); + + LEAVE( "RnpClientComm::turboAbortTA()" ); +} + +//--------------------------------------------------------------------------------- +int RnpClientComm::executeStartInsertPersMDD( const char* collName, r_GMarray* mar ) +{ + ENTER( "RnpClientComm::executeStartInsertPersMDD( collName=" << (collName?collName:"(null)") << ", mar=" << ((unsigned long) mar) << " )" ); + + startRequest(RnpRasserver::cmd_startinsPmdd); + encoder.addInt32Parameter(RnpRasserver::pmt_clientid, clientID); + encoder.addStringParameter(RnpRasserver::pmt_collname, collName); + + char *domain = mar->spatial_domain().get_string_representation(); + encoder.addStringParameter(RnpRasserver::pmt_domain, domain); + + encoder.addInt32Parameter(RnpRasserver::pmt_typelength, mar->get_type_length()); + encoder.addStringParameter(RnpRasserver::pmt_typename, mar->get_type_name()); + encoder.addStringParameter(RnpRasserver::pmt_oidstring, mar->get_oid().get_string_representation()); + + TALK( "request RnpRasserver::cmd_startinsPmdd with collname '" << collName << "', domain " << domain << ", typelength " << mar->get_type_length() << ", typename '" << mar->get_type_name() << ", oid " << mar->get_oid().get_string_representation() << ", clientID 0x" << hex << clientID << dec ); + + free(domain); + + int result = sendAndReturnStatus(); + + ENTER( "RnpClientComm::executeStartInsertPersMDD() -> " << result ); + return result; +} + +int RnpClientComm::executeExecuteQuery( const char* query, r_Set< r_Ref_Any >& result ) throw( r_Error ) +{ + ENTER( "RnpClientComm::executeExecuteQuery( query=" << (query?query:"(null)") << ", result=" << ((unsigned long) &result) << " )" ); + + startRequest(RnpRasserver::cmd_queryrpc); + encoder.adjustBufferSize(strlen(query)); + + encoder.addInt32Parameter(RnpRasserver::pmt_clientid, clientID); + encoder.addStringParameter(RnpRasserver::pmt_querystring, query); + TALK( "request RnpRasserver::cmd_queryrpc with query '" << query << ", clientID 0x" << hex << clientID << dec ); + + sendRequestGetAnswer(); + + int status = decoder.getDataAsInteger(); + decoder.getNextParameter(); + int errNo = decoder.getDataAsInteger(); + decoder.getNextParameter(); + int lineNo = decoder.getDataAsInteger(); + decoder.getNextParameter(); + int colNo = decoder.getDataAsInteger(); + decoder.getNextParameter(); + const char* token = decoder.getDataAsString(); + decoder.getNextParameter(); + const char* typeName = decoder.getDataAsString(); + decoder.getNextParameter(); + const char* typeStructure = decoder.getDataAsString(); + + if(status == 0 || status == 1) + { + result.set_type_by_name( typeName ); + result.set_type_structure( typeStructure ); + } + // status == 2 - empty result + + if( status == 4 || status == 5 ) + { + r_Equery_execution_failed err( errNo, lineNo, colNo, token ); + clearAnswer(); + LEAVE( "RnpClientComm::executeExecuteQuery() exception: status=" << status ); + throw err; + } + clearAnswer(); + + LEAVE( "RnpClientComm::executeExecuteQuery() -> " << status ); + return status; +} + +int RnpClientComm::executeEndTransfer() +{ + ENTER( "RnpClientComm::executeEndTransfer()" ); + + startRequest(RnpRasserver::cmd_endtransfer); + encoder.addInt32Parameter(RnpRasserver::pmt_clientid, clientID); + TALK( "request RnpRasserver::cmd_endtransfer with clientID 0x" << hex << clientID << dec ); + + int result = sendAndReturnStatus(); + + LEAVE( "RnpClientComm::executeEndTransfer() -> " << result ); + return result; +} + +GetElementRes* RnpClientComm::executeGetNextElement() +{ + ENTER( "RnpClientComm::executeGetNextElement()" ); + + startRequest(RnpRasserver::cmd_getnextelem); + encoder.addInt32Parameter(RnpRasserver::pmt_clientid, clientID); + TALK( "request RnpRasserver::cmd_getnextelem with clientID 0x" << hex << clientID << dec ); + + sendRequestGetAnswer(); + + GetElementRes *result = new GetElementRes; + result->data.confarray_val = NULL; + + result->status = decoder.getDataAsInteger(); + + if(decoder.countParameters() == 2) + { + decoder.getNextParameter(); + result->data.confarray_len = decoder.getDataLength(); + result->data.confarray_val = new char[decoder.getDataLength()]; + + memcpy(result->data.confarray_val, decoder.getData(), decoder.getDataLength()); + + } + + clearAnswer(); + + LEAVE( "RnpClientComm::executeGetNextElement() -> " << result ); + return result; +} + + +int RnpClientComm::executeInsertMDD(const char* collName, r_GMarray* mar, RPCMarray *rpcMarray) +{ + ENTER( "RnpClientComm::executeInsertMDD( collName=" << (collName?collName:"(null)") << ", mar=" << ((unsigned long) mar) << ", rpcMarray=" << ((unsigned long) rpcMarray) << " )" ); + + int size = rpcMarray->data.confarray_len; + startRequest(RnpRasserver::cmd_insertmdd, RNP_DEFAULTBUFFERSIZE + size); + encoder.addInt32Parameter( RnpRasserver::pmt_clientid, clientID); + encoder.addStringParameter( RnpRasserver::pmt_collname, collName); + encoder.addStringParameter( RnpRasserver::pmt_typename, mar->get_type_name()); + encoder.addStringParameter( RnpRasserver::pmt_oidstring, mar->get_oid().get_string_representation()); + encoder.addStringParameter( RnpRasserver::pmt_domain, rpcMarray->domain); + encoder.addInt32Parameter( RnpRasserver::pmt_typelength, rpcMarray->cellTypeLength); + encoder.addInt32Parameter( RnpRasserver::pmt_currentformat, rpcMarray->currentFormat); + encoder.addInt32Parameter( RnpRasserver::pmt_storageformat, rpcMarray->storageFormat); + encoder.addOpaqueParameter( RnpRasserver::pmt_tiledata, rpcMarray->data.confarray_val, size); + TALK( "request RnpRasserver::cmd_insertmdd with collection '" << collName << ", ..., clientID 0x" << hex << clientID << dec ); + + int result = sendAndReturnStatus(); + + LEAVE( "RnpClientComm::executeInsertMDD() -> " << result ); + return result; +} + +int RnpClientComm::executeInsertCollection( const char* collName, const char* typeName, const r_OId& oid ) +{ + ENTER( "RnpClientComm::executeInsertCollection( collName=" << (collName?collName:"(null)") << ", typeName=" << (typeName?typeName:"(null)") << ", oid=" << oid << " )" ); + + startRequest(RnpRasserver::cmd_insertcoll); + encoder.addInt32Parameter(RnpRasserver::pmt_clientid, clientID); + encoder.addStringParameter(RnpRasserver::pmt_collname, collName); + encoder.addStringParameter(RnpRasserver::pmt_typename, typeName); + encoder.addStringParameter(RnpRasserver::pmt_oidstring, oid.get_string_representation()); + TALK( "request RnpRasserver::cmd_insertcoll collection '" << collName << ", ..., with clientID 0x" << hex << clientID << dec ); + + int result = sendAndReturnStatus(); + LEAVE( "RnpClientComm::executeInsertCollection() -> " << result ); + return result; +} + +// common function using the dynamic parameter facility of RNP +r_Ref_Any RnpClientComm::executeGetCollByNameOrOId ( const char* collName, const r_OId& oid ) throw( r_Error ) +{ + ENTER( "RnpClientComm::executeGetCollByNameOrOId( collName=" << (collName?collName:"(null)") << ", oid=" << oid << " )" ); + + startRequest(RnpRasserver::cmd_getcoll); + encoder.addInt32Parameter(RnpRasserver::pmt_clientid, clientID); + if( collName != NULL) + encoder.addStringParameter(RnpRasserver::pmt_collname, collName); + else + encoder.addStringParameter(RnpRasserver::pmt_oidstring, oid.get_string_representation()); + TALK( "request RnpRasserver::cmd_getcoll with ..., clientID 0x" << hex << clientID << dec ); + + sendRequestGetAnswer(); + + int status = decoder.getDataAsInteger(); + decoder.getNextParameter(); const char *typeName = decoder.getDataAsString(); + decoder.getNextParameter(); const char *typeStructure = decoder.getDataAsString(); + decoder.getNextParameter(); const char *oidstring = decoder.getDataAsString(); + decoder.getNextParameter(); const char *collectionName= decoder.getDataAsString(); + + r_Set< r_Ref_Any >* set = 0; + + if( status != 0 && status != 1 ) + { + r_Error err; + switch( status ) + { + case 2: + err = r_Error( r_Error::r_Error_ObjectUnknown ); + break; + case 3: + err = r_Error( r_Error::r_Error_ClientUnknown ); + break; + default: + err = r_Error( r_Error::r_Error_TransferFailed ); + break; + } + clearAnswer(); + LEAVE( "RnpClientComm::executeGetCollByNameOrOId(): exception, status = " << status ); + throw err; + } + + // create the set + r_OId rOId( oidstring ); + set = new ( r_Database::actual_database, r_Object::read, rOId ) r_Set< r_Ref_Any >; + + // initialize data elements + set->set_type_by_name ( typeName ); + set->set_type_structure( typeStructure ); + set->set_object_name ( collectionName ); + + clearAnswer(); + + // get collection elements + if( status == 0 ) + getMDDCollection( *set, 0 ); + // else rpcStatus == 1 -> Result collection is empty and nothing has to be got. + + r_Ref_Any result = r_Ref_Any( set->get_oid(), set ); + LEAVE( "RnpClientComm::executeGetCollByNameOrOId() -> (result set not displayed)" ); + return result; +} + + +// common function using the dynamic parameter facility of RNP +r_Ref_Any RnpClientComm::executeGetCollOIdsByNameOrOId ( const char* collName, const r_OId& oid ) throw( r_Error ) +{ + ENTER( "RnpClientComm::executeGetCollOIdsByNameOrOId( collName=" << (collName?collName:"(null)") << ", oid=" << oid << " )" ); + + startRequest(RnpRasserver::cmd_getcolloids); + encoder.addInt32Parameter(RnpRasserver::pmt_clientid, clientID); + if( collName != NULL) + encoder.addStringParameter(RnpRasserver::pmt_collname, collName); + else + encoder.addStringParameter(RnpRasserver::pmt_oidstring, oid.get_string_representation()); + TALK( "request RnpRasserver::cmd_getcolloids with ..., clientID 0x" << hex << clientID << dec ); + + sendRequestGetAnswer(); + + int status = decoder.getDataAsInteger(); + decoder.getNextParameter(); const char *typeName = decoder.getDataAsString(); + decoder.getNextParameter(); const char *typeStructure = decoder.getDataAsString(); + decoder.getNextParameter(); const char *oidstring = decoder.getDataAsString(); + decoder.getNextParameter(); const char *collectionName= decoder.getDataAsString(); + + r_Set< r_Ref<r_GMarray> >* set = 0; + + if( status != 0 && status != 1 ) + { + r_Error err; + switch( status ) + { + case 2: + err = r_Error( r_Error::r_Error_ObjectUnknown ); + break; + case 3: + err = r_Error( r_Error::r_Error_ClientUnknown ); + break; + default: + err = r_Error( r_Error::r_Error_TransferFailed ); + break; + } + clearAnswer(); + LEAVE( "RnpClientComm::executeGetCollOIdsByNameOrOId(): exception, status = " << status ); + throw err; + } + + // create the set + r_OId rOId( oidstring ); + set = new ( r_Database::actual_database, r_Object::read, rOId ) r_Set< r_Ref< r_GMarray > >; + + set->set_type_by_name ( typeName ); + set->set_type_structure( typeStructure ); + set->set_object_name ( collName ); + + // fill set with oids + if( status == 0 ) + { + while(decoder.getNextParameter() != 0) + { + r_OId roid( decoder.getDataAsString() ); + set->insert_element( r_Ref<r_GMarray>(roid), 1 ); + } + } + + clearAnswer(); + + r_Ref_Any result = r_Ref_Any( set->get_oid(), set ); + LEAVE( "RnpClientComm::executeGetCollOIdsByNameOrOId() -> (result not displayed)" ); + return result; + } + + +GetMDDRes* RnpClientComm::executeGetNextMDD() +{ + ENTER( "RnpClientComm::executeGetNextMDD()" ); + + startRequest(RnpRasserver::cmd_getnextmdd); + encoder.addInt32Parameter(RnpRasserver::pmt_clientid, clientID); + TALK( "request RnpRasserver::cmd_getnextmdd with clientID 0x" << hex << clientID << dec ); + + sendRequestGetAnswer(); + + GetMDDRes* result = new GetMDDRes; + + result->status = decoder.getDataAsInteger(); + decoder.getNextParameter(); result->domain = strdup(decoder.getDataAsString()); + decoder.getNextParameter(); result->typeName = strdup(decoder.getDataAsString()); + decoder.getNextParameter(); result->typeStructure = strdup(decoder.getDataAsString()); + decoder.getNextParameter(); result->oid = strdup(decoder.getDataAsString()); + decoder.getNextParameter(); result->currentFormat = decoder.getDataAsInteger(); + + clearAnswer(); + + LEAVE( "RnpClientComm::executeGetNextMDD() -> " << result ); + return result; +} + +GetTileRes* RnpClientComm::executeGetNextTile() +{ + ENTER( "RnpClientComm::executeGetNextTile()" ); + + startRequest(RnpRasserver::cmd_getnexttile); + encoder.addInt32Parameter(RnpRasserver::pmt_clientid, clientID); + TALK( "request RnpRasserver::cmd_getnexttile with clientID 0x" << hex << clientID << dec ); + + sendRequestGetAnswer(); + + GetTileRes* result = new GetTileRes; + result->marray = new RPCMarray; + + result->status = decoder.getDataAsInteger(); + if(decoder.getNextParameter() != 0) + { + result->marray->domain = strdup(decoder.getDataAsString()); + decoder.getNextParameter(); result->marray->cellTypeLength = decoder.getDataAsInteger(); + decoder.getNextParameter(); result->marray->currentFormat = decoder.getDataAsInteger(); + decoder.getNextParameter(); result->marray->storageFormat = decoder.getDataAsInteger(); + decoder.getNextParameter(); + int length = decoder.getDataLength(); + result->marray->data.confarray_len = length; + result->marray->data.confarray_val = (char*)mymalloc(length); + memcpy(result->marray->data.confarray_val, decoder.getData(), length); + } + else + { + result->marray->domain = 0; + result->marray->data.confarray_val = 0; + } + clearAnswer(); + + LEAVE( "RnpClientComm::executeGetNextTile() -> " << result ); + return result; +} + +int RnpClientComm::executeInitUpdate() +{ + ENTER( "RnpClientComm::executeInitUpdate()" ); + + startRequest(RnpRasserver::cmd_initupdate); + encoder.addInt32Parameter(RnpRasserver::pmt_clientid, clientID); + TALK( "adding fragment XXX with clientID 0x" << hex << clientID << dec ); + + int result = sendAndReturnStatus(); + LEAVE( "RnpClientComm::executeInitUpdate() -> " << result ); + return result; +} + +int RnpClientComm::executeStartInsertTransMDD(r_GMarray* mdd) +{ + ENTER( "RnpClientComm::executeStartInsertTransMDD( mdd=" << ((unsigned long) mdd) << " )" ); + + startRequest(RnpRasserver::cmd_startinsTmdd); + encoder.addInt32Parameter( RnpRasserver::pmt_clientid, clientID); + encoder.addStringParameter( RnpRasserver::pmt_domain, mdd->spatial_domain().get_string_representation()); + encoder.addInt32Parameter( RnpRasserver::pmt_typelength, mdd->get_type_length()); + encoder.addStringParameter( RnpRasserver::pmt_typename, mdd->get_type_name()); + TALK( "request RnpRasserver::cmd_startinsTmdd with ..., clientID 0x" << hex << clientID << dec ); + + int result = sendAndReturnStatus(); + LEAVE( "RnpClientComm::executeStartInsertTransMDD() -> " << result ); + return result; +} + +int RnpClientComm::executeInsertTile(bool persistent, RPCMarray *tile) +{ + ENTER( "RnpClientComm::executeInsertTile( persistent=" << persistent << ", tile=" << ((unsigned long) tile) << " )" ); + + int size = tile->data.confarray_len; + startRequest(RnpRasserver::cmd_inserttile, RNP_DEFAULTBUFFERSIZE + size); + encoder.addInt32Parameter( RnpRasserver::pmt_clientid, clientID); + encoder.addInt32Parameter( RnpRasserver::pmt_ispersistent, persistent ? 1:0); + encoder.addStringParameter( RnpRasserver::pmt_domain, tile->domain); + encoder.addInt32Parameter( RnpRasserver::pmt_typelength, tile->cellTypeLength); + encoder.addInt32Parameter( RnpRasserver::pmt_currentformat, tile->currentFormat); + encoder.addInt32Parameter( RnpRasserver::pmt_storageformat, tile->storageFormat); + encoder.addOpaqueParameter( RnpRasserver::pmt_tiledata, tile->data.confarray_val, size); + TALK( "request RnpRasserver::cmd_inserttile with ..., clientID 0x" << hex << clientID << dec ); + + int result = sendAndReturnStatus(); + + LEAVE( "RnpClientComm::executeInsertTile() -> " << result ); + return result; +} + +void RnpClientComm::executeEndInsertMDD(bool persistent) +{ + ENTER( "RnpClientComm::executeEndInsertMDD( persistent=" << persistent << " )" ); + + startRequest(RnpRasserver::cmd_endinsmdd); + encoder.addInt32Parameter(RnpRasserver::pmt_clientid, clientID); + encoder.addInt32Parameter(RnpRasserver::pmt_ispersistent, persistent ? 1:0); + TALK( "request RnpRasserver::cmd_endinsmdd with persistent " << persistent << ", clientID 0x" << hex << clientID << dec ); + + sendRequestGetAnswer(); + clearAnswer(); + + LEAVE( "RnpClientComm::executeEndInsertMDD()" ); +} + +void RnpClientComm::executeExecuteUpdateQuery(const char *query) throw(r_Error) + { + ENTER( "RnpClientComm::executeExecuteUpdateQuery( query=" << (query?query:"(null)") << " )" ); + + startRequest(RnpRasserver::cmd_updaterpc); + encoder.adjustBufferSize(strlen(query)); + encoder.addInt32Parameter(RnpRasserver::pmt_clientid, clientID); + encoder.addStringParameter(RnpRasserver::pmt_querystring, query); + TALK( "request RnpRasserver::cmd_updaterpc with query '" << query << "', clientID 0x" << hex << clientID << dec ); + + sendRequestGetAnswer(); + int status = decoder.getDataAsInteger(); + decoder.getNextParameter(); + int errNo = decoder.getDataAsInteger(); + decoder.getNextParameter(); + int lineNo = decoder.getDataAsInteger(); + decoder.getNextParameter(); + int colNo = decoder.getDataAsInteger(); + decoder.getNextParameter(); + const char* token = decoder.getDataAsString(); + + clearAnswer(); + + if( status == 2 || status == 3 ) + { + LEAVE( "RnpClientComm::executeExecuteUpdateQuery(): exception, status = " << status ); + throw r_Equery_execution_failed( errNo, lineNo, colNo, token ); + } + + if( status == 1 ) + { + LEAVE( "RnpClientComm::executeExecuteUpdateQuery(): exception, status = " << status ); + throw r_Error( r_Error::r_Error_ClientUnknown ); + } + + if( status > 3 ) + { + LEAVE( "RnpClientComm::executeExecuteUpdateQuery(): exception, status = " << status ); + throw r_Error( r_Error::r_Error_TransferFailed ); + } + + LEAVE( "RnpClientComm::executeExecuteUpdateQuery()" ); +} + +r_OId RnpClientComm::executeGetNewOId( unsigned short objType ) throw(r_Error) +{ + //cout<<" RnpClientComm::getNewOId: not implemented"<<endl; + + ENTER( "RnpClientComm::executeGetNewOId( objType=" << objType << " )" ); + + startRequest(RnpRasserver::cmd_getnewoid); + encoder.addInt32Parameter(RnpRasserver::pmt_clientid, clientID); + encoder.addInt32Parameter(RnpRasserver::pmt_objecttype, objType); + TALK( "request RnpRasserver::cmd_getnewoid with ..., clientID 0x" << hex << clientID << dec ); + + sendRequestGetAnswer(); + + r_OId oid( decoder.getDataAsString() ); + + clearAnswer(); + + LEAVE( "RnpClientComm::executeGetNewOId() -> " << oid ); + return oid; +} + +unsigned short RnpClientComm::executeGetObjectType( const r_OId& oid ) throw(r_Error) +{ + ENTER( "RnpClientComm::executeGetObjectType( oid=" << oid << " )" ); + + startRequest(RnpRasserver::cmd_getobjecttype); + encoder.addInt32Parameter(RnpRasserver::pmt_clientid, clientID); + encoder.addStringParameter(RnpRasserver::pmt_oidstring, oid.get_string_representation()); + TALK( "request RnpRasserver::cmd_getobjecttype with ..., clientID 0x" << hex << clientID << dec ); + + sendRequestGetAnswer(); + + int status = decoder.getDataAsInteger(); + switch( status ) + { + case 0: + break; //OK + case 1: + clearAnswer(); + LEAVE( "RnpClientComm::executeGetObjectType(): exception, status = " << status ); + throw r_Error( r_Error::r_Error_ClientUnknown ); + break; + case 2: + clearAnswer(); + LEAVE( "RnpClientComm::executeGetObjectType(): exception, status = " << status ); + throw r_Error( r_Error::r_Error_ObjectUnknown ); + break; + default: + clearAnswer(); + LEAVE( "RnpClientComm::executeGetObjectType(): exception, status = " << status ); + throw r_Error( r_Error::r_Error_TransferFailed ); + break; + } + + decoder.getNextParameter(); unsigned short objType = decoder.getDataAsInteger(); + + clearAnswer(); + + LEAVE( "RnpClientComm::executeGetObjectType() -> " << objType ); + return objType; +} + +char* RnpClientComm::executeGetTypeStructure( const char* typeName, r_Type_Type typeType ) throw(r_Error) +{ + ENTER( "RnpClientComm::executeGetTypeStructure( typeName=" << (typeName?typeName:"(null)") << ", typeType=" << typeType << " )" ); + + startRequest(RnpRasserver::cmd_gettypestruct); + encoder.addInt32Parameter(RnpRasserver::pmt_clientid, clientID); + encoder.addStringParameter(RnpRasserver::pmt_typename, typeName); + encoder.addInt32Parameter(RnpRasserver::pmt_typetype, typeType); + TALK( "request RnpRasserver::cmd_gettypestruct with ..., clientID 0x" << hex << clientID << dec ); + + sendRequestGetAnswer(); + + int status = decoder.getDataAsInteger(); + switch(status) + { + case 0: + break; //OK + case 1: + clearAnswer(); + LEAVE( "RnpClientComm::executeGetTypeStructure(): exception, status = " << status ); + throw r_Error( r_Error::r_Error_TransactionNotOpen ); + break; + case 2: + clearAnswer(); + LEAVE( "RnpClientComm::executeGetTypeStructure(): exception, status = " << status ); + throw r_Error( r_Error::r_Error_DatabaseClassUndefined ); + break; + default: + clearAnswer(); + LEAVE( "RnpClientComm::executeGetTypeStructure(): exception, status = " << status ); + throw r_Error( r_Error::r_Error_TransferFailed ); + break; + } + + decoder.getNextParameter(); + char* typeStructure = new char [decoder.getDataLength() + 1]; + strcpy(typeStructure, decoder.getDataAsString()); + + clearAnswer(); + + LEAVE( "RnpClientComm::executeGetTypeStructure() -> " << typeStructure ); + return typeStructure; +} + +int RnpClientComm::executeSetFormat( bool lTransferFormat, r_Data_Format format, const char* formatParams) +{ + ENTER( "RnpClientComm::executeSetFormat( lTransferFormat=" << lTransferFormat << ", format=" << format << ", formatParams=" << (formatParams?formatParams:"(null)") << " )" ); + + startRequest(RnpRasserver::cmd_setformat); + encoder.addInt32Parameter( RnpRasserver::pmt_clientid, clientID); + encoder.addInt32Parameter( RnpRasserver::pmt_whichformat, lTransferFormat); + encoder.addInt32Parameter( RnpRasserver::pmt_format, format); + encoder.addStringParameter(RnpRasserver::pmt_formatparams, formatParams); + TALK( "request RnpRasserver::cmd_setformat with ..., clientID 0x" << hex << clientID << dec ); + + sendRequestGetAnswer(); + + int status = decoder.getDataAsInteger(); + + clearAnswer(); + + LEAVE( "RnpClientComm::executeSetFormat() -> " << status ); + return status; +} + +//---------------------------------------------------------- +int RnpClientComm::sendAndReturnStatus() throw (r_Error) +{ + ENTER( "RnpClientComm::sendAndReturnStatus()" ); + + sendRequestGetAnswer(); + + int status = decoder.getDataAsInteger(); + + clearAnswer(); + + LEAVE( "RnpClientComm::sendAndReturnStatus() -> " << status ); + return status; +} + +void RnpClientComm::sendRequestGetAnswer() throw (r_Error) +{ + ENTER( "RnpClientComm::sendRequestGetAnswer()" ); + + RnpBaseClientComm::setMaxRetry(RNP_MAX_RETRY); // defined in raslib/rminit.hh -- PB 2005-sep-01 + if(RnpBaseClientComm::sendRequestGetAnswer() == false) + { + clearAnswer(); + LEAVE( "RnpClientComm::sendRequestGetAnswer(): exception, sendRequestGetAnswer() == false" ); + throw r_Error( r_Error::r_Error_TransferFailed); + } + + detectErrors(); + if(decoder.countParameters() > 0) decoder.getFirstParameter(); + + LEAVE( "RnpClientComm::sendRequestGetAnswer()" ); +} + +void RnpClientComm::helper012d(const char* caller) throw (r_Error) +{ + int status = sendAndReturnStatus(); + + switch( status ) + { + case 0: + break; + case 1: + TALK( "RnpClientComm::helper012d( " << (caller?caller:"(null)") << " ): error: status = " << status ); + throw r_Error( r_Error::r_Error_ClientUnknown ); + break; + case 2: + TALK( "RnpClientComm::helper012d( " << (caller?caller:"(null)") << " ): error: status = " << status ); + throw r_Error( r_Error::r_Error_ObjectUnknown ); + break; + default: + TALK( "RnpClientComm::helper012d( " << (caller?caller:"(null)") << " ): error: status = " << status ); + throw r_Error( r_Error::r_Error_General ); + break; + } +} + +bool RnpClientComm::detectErrors() +{ + if(decoder.getFragmentType() != Rnp::fgt_Error) + return false; + + reassemble_r_Error() ; + + return true; +} + +void RnpClientComm::reassemble_r_Error() throw (r_Error) +{ + decoder.getFirstParameter(); + if(decoder.getParameterType() != Rnp::ert_Other) + return; + + decoder.getNextParameter(); + + r_Error *temp = r_Error::getAnyError((char*)decoder.getDataAsString()); + + r_Error err = *temp; + + delete temp; + + TALK( "npClientComm::reassemble_r_Error() throwing exception: " << (char*)decoder.getDataAsString() ); + throw err; +} + +void RnpClientComm::setTimeoutInterval(int seconds) +{ + akg::NbJob::setTimeoutInterval(seconds); +} + +int RnpClientComm::getTimeoutInterval() +{ + return akg::NbJob::getTimeoutInterval(); +} + +//## unofficial functions + +r_OId RnpClientComm::createCollection(const char *collName, const char *collTypeName) throw(r_Error) +{ + ENTER( "RnpClientComm::createCollection( collName=" << (collName?collName:"(null)") << ", collTypeName=" << (collTypeName?collTypeName:"(null)") << " )" ); + + checkForRwTransaction(); + startRequest(RnpRasserver::cmd_createcollection); + encoder.addInt32Parameter( RnpRasserver::pmt_clientid, clientID); + encoder.addStringParameter(RnpRasserver::pmt_collname, collName); + encoder.addStringParameter(RnpRasserver::pmt_typename, collTypeName); + TALK( "request RnpRasserver::cmd_createcollection with ..., clientID 0x" << hex << clientID << dec ); + + sendRequestGetAnswer(); + + r_OId oid(decoder.getDataAsString()); + + clearAnswer(); + + LEAVE( "RnpClientComm::createCollection() -> " << oid ); + return oid; +} + +r_OId RnpClientComm::createMDD(const char* collName, const char* mddTypeName, const char* definitionDomain, const char *tileDomain, bool rcindex) throw(r_Error) +{ + ENTER( "RnpClientComm::createMDD( collName=" << (collName?collName:"(null)") << "; mddTypeName=" << (mddTypeName?mddTypeName:"(null)") << ", definitionDomain=" << (definitionDomain?definitionDomain:"(null)") << ", tileDomain=" << (tileDomain?tileDomain:"(null)") << ", rcindex=" << rcindex << " )" ); + + checkForRwTransaction(); + startRequest(RnpRasserver::cmd_createmdd); + encoder.addInt32Parameter( RnpRasserver::pmt_clientid, clientID); + encoder.addStringParameter(RnpRasserver::pmt_collname, collName); + encoder.addStringParameter(RnpRasserver::pmt_typename, mddTypeName); + encoder.addStringParameter(RnpRasserver::pmt_domain, definitionDomain); + encoder.addInt32Parameter( RnpRasserver::pmt_indextype, rcindex); + encoder.addStringParameter(RnpRasserver::pmt_domain, tileDomain); + TALK( "request RnpRasserver::cmd_createmdd with collName " << collName << ", mddTypeName " << mddTypeName << ", definitionDomain " << definitionDomain << ", rcindex " << rcindex << ", tileDomain " << tileDomain << ", clientID 0x" << hex << clientID << dec ); + + sendRequestGetAnswer(); + + r_OId oid(decoder.getDataAsString()); + TALK( "RnpClientComm::createMDD() receiving oid " << oid ); + + clearAnswer(); + + LEAVE( "RnpClientComm::createMDD() -> " << oid ); + return oid; +} + +void RnpClientComm::extendMDD(r_OId mddOId, const char *stripeDomain, const char* tileDomain) throw(r_Error) +{ + ENTER( "RnpClientComm::extendMDD( mddOId=" << mddOId << ", stripeDomain=" << (stripeDomain?stripeDomain:"(null)") << ", tileDomain=" << (tileDomain?tileDomain:"(null)") << " )" ); + + checkForRwTransaction(); + startRequest(RnpRasserver::cmd_extendmdd); + encoder.addInt32Parameter( RnpRasserver::pmt_clientid, clientID); + encoder.addStringParameter(RnpRasserver::pmt_oidstring, mddOId.get_string_representation()); + encoder.addStringParameter(RnpRasserver::pmt_domain, stripeDomain); + encoder.addStringParameter(RnpRasserver::pmt_domain, tileDomain); + TALK( "request RnpRasserver::cmd_extendmdd with oid " << mddOId.get_string_representation() << ", stripeDomain " << stripeDomain << ", tileDomain " << tileDomain << ", clientID 0x" << hex << clientID << dec ); + + sendRequestGetAnswer(); + + clearAnswer(); + + LEAVE( "RnpClientComm::extendMDD()" ); +} + +vector<r_OId> RnpClientComm::getOIdsFromCollection( const char* collName ) throw( r_Error ) +{ + ENTER( "RnpClientComm::getOIdsFromCollection( collName=" << (collName?collName:"(null)") << " )" ); + + vector<r_OId> result = executeGetOIdsFromCollection ( collName, r_OId()); + + LEAVE( "RnpClientComm::getOIdsFromCollection()" ); + return result; +} + +vector<r_OId> RnpClientComm::getOIdsFromCollection( const r_OId& oid ) throw( r_Error ) +{ + ENTER( "RnpClientComm::getOIdsFromCollection( oid=" << oid << " )" ); + + vector<r_OId> result = executeGetOIdsFromCollection ( 0, oid ); + + LEAVE( "RnpClientComm::getOIdsFromCollection()" ); + return result; +} + +vector<r_OId> RnpClientComm::executeGetOIdsFromCollection ( const char* collName, const r_OId& oid ) throw( r_Error ) +{ + ENTER( "RnpClientComm::executeGetOIdsFromCollection( collName=" << (collName?collName:"(null)") << ", oid=" << oid << " )" ); + + startRequest(RnpRasserver::cmd_getcolloids); + encoder.addInt32Parameter(RnpRasserver::pmt_clientid, clientID); + TALK( "request RnpRasserver::cmd_getcolloids with clientID 0x" << hex << clientID << dec ); + if( collName != NULL) + { + encoder.addStringParameter(RnpRasserver::pmt_collname, collName); + TALK( "RnpClientComm::executeGetOIdsFromCollection() adding collName " << collName ); + } + else + { + encoder.addStringParameter(RnpRasserver::pmt_oidstring, oid.get_string_representation()); + TALK( "RnpClientComm::executeGetOIdsFromCollection() adding oid " << oid.get_string_representation() ); + } + + sendRequestGetAnswer(); + + int status = decoder.getDataAsInteger(); + // we have to read all this, but we don't use them here + decoder.getNextParameter(); const char *typeName = decoder.getDataAsString(); + TALK( "RnpClientComm::executeGetOIdsFromCollection() received typeName " << typeName ); + decoder.getNextParameter(); const char *typeStructure = decoder.getDataAsString(); + TALK( "RnpClientComm::executeGetOIdsFromCollection() received typeStructure " << typeStructure ); + decoder.getNextParameter(); const char *oidstring = decoder.getDataAsString(); + TALK( "RnpClientComm::executeGetOIdsFromCollection() received oid " << oidstring ); + decoder.getNextParameter(); const char *collectionName= decoder.getDataAsString(); + TALK( "RnpClientComm::executeGetOIdsFromCollection() received collectionName " << collectionName ); + + if( status != 0 && status != 1 ) + { + r_Error err; + switch( status ) + { + case 2: + LEAVE( "RnpClientComm::executeGetOIdsFromCollection(): exception, status = " << status ); + err = r_Error( r_Error::r_Error_ObjectUnknown ); + break; + case 3: + LEAVE( "RnpClientComm::executeGetOIdsFromCollection(): exception, status = " << status ); + err = r_Error( r_Error::r_Error_ClientUnknown ); + break; + default: + LEAVE( "RnpClientComm::executeGetOIdsFromCollection(): exception, status = " << status ); + err = r_Error( r_Error::r_Error_TransferFailed ); + break; + } + clearAnswer(); + throw err; + } + + // create the set + vector<r_OId> result; + // fill set with oids + if( status == 0 ) + { + while(decoder.getNextParameter() != 0) + { + r_OId roid( decoder.getDataAsString() ); + TALK( "RnpClientComm::executeGetOIdsFromCollection() received oid set component " << roid ); + + result.push_back(roid); + } + } + + clearAnswer(); + + LEAVE( "RnpClientComm::executeGetOIdsFromCollection()" ); + return result; + } + +vector<r_Minterval> RnpClientComm::getTileDomains(r_OId mddOId, const char *stripeDomain) throw( r_Error ) +{ + ENTER( "RnpClientComm::getTileDomains( mddOId=" << mddOId << ", stripeDomain=" << (stripeDomain?stripeDomain:"(null)") << " )" ); + + startRequest(RnpRasserver::cmd_gettiledomains); + encoder.addInt32Parameter(RnpRasserver::pmt_clientid, clientID); + encoder.addStringParameter(RnpRasserver::pmt_oidstring, mddOId.get_string_representation()); + encoder.addStringParameter(RnpRasserver::pmt_domain, stripeDomain); + TALK( "request RnpRasserver::cmd_gettiledomains with oid " << mddOId.get_string_representation() << ", stripeDomain " << stripeDomain << ", clientID 0x" << hex << clientID << dec ); + + sendRequestGetAnswer(); + + vector<r_Minterval> result; + + const RnpParameter *currParam = decoder.getFirstParameter(); + + while(currParam) + { + r_Minterval interval(decoder.getDataAsString()); + TALK( "RnpClientComm::getTileDomains() received minterval " << interval ); + + result.push_back(interval); + + currParam = decoder.getNextParameter(); + } + + clearAnswer(); + + LEAVE( "RnpClientComm::getTileDomains() -> " << result ); + return result; +} + diff --git a/rnprotocol/rnpcommunication.cc b/rnprotocol/rnpcommunication.cc new file mode 100644 index 0000000..65c5ed8 --- /dev/null +++ b/rnprotocol/rnpcommunication.cc @@ -0,0 +1,702 @@ +/* +* This file is part of rasdaman community. +* +* Rasdaman community is free software: you can redistribute it and/or modify +* it under the terms of the GNU General Public License as published by +* the Free Software Foundation, either version 3 of the License, or +* (at your option) any later version. +* +* Rasdaman community is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU General Public License for more details. +* +* You should have received a copy of the GNU General Public License +* along with rasdaman community. If not, see <http://www.gnu.org/licenses/>. +* +* Copyright 2003, 2004, 2005, 2006, 2007, 2008, 2009 Peter Baumann / +rasdaman GmbH. +* +* For more information please see <http://www.rasdaman.org> +* or contact Peter Baumann via <baumann@rasdaman.com>. +*/ +/**************************************************************************** + * + * + * COMMENTS: + * - FIXME: uses assert() !!! -- PB 2003-nov-22 + * + ****************************************************************************/ + +#include <assert.h> +#include <rnpcommunication.hh> + +#ifdef AFTERV52 +#include <rnpexception.hh> +#endif + +#include "debug.hh" +#include "raslib/rminit.hh" // for RNP_COMM_TIMEOUT + +using namespace rnp; + +RnpClientJob::RnpClientJob() throw() + { + } + +void RnpClientJob::init(CommBuffer* transmitterBuffer, RnpBaseClientComm* newClientComm) throw() + { + ENTER( "RnpClientJob::init" ); + + if ( ! ( transmitterBuffer != 0 ) ) + { + TALK( "RnpClientJob::init(): warning: assert will fire." ); + } + assert(transmitterBuffer != 0); + if ( ! ( newClientComm != 0 ) ) + { + TALK( "RnpClientJob::init(): warning: assert will fire." ); + } + assert(newClientComm != 0); + + rnpReceiver.reset(); + answerOk = false; + currentBufferPtr = transmitterBuffer; + clientCommPtr = newClientComm; + invalidFormat = false; + + status=wks_notdefined; + + LEAVE( "RnpClientJob::init" ); + } + +void RnpClientJob::clearAnswerBuffer() throw() + { + rnpReceiver.reset(); + } + +void RnpClientJob::resetState() throw() + { + ENTER( "RnpClientJob::resetState" ); + + clearConnection(); + + clientCommPtr->jobIsReady(); + + status = wks_notdefined; + + LEAVE( "RnpClientJob::resetState" ); + } +void RnpClientJob::processRequest() throw() + { + ENTER( "RnpClientJob::processRequest" ); + + answerOk = true; + + invalidFormat = false; + + resetState(); + + LEAVE( "RnpClientJob::processRequest" ); + } + +bool RnpClientJob::validateMessage() throw() + { + ENTER( "RnpClientJob::validateMessage()" ); + + bool validated = rnpReceiver.validateMessage(); + + currentBufferPtr = rnpReceiver.getCurrentBuffer(); + + if( validated == true) + { + status=wks_processing; + LEAVE( "RnpClientJob::validateMessage() -> true" ); + return true; + } + + if(rnpReceiver.isDiscarding()) + { + TALK( "RnpClientJob::validateMessage - discarding message" ); + resetState(); + answerOk = false; + invalidFormat = true; + } + answerOk = false; + + LEAVE( "RnpClientJob::validateMessage() -> false" ); + return false; + } + +void RnpClientJob::executeOnWriteReady() throw() + { + ENTER( "RnpClientJob::executeOnWriteReady()" ); + + rnpReceiver.reset(); + + currentBufferPtr->freeBuffer(); + + currentBufferPtr = rnpReceiver.getCurrentBuffer(); + + readyToReadAnswer(); + + LEAVE( "RnpClientJob::executeOnWriteReady()" ); + } + +void RnpClientJob::specificCleanUpOnTimeout() throw() + { + ENTER( "RnpClientJob::specificCleanUpOnTimeout()" ); + + answerOk = false; + resetState(); + + LEAVE( "RnpClientJob::specificCleanUpOnTimeout()" ); + } + +void RnpClientJob::executeOnReadError() throw() + { + ENTER( "RnpClientJob::executeOnReadError()" ); + + answerOk = false; + resetState(); + + LEAVE( "RnpClientJob::executeOnReadError()" ); + } + +void RnpClientJob::executeOnWriteError() throw() + { + ENTER( "RnpClientJob::executeOnWriteError()" ); + + answerOk = false; + resetState(); + + LEAVE( "RnpClientJob::executeOnWriteError()" ); + } + +CommBuffer* RnpClientJob::getAnswerBuffer() throw() + { + return rnpReceiver.getMessageBuffer(); + } + +bool RnpClientJob::isAnswerOk() throw() + { + return answerOk; + } + +bool RnpClientJob::isInvalidFormat() throw() + { + return invalidFormat; + } + + +//################################################### + +RnpBaseClientComm::RnpBaseClientComm(RnpQuark theServerType, RnpTransport::CarrierProtocol theProtocol) throw() + { + ENTER( "RnpBaseClientComm::RnpBaseClientComm( serverType="<<theServerType<<" protocol="<<theProtocol << " )" ); + + serverHost = NULL; + serverPort = 0; + serverType = theServerType; + carrierProtocol = theProtocol; + + initDefaultCommunication(); + maxRetry = 0; // # of RE-tries -- PB 2005-aug-31 + + LEAVE( "RnpBaseClientComm::RnpBaseClientComm()" ); + } + +RnpBaseClientComm::RnpBaseClientComm(const char* theServerHost, int theServerPort, RnpQuark theServerType, RnpTransport::CarrierProtocol theProtocol) throw() + { + ENTER( "RnpBaseClientComm::RnpBaseClientComm( server="<<theServerHost<<" port="<<theServerPort<<" serverType="<<theServerType<<" protocol="<<theProtocol << " )" ); + + if ( ! ( theServerHost != 0 ) ) + { + TALK( "RnpBaseClientComm::RnpBaseClientComm(): warning: assert will fire." ); + } + assert(theServerHost != 0); + if ( ! ( theServerPort > 0 ) ) + { + TALK( "RnpBaseClientComm::RnpBaseClientComm(): warning: assert will fire." ); + } + assert(theServerPort > 0); + + serverHost = theServerHost; + serverPort = theServerPort; + serverType = theServerType; + carrierProtocol = theProtocol; + + initDefaultCommunication(); + + maxRetry = 0; // # of RE-tries -- PB 2005-aug-31 + + LEAVE( "RnpBaseClientComm::RnpBaseClientComm()" ); + } +RnpBaseClientComm::~RnpBaseClientComm() throw() + { + } + +void RnpBaseClientComm::setConnectionParameters(const char* theServerHost, int theServerPort) throw() + { + ENTER( "RnpBaseClientComm::setConnectionParameters( server="<<theServerHost<<" port="<<theServerPort << " )" ); + + if ( ! ( theServerHost != 0 ) ) + { + TALK( "RnpBaseClientComm::setConnectionParameters(): warning: assert will fire." ); + } + assert(theServerHost != 0); + if ( ! ( theServerPort > 0 ) ) + { + TALK( "RnpBaseClientComm::setConnectionParameters(): warning: assert will fire." ); + } + assert(theServerPort > 0); + + serverHost = theServerHost; + serverPort = theServerPort; + + LEAVE( "RnpBaseClientComm::setConnectionParameters()" ); + } + +void RnpBaseClientComm::setCarrierProtocol(RnpTransport::CarrierProtocol theProtocol) throw() + { + carrierProtocol = theProtocol; + } + +RnpTransport::CarrierProtocol RnpBaseClientComm::getCarrierProtocol() throw() + { + return carrierProtocol; + } + +void RnpBaseClientComm::initDefaultCommunication() throw() + { + ENTER( "RnpBaseClientComm::initDefaultCommunication()" ); + + communicatorPtr = &internalCommunicator; + + communicatorPtr->initJobs(1); + communicatorPtr->setTimeout(RNP_COMM_TIMEOUT,0); // defined in raslib/rminit.hh -- PB 2005-sep-09 + + communicatorPtr->attachJob(clientJob); + + // not necessary? transmitterBuffer.allocate(RNP_DEFAULTBUFFERSIZE); + + LEAVE( "RnpBaseClientComm::initDefaultCommunication()" ); + } + + +void RnpBaseClientComm::jobIsReady() throw() + { + ENTER( "RnpBaseClientComm::jobIsReady()" ); + + communicatorPtr->shouldExit(); + + LEAVE( "RnpBaseClientComm::jobIsReady()" ); + } + +void RnpBaseClientComm::startRequest(RnpQuark command, int transmitterBufferSize) + { + ENTER( "RnpBaseClientComm::startRequest( command="<<command<<" transmitterBufferSize="<<transmitterBufferSize << " )" ); + + transmitterBuffer.allocate(transmitterBufferSize); + + clientJob.init(&transmitterBuffer,this); + + encoder.setBuffer(&transmitterBuffer); + + encoder.startRequest(serverType, carrierProtocol); + encoder.startFragment(Rnp::fgt_Command, command); + + LEAVE( "RnpBaseClientComm::startRequest()" ); + } + +bool RnpBaseClientComm::sendRequestGetAnswer() + { + ENTER( "RnpBaseClientComm::sendMessageGetAnswer()" ); + + if ( ! ( serverHost != NULL ) ) + { + TALK( "RnpBaseClientComm::sendRequestGetAnswer(): warning: assert will fire." ); + } + assert(serverHost != NULL); + if ( ! ( serverPort > 0 ) ) + { + TALK( "RnpBaseClientComm::sendRequestGetAnswer(): warning: assert will fire." ); + } + assert(serverPort > 0); + + encoder.endFragment(); + encoder.endMessage(); + + bool connected = false; + for (int retry = 0; retry < maxRetry+1 && !connected; retry++) // NB: first attempt + RE-tries! -- PB 2005-aug-31 + { + connected = clientJob.connectToServer(serverHost,serverPort); + } + + if(connected == false) + { +#ifdef AFTERV52 + LEAVE( "RnpBaseClientComm::sendMessageGetAnswer(): exception - cannot connect to server "<<serverHost<<":"<<serverPort ); + throw RnpIOException(clientJob.getErrno()); +#endif + LEAVE( "RnpBaseClientComm::sendMessageGetAnswer(): -> false" ); + return false; + } + + communicatorPtr->runClient(); + + if(clientJob.isAnswerOk()== false) + { +#ifdef AFTERV52 + LEAVE( "RnpBaseClientComm::sendMessageGetAnswer(): exception - answer not OK" ); + if(clientJob.isInvalidFormat()) throw RnpInvalidFormatException(); + else throw RnpIOException(clientJob.getErrno()); +#endif + LEAVE( "RnpBaseClientComm::sendMessageGetAnswer(): -> false" ); + return false; + } + + CommBuffer* receiverBuffer = clientJob.getAnswerBuffer(); + decoder.decode(receiverBuffer); + decoder.getFirstFragment(); + + LEAVE( "RnpBaseClientComm::sendMessageGetAnswer() -> true"); + return true; + } + +bool RnpBaseClientComm::checkForExceptions() + { + if(decoder.getFragmentType() != Rnp::fgt_Error) return false; + return true; + } + +void RnpBaseClientComm::clearAnswer() throw() + { + clientJob.clearAnswerBuffer(); + } + +void RnpBaseClientComm::setMaxRetry(unsigned int newMaxRetry) + { + maxRetry = newMaxRetry; + } + +unsigned int RnpBaseClientComm::getMaxRetry() + { + return maxRetry; + } +//############# Server side ################################################ +//####################################################################### +//####################################################################### + +RnpServerJob::RnpServerJob() throw() + { + } + +void RnpServerJob::init(RnpBaseServerComm* theServerComm) throw() + { + if ( ! ( theServerComm != 0 ) ) + { + TALK( "RnpServerJob::init(): warning: assert will fire." ); + } + assert(theServerComm != 0); + + rnpReceiver.reset(); + currentBufferPtr = rnpReceiver.getCurrentBuffer(); + serverCommPtr = theServerComm; + + status=wks_accepting; + } + +void RnpServerJob::processRequest() throw() + { + serverCommPtr->processRequest(currentBufferPtr, &transmiterBuffer, rnpReceiver.getCarrierProtocol(), this); + + rnpReceiver.reset(); + + currentBufferPtr = &transmiterBuffer; + + readyToWriteAnswer(); + } + +bool RnpServerJob::validateMessage() throw() + { + + bool validated = false; + + if(rnpReceiver.validateMessage() == true) + { + status=wks_processing; + validated = true; + } + + if(rnpReceiver.isDiscarding()) + { + resetJob(); + validated = false; + } + + currentBufferPtr = rnpReceiver.getCurrentBuffer(); + + return validated; + } + +void RnpServerJob::executeOnWriteReady() throw() + { + resetJob(); + } + +void RnpServerJob::executeOnAccept() throw() + { + } + +void RnpServerJob::specificCleanUpOnTimeout() throw() + { + // initial era gol, dar... + // clearConnection face cine apeleaza: NbJob::cleanUpIfTimeout() + rnpReceiver.reset(); + + transmiterBuffer.freeBuffer(); + + currentBufferPtr = rnpReceiver.getCurrentBuffer(); + + currentBufferPtr->clearToRead(); + + status=wks_accepting; + } + +void RnpServerJob::executeOnReadError() throw() + { + resetJob(); + } + +void RnpServerJob::executeOnWriteError() throw() + { + resetJob(); + } + +void RnpServerJob::resetJob() throw() + { + clearConnection(); + + rnpReceiver.reset(); + + transmiterBuffer.freeBuffer(); + + currentBufferPtr = rnpReceiver.getCurrentBuffer(); + + currentBufferPtr->clearToRead(); + + status=wks_accepting; + } + +//################################################### +RnpBaseServerComm::RnpBaseServerComm() throw() + { + nrServerJobs = 1; + + transmitterBufferSize = RNP_DEFAULTBUFFERSIZE; + + communicator = NULL; + } + +RnpBaseServerComm::~RnpBaseServerComm() throw() + { + disconnectFromCommunicator(); + } + +bool RnpBaseServerComm::setServerJobs(int nrOfServerJobs) throw() + { + if(communicator != 0 ) return false; + + nrServerJobs = nrOfServerJobs; + + return true; + } + +int RnpBaseServerComm::countServerJobs() throw() + { + return nrServerJobs; + } + +void RnpBaseServerComm::connectToCommunicator(NbCommunicator &theCommunicator) + { // throws whatever 'new' throws + if ( ! ( communicator == NULL ) ) + { + TALK( "RnpServerJob::init(): warning: assert will fire." ); + } + assert(communicator == NULL); + + communicator = &theCommunicator; + + for(int i=0; i<nrServerJobs;i++) + { + RnpServerJob* job = createJob(); + + job->init(this); + + communicator->attachJob(*job); + + serverJob.push_back(job); + } + } + +bool RnpBaseServerComm::disconnectFromCommunicator() throw() + { + if(communicator == NULL) return false; + + for(int i=0; i<nrServerJobs;i++) + { + communicator->deattachJob(*(serverJob[i])); + + delete serverJob[i]; + } + + serverJob.clear(); + + communicator = NULL; + + return true; + } + +RnpServerJob* RnpBaseServerComm::createJob() + { + return new RnpServerJob; + } + + +void RnpBaseServerComm::setTransmitterBufferSize(int nSize) throw() + { + transmitterBufferSize = nSize; + } + +int RnpBaseServerComm::getTransmitterBufferSize() throw() + { + return transmitterBufferSize; + } + + +void RnpBaseServerComm::processRequest(CommBuffer *receiverBuffer, CommBuffer *transmiterBuffer, RnpTransport::CarrierProtocol protocol, RnpServerJob *callingJob) throw() + { + // use 'callingJob' to get info about the client (hostaddress, etc) + + decoder.decode(receiverBuffer); + RnpQuark destServerType = decoder.getDestinationServerType(); + Rnp::Endianness desEndianness = decoder.getDesiredEndianness(); + + // test if servertype matches! + + transmiterBuffer->allocate(transmitterBufferSize); + transmiterBuffer->clearToRead(); + + encoder.setBuffer(transmiterBuffer); + encoder.setFinalEndianness(desEndianness); + encoder.startAnswer(destServerType, protocol); + + decoder.getFirstFragment(); + bool wasError = false; + for(int fragment=0; fragment < decoder.countFragments(); fragment++) + { + if(wasError == false) + { + try + { + decodeFragment(); + } + catch(...) + { + wasError = true; + answerUnknownError(); + } + } + else + { + discardFragment(); + } + decoder.getNextFragment(); + } + encoder.endMessage(); + } + +const char* RnpBaseServerComm::getNextAsString(RnpQuark parameterType) const + { + decoder.getNextParameter(); + //if(decoder.getParameterType != parameterType) throw something + return decoder.getDataAsString(); + } + +int RnpBaseServerComm::getNextAsInteger(RnpQuark parameterType) const + { + decoder.getNextParameter(); + //if(decoder.getParameterType != parameterType) throw something + return decoder.getDataAsInteger(); + } + +float RnpBaseServerComm::getNextAsFloat(RnpQuark parameterType) const + { + decoder.getNextParameter(); + //if(decoder.getParameterType != parameterType) throw something + return decoder.getDataAsFloat(); + } + +double RnpBaseServerComm::getNextAsDouble(RnpQuark parameterType) const + { + decoder.getNextParameter(); + //if(decoder.getParameterType != parameterType) throw something + return decoder.getDataAsDouble(); + } + +const void* RnpBaseServerComm::getNextAsOpaque(RnpQuark parameterType) const + { + decoder.getNextParameter(); + //if(decoder.getParameterType != parameterType) throw something + return decoder.getDataAsOpaque(); + } + +int RnpBaseServerComm::getCurrentParameterLength() const throw() + { + return decoder.getDataLength(); + } + +void RnpBaseServerComm::answerSTLException(exception &ex) throw() + { + encoder.startFragment(Rnp::fgt_Error, decoder.getCommand()); + encoder.addInt32Parameter(Rnp::ert_StlException, 0); + encoder.addStringParameter(Rnp::erp_whatValue, ex.what()); + encoder.endFragment(); + } + +void RnpBaseServerComm::answerUnknownError() throw() + { + encoder.startFragment(Rnp::fgt_Error, decoder.getCommand()); + encoder.addInt32Parameter(Rnp::ert_Unknown, 0); + encoder.endFragment(); + } + +void RnpBaseServerComm::discardFragment() throw() + { + encoder.startFragment(Rnp::fgt_DiscardedRequest, decoder.getCommand()); + + encoder.endFragment(); + } + +void RnpBaseServerComm::startOkAnswer() throw() + { + encoder.startFragment(Rnp::fgt_OkAnswer, decoder.getCommand()); + } + +void RnpBaseServerComm::endOkAnswer() throw() + { + encoder.endFragment(); + } + +void RnpBaseServerComm::communicatorShouldExit() throw() + { + if ( ! ( communicator != NULL ) ) + { + TALK( "RnpServerJob::init(): warning: assert will fire." ); + } + assert(communicator != NULL); + + communicator->shouldExit(); + } + diff --git a/rnprotocol/rnpcommunication.hh b/rnprotocol/rnpcommunication.hh new file mode 100644 index 0000000..56aca81 --- /dev/null +++ b/rnprotocol/rnpcommunication.hh @@ -0,0 +1,345 @@ +#ifndef RNPCOMMUNICATION_HH +#define RNPCOMMUNICATION_HH +/* +* This file is part of rasdaman community. +* +* Rasdaman community is free software: you can redistribute it and/or modify +* it under the terms of the GNU General Public License as published by +* the Free Software Foundation, either version 3 of the License, or +* (at your option) any later version. +* +* Rasdaman community is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU General Public License for more details. +* +* You should have received a copy of the GNU General Public License +* along with rasdaman community. If not, see <http://www.gnu.org/licenses/>. +* +* Copyright 2003, 2004, 2005, 2006, 2007, 2008, 2009 Peter Baumann / +rasdaman GmbH. +* +* For more information please see <http://www.rasdaman.org> +* or contact Peter Baumann via <baumann@rasdaman.com>. +*/ +/**************************************************************************** + * + * + * COMMENTS: + * + * + ****************************************************************************/ + +#include "network/akgnetwork.hh" +#include "rnprotocol/rnpembedded.hh" +#include <vector> + +#ifdef AFTERV52 + #include <akglogging.hh> + #include <akg_exception.hh> +#else + #include <exception> +#endif + +namespace rnp + { +using namespace akg; +using namespace std; + +class RnpBaseClientComm; + +/** This class represents the RNP client job. It taker a CommBuffer, sends its data + and receives the answer. Is directy owned and used by 'RnpBaseClientComm', + so you don't have to worry about it + Be aware that the transmitter buffer is freed after transmission! */ +class RnpClientJob : public NbClientJob + { + public: + /// Default constructor + RnpClientJob() throw(); + + /** Initialization: takes the tarnsmitter buffer containing data to be send + and a pointer to a Communicator object, which will coordinate the transmission + Assert: transmitterBuffer!=0, newClientComm !=0 */ + void init(CommBuffer *transmitterBuffer, RnpBaseClientComm *newClientComm) throw(); + + /// Call-back function for the Communicator. + void processRequest() throw(); + + /** Returns a pointer to the buffer containing the answer. The buffer + holds only the RNP message, without carrier header */ + CommBuffer* getAnswerBuffer() throw(); + + /// Returns 'true' if the answer was correctly received + bool isAnswerOk() throw(); + + /// Returns true if the format of the received message is not valid RNP and was discarded + bool isInvalidFormat() throw(); + + /** Clears the answer buffer. Important if huge amount of data where received. + The buffer is cleared by the next transmission, also. */ + void clearAnswerBuffer() throw(); + protected: + /// (See the explanations from NbJob) + bool validateMessage() throw(); + void executeOnWriteReady() throw(); + void specificCleanUpOnTimeout() throw(); + void executeOnReadError() throw(); + void executeOnWriteError() throw(); + + /// Resets the object: clears the connection and marks the job as ready + void resetState() throw(); + private: + RnpBaseClientComm *clientCommPtr; + + RnpReceiver rnpReceiver; + bool answerOk; + bool invalidFormat; + }; + +/** + RnpBaseClientComm is the base class for the client communication. It offers + the necessary elements for creating the request, send it, receive the answer + and decode it. Every specific client comm will inherit from this and will + implement the various functions using the functions provided by this class. + + It has a private NbCommunicator object, but if you need a shared one + be my guest. The RnpClientJob is its own also and this stays like that! +*/ +class RnpBaseClientComm + { + public: + /// Constructor taking the server type and the carrier protocol + RnpBaseClientComm(RnpQuark serverType, RnpTransport::CarrierProtocol = RnpTransport::crp_Rnp) throw(); + + /** Constructor taking also the connection info for the server + Assert: serverHost != 0, serverPort > 0 */ + RnpBaseClientComm(const char* serverHost, int serverPort, RnpQuark serverType, RnpTransport::CarrierProtocol = RnpTransport::crp_Rnp) throw(); + + /// Destructor + virtual ~RnpBaseClientComm() throw(); + + /** Set the connection parameter + Assert: serverHost != 0, serverPort > 0 */ + void setConnectionParameters(const char* serverHost, int serverPort) throw(); + + /// Set the carrier protocol + void setCarrierProtocol(RnpTransport::CarrierProtocol) throw(); + /// Returns the used carrier protocol + RnpTransport::CarrierProtocol getCarrierProtocol() throw(); + + // callback from RnpClientJob + void jobIsReady() throw(); + + // Set the maximal retry count (retries to connect to the server) + void setMaxRetry(unsigned int newMaxRetry); + + /// Returns the maximal retry count + unsigned int getMaxRetry(); + + protected: + // stuff for helping creating the function calls + RnpQuark serverType; + RnpTransport::CarrierProtocol carrierProtocol; + + /// Start building the request, might throw whatever new throws + void startRequest(RnpQuark command, int transmitterBufferSize = RNP_DEFAULTBUFFERSIZE); + + /** Does the dirty work: sends the request and brings the answer + Later it will throw various exceptions, but for now it only + returns 'true' if everything is OK + Assert: serverHost != 0, serverPort > 0 */ + bool sendRequestGetAnswer(); + + /** detects an exception as answer and throws it. this version only Akg and STL + returns true if there is an exception, but can't reassemble it + returns false if there is a correct answer, no exception + doesn't return, but throws, if there is an exception and it can reassemble it*/ + virtual bool checkForExceptions(); + +#ifdef AFTERV52 + // reassembles and throws an AkgSerializableException. Returns if it isn't an Akg... + void reassembleAkgSerializable() throw(AkgSerializableException); + + // reassembles and throws a STL-exception. Returns only if it isn't a stl-exception + void reassembleStlException() throw(RnpStlException); +#endif + /// Clear the answer when you don't need it any more, memory is released + void clearAnswer() throw(); + + /** Default communication init, build another init() if you don't like this + This sets 1 job, 60sec as timeout, attaches the internal job. + Be aware that this timeout is not the timeout of the client job, + but the one of the communicator */ + void initDefaultCommunication() throw(); + + // encoding and decoding + RnpProtocolDecoder decoder; + RnpTransmitter encoder; + CommBuffer transmitterBuffer; // to go, use internal of encoder + + // stuff for non blocking communication + RnpClientJob clientJob; // the client job + NbCommunicator *communicatorPtr; // the communicator to be used + NbCommunicator internalCommunicator; // an internal communicator, if you dont like that you put another one + + // connection parameters + const char* serverHost; + unsigned int serverPort; + unsigned int maxRetry; + + /// Helper function for ptinting the current parameter + void printCurrentParameter() throw(); + }; + + +//############ Server side ################################### + +class RnpBaseServerComm; + +/** This class represents the RNP server job. It receives the request, sends it to 'RnpBaseServerComm' + for processing and gets from there the answer which it transmittes to the client +*/ +class RnpServerJob : public NbServerJob + { + public: + /// Default constructor + RnpServerJob() throw(); + + /** Initialization: it connects to the given 'RnpBaseServerComm' + Assert: theServerComm != 0 */ + void init(RnpBaseServerComm*) throw(); + + /// Calls the 'RnpBaseServerComm->processRequest()' and than initiates the transmission + void processRequest() throw(); + + protected: + /// (See explanations from NbJob) + bool validateMessage() throw(); + void executeOnAccept() throw(); + void executeOnWriteReady() throw(); + void specificCleanUpOnTimeout() throw(); + void executeOnReadError() throw(); + void executeOnWriteError() throw(); + + void resetJob() throw(); + + RnpBaseServerComm *serverCommPtr; + + RnpReceiver rnpReceiver; + + CommBuffer transmiterBuffer; + }; + +/** + RnpBaseServerComm is the base class for the server communication. It offers + the necessary elements for decoding the request, and for creating and transmitting + the answer. Every specific server comm will inherit from this and will + implement the various functions, most important the 'processRequest()', + using the elements provided by this class. + + It has a pool of 'RnpServerJob's which deal with the communication. Whichever has + a valid request calls 'processRequest()'. The communicator object is external +*/ + +class RnpBaseServerComm + { + public: + /// Default constructor - 1 server job + RnpBaseServerComm() throw(); + + /// Destructor + virtual ~RnpBaseServerComm() throw(); + + /** Sets the number of server jobs, only if there is no connection to a communicator + Otherwise it changes nothing and returns 'false' */ + bool setServerJobs(int nrOfServerJobs) throw(); + + /// Returns the number of server jobs + int countServerJobs() throw(); + + /// Connect to the communicator. It also creates the jobs. Throws whatever new throws. Assert: no other connection! + void connectToCommunicator(NbCommunicator&); + + /** Disconnect the jobs from the communicator and destroys them. + Returns 'false' if there wasn't any connection to a communicator */ + bool disconnectFromCommunicator() throw(); + + /// Set the transmitter buffer size + void setTransmitterBufferSize(int) throw(); + + /// Returns the transmitter buffer size + int getTransmitterBufferSize() throw(); + + /** The heart of the class. It takes the request, decodes it, sends every fragment + to the 'decodeFragment()', which has to dispatch the commands to the specific + functions. These functions have to use 'decoder' and 'encoder' to do their job and + might throw whatever is appropriate. 'processRequest()' catches 'AkgException', + 'exception' and (...) and converts them for transmission. + If you don't like this version, make another one */ + virtual void processRequest(CommBuffer *receiverBuffer, CommBuffer *transmiterBuffer, RnpTransport::CarrierProtocol, RnpServerJob *callingJob) throw(); + + /** Instructs the communicator that it should exit. Usefull to implement + 'down server' commands */ + void communicatorShouldExit() throw(); + + protected: + /** Called by 'processRequest' to dispatch to the specific functions + Might throw whatever appropriate */ + virtual void decodeFragment() = 0; + + /// Returns next parameter as string(can be NULL), verifying the parameter type. + const char* getNextAsString(RnpQuark parameterType) const; + + /// Returns next parameter as integer, verifying the parameter type. + int getNextAsInteger(RnpQuark parameterType) const; + + /// Returns next parameter as float, verifying the parameter type. + float getNextAsFloat(RnpQuark parameterType) const; + + /// Returns next parameter as double, verifying the parameter type. + double getNextAsDouble(RnpQuark parameterType) const; + + /// Returns next parameter as const void* (can be NULL), verifying the parameter type. + const void* getNextAsOpaque(RnpQuark parameterType) const; + + /// Returns the length of the data of the current parameter + int getCurrentParameterLength() const throw(); +#ifdef AFTERV52 + /// Helper function to serialize an 'AkgException' + void answerAkgSerializable(AkgSerializableException&) throw(); +#endif + /// Helper function to serialize an 'exception' (based on it's 'what()'-member + void answerSTLException(exception&) throw(); + + /// Helper function to serialize an unknown exception + void answerUnknownError() throw(); + + /// Helper function to discard a fragment + void discardFragment() throw(); + + /// Start building an OK-answer + void startOkAnswer() throw(); + + /// Just for completeness, it's only an 'encoder.endFragment()' + void endOkAnswer() throw(); + + RnpProtocolDecoder decoder; + RnpTransmitter encoder; + + private: + /** Creates a server jobs. Default is a RnpServerJob, but you might want + some other kind of job */ + virtual RnpServerJob* createJob(); + + vector<RnpServerJob*> serverJob; + + int nrServerJobs; + + NbCommunicator *communicator; + + int transmitterBufferSize; + }; + +} // namespace +#endif diff --git a/rnprotocol/rnpembedded.cc b/rnprotocol/rnpembedded.cc new file mode 100644 index 0000000..c0362bb --- /dev/null +++ b/rnprotocol/rnpembedded.cc @@ -0,0 +1,467 @@ +/* +* This file is part of rasdaman community. +* +* Rasdaman community is free software: you can redistribute it and/or modify +* it under the terms of the GNU General Public License as published by +* the Free Software Foundation, either version 3 of the License, or +* (at your option) any later version. +* +* Rasdaman community is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU General Public License for more details. +* +* You should have received a copy of the GNU General Public License +* along with rasdaman community. If not, see <http://www.gnu.org/licenses/>. +* +* Copyright 2003, 2004, 2005, 2006, 2007, 2008, 2009 Peter Baumann / +rasdaman GmbH. +* +* For more information please see <http://www.rasdaman.org> +* or contact Peter Baumann via <baumann@rasdaman.com>. +*/ +/**************************************************************************** + * + * + * COMMENTS: + * + ****************************************************************************/ + +#include <rnpembedded.hh> +#include <assert.h> + +#include "debug.hh" + + +using namespace rnp; + +const char* RnpTransport::carrierNames[] = + { + "(unknown)","RNP","HTTP","(bad)" + }; + +const char* RnpTransport::getCarrierName(RnpTransport::CarrierProtocol x) throw() + { + if(x < crp_Unknown || x >= crp_HowMany) return carrierNames[0]; + return carrierNames[x]; + } + + +const int RnpReceiver::headerBufferLength = 1000; + +RnpReceiver::RnpReceiver() throw() + { + headerBuffer.allocate(headerBufferLength); + + reset(); + } + +RnpReceiver::~RnpReceiver() throw() + { + } + +void RnpReceiver::reset() throw() + { + rnpMessageBuffer.freeBuffer(); + headerBuffer.clearToRead(); + status = waitingHeader; + carrier = RnpTransport::crp_Unknown; + } + +akg::CommBuffer* RnpReceiver::getCurrentBuffer() throw() + { + return status == readingMessage ? &rnpMessageBuffer : &headerBuffer; + } + +akg::CommBuffer* RnpReceiver::getMessageBuffer() throw() + { + return &rnpMessageBuffer; + } + +RnpTransport::CarrierProtocol +RnpReceiver::getCarrierProtocol() const throw() + { + return carrier; + } + +int RnpReceiver::getCarrierHeaderSize() const throw() + { + return carrierHeaderLength; + } + +const void* RnpReceiver::getCarrierHeader() throw() + { + return headerBuffer.getData(); + } + +bool RnpReceiver::isDiscarding() const throw() + { + return status == discarding ? true:false; + } + +bool RnpReceiver::validateMessage() throw() + { + ENTER( "RnpReceiver::validateMessage()"); + + if(status == waitingHeader) + { rnpHeader = NULL; + + if(isHttpCarrier() || isRnpCarrier()) + { // a valid carrier header was detected + if(rnpHeader != NULL) + { // we can switch to reading the message + if(prepareMessageBuffer()) + { + status = readingMessage; + } + else { + status = discarding; + LEAVE( "RnpReceiver::validateMessage() -> false - discarding message: not enough memory for message buffer."); + return false; + } + } + else + { + // status == readingHeader, but rnpHeader == NULL + // so we wait for some more message + LEAVE( "RnpReceiver::validateMessage - wait for more message(s)"); + return false; + } + } + else + { status = discarding; + LEAVE( "RnpReceiver::validateMessage() -> false - discarding message: no valid carrier header."); + return false; + } + } + + if(status == readingMessage) + { + if(rnpMessageBuffer.getNotFilledSize() != 0) + { + LEAVE( "RnpReceiver::validateMessage() -> false"); + return false; + } + } + if(status == discarding) + { + TALK( "RnpReceiver::validateMessage - discarding(3)." ); + headerBuffer.clearToRead(); + LEAVE( "RnpReceiver::validateMessage() -> false"); + return false; + } + LEAVE( "RnpReceiver::validateMessage() -> true"); + return true; + } + +/* the isXXXCarrier() functions have to: + - return true if the message is or might be an XXX embedded RnpMessage + - set rnpHeader only if there is a valid carrier header + - set carrierHeaderLength in this case + + so: + - invalid carrier returns false / rnpHeader == NULL + - valid carrier but not rnp - false /rnpHeader != NULL + - valid carrier but not enough data to be sure it's also valid rnp - true/rnpHeader == NULL + - valid carrier & valid rnp header =>true/rnpHeader != NULL carrierHeaderLength set +*/ + +bool RnpReceiver::isHttpCarrier() throw() + { + ENTER( "RnpReceiver::isHttpCarrier()" ); + + char *data = (char*)headerBuffer.getData(); + + rnpHeader = NULL; + carrierHeaderLength = -1; + + bool isHttpReqHeader = strncmp(data,"POST RnpMessage HTTP/1.1",24) ? false : true; + bool isHttpAnsHeader = strncmp(data,"HTTP/1.1 200 OK" ,15) ? false : true; + + if(isHttpReqHeader == false && isHttpAnsHeader == false) + { + LEAVE( "RnpReceiver::isHttpCarrier() -> false" ); + return false; + } + + char *eoHttp = strstr(data,"\r\n\r\n"); + + if(eoHttp == NULL) + { + LEAVE( "RnpReceiver::isHttpCarrier() -> false" ); + return false; + } + + carrierHeaderLength = eoHttp - data + 4; + + if( carrierHeaderLength == -1) + { + LEAVE( "RnpReceiver::isHttpCarrier() -> false" ); + return false; + } + + if(carrierHeaderLength + sizeof(RnpHeader) > headerBuffer.getDataSize()) + { // is HTTP carrier, but we need more data to say if it's an embedded RnpMessage + LEAVE( "RnpReceiver::isHttpCarrier() -> true" ); + return true; + } + + rnpHeader = (RnpHeader*)(data + carrierHeaderLength); + + bool isRnp = rnpHeader->isRnpMessage(); + + if(isRnp) + { carrier = RnpTransport::crp_Http; + TALK( "RnpReceiver::isHttpCarrier - valid HTTP carrier detected."); + } + + LEAVE( "RnpReceiver::isHttpCarrier() -> " << isRnp); + return isRnp; + } + +bool RnpReceiver::isRnpCarrier() throw() + { + ENTER( "RnpReceiver::isRnpCarrier()" ); + + char *data = (char*)headerBuffer.getData(); + + rnpHeader = NULL; + carrierHeaderLength = -1; + + if(sizeof(RnpHeader) > headerBuffer.getDataSize()) + { // we need more data to say if it's an RnpMessage + LEAVE( "RnpReceiver::isRnpCarrier() -> true" ); + return true; + } + + rnpHeader = (RnpHeader*)data; + carrierHeaderLength = 0; + + bool isRnp = rnpHeader->isRnpMessage(); + + if(isRnp) + { carrier = RnpTransport::crp_Rnp; + TALK( "RnpReceiver::isRnpCarrier - valid RNP carrier detected!"); + } + + LEAVE( "RnpReceiver::isRnpCarrier() -> " << isRnp ); + return isRnp; + } + +bool RnpReceiver::prepareMessageBuffer() throw() + { + if(rnpMessageBuffer.allocate(rnpHeader->getTotalLength()) == false) + return false; + + char *data = (char*)headerBuffer.getData(); + + rnpMessageBuffer.read(data + carrierHeaderLength, headerBuffer.getDataSize() - carrierHeaderLength); + + RnpHeader *nRnpHeader = (RnpHeader*)rnpMessageBuffer.getData(); + + return true; + } + +//######################################################################################################## + +RnpTransmitter::RnpTransmitter() throw() + { + carrier = NULL; + carrierType = RnpTransport::crp_Http; + } + +RnpTransmitter::~RnpTransmitter() throw() + { + if(carrier != NULL) delete carrier; + } + +bool RnpTransmitter::startRequest(RnpQuark serverType, RnpTransport::CarrierProtocol desiredProtocol) throw() + { + getCarrierObject(desiredProtocol); + + if(carrier == NULL) return false; + + startMessage(serverType, carrier->getRequestHeaderLength()); + + return true; + } + +bool RnpTransmitter::startAnswer(RnpQuark serverType, RnpTransport::CarrierProtocol desiredProtocol) throw() + { + getCarrierObject(desiredProtocol); + + if(carrier == NULL) return false; + + startMessage(serverType, carrier->getAnswerHeaderLength()); + + return true; + } + +akg::CommBuffer* RnpTransmitter::endMessage() throw() + { + if(carrier == NULL) return NULL; + + akg::CommBuffer *theBuffer = RnpProtocolEncoder::endMessage(); + carrier->putHeader(theBuffer); + + return theBuffer; + } + +RnpTransport::CarrierProtocol +RnpTransmitter::getCarrierProtocol() throw() + { + return carrierType; + } + +RnpCarrier* RnpTransmitter::getCarrierObject(RnpTransport::CarrierProtocol desiredProtocol) throw() + { + carrierType = desiredProtocol; + + if(carrier != NULL) delete carrier; + + switch(carrierType) + { + case RnpTransport::crp_Rnp : carrier = new RnpCarrier;break; + case RnpTransport::crp_Http: carrier = new HttpRnpCarrier;break; + + case RnpTransport::crp_BadCarrier: + carrier = new BadRnpCarrier;break; + + default : carrier = NULL;break; + } + + return carrier; + } + +int RnpTransmitter::getBufferSize() const throw() + { + if ( ! ( commBuffer != 0 ) ) + { + TALK( "RnpTransmitter::getBufferSize(): warning: assert will fire." ); + } + assert(commBuffer != 0); + return commBuffer->getBufferSize(); + } + +int RnpTransmitter::getNotFilledSize() const throw() + { + if ( ! ( commBuffer != 0 ) ) + { + TALK( "RnpTransmitter::getNotFilledSize(): warning: assert will fire." ); + } + assert(commBuffer != 0); + return commBuffer->getNotFilledSize(); + } + +int RnpTransmitter::getDataSize() const throw() + { + if ( ! ( commBuffer != 0 ) ) + { + TALK( "RnpTransmitter::getDataSize(): warning: assert will fire." ); + } + assert(commBuffer != 0); + return commBuffer->getDataSize(); + } + +//################################################ + +RnpCarrier::RnpCarrier() throw() + { + type = RnpTransport::crp_Rnp; + } + +RnpCarrier::~RnpCarrier() throw() + { + } + +RnpTransport::CarrierProtocol RnpCarrier::getType() throw() + { + return type; + } + +int RnpCarrier::getRequestHeaderLength() throw() + { + requestHeader = true; + return 0; + } + +int RnpCarrier::getAnswerHeaderLength() throw() + { + requestHeader = false; + return 0; + } + +void RnpCarrier::putHeader(akg::CommBuffer*) throw() + { // nothing!! + } + + +HttpRnpCarrier::HttpRnpCarrier() throw() + { + type = RnpTransport::crp_Http; + } + + +int HttpRnpCarrier::getRequestHeaderLength() throw() + { + requestHeader = true; + return strlen(theRequestHeader); + } + +int HttpRnpCarrier::getAnswerHeaderLength() throw() + { + requestHeader = false; + return strlen(theAnswerHeader); + } + +void HttpRnpCarrier::putHeader(akg::CommBuffer *messageBuffer) throw() + { + char *data = (char*)messageBuffer->getData(); + + const char *header = requestHeader ? theRequestHeader : theAnswerHeader; + int headerLength = strlen(header); + int posOfLength = headerLength - 14; + + strncpy(data,header,posOfLength); + + sprintf(data + posOfLength, "%10d",messageBuffer->getDataSize() - headerLength); + + strncpy(data + headerLength - 4,"\r\n\r\n",4); // it shouldn't be null terminated! + } + +const char HttpRnpCarrier::theRequestHeader[]= + "POST RnpMessage HTTP/1.1\r\nAccept: bin/rnp\r\nUserAgent: RnpClient/1.0\r\nContent-length: uxxxyyyzzz\r\n\r\n"; + // ^10 digits + +const char HttpRnpCarrier::theAnswerHeader[]= + "HTTP/1.1 200 OK\r\nContent-type: bin/rnp\r\nContent-length: uxxxyyyzzz\r\n\r\n"; + + + +BadRnpCarrier::BadRnpCarrier() throw() + { + type = RnpTransport::crp_BadCarrier; + } + +int BadRnpCarrier::getRequestHeaderLength() throw() + { + requestHeader = true; + return strlen(theHeader); + } + +int BadRnpCarrier::getAnswerHeaderLength() throw() + { + requestHeader = false; + return strlen(theHeader); + } + +void BadRnpCarrier::putHeader(akg::CommBuffer *messageBuffer) throw() + { + char *data = (char*)messageBuffer->getData(); + + strncpy(data,theHeader,strlen(theHeader)); + } + +const char BadRnpCarrier::theHeader[]="BadCarrier"; + + diff --git a/rnprotocol/rnpembedded.hh b/rnprotocol/rnpembedded.hh new file mode 100644 index 0000000..2e7c454 --- /dev/null +++ b/rnprotocol/rnpembedded.hh @@ -0,0 +1,256 @@ +#ifndef RNPEMBEDDED_HH +#define RNPEMBEDDED_HH +/* +* This file is part of rasdaman community. +* +* Rasdaman community is free software: you can redistribute it and/or modify +* it under the terms of the GNU General Public License as published by +* the Free Software Foundation, either version 3 of the License, or +* (at your option) any later version. +* +* Rasdaman community is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU General Public License for more details. +* +* You should have received a copy of the GNU General Public License +* along with rasdaman community. If not, see <http://www.gnu.org/licenses/>. +* +* Copyright 2003, 2004, 2005, 2006, 2007, 2008, 2009 Peter Baumann / +rasdaman GmbH. +* +* For more information please see <http://www.rasdaman.org> +* or contact Peter Baumann via <baumann@rasdaman.com>. +*/ +/**************************************************************************** + * + * + * COMMENTS: + * + * + ****************************************************************************/ +#include "rnprotocol/rnprotocol.hh" + +namespace rnp + { + +/** RNP messages may be embedded in messages of the carrier protocol + We will use for now only HTTP carrier or none at all + But we define a BadCarrier for testing purposes +*/ + + +/** Class containing definitions and some helper functions +*/ +class RnpTransport + { + public: + enum CarrierProtocol + { + crp_Unknown, + crp_Rnp, + crp_Http, + crp_BadCarrier, + //.... + crp_HowMany + }; + + static const char* getCarrierName(CarrierProtocol) throw(); + private: + static const char* carrierNames[]; + }; + +/** + RnpReceiver is a class that is able to receive an message in a CommBuffer and decide if it is a + valid Rnp message, embedded or not. + + It is designed to be used by a NbJob for receiving the message and cooperate with it. + + If it is an invalid message or the message buffer can't be allocated, + the rest of the message is discarded and the NbJob has to close the connection and do appropriate cleaning + + The receiver has two buffers, a fixed length one, for header and a dynamic one for the RNP message +*/ + +class RnpReceiver + { + public: + /// Default constructor + RnpReceiver() throw(); + + /// Destructor + ~RnpReceiver() throw(); + + /// Resets the receiver, preparing for a new message + void reset() throw(); + + /// Returns a pointer to the current buffer (the header one or the message one) + akg::CommBuffer* getCurrentBuffer() throw(); + + /** Returns a pointer to the message buffer, which contains the RNP message, + whitout any carrier header */ + akg::CommBuffer* getMessageBuffer() throw(); + + /// Returns 'true' if the whole message was received, 'false' if more data is expected + bool validateMessage() throw(); + + /** Returns 'true' if an error occured and the message has to be discarded + If validate()==false and isDiscarding()==true => NbJob has to reset receiver and close connection*/ + bool isDiscarding() const throw(); + + /// Returns the type of the carrier protocol + RnpTransport::CarrierProtocol getCarrierProtocol() const throw(); + + /// Returns the size of the carrier header + int getCarrierHeaderSize() const throw(); + + /// Returns a pointer to the carrier header + const void* getCarrierHeader() throw(); + + private: + + enum Status + { + waitingHeader, + readingMessage, + discarding + }; + + + Status status; + + akg::CommBuffer headerBuffer; + akg::CommBuffer rnpMessageBuffer; + + RnpHeader *rnpHeader; + + RnpTransport::CarrierProtocol carrier; + int carrierHeaderLength; + + static const int headerBufferLength; + + bool isHttpCarrier() throw(); + bool isRnpCarrier() throw(); + bool prepareMessageBuffer() throw(); + + }; + +class RnpCarrier; + +/** Class for creating an embedded RNP message. Most methods are inherited + from RnpProtocolEncoder, it offers just convenient methods for + dealing with carriers +*/ +class RnpTransmitter : public RnpProtocolEncoder + { + public: + /// Default constructor + RnpTransmitter() throw(); + + /// Destructor + ~RnpTransmitter() throw(); + + /// Starts a new message, as a request, embedded in a specified protocol + bool startRequest(RnpQuark serverType, RnpTransport::CarrierProtocol) throw(); + + /// Starts a new message, as an answer, embedded in a specified protocol + bool startAnswer(RnpQuark serverType, RnpTransport::CarrierProtocol) throw(); + + /// ends the message, puts the carrier headers and, if requested, changes endianness + akg::CommBuffer* endMessage() throw(); + + /// Returns the carrier protocol + RnpTransport::CarrierProtocol getCarrierProtocol() throw(); + + /// Returns the total size of the buffer + int getBufferSize() const throw(); + + /// Return the space left in the buffer + int getNotFilledSize() const throw(); + + /// Returns the data size in the buffer + int getDataSize() const throw(); + private: + + RnpTransport::CarrierProtocol carrierType; + + /** Creates and returns a RnpCarrier object, based on the type got as parameter + It assignes the object to 'carrier' and it also destroys the previous + assigned object + */ + RnpCarrier* getCarrierObject(RnpTransport::CarrierProtocol) throw(); + RnpCarrier* carrier; + }; + +/** Base class for the various carriers, is itself the RNP carrier +*/ +class RnpCarrier + { + public: + /// Default constructor + RnpCarrier() throw(); + + /// Virtual destructor + virtual ~RnpCarrier() throw(); + + /// Returns the type of the object + RnpTransport::CarrierProtocol getType() throw(); + + /// Returns the length of the request header + virtual int getRequestHeaderLength() throw(); + + /// Returns the length of the answer header + virtual int getAnswerHeaderLength() throw(); + + /** Write the header directly into the reserved space of the buffer, + since the rest of the message is already there*/ + virtual void putHeader(akg::CommBuffer*) throw(); + + protected: + /// The type of the carrier + RnpTransport::CarrierProtocol type; + + /// Flag for 'putHeader' to know which header to write + bool requestHeader; + + }; + +/** The HTTP-carrier +*/ +class HttpRnpCarrier : public RnpCarrier + { + public: + /// Default constructor + HttpRnpCarrier() throw(); + + /// Returns the length of the request header + int getRequestHeaderLength() throw(); + + /// Returns the length of the answer header + int getAnswerHeaderLength() throw(); + + /// Writes the header into the buffer + void putHeader(akg::CommBuffer*) throw(); + + private: + static const char theRequestHeader[]; + static const char theAnswerHeader[]; + }; + +/** A 'bad carrier', just for testing purposes +*/ +class BadRnpCarrier : public RnpCarrier + { + public: + BadRnpCarrier() throw(); + + int getRequestHeaderLength() throw(); + int getAnswerHeaderLength() throw(); + void putHeader(akg::CommBuffer*) throw(); + + private: + static const char theHeader[]; + }; + +} //namespace +#endif diff --git a/rnprotocol/rnprasserver.cc b/rnprotocol/rnprasserver.cc new file mode 100644 index 0000000..f877403 --- /dev/null +++ b/rnprotocol/rnprasserver.cc @@ -0,0 +1,72 @@ +/* +* This file is part of rasdaman community. +* +* Rasdaman community is free software: you can redistribute it and/or modify +* it under the terms of the GNU General Public License as published by +* the Free Software Foundation, either version 3 of the License, or +* (at your option) any later version. +* +* Rasdaman community is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU General Public License for more details. +* +* You should have received a copy of the GNU General Public License +* along with rasdaman community. If not, see <http://www.gnu.org/licenses/>. +* +* Copyright 2003, 2004, 2005, 2006, 2007, 2008, 2009 Peter Baumann / +rasdaman GmbH. +* +* For more information please see <http://www.rasdaman.org> +* or contact Peter Baumann via <baumann@rasdaman.com>. +*/ +#include "debug.hh" + +#include "rnprasserver.hh" + +// forever and ever +const RnpQuark RnpRasserver::serverID = 3072002; + + +const char* RnpRasserver::parameterTypeNames[pmt_HowMany] = + { + "(pmt_none)", "pmt_clientid" + }; + +const char* RnpRasserver::commandNames[cmd_HowMany] = + { + "(cmd_none)", "cmd_connect", "cmd_disconnect", + "cmd_opendb", "cmd_closedb", "cmd_beginta", + "cmd_committa", "cmd_abortta", "cmd_istaopen", + "cmd_queryhttp", "cmd_getnewoid", "cmd_queryrpc", + "cmd_getnextelem", "cmd_endtransfer", "cmd_getnextmdd", + "cmd_getnexttile", "cmd_updaterpc", "cmd_startinsTmdd", + "cmd_inserttile", "cmd_endinsmdd", "cmd_initupdate", + "cmd_gettypestruct", "cmd_startinsPmdd", "cmd_insertmdd", + "cmd_insertcoll", "cmd_removeobjfromcoll", "cmd_delobjbyoid", + "cmd_delcollbyname", "cmd_getcoll", "cmd_getcolloids", + "cmd_getobjecttype", "cmd_setformat", + + "cmd_createcollection", "cmd_createmdd", "cmd_extendmdd", + "cmd_gettiledomains" + }; + +const char* RnpRasserver::getParameterTypeName(RnpQuark pType) const throw() + { + if(0<= pType && pType < pmt_HowMany) return parameterTypeNames[pType]; + return undefValue; + } + +const char* RnpRasserver::getCommandName(RnpQuark cmd) const throw() + { + const char *result = NULL; + + if(0<= cmd && cmd < cmd_HowMany) + result = commandNames[cmd]; + else + result = undefValue; + + TALK( "RnpRasserver::getCommandName( " << cmd << " ) -> " << result ); + return result; + } + diff --git a/rnprotocol/rnprasserver.hh b/rnprotocol/rnprasserver.hh new file mode 100644 index 0000000..44b6473 --- /dev/null +++ b/rnprotocol/rnprasserver.hh @@ -0,0 +1,124 @@ +/* +* This file is part of rasdaman community. +* +* Rasdaman community is free software: you can redistribute it and/or modify +* it under the terms of the GNU General Public License as published by +* the Free Software Foundation, either version 3 of the License, or +* (at your option) any later version. +* +* Rasdaman community is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU General Public License for more details. +* +* You should have received a copy of the GNU General Public License +* along with rasdaman community. If not, see <http://www.gnu.org/licenses/>. +* +* Copyright 2003, 2004, 2005, 2006, 2007, 2008, 2009 Peter Baumann / +rasdaman GmbH. +* +* For more information please see <http://www.rasdaman.org> +* or contact Peter Baumann via <baumann@rasdaman.com>. +*/ +#ifndef RNPRASSERVER_HH +#define RNPRASSERVER_HH + +#include "rnprotocol/rnprotocol.hh" + +using namespace rnp; + +class RnpRasserver : public Rnp + { + public: + + static const RnpQuark serverID; // 3072002 + enum ParameterType + { + pmt_none, + pmt_clientid = 1, + pmt_rErrorString = 2, + pmt_dbname = 3, + pmt_accesmode = 4, + pmt_querystring = 5, + pmt_httpqanswer = 6, + pmt_oidstring = 7, + pmt_capability = 8, + pmt_transstatus = 9, + pmt_objecttype =10, + pmt_returnstatus =11, + pmt_skalarobject =12, + pmt_tiledata =13, + pmt_domain =14, + pmt_typename =15, + pmt_typelength =16, + pmt_typetype =17, + pmt_typestructure = 18, + pmt_collname = 19, + pmt_whichformat = 20, + pmt_format = 21, + pmt_formatparams = 22, + pmt_currentformat = 23, + pmt_storageformat = 24, + pmt_ispersistent = 25, + pmt_errorno = 26, + pmt_lineno = 27, + pmt_columnno = 28, + pmt_errortoken = 29, + pmt_indextype = 30, + //....... + pmt_HowMany + }; + + enum Command + { + cmd_none, + cmd_connect = 1, + cmd_disconnect = 2, + cmd_opendb = 3, + cmd_closedb = 4, + cmd_beginta = 5, + cmd_committa = 6, + cmd_abortta = 7, + cmd_istaopen = 8, + cmd_queryhttp = 9, + cmd_getnewoid = 10, + cmd_queryrpc = 11, + cmd_getnextelem = 12, + cmd_endtransfer = 13, + cmd_getnextmdd = 14, + cmd_getnexttile = 15, + cmd_updaterpc = 16, + cmd_startinsTmdd = 17, + cmd_inserttile = 18, + cmd_endinsmdd = 19, + cmd_initupdate = 20, + cmd_gettypestruct = 21, + cmd_startinsPmdd = 22, + cmd_insertmdd = 23, + cmd_insertcoll = 24, + cmd_remobjfromcoll = 25, + cmd_delobjbyoid = 26, + cmd_delcollbyname = 27, + cmd_getcoll = 28, + cmd_getcolloids = 29, + cmd_getobjecttype = 30, + cmd_setformat = 31, + //-- until here the compatibility functions -- + + cmd_createcollection = 32, + cmd_createmdd = 33, + cmd_extendmdd = 34, + cmd_gettiledomains = 35, + cmd_HowMany + }; + + const char* getParameterTypeName(RnpQuark) const throw(); + const char* getCommandName(RnpQuark) const throw(); + + protected: + /// Arrays containing the names of the various elements + static const char* parameterTypeNames[pmt_HowMany]; + static const char* commandNames[cmd_HowMany]; + + }; +#endif diff --git a/rnprotocol/rnprotocol.cc b/rnprotocol/rnprotocol.cc new file mode 100644 index 0000000..ce0deec --- /dev/null +++ b/rnprotocol/rnprotocol.cc @@ -0,0 +1,819 @@ +/* +* This file is part of rasdaman community. +* +* Rasdaman community is free software: you can redistribute it and/or modify +* it under the terms of the GNU General Public License as published by +* the Free Software Foundation, either version 3 of the License, or +* (at your option) any later version. +* +* Rasdaman community is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU General Public License for more details. +* +* You should have received a copy of the GNU General Public License +* along with rasdaman community. If not, see <http://www.gnu.org/licenses/>. +* +* Copyright 2003, 2004, 2005, 2006, 2007, 2008, 2009 Peter Baumann / +rasdaman GmbH. +* +* For more information please see <http://www.rasdaman.org> +* or contact Peter Baumann via <baumann@rasdaman.com>. +*/ +/**************************************************************************** + * + * + * COMMENTS: + * + ****************************************************************************/ + +#include <rnprotocol.hh> +#include <assert.h> + +using namespace rnp; +using namespace std; + +#include "raslib/rminit.hh" +#include "debug.hh" + + +const RnpQuark Rnp::rnpProtocolId = 25112001; + + +const char* Rnp::endiannessNames[2]= + { + "big endian", "little endian" + }; + +const char* Rnp::fragmentTypeNames[Rnp::fgt_HowMany]= + { + "(fgt_none)", "Command", "OkAnswer", "ErrorAnswer" + }; + +const char* Rnp::dataTypeNames[Rnp::dtt_HowMany] = + { + "(dtt_none)", "Asciiz", "Int32","Float","Double","Opaque","NullPtr" + }; + +const char* Rnp::errorTypeNames[Rnp::ert_HowMany] = + { + "(ert_unknown)", "StlException", "OtherException" + }; + +const char* Rnp::errorParamNames[Rnp::erp_HowMany] = + { + "(erp_none)", "StlWhatValue" + }; + +const char* Rnp::undefValue = "(undef)"; + +const char* Rnp::getFragmentTypeName(RnpQuark fType) throw() + { + if(0<= fType && fType < fgt_HowMany) return fragmentTypeNames[fType]; + return undefValue; + } + +const char* Rnp::getDataTypeName(RnpQuark dType) throw() + { + if(0<= dType && dType < dtt_HowMany) return dataTypeNames[dType]; + return undefValue; + } + +const char* Rnp::getErrorTypeName(RnpQuark eType) throw() + { + if(0<= eType && eType < ert_HowMany) return errorTypeNames[eType]; + return undefValue; + } + +const char* Rnp::getErrorParamName(RnpQuark eName) throw() + { + if(0<= eName && eName < erp_HowMany) return errorParamNames[eName]; + return undefValue; + } + +const char* Rnp::getEndiannessName(Rnp::Endianness endianness) throw() + { + return endiannessNames[endianness]; + } + +Rnp::Endianness Rnp::detectHostEndianness() throw() + { + unsigned int uInteger = 1; + + char* ptr = (char*)&uInteger; + + return *ptr==1 ? littleEndian : bigEndian; + } + +RnpQuark Rnp::swapBytes(RnpQuark orig) throw() + { + RnpQuark result = orig; + + char *buf = (char*)&result; + + char tmp = buf[0]; + buf[0] = buf[3]; + buf[3] = tmp; + + tmp = buf[1]; + buf[1] = buf[2]; + buf[2] = tmp; + + return result; + } + +//############## RNP Header ################################# + +bool RnpHeader::isRnpMessage() const throw() + { + Rnp::Endianness hostEndianness = Rnp::detectHostEndianness(); + + RnpQuark cProtocolId = hostEndianness == Rnp::littleEndian ? protocolId : Rnp::swapBytes(protocolId); + + return cProtocolId == Rnp::rnpProtocolId ? true:false; + } + +Rnp::Endianness RnpHeader::getEndianness() const throw() + { + return (Rnp::Endianness)messageEndianness; + } + +RnpQuark RnpHeader::getTotalLength() const throw() + { + Rnp::Endianness endianness = (Rnp::Endianness)messageEndianness; + return endianness == Rnp::detectHostEndianness() ? totalMessageLength : Rnp::swapBytes(totalMessageLength); + } + +bool RnpHeader::changeEndianness(Rnp::Endianness newEndianness) throw() + { + Rnp::Endianness endianness = (Rnp::Endianness)messageEndianness; + + if(newEndianness == endianness) return false; // no change necessary + + messageEndianness = newEndianness; + + totalMessageLength = Rnp::swapBytes(totalMessageLength); + nrFragments = Rnp::swapBytes(nrFragments); + serverType = Rnp::swapBytes(serverType); + authInfoStart = Rnp::swapBytes(authInfoStart); + authInfoLength = Rnp::swapBytes(authInfoLength); + comprInfoStart = Rnp::swapBytes(comprInfoStart); + comprInfoLength = Rnp::swapBytes(comprInfoLength); + dataStart = Rnp::swapBytes(dataStart); + dataLength = Rnp::swapBytes(dataLength); + + return true; + } + +RnpFragmentHeader* RnpHeader::getFirstFragment() const throw() + { + return (RnpFragmentHeader*)((char*)this + dataStart); + } + +//############ RNP Fragment ################################## + +RnpFragmentHeader* RnpFragmentHeader::getNextFragment() const throw() + { + char *ptr = (char*)this; + + return (RnpFragmentHeader*)(ptr + totalLength); + } + +RnpParameter* RnpFragmentHeader::getFirstParameter() const throw() + { + return (RnpParameter*)(this+1); + } + +void RnpFragmentHeader::changeEndianness() throw() + { + // there is no info about the initial endianness so be carefull + fragmType = Rnp::swapBytes(fragmType); + command = Rnp::swapBytes(command); + nrParams = Rnp::swapBytes(nrParams); + totalLength = Rnp::swapBytes(totalLength); + } + +//############ RNP Parameter ################################## +RnpParameter* RnpParameter::getNextParameter() const throw() + { + char *ptr = (char*)this; + return (RnpParameter*)(ptr + totalLength); + } + +void* RnpParameter::getData() const throw() + { + return (void*)(this+1); + } + +RnpQuark RnpParameter::getDataLength() const throw() + { + return dataLength; + } + +RnpQuark RnpParameter::computeTotalAlignedLength() throw() + { + totalLength = (sizeof(RnpParameter) + dataLength + 3) & 0xFFFFFFFC; + return totalLength; + } + +RnpQuark RnpParameter::getPaddLength() const throw() + { + return totalLength - (sizeof(RnpParameter) + dataLength); + } + +void RnpParameter::changeToHostEndianness() throw() + { + // there is no info about the initial endianness so be carefull + paramType = Rnp::swapBytes(paramType); + dataType = Rnp::swapBytes(dataType); + dataLength = Rnp::swapBytes(dataLength); + totalLength = Rnp::swapBytes(totalLength); + + RnpQuark *valPtr = (RnpQuark*)getData(); + switch(dataType) + { + case Rnp::dtt_Int32: + case Rnp::dtt_Float32: *valPtr = Rnp::swapBytes(*valPtr); + break; + + case Rnp::dtt_Double64: RnpQuark temp = Rnp::swapBytes(*valPtr); + *valPtr = Rnp::swapBytes(*(valPtr +1)); + *(valPtr+1) = temp; + break; + } + } + +void RnpParameter::changeToPartnerEndianness() throw() + { + // there is no info about the initial endianness so be careful + + RnpQuark *valPtr = (RnpQuark*)getData(); + switch(dataType) + { + case Rnp::dtt_Int32: + case Rnp::dtt_Float32: *valPtr = Rnp::swapBytes(*valPtr); + break; + + case Rnp::dtt_Double64: RnpQuark temp = Rnp::swapBytes(*valPtr); + *valPtr = Rnp::swapBytes(*(valPtr +1)); + *(valPtr+1) = temp; + break; + } + paramType = Rnp::swapBytes(paramType); + dataType = Rnp::swapBytes(dataType); + dataLength = Rnp::swapBytes(dataLength); + totalLength = Rnp::swapBytes(totalLength); + } + +//########################################################### + +RnpProtocolEncoder::RnpProtocolEncoder() throw() + { + commBuffer = NULL; + + rnpHeader = NULL; + currFragment = NULL; + currParameter = NULL; + allocated = false; + carrierHeaderSize = 0; + finalEndianness = Rnp::detectHostEndianness(); + } + +RnpProtocolEncoder::~RnpProtocolEncoder() throw() + { + if(commBuffer != NULL && allocated == true) delete commBuffer; + // the other pointers are not pointing to allocated memory!! + } + +void RnpProtocolEncoder::setBuffer(akg::CommBuffer* buffer) throw() + { + if(commBuffer != NULL && allocated == true) delete commBuffer; + + commBuffer = buffer; + allocated = false; + } + +bool RnpProtocolEncoder::allocateBuffer(int maxMessageLength) throw() + { + if(commBuffer != NULL && allocated ==true) delete commBuffer; + + commBuffer = new akg::CommBuffer(maxMessageLength); + allocated = true; + return true; + } + +bool RnpProtocolEncoder::adjustBufferSize(int differenceSize) throw() + { + ENTER( "RnpProtocolEncoder::adjustBufferSize( differenceSize=" << differenceSize << " )" ); + + if (commBuffer == 0) + { + TALK( "RnpProtocolEncoder::adjustBufferSize(): warning: null commBuffer, assert would fire." ); + return false; + } + assert(commBuffer != 0); + + if (differenceSize <= 0) + { + TALK( "RnpProtocolEncoder::adjustBufferSize(): warning: nonpositive differenceSize, assert would fire." ); + return false; + } + assert(differenceSize > 0); + + // we need to adjust the pointers to the new location + char *orig = (char*)(commBuffer->getData()); + char *head = (char*)rnpHeader; + char *frag = (char*)currFragment; + char *para = (char*)currParameter; + + if(commBuffer->resize(commBuffer->getDataSize() + differenceSize + RNP_DEFAULTBUFFERSIZE) == true) + { + char *final = (char*)(commBuffer->getData()); + rnpHeader = (RnpHeader*)(final + (head - orig)); + currFragment = (RnpFragmentHeader*)(final + (frag - orig)); + currParameter= (RnpParameter*)(final + (para - orig)); + LEAVE( "RnpProtocolEncoder::adjustBufferSize() -> true" ); + return true; + } + + LEAVE( "RnpProtocolEncoder::adjustBufferSize() -> false" ); + return false; + } + +int RnpProtocolEncoder::getBufferSize() throw() + { + if (commBuffer == 0) + { + TALK( "RnpProtocolEncoder::getBufferSize(): warning: null commBuffer, assert will fire." ); + } + assert(commBuffer != 0); + + return commBuffer->getBufferSize(); + } + +void RnpProtocolEncoder::startMessage(RnpQuark serverType, int nCarrierHeaderSize) throw() + { + if (commBuffer == 0) + { + TALK( "RnpProtocolEncoder::startMessage(): warning: null commBuffer, assert will fire." ); + } + assert(commBuffer != NULL); + + carrierHeaderSize = nCarrierHeaderSize; + + commBuffer->clearToRead(); + + commBuffer->reserve(sizeof(RnpHeader) + carrierHeaderSize); + + rnpHeader = (RnpHeader*)((char*)commBuffer->getData() + carrierHeaderSize); + + Rnp::Endianness hostEndianness = Rnp::detectHostEndianness(); + + // the protocolID is always 25112001 little endian!! + rnpHeader->protocolId = hostEndianness == Rnp::littleEndian ? Rnp::rnpProtocolId : Rnp::swapBytes(Rnp::rnpProtocolId); + + rnpHeader->messageEndianness = hostEndianness; + rnpHeader->desiredEndianness = hostEndianness; + rnpHeader->majorVersion = 1; + rnpHeader->minorVersion = 0; + rnpHeader->totalMessageLength = sizeof(RnpHeader); + rnpHeader->nrFragments = 0; + rnpHeader->serverType = serverType; + rnpHeader->authInfoStart = 0; + rnpHeader->authInfoLength = 0; + rnpHeader->comprInfoStart = 0; + rnpHeader->comprInfoLength = 0; + rnpHeader->dataStart = sizeof(RnpHeader); + rnpHeader->dataLength = 0; + + currFragment = (RnpFragmentHeader*)(rnpHeader+1); + + } + +int RnpProtocolEncoder::getCarrierHeaderSize() throw() + { + return carrierHeaderSize; + } + +void RnpProtocolEncoder::setDesiredEndianness(Rnp::Endianness desiredEndianness) throw() + { + rnpHeader->desiredEndianness = desiredEndianness; + } +void RnpProtocolEncoder::setFinalEndianness(Rnp::Endianness endianness) throw() + { + finalEndianness = endianness; + } + +void RnpProtocolEncoder::startFragment(Rnp::FragmentType fType,RnpQuark command) throw() + { + commBuffer->reserve(sizeof(RnpFragmentHeader)); + + currFragment->fragmType = fType; + currFragment->command = command; + currFragment->nrParams = 0; + currFragment->totalLength = sizeof(RnpFragmentHeader); + + currParameter = currFragment->getFirstParameter(); + rnpHeader->nrFragments++; + + } + +void RnpProtocolEncoder::addStringParameter(RnpQuark parameterType,const char* str) throw() + { + if(str != 0) addParameter(parameterType, Rnp::dtt_Asciiz, str, strlen(str)+1); + else addParameter(parameterType, Rnp::dtt_NullPtr, str, 0); + } + +void RnpProtocolEncoder::addInt32Parameter(RnpQuark parameterType, int par) throw() + { + addParameter(parameterType, Rnp::dtt_Int32, &par, sizeof(par)); + } + +void RnpProtocolEncoder::addFloat32Parameter(RnpQuark parameterType, float par) throw() + { + addParameter(parameterType, Rnp::dtt_Float32, &par, sizeof(par)); + } +void RnpProtocolEncoder::addDouble64Parameter(RnpQuark parameterType, double par) throw() + { + addParameter(parameterType, Rnp::dtt_Double64, &par, sizeof(par)); + } + +void RnpProtocolEncoder::addOpaqueParameter(RnpQuark parameterType, const void *buf, int size) throw() + { + if(buf != 0) addParameter(parameterType, Rnp::dtt_Opaque, buf, size); + else addParameter(parameterType, Rnp::dtt_NullPtr, buf, 0); + } + +void RnpProtocolEncoder::addParameter(RnpQuark parameterType, Rnp::DataType dtt, const void *data, int length) throw() + { + commBuffer->reserve(sizeof(RnpParameter)); + + currParameter->paramType = parameterType; + currParameter->dataType = dtt; + currParameter->dataLength = length; + currParameter->totalLength = currParameter->computeTotalAlignedLength(); + + int paddlen = currParameter->getPaddLength(); + + if(data != 0 ) commBuffer->read(data,length); + if(paddlen) commBuffer->reserve(paddlen); + + currFragment->nrParams++; + currFragment->totalLength += currParameter->totalLength; + currParameter = currParameter->getNextParameter(); + } + +void RnpProtocolEncoder::endFragment() throw() + { + rnpHeader->totalMessageLength += currFragment->totalLength; + rnpHeader->dataLength += currFragment->totalLength; + currFragment = currFragment->getNextFragment(); + } + +akg::CommBuffer* RnpProtocolEncoder::endMessage() throw() + { + changeToPartnerEndianness(finalEndianness); + return commBuffer; + } + +bool RnpProtocolEncoder::changeToPartnerEndianness(Rnp::Endianness newEndianness) throw() + { + if(newEndianness == rnpHeader->getEndianness()) return false; + // so newEndianness is the same as the message endiannes, no change necessary + + // don't forget that the endianness is now the host endianness so, + // after changed, the data can't be used correctly any more!! + + RnpFragmentHeader* lCurrFragment = rnpHeader->getFirstFragment(); + + for(int fragment=0; fragment < rnpHeader->nrFragments; fragment++) + { + + RnpParameter* lCurrParameter = lCurrFragment->getFirstParameter(); + for(int parameter = 0; parameter < lCurrFragment->nrParams; parameter++) + { + RnpParameter* nextParameter = lCurrParameter->getNextParameter(); + lCurrParameter->changeToPartnerEndianness(); + lCurrParameter = nextParameter; + } + + RnpFragmentHeader* nextFragment = lCurrFragment->getNextFragment(); + lCurrFragment->changeEndianness(); + lCurrFragment = nextFragment; + } + + rnpHeader->changeEndianness(newEndianness); + + return true; + } + +//###### DECODER ####################### + +RnpProtocolDecoder::RnpProtocolDecoder() throw() + { + // this memory doesn't belong to us, so do not deallocate!! + commBuffer = NULL; + rnpHeader = NULL; + currFragment = NULL; + currParameter = NULL; + } + +bool RnpProtocolDecoder::decode(akg::CommBuffer *buffer) throw() + { + // Later, throw something intelligible! + commBuffer = buffer; + + rnpHeader = (RnpHeader*)commBuffer->getData(); + + if(rnpHeader->isRnpMessage() == false) return false; + + originalEndianness = rnpHeader->getEndianness(); + + if(originalEndianness == Rnp::detectHostEndianness()) + { + // -- test validity of message -- + if(testIntegrity() == false) return false; + } + else + { // -- endianess of message -- + changeToHostEndianness(); + } + return true; + } + +bool RnpProtocolDecoder::testIntegrity() const throw() + { + // could be done better... + ENTER( "RnpProtocolDecoder::testIntegrity()" ); + + if(rnpHeader->isRnpMessage() == false) + { + RMInit::logOut << "Communication error: received invalid RNP header." << endl; + return false; + } + + bool ok = true; + char *endOfMessage = (char*)rnpHeader + rnpHeader->getTotalLength(); + char *endOfHeader = (char*)rnpHeader + sizeof(RnpHeader); + int maxLength = commBuffer->getDataSize() - sizeof(RnpHeader); + // max of every length + + RnpFragmentHeader* lCurrFragment = (RnpFragmentHeader*)getFirstFragment(); + for(int fragment=0; fragment < countFragments(); fragment++) + { + if( endOfHeader <= (char*)lCurrFragment && (char*)lCurrFragment < endOfMessage) + ; + else + { + ok = false; + RMInit::logOut << "Communication error: RNP message corrupt: short header." << endl; + break; + } + + if(lCurrFragment->totalLength > maxLength) + { + ok = false; + RMInit::logOut << "Communication error: RNP message corrupt: actual length (" << lCurrFragment->totalLength << ") larger than foreseen (" << maxLength << ")." << endl; + break; + } + + char *startOfParameters = (char*)lCurrFragment + sizeof(RnpFragmentHeader); + char *endOfParameters = (char*)lCurrFragment + lCurrFragment->totalLength; + + RnpParameter* lCurrParameter = (RnpParameter*)getFirstParameter(); + for(int parameter = 0; parameter < countParameters(); parameter++) + { + if( startOfParameters <= (char*)lCurrParameter && (char*)lCurrParameter < endOfParameters) + ; + else + { + ok = false; + RMInit::logOut << "Communication error: RNP message corrupt: current parameter location outside parameter area." << endl; + break; + } + + if(lCurrParameter->totalLength > lCurrFragment->totalLength) + { + ok = false; + RMInit::logOut << "Communication error: RNP message corrupt: current parameter length (" << lCurrParameter->totalLength << ") larger than total fragment size (" << lCurrFragment->totalLength << ")." << endl; + break; + } + + lCurrParameter = (RnpParameter*)getNextParameter(); + } + + if( (char*)lCurrParameter != endOfParameters) + { + ok = false; +// the counting seems to differ from the protocol specs; +// to avoid log flooding we disable it for the moment being -- PB 2005-aug-28 +#ifdef RMANDEBUG + RMInit::logOut << "Communication warning: puzzled by message: parameter count too small, found extra parameter(s). (this message can be ignored)" << endl; +#endif + } + + if(ok == false) + break; + + // we found a valid fragment, proceed to next one + lCurrFragment = (RnpFragmentHeader*)getNextFragment(); + } + + LEAVE( "RnpProtocolDecoder::testIntegrity() -> " << ok ); + return ok; + } + +bool RnpProtocolDecoder::changeToHostEndianness() throw() + { + if(rnpHeader->changeEndianness(Rnp::detectHostEndianness()) == false ) return false; + // so host endianness is the same as the message endiannes, no change necessary + + currFragment = (RnpFragmentHeader*)getFirstFragment(); + + for(int fragment=0; fragment < countFragments(); fragment++) + { + currFragment->changeEndianness(); + + currParameter = (RnpParameter*)getFirstParameter(); + for(int parameter = 0; parameter < countParameters(); parameter++) + { + currParameter->changeToHostEndianness(); + + currParameter = (RnpParameter*)getNextParameter(); + } + + currFragment = (RnpFragmentHeader*)getNextFragment(); + } + return true; + } + +RnpQuark RnpProtocolDecoder::getDestinationServerType() const throw() + { + return rnpHeader->serverType; + } + +Rnp::Endianness RnpProtocolDecoder::getDesiredEndianness() const throw() + { + return (Rnp::Endianness)rnpHeader->desiredEndianness; + } + +Rnp::Endianness RnpProtocolDecoder::getOriginalEndianness() const throw() + { + return originalEndianness; + } + +int RnpProtocolDecoder::getMessageLength() const throw() + { + return rnpHeader->totalMessageLength; + } + +int RnpProtocolDecoder::getMessageVersion() const throw() + { + return 1000*rnpHeader->majorVersion + rnpHeader->minorVersion; + } + +RnpQuark RnpProtocolDecoder::countFragments() const throw() + { + return rnpHeader->nrFragments; + } + +const RnpFragmentHeader* RnpProtocolDecoder::getFirstFragment() const throw() + { + currFragmentIdx = 0; + + return currFragment = (currFragmentIdx < rnpHeader->nrFragments ? rnpHeader->getFirstFragment() : 0); + } + +const RnpFragmentHeader* RnpProtocolDecoder::getNextFragment() const throw() + { + currFragmentIdx++; + + return currFragment = (currFragmentIdx < rnpHeader->nrFragments ? currFragment->getNextFragment() : 0); + } + +RnpQuark RnpProtocolDecoder::getFragmentType() const throw() + { + return currFragment->fragmType; + } + +const char* RnpProtocolDecoder::getFragmentTypeName() const throw() + { + return Rnp::getFragmentTypeName(currFragment->fragmType); + } +RnpQuark RnpProtocolDecoder::getCommand() const throw() + { + return currFragment->command; + } + +int RnpProtocolDecoder::countParameters() const throw() + { + return currFragment->nrParams; + } + +RnpQuark RnpProtocolDecoder::getFragmentLength() const throw() + { + return currFragment->totalLength; + } + +const RnpParameter* RnpProtocolDecoder::getFirstParameter() const throw() + { + currParameterIdx = 0; + + return currParameter = ( currParameterIdx < currFragment->nrParams ? currFragment->getFirstParameter() : 0); + } + +const RnpParameter* RnpProtocolDecoder::getNextParameter() const throw() + { + currParameterIdx++; + + return currParameter = ( currParameterIdx < currFragment->nrParams ? currParameter->getNextParameter() : 0); + } + +RnpQuark RnpProtocolDecoder::getParameterType() const throw() + { + return currParameter->paramType; + } + +RnpQuark RnpProtocolDecoder::getDataType() const throw() + { + return currParameter->dataType; + } + +const void* RnpProtocolDecoder::getData() const throw() + { + return currParameter->getData(); + } + +const char* RnpProtocolDecoder::getDataAsString() const throw() + { + if ( !( currParameter->dataType == Rnp::dtt_Asciiz || currParameter->dataType == Rnp::dtt_NullPtr) ) + { + TALK( "RnpProtocolEncoder::getDataAsString(): warning: assert will fire." ); + } + assert(currParameter->dataType == Rnp::dtt_Asciiz || currParameter->dataType == Rnp::dtt_NullPtr); + + return currParameter->dataType == Rnp::dtt_Asciiz ? (const char*)currParameter->getData() : (const char*)0; + } + +int RnpProtocolDecoder::getDataAsInteger() const throw() + { + if ( !( currParameter->dataType == Rnp::dtt_Int32 ) ) + { + TALK( "RnpProtocolEncoder::getDataAsInteger(): warning: assert will fire." ); + } + assert(currParameter->dataType == Rnp::dtt_Int32); + + return *(int*)currParameter->getData(); + } + +float RnpProtocolDecoder::getDataAsFloat() const throw() + { + if ( !( currParameter->dataType == Rnp::dtt_Float32 ) ) + { + TALK( "RnpProtocolEncoder::getDataAsFloat(): warning: assert will fire." ); + } + assert(currParameter->dataType == Rnp::dtt_Float32); + + return *(float*)currParameter->getData(); + } + +double RnpProtocolDecoder::getDataAsDouble() const throw() + { + if ( !( currParameter->dataType == Rnp::dtt_Double64 ) ) + { + TALK( "RnpProtocolEncoder::getDataAsDouble(): warning: assert will fire." ); + } + assert(currParameter->dataType == Rnp::dtt_Double64); + + return *(double*)currParameter->getData(); + } + +const void* RnpProtocolDecoder::getDataAsOpaque() const throw() + { + if ( !( currParameter->dataType == Rnp::dtt_Opaque || currParameter->dataType == Rnp::dtt_NullPtr) ) + { + TALK( "RnpProtocolEncoder::getDataAsOpaque(): warning: assert will fire." ); + } + assert(currParameter->dataType == Rnp::dtt_Opaque || currParameter->dataType == Rnp::dtt_NullPtr); + + return currParameter->dataType == Rnp::dtt_Opaque ? (const void*)currParameter->getData() : (const void*)0; + } + +int RnpProtocolDecoder::getDataLength() const throw() + { + return currParameter->getDataLength(); + } + +void RnpProtocolDecoder::printRnpHeader(RnpHeader *lRnpHeader) const throw() + { + cout<<"RnpHeader ID="<<lRnpHeader->protocolId<<endl; + cout<<" total fragments="<<lRnpHeader->nrFragments<<endl; + cout<<" sizeof header="<<sizeof(RnpHeader)<<endl; + cout<<endl; + } + +bool RnpProtocolDecoder::isRnpMessage() const throw() + { + return rnpHeader->isRnpMessage(); + } + diff --git a/rnprotocol/rnprotocol.hh b/rnprotocol/rnprotocol.hh new file mode 100644 index 0000000..dfa7885 --- /dev/null +++ b/rnprotocol/rnprotocol.hh @@ -0,0 +1,462 @@ +#ifndef RNPROTOCOL_HH +#define RNPROTOCOL_HH +/* +* This file is part of rasdaman community. +* +* Rasdaman community is free software: you can redistribute it and/or modify +* it under the terms of the GNU General Public License as published by +* the Free Software Foundation, either version 3 of the License, or +* (at your option) any later version. +* +* Rasdaman community is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU General Public License for more details. +* +* You should have received a copy of the GNU General Public License +* along with rasdaman community. If not, see <http://www.gnu.org/licenses/>. +* +* Copyright 2003, 2004, 2005, 2006, 2007, 2008, 2009 Peter Baumann / +rasdaman GmbH. +* +* For more information please see <http://www.rasdaman.org> +* or contact Peter Baumann via <baumann@rasdaman.com>. +*/ +/**************************************************************************** + * + * + * COMMENTS: + * + * + ****************************************************************************/ + +#include "network/akgnetwork.hh" +#ifdef AFTERV52 + #include "akglogging.hh" + #include "rnpexception.hh" +#else + #define AKGLOGLN(a,b,c) +#endif + +namespace rnp + { + +//using namespace akg; + +/** If nothing else is specified, this is the size of the RNP buffers + It is enough as long as you dont send large opaque data */ +#define RNP_DEFAULTBUFFERSIZE 1024 + +/// The basic type used in RNP. It is always 32-bit long +typedef int RnpQuark; + +/** Class Rnp contains definitions and general helper functions for RNP +*/ +class Rnp + { + public: + /** The 32-bit protocol ID. value 25112001, stored always little endian. + In big endian this is 0xc12d7f01. + */ + static const RnpQuark rnpProtocolId;//always little endian!!! + + enum Endianness + { + bigEndian = 0, + littleEndian = 1 + }; + + enum FragmentType + { + fgt_None = 0, + fgt_Command, + fgt_OkAnswer, + fgt_Error, + fgt_DiscardedRequest, + //... + // to know how many where defined + fgt_HowMany + }; + enum DataType + { + dtt_None = 0, + dtt_Asciiz = 1, + dtt_Int32 = 2, + dtt_Float32 = 3, + dtt_Double64 = 4, + dtt_Opaque = 5, + dtt_NullPtr = 6, // NULL pointer + //... + // to know how many where defined + dtt_HowMany + }; + + // the type of the error, so the receiver can rebuild it + enum ErrorType + { + ert_Unknown = 0, // unknown error type, no exception, something else + ert_StlException, // ... has a "what()" - member + ert_AkgSerializable, // akg serializable exception, we don't carry usual exceptions! + ert_Other, // other exceptions + + ert_HowMany + }; + + enum ErrorParam + { + erp_None = 0, + erp_whatValue = 1, // used by "exception" + erp_Key = 2, // key of "akgexception" + erp_Value = 3, // value of "akgexception" + + erp_HowMany + }; + + /// Functions to get the names of the various elements + static const char* getFragmentTypeName(RnpQuark) throw(); + static const char* getDataTypeName(RnpQuark) throw(); + static const char* getEndiannessName(Endianness) throw(); + static const char* getErrorTypeName(RnpQuark) throw(); + static const char* getErrorParamName(RnpQuark) throw(); + + /** Every server has his own command set, each with parameters + Define your own functions to get names for this elements */ + virtual const char* getParameterTypeName(RnpQuark) const throw() =0; + virtual const char* getCommandName(RnpQuark) const throw() =0; + + /// Helper functions for endianness + static RnpQuark swapBytes(RnpQuark) throw(); + static Endianness detectHostEndianness() throw(); + +#ifdef AFTERV52 + /// Log connection for the whole RNP module + static AkgLogConnection logConn; +#endif + protected: + /// Arrays containing the names of the various elements + static const char* undefValue; + static const char* endiannessNames[2]; + static const char* fragmentTypeNames[fgt_HowMany]; + static const char* dataTypeNames[dtt_HowMany]; + static const char* errorTypeNames[ert_HowMany]; + static const char* errorParamNames[erp_HowMany]; + }; + +struct RnpFragmentHeader; + +/** The header of the RNP message. Always 64 bytes long +*/ +struct RnpHeader + { + RnpQuark protocolId; + char messageEndianness; + char desiredEndianness; + char majorVersion; + char minorVersion; + RnpQuark totalMessageLength; + RnpQuark nrFragments; + RnpQuark serverType; + RnpQuark authInfoStart; + RnpQuark authInfoLength; + RnpQuark comprInfoStart; + RnpQuark comprInfoLength; + RnpQuark dataStart; + RnpQuark dataLength; + RnpQuark _unused[5]; + // sizeof = 64 + + /// Returns 'true' if this is a valid RNP header + bool isRnpMessage() const throw(); + + /// Returns the message endianness + Rnp::Endianness getEndianness() const throw(); + + /// Returns the total length of the message, regardless of endianness + RnpQuark getTotalLength() const throw(); + + /** Changes the endianness of the header to the specified one + Returns 'true' if a change was necessary */ + bool changeEndianness(Rnp::Endianness) throw(); + + /// Returns a pointer to the first fragment. Header has to be in host endianness + RnpFragmentHeader* getFirstFragment() const throw(); + }; + +/** The header of parameters. Size is 16. + The parameter has a header like this and then the data +*/ +struct RnpParameter + { + /// The logical type of the parameter. Server dependent + RnpQuark paramType; + + /// The data type of the parameter. One of Rnp::DataType + RnpQuark dataType; + + /// The length of the data + RnpQuark dataLength; + + /// Total length of teh parameter, header + data + alignment bytes + // (Length is always 4bytes aligned!, at least Sun requires it) + RnpQuark totalLength; + + /// Returns a pointer to the next parameter + RnpParameter* getNextParameter() const throw(); + + /// Returns a pointer to the parameter data + void* getData() const throw(); + + /// Returns the length of the parameter data + RnpQuark getDataLength() const throw(); + + /** Changes the endianness of the parameter. Since there is no info + about the current endianness, be carefull when you use it. + It also changes the endianness of the data, except when it is + opaque data.*/ + void changeToHostEndianness() throw(); + void changeToPartnerEndianness() throw(); + + RnpQuark computeTotalAlignedLength() throw(); + RnpQuark getPaddLength() const throw(); + }; + +/** The header of fragments. Size is 16. + Every fragment has a header like this and a number of parameters +*/ +struct RnpFragmentHeader + { + /// The type of the fragment. One of Rnp::FragmentType + RnpQuark fragmType; + + /// The command. Server dependent + RnpQuark command; + + /// Number of parameters + RnpQuark nrParams; + + /// Total length of the fragment, this header + all parameters + RnpQuark totalLength; + + /// Returns a pointer to the next fragment + RnpFragmentHeader* getNextFragment() const throw(); + + /// Returns a pointer to the first parameter of this fragment + RnpParameter* getFirstParameter() const throw(); + + /** Changes the endianness of the fragment. Since there is no info + about the current endianness, be carefull when you use it */ + void changeEndianness() throw(); + }; + +/** Class for encoding a RNP message. It has support for the header of the + embedding protocol and for the endianness of the partner. The rest is for + creating the message into a akg::CommBuffer, which can be internal or external. + The buffer has to be big enough, the size is not adapted +*/ +class RnpProtocolEncoder + { + public: + /// Default constructor + RnpProtocolEncoder() throw(); + /// Destructor + ~RnpProtocolEncoder() throw(); + + /// Sets an external buffer as work buffer. + void setBuffer(akg::CommBuffer*) throw(); + + /// Allocates an internal buffer as work buffer + bool allocateBuffer(int maxMessageLength) throw(); + + /** resizes the internal buffer, so the new buffer can hold the actual data plus + the requested difference. Additionally we allocate also RNP_DEFAULTBUFFERSIZE bytes + Assert: commBuffer != 0 , differenceSize >= 0*/ + bool adjustBufferSize(int differenceSize) throw(); + + int getBufferSize() throw(); + + /** Makes the necessary initializations for a new message. + Takes as parameter the type of the destination server and allocates + space for an embedding protocol header + Assert: commBuffer != NULL, meaning there is a valid working buffer + + IMPORTANT: Be aware that all this functions for creating the + message have to be called in the correct order, otherwise undefined + results may occur! + */ + void startMessage(RnpQuark serverType, int carrierHeaderSize = 0) throw(); + + /** Sets the desired endianness for the answer. Servers have to use + this endianness when they answer, clients might use it for the next + requests + */ + void setDesiredEndianness(Rnp::Endianness) throw(); + + /** Sets the final endianness for the message. 'endMessage()' is the one + who changes the endianness to the final one + */ + void setFinalEndianness(Rnp::Endianness) throw(); + + /// Starts a new fragment. + void startFragment(Rnp::FragmentType, RnpQuark command) throw(); + + /// Adds a string parameter to the current fragment + void addStringParameter(RnpQuark parameterType, const char*) throw(); + + /// Adds an int parameter to the current fragment + void addInt32Parameter(RnpQuark parameterType, int) throw(); + + /// Adds a float parameter to the current fragment + void addFloat32Parameter(RnpQuark parameterType, float) throw(); + + /// Adds a double parameter to the current fragment + void addDouble64Parameter(RnpQuark parameterType, double) throw(); + + /// Adds an opaque parameter to the current fragment + void addOpaqueParameter(RnpQuark parameterType, const void*, int size) throw(); + + /// Ends the current fragment + void endFragment() throw(); + + /// Ends the message and, if necessary, changes the endianness + akg::CommBuffer* endMessage() throw(); + + /// Returns the size of the reserved space for the embedding carrier header + int getCarrierHeaderSize() throw(); + + protected: + + akg::CommBuffer *commBuffer; + + private: + + /// Helper function to add a parameter to the current fragment + void addParameter(RnpQuark parameterType, Rnp::DataType, const void *data, int length) throw(); + + /// The function which does the endianness change + bool changeToPartnerEndianness(Rnp::Endianness) throw(); + + bool allocated; + int carrierHeaderSize; + Rnp::Endianness finalEndianness; + + RnpHeader *rnpHeader; + RnpFragmentHeader *currFragment; + RnpParameter *currParameter; + }; + +/** Class for decoding a RNP message. The buffer is always an external one + Decoding the messsage means also changing the endianness to the host endianness +*/ +class RnpProtocolDecoder + { + public: + /// Default constructor + RnpProtocolDecoder() throw(); + + /** Takes the buffer and decodes it, provided it is a RNP message + Returns 'false' if it is not a RNP message, or the message is corrupt + (for now, if the endianness is not the one of the host, no integrity + verification is done. In this case endianness is changes, but if the + message is corrupt...bang!!). Later this will have to throw something + */ + bool decode(akg::CommBuffer*) throw(); + + /// Returns the code of the destination server + RnpQuark getDestinationServerType() const throw(); + + /// Returns the desired endianness + Rnp::Endianness getDesiredEndianness() const throw(); + + /// Returns the original endianness of the message + Rnp::Endianness getOriginalEndianness() const throw(); + + /// Returns the total message length + int getMessageLength() const throw(); + + /// Returns the version of the message + int getMessageVersion() const throw(); + + /// Returns the number of fragments contained in the message + RnpQuark countFragments() const throw(); + + /// Returns a pointer to the first fragment + const RnpFragmentHeader* getFirstFragment() const throw(); + + /// Returns a pointer to the next fragment + const RnpFragmentHeader* getNextFragment() const throw(); + + /// Returns the type of the current fragment + RnpQuark getFragmentType() const throw(); + + /// Returns the name of type of the current fragment + const char* getFragmentTypeName() const throw(); + + /// Returns the command of the current fragment + RnpQuark getCommand() const throw(); + + /// Returns the number of parameters of the current fragment + int countParameters() const throw(); + + /// Returns the length of the current fragment + RnpQuark getFragmentLength() const throw(); + + /// Returns a pointer to the first parameter of the current fragment + const RnpParameter* getFirstParameter() const throw(); + + /// Returns a pointer to the next parameter of the current fragment + const RnpParameter* getNextParameter() const throw(); + + /// Returns the logical type of the current parameter + RnpQuark getParameterType() const throw(); + + /// Returns the data type of the current parameter + RnpQuark getDataType() const throw(); + + /// Returns a pointer to the data of the current parameter, can't be NULL + const void* getData() const throw(); + + /// Returns a pointer to the data of the current parameter, as string-asciiz (assert!) (can be NULL) + const char* getDataAsString() const throw(); + + /// Returns a pointer to the data of the current parameter, as integer (assert!) + int getDataAsInteger() const throw(); + + /// Returns a pointer to the data of the current parameter, as float (assert!) + float getDataAsFloat() const throw(); + + /// Returns a pointer to the data of the current parameter, as double (assert!) + double getDataAsDouble() const throw(); + + /// Returns a pointer to the data of the current parameter, as const void* (assert!) (can be NULL) + const void* getDataAsOpaque() const throw(); + + /// Returns the length of the data of the current parameter + int getDataLength() const throw(); + + private: + akg::CommBuffer *commBuffer; + Rnp::Endianness originalEndianness; + mutable RnpHeader *rnpHeader; + mutable RnpFragmentHeader *currFragment; + mutable int currFragmentIdx; + + mutable RnpParameter *currParameter; + mutable int currParameterIdx; + + /// Helper function to print a RNP header + void printRnpHeader(RnpHeader*) const throw(); + + /// Tests the integrity of the message + bool testIntegrity() const throw(); + + + /// Returns 'true' if the message is a RNP message + bool isRnpMessage() const throw(); + + /// Changes the endianness of the message to the message of the host + bool changeToHostEndianness() throw(); + }; + + +} //namespace +#endif diff --git a/rnprotocol/rnpserver.cc b/rnprotocol/rnpserver.cc new file mode 100644 index 0000000..cac982c --- /dev/null +++ b/rnprotocol/rnpserver.cc @@ -0,0 +1,134 @@ +/* +* This file is part of rasdaman community. +* +* Rasdaman community is free software: you can redistribute it and/or modify +* it under the terms of the GNU General Public License as published by +* the Free Software Foundation, either version 3 of the License, or +* (at your option) any later version. +* +* Rasdaman community is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU General Public License for more details. +* +* You should have received a copy of the GNU General Public License +* along with rasdaman community. If not, see <http://www.gnu.org/licenses/>. +* +* Copyright 2003, 2004, 2005, 2006, 2007, 2008, 2009 Peter Baumann / +rasdaman GmbH. +* +* For more information please see <http://www.rasdaman.org> +* or contact Peter Baumann via <baumann@rasdaman.com>. +*/ +/************************************************************* + * + * + * PURPOSE: + * + * + * COMMENTS: + * - called from rasserver_main.cc + * - startRnpServer() is twin to startRpcServer and startHttpServer() + * + ************************************************************/ + +#include <iostream> +#include <signal.h> +#include "rnpserver.hh" +#include "srvrasmgrcomm.hh" +#include "server/rasserver_config.hh" +#include "rnpservercomm.hh" + +#include "raslib/rminit.hh" +#include "debug-srv.hh" + +// only for access control +#include "servercomm/servercomm.hh" + +#include "server/rasserver_entry.hh" + +RnpRasDaManComm rnpServerComm; +RasserverCommunicator communicator(&rnpServerComm); + +extern "C" +{ +void rnpSignalHandler(int sig); +} + +void startRnpServer() +{ + ENTER( "startRnpServer" ); + + signal (SIGTERM, rnpSignalHandler); + + RMInit::logOut << "Initializing control connections..." << flush; + rasmgrComm.init(configuration.getTimeout(), configuration.getServerName(), configuration.getRasmgrHost(), configuration.getRasmgrPort()); + + accessControl.setServerName(configuration.getServerName()); + + RMInit::logOut << "informing rasmgr: server available..." << flush; + rasmgrComm.informRasmgrServerAvailable(); + RMInit::logOut << "ok" << endl; + + //################## + + RMInit::logOut << "Initializing job control..." << flush; + communicator.initJobs(1); + communicator.setTimeout(RNP_TIMEOUT_LISTEN,0); // the select loop! + + communicator.setListenPort(configuration.getListenPort()); + + rnpServerComm.setServerJobs(1); + rnpServerComm.connectToCommunicator(communicator); + rnpServerComm.setTransmitterBufferSize(configuration.getMaxTransferBufferSize()); + + RMInit::logOut<<"setting timeout to "<<configuration.getTimeout()<< " secs..." << flush; + + rnpServerComm.setTimeoutInterval(configuration.getTimeout()); + NbJob::setTimeoutInterval(configuration.getTimeout()); + + RMInit::logOut << "connecting to base DBMS..." << flush; + RasServerEntry &rasserver = RasServerEntry::getInstance(); + rasserver.compat_connectToDBMS(); + + RMInit::logOut<<"ok, waiting for clients."<<endl<<endl; + communicator.runServer(); + + RMInit::logOut<<"RNP server shutdown in progress..."<<flush; + rnpServerComm.disconnectFromCommunicator(); + + //################## + + RMInit::logOut<<"informing rasmgr..."<<flush; + rasmgrComm.informRasmgrServerDown(); + + RMInit::logOut<<"server stopped."<<endl; + LEAVE( "startRnpServer" ); +} + +void stopRnpServer() +{ + ENTER( "stopRnpServer" ); + + communicator.shouldExit(); + + LEAVE( "stopRnpServer" ); +} + + +void rnpSignalHandler(int sig) +{ + static int in_progress=0; // sema for signal-in-signal + + if (in_progress) // routine already active? + return; // ...then don't interfere + + in_progress = 1; // block further signals + + for(long j=0;j<1000000;j++) // make sure server notices shutdown + ; // NB: why this large number? doesn't seem to be thought over carefully -- PB 2003-nov-23 + + stopRnpServer(); // send shutdown request +} + + diff --git a/rnprotocol/rnpserver.hh b/rnprotocol/rnpserver.hh new file mode 100644 index 0000000..9d7162f --- /dev/null +++ b/rnprotocol/rnpserver.hh @@ -0,0 +1,33 @@ +/* +* This file is part of rasdaman community. +* +* Rasdaman community is free software: you can redistribute it and/or modify +* it under the terms of the GNU General Public License as published by +* the Free Software Foundation, either version 3 of the License, or +* (at your option) any later version. +* +* Rasdaman community is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU General Public License for more details. +* +* You should have received a copy of the GNU General Public License +* along with rasdaman community. If not, see <http://www.gnu.org/licenses/>. +* +* Copyright 2003, 2004, 2005, 2006, 2007, 2008, 2009 Peter Baumann / +rasdaman GmbH. +* +* For more information please see <http://www.rasdaman.org> +* or contact Peter Baumann via <baumann@rasdaman.com>. +*/ +#ifndef RNPSERVER_HH +#define RNPSERVER_HH + + +// This header contains only a function for starting the RNP server +// It is done like this to keep the rest of the server as separate as possible from the rnp part + +void startRnpServer(); + + +#endif diff --git a/rnprotocol/rnpservercomm.cc b/rnprotocol/rnpservercomm.cc new file mode 100644 index 0000000..677cf4e --- /dev/null +++ b/rnprotocol/rnpservercomm.cc @@ -0,0 +1,1344 @@ +/* +* This file is part of rasdaman community. +* +* Rasdaman community is free software: you can redistribute it and/or modify +* it under the terms of the GNU General Public License as published by +* the Free Software Foundation, either version 3 of the License, or +* (at your option) any later version. +* +* Rasdaman community is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU General Public License for more details. +* +* You should have received a copy of the GNU General Public License +* along with rasdaman community. If not, see <http://www.gnu.org/licenses/>. +* +* Copyright 2003, 2004, 2005, 2006, 2007, 2008, 2009 Peter Baumann / +rasdaman GmbH. +* +* For more information please see <http://www.rasdaman.org> +* or contact Peter Baumann via <baumann@rasdaman.com>. +*/ +/************************************************************* + * + * + * PURPOSE: + * + * + * COMMENTS: + * - CLIENTID: | 0|.counter.|....timestamp ......| + * |31,30.....24|23...16|15...8|7...0| + * - requests count from 1..n between connect and disconnect + * - fragments count from 1..n between ??? + * + ************************************************************/ + +#ifndef RMANVERSION +#error "Please specify RMANVERSION variable!" +#endif + +#ifndef COMPDATE +#error "Please specify the COMPDATE variable!" +/* +COMPDATE=`date +"%d.%m.%Y %H:%M:%S"` + +and -DCOMPDATE="\"$(COMPDATE)\"" when compiling +*/ +#endif + +#include "mymalloc/mymalloc.h" +#include <time.h> +#include "rnprasserver.hh" +#include "rnpservercomm.hh" +#include "srvrasmgrcomm.hh" + +#include "server/rasserver_entry.hh" +#include "time/akgtime.hh" + +#include "debug-srv.hh" + + + +// aux function to avoid a compiler warning (see 'man strftime') +size_t my_strftime(char *s, size_t max, const char *fmt, const struct tm *tm) +{ + return strftime(s, max, fmt, tm); +} + +// now(): aux function returning, as a static string, the current time +// keep in sync with same function in rasmgr/rasmgr_localsrv.cc! +const char* now() +{ + size_t strfResult = 0; // return value of strftime() + static char timestring[50]; // must hold 20+1 chars + + time_t t = time(NULL); // get time + struct tm* tm = localtime(&t); // break down time + strfResult = my_strftime( timestring, sizeof(timestring), "[%F %T]", tm ); // format time + if (strfResult == 0) // bad luck? then take fallback message + (void) strncpy( timestring, "[-no time available-]", sizeof(timestring) ); + return( timestring ); +} + +const int RnpRasDaManComm::NoClient = -1; + +RnpRasDaManComm::RnpRasDaManComm() throw() +{ + ENTER( "RnpRasDaManComm::RnpRasDaManComm" ); + requestCounter = 0; + fragmentCounter = 0; + clientID = NoClient; + LEAVE( "RnpRasDaManComm::RnpRasDaManComm" ); +} + +RnpRasDaManComm::~RnpRasDaManComm() throw() +{ + TALK( "RnpRasDaManComm::~RnpRasDaManComm" ); +} + +// we need our implementation because of r_Error, but we will go for the default when r_Error is AkgException +void RnpRasDaManComm::processRequest(CommBuffer *receiverBuffer, CommBuffer *transmiterBuffer, RnpTransport::CarrierProtocol protocol, RnpServerJob *callingJob) throw() +{ + ENTER( "RnpRasDaManComm::processRequest, at " << now() << ", client=" << callingJob->getClientHostAddress().getStringAddress() ); + RMInit::logOut << now() << " request from " << callingJob->getClientHostAddress().getStringAddress() << endl; + + BenchmarkTimer bmt("request time"); + bmt.start(); + + decoder.decode(receiverBuffer); + RnpQuark destServerType = decoder.getDestinationServerType(); + Rnp::Endianness desEndianness = decoder.getDesiredEndianness(); + + // test if servertype matches! + + transmiterBuffer->allocate(getTransmitterBufferSize()); + transmiterBuffer->clearToWrite(); + + encoder.setBuffer(transmiterBuffer); + encoder.setFinalEndianness(desEndianness); + encoder.startAnswer(destServerType, protocol); + + requestCounter++; + + decoder.getFirstFragment(); + bool wasError = false; + for(int fragment=0; fragment < decoder.countFragments(); fragment++) + { + if(wasError == false) + { + try + { + decodeFragment(); + } + // DBMS connection lost? then need to disconnect client to allow to resync + catch(r_Ebase_dbms &edb) + { + RMInit::logOut << "Error: base DBMS reports: " << edb.what() << endl; + wasError = true; + answerr_Error(edb); +#if 0 // seems too hard -- PB 2005-jul-25 + try + { + RMInit::logOut << "detaching client..." ; + executeDisconnect(); + RMInit::logOut << "ok" << endl; + } + catch (...) // ignore any further error, just log it + { + RMInit::logOut << "failed" << endl; + } +#endif // 0 + } + catch(r_Error &ex) + { + RMInit::logOut << "Error: request terminated: " << ex.what() << endl; + TALK( "rasdaman exception kind=" << ex.get_kind() << ", errorno=" << ex.get_errorno() ); + wasError = true; + answerr_Error(ex); + +#if 0 // seems too hard -- PB 2005-jul-25 + // a base DBMS error we treat just like above + // -- PB 2003-nov-24 + if (ex.get_kind() == r_Error::r_Error_BaseDBMSFailed + || ex.get_errorno() == 206) // serializable error, see errtxts + { + try + { + RMInit::logOut << "detaching client..."; + executeDisconnect(); + RMInit::logOut << "ok" << endl; + } + catch (...) // ignore any further error, just log it + { + RMInit::logOut << "failed" << endl; + } + } +#endif // 0 + } + catch(exception &ex) + { + RMInit::logOut << "Error: request terminated with general exception: " << ex.what() << endl; + wasError = true; + answerSTLException(ex); + } + catch(...) + { + RMInit::logOut << "Error: request terminated with generic exception." << endl; + wasError = true; + answerUnknownError(); + } + } + else + { + discardFragment(); + } + decoder.getNextFragment(); + } + encoder.endMessage(); + + bmt.stop(); + + RMInit::logOut << now() << " request completed; " << bmt << endl << endl; + LEAVE( "RnpRasDaManComm::processRequest" ); +} + +RnpServerJob* RnpRasDaManComm::createJobs(int howMany) +{ + TALK( "RNP: creating "<<howMany<<" RnpRasserverJob's" ); + return new RnpRasserverJob[howMany]; +} + +void RnpRasDaManComm::setTimeoutInterval(int seconds) +{ + clientTimer.setTimeoutInterval(seconds); +} + +void RnpRasDaManComm::checkForTimeout() +{ + ENTER( "RnpRasDaManComm::checkForTimeout" ); + if(clientID != NoClient) + { + if(clientTimer.checkForTimeout()) + { + TALK( "Client 0x" << hex << clientID << dec << " has timed out." ); + RMInit::logOut << "Client has timed out, connection being freed." << endl; + disconnectClient(); + } + } + else + rasmgrComm.informRasmgrServerStillAvailable(); + LEAVE( "RnpRasDaManComm::checkForTimeout" ); +} + + +void RnpRasDaManComm::decodeFragment() throw( r_Error ) +{ + ENTER( "RnpRasDaManComm::decodeFragment" ); + + try // somewhere during cmd execution there can be exceptions; we pass them thru + { + fragmentCounter++; + + RnpQuark command = decoder.getCommand(); + + RnpRasserver *hook = new RnpRasserver; + TALK( "fragmentCounter=" << fragmentCounter << ", command is " << hook->getCommandName( command ) ); + + // first parameter has to be the clientID + verifyClientID( command ); + + switch(command) + { + case RnpRasserver::cmd_connect: executeConnect(); break; + case RnpRasserver::cmd_disconnect: executeDisconnect(); break; + case RnpRasserver::cmd_opendb: executeOpenDB(); break; + case RnpRasserver::cmd_closedb: executeCloseDB(); break; + case RnpRasserver::cmd_beginta: executeBeginTA(); break; + case RnpRasserver::cmd_committa: executeCommitTA(); break; + case RnpRasserver::cmd_abortta: executeAbortTA(); break; + case RnpRasserver::cmd_istaopen: executeIsTAOpen(); break; + case RnpRasserver::cmd_queryhttp: executeQueryHttp(); break; + case RnpRasserver::cmd_getnewoid: executeGetNewOId(); break; + case RnpRasserver::cmd_queryrpc: executeQueryRpc(); break; + case RnpRasserver::cmd_getnextelem: executeGetNextElement(); break; + case RnpRasserver::cmd_endtransfer: executeEndTransfer(); break; + case RnpRasserver::cmd_getnextmdd: executeGetNextMDD(); break; + case RnpRasserver::cmd_getnexttile: executeGetNextTile(); break; + case RnpRasserver::cmd_updaterpc : executeUpdateQuery(); break; + case RnpRasserver::cmd_startinsTmdd: executeStartInsertTransMDD(); break; + case RnpRasserver::cmd_inserttile : executeInsertTile(); break; + case RnpRasserver::cmd_endinsmdd : executeEndInsertMDD(); break; + case RnpRasserver::cmd_initupdate : executeInitUpdate(); break; + case RnpRasserver::cmd_gettypestruct: executeGetTypeStructure(); break; + case RnpRasserver::cmd_startinsPmdd: executeStartInsertPersMDD(); break; + case RnpRasserver::cmd_insertmdd: executeInsertMDD(); break; + case RnpRasserver::cmd_insertcoll: executeInsertCollection(); break; + case RnpRasserver::cmd_remobjfromcoll: executeRemoveObjFromColl(); break; + case RnpRasserver::cmd_delobjbyoid: executeDeleteObjByOId(); break; + case RnpRasserver::cmd_delcollbyname: executeDeleteCollByName(); break; + case RnpRasserver::cmd_getcoll: executeGetCollection(); break; + case RnpRasserver::cmd_getcolloids: executeGetCollectionOIds(); break; + case RnpRasserver::cmd_getobjecttype: executeGetObjectType(); break; + case RnpRasserver::cmd_setformat: executeSetFormat(); break; + // --- until here the compatible ones --- + + // -- the secret, unofficial ones -- + case RnpRasserver::cmd_createcollection: executeCreateCollection(); break; + case RnpRasserver::cmd_createmdd: executeCreateMDD(); break; + case RnpRasserver::cmd_extendmdd: executeExtendMDD(); break; + case RnpRasserver::cmd_gettiledomains: executeGetTileDomains(); break; + + default: + RMInit::logOut << "Protocol error: Unknown command: " << command << endl; + LEAVE( "RnpRasDaManComm::decodeFragment: Unknown command: "<<command ); + throw r_Error(822); + break; + } + + clientTimer.markAction(); + } + catch (r_Error &e) // any rasdaman error is passed through + { + LEAVE( "RnpRasDaManComm::decodeFragment, rasdaman exception caught: " << e.what() ); + throw; // pass on + } + + catch (...) // any other error is "unexpected", by def + { + LEAVE( "RnpRasDaManComm::decodeFragment, general exception caught." ); + throw( r_Error( 10000 ) ); // unexpected internal error - FIXME: can we be more precise? + } + + LEAVE( "RnpRasDaManComm::decodeFragment" ); +} + +//######## here the executing functions ################ +void RnpRasDaManComm::executeConnect() +{ + ENTER( "RnpRasDaManComm::executeConnect" ); + + RasServerEntry& rasserver = RasServerEntry::getInstance(); + + decoder.getNextParameter(); + const char *capability = decoder.getDataAsString(); + + TALK("capability: "<<capability); + + // a new connect requires to drop any eventually preexisting connection first -- PB 2005-sep-02 + // if (clientID != NoClient) // any previous un-disconnected activity? + if (fragmentCounter > 1 || requestCounter > 1) // any previous un-disconnected activity? + { + RMInit::logOut << "Preparing request for new connect by resetting old connection; "; + RnpRasDaManComm::disconnectInternally(); + // FIXME: the entry in CltTable still remains (see compat_*()) + // - although this doesn't harm in any way it should be removed + } + + rasserver.compat_connectNewClient(capability); + connectClient(); + + encoder.startFragment(Rnp::fgt_OkAnswer, decoder.getCommand()); + encoder.addInt32Parameter(RnpRasserver::pmt_clientid, clientID); + encoder.endFragment(); + TALK( "adding clientID 0x" << hex << clientID << dec ); + + LEAVE( "RnpRasDaManComm::executeConnect, assigned clientID=0x"<<hex<<clientID<<dec); +} + +void RnpRasDaManComm::executeDisconnect() +{ + ENTER("RnpRasDaManComm::executeDisconnect, clientID=0x"<<hex<<clientID<<dec); + RasServerEntry& rasserver = RasServerEntry::getInstance(); + + rasserver.compat_disconnectClient(); + TALK( "rasserver.compat_disconnectClient() done, now disconnectClient()." ); + disconnectClient(); + + encoder.startFragment(Rnp::fgt_OkAnswer, decoder.getCommand()); + encoder.endFragment(); + + LEAVE("RnpRasDaManComm::executeDisconnect"); +} + +void RnpRasDaManComm::executeOpenDB() +{ + ENTER("RnpRasDaManComm::executeOpenDB"); + + RasServerEntry& rasserver = RasServerEntry::getInstance(); + + decoder.getNextParameter(); + const char* databaseName = decoder.getDataAsString(); + + TALK( "Execute open DB, database="<< databaseName ); + + rasserver.compat_openDB(databaseName); + + encoder.startFragment(Rnp::fgt_OkAnswer, decoder.getCommand()); + encoder.endFragment(); + + LEAVE("RnpRasDaManComm::executeOpenDB"); +} + +void RnpRasDaManComm::executeCloseDB() +{ + ENTER("RnpRasDaManComm::executeCloseDB"); + + RasServerEntry& rasserver = RasServerEntry::getInstance(); + + rasserver.compat_closeDB(); + + encoder.startFragment(Rnp::fgt_OkAnswer, decoder.getCommand()); + encoder.endFragment(); + + LEAVE("RnpRasDaManComm::executeCloseDB"); +} + +void RnpRasDaManComm::executeBeginTA() +{ + ENTER( "RnpRasDaManComm::executeBeginTA" ); + + RasServerEntry& rasserver = RasServerEntry::getInstance(); + + decoder.getNextParameter(); + bool rw = decoder.getDataAsInteger() ? true:false; + + TALK( "executeBeginTA transaction: "<<(rw ? "rw":"ro") ); + + rasserver.compat_beginTA(rw); + + encoder.startFragment(Rnp::fgt_OkAnswer, decoder.getCommand()); + encoder.endFragment(); + + LEAVE( "RnpRasDaManComm::executeBeginTA" ); +} + +void RnpRasDaManComm::executeCommitTA() +{ + ENTER("executeCommitTA - in"); + RasServerEntry& rasserver = RasServerEntry::getInstance(); + + rasserver.compat_commitTA(); + + encoder.startFragment(Rnp::fgt_OkAnswer, decoder.getCommand()); + encoder.endFragment(); + LEAVE("executeCommitTA - out"); +} + +void RnpRasDaManComm::executeAbortTA() + { + ENTER("executeAbortTA - in"); + RasServerEntry& rasserver = RasServerEntry::getInstance(); + + rasserver.compat_abortTA(); + + encoder.startFragment(Rnp::fgt_OkAnswer, decoder.getCommand()); + encoder.endFragment(); + LEAVE("executeAbortTA - out"); + } + +void RnpRasDaManComm::executeIsTAOpen() + { + ENTER("executeIsTAOpen - in"); + RasServerEntry& rasserver = RasServerEntry::getInstance(); + + bool isOpen = rasserver.compat_isOpenTA(); + + encoder.startFragment(Rnp::fgt_OkAnswer, decoder.getCommand()); + encoder.addInt32Parameter(RnpRasserver::pmt_transstatus, isOpen); + encoder.endFragment(); + LEAVE("executeIsTAOpen - out; isOpen=" << isOpen); + } + +void RnpRasDaManComm::executeQueryHttp() +{ + ENTER("executeQueryHttp - in"); + + RasServerEntry& rasserver = RasServerEntry::getInstance(); + // TALK("have inst" ); + decoder.getNextParameter(); + + const void* httpParams = decoder.getData(); + int httpParamsLen = decoder.getDataLength(); + // TALK( "httpParamsLen=" << httpParamsLen ); + char *resultBuffer = 0; + int resultLen = rasserver.compat_executeQueryHttp((const char*)httpParams, httpParamsLen, resultBuffer); + // TALK( "resultLen=" << resultLen ); + + encoder.startFragment(Rnp::fgt_OkAnswer, decoder.getCommand()); + + if(resultLen && resultBuffer) + { + encoder.adjustBufferSize(resultLen); + encoder.addOpaqueParameter(RnpRasserver::pmt_httpqanswer, resultBuffer, resultLen); + delete[] resultBuffer; + resultBuffer = 0; + } + encoder.endFragment(); + + LEAVE("executeQueryHttp - out; resultLen=" << resultLen ); +} + +void RnpRasDaManComm::executeGetNewOId() + { + ENTER("executeGetNewOId - in"); + RasServerEntry& rasserver = RasServerEntry::getInstance(); + + decoder.getNextParameter(); + + int objType = decoder.getDataAsInteger(); + + r_OId oid = rasserver.compat_getNewOId( (unsigned short)objType ); + const char* cOId = oid.get_string_representation(); + + TALK("executeGetNewOId objType = "<<objType<<" oid="<<cOId); + + encoder.startFragment(Rnp::fgt_OkAnswer, decoder.getCommand()); + encoder.addStringParameter(RnpRasserver::pmt_oidstring, cOId); + encoder.endFragment(); + + LEAVE("executeGetNewOId - out"); + } + +#define INITPTR(a) a = 0 +#define SECUREPTR(a) if(a == 0) a = strdup("") +#define FREEPTR(a) free(a) + +void RnpRasDaManComm::executeQueryRpc() + { + ENTER("executeQueryRpc() - in"); + + RasServerEntry& rasserver = RasServerEntry::getInstance(); + + decoder.getNextParameter(); + + const char *query = decoder.getDataAsString(); + TALK("query="<<query); + + ExecuteQueryRes queryResult; + INITPTR(queryResult.token); + INITPTR(queryResult.typeName); + INITPTR(queryResult.typeStructure); + + int status = rasserver.compat_executeQueryRpc(query, queryResult); + SECUREPTR(queryResult.token); + SECUREPTR(queryResult.typeName); + SECUREPTR(queryResult.typeStructure); + + encoder.startFragment(Rnp::fgt_OkAnswer, decoder.getCommand()); + encoder.addInt32Parameter( RnpRasserver::pmt_returnstatus,status); + TALK( "adding return status " << status ); + encoder.addInt32Parameter( RnpRasserver::pmt_errorno, queryResult.errorNo); + encoder.addInt32Parameter( RnpRasserver::pmt_lineno, queryResult.lineNo); + encoder.addInt32Parameter( RnpRasserver::pmt_columnno, queryResult.columnNo); + encoder.addStringParameter(RnpRasserver::pmt_errortoken,queryResult.token); + encoder.addStringParameter(RnpRasserver::pmt_typename, queryResult.typeName); + encoder.addStringParameter(RnpRasserver::pmt_typestructure,queryResult.typeStructure); + encoder.endFragment(); + + FREEPTR(queryResult.token); + FREEPTR(queryResult.typeName); + FREEPTR(queryResult.typeStructure); + + LEAVE("executeQueryRpc - out"); + } + +void RnpRasDaManComm::executeGetNextElement() + { + ENTER("executeGetNextElement - in"); + RasServerEntry& rasserver = RasServerEntry::getInstance(); + + char *buffer = NULL; + unsigned int bufferSize; + + int status = rasserver.compat_getNextElement(buffer,bufferSize); + + encoder.startFragment(Rnp::fgt_OkAnswer, decoder.getCommand()); + encoder.addInt32Parameter(RnpRasserver::pmt_returnstatus,status); + TALK( "adding return status " << status ); + if(buffer != NULL) + encoder.addOpaqueParameter(RnpRasserver::pmt_skalarobject, buffer, bufferSize); + + encoder.endFragment(); + + free(buffer); + + LEAVE("executeGetNextElement - out"); + } + +void RnpRasDaManComm::executeEndTransfer() + { + ENTER("executeEndTransfer - in"); + RasServerEntry& rasserver = RasServerEntry::getInstance(); + + int status = rasserver.compat_endTransfer(); + + encoder.startFragment(Rnp::fgt_OkAnswer, decoder.getCommand()); + encoder.addInt32Parameter(RnpRasserver::pmt_returnstatus,status); + TALK( "adding return status " << status ); + encoder.endFragment(); + + LEAVE("executeEndTransfer - out"); + } + + +void RnpRasDaManComm::executeGetNextMDD() + { + ENTER("executeGetNextMDD - in"); + RasServerEntry& rasserver = RasServerEntry::getInstance(); + + + r_Minterval mddDomain; + char* typeName; + char* typeStructure; + r_OId oid; + unsigned short currentFormat; + + + int status = rasserver.compat_getNextMDD(mddDomain, typeName, typeStructure, oid, currentFormat); + + encoder.startFragment(Rnp::fgt_OkAnswer, decoder.getCommand()); + encoder.addInt32Parameter(RnpRasserver::pmt_returnstatus,status); + TALK( "adding return status " << status ); + encoder.addStringParameter(RnpRasserver::pmt_oidstring, mddDomain.get_string_representation()); + encoder.addStringParameter(RnpRasserver::pmt_typename, typeName); + encoder.addStringParameter(RnpRasserver::pmt_typestructure, typeStructure); + encoder.addStringParameter(RnpRasserver::pmt_oidstring, oid.get_string_representation() ? oid.get_string_representation() : ""); + encoder.addInt32Parameter(RnpRasserver::pmt_currentformat, currentFormat); + encoder.endFragment(); + + free(typeName); + free(typeStructure); + + LEAVE("executeGetNextMDD - out"); + } + +void RnpRasDaManComm::executeGetNextTile() + { + ENTER("executeGetNextTile - in"); + RasServerEntry& rasserver = RasServerEntry::getInstance(); + + RPCMarray *tempRpcMarray; + + int status = rasserver.compat_getNextTile(&tempRpcMarray); + + encoder.startFragment(Rnp::fgt_OkAnswer, decoder.getCommand()); + encoder.addInt32Parameter(RnpRasserver::pmt_returnstatus,status); + TALK( "adding return status " << status ); + + if(tempRpcMarray != 0) + { + encoder.addStringParameter(RnpRasserver::pmt_domain, tempRpcMarray->domain); + encoder.addInt32Parameter( RnpRasserver::pmt_typelength, tempRpcMarray->cellTypeLength); + encoder.addInt32Parameter( RnpRasserver::pmt_currentformat, tempRpcMarray->currentFormat); + encoder.addInt32Parameter( RnpRasserver::pmt_storageformat, tempRpcMarray->storageFormat); + + encoder.adjustBufferSize(tempRpcMarray->data.confarray_len); + encoder.addOpaqueParameter(RnpRasserver::pmt_tiledata, tempRpcMarray->data.confarray_val, tempRpcMarray->data.confarray_len); + + // Do not free this! "tempRpcMarray->data.confarray_val"; + free(tempRpcMarray->domain); + free(tempRpcMarray); + + } + encoder.endFragment(); + + /* Notez aici ca n-am unde: e adevarat ca tilele trebuie transferate si pe bucati, fiindca + un tiff mare creat cu select e o tila! + */ + + + LEAVE("executeGetNextTile - out"); + } + +//---------- + +void RnpRasDaManComm::executeUpdateQuery() + { + ENTER("executeUpdateQuery - in"); + RasServerEntry& rasserver = RasServerEntry::getInstance(); + + decoder.getNextParameter(); const char* query = decoder.getDataAsString(); + + ExecuteUpdateRes returnStructure; + returnStructure.token = NULL; + + int status = rasserver.compat_ExecuteUpdateQuery(query, returnStructure); + + const char *token = returnStructure.token != NULL ? returnStructure.token : ""; + + encoder.startFragment(Rnp::fgt_OkAnswer, decoder.getCommand()); + encoder.addInt32Parameter( RnpRasserver::pmt_returnstatus, status); + TALK( "adding return status " << status ); + encoder.addInt32Parameter( RnpRasserver::pmt_errorno, returnStructure.errorNo); + encoder.addInt32Parameter( RnpRasserver::pmt_lineno, returnStructure.lineNo); + encoder.addInt32Parameter( RnpRasserver::pmt_columnno, returnStructure.columnNo); + encoder.addStringParameter(RnpRasserver::pmt_errortoken, token ); + encoder.endFragment(); + + if(returnStructure.token) free(returnStructure.token); + + LEAVE("executeUpdateQuery - out"); + } + +void RnpRasDaManComm::executeInitUpdate() + { + ENTER("executeInitUpdate - in"); + + RasServerEntry& rasserver = RasServerEntry::getInstance(); + + int status = rasserver.compat_InitUpdate(); + + encoder.startFragment(Rnp::fgt_OkAnswer, decoder.getCommand()); + encoder.addInt32Parameter(RnpRasserver::pmt_returnstatus, status); + TALK( "adding return status " << status ); + encoder.endFragment(); + LEAVE("executeInitUpdate - out"); + } + +void RnpRasDaManComm::executeStartInsertTransMDD() + { + ENTER("executeStartInsertTransMDD - in"); + + RasServerEntry& rasserver = RasServerEntry::getInstance(); + + decoder.getNextParameter(); const char* domain = decoder.getDataAsString(); + decoder.getNextParameter(); int typeLength = decoder.getDataAsInteger(); + decoder.getNextParameter(); const char* typeName = decoder.getDataAsString(); + + int status = rasserver.compat_StartInsertTransMDD(domain, typeLength, typeName); + + encoder.startFragment(Rnp::fgt_OkAnswer, decoder.getCommand()); + encoder.addInt32Parameter(RnpRasserver::pmt_returnstatus, status); + TALK( "adding return status " << status ); + encoder.endFragment(); + + LEAVE("executeStartInsertTransMDD - out"); + } + +void RnpRasDaManComm::executeInsertTile() + { + ENTER("executeInsertTile - in"); + + RasServerEntry& rasserver = RasServerEntry::getInstance(); + + RPCMarray *rpcMarray = new RPCMarray; + + decoder.getNextParameter(); int persistent = decoder.getDataAsInteger(); + decoder.getNextParameter(); rpcMarray->domain = (char*)decoder.getDataAsString(); + decoder.getNextParameter(); rpcMarray->cellTypeLength = decoder.getDataAsInteger(); + decoder.getNextParameter(); rpcMarray->currentFormat = decoder.getDataAsInteger(); + decoder.getNextParameter(); rpcMarray->storageFormat = decoder.getDataAsInteger(); + + decoder.getNextParameter(); + const void* buffer = decoder.getData(); + int length = decoder.getDataLength(); + + rpcMarray->data.confarray_val = (char*)mymalloc(length); memcpy(rpcMarray->data.confarray_val, buffer, length); + rpcMarray->data.confarray_len = length; + + int status = rasserver.compat_InsertTile(persistent, rpcMarray); + + encoder.startFragment(Rnp::fgt_OkAnswer, decoder.getCommand()); + encoder.addInt32Parameter(RnpRasserver::pmt_returnstatus, status); + TALK( "adding return status " << status ); + encoder.endFragment(); + + // rpcMarray->data.confarray_val is freed by Tile::Tile(...), which is stupid, but... + delete rpcMarray; + + LEAVE("executeInsertTile - out"); + } + +void RnpRasDaManComm::executeEndInsertMDD() + { + ENTER("executeEndInsertMDD - in"); + + RasServerEntry& rasserver = RasServerEntry::getInstance(); + + decoder.getNextParameter(); int persistent = decoder.getDataAsInteger(); + + int status = rasserver.compat_EndInsertMDD(persistent); + + encoder.startFragment(Rnp::fgt_OkAnswer, decoder.getCommand()); + encoder.addInt32Parameter(RnpRasserver::pmt_returnstatus, status); + TALK( "adding return status " << status ); + encoder.endFragment(); + + LEAVE("executeEndInsertMDD - out"); + } + +void RnpRasDaManComm::executeGetTypeStructure() + { + ENTER("executeGetTypeStructure - in"); + + RasServerEntry& rasserver = RasServerEntry::getInstance(); + + decoder.getNextParameter(); const char *typeName = decoder.getDataAsString(); + decoder.getNextParameter(); int typeType = decoder.getDataAsInteger(); + + char *typeStructure=NULL; + + int status = rasserver.compat_GetTypeStructure(typeName, typeType, typeStructure); + + encoder.startFragment(Rnp::fgt_OkAnswer, decoder.getCommand()); + encoder.addInt32Parameter(RnpRasserver::pmt_returnstatus, status); + TALK( "adding return status " << status ); + encoder.addStringParameter(RnpRasserver::pmt_typestructure, typeStructure ? typeStructure : ""); + encoder.endFragment(); + + if(typeStructure) + { + free(typeStructure); + } + LEAVE("executeGetTypeStructure - out"); + } + +void RnpRasDaManComm::executeStartInsertPersMDD() + { + ENTER("executeStartInsertPersMDD - in"); + RasServerEntry& rasserver = RasServerEntry::getInstance(); + + decoder.getNextParameter(); const char* collName = decoder.getDataAsString(); + decoder.getNextParameter(); r_Minterval mddDomain( decoder.getDataAsString() ); + decoder.getNextParameter(); int typeLength = decoder.getDataAsInteger(); + decoder.getNextParameter(); const char* typeName = decoder.getDataAsString(); + decoder.getNextParameter(); r_OId oid( decoder.getDataAsString() ); + + int status = rasserver.compat_StartInsertPersMDD(collName, mddDomain, typeLength, typeName, oid); + + encoder.startFragment(Rnp::fgt_OkAnswer, decoder.getCommand()); + encoder.addInt32Parameter(RnpRasserver::pmt_returnstatus, status); + TALK( "adding return status " << status ); + encoder.endFragment(); + LEAVE("executeStartInsertPersMDD - out"); + } + +void RnpRasDaManComm::executeInsertMDD() + { + ENTER("executeInsertMDD - in"); + + RasServerEntry& rasserver = RasServerEntry::getInstance(); + + decoder.getNextParameter(); const char* collName = decoder.getDataAsString(); + decoder.getNextParameter(); const char* typeName = decoder.getDataAsString(); + decoder.getNextParameter(); r_OId oid( decoder.getDataAsString() ); + + RPCMarray *rpcMarray = new RPCMarray; + + decoder.getNextParameter(); rpcMarray->domain = (char*)decoder.getDataAsString(); + decoder.getNextParameter(); rpcMarray->cellTypeLength = decoder.getDataAsInteger(); + decoder.getNextParameter(); rpcMarray->currentFormat = decoder.getDataAsInteger(); + decoder.getNextParameter(); rpcMarray->storageFormat = decoder.getDataAsInteger(); + + decoder.getNextParameter(); + const void* buffer = decoder.getData(); + int length = decoder.getDataLength(); + + rpcMarray->data.confarray_val = (char*)mymalloc(length); memcpy(rpcMarray->data.confarray_val, buffer, length); + rpcMarray->data.confarray_len = length; + + int status = rasserver.compat_InsertMDD(collName, rpcMarray, typeName, oid); + + encoder.startFragment(Rnp::fgt_OkAnswer, decoder.getCommand()); + encoder.addInt32Parameter(RnpRasserver::pmt_returnstatus, status); + TALK( "adding return status " << status ); + encoder.endFragment(); + + LEAVE("executeInsertMDD - out"); + } + +void RnpRasDaManComm::executeInsertCollection() + { + ENTER("executeInsertCollection - in"); + RasServerEntry& rasserver = RasServerEntry::getInstance(); + + decoder.getNextParameter(); const char* collName = decoder.getDataAsString(); + decoder.getNextParameter(); const char* typeName = decoder.getDataAsString(); + decoder.getNextParameter(); r_OId oid( decoder.getDataAsString() ); + + int status = rasserver.compat_InsertCollection(collName, typeName, oid); + + encoder.startFragment(Rnp::fgt_OkAnswer, decoder.getCommand()); + encoder.addInt32Parameter(RnpRasserver::pmt_returnstatus, status); + TALK( "adding return status " << status ); + encoder.endFragment(); + + LEAVE("executeInsertCollection - out"); + } + +void RnpRasDaManComm::executeDeleteCollByName() + { + ENTER("executeDeleteCollByName - in"); + RasServerEntry& rasserver = RasServerEntry::getInstance(); + + decoder.getNextParameter(); const char* collName = decoder.getDataAsString(); + + int status = rasserver.compat_DeleteCollByName(collName); + + encoder.startFragment(Rnp::fgt_OkAnswer, decoder.getCommand()); + encoder.addInt32Parameter(RnpRasserver::pmt_returnstatus, status); + TALK( "adding return status " << status ); + encoder.endFragment(); + + LEAVE("executeDeleteCollByName - out"); + } + +void RnpRasDaManComm::executeDeleteObjByOId() + { + ENTER("executeDeleteObjByOId - in"); + RasServerEntry& rasserver = RasServerEntry::getInstance(); + + decoder.getNextParameter(); r_OId oid( decoder.getDataAsString() ); + + int status = rasserver.compat_DeleteObjByOId(oid); + + encoder.startFragment(Rnp::fgt_OkAnswer, decoder.getCommand()); + encoder.addInt32Parameter(RnpRasserver::pmt_returnstatus, status); + TALK( "adding return status " << status ); + encoder.endFragment(); + + LEAVE("executeDeleteObjByOId - out"); + } + +void RnpRasDaManComm::executeRemoveObjFromColl() + { + ENTER("executeRemoveObjFromColl - in"); + RasServerEntry& rasserver = RasServerEntry::getInstance(); + + decoder.getNextParameter(); const char* collName = decoder.getDataAsString(); + decoder.getNextParameter(); r_OId oid( decoder.getDataAsString() ); + + int status = rasserver.compat_RemoveObjFromColl(collName, oid); + + encoder.startFragment(Rnp::fgt_OkAnswer, decoder.getCommand()); + encoder.addInt32Parameter(RnpRasserver::pmt_returnstatus, status); + TALK( "adding return status " << status ); + encoder.endFragment(); + + LEAVE("executeRemoveObjFromColl - out"); + } + +void RnpRasDaManComm::executeGetCollection() + { + ENTER("executeGetCollection - in"); + RasServerEntry& rasserver = RasServerEntry::getInstance(); + + char* typeName = NULL; + char* typeStructure = NULL; + char* collName = NULL; + r_OId oid; + int status = 0; + + decoder.getNextParameter(); + if(decoder.getParameterType() == RnpRasserver::pmt_collname) + { + collName = strdup(decoder.getDataAsString()); + status = rasserver.compat_GetCollectionByName(collName, typeName, typeStructure, oid); + } + else + { + const char* oidstring = decoder.getDataAsString(); + oid = r_OId(oidstring); + status = rasserver.compat_GetCollectionByName(oid, typeName, typeStructure, collName); + } + + encoder.startFragment(Rnp::fgt_OkAnswer, decoder.getCommand()); + encoder.addInt32Parameter(RnpRasserver::pmt_returnstatus, status); + TALK( "adding return status " << status ); + encoder.addStringParameter(RnpRasserver::pmt_typename, typeName); + encoder.addStringParameter(RnpRasserver::pmt_typestructure, typeStructure); + encoder.addStringParameter(RnpRasserver::pmt_oidstring, oid.get_string_representation()); + encoder.addStringParameter(RnpRasserver::pmt_collname, collName); + encoder.endFragment(); + + free((void*)typeName); + free((void*)typeStructure); + free((void*)collName); + + LEAVE("executeGetCollection - out"); + } + +void RnpRasDaManComm::executeGetCollectionOIds() + { + ENTER("executeGetCollectionOIds - in"); + RasServerEntry& rasserver = RasServerEntry::getInstance(); + + char* typeName = NULL; + char* typeStructure = NULL; + char* collName = NULL; + r_OId oid; + RPCOIdEntry* oidTable = NULL; + unsigned int oidTableSize = 0; + int status = 0; + + decoder.getNextParameter(); + if(decoder.getParameterType() == RnpRasserver::pmt_collname) + { + collName = strdup(decoder.getDataAsString()); + status = rasserver.compat_GetCollectionOidsByName(collName, typeName, typeStructure, oid, oidTable, oidTableSize); + } + else + { oid = r_OId(decoder.getDataAsString()); + status = rasserver.compat_GetCollectionOidsByOId(oid, typeName, typeStructure, oidTable, oidTableSize, collName); + } + encoder.startFragment(Rnp::fgt_OkAnswer, decoder.getCommand()); + encoder.addInt32Parameter(RnpRasserver::pmt_returnstatus, status); + TALK( "adding return status " << status ); + encoder.addStringParameter(RnpRasserver::pmt_typename, typeName); + encoder.addStringParameter(RnpRasserver::pmt_typestructure, typeStructure); + encoder.addStringParameter(RnpRasserver::pmt_oidstring, oid.get_string_representation()); + encoder.addStringParameter(RnpRasserver::pmt_collname, collName); + + if(oidTable) + for(int i=0;i<oidTableSize; i++) + { + encoder.adjustBufferSize(strlen(oidTable[i].oid)); + encoder.addStringParameter(RnpRasserver::pmt_oidstring, oidTable[i].oid); + free(oidTable[i].oid); + } + encoder.endFragment(); + + free((void*)typeName); + free((void*)typeStructure); + free((void*)collName); + free(oidTable); + + LEAVE("executeGetCollectionOIds - out"); + } + +void RnpRasDaManComm::executeGetObjectType() + { + ENTER("executeGetObjectType - in"); + RasServerEntry& rasserver = RasServerEntry::getInstance(); + + decoder.getNextParameter(); const char* oidstring = decoder.getDataAsString(); + + r_OId oid(oidstring); + unsigned short objType; + + int status=rasserver.compat_GetObjectType(oid, objType); + + encoder.startFragment(Rnp::fgt_OkAnswer, decoder.getCommand()); + encoder.addInt32Parameter(RnpRasserver::pmt_returnstatus, status); + TALK( "adding return status " << status ); + encoder.addInt32Parameter(RnpRasserver::pmt_objecttype, objType); + encoder.endFragment(); + + + LEAVE("executeGetObjectType - out"); + } + +void RnpRasDaManComm::executeSetFormat() + { + ENTER("executeSetFormat - in"); + RasServerEntry& rasserver = RasServerEntry::getInstance(); + + decoder.getNextParameter(); int whichFormat = decoder.getDataAsInteger(); + decoder.getNextParameter(); int format = decoder.getDataAsInteger(); + decoder.getNextParameter(); const char* params = decoder.getDataAsString(); + + int status = 0; + + if(whichFormat == 1) + status = rasserver.compat_SetTransferFormat(format, params); + else + status = rasserver.compat_SetStorageFormat(format, params); + + encoder.startFragment(Rnp::fgt_OkAnswer, decoder.getCommand()); + encoder.addInt32Parameter(RnpRasserver::pmt_returnstatus, status); + TALK( "adding return status " << status ); + encoder.endFragment(); + + LEAVE("executeSetFormat - out"); + } + +//########### until here the compatible ones ############### + +void RnpRasDaManComm::executeCreateCollection() + { + ENTER("executeCreateCollection - in"); + RasServerEntry& rasserver = RasServerEntry::getInstance(); + + const char* collName = getNextAsString(RnpRasserver::pmt_collname); + const char* collTypeName = getNextAsString(RnpRasserver::pmt_typename); + + TALK("rasserver.createCollection( " << collName << ", " << collTypeName << " )" ); + r_OId roid = rasserver.createCollection(collName, collTypeName); + + encoder.startFragment(Rnp::fgt_OkAnswer, decoder.getCommand()); + encoder.addStringParameter(RnpRasserver::pmt_oidstring, roid.get_string_representation()); + encoder.endFragment(); + + LEAVE("executeCreateCollection - out"); + } + +void RnpRasDaManComm::executeCreateMDD() +{ + ENTER( "RnpRasDaManComm::executeCreateMDD" ); + RasServerEntry& rasserver = RasServerEntry::getInstance(); + + const char *collName = getNextAsString(RnpRasserver::pmt_collname); + const char *mddTypeName = getNextAsString(RnpRasserver::pmt_typename); + const char *definitionDomain = getNextAsString(RnpRasserver::pmt_domain); + bool rcindex = false; + const char *tileDomain = 0; + + if(decoder.getNextParameter()) + { + rcindex = decoder.getDataAsInteger() ? true : false; + tileDomain = getNextAsString(RnpRasserver::pmt_domain); + } + TALK( "collName=" << collName << ", mddTypeName=" << mddTypeName << ", definitionDomain=" << definitionDomain << ", tileDomain=" << tileDomain << ", rcindex=" << rcindex ); + r_OId roid = rasserver.createMDD(collName, mddTypeName, definitionDomain, tileDomain, rcindex); + + encoder.startFragment(Rnp::fgt_OkAnswer, decoder.getCommand()); + encoder.addStringParameter(RnpRasserver::pmt_oidstring, roid.get_string_representation()); + encoder.endFragment(); + + LEAVE( "RnpRasDaManComm::executeCreateMDD, oid=" << roid.get_string_representation() ); +} + +void RnpRasDaManComm::executeExtendMDD() +{ + ENTER( "RnpRasDaManComm::executeExtendMDD" ); + + RasServerEntry& rasserver = RasServerEntry::getInstance(); + + const char *oidstring = getNextAsString(RnpRasserver::pmt_oidstring); + const char *stripeDomain = getNextAsString(RnpRasserver::pmt_domain); + const char *tileDomain = getNextAsString(RnpRasserver::pmt_domain); + + r_OId mddOId = r_OId(oidstring); + + TALK( "mddOId=" << oidstring << ", stripeDomain=" << stripeDomain << ", tileDomain=" << tileDomain ); + rasserver.extendMDD(mddOId, stripeDomain, tileDomain); + + encoder.startFragment(Rnp::fgt_OkAnswer, decoder.getCommand()); + encoder.endFragment(); + + LEAVE( "RnpRasDaManComm::executeExtendMDD" ); +} + +void RnpRasDaManComm::executeGetTileDomains() +{ + ENTER( "RnpRasDaManComm::executeGetTileDomains" ); + + RasServerEntry& rasserver = RasServerEntry::getInstance(); + + const char *oidstring = getNextAsString(RnpRasserver::pmt_oidstring); + const char *stripeDomain = getNextAsString(RnpRasserver::pmt_domain); + + r_OId mddOId = r_OId(oidstring); + + vector<r_Minterval> result = rasserver.getTileDomains(mddOId, stripeDomain); + + encoder.startFragment(Rnp::fgt_OkAnswer, decoder.getCommand()); + + for(int i=0;i < result.size(); i++) + { + const char *domain = result[i].get_string_representation(); + encoder.addStringParameter(RnpRasserver::pmt_domain, domain); + + free((void*)domain); + } + + encoder.endFragment(); + + LEAVE( "RnpRasDaManComm::executeGetTileDomains" ); +} + +//######### helper functions ########################### + +void RnpRasDaManComm::connectClient() + { + clientID = makeNewClientID(); + TALK( "RnpRasDaManComm::connectClient(): assigned new client id 0x" << hex << clientID << dec ); + } + +void RnpRasDaManComm::disconnectInternally() +{ + clientID = NoClient; + requestCounter = 1; // because pre-increment before request processing will not be reached when this is called + fragmentCounter = 1; // same phenomenon, different reason: verify needs this counter for OK'ing connect +} + +void RnpRasDaManComm::disconnectClient() + { + clientID = NoClient; + requestCounter = 0; + fragmentCounter = 0; + rasmgrComm.informRasmgrServerAvailable(); + } + + +void RnpRasDaManComm::verifyClientID( RnpQuark command ) throw (r_Error) + { + ENTER( "RnpRasDaManComm::verifyClientID( command=" << command << " ), fragmentCounter=" << fragmentCounter << ", requestCounter=" << requestCounter ); + + decoder.getFirstParameter(); + + if(decoder.getParameterType() != RnpRasserver::pmt_clientid) + { + RMInit::logOut << "Error: unidentified client." << endl; + LEAVE( "RnpRasDaManComm::verifyClientID() - exception, unknown client id." ); + throw r_Error(820); // sorry, I know, symbolic constants + } + + int verClientID = decoder.getDataAsInteger(); + TALK( "RnpRasDaManComm::verifyClientID: clientID 0x" << hex << clientID << dec << ", verClientID 0x" << hex << verClientID << dec ); + + // it's our client, it's OK + if(clientID == verClientID) + { + LEAVE( "RnpRasDaManComm::verifyClientID() - it's our client, it's OK" ); + return; + } + + // connect cmd is OK too + if(command == RnpRasserver::cmd_connect) + { + LEAVE( "RnpRasDaManComm::verifyClientID() - connect requested, OK" ); + return; + } + + // new client, first request, it's probably connect, so OK + if(clientID == NoClient && fragmentCounter == 1) + { + LEAVE( "RnpRasDaManComm::verifyClientID() - new client, first request, it's probably connect, so OK" ); + return; + } + + // new client, same message, a new request, it's also OK (he is allowed to put more fragments in a request!) + if(clientID != NoClient && fragmentCounter > 1 && requestCounter == 1 && verClientID == 0) + { + LEAVE( "RnpRasDaManComm::verifyClientID() - new client, same message, a new request (multi-fragment), so OK" ); + return; + } + + RMInit::logOut << "Error: unregistered client." << endl; + LEAVE("RnpRasDaManComm::verifyClientID(): stored clientID is 0x" << hex << clientID << dec << ", but client identified as 0x" << hex << verClientID << dec << ", fragmentCounter=" << fragmentCounter << ", requestCounter=" << requestCounter); + throw r_Error(821); // invalid sequence number + } + +int RnpRasDaManComm::makeNewClientID() + { + + // CLIENTID: | 0|.counter.|....timestamp ......| + // |31,30.....24|23...16|15...8|7...0| + static int counter = 0; + + int timeNow = time(NULL); + + int result = (timeNow & 0xFFFFFF) + (counter << 24); + + counter = (counter+1) & 0x7F; + + TALK( "RnpRasDaManComm::makeNewClientID() -> 0x" << hex << result << " (counter now: " << counter << ")" ); + return result; + } + +void RnpRasDaManComm::answerr_Error(r_Error &err) +{ + const char *errText = err.serialiseError(); + + RMInit::logOut << "Error in response: (" << errText << ") " << err.what() << endl; + + encoder.startFragment(Rnp::fgt_Error, decoder.getCommand()); + encoder.addInt32Parameter(Rnp::ert_Other, 0); + encoder.addStringParameter(RnpRasserver::pmt_rErrorString, errText); + + // add descriptive text -- PB 2003-nov-24 + encoder.addStringParameter(RnpRasserver::pmt_rErrorString, err.what() ); + + encoder.endFragment(); + + delete[] errText; +} + +//###################################################### +RnpRasserverJob::RnpRasserverJob() throw() + { + TALK( "RNP: RnpRasserverJob created" ); + } + +bool RnpRasserverJob::validateMessage() throw() + { + TALK( "RNP: validateMessage()" ); + return RnpServerJob::validateMessage(); + } + +void RnpRasserverJob::executeOnAccept() throw() + { + TALK( "RNP: executeOnAccept()" ); + RnpServerJob::executeOnAccept(); + } + +void RnpRasserverJob::executeOnWriteReady() throw() + { + TALK( "RNP: executeOnWriteReady()" ); + RnpServerJob::executeOnWriteReady(); + } + +void RnpRasserverJob::specificCleanUpOnTimeout() throw() + { + TALK( "RNP: specificCleanUpOnTimeout()" ); + RnpServerJob::specificCleanUpOnTimeout(); + } + +void RnpRasserverJob::executeOnReadError() throw() + { + RMInit::logOut << "Error while executing read operation." << endl; + RnpServerJob::executeOnReadError(); + } + +void RnpRasserverJob::executeOnWriteError() throw() + { + RMInit::logOut << "Error while executing write operation." << endl; + RnpServerJob::executeOnWriteError(); + } + +//######################################################### +RasserverCommunicator::RasserverCommunicator(RnpRasDaManComm* cmm) throw() + { + commPtr = cmm; + } + +bool RasserverCommunicator::executeOnTimeout() throw() + { + TALK( "RasserverCommunicator::executeOnTimeout()" ); + + commPtr->checkForTimeout(); + + return true; + } + +//######################################################### +ClientTimer::ClientTimer() + { + interval = 0; + lastAction = 0; + enabled = false; + } + +void ClientTimer::setTimeoutInterval(int seconds) + { + interval = seconds; + enabled = true; + markAction(); + } + +void ClientTimer::markAction() + { + lastAction = time(NULL); + } + +bool ClientTimer::checkForTimeout() + { + if(enabled == false) return false; + + time_t now = time(NULL); + + return now >= lastAction + interval; + } + diff --git a/rnprotocol/rnpservercomm.hh b/rnprotocol/rnpservercomm.hh new file mode 100644 index 0000000..9a173f4 --- /dev/null +++ b/rnprotocol/rnpservercomm.hh @@ -0,0 +1,157 @@ +/* +* This file is part of rasdaman community. +* +* Rasdaman community is free software: you can redistribute it and/or modify +* it under the terms of the GNU General Public License as published by +* the Free Software Foundation, either version 3 of the License, or +* (at your option) any later version. +* +* Rasdaman community is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU General Public License for more details. +* +* You should have received a copy of the GNU General Public License +* along with rasdaman community. If not, see <http://www.gnu.org/licenses/>. +* +* Copyright 2003, 2004, 2005, 2006, 2007, 2008, 2009 Peter Baumann / +rasdaman GmbH. +* +* For more information please see <http://www.rasdaman.org> +* or contact Peter Baumann via <baumann@rasdaman.com>. +*/ +/************************************************************* + * + * + * PURPOSE: + * + * + * COMMENTS: + * + ************************************************************/ + +#ifndef RNPSERVERCOMM_HH +#define RNPSERVERCOMM_HH + +//#include <rnprotocol.hh> +//#include <rnpembedded.hh> +#include "rnpcommunication.hh" +#include "raslib/error.hh" + +using namespace rnp; + +class ClientTimer + { + public: + ClientTimer(); + void setTimeoutInterval(int seconds); + void markAction(); + bool checkForTimeout(); + private: + time_t interval; + time_t lastAction; + bool enabled; + }; + +class RnpRasserverJob : public RnpServerJob + { + public: + RnpRasserverJob() throw(); + + private: + bool validateMessage() throw(); + void executeOnAccept() throw(); + void executeOnWriteReady() throw(); + void specificCleanUpOnTimeout() throw(); + void executeOnReadError() throw(); + void executeOnWriteError() throw(); + }; + +class RnpRasDaManComm : public RnpBaseServerComm + { + public: + RnpRasDaManComm() throw(); + + ~RnpRasDaManComm() throw(); + + void processRequest(CommBuffer *receiverBuffer, CommBuffer *transmiterBuffer, RnpTransport::CarrierProtocol, RnpServerJob *callingJob) throw(); + + void setTimeoutInterval(int seconds); + void checkForTimeout(); + + private: // inherited from RnpBaseServerComm + RnpServerJob* createJobs(int howMany); + + void decodeFragment() throw( r_Error ); + + ClientTimer clientTimer; + private: // the execution functions: + void executeConnect(); + void executeDisconnect(); + void executeOpenDB(); + void executeCloseDB(); + void executeBeginTA(); + void executeCommitTA(); + void executeAbortTA(); + void executeIsTAOpen(); + void executeQueryHttp(); + void executeGetNewOId(); + void executeQueryRpc(); + void executeGetNextElement(); + void executeEndTransfer(); + void executeGetNextMDD(); + void executeGetNextTile(); + + void executeUpdateQuery(); + void executeStartInsertTransMDD(); + void executeInsertTile(); + void executeEndInsertMDD(); + void executeInitUpdate(); + void executeGetTypeStructure(); + void executeStartInsertPersMDD(); + void executeInsertMDD(); + void executeInsertCollection(); + void executeRemoveObjFromColl(); + void executeDeleteObjByOId(); + void executeDeleteCollByName(); + void executeGetCollection(); + void executeGetCollectionOIds(); + void executeGetObjectType(); + void executeSetFormat(); + + + void executeCreateCollection(); + void executeCreateMDD(); + void executeExtendMDD(); + void executeGetTileDomains(); + + void answerr_Error(r_Error&); + private: // helper functions + void connectClient(); + // reset connection, without reporting availability to rasmgr + void disconnectInternally(); + // reset connection, with reporting availability to rasmgr + void disconnectClient(); + void verifyClientID( RnpQuark command ) throw (r_Error); + int makeNewClientID(); + + int clientID; // un timestamp, de fapt! + int requestCounter; // numara pachetele trimise de un client + int fragmentCounter; // numara fragmentele trimise de un client + + static const int NoClient; + }; + +class RasserverCommunicator : public NbCommunicator + { + public: + RasserverCommunicator(RnpRasDaManComm*) throw(); + + protected: + bool executeOnTimeout() throw(); + + RnpRasDaManComm *commPtr; + }; + +#endif // RNPSERVERCOMM_HH + diff --git a/rnprotocol/srvrasmgrcomm.cc b/rnprotocol/srvrasmgrcomm.cc new file mode 100644 index 0000000..d977045 --- /dev/null +++ b/rnprotocol/srvrasmgrcomm.cc @@ -0,0 +1,213 @@ +/* +* This file is part of rasdaman community. +* +* Rasdaman community is free software: you can redistribute it and/or modify +* it under the terms of the GNU General Public License as published by +* the Free Software Foundation, either version 3 of the License, or +* (at your option) any later version. +* +* Rasdaman community is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU General Public License for more details. +* +* You should have received a copy of the GNU General Public License +* along with rasdaman community. If not, see <http://www.gnu.org/licenses/>. +* +* Copyright 2003, 2004, 2005, 2006, 2007, 2008, 2009 Peter Baumann / +rasdaman GmbH. +* +* For more information please see <http://www.rasdaman.org> +* or contact Peter Baumann via <baumann@rasdaman.com>. +*/ +/************************************************************* + * + * + * PURPOSE: + * + * + * COMMENTS: + * - why sometimes exit() and sometimes return on error? + * should be return or exception, always. + * + ************************************************************/ + +#include "srvrasmgrcomm.hh" +#include<stdio.h> +#include<errno.h> +#include<stdlib.h> +#include<unistd.h> +#include<sys/types.h> +#include<sys/socket.h> +#include<netinet/in.h> +#include<netdb.h> +#include<iostream> +#include<string.h> +#include "raslib/rminit.hh" + + +#include "debug.hh" + + +// exit code of the server when communication is impossible +// PB: Why 10 ? Anyway, should not use exit in a server, how often did I tell you, Walter Schatz!!! +const unsigned int EXIT_CODE = 10; + +// max number of retries to connect to rasmgr in informRasMgr() +const unsigned int SRV_MAX_RETRY = 10000000; + +// how many retries are attempted before a message is issued informRasMgr() +const unsigned int SRV_TALK_INTERVAL = 100000; + + +SrvRasmgrComm rasmgrComm; + +SrvRasmgrComm::SrvRasmgrComm() +{ + timeout = 0; + serverName = 0; + rasmgrHost = 0; + rasmgrPort = -1; +} + +// note: should make use of timeout as defined in cmd line, +// but that's a major undertaking -- PB 2005-sep-02 + +void SrvRasmgrComm::init(unsigned int timeOut, const char* instanceName, const char* nRasmgrHost, int nRasmgrPort) +{ + timeout = timeOut; + serverName = instanceName; + rasmgrHost = nRasmgrHost; + rasmgrPort = nRasmgrPort; +} + +unsigned int SrvRasmgrComm::getTimeout() +{ + return timeout; +} + +void SrvRasmgrComm::informRasmgrServerAvailable() +{ + informRasMGR(SERVER_AVAILABLE); +} + +void SrvRasmgrComm::informRasmgrServerDown() +{ + informRasMGR(SERVER_DOWN); +} + +void SrvRasmgrComm::informRasmgrServerStillAvailable() +{ + // too verbose, blows up log file + // RMInit::logOut << "informing rasmgr: server still available." << endl; + informRasMGR(SERVER_REGULARSIG); +} + +void SrvRasmgrComm::informRasMGR(int what) +{ //what: 0 - going down + // 1 - available + // 2 - regular signal + +// cout<<"servername ="<<serverName<<" rasmgrhost="<<rasmgrHost<<" port="<<rasmgrPort<<endl; + +// if(what == SERVER_AVAILABLE) accessControl.resetForNewClient(); + + struct protoent* getprotoptr = getprotobyname("tcp"); + + struct hostent *hostinfo = gethostbyname(rasmgrHost); + if(hostinfo==NULL) + { + RMInit::logOut << "Error: cannot locate rasmgr host '" << rasmgrHost << "': " << strerror(errno) << std::endl; + return; + } + + sockaddr_in internetSocketAddress; + internetSocketAddress.sin_family = AF_INET; + internetSocketAddress.sin_port=htons(rasmgrPort); + internetSocketAddress.sin_addr=*(struct in_addr*)hostinfo->h_addr; + + int sock; + + bool ok=false; + long talkInterval=SRV_TALK_INTERVAL; + long maxRetry=SRV_MAX_RETRY; + long retry =0; + // creating socket + for(retry=0;retry<maxRetry;retry++) + { + sock=socket(PF_INET,SOCK_STREAM,getprotoptr->p_proto); + //std::cout<<"Socket="<<sock<<" protocol(tcp)="<<getprotoptr->p_proto<<std::endl; + + if(sock<0) + { + if( (retry%talkInterval) == 0) + { + std::cerr<< "Error: server '" << serverName << " cannot open socket to rasmgr (" << retry << " attempts, still retrying): " << strerror(errno) << std::endl; + RMInit::logOut << "Error: server '" << serverName << " cannot open socket to rasmgr (" << retry << " attempts, still retrying): " << strerror(errno) << std::endl; + } + continue; + } + + if(connect(sock,(struct sockaddr*)&internetSocketAddress,sizeof(internetSocketAddress)) < 0) + { + if( (retry%talkInterval) == 0) + { + std::cerr << "Error: server '" << serverName << " cannot connect to rasmgr (" << retry << " attempts, still retrying): " << strerror(errno) << std::endl; + RMInit::logOut << "Error: server '" << serverName << " cannot connect to rasmgr (" << retry << " attempts, still retrying): " << strerror(errno) << std::endl; + } + close(sock); //yes, some SO requieres this, like DEC from BLVA + continue; + } + ok = true; + break; + } + + if( !ok ) + { + std::cerr << "Error: unable to contact rasmgr, server '" << serverName << "' herewith giving up." <<std::endl; + RMInit::logOut << "Error: unable to contact rasmgr, server '" << serverName << "' herewith giving up." <<std::endl; + if(sock) + close(sock); + exit( EXIT_CODE ); + } + + // creating the HTTP message + char message[200]; + sprintf(message,"%s%d\r\n\r\n%s %d %ld ","POST rasservernewstatus HTTP/1.1\r\nUserAgent: RasServer/1.0\r\nContent-length: ",strlen(serverName)+3,serverName,what,0); + + // writing message; + if(writeWholeMessage(sock,message,strlen(message)+1)<0) + { + std::cerr << "Error: cannot send message to rasmgr: " << strerror(errno) << std::endl; + RMInit::logOut << "Error: cannot send message to rasmgr: " << strerror(errno) << std::endl; + close(sock); + exit( EXIT_CODE ); + } + close(sock); +} + + +int SrvRasmgrComm::writeWholeMessage(int socket,char *destBuffer,int buffSize) +{ + // we write the whole message, including the ending '\0', which is already in + // the buffSize provided by the caller + int totalLength=0; + int writeNow; + while(1) + { + writeNow = write(socket,destBuffer+totalLength,buffSize-totalLength); + if(writeNow == -1) + { + if(errno == EINTR) + continue; // read was interrupted by signal (on bad SO's) + return -1; // another error + } + totalLength+=writeNow; + + if( totalLength==buffSize ) + break; // THE END + } + + return totalLength; +} + diff --git a/rnprotocol/srvrasmgrcomm.hh b/rnprotocol/srvrasmgrcomm.hh new file mode 100644 index 0000000..0d15c4c --- /dev/null +++ b/rnprotocol/srvrasmgrcomm.hh @@ -0,0 +1,67 @@ +/* +* This file is part of rasdaman community. +* +* Rasdaman community is free software: you can redistribute it and/or modify +* it under the terms of the GNU General Public License as published by +* the Free Software Foundation, either version 3 of the License, or +* (at your option) any later version. +* +* Rasdaman community is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU General Public License for more details. +* +* You should have received a copy of the GNU General Public License +* along with rasdaman community. If not, see <http://www.gnu.org/licenses/>. +* +* Copyright 2003, 2004, 2005, 2006, 2007, 2008, 2009 Peter Baumann / +rasdaman GmbH. +* +* For more information please see <http://www.rasdaman.org> +* or contact Peter Baumann via <baumann@rasdaman.com>. +*/ +/************************************************************* + * + * + * PURPOSE: + * + * + * COMMENTS: + * + ************************************************************/ + +#ifndef SRVRASMGR_HH +#define SRVRASMGR_HH + +#define SERVER_DOWN 0 +#define SERVER_AVAILABLE 1 + // 2 is server crushed, but it's generated by rasmgr! + // regularly signal the rasmgr that we are available +#define SERVER_REGULARSIG 3 + +class SrvRasmgrComm + { + public: + SrvRasmgrComm(); + + void init(unsigned int timeOut, const char* instanceName, const char* rasmgrHost, int rasmgrPort); + + void informRasmgrServerAvailable(); + void informRasmgrServerDown(); + void informRasmgrServerStillAvailable(); + + unsigned int getTimeout(); + + private: + void informRasMGR(int what); + int writeWholeMessage(int socket,char *destBuffer,int buffSize); + + const char* serverName; + const char* rasmgrHost; + int rasmgrPort; + unsigned int timeout; + }; + +extern SrvRasmgrComm rasmgrComm; + +#endif |