summaryrefslogtreecommitdiffstats
path: root/network/akgnet_nbcomm.hh
blob: fad3443a9caaedc022547a888f1deb13776313e7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
/*
* This file is part of rasdaman community.
*
* Rasdaman community is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Rasdaman community is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with rasdaman community.  If not, see <http://www.gnu.org/licenses/>.
*
* Copyright 2003, 2004, 2005, 2006, 2007, 2008, 2009 Peter Baumann /
rasdaman GmbH.
*
* For more information please see <http://www.rasdaman.org>
* or contact Peter Baumann via <baumann@rasdaman.com>.
/
/**
 * INCLUDE: akgnet_nbcomm.hh
 *
 * MODULE:  akg network
 * CLASS:   NbJob, NbServerJob, NbClientJob, NbCommunicator
 *
 * COMMENTS:
 * Namespace akg
 * 
*/

#ifndef AKGNET_NBCOMM_HH
#define AKGNET_NBCOMM_HH

#include "akgnet_server.hh"

namespace akg
  {
  
/** Base class for non-blocking communication jobs
*/  

/**
  * \ingroup Networks
  */ 
  
class NbJob
  {
    public:
      /** Static function for setting the current time. Used
          for marking the last action time, so timeout can be monitorized
      */
      static void   setCurrentTime() throw();
      
      /** Static function for setting the timeout interval
          We use the same timeout for all jobs because the
	  server doesn't do any distinction between jobs
      */
      static void   setTimeoutInterval(time_t x) throw();
      
      /// Returns the timeout interval set for the jobs
      static time_t getTimeoutInterval() throw();
    
    public:
      /// Status regarding accepting a new job
      enum acceptStatus
        { acs_nopending = 0,
	  acs_Iambusy   = 1,
	  acs_accepted  = 2
	 };
      /// Status during the lifetime of a job
      enum workingStatus
        { wks_notdefined = 0,
	  wks_accepting  = 1, // job is ready to accept a connection
	  wks_reading    = 2, // job is reading data
	  wks_writing    = 3, // job is writing data
	  wks_processing = 4  // job is processing the request
	 };
      
      virtual ~NbJob() throw();
      /// Returns the working status	 
      workingStatus getStatus() throw();
      
      /** Returns true if there is an operation in progress
          this means reading, writing or processing
      */
      bool isOperationPending() throw();
      
      /// Returns true if the job is ready to accept a connection
      bool isAccepting() throw();
      
      /// Returns true if the job is reading data
      bool isReading() throw();
      
      /// Returns true if the job is writing data
      bool isWriting() throw();
      
      /// Returns true if the job is processing
      bool isProcessing() throw();
      
      /** Pure function to do initialization when attached to a Selector
          Don't throw!
      */
      virtual void initOnAttach(Selector *pselector) throw() =0;
      
      /** Pure function to do initialization when accepting a connection
          Returns:
	     - acs_nopending when there is connection pending
	     - acs_Iambusy   when the job can't accept this connection
	     - acs_accepted  when the connection was accepted
	  Assert:
	    the 'currentBufferPtr' is initialized. This would be a software error
          Don't throw!
      */
      virtual acceptStatus acceptConnection(ListenSocket& listenSocket) throw() =0;
      
      /** Reads as much data as it can. After every read it calls the 
          'validateMessage()' function and returns whatever this function returns.
	  If there is a read error, other than EINTR or EAGAIN, the function 
	  'executeOnReadError()' is called
	  Returns 'true' if the message is completelly red
	  Returns 'false' if there should be some more data 
      */
      bool readPartialMessage() throw();
      
      /** Writes as much data as it can. After writing all data the function 
          'executeOnWriteReady()' is called.
	  If there is a write error, other than EINTR or EAGAIN, the function 
	  'executeOnWriteError()' is called
	  Returns 'true' if the message is completelly written
	  Returns 'false' if there should be some more data to write 
      */
      bool writePartialMessage() throw();
      
      /// Clears the connection - closes the socket and removes it from the Selector
      void clearConnection() throw();
      
      /// Returns the OS file descriptor for the socket
      int  getSocket() throw();
      
      /// Returns the errno of the last socket operation
      int  getErrno() throw();         
      //######################################
      /** Virtual function to clean up if timeout occured
          This version returns 'false' if no timeout or no connection
	  If timeout it clears the connection and calls
	  'specificCleanUpOnTimeout()'
	  Don't throw!
      */
      virtual bool cleanUpIfTimeout() throw();
      
      /** Pure function to process the request
          It has to set the appropriate status, so the server
	  knows how to continue with this job
	  Don't throw!
      */
      virtual void processRequest() throw() =0;
      //######################################
    protected:
      /// called after every read, returns 'true' if the message is all here
      virtual bool validateMessage() throw() =0;
      
      /// called when client is accepted, default does nothing
      virtual void executeOnAccept() throw();
      
      /// called when message is written
      virtual void executeOnWriteReady() throw() =0;
      
      /// called when timeout, it has to set the status apropriate and do other cleanup
      virtual void specificCleanUpOnTimeout() throw() =0;
      
      /// called when a read error occurs, usual a message and clean up
      virtual void executeOnReadError() throw() =0;
      
      /// called when a write error occurs, usual a message and clean up
      virtual void executeOnWriteError() throw() =0;
      //######################################
    protected:
      /// Protected constructor, taking a FileDescriptor, usually a Socket
      NbJob(FileDescriptor&) throw() ;
      
      /// Helper function for setting the job in read modus
      bool setReading() throw();
      
      /// Helper function for setting the job in write modus
      bool setWriting() throw();
      workingStatus status;
      
      /** Reference to a FileDescriptor, usually a Socket. It has to be provided by the
          derived class
      */
      FileDescriptor  &fdRef;
      
      /** Pointer to a Selector, which has to be provided by the Server object
          to which this job is attached
      */
      Selector        *selectorPtr;
      
      /** Pointer to a CommBuffer, which has to be provided by the derived class
      */
      CommBuffer      *currentBufferPtr;
      
      // for timeout
      time_t lastActionTime;
      
      /// Helper function which marks the current moment, so timeout counter is reset
      void   action() throw();
      
      static time_t timeOutInterv;
      static time_t currentTime;
   };   
   
/* Base class for generic non-blocking server jobs
 */

/**
  * \ingroup Networks
  */ 
  
class NbServerJob : public NbJob
  {
    public:
      /// Default constructor
      NbServerJob() throw();
      
      /** The version for servers, it just initializes the 'Selector*'
          It doesn't have to be overloaded, it's OK for servers
      */
      void initOnAttach(Selector *pselector) throw();
      
      /** The version for servers
          It doesn't have to be overloaded, it's OK for most servers
      */
      acceptStatus acceptConnection(ListenSocket& listenSocket) throw();
      
      /// Returns the SocketAddress of the client
      SocketAddress getClientSocketAddress() throw();
      
      /// Returns the HostAddress of the client
      HostAddress   getClientHostAddress() throw();
    protected:
    
      /// helper function, call it in "processRequest" to switch to writing
      void readyToWriteAnswer() throw();
      
      ServerSocket serverSocket;  
   };
   
/* Base class for generic non-blocking client jobs
 */

/**
  * \ingroup Networks
  */ 
  
class NbClientJob : public NbJob
  {
    public:
      /// Default constructor
      NbClientJob() throw();
      
      /// Returns 'true' if connection succeded
      bool connectToServer(const char* serverHost, int serverPort) throw(); 
      
      
      /** The version for clients, it initializes the 'Selector*'
          and prepares for writing. It has to be called AFTER the 
	  connection to the server succeded!
          It doesn't have to be overloaded, it's OK for most clients
      */
      void initOnAttach(Selector *pselector) throw();
      
      /** The version for clients. It just returns 'acs_Iambusy' since
          clients don't accept connections
          It doesn't have to be overloaded, it's OK for most clients
      */
      acceptStatus acceptConnection(ListenSocket& listenSocket) throw();
    protected:
      
      /// helper function, call it in 'executeOnWriteReady()' to switch to reading
      void readyToReadAnswer() throw();
      
      ClientSocket clientSocket;  
   };
   
/**
   The heart of the non-blocking communication. It's derived from GenericServer but
   it is called 'Communicator' since it not only for servers but also for clients
   You can use this class for most communication purposes, special ones have to
   reimplement 'executeBeforeSelect()' or 'executeOnTimeout()'
   A better implementation should use 'vector'
   Important: new is supposed to throw!
 */   

/**
  * \ingroup Networks
  */ 
  
class NbCommunicator : public GenericServer
  {
    public:
      /// Default constructor
      NbCommunicator() throw();
      
      /// Constructor setting also the maximal number of simultan jobs
      NbCommunicator(int newMaxJobs);
      
      /// Destructor
      ~NbCommunicator() throw();
      
      /// Sets the maximal number of simultan jobs
      bool initJobs(int newMaxJobs);
      
      /// returns the maximal number of simultan jobs
      int  getMaxJobs() throw();
      
      /** Attaches a new job. Returns 'true' if succeded, 'false' if the
          job is already attached or if the maximal number of jobs
	  is already attached
       */
      bool attachJob(NbJob&) throw();
      
      /** Deattach job. Returns 'true' if succeded, 'false' if the
          job is not attached
      */
      bool deattachJob(NbJob&) throw();
      
      /// This runs the main loop for servers, this means it initializes the listen socket first
      bool runServer() throw();
      
      /// This runs the main loop for clients, this means without initializing the listen socket
      bool runClient() throw();
    protected:
       /** Called before select, if it returns 'false' the loop exits. 
           This version just returns 'true'
       */
       virtual bool executeBeforeSelect() throw();
       
       /** Called after select, if it returns 'false' the loop exits. 
           This version just returns 'true'
       */
       virtual bool executeAfterSelect() throw();
       /** Called if select times out, if it returns 'false' the loop exits. 
           This version just returns 'true'
         */	   
       virtual bool executeOnTimeout() throw();
    private:
       typedef NbJob *JobPtr;
       
       JobPtr *jobPtr;
       int     maxJobs;
       
       /** The main loop of the communication: it waits for clients, dispatches them to the
           accepting jobs, then offers the jobs the possibility to read, process and write
	   It returns 'false' only if 'executeBeforeSelect()' or 'executeOnTimeout()'
	   return 'false'
	   Otherwise it returns 'true'
        */
       bool mainLoop() throw();
       
       /// Helper function for dispatching read requests
       void dispatchReadRequest() throw();
       
       /// Helper function for dispatching write requests
       void dispatchWriteRequest() throw();
       
       /// Helper function for dispatching connect requests
       void connectNewClients() throw();
       
       /// Helper function which looks for timeouted jobs
       void lookForTimeout() throw();
       
       /// Helper function which calls 'processRequest()' of all jobs that are processing
       void processJobs() throw();
             
       /** Helper function which returns 'true' if somebody called 'shouldExit()'
           and there is no job which processes anything. But it closes the listen socket
	   so no more jobs are accepted and returns 'true' when all jobs finish
       */
       bool mayExit() throw();

   };

} //namespace	                 
#endif