summary | refs | log | tree | commit | diff | stats
path: root/ctdb/ib
diff options
context:
space:
mode:
Diffstat (limited to 'ctdb/ib')
-rw-r--r-- ctdb/ib/ibw_ctdb.c 38
-rw-r--r-- ctdb/ib/ibw_ctdb.h 18
-rw-r--r-- ctdb/ib/ibw_ctdb_init.c 73
-rw-r--r-- ctdb/ib/ibwrapper.c 213
-rw-r--r-- ctdb/ib/ibwrapper.h 10
-rw-r--r-- ctdb/ib/ibwrapper_test.c 24
6 files changed, 266 insertions, 110 deletions
diff --git a/ctdb/ib/ibw_ctdb.c b/ctdb/ib/ibw_ctdb.c
index 53293810ca8..50dd8d68d43 100644
--- a/ctdb/ib/ibw_ctdb.c
+++ b/ctdb/ib/ibw_ctdb.c
@@ -29,8 +29,13 @@
#include "ibwrapper.h"
#include "ibw_ctdb.h"
-int ctdb_ibw_node_connect(struct ibw_ctx *ictx, struct ctdb_node *node)
+int ctdb_ibw_node_connect(struct ctdb_node *node)
{
+ struct ctdb_ibw_node *cn = talloc_get_type(node->private, struct ctdb_ibw_node);
+ int rc;
+
+ assert(cn!=NULL);
+ assert(cn->conn!=NULL);
struct sockaddr_in sock_out;
memset(&sock_out, 0, sizeof(struct sockaddr_in));
@@ -38,12 +43,12 @@ int ctdb_ibw_node_connect(struct ibw_ctx *ictx, struct ctdb_node *node)
sock_out.sin_port = htons(node->address.port);
sock_out.sin_family = PF_INET;
- if (ibw_connect(ictx, &sock_out, node)) {
- DEBUG(0, ("ctdb_ibw_node_connect: ibw_connect failed - retrying in 1 sec...\n"));
+ rc = ibw_connect(cn->conn, &sock_out, node);
+ if (rc) {
+ DEBUG(0, ("ctdb_ibw_node_connect/ibw_connect failed - retrying...\n"));
/* try again once a second */
event_add_timed(node->ctdb->ev, node, timeval_current_ofs(1, 0),
ctdb_ibw_node_connect_event, node);
- return -1;
}
/* continues at ibw_ctdb.c/IBWC_CONNECTED in good case */
@@ -54,9 +59,8 @@ void ctdb_ibw_node_connect_event(struct event_context *ev, struct timed_event *t
struct timeval t, void *private)
{
struct ctdb_node *node = talloc_get_type(private, struct ctdb_node);
- struct ibw_ctx *ictx = talloc_get_type(node->ctdb->private, struct ibw_ctx);
- ctdb_ibw_node_connect(ictx, node);
+ ctdb_ibw_node_connect(node);
}
int ctdb_ibw_connstate_handler(struct ibw_ctx *ctx, struct ibw_conn *conn)
@@ -94,14 +98,15 @@ int ctdb_ibw_connstate_handler(struct ibw_ctx *ctx, struct ibw_conn *conn)
case IBWC_CONNECTED: { /* after ibw_accept or ibw_connect */
struct ctdb_node *node = talloc_get_type(conn->conn_userdata, struct ctdb_node);
if (node!=NULL) { /* after ibw_connect */
- node->private = (void *)conn;
+ struct ctdb_ibw_node *cn = talloc_get_type(node->private, struct ctdb_ibw_node);
+
node->ctdb->upcalls->node_connected(node);
+ ctdb_flush_cn_queue(cn);
} else { /* after ibw_accept */
/* NOP in CTDB case */
}
} break;
case IBWC_DISCONNECTED: { /* after ibw_disconnect */
- /* TODO: have a CTDB upcall */
struct ctdb_node *node = talloc_get_type(conn->conn_userdata, struct ctdb_node);
if (node!=NULL)
node->ctdb->upcalls->node_dead(node);
@@ -110,13 +115,16 @@ int ctdb_ibw_connstate_handler(struct ibw_ctx *ctx, struct ibw_conn *conn)
} break;
case IBWC_ERROR: {
struct ctdb_node *node = talloc_get_type(conn->conn_userdata, struct ctdb_node);
- if (node!=NULL)
- node->private = NULL; /* not to use again */
-
- DEBUG(10, ("IBWC_ERROR, reconnecting immediately...\n"));
- talloc_free(conn);
- event_add_timed(node->ctdb->ev, node, timeval_current_ofs(1, 0),
- ctdb_ibw_node_connect_event, node);
+ if (node!=NULL) {
+ struct ctdb_ibw_node *cn = talloc_get_type(node->private, struct ctdb_ibw_node);
+ struct ibw_ctx *ictx = cn->conn->ctx;
+
+ DEBUG(10, ("IBWC_ERROR, reconnecting...\n"));
+ talloc_free(cn->conn); /* internal queue content is destroyed */
+ cn->conn = (void *)ibw_conn_new(ictx, node);
+ event_add_timed(node->ctdb->ev, node, timeval_current_ofs(1, 0),
+ ctdb_ibw_node_connect_event, node);
+ }
} break;
default:
assert(0);
diff --git a/ctdb/ib/ibw_ctdb.h b/ctdb/ib/ibw_ctdb.h
index 14308682b21..24adeb7233e 100644
--- a/ctdb/ib/ibw_ctdb.h
+++ b/ctdb/ib/ibw_ctdb.h
@@ -21,10 +21,26 @@
* Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
*/
+struct ctdb_ibw_msg {
+ uint8_t *data;
+ uint32_t length;
+ struct ctdb_ibw_msg *prev;
+ struct ctdb_ibw_msg *next;
+};
+
+struct ctdb_ibw_node {
+ struct ibw_conn *conn;
+
+ struct ctdb_ibw_msg *queue;
+ struct ctdb_ibw_msg *queue_last;
+ int qcnt;
+};
+
int ctdb_ibw_connstate_handler(struct ibw_ctx *ctx, struct ibw_conn *conn);
int ctdb_ibw_receive_handler(struct ibw_conn *conn, void *buf, int n);
-int ctdb_ibw_node_connect(struct ibw_ctx *ictx, struct ctdb_node *node);
+int ctdb_ibw_node_connect(struct ctdb_node *node);
void ctdb_ibw_node_connect_event(struct event_context *ev, struct timed_event *te,
struct timeval t, void *private);
+int ctdb_flush_cn_queue(struct ctdb_ibw_node *cn);
diff --git a/ctdb/ib/ibw_ctdb_init.c b/ctdb/ib/ibw_ctdb_init.c
index 78924632292..9c0300c4f4c 100644
--- a/ctdb/ib/ibw_ctdb_init.c
+++ b/ctdb/ib/ibw_ctdb_init.c
@@ -58,7 +58,6 @@ static int ctdb_ibw_listen(struct ctdb_context *ctdb, int backlog)
*/
static int ctdb_ibw_start(struct ctdb_context *ctdb)
{
- struct ibw_ctx *ictx = talloc_get_type(ctdb->private, struct ibw_ctx);
int i;
/* listen on our own address */
@@ -71,43 +70,87 @@ static int ctdb_ibw_start(struct ctdb_context *ctdb)
if (!(ctdb->flags & CTDB_FLAG_SELF_CONNECT) &&
ctdb_same_address(&ctdb->address, &node->address))
continue;
- ctdb_ibw_node_connect(ictx, node);
+ ctdb_ibw_node_connect(node);
}
return 0;
}
-
/*
* initialise ibw portion of a ctdb node
*/
static int ctdb_ibw_add_node(struct ctdb_node *node)
{
- /* TODO: clarify whether is this necessary for us ?
- - why not enough doing such thing internally at connect time ? */
- return 0;
+ struct ibw_ctx *ictx = talloc_get_type(node->ctdb->private, struct ibw_ctx);
+ struct ctdb_ibw_node *cn = talloc_zero(node, struct ctdb_ibw_node);
+
+ assert(cn!=NULL);
+ cn->conn = ibw_conn_new(ictx, node);
+ node->private = (void *)cn;
+
+ return (cn->conn!=NULL ? 0 : -1);
+}
+
+static int ctdb_ibw_send_pkt(struct ibw_conn *conn, uint8_t *data, uint32_t length)
+{
+ void *buf, *key;
+
+ if (ibw_alloc_send_buf(conn, &buf, &key, length)) {
+ DEBUG(0, ("queue_pkt/ibw_alloc_send_buf failed\n"));
+ return -1;
+ }
+
+ memcpy(buf, data, length);
+ return ibw_send(conn, buf, key, length);
+}
+
+int ctdb_flush_cn_queue(struct ctdb_ibw_node *cn)
+{
+ struct ctdb_ibw_msg *p;
+ int rc = 0;
+
+ while(cn->queue) {
+ p = cn->queue;
+ rc = ctdb_ibw_send_pkt(cn->conn, p->data, p->length);
+ if (rc)
+ return -1; /* will be retried later when conn is up */
+
+ DLIST_REMOVE(cn->queue, p);
+ cn->qcnt--;
+ talloc_free(p); /* it will talloc_free p->data as well */
+ }
+ assert(cn->qcnt==0);
+ /* cn->queue_last = NULL is not needed - see DLIST_ADD_AFTER */
+
+ return rc;
}
static int ctdb_ibw_queue_pkt(struct ctdb_node *node, uint8_t *data, uint32_t length)
{
- struct ibw_conn *conn = talloc_get_type(node->private, struct ibw_conn);
+ struct ctdb_ibw_node *cn = talloc_get_type(node->private, struct ctdb_ibw_node);
int rc;
- void *buf, *key;
assert(length>=sizeof(uint32_t));
+ assert(cn!=NULL);
- if (conn==NULL) {
+ if (cn->conn==NULL) {
DEBUG(0, ("ctdb_ibw_queue_pkt: conn is NULL\n"));
return -1;
}
- if (ibw_alloc_send_buf(conn, &buf, &key, length)) {
- DEBUG(0, ("queue_pkt/ibw_alloc_send_buf failed\n"));
- return -1;
- }
+ if (cn->conn->state==IBWC_CONNECTED) {
+ rc = ctdb_ibw_send_pkt(cn->conn, data, length);
+ } else {
+ struct ctdb_ibw_msg *p = talloc_zero(cn, struct ctdb_ibw_msg);
+ p->data = talloc_memdup(p, data, length);
+ p->length = length;
- memcpy(buf, data, length);
- rc = ibw_send(conn, buf, key, length);
+ DLIST_ADD_AFTER(cn->queue, p, cn->queue_last);
+ cn->queue_last = p;
+ cn->qcnt++;
+
+ rc = 0;
+ }
return rc;
}
diff --git a/ctdb/ib/ibwrapper.c b/ctdb/ib/ibwrapper.c
index f3ef0c4c5cd..f7b233954d2 100644
--- a/ctdb/ib/ibwrapper.c
+++ b/ctdb/ib/ibwrapper.c
@@ -161,42 +161,57 @@ static int ibw_ctx_destruct(struct ibw_ctx *ctx)
static int ibw_conn_priv_destruct(struct ibw_conn_priv *pconn)
{
- DEBUG(10, ("ibw_conn_priv_destruct(%u, cmid: %p)\n",
- (uint32_t)pconn, pconn->cm_id));
-
- /* free memory regions */
- ibw_free_mr(&pconn->buf_send, &pconn->mr_send);
- ibw_free_mr(&pconn->buf_recv, &pconn->mr_recv);
+ DEBUG(10, ("ibw_conn_priv_destruct(%p, cmid: %p)\n",
+ pconn, pconn->cm_id));
/* pconn->wr_index is freed by talloc */
/* pconn->wr_index[i] are freed by talloc */
/* destroy verbs */
- if (pconn->cm_id->qp) {
- ibv_destroy_qp(pconn->cm_id->qp);
+ if (pconn->cm_id!=NULL && pconn->cm_id->qp!=NULL) {
+ rdma_destroy_qp(pconn->cm_id);
pconn->cm_id->qp = NULL;
}
- if (pconn->cq) {
+
+ if (pconn->cq!=NULL) {
ibv_destroy_cq(pconn->cq);
pconn->cq = NULL;
}
- if (pconn->verbs_channel) {
+
+ if (pconn->verbs_channel!=NULL) {
ibv_destroy_comp_channel(pconn->verbs_channel);
pconn->verbs_channel = NULL;
}
+
+ /* must be freed here because its order is important */
if (pconn->verbs_channel_event) {
- /* TODO: do we have to do this here? */
talloc_free(pconn->verbs_channel_event);
pconn->verbs_channel_event = NULL;
}
+
+ /* free memory regions */
+ ibw_free_mr(&pconn->buf_send, &pconn->mr_send);
+ ibw_free_mr(&pconn->buf_recv, &pconn->mr_recv);
+
if (pconn->pd) {
ibv_dealloc_pd(pconn->pd);
pconn->pd = NULL;
+ DEBUG(10, ("pconn=%p pd deallocated\n", pconn));
}
+
if (pconn->cm_id) {
rdma_destroy_id(pconn->cm_id);
pconn->cm_id = NULL;
+ DEBUG(10, ("pconn=%p cm_id destroyed\n", pconn));
}
+
+ return 0;
+}
+
+static int ibw_wr_destruct(struct ibw_wr *wr)
+{
+ if (wr->buf_large!=NULL)
+ ibw_free_mr(&wr->buf_large, &wr->mr_large);
return 0;
}
@@ -209,16 +224,18 @@ static int ibw_conn_destruct(struct ibw_conn *conn)
return 0;
}
-static struct ibw_conn *ibw_conn_new(struct ibw_ctx *ctx)
+struct ibw_conn *ibw_conn_new(struct ibw_ctx *ctx, TALLOC_CTX *mem_ctx)
{
struct ibw_conn *conn;
struct ibw_conn_priv *pconn;
- conn = talloc_zero(ctx, struct ibw_conn);
+ assert(ctx!=NULL);
+
+ conn = talloc_zero(mem_ctx, struct ibw_conn);
assert(conn!=NULL);
talloc_set_destructor(conn, ibw_conn_destruct);
- pconn = talloc_zero(ctx, struct ibw_conn_priv);
+ pconn = talloc_zero(conn, struct ibw_conn_priv);
assert(pconn!=NULL);
talloc_set_destructor(pconn, ibw_conn_priv_destruct);
@@ -248,7 +265,7 @@ static int ibw_setup_cq_qp(struct ibw_conn *conn)
}
DEBUG(10, ("created channel %p\n", pconn->verbs_channel));
- pconn->verbs_channel_event = event_add_fd(pctx->ectx, conn,
+ pconn->verbs_channel_event = event_add_fd(pctx->ectx, NULL, /* not pconn or conn */
pconn->verbs_channel->fd, EVENT_FD_READ, ibw_event_handler_verbs, conn);
pconn->pd = ibv_alloc_pd(pconn->cm_id->verbs);
@@ -371,14 +388,15 @@ static int ibw_fill_cq(struct ibw_conn *conn)
return 0;
}
-static int ibw_manage_connect(struct ibw_conn *conn, struct rdma_cm_id *cma_id)
+static int ibw_manage_connect(struct ibw_conn *conn)
{
struct rdma_conn_param conn_param;
+ struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
int rc;
- DEBUG(10, ("ibw_manage_connect(cmid: %p)\n", cma_id));
- rc = ibw_setup_cq_qp(conn);
- if (rc)
+ DEBUG(10, ("ibw_manage_connect(cmid: %p)\n", pconn->cm_id));
+
+ if (ibw_setup_cq_qp(conn))
return -1;
/* cm connect */
@@ -387,7 +405,7 @@ static int ibw_manage_connect(struct ibw_conn *conn, struct rdma_cm_id *cma_id)
conn_param.initiator_depth = 1;
conn_param.retry_count = 10;
- rc = rdma_connect(cma_id, &conn_param);
+ rc = rdma_connect(pconn->cm_id, &conn_param);
if (rc)
sprintf(ibw_lasterr, "rdma_connect error %d\n", rc);
@@ -436,7 +454,7 @@ static void ibw_event_handler_cm(struct event_context *ev,
assert(cma_id->context!=NULL);
conn = talloc_get_type(cma_id->context, struct ibw_conn);
- rc = ibw_manage_connect(conn, cma_id);
+ rc = ibw_manage_connect(conn);
if (rc)
goto error;
@@ -445,7 +463,7 @@ static void ibw_event_handler_cm(struct event_context *ev,
case RDMA_CM_EVENT_CONNECT_REQUEST:
DEBUG(11, ("RDMA_CM_EVENT_CONNECT_REQUEST\n"));
ctx->state = IBWS_CONNECT_REQUEST;
- conn = ibw_conn_new(ctx);
+ conn = ibw_conn_new(ctx, ctx);
pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
pconn->cm_id = cma_id; /* !!! event will be freed but id not */
cma_id->context = (void *)conn;
@@ -459,6 +477,9 @@ static void ibw_event_handler_cm(struct event_context *ev,
/* continued at ibw_accept when invoked by the func above */
if (!pconn->is_accepted) {
+ rc = rdma_reject(cma_id, NULL, 0);
+ if (rc)
+ DEBUG(0, ("rdma_reject failed with rc=%d\n", rc));
talloc_free(conn);
DEBUG(10, ("pconn->cm_id %p wasn't accepted\n", pconn->cm_id));
}
@@ -476,6 +497,8 @@ static void ibw_event_handler_cm(struct event_context *ev,
conn = talloc_get_type(cma_id->context, struct ibw_conn);
assert(conn!=NULL); /* important assumption */
+ DEBUG(10, ("ibw_setup_cq_qp succeeded (cmid=%p)\n", cma_id));
+
/* client conn is up */
conn->state = IBWC_CONNECTED;
@@ -485,22 +508,30 @@ static void ibw_event_handler_cm(struct event_context *ev,
case RDMA_CM_EVENT_ADDR_ERROR:
sprintf(ibw_lasterr, "RDMA_CM_EVENT_ADDR_ERROR, error %d\n", event->status);
- goto error;
case RDMA_CM_EVENT_ROUTE_ERROR:
sprintf(ibw_lasterr, "RDMA_CM_EVENT_ROUTE_ERROR, error %d\n", event->status);
- goto error;
case RDMA_CM_EVENT_CONNECT_ERROR:
sprintf(ibw_lasterr, "RDMA_CM_EVENT_CONNECT_ERROR, error %d\n", event->status);
- goto error;
case RDMA_CM_EVENT_UNREACHABLE:
sprintf(ibw_lasterr, "RDMA_CM_EVENT_UNREACHABLE, error %d\n", event->status);
- goto error;
case RDMA_CM_EVENT_REJECTED:
sprintf(ibw_lasterr, "RDMA_CM_EVENT_REJECTED, error %d\n", event->status);
+ conn = talloc_get_type(cma_id->context, struct ibw_conn);
+ if (conn) {
+ if ((rc=rdma_ack_cm_event(event)))
+ DEBUG(0, ("reject/rdma_ack_cm_event failed with %d\n", rc));
+ event = NULL;
+ pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
+ ibw_conn_priv_destruct(pconn);
+ }
goto error;
case RDMA_CM_EVENT_DISCONNECTED:
DEBUG(11, ("RDMA_CM_EVENT_DISCONNECTED\n"));
+ if ((rc=rdma_ack_cm_event(event)))
+ DEBUG(0, ("disc/rdma_ack_cm_event failed with %d\n", rc));
+ event = NULL; /* don't ack more */
+
if (cma_id!=pctx->cm_id) {
DEBUG(0, ("client DISCONNECT event cm_id=%p\n", cma_id));
conn = talloc_get_type(cma_id->context, struct ibw_conn);
@@ -518,14 +549,20 @@ static void ibw_event_handler_cm(struct event_context *ev,
goto error;
}
- if ((rc=rdma_ack_cm_event(event))) {
+ if (event!=NULL && (rc=rdma_ack_cm_event(event))) {
sprintf(ibw_lasterr, "rdma_ack_cm_event failed with %d\n", rc);
goto error;
}
return;
error:
+ if (event!=NULL && (rc=rdma_ack_cm_event(event))) {
+ sprintf(ibw_lasterr, "rdma_ack_cm_event failed with %d\n", rc);
+ goto error;
+ }
+
DEBUG(0, ("cm event handler: %s", ibw_lasterr));
+
if (cma_id!=pctx->cm_id) {
conn = talloc_get_type(cma_id->context, struct ibw_conn);
if (conn)
@@ -569,8 +606,8 @@ static void ibw_event_handler_verbs(struct event_context *ev,
while((rc=ibv_poll_cq(pconn->cq, 1, &wc))==1) {
if (wc.status) {
- sprintf(ibw_lasterr, "cq completion failed status %d rc %d\n",
- wc.status, rc);
+ sprintf(ibw_lasterr, "cq completion failed status=%d, opcode=%d, rc=%d\n",
+ wc.status, wc.opcode, rc);
goto error;
}
@@ -605,11 +642,57 @@ static void ibw_event_handler_verbs(struct event_context *ev,
goto error;
}
+ ibv_ack_cq_events(pconn->cq, 1);
+
return;
error:
+ ibv_ack_cq_events(pconn->cq, 1);
+
DEBUG(0, (ibw_lasterr));
- conn->state = IBWC_ERROR;
- pctx->connstate_func(NULL, conn);
+
+ if (conn->state!=IBWC_ERROR) {
+ conn->state = IBWC_ERROR;
+ pctx->connstate_func(NULL, conn);
+ }
+}
+
+static int ibw_process_queue(struct ibw_conn *conn)
+{
+ struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
+ struct ibw_ctx_priv *pctx;
+ struct ibw_wr *p;
+ int rc;
+ uint32_t msg_size;
+
+ if (pconn->queue==NULL)
+ return 0; /* NOP */
+
+ p = pconn->queue;
+
+ /* we must have at least 1 fragment to send */
+ assert(p->queued_ref_cnt>0);
+ p->queued_ref_cnt--;
+
+ pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
+ msg_size = (p->queued_ref_cnt) ? pctx->opts.recv_bufsize : p->queued_rlen;
+
+ assert(p->queued_msg!=NULL);
+ assert(msg_size!=0);
+
+ DEBUG(10, ("ibw_process_queue refcnt=%d msgsize=%u\n",
+ p->queued_ref_cnt, msg_size));
+
+ rc = ibw_send_packet(conn, p->queued_msg, p, msg_size);
+
+ /* was this the last fragment? */
+ if (p->queued_ref_cnt) {
+ p->queued_msg += pctx->opts.recv_bufsize;
+ } else {
+ DLIST_REMOVE2(pconn->queue, p, qprev, qnext);
+ p->queued_msg = NULL;
+ }
+
+ return rc;
}
static int ibw_wc_send(struct ibw_conn *conn, struct ibv_wc *wc)
@@ -618,7 +701,6 @@ static int ibw_wc_send(struct ibw_conn *conn, struct ibv_wc *wc)
struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
struct ibw_wr *p;
int send_index;
- int rc = 0;
DEBUG(10, ("ibw_wc_send(cmid: %p, wr_id: %u, bl: %u)\n",
pconn->cm_id, (uint32_t)wc->wr_id, (uint32_t)wc->byte_len));
@@ -662,30 +744,7 @@ static int ibw_wc_send(struct ibw_conn *conn, struct ibv_wc *wc)
}
}
- if (pconn->queue) {
- uint32_t msg_size;
-
- DEBUG(10, ("ibw_wc_send#queue %u\n", (int)wc->wr_id));
-
- p = pconn->queue;
-
- assert(p->queued_ref_cnt>0);
- p->queued_ref_cnt--;
-
- msg_size = (p->queued_ref_cnt) ? pctx->opts.recv_bufsize : p->queued_rlen;
-
- assert(p->queued_msg!=NULL);
- assert(msg_size!=0);
- rc = ibw_send_packet(conn, p->queued_msg, p, msg_size);
- if (p->queued_ref_cnt) {
- p->queued_msg += pctx->opts.recv_bufsize;
- } else {
- DLIST_REMOVE2(pconn->queue, p, qprev, qnext);
- p->queued_msg = NULL;
- }
- }
-
- return rc;
+ return ibw_process_queue(conn);
}
static int ibw_append_to_part(struct ibw_conn_priv *pconn,
@@ -874,8 +933,7 @@ struct ibw_ctx *ibw_init(struct ibw_initattr *attr, int nattr,
struct ibw_ctx_priv *pctx;
int rc;
- DEBUG(10, ("ibw_init(ctx_userdata: %u, ectx: %u)\n",
- (uint32_t)ctx_userdata, (uint32_t)ectx));
+ DEBUG(10, ("ibw_init(ctx_userdata: %p, ectx: %p)\n", ctx_userdata, ectx));
/* initialize basic data structures */
memset(ibw_lasterr, 0, IBW_LASTERR_BUFSIZE);
@@ -1010,19 +1068,25 @@ int ibw_accept(struct ibw_ctx *ctx, struct ibw_conn *conn, void *conn_userdata)
return 0;
}
-int ibw_connect(struct ibw_ctx *ctx, struct sockaddr_in *serv_addr, void *conn_userdata)
+int ibw_connect(struct ibw_conn *conn, struct sockaddr_in *serv_addr, void *conn_userdata)
{
- struct ibw_ctx_priv *pctx = talloc_get_type(ctx->internal, struct ibw_ctx_priv);
- struct ibw_conn *conn = NULL;
+ struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
struct ibw_conn_priv *pconn = NULL;
int rc;
- conn = ibw_conn_new(ctx);
+ assert(conn!=NULL);
+
conn->conn_userdata = conn_userdata;
pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
DEBUG(10, ("ibw_connect: addr=%s, port=%u\n", inet_ntoa(serv_addr->sin_addr),
ntohs(serv_addr->sin_port)));
+ /* clean previous - probably half - initialization */
+ if (ibw_conn_priv_destruct(pconn)) {
+ DEBUG(0, ("ibw_connect/ibw_pconn_destruct failed for cm_id=%p\n", pconn->cm_id));
+ return -1;
+ }
+
/* init cm */
rc = rdma_create_id(pctx->cm_channel, &pconn->cm_id, conn, RDMA_PS_TCP);
if (rc) {
@@ -1053,11 +1117,23 @@ int ibw_disconnect(struct ibw_conn *conn)
DEBUG(10, ("ibw_disconnect: cmid=%p\n", pconn->cm_id));
- rc = rdma_disconnect(pconn->cm_id);
- if (rc) {
- sprintf(ibw_lasterr, "ibw_disconnect failed with %d\n", rc);
- DEBUG(0, (ibw_lasterr));
- return rc;
+ assert(pconn!=NULL);
+
+ switch(conn->state) {
+ case IBWC_ERROR:
+ ibw_conn_priv_destruct(pconn); /* do this here right now */
+ break;
+ case IBWC_CONNECTED:
+ rc = rdma_disconnect(pconn->cm_id);
+ if (rc) {
+ sprintf(ibw_lasterr, "ibw_disconnect failed with %d\n", rc);
+ DEBUG(0, (ibw_lasterr));
+ return rc;
+ }
+ break;
+ default:
+ DEBUG(9, ("invalid state for disconnect: %d\n", conn->state));
+ break;
}
return 0;
@@ -1092,6 +1168,7 @@ int ibw_alloc_send_buf(struct ibw_conn *conn, void **buf, void **key, uint32_t l
p = pconn->extra_avail;
if (!p) {
p = pconn->extra_avail = talloc_zero(pconn, struct ibw_wr);
+ talloc_set_destructor(p, ibw_wr_destruct);
if (p==NULL) {
sprintf(ibw_lasterr, "talloc_zero failed (emax: %u)\n", pconn->extra_max);
goto error;
@@ -1174,6 +1251,8 @@ static int ibw_send_packet(struct ibw_conn *conn, void *buf, struct ibw_wr *p, u
DEBUG(10, ("ibw_send#queued(cmid: %p, len: %u)\n", pconn->cm_id, len));
+ /* TODO: clarify how to continue when state==IBWC_STOPPED */
+
/* to be sent by ibw_wc_send */
/* regardless "normal" or [a part of] "large" packet */
if (!p->queued_ref_cnt) {
diff --git a/ctdb/ib/ibwrapper.h b/ctdb/ib/ibwrapper.h
index 52018cb7cb5..36385d6f46b 100644
--- a/ctdb/ib/ibwrapper.h
+++ b/ctdb/ib/ibwrapper.h
@@ -154,6 +154,14 @@ int ibw_listen(struct ibw_ctx *ctx, int backlog);
int ibw_accept(struct ibw_ctx *ctx, struct ibw_conn *conn, void *conn_userdata);
/*
+ * Create a new connection structure
+ * available for queueing ibw_send
+ *
+ * <parent> is needed to be notified by talloc destruct action.
+ */
+struct ibw_conn *ibw_conn_new(struct ibw_ctx *ctx, TALLOC_CTX *mem_ctx);
+
+/*
* Needs a normal internet address here
* can be called within IBWS_READY|IBWS_CONNECT_REQUEST
*
@@ -162,7 +170,7 @@ int ibw_accept(struct ibw_ctx *ctx, struct ibw_conn *conn, void *conn_userdata);
* You have +1 waiting here: you will get ibw_conn (having the
* same <conn_userdata> member) structure in ibw_connstate_fn_t.
*/
-int ibw_connect(struct ibw_ctx *ctx, struct sockaddr_in *serv_addr, void *conn_userdata);
+int ibw_connect(struct ibw_conn *conn, struct sockaddr_in *serv_addr, void *conn_userdata);
/*
* Sends out a disconnect request.
diff --git a/ctdb/ib/ibwrapper_test.c b/ctdb/ib/ibwrapper_test.c
index 2fa590588ce..2ab4c97158b 100644
--- a/ctdb/ib/ibwrapper_test.c
+++ b/ctdb/ib/ibwrapper_test.c
@@ -81,11 +81,13 @@ enum testopcode {
int ibwtest_connect_everybody(struct ibwtest_ctx *tcx)
{
- struct ibwtest_conn *pconn = talloc_zero(tcx, struct ibwtest_conn);
+ struct ibw_conn *conn;
+ struct ibwtest_conn *tconn = talloc_zero(tcx, struct ibwtest_conn);
int i;
for(i=0; i<tcx->naddrs; i++) {
- if (ibw_connect(tcx->ibwctx, &tcx->addrs[i], pconn)) {
+ conn = ibw_conn_new(tcx->ibwctx, tconn);
+ if (ibw_connect(conn, &tcx->addrs[i], tconn)) {
fprintf(stderr, "ibw_connect error at %d\n", i);
return -1;
}
@@ -237,7 +239,7 @@ int ibwtest_do_varsize_scenario_conn(struct ibwtest_ctx *tcx, struct ibw_conn *c
int ibwtest_connstate_handler(struct ibw_ctx *ctx, struct ibw_conn *conn)
{
struct ibwtest_ctx *tcx = NULL; /* userdata */
- struct ibwtest_conn *pconn = NULL; /* userdata */
+ struct ibwtest_conn *tconn = NULL; /* userdata */
if (ctx) {
tcx = talloc_get_type(ctx->ctx_userdata, struct ibwtest_ctx);
@@ -251,8 +253,8 @@ int ibwtest_connstate_handler(struct ibw_ctx *ctx, struct ibw_conn *conn)
break;
case IBWS_CONNECT_REQUEST:
DEBUG(10, ("test IBWS_CONNECT_REQUEST\n"));
- pconn = talloc_zero(conn, struct ibwtest_conn);
- if (ibw_accept(ctx, conn, pconn)) {
+ tconn = talloc_zero(conn, struct ibwtest_conn);
+ if (ibw_accept(ctx, conn, tconn)) {
DEBUG(0, ("error accepting the connect request\n"));
}
break;
@@ -271,7 +273,7 @@ int ibwtest_connstate_handler(struct ibw_ctx *ctx, struct ibw_conn *conn)
}
if (conn) {
- pconn = talloc_get_type(conn->conn_userdata, struct ibwtest_conn);
+ tconn = talloc_get_type(conn->conn_userdata, struct ibwtest_conn);
switch(conn->state) {
case IBWC_INIT:
DEBUG(10, ("test IBWC_INIT\n"));
@@ -300,22 +302,22 @@ int ibwtest_connstate_handler(struct ibw_ctx *ctx, struct ibw_conn *conn)
int ibwtest_receive_handler(struct ibw_conn *conn, void *buf, int n)
{
- struct ibwtest_conn *pconn;
+ struct ibwtest_conn *tconn;
enum testopcode op;
struct ibwtest_ctx *tcx = talloc_get_type(conn->ctx->ctx_userdata, struct ibwtest_ctx);
int rc = 0;
assert(conn!=NULL);
assert(n>=sizeof(uint32_t)+1);
- pconn = talloc_get_type(conn->conn_userdata, struct ibwtest_conn);
+ tconn = talloc_get_type(conn->conn_userdata, struct ibwtest_conn);
op = (enum testopcode)((char *)buf)[sizeof(uint32_t)];
if (op==TESTOP_SEND_ID) {
- pconn->id = talloc_strdup(pconn, ((char *)buf)+sizeof(uint32_t)+1);
+ tconn->id = talloc_strdup(tconn, ((char *)buf)+sizeof(uint32_t)+1);
}
if (op==TESTOP_SEND_ID || op==TESTOP_SEND_TEXT) {
DEBUG(11, ("[%d]msg from %s: \"%s\"(%d)\n", op,
- pconn->id ? pconn->id : "NULL", ((char *)buf)+sizeof(uint32_t)+1, n));
+ tconn->id ? tconn->id : "NULL", ((char *)buf)+sizeof(uint32_t)+1, n));
}
if (tcx->is_server) {
@@ -327,7 +329,7 @@ int ibwtest_receive_handler(struct ibw_conn *conn, void *buf, int n)
op,
n - sizeof(uint32_t) - 2,
(uint32_t)sum,
- pconn->id ? pconn->id : "NULL"));
+ tconn->id ? tconn->id : "NULL"));
if (sum!=((unsigned char *)buf)[n-1]) {
DEBUG(0, ("ERROR: checksum mismatch %u!=%u\n",
(uint32_t)sum, (uint32_t)((unsigned char *)buf)[n-1]));