diff options
Diffstat (limited to 'transport')
-rw-r--r-- | transport/Makefile.am | 3 | ||||
-rw-r--r-- | transport/ib-verbs/Makefile.am | 1 | ||||
-rw-r--r-- | transport/ib-verbs/src/Makefile.am | 15 | ||||
-rw-r--r-- | transport/ib-verbs/src/ib-verbs-mem-types.h | 39 | ||||
-rw-r--r-- | transport/ib-verbs/src/ib-verbs.c | 2613 | ||||
-rw-r--r-- | transport/ib-verbs/src/ib-verbs.h | 220 | ||||
-rw-r--r-- | transport/ib-verbs/src/name.c | 712 | ||||
-rw-r--r-- | transport/ib-verbs/src/name.h | 47 | ||||
-rw-r--r-- | transport/socket/Makefile.am | 1 | ||||
-rw-r--r-- | transport/socket/src/Makefile.am | 14 | ||||
-rw-r--r-- | transport/socket/src/name.c | 737 | ||||
-rw-r--r-- | transport/socket/src/name.h | 44 | ||||
-rw-r--r-- | transport/socket/src/socket-mem-types.h | 36 | ||||
-rw-r--r-- | transport/socket/src/socket.c | 1552 | ||||
-rw-r--r-- | transport/socket/src/socket.h | 125 |
15 files changed, 0 insertions, 6159 deletions
diff --git a/transport/Makefile.am b/transport/Makefile.am deleted file mode 100644 index e2f97437c1..0000000000 --- a/transport/Makefile.am +++ /dev/null @@ -1,3 +0,0 @@ -SUBDIRS = socket $(IBVERBS_SUBDIR) - -CLEANFILES = diff --git a/transport/ib-verbs/Makefile.am b/transport/ib-verbs/Makefile.am deleted file mode 100644 index f963effea2..0000000000 --- a/transport/ib-verbs/Makefile.am +++ /dev/null @@ -1 +0,0 @@ -SUBDIRS = src
\ No newline at end of file diff --git a/transport/ib-verbs/src/Makefile.am b/transport/ib-verbs/src/Makefile.am deleted file mode 100644 index 1baf080f21..0000000000 --- a/transport/ib-verbs/src/Makefile.am +++ /dev/null @@ -1,15 +0,0 @@ -noinst_HEADERS = ib-verbs.h name.h - -transport_LTLIBRARIES = ib-verbs.la -transportdir = $(libdir)/glusterfs/$(PACKAGE_VERSION)/transport - -ib_verbs_la_LDFLAGS = -module -avoidversion - -ib_verbs_la_SOURCES = ib-verbs.c name.c -ib_verbs_la_LIBADD = -libverbs $(top_builddir)/libglusterfs/src/libglusterfs.la - -AM_CFLAGS = -fPIC -D_FILE_OFFSET_BITS=64 -D_GNU_SOURCE -Wall -D$(GF_HOST_OS)\ - -I$(top_srcdir)/libglusterfs/src -I$(top_srcdir)/transport/ib-verbs \ - -shared -nostartfiles $(GF_CFLAGS) - -CLEANFILES = *~ diff --git a/transport/ib-verbs/src/ib-verbs-mem-types.h b/transport/ib-verbs/src/ib-verbs-mem-types.h deleted file mode 100644 index bac559646f..0000000000 --- a/transport/ib-verbs/src/ib-verbs-mem-types.h +++ /dev/null @@ -1,39 +0,0 @@ - -/* - Copyright (c) 2008-2009 Gluster, Inc. <http://www.gluster.com> - This file is part of GlusterFS. - - GlusterFS is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published - by the Free Software Foundation; either version 3 of the License, - or (at your option) any later version. - - GlusterFS is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program. If not, see - <http://www.gnu.org/licenses/>. -*/ - - -#ifndef __IB_VERBS_MEM_TYPES_H__ -#define __IB_VERBS_MEM_TYPES_H__ - -#include "mem-types.h" - -enum gf_ib_verbs_mem_types_ { - gf_ibv_mt_ib_verbs_private_t = gf_common_mt_end + 1, - gf_ibv_mt_ib_verbs_ioq_t, - gf_ibv_mt_transport_t, - gf_ibv_mt_ib_verbs_local_t, - gf_ibv_mt_ib_verbs_post_t, - gf_ibv_mt_char, - gf_ibv_mt_qpent, - gf_ibv_mt_ib_verbs_device_t, - gf_ibv_mt_end -}; -#endif - diff --git a/transport/ib-verbs/src/ib-verbs.c b/transport/ib-verbs/src/ib-verbs.c deleted file mode 100644 index a252a13d88..0000000000 --- a/transport/ib-verbs/src/ib-verbs.c +++ /dev/null @@ -1,2613 +0,0 @@ -/* - Copyright (c) 2006-2009 Gluster, Inc. <http://www.gluster.com> - This file is part of GlusterFS. - - GlusterFS is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published - by the Free Software Foundation; either version 3 of the License, - or (at your option) any later version. - - GlusterFS is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program. If not, see - <http://www.gnu.org/licenses/>. -*/ - - -#ifndef _CONFIG_H -#define _CONFIG_H -#include "config.h" -#endif - -#include "dict.h" -#include "glusterfs.h" -#include "transport.h" -#include "protocol.h" -#include "logging.h" -#include "xlator.h" -#include "name.h" -#include "ib-verbs.h" -#include <signal.h> - -int32_t -gf_resolve_ip6 (const char *hostname, - uint16_t port, - int family, - void **dnscache, - struct addrinfo **addr_info); - -static uint16_t -ib_verbs_get_local_lid (struct ibv_context *context, - int32_t port) -{ - struct ibv_port_attr attr; - - if (ibv_query_port (context, port, &attr)) - return 0; - - return attr.lid; -} - -static const char * -get_port_state_str(enum ibv_port_state pstate) -{ - switch (pstate) { - case IBV_PORT_DOWN: return "PORT_DOWN"; - case IBV_PORT_INIT: return "PORT_INIT"; - case IBV_PORT_ARMED: return "PORT_ARMED"; - case IBV_PORT_ACTIVE: return "PORT_ACTIVE"; - case IBV_PORT_ACTIVE_DEFER: return "PORT_ACTIVE_DEFER"; - default: return "invalid state"; - } -} - -static int32_t -ib_check_active_port (struct ibv_context *ctx, uint8_t port) -{ - struct ibv_port_attr port_attr; - - int32_t ret = 0; - const char *state_str = NULL; - - if (!ctx) { - gf_log ("transport/ib-verbs", GF_LOG_ERROR, - "Error in supplied context"); - return -1; - } - - ret = ibv_query_port (ctx, port, &port_attr); - - if (ret) { - gf_log ("transport/ib-verbs", GF_LOG_ERROR, - "Failed to query port %u properties", port); - return -1; - } - - state_str = get_port_state_str (port_attr.state); - gf_log ("transport/ib-verbs", GF_LOG_TRACE, - "Infiniband PORT: (%u) STATE: (%s)", - port, state_str); - - if (port_attr.state == IBV_PORT_ACTIVE) - return 0; - - return -1; -} - -static int32_t -ib_get_active_port (struct ibv_context *ib_ctx) -{ - struct ibv_device_attr ib_device_attr; - - int32_t ret = -1; - uint8_t ib_port = 0; - - if (!ib_ctx) { - gf_log ("transport/ib-verbs", GF_LOG_ERROR, - "Error in supplied context"); - return -1; - } - if (ibv_query_device (ib_ctx, &ib_device_attr)) { - gf_log ("transport/ib-verbs", GF_LOG_ERROR, - "Failed to query device properties"); - return -1; - } - - for (ib_port = 1; ib_port <= ib_device_attr.phys_port_cnt; ++ib_port) { - ret = ib_check_active_port (ib_ctx, ib_port); - if (ret == 0) - return ib_port; - - gf_log ("transport/ib-verbs", GF_LOG_TRACE, - "Port:(%u) not active", ib_port); - continue; - } - return ret; -} - - - -static void -ib_verbs_put_post (ib_verbs_queue_t *queue, - ib_verbs_post_t *post) -{ - pthread_mutex_lock (&queue->lock); - if (post->prev) { - queue->active_count--; - post->prev->next = post->next; - } - if (post->next) - post->next->prev = post->prev; - post->prev = &queue->passive_posts; - post->next = post->prev->next; - post->prev->next = post; - post->next->prev = post; - queue->passive_count++; - pthread_mutex_unlock (&queue->lock); -} - - -static ib_verbs_post_t * -ib_verbs_new_post (ib_verbs_device_t *device, int32_t len) -{ - ib_verbs_post_t *post; - - post = (ib_verbs_post_t *) GF_CALLOC (1, sizeof (*post), - gf_ibv_mt_ib_verbs_post_t); - if (!post) - return NULL; - - post->buf_size = len; - - post->buf = valloc (len); - if (!post->buf) { - GF_FREE (post); - return NULL; - } - - post->mr = ibv_reg_mr (device->pd, - post->buf, - post->buf_size, - IBV_ACCESS_LOCAL_WRITE); - if (!post->mr) { - free (post->buf); - GF_FREE (post); - return NULL; - } - - return post; -} - - -static ib_verbs_post_t * -ib_verbs_get_post (ib_verbs_queue_t *queue) -{ - ib_verbs_post_t *post; - - pthread_mutex_lock (&queue->lock); - { - post = queue->passive_posts.next; - if (post == &queue->passive_posts) - post = NULL; - - if (post) { - if (post->prev) - post->prev->next = post->next; - if (post->next) - post->next->prev = post->prev; - post->prev = &queue->active_posts; - post->next = post->prev->next; - post->prev->next = post; - post->next->prev = post; - post->reused++; - queue->active_count++; - } - } - pthread_mutex_unlock (&queue->lock); - - return post; -} - -void -ib_verbs_destroy_post (ib_verbs_post_t *post) -{ - ibv_dereg_mr (post->mr); - free (post->buf); - GF_FREE (post); -} - - -static int32_t -__ib_verbs_quota_get (ib_verbs_peer_t *peer) -{ - int32_t ret = -1; - ib_verbs_private_t *priv = peer->trans->private; - - if (priv->connected && peer->quota > 0) { - ret = peer->quota--; - } - - return ret; -} - -/* - static int32_t - ib_verbs_quota_get (ib_verbs_peer_t *peer) - { - int32_t ret = -1; - ib_verbs_private_t *priv = peer->trans->private; - - pthread_mutex_lock (&priv->write_mutex); - { - ret = __ib_verbs_quota_get (peer); - } - pthread_mutex_unlock (&priv->write_mutex); - - return ret; - } -*/ - -static void -__ib_verbs_ioq_entry_free (ib_verbs_ioq_t *entry) -{ - list_del_init (&entry->list); - if (entry->iobref) - iobref_unref (entry->iobref); - - /* TODO: use mem-pool */ - GF_FREE (entry->buf); - - /* TODO: use mem-pool */ - GF_FREE (entry); -} - - -static void -__ib_verbs_ioq_flush (ib_verbs_peer_t *peer) -{ - ib_verbs_ioq_t *entry = NULL, *dummy = NULL; - - list_for_each_entry_safe (entry, dummy, &peer->ioq, list) { - __ib_verbs_ioq_entry_free (entry); - } -} - - -static int32_t -__ib_verbs_disconnect (transport_t *this) -{ - ib_verbs_private_t *priv = this->private; - int32_t ret = 0; - - if (priv->connected || priv->tcp_connected) { - fcntl (priv->sock, F_SETFL, O_NONBLOCK); - if (shutdown (priv->sock, SHUT_RDWR) != 0) { - gf_log ("transport/ib-verbs", - GF_LOG_DEBUG, - "shutdown () - error: %s", - strerror (errno)); - ret = -errno; - priv->tcp_connected = 0; - } - } - - return ret; -} - - -static int32_t -ib_verbs_post_send (struct ibv_qp *qp, - ib_verbs_post_t *post, - int32_t len) -{ - struct ibv_sge list = { - .addr = (unsigned long) post->buf, - .length = len, - .lkey = post->mr->lkey - }; - - struct ibv_send_wr wr = { - .wr_id = (unsigned long) post, - .sg_list = &list, - .num_sge = 1, - .opcode = IBV_WR_SEND, - .send_flags = IBV_SEND_SIGNALED, - }, *bad_wr; - - if (!qp) - return -1; - - return ibv_post_send (qp, &wr, &bad_wr); -} - - -static int32_t -__ib_verbs_ioq_churn_entry (ib_verbs_peer_t *peer, ib_verbs_ioq_t *entry) -{ - int32_t ret = 0, quota = 0; - ib_verbs_private_t *priv = peer->trans->private; - ib_verbs_device_t *device = priv->device; - ib_verbs_options_t *options = &priv->options; - ib_verbs_post_t *post = NULL; - int32_t len = 0; - - quota = __ib_verbs_quota_get (peer); - if (quota > 0) { - post = ib_verbs_get_post (&device->sendq); - if (!post) - post = ib_verbs_new_post (device, - (options->send_size + 2048)); - - len = iov_length ((const struct iovec *)&entry->vector, - entry->count); - if (len >= (options->send_size + 2048)) { - gf_log ("transport/ib-verbs", GF_LOG_ERROR, - "increase value of option 'transport.ib-verbs." - "work-request-send-size' (given=> %"PRId64") " - "to send bigger (%d) messages", - (options->send_size + 2048), len); - return -1; - } - - iov_unload (post->buf, - (const struct iovec *)&entry->vector, - entry->count); - - ret = ib_verbs_post_send (peer->qp, post, len); - if (!ret) { - __ib_verbs_ioq_entry_free (entry); - ret = len; - } else { - gf_log ("transport/ib-verbs", GF_LOG_DEBUG, - "ibv_post_send failed with ret = %d", ret); - ib_verbs_put_post (&device->sendq, post); - __ib_verbs_disconnect (peer->trans); - ret = -1; - } - } - - return ret; -} - - -static int32_t -__ib_verbs_ioq_churn (ib_verbs_peer_t *peer) -{ - ib_verbs_ioq_t *entry = NULL; - int32_t ret = 0; - - while (!list_empty (&peer->ioq)) - { - /* pick next entry */ - entry = peer->ioq_next; - - ret = __ib_verbs_ioq_churn_entry (peer, entry); - - if (ret <= 0) - break; - } - - /* - list_for_each_entry_safe (entry, dummy, &peer->ioq, list) { - ret = __ib_verbs_ioq_churn_entry (peer, entry); - if (ret <= 0) { - break; - } - } - */ - - return ret; -} - -static int32_t -__ib_verbs_quota_put (ib_verbs_peer_t *peer) -{ - int32_t ret; - - peer->quota++; - ret = peer->quota; - - if (!list_empty (&peer->ioq)) { - ret = __ib_verbs_ioq_churn (peer); - } - - return ret; -} - - -static int32_t -ib_verbs_quota_put (ib_verbs_peer_t *peer) -{ - int32_t ret; - ib_verbs_private_t *priv = peer->trans->private; - - pthread_mutex_lock (&priv->write_mutex); - { - ret = __ib_verbs_quota_put (peer); - } - pthread_mutex_unlock (&priv->write_mutex); - - return ret; -} - - -static int32_t -ib_verbs_post_recv (struct ibv_srq *srq, - ib_verbs_post_t *post) -{ - struct ibv_sge list = { - .addr = (unsigned long) post->buf, - .length = post->buf_size, - .lkey = post->mr->lkey - }; - - struct ibv_recv_wr wr = { - .wr_id = (unsigned long) post, - .sg_list = &list, - .num_sge = 1, - }, *bad_wr; - - return ibv_post_srq_recv (srq, &wr, &bad_wr); -} - - -static int32_t -ib_verbs_writev (transport_t *this, - ib_verbs_ioq_t *entry) -{ - int32_t ret = 0, need_append = 1; - ib_verbs_private_t *priv = this->private; - ib_verbs_peer_t *peer = NULL; - - pthread_mutex_lock (&priv->write_mutex); - { - if (!priv->connected) { - gf_log (this->xl->name, GF_LOG_DEBUG, - "ib-verbs is not connected to post a " - "send request"); - ret = -1; - goto unlock; - } - - peer = &priv->peer; - if (list_empty (&peer->ioq)) { - ret = __ib_verbs_ioq_churn_entry (peer, entry); - if (ret != 0) { - need_append = 0; - } - } - - if (need_append) { - list_add_tail (&entry->list, &peer->ioq); - } - } -unlock: - pthread_mutex_unlock (&priv->write_mutex); - return ret; -} - - -static ib_verbs_ioq_t * -ib_verbs_ioq_new (char *buf, int len, struct iovec *vector, - int count, struct iobref *iobref) -{ - ib_verbs_ioq_t *entry = NULL; - - /* TODO: use mem-pool */ - entry = GF_CALLOC (1, sizeof (*entry), gf_ibv_mt_ib_verbs_ioq_t); - - assert (count <= (MAX_IOVEC-2)); - - entry->header.colonO[0] = ':'; - entry->header.colonO[1] = 'O'; - entry->header.colonO[2] = '\0'; - entry->header.version = 42; - entry->header.size1 = hton32 (len); - entry->header.size2 = hton32 (iov_length (vector, count)); - - entry->vector[0].iov_base = &entry->header; - entry->vector[0].iov_len = sizeof (entry->header); - entry->count++; - - entry->vector[1].iov_base = buf; - entry->vector[1].iov_len = len; - entry->count++; - - if (vector && count) - { - memcpy (&entry->vector[2], vector, sizeof (*vector) * count); - entry->count += count; - } - - if (iobref) - entry->iobref = iobref_ref (iobref); - - entry->buf = buf; - - INIT_LIST_HEAD (&entry->list); - - return entry; -} - - -static int32_t -ib_verbs_submit (transport_t *this, char *buf, int32_t len, - struct iovec *vector, int count, struct iobref *iobref) -{ - int32_t ret = 0; - ib_verbs_ioq_t *entry = NULL; - - entry = ib_verbs_ioq_new (buf, len, vector, count, iobref); - ret = ib_verbs_writev (this, entry); - - if (ret > 0) { - ret = 0; - } - - return ret; -} - -static int -ib_verbs_receive (transport_t *this, char **hdr_p, size_t *hdrlen_p, - struct iobuf **iobuf_p) -{ - ib_verbs_private_t *priv = this->private; - /* TODO: return error if !priv->connected, check with locks */ - /* TODO: boundry checks for data_ptr/offset */ - char *copy_from = NULL; - ib_verbs_header_t *header = NULL; - uint32_t size1, size2, data_len = 0; - char *hdr = NULL; - struct iobuf *iobuf = NULL; - int32_t ret = 0; - - pthread_mutex_lock (&priv->recv_mutex); - { -/* - while (!priv->data_ptr) - pthread_cond_wait (&priv->recv_cond, &priv->recv_mutex); -*/ - - copy_from = priv->data_ptr + priv->data_offset; - - priv->data_ptr = NULL; - data_len = priv->data_len; - pthread_cond_broadcast (&priv->recv_cond); - } - pthread_mutex_unlock (&priv->recv_mutex); - - header = (ib_verbs_header_t *)copy_from; - if (strcmp (header->colonO, ":O")) { - gf_log ("transport/ib-verbs", GF_LOG_DEBUG, - "%s: corrupt header received", this->xl->name); - ret = -1; - goto err; - } - - size1 = ntoh32 (header->size1); - size2 = ntoh32 (header->size2); - - if (data_len != (size1 + size2 + sizeof (*header))) { - gf_log ("transport/ib-verbs", GF_LOG_DEBUG, - "%s: sizeof data read from transport is not equal " - "to the size specified in the header", - this->xl->name); - ret = -1; - goto err; - } - - copy_from += sizeof (*header); - - if (size1) { - hdr = GF_CALLOC (1, size1, gf_ibv_mt_char); - if (!hdr) { - gf_log (this->xl->name, GF_LOG_ERROR, - "unable to allocate header for peer %s", - this->peerinfo.identifier); - ret = -ENOMEM; - goto err; - } - memcpy (hdr, copy_from, size1); - copy_from += size1; - *hdr_p = hdr; - } - *hdrlen_p = size1; - - if (size2) { - iobuf = iobuf_get (this->xl->ctx->iobuf_pool); - if (!iobuf) { - gf_log (this->xl->name, GF_LOG_ERROR, - "unable to allocate IO buffer for peer %s", - this->peerinfo.identifier); - ret = -ENOMEM; - goto err; - } - memcpy (iobuf->ptr, copy_from, size2); - *iobuf_p = iobuf; - } - -err: - return ret; -} - - -static void -ib_verbs_destroy_cq (transport_t *this) -{ - ib_verbs_private_t *priv = this->private; - ib_verbs_device_t *device = priv->device; - - if (device->recv_cq) - ibv_destroy_cq (device->recv_cq); - device->recv_cq = NULL; - - if (device->send_cq) - ibv_destroy_cq (device->send_cq); - device->send_cq = NULL; - - return; -} - - -static int32_t -ib_verbs_create_cq (transport_t *this) -{ - ib_verbs_private_t *priv = this->private; - ib_verbs_options_t *options = &priv->options; - ib_verbs_device_t *device = priv->device; - int32_t ret = 0; - - device->recv_cq = ibv_create_cq (priv->device->context, - options->recv_count * 2, - device, - device->recv_chan, - 0); - if (!device->recv_cq) { - gf_log ("transport/ib-verbs", - GF_LOG_ERROR, - "%s: creation of CQ failed", - this->xl->name); - ret = -1; - } else if (ibv_req_notify_cq (device->recv_cq, 0)) { - gf_log ("transport/ib-verbs", - GF_LOG_ERROR, - "%s: ibv_req_notify_cq on CQ failed", - this->xl->name); - ret = -1; - } - - do { - /* TODO: make send_cq size dynamically adaptive */ - device->send_cq = ibv_create_cq (priv->device->context, - options->send_count * 1024, - device, - device->send_chan, - 0); - if (!device->send_cq) { - gf_log ("transport/ib-verbs", - GF_LOG_ERROR, - "%s: creation of send_cq failed", - this->xl->name); - ret = -1; - break; - } - - if (ibv_req_notify_cq (device->send_cq, 0)) { - gf_log ("transport/ib-verbs", - GF_LOG_ERROR, - "%s: ibv_req_notify_cq on send_cq failed", - this->xl->name); - ret = -1; - break; - } - } while (0); - - if (ret != 0) - ib_verbs_destroy_cq (this); - - return ret; -} - - -static void -ib_verbs_register_peer (ib_verbs_device_t *device, - int32_t qp_num, - ib_verbs_peer_t *peer) -{ - struct _qpent *ent; - ib_verbs_qpreg_t *qpreg = &device->qpreg; - int32_t hash = qp_num % 42; - - pthread_mutex_lock (&qpreg->lock); - ent = qpreg->ents[hash].next; - while ((ent != &qpreg->ents[hash]) && (ent->qp_num != qp_num)) - ent = ent->next; - if (ent->qp_num == qp_num) { - pthread_mutex_unlock (&qpreg->lock); - return; - } - ent = (struct _qpent *) GF_CALLOC (1, sizeof (*ent), gf_ibv_mt_qpent); - ERR_ABORT (ent); - /* TODO: ref reg->peer */ - ent->peer = peer; - ent->next = &qpreg->ents[hash]; - ent->prev = ent->next->prev; - ent->next->prev = ent; - ent->prev->next = ent; - ent->qp_num = qp_num; - qpreg->count++; - pthread_mutex_unlock (&qpreg->lock); -} - - -static void -ib_verbs_unregister_peer (ib_verbs_device_t *device, - int32_t qp_num) -{ - struct _qpent *ent; - ib_verbs_qpreg_t *qpreg = &device->qpreg; - int32_t hash = qp_num % 42; - - pthread_mutex_lock (&qpreg->lock); - ent = qpreg->ents[hash].next; - while ((ent != &qpreg->ents[hash]) && (ent->qp_num != qp_num)) - ent = ent->next; - if (ent->qp_num != qp_num) { - pthread_mutex_unlock (&qpreg->lock); - return; - } - ent->prev->next = ent->next; - ent->next->prev = ent->prev; - /* TODO: unref reg->peer */ - GF_FREE (ent); - qpreg->count--; - pthread_mutex_unlock (&qpreg->lock); -} - - -static ib_verbs_peer_t * -__ib_verbs_lookup_peer (ib_verbs_device_t *device, int32_t qp_num) -{ - struct _qpent *ent = NULL; - ib_verbs_peer_t *peer = NULL; - ib_verbs_qpreg_t *qpreg = NULL; - int32_t hash = 0; - - qpreg = &device->qpreg; - hash = qp_num % 42; - ent = qpreg->ents[hash].next; - while ((ent != &qpreg->ents[hash]) && (ent->qp_num != qp_num)) - ent = ent->next; - - if (ent != &qpreg->ents[hash]) { - peer = ent->peer; - } - - return peer; -} - -/* -static ib_verbs_peer_t * -ib_verbs_lookup_peer (ib_verbs_device_t *device, - int32_t qp_num) -{ - ib_verbs_qpreg_t *qpreg = NULL; - ib_verbs_peer_t *peer = NULL; - - qpreg = &device->qpreg; - pthread_mutex_lock (&qpreg->lock); - { - peer = __ib_verbs_lookup_peer (device, qp_num); - } - pthread_mutex_unlock (&qpreg->lock); - - return peer; -} -*/ - - -static void -__ib_verbs_destroy_qp (transport_t *this) -{ - ib_verbs_private_t *priv = this->private; - - if (priv->peer.qp) { - ib_verbs_unregister_peer (priv->device, priv->peer.qp->qp_num); - ibv_destroy_qp (priv->peer.qp); - } - priv->peer.qp = NULL; - - return; -} - - -static int32_t -ib_verbs_create_qp (transport_t *this) -{ - ib_verbs_private_t *priv = this->private; - ib_verbs_options_t *options = &priv->options; - ib_verbs_device_t *device = priv->device; - int32_t ret = 0; - ib_verbs_peer_t *peer; - - peer = &priv->peer; - struct ibv_qp_init_attr init_attr = { - .send_cq = device->send_cq, - .recv_cq = device->recv_cq, - .srq = device->srq, - .cap = { - .max_send_wr = peer->send_count, - .max_recv_wr = peer->recv_count, - .max_send_sge = 1, - .max_recv_sge = 1 - }, - .qp_type = IBV_QPT_RC - }; - - struct ibv_qp_attr attr = { - .qp_state = IBV_QPS_INIT, - .pkey_index = 0, - .port_num = options->port, - .qp_access_flags = 0 - }; - - peer->qp = ibv_create_qp (device->pd, &init_attr); - if (!peer->qp) { - gf_log ("transport/ib-verbs", - GF_LOG_CRITICAL, - "%s: could not create QP", - this->xl->name); - ret = -1; - goto out; - } else if (ibv_modify_qp (peer->qp, &attr, - IBV_QP_STATE | - IBV_QP_PKEY_INDEX | - IBV_QP_PORT | - IBV_QP_ACCESS_FLAGS)) { - gf_log ("transport/ib-verbs", - GF_LOG_ERROR, - "%s: failed to modify QP to INIT state", - this->xl->name); - ret = -1; - goto out; - } - - peer->local_lid = ib_verbs_get_local_lid (device->context, - options->port); - peer->local_qpn = peer->qp->qp_num; - peer->local_psn = lrand48 () & 0xffffff; - - ib_verbs_register_peer (device, peer->qp->qp_num, peer); - -out: - if (ret == -1) - __ib_verbs_destroy_qp (this); - - return ret; -} - - -static void -ib_verbs_destroy_posts (transport_t *this) -{ - -} - - -static int32_t -__ib_verbs_create_posts (transport_t *this, - int32_t count, - int32_t size, - ib_verbs_queue_t *q) -{ - int32_t i; - int32_t ret = 0; - ib_verbs_private_t *priv = this->private; - ib_verbs_device_t *device = priv->device; - - for (i=0 ; i<count ; i++) { - ib_verbs_post_t *post; - - post = ib_verbs_new_post (device, size + 2048); - if (!post) { - gf_log ("transport/ib-verbs", - GF_LOG_ERROR, - "%s: post creation failed", - this->xl->name); - ret = -1; - break; - } - - ib_verbs_put_post (q, post); - } - return ret; -} - - -static int32_t -ib_verbs_create_posts (transport_t *this) -{ - int32_t i, ret; - ib_verbs_post_t *post = NULL; - ib_verbs_private_t *priv = this->private; - ib_verbs_options_t *options = &priv->options; - ib_verbs_device_t *device = priv->device; - - ret = __ib_verbs_create_posts (this, options->send_count, - options->send_size, - &device->sendq); - if (!ret) - ret = __ib_verbs_create_posts (this, options->recv_count, - options->recv_size, - &device->recvq); - - if (!ret) { - for (i=0 ; i<options->recv_count ; i++) { - post = ib_verbs_get_post (&device->recvq); - if (ib_verbs_post_recv (device->srq, post) != 0) { - ret = -1; - break; - } - } - } - - if (ret) - ib_verbs_destroy_posts (this); - - return ret; -} - - -static int32_t -ib_verbs_connect_qp (transport_t *this) -{ - ib_verbs_private_t *priv = this->private; - ib_verbs_options_t *options = &priv->options; - struct ibv_qp_attr attr = { - .qp_state = IBV_QPS_RTR, - .path_mtu = options->mtu, - .dest_qp_num = priv->peer.remote_qpn, - .rq_psn = priv->peer.remote_psn, - .max_dest_rd_atomic = 1, - .min_rnr_timer = 12, - .ah_attr = { - .is_global = 0, - .dlid = priv->peer.remote_lid, - .sl = 0, - .src_path_bits = 0, - .port_num = options->port - } - }; - if (ibv_modify_qp (priv->peer.qp, &attr, - IBV_QP_STATE | - IBV_QP_AV | - IBV_QP_PATH_MTU | - IBV_QP_DEST_QPN | - IBV_QP_RQ_PSN | - IBV_QP_MAX_DEST_RD_ATOMIC | - IBV_QP_MIN_RNR_TIMER)) { - gf_log ("transport/ib-verbs", - GF_LOG_CRITICAL, - "Failed to modify QP to RTR\n"); - return -1; - } - - /* TODO: make timeout and retry_cnt configurable from options */ - attr.qp_state = IBV_QPS_RTS; - attr.timeout = 14; - attr.retry_cnt = 7; - attr.rnr_retry = 7; - attr.sq_psn = priv->peer.local_psn; - attr.max_rd_atomic = 1; - if (ibv_modify_qp (priv->peer.qp, &attr, - IBV_QP_STATE | - IBV_QP_TIMEOUT | - IBV_QP_RETRY_CNT | - IBV_QP_RNR_RETRY | - IBV_QP_SQ_PSN | - IBV_QP_MAX_QP_RD_ATOMIC)) { - gf_log ("transport/ib-verbs", - GF_LOG_CRITICAL, - "Failed to modify QP to RTS\n"); - return -1; - } - - return 0; -} - -static int32_t -__ib_verbs_teardown (transport_t *this) -{ - ib_verbs_private_t *priv = this->private; - - __ib_verbs_destroy_qp (this); - - if (!list_empty (&priv->peer.ioq)) { - __ib_verbs_ioq_flush (&priv->peer); - } - - /* TODO: decrement cq size */ - return 0; -} - -/* - * return value: - * 0 = success (completed) - * -1 = error - * > 0 = incomplete - */ - -static int -__tcp_rwv (transport_t *this, struct iovec *vector, int count, - struct iovec **pending_vector, int *pending_count, - int write) -{ - ib_verbs_private_t *priv = NULL; - int sock = -1; - int ret = -1; - struct iovec *opvector = vector; - int opcount = count; - int moved = 0; - - priv = this->private; - sock = priv->sock; - - while (opcount) - { - if (write) - { - ret = writev (sock, opvector, opcount); - - if (ret == 0 || (ret == -1 && errno == EAGAIN)) - { - /* done for now */ - break; - } - } - else - { - ret = readv (sock, opvector, opcount); - - if (ret == -1 && errno == EAGAIN) - { - /* done for now */ - break; - } - } - - if (ret == 0) - { - gf_log (this->xl->name, GF_LOG_DEBUG, - "EOF from peer %s", this->peerinfo.identifier); - opcount = -1; - errno = ENOTCONN; - break; - } - - if (ret == -1) - { - if (errno == EINTR) - continue; - - gf_log (this->xl->name, GF_LOG_DEBUG, - "%s failed (%s)", write ? "writev" : "readv", - strerror (errno)); - if (write && !priv->connected && - (errno == ECONNREFUSED)) - gf_log (this->xl->name, GF_LOG_ERROR, - "possible mismatch of 'transport-type'" - " in protocol server and client. " - "check volume file"); - opcount = -1; - break; - } - - moved = 0; - - while (moved < ret) - { - if ((ret - moved) >= opvector[0].iov_len) - { - moved += opvector[0].iov_len; - opvector++; - opcount--; - } - else - { - opvector[0].iov_len -= (ret - moved); - opvector[0].iov_base += (ret - moved); - moved += (ret - moved); - } - while (opcount && !opvector[0].iov_len) - { - opvector++; - opcount--; - } - } - } - - if (pending_vector) - *pending_vector = opvector; - - if (pending_count) - *pending_count = opcount; - - return opcount; -} - - -static int -__tcp_readv (transport_t *this, struct iovec *vector, int count, - struct iovec **pending_vector, int *pending_count) -{ - int ret = -1; - - ret = __tcp_rwv (this, vector, count, - pending_vector, pending_count, 0); - - return ret; -} - - -static int -__tcp_writev (transport_t *this, struct iovec *vector, int count, - struct iovec **pending_vector, int *pending_count) -{ - int ret = -1; - ib_verbs_private_t *priv = this->private; - - ret = __tcp_rwv (this, vector, count, pending_vector, - pending_count, 1); - - if (ret > 0) { - /* TODO: Avoid multiple calls when socket is already - registered for POLLOUT */ - priv->idx = event_select_on (this->xl->ctx->event_pool, - priv->sock, priv->idx, -1, 1); - } else if (ret == 0) { - priv->idx = event_select_on (this->xl->ctx->event_pool, - priv->sock, - priv->idx, -1, 0); - } - - return ret; -} - - -static void * -ib_verbs_recv_completion_proc (void *data) -{ - struct ibv_comp_channel *chan = data; - ib_verbs_private_t *priv = NULL; - ib_verbs_device_t *device; - ib_verbs_post_t *post; - ib_verbs_peer_t *peer; - struct ibv_cq *event_cq; - struct ibv_wc wc; - void *event_ctx; - int32_t ret = 0; - - - while (1) { - ret = ibv_get_cq_event (chan, &event_cq, &event_ctx); - if (ret) { - gf_log ("transport/ib-verbs", GF_LOG_ERROR, - "ibv_get_cq_event failed, terminating recv " - "thread %d (%d)", ret, errno); - continue; - } - - device = event_ctx; - - ret = ibv_req_notify_cq (event_cq, 0); - if (ret) { - gf_log ("transport/ib-verbs", GF_LOG_ERROR, - "ibv_req_notify_cq on %s failed, terminating " - "recv thread: %d (%d)", - device->device_name, ret, errno); - continue; - } - - device = (ib_verbs_device_t *) event_ctx; - - while ((ret = ibv_poll_cq (event_cq, 1, &wc)) > 0) { - post = (ib_verbs_post_t *) (long) wc.wr_id; - - pthread_mutex_lock (&device->qpreg.lock); - { - peer = __ib_verbs_lookup_peer (device, - wc.qp_num); - - /* - * keep a refcount on transport so that it - * doesnot get freed because of some error - * indicated by wc.status till we are done - * with usage of peer and thereby that of trans. - */ - if (peer != NULL) { - transport_ref (peer->trans); - } - } - pthread_mutex_unlock (&device->qpreg.lock); - - if (wc.status != IBV_WC_SUCCESS) { - gf_log ("transport/ib-verbs", GF_LOG_ERROR, - "recv work request on `%s' returned " - "error (%d)", - device->device_name, - wc.status); - if (peer) { - transport_unref (peer->trans); - transport_disconnect (peer->trans); - } - - if (post) { - ib_verbs_post_recv (device->srq, post); - } - continue; - } - - if (peer) { - priv = peer->trans->private; - - pthread_mutex_lock (&priv->recv_mutex); - { - while (priv->data_ptr) - pthread_cond_wait (&priv->recv_cond, - &priv->recv_mutex); - - priv->data_ptr = post->buf; - priv->data_offset = 0; - priv->data_len = wc.byte_len; - - /*pthread_cond_broadcast (&priv->recv_cond);*/ - } - pthread_mutex_unlock (&priv->recv_mutex); - - if ((ret = xlator_notify (peer->trans->xl, GF_EVENT_POLLIN, - peer->trans, NULL)) == -1) { - gf_log ("transport/ib-verbs", - GF_LOG_DEBUG, - "pollin notification to %s " - "failed, disconnecting " - "transport", - peer->trans->xl->name); - transport_disconnect (peer->trans); - } - - transport_unref (peer->trans); - } else { - gf_log ("transport/ib-verbs", - GF_LOG_DEBUG, - "could not lookup peer for qp_num: %d", - wc.qp_num); - } - ib_verbs_post_recv (device->srq, post); - } - - if (ret < 0) { - gf_log ("transport/ib-verbs", - GF_LOG_ERROR, - "ibv_poll_cq on `%s' returned error " - "(ret = %d, errno = %d)", - device->device_name, ret, errno); - continue; - } - ibv_ack_cq_events (event_cq, 1); - } - return NULL; -} - - -static void * -ib_verbs_send_completion_proc (void *data) -{ - struct ibv_comp_channel *chan = data; - ib_verbs_post_t *post; - ib_verbs_peer_t *peer; - struct ibv_cq *event_cq; - void *event_ctx; - ib_verbs_device_t *device; - struct ibv_wc wc; - int32_t ret; - - while (1) { - ret = ibv_get_cq_event (chan, &event_cq, &event_ctx); - if (ret) { - gf_log ("transport/ib-verbs", GF_LOG_ERROR, - "ibv_get_cq_event on failed, terminating " - "send thread: %d (%d)", ret, errno); - continue; - } - - device = event_ctx; - - ret = ibv_req_notify_cq (event_cq, 0); - if (ret) { - gf_log ("transport/ib-verbs", GF_LOG_ERROR, - "ibv_req_notify_cq on %s failed, terminating " - "send thread: %d (%d)", - device->device_name, ret, errno); - continue; - } - - while ((ret = ibv_poll_cq (event_cq, 1, &wc)) > 0) { - post = (ib_verbs_post_t *) (long) wc.wr_id; - - pthread_mutex_lock (&device->qpreg.lock); - { - peer = __ib_verbs_lookup_peer (device, - wc.qp_num); - - /* - * keep a refcount on transport so that it - * doesnot get freed because of some error - * indicated by wc.status till we are done - * with usage of peer and thereby that of trans. - */ - if (peer != NULL) { - transport_ref (peer->trans); - } - } - pthread_mutex_unlock (&device->qpreg.lock); - - if (wc.status != IBV_WC_SUCCESS) { - gf_log ("transport/ib-verbs", GF_LOG_ERROR, - "send work request on `%s' returned " - "error wc.status = %d, wc.vendor_err " - "= %d, post->buf = %p, wc.byte_len = " - "%d, post->reused = %d", - device->device_name, wc.status, - wc.vendor_err, - post->buf, wc.byte_len, post->reused); - if (wc.status == IBV_WC_RETRY_EXC_ERR) - gf_log ("ib-verbs", GF_LOG_ERROR, - "connection between client and" - " server not working. check by" - " running 'ibv_srq_pingpong'. " - "also make sure subnet manager" - " is running (eg: 'opensm'), " - "or check if ib-verbs port is " - "valid (or active) by running " - " 'ibv_devinfo'. contact " - "Gluster Support Team if " - "the problem persists."); - if (peer) - transport_disconnect (peer->trans); - } - - if (post) { - ib_verbs_put_post (&device->sendq, post); - } - - if (peer) { - int quota_ret = ib_verbs_quota_put (peer); - if (quota_ret < 0) { - gf_log ("ib-verbs", GF_LOG_DEBUG, - "failed to send message"); - - } - - transport_unref (peer->trans); - } else { - gf_log ("transport/ib-verbs", GF_LOG_DEBUG, - "could not lookup peer for qp_num: %d", - wc.qp_num); - } - } - - if (ret < 0) { - gf_log ("transport/ib-verbs", GF_LOG_ERROR, - "ibv_poll_cq on `%s' returned error (ret = %d," - " errno = %d)", - device->device_name, ret, errno); - continue; - } - ibv_ack_cq_events (event_cq, 1); - } - - return NULL; -} - -static void -ib_verbs_options_init (transport_t *this) -{ - ib_verbs_private_t *priv = this->private; - ib_verbs_options_t *options = &priv->options; - int32_t mtu; - data_t *temp; - - /* TODO: validate arguments from options below */ - - options->send_size = this->xl->ctx->page_size * 4; /* 512 KB */ - options->recv_size = this->xl->ctx->page_size * 4; /* 512 KB */ - options->send_count = 32; - options->recv_count = 32; - - temp = dict_get (this->xl->options, - "transport.ib-verbs.work-request-send-count"); - if (temp) - options->send_count = data_to_int32 (temp); - - temp = dict_get (this->xl->options, - "transport.ib-verbs.work-request-recv-count"); - if (temp) - options->recv_count = data_to_int32 (temp); - - options->port = 0; - temp = dict_get (this->xl->options, - "transport.ib-verbs.port"); - if (temp) - options->port = data_to_uint64 (temp); - - options->mtu = mtu = IBV_MTU_2048; - temp = dict_get (this->xl->options, - "transport.ib-verbs.mtu"); - if (temp) - mtu = data_to_int32 (temp); - switch (mtu) { - case 256: options->mtu = IBV_MTU_256; - break; - case 512: options->mtu = IBV_MTU_512; - break; - case 1024: options->mtu = IBV_MTU_1024; - break; - case 2048: options->mtu = IBV_MTU_2048; - break; - case 4096: options->mtu = IBV_MTU_4096; - break; - default: - if (temp) - gf_log ("transport/ib-verbs", GF_LOG_WARNING, - "%s: unrecognized MTU value '%s', defaulting " - "to '2048'", this->xl->name, - data_to_str (temp)); - else - gf_log ("transport/ib-verbs", GF_LOG_TRACE, - "%s: defaulting MTU to '2048'", - this->xl->name); - options->mtu = IBV_MTU_2048; - break; - } - - temp = dict_get (this->xl->options, - "transport.ib-verbs.device-name"); - if (temp) - options->device_name = gf_strdup (temp->data); - - return; -} - -static void -ib_verbs_queue_init (ib_verbs_queue_t *queue) -{ - pthread_mutex_init (&queue->lock, NULL); - - queue->active_posts.next = &queue->active_posts; - queue->active_posts.prev = &queue->active_posts; - queue->passive_posts.next = &queue->passive_posts; - queue->passive_posts.prev = &queue->passive_posts; -} - - -static ib_verbs_device_t * -ib_verbs_get_device (transport_t *this, - struct ibv_context *ibctx) -{ - glusterfs_ctx_t *ctx = this->xl->ctx; - ib_verbs_private_t *priv = this->private; - ib_verbs_options_t *options = &priv->options; - char *device_name = priv->options.device_name; - uint32_t port = priv->options.port; - - uint8_t active_port = 0; - int32_t ret = 0; - int32_t i = 0; - - ib_verbs_device_t *trav; - - trav = ctx->ib; - while (trav) { - if ((!strcmp (trav->device_name, device_name)) && - (trav->port == port)) - break; - trav = trav->next; - } - - if (!trav) { - - trav = GF_CALLOC (1, sizeof (*trav), - gf_ibv_mt_ib_verbs_device_t); - ERR_ABORT (trav); - priv->device = trav; - - trav->context = ibctx; - - ret = ib_get_active_port (trav->context); - - if (ret < 0) { - if (!port) { - gf_log ("transport/ib-verbs", GF_LOG_ERROR, - "Failed to find any active ports and " - "none specified in volume file," - " exiting"); - return NULL; - } - } - - active_port = ret; - - if (port) { - ret = ib_check_active_port (trav->context, port); - if (ret < 0) { - gf_log ("transport/ib-verbs", GF_LOG_WARNING, - "On device %s: provided port:%u is " - "found to be offline, continuing to " - "use the same port", device_name, port); - } - } else { - priv->options.port = active_port; - port = active_port; - gf_log ("transport/ib-verbs", GF_LOG_TRACE, - "Port unspecified in volume file using active " - "port: %u", port); - } - - trav->device_name = gf_strdup (device_name); - trav->port = port; - - trav->next = ctx->ib; - ctx->ib = trav; - - trav->send_chan = ibv_create_comp_channel (trav->context); - if (!trav->send_chan) { - gf_log ("transport/ib-verbs", GF_LOG_ERROR, - "%s: could not create send completion channel", - device_name); - /* TODO: cleanup current mess */ - return NULL; - } - - trav->recv_chan = ibv_create_comp_channel (trav->context); - if (!trav->recv_chan) { - gf_log ("transport/ib-verbs", GF_LOG_ERROR, - "could not create recv completion channel"); - /* TODO: cleanup current mess */ - return NULL; - } - - if (ib_verbs_create_cq (this) < 0) { - gf_log ("transport/ib-verbs", GF_LOG_ERROR, - "%s: could not create CQ", - this->xl->name); - return NULL; - } - - /* protection domain */ - trav->pd = ibv_alloc_pd (trav->context); - - if (!trav->pd) { - gf_log ("transport/ib-verbs", GF_LOG_ERROR, - "%s: could not allocate protection domain", - this->xl->name); - return NULL; - } - - struct ibv_srq_init_attr attr = { - .attr = { - .max_wr = options->recv_count, - .max_sge = 1 - } - }; - trav->srq = ibv_create_srq (trav->pd, &attr); - - if (!trav->srq) { - gf_log ("transport/ib-verbs", GF_LOG_ERROR, - "%s: could not create SRQ", - this->xl->name); - return NULL; - } - - /* queue init */ - ib_verbs_queue_init (&trav->sendq); - ib_verbs_queue_init (&trav->recvq); - - if (ib_verbs_create_posts (this) < 0) { - gf_log ("transport/ib-verbs", GF_LOG_ERROR, - "%s: could not allocate posts", - this->xl->name); - return NULL; - } - - /* completion threads */ - ret = pthread_create (&trav->send_thread, - NULL, - ib_verbs_send_completion_proc, - trav->send_chan); - if (ret) { - gf_log ("transport/ib-verbs", GF_LOG_ERROR, - "could not create send completion thread"); - return NULL; - } - ret = pthread_create (&trav->recv_thread, - NULL, - ib_verbs_recv_completion_proc, - trav->recv_chan); - if (ret) { - gf_log ("transport/ib-verbs", GF_LOG_ERROR, - "could not create recv completion thread"); - return NULL; - } - - /* qpreg */ - pthread_mutex_init (&trav->qpreg.lock, NULL); - for (i=0; i<42; i++) { - trav->qpreg.ents[i].next = &trav->qpreg.ents[i]; - trav->qpreg.ents[i].prev = &trav->qpreg.ents[i]; - } - } - return trav; -} - -static int32_t -ib_verbs_init (transport_t *this) -{ - ib_verbs_private_t *priv = this->private; - ib_verbs_options_t *options = &priv->options; - struct ibv_device **dev_list; - struct ibv_context *ib_ctx = NULL; - int32_t ret = 0; - - ib_verbs_options_init (this); - - { - dev_list = ibv_get_device_list (NULL); - - if (!dev_list) { - gf_log ("transport/ib-verbs", - GF_LOG_CRITICAL, - "Failed to get IB devices"); - ret = -1; - goto cleanup; - } - - if (!*dev_list) { - gf_log ("transport/ib-verbs", - GF_LOG_CRITICAL, - "No IB devices found"); - ret = -1; - goto cleanup; - } - - if (!options->device_name) { - if (*dev_list) { - options->device_name = - gf_strdup (ibv_get_device_name (*dev_list)); - } else { - gf_log ("transport/ib-verbs", GF_LOG_CRITICAL, - "IB device list is empty. Check for " - "'ib_uverbs' module"); - return -1; - goto cleanup; - } - } - - while (*dev_list) { - if (!strcmp (ibv_get_device_name (*dev_list), - options->device_name)) { - ib_ctx = ibv_open_device (*dev_list); - - if (!ib_ctx) { - gf_log ("transport/ib-verbs", - GF_LOG_ERROR, - "Failed to get infiniband" - "device context"); - ret = -1; - goto cleanup; - } - break; - } - ++dev_list; - } - - priv->device = ib_verbs_get_device (this, ib_ctx); - - if (!priv->device) { - gf_log ("transport/ib-verbs", GF_LOG_ERROR, - "could not create ib_verbs device for %s", - priv->device->device_name); - ret = -1; - goto cleanup; - } - } - - priv->peer.trans = this; - INIT_LIST_HEAD (&priv->peer.ioq); - - pthread_mutex_init (&priv->read_mutex, NULL); - pthread_mutex_init (&priv->write_mutex, NULL); - pthread_mutex_init (&priv->recv_mutex, NULL); - pthread_cond_init (&priv->recv_cond, NULL); - -cleanup: - if (-1 == ret) { - if (ib_ctx) - ibv_close_device (ib_ctx); - } - - if (dev_list) - ibv_free_device_list (dev_list); - - return ret; -} - - -static int32_t -ib_verbs_disconnect (transport_t *this) -{ - ib_verbs_private_t *priv = this->private; - int32_t ret = 0; - - pthread_mutex_lock (&priv->write_mutex); - { - ret = __ib_verbs_disconnect (this); - } - pthread_mutex_unlock (&priv->write_mutex); - - return ret; -} - - -static int32_t -__tcp_connect_finish (int fd) -{ - int ret = -1; - int optval = 0; - socklen_t optlen = sizeof (int); - - ret = getsockopt (fd, SOL_SOCKET, SO_ERROR, - (void *)&optval, &optlen); - - if (ret == 0 && optval) - { - errno = optval; - ret = -1; - } - - return ret; -} - -static inline void -ib_verbs_fill_handshake_data (char *buf, struct ib_verbs_nbio *nbio, - ib_verbs_private_t *priv) -{ - sprintf (buf, - "QP1:RECV_BLKSIZE=%08x:SEND_BLKSIZE=%08x\n" - "QP1:LID=%04x:QPN=%06x:PSN=%06x\n", - priv->peer.recv_size, - priv->peer.send_size, - priv->peer.local_lid, - priv->peer.local_qpn, - priv->peer.local_psn); - - nbio->vector.iov_base = buf; - nbio->vector.iov_len = strlen (buf) + 1; - nbio->count = 1; - return; -} - -static inline void -ib_verbs_fill_handshake_ack (char *buf, struct ib_verbs_nbio *nbio) -{ - sprintf (buf, "DONE\n"); - nbio->vector.iov_base = buf; - nbio->vector.iov_len = strlen (buf) + 1; - nbio->count = 1; - return; -} - -static int -ib_verbs_handshake_pollin (transport_t *this) -{ - int ret = 0; - ib_verbs_private_t *priv = this->private; - char *buf = priv->handshake.incoming.buf; - int32_t recv_buf_size, send_buf_size; - socklen_t sock_len; - - if (priv->handshake.incoming.state == IB_VERBS_HANDSHAKE_COMPLETE) { - return -1; - } - - pthread_mutex_lock (&priv->write_mutex); - { - while (priv->handshake.incoming.state != IB_VERBS_HANDSHAKE_COMPLETE) - { - switch (priv->handshake.incoming.state) - { - case IB_VERBS_HANDSHAKE_START: - buf = priv->handshake.incoming.buf = GF_CALLOC (1, 256, gf_ibv_mt_char); - ib_verbs_fill_handshake_data (buf, &priv->handshake.incoming, priv); - buf[0] = 0; - priv->handshake.incoming.state = IB_VERBS_HANDSHAKE_RECEIVING_DATA; - break; - - case IB_VERBS_HANDSHAKE_RECEIVING_DATA: - ret = __tcp_readv (this, - &priv->handshake.incoming.vector, - priv->handshake.incoming.count, - &priv->handshake.incoming.pending_vector, - &priv->handshake.incoming.pending_count); - if (ret == -1) { - goto unlock; - } - - if (ret > 0) { - gf_log (this->xl->name, GF_LOG_TRACE, - "partial header read on NB socket. continue later"); - goto unlock; - } - - if (!ret) { - priv->handshake.incoming.state = IB_VERBS_HANDSHAKE_RECEIVED_DATA; - } - break; - - case IB_VERBS_HANDSHAKE_RECEIVED_DATA: - ret = sscanf (buf, - "QP1:RECV_BLKSIZE=%08x:SEND_BLKSIZE=%08x\n" - "QP1:LID=%04x:QPN=%06x:PSN=%06x\n", - &recv_buf_size, - &send_buf_size, - &priv->peer.remote_lid, - &priv->peer.remote_qpn, - &priv->peer.remote_psn); - - if ((ret != 5) && (strncmp (buf, "QP1:", 4))) { - gf_log ("transport/ib-verbs", - GF_LOG_CRITICAL, - "%s: remote-host(%s)'s " - "transport type is different", - this->xl->name, - this->peerinfo.identifier); - ret = -1; - goto unlock; - } - - if (recv_buf_size < priv->peer.recv_size) - priv->peer.recv_size = recv_buf_size; - if (send_buf_size < priv->peer.send_size) - priv->peer.send_size = send_buf_size; - - gf_log ("transport/ib-verbs", GF_LOG_TRACE, - "%s: transacted recv_size=%d " - "send_size=%d", - this->xl->name, priv->peer.recv_size, - priv->peer.send_size); - - priv->peer.quota = priv->peer.send_count; - - if (ib_verbs_connect_qp (this)) { - gf_log ("transport/ib-verbs", - GF_LOG_ERROR, - "%s: failed to connect with " - "remote QP", this->xl->name); - ret = -1; - goto unlock; - } - ib_verbs_fill_handshake_ack (buf, &priv->handshake.incoming); - buf[0] = 0; - priv->handshake.incoming.state = IB_VERBS_HANDSHAKE_RECEIVING_ACK; - break; - - case IB_VERBS_HANDSHAKE_RECEIVING_ACK: - ret = __tcp_readv (this, - &priv->handshake.incoming.vector, - priv->handshake.incoming.count, - &priv->handshake.incoming.pending_vector, - &priv->handshake.incoming.pending_count); - if (ret == -1) { - goto unlock; - } - - if (ret > 0) { - gf_log (this->xl->name, GF_LOG_TRACE, - "partial header read on NB " - "socket. continue later"); - goto unlock; - } - - if (!ret) { - priv->handshake.incoming.state = IB_VERBS_HANDSHAKE_RECEIVED_ACK; - } - break; - - case IB_VERBS_HANDSHAKE_RECEIVED_ACK: - if (strncmp (buf, "DONE", 4)) { - gf_log ("transport/ib-verbs", - GF_LOG_DEBUG, - "%s: handshake-3 did not " - "return 'DONE' (%s)", - this->xl->name, buf); - ret = -1; - goto unlock; - } - ret = 0; - priv->connected = 1; - sock_len = sizeof (struct sockaddr_storage); - getpeername (priv->sock, - (struct sockaddr *) &this->peerinfo.sockaddr, - &sock_len); - - GF_FREE (priv->handshake.incoming.buf); - priv->handshake.incoming.buf = NULL; - priv->handshake.incoming.state = IB_VERBS_HANDSHAKE_COMPLETE; - } - } - } -unlock: - pthread_mutex_unlock (&priv->write_mutex); - - if (ret == -1) { - transport_disconnect (this); - } else { - ret = 0; - } - - if (!ret && priv->connected) { - ret = xlator_notify (this->xl, GF_EVENT_CHILD_UP, this); - } - - return ret; -} - -static int -ib_verbs_handshake_pollout (transport_t *this) -{ - ib_verbs_private_t *priv = this->private; - char *buf = priv->handshake.outgoing.buf; - int32_t ret = 0; - - if (priv->handshake.outgoing.state == IB_VERBS_HANDSHAKE_COMPLETE) { - return 0; - } - - pthread_mutex_unlock (&priv->write_mutex); - { - while (priv->handshake.outgoing.state != IB_VERBS_HANDSHAKE_COMPLETE) - { - switch (priv->handshake.outgoing.state) - { - case IB_VERBS_HANDSHAKE_START: - buf = priv->handshake.outgoing.buf = GF_CALLOC (1, 256, gf_ibv_mt_char); - ib_verbs_fill_handshake_data (buf, &priv->handshake.outgoing, priv); - priv->handshake.outgoing.state = IB_VERBS_HANDSHAKE_SENDING_DATA; - break; - - case IB_VERBS_HANDSHAKE_SENDING_DATA: - ret = __tcp_writev (this, - &priv->handshake.outgoing.vector, - priv->handshake.outgoing.count, - &priv->handshake.outgoing.pending_vector, - &priv->handshake.outgoing.pending_count); - if (ret == -1) { - goto unlock; - } - - if (ret > 0) { - gf_log (this->xl->name, GF_LOG_TRACE, - "partial header read on NB socket. continue later"); - goto unlock; - } - - if (!ret) { - priv->handshake.outgoing.state = IB_VERBS_HANDSHAKE_SENT_DATA; - } - break; - - case IB_VERBS_HANDSHAKE_SENT_DATA: - ib_verbs_fill_handshake_ack (buf, &priv->handshake.outgoing); - priv->handshake.outgoing.state = IB_VERBS_HANDSHAKE_SENDING_ACK; - break; - - case IB_VERBS_HANDSHAKE_SENDING_ACK: - ret = __tcp_writev (this, - &priv->handshake.outgoing.vector, - priv->handshake.outgoing.count, - &priv->handshake.outgoing.pending_vector, - &priv->handshake.outgoing.pending_count); - - if (ret == -1) { - goto unlock; - } - - if (ret > 0) { - gf_log (this->xl->name, GF_LOG_TRACE, - "partial header read on NB " - "socket. continue later"); - goto unlock; - } - - if (!ret) { - GF_FREE (priv->handshake.outgoing.buf); - priv->handshake.outgoing.buf = NULL; - priv->handshake.outgoing.state = IB_VERBS_HANDSHAKE_COMPLETE; - } - break; - } - } - } -unlock: - pthread_mutex_unlock (&priv->write_mutex); - - if (ret == -1) { - transport_disconnect (this); - } else { - ret = 0; - } - - return ret; -} - -static int -ib_verbs_handshake_pollerr (transport_t *this) -{ - ib_verbs_private_t *priv = this->private; - int32_t ret = 0; - char need_unref = 0; - - gf_log ("transport/ib-verbs", GF_LOG_DEBUG, - "%s: peer disconnected, cleaning up", - this->xl->name); - - pthread_mutex_lock (&priv->write_mutex); - { - __ib_verbs_teardown (this); - - if (priv->sock != -1) { - event_unregister (this->xl->ctx->event_pool, - priv->sock, priv->idx); - need_unref = 1; - - if (close (priv->sock) != 0) { - gf_log ("transport/ib-verbs", GF_LOG_ERROR, - "close () - error: %s", - strerror (errno)); - ret = -errno; - } - priv->tcp_connected = priv->connected = 0; - priv->sock = -1; - } - - if (priv->handshake.incoming.buf) { - GF_FREE (priv->handshake.incoming.buf); - priv->handshake.incoming.buf = NULL; - } - - priv->handshake.incoming.state = IB_VERBS_HANDSHAKE_START; - - if (priv->handshake.outgoing.buf) { - GF_FREE (priv->handshake.outgoing.buf); - priv->handshake.outgoing.buf = NULL; - } - - priv->handshake.outgoing.state = IB_VERBS_HANDSHAKE_START; - } - pthread_mutex_unlock (&priv->write_mutex); - - xlator_notify (this->xl, GF_EVENT_POLLERR, this, NULL); - - if (need_unref) - transport_unref (this); - - return 0; -} - - -static int -tcp_connect_finish (transport_t *this) -{ - ib_verbs_private_t *priv = this->private; - int error = 0, ret = 0; - - pthread_mutex_lock (&priv->write_mutex); - { - ret = __tcp_connect_finish (priv->sock); - - if (!ret) { - this->myinfo.sockaddr_len = - sizeof (this->myinfo.sockaddr); - ret = getsockname (priv->sock, - (struct sockaddr *)&this->myinfo.sockaddr, - &this->myinfo.sockaddr_len); - if (ret == -1) - { - gf_log (this->xl->name, GF_LOG_ERROR, - "getsockname on new client-socket %d " - "failed (%s)", - priv->sock, strerror (errno)); - close (priv->sock); - error = 1; - goto unlock; - } - - get_transport_identifiers (this); - priv->tcp_connected = 1; - } - - if (ret == -1 && errno != EINPROGRESS) { - gf_log (this->xl->name, GF_LOG_ERROR, - "tcp connect to %s failed (%s)", - this->peerinfo.identifier, strerror (errno)); - error = 1; - } - } -unlock: - pthread_mutex_unlock (&priv->write_mutex); - - if (error) { - transport_disconnect (this); - } - - return ret; -} - -static int -ib_verbs_event_handler (int fd, int idx, void *data, - int poll_in, int poll_out, int poll_err) -{ - transport_t *this = data; - ib_verbs_private_t *priv = this->private; - ib_verbs_options_t *options = NULL; - int ret = 0; - - if (!priv->tcp_connected) { - ret = tcp_connect_finish (this); - if (priv->tcp_connected) { - options = &priv->options; - - priv->peer.send_count = options->send_count; - priv->peer.recv_count = options->recv_count; - priv->peer.send_size = options->send_size; - priv->peer.recv_size = options->recv_size; - - if ((ret = ib_verbs_create_qp (this)) < 0) { - gf_log ("transport/ib-verbs", GF_LOG_ERROR, - "%s: could not create QP", - this->xl->name); - transport_disconnect (this); - } - } - } - - if (!ret && poll_out && priv->tcp_connected) { - ret = ib_verbs_handshake_pollout (this); - } - - if (!ret && poll_in && priv->tcp_connected) { - if (priv->handshake.incoming.state == IB_VERBS_HANDSHAKE_COMPLETE) { - gf_log ("transport/ib-verbs", GF_LOG_ERROR, - "%s: pollin received on tcp socket (peer: %s) " - "after handshake is complete", - this->xl->name, this->peerinfo.identifier); - ib_verbs_handshake_pollerr (this); - return 0; - } - ret = ib_verbs_handshake_pollin (this); - } - - if (ret < 0 || poll_err) { - ret = ib_verbs_handshake_pollerr (this); - } - - return 0; -} - -static int -__tcp_nonblock (int fd) -{ - int flags = 0; - int ret = -1; - - flags = fcntl (fd, F_GETFL); - - if (flags != -1) - ret = fcntl (fd, F_SETFL, flags | O_NONBLOCK); - - return ret; -} - -static int32_t -ib_verbs_connect (struct transport *this) -{ - dict_t *options = this->xl->options; - - ib_verbs_private_t *priv = this->private; - - int32_t ret = 0; - gf_boolean_t non_blocking = 1; - struct sockaddr_storage sockaddr; - socklen_t sockaddr_len = 0; - - if (priv->connected) { - return 0; - } - - if (dict_get (options, "non-blocking-io")) { - char *nb_connect = data_to_str (dict_get (this->xl->options, - "non-blocking-io")); - - if (gf_string2boolean (nb_connect, &non_blocking) == -1) { - gf_log (this->xl->name, GF_LOG_ERROR, - "'non-blocking-io' takes only boolean " - "options, not taking any action"); - non_blocking = 1; - } - } - - ret = ibverbs_client_get_remote_sockaddr (this, (struct sockaddr *)&sockaddr, - &sockaddr_len); - if (ret != 0) { - gf_log (this->xl->name, GF_LOG_DEBUG, - "cannot get remote address to connect"); - return ret; - } - - pthread_mutex_lock (&priv->write_mutex); - { - if (priv->sock != -1) { - ret = 0; - goto unlock; - } - - priv->sock = socket (((struct sockaddr *)&sockaddr)->sa_family, - SOCK_STREAM, 0); - - if (priv->sock == -1) { - gf_log (this->xl->name, GF_LOG_ERROR, - "socket () - error: %s", strerror (errno)); - ret = -errno; - goto unlock; - } - - gf_log (this->xl->name, GF_LOG_TRACE, - "socket fd = %d", priv->sock); - - memcpy (&this->peerinfo.sockaddr, &sockaddr, sockaddr_len); - this->peerinfo.sockaddr_len = sockaddr_len; - - ((struct sockaddr *) &this->myinfo.sockaddr)->sa_family = - ((struct sockaddr *)&this->peerinfo.sockaddr)->sa_family; - - if (non_blocking) - { - ret = __tcp_nonblock (priv->sock); - - if (ret == -1) - { - gf_log (this->xl->name, GF_LOG_ERROR, - "could not set socket %d to non " - "blocking mode (%s)", - priv->sock, strerror (errno)); - close (priv->sock); - priv->sock = -1; - goto unlock; - } - } - - ret = client_bind (this, - (struct sockaddr *)&this->myinfo.sockaddr, - &this->myinfo.sockaddr_len, priv->sock); - if (ret == -1) - { - gf_log (this->xl->name, GF_LOG_WARNING, - "client bind failed: %s", strerror (errno)); - close (priv->sock); - priv->sock = -1; - goto unlock; - } - - ret = connect (priv->sock, - (struct sockaddr *)&this->peerinfo.sockaddr, - this->peerinfo.sockaddr_len); - if (ret == -1 && errno != EINPROGRESS) - { - gf_log (this->xl->name, GF_LOG_ERROR, - "connection attempt failed (%s)", - strerror (errno)); - close (priv->sock); - priv->sock = -1; - goto unlock; - } - - priv->tcp_connected = priv->connected = 0; - - transport_ref (this); - - priv->handshake.incoming.state = IB_VERBS_HANDSHAKE_START; - priv->handshake.outgoing.state = IB_VERBS_HANDSHAKE_START; - - priv->idx = event_register (this->xl->ctx->event_pool, - priv->sock, ib_verbs_event_handler, - this, 1, 1); - } -unlock: - pthread_mutex_unlock (&priv->write_mutex); - - return ret; -} - -static int -ib_verbs_server_event_handler (int fd, int idx, void *data, - int poll_in, int poll_out, int poll_err) -{ - int32_t main_sock = -1; - transport_t *this, *trans = data; - ib_verbs_private_t *priv = NULL; - ib_verbs_private_t *trans_priv = (ib_verbs_private_t *) trans->private; - ib_verbs_options_t *options = NULL; - - if (!poll_in) - return 0; - - this = GF_CALLOC (1, sizeof (transport_t), - gf_ibv_mt_transport_t); - ERR_ABORT (this); - priv = GF_CALLOC (1, sizeof (ib_verbs_private_t), - gf_ibv_mt_ib_verbs_private_t); - ERR_ABORT (priv); - this->private = priv; - /* Copy all the ib_verbs related values in priv, from trans_priv - as other than QP, all the values remain same */ - priv->device = trans_priv->device; - priv->options = trans_priv->options; - options = &priv->options; - - this->ops = trans->ops; - this->xl = trans->xl; - this->init = trans->init; - this->fini = trans->fini; - - memcpy (&this->myinfo.sockaddr, &trans->myinfo.sockaddr, - trans->myinfo.sockaddr_len); - this->myinfo.sockaddr_len = trans->myinfo.sockaddr_len; - - main_sock = (trans_priv)->sock; - this->peerinfo.sockaddr_len = sizeof (this->peerinfo.sockaddr); - priv->sock = accept (main_sock, - (struct sockaddr *)&this->peerinfo.sockaddr, - &this->peerinfo.sockaddr_len); - if (priv->sock == -1) { - gf_log ("ib-verbs/server", GF_LOG_ERROR, - "accept() failed: %s", - strerror (errno)); - GF_FREE (this->private); - GF_FREE (this); - return -1; - } - - priv->peer.trans = this; - transport_ref (this); - - get_transport_identifiers (this); - - priv->tcp_connected = 1; - priv->handshake.incoming.state = IB_VERBS_HANDSHAKE_START; - priv->handshake.outgoing.state = IB_VERBS_HANDSHAKE_START; - - priv->peer.send_count = options->send_count; - priv->peer.recv_count = options->recv_count; - priv->peer.send_size = options->send_size; - priv->peer.recv_size = options->recv_size; - INIT_LIST_HEAD (&priv->peer.ioq); - - if (ib_verbs_create_qp (this) < 0) { - gf_log ("transport/ib-verbs", GF_LOG_ERROR, - "%s: could not create QP", - this->xl->name); - transport_disconnect (this); - return -1; - } - - priv->idx = event_register (this->xl->ctx->event_pool, priv->sock, - ib_verbs_event_handler, this, 1, 1); - - pthread_mutex_init (&priv->read_mutex, NULL); - pthread_mutex_init (&priv->write_mutex, NULL); - pthread_mutex_init (&priv->recv_mutex, NULL); - /* pthread_cond_init (&priv->recv_cond, NULL); */ - - return 0; -} - -static int32_t -ib_verbs_listen (transport_t *this) -{ - struct sockaddr_storage sockaddr; - socklen_t sockaddr_len; - ib_verbs_private_t *priv = this->private; - int opt = 1, ret = 0; - char service[NI_MAXSERV], host[NI_MAXHOST]; - - memset (&sockaddr, 0, sizeof (sockaddr)); - ret = ibverbs_server_get_local_sockaddr (this, - (struct sockaddr *)&sockaddr, - &sockaddr_len); - if (ret != 0) { - gf_log (this->xl->name, GF_LOG_DEBUG, - "cannot find network address of server to bind to"); - goto err; - } - - priv->sock = socket (((struct sockaddr *)&sockaddr)->sa_family, - SOCK_STREAM, 0); - if (priv->sock == -1) { - gf_log ("ib-verbs/server", GF_LOG_CRITICAL, - "init: failed to create socket, error: %s", - strerror (errno)); - GF_FREE (this->private); - ret = -1; - goto err; - } - - memcpy (&this->myinfo.sockaddr, &sockaddr, sockaddr_len); - this->myinfo.sockaddr_len = sockaddr_len; - - ret = getnameinfo ((struct sockaddr *)&this->myinfo.sockaddr, - this->myinfo.sockaddr_len, - host, sizeof (host), - service, sizeof (service), - NI_NUMERICHOST); - if (ret != 0) { - gf_log (this->xl->name, GF_LOG_ERROR, - "getnameinfo failed (%s)", gai_strerror (ret)); - goto err; - } - sprintf (this->myinfo.identifier, "%s:%s", host, service); - - setsockopt (priv->sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof (opt)); - if (bind (priv->sock, - (struct sockaddr *)&sockaddr, - sockaddr_len) != 0) { - ret = -1; - gf_log ("ib-verbs/server", GF_LOG_ERROR, - "init: failed to bind to socket for %s (%s)", - this->myinfo.identifier, strerror (errno)); - goto err; - } - - if (listen (priv->sock, 10) != 0) { - gf_log ("ib-verbs/server", GF_LOG_ERROR, - "init: listen () failed on socket for %s (%s)", - this->myinfo.identifier, strerror (errno)); - ret = -1; - goto err; - } - - /* Register the main socket */ - priv->idx = event_register (this->xl->ctx->event_pool, priv->sock, - ib_verbs_server_event_handler, - transport_ref (this), 1, 0); - -err: - return ret; -} - -struct transport_ops tops = { - .receive = ib_verbs_receive, - .submit = ib_verbs_submit, - .connect = ib_verbs_connect, - .disconnect = ib_verbs_disconnect, - .listen = ib_verbs_listen, -}; - -int32_t -init (transport_t *this) -{ - ib_verbs_private_t *priv = GF_CALLOC (1, sizeof (*priv), - gf_ibv_mt_ib_verbs_private_t); - this->private = priv; - priv->sock = -1; - - if (ib_verbs_init (this)) { - gf_log (this->xl->name, GF_LOG_ERROR, - "Failed to initialize IB Device"); - return -1; - } - - return 0; -} - -void -fini (struct transport *this) -{ - /* TODO: verify this function does graceful finish */ - ib_verbs_private_t *priv = this->private; - this->private = NULL; - - pthread_mutex_destroy (&priv->recv_mutex); - pthread_mutex_destroy (&priv->write_mutex); - pthread_mutex_destroy (&priv->read_mutex); - /* pthread_cond_destroy (&priv->recv_cond); */ - - gf_log (this->xl->name, GF_LOG_TRACE, - "called fini on transport: %p", - this); - GF_FREE (priv); - return; -} - -int32_t -mem_acct_init (xlator_t *this) -{ - int ret = -1; - - if (!this) - return ret; - - ret = xlator_mem_acct_init (this, gf_common_mt_end + 1); - - if (ret != 0) { - gf_log (this->name, GF_LOG_ERROR, "Memory accounting init" - "failed"); - return ret; - } - - return ret; -} - -/* TODO: expand each option */ -struct volume_options options[] = { - { .key = {"transport.ib-verbs.port", - "ib-verbs-port"}, - .type = GF_OPTION_TYPE_INT, - .min = 1, - .max = 4, - .description = "check the option by 'ibv_devinfo'" - }, - { .key = {"transport.ib-verbs.mtu", - "ib-verbs-mtu"}, - .type = GF_OPTION_TYPE_INT, - }, - { .key = {"transport.ib-verbs.device-name", - "ib-verbs-device-name"}, - .type = GF_OPTION_TYPE_ANY, - .description = "check by 'ibv_devinfo'" - }, - { .key = {"transport.ib-verbs.work-request-send-count", - "ib-verbs-work-request-send-count"}, - .type = GF_OPTION_TYPE_INT, - }, - { .key = {"transport.ib-verbs.work-request-recv-count", - "ib-verbs-work-request-recv-count"}, - .type = GF_OPTION_TYPE_INT, - }, - { .key = {"remote-port", - "transport.remote-port", - "transport.ib-verbs.remote-port"}, - .type = GF_OPTION_TYPE_INT - }, - { .key = {"transport.ib-verbs.listen-port", "listen-port"}, - .type = GF_OPTION_TYPE_INT - }, - { .key = {"transport.ib-verbs.connect-path", "connect-path"}, - .type = GF_OPTION_TYPE_ANY - }, - { .key = {"transport.ib-verbs.bind-path", "bind-path"}, - .type = GF_OPTION_TYPE_ANY - }, - { .key = {"transport.ib-verbs.listen-path", "listen-path"}, - .type = GF_OPTION_TYPE_ANY - }, - { .key = {"transport.address-family", - "address-family"}, - .value = {"inet", "inet6", "inet/inet6", "inet6/inet", - "unix", "inet-sdp" }, - .type = GF_OPTION_TYPE_STR - }, - { .key = {NULL} } -}; diff --git a/transport/ib-verbs/src/ib-verbs.h b/transport/ib-verbs/src/ib-verbs.h deleted file mode 100644 index c385b62e5c..0000000000 --- a/transport/ib-verbs/src/ib-verbs.h +++ /dev/null @@ -1,220 +0,0 @@ -/* - Copyright (c) 2006-2009 Gluster, Inc. <http://www.gluster.com> - This file is part of GlusterFS. - - GlusterFS is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published - by the Free Software Foundation; either version 3 of the License, - or (at your option) any later version. - - GlusterFS is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program. If not, see - <http://www.gnu.org/licenses/>. -*/ - -#ifndef _XPORT_IB_VERBS_H -#define _XPORT_IB_VERBS_H - - -#ifndef _CONFIG_H -#define _CONFIG_H -#include "config.h" -#endif - -#ifndef MAX_IOVEC -#define MAX_IOVEC 16 -#endif /* MAX_IOVEC */ - -#include "xlator.h" -#include "event.h" -#include "ib-verbs-mem-types.h" - -#include <stdio.h> -#include <list.h> -#include <arpa/inet.h> -#include <infiniband/verbs.h> - -#define GF_DEFAULT_IBVERBS_LISTEN_PORT 6997 - -/* options per transport end point */ -struct _ib_verbs_options { - int32_t port; - char *device_name; - enum ibv_mtu mtu; - int32_t send_count; - int32_t recv_count; - uint64_t recv_size; - uint64_t send_size; -}; -typedef struct _ib_verbs_options ib_verbs_options_t; - - -struct _ib_verbs_header { - char colonO[3]; - uint32_t size1; - uint32_t size2; - char version; -} __attribute__((packed)); -typedef struct _ib_verbs_header ib_verbs_header_t; - -struct _ib_verbs_ioq { - union { - struct list_head list; - struct { - struct _ib_verbs_ioq *next; - struct _ib_verbs_ioq *prev; - }; - }; - ib_verbs_header_t header; - struct iovec vector[MAX_IOVEC]; - int count; - char *buf; - struct iobref *iobref; -}; -typedef struct _ib_verbs_ioq ib_verbs_ioq_t; - -/* represents one communication peer, two per transport_t */ -struct _ib_verbs_peer { - transport_t *trans; - struct ibv_qp *qp; - - int32_t recv_count; - int32_t send_count; - int32_t recv_size; - int32_t send_size; - - int32_t quota; - union { - struct list_head ioq; - struct { - ib_verbs_ioq_t *ioq_next; - ib_verbs_ioq_t *ioq_prev; - }; - }; - - /* QP attributes, needed to connect with remote QP */ - int32_t local_lid; - int32_t local_psn; - int32_t local_qpn; - int32_t remote_lid; - int32_t remote_psn; - int32_t remote_qpn; -}; -typedef struct _ib_verbs_peer ib_verbs_peer_t; - - -struct _ib_verbs_post { - struct _ib_verbs_post *next, *prev; - struct ibv_mr *mr; - char *buf; - int32_t buf_size; - char aux; - int32_t reused; - pthread_barrier_t wait; -}; -typedef struct _ib_verbs_post ib_verbs_post_t; - - -struct _ib_verbs_queue { - ib_verbs_post_t active_posts, passive_posts; - int32_t active_count, passive_count; - pthread_mutex_t lock; -}; -typedef struct _ib_verbs_queue ib_verbs_queue_t; - - -struct _ib_verbs_qpreg { - pthread_mutex_t lock; - int32_t count; - struct _qpent { - struct _qpent *next, *prev; - int32_t qp_num; - ib_verbs_peer_t *peer; - } ents[42]; -}; -typedef struct _ib_verbs_qpreg ib_verbs_qpreg_t; - -/* context per device, stored in global glusterfs_ctx_t->ib */ -struct _ib_verbs_device { - struct _ib_verbs_device *next; - const char *device_name; - struct ibv_context *context; - int32_t port; - struct ibv_pd *pd; - struct ibv_srq *srq; - ib_verbs_qpreg_t qpreg; - struct ibv_comp_channel *send_chan, *recv_chan; - struct ibv_cq *send_cq, *recv_cq; - ib_verbs_queue_t sendq, recvq; - pthread_t send_thread, recv_thread; -}; -typedef struct _ib_verbs_device ib_verbs_device_t; - -typedef enum { - IB_VERBS_HANDSHAKE_START = 0, - IB_VERBS_HANDSHAKE_SENDING_DATA, - IB_VERBS_HANDSHAKE_RECEIVING_DATA, - IB_VERBS_HANDSHAKE_SENT_DATA, - IB_VERBS_HANDSHAKE_RECEIVED_DATA, - IB_VERBS_HANDSHAKE_SENDING_ACK, - IB_VERBS_HANDSHAKE_RECEIVING_ACK, - IB_VERBS_HANDSHAKE_RECEIVED_ACK, - IB_VERBS_HANDSHAKE_COMPLETE, -} ib_verbs_handshake_state_t; - -struct ib_verbs_nbio { - int state; - char *buf; - int count; - struct iovec vector; - struct iovec *pending_vector; - int pending_count; -}; - - -struct _ib_verbs_private { - int32_t sock; - int32_t idx; - unsigned char connected; - unsigned char tcp_connected; - unsigned char ib_connected; - in_addr_t addr; - unsigned short port; - - /* IB Verbs Driver specific variables, pointers */ - ib_verbs_peer_t peer; - ib_verbs_device_t *device; - ib_verbs_options_t options; - - /* Used by trans->op->receive */ - char *data_ptr; - int32_t data_offset; - int32_t data_len; - - /* Mutex */ - pthread_mutex_t read_mutex; - pthread_mutex_t write_mutex; - pthread_barrier_t handshake_barrier; - char handshake_ret; - - pthread_mutex_t recv_mutex; - pthread_cond_t recv_cond; - - /* used during ib_verbs_handshake */ - struct { - struct ib_verbs_nbio incoming; - struct ib_verbs_nbio outgoing; - int state; - ib_verbs_header_t header; - char *buf; - size_t size; - } handshake; -}; -typedef struct _ib_verbs_private ib_verbs_private_t; - -#endif /* _XPORT_IB_VERBS_H */ diff --git a/transport/ib-verbs/src/name.c b/transport/ib-verbs/src/name.c deleted file mode 100644 index a3e1848145..0000000000 --- a/transport/ib-verbs/src/name.c +++ /dev/null @@ -1,712 +0,0 @@ -/* - Copyright (c) 2008-2009 Gluster, Inc. <http://www.gluster.com> - This file is part of GlusterFS. - - GlusterFS is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published - by the Free Software Foundation; either version 3 of the License, - or (at your option) any later version. - - GlusterFS is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program. If not, see - <http://www.gnu.org/licenses/>. -*/ - -#include <sys/types.h> -#include <sys/socket.h> -#include <errno.h> -#include <netdb.h> -#include <string.h> - -#ifdef CLIENT_PORT_CEILING -#undef CLIENT_PORT_CEILING -#endif - -#define CLIENT_PORT_CEILING 1024 - -#ifndef AF_INET_SDP -#define AF_INET_SDP 27 -#endif - -#include "transport.h" -#include "ib-verbs.h" - -int32_t -gf_resolve_ip6 (const char *hostname, - uint16_t port, - int family, - void **dnscache, - struct addrinfo **addr_info); - -static int32_t -af_inet_bind_to_port_lt_ceiling (int fd, struct sockaddr *sockaddr, - socklen_t sockaddr_len, int ceiling) -{ - int32_t ret = -1; - /* struct sockaddr_in sin = {0, }; */ - uint16_t port = ceiling - 1; - - while (port) - { - switch (sockaddr->sa_family) - { - case AF_INET6: - ((struct sockaddr_in6 *)sockaddr)->sin6_port = htons (port); - break; - - case AF_INET_SDP: - case AF_INET: - ((struct sockaddr_in *)sockaddr)->sin_port = htons (port); - break; - } - - ret = bind (fd, sockaddr, sockaddr_len); - - if (ret == 0) - break; - - if (ret == -1 && errno == EACCES) - break; - - port--; - } - - return ret; -} - -static int32_t -af_unix_client_bind (transport_t *this, - struct sockaddr *sockaddr, - socklen_t sockaddr_len, - int sock) -{ - data_t *path_data = NULL; - struct sockaddr_un *addr = NULL; - int32_t ret = -1; - - path_data = dict_get (this->xl->options, - "transport.ib-verbs.bind-path"); - if (path_data) { - char *path = data_to_str (path_data); - if (!path || strlen (path) > UNIX_PATH_MAX) { - gf_log (this->xl->name, GF_LOG_DEBUG, - "transport.ib-verbs.bind-path not specfied " - "for unix socket, letting connect to assign " - "default value"); - goto err; - } - - addr = (struct sockaddr_un *) sockaddr; - strcpy (addr->sun_path, path); - ret = bind (sock, (struct sockaddr *)addr, sockaddr_len); - if (ret == -1) { - gf_log (this->xl->name, GF_LOG_ERROR, - "cannot bind to unix-domain socket %d (%s)", - sock, strerror (errno)); - goto err; - } - } - -err: - return ret; -} - -static int32_t -client_fill_address_family (transport_t *this, struct sockaddr *sockaddr) -{ - data_t *address_family_data = NULL; - - address_family_data = dict_get (this->xl->options, - "transport.address-family"); - if (!address_family_data) { - data_t *remote_host_data = NULL, *connect_path_data = NULL; - remote_host_data = dict_get (this->xl->options, "remote-host"); - connect_path_data = dict_get (this->xl->options, - "transport.ib-verbs.connect-path"); - - if (!(remote_host_data || connect_path_data) || - (remote_host_data && connect_path_data)) { - gf_log (this->xl->name, GF_LOG_ERROR, - "address-family not specified and not able to " - "determine the same from other options " - "(remote-host:%s and connect-path:%s)", - data_to_str (remote_host_data), - data_to_str (connect_path_data)); - return -1; - } - - if (remote_host_data) { - gf_log (this->xl->name, GF_LOG_DEBUG, - "address-family not specified, guessing it " - "to be inet/inet6"); - sockaddr->sa_family = AF_UNSPEC; - } else { - gf_log (this->xl->name, GF_LOG_DEBUG, - "address-family not specified, guessing it " - "to be unix"); - sockaddr->sa_family = AF_UNIX; - } - - } else { - char *address_family = data_to_str (address_family_data); - if (!strcasecmp (address_family, "unix")) { - sockaddr->sa_family = AF_UNIX; - } else if (!strcasecmp (address_family, "inet")) { - sockaddr->sa_family = AF_INET; - } else if (!strcasecmp (address_family, "inet6")) { - sockaddr->sa_family = AF_INET6; - } else if (!strcasecmp (address_family, "inet-sdp")) { - sockaddr->sa_family = AF_INET_SDP; - } else if (!strcasecmp (address_family, "inet/inet6") - || !strcasecmp (address_family, "inet6/inet")) { - sockaddr->sa_family = AF_UNSPEC; - } else { - gf_log (this->xl->name, GF_LOG_ERROR, - "unknown address-family (%s) specified", - address_family); - return -1; - } - } - - return 0; -} - -static int32_t -af_inet_client_get_remote_sockaddr (transport_t *this, - struct sockaddr *sockaddr, - socklen_t *sockaddr_len) -{ - dict_t *options = this->xl->options; - data_t *remote_host_data = NULL; - data_t *remote_port_data = NULL; - char *remote_host = NULL; - uint16_t remote_port = 0; - struct addrinfo *addr_info = NULL; - int32_t ret = 0; - - remote_host_data = dict_get (options, "remote-host"); - if (remote_host_data == NULL) - { - gf_log (this->xl->name, GF_LOG_ERROR, - "option remote-host missing in volume %s", - this->xl->name); - ret = -1; - goto err; - } - - remote_host = data_to_str (remote_host_data); - if (remote_host == NULL) - { - gf_log (this->xl->name, GF_LOG_ERROR, - "option remote-host has data NULL in volume %s", - this->xl->name); - ret = -1; - goto err; - } - - remote_port_data = dict_get (options, "remote-port"); - if (remote_port_data == NULL) - { - gf_log (this->xl->name, GF_LOG_DEBUG, - "option remote-port missing in volume %s. " - "Defaulting to %d", - this->xl->name, GF_DEFAULT_IBVERBS_LISTEN_PORT); - - remote_port = GF_DEFAULT_IBVERBS_LISTEN_PORT; - } - else - { - remote_port = data_to_uint16 (remote_port_data); - } - - if (remote_port == (uint16_t)-1) - { - gf_log (this->xl->name, GF_LOG_ERROR, - "option remote-port has invalid port in volume %s", - this->xl->name); - ret = -1; - goto err; - } - - /* TODO: gf_resolve is a blocking call. kick in some - non blocking dns techniques */ - ret = gf_resolve_ip6 (remote_host, remote_port, - sockaddr->sa_family, - &this->dnscache, &addr_info); - if (ret == -1) { - gf_log (this->xl->name, GF_LOG_ERROR, - "DNS resolution failed on host %s", remote_host); - goto err; - } - - memcpy (sockaddr, addr_info->ai_addr, addr_info->ai_addrlen); - *sockaddr_len = addr_info->ai_addrlen; - -err: - return ret; -} - -static int32_t -af_unix_client_get_remote_sockaddr (transport_t *this, - struct sockaddr *sockaddr, - socklen_t *sockaddr_len) -{ - struct sockaddr_un *sockaddr_un = NULL; - char *connect_path = NULL; - data_t *connect_path_data = NULL; - int32_t ret = 0; - - connect_path_data = dict_get (this->xl->options, - "transport.ib-verbs.connect-path"); - if (!connect_path_data) { - gf_log (this->xl->name, GF_LOG_ERROR, - "option transport.ib-verbs.connect-path not " - "specified for address-family unix"); - ret = -1; - goto err; - } - - connect_path = data_to_str (connect_path_data); - if (!connect_path) { - gf_log (this->xl->name, GF_LOG_ERROR, - "connect-path is null-string"); - ret = -1; - goto err; - } - - if (strlen (connect_path) > UNIX_PATH_MAX) { - gf_log (this->xl->name, GF_LOG_ERROR, - "connect-path value length %"GF_PRI_SIZET" > " - "%d octets", strlen (connect_path), UNIX_PATH_MAX); - ret = -1; - goto err; - } - - gf_log (this->xl->name, - GF_LOG_DEBUG, - "using connect-path %s", connect_path); - sockaddr_un = (struct sockaddr_un *)sockaddr; - strcpy (sockaddr_un->sun_path, connect_path); - *sockaddr_len = sizeof (struct sockaddr_un); - -err: - return ret; -} - -static int32_t -af_unix_server_get_local_sockaddr (transport_t *this, - struct sockaddr *addr, - socklen_t *addr_len) -{ - data_t *listen_path_data = NULL; - char *listen_path = NULL; - int32_t ret = 0; - struct sockaddr_un *sunaddr = (struct sockaddr_un *)addr; - - - listen_path_data = dict_get (this->xl->options, - "transport.ib-verbs.listen-path"); - if (!listen_path_data) { - gf_log (this->xl->name, GF_LOG_ERROR, - "missing option listen-path"); - ret = -1; - goto err; - } - - listen_path = data_to_str (listen_path_data); - -#ifndef UNIX_PATH_MAX -#define UNIX_PATH_MAX 108 -#endif - - if (strlen (listen_path) > UNIX_PATH_MAX) { - gf_log (this->xl->name, GF_LOG_ERROR, - "option listen-path has value length %"GF_PRI_SIZET" > %d", - strlen (listen_path), UNIX_PATH_MAX); - ret = -1; - goto err; - } - - sunaddr->sun_family = AF_UNIX; - strcpy (sunaddr->sun_path, listen_path); - *addr_len = sizeof (struct sockaddr_un); - -err: - return ret; -} - -static int32_t -af_inet_server_get_local_sockaddr (transport_t *this, - struct sockaddr *addr, - socklen_t *addr_len) -{ - struct addrinfo hints, *res = 0; - data_t *listen_port_data = NULL, *listen_host_data = NULL; - uint16_t listen_port = -1; - char service[NI_MAXSERV], *listen_host = NULL; - dict_t *options = NULL; - int32_t ret = 0; - - options = this->xl->options; - - listen_port_data = dict_get (options, "transport.ib-verbs.listen-port"); - listen_host_data = dict_get (options, "transport.ib-verbs.bind-address"); - - if (listen_port_data) - { - listen_port = data_to_uint16 (listen_port_data); - } else { - if (addr->sa_family == AF_INET6) { - struct sockaddr_in6 *in = (struct sockaddr_in6 *) addr; - in->sin6_addr = in6addr_any; - in->sin6_port = htons(listen_port); - *addr_len = sizeof(struct sockaddr_in6); - goto out; - } else if (addr->sa_family == AF_INET) { - struct sockaddr_in *in = (struct sockaddr_in *) addr; - in->sin_addr.s_addr = htonl(INADDR_ANY); - in->sin_port = htons(listen_port); - *addr_len = sizeof(struct sockaddr_in); - goto out; - } - } - - if (listen_port == (uint16_t) -1) - listen_port = GF_DEFAULT_IBVERBS_LISTEN_PORT; - - - if (listen_host_data) - { - listen_host = data_to_str (listen_host_data); - } - - memset (service, 0, sizeof (service)); - sprintf (service, "%d", listen_port); - - memset (&hints, 0, sizeof (hints)); - hints.ai_family = addr->sa_family; - hints.ai_socktype = SOCK_STREAM; - hints.ai_flags = AI_ADDRCONFIG | AI_PASSIVE; - - ret = getaddrinfo(listen_host, service, &hints, &res); - if (ret != 0) { - gf_log (this->xl->name, - GF_LOG_ERROR, - "getaddrinfo failed for host %s, service %s (%s)", - listen_host, service, gai_strerror (ret)); - ret = -1; - goto out; - } - - memcpy (addr, res->ai_addr, res->ai_addrlen); - *addr_len = res->ai_addrlen; - - freeaddrinfo (res); - -out: - return ret; -} - -int32_t -client_bind (transport_t *this, - struct sockaddr *sockaddr, - socklen_t *sockaddr_len, - int sock) -{ - int ret = 0; - - *sockaddr_len = sizeof (struct sockaddr_in6); - switch (sockaddr->sa_family) - { - case AF_INET_SDP: - case AF_INET: - *sockaddr_len = sizeof (struct sockaddr_in); - - case AF_INET6: - ret = af_inet_bind_to_port_lt_ceiling (sock, sockaddr, - *sockaddr_len, - CLIENT_PORT_CEILING); - if (ret == -1) { - gf_log (this->xl->name, GF_LOG_WARNING, - "cannot bind inet socket (%d) to port " - "less than %d (%s)", - sock, CLIENT_PORT_CEILING, strerror (errno)); - ret = 0; - } - break; - - case AF_UNIX: - *sockaddr_len = sizeof (struct sockaddr_un); - ret = af_unix_client_bind (this, (struct sockaddr *)sockaddr, - *sockaddr_len, sock); - break; - - default: - gf_log (this->xl->name, GF_LOG_ERROR, - "unknown address family %d", sockaddr->sa_family); - ret = -1; - break; - } - - return ret; -} - -int32_t -ibverbs_client_get_remote_sockaddr (transport_t *this, - struct sockaddr *sockaddr, - socklen_t *sockaddr_len) -{ - int32_t ret = 0; - char is_inet_sdp = 0; - - ret = client_fill_address_family (this, sockaddr); - if (ret) { - ret = -1; - goto err; - } - - switch (sockaddr->sa_family) - { - case AF_INET_SDP: - sockaddr->sa_family = AF_INET; - is_inet_sdp = 1; - - case AF_INET: - case AF_INET6: - case AF_UNSPEC: - ret = af_inet_client_get_remote_sockaddr (this, - sockaddr, - sockaddr_len); - - if (is_inet_sdp) { - sockaddr->sa_family = AF_INET_SDP; - } - - break; - - case AF_UNIX: - ret = af_unix_client_get_remote_sockaddr (this, - sockaddr, - sockaddr_len); - break; - - default: - gf_log (this->xl->name, GF_LOG_ERROR, - "unknown address-family %d", sockaddr->sa_family); - ret = -1; - } - -err: - return ret; -} - -int32_t -ibverbs_server_get_local_sockaddr (transport_t *this, - struct sockaddr *addr, - socklen_t *addr_len) -{ - data_t *address_family_data = NULL; - int32_t ret = 0; - char is_inet_sdp = 0; - - address_family_data = dict_get (this->xl->options, - "transport.address-family"); - if (address_family_data) { - char *address_family = NULL; - address_family = data_to_str (address_family_data); - - if (!strcasecmp (address_family, "inet")) { - addr->sa_family = AF_INET; - } else if (!strcasecmp (address_family, "inet6")) { - addr->sa_family = AF_INET6; - } else if (!strcasecmp (address_family, "inet-sdp")) { - addr->sa_family = AF_INET_SDP; - } else if (!strcasecmp (address_family, "unix")) { - addr->sa_family = AF_UNIX; - } else if (!strcasecmp (address_family, "inet/inet6") - || !strcasecmp (address_family, "inet6/inet")) { - addr->sa_family = AF_UNSPEC; - } else { - gf_log (this->xl->name, GF_LOG_ERROR, - "unknown address family (%s) specified", - address_family); - ret = -1; - goto err; - } - } else { - gf_log (this->xl->name, GF_LOG_DEBUG, - "option address-family not specified, defaulting " - "to inet/inet6"); - addr->sa_family = AF_UNSPEC; - } - - switch (addr->sa_family) - { - case AF_INET_SDP: - is_inet_sdp = 1; - addr->sa_family = AF_INET; - - case AF_INET: - case AF_INET6: - case AF_UNSPEC: - ret = af_inet_server_get_local_sockaddr (this, addr, addr_len); - if (is_inet_sdp && !ret) { - addr->sa_family = AF_INET_SDP; - } - break; - - case AF_UNIX: - ret = af_unix_server_get_local_sockaddr (this, addr, addr_len); - break; - } - -err: - return ret; -} - -int32_t -fill_inet6_inet_identifiers (transport_t *this, struct sockaddr_storage *addr, - int32_t addr_len, char *identifier) -{ - int32_t ret = 0, tmpaddr_len = 0; - char service[NI_MAXSERV], host[NI_MAXHOST]; - struct sockaddr_storage tmpaddr; - - memset (&tmpaddr, 0, sizeof (tmpaddr)); - tmpaddr = *addr; - tmpaddr_len = addr_len; - - if (((struct sockaddr *) &tmpaddr)->sa_family == AF_INET6) { - int32_t one_to_four, four_to_eight, twelve_to_sixteen; - int16_t eight_to_ten, ten_to_twelve; - - one_to_four = four_to_eight = twelve_to_sixteen = 0; - eight_to_ten = ten_to_twelve = 0; - - one_to_four = ((struct sockaddr_in6 *) - &tmpaddr)->sin6_addr.s6_addr32[0]; - four_to_eight = ((struct sockaddr_in6 *) - &tmpaddr)->sin6_addr.s6_addr32[1]; -#ifdef GF_SOLARIS_HOST_OS - eight_to_ten = S6_ADDR16(((struct sockaddr_in6 *) - &tmpaddr)->sin6_addr)[4]; -#else - eight_to_ten = ((struct sockaddr_in6 *) - &tmpaddr)->sin6_addr.s6_addr16[4]; -#endif - -#ifdef GF_SOLARIS_HOST_OS - ten_to_twelve = S6_ADDR16(((struct sockaddr_in6 *) - &tmpaddr)->sin6_addr)[5]; -#else - ten_to_twelve = ((struct sockaddr_in6 *) - &tmpaddr)->sin6_addr.s6_addr16[5]; -#endif - twelve_to_sixteen = ((struct sockaddr_in6 *) - &tmpaddr)->sin6_addr.s6_addr32[3]; - - /* ipv4 mapped ipv6 address has - bits 0-80: 0 - bits 80-96: 0xffff - bits 96-128: ipv4 address - */ - - if (one_to_four == 0 && - four_to_eight == 0 && - eight_to_ten == 0 && - ten_to_twelve == -1) { - struct sockaddr_in *in_ptr = (struct sockaddr_in *)&tmpaddr; - memset (&tmpaddr, 0, sizeof (tmpaddr)); - - in_ptr->sin_family = AF_INET; - in_ptr->sin_port = ((struct sockaddr_in6 *)addr)->sin6_port; - in_ptr->sin_addr.s_addr = twelve_to_sixteen; - tmpaddr_len = sizeof (*in_ptr); - } - } - - ret = getnameinfo ((struct sockaddr *) &tmpaddr, - tmpaddr_len, - host, sizeof (host), - service, sizeof (service), - NI_NUMERICHOST | NI_NUMERICSERV); - if (ret != 0) { - gf_log (this->xl->name, - GF_LOG_ERROR, - "getnameinfo failed (%s)", gai_strerror (ret)); - } - - sprintf (identifier, "%s:%s", host, service); - - return ret; -} - -int32_t -get_transport_identifiers (transport_t *this) -{ - int32_t ret = 0; - char is_inet_sdp = 0; - - switch (((struct sockaddr *) &this->myinfo.sockaddr)->sa_family) - { - case AF_INET_SDP: - is_inet_sdp = 1; - ((struct sockaddr *) &this->peerinfo.sockaddr)->sa_family = ((struct sockaddr *) &this->myinfo.sockaddr)->sa_family = AF_INET; - - case AF_INET: - case AF_INET6: - { - ret = fill_inet6_inet_identifiers (this, - &this->myinfo.sockaddr, - this->myinfo.sockaddr_len, - this->myinfo.identifier); - if (ret == -1) { - gf_log (this->xl->name, GF_LOG_ERROR, - "can't fill inet/inet6 identifier for server"); - goto err; - } - - ret = fill_inet6_inet_identifiers (this, - &this->peerinfo.sockaddr, - this->peerinfo.sockaddr_len, - this->peerinfo.identifier); - if (ret == -1) { - gf_log (this->xl->name, GF_LOG_ERROR, - "can't fill inet/inet6 identifier for client"); - goto err; - } - - if (is_inet_sdp) { - ((struct sockaddr *) &this->peerinfo.sockaddr)->sa_family = ((struct sockaddr *) &this->myinfo.sockaddr)->sa_family = AF_INET_SDP; - } - } - break; - - case AF_UNIX: - { - struct sockaddr_un *sunaddr = NULL; - - sunaddr = (struct sockaddr_un *) &this->myinfo.sockaddr; - strcpy (this->myinfo.identifier, sunaddr->sun_path); - - sunaddr = (struct sockaddr_un *) &this->peerinfo.sockaddr; - strcpy (this->peerinfo.identifier, sunaddr->sun_path); - } - break; - - default: - gf_log (this->xl->name, GF_LOG_ERROR, - "unknown address family (%d)", - ((struct sockaddr *) &this->myinfo.sockaddr)->sa_family); - ret = -1; - break; - } - -err: - return ret; -} diff --git a/transport/ib-verbs/src/name.h b/transport/ib-verbs/src/name.h deleted file mode 100644 index 4f0f47711d..0000000000 --- a/transport/ib-verbs/src/name.h +++ /dev/null @@ -1,47 +0,0 @@ -/* - Copyright (c) 2008-2009 Gluster, Inc. <http://www.gluster.com> - This file is part of GlusterFS. - - GlusterFS is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published - by the Free Software Foundation; either version 3 of the License, - or (at your option) any later version. - - GlusterFS is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program. If not, see - <http://www.gnu.org/licenses/>. -*/ - -#ifndef _IB_VERBS_NAME_H -#define _IB_VERBS_NAME_H - -#include <sys/socket.h> -#include <sys/un.h> - -#include "compat.h" - -int32_t -client_bind (transport_t *this, - struct sockaddr *sockaddr, - socklen_t *sockaddr_len, - int sock); - -int32_t -ibverbs_client_get_remote_sockaddr (transport_t *this, - struct sockaddr *sockaddr, - socklen_t *sockaddr_len); - -int32_t -ibverbs_server_get_local_sockaddr (transport_t *this, - struct sockaddr *addr, - socklen_t *addr_len); - -int32_t -get_transport_identifiers (transport_t *this); - -#endif /* _IB_VERBS_NAME_H */ diff --git a/transport/socket/Makefile.am b/transport/socket/Makefile.am deleted file mode 100644 index f963effea2..0000000000 --- a/transport/socket/Makefile.am +++ /dev/null @@ -1 +0,0 @@ -SUBDIRS = src
\ No newline at end of file diff --git a/transport/socket/src/Makefile.am b/transport/socket/src/Makefile.am deleted file mode 100644 index 1832587a6d..0000000000 --- a/transport/socket/src/Makefile.am +++ /dev/null @@ -1,14 +0,0 @@ -noinst_HEADERS = socket.h name.h - -transport_LTLIBRARIES = socket.la -transportdir = $(libdir)/glusterfs/$(PACKAGE_VERSION)/transport - -socket_la_LDFLAGS = -module -avoidversion - -socket_la_SOURCES = socket.c name.c -socket_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la - -AM_CFLAGS = -fPIC -D_FILE_OFFSET_BITS=64 -D_GNU_SOURCE -Wall -D$(GF_HOST_OS)\ - -I$(top_srcdir)/libglusterfs/src -shared -nostartfiles $(GF_CFLAGS) - -CLEANFILES = *~ diff --git a/transport/socket/src/name.c b/transport/socket/src/name.c deleted file mode 100644 index 120a669c87..0000000000 --- a/transport/socket/src/name.c +++ /dev/null @@ -1,737 +0,0 @@ -/* - Copyright (c) 2008-2009 Gluster, Inc. <http://www.gluster.com> - This file is part of GlusterFS. - - GlusterFS is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published - by the Free Software Foundation; either version 3 of the License, - or (at your option) any later version. - - GlusterFS is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program. If not, see - <http://www.gnu.org/licenses/>. -*/ - -#include <sys/types.h> -#include <sys/socket.h> -#include <netinet/in.h> -#include <errno.h> -#include <netdb.h> -#include <string.h> - -#ifdef CLIENT_PORT_CEILING -#undef CLIENT_PORT_CEILING -#endif - -#define CLIENT_PORT_CEILING 1024 - -#ifndef AF_INET_SDP -#define AF_INET_SDP 27 -#endif - -#include "transport.h" -#include "socket.h" - -int32_t -gf_resolve_ip6 (const char *hostname, - uint16_t port, - int family, - void **dnscache, - struct addrinfo **addr_info); - -static int32_t -af_inet_bind_to_port_lt_ceiling (int fd, struct sockaddr *sockaddr, - socklen_t sockaddr_len, int ceiling) -{ - int32_t ret = -1; - /* struct sockaddr_in sin = {0, }; */ - uint16_t port = ceiling - 1; - - while (port) - { - switch (sockaddr->sa_family) - { - case AF_INET6: - ((struct sockaddr_in6 *)sockaddr)->sin6_port = htons (port); - break; - - case AF_INET_SDP: - case AF_INET: - ((struct sockaddr_in *)sockaddr)->sin_port = htons (port); - break; - } - - ret = bind (fd, sockaddr, sockaddr_len); - - if (ret == 0) - break; - - if (ret == -1 && errno == EACCES) - break; - - port--; - } - - return ret; -} - -static int32_t -af_unix_client_bind (transport_t *this, - struct sockaddr *sockaddr, - socklen_t sockaddr_len, - int sock) -{ - data_t *path_data = NULL; - struct sockaddr_un *addr = NULL; - int32_t ret = 0; - - path_data = dict_get (this->xl->options, "transport.socket.bind-path"); - if (path_data) { - char *path = data_to_str (path_data); - if (!path || strlen (path) > UNIX_PATH_MAX) { - gf_log (this->xl->name, GF_LOG_TRACE, - "bind-path not specfied for unix socket, " - "letting connect to assign default value"); - goto err; - } - - addr = (struct sockaddr_un *) sockaddr; - strcpy (addr->sun_path, path); - ret = bind (sock, (struct sockaddr *)addr, sockaddr_len); - if (ret == -1) { - gf_log (this->xl->name, GF_LOG_ERROR, - "cannot bind to unix-domain socket %d (%s)", - sock, strerror (errno)); - goto err; - } - } else { - gf_log (this->xl->name, GF_LOG_TRACE, - "bind-path not specfied for unix socket, " - "letting connect to assign default value"); - } - -err: - return ret; -} - -int32_t -client_fill_address_family (transport_t *this, sa_family_t *sa_family) -{ - data_t *address_family_data = NULL; - int32_t ret = -1; - - if (sa_family == NULL) { - goto out; - } - - address_family_data = dict_get (this->xl->options, - "transport.address-family"); - if (!address_family_data) { - data_t *remote_host_data = NULL, *connect_path_data = NULL; - remote_host_data = dict_get (this->xl->options, "remote-host"); - connect_path_data = dict_get (this->xl->options, - "transport.socket.connect-path"); - - if (!(remote_host_data || connect_path_data) || - (remote_host_data && connect_path_data)) { - gf_log (this->xl->name, GF_LOG_ERROR, - "transport.address-family not specified and " - "not able to determine the " - "same from other options (remote-host:%s and " - "transport.unix.connect-path:%s)", - data_to_str (remote_host_data), - data_to_str (connect_path_data)); - goto out; - } - - if (remote_host_data) { - gf_log (this->xl->name, GF_LOG_DEBUG, - "address-family not specified, guessing it " - "to be inet/inet6"); - *sa_family = AF_UNSPEC; - } else { - gf_log (this->xl->name, GF_LOG_DEBUG, - "address-family not specified, guessing it " - "to be unix"); - *sa_family = AF_UNIX; - } - - } else { - char *address_family = data_to_str (address_family_data); - if (!strcasecmp (address_family, "unix")) { - *sa_family = AF_UNIX; - } else if (!strcasecmp (address_family, "inet")) { - *sa_family = AF_INET; - } else if (!strcasecmp (address_family, "inet6")) { - *sa_family = AF_INET6; - } else if (!strcasecmp (address_family, "inet-sdp")) { - *sa_family = AF_INET_SDP; - } else if (!strcasecmp (address_family, "inet/inet6") - || !strcasecmp (address_family, "inet6/inet")) { - *sa_family = AF_UNSPEC; - } else { - gf_log (this->xl->name, GF_LOG_ERROR, - "unknown address-family (%s) specified", - address_family); - goto out; - } - } - - ret = 0; - -out: - return ret; -} - -static int32_t -af_inet_client_get_remote_sockaddr (transport_t *this, - struct sockaddr *sockaddr, - socklen_t *sockaddr_len) -{ - dict_t *options = this->xl->options; - data_t *remote_host_data = NULL; - data_t *remote_port_data = NULL; - char *remote_host = NULL; - uint16_t remote_port = 0; - struct addrinfo *addr_info = NULL; - int32_t ret = 0; - - remote_host_data = dict_get (options, "remote-host"); - if (remote_host_data == NULL) - { - gf_log (this->xl->name, GF_LOG_ERROR, - "option remote-host missing in volume %s", this->xl->name); - ret = -1; - goto err; - } - - remote_host = data_to_str (remote_host_data); - if (remote_host == NULL) - { - gf_log (this->xl->name, GF_LOG_ERROR, - "option remote-host has data NULL in volume %s", this->xl->name); - ret = -1; - goto err; - } - - remote_port_data = dict_get (options, "remote-port"); - if (remote_port_data == NULL) - { - gf_log (this->xl->name, GF_LOG_TRACE, - "option remote-port missing in volume %s. Defaulting to %d", - this->xl->name, GF_DEFAULT_SOCKET_LISTEN_PORT); - - remote_port = GF_DEFAULT_SOCKET_LISTEN_PORT; - } - else - { - remote_port = data_to_uint16 (remote_port_data); - } - - if (remote_port == (uint16_t)-1) - { - gf_log (this->xl->name, GF_LOG_ERROR, - "option remote-port has invalid port in volume %s", - this->xl->name); - ret = -1; - goto err; - } - - /* TODO: gf_resolve is a blocking call. kick in some - non blocking dns techniques */ - ret = gf_resolve_ip6 (remote_host, remote_port, - sockaddr->sa_family, &this->dnscache, &addr_info); - if (ret == -1) { - gf_log (this->xl->name, GF_LOG_ERROR, - "DNS resolution failed on host %s", remote_host); - goto err; - } - - memcpy (sockaddr, addr_info->ai_addr, addr_info->ai_addrlen); - *sockaddr_len = addr_info->ai_addrlen; - -err: - return ret; -} - -static int32_t -af_unix_client_get_remote_sockaddr (transport_t *this, - struct sockaddr *sockaddr, - socklen_t *sockaddr_len) -{ - struct sockaddr_un *sockaddr_un = NULL; - char *connect_path = NULL; - data_t *connect_path_data = NULL; - int32_t ret = 0; - - connect_path_data = dict_get (this->xl->options, - "transport.socket.connect-path"); - if (!connect_path_data) { - gf_log (this->xl->name, GF_LOG_ERROR, - "option transport.unix.connect-path not specified for " - "address-family unix"); - ret = -1; - goto err; - } - - connect_path = data_to_str (connect_path_data); - if (!connect_path) { - gf_log (this->xl->name, GF_LOG_ERROR, - "transport.unix.connect-path is null-string"); - ret = -1; - goto err; - } - - if (strlen (connect_path) > UNIX_PATH_MAX) { - gf_log (this->xl->name, GF_LOG_ERROR, - "connect-path value length %"GF_PRI_SIZET" > %d octets", - strlen (connect_path), UNIX_PATH_MAX); - ret = -1; - goto err; - } - - gf_log (this->xl->name, GF_LOG_TRACE, - "using connect-path %s", connect_path); - sockaddr_un = (struct sockaddr_un *)sockaddr; - strcpy (sockaddr_un->sun_path, connect_path); - *sockaddr_len = sizeof (struct sockaddr_un); - -err: - return ret; -} - -static int32_t -af_unix_server_get_local_sockaddr (transport_t *this, - struct sockaddr *addr, - socklen_t *addr_len) -{ - data_t *listen_path_data = NULL; - char *listen_path = NULL; - int32_t ret = 0; - struct sockaddr_un *sunaddr = (struct sockaddr_un *)addr; - - - listen_path_data = dict_get (this->xl->options, - "transport.socket.listen-path"); - if (!listen_path_data) { - gf_log (this->xl->name, GF_LOG_ERROR, - "missing option transport.socket.listen-path"); - ret = -1; - goto err; - } - - listen_path = data_to_str (listen_path_data); - -#ifndef UNIX_PATH_MAX -#define UNIX_PATH_MAX 108 -#endif - - if (strlen (listen_path) > UNIX_PATH_MAX) { - gf_log (this->xl->name, GF_LOG_ERROR, - "option transport.unix.listen-path has value length " - "%"GF_PRI_SIZET" > %d", - strlen (listen_path), UNIX_PATH_MAX); - ret = -1; - goto err; - } - - sunaddr->sun_family = AF_UNIX; - strcpy (sunaddr->sun_path, listen_path); - *addr_len = sizeof (struct sockaddr_un); - -err: - return ret; -} - -static int32_t -af_inet_server_get_local_sockaddr (transport_t *this, - struct sockaddr *addr, - socklen_t *addr_len) -{ - struct addrinfo hints, *res = 0; - data_t *listen_port_data = NULL, *listen_host_data = NULL; - uint16_t listen_port = -1; - char service[NI_MAXSERV], *listen_host = NULL; - dict_t *options = NULL; - int32_t ret = 0; - - options = this->xl->options; - - listen_port_data = dict_get (options, "transport.socket.listen-port"); - listen_host_data = dict_get (options, "transport.socket.bind-address"); - - if (listen_port_data) - { - listen_port = data_to_uint16 (listen_port_data); - } - - if (listen_port == (uint16_t) -1) - listen_port = GF_DEFAULT_SOCKET_LISTEN_PORT; - - - if (listen_host_data) - { - listen_host = data_to_str (listen_host_data); - } else { - if (addr->sa_family == AF_INET6) { - struct sockaddr_in6 *in = (struct sockaddr_in6 *) addr; - in->sin6_addr = in6addr_any; - in->sin6_port = htons(listen_port); - *addr_len = sizeof(struct sockaddr_in6); - goto out; - } else if (addr->sa_family == AF_INET) { - struct sockaddr_in *in = (struct sockaddr_in *) addr; - in->sin_addr.s_addr = htonl(INADDR_ANY); - in->sin_port = htons(listen_port); - *addr_len = sizeof(struct sockaddr_in); - goto out; - } - } - - memset (service, 0, sizeof (service)); - sprintf (service, "%d", listen_port); - - memset (&hints, 0, sizeof (hints)); - hints.ai_family = addr->sa_family; - hints.ai_socktype = SOCK_STREAM; - hints.ai_flags = AI_ADDRCONFIG | AI_PASSIVE; - - ret = getaddrinfo(listen_host, service, &hints, &res); - if (ret != 0) { - gf_log (this->xl->name, GF_LOG_ERROR, - "getaddrinfo failed for host %s, service %s (%s)", - listen_host, service, gai_strerror (ret)); - ret = -1; - goto out; - } - - memcpy (addr, res->ai_addr, res->ai_addrlen); - *addr_len = res->ai_addrlen; - - freeaddrinfo (res); - -out: - return ret; -} - -int32_t -client_bind (transport_t *this, - struct sockaddr *sockaddr, - socklen_t *sockaddr_len, - int sock) -{ - int ret = 0; - - *sockaddr_len = sizeof (struct sockaddr_in6); - switch (sockaddr->sa_family) - { - case AF_INET_SDP: - case AF_INET: - *sockaddr_len = sizeof (struct sockaddr_in); - - case AF_INET6: - ret = af_inet_bind_to_port_lt_ceiling (sock, sockaddr, - *sockaddr_len, CLIENT_PORT_CEILING); - if (ret == -1) { - gf_log (this->xl->name, GF_LOG_WARNING, - "cannot bind inet socket (%d) to port less than %d (%s)", - sock, CLIENT_PORT_CEILING, strerror (errno)); - ret = 0; - } - break; - - case AF_UNIX: - *sockaddr_len = sizeof (struct sockaddr_un); - ret = af_unix_client_bind (this, (struct sockaddr *)sockaddr, - *sockaddr_len, sock); - break; - - default: - gf_log (this->xl->name, GF_LOG_ERROR, - "unknown address family %d", sockaddr->sa_family); - ret = -1; - break; - } - - return ret; -} - -int32_t -socket_client_get_remote_sockaddr (transport_t *this, - struct sockaddr *sockaddr, - socklen_t *sockaddr_len, - sa_family_t *sa_family) -{ - int32_t ret = 0; - - if ((sockaddr == NULL) || (sockaddr_len == NULL) - || (sa_family == NULL)) { - ret = -1; - goto err; - } - - - ret = client_fill_address_family (this, &sockaddr->sa_family); - if (ret) { - ret = -1; - goto err; - } - - *sa_family = sockaddr->sa_family; - - switch (sockaddr->sa_family) - { - case AF_INET_SDP: - sockaddr->sa_family = AF_INET; - - case AF_INET: - case AF_INET6: - case AF_UNSPEC: - ret = af_inet_client_get_remote_sockaddr (this, sockaddr, - sockaddr_len); - break; - - case AF_UNIX: - ret = af_unix_client_get_remote_sockaddr (this, sockaddr, - sockaddr_len); - break; - - default: - gf_log (this->xl->name, GF_LOG_ERROR, - "unknown address-family %d", sockaddr->sa_family); - ret = -1; - } - - if (*sa_family == AF_UNSPEC) { - *sa_family = sockaddr->sa_family; - } - -err: - return ret; -} - - -int32_t -server_fill_address_family (transport_t *this, sa_family_t *sa_family) -{ - data_t *address_family_data = NULL; - int32_t ret = -1; - - if (sa_family == NULL) { - goto out; - } - - address_family_data = dict_get (this->xl->options, - "transport.address-family"); - if (address_family_data) { - char *address_family = NULL; - address_family = data_to_str (address_family_data); - - if (!strcasecmp (address_family, "inet")) { - *sa_family = AF_INET; - } else if (!strcasecmp (address_family, "inet6")) { - *sa_family = AF_INET6; - } else if (!strcasecmp (address_family, "inet-sdp")) { - *sa_family = AF_INET_SDP; - } else if (!strcasecmp (address_family, "unix")) { - *sa_family = AF_UNIX; - } else if (!strcasecmp (address_family, "inet/inet6") - || !strcasecmp (address_family, "inet6/inet")) { - *sa_family = AF_UNSPEC; - } else { - gf_log (this->xl->name, GF_LOG_ERROR, - "unknown address family (%s) specified", address_family); - goto out; - } - } else { - gf_log (this->xl->name, GF_LOG_DEBUG, - "option address-family not specified, defaulting to inet/inet6"); - *sa_family = AF_UNSPEC; - } - - ret = 0; -out: - return ret; -} - - -int32_t -socket_server_get_local_sockaddr (transport_t *this, struct sockaddr *addr, - socklen_t *addr_len, sa_family_t *sa_family) -{ - int32_t ret = -1; - - if ((addr == NULL) || (addr_len == NULL) || (sa_family == NULL)) { - goto err; - } - - ret = server_fill_address_family (this, &addr->sa_family); - if (ret == -1) { - goto err; - } - - *sa_family = addr->sa_family; - - switch (addr->sa_family) - { - case AF_INET_SDP: - addr->sa_family = AF_INET; - - case AF_INET: - case AF_INET6: - case AF_UNSPEC: - ret = af_inet_server_get_local_sockaddr (this, addr, addr_len); - break; - - case AF_UNIX: - ret = af_unix_server_get_local_sockaddr (this, addr, addr_len); - break; - } - - if (*sa_family == AF_UNSPEC) { - *sa_family = addr->sa_family; - } - -err: - return ret; -} - -int32_t -fill_inet6_inet_identifiers (transport_t *this, struct sockaddr_storage *addr, - int32_t addr_len, char *identifier) -{ - int32_t ret = 0, tmpaddr_len = 0; - char service[NI_MAXSERV], host[NI_MAXHOST]; - struct sockaddr_storage tmpaddr; - - memset (&tmpaddr, 0, sizeof (tmpaddr)); - tmpaddr = *addr; - tmpaddr_len = addr_len; - - if (((struct sockaddr *) &tmpaddr)->sa_family == AF_INET6) { - int32_t one_to_four, four_to_eight, twelve_to_sixteen; - int16_t eight_to_ten, ten_to_twelve; - - one_to_four = four_to_eight = twelve_to_sixteen = 0; - eight_to_ten = ten_to_twelve = 0; - - one_to_four = ((struct sockaddr_in6 *) &tmpaddr)->sin6_addr.s6_addr32[0]; - four_to_eight = ((struct sockaddr_in6 *) &tmpaddr)->sin6_addr.s6_addr32[1]; -#ifdef GF_SOLARIS_HOST_OS - eight_to_ten = S6_ADDR16(((struct sockaddr_in6 *) &tmpaddr)->sin6_addr)[4]; -#else - eight_to_ten = ((struct sockaddr_in6 *) &tmpaddr)->sin6_addr.s6_addr16[4]; -#endif - -#ifdef GF_SOLARIS_HOST_OS - ten_to_twelve = S6_ADDR16(((struct sockaddr_in6 *) &tmpaddr)->sin6_addr)[5]; -#else - ten_to_twelve = ((struct sockaddr_in6 *) &tmpaddr)->sin6_addr.s6_addr16[5]; -#endif - - twelve_to_sixteen = ((struct sockaddr_in6 *) &tmpaddr)->sin6_addr.s6_addr32[3]; - - /* ipv4 mapped ipv6 address has - bits 0-80: 0 - bits 80-96: 0xffff - bits 96-128: ipv4 address - */ - - if (one_to_four == 0 && - four_to_eight == 0 && - eight_to_ten == 0 && - ten_to_twelve == -1) { - struct sockaddr_in *in_ptr = (struct sockaddr_in *)&tmpaddr; - memset (&tmpaddr, 0, sizeof (tmpaddr)); - - in_ptr->sin_family = AF_INET; - in_ptr->sin_port = ((struct sockaddr_in6 *)addr)->sin6_port; - in_ptr->sin_addr.s_addr = twelve_to_sixteen; - tmpaddr_len = sizeof (*in_ptr); - } - } - - ret = getnameinfo ((struct sockaddr *) &tmpaddr, - tmpaddr_len, - host, sizeof (host), - service, sizeof (service), - NI_NUMERICHOST | NI_NUMERICSERV); - if (ret != 0) { - gf_log (this->xl->name, GF_LOG_ERROR, - "getnameinfo failed (%s)", gai_strerror (ret)); - } - - sprintf (identifier, "%s:%s", host, service); - - return ret; -} - -int32_t -get_transport_identifiers (transport_t *this) -{ - int32_t ret = 0; - char is_inet_sdp = 0; - - switch (((struct sockaddr *) &this->myinfo.sockaddr)->sa_family) - { - case AF_INET_SDP: - is_inet_sdp = 1; - ((struct sockaddr *) &this->peerinfo.sockaddr)->sa_family = ((struct sockaddr *) &this->myinfo.sockaddr)->sa_family = AF_INET; - - case AF_INET: - case AF_INET6: - { - ret = fill_inet6_inet_identifiers (this, - &this->myinfo.sockaddr, - this->myinfo.sockaddr_len, - this->myinfo.identifier); - if (ret == -1) { - gf_log (this->xl->name, GF_LOG_ERROR, - "cannot fill inet/inet6 identifier for server"); - goto err; - } - - ret = fill_inet6_inet_identifiers (this, - &this->peerinfo.sockaddr, - this->peerinfo.sockaddr_len, - this->peerinfo.identifier); - if (ret == -1) { - gf_log (this->xl->name, GF_LOG_ERROR, - "cannot fill inet/inet6 identifier for client"); - goto err; - } - - if (is_inet_sdp) { - ((struct sockaddr *) &this->peerinfo.sockaddr)->sa_family = ((struct sockaddr *) &this->myinfo.sockaddr)->sa_family = AF_INET_SDP; - } - } - break; - - case AF_UNIX: - { - struct sockaddr_un *sunaddr = NULL; - - sunaddr = (struct sockaddr_un *) &this->myinfo.sockaddr; - strcpy (this->myinfo.identifier, sunaddr->sun_path); - - sunaddr = (struct sockaddr_un *) &this->peerinfo.sockaddr; - strcpy (this->peerinfo.identifier, sunaddr->sun_path); - } - break; - - default: - gf_log (this->xl->name, GF_LOG_ERROR, - "unknown address family (%d)", - ((struct sockaddr *) &this->myinfo.sockaddr)->sa_family); - ret = -1; - break; - } - -err: - return ret; -} diff --git a/transport/socket/src/name.h b/transport/socket/src/name.h deleted file mode 100644 index f50a7b7f4d..0000000000 --- a/transport/socket/src/name.h +++ /dev/null @@ -1,44 +0,0 @@ -/* - Copyright (c) 2008-2009 Gluster, Inc. <http://www.gluster.com> - This file is part of GlusterFS. - - GlusterFS is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published - by the Free Software Foundation; either version 3 of the License, - or (at your option) any later version. - - GlusterFS is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program. If not, see - <http://www.gnu.org/licenses/>. -*/ - -#ifndef _SOCKET_NAME_H -#define _SOCKET_NAME_H - -#include "compat.h" - -int32_t -client_bind (transport_t *this, - struct sockaddr *sockaddr, - socklen_t *sockaddr_len, - int sock); - -int32_t -socket_client_get_remote_sockaddr (transport_t *this, - struct sockaddr *sockaddr, - socklen_t *sockaddr_len, - sa_family_t *sa_family); - -int32_t -socket_server_get_local_sockaddr (transport_t *this, struct sockaddr *addr, - socklen_t *addr_len, sa_family_t *sa_family); - -int32_t -get_transport_identifiers (transport_t *this); - -#endif /* _SOCKET_NAME_H */ diff --git a/transport/socket/src/socket-mem-types.h b/transport/socket/src/socket-mem-types.h deleted file mode 100644 index f50f4a75de..0000000000 --- a/transport/socket/src/socket-mem-types.h +++ /dev/null @@ -1,36 +0,0 @@ - -/* - Copyright (c) 2008-2009 Gluster, Inc. <http://www.gluster.com> - This file is part of GlusterFS. - - GlusterFS is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published - by the Free Software Foundation; either version 3 of the License, - or (at your option) any later version. - - GlusterFS is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program. If not, see - <http://www.gnu.org/licenses/>. -*/ - - -#ifndef __SOCKET_MEM_TYPES_H__ -#define __SOCKET_MEM_TYPES_H__ - -#include "mem-types.h" - -enum gf_socket_mem_types_ { - gf_socket_mt_socket_private_t = gf_common_mt_end + 1, - gf_socket_mt_ioq, - gf_socket_mt_transport_t, - gf_socket_mt_socket_local_t, - gf_socket_mt_char, - gf_socket_mt_end -}; -#endif - diff --git a/transport/socket/src/socket.c b/transport/socket/src/socket.c deleted file mode 100644 index 7f7f8093a7..0000000000 --- a/transport/socket/src/socket.c +++ /dev/null @@ -1,1552 +0,0 @@ -/* - Copyright (c) 2008-2009 Gluster, Inc. <http://www.gluster.com> - This file is part of GlusterFS. - - GlusterFS is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published - by the Free Software Foundation; either version 3 of the License, - or (at your option) any later version. - - GlusterFS is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program. If not, see - <http://www.gnu.org/licenses/>. -*/ - - -#ifndef _CONFIG_H -#define _CONFIG_H -#include "config.h" -#endif - -#include "socket.h" -#include "name.h" -#include "dict.h" -#include "transport.h" -#include "logging.h" -#include "xlator.h" -#include "byte-order.h" -#include "common-utils.h" -#include "compat-errno.h" - -#include <fcntl.h> -#include <errno.h> -#include <netinet/tcp.h> - - -#define GF_LOG_ERRNO(errno) ((errno == ENOTCONN) ? GF_LOG_DEBUG : GF_LOG_ERROR) -#define SA(ptr) ((struct sockaddr *)ptr) - -int socket_init (transport_t *this); - -/* - * return value: - * 0 = success (completed) - * -1 = error - * > 0 = incomplete - */ - -int -__socket_rwv (transport_t *this, struct iovec *vector, int count, - struct iovec **pending_vector, int *pending_count, - int write) -{ - socket_private_t *priv = NULL; - int sock = -1; - int ret = -1; - struct iovec *opvector = NULL; - int opcount = 0; - int moved = 0; - - priv = this->private; - sock = priv->sock; - - opvector = vector; - opcount = count; - - while (opcount) { - if (write) { - ret = writev (sock, opvector, opcount); - - if (ret == 0 || (ret == -1 && errno == EAGAIN)) { - /* done for now */ - break; - } - } else { - ret = readv (sock, opvector, opcount); - - if (ret == -1 && errno == EAGAIN) { - /* done for now */ - break; - } - } - - if (ret == 0) { - /* Mostly due to 'umount' in client */ - gf_log (this->xl->name, GF_LOG_TRACE, - "EOF from peer %s", this->peerinfo.identifier); - opcount = -1; - errno = ENOTCONN; - break; - } - - if (ret == -1) { - if (errno == EINTR) - continue; - - gf_log (this->xl->name, GF_LOG_TRACE, - "%s failed (%s)", write ? "writev" : "readv", - strerror (errno)); - opcount = -1; - break; - } - - moved = 0; - - while (moved < ret) { - if ((ret - moved) >= opvector[0].iov_len) { - moved += opvector[0].iov_len; - opvector++; - opcount--; - } else { - opvector[0].iov_len -= (ret - moved); - opvector[0].iov_base += (ret - moved); - moved += (ret - moved); - } - while (opcount && !opvector[0].iov_len) { - opvector++; - opcount--; - } - } - } - - if (pending_vector) - *pending_vector = opvector; - - if (pending_count) - *pending_count = opcount; - - return opcount; -} - - -int -__socket_readv (transport_t *this, struct iovec *vector, int count, - struct iovec **pending_vector, int *pending_count) -{ - int ret = -1; - - ret = __socket_rwv (this, vector, count, - pending_vector, pending_count, 0); - - return ret; -} - - -int -__socket_writev (transport_t *this, struct iovec *vector, int count, - struct iovec **pending_vector, int *pending_count) -{ - int ret = -1; - - ret = __socket_rwv (this, vector, count, - pending_vector, pending_count, 1); - - return ret; -} - - -int -__socket_disconnect (transport_t *this) -{ - socket_private_t *priv = NULL; - int ret = -1; - - priv = this->private; - - if (priv->sock != -1) { - ret = shutdown (priv->sock, SHUT_RDWR); - priv->connected = -1; - gf_log (this->xl->name, GF_LOG_TRACE, - "shutdown() returned %d. set connection state to -1", - ret); - } - - return ret; -} - - -int -__socket_server_bind (transport_t *this) -{ - socket_private_t *priv = NULL; - int ret = -1; - int opt = 1; - - priv = this->private; - - ret = setsockopt (priv->sock, SOL_SOCKET, SO_REUSEADDR, - &opt, sizeof (opt)); - - if (ret == -1) { - gf_log (this->xl->name, GF_LOG_ERROR, - "setsockopt() for SO_REUSEADDR failed (%s)", - strerror (errno)); - } - - ret = bind (priv->sock, (struct sockaddr *)&this->myinfo.sockaddr, - this->myinfo.sockaddr_len); - - if (ret == -1) { - gf_log (this->xl->name, GF_LOG_ERROR, - "binding to %s failed: %s", - this->myinfo.identifier, strerror (errno)); - if (errno == EADDRINUSE) { - gf_log (this->xl->name, GF_LOG_ERROR, - "Port is already in use"); - } - } - - return ret; -} - - -int -__socket_nonblock (int fd) -{ - int flags = 0; - int ret = -1; - - flags = fcntl (fd, F_GETFL); - - if (flags != -1) - ret = fcntl (fd, F_SETFL, flags | O_NONBLOCK); - - return ret; -} - - -int -__socket_nodelay (int fd) -{ - int on = 1; - int ret = -1; - - ret = setsockopt (fd, IPPROTO_TCP, TCP_NODELAY, - &on, sizeof (on)); - if (!ret) - gf_log ("", GF_LOG_TRACE, - "NODELAY enabled for socket %d", fd); - - return ret; -} - -int -__socket_connect_finish (int fd) -{ - int ret = -1; - int optval = 0; - socklen_t optlen = sizeof (int); - - ret = getsockopt (fd, SOL_SOCKET, SO_ERROR, (void *)&optval, &optlen); - - if (ret == 0 && optval) { - errno = optval; - ret = -1; - } - - return ret; -} - - -void -__socket_reset (transport_t *this) -{ - socket_private_t *priv = NULL; - - priv = this->private; - - /* TODO: use mem-pool on incoming data */ - - if (priv->incoming.hdr_p) - GF_FREE (priv->incoming.hdr_p); - - if (priv->incoming.iobuf) - iobuf_unref (priv->incoming.iobuf); - - memset (&priv->incoming, 0, sizeof (priv->incoming)); - - event_unregister (this->xl->ctx->event_pool, priv->sock, priv->idx); - close (priv->sock); - priv->sock = -1; - priv->idx = -1; - priv->connected = -1; -} - - -struct ioq * -__socket_ioq_new (transport_t *this, char *buf, int len, - struct iovec *vector, int count, struct iobref *iobref) -{ - socket_private_t *priv = NULL; - struct ioq *entry = NULL; - - priv = this->private; - - /* TODO: use mem-pool */ - entry = GF_CALLOC (1, sizeof (*entry), - gf_common_mt_ioq); - if (!entry) - return NULL; - - assert (count <= (MAX_IOVEC-2)); - - entry->header.colonO[0] = ':'; - entry->header.colonO[1] = 'O'; - entry->header.colonO[2] = '\0'; - entry->header.version = 42; - entry->header.size1 = hton32 (len); - entry->header.size2 = hton32 (iov_length (vector, count)); - - entry->vector[0].iov_base = &entry->header; - entry->vector[0].iov_len = sizeof (entry->header); - entry->count++; - - entry->vector[1].iov_base = buf; - entry->vector[1].iov_len = len; - entry->count++; - - if (vector && count) { - memcpy (&entry->vector[2], vector, sizeof (*vector) * count); - entry->count += count; - } - - entry->pending_vector = entry->vector; - entry->pending_count = entry->count; - - if (iobref) - entry->iobref = iobref_ref (iobref); - - entry->buf = buf; - - INIT_LIST_HEAD (&entry->list); - - return entry; -} - - -void -__socket_ioq_entry_free (struct ioq *entry) -{ - list_del_init (&entry->list); - if (entry->iobref) - iobref_unref (entry->iobref); - - /* TODO: use mem-pool */ - GF_FREE (entry->buf); - - /* TODO: use mem-pool */ - GF_FREE (entry); -} - - -void -__socket_ioq_flush (transport_t *this) -{ - socket_private_t *priv = NULL; - struct ioq *entry = NULL; - - priv = this->private; - - while (!list_empty (&priv->ioq)) { - entry = priv->ioq_next; - __socket_ioq_entry_free (entry); - } - - return; -} - - -int -__socket_ioq_churn_entry (transport_t *this, struct ioq *entry) -{ - int ret = -1; - - ret = __socket_writev (this, entry->pending_vector, - entry->pending_count, - &entry->pending_vector, - &entry->pending_count); - - if (ret == 0) { - /* current entry was completely written */ - assert (entry->pending_count == 0); - __socket_ioq_entry_free (entry); - } - - return ret; -} - - -int -__socket_ioq_churn (transport_t *this) -{ - socket_private_t *priv = NULL; - int ret = 0; - struct ioq *entry = NULL; - - priv = this->private; - - while (!list_empty (&priv->ioq)) { - /* pick next entry */ - entry = priv->ioq_next; - - ret = __socket_ioq_churn_entry (this, entry); - - if (ret != 0) - break; - } - - if (list_empty (&priv->ioq)) { - /* all pending writes done, not interested in POLLOUT */ - priv->idx = event_select_on (this->xl->ctx->event_pool, - priv->sock, priv->idx, -1, 0); - } - - return ret; -} - - -int -socket_event_poll_err (transport_t *this) -{ - socket_private_t *priv = NULL; - int ret = -1; - - priv = this->private; - - pthread_mutex_lock (&priv->lock); - { - __socket_ioq_flush (this); - __socket_reset (this); - } - pthread_mutex_unlock (&priv->lock); - - xlator_notify (this->xl, GF_EVENT_POLLERR, this); - - return ret; -} - - -int -socket_event_poll_out (transport_t *this) -{ - socket_private_t *priv = NULL; - int ret = -1; - - priv = this->private; - - pthread_mutex_lock (&priv->lock); - { - if (priv->connected == 1) { - ret = __socket_ioq_churn (this); - - if (ret == -1) { - __socket_disconnect (this); - } - } - } - pthread_mutex_unlock (&priv->lock); - - xlator_notify (this->xl, GF_EVENT_POLLOUT, this); - - return ret; -} - - -int -__socket_proto_validate_header (transport_t *this, - struct socket_header *header, - size_t *size1_p, size_t *size2_p) -{ - size_t size1 = 0; - size_t size2 = 0; - - if (strcmp (header->colonO, ":O")) { - gf_log (this->xl->name, GF_LOG_DEBUG, - "socket header signature does not match :O (%x.%x.%x)", - header->colonO[0], header->colonO[1], - header->colonO[2]); - return -1; - } - - if (header->version != 42) { - gf_log (this->xl->name, GF_LOG_DEBUG, - "socket header version does not match 42 != %d", - header->version); - return -1; - } - - size1 = ntoh32 (header->size1); - size2 = ntoh32 (header->size2); - - if (size1 <= 0 || size1 > 1048576) { - gf_log (this->xl->name, GF_LOG_DEBUG, - "socket header has incorrect size1=%"GF_PRI_SIZET, - size1); - return -1; - } - - if (size2 > (131072)) { - gf_log (this->xl->name, GF_LOG_DEBUG, - "socket header has incorrect size2=%"GF_PRI_SIZET, - size2); - return -1; - } - - if (size1_p) - *size1_p = size1; - - if (size2_p) - *size2_p = size2; - - return 0; -} - - - -/* socket protocol state machine */ - -int -__socket_proto_state_machine (transport_t *this) -{ - int ret = -1; - socket_private_t *priv = NULL; - size_t size1 = 0; - size_t size2 = 0; - int previous_state = -1; - struct socket_header *hdr = NULL; - struct iobuf *iobuf = NULL; - - - priv = this->private; - - while (priv->incoming.state != SOCKET_PROTO_STATE_COMPLETE) { - /* debug check against infinite loops */ - if (previous_state == priv->incoming.state) { - gf_log (this->xl->name, GF_LOG_DEBUG, - "state did not change! (%d) breaking", - previous_state); - ret = -1; - goto unlock; - } - previous_state = priv->incoming.state; - - switch (priv->incoming.state) { - - case SOCKET_PROTO_STATE_NADA: - priv->incoming.pending_vector = - priv->incoming.vector; - - priv->incoming.pending_vector->iov_base = - &priv->incoming.header; - - priv->incoming.pending_vector->iov_len = - sizeof (struct socket_header); - - priv->incoming.state = - SOCKET_PROTO_STATE_HEADER_COMING; - break; - - case SOCKET_PROTO_STATE_HEADER_COMING: - - ret = __socket_readv (this, - priv->incoming.pending_vector, 1, - &priv->incoming.pending_vector, - NULL); - if (ret == 0) { - priv->incoming.state = - SOCKET_PROTO_STATE_HEADER_CAME; - break; - } - - if (ret == -1) { - gf_log (this->xl->name, GF_LOG_TRACE, - "read (%s) in state %d (%s)", - strerror (errno), - SOCKET_PROTO_STATE_HEADER_COMING, - this->peerinfo.identifier); - goto unlock; - } - - if (ret > 0) { - gf_log (this->xl->name, GF_LOG_TRACE, - "partial header read on NB socket."); - goto unlock; - } - break; - - case SOCKET_PROTO_STATE_HEADER_CAME: - hdr = &priv->incoming.header; - ret = __socket_proto_validate_header (this, hdr, - &size1, &size2); - - if (ret == -1) { - gf_log (this->xl->name, GF_LOG_ERROR, - "socket header validate failed (%s). " - "possible mismatch of transport-type " - "between server and client volumes, " - "or version mismatch", - this->peerinfo.identifier); - goto unlock; - } - - priv->incoming.hdrlen = size1; - priv->incoming.buflen = size2; - - /* TODO: use mem-pool */ - priv->incoming.hdr_p = GF_MALLOC (size1, - gf_common_mt_char); - if (size2) { - /* TODO: sanity check size2 < page size - */ - iobuf = iobuf_get (this->xl->ctx->iobuf_pool); - if (!iobuf) { - gf_log (this->xl->name, GF_LOG_ERROR, - "unable to allocate IO buffer " - "for peer %s", - this->peerinfo.identifier); - ret = -ENOMEM; - goto unlock; - } - priv->incoming.iobuf = iobuf; - priv->incoming.buf_p = iobuf->ptr; - } - - priv->incoming.vector[0].iov_base = - priv->incoming.hdr_p; - - priv->incoming.vector[0].iov_len = size1; - - priv->incoming.vector[1].iov_base = - priv->incoming.buf_p; - - priv->incoming.vector[1].iov_len = size2; - priv->incoming.count = size2 ? 2 : 1; - - priv->incoming.pending_vector = - priv->incoming.vector; - - priv->incoming.pending_count = - priv->incoming.count; - - priv->incoming.state = - SOCKET_PROTO_STATE_DATA_COMING; - break; - - case SOCKET_PROTO_STATE_DATA_COMING: - - ret = __socket_readv (this, - priv->incoming.pending_vector, - priv->incoming.pending_count, - &priv->incoming.pending_vector, - &priv->incoming.pending_count); - if (ret == 0) { - priv->incoming.state = - SOCKET_PROTO_STATE_DATA_CAME; - break; - } - - if (ret == -1) { - gf_log (this->xl->name, GF_LOG_DEBUG, - "read (%s) in state %d (%s)", - strerror (errno), - SOCKET_PROTO_STATE_DATA_COMING, - this->peerinfo.identifier); - goto unlock; - } - - if (ret > 0) { - gf_log (this->xl->name, GF_LOG_TRACE, - "partial data read on NB socket"); - goto unlock; - } - break; - - case SOCKET_PROTO_STATE_DATA_CAME: - memset (&priv->incoming.vector, 0, - sizeof (priv->incoming.vector)); - priv->incoming.pending_vector = NULL; - priv->incoming.pending_count = 0; - priv->incoming.state = SOCKET_PROTO_STATE_COMPLETE; - break; - - case SOCKET_PROTO_STATE_COMPLETE: - /* not reached */ - break; - - default: - gf_log (this->xl->name, GF_LOG_DEBUG, - "undefined state reached: %d", - priv->incoming.state); - goto unlock; - } - } -unlock: - - return ret; -} - - -int -socket_proto_state_machine (transport_t *this) -{ - socket_private_t *priv = NULL; - int ret = 0; - - priv = this->private; - - pthread_mutex_lock (&priv->lock); - { - ret = __socket_proto_state_machine (this); - } - pthread_mutex_unlock (&priv->lock); - - return ret; -} - - -int -socket_event_poll_in (transport_t *this) -{ - int ret = -1; - - ret = socket_proto_state_machine (this); - - /* call POLLIN on xlator even if complete block is not received, - just to keep the last_received timestamp ticking */ - - if (ret == 0) - ret = xlator_notify (this->xl, GF_EVENT_POLLIN, this); - - return ret; -} - - -int -socket_connect_finish (transport_t *this) -{ - int ret = -1; - socket_private_t *priv = NULL; - int event = -1; - char notify_xlator = 0; - - priv = this->private; - - pthread_mutex_lock (&priv->lock); - { - if (priv->connected) - goto unlock; - - ret = __socket_connect_finish (priv->sock); - - if (ret == -1 && errno == EINPROGRESS) - ret = 1; - - if (ret == -1 && errno != EINPROGRESS) { - if (!priv->connect_finish_log) { - gf_log (this->xl->name, GF_LOG_ERROR, - "connection to %s failed (%s)", - this->peerinfo.identifier, - strerror (errno)); - priv->connect_finish_log = 1; - } - __socket_disconnect (this); - notify_xlator = 1; - event = GF_EVENT_POLLERR; - goto unlock; - } - - if (ret == 0) { - notify_xlator = 1; - - this->myinfo.sockaddr_len = - sizeof (this->myinfo.sockaddr); - - ret = getsockname (priv->sock, - SA (&this->myinfo.sockaddr), - &this->myinfo.sockaddr_len); - if (ret == -1) { - gf_log (this->xl->name, GF_LOG_DEBUG, - "getsockname on (%d) failed (%s)", - priv->sock, strerror (errno)); - __socket_disconnect (this); - event = GF_EVENT_POLLERR; - goto unlock; - } - - priv->connected = 1; - priv->connect_finish_log = 0; - event = GF_EVENT_CHILD_UP; - get_transport_identifiers (this); - } - } -unlock: - pthread_mutex_unlock (&priv->lock); - - if (notify_xlator) - xlator_notify (this->xl, event, this); - - return 0; -} - - -int -socket_event_handler (int fd, int idx, void *data, - int poll_in, int poll_out, int poll_err) -{ - transport_t *this = NULL; - socket_private_t *priv = NULL; - int ret = 0; - - this = data; - priv = this->private; - - pthread_mutex_lock (&priv->lock); - { - priv->idx = idx; - } - pthread_mutex_unlock (&priv->lock); - - if (!priv->connected) { - ret = socket_connect_finish (this); - } - - if (!ret && poll_out) { - ret = socket_event_poll_out (this); - } - - if (!ret && poll_in) { - ret = socket_event_poll_in (this); - } - - if (ret < 0 || poll_err) { - socket_event_poll_err (this); - transport_unref (this); - } - - return 0; -} - - -int -socket_server_event_handler (int fd, int idx, void *data, - int poll_in, int poll_out, int poll_err) -{ - transport_t *this = NULL; - socket_private_t *priv = NULL; - int ret = 0; - int new_sock = -1; - transport_t *new_trans = NULL; - struct sockaddr_storage new_sockaddr = {0, }; - socklen_t addrlen = sizeof (new_sockaddr); - socket_private_t *new_priv = NULL; - glusterfs_ctx_t *ctx = NULL; - - this = data; - priv = this->private; - ctx = this->xl->ctx; - - pthread_mutex_lock (&priv->lock); - { - priv->idx = idx; - - if (poll_in) { - new_sock = accept (priv->sock, SA (&new_sockaddr), - &addrlen); - - if (new_sock == -1) - goto unlock; - - if (!priv->bio) { - ret = __socket_nonblock (new_sock); - - if (ret == -1) { - gf_log (this->xl->name, GF_LOG_DEBUG, - "NBIO on %d failed (%s)", - new_sock, strerror (errno)); - close (new_sock); - goto unlock; - } - } - - if (priv->nodelay) { - ret = __socket_nodelay (new_sock); - if (ret == -1) { - gf_log (this->xl->name, GF_LOG_ERROR, - "setsockopt() failed for " - "NODELAY (%s)", - strerror (errno)); - } - } - - new_trans = GF_CALLOC (1, sizeof (*new_trans), - gf_common_mt_transport_t); - new_trans->xl = this->xl; - new_trans->fini = this->fini; - - memcpy (&new_trans->peerinfo.sockaddr, &new_sockaddr, - addrlen); - new_trans->peerinfo.sockaddr_len = addrlen; - - new_trans->myinfo.sockaddr_len = - sizeof (new_trans->myinfo.sockaddr); - - ret = getsockname (new_sock, - SA (&new_trans->myinfo.sockaddr), - &new_trans->myinfo.sockaddr_len); - if (ret == -1) { - gf_log (this->xl->name, GF_LOG_DEBUG, - "getsockname on %d failed (%s)", - new_sock, strerror (errno)); - close (new_sock); - goto unlock; - } - - get_transport_identifiers (new_trans); - socket_init (new_trans); - new_trans->ops = this->ops; - new_trans->init = this->init; - new_trans->fini = this->fini; - - new_priv = new_trans->private; - - pthread_mutex_lock (&new_priv->lock); - { - new_priv->sock = new_sock; - new_priv->connected = 1; - - transport_ref (new_trans); - new_priv->idx = - event_register (ctx->event_pool, - new_sock, - socket_event_handler, - new_trans, 1, 0); - - if (new_priv->idx == -1) - ret = -1; - } - pthread_mutex_unlock (&new_priv->lock); - } - } -unlock: - pthread_mutex_unlock (&priv->lock); - - return ret; -} - - -int -socket_disconnect (transport_t *this) -{ - socket_private_t *priv = NULL; - int ret = -1; - - priv = this->private; - - pthread_mutex_lock (&priv->lock); - { - ret = __socket_disconnect (this); - } - pthread_mutex_unlock (&priv->lock); - - return ret; -} - - -int -socket_connect (transport_t *this) -{ - int ret = -1; - int sock = -1; - socket_private_t *priv = NULL; - struct sockaddr_storage sockaddr = {0, }; - socklen_t sockaddr_len = 0; - glusterfs_ctx_t *ctx = NULL; - sa_family_t sa_family = {0, }; - - priv = this->private; - ctx = this->xl->ctx; - - if (!priv) { - gf_log (this->xl->name, GF_LOG_DEBUG, - "connect() called on uninitialized transport"); - goto err; - } - - pthread_mutex_lock (&priv->lock); - { - sock = priv->sock; - } - pthread_mutex_unlock (&priv->lock); - - if (sock != -1) { - gf_log (this->xl->name, GF_LOG_TRACE, - "connect () called on transport already connected"); - ret = 0; - goto err; - } - - ret = socket_client_get_remote_sockaddr (this, SA (&sockaddr), - &sockaddr_len, &sa_family); - if (ret == -1) { - /* logged inside client_get_remote_sockaddr */ - goto err; - } - - pthread_mutex_lock (&priv->lock); - { - if (priv->sock != -1) { - gf_log (this->xl->name, GF_LOG_TRACE, - "connect() -- already connected"); - goto unlock; - } - - memcpy (&this->peerinfo.sockaddr, &sockaddr, sockaddr_len); - this->peerinfo.sockaddr_len = sockaddr_len; - - priv->sock = socket (sa_family, SOCK_STREAM, 0); - if (priv->sock == -1) { - gf_log (this->xl->name, GF_LOG_ERROR, - "socket creation failed (%s)", - strerror (errno)); - goto unlock; - } - - /* Cant help if setting socket options fails. We can continue - * working nonetheless. - */ - if (setsockopt (priv->sock, SOL_SOCKET, SO_RCVBUF, - &priv->windowsize, - sizeof (priv->windowsize)) < 0) { - gf_log (this->xl->name, GF_LOG_ERROR, - "setting receive window size failed: %d: %d: " - "%s", priv->sock, priv->windowsize, - strerror (errno)); - } - - if (setsockopt (priv->sock, SOL_SOCKET, SO_SNDBUF, - &priv->windowsize, - sizeof (priv->windowsize)) < 0) { - gf_log (this->xl->name, GF_LOG_ERROR, - "setting send window size failed: %d: %d: " - "%s", priv->sock, priv->windowsize, - strerror (errno)); - } - - - if (priv->nodelay && priv->lowlat) { - ret = __socket_nodelay (priv->sock); - if (ret == -1) { - gf_log (this->xl->name, GF_LOG_ERROR, - "setsockopt() failed for NODELAY (%s)", - strerror (errno)); - } - } - - if (!priv->bio) { - ret = __socket_nonblock (priv->sock); - - if (ret == -1) { - gf_log (this->xl->name, GF_LOG_ERROR, - "NBIO on %d failed (%s)", - priv->sock, strerror (errno)); - close (priv->sock); - priv->sock = -1; - goto unlock; - } - } - - SA (&this->myinfo.sockaddr)->sa_family = - SA (&this->peerinfo.sockaddr)->sa_family; - - ret = client_bind (this, SA (&this->myinfo.sockaddr), - &this->myinfo.sockaddr_len, priv->sock); - if (ret == -1) { - gf_log (this->xl->name, GF_LOG_WARNING, - "client bind failed: %s", strerror (errno)); - close (priv->sock); - priv->sock = -1; - goto unlock; - } - - ret = connect (priv->sock, SA (&this->peerinfo.sockaddr), - this->peerinfo.sockaddr_len); - - if (ret == -1 && errno != EINPROGRESS) { - gf_log (this->xl->name, GF_LOG_ERROR, - "connection attempt failed (%s)", - strerror (errno)); - close (priv->sock); - priv->sock = -1; - goto unlock; - } - - priv->connected = 0; - - transport_ref (this); - - priv->idx = event_register (ctx->event_pool, priv->sock, - socket_event_handler, this, 1, 1); - if (priv->idx == -1) - ret = -1; - } -unlock: - pthread_mutex_unlock (&priv->lock); - -err: - return ret; -} - - -int -socket_listen (transport_t *this) -{ - socket_private_t * priv = NULL; - int ret = -1; - int sock = -1; - struct sockaddr_storage sockaddr; - socklen_t sockaddr_len; - peer_info_t *myinfo = NULL; - glusterfs_ctx_t *ctx = NULL; - sa_family_t sa_family = {0, }; - - priv = this->private; - myinfo = &this->myinfo; - ctx = this->xl->ctx; - - pthread_mutex_lock (&priv->lock); - { - sock = priv->sock; - } - pthread_mutex_unlock (&priv->lock); - - if (sock != -1) { - gf_log (this->xl->name, GF_LOG_DEBUG, - "alreading listening"); - return ret; - } - - ret = socket_server_get_local_sockaddr (this, SA (&sockaddr), - &sockaddr_len, &sa_family); - if (ret == -1) { - return ret; - } - - pthread_mutex_lock (&priv->lock); - { - if (priv->sock != -1) { - gf_log (this->xl->name, GF_LOG_DEBUG, - "already listening"); - goto unlock; - } - - memcpy (&myinfo->sockaddr, &sockaddr, sockaddr_len); - myinfo->sockaddr_len = sockaddr_len; - - priv->sock = socket (sa_family, SOCK_STREAM, 0); - - if (priv->sock == -1) { - gf_log (this->xl->name, GF_LOG_ERROR, - "socket creation failed (%s)", - strerror (errno)); - goto unlock; - } - - /* Cant help if setting socket options fails. We can continue - * working nonetheless. - */ - if (setsockopt (priv->sock, SOL_SOCKET, SO_RCVBUF, - &priv->windowsize, - sizeof (priv->windowsize)) < 0) { - gf_log (this->xl->name, GF_LOG_ERROR, - "setting receive window size failed: %d: %d: " - "%s", priv->sock, priv->windowsize, - strerror (errno)); - } - - if (setsockopt (priv->sock, SOL_SOCKET, SO_SNDBUF, - &priv->windowsize, - sizeof (priv->windowsize)) < 0) { - gf_log (this->xl->name, GF_LOG_ERROR, - "setting send window size failed: %d: %d: " - "%s", priv->sock, priv->windowsize, - strerror (errno)); - } - - if (priv->nodelay) { - ret = __socket_nodelay (priv->sock); - if (ret == -1) { - gf_log (this->xl->name, GF_LOG_ERROR, - "setsockopt() failed for NODELAY (%s)", - strerror (errno)); - } - } - - if (!priv->bio) { - ret = __socket_nonblock (priv->sock); - - if (ret == -1) { - gf_log (this->xl->name, GF_LOG_ERROR, - "NBIO on %d failed (%s)", - priv->sock, strerror (errno)); - close (priv->sock); - priv->sock = -1; - goto unlock; - } - } - - ret = __socket_server_bind (this); - - if (ret == -1) { - /* logged inside __socket_server_bind() */ - close (priv->sock); - priv->sock = -1; - goto unlock; - } - - ret = listen (priv->sock, 10); - - if (ret == -1) { - gf_log (this->xl->name, GF_LOG_ERROR, - "could not set socket %d to listen mode (%s)", - priv->sock, strerror (errno)); - close (priv->sock); - priv->sock = -1; - goto unlock; - } - - transport_ref (this); - - priv->idx = event_register (ctx->event_pool, priv->sock, - socket_server_event_handler, - this, 1, 0); - - if (priv->idx == -1) { - gf_log (this->xl->name, GF_LOG_DEBUG, - "could not register socket %d with events", - priv->sock); - ret = -1; - close (priv->sock); - priv->sock = -1; - goto unlock; - } - } -unlock: - pthread_mutex_unlock (&priv->lock); - - return ret; -} - - -int -socket_receive (transport_t *this, char **hdr_p, size_t *hdrlen_p, - struct iobuf **iobuf_p) -{ - socket_private_t *priv = NULL; - int ret = -1; - - priv = this->private; - - pthread_mutex_lock (&priv->lock); - { - if (priv->connected != 1) { - gf_log (this->xl->name, GF_LOG_DEBUG, - "socket not connected to receive"); - goto unlock; - } - - if (!hdr_p || !hdrlen_p || !iobuf_p) { - gf_log (this->xl->name, GF_LOG_DEBUG, - "bad parameters %p %p %p", - hdr_p, hdrlen_p, iobuf_p); - goto unlock; - } - - if (priv->incoming.state == SOCKET_PROTO_STATE_COMPLETE) { - *hdr_p = priv->incoming.hdr_p; - *hdrlen_p = priv->incoming.hdrlen; - *iobuf_p = priv->incoming.iobuf; - - memset (&priv->incoming, 0, sizeof (priv->incoming)); - priv->incoming.state = SOCKET_PROTO_STATE_NADA; - - ret = 0; - } - } -unlock: - pthread_mutex_unlock (&priv->lock); - - return ret; -} - - -/* TODO: implement per transfer limit */ -int -socket_submit (transport_t *this, char *buf, int len, - struct iovec *vector, int count, - struct iobref *iobref) -{ - socket_private_t *priv = NULL; - int ret = -1; - char need_poll_out = 0; - char need_append = 1; - struct ioq *entry = NULL; - glusterfs_ctx_t *ctx = NULL; - - priv = this->private; - ctx = this->xl->ctx; - - pthread_mutex_lock (&priv->lock); - { - if (priv->connected != 1) { - if (!priv->submit_log && !priv->connect_finish_log) { - gf_log (this->xl->name, GF_LOG_DEBUG, - "not connected (priv->connected = %d)", - priv->connected); - priv->submit_log = 1; - } - goto unlock; - } - - priv->submit_log = 0; - entry = __socket_ioq_new (this, buf, len, vector, count, iobref); - if (!entry) - goto unlock; - - if (list_empty (&priv->ioq)) { - ret = __socket_ioq_churn_entry (this, entry); - - if (ret == 0) - need_append = 0; - - if (ret > 0) - need_poll_out = 1; - } - - if (need_append) { - list_add_tail (&entry->list, &priv->ioq); - ret = 0; - } - - if (need_poll_out) { - /* first entry to wait. continue writing on POLLOUT */ - priv->idx = event_select_on (ctx->event_pool, - priv->sock, - priv->idx, -1, 1); - } - } -unlock: - pthread_mutex_unlock (&priv->lock); - - return ret; -} - - -struct transport_ops tops = { - .listen = socket_listen, - .connect = socket_connect, - .disconnect = socket_disconnect, - .submit = socket_submit, - .receive = socket_receive -}; - - -int -socket_init (transport_t *this) -{ - socket_private_t *priv = NULL; - gf_boolean_t tmp_bool = 0; - uint64_t windowsize = GF_DEFAULT_SOCKET_WINDOW_SIZE; - char *optstr = NULL; - - if (this->private) { - gf_log (this->xl->name, GF_LOG_DEBUG, - "double init attempted"); - return -1; - } - - priv = GF_CALLOC (1, sizeof (*priv), - gf_common_mt_socket_private_t); - if (!priv) { - gf_log (this->xl->name, GF_LOG_ERROR, - "calloc (1, %"GF_PRI_SIZET") returned NULL", - sizeof (*priv)); - return -1; - } - - pthread_mutex_init (&priv->lock, NULL); - - priv->sock = -1; - priv->idx = -1; - priv->connected = -1; - - INIT_LIST_HEAD (&priv->ioq); - - if (dict_get (this->xl->options, "non-blocking-io")) { - optstr = data_to_str (dict_get (this->xl->options, - "non-blocking-io")); - - if (gf_string2boolean (optstr, &tmp_bool) == -1) { - gf_log (this->xl->name, GF_LOG_ERROR, - "'non-blocking-io' takes only boolean options," - " not taking any action"); - tmp_bool = 1; - } - priv->bio = 0; - if (!tmp_bool) { - priv->bio = 1; - gf_log (this->xl->name, GF_LOG_WARNING, - "disabling non-blocking IO"); - } - } - - optstr = NULL; - - // By default, we enable NODELAY - priv->nodelay = 1; - if (dict_get (this->xl->options, "transport.socket.nodelay")) { - optstr = data_to_str (dict_get (this->xl->options, - "transport.socket.nodelay")); - - if (gf_string2boolean (optstr, &tmp_bool) == -1) { - gf_log (this->xl->name, GF_LOG_ERROR, - "'transport.socket.nodelay' takes only " - "boolean options, not taking any action"); - tmp_bool = 1; - } - if (!tmp_bool) { - priv->nodelay = 0; - gf_log (this->xl->name, GF_LOG_DEBUG, - "disabling nodelay"); - } - } - - - optstr = NULL; - if (dict_get_str (this->xl->options, "transport.window-size", - &optstr) == 0) { - if (gf_string2bytesize (optstr, &windowsize) != 0) { - gf_log (this->xl->name, GF_LOG_ERROR, - "invalid number format: %s", optstr); - return -1; - } - } - - optstr = NULL; - - if (dict_get_str (this->xl->options, "transport.socket.lowlat", - &optstr) == 0) { - priv->lowlat = 1; - } - - priv->windowsize = (int)windowsize; - this->private = priv; - - return 0; -} - - -void -fini (transport_t *this) -{ - socket_private_t *priv = this->private; - - gf_log (this->xl->name, GF_LOG_TRACE, - "transport %p destroyed", this); - - pthread_mutex_destroy (&priv->lock); - GF_FREE (priv); -} - -int32_t -mem_acct_init (xlator_t *this) -{ - int ret = -1; - - if (!this) - return ret; - - ret = xlator_mem_acct_init (this, gf_common_mt_end + 1); - - if (ret != 0) { - gf_log (this->name, GF_LOG_ERROR, "Memory accounting init" - "failed"); - return ret; - } - - return ret; -} - -int32_t -init (transport_t *this) -{ - int ret = -1; - - ret = socket_init (this); - - if (ret == -1) { - gf_log (this->xl->name, GF_LOG_DEBUG, "socket_init() failed"); - } - - return ret; -} - -struct volume_options options[] = { - { .key = {"remote-port", - "transport.remote-port", - "transport.socket.remote-port"}, - .type = GF_OPTION_TYPE_INT - }, - { .key = {"transport.socket.listen-port", "listen-port"}, - .type = GF_OPTION_TYPE_INT - }, - { .key = {"transport.socket.bind-address", "bind-address" }, - .type = GF_OPTION_TYPE_INTERNET_ADDRESS - }, - { .key = {"transport.socket.connect-path", "connect-path"}, - .type = GF_OPTION_TYPE_ANY - }, - { .key = {"transport.socket.bind-path", "bind-path"}, - .type = GF_OPTION_TYPE_ANY - }, - { .key = {"transport.socket.listen-path", "listen-path"}, - .type = GF_OPTION_TYPE_ANY - }, - { .key = { "transport.address-family", - "address-family" }, - .value = {"inet", "inet6", "inet/inet6", "inet6/inet", - "unix", "inet-sdp" }, - .type = GF_OPTION_TYPE_STR - }, - - { .key = {"non-blocking-io"}, - .type = GF_OPTION_TYPE_BOOL - }, - { .key = {"transport.window-size"}, - .type = GF_OPTION_TYPE_SIZET, - .min = GF_MIN_SOCKET_WINDOW_SIZE, - .max = GF_MAX_SOCKET_WINDOW_SIZE, - }, - { .key = {"transport.socket.nodelay"}, - .type = GF_OPTION_TYPE_BOOL - }, - { .key = {"transport.socket.lowlat"}, - .type = GF_OPTION_TYPE_BOOL - }, - { .key = {NULL} } -}; - diff --git a/transport/socket/src/socket.h b/transport/socket/src/socket.h deleted file mode 100644 index bc6d3b27c3..0000000000 --- a/transport/socket/src/socket.h +++ /dev/null @@ -1,125 +0,0 @@ -/* - Copyright (c) 2006-2009 Gluster, Inc. <http://www.gluster.com> - This file is part of GlusterFS. - - GlusterFS is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published - by the Free Software Foundation; either version 3 of the License, - or (at your option) any later version. - - GlusterFS is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program. If not, see - <http://www.gnu.org/licenses/>. -*/ - -#ifndef _SOCKET_H -#define _SOCKET_H - - -#ifndef _CONFIG_H -#define _CONFIG_H -#include "config.h" -#endif - -#include "event.h" -#include "transport.h" -#include "logging.h" -#include "dict.h" -#include "mem-pool.h" -#include "socket-mem-types.h" - -#ifndef MAX_IOVEC -#define MAX_IOVEC 16 -#endif /* MAX_IOVEC */ - -#define GF_DEFAULT_SOCKET_LISTEN_PORT 6996 - -/* This is the size set through setsockopt for - * both the TCP receive window size and the - * send buffer size. - * Till the time iobuf size becomes configurable, this size is set to include - * two iobufs + the GlusterFS protocol headers. - * Linux allows us to over-ride the max values for the system. - * Should we over-ride them? Because if we set a value larger than the default - * setsockopt will fail. Having larger values might be beneficial for - * IB links. - */ -#define GF_DEFAULT_SOCKET_WINDOW_SIZE (512 * GF_UNIT_KB) -#define GF_MAX_SOCKET_WINDOW_SIZE (1 * GF_UNIT_MB) -#define GF_MIN_SOCKET_WINDOW_SIZE (128 * GF_UNIT_KB) - -typedef enum { - SOCKET_PROTO_STATE_NADA = 0, - SOCKET_PROTO_STATE_HEADER_COMING, - SOCKET_PROTO_STATE_HEADER_CAME, - SOCKET_PROTO_STATE_DATA_COMING, - SOCKET_PROTO_STATE_DATA_CAME, - SOCKET_PROTO_STATE_COMPLETE, -} socket_proto_state_t; - -struct socket_header { - char colonO[3]; - uint32_t size1; - uint32_t size2; - char version; -} __attribute__((packed)); - - -struct ioq { - union { - struct list_head list; - struct { - struct ioq *next; - struct ioq *prev; - }; - }; - struct socket_header header; - struct iovec vector[MAX_IOVEC]; - int count; - struct iovec *pending_vector; - int pending_count; - char *buf; - struct iobref *iobref; -}; - - -typedef struct { - int32_t sock; - int32_t idx; - unsigned char connected; // -1 = not connected. 0 = in progress. 1 = connected - char bio; - char connect_finish_log; - char submit_log; - union { - struct list_head ioq; - struct { - struct ioq *ioq_next; - struct ioq *ioq_prev; - }; - }; - struct { - int state; - struct socket_header header; - char *hdr_p; - size_t hdrlen; - struct iobuf *iobuf; - char *buf_p; - size_t buflen; - struct iovec vector[2]; - int count; - struct iovec *pending_vector; - int pending_count; - } incoming; - pthread_mutex_t lock; - int windowsize; - char lowlat; - char nodelay; -} socket_private_t; - - -#endif |