From 8f27e65bddd7d4b8515ce620fb485fdd78fcdf89 Mon Sep 17 00:00:00 2001 From: Constantin Jucovschi Date: Fri, 24 Apr 2009 07:20:22 -0400 Subject: Initial commit --- rasmgr/rasmgr_master_nb_hack.cc | 1007 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 1007 insertions(+) create mode 100644 rasmgr/rasmgr_master_nb_hack.cc (limited to 'rasmgr/rasmgr_master_nb_hack.cc') diff --git a/rasmgr/rasmgr_master_nb_hack.cc b/rasmgr/rasmgr_master_nb_hack.cc new file mode 100644 index 0000000..42e1cb4 --- /dev/null +++ b/rasmgr/rasmgr_master_nb_hack.cc @@ -0,0 +1,1007 @@ +/* +* This file is part of rasdaman community. +* +* Rasdaman community is free software: you can redistribute it and/or modify +* it under the terms of the GNU General Public License as published by +* the Free Software Foundation, either version 3 of the License, or +* (at your option) any later version. +* +* Rasdaman community is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU General Public License for more details. +* +* You should have received a copy of the GNU General Public License +* along with rasdaman community. If not, see . +* +* Copyright 2003, 2004, 2005, 2006, 2007, 2008, 2009 Peter Baumann / +rasdaman GmbH. +* +* For more information please see +* or contact Peter Baumann via . +/ +/** + * SOURCE: rasmgr_master_nb.cc + * + * MODULE: rasmgr + * CLASS: MasterComm + * + * PURPOSE: + * Main loop of master rasmgr + * + * COMMENTS: + * - MasterComm::processRequest() is the central dispatcher for rasmgr requests, recognising and executing them. + * +*/ + +#include +#include +#include + +#include "rasmgr_master.hh" +#include "rasmgr_config.hh" +#include "rasmgr_rascontrol.hh" +#include "rasmgr_users.hh" +#include "rasmgr_host.hh" +#include "rasmgr_localsrv.hh" +#include "rasmgr_srv.hh" + +using namespace std; + +#include "debug.hh" + + +// from rasmgr_localsrv.cc; should go to a central location -- PB 2003-nov-25 +extern char *now(); + +// rasserver error codes (see errtxts) +// FIXME: should go into a central include file / class -- PB 2003-nov-20 +#define MSG_OK 200 +#define MSG_OK_STR "Ok" +#define MSG_UNKNOWNSERVERTYPE 1001 +#define MSG_UNKNOWNSERVERTYPE_STR "Unknown server type" +#define MSG_UNKNOWNACCESSTYPE 1002 +#define MSG_UNKNOWNACCESSTYPE_STR "Unknown access type" +#define MSG_DATABASENOTFOUND 807 +#define MSG_DATABASENOTFOUND_STR "Database not found" +#define MSG_WRITETRANSACTION 806 +#define MSG_WRITETRANSACTION_STR "Write transaction in progress" +#define MSG_NOSUITABLESERVER 805 +#define MSG_NOSUITABLESERVER_STR "No suitable servers started" +#define MSG_SYSTEMOVERLOADED 801 +#define MSG_SYSTEMOVERLOADED_STR "System overloaded" +// the following code means: I got an error code which I don't know (FIXME: reconsider nr!) +#define MSG_ILLEGAL 999 +#define MSG_ILLEGAL_STR "Internal error: Illegal response code." + +// here I start collecting rasmgr protocol tokens to eventually gather them all in one include file +#define RASMGRPROT_EOL "\r\n" +#define RASMGRPROT_DOUBLE_EOL "\r\n\r\n" + +// time [secs] until a client can be freed from the pending request list after a fake request +#define WAITTIME_AFTER_FAKE 2 +// time increment [secs] of iterated timeout (see updateTime()) +#define WAITTIME_REPEATED 3 + + +MasterComm::MasterComm() + { commit=false; + allowMultipleWriteTransactions = false; + } + +MasterComm::~MasterComm() + { + } + +void MasterComm::Run() + { + ENTER("MasterComm::Run: enter." ); + + initListenSocket(config.getListenPort()); // connect/bind the central listen socket + // using IOSelector level here! + + initJobs(MAXJOBSMASTER); // init jobs structure (nothing with sockets here) + + allowMultipleWriteTransactions = config.allowMultipleWriteTransactions(); + + selector.setTimeout( config.getPollFrequency() , 0 ); + + VLOG << "Entering server mode, prepared to receive requests." << endl << endl; + + while(mayExit()==false) + { + TALK("MasterComm::Run: new request cycle, status before processing is:" ); + printStatus(); + + if(commit) + doCommit(); + + int answerLen=0; + + TALK("MasterComm::Run: (c) Waiting..."); + + // wait for incoming requests, using select() + int r=selector.waitForRequest(); // 0 is timeout, <0 error, >0 success + // again, IOSelector level here! + + TALK("MasterComm::Run: (d) It's ringing..." << r); + + localServerManager.cleanChild(); // sa fie + + if(r<0) // nothing to read + { + TALK("MasterComm::Run: (f1) it's a signal (or a socket failure)..."); + continue; + } + if(r==0) // timeout, nothing to read + { + TALK("MasterComm::Run: (f2) nothing, look for timeouts..."); + lookForTimeout(); + continue; + } + if(r>0) // something is pending + { + + TALK("MasterComm::Run: (e) Got request, r=" << r << "..."); + + // iterate over all jobs to see what we can read / reconnect (?) / write + dispatchWriteRequest(); // first this, to increase the chance to free a client + connectNewClients(); // wait for requests, using accept() + dispatchReadRequest(); // now read in new requests + + for(int i=0;iresult(); // print stop time to log + } + } + } + LEAVE("MasterComm::Run: leave." ); + } // Run() + +// keep connection open after processing request? +// ...according to request type and comm success, shall we keep socket open? +// Note: this is a bad hack, but I don't want to change the "answer" ret code of processRequest() unless I fully understand it -- PB 2003-jun-10 +static bool keepConnection; + +void MasterComm::processJob(NbJob ¤tJob) + { + ENTER( "MasterComm::processJob: enter." ); + + if(currentJob.isOperationPending()==false ) + { + LEAVE( "MasterComm::processJob: leave. isOperationPending=false" ); + return; + } + + if(currentJob.wasError()) // low-level comm error + { + TALK( "MasterComm::processJob: closing connection." ); + currentJob.closeConnection(); + LEAVE( "MasterComm::processJob: leave." ); + return; + } + + if(currentJob.isMessageOK() == false) + { + LEAVE( "MasterComm::processJob: leave. isMessageOK=false" ); + return; + } + + if(fillInBuffer(currentJob.getMessage())==false) // fill msg into answer buffer header + body + { + TALK( "MasterComm::processJob: closing connection." ); + currentJob.closeConnection(); + LEAVE( "MasterComm::processJob: leave. fillInBuffer=false" ); + return; + } + + // now we have the message in inBuffer, with header and body set correctly + + int answer = processRequest( currentJob ); + + int outLen = strlen(outBuffer); + + if(outLen && answer != 2) // sending the answer + // FIXME: what is answer==2 ? never happens! -- PB 2003-jun-10 + { + TALK( "MasterComm::processJob: init sending answer for outBuffer, set socket to write mode." ); + currentJob.initSendAnswer(outBuffer); + } + + if(outLen == 0) // no answer to send + { + TALK( "MasterComm::processJob: no answer to send, closing connection." ); + currentJob.closeConnection(); + } + + if( answer == 1 ) // means "delayedOperation" + // FIXME: according to processRequest, delOp is 0 !!! -- PB 2003-jun-10 + // ...and 1 comes back for POST rasservernewstatus + { // the only known until now + TALK( "MasterComm::processJob: delayedOp, therefore changeServerStatus()." ); + rasManager.changeServerStatus(body); + } + /* Two words about this delayedOperation. If a remote server crashes, the remote rasmgr + sends a message and does not wait for answer. But if the master rasmgr restarts + immediately the crashed server, it could come to a deadlock. Maybe not, but we + wish to avoid any possibility, so we close first the connection and then attempt + to restart the server. + */ + + // EXPERIMENTAL: close sockets as soon and as always as possible + // if (! keepConnection) // singleton request or comm error + // { + // TALK( "MasterComm::processJob: singleton request, closing connection." ); + // currentJob.closeConnection(); + // } + + LEAVE( "MasterComm::processJob: leave." ); + } // processJob() + + +// printClientAddr(): aux fct to print client address to designated stream +const char *getClientAddr( int mySocket ) +{ + const char *result = NULL; + struct sockaddr_in s; + socklen_t sockaddrSize = (socklen_t) sizeof(s); + if ( getpeername( mySocket, (struct sockaddr*)&s, &sockaddrSize ) != 0) + result = strerror(errno); + else + result = inet_ntoa(s.sin_addr); + return result; +} + +// process request which has been prepared in inBuffer +// if 'verbose' is enabled in configuration then requests will be logged. +// returns +// 0 normally (?) +// 1 for POST rasservernewstatus +// NB: keepConnection is static above -- bad hack, see there +int MasterComm::processRequest( NbJob ¤tJob ) +{ + ENTER( "MasterComm::processRequest: enter. inBuffer=" << inBuffer ); + + // inBuffer: header + body Ok, output in outBuffer, which is not initialized here + outBuffer[0]=0; // mark outBuffer as empty + int answer = 0; + // delayedOperation = 0; + + bool fake = false; // getfreeserver request really wants to allocate a new server? + + // --- this is the central dispatcher for rasmgr requests, recognising and executing them. + + if(isMessage("POST rasmgrslave")) + { + VLOG << now() << " slave rasmgr request from " + << getClientAddr( currentJob.getSocket() ) + << ": '" << body << "'..." << flush; + + hostmanager.postSlaveMGR(body,outBuffer); // prepare outBuffer from body for send to slave + keepConnection = false; // master mgr passes thru, it's singleton commo, so don't keep conn + // FIXME: is this really correct?? -- PB 2003-jun-10 + VLOG << "ok" << endl; + } + else if(isMessage("POST rasservernewstatus")) + { + // extract server status from msg body + char serverName[50]; + int newstatus = 0; + long dummy = 0; + serverName[0] = '\0'; // initialize in case sscanf() fails + + int result = sscanf( body, "%s %d %ld", serverName, &newstatus, &dummy); + if (result == 3) // we simply ignore malformed requests, reason see below + { + const char *statusText = NULL; + switch (newstatus) + { + case SERVER_DOWN: + statusText = SERVER_DOWN_TXT; + break; + case SERVER_AVAILABLE: + statusText = SERVER_AVAILABLE_TXT; + break; + case SERVER_REGULARSIG: + statusText = SERVER_REGULARSIG_TXT; + break; + case SERVER_CRASHED: + statusText = SERVER_CRASHED_TXT; + break; + default: + statusText = "(unknown message flag)"; + break; + } + if (newstatus != SERVER_REGULARSIG) // don't blow up the log file with "still alive" signals + { + VLOG << now() << " status info from server " << serverName + << " @ " << getClientAddr( currentJob.getSocket() ) + << ": '" << statusText << "'...ok" << endl; + } + + keepConnection = false; // singleton msg slave -> master + answer = 1; + } + else // malformed request + { + VLOG << now() << " Error: malformed request (ignoring it) from " + << getClientAddr( currentJob.getSocket() ) + << ": '" << body << "'" << endl; + } + } + else if( (fake = isMessage("POST getfreeserver2")) || isMessage("POST getfreeserver")) + { + VLOG << now() << " client request from " + << getClientAddr( currentJob.getSocket() ) + << ": " << "'get server'..." << flush; + + int rc = getFreeServer(fake); // returns std rasdaman errors -- FIXME: error ignored! + keepConnection = (rc == MSG_OK) ? true : false; // 200 is "ok" + VLOG << "ok" << endl; + + + } + else if(isMessage("POST rascontrol")) + { + VLOG << now() << " rascontrol request from " + << getClientAddr( currentJob.getSocket() ) + << ": '" << body << "'..." << flush; + + if(authorization.acceptEntry(header)) + { + rascontrol.processRequest(body,outBuffer); + keepConnection = true; // rascontrol connection accepted, so keep it + VLOG << "ok" << endl; + } + else + { + answerAccessDenied(); + keepConnection = false; // this is a final answer, don't keep conn open afterwards + VLOG << "denied." << endl; + } + } + + LEAVE( "MasterComm::processRequest: leave. answer=" << answer << ", keepConnection=" << keepConnection ); + return answer; +} // processRequest() + +// fillInBuffer: fill parameter string passed into message header and body (both global) +// separator is a double newline +// FIXME: unstable and weird programming, improve! -- PB 2003-jun-10 +// input: +// s message input string +// returns: +// true filled buffer properly +// false NULL body string +// inBuffer (global buffer) set to s; header string part properly NULL terminated in inBuffer +// body (global ptr) set to beginning of message body in inBuffer +// header (global ptr) set to beginning of inBuffer +bool MasterComm::fillInBuffer(const char *s) +{ + strcpy(inBuffer,s); + header=inBuffer; // set header to begining of msg buffer + body=strstr(inBuffer, RASMGRPROT_DOUBLE_EOL ); // find double EOL, this is where body starts + if(body == NULL) // not found? this means a protocol syntax error + { + TALK( "MasterComm::fillInBuffer: Error in rasmgr protocol encountered (2xEOL missing). msg=" << inBuffer ); + return false; // only if client is stupid + } + + *body=0; // terminate header (!) string + body+= strlen( RASMGRPROT_DOUBLE_EOL ); // let body start after this double newline + + return true; +} + +// save config and auth file; deprecated +void MasterComm::doCommit() +{ + if(config.isTestModus()==false) + { + TALK( "MasterComm::doCommit: deprecated, should not be called any longer." ); +#if 0 // now done by saveCommand() directly + if(commitAuthOnly==false) + { VLOG << "Save configuration file..."; + if(config.saveConfigFile()) VLOG << "OK" << std::endl; + else VLOG << "Failed" << std::endl; + } + + VLOG << "Save authorization file..."; + if(authorization.saveAuthFile()) VLOG << "OK" << std::endl; + else VLOG << "Failed" << std::endl; +#endif + } + else + { std::cout<<"Save requested, but not permitted during test modus!"< [RPC|HTTP|RNP] [rw|ro] +// where the flags are NOT case sensitive +// returns: standard rasdasman error codes +// 200 (ok), 801, 805, 999, ... +int MasterComm::getFreeServer(bool fake) +{ + // creates too large log files, so omit in production: + // BenchmarkTimer *freeServerTimePtr = new BenchmarkTimer("Get free server"); + + char databaseName[100]; + char serverType[10]; + char serverName[100]; // name of rasserver found, if any + char accessType[5]; + char prevID[200]="(none)"; + + ENTER("MasterComm::getFreeServer: enter. fake=" << fake << ", body="<3) + clientID.init(prevID); + + TALK("GetFreeServer: db = " << databaseName << ", requested server type = " << serverType << ", access type = " << accessType << ", clientID="< " << cbs ); + cout << "Error: no server available, error code: " << cbs << endl; + answCode = cbs; + break; + } + if( fake == false) // server to be allocated? + { + // mark server found as unavailable to others + r.setNotAvailable(); + // set transaction mode requested + if(writeTransaction==true) + r.startWriteTransaction(db); // nothing real happens, no error can occur + else + r.startReadTransaction(db); // nothing real happens, no error can occur + TALK("MasterComm::getFreeServer: You have the server."); + // answCode is same as initialised if we come here + } + sprintf(answerString,"%s %ld %s ",r.getHostNetwName(),r.getPort(),authorization.getCapability(r.getName(),databaseName,!writeTransaction) ); + // remember server name + strncpy( serverName, r.getName(), sizeof(serverName) ); +; + TALK( "answerString=" << answerString ); + break; + } + } + } // for + + // any free server found? + if(countSuitableServers == 0) + { + cout << "Error: no suitable free server available." << endl; + answCode = MSG_NOSUITABLESERVER; + break; + } + + // no answer string provided -> no server available + // oops?? why not uniformly check against answCode? -- PB 2003-nov-20 + if(answerString[0]==0) + { + cout << "Error: cannot find any free server; answer code: " << answCode << " -> "; + answCode = MSG_SYSTEMOVERLOADED; + cout << answCode << endl; + break; + } + + } while(0); // see comment at start of "loop" + + answText = convertAnswerCode(answCode); + + if(answCode == MSG_OK) + sprintf(outBuffer,"HTTP/1.1 %d %s\r\nContent-type: text/plain\r\nContent-length: %d\r\n\r\n%s",answCode,answText,strlen(answerString)+1,answerString); + else + { + sprintf(outBuffer,"HTTP/1.1 %d %s\r\nContent-type: text/plain\r\nContent-length: %d\r\n\r\n%d %s",400,"Error",strlen(answText)+1,answCode,answText); + clientQueue.put(clientID, (const char*)databaseName, sType, answCode); + } + + // creates too large log files, so omit in production: + // freeServerTimePtr->result(); // print time elapsed + + LEAVE("MasterComm::getFreeServer: leave. answCode=" << answCode << ", server=" << serverName << ", outBuffer=" << outBuffer ); + return answCode; //strlen(outBuffer)+1; +} // getFreeServer() + +// convertAnswerCode: convert numeric answer code to error text for selected errors + OK +// input: +// code numeric error code, cf. errtxts +// returns: +// answer ptr to static error text +const char* MasterComm::convertAnswerCode(int code) +{ + const char *answer = MSG_ILLEGAL_STR; // return value, initialized to "illegal" + switch(code) + { + case MSG_OK: + answer = MSG_OK_STR; + break; + case MSG_UNKNOWNSERVERTYPE: + answer = MSG_UNKNOWNSERVERTYPE_STR; + break; + case MSG_UNKNOWNACCESSTYPE: + answer = MSG_UNKNOWNACCESSTYPE_STR; + break; + case MSG_DATABASENOTFOUND: + answer = MSG_DATABASENOTFOUND_STR; + break; + case MSG_WRITETRANSACTION: + answer = MSG_WRITETRANSACTION_STR; + break; + case MSG_NOSUITABLESERVER: + answer = MSG_NOSUITABLESERVER_STR; + break; + case MSG_SYSTEMOVERLOADED: + answer = MSG_SYSTEMOVERLOADED_STR; + break; + default: + // cout<<"Default value not allowed ="<activ = false; + ClientEntry newClient(clientID, dbName, serverType, errorCode); + newClient.activ = true; + newClient.updateTime(); + clients.push_back(newClient); + TALK("ClientQueue::put, known client db=" << dbName << ", serverType=" << serverType << ", but different request: id="<