#include "mymalloc/mymalloc.h"
/*
* This file is part of rasdaman community.
*
* Rasdaman community is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Rasdaman community is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with rasdaman community. If not, see <http://www.gnu.org/licenses/>.
*
* Copyright 2003, 2004, 2005, 2006, 2007, 2008, 2009 Peter Baumann /
rasdaman GmbH.
*
* For more information please see <http://www.rasdaman.org>
* or contact Peter Baumann via <baumann@rasdaman.com>.
*/
/*************************************************************
*
*
* PURPOSE:
*
*
* COMMENTS:
*
* Contains the upper level functions, everything related to communication
* is in rnpclientcomm2
*
************************************************************/
#include
#include "rnpclientcomm.hh"
#include "rasdaman.hh"
#include "debug.hh"
// waiting time increment upon subsequent connect tries in RNP [msecs] -- PB 2005-sep-09
const unsigned int RNP_PAUSE_INCREMENT = 100;
/**
 * Construct a client communication object for the given rasmgr endpoint.
 *
 * Only local state is initialised here: the session identifiers are reset,
 * the client parameter parser is created and taught the "compserver" and
 * "exactformat" keys, the guest credentials are installed and turbo mode is
 * switched on.
 *
 * @param nRasmgrHost  host name of the rasmgr; the pointer itself is stored,
 *                     the string is not copied
 * @param nRasmgrPort  port of the rasmgr
 */
RnpClientComm::RnpClientComm( const char* nRasmgrHost, int nRasmgrPort) throw( r_Error )
    : RnpBaseClientComm(RnpRasserver::serverID, RnpTransport::crp_Rnp)
{
    ENTER( "RpcClientComm::RnpClientComm( " << nRasmgrHost << "," << nRasmgrPort << " )" );
    RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "RnpClientComm(" << nRasmgrHost << "," << nRasmgrPort << ")" );

    // rasmgr endpoint as handed in by the caller
    rasmgrPort = nRasmgrPort;
    rasmgrHost = (char*)nRasmgrHost;

    // nothing is connected yet
    clientID      = -1;
    capability[0] = 0;
    serverHost[0] = 0;

    // no explicit format parameters until somebody sets them
    storageFormatParams  = 0;
    transferFormatParams = 0;

    endianClient = (int)r_Endian::get_endianness();

    // parser for the client-side parameter string
    clientParams = new r_Parse_Params();
    clientParams->add("compserver", &serverCompresses, r_Parse_Params::param_type_int);
    clientParams->add("exactformat", &exactFormat, r_Parse_Params::param_type_int);

    // default login; the hash is MD5("rasguest")
    strcpy(identificationString,"rasguest:8e70a429be359b6dace8b5b2500dedb0");

    // experimentally disabled -- PB 2005-sep-01
    // useTurbo = true;
    useTurbo = true;
    TALK( "useTurbo=" << useTurbo );

    akg::NbJob::setTimeoutInterval(3600);

    RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "RnpClientComm()" );
    LEAVE( "RpcClientComm::RnpClientComm()" );
}
/**
 * Destructor.
 *
 * Releases the parameter parser that the constructor allocated with new.
 * The pointer is reset afterwards so that a later delete of the same member
 * is harmless.
 */
RnpClientComm::~RnpClientComm() throw ()
{
    ENTER( "RpcClientComm::~RnpClientComm()" );
    RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "~RnpClientComm()" );

    if( clientParams != NULL )
    {
        delete clientParams;
        clientParams = NULL;
    }

    RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "~RnpClientComm()" );
    LEAVE( "RpcClientComm::~RnpClientComm()" );
}
bool RnpClientComm::effectivTypeIsRNP() throw()
{
bool retval = true;
ENTER( "RpcClientComm::effectivTypeIsRNP()" );
RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "effectivTypeIsRNP()" );
RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "effectivTypeIsRNP() -> " << retval );
LEAVE( "RpcClientComm::effectivTypeIsRNP() -> " << retval );
return retval;
}
// Open the database called `database`.
// The name is copied into the member buffer databaseName and a server is
// requested through getFreeServer(); retval starts out as 0 and is what the
// function hands back.
// NOTE(review): strcpy() does not bound the copy -- confirm that databaseName
// is large enough for every name a caller may pass.
int RnpClientComm::openDB( const char* database )
{
int retval = 0;
ENTER( "RpcClientComm::openDB(" << database << ")" );
RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "openDB(" << database << ")" );
// remember the database name for later requests
strcpy(databaseName,database);
getFreeServer(false, true); // read only, openDB
TALK( "openDB: Connected to server: "< " << retval );
LEAVE( "RpcClientComm::openDB() -> " << retval );
return retval;
}
int RnpClientComm::closeDB()
{
int retval = 0;
ENTER( "RpcClientComm::closeDB()" );
RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "closeDB()" );
RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "closeDB() -> " << retval );
LEAVE( "RpcClientComm::closeDB() -> " << retval );
return retval;
}
int RnpClientComm::createDB( const char* name ) throw(r_Error)
{
int retval = -1;
ENTER( "RpcClientComm::createDB( " << (name?name:"(null)") << " )" );
RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "createDB()" );
RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "createDB() -> " << retval );
LEAVE( "RpcClientComm::createDB() -> " << retval );
return retval;
}
int RnpClientComm::destroyDB( const char* name ) throw(r_Error)
{
int retval = -1;
ENTER( "RpcClientComm::destroyDB( " << (name?name:"(null)") << " )" );
RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "destroyDB()" );
RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "destroyDB() -> " << retval );
LEAVE( "RpcClientComm::destroyDB() -> " << retval );
return retval;
}
// Open a transaction.
// readOnly == 0 requests a read-write transaction, any other value a
// read-only one; the server is obtained through getFreeServer().
// retval starts out as 1 and is what the function hands back.
int RnpClientComm::openTA( unsigned short readOnly ) throw(r_Error)
{
int retval = 1;
ENTER( "RpcClientComm::openTA(" << readOnly << ")" );
RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "openTA(" << readOnly << ")" );
// translate the "read only" flag into the "read-write" flag getFreeServer() expects
bool rw = (readOnly == 0 ? true : false);
getFreeServer(rw, false); // readwrite?, not openDB
TALK( "openTA: connected to server "< " << retval );
LEAVE( "RpcClientComm::openTA() -> " << retval );
return retval;
}
int RnpClientComm::commitTA() throw(r_Error)
{
int retval = 1;
ENTER( "RpcClientComm::commitTA()" );
RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "commitTA()" );
if(useTurbo)
{
turboCommitTA();
}
else
{
executeCommitTA();
executeCloseDB();
executeDisconnect();
}
RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "commitTA() -> " << retval );
LEAVE( "RpcClientComm::commitTA() -> " << retval );
return retval;
}
int RnpClientComm::abortTA()
{
int retval = 1;
ENTER( "RpcClientComm::abortTA()" );
RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "abortTA()" );
try
{
if(useTurbo)
{
turboAbortTA();
}
else
{
executeAbortTA();
executeCloseDB();
executeDisconnect();
}
}
// make it nicer, but we are not allowed to throw anything! Later will change the declaration of the function
catch(...)
{
TALK( "RpcClientComm::abortTA(): caught & ignored exception." );
}
RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "abortTA() -> " << retval );
LEAVE( "RpcClientComm::abortTA() -> " << retval );
return retval;
}
void RnpClientComm::insertMDD( const char* collName, r_GMarray* mar ) throw( r_Error )
{
ENTER( "RpcClientComm::insertMDD(" << (collName?collName:"(null)") << "," << (long) mar << ")" );
RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "insertMDD(" << collName << "," << (long) mar << ")" );
checkForRwTransaction();
r_Minterval spatdom;
r_Bytes marBytes;
RPCMarray* rpcMarray;
r_Bytes tileSize = 0;
// get the spatial domain of the r_GMarray
spatdom = mar->spatial_domain();
// determine the amount of data to be transferred
marBytes = mar->get_array_size();
const r_Base_Type* baseType = mar->get_base_type_schema();
// if the MDD is too large for being transfered as one block, it has to be
// divided in tiles
const r_Tiling* til = mar->get_storage_layout()->get_tiling();
r_Tiling_Scheme scheme = til->get_tiling_scheme();
if (scheme == r_NoTiling)
tileSize = RMInit::RMInit::clientTileSize;
else
//allowed because the only subclass of tiling without size is no tiling
tileSize = ((const r_Size_Tiling*)til)->get_tile_size();
if( RMInit::tiling && marBytes > tileSize )
{
// initiate composition of MDD at server side
int status = executeStartInsertPersMDD(collName, mar); //rpcStatusPtr = rpcstartinsertpersmdd_1( params, binding_h );
switch( status )
{
case 0:
break; // OK
case 2:
LEAVE( "RpcClientComm::insertMDD() Error: database class undefined." );
throw r_Error( r_Error::r_Error_DatabaseClassUndefined );
break;
case 3:
LEAVE( "RpcClientComm::insertMDD() Error: collection element type mismatch." );
throw r_Error( r_Error::r_Error_CollectionElementTypeMismatch );
break;
case 4:
LEAVE( "RpcClientComm::insertMDD() Error: type invalid." );
throw r_Error( r_Error::r_Error_TypeInvalid );
break;
default:
LEAVE( "RpcClientComm::insertMDD() Error: transfer invalid." );
throw r_Error( r_Error::r_Error_TransferFailed );
break;
}
r_Set< r_GMarray* >* bagOfTiles;
bagOfTiles = mar->get_storage_layout()->decomposeMDD( mar );
RMDBGMIDDLE(2, RMDebug::module_clientcomm, "RpcClientComm", "decomposing into " << bagOfTiles->cardinality() << " tiles")
r_Iterator< r_GMarray* > iter = bagOfTiles->create_iterator();
r_GMarray *origTile;
for(iter.reset(); iter.not_done(); iter.advance() )
{
origTile = *iter;
RMDBGMIDDLE(2, RMDebug::module_clientcomm, "RpcClientComm", "inserting Tile with domain " << origTile->spatial_domain() << ", " << origTile->spatial_domain().cell_count() * origTile->get_type_length() << " bytes")
getMarRpcRepresentation( origTile, rpcMarray, mar->get_storage_layout()->get_storage_format(), baseType );
status = executeInsertTile(true, rpcMarray);
// free rpcMarray structure (rpcMarray->data.confarray_val is freed somewhere else)
freeMarRpcRepresentation( origTile, rpcMarray );
// delete current tile (including data block)
delete origTile;
if( status > 0 )
{
throw r_Error( r_Error::r_Error_TransferFailed );
}
}
executeEndInsertMDD(true); //rpcendinsertmdd_1( params3, binding_h );
// delete transient data
bagOfTiles->remove_all();
delete bagOfTiles;
}
else // begin: MDD is transferred in one piece
{
RMDBGMIDDLE(2, RMDebug::module_clientcomm, "RpcClientComm", ", one tile" )
getMarRpcRepresentation( mar, rpcMarray, mar->get_storage_layout()->get_storage_format(), baseType );
int status = executeInsertMDD(collName, mar, rpcMarray); // rpcStatusPtr = rpcinsertmdd_1( params, binding_h );
freeMarRpcRepresentation( mar, rpcMarray );
RMDBGMIDDLE(2, RMDebug::module_clientcomm, "RpcClientComm", "ok" )
switch( status )
{
case 0:
break; // OK
case 2:
LEAVE( "RpcClientComm::insertMDD(): exception, status = " << status );
throw r_Error( r_Error::r_Error_DatabaseClassUndefined );
break;
case 3:
LEAVE( "RpcClientComm::insertMDD(): exception, status = " << status );
throw r_Error( r_Error::r_Error_CollectionElementTypeMismatch );
break;
case 4:
LEAVE( "RpcClientComm::insertMDD(): exception, status = " << status );
throw r_Error( r_Error::r_Error_TypeInvalid );
break;
default:
LEAVE( "RpcClientComm::insertMDD(): exception, status = " << status );
throw r_Error( r_Error::r_Error_TransferFailed );
break;
}
} // end: MDD i transferred in one piece
RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "insertMDD()" );
LEAVE( "RpcClientComm::insertMDD()" );
}
//################################################################
/**
 * Fetch an MDD by its object identifier -- not implemented for RNP.
 *
 * An "internal error" line is written to the log.
 *
 * @param oid  identifier of the requested MDD; only used for the trace output
 * @return a default-constructed r_Ref_Any
 */
