1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
|
/*
* This file is part of rasdaman community.
*
* Rasdaman community is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Rasdaman community is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with rasdaman community. If not, see <http://www.gnu.org/licenses/>.
*
* Copyright 2003, 2004, 2005, 2006, 2007, 2008, 2009 Peter Baumann /
rasdaman GmbH.
*
* For more information please see <http://www.rasdaman.org>
* or contact Peter Baumann via <baumann@rasdaman.com>.
*/
/**
* INCLUDE: akgnet_nbcomm.hh
*
* MODULE: akg network
* CLASS: NbJob, NbServerJob, NbClientJob, NbCommunicator
*
* COMMENTS:
* Namespace akg
*
*/
#ifndef AKGNET_NBCOMM_HH
#define AKGNET_NBCOMM_HH
#include "akgnet_server.hh"
namespace akg
{
/**
 * \ingroup Networks
 *
 * Abstract base class for non-blocking communication jobs.
 *
 * A job works on a FileDescriptor (usually a socket) and a CommBuffer, both
 * provided by the derived class, and on a Selector provided by the server
 * object the job is attached to. All jobs share one timeout interval because
 * the server makes no distinction between jobs.
 */
class NbJob
{
public:
    /// Result of offering a connection to a job (see 'acceptConnection()')
    enum acceptStatus
    {
        acs_nopending = 0,
        acs_Iambusy   = 1,
        acs_accepted  = 2
    };

    /// State of a job during its lifetime
    enum workingStatus
    {
        wks_notdefined = 0,
        wks_accepting  = 1,  // job is ready to accept a connection
        wks_reading    = 2,  // job is reading data
        wks_writing    = 3,  // job is writing data
        wks_processing = 4   // job is processing the request
    };

    /** Sets the current time shared by all jobs. That value is used to mark
        the moment of the last action, so that timeouts can be monitored
    */
    static void setCurrentTime() throw();

    /** Sets the timeout interval. The same interval applies to every job,
        since the server does not distinguish between jobs
    */
    static void setTimeoutInterval(time_t x) throw();

    /// Returns the timeout interval configured for the jobs
    static time_t getTimeoutInterval() throw();

    virtual ~NbJob() throw();

    /// Returns the current working status
    workingStatus getStatus() throw();

    /** Returns true while an operation is in progress, i.e. the job is
        reading, writing or processing
    */
    bool isOperationPending() throw();

    /// Returns true if the job is ready to accept a connection
    bool isAccepting() throw();

    /// Returns true if the job is reading data
    bool isReading() throw();

    /// Returns true if the job is writing data
    bool isWriting() throw();

    /// Returns true if the job is processing
    bool isProcessing() throw();

    /** Pure virtual hook for the initialization to be done when the job
        gets attached to a Selector.
        Implementations must not throw!
    */
    virtual void initOnAttach(Selector* pselector) throw() = 0;

    /** Pure virtual hook for the initialization to be done when a
        connection is accepted.
        Returns:
          - acs_nopending  no connection is pending
                           (NOTE(review): inferred from the enumerator name;
                           confirm against the implementations)
          - acs_Iambusy    the job cannot accept this connection
          - acs_accepted   the connection was accepted
        Precondition:
          'currentBufferPtr' is initialized; anything else is a software error.
        Implementations must not throw!
    */
    virtual acceptStatus acceptConnection(ListenSocket& listenSocket) throw() = 0;

    /** Reads as much data as is available. After every read
        'validateMessage()' is called and its result is returned.
        On a read error other than EINTR or EAGAIN, 'executeOnReadError()'
        is called.
        Returns 'true' if the message has been read completely,
        'false' if more data is expected
    */
    bool readPartialMessage() throw();

    /** Writes as much data as possible. Once all data has been written,
        'executeOnWriteReady()' is called.
        On a write error other than EINTR or EAGAIN, 'executeOnWriteError()'
        is called.
        Returns 'true' if the message has been written completely,
        'false' if there is still data left to write
    */
    bool writePartialMessage() throw();

    /// Clears the connection: closes the socket and removes it from the Selector
    void clearConnection() throw();

    /// Returns the OS file descriptor of the socket
    int getSocket() throw();

    /// Returns the errno of the last socket operation
    int getErrno() throw();

    //######################################

    /** Cleans up after a timeout. This version returns 'false' if there is
        no timeout or no connection; on timeout it clears the connection and
        calls 'specificCleanUpOnTimeout()'.
        Overriding versions must not throw!
    */
    virtual bool cleanUpIfTimeout() throw();

    /** Pure virtual hook that processes the request. It has to set the
        appropriate status, so the server knows how to continue with this job.
        Implementations must not throw!
    */
    virtual void processRequest() throw() = 0;

    //######################################

protected:
    /// Called after every read; returns 'true' once the whole message has arrived
    virtual bool validateMessage() throw() = 0;

    /// Called when a client has been accepted; the default does nothing
    virtual void executeOnAccept() throw();

    /// Called when the message has been written
    virtual void executeOnWriteReady() throw() = 0;

    /// Called on timeout; has to set the appropriate status and do any other cleanup
    virtual void specificCleanUpOnTimeout() throw() = 0;

    /// Called when a read error occurs; usually reports a message and cleans up
    virtual void executeOnReadError() throw() = 0;

    /// Called when a write error occurs; usually reports a message and cleans up
    virtual void executeOnWriteError() throw() = 0;

    //######################################

    /// Protected constructor taking the FileDescriptor (usually a Socket) to work on
    NbJob(FileDescriptor&) throw();

    /// Helper that switches the job to read mode
    bool setReading() throw();

    /// Helper that switches the job to write mode
    bool setWriting() throw();

    /// Helper that marks the current moment, which resets the timeout counter
    void action() throw();

