summaryrefslogtreecommitdiffstats
path: root/network
diff options
context:
space:
mode:
authorConstantin Jucovschi <cj@ubuntu.localdomain>2009-04-24 07:20:22 -0400
committerConstantin Jucovschi <cj@ubuntu.localdomain>2009-04-24 07:20:22 -0400
commit8f27e65bddd7d4b8515ce620fb485fdd78fcdf89 (patch)
treebd328a4dd4f92d32202241b5e3a7f36177792c5f /network
downloadrasdaman-upstream-8f27e65bddd7d4b8515ce620fb485fdd78fcdf89.tar.gz
rasdaman-upstream-8f27e65bddd7d4b8515ce620fb485fdd78fcdf89.tar.xz
rasdaman-upstream-8f27e65bddd7d4b8515ce620fb485fdd78fcdf89.zip
Initial commitv8.0
Diffstat (limited to 'network')
-rw-r--r--network/Makefile.am43
-rw-r--r--network/akgnet_commbuffer.cc190
-rw-r--r--network/akgnet_commbuffer.hh169
-rw-r--r--network/akgnet_common.hh100
-rw-r--r--network/akgnet_fdescr.cc113
-rw-r--r--network/akgnet_fdescr.hh107
-rw-r--r--network/akgnet_file.cc51
-rw-r--r--network/akgnet_file.hh73
-rw-r--r--network/akgnet_inetaddr.cc181
-rw-r--r--network/akgnet_inetaddr.hh148
-rw-r--r--network/akgnet_nbcomm.cc579
-rw-r--r--network/akgnet_nbcomm.hh370
-rw-r--r--network/akgnet_selector.cc97
-rw-r--r--network/akgnet_selector.hh95
-rw-r--r--network/akgnet_server.cc173
-rw-r--r--network/akgnet_server.hh157
-rw-r--r--network/akgnet_socket.cc174
-rw-r--r--network/akgnet_socket.hh147
-rw-r--r--network/akgnetwork.hh56
19 files changed, 3023 insertions, 0 deletions
diff --git a/network/Makefile.am b/network/Makefile.am
new file mode 100644
index 0000000..4f930ac
--- /dev/null
+++ b/network/Makefile.am
@@ -0,0 +1,43 @@
+#
+# This file is part of rasdaman community.
+#
+# Rasdaman community is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Rasdaman community is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with rasdaman community. If not, see <http://www.gnu.org/licenses/>.
+#
+# Copyright 2003, 2004, 2005, 2006, 2007, 2008, 2009 Peter Baumann /
+# rasdaman GmbH.
+#
+# For more information please see <http://www.rasdaman.org>
+# or contact Peter Baumann via <baumann@rasdaman.com>.
+#
+# MAKEFILE FOR:
+# network
+#
+# COMMENTS:
+# - still waits to be really integrated completely (akg namespace, ...)
+#
+##################################################################
+
+noinst_LIBRARIES=libnetwork.a
+libnetwork_a_SOURCES= akgnet_commbuffer.cc akgnet_commbuffer.hh \
+ akgnet_file.cc akgnet_file.hh \
+ akgnet_nbcomm.cc akgnet_nbcomm.hh \
+ akgnet_server.cc akgnet_server.hh \
+ akgnet_fdescr.cc akgnet_fdescr.hh \
+ akgnet_inetaddr.cc akgnet_inetaddr.hh \
+ akgnet_selector.cc akgnet_selector.hh \
+ akgnet_socket.cc akgnet_socket.hh \
+ akgnetwork.hh akgnet_common.hh
+
+
+CLEANFILES=core
diff --git a/network/akgnet_commbuffer.cc b/network/akgnet_commbuffer.cc
new file mode 100644
index 0000000..3c5c561
--- /dev/null
+++ b/network/akgnet_commbuffer.cc
@@ -0,0 +1,190 @@
+/*
+* This file is part of rasdaman community.
+*
+* Rasdaman community is free software: you can redistribute it and/or modify
+* it under the terms of the GNU General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* Rasdaman community is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with rasdaman community. If not, see <http://www.gnu.org/licenses/>.
+*
+* Copyright 2003, 2004, 2005, 2006, 2007, 2008, 2009 Peter Baumann /
+rasdaman GmbH.
+*
+* For more information please see <http://www.rasdaman.org>
+* or contact Peter Baumann via <baumann@rasdaman.com>.
+/
+/**
+ * SOURCE: akgnet_commbuffer.cc
+ *
+ * MODULE: akg network
+ * CLASS: CommBuffer
+ *
+ * COMMENTS:
+ *
+ */
+
+#include <akgnet_commbuffer.hh>
+#include <string>
+#include <assert.h>
+
+
+akg::CommBuffer::CommBuffer() throw()
+ {
+ data = NULL;
+ buffSize = 0;
+ fillSize = 0;
+ sendSize = 0;
+ allocated= false;
+ }
+
+akg::CommBuffer::CommBuffer(int size) throw()
+ {
+ assert(size > 0);
+ allocate(size);
+ }
+
+akg::CommBuffer::CommBuffer(void *externalBuffer,int totalSize, int dataSize) throw()
+ {
+ data = NULL;
+ takeOver(externalBuffer,totalSize,dataSize);
+ }
+
+akg::CommBuffer::~CommBuffer() throw()
+ {
+ freeBuffer();
+ }
+
+bool akg::CommBuffer::allocate(int size) throw()
+ {
+ assert(size > 0);
+
+ freeBuffer();
+ data = new char[size];
+ // new throws?
+ buffSize=size;
+ allocated=true;
+ return true;
+ }
+
+void akg::CommBuffer::freeBuffer() throw()
+ {
+ if(allocated == true && data != NULL) delete[] data;
+ data = NULL;
+ buffSize = 0;
+ fillSize = 0;
+ sendSize = 0;
+ allocated= false;
+ }
+
+void akg::CommBuffer::takeOver(void *externalBuffer,int totalSize, int dataSize) throw()
+ {
+ assert(externalBuffer != 0);
+ assert(totalSize > 0);
+ assert(dataSize >= 0);
+ assert(totalSize >= dataSize);
+
+ freeBuffer();
+ data = (char*)externalBuffer;
+ buffSize = totalSize;
+ fillSize = dataSize;
+ }
+
+bool akg::CommBuffer::resize(int newSize) throw()
+ {
+ assert(data != 0);
+
+ // we can't make the buffer smaller by truncating inside data!
+ if(newSize < fillSize) return false;
+
+ char *newData = new char[newSize];
+ memcpy(newData, data, fillSize);
+ if(allocated == true ) delete[] data;
+
+ data = newData;
+ buffSize = newSize;
+ allocated = true;
+ return true;
+ }
+
+void* akg::CommBuffer::getData() throw(){ return data;}
+int akg::CommBuffer::getDataSize() throw(){ return fillSize;}
+int akg::CommBuffer::getBufferSize() throw(){ return buffSize;}
+int akg::CommBuffer::getSendedSize() throw(){ return sendSize;}
+int akg::CommBuffer::getNotFilledSize() throw(){ return buffSize-fillSize;}
+int akg::CommBuffer::getNotSendedSize() throw(){ return fillSize-sendSize;}
+bool akg::CommBuffer::isAllocated() throw(){ return allocated;}
+
+int akg::CommBuffer::read(FileDescriptor &socket) throw()
+ {
+ int rasp = socket.read(data+fillSize,buffSize-fillSize);
+
+ if(rasp>=0) fillSize += rasp;
+
+ return rasp;
+ }
+
+int akg::CommBuffer::read(const void *externalBuffer,int size) throw()
+ {
+ assert(externalBuffer != 0);
+ assert(size >= 0);
+
+ int cpSize = size<(buffSize-fillSize) ? size:(buffSize-fillSize);
+
+ memcpy(data+fillSize,externalBuffer,cpSize);
+ fillSize += cpSize;
+
+ return cpSize;
+ }
+
+int akg::CommBuffer::reserve(int size) throw()
+ {
+ assert(size >= 0);
+
+ int cpSize = size<(buffSize-fillSize) ? size:(buffSize-fillSize);
+
+ fillSize += cpSize;
+
+ return cpSize;
+ }
+
+int akg::CommBuffer::write(FileDescriptor &socket) throw()
+ {
+ DBTALK("CommBuffer write fillSize="<<fillSize<<" sendSize="<<sendSize);
+ int rasp = socket.write(data+sendSize,fillSize-sendSize);
+
+ if(rasp>=0) sendSize+=rasp;
+
+ return rasp;
+ }
+
+int akg::CommBuffer::write(void *externalBuffer,int size) throw()
+ {
+ assert(externalBuffer != 0);
+ assert(size >= 0);
+
+ int cpSize = size<(fillSize-sendSize) ? size:(fillSize-sendSize);
+
+ memcpy(externalBuffer,data+sendSize,cpSize);
+ sendSize+=cpSize;
+
+ return cpSize;
+ }
+
+void akg::CommBuffer::clearToRead() throw()
+ {
+ DBTALK("CommBuffer clearToRead");
+ fillSize=0;sendSize=0;
+ }
+void akg::CommBuffer::clearToWrite() throw()
+ {
+ DBTALK("CommBuffer clearToWrite");
+ sendSize=0;
+ }
+
diff --git a/network/akgnet_commbuffer.hh b/network/akgnet_commbuffer.hh
new file mode 100644
index 0000000..0e28ee4
--- /dev/null
+++ b/network/akgnet_commbuffer.hh
@@ -0,0 +1,169 @@
+/*
+* This file is part of rasdaman community.
+*
+* Rasdaman community is free software: you can redistribute it and/or modify
+* it under the terms of the GNU General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* Rasdaman community is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with rasdaman community. If not, see <http://www.gnu.org/licenses/>.
+*
+* Copyright 2003, 2004, 2005, 2006, 2007, 2008, 2009 Peter Baumann /
+rasdaman GmbH.
+*
+* For more information please see <http://www.rasdaman.org>
+* or contact Peter Baumann via <baumann@rasdaman.com>.
+/
+/**
+ * INCLUDE: akgnet_commbuffer.hh
+ *
+ * MODULE: akg network
+ * CLASS: CommBuffer
+ *
+ * COMMENTS:
+ * Namespace akg
+ *
+*/
+
+#ifndef AKGNET_BUFFER_HH
+#define AKGNET_BUFFER_HH
+
+#include "akgnet_fdescr.hh"
+
+namespace akg
+ {
+
+/** This class is a specialized buffer used for communication by the elements
+ of this library.
+ Important:
+ - if the internal buffer overflows, it is not reallocated, even there is a resize() function. This is not an error!
+ Please think twice before you change this
+ - 'new' is supposed to throw
+*/
+
+class CommBuffer
+ {
+ public:
+ /// Default constructor, no data is allocated
+ CommBuffer() throw();
+
+ /** Constructor allocating a 'size' bytes buffer
+ Assert: size > 0
+ */
+ CommBuffer(int size) throw();
+
+ /** Constructor taking over an existing buffer
+ totalSize - the total size of the buffer
+ dataSize - the size of the usefull data
+ */
+ CommBuffer(void*,int totalSize,int dataSize=0) throw();
+
+ /// Destructor. If the internal buffer is allocated, it will be deallocated
+ ~CommBuffer() throw();
+
+ /** Allocates a new buffer. The old one, if allocated, will be deallocated
+ Assert: size > 0
+ */
+ bool allocate(int size) throw();
+
+ /// Frees the internal buffer, if allocated.
+ void freeBuffer() throw();
+
+ /** Takes over an external buffer
+ totalSize - the total size of the buffer
+ dataSize - the size of the usefull data
+ Assert: externalBuffer != 0, totalSize > 0, dataSize>=0, totalSize>=dataSize
+ */
+ void takeOver(void *externalBuffer,int totalSize,int dataSize=0) throw();
+
+ /** Resizes the internal buffer, by allocating and copying the data.
+ Returns false if the new buffer is smaller than the actual data
+ Assert: there is a buffer, so 'data != 0'*/
+ bool resize(int newSize) throw();
+
+ /// Returns a pointer to the internal buffer. You are on your own!
+ void* getData() throw();
+
+ /// Returns the size of the data stored in the buffer
+ int getDataSize() throw();
+
+ /// Returns the total capacity of the buffer
+ int getBufferSize() throw();
+
+ /// Returns the size already written
+ int getSendedSize() throw();
+
+ /// Returns the size not filled yet
+ int getNotFilledSize() throw();
+
+ /// Returns the size not written yet
+ int getNotSendedSize() throw();
+
+ /// Returns true if the internal buffer is allocated
+ bool isAllocated() throw();
+
+ /** Reads as much as possible from the specified FileDescriptor
+ It stops if the buffer is full or there are are no more bytes to read
+ Returns the number of bytes red
+ */
+ int read(FileDescriptor&) throw();
+
+ /** Reads at most 'size' bytes from the specified memory address
+ It stops if the buffer is full or there are are no more bytes to read
+ Returns the number of bytes red
+ Assert: externalBuffer != 0, size >=0
+ */
+ int read(const void *externalBuffer,int size) throw();
+
+ /** Fake read, used to reserve space for future direct write
+ Returns the number of bytes reserverd, which can be less
+ than 'size' if there is not enough space
+ Assert: size >=0
+ */
+ int reserve(int size) throw();
+
+
+ /** Write as much as possible to the specified FileDescriptor
+ It stops if there are no more bytes to write or the FileDescriptor
+ can't accept more bytes
+ Returns the number of bytes written
+ */
+ int write(FileDescriptor&) throw();
+
+ /** Writes at most 'size' bytes to the specified memory address
+ It stops if the there are no more bytes to write
+ Returns the number of bytes written
+ Assert: externalBuffer != 0, size >=0
+ */
+ int write(void *externalBuffer,int size) throw();
+
+ /// Resets the buffer for reading. The data inside is discarded
+ void clearToRead() throw();
+
+ /** Resets the buffer for writing. The data inside is not touched,
+ just the writing counters are reset
+ */
+ void clearToWrite() throw();
+
+ private:
+ /// the internal buffer
+ char *data;
+ /// the size of the internal buffer
+ int buffSize;
+ /// the filled size
+ int fillSize;
+ /// the send size.
+ int sendSize;
+ /// is the buffer allocated?
+ bool allocated;
+ };
+
+} //namespace
+#endif
+
diff --git a/network/akgnet_common.hh b/network/akgnet_common.hh
new file mode 100644
index 0000000..aaf8ca6
--- /dev/null
+++ b/network/akgnet_common.hh
@@ -0,0 +1,100 @@
+/*
+* This file is part of rasdaman community.
+*
+* Rasdaman community is free software: you can redistribute it and/or modify
+* it under the terms of the GNU General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* Rasdaman community is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with rasdaman community. If not, see <http://www.gnu.org/licenses/>.
+*
+* Copyright 2003, 2004, 2005, 2006, 2007, 2008, 2009 Peter Baumann /
+rasdaman GmbH.
+*
+* For more information please see <http://www.rasdaman.org>
+* or contact Peter Baumann via <baumann@rasdaman.com>.
+/
+/**
+ * INCLUDE: akgnet_common.hh
+ *
+ * MODULE: akg network
+ * CLASS:
+ *
+ * COMMENTS:
+ * Namespace akg
+ * Contains common definitions for the whole package
+ *
+*/
+
+#ifndef AKGNET_COMMON_HH
+#define AKGNET_COMMON_HH
+
+#if defined(DECALPHA) || defined(LINUX)
+ #ifndef _XOPEN_SOURCE_EXTENDED
+ #define _XOPEN_SOURCE_EXTENDED // for gethostid
+ #endif //_XOPEN_SOURCE_EXTENDED
+#endif //DECALPHA || LINUX
+
+
+#include<stdio.h>
+#include<stdlib.h>
+#include<errno.h>
+
+#if defined(DECALPHA)
+ #include<strings.h>
+ #include <arpa/inet.h>
+#endif
+#include<string.h>
+
+
+
+#include<unistd.h>
+#include<sys/types.h>
+#include<sys/socket.h>
+#include<netinet/in.h>
+#include<netdb.h>
+#include <fcntl.h>
+#include <sys/time.h>
+#include <time.h>
+
+
+#include <iostream>
+
+//##### System dependent data types #############
+
+#ifdef X86
+ #define akgSocklen_t socklen_t
+#endif
+
+#ifdef AIX
+ #define akgSocklen_t socklen_t
+#endif
+
+#ifdef SOLARIS
+ #define akgSocklen_t socklen_t
+ #define INADDR_NONE ((uint32_t) 0xffffffff)
+#endif
+
+#ifdef DECALPHA
+ #define akgSocklen_t size_t
+ typedef in_addr_t uint32_t;
+#endif
+
+#ifndef akgSocklen_t
+ #error "What Operating System is this?"
+#endif
+//##### Debugging stuff ########################
+
+#define DBTALK(a)
+//#define DBTALK(a) cout<<a<<endl
+
+using namespace std;
+//##############################################
+
+#endif
diff --git a/network/akgnet_fdescr.cc b/network/akgnet_fdescr.cc
new file mode 100644
index 0000000..bc5df7d
--- /dev/null
+++ b/network/akgnet_fdescr.cc
@@ -0,0 +1,113 @@
+/*
+* This file is part of rasdaman community.
+*
+* Rasdaman community is free software: you can redistribute it and/or modify
+* it under the terms of the GNU General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* Rasdaman community is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with rasdaman community. If not, see <http://www.gnu.org/licenses/>.
+*
+* Copyright 2003, 2004, 2005, 2006, 2007, 2008, 2009 Peter Baumann /
+rasdaman GmbH.
+*
+* For more information please see <http://www.rasdaman.org>
+* or contact Peter Baumann via <baumann@rasdaman.com>.
+/
+/**
+ * SOURCE: akgnet_fdescr.cc
+ *
+ * MODULE: akg network
+ * CLASS: FileDescriptor
+ *
+ * COMMENTS:
+ *
+ */
+
+#include <akgnet_fdescr.hh>
+
+akg::FileDescriptor::FileDescriptor() throw()
+ {
+ fileDescriptor = -1;
+ savedErrno = 0;
+ }
+
+akg::FileDescriptor::~FileDescriptor() throw()
+ {
+ close();
+ }
+
+int akg::FileDescriptor::operator()() throw()
+ {
+ return fileDescriptor;
+ }
+
+bool akg::FileDescriptor::isOpen() throw()
+ {
+ return fileDescriptor == -1 ? false:true;
+ }
+
+void akg::FileDescriptor::close() throw()
+ {
+ if(isOpen())
+ { ::close(fileDescriptor);
+ saveErrno();
+ }
+ fileDescriptor = -1;
+ }
+
+int akg::FileDescriptor::write(const void *buffer, int count) throw()
+ {
+ savedErrno = 0;
+ DBTALK("FileDescriptor write: "<<buffer<<" count="<<count);
+ int nbytes = ::write(fileDescriptor,buffer,count);
+ if(nbytes < 0) saveErrno();
+ return nbytes;
+ }
+
+int akg::FileDescriptor::read (void *buffer, int count) throw()
+ {
+ savedErrno = 0;
+ DBTALK("FileDescriptor read: "<<buffer<<" count="<<count);
+ int nbytes = ::read(fileDescriptor,buffer,count);
+ if(nbytes < 0) saveErrno();
+ return nbytes;
+ }
+
+bool akg::FileDescriptor::setNonBlocking(bool nonBlocking) throw()
+ {
+ if(isOpen())
+ { int val = fcntl(fileDescriptor,F_GETFL,0);
+
+ if( nonBlocking) val |= O_NONBLOCK;
+ else val &=~O_NONBLOCK;
+
+ fcntl(fileDescriptor,F_SETFL,val);
+ return true;
+ }
+
+ return false;
+ }
+bool akg::FileDescriptor::isNonBlocking() throw()
+ {
+ if(isOpen())
+ { int val = fcntl(fileDescriptor,F_GETFL,0);
+ return (val & O_NONBLOCK) ? true:false;
+ }
+ return false;
+ }
+
+int akg::FileDescriptor::getErrno() throw()
+ { return savedErrno;
+ }
+void akg::FileDescriptor::saveErrno() throw()
+ { savedErrno=errno;
+ }
+
+
diff --git a/network/akgnet_fdescr.hh b/network/akgnet_fdescr.hh
new file mode 100644
index 0000000..7c41a7a
--- /dev/null
+++ b/network/akgnet_fdescr.hh
@@ -0,0 +1,107 @@
+/*
+* This file is part of rasdaman community.
+*
+* Rasdaman community is free software: you can redistribute it and/or modify
+* it under the terms of the GNU General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* Rasdaman community is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with rasdaman community. If not, see <http://www.gnu.org/licenses/>.
+*
+* Copyright 2003, 2004, 2005, 2006, 2007, 2008, 2009 Peter Baumann /
+rasdaman GmbH.
+*
+* For more information please see <http://www.rasdaman.org>
+* or contact Peter Baumann via <baumann@rasdaman.com>.
+/
+/**
+ * INCLUDE: akgnet_fdescr.hh
+ *
+ * MODULE: akg network
+ * CLASS: FileDescriptor
+ *
+ * COMMENTS:
+ * Namespace akg
+ *
+*/
+
+#ifndef AKGNET_FDESCR_HH
+#define AKGNET_FDESCR_HH
+
+#include "akgnet_common.hh"
+
+namespace akg
+ {
+
+/**
+ This class the base class for a hierarchie, which are
+ envelopes for the usual OS file descriptors. They offer
+ only that much functionallity as needed for our library
+ The objects of this hierarchie can't be copied!
+*/
+
+class FileDescriptor
+ {
+ public:
+ /// Destructor, if open, closes the file descriptor
+ ~FileDescriptor() throw();
+
+ /// Returns the OS file descriptor
+ int operator()() throw ();
+
+ /** Returns true if the descriptor is open.
+ Be aware that closing the file descriptor by
+ using SO specific functions instead of the
+ methods of this class can lead to incorrect results
+ */
+ bool isOpen() throw();
+
+ /// Closes the descriptor
+ void close() throw();
+
+ /// Returns the error number of the last operation
+ int getErrno() throw();
+
+ /** Writes the specified number of bytes from the
+ specified buffer.
+ Returns the number of bytes actually written
+ */
+ int write(const void *buffer, int count) throw();
+
+ /** Reads the specified number of bytes into the
+ specified buffer.
+ Returns the number of bytes actually read
+ */
+ int read (void *buffer, int count) throw();
+
+ /** Sets the non-blocking mode on or off
+ Returns true o succes
+ */
+ bool setNonBlocking(bool nonBlocking) throw();
+
+ /// Returns true if the descriptors is in non-blocking mode
+ bool isNonBlocking() throw();
+ protected:
+ /// Protected constructor
+ FileDescriptor() throw();
+
+ /// Saves the errno
+ void saveErrno() throw();
+
+ int fileDescriptor;
+ int savedErrno;
+ private:
+ /// unimplemented, objects can't be copied
+ FileDescriptor(const FileDescriptor&);
+ /// unimplemented, objects can't be copied
+ FileDescriptor& operator=(const FileDescriptor&);
+ };
+
+} // namespace
+#endif
diff --git a/network/akgnet_file.cc b/network/akgnet_file.cc
new file mode 100644
index 0000000..aa20e30
--- /dev/null
+++ b/network/akgnet_file.cc
@@ -0,0 +1,51 @@
+/*
+* This file is part of rasdaman community.
+*
+* Rasdaman community is free software: you can redistribute it and/or modify
+* it under the terms of the GNU General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* Rasdaman community is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with rasdaman community. If not, see <http://www.gnu.org/licenses/>.
+*
+* Copyright 2003, 2004, 2005, 2006, 2007, 2008, 2009 Peter Baumann /
+rasdaman GmbH.
+*
+* For more information please see <http://www.rasdaman.org>
+* or contact Peter Baumann via <baumann@rasdaman.com>.
+/
+/**
+ * SOURCE: akgnet_file.cc
+ *
+ * MODULE: akg network
+ * CLASS: File
+ *
+ * COMMENTS:
+ *
+ */
+
+#include <assert.h>
+#include<akgnet_file.hh>
+
+akg::File::File() throw()
+ {
+ }
+
+akg::File::File(int osFileDescriptor) throw()
+ {
+ assert(osFileDescriptor > 0);
+ fileDescriptor = osFileDescriptor;
+ }
+
+void akg::File::connectToDescriptor(int osFileDescriptor) throw()
+ {
+ assert(osFileDescriptor > 0);
+ fileDescriptor = osFileDescriptor;
+ }
+
diff --git a/network/akgnet_file.hh b/network/akgnet_file.hh
new file mode 100644
index 0000000..4d1ee23
--- /dev/null
+++ b/network/akgnet_file.hh
@@ -0,0 +1,73 @@
+/*
+* This file is part of rasdaman community.
+*
+* Rasdaman community is free software: you can redistribute it and/or modify
+* it under the terms of the GNU General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* Rasdaman community is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with rasdaman community. If not, see <http://www.gnu.org/licenses/>.
+*
+* Copyright 2003, 2004, 2005, 2006, 2007, 2008, 2009 Peter Baumann /
+rasdaman GmbH.
+*
+* For more information please see <http://www.rasdaman.org>
+* or contact Peter Baumann via <baumann@rasdaman.com>.
+/
+/**
+ * INCLUDE: akgnet_file.hh
+ *
+ * MODULE: akg network
+ * CLASS: File
+ *
+ * COMMENTS:
+ * Namespace akg
+ *
+*/
+
+#ifndef AKGNET_FILE_HH
+#define AKGNET_FILE_HH
+
+#include "akgnet_fdescr.hh"
+
+namespace akg
+ {
+
+/** This class represents the files in the file descriptor hierarchie.
+ Since this is a network library, our concearn is only for the
+ descriptor of the files. The primary use of this class is access
+ to file descriptors opened in other ways than sockets, like stdin or stdout
+*/
+
+class File : public FileDescriptor
+ {
+ public:
+ /// Default constructor
+ File() throw();
+
+ /** Constructor taking an already opened file descriptor
+ Assert: osFileDescriptor > 0
+ */
+ File(int osFileDescriptor) throw();
+
+ /** Connect to an already opened file descriptor
+ Assert: osFileDescriptor > 0
+ */
+ void connectToDescriptor(int osFileDescriptor) throw();
+
+ private:
+ /// unimplemented, objects of this type can't be copied
+ File(const File&);
+
+ /// unimplemented, objects of this type can't be copied
+ File& operator=(const File&);
+ };
+
+} //namespace
+#endif
diff --git a/network/akgnet_inetaddr.cc b/network/akgnet_inetaddr.cc
new file mode 100644
index 0000000..e66dd47
--- /dev/null
+++ b/network/akgnet_inetaddr.cc
@@ -0,0 +1,181 @@
+/*
+* This file is part of rasdaman community.
+*
+* Rasdaman community is free software: you can redistribute it and/or modify
+* it under the terms of the GNU General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* Rasdaman community is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with rasdaman community. If not, see <http://www.gnu.org/licenses/>.
+*
+* Copyright 2003, 2004, 2005, 2006, 2007, 2008, 2009 Peter Baumann /
+rasdaman GmbH.
+*
+* For more information please see <http://www.rasdaman.org>
+* or contact Peter Baumann via <baumann@rasdaman.com>.
+/
+/**
+ * SOURCE: akgnet_inetaddr.cc
+ *
+ * MODULE: akg network
+ * CLASS: HostAddress, SocketAddress
+ *
+ * COMMENTS:
+ *
+*/
+
+#include <akgnet_inetaddr.hh>
+#include <arpa/inet.h>
+#include <string.h>
+#include <assert.h>
+
+akg::HostAddress::HostAddress() throw()
+ {
+ initDefault();
+ }
+
+akg::HostAddress::HostAddress(uint32_t x)
+ {
+ initDefault();
+
+ address.s_addr = htonl(x);
+ struct hostent *host = gethostbyaddr((const char*)&address, sizeof(in_addr), AF_INET);
+ init(host);
+ }
+
+akg::HostAddress::HostAddress(const char *theHostName)
+ {
+ assert(theHostName != 0);
+
+ initDefault();
+
+ struct hostent *host = gethostbyname (theHostName);
+ init(host);
+ }
+
+akg::HostAddress::HostAddress(const akg::HostAddress &ha)
+ {
+ fullHostName = new char[strlen(ha.fullHostName) + 1];
+ strcpy(fullHostName, ha.fullHostName);
+
+ shortHostName = new char[strlen(ha.shortHostName) + 1];
+ strcpy(shortHostName, ha.shortHostName);
+
+ strAddress = new char[strlen(ha.strAddress) + 1];
+ strcpy(strAddress, ha.strAddress);
+ }
+akg::HostAddress::~HostAddress() throw()
+ {
+ if(fullHostName) delete[] fullHostName;
+ if(shortHostName) delete[] shortHostName;
+ if(strAddress) delete[] strAddress;
+ }
+
+bool akg::HostAddress::isValid() const throw()
+ {
+ return address.s_addr == addrNone ? false:true;
+ }
+
+void akg::HostAddress::initDefault() throw()
+ {
+ fullHostName = NULL;
+ shortHostName = NULL;
+ strAddress = NULL;
+ address.s_addr = addrNone;
+ }
+
+// New is supposed to throw
+bool akg::HostAddress::init(hostent *host)
+ {
+ if(host == NULL) return false;
+
+ in_addr *ptr = (in_addr*)host->h_addr_list[0];
+ if(host->h_name == NULL || ptr == NULL) return false;
+
+ fullHostName = new char[ strlen(host->h_name) +1];
+ strcpy(fullHostName,host->h_name);
+
+ char *dotPos = strchr(fullHostName,'.');
+ int copyLen = dotPos ? dotPos-fullHostName : strlen(fullHostName);
+
+ shortHostName = new char[copyLen+1];
+ strncpy(shortHostName,fullHostName,copyLen);
+ shortHostName[copyLen] = 0;
+
+ char *nta = inet_ntoa(*ptr);
+ strAddress = new char[strlen(nta) + 1];
+ strcpy(strAddress,nta);
+ address = *ptr;
+
+ return true;
+ }
+
+const char* akg::HostAddress::getFullHostName() const throw()
+ {
+ return fullHostName;
+ }
+
+const char* akg::HostAddress::getShortHostName() const throw()
+ {
+ return shortHostName;
+ }
+
+uint32_t akg::HostAddress::getAddress() const throw()
+ {
+ return ntohl(address.s_addr);
+ }
+
+const char* akg::HostAddress::getStringAddress() const throw()
+ {
+ return strAddress;
+ }
+
+//############################################################
+
+akg::SocketAddress::SocketAddress() throw()
+ {
+ clear();
+ }
+
+akg::SocketAddress::SocketAddress(sockaddr_in &x) throw()
+ {
+ init(x);
+ }
+
+void akg::SocketAddress::init(sockaddr_in &x) throw()
+ {
+ valid = true;
+ address = x;
+ }
+
+bool akg::SocketAddress::isValid() const throw()
+ {
+ return valid;
+ }
+void akg::SocketAddress::clear() throw()
+ {
+ valid = false;
+ address.sin_family = AF_INET;
+ }
+
+akg::HostAddress akg::SocketAddress::getHostAddress() const throw()
+ {
+ return valid ? HostAddress(getAddress()) : HostAddress();
+ }
+
+uint32_t akg::SocketAddress::getAddress() const throw()
+ {
+ return ntohl(address.sin_addr.s_addr);
+ }
+
+int akg::SocketAddress::getPort() const throw()
+ {
+ return ntohs(address.sin_port);
+ }
+
diff --git a/network/akgnet_inetaddr.hh b/network/akgnet_inetaddr.hh
new file mode 100644
index 0000000..ad9c5c3
--- /dev/null
+++ b/network/akgnet_inetaddr.hh
@@ -0,0 +1,148 @@
+/*
+* This file is part of rasdaman community.
+*
+* Rasdaman community is free software: you can redistribute it and/or modify
+* it under the terms of the GNU General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* Rasdaman community is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with rasdaman community. If not, see <http://www.gnu.org/licenses/>.
+*
+* Copyright 2003, 2004, 2005, 2006, 2007, 2008, 2009 Peter Baumann /
+rasdaman GmbH.
+*
+* For more information please see <http://www.rasdaman.org>
+* or contact Peter Baumann via <baumann@rasdaman.com>.
+/
+/**
+ * INCLUDE: akgnet_inetaddr.hh
+ *
+ * MODULE: akg network
+ * CLASS: HostAddress, SocketAddress
+ *
+ * COMMENTS:
+ * Namespace akg
+ *
+*/
+#ifndef AKGNET_INETADDR_HH
+#define AKGNET_INETADDR_HH
+
+#include "akgnet_common.hh"
+
+
+namespace akg
+ {
+
+/** This class represents the internet address of a computer and envelops
+ the OS data structure 'in_addr'
+ Important: new is supposed to throw
+*/
+
+
+class HostAddress
+ {
+ public:
+ static const uint32_t addrLocalhost = INADDR_LOOPBACK;
+ static const uint32_t addrBroadcast = INADDR_BROADCAST;
+ static const uint32_t addrAny = INADDR_ANY;
+ static const uint32_t addrNone = INADDR_NONE;
+
+ /// Default constructor, creating an 'invalid' object
+ HostAddress() throw();
+
+ /// Copy constructor
+ HostAddress(const HostAddress&);
+
+ /// Constructor taking a 32-bit internet address
+ HostAddress(uint32_t);
+
+ /** Constructor taking a string representation of the address
+ It can be the the name or the internet address
+ Assert: theHostName != 0
+ */
+ HostAddress(const char* theHostName);
+
+ /// Destructor
+ ~HostAddress() throw();
+
+ /// Returns true if the object was initialized correctly
+ bool isValid() const throw();
+
+ /// Returns the full host name, meaning hostname.domainname
+ const char* getFullHostName() const throw();
+
+ /// Returns the short form of the host name
+ const char* getShortHostName() const throw();
+
+ /// Returns the IP-Address as a 32-bit value
+ uint32_t getAddress() const throw();
+
+ /// Returns the string representation of the IP-Address
+ const char* getStringAddress() const throw();
+
+ private:
+ /// Initializes the object with default values. Used by the constructors
+ void initDefault() throw();
+
+ /// Initializes the object from a OS 'hostent' object. Used by the constructors
+ bool init(hostent *);
+
+ char *fullHostName;
+ char *shortHostName;
+ char *strAddress;
+ in_addr address;
+
+ ///unimplemented
+ HostAddress operator=(const HostAddress&);
+
+ };
+
+
+/** This class represents the IP address of a OS socket and envelops
+ the OS data structure 'sockaddr_in'
+*/
+class SocketAddress
+ {
+ public:
+ /// Default constructor
+ SocketAddress() throw();
+
+ /// Constructor taking a 'sockaddr_in'
+ SocketAddress(sockaddr_in&) throw();
+
+ /// Initialization from a 'sockaddr_in'
+ void init(sockaddr_in&) throw();
+
+ /// Returns true if the object is initialized
+ bool isValid() const throw();
+
+ /// Returns the HostAddress of this socket
+ HostAddress getHostAddress() const throw();
+
+ /// Returns the IP Address
+ uint32_t getAddress() const throw();
+ int getPort() const throw();
+ private:
+ bool valid;
+ void clear() throw();
+
+ private:
+ sockaddr_in address;
+ /*
+ sa_family_t sin_family
+ struct inaddr
+ { uint32_t s_addr;
+ }
+ sin_addr
+ ushort sin_port
+ */
+ };
+
+} // namespace
+#endif
diff --git a/network/akgnet_nbcomm.cc b/network/akgnet_nbcomm.cc
new file mode 100644
index 0000000..92e4ad2
--- /dev/null
+++ b/network/akgnet_nbcomm.cc
@@ -0,0 +1,579 @@
+/*
+* This file is part of rasdaman community.
+*
+* Rasdaman community is free software: you can redistribute it and/or modify
+* it under the terms of the GNU General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* Rasdaman community is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with rasdaman community. If not, see <http://www.gnu.org/licenses/>.
+*
+* Copyright 2003, 2004, 2005, 2006, 2007, 2008, 2009 Peter Baumann /
+rasdaman GmbH.
+*
+* For more information please see <http://www.rasdaman.org>
+* or contact Peter Baumann via <baumann@rasdaman.com>.
+/
+/**
+ * SOURCE: akgnet_nbcomm.cc
+ *
+ * MODULE: akg network
+ * CLASS: NbJob, NbServerJob, NbClientJob, NbCommunicator
+ *
+ * COMMENTS:
+ *
+*/
+
+#include <akgnet_nbcomm.hh>
+#include <assert.h>
+
+//### NBJob - static members #########################
+time_t akg::NbJob::timeOutInterv = 30;
+time_t akg::NbJob::currentTime = 0;
+
+void akg::NbJob::setCurrentTime() throw()
+ {
+ currentTime = time(NULL);
+ }
+
+void akg::NbJob::setTimeoutInterval(time_t x) throw()
+ {
+ timeOutInterv = x;
+ }
+
+time_t akg::NbJob::getTimeoutInterval() throw()
+ {
+ return timeOutInterv;
+ }
+
+//####################################################
+akg::NbJob::NbJob(FileDescriptor &fd) throw()
+:fdRef(fd)
+ {
+ status = wks_notdefined;
+ selectorPtr = NULL;
+ currentBufferPtr = NULL;
+ lastActionTime = 0;
+ }
+
+akg::NbJob::~NbJob() throw()
+ {
+ }
+
+akg::NbJob::workingStatus akg::NbJob::getStatus() throw()
+ {
+ return status;
+ }
+
+bool akg::NbJob::isOperationPending() throw()
+ {
+ return (status != wks_notdefined &&
+ status != wks_accepting) ? true:false;
+ }
+
+bool akg::NbJob::isAccepting() throw()
+ {
+ return status == wks_accepting ? true:false;
+ }
+bool akg::NbJob::isReading() throw()
+ {
+ return status == wks_reading ? true:false;
+ }
+bool akg::NbJob::isWriting() throw()
+ {
+ return status == wks_writing ? true:false;
+ }
+bool akg::NbJob::isProcessing() throw()
+ {
+ return status == wks_processing ? true:false;
+ }
+
+bool akg::NbJob::readPartialMessage() throw()
+ {
+ assert(currentBufferPtr != NULL);
+
+ action();
+
+ int nbytes = currentBufferPtr->read(fdRef);
+
+ if(nbytes>0)
+ {
+ DBTALK("..read socket("<<fdRef()<<") "<<nbytes);
+ return validateMessage();
+ }
+
+ else
+ { int saveerrno = fdRef.getErrno();
+ switch(saveerrno)
+ {
+ case EINTR: //DBTALK("EINTR, retry please");
+ break;
+
+ case EAGAIN: //DBTALK("EAGAIN, retry please");
+ break;
+
+ //case 0: DBTALK("Premature End-of-file");
+ // executeOnReadError() ???
+ // break;
+
+ default: DBTALK("Read error "<<saveerrno);
+ executeOnReadError();
+ break;
+ }
+ }
+ return false;
+ }
+
+bool akg::NbJob::writePartialMessage() throw()
+ {
+ assert(currentBufferPtr != NULL);
+
+ action();
+ int nbytes = currentBufferPtr->write(fdRef);
+
+ if(nbytes>0)
+ {
+ DBTALK("..write socket("<<fdRef()<<") "<<nbytes);
+
+ if(currentBufferPtr->getNotSendedSize()==0)
+ {
+ DBTALK("Write ready");
+ executeOnWriteReady();
+ return true;
+ }
+ }
+ else
+ {int saveerrno = fdRef.getErrno();
+ switch(saveerrno)
+ {
+ case EINTR: //DBTALK("EINTR, retry please");
+ break;
+
+ case EAGAIN: //DBTALK("EAGAIN, retry please");
+ break;
+
+ //case 0: DBTALK("Premature partner hang up"); //?? valabil la write
+ // break;
+
+ default: DBTALK("Write error "<<saveerrno);
+ executeOnWriteError();
+ break;
+ }
+ }
+ return false;
+ }
+
+bool akg::NbJob::cleanUpIfTimeout() throw()
+ {
+ if(fdRef.isOpen() == false ) return false;
+ if(lastActionTime + timeOutInterv > currentTime) return false;
+
+ DBTALK("Client socket "<<fdRef()<<" timeout");
+ clearConnection();
+
+ //********************
+ specificCleanUpOnTimeout();
+ //********************
+ return true;
+ }
+
+void akg::NbJob::clearConnection() throw()
+ {
+ if(fdRef.isOpen() && selectorPtr)
+ { selectorPtr->clearRead(fdRef());
+ selectorPtr->clearWrite(fdRef());
+ fdRef.close();
+ }
+ }
+
+void akg::NbJob::action() throw()
+ { lastActionTime = currentTime;
+ }
+
+int akg::NbJob::getSocket() throw()
+ { return fdRef();
+ }
+
+void akg::NbJob::executeOnAccept() throw()
+ {
+ }
+bool akg::NbJob::setReading() throw()
+ {
+ if(selectorPtr == NULL) return false;
+ selectorPtr->setRead(fdRef());
+ status = wks_reading;
+ return true;
+ }
+
+bool akg::NbJob::setWriting() throw()
+ {
+ if(selectorPtr == NULL) return false;
+ selectorPtr->setWrite(fdRef());
+ status = wks_writing;
+ return true;
+ }
+
+int akg::NbJob::getErrno() throw()
+ {
+ return fdRef.getErrno();
+ }
+
+//##################################################################
+akg::NbServerJob::NbServerJob() throw()
+:NbJob(serverSocket)
+ {
+ }
+
+void akg::NbServerJob::initOnAttach(Selector *pSelector) throw()
+ {
+ selectorPtr = pSelector;
+ }
+
+
+akg::NbJob::acceptStatus akg::NbServerJob::acceptConnection(ListenSocket& listenSocket) throw()
+ {
+ DBTALK("Am intrat in accepting");
+ assert(currentBufferPtr != NULL);
+
+ if(status != wks_accepting) return acs_Iambusy;
+ action();
+
+ if(serverSocket.acceptFrom(listenSocket) == false)
+ {
+ int saveerrno = serverSocket.getErrno();
+ if(saveerrno==EAGAIN) DBTALK("No pending connections");
+ else DBTALK("Accept error "<<saveerrno);
+ return acs_nopending;
+ }
+
+ serverSocket.setNonBlocking(true);
+
+ setReading();
+
+ executeOnAccept();
+ DBTALK("Accept: Socket="<<fdRef());
+ return acs_accepted;
+ }
+
+
+akg::SocketAddress akg::NbServerJob::getClientSocketAddress() throw()
+ {
+ return serverSocket.getPeerAddress();
+ }
+
+akg::HostAddress akg::NbServerJob::getClientHostAddress() throw()
+ {
+ return serverSocket.getPeerAddress().getHostAddress();
+ }
+
+void akg::NbServerJob::readyToWriteAnswer() throw()
+ {
+ currentBufferPtr->clearToWrite();
+
+ selectorPtr->clearRead(serverSocket());
+ selectorPtr->setWrite(serverSocket());
+ action();
+
+ status = wks_writing;
+ }
+//##################################################################
+
+akg::NbClientJob::NbClientJob() throw()
+:NbJob(clientSocket)
+ {
+ }
+
+bool akg::NbClientJob::connectToServer(const char* serverHost,int serverPort) throw()
+ {
+ if(clientSocket.open(serverHost,serverPort))
+ {
+ clientSocket.setNonBlocking(true);
+ selectorPtr->setWrite(clientSocket());
+ status = wks_writing;
+ action();
+ return true;
+ }
+ return false;
+ }
+
+void akg::NbClientJob::initOnAttach(Selector *pselector) throw()
+ {
+ selectorPtr = pselector;
+
+ if(status == wks_writing)
+ { selectorPtr->setWrite(clientSocket());
+ }
+ }
+
+akg::NbJob::acceptStatus
+akg::NbClientJob::acceptConnection(ListenSocket&) throw()
+ {
+ // we don't accept connections
+ return acs_Iambusy;
+ }
+
+void akg::NbClientJob::readyToReadAnswer() throw()
+ {
+ currentBufferPtr->clearToRead();
+
+ selectorPtr->clearWrite(clientSocket());
+ selectorPtr->setRead(clientSocket());
+ action();
+
+ status = wks_reading;
+ }
+
+//##################################################################
+akg::NbCommunicator::NbCommunicator() throw()
+ {
+ maxJobs = 0;
+ jobPtr = NULL;
+ }
+akg::NbCommunicator::NbCommunicator(int newMaxJobs)
+ {
+ jobPtr = NULL;
+ initJobs(newMaxJobs);
+ }
+
+bool akg::NbCommunicator::initJobs(int newMaxJobs)
+ {
+ if(jobPtr != NULL) return false;
+ maxJobs = newMaxJobs;
+ jobPtr = new JobPtr[maxJobs];
+
+ for(int i=0;i<maxJobs;i++)
+ { jobPtr[i]=0;
+ }
+ return true;
+ }
+
+akg::NbCommunicator::~NbCommunicator() throw()
+ {
+ if(jobPtr != NULL) delete[] jobPtr;
+ }
+
+bool akg::NbCommunicator::attachJob(NbJob &newJob) throw()
+ {
+ int freeSlot = -1;
+ for(int i=0;i<maxJobs;i++)
+ {
+ if(jobPtr[i]== &newJob) return false; // job e in lista
+ if(jobPtr[i]== NULL && freeSlot ==-1 ) freeSlot = i;
+ }
+ if(freeSlot ==-1 ) return false;
+
+ jobPtr[freeSlot]= &newJob;
+ newJob.initOnAttach(&selector);
+ return true;
+ }
+
+bool akg::NbCommunicator::deattachJob(NbJob &oldJob) throw()
+ {
+ for(int i=0;i<maxJobs;i++)
+ {
+ if(jobPtr[i]== &oldJob)
+ {
+ jobPtr[i] = NULL;
+ oldJob.clearConnection();
+ oldJob.initOnAttach(NULL);
+ return true;
+ }
+ }
+ return false;
+ }
+
+bool akg::NbCommunicator::mayExit() throw()
+ {
+ if(exitRequest == false) return false;
+
+ closeSocket(listenSocket); // we don't accept requests any more
+
+ for(int i=0;i<maxJobs;i++)
+ {
+ if(jobPtr[i] == NULL) continue;
+ if(jobPtr[i]->isOperationPending()) return false; // no, we have pending
+ }
+ return true; // ok, we may exit
+ }
+
+bool akg::NbCommunicator::runServer() throw()
+ {
+ if(listenPort == 0) return false;
+
+ if(initListenSocket(listenPort,true) == false) return false;
+
+ return mainLoop();
+ }
+
+bool akg::NbCommunicator::runClient() throw()
+ {
+ return mainLoop();
+ }
+
+bool akg::NbCommunicator::mainLoop() throw()
+ {
+ exitRequest=false;
+
+ while( mayExit() == false)
+ {
+ DBTALK("Waiting for calls");
+
+ if(executeBeforeSelect() == false) return false;
+
+ int rasp = selector();
+
+ akg::NbJob::setCurrentTime();
+
+ if(executeAfterSelect() == false) return false;
+
+ if(rasp > 0)
+ {
+ DBTALK("Ringing");
+ // first this, to increase the chance to free a client
+ dispatchWriteRequest();
+ connectNewClients();
+ dispatchReadRequest();
+ processJobs();
+ lookForTimeout(); // important!
+ }
+ if(rasp == 0)
+ {
+ DBTALK("Timeout");
+ lookForTimeout();
+ if(executeOnTimeout() == false) return false;
+ }
+ if(rasp < 0)
+ {
+ DBTALK("select error: "<<strerror(errno));
+ }
+ }
+ return true;
+ }
+
+void akg::NbCommunicator::processJobs() throw()
+ {
+ DBTALK("process Jobs - entering");
+
+ for(int i=0;i<maxJobs;i++)
+ {
+ if(jobPtr[i]==NULL) continue;
+
+ JobPtr& currentJob = jobPtr[i];
+
+ if(currentJob->isProcessing())
+ {
+ DBTALK("job "<<i<<" is processing");
+
+ currentJob->processRequest();
+ }
+ }
+ }
+
+void akg::NbCommunicator::lookForTimeout() throw()
+ {
+ DBTALK("Looking for timeout");
+
+ for(int i=0;i<maxJobs;i++)
+ {
+ if(jobPtr[i]==NULL) continue;
+
+ jobPtr[i]->cleanUpIfTimeout();
+ }
+ }
+
+void akg::NbCommunicator::dispatchWriteRequest() throw()
+ {
+ DBTALK("Dispatch writing");
+ int i;
+ for(i=0;i<maxJobs;i++)
+ {
+ if(jobPtr[i]==NULL) continue;
+
+ JobPtr& currentJob = jobPtr[i];
+
+ if(currentJob->isWriting())
+ {
+ DBTALK("job "<<i<<' '<<currentJob->getSocket()<<" is active");
+ if(selector.isWrite(currentJob->getSocket()))
+ {
+ DBTALK("...and may write ");
+ currentJob->writePartialMessage();
+ }
+ }
+ }
+ }
+
+void akg::NbCommunicator::dispatchReadRequest() throw()
+ {
+ DBTALK("Dispatch reading");
+ int i;
+ for(i=0;i<maxJobs;i++)
+ {
+ if(jobPtr[i]==NULL) continue;
+
+ JobPtr& currentJob = jobPtr[i];
+
+ if(currentJob->isReading())
+ {
+ DBTALK("job "<<i<<' '<<currentJob->getSocket()<<" is active");
+ if(selector.isRead(currentJob->getSocket()))
+ {
+ DBTALK("... and has message");
+ currentJob->readPartialMessage();
+ }
+ }
+ }
+ }
+
+void akg::NbCommunicator::connectNewClients() throw()
+ {
+ DBTALK("connect listenSocket="<<listenSocket());
+
+ if(selector.isRead(listenSocket()) == false) return;
+
+ DBTALK("Client is calling");
+
+ akg::NbJob::acceptStatus status;
+
+ for(int i=0;i<maxJobs;i++)
+ {
+ if(jobPtr[i] == NULL) continue;
+
+ JobPtr& currentJob = jobPtr[i];
+
+ if(currentJob->isAccepting())
+ {
+ // we try to connect as much pending connections as possible
+ status = currentJob->acceptConnection(listenSocket);
+
+ if(status == akg::NbJob::acs_nopending ) break;
+ // there is no pending request,
+ DBTALK("Connected client "<<i<<" on socket "<<currentJob->getSocket());
+ }
+ }
+ }
+
+bool akg::NbCommunicator::executeBeforeSelect() throw()
+ {
+ // false means server exit immediately
+ return true;
+ }
+
+bool akg::NbCommunicator::executeAfterSelect() throw()
+ {
+ // false means server exit immediately
+ return true;
+ }
+
+bool akg::NbCommunicator::executeOnTimeout() throw()
+ {
+ // false means server exit immediately
+ return true;
+ }
+
diff --git a/network/akgnet_nbcomm.hh b/network/akgnet_nbcomm.hh
new file mode 100644
index 0000000..441d8e4
--- /dev/null
+++ b/network/akgnet_nbcomm.hh
@@ -0,0 +1,370 @@
+/*
+* This file is part of rasdaman community.
+*
+* Rasdaman community is free software: you can redistribute it and/or modify
+* it under the terms of the GNU General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* Rasdaman community is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with rasdaman community. If not, see <http://www.gnu.org/licenses/>.
+*
+* Copyright 2003, 2004, 2005, 2006, 2007, 2008, 2009 Peter Baumann /
+rasdaman GmbH.
+*
+* For more information please see <http://www.rasdaman.org>
+* or contact Peter Baumann via <baumann@rasdaman.com>.
+/
+/**
+ * INCLUDE: akgnet_nbcomm.hh
+ *
+ * MODULE: akg network
+ * CLASS: NbJob, NbServerJob, NbClientJob, NbCommunicator
+ *
+ * COMMENTS:
+ * Namespace akg
+ *
+*/
+
+#ifndef AKGNET_NBCOMM_HH
+#define AKGNET_NBCOMM_HH
+
+#include "akgnet_server.hh"
+
+namespace akg
+ {
+
+/** Base class for non-blocking communication jobs
+*/
+
+class NbJob
+ {
+ public:
+ /** Static function for setting the current time. Used
+ for marking the last action time, so timeout can be monitorized
+ */
+ static void setCurrentTime() throw();
+
+ /** Static function for setting the timeout interval
+ We use the same timeout for all jobs because the
+ server doesn't do any distinction between jobs
+ */
+ static void setTimeoutInterval(time_t x) throw();
+
+ /// Returns the timeout interval set for the jobs
+ static time_t getTimeoutInterval() throw();
+
+ public:
+ /// Status regarding accepting a new job
+ enum acceptStatus
+ { acs_nopending = 0,
+ acs_Iambusy = 1,
+ acs_accepted = 2
+ };
+ /// Status during the lifetime of a job
+ enum workingStatus
+ { wks_notdefined = 0,
+ wks_accepting = 1, // job is ready to accept a connection
+ wks_reading = 2, // job is reading data
+ wks_writing = 3, // job is writing data
+ wks_processing = 4 // job is processing the request
+ };
+
+ virtual ~NbJob() throw();
+ /// Returns the working status
+ workingStatus getStatus() throw();
+
+ /** Returns true if there is an operation in progress
+ this means reading, writing or processing
+ */
+ bool isOperationPending() throw();
+
+ /// Returns true if the job is ready to accept a connection
+ bool isAccepting() throw();
+
+ /// Returns true if the job is reading data
+ bool isReading() throw();
+
+ /// Returns true if the job is writing data
+ bool isWriting() throw();
+
+ /// Returns true if the job is processing
+ bool isProcessing() throw();
+
+ /** Pure function to do initialization when attached to a Selector
+ Don't throw!
+ */
+ virtual void initOnAttach(Selector *pselector) throw() =0;
+
+ /** Pure function to do initialization when accepting a connection
+ Returns:
+ - acs_nopending when there is connection pending
+ - acs_Iambusy when the job can't accept this connection
+ - acs_accepted when the connection was accepted
+ Assert:
+ the 'currentBufferPtr' is initialized. This would be a software error
+ Don't throw!
+ */
+ virtual acceptStatus acceptConnection(ListenSocket& listenSocket) throw() =0;
+
+ /** Reads as much data as it can. After every read it calls the
+ 'validateMessage()' function and returns whatever this function returns.
+ If there is a read error, other than EINTR or EAGAIN, the function
+ 'executeOnReadError()' is called
+ Returns 'true' if the message is completelly red
+ Returns 'false' if there should be some more data
+ */
+ bool readPartialMessage() throw();
+
+ /** Writes as much data as it can. After writing all data the function
+ 'executeOnWriteReady()' is called.
+ If there is a write error, other than EINTR or EAGAIN, the function
+ 'executeOnWriteError()' is called
+ Returns 'true' if the message is completelly written
+ Returns 'false' if there should be some more data to write
+ */
+ bool writePartialMessage() throw();
+
+ /// Clears the connection - closes the socket and removes it from the Selector
+ void clearConnection() throw();
+
+ /// Returns the OS file descriptor for the socket
+ int getSocket() throw();
+
+ /// Returns the errno of the last socket operation
+ int getErrno() throw();
+ //######################################
+ /** Virtual function to clean up if timeout occured
+ This version returns 'false' if no timeout or no connection
+ If timeout it clears the connection and calls
+ 'specificCleanUpOnTimeout()'
+ Don't throw!
+ */
+ virtual bool cleanUpIfTimeout() throw();
+
+ /** Pure function to process the request
+ It has to set the appropriate status, so the server
+ knows how to continue with this job
+ Don't throw!
+ */
+ virtual void processRequest() throw() =0;
+ //######################################
+ protected:
+ /// called after every read, returns 'true' if the message is all here
+ virtual bool validateMessage() throw() =0;
+
+ /// called when client is accepted, default does nothing
+ virtual void executeOnAccept() throw();
+
+ /// called when message is written
+ virtual void executeOnWriteReady() throw() =0;
+
+ /// called when timeout, it has to set the status apropriate and do other cleanup
+ virtual void specificCleanUpOnTimeout() throw() =0;
+
+ /// called when a read error occurs, usual a message and clean up
+ virtual void executeOnReadError() throw() =0;
+
+ /// called when a write error occurs, usual a message and clean up
+ virtual void executeOnWriteError() throw() =0;
+ //######################################
+ protected:
+ /// Protected constructor, taking a FileDescriptor, usually a Socket
+ NbJob(FileDescriptor&) throw() ;
+
+ /// Helper function for setting the job in read modus
+ bool setReading() throw();
+
+ /// Helper function for setting the job in write modus
+ bool setWriting() throw();
+ workingStatus status;
+
+ /** Reference to a FileDescriptor, usually a Socket. It has to be provided by the
+ derived class
+ */
+ FileDescriptor &fdRef;
+
+ /** Pointer to a Selector, which has to be provided by the Server object
+ to which this job is attached
+ */
+ Selector *selectorPtr;
+
+ /** Pointer to a CommBuffer, which has to be provided by the derived class
+ */
+ CommBuffer *currentBufferPtr;
+
+ // for timeout
+ time_t lastActionTime;
+
+ /// Helper function which marks the current moment, so timeout counter is reset
+ void action() throw();
+
+ static time_t timeOutInterv;
+ static time_t currentTime;
+ };
+
+/* Base class for generic non-blocking server jobs
+ */
+class NbServerJob : public NbJob
+ {
+ public:
+ /// Default constructor
+ NbServerJob() throw();
+
+ /** The version for servers, it just initializes the 'Selector*'
+ It doesn't have to be overloaded, it's OK for servers
+ */
+ void initOnAttach(Selector *pselector) throw();
+
+ /** The version for servers
+ It doesn't have to be overloaded, it's OK for most servers
+ */
+ acceptStatus acceptConnection(ListenSocket& listenSocket) throw();
+
+ /// Returns the SocketAddress of the client
+ SocketAddress getClientSocketAddress() throw();
+
+ /// Returns the HostAddress of the client
+ HostAddress getClientHostAddress() throw();
+ protected:
+
+ /// helper function, call it in "processRequest" to switch to writing
+ void readyToWriteAnswer() throw();
+
+ ServerSocket serverSocket;
+ };
+
+/* Base class for generic non-blocking client jobs
+ */
+class NbClientJob : public NbJob
+ {
+ public:
+ /// Default constructor
+ NbClientJob() throw();
+
+ /// Returns 'true' if connection succeded
+ bool connectToServer(const char* serverHost, int serverPort) throw();
+
+
+ /** The version for clients, it initializes the 'Selector*'
+ and prepares for writing. It has to be called AFTER the
+ connection to the server succeded!
+ It doesn't have to be overloaded, it's OK for most clients
+ */
+ void initOnAttach(Selector *pselector) throw();
+
+ /** The version for clients. It just returns 'acs_Iambusy' since
+ clients don't accept connections
+ It doesn't have to be overloaded, it's OK for most clients
+ */
+ acceptStatus acceptConnection(ListenSocket& listenSocket) throw();
+ protected:
+
+ /// helper function, call it in 'executeOnWriteReady()' to switch to reading
+ void readyToReadAnswer() throw();
+
+ ClientSocket clientSocket;
+ };
+
+/**
+ The heart of the non-blocking communication. It's derived from GenericServer but
+ it is called 'Communicator' since it not only for servers but also for clients
+ You can use this class for most communication purposes, special ones have to
+ reimplement 'executeBeforeSelect()' or 'executeOnTimeout()'
+ A better implementation should use 'vector'
+ Important: new is supposed to throw!
+ */
+
+class NbCommunicator : public GenericServer
+ {
+ public:
+ /// Default constructor
+ NbCommunicator() throw();
+
+ /// Constructor setting also the maximal number of simultan jobs
+ NbCommunicator(int newMaxJobs);
+
+ /// Destructor
+ ~NbCommunicator() throw();
+
+ /// Sets the maximal number of simultan jobs
+ bool initJobs(int newMaxJobs);
+
+ /// returns the maximal number of simultan jobs
+ int getMaxJobs() throw();
+
+ /** Attaches a new job. Returns 'true' if succeded, 'false' if the
+ job is already attached or if the maximal number of jobs
+ is already attached
+ */
+ bool attachJob(NbJob&) throw();
+
+ /** Deattach job. Returns 'true' if succeded, 'false' if the
+ job is not attached
+ */
+ bool deattachJob(NbJob&) throw();
+
+ /// This runs the main loop for servers, this means it initializes the listen socket first
+ bool runServer() throw();
+
+ /// This runs the main loop for clients, this means without initializing the listen socket
+ bool runClient() throw();
+ protected:
+ /** Called before select, if it returns 'false' the loop exits.
+ This version just returns 'true'
+ */
+ virtual bool executeBeforeSelect() throw();
+
+ /** Called after select, if it returns 'false' the loop exits.
+ This version just returns 'true'
+ */
+ virtual bool executeAfterSelect() throw();
+ /** Called if select times out, if it returns 'false' the loop exits.
+ This version just returns 'true'
+ */
+ virtual bool executeOnTimeout() throw();
+ private:
+ typedef NbJob *JobPtr;
+
+ JobPtr *jobPtr;
+ int maxJobs;
+
+ /** The main loop of the communication: it waits for clients, dispatches them to the
+ accepting jobs, then offers the jobs the possibility to read, process and write
+ It returns 'false' only if 'executeBeforeSelect()' or 'executeOnTimeout()'
+ return 'false'
+ Otherwise it returns 'true'
+ */
+ bool mainLoop() throw();
+
+ /// Helper function for dispatching read requests
+ void dispatchReadRequest() throw();
+
+ /// Helper function for dispatching write requests
+ void dispatchWriteRequest() throw();
+
+ /// Helper function for dispatching connect requests
+ void connectNewClients() throw();
+
+ /// Helper function which looks for timeouted jobs
+ void lookForTimeout() throw();
+
+ /// Helper function which calls 'processRequest()' of all jobs that are processing
+ void processJobs() throw();
+
+ /** Helper function which returns 'true' if somebody called 'shouldExit()'
+ and there is no job which processes anything. But it closes the listen socket
+ so no more jobs are accepted and returns 'true' when all jobs finish
+ */
+ bool mayExit() throw();
+
+ };
+
+} //namespace
+#endif
+
diff --git a/network/akgnet_selector.cc b/network/akgnet_selector.cc
new file mode 100644
index 0000000..7ad2f64
--- /dev/null
+++ b/network/akgnet_selector.cc
@@ -0,0 +1,97 @@
+/*
+* This file is part of rasdaman community.
+*
+* Rasdaman community is free software: you can redistribute it and/or modify
+* it under the terms of the GNU General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* Rasdaman community is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with rasdaman community. If not, see <http://www.gnu.org/licenses/>.
+*
+* Copyright 2003, 2004, 2005, 2006, 2007, 2008, 2009 Peter Baumann /
+rasdaman GmbH.
+*
+* For more information please see <http://www.rasdaman.org>
+* or contact Peter Baumann via <baumann@rasdaman.com>.
+*/
+
+#include "akgnet_selector.hh"
+
+
+akg::Selector::Selector() throw()
+ {
+ FD_ZERO(& watchReadFdSet);
+ FD_ZERO(& watchWriteFdSet);
+ FD_ZERO(& watchExceptFdSet);
+ tvptr = NULL;
+ }
+void akg::Selector::setTimeout(int sec,int milisec) throw()
+ { tvinit.tv_sec=sec;
+ tvinit.tv_usec=milisec;
+ tvptr=&tv; // yes, yes, &tv
+ }
+void akg::Selector::disableTimeout() throw()
+ { tvptr=NULL;
+ }
+void akg::Selector::setRead(int fdescr) throw()
+ {
+ if(fdescr < 0) return;
+ FD_SET(fdescr,&watchReadFdSet);
+ }
+void akg::Selector::clearRead(int fdescr) throw()
+ {
+ if(fdescr < 0) return;
+ FD_CLR(fdescr,&watchReadFdSet);
+ }
+void akg::Selector::setWrite(int fdescr) throw()
+ {
+ if(fdescr < 0) return;
+ FD_SET(fdescr,&watchWriteFdSet);
+ }
+void akg::Selector::clearWrite(int fdescr) throw()
+ {
+ if(fdescr < 0) return;
+ FD_CLR(fdescr,&watchWriteFdSet);
+ }
+
+int akg::Selector::operator()() throw()
+ {
+ resultReadFdSet =watchReadFdSet;
+ resultWriteFdSet=watchWriteFdSet;
+ // error unused
+ // tv has to be reloaded every time; if tvptr is NULL it doesn't matter
+ tv.tv_sec = tvinit.tv_sec;
+ tv.tv_usec = tvinit.tv_usec;
+
+ return select(FD_SETSIZE,&resultReadFdSet,&resultWriteFdSet,NULL,tvptr);
+ }
+
+bool akg::Selector::isRead(int fdescr) throw()
+ {
+ if(fdescr < 0) return false;
+ return FD_ISSET(fdescr,&resultReadFdSet);
+ }
+bool akg::Selector::isWrite(int fdescr) throw()
+ {
+ if(fdescr < 0) return false;
+ return FD_ISSET(fdescr,&resultWriteFdSet);
+ }
+
+void akg::Selector::closeForcedAllFileDescriptors() throw()
+ {
+ for(int i=0;i<FD_SETSIZE;i++)
+ {
+ if(FD_ISSET(i,&watchReadFdSet) || FD_ISSET(i,&watchWriteFdSet))
+ {
+ close(i);
+ }
+ }
+ }
+
+
diff --git a/network/akgnet_selector.hh b/network/akgnet_selector.hh
new file mode 100644
index 0000000..fde8462
--- /dev/null
+++ b/network/akgnet_selector.hh
@@ -0,0 +1,95 @@
+/*
+* This file is part of rasdaman community.
+*
+* Rasdaman community is free software: you can redistribute it and/or modify
+* it under the terms of the GNU General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* Rasdaman community is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with rasdaman community. If not, see <http://www.gnu.org/licenses/>.
+*
+* Copyright 2003, 2004, 2005, 2006, 2007, 2008, 2009 Peter Baumann /
+rasdaman GmbH.
+*
+* For more information please see <http://www.rasdaman.org>
+* or contact Peter Baumann via <baumann@rasdaman.com>.
+/
+/**
+ * INCLUDE: akgnet_selector.hh
+ *
+ * MODULE: akg network
+ * CLASS: Selector
+ *
+ * COMMENTS:
+ * Namespace akg
+ *
+*/
+
+#ifndef AKGNET_SELECTOR
+#define AKGNET_SELECTOR
+
+#include "akgnet_common.hh"
+
+namespace akg
+ {
+
+/** This class envelops the 'select' function of the C-library
+*/
+
+class Selector
+ {
+ public:
+ /// Default constructor
+ Selector() throw();
+
+ /// Sets the timeout interval
+ void setTimeout(int sec,int milisec) throw();
+
+ /// Disables the timeout
+ void disableTimeout() throw();
+
+ /// Registers the file descriptor for reading
+ void setRead(int fdescr) throw();
+ /// Unregisters the file descriptor from reading
+ void clearRead(int fdescr) throw();
+
+ /// Registers the file descriptor for writing
+ void setWrite(int fdescr) throw();
+ /// Unregisters the file descriptor from writing
+ void clearWrite(int fdescr) throw();
+
+ /// The real 'select' operation. The return value is the one of 'select'
+ int operator()() throw();
+
+ /// Returns true if the file descriptor is registered for read
+ bool isRead(int fdescr) throw();
+ /// Returns true if the file descriptor is registered for write
+ bool isWrite(int fdescr) throw();
+
+ /** Closes all file descriptors. Usefull when forking so
+ child processes don't inherit this file descriptors
+ */
+ void closeForcedAllFileDescriptors() throw();
+ private:
+ fd_set watchReadFdSet;
+ fd_set watchWriteFdSet;
+ fd_set watchExceptFdSet; // unused but ...
+
+ fd_set resultReadFdSet;
+ fd_set resultWriteFdSet;
+ fd_set resultExceptFdSet; // unused but ...
+
+ struct timeval tvinit;
+ struct timeval tv;
+ timeval *tvptr;
+
+ };
+
+} // namespace
+#endif
diff --git a/network/akgnet_server.cc b/network/akgnet_server.cc
new file mode 100644
index 0000000..8fcc5e0
--- /dev/null
+++ b/network/akgnet_server.cc
@@ -0,0 +1,173 @@
+/*
+* This file is part of rasdaman community.
+*
+* Rasdaman community is free software: you can redistribute it and/or modify
+* it under the terms of the GNU General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* Rasdaman community is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with rasdaman community. If not, see <http://www.gnu.org/licenses/>.
+*
+* Copyright 2003, 2004, 2005, 2006, 2007, 2008, 2009 Peter Baumann /
+rasdaman GmbH.
+*
+* For more information please see <http://www.rasdaman.org>
+* or contact Peter Baumann via <baumann@rasdaman.com>.
+/
+/**
+ * SOURCE: akgnet_server.cc
+ *
+ * MODULE: akg network
+ * CLASS: GenericServer, BlockingServer
+ *
+ * COMMENTS:
+ *
+ */
+
+#include "debug.hh"
+
+#include <akgnet_server.hh>
+
+//#include<iostream>
+
+
+akg::GenericServer::GenericServer() throw()
+ {
+ listenPort = 0;
+ exitRequest = false;
+ }
+akg::GenericServer::~GenericServer() throw()
+ {
+ }
+
+void akg::GenericServer::setListenPort(int x) throw()
+ {
+ listenPort = x;
+ }
+int akg::GenericServer::getListenPort() throw()
+ {
+ return listenPort;
+ }
+
+void akg::GenericServer::setTimeout(int sec,int milisec) throw()
+ { selector.setTimeout(sec,milisec);
+ }
+void akg::GenericServer::disableTimeout() throw()
+ { selector.disableTimeout();
+ }
+
+void akg::GenericServer::shouldExit() throw()
+ {
+ exitRequest = true;
+ }
+
+
+bool akg::GenericServer::initListenSocket(int port, bool nonblocking) throw()
+ {
+ if(listenSocket.open(port) == false) return false;
+
+ if(nonblocking) listenSocket.setNonBlocking(true);
+
+ selector.setRead(listenSocket());
+
+ DBTALK("Listen socket="<<listenSocket());
+ return true;
+ }
+
+bool akg::GenericServer::connectNewClient(ServerSocket &s) throw()
+ {
+ if(s.acceptFrom(listenSocket)==false) return false;
+
+ selector.setRead(s());
+
+ return true;
+ }
+void akg::GenericServer::closeSocket(Socket &s) throw()
+ {
+ if(s.isOpen())
+ { selector.clearRead(s());
+ selector.clearWrite(s());
+ s.close();
+ }
+ }
+
+
+//###########################
+
+akg::BlockingServer::BlockingServer() throw()
+ {
+ }
+akg::BlockingServer::~BlockingServer() throw()
+ {
+ }
+
+bool akg::BlockingServer::runServer() throw()
+{
+ ENTER( "akg::BlockingServer::runServer" );
+
+ if(listenPort == 0)
+ {
+ LEAVE( "akg::BlockingServer::runServer, listenPort=0, result=false" );
+ return false;
+ }
+
+ if(initListenSocket(listenPort,false)==false)
+ {
+ LEAVE( "akg::BlockingServer::runServer, Error: init socket failed for port " << listenPort << ", result=false" );
+ return false;
+ }
+
+ while(exitRequest == false)
+ {
+ int rasp = selector();
+ TALK( "rasp=" << rasp );
+ if(rasp>0)
+ {
+ if(serverSocket.isOpen())
+ {
+ TALK( "socket is open." );
+ if(selector.isRead(serverSocket()))
+ {
+ TALK( "socket is readable, executing request." );
+ executeRequest(serverSocket);
+ TALK( "closing socket." );
+ closeSocket(serverSocket);
+ TALK( "socket closed." );
+ }
+ }
+ else if(selector.isRead(listenSocket()))
+ {
+ TALK( "socket not open, but readable (???). connecting new client." );
+ connectNewClient(serverSocket);
+ TALK( "after client connect." );
+ // we don't care why it could fail
+ }
+ else
+ {
+ TALK( "no read socket - should never have reached this point." );
+ }
+ }
+
+ if(rasp == 0)
+ {
+ TALK( "exec timeout" );
+ executeTimeout();
+ }
+
+ if(rasp<0)
+ {
+ TALK( "Internal connect error: bad selector." );
+ }
+ }
+
+ LEAVE( "akg::BlockingServer::runServer, result=true" );
+ return true;
+}
+
+
diff --git a/network/akgnet_server.hh b/network/akgnet_server.hh
new file mode 100644
index 0000000..40d9129
--- /dev/null
+++ b/network/akgnet_server.hh
@@ -0,0 +1,157 @@
+/*
+* This file is part of rasdaman community.
+*
+* Rasdaman community is free software: you can redistribute it and/or modify
+* it under the terms of the GNU General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* Rasdaman community is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with rasdaman community. If not, see <http://www.gnu.org/licenses/>.
+*
+* Copyright 2003, 2004, 2005, 2006, 2007, 2008, 2009 Peter Baumann /
+rasdaman GmbH.
+*
+* For more information please see <http://www.rasdaman.org>
+* or contact Peter Baumann via <baumann@rasdaman.com>.
+/
+/**
+ * INCLUDE: akgnet_server.hh
+ *
+ * MODULE: akg network
+ * CLASS: GenericServer, BlockingServer
+ *
+ * COMMENTS:
+ * Namespace akg
+ *
+*/
+
+#ifndef AKGNET_SERVER_HH
+#define AKGNET_SERVER_HH
+
+//#### Includes #################################
+
+#include "akgnet_socket.hh"
+#include "akgnet_selector.hh"
+#include "akgnet_commbuffer.hh"
+
+//###############################################
+
+
+namespace akg
+ {
+/** Abstract base class for servers. Offers basic functionality
+ for opening the listen socket and accepting a new connection
+ and other helper functions for more evoluate servers
+*/
+class GenericServer
+ {
+ public:
+ /// Default constructor
+ GenericServer() throw();
+
+ /// Destructor
+ virtual ~GenericServer() throw();
+
+ //*************************
+ /** Pure function to run the server. Has to initialize
+ the listen socket, than makes a loop by listening,
+ accepting and dispatching the connection
+ for processing. It should'n throw, it has to handle
+ correcty every exception
+ */
+ virtual bool runServer() throw() =0;
+ //*************************
+
+ /// Instructs the server to leave the loop (runServer())
+ void shouldExit() throw();
+
+ /// Sets the listen port
+ void setListenPort(int) throw();
+
+ /// Returns the listen port
+ int getListenPort() throw();
+
+ /** Sets the timeout, how much time the selector should
+ wait for incomming requests
+ */
+ void setTimeout(int sec,int milisec) throw();
+
+ /// Disables timeout, means wait unlimited
+ void disableTimeout() throw();
+
+ protected:
+ /// Init the listen socket
+ bool initListenSocket(int port, bool nonblocking) throw();
+
+ /** Connects a new client by accepting the connection
+ and setting the ServerSocket in read modus
+ */
+ bool connectNewClient(ServerSocket&) throw();
+
+ /** Closes the given Socket and removes it
+ from the Selector
+ */
+ void closeSocket(Socket&) throw();
+
+ ListenSocket listenSocket;
+ int listenPort;
+
+ Selector selector;
+
+ bool exitRequest;
+
+ private:
+ /// unimplemented, objects of this type can't be copied
+ GenericServer(const GenericServer&);
+ /// unimplemented, objects of this type can't be copied
+ GenericServer& operator=(const GenericServer&);
+ };
+
+
+/** Base class for a simple blocking server, capable
+ of dealing with a single client. Don't use except for
+ very simple cases.
+ This version doesn't care much about errors
+*/
+class BlockingServer : public GenericServer
+ {
+ public:
+ /// Default constructor
+ BlockingServer() throw();
+ /// Destructor
+ ~BlockingServer() throw();
+
+ /** runs the server. Accepts only one connection
+ and blocks until the request is done
+ */
+ bool runServer() throw();
+ protected:
+ //************************************************
+ /** Pure function to process the request. It has to read,
+ process and write the answer, because afterwards
+ the socket is closed. Don't throw!
+ */
+ virtual void executeRequest(ServerSocket&) throw() =0;
+
+ /** Pure function to execute on timeout. Don't throw!
+ */
+ virtual void executeTimeout() throw() =0;
+ //************************************************
+ private:
+ ServerSocket serverSocket;
+
+ /// unimplemented, objects of this type can't be copied
+ BlockingServer(const BlockingServer&);
+ /// unimplemented, objects of this type can't be copied
+ BlockingServer& operator=(const BlockingServer&);
+ };
+
+
+} //namespace
+#endif
diff --git a/network/akgnet_socket.cc b/network/akgnet_socket.cc
new file mode 100644
index 0000000..91d7d6d
--- /dev/null
+++ b/network/akgnet_socket.cc
@@ -0,0 +1,174 @@
+/*
+* This file is part of rasdaman community.
+*
+* Rasdaman community is free software: you can redistribute it and/or modify
+* it under the terms of the GNU General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* Rasdaman community is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with rasdaman community. If not, see <http://www.gnu.org/licenses/>.
+*
+* Copyright 2003, 2004, 2005, 2006, 2007, 2008, 2009 Peter Baumann /
+rasdaman GmbH.
+*
+* For more information please see <http://www.rasdaman.org>
+* or contact Peter Baumann via <baumann@rasdaman.com>.
+/
+/**
+ * SOURCE: akgnet_socket.cc
+ *
+ * MODULE: akg network
+ * CLASS: Socket, ListenSocket, ServerSocket, ClientSocket
+ *
+ * COMMENTS:
+ *
+ */
+
+#include <assert.h>
+#include <akgnet_socket.hh>
+
+akg::Socket::Socket() throw()
+ {
+ }
+
+bool akg::Socket::createTcpSocket() throw()
+ {
+ struct protoent *getprotoptr = getprotobyname("tcp");
+ fileDescriptor= socket(PF_INET,SOCK_STREAM,getprotoptr->p_proto);
+ if(fileDescriptor<0)
+ { saveErrno();return false;}
+ return true;
+ }
+
+akg::SocketAddress akg::Socket::getAddress() throw()
+ {
+ akgSocklen_t size = sizeof(sockaddr_in);
+ sockaddr_in buffer;
+
+ getsockname(fileDescriptor, (sockaddr*)&buffer,&size);
+
+ return SocketAddress(buffer);
+ }
+
+akg::SocketAddress akg::Socket::getPeerAddress() throw()
+ {
+ akgSocklen_t size = sizeof(sockaddr_in);
+ sockaddr_in buffer;
+
+ int rasp=getpeername(fileDescriptor, (sockaddr*)&buffer,&size);
+ saveErrno();
+ return rasp != -1 ? SocketAddress(buffer) : SocketAddress();
+ }
+
+//###########################################################
+
+akg::ListenSocket::ListenSocket() throw()
+ {
+ queuesize = SOMAXCONN;
+ }
+
+akg::ListenSocket::~ListenSocket() throw()
+ {
+ }
+
+bool akg::ListenSocket::open(int port) throw()
+ {
+ close();
+ if(createTcpSocket() == false) return false;
+
+ struct sockaddr_in internetAddress;
+ internetAddress.sin_family = AF_INET;
+ internetAddress.sin_port = htons(port);
+ internetAddress.sin_addr.s_addr= htonl(INADDR_ANY);
+
+#ifdef SO_REUSEADDR
+ int val = 1;
+ int len = sizeof( val );
+ if(setsockopt( fileDescriptor, SOL_SOCKET, SO_REUSEADDR, (char*)&val, len ))
+ {
+ DBTALK("Can't set address reusable: "<<strerror(errno));
+ }
+#endif
+
+ if(bind(fileDescriptor,(sockaddr*)&internetAddress,sizeof(sockaddr_in)) <0)
+ { saveErrno();return false;}
+
+ if(listen(fileDescriptor,queuesize) < 0)
+ { saveErrno();return false;}
+
+ return true;
+ }
+
+void akg::ListenSocket::setQueueSize(int newSize) throw()
+ {
+ assert(newSize > 0);
+ queuesize = newSize < SOMAXCONN ? newSize : SOMAXCONN;
+ }
+
+int akg::ListenSocket::getQueueSize() throw()
+ { return queuesize;
+ }
+
+
+//###########################################################
+akg::ServerSocket::ServerSocket() throw()
+ {
+ }
+
+akg::ServerSocket::~ServerSocket() throw()
+ {
+ }
+
+bool akg::ServerSocket::acceptFrom(ListenSocket& listenSocket) throw()
+ {
+ close();
+ struct sockaddr_in internetAddress;
+ akgSocklen_t size=sizeof(sockaddr_in);
+
+ savedErrno = 0;
+ fileDescriptor=accept(listenSocket(),(struct sockaddr*)&internetAddress,&size);
+ if(fileDescriptor < 0) { saveErrno();return false;}
+
+ return true;
+ }
+
+//###########################################################
+
+akg::ClientSocket::ClientSocket() throw()
+ {
+ }
+akg::ClientSocket::~ClientSocket() throw()
+ {
+ }
+
+bool akg::ClientSocket::open(const char *serverHost,int serverPort) throw()
+ {
+
+ close();
+ savedErrno=0;
+
+ if(createTcpSocket() == false) return false;
+
+ struct hostent *hostinfo=gethostbyname(serverHost);
+ if(hostinfo==NULL)
+ { saveErrno();return false;
+ }
+
+ sockaddr_in internetAddress;
+
+ internetAddress.sin_family = AF_INET;
+ internetAddress.sin_port = htons(serverPort);
+ internetAddress.sin_addr = *(struct in_addr*)hostinfo->h_addr;
+
+ if(connect(fileDescriptor,(struct sockaddr*)&internetAddress,sizeof(sockaddr_in)) < 0)
+ { saveErrno();return false;
+ }
+ return true;
+ }
+
diff --git a/network/akgnet_socket.hh b/network/akgnet_socket.hh
new file mode 100644
index 0000000..c17039d
--- /dev/null
+++ b/network/akgnet_socket.hh
@@ -0,0 +1,147 @@
+/*
+* This file is part of rasdaman community.
+*
+* Rasdaman community is free software: you can redistribute it and/or modify
+* it under the terms of the GNU General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* Rasdaman community is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with rasdaman community. If not, see <http://www.gnu.org/licenses/>.
+*
+* Copyright 2003, 2004, 2005, 2006, 2007, 2008, 2009 Peter Baumann /
+rasdaman GmbH.
+*
+* For more information please see <http://www.rasdaman.org>
+* or contact Peter Baumann via <baumann@rasdaman.com>.
+/
+/**
+ * INCLUDE: akgnet_socket.hh
+ *
+ * MODULE: akg network
+ * CLASS: Socket, ListenSocket, ServerSocket, ClientSocket
+ *
+ * COMMENTS:
+ * Namespace akg
+ *
+ */
+
+#ifndef AKGNET_SOCKET_HH
+#define AKGNET_SOCKET_HH
+
+#include "akgnet_fdescr.hh"
+#include "akgnet_inetaddr.hh"
+
+namespace akg
+ {
+
+/** This class represents the sockets in the file descriptor hierarchie.
+ Socket is the base class of the socket hierarchie, which contains also
+ ListenSocket, ServerSocket and ClientSocket
+*/
+
+class Socket : public FileDescriptor
+ {
+ public:
+ /// Default constructor
+ Socket() throw();
+
+ /// Returns the SocketAddress of this socket
+ SocketAddress getAddress() throw();
+
+ /** Returns the SocketAddress of the peer. If the Socket is not connected
+ returns the SocketAddress of this socket
+ */
+ SocketAddress getPeerAddress() throw();
+ protected:
+ /// helper function to initialize this Socket as a TCP/IP socket
+ bool createTcpSocket() throw();
+
+ private:
+ /// unimplemented, objects of this type can't be copied
+ Socket(const Socket&);
+ /// unimplemented, objects of this type can't be copied
+ Socket& operator=(const Socket&);
+ };
+
+/** ListenSocket - socket for servers, to listen for clients
+*/
+class ListenSocket : public Socket
+ {
+ public:
+ /// Default constructor
+ ListenSocket() throw();
+
+ /// Destructor, closes, indirectly, the socket
+ ~ListenSocket() throw();
+
+ /// Opens the listen socket. Returns true if succes
+ bool open(int port) throw();
+
+ /** Sets the OS queue size for this socket. Maximal size is SOMAXCONN
+ Assert: newSize > 0
+ */
+ void setQueueSize(int newSize) throw();
+
+ /// Returns the OS queue size for this socket
+ int getQueueSize() throw();
+
+ private:
+ int queuesize;
+
+ /// unimplemented, objects of this type can't be copied
+ ListenSocket(const ListenSocket&);
+ /// unimplemented, objects of this type can't be copied
+ ListenSocket& operator=(const ListenSocket&);
+ };
+
+/** ServerSocket - socket for servers, to communicate with clients
+ It opens by accepting a pending connection from a ListenSocket
+*/
+
+class ServerSocket : public Socket
+ {
+ public:
+ /// Default constructor
+ ServerSocket() throw();
+
+ /// Destructor
+ ~ServerSocket() throw();
+
+ /// Accepts a pending connection from a ListenSocket. Returns true on succes
+ bool acceptFrom(ListenSocket&) throw();
+ private:
+ /// unimplemented, objects of this type can't be copied
+ ServerSocket(const ServerSocket&);
+ /// unimplemented, objects of this type can't be copied
+ ServerSocket& operator=(const ServerSocket&);
+ };
+
+/** ClientSocket - socket for clients, to communicate with servers
+*/
+class ClientSocket : public Socket
+ {
+ public:
+ /// Default constructor
+ ClientSocket() throw();
+
+ /// Destructor
+ ~ClientSocket() throw();
+
+ /// Opens the connection with the given server. Returns true on succes
+ bool open(const char *serverHost,int serverPort) throw();
+ private:
+ /// unimplemented, objects of this type can't be copied
+ ClientSocket(const ClientSocket&);
+ /// unimplemented, objects of this type can't be copied
+ ClientSocket& operator=(const ClientSocket&);
+ };
+
+
+} // namespace
+#endif
diff --git a/network/akgnetwork.hh b/network/akgnetwork.hh
new file mode 100644
index 0000000..7a34444
--- /dev/null
+++ b/network/akgnetwork.hh
@@ -0,0 +1,56 @@
+/*
+* This file is part of rasdaman community.
+*
+* Rasdaman community is free software: you can redistribute it and/or modify
+* it under the terms of the GNU General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* Rasdaman community is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with rasdaman community. If not, see <http://www.gnu.org/licenses/>.
+*
+* Copyright 2003, 2004, 2005, 2006, 2007, 2008, 2009 Peter Baumann /
+rasdaman GmbH.
+*
+* For more information please see <http://www.rasdaman.org>
+* or contact Peter Baumann via <baumann@rasdaman.com>.
+/
+/**
+ * INCLUDE: akgnetwork.hh
+ *
+ * MODULE: akg network
+ * CLASS:
+ *
+ * COMMENTS:
+ * Namespace akg
+ * Include this file if you need services from the akgnetwork package
+ *
+ */
+
+#ifndef AKGNETWORK_HH
+#define AKGNETWORK_HH
+
+#include "akgnet_common.hh"
+
+#include "akgnet_fdescr.hh"
+
+#include "akgnet_file.hh"
+
+#include "akgnet_selector.hh"
+
+#include "akgnet_commbuffer.hh"
+
+#include "akgnet_socket.hh"
+
+#include "akgnet_server.hh"
+
+#include "akgnet_nbcomm.hh"
+
+#include "akgnet_inetaddr.hh"
+
+#endif