diff options
Diffstat (limited to 'network/akgnet_nbcomm.hh')
-rw-r--r-- | network/akgnet_nbcomm.hh | 370 |
1 files changed, 370 insertions, 0 deletions
diff --git a/network/akgnet_nbcomm.hh b/network/akgnet_nbcomm.hh new file mode 100644 index 0000000..441d8e4 --- /dev/null +++ b/network/akgnet_nbcomm.hh @@ -0,0 +1,370 @@ +/* +* This file is part of rasdaman community. +* +* Rasdaman community is free software: you can redistribute it and/or modify +* it under the terms of the GNU General Public License as published by +* the Free Software Foundation, either version 3 of the License, or +* (at your option) any later version. +* +* Rasdaman community is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU General Public License for more details. +* +* You should have received a copy of the GNU General Public License +* along with rasdaman community. If not, see <http://www.gnu.org/licenses/>. +* +* Copyright 2003, 2004, 2005, 2006, 2007, 2008, 2009 Peter Baumann / +rasdaman GmbH. +* +* For more information please see <http://www.rasdaman.org> +* or contact Peter Baumann via <baumann@rasdaman.com>. +/ +/** + * INCLUDE: akgnet_nbcomm.hh + * + * MODULE: akg network + * CLASS: NbJob, NbServerJob, NbClientJob, NbCommunicator + * + * COMMENTS: + * Namespace akg + * +*/ + +#ifndef AKGNET_NBCOMM_HH +#define AKGNET_NBCOMM_HH + +#include "akgnet_server.hh" + +namespace akg + { + +/** Base class for non-blocking communication jobs +*/ + +class NbJob + { + public: + /** Static function for setting the current time. Used + for marking the last action time, so timeout can be monitorized + */ + static void setCurrentTime() throw(); + + /** Static function for setting the timeout interval + We use the same timeout for all jobs because the + server doesn't do any distinction between jobs + */ + static void setTimeoutInterval(time_t x) throw(); + + /// Returns the timeout interval set for the jobs + static time_t getTimeoutInterval() throw(); + + public: + /// Status regarding accepting a new job + enum acceptStatus + { acs_nopending = 0, + acs_Iambusy = 1, + acs_accepted = 2 + }; + /// Status during the lifetime of a job + enum workingStatus + { wks_notdefined = 0, + wks_accepting = 1, // job is ready to accept a connection + wks_reading = 2, // job is reading data + wks_writing = 3, // job is writing data + wks_processing = 4 // job is processing the request + }; + + virtual ~NbJob() throw(); + /// Returns the working status + workingStatus getStatus() throw(); + + /** Returns true if there is an operation in progress + this means reading, writing or processing + */ + bool isOperationPending() throw(); + + /// Returns true if the job is ready to accept a connection + bool isAccepting() throw(); + + /// Returns true if the job is reading data + bool isReading() throw(); + + /// Returns true if the job is writing data + bool isWriting() throw(); + + /// Returns true if the job is processing + bool isProcessing() throw(); + + /** Pure function to do initialization when attached to a Selector + Don't throw! + */ + virtual void initOnAttach(Selector *pselector) throw() =0; + + /** Pure function to do initialization when accepting a connection + Returns: + - acs_nopending when there is connection pending + - acs_Iambusy when the job can't accept this connection + - acs_accepted when the connection was accepted + Assert: + the 'currentBufferPtr' is initialized. This would be a software error + Don't throw! + */ + virtual acceptStatus acceptConnection(ListenSocket& listenSocket) throw() =0; + + /** Reads as much data as it can. After every read it calls the + 'validateMessage()' function and returns whatever this function returns. + If there is a read error, other than EINTR or EAGAIN, the function + 'executeOnReadError()' is called + Returns 'true' if the message is completelly red + Returns 'false' if there should be some more data + */ + bool readPartialMessage() throw(); + + /** Writes as much data as it can. After writing all data the function + 'executeOnWriteReady()' is called. + If there is a write error, other than EINTR or EAGAIN, the function + 'executeOnWriteError()' is called + Returns 'true' if the message is completelly written + Returns 'false' if there should be some more data to write + */ + bool writePartialMessage() throw(); + + /// Clears the connection - closes the socket and removes it from the Selector + void clearConnection() throw(); + + /// Returns the OS file descriptor for the socket + int getSocket() throw(); + + /// Returns the errno of the last socket operation + int getErrno() throw(); + //###################################### + /** Virtual function to clean up if timeout occured + This version returns 'false' if no timeout or no connection + If timeout it clears the connection and calls + 'specificCleanUpOnTimeout()' + Don't throw! + */ + virtual bool cleanUpIfTimeout() throw(); + + /** Pure function to process the request + It has to set the appropriate status, so the server + knows how to continue with this job + Don't throw! + */ + virtual void processRequest() throw() =0; + //###################################### + protected: + /// called after every read, returns 'true' if the message is all here + virtual bool validateMessage() throw() =0; + + /// called when client is accepted, default does nothing + virtual void executeOnAccept() throw(); + + /// called when message is written + virtual void executeOnWriteReady() throw() =0; + + /// called when timeout, it has to set the status apropriate and do other cleanup + virtual void specificCleanUpOnTimeout() throw() =0; + + /// called when a read error occurs, usual a message and clean up + virtual void executeOnReadError() throw() =0; + + /// called when a write error occurs, usual a message and clean up + virtual void executeOnWriteError() throw() =0; + //###################################### + protected: + /// Protected constructor, taking a FileDescriptor, usually a Socket + NbJob(FileDescriptor&) throw() ; + + /// Helper function for setting the job in read modus + bool setReading() throw(); + + /// Helper function for setting the job in write modus + bool setWriting() throw(); + workingStatus status; + + /** Reference to a FileDescriptor, usually a Socket. It has to be provided by the + derived class + */ + FileDescriptor &fdRef; + + /** Pointer to a Selector, which has to be provided by the Server object + to which this job is attached + */ + Selector *selectorPtr; + + /** Pointer to a CommBuffer, which has to be provided by the derived class + */ + CommBuffer *currentBufferPtr; + + // for timeout + time_t lastActionTime; + + /// Helper function which marks the current moment, so timeout counter is reset + void action() throw(); + + static time_t timeOutInterv; + static time_t currentTime; + }; + +/* Base class for generic non-blocking server jobs + */ +class NbServerJob : public NbJob + { + public: + /// Default constructor + NbServerJob() throw(); + + /** The version for servers, it just initializes the 'Selector*' + It doesn't have to be overloaded, it's OK for servers + */ + void initOnAttach(Selector *pselector) throw(); + + /** The version for servers + It doesn't have to be overloaded, it's OK for most servers + */ + acceptStatus acceptConnection(ListenSocket& listenSocket) throw(); + + /// Returns the SocketAddress of the client + SocketAddress getClientSocketAddress() throw(); + + /// Returns the HostAddress of the client + HostAddress getClientHostAddress() throw(); + protected: + + /// helper function, call it in "processRequest" to switch to writing + void readyToWriteAnswer() throw(); + + ServerSocket serverSocket; + }; + +/* Base class for generic non-blocking client jobs + */ +class NbClientJob : public NbJob + { + public: + /// Default constructor + NbClientJob() throw(); + + /// Returns 'true' if connection succeded + bool connectToServer(const char* serverHost, int serverPort) throw(); + + + /** The version for clients, it initializes the 'Selector*' + and prepares for writing. It has to be called AFTER the + connection to the server succeded! + It doesn't have to be overloaded, it's OK for most clients + */ + void initOnAttach(Selector *pselector) throw(); + + /** The version for clients. It just returns 'acs_Iambusy' since + clients don't accept connections + It doesn't have to be overloaded, it's OK for most clients + */ + acceptStatus acceptConnection(ListenSocket& listenSocket) throw(); + protected: + + /// helper function, call it in 'executeOnWriteReady()' to switch to reading + void readyToReadAnswer() throw(); + + ClientSocket clientSocket; + }; + +/** + The heart of the non-blocking communication. It's derived from GenericServer but + it is called 'Communicator' since it not only for servers but also for clients + You can use this class for most communication purposes, special ones have to + reimplement 'executeBeforeSelect()' or 'executeOnTimeout()' + A better implementation should use 'vector' + Important: new is supposed to throw! + */ + +class NbCommunicator : public GenericServer + { + public: + /// Default constructor + NbCommunicator() throw(); + + /// Constructor setting also the maximal number of simultan jobs + NbCommunicator(int newMaxJobs); + + /// Destructor + ~NbCommunicator() throw(); + + /// Sets the maximal number of simultan jobs + bool initJobs(int newMaxJobs); + + /// returns the maximal number of simultan jobs + int getMaxJobs() throw(); + + /** Attaches a new job. Returns 'true' if succeded, 'false' if the + job is already attached or if the maximal number of jobs + is already attached + */ + bool attachJob(NbJob&) throw(); + + /** Deattach job. Returns 'true' if succeded, 'false' if the + job is not attached + */ + bool deattachJob(NbJob&) throw(); + + /// This runs the main loop for servers, this means it initializes the listen socket first + bool runServer() throw(); + + /// This runs the main loop for clients, this means without initializing the listen socket + bool runClient() throw(); + protected: + /** Called before select, if it returns 'false' the loop exits. + This version just returns 'true' + */ + virtual bool executeBeforeSelect() throw(); + + /** Called after select, if it returns 'false' the loop exits. + This version just returns 'true' + */ + virtual bool executeAfterSelect() throw(); + /** Called if select times out, if it returns 'false' the loop exits. + This version just returns 'true' + */ + virtual bool executeOnTimeout() throw(); + private: + typedef NbJob *JobPtr; + + JobPtr *jobPtr; + int maxJobs; + + /** The main loop of the communication: it waits for clients, dispatches them to the + accepting jobs, then offers the jobs the possibility to read, process and write + It returns 'false' only if 'executeBeforeSelect()' or 'executeOnTimeout()' + return 'false' + Otherwise it returns 'true' + */ + bool mainLoop() throw(); + + /// Helper function for dispatching read requests + void dispatchReadRequest() throw(); + + /// Helper function for dispatching write requests + void dispatchWriteRequest() throw(); + + /// Helper function for dispatching connect requests + void connectNewClients() throw(); + + /// Helper function which looks for timeouted jobs + void lookForTimeout() throw(); + + /// Helper function which calls 'processRequest()' of all jobs that are processing + void processJobs() throw(); + + /** Helper function which returns 'true' if somebody called 'shouldExit()' + and there is no job which processes anything. But it closes the listen socket + so no more jobs are accepted and returns 'true' when all jobs finish + */ + bool mayExit() throw(); + + }; + +} //namespace +#endif + |