r_Ref_Any RnpClientComm::getMDDByOId( const r_OId& oid ) throw( r_Error )
{
    ENTER( "RpcClientComm::getMDDByOId(" << oid << ")" );
    RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "getMDDByOId(" << oid << ")" );

    RMInit::logOut << "Internal error: RnpClientComm::getMDDByOId() not implemented, returning empty r_Ref_Any()." << endl;

    RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "getMDDByOId()" );
    LEAVE( "RpcClientComm::getMDDByOId()" );

    // nothing to hand back
    return r_Ref_Any();
}
void RnpClientComm::insertColl( const char* collName, const char* typeName, const r_OId& oid ) throw( r_Error )
{
ENTER( "RpcClientComm::insertColl(" << collName << "," << typeName << "," << oid << ")" );
RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "insertColl(" << collName << "," << typeName << "," << oid << ")" );
checkForRwTransaction();
int status = executeInsertCollection(collName, typeName, oid);
switch( status )
{
case 0:
break; //OK
case 1:
LEAVE( "RpcClientComm::insertColl(): exception, status = " << status );
throw r_Error( r_Error::r_Error_ClientUnknown ); break;
case 2:
LEAVE( "RpcClientComm::insertColl(): exception, status = " << status );
throw r_Error( r_Error::r_Error_DatabaseClassUndefined ); break;
case 3:
LEAVE( "RpcClientComm::insertColl(): exception, status = " << status );
throw r_Error( r_Error::r_Error_NameNotUnique ); break;
default:
LEAVE( "RpcClientComm::insertColl(): exception, status = " << status );
throw r_Error( r_Error::r_Error_General );break;
}
RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "insertColl()" );
LEAVE( "RpcClientComm::insertColl()" );
}
/**
 * Delete the collection called `collName` on the server.
 *
 * Builds a cmd_delcollbyname request carrying the client id and the
 * collection name and hands it to helper012d() for sending and answer
 * evaluation.
 *
 * @param collName  name of the collection to delete
 */
void RnpClientComm::deleteCollByName( const char* collName ) throw( r_Error )
{
    ENTER( "RpcClientComm::deleteCollByName(" << (collName?collName:"(null)") << ")" );
    // collName is guarded here exactly like in ENTER: streaming a NULL char* is undefined
    RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "deleteCollByName(" << (collName?collName:"(null)") << ")" );

    checkForRwTransaction();

    startRequest(RnpRasserver::cmd_delcollbyname);
    encoder.addInt32Parameter(RnpRasserver::pmt_clientid, clientID);
    TALK( "adding clientID 0x" << hex << clientID << dec );
    encoder.addStringParameter(RnpRasserver::pmt_collname, collName);

    // pass this function's own name, as the sibling delete/remove functions do
    helper012d("deleteCollByName");

    RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "deleteCollByName()" );
    LEAVE( "RpcClientComm::deleteCollByName()" );
}
/**
 * Delete the object identified by `oid` on the server.
 *
 * Builds a cmd_delobjbyoid request carrying the client id and the string
 * form of the oid and hands it to helper012d() for sending and answer
 * evaluation.
 *
 * @param oid  identifier of the object to delete
 */
void RnpClientComm::deleteObjByOId( const r_OId& oid ) throw( r_Error )
{
    ENTER( "RpcClientComm::deleteObjByOId(" << oid << ")" );
    RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "deleteObjByOId(" << oid << ")" );

    checkForRwTransaction();

    // assemble the request: command, client id, oid
    startRequest(RnpRasserver::cmd_delobjbyoid);

    encoder.addInt32Parameter(RnpRasserver::pmt_clientid, clientID);
    TALK( "adding clientID 0x" << hex << clientID << dec );

    encoder.addStringParameter(RnpRasserver::pmt_oidstring, oid.get_string_representation());

    // send it and evaluate the answer
    helper012d("deleteObjByOId");

    RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "deleteObjByOId()" );
    LEAVE( "RpcClientComm::deleteObjByOId()" );
}
/**
 * Remove the object identified by `oid` from the collection `collName`.
 *
 * Builds a cmd_remobjfromcoll request carrying the client id, the
 * collection name and the string form of the oid and hands it to
 * helper012d() for sending and answer evaluation.
 *
 * @param collName  name of the collection
 * @param oid       identifier of the object to remove
 */
void RnpClientComm::removeObjFromColl( const char* collName, const r_OId& oid ) throw ( r_Error )
{
    ENTER( "RpcClientComm::removeObjFromColl(" << (collName?collName:"(null)") << "," << oid << ")" );
    // collName is guarded here exactly like in ENTER: streaming a NULL char* is undefined
    RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "removeObjFromColl(" << (collName?collName:"(null)") << "," << oid << ")" );

    checkForRwTransaction();

    startRequest(RnpRasserver::cmd_remobjfromcoll);
    encoder.addInt32Parameter(RnpRasserver::pmt_clientid, clientID);
    TALK( "adding clientID 0x" << hex << clientID << dec );
    encoder.addStringParameter(RnpRasserver::pmt_collname, collName);
    encoder.addStringParameter(RnpRasserver::pmt_oidstring, oid.get_string_representation());

    helper012d("removeObjFromColl");

    RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "removeObjFromColl()" );
    LEAVE( "RpcClientComm::removeObjFromColl()" );
}
/**
 * Retrieve a collection by name.
 *
 * Delegates to executeGetCollByNameOrOId() with a default-constructed oid.
 *
 * @param collName  name of the collection
 * @return whatever executeGetCollByNameOrOId() returns
 */
r_Ref_Any RnpClientComm::getCollByName( const char* collName ) throw( r_Error )
{
    ENTER( "RpcClientComm::getCollByName(" << (collName?collName:"(null)") << ")" );
    // collName is guarded here exactly like in ENTER: streaming a NULL char* is undefined
    RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "getCollByName(" << (collName?collName:"(null)") << ")" );

    r_Ref_Any result = executeGetCollByNameOrOId ( collName, r_OId() );

    RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "getCollByName()" );
    LEAVE( "RpcClientComm::getCollByName()" );
    return result;
}
/**
 * Retrieve a collection by object identifier.
 *
 * Delegates to executeGetCollByNameOrOId() with no collection name.
 *
 * @param oid  identifier of the collection
 * @return whatever executeGetCollByNameOrOId() returns
 */
r_Ref_Any RnpClientComm::getCollByOId ( const r_OId& oid ) throw( r_Error )
{
    ENTER( "RpcClientComm::getCollByOId(" << oid << ")" );
    RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "getCollByOId(" << oid << ")" );

    // look-up is by oid only, so no name is supplied
    r_Ref_Any collection = executeGetCollByNameOrOId ( NULL, oid );

    RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "getCollByOId()" );
    LEAVE( "RpcClientComm::getCollByOId()" );

    return collection;
}
/**
 * Retrieve the object identifiers of a collection selected by name.
 *
 * Delegates to executeGetCollOIdsByNameOrOId() with a default-constructed oid.
 *
 * @param name  name of the collection
 * @return whatever executeGetCollOIdsByNameOrOId() returns
 */
r_Ref_Any RnpClientComm::getCollOIdsByName( const char* name ) throw( r_Error )
{
    ENTER( "RpcClientComm::getCollOIdsByName(" << (name?name:"(null)") << ")" );
    // name is guarded here exactly like in ENTER: streaming a NULL char* is undefined
    RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "getCollOIdsByName(" << (name?name:"(null)") << ")" );

    r_Ref_Any result = executeGetCollOIdsByNameOrOId ( name, r_OId() );

    RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "getCollOIdsByName()" );
    LEAVE( "RpcClientComm::getCollOIdsByName()" );
    return result;
}
/**
 * Retrieve the object identifiers of a collection selected by oid.
 *
 * Delegates to executeGetCollOIdsByNameOrOId() with no collection name.
 *
 * @param oid  identifier of the collection
 * @return whatever executeGetCollOIdsByNameOrOId() returns
 */
r_Ref_Any RnpClientComm::getCollOIdsByOId ( const r_OId& oid ) throw( r_Error )
{
    ENTER( "RpcClientComm::getCollOIdsByOId(" << oid << ")" );
    RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "getCollOIdsByOId(" << oid << ")" );

    // look-up is by oid only, so no name is supplied
    r_Ref_Any oidSet = executeGetCollOIdsByNameOrOId ( NULL, oid );

    RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "getCollOIdsByOId()" );
    LEAVE( "RpcClientComm::getCollOIdsByOId()" );

    return oidSet;
}
//------------------------------
/**
 * Execute a retrieval query and collect its result.
 *
 * The status returned by executeExecuteQuery() selects how the result is
 * fetched:
 *   0 - the result is transferred via getMDDCollection()
 *   1 - the result is transferred via getElementCollection()
 *   2 - nothing has to be transferred
 * Any other status is reported in the log as an internal error.
 *
 * @param query   the query to run
 * @param result  set that receives the result elements
 */
void RnpClientComm::executeQuery( const r_OQL_Query& query, r_Set< r_Ref_Any >& result ) throw( r_Error )
{
    ENTER( "RpcClientComm::executeQuery(_,_)" );
    RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "executeQuery(_,_)" );

    int status = executeExecuteQuery( query.get_query(), result );
    switch(status)
    {
    case 0:
        getMDDCollection( result, 1 ); // 1== isQuery
        break;
    case 1:
        getElementCollection( result );
        break;
    case 2:
        // nothing to fetch; this is a regular outcome and must not be
        // reported as an illegal status
        break;
    default:
        RMInit::logOut << "Internal error: RpcClientComm::executeQuery(): illegal status value " << status << endl;
        break;
    }

    RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "executeQuery()" );
    LEAVE( "RpcClientComm::executeQuery()" );
}
// Transfer a collection of MDD objects from the server into mddColl.
// Each iteration fetches the header of the next MDD with executeGetNextMDD(),
// lets getMDDCore() pull its tiles, inserts the assembled object into mddColl
// and releases the header. The loop ends when the MDD status becomes non-zero
// or getMDDCore() reports tile status 0; executeEndTransfer() closes the
// transfer afterwards.
// isQuery is passed through to getMDDCore() unchanged.
// Throws r_Error(r_Error_TransferFailed) when the server reports MDD status 2.
// NOTE(review): on the status-2 path thisResult is not released before the
// throw -- confirm whether executeGetNextMDD() fills all fields in that case
// before adding a cleanup.
void RnpClientComm::getMDDCollection( r_Set< r_Ref_Any >& mddColl, unsigned int isQuery ) throw(r_Error)
{
ENTER( "RpcClientComm::getMDDCollection(_," << isQuery << ")" );
RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "getMDDCollection(_," << isQuery << ")" );
unsigned short tileStatus=0;
unsigned short mddStatus = 0;
while( mddStatus == 0 ) // repeat until all MDDs are transferred
{
r_Ref mddResult;
// Get spatial domain of next MDD
GetMDDRes* thisResult = executeGetNextMDD();
mddStatus = thisResult->status;
if( mddStatus == 2 )
{
RMInit::logOut << "Error: getMDDCollection(...) - no transfer collection or empty transfer collection" << endl;
LEAVE( "RpcClientComm::getMDDCollection(): exception, status = " << mddStatus );
throw r_Error( r_Error::r_Error_TransferFailed );
}
// fetch all tiles of this MDD and assemble them into mddResult
tileStatus = getMDDCore(mddResult, thisResult, isQuery);
// finally, insert the r_Marray into the set
mddColl.insert_element( mddResult, 1 );
// the header strings are released with free(), the struct itself with delete
free(thisResult->domain);
free(thisResult->typeName);
free(thisResult->typeStructure);
free(thisResult->oid);
delete thisResult;
if( tileStatus == 0 ) // if this is true, we're done with this collection
break;
} // end while( mddStatus == 0 )
executeEndTransfer();
RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "getMDDCollection()" );
LEAVE( "RpcClientComm::getMDDCollection()" );
}
/**
 * Release a GetTileRes structure together with everything it owns.
 *
 * The domain string and the data block of the contained marray are released
 * with free(), the marray and the structure itself with delete.
 * A NULL ptr, or a structure without marray, is accepted and handled
 * gracefully.
 *
 * @param ptr  structure to release; may be NULL
 */
void freeGetTileRes(GetTileRes *ptr)
{
    ENTER( "RpcClientComm::freeGetTileRes(_)" );
    RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "freeGetTileRes(_)" );

    if( ptr != NULL )
    {
        if( ptr->marray != NULL )
        {
            if(ptr->marray->domain)
                free(ptr->marray->domain);
            if(ptr->marray->data.confarray_val)
                free(ptr->marray->data.confarray_val);
            delete ptr->marray;
        }
        delete ptr;
    }

    RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "freeGetTileRes()" );
    LEAVE( "RpcClientComm::freeGetTileRes(_)" );
}
/**
 * Transfer all tiles of one MDD from the server and assemble them.
 *
 * A new r_GMarray is created from the header in thisResult (transient for
 * query results, read otherwise). Tiles are then fetched with
 * executeGetNextTile() while the tile status is 2 or 3, decompressed /
 * endian-converted via doTransferDecompression() and either adopted as the
 * MDD's data block (a single tile covering the whole domain) or copied
 * into a freshly allocated, zero-initialised MDD buffer.
 *
 * @param mdd         receives a reference to the assembled array
 * @param thisResult  MDD header as delivered by executeGetNextMDD()
 * @param isQuery     non-zero if the MDD belongs to a query result
 * @return the status of the last tile transfer
 * @throws r_Error    r_Error_TransferFailed if the server reports status 4
 *
 * NOTE(review): the throw paths do not release marray (nor tile / memCopy on
 * the second one); confirm the ownership rules of r_GMarray before adding a
 * cleanup there.
 */
unsigned short
RnpClientComm::getMDDCore( r_Ref< r_GMarray > &mdd, GetMDDRes *thisResult, unsigned int isQuery ) throw( r_Error )
{
    ENTER( "RpcClientComm::getMDDCore(_,_," << isQuery << ")" );
    RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "getMDDCore(_,_," << isQuery << ")" );

    // create r_Minterval and oid
    r_Minterval mddDomain( thisResult->domain );
    r_OId rOId ( thisResult->oid );

    r_GMarray *marray;
    if( isQuery )
        marray = new( r_Database::actual_database, r_Object::transient, rOId ) r_GMarray();
    else
        marray = new( r_Database::actual_database, r_Object::read , rOId ) r_GMarray();

    marray->set_spatial_domain( mddDomain );
    marray->set_type_by_name ( thisResult->typeName );
    marray->set_type_structure( thisResult->typeStructure );

    r_Data_Format currentFormat = (r_Data_Format)(thisResult->currentFormat);
    if (r_Tile_Compression::check_data_format(currentFormat) == 1)
        currentFormat = r_Array;
    marray->set_current_format( currentFormat );

    r_Data_Format decompFormat;
    const r_Base_Type *baseType = marray->get_base_type_schema();

    // Variables needed for tile transfer
    GetTileRes* tileRes=0;
    r_Minterval tileDomain;
    r_GMarray* tile; // for temporary tile
    char* memCopy;
    unsigned long memCopyLen;
    int tileCntr = 0;
    unsigned short tileStatus = 2; // call rpcgetnexttile_1 at least once

    while( tileStatus == 2 || tileStatus == 3 ) // while( for all tiles of the current MDD )
    {
        tileRes = executeGetNextTile();
        tileStatus = tileRes->status;
        if( tileStatus == 4 )
        {
            freeGetTileRes(tileRes);
            RMInit::logOut << "Error: rpcGetNextTile(...) - no tile to transfer or empty transfer collection" << endl;
            LEAVE( "RpcClientComm::getMDDCore(): exception, status = " << tileStatus );
            throw r_Error( r_Error::r_Error_TransferFailed );
        }

        // take cellTypeLength for current MDD of the first tile
        if( tileCntr == 0 )
            marray->set_type_length( tileRes->marray->cellTypeLength );

        tileDomain = r_Minterval( tileRes->marray->domain );
        memCopyLen = tileDomain.cell_count() * marray->get_type_length(); // cell type length of the tile must be the same
        if (memCopyLen < tileRes->marray->data.confarray_len)
            memCopyLen = tileRes->marray->data.confarray_len; // may happen when compression expands
        memCopy = new char[ memCopyLen ];

        // create temporary tile
        tile = new r_GMarray();
        tile->set_spatial_domain( tileDomain );
        tile->set_array( memCopy );
        tile->set_array_size( memCopyLen );
        tile->set_type_length( tileRes->marray->cellTypeLength );
        tileCntr++;

        // Variables needed for block transfer of a tile
        unsigned long blockOffset = 0;
        unsigned short subStatus = 3;
        currentFormat = (r_Data_Format)(tileRes->marray->currentFormat);

        switch( tileStatus )
        {
        case 3: // at least one block of the tile is left
            // Tile arrives in several blocks -> put them together
            concatArrayData(tileRes->marray->data.confarray_val, tileRes->marray->data.confarray_len, memCopy, memCopyLen, blockOffset);
            freeGetTileRes(tileRes);
            tileRes = executeGetNextTile();
            subStatus = tileRes->status;
            if( subStatus == 4 )
            {
                freeGetTileRes(tileRes);
                LEAVE( "RpcClientComm::getMDDCore(): exception, status = " << tileStatus << ", subStatus = " << subStatus );
                throw r_Error( r_Error::r_Error_TransferFailed );
            }
            concatArrayData(tileRes->marray->data.confarray_val, tileRes->marray->data.confarray_len, memCopy, memCopyLen, blockOffset);
            freeGetTileRes(tileRes);
            tileStatus = subStatus;
            break;
        default: // tileStatus = 0,3 last block of the current tile
            // Tile arrives as one block.
            concatArrayData(tileRes->marray->data.confarray_val, tileRes->marray->data.confarray_len, memCopy, memCopyLen, blockOffset);
            freeGetTileRes(tileRes);
            break;
        }

        // concatArrayData() replaces the buffer (and deletes the old one) when
        // the blocks do not fit; re-attach it so the tile never keeps a pointer
        // to released memory
        tile->set_array( memCopy );
        tile->set_array_size( memCopyLen );

        // uncompress the tile if necessary
        decompFormat = doTransferDecompression( tile, baseType, currentFormat, blockOffset );

        char* marrayData = NULL;
        // Now the tile is transferred completely, insert it into current MDD
        if( tileStatus < 2 && tileCntr == 1 && (tile->spatial_domain() == marray->spatial_domain()))
        {
            // MDD consists of just one tile that is the same size of the mdd
            // simply take the data memory of the tile
            marray->set_array( tile->get_array() );
            marray->set_array_size( tile->get_array_size() );
            tile->set_array( 0 );
        }
        else
        {
            // MDD consists of more than one tile or the tile does not cover the whole domain
            r_Bytes size = mddDomain.cell_count() * marray->get_type_length();
            if( tileCntr == 1 )
            {
                // allocate memory for the MDD
                marrayData = new char[ size ];
                memset(marrayData, 0, size);
                marray->set_array( marrayData );
            }
            else
                marrayData = marray->get_array();

            // copy tile data into MDD data space (optimized, relying on the internal representation of an MDD )
            // one "block" is a run of cells along the last dimension of the tile
            char* mddBlockPtr;
            char* tileBlockPtr = tile->get_array();
            unsigned long blockCells = tileDomain[tileDomain.dimension()-1].high()-tileDomain[tileDomain.dimension()-1].low()+1;
            unsigned long blockSize = blockCells * marray->get_type_length();
            unsigned long blockNo = tileDomain.cell_count() / blockCells;
            for( unsigned long blockCtr = 0; blockCtr < blockNo; blockCtr++ )
            {
                mddBlockPtr = marrayData + marray->get_type_length()*mddDomain.cell_offset( tileDomain.cell_point( blockCtr * blockCells ) );
                memcpy( (void*)mddBlockPtr, (void*)tileBlockPtr, (size_t)blockSize );
                tileBlockPtr += blockSize;
            }
            // former non-optimized version
            // for( i=0; i<tileDomain.cell_count(); i++ )
            //     (*marray)[tileDomain.cell_point( i )] = (*tile)[tileDomain.cell_point( i )];
            marray->set_array_size( size );
        }

        // delete temporary tile
        delete tile;
    } // end while( MDD is not transferred completely )

    mdd = r_Ref< r_GMarray >( marray->get_oid(), marray );

    RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "getMDDCore() -> " << tileStatus );
    LEAVE( "RpcClientComm::getMDDCore() -> " << tileStatus );
    return tileStatus;
}
/**
 * Append srcSize bytes from source to the buffer dest at offset destLevel.
 *
 * If the data does not fit, a larger buffer (required size plus 1/16 spare)
 * is allocated with new[], the bytes written so far are copied over and the
 * old buffer is released with delete[]; dest and destSize are updated to
 * describe the new buffer.
 *
 * @param source     data to append (raw bytes, not NUL-terminated)
 * @param srcSize    number of bytes to append
 * @param dest       target buffer; replaced when it has to grow
 * @param destSize   capacity of dest; updated when the buffer grows
 * @param destLevel  fill level of dest; advanced by srcSize
 * @return 0 on success, -1 if the enlarged buffer could not be allocated
 */
int RnpClientComm::concatArrayData( const char *source, unsigned long srcSize, char *&dest, unsigned long &destSize, unsigned long &destLevel )
{
    ENTER( "RpcClientComm::concatArrayData( 0x" << hex << (unsigned long) source << dec << "," << srcSize << ",_,_,_ )" );
    // log the address, as ENTER does: source is a binary block without a
    // terminating NUL, so streaming it as a C string would read past its end
    RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "concatArrayData(0x" << hex << (unsigned long) source << dec << "," << srcSize << ",_,_,_)" );

    if (destLevel + srcSize > destSize)
    {
        // need to extend dest
        unsigned long newSize = destLevel + srcSize;
        char *newArray;
        // allocate a little extra if we have to extend
        newSize = newSize + newSize / 16;
        // RMDBGOUT( 1, "RpcClientComm::concatArrayData(): need to extend from " << destSize << " to " << newSize );
        if ((newArray = new char[newSize]) == NULL)
        {
            RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "concatArrayData() -> " << -1 );
            LEAVE( "RpcClientComm::concatArrayData() -> -1" );
            return -1;
        }
        // carry over what has been collected so far, then swap the buffers
        memcpy(newArray, dest, destLevel);
        delete [] dest;
        dest = newArray;
        destSize = newSize;
    }

    memcpy(dest + destLevel, source, srcSize);
    destLevel += srcSize;

    RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "concatArrayData() -> " << 0 );
    LEAVE( "RpcClientComm::concatArrayData() -> 0" );
    return 0;
}
/**
 * Bring a freshly transferred tile into client representation.
 *
 * For fmt == r_Array only the endianness is adapted if client and server
 * differ. For any other format a r_Tile_Compression engine is created and
 * asked to decompress the first `size` bytes of the tile's data; on success
 * the tile takes over the decompressed block. If the engine cannot be
 * created or decompression fails, the error is logged and the tile is left
 * untouched (the data is then treated as being in format fmt).
 *
 * @param tile  tile whose data block is to be converted
 * @param type  base type of the tile's cells
 * @param fmt   format the data arrived in
 * @param size  number of valid bytes in the tile's data block
 * @return the format the tile's data is in afterwards
 */
r_Data_Format
RnpClientComm::doTransferDecompression( r_GMarray* tile, const r_Base_Type *type, r_Data_Format fmt, unsigned long size )
{
    ENTER( "RpcClientComm::doTransferDecompression(...) tile dom:" << tile->spatial_domain() << " array size:" << tile->get_array_size() << " type size:" << tile->get_type_length());
    RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "doTransferDecompression(...) tile dom:"
                << tile->spatial_domain() << " array size:" << tile->get_array_size()
                << " type size:" << tile->get_type_length());

    if (fmt != r_Array)
    {
        r_Tile_Compression *engine = NULL;
        char *newTileData = NULL;
        r_Data_Format newFormat = fmt;

        RMDBGMIDDLE( 2, RMDebug::module_clientcomm, "RpcClientComm", "doTransferDecompression(...) decompressing from "
                     << fmt << ", " << size << "bytes");
        try
        {
            r_Storage_Man_CPP sman;
            engine = r_Tile_Compression::create( fmt, tile->spatial_domain(), type );
            engine->set_storage_handler(sman);
            newTileData = (char*)(engine->decompress(tile->get_array(), size));
            RMDBGMIDDLE( 2, RMDebug::module_clientcomm, "RpcClientComm",
                         "doTransferDecompression(...) decompression to " << engine->get_decomp_format() << " OK");
        }
        catch (r_Error &)
        {
            // engine is still NULL if create() itself threw, so it must not be
            // dereferenced unconditionally here
            if (engine != NULL)
            {
                RMDBGMIDDLE( 2, RMDebug::module_clientcomm, "RpcClientComm",
                             "doTransferDecompression(...) decompression to " << engine->get_decomp_format() << " FAILED");
            }
            else
            {
                RMDBGMIDDLE( 2, RMDebug::module_clientcomm, "RpcClientComm",
                             "doTransferDecompression(...) decompression from " << fmt << " FAILED");
            }
            RMInit::logOut << "RpcClientComm::doTransferDecompression() Error decompressing data, assuming raw" << endl;
        }

        if (engine != NULL && newTileData != NULL)
        {
            // decompression worked: the tile adopts the decoded block
            newFormat = engine->get_decomp_format();
            delete [] tile->get_array();
            tile->set_array(newTileData);
            tile->set_array_size(tile->spatial_domain().cell_count()*tile->get_type_length());
        }
        // otherwise newFormat stays fmt and the tile keeps its data

        RMDBGMIDDLE( 2, RMDebug::module_clientcomm, "RpcClientComm", "doTransferDecompression(...) tile newFmt:"
                     << newFormat << " dom:" << tile->spatial_domain()
                     << " array size:" << tile->get_array_size()
                     << " type size:" << tile->get_type_length());

        // ... also make sure the decoded format is really raw array data (r_Array)
        if ((endianClient != endianServer) && (newFormat == r_Array))
        {
            // if compression engine already handles endianness we mustn't change again
            if (engine == NULL || !engine->converts_endianness())
            {
                RMDBGMIDDLE( 2, RMDebug::module_clientcomm, "RpcClientComm", "doTransferDecompression(...) for "
                             << fmt << " endianness changed from "
                             << (r_Endian::r_Endianness)endianServer << " to " << (r_Endian::r_Endianness) endianClient);
                changeEndianness(tile, type);
            }
        }

        if (engine != NULL)
            delete engine;

        RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "doTransferDecompression(...) tile fmt:" << newFormat);
        LEAVE( "RpcClientComm::doTransferDecompression() -> " << newFormat );
        return newFormat;
    }

    // raw array data: only the byte order may need fixing
    if (endianClient != endianServer)
    {
        RMDBGMIDDLE( 2, RMDebug::module_clientcomm, "RpcClientComm", "doTransferDecompression(...) for "
                     << fmt << " endianness changed from "
                     << (r_Endian::r_Endianness)endianServer << " to " << (r_Endian::r_Endianness) endianClient);
        changeEndianness(tile, type);
    }

    RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "doTransferDecompression(...) tile fmt:" << r_Array);
    LEAVE( "RpcClientComm::doTransferDecompression() -> " << r_Array );
    return r_Array;
}
// Return a new[]-allocated, NUL-terminated copy of the first len bytes of src;
// the caller releases it with delete[].
static char* rnpDupElementString( const char* src, unsigned long len )
{
    char* text = new char[len + 1];
    strncpy( text, src, len );
    text[len] = '\0';
    return text;
}

/**
 * Transfer a collection of non-MDD elements from the server into resultColl.
 *
 * Elements are fetched with executeGetNextElement() as long as the returned
 * status is 0. Each element is converted to the client's byte order if
 * necessary, wrapped in the object matching the collection's element type
 * (primitive, complex, structure, point, sinterval, minterval or oid),
 * registered with the current transaction and inserted into resultColl.
 * executeEndTransfer() closes the transfer afterwards.
 *
 * @param resultColl  set that receives the elements; its element type schema
 *                    decides how the raw data is interpreted
 * @throws r_Error    r_Error_TransferFailed if the server reports status 2
 *
 * NOTE(review): on the status-2 path thisResult is not released before the
 * throw -- confirm what executeGetNextElement() allocates in that case.
 */
void RnpClientComm::getElementCollection( r_Set< r_Ref_Any >& resultColl ) throw(r_Error)
{
    ENTER( "RpcClientComm::getElementCollection()" );
    RMDBGENTER( 2, RMDebug::module_clientcomm, "RpcClientComm", "getElementCollection(_)" );

    unsigned short rpcStatus = 0;
    TALK( "got set of type " << resultColl.get_type_structure() );

    while( rpcStatus == 0 ) // repeat until all elements are transferred
    {
        GetElementRes* thisResult = executeGetNextElement();
        rpcStatus = thisResult->status;
        if( rpcStatus == 2 )
        {
            RMInit::logOut << "Error: getElementCollection(...) - no transfer collection or empty transfer collection" << endl;
            LEAVE( "RpcClientComm::getElementCollection(): exception: rpcStatus = " << rpcStatus );
            throw r_Error( r_Error::r_Error_TransferFailed );
        }

        // create new collection element, use type of collection resultColl
        r_Ref_Any element;
        const r_Type* elementType = resultColl.get_element_type_schema();

        // convert the endianness before creating the new element!
        if (endianClient != endianServer)
        {
            if (endianClient == 0)
                elementType->convertToBigEndian(thisResult->data.confarray_val, 1);
            else
                elementType->convertToLittleEndian(thisResult->data.confarray_val, 1);
        }

        switch( elementType->type_id() )
        {
        case r_Type::BOOL:
        case r_Type::CHAR:
        case r_Type::OCTET:
        case r_Type::SHORT:
        case r_Type::USHORT:
        case r_Type::LONG:
        case r_Type::ULONG:
        case r_Type::FLOAT:
        case r_Type::DOUBLE:
            element = new r_Primitive( thisResult->data.confarray_val, (r_Primitive_Type*) elementType );
            r_Transaction::actual_transaction->add_object_list( r_Transaction::SCALAR, (void*) element );
            break;
        case r_Type::COMPLEXTYPE1:
        case r_Type::COMPLEXTYPE2:
            element = new r_Complex(thisResult->data.confarray_val, (r_Complex_Type *)elementType);
            r_Transaction::actual_transaction->add_object_list(r_Transaction::SCALAR, (void *)element);
            break;
        case r_Type::STRUCTURETYPE:
            element = new r_Structure( thisResult->data.confarray_val, (r_Structure_Type*) elementType );
            r_Transaction::actual_transaction->add_object_list( r_Transaction::SCALAR, (void*) element );
            break;
        // the remaining types arrive in their string representation, which is
        // not NUL-terminated on the wire
        case r_Type::POINTTYPE:
        {
            char* stringRep = rnpDupElementString( thisResult->data.confarray_val, thisResult->data.confarray_len );
            r_Point* typedElement = new r_Point( stringRep );
            element = typedElement;
            r_Transaction::actual_transaction->add_object_list( r_Transaction::POINT, (void*) typedElement );
            delete [] stringRep;
        }
        break;
        case r_Type::SINTERVALTYPE:
        {
            char* stringRep = rnpDupElementString( thisResult->data.confarray_val, thisResult->data.confarray_len );
            r_Sinterval* typedElement = new r_Sinterval( stringRep );
            element = typedElement;
            r_Transaction::actual_transaction->add_object_list( r_Transaction::SINTERVAL, (void*) typedElement );
            delete [] stringRep;
        }
        break;
        case r_Type::MINTERVALTYPE:
        {
            char* stringRep = rnpDupElementString( thisResult->data.confarray_val, thisResult->data.confarray_len );
            r_Minterval* typedElement = new r_Minterval( stringRep );
            element = typedElement;
            r_Transaction::actual_transaction->add_object_list( r_Transaction::MINTERVAL, (void*) typedElement );
            delete [] stringRep;
        }
        break;
        case r_Type::OIDTYPE:
        {
            char* stringRep = rnpDupElementString( thisResult->data.confarray_val, thisResult->data.confarray_len );
            r_OId* typedElement = new r_OId( stringRep );
            element = typedElement;
            r_Transaction::actual_transaction->add_object_list( r_Transaction::OID, (void*) typedElement );
            delete [] stringRep;
        }
        break;
        default:
            // a plain trace line: there is no matching exit for this message,
            // so it must not open a new debug scope
            RMDBGMIDDLE(2, RMDebug::module_clientcomm, "RpcClientComm", "getElementCollection(...) bad element typeId" << elementType->type_id());
            break;
        }

        TALK( "got an element" );

        // insert element into result set
        resultColl.insert_element( element, 1 );

        delete[] thisResult->data.confarray_val;
        delete thisResult;
    }

    executeEndTransfer();

    RMDBGEXIT( 2, RMDebug::module_clientcomm, "RpcClientComm", "getElementCollection()" );
    LEAVE( "RpcClientComm::getElementCollection()" );
}
//##################################################################################
/**
 * Execute an update query.
 *
 * If the query carries MDD constants, they are shipped to the server first:
 * each MDD is decomposed into tiles by its storage layout, every tile is
 * converted to its RPC representation and inserted, then the MDD transfer
 * is closed. Finally the query text itself is sent for execution.
 *
 * A read/write transaction must be active (checked via checkForRwTransaction()).
 * If the update initialization reports a negative status the function returns
 * silently without executing the query.
 *
 * @param query  query object providing the query text and optional MDD constants
 * @throws r_Error  r_Error_DatabaseClassUndefined (status 2) or r_Error_TypeInvalid
 *                  (status 3) or r_Error_TransferFailed (any other non-zero status)
 *                  when starting an MDD transfer; r_Error_TransferFailed when a
 *                  tile insert reports a status > 0.
 */
void RnpClientComm::executeQuery( const r_OQL_Query& query ) throw( r_Error )
{
    ENTER( "RnpClientComm::executeQuery(_)" );
    checkForRwTransaction();
    unsigned short status;
    //
    // Send MDD constants to the server.
    //
    if( query.get_constants() )
    {
        r_Set< r_GMarray* >* mddConstants = (r_Set< r_GMarray* >*)query.get_constants();
        if(executeInitUpdate() < 0) // error would be nicer
        {
            LEAVE( "Error: RnpClientComm::executeQuery(): update query initialization failed." );
            return;
        }
        r_Iterator< r_GMarray* > iter = mddConstants->create_iterator();
        for( iter.reset(); iter.not_done(); iter++ )
        {
            r_GMarray* mdd = *iter;
            // check the pointer BEFORE touching the object
            if( !mdd )
                continue;

            const r_Base_Type* baseType = mdd->get_base_type_schema();
            status = executeStartInsertTransMDD(mdd);
            switch( status )
            {
            case 0:
                break; // OK
            case 2:
                LEAVE( "RnpClientComm::executeQuery(): exception, status = " << status );
                throw r_Error( r_Error::r_Error_DatabaseClassUndefined );
            case 3:
                LEAVE( "RnpClientComm::executeQuery(): exception, status = " << status );
                throw r_Error( r_Error::r_Error_TypeInvalid );
            default:
                LEAVE( "RnpClientComm::executeQuery(): exception, status = " << status );
                throw r_Error( r_Error::r_Error_TransferFailed );
            }

            r_Set< r_GMarray* >* bagOfTiles = mdd->get_storage_layout()->decomposeMDD( mdd );
            r_Iterator< r_GMarray* > iter2 = bagOfTiles->create_iterator();
            // Once a tile fails we stop sending, but still walk the rest of the
            // bag so that every remaining tile gets released before we throw.
            bool transferFailed = false;
            for(iter2.reset(); iter2.not_done(); iter2.advance())
            {
                r_GMarray *origTile = *iter2;
                if( !transferFailed )
                {
                    RPCMarray* rpcMarray;
                    getMarRpcRepresentation( origTile, rpcMarray, mdd->get_storage_layout()->get_storage_format(), baseType );
                    status = executeInsertTile(false, rpcMarray);
                    // free rpcMarray structure (a payload borrowed from the tile is left alone)
                    freeMarRpcRepresentation( origTile, rpcMarray );
                    transferFailed = ( status > 0 );
                }
                // delete current tile (including data block)
                delete origTile;
            }
            bagOfTiles->remove_all();
            delete bagOfTiles;

            if( transferFailed )
            {
                LEAVE( "RnpClientComm::executeQuery(): exception, status = " << status );
                throw r_Error( r_Error::r_Error_TransferFailed );
            }
            executeEndInsertMDD(false);
        }
    }
    executeExecuteUpdateQuery(query.get_query());
    LEAVE( "RnpClientComm::executeQuery(_)" );
}
// helper functions
/**
 * Build the RPCMarray transfer structure for an array (or tile).
 *
 * The structure is allocated with mymalloc() and handed back through
 * @p rpcMarray; release it with freeMarRpcRepresentation().
 *
 * Payload selection:
 *  - initStorageFormat == r_Array and transferFormat != r_Array: the data is
 *    compressed with the engine for transferFormat (after an endianness swap
 *    if client and server differ and the engine does not convert itself).
 *    If compression fails or yields a larger result, the uncompressed path
 *    below is used.
 *  - initStorageFormat == r_Array and transferFormat == r_Array: an
 *    endianness-swapped copy is sent if client and server differ.
 *  - otherwise (no separate buffer was produced): the array's own data block
 *    is referenced directly, or an endianness-swapped copy if client and
 *    server differ.
 *
 * @param mar                array to be transferred
 * @param rpcMarray          out: newly allocated transfer structure
 * @param initStorageFormat  format the array data is currently in
 * @param baseType           base type passed to compression / endianness conversion
 */
void
RnpClientComm::getMarRpcRepresentation( const r_GMarray* mar, RPCMarray*& rpcMarray,
                                        r_Data_Format initStorageFormat,
                                        const r_Base_Type *baseType)
{
    ENTER( "RpcClientComm::getMarRpcRepresentation(...)");
    RMDBGENTER(2, RMDebug::module_clientcomm, "RpcClientComm", "getMarRpcRepresentation(...)");

    // allocate memory for the RPCMarray data structure and assign its fields
    rpcMarray = (RPCMarray*)mymalloc( sizeof(RPCMarray) );
    rpcMarray->domain = mar->spatial_domain().get_string_representation();
    rpcMarray->cellTypeLength = mar->get_type_length();

    void* arrayData = NULL;
    r_ULong arraySize=0;

    if (initStorageFormat == r_Array)
    {
        if (transferFormat != r_Array)
        {
            r_Tile_Compression *engine = NULL;
            // declared outside the try block so it is released even if compress() throws
            char *endianData = NULL;
            try
            {
                r_Storage_Man_CPP sman;
                engine = r_Tile_Compression::create(transferFormat, mar->spatial_domain(), baseType);
                engine->set_storage_handler(sman);
                RMDBGMIDDLE(2, RMDebug::module_clientcomm, "RpcClientComm", "getMarRpcRepresentation(...) compress with " << engine->get_name())
                if ((endianClient != endianServer) && (!engine->converts_endianness()))
                {
                    RMDBGMIDDLE( 2, RMDebug::module_clientcomm, "RpcClientComm", "getMarRpcRepresentation(...) for "
                                 << transferFormat << " endianness changed before compression from " << (r_Endian::r_Endianness)endianClient
                                 << " to " << (r_Endian::r_Endianness) endianServer);
                    endianData = new char[mar->get_array_size()];
                    changeEndianness(mar, endianData, baseType);
                    arrayData = engine->compress(endianData, arraySize, transferFormatParams);
                }
                else
                {
                    arrayData = engine->compress(mar->get_array(), arraySize, transferFormatParams);
                }
                RMDBGMIDDLE(2, RMDebug::module_clientcomm, "RpcClientComm", "compression returned " << arrayData << " (" << arraySize << " bytes)")
                // ForWiss: revert to uncompressed data if the compressed data is larger
                // coman: and introduced a bug of endianess ...
                if (arraySize > mar->get_array_size())
                {
                    RMInit::logOut << "RpcClientComm::getMarRpcRepresentation(...) Warning: overriding compression setting("
                                   << transferFormat << ") to " << r_Array
                                   << " because compressed size( " << arraySize
                                   << " bytes) > uncompressed size( " << mar->get_array_size() << " bytes)" << endl;
                    // delete[] on a void* is undefined behaviour; cast back to the
                    // byte type freeMarRpcRepresentation() also uses for this buffer
                    delete [] static_cast<char*>(arrayData);
                    arrayData = NULL;
                }
            }
            catch (r_Error &)
            {
                RMInit::logOut << "RpcClientComm::getMarRpcRepresentation() Error: Unsupported data format " << transferFormat << endl;
            }
            delete [] endianData;
            delete engine;
        }
        else
        {
            if (endianClient != endianServer)
            {
                RMDBGMIDDLE( 2, RMDebug::module_clientcomm, "RpcClientComm", "getMarRpcRepresentation(...) for "
                             << transferFormat << " endianness changed from "
                             << (r_Endian::r_Endianness)endianClient << " to " << (r_Endian::r_Endianness) endianServer);
                arraySize = mar->get_array_size();
                arrayData = new char[arraySize];
                changeEndianness(mar, arrayData, baseType);
            }
        }
    }

    if (arrayData == NULL)
    {
        //error in compression or compression inefficient
        rpcMarray->currentFormat = initStorageFormat;
        rpcMarray->data.confarray_len = mar->get_array_size();
        if (endianClient != endianServer)
        {
            RMDBGMIDDLE( 2, RMDebug::module_clientcomm, "RpcClientComm", "getMarRpcRepresentation(...) for "
                         << transferFormat << "endianness changed from "
                         << (r_Endian::r_Endianness)endianClient << " to " << (r_Endian::r_Endianness) endianServer
                         << " because compression " << transferFormat << " failed" );
            // Size the buffer from the array itself: arraySize may still be 0
            // (compression never ran / threw) or hold a stale compressed size here.
            arrayData = new char[mar->get_array_size()];
            changeEndianness(mar, arrayData, baseType);
            rpcMarray->data.confarray_val = (char*)(arrayData);
        }
        else
        {
            // borrowed pointer: freeMarRpcRepresentation() recognises and skips it
            rpcMarray->data.confarray_val = (char*)(mar->get_array());
        }
    }
    else
    {
        if (arraySize != mar->get_array_size())
        {
            RMDBGMIDDLE(2, RMDebug::module_clientcomm, "RpcClientComm", "compressed to " << (100.0*arraySize) / mar->get_array_size() << "%")
        }
        rpcMarray->currentFormat = transferFormat;
        rpcMarray->data.confarray_len = arraySize;
        rpcMarray->data.confarray_val = (char*)arrayData;
    }
    rpcMarray->storageFormat = storageFormat;

    RMDBGEXIT(2, RMDebug::module_clientcomm, "RpcClientComm", "getMarRpcRepresentation(...)");
    LEAVE( "RpcClientComm::getMarRpcRepresentation()");
}
void
RnpClientComm::freeMarRpcRepresentation( const r_GMarray* mar, RPCMarray* rpcMarray )
{
ENTER( "RnpClientComm::freeMarRpcRepresentation(_,_)" );
if (rpcMarray->data.confarray_val != ((r_GMarray*)mar)->get_array())
{
delete[] rpcMarray->data.confarray_val;
}
free( rpcMarray->domain );
free( rpcMarray );
LEAVE( "RnpClientComm::freeMarRpcRepresentation()" );
}
//#########################################################################
/**
 * Obtain a new object identifier for the given object type.
 * Forwards to executeGetNewOId().
 *
 * @param objType  object type code passed through unchanged
 * @return the OId delivered by executeGetNewOId()
 */
r_OId RnpClientComm::getNewOId( unsigned short objType ) throw(r_Error)
{
    r_OId freshOId = executeGetNewOId(objType);
    return freshOId;
}
/**
 * Look up the object type belonging to an OId.
 * Forwards to executeGetObjectType().
 *
 * @param oid  identifier of the object in question
 * @return the type code delivered by executeGetObjectType()
 */
unsigned short RnpClientComm::getObjectType( const r_OId& oid ) throw(r_Error)
{
    const unsigned short typeCode = executeGetObjectType(oid);
    return typeCode;
}
/**
 * Fetch the structure description of a named type.
 * Forwards to executeGetTypeStructure().
 *
 * @param typeName  name of the type
 * @param typeType  kind of type being asked for
 * @return the string delivered by executeGetTypeStructure()
 */
char* RnpClientComm::getTypeStructure( const char* typeName, r_Type_Type typeType ) throw(r_Error)
{
    char* const structure = executeGetTypeStructure( typeName, typeType );
    return structure;
}
/**
 * Select the storage format and its parameter string.
 *
 * The format is remembered locally, a private copy of @p formatParams replaces
 * any previously stored copy, the copy is run through the client parameter
 * parser, and the setting is forwarded with executeSetFormat(false, ...).
 *
 * @param format        storage format to use
 * @param formatParams  parameter string, may be NULL
 * @return result of executeSetFormat()
 */
int RnpClientComm::setStorageFormat( r_Data_Format format, const char *formatParams)
{
    ENTER( "RnpClientComm::setStorageFormat( format=" << format << ", formatParams=" << (formatParams?formatParams:"(null)") << " )" );

    storageFormat = format;

    // drop the previous parameter copy, if there is one
    if (storageFormatParams)
    {
        free(storageFormatParams);
    }
    storageFormatParams = NULL;

    if (formatParams)
    {
        const size_t copySize = strlen(formatParams) + 1;
        storageFormatParams = (char*)mymalloc(copySize);
        strcpy(storageFormatParams, formatParams);
        // extract ``compserver'' if present
        clientParams->process(storageFormatParams);
    }

    const int rc = executeSetFormat( false, format, formatParams);
    LEAVE( "RnpClientComm::setStorageFormat() -> " << rc );
    return rc;
}
/**
 * Select the transfer format and its parameter string.
 *
 * The format is remembered locally, a private copy of @p formatParams replaces
 * any previously stored copy, the copy is run through the client parameter
 * parser, and the setting is forwarded with executeSetFormat(true, ...).
 *
 * @param format        transfer format to use
 * @param formatParams  parameter string, may be NULL
 * @return result of executeSetFormat()
 */
int RnpClientComm::setTransferFormat( r_Data_Format format, const char* formatParams)
{
    ENTER( "RnpClientComm::setTransferFormat( format=" << format << ", formatParams=" << (formatParams?formatParams:"(null)") << " )" );

    transferFormat = format;

    // drop the previous parameter copy, if there is one
    if (transferFormatParams)
    {
        free(transferFormatParams);
    }
    transferFormatParams = NULL;

    if (formatParams)
    {
        const size_t copySize = strlen(formatParams)+1;
        transferFormatParams = (char*)mymalloc(copySize);
        strcpy(transferFormatParams, formatParams);
        // extract ``exactformat'' if present
        clientParams->process(transferFormatParams);
    }

    const int rc = executeSetFormat( true, format, formatParams);
    LEAVE( "RnpClientComm::setTransferFormat() -> " << rc );
    return rc;
}
/**
 * Return extended error information.
 *
 * This function shouldn't be called for RNP protocol; it always returns the
 * fixed text "No info".
 *
 * @return pointer to a constant string with static lifetime; the caller must
 *         not modify or free it
 */
const char* RnpClientComm::getExtendedErrorInfo() throw(r_Error)
{
    // A constant string needs neither a heap buffer (which was never released)
    // nor a strcpy() into shared storage on every call.
    static const char errorInfo[] = "No info";
    return errorInfo;
}
/**
 * Name of the server host as stored in this object.
 *
 * @return the serverHost member; the string is owned by this object
 */
const char* RnpClientComm::getServerName()
{
    const char* const hostName = serverHost;
    return hostName;
}
/**
 * Store the credentials used when contacting rasmgr.
 *
 * The password is hashed with MD5 and the identification string is set to
 * "<userName>:<hex digest>". A NULL user name or password is treated as an
 * empty string.
 *
 * @param userName           user name, may be NULL
 * @param plainTextPassword  password in clear text, may be NULL; it is never
 *                           written to the trace output
 */
void RnpClientComm::setUserIdentification(const char *userName, const char *plainTextPassword)
{
    // only report whether a password was supplied - never the password itself
    ENTER( "RnpClientComm::setUserIdentification( userName=" << (userName?userName:"(null)") << ", plainTextPassword=" << (plainTextPassword?"(hidden)":"(null)") << " )" );

    // strlen()/"%s" on a NULL pointer is undefined behaviour, so normalise first
    const char *effectiveUser = userName ? userName : "";
    const char *effectivePassword = plainTextPassword ? plainTextPassword : "";

    // 32 hex characters for an MD5 digest plus the terminating NUL
    char digest[33]="";
    messageDigest(effectivePassword,digest,"MD5");
    sprintf(identificationString,"%s:%s",effectiveUser,digest);

    LEAVE( "RnpClientComm::setUserIdentification()" );
}
unsigned long RnpClientComm::getClientID() const
{
return clientID;
}
/// Intentionally a no-op in this implementation.
void RnpClientComm::triggerAliveSignal()
{
}
/// Intentionally a no-op in this implementation.
void RnpClientComm::sendAliveSignal()
{
}
//############# helper functions ###################
int RnpClientComm::messageDigest(const char *input,char *output,const char *mdName)
{
EVP_MD_CTX mdctx;
const EVP_MD *md;
unsigned int md_len, i;
unsigned char md_value[100];
OpenSSL_add_all_digests();
md = EVP_get_digestbyname(mdName);
if(!md)
return 0;
EVP_DigestInit(&mdctx, md);
EVP_DigestUpdate(&mdctx,input, strlen(input));
EVP_DigestFinal(&mdctx, md_value, &md_len);
for(i = 0; i < md_len; i++)
sprintf(output+i+i,"%02x", md_value[i]);
return strlen(output);
}
/**
 * Set the retry limit for connection attempts.
 *
 * @param newMaxRetry  new value for RMInit::clientcommMaxRetry
 */
void RnpClientComm::setMaxRetry(unsigned int newMaxRetry)
{
    // the limit is kept in RMInit, not in this object
    RMInit::clientcommMaxRetry = newMaxRetry;
}
/**
 * Current retry limit for connection attempts.
 *
 * @return value of RMInit::clientcommMaxRetry
 */
unsigned int RnpClientComm::getMaxRetry()
{
    const unsigned int retryLimit = RMInit::clientcommMaxRetry;
    return retryLimit;
}
// aux function: sleep incrementally, with subsecond precision
/**
 * Sleep for RNP_PAUSE_INCREMENT * (retryCount + 1) milliseconds.
 *
 * @param retryCount  zero-based number of the retry just performed
 */
static void pause(int retryCount)
{
    // changed PB 2005-sep-09
    // was: unsigned int millisec = 50 + retryCount * 50;
    unsigned int millisec = RNP_PAUSE_INCREMENT * (retryCount + 1);
    // if(millisec > 1000)
    // millisec = 1000;
    timeval tv;
    // Split into whole seconds and the sub-second remainder: tv_usec must stay
    // below one million, and the seconds part must not be counted twice.
    tv.tv_sec = millisec / 1000;
    tv.tv_usec = (millisec % 1000) * 1000;
    // select() without descriptor sets just waits for the timeout
    select(0,NULL,NULL,NULL,&tv);
}
// Ask for a free server by calling executeGetFreeServer(), retrying in a loop.
//
// readwrite and openDB are only logged here and passed through unchanged to
// executeGetFreeServer().
//
// A retry is considered when the caught r_Error carries error number 801, 805
// or 806 and retryCount is still below RMInit::clientcommMaxRetry.
// The visible return path yields 1.
int RnpClientComm::getFreeServer(bool readwrite, bool openDB)
{
ENTER( "RnpClientComm::getFreeServer( readwrite=" << readwrite << ", openDB=" << openDB << " )" );
// to avoid nested retries, only this loop is kept,
// the one in executeGetFreeServer() has been removed -- PB 2005-aug-31
for ( int retryCount=0; ; retryCount++ )
{
try
{
executeGetFreeServer(readwrite, openDB);
// if no error, we have the server, so break
break;
}
catch(r_Error &e)
{
int errorno = e.get_errorno();
// every failure is logged, whether or not it will be retried
RMInit::logOut << "Error: Connection to rasdaman server failed with " << errorno << ": " << e.what() << endl;
// for these errors a retry makes sense, as long as we haven't reached the retry limit:
if ( ( errorno==801 || errorno==805 || errorno==806 ) && retryCount < RMInit::clientcommMaxRetry )
{
// NOTE(review): the statement below looks truncated - the rest of the retry
// branch and the closing of the catch block / loop are not present here;
// presumably text was lost in an export - confirm against version control.
TALK( " retry="< 1" );
return 1;
}
#define MAXMSG 512
// Build the request that asks rasmgr for a free server.
//
// The request is an HTTP-style POST of "getfreeserver2" (openDB is true) or
// "getfreeserver" (openDB is false). Its Authorization header carries
// identificationString; its body is
//   "<databaseName> RNP <rw|ro> <hostid>:<pid>"
// where rw/ro is chosen by readwrite. The rasmgr host is resolved with
// gethostbyname(rasmgrHost). The visible return path yields 1.
//
// NOTE(review): several lines of this function look truncated (text appears
// to have been lost in an export) - confirm against version control before
// changing any logic here.
int RnpClientComm::executeGetFreeServer(bool readwrite, bool openDB)
{
ENTER( "RnpClientComm::executeGetFreeServer( readwrite=" << readwrite << ", openDB=" << openDB << " )" );
// identification of this client process, built once and kept for later calls
static char myRasmgrID[100]="";
if(myRasmgrID[0]==0)
{
unsigned int hostid = gethostid();
unsigned int pid = getpid();
sprintf(myRasmgrID,"%u:%u",hostid,pid);
}
// NOTE(review): the sprintf() calls below are not bounded by MAXMSG; whether
// identificationString / databaseName can exceed it is not visible here.
char message[MAXMSG];
char header[MAXMSG];
char body[MAXMSG];
if (openDB)
sprintf(header,"POST getfreeserver2 HTTP/1.1\r\nAccept: text/plain\r\nUserAgent: RasClient/1.0\r\nAuthorization: ras %s\r\nContent-length:",identificationString);
else
sprintf(header,"POST getfreeserver HTTP/1.1\r\nAccept: text/plain\r\nUserAgent: RasClient/1.0\r\nAuthorization: ras %s\r\nContent-length:",identificationString);
sprintf(body,"%s RNP %s %s",databaseName,(readwrite ? "rw":"ro"), myRasmgrID);
// content length is strlen(body)+1, i.e. it counts the terminating NUL as well
// NOTE(review): "%d" is given a size_t argument here - verify on 64-bit targets
sprintf(message,"%s %d\r\n\r\n%s",header,strlen(body)+1,body);
struct protoent* getprotoptr = getprotobyname("tcp");
struct hostent *hostinfo = gethostbyname(rasmgrHost);
if(hostinfo==NULL)
{
RMInit::logOut << "Error locating rasmgr on host " << rasmgrHost <<" ("<h_addr;
int sock;
bool ok = false;
// old comment by Walter Schatz: "this has to be 5000 or so, now that counter is 120 default (later we'll make this better)"
// new comment by PB 2005-aug-31:
// - these retries + the ones in getFreeServer lead to max*max*40 = 400,000 retries! one loop should be enough.
// - there are unmotivated factors at every step (cf pause()), let's do just one wait loop;
// all loop code is commented out with prefix "NOLOOP"
// X: int retry;
// X: for(retry=0;retryp_proto);
if(sock<0)
{
// X: if(retry==0)
RMInit::logOut << "Error: cannot open socket to rasmgr (" << strerror(errno) << ")." << endl;
// X: sleep(RMInit::clientcommSleep);
// X: continue;
}
else if(connect(sock,(struct sockaddr*)&internetSocketAddress,sizeof(internetSocketAddress)) < 0)
{
// X: if(retry==0)
// X: RMInit::logOut <<"getFreeServer: Connection to rasmgr failed: "<p_proto );
ok = true;
// X: break;
// X:}
// X:if(retry)
// X: RMInit::logOut << "Warning: tried connecting " << retry+1 << " times " < 1" );
return 1;
}
/**
 * Read from a socket until the end-of-message sign arrives.
 *
 * A message is complete when the last byte received is '\0'. Reads that are
 * interrupted by a signal (EINTR) are repeated.
 *
 * @param socket      descriptor to read from
 * @param destBuffer  buffer receiving the message
 * @param buffSize    capacity of destBuffer in bytes
 * @return number of bytes read including the terminating '\0', or -1 if
 *         read() fails, the peer closes the connection before the terminator
 *         arrives, or the buffer is full without a terminator
 */
int RnpClientComm::readWholeMessage(int socket,char *destBuffer,int buffSize)
{
    // we read what is comming in until we encounter a '\0'
    // this is our end-sign.
    int totalLength=0;
    while(1)
    {
        // No room left and still no terminator: a zero-length read() would
        // return 0 forever, so give up (this also covers buffSize <= 0).
        if(totalLength >= buffSize)
            return -1;

        int redNow = read(socket,destBuffer+totalLength,buffSize-totalLength);
        if(redNow == -1)
        {
            if(errno == EINTR)
                continue; // read was interrupted by signal
            return -1; // another error
        }
        // read() yields 0 at end of stream; without this check the loop would
        // spin forever and, with nothing received yet, inspect destBuffer[-1]
        if(redNow == 0)
            return -1;

        totalLength+=redNow;
        if(destBuffer[totalLength-1]==0)
            break; // THE END
    }
    TALK( "RnpClientComm::readWholeMessage(): read " << totalLength << " bytes, buffer is: " << destBuffer );
    return totalLength;
}
/**
 * Write a complete message to a socket.
 *
 * write() is called repeatedly until all buffSize bytes have been handed over;
 * calls interrupted by a signal (EINTR) are repeated. The terminating '\0' of
 * the message is expected to be included in buffSize by the caller.
 *
 * @param socket      descriptor to write to
 * @param destBuffer  message to send
 * @param buffSize    number of bytes to send
 * @return number of bytes written (== buffSize), or -1 on a write error
 */
int RnpClientComm::writeWholeMessage(int socket,char *destBuffer,int buffSize)
{
    int bytesSent = 0;
    for(;;)
    {
        const int chunk = write(socket, destBuffer + bytesSent, buffSize - bytesSent);
        if(chunk != -1)
        {
            bytesSent += chunk;
            if(bytesSent == buffSize)
                break; // everything is out
            continue;
        }
        // write failed: only an interruption by a signal is worth another try
        if(errno != EINTR)
            return -1;
    }
    TALK( "RnpClientComm::writeWholeMessage(): wrote " << bytesSent << " bytes, buffer is: " << destBuffer );
    return bytesSent;
}
void RnpClientComm::checkForRwTransaction() throw (r_Error)
{
r_Transaction *trans = r_Transaction::actual_transaction;
if( trans == 0 || trans->get_mode() == r_Transaction::read_only )
{
TALK( "RnpClientComm::checkForRwTransaction(): throwing exception from failed TA rw check." );
throw r_Error( r_Error::r_Error_TransactionReadOnly );
}
}