/*
* This file is part of rasdaman community.
*
* Rasdaman community is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Rasdaman community is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with rasdaman community. If not, see .
*
* Copyright 2003, 2004, 2005, 2006, 2007, 2008, 2009 Peter Baumann /
rasdaman GmbH.
*
* For more information please see
* or contact Peter Baumann via .
/
/**
* SOURCE: rpcclientcomm.cc
*
* MODULE: clientcomm
* CLASS: RpcClientComm
*
* PURPOSE:
*
* COMMENTS:
* None
*/
#include "mymalloc/mymalloc.h"
static const char rcsid[] = "@(#)clientcomm, RpcClientComm: $Id: rpcclientcomm.cc,v 1.11 2005/09/09 16:16:29 rasdev Exp $";
#include
#include
#include // for ceil(), log(), exp()
#include
#include
#include
#include
using namespace std;
#ifdef __VISUALC__
#include // for the required defines and stuff
#include // for setEventTimer and killEventTimer
#include
#else
#if defined(DECALPHA) || defined(LINUX)
#if !defined(_XOPEN_SOURCE_EXTENDED)
#define _XOPEN_SOURCE_EXTENDED // for gethostid
#endif
#endif
#include // for sleep(), alarm()
#include // for signal()
#endif
#ifdef __VISUALC__ // do this ONLY for VisualC! Not for EARLY_TEMPLATE
#define __EXECUTABLE__
#endif
#if (defined(__VISUALC__) || defined(CYGWIN))
extern "C"
{
#include "clientcomm/clnt_control/clnt_control.h"
}
# define XDRFREE(proc, res) xdrfree( ntxdr_##proc, (char*)res )
#else
# define XDRFREE(proc, res) xdr_free( (xdrproc_t)xdr_##proc, (char*)res )
#endif
#include "rasodmg/transaction.hh"
#include "rasodmg/database.hh"
#include "rasodmg/iterator.hh"
#include "rasodmg/set.hh"
#include "rasodmg/ref.hh"
#include "rasodmg/storagelayout.hh"
#include "rasodmg/tiling.hh"
#include "raslib/minterval.hh"
#include "raslib/rmdebug.hh"
#include "raslib/rminit.hh"
#include "raslib/primitivetype.hh"
#include "raslib/complextype.hh"
#include "raslib/structuretype.hh"
#include "raslib/primitive.hh"
#include "raslib/complex.hh"
#include "raslib/structure.hh"
#include "raslib/endian.hh"
#include "raslib/parseparams.hh"
// for transfer compression
#include "compression/tilecompression.hh"
#include "clientcomm/rpcclientcomm.hh"
#ifdef __VISUALC__ // do this ONLY for VisualC! Not for EARLY_TEMPLATE
#undef __EXECUTABLE__
#endif
RMINITGLOBALS('C')
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#define ALIVEINTERVAL 60
#define TAWRITEWAITINTERVAL 10
#ifdef SOLARIS
extern "C" void aliveSignal( int );
#endif
#ifdef __VISUALC__
void CALLBACK TimerProc(UINT wTimerID, UINT wMsg, DWORD dwUser, DWORD dw1, DWORD dw2)
#else
void aliveSignal( int )
#endif
{
if(!RMInit::noTimeOut) {
// get the current clientcomm object
ClientComm* myComm = r_Database::actual_database->getComm();
if( myComm == 0 )
{
RMInit::logOut << "RpcClientComm: Error: RpcClientComm object only usable within r_Database object." << endl;
return;
}
myComm->triggerAliveSignal();
}
}
RpcClientComm::RpcClientComm( const char* _rasmgrHost, int _rasmgrPort ) throw( r_Error )
: binding_h(NULL),
clientID(0),
#ifdef __VISUALC__
UINT timerid(0),
#endif
status(0),
serverUp(0),
rpcActive(0),
aliveSignalRemaining(0),
endianServer(0),
endianClient(0),
serverRPCversion(0),
transferFormat(r_Array),
storageFormat(r_Array),
transferFormatParams(NULL),
storageFormatParams(NULL),
clientParams(NULL),
serverCompresses(0),
exactFormat(0),
RPCIF_PARA(0)
{
clientParams = new r_Parse_Params();
clientParams->add("compserver", &serverCompresses, r_Parse_Params::param_type_int);
clientParams->add("exactformat", &exactFormat, r_Parse_Params::param_type_int);
endianClient = (int)r_Endian::get_endianness();
this->rasmgrHost=(char*)_rasmgrHost;
this->rasmgrPort=_rasmgrPort;
serverHost[0]=0;
capability[0]=0;
strcpy(identificationString,"rasguest:8e70a429be359b6dace8b5b2500dedb0"); // this is MD5("rasguest");
}
static int rpcRetryCounter = 0;
RpcClientComm::~RpcClientComm() throw()
{
disconnectFromServer();
delete clientParams;
clientParams = NULL;
}
bool RpcClientComm::effectivTypeIsRNP() throw()
{
return false;
}
void
RpcClientComm::executeQuery( const r_OQL_Query& query, r_Set< r_Ref_Any >& result )
throw( r_Error )
{
if(binding_h == NULL) {
RMInit::logOut << endl << "RpcClientComm::executeQuery(query, result) ERROR: CONNECTION TO SERVER ALREADY CLOSED" << endl << endl;
throw r_Error(CONNECTIONCLOSED);
}
char* queryString;
// Get the query string by using a backdoor function of r_OQL_Query
queryString = (char*)query.get_query();
// Finally, this is the remote procedure which sends the query and receives a
// client Id under which the client can access the r_Marrays he is to receive
ExecuteQueryParams* params = new ExecuteQueryParams;
ExecuteQueryRes* res;
params->clientID = clientID;
params->query = queryString;
setRPCActive();
rpcRetryCounter = 0;
do
{
res = rpcexecutequery_1( params, binding_h );
if( !res )
{
RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpcexecutequery_1)" << endl << endl;
sleep(RMInit::clientcommSleep);
}
if (rpcRetryCounter > RMInit::clientcommMaxRetry)
{
RMInit::logOut << "RPC call 'rpcexecutequery' failed" << endl;
throw r_Error(CLIENTCOMMUICATIONFAILURE);
}
rpcRetryCounter++;
}while( res == 0 );
setRPCInactive();
delete params;
if( res->status == 0 )
{
result.set_type_by_name( res->typeName );
result.set_type_structure( res->typeStructure );
XDRFREE(ExecuteQueryRes, res);
getMDDCollection( result, 1 );
}
else if( res->status == 1 )
{
result.set_type_by_name( res->typeName );
result.set_type_structure( res->typeStructure );
XDRFREE(ExecuteQueryRes, res);
getElementCollection( result );
}
else if (res->status == 2)
{
// Result collection is empty and nothing has to be got.
XDRFREE(ExecuteQueryRes, res);
}
else if( res->status == 4 || res->status == 5 )
{
r_Equery_execution_failed err( res->errorNo, res->lineNo, res->columnNo, res->token );
XDRFREE(ExecuteQueryRes, res);
throw err;
}
else
{
r_Error err;
if( res->status == 3 )
err = r_Error( r_Error::r_Error_ClientUnknown );
else
err = r_Error( r_Error::r_Error_TransferFailed );
XDRFREE(ExecuteQueryRes, res);
throw err;
}
}
void
RpcClientComm::executeQuery( const r_OQL_Query& query )
throw( r_Error )
{
RMDBGENTER(2, RMDebug::module_clientcomm, "RpcClientComm", "executeQuery(query)")
if(binding_h == NULL) {
RMInit::logOut << endl << "RpcClientComm::executeQuery(query) ERROR: CONNECTION TO SERVER ALREADY CLOSED" << endl << endl;
throw r_Error(CONNECTIONCLOSED);
}
// update -> check for read_only transaction
if( r_Transaction::actual_transaction == 0 ||
r_Transaction::actual_transaction->get_mode() == r_Transaction::read_only )
{
r_Error err = r_Error( r_Error::r_Error_TransactionReadOnly );
throw err;
}
unsigned short rpcStatus;
unsigned short* rpcStatusPtr = 0;
//
// Send MDD constants to the server.
//
if( query.get_constants() )
{
r_Set< r_GMarray* >* mddConstants = (r_Set< r_GMarray* >*)query.get_constants();
setRPCActive();
rpcRetryCounter = 0;
do
{
rpcStatusPtr = rpcinitexecuteupdate_1( &clientID, binding_h );
if( !rpcStatusPtr )
{
RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpcinitexecuteupdate_1)" << endl << endl;
sleep(RMInit::clientcommSleep);
}
if (rpcRetryCounter > RMInit::clientcommMaxRetry)
{
RMInit::logOut << "RPC call 'rpcexecuteupdate' failed" << endl;
throw r_Error(CLIENTCOMMUICATIONFAILURE);
}
rpcRetryCounter++;
}while( rpcStatusPtr == 0 );
rpcStatus = *rpcStatusPtr;
setRPCInactive();
r_Iterator iter = mddConstants->create_iterator();
for( iter.reset(); iter.not_done(); iter++ )
{
r_GMarray* mdd = *iter;
const r_Base_Type* baseType = mdd->get_base_type_schema();
if( mdd )
{
// initiate composition of MDD at server side
InsertTransMDDParams* params = new InsertTransMDDParams;
params->clientID = clientID;
params->collName = strdup(""); // not used
params->domain = mdd->spatial_domain().get_string_representation();
params->typeLength = mdd->get_type_length();
params->typeName = (char*)mdd->get_type_name();
if(binding_h == NULL) {
RMInit::logOut << endl << "RpcClientComm::executeQuery(query) ERROR: CONNECTION TO SERVER ALREADY CLOSED" << endl << endl;
throw r_Error(CONNECTIONCLOSED);
}
setRPCActive();
rpcRetryCounter = 0;
do
{
rpcStatusPtr = rpcstartinserttransmdd_1( params, binding_h );
if( !rpcStatusPtr )
{
RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpcinserttransmdd_1)" << endl << endl;
sleep(RMInit::clientcommSleep);
}
if (rpcRetryCounter > RMInit::clientcommMaxRetry)
{
RMInit::logOut << "RPC call 'rpcexecutequery' failed" << endl;
throw r_Error(CLIENTCOMMUICATIONFAILURE);
}
rpcRetryCounter++;
}while( rpcStatusPtr == 0 );
rpcStatus = *rpcStatusPtr;
setRPCInactive();
free( params->domain );
free( params->collName );
delete params;
if( rpcStatus > 0 )
{
r_Error err;
switch( rpcStatus )
{
case 2:
err = r_Error( r_Error::r_Error_DatabaseClassUndefined );
break;
case 3:
err = r_Error( r_Error::r_Error_TypeInvalid );
break;
default:
err = r_Error( r_Error::r_Error_TransferFailed );
}
RMInit::logOut << "Error: rpcinitmdd() - " << err.what() << endl;
RMDBGEXIT(2, RMDebug::module_clientcomm, "RpcClientComm", "executeQuery(query) error occured")
throw err;
}
r_Set< r_GMarray* >* bagOfTiles;
bagOfTiles = mdd->get_storage_layout()->decomposeMDD( mdd );
RMDBGMIDDLE(2, RMDebug::module_clientcomm, "RpcClientComm", "decomposing into " << bagOfTiles->cardinality() << " tiles");
r_Iterator< r_GMarray* > iter2 = bagOfTiles->create_iterator();
r_GMarray *origTile;
iter2.reset();
while( iter2.not_done() )
{
RPCMarray* rpcMarray;
origTile = *iter2;
// advance iter here to determine if this is the last call (not_done())
iter2.advance();
RMDBGMIDDLE(2, RMDebug::module_clientcomm, "RpcClientComm", "inserting Tile with domain " << origTile->spatial_domain() << ", " << origTile->spatial_domain().cell_count() * origTile->get_type_length() << " bytes")
getMarRpcRepresentation( origTile, rpcMarray, mdd->get_storage_layout()->get_storage_format(), baseType );
InsertTileParams* params2 = new InsertTileParams;
params2->clientID = clientID;
params2->isPersistent = 0;
params2->marray = rpcMarray;
if(binding_h == NULL) {
RMInit::logOut << endl << "RpcClientComm::executeQuery(query) ERROR: CONNECTION TO SERVER ALREADY CLOSED" << endl << endl;
throw r_Error(CONNECTIONCLOSED);
}
setRPCActive();
rpcRetryCounter = 0;
do
{
rpcStatusPtr = rpcinserttile_1( params2, binding_h );
if( !rpcStatusPtr )
{
RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpcinserttile_1)" << endl << endl;
sleep(RMInit::clientcommSleep);
}
if (rpcRetryCounter > RMInit::clientcommMaxRetry)
{
RMInit::logOut << "RPC call 'rpcinserttile' failed" << endl;
throw r_Error(CLIENTCOMMUICATIONFAILURE);
}
rpcRetryCounter++;
}while( rpcStatusPtr == 0 );
rpcStatus = *rpcStatusPtr;
RMDBGIF(20, RMDebug::module_clientcomm, "WAITAFTERSENDTILE", \
RMInit::dbgOut << "Waiting 10 sec after send tile\n" << std::endl; \
sleep(10); \
RMInit::dbgOut << "Continue now\n" << std::endl; );
setRPCInactive();
// free rpcMarray structure (rpcMarray->data.confarray_val is freed somewhere else)
freeMarRpcRepresentation( origTile, rpcMarray );
delete params2;
// delete current tile (including data block)
delete origTile;
if( rpcStatus > 0 )
{
RMInit::logOut << "Error: rpctransfertile() - general" << endl;
RMDBGEXIT(2, RMDebug::module_clientcomm, "RpcClientComm", "executeQuery() error occured")
throw r_Error( r_Error::r_Error_TransferFailed );
}
RMDBGMIDDLE(2, RMDebug::module_clientcomm, "RpcClientComm", "OK")
}
EndInsertMDDParams* params3 = new EndInsertMDDParams;
params3->clientID = clientID;
params3->isPersistent = 0;
if(binding_h == NULL) {
RMInit::logOut << endl << "RpcClientComm::executeQuery(query) ERROR: CONNECTION TO SERVER ALREADY CLOSED" << endl << endl;
throw r_Error(CONNECTIONCLOSED);
}
setRPCActive();
rpcRetryCounter = 0;
do
{
rpcStatusPtr = rpcendinsertmdd_1( params3, binding_h );
if( !rpcStatusPtr )
{
RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpcendinsertmdd_1)" << endl << endl;
sleep(RMInit::clientcommSleep);
}
if (rpcRetryCounter > RMInit::clientcommMaxRetry)
{
RMInit::logOut << "RPC call 'rpcinsertmdd' failed" << endl;
throw r_Error(CLIENTCOMMUICATIONFAILURE);
}
rpcRetryCounter++;
}while( rpcStatusPtr == 0 );
rpcStatus = *rpcStatusPtr;
setRPCInactive();
delete params3;
// delete transient data
bagOfTiles->remove_all();
delete bagOfTiles;
}
}
}
//
// Send the update query.
//
ExecuteQueryParams* params = new ExecuteQueryParams;
ExecuteUpdateRes* res;
params->clientID = clientID;
params->query = (char*)query.get_query();
if(binding_h == NULL) {
RMInit::logOut << endl << "RpcClientComm::executeQuery(query) ERROR: CONNECTION TO SERVER ALREADY CLOSED" << endl << endl;
throw r_Error(CONNECTIONCLOSED);
}
setRPCActive();
rpcRetryCounter = 0;
do
{
res = rpcexecuteupdate_1( params, binding_h );
if( !res )
{
RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpcexecuteupdate_1)" << endl << endl;
sleep(RMInit::clientcommSleep);
}
if (rpcRetryCounter > RMInit::clientcommMaxRetry)
{
RMInit::logOut << "RPC call 'rpcexecuteupdate' failed" << endl;
throw r_Error(CLIENTCOMMUICATIONFAILURE);
}
rpcRetryCounter++;
}while( res == 0 );
setRPCInactive();
delete params;
rpcStatus = res->status;
if( rpcStatus == 2 || rpcStatus == 3 )
{
r_Equery_execution_failed err( res->errorNo, res->lineNo, res->columnNo, res->token );
XDRFREE(ExecuteUpdateRes, res);
RMDBGEXIT(2, RMDebug::module_clientcomm, "RpcClientComm", "executeQuery() error occured")
throw err;
}
XDRFREE(ExecuteUpdateRes, res);
if( rpcStatus == 1 || rpcStatus > 3 )
{
r_Error err;
if( rpcStatus == 1 )
err = r_Error( r_Error::r_Error_ClientUnknown );
else
err = r_Error( r_Error::r_Error_TransferFailed );
RMDBGEXIT(2, RMDebug::module_clientcomm, "RpcClientComm", "executeQuery() error occured")
throw err;
}
RMDBGEXIT(2, RMDebug::module_clientcomm, "RpcClientComm", "executeQuery()")
}
void
RpcClientComm::insertColl( const char* collName, const char* typeName, const r_OId& oid )
throw( r_Error )
{
if(binding_h == NULL) {
RMInit::logOut << endl << "RpcClientComm::insertColl(collName, typeName, oid ) ERROR: CONNECTION TO SERVER ALREADY CLOSED" << endl << endl;
throw r_Error(CONNECTIONCLOSED);
}
// update -> check for read_only transaction
if( r_Transaction::actual_transaction == 0 ||
r_Transaction::actual_transaction->get_mode() == r_Transaction::read_only )
{
r_Error err = r_Error( r_Error::r_Error_TransactionReadOnly );
throw err;
}
unsigned short rpcStatus;
unsigned short* rpcStatusPtr = 0;
InsertCollParams* params = new InsertCollParams;
params->clientID = clientID;
params->collName = (char*)collName;
params->typeName = (char*)typeName;
params->oid = (char*)oid.get_string_representation();
setRPCActive();
rpcRetryCounter = 0;
do
{
rpcStatusPtr = rpcinsertcoll_1( params, binding_h );
if( !rpcStatusPtr )
{
RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpcinsertcoll_1)" << endl << endl;
sleep(RMInit::clientcommSleep);
}
if (rpcRetryCounter > RMInit::clientcommMaxRetry)
{
RMInit::logOut << "RPC call 'rpcinsertcoll' failed" << endl;
throw r_Error(CLIENTCOMMUICATIONFAILURE);
}
rpcRetryCounter++;
}while( rpcStatusPtr == 0 );
rpcStatus = *rpcStatusPtr;
setRPCInactive();
delete params;
if( rpcStatus > 0 )
{
r_Error err;
switch( rpcStatus )
{
case 1:
err = r_Error( r_Error::r_Error_ClientUnknown );
break;
case 2:
err = r_Error( r_Error::r_Error_DatabaseClassUndefined );
break;
case 3:
err = r_Error( r_Error::r_Error_NameNotUnique );
break;
default:
err = r_Error( r_Error::r_Error_General );
}
RMInit::logOut << "Error: rpcCreateMDDCollection() - " << err.what() << endl;
throw err;
}
}
void
RpcClientComm::deleteCollByName( const char* collName )
throw( r_Error )
{
if(binding_h == NULL) {
RMInit::logOut << endl << "RpcClientComm::deleteCollByName(collName) ERROR: CONNECTION TO SERVER ALREADY CLOSED" << endl << endl;
throw r_Error(CONNECTIONCLOSED);
}
// update -> check for read_only transaction
if( r_Transaction::actual_transaction == 0 ||
r_Transaction::actual_transaction->get_mode() == r_Transaction::read_only )
{
r_Error err = r_Error( r_Error::r_Error_TransactionReadOnly );
throw err;
}
unsigned short rpcStatus;
unsigned short* rpcStatusPtr=0;
NameSpecParams* params = new NameSpecParams;
params->clientID = clientID;
params->name = (char*)collName;
setRPCActive();
rpcRetryCounter = 0;
do
{
rpcStatusPtr = rpcdeletecollbyname_1( params, binding_h );
if( !rpcStatusPtr )
{
RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpcdeletecollbyname_1)" << endl << endl;
sleep(RMInit::clientcommSleep);
}
if (rpcRetryCounter > RMInit::clientcommMaxRetry)
{
RMInit::logOut << "RPC call 'rpcdeletecollbyname' failed" << endl;
throw r_Error(CLIENTCOMMUICATIONFAILURE);
}
rpcRetryCounter++;
}while( rpcStatusPtr == 0 );
rpcStatus = *rpcStatusPtr;
setRPCInactive();
delete params;
if( rpcStatus > 0 )
{
r_Error err;
switch( rpcStatus )
{
case 1:
err = r_Error( r_Error::r_Error_ClientUnknown );
break;
case 2:
err = r_Error( r_Error::r_Error_ObjectUnknown );
break;
default:
err = r_Error( r_Error::r_Error_General );
}
RMInit::logOut << "Error: rpcInsertMDD() - " << err.what() << endl;
throw err;
}
}
void
RpcClientComm::deleteObjByOId( const r_OId& oid )
throw( r_Error )
{
if(binding_h == NULL) {
RMInit::logOut << endl << "RpcClientComm::deleteObjectByOId(oid) ERROR: CONNECTION TO SERVER ALREADY CLOSED" << endl << endl;
throw r_Error(CONNECTIONCLOSED);
}
// update -> check for read_only transaction
if( r_Transaction::actual_transaction == 0 ||
r_Transaction::actual_transaction->get_mode() == r_Transaction::read_only )
{
r_Error err = r_Error( r_Error::r_Error_TransactionReadOnly );
throw err;
}
unsigned short rpcStatus;
unsigned short* rpcStatusPtr;
OIdSpecParams* params = new OIdSpecParams;
params->clientID = clientID;
params->oid = (char*) oid.get_string_representation();
setRPCActive();
rpcRetryCounter = 0;
do
{
rpcStatusPtr = rpcdeleteobjbyoid_1( params, binding_h );
if( !rpcStatusPtr )
{
RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpcdeleteobjbyoid_1)" << endl << endl;
sleep(RMInit::clientcommSleep);
}
if (rpcRetryCounter > RMInit::clientcommMaxRetry)
{
RMInit::logOut << "RPC call 'rpcdeleteobjbyoid' failed" << endl;
throw r_Error(CLIENTCOMMUICATIONFAILURE);
}
rpcRetryCounter++;
}while( rpcStatusPtr == 0 );
rpcStatus = *rpcStatusPtr;
setRPCInactive();
delete params;
if( rpcStatus > 0 )
{
r_Error err;
switch( rpcStatus )
{
case 1:
err = r_Error( r_Error::r_Error_ClientUnknown );
break;
case 2:
err = r_Error( r_Error::r_Error_ObjectUnknown );
break;
default:
err = r_Error( r_Error::r_Error_General );
}
RMInit::logOut << "Error: rpcInsertMDD() - " << err.what() << endl;
throw err;
}
}
void
RpcClientComm::removeObjFromColl( const char* collName, const r_OId& oid )
throw( r_Error )
{
if(binding_h == NULL) {
RMInit::logOut << endl << "RpcClientComm::removeObjFromColl(collName, oid) ERROR: CONNECTION TO SERVER ALREADY CLOSED" << endl << endl;
throw r_Error(CONNECTIONCLOSED);
}
// update -> check for read_only transaction
if( r_Transaction::actual_transaction == 0 ||
r_Transaction::actual_transaction->get_mode() == r_Transaction::read_only )
{
r_Error err = r_Error( r_Error::r_Error_TransactionReadOnly );
throw err;
}
unsigned short rpcStatus;
unsigned short* rpcStatusPtr;
RemoveObjFromCollParams* params = new RemoveObjFromCollParams;
params->clientID = clientID;
params->collName = (char*) collName;
params->oid = (char*) oid.get_string_representation();
setRPCActive();
rpcRetryCounter = 0;
do
{
rpcStatusPtr = rpcremoveobjfromcoll_1( params, binding_h );
if( !rpcStatusPtr )
{
RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpcremoveobjfromcoll_1)" << endl << endl;
sleep(RMInit::clientcommSleep);
}
if (rpcRetryCounter > RMInit::clientcommMaxRetry)
{
RMInit::logOut << "RPC call 'rpcremoveobjfromcoll' failed" << endl;
throw r_Error(CLIENTCOMMUICATIONFAILURE);
}
rpcRetryCounter++;
}while( rpcStatusPtr == 0 );
rpcStatus = *rpcStatusPtr;
setRPCInactive();
delete params;
if( rpcStatus > 0 )
{
r_Error err;
switch( rpcStatus )
{
case 1:
err = r_Error( r_Error::r_Error_ClientUnknown );
break;
case 2:
case 3:
err = r_Error( r_Error::r_Error_ObjectUnknown );
break;
default:
err = r_Error( r_Error::r_Error_General );
}
RMInit::logOut << "Error: rpcInsertMDD() - " << err.what() << endl;
throw err;
}
}
void
RpcClientComm::insertMDD( const char* collName, r_GMarray* mar )
throw( r_Error )
{
RMDBGENTER(2, RMDebug::module_clientcomm, "RpcClientComm", "insertMDD(" << collName << ", marray)")
if(binding_h == NULL) {
RMInit::logOut << endl << "RpcClientComm::insertMDD(collName, mar) ERROR: CONNECTION TO SERVER ALREADY CLOSED" << endl << endl;
throw r_Error(CONNECTIONCLOSED);
}
// update -> check for read_only transaction
if( r_Transaction::actual_transaction == 0 ||
r_Transaction::actual_transaction->get_mode() == r_Transaction::read_only )
{
r_Error err = r_Error( r_Error::r_Error_TransactionReadOnly );
RMDBGEXIT(2, RMDebug::module_clientcomm, "RpcClientComm", "insertMDD(" << collName << ", marray) error")
throw err;
}
unsigned short rpcStatus;
unsigned short* rpcStatusPtr=0;
r_Minterval spatdom;
r_Bytes marBytes;
RPCMarray* rpcMarray;
r_Bytes tileSize = 0;
// get the spatial domain of the r_GMarray
spatdom = mar->spatial_domain();
// determine the amount of data to be transferred
marBytes = mar->get_array_size();
RMDBGMIDDLE(2, RMDebug::module_clientcomm, "RpcClientComm", "inserting MDD with domain " << spatdom << ", cell length " << mar->get_type_length() << ", " << marBytes << " bytes" )
const r_Base_Type* baseType = mar->get_base_type_schema();
// if the MDD is too large for being transfered as one block, it has to be
// divided in tiles
const r_Tiling* til = mar->get_storage_layout()->get_tiling();
r_Tiling_Scheme scheme = til->get_tiling_scheme();
if (scheme == r_NoTiling)
tileSize = RMInit::RMInit::clientTileSize;
else
//allowed because the only subclass of tiling without size is no tiling
tileSize = ((const r_Size_Tiling*)til)->get_tile_size();
if( RMInit::tiling && marBytes > tileSize )
{
// initiate composition of MDD at server side
InsertPersMDDParams* params = new InsertPersMDDParams;
params->clientID = clientID;
params->collName = (char*)collName;
params->domain = spatdom.get_string_representation();
params->typeLength = mar->get_type_length();
params->typeName = (char*)mar->get_type_name();
params->oid = (char*)mar->get_oid().get_string_representation();
setRPCActive();
rpcRetryCounter = 0;
do
{
rpcStatusPtr = rpcstartinsertpersmdd_1( params, binding_h );
if( !rpcStatusPtr )
{
RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpcstartinsertpersmdd_1)" << endl << endl;
sleep(RMInit::clientcommSleep);
}
if (rpcRetryCounter > RMInit::clientcommMaxRetry)
{
RMInit::logOut << "RPC call 'rpcstartinsertpersmdd' failed" << endl;
throw r_Error(CLIENTCOMMUICATIONFAILURE);
}
rpcRetryCounter++;
}while( rpcStatusPtr == 0 );
rpcStatus = *rpcStatusPtr;
setRPCInactive();
free( params->domain );
delete params;
if( rpcStatus > 0 )
{
r_Error err;
switch( rpcStatus )
{
case 2:
err = r_Error( r_Error::r_Error_DatabaseClassUndefined );
break;
case 3:
err = r_Error( r_Error::r_Error_CollectionElementTypeMismatch );
break;
case 4:
err = r_Error( r_Error::r_Error_TypeInvalid );
break;
default:
err = r_Error( r_Error::r_Error_TransferFailed );
}
RMInit::logOut << "Error: rpcInsertMDDObj() - " << err.what() << endl;
RMDBGEXIT(2, RMDebug::module_clientcomm, "RpcClientComm", "insertMDD(" << collName << ", marray) error")
throw err;
}
r_Set< r_GMarray* >* bagOfTiles;
bagOfTiles = mar->get_storage_layout()->decomposeMDD( mar );
RMDBGMIDDLE(2, RMDebug::module_clientcomm, "RpcClientComm", "decomposing into " << bagOfTiles->cardinality() << " tiles")
r_Iterator< r_GMarray* > iter = bagOfTiles->create_iterator();
r_GMarray *origTile;
iter.reset();
while( iter.not_done() )
{
origTile = *iter;
// advance iter here to determine if this is the last call (not_done())
iter.advance();
RMDBGMIDDLE(2, RMDebug::module_clientcomm, "RpcClientComm", "inserting Tile with domain " << origTile->spatial_domain() << ", " << origTile->spatial_domain().cell_count() * origTile->get_type_length() << " bytes")
getMarRpcRepresentation( origTile, rpcMarray, mar->get_storage_layout()->get_storage_format(), baseType );
InsertTileParams* params2 = new InsertTileParams;
params2->clientID = clientID;
params2->isPersistent = 1;
params2->marray = rpcMarray;
if(binding_h == NULL) {
RMInit::logOut << endl << "RpcClientComm::insertMDD(collName, mar) ERROR: CONNECTION TO SERVER ALREADY CLOSED" << endl << endl;
throw r_Error(CONNECTIONCLOSED);
}
setRPCActive();
rpcRetryCounter = 0;
do
{
rpcStatusPtr = rpcinserttile_1( params2, binding_h );
if( !rpcStatusPtr )
{
RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpcinserttile_1)" << endl << endl;
sleep(RMInit::clientcommSleep);
}
if (rpcRetryCounter > RMInit::clientcommMaxRetry)
{
RMInit::logOut << "RPC call 'rpcinserttile' failed" << endl;
throw r_Error(CLIENTCOMMUICATIONFAILURE);
}
rpcRetryCounter++;
}while( rpcStatusPtr == 0 );
rpcStatus = *rpcStatusPtr;
setRPCInactive();
// free rpcMarray structure (rpcMarray->data.confarray_val is freed somewhere else)
freeMarRpcRepresentation( origTile, rpcMarray );
delete params2;
// delete current tile (including data block)
delete origTile;
if( rpcStatus > 0 )
{
RMInit::logOut << "Error: rpcInsertMDD() - general" << endl;
RMDBGEXIT(2, RMDebug::module_clientcomm, "RpcClientComm", "insertMDD(" << collName << ", marray) error")
throw r_Error( r_Error::r_Error_TransferFailed );
}
RMDBGMIDDLE(2, RMDebug::module_clientcomm, "RpcClientComm", "OK")
}
EndInsertMDDParams* params3 = new EndInsertMDDParams;
params3->clientID = clientID;
params3->isPersistent = 1;
if(binding_h == NULL) {
RMInit::logOut << endl << "RpcClientComm::insertMDD(collName, mar) ERROR: CONNECTION TO SERVER ALREADY CLOSED" << endl << endl;
throw r_Error(CONNECTIONCLOSED);
}
setRPCActive();
rpcRetryCounter = 0;
do
{
rpcStatusPtr = rpcendinsertmdd_1( params3, binding_h );
if( !rpcStatusPtr )
{
RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpcendinsertmdd_1)" << endl << endl;
sleep(RMInit::clientcommSleep);
}
if (rpcRetryCounter > RMInit::clientcommMaxRetry)
{
RMInit::logOut << "RPC call 'rpcendinsertmdd' failed" << endl;
throw r_Error(CLIENTCOMMUICATIONFAILURE);
}
rpcRetryCounter++;
}while( rpcStatusPtr == 0 );
rpcStatus = *rpcStatusPtr;
setRPCInactive();
delete params3;
// delete transient data
bagOfTiles->remove_all();
delete bagOfTiles;
}
else // begin: MDD is transferred in one piece
{
RMDBGMIDDLE(2, RMDebug::module_clientcomm, "RpcClientComm", ", one tile" )
getMarRpcRepresentation( mar, rpcMarray, mar->get_storage_layout()->get_storage_format(), baseType );
InsertMDDParams* params = new InsertMDDParams;
params->clientID = clientID;
params->collName = (char*)collName;
params->marray = rpcMarray;
params->typeName = (char*)mar->get_type_name();
params->oid = (char*)mar->get_oid().get_string_representation();
if(binding_h == NULL) {
RMInit::logOut << endl << "RpcClientComm::insertMDD(collName, mar) ERROR: CONNECTION TO SERVER ALREADY CLOSED" << endl << endl;
throw r_Error(CONNECTIONCLOSED);
}
setRPCActive();
rpcRetryCounter = 0;
do
{
rpcStatusPtr = rpcinsertmdd_1( params, binding_h );
if( !rpcStatusPtr )
{
RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpcinsertmdd_1)" << endl << endl;
sleep(RMInit::clientcommSleep);
}
if (rpcRetryCounter > RMInit::clientcommMaxRetry)
{
RMInit::logOut << "RPC call 'rpcinsertmdd' failed" << endl;
throw r_Error(CLIENTCOMMUICATIONFAILURE);
}
rpcRetryCounter++;
}while( rpcStatusPtr == 0 );
rpcStatus = *rpcStatusPtr;
setRPCInactive();
freeMarRpcRepresentation( mar, rpcMarray );
delete params;
RMDBGMIDDLE(2, RMDebug::module_clientcomm, "RpcClientComm", "ok" )
if( rpcStatus > 0 )
{
r_Error err;
switch( rpcStatus )
{
case 2:
err = r_Error( r_Error::r_Error_DatabaseClassUndefined );
break;
case 3:
err = r_Error( r_Error::r_Error_CollectionElementTypeMismatch );
break;
case 4:
err = r_Error( r_Error::r_Error_TypeInvalid );
break;
default:
err = r_Error( r_Error::r_Error_TransferFailed );
}
RMInit::logOut << "Error: rpcInsertMDD() - " << err.what() << endl;
throw err;
}
} // end: MDD i transferred in one piece
}
r_Data_Format
RpcClientComm::doTransferDecompression( r_GMarray* tile, const r_Base_Type *type,
r_Data_Format fmt, unsigned long size )
{
RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "doTransferDecompression(...) tile dom:"
<< tile->spatial_domain() << " array size:" << tile->get_array_size()
<< " type size:" << tile->get_type_length());
if (fmt != r_Array)
{
r_Tile_Compression *engine = NULL;
char *newTileData = NULL;
r_Data_Format newFormat;
RMDBGMIDDLE( 2, RMDebug::module_clientcomm, "RpcClientComm", "doTransferDecompression(...) decompressing from "
<< fmt << ", " << size << "bytes");
try
{
r_Storage_Man_CPP sman;
engine = r_Tile_Compression::create( fmt, tile->spatial_domain(), type );
engine->set_storage_handler(sman);
newTileData = (char*)(engine->decompress(tile->get_array(), size));
RMDBGMIDDLE( 2, RMDebug::module_clientcomm, "RpcClientComm",
"doTransferDecompression(...) decompression to " << engine->get_decomp_format() << " OK");
}
catch (r_Error &err)
{
RMDBGMIDDLE( 2, RMDebug::module_clientcomm, "RpcClientComm",
"doTransferDecompression(...) decompression to " << engine->get_decomp_format() << " FAILED");
RMInit::logOut << "RpcClientComm::doTransferDecompression() Error decompressing data, assuming raw" << endl;
}
newFormat = engine->get_decomp_format();
if (newTileData != NULL)
{
delete [] tile->get_array();
tile->set_array(newTileData);
tile->set_array_size(tile->spatial_domain().cell_count()*tile->get_type_length());
}
else
newFormat = fmt;
RMDBGMIDDLE( 2, RMDebug::module_clientcomm, "RpcClientComm", "doTransferDecompression(...) tile newFmt:"
<< newFormat << " dom:" << tile->spatial_domain()
<< " array size:" << tile->get_array_size()
<< " type size:" << tile->get_type_length());
// ... also make sure the decoded format is really raw array data (r_Array)
if ((endianClient != endianServer) && (newFormat == r_Array))
{
// if compression engine already handles endianness we mustn't change again
if (!engine->converts_endianness()) {
RMDBGMIDDLE( 2, RMDebug::module_clientcomm, "RpcClientComm", "doTransferDecompression(...) for "
<< fmt << " endianness changed from "
<< (r_Endian::r_Endianness)endianServer << " to " << (r_Endian::r_Endianness) endianClient);
changeEndianness(tile, type);
}
}
if (engine != NULL)
delete engine;
RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "doTransferDecompression(...) tile fmt:" << newFormat);
return newFormat;
}
if (endianClient != endianServer) {
RMDBGMIDDLE( 2, RMDebug::module_clientcomm, "RpcClientComm", "doTransferDecompression(...) for "
<< fmt << " endianness changed from "
<< (r_Endian::r_Endianness)endianServer << " to " << (r_Endian::r_Endianness) endianClient);
changeEndianness(tile, type);
}
RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "doTransferDecompression(...) tile fmt:" << r_Array);
return r_Array;
}
r_Ref_Any
RpcClientComm::getMDDByOId( const r_OId& oid )
throw( r_Error )
{
if(binding_h == NULL) {
RMInit::logOut << endl << "RpcClientComm::getMDDByOId(oid) ERROR: CONNECTION TO SERVER ALREADY CLOSED" << endl << endl;
throw r_Error(CONNECTIONCLOSED);
}
RMDBGENTER(2, RMDebug::module_clientcomm, "RpcClientComm", "getMDDByOId(" << oid << ")")
r_Ref_Any mddResult;
r_GMarray* marray = 0;
unsigned short tileStatus = 0;
unsigned short rpcStatus = 0;
unsigned short* rpcStatusPtr = 0;
OIdSpecParams params;
GetMDDRes* thisResult = 0;
params.clientID = clientID;
params.oid = (char*)oid.get_string_representation();
setRPCActive();
rpcRetryCounter = 0;
do
{
thisResult = rpcgetmddbyoid_1( ¶ms, binding_h );
if( !thisResult )
{
RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpcgetmddbyoid_1)" << endl << endl;
sleep(RMInit::clientcommSleep);
}
if (rpcRetryCounter > RMInit::clientcommMaxRetry)
{
RMInit::logOut << "RPC call 'rpcgetmddbyoid' failed" << endl;
throw r_Error(CLIENTCOMMUICATIONFAILURE);
}
rpcRetryCounter++;
}while( thisResult == 0 );
setRPCInactive();
rpcStatus = thisResult->status;
if( rpcStatus != 0 )
{
r_Error err;
switch( rpcStatus )
{
case 1:
err = r_Error( r_Error::r_Error_ClientUnknown );
break;
case 2:
case 3:
err = r_Error( r_Error::r_Error_ObjectUnknown );
break;
default:
err = r_Error( r_Error::r_Error_TransferFailed );
}
XDRFREE(GetMDDRes, thisResult);
RMDBGEXIT(2, RMDebug::module_clientcomm, "RpcClientComm", "failed")
throw err;
}
r_Ref< r_GMarray > mdd;
getMDDCore(mdd, thisResult, 0);
mddResult = mdd;
setRPCActive();
RMDBGIF(20, RMDebug::module_clientcomm, "WAITENDTRANSFERSTART", \
RMInit::dbgOut << "Waiting 100 sec before end transfer\n" << std::endl; \
sleep(100); \
RMInit::dbgOut << "Continue now\n" << std::endl; );
if(binding_h == NULL) {
RMInit::logOut << endl << "RpcClientComm::getMDDByOId(oid) ERROR: CONNECTION TO SERVER ALREADY CLOSED" << endl << endl;
throw r_Error(CONNECTIONCLOSED);
}
rpcRetryCounter = 0;
do
{
rpcStatusPtr = rpcendtransfer_1( &clientID, binding_h );
if (rpcRetryCounter > RMInit::clientcommMaxRetry)
{
RMInit::logOut << "RPC call 'rpcendtransfer' failed" << endl;
throw r_Error(CLIENTCOMMUICATIONFAILURE);
}
rpcRetryCounter++;
if( !rpcStatusPtr )
{
RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpcendtransfer_1)" << endl << endl;
sleep(RMInit::clientcommSleep);
}
}while( rpcStatusPtr == 0 );
rpcStatus = *rpcStatusPtr;
RMDBGIF(20, RMDebug::module_clientcomm, "WAITENDTRANSFEREND", \
RMInit::dbgOut << "Waiting 100 sec after end transfer\n" << std::endl; \
sleep(100); \
RMInit::dbgOut << "Continue now\n" << std::endl; );
setRPCInactive();
RMDBGEXIT(2, RMDebug::module_clientcomm, "RpcClientComm", "ok")
return mddResult;
}
r_Ref_Any
RpcClientComm::getCollByName( const char* collName )
throw( r_Error )
{
if(binding_h == NULL) {
RMInit::logOut << endl << "RpcClientComm::getCollByName(collName) ERROR: CONNECTION TO SERVER ALREADY CLOSED" << endl << endl;
throw r_Error(CONNECTIONCLOSED);
}
RMDBGENTER(2, RMDebug::module_clientcomm, "RpcClientComm", "getCollByName(" << collName << ")")
r_Set< r_Ref_Any >* set = 0;
unsigned short rpcStatus = 0;
NameSpecParams* params = new NameSpecParams;
GetCollRes* thisResult = 0;
params->clientID = clientID;
params->name = (char*)collName;
setRPCActive();
rpcRetryCounter = 0;
do
{
thisResult = rpcgetcollbyname_1( params, binding_h );
if( !thisResult )
{
RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpcgetcollbyname_1)" << endl << endl;
sleep(RMInit::clientcommSleep);
}
if (rpcRetryCounter > RMInit::clientcommMaxRetry)
{
RMInit::logOut << "RPC call 'rpcgetcollbyname' failed" << endl;
throw r_Error(CLIENTCOMMUICATIONFAILURE);
}
rpcRetryCounter++;
}while( thisResult == 0 );
setRPCInactive();
delete params;
rpcStatus = thisResult->status;
if( rpcStatus != 0 && rpcStatus != 1 )
{
r_Error err;
switch( rpcStatus )
{
case 2:
err = r_Error( r_Error::r_Error_ObjectUnknown );
break;
case 3:
err = r_Error( r_Error::r_Error_ClientUnknown );
break;
default:
err = r_Error( r_Error::r_Error_TransferFailed );
}
XDRFREE(GetCollRes, thisResult);
RMDBGEXIT(2, RMDebug::module_clientcomm, "RpcClientComm", "failed")
throw err;
}
RMDBGMIDDLE(2, RMDebug::module_clientcomm, "RpcClientComm", "ok")
// create the set
r_OId rOId( thisResult->oid );
set = new ( r_Database::actual_database, r_Object::read, rOId ) r_Set< r_Ref_Any >;
// initialize data elements
set->set_type_by_name ( thisResult->typeName );
set->set_type_structure( thisResult->typeStructure );
set->set_object_name ( thisResult->collName );
// now the transfer structure of rpcgetcollbyname can be freed
XDRFREE(GetCollRes, thisResult);
// get collection elements
if( rpcStatus == 0 )
getMDDCollection( *set, 0 );
// else rpcStatus == 1 -> Result collection is empty and nothing has to be got.
RMDBGEXIT(2, RMDebug::module_clientcomm, "RpcClientComm", "getCollByName(" << collName << ")")
return r_Ref_Any( set->get_oid(), set );
}
r_Ref_Any
RpcClientComm::getCollByOId( const r_OId& oid )
throw( r_Error )
{
if(binding_h == NULL) {
RMInit::logOut << endl << "RpcClientComm::getCollByOId(oid) ERROR: CONNECTION TO SERVER ALREADY CLOSED" << endl << endl;
throw r_Error(CONNECTIONCLOSED);
}
RMDBGENTER(2, RMDebug::module_clientcomm, "RpcClientComm", "getCollByOId(" << oid << ")")
r_Set< r_Ref_Any >* set = 0;
unsigned short rpcStatus = 0;
OIdSpecParams* params = new OIdSpecParams;
GetCollRes* thisResult = 0;
params->clientID = clientID;
params->oid = (char*)oid.get_string_representation();
setRPCActive();
rpcRetryCounter = 0;
do
{
thisResult = rpcgetcollbyoid_1( params, binding_h );
if( !thisResult )
{
RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpcgetcollbyoid_1)" << endl << endl;
sleep(RMInit::clientcommSleep);
}
if (rpcRetryCounter > RMInit::clientcommMaxRetry)
{
RMInit::logOut << "RPC call 'rpcgetcollbyoid' failed" << endl;
throw r_Error(CLIENTCOMMUICATIONFAILURE);
}
rpcRetryCounter++;
}while( thisResult == 0 );
setRPCInactive();
delete params;
rpcStatus = thisResult->status;
if( rpcStatus != 0 && rpcStatus != 1 )
{
r_Error err;
switch( rpcStatus )
{
case 2:
err = r_Error( r_Error::r_Error_ObjectUnknown );
break;
case 3:
err = r_Error( r_Error::r_Error_ClientUnknown );
break;
default:
err = r_Error( r_Error::r_Error_TransferFailed );
}
XDRFREE(GetCollRes, thisResult);
RMDBGEXIT(2, RMDebug::module_clientcomm, "RpcClientComm", "failed")
throw err;
}
RMDBGMIDDLE(2, RMDebug::module_clientcomm, "RpcClientComm", "ok")
// create the set
r_OId rOId( thisResult->oid );
set = new ( r_Database::actual_database, r_Object::read, rOId ) r_Set< r_Ref_Any >;
// initialize data elements
set->set_type_by_name ( thisResult->typeName );
set->set_type_structure( thisResult->typeStructure );
set->set_object_name ( thisResult->collName );
// now the transfer structure can be freed
XDRFREE(GetCollRes, thisResult);
// get collection elements
if( rpcStatus == 0 )
getMDDCollection( *set, 0 );
// else rpcStatus == 1 -> Result collection is empty and nothing has to be got.
RMDBGEXIT(2, RMDebug::module_clientcomm, "RpcClientComm", "failed")
return r_Ref_Any( set->get_oid(), set );
}
r_Ref_Any
RpcClientComm::getCollOIdsByName( const char* collName )
throw( r_Error )
{
if(binding_h == NULL) {
RMInit::logOut << endl << "RpcClientComm::getCollOIdsByName(collName) ERROR: CONNECTION TO SERVER ALREADY CLOSED" << endl << endl;
throw r_Error(CONNECTIONCLOSED);
}
RMDBGENTER(2, RMDebug::module_clientcomm, "RpcClientComm", "getCollOIdsByName(" << collName << ")")
r_Set< r_Ref >* set = 0;
unsigned short rpcStatus = 0;
NameSpecParams* params = new NameSpecParams;
GetCollOIdsRes* thisResult = 0;
params->clientID = clientID;
params->name = (char*)collName;
setRPCActive();
rpcRetryCounter = 0;
do
{
thisResult = rpcgetcolloidsbyname_1( params, binding_h );
if( !thisResult )
{
RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpcgetcolloidsbyname_1)" << endl << endl;
sleep(RMInit::clientcommSleep);
}
if (rpcRetryCounter > RMInit::clientcommMaxRetry)
{
RMInit::logOut << "RPC call 'rpcgetcolloidsbyname' failed" << endl;
throw r_Error(CLIENTCOMMUICATIONFAILURE);
}
rpcRetryCounter++;
}while( thisResult == 0 );
setRPCInactive();
delete params;
rpcStatus = thisResult->status;
if( rpcStatus != 0 && rpcStatus != 1 )
{
r_Error err;
switch( rpcStatus )
{
case 2:
err = r_Error( r_Error::r_Error_ObjectUnknown );
break;
case 3:
err = r_Error( r_Error::r_Error_ClientUnknown );
break;
default:
err = r_Error( r_Error::r_Error_TransferFailed );
}
XDRFREE(GetCollOIdsRes, thisResult);
RMDBGEXIT(2, RMDebug::module_clientcomm, "RpcClientComm", "failed")
throw err;
}
RMDBGMIDDLE(2, RMDebug::module_clientcomm, "RpcClientComm", "ok")
// create the set
r_OId rOId( thisResult->oid );
set = new ( r_Database::actual_database, r_Object::read, rOId ) r_Set< r_Ref< r_GMarray > >;
set->set_type_by_name ( thisResult->typeName );
set->set_type_structure( thisResult->typeStructure );
set->set_object_name ( thisResult->collName );
// fill set with oids
if( rpcStatus == 0 )
{
for( unsigned int i=0; ioidTable.oidTable_len; i++ )
{
set->insert_element( r_Ref( r_OId( thisResult->oidTable.oidTable_val[i].oid ) ), 1 );
RMDBGMIDDLE(2, RMDebug::module_clientcomm, "RpcClientComm", "oid " << i << ": " << thisResult->oidTable.oidTable_val[i].oid)
}
}
// now the transfer structure can be freed
XDRFREE(GetCollOIdsRes, thisResult);
RMDBGEXIT(2, RMDebug::module_clientcomm, "RpcClientComm", "getCollOIdsByName")
return r_Ref_Any( set->get_oid(), set );
}
r_Ref_Any
RpcClientComm::getCollOIdsByOId( const r_OId& oid )
throw( r_Error )
{
if(binding_h == NULL) {
RMInit::logOut << endl << "RpcClientComm::getCollOIdsByOId(oid) ERROR: CONNECTION TO SERVER ALREADY CLOSED" << endl << endl;
throw r_Error(CONNECTIONCLOSED);
}
RMDBGENTER(2, RMDebug::module_clientcomm, "RpcClientComm", "getCollOIdsByOId(" << oid << ")")
r_Set< r_Ref >* set = 0;
unsigned short rpcStatus = 0;
OIdSpecParams* params = new OIdSpecParams;
GetCollOIdsRes* thisResult = 0;
params->clientID = clientID;
params->oid = (char*)oid.get_string_representation();
setRPCActive();
rpcRetryCounter = 0;
do
{
thisResult = rpcgetcolloidsbyoid_1( params, binding_h );
if( !thisResult )
{
RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpcgetcolloidsbyoid_1)" << endl << endl;
sleep(RMInit::clientcommSleep);
}
if (rpcRetryCounter > RMInit::clientcommMaxRetry)
{
RMInit::logOut << "RPC call 'rpcgetcolloidsbyoid' failed" << endl;
throw r_Error(CLIENTCOMMUICATIONFAILURE);
}
rpcRetryCounter++;
}while( thisResult== 0 );
setRPCInactive();
delete params;
rpcStatus = thisResult->status;
if( rpcStatus != 0 && rpcStatus != 1 )
{
r_Error err;
switch( rpcStatus )
{
case 2:
err = r_Error( r_Error::r_Error_ObjectUnknown );
break;
case 3:
err = r_Error( r_Error::r_Error_ClientUnknown );
break;
default:
err = r_Error( r_Error::r_Error_TransferFailed );
}
XDRFREE(GetCollOIdsRes, thisResult);
RMDBGEXIT(2, RMDebug::module_clientcomm, "RpcClientComm", "failed")
throw err;
}
RMDBGMIDDLE(2, RMDebug::module_clientcomm, "RpcClientComm", "ok")
// create the set
r_OId rOId( thisResult->oid );
set = new ( r_Database::actual_database, r_Object::read, rOId ) r_Set< r_Ref< r_GMarray > >;
set->set_type_by_name ( thisResult->typeName );
set->set_type_structure( thisResult->typeStructure );
set->set_object_name ( thisResult->collName );
// fill set with oids
if( rpcStatus == 0 )
{
for( unsigned int i=0; ioidTable.oidTable_len; i++ )
{
set->insert_element( r_Ref( r_OId( thisResult->oidTable.oidTable_val[i].oid ) ), 1 );
RMDBGMIDDLE(2, RMDebug::module_clientcomm, "RpcClientComm", "contains oid #" << i << ":" << thisResult->oidTable.oidTable_val[i].oid)
}
}
// now the transfer structure can be freed
XDRFREE(GetCollOIdsRes, thisResult);
RMDBGEXIT(2, RMDebug::module_clientcomm, "RpcClientComm", "getCollOIdsByOId")
return r_Ref_Any( set->get_oid(), set );
}
int
RpcClientComm::createDB( const char* name )
throw(r_Error)
{
if(binding_h == NULL) {
RMInit::logOut << endl << "RpcClientComm::createDB(name) ERROR: CONNECTION TO SERVER ALREADY CLOSED" << endl << endl;
throw r_Error(CONNECTIONCLOSED);
}
unsigned short rpcStatus=0;
unsigned short* rpcStatusPtr=0;
setRPCActive();
rpcRetryCounter = 0;
do
{
rpcStatusPtr = rpccreatedb_1( (char**)&name, binding_h );
if( !rpcStatusPtr )
{
RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpccreatedb_1)" << endl << endl;
sleep(RMInit::clientcommSleep);
}
if (rpcRetryCounter > RMInit::clientcommMaxRetry)
{
RMInit::logOut << "RPC call 'rpccreatedb' failed" << endl;
throw r_Error(CLIENTCOMMUICATIONFAILURE);
}
rpcRetryCounter++;
}while( rpcStatusPtr == 0 );
rpcStatus = *rpcStatusPtr;
setRPCInactive();
return rpcStatus;
}
int
RpcClientComm::destroyDB( const char* name )
throw(r_Error)
{
if(binding_h == NULL) {
RMInit::logOut << endl << "RpcClientComm::destroyDB(name) ERROR: CONNECTION TO SERVER ALREADY CLOSED" << endl << endl;
throw r_Error(CONNECTIONCLOSED);
}
unsigned short rpcStatus=0;
unsigned short* rpcStatusPtr=0;
setRPCActive();
rpcRetryCounter = 0;
do
{
rpcStatusPtr = rpcdestroydb_1( (char**)&name, binding_h );
if( !rpcStatusPtr )
{
RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpcdestroydb_1)" << endl << endl;
sleep(RMInit::clientcommSleep);
}
if (rpcRetryCounter > RMInit::clientcommMaxRetry)
{
RMInit::logOut << "RPC call 'rpcdestroydb' failed" << endl;
throw r_Error(CLIENTCOMMUICATIONFAILURE);
}
rpcRetryCounter++;
}while( rpcStatusPtr == 0 );
rpcStatus = *rpcStatusPtr;
setRPCInactive();
return rpcStatus;
}
int
RpcClientComm::openDB( const char* database )
{
strcpy(dataBase,database);
connectToServer(1); // this means read-only
int answer=executeOpenDB(database);
if(answer==0) answer=executeCloseDB();
// else the DB is not open and makes ugly log output on the server
disconnectFromServer();
return answer;
}
int
RpcClientComm::executeOpenDB( const char* database )
{
if(binding_h == NULL) {
RMInit::logOut << endl << "RpcClientComm::executeOpenDB(database) ERROR: CONNECTION TO SERVER ALREADY CLOSED" << endl << endl;
return CONNECTIONCLOSED;
}
RMDBGENTER(2, RMDebug::module_clientcomm, "RpcClientComm", "executeOpenDB(" << database << ")")
// Send first "I'm alive" signal in ALIVEINTERVAL seconds
#ifdef WIN32
timerid = timeSetEvent(ALIVEINTERVAL * 1000, 0, TimerProc, 0, TIME_PERIODIC);
#else
alarm( ALIVEINTERVAL );
#endif
OpenDBParams* params = new OpenDBParams;
OpenDBRes* thisResult = 0;
params->dbName = (char*)database;
params->userName = (char*)RMInit::userName;
params->capability = capability;
int* dummyParam = new int(0);// dummy
int* endianResult = NULL;
ServerVersionRes* versionResult = NULL;
setRPCActive();
versionResult = rpcgetserverversion_1( dummyParam, binding_h );
RMDBGMIDDLE(2, RMDebug::module_clientcomm, "RpcClientComm", "server version " << versionResult->serverVersionNo << ", rpc version " << versionResult->rpcInterfaceVersionNo)
// don't forget to add 0.5, otherwise rounding errors!
serverRPCversion = (int)(1000.0 * versionResult->rpcInterfaceVersionNo + 0.5);
if (serverRPCversion != RPCVERSION)
{
RMDBGMIDDLE(2, RMDebug::module_clientcomm, "RpcClientComm", "RPC interface version mismatch: client (" << RPCVERSION/1000.0 << "), server (" << versionResult->rpcInterfaceVersionNo << ")")
RMInit::logOut << "Client Server Communication incompatible" << endl;
setRPCInactive();
RMDBGEXIT(2, RMDebug::module_clientcomm, "RpcClientComm", "executeOpenDB(" << database << ") " << 4)
return 4; // servercomm::openDB creates codes 1-3.
}
if(binding_h == NULL) {
RMInit::logOut << endl << "RpcClientComm::executeOpenDB(database) ERROR: CONNECTION TO SERVER ALREADY CLOSED" << endl << endl;
return CONNECTIONCLOSED;
}
rpcRetryCounter = 0;
do
{
thisResult = rpcopendb_1( params, binding_h );
if( !thisResult )
{
RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpcopendb_1)" << endl << endl;
sleep(RMInit::clientcommSleep);
}
if (rpcRetryCounter > RMInit::clientcommMaxRetry)
{
RMInit::logOut << "RPC call 'rpcopendb' failed" << endl;
throw r_Error(CLIENTCOMMUICATIONFAILURE);
}
rpcRetryCounter++;
}while( thisResult == 0 );
if(binding_h == NULL) {
RMInit::logOut << endl << "RpcClientComm::executeOpenDB(database) ERROR: CONNECTION TO SERVER ALREADY CLOSED" << endl << endl;
return CONNECTIONCLOSED;
}
endianResult = rpcgetserverendian_1( dummyParam, binding_h );
setRPCInactive();
delete params;
delete dummyParam;
clientID = thisResult->clientID;
endianServer = *endianResult;
//cout << "server endianness: " << endianServer << ", client: " << endianClient << endl;
RMDBGEXIT(2, RMDebug::module_clientcomm, "RpcClientComm", "executeOpenDB(" << database << ") " << thisResult->status)
return thisResult->status;
}
int
RpcClientComm::closeDB()
{
// We decided that it is not necessary to do anything for closeDB, the database is already closed by others
RMInit::logOut << "Fake closeDB" << endl;
return 0;// answer;
}
int
RpcClientComm::executeCloseDB()
{
if(binding_h == NULL) {
RMInit::logOut << endl << "RpcClientComm::executeCloseDB(database) ERROR: CONNECTION TO SERVER ALREADY CLOSED" << endl << endl;
return CONNECTIONCLOSED;
}
unsigned short rpcStatus=0;
unsigned short* rpcStatusPtr=0;
// Suspend "I'm alive" signal
#ifdef __VISUALC__
timeKillEvent(timerid);
#else
alarm( 0 );
#endif
setRPCActive();
rpcRetryCounter = 0;
do
{
rpcStatusPtr = rpcclosedb_1( &clientID, binding_h );
if( !rpcStatusPtr )
{
RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpcclosedb_1)" << endl << endl;
sleep(RMInit::clientcommSleep);
}
if (rpcRetryCounter > RMInit::clientcommMaxRetry)
{
RMInit::logOut << "RPC call 'rpcclosedb' failed" << endl;
throw r_Error(CLIENTCOMMUICATIONFAILURE);
}
rpcRetryCounter++;
}while( rpcStatusPtr == 0 );
rpcStatus = *rpcStatusPtr;
setRPCInactive();
return rpcStatus;
}
int
RpcClientComm::openTA( unsigned short readOnly )
throw(r_Error)
{
int answer=0;
connectToServer(readOnly);
answer=executeOpenDB(dataBase);
if(answer==0) executeOpenTA(readOnly);
//If there is an error CONNECTIONCLOSED, we report this, it is important to know
if(answer == CONNECTIONCLOSED) {
throw r_Error(CONNECTIONCLOSED);
}
return answer;
}
int
RpcClientComm::executeOpenTA( unsigned short readOnly )
{
if(binding_h == NULL) {
RMInit::logOut << endl << "RpcClientComm::executeOpenTA(database) ERROR: CONNECTION TO SERVER ALREADY CLOSED" << endl << endl;
return CONNECTIONCLOSED;
}
unsigned short rpcStatus=0;
unsigned short* rpcStatusPtr=0;
int secsWaited = 0;
BeginTAParams params;
params.clientID = clientID;
params.readOnly = readOnly;
params.capability = capability;
setRPCActive();
rpcRetryCounter = 0;
do
{
rpcStatusPtr = rpcbeginta_1( ¶ms, binding_h );
if( !rpcStatusPtr )
{
RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpcbeginta_1)" << endl << endl;
sleep(RMInit::clientcommSleep);
}
if (rpcRetryCounter > RMInit::clientcommMaxRetry)
{
RMInit::logOut << "RPC call 'rpcbeginta' failed" << endl;
throw r_Error(CLIENTCOMMUICATIONFAILURE);
}
rpcRetryCounter++;
}while( rpcStatusPtr == 0 );
rpcStatus = *rpcStatusPtr;
RMDBGIF(20, RMDebug::module_tools, "WAITRECEIVEDTILE", \
RMInit::dbgOut << "Waiting 100 sec after receive tile\n" << std::endl; \
sleep(100); \
RMInit::dbgOut << "Continue now\n" << std::endl; );
setRPCInactive();
/* not necessary with V5
int sleepCntr = 1;
while( rpcStatus == 2 )
{
if( sleepCntr == 64 ) sleepCntr /= 2;
RMInit::logOut << "Another transaction is already active, sleeping " << sleepCntr*2 << " secs..." << endl;
sleepCntr *= 2;
secsWaited += sleepCntr;
#ifndef __VISUALC__
sleep( sleepCntr );
#else
Sleep( sleepCntr*1000 );
#endif
setRPCActive();
do
{
rpcStatusPtr = rpcbeginta_1( ¶ms, binding_h );
if( !rpcStatusPtr )
{
RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpcbeginta_1)" << endl << endl;
sleep(RMInit::clientcommSleep);
}
}while( rpcStatusPtr == 0 );
rpcStatus = *rpcStatusPtr;
setRPCInactive();
}
*/
return rpcStatus;
}
int
RpcClientComm::commitTA()
throw(r_Error)
{
int answer=executeCommitTA();
if(answer==0) answer=executeCloseDB();
//If there is an error CONNECTIONCLOSED, we report this, it is important to know
if(answer == CONNECTIONCLOSED) {
throw r_Error(CONNECTIONCLOSED);
}
disconnectFromServer();
return answer;
}
int
RpcClientComm::executeCommitTA()
{
if(binding_h == NULL) {
RMInit::logOut << endl << "RpcClientComm::executeCommitTA(database) ERROR: CONNECTION TO SERVER ALREADY CLOSED" << endl << endl;
return CONNECTIONCLOSED;
}
unsigned short rpcStatus=0;
unsigned short* rpcStatusPtr=0;
setRPCActive();
rpcRetryCounter = 0;
do
{
rpcStatusPtr = rpccommitta_1( &clientID, binding_h );
if( !rpcStatusPtr ) {
{
RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpccommitta_1)" << endl << endl;
sleep(RMInit::clientcommSleep);
}
}
if (rpcRetryCounter > RMInit::clientcommMaxRetry)
{
RMInit::logOut << "RPC call 'rpccommitta' failed" << endl;
throw r_Error(CLIENTCOMMUICATIONFAILURE);
}
rpcRetryCounter++;
}while( rpcStatusPtr == 0 );
rpcStatus = *rpcStatusPtr;
setRPCInactive();
return rpcStatus;
}
int
RpcClientComm::abortTA()
{
int answer=0;
try {
answer = executeAbortTA();
if(answer==0) answer=executeCloseDB();
disconnectFromServer();
}
catch (r_Error& e)
{
RMInit::logOut << "RpcClientComm::abortTA() caught error: " << e.get_errorno() << " " << e.what() << std::endl;
answer = 1;
}
//If there is an error CONNECTIONCLOSED, we ignore this, it is in abort
return answer;
}
int
RpcClientComm::executeAbortTA()
{
if(binding_h == NULL) {
RMInit::logOut << endl << "RpcClientComm::executeAbortTA(database) ERROR: CONNECTION TO SERVER ALREADY CLOSED" << endl << endl;
return CONNECTIONCLOSED;
}
unsigned short rpcStatus=0;
unsigned short* rpcStatusPtr=0;
setRPCActive();
rpcRetryCounter = 0;
do
{
rpcStatusPtr = rpcabortta_1( &clientID, binding_h );
if( !rpcStatusPtr ) {
{
RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpcabortta_1)" << endl << endl;
sleep(RMInit::clientcommSleep);
}
}
if (rpcRetryCounter > RMInit::clientcommMaxRetry)
{
RMInit::logOut << "RPC call 'rpcabortta' failed" << endl;
throw r_Error(CLIENTCOMMUICATIONFAILURE);
}
rpcRetryCounter++;
}while( rpcStatusPtr == 0 );
rpcStatus = *rpcStatusPtr;
setRPCInactive();
return rpcStatus;
}
r_OId
RpcClientComm::getNewOId( unsigned short objType )
throw(r_Error)
{
if(binding_h == NULL) {
RMInit::logOut << endl << "RpcClientComm::getNewOId(objType) ERROR: CONNECTION TO SERVER ALREADY CLOSED" << endl << endl;
throw r_Error(CONNECTIONCLOSED);
}
// update -> check for read_only transaction
if( r_Transaction::actual_transaction == 0 ||
r_Transaction::actual_transaction->get_mode() == r_Transaction::read_only )
{
r_Error err = r_Error( r_Error::r_Error_TransactionReadOnly );
throw err;
}
unsigned short rpcStatus = 0;
NewOIdParams* params = new NewOIdParams;
OIdRes* thisResult = 0;
params->clientID = clientID;
params->objType = objType;
setRPCActive();
rpcRetryCounter = 0;
do
{
thisResult = rpcgetnewoid_1( params, binding_h );
if( !thisResult )
{
RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpcgetnewoid_1)" << endl << endl;
sleep(RMInit::clientcommSleep);
}
if (rpcRetryCounter > RMInit::clientcommMaxRetry)
{
RMInit::logOut << "RPC call 'rpcgetnewoid' failed" << endl;
throw r_Error(CLIENTCOMMUICATIONFAILURE);
}
rpcRetryCounter++;
}while( thisResult == 0 );
setRPCInactive();
delete params;
rpcStatus = thisResult->status;
r_OId oid( thisResult->oid );
// now the transfer structure of rpcgetcollbyname can be freed
XDRFREE(OIdRes, thisResult);
if( rpcStatus != 0 )
{
r_Error err;
switch( rpcStatus )
{
case 1:
err = r_Error( r_Error::r_Error_ClientUnknown );
break;
case 2:
err = r_Error( r_Error::r_Error_CreatingOIdFailed );
break;
default:
err = r_Error( r_Error::r_Error_TransferFailed );
}
throw err;
}
return oid;
}
unsigned short
RpcClientComm::getObjectType( const r_OId& oid )
throw(r_Error)
{
if(binding_h == NULL) {
RMInit::logOut << endl << "RpcClientComm::getObjectType(oid) ERROR: CONNECTION TO SERVER ALREADY CLOSED" << endl << endl;
throw r_Error(CONNECTIONCLOSED);
}
OIdSpecParams* params = new OIdSpecParams;
ObjectTypeRes* thisResult = 0;
params->clientID = clientID;
params->oid = (char*)oid.get_string_representation();
setRPCActive();
rpcRetryCounter = 0;
do
{
thisResult = rpcgetobjecttype_1( params, binding_h );
if( !thisResult )
{
RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpcgetobjecttype_1)" << endl << endl;
sleep(RMInit::clientcommSleep);
}
if (rpcRetryCounter > RMInit::clientcommMaxRetry)
{
RMInit::logOut << "RPC call 'rpcgetobjexttype' failed" << endl;
throw r_Error(CLIENTCOMMUICATIONFAILURE);
}
rpcRetryCounter++;
}while( thisResult == 0 );
setRPCInactive();
delete params;
unsigned short rpcStatus = thisResult->status;
unsigned short objType = thisResult->objType;
// now the transfer structure can be freed
XDRFREE(ObjectTypeRes, thisResult);
if( rpcStatus != 0 )
{
r_Error err;
switch( rpcStatus )
{
case 1:
err = r_Error( r_Error::r_Error_ClientUnknown );
break;
case 2:
err = r_Error( r_Error::r_Error_ObjectUnknown );
break;
default:
err = r_Error( r_Error::r_Error_TransferFailed );
}
throw err;
}
return objType;
}
char*
RpcClientComm::getTypeStructure( const char* typeName, r_Type_Type typeType )
throw(r_Error)
{
if(binding_h == NULL) {
RMInit::logOut << endl << "RpcClientComm::getTypeStructure(typeName, typeType) ERROR: CONNECTION TO SERVER ALREADY CLOSED" << endl << endl;
throw r_Error(CONNECTIONCLOSED);
}
GetTypeStructureParams* params = new GetTypeStructureParams;
GetTypeStructureRes* thisResult = 0;
params->clientID = clientID;
params->typeName = (char*)typeName;
params->typeType = typeType;
setRPCActive();
rpcRetryCounter = 0;
do
{
thisResult = rpcgettypestructure_1( params, binding_h );
if( !thisResult )
{
RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpcgettypestructure_1)" << endl << endl;
sleep(RMInit::clientcommSleep);
}
if (rpcRetryCounter > RMInit::clientcommMaxRetry)
{
RMInit::logOut << "RPC call 'rpcgettypestructure' failed" << endl;
throw r_Error(CLIENTCOMMUICATIONFAILURE);
}
rpcRetryCounter++;
}while( thisResult == 0 );
setRPCInactive();
delete params;
char* typeStructure = 0;
unsigned short rpcStatus = thisResult->status;
if( rpcStatus == 0 )
{
typeStructure = new char[strlen(thisResult->typeStructure) + 1];
strcpy(typeStructure, thisResult->typeStructure);
// this has to be freed by rpc:thisResult->typeStructure = 0;
}
// now the transfer structure can be freed
XDRFREE(GetTypeStructureRes, thisResult);
if( rpcStatus != 0 )
{
r_Error err;
switch( rpcStatus )
{
case 2:
err = r_Error( r_Error::r_Error_DatabaseClassUndefined );
break;
default:
err = r_Error( r_Error::r_Error_TransferFailed );
}
throw err;
}
return typeStructure;
}
void
RpcClientComm::getMarRpcRepresentation( const r_GMarray* mar, RPCMarray*& rpcMarray,
r_Data_Format initStorageFormat,
const r_Base_Type *baseType)
{
RMDBGENTER(2, RMDebug::module_clientcomm, "RpcClientComm", "getMarRpcRepresentation(...)");
// allocate memory for the RPCMarray data structure and assign its fields
rpcMarray = (RPCMarray*)mymalloc( sizeof(RPCMarray) );
rpcMarray->domain = mar->spatial_domain().get_string_representation();
rpcMarray->cellTypeLength = mar->get_type_length();
void* arrayData = NULL;
r_ULong arraySize=0;
if (initStorageFormat == r_Array)
{
if (transferFormat != r_Array)
{
r_Tile_Compression *engine = NULL;
try
{
r_Storage_Man_CPP sman;
engine = r_Tile_Compression::create(transferFormat, mar->spatial_domain(), baseType);
engine->set_storage_handler(sman);
RMDBGMIDDLE(2, RMDebug::module_clientcomm, "RpcClientComm", "getMarRpcRepresentation(...) compress with " << engine->get_name())
if ((endianClient != endianServer) && (!engine->converts_endianness()))
{
RMDBGMIDDLE( 2, RMDebug::module_clientcomm, "RpcClientComm", "getMarRpcRepresentation(...) for "
<< transferFormat << " endianness changed before compression from " << (r_Endian::r_Endianness)endianClient
<< " to " << (r_Endian::r_Endianness) endianServer);
char *endianData = new char[mar->get_array_size()];
changeEndianness(mar, endianData, baseType);
arrayData = engine->compress(endianData, arraySize, transferFormatParams);
delete [] endianData;
endianData=NULL;
}
else
{
arrayData = engine->compress(mar->get_array(), arraySize, transferFormatParams);
}
RMDBGMIDDLE(2, RMDebug::module_clientcomm, "RpcClientComm", "compression returned " << arrayData << " (" << arraySize << " bytes)")
/*void *testData = engine->decompress(arrayData, arraySize);
cout << "Decompression worked " << ((memcmp(mar->get_array(), testData, (mar->get_type_length()) * (mar->spatial_domain().cell_count())) == 0) ? "OK" : "!NOT!") << endl;
delete [] testData;*/
// ForWiss: revert to uncompressed data if the compressed data is larger
// coman: and introduced a bug of endianess ...
if (arraySize > mar->get_array_size())
{
RMInit::logOut << "RpcClientComm::getMarRpcRepresentation(...) Warning: overriding compression setting("
<< transferFormat << ") to " << r_Array
<< " because compressed size( " << arraySize
<< " bytes) > uncompressed size( " << mar->get_array_size() << " bytes)" << endl;
delete [] arrayData;
arrayData = NULL;
}
}
catch (r_Error &err)
{
RMInit::logOut << "RpcClientComm::getMarRpcRepresentation(...) Error: Unsupported data format " << transferFormat << endl;
}
if (engine != NULL)
delete engine;
}
else
{
if (endianClient != endianServer)
{
RMDBGMIDDLE( 2, RMDebug::module_clientcomm, "RpcClientComm", "getMarRpcRepresentation(...) for "
<< transferFormat << " endianness changed from "
<< (r_Endian::r_Endianness)endianClient << " to " << (r_Endian::r_Endianness) endianServer);
arraySize = mar->get_array_size();
arrayData = new char[arraySize];
changeEndianness(mar, arrayData, baseType);
}
}
}
if (arrayData == NULL)
{
//error in compression or compression inefficient
rpcMarray->currentFormat = initStorageFormat;
rpcMarray->data.confarray_len = mar->get_array_size();
if (endianClient != endianServer)
{
RMDBGMIDDLE( 2, RMDebug::module_clientcomm, "RpcClientComm", "getMarRpcRepresentation(...) for "
<< transferFormat << "endianness changed from "
<< (r_Endian::r_Endianness)endianClient << " to " << (r_Endian::r_Endianness) endianServer
<< " because compression " << transferFormat << " failed" );
arrayData = new char[arraySize];
changeEndianness(mar, arrayData, baseType);
rpcMarray->data.confarray_val = (char*)(arrayData);
}
else
{
rpcMarray->data.confarray_val = (char*)(mar->get_array());
}
}
else
{
if (arraySize != mar->get_array_size())
{
RMDBGMIDDLE(2, RMDebug::module_clientcomm, "RpcClientComm", "compressed to " << (100.0*arraySize) / mar->get_array_size() << "%")
}
rpcMarray->currentFormat = transferFormat;
rpcMarray->data.confarray_len = arraySize;
rpcMarray->data.confarray_val = (char*)arrayData;
}
rpcMarray->storageFormat = storageFormat;
RMDBGEXIT(2, RMDebug::module_clientcomm, "RpcClientComm", "getMarRpcRepresentation(...)");
}
void
RpcClientComm::freeMarRpcRepresentation( const r_GMarray* mar, RPCMarray* rpcMarray )
{
if (rpcMarray->data.confarray_val != ((r_GMarray*)mar)->get_array())
{
delete [] rpcMarray->data.confarray_val;
}
free( rpcMarray->domain );
free( rpcMarray );
}
void
RpcClientComm::getMDDCollection( r_Set< r_Ref_Any >& mddColl, unsigned int isQuery )
throw(r_Error)
{
if(binding_h == NULL) {
RMInit::logOut << endl << "RpcClientComm::getMDDCollection(mddColl, isQuery) ERROR: CONNECTION TO SERVER ALREADY CLOSED" << endl << endl;
throw r_Error(CONNECTIONCLOSED);
}
RMDBGENTER(2, RMDebug::module_clientcomm, "RpcClientComm", "getMDDCollection(...)")
unsigned short tileStatus=0;
unsigned short mddStatus = 0;
// r_Minterval mddDomain;
RMInit::logOut << endl;
while( mddStatus == 0 ) // repeat until all MDDs are transferred
{
r_Ref mddResult;
GetMDDRes* thisResult = 0;
// Get spatial domain of next MDD
setRPCActive();
rpcRetryCounter = 0;
do
{
thisResult = rpcgetnextmdd_1( &clientID, binding_h );
if( !thisResult )
{
RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpcgetnextmdd_1)" << endl << endl;
sleep(RMInit::clientcommSleep);
}
if (rpcRetryCounter > RMInit::clientcommMaxRetry)
{
RMInit::logOut << "RPC call 'rpcgetnextmdd' failed" << endl;
throw r_Error(CLIENTCOMMUICATIONFAILURE);
}
rpcRetryCounter++;
}while( thisResult == 0 );
setRPCInactive();
mddStatus = thisResult->status;
RMDBGMIDDLE(2, RMDebug::module_clientcomm, "RpcClientComm", "read MDD")
if( mddStatus == 2 )
{
RMInit::logOut << "Error: getMDDCollection(...) - no transfer collection or empty transfer collection" << endl;
throw r_Error( r_Error::r_Error_TransferFailed );
}
else
tileStatus = 0 ? 10 : 0;
// create r_Minterval
// mddDomain = r_Minterval( thisResult->domain );
tileStatus = getMDDCore(mddResult, thisResult, isQuery);
// finally, insert the r_Marray into the set
mddColl.insert_element( mddResult, 1 );
if( tileStatus == 0 ) // if this is true, we're done with this collection
break;
RMDBGMIDDLE(2, RMDebug::module_clientcomm, "RpcClientComm", "ok");
} // end while( mddStatus == 0 )
if(binding_h == NULL) {
RMInit::logOut << endl << "RpcClientComm::getMDDCollection(mddColl, isQuery) ERROR: CONNECTION TO SERVER ALREADY CLOSED" << endl << endl;
throw r_Error(CONNECTIONCLOSED);
}
unsigned short* rpcStatusPtr = 0;
setRPCActive();
rpcRetryCounter = 0;
do
{
rpcStatusPtr = rpcendtransfer_1( &clientID, binding_h );
if( !rpcStatusPtr )
{
RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpcendtransfer_1)" << endl << endl;
sleep(RMInit::clientcommSleep);
}
if (rpcRetryCounter > RMInit::clientcommMaxRetry)
{
RMInit::logOut << "RPC call 'rpcendtransfer' failed" << endl;
throw r_Error(CLIENTCOMMUICATIONFAILURE);
}
rpcRetryCounter++;
}while( rpcStatusPtr == 0 );
setRPCInactive();
}
void
RpcClientComm::getElementCollection( r_Set< r_Ref_Any >& resultColl )
throw(r_Error)
{
if(binding_h == NULL) {
RMInit::logOut << endl << "RpcClientComm::getElementCollection(resultColl) ERROR: CONNECTION TO SERVER ALREADY CLOSED" << endl << endl;
throw r_Error(CONNECTIONCLOSED);
}
unsigned short rpcStatus = 0;
RMInit::logOut << endl << " got set of type " << resultColl.get_type_structure() << endl;
while( rpcStatus == 0 ) // repeat until all elements are transferred
{
GetElementRes* thisResult = 0;
if(binding_h == NULL) {
RMInit::logOut << endl << "RpcClientComm::getElementCollection(resultColl) ERROR: CONNECTION TO SERVER ALREADY CLOSED" << endl << endl;
throw r_Error(CONNECTIONCLOSED);
}
setRPCActive();
rpcRetryCounter = 0;
do
{
thisResult = rpcgetnextelement_1( &clientID, binding_h );
if( !thisResult )
{
RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpcgetnextelement_1)" << endl << endl;
sleep(RMInit::clientcommSleep);
}
if (rpcRetryCounter > RMInit::clientcommMaxRetry)
{
RMInit::logOut << "RPC call 'rpcgetnextelement' failed" << endl;
throw r_Error(CLIENTCOMMUICATIONFAILURE);
}
rpcRetryCounter++;
}while( thisResult == 0 );
setRPCInactive();
rpcStatus = thisResult->status;
if( rpcStatus == 2 )
{
RMInit::logOut << "Error: getElementCollection(...) - no transfer collection or empty transfer collection" << endl;
throw r_Error( r_Error::r_Error_TransferFailed );
}
// create new collection element, use type of collection resultColl
r_Ref_Any element;
const r_Type* elementType = resultColl.get_element_type_schema();
// convert the endianness before creating the new element!
if (endianClient != endianServer)
{
if (endianClient == 0)
elementType->convertToBigEndian(thisResult->data.confarray_val, 1);
else
elementType->convertToLittleEndian(thisResult->data.confarray_val, 1);
}
switch( elementType->type_id() )
{
case r_Type::BOOL:
case r_Type::CHAR:
case r_Type::OCTET:
case r_Type::SHORT:
case r_Type::USHORT:
case r_Type::LONG:
case r_Type::ULONG:
case r_Type::FLOAT:
case r_Type::DOUBLE:
{
element = new r_Primitive( thisResult->data.confarray_val, (r_Primitive_Type*) elementType );
r_Transaction::actual_transaction->add_object_list( r_Transaction::SCALAR, (void*) element );
}
break;
case r_Type::COMPLEXTYPE1:
case r_Type::COMPLEXTYPE2:
element = new r_Complex(thisResult->data.confarray_val, (r_Complex_Type *)elementType);
r_Transaction::actual_transaction->add_object_list(r_Transaction::SCALAR, (void *)element);
break;
case r_Type::STRUCTURETYPE:
{
element = new r_Structure( thisResult->data.confarray_val, (r_Structure_Type*) elementType );
r_Transaction::actual_transaction->add_object_list( r_Transaction::SCALAR, (void*) element );
}
break;
case r_Type::POINTTYPE:
{
char* stringRep = new char[thisResult->data.confarray_len+1];
strncpy( stringRep, thisResult->data.confarray_val, thisResult->data.confarray_len );
stringRep[thisResult->data.confarray_len] = '\0';
r_Point* typedElement = new r_Point( stringRep );
element = typedElement;
r_Transaction::actual_transaction->add_object_list( r_Transaction::POINT, (void*) typedElement );
delete [] stringRep;
}
break;
case r_Type::SINTERVALTYPE:
{
char* stringRep = new char[thisResult->data.confarray_len+1];
strncpy( stringRep, thisResult->data.confarray_val, thisResult->data.confarray_len );
stringRep[thisResult->data.confarray_len] = '\0';
r_Sinterval* typedElement = new r_Sinterval( stringRep );
element = typedElement;
r_Transaction::actual_transaction->add_object_list( r_Transaction::SINTERVAL, (void*) typedElement );
delete [] stringRep;
}
break;
case r_Type::MINTERVALTYPE:
{
char* stringRep = new char[thisResult->data.confarray_len+1];
strncpy( stringRep, thisResult->data.confarray_val, thisResult->data.confarray_len );
stringRep[thisResult->data.confarray_len] = '\0';
r_Minterval* typedElement = new r_Minterval( stringRep );
element = typedElement;
r_Transaction::actual_transaction->add_object_list( r_Transaction::MINTERVAL, (void*) typedElement );
delete [] stringRep;
}
break;
case r_Type::OIDTYPE:
{
char* stringRep = new char[thisResult->data.confarray_len+1];
strncpy( stringRep, thisResult->data.confarray_val, thisResult->data.confarray_len );
stringRep[thisResult->data.confarray_len] = '\0';
r_OId* typedElement = new r_OId( stringRep );
element = typedElement;
r_Transaction::actual_transaction->add_object_list( r_Transaction::OID, (void*) typedElement );
delete [] stringRep;
}
break;
default:
RMDBGENTER(2, RMDebug::module_clientcomm, "RpcClientComm", "getElementCollection(...) bad element typeId" << elementType->type_id())
break;
}
RMInit::logOut << " got element" << endl;
// now the transfer structure of rpcgetnextmdd can be freed
XDRFREE(GetElementRes, thisResult);
// insert element into result set
resultColl.insert_element( element, 1 );
}
if(binding_h == NULL) {
RMInit::logOut << endl << "RpcClientComm::getElementCollection(resultColl) ERROR: CONNECTION TO SERVER ALREADY CLOSED" << endl << endl;
throw r_Error(CONNECTIONCLOSED);
}
unsigned short* rpcStatusPtr = 0;
setRPCActive();
rpcRetryCounter = 0;
do
{
rpcStatusPtr = rpcendtransfer_1( &clientID, binding_h );
if( !rpcStatusPtr )
{
RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpcendtransfer_1)" << endl << endl;
sleep(RMInit::clientcommSleep);
}
if (rpcRetryCounter > RMInit::clientcommMaxRetry)
{
RMInit::logOut << "RPC call 'rpcendtransfer' failed" << endl;
throw r_Error(CLIENTCOMMUICATIONFAILURE);
}
rpcRetryCounter++;
}while( rpcStatusPtr == 0 );
setRPCInactive();
}
unsigned short
RpcClientComm::getMDDCore( r_Ref< r_GMarray > &mdd, GetMDDRes *thisResult, unsigned int isQuery )
throw( r_Error )
{
// create r_Minterval and oid
r_Minterval mddDomain( thisResult->domain );
r_OId rOId ( thisResult->oid );
r_GMarray *marray;
//cout << "getMDDCore..." << endl;
if( isQuery )
marray = new( r_Database::actual_database, r_Object::transient, rOId ) r_GMarray();
else
marray = new( r_Database::actual_database, r_Object::read , rOId ) r_GMarray();
marray->set_spatial_domain( mddDomain );
marray->set_type_by_name ( thisResult->typeName );
marray->set_type_structure( thisResult->typeStructure );
r_Data_Format currentFormat = (r_Data_Format)(thisResult->currentFormat);
if (r_Tile_Compression::check_data_format(currentFormat) == 1)
currentFormat = r_Array;
marray->set_current_format( currentFormat );
r_Data_Format decompFormat;
const r_Base_Type *baseType = marray->get_base_type_schema();
// now the transfer structure of rpcgetnextmdd can be freed
XDRFREE(GetMDDRes, thisResult);
//RMDBGOUTFLUSH( 2, "domain " << mddDomain << " ... " )
// Variables needed for tile transfer
GetTileRes* tileRes=0;
unsigned short mddDim = mddDomain.dimension(); // we assume that each tile has the same dimensionality as the MDD
r_Minterval tileDomain;
r_GMarray* tile; // for temporary tile
char* memCopy;
unsigned long memCopyLen;
int tileCntr = 0;
unsigned short tileStatus = 0;
tileStatus = 2; // call rpcgetnexttile_1 at least once
while( tileStatus == 2 || tileStatus == 3 ) // while( for all tiles of the current MDD )
{
if(binding_h == NULL) {
RMInit::logOut << endl << "RpcClientComm::getMDDCore(mdd, thisResult, isQuery) ERROR: CONNECTION TO SERVER ALREADY CLOSED" << endl << endl;
throw r_Error(CONNECTIONCLOSED);
}
setRPCActive();
rpcRetryCounter = 0;
do
{
tileRes = rpcgetnexttile_1( &clientID, binding_h );
if( !tileRes )
{
RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpcgetnexttile_1)" << endl << endl;
sleep(RMInit::clientcommSleep);
}
if (rpcRetryCounter > RMInit::clientcommMaxRetry)
{
RMInit::logOut << "RPC call 'rpcgetnexttile' failed" << endl;
throw r_Error(CLIENTCOMMUICATIONFAILURE);
}
rpcRetryCounter++;
} while( tileRes == 0 );
RMDBGIF(20, RMDebug::module_tools, "WAITRECEIVEDTILE", \
RMInit::dbgOut << "Waiting 100 sec after receive tile\n" << std::endl; \
sleep(100); \
RMInit::dbgOut << "Continue now\n" << std::endl; );
setRPCInactive();
tileStatus = tileRes->status;
if( tileStatus == 4 )
{
XDRFREE(GetTileRes, tileRes);
RMInit::logOut << "Error: rpcGetNextTile(...) - no tile to transfer or empty transfer collection" << endl;
throw r_Error( r_Error::r_Error_TransferFailed );
}
// take cellTypeLength for current MDD of the first tile
if( tileCntr == 0 )
marray->set_type_length( tileRes->marray->cellTypeLength );
tileDomain = r_Minterval( tileRes->marray->domain );
memCopyLen = tileDomain.cell_count() * marray->get_type_length(); // cell type length of the tile must be the same
if (memCopyLen < tileRes->marray->data.confarray_len)
memCopyLen = tileRes->marray->data.confarray_len; // may happen when compression expands
memCopy = new char[ memCopyLen ];
// create temporary tile
tile = new r_GMarray();
tile->set_spatial_domain( tileDomain );
tile->set_array( memCopy );
tile->set_array_size( memCopyLen );
tile->set_type_length( tileRes->marray->cellTypeLength );
tileCntr++;
// Variables needed for block transfer of a tile
unsigned long blockOffset = 0;
unsigned short subStatus = 3;
currentFormat = (r_Data_Format)(tileRes->marray->currentFormat);
switch( tileStatus )
{
case 3: // at least one block of the tile is left
// Tile arrives in several blocks -> put them together
concatArrayData(tileRes->marray->data.confarray_val, tileRes->marray->data.confarray_len, memCopy, memCopyLen, blockOffset);
XDRFREE(GetTileRes, tileRes);
while( subStatus == 3 )
{
if(binding_h == NULL) {
RMInit::logOut << endl << "RpcClientComm::getMDDCore(mdd, thisResult, isQuery) ERROR: CONNECTION TO SERVER ALREADY CLOSED" << endl << endl;
throw r_Error(CONNECTIONCLOSED);
}
setRPCActive();
rpcRetryCounter = 0;
do
{
tileRes = rpcgetnexttile_1( &clientID, binding_h );
if( !tileRes )
{
RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpcgetnexttile_1)" << endl << endl;
sleep(RMInit::clientcommSleep);
}
if (rpcRetryCounter > RMInit::clientcommMaxRetry)
{
RMInit::logOut << "RPC call 'rpcgetnexttile' failed" << endl;
throw r_Error(CLIENTCOMMUICATIONFAILURE);
}
rpcRetryCounter++;
} while( tileRes == 0 );
setRPCInactive();
subStatus = tileRes->status;
if( subStatus == 4 )
{
XDRFREE(GetTileRes, tileRes);
RMInit::logOut << "Error: rpcGetNextTile(...) - no tile to transfer or empty transfer collection" << endl;
throw r_Error( r_Error::r_Error_TransferFailed );
}
// RMInit::logOut << "Status: " << subStatus << endl;
// RMInit::logOut << "BlockOffset: " << blockOffset << " Size: " << tileRes->marray->data.confarray_len << endl;
concatArrayData(tileRes->marray->data.confarray_val, tileRes->marray->data.confarray_len, memCopy, memCopyLen, blockOffset);
XDRFREE(GetTileRes, tileRes);
}
tileStatus = subStatus;
break;
default: // tileStatus = 0,3 last block of the current tile
// Tile arrives as one block.
concatArrayData(tileRes->marray->data.confarray_val, tileRes->marray->data.confarray_len, memCopy, memCopyLen, blockOffset);
// RMInit::logOut << "Internal size: " << tileRes->marray->data.confarray_len << endl;
XDRFREE(GetTileRes, tileRes);
break;
}
// uncompress the tile if necessary
decompFormat = doTransferDecompression( tile, baseType, currentFormat, blockOffset );
char* marrayData = NULL;
// Now the tile is transferred completely, insert it into current MDD
if( tileStatus < 2 && tileCntr == 1 && (tile->spatial_domain() == marray->spatial_domain()))
{
// MDD consists of just one tile that is the same size of the mdd
// simply take the data memory of the tile
marray->set_array( tile->get_array() );
marray->set_array_size( tile->get_array_size() );
tile->set_array( 0 );
}
else
{
// MDD consists of more than one tile or the tile does not cover the whole domain
r_Bytes size = mddDomain.cell_count() * marray->get_type_length();
if( tileCntr == 1 )
{
// allocate memory for the MDD
marrayData = new char[ size ];
memset(marrayData, 0, size);
marray->set_array( marrayData );
}
else
marrayData = marray->get_array();
// copy tile data into MDD data space (optimized, relying on the internal representation of an MDD )
char* mddBlockPtr;
char* tileBlockPtr = tile->get_array();
unsigned long blockCells = tileDomain[tileDomain.dimension()-1].high()-tileDomain[tileDomain.dimension()-1].low()+1;
unsigned long blockSize = blockCells * marray->get_type_length();
unsigned long blockNo = tileDomain.cell_count() / blockCells;
for( unsigned long blockCtr = 0; blockCtr < blockNo; blockCtr++ )
{
mddBlockPtr = marrayData + marray->get_type_length()*mddDomain.cell_offset( tileDomain.cell_point( blockCtr * blockCells ) );
memcpy( (void*)mddBlockPtr, (void*)tileBlockPtr, (size_t)blockSize );
tileBlockPtr += blockSize;
}
// former non-optimized version
// for( i=0; icell_count(); i++ )
// (*marray)[tileDomain->cell_point( i )] = (*tile)[tileDomain->cell_point( i )];
marray->set_array_size( size );
}
// delete temporary tile
delete tile;
} // end while( MDD is not transferred completely )
mdd = r_Ref( marray->get_oid(), marray );
return tileStatus;
}
int RpcClientComm::concatArrayData( const char *source, unsigned long srcSize, char *&dest, unsigned long &destSize, unsigned long &destLevel )
{
if (destLevel + srcSize > destSize)
{
// need to extend dest
unsigned long newSize = destLevel + srcSize;
char *newArray;
// allocate a little extra if we have to extend
newSize = newSize + newSize / 16;
// RMDBGOUT( 1, "RpcClientComm::concatArrayData(): need to extend from " << destSize << " to " << newSize );
if ((newArray = new char[newSize]) == NULL)
return -1;
memcpy(newArray, dest, destLevel);
delete [] dest;
dest = newArray;
destSize = newSize;
}
memcpy(dest + destLevel, source, srcSize);
destLevel += srcSize;
return 0;
}
void RpcClientComm::triggerAliveSignal()
{
aliveSignalRemaining = 1;
sendAliveSignal();
}
void RpcClientComm::sendAliveSignal()
{
if( aliveSignalRemaining && !checkRPCActive() )
{
aliveSignalRemaining = 0;
unsigned long myID = getClientID();
// tell the server I'm alive
// determine my binding handle
CLIENT* myHandle = getBindingHandle();
unsigned short* rpcStatusPtr = 0;
setRPCActive();
rpcRetryCounter = 0;
do
{
rpcStatusPtr = rpcalive_1( &myID, myHandle );
if( !rpcStatusPtr )
{
RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpcalive_1)" << endl << endl;
sleep(RMInit::clientcommSleep);
}
if (rpcRetryCounter > RMInit::clientcommMaxRetry)
{
RMInit::logOut << "RPC call 'rpcalive' failed" << endl;
throw r_Error(CLIENTCOMMUICATIONFAILURE);
}
rpcRetryCounter++;
}while( rpcStatusPtr == 0 );
setRPCInactive();
RMDBGONCE(3, RMDebug::module_clientcomm, "RpcClientComm", "sent alive signal")
#ifdef __VISUALC__
timeKillEvent(timerid);
timerid = timeSetEvent(ALIVEINTERVAL * 1000, 0, TimerProc, NULL, TIME_PERIODIC);
#else
// Re-initialize the signal handler to point to this function
signal( SIGALRM, aliveSignal );
// Reset the alarm
alarm( ALIVEINTERVAL );
#endif
}
}
int RpcClientComm::setTransferFormat( r_Data_Format format, const char* formatParams )
{
transferFormat = format;
if (transferFormatParams != NULL)
{
free(transferFormatParams);
transferFormatParams = NULL;
}
if (formatParams != NULL)
{
transferFormatParams = (char*)mymalloc(strlen(formatParams)+1);
strcpy(transferFormatParams, formatParams);
// extract ``exactformat'' if present
clientParams->process(transferFormatParams);
}
SetServerTransferParams* params = new SetServerTransferParams;
params->clientID = getClientID();
params->format = (unsigned short)format;
if (transferFormatParams == NULL)
params->formatParams = "";
else
params->formatParams = transferFormatParams;
CLIENT* myHandle = getBindingHandle();
unsigned short* rpcStatusPtr = 0;
setRPCActive();
rpcRetryCounter = 0;
do
{
rpcStatusPtr = rpcsetservertransfer_1( params, myHandle );
if (!rpcStatusPtr)
{
RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpcsetservertransfer_1)" << endl << endl;
sleep(RMInit::clientcommSleep);
}
if (rpcRetryCounter > RMInit::clientcommMaxRetry)
{
RMInit::logOut << "RPC call 'rpcsetservertransfer' failed" << endl;
throw r_Error(CLIENTCOMMUICATIONFAILURE);
}
rpcRetryCounter++;
}
while ( rpcStatusPtr == 0 );
setRPCInactive();
delete params;
return (int)(*rpcStatusPtr);
}
int RpcClientComm::setStorageFormat( r_Data_Format format, const char *formatParams )
{
storageFormat = format;
if (storageFormatParams != NULL)
{
free(storageFormatParams);
storageFormatParams = NULL;
}
if (formatParams != NULL)
{
storageFormatParams = (char*)mymalloc(strlen(formatParams) + 1);
strcpy(storageFormatParams, formatParams);
// extract ``compserver'' if present
clientParams->process(storageFormatParams);
}
SetServerTransferParams *params = new SetServerTransferParams;
params->clientID = getClientID();
params->format = (unsigned short)format;
if (storageFormatParams == NULL)
params->formatParams = "";
else
params->formatParams = storageFormatParams;
CLIENT *myHandle = getBindingHandle();
unsigned short *rpcStatusPtr = 0;
setRPCActive();
rpcRetryCounter = 0;
do
{
rpcStatusPtr = rpcsetserverstorage_1( params, myHandle );
if( !rpcStatusPtr )
{
RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpcsetserverstorage_1)" << endl << endl;
sleep(RMInit::clientcommSleep);
}
if (rpcRetryCounter > RMInit::clientcommMaxRetry)
{
RMInit::logOut << "RPC call 'rpcsetserverstorage' failed" << endl;
throw r_Error(CLIENTCOMMUICATIONFAILURE);
}
rpcRetryCounter++;
}
while ( rpcStatusPtr == NULL );
setRPCInactive();
delete params;
return (int)(*rpcStatusPtr);
}
void
RpcClientComm::setRPCActive()
{
rpcActive = 1;
}
void
RpcClientComm::setRPCInactive()
{
rpcActive = 0;
sendAliveSignal();
}
int
RpcClientComm::checkRPCActive()
{
return rpcActive;
}
const char *
RpcClientComm::getExtendedErrorInfo()
throw(r_Error)
{
if(binding_h == NULL) {
RMInit::logOut << endl << "RpcClientComm::getMDDCollection(mddColl, isQuery) ERROR: CONNECTION TO SERVER ALREADY CLOSED" << endl << endl;
throw r_Error(CONNECTIONCLOSED);
}
static char *errorInfo = NULL;
GetExtendedErrorInfo *result=NULL;
int dummy;
if(errorInfo) delete[] errorInfo;
setRPCActive();
rpcRetryCounter = 0;
do{
result = rpcgeterrorinfo_1(&dummy, binding_h);
if( !result )
{
RMInit::logOut << endl << "WARNING: RPC NULL POINTER (rpcalive_1)" << endl << endl;
sleep(RMInit::clientcommSleep);
}
if (rpcRetryCounter > RMInit::clientcommMaxRetry)
{
RMInit::logOut << "RPC call 'rpcgeterrorinfo' failed" << endl;
throw r_Error(CLIENTCOMMUICATIONFAILURE);
}
rpcRetryCounter++;
}while(!result);
setRPCInactive();
errorInfo = new char[strlen(result->errorText)+1];
strcpy(errorInfo,result->errorText);
return errorInfo;
}
#define MAXMSG 512
int RpcClientComm::readWholeMessage(int socket,char *destBuffer,int buffSize)
{
// we read what is comming in until we encounter a '\0'
// this is our end-sign.
int totalLength=0;
int redNow;
while(1)
{
redNow = read(socket,destBuffer+totalLength,buffSize-totalLength);
if(redNow == -1)
{ if(errno == EINTR) continue; // read was interrupted by signal
return -1; // another error
}
totalLength+=redNow;
if(destBuffer[totalLength-1]==0) break; // THE END
}
return totalLength;
}
int RpcClientComm::writeWholeMessage(int socket,char *destBuffer,int buffSize)
{
// we write the whole message, including the ending '\0', which is already in
// the buffSize provided by the caller
int totalLength=0;
int writeNow;
while(1)
{
writeNow = write(socket,destBuffer+totalLength,buffSize-totalLength);
if(writeNow == -1)
{ if(errno == EINTR) continue; // read was interrupted by signal
return -1; // another error
}
totalLength+=writeNow;
if( totalLength==buffSize ) break; // THE END
}
return totalLength;
}
void
RpcClientComm::setMaxRetry(unsigned int newMaxRetry)
{
RMInit::clientcommMaxRetry = newMaxRetry;
}
unsigned int
RpcClientComm::getMaxRetry()
{
return RMInit::clientcommMaxRetry;
}
static void pause(int retryCount)
{
unsigned int milisec = 50 + retryCount * 50;
if(milisec > 1000) milisec = 1000;
timeval tv;
tv.tv_sec = milisec / 1000;
tv.tv_usec = milisec * 1000;
select(0,NULL,NULL,NULL,&tv);
}
int
RpcClientComm::getFreeServer(unsigned short readOnly)
{
//RMInit::logOut << "getFreeServer in"<h_addr;
int sock;
bool ok = false;
int retry;
for(retry=0;retryp_proto);
//cout<<"Socket="<p_proto<3 -- PB 2005-sep-09
static struct timeval timeout = { RPC_TIMEOUT, 0 };
#if (defined(__VISUALC__) || defined(CYGWIN))
client_control( binding_h, CLGET_TIMEOUT, (char *)&timeout );
#else
clnt_control( binding_h, CLGET_TIMEOUT, (char *)&timeout );
#endif
RMInit::logOut << "Timeout: " << timeout.tv_sec << " sec " << timeout.tv_usec << " microsec" << endl;
timeout.tv_sec = RMInit::timeOut;
timeout.tv_usec = 0;
#if (defined(__VISUALC__) || defined(CYGWIN))
client_control( binding_h, CLSET_TIMEOUT, (char *)&timeout );
#else
clnt_control( binding_h, CLSET_TIMEOUT, (char*)&timeout );
#endif
RMInit::logOut << "Timeout set to " << timeout.tv_sec / 60. << " min." << endl;
#ifndef __VISUALC__
// Install a signal handler for the alive signal
signal( SIGALRM, aliveSignal );
#endif
return 1;
}
int RpcClientComm::disconnectFromServer() throw()
{
if(!binding_h)
{
RMInit::logOut << "Disconnect from server: no binding" << endl;
return -1;
}
else
{
RMInit::logOut << "Disconnect from server: binding ok" << endl;
}
#ifdef __VISUALC__
RMInit::logOut << "Deleting the binding...";
client_destroy(binding_h);
RMInit::logOut << "OK" << endl;
timeKillEvent(timerid);
RMInit::logOut << "Deactivating the NT-RPC feature...";
rpc_nt_exit();
RMInit::logOut << "OK" << endl;
#else
RMInit::logOut << "Deleting the binding...";
#ifdef CYGWIN
client_destroy(binding_h);
rpc_nt_exit();
#else
clnt_destroy(binding_h);
#endif
RMInit::logOut << "OK" << endl;
if (storageFormatParams != NULL)
free(storageFormatParams);
storageFormatParams = NULL;
if (transferFormatParams != NULL)
free(transferFormatParams);
transferFormatParams = NULL;
binding_h = NULL;
// suspend alarm timer for the periodical alive signal
alarm( 0 );
#endif
return 0;
}
// we will make this nicer after the D-day (this means when we change to para-proc
int messageDigest(const char *input,char *output,const char *mdName);
void
RpcClientComm::setUserIdentification(const char *userName, const char *plainTextPassword)
{
char digest[33]="";
messageDigest(plainTextPassword,digest,"MD5");
sprintf(identificationString,"%s:%s",userName,digest);
}
int messageDigest(const char *input,char *output,const char *mdName)
{
EVP_MD_CTX mdctx;
const EVP_MD *md;
unsigned int md_len, i;
unsigned char md_value[100];
OpenSSL_add_all_digests();
md = EVP_get_digestbyname(mdName);
if(!md) return 0;
EVP_DigestInit(&mdctx, md);
EVP_DigestUpdate(&mdctx,input, strlen(input));
EVP_DigestFinal(&mdctx, md_value, &md_len);
for(i = 0; i < md_len; i++) sprintf(output+i+i,"%02x", md_value[i]);
return strlen(output);
}
unsigned long
RpcClientComm::getClientID() const
{
return clientID;
}
CLIENT*
RpcClientComm::getBindingHandle() const
{
return binding_h;
}
void RpcClientComm::setTimeoutInterval(int seconds){ };
int RpcClientComm::getTimeoutInterval() { return 0;};