    /// Current working status of the job
    workingStatus status;

    /** Reference to a FileDescriptor, usually a Socket. The derived class
        has to provide it
    */
    FileDescriptor& fdRef;

    /** Pointer to a Selector. It has to be provided by the Server object
        this job is attached to
    */
    Selector* selectorPtr;

    /// Pointer to a CommBuffer; the derived class has to provide it
    CommBuffer* currentBufferPtr;

    // timeout bookkeeping
    time_t lastActionTime;

    static time_t timeOutInterv;
    static time_t currentTime;
};
/**
 * \ingroup Networks
 *
 * Base class for generic non-blocking server jobs.
 */
class NbServerJob : public NbJob
{
public:
    /// Default constructor
    NbServerJob() throw();

    /** Server version of the attach hook: it only initializes the
        'Selector*'. This is sufficient for servers, so it does not have
        to be overridden
    */
    void initOnAttach(Selector* pselector) throw();

    /** Server version of the accept hook. It is suitable for most servers,
        so it does not have to be overridden
    */
    acceptStatus acceptConnection(ListenSocket& listenSocket) throw();

    /// Returns the SocketAddress of the client
    SocketAddress getClientSocketAddress() throw();

    /// Returns the HostAddress of the client
    HostAddress getClientHostAddress() throw();

protected:
    /// Helper to be called from "processRequest" in order to switch to writing
    void readyToWriteAnswer() throw();

    ServerSocket serverSocket;
};
/**
 * \ingroup Networks
 *
 * Base class for generic non-blocking client jobs.
 */
class NbClientJob : public NbJob
{
public:
    /// Default constructor
    NbClientJob() throw();

    /// Connects to the given server; returns 'true' if the connection succeeded
    bool connectToServer(const char* serverHost, int serverPort) throw();

    /** Client version of the attach hook: it initializes the 'Selector*'
        and prepares the job for writing. It has to be called AFTER the
        connection to the server succeeded!
        It is suitable for most clients, so it does not have to be overridden
    */
    void initOnAttach(Selector* pselector) throw();

    /** Client version of the accept hook. It simply returns 'acs_Iambusy',
        because clients do not accept connections.
        It is suitable for most clients, so it does not have to be overridden
    */
    acceptStatus acceptConnection(ListenSocket& listenSocket) throw();

protected:
    /// Helper to be called from 'executeOnWriteReady()' in order to switch to reading
    void readyToReadAnswer() throw();

    ClientSocket clientSocket;
};
/**
 * \ingroup Networks
 *
 * The heart of the non-blocking communication. It is derived from
 * GenericServer, but it is called 'Communicator' because it serves clients
 * as well as servers.
 *
 * The class is usable as it is for most communication purposes; special
 * cases have to reimplement 'executeBeforeSelect()' or 'executeOnTimeout()'.
 *
 * A better implementation should use 'vector'.
 * Important: new is supposed to throw!
 */
class NbCommunicator : public GenericServer
{
public:
    /// Default constructor
    NbCommunicator() throw();

    /// Constructor which also sets the maximal number of simultaneous jobs
    NbCommunicator(int newMaxJobs);

    /// Destructor
    ~NbCommunicator() throw();

    /// Sets the maximal number of simultaneous jobs
    bool initJobs(int newMaxJobs);

    /// Returns the maximal number of simultaneous jobs
    int getMaxJobs() throw();

    /** Attaches a new job. Returns 'true' on success, 'false' if the job
        is already attached or if the maximal number of jobs is already
        attached
    */
    bool attachJob(NbJob&) throw();

    /** Detaches a job. Returns 'true' on success, 'false' if the job is
        not attached
    */
    bool deattachJob(NbJob&) throw();

    /// Runs the main loop for servers, i.e. it initializes the listen socket first
    bool runServer() throw();

    /// Runs the main loop for clients, i.e. without initializing the listen socket
    bool runClient() throw();

protected:
    /** Called before select; the loop exits if it returns 'false'.
        This version just returns 'true'
    */
    virtual bool executeBeforeSelect() throw();

    /** Called after select; the loop exits if it returns 'false'.
        This version just returns 'true'
    */
    virtual bool executeAfterSelect() throw();

    /** Called when select times out; the loop exits if it returns 'false'.
        This version just returns 'true'
    */
    virtual bool executeOnTimeout() throw();

private:
    /** The main loop of the communication: it waits for clients, dispatches
        them to the accepting jobs, then gives the jobs the opportunity to
        read, process and write.
        It returns 'false' only if 'executeBeforeSelect()' or
        'executeOnTimeout()' return 'false', otherwise it returns 'true'.
        NOTE(review): 'executeAfterSelect()' is documented as ending the loop
        too, but is not mentioned here - confirm the return value in that case.
    */
    bool mainLoop() throw();

    /// Helper for dispatching read requests
    void dispatchReadRequest() throw();

    /// Helper for dispatching write requests
    void dispatchWriteRequest() throw();

    /// Helper for dispatching connect requests
    void connectNewClients() throw();

    /// Helper which looks for jobs that have timed out
    void lookForTimeout() throw();

    /// Helper which calls 'processRequest()' on every job that is processing
    void processJobs() throw();

    /** Helper which returns 'true' if somebody called 'shouldExit()' and no
        job is processing anything. However, it closes the listen socket so
        that no more jobs are accepted, and returns 'true' once all jobs
        have finished
    */
    bool mayExit() throw();

    typedef NbJob* JobPtr;

    // Table of attached jobs and its capacity.
    // NOTE(review): presumably allocated in 'initJobs()' and owned by this
    // object - confirm in the implementation file.
    JobPtr* jobPtr;
    int     maxJobs;
};
} //namespace
#endif
|