/*
* This file is part of rasdaman community.
*
* Rasdaman community is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Rasdaman community is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with rasdaman community. If not, see .
*
* Copyright 2003, 2004, 2005, 2006, 2007, 2008, 2009 Peter Baumann /
rasdaman GmbH.
*
* For more information please see
* or contact Peter Baumann via .
*/
#include "indexmgr/srptindexlogic.hh"
#include "raslib/rmdebug.hh"
#include "tilemgr/tile.hh"
#include "raslib/point.hh"
#include "indexmgr/sdirindexlogic.hh"
#include
#include "indexmgr/keyobject.hh"
#include "relindexif/hierindex.hh"
#include "reladminif/dbref.hh"
#include "relindexif/indexid.hh"
const float ff = 0.5;
//removes all entries from a index and inserts them into a vector
void
clear(KeyObjectVector& keyvec, HierIndexDS* node)
{
RMDBGENTER(7, RMDebug::module_indexmgr, "SRPTIndexLogic", "clear(keyvec, " << OId(node->getIdentifier()) << ")");
unsigned int i = 0;
unsigned int nodeSize = node->getSize();
keyvec.reserve(nodeSize);
while (!keyvec.empty())
{
keyvec.erase(keyvec.begin());
}
for(i = 0; i < nodeSize; i++)
{
keyvec.push_back(node->getObject(0));
node->removeObject((unsigned int)0);
}
RMDBGEXIT(7, RMDebug::module_indexmgr, "SRPTIndexLogic", "clear(keyvec, " << OId(node->getIdentifier()) << ")");
}
/*this may be usefull when getting rid of the extendFaces method
unsigned int
findNearestNode(IndexDS* whereToLook, const r_Minterval& theEntryDomain)
{
r_Minterval sum(theEntryDomain.dimension());
//this code is not used -> no pror
r_Area smallestArea = (r_Area)0xFFFFFFFF;
unsigned int smallestAreaAt = 0;
unsigned long currentArea = 0;
unsigned int i = 0;
unsigned int numElems = whereToLook->getSize();
for (i = 0; i < numElems; i++)
{
currentArea = sum.closure_of(whereToLook->getObjectDomain(i), theEntryDomain).cell_count();
if (currentArea <= smallestArea)
{
smallestArea = currentArea;
smallestAreaAt = i;
}
}
return smallestAreaAt;
}
*/
bool
SRPTIndexLogic::insertObject2(IndexDS* ixDS, const KeyObject& newKeyObject, const StorageLayout& sl)
{
RMDBGENTER(4, RMDebug::module_indexmgr, "SRPTIndexLogic", "insertObject2(" << OId(ixDS->getIdentifier()) << ", " << newKeyObject << ")")
IndexPVector leafNodes2Split;
r_Minterval newKeyObjectDom;
r_Minterval cd;
bool extend = false;
bool* facesToExtendLo = NULL;
bool* facesToExtendHi = NULL;
r_Dimension i = 0;
r_Dimension dim = 0;
unsigned int overflowed = 0;
if (ixDS->getSize() == 0)
{
RMDBGMIDDLE(4, RMDebug::module_indexmgr, "SRPTIndexLogic", "Index is empty. only set domain");
ixDS->setAssignedDomain(newKeyObject.getDomain());
}
else {
// initialize facesToExtend
newKeyObjectDom = newKeyObject.getDomain();
cd = ixDS->getCoveredDomain();
dim = cd.dimension();
extend = false;
facesToExtendLo = new bool[dim];
facesToExtendHi = new bool[dim];
for(i = 0; i < dim; i++)
{
if (newKeyObjectDom[i].low() < cd[i].low())
{
facesToExtendLo[i] = true;
extend = true;
}
else
facesToExtendLo[i] = false;
if (newKeyObjectDom[i].high() > cd[i].high())
{
facesToExtendHi[i] = true;
extend = true;
}
else
facesToExtendHi[i] = false;
}
// Implementation note:
// the extension of faces could be integrated with the insertObject()
// function and result in a more efficient but more complex implementation
// (a nonrecursive extendFaces() woul be needed and insertObject()
// changed to have additional parameters oldCurrDom, facesToExtendLo,
// facesToExtendHi).
// The simple solution was chosen, where the whole tree is first updated
// for the new borders and then the insertion is made.
// Another solution would be the implementation of the external borders as
// infinite. Each node would have to have both a current domain and a domain.
if (extend)
{
SRPTIndexLogic::extendFaces((HierIndexDS*)ixDS, newKeyObjectDom, cd, facesToExtendLo, facesToExtendHi);
}
else {
RMDBGMIDDLE(4, RMDebug::module_indexmgr, "SRPTIndexLogic", "no need to extend faces");
}
delete[] facesToExtendLo;
facesToExtendLo = NULL;
delete[] facesToExtendHi;
facesToExtendHi = NULL;
}
// call recursive insertObject()
overflowed = SRPTIndexLogic::insertObject(newKeyObject, (HierIndexDS*)ixDS, leafNodes2Split, sl);
RMDBGMIDDLE(4, RMDebug::module_indexmgr, "SRPTIndexLogic", "number of leaf overflows " << leafNodes2Split.size())
if (!leafNodes2Split.empty())
SRPTIndexLogic::splitNodes((HierIndexDS*)ixDS, leafNodes2Split, sl);
RMDBGEXIT(4, RMDebug::module_indexmgr, "SRPTIndexLogic", "insertObject2(" << OId(ixDS->getIdentifier()) << ", " << newKeyObject << ")")
//there should be a check here : )
return true;
}
void
SRPTIndexLogic::intersect2(const IndexDS* ixDS, const r_Minterval& searchInter, KeyObjectVector& intersectedObjs, const StorageLayout& sl)
{
RMDBGENTER(4, RMDebug::module_indexmgr, "SRPTIndexLogic", "intersect2(" << OId(ixDS->getIdentifier()) << ", " << searchInter << ")")
r_Minterval dom = ixDS->getCoveredDomain();
r_Area area = 0;
// avoid exceptions from r_Minterval
if (searchInter.intersects_with(dom))
{
//this is neccessary because intersectNoDuplicats would think there were other index nodes that cover this area.
r_Minterval searchDom = searchInter.create_intersection(dom);
//needed this parent domain, or else indexes with one level only don't work
area = searchDom.cell_count();
SRPTIndexLogic::intersect(searchDom, dom, intersectedObjs, (const HierIndexDS*)ixDS, area);
RMDBGEXIT(4, RMDebug::module_indexmgr, "SRPTIndexLogic", "intersect2(" << OId(ixDS->getIdentifier()) << "*, " << searchInter << ") found #" << intersectedObjs.size() << " matches");
}
else {
RMDBGEXIT(4, RMDebug::module_indexmgr, "SRPTIndexLogic", "intersect2(" << OId(ixDS->getIdentifier()) << "* dom " << dom << ", " << searchInter << ") don't intersect");
}
}
void
SRPTIndexLogic::containPointQuery2(const IndexDS* ixDS, const r_Point& searchPoint, KeyObject& result, const StorageLayout& sl)
{
SRPTIndexLogic::containPointQuery(searchPoint, (const HierIndexDS*)ixDS, result, sl);
}
void
SRPTIndexLogic::getObjects(const IndexDS* ixDS, KeyObjectVector& objs, const StorageLayout& sl)
{
// can be optimized !!!
intersect2((const HierIndexDS*)ixDS, ixDS->getCoveredDomain(), objs, sl);
}
int
SRPTIndexLogic::insertObject( const KeyObject& newKeyObject,
HierIndexDS* ix,
IndexPVector& leafNodes2Split,
const StorageLayout& sl)
{
RMDBGENTER(4, RMDebug::module_indexmgr, "SRPTIndexLogic", "insertObject(" << newKeyObject << ", " << OId(ix->getIdentifier()) << ", leafNodes2Split.size " << leafNodes2Split.size() << ")")
int overflowed = 0;
if (ix->isLeaf())
{
RMDBGMIDDLE(4, RMDebug::module_indexmgr, "SRPTIndexLogic", "ix is Leaf")
//this is new
r_Minterval oldDom = ix->getCoveredDomain();
SDirIndexLogic::insertObject(ix, newKeyObject, sl);
ix->setAssignedDomain(oldDom);
// no node overflow, simply insert newEntry
if (ix->isOverFull())
{// node overflow: add node to list of nodes to split
overflowed = 1;
leafNodes2Split.push_back(ix);
}
else {
if (!ix->isRoot())
{
ix->destroy();
}
}
}
else {
RMDBGMIDDLE(4, RMDebug::module_indexmgr, "SRPTIndexLogic", "ix is Node")
KeyObjectVector intersectedNodes;
KeyObjectVector::iterator nodeIt;
SDirIndexLogic::intersect(ix, newKeyObject.getDomain(), intersectedNodes, sl);
while (!intersectedNodes.empty())
{
overflowed = overflowed + insertObject(newKeyObject, DBHierIndexId((*(intersectedNodes.begin())).getObject()).ptr(), leafNodes2Split, sl);
intersectedNodes.erase(intersectedNodes.begin());
}
}
RMDBGEXIT(4, RMDebug::module_indexmgr, "SRPTIndexLogic", "insertObject(" << newKeyObject << ", ix, leafNodes2Split.size " << leafNodes2Split.size() << ") " << overflowed)
return overflowed;
}
void
SRPTIndexLogic::extendFaces( HierIndexDS* ix,
const r_Minterval& newKeyObjectDom,
const r_Minterval& oldCurrDom,
const bool* facesToExtendLo,
const bool* facesToExtendHi)
{
RMDBGENTER(4, RMDebug::module_indexmgr, "SRPTIndexLogic", "extendFaces(" << OId(ix->getIdentifier()) << ", " << newKeyObjectDom << ", " << oldCurrDom << ", facesToExtendLo, facesToExtendHi)");
bool extendEntries = false;
unsigned int numberElems = 0;
r_Dimension dim = 0;
r_Dimension i = 0;
r_Dimension d = 0;
r_Minterval entryDom;
bool follow = false;
r_Minterval ixDom = oldCurrDom;//ix->getCoveredDomain();
if (ix->isLeaf())
{
if (!(ix->isRoot())) // nothing to do!!!
{//this entry's domain was set in the previous call of extendFaces
RMDBGMIDDLE(4, RMDebug::module_indexmgr, "SRPTIndexLogic", OId(ix->getIdentifier()) << " is Leaf and not Root - already updated");
ix->destroy();
ix = 0;
}
else {// ix is both leaf and root, one node only!! must update domain
RMDBGMIDDLE(4, RMDebug::module_indexmgr, "SRPTIndexLogic", OId(ix->getIdentifier()) << " is Leaf and Root - update domain");
ixDom.closure_with(newKeyObjectDom);
ix->setAssignedDomain(ixDom);
}
}
else {
RMDBGMIDDLE(4, RMDebug::module_indexmgr, "SRPTIndexLogic", OId(ix->getIdentifier()) << " is Node");
dim = newKeyObjectDom.dimension();
for (i = 0; i < dim; i++)
{
if (facesToExtendLo[i] && (ixDom[i].low() == oldCurrDom[i].low()))
{
ixDom[i].set_low(newKeyObjectDom[i].low());
extendEntries = true;
}
if (facesToExtendHi[i] && (ixDom[i].high() == oldCurrDom[i].high()))
{
ixDom[i].set_high(newKeyObjectDom[i].high());
extendEntries = true;
}
}
if (extendEntries)
{
RMDBGMIDDLE(4, RMDebug::module_indexmgr, "SRPTIndexLogic", "must extend entries");
numberElems = ix->getSize();
for (i = 0; i < numberElems; i++)
{
follow = false;
entryDom = ix->getObjectDomain(i);
RMDBGMIDDLE(4, RMDebug::module_indexmgr, "SRPTIndexLogic", "Entry #" << i << " has domain " << entryDom);
for(d = 0; d < dim; d++)
{
if (facesToExtendLo[d] && (entryDom[d].low() == oldCurrDom[d].low()))
{
entryDom[d].set_low(newKeyObjectDom[d].low());
follow = true;
}
if (facesToExtendHi[d] && (entryDom[d].high() == oldCurrDom[d].high()))
{
entryDom[d].set_high(newKeyObjectDom[d].high());
follow = true;
}
}
if (follow)
{
RMDBGMIDDLE(4, RMDebug::module_indexmgr, "SRPTIndexLogic", "Entry #" << i << " must be extended to " << entryDom);
HierIndexDS* child = convert(ix->getObject(i));
extendFaces(child, newKeyObjectDom, oldCurrDom, facesToExtendLo, facesToExtendHi);
ix->setObjectDomain(entryDom, i);
}
}
}
RMDBGMIDDLE(4, RMDebug::module_indexmgr, "SRPTIndexLogic", "new Domain of " << OId(ix->getIdentifier()) << " is " << ix->getCoveredDomain());
if (!ix->isRoot())
{
ix->destroy();
ix =0;
}
}
RMDBGEXIT(4, RMDebug::module_indexmgr, "SRPTIndexLogic", "extendFaces(ixMayBeNull, " << newKeyObjectDom << ", " << oldCurrDom << ", facesToExtendLo, facesToExtendHi)");
}
void
SRPTIndexLogic::splitNodes(HierIndexDS* ixDS, IndexPVector& leafNodes2Split, const StorageLayout& sl)
{
RMDBGENTER(4, RMDebug::module_indexmgr, "SRPTIndexLogic", "splitNodes(" << OId(ixDS->getIdentifier()) << ", leafNodes2Split)")
HierIndexDS* parentIxDS = NULL;
HierIndexDS* leafNodeIxDS = NULL;
HierIndexDS* n1 = NULL;
HierIndexDS* n2 = NULL;
HierIndexDS* nln1 = NULL; // non leaf nodes
HierIndexDS* nln2 = NULL; // non leaf nodes
HierIndexDS* tempPar = NULL;
r_Dimension axis = 0;
r_Range value = 0;
int parentOverflowed = 0;
KeyObjectVector keyvec;
KeyObject nodekey1;
KeyObject nodekey2;
r_Minterval domain;
bool wasroot = false;
bool found = false;
r_Dimension dim = ixDS->getDimension();
unsigned int numElem = 0;
unsigned int cur = 0;
while (!leafNodes2Split.empty())
{
leafNodeIxDS = (HierIndexDS*)leafNodes2Split[0];
leafNodes2Split.erase(leafNodes2Split.begin());
wasroot = leafNodeIxDS->isRoot();
if (wasroot)
domain = leafNodeIxDS->getCoveredDomain();
else {
tempPar = leafNodeIxDS->getParent();
KeyObject tkey;
numElem = tempPar->getSize();
for (cur = 0; cur < numElem; cur++)
{
tkey = tempPar->getObject(cur);
if (((OId::OIdPrimitive)tkey.getObject().getOId()) == leafNodeIxDS->getIdentifier())
{
domain = tkey.getDomain();
found = true;
break;
}
}
if (!found)
{
RMInit::logOut << "SRPTIndexLogic::splitNodes() the leaf node to split was not found in its parent" << endl;
throw r_Error(INDEXNOTFOUNDINPARENT);
}
tempPar->destroy();
tempPar = NULL;
}
calculatePartition(axis, value, leafNodeIxDS);
clear(keyvec, leafNodeIxDS);
n1 = (HierIndexDS*)leafNodeIxDS->getNewInstance();
if (wasroot)
{
parentIxDS = leafNodeIxDS;
n2 = (HierIndexDS*)leafNodeIxDS->getNewInstance();
leafNodeIxDS->setIsNode(true);
leafNodeIxDS = NULL;
}
else {
parentIxDS = leafNodeIxDS->getParent();
n2 = leafNodeIxDS;
leafNodeIxDS = NULL;
}
splitLeaf(n1, n2, keyvec, axis, value, domain, sl);
nodekey1 = convert(n1);
nodekey2 = convert(n2);
if (!wasroot)
parentIxDS->removeObject(nodekey2);
SDirIndexLogic::insertObject(parentIxDS, nodekey1, sl);
SDirIndexLogic::insertObject(parentIxDS, nodekey2, sl);
parentOverflowed = parentIxDS->isOverFull();
n1->destroy();
n1 = NULL;
n2->destroy();
n2 = NULL;
while (parentOverflowed) // split up
{
wasroot = parentIxDS->isRoot();
domain = parentIxDS->getAssignedDomain();
calculatePartition(axis, value, parentIxDS);
clear(keyvec, parentIxDS);
nln1 = (HierIndexDS*)parentIxDS->getNewInstance();
if (wasroot)
{
nln2 = (HierIndexDS*)parentIxDS->getNewInstance();
}
else {
nln2 = parentIxDS;
parentIxDS = parentIxDS->getParent();
}
splitNonLeaf(nln1, nln2, keyvec, leafNodes2Split, axis, value, domain, sl);
nodekey1 = convert(nln1);
nodekey2 = convert(nln2);
if (!wasroot)
parentIxDS->removeObject(nodekey2);
SDirIndexLogic::insertObject(parentIxDS, nodekey1, sl);
SDirIndexLogic::insertObject(parentIxDS, nodekey2, sl);
parentOverflowed = parentIxDS->isOverFull();
nln1->destroy();
nln1 = NULL;
nln2->destroy();
nln2 = NULL;
}
if (parentIxDS && (parentIxDS != ixDS))
{
parentIxDS->destroy();
parentIxDS = NULL;
}
}
RMDBGEXIT(4, RMDebug::module_indexmgr, "SRPTIndexLogic", "splitNodes(" << OId(ixDS->getIdentifier()) << ", leafNodes2Split)")
}
void
SRPTIndexLogic::splitLeaf( HierIndexDS* pd1,
HierIndexDS* pd2,
KeyObjectVector& keyvec,
r_Dimension axis,
r_Range value,
r_Minterval& domain,
const StorageLayout& sl)
{
RMDBGENTER(4, RMDebug::module_indexmgr, "SRPTIndexLogic", "splitLeaf(" << OId(pd1->getIdentifier()) << ", " << OId(pd2->getIdentifier()) << ", keyvec, " << axis << ", " << value << ", " << domain << ")")
unsigned int i = 0;
unsigned int leafSize = keyvec.size();
r_Minterval cd(domain);
r_Minterval nd1 = cd;
r_Minterval nd2 = cd;
KeyObject obj;
nd1[axis].set_high(value - 1);
nd2[axis].set_low(value);
RMDBGMIDDLE(6, RMDebug::module_indexmgr, "SRPTIndexLogic", "old leaf domain " << cd << " partition 1 " << nd1 << " partition 2 " << nd2);
//populate two nodes
for(i = 0; i < leafSize; i++)
{
cd = keyvec[i].getDomain();
RMDBGMIDDLE(5, RMDebug::module_indexmgr, "SRPTIndexLogic", "ObjectDomain of Object #" << i << " is " << cd);
obj = keyvec[i];
if (nd1.intersects_with(cd))
{
SDirIndexLogic::insertObject(pd1, obj, sl);
}
if (nd2.intersects_with(cd))
{
SDirIndexLogic::insertObject(pd2, obj, sl);
}
//sanity check
RMDBGIF(0, RMDebug::module_indexmgr, "SRPTIndexLogic", \
if (!nd1.intersects_with(cd) && !nd2.intersects_with(cd)) \
{ \
RMInit::logOut << "SRPTIndexLogic::splitLeaf() the entry does not intersect with any node: node 1 " << nd1 << " node 2 " << nd2 << " entry " << cd << endl; \
throw r_Error(TILE_NOT_INSERTED_INTO_INDEX); \
} )
}
pd1->setAssignedDomain(nd1);
pd2->setAssignedDomain(nd2);
RMDBGEXIT(4, RMDebug::module_indexmgr, "SRPTIndexLogic", "splitLeaf(" << OId(pd1->getIdentifier()) << ", " << OId(pd2->getIdentifier()) << ", keyvec, " << axis << ", " << value << ", " << domain << ")")
}
void
SRPTIndexLogic::splitNonLeaf( HierIndexDS* pd1,
HierIndexDS* pd2,
KeyObjectVector& keyvec,
IndexPVector& leafNodes2Split,
r_Dimension axis,
r_Range value,
const r_Minterval& domain,
const StorageLayout& sl)
{
RMDBGENTER(4, RMDebug::module_indexmgr, "SRPTIndexLogic", "splitNonLeaf(" << OId(pd1->getIdentifier()) << ", " << OId(pd2->getIdentifier()) << ", keyvec, leafNodes2Split, " << axis << ", " << value << ", " << domain << ")");
r_Dimension dim = domain.dimension();
r_Minterval cd(domain);
r_Minterval nd1(cd);
r_Minterval nd2(cd);
r_Minterval leafDomain(dim);
r_Minterval nodeDomain(dim);
KeyObjectVector listMinKO1;
KeyObjectVector listMinKO2;
KeyObjectVector keyvec2;
IndexPVector newLeafsToSplit;
HierIndexDS* entry = NULL;
HierIndexDS* n11 = NULL;
HierIndexDS* parentIxDS = NULL;
KeyObject tempKey;
KeyObject k11;
KeyObject k22;
unsigned int i = 0;
unsigned int a = 0;
unsigned int nodeSize = keyvec.size();
unsigned int leafSize = 0;
nd1[axis].set_high(value - 1);
nd2[axis].set_low(value);
//repopulate node and pd1
for(i = 0; i < nodeSize; i++)
{
RMDBGMIDDLE(4, RMDebug::module_indexmgr, "SRPTIndexLogic", "repopulating node (entry " << i << " of " << nodeSize << ")");
tempKey = keyvec[i];
entry = convert(tempKey);
// entry's domain
cd = keyvec[i].getDomain();
if (nd1.covers(cd))
{
RMDBGMIDDLE(4, RMDebug::module_indexmgr, "SRPTIndexLogic", "entry #" << i << " " << cd << " covers node 1 " << nd1);
// updates parent automatically
SDirIndexLogic::insertObject(pd1, tempKey, sl);
entry->destroy();
entry = 0;
}
else {
if (nd2.covers(cd))
{
RMDBGMIDDLE(4, RMDebug::module_indexmgr, "SRPTIndexLogic", "entry #" << i << " " << cd << " covers node 2 " << nd2);
// updates parent automatically
SDirIndexLogic::insertObject(pd2, tempKey, sl);
entry->destroy();
entry = NULL;
}
else {// intersects both -> split down
RMDBGMIDDLE(4, RMDebug::module_indexmgr, "SRPTIndexLogic", "entry #" << i << " " << cd << " intersects both " << nd1 << " " << nd2);
n11 = NULL;
//k11 is not a pointer! it is an object
// k11 = 0;
if (entry->isLeaf())
{
RMDBGMIDDLE(4, RMDebug::module_indexmgr, "SRPTIndexLogic", "entry is leaf ")
n11 = (HierIndexDS*)entry->getNewInstance();
n11->setIsNode(false);
leafDomain = cd;
clear(keyvec2, entry);
splitLeaf(n11, entry, keyvec2, axis, value, leafDomain, sl);
//if this was one of the leaf nodes to split, remove it from the list
leafSize = leafNodes2Split.size();
newLeafsToSplit = vector< IndexDS* >();
for (a = 0; a < leafSize; a++)
{
if (leafNodes2Split[a]->isSameAs(entry))
{
RMDBGMIDDLE(4, RMDebug::module_indexmgr, "SRPTIndexLogic", "will not add entry " << a << " to leafNodes2Split.size " << leafSize)
}
else {
RMDBGMIDDLE(4, RMDebug::module_indexmgr, "SRPTIndexLogic", "will add entry " << a << " to leafNodes2Split.size " << leafSize)
newLeafsToSplit.push_back(leafNodes2Split[a]);
}
}
leafNodes2Split = newLeafsToSplit;
RMDBGMIDDLE(4, RMDebug::module_indexmgr, "SRPTIndexLogic", "new LeafsToSplit size:" << leafNodes2Split.size())
}
else {// nonleaf node to be split
RMDBGMIDDLE(4, RMDebug::module_indexmgr, "SRPTIndexLogic", "entry is nonleaf ")
n11 = (HierIndexDS*)entry->getNewInstance();
n11->setIsNode(true);
nodeDomain = cd;
parentIxDS = entry->getParent();
clear(keyvec2, entry);
splitNonLeaf(n11, entry, keyvec2, leafNodes2Split, axis, value, nodeDomain, sl);
k22 = convert(entry);
parentIxDS->removeObject(k22);
parentIxDS->destroy();
parentIxDS = NULL;
}
//n11 and entry are allocated
//n11 and entry are not inserted in a parent!
k11 = convert(n11);
if (n11->isUnderFull() || n11->isLeaf())
{//leaf or node with more than minfill entries
SDirIndexLogic::insertObject(pd1, k11, sl);
if (n11->isLeaf() && n11->isOverFull())
{//very improbable that this happens
//leaf with more than maxfill entries
leafNodes2Split.push_back(n11);
}
else {//node or leaf with ok entries
n11->destroy();
n11 = NULL;
}
}
else {//node with less than minfill entries
listMinKO1.push_back(k11);
//k11 is deleted in redistribute
n11->destroy();
n11 = NULL;
}
k22 = convert(entry);
if (entry->isUnderFull() || entry->isLeaf())
{
SDirIndexLogic::insertObject(pd2, k22, sl);
if (entry->isLeaf() && entry->isOverFull())
{// very improbable that this happens
leafNodes2Split.push_back(entry);
}
else {
entry->destroy();
entry = 0;
}
}
else {
listMinKO2.push_back(k22);
entry->destroy();
entry = 0;
//k22 is deleted in redistribute
//where is entry deleted
}
}
}
RMDBGMIDDLE(4, RMDebug::module_indexmgr, "SRPTIndexLogic", "ended repopulating node (entry " << i << " of " << nodeSize << ")");
}
redistributeEntries(pd1, listMinKO1, sl);
redistributeEntries(pd2, listMinKO2, sl);
pd1->setAssignedDomain(nd1);
pd2->setAssignedDomain(nd2);
RMDBGEXIT(4, RMDebug::module_indexmgr, "SRPTIndexLogic", "splitNonLeaf(" << OId(pd1->getIdentifier()) << ", " << OId(pd2->getIdentifier()) << ", keyvec, leafNodes2Split, " << axis << ", " << value << ", " << domain << ")");
}
void
SRPTIndexLogic::redistributeEntries(IndexDS* node, KeyObjectVector& listMinKO, const StorageLayout& sl)
{
RMDBGENTER(4, RMDebug::module_indexmgr, "SRPTIndexLogic", "redistributeEntries(" << OId(node->getIdentifier()) << ", redistList.size " << listMinKO.size() << ")")
// not implemented. It could redistribute objects in case of too low fill factor
unsigned int size = listMinKO.size();
for(int i = 0; i < size; i++)
{
SDirIndexLogic::insertObject(node, listMinKO[i], sl);
}
listMinKO.clear();
RMDBGEXIT(4, RMDebug::module_indexmgr, "SRPTIndexLogic", "redistributeEntries(" << OId(node->getIdentifier()) << ", redistList.size " << listMinKO.size() << ")")
}
bool
SRPTIndexLogic::removeObject(IndexDS* ixDS, const KeyObject& objToRemove, const StorageLayout& sl)
{
RMDBGENTER(4, RMDebug::module_indexmgr, "SRPTIndexLogic", "removeObject(" << OId(ixDS->getIdentifier()) << ", " << objToRemove << ")")
bool found = false;
if (ixDS->getAssignedDomain().intersects_with(objToRemove.getDomain()))
{
if (((HierIndexDS*)ixDS)->isLeaf())
if (ixDS->removeObject(objToRemove))
{
found = true;
}
else {
RMDBGMIDDLE(0, RMDebug::module_indexmgr, "SRPTIndexLogic", "removeObject(" << ixDS->getAssignedDomain() << ", " << objToRemove.getDomain() << ") object was not found")
}
else {
KeyObjectVector candidates;
SDirIndexLogic::intersectUnOpt(ixDS, objToRemove.getDomain(), candidates);
for (KeyObjectVector::iterator it = candidates.begin(); it != candidates.end(); it++)
{
if (SRPTIndexLogic::removeObject((HierIndexDS*)DBHierIndexId((*it).getObject()), objToRemove, sl))
found = true;
else {
RMDBGMIDDLE(0, RMDebug::module_indexmgr, "SRPTIndexLogic", "removeObject(" << ixDS->getAssignedDomain() << ", " << objToRemove.getDomain() << ") did not remove an entry in a node")
}
}
}
}
else {
RMDBGMIDDLE(0, RMDebug::module_indexmgr, "SRPTIndexLogic", "removeObject(" << ixDS->getAssignedDomain() << ", " << objToRemove.getDomain() << ") did not intersect")
}
RMDBGEXIT(4, RMDebug::module_indexmgr, "SRPTIndexLogic", "removeObject(" << OId(ixDS->getIdentifier()) << ", const KeyObject*)")
return found;
}
void
SRPTIndexLogic::calculatePartition(r_Dimension& axis, r_Range& value, const HierIndexDS* node)
{
RMDBGENTER(4, RMDebug::module_indexmgr, "SRPTIndexLogic", "calculatePartition(" << axis << ", " << value << ", " << OId(node->getIdentifier()) << ")");
float bestDist1 = 1;
float bestDist2 = 1;
double bestDistBal = 1;
r_Dimension dim = node->getDimension();
r_Dimension first = 0;//rand()%dim;
r_Dimension a = first;
unsigned int elemCount = node->getSize();
r_Range v = -1;
float dist1 = 0;
float dist2 = 0;
double distBal = 0;
while (true)
{
for (int i = 0; i < elemCount; i++)
{
v = node->getObjectDomain(i)[a].low();
calculateDistribution(a, v, dist1, dist2, node);
// balanced property of this split distribution: how close it
// is to 50%, 50% distribution. Worst case: 1.
distBal = fabs(dist1 - 0.5) + fabs(dist2 - 0.5);
if (distBal < bestDistBal)// less overlapping in number of entries;add other conditions !!!
{
bestDist1 = dist1;
bestDist2 = dist2;
bestDistBal = fabs(bestDist1 - 0.5) + fabs(bestDist2 - 0.5);
axis = a;
value = v;
}
}
a = (a+1) % dim;
if (a == first)
break;
}
RMDBGEXIT(4, RMDebug::module_indexmgr, "SRPTIndexLogic", "calculatePartition(" << axis << ", " << value <<", " << OId(node->getIdentifier()) << ")")
}
void
SRPTIndexLogic::calculateDistribution( r_Dimension axis,
r_Range value,
float& dist1,
float& dist2,
const HierIndexDS* node)
{
RMDBGENTER(4, RMDebug::module_indexmgr, "SRPTIndexLogic", "calculateDistribution(" << axis <<", " << value << ", Dist1, Dist2, " << OId(node->getIdentifier()) << ")")
dist1 = 0;
dist2 = 0;
unsigned int n = node->getSize();
r_Minterval dom;
for (unsigned int i = 0; i < n ; i++)
{
dom = node->getObjectDomain(i);
if (dom[axis].high() < value)
{// entry will fall in first part only
dist1 +=1;
RMDBGMIDDLE(7, RMDebug::module_indexmgr, "SRPTIndexLogic", "Entry goes into the first");
}
else {// entry will fall in first part only
if (dom[axis].low() >= value)
{
dist2 +=1;
RMDBGMIDDLE(7, RMDebug::module_indexmgr, "SRPTIndexLogic", "Entry goes into the second");
}
else {// entry will fall in both parts
dist1 +=1;
dist2 +=1;
RMDBGMIDDLE(7, RMDebug::module_indexmgr, "SRPTIndexLogic", "Entry goes into the first and second");
}
}
}
dist1 = dist1/n;
dist2 = dist2/n;
RMDBGEXIT(4, RMDebug::module_indexmgr, "SRPTIndexLogic", "calculateDistribution(" << axis <<", " << value << ", " << dist1 << ", " << dist2 << ", " << OId(node->getIdentifier()) << ")")
}
void
SRPTIndexLogic::intersect( const r_Minterval& searchInter,
const r_Minterval& parentEntryDomain,
KeyObjectVector& intersectedObjs,
const HierIndexDS* ix,
r_Area& area)
{
RMDBGENTER(4, RMDebug::module_indexmgr, "SRPTIndexLogic", "intersect(SearchDom:" << searchInter << ", ParentDom:" << parentEntryDomain << ", intersectedObjects.size " << intersectedObjs.size() << ", " << OId(ix->getIdentifier()) << ", Area " << area << ") ")
r_Minterval intersectArea;
r_Minterval dom = ix->getCoveredDomain();
KeyObjectVector intersectedNodes;
HierIndexDS* tempIx = NULL;
r_Area nodeArea = 0;
r_Area oldArea = area;
unsigned int i = 0;
unsigned int nodeSize = ix->getSize();
if (ix->isLeaf())
{
//are there cells which belong into the result?
if (searchInter.intersects_with(dom))
{
intersectArea = searchInter.create_intersection(dom);
//may this leaf put cells into the result?
if (intersectArea.intersects_with(parentEntryDomain))
{
RMDBGMIDDLE(4, RMDebug::module_indexmgr, "SRPTIndexLogic", "searchDom " << searchInter << " indexDom " << dom << " intersection " << intersectArea << " area " << area);
binaryRegionSearch(ix, searchInter, area, intersectedObjs, 0, nodeSize - 1, dom);//parentEntryDomain);
}
}
}
else {// node is not a Leaf
if (searchInter.intersects_with(dom))
{
nodeArea = area;
binaryRegionSearch(ix, searchInter, nodeArea, intersectedNodes, 0, nodeSize - 1, dom);
for (i = 0; i < intersectedNodes.size(); i++)
{
if (area == 0)
{
RMDBGMIDDLE(0, RMDebug::module_indexmgr, "SRPTIndexLogic", "intersect AREA IS ALREADY FOUND")
}
if (nodeArea > oldArea)
{
RMInit::logOut << "SRPTIndexLogic::intersect() the index found more cells than allowed" << endl;
throw r_Error(INDEXEXHAUSTEDAREA);
}
r_Minterval objDom(intersectedNodes[i].getDomain());
tempIx = convert(intersectedNodes[i]);
intersect(searchInter, objDom, intersectedObjs, tempIx, area);
tempIx->destroy();
}
}
}
RMDBGEXIT(4, RMDebug::module_indexmgr, "SRPTIndexLogic", "intersect(SearchDom:" << searchInter << ", ParentDom:" << parentEntryDomain << ", intersectedObjects.size " << intersectedObjs.size() << ", " << OId(ix->getIdentifier()) << ", Area " << area << ")")
}
bool
SRPTIndexLogic::intersectNoDuplicates( const r_Minterval& searchInter,
const r_Minterval& entryDomain,
const r_Minterval& parentEntryDomain)
{
RMDBGENTER(4, RMDebug::module_indexmgr, "SRPTIndexLogic", "intersectNoDuplicates(SearchDom:" << searchInter << ", EntryDom:" << entryDomain << ", ParentDom:" << parentEntryDomain << ")");
bool retval = true;
int testcase = 0;
r_Dimension i = 0;
r_Dimension dim = entryDomain.dimension();
// This condition allows an early detection of duplicates in the
// index structure for intersection operations.
// An entry of a leaf node is only added to the list of entries
// intersected by that node having it internally with respect to the
// parent (since no other node may have it then) or if it crosses
// the upper bounds of the parent node (it will be also crossed by
// another parent at the upper bounds and it is added to the list then).
// don't add the entry if it doesn't intersect the search area
// or if it intersects a lower bound of the parent's
// domain and that lower bound of the parent is higher than
// that of the search region
r_Range searchLo = 0;
r_Range entryLo = 0;
for (i = 0 ; i < dim; i++)
{
searchLo = searchInter[i].low();
entryLo = entryDomain[i].low();
// entry doesn't intersect search region
if (entryLo > searchInter[i].high())
{
retval = false;
break;
}
// entry doesn't intersect search region
if (entryDomain[i].high() < searchLo)
{
retval = false;
break;
}
// entry is also in another node where it intersects the higher bounds,
// it should be included then, not here
if (entryLo < parentEntryDomain[i].low() && searchLo < parentEntryDomain[i].low())
{
retval = false;
break;
}
}
RMDBGEXIT(4, RMDebug::module_indexmgr, "SRPTIndexLogic", "intersectNoDuplicates(SearchDom:" << searchInter << ", EntryDom:" << entryDomain << ", ParentDom:" << parentEntryDomain << ")" << retval << " testcase " << testcase << " at dim " << i);
return retval;
}
int
SRPTIndexLogic::regionSearch( const HierIndexDS* ixNode,
const r_Minterval& mint,
r_Area& area,
KeyObjectVector& intersectedObjects,
const r_Minterval& parentEntryDomain)
{
RMDBGENTER(4, RMDebug::module_indexmgr, "SRPTIndexLogic", "regionSearch(" << OId(ixNode->getIdentifier()) << ", SearchDom " << mint << ", Area " << area << ", intersectedObjects.size " << intersectedObjects.size() << ", ParentDom " << parentEntryDomain << ")");
r_Minterval intersectedRegion;
unsigned int endAt = ixNode->getSize();
int retval = endAt;
KeyObject newObj;
r_Minterval objDomain;
unsigned int i = 0;
r_Area oldArea = area;
/*there must be something like or the map version
DomainMap t;
DomainMap::iterator it;
for (i = 0; i < intersectedObjects->size(); i++)
{
DomainPair p((*intersectedObjects)[i]->getObject().getOId(), (*intersectedObjects)[i]->getDomain());
t.insert(p);
}
*/
for (i = 0; i < endAt ; i++)
{
objDomain = ixNode->getObjectDomain(i);
// object intersects region
// problem with calculation of complete area when the entry is not added.
/*there must be something like or the map version
if (objDomain.intersects_with(mint))
{
objDomain.intersection_with(mint);
area = area - objDomain.cell_count();
if (area <= 0)
{
retval = i;
break;
}
//insert intersectNoDuplicates here
}
*/
if (intersectNoDuplicates(mint, objDomain, parentEntryDomain))
{
/*there must be something like or the map version
if ((it = t.find(ixNode->getObject(i)->getObject().getOId())) == t.end())
{
RMDBGMIDDLE(4, RMDebug::module_indexmgr, "SRPTIndexLogic", "adding " << ixNode->getObject(i)->getObject().getOId() << " intersected region " << intersectedRegion << " area " << area);
}
else {
RMDBGMIDDLE(4, RMDebug::module_indexmgr, "SRPTIndexLogic", "not adding " << ixNode->getObject(i)->getObject().getOId() << " area " << area);
RMInit::logOut << "should never happen" << endl;
RMInit::logOut << "want to add " << ixNode->getObject(i)->getObject().getOId() << " at " << objDomain << endl;
for (it = t.begin(); it != t.end(); it++)
RMInit::logOut << OId((*it).first) << " at " << (*it).second << endl;
throw r_Error(TESTERROR);
}
*/
objDomain.intersection_with(mint);
area = area - objDomain.cell_count();
newObj = ixNode->getObject(i);
intersectedObjects.push_back(newObj);
if (oldArea < area)
{
retval = i;
RMInit::logOut << "SRPTIndexLogic::regionSearch() the area was completely exhausted" << endl;
throw r_Error(INDEXEXHAUSTEDAREA);
break;
}
if (area == 0)
{
retval = i;
break;
}
}
else {
RMDBGMIDDLE(4, RMDebug::module_indexmgr, "SRPTIndexLogic", "not adding " << ixNode->getObject(i).getObject().getOId() << " dom " << objDomain << " does not intersect")
}
}
RMDBGEXIT(4, RMDebug::module_indexmgr, "SRPTIndexLogic", "regionSearch(" << OId(ixNode->getIdentifier()) << ", SearchDom " << mint << ", Area " << area << ", intersectedObjects.size " << intersectedObjects.size() << ", ParentDom " << parentEntryDomain << ")" << retval);
return retval;
}
int
SRPTIndexLogic::binaryRegionSearch( const HierIndexDS* ixNode,
const r_Minterval& mint,
r_Area& area,
KeyObjectVector& intersectedObjects,
int first,
int last,
const r_Minterval& parentEntryDomain)
{
RMDBGENTER(4, RMDebug::module_indexmgr, "SRPTIndexLogic", "binaryRegionSearch(" << OId(ixNode->getIdentifier()) << ", SearchDom " << mint << ", Area " << area << ", intersectedObjs.size " << intersectedObjects.size() << ", From " << first << ", To " << last << ", ParentDom " << parentEntryDomain << ")");
// code copied from DirIx::binaryRegionSearch (11.11.98)
// and further adapted
// assumes order according to the lowest corner of the objects
int retval = 0;
int middle = 0;
int inc = 0;
int ix = 0;
int compResult = 0;
r_Minterval t;
r_Minterval objDomain;
r_Minterval intersectedRegion;
KeyObject newObj;
r_Area oldArea = area;
if (area == 0)
retval = -1;
else
if (first > last)
retval = -1;
else {
middle = (last + first)/2;
t = ixNode->getObjectDomain(middle);
if(mint.get_high().compare_with(t.get_origin()) < 0)
{ // R.hi < tile.lo no tiles after this one
retval = binaryRegionSearch(ixNode, mint, area, intersectedObjects, first, middle - 1, parentEntryDomain);
}
else {
if(t.get_high().compare_with(mint.get_origin()) < 0)
{
retval = binaryRegionSearch(ixNode, mint, area, intersectedObjects, middle + 1, last, parentEntryDomain);
if (area > 0)
{
retval = binaryRegionSearch(ixNode, mint, area, intersectedObjects, first, middle - 1, parentEntryDomain);
}
}
else {
inc = 1;
for (ix = middle; ; ix += inc)
{
objDomain = ixNode->getObjectDomain(ix);
RMDBGMIDDLE(4, RMDebug::module_indexmgr, "SRPTIndexLogic", "cycle " << ix << " " << objDomain);
compResult = mint.get_high().compare_with(objDomain.get_origin());
// object intersects region
if (intersectNoDuplicates(mint, objDomain, parentEntryDomain))
{
intersectedRegion = objDomain;
intersectedRegion.intersection_with(mint).intersection_with(parentEntryDomain);
oldArea = area;
RMDBGMIDDLE(4, RMDebug::module_indexmgr, "SRPTIndexLogic", "interesected region " << intersectedRegion << " intersection area " << intersectedRegion.cell_count() << " area before " << area)
area = area - intersectedRegion.cell_count();
RMDBGMIDDLE(4, RMDebug::module_indexmgr, "SRPTIndexLogic", "area after " << area)
if (area > oldArea)
{
RMInit::logOut << "SRPTIndexLogic::binaryRegionSearch() index found more cells than allowed" << endl;
throw r_Error(INDEXEXHAUSTEDAREA);
}
RMDBGMIDDLE(4, RMDebug::module_indexmgr, "SRPTIndexLogic", "intersectedRegion " << intersectedRegion << " area " << area)
newObj = ixNode->getObject(ix);
intersectedObjects.push_back(newObj);
if (area == 0)
{
retval = ix;
break;
}
}
if (inc != -1 && (ix == last || compResult < 0))
{
ix = middle;
inc = -1;
}
if (ix == first && inc == -1)
{
retval = ix;
break;
}
}
}
}
}
RMDBGEXIT(4, RMDebug::module_indexmgr, "SRPTIndexLogic", "binaryRegionSearch(" << OId(ixNode->getIdentifier()) << ", SearchDom " << mint << ", Area " << area << ", intersectedObjs.size " << intersectedObjects.size() << ", From " << first << ", To " << last << ", ParentDom " << parentEntryDomain << ")" << retval);
return retval;
}
void
SRPTIndexLogic::containPointQuery( const r_Point& searchPoint,
const HierIndexDS* ix,
KeyObject& result,
const StorageLayout& sl)
{
RMDBGENTER(4, RMDebug::module_indexmgr, "SRPTIndexLogic", "containPointQuery(" << searchPoint << ", Node " << ix << ", result)");
HierIndexDS* intersectedNode = NULL;
if (!ix)
{
RMDBGEXIT(4, RMDebug::module_indexmgr, "SRPTIndexLogic", "containPointQuery(" << searchPoint << ", Node, result) node is NULL");
}
else {
if (ix->isLeaf())
{
RMDBGMIDDLE(4, RMDebug::module_indexmgr, "SRPTIndexLogic", "index " << OId(ix->getIdentifier()) << " is leaf");
SDirIndexLogic::containPointQuery(ix, searchPoint, result, sl);
}
else {
RMDBGMIDDLE(4, RMDebug::module_indexmgr, "SRPTIndexLogic", "index " << OId(ix->getIdentifier()) << " is node");
KeyObject lowerNode;
SDirIndexLogic::containPointQuery(ix, searchPoint, lowerNode, sl);
containPointQuery(searchPoint, convert(lowerNode), result, sl);
}
if (result.isInitialised())
{
RMDBGEXIT(4, RMDebug::module_indexmgr, "SRPTIndexLogic", "containPointQuery(" << searchPoint << ", " << OId(ix->getIdentifier()) << ")" << result);
}
else {
RMDBGEXIT(4, RMDebug::module_indexmgr, "SRPTIndexLogic", "containPointQuery(" << searchPoint << ", " << OId(ix->getIdentifier()) << ") nothing found");
}
}
}
HierIndexDS*
SRPTIndexLogic::convert(const KeyObject& toConvert)
{
HierIndexDS* retval = NULL;
if (toConvert.isInitialised())
retval = (HierIndexDS*)DBHierIndexId(toConvert.getObject());
return retval;
}
KeyObject
SRPTIndexLogic::convert(HierIndexDS* toConvert)
{
if (toConvert)
return KeyObject(DBObjectId(toConvert->getIdentifier()), toConvert->getAssignedDomain());
KeyObject retval;
return retval;
}