summaryrefslogtreecommitdiffstats
path: root/rnprotocol/rnpcommunication.hh
blob: 56aca81c648962d14c4d8da1498b01ef37c86b1d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
#ifndef RNPCOMMUNICATION_HH
#define RNPCOMMUNICATION_HH
/*
* This file is part of rasdaman community.
*
* Rasdaman community is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Rasdaman community is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with rasdaman community.  If not, see <http://www.gnu.org/licenses/>.
*
* Copyright 2003, 2004, 2005, 2006, 2007, 2008, 2009 Peter Baumann /
rasdaman GmbH.
*
* For more information please see <http://www.rasdaman.org>
* or contact Peter Baumann via <baumann@rasdaman.com>.
*/
/****************************************************************************
 *
 *
 * COMMENTS:
 * 
 * 
 ****************************************************************************/

#include "network/akgnetwork.hh"
#include "rnprotocol/rnpembedded.hh"
#include <vector>

#ifdef AFTERV52
  #include <akglogging.hh>
  #include <akg_exception.hh>
#else
  #include <exception>
#endif

namespace rnp
  {
// NOTE(review): using-directives in a header make every akg and std name
// visible inside namespace rnp for each file that includes this header.
// Declarations in this header depend on them (e.g. unqualified 'vector' and
// 'exception'), so those names must be qualified before the directives can
// be removed.
using namespace akg;
using namespace std;

// Forward declaration: RnpClientJob refers to RnpBaseClientComm by pointer
// before the class itself is defined further down in this header.
class RnpBaseClientComm;

/** This class represents the RNP client job. It takes a CommBuffer, sends its data
    and receives the answer. It is directly owned and used by 'RnpBaseClientComm',
    so you don't have to worry about it.
    Be aware that the transmitter buffer is freed after transmission!

    Every member is declared with an empty exception specification (throw()),
    i.e. none of them is allowed to let an exception escape. */
class RnpClientJob : public NbClientJob
  {
    public:
      /// Default constructor
      RnpClientJob() throw();
      
      /** Initialization: takes the transmitter buffer containing the data to be sent
          and a pointer to the RnpBaseClientComm object which will coordinate the
          transmission.
          Assert: transmitterBuffer != 0, newClientComm != 0  */
      void init(CommBuffer *transmitterBuffer, RnpBaseClientComm *newClientComm) throw();
            
      /// Call-back function for the Communicator.
      void processRequest() throw();
      
      /** Returns a pointer to the buffer containing the answer. The buffer
          holds only the RNP message, without the carrier header.  */
      CommBuffer* getAnswerBuffer() throw();
      
      /// Returns 'true' if the answer was correctly received
      bool        isAnswerOk() throw();
      
      /// Returns 'true' if the format of the received message is not valid RNP and it was discarded
      bool        isInvalidFormat() throw();
      
      /** Clears the answer buffer. Important if a huge amount of data was received.
          The buffer is also cleared by the next transmission. */
      void        clearAnswerBuffer() throw();
    protected:
      /// Hooks of the job state machine (see the explanations in NbJob)
      bool validateMessage() throw();
      void executeOnWriteReady() throw();
      void specificCleanUpOnTimeout() throw();
      void executeOnReadError() throw();
      void executeOnWriteError() throw();
    
      /// Resets the object: clears the connection and marks the job as ready
      void resetState() throw();
    private:
      // Non-owning pointer to the client comm object; presumably the value
      // handed to init() - confirm in the implementation file.
      RnpBaseClientComm *clientCommPtr;
      
      // Receiver used for the incoming answer (see getAnswerBuffer()).
      RnpReceiver rnpReceiver;
      // Presumably the flags reported by isAnswerOk() and isInvalidFormat() -
      // confirm in the implementation file.
      bool        answerOk;
      bool        invalidFormat;       
   };

/**
  RnpBaseClientComm is the base class for the client communication. It offers
  the necessary elements for creating the request, sending it, receiving the
  answer and decoding it. Every specific client comm will inherit from this and
  will implement the various functions using the functions provided by this class.
  
  It has a private NbCommunicator object ('internalCommunicator'), but a shared
  one can be used instead through 'communicatorPtr'. The RnpClientJob is always
  owned by this object ('clientJob' is a direct member).
*/
class RnpBaseClientComm
  {
    public:
      /// Constructor taking the server type and the carrier protocol (default: RnpTransport::crp_Rnp)
      RnpBaseClientComm(RnpQuark serverType, RnpTransport::CarrierProtocol = RnpTransport::crp_Rnp) throw();
      
      /** Constructor taking also the connection info for the server
          Assert: serverHost != 0, serverPort > 0  */
      RnpBaseClientComm(const char* serverHost, int serverPort, RnpQuark serverType, RnpTransport::CarrierProtocol = RnpTransport::crp_Rnp) throw();
      
      /// Destructor
      virtual ~RnpBaseClientComm() throw();
      
      /** Set the connection parameters
          Assert: serverHost != 0, serverPort > 0
          NOTE(review): the port is passed as 'int' here but stored in an
          'unsigned int' member - a negative value would wrap unless the
          implementation checks it; confirm.  */
      void setConnectionParameters(const char* serverHost, int serverPort) throw();
      
      /// Set the carrier protocol
      void setCarrierProtocol(RnpTransport::CarrierProtocol) throw();
      /// Returns the used carrier protocol
      RnpTransport::CarrierProtocol getCarrierProtocol() throw();
      
      /// Callback from RnpClientJob
      void jobIsReady() throw();
      
      /// Set the maximal retry count (retries to connect to the server).
      /// Unlike its neighbours this is declared without throw().
      void setMaxRetry(unsigned int newMaxRetry);
      
      /// Returns the maximal retry count
      unsigned int  getMaxRetry();

    protected:
      // stuff for helping to create the function calls
      RnpQuark serverType;
      RnpTransport::CarrierProtocol carrierProtocol;
      
      /// Start building the request, might throw whatever new throws.
      /// The transmitter buffer size defaults to RNP_DEFAULTBUFFERSIZE.
      void startRequest(RnpQuark command, int transmitterBufferSize = RNP_DEFAULTBUFFERSIZE);
      
      /** Does the dirty work: sends the request and brings the answer.
          Later it will throw various exceptions, but for now it only 
          returns 'true' if everything is OK.
          Assert: serverHost != 0, serverPort > 0 */
      bool sendRequestGetAnswer();
      
      /** Detects an exception as answer and throws it. This version handles only
          Akg and STL exceptions.
          - returns true if there is an exception, but it can't be reassembled
          - returns false if there is a correct answer, no exception
          - doesn't return, but throws, if there is an exception and it can be
            reassembled */
      virtual bool checkForExceptions();
      
#ifdef AFTERV52      
      // NOTE(review): the throw(Type) dynamic exception specifications below
      // were deprecated in C++11 and removed in C++17, so this section only
      // compiles with an older language standard.

      /// Reassembles and throws an AkgSerializableException. Returns if it isn't an Akg exception.
      void reassembleAkgSerializable() throw(AkgSerializableException);
      
      /// Reassembles and throws an STL exception. Returns only if it isn't an STL exception.
      void reassembleStlException() throw(RnpStlException);
#endif      
      /// Clear the answer when you don't need it any more, memory is released
      void clearAnswer() throw();
      
      /** Default communication init, build another init() if you don't like this.
          This sets 1 job, 60sec as timeout, attaches the internal job.
          Be aware that this timeout is not the timeout of the client job, 
          but the one of the communicator.  */
      void initDefaultCommunication() throw();
	     
      // encoding and decoding
      RnpProtocolDecoder decoder;
      RnpTransmitter     encoder;
      CommBuffer         transmitterBuffer; // to go, use internal of encoder
      
      // stuff for non blocking communication 
      RnpClientJob    clientJob;             // the client job
      NbCommunicator  *communicatorPtr;      // the communicator to be used
      NbCommunicator  internalCommunicator;  // an internal communicator, if you don't like it you put another one
     
      // connection parameters
      // NOTE(review): serverHost is a raw 'const char*'; whether the string is
      // copied or only borrowed from the caller is decided in the
      // implementation - confirm the lifetime requirement.
      const char*      serverHost;
      unsigned int     serverPort;
      unsigned int     maxRetry;
      
      /// Helper function for printing the current parameter
      void printCurrentParameter() throw();
     };


//############ Server side ###################################

// Forward declaration: RnpServerJob refers to RnpBaseServerComm by pointer
// before the class itself is defined further down in this header.
class RnpBaseServerComm;

/** This class represents the RNP server job. It receives the request, sends it to 'RnpBaseServerComm'
    for processing and gets from there the answer which it transmits to the client.

    Every member function is declared throw(), i.e. none of them is allowed
    to let an exception escape.
*/
class RnpServerJob : public NbServerJob
  {
    public:
      /// Default constructor
      RnpServerJob() throw();
      
      /** Initialization: it connects to the given 'RnpBaseServerComm'
          Assert: the passed pointer != 0 */
      void init(RnpBaseServerComm*) throw();
      
      /// Calls the 'RnpBaseServerComm->processRequest()' and then initiates the transmission
      void processRequest() throw();
      
    protected:
      /// Hooks of the job state machine (see the explanations in NbJob)
      bool validateMessage() throw();
      void executeOnAccept() throw();
      void executeOnWriteReady() throw();
      void specificCleanUpOnTimeout() throw();
      void executeOnReadError() throw();
      void executeOnWriteError() throw();
 
      // NOTE(review): undocumented in the original; by its name it resets the
      // job for the next request - confirm in the implementation file.
      void resetJob() throw();
      
      // Non-owning pointer to the server comm object; presumably the value
      // handed to init() - confirm in the implementation file.
      RnpBaseServerComm *serverCommPtr;
      
      // Receiver used for the incoming request.
      RnpReceiver rnpReceiver;
      
      // Buffer for the answer to be transmitted (identifier is spelled
      // 'transmiterBuffer'; kept as-is because derived classes may use it).
      CommBuffer transmiterBuffer; 
   };

/**
  RnpBaseServerComm is the base class for the server communication. It offers
  the necessary elements for decoding the request, and for creating and transmitting 
  the answer. Every specific server comm will inherit from this and will 
  implement the various functions, most importantly 'decodeFragment()' (pure
  virtual) and, if needed, 'processRequest()', using the elements provided by
  this class.
  
  It has a pool of 'RnpServerJob's which deal with the communication. Whichever has 
  a valid request calls 'processRequest()'. The communicator object is external.
*/

class RnpBaseServerComm
  {
    public:
      /// Default constructor -  1 server job
      RnpBaseServerComm() throw();
           
      /// Destructor
      virtual ~RnpBaseServerComm() throw();
      
      /** Sets the number of server jobs, only if there is no connection to a communicator.
          Otherwise it changes nothing and returns 'false'. */
      bool setServerJobs(int nrOfServerJobs) throw();
      
      /// Returns the number of server jobs
      int  countServerJobs() throw();
      
      /// Connect to the communicator. It also creates the jobs. Throws whatever new throws. Assert: no other connection!
      void connectToCommunicator(NbCommunicator&);
      
      /** Disconnects the jobs from the communicator and destroys them. 
          Returns 'false' if there wasn't any connection to a communicator. */
      bool disconnectFromCommunicator() throw();
      
      /// Set the transmitter buffer size
      void setTransmitterBufferSize(int) throw();
      
      /// Returns the transmitter buffer size
      int  getTransmitterBufferSize() throw();
      
      /** The heart of the class. It takes the request, decodes it, sends every fragment
          to 'decodeFragment()', which has to dispatch the commands to the specific
          functions. These functions have to use 'decoder' and 'encoder' to do their job and 
          might throw whatever is appropriate. 'processRequest()' catches 'AkgException', 
          'exception' and (...) and converts them for transmission. 
          If you don't like this version, make another one (the function is virtual). */
      virtual void processRequest(CommBuffer *receiverBuffer, CommBuffer *transmiterBuffer, RnpTransport::CarrierProtocol, RnpServerJob *callingJob) throw();
          
      /** Instructs the communicator that it should exit. Useful to implement
          'down server' commands.  */
      void communicatorShouldExit() throw();
      
    protected:
      /** Called by 'processRequest' to dispatch to the specific functions.
          Might throw whatever is appropriate. Pure virtual: every concrete
          server comm has to implement it. */
      virtual void decodeFragment() = 0;
      
      // The getNextAs...() helpers below carry no exception specification,
      // so they may throw.

      /// Returns next parameter as string (can be NULL), verifying the parameter type.
      const char* getNextAsString(RnpQuark parameterType) const;
	    
      /// Returns next parameter as integer, verifying the parameter type.
      int	  getNextAsInteger(RnpQuark parameterType) const;
      
      /// Returns next parameter as float, verifying the parameter type.
      float	  getNextAsFloat(RnpQuark parameterType) const;
	    
      /// Returns next parameter as double, verifying the parameter type.
      double	  getNextAsDouble(RnpQuark parameterType) const;
	    
      /// Returns next parameter as const void* (can be NULL), verifying the parameter type.
      const void* getNextAsOpaque(RnpQuark parameterType) const;
	    
      /// Returns the length of the data of the current parameter
      int	  getCurrentParameterLength() const throw();
#ifdef AFTERV52      
      /// Helper function to serialize an 'AkgSerializableException'
      void answerAkgSerializable(AkgSerializableException&) throw();
#endif      
      /// Helper function to serialize an 'exception' (based on its 'what()' member)
      void answerSTLException(exception&) throw();
      
      /// Helper function to serialize an unknown exception
      void answerUnknownError() throw();
      
      /// Helper function to discard a fragment
      void discardFragment() throw();
      
      /// Start building an OK-answer
      void startOkAnswer() throw();
      
      /// Just for completeness, it's only an 'encoder.endFragment()'
      void endOkAnswer() throw();
      
      // Decoder for the incoming request and encoder for the outgoing answer;
      // the request-handling functions of derived classes work through these.
      RnpProtocolDecoder decoder;
      RnpTransmitter     encoder;
      
    private:
      /** Creates a server job. Default is a RnpServerJob, but you might want
          some other kind of job. Private but virtual: derived classes can
          override it even though they cannot call it. */
      virtual RnpServerJob* createJob();

      // The pool of server jobs, held as raw pointers.
      // NOTE(review): presumably allocated in connectToCommunicator() and
      // released in disconnectFromCommunicator() - confirm in the
      // implementation file.
      vector<RnpServerJob*> serverJob;
       
      // Number of server jobs (see setServerJobs() / countServerJobs()).
      int                nrServerJobs;
      
      // The external communicator; presumably null while not connected - confirm.
      NbCommunicator  *communicator;
      
      // See setTransmitterBufferSize() / getTransmitterBufferSize().
      int  transmitterBufferSize;
   };   

} // namespace
#endif