diff options
Diffstat (limited to 'clientcomm/rpcclientcomm.cc')
-rw-r--r-- | clientcomm/rpcclientcomm.cc | 3672 |
1 files changed, 3672 insertions, 0 deletions
diff --git a/clientcomm/rpcclientcomm.cc b/clientcomm/rpcclientcomm.cc new file mode 100644 index 0000000..206e7ce --- /dev/null +++ b/clientcomm/rpcclientcomm.cc @@ -0,0 +1,3672 @@ +/* +* This file is part of rasdaman community. +* +* Rasdaman community is free software: you can redistribute it and/or modify +* it under the terms of the GNU General Public License as published by +* the Free Software Foundation, either version 3 of the License, or +* (at your option) any later version. +* +* Rasdaman community is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU General Public License for more details. +* +* You should have received a copy of the GNU General Public License +* along with rasdaman community. If not, see <http://www.gnu.org/licenses/>. +* +* Copyright 2003, 2004, 2005, 2006, 2007, 2008, 2009 Peter Baumann / +rasdaman GmbH. +* +* For more information please see <http://www.rasdaman.org> +* or contact Peter Baumann via <baumann@rasdaman.com>. +/ +/** + * SOURCE: rpcclientcomm.cc + * + * MODULE: clientcomm + * CLASS: RpcClientComm + * + * PURPOSE: + * + * COMMENTS: + * None +*/ + +#include "mymalloc/mymalloc.h" + +static const char rcsid[] = "@(#)clientcomm, RpcClientComm: $Id: rpcclientcomm.cc,v 1.11 2005/09/09 16:16:29 rasdev Exp $"; + +#include <openssl/evp.h> + +#include <stdio.h> +#include <math.h> // for ceil(), log(), exp() + +#include <vector> +#include <iostream> +#include <fstream> +#include <string> +using namespace std; + +#ifdef __VISUALC__ + #include <windows.h> // for the required defines and stuff + #include <mmsystem.h> // for setEventTimer and killEventTimer + #include <winbase.h> +#else + + #if defined(DECALPHA) || defined(LINUX) + + #if !defined(_XOPEN_SOURCE_EXTENDED) + #define _XOPEN_SOURCE_EXTENDED // for gethostid + #endif + + #endif + + #include <unistd.h> // for sleep(), alarm() + #include <signal.h> // for signal() +#endif +#ifdef __VISUALC__ // do this ONLY for VisualC! Not for EARLY_TEMPLATE + #define __EXECUTABLE__ +#endif + +#if (defined(__VISUALC__) || defined(CYGWIN)) + extern "C" + { + #include "clientcomm/clnt_control/clnt_control.h" + } +# define XDRFREE(proc, res) xdrfree( ntxdr_##proc, (char*)res ) +#else +# define XDRFREE(proc, res) xdr_free( (xdrproc_t)xdr_##proc, (char*)res ) +#endif + +#include "rasodmg/transaction.hh" +#include "rasodmg/database.hh" +#include "rasodmg/iterator.hh" +#include "rasodmg/set.hh" +#include "rasodmg/ref.hh" +#include "rasodmg/storagelayout.hh" +#include "rasodmg/tiling.hh" + +#include "raslib/minterval.hh" +#include "raslib/rmdebug.hh" +#include "raslib/rminit.hh" +#include "raslib/primitivetype.hh" +#include "raslib/complextype.hh" +#include "raslib/structuretype.hh" +#include "raslib/primitive.hh" +#include "raslib/complex.hh" +#include "raslib/structure.hh" +#include "raslib/endian.hh" +#include "raslib/parseparams.hh" +// for transfer compression +#include "compression/tilecompression.hh" + +#include "clientcomm/rpcclientcomm.hh" + + +#ifdef __VISUALC__ // do this ONLY for VisualC! Not for EARLY_TEMPLATE + #undef __EXECUTABLE__ +#endif + +RMINITGLOBALS('C') + +#include<stdio.h> +#include<errno.h> +#include<stdlib.h> +#include<unistd.h> +#include<sys/types.h> +#include<sys/socket.h> +#include<netinet/in.h> +#include<netdb.h> +#include<iostream> +#include<string.h> + +#define ALIVEINTERVAL 60 +#define TAWRITEWAITINTERVAL 10 + +#ifdef SOLARIS +extern "C" void aliveSignal( int ); +#endif +#ifdef __VISUALC__ +void CALLBACK TimerProc(UINT wTimerID, UINT wMsg, DWORD dwUser, DWORD dw1, DWORD dw2) +#else +void aliveSignal( int ) +#endif +{ + if(!RMInit::noTimeOut) { + // get the current clientcomm object + ClientComm* myComm = r_Database::actual_database->getComm(); + if( myComm == 0 ) + { + RMInit::logOut << "RpcClientComm: Error: RpcClientComm object only usable within r_Database object." << endl; + return; + } + + myComm->triggerAliveSignal(); + } +} + + +RpcClientComm::RpcClientComm( const char* _rasmgrHost, int _rasmgrPort ) throw( r_Error ) +: binding_h(NULL), + clientID(0), +#ifdef __VISUALC__ + UINT timerid(0), +#endif + status(0), + serverUp(0), + rpcActive(0), + aliveSignalRemaining(0), + endianServer(0), + endianClient(0), + serverRPCversion(0), + transferFormat(r_Array), + storageFormat(r_Array), + transferFormatParams(NULL), + storageFormatParams(NULL), + clientParams(NULL), + serverCompresses(0), + exactFormat(0), + RPCIF_PARA(0) +{ + clientParams = new r_Parse_Params(); + clientParams->add("compserver", &serverCompresses, r_Parse_Params::param_type_int); + clientParams->add("exactformat", &exactFormat, r_Parse_Params::param_type_int); + + endianClient = (int)r_Endian::get_endianness(); + + this->rasmgrHost=(char*)_rasmgrHost; + this->rasmgrPort=_rasmgrPort; + serverHost[0]=0; + capability[0]=0; + strcpy(identificationString,"rasguest:8e70a429be359b6dace8b5b2500dedb0"); // this is MD5("rasguest"); +} + +static int rpcRetryCounter = 0; + +RpcClientComm::~RpcClientComm() throw() +{ + disconnectFromServer(); + delete clientParams; + clientParams = NULL; +} + +bool RpcClientComm::effectivTypeIsRNP() throw() + { + return false; + } + +void +RpcClientComm::executeQuery( const r_OQL_Query& query, r_Set< r_Ref_Any >& result ) + throw( r_Error ) +{ + if(binding_h == NULL) { + RMInit::logOut << endl << "RpcClientComm::executeQuery(query, result) ERROR: CONNECTION TO SERVER ALREADY CLOSED" << endl << endl; + throw r_Error(CONNECTIONCLOSED); + } + + char* queryString; + + // Get the query string by using a backdoor function of r_OQL_Query + queryString = (char*)query.get_query(); + + // Finally, this is the remote procedure which sends the query and receives a + // client Id under which the client can access the r_Marrays he is to receive + ExecuteQueryParams* params = new ExecuteQueryParams; + ExecuteQueryRes* res; + params->clientID = clientID; + params->query = queryString; + + setRPCActive(); + rpcRetryCounter = 0; + do + { + res = rpcexecutequery_1( params, binding_h ); + + if( !res ) + { + RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpcexecutequery_1)" << endl << endl; + sleep(RMInit::clientcommSleep); + } + if (rpcRetryCounter > RMInit::clientcommMaxRetry) + { + RMInit::logOut << "RPC call 'rpcexecutequery' failed" << endl; + throw r_Error(CLIENTCOMMUICATIONFAILURE); + } + rpcRetryCounter++; + }while( res == 0 ); + setRPCInactive(); + + delete params; + + if( res->status == 0 ) + { + result.set_type_by_name( res->typeName ); + result.set_type_structure( res->typeStructure ); + + XDRFREE(ExecuteQueryRes, res); + + getMDDCollection( result, 1 ); + } + else if( res->status == 1 ) + { + result.set_type_by_name( res->typeName ); + result.set_type_structure( res->typeStructure ); + + XDRFREE(ExecuteQueryRes, res); + + getElementCollection( result ); + } + else if (res->status == 2) + { + // Result collection is empty and nothing has to be got. + XDRFREE(ExecuteQueryRes, res); + } + else if( res->status == 4 || res->status == 5 ) + { + r_Equery_execution_failed err( res->errorNo, res->lineNo, res->columnNo, res->token ); + XDRFREE(ExecuteQueryRes, res); + throw err; + } + else + { + r_Error err; + + if( res->status == 3 ) + err = r_Error( r_Error::r_Error_ClientUnknown ); + else + err = r_Error( r_Error::r_Error_TransferFailed ); + + XDRFREE(ExecuteQueryRes, res); + + throw err; + } +} + + + +void +RpcClientComm::executeQuery( const r_OQL_Query& query ) + throw( r_Error ) +{ + RMDBGENTER(2, RMDebug::module_clientcomm, "RpcClientComm", "executeQuery(query)") + + if(binding_h == NULL) { + RMInit::logOut << endl << "RpcClientComm::executeQuery(query) ERROR: CONNECTION TO SERVER ALREADY CLOSED" << endl << endl; + throw r_Error(CONNECTIONCLOSED); + } + + // update -> check for read_only transaction + if( r_Transaction::actual_transaction == 0 || + r_Transaction::actual_transaction->get_mode() == r_Transaction::read_only ) + { + r_Error err = r_Error( r_Error::r_Error_TransactionReadOnly ); + throw err; + } + + unsigned short rpcStatus; + unsigned short* rpcStatusPtr = 0; + + // + // Send MDD constants to the server. + // + if( query.get_constants() ) + { + r_Set< r_GMarray* >* mddConstants = (r_Set< r_GMarray* >*)query.get_constants(); + + setRPCActive(); + rpcRetryCounter = 0; + do + { + rpcStatusPtr = rpcinitexecuteupdate_1( &clientID, binding_h ); + + if( !rpcStatusPtr ) + { + RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpcinitexecuteupdate_1)" << endl << endl; + sleep(RMInit::clientcommSleep); + } + if (rpcRetryCounter > RMInit::clientcommMaxRetry) + { + RMInit::logOut << "RPC call 'rpcexecuteupdate' failed" << endl; + throw r_Error(CLIENTCOMMUICATIONFAILURE); + } + rpcRetryCounter++; + }while( rpcStatusPtr == 0 ); + rpcStatus = *rpcStatusPtr; + setRPCInactive(); + + r_Iterator<r_GMarray*> iter = mddConstants->create_iterator(); + + for( iter.reset(); iter.not_done(); iter++ ) + { + r_GMarray* mdd = *iter; + const r_Base_Type* baseType = mdd->get_base_type_schema(); + + if( mdd ) + { + // initiate composition of MDD at server side + InsertTransMDDParams* params = new InsertTransMDDParams; + params->clientID = clientID; + params->collName = strdup(""); // not used + params->domain = mdd->spatial_domain().get_string_representation(); + params->typeLength = mdd->get_type_length(); + params->typeName = (char*)mdd->get_type_name(); + + if(binding_h == NULL) { + RMInit::logOut << endl << "RpcClientComm::executeQuery(query) ERROR: CONNECTION TO SERVER ALREADY CLOSED" << endl << endl; + throw r_Error(CONNECTIONCLOSED); + } + + setRPCActive(); + rpcRetryCounter = 0; + do + { + rpcStatusPtr = rpcstartinserttransmdd_1( params, binding_h ); + + if( !rpcStatusPtr ) + { + RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpcinserttransmdd_1)" << endl << endl; + sleep(RMInit::clientcommSleep); + } + if (rpcRetryCounter > RMInit::clientcommMaxRetry) + { + RMInit::logOut << "RPC call 'rpcexecutequery' failed" << endl; + throw r_Error(CLIENTCOMMUICATIONFAILURE); + } + rpcRetryCounter++; + }while( rpcStatusPtr == 0 ); + rpcStatus = *rpcStatusPtr; + setRPCInactive(); + + free( params->domain ); + free( params->collName ); + delete params; + + if( rpcStatus > 0 ) + { + r_Error err; + + switch( rpcStatus ) + { + case 2: + err = r_Error( r_Error::r_Error_DatabaseClassUndefined ); + break; + case 3: + err = r_Error( r_Error::r_Error_TypeInvalid ); + break; + default: + err = r_Error( r_Error::r_Error_TransferFailed ); + } + RMInit::logOut << "Error: rpcinitmdd() - " << err.what() << endl; + RMDBGEXIT(2, RMDebug::module_clientcomm, "RpcClientComm", "executeQuery(query) error occured") + throw err; + } + + r_Set< r_GMarray* >* bagOfTiles; + + + bagOfTiles = mdd->get_storage_layout()->decomposeMDD( mdd ); + + RMDBGMIDDLE(2, RMDebug::module_clientcomm, "RpcClientComm", "decomposing into " << bagOfTiles->cardinality() << " tiles"); + + r_Iterator< r_GMarray* > iter2 = bagOfTiles->create_iterator(); + r_GMarray *origTile; + iter2.reset(); + + while( iter2.not_done() ) + { + RPCMarray* rpcMarray; + + origTile = *iter2; + + // advance iter here to determine if this is the last call (not_done()) + iter2.advance(); + + RMDBGMIDDLE(2, RMDebug::module_clientcomm, "RpcClientComm", "inserting Tile with domain " << origTile->spatial_domain() << ", " << origTile->spatial_domain().cell_count() * origTile->get_type_length() << " bytes") + + getMarRpcRepresentation( origTile, rpcMarray, mdd->get_storage_layout()->get_storage_format(), baseType ); + + InsertTileParams* params2 = new InsertTileParams; + params2->clientID = clientID; + params2->isPersistent = 0; + params2->marray = rpcMarray; + + if(binding_h == NULL) { + RMInit::logOut << endl << "RpcClientComm::executeQuery(query) ERROR: CONNECTION TO SERVER ALREADY CLOSED" << endl << endl; + throw r_Error(CONNECTIONCLOSED); + } + + setRPCActive(); + rpcRetryCounter = 0; + do + { + rpcStatusPtr = rpcinserttile_1( params2, binding_h ); + + if( !rpcStatusPtr ) + { + RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpcinserttile_1)" << endl << endl; + sleep(RMInit::clientcommSleep); + } + if (rpcRetryCounter > RMInit::clientcommMaxRetry) + { + RMInit::logOut << "RPC call 'rpcinserttile' failed" << endl; + throw r_Error(CLIENTCOMMUICATIONFAILURE); + } + rpcRetryCounter++; + }while( rpcStatusPtr == 0 ); + rpcStatus = *rpcStatusPtr; +RMDBGIF(20, RMDebug::module_clientcomm, "WAITAFTERSENDTILE", \ + RMInit::dbgOut << "Waiting 10 sec after send tile\n" << std::endl; \ + sleep(10); \ + RMInit::dbgOut << "Continue now\n" << std::endl; ); + setRPCInactive(); + + // free rpcMarray structure (rpcMarray->data.confarray_val is freed somewhere else) + freeMarRpcRepresentation( origTile, rpcMarray ); + delete params2; + + // delete current tile (including data block) + delete origTile; + + if( rpcStatus > 0 ) + { + RMInit::logOut << "Error: rpctransfertile() - general" << endl; + RMDBGEXIT(2, RMDebug::module_clientcomm, "RpcClientComm", "executeQuery() error occured") + throw r_Error( r_Error::r_Error_TransferFailed ); + } + + RMDBGMIDDLE(2, RMDebug::module_clientcomm, "RpcClientComm", "OK") + } + + EndInsertMDDParams* params3 = new EndInsertMDDParams; + params3->clientID = clientID; + params3->isPersistent = 0; + + if(binding_h == NULL) { + RMInit::logOut << endl << "RpcClientComm::executeQuery(query) ERROR: CONNECTION TO SERVER ALREADY CLOSED" << endl << endl; + throw r_Error(CONNECTIONCLOSED); + } + + setRPCActive(); + rpcRetryCounter = 0; + do + { + rpcStatusPtr = rpcendinsertmdd_1( params3, binding_h ); + + if( !rpcStatusPtr ) + { + RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpcendinsertmdd_1)" << endl << endl; + sleep(RMInit::clientcommSleep); + } + if (rpcRetryCounter > RMInit::clientcommMaxRetry) + { + RMInit::logOut << "RPC call 'rpcinsertmdd' failed" << endl; + throw r_Error(CLIENTCOMMUICATIONFAILURE); + } + rpcRetryCounter++; + }while( rpcStatusPtr == 0 ); + rpcStatus = *rpcStatusPtr; + setRPCInactive(); + + delete params3; + + // delete transient data + bagOfTiles->remove_all(); + delete bagOfTiles; + } + } + } + + // + // Send the update query. + // + ExecuteQueryParams* params = new ExecuteQueryParams; + ExecuteUpdateRes* res; + params->clientID = clientID; + params->query = (char*)query.get_query(); + + if(binding_h == NULL) { + RMInit::logOut << endl << "RpcClientComm::executeQuery(query) ERROR: CONNECTION TO SERVER ALREADY CLOSED" << endl << endl; + throw r_Error(CONNECTIONCLOSED); + } + + setRPCActive(); + rpcRetryCounter = 0; + do + { + res = rpcexecuteupdate_1( params, binding_h ); + + if( !res ) + { + RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpcexecuteupdate_1)" << endl << endl; + sleep(RMInit::clientcommSleep); + } + if (rpcRetryCounter > RMInit::clientcommMaxRetry) + { + RMInit::logOut << "RPC call 'rpcexecuteupdate' failed" << endl; + throw r_Error(CLIENTCOMMUICATIONFAILURE); + } + rpcRetryCounter++; + }while( res == 0 ); + setRPCInactive(); + + delete params; + + rpcStatus = res->status; + + if( rpcStatus == 2 || rpcStatus == 3 ) + { + r_Equery_execution_failed err( res->errorNo, res->lineNo, res->columnNo, res->token ); + + XDRFREE(ExecuteUpdateRes, res); + + RMDBGEXIT(2, RMDebug::module_clientcomm, "RpcClientComm", "executeQuery() error occured") + throw err; + } + + XDRFREE(ExecuteUpdateRes, res); + + if( rpcStatus == 1 || rpcStatus > 3 ) + { + r_Error err; + + if( rpcStatus == 1 ) + err = r_Error( r_Error::r_Error_ClientUnknown ); + else + err = r_Error( r_Error::r_Error_TransferFailed ); + + RMDBGEXIT(2, RMDebug::module_clientcomm, "RpcClientComm", "executeQuery() error occured") + throw err; + } + RMDBGEXIT(2, RMDebug::module_clientcomm, "RpcClientComm", "executeQuery()") +} + + + +void +RpcClientComm::insertColl( const char* collName, const char* typeName, const r_OId& oid ) + throw( r_Error ) +{ + if(binding_h == NULL) { + RMInit::logOut << endl << "RpcClientComm::insertColl(collName, typeName, oid ) ERROR: CONNECTION TO SERVER ALREADY CLOSED" << endl << endl; + throw r_Error(CONNECTIONCLOSED); + } + + // update -> check for read_only transaction + if( r_Transaction::actual_transaction == 0 || + r_Transaction::actual_transaction->get_mode() == r_Transaction::read_only ) + { + r_Error err = r_Error( r_Error::r_Error_TransactionReadOnly ); + throw err; + } + + unsigned short rpcStatus; + unsigned short* rpcStatusPtr = 0; + + InsertCollParams* params = new InsertCollParams; + params->clientID = clientID; + params->collName = (char*)collName; + params->typeName = (char*)typeName; + params->oid = (char*)oid.get_string_representation(); + + setRPCActive(); + rpcRetryCounter = 0; + do + { + rpcStatusPtr = rpcinsertcoll_1( params, binding_h ); + + if( !rpcStatusPtr ) + { + RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpcinsertcoll_1)" << endl << endl; + sleep(RMInit::clientcommSleep); + } + if (rpcRetryCounter > RMInit::clientcommMaxRetry) + { + RMInit::logOut << "RPC call 'rpcinsertcoll' failed" << endl; + throw r_Error(CLIENTCOMMUICATIONFAILURE); + } + rpcRetryCounter++; + }while( rpcStatusPtr == 0 ); + rpcStatus = *rpcStatusPtr; + setRPCInactive(); + + delete params; + + if( rpcStatus > 0 ) + { + r_Error err; + + switch( rpcStatus ) + { + case 1: + err = r_Error( r_Error::r_Error_ClientUnknown ); + break; + case 2: + err = r_Error( r_Error::r_Error_DatabaseClassUndefined ); + break; + case 3: + err = r_Error( r_Error::r_Error_NameNotUnique ); + break; + default: + err = r_Error( r_Error::r_Error_General ); + } + RMInit::logOut << "Error: rpcCreateMDDCollection() - " << err.what() << endl; + throw err; + } +} + + + +void +RpcClientComm::deleteCollByName( const char* collName ) + throw( r_Error ) +{ + if(binding_h == NULL) { + RMInit::logOut << endl << "RpcClientComm::deleteCollByName(collName) ERROR: CONNECTION TO SERVER ALREADY CLOSED" << endl << endl; + throw r_Error(CONNECTIONCLOSED); + } + + // update -> check for read_only transaction + if( r_Transaction::actual_transaction == 0 || + r_Transaction::actual_transaction->get_mode() == r_Transaction::read_only ) + { + r_Error err = r_Error( r_Error::r_Error_TransactionReadOnly ); + throw err; + } + + unsigned short rpcStatus; + unsigned short* rpcStatusPtr=0; + + NameSpecParams* params = new NameSpecParams; + params->clientID = clientID; + params->name = (char*)collName; + + setRPCActive(); + rpcRetryCounter = 0; + do + { + rpcStatusPtr = rpcdeletecollbyname_1( params, binding_h ); + + if( !rpcStatusPtr ) + { + RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpcdeletecollbyname_1)" << endl << endl; + sleep(RMInit::clientcommSleep); + } + if (rpcRetryCounter > RMInit::clientcommMaxRetry) + { + RMInit::logOut << "RPC call 'rpcdeletecollbyname' failed" << endl; + throw r_Error(CLIENTCOMMUICATIONFAILURE); + } + rpcRetryCounter++; + }while( rpcStatusPtr == 0 ); + rpcStatus = *rpcStatusPtr; + setRPCInactive(); + + delete params; + + if( rpcStatus > 0 ) + { + r_Error err; + + switch( rpcStatus ) + { + case 1: + err = r_Error( r_Error::r_Error_ClientUnknown ); + break; + case 2: + err = r_Error( r_Error::r_Error_ObjectUnknown ); + break; + default: + err = r_Error( r_Error::r_Error_General ); + } + RMInit::logOut << "Error: rpcInsertMDD() - " << err.what() << endl; + throw err; + } +} + + + +void +RpcClientComm::deleteObjByOId( const r_OId& oid ) + throw( r_Error ) +{ + if(binding_h == NULL) { + RMInit::logOut << endl << "RpcClientComm::deleteObjectByOId(oid) ERROR: CONNECTION TO SERVER ALREADY CLOSED" << endl << endl; + throw r_Error(CONNECTIONCLOSED); + } + + // update -> check for read_only transaction + if( r_Transaction::actual_transaction == 0 || + r_Transaction::actual_transaction->get_mode() == r_Transaction::read_only ) + { + r_Error err = r_Error( r_Error::r_Error_TransactionReadOnly ); + throw err; + } + + unsigned short rpcStatus; + unsigned short* rpcStatusPtr; + + OIdSpecParams* params = new OIdSpecParams; + params->clientID = clientID; + params->oid = (char*) oid.get_string_representation(); + + setRPCActive(); + rpcRetryCounter = 0; + do + { + rpcStatusPtr = rpcdeleteobjbyoid_1( params, binding_h ); + + if( !rpcStatusPtr ) + { + RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpcdeleteobjbyoid_1)" << endl << endl; + sleep(RMInit::clientcommSleep); + } + if (rpcRetryCounter > RMInit::clientcommMaxRetry) + { + RMInit::logOut << "RPC call 'rpcdeleteobjbyoid' failed" << endl; + throw r_Error(CLIENTCOMMUICATIONFAILURE); + } + rpcRetryCounter++; + }while( rpcStatusPtr == 0 ); + rpcStatus = *rpcStatusPtr; + setRPCInactive(); + delete params; + + if( rpcStatus > 0 ) + { + r_Error err; + + switch( rpcStatus ) + { + case 1: + err = r_Error( r_Error::r_Error_ClientUnknown ); + break; + case 2: + err = r_Error( r_Error::r_Error_ObjectUnknown ); + break; + default: + err = r_Error( r_Error::r_Error_General ); + } + RMInit::logOut << "Error: rpcInsertMDD() - " << err.what() << endl; + throw err; + } +} + + + +void +RpcClientComm::removeObjFromColl( const char* collName, const r_OId& oid ) + throw( r_Error ) +{ + if(binding_h == NULL) { + RMInit::logOut << endl << "RpcClientComm::removeObjFromColl(collName, oid) ERROR: CONNECTION TO SERVER ALREADY CLOSED" << endl << endl; + throw r_Error(CONNECTIONCLOSED); + } + + // update -> check for read_only transaction + if( r_Transaction::actual_transaction == 0 || + r_Transaction::actual_transaction->get_mode() == r_Transaction::read_only ) + { + r_Error err = r_Error( r_Error::r_Error_TransactionReadOnly ); + throw err; + } + + unsigned short rpcStatus; + unsigned short* rpcStatusPtr; + + RemoveObjFromCollParams* params = new RemoveObjFromCollParams; + params->clientID = clientID; + params->collName = (char*) collName; + params->oid = (char*) oid.get_string_representation(); + + setRPCActive(); + rpcRetryCounter = 0; + do + { + rpcStatusPtr = rpcremoveobjfromcoll_1( params, binding_h ); + + if( !rpcStatusPtr ) + { + RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpcremoveobjfromcoll_1)" << endl << endl; + sleep(RMInit::clientcommSleep); + } + if (rpcRetryCounter > RMInit::clientcommMaxRetry) + { + RMInit::logOut << "RPC call 'rpcremoveobjfromcoll' failed" << endl; + throw r_Error(CLIENTCOMMUICATIONFAILURE); + } + rpcRetryCounter++; + }while( rpcStatusPtr == 0 ); + rpcStatus = *rpcStatusPtr; + setRPCInactive(); + + delete params; + + if( rpcStatus > 0 ) + { + r_Error err; + + switch( rpcStatus ) + { + case 1: + err = r_Error( r_Error::r_Error_ClientUnknown ); + break; + case 2: + case 3: + err = r_Error( r_Error::r_Error_ObjectUnknown ); + break; + default: + err = r_Error( r_Error::r_Error_General ); + } + RMInit::logOut << "Error: rpcInsertMDD() - " << err.what() << endl; + throw err; + } +} + + + +void +RpcClientComm::insertMDD( const char* collName, r_GMarray* mar ) + throw( r_Error ) +{ + RMDBGENTER(2, RMDebug::module_clientcomm, "RpcClientComm", "insertMDD(" << collName << ", marray)") + + if(binding_h == NULL) { + RMInit::logOut << endl << "RpcClientComm::insertMDD(collName, mar) ERROR: CONNECTION TO SERVER ALREADY CLOSED" << endl << endl; + throw r_Error(CONNECTIONCLOSED); + } + + // update -> check for read_only transaction + if( r_Transaction::actual_transaction == 0 || + r_Transaction::actual_transaction->get_mode() == r_Transaction::read_only ) + { + r_Error err = r_Error( r_Error::r_Error_TransactionReadOnly ); + RMDBGEXIT(2, RMDebug::module_clientcomm, "RpcClientComm", "insertMDD(" << collName << ", marray) error") + throw err; + } + + unsigned short rpcStatus; + unsigned short* rpcStatusPtr=0; + r_Minterval spatdom; + r_Bytes marBytes; + RPCMarray* rpcMarray; + r_Bytes tileSize = 0; + + // get the spatial domain of the r_GMarray + spatdom = mar->spatial_domain(); + + // determine the amount of data to be transferred + marBytes = mar->get_array_size(); + + RMDBGMIDDLE(2, RMDebug::module_clientcomm, "RpcClientComm", "inserting MDD with domain " << spatdom << ", cell length " << mar->get_type_length() << ", " << marBytes << " bytes" ) + + const r_Base_Type* baseType = mar->get_base_type_schema(); + + // if the MDD is too large for being transfered as one block, it has to be + // divided in tiles + const r_Tiling* til = mar->get_storage_layout()->get_tiling(); + r_Tiling_Scheme scheme = til->get_tiling_scheme(); + if (scheme == r_NoTiling) + tileSize = RMInit::RMInit::clientTileSize; + else + //allowed because the only subclass of tiling without size is no tiling + tileSize = ((const r_Size_Tiling*)til)->get_tile_size(); + + if( RMInit::tiling && marBytes > tileSize ) + { + // initiate composition of MDD at server side + InsertPersMDDParams* params = new InsertPersMDDParams; + params->clientID = clientID; + params->collName = (char*)collName; + params->domain = spatdom.get_string_representation(); + params->typeLength = mar->get_type_length(); + params->typeName = (char*)mar->get_type_name(); + params->oid = (char*)mar->get_oid().get_string_representation(); + + setRPCActive(); + rpcRetryCounter = 0; + do + { + rpcStatusPtr = rpcstartinsertpersmdd_1( params, binding_h ); + + if( !rpcStatusPtr ) + { + RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpcstartinsertpersmdd_1)" << endl << endl; + sleep(RMInit::clientcommSleep); + } + if (rpcRetryCounter > RMInit::clientcommMaxRetry) + { + RMInit::logOut << "RPC call 'rpcstartinsertpersmdd' failed" << endl; + throw r_Error(CLIENTCOMMUICATIONFAILURE); + } + rpcRetryCounter++; + }while( rpcStatusPtr == 0 ); + rpcStatus = *rpcStatusPtr; + setRPCInactive(); + + free( params->domain ); + delete params; + + if( rpcStatus > 0 ) + { + r_Error err; + + switch( rpcStatus ) + { + case 2: + err = r_Error( r_Error::r_Error_DatabaseClassUndefined ); + break; + case 3: + err = r_Error( r_Error::r_Error_CollectionElementTypeMismatch ); + break; + case 4: + err = r_Error( r_Error::r_Error_TypeInvalid ); + break; + default: + err = r_Error( r_Error::r_Error_TransferFailed ); + } + RMInit::logOut << "Error: rpcInsertMDDObj() - " << err.what() << endl; + RMDBGEXIT(2, RMDebug::module_clientcomm, "RpcClientComm", "insertMDD(" << collName << ", marray) error") + throw err; + } + + r_Set< r_GMarray* >* bagOfTiles; + + + bagOfTiles = mar->get_storage_layout()->decomposeMDD( mar ); + + RMDBGMIDDLE(2, RMDebug::module_clientcomm, "RpcClientComm", "decomposing into " << bagOfTiles->cardinality() << " tiles") + + r_Iterator< r_GMarray* > iter = bagOfTiles->create_iterator(); + r_GMarray *origTile; + iter.reset(); + + while( iter.not_done() ) + { + origTile = *iter; + + // advance iter here to determine if this is the last call (not_done()) + iter.advance(); + + RMDBGMIDDLE(2, RMDebug::module_clientcomm, "RpcClientComm", "inserting Tile with domain " << origTile->spatial_domain() << ", " << origTile->spatial_domain().cell_count() * origTile->get_type_length() << " bytes") + + getMarRpcRepresentation( origTile, rpcMarray, mar->get_storage_layout()->get_storage_format(), baseType ); + + InsertTileParams* params2 = new InsertTileParams; + params2->clientID = clientID; + params2->isPersistent = 1; + params2->marray = rpcMarray; + + if(binding_h == NULL) { + RMInit::logOut << endl << "RpcClientComm::insertMDD(collName, mar) ERROR: CONNECTION TO SERVER ALREADY CLOSED" << endl << endl; + throw r_Error(CONNECTIONCLOSED); + } + + setRPCActive(); + rpcRetryCounter = 0; + do + { + rpcStatusPtr = rpcinserttile_1( params2, binding_h ); + + if( !rpcStatusPtr ) + { + RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpcinserttile_1)" << endl << endl; + sleep(RMInit::clientcommSleep); + } + if (rpcRetryCounter > RMInit::clientcommMaxRetry) + { + RMInit::logOut << "RPC call 'rpcinserttile' failed" << endl; + throw r_Error(CLIENTCOMMUICATIONFAILURE); + } + rpcRetryCounter++; + }while( rpcStatusPtr == 0 ); + rpcStatus = *rpcStatusPtr; + setRPCInactive(); + + // free rpcMarray structure (rpcMarray->data.confarray_val is freed somewhere else) + freeMarRpcRepresentation( origTile, rpcMarray ); + delete params2; + + // delete current tile (including data block) + delete origTile; + + if( rpcStatus > 0 ) + { + RMInit::logOut << "Error: rpcInsertMDD() - general" << endl; + RMDBGEXIT(2, RMDebug::module_clientcomm, "RpcClientComm", "insertMDD(" << collName << ", marray) error") + throw r_Error( r_Error::r_Error_TransferFailed ); + } + + RMDBGMIDDLE(2, RMDebug::module_clientcomm, "RpcClientComm", "OK") + } + + EndInsertMDDParams* params3 = new EndInsertMDDParams; + params3->clientID = clientID; + params3->isPersistent = 1; + + if(binding_h == NULL) { + RMInit::logOut << endl << "RpcClientComm::insertMDD(collName, mar) ERROR: CONNECTION TO SERVER ALREADY CLOSED" << endl << endl; + throw r_Error(CONNECTIONCLOSED); + } + + setRPCActive(); + rpcRetryCounter = 0; + do + { + rpcStatusPtr = rpcendinsertmdd_1( params3, binding_h ); + + if( !rpcStatusPtr ) + { + RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpcendinsertmdd_1)" << endl << endl; + sleep(RMInit::clientcommSleep); + } + if (rpcRetryCounter > RMInit::clientcommMaxRetry) + { + RMInit::logOut << "RPC call 'rpcendinsertmdd' failed" << endl; + throw r_Error(CLIENTCOMMUICATIONFAILURE); + } + rpcRetryCounter++; + }while( rpcStatusPtr == 0 ); + rpcStatus = *rpcStatusPtr; + setRPCInactive(); + + delete params3; + + // delete transient data + bagOfTiles->remove_all(); + delete bagOfTiles; + } + else // begin: MDD is transferred in one piece + { + RMDBGMIDDLE(2, RMDebug::module_clientcomm, "RpcClientComm", ", one tile" ) + + getMarRpcRepresentation( mar, rpcMarray, mar->get_storage_layout()->get_storage_format(), baseType ); + + InsertMDDParams* params = new InsertMDDParams; + params->clientID = clientID; + params->collName = (char*)collName; + params->marray = rpcMarray; + params->typeName = (char*)mar->get_type_name(); + params->oid = (char*)mar->get_oid().get_string_representation(); + + if(binding_h == NULL) { + RMInit::logOut << endl << "RpcClientComm::insertMDD(collName, mar) ERROR: CONNECTION TO SERVER ALREADY CLOSED" << endl << endl; + throw r_Error(CONNECTIONCLOSED); + } + + setRPCActive(); + rpcRetryCounter = 0; + do + { + rpcStatusPtr = rpcinsertmdd_1( params, binding_h ); + + if( !rpcStatusPtr ) + { + RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpcinsertmdd_1)" << endl << endl; + sleep(RMInit::clientcommSleep); + } + if (rpcRetryCounter > RMInit::clientcommMaxRetry) + { + RMInit::logOut << "RPC call 'rpcinsertmdd' failed" << endl; + throw r_Error(CLIENTCOMMUICATIONFAILURE); + } + rpcRetryCounter++; + }while( rpcStatusPtr == 0 ); + rpcStatus = *rpcStatusPtr; + setRPCInactive(); + + freeMarRpcRepresentation( mar, rpcMarray ); + delete params; + + RMDBGMIDDLE(2, RMDebug::module_clientcomm, "RpcClientComm", "ok" ) + + if( rpcStatus > 0 ) + { + r_Error err; + + switch( rpcStatus ) + { + case 2: + err = r_Error( r_Error::r_Error_DatabaseClassUndefined ); + break; + case 3: + err = r_Error( r_Error::r_Error_CollectionElementTypeMismatch ); + break; + case 4: + err = r_Error( r_Error::r_Error_TypeInvalid ); + break; + default: + err = r_Error( r_Error::r_Error_TransferFailed ); + } + RMInit::logOut << "Error: rpcInsertMDD() - " << err.what() << endl; + throw err; + } + + } // end: MDD i transferred in one piece +} + + + +r_Data_Format +RpcClientComm::doTransferDecompression( r_GMarray* tile, const r_Base_Type *type, + r_Data_Format fmt, unsigned long size ) +{ + + RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "doTransferDecompression(...) tile dom:" + << tile->spatial_domain() << " array size:" << tile->get_array_size() + << " type size:" << tile->get_type_length()); + if (fmt != r_Array) + { + r_Tile_Compression *engine = NULL; + char *newTileData = NULL; + r_Data_Format newFormat; + + RMDBGMIDDLE( 2, RMDebug::module_clientcomm, "RpcClientComm", "doTransferDecompression(...) decompressing from " + << fmt << ", " << size << "bytes"); + + try + { + r_Storage_Man_CPP sman; + engine = r_Tile_Compression::create( fmt, tile->spatial_domain(), type ); + engine->set_storage_handler(sman); + newTileData = (char*)(engine->decompress(tile->get_array(), size)); + RMDBGMIDDLE( 2, RMDebug::module_clientcomm, "RpcClientComm", + "doTransferDecompression(...) decompression to " << engine->get_decomp_format() << " OK"); + } + catch (r_Error &err) + { + RMDBGMIDDLE( 2, RMDebug::module_clientcomm, "RpcClientComm", + "doTransferDecompression(...) decompression to " << engine->get_decomp_format() << " FAILED"); + RMInit::logOut << "RpcClientComm::doTransferDecompression() Error decompressing data, assuming raw" << endl; + } + + newFormat = engine->get_decomp_format(); + + if (newTileData != NULL) + { + delete [] tile->get_array(); + tile->set_array(newTileData); + tile->set_array_size(tile->spatial_domain().cell_count()*tile->get_type_length()); + } + else + newFormat = fmt; + + RMDBGMIDDLE( 2, RMDebug::module_clientcomm, "RpcClientComm", "doTransferDecompression(...) tile newFmt:" + << newFormat << " dom:" << tile->spatial_domain() + << " array size:" << tile->get_array_size() + << " type size:" << tile->get_type_length()); + + // ... also make sure the decoded format is really raw array data (r_Array) + if ((endianClient != endianServer) && (newFormat == r_Array)) + { + // if compression engine already handles endianness we mustn't change again + if (!engine->converts_endianness()) { + RMDBGMIDDLE( 2, RMDebug::module_clientcomm, "RpcClientComm", "doTransferDecompression(...) for " + << fmt << " endianness changed from " + << (r_Endian::r_Endianness)endianServer << " to " << (r_Endian::r_Endianness) endianClient); + changeEndianness(tile, type); + } + } + + if (engine != NULL) + delete engine; + + RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "doTransferDecompression(...) tile fmt:" << newFormat); + + return newFormat; + } + + if (endianClient != endianServer) { + RMDBGMIDDLE( 2, RMDebug::module_clientcomm, "RpcClientComm", "doTransferDecompression(...) for " + << fmt << " endianness changed from " + << (r_Endian::r_Endianness)endianServer << " to " << (r_Endian::r_Endianness) endianClient); + changeEndianness(tile, type); + } + + RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "doTransferDecompression(...) tile fmt:" << r_Array); + return r_Array; +} + +r_Ref_Any +RpcClientComm::getMDDByOId( const r_OId& oid ) + throw( r_Error ) +{ + + if(binding_h == NULL) { + RMInit::logOut << endl << "RpcClientComm::getMDDByOId(oid) ERROR: CONNECTION TO SERVER ALREADY CLOSED" << endl << endl; + throw r_Error(CONNECTIONCLOSED); + } + + RMDBGENTER(2, RMDebug::module_clientcomm, "RpcClientComm", "getMDDByOId(" << oid << ")") + + r_Ref_Any mddResult; + + r_GMarray* marray = 0; + unsigned short tileStatus = 0; + unsigned short rpcStatus = 0; + unsigned short* rpcStatusPtr = 0; + + OIdSpecParams params; + GetMDDRes* thisResult = 0; + params.clientID = clientID; + params.oid = (char*)oid.get_string_representation(); + + setRPCActive(); + rpcRetryCounter = 0; + do + { + thisResult = rpcgetmddbyoid_1( ¶ms, binding_h ); + + if( !thisResult ) + { + RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpcgetmddbyoid_1)" << endl << endl; + sleep(RMInit::clientcommSleep); + } + if (rpcRetryCounter > RMInit::clientcommMaxRetry) + { + RMInit::logOut << "RPC call 'rpcgetmddbyoid' failed" << endl; + throw r_Error(CLIENTCOMMUICATIONFAILURE); + } + rpcRetryCounter++; + }while( thisResult == 0 ); + setRPCInactive(); + + rpcStatus = thisResult->status; + + if( rpcStatus != 0 ) + { + r_Error err; + + switch( rpcStatus ) + { + case 1: + err = r_Error( r_Error::r_Error_ClientUnknown ); + break; + case 2: + case 3: + err = r_Error( r_Error::r_Error_ObjectUnknown ); + break; + default: + err = r_Error( r_Error::r_Error_TransferFailed ); + } + + XDRFREE(GetMDDRes, thisResult); + + RMDBGEXIT(2, RMDebug::module_clientcomm, "RpcClientComm", "failed") + throw err; + } + + r_Ref< r_GMarray > mdd; + getMDDCore(mdd, thisResult, 0); + mddResult = mdd; + + setRPCActive(); +RMDBGIF(20, RMDebug::module_clientcomm, "WAITENDTRANSFERSTART", \ + RMInit::dbgOut << "Waiting 100 sec before end transfer\n" << std::endl; \ + sleep(100); \ + RMInit::dbgOut << "Continue now\n" << std::endl; ); + + if(binding_h == NULL) { + RMInit::logOut << endl << "RpcClientComm::getMDDByOId(oid) ERROR: CONNECTION TO SERVER ALREADY CLOSED" << endl << endl; + throw r_Error(CONNECTIONCLOSED); + } + + rpcRetryCounter = 0; + do + { + rpcStatusPtr = rpcendtransfer_1( &clientID, binding_h ); + if (rpcRetryCounter > RMInit::clientcommMaxRetry) + { + RMInit::logOut << "RPC call 'rpcendtransfer' failed" << endl; + throw r_Error(CLIENTCOMMUICATIONFAILURE); + } + rpcRetryCounter++; + + if( !rpcStatusPtr ) + { + RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpcendtransfer_1)" << endl << endl; + sleep(RMInit::clientcommSleep); + } + }while( rpcStatusPtr == 0 ); + rpcStatus = *rpcStatusPtr; +RMDBGIF(20, RMDebug::module_clientcomm, "WAITENDTRANSFEREND", \ + RMInit::dbgOut << "Waiting 100 sec after end transfer\n" << std::endl; \ + sleep(100); \ + RMInit::dbgOut << "Continue now\n" << std::endl; ); + setRPCInactive(); + + RMDBGEXIT(2, RMDebug::module_clientcomm, "RpcClientComm", "ok") + + return mddResult; +} + + + +r_Ref_Any +RpcClientComm::getCollByName( const char* collName ) + throw( r_Error ) +{ + if(binding_h == NULL) { + RMInit::logOut << endl << "RpcClientComm::getCollByName(collName) ERROR: CONNECTION TO SERVER ALREADY CLOSED" << endl << endl; + throw r_Error(CONNECTIONCLOSED); + } + + RMDBGENTER(2, RMDebug::module_clientcomm, "RpcClientComm", "getCollByName(" << collName << ")") + r_Set< r_Ref_Any >* set = 0; + unsigned short rpcStatus = 0; + + NameSpecParams* params = new NameSpecParams; + GetCollRes* thisResult = 0; + params->clientID = clientID; + params->name = (char*)collName; + + setRPCActive(); + rpcRetryCounter = 0; + do + { + thisResult = rpcgetcollbyname_1( params, binding_h ); + + if( !thisResult ) + { + RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpcgetcollbyname_1)" << endl << endl; + sleep(RMInit::clientcommSleep); + } + if (rpcRetryCounter > RMInit::clientcommMaxRetry) + { + RMInit::logOut << "RPC call 'rpcgetcollbyname' failed" << endl; + throw r_Error(CLIENTCOMMUICATIONFAILURE); + } + rpcRetryCounter++; + }while( thisResult == 0 ); + setRPCInactive(); + + delete params; + + rpcStatus = thisResult->status; + + if( rpcStatus != 0 && rpcStatus != 1 ) + { + r_Error err; + + switch( rpcStatus ) + { + case 2: + err = r_Error( r_Error::r_Error_ObjectUnknown ); + break; + case 3: + err = r_Error( r_Error::r_Error_ClientUnknown ); + break; + default: + err = r_Error( r_Error::r_Error_TransferFailed ); + } + + XDRFREE(GetCollRes, thisResult); + + RMDBGEXIT(2, RMDebug::module_clientcomm, "RpcClientComm", "failed") + throw err; + } + + RMDBGMIDDLE(2, RMDebug::module_clientcomm, "RpcClientComm", "ok") + + // create the set + r_OId rOId( thisResult->oid ); + set = new ( r_Database::actual_database, r_Object::read, rOId ) r_Set< r_Ref_Any >; + + // initialize data elements + set->set_type_by_name ( thisResult->typeName ); + set->set_type_structure( thisResult->typeStructure ); + set->set_object_name ( thisResult->collName ); + + // now the transfer structure of rpcgetcollbyname can be freed + XDRFREE(GetCollRes, thisResult); + + // get collection elements + if( rpcStatus == 0 ) + getMDDCollection( *set, 0 ); + // else rpcStatus == 1 -> Result collection is empty and nothing has to be got. + + RMDBGEXIT(2, RMDebug::module_clientcomm, "RpcClientComm", "getCollByName(" << collName << ")") + return r_Ref_Any( set->get_oid(), set ); +} + + + +r_Ref_Any +RpcClientComm::getCollByOId( const r_OId& oid ) + throw( r_Error ) +{ + if(binding_h == NULL) { + RMInit::logOut << endl << "RpcClientComm::getCollByOId(oid) ERROR: CONNECTION TO SERVER ALREADY CLOSED" << endl << endl; + throw r_Error(CONNECTIONCLOSED); + } + + RMDBGENTER(2, RMDebug::module_clientcomm, "RpcClientComm", "getCollByOId(" << oid << ")") + r_Set< r_Ref_Any >* set = 0; + unsigned short rpcStatus = 0; + + OIdSpecParams* params = new OIdSpecParams; + GetCollRes* thisResult = 0; + params->clientID = clientID; + params->oid = (char*)oid.get_string_representation(); + + setRPCActive(); + rpcRetryCounter = 0; + do + { + thisResult = rpcgetcollbyoid_1( params, binding_h ); + + if( !thisResult ) + { + RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpcgetcollbyoid_1)" << endl << endl; + sleep(RMInit::clientcommSleep); + } + if (rpcRetryCounter > RMInit::clientcommMaxRetry) + { + RMInit::logOut << "RPC call 'rpcgetcollbyoid' failed" << endl; + throw r_Error(CLIENTCOMMUICATIONFAILURE); + } + rpcRetryCounter++; + }while( thisResult == 0 ); + setRPCInactive(); + + delete params; + + rpcStatus = thisResult->status; + + if( rpcStatus != 0 && rpcStatus != 1 ) + { + r_Error err; + + switch( rpcStatus ) + { + case 2: + err = r_Error( r_Error::r_Error_ObjectUnknown ); + break; + case 3: + err = r_Error( r_Error::r_Error_ClientUnknown ); + break; + default: + err = r_Error( r_Error::r_Error_TransferFailed ); + } + + XDRFREE(GetCollRes, thisResult); + + RMDBGEXIT(2, RMDebug::module_clientcomm, "RpcClientComm", "failed") + throw err; + } + + RMDBGMIDDLE(2, RMDebug::module_clientcomm, "RpcClientComm", "ok") + + // create the set + r_OId rOId( thisResult->oid ); + set = new ( r_Database::actual_database, r_Object::read, rOId ) r_Set< r_Ref_Any >; + + // initialize data elements + set->set_type_by_name ( thisResult->typeName ); + set->set_type_structure( thisResult->typeStructure ); + set->set_object_name ( thisResult->collName ); + + // now the transfer structure can be freed + XDRFREE(GetCollRes, thisResult); + + // get collection elements + if( rpcStatus == 0 ) + getMDDCollection( *set, 0 ); + // else rpcStatus == 1 -> Result collection is empty and nothing has to be got. + + RMDBGEXIT(2, RMDebug::module_clientcomm, "RpcClientComm", "failed") + return r_Ref_Any( set->get_oid(), set ); +} + + + +r_Ref_Any +RpcClientComm::getCollOIdsByName( const char* collName ) + throw( r_Error ) +{ + if(binding_h == NULL) { + RMInit::logOut << endl << "RpcClientComm::getCollOIdsByName(collName) ERROR: CONNECTION TO SERVER ALREADY CLOSED" << endl << endl; + throw r_Error(CONNECTIONCLOSED); + } + RMDBGENTER(2, RMDebug::module_clientcomm, "RpcClientComm", "getCollOIdsByName(" << collName << ")") + r_Set< r_Ref<r_GMarray> >* set = 0; + unsigned short rpcStatus = 0; + + NameSpecParams* params = new NameSpecParams; + GetCollOIdsRes* thisResult = 0; + params->clientID = clientID; + params->name = (char*)collName; + + setRPCActive(); + rpcRetryCounter = 0; + do + { + thisResult = rpcgetcolloidsbyname_1( params, binding_h ); + + if( !thisResult ) + { + RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpcgetcolloidsbyname_1)" << endl << endl; + sleep(RMInit::clientcommSleep); + } + if (rpcRetryCounter > RMInit::clientcommMaxRetry) + { + RMInit::logOut << "RPC call 'rpcgetcolloidsbyname' failed" << endl; + throw r_Error(CLIENTCOMMUICATIONFAILURE); + } + rpcRetryCounter++; + }while( thisResult == 0 ); + setRPCInactive(); + + delete params; + + rpcStatus = thisResult->status; + + if( rpcStatus != 0 && rpcStatus != 1 ) + { + r_Error err; + + switch( rpcStatus ) + { + case 2: + err = r_Error( r_Error::r_Error_ObjectUnknown ); + break; + case 3: + err = r_Error( r_Error::r_Error_ClientUnknown ); + break; + default: + err = r_Error( r_Error::r_Error_TransferFailed ); + } + + XDRFREE(GetCollOIdsRes, thisResult); + + RMDBGEXIT(2, RMDebug::module_clientcomm, "RpcClientComm", "failed") + throw err; + } + + RMDBGMIDDLE(2, RMDebug::module_clientcomm, "RpcClientComm", "ok") + + // create the set + r_OId rOId( thisResult->oid ); + set = new ( r_Database::actual_database, r_Object::read, rOId ) r_Set< r_Ref< r_GMarray > >; + + set->set_type_by_name ( thisResult->typeName ); + set->set_type_structure( thisResult->typeStructure ); + set->set_object_name ( thisResult->collName ); + + // fill set with oids + if( rpcStatus == 0 ) + { + for( unsigned int i=0; i<thisResult->oidTable.oidTable_len; i++ ) + { + set->insert_element( r_Ref<r_GMarray>( r_OId( thisResult->oidTable.oidTable_val[i].oid ) ), 1 ); + + RMDBGMIDDLE(2, RMDebug::module_clientcomm, "RpcClientComm", "oid " << i << ": " << thisResult->oidTable.oidTable_val[i].oid) + } + } + + // now the transfer structure can be freed + XDRFREE(GetCollOIdsRes, thisResult); + + RMDBGEXIT(2, RMDebug::module_clientcomm, "RpcClientComm", "getCollOIdsByName") + return r_Ref_Any( set->get_oid(), set ); +} + + + +r_Ref_Any +RpcClientComm::getCollOIdsByOId( const r_OId& oid ) + throw( r_Error ) +{ + if(binding_h == NULL) { + RMInit::logOut << endl << "RpcClientComm::getCollOIdsByOId(oid) ERROR: CONNECTION TO SERVER ALREADY CLOSED" << endl << endl; + throw r_Error(CONNECTIONCLOSED); + } + RMDBGENTER(2, RMDebug::module_clientcomm, "RpcClientComm", "getCollOIdsByOId(" << oid << ")") + r_Set< r_Ref<r_GMarray> >* set = 0; + unsigned short rpcStatus = 0; + + OIdSpecParams* params = new OIdSpecParams; + GetCollOIdsRes* thisResult = 0; + params->clientID = clientID; + params->oid = (char*)oid.get_string_representation(); + + setRPCActive(); + rpcRetryCounter = 0; + do + { + thisResult = rpcgetcolloidsbyoid_1( params, binding_h ); + + if( !thisResult ) + { + RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpcgetcolloidsbyoid_1)" << endl << endl; + sleep(RMInit::clientcommSleep); + } + if (rpcRetryCounter > RMInit::clientcommMaxRetry) + { + RMInit::logOut << "RPC call 'rpcgetcolloidsbyoid' failed" << endl; + throw r_Error(CLIENTCOMMUICATIONFAILURE); + } + rpcRetryCounter++; + }while( thisResult== 0 ); + setRPCInactive(); + + delete params; + + rpcStatus = thisResult->status; + + if( rpcStatus != 0 && rpcStatus != 1 ) + { + r_Error err; + + switch( rpcStatus ) + { + case 2: + err = r_Error( r_Error::r_Error_ObjectUnknown ); + break; + case 3: + err = r_Error( r_Error::r_Error_ClientUnknown ); + break; + default: + err = r_Error( r_Error::r_Error_TransferFailed ); + } + + XDRFREE(GetCollOIdsRes, thisResult); + + RMDBGEXIT(2, RMDebug::module_clientcomm, "RpcClientComm", "failed") + throw err; + } + + RMDBGMIDDLE(2, RMDebug::module_clientcomm, "RpcClientComm", "ok") + + // create the set + r_OId rOId( thisResult->oid ); + set = new ( r_Database::actual_database, r_Object::read, rOId ) r_Set< r_Ref< r_GMarray > >; + + set->set_type_by_name ( thisResult->typeName ); + set->set_type_structure( thisResult->typeStructure ); + set->set_object_name ( thisResult->collName ); + + // fill set with oids + if( rpcStatus == 0 ) + { + for( unsigned int i=0; i<thisResult->oidTable.oidTable_len; i++ ) + { + set->insert_element( r_Ref<r_GMarray>( r_OId( thisResult->oidTable.oidTable_val[i].oid ) ), 1 ); + RMDBGMIDDLE(2, RMDebug::module_clientcomm, "RpcClientComm", "contains oid #" << i << ":" << thisResult->oidTable.oidTable_val[i].oid) + } + } + + // now the transfer structure can be freed + XDRFREE(GetCollOIdsRes, thisResult); + + RMDBGEXIT(2, RMDebug::module_clientcomm, "RpcClientComm", "getCollOIdsByOId") + return r_Ref_Any( set->get_oid(), set ); +} + +int +RpcClientComm::createDB( const char* name ) + throw(r_Error) +{ + if(binding_h == NULL) { + RMInit::logOut << endl << "RpcClientComm::createDB(name) ERROR: CONNECTION TO SERVER ALREADY CLOSED" << endl << endl; + throw r_Error(CONNECTIONCLOSED); + } + + unsigned short rpcStatus=0; + unsigned short* rpcStatusPtr=0; + + setRPCActive(); + rpcRetryCounter = 0; + do + { + rpcStatusPtr = rpccreatedb_1( (char**)&name, binding_h ); + + if( !rpcStatusPtr ) + { + RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpccreatedb_1)" << endl << endl; + sleep(RMInit::clientcommSleep); + } + if (rpcRetryCounter > RMInit::clientcommMaxRetry) + { + RMInit::logOut << "RPC call 'rpccreatedb' failed" << endl; + throw r_Error(CLIENTCOMMUICATIONFAILURE); + } + rpcRetryCounter++; + }while( rpcStatusPtr == 0 ); + rpcStatus = *rpcStatusPtr; + setRPCInactive(); + + return rpcStatus; +} + + +int +RpcClientComm::destroyDB( const char* name ) + throw(r_Error) +{ + if(binding_h == NULL) { + RMInit::logOut << endl << "RpcClientComm::destroyDB(name) ERROR: CONNECTION TO SERVER ALREADY CLOSED" << endl << endl; + throw r_Error(CONNECTIONCLOSED); + } + + unsigned short rpcStatus=0; + unsigned short* rpcStatusPtr=0; + + setRPCActive(); + rpcRetryCounter = 0; + do + { + rpcStatusPtr = rpcdestroydb_1( (char**)&name, binding_h ); + + if( !rpcStatusPtr ) + { + RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpcdestroydb_1)" << endl << endl; + sleep(RMInit::clientcommSleep); + } + if (rpcRetryCounter > RMInit::clientcommMaxRetry) + { + RMInit::logOut << "RPC call 'rpcdestroydb' failed" << endl; + throw r_Error(CLIENTCOMMUICATIONFAILURE); + } + rpcRetryCounter++; + }while( rpcStatusPtr == 0 ); + rpcStatus = *rpcStatusPtr; + setRPCInactive(); + + return rpcStatus; +} + + +int +RpcClientComm::openDB( const char* database ) +{ + strcpy(dataBase,database); + + connectToServer(1); // this means read-only + + int answer=executeOpenDB(database); + + if(answer==0) answer=executeCloseDB(); + // else the DB is not open and makes ugly log output on the server + + disconnectFromServer(); + return answer; +} + +int +RpcClientComm::executeOpenDB( const char* database ) +{ + + if(binding_h == NULL) { + RMInit::logOut << endl << "RpcClientComm::executeOpenDB(database) ERROR: CONNECTION TO SERVER ALREADY CLOSED" << endl << endl; + return CONNECTIONCLOSED; + } + + RMDBGENTER(2, RMDebug::module_clientcomm, "RpcClientComm", "executeOpenDB(" << database << ")") + // Send first "I'm alive" signal in ALIVEINTERVAL seconds +#ifdef WIN32 + timerid = timeSetEvent(ALIVEINTERVAL * 1000, 0, TimerProc, 0, TIME_PERIODIC); +#else + alarm( ALIVEINTERVAL ); +#endif + OpenDBParams* params = new OpenDBParams; + OpenDBRes* thisResult = 0; + params->dbName = (char*)database; + params->userName = (char*)RMInit::userName; + params->capability = capability; + int* dummyParam = new int(0);// dummy + int* endianResult = NULL; + ServerVersionRes* versionResult = NULL; + + setRPCActive(); + versionResult = rpcgetserverversion_1( dummyParam, binding_h ); + RMDBGMIDDLE(2, RMDebug::module_clientcomm, "RpcClientComm", "server version " << versionResult->serverVersionNo << ", rpc version " << versionResult->rpcInterfaceVersionNo) + // don't forget to add 0.5, otherwise rounding errors! + serverRPCversion = (int)(1000.0 * versionResult->rpcInterfaceVersionNo + 0.5); + if (serverRPCversion != RPCVERSION) + { + RMDBGMIDDLE(2, RMDebug::module_clientcomm, "RpcClientComm", "RPC interface version mismatch: client (" << RPCVERSION/1000.0 << "), server (" << versionResult->rpcInterfaceVersionNo << ")") + RMInit::logOut << "Client Server Communication incompatible" << endl; + setRPCInactive(); + RMDBGEXIT(2, RMDebug::module_clientcomm, "RpcClientComm", "executeOpenDB(" << database << ") " << 4) + return 4; // servercomm::openDB creates codes 1-3. + } + + if(binding_h == NULL) { + RMInit::logOut << endl << "RpcClientComm::executeOpenDB(database) ERROR: CONNECTION TO SERVER ALREADY CLOSED" << endl << endl; + return CONNECTIONCLOSED; + } + + rpcRetryCounter = 0; + do + { + thisResult = rpcopendb_1( params, binding_h ); + + if( !thisResult ) + { + RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpcopendb_1)" << endl << endl; + sleep(RMInit::clientcommSleep); + } + if (rpcRetryCounter > RMInit::clientcommMaxRetry) + { + RMInit::logOut << "RPC call 'rpcopendb' failed" << endl; + throw r_Error(CLIENTCOMMUICATIONFAILURE); + } + rpcRetryCounter++; + }while( thisResult == 0 ); + + if(binding_h == NULL) { + RMInit::logOut << endl << "RpcClientComm::executeOpenDB(database) ERROR: CONNECTION TO SERVER ALREADY CLOSED" << endl << endl; + return CONNECTIONCLOSED; + } + + endianResult = rpcgetserverendian_1( dummyParam, binding_h ); + + setRPCInactive(); + + delete params; + delete dummyParam; + + clientID = thisResult->clientID; + endianServer = *endianResult; + //cout << "server endianness: " << endianServer << ", client: " << endianClient << endl; + + RMDBGEXIT(2, RMDebug::module_clientcomm, "RpcClientComm", "executeOpenDB(" << database << ") " << thisResult->status) + return thisResult->status; +} + + +int +RpcClientComm::closeDB() +{ + // We decided that it is not necessary to do anything for closeDB, the database is already closed by others + RMInit::logOut << "Fake closeDB" << endl; + + return 0;// answer; +} + +int +RpcClientComm::executeCloseDB() +{ + if(binding_h == NULL) { + RMInit::logOut << endl << "RpcClientComm::executeCloseDB(database) ERROR: CONNECTION TO SERVER ALREADY CLOSED" << endl << endl; + return CONNECTIONCLOSED; + } + + unsigned short rpcStatus=0; + unsigned short* rpcStatusPtr=0; + + // Suspend "I'm alive" signal +#ifdef __VISUALC__ + timeKillEvent(timerid); +#else + alarm( 0 ); +#endif + + setRPCActive(); + rpcRetryCounter = 0; + do + { + rpcStatusPtr = rpcclosedb_1( &clientID, binding_h ); + + if( !rpcStatusPtr ) + { + RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpcclosedb_1)" << endl << endl; + sleep(RMInit::clientcommSleep); + } + if (rpcRetryCounter > RMInit::clientcommMaxRetry) + { + RMInit::logOut << "RPC call 'rpcclosedb' failed" << endl; + throw r_Error(CLIENTCOMMUICATIONFAILURE); + } + rpcRetryCounter++; + }while( rpcStatusPtr == 0 ); + rpcStatus = *rpcStatusPtr; + setRPCInactive(); + + return rpcStatus; +} + +int +RpcClientComm::openTA( unsigned short readOnly ) + throw(r_Error) + { + int answer=0; + connectToServer(readOnly); + + answer=executeOpenDB(dataBase); + + if(answer==0) executeOpenTA(readOnly); + + //If there is an error CONNECTIONCLOSED, we report this, it is important to know + if(answer == CONNECTIONCLOSED) { + throw r_Error(CONNECTIONCLOSED); + } + + return answer; + } + +int +RpcClientComm::executeOpenTA( unsigned short readOnly ) +{ + if(binding_h == NULL) { + RMInit::logOut << endl << "RpcClientComm::executeOpenTA(database) ERROR: CONNECTION TO SERVER ALREADY CLOSED" << endl << endl; + return CONNECTIONCLOSED; + } + + unsigned short rpcStatus=0; + unsigned short* rpcStatusPtr=0; + int secsWaited = 0; + + BeginTAParams params; + params.clientID = clientID; + params.readOnly = readOnly; + params.capability = capability; + + setRPCActive(); + rpcRetryCounter = 0; + do + { + rpcStatusPtr = rpcbeginta_1( ¶ms, binding_h ); + + if( !rpcStatusPtr ) + { + RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpcbeginta_1)" << endl << endl; + sleep(RMInit::clientcommSleep); + } + if (rpcRetryCounter > RMInit::clientcommMaxRetry) + { + RMInit::logOut << "RPC call 'rpcbeginta' failed" << endl; + throw r_Error(CLIENTCOMMUICATIONFAILURE); + } + rpcRetryCounter++; + }while( rpcStatusPtr == 0 ); + + rpcStatus = *rpcStatusPtr; + +RMDBGIF(20, RMDebug::module_tools, "WAITRECEIVEDTILE", \ + RMInit::dbgOut << "Waiting 100 sec after receive tile\n" << std::endl; \ + sleep(100); \ + RMInit::dbgOut << "Continue now\n" << std::endl; ); + setRPCInactive(); + +/* not necessary with V5 + int sleepCntr = 1; + while( rpcStatus == 2 ) + { + if( sleepCntr == 64 ) sleepCntr /= 2; + RMInit::logOut << "Another transaction is already active, sleeping " << sleepCntr*2 << " secs..." << endl; + sleepCntr *= 2; + secsWaited += sleepCntr; +#ifndef __VISUALC__ + sleep( sleepCntr ); +#else + Sleep( sleepCntr*1000 ); +#endif + setRPCActive(); + do + { + rpcStatusPtr = rpcbeginta_1( ¶ms, binding_h ); + + if( !rpcStatusPtr ) + { + RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpcbeginta_1)" << endl << endl; + sleep(RMInit::clientcommSleep); + } + }while( rpcStatusPtr == 0 ); + rpcStatus = *rpcStatusPtr; + setRPCInactive(); + } +*/ + return rpcStatus; +} + +int +RpcClientComm::commitTA() + throw(r_Error) +{ + int answer=executeCommitTA(); + + if(answer==0) answer=executeCloseDB(); + + //If there is an error CONNECTIONCLOSED, we report this, it is important to know + if(answer == CONNECTIONCLOSED) { + throw r_Error(CONNECTIONCLOSED); + } + + disconnectFromServer(); + + return answer; +} + +int +RpcClientComm::executeCommitTA() +{ + if(binding_h == NULL) { + RMInit::logOut << endl << "RpcClientComm::executeCommitTA(database) ERROR: CONNECTION TO SERVER ALREADY CLOSED" << endl << endl; + return CONNECTIONCLOSED; + } + + unsigned short rpcStatus=0; + unsigned short* rpcStatusPtr=0; + + setRPCActive(); + rpcRetryCounter = 0; + do + { + rpcStatusPtr = rpccommitta_1( &clientID, binding_h ); + + if( !rpcStatusPtr ) { + { + RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpccommitta_1)" << endl << endl; + sleep(RMInit::clientcommSleep); + } + } + if (rpcRetryCounter > RMInit::clientcommMaxRetry) + { + RMInit::logOut << "RPC call 'rpccommitta' failed" << endl; + throw r_Error(CLIENTCOMMUICATIONFAILURE); + } + rpcRetryCounter++; + }while( rpcStatusPtr == 0 ); + rpcStatus = *rpcStatusPtr; + setRPCInactive(); + + return rpcStatus; +} + +int +RpcClientComm::abortTA() +{ + int answer=0; + try { + answer = executeAbortTA(); + + if(answer==0) answer=executeCloseDB(); + + disconnectFromServer(); + } + catch (r_Error& e) + { + RMInit::logOut << "RpcClientComm::abortTA() caught error: " << e.get_errorno() << " " << e.what() << std::endl; + answer = 1; + } + + //If there is an error CONNECTIONCLOSED, we ignore this, it is in abort + + return answer; +} + +int +RpcClientComm::executeAbortTA() +{ + if(binding_h == NULL) { + RMInit::logOut << endl << "RpcClientComm::executeAbortTA(database) ERROR: CONNECTION TO SERVER ALREADY CLOSED" << endl << endl; + return CONNECTIONCLOSED; + } + + unsigned short rpcStatus=0; + unsigned short* rpcStatusPtr=0; + + setRPCActive(); + rpcRetryCounter = 0; + do + { + rpcStatusPtr = rpcabortta_1( &clientID, binding_h ); + + if( !rpcStatusPtr ) { + { + RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpcabortta_1)" << endl << endl; + sleep(RMInit::clientcommSleep); + } + } + if (rpcRetryCounter > RMInit::clientcommMaxRetry) + { + RMInit::logOut << "RPC call 'rpcabortta' failed" << endl; + throw r_Error(CLIENTCOMMUICATIONFAILURE); + } + rpcRetryCounter++; + }while( rpcStatusPtr == 0 ); + rpcStatus = *rpcStatusPtr; + setRPCInactive(); + + return rpcStatus; +} + + +r_OId +RpcClientComm::getNewOId( unsigned short objType ) + throw(r_Error) +{ + if(binding_h == NULL) { + RMInit::logOut << endl << "RpcClientComm::getNewOId(objType) ERROR: CONNECTION TO SERVER ALREADY CLOSED" << endl << endl; + throw r_Error(CONNECTIONCLOSED); + } + + // update -> check for read_only transaction + if( r_Transaction::actual_transaction == 0 || + r_Transaction::actual_transaction->get_mode() == r_Transaction::read_only ) + { + r_Error err = r_Error( r_Error::r_Error_TransactionReadOnly ); + throw err; + } + + unsigned short rpcStatus = 0; + + NewOIdParams* params = new NewOIdParams; + OIdRes* thisResult = 0; + params->clientID = clientID; + params->objType = objType; + + setRPCActive(); + rpcRetryCounter = 0; + do + { + thisResult = rpcgetnewoid_1( params, binding_h ); + + if( !thisResult ) + { + RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpcgetnewoid_1)" << endl << endl; + sleep(RMInit::clientcommSleep); + } + if (rpcRetryCounter > RMInit::clientcommMaxRetry) + { + RMInit::logOut << "RPC call 'rpcgetnewoid' failed" << endl; + throw r_Error(CLIENTCOMMUICATIONFAILURE); + } + rpcRetryCounter++; + }while( thisResult == 0 ); + setRPCInactive(); + + delete params; + + rpcStatus = thisResult->status; + + r_OId oid( thisResult->oid ); + + // now the transfer structure of rpcgetcollbyname can be freed + XDRFREE(OIdRes, thisResult); + + if( rpcStatus != 0 ) + { + r_Error err; + + switch( rpcStatus ) + { + case 1: + err = r_Error( r_Error::r_Error_ClientUnknown ); + break; + case 2: + err = r_Error( r_Error::r_Error_CreatingOIdFailed ); + break; + default: + err = r_Error( r_Error::r_Error_TransferFailed ); + } + throw err; + } + + return oid; +} + + +unsigned short +RpcClientComm::getObjectType( const r_OId& oid ) + throw(r_Error) +{ + if(binding_h == NULL) { + RMInit::logOut << endl << "RpcClientComm::getObjectType(oid) ERROR: CONNECTION TO SERVER ALREADY CLOSED" << endl << endl; + throw r_Error(CONNECTIONCLOSED); + } + + OIdSpecParams* params = new OIdSpecParams; + ObjectTypeRes* thisResult = 0; + params->clientID = clientID; + params->oid = (char*)oid.get_string_representation(); + + setRPCActive(); + rpcRetryCounter = 0; + do + { + thisResult = rpcgetobjecttype_1( params, binding_h ); + + if( !thisResult ) + { + RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpcgetobjecttype_1)" << endl << endl; + sleep(RMInit::clientcommSleep); + } + if (rpcRetryCounter > RMInit::clientcommMaxRetry) + { + RMInit::logOut << "RPC call 'rpcgetobjexttype' failed" << endl; + throw r_Error(CLIENTCOMMUICATIONFAILURE); + } + rpcRetryCounter++; + }while( thisResult == 0 ); + setRPCInactive(); + + delete params; + + unsigned short rpcStatus = thisResult->status; + unsigned short objType = thisResult->objType; + + // now the transfer structure can be freed + XDRFREE(ObjectTypeRes, thisResult); + + if( rpcStatus != 0 ) + { + r_Error err; + + switch( rpcStatus ) + { + case 1: + err = r_Error( r_Error::r_Error_ClientUnknown ); + break; + case 2: + err = r_Error( r_Error::r_Error_ObjectUnknown ); + break; + default: + err = r_Error( r_Error::r_Error_TransferFailed ); + } + throw err; + } + + return objType; +} + + +char* +RpcClientComm::getTypeStructure( const char* typeName, r_Type_Type typeType ) + throw(r_Error) +{ + if(binding_h == NULL) { + RMInit::logOut << endl << "RpcClientComm::getTypeStructure(typeName, typeType) ERROR: CONNECTION TO SERVER ALREADY CLOSED" << endl << endl; + throw r_Error(CONNECTIONCLOSED); + } + + GetTypeStructureParams* params = new GetTypeStructureParams; + GetTypeStructureRes* thisResult = 0; + params->clientID = clientID; + params->typeName = (char*)typeName; + params->typeType = typeType; + + setRPCActive(); + rpcRetryCounter = 0; + do + { + thisResult = rpcgettypestructure_1( params, binding_h ); + + if( !thisResult ) + { + RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpcgettypestructure_1)" << endl << endl; + sleep(RMInit::clientcommSleep); + } + if (rpcRetryCounter > RMInit::clientcommMaxRetry) + { + RMInit::logOut << "RPC call 'rpcgettypestructure' failed" << endl; + throw r_Error(CLIENTCOMMUICATIONFAILURE); + } + rpcRetryCounter++; + }while( thisResult == 0 ); + setRPCInactive(); + + delete params; + + char* typeStructure = 0; + unsigned short rpcStatus = thisResult->status; + + if( rpcStatus == 0 ) + { + typeStructure = new char[strlen(thisResult->typeStructure) + 1]; + strcpy(typeStructure, thisResult->typeStructure); + // this has to be freed by rpc:thisResult->typeStructure = 0; + } + + // now the transfer structure can be freed + XDRFREE(GetTypeStructureRes, thisResult); + + if( rpcStatus != 0 ) + { + r_Error err; + + switch( rpcStatus ) + { + case 2: + err = r_Error( r_Error::r_Error_DatabaseClassUndefined ); + break; + default: + err = r_Error( r_Error::r_Error_TransferFailed ); + } + throw err; + } + + return typeStructure; +} + + +void +RpcClientComm::getMarRpcRepresentation( const r_GMarray* mar, RPCMarray*& rpcMarray, + r_Data_Format initStorageFormat, + const r_Base_Type *baseType) +{ + RMDBGENTER(2, RMDebug::module_clientcomm, "RpcClientComm", "getMarRpcRepresentation(...)"); + + // allocate memory for the RPCMarray data structure and assign its fields + rpcMarray = (RPCMarray*)mymalloc( sizeof(RPCMarray) ); + rpcMarray->domain = mar->spatial_domain().get_string_representation(); + rpcMarray->cellTypeLength = mar->get_type_length(); + + void* arrayData = NULL; + r_ULong arraySize=0; + + if (initStorageFormat == r_Array) + { + if (transferFormat != r_Array) + { + r_Tile_Compression *engine = NULL; + + try + { + r_Storage_Man_CPP sman; + engine = r_Tile_Compression::create(transferFormat, mar->spatial_domain(), baseType); + engine->set_storage_handler(sman); + RMDBGMIDDLE(2, RMDebug::module_clientcomm, "RpcClientComm", "getMarRpcRepresentation(...) compress with " << engine->get_name()) + if ((endianClient != endianServer) && (!engine->converts_endianness())) + { + RMDBGMIDDLE( 2, RMDebug::module_clientcomm, "RpcClientComm", "getMarRpcRepresentation(...) for " + << transferFormat << " endianness changed before compression from " << (r_Endian::r_Endianness)endianClient + << " to " << (r_Endian::r_Endianness) endianServer); + char *endianData = new char[mar->get_array_size()]; + changeEndianness(mar, endianData, baseType); + arrayData = engine->compress(endianData, arraySize, transferFormatParams); + delete [] endianData; + endianData=NULL; + } + else + { + arrayData = engine->compress(mar->get_array(), arraySize, transferFormatParams); + } + RMDBGMIDDLE(2, RMDebug::module_clientcomm, "RpcClientComm", "compression returned " << arrayData << " (" << arraySize << " bytes)") + /*void *testData = engine->decompress(arrayData, arraySize); + cout << "Decompression worked " << ((memcmp(mar->get_array(), testData, (mar->get_type_length()) * (mar->spatial_domain().cell_count())) == 0) ? "OK" : "!NOT!") << endl; + delete [] testData;*/ + + // ForWiss: revert to uncompressed data if the compressed data is larger + // coman: and introduced a bug of endianess ... + if (arraySize > mar->get_array_size()) + { + RMInit::logOut << "RpcClientComm::getMarRpcRepresentation(...) Warning: overriding compression setting(" + << transferFormat << ") to " << r_Array + << " because compressed size( " << arraySize + << " bytes) > uncompressed size( " << mar->get_array_size() << " bytes)" << endl; + delete [] arrayData; + arrayData = NULL; + } + } + catch (r_Error &err) + { + RMInit::logOut << "RpcClientComm::getMarRpcRepresentation(...) Error: Unsupported data format " << transferFormat << endl; + } + if (engine != NULL) + delete engine; + } + else + { + if (endianClient != endianServer) + { + RMDBGMIDDLE( 2, RMDebug::module_clientcomm, "RpcClientComm", "getMarRpcRepresentation(...) for " + << transferFormat << " endianness changed from " + << (r_Endian::r_Endianness)endianClient << " to " << (r_Endian::r_Endianness) endianServer); + arraySize = mar->get_array_size(); + arrayData = new char[arraySize]; + changeEndianness(mar, arrayData, baseType); + } + } + } + + if (arrayData == NULL) + { + //error in compression or compression inefficient + rpcMarray->currentFormat = initStorageFormat; + rpcMarray->data.confarray_len = mar->get_array_size(); + if (endianClient != endianServer) + { + RMDBGMIDDLE( 2, RMDebug::module_clientcomm, "RpcClientComm", "getMarRpcRepresentation(...) for " + << transferFormat << "endianness changed from " + << (r_Endian::r_Endianness)endianClient << " to " << (r_Endian::r_Endianness) endianServer + << " because compression " << transferFormat << " failed" ); + arrayData = new char[arraySize]; + changeEndianness(mar, arrayData, baseType); + rpcMarray->data.confarray_val = (char*)(arrayData); + } + else + { + rpcMarray->data.confarray_val = (char*)(mar->get_array()); + } + } + else + { + if (arraySize != mar->get_array_size()) + { + RMDBGMIDDLE(2, RMDebug::module_clientcomm, "RpcClientComm", "compressed to " << (100.0*arraySize) / mar->get_array_size() << "%") + } + rpcMarray->currentFormat = transferFormat; + rpcMarray->data.confarray_len = arraySize; + rpcMarray->data.confarray_val = (char*)arrayData; + } + rpcMarray->storageFormat = storageFormat; + + RMDBGEXIT(2, RMDebug::module_clientcomm, "RpcClientComm", "getMarRpcRepresentation(...)"); +} + + +void +RpcClientComm::freeMarRpcRepresentation( const r_GMarray* mar, RPCMarray* rpcMarray ) +{ + if (rpcMarray->data.confarray_val != ((r_GMarray*)mar)->get_array()) + { + delete [] rpcMarray->data.confarray_val; + } + free( rpcMarray->domain ); + free( rpcMarray ); +} + +void +RpcClientComm::getMDDCollection( r_Set< r_Ref_Any >& mddColl, unsigned int isQuery ) + throw(r_Error) +{ + if(binding_h == NULL) { + RMInit::logOut << endl << "RpcClientComm::getMDDCollection(mddColl, isQuery) ERROR: CONNECTION TO SERVER ALREADY CLOSED" << endl << endl; + throw r_Error(CONNECTIONCLOSED); + } + RMDBGENTER(2, RMDebug::module_clientcomm, "RpcClientComm", "getMDDCollection(...)") + unsigned short tileStatus=0; + unsigned short mddStatus = 0; +// r_Minterval mddDomain; + + RMInit::logOut << endl; + + while( mddStatus == 0 ) // repeat until all MDDs are transferred + { + r_Ref<r_GMarray> mddResult; + + GetMDDRes* thisResult = 0; + + // Get spatial domain of next MDD + setRPCActive(); + rpcRetryCounter = 0; + do + { + thisResult = rpcgetnextmdd_1( &clientID, binding_h ); + + if( !thisResult ) + { + RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpcgetnextmdd_1)" << endl << endl; + sleep(RMInit::clientcommSleep); + } + if (rpcRetryCounter > RMInit::clientcommMaxRetry) + { + RMInit::logOut << "RPC call 'rpcgetnextmdd' failed" << endl; + throw r_Error(CLIENTCOMMUICATIONFAILURE); + } + rpcRetryCounter++; + }while( thisResult == 0 ); + setRPCInactive(); + + mddStatus = thisResult->status; + + RMDBGMIDDLE(2, RMDebug::module_clientcomm, "RpcClientComm", "read MDD") + + if( mddStatus == 2 ) + { + RMInit::logOut << "Error: getMDDCollection(...) - no transfer collection or empty transfer collection" << endl; + throw r_Error( r_Error::r_Error_TransferFailed ); + } + else + tileStatus = 0 ? 10 : 0; + + // create r_Minterval + // mddDomain = r_Minterval( thisResult->domain ); + + tileStatus = getMDDCore(mddResult, thisResult, isQuery); + + // finally, insert the r_Marray into the set + + mddColl.insert_element( mddResult, 1 ); + + if( tileStatus == 0 ) // if this is true, we're done with this collection + break; + + RMDBGMIDDLE(2, RMDebug::module_clientcomm, "RpcClientComm", "ok"); + + } // end while( mddStatus == 0 ) + + if(binding_h == NULL) { + RMInit::logOut << endl << "RpcClientComm::getMDDCollection(mddColl, isQuery) ERROR: CONNECTION TO SERVER ALREADY CLOSED" << endl << endl; + throw r_Error(CONNECTIONCLOSED); + } + + unsigned short* rpcStatusPtr = 0; + + setRPCActive(); + rpcRetryCounter = 0; + do + { + rpcStatusPtr = rpcendtransfer_1( &clientID, binding_h ); + + if( !rpcStatusPtr ) + { + RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpcendtransfer_1)" << endl << endl; + sleep(RMInit::clientcommSleep); + } + if (rpcRetryCounter > RMInit::clientcommMaxRetry) + { + RMInit::logOut << "RPC call 'rpcendtransfer' failed" << endl; + throw r_Error(CLIENTCOMMUICATIONFAILURE); + } + rpcRetryCounter++; + }while( rpcStatusPtr == 0 ); + setRPCInactive(); +} + + + +void +RpcClientComm::getElementCollection( r_Set< r_Ref_Any >& resultColl ) + throw(r_Error) +{ + if(binding_h == NULL) { + RMInit::logOut << endl << "RpcClientComm::getElementCollection(resultColl) ERROR: CONNECTION TO SERVER ALREADY CLOSED" << endl << endl; + throw r_Error(CONNECTIONCLOSED); + } + unsigned short rpcStatus = 0; + + RMInit::logOut << endl << " got set of type " << resultColl.get_type_structure() << endl; + + while( rpcStatus == 0 ) // repeat until all elements are transferred + { + GetElementRes* thisResult = 0; + + if(binding_h == NULL) { + RMInit::logOut << endl << "RpcClientComm::getElementCollection(resultColl) ERROR: CONNECTION TO SERVER ALREADY CLOSED" << endl << endl; + throw r_Error(CONNECTIONCLOSED); + } + + setRPCActive(); + rpcRetryCounter = 0; + do + { + thisResult = rpcgetnextelement_1( &clientID, binding_h ); + + if( !thisResult ) + { + RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpcgetnextelement_1)" << endl << endl; + sleep(RMInit::clientcommSleep); + } + if (rpcRetryCounter > RMInit::clientcommMaxRetry) + { + RMInit::logOut << "RPC call 'rpcgetnextelement' failed" << endl; + throw r_Error(CLIENTCOMMUICATIONFAILURE); + } + rpcRetryCounter++; + }while( thisResult == 0 ); + setRPCInactive(); + + rpcStatus = thisResult->status; + + if( rpcStatus == 2 ) + { + RMInit::logOut << "Error: getElementCollection(...) - no transfer collection or empty transfer collection" << endl; + throw r_Error( r_Error::r_Error_TransferFailed ); + } + + // create new collection element, use type of collection resultColl + r_Ref_Any element; + const r_Type* elementType = resultColl.get_element_type_schema(); + + // convert the endianness before creating the new element! + if (endianClient != endianServer) + { + if (endianClient == 0) + elementType->convertToBigEndian(thisResult->data.confarray_val, 1); + else + elementType->convertToLittleEndian(thisResult->data.confarray_val, 1); + } + + switch( elementType->type_id() ) + { + case r_Type::BOOL: + case r_Type::CHAR: + case r_Type::OCTET: + case r_Type::SHORT: + case r_Type::USHORT: + case r_Type::LONG: + case r_Type::ULONG: + case r_Type::FLOAT: + case r_Type::DOUBLE: + { + element = new r_Primitive( thisResult->data.confarray_val, (r_Primitive_Type*) elementType ); + r_Transaction::actual_transaction->add_object_list( r_Transaction::SCALAR, (void*) element ); + } + break; + + case r_Type::COMPLEXTYPE1: + case r_Type::COMPLEXTYPE2: + element = new r_Complex(thisResult->data.confarray_val, (r_Complex_Type *)elementType); + r_Transaction::actual_transaction->add_object_list(r_Transaction::SCALAR, (void *)element); + break; + + case r_Type::STRUCTURETYPE: + { + element = new r_Structure( thisResult->data.confarray_val, (r_Structure_Type*) elementType ); + r_Transaction::actual_transaction->add_object_list( r_Transaction::SCALAR, (void*) element ); + } + break; + + case r_Type::POINTTYPE: + { + char* stringRep = new char[thisResult->data.confarray_len+1]; + strncpy( stringRep, thisResult->data.confarray_val, thisResult->data.confarray_len ); + stringRep[thisResult->data.confarray_len] = '\0'; + + r_Point* typedElement = new r_Point( stringRep ); + element = typedElement; + r_Transaction::actual_transaction->add_object_list( r_Transaction::POINT, (void*) typedElement ); + delete [] stringRep; + } + break; + + case r_Type::SINTERVALTYPE: + { + char* stringRep = new char[thisResult->data.confarray_len+1]; + strncpy( stringRep, thisResult->data.confarray_val, thisResult->data.confarray_len ); + stringRep[thisResult->data.confarray_len] = '\0'; + r_Sinterval* typedElement = new r_Sinterval( stringRep ); + element = typedElement; + r_Transaction::actual_transaction->add_object_list( r_Transaction::SINTERVAL, (void*) typedElement ); + delete [] stringRep; + } + break; + + case r_Type::MINTERVALTYPE: + { + char* stringRep = new char[thisResult->data.confarray_len+1]; + strncpy( stringRep, thisResult->data.confarray_val, thisResult->data.confarray_len ); + stringRep[thisResult->data.confarray_len] = '\0'; + + r_Minterval* typedElement = new r_Minterval( stringRep ); + element = typedElement; + r_Transaction::actual_transaction->add_object_list( r_Transaction::MINTERVAL, (void*) typedElement ); + delete [] stringRep; + } + break; + + case r_Type::OIDTYPE: + { + char* stringRep = new char[thisResult->data.confarray_len+1]; + strncpy( stringRep, thisResult->data.confarray_val, thisResult->data.confarray_len ); + stringRep[thisResult->data.confarray_len] = '\0'; + + r_OId* typedElement = new r_OId( stringRep ); + element = typedElement; + r_Transaction::actual_transaction->add_object_list( r_Transaction::OID, (void*) typedElement ); + delete [] stringRep; + } + break; + default: + RMDBGENTER(2, RMDebug::module_clientcomm, "RpcClientComm", "getElementCollection(...) bad element typeId" << elementType->type_id()) + break; + } + + + RMInit::logOut << " got element" << endl; + + // now the transfer structure of rpcgetnextmdd can be freed + XDRFREE(GetElementRes, thisResult); + + // insert element into result set + resultColl.insert_element( element, 1 ); + } + + + if(binding_h == NULL) { + RMInit::logOut << endl << "RpcClientComm::getElementCollection(resultColl) ERROR: CONNECTION TO SERVER ALREADY CLOSED" << endl << endl; + throw r_Error(CONNECTIONCLOSED); + } + + unsigned short* rpcStatusPtr = 0; + + setRPCActive(); + rpcRetryCounter = 0; + do + { + rpcStatusPtr = rpcendtransfer_1( &clientID, binding_h ); + + if( !rpcStatusPtr ) + { + RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpcendtransfer_1)" << endl << endl; + sleep(RMInit::clientcommSleep); + } + if (rpcRetryCounter > RMInit::clientcommMaxRetry) + { + RMInit::logOut << "RPC call 'rpcendtransfer' failed" << endl; + throw r_Error(CLIENTCOMMUICATIONFAILURE); + } + rpcRetryCounter++; + }while( rpcStatusPtr == 0 ); + setRPCInactive(); +} + +unsigned short +RpcClientComm::getMDDCore( r_Ref< r_GMarray > &mdd, GetMDDRes *thisResult, unsigned int isQuery ) + throw( r_Error ) +{ + // create r_Minterval and oid + r_Minterval mddDomain( thisResult->domain ); + r_OId rOId ( thisResult->oid ); + r_GMarray *marray; + + //cout << "getMDDCore..." << endl; + if( isQuery ) + marray = new( r_Database::actual_database, r_Object::transient, rOId ) r_GMarray(); + else + marray = new( r_Database::actual_database, r_Object::read , rOId ) r_GMarray(); + + marray->set_spatial_domain( mddDomain ); + marray->set_type_by_name ( thisResult->typeName ); + marray->set_type_structure( thisResult->typeStructure ); + + r_Data_Format currentFormat = (r_Data_Format)(thisResult->currentFormat); + if (r_Tile_Compression::check_data_format(currentFormat) == 1) + currentFormat = r_Array; + marray->set_current_format( currentFormat ); + + r_Data_Format decompFormat; + + const r_Base_Type *baseType = marray->get_base_type_schema(); + + // now the transfer structure of rpcgetnextmdd can be freed + XDRFREE(GetMDDRes, thisResult); + + //RMDBGOUTFLUSH( 2, "domain " << mddDomain << " ... " ) + + // Variables needed for tile transfer + GetTileRes* tileRes=0; + unsigned short mddDim = mddDomain.dimension(); // we assume that each tile has the same dimensionality as the MDD + r_Minterval tileDomain; + r_GMarray* tile; // for temporary tile + char* memCopy; + unsigned long memCopyLen; + int tileCntr = 0; + unsigned short tileStatus = 0; + + tileStatus = 2; // call rpcgetnexttile_1 at least once + + while( tileStatus == 2 || tileStatus == 3 ) // while( for all tiles of the current MDD ) + { + + if(binding_h == NULL) { + RMInit::logOut << endl << "RpcClientComm::getMDDCore(mdd, thisResult, isQuery) ERROR: CONNECTION TO SERVER ALREADY CLOSED" << endl << endl; + throw r_Error(CONNECTIONCLOSED); + } + + setRPCActive(); + rpcRetryCounter = 0; + do + { + tileRes = rpcgetnexttile_1( &clientID, binding_h ); + + if( !tileRes ) + { + RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpcgetnexttile_1)" << endl << endl; + sleep(RMInit::clientcommSleep); + } + if (rpcRetryCounter > RMInit::clientcommMaxRetry) + { + RMInit::logOut << "RPC call 'rpcgetnexttile' failed" << endl; + throw r_Error(CLIENTCOMMUICATIONFAILURE); + } + rpcRetryCounter++; + } while( tileRes == 0 ); +RMDBGIF(20, RMDebug::module_tools, "WAITRECEIVEDTILE", \ + RMInit::dbgOut << "Waiting 100 sec after receive tile\n" << std::endl; \ + sleep(100); \ + RMInit::dbgOut << "Continue now\n" << std::endl; ); + setRPCInactive(); + + + tileStatus = tileRes->status; + + if( tileStatus == 4 ) + { + XDRFREE(GetTileRes, tileRes); + + RMInit::logOut << "Error: rpcGetNextTile(...) - no tile to transfer or empty transfer collection" << endl; + throw r_Error( r_Error::r_Error_TransferFailed ); + } + + // take cellTypeLength for current MDD of the first tile + if( tileCntr == 0 ) + marray->set_type_length( tileRes->marray->cellTypeLength ); + + tileDomain = r_Minterval( tileRes->marray->domain ); + memCopyLen = tileDomain.cell_count() * marray->get_type_length(); // cell type length of the tile must be the same + if (memCopyLen < tileRes->marray->data.confarray_len) + memCopyLen = tileRes->marray->data.confarray_len; // may happen when compression expands + memCopy = new char[ memCopyLen ]; + + // create temporary tile + tile = new r_GMarray(); + tile->set_spatial_domain( tileDomain ); + tile->set_array( memCopy ); + tile->set_array_size( memCopyLen ); + tile->set_type_length( tileRes->marray->cellTypeLength ); + tileCntr++; + + // Variables needed for block transfer of a tile + unsigned long blockOffset = 0; + unsigned short subStatus = 3; + currentFormat = (r_Data_Format)(tileRes->marray->currentFormat); + + switch( tileStatus ) + { + case 3: // at least one block of the tile is left + + // Tile arrives in several blocks -> put them together + concatArrayData(tileRes->marray->data.confarray_val, tileRes->marray->data.confarray_len, memCopy, memCopyLen, blockOffset); + XDRFREE(GetTileRes, tileRes); + + while( subStatus == 3 ) + { + + if(binding_h == NULL) { + RMInit::logOut << endl << "RpcClientComm::getMDDCore(mdd, thisResult, isQuery) ERROR: CONNECTION TO SERVER ALREADY CLOSED" << endl << endl; + throw r_Error(CONNECTIONCLOSED); + } + + setRPCActive(); + rpcRetryCounter = 0; + do + { + tileRes = rpcgetnexttile_1( &clientID, binding_h ); + + if( !tileRes ) + { + RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpcgetnexttile_1)" << endl << endl; + sleep(RMInit::clientcommSleep); + } + if (rpcRetryCounter > RMInit::clientcommMaxRetry) + { + RMInit::logOut << "RPC call 'rpcgetnexttile' failed" << endl; + throw r_Error(CLIENTCOMMUICATIONFAILURE); + } + rpcRetryCounter++; + } while( tileRes == 0 ); + setRPCInactive(); + + subStatus = tileRes->status; + + if( subStatus == 4 ) + { + XDRFREE(GetTileRes, tileRes); + + RMInit::logOut << "Error: rpcGetNextTile(...) - no tile to transfer or empty transfer collection" << endl; + throw r_Error( r_Error::r_Error_TransferFailed ); + } + + // RMInit::logOut << "Status: " << subStatus << endl; + // RMInit::logOut << "BlockOffset: " << blockOffset << " Size: " << tileRes->marray->data.confarray_len << endl; + concatArrayData(tileRes->marray->data.confarray_val, tileRes->marray->data.confarray_len, memCopy, memCopyLen, blockOffset); + XDRFREE(GetTileRes, tileRes); + } + + tileStatus = subStatus; + break; + + default: // tileStatus = 0,3 last block of the current tile + + // Tile arrives as one block. + concatArrayData(tileRes->marray->data.confarray_val, tileRes->marray->data.confarray_len, memCopy, memCopyLen, blockOffset); + // RMInit::logOut << "Internal size: " << tileRes->marray->data.confarray_len << endl; + + XDRFREE(GetTileRes, tileRes); + break; + } + + // uncompress the tile if necessary + decompFormat = doTransferDecompression( tile, baseType, currentFormat, blockOffset ); + + char* marrayData = NULL; + // Now the tile is transferred completely, insert it into current MDD + if( tileStatus < 2 && tileCntr == 1 && (tile->spatial_domain() == marray->spatial_domain())) + { + // MDD consists of just one tile that is the same size of the mdd + + // simply take the data memory of the tile + marray->set_array( tile->get_array() ); + marray->set_array_size( tile->get_array_size() ); + tile->set_array( 0 ); + } + else + { + // MDD consists of more than one tile or the tile does not cover the whole domain + + r_Bytes size = mddDomain.cell_count() * marray->get_type_length(); + + if( tileCntr == 1 ) + { + // allocate memory for the MDD + marrayData = new char[ size ]; + memset(marrayData, 0, size); + + marray->set_array( marrayData ); + } + else + marrayData = marray->get_array(); + + + // copy tile data into MDD data space (optimized, relying on the internal representation of an MDD ) + char* mddBlockPtr; + char* tileBlockPtr = tile->get_array(); + unsigned long blockCells = tileDomain[tileDomain.dimension()-1].high()-tileDomain[tileDomain.dimension()-1].low()+1; + unsigned long blockSize = blockCells * marray->get_type_length(); + unsigned long blockNo = tileDomain.cell_count() / blockCells; + + for( unsigned long blockCtr = 0; blockCtr < blockNo; blockCtr++ ) + { + mddBlockPtr = marrayData + marray->get_type_length()*mddDomain.cell_offset( tileDomain.cell_point( blockCtr * blockCells ) ); + memcpy( (void*)mddBlockPtr, (void*)tileBlockPtr, (size_t)blockSize ); + tileBlockPtr += blockSize; + } + + // former non-optimized version + // for( i=0; i<tileDomain->cell_count(); i++ ) + // (*marray)[tileDomain->cell_point( i )] = (*tile)[tileDomain->cell_point( i )]; + + marray->set_array_size( size ); + } + + // delete temporary tile + delete tile; + + } // end while( MDD is not transferred completely ) + + + mdd = r_Ref<r_GMarray>( marray->get_oid(), marray ); + + return tileStatus; +} + +int RpcClientComm::concatArrayData( const char *source, unsigned long srcSize, char *&dest, unsigned long &destSize, unsigned long &destLevel ) +{ + if (destLevel + srcSize > destSize) + { + // need to extend dest + unsigned long newSize = destLevel + srcSize; + char *newArray; + + // allocate a little extra if we have to extend + newSize = newSize + newSize / 16; + +// RMDBGOUT( 1, "RpcClientComm::concatArrayData(): need to extend from " << destSize << " to " << newSize ); + + if ((newArray = new char[newSize]) == NULL) + return -1; + + memcpy(newArray, dest, destLevel); + delete [] dest; + dest = newArray; + destSize = newSize; + } + + memcpy(dest + destLevel, source, srcSize); + destLevel += srcSize; + + return 0; +} + + +void RpcClientComm::triggerAliveSignal() +{ + aliveSignalRemaining = 1; + + sendAliveSignal(); +} + + +void RpcClientComm::sendAliveSignal() +{ + if( aliveSignalRemaining && !checkRPCActive() ) + { + aliveSignalRemaining = 0; + + unsigned long myID = getClientID(); + + // tell the server I'm alive + + // determine my binding handle + CLIENT* myHandle = getBindingHandle(); + + unsigned short* rpcStatusPtr = 0; + + setRPCActive(); + rpcRetryCounter = 0; + do + { + rpcStatusPtr = rpcalive_1( &myID, myHandle ); + + if( !rpcStatusPtr ) + { + RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpcalive_1)" << endl << endl; + sleep(RMInit::clientcommSleep); + } + if (rpcRetryCounter > RMInit::clientcommMaxRetry) + { + RMInit::logOut << "RPC call 'rpcalive' failed" << endl; + throw r_Error(CLIENTCOMMUICATIONFAILURE); + } + rpcRetryCounter++; + }while( rpcStatusPtr == 0 ); + setRPCInactive(); + RMDBGONCE(3, RMDebug::module_clientcomm, "RpcClientComm", "sent alive signal") + +#ifdef __VISUALC__ + timeKillEvent(timerid); + timerid = timeSetEvent(ALIVEINTERVAL * 1000, 0, TimerProc, NULL, TIME_PERIODIC); +#else + // Re-initialize the signal handler to point to this function + signal( SIGALRM, aliveSignal ); + + // Reset the alarm + alarm( ALIVEINTERVAL ); +#endif + } +} + + +int RpcClientComm::setTransferFormat( r_Data_Format format, const char* formatParams ) +{ + transferFormat = format; + + if (transferFormatParams != NULL) + { + free(transferFormatParams); + transferFormatParams = NULL; + } + if (formatParams != NULL) + { + transferFormatParams = (char*)mymalloc(strlen(formatParams)+1); + strcpy(transferFormatParams, formatParams); + + // extract ``exactformat'' if present + clientParams->process(transferFormatParams); + } + + SetServerTransferParams* params = new SetServerTransferParams; + + params->clientID = getClientID(); + params->format = (unsigned short)format; + if (transferFormatParams == NULL) + params->formatParams = ""; + else + params->formatParams = transferFormatParams; + + CLIENT* myHandle = getBindingHandle(); + + unsigned short* rpcStatusPtr = 0; + + setRPCActive(); + rpcRetryCounter = 0; + do + { + rpcStatusPtr = rpcsetservertransfer_1( params, myHandle ); + + if (!rpcStatusPtr) + { + RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpcsetservertransfer_1)" << endl << endl; + sleep(RMInit::clientcommSleep); + } + if (rpcRetryCounter > RMInit::clientcommMaxRetry) + { + RMInit::logOut << "RPC call 'rpcsetservertransfer' failed" << endl; + throw r_Error(CLIENTCOMMUICATIONFAILURE); + } + rpcRetryCounter++; + } + while ( rpcStatusPtr == 0 ); + setRPCInactive(); + + delete params; + + return (int)(*rpcStatusPtr); +} + + +int RpcClientComm::setStorageFormat( r_Data_Format format, const char *formatParams ) +{ + storageFormat = format; + + if (storageFormatParams != NULL) + { + free(storageFormatParams); + storageFormatParams = NULL; + } + if (formatParams != NULL) + { + storageFormatParams = (char*)mymalloc(strlen(formatParams) + 1); + strcpy(storageFormatParams, formatParams); + // extract ``compserver'' if present + clientParams->process(storageFormatParams); + } + + SetServerTransferParams *params = new SetServerTransferParams; + + params->clientID = getClientID(); + params->format = (unsigned short)format; + if (storageFormatParams == NULL) + params->formatParams = ""; + else + params->formatParams = storageFormatParams; + + CLIENT *myHandle = getBindingHandle(); + + unsigned short *rpcStatusPtr = 0; + + setRPCActive(); + rpcRetryCounter = 0; + do + { + rpcStatusPtr = rpcsetserverstorage_1( params, myHandle ); + if( !rpcStatusPtr ) + { + RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpcsetserverstorage_1)" << endl << endl; + sleep(RMInit::clientcommSleep); + } + if (rpcRetryCounter > RMInit::clientcommMaxRetry) + { + RMInit::logOut << "RPC call 'rpcsetserverstorage' failed" << endl; + throw r_Error(CLIENTCOMMUICATIONFAILURE); + } + rpcRetryCounter++; + } + while ( rpcStatusPtr == NULL ); + setRPCInactive(); + + delete params; + + return (int)(*rpcStatusPtr); +} + + +void +RpcClientComm::setRPCActive() +{ + rpcActive = 1; +} + + + +void +RpcClientComm::setRPCInactive() +{ + rpcActive = 0; + sendAliveSignal(); +} + + +int +RpcClientComm::checkRPCActive() +{ + return rpcActive; +} + +const char * +RpcClientComm::getExtendedErrorInfo() + throw(r_Error) +{ + if(binding_h == NULL) { + RMInit::logOut << endl << "RpcClientComm::getMDDCollection(mddColl, isQuery) ERROR: CONNECTION TO SERVER ALREADY CLOSED" << endl << endl; + throw r_Error(CONNECTIONCLOSED); + } + + static char *errorInfo = NULL; + + GetExtendedErrorInfo *result=NULL; + + int dummy; + + if(errorInfo) delete[] errorInfo; + + setRPCActive(); + + rpcRetryCounter = 0; + do{ + + result = rpcgeterrorinfo_1(&dummy, binding_h); + if( !result ) + { + RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpcalive_1)" << endl << endl; + sleep(RMInit::clientcommSleep); + } + if (rpcRetryCounter > RMInit::clientcommMaxRetry) + { + RMInit::logOut << "RPC call 'rpcgeterrorinfo' failed" << endl; + throw r_Error(CLIENTCOMMUICATIONFAILURE); + } + rpcRetryCounter++; + + }while(!result); + + setRPCInactive(); + + errorInfo = new char[strlen(result->errorText)+1]; + strcpy(errorInfo,result->errorText); + + return errorInfo; +} + + +#define MAXMSG 512 + +int RpcClientComm::readWholeMessage(int socket,char *destBuffer,int buffSize) + { + // we read what is comming in until we encounter a '\0' + // this is our end-sign. + int totalLength=0; + int redNow; + while(1) + { + redNow = read(socket,destBuffer+totalLength,buffSize-totalLength); + if(redNow == -1) + { if(errno == EINTR) continue; // read was interrupted by signal + + return -1; // another error + } + totalLength+=redNow; + + if(destBuffer[totalLength-1]==0) break; // THE END + } + return totalLength; + } + +int RpcClientComm::writeWholeMessage(int socket,char *destBuffer,int buffSize) + { + // we write the whole message, including the ending '\0', which is already in + // the buffSize provided by the caller + int totalLength=0; + int writeNow; + while(1) + { + writeNow = write(socket,destBuffer+totalLength,buffSize-totalLength); + if(writeNow == -1) + { if(errno == EINTR) continue; // read was interrupted by signal + + return -1; // another error + } + totalLength+=writeNow; + + if( totalLength==buffSize ) break; // THE END + } + return totalLength; + } + +void +RpcClientComm::setMaxRetry(unsigned int newMaxRetry) + { + RMInit::clientcommMaxRetry = newMaxRetry; + } + +unsigned int +RpcClientComm::getMaxRetry() + { + return RMInit::clientcommMaxRetry; + } + + +static void pause(int retryCount) + { + unsigned int milisec = 50 + retryCount * 50; + if(milisec > 1000) milisec = 1000; + + timeval tv; + tv.tv_sec = milisec / 1000; + tv.tv_usec = milisec * 1000; + + select(0,NULL,NULL,NULL,&tv); + } + +int +RpcClientComm::getFreeServer(unsigned short readOnly) + { + //RMInit::logOut << "getFreeServer in"<<endl; + for(int retryCount=0;;retryCount++) + { + try + { + executeGetFreeServer(readOnly); + + // if no error, we have the server, so break + break; + } + catch(r_Error &e) + { + int errorno = e.get_errorno(); + //cerr<<"errorno="<<errorno; + if(( errorno==801 || errorno==805 || errorno==806) && retryCount < RMInit::clientcommMaxRetry) + { //cerr<<" retry="<<retryCount<<endl; + RMInit::logOut << "Connection to RasDaMan failed with " << errorno << ": retry " << retryCount << endl; + pause(retryCount); + } + else throw; + } + } + //RMInit::logOut << "getFreeServer out"<<endl; + return 1; + } +int +RpcClientComm::executeGetFreeServer(unsigned short readOnly) + { + static char myRasmgrID[100]=""; + if(myRasmgrID[0]==0) + { unsigned int hostid = gethostid(); + unsigned int pid = getpid(); + sprintf(myRasmgrID,"%u:%u",hostid,pid); + } + + char message[MAXMSG]; + char header[MAXMSG]; + char body[MAXMSG]; + sprintf(header,"POST getfreeserver HTTP/1.1\r\nAccept: text/plain\r\nUserAgent: RasClient/1.0\r\nAuthorization: ras %s\r\nContent-length:",identificationString); + sprintf(body,"%s RPC %s %s",dataBase,(readOnly ? "ro":"rw"), myRasmgrID); + sprintf(message,"%s %d\r\n\r\n%s",header,strlen(body)+1,body); + + struct protoent* getprotoptr = getprotobyname("tcp"); + + struct hostent *hostinfo = gethostbyname(rasmgrHost); + if(hostinfo==NULL) + { RMInit::logOut << "Error locating RasMGR" << rasmgrHost <<" ("<<strerror(errno)<<')'<<endl; + throw r_Error( r_Error::r_Error_ServerInvalid ); + } + + sockaddr_in internetSocketAddress; + + internetSocketAddress.sin_family=AF_INET; + internetSocketAddress.sin_port=htons(rasmgrPort); + internetSocketAddress.sin_addr=*(struct in_addr*)hostinfo->h_addr; + + + int sock; + bool ok = false; + int retry; + for(retry=0;retry<RMInit::clientcommMaxRetry * 40 ;retry++) // this has to be 5000 or so, now that counter is 120 default (later we'll make this better) + { + sock=socket(PF_INET,SOCK_STREAM,getprotoptr->p_proto); + //cout<<"Socket="<<sock<<" protocol(tcp)="<<getprotoptr->p_proto<<endl; + if(sock<0) { //cerr<<"getFreeServer: cannot open socket to RasMGR, ("<<strerror(errno)<<')'<<endl; + if(retry==0) RMInit::logOut << "getFreeServer: cannot open socket to RasMGR, ("<<strerror(errno)<<')'<<endl; + sleep(RMInit::clientcommSleep); + continue; + } + + if(connect(sock,(struct sockaddr*)&internetSocketAddress,sizeof(internetSocketAddress)) < 0) + { if(retry==0) RMInit::logOut <<"getFreeServer: Connection to RasMGR failed! ("<<strerror(errno)<<')'<<endl; + close(sock); + sleep(RMInit::clientcommSleep); + continue; + } + + ok = true; + break; + } + if(retry) RMInit::logOut << "getFreeServer: tried " << retry+1 << " times " <<endl; + + if( !ok ){ RMInit::logOut << "getFreeServer: I give up, sorry" <<endl; + close(sock); + throw r_Error( r_Error::r_Error_ServerInvalid ); + } + + //write_to_server + int nbytes=writeWholeMessage(sock,message,strlen(message)+1); + + if(nbytes<0) + { RMInit::logOut << "Error writing message to RasMGR" << rasmgrHost << " ("<<strerror(errno)<<')' << endl; + close(sock); + throw r_Error( r_Error::r_Error_ServerInvalid ); + } + + //wait and read answer + nbytes=readWholeMessage(sock,message,MAXMSG); + close(sock); + + if(nbytes<0) + { RMInit::logOut << "Error reading answer from RasMGR" << rasmgrHost <<" ("<<strerror(errno)<<')'<<endl; + throw r_Error( r_Error::r_Error_ServerInvalid ); + } + + // and now, analize answer + // first line is: HTTP/1.1 code answertext(CRLF) + char *p=strstr(message," "); //looks for the first white space to locate status-code + + int statusCode=strtoul( p, (char **)NULL, 10); + + char *pEOL=strstr(p,"\r\n"); // locate CRLF + if(!pEOL) + { RMInit::logOut << "Invalid answer from RasMGR" << endl; + throw r_Error( r_Error::r_Error_ServerInvalid ); + } + + if(statusCode==200) + { // It's OK + char *addr = strstr(message,"\r\n\r\n")+4; //looks for the address of server + + addr = strtok(addr," \r\n\t"); //isolates the RasMGR host name + + char *portString = strtok(NULL," \r\n\t"); //looks for the rpc_prognum, sended as string + + char *capab = strtok(NULL," \r\n\t"); + + if(portString && addr && capab) + { + strcpy(serverHost,addr); + RPCIF_PARA= strtoul( portString, (char **)NULL, 0); // requires 0x if base16 + strcpy(capability,capab); + //cout<<"Got server="<<serverHost<<" servnr=0x"<<hex<<RPCIF_PARA<<dec<<endl; + } + else + { RMInit::logOut << "Invalid answer from RasMGR" << endl; + throw r_Error( r_Error::r_Error_ServerInvalid ); + } + + } + else + { + char *errText = strstr(message,"\r\n\r\n")+4; + //cerr<<"cucu Error "<<errText<<endl; + //RMInit::logOut << "Error "<<errText<< endl; + + int errorCode = strtoul(errText, (char**)NULL, 0); + //cerr <<" throw "<< errorCode <<endl; + + switch(errorCode) + { case 802: + case 803: + case 804: throw r_Error( r_Error::r_Error_AccesDenied,errorCode); + break; + case 801: + case 805: + case 806: throw r_Error( r_Error::r_Error_SystemOverloaded,errorCode); + break; + case 807: throw r_Error( r_Error::r_Error_DatabaseUnknown,errorCode); + break; + default : + throw r_Error( r_Error::r_Error_General,808 ); + break; + } + } + return 1; + } + + +const char* +RpcClientComm::getServerName() + { return serverHost; + } + + +int +RpcClientComm::connectToServer(unsigned short readOnly) + { + disconnectFromServer(); // just to be sure + getFreeServer(readOnly); + +#if (defined(__VISUALC__) || defined(CYGWIN)) + RMInit::logOut << "Initializing the NT-RPC ..." << flush; + rpc_nt_init(); + RMInit::logOut << "OK" << endl; +#endif + RMInit::logOut << "Creating the binding..." << flush; +#if (defined(__VISUALC__) || defined(CYGWIN)) + binding_h = client_create( (char *) serverHost, RPCIF_PARA, RPCIFVERS, "tcp" ); +#else + binding_h = clnt_create( (char *) serverHost, RPCIF_PARA, RPCIFVERS, "tcp" ); + if( !binding_h ) + { + cout << endl; + clnt_pcreateerror(""); + } +#endif + if( !binding_h ) + { + RMInit::logOut << "FAILED" << endl; + throw r_Error( r_Error::r_Error_ServerInvalid ); + } + else + { + RMInit::logOut << "OK" << endl; + serverUp = 1; + } + + /* Default timeout can be changed using clnt_control() */ + // moved constant into raslib/riminit.hh, changed 25->3 -- PB 2005-sep-09 + static struct timeval timeout = { RPC_TIMEOUT, 0 }; + +#if (defined(__VISUALC__) || defined(CYGWIN)) + client_control( binding_h, CLGET_TIMEOUT, (char *)&timeout ); +#else + clnt_control( binding_h, CLGET_TIMEOUT, (char *)&timeout ); +#endif + RMInit::logOut << "Timeout: " << timeout.tv_sec << " sec " << timeout.tv_usec << " microsec" << endl; + + timeout.tv_sec = RMInit::timeOut; + timeout.tv_usec = 0; +#if (defined(__VISUALC__) || defined(CYGWIN)) + client_control( binding_h, CLSET_TIMEOUT, (char *)&timeout ); +#else + clnt_control( binding_h, CLSET_TIMEOUT, (char*)&timeout ); +#endif + + RMInit::logOut << "Timeout set to " << timeout.tv_sec / 60. << " min." << endl; + +#ifndef __VISUALC__ + // Install a signal handler for the alive signal + signal( SIGALRM, aliveSignal ); +#endif + return 1; +} + +int RpcClientComm::disconnectFromServer() throw() + { + if(!binding_h) + { + RMInit::logOut << "Disconnect from server: no binding" << endl; + return -1; + } + else + { + RMInit::logOut << "Disconnect from server: binding ok" << endl; + } + +#ifdef __VISUALC__ + RMInit::logOut << "Deleting the binding..."; + client_destroy(binding_h); + RMInit::logOut << "OK" << endl; + timeKillEvent(timerid); + RMInit::logOut << "Deactivating the NT-RPC feature..."; + rpc_nt_exit(); + RMInit::logOut << "OK" << endl; +#else + RMInit::logOut << "Deleting the binding..."; +#ifdef CYGWIN + client_destroy(binding_h); + rpc_nt_exit(); +#else + clnt_destroy(binding_h); +#endif + RMInit::logOut << "OK" << endl; + + if (storageFormatParams != NULL) + free(storageFormatParams); + storageFormatParams = NULL; + + if (transferFormatParams != NULL) + free(transferFormatParams); + transferFormatParams = NULL; + + binding_h = NULL; + + // suspend alarm timer for the periodical alive signal + alarm( 0 ); +#endif + + return 0; + } + +// we will make this nicer after the D-day (this means when we change to para-proc +int messageDigest(const char *input,char *output,const char *mdName); + +void +RpcClientComm::setUserIdentification(const char *userName, const char *plainTextPassword) + { + char digest[33]=""; + messageDigest(plainTextPassword,digest,"MD5"); + sprintf(identificationString,"%s:%s",userName,digest); + } + +int messageDigest(const char *input,char *output,const char *mdName) + { + + EVP_MD_CTX mdctx; + const EVP_MD *md; + unsigned int md_len, i; + unsigned char md_value[100]; + + OpenSSL_add_all_digests(); + + md = EVP_get_digestbyname(mdName); + + if(!md) return 0; + + EVP_DigestInit(&mdctx, md); + EVP_DigestUpdate(&mdctx,input, strlen(input)); + EVP_DigestFinal(&mdctx, md_value, &md_len); + + for(i = 0; i < md_len; i++) sprintf(output+i+i,"%02x", md_value[i]); + + return strlen(output); + } + +unsigned long +RpcClientComm::getClientID() const +{ + return clientID; +} + + +CLIENT* +RpcClientComm::getBindingHandle() const +{ + return binding_h; +} + + +void RpcClientComm::setTimeoutInterval(int seconds){ }; + +int RpcClientComm::getTimeoutInterval() { return 0;}; + |