summaryrefslogtreecommitdiffstats
path: root/rnprotocol
diff options
context:
space:
mode:
authorConstantin Jucovschi <cj@ubuntu.localdomain>2009-04-24 07:20:22 -0400
committerConstantin Jucovschi <cj@ubuntu.localdomain>2009-04-24 07:20:22 -0400
commit8f27e65bddd7d4b8515ce620fb485fdd78fcdf89 (patch)
treebd328a4dd4f92d32202241b5e3a7f36177792c5f /rnprotocol
downloadrasdaman-upstream-8f27e65bddd7d4b8515ce620fb485fdd78fcdf89.tar.gz
rasdaman-upstream-8f27e65bddd7d4b8515ce620fb485fdd78fcdf89.tar.xz
rasdaman-upstream-8f27e65bddd7d4b8515ce620fb485fdd78fcdf89.zip
Initial commitv8.0
Diffstat (limited to 'rnprotocol')
-rw-r--r--rnprotocol/Makefile.am47
-rw-r--r--rnprotocol/rnpclientcomm.cc1712
-rw-r--r--rnprotocol/rnpclientcomm.hh346
-rw-r--r--rnprotocol/rnpclientcomm2.cc1226
-rw-r--r--rnprotocol/rnpcommunication.cc702
-rw-r--r--rnprotocol/rnpcommunication.hh345
-rw-r--r--rnprotocol/rnpembedded.cc467
-rw-r--r--rnprotocol/rnpembedded.hh256
-rw-r--r--rnprotocol/rnprasserver.cc72
-rw-r--r--rnprotocol/rnprasserver.hh124
-rw-r--r--rnprotocol/rnprotocol.cc819
-rw-r--r--rnprotocol/rnprotocol.hh462
-rw-r--r--rnprotocol/rnpserver.cc134
-rw-r--r--rnprotocol/rnpserver.hh33
-rw-r--r--rnprotocol/rnpservercomm.cc1344
-rw-r--r--rnprotocol/rnpservercomm.hh157
-rw-r--r--rnprotocol/srvrasmgrcomm.cc213
-rw-r--r--rnprotocol/srvrasmgrcomm.hh67
18 files changed, 8526 insertions, 0 deletions
diff --git a/rnprotocol/Makefile.am b/rnprotocol/Makefile.am
new file mode 100644
index 0000000..d6a021d
--- /dev/null
+++ b/rnprotocol/Makefile.am
@@ -0,0 +1,47 @@
+#
+# This file is part of rasdaman community.
+#
+# Rasdaman community is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Rasdaman community is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with rasdaman community. If not, see <http://www.gnu.org/licenses/>.
+#
+# Copyright 2003, 2004, 2005, 2006, 2007, 2008, 2009 Peter Baumann /
+# rasdaman GmbH.
+#
+# For more information please see <http://www.rasdaman.org>
+# or contact Peter Baumann via <baumann@rasdaman.com>.
+#
+# MAKEFILE FOR:
+# module rnprotocol
+#
+# COMMENTS:
+#
+##################################################################
+
+noinst_LIBRARIES=libservercomm.a libclientcomm.a
+
+libservercomm_a_SOURCES=rnprotocol.cc rnprotocol.hh \
+ rnpembedded.cc rnpembedded.hh \
+ rnpcommunication.cc rnpcommunication.hh \
+ rnpserver.cc rnpserver.hh \
+ srvrasmgrcomm.cc srvrasmgrcomm.hh \
+ rnprasserver.cc rnprasserver.hh \
+ rnpservercomm.cc rnpservercomm.hh
+
+libclientcomm_a_SOURCES=rnprotocol.cc rnprotocol.hh \
+ rnpembedded.cc rnpembedded.hh \
+ rnpcommunication.cc rnpcommunication.hh \
+ rnprasserver.cc \
+ rnpclientcomm.cc rnpclientcomm2.cc rnpclientcomm.hh
+
+CLEANFILES=core
+
diff --git a/rnprotocol/rnpclientcomm.cc b/rnprotocol/rnpclientcomm.cc
new file mode 100644
index 0000000..fc58f4b
--- /dev/null
+++ b/rnprotocol/rnpclientcomm.cc
@@ -0,0 +1,1712 @@
+#include "mymalloc/mymalloc.h"
+/*
+* This file is part of rasdaman community.
+*
+* Rasdaman community is free software: you can redistribute it and/or modify
+* it under the terms of the GNU General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* Rasdaman community is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with rasdaman community. If not, see <http://www.gnu.org/licenses/>.
+*
+* Copyright 2003, 2004, 2005, 2006, 2007, 2008, 2009 Peter Baumann /
+rasdaman GmbH.
+*
+* For more information please see <http://www.rasdaman.org>
+* or contact Peter Baumann via <baumann@rasdaman.com>.
+*/
+/*************************************************************
+ *
+ *
+ * PURPOSE:
+ *
+ *
+ * COMMENTS:
+ *
+ * Contains the upper level functions, everything related to communication
+ * is in rnpclientcomm2
+ *
+ ************************************************************/
+
+#include <openssl/evp.h>
+
+#include "rnpclientcomm.hh"
+#include "rasdaman.hh"
+#include "debug.hh"
+
+
+// waiting time increment upon subsecuqnet connect tries in RNP [msecs] -- PB 2005-sep-09
+const unsigned int RNP_PAUSE_INCREMENT = 100;
+
+
+RnpClientComm::RnpClientComm( const char* nRasmgrHost, int nRasmgrPort) throw( r_Error )
+:RnpBaseClientComm(RnpRasserver::serverID, RnpTransport::crp_Rnp)
+{
+ ENTER( "RpcClientComm::RnpClientComm( " << nRasmgrHost << "," << nRasmgrPort << " )" );
+ RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "RnpClientComm(" << nRasmgrHost << "," << nRasmgrPort << ")" );
+
+ clientID = -1;
+
+ clientParams = new r_Parse_Params();
+ clientParams->add("compserver", &serverCompresses, r_Parse_Params::param_type_int);
+ clientParams->add("exactformat", &exactFormat, r_Parse_Params::param_type_int);
+
+ endianClient = (int)r_Endian::get_endianness();
+
+ rasmgrHost=(char*)nRasmgrHost;
+ rasmgrPort=nRasmgrPort;
+ serverHost[0]=0;
+ capability[0]=0;
+ strcpy(identificationString,"rasguest:8e70a429be359b6dace8b5b2500dedb0"); // this is MD5("rasguest");
+
+ transferFormatParams = 0;
+ storageFormatParams = 0;
+
+ // experimentally disabled -- PB 2005-sep-01
+ // useTurbo = true;
+ useTurbo = true;
+ TALK( "useTurbo=" << useTurbo );
+
+ akg::NbJob::setTimeoutInterval(3600);
+
+ RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "RnpClientComm()" );
+ LEAVE( "RpcClientComm::RnpClientComm()" );
+}
+
+RnpClientComm::~RnpClientComm() throw ()
+{
+ ENTER( "RpcClientComm::~RnpClientComm()" );
+ RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "~RnpClientComm()" );
+ RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "~RnpClientComm()" );
+ LEAVE( "RpcClientComm::~RnpClientComm()" );
+}
+
+bool RnpClientComm::effectivTypeIsRNP() throw()
+{
+ bool retval = true;
+ ENTER( "RpcClientComm::effectivTypeIsRNP()" );
+ RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "effectivTypeIsRNP()" );
+ RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "effectivTypeIsRNP() -> " << retval );
+ LEAVE( "RpcClientComm::effectivTypeIsRNP() -> " << retval );
+ return retval;
+}
+
+int RnpClientComm::openDB( const char* database )
+{
+ int retval = 0;
+
+ ENTER( "RpcClientComm::openDB(" << database << ")" );
+ RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "openDB(" << database << ")" );
+
+ strcpy(databaseName,database);
+
+ getFreeServer(false, true); // read only, openDB
+
+ TALK( "openDB: Connected to server: "<<serverHost<<":"<<serverPort );
+ setConnectionParameters(serverHost,serverPort);
+
+/* was commented out, trying it... -- PB 2005-aug-31
+*/
+ if(useTurbo)
+ {
+ turboOpenDB(databaseName);
+ }
+ else
+ {
+ executeConnect();
+ executeOpenDB(database);
+ executeCloseDB();
+ executeDisconnect();
+ }
+/* */
+
+ RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "openDB() -> " << retval );
+ LEAVE( "RpcClientComm::openDB() -> " << retval );
+ return retval;
+}
+
+int RnpClientComm::closeDB()
+{
+ int retval = 0;
+ ENTER( "RpcClientComm::closeDB()" );
+ RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "closeDB()" );
+ RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "closeDB() -> " << retval );
+ LEAVE( "RpcClientComm::closeDB() -> " << retval );
+ return retval;
+}
+
+int RnpClientComm::createDB( const char* name ) throw(r_Error)
+{
+ int retval = -1;
+ ENTER( "RpcClientComm::createDB( " << (name?name:"(null)") << " )" );
+ RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "createDB()" );
+ RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "createDB() -> " << retval );
+ LEAVE( "RpcClientComm::createDB() -> " << retval );
+ return retval;
+}
+
+int RnpClientComm::destroyDB( const char* name ) throw(r_Error)
+{
+ int retval = -1;
+ ENTER( "RpcClientComm::destroyDB( " << (name?name:"(null)") << " )" );
+ RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "destroyDB()" );
+ RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "destroyDB() -> " << retval );
+ LEAVE( "RpcClientComm::destroyDB() -> " << retval );
+ return retval;
+}
+
+int RnpClientComm::openTA( unsigned short readOnly ) throw(r_Error)
+{
+ int retval = 1;
+ ENTER( "RpcClientComm::openTA(" << readOnly << ")" );
+ RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "openTA(" << readOnly << ")" );
+
+ bool rw = (readOnly == 0 ? true : false);
+
+ getFreeServer(rw, false); // readwrite?, not openDB
+ TALK( "openTA: connected to server "<<serverHost<<":"<<serverPort );
+ setConnectionParameters(serverHost,serverPort);
+
+ if(useTurbo)
+ {
+ turboBeginTA(rw);
+ }
+ else
+ {
+ executeConnect();
+ executeOpenDB(databaseName);
+ executeBeginTA(rw);
+ }
+
+ RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "openTA() -> " << retval );
+ LEAVE( "RpcClientComm::openTA() -> " << retval );
+ return retval;
+}
+
+int RnpClientComm::commitTA() throw(r_Error)
+{
+ int retval = 1;
+
+ ENTER( "RpcClientComm::commitTA()" );
+ RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "commitTA()" );
+
+ if(useTurbo)
+ {
+ turboCommitTA();
+ }
+ else
+ {
+ executeCommitTA();
+ executeCloseDB();
+ executeDisconnect();
+ }
+
+ RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "commitTA() -> " << retval );
+ LEAVE( "RpcClientComm::commitTA() -> " << retval );
+ return retval;
+}
+
+int RnpClientComm::abortTA()
+{
+ int retval = 1;
+
+ ENTER( "RpcClientComm::abortTA()" );
+ RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "abortTA()" );
+
+ try
+ {
+ if(useTurbo)
+ {
+ turboAbortTA();
+ }
+ else
+ {
+ executeAbortTA();
+ executeCloseDB();
+ executeDisconnect();
+ }
+ }
+ // make it nicer, but we are not allowed to throw anything! Later will change the declaration of the function
+ catch(...)
+ {
+ TALK( "RpcClientComm::abortTA(): caught & ignored exception." );
+ }
+
+ RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "abortTA() -> " << retval );
+ LEAVE( "RpcClientComm::abortTA() -> " << retval );
+ return retval;
+}
+
+void RnpClientComm::insertMDD( const char* collName, r_GMarray* mar ) throw( r_Error )
+{
+ ENTER( "RpcClientComm::insertMDD(" << (collName?collName:"(null)") << "," << (long) mar << ")" );
+ RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "insertMDD(" << collName << "," << (long) mar << ")" );
+
+ checkForRwTransaction();
+
+ r_Minterval spatdom;
+ r_Bytes marBytes;
+ RPCMarray* rpcMarray;
+ r_Bytes tileSize = 0;
+
+ // get the spatial domain of the r_GMarray
+ spatdom = mar->spatial_domain();
+
+ // determine the amount of data to be transferred
+ marBytes = mar->get_array_size();
+
+ const r_Base_Type* baseType = mar->get_base_type_schema();
+
+ // if the MDD is too large for being transfered as one block, it has to be
+ // divided in tiles
+ const r_Tiling* til = mar->get_storage_layout()->get_tiling();
+ r_Tiling_Scheme scheme = til->get_tiling_scheme();
+ if (scheme == r_NoTiling)
+ tileSize = RMInit::RMInit::clientTileSize;
+ else
+ //allowed because the only subclass of tiling without size is no tiling
+ tileSize = ((const r_Size_Tiling*)til)->get_tile_size();
+
+ if( RMInit::tiling && marBytes > tileSize )
+ {
+ // initiate composition of MDD at server side
+ int status = executeStartInsertPersMDD(collName, mar); //rpcStatusPtr = rpcstartinsertpersmdd_1( params, binding_h );
+
+ switch( status )
+ {
+ case 0:
+ break; // OK
+ case 2:
+ LEAVE( "RpcClientComm::insertMDD() Error: database class undefined." );
+ throw r_Error( r_Error::r_Error_DatabaseClassUndefined );
+ break;
+ case 3:
+ LEAVE( "RpcClientComm::insertMDD() Error: collection element type mismatch." );
+ throw r_Error( r_Error::r_Error_CollectionElementTypeMismatch );
+ break;
+ case 4:
+ LEAVE( "RpcClientComm::insertMDD() Error: type invalid." );
+ throw r_Error( r_Error::r_Error_TypeInvalid );
+ break;
+ default:
+ LEAVE( "RpcClientComm::insertMDD() Error: transfer invalid." );
+ throw r_Error( r_Error::r_Error_TransferFailed );
+ break;
+ }
+
+ r_Set< r_GMarray* >* bagOfTiles;
+
+ bagOfTiles = mar->get_storage_layout()->decomposeMDD( mar );
+
+ RMDBGMIDDLE(2, RMDebug::module_clientcomm, "RpcClientComm", "decomposing into " << bagOfTiles->cardinality() << " tiles")
+
+ r_Iterator< r_GMarray* > iter = bagOfTiles->create_iterator();
+ r_GMarray *origTile;
+
+ for(iter.reset(); iter.not_done(); iter.advance() )
+ {
+ origTile = *iter;
+
+ RMDBGMIDDLE(2, RMDebug::module_clientcomm, "RpcClientComm", "inserting Tile with domain " << origTile->spatial_domain() << ", " << origTile->spatial_domain().cell_count() * origTile->get_type_length() << " bytes")
+
+ getMarRpcRepresentation( origTile, rpcMarray, mar->get_storage_layout()->get_storage_format(), baseType );
+
+ status = executeInsertTile(true, rpcMarray);
+
+ // free rpcMarray structure (rpcMarray->data.confarray_val is freed somewhere else)
+ freeMarRpcRepresentation( origTile, rpcMarray );
+
+ // delete current tile (including data block)
+ delete origTile;
+
+ if( status > 0 )
+ {
+ throw r_Error( r_Error::r_Error_TransferFailed );
+ }
+ }
+
+ executeEndInsertMDD(true); //rpcendinsertmdd_1( params3, binding_h );
+
+ // delete transient data
+ bagOfTiles->remove_all();
+ delete bagOfTiles;
+ }
+ else // begin: MDD is transferred in one piece
+ {
+ RMDBGMIDDLE(2, RMDebug::module_clientcomm, "RpcClientComm", ", one tile" )
+
+ getMarRpcRepresentation( mar, rpcMarray, mar->get_storage_layout()->get_storage_format(), baseType );
+
+ int status = executeInsertMDD(collName, mar, rpcMarray); // rpcStatusPtr = rpcinsertmdd_1( params, binding_h );
+
+ freeMarRpcRepresentation( mar, rpcMarray );
+
+ RMDBGMIDDLE(2, RMDebug::module_clientcomm, "RpcClientComm", "ok" )
+
+ switch( status )
+ {
+ case 0:
+ break; // OK
+ case 2:
+ LEAVE( "RpcClientComm::insertMDD(): exception, status = " << status );
+ throw r_Error( r_Error::r_Error_DatabaseClassUndefined );
+ break;
+ case 3:
+ LEAVE( "RpcClientComm::insertMDD(): exception, status = " << status );
+ throw r_Error( r_Error::r_Error_CollectionElementTypeMismatch );
+ break;
+ case 4:
+ LEAVE( "RpcClientComm::insertMDD(): exception, status = " << status );
+ throw r_Error( r_Error::r_Error_TypeInvalid );
+ break;
+ default:
+ LEAVE( "RpcClientComm::insertMDD(): exception, status = " << status );
+ throw r_Error( r_Error::r_Error_TransferFailed );
+ break;
+ }
+
+ } // end: MDD i transferred in one piece
+
+ RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "insertMDD()" );
+ LEAVE( "RpcClientComm::insertMDD()" );
+}
+
+
+//################################################################
+
+r_Ref_Any RnpClientComm::getMDDByOId( const r_OId& oid ) throw( r_Error )
+{
+ ENTER( "RpcClientComm::getMDDByOId(" << oid << ")" );
+ RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "getMDDByOId(" << oid << ")" );
+
+ RMInit::logOut << "Internal error: RnpClientComm::getMDDByOId() not implemented, returning empty r_Ref_Any()." << endl;
+
+ RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "getMDDByOId()" );
+ LEAVE( "RpcClientComm::getMDDByOId()" );
+ return r_Ref_Any();
+}
+
+void RnpClientComm::insertColl( const char* collName, const char* typeName, const r_OId& oid ) throw( r_Error )
+{
+ ENTER( "RpcClientComm::insertColl(" << collName << "," << typeName << "," << oid << ")" );
+ RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "insertColl(" << collName << "," << typeName << "," << oid << ")" );
+
+ checkForRwTransaction();
+
+ int status = executeInsertCollection(collName, typeName, oid);
+
+ switch( status )
+ {
+ case 0:
+ break; //OK
+ case 1:
+ LEAVE( "RpcClientComm::insertColl(): exception, status = " << status );
+ throw r_Error( r_Error::r_Error_ClientUnknown ); break;
+ case 2:
+ LEAVE( "RpcClientComm::insertColl(): exception, status = " << status );
+ throw r_Error( r_Error::r_Error_DatabaseClassUndefined ); break;
+ case 3:
+ LEAVE( "RpcClientComm::insertColl(): exception, status = " << status );
+ throw r_Error( r_Error::r_Error_NameNotUnique ); break;
+ default:
+ LEAVE( "RpcClientComm::insertColl(): exception, status = " << status );
+ throw r_Error( r_Error::r_Error_General );break;
+ }
+
+ RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "insertColl()" );
+ LEAVE( "RpcClientComm::insertColl()" );
+}
+
+
+void RnpClientComm::deleteCollByName( const char* collName ) throw( r_Error )
+{
+ ENTER( "RpcClientComm::deleteCollByName(" << (collName?collName:"(null)") << ")" );
+ RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "deleteCollByName(" << collName << ")" );
+
+ checkForRwTransaction();
+
+ startRequest(RnpRasserver::cmd_delcollbyname);
+ encoder.addInt32Parameter(RnpRasserver::pmt_clientid, clientID);
+ TALK( "adding clientID 0x" << hex << clientID << dec );
+ encoder.addStringParameter(RnpRasserver::pmt_collname, collName);
+
+ helper012d("removeObjFromColl");
+
+ RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "deleteCollByName()" );
+ LEAVE( "RpcClientComm::deleteCollByName()" );
+}
+
+void RnpClientComm::deleteObjByOId( const r_OId& oid ) throw( r_Error )
+{
+ ENTER( "RpcClientComm::deleteObjByOId(" << oid << ")" );
+ RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "deleteObjByOId(" << oid << ")" );
+
+ checkForRwTransaction();
+
+ startRequest(RnpRasserver::cmd_delobjbyoid);
+ encoder.addInt32Parameter(RnpRasserver::pmt_clientid, clientID);
+ TALK( "adding clientID 0x" << hex << clientID << dec );
+ encoder.addStringParameter(RnpRasserver::pmt_oidstring, oid.get_string_representation());
+
+ helper012d("deleteObjByOId");
+
+ RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "deleteObjByOId()" );
+ LEAVE( "RpcClientComm::deleteObjByOId()" );
+}
+
+void RnpClientComm::removeObjFromColl( const char* collName, const r_OId& oid ) throw ( r_Error )
+{
+ ENTER( "RpcClientComm::removeObjFromColl(" << (collName?collName:"(null)") << "," << oid << ")" );
+ RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "removeObjFromColl(" << collName << "," << oid << ")" );
+
+ checkForRwTransaction();
+
+ startRequest(RnpRasserver::cmd_remobjfromcoll);
+ encoder.addInt32Parameter(RnpRasserver::pmt_clientid, clientID);
+ TALK( "adding clientID 0x" << hex << clientID << dec );
+ encoder.addStringParameter(RnpRasserver::pmt_collname, collName);
+ encoder.addStringParameter(RnpRasserver::pmt_oidstring, oid.get_string_representation());
+
+ helper012d("removeObjFromColl");
+
+ RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "removeObjFromColl()" );
+ LEAVE( "RpcClientComm::removeObjFromColl()" );
+}
+
+
+r_Ref_Any RnpClientComm::getCollByName( const char* collName ) throw( r_Error )
+{
+ ENTER( "RpcClientComm::getCollByName(" << (collName?collName:"(null)") << ")" );
+ RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "getCollByName(" << collName << ")" );
+
+ r_Ref_Any result = executeGetCollByNameOrOId ( collName, r_OId() );
+
+ RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "getCollByName()" );
+ LEAVE( "RpcClientComm::getCollByName()" );
+ return result;
+}
+
+r_Ref_Any RnpClientComm::getCollByOId ( const r_OId& oid ) throw( r_Error )
+{
+ ENTER( "RpcClientComm::getCollByOId(" << oid << ")" );
+ RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "getCollByOId(" << oid << ")" );
+
+ r_Ref_Any result = executeGetCollByNameOrOId ( NULL, oid );
+
+ RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "getCollByOId()" );
+ LEAVE( "RpcClientComm::getCollByOId()" );
+ return result;
+}
+
+r_Ref_Any RnpClientComm::getCollOIdsByName( const char* name ) throw( r_Error )
+{
+ ENTER( "RpcClientComm::getCollOIdsByName(" << (name?name:"(null)") << ")" );
+ RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "getCollOIdsByName(" << name << ")" );
+
+ r_Ref_Any result = executeGetCollOIdsByNameOrOId ( name, r_OId() );
+
+ RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "getCollOIdsByName()" );
+ LEAVE( "RpcClientComm::getCollOIdsByName()" );
+ return result;
+}
+
+r_Ref_Any RnpClientComm::getCollOIdsByOId ( const r_OId& oid ) throw( r_Error )
+{
+ ENTER( "RpcClientComm::getCollOIdsByOId(" << oid << ")" );
+ RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "getCollOIdsByOId(" << oid << ")" );
+
+ r_Ref_Any result = executeGetCollOIdsByNameOrOId ( NULL, oid );
+
+ RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "getCollOIdsByOId()" );
+ LEAVE( "RpcClientComm::getCollOIdsByOId()" );
+ return result;
+}
+
+
+//------------------------------
+void RnpClientComm::executeQuery( const r_OQL_Query& query, r_Set< r_Ref_Any >& result ) throw( r_Error )
+{
+ ENTER( "RpcClientComm::executeQuery(_,_)" );
+ RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "executeQuery(_,_)" );
+
+ int status = executeExecuteQuery( query.get_query(), result );
+
+ switch(status)
+ {
+ case 0:
+ getMDDCollection( result, 1 );
+ break; // 1== isQuery
+ case 1:
+ getElementCollection( result );
+ break;
+ //case 2: nothing
+ default:
+ RMInit::logOut << "Internal error: RpcClientComm::executeQuery(): illegal status value " << status << endl;
+ }
+
+ RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "executeQuery()" );
+ LEAVE( "RpcClientComm::executeQuery()" );
+}
+
+void RnpClientComm::getMDDCollection( r_Set< r_Ref_Any >& mddColl, unsigned int isQuery ) throw(r_Error)
+{
+ ENTER( "RpcClientComm::getMDDCollection(_," << isQuery << ")" );
+ RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "getMDDCollection(_," << isQuery << ")" );
+
+ unsigned short tileStatus=0;
+ unsigned short mddStatus = 0;
+
+ while( mddStatus == 0 ) // repeat until all MDDs are transferred
+ {
+ r_Ref<r_GMarray> mddResult;
+
+ // Get spatial domain of next MDD
+ GetMDDRes* thisResult = executeGetNextMDD();
+
+ mddStatus = thisResult->status;
+
+ if( mddStatus == 2 )
+ {
+ RMInit::logOut << "Error: getMDDCollection(...) - no transfer collection or empty transfer collection" << endl;
+ LEAVE( "RpcClientComm::getMDDCollection(): exception, status = " << mddStatus );
+ throw r_Error( r_Error::r_Error_TransferFailed );
+ }
+
+ tileStatus = getMDDCore(mddResult, thisResult, isQuery);
+
+ // finally, insert the r_Marray into the set
+
+ mddColl.insert_element( mddResult, 1 );
+
+ free(thisResult->domain);
+ free(thisResult->typeName);
+ free(thisResult->typeStructure);
+ free(thisResult->oid);
+ delete thisResult;
+
+ if( tileStatus == 0 ) // if this is true, we're done with this collection
+ break;
+
+ } // end while( mddStatus == 0 )
+
+ executeEndTransfer();
+
+ RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "getMDDCollection()" );
+ LEAVE( "RpcClientComm::getMDDCollection()" );
+}
+
+
+// small helper for ...
+void freeGetTileRes(GetTileRes *ptr)
+{
+ ENTER( "RpcClientComm::freeGetTileRes(_)" );
+ RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "freeGetTileRes(_)" );
+
+ if(ptr->marray->domain)
+ free(ptr->marray->domain);
+ if(ptr->marray->data.confarray_val)
+ free(ptr->marray->data.confarray_val);
+ delete ptr->marray;
+ delete ptr;
+
+ RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "freeGetTileRes()" );
+ LEAVE( "RpcClientComm::freeGetTileRes(_)" );
+}
+
+unsigned short
+RnpClientComm::getMDDCore( r_Ref< r_GMarray > &mdd, GetMDDRes *thisResult, unsigned int isQuery ) throw( r_Error )
+{
+ ENTER( "RpcClientComm::getMDDCore(_,_," << isQuery << ")" );
+ RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "getMDDCore(_,_," << isQuery << ")" );
+
+ // create r_Minterval and oid
+ r_Minterval mddDomain( thisResult->domain );
+ r_OId rOId ( thisResult->oid );
+ r_GMarray *marray;
+
+ if( isQuery )
+ marray = new( r_Database::actual_database, r_Object::transient, rOId ) r_GMarray();
+ else
+ marray = new( r_Database::actual_database, r_Object::read , rOId ) r_GMarray();
+
+ marray->set_spatial_domain( mddDomain );
+ marray->set_type_by_name ( thisResult->typeName );
+ marray->set_type_structure( thisResult->typeStructure );
+
+ r_Data_Format currentFormat = (r_Data_Format)(thisResult->currentFormat);
+ if (r_Tile_Compression::check_data_format(currentFormat) == 1)
+ currentFormat = r_Array;
+ marray->set_current_format( currentFormat );
+
+ r_Data_Format decompFormat;
+
+ const r_Base_Type *baseType = marray->get_base_type_schema();
+
+ // Variables needed for tile transfer
+ GetTileRes* tileRes=0;
+ unsigned short mddDim = mddDomain.dimension(); // we assume that each tile has the same dimensionality as the MDD
+ r_Minterval tileDomain;
+ r_GMarray* tile; // for temporary tile
+ char* memCopy;
+ unsigned long memCopyLen;
+ int tileCntr = 0;
+ unsigned short tileStatus = 0;
+
+ tileStatus = 2; // call rpcgetnexttile_1 at least once
+
+ while( tileStatus == 2 || tileStatus == 3 ) // while( for all tiles of the current MDD )
+ {
+ tileRes = executeGetNextTile();
+
+ tileStatus = tileRes->status;
+
+ if( tileStatus == 4 )
+ {
+ freeGetTileRes(tileRes);
+ RMInit::logOut << "Error: rpcGetNextTile(...) - no tile to transfer or empty transfer collection" << endl;
+ LEAVE( "RpcClientComm::getMDDCore(): exception, status = " << tileStatus );
+ throw r_Error( r_Error::r_Error_TransferFailed );
+ }
+
+ // take cellTypeLength for current MDD of the first tile
+ if( tileCntr == 0 )
+ marray->set_type_length( tileRes->marray->cellTypeLength );
+
+ tileDomain = r_Minterval( tileRes->marray->domain );
+ memCopyLen = tileDomain.cell_count() * marray->get_type_length(); // cell type length of the tile must be the same
+ if (memCopyLen < tileRes->marray->data.confarray_len)
+ memCopyLen = tileRes->marray->data.confarray_len; // may happen when compression expands
+ memCopy = new char[ memCopyLen ];
+
+ // create temporary tile
+ tile = new r_GMarray();
+ tile->set_spatial_domain( tileDomain );
+ tile->set_array( memCopy );
+ tile->set_array_size( memCopyLen );
+ tile->set_type_length( tileRes->marray->cellTypeLength );
+ tileCntr++;
+
+ // Variables needed for block transfer of a tile
+ unsigned long blockOffset = 0;
+ unsigned short subStatus = 3;
+ currentFormat = (r_Data_Format)(tileRes->marray->currentFormat);
+
+ switch( tileStatus )
+ {
+ case 3: // at least one block of the tile is left
+
+ // Tile arrives in several blocks -> put them together
+ concatArrayData(tileRes->marray->data.confarray_val, tileRes->marray->data.confarray_len, memCopy, memCopyLen, blockOffset);
+ freeGetTileRes(tileRes);
+
+ tileRes = executeGetNextTile();//rpcgetnexttile_1( &clientID, binding_h );
+
+ subStatus = tileRes->status;
+
+ if( subStatus == 4 )
+ {
+ freeGetTileRes(tileRes);
+ LEAVE( "RpcClientComm::getMDDCore(): exception, status = " << tileStatus << ", subStatus = " << subStatus );
+ throw r_Error( r_Error::r_Error_TransferFailed );
+ }
+
+ concatArrayData(tileRes->marray->data.confarray_val, tileRes->marray->data.confarray_len, memCopy, memCopyLen, blockOffset);
+ freeGetTileRes(tileRes);
+
+ tileStatus = subStatus;
+ break;
+
+ default: // tileStatus = 0,3 last block of the current tile
+
+ // Tile arrives as one block.
+ concatArrayData(tileRes->marray->data.confarray_val, tileRes->marray->data.confarray_len, memCopy, memCopyLen, blockOffset);
+ freeGetTileRes(tileRes);
+
+ break;
+ }
+
+ // uncompress the tile if necessary
+ decompFormat = doTransferDecompression( tile, baseType, currentFormat, blockOffset );
+
+ char* marrayData = NULL;
+ // Now the tile is transferred completely, insert it into current MDD
+ if( tileStatus < 2 && tileCntr == 1 && (tile->spatial_domain() == marray->spatial_domain()))
+ {
+ // MDD consists of just one tile that is the same size of the mdd
+
+ // simply take the data memory of the tile
+ marray->set_array( tile->get_array() );
+ marray->set_array_size( tile->get_array_size() );
+ tile->set_array( 0 );
+ }
+ else
+ {
+ // MDD consists of more than one tile or the tile does not cover the whole domain
+
+ r_Bytes size = mddDomain.cell_count() * marray->get_type_length();
+
+ if( tileCntr == 1 )
+ {
+ // allocate memory for the MDD
+ marrayData = new char[ size ];
+ memset(marrayData, 0, size);
+
+ marray->set_array( marrayData );
+ }
+ else
+ marrayData = marray->get_array();
+
+
+ // copy tile data into MDD data space (optimized, relying on the internal representation of an MDD )
+ char* mddBlockPtr;
+ char* tileBlockPtr = tile->get_array();
+ unsigned long blockCells = tileDomain[tileDomain.dimension()-1].high()-tileDomain[tileDomain.dimension()-1].low()+1;
+ unsigned long blockSize = blockCells * marray->get_type_length();
+ unsigned long blockNo = tileDomain.cell_count() / blockCells;
+
+ for( unsigned long blockCtr = 0; blockCtr < blockNo; blockCtr++ )
+ {
+ mddBlockPtr = marrayData + marray->get_type_length()*mddDomain.cell_offset( tileDomain.cell_point( blockCtr * blockCells ) );
+ memcpy( (void*)mddBlockPtr, (void*)tileBlockPtr, (size_t)blockSize );
+ tileBlockPtr += blockSize;
+ }
+
+ // former non-optimized version
+ // for( i=0; i<tileDomain->cell_count(); i++ )
+ // (*marray)[tileDomain->cell_point( i )] = (*tile)[tileDomain->cell_point( i )];
+
+ marray->set_array_size( size );
+ }
+
+ // delete temporary tile
+ delete tile;
+
+ } // end while( MDD is not transferred completely )
+
+
+ mdd = r_Ref<r_GMarray>( marray->get_oid(), marray );
+
+ RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "getMDDCore() -> " << tileStatus );
+ LEAVE( "RpcClientComm::getMDDCore() -> " << tileStatus );
+ return tileStatus;
+}
+
+
+int RnpClientComm::concatArrayData( const char *source, unsigned long srcSize, char *&dest, unsigned long &destSize, unsigned long &destLevel )
+{
+ ENTER( "RpcClientComm::concatArrayData( 0x" << hex << (unsigned long) source << dec << "," << srcSize << ",_,_,_ )" );
+ RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "concatArrayData(" << source << "," << srcSize << ",_,_,_)" );
+
+ if (destLevel + srcSize > destSize)
+ {
+ // need to extend dest
+ unsigned long newSize = destLevel + srcSize;
+ char *newArray;
+
+ // allocate a little extra if we have to extend
+ newSize = newSize + newSize / 16;
+
+ // RMDBGOUT( 1, "RpcClientComm::concatArrayData(): need to extend from " << destSize << " to " << newSize );
+
+ if ((newArray = new char[newSize]) == NULL)
+ {
+ RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "concatArrayData() -> " << -1 );
+ LEAVE( "RpcClientComm::concatArrayData() -> -1" );
+ return -1;
+ }
+
+ memcpy(newArray, dest, destLevel);
+ delete [] dest;
+ dest = newArray;
+ destSize = newSize;
+ }
+
+ memcpy(dest + destLevel, source, srcSize);
+ destLevel += srcSize;
+
+ RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "concatArrayData() -> " << 0 );
+ LEAVE( "RpcClientComm::concatArrayData() -> 0" );
+ return 0;
+}
+
+r_Data_Format
+RnpClientComm::doTransferDecompression( r_GMarray* tile, const r_Base_Type *type, r_Data_Format fmt, unsigned long size )
+{
+ ENTER( "RpcClientComm::doTransferDecompression(...) tile dom:" << tile->spatial_domain() << " array size:" << tile->get_array_size() << " type size:" << tile->get_type_length());
+ RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "doTransferDecompression(...) tile dom:"
+ << tile->spatial_domain() << " array size:" << tile->get_array_size()
+ << " type size:" << tile->get_type_length());
+
+ if (fmt != r_Array)
+ {
+ r_Tile_Compression *engine = NULL;
+ char *newTileData = NULL;
+ r_Data_Format newFormat;
+
+ RMDBGMIDDLE( 2, RMDebug::module_clientcomm, "RpcClientComm", "doTransferDecompression(...) decompressing from "
+ << fmt << ", " << size << "bytes");
+
+ try
+ {
+ r_Storage_Man_CPP sman;
+ engine = r_Tile_Compression::create( fmt, tile->spatial_domain(), type );
+ engine->set_storage_handler(sman);
+ newTileData = (char*)(engine->decompress(tile->get_array(), size));
+ RMDBGMIDDLE( 2, RMDebug::module_clientcomm, "RpcClientComm",
+ "doTransferDecompression(...) decompression to " << engine->get_decomp_format() << " OK");
+ }
+ catch (r_Error &err)
+ {
+ RMDBGMIDDLE( 2, RMDebug::module_clientcomm, "RpcClientComm",
+ "doTransferDecompression(...) decompression to " << engine->get_decomp_format() << " FAILED");
+ RMInit::logOut << "RpcClientComm::doTransferDecompression() Error decompressing data, assuming raw" << endl;
+ }
+
+ newFormat = engine->get_decomp_format();
+
+ if (newTileData != NULL)
+ {
+ delete [] tile->get_array();
+ tile->set_array(newTileData);
+ tile->set_array_size(tile->spatial_domain().cell_count()*tile->get_type_length());
+ }
+ else
+ newFormat = fmt;
+
+ RMDBGMIDDLE( 2, RMDebug::module_clientcomm, "RpcClientComm", "doTransferDecompression(...) tile newFmt:"
+ << newFormat << " dom:" << tile->spatial_domain()
+ << " array size:" << tile->get_array_size()
+ << " type size:" << tile->get_type_length());
+
+ // ... also make sure the decoded format is really raw array data (r_Array)
+ if ((endianClient != endianServer) && (newFormat == r_Array))
+ {
+ // if compression engine already handles endianness we mustn't change again
+ if (!engine->converts_endianness()) {
+ RMDBGMIDDLE( 2, RMDebug::module_clientcomm, "RpcClientComm", "doTransferDecompression(...) for "
+ << fmt << " endianness changed from "
+ << (r_Endian::r_Endianness)endianServer << " to " << (r_Endian::r_Endianness) endianClient);
+ changeEndianness(tile, type);
+ }
+ }
+
+ if (engine != NULL)
+ delete engine;
+
+ RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "doTransferDecompression(...) tile fmt:" << newFormat);
+ LEAVE( "RpcClientComm::doTransferDecompression() -> " << newFormat );
+ return newFormat;
+ }
+
+ if (endianClient != endianServer)
+ {
+ RMDBGMIDDLE( 2, RMDebug::module_clientcomm, "RpcClientComm", "doTransferDecompression(...) for "
+ << fmt << " endianness changed from "
+ << (r_Endian::r_Endianness)endianServer << " to " << (r_Endian::r_Endianness) endianClient);
+ changeEndianness(tile, type);
+ }
+
+ RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "doTransferDecompression(...) tile fmt:" << r_Array);
+ LEAVE( "RpcClientComm::doTransferDecompression() -> " << r_Array );
+ return r_Array;
+}
+
+
+void RnpClientComm::getElementCollection( r_Set< r_Ref_Any >& resultColl ) throw(r_Error)
+{
+ ENTER( "RpcClientComm::getElementCollection()" );
+ RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "getElementCollection(_)" );
+
+ unsigned short rpcStatus = 0;
+
+ TALK( "got set of type " << resultColl.get_type_structure() );
+
+ while( rpcStatus == 0 ) // repeat until all elements are transferred
+ {
+ GetElementRes* thisResult = executeGetNextElement();
+
+ rpcStatus = thisResult->status;
+
+ if( rpcStatus == 2 )
+ {
+ RMInit::logOut << "Error: getElementCollection(...) - no transfer collection or empty transfer collection" << endl;
+ LEAVE( "RpcClientComm::getElementCollection(): exception: rpcStatus = " << rpcStatus );
+ throw r_Error( r_Error::r_Error_TransferFailed );
+ }
+ // create new collection element, use type of collection resultColl
+ r_Ref_Any element;
+ const r_Type* elementType = resultColl.get_element_type_schema();
+
+ // convert the endianness before creating the new element!
+ if (endianClient != endianServer)
+ {
+
+ if (endianClient == 0)
+ elementType->convertToBigEndian(thisResult->data.confarray_val, 1);
+ else
+ elementType->convertToLittleEndian(thisResult->data.confarray_val, 1);
+ }
+
+ switch( elementType->type_id() )
+ {
+ case r_Type::BOOL:
+ case r_Type::CHAR:
+ case r_Type::OCTET:
+ case r_Type::SHORT:
+ case r_Type::USHORT:
+ case r_Type::LONG:
+ case r_Type::ULONG:
+ case r_Type::FLOAT:
+ case r_Type::DOUBLE:
+ element = new r_Primitive( thisResult->data.confarray_val, (r_Primitive_Type*) elementType );
+ r_Transaction::actual_transaction->add_object_list( r_Transaction::SCALAR, (void*) element );
+ break;
+
+ case r_Type::COMPLEXTYPE1:
+ case r_Type::COMPLEXTYPE2:
+ element = new r_Complex(thisResult->data.confarray_val, (r_Complex_Type *)elementType);
+ r_Transaction::actual_transaction->add_object_list(r_Transaction::SCALAR, (void *)element);
+ break;
+
+ case r_Type::STRUCTURETYPE:
+ element = new r_Structure( thisResult->data.confarray_val, (r_Structure_Type*) elementType );
+ r_Transaction::actual_transaction->add_object_list( r_Transaction::SCALAR, (void*) element );
+ break;
+
+ case r_Type::POINTTYPE:
+ {
+ char* stringRep = new char[thisResult->data.confarray_len+1];
+ strncpy( stringRep, thisResult->data.confarray_val, thisResult->data.confarray_len );
+ stringRep[thisResult->data.confarray_len] = '\0';
+
+ r_Point* typedElement = new r_Point( stringRep );
+ element = typedElement;
+ r_Transaction::actual_transaction->add_object_list( r_Transaction::POINT, (void*) typedElement );
+ delete [] stringRep;
+ }
+ break;
+
+ case r_Type::SINTERVALTYPE:
+ {
+ char* stringRep = new char[thisResult->data.confarray_len+1];
+ strncpy( stringRep, thisResult->data.confarray_val, thisResult->data.confarray_len );
+ stringRep[thisResult->data.confarray_len] = '\0';
+
+ r_Sinterval* typedElement = new r_Sinterval( stringRep );
+ element = typedElement;
+ r_Transaction::actual_transaction->add_object_list( r_Transaction::SINTERVAL, (void*) typedElement );
+ delete [] stringRep;
+ }
+ break;
+
+ case r_Type::MINTERVALTYPE:
+ {
+ char* stringRep = new char[thisResult->data.confarray_len+1];
+ strncpy( stringRep, thisResult->data.confarray_val, thisResult->data.confarray_len );
+ stringRep[thisResult->data.confarray_len] = '\0';
+
+ r_Minterval* typedElement = new r_Minterval( stringRep );
+ element = typedElement;
+ r_Transaction::actual_transaction->add_object_list( r_Transaction::MINTERVAL, (void*) typedElement );
+ delete [] stringRep;
+ }
+ break;
+
+ case r_Type::OIDTYPE:
+ {
+ char* stringRep = new char[thisResult->data.confarray_len+1];
+ strncpy( stringRep, thisResult->data.confarray_val, thisResult->data.confarray_len );
+ stringRep[thisResult->data.confarray_len] = '\0';
+
+ r_OId* typedElement = new r_OId( stringRep );
+ element = typedElement;
+ r_Transaction::actual_transaction->add_object_list( r_Transaction::OID, (void*) typedElement );
+ delete [] stringRep;
+ }
+ break;
+ default:
+ RMDBGENTER(2, RMDebug::module_clientcomm, "RpcClientComm", "getElementCollection(...) bad element typeId" << elementType->type_id())
+ break;
+ }
+
+ TALK( "got an element" );
+
+ // insert element into result set
+ resultColl.insert_element( element, 1 );
+
+ delete[] thisResult->data.confarray_val;
+ delete thisResult;
+ }
+
+ executeEndTransfer();
+
+ RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "getElementCollection()" );
+ LEAVE( "RpcClientComm::getElementCollection()" );
+}
+
+//##################################################################################
+void RnpClientComm::executeQuery( const r_OQL_Query& query ) throw( r_Error )
+{
+ ENTER( "RnpClientComm::executeQuery(_)" );
+
+ checkForRwTransaction();
+
+ unsigned short status;
+
+ //
+ // Send MDD constants to the server.
+ //
+ if( query.get_constants() )
+ {
+ r_Set< r_GMarray* >* mddConstants = (r_Set< r_GMarray* >*)query.get_constants();
+
+ if(executeInitUpdate() < 0) // error would be nicer
+ {
+ LEAVE( "Error: RnpClientComm::executeQuery(): update query initialization failed." );
+ return;
+ }
+
+ r_Iterator<r_GMarray*> iter = mddConstants->create_iterator();
+
+ for( iter.reset(); iter.not_done(); iter++ )
+ {
+ r_GMarray* mdd = *iter;
+
+ const r_Base_Type* baseType = mdd->get_base_type_schema();
+
+ if( mdd )
+ {
+ status = executeStartInsertTransMDD(mdd);
+ switch( status )
+ {
+ case 0:
+ break; // OK
+ case 2:
+ LEAVE( "RnpClientComm::executeQuery(): exception, status = " << status );
+ throw r_Error( r_Error::r_Error_DatabaseClassUndefined );
+ break;
+ case 3:
+ LEAVE( "RnpClientComm::executeQuery(): exception, status = " << status );
+ throw r_Error( r_Error::r_Error_TypeInvalid );
+ break;
+ default:
+ LEAVE( "RnpClientComm::executeQuery(): exception, status = " << status );
+ throw r_Error( r_Error::r_Error_TransferFailed );
+ break;
+ }
+
+ r_Set< r_GMarray* >* bagOfTiles;
+
+ bagOfTiles = mdd->get_storage_layout()->decomposeMDD( mdd );
+
+ r_Iterator< r_GMarray* > iter2 = bagOfTiles->create_iterator();
+
+ for(iter2.reset(); iter2.not_done();iter2.advance())
+ {
+ RPCMarray* rpcMarray;
+
+ r_GMarray *origTile = *iter2;
+
+ getMarRpcRepresentation( origTile, rpcMarray, mdd->get_storage_layout()->get_storage_format(), baseType );
+
+ status = executeInsertTile(false, rpcMarray);
+
+ // free rpcMarray structure (rpcMarray->data.confarray_val is freed somewhere else)
+ freeMarRpcRepresentation( origTile, rpcMarray );
+
+ // delete current tile (including data block)
+ delete origTile;
+
+ if( status > 0 )
+ {
+ LEAVE( "RnpClientComm::executeQuery(): exception, status = " << status );
+ throw r_Error( r_Error::r_Error_TransferFailed );
+ }
+ }
+
+ bagOfTiles->remove_all();
+ delete bagOfTiles;
+
+ executeEndInsertMDD(false);
+ }
+ }
+ }
+
+ executeExecuteUpdateQuery(query.get_query());
+ LEAVE( "RnpClientComm::executeQuery(_)" );
+}
+
+
+// helper functions
+void
+RnpClientComm::getMarRpcRepresentation( const r_GMarray* mar, RPCMarray*& rpcMarray,
+ r_Data_Format initStorageFormat,
+ const r_Base_Type *baseType)
+{
+ ENTER( "RpcClientComm::getMarRpcRepresentation(...)");
+ RMDBGENTER(2, RMDebug::module_clientcomm, "RpcClientComm", "getMarRpcRepresentation(...)");
+
+ // allocate memory for the RPCMarray data structure and assign its fields
+ rpcMarray = (RPCMarray*)mymalloc( sizeof(RPCMarray) );
+ rpcMarray->domain = mar->spatial_domain().get_string_representation();
+ rpcMarray->cellTypeLength = mar->get_type_length();
+
+ void* arrayData = NULL;
+ r_ULong arraySize=0;
+
+ if (initStorageFormat == r_Array)
+ {
+ if (transferFormat != r_Array)
+ {
+ r_Tile_Compression *engine = NULL;
+
+ try
+ {
+ r_Storage_Man_CPP sman;
+ engine = r_Tile_Compression::create(transferFormat, mar->spatial_domain(), baseType);
+ engine->set_storage_handler(sman);
+ RMDBGMIDDLE(2, RMDebug::module_clientcomm, "RpcClientComm", "getMarRpcRepresentation(...) compress with " << engine->get_name())
+ if ((endianClient != endianServer) && (!engine->converts_endianness()))
+ {
+ RMDBGMIDDLE( 2, RMDebug::module_clientcomm, "RpcClientComm", "getMarRpcRepresentation(...) for "
+ << transferFormat << " endianness changed before compression from " << (r_Endian::r_Endianness)endianClient
+ << " to " << (r_Endian::r_Endianness) endianServer);
+ char *endianData = new char[mar->get_array_size()];
+ changeEndianness(mar, endianData, baseType);
+ arrayData = engine->compress(endianData, arraySize, transferFormatParams);
+ delete [] endianData;
+ endianData=NULL;
+ }
+ else
+ {
+ arrayData = engine->compress(mar->get_array(), arraySize, transferFormatParams);
+ }
+ RMDBGMIDDLE(2, RMDebug::module_clientcomm, "RpcClientComm", "compression returned " << arrayData << " (" << arraySize << " bytes)")
+/* void *testData = engine->decompress(arrayData, arraySize);
+ cout << "Decompression worked " << ((memcmp(mar->get_array(), testData, (mar->get_type_length()) * (mar->spatial_domain().cell_count())) == 0) ? "OK" : "!NOT!") << endl;
+ delete [] testData;
+*/
+
+ // ForWiss: revert to uncompressed data if the compressed data is larger
+ // coman: and introduced a bug of endianess ...
+ if (arraySize > mar->get_array_size())
+ {
+ RMInit::logOut << "RpcClientComm::getMarRpcRepresentation(...) Warning: overriding compression setting("
+ << transferFormat << ") to " << r_Array
+ << " because compressed size( " << arraySize
+ << " bytes) > uncompressed size( " << mar->get_array_size() << " bytes)" << endl;
+ delete [] arrayData;
+ arrayData = NULL;
+ }
+ }
+ catch (r_Error &err)
+ {
+ RMInit::logOut << "RpcClientComm::getMarRpcRepresentation() Error: Unsupported data format " << transferFormat << endl;
+ }
+ if (engine != NULL)
+ delete engine;
+ }
+ else
+ {
+ if (endianClient != endianServer)
+ {
+ RMDBGMIDDLE( 2, RMDebug::module_clientcomm, "RpcClientComm", "getMarRpcRepresentation(...) for "
+ << transferFormat << " endianness changed from "
+ << (r_Endian::r_Endianness)endianClient << " to " << (r_Endian::r_Endianness) endianServer);
+ arraySize = mar->get_array_size();
+ arrayData = new char[arraySize];
+ changeEndianness(mar, arrayData, baseType);
+ }
+ }
+ }
+
+ if (arrayData == NULL)
+ {
+ //error in compression or compression inefficient
+ rpcMarray->currentFormat = initStorageFormat;
+ rpcMarray->data.confarray_len = mar->get_array_size();
+ if (endianClient != endianServer)
+ {
+ RMDBGMIDDLE( 2, RMDebug::module_clientcomm, "RpcClientComm", "getMarRpcRepresentation(...) for "
+ << transferFormat << "endianness changed from "
+ << (r_Endian::r_Endianness)endianClient << " to " << (r_Endian::r_Endianness) endianServer
+ << " because compression " << transferFormat << " failed" );
+ arrayData = new char[arraySize];
+ changeEndianness(mar, arrayData, baseType);
+ rpcMarray->data.confarray_val = (char*)(arrayData);
+ }
+ else
+ {
+ rpcMarray->data.confarray_val = (char*)(mar->get_array());
+ }
+ }
+ else
+ {
+ if (arraySize != mar->get_array_size())
+ {
+ RMDBGMIDDLE(2, RMDebug::module_clientcomm, "RpcClientComm", "compressed to " << (100.0*arraySize) / mar->get_array_size() << "%")
+ }
+ rpcMarray->currentFormat = transferFormat;
+ rpcMarray->data.confarray_len = arraySize;
+ rpcMarray->data.confarray_val = (char*)arrayData;
+ }
+ rpcMarray->storageFormat = storageFormat;
+
+ RMDBGEXIT(2, RMDebug::module_clientcomm, "RpcClientComm", "getMarRpcRepresentation(...)");
+ LEAVE( "RpcClientComm::getMarRpcRepresentation()");
+}
+
+
+void
+RnpClientComm::freeMarRpcRepresentation( const r_GMarray* mar, RPCMarray* rpcMarray )
+{
+ ENTER( "RnpClientComm::freeMarRpcRepresentation(_,_)" );
+
+ if (rpcMarray->data.confarray_val != ((r_GMarray*)mar)->get_array())
+ {
+ delete[] rpcMarray->data.confarray_val;
+ }
+ free( rpcMarray->domain );
+ free( rpcMarray );
+
+ LEAVE( "RnpClientComm::freeMarRpcRepresentation()" );
+}
+
+
+//#########################################################################
+r_OId RnpClientComm::getNewOId( unsigned short objType ) throw(r_Error)
+{
+ return executeGetNewOId(objType);
+}
+
+unsigned short RnpClientComm::getObjectType( const r_OId& oid ) throw(r_Error)
+{
+ return executeGetObjectType(oid);
+}
+
+char* RnpClientComm::getTypeStructure( const char* typeName, r_Type_Type typeType ) throw(r_Error)
+{
+ return executeGetTypeStructure( typeName, typeType );
+}
+
+int RnpClientComm::setStorageFormat( r_Data_Format format, const char *formatParams)
+{
+ ENTER( "RnpClientComm::setStorageFormat( format=" << format << ", formatParams=" << (formatParams?formatParams:"(null)") << " )" );
+
+ storageFormat = format;
+
+ if (storageFormatParams != NULL)
+ {
+ free(storageFormatParams);
+ storageFormatParams = NULL;
+ }
+ if (formatParams != NULL)
+ {
+ storageFormatParams = (char*)mymalloc(strlen(formatParams) + 1);
+ strcpy(storageFormatParams, formatParams);
+ // extract ``compserver'' if present
+ clientParams->process(storageFormatParams);
+ }
+
+ int result = executeSetFormat( false, format, formatParams);
+
+ LEAVE( "RnpClientComm::setStorageFormat() -> " << result );
+ return result;
+}
+
+int RnpClientComm::setTransferFormat( r_Data_Format format, const char* formatParams)
+{
+ ENTER( "RnpClientComm::setTransferFormat( format=" << format << ", formatParams=" << (formatParams?formatParams:"(null)") << " )" );
+
+ transferFormat = format;
+
+ if (transferFormatParams != NULL)
+ {
+ free(transferFormatParams);
+ transferFormatParams = NULL;
+ }
+ if (formatParams != NULL)
+ {
+ transferFormatParams = (char*)mymalloc(strlen(formatParams)+1);
+ strcpy(transferFormatParams, formatParams);
+ // extract ``exactformat'' if present
+ clientParams->process(transferFormatParams);
+ }
+
+ int result = executeSetFormat( true, format, formatParams);
+ LEAVE( "RnpClientComm::setTransferFormat() -> " << result );
+ return result;
+}
+
+const char* RnpClientComm::getExtendedErrorInfo() throw(r_Error)
+{
+ // This function shouldn't be called for RNP protocol
+ static char *errorInfo = new char[30];
+ strcpy(errorInfo,"No info");
+
+ return errorInfo;
+}
+
+const char* RnpClientComm::getServerName()
+{
+ return serverHost;
+}
+
+void RnpClientComm::setUserIdentification(const char *userName, const char *plainTextPassword)
+{
+ ENTER( "RnpClientComm::setUserIdentification( userName=" << (userName?userName:"(null)") << ", plainTextPassword=" << (plainTextPassword?plainTextPassword:"(null)") << " )" );
+
+ char digest[33]="";
+ messageDigest(plainTextPassword,digest,"MD5");
+ sprintf(identificationString,"%s:%s",userName,digest);
+
+ LEAVE( "RnpClientComm::setUserIdentification()" );
+}
+
+unsigned long RnpClientComm::getClientID() const
+{
+ return clientID;
+}
+
+void RnpClientComm::triggerAliveSignal(){}
+
+void RnpClientComm::sendAliveSignal(){}
+
+//############# helper functions ###################
+int RnpClientComm::messageDigest(const char *input,char *output,const char *mdName)
+{
+
+ EVP_MD_CTX mdctx;
+ const EVP_MD *md;
+ unsigned int md_len, i;
+ unsigned char md_value[100];
+
+ OpenSSL_add_all_digests();
+
+ md = EVP_get_digestbyname(mdName);
+
+ if(!md)
+ return 0;
+
+ EVP_DigestInit(&mdctx, md);
+ EVP_DigestUpdate(&mdctx,input, strlen(input));
+ EVP_DigestFinal(&mdctx, md_value, &md_len);
+
+ for(i = 0; i < md_len; i++)
+ sprintf(output+i+i,"%02x", md_value[i]);
+
+ return strlen(output);
+}
+
+
+void RnpClientComm::setMaxRetry(unsigned int newMaxRetry)
+{
+ RMInit::clientcommMaxRetry = newMaxRetry;
+}
+
+unsigned int RnpClientComm::getMaxRetry()
+{
+ return RMInit::clientcommMaxRetry;
+}
+
+// aux function: sleep incrementally, with subsecond precision
+static void pause(int retryCount)
+{
+ // changed PB 2005-sep-09
+ // was: unsigned int millisec = 50 + retryCount * 50;
+ unsigned int millisec = RNP_PAUSE_INCREMENT * (retryCount + 1);
+ // if(millisec > 1000)
+ // millisec = 1000;
+
+ timeval tv;
+ tv.tv_sec = millisec / 1000;
+ tv.tv_usec = millisec * 1000;
+ select(0,NULL,NULL,NULL,&tv);
+}
+
+int RnpClientComm::getFreeServer(bool readwrite, bool openDB)
+{
+ ENTER( "RnpClientComm::getFreeServer( readwrite=" << readwrite << ", openDB=" << openDB << " )" );
+
+ // to avoid nested retries, only this loop is kept,
+ // the one in executeGetFreeServer() has been removed -- PB 2005-aug-31
+ for ( int retryCount=0; ; retryCount++ )
+ {
+ try
+ {
+ executeGetFreeServer(readwrite, openDB);
+
+ // if no error, we have the server, so break
+ break;
+ }
+ catch(r_Error &e)
+ {
+ int errorno = e.get_errorno();
+ RMInit::logOut << "Error: Connection to rasdaman server failed with " << errorno << ": " << e.what() << endl;
+ // for these errors a retry makes sense, as long as we haven't reached the retry limit:
+ if ( ( errorno==801 || errorno==805 || errorno==806 ) && retryCount < RMInit::clientcommMaxRetry )
+ {
+ TALK( " retry="<<retryCount );
+ pause(retryCount); // waiting with incremental delays
+ }
+ else
+ {
+ LEAVE( "RnpClientComm::getFreeServer(): exception " << errorno << ": " << e.what() );
+ throw;
+ }
+ }
+ }
+
+ LEAVE( "RnpClientComm::getFreeServer() -> 1" );
+ return 1;
+}
+
+#define MAXMSG 512
+int RnpClientComm::executeGetFreeServer(bool readwrite, bool openDB)
+{
+ ENTER( "RnpClientComm::executeGetFreeServer( readwrite=" << readwrite << ", openDB=" << openDB << " )" );
+
+ static char myRasmgrID[100]="";
+ if(myRasmgrID[0]==0)
+ {
+ unsigned int hostid = gethostid();
+ unsigned int pid = getpid();
+ sprintf(myRasmgrID,"%u:%u",hostid,pid);
+ }
+ char message[MAXMSG];
+ char header[MAXMSG];
+ char body[MAXMSG];
+
+ if (openDB)
+ sprintf(header,"POST getfreeserver2 HTTP/1.1\r\nAccept: text/plain\r\nUserAgent: RasClient/1.0\r\nAuthorization: ras %s\r\nContent-length:",identificationString);
+ else
+ sprintf(header,"POST getfreeserver HTTP/1.1\r\nAccept: text/plain\r\nUserAgent: RasClient/1.0\r\nAuthorization: ras %s\r\nContent-length:",identificationString);
+ sprintf(body,"%s RNP %s %s",databaseName,(readwrite ? "rw":"ro"), myRasmgrID);
+ sprintf(message,"%s %d\r\n\r\n%s",header,strlen(body)+1,body);
+
+ struct protoent* getprotoptr = getprotobyname("tcp");
+
+ struct hostent *hostinfo = gethostbyname(rasmgrHost);
+ if(hostinfo==NULL)
+ {
+ RMInit::logOut << "Error locating rasmgr on host " << rasmgrHost <<" ("<<strerror(errno)<<')'<<endl;
+ LEAVE( "RnpClientComm::executeGetFreeServer(): exception, unable to contact rasmgr." );
+ throw r_Error( r_Error::r_Error_ServerInvalid );
+ }
+
+ sockaddr_in internetSocketAddress;
+
+ internetSocketAddress.sin_family=AF_INET;
+ internetSocketAddress.sin_port=htons(rasmgrPort);
+ internetSocketAddress.sin_addr=*(struct in_addr*)hostinfo->h_addr;
+
+ int sock;
+ bool ok = false;
+ // old comment by Walter Schatz: "this has to be 5000 or so, now that counter is 120 default (later we'll make this better)"
+ // new comment by PB 2005-aug-31:
+ // - these retries + the ones in getFreeServer lead to max*max*40 = 400,000 retries! one loop should be enough.
+ // - there are unmotivated factors at every step (cf pause()), let's do just one wait loop;
+ // all loop code is commented out with prefix "NOLOOP"
+ // X: int retry;
+ // X: for(retry=0;retry<RMInit::clientcommMaxRetry * 40;retry++)
+ // X: {
+ sock=socket(PF_INET,SOCK_STREAM,getprotoptr->p_proto);
+ if(sock<0)
+ {
+ // X: if(retry==0)
+ RMInit::logOut << "Error: cannot open socket to rasmgr (" << strerror(errno) << ")." << endl;
+ // X: sleep(RMInit::clientcommSleep);
+ // X: continue;
+ }
+
+ else if(connect(sock,(struct sockaddr*)&internetSocketAddress,sizeof(internetSocketAddress)) < 0)
+ {
+ // X: if(retry==0)
+ // X: RMInit::logOut <<"getFreeServer: Connection to rasmgr failed: "<<strerror(errno)<<endl;
+ close(sock);
+ // X: sleep(RMInit::clientcommSleep);
+ // X: continue;
+ }
+
+ TALK( "Socket="<<sock<<" protocol(tcp)="<<getprotoptr->p_proto );
+ ok = true;
+ // X: break;
+ // X:}
+ // X:if(retry)
+ // X: RMInit::logOut << "Warning: tried connecting " << retry+1 << " times " <<endl;
+
+ if( !ok )
+ {
+ // X: RMInit::logOut << "Error: Giving up on connecting, sorry, after this number of tries: " << retry+1 <<endl;
+ close(sock);
+ LEAVE( "RnpClientComm::executeGetFreeServer(): exception, giving up on getting a free server." );
+ throw r_Error( r_Error::r_Error_ServerInvalid );
+ }
+
+ // --- from here on we have a connection ---
+
+ //write_to_server
+ // TALK( "want to write this message to rasmgr: " << message ); // message is said to be 0-terminated
+ int nbytes=writeWholeMessage(sock,message,strlen(message)+1);
+
+ if(nbytes<0)
+ {
+ RMInit::logOut << "Error: cannot send message to rasmgr on host " << rasmgrHost << " ("<<strerror(errno)<<')' << endl;
+ LEAVE( "RnpClientComm::executeGetFreeServer(): exception, error writing message to rasmgr." );
+ close(sock);
+ throw r_Error( r_Error::r_Error_ServerInvalid );
+ }
+
+ //wait and read answer
+ nbytes=readWholeMessage(sock,message,MAXMSG);
+ close(sock);
+
+ if(nbytes<0)
+ {
+ RMInit::logOut << "Error reading answer from rasmgr on host " << rasmgrHost <<" ("<<strerror(errno)<<')'<<endl;
+ LEAVE( "RnpClientComm::executeGetFreeServer(): exception, error reading answer from rasmgr." );
+ throw r_Error( r_Error::r_Error_ServerInvalid );
+ }
+ // TALK( "received this message: " << message ); // quite verbose!
+
+ // and now, analyze answer
+ // first line is: HTTP/1.1 code answertext(CRLF)
+ char *p=strstr(message," "); //looks for the first white space to locate status-code
+
+ int statusCode=strtoul( p, (char **)NULL, 10);
+
+ char *pEOL=strstr(p,"\r\n"); // locate CRLF
+ if(!pEOL)
+ {
+ RMInit::logOut << "Error: Invalid answer from rasmgr." << endl;
+ LEAVE( "RnpClientComm::executeGetFreeServer(): exception, invalid answer from rasmgr." );
+ throw r_Error( r_Error::r_Error_ServerInvalid );
+ }
+
+ if(statusCode==200)
+ { // It's OK
+ char *addr = strstr(message,"\r\n\r\n")+4; //looks for the address of server
+
+ addr = strtok(addr," \r\n\t"); //isolates the RasMGR host name
+
+ char *portString = strtok(NULL," \r\n\t"); //looks for the port, sended as string
+
+ char *capab = strtok(NULL," \r\n\t");
+
+ if(portString && addr && capab)
+ {
+ strcpy(serverHost,addr);
+ serverPort = strtoul( portString, (char **)NULL, 0);
+ strcpy(capability,capab);
+ TALK( "RnpClientComm::executeGetFreeServer(): got server " << serverHost << ":" << serverPort << ", capability: " << capability );
+ }
+ else
+ {
+ RMInit::logOut << "Error: Invalid answer from rasmgr." << endl;
+ LEAVE( "RnpClientComm::executeGetFreeServer(): exception, server invalid." );
+ throw r_Error( r_Error::r_Error_ServerInvalid );
+ }
+
+ }
+ else
+ {
+ char *errText = strstr(message,"\r\n\r\n")+4;
+ RMInit::logOut << "Communication error: "<<errText<< endl;
+
+ int errorCode = strtoul(errText, (char **)NULL, 0);
+
+ switch(errorCode)
+ {
+ case 802:
+ case 803:
+ case 804:
+ LEAVE( "RnpClientComm::executeGetFreeServer(): exception, errorCode = " << errorCode );
+ throw r_Error( r_Error::r_Error_AccesDenied,errorCode);
+ break;
+ case 801:
+ case 805:
+ case 806:
+ LEAVE( "RnpClientComm::executeGetFreeServer(): exception, errorCode = " << errorCode );
+ throw r_Error( r_Error::r_Error_SystemOverloaded,errorCode);
+ break;
+ case 807:
+ LEAVE( "RnpClientComm::executeGetFreeServer(): exception, errorCode = " << errorCode );
+ throw r_Error( r_Error::r_Error_DatabaseUnknown,errorCode);
+ break;
+ default :
+ LEAVE( "RnpClientComm::executeGetFreeServer(): exception, errorCode = " << errorCode );
+ throw r_Error( r_Error::r_Error_General,808 );
+ break;
+ }
+ }
+
+ LEAVE( "RnpClientComm::executeGetFreeServer() -> 1" );
+ return 1;
+}
+
+int RnpClientComm::readWholeMessage(int socket,char *destBuffer,int buffSize)
+{
+ // we read what is comming in until we encounter a '\0'
+ // this is our end-sign.
+ int totalLength=0;
+ int redNow;
+ while(1)
+ {
+ redNow = read(socket,destBuffer+totalLength,buffSize-totalLength);
+ if(redNow == -1)
+ {
+ if(errno == EINTR)
+ continue; // read was interrupted by signal
+ return -1; // another error
+ }
+ totalLength+=redNow;
+
+ if(destBuffer[totalLength-1]==0)
+ break; // THE END
+ }
+
+ TALK( "RnpClientComm::readWholeMessage(): read " << totalLength << " bytes, buffer is: " << destBuffer );
+ return totalLength;
+}
+
+int RnpClientComm::writeWholeMessage(int socket,char *destBuffer,int buffSize)
+{
+ // we write the whole message, including the ending '\0', which is already in
+ // the buffSize provided by the caller
+ int totalLength=0;
+ int writeNow;
+ while(1)
+ {
+ writeNow = write(socket,destBuffer+totalLength,buffSize-totalLength);
+ if(writeNow == -1)
+ {
+ if(errno == EINTR)
+ continue; // read was interrupted by signal
+ return -1; // another error
+ }
+ totalLength+=writeNow;
+
+ if( totalLength==buffSize )
+ break; // THE END
+ }
+
+ TALK( "RnpClientComm::writeWholeMessage(): wrote " << totalLength << " bytes, buffer is: " << destBuffer );
+ return totalLength;
+}
+
+
+void RnpClientComm::checkForRwTransaction() throw (r_Error)
+{
+ r_Transaction *trans = r_Transaction::actual_transaction;
+ if( trans == 0 || trans->get_mode() == r_Transaction::read_only )
+ {
+ TALK( "RnpClientComm::checkForRwTransaction(): throwing exception from failed TA rw check." );
+ throw r_Error( r_Error::r_Error_TransactionReadOnly );
+ }
+}
diff --git a/rnprotocol/rnpclientcomm.hh b/rnprotocol/rnpclientcomm.hh
new file mode 100644
index 0000000..cd81edd
--- /dev/null
+++ b/rnprotocol/rnpclientcomm.hh
@@ -0,0 +1,346 @@
+#ifndef RNPCLIENTCOMM_HH
+#define RNPCLIENTCOMM_HH
+/*
+* This file is part of rasdaman community.
+*
+* Rasdaman community is free software: you can redistribute it and/or modify
+* it under the terms of the GNU General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* Rasdaman community is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with rasdaman community. If not, see <http://www.gnu.org/licenses/>.
+*
+* Copyright 2003, 2004, 2005, 2006, 2007, 2008, 2009 Peter Baumann /
+rasdaman GmbH.
+*
+* For more information please see <http://www.rasdaman.org>
+* or contact Peter Baumann via <baumann@rasdaman.com>.
+*/
+/*************************************************************
+ *
+ *
+ * COMMENTS:
+ *
+ ************************************************************/
+
+#include "clientcomm/clientcomm.hh"
+#include "rnprotocol/rnpcommunication.hh"
+#include "rnprotocol/rnprasserver.hh"
+#include "clientcomm/rpcif.h"
+
+using namespace rnp;
+
+class RnpClientComm : public ClientComm, RnpBaseClientComm
+ {
+ public:
+ /// constructor getting the host name of the rasmgr host and it's listening port (default 7001).
+ RnpClientComm( const char* rasmgrHost, int rasmgrPort = RASMGRPORT ) throw( r_Error );
+
+ ~RnpClientComm() throw ();
+
+ bool effectivTypeIsRNP() throw();
+
+ //@Man: Database methods
+ //@{
+ ///
+
+ /// open database
+ int openDB( const char* database );
+ /// close current database
+ int closeDB();
+ /// create a database
+ int createDB( const char* name ) throw(r_Error);
+ /// destroy a database
+ int destroyDB( const char* name ) throw(r_Error);
+
+ ///
+ //@}
+
+ //@Man: Transaction methods
+ //@{
+ ///
+
+ /// begin transaction
+ int openTA( unsigned short readOnly = 0 ) throw(r_Error);
+ /// commit current transaction
+ int commitTA() throw(r_Error);
+ /// abort current transaction
+ int abortTA();
+
+ ///
+ //@}
+
+ //@Man: MDD methods
+ //@{
+ ///
+
+ /// inserts a MDD object in an existing MDD collection on the server
+ void insertMDD( const char* collName, r_GMarray* mar ) throw( r_Error );
+ /// gets MDD object by oid
+ r_Ref_Any getMDDByOId( const r_OId& oid ) throw( r_Error );
+
+ ///
+ //@}
+
+ //@Man: Collection methods
+ //@{
+ ///
+
+ /// creates an empty MDD collection on the server
+ void insertColl( const char* collName, const char* typeName, const r_OId& oid ) throw( r_Error );
+ /// deletes an MDD collection by name
+ void deleteCollByName( const char* collName ) throw( r_Error );
+ /// deletes an object by oid (right now, objects are collection only)
+ void deleteObjByOId( const r_OId& oid ) throw( r_Error );
+ /// removes an object from a collection
+ void removeObjFromColl( const char* name, const r_OId& oid ) throw ( r_Error );
+ /// gets collection by name
+ r_Ref_Any getCollByName( const char* name ) throw( r_Error );
+ /// gets collection by oid
+ r_Ref_Any getCollByOId ( const r_OId& oid ) throw( r_Error );
+ /// gets collection references by name
+ r_Ref_Any getCollOIdsByName( const char* name ) throw( r_Error );
+ /// gets collection references by oid
+ r_Ref_Any getCollOIdsByOId ( const r_OId& oid ) throw( r_Error );
+
+ ///
+ //@}
+
+ //@Man: Query methods
+ //@{
+ ///
+
+ /// query execution
+ void executeQuery( const r_OQL_Query& query, r_Set< r_Ref_Any >& result ) throw( r_Error );
+ /*@Doc:
+ Executes a retrieval query of type \Ref{r_OQL_Query} and returns the result. Every
+ MDD object of the MDD collection is fetched from the server and inserted
+ in the resulting \Ref{r_Set}.
+ */
+
+ /// update execution
+ void executeQuery( const r_OQL_Query& query ) throw( r_Error );
+ /*@Doc:
+ Executes an update query of type \Ref{r_OQL_Query}.
+ */
+
+ ///
+ //@}
+
+
+ //@Man: System methods
+ //@{
+ ///
+
+ /// get new oid
+ r_OId getNewOId( unsigned short objType ) throw(r_Error);
+
+ /// get oid type
+ unsigned short getObjectType( const r_OId& oid ) throw(r_Error);
+
+ /// get type structure
+ /// dallocate using delete []
+ char* getTypeStructure( const char* typeName, r_Type_Type typeType ) throw(r_Error);
+
+ ///
+ //@}
+
+ /// provides read access to my clientID
+ unsigned long getClientID() const;
+
+ /// set the preferred transfer format
+ int setTransferFormat( r_Data_Format format, const char* formatParams=NULL );
+
+ /// set the preferred storage format
+ int setStorageFormat( r_Data_Format format, const char *formatParams=NULL );
+
+
+ /// get real server name (the dinamic one, assigned by the RasMGR)
+ const char* getServerName();
+
+ /// user identification for RasMGR
+ void setUserIdentification(const char *userName, const char *plainTextPassword);
+
+ /// set maximum retry to get a server
+ void setMaxRetry(unsigned int newMaxRetry);
+
+ /// get maximum retry to get a server
+ unsigned int getMaxRetry();
+
+ void setTurbo(bool turbo);
+
+ ///
+ //@}
+
+ // obsolete, but kept because of virtual functions in base class
+ void triggerAliveSignal();
+ void sendAliveSignal();
+ const char *getExtendedErrorInfo() throw(r_Error);
+
+ void setTimeoutInterval(int seconds);
+ int getTimeoutInterval();
+
+//#### secret, unofficial functions ###########
+
+ r_OId createCollection(const char *collName, const char *collTypeName) throw(r_Error);
+
+ r_OId createMDD(const char* collName, const char* mddTypeName, const char* definitionDomain, const char *tileDomain, bool rcindex = false) throw(r_Error);
+
+ void extendMDD(r_OId mddOId, const char *stripeDomain, const char* tileDomain) throw(r_Error);
+
+ vector<r_OId> getOIdsFromCollection( const char* name ) throw( r_Error );
+
+ vector<r_OId> getOIdsFromCollection( const r_OId& oid ) throw( r_Error );
+
+ vector<r_Minterval> getTileDomains(r_OId mddOId, const char *stripeDomain) throw( r_Error );
+
+
+ void preloadTiles(r_OId mddOId, const char *tileDomain) throw(r_Error);
+
+ int getTileData(r_OId mddOId, const char *tileDomain, char *&data, bool preallocated = false) throw(r_Error);
+
+ void replaceTileData(r_OId mddOId, const char *tileDomain, const char *newData, int dataLength, const char *alfaData, int alfaLength) throw(r_Error);
+
+//#############################################
+ private:
+ /// client ID assigned to me by the server
+ int clientID;
+
+ /// the name of the rasmgr host
+ char *rasmgrHost;
+
+ /// the listening port of the rasmgr
+ int rasmgrPort;
+
+ /// the name of the server host
+ char serverHost[100]; //can't be just a pointer, it's never stored elsewhere
+
+ int serverPort;
+
+ // the name of the opened database, needed because it will be opened again and again, in a hidden operation
+ char databaseName[100];
+
+ // the capability
+ char capability[100];
+
+ /// user identification string
+ char identificationString[100];
+
+ /// requests a free server from the rasmgr, retrying maxRetry times
+ int getFreeServer(bool readwrite, bool openDB);
+
+ /// requests a free server from the rasmgr
+ int executeGetFreeServer(bool readwrite, bool openDB);
+
+ int readWholeMessage(int socket,char *destBuffer,int buffSize);
+
+ int writeWholeMessage(int socket,char *destBuffer,int buffSize);
+
+ // MD5 of password
+ int messageDigest(const char *input,char *output,const char *mdName);
+
+ /// internal function for client/server protocol handling of non-MDD collection transfer
+ void getElementCollection( r_Set< r_Ref_Any >& result ) throw(r_Error);
+
+ /// internal function for client/server protocol handling of MDD collection transfer
+ void getMDDCollection( r_Set< r_Ref_Any >& result, unsigned int isQuery ) throw(r_Error);
+
+ /// internal function for reading an MDD from the database
+ unsigned short getMDDCore( r_Ref<r_GMarray> &mdd, GetMDDRes *thisResult, unsigned int isQuery ) throw( r_Error );
+
+ /// concatenate data to an array, making sure there are no overflows (used by getMDDCore())
+ int concatArrayData( const char *source, unsigned long srcSize, char *&dest,
+ unsigned long &destSize, unsigned long &destLevel );
+
+ /// do transfer decompression
+ r_Data_Format doTransferDecompression( r_GMarray *tile, const r_Base_Type *type,
+ r_Data_Format fmt, unsigned long size );
+
+ /// internal function for converting a \Ref{r_GMarray} into its RPC representation
+ void getMarRpcRepresentation( const r_GMarray* mar, RPCMarray*& rpcMarray,
+ r_Data_Format initStorageFormat = r_Array,
+ const r_Base_Type *bt = NULL);
+
+ /// internal function for freeing data allocated by getMarRpcRepresentation()
+ void freeMarRpcRepresentation( const r_GMarray* mar, RPCMarray* rpcMarray );
+
+ /// endianness of client and server (0 means big endian)
+ int endianServer;
+ int endianClient;
+
+ /// data format for transfer compression
+ r_Data_Format transferFormat;
+ /// storage format for inserting new tiles
+ r_Data_Format storageFormat;
+ /// transfer format parameters
+ char* transferFormatParams;
+ /// storage format parameters
+ char *storageFormatParams;
+ /// parameter object for configuration
+ r_Parse_Params *clientParams;
+
+ /// policy is compress-on-server
+ int serverCompresses;
+ /// policy is exact
+ int exactFormat;
+
+ // functions which really do the connection stuff
+ void executeConnect();
+ void executeDisconnect();
+ void executeOpenDB(const char*);
+ void executeCloseDB();
+ void executeBeginTA(bool rw);
+ void executeCommitTA();
+ void executeAbortTA();
+ int executeExecuteQuery( const char* query, r_Set< r_Ref_Any >& result ) throw( r_Error );
+ GetElementRes* executeGetNextElement();
+ int executeEndTransfer();
+ GetMDDRes* executeGetNextMDD();
+ GetTileRes* executeGetNextTile();
+ void executeExecuteUpdateQuery(const char *query) throw(r_Error);
+ int executeStartInsertTransMDD(r_GMarray* mdd);
+ int executeInsertTile(bool persistent, RPCMarray* tile);
+ void executeEndInsertMDD(bool persistent);
+ int executeInitUpdate();
+ int executeStartInsertPersMDD( const char* collName, r_GMarray* mar );
+ int executeInsertMDD(const char* collName, r_GMarray* mar, RPCMarray *rpcMarray);
+ int executeInsertCollection( const char* collName, const char* typeName, const r_OId& oid );
+ r_Ref_Any executeGetCollByNameOrOId ( const char* collName, const r_OId& oid ) throw( r_Error );
+ r_Ref_Any executeGetCollOIdsByNameOrOId ( const char* collName, const r_OId& oid ) throw( r_Error );
+ int executeSetFormat( bool transferFormat, r_Data_Format format, const char* formatParams);
+ r_OId executeGetNewOId( unsigned short objType ) throw(r_Error);
+ unsigned short executeGetObjectType( const r_OId& oid ) throw(r_Error);
+ char* executeGetTypeStructure( const char* typeName, r_Type_Type typeType ) throw(r_Error);
+
+ vector<r_OId> executeGetOIdsFromCollection ( const char* collName, const r_OId& oid ) throw( r_Error );
+
+ void turboOpenDB(const char*);
+ void turboBeginTA(bool rw);
+ void turboCommitTA();
+ void turboAbortTA();
+
+ bool useTurbo;
+
+ // returns only if transaction is open and rw, otherwise throws
+ void checkForRwTransaction() throw (r_Error);
+
+ // varianta locala
+ void sendRequestGetAnswer() throw (r_Error);
+
+ int sendAndReturnStatus() throw (r_Error);
+
+ bool detectErrors();
+ // doesn't return if there is an error
+ void reassemble_r_Error() throw (r_Error);
+
+ // a very internal helper for some functions
+ void helper012d(const char* caller) throw (r_Error);
+ };
+
+#endif
diff --git a/rnprotocol/rnpclientcomm2.cc b/rnprotocol/rnpclientcomm2.cc
new file mode 100644
index 0000000..3974bd5
--- /dev/null
+++ b/rnprotocol/rnpclientcomm2.cc
@@ -0,0 +1,1226 @@
+#include "mymalloc/mymalloc.h"
+/*
+* This file is part of rasdaman community.
+*
+* Rasdaman community is free software: you can redistribute it and/or modify
+* it under the terms of the GNU General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* Rasdaman community is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with rasdaman community. If not, see <http://www.gnu.org/licenses/>.
+*
+* Copyright 2003, 2004, 2005, 2006, 2007, 2008, 2009 Peter Baumann /
+rasdaman GmbH.
+*
+* For more information please see <http://www.rasdaman.org>
+* or contact Peter Baumann via <baumann@rasdaman.com>.
+*/
+/*************************************************************
+ *
+ *
+ * PURPOSE
+ * Contains the functions which really communicate with the server
+ *
+ *
+ * COMMENTS:
+ * - return values & their meaning see servercomm.hh
+ *
+ ************************************************************/
+
+#include <openssl/evp.h>
+
+#include "rnpclientcomm.hh"
+
+#include "rasodmg/transaction.hh"
+#include "rasodmg/database.hh"
+#include "rasodmg/iterator.hh"
+#include "rasodmg/set.hh"
+#include "rasodmg/ref.hh"
+#include "rasodmg/storagelayout.hh"
+#include "rasodmg/tiling.hh"
+
+#include "raslib/minterval.hh"
+#include "raslib/rmdebug.hh"
+#include "raslib/rminit.hh"
+#include "raslib/primitivetype.hh"
+#include "raslib/complextype.hh"
+#include "raslib/structuretype.hh"
+#include "raslib/primitive.hh"
+#include "raslib/complex.hh"
+#include "raslib/structure.hh"
+#include "raslib/endian.hh"
+#include "raslib/parseparams.hh"
+// for transfer compression
+#include "compression/tilecompression.hh"
+
+#include "debug.hh"
+
+void RnpClientComm::setTurbo(bool turbo)
+{
+ ENTER( "RpcClientComm::setTurbo(" << turbo << ")" );
+ RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "setTurbo(" << turbo << ")" );
+
+ useTurbo = turbo;
+
+ RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "setTurbo()" );
+ LEAVE( "RpcClientComm::setTurbo()" );
+}
+
+void RnpClientComm::executeConnect()
+{
+ ENTER( "RnpClientComm::executeConnect()" );
+
+ startRequest(RnpRasserver::cmd_connect);
+ encoder.addInt32Parameter(RnpRasserver::pmt_clientid, 0);
+ encoder.addStringParameter(RnpRasserver::pmt_capability, capability);
+ TALK( "request RnpRasserver::cmd_connect with clientID 0x" << hex << clientID << dec );
+
+ sendRequestGetAnswer();
+
+ clientID = decoder.getDataAsInteger();
+ TALK( "executeConnect: Connected, clientID 0x" << hex << clientID << dec << endl );
+
+ endianServer = decoder.getDesiredEndianness() == Rnp::bigEndian ? 0: 1;
+ endianClient = Rnp::detectHostEndianness() == Rnp::bigEndian ? 0: 1;
+
+ clearAnswer();
+
+ LEAVE( "RnpClientComm::executeConnect()" );
+}
+
+void RnpClientComm::executeDisconnect()
+{
+ ENTER( "RnpClientComm::executeDisconnect()" );
+
+ startRequest(RnpRasserver::cmd_disconnect);
+ encoder.addInt32Parameter(RnpRasserver::pmt_clientid, clientID);
+ TALK( "request RnpRasserver::cmd_disconnect with clientID 0x" << hex << clientID << dec );
+
+ clientID = -1;
+ TALK( "clientID now set to 0x" << hex << clientID << dec );
+
+ sendRequestGetAnswer();
+ clearAnswer();
+
+ LEAVE( "RnpClientComm::executeDisconnect()" );
+}
+
+void RnpClientComm::executeOpenDB(const char* lDatabaseName)
+{
+ ENTER( "RnpClientComm::executeOpenDB( lDatabaseName=" << (lDatabaseName?lDatabaseName:"(null)") << " )" );
+
+ startRequest(RnpRasserver::cmd_opendb);
+ encoder.addInt32Parameter(RnpRasserver::pmt_clientid, clientID);
+ encoder.addStringParameter(RnpRasserver::pmt_dbname, lDatabaseName);
+ TALK( "request RnpRasserver::cmd_opendb '" << lDatabaseName << "', with clientID 0x" << hex << clientID << dec );
+
+ sendRequestGetAnswer();
+ clearAnswer();
+
+ LEAVE( "RnpClientComm::executeOpenDB()" );
+}
+
+void RnpClientComm::executeCloseDB()
+{
+ ENTER( "RnpClientComm::executeCloseDB()" );
+
+ startRequest(RnpRasserver::cmd_closedb);
+ encoder.addInt32Parameter(RnpRasserver::pmt_clientid, clientID);
+ TALK( "request RnpRasserver::cmd_closedb with clientID 0x" << hex << clientID << dec );
+
+ sendRequestGetAnswer();
+ clearAnswer();
+
+ LEAVE( "RnpClientComm::executeCloseDB()" );
+}
+
+void RnpClientComm::executeBeginTA(bool rw)
+{
+ ENTER( "RnpClientComm::executeBeginTA( rw=" << rw << " )" );
+
+ startRequest(RnpRasserver::cmd_beginta);
+ encoder.addInt32Parameter(RnpRasserver::pmt_clientid, clientID);
+ encoder.addInt32Parameter(RnpRasserver::pmt_accesmode, rw);
+ TALK( "request RnpRasserver::cmd_beginta with rw=" << rw << ", clientID 0x" << hex << clientID << dec );
+
+ sendRequestGetAnswer();
+ clearAnswer();
+
+ LEAVE( "RnpClientComm::executeBeginTA()" );
+}
+
+void RnpClientComm::executeCommitTA()
+{
+ ENTER( "RnpClientComm::executeCommitTA()" );
+
+ startRequest(RnpRasserver::cmd_committa);
+ encoder.addInt32Parameter(RnpRasserver::pmt_clientid, clientID);
+ TALK( "request RnpRasserver::cmd_committa with clientID 0x" << hex << clientID << dec );
+
+ sendRequestGetAnswer();
+ clearAnswer();
+
+ LEAVE( "RnpClientComm::executeCommitTA()" );
+}
+
+void RnpClientComm::executeAbortTA()
+{
+ ENTER( "RnpClientComm::executeAbortTA()" );
+
+ startRequest(RnpRasserver::cmd_abortta);
+ encoder.addInt32Parameter(RnpRasserver::pmt_clientid, clientID);
+ TALK( "request RnpRasserver::cmd_abortta with clientID 0x" << hex << clientID << dec );
+
+ sendRequestGetAnswer();
+ clearAnswer();
+
+ LEAVE( "RnpClientComm::executeAbortTA()" );
+}
+
+void RnpClientComm::turboOpenDB(const char *lDatabaseName)
+{
+ ENTER( "RnpClientComm::turboOpenDB( lDatabaseName=" << (lDatabaseName?lDatabaseName:"(null)") << " )" );
+
+ clientID = 0;
+
+ startRequest(RnpRasserver::cmd_connect);
+ encoder.addInt32Parameter(RnpRasserver::pmt_clientid, 0); // = always clientID -- PB
+ encoder.addStringParameter(RnpRasserver::pmt_capability, capability);
+ encoder.endFragment();
+ TALK( "request RnpRasserver::cmd_connect with clientID 0x0, capability '" << capability << "'" );
+
+ encoder.startFragment(Rnp::fgt_Command,RnpRasserver::cmd_opendb);
+ encoder.addInt32Parameter(RnpRasserver::pmt_clientid, clientID);
+ encoder.addStringParameter(RnpRasserver::pmt_dbname, lDatabaseName);
+ encoder.endFragment();
+ TALK( "adding fragment RnpRasserver::cmd_opendb with db '" << lDatabaseName << "', clientID 0x" << hex << clientID << dec );
+
+ encoder.startFragment(Rnp::fgt_Command,RnpRasserver::cmd_closedb);
+ encoder.addInt32Parameter(RnpRasserver::pmt_clientid, clientID);
+ encoder.endFragment();
+ TALK( "adding fragment RnpRasserver::cmd_closedb with clientID 0x" << hex << clientID << dec );
+
+ encoder.startFragment(Rnp::fgt_Command,RnpRasserver::cmd_disconnect);
+ encoder.addInt32Parameter(RnpRasserver::pmt_clientid, clientID);
+ TALK( "adding fragment RnpRasserver::cmd_disconnect with clientID 0x" << hex << clientID << dec );
+
+ sendRequestGetAnswer();
+
+ //clientID = decoder.getDataAsInteger();
+
+ clientID = -1; // we will disconnect so we force it here
+ TALK( "RnpClientComm::turboOpenDB(): clientID set to 0x" << hex << clientID << dec );
+
+ endianServer = decoder.getDesiredEndianness() == Rnp::bigEndian ? 0: 1;
+ endianClient = Rnp::detectHostEndianness() == Rnp::bigEndian ? 0: 1;
+
+ // open
+ decoder.getNextFragment();
+ detectErrors();
+ // close
+ decoder.getNextFragment();
+ detectErrors();
+ // disconnect
+ decoder.getNextFragment();
+ detectErrors();
+
+ clearAnswer();
+
+ LEAVE( "RnpClientComm::turboOpenDB()" );
+}
+
+void RnpClientComm::turboBeginTA(bool rw)
+{
+ ENTER( "RnpClientComm::turboBeginTA( rw=" << rw << " )" );
+
+ clientID = 0;
+
+ startRequest(RnpRasserver::cmd_connect);
+ encoder.addInt32Parameter(RnpRasserver::pmt_clientid, 0);
+ encoder.addStringParameter(RnpRasserver::pmt_capability, capability);
+ encoder.endFragment();
+ TALK( "request RnpRasserver::cmd_connect with clientID 0x" << hex << clientID << dec << ", capability '" << capability << "'" );
+
+ encoder.startFragment(Rnp::fgt_Command,RnpRasserver::cmd_opendb);
+ encoder.addInt32Parameter(RnpRasserver::pmt_clientid, clientID);
+ encoder.addStringParameter(RnpRasserver::pmt_dbname, databaseName);
+ encoder.endFragment();
+ TALK( "adding fragment RnpRasserver::cmd_opendb '" << databaseName << "', with clientID 0x" << hex << clientID << dec );
+
+ encoder.startFragment(Rnp::fgt_Command,RnpRasserver::cmd_beginta);
+ encoder.addInt32Parameter(RnpRasserver::pmt_clientid, clientID);
+ encoder.addInt32Parameter(RnpRasserver::pmt_accesmode, rw);
+ TALK( "adding fragment RnpRasserver::cmd_beginta with rw=" << rw << ", clientID 0x" << hex << clientID << dec );
+
+ sendRequestGetAnswer();
+
+ clientID = decoder.getDataAsInteger();
+ TALK( "rceived clientID 0x" << hex << clientID << dec );
+ endianServer = decoder.getDesiredEndianness() == Rnp::bigEndian ? 0: 1;
+ endianClient = Rnp::detectHostEndianness() == Rnp::bigEndian ? 0: 1;
+
+ // open
+ decoder.getNextFragment();
+ detectErrors();
+
+ // beginTA
+ decoder.getNextFragment();
+ detectErrors();
+
+ clearAnswer();
+
+ LEAVE( "RnpClientComm::turboBeginTA()" );
+}
+
+void RnpClientComm::turboCommitTA()
+{
+ ENTER( "RnpClientComm::turboCommitTA()" );
+
+ startRequest(RnpRasserver::cmd_committa);
+ encoder.addInt32Parameter(RnpRasserver::pmt_clientid, clientID);
+ encoder.endFragment();
+ TALK( "request RnpRasserver::cmd_committa with clientID 0x" << hex << clientID << dec );
+
+ encoder.startFragment(Rnp::fgt_Command,RnpRasserver::cmd_closedb);
+ encoder.addInt32Parameter(RnpRasserver::pmt_clientid, clientID);
+ encoder.endFragment();
+ TALK( "adding fragment RnpRasserver::cmd_closedb with clientID 0x" << hex << clientID << dec );
+
+ encoder.startFragment(Rnp::fgt_Command,RnpRasserver::cmd_disconnect);
+ encoder.addInt32Parameter(RnpRasserver::pmt_clientid, clientID);
+ TALK( "adding fragment RnpRasserver::cmd_disconnect with clientID 0x" << hex << clientID << dec );
+
+ sendRequestGetAnswer();
+
+ // close
+ decoder.getNextFragment();
+ detectErrors();
+
+ // disconnect
+ decoder.getNextFragment();
+ detectErrors();
+
+ clientID = -1;
+ TALK( "resetting: clientID 0x" << hex << clientID << dec );
+ clearAnswer();
+
+ LEAVE( "RnpClientComm::turboCommitTA()" );
+}
+void RnpClientComm::turboAbortTA()
+{
+ ENTER( "RnpClientComm::turboAbortTA()" );
+
+ startRequest(RnpRasserver::cmd_abortta);
+ encoder.addInt32Parameter(RnpRasserver::pmt_clientid, clientID);
+ encoder.endFragment();
+ TALK( "request RnpRasserver::cmd_abortta with clientID 0x" << hex << clientID << dec );
+
+ encoder.startFragment(Rnp::fgt_Command,RnpRasserver::cmd_closedb);
+ encoder.addInt32Parameter(RnpRasserver::pmt_clientid, clientID);
+ encoder.endFragment();
+ TALK( "adding fragment RnpRasserver::cmd_closedb with clientID 0x" << hex << clientID << dec );
+
+ encoder.startFragment(Rnp::fgt_Command,RnpRasserver::cmd_disconnect);
+ encoder.addInt32Parameter(RnpRasserver::pmt_clientid, clientID);
+ TALK( "adding fragment RnpRasserver::cmd_disconnect with clientID 0x" << hex << clientID << dec );
+
+ sendRequestGetAnswer();
+
+ // close
+ decoder.getNextFragment();
+ detectErrors();
+
+ // disconnect
+ decoder.getNextFragment();
+ detectErrors();
+
+ clientID = -1;
+ TALK( "resetting: clientID 0x" << hex << clientID << dec );
+
+ clearAnswer();
+
+ LEAVE( "RnpClientComm::turboAbortTA()" );
+}
+
+//---------------------------------------------------------------------------------
+int RnpClientComm::executeStartInsertPersMDD( const char* collName, r_GMarray* mar )
+{
+ ENTER( "RnpClientComm::executeStartInsertPersMDD( collName=" << (collName?collName:"(null)") << ", mar=" << ((unsigned long) mar) << " )" );
+
+ startRequest(RnpRasserver::cmd_startinsPmdd);
+ encoder.addInt32Parameter(RnpRasserver::pmt_clientid, clientID);
+ encoder.addStringParameter(RnpRasserver::pmt_collname, collName);
+
+ char *domain = mar->spatial_domain().get_string_representation();
+ encoder.addStringParameter(RnpRasserver::pmt_domain, domain);
+
+ encoder.addInt32Parameter(RnpRasserver::pmt_typelength, mar->get_type_length());
+ encoder.addStringParameter(RnpRasserver::pmt_typename, mar->get_type_name());
+ encoder.addStringParameter(RnpRasserver::pmt_oidstring, mar->get_oid().get_string_representation());
+
+ TALK( "request RnpRasserver::cmd_startinsPmdd with collname '" << collName << "', domain " << domain << ", typelength " << mar->get_type_length() << ", typename '" << mar->get_type_name() << ", oid " << mar->get_oid().get_string_representation() << ", clientID 0x" << hex << clientID << dec );
+
+ free(domain);
+
+ int result = sendAndReturnStatus();
+
+ ENTER( "RnpClientComm::executeStartInsertPersMDD() -> " << result );
+ return result;
+}
+
+int RnpClientComm::executeExecuteQuery( const char* query, r_Set< r_Ref_Any >& result ) throw( r_Error )
+{
+ ENTER( "RnpClientComm::executeExecuteQuery( query=" << (query?query:"(null)") << ", result=" << ((unsigned long) &result) << " )" );
+
+ startRequest(RnpRasserver::cmd_queryrpc);
+ encoder.adjustBufferSize(strlen(query));
+
+ encoder.addInt32Parameter(RnpRasserver::pmt_clientid, clientID);
+ encoder.addStringParameter(RnpRasserver::pmt_querystring, query);
+ TALK( "request RnpRasserver::cmd_queryrpc with query '" << query << ", clientID 0x" << hex << clientID << dec );
+
+ sendRequestGetAnswer();
+
+ int status = decoder.getDataAsInteger();
+ decoder.getNextParameter();
+ int errNo = decoder.getDataAsInteger();
+ decoder.getNextParameter();
+ int lineNo = decoder.getDataAsInteger();
+ decoder.getNextParameter();
+ int colNo = decoder.getDataAsInteger();
+ decoder.getNextParameter();
+ const char* token = decoder.getDataAsString();
+ decoder.getNextParameter();
+ const char* typeName = decoder.getDataAsString();
+ decoder.getNextParameter();
+ const char* typeStructure = decoder.getDataAsString();
+
+ if(status == 0 || status == 1)
+ {
+ result.set_type_by_name( typeName );
+ result.set_type_structure( typeStructure );
+ }
+ // status == 2 - empty result
+
+ if( status == 4 || status == 5 )
+ {
+ r_Equery_execution_failed err( errNo, lineNo, colNo, token );
+ clearAnswer();
+ LEAVE( "RnpClientComm::executeExecuteQuery() exception: status=" << status );
+ throw err;
+ }
+ clearAnswer();
+
+ LEAVE( "RnpClientComm::executeExecuteQuery() -> " << status );
+ return status;
+}
+
+int RnpClientComm::executeEndTransfer()
+{
+ ENTER( "RnpClientComm::executeEndTransfer()" );
+
+ startRequest(RnpRasserver::cmd_endtransfer);
+ encoder.addInt32Parameter(RnpRasserver::pmt_clientid, clientID);
+ TALK( "request RnpRasserver::cmd_endtransfer with clientID 0x" << hex << clientID << dec );
+
+ int result = sendAndReturnStatus();
+
+ LEAVE( "RnpClientComm::executeEndTransfer() -> " << result );
+ return result;
+}
+
+GetElementRes* RnpClientComm::executeGetNextElement()
+{
+ ENTER( "RnpClientComm::executeGetNextElement()" );
+
+ startRequest(RnpRasserver::cmd_getnextelem);
+ encoder.addInt32Parameter(RnpRasserver::pmt_clientid, clientID);
+ TALK( "request RnpRasserver::cmd_getnextelem with clientID 0x" << hex << clientID << dec );
+
+ sendRequestGetAnswer();
+
+ GetElementRes *result = new GetElementRes;
+ result->data.confarray_val = NULL;
+
+ result->status = decoder.getDataAsInteger();
+
+ if(decoder.countParameters() == 2)
+ {
+ decoder.getNextParameter();
+ result->data.confarray_len = decoder.getDataLength();
+ result->data.confarray_val = new char[decoder.getDataLength()];
+
+ memcpy(result->data.confarray_val, decoder.getData(), decoder.getDataLength());
+
+ }
+
+ clearAnswer();
+
+ LEAVE( "RnpClientComm::executeGetNextElement() -> " << result );
+ return result;
+}
+
+
+int RnpClientComm::executeInsertMDD(const char* collName, r_GMarray* mar, RPCMarray *rpcMarray)
+{
+ ENTER( "RnpClientComm::executeInsertMDD( collName=" << (collName?collName:"(null)") << ", mar=" << ((unsigned long) mar) << ", rpcMarray=" << ((unsigned long) rpcMarray) << " )" );
+
+ int size = rpcMarray->data.confarray_len;
+ startRequest(RnpRasserver::cmd_insertmdd, RNP_DEFAULTBUFFERSIZE + size);
+ encoder.addInt32Parameter( RnpRasserver::pmt_clientid, clientID);
+ encoder.addStringParameter( RnpRasserver::pmt_collname, collName);
+ encoder.addStringParameter( RnpRasserver::pmt_typename, mar->get_type_name());
+ encoder.addStringParameter( RnpRasserver::pmt_oidstring, mar->get_oid().get_string_representation());
+ encoder.addStringParameter( RnpRasserver::pmt_domain, rpcMarray->domain);
+ encoder.addInt32Parameter( RnpRasserver::pmt_typelength, rpcMarray->cellTypeLength);
+ encoder.addInt32Parameter( RnpRasserver::pmt_currentformat, rpcMarray->currentFormat);
+ encoder.addInt32Parameter( RnpRasserver::pmt_storageformat, rpcMarray->storageFormat);
+ encoder.addOpaqueParameter( RnpRasserver::pmt_tiledata, rpcMarray->data.confarray_val, size);
+ TALK( "request RnpRasserver::cmd_insertmdd with collection '" << collName << ", ..., clientID 0x" << hex << clientID << dec );
+
+ int result = sendAndReturnStatus();
+
+ LEAVE( "RnpClientComm::executeInsertMDD() -> " << result );
+ return result;
+}
+
+int RnpClientComm::executeInsertCollection( const char* collName, const char* typeName, const r_OId& oid )
+{
+ ENTER( "RnpClientComm::executeInsertCollection( collName=" << (collName?collName:"(null)") << ", typeName=" << (typeName?typeName:"(null)") << ", oid=" << oid << " )" );
+
+ startRequest(RnpRasserver::cmd_insertcoll);
+ encoder.addInt32Parameter(RnpRasserver::pmt_clientid, clientID);
+ encoder.addStringParameter(RnpRasserver::pmt_collname, collName);
+ encoder.addStringParameter(RnpRasserver::pmt_typename, typeName);
+ encoder.addStringParameter(RnpRasserver::pmt_oidstring, oid.get_string_representation());
+ TALK( "request RnpRasserver::cmd_insertcoll collection '" << collName << ", ..., with clientID 0x" << hex << clientID << dec );
+
+ int result = sendAndReturnStatus();
+ LEAVE( "RnpClientComm::executeInsertCollection() -> " << result );
+ return result;
+}
+
+// common function using the dynamic parameter facility of RNP
+r_Ref_Any RnpClientComm::executeGetCollByNameOrOId ( const char* collName, const r_OId& oid ) throw( r_Error )
+{
+ ENTER( "RnpClientComm::executeGetCollByNameOrOId( collName=" << (collName?collName:"(null)") << ", oid=" << oid << " )" );
+
+ startRequest(RnpRasserver::cmd_getcoll);
+ encoder.addInt32Parameter(RnpRasserver::pmt_clientid, clientID);
+ if( collName != NULL)
+ encoder.addStringParameter(RnpRasserver::pmt_collname, collName);
+ else
+ encoder.addStringParameter(RnpRasserver::pmt_oidstring, oid.get_string_representation());
+ TALK( "request RnpRasserver::cmd_getcoll with ..., clientID 0x" << hex << clientID << dec );
+
+ sendRequestGetAnswer();
+
+ int status = decoder.getDataAsInteger();
+ decoder.getNextParameter(); const char *typeName = decoder.getDataAsString();
+ decoder.getNextParameter(); const char *typeStructure = decoder.getDataAsString();
+ decoder.getNextParameter(); const char *oidstring = decoder.getDataAsString();
+ decoder.getNextParameter(); const char *collectionName= decoder.getDataAsString();
+
+ r_Set< r_Ref_Any >* set = 0;
+
+ if( status != 0 && status != 1 )
+ {
+ r_Error err;
+ switch( status )
+ {
+ case 2:
+ err = r_Error( r_Error::r_Error_ObjectUnknown );
+ break;
+ case 3:
+ err = r_Error( r_Error::r_Error_ClientUnknown );
+ break;
+ default:
+ err = r_Error( r_Error::r_Error_TransferFailed );
+ break;
+ }
+ clearAnswer();
+ LEAVE( "RnpClientComm::executeGetCollByNameOrOId(): exception, status = " << status );
+ throw err;
+ }
+
+ // create the set
+ r_OId rOId( oidstring );
+ set = new ( r_Database::actual_database, r_Object::read, rOId ) r_Set< r_Ref_Any >;
+
+ // initialize data elements
+ set->set_type_by_name ( typeName );
+ set->set_type_structure( typeStructure );
+ set->set_object_name ( collectionName );
+
+ clearAnswer();
+
+ // get collection elements
+ if( status == 0 )
+ getMDDCollection( *set, 0 );
+ // else rpcStatus == 1 -> Result collection is empty and nothing has to be got.
+
+ r_Ref_Any result = r_Ref_Any( set->get_oid(), set );
+ LEAVE( "RnpClientComm::executeGetCollByNameOrOId() -> (result set not displayed)" );
+ return result;
+}
+
+
+// common function using the dynamic parameter facility of RNP
+r_Ref_Any RnpClientComm::executeGetCollOIdsByNameOrOId ( const char* collName, const r_OId& oid ) throw( r_Error )
+{
+ ENTER( "RnpClientComm::executeGetCollOIdsByNameOrOId( collName=" << (collName?collName:"(null)") << ", oid=" << oid << " )" );
+
+ startRequest(RnpRasserver::cmd_getcolloids);
+ encoder.addInt32Parameter(RnpRasserver::pmt_clientid, clientID);
+ if( collName != NULL)
+ encoder.addStringParameter(RnpRasserver::pmt_collname, collName);
+ else
+ encoder.addStringParameter(RnpRasserver::pmt_oidstring, oid.get_string_representation());
+ TALK( "request RnpRasserver::cmd_getcolloids with ..., clientID 0x" << hex << clientID << dec );
+
+ sendRequestGetAnswer();
+
+ int status = decoder.getDataAsInteger();
+ decoder.getNextParameter(); const char *typeName = decoder.getDataAsString();
+ decoder.getNextParameter(); const char *typeStructure = decoder.getDataAsString();
+ decoder.getNextParameter(); const char *oidstring = decoder.getDataAsString();
+ decoder.getNextParameter(); const char *collectionName= decoder.getDataAsString();
+
+ r_Set< r_Ref<r_GMarray> >* set = 0;
+
+ if( status != 0 && status != 1 )
+ {
+ r_Error err;
+ switch( status )
+ {
+ case 2:
+ err = r_Error( r_Error::r_Error_ObjectUnknown );
+ break;
+ case 3:
+ err = r_Error( r_Error::r_Error_ClientUnknown );
+ break;
+ default:
+ err = r_Error( r_Error::r_Error_TransferFailed );
+ break;
+ }
+ clearAnswer();
+ LEAVE( "RnpClientComm::executeGetCollOIdsByNameOrOId(): exception, status = " << status );
+ throw err;
+ }
+
+ // create the set
+ r_OId rOId( oidstring );
+ set = new ( r_Database::actual_database, r_Object::read, rOId ) r_Set< r_Ref< r_GMarray > >;
+
+ set->set_type_by_name ( typeName );
+ set->set_type_structure( typeStructure );
+ set->set_object_name ( collName );
+
+ // fill set with oids
+ if( status == 0 )
+ {
+ while(decoder.getNextParameter() != 0)
+ {
+ r_OId roid( decoder.getDataAsString() );
+ set->insert_element( r_Ref<r_GMarray>(roid), 1 );
+ }
+ }
+
+ clearAnswer();
+
+ r_Ref_Any result = r_Ref_Any( set->get_oid(), set );
+ LEAVE( "RnpClientComm::executeGetCollOIdsByNameOrOId() -> (result not displayed)" );
+ return result;
+ }
+
+
+GetMDDRes* RnpClientComm::executeGetNextMDD()
+{
+ ENTER( "RnpClientComm::executeGetNextMDD()" );
+
+ startRequest(RnpRasserver::cmd_getnextmdd);
+ encoder.addInt32Parameter(RnpRasserver::pmt_clientid, clientID);
+ TALK( "request RnpRasserver::cmd_getnextmdd with clientID 0x" << hex << clientID << dec );
+
+ sendRequestGetAnswer();
+
+ GetMDDRes* result = new GetMDDRes;
+
+ result->status = decoder.getDataAsInteger();
+ decoder.getNextParameter(); result->domain = strdup(decoder.getDataAsString());
+ decoder.getNextParameter(); result->typeName = strdup(decoder.getDataAsString());
+ decoder.getNextParameter(); result->typeStructure = strdup(decoder.getDataAsString());
+ decoder.getNextParameter(); result->oid = strdup(decoder.getDataAsString());
+ decoder.getNextParameter(); result->currentFormat = decoder.getDataAsInteger();
+
+ clearAnswer();
+
+ LEAVE( "RnpClientComm::executeGetNextMDD() -> " << result );
+ return result;
+}
+
+GetTileRes* RnpClientComm::executeGetNextTile()
+{
+ ENTER( "RnpClientComm::executeGetNextTile()" );
+
+ startRequest(RnpRasserver::cmd_getnexttile);
+ encoder.addInt32Parameter(RnpRasserver::pmt_clientid, clientID);
+ TALK( "request RnpRasserver::cmd_getnexttile with clientID 0x" << hex << clientID << dec );
+
+ sendRequestGetAnswer();
+
+ GetTileRes* result = new GetTileRes;
+ result->marray = new RPCMarray;
+
+ result->status = decoder.getDataAsInteger();
+ if(decoder.getNextParameter() != 0)
+ {
+ result->marray->domain = strdup(decoder.getDataAsString());
+ decoder.getNextParameter(); result->marray->cellTypeLength = decoder.getDataAsInteger();
+ decoder.getNextParameter(); result->marray->currentFormat = decoder.getDataAsInteger();
+ decoder.getNextParameter(); result->marray->storageFormat = decoder.getDataAsInteger();
+ decoder.getNextParameter();
+ int length = decoder.getDataLength();
+ result->marray->data.confarray_len = length;
+ result->marray->data.confarray_val = (char*)mymalloc(length);
+ memcpy(result->marray->data.confarray_val, decoder.getData(), length);
+ }
+ else
+ {
+ result->marray->domain = 0;
+ result->marray->data.confarray_val = 0;
+ }
+ clearAnswer();
+
+ LEAVE( "RnpClientComm::executeGetNextTile() -> " << result );
+ return result;
+}
+
+int RnpClientComm::executeInitUpdate()
+{
+ ENTER( "RnpClientComm::executeInitUpdate()" );
+
+ startRequest(RnpRasserver::cmd_initupdate);
+ encoder.addInt32Parameter(RnpRasserver::pmt_clientid, clientID);
+ TALK( "adding fragment XXX with clientID 0x" << hex << clientID << dec );
+
+ int result = sendAndReturnStatus();
+ LEAVE( "RnpClientComm::executeInitUpdate() -> " << result );
+ return result;
+}
+
+int RnpClientComm::executeStartInsertTransMDD(r_GMarray* mdd)
+{
+ ENTER( "RnpClientComm::executeStartInsertTransMDD( mdd=" << ((unsigned long) mdd) << " )" );
+
+ startRequest(RnpRasserver::cmd_startinsTmdd);
+ encoder.addInt32Parameter( RnpRasserver::pmt_clientid, clientID);
+ encoder.addStringParameter( RnpRasserver::pmt_domain, mdd->spatial_domain().get_string_representation());
+ encoder.addInt32Parameter( RnpRasserver::pmt_typelength, mdd->get_type_length());
+ encoder.addStringParameter( RnpRasserver::pmt_typename, mdd->get_type_name());
+ TALK( "request RnpRasserver::cmd_startinsTmdd with ..., clientID 0x" << hex << clientID << dec );
+
+ int result = sendAndReturnStatus();
+ LEAVE( "RnpClientComm::executeStartInsertTransMDD() -> " << result );
+ return result;
+}
+
+int RnpClientComm::executeInsertTile(bool persistent, RPCMarray *tile)
+{
+ ENTER( "RnpClientComm::executeInsertTile( persistent=" << persistent << ", tile=" << ((unsigned long) tile) << " )" );
+
+ int size = tile->data.confarray_len;
+ startRequest(RnpRasserver::cmd_inserttile, RNP_DEFAULTBUFFERSIZE + size);
+ encoder.addInt32Parameter( RnpRasserver::pmt_clientid, clientID);
+ encoder.addInt32Parameter( RnpRasserver::pmt_ispersistent, persistent ? 1:0);
+ encoder.addStringParameter( RnpRasserver::pmt_domain, tile->domain);
+ encoder.addInt32Parameter( RnpRasserver::pmt_typelength, tile->cellTypeLength);
+ encoder.addInt32Parameter( RnpRasserver::pmt_currentformat, tile->currentFormat);
+ encoder.addInt32Parameter( RnpRasserver::pmt_storageformat, tile->storageFormat);
+ encoder.addOpaqueParameter( RnpRasserver::pmt_tiledata, tile->data.confarray_val, size);
+ TALK( "request RnpRasserver::cmd_inserttile with ..., clientID 0x" << hex << clientID << dec );
+
+ int result = sendAndReturnStatus();
+
+ LEAVE( "RnpClientComm::executeInsertTile() -> " << result );
+ return result;
+}
+
+void RnpClientComm::executeEndInsertMDD(bool persistent)
+{
+ ENTER( "RnpClientComm::executeEndInsertMDD( persistent=" << persistent << " )" );
+
+ startRequest(RnpRasserver::cmd_endinsmdd);
+ encoder.addInt32Parameter(RnpRasserver::pmt_clientid, clientID);
+ encoder.addInt32Parameter(RnpRasserver::pmt_ispersistent, persistent ? 1:0);
+ TALK( "request RnpRasserver::cmd_endinsmdd with persistent " << persistent << ", clientID 0x" << hex << clientID << dec );
+
+ sendRequestGetAnswer();
+ clearAnswer();
+
+ LEAVE( "RnpClientComm::executeEndInsertMDD()" );
+}
+
+void RnpClientComm::executeExecuteUpdateQuery(const char *query) throw(r_Error)
+ {
+ ENTER( "RnpClientComm::executeExecuteUpdateQuery( query=" << (query?query:"(null)") << " )" );
+
+ startRequest(RnpRasserver::cmd_updaterpc);
+ encoder.adjustBufferSize(strlen(query));
+ encoder.addInt32Parameter(RnpRasserver::pmt_clientid, clientID);
+ encoder.addStringParameter(RnpRasserver::pmt_querystring, query);
+ TALK( "request RnpRasserver::cmd_updaterpc with query '" << query << "', clientID 0x" << hex << clientID << dec );
+
+ sendRequestGetAnswer();
+ int status = decoder.getDataAsInteger();
+ decoder.getNextParameter();
+ int errNo = decoder.getDataAsInteger();
+ decoder.getNextParameter();
+ int lineNo = decoder.getDataAsInteger();
+ decoder.getNextParameter();
+ int colNo = decoder.getDataAsInteger();
+ decoder.getNextParameter();
+ const char* token = decoder.getDataAsString();
+
+ clearAnswer();
+
+ if( status == 2 || status == 3 )
+ {
+ LEAVE( "RnpClientComm::executeExecuteUpdateQuery(): exception, status = " << status );
+ throw r_Equery_execution_failed( errNo, lineNo, colNo, token );
+ }
+
+ if( status == 1 )
+ {
+ LEAVE( "RnpClientComm::executeExecuteUpdateQuery(): exception, status = " << status );
+ throw r_Error( r_Error::r_Error_ClientUnknown );
+ }
+
+ if( status > 3 )
+ {
+ LEAVE( "RnpClientComm::executeExecuteUpdateQuery(): exception, status = " << status );
+ throw r_Error( r_Error::r_Error_TransferFailed );
+ }
+
+ LEAVE( "RnpClientComm::executeExecuteUpdateQuery()" );
+}
+
+r_OId RnpClientComm::executeGetNewOId( unsigned short objType ) throw(r_Error)
+{
+ //cout<<" RnpClientComm::getNewOId: not implemented"<<endl;
+
+ ENTER( "RnpClientComm::executeGetNewOId( objType=" << objType << " )" );
+
+ startRequest(RnpRasserver::cmd_getnewoid);
+ encoder.addInt32Parameter(RnpRasserver::pmt_clientid, clientID);
+ encoder.addInt32Parameter(RnpRasserver::pmt_objecttype, objType);
+ TALK( "request RnpRasserver::cmd_getnewoid with ..., clientID 0x" << hex << clientID << dec );
+
+ sendRequestGetAnswer();
+
+ r_OId oid( decoder.getDataAsString() );
+
+ clearAnswer();
+
+ LEAVE( "RnpClientComm::executeGetNewOId() -> " << oid );
+ return oid;
+}
+
+unsigned short RnpClientComm::executeGetObjectType( const r_OId& oid ) throw(r_Error)
+{
+ ENTER( "RnpClientComm::executeGetObjectType( oid=" << oid << " )" );
+
+ startRequest(RnpRasserver::cmd_getobjecttype);
+ encoder.addInt32Parameter(RnpRasserver::pmt_clientid, clientID);
+ encoder.addStringParameter(RnpRasserver::pmt_oidstring, oid.get_string_representation());
+ TALK( "request RnpRasserver::cmd_getobjecttype with ..., clientID 0x" << hex << clientID << dec );
+
+ sendRequestGetAnswer();
+
+ int status = decoder.getDataAsInteger();
+ switch( status )
+ {
+ case 0:
+ break; //OK
+ case 1:
+ clearAnswer();
+ LEAVE( "RnpClientComm::executeGetObjectType(): exception, status = " << status );
+ throw r_Error( r_Error::r_Error_ClientUnknown );
+ break;
+ case 2:
+ clearAnswer();
+ LEAVE( "RnpClientComm::executeGetObjectType(): exception, status = " << status );
+ throw r_Error( r_Error::r_Error_ObjectUnknown );
+ break;
+ default:
+ clearAnswer();
+ LEAVE( "RnpClientComm::executeGetObjectType(): exception, status = " << status );
+ throw r_Error( r_Error::r_Error_TransferFailed );
+ break;
+ }
+
+ decoder.getNextParameter(); unsigned short objType = decoder.getDataAsInteger();
+
+ clearAnswer();
+
+ LEAVE( "RnpClientComm::executeGetObjectType() -> " << objType );
+ return objType;
+}
+
+char* RnpClientComm::executeGetTypeStructure( const char* typeName, r_Type_Type typeType ) throw(r_Error)
+{
+ ENTER( "RnpClientComm::executeGetTypeStructure( typeName=" << (typeName?typeName:"(null)") << ", typeType=" << typeType << " )" );
+
+ startRequest(RnpRasserver::cmd_gettypestruct);
+ encoder.addInt32Parameter(RnpRasserver::pmt_clientid, clientID);
+ encoder.addStringParameter(RnpRasserver::pmt_typename, typeName);
+ encoder.addInt32Parameter(RnpRasserver::pmt_typetype, typeType);
+ TALK( "request RnpRasserver::cmd_gettypestruct with ..., clientID 0x" << hex << clientID << dec );
+
+ sendRequestGetAnswer();
+
+ int status = decoder.getDataAsInteger();
+ switch(status)
+ {
+ case 0:
+ break; //OK
+ case 1:
+ clearAnswer();
+ LEAVE( "RnpClientComm::executeGetTypeStructure(): exception, status = " << status );
+ throw r_Error( r_Error::r_Error_TransactionNotOpen );
+ break;
+ case 2:
+ clearAnswer();
+ LEAVE( "RnpClientComm::executeGetTypeStructure(): exception, status = " << status );
+ throw r_Error( r_Error::r_Error_DatabaseClassUndefined );
+ break;
+ default:
+ clearAnswer();
+ LEAVE( "RnpClientComm::executeGetTypeStructure(): exception, status = " << status );
+ throw r_Error( r_Error::r_Error_TransferFailed );
+ break;
+ }
+
+ decoder.getNextParameter();
+ char* typeStructure = new char [decoder.getDataLength() + 1];
+ strcpy(typeStructure, decoder.getDataAsString());
+
+ clearAnswer();
+
+ LEAVE( "RnpClientComm::executeGetTypeStructure() -> " << typeStructure );
+ return typeStructure;
+}
+
+int RnpClientComm::executeSetFormat( bool lTransferFormat, r_Data_Format format, const char* formatParams)
+{
+ ENTER( "RnpClientComm::executeSetFormat( lTransferFormat=" << lTransferFormat << ", format=" << format << ", formatParams=" << (formatParams?formatParams:"(null)") << " )" );
+
+ startRequest(RnpRasserver::cmd_setformat);
+ encoder.addInt32Parameter( RnpRasserver::pmt_clientid, clientID);
+ encoder.addInt32Parameter( RnpRasserver::pmt_whichformat, lTransferFormat);
+ encoder.addInt32Parameter( RnpRasserver::pmt_format, format);
+ encoder.addStringParameter(RnpRasserver::pmt_formatparams, formatParams);
+ TALK( "request RnpRasserver::cmd_setformat with ..., clientID 0x" << hex << clientID << dec );
+
+ sendRequestGetAnswer();
+
+ int status = decoder.getDataAsInteger();
+
+ clearAnswer();
+
+ LEAVE( "RnpClientComm::executeSetFormat() -> " << status );
+ return status;
+}
+
+//----------------------------------------------------------
+int RnpClientComm::sendAndReturnStatus() throw (r_Error)
+{
+ ENTER( "RnpClientComm::sendAndReturnStatus()" );
+
+ sendRequestGetAnswer();
+
+ int status = decoder.getDataAsInteger();
+
+ clearAnswer();
+
+ LEAVE( "RnpClientComm::sendAndReturnStatus() -> " << status );
+ return status;
+}
+
+void RnpClientComm::sendRequestGetAnswer() throw (r_Error)
+{
+ ENTER( "RnpClientComm::sendRequestGetAnswer()" );
+
+ RnpBaseClientComm::setMaxRetry(RNP_MAX_RETRY); // defined in raslib/rminit.hh -- PB 2005-sep-01
+ if(RnpBaseClientComm::sendRequestGetAnswer() == false)
+ {
+ clearAnswer();
+ LEAVE( "RnpClientComm::sendRequestGetAnswer(): exception, sendRequestGetAnswer() == false" );
+ throw r_Error( r_Error::r_Error_TransferFailed);
+ }
+
+ detectErrors();
+ if(decoder.countParameters() > 0) decoder.getFirstParameter();
+
+ LEAVE( "RnpClientComm::sendRequestGetAnswer()" );
+}
+
+void RnpClientComm::helper012d(const char* caller) throw (r_Error)
+{
+ int status = sendAndReturnStatus();
+
+ switch( status )
+ {
+ case 0:
+ break;
+ case 1:
+ TALK( "RnpClientComm::helper012d( " << (caller?caller:"(null)") << " ): error: status = " << status );
+ throw r_Error( r_Error::r_Error_ClientUnknown );
+ break;
+ case 2:
+ TALK( "RnpClientComm::helper012d( " << (caller?caller:"(null)") << " ): error: status = " << status );
+ throw r_Error( r_Error::r_Error_ObjectUnknown );
+ break;
+ default:
+ TALK( "RnpClientComm::helper012d( " << (caller?caller:"(null)") << " ): error: status = " << status );
+ throw r_Error( r_Error::r_Error_General );
+ break;
+ }
+}
+
+bool RnpClientComm::detectErrors()
+{
+ if(decoder.getFragmentType() != Rnp::fgt_Error)
+ return false;
+
+ reassemble_r_Error() ;
+
+ return true;
+}
+
+void RnpClientComm::reassemble_r_Error() throw (r_Error)
+{
+ decoder.getFirstParameter();
+ if(decoder.getParameterType() != Rnp::ert_Other)
+ return;
+
+ decoder.getNextParameter();
+
+ r_Error *temp = r_Error::getAnyError((char*)decoder.getDataAsString());
+
+ r_Error err = *temp;
+
+ delete temp;
+
+ TALK( "npClientComm::reassemble_r_Error() throwing exception: " << (char*)decoder.getDataAsString() );
+ throw err;
+}
+
+void RnpClientComm::setTimeoutInterval(int seconds)
+{
+ akg::NbJob::setTimeoutInterval(seconds);
+}
+
+int RnpClientComm::getTimeoutInterval()
+{
+ return akg::NbJob::getTimeoutInterval();
+}
+
+//## unofficial functions
+
+r_OId RnpClientComm::createCollection(const char *collName, const char *collTypeName) throw(r_Error)
+{
+ ENTER( "RnpClientComm::createCollection( collName=" << (collName?collName:"(null)") << ", collTypeName=" << (collTypeName?collTypeName:"(null)") << " )" );
+
+ checkForRwTransaction();
+ startRequest(RnpRasserver::cmd_createcollection);
+ encoder.addInt32Parameter( RnpRasserver::pmt_clientid, clientID);
+ encoder.addStringParameter(RnpRasserver::pmt_collname, collName);
+ encoder.addStringParameter(RnpRasserver::pmt_typename, collTypeName);
+ TALK( "request RnpRasserver::cmd_createcollection with ..., clientID 0x" << hex << clientID << dec );
+
+ sendRequestGetAnswer();
+
+ r_OId oid(decoder.getDataAsString());
+
+ clearAnswer();
+
+ LEAVE( "RnpClientComm::createCollection() -> " << oid );
+ return oid;
+}
+
+r_OId RnpClientComm::createMDD(const char* collName, const char* mddTypeName, const char* definitionDomain, const char *tileDomain, bool rcindex) throw(r_Error)
+{
+ ENTER( "RnpClientComm::createMDD( collName=" << (collName?collName:"(null)") << "; mddTypeName=" << (mddTypeName?mddTypeName:"(null)") << ", definitionDomain=" << (definitionDomain?definitionDomain:"(null)") << ", tileDomain=" << (tileDomain?tileDomain:"(null)") << ", rcindex=" << rcindex << " )" );
+
+ checkForRwTransaction();
+ startRequest(RnpRasserver::cmd_createmdd);
+ encoder.addInt32Parameter( RnpRasserver::pmt_clientid, clientID);
+ encoder.addStringParameter(RnpRasserver::pmt_collname, collName);
+ encoder.addStringParameter(RnpRasserver::pmt_typename, mddTypeName);
+ encoder.addStringParameter(RnpRasserver::pmt_domain, definitionDomain);
+ encoder.addInt32Parameter( RnpRasserver::pmt_indextype, rcindex);
+ encoder.addStringParameter(RnpRasserver::pmt_domain, tileDomain);
+ TALK( "request RnpRasserver::cmd_createmdd with collName " << collName << ", mddTypeName " << mddTypeName << ", definitionDomain " << definitionDomain << ", rcindex " << rcindex << ", tileDomain " << tileDomain << ", clientID 0x" << hex << clientID << dec );
+
+ sendRequestGetAnswer();
+
+ r_OId oid(decoder.getDataAsString());
+ TALK( "RnpClientComm::createMDD() receiving oid " << oid );
+
+ clearAnswer();
+
+ LEAVE( "RnpClientComm::createMDD() -> " << oid );
+ return oid;
+}
+
+void RnpClientComm::extendMDD(r_OId mddOId, const char *stripeDomain, const char* tileDomain) throw(r_Error)
+{
+ ENTER( "RnpClientComm::extendMDD( mddOId=" << mddOId << ", stripeDomain=" << (stripeDomain?stripeDomain:"(null)") << ", tileDomain=" << (tileDomain?tileDomain:"(null)") << " )" );
+
+ checkForRwTransaction();
+ startRequest(RnpRasserver::cmd_extendmdd);
+ encoder.addInt32Parameter( RnpRasserver::pmt_clientid, clientID);
+ encoder.addStringParameter(RnpRasserver::pmt_oidstring, mddOId.get_string_representation());
+ encoder.addStringParameter(RnpRasserver::pmt_domain, stripeDomain);
+ encoder.addStringParameter(RnpRasserver::pmt_domain, tileDomain);
+ TALK( "request RnpRasserver::cmd_extendmdd with oid " << mddOId.get_string_representation() << ", stripeDomain " << stripeDomain << ", tileDomain " << tileDomain << ", clientID 0x" << hex << clientID << dec );
+
+ sendRequestGetAnswer();
+
+ clearAnswer();
+
+ LEAVE( "RnpClientComm::extendMDD()" );
+}
+
+vector<r_OId> RnpClientComm::getOIdsFromCollection( const char* collName ) throw( r_Error )
+{
+ ENTER( "RnpClientComm::getOIdsFromCollection( collName=" << (collName?collName:"(null)") << " )" );
+
+ vector<r_OId> result = executeGetOIdsFromCollection ( collName, r_OId());
+
+ LEAVE( "RnpClientComm::getOIdsFromCollection()" );
+ return result;
+}
+
+vector<r_OId> RnpClientComm::getOIdsFromCollection( const r_OId& oid ) throw( r_Error )
+{
+ ENTER( "RnpClientComm::getOIdsFromCollection( oid=" << oid << " )" );
+
+ vector<r_OId> result = executeGetOIdsFromCollection ( 0, oid );
+
+ LEAVE( "RnpClientComm::getOIdsFromCollection()" );
+ return result;
+}
+
+vector<r_OId> RnpClientComm::executeGetOIdsFromCollection ( const char* collName, const r_OId& oid ) throw( r_Error )
+{
+ ENTER( "RnpClientComm::executeGetOIdsFromCollection( collName=" << (collName?collName:"(null)") << ", oid=" << oid << " )" );
+
+ startRequest(RnpRasserver::cmd_getcolloids);
+ encoder.addInt32Parameter(RnpRasserver::pmt_clientid, clientID);
+ TALK( "request RnpRasserver::cmd_getcolloids with clientID 0x" << hex << clientID << dec );
+ if( collName != NULL)
+ {
+ encoder.addStringParameter(RnpRasserver::pmt_collname, collName);
+ TALK( "RnpClientComm::executeGetOIdsFromCollection() adding collName " << collName );
+ }
+ else
+ {
+ encoder.addStringParameter(RnpRasserver::pmt_oidstring, oid.get_string_representation());
+ TALK( "RnpClientComm::executeGetOIdsFromCollection() adding oid " << oid.get_string_representation() );
+ }
+
+ sendRequestGetAnswer();
+
+ int status = decoder.getDataAsInteger();
+ // we have to read all this, but we don't use them here
+ decoder.getNextParameter(); const char *typeName = decoder.getDataAsString();
+ TALK( "RnpClientComm::executeGetOIdsFromCollection() received typeName " << typeName );
+ decoder.getNextParameter(); const char *typeStructure = decoder.getDataAsString();
+ TALK( "RnpClientComm::executeGetOIdsFromCollection() received typeStructure " << typeStructure );
+ decoder.getNextParameter(); const char *oidstring = decoder.getDataAsString();
+ TALK( "RnpClientComm::executeGetOIdsFromCollection() received oid " << oidstring );
+ decoder.getNextParameter(); const char *collectionName= decoder.getDataAsString();
+ TALK( "RnpClientComm::executeGetOIdsFromCollection() received collectionName " << collectionName );
+
+ if( status != 0 && status != 1 )
+ {
+ r_Error err;
+ switch( status )
+ {
+ case 2:
+ LEAVE( "RnpClientComm::executeGetOIdsFromCollection(): exception, status = " << status );
+ err = r_Error( r_Error::r_Error_ObjectUnknown );
+ break;
+ case 3:
+ LEAVE( "RnpClientComm::executeGetOIdsFromCollection(): exception, status = " << status );
+ err = r_Error( r_Error::r_Error_ClientUnknown );
+ break;
+ default:
+ LEAVE( "RnpClientComm::executeGetOIdsFromCollection(): exception, status = " << status );
+ err = r_Error( r_Error::r_Error_TransferFailed );
+ break;
+ }
+ clearAnswer();
+ throw err;
+ }
+
+ // create the set
+ vector<r_OId> result;
+ // fill set with oids
+ if( status == 0 )
+ {
+ while(decoder.getNextParameter() != 0)
+ {
+ r_OId roid( decoder.getDataAsString() );
+ TALK( "RnpClientComm::executeGetOIdsFromCollection() received oid set component " << roid );
+
+ result.push_back(roid);
+ }
+ }
+
+ clearAnswer();
+
+ LEAVE( "RnpClientComm::executeGetOIdsFromCollection()" );
+ return result;
+ }
+
+vector<r_Minterval> RnpClientComm::getTileDomains(r_OId mddOId, const char *stripeDomain) throw( r_Error )
+{
+ ENTER( "RnpClientComm::getTileDomains( mddOId=" << mddOId << ", stripeDomain=" << (stripeDomain?stripeDomain:"(null)") << " )" );
+
+ startRequest(RnpRasserver::cmd_gettiledomains);
+ encoder.addInt32Parameter(RnpRasserver::pmt_clientid, clientID);
+ encoder.addStringParameter(RnpRasserver::pmt_oidstring, mddOId.get_string_representation());
+ encoder.addStringParameter(RnpRasserver::pmt_domain, stripeDomain);
+ TALK( "request RnpRasserver::cmd_gettiledomains with oid " << mddOId.get_string_representation() << ", stripeDomain " << stripeDomain << ", clientID 0x" << hex << clientID << dec );
+
+ sendRequestGetAnswer();
+
+ vector<r_Minterval> result;
+
+ const RnpParameter *currParam = decoder.getFirstParameter();
+
+ while(currParam)
+ {
+ r_Minterval interval(decoder.getDataAsString());
+ TALK( "RnpClientComm::getTileDomains() received minterval " << interval );
+
+ result.push_back(interval);
+
+ currParam = decoder.getNextParameter();
+ }
+
+ clearAnswer();
+
+ LEAVE( "RnpClientComm::getTileDomains() -> " << result );
+ return result;
+}
+
diff --git a/rnprotocol/rnpcommunication.cc b/rnprotocol/rnpcommunication.cc
new file mode 100644
index 0000000..65c5ed8
--- /dev/null
+++ b/rnprotocol/rnpcommunication.cc
@@ -0,0 +1,702 @@
+/*
+* This file is part of rasdaman community.
+*
+* Rasdaman community is free software: you can redistribute it and/or modify
+* it under the terms of the GNU General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* Rasdaman community is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with rasdaman community. If not, see <http://www.gnu.org/licenses/>.
+*
+* Copyright 2003, 2004, 2005, 2006, 2007, 2008, 2009 Peter Baumann /
+rasdaman GmbH.
+*
+* For more information please see <http://www.rasdaman.org>
+* or contact Peter Baumann via <baumann@rasdaman.com>.
+*/
+/****************************************************************************
+ *
+ *
+ * COMMENTS:
+ * - FIXME: uses assert() !!! -- PB 2003-nov-22
+ *
+ ****************************************************************************/
+
+#include <assert.h>
+#include <rnpcommunication.hh>
+
+#ifdef AFTERV52
+#include <rnpexception.hh>
+#endif
+
+#include "debug.hh"
+#include "raslib/rminit.hh" // for RNP_COMM_TIMEOUT
+
+using namespace rnp;
+
+RnpClientJob::RnpClientJob() throw()
+ {
+ }
+
+void RnpClientJob::init(CommBuffer* transmitterBuffer, RnpBaseClientComm* newClientComm) throw()
+ {
+ ENTER( "RnpClientJob::init" );
+
+ if ( ! ( transmitterBuffer != 0 ) )
+ {
+ TALK( "RnpClientJob::init(): warning: assert will fire." );
+ }
+ assert(transmitterBuffer != 0);
+ if ( ! ( newClientComm != 0 ) )
+ {
+ TALK( "RnpClientJob::init(): warning: assert will fire." );
+ }
+ assert(newClientComm != 0);
+
+ rnpReceiver.reset();
+ answerOk = false;
+ currentBufferPtr = transmitterBuffer;
+ clientCommPtr = newClientComm;
+ invalidFormat = false;
+
+ status=wks_notdefined;
+
+ LEAVE( "RnpClientJob::init" );
+ }
+
+void RnpClientJob::clearAnswerBuffer() throw()
+ {
+ rnpReceiver.reset();
+ }
+
+void RnpClientJob::resetState() throw()
+ {
+ ENTER( "RnpClientJob::resetState" );
+
+ clearConnection();
+
+ clientCommPtr->jobIsReady();
+
+ status = wks_notdefined;
+
+ LEAVE( "RnpClientJob::resetState" );
+ }
+void RnpClientJob::processRequest() throw()
+ {
+ ENTER( "RnpClientJob::processRequest" );
+
+ answerOk = true;
+
+ invalidFormat = false;
+
+ resetState();
+
+ LEAVE( "RnpClientJob::processRequest" );
+ }
+
+bool RnpClientJob::validateMessage() throw()
+ {
+ ENTER( "RnpClientJob::validateMessage()" );
+
+ bool validated = rnpReceiver.validateMessage();
+
+ currentBufferPtr = rnpReceiver.getCurrentBuffer();
+
+ if( validated == true)
+ {
+ status=wks_processing;
+ LEAVE( "RnpClientJob::validateMessage() -> true" );
+ return true;
+ }
+
+ if(rnpReceiver.isDiscarding())
+ {
+ TALK( "RnpClientJob::validateMessage - discarding message" );
+ resetState();
+ answerOk = false;
+ invalidFormat = true;
+ }
+ answerOk = false;
+
+ LEAVE( "RnpClientJob::validateMessage() -> false" );
+ return false;
+ }
+
+void RnpClientJob::executeOnWriteReady() throw()
+ {
+ ENTER( "RnpClientJob::executeOnWriteReady()" );
+
+ rnpReceiver.reset();
+
+ currentBufferPtr->freeBuffer();
+
+ currentBufferPtr = rnpReceiver.getCurrentBuffer();
+
+ readyToReadAnswer();
+
+ LEAVE( "RnpClientJob::executeOnWriteReady()" );
+ }
+
+void RnpClientJob::specificCleanUpOnTimeout() throw()
+ {
+ ENTER( "RnpClientJob::specificCleanUpOnTimeout()" );
+
+ answerOk = false;
+ resetState();
+
+ LEAVE( "RnpClientJob::specificCleanUpOnTimeout()" );
+ }
+
+void RnpClientJob::executeOnReadError() throw()
+ {
+ ENTER( "RnpClientJob::executeOnReadError()" );
+
+ answerOk = false;
+ resetState();
+
+ LEAVE( "RnpClientJob::executeOnReadError()" );
+ }
+
+void RnpClientJob::executeOnWriteError() throw()
+ {
+ ENTER( "RnpClientJob::executeOnWriteError()" );
+
+ answerOk = false;
+ resetState();
+
+ LEAVE( "RnpClientJob::executeOnWriteError()" );
+ }
+
+CommBuffer* RnpClientJob::getAnswerBuffer() throw()
+ {
+ return rnpReceiver.getMessageBuffer();
+ }
+
+bool RnpClientJob::isAnswerOk() throw()
+ {
+ return answerOk;
+ }
+
+bool RnpClientJob::isInvalidFormat() throw()
+ {
+ return invalidFormat;
+ }
+
+
+//###################################################
+
+RnpBaseClientComm::RnpBaseClientComm(RnpQuark theServerType, RnpTransport::CarrierProtocol theProtocol) throw()
+ {
+ ENTER( "RnpBaseClientComm::RnpBaseClientComm( serverType="<<theServerType<<" protocol="<<theProtocol << " )" );
+
+ serverHost = NULL;
+ serverPort = 0;
+ serverType = theServerType;
+ carrierProtocol = theProtocol;
+
+ initDefaultCommunication();
+ maxRetry = 0; // # of RE-tries -- PB 2005-aug-31
+
+ LEAVE( "RnpBaseClientComm::RnpBaseClientComm()" );
+ }
+
+RnpBaseClientComm::RnpBaseClientComm(const char* theServerHost, int theServerPort, RnpQuark theServerType, RnpTransport::CarrierProtocol theProtocol) throw()
+ {
+ ENTER( "RnpBaseClientComm::RnpBaseClientComm( server="<<theServerHost<<" port="<<theServerPort<<" serverType="<<theServerType<<" protocol="<<theProtocol << " )" );
+
+ if ( ! ( theServerHost != 0 ) )
+ {
+ TALK( "RnpBaseClientComm::RnpBaseClientComm(): warning: assert will fire." );
+ }
+ assert(theServerHost != 0);
+ if ( ! ( theServerPort > 0 ) )
+ {
+ TALK( "RnpBaseClientComm::RnpBaseClientComm(): warning: assert will fire." );
+ }
+ assert(theServerPort > 0);
+
+ serverHost = theServerHost;
+ serverPort = theServerPort;
+ serverType = theServerType;
+ carrierProtocol = theProtocol;
+
+ initDefaultCommunication();
+
+ maxRetry = 0; // # of RE-tries -- PB 2005-aug-31
+
+ LEAVE( "RnpBaseClientComm::RnpBaseClientComm()" );
+ }
+RnpBaseClientComm::~RnpBaseClientComm() throw()
+ {
+ }
+
+void RnpBaseClientComm::setConnectionParameters(const char* theServerHost, int theServerPort) throw()
+ {
+ ENTER( "RnpBaseClientComm::setConnectionParameters( server="<<theServerHost<<" port="<<theServerPort << " )" );
+
+ if ( ! ( theServerHost != 0 ) )
+ {
+ TALK( "RnpBaseClientComm::setConnectionParameters(): warning: assert will fire." );
+ }
+ assert(theServerHost != 0);
+ if ( ! ( theServerPort > 0 ) )
+ {
+ TALK( "RnpBaseClientComm::setConnectionParameters(): warning: assert will fire." );
+ }
+ assert(theServerPort > 0);
+
+ serverHost = theServerHost;
+ serverPort = theServerPort;
+
+ LEAVE( "RnpBaseClientComm::setConnectionParameters()" );
+ }
+
+void RnpBaseClientComm::setCarrierProtocol(RnpTransport::CarrierProtocol theProtocol) throw()
+ {
+ carrierProtocol = theProtocol;
+ }
+
+RnpTransport::CarrierProtocol RnpBaseClientComm::getCarrierProtocol() throw()
+ {
+ return carrierProtocol;
+ }
+
+void RnpBaseClientComm::initDefaultCommunication() throw()
+ {
+ ENTER( "RnpBaseClientComm::initDefaultCommunication()" );
+
+ communicatorPtr = &internalCommunicator;
+
+ communicatorPtr->initJobs(1);
+ communicatorPtr->setTimeout(RNP_COMM_TIMEOUT,0); // defined in raslib/rminit.hh -- PB 2005-sep-09
+
+ communicatorPtr->attachJob(clientJob);
+
+ // not necessary? transmitterBuffer.allocate(RNP_DEFAULTBUFFERSIZE);
+
+ LEAVE( "RnpBaseClientComm::initDefaultCommunication()" );
+ }
+
+
+void RnpBaseClientComm::jobIsReady() throw()
+ {
+ ENTER( "RnpBaseClientComm::jobIsReady()" );
+
+ communicatorPtr->shouldExit();
+
+ LEAVE( "RnpBaseClientComm::jobIsReady()" );
+ }
+
+void RnpBaseClientComm::startRequest(RnpQuark command, int transmitterBufferSize)
+ {
+ ENTER( "RnpBaseClientComm::startRequest( command="<<command<<" transmitterBufferSize="<<transmitterBufferSize << " )" );
+
+ transmitterBuffer.allocate(transmitterBufferSize);
+
+ clientJob.init(&transmitterBuffer,this);
+
+ encoder.setBuffer(&transmitterBuffer);
+
+ encoder.startRequest(serverType, carrierProtocol);
+ encoder.startFragment(Rnp::fgt_Command, command);
+
+ LEAVE( "RnpBaseClientComm::startRequest()" );
+ }
+
+bool RnpBaseClientComm::sendRequestGetAnswer()
+ {
+ ENTER( "RnpBaseClientComm::sendMessageGetAnswer()" );
+
+ if ( ! ( serverHost != NULL ) )
+ {
+ TALK( "RnpBaseClientComm::sendRequestGetAnswer(): warning: assert will fire." );
+ }
+ assert(serverHost != NULL);
+ if ( ! ( serverPort > 0 ) )
+ {
+ TALK( "RnpBaseClientComm::sendRequestGetAnswer(): warning: assert will fire." );
+ }
+ assert(serverPort > 0);
+
+ encoder.endFragment();
+ encoder.endMessage();
+
+ bool connected = false;
+ for (int retry = 0; retry < maxRetry+1 && !connected; retry++) // NB: first attempt + RE-tries! -- PB 2005-aug-31
+ {
+ connected = clientJob.connectToServer(serverHost,serverPort);
+ }
+
+ if(connected == false)
+ {
+#ifdef AFTERV52
+ LEAVE( "RnpBaseClientComm::sendMessageGetAnswer(): exception - cannot connect to server "<<serverHost<<":"<<serverPort );
+ throw RnpIOException(clientJob.getErrno());
+#endif
+ LEAVE( "RnpBaseClientComm::sendMessageGetAnswer(): -> false" );
+ return false;
+ }
+
+ communicatorPtr->runClient();
+
+ if(clientJob.isAnswerOk()== false)
+ {
+#ifdef AFTERV52
+ LEAVE( "RnpBaseClientComm::sendMessageGetAnswer(): exception - answer not OK" );
+ if(clientJob.isInvalidFormat()) throw RnpInvalidFormatException();
+ else throw RnpIOException(clientJob.getErrno());
+#endif
+ LEAVE( "RnpBaseClientComm::sendMessageGetAnswer(): -> false" );
+ return false;
+ }
+
+ CommBuffer* receiverBuffer = clientJob.getAnswerBuffer();
+ decoder.decode(receiverBuffer);
+ decoder.getFirstFragment();
+
+ LEAVE( "RnpBaseClientComm::sendMessageGetAnswer() -> true");
+ return true;
+ }
+
+bool RnpBaseClientComm::checkForExceptions()
+ {
+ if(decoder.getFragmentType() != Rnp::fgt_Error) return false;
+ return true;
+ }
+
+void RnpBaseClientComm::clearAnswer() throw()
+ {
+ clientJob.clearAnswerBuffer();
+ }
+
+void RnpBaseClientComm::setMaxRetry(unsigned int newMaxRetry)
+ {
+ maxRetry = newMaxRetry;
+ }
+
+unsigned int RnpBaseClientComm::getMaxRetry()
+ {
+ return maxRetry;
+ }
+//############# Server side ################################################
+//#######################################################################
+//#######################################################################
+
+RnpServerJob::RnpServerJob() throw()
+ {
+ }
+
+void RnpServerJob::init(RnpBaseServerComm* theServerComm) throw()
+ {
+ if ( ! ( theServerComm != 0 ) )
+ {
+ TALK( "RnpServerJob::init(): warning: assert will fire." );
+ }
+ assert(theServerComm != 0);
+
+ rnpReceiver.reset();
+ currentBufferPtr = rnpReceiver.getCurrentBuffer();
+ serverCommPtr = theServerComm;
+
+ status=wks_accepting;
+ }
+
+void RnpServerJob::processRequest() throw()
+ {
+ serverCommPtr->processRequest(currentBufferPtr, &transmiterBuffer, rnpReceiver.getCarrierProtocol(), this);
+
+ rnpReceiver.reset();
+
+ currentBufferPtr = &transmiterBuffer;
+
+ readyToWriteAnswer();
+ }
+
+bool RnpServerJob::validateMessage() throw()
+ {
+
+ bool validated = false;
+
+ if(rnpReceiver.validateMessage() == true)
+ {
+ status=wks_processing;
+ validated = true;
+ }
+
+ if(rnpReceiver.isDiscarding())
+ {
+ resetJob();
+ validated = false;
+ }
+
+ currentBufferPtr = rnpReceiver.getCurrentBuffer();
+
+ return validated;
+ }
+
+void RnpServerJob::executeOnWriteReady() throw()
+ {
+ resetJob();
+ }
+
+void RnpServerJob::executeOnAccept() throw()
+ {
+ }
+
+void RnpServerJob::specificCleanUpOnTimeout() throw()
+ {
+ // initial era gol, dar...
+ // clearConnection face cine apeleaza: NbJob::cleanUpIfTimeout()
+ rnpReceiver.reset();
+
+ transmiterBuffer.freeBuffer();
+
+ currentBufferPtr = rnpReceiver.getCurrentBuffer();
+
+ currentBufferPtr->clearToRead();
+
+ status=wks_accepting;
+ }
+
+void RnpServerJob::executeOnReadError() throw()
+ {
+ resetJob();
+ }
+
+void RnpServerJob::executeOnWriteError() throw()
+ {
+ resetJob();
+ }
+
+void RnpServerJob::resetJob() throw()
+ {
+ clearConnection();
+
+ rnpReceiver.reset();
+
+ transmiterBuffer.freeBuffer();
+
+ currentBufferPtr = rnpReceiver.getCurrentBuffer();
+
+ currentBufferPtr->clearToRead();
+
+ status=wks_accepting;
+ }
+
+//###################################################
+RnpBaseServerComm::RnpBaseServerComm() throw()
+ {
+ nrServerJobs = 1;
+
+ transmitterBufferSize = RNP_DEFAULTBUFFERSIZE;
+
+ communicator = NULL;
+ }
+
+RnpBaseServerComm::~RnpBaseServerComm() throw()
+ {
+ disconnectFromCommunicator();
+ }
+
+bool RnpBaseServerComm::setServerJobs(int nrOfServerJobs) throw()
+ {
+ if(communicator != 0 ) return false;
+
+ nrServerJobs = nrOfServerJobs;
+
+ return true;
+ }
+
+int RnpBaseServerComm::countServerJobs() throw()
+ {
+ return nrServerJobs;
+ }
+
+void RnpBaseServerComm::connectToCommunicator(NbCommunicator &theCommunicator)
+ { // throws whatever 'new' throws
+ if ( ! ( communicator == NULL ) )
+ {
+ TALK( "RnpServerJob::init(): warning: assert will fire." );
+ }
+ assert(communicator == NULL);
+
+ communicator = &theCommunicator;
+
+ for(int i=0; i<nrServerJobs;i++)
+ {
+ RnpServerJob* job = createJob();
+
+ job->init(this);
+
+ communicator->attachJob(*job);
+
+ serverJob.push_back(job);
+ }
+ }
+
+bool RnpBaseServerComm::disconnectFromCommunicator() throw()
+ {
+ if(communicator == NULL) return false;
+
+ for(int i=0; i<nrServerJobs;i++)
+ {
+ communicator->deattachJob(*(serverJob[i]));
+
+ delete serverJob[i];
+ }
+
+ serverJob.clear();
+
+ communicator = NULL;
+
+ return true;
+ }
+
+RnpServerJob* RnpBaseServerComm::createJob()
+ {
+ return new RnpServerJob;
+ }
+
+
+void RnpBaseServerComm::setTransmitterBufferSize(int nSize) throw()
+ {
+ transmitterBufferSize = nSize;
+ }
+
+int RnpBaseServerComm::getTransmitterBufferSize() throw()
+ {
+ return transmitterBufferSize;
+ }
+
+
+void RnpBaseServerComm::processRequest(CommBuffer *receiverBuffer, CommBuffer *transmiterBuffer, RnpTransport::CarrierProtocol protocol, RnpServerJob *callingJob) throw()
+ {
+ // use 'callingJob' to get info about the client (hostaddress, etc)
+
+ decoder.decode(receiverBuffer);
+ RnpQuark destServerType = decoder.getDestinationServerType();
+ Rnp::Endianness desEndianness = decoder.getDesiredEndianness();
+
+ // test if servertype matches!
+
+ transmiterBuffer->allocate(transmitterBufferSize);
+ transmiterBuffer->clearToRead();
+
+ encoder.setBuffer(transmiterBuffer);
+ encoder.setFinalEndianness(desEndianness);
+ encoder.startAnswer(destServerType, protocol);
+
+ decoder.getFirstFragment();
+ bool wasError = false;
+ for(int fragment=0; fragment < decoder.countFragments(); fragment++)
+ {
+ if(wasError == false)
+ {
+ try
+ {
+ decodeFragment();
+ }
+ catch(...)
+ {
+ wasError = true;
+ answerUnknownError();
+ }
+ }
+ else
+ {
+ discardFragment();
+ }
+ decoder.getNextFragment();
+ }
+ encoder.endMessage();
+ }
+
+const char* RnpBaseServerComm::getNextAsString(RnpQuark parameterType) const
+ {
+ decoder.getNextParameter();
+ //if(decoder.getParameterType != parameterType) throw something
+ return decoder.getDataAsString();
+ }
+
+int RnpBaseServerComm::getNextAsInteger(RnpQuark parameterType) const
+ {
+ decoder.getNextParameter();
+ //if(decoder.getParameterType != parameterType) throw something
+ return decoder.getDataAsInteger();
+ }
+
+float RnpBaseServerComm::getNextAsFloat(RnpQuark parameterType) const
+ {
+ decoder.getNextParameter();
+ //if(decoder.getParameterType != parameterType) throw something
+ return decoder.getDataAsFloat();
+ }
+
+double RnpBaseServerComm::getNextAsDouble(RnpQuark parameterType) const
+ {
+ decoder.getNextParameter();
+ //if(decoder.getParameterType != parameterType) throw something
+ return decoder.getDataAsDouble();
+ }
+
+const void* RnpBaseServerComm::getNextAsOpaque(RnpQuark parameterType) const
+ {
+ decoder.getNextParameter();
+ //if(decoder.getParameterType != parameterType) throw something
+ return decoder.getDataAsOpaque();
+ }
+
+int RnpBaseServerComm::getCurrentParameterLength() const throw()
+ {
+ return decoder.getDataLength();
+ }
+
+void RnpBaseServerComm::answerSTLException(exception &ex) throw()
+ {
+ encoder.startFragment(Rnp::fgt_Error, decoder.getCommand());
+ encoder.addInt32Parameter(Rnp::ert_StlException, 0);
+ encoder.addStringParameter(Rnp::erp_whatValue, ex.what());
+ encoder.endFragment();
+ }
+
+void RnpBaseServerComm::answerUnknownError() throw()
+ {
+ encoder.startFragment(Rnp::fgt_Error, decoder.getCommand());
+ encoder.addInt32Parameter(Rnp::ert_Unknown, 0);
+ encoder.endFragment();
+ }
+
+void RnpBaseServerComm::discardFragment() throw()
+ {
+ encoder.startFragment(Rnp::fgt_DiscardedRequest, decoder.getCommand());
+
+ encoder.endFragment();
+ }
+
+void RnpBaseServerComm::startOkAnswer() throw()
+ {
+ encoder.startFragment(Rnp::fgt_OkAnswer, decoder.getCommand());
+ }
+
+void RnpBaseServerComm::endOkAnswer() throw()
+ {
+ encoder.endFragment();
+ }
+
+void RnpBaseServerComm::communicatorShouldExit() throw()
+ {
+ if ( ! ( communicator != NULL ) )
+ {
+ TALK( "RnpServerJob::init(): warning: assert will fire." );
+ }
+ assert(communicator != NULL);
+
+ communicator->shouldExit();
+ }
+
diff --git a/rnprotocol/rnpcommunication.hh b/rnprotocol/rnpcommunication.hh
new file mode 100644
index 0000000..56aca81
--- /dev/null
+++ b/rnprotocol/rnpcommunication.hh
@@ -0,0 +1,345 @@
+#ifndef RNPCOMMUNICATION_HH
+#define RNPCOMMUNICATION_HH
+/*
+* This file is part of rasdaman community.
+*
+* Rasdaman community is free software: you can redistribute it and/or modify
+* it under the terms of the GNU General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* Rasdaman community is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with rasdaman community. If not, see <http://www.gnu.org/licenses/>.
+*
+* Copyright 2003, 2004, 2005, 2006, 2007, 2008, 2009 Peter Baumann /
+rasdaman GmbH.
+*
+* For more information please see <http://www.rasdaman.org>
+* or contact Peter Baumann via <baumann@rasdaman.com>.
+*/
+/****************************************************************************
+ *
+ *
+ * COMMENTS:
+ *
+ *
+ ****************************************************************************/
+
+#include "network/akgnetwork.hh"
+#include "rnprotocol/rnpembedded.hh"
+#include <vector>
+
+#ifdef AFTERV52
+ #include <akglogging.hh>
+ #include <akg_exception.hh>
+#else
+ #include <exception>
+#endif
+
+namespace rnp
+ {
+using namespace akg;
+using namespace std;
+
+class RnpBaseClientComm;
+
+/** This class represents the RNP client job. It taker a CommBuffer, sends its data
+ and receives the answer. Is directy owned and used by 'RnpBaseClientComm',
+ so you don't have to worry about it
+ Be aware that the transmitter buffer is freed after transmission! */
+class RnpClientJob : public NbClientJob
+ {
+ public:
+ /// Default constructor
+ RnpClientJob() throw();
+
+ /** Initialization: takes the tarnsmitter buffer containing data to be send
+ and a pointer to a Communicator object, which will coordinate the transmission
+ Assert: transmitterBuffer!=0, newClientComm !=0 */
+ void init(CommBuffer *transmitterBuffer, RnpBaseClientComm *newClientComm) throw();
+
+ /// Call-back function for the Communicator.
+ void processRequest() throw();
+
+ /** Returns a pointer to the buffer containing the answer. The buffer
+ holds only the RNP message, without carrier header */
+ CommBuffer* getAnswerBuffer() throw();
+
+ /// Returns 'true' if the answer was correctly received
+ bool isAnswerOk() throw();
+
+ /// Returns true if the format of the received message is not valid RNP and was discarded
+ bool isInvalidFormat() throw();
+
+ /** Clears the answer buffer. Important if huge amount of data where received.
+ The buffer is cleared by the next transmission, also. */
+ void clearAnswerBuffer() throw();
+ protected:
+ /// (See the explanations from NbJob)
+ bool validateMessage() throw();
+ void executeOnWriteReady() throw();
+ void specificCleanUpOnTimeout() throw();
+ void executeOnReadError() throw();
+ void executeOnWriteError() throw();
+
+ /// Resets the object: clears the connection and marks the job as ready
+ void resetState() throw();
+ private:
+ RnpBaseClientComm *clientCommPtr;
+
+ RnpReceiver rnpReceiver;
+ bool answerOk;
+ bool invalidFormat;
+ };
+
+/**
+ RnpBaseClientComm is the base class for the client communication. It offers
+ the necessary elements for creating the request, send it, receive the answer
+ and decode it. Every specific client comm will inherit from this and will
+ implement the various functions using the functions provided by this class.
+
+ It has a private NbCommunicator object, but if you need a shared one
+ be my guest. The RnpClientJob is its own also and this stays like that!
+*/
+class RnpBaseClientComm
+ {
+ public:
+ /// Constructor taking the server type and the carrier protocol
+ RnpBaseClientComm(RnpQuark serverType, RnpTransport::CarrierProtocol = RnpTransport::crp_Rnp) throw();
+
+ /** Constructor taking also the connection info for the server
+ Assert: serverHost != 0, serverPort > 0 */
+ RnpBaseClientComm(const char* serverHost, int serverPort, RnpQuark serverType, RnpTransport::CarrierProtocol = RnpTransport::crp_Rnp) throw();
+
+ /// Destructor
+ virtual ~RnpBaseClientComm() throw();
+
+ /** Set the connection parameter
+ Assert: serverHost != 0, serverPort > 0 */
+ void setConnectionParameters(const char* serverHost, int serverPort) throw();
+
+ /// Set the carrier protocol
+ void setCarrierProtocol(RnpTransport::CarrierProtocol) throw();
+ /// Returns the used carrier protocol
+ RnpTransport::CarrierProtocol getCarrierProtocol() throw();
+
+ // callback from RnpClientJob
+ void jobIsReady() throw();
+
+ // Set the maximal retry count (retries to connect to the server)
+ void setMaxRetry(unsigned int newMaxRetry);
+
+ /// Returns the maximal retry count
+ unsigned int getMaxRetry();
+
+ protected:
+ // stuff for helping creating the function calls
+ RnpQuark serverType;
+ RnpTransport::CarrierProtocol carrierProtocol;
+
+ /// Start building the request, might throw whatever new throws
+ void startRequest(RnpQuark command, int transmitterBufferSize = RNP_DEFAULTBUFFERSIZE);
+
+ /** Does the dirty work: sends the request and brings the answer
+ Later it will throw various exceptions, but for now it only
+ returns 'true' if everything is OK
+ Assert: serverHost != 0, serverPort > 0 */
+ bool sendRequestGetAnswer();
+
+ /** detects an exception as answer and throws it. this version only Akg and STL
+ returns true if there is an exception, but can't reassemble it
+ returns false if there is a correct answer, no exception
+ doesn't return, but throws, if there is an exception and it can reassemble it*/
+ virtual bool checkForExceptions();
+
+#ifdef AFTERV52
+ // reassembles and throws an AkgSerializableException. Returns if it isn't an Akg...
+ void reassembleAkgSerializable() throw(AkgSerializableException);
+
+ // reassembles and throws a STL-exception. Returns only if it isn't a stl-exception
+ void reassembleStlException() throw(RnpStlException);
+#endif
+ /// Clear the answer when you don't need it any more, memory is released
+ void clearAnswer() throw();
+
+ /** Default communication init, build another init() if you don't like this
+ This sets 1 job, 60sec as timeout, attaches the internal job.
+ Be aware that this timeout is not the timeout of the client job,
+ but the one of the communicator */
+ void initDefaultCommunication() throw();
+
+ // encoding and decoding
+ RnpProtocolDecoder decoder;
+ RnpTransmitter encoder;
+ CommBuffer transmitterBuffer; // to go, use internal of encoder
+
+ // stuff for non blocking communication
+ RnpClientJob clientJob; // the client job
+ NbCommunicator *communicatorPtr; // the communicator to be used
+ NbCommunicator internalCommunicator; // an internal communicator, if you dont like that you put another one
+
+ // connection parameters
+ const char* serverHost;
+ unsigned int serverPort;
+ unsigned int maxRetry;
+
+ /// Helper function for ptinting the current parameter
+ void printCurrentParameter() throw();
+ };
+
+
+//############ Server side ###################################
+
+class RnpBaseServerComm;
+
+/** This class represents the RNP server job. It receives the request, sends it to 'RnpBaseServerComm'
+ for processing and gets from there the answer which it transmittes to the client
+*/
+class RnpServerJob : public NbServerJob
+ {
+ public:
+ /// Default constructor
+ RnpServerJob() throw();
+
+ /** Initialization: it connects to the given 'RnpBaseServerComm'
+ Assert: theServerComm != 0 */
+ void init(RnpBaseServerComm*) throw();
+
+ /// Calls the 'RnpBaseServerComm->processRequest()' and than initiates the transmission
+ void processRequest() throw();
+
+ protected:
+ /// (See explanations from NbJob)
+ bool validateMessage() throw();
+ void executeOnAccept() throw();
+ void executeOnWriteReady() throw();
+ void specificCleanUpOnTimeout() throw();
+ void executeOnReadError() throw();
+ void executeOnWriteError() throw();
+
+ void resetJob() throw();
+
+ RnpBaseServerComm *serverCommPtr;
+
+ RnpReceiver rnpReceiver;
+
+ CommBuffer transmiterBuffer;
+ };
+
+/**
+ RnpBaseServerComm is the base class for the server communication. It offers
+ the necessary elements for decoding the request, and for creating and transmitting
+ the answer. Every specific server comm will inherit from this and will
+ implement the various functions, most important the 'processRequest()',
+ using the elements provided by this class.
+
+ It has a pool of 'RnpServerJob's which deal with the communication. Whichever has
+ a valid request calls 'processRequest()'. The communicator object is external
+*/
+
+class RnpBaseServerComm
+ {
+ public:
+ /// Default constructor - 1 server job
+ RnpBaseServerComm() throw();
+
+ /// Destructor
+ virtual ~RnpBaseServerComm() throw();
+
+ /** Sets the number of server jobs, only if there is no connection to a communicator
+ Otherwise it changes nothing and returns 'false' */
+ bool setServerJobs(int nrOfServerJobs) throw();
+
+ /// Returns the number of server jobs
+ int countServerJobs() throw();
+
+ /// Connect to the communicator. It also creates the jobs. Throws whatever new throws. Assert: no other connection!
+ void connectToCommunicator(NbCommunicator&);
+
+ /** Disconnect the jobs from the communicator and destroys them.
+ Returns 'false' if there wasn't any connection to a communicator */
+ bool disconnectFromCommunicator() throw();
+
+ /// Set the transmitter buffer size
+ void setTransmitterBufferSize(int) throw();
+
+ /// Returns the transmitter buffer size
+ int getTransmitterBufferSize() throw();
+
+ /** The heart of the class. It takes the request, decodes it, sends every fragment
+ to the 'decodeFragment()', which has to dispatch the commands to the specific
+ functions. These functions have to use 'decoder' and 'encoder' to do their job and
+ might throw whatever is appropriate. 'processRequest()' catches 'AkgException',
+ 'exception' and (...) and converts them for transmission.
+ If you don't like this version, make another one */
+ virtual void processRequest(CommBuffer *receiverBuffer, CommBuffer *transmiterBuffer, RnpTransport::CarrierProtocol, RnpServerJob *callingJob) throw();
+
+ /** Instructs the communicator that it should exit. Usefull to implement
+ 'down server' commands */
+ void communicatorShouldExit() throw();
+
+ protected:
+ /** Called by 'processRequest' to dispatch to the specific functions
+ Might throw whatever appropriate */
+ virtual void decodeFragment() = 0;
+
+ /// Returns next parameter as string(can be NULL), verifying the parameter type.
+ const char* getNextAsString(RnpQuark parameterType) const;
+
+ /// Returns next parameter as integer, verifying the parameter type.
+ int getNextAsInteger(RnpQuark parameterType) const;
+
+ /// Returns next parameter as float, verifying the parameter type.
+ float getNextAsFloat(RnpQuark parameterType) const;
+
+ /// Returns next parameter as double, verifying the parameter type.
+ double getNextAsDouble(RnpQuark parameterType) const;
+
+ /// Returns next parameter as const void* (can be NULL), verifying the parameter type.
+ const void* getNextAsOpaque(RnpQuark parameterType) const;
+
+ /// Returns the length of the data of the current parameter
+ int getCurrentParameterLength() const throw();
+#ifdef AFTERV52
+ /// Helper function to serialize an 'AkgException'
+ void answerAkgSerializable(AkgSerializableException&) throw();
+#endif
+ /// Helper function to serialize an 'exception' (based on it's 'what()'-member
+ void answerSTLException(exception&) throw();
+
+ /// Helper function to serialize an unknown exception
+ void answerUnknownError() throw();
+
+ /// Helper function to discard a fragment
+ void discardFragment() throw();
+
+ /// Start building an OK-answer
+ void startOkAnswer() throw();
+
+ /// Just for completeness, it's only an 'encoder.endFragment()'
+ void endOkAnswer() throw();
+
+ RnpProtocolDecoder decoder;
+ RnpTransmitter encoder;
+
+ private:
+ /** Creates a server jobs. Default is a RnpServerJob, but you might want
+ some other kind of job */
+ virtual RnpServerJob* createJob();
+
+ vector<RnpServerJob*> serverJob;
+
+ int nrServerJobs;
+
+ NbCommunicator *communicator;
+
+ int transmitterBufferSize;
+ };
+
+} // namespace
+#endif
diff --git a/rnprotocol/rnpembedded.cc b/rnprotocol/rnpembedded.cc
new file mode 100644
index 0000000..c0362bb
--- /dev/null
+++ b/rnprotocol/rnpembedded.cc
@@ -0,0 +1,467 @@
+/*
+* This file is part of rasdaman community.
+*
+* Rasdaman community is free software: you can redistribute it and/or modify
+* it under the terms of the GNU General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* Rasdaman community is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with rasdaman community. If not, see <http://www.gnu.org/licenses/>.
+*
+* Copyright 2003, 2004, 2005, 2006, 2007, 2008, 2009 Peter Baumann /
+rasdaman GmbH.
+*
+* For more information please see <http://www.rasdaman.org>
+* or contact Peter Baumann via <baumann@rasdaman.com>.
+*/
+/****************************************************************************
+ *
+ *
+ * COMMENTS:
+ *
+ ****************************************************************************/
+
+#include <rnpembedded.hh>
+#include <assert.h>
+
+#include "debug.hh"
+
+
+using namespace rnp;
+
+const char* RnpTransport::carrierNames[] =
+ {
+ "(unknown)","RNP","HTTP","(bad)"
+ };
+
+const char* RnpTransport::getCarrierName(RnpTransport::CarrierProtocol x) throw()
+ {
+ if(x < crp_Unknown || x >= crp_HowMany) return carrierNames[0];
+ return carrierNames[x];
+ }
+
+
+const int RnpReceiver::headerBufferLength = 1000;
+
+RnpReceiver::RnpReceiver() throw()
+ {
+ headerBuffer.allocate(headerBufferLength);
+
+ reset();
+ }
+
+RnpReceiver::~RnpReceiver() throw()
+ {
+ }
+
+void RnpReceiver::reset() throw()
+ {
+ rnpMessageBuffer.freeBuffer();
+ headerBuffer.clearToRead();
+ status = waitingHeader;
+ carrier = RnpTransport::crp_Unknown;
+ }
+
+akg::CommBuffer* RnpReceiver::getCurrentBuffer() throw()
+ {
+ return status == readingMessage ? &rnpMessageBuffer : &headerBuffer;
+ }
+
+akg::CommBuffer* RnpReceiver::getMessageBuffer() throw()
+ {
+ return &rnpMessageBuffer;
+ }
+
+RnpTransport::CarrierProtocol
+RnpReceiver::getCarrierProtocol() const throw()
+ {
+ return carrier;
+ }
+
+int RnpReceiver::getCarrierHeaderSize() const throw()
+ {
+ return carrierHeaderLength;
+ }
+
+const void* RnpReceiver::getCarrierHeader() throw()
+ {
+ return headerBuffer.getData();
+ }
+
+bool RnpReceiver::isDiscarding() const throw()
+ {
+ return status == discarding ? true:false;
+ }
+
+bool RnpReceiver::validateMessage() throw()
+ {
+ ENTER( "RnpReceiver::validateMessage()");
+
+ if(status == waitingHeader)
+ { rnpHeader = NULL;
+
+ if(isHttpCarrier() || isRnpCarrier())
+ { // a valid carrier header was detected
+ if(rnpHeader != NULL)
+ { // we can switch to reading the message
+ if(prepareMessageBuffer())
+ {
+ status = readingMessage;
+ }
+ else {
+ status = discarding;
+ LEAVE( "RnpReceiver::validateMessage() -> false - discarding message: not enough memory for message buffer.");
+ return false;
+ }
+ }
+ else
+ {
+ // status == readingHeader, but rnpHeader == NULL
+ // so we wait for some more message
+ LEAVE( "RnpReceiver::validateMessage - wait for more message(s)");
+ return false;
+ }
+ }
+ else
+ { status = discarding;
+ LEAVE( "RnpReceiver::validateMessage() -> false - discarding message: no valid carrier header.");
+ return false;
+ }
+ }
+
+ if(status == readingMessage)
+ {
+ if(rnpMessageBuffer.getNotFilledSize() != 0)
+ {
+ LEAVE( "RnpReceiver::validateMessage() -> false");
+ return false;
+ }
+ }
+ if(status == discarding)
+ {
+ TALK( "RnpReceiver::validateMessage - discarding(3)." );
+ headerBuffer.clearToRead();
+ LEAVE( "RnpReceiver::validateMessage() -> false");
+ return false;
+ }
+ LEAVE( "RnpReceiver::validateMessage() -> true");
+ return true;
+ }
+
+/* the isXXXCarrier() functions have to:
+ - return true if the message is or might be an XXX embedded RnpMessage
+ - set rnpHeader only if there is a valid carrier header
+ - set carrierHeaderLength in this case
+
+ so:
+ - invalid carrier returns false / rnpHeader == NULL
+ - valid carrier but not rnp - false /rnpHeader != NULL
+ - valid carrier but not enough data to be sure it's also valid rnp - true/rnpHeader == NULL
+ - valid carrier & valid rnp header =>true/rnpHeader != NULL carrierHeaderLength set
+*/
+
+bool RnpReceiver::isHttpCarrier() throw()
+ {
+ ENTER( "RnpReceiver::isHttpCarrier()" );
+
+ char *data = (char*)headerBuffer.getData();
+
+ rnpHeader = NULL;
+ carrierHeaderLength = -1;
+
+ bool isHttpReqHeader = strncmp(data,"POST RnpMessage HTTP/1.1",24) ? false : true;
+ bool isHttpAnsHeader = strncmp(data,"HTTP/1.1 200 OK" ,15) ? false : true;
+
+ if(isHttpReqHeader == false && isHttpAnsHeader == false)
+ {
+ LEAVE( "RnpReceiver::isHttpCarrier() -> false" );
+ return false;
+ }
+
+ char *eoHttp = strstr(data,"\r\n\r\n");
+
+ if(eoHttp == NULL)
+ {
+ LEAVE( "RnpReceiver::isHttpCarrier() -> false" );
+ return false;
+ }
+
+ carrierHeaderLength = eoHttp - data + 4;
+
+ if( carrierHeaderLength == -1)
+ {
+ LEAVE( "RnpReceiver::isHttpCarrier() -> false" );
+ return false;
+ }
+
+ if(carrierHeaderLength + sizeof(RnpHeader) > headerBuffer.getDataSize())
+ { // is HTTP carrier, but we need more data to say if it's an embedded RnpMessage
+ LEAVE( "RnpReceiver::isHttpCarrier() -> true" );
+ return true;
+ }
+
+ rnpHeader = (RnpHeader*)(data + carrierHeaderLength);
+
+ bool isRnp = rnpHeader->isRnpMessage();
+
+ if(isRnp)
+ { carrier = RnpTransport::crp_Http;
+ TALK( "RnpReceiver::isHttpCarrier - valid HTTP carrier detected.");
+ }
+
+ LEAVE( "RnpReceiver::isHttpCarrier() -> " << isRnp);
+ return isRnp;
+ }
+
+bool RnpReceiver::isRnpCarrier() throw()
+ {
+ ENTER( "RnpReceiver::isRnpCarrier()" );
+
+ char *data = (char*)headerBuffer.getData();
+
+ rnpHeader = NULL;
+ carrierHeaderLength = -1;
+
+ if(sizeof(RnpHeader) > headerBuffer.getDataSize())
+ { // we need more data to say if it's an RnpMessage
+ LEAVE( "RnpReceiver::isRnpCarrier() -> true" );
+ return true;
+ }
+
+ rnpHeader = (RnpHeader*)data;
+ carrierHeaderLength = 0;
+
+ bool isRnp = rnpHeader->isRnpMessage();
+
+ if(isRnp)
+ { carrier = RnpTransport::crp_Rnp;
+ TALK( "RnpReceiver::isRnpCarrier - valid RNP carrier detected!");
+ }
+
+ LEAVE( "RnpReceiver::isRnpCarrier() -> " << isRnp );
+ return isRnp;
+ }
+
+bool RnpReceiver::prepareMessageBuffer() throw()
+ {
+ if(rnpMessageBuffer.allocate(rnpHeader->getTotalLength()) == false)
+ return false;
+
+ char *data = (char*)headerBuffer.getData();
+
+ rnpMessageBuffer.read(data + carrierHeaderLength, headerBuffer.getDataSize() - carrierHeaderLength);
+
+ RnpHeader *nRnpHeader = (RnpHeader*)rnpMessageBuffer.getData();
+
+ return true;
+ }
+
+//########################################################################################################
+
+RnpTransmitter::RnpTransmitter() throw()
+ {
+ carrier = NULL;
+ carrierType = RnpTransport::crp_Http;
+ }
+
+RnpTransmitter::~RnpTransmitter() throw()
+ {
+ if(carrier != NULL) delete carrier;
+ }
+
+bool RnpTransmitter::startRequest(RnpQuark serverType, RnpTransport::CarrierProtocol desiredProtocol) throw()
+ {
+ getCarrierObject(desiredProtocol);
+
+ if(carrier == NULL) return false;
+
+ startMessage(serverType, carrier->getRequestHeaderLength());
+
+ return true;
+ }
+
+bool RnpTransmitter::startAnswer(RnpQuark serverType, RnpTransport::CarrierProtocol desiredProtocol) throw()
+ {
+ getCarrierObject(desiredProtocol);
+
+ if(carrier == NULL) return false;
+
+ startMessage(serverType, carrier->getAnswerHeaderLength());
+
+ return true;
+ }
+
+akg::CommBuffer* RnpTransmitter::endMessage() throw()
+ {
+ if(carrier == NULL) return NULL;
+
+ akg::CommBuffer *theBuffer = RnpProtocolEncoder::endMessage();
+ carrier->putHeader(theBuffer);
+
+ return theBuffer;
+ }
+
+RnpTransport::CarrierProtocol
+RnpTransmitter::getCarrierProtocol() throw()
+ {
+ return carrierType;
+ }
+
+RnpCarrier* RnpTransmitter::getCarrierObject(RnpTransport::CarrierProtocol desiredProtocol) throw()
+ {
+ carrierType = desiredProtocol;
+
+ if(carrier != NULL) delete carrier;
+
+ switch(carrierType)
+ {
+ case RnpTransport::crp_Rnp : carrier = new RnpCarrier;break;
+ case RnpTransport::crp_Http: carrier = new HttpRnpCarrier;break;
+
+ case RnpTransport::crp_BadCarrier:
+ carrier = new BadRnpCarrier;break;
+
+ default : carrier = NULL;break;
+ }
+
+ return carrier;
+ }
+
+int RnpTransmitter::getBufferSize() const throw()
+ {
+ if ( ! ( commBuffer != 0 ) )
+ {
+ TALK( "RnpTransmitter::getBufferSize(): warning: assert will fire." );
+ }
+ assert(commBuffer != 0);
+ return commBuffer->getBufferSize();
+ }
+
+int RnpTransmitter::getNotFilledSize() const throw()
+ {
+ if ( ! ( commBuffer != 0 ) )
+ {
+ TALK( "RnpTransmitter::getNotFilledSize(): warning: assert will fire." );
+ }
+ assert(commBuffer != 0);
+ return commBuffer->getNotFilledSize();
+ }
+
+int RnpTransmitter::getDataSize() const throw()
+ {
+ if ( ! ( commBuffer != 0 ) )
+ {
+ TALK( "RnpTransmitter::getDataSize(): warning: assert will fire." );
+ }
+ assert(commBuffer != 0);
+ return commBuffer->getDataSize();
+ }
+
+//################################################
+
+RnpCarrier::RnpCarrier() throw()
+ {
+ type = RnpTransport::crp_Rnp;
+ }
+
+RnpCarrier::~RnpCarrier() throw()
+ {
+ }
+
+RnpTransport::CarrierProtocol RnpCarrier::getType() throw()
+ {
+ return type;
+ }
+
+int RnpCarrier::getRequestHeaderLength() throw()
+ {
+ requestHeader = true;
+ return 0;
+ }
+
+int RnpCarrier::getAnswerHeaderLength() throw()
+ {
+ requestHeader = false;
+ return 0;
+ }
+
+void RnpCarrier::putHeader(akg::CommBuffer*) throw()
+ { // nothing!!
+ }
+
+
+HttpRnpCarrier::HttpRnpCarrier() throw()
+ {
+ type = RnpTransport::crp_Http;
+ }
+
+
+int HttpRnpCarrier::getRequestHeaderLength() throw()
+ {
+ requestHeader = true;
+ return strlen(theRequestHeader);
+ }
+
+int HttpRnpCarrier::getAnswerHeaderLength() throw()
+ {
+ requestHeader = false;
+ return strlen(theAnswerHeader);
+ }
+
+void HttpRnpCarrier::putHeader(akg::CommBuffer *messageBuffer) throw()
+ {
+ char *data = (char*)messageBuffer->getData();
+
+ const char *header = requestHeader ? theRequestHeader : theAnswerHeader;
+ int headerLength = strlen(header);
+ int posOfLength = headerLength - 14;
+
+ strncpy(data,header,posOfLength);
+
+ sprintf(data + posOfLength, "%10d",messageBuffer->getDataSize() - headerLength);
+
+ strncpy(data + headerLength - 4,"\r\n\r\n",4); // it shouldn't be null terminated!
+ }
+
+const char HttpRnpCarrier::theRequestHeader[]=
+ "POST RnpMessage HTTP/1.1\r\nAccept: bin/rnp\r\nUserAgent: RnpClient/1.0\r\nContent-length: uxxxyyyzzz\r\n\r\n";
+ // ^10 digits
+
+const char HttpRnpCarrier::theAnswerHeader[]=
+ "HTTP/1.1 200 OK\r\nContent-type: bin/rnp\r\nContent-length: uxxxyyyzzz\r\n\r\n";
+
+
+
+BadRnpCarrier::BadRnpCarrier() throw()
+ {
+ type = RnpTransport::crp_BadCarrier;
+ }
+
+int BadRnpCarrier::getRequestHeaderLength() throw()
+ {
+ requestHeader = true;
+ return strlen(theHeader);
+ }
+
+int BadRnpCarrier::getAnswerHeaderLength() throw()
+ {
+ requestHeader = false;
+ return strlen(theHeader);
+ }
+
+void BadRnpCarrier::putHeader(akg::CommBuffer *messageBuffer) throw()
+ {
+ char *data = (char*)messageBuffer->getData();
+
+ strncpy(data,theHeader,strlen(theHeader));
+ }
+
+const char BadRnpCarrier::theHeader[]="BadCarrier";
+
+
diff --git a/rnprotocol/rnpembedded.hh b/rnprotocol/rnpembedded.hh
new file mode 100644
index 0000000..2e7c454
--- /dev/null
+++ b/rnprotocol/rnpembedded.hh
@@ -0,0 +1,256 @@
+#ifndef RNPEMBEDDED_HH
+#define RNPEMBEDDED_HH
+/*
+* This file is part of rasdaman community.
+*
+* Rasdaman community is free software: you can redistribute it and/or modify
+* it under the terms of the GNU General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* Rasdaman community is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with rasdaman community. If not, see <http://www.gnu.org/licenses/>.
+*
+* Copyright 2003, 2004, 2005, 2006, 2007, 2008, 2009 Peter Baumann /
+rasdaman GmbH.
+*
+* For more information please see <http://www.rasdaman.org>
+* or contact Peter Baumann via <baumann@rasdaman.com>.
+*/
+/****************************************************************************
+ *
+ *
+ * COMMENTS:
+ *
+ *
+ ****************************************************************************/
+#include "rnprotocol/rnprotocol.hh"
+
+namespace rnp
+ {
+
+/** RNP messages may be embedded in messages of the carrier protocol
+ We will use for now only HTTP carrier or none at all
+ But we define a BadCarrier for testing purposes
+*/
+
+
+/** Class containing definitions and some helper functions
+*/
+class RnpTransport
+ {
+ public:
+ enum CarrierProtocol
+ {
+ crp_Unknown,
+ crp_Rnp,
+ crp_Http,
+ crp_BadCarrier,
+ //....
+ crp_HowMany
+ };
+
+ static const char* getCarrierName(CarrierProtocol) throw();
+ private:
+ static const char* carrierNames[];
+ };
+
+/**
+ RnpReceiver is a class that is able to receive an message in a CommBuffer and decide if it is a
+ valid Rnp message, embedded or not.
+
+ It is designed to be used by a NbJob for receiving the message and cooperate with it.
+
+ If it is an invalid message or the message buffer can't be allocated,
+ the rest of the message is discarded and the NbJob has to close the connection and do appropriate cleaning
+
+ The receiver has two buffers, a fixed length one, for header and a dynamic one for the RNP message
+*/
+
+class RnpReceiver
+ {
+ public:
+ /// Default constructor
+ RnpReceiver() throw();
+
+ /// Destructor
+ ~RnpReceiver() throw();
+
+ /// Resets the receiver, preparing for a new message
+ void reset() throw();
+
+ /// Returns a pointer to the current buffer (the header one or the message one)
+ akg::CommBuffer* getCurrentBuffer() throw();
+
+ /** Returns a pointer to the message buffer, which contains the RNP message,
+ whitout any carrier header */
+ akg::CommBuffer* getMessageBuffer() throw();
+
+ /// Returns 'true' if the whole message was received, 'false' if more data is expected
+ bool validateMessage() throw();
+
+ /** Returns 'true' if an error occured and the message has to be discarded
+ If validate()==false and isDiscarding()==true => NbJob has to reset receiver and close connection*/
+ bool isDiscarding() const throw();
+
+ /// Returns the type of the carrier protocol
+ RnpTransport::CarrierProtocol getCarrierProtocol() const throw();
+
+ /// Returns the size of the carrier header
+ int getCarrierHeaderSize() const throw();
+
+ /// Returns a pointer to the carrier header
+ const void* getCarrierHeader() throw();
+
+ private:
+
+ enum Status
+ {
+ waitingHeader,
+ readingMessage,
+ discarding
+ };
+
+
+ Status status;
+
+ akg::CommBuffer headerBuffer;
+ akg::CommBuffer rnpMessageBuffer;
+
+ RnpHeader *rnpHeader;
+
+ RnpTransport::CarrierProtocol carrier;
+ int carrierHeaderLength;
+
+ static const int headerBufferLength;
+
+ bool isHttpCarrier() throw();
+ bool isRnpCarrier() throw();
+ bool prepareMessageBuffer() throw();
+
+ };
+
+class RnpCarrier;
+
+/** Class for creating an embedded RNP message. Most methods are inherited
+ from RnpProtocolEncoder, it offers just convenient methods for
+ dealing with carriers
+*/
+class RnpTransmitter : public RnpProtocolEncoder
+ {
+ public:
+ /// Default constructor
+ RnpTransmitter() throw();
+
+ /// Destructor
+ ~RnpTransmitter() throw();
+
+ /// Starts a new message, as a request, embedded in a specified protocol
+ bool startRequest(RnpQuark serverType, RnpTransport::CarrierProtocol) throw();
+
+ /// Starts a new message, as an answer, embedded in a specified protocol
+ bool startAnswer(RnpQuark serverType, RnpTransport::CarrierProtocol) throw();
+
+ /// ends the message, puts the carrier headers and, if requested, changes endianness
+ akg::CommBuffer* endMessage() throw();
+
+ /// Returns the carrier protocol
+ RnpTransport::CarrierProtocol getCarrierProtocol() throw();
+
+ /// Returns the total size of the buffer
+ int getBufferSize() const throw();
+
+ /// Return the space left in the buffer
+ int getNotFilledSize() const throw();
+
+ /// Returns the data size in the buffer
+ int getDataSize() const throw();
+ private:
+
+ RnpTransport::CarrierProtocol carrierType;
+
+ /** Creates and returns a RnpCarrier object, based on the type got as parameter
+ It assignes the object to 'carrier' and it also destroys the previous
+ assigned object
+ */
+ RnpCarrier* getCarrierObject(RnpTransport::CarrierProtocol) throw();
+ RnpCarrier* carrier;
+ };
+
+/** Base class for the various carriers, is itself the RNP carrier
+*/
+class RnpCarrier
+ {
+ public:
+ /// Default constructor
+ RnpCarrier() throw();
+
+ /// Virtual destructor
+ virtual ~RnpCarrier() throw();
+
+ /// Returns the type of the object
+ RnpTransport::CarrierProtocol getType() throw();
+
+ /// Returns the length of the request header
+ virtual int getRequestHeaderLength() throw();
+
+ /// Returns the length of the answer header
+ virtual int getAnswerHeaderLength() throw();
+
+ /** Write the header directly into the reserved space of the buffer,
+ since the rest of the message is already there*/
+ virtual void putHeader(akg::CommBuffer*) throw();
+
+ protected:
+ /// The type of the carrier
+ RnpTransport::CarrierProtocol type;
+
+ /// Flag for 'putHeader' to know which header to write
+ bool requestHeader;
+
+ };
+
+/** The HTTP-carrier
+*/
+class HttpRnpCarrier : public RnpCarrier
+ {
+ public:
+ /// Default constructor
+ HttpRnpCarrier() throw();
+
+ /// Returns the length of the request header
+ int getRequestHeaderLength() throw();
+
+ /// Returns the length of the answer header
+ int getAnswerHeaderLength() throw();
+
+ /// Writes the header into the buffer
+ void putHeader(akg::CommBuffer*) throw();
+
+ private:
+ static const char theRequestHeader[];
+ static const char theAnswerHeader[];
+ };
+
+/** A 'bad carrier', just for testing purposes
+*/
+class BadRnpCarrier : public RnpCarrier
+ {
+ public:
+ BadRnpCarrier() throw();
+
+ int getRequestHeaderLength() throw();
+ int getAnswerHeaderLength() throw();
+ void putHeader(akg::CommBuffer*) throw();
+
+ private:
+ static const char theHeader[];
+ };
+
+} //namespace
+#endif
diff --git a/rnprotocol/rnprasserver.cc b/rnprotocol/rnprasserver.cc
new file mode 100644
index 0000000..f877403
--- /dev/null
+++ b/rnprotocol/rnprasserver.cc
@@ -0,0 +1,72 @@
+/*
+* This file is part of rasdaman community.
+*
+* Rasdaman community is free software: you can redistribute it and/or modify
+* it under the terms of the GNU General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* Rasdaman community is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with rasdaman community. If not, see <http://www.gnu.org/licenses/>.
+*
+* Copyright 2003, 2004, 2005, 2006, 2007, 2008, 2009 Peter Baumann /
+rasdaman GmbH.
+*
+* For more information please see <http://www.rasdaman.org>
+* or contact Peter Baumann via <baumann@rasdaman.com>.
+*/
+#include "debug.hh"
+
+#include "rnprasserver.hh"
+
+// forever and ever
+const RnpQuark RnpRasserver::serverID = 3072002;
+
+
+const char* RnpRasserver::parameterTypeNames[pmt_HowMany] =
+ {
+ "(pmt_none)", "pmt_clientid"
+ };
+
+const char* RnpRasserver::commandNames[cmd_HowMany] =
+ {
+ "(cmd_none)", "cmd_connect", "cmd_disconnect",
+ "cmd_opendb", "cmd_closedb", "cmd_beginta",
+ "cmd_committa", "cmd_abortta", "cmd_istaopen",
+ "cmd_queryhttp", "cmd_getnewoid", "cmd_queryrpc",
+ "cmd_getnextelem", "cmd_endtransfer", "cmd_getnextmdd",
+ "cmd_getnexttile", "cmd_updaterpc", "cmd_startinsTmdd",
+ "cmd_inserttile", "cmd_endinsmdd", "cmd_initupdate",
+ "cmd_gettypestruct", "cmd_startinsPmdd", "cmd_insertmdd",
+ "cmd_insertcoll", "cmd_removeobjfromcoll", "cmd_delobjbyoid",
+ "cmd_delcollbyname", "cmd_getcoll", "cmd_getcolloids",
+ "cmd_getobjecttype", "cmd_setformat",
+
+ "cmd_createcollection", "cmd_createmdd", "cmd_extendmdd",
+ "cmd_gettiledomains"
+ };
+
+const char* RnpRasserver::getParameterTypeName(RnpQuark pType) const throw()
+ {
+ if(0<= pType && pType < pmt_HowMany) return parameterTypeNames[pType];
+ return undefValue;
+ }
+
+const char* RnpRasserver::getCommandName(RnpQuark cmd) const throw()
+ {
+ const char *result = NULL;
+
+ if(0<= cmd && cmd < cmd_HowMany)
+ result = commandNames[cmd];
+ else
+ result = undefValue;
+
+ TALK( "RnpRasserver::getCommandName( " << cmd << " ) -> " << result );
+ return result;
+ }
+
diff --git a/rnprotocol/rnprasserver.hh b/rnprotocol/rnprasserver.hh
new file mode 100644
index 0000000..44b6473
--- /dev/null
+++ b/rnprotocol/rnprasserver.hh
@@ -0,0 +1,124 @@
+/*
+* This file is part of rasdaman community.
+*
+* Rasdaman community is free software: you can redistribute it and/or modify
+* it under the terms of the GNU General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* Rasdaman community is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with rasdaman community. If not, see <http://www.gnu.org/licenses/>.
+*
+* Copyright 2003, 2004, 2005, 2006, 2007, 2008, 2009 Peter Baumann /
+rasdaman GmbH.
+*
+* For more information please see <http://www.rasdaman.org>
+* or contact Peter Baumann via <baumann@rasdaman.com>.
+*/
+#ifndef RNPRASSERVER_HH
+#define RNPRASSERVER_HH
+
+#include "rnprotocol/rnprotocol.hh"
+
+using namespace rnp;
+
+class RnpRasserver : public Rnp
+ {
+ public:
+
+ static const RnpQuark serverID; // 3072002
+ enum ParameterType
+ {
+ pmt_none,
+ pmt_clientid = 1,
+ pmt_rErrorString = 2,
+ pmt_dbname = 3,
+ pmt_accesmode = 4,
+ pmt_querystring = 5,
+ pmt_httpqanswer = 6,
+ pmt_oidstring = 7,
+ pmt_capability = 8,
+ pmt_transstatus = 9,
+ pmt_objecttype =10,
+ pmt_returnstatus =11,
+ pmt_skalarobject =12,
+ pmt_tiledata =13,
+ pmt_domain =14,
+ pmt_typename =15,
+ pmt_typelength =16,
+ pmt_typetype =17,
+ pmt_typestructure = 18,
+ pmt_collname = 19,
+ pmt_whichformat = 20,
+ pmt_format = 21,
+ pmt_formatparams = 22,
+ pmt_currentformat = 23,
+ pmt_storageformat = 24,
+ pmt_ispersistent = 25,
+ pmt_errorno = 26,
+ pmt_lineno = 27,
+ pmt_columnno = 28,
+ pmt_errortoken = 29,
+ pmt_indextype = 30,
+ //.......
+ pmt_HowMany
+ };
+
+ enum Command
+ {
+ cmd_none,
+ cmd_connect = 1,
+ cmd_disconnect = 2,
+ cmd_opendb = 3,
+ cmd_closedb = 4,
+ cmd_beginta = 5,
+ cmd_committa = 6,
+ cmd_abortta = 7,
+ cmd_istaopen = 8,
+ cmd_queryhttp = 9,
+ cmd_getnewoid = 10,
+ cmd_queryrpc = 11,
+ cmd_getnextelem = 12,
+ cmd_endtransfer = 13,
+ cmd_getnextmdd = 14,
+ cmd_getnexttile = 15,
+ cmd_updaterpc = 16,
+ cmd_startinsTmdd = 17,
+ cmd_inserttile = 18,
+ cmd_endinsmdd = 19,
+ cmd_initupdate = 20,
+ cmd_gettypestruct = 21,
+ cmd_startinsPmdd = 22,
+ cmd_insertmdd = 23,
+ cmd_insertcoll = 24,
+ cmd_remobjfromcoll = 25,
+ cmd_delobjbyoid = 26,
+ cmd_delcollbyname = 27,
+ cmd_getcoll = 28,
+ cmd_getcolloids = 29,
+ cmd_getobjecttype = 30,
+ cmd_setformat = 31,
+ //-- until here the compatibility functions --
+
+ cmd_createcollection = 32,
+ cmd_createmdd = 33,
+ cmd_extendmdd = 34,
+ cmd_gettiledomains = 35,
+ cmd_HowMany
+ };
+
+ const char* getParameterTypeName(RnpQuark) const throw();
+ const char* getCommandName(RnpQuark) const throw();
+
+ protected:
+ /// Arrays containing the names of the various elements
+ static const char* parameterTypeNames[pmt_HowMany];
+ static const char* commandNames[cmd_HowMany];
+
+ };
+#endif
diff --git a/rnprotocol/rnprotocol.cc b/rnprotocol/rnprotocol.cc
new file mode 100644
index 0000000..ce0deec
--- /dev/null
+++ b/rnprotocol/rnprotocol.cc
@@ -0,0 +1,819 @@
+/*
+* This file is part of rasdaman community.
+*
+* Rasdaman community is free software: you can redistribute it and/or modify
+* it under the terms of the GNU General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* Rasdaman community is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with rasdaman community. If not, see <http://www.gnu.org/licenses/>.
+*
+* Copyright 2003, 2004, 2005, 2006, 2007, 2008, 2009 Peter Baumann /
+rasdaman GmbH.
+*
+* For more information please see <http://www.rasdaman.org>
+* or contact Peter Baumann via <baumann@rasdaman.com>.
+*/
+/****************************************************************************
+ *
+ *
+ * COMMENTS:
+ *
+ ****************************************************************************/
+
+#include <rnprotocol.hh>
+#include <assert.h>
+
+using namespace rnp;
+using namespace std;
+
+#include "raslib/rminit.hh"
+#include "debug.hh"
+
+
+const RnpQuark Rnp::rnpProtocolId = 25112001;
+
+
+const char* Rnp::endiannessNames[2]=
+ {
+ "big endian", "little endian"
+ };
+
+const char* Rnp::fragmentTypeNames[Rnp::fgt_HowMany]=
+ {
+ "(fgt_none)", "Command", "OkAnswer", "ErrorAnswer"
+ };
+
+const char* Rnp::dataTypeNames[Rnp::dtt_HowMany] =
+ {
+ "(dtt_none)", "Asciiz", "Int32","Float","Double","Opaque","NullPtr"
+ };
+
+const char* Rnp::errorTypeNames[Rnp::ert_HowMany] =
+ {
+ "(ert_unknown)", "StlException", "OtherException"
+ };
+
+const char* Rnp::errorParamNames[Rnp::erp_HowMany] =
+ {
+ "(erp_none)", "StlWhatValue"
+ };
+
+const char* Rnp::undefValue = "(undef)";
+
+const char* Rnp::getFragmentTypeName(RnpQuark fType) throw()
+ {
+ if(0<= fType && fType < fgt_HowMany) return fragmentTypeNames[fType];
+ return undefValue;
+ }
+
+const char* Rnp::getDataTypeName(RnpQuark dType) throw()
+ {
+ if(0<= dType && dType < dtt_HowMany) return dataTypeNames[dType];
+ return undefValue;
+ }
+
+const char* Rnp::getErrorTypeName(RnpQuark eType) throw()
+ {
+ if(0<= eType && eType < ert_HowMany) return errorTypeNames[eType];
+ return undefValue;
+ }
+
+const char* Rnp::getErrorParamName(RnpQuark eName) throw()
+ {
+ if(0<= eName && eName < erp_HowMany) return errorParamNames[eName];
+ return undefValue;
+ }
+
+const char* Rnp::getEndiannessName(Rnp::Endianness endianness) throw()
+ {
+ return endiannessNames[endianness];
+ }
+
+Rnp::Endianness Rnp::detectHostEndianness() throw()
+ {
+ unsigned int uInteger = 1;
+
+ char* ptr = (char*)&uInteger;
+
+ return *ptr==1 ? littleEndian : bigEndian;
+ }
+
+RnpQuark Rnp::swapBytes(RnpQuark orig) throw()
+ {
+ RnpQuark result = orig;
+
+ char *buf = (char*)&result;
+
+ char tmp = buf[0];
+ buf[0] = buf[3];
+ buf[3] = tmp;
+
+ tmp = buf[1];
+ buf[1] = buf[2];
+ buf[2] = tmp;
+
+ return result;
+ }
+
+//############## RNP Header #################################
+
+bool RnpHeader::isRnpMessage() const throw()
+ {
+ Rnp::Endianness hostEndianness = Rnp::detectHostEndianness();
+
+ RnpQuark cProtocolId = hostEndianness == Rnp::littleEndian ? protocolId : Rnp::swapBytes(protocolId);
+
+ return cProtocolId == Rnp::rnpProtocolId ? true:false;
+ }
+
+Rnp::Endianness RnpHeader::getEndianness() const throw()
+ {
+ return (Rnp::Endianness)messageEndianness;
+ }
+
+RnpQuark RnpHeader::getTotalLength() const throw()
+ {
+ Rnp::Endianness endianness = (Rnp::Endianness)messageEndianness;
+ return endianness == Rnp::detectHostEndianness() ? totalMessageLength : Rnp::swapBytes(totalMessageLength);
+ }
+
+bool RnpHeader::changeEndianness(Rnp::Endianness newEndianness) throw()
+ {
+ Rnp::Endianness endianness = (Rnp::Endianness)messageEndianness;
+
+ if(newEndianness == endianness) return false; // no change necessary
+
+ messageEndianness = newEndianness;
+
+ totalMessageLength = Rnp::swapBytes(totalMessageLength);
+ nrFragments = Rnp::swapBytes(nrFragments);
+ serverType = Rnp::swapBytes(serverType);
+ authInfoStart = Rnp::swapBytes(authInfoStart);
+ authInfoLength = Rnp::swapBytes(authInfoLength);
+ comprInfoStart = Rnp::swapBytes(comprInfoStart);
+ comprInfoLength = Rnp::swapBytes(comprInfoLength);
+ dataStart = Rnp::swapBytes(dataStart);
+ dataLength = Rnp::swapBytes(dataLength);
+
+ return true;
+ }
+
+RnpFragmentHeader* RnpHeader::getFirstFragment() const throw()
+ {
+ return (RnpFragmentHeader*)((char*)this + dataStart);
+ }
+
+//############ RNP Fragment ##################################
+
+RnpFragmentHeader* RnpFragmentHeader::getNextFragment() const throw()
+ {
+ char *ptr = (char*)this;
+
+ return (RnpFragmentHeader*)(ptr + totalLength);
+ }
+
+RnpParameter* RnpFragmentHeader::getFirstParameter() const throw()
+ {
+ return (RnpParameter*)(this+1);
+ }
+
+void RnpFragmentHeader::changeEndianness() throw()
+ {
+ // there is no info about the initial endianness so be carefull
+ fragmType = Rnp::swapBytes(fragmType);
+ command = Rnp::swapBytes(command);
+ nrParams = Rnp::swapBytes(nrParams);
+ totalLength = Rnp::swapBytes(totalLength);
+ }
+
+//############ RNP Parameter ##################################
+RnpParameter* RnpParameter::getNextParameter() const throw()
+ {
+ char *ptr = (char*)this;
+ return (RnpParameter*)(ptr + totalLength);
+ }
+
+void* RnpParameter::getData() const throw()
+ {
+ return (void*)(this+1);
+ }
+
+RnpQuark RnpParameter::getDataLength() const throw()
+ {
+ return dataLength;
+ }
+
+RnpQuark RnpParameter::computeTotalAlignedLength() throw()
+ {
+ totalLength = (sizeof(RnpParameter) + dataLength + 3) & 0xFFFFFFFC;
+ return totalLength;
+ }
+
+RnpQuark RnpParameter::getPaddLength() const throw()
+ {
+ return totalLength - (sizeof(RnpParameter) + dataLength);
+ }
+
+void RnpParameter::changeToHostEndianness() throw()
+ {
+ // there is no info about the initial endianness so be carefull
+ paramType = Rnp::swapBytes(paramType);
+ dataType = Rnp::swapBytes(dataType);
+ dataLength = Rnp::swapBytes(dataLength);
+ totalLength = Rnp::swapBytes(totalLength);
+
+ RnpQuark *valPtr = (RnpQuark*)getData();
+ switch(dataType)
+ {
+ case Rnp::dtt_Int32:
+ case Rnp::dtt_Float32: *valPtr = Rnp::swapBytes(*valPtr);
+ break;
+
+ case Rnp::dtt_Double64: RnpQuark temp = Rnp::swapBytes(*valPtr);
+ *valPtr = Rnp::swapBytes(*(valPtr +1));
+ *(valPtr+1) = temp;
+ break;
+ }
+ }
+
+void RnpParameter::changeToPartnerEndianness() throw()
+ {
+ // there is no info about the initial endianness so be careful
+
+ RnpQuark *valPtr = (RnpQuark*)getData();
+ switch(dataType)
+ {
+ case Rnp::dtt_Int32:
+ case Rnp::dtt_Float32: *valPtr = Rnp::swapBytes(*valPtr);
+ break;
+
+ case Rnp::dtt_Double64: RnpQuark temp = Rnp::swapBytes(*valPtr);
+ *valPtr = Rnp::swapBytes(*(valPtr +1));
+ *(valPtr+1) = temp;
+ break;
+ }
+ paramType = Rnp::swapBytes(paramType);
+ dataType = Rnp::swapBytes(dataType);
+ dataLength = Rnp::swapBytes(dataLength);
+ totalLength = Rnp::swapBytes(totalLength);
+ }
+
+//###########################################################
+
+RnpProtocolEncoder::RnpProtocolEncoder() throw()
+ {
+ commBuffer = NULL;
+
+ rnpHeader = NULL;
+ currFragment = NULL;
+ currParameter = NULL;
+ allocated = false;
+ carrierHeaderSize = 0;
+ finalEndianness = Rnp::detectHostEndianness();
+ }
+
+RnpProtocolEncoder::~RnpProtocolEncoder() throw()
+ {
+ if(commBuffer != NULL && allocated == true) delete commBuffer;
+ // the other pointers are not pointing to allocated memory!!
+ }
+
+void RnpProtocolEncoder::setBuffer(akg::CommBuffer* buffer) throw()
+ {
+ if(commBuffer != NULL && allocated == true) delete commBuffer;
+
+ commBuffer = buffer;
+ allocated = false;
+ }
+
+bool RnpProtocolEncoder::allocateBuffer(int maxMessageLength) throw()
+ {
+ if(commBuffer != NULL && allocated ==true) delete commBuffer;
+
+ commBuffer = new akg::CommBuffer(maxMessageLength);
+ allocated = true;
+ return true;
+ }
+
+bool RnpProtocolEncoder::adjustBufferSize(int differenceSize) throw()
+ {
+ ENTER( "RnpProtocolEncoder::adjustBufferSize( differenceSize=" << differenceSize << " )" );
+
+ if (commBuffer == 0)
+ {
+ TALK( "RnpProtocolEncoder::adjustBufferSize(): warning: null commBuffer, assert would fire." );
+ return false;
+ }
+ assert(commBuffer != 0);
+
+ if (differenceSize <= 0)
+ {
+ TALK( "RnpProtocolEncoder::adjustBufferSize(): warning: nonpositive differenceSize, assert would fire." );
+ return false;
+ }
+ assert(differenceSize > 0);
+
+ // we need to adjust the pointers to the new location
+ char *orig = (char*)(commBuffer->getData());
+ char *head = (char*)rnpHeader;
+ char *frag = (char*)currFragment;
+ char *para = (char*)currParameter;
+
+ if(commBuffer->resize(commBuffer->getDataSize() + differenceSize + RNP_DEFAULTBUFFERSIZE) == true)
+ {
+ char *final = (char*)(commBuffer->getData());
+ rnpHeader = (RnpHeader*)(final + (head - orig));
+ currFragment = (RnpFragmentHeader*)(final + (frag - orig));
+ currParameter= (RnpParameter*)(final + (para - orig));
+ LEAVE( "RnpProtocolEncoder::adjustBufferSize() -> true" );
+ return true;
+ }
+
+ LEAVE( "RnpProtocolEncoder::adjustBufferSize() -> false" );
+ return false;
+ }
+
+int RnpProtocolEncoder::getBufferSize() throw()
+ {
+ if (commBuffer == 0)
+ {
+ TALK( "RnpProtocolEncoder::getBufferSize(): warning: null commBuffer, assert will fire." );
+ }
+ assert(commBuffer != 0);
+
+ return commBuffer->getBufferSize();
+ }
+
+void RnpProtocolEncoder::startMessage(RnpQuark serverType, int nCarrierHeaderSize) throw()
+ {
+ if (commBuffer == 0)
+ {
+ TALK( "RnpProtocolEncoder::startMessage(): warning: null commBuffer, assert will fire." );
+ }
+ assert(commBuffer != NULL);
+
+ carrierHeaderSize = nCarrierHeaderSize;
+
+ commBuffer->clearToRead();
+
+ commBuffer->reserve(sizeof(RnpHeader) + carrierHeaderSize);
+
+ rnpHeader = (RnpHeader*)((char*)commBuffer->getData() + carrierHeaderSize);
+
+ Rnp::Endianness hostEndianness = Rnp::detectHostEndianness();
+
+ // the protocolID is always 25112001 little endian!!
+ rnpHeader->protocolId = hostEndianness == Rnp::littleEndian ? Rnp::rnpProtocolId : Rnp::swapBytes(Rnp::rnpProtocolId);
+
+ rnpHeader->messageEndianness = hostEndianness;
+ rnpHeader->desiredEndianness = hostEndianness;
+ rnpHeader->majorVersion = 1;
+ rnpHeader->minorVersion = 0;
+ rnpHeader->totalMessageLength = sizeof(RnpHeader);
+ rnpHeader->nrFragments = 0;
+ rnpHeader->serverType = serverType;
+ rnpHeader->authInfoStart = 0;
+ rnpHeader->authInfoLength = 0;
+ rnpHeader->comprInfoStart = 0;
+ rnpHeader->comprInfoLength = 0;
+ rnpHeader->dataStart = sizeof(RnpHeader);
+ rnpHeader->dataLength = 0;
+
+ currFragment = (RnpFragmentHeader*)(rnpHeader+1);
+
+ }
+
+int RnpProtocolEncoder::getCarrierHeaderSize() throw()
+ {
+ return carrierHeaderSize;
+ }
+
+void RnpProtocolEncoder::setDesiredEndianness(Rnp::Endianness desiredEndianness) throw()
+ {
+ rnpHeader->desiredEndianness = desiredEndianness;
+ }
+void RnpProtocolEncoder::setFinalEndianness(Rnp::Endianness endianness) throw()
+ {
+ finalEndianness = endianness;
+ }
+
+void RnpProtocolEncoder::startFragment(Rnp::FragmentType fType,RnpQuark command) throw()
+ {
+ commBuffer->reserve(sizeof(RnpFragmentHeader));
+
+ currFragment->fragmType = fType;
+ currFragment->command = command;
+ currFragment->nrParams = 0;
+ currFragment->totalLength = sizeof(RnpFragmentHeader);
+
+ currParameter = currFragment->getFirstParameter();
+ rnpHeader->nrFragments++;
+
+ }
+
+void RnpProtocolEncoder::addStringParameter(RnpQuark parameterType,const char* str) throw()
+ {
+ if(str != 0) addParameter(parameterType, Rnp::dtt_Asciiz, str, strlen(str)+1);
+ else addParameter(parameterType, Rnp::dtt_NullPtr, str, 0);
+ }
+
+void RnpProtocolEncoder::addInt32Parameter(RnpQuark parameterType, int par) throw()
+ {
+ addParameter(parameterType, Rnp::dtt_Int32, &par, sizeof(par));
+ }
+
+void RnpProtocolEncoder::addFloat32Parameter(RnpQuark parameterType, float par) throw()
+ {
+ addParameter(parameterType, Rnp::dtt_Float32, &par, sizeof(par));
+ }
+void RnpProtocolEncoder::addDouble64Parameter(RnpQuark parameterType, double par) throw()
+ {
+ addParameter(parameterType, Rnp::dtt_Double64, &par, sizeof(par));
+ }
+
+void RnpProtocolEncoder::addOpaqueParameter(RnpQuark parameterType, const void *buf, int size) throw()
+ {
+ if(buf != 0) addParameter(parameterType, Rnp::dtt_Opaque, buf, size);
+ else addParameter(parameterType, Rnp::dtt_NullPtr, buf, 0);
+ }
+
+void RnpProtocolEncoder::addParameter(RnpQuark parameterType, Rnp::DataType dtt, const void *data, int length) throw()
+ {
+ commBuffer->reserve(sizeof(RnpParameter));
+
+ currParameter->paramType = parameterType;
+ currParameter->dataType = dtt;
+ currParameter->dataLength = length;
+ currParameter->totalLength = currParameter->computeTotalAlignedLength();
+
+ int paddlen = currParameter->getPaddLength();
+
+ if(data != 0 ) commBuffer->read(data,length);
+ if(paddlen) commBuffer->reserve(paddlen);
+
+ currFragment->nrParams++;
+ currFragment->totalLength += currParameter->totalLength;
+ currParameter = currParameter->getNextParameter();
+ }
+
+void RnpProtocolEncoder::endFragment() throw()
+ {
+ rnpHeader->totalMessageLength += currFragment->totalLength;
+ rnpHeader->dataLength += currFragment->totalLength;
+ currFragment = currFragment->getNextFragment();
+ }
+
+akg::CommBuffer* RnpProtocolEncoder::endMessage() throw()
+ {
+ changeToPartnerEndianness(finalEndianness);
+ return commBuffer;
+ }
+
+bool RnpProtocolEncoder::changeToPartnerEndianness(Rnp::Endianness newEndianness) throw()
+ {
+ if(newEndianness == rnpHeader->getEndianness()) return false;
+ // so newEndianness is the same as the message endiannes, no change necessary
+
+ // don't forget that the endianness is now the host endianness so,
+ // after changed, the data can't be used correctly any more!!
+
+ RnpFragmentHeader* lCurrFragment = rnpHeader->getFirstFragment();
+
+ for(int fragment=0; fragment < rnpHeader->nrFragments; fragment++)
+ {
+
+ RnpParameter* lCurrParameter = lCurrFragment->getFirstParameter();
+ for(int parameter = 0; parameter < lCurrFragment->nrParams; parameter++)
+ {
+ RnpParameter* nextParameter = lCurrParameter->getNextParameter();
+ lCurrParameter->changeToPartnerEndianness();
+ lCurrParameter = nextParameter;
+ }
+
+ RnpFragmentHeader* nextFragment = lCurrFragment->getNextFragment();
+ lCurrFragment->changeEndianness();
+ lCurrFragment = nextFragment;
+ }
+
+ rnpHeader->changeEndianness(newEndianness);
+
+ return true;
+ }
+
+//###### DECODER #######################
+
+RnpProtocolDecoder::RnpProtocolDecoder() throw()
+ {
+ // this memory doesn't belong to us, so do not deallocate!!
+ commBuffer = NULL;
+ rnpHeader = NULL;
+ currFragment = NULL;
+ currParameter = NULL;
+ }
+
+bool RnpProtocolDecoder::decode(akg::CommBuffer *buffer) throw()
+ {
+ // Later, throw something intelligible!
+ commBuffer = buffer;
+
+ rnpHeader = (RnpHeader*)commBuffer->getData();
+
+ if(rnpHeader->isRnpMessage() == false) return false;
+
+ originalEndianness = rnpHeader->getEndianness();
+
+ if(originalEndianness == Rnp::detectHostEndianness())
+ {
+ // -- test validity of message --
+ if(testIntegrity() == false) return false;
+ }
+ else
+ { // -- endianess of message --
+ changeToHostEndianness();
+ }
+ return true;
+ }
+
+bool RnpProtocolDecoder::testIntegrity() const throw()
+ {
+ // could be done better...
+ ENTER( "RnpProtocolDecoder::testIntegrity()" );
+
+ if(rnpHeader->isRnpMessage() == false)
+ {
+ RMInit::logOut << "Communication error: received invalid RNP header." << endl;
+ return false;
+ }
+
+ bool ok = true;
+ char *endOfMessage = (char*)rnpHeader + rnpHeader->getTotalLength();
+ char *endOfHeader = (char*)rnpHeader + sizeof(RnpHeader);
+ int maxLength = commBuffer->getDataSize() - sizeof(RnpHeader);
+ // max of every length
+
+ RnpFragmentHeader* lCurrFragment = (RnpFragmentHeader*)getFirstFragment();
+ for(int fragment=0; fragment < countFragments(); fragment++)
+ {
+ if( endOfHeader <= (char*)lCurrFragment && (char*)lCurrFragment < endOfMessage)
+ ;
+ else
+ {
+ ok = false;
+ RMInit::logOut << "Communication error: RNP message corrupt: short header." << endl;
+ break;
+ }
+
+ if(lCurrFragment->totalLength > maxLength)
+ {
+ ok = false;
+ RMInit::logOut << "Communication error: RNP message corrupt: actual length (" << lCurrFragment->totalLength << ") larger than foreseen (" << maxLength << ")." << endl;
+ break;
+ }
+
+ char *startOfParameters = (char*)lCurrFragment + sizeof(RnpFragmentHeader);
+ char *endOfParameters = (char*)lCurrFragment + lCurrFragment->totalLength;
+
+ RnpParameter* lCurrParameter = (RnpParameter*)getFirstParameter();
+ for(int parameter = 0; parameter < countParameters(); parameter++)
+ {
+ if( startOfParameters <= (char*)lCurrParameter && (char*)lCurrParameter < endOfParameters)
+ ;
+ else
+ {
+ ok = false;
+ RMInit::logOut << "Communication error: RNP message corrupt: current parameter location outside parameter area." << endl;
+ break;
+ }
+
+ if(lCurrParameter->totalLength > lCurrFragment->totalLength)
+ {
+ ok = false;
+ RMInit::logOut << "Communication error: RNP message corrupt: current parameter length (" << lCurrParameter->totalLength << ") larger than total fragment size (" << lCurrFragment->totalLength << ")." << endl;
+ break;
+ }
+
+ lCurrParameter = (RnpParameter*)getNextParameter();
+ }
+
+ if( (char*)lCurrParameter != endOfParameters)
+ {
+ ok = false;
+// the counting seems to differ from the protocol specs;
+// to avoid log flooding we disable it for the moment being -- PB 2005-aug-28
+#ifdef RMANDEBUG
+ RMInit::logOut << "Communication warning: puzzled by message: parameter count too small, found extra parameter(s). (this message can be ignored)" << endl;
+#endif
+ }
+
+ if(ok == false)
+ break;
+
+ // we found a valid fragment, proceed to next one
+ lCurrFragment = (RnpFragmentHeader*)getNextFragment();
+ }
+
+ LEAVE( "RnpProtocolDecoder::testIntegrity() -> " << ok );
+ return ok;
+ }
+
+bool RnpProtocolDecoder::changeToHostEndianness() throw()
+ {
+ if(rnpHeader->changeEndianness(Rnp::detectHostEndianness()) == false ) return false;
+ // so host endianness is the same as the message endiannes, no change necessary
+
+ currFragment = (RnpFragmentHeader*)getFirstFragment();
+
+ for(int fragment=0; fragment < countFragments(); fragment++)
+ {
+ currFragment->changeEndianness();
+
+ currParameter = (RnpParameter*)getFirstParameter();
+ for(int parameter = 0; parameter < countParameters(); parameter++)
+ {
+ currParameter->changeToHostEndianness();
+
+ currParameter = (RnpParameter*)getNextParameter();
+ }
+
+ currFragment = (RnpFragmentHeader*)getNextFragment();
+ }
+ return true;
+ }
+
+RnpQuark RnpProtocolDecoder::getDestinationServerType() const throw()
+ {
+ return rnpHeader->serverType;
+ }
+
+Rnp::Endianness RnpProtocolDecoder::getDesiredEndianness() const throw()
+ {
+ return (Rnp::Endianness)rnpHeader->desiredEndianness;
+ }
+
+Rnp::Endianness RnpProtocolDecoder::getOriginalEndianness() const throw()
+ {
+ return originalEndianness;
+ }
+
+int RnpProtocolDecoder::getMessageLength() const throw()
+ {
+ return rnpHeader->totalMessageLength;
+ }
+
+int RnpProtocolDecoder::getMessageVersion() const throw()
+ {
+ return 1000*rnpHeader->majorVersion + rnpHeader->minorVersion;
+ }
+
+RnpQuark RnpProtocolDecoder::countFragments() const throw()
+ {
+ return rnpHeader->nrFragments;
+ }
+
+const RnpFragmentHeader* RnpProtocolDecoder::getFirstFragment() const throw()
+ {
+ currFragmentIdx = 0;
+
+ return currFragment = (currFragmentIdx < rnpHeader->nrFragments ? rnpHeader->getFirstFragment() : 0);
+ }
+
+const RnpFragmentHeader* RnpProtocolDecoder::getNextFragment() const throw()
+ {
+ currFragmentIdx++;
+
+ return currFragment = (currFragmentIdx < rnpHeader->nrFragments ? currFragment->getNextFragment() : 0);
+ }
+
+RnpQuark RnpProtocolDecoder::getFragmentType() const throw()
+ {
+ return currFragment->fragmType;
+ }
+
+const char* RnpProtocolDecoder::getFragmentTypeName() const throw()
+ {
+ return Rnp::getFragmentTypeName(currFragment->fragmType);
+ }
+RnpQuark RnpProtocolDecoder::getCommand() const throw()
+ {
+ return currFragment->command;
+ }
+
+int RnpProtocolDecoder::countParameters() const throw()
+ {
+ return currFragment->nrParams;
+ }
+
+RnpQuark RnpProtocolDecoder::getFragmentLength() const throw()
+ {
+ return currFragment->totalLength;
+ }
+
+const RnpParameter* RnpProtocolDecoder::getFirstParameter() const throw()
+ {
+ currParameterIdx = 0;
+
+ return currParameter = ( currParameterIdx < currFragment->nrParams ? currFragment->getFirstParameter() : 0);
+ }
+
+const RnpParameter* RnpProtocolDecoder::getNextParameter() const throw()
+ {
+ currParameterIdx++;
+
+ return currParameter = ( currParameterIdx < currFragment->nrParams ? currParameter->getNextParameter() : 0);
+ }
+
+RnpQuark RnpProtocolDecoder::getParameterType() const throw()
+ {
+ return currParameter->paramType;
+ }
+
+RnpQuark RnpProtocolDecoder::getDataType() const throw()
+ {
+ return currParameter->dataType;
+ }
+
+const void* RnpProtocolDecoder::getData() const throw()
+ {
+ return currParameter->getData();
+ }
+
+const char* RnpProtocolDecoder::getDataAsString() const throw()
+ {
+ if ( !( currParameter->dataType == Rnp::dtt_Asciiz || currParameter->dataType == Rnp::dtt_NullPtr) )
+ {
+ TALK( "RnpProtocolEncoder::getDataAsString(): warning: assert will fire." );
+ }
+ assert(currParameter->dataType == Rnp::dtt_Asciiz || currParameter->dataType == Rnp::dtt_NullPtr);
+
+ return currParameter->dataType == Rnp::dtt_Asciiz ? (const char*)currParameter->getData() : (const char*)0;
+ }
+
+int RnpProtocolDecoder::getDataAsInteger() const throw()
+ {
+ if ( !( currParameter->dataType == Rnp::dtt_Int32 ) )
+ {
+ TALK( "RnpProtocolEncoder::getDataAsInteger(): warning: assert will fire." );
+ }
+ assert(currParameter->dataType == Rnp::dtt_Int32);
+
+ return *(int*)currParameter->getData();
+ }
+
+float RnpProtocolDecoder::getDataAsFloat() const throw()
+ {
+ if ( !( currParameter->dataType == Rnp::dtt_Float32 ) )
+ {
+ TALK( "RnpProtocolEncoder::getDataAsFloat(): warning: assert will fire." );
+ }
+ assert(currParameter->dataType == Rnp::dtt_Float32);
+
+ return *(float*)currParameter->getData();
+ }
+
+double RnpProtocolDecoder::getDataAsDouble() const throw()
+ {
+ if ( !( currParameter->dataType == Rnp::dtt_Double64 ) )
+ {
+ TALK( "RnpProtocolEncoder::getDataAsDouble(): warning: assert will fire." );
+ }
+ assert(currParameter->dataType == Rnp::dtt_Double64);
+
+ return *(double*)currParameter->getData();
+ }
+
+const void* RnpProtocolDecoder::getDataAsOpaque() const throw()
+ {
+ if ( !( currParameter->dataType == Rnp::dtt_Opaque || currParameter->dataType == Rnp::dtt_NullPtr) )
+ {
+ TALK( "RnpProtocolEncoder::getDataAsOpaque(): warning: assert will fire." );
+ }
+ assert(currParameter->dataType == Rnp::dtt_Opaque || currParameter->dataType == Rnp::dtt_NullPtr);
+
+ return currParameter->dataType == Rnp::dtt_Opaque ? (const void*)currParameter->getData() : (const void*)0;
+ }
+
+int RnpProtocolDecoder::getDataLength() const throw()
+ {
+ return currParameter->getDataLength();
+ }
+
+void RnpProtocolDecoder::printRnpHeader(RnpHeader *lRnpHeader) const throw()
+ {
+ cout<<"RnpHeader ID="<<lRnpHeader->protocolId<<endl;
+ cout<<" total fragments="<<lRnpHeader->nrFragments<<endl;
+ cout<<" sizeof header="<<sizeof(RnpHeader)<<endl;
+ cout<<endl;
+ }
+
+bool RnpProtocolDecoder::isRnpMessage() const throw()
+ {
+ return rnpHeader->isRnpMessage();
+ }
+
diff --git a/rnprotocol/rnprotocol.hh b/rnprotocol/rnprotocol.hh
new file mode 100644
index 0000000..dfa7885
--- /dev/null
+++ b/rnprotocol/rnprotocol.hh
@@ -0,0 +1,462 @@
+#ifndef RNPROTOCOL_HH
+#define RNPROTOCOL_HH
+/*
+* This file is part of rasdaman community.
+*
+* Rasdaman community is free software: you can redistribute it and/or modify
+* it under the terms of the GNU General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* Rasdaman community is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with rasdaman community. If not, see <http://www.gnu.org/licenses/>.
+*
+* Copyright 2003, 2004, 2005, 2006, 2007, 2008, 2009 Peter Baumann /
+rasdaman GmbH.
+*
+* For more information please see <http://www.rasdaman.org>
+* or contact Peter Baumann via <baumann@rasdaman.com>.
+*/
+/****************************************************************************
+ *
+ *
+ * COMMENTS:
+ *
+ *
+ ****************************************************************************/
+
+#include "network/akgnetwork.hh"
+#ifdef AFTERV52
+ #include "akglogging.hh"
+ #include "rnpexception.hh"
+#else
+ #define AKGLOGLN(a,b,c)
+#endif
+
+namespace rnp
+ {
+
+//using namespace akg;
+
+/** If nothing else is specified, this is the size of the RNP buffers
+ It is enough as long as you dont send large opaque data */
+#define RNP_DEFAULTBUFFERSIZE 1024
+
+/// The basic type used in RNP. It is always 32-bit long
+typedef int RnpQuark;
+
+/** Class Rnp contains definitions and general helper functions for RNP
+*/
+class Rnp
+ {
+ public:
+ /** The 32-bit protocol ID. value 25112001, stored always little endian.
+ In big endian this is 0xc12d7f01.
+ */
+ static const RnpQuark rnpProtocolId;//always little endian!!!
+
+ enum Endianness
+ {
+ bigEndian = 0,
+ littleEndian = 1
+ };
+
+ enum FragmentType
+ {
+ fgt_None = 0,
+ fgt_Command,
+ fgt_OkAnswer,
+ fgt_Error,
+ fgt_DiscardedRequest,
+ //...
+ // to know how many where defined
+ fgt_HowMany
+ };
+ enum DataType
+ {
+ dtt_None = 0,
+ dtt_Asciiz = 1,
+ dtt_Int32 = 2,
+ dtt_Float32 = 3,
+ dtt_Double64 = 4,
+ dtt_Opaque = 5,
+ dtt_NullPtr = 6, // NULL pointer
+ //...
+ // to know how many where defined
+ dtt_HowMany
+ };
+
+ // the type of the error, so the receiver can rebuild it
+ enum ErrorType
+ {
+ ert_Unknown = 0, // unknown error type, no exception, something else
+ ert_StlException, // ... has a "what()" - member
+ ert_AkgSerializable, // akg serializable exception, we don't carry usual exceptions!
+ ert_Other, // other exceptions
+
+ ert_HowMany
+ };
+
+ enum ErrorParam
+ {
+ erp_None = 0,
+ erp_whatValue = 1, // used by "exception"
+ erp_Key = 2, // key of "akgexception"
+ erp_Value = 3, // value of "akgexception"
+
+ erp_HowMany
+ };
+
+ /// Functions to get the names of the various elements
+ static const char* getFragmentTypeName(RnpQuark) throw();
+ static const char* getDataTypeName(RnpQuark) throw();
+ static const char* getEndiannessName(Endianness) throw();
+ static const char* getErrorTypeName(RnpQuark) throw();
+ static const char* getErrorParamName(RnpQuark) throw();
+
+ /** Every server has his own command set, each with parameters
+ Define your own functions to get names for this elements */
+ virtual const char* getParameterTypeName(RnpQuark) const throw() =0;
+ virtual const char* getCommandName(RnpQuark) const throw() =0;
+
+ /// Helper functions for endianness
+ static RnpQuark swapBytes(RnpQuark) throw();
+ static Endianness detectHostEndianness() throw();
+
+#ifdef AFTERV52
+ /// Log connection for the whole RNP module
+ static AkgLogConnection logConn;
+#endif
+ protected:
+ /// Arrays containing the names of the various elements
+ static const char* undefValue;
+ static const char* endiannessNames[2];
+ static const char* fragmentTypeNames[fgt_HowMany];
+ static const char* dataTypeNames[dtt_HowMany];
+ static const char* errorTypeNames[ert_HowMany];
+ static const char* errorParamNames[erp_HowMany];
+ };
+
+struct RnpFragmentHeader;
+
+/** The header of the RNP message. Always 64 bytes long
+*/
+struct RnpHeader
+ {
+ RnpQuark protocolId;
+ char messageEndianness;
+ char desiredEndianness;
+ char majorVersion;
+ char minorVersion;
+ RnpQuark totalMessageLength;
+ RnpQuark nrFragments;
+ RnpQuark serverType;
+ RnpQuark authInfoStart;
+ RnpQuark authInfoLength;
+ RnpQuark comprInfoStart;
+ RnpQuark comprInfoLength;
+ RnpQuark dataStart;
+ RnpQuark dataLength;
+ RnpQuark _unused[5];
+ // sizeof = 64
+
+ /// Returns 'true' if this is a valid RNP header
+ bool isRnpMessage() const throw();
+
+ /// Returns the message endianness
+ Rnp::Endianness getEndianness() const throw();
+
+ /// Returns the total length of the message, regardless of endianness
+ RnpQuark getTotalLength() const throw();
+
+ /** Changes the endianness of the header to the specified one
+ Returns 'true' if a change was necessary */
+ bool changeEndianness(Rnp::Endianness) throw();
+
+ /// Returns a pointer to the first fragment. Header has to be in host endianness
+ RnpFragmentHeader* getFirstFragment() const throw();
+ };
+
+/** The header of parameters. Size is 16.
+ The parameter has a header like this and then the data
+*/
+struct RnpParameter
+ {
+ /// The logical type of the parameter. Server dependent
+ RnpQuark paramType;
+
+ /// The data type of the parameter. One of Rnp::DataType
+ RnpQuark dataType;
+
+ /// The length of the data
+ RnpQuark dataLength;
+
+ /// Total length of teh parameter, header + data + alignment bytes
+ // (Length is always 4bytes aligned!, at least Sun requires it)
+ RnpQuark totalLength;
+
+ /// Returns a pointer to the next parameter
+ RnpParameter* getNextParameter() const throw();
+
+ /// Returns a pointer to the parameter data
+ void* getData() const throw();
+
+ /// Returns the length of the parameter data
+ RnpQuark getDataLength() const throw();
+
+ /** Changes the endianness of the parameter. Since there is no info
+ about the current endianness, be carefull when you use it.
+ It also changes the endianness of the data, except when it is
+ opaque data.*/
+ void changeToHostEndianness() throw();
+ void changeToPartnerEndianness() throw();
+
+ RnpQuark computeTotalAlignedLength() throw();
+ RnpQuark getPaddLength() const throw();
+ };
+
+/** The header of fragments. Size is 16.
+ Every fragment has a header like this and a number of parameters
+*/
+struct RnpFragmentHeader
+ {
+ /// The type of the fragment. One of Rnp::FragmentType
+ RnpQuark fragmType;
+
+ /// The command. Server dependent
+ RnpQuark command;
+
+ /// Number of parameters
+ RnpQuark nrParams;
+
+ /// Total length of the fragment, this header + all parameters
+ RnpQuark totalLength;
+
+ /// Returns a pointer to the next fragment
+ RnpFragmentHeader* getNextFragment() const throw();
+
+ /// Returns a pointer to the first parameter of this fragment
+ RnpParameter* getFirstParameter() const throw();
+
+ /** Changes the endianness of the fragment. Since there is no info
+ about the current endianness, be carefull when you use it */
+ void changeEndianness() throw();
+ };
+
+/** Class for encoding a RNP message. It has support for the header of the
+ embedding protocol and for the endianness of the partner. The rest is for
+ creating the message into a akg::CommBuffer, which can be internal or external.
+ The buffer has to be big enough, the size is not adapted
+*/
+class RnpProtocolEncoder
+ {
+ public:
+ /// Default constructor
+ RnpProtocolEncoder() throw();
+ /// Destructor
+ ~RnpProtocolEncoder() throw();
+
+ /// Sets an external buffer as work buffer.
+ void setBuffer(akg::CommBuffer*) throw();
+
+ /// Allocates an internal buffer as work buffer
+ bool allocateBuffer(int maxMessageLength) throw();
+
+ /** resizes the internal buffer, so the new buffer can hold the actual data plus
+ the requested difference. Additionally we allocate also RNP_DEFAULTBUFFERSIZE bytes
+ Assert: commBuffer != 0 , differenceSize >= 0*/
+ bool adjustBufferSize(int differenceSize) throw();
+
+ int getBufferSize() throw();
+
+ /** Makes the necessary initializations for a new message.
+ Takes as parameter the type of the destination server and allocates
+ space for an embedding protocol header
+ Assert: commBuffer != NULL, meaning there is a valid working buffer
+
+ IMPORTANT: Be aware that all this functions for creating the
+ message have to be called in the correct order, otherwise undefined
+ results may occur!
+ */
+ void startMessage(RnpQuark serverType, int carrierHeaderSize = 0) throw();
+
+ /** Sets the desired endianness for the answer. Servers have to use
+ this endianness when they answer, clients might use it for the next
+ requests
+ */
+ void setDesiredEndianness(Rnp::Endianness) throw();
+
+ /** Sets the final endianness for the message. 'endMessage()' is the one
+ who changes the endianness to the final one
+ */
+ void setFinalEndianness(Rnp::Endianness) throw();
+
+ /// Starts a new fragment.
+ void startFragment(Rnp::FragmentType, RnpQuark command) throw();
+
+ /// Adds a string parameter to the current fragment
+ void addStringParameter(RnpQuark parameterType, const char*) throw();
+
+ /// Adds an int parameter to the current fragment
+ void addInt32Parameter(RnpQuark parameterType, int) throw();
+
+ /// Adds a float parameter to the current fragment
+ void addFloat32Parameter(RnpQuark parameterType, float) throw();
+
+ /// Adds a double parameter to the current fragment
+ void addDouble64Parameter(RnpQuark parameterType, double) throw();
+
+ /// Adds an opaque parameter to the current fragment
+ void addOpaqueParameter(RnpQuark parameterType, const void*, int size) throw();
+
+ /// Ends the current fragment
+ void endFragment() throw();
+
+ /// Ends the message and, if necessary, changes the endianness
+ akg::CommBuffer* endMessage() throw();
+
+ /// Returns the size of the reserved space for the embedding carrier header
+ int getCarrierHeaderSize() throw();
+
+ protected:
+
+ akg::CommBuffer *commBuffer;
+
+ private:
+
+ /// Helper function to add a parameter to the current fragment
+ void addParameter(RnpQuark parameterType, Rnp::DataType, const void *data, int length) throw();
+
+ /// The function which does the endianness change
+ bool changeToPartnerEndianness(Rnp::Endianness) throw();
+
+ bool allocated;
+ int carrierHeaderSize;
+ Rnp::Endianness finalEndianness;
+
+ RnpHeader *rnpHeader;
+ RnpFragmentHeader *currFragment;
+ RnpParameter *currParameter;
+ };
+
+/** Class for decoding a RNP message. The buffer is always an external one
+ Decoding the messsage means also changing the endianness to the host endianness
+*/
+class RnpProtocolDecoder
+ {
+ public:
+ /// Default constructor
+ RnpProtocolDecoder() throw();
+
+ /** Takes the buffer and decodes it, provided it is a RNP message
+ Returns 'false' if it is not a RNP message, or the message is corrupt
+ (for now, if the endianness is not the one of the host, no integrity
+ verification is done. In this case endianness is changes, but if the
+ message is corrupt...bang!!). Later this will have to throw something
+ */
+ bool decode(akg::CommBuffer*) throw();
+
+ /// Returns the code of the destination server
+ RnpQuark getDestinationServerType() const throw();
+
+ /// Returns the desired endianness
+ Rnp::Endianness getDesiredEndianness() const throw();
+
+ /// Returns the original endianness of the message
+ Rnp::Endianness getOriginalEndianness() const throw();
+
+ /// Returns the total message length
+ int getMessageLength() const throw();
+
+ /// Returns the version of the message
+ int getMessageVersion() const throw();
+
+ /// Returns the number of fragments contained in the message
+ RnpQuark countFragments() const throw();
+
+ /// Returns a pointer to the first fragment
+ const RnpFragmentHeader* getFirstFragment() const throw();
+
+ /// Returns a pointer to the next fragment
+ const RnpFragmentHeader* getNextFragment() const throw();
+
+ /// Returns the type of the current fragment
+ RnpQuark getFragmentType() const throw();
+
+ /// Returns the name of type of the current fragment
+ const char* getFragmentTypeName() const throw();
+
+ /// Returns the command of the current fragment
+ RnpQuark getCommand() const throw();
+
+ /// Returns the number of parameters of the current fragment
+ int countParameters() const throw();
+
+ /// Returns the length of the current fragment
+ RnpQuark getFragmentLength() const throw();
+
+ /// Returns a pointer to the first parameter of the current fragment
+ const RnpParameter* getFirstParameter() const throw();
+
+ /// Returns a pointer to the next parameter of the current fragment
+ const RnpParameter* getNextParameter() const throw();
+
+ /// Returns the logical type of the current parameter
+ RnpQuark getParameterType() const throw();
+
+ /// Returns the data type of the current parameter
+ RnpQuark getDataType() const throw();
+
+ /// Returns a pointer to the data of the current parameter, can't be NULL
+ const void* getData() const throw();
+
+ /// Returns a pointer to the data of the current parameter, as string-asciiz (assert!) (can be NULL)
+ const char* getDataAsString() const throw();
+
+ /// Returns a pointer to the data of the current parameter, as integer (assert!)
+ int getDataAsInteger() const throw();
+
+ /// Returns a pointer to the data of the current parameter, as float (assert!)
+ float getDataAsFloat() const throw();
+
+ /// Returns a pointer to the data of the current parameter, as double (assert!)
+ double getDataAsDouble() const throw();
+
+ /// Returns a pointer to the data of the current parameter, as const void* (assert!) (can be NULL)
+ const void* getDataAsOpaque() const throw();
+
+ /// Returns the length of the data of the current parameter
+ int getDataLength() const throw();
+
+ private:
+ akg::CommBuffer *commBuffer;
+ Rnp::Endianness originalEndianness;
+ mutable RnpHeader *rnpHeader;
+ mutable RnpFragmentHeader *currFragment;
+ mutable int currFragmentIdx;
+
+ mutable RnpParameter *currParameter;
+ mutable int currParameterIdx;
+
+ /// Helper function to print a RNP header
+ void printRnpHeader(RnpHeader*) const throw();
+
+ /// Tests the integrity of the message
+ bool testIntegrity() const throw();
+
+
+ /// Returns 'true' if the message is a RNP message
+ bool isRnpMessage() const throw();
+
+ /// Changes the endianness of the message to the message of the host
+ bool changeToHostEndianness() throw();
+ };
+
+
+} //namespace
+#endif
diff --git a/rnprotocol/rnpserver.cc b/rnprotocol/rnpserver.cc
new file mode 100644
index 0000000..cac982c
--- /dev/null
+++ b/rnprotocol/rnpserver.cc
@@ -0,0 +1,134 @@
+/*
+* This file is part of rasdaman community.
+*
+* Rasdaman community is free software: you can redistribute it and/or modify
+* it under the terms of the GNU General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* Rasdaman community is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with rasdaman community. If not, see <http://www.gnu.org/licenses/>.
+*
+* Copyright 2003, 2004, 2005, 2006, 2007, 2008, 2009 Peter Baumann /
+rasdaman GmbH.
+*
+* For more information please see <http://www.rasdaman.org>
+* or contact Peter Baumann via <baumann@rasdaman.com>.
+*/
+/*************************************************************
+ *
+ *
+ * PURPOSE:
+ *
+ *
+ * COMMENTS:
+ * - called from rasserver_main.cc
+ * - startRnpServer() is twin to startRpcServer and startHttpServer()
+ *
+ ************************************************************/
+
+#include <iostream>
+#include <signal.h>
+#include "rnpserver.hh"
+#include "srvrasmgrcomm.hh"
+#include "server/rasserver_config.hh"
+#include "rnpservercomm.hh"
+
+#include "raslib/rminit.hh"
+#include "debug-srv.hh"
+
+// only for access control
+#include "servercomm/servercomm.hh"
+
+#include "server/rasserver_entry.hh"
+
+RnpRasDaManComm rnpServerComm;
+RasserverCommunicator communicator(&rnpServerComm);
+
+extern "C"
+{
+void rnpSignalHandler(int sig);
+}
+
+void startRnpServer()
+{
+ ENTER( "startRnpServer" );
+
+ signal (SIGTERM, rnpSignalHandler);
+
+ RMInit::logOut << "Initializing control connections..." << flush;
+ rasmgrComm.init(configuration.getTimeout(), configuration.getServerName(), configuration.getRasmgrHost(), configuration.getRasmgrPort());
+
+ accessControl.setServerName(configuration.getServerName());
+
+ RMInit::logOut << "informing rasmgr: server available..." << flush;
+ rasmgrComm.informRasmgrServerAvailable();
+ RMInit::logOut << "ok" << endl;
+
+ //##################
+
+ RMInit::logOut << "Initializing job control..." << flush;
+ communicator.initJobs(1);
+ communicator.setTimeout(RNP_TIMEOUT_LISTEN,0); // the select loop!
+
+ communicator.setListenPort(configuration.getListenPort());
+
+ rnpServerComm.setServerJobs(1);
+ rnpServerComm.connectToCommunicator(communicator);
+ rnpServerComm.setTransmitterBufferSize(configuration.getMaxTransferBufferSize());
+
+ RMInit::logOut<<"setting timeout to "<<configuration.getTimeout()<< " secs..." << flush;
+
+ rnpServerComm.setTimeoutInterval(configuration.getTimeout());
+ NbJob::setTimeoutInterval(configuration.getTimeout());
+
+ RMInit::logOut << "connecting to base DBMS..." << flush;
+ RasServerEntry &rasserver = RasServerEntry::getInstance();
+ rasserver.compat_connectToDBMS();
+
+ RMInit::logOut<<"ok, waiting for clients."<<endl<<endl;
+ communicator.runServer();
+
+ RMInit::logOut<<"RNP server shutdown in progress..."<<flush;
+ rnpServerComm.disconnectFromCommunicator();
+
+ //##################
+
+ RMInit::logOut<<"informing rasmgr..."<<flush;
+ rasmgrComm.informRasmgrServerDown();
+
+ RMInit::logOut<<"server stopped."<<endl;
+ LEAVE( "startRnpServer" );
+}
+
+void stopRnpServer()
+{
+ ENTER( "stopRnpServer" );
+
+ communicator.shouldExit();
+
+ LEAVE( "stopRnpServer" );
+}
+
+
+void rnpSignalHandler(int sig)
+{
+ static int in_progress=0; // sema for signal-in-signal
+
+ if (in_progress) // routine already active?
+ return; // ...then don't interfere
+
+ in_progress = 1; // block further signals
+
+ for(long j=0;j<1000000;j++) // make sure server notices shutdown
+ ; // NB: why this large number? doesn't seem to be thought over carefully -- PB 2003-nov-23
+
+ stopRnpServer(); // send shutdown request
+}
+
+
diff --git a/rnprotocol/rnpserver.hh b/rnprotocol/rnpserver.hh
new file mode 100644
index 0000000..9d7162f
--- /dev/null
+++ b/rnprotocol/rnpserver.hh
@@ -0,0 +1,33 @@
+/*
+* This file is part of rasdaman community.
+*
+* Rasdaman community is free software: you can redistribute it and/or modify
+* it under the terms of the GNU General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* Rasdaman community is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with rasdaman community. If not, see <http://www.gnu.org/licenses/>.
+*
+* Copyright 2003, 2004, 2005, 2006, 2007, 2008, 2009 Peter Baumann /
+rasdaman GmbH.
+*
+* For more information please see <http://www.rasdaman.org>
+* or contact Peter Baumann via <baumann@rasdaman.com>.
+*/
+#ifndef RNPSERVER_HH
+#define RNPSERVER_HH
+
+
+// This header contains only a function for starting the RNP server
+// It is done like this to keep the rest of the server as separate as possible from the rnp part
+
+void startRnpServer();
+
+
+#endif
diff --git a/rnprotocol/rnpservercomm.cc b/rnprotocol/rnpservercomm.cc
new file mode 100644
index 0000000..677cf4e
--- /dev/null
+++ b/rnprotocol/rnpservercomm.cc
@@ -0,0 +1,1344 @@
+/*
+* This file is part of rasdaman community.
+*
+* Rasdaman community is free software: you can redistribute it and/or modify
+* it under the terms of the GNU General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* Rasdaman community is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with rasdaman community. If not, see <http://www.gnu.org/licenses/>.
+*
+* Copyright 2003, 2004, 2005, 2006, 2007, 2008, 2009 Peter Baumann /
+rasdaman GmbH.
+*
+* For more information please see <http://www.rasdaman.org>
+* or contact Peter Baumann via <baumann@rasdaman.com>.
+*/
+/*************************************************************
+ *
+ *
+ * PURPOSE:
+ *
+ *
+ * COMMENTS:
+ * - CLIENTID: | 0|.counter.|....timestamp ......|
+ * |31,30.....24|23...16|15...8|7...0|
+ * - requests count from 1..n between connect and disconnect
+ * - fragments count from 1..n between ???
+ *
+ ************************************************************/
+
+#ifndef RMANVERSION
+#error "Please specify RMANVERSION variable!"
+#endif
+
+#ifndef COMPDATE
+#error "Please specify the COMPDATE variable!"
+/*
+COMPDATE=`date +"%d.%m.%Y %H:%M:%S"`
+
+and -DCOMPDATE="\"$(COMPDATE)\"" when compiling
+*/
+#endif
+
+#include "mymalloc/mymalloc.h"
+#include <time.h>
+#include "rnprasserver.hh"
+#include "rnpservercomm.hh"
+#include "srvrasmgrcomm.hh"
+
+#include "server/rasserver_entry.hh"
+#include "time/akgtime.hh"
+
+#include "debug-srv.hh"
+
+
+
+// aux function to avoid a compiler warning (see 'man strftime')
+size_t my_strftime(char *s, size_t max, const char *fmt, const struct tm *tm)
+{
+ return strftime(s, max, fmt, tm);
+}
+
+// now(): aux function returning, as a static string, the current time
+// keep in sync with same function in rasmgr/rasmgr_localsrv.cc!
+const char* now()
+{
+ size_t strfResult = 0; // return value of strftime()
+ static char timestring[50]; // must hold 20+1 chars
+
+ time_t t = time(NULL); // get time
+ struct tm* tm = localtime(&t); // break down time
+ strfResult = my_strftime( timestring, sizeof(timestring), "[%F %T]", tm ); // format time
+ if (strfResult == 0) // bad luck? then take fallback message
+ (void) strncpy( timestring, "[-no time available-]", sizeof(timestring) );
+ return( timestring );
+}
+
+const int RnpRasDaManComm::NoClient = -1;
+
+RnpRasDaManComm::RnpRasDaManComm() throw()
+{
+ ENTER( "RnpRasDaManComm::RnpRasDaManComm" );
+ requestCounter = 0;
+ fragmentCounter = 0;
+ clientID = NoClient;
+ LEAVE( "RnpRasDaManComm::RnpRasDaManComm" );
+}
+
+RnpRasDaManComm::~RnpRasDaManComm() throw()
+{
+ TALK( "RnpRasDaManComm::~RnpRasDaManComm" );
+}
+
+// we need our implementation because of r_Error, but we will go for the default when r_Error is AkgException
+void RnpRasDaManComm::processRequest(CommBuffer *receiverBuffer, CommBuffer *transmiterBuffer, RnpTransport::CarrierProtocol protocol, RnpServerJob *callingJob) throw()
+{
+ ENTER( "RnpRasDaManComm::processRequest, at " << now() << ", client=" << callingJob->getClientHostAddress().getStringAddress() );
+ RMInit::logOut << now() << " request from " << callingJob->getClientHostAddress().getStringAddress() << endl;
+
+ BenchmarkTimer bmt("request time");
+ bmt.start();
+
+ decoder.decode(receiverBuffer);
+ RnpQuark destServerType = decoder.getDestinationServerType();
+ Rnp::Endianness desEndianness = decoder.getDesiredEndianness();
+
+ // test if servertype matches!
+
+ transmiterBuffer->allocate(getTransmitterBufferSize());
+ transmiterBuffer->clearToWrite();
+
+ encoder.setBuffer(transmiterBuffer);
+ encoder.setFinalEndianness(desEndianness);
+ encoder.startAnswer(destServerType, protocol);
+
+ requestCounter++;
+
+ decoder.getFirstFragment();
+ bool wasError = false;
+ for(int fragment=0; fragment < decoder.countFragments(); fragment++)
+ {
+ if(wasError == false)
+ {
+ try
+ {
+ decodeFragment();
+ }
+ // DBMS connection lost? then need to disconnect client to allow to resync
+ catch(r_Ebase_dbms &edb)
+ {
+ RMInit::logOut << "Error: base DBMS reports: " << edb.what() << endl;
+ wasError = true;
+ answerr_Error(edb);
+#if 0 // seems too hard -- PB 2005-jul-25
+ try
+ {
+ RMInit::logOut << "detaching client..." ;
+ executeDisconnect();
+ RMInit::logOut << "ok" << endl;
+ }
+ catch (...) // ignore any further error, just log it
+ {
+ RMInit::logOut << "failed" << endl;
+ }
+#endif // 0
+ }
+ catch(r_Error &ex)
+ {
+ RMInit::logOut << "Error: request terminated: " << ex.what() << endl;
+ TALK( "rasdaman exception kind=" << ex.get_kind() << ", errorno=" << ex.get_errorno() );
+ wasError = true;
+ answerr_Error(ex);
+
+#if 0 // seems too hard -- PB 2005-jul-25
+ // a base DBMS error we treat just like above
+ // -- PB 2003-nov-24
+ if (ex.get_kind() == r_Error::r_Error_BaseDBMSFailed
+ || ex.get_errorno() == 206) // serializable error, see errtxts
+ {
+ try
+ {
+ RMInit::logOut << "detaching client...";
+ executeDisconnect();
+ RMInit::logOut << "ok" << endl;
+ }
+ catch (...) // ignore any further error, just log it
+ {
+ RMInit::logOut << "failed" << endl;
+ }
+ }
+#endif // 0
+ }
+ catch(exception &ex)
+ {
+ RMInit::logOut << "Error: request terminated with general exception: " << ex.what() << endl;
+ wasError = true;
+ answerSTLException(ex);
+ }
+ catch(...)
+ {
+ RMInit::logOut << "Error: request terminated with generic exception." << endl;
+ wasError = true;
+ answerUnknownError();
+ }
+ }
+ else
+ {
+ discardFragment();
+ }
+ decoder.getNextFragment();
+ }
+ encoder.endMessage();
+
+ bmt.stop();
+
+ RMInit::logOut << now() << " request completed; " << bmt << endl << endl;
+ LEAVE( "RnpRasDaManComm::processRequest" );
+}
+
+RnpServerJob* RnpRasDaManComm::createJobs(int howMany)
+{
+ TALK( "RNP: creating "<<howMany<<" RnpRasserverJob's" );
+ return new RnpRasserverJob[howMany];
+}
+
+void RnpRasDaManComm::setTimeoutInterval(int seconds)
+{
+ clientTimer.setTimeoutInterval(seconds);
+}
+
+void RnpRasDaManComm::checkForTimeout()
+{
+ ENTER( "RnpRasDaManComm::checkForTimeout" );
+ if(clientID != NoClient)
+ {
+ if(clientTimer.checkForTimeout())
+ {
+ TALK( "Client 0x" << hex << clientID << dec << " has timed out." );
+ RMInit::logOut << "Client has timed out, connection being freed." << endl;
+ disconnectClient();
+ }
+ }
+ else
+ rasmgrComm.informRasmgrServerStillAvailable();
+ LEAVE( "RnpRasDaManComm::checkForTimeout" );
+}
+
+
+void RnpRasDaManComm::decodeFragment() throw( r_Error )
+{
+ ENTER( "RnpRasDaManComm::decodeFragment" );
+
+ try // somewhere during cmd execution there can be exceptions; we pass them thru
+ {
+ fragmentCounter++;
+
+ RnpQuark command = decoder.getCommand();
+
+ RnpRasserver *hook = new RnpRasserver;
+ TALK( "fragmentCounter=" << fragmentCounter << ", command is " << hook->getCommandName( command ) );
+
+ // first parameter has to be the clientID
+ verifyClientID( command );
+
+ switch(command)
+ {
+ case RnpRasserver::cmd_connect: executeConnect(); break;
+ case RnpRasserver::cmd_disconnect: executeDisconnect(); break;
+ case RnpRasserver::cmd_opendb: executeOpenDB(); break;
+ case RnpRasserver::cmd_closedb: executeCloseDB(); break;
+ case RnpRasserver::cmd_beginta: executeBeginTA(); break;
+ case RnpRasserver::cmd_committa: executeCommitTA(); break;
+ case RnpRasserver::cmd_abortta: executeAbortTA(); break;
+ case RnpRasserver::cmd_istaopen: executeIsTAOpen(); break;
+ case RnpRasserver::cmd_queryhttp: executeQueryHttp(); break;
+ case RnpRasserver::cmd_getnewoid: executeGetNewOId(); break;
+ case RnpRasserver::cmd_queryrpc: executeQueryRpc(); break;
+ case RnpRasserver::cmd_getnextelem: executeGetNextElement(); break;
+ case RnpRasserver::cmd_endtransfer: executeEndTransfer(); break;
+ case RnpRasserver::cmd_getnextmdd: executeGetNextMDD(); break;
+ case RnpRasserver::cmd_getnexttile: executeGetNextTile(); break;
+ case RnpRasserver::cmd_updaterpc : executeUpdateQuery(); break;
+ case RnpRasserver::cmd_startinsTmdd: executeStartInsertTransMDD(); break;
+ case RnpRasserver::cmd_inserttile : executeInsertTile(); break;
+ case RnpRasserver::cmd_endinsmdd : executeEndInsertMDD(); break;
+ case RnpRasserver::cmd_initupdate : executeInitUpdate(); break;
+ case RnpRasserver::cmd_gettypestruct: executeGetTypeStructure(); break;
+ case RnpRasserver::cmd_startinsPmdd: executeStartInsertPersMDD(); break;
+ case RnpRasserver::cmd_insertmdd: executeInsertMDD(); break;
+ case RnpRasserver::cmd_insertcoll: executeInsertCollection(); break;
+ case RnpRasserver::cmd_remobjfromcoll: executeRemoveObjFromColl(); break;
+ case RnpRasserver::cmd_delobjbyoid: executeDeleteObjByOId(); break;
+ case RnpRasserver::cmd_delcollbyname: executeDeleteCollByName(); break;
+ case RnpRasserver::cmd_getcoll: executeGetCollection(); break;
+ case RnpRasserver::cmd_getcolloids: executeGetCollectionOIds(); break;
+ case RnpRasserver::cmd_getobjecttype: executeGetObjectType(); break;
+ case RnpRasserver::cmd_setformat: executeSetFormat(); break;
+ // --- until here the compatible ones ---
+
+ // -- the secret, unofficial ones --
+ case RnpRasserver::cmd_createcollection: executeCreateCollection(); break;
+ case RnpRasserver::cmd_createmdd: executeCreateMDD(); break;
+ case RnpRasserver::cmd_extendmdd: executeExtendMDD(); break;
+ case RnpRasserver::cmd_gettiledomains: executeGetTileDomains(); break;
+
+ default:
+ RMInit::logOut << "Protocol error: Unknown command: " << command << endl;
+ LEAVE( "RnpRasDaManComm::decodeFragment: Unknown command: "<<command );
+ throw r_Error(822);
+ break;
+ }
+
+ clientTimer.markAction();
+ }
+ catch (r_Error &e) // any rasdaman error is passed through
+ {
+ LEAVE( "RnpRasDaManComm::decodeFragment, rasdaman exception caught: " << e.what() );
+ throw; // pass on
+ }
+
+ catch (...) // any other error is "unexpected", by def
+ {
+ LEAVE( "RnpRasDaManComm::decodeFragment, general exception caught." );
+ throw( r_Error( 10000 ) ); // unexpected internal error - FIXME: can we be more precise?
+ }
+
+ LEAVE( "RnpRasDaManComm::decodeFragment" );
+}
+
+//######## here the executing functions ################
+void RnpRasDaManComm::executeConnect()
+{
+ ENTER( "RnpRasDaManComm::executeConnect" );
+
+ RasServerEntry& rasserver = RasServerEntry::getInstance();
+
+ decoder.getNextParameter();
+ const char *capability = decoder.getDataAsString();
+
+ TALK("capability: "<<capability);
+
+ // a new connect requires to drop any eventually preexisting connection first -- PB 2005-sep-02
+ // if (clientID != NoClient) // any previous un-disconnected activity?
+ if (fragmentCounter > 1 || requestCounter > 1) // any previous un-disconnected activity?
+ {
+ RMInit::logOut << "Preparing request for new connect by resetting old connection; ";
+ RnpRasDaManComm::disconnectInternally();
+ // FIXME: the entry in CltTable still remains (see compat_*())
+ // - although this doesn't harm in any way it should be removed
+ }
+
+ rasserver.compat_connectNewClient(capability);
+ connectClient();
+
+ encoder.startFragment(Rnp::fgt_OkAnswer, decoder.getCommand());
+ encoder.addInt32Parameter(RnpRasserver::pmt_clientid, clientID);
+ encoder.endFragment();
+ TALK( "adding clientID 0x" << hex << clientID << dec );
+
+ LEAVE( "RnpRasDaManComm::executeConnect, assigned clientID=0x"<<hex<<clientID<<dec);
+}
+
+void RnpRasDaManComm::executeDisconnect()
+{
+ ENTER("RnpRasDaManComm::executeDisconnect, clientID=0x"<<hex<<clientID<<dec);
+ RasServerEntry& rasserver = RasServerEntry::getInstance();
+
+ rasserver.compat_disconnectClient();
+ TALK( "rasserver.compat_disconnectClient() done, now disconnectClient()." );
+ disconnectClient();
+
+ encoder.startFragment(Rnp::fgt_OkAnswer, decoder.getCommand());
+ encoder.endFragment();
+
+ LEAVE("RnpRasDaManComm::executeDisconnect");
+}
+
+void RnpRasDaManComm::executeOpenDB()
+{
+ ENTER("RnpRasDaManComm::executeOpenDB");
+
+ RasServerEntry& rasserver = RasServerEntry::getInstance();
+
+ decoder.getNextParameter();
+ const char* databaseName = decoder.getDataAsString();
+
+ TALK( "Execute open DB, database="<< databaseName );
+
+ rasserver.compat_openDB(databaseName);
+
+ encoder.startFragment(Rnp::fgt_OkAnswer, decoder.getCommand());
+ encoder.endFragment();
+
+ LEAVE("RnpRasDaManComm::executeOpenDB");
+}
+
+void RnpRasDaManComm::executeCloseDB()
+{
+ ENTER("RnpRasDaManComm::executeCloseDB");
+
+ RasServerEntry& rasserver = RasServerEntry::getInstance();
+
+ rasserver.compat_closeDB();
+
+ encoder.startFragment(Rnp::fgt_OkAnswer, decoder.getCommand());
+ encoder.endFragment();
+
+ LEAVE("RnpRasDaManComm::executeCloseDB");
+}
+
+void RnpRasDaManComm::executeBeginTA()
+{
+ ENTER( "RnpRasDaManComm::executeBeginTA" );
+
+ RasServerEntry& rasserver = RasServerEntry::getInstance();
+
+ decoder.getNextParameter();
+ bool rw = decoder.getDataAsInteger() ? true:false;
+
+ TALK( "executeBeginTA transaction: "<<(rw ? "rw":"ro") );
+
+ rasserver.compat_beginTA(rw);
+
+ encoder.startFragment(Rnp::fgt_OkAnswer, decoder.getCommand());
+ encoder.endFragment();
+
+ LEAVE( "RnpRasDaManComm::executeBeginTA" );
+}
+
+void RnpRasDaManComm::executeCommitTA()
+{
+ ENTER("executeCommitTA - in");
+ RasServerEntry& rasserver = RasServerEntry::getInstance();
+
+ rasserver.compat_commitTA();
+
+ encoder.startFragment(Rnp::fgt_OkAnswer, decoder.getCommand());
+ encoder.endFragment();
+ LEAVE("executeCommitTA - out");
+}
+
+void RnpRasDaManComm::executeAbortTA()
+ {
+ ENTER("executeAbortTA - in");
+ RasServerEntry& rasserver = RasServerEntry::getInstance();
+
+ rasserver.compat_abortTA();
+
+ encoder.startFragment(Rnp::fgt_OkAnswer, decoder.getCommand());
+ encoder.endFragment();
+ LEAVE("executeAbortTA - out");
+ }
+
+void RnpRasDaManComm::executeIsTAOpen()
+ {
+ ENTER("executeIsTAOpen - in");
+ RasServerEntry& rasserver = RasServerEntry::getInstance();
+
+ bool isOpen = rasserver.compat_isOpenTA();
+
+ encoder.startFragment(Rnp::fgt_OkAnswer, decoder.getCommand());
+ encoder.addInt32Parameter(RnpRasserver::pmt_transstatus, isOpen);
+ encoder.endFragment();
+ LEAVE("executeIsTAOpen - out; isOpen=" << isOpen);
+ }
+
+void RnpRasDaManComm::executeQueryHttp()
+{
+ ENTER("executeQueryHttp - in");
+
+ RasServerEntry& rasserver = RasServerEntry::getInstance();
+ // TALK("have inst" );
+ decoder.getNextParameter();
+
+ const void* httpParams = decoder.getData();
+ int httpParamsLen = decoder.getDataLength();
+ // TALK( "httpParamsLen=" << httpParamsLen );
+ char *resultBuffer = 0;
+ int resultLen = rasserver.compat_executeQueryHttp((const char*)httpParams, httpParamsLen, resultBuffer);
+ // TALK( "resultLen=" << resultLen );
+
+ encoder.startFragment(Rnp::fgt_OkAnswer, decoder.getCommand());
+
+ if(resultLen && resultBuffer)
+ {
+ encoder.adjustBufferSize(resultLen);
+ encoder.addOpaqueParameter(RnpRasserver::pmt_httpqanswer, resultBuffer, resultLen);
+ delete[] resultBuffer;
+ resultBuffer = 0;
+ }
+ encoder.endFragment();
+
+ LEAVE("executeQueryHttp - out; resultLen=" << resultLen );
+}
+
+void RnpRasDaManComm::executeGetNewOId()
+ {
+ ENTER("executeGetNewOId - in");
+ RasServerEntry& rasserver = RasServerEntry::getInstance();
+
+ decoder.getNextParameter();
+
+ int objType = decoder.getDataAsInteger();
+
+ r_OId oid = rasserver.compat_getNewOId( (unsigned short)objType );
+ const char* cOId = oid.get_string_representation();
+
+ TALK("executeGetNewOId objType = "<<objType<<" oid="<<cOId);
+
+ encoder.startFragment(Rnp::fgt_OkAnswer, decoder.getCommand());
+ encoder.addStringParameter(RnpRasserver::pmt_oidstring, cOId);
+ encoder.endFragment();
+
+ LEAVE("executeGetNewOId - out");
+ }
+
+#define INITPTR(a) a = 0
+#define SECUREPTR(a) if(a == 0) a = strdup("")
+#define FREEPTR(a) free(a)
+
+void RnpRasDaManComm::executeQueryRpc()
+ {
+ ENTER("executeQueryRpc() - in");
+
+ RasServerEntry& rasserver = RasServerEntry::getInstance();
+
+ decoder.getNextParameter();
+
+ const char *query = decoder.getDataAsString();
+ TALK("query="<<query);
+
+ ExecuteQueryRes queryResult;
+ INITPTR(queryResult.token);
+ INITPTR(queryResult.typeName);
+ INITPTR(queryResult.typeStructure);
+
+ int status = rasserver.compat_executeQueryRpc(query, queryResult);
+ SECUREPTR(queryResult.token);
+ SECUREPTR(queryResult.typeName);
+ SECUREPTR(queryResult.typeStructure);
+
+ encoder.startFragment(Rnp::fgt_OkAnswer, decoder.getCommand());
+ encoder.addInt32Parameter( RnpRasserver::pmt_returnstatus,status);
+ TALK( "adding return status " << status );
+ encoder.addInt32Parameter( RnpRasserver::pmt_errorno, queryResult.errorNo);
+ encoder.addInt32Parameter( RnpRasserver::pmt_lineno, queryResult.lineNo);
+ encoder.addInt32Parameter( RnpRasserver::pmt_columnno, queryResult.columnNo);
+ encoder.addStringParameter(RnpRasserver::pmt_errortoken,queryResult.token);
+ encoder.addStringParameter(RnpRasserver::pmt_typename, queryResult.typeName);
+ encoder.addStringParameter(RnpRasserver::pmt_typestructure,queryResult.typeStructure);
+ encoder.endFragment();
+
+ FREEPTR(queryResult.token);
+ FREEPTR(queryResult.typeName);
+ FREEPTR(queryResult.typeStructure);
+
+ LEAVE("executeQueryRpc - out");
+ }
+
+void RnpRasDaManComm::executeGetNextElement()
+ {
+ ENTER("executeGetNextElement - in");
+ RasServerEntry& rasserver = RasServerEntry::getInstance();
+
+ char *buffer = NULL;
+ unsigned int bufferSize;
+
+ int status = rasserver.compat_getNextElement(buffer,bufferSize);
+
+ encoder.startFragment(Rnp::fgt_OkAnswer, decoder.getCommand());
+ encoder.addInt32Parameter(RnpRasserver::pmt_returnstatus,status);
+ TALK( "adding return status " << status );
+ if(buffer != NULL)
+ encoder.addOpaqueParameter(RnpRasserver::pmt_skalarobject, buffer, bufferSize);
+
+ encoder.endFragment();
+
+ free(buffer);
+
+ LEAVE("executeGetNextElement - out");
+ }
+
+void RnpRasDaManComm::executeEndTransfer()
+ {
+ ENTER("executeEndTransfer - in");
+ RasServerEntry& rasserver = RasServerEntry::getInstance();
+
+ int status = rasserver.compat_endTransfer();
+
+ encoder.startFragment(Rnp::fgt_OkAnswer, decoder.getCommand());
+ encoder.addInt32Parameter(RnpRasserver::pmt_returnstatus,status);
+ TALK( "adding return status " << status );
+ encoder.endFragment();
+
+ LEAVE("executeEndTransfer - out");
+ }
+
+
+void RnpRasDaManComm::executeGetNextMDD()
+ {
+ ENTER("executeGetNextMDD - in");
+ RasServerEntry& rasserver = RasServerEntry::getInstance();
+
+
+ r_Minterval mddDomain;
+ char* typeName;
+ char* typeStructure;
+ r_OId oid;
+ unsigned short currentFormat;
+
+
+ int status = rasserver.compat_getNextMDD(mddDomain, typeName, typeStructure, oid, currentFormat);
+
+ encoder.startFragment(Rnp::fgt_OkAnswer, decoder.getCommand());
+ encoder.addInt32Parameter(RnpRasserver::pmt_returnstatus,status);
+ TALK( "adding return status " << status );
+ encoder.addStringParameter(RnpRasserver::pmt_oidstring, mddDomain.get_string_representation());
+ encoder.addStringParameter(RnpRasserver::pmt_typename, typeName);
+ encoder.addStringParameter(RnpRasserver::pmt_typestructure, typeStructure);
+ encoder.addStringParameter(RnpRasserver::pmt_oidstring, oid.get_string_representation() ? oid.get_string_representation() : "");
+ encoder.addInt32Parameter(RnpRasserver::pmt_currentformat, currentFormat);
+ encoder.endFragment();
+
+ free(typeName);
+ free(typeStructure);
+
+ LEAVE("executeGetNextMDD - out");
+ }
+
+void RnpRasDaManComm::executeGetNextTile()
+ {
+ ENTER("executeGetNextTile - in");
+ RasServerEntry& rasserver = RasServerEntry::getInstance();
+
+ RPCMarray *tempRpcMarray;
+
+ int status = rasserver.compat_getNextTile(&tempRpcMarray);
+
+ encoder.startFragment(Rnp::fgt_OkAnswer, decoder.getCommand());
+ encoder.addInt32Parameter(RnpRasserver::pmt_returnstatus,status);
+ TALK( "adding return status " << status );
+
+ if(tempRpcMarray != 0)
+ {
+ encoder.addStringParameter(RnpRasserver::pmt_domain, tempRpcMarray->domain);
+ encoder.addInt32Parameter( RnpRasserver::pmt_typelength, tempRpcMarray->cellTypeLength);
+ encoder.addInt32Parameter( RnpRasserver::pmt_currentformat, tempRpcMarray->currentFormat);
+ encoder.addInt32Parameter( RnpRasserver::pmt_storageformat, tempRpcMarray->storageFormat);
+
+ encoder.adjustBufferSize(tempRpcMarray->data.confarray_len);
+ encoder.addOpaqueParameter(RnpRasserver::pmt_tiledata, tempRpcMarray->data.confarray_val, tempRpcMarray->data.confarray_len);
+
+ // Do not free this! "tempRpcMarray->data.confarray_val";
+ free(tempRpcMarray->domain);
+ free(tempRpcMarray);
+
+ }
+ encoder.endFragment();
+
+ /* Notez aici ca n-am unde: e adevarat ca tilele trebuie transferate si pe bucati, fiindca
+ un tiff mare creat cu select e o tila!
+ */
+
+
+ LEAVE("executeGetNextTile - out");
+ }
+
+//----------
+
+void RnpRasDaManComm::executeUpdateQuery()
+ {
+ ENTER("executeUpdateQuery - in");
+ RasServerEntry& rasserver = RasServerEntry::getInstance();
+
+ decoder.getNextParameter(); const char* query = decoder.getDataAsString();
+
+ ExecuteUpdateRes returnStructure;
+ returnStructure.token = NULL;
+
+ int status = rasserver.compat_ExecuteUpdateQuery(query, returnStructure);
+
+ const char *token = returnStructure.token != NULL ? returnStructure.token : "";
+
+ encoder.startFragment(Rnp::fgt_OkAnswer, decoder.getCommand());
+ encoder.addInt32Parameter( RnpRasserver::pmt_returnstatus, status);
+ TALK( "adding return status " << status );
+ encoder.addInt32Parameter( RnpRasserver::pmt_errorno, returnStructure.errorNo);
+ encoder.addInt32Parameter( RnpRasserver::pmt_lineno, returnStructure.lineNo);
+ encoder.addInt32Parameter( RnpRasserver::pmt_columnno, returnStructure.columnNo);
+ encoder.addStringParameter(RnpRasserver::pmt_errortoken, token );
+ encoder.endFragment();
+
+ if(returnStructure.token) free(returnStructure.token);
+
+ LEAVE("executeUpdateQuery - out");
+ }
+
+void RnpRasDaManComm::executeInitUpdate()
+ {
+ ENTER("executeInitUpdate - in");
+
+ RasServerEntry& rasserver = RasServerEntry::getInstance();
+
+ int status = rasserver.compat_InitUpdate();
+
+ encoder.startFragment(Rnp::fgt_OkAnswer, decoder.getCommand());
+ encoder.addInt32Parameter(RnpRasserver::pmt_returnstatus, status);
+ TALK( "adding return status " << status );
+ encoder.endFragment();
+ LEAVE("executeInitUpdate - out");
+ }
+
+void RnpRasDaManComm::executeStartInsertTransMDD()
+ {
+ ENTER("executeStartInsertTransMDD - in");
+
+ RasServerEntry& rasserver = RasServerEntry::getInstance();
+
+ decoder.getNextParameter(); const char* domain = decoder.getDataAsString();
+ decoder.getNextParameter(); int typeLength = decoder.getDataAsInteger();
+ decoder.getNextParameter(); const char* typeName = decoder.getDataAsString();
+
+ int status = rasserver.compat_StartInsertTransMDD(domain, typeLength, typeName);
+
+ encoder.startFragment(Rnp::fgt_OkAnswer, decoder.getCommand());
+ encoder.addInt32Parameter(RnpRasserver::pmt_returnstatus, status);
+ TALK( "adding return status " << status );
+ encoder.endFragment();
+
+ LEAVE("executeStartInsertTransMDD - out");
+ }
+
+void RnpRasDaManComm::executeInsertTile()
+ {
+ ENTER("executeInsertTile - in");
+
+ RasServerEntry& rasserver = RasServerEntry::getInstance();
+
+ RPCMarray *rpcMarray = new RPCMarray;
+
+ decoder.getNextParameter(); int persistent = decoder.getDataAsInteger();
+ decoder.getNextParameter(); rpcMarray->domain = (char*)decoder.getDataAsString();
+ decoder.getNextParameter(); rpcMarray->cellTypeLength = decoder.getDataAsInteger();
+ decoder.getNextParameter(); rpcMarray->currentFormat = decoder.getDataAsInteger();
+ decoder.getNextParameter(); rpcMarray->storageFormat = decoder.getDataAsInteger();
+
+ decoder.getNextParameter();
+ const void* buffer = decoder.getData();
+ int length = decoder.getDataLength();
+
+ rpcMarray->data.confarray_val = (char*)mymalloc(length); memcpy(rpcMarray->data.confarray_val, buffer, length);
+ rpcMarray->data.confarray_len = length;
+
+ int status = rasserver.compat_InsertTile(persistent, rpcMarray);
+
+ encoder.startFragment(Rnp::fgt_OkAnswer, decoder.getCommand());
+ encoder.addInt32Parameter(RnpRasserver::pmt_returnstatus, status);
+ TALK( "adding return status " << status );
+ encoder.endFragment();
+
+ // rpcMarray->data.confarray_val is freed by Tile::Tile(...), which is stupid, but...
+ delete rpcMarray;
+
+ LEAVE("executeInsertTile - out");
+ }
+
+void RnpRasDaManComm::executeEndInsertMDD()
+ {
+ ENTER("executeEndInsertMDD - in");
+
+ RasServerEntry& rasserver = RasServerEntry::getInstance();
+
+ decoder.getNextParameter(); int persistent = decoder.getDataAsInteger();
+
+ int status = rasserver.compat_EndInsertMDD(persistent);
+
+ encoder.startFragment(Rnp::fgt_OkAnswer, decoder.getCommand());
+ encoder.addInt32Parameter(RnpRasserver::pmt_returnstatus, status);
+ TALK( "adding return status " << status );
+ encoder.endFragment();
+
+ LEAVE("executeEndInsertMDD - out");
+ }
+
+void RnpRasDaManComm::executeGetTypeStructure()
+ {
+ ENTER("executeGetTypeStructure - in");
+
+ RasServerEntry& rasserver = RasServerEntry::getInstance();
+
+ decoder.getNextParameter(); const char *typeName = decoder.getDataAsString();
+ decoder.getNextParameter(); int typeType = decoder.getDataAsInteger();
+
+ char *typeStructure=NULL;
+
+ int status = rasserver.compat_GetTypeStructure(typeName, typeType, typeStructure);
+
+ encoder.startFragment(Rnp::fgt_OkAnswer, decoder.getCommand());
+ encoder.addInt32Parameter(RnpRasserver::pmt_returnstatus, status);
+ TALK( "adding return status " << status );
+ encoder.addStringParameter(RnpRasserver::pmt_typestructure, typeStructure ? typeStructure : "");
+ encoder.endFragment();
+
+ if(typeStructure)
+ {
+ free(typeStructure);
+ }
+ LEAVE("executeGetTypeStructure - out");
+ }
+
+void RnpRasDaManComm::executeStartInsertPersMDD()
+ {
+ ENTER("executeStartInsertPersMDD - in");
+ RasServerEntry& rasserver = RasServerEntry::getInstance();
+
+ decoder.getNextParameter(); const char* collName = decoder.getDataAsString();
+ decoder.getNextParameter(); r_Minterval mddDomain( decoder.getDataAsString() );
+ decoder.getNextParameter(); int typeLength = decoder.getDataAsInteger();
+ decoder.getNextParameter(); const char* typeName = decoder.getDataAsString();
+ decoder.getNextParameter(); r_OId oid( decoder.getDataAsString() );
+
+ int status = rasserver.compat_StartInsertPersMDD(collName, mddDomain, typeLength, typeName, oid);
+
+ encoder.startFragment(Rnp::fgt_OkAnswer, decoder.getCommand());
+ encoder.addInt32Parameter(RnpRasserver::pmt_returnstatus, status);
+ TALK( "adding return status " << status );
+ encoder.endFragment();
+ LEAVE("executeStartInsertPersMDD - out");
+ }
+
+void RnpRasDaManComm::executeInsertMDD()
+ {
+ ENTER("executeInsertMDD - in");
+
+ RasServerEntry& rasserver = RasServerEntry::getInstance();
+
+ decoder.getNextParameter(); const char* collName = decoder.getDataAsString();
+ decoder.getNextParameter(); const char* typeName = decoder.getDataAsString();
+ decoder.getNextParameter(); r_OId oid( decoder.getDataAsString() );
+
+ RPCMarray *rpcMarray = new RPCMarray;
+
+ decoder.getNextParameter(); rpcMarray->domain = (char*)decoder.getDataAsString();
+ decoder.getNextParameter(); rpcMarray->cellTypeLength = decoder.getDataAsInteger();
+ decoder.getNextParameter(); rpcMarray->currentFormat = decoder.getDataAsInteger();
+ decoder.getNextParameter(); rpcMarray->storageFormat = decoder.getDataAsInteger();
+
+ decoder.getNextParameter();
+ const void* buffer = decoder.getData();
+ int length = decoder.getDataLength();
+
+ rpcMarray->data.confarray_val = (char*)mymalloc(length); memcpy(rpcMarray->data.confarray_val, buffer, length);
+ rpcMarray->data.confarray_len = length;
+
+ int status = rasserver.compat_InsertMDD(collName, rpcMarray, typeName, oid);
+
+ encoder.startFragment(Rnp::fgt_OkAnswer, decoder.getCommand());
+ encoder.addInt32Parameter(RnpRasserver::pmt_returnstatus, status);
+ TALK( "adding return status " << status );
+ encoder.endFragment();
+
+ LEAVE("executeInsertMDD - out");
+ }
+
+void RnpRasDaManComm::executeInsertCollection()
+ {
+ ENTER("executeInsertCollection - in");
+ RasServerEntry& rasserver = RasServerEntry::getInstance();
+
+ decoder.getNextParameter(); const char* collName = decoder.getDataAsString();
+ decoder.getNextParameter(); const char* typeName = decoder.getDataAsString();
+ decoder.getNextParameter(); r_OId oid( decoder.getDataAsString() );
+
+ int status = rasserver.compat_InsertCollection(collName, typeName, oid);
+
+ encoder.startFragment(Rnp::fgt_OkAnswer, decoder.getCommand());
+ encoder.addInt32Parameter(RnpRasserver::pmt_returnstatus, status);
+ TALK( "adding return status " << status );
+ encoder.endFragment();
+
+ LEAVE("executeInsertCollection - out");
+ }
+
+void RnpRasDaManComm::executeDeleteCollByName()
+ {
+ ENTER("executeDeleteCollByName - in");
+ RasServerEntry& rasserver = RasServerEntry::getInstance();
+
+ decoder.getNextParameter(); const char* collName = decoder.getDataAsString();
+
+ int status = rasserver.compat_DeleteCollByName(collName);
+
+ encoder.startFragment(Rnp::fgt_OkAnswer, decoder.getCommand());
+ encoder.addInt32Parameter(RnpRasserver::pmt_returnstatus, status);
+ TALK( "adding return status " << status );
+ encoder.endFragment();
+
+ LEAVE("executeDeleteCollByName - out");
+ }
+
+void RnpRasDaManComm::executeDeleteObjByOId()
+ {
+ ENTER("executeDeleteObjByOId - in");
+ RasServerEntry& rasserver = RasServerEntry::getInstance();
+
+ decoder.getNextParameter(); r_OId oid( decoder.getDataAsString() );
+
+ int status = rasserver.compat_DeleteObjByOId(oid);
+
+ encoder.startFragment(Rnp::fgt_OkAnswer, decoder.getCommand());
+ encoder.addInt32Parameter(RnpRasserver::pmt_returnstatus, status);
+ TALK( "adding return status " << status );
+ encoder.endFragment();
+
+ LEAVE("executeDeleteObjByOId - out");
+ }
+
+void RnpRasDaManComm::executeRemoveObjFromColl()
+ {
+ ENTER("executeRemoveObjFromColl - in");
+ RasServerEntry& rasserver = RasServerEntry::getInstance();
+
+ decoder.getNextParameter(); const char* collName = decoder.getDataAsString();
+ decoder.getNextParameter(); r_OId oid( decoder.getDataAsString() );
+
+ int status = rasserver.compat_RemoveObjFromColl(collName, oid);
+
+ encoder.startFragment(Rnp::fgt_OkAnswer, decoder.getCommand());
+ encoder.addInt32Parameter(RnpRasserver::pmt_returnstatus, status);
+ TALK( "adding return status " << status );
+ encoder.endFragment();
+
+ LEAVE("executeRemoveObjFromColl - out");
+ }
+
+void RnpRasDaManComm::executeGetCollection()
+ {
+ ENTER("executeGetCollection - in");
+ RasServerEntry& rasserver = RasServerEntry::getInstance();
+
+ char* typeName = NULL;
+ char* typeStructure = NULL;
+ char* collName = NULL;
+ r_OId oid;
+ int status = 0;
+
+ decoder.getNextParameter();
+ if(decoder.getParameterType() == RnpRasserver::pmt_collname)
+ {
+ collName = strdup(decoder.getDataAsString());
+ status = rasserver.compat_GetCollectionByName(collName, typeName, typeStructure, oid);
+ }
+ else
+ {
+ const char* oidstring = decoder.getDataAsString();
+ oid = r_OId(oidstring);
+ status = rasserver.compat_GetCollectionByName(oid, typeName, typeStructure, collName);
+ }
+
+ encoder.startFragment(Rnp::fgt_OkAnswer, decoder.getCommand());
+ encoder.addInt32Parameter(RnpRasserver::pmt_returnstatus, status);
+ TALK( "adding return status " << status );
+ encoder.addStringParameter(RnpRasserver::pmt_typename, typeName);
+ encoder.addStringParameter(RnpRasserver::pmt_typestructure, typeStructure);
+ encoder.addStringParameter(RnpRasserver::pmt_oidstring, oid.get_string_representation());
+ encoder.addStringParameter(RnpRasserver::pmt_collname, collName);
+ encoder.endFragment();
+
+ free((void*)typeName);
+ free((void*)typeStructure);
+ free((void*)collName);
+
+ LEAVE("executeGetCollection - out");
+ }
+
+void RnpRasDaManComm::executeGetCollectionOIds()
+ {
+ ENTER("executeGetCollectionOIds - in");
+ RasServerEntry& rasserver = RasServerEntry::getInstance();
+
+ char* typeName = NULL;
+ char* typeStructure = NULL;
+ char* collName = NULL;
+ r_OId oid;
+ RPCOIdEntry* oidTable = NULL;
+ unsigned int oidTableSize = 0;
+ int status = 0;
+
+ decoder.getNextParameter();
+ if(decoder.getParameterType() == RnpRasserver::pmt_collname)
+ {
+ collName = strdup(decoder.getDataAsString());
+ status = rasserver.compat_GetCollectionOidsByName(collName, typeName, typeStructure, oid, oidTable, oidTableSize);
+ }
+ else
+ { oid = r_OId(decoder.getDataAsString());
+ status = rasserver.compat_GetCollectionOidsByOId(oid, typeName, typeStructure, oidTable, oidTableSize, collName);
+ }
+ encoder.startFragment(Rnp::fgt_OkAnswer, decoder.getCommand());
+ encoder.addInt32Parameter(RnpRasserver::pmt_returnstatus, status);
+ TALK( "adding return status " << status );
+ encoder.addStringParameter(RnpRasserver::pmt_typename, typeName);
+ encoder.addStringParameter(RnpRasserver::pmt_typestructure, typeStructure);
+ encoder.addStringParameter(RnpRasserver::pmt_oidstring, oid.get_string_representation());
+ encoder.addStringParameter(RnpRasserver::pmt_collname, collName);
+
+ if(oidTable)
+ for(int i=0;i<oidTableSize; i++)
+ {
+ encoder.adjustBufferSize(strlen(oidTable[i].oid));
+ encoder.addStringParameter(RnpRasserver::pmt_oidstring, oidTable[i].oid);
+ free(oidTable[i].oid);
+ }
+ encoder.endFragment();
+
+ free((void*)typeName);
+ free((void*)typeStructure);
+ free((void*)collName);
+ free(oidTable);
+
+ LEAVE("executeGetCollectionOIds - out");
+ }
+
+void RnpRasDaManComm::executeGetObjectType()
+ {
+ ENTER("executeGetObjectType - in");
+ RasServerEntry& rasserver = RasServerEntry::getInstance();
+
+ decoder.getNextParameter(); const char* oidstring = decoder.getDataAsString();
+
+ r_OId oid(oidstring);
+ unsigned short objType;
+
+ int status=rasserver.compat_GetObjectType(oid, objType);
+
+ encoder.startFragment(Rnp::fgt_OkAnswer, decoder.getCommand());
+ encoder.addInt32Parameter(RnpRasserver::pmt_returnstatus, status);
+ TALK( "adding return status " << status );
+ encoder.addInt32Parameter(RnpRasserver::pmt_objecttype, objType);
+ encoder.endFragment();
+
+
+ LEAVE("executeGetObjectType - out");
+ }
+
+void RnpRasDaManComm::executeSetFormat()
+ {
+ ENTER("executeSetFormat - in");
+ RasServerEntry& rasserver = RasServerEntry::getInstance();
+
+ decoder.getNextParameter(); int whichFormat = decoder.getDataAsInteger();
+ decoder.getNextParameter(); int format = decoder.getDataAsInteger();
+ decoder.getNextParameter(); const char* params = decoder.getDataAsString();
+
+ int status = 0;
+
+ if(whichFormat == 1)
+ status = rasserver.compat_SetTransferFormat(format, params);
+ else
+ status = rasserver.compat_SetStorageFormat(format, params);
+
+ encoder.startFragment(Rnp::fgt_OkAnswer, decoder.getCommand());
+ encoder.addInt32Parameter(RnpRasserver::pmt_returnstatus, status);
+ TALK( "adding return status " << status );
+ encoder.endFragment();
+
+ LEAVE("executeSetFormat - out");
+ }
+
+//########### until here the compatible ones ###############
+
+void RnpRasDaManComm::executeCreateCollection()
+ {
+ ENTER("executeCreateCollection - in");
+ RasServerEntry& rasserver = RasServerEntry::getInstance();
+
+ const char* collName = getNextAsString(RnpRasserver::pmt_collname);
+ const char* collTypeName = getNextAsString(RnpRasserver::pmt_typename);
+
+ TALK("rasserver.createCollection( " << collName << ", " << collTypeName << " )" );
+ r_OId roid = rasserver.createCollection(collName, collTypeName);
+
+ encoder.startFragment(Rnp::fgt_OkAnswer, decoder.getCommand());
+ encoder.addStringParameter(RnpRasserver::pmt_oidstring, roid.get_string_representation());
+ encoder.endFragment();
+
+ LEAVE("executeCreateCollection - out");
+ }
+
+void RnpRasDaManComm::executeCreateMDD()
+{
+ ENTER( "RnpRasDaManComm::executeCreateMDD" );
+ RasServerEntry& rasserver = RasServerEntry::getInstance();
+
+ const char *collName = getNextAsString(RnpRasserver::pmt_collname);
+ const char *mddTypeName = getNextAsString(RnpRasserver::pmt_typename);
+ const char *definitionDomain = getNextAsString(RnpRasserver::pmt_domain);
+ bool rcindex = false;
+ const char *tileDomain = 0;
+
+ if(decoder.getNextParameter())
+ {
+ rcindex = decoder.getDataAsInteger() ? true : false;
+ tileDomain = getNextAsString(RnpRasserver::pmt_domain);
+ }
+ TALK( "collName=" << collName << ", mddTypeName=" << mddTypeName << ", definitionDomain=" << definitionDomain << ", tileDomain=" << tileDomain << ", rcindex=" << rcindex );
+ r_OId roid = rasserver.createMDD(collName, mddTypeName, definitionDomain, tileDomain, rcindex);
+
+ encoder.startFragment(Rnp::fgt_OkAnswer, decoder.getCommand());
+ encoder.addStringParameter(RnpRasserver::pmt_oidstring, roid.get_string_representation());
+ encoder.endFragment();
+
+ LEAVE( "RnpRasDaManComm::executeCreateMDD, oid=" << roid.get_string_representation() );
+}
+
+void RnpRasDaManComm::executeExtendMDD()
+{
+ ENTER( "RnpRasDaManComm::executeExtendMDD" );
+
+ RasServerEntry& rasserver = RasServerEntry::getInstance();
+
+ const char *oidstring = getNextAsString(RnpRasserver::pmt_oidstring);
+ const char *stripeDomain = getNextAsString(RnpRasserver::pmt_domain);
+ const char *tileDomain = getNextAsString(RnpRasserver::pmt_domain);
+
+ r_OId mddOId = r_OId(oidstring);
+
+ TALK( "mddOId=" << oidstring << ", stripeDomain=" << stripeDomain << ", tileDomain=" << tileDomain );
+ rasserver.extendMDD(mddOId, stripeDomain, tileDomain);
+
+ encoder.startFragment(Rnp::fgt_OkAnswer, decoder.getCommand());
+ encoder.endFragment();
+
+ LEAVE( "RnpRasDaManComm::executeExtendMDD" );
+}
+
+void RnpRasDaManComm::executeGetTileDomains()
+{
+ ENTER( "RnpRasDaManComm::executeGetTileDomains" );
+
+ RasServerEntry& rasserver = RasServerEntry::getInstance();
+
+ const char *oidstring = getNextAsString(RnpRasserver::pmt_oidstring);
+ const char *stripeDomain = getNextAsString(RnpRasserver::pmt_domain);
+
+ r_OId mddOId = r_OId(oidstring);
+
+ vector<r_Minterval> result = rasserver.getTileDomains(mddOId, stripeDomain);
+
+ encoder.startFragment(Rnp::fgt_OkAnswer, decoder.getCommand());
+
+ for(int i=0;i < result.size(); i++)
+ {
+ const char *domain = result[i].get_string_representation();
+ encoder.addStringParameter(RnpRasserver::pmt_domain, domain);
+
+ free((void*)domain);
+ }
+
+ encoder.endFragment();
+
+ LEAVE( "RnpRasDaManComm::executeGetTileDomains" );
+}
+
+//######### helper functions ###########################
+
+void RnpRasDaManComm::connectClient()
+ {
+ clientID = makeNewClientID();
+ TALK( "RnpRasDaManComm::connectClient(): assigned new client id 0x" << hex << clientID << dec );
+ }
+
+void RnpRasDaManComm::disconnectInternally()
+{
+ clientID = NoClient;
+ requestCounter = 1; // because pre-increment before request processing will not be reached when this is called
+ fragmentCounter = 1; // same phenomenon, different reason: verify needs this counter for OK'ing connect
+}
+
+void RnpRasDaManComm::disconnectClient()
+ {
+ clientID = NoClient;
+ requestCounter = 0;
+ fragmentCounter = 0;
+ rasmgrComm.informRasmgrServerAvailable();
+ }
+
+
+void RnpRasDaManComm::verifyClientID( RnpQuark command ) throw (r_Error)
+ {
+ ENTER( "RnpRasDaManComm::verifyClientID( command=" << command << " ), fragmentCounter=" << fragmentCounter << ", requestCounter=" << requestCounter );
+
+ decoder.getFirstParameter();
+
+ if(decoder.getParameterType() != RnpRasserver::pmt_clientid)
+ {
+ RMInit::logOut << "Error: unidentified client." << endl;
+ LEAVE( "RnpRasDaManComm::verifyClientID() - exception, unknown client id." );
+ throw r_Error(820); // sorry, I know, symbolic constants
+ }
+
+ int verClientID = decoder.getDataAsInteger();
+ TALK( "RnpRasDaManComm::verifyClientID: clientID 0x" << hex << clientID << dec << ", verClientID 0x" << hex << verClientID << dec );
+
+ // it's our client, it's OK
+ if(clientID == verClientID)
+ {
+ LEAVE( "RnpRasDaManComm::verifyClientID() - it's our client, it's OK" );
+ return;
+ }
+
+ // connect cmd is OK too
+ if(command == RnpRasserver::cmd_connect)
+ {
+ LEAVE( "RnpRasDaManComm::verifyClientID() - connect requested, OK" );
+ return;
+ }
+
+ // new client, first request, it's probably connect, so OK
+ if(clientID == NoClient && fragmentCounter == 1)
+ {
+ LEAVE( "RnpRasDaManComm::verifyClientID() - new client, first request, it's probably connect, so OK" );
+ return;
+ }
+
+ // new client, same message, a new request, it's also OK (he is allowed to put more fragments in a request!)
+ if(clientID != NoClient && fragmentCounter > 1 && requestCounter == 1 && verClientID == 0)
+ {
+ LEAVE( "RnpRasDaManComm::verifyClientID() - new client, same message, a new request (multi-fragment), so OK" );
+ return;
+ }
+
+ RMInit::logOut << "Error: unregistered client." << endl;
+ LEAVE("RnpRasDaManComm::verifyClientID(): stored clientID is 0x" << hex << clientID << dec << ", but client identified as 0x" << hex << verClientID << dec << ", fragmentCounter=" << fragmentCounter << ", requestCounter=" << requestCounter);
+ throw r_Error(821); // invalid sequence number
+ }
+
+int RnpRasDaManComm::makeNewClientID()
+ {
+
+ // CLIENTID: | 0|.counter.|....timestamp ......|
+ // |31,30.....24|23...16|15...8|7...0|
+ static int counter = 0;
+
+ int timeNow = time(NULL);
+
+ int result = (timeNow & 0xFFFFFF) + (counter << 24);
+
+ counter = (counter+1) & 0x7F;
+
+ TALK( "RnpRasDaManComm::makeNewClientID() -> 0x" << hex << result << " (counter now: " << counter << ")" );
+ return result;
+ }
+
+void RnpRasDaManComm::answerr_Error(r_Error &err)
+{
+ const char *errText = err.serialiseError();
+
+ RMInit::logOut << "Error in response: (" << errText << ") " << err.what() << endl;
+
+ encoder.startFragment(Rnp::fgt_Error, decoder.getCommand());
+ encoder.addInt32Parameter(Rnp::ert_Other, 0);
+ encoder.addStringParameter(RnpRasserver::pmt_rErrorString, errText);
+
+ // add descriptive text -- PB 2003-nov-24
+ encoder.addStringParameter(RnpRasserver::pmt_rErrorString, err.what() );
+
+ encoder.endFragment();
+
+ delete[] errText;
+}
+
+//######################################################
+RnpRasserverJob::RnpRasserverJob() throw()
+ {
+ TALK( "RNP: RnpRasserverJob created" );
+ }
+
+bool RnpRasserverJob::validateMessage() throw()
+ {
+ TALK( "RNP: validateMessage()" );
+ return RnpServerJob::validateMessage();
+ }
+
+void RnpRasserverJob::executeOnAccept() throw()
+ {
+ TALK( "RNP: executeOnAccept()" );
+ RnpServerJob::executeOnAccept();
+ }
+
+void RnpRasserverJob::executeOnWriteReady() throw()
+ {
+ TALK( "RNP: executeOnWriteReady()" );
+ RnpServerJob::executeOnWriteReady();
+ }
+
+void RnpRasserverJob::specificCleanUpOnTimeout() throw()
+ {
+ TALK( "RNP: specificCleanUpOnTimeout()" );
+ RnpServerJob::specificCleanUpOnTimeout();
+ }
+
+void RnpRasserverJob::executeOnReadError() throw()
+ {
+ RMInit::logOut << "Error while executing read operation." << endl;
+ RnpServerJob::executeOnReadError();
+ }
+
+void RnpRasserverJob::executeOnWriteError() throw()
+ {
+ RMInit::logOut << "Error while executing write operation." << endl;
+ RnpServerJob::executeOnWriteError();
+ }
+
+//#########################################################
+RasserverCommunicator::RasserverCommunicator(RnpRasDaManComm* cmm) throw()
+ {
+ commPtr = cmm;
+ }
+
+bool RasserverCommunicator::executeOnTimeout() throw()
+ {
+ TALK( "RasserverCommunicator::executeOnTimeout()" );
+
+ commPtr->checkForTimeout();
+
+ return true;
+ }
+
+//#########################################################
+ClientTimer::ClientTimer()
+ {
+ interval = 0;
+ lastAction = 0;
+ enabled = false;
+ }
+
+void ClientTimer::setTimeoutInterval(int seconds)
+ {
+ interval = seconds;
+ enabled = true;
+ markAction();
+ }
+
+void ClientTimer::markAction()
+ {
+ lastAction = time(NULL);
+ }
+
+bool ClientTimer::checkForTimeout()
+ {
+ if(enabled == false) return false;
+
+ time_t now = time(NULL);
+
+ return now >= lastAction + interval;
+ }
+
diff --git a/rnprotocol/rnpservercomm.hh b/rnprotocol/rnpservercomm.hh
new file mode 100644
index 0000000..9a173f4
--- /dev/null
+++ b/rnprotocol/rnpservercomm.hh
@@ -0,0 +1,157 @@
+/*
+* This file is part of rasdaman community.
+*
+* Rasdaman community is free software: you can redistribute it and/or modify
+* it under the terms of the GNU General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* Rasdaman community is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with rasdaman community. If not, see <http://www.gnu.org/licenses/>.
+*
+* Copyright 2003, 2004, 2005, 2006, 2007, 2008, 2009 Peter Baumann /
+rasdaman GmbH.
+*
+* For more information please see <http://www.rasdaman.org>
+* or contact Peter Baumann via <baumann@rasdaman.com>.
+*/
+/*************************************************************
+ *
+ *
+ * PURPOSE:
+ *
+ *
+ * COMMENTS:
+ *
+ ************************************************************/
+
+#ifndef RNPSERVERCOMM_HH
+#define RNPSERVERCOMM_HH
+
+//#include <rnprotocol.hh>
+//#include <rnpembedded.hh>
+#include "rnpcommunication.hh"
+#include "raslib/error.hh"
+
+using namespace rnp;
+
+class ClientTimer
+ {
+ public:
+ ClientTimer();
+ void setTimeoutInterval(int seconds);
+ void markAction();
+ bool checkForTimeout();
+ private:
+ time_t interval;
+ time_t lastAction;
+ bool enabled;
+ };
+
+class RnpRasserverJob : public RnpServerJob
+ {
+ public:
+ RnpRasserverJob() throw();
+
+ private:
+ bool validateMessage() throw();
+ void executeOnAccept() throw();
+ void executeOnWriteReady() throw();
+ void specificCleanUpOnTimeout() throw();
+ void executeOnReadError() throw();
+ void executeOnWriteError() throw();
+ };
+
+class RnpRasDaManComm : public RnpBaseServerComm
+ {
+ public:
+ RnpRasDaManComm() throw();
+
+ ~RnpRasDaManComm() throw();
+
+ void processRequest(CommBuffer *receiverBuffer, CommBuffer *transmiterBuffer, RnpTransport::CarrierProtocol, RnpServerJob *callingJob) throw();
+
+ void setTimeoutInterval(int seconds);
+ void checkForTimeout();
+
+ private: // inherited from RnpBaseServerComm
+ RnpServerJob* createJobs(int howMany);
+
+ void decodeFragment() throw( r_Error );
+
+ ClientTimer clientTimer;
+ private: // the execution functions:
+ void executeConnect();
+ void executeDisconnect();
+ void executeOpenDB();
+ void executeCloseDB();
+ void executeBeginTA();
+ void executeCommitTA();
+ void executeAbortTA();
+ void executeIsTAOpen();
+ void executeQueryHttp();
+ void executeGetNewOId();
+ void executeQueryRpc();
+ void executeGetNextElement();
+ void executeEndTransfer();
+ void executeGetNextMDD();
+ void executeGetNextTile();
+
+ void executeUpdateQuery();
+ void executeStartInsertTransMDD();
+ void executeInsertTile();
+ void executeEndInsertMDD();
+ void executeInitUpdate();
+ void executeGetTypeStructure();
+ void executeStartInsertPersMDD();
+ void executeInsertMDD();
+ void executeInsertCollection();
+ void executeRemoveObjFromColl();
+ void executeDeleteObjByOId();
+ void executeDeleteCollByName();
+ void executeGetCollection();
+ void executeGetCollectionOIds();
+ void executeGetObjectType();
+ void executeSetFormat();
+
+
+ void executeCreateCollection();
+ void executeCreateMDD();
+ void executeExtendMDD();
+ void executeGetTileDomains();
+
+ void answerr_Error(r_Error&);
+ private: // helper functions
+ void connectClient();
+ // reset connection, without reporting availability to rasmgr
+ void disconnectInternally();
+ // reset connection, with reporting availability to rasmgr
+ void disconnectClient();
+ void verifyClientID( RnpQuark command ) throw (r_Error);
+ int makeNewClientID();
+
+ int clientID; // un timestamp, de fapt!
+ int requestCounter; // numara pachetele trimise de un client
+ int fragmentCounter; // numara fragmentele trimise de un client
+
+ static const int NoClient;
+ };
+
+class RasserverCommunicator : public NbCommunicator
+ {
+ public:
+ RasserverCommunicator(RnpRasDaManComm*) throw();
+
+ protected:
+ bool executeOnTimeout() throw();
+
+ RnpRasDaManComm *commPtr;
+ };
+
+#endif // RNPSERVERCOMM_HH
+
diff --git a/rnprotocol/srvrasmgrcomm.cc b/rnprotocol/srvrasmgrcomm.cc
new file mode 100644
index 0000000..d977045
--- /dev/null
+++ b/rnprotocol/srvrasmgrcomm.cc
@@ -0,0 +1,213 @@
+/*
+* This file is part of rasdaman community.
+*
+* Rasdaman community is free software: you can redistribute it and/or modify
+* it under the terms of the GNU General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* Rasdaman community is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with rasdaman community. If not, see <http://www.gnu.org/licenses/>.
+*
+* Copyright 2003, 2004, 2005, 2006, 2007, 2008, 2009 Peter Baumann /
+rasdaman GmbH.
+*
+* For more information please see <http://www.rasdaman.org>
+* or contact Peter Baumann via <baumann@rasdaman.com>.
+*/
+/*************************************************************
+ *
+ *
+ * PURPOSE:
+ *
+ *
+ * COMMENTS:
+ * - why sometimes exit() and sometimes return on error?
+ * should be return or exception, always.
+ *
+ ************************************************************/
+
+#include "srvrasmgrcomm.hh"
+#include<stdio.h>
+#include<errno.h>
+#include<stdlib.h>
+#include<unistd.h>
+#include<sys/types.h>
+#include<sys/socket.h>
+#include<netinet/in.h>
+#include<netdb.h>
+#include<iostream>
+#include<string.h>
+#include "raslib/rminit.hh"
+
+
+#include "debug.hh"
+
+
+// exit code of the server when communication is impossible
+// PB: Why 10 ? Anyway, should not use exit in a server, how often did I tell you, Walter Schatz!!!
+const unsigned int EXIT_CODE = 10;
+
+// max number of retries to connect to rasmgr in informRasMgr()
+const unsigned int SRV_MAX_RETRY = 10000000;
+
+// how many retries are attempted before a message is issued informRasMgr()
+const unsigned int SRV_TALK_INTERVAL = 100000;
+
+
+SrvRasmgrComm rasmgrComm;
+
+SrvRasmgrComm::SrvRasmgrComm()
+{
+ timeout = 0;
+ serverName = 0;
+ rasmgrHost = 0;
+ rasmgrPort = -1;
+}
+
+// note: should make use of timeout as defined in cmd line,
+// but that's a major undertaking -- PB 2005-sep-02
+
+void SrvRasmgrComm::init(unsigned int timeOut, const char* instanceName, const char* nRasmgrHost, int nRasmgrPort)
+{
+ timeout = timeOut;
+ serverName = instanceName;
+ rasmgrHost = nRasmgrHost;
+ rasmgrPort = nRasmgrPort;
+}
+
+unsigned int SrvRasmgrComm::getTimeout()
+{
+ return timeout;
+}
+
+void SrvRasmgrComm::informRasmgrServerAvailable()
+{
+ informRasMGR(SERVER_AVAILABLE);
+}
+
+void SrvRasmgrComm::informRasmgrServerDown()
+{
+ informRasMGR(SERVER_DOWN);
+}
+
+void SrvRasmgrComm::informRasmgrServerStillAvailable()
+{
+ // too verbose, blows up log file
+ // RMInit::logOut << "informing rasmgr: server still available." << endl;
+ informRasMGR(SERVER_REGULARSIG);
+}
+
+void SrvRasmgrComm::informRasMGR(int what)
+{ //what: 0 - going down
+ // 1 - available
+ // 2 - regular signal
+
+// cout<<"servername ="<<serverName<<" rasmgrhost="<<rasmgrHost<<" port="<<rasmgrPort<<endl;
+
+// if(what == SERVER_AVAILABLE) accessControl.resetForNewClient();
+
+ struct protoent* getprotoptr = getprotobyname("tcp");
+
+ struct hostent *hostinfo = gethostbyname(rasmgrHost);
+ if(hostinfo==NULL)
+ {
+ RMInit::logOut << "Error: cannot locate rasmgr host '" << rasmgrHost << "': " << strerror(errno) << std::endl;
+ return;
+ }
+
+ sockaddr_in internetSocketAddress;
+ internetSocketAddress.sin_family = AF_INET;
+ internetSocketAddress.sin_port=htons(rasmgrPort);
+ internetSocketAddress.sin_addr=*(struct in_addr*)hostinfo->h_addr;
+
+ int sock;
+
+ bool ok=false;
+ long talkInterval=SRV_TALK_INTERVAL;
+ long maxRetry=SRV_MAX_RETRY;
+ long retry =0;
+ // creating socket
+ for(retry=0;retry<maxRetry;retry++)
+ {
+ sock=socket(PF_INET,SOCK_STREAM,getprotoptr->p_proto);
+ //std::cout<<"Socket="<<sock<<" protocol(tcp)="<<getprotoptr->p_proto<<std::endl;
+
+ if(sock<0)
+ {
+ if( (retry%talkInterval) == 0)
+ {
+ std::cerr<< "Error: server '" << serverName << " cannot open socket to rasmgr (" << retry << " attempts, still retrying): " << strerror(errno) << std::endl;
+ RMInit::logOut << "Error: server '" << serverName << " cannot open socket to rasmgr (" << retry << " attempts, still retrying): " << strerror(errno) << std::endl;
+ }
+ continue;
+ }
+
+ if(connect(sock,(struct sockaddr*)&internetSocketAddress,sizeof(internetSocketAddress)) < 0)
+ {
+ if( (retry%talkInterval) == 0)
+ {
+ std::cerr << "Error: server '" << serverName << " cannot connect to rasmgr (" << retry << " attempts, still retrying): " << strerror(errno) << std::endl;
+ RMInit::logOut << "Error: server '" << serverName << " cannot connect to rasmgr (" << retry << " attempts, still retrying): " << strerror(errno) << std::endl;
+ }
+ close(sock); //yes, some SO requieres this, like DEC from BLVA
+ continue;
+ }
+ ok = true;
+ break;
+ }
+
+ if( !ok )
+ {
+ std::cerr << "Error: unable to contact rasmgr, server '" << serverName << "' herewith giving up." <<std::endl;
+ RMInit::logOut << "Error: unable to contact rasmgr, server '" << serverName << "' herewith giving up." <<std::endl;
+ if(sock)
+ close(sock);
+ exit( EXIT_CODE );
+ }
+
+ // creating the HTTP message
+ char message[200];
+ sprintf(message,"%s%d\r\n\r\n%s %d %ld ","POST rasservernewstatus HTTP/1.1\r\nUserAgent: RasServer/1.0\r\nContent-length: ",strlen(serverName)+3,serverName,what,0);
+
+ // writing message;
+ if(writeWholeMessage(sock,message,strlen(message)+1)<0)
+ {
+ std::cerr << "Error: cannot send message to rasmgr: " << strerror(errno) << std::endl;
+ RMInit::logOut << "Error: cannot send message to rasmgr: " << strerror(errno) << std::endl;
+ close(sock);
+ exit( EXIT_CODE );
+ }
+ close(sock);
+}
+
+
+int SrvRasmgrComm::writeWholeMessage(int socket,char *destBuffer,int buffSize)
+{
+ // we write the whole message, including the ending '\0', which is already in
+ // the buffSize provided by the caller
+ int totalLength=0;
+ int writeNow;
+ while(1)
+ {
+ writeNow = write(socket,destBuffer+totalLength,buffSize-totalLength);
+ if(writeNow == -1)
+ {
+ if(errno == EINTR)
+ continue; // read was interrupted by signal (on bad SO's)
+ return -1; // another error
+ }
+ totalLength+=writeNow;
+
+ if( totalLength==buffSize )
+ break; // THE END
+ }
+
+ return totalLength;
+}
+
diff --git a/rnprotocol/srvrasmgrcomm.hh b/rnprotocol/srvrasmgrcomm.hh
new file mode 100644
index 0000000..0d15c4c
--- /dev/null
+++ b/rnprotocol/srvrasmgrcomm.hh
@@ -0,0 +1,67 @@
+/*
+* This file is part of rasdaman community.
+*
+* Rasdaman community is free software: you can redistribute it and/or modify
+* it under the terms of the GNU General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* Rasdaman community is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with rasdaman community. If not, see <http://www.gnu.org/licenses/>.
+*
+* Copyright 2003, 2004, 2005, 2006, 2007, 2008, 2009 Peter Baumann /
+rasdaman GmbH.
+*
+* For more information please see <http://www.rasdaman.org>
+* or contact Peter Baumann via <baumann@rasdaman.com>.
+*/
+/*************************************************************
+ *
+ *
+ * PURPOSE:
+ *
+ *
+ * COMMENTS:
+ *
+ ************************************************************/
+
+#ifndef SRVRASMGR_HH
+#define SRVRASMGR_HH
+
+#define SERVER_DOWN 0
+#define SERVER_AVAILABLE 1
+ // 2 is server crushed, but it's generated by rasmgr!
+ // regularly signal the rasmgr that we are available
+#define SERVER_REGULARSIG 3
+
+class SrvRasmgrComm
+ {
+ public:
+ SrvRasmgrComm();
+
+ void init(unsigned int timeOut, const char* instanceName, const char* rasmgrHost, int rasmgrPort);
+
+ void informRasmgrServerAvailable();
+ void informRasmgrServerDown();
+ void informRasmgrServerStillAvailable();
+
+ unsigned int getTimeout();
+
+ private:
+ void informRasMGR(int what);
+ int writeWholeMessage(int socket,char *destBuffer,int buffSize);
+
+ const char* serverName;
+ const char* rasmgrHost;
+ int rasmgrPort;
+ unsigned int timeout;
+ };
+
+extern SrvRasmgrComm rasmgrComm;
+
+#endif