path: root/rnprotocol/rnpcommunication.hh
diff options
Diffstat (limited to 'rnprotocol/rnpcommunication.hh')
1 files changed, 345 insertions, 0 deletions
diff --git a/rnprotocol/rnpcommunication.hh b/rnprotocol/rnpcommunication.hh
new file mode 100644
index 0000000..56aca81
--- /dev/null
+++ b/rnprotocol/rnpcommunication.hh
@@ -0,0 +1,345 @@
+* This file is part of rasdaman community.
+* Rasdaman community is free software: you can redistribute it and/or modify
+* it under the terms of the GNU General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+* Rasdaman community is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* GNU General Public License for more details.
+* You should have received a copy of the GNU General Public License
+* along with rasdaman community. If not, see <>.
+* Copyright 2003, 2004, 2005, 2006, 2007, 2008, 2009 Peter Baumann /
+rasdaman GmbH.
+* For more information please see <>
+* or contact Peter Baumann via <>.
+ *
+ *
+ *
+ *
+ ****************************************************************************/
+#include "network/akgnetwork.hh"
+#include "rnprotocol/rnpembedded.hh"
+#include <vector>
+#ifdef AFTERV52
+ #include <akglogging.hh>
+ #include <akg_exception.hh>
+ #include <exception>
+namespace rnp
+ {
+using namespace akg;
+using namespace std;
+class RnpBaseClientComm;
+/** This class represents the RNP client job. It taker a CommBuffer, sends its data
+ and receives the answer. Is directy owned and used by 'RnpBaseClientComm',
+ so you don't have to worry about it
+ Be aware that the transmitter buffer is freed after transmission! */
+class RnpClientJob : public NbClientJob
+ {
+ public:
+ /// Default constructor
+ RnpClientJob() throw();
+ /** Initialization: takes the tarnsmitter buffer containing data to be send
+ and a pointer to a Communicator object, which will coordinate the transmission
+ Assert: transmitterBuffer!=0, newClientComm !=0 */
+ void init(CommBuffer *transmitterBuffer, RnpBaseClientComm *newClientComm) throw();
+ /// Call-back function for the Communicator.
+ void processRequest() throw();
+ /** Returns a pointer to the buffer containing the answer. The buffer
+ holds only the RNP message, without carrier header */
+ CommBuffer* getAnswerBuffer() throw();
+ /// Returns 'true' if the answer was correctly received
+ bool isAnswerOk() throw();
+ /// Returns true if the format of the received message is not valid RNP and was discarded
+ bool isInvalidFormat() throw();
+ /** Clears the answer buffer. Important if huge amount of data where received.
+ The buffer is cleared by the next transmission, also. */
+ void clearAnswerBuffer() throw();
+ protected:
+ /// (See the explanations from NbJob)
+ bool validateMessage() throw();
+ void executeOnWriteReady() throw();
+ void specificCleanUpOnTimeout() throw();
+ void executeOnReadError() throw();
+ void executeOnWriteError() throw();
+ /// Resets the object: clears the connection and marks the job as ready
+ void resetState() throw();
+ private:
+ RnpBaseClientComm *clientCommPtr;
+ RnpReceiver rnpReceiver;
+ bool answerOk;
+ bool invalidFormat;
+ };
+ RnpBaseClientComm is the base class for the client communication. It offers
+ the necessary elements for creating the request, send it, receive the answer
+ and decode it. Every specific client comm will inherit from this and will
+ implement the various functions using the functions provided by this class.
+ It has a private NbCommunicator object, but if you need a shared one
+ be my guest. The RnpClientJob is its own also and this stays like that!
+class RnpBaseClientComm
+ {
+ public:
+ /// Constructor taking the server type and the carrier protocol
+ RnpBaseClientComm(RnpQuark serverType, RnpTransport::CarrierProtocol = RnpTransport::crp_Rnp) throw();
+ /** Constructor taking also the connection info for the server
+ Assert: serverHost != 0, serverPort > 0 */
+ RnpBaseClientComm(const char* serverHost, int serverPort, RnpQuark serverType, RnpTransport::CarrierProtocol = RnpTransport::crp_Rnp) throw();
+ /// Destructor
+ virtual ~RnpBaseClientComm() throw();
+ /** Set the connection parameter
+ Assert: serverHost != 0, serverPort > 0 */
+ void setConnectionParameters(const char* serverHost, int serverPort) throw();
+ /// Set the carrier protocol
+ void setCarrierProtocol(RnpTransport::CarrierProtocol) throw();
+ /// Returns the used carrier protocol
+ RnpTransport::CarrierProtocol getCarrierProtocol() throw();
+ // callback from RnpClientJob
+ void jobIsReady() throw();
+ // Set the maximal retry count (retries to connect to the server)
+ void setMaxRetry(unsigned int newMaxRetry);
+ /// Returns the maximal retry count
+ unsigned int getMaxRetry();
+ protected:
+ // stuff for helping creating the function calls
+ RnpQuark serverType;
+ RnpTransport::CarrierProtocol carrierProtocol;
+ /// Start building the request, might throw whatever new throws
+ void startRequest(RnpQuark command, int transmitterBufferSize = RNP_DEFAULTBUFFERSIZE);
+ /** Does the dirty work: sends the request and brings the answer
+ Later it will throw various exceptions, but for now it only
+ returns 'true' if everything is OK
+ Assert: serverHost != 0, serverPort > 0 */
+ bool sendRequestGetAnswer();
+ /** detects an exception as answer and throws it. this version only Akg and STL
+ returns true if there is an exception, but can't reassemble it
+ returns false if there is a correct answer, no exception
+ doesn't return, but throws, if there is an exception and it can reassemble it*/
+ virtual bool checkForExceptions();
+#ifdef AFTERV52
+ // reassembles and throws an AkgSerializableException. Returns if it isn't an Akg...
+ void reassembleAkgSerializable() throw(AkgSerializableException);
+ // reassembles and throws a STL-exception. Returns only if it isn't a stl-exception
+ void reassembleStlException() throw(RnpStlException);
+ /// Clear the answer when you don't need it any more, memory is released
+ void clearAnswer() throw();
+ /** Default communication init, build another init() if you don't like this
+ This sets 1 job, 60sec as timeout, attaches the internal job.
+ Be aware that this timeout is not the timeout of the client job,
+ but the one of the communicator */
+ void initDefaultCommunication() throw();
+ // encoding and decoding
+ RnpProtocolDecoder decoder;
+ RnpTransmitter encoder;
+ CommBuffer transmitterBuffer; // to go, use internal of encoder
+ // stuff for non blocking communication
+ RnpClientJob clientJob; // the client job
+ NbCommunicator *communicatorPtr; // the communicator to be used
+ NbCommunicator internalCommunicator; // an internal communicator, if you dont like that you put another one
+ // connection parameters
+ const char* serverHost;
+ unsigned int serverPort;
+ unsigned int maxRetry;
+ /// Helper function for ptinting the current parameter
+ void printCurrentParameter() throw();
+ };
+//############ Server side ###################################
+class RnpBaseServerComm;
+/** This class represents the RNP server job. It receives the request, sends it to 'RnpBaseServerComm'
+ for processing and gets from there the answer which it transmittes to the client
+class RnpServerJob : public NbServerJob
+ {
+ public:
+ /// Default constructor
+ RnpServerJob() throw();
+ /** Initialization: it connects to the given 'RnpBaseServerComm'
+ Assert: theServerComm != 0 */
+ void init(RnpBaseServerComm*) throw();
+ /// Calls the 'RnpBaseServerComm->processRequest()' and than initiates the transmission
+ void processRequest() throw();
+ protected:
+ /// (See explanations from NbJob)
+ bool validateMessage() throw();
+ void executeOnAccept() throw();
+ void executeOnWriteReady() throw();
+ void specificCleanUpOnTimeout() throw();
+ void executeOnReadError() throw();
+ void executeOnWriteError() throw();
+ void resetJob() throw();
+ RnpBaseServerComm *serverCommPtr;
+ RnpReceiver rnpReceiver;
+ CommBuffer transmiterBuffer;
+ };
+ RnpBaseServerComm is the base class for the server communication. It offers
+ the necessary elements for decoding the request, and for creating and transmitting
+ the answer. Every specific server comm will inherit from this and will
+ implement the various functions, most important the 'processRequest()',
+ using the elements provided by this class.
+ It has a pool of 'RnpServerJob's which deal with the communication. Whichever has
+ a valid request calls 'processRequest()'. The communicator object is external
+class RnpBaseServerComm
+ {
+ public:
+ /// Default constructor - 1 server job
+ RnpBaseServerComm() throw();
+ /// Destructor
+ virtual ~RnpBaseServerComm() throw();
+ /** Sets the number of server jobs, only if there is no connection to a communicator
+ Otherwise it changes nothing and returns 'false' */
+ bool setServerJobs(int nrOfServerJobs) throw();
+ /// Returns the number of server jobs
+ int countServerJobs() throw();
+ /// Connect to the communicator. It also creates the jobs. Throws whatever new throws. Assert: no other connection!
+ void connectToCommunicator(NbCommunicator&);
+ /** Disconnect the jobs from the communicator and destroys them.
+ Returns 'false' if there wasn't any connection to a communicator */
+ bool disconnectFromCommunicator() throw();
+ /// Set the transmitter buffer size
+ void setTransmitterBufferSize(int) throw();
+ /// Returns the transmitter buffer size
+ int getTransmitterBufferSize() throw();
+ /** The heart of the class. It takes the request, decodes it, sends every fragment
+ to the 'decodeFragment()', which has to dispatch the commands to the specific
+ functions. These functions have to use 'decoder' and 'encoder' to do their job and
+ might throw whatever is appropriate. 'processRequest()' catches 'AkgException',
+ 'exception' and (...) and converts them for transmission.
+ If you don't like this version, make another one */
+ virtual void processRequest(CommBuffer *receiverBuffer, CommBuffer *transmiterBuffer, RnpTransport::CarrierProtocol, RnpServerJob *callingJob) throw();
+ /** Instructs the communicator that it should exit. Usefull to implement
+ 'down server' commands */
+ void communicatorShouldExit() throw();
+ protected:
+ /** Called by 'processRequest' to dispatch to the specific functions
+ Might throw whatever appropriate */
+ virtual void decodeFragment() = 0;
+ /// Returns next parameter as string(can be NULL), verifying the parameter type.
+ const char* getNextAsString(RnpQuark parameterType) const;
+ /// Returns next parameter as integer, verifying the parameter type.
+ int getNextAsInteger(RnpQuark parameterType) const;
+ /// Returns next parameter as float, verifying the parameter type.
+ float getNextAsFloat(RnpQuark parameterType) const;
+ /// Returns next parameter as double, verifying the parameter type.
+ double getNextAsDouble(RnpQuark parameterType) const;
+ /// Returns next parameter as const void* (can be NULL), verifying the parameter type.
+ const void* getNextAsOpaque(RnpQuark parameterType) const;
+ /// Returns the length of the data of the current parameter
+ int getCurrentParameterLength() const throw();
+#ifdef AFTERV52
+ /// Helper function to serialize an 'AkgException'
+ void answerAkgSerializable(AkgSerializableException&) throw();
+ /// Helper function to serialize an 'exception' (based on it's 'what()'-member
+ void answerSTLException(exception&) throw();
+ /// Helper function to serialize an unknown exception
+ void answerUnknownError() throw();
+ /// Helper function to discard a fragment
+ void discardFragment() throw();
+ /// Start building an OK-answer
+ void startOkAnswer() throw();
+ /// Just for completeness, it's only an 'encoder.endFragment()'
+ void endOkAnswer() throw();
+ RnpProtocolDecoder decoder;
+ RnpTransmitter encoder;
+ private:
+ /** Creates a server jobs. Default is a RnpServerJob, but you might want
+ some other kind of job */
+ virtual RnpServerJob* createJob();
+ vector<RnpServerJob*> serverJob;
+ int nrServerJobs;
+ NbCommunicator *communicator;
+ int transmitterBufferSize;
+ };
+} // namespace