/*
* This file is part of rasdaman community.
*
* Rasdaman community is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Rasdaman community is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with rasdaman community. If not, see .
*
* Copyright 2003, 2004, 2005, 2006, 2007, 2008, 2009 Peter Baumann /
rasdaman GmbH.
*
* For more information please see
* or contact Peter Baumann via .
/
/**
* SOURCE: rasmgr_master_nb.cc
*
* MODULE: rasmgr
* CLASS: MasterComm
*
* PURPOSE:
* Main loop of master rasmgr
*
* COMMENTS:
* - MasterComm::processRequest() is the central dispatcher for rasmgr requests, recognising and executing them.
*
*/
#include
#include
#include
#include "rasmgr_master.hh"
#include "rasmgr_config.hh"
#include "rasmgr_rascontrol.hh"
#include "rasmgr_users.hh"
#include "rasmgr_host.hh"
#include "rasmgr_localsrv.hh"
#include "rasmgr_srv.hh"
using namespace std;
#include "raslib/rmdebug.hh"
#include "debug.hh"
// from rasmgr_localsrv.cc; should go to a central location -- PB 2003-nov-25
extern char *now();
// rasserver error codes (see errtxts)
// FIXME: should go into a central include file / class -- PB 2003-nov-20
#define MSG_OK 200
#define MSG_OK_STR "Ok"
#define MSG_UNKNOWNSERVERTYPE 1001
#define MSG_UNKNOWNSERVERTYPE_STR "Unknown server type"
#define MSG_UNKNOWNACCESSTYPE 1002
#define MSG_UNKNOWNACCESSTYPE_STR "Unknown access type"
#define MSG_DATABASENOTFOUND 807
#define MSG_DATABASENOTFOUND_STR "Database not found"
#define MSG_WRITETRANSACTION 806
#define MSG_WRITETRANSACTION_STR "Write transaction in progress"
#define MSG_NOSUITABLESERVER 805
#define MSG_NOSUITABLESERVER_STR "No suitable servers started"
#define MSG_SYSTEMOVERLOADED 801
#define MSG_SYSTEMOVERLOADED_STR "System overloaded"
// the following code means: I got an error code which I don't know (FIXME: reconsider nr!)
#define MSG_ILLEGAL 999
#define MSG_ILLEGAL_STR "Internal error: Illegal response code."
// here I start collecting rasmgr protocol tokens to eventually gather them all in one include file
#define RASMGRPROT_EOL "\r\n"
#define RASMGRPROT_DOUBLE_EOL "\r\n\r\n"
// time [secs] until a client can be freed from the pending request list after a fake request
#define WAITTIME_AFTER_FAKE 2
// time increment [secs] of iterated timeout (see updateTime())
#define WAITTIME_REPEATED 3
MasterComm::MasterComm()
{ commit=false;
allowMultipleWriteTransactions = false;
}
MasterComm::~MasterComm()
{
}
void MasterComm::Run()
{
ENTER("MasterComm::Run: enter." );
RMTIMER( "MasterComm", "Run" ); // benchmark this routine, if enabled
initListenSocket(config.getListenPort()); // connect/bind the central listen socket
// using IOSelector level here!
initJobs(MAXJOBSMASTER); // init jobs structure (nothing with sockets here)
allowMultipleWriteTransactions = config.allowMultipleWriteTransactions();
selector.setTimeout( config.getPollFrequency() , 0 );
VLOG << "Entering server mode, prepared to receive requests." << endl << endl;
while(mayExit()==false)
{
TALK("MasterComm::Run: new request cycle, status before processing is:" );
printStatus();
if(commit)
doCommit();
int answerLen=0;
TALK("MasterComm::Run: (c) Waiting...");
// wait for incoming requests, using select()
int r=selector.waitForRequest(); // 0 is timeout, <0 error, >0 success
// again, IOSelector level here!
TALK("MasterComm::Run: (d) It's ringing..." << r);
localServerManager.cleanChild(); // sa fie
if(r<0) // nothing to read
{
TALK("MasterComm::Run: (f1) it's a signal (or a socket failure)...");
continue;
}
if(r==0) // timeout, nothing to read
{
TALK("MasterComm::Run: (f2) nothing, look for timeouts...");
lookForTimeout();
continue;
}
if(r>0) // something is pending
{
TALK("MasterComm::Run: (e) Got request, r=" << r << "...");
// iterate over all jobs to see what we can read / reconnect (?) / write
dispatchWriteRequest(); // first this, to increase the chance to free a client
connectNewClients(); // wait for requests, using accept()
dispatchReadRequest(); // now read in new requests
for(int i=0;iresult(); // print stop time to log
}
}
}
LEAVE("MasterComm::Run: leave." );
} // Run()
// keep connection open after processing request?
// ...according to request type and comm success, shall we keep socket open?
// Note: this is a bad hack, but I don't want to change the "answer" ret code of processRequest() unless I fully understand it -- PB 2003-jun-10
static bool keepConnection;
void MasterComm::processJob(NbJob ¤tJob)
{
ENTER( "MasterComm::processJob: enter." );
if(currentJob.isOperationPending()==false )
{
LEAVE( "MasterComm::processJob: leave. isOperationPending=false" );
return;
}
if(currentJob.wasError()) // low-level comm error
{
TALK( "MasterComm::processJob: closing connection." );
currentJob.closeConnection();
LEAVE( "MasterComm::processJob: leave." );
return;
}
if(currentJob.isMessageOK() == false)
{
LEAVE( "MasterComm::processJob: leave. isMessageOK=false" );
return;
}
if(fillInBuffer(currentJob.getMessage())==false) // fill msg into answer buffer header + body
{
TALK( "MasterComm::processJob: closing connection." );
currentJob.closeConnection();
LEAVE( "MasterComm::processJob: leave. fillInBuffer=false" );
return;
}
// now we have the message in inBuffer, with header and body set correctly
int answer = processRequest( currentJob );
int outLen = strlen(outBuffer);
if(outLen && answer != 2) // sending the answer
// FIXME: what is answer==2 ? never happens! -- PB 2003-jun-10
{
TALK( "MasterComm::processJob: init sending answer for outBuffer, set socket to write mode." );
currentJob.initSendAnswer(outBuffer);
}
if(outLen == 0) // no answer to send
{
TALK( "MasterComm::processJob: no answer to send, closing connection." );
currentJob.closeConnection();
}
if( answer == 1 ) // means "delayedOperation"
// FIXME: according to processRequest, delOp is 0 !!! -- PB 2003-jun-10
// ...and 1 comes back for POST rasservernewstatus
{ // the only known until now
TALK( "MasterComm::processJob: delayedOp, therefore changeServerStatus()." );
rasManager.changeServerStatus(body);
}
/* Two words about this delayedOperation. If a remote server crashes, the remote rasmgr
sends a message and does not wait for answer. But if the master rasmgr restarts
immediately the crashed server, it could come to a deadlock. Maybe not, but we
wish to avoid any possibility, so we close first the connection and then attempt
to restart the server.
*/
// EXPERIMENTAL: close sockets as soon and as always as possible
// if (! keepConnection) // singleton request or comm error
// {
// TALK( "MasterComm::processJob: singleton request, closing connection." );
// currentJob.closeConnection();
// }
LEAVE( "MasterComm::processJob: leave." );
} // processJob()
// printClientAddr(): aux fct to print client address to designated stream
const char *getClientAddr( int mySocket )
{
const char *result = NULL;
struct sockaddr_in s;
socklen_t sockaddrSize = (socklen_t) sizeof(s);
if ( getpeername( mySocket, (struct sockaddr*)&s, &sockaddrSize ) != 0)
result = strerror(errno);
else
result = inet_ntoa(s.sin_addr);
return result;
}
// process request which has been prepared in inBuffer
// if 'verbose' is enabled in configuration then requests will be logged.
// returns
// 0 normally (?)
// 1 for POST rasservernewstatus
// NB: keepConnection is static above -- bad hack, see there
int MasterComm::processRequest( NbJob ¤tJob )
{
ENTER( "MasterComm::processRequest: enter. inBuffer=" << inBuffer );
// inBuffer: header + body Ok, output in outBuffer, which is not initialized here
outBuffer[0]=0; // mark outBuffer as empty
int answer = 0;
// delayedOperation = 0;
bool fake = false; // getfreeserver request really wants to allocate a new server?
// --- this is the central dispatcher for rasmgr requests, recognising and executing them.
if(isMessage("POST rasmgrslave"))
{
VLOG << now() << " slave rasmgr request from "
<< getClientAddr( currentJob.getSocket() )
<< ": '" << body << "'..." << flush;
hostmanager.postSlaveMGR(body,outBuffer); // prepare outBuffer from body for send to slave
keepConnection = false; // master mgr passes thru, it's singleton commo, so don't keep conn
// FIXME: is this really correct?? -- PB 2003-jun-10
VLOG << "ok" << endl;
}
else if(isMessage("POST rasservernewstatus"))
{
// extract server status from msg body
char serverName[50];
int newstatus = 0;
long dummy = 0;
serverName[0] = '\0'; // initialize in case sscanf() fails
int result = sscanf( body, "%s %d %ld", serverName, &newstatus, &dummy);
if (result == 3) // we simply ignore malformed requests, reason see below
{
const char *statusText = NULL;
switch (newstatus)
{
case SERVER_DOWN:
statusText = SERVER_DOWN_TXT;
break;
case SERVER_AVAILABLE:
statusText = SERVER_AVAILABLE_TXT;
break;
case SERVER_REGULARSIG:
statusText = SERVER_REGULARSIG_TXT;
break;
case SERVER_CRASHED:
statusText = SERVER_CRASHED_TXT;
break;
default:
statusText = "(unknown message flag)";
break;
}
if (newstatus != SERVER_REGULARSIG) // don't blow up the log file with "still alive" signals
{
VLOG << now() << " status info from server " << serverName
<< " @ " << getClientAddr( currentJob.getSocket() )
<< ": '" << statusText << "'...ok" << endl;
}
keepConnection = false; // singleton msg slave -> master
answer = 1;
}
else // malformed request
{
VLOG << now() << " Error: malformed request (ignoring it) from "
<< getClientAddr( currentJob.getSocket() )
<< ": '" << body << "'" << endl;
}
}
else if( (fake = isMessage("POST getfreeserver2")) || isMessage("POST getfreeserver"))
{
VLOG << now() << " client request from "
<< getClientAddr( currentJob.getSocket() )
<< ": " << "'get server'..." << flush;
int rc = getFreeServer(fake); // returns std rasdaman errors -- FIXME: error ignored!
keepConnection = (rc == MSG_OK) ? true : false; // 200 is "ok"
VLOG << "ok" << endl;
}
else if(isMessage("POST rascontrol"))
{
VLOG << now() << " rascontrol request from "
<< getClientAddr( currentJob.getSocket() )
<< ": '" << body << "'..." << flush;
if(authorization.acceptEntry(header))
{
rascontrol.processRequest(body,outBuffer);
keepConnection = true; // rascontrol connection accepted, so keep it
VLOG << "ok" << endl;
}
else
{
answerAccessDenied();
keepConnection = false; // this is a final answer, don't keep conn open afterwards
VLOG << "denied." << endl;
}
}
LEAVE( "MasterComm::processRequest: leave. answer=" << answer << ", keepConnection=" << keepConnection );
return answer;
} // processRequest()
// fillInBuffer: fill parameter string passed into message header and body (both global)
// separator is a double newline
// FIXME: unstable and weird programming, improve! -- PB 2003-jun-10
// input:
// s message input string
// returns:
// true filled buffer properly
// false NULL body string
// inBuffer (global buffer) set to s; header string part properly NULL terminated in inBuffer
// body (global ptr) set to beginning of message body in inBuffer
// header (global ptr) set to beginning of inBuffer
bool MasterComm::fillInBuffer(const char *s)
{
strcpy(inBuffer,s);
header=inBuffer; // set header to begining of msg buffer
body=strstr(inBuffer, RASMGRPROT_DOUBLE_EOL ); // find double EOL, this is where body starts
if(body == NULL) // not found? this means a protocol syntax error
{
TALK( "MasterComm::fillInBuffer: Error in rasmgr protocol encountered (2xEOL missing). msg=" << inBuffer );
return false; // only if client is stupid
}
*body=0; // terminate header (!) string
body+= strlen( RASMGRPROT_DOUBLE_EOL ); // let body start after this double newline
return true;
}
// save config and auth file; deprecated
void MasterComm::doCommit()
{
if(config.isTestModus()==false)
{
TALK( "MasterComm::doCommit: deprecated, should not be called any longer." );
#if 0 // now done by saveCommand() directly
if(commitAuthOnly==false)
{ VLOG << "Save configuration file...";
if(config.saveConfigFile()) VLOG << "OK" << std::endl;
else VLOG << "Failed" << std::endl;
}
VLOG << "Save authorization file...";
if(authorization.saveAuthFile()) VLOG << "OK" << std::endl;
else VLOG << "Failed" << std::endl;
#endif
}
else
{ std::cout<<"Save requested, but not permitted during test modus!"< [RPC|HTTP|RNP] [rw|ro]
// where the flags are NOT case sensitive
// returns: standard rasdasman error codes
// 200 (ok), 801, 805, 999, ...
int MasterComm::getFreeServer(bool fake)
{
// creates too large log files, so omit in production:
// BenchmarkTimer *freeServerTimePtr = new BenchmarkTimer("Get free server");
char databaseName[100];
char serverType[10];
char serverName[100]; // name of rasserver found, if any
char accessType[5];
char prevID[200]="(none)";
ENTER("MasterComm::getFreeServer: enter. fake=" << fake << ", body="<3)
clientID.init(prevID);
TALK("GetFreeServer: db = " << databaseName << ", requested server type = " << serverType << ", access type = " << accessType << ", clientID="< " << cbs );
cout << "Error: no server available, error code: " << cbs << endl;
answCode = cbs;
break;
}
if( fake == false) // server to be allocated?
{
// mark server found as unavailable to others
r.setNotAvailable();
// set transaction mode requested
if(writeTransaction==true)
r.startWriteTransaction(db); // nothing real happens, no error can occur
else
r.startReadTransaction(db); // nothing real happens, no error can occur
TALK("MasterComm::getFreeServer: You have the server.");
// answCode is same as initialised if we come here
}
sprintf(answerString,"%s %ld %s ",r.getHostNetwName(),r.getPort(),authorization.getCapability(r.getName(),databaseName,!writeTransaction) );
// remember server name
strncpy( serverName, r.getName(), sizeof(serverName) );
;
TALK( "answerString=" << answerString );
break;
}
}
} // for
// any free server found?
if(countSuitableServers == 0)
{
cout << "Error: no suitable free server available." << endl;
answCode = MSG_NOSUITABLESERVER;
break;
}
// no answer string provided -> no server available
// oops?? why not uniformly check against answCode? -- PB 2003-nov-20
if(answerString[0]==0)
{
cout << "Error: cannot find any free server; answer code: " << answCode << " -> ";
answCode = MSG_SYSTEMOVERLOADED;
cout << answCode << endl;
break;
}
} while(0); // see comment at start of "loop"
answText = convertAnswerCode(answCode);
if(answCode == MSG_OK)
sprintf(outBuffer,"HTTP/1.1 %d %s\r\nContent-type: text/plain\r\nContent-length: %d\r\n\r\n%s",answCode,answText,strlen(answerString)+1,answerString);
else
{
sprintf(outBuffer,"HTTP/1.1 %d %s\r\nContent-type: text/plain\r\nContent-length: %d\r\n\r\n%d %s",400,"Error",strlen(answText)+1,answCode,answText);
clientQueue.put(clientID, (const char*)databaseName, sType, answCode);
}
// creates too large log files, so omit in production:
// freeServerTimePtr->result(); // print time elapsed
LEAVE("MasterComm::getFreeServer: leave. answCode=" << answCode << ", server=" << serverName << ", outBuffer=" << outBuffer );
return answCode; //strlen(outBuffer)+1;
} // getFreeServer()
// convertAnswerCode: convert numeric answer code to error text for selected errors + OK
// input:
// code numeric error code, cf. errtxts
// returns:
// answer ptr to static error text
const char* MasterComm::convertAnswerCode(int code)
{
const char *answer = MSG_ILLEGAL_STR; // return value, initialized to "illegal"
switch(code)
{
case MSG_OK:
answer = MSG_OK_STR;
break;
case MSG_UNKNOWNSERVERTYPE:
answer = MSG_UNKNOWNSERVERTYPE_STR;
break;
case MSG_UNKNOWNACCESSTYPE:
answer = MSG_UNKNOWNACCESSTYPE_STR;
break;
case MSG_DATABASENOTFOUND:
answer = MSG_DATABASENOTFOUND_STR;
break;
case MSG_WRITETRANSACTION:
answer = MSG_WRITETRANSACTION_STR;
break;
case MSG_NOSUITABLESERVER:
answer = MSG_NOSUITABLESERVER_STR;
break;
case MSG_SYSTEMOVERLOADED:
answer = MSG_SYSTEMOVERLOADED_STR;
break;
default:
// cout<<"Default value not allowed ="<activ = false;
ClientEntry newClient(clientID, dbName, serverType, errorCode);
newClient.activ = true;
newClient.updateTime();
clients.push_back(newClient);
TALK("ClientQueue::put, known client db=" << dbName << ", serverType=" << serverType << ", but different request: id="<