/*
* This file is part of rasdaman community.
*
* Rasdaman community is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Rasdaman community is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with rasdaman community. If not, see .
*
* Copyright 2003, 2004, 2005, 2006, 2007, 2008, 2009 Peter Baumann /
rasdaman GmbH.
*
* For more information please see
* or contact Peter Baumann via .
*/
#include "mymalloc/mymalloc.h"
// This is -*- C++ -*-
/*************************************************************
*
*
* PURPOSE:
* implements blobtile interface using the PostgreSQL DBMS.
*
*
* COMMENTS:
* - uses LO blob method, this has been tested to be faster than bytea
* - RMDBG macros generate so weird output that I added DEBUG macros
* although this is very ugly (2 trace facilities in parallel)
* - exceptions thrown are r_Error, either directly or through common
* function generateException() (reladminif) which throws r_Ebase_dbms
*
************************************************************/
using namespace std;
static const char rcsid[] = "@(#)blobif,BLOBTile: $Id: blobtile.ec,v 1.8 2003/12/27 23:19:03 rasdev Exp $";
#include
#include /* atoi */
// PG stuff:
#include "libpq-fe.h" /* C interface to PgSQL */
#include "libpq/libpq-fs.h" /* large object (lo) api */
// simple trace facility
// trace macros are activated through externally defined compile variable DEBUG
#include "debug-srv.hh"
// general embedded SQL related definitions
EXEC SQL include "../reladminif/sqlglobals.h";
// libpg connection maintenance
// must have been initiated before use (see databasif.pgc)
extern PGconn *pgConn;
// all the DBMS independent code is factored out and
// will be included in the resulting .c file
#include "blobtile.hh"
#include "simplefilestorage.hh"
#include "raslib/error.hh"
#include "externs.h"
#include "raslib/rmdebug.hh"
#include "sqlerror.hh"
#include "tileid.hh"
#include "inlinetile.hh"
#include "objectbroker.hh"
#include "dbref.hh"
IFileStorage* initFileStorage() {
SimpleFileStorage *fileStorage;
char *path = getenv("RASDATA");
if (path == NULL)
path="/tmp";
fileStorage = new SimpleFileStorage(path);
return fileStorage;
}
// update blob in ras_tiles table, identified by variable myOId (from blobtile.cc), update map ref
void
BLOBTile::updateInDb() throw (r_Error)
{
RMDBGENTER(3, RMDebug::module_blobif, "BLOBTile", "updateInDb() " << myOId);
ENTER( "BLOBTile::updateInDb" );
if (fileStorage == NULL)
fileStorage = initFileStorage();
Oid tile;
long indbmyoId = 0;
long indbmyoid = 0;
short dataformat = 0;
char pgQuery[SQL_QUERY_BUFFER_SIZE];
PGresult *pgResult = NULL;
// (1) --- get tuple
dataformat = dataFormat;
indbmyoid = myOId.getCounter();
fileStorage->update(cells, size, indbmyoid);
// alternative solution for now:
(void) snprintf( pgQuery, (size_t) sizeof(pgQuery), "UPDATE RAS_TILES SET DataFormat = %d WHERE BlobId = %d", dataformat, indbmyoid );
TALK( pgQuery );
pgResult = PQexec( pgConn, pgQuery );
if (PQresultStatus(pgResult) != PGRES_COMMAND_OK)
{
LEAVE( "BLOBTile::updateInDb() libpq 'update dataformat' error: " << PQerrorMessage(pgConn) );
PQclear( pgResult );
generateException();
}
// (4) --- update map ref
DBObject::updateInDb();
LEAVE( "BLOBTile::updateInDb, myOId=" << myOId );
RMDBGEXIT(3, RMDebug::module_blobif, "BLOBTile", "updateInDb() " << myOId);
} // updateInDb()
// insert new blob into ras_tiles table, update map ref
// tuple is identified by blobtile.cc var myOId
// data is taken from buffer 'cells' containing 'size' bytes
void
BLOBTile::insertInDb() throw (r_Error)
{
RMDBGENTER(3, RMDebug::module_blobif, "BLOBTile", "insertInDb() " << myOId);
ENTER( "BLOBTile::insertInDb" );
if (fileStorage == NULL)
fileStorage = initFileStorage();
Oid blobOid;
long indbmyOId2;
short dataformat2;
Oid tile2;
dataformat2 = dataFormat;
// prelim:
char pgQuery[SQL_QUERY_BUFFER_SIZE];
PGresult *pgResult = NULL;
indbmyOId2 = myOId.getCounter();
TALK( "myOId.getCounter = " << indbmyOId2 );
fileStorage->insert(cells, size, indbmyOId2);
// (2) --- insert tuple into db
(void) snprintf( pgQuery, (size_t) sizeof(pgQuery), "INSERT INTO RAS_TILES ( BlobId, DataFormat, Tile) VALUES ( %d, %d, %d )", indbmyOId2, dataformat2, indbmyOId2 );
TALK( pgQuery );
pgResult = PQexec( pgConn, pgQuery );
if (PQresultStatus(pgResult) != PGRES_COMMAND_OK)
{
LEAVE( "BLOBTile::insertInDb() libpq 'insert' error: " << PQerrorMessage(pgConn) );
PQclear( pgResult );
generateException();
}
PQclear( pgResult );
// FIXME: other sources have here an updateInDb();
// (3) --- update map ref
DBObject::insertInDb();
LEAVE( "BLOBTile::insertInDb(), myOId=" << myOId );
RMDBGEXIT(3, RMDebug::module_blobif, "BLOBTile", "insertInDb() " << myOId);
} // insertInDb()
// delete one tuple from ras_tiles table, update map ref
// tuple is identified by blobtile.cc var myOId
void
BLOBTile::deleteFromDb() throw (r_Error)
{
RMDBGENTER(3, RMDebug::module_blobif, "BLOBTile", "deleteFromDb() " << myOId);
ENTER( "BLOBTile::deleteFromDb" );
if (fileStorage == NULL)
fileStorage = initFileStorage();
long blobId; // blob tuple primary key
Oid blobOid; // blob oid "ptr"
// get counter value (primary key) from oid
blobId = myOId.getCounter();
fileStorage->remove(blobId);
PGresult *pgResult = NULL;
char pgQuery[SQL_QUERY_BUFFER_SIZE];
(void) snprintf( pgQuery, (size_t) sizeof(pgQuery), "DELETE FROM RAS_TILES WHERE BlobId = ", blobOid );
TALK( pgQuery );
pgResult = PQexec( pgConn, pgQuery );
if (PQresultStatus(pgResult) != PGRES_COMMAND_OK)
{
LEAVE( "BLOBTile::deleteFromDb() libpq 'delete tuple' error: " << PQerrorMessage(pgConn) );
PQclear( pgResult );
generateException();
}
PQclear( pgResult );
// FIXME: other sources have here an updateInDb();
// update map ref
DBObject::deleteFromDb();
LEAVE( "BLOBTile::deleteFromDb, myOId=" << myOId );
RMDBGEXIT(3, RMDebug::module_blobif, "BLOBTile", "deleteFromDb() " << myOId);
}
// delete a range of tuple(s) from ras_tiles table, update map ref
// tuples are identified by target and a range
void
BLOBTile::kill(const OId& target, unsigned int range)
{
RMDBGENTER(0, RMDebug::module_blobif, "BLOBTile", "kill(" << target << ", " << range <<")");
ENTER( "BLOBTile::kill, target=" << target << ", range=" << range );
long indbmyOId5;
long indbmyOId6;
long blobId; // blob tuple primary key
Oid blobOid; // blob oid "ptr"
char pgQuery[SQL_QUERY_BUFFER_SIZE];
PGresult *pgResult = NULL; // query result
DBObject* targetobj = NULL;
IFileStorage *fileStorage = initFileStorage();
if (range == 0) // single tuple
{
// (1) --- delete form cache
targetobj = ObjectBroker::isInMemory(target);
if (targetobj)
{
targetobj->setPersistent(false);
}
// (2) --- free blob
indbmyOId5 = target.getCounter();
fileStorage->remove(indbmyOId5);
(void) snprintf( pgQuery, (size_t) sizeof(pgQuery), "DELETE FROM RAS_TILES WHERE BlobId = %d", indbmyOId5 );
TALK( pgQuery );
pgResult = PQexec( pgConn, pgQuery );
if (PQresultStatus(pgResult) != PGRES_COMMAND_OK)
{
LEAVE( "BLOBTile::kill() libpq 'delete single' error: " << PQerrorMessage(pgConn) );
PQclear( pgResult );
generateException();
}
else if (PQntuples(pgResult) == 0)
{
if (target.getType() == OId::INLINETILEOID)
{
InlineTileId t(target);
if (!t.is_null())
t->setPersistent(false);
}
//else: this tile has been deleted before
}
PQclear( pgResult );
}
else
{
// (1) --- iterate over cache and remove
DBObjectPMap& mapRef = ObjectBroker::getMap(target.getType());
DBObjectPMap::iterator it = mapRef.begin();
DBObjectPMap::iterator theEnd = mapRef.end();
OId end(target.getCounter() + range, target.getType());
while (it != theEnd)
{
if (target <= (const OId&)(*it).first && (*it).first <= (const OId&)end)
{
(*it).second->setPersistent(false);
}
}
// (2) --- iterate over db and remove
(void) snprintf( pgQuery, (size_t) sizeof(pgQuery), "DECLARE DeleteLoop CURSOR FOR SELECT Tile FROM RAS_TILES WHERE %d <= BlobId AND BlobId <= %d", indbmyOId5, indbmyOId6 );
TALK( pgQuery );
pgResult = PQexec( pgConn, pgQuery );
if (PQresultStatus(pgResult) != PGRES_TUPLES_OK)
{
LEAVE( "BLOBTile::kill() libpq 'select for deleteLoop' error: " << PQerrorMessage(pgConn) );
PQclear( pgResult );
generateException();
}
// loop over elements & delete each one
do
{
(void) snprintf( pgQuery, (size_t) sizeof(pgQuery), "FETCH NEXT FROM DeleteLoop" );
TALK( pgQuery );
pgResult = PQexec( pgConn, pgQuery );
if (PQresultStatus(pgResult) != PGRES_TUPLES_OK)
{
LEAVE( "BLOBTile::kill() libpq 'fetch next' error: " << PQerrorMessage(pgConn) );
PQclear( pgResult );
generateException();
}
else if (PQntuples(pgResult) != 1)
{
LEAVE( "BLOBTile::kill() libpq 'fetch' did not yield 1 result but " << PQntuples(pgResult) );
PQclear( pgResult );
generateException();
}
blobOid = atoi( PQgetvalue( pgResult, 0, 0 ) ); // extract value from result
// delete blob identified by blobOid
fileStorage->remove(blobOid);
// FIXME: what about errors? continue??
} while (PQresultStatus(pgResult) != PGRES_TUPLES_OK);
PQclear( pgResult );
// (3) --- delete tuples in db
indbmyOId5 = target.getCounter();
indbmyOId6 = end.getCounter();
(void) snprintf( pgQuery, (size_t) sizeof(pgQuery), "DELETE FROM RAS_TILES WHERE %d <= BlobId AND BlobId <= %d", indbmyOId5, indbmyOId6 );
TALK( pgQuery );
pgResult = PQexec( pgConn, pgQuery );
if (PQresultStatus(pgResult) != PGRES_COMMAND_OK)
{
LEAVE( "BLOBTile::kill() libpq 'delete range' error: " << PQerrorMessage(pgConn) );
PQclear( pgResult );
generateException();
}
PQclear( pgResult );
}
LEAVE( "BLOBTile::kill" );
RMDBGEXIT(0, RMDebug::module_blobif, "BLOBTile", "kill(" << target << " " << target.getType() << ")");
}
// read tuple from ras_tiles, identified by blobtile.cc var myOId
// allocates necessary mem into ptr 'cells' and fills it; must be freed elsewhere
// external var 'size' is set to the number of bytes read
void
BLOBTile::readFromDb() throw (r_Error)
{
RMDBGENTER(3, RMDebug::module_blobif, "BLOBTile", "readFromDb() " << myOId);
ENTER( "BLOBTile::readFromDb" );
if (fileStorage == NULL)
fileStorage = initFileStorage();
#ifdef RMANBENCHMARK
DBObject::readTimer.resume();
#endif
Oid blobOid;
long indbmyOId3;
short dataformat3;
short indicatorr3;
PGresult *pgResult = NULL; // PostgreSQL call return values
indbmyOId3 = myOId.getCounter();
// (1) --- access tuple
char pgQuery[SQL_QUERY_BUFFER_SIZE];
(void) snprintf( pgQuery, (size_t) sizeof(pgQuery), "SELECT Tile, DataFormat FROM RAS_TILES WHERE BlobId = %d", indbmyOId3 );
TALK( pgQuery );
pgResult = PQexec( pgConn, pgQuery );
if (PQresultStatus(pgResult) != PGRES_TUPLES_OK)
{
LEAVE( "BLOBTile::readFromDb() libpq 'select' error: " << PQerrorMessage(pgConn) );
PQclear( pgResult );
generateException();
}
else if (PQntuples(pgResult) != 1)
{
LEAVE( "BLOBTile::readFromDb() libpq 'select' did not yield 1 result but " << PQntuples(pgResult) );
PQclear( pgResult );
throw r_Error(r_Error::r_Error_ObjectUnknown);
}
blobOid = atoi( PQgetvalue( pgResult, 0, 0 ) ); // extract 1st value from result
dataformat3 = atoi( PQgetvalue( pgResult, 0, 1 ) ); // extract 2nd value from result
PQclear( pgResult );
// we have a tuple, extract data format
dataFormat = (r_Data_Format)dataformat3;
currentFormat = (r_Data_Format)dataformat3;
TALK( "got dataFormat " << dataFormat );
fileStorage->retrieve(indbmyOId3, &cells, &size);
RMDBGIF(20, RMDebug::module_blobif, "BLOBTileOutput", for (int a = 0; a < size; a++)\
RMInit::dbgOut << " " << hex << (int)(cells[a]); RMInit::dbgOut << dec << endl;)
DBObject::readFromDb();
#ifdef RMANBENCHMARK
DBObject::readTimer.pause();
#endif
LEAVE( "BLOBTile::readFromDb" );
RMDBGEXIT(3, RMDebug::module_blobif, "BLOBTile", "readFromDb() " << myOId);
}