diff options
Diffstat (limited to 'ctdb/ib')
-rw-r--r-- | ctdb/ib/ibw_ctdb.c | 38 | ||||
-rw-r--r-- | ctdb/ib/ibw_ctdb.h | 18 | ||||
-rw-r--r-- | ctdb/ib/ibw_ctdb_init.c | 73 | ||||
-rw-r--r-- | ctdb/ib/ibwrapper.c | 213 | ||||
-rw-r--r-- | ctdb/ib/ibwrapper.h | 10 | ||||
-rw-r--r-- | ctdb/ib/ibwrapper_test.c | 24 |
6 files changed, 266 insertions, 110 deletions
diff --git a/ctdb/ib/ibw_ctdb.c b/ctdb/ib/ibw_ctdb.c index 53293810ca8..50dd8d68d43 100644 --- a/ctdb/ib/ibw_ctdb.c +++ b/ctdb/ib/ibw_ctdb.c @@ -29,8 +29,13 @@ #include "ibwrapper.h" #include "ibw_ctdb.h" -int ctdb_ibw_node_connect(struct ibw_ctx *ictx, struct ctdb_node *node) +int ctdb_ibw_node_connect(struct ctdb_node *node) { + struct ctdb_ibw_node *cn = talloc_get_type(node->private, struct ctdb_ibw_node); + int rc; + + assert(cn!=NULL); + assert(cn->conn!=NULL); struct sockaddr_in sock_out; memset(&sock_out, 0, sizeof(struct sockaddr_in)); @@ -38,12 +43,12 @@ int ctdb_ibw_node_connect(struct ibw_ctx *ictx, struct ctdb_node *node) sock_out.sin_port = htons(node->address.port); sock_out.sin_family = PF_INET; - if (ibw_connect(ictx, &sock_out, node)) { - DEBUG(0, ("ctdb_ibw_node_connect: ibw_connect failed - retrying in 1 sec...\n")); + rc = ibw_connect(cn->conn, &sock_out, node); + if (rc) { + DEBUG(0, ("ctdb_ibw_node_connect/ibw_connect failed - retrying...\n")); /* try again once a second */ event_add_timed(node->ctdb->ev, node, timeval_current_ofs(1, 0), ctdb_ibw_node_connect_event, node); - return -1; } /* continues at ibw_ctdb.c/IBWC_CONNECTED in good case */ @@ -54,9 +59,8 @@ void ctdb_ibw_node_connect_event(struct event_context *ev, struct timed_event *t struct timeval t, void *private) { struct ctdb_node *node = talloc_get_type(private, struct ctdb_node); - struct ibw_ctx *ictx = talloc_get_type(node->ctdb->private, struct ibw_ctx); - ctdb_ibw_node_connect(ictx, node); + ctdb_ibw_node_connect(node); } int ctdb_ibw_connstate_handler(struct ibw_ctx *ctx, struct ibw_conn *conn) @@ -94,14 +98,15 @@ int ctdb_ibw_connstate_handler(struct ibw_ctx *ctx, struct ibw_conn *conn) case IBWC_CONNECTED: { /* after ibw_accept or ibw_connect */ struct ctdb_node *node = talloc_get_type(conn->conn_userdata, struct ctdb_node); if (node!=NULL) { /* after ibw_connect */ - node->private = (void *)conn; + struct ctdb_ibw_node *cn = talloc_get_type(node->private, struct ctdb_ibw_node); + node->ctdb->upcalls->node_connected(node); + ctdb_flush_cn_queue(cn); } else { /* after ibw_accept */ /* NOP in CTDB case */ } } break; case IBWC_DISCONNECTED: { /* after ibw_disconnect */ - /* TODO: have a CTDB upcall */ struct ctdb_node *node = talloc_get_type(conn->conn_userdata, struct ctdb_node); if (node!=NULL) node->ctdb->upcalls->node_dead(node); @@ -110,13 +115,16 @@ int ctdb_ibw_connstate_handler(struct ibw_ctx *ctx, struct ibw_conn *conn) } break; case IBWC_ERROR: { struct ctdb_node *node = talloc_get_type(conn->conn_userdata, struct ctdb_node); - if (node!=NULL) - node->private = NULL; /* not to use again */ - - DEBUG(10, ("IBWC_ERROR, reconnecting immediately...\n")); - talloc_free(conn); - event_add_timed(node->ctdb->ev, node, timeval_current_ofs(1, 0), - ctdb_ibw_node_connect_event, node); + if (node!=NULL) { + struct ctdb_ibw_node *cn = talloc_get_type(node->private, struct ctdb_ibw_node); + struct ibw_ctx *ictx = cn->conn->ctx; + + DEBUG(10, ("IBWC_ERROR, reconnecting...\n")); + talloc_free(cn->conn); /* internal queue content is destroyed */ + cn->conn = (void *)ibw_conn_new(ictx, node); + event_add_timed(node->ctdb->ev, node, timeval_current_ofs(1, 0), + ctdb_ibw_node_connect_event, node); + } } break; default: assert(0); diff --git a/ctdb/ib/ibw_ctdb.h b/ctdb/ib/ibw_ctdb.h index 14308682b21..24adeb7233e 100644 --- a/ctdb/ib/ibw_ctdb.h +++ b/ctdb/ib/ibw_ctdb.h @@ -21,10 +21,26 @@ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */ +struct ctdb_ibw_msg { + uint8_t *data; + uint32_t length; + struct ctdb_ibw_msg *prev; + struct ctdb_ibw_msg *next; +}; + +struct ctdb_ibw_node { + struct ibw_conn *conn; + + struct ctdb_ibw_msg *queue; + struct ctdb_ibw_msg *queue_last; + int qcnt; +}; + int ctdb_ibw_connstate_handler(struct ibw_ctx *ctx, struct ibw_conn *conn); int ctdb_ibw_receive_handler(struct ibw_conn *conn, void *buf, int n); -int ctdb_ibw_node_connect(struct ibw_ctx *ictx, struct ctdb_node *node); +int ctdb_ibw_node_connect(struct ctdb_node *node); void ctdb_ibw_node_connect_event(struct event_context *ev, struct timed_event *te, struct timeval t, void *private); +int ctdb_flush_cn_queue(struct ctdb_ibw_node *cn); diff --git a/ctdb/ib/ibw_ctdb_init.c b/ctdb/ib/ibw_ctdb_init.c index 78924632292..9c0300c4f4c 100644 --- a/ctdb/ib/ibw_ctdb_init.c +++ b/ctdb/ib/ibw_ctdb_init.c @@ -58,7 +58,6 @@ static int ctdb_ibw_listen(struct ctdb_context *ctdb, int backlog) */ static int ctdb_ibw_start(struct ctdb_context *ctdb) { - struct ibw_ctx *ictx = talloc_get_type(ctdb->private, struct ibw_ctx); int i; /* listen on our own address */ @@ -71,43 +70,87 @@ static int ctdb_ibw_start(struct ctdb_context *ctdb) if (!(ctdb->flags & CTDB_FLAG_SELF_CONNECT) && ctdb_same_address(&ctdb->address, &node->address)) continue; - ctdb_ibw_node_connect(ictx, node); + ctdb_ibw_node_connect(node); } return 0; } - /* * initialise ibw portion of a ctdb node */ static int ctdb_ibw_add_node(struct ctdb_node *node) { - /* TODO: clarify whether is this necessary for us ? - - why not enough doing such thing internally at connect time ? */ - return 0; + struct ibw_ctx *ictx = talloc_get_type(node->ctdb->private, struct ibw_ctx); + struct ctdb_ibw_node *cn = talloc_zero(node, struct ctdb_ibw_node); + + assert(cn!=NULL); + cn->conn = ibw_conn_new(ictx, node); + node->private = (void *)cn; + + return (cn->conn!=NULL ? 0 : -1); +} + +static int ctdb_ibw_send_pkt(struct ibw_conn *conn, uint8_t *data, uint32_t length) +{ + void *buf, *key; + + if (ibw_alloc_send_buf(conn, &buf, &key, length)) { + DEBUG(0, ("queue_pkt/ibw_alloc_send_buf failed\n")); + return -1; + } + + memcpy(buf, data, length); + return ibw_send(conn, buf, key, length); +} + +int ctdb_flush_cn_queue(struct ctdb_ibw_node *cn) +{ + struct ctdb_ibw_msg *p; + int rc = 0; + + while(cn->queue) { + p = cn->queue; + rc = ctdb_ibw_send_pkt(cn->conn, p->data, p->length); + if (rc) + return -1; /* will be retried later when conn is up */ + + DLIST_REMOVE(cn->queue, p); + cn->qcnt--; + talloc_free(p); /* it will talloc_free p->data as well */ + } + assert(cn->qcnt==0); + /* cn->queue_last = NULL is not needed - see DLIST_ADD_AFTER */ + + return rc; } static int ctdb_ibw_queue_pkt(struct ctdb_node *node, uint8_t *data, uint32_t length) { - struct ibw_conn *conn = talloc_get_type(node->private, struct ibw_conn); + struct ctdb_ibw_node *cn = talloc_get_type(node->private, struct ctdb_ibw_node); int rc; - void *buf, *key; assert(length>=sizeof(uint32_t)); + assert(cn!=NULL); - if (conn==NULL) { + if (cn->conn==NULL) { DEBUG(0, ("ctdb_ibw_queue_pkt: conn is NULL\n")); return -1; } - if (ibw_alloc_send_buf(conn, &buf, &key, length)) { - DEBUG(0, ("queue_pkt/ibw_alloc_send_buf failed\n")); - return -1; - } + if (cn->conn->state==IBWC_CONNECTED) { + rc = ctdb_ibw_send_pkt(cn->conn, data, length); + } else { + struct ctdb_ibw_msg *p = talloc_zero(cn, struct ctdb_ibw_msg); + p->data = talloc_memdup(p, data, length); + p->length = length; - memcpy(buf, data, length); - rc = ibw_send(conn, buf, key, length); + DLIST_ADD_AFTER(cn->queue, p, cn->queue_last); + cn->queue_last = p; + cn->qcnt++; + + rc = 0; + } return rc; } diff --git a/ctdb/ib/ibwrapper.c b/ctdb/ib/ibwrapper.c index f3ef0c4c5cd..f7b233954d2 100644 --- a/ctdb/ib/ibwrapper.c +++ b/ctdb/ib/ibwrapper.c @@ -161,42 +161,57 @@ static int ibw_ctx_destruct(struct ibw_ctx *ctx) static int ibw_conn_priv_destruct(struct ibw_conn_priv *pconn) { - DEBUG(10, ("ibw_conn_priv_destruct(%u, cmid: %p)\n", - (uint32_t)pconn, pconn->cm_id)); - - /* free memory regions */ - ibw_free_mr(&pconn->buf_send, &pconn->mr_send); - ibw_free_mr(&pconn->buf_recv, &pconn->mr_recv); + DEBUG(10, ("ibw_conn_priv_destruct(%p, cmid: %p)\n", + pconn, pconn->cm_id)); /* pconn->wr_index is freed by talloc */ /* pconn->wr_index[i] are freed by talloc */ /* destroy verbs */ - if (pconn->cm_id->qp) { - ibv_destroy_qp(pconn->cm_id->qp); + if (pconn->cm_id!=NULL && pconn->cm_id->qp!=NULL) { + rdma_destroy_qp(pconn->cm_id); pconn->cm_id->qp = NULL; } - if (pconn->cq) { + + if (pconn->cq!=NULL) { ibv_destroy_cq(pconn->cq); pconn->cq = NULL; } - if (pconn->verbs_channel) { + + if (pconn->verbs_channel!=NULL) { ibv_destroy_comp_channel(pconn->verbs_channel); pconn->verbs_channel = NULL; } + + /* must be freed here because its order is important */ if (pconn->verbs_channel_event) { - /* TODO: do we have to do this here? */ talloc_free(pconn->verbs_channel_event); pconn->verbs_channel_event = NULL; } + + /* free memory regions */ + ibw_free_mr(&pconn->buf_send, &pconn->mr_send); + ibw_free_mr(&pconn->buf_recv, &pconn->mr_recv); + if (pconn->pd) { ibv_dealloc_pd(pconn->pd); pconn->pd = NULL; + DEBUG(10, ("pconn=%p pd deallocated\n", pconn)); } + if (pconn->cm_id) { rdma_destroy_id(pconn->cm_id); pconn->cm_id = NULL; + DEBUG(10, ("pconn=%p cm_id destroyed\n", pconn)); } + + return 0; +} + +static int ibw_wr_destruct(struct ibw_wr *wr) +{ + if (wr->buf_large!=NULL) + ibw_free_mr(&wr->buf_large, &wr->mr_large); return 0; } @@ -209,16 +224,18 @@ static int ibw_conn_destruct(struct ibw_conn *conn) return 0; } -static struct ibw_conn *ibw_conn_new(struct ibw_ctx *ctx) +struct ibw_conn *ibw_conn_new(struct ibw_ctx *ctx, TALLOC_CTX *mem_ctx) { struct ibw_conn *conn; struct ibw_conn_priv *pconn; - conn = talloc_zero(ctx, struct ibw_conn); + assert(ctx!=NULL); + + conn = talloc_zero(mem_ctx, struct ibw_conn); assert(conn!=NULL); talloc_set_destructor(conn, ibw_conn_destruct); - pconn = talloc_zero(ctx, struct ibw_conn_priv); + pconn = talloc_zero(conn, struct ibw_conn_priv); assert(pconn!=NULL); talloc_set_destructor(pconn, ibw_conn_priv_destruct); @@ -248,7 +265,7 @@ static int ibw_setup_cq_qp(struct ibw_conn *conn) } DEBUG(10, ("created channel %p\n", pconn->verbs_channel)); - pconn->verbs_channel_event = event_add_fd(pctx->ectx, conn, + pconn->verbs_channel_event = event_add_fd(pctx->ectx, NULL, /* not pconn or conn */ pconn->verbs_channel->fd, EVENT_FD_READ, ibw_event_handler_verbs, conn); pconn->pd = ibv_alloc_pd(pconn->cm_id->verbs); @@ -371,14 +388,15 @@ static int ibw_fill_cq(struct ibw_conn *conn) return 0; } -static int ibw_manage_connect(struct ibw_conn *conn, struct rdma_cm_id *cma_id) +static int ibw_manage_connect(struct ibw_conn *conn) { struct rdma_conn_param conn_param; + struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv); int rc; - DEBUG(10, ("ibw_manage_connect(cmid: %p)\n", cma_id)); - rc = ibw_setup_cq_qp(conn); - if (rc) + DEBUG(10, ("ibw_manage_connect(cmid: %p)\n", pconn->cm_id)); + + if (ibw_setup_cq_qp(conn)) return -1; /* cm connect */ @@ -387,7 +405,7 @@ static int ibw_manage_connect(struct ibw_conn *conn, struct rdma_cm_id *cma_id) conn_param.initiator_depth = 1; conn_param.retry_count = 10; - rc = rdma_connect(cma_id, &conn_param); + rc = rdma_connect(pconn->cm_id, &conn_param); if (rc) sprintf(ibw_lasterr, "rdma_connect error %d\n", rc); @@ -436,7 +454,7 @@ static void ibw_event_handler_cm(struct event_context *ev, assert(cma_id->context!=NULL); conn = talloc_get_type(cma_id->context, struct ibw_conn); - rc = ibw_manage_connect(conn, cma_id); + rc = ibw_manage_connect(conn); if (rc) goto error; @@ -445,7 +463,7 @@ static void ibw_event_handler_cm(struct event_context *ev, case RDMA_CM_EVENT_CONNECT_REQUEST: DEBUG(11, ("RDMA_CM_EVENT_CONNECT_REQUEST\n")); ctx->state = IBWS_CONNECT_REQUEST; - conn = ibw_conn_new(ctx); + conn = ibw_conn_new(ctx, ctx); pconn = talloc_get_type(conn->internal, struct ibw_conn_priv); pconn->cm_id = cma_id; /* !!! event will be freed but id not */ cma_id->context = (void *)conn; @@ -459,6 +477,9 @@ static void ibw_event_handler_cm(struct event_context *ev, /* continued at ibw_accept when invoked by the func above */ if (!pconn->is_accepted) { + rc = rdma_reject(cma_id, NULL, 0); + if (rc) + DEBUG(0, ("rdma_reject failed with rc=%d\n", rc)); talloc_free(conn); DEBUG(10, ("pconn->cm_id %p wasn't accepted\n", pconn->cm_id)); } @@ -476,6 +497,8 @@ static void ibw_event_handler_cm(struct event_context *ev, conn = talloc_get_type(cma_id->context, struct ibw_conn); assert(conn!=NULL); /* important assumption */ + DEBUG(10, ("ibw_setup_cq_qp succeeded (cmid=%p)\n", cma_id)); + /* client conn is up */ conn->state = IBWC_CONNECTED; @@ -485,22 +508,30 @@ static void ibw_event_handler_cm(struct event_context *ev, case RDMA_CM_EVENT_ADDR_ERROR: sprintf(ibw_lasterr, "RDMA_CM_EVENT_ADDR_ERROR, error %d\n", event->status); - goto error; case RDMA_CM_EVENT_ROUTE_ERROR: sprintf(ibw_lasterr, "RDMA_CM_EVENT_ROUTE_ERROR, error %d\n", event->status); - goto error; case RDMA_CM_EVENT_CONNECT_ERROR: sprintf(ibw_lasterr, "RDMA_CM_EVENT_CONNECT_ERROR, error %d\n", event->status); - goto error; case RDMA_CM_EVENT_UNREACHABLE: sprintf(ibw_lasterr, "RDMA_CM_EVENT_UNREACHABLE, error %d\n", event->status); - goto error; case RDMA_CM_EVENT_REJECTED: sprintf(ibw_lasterr, "RDMA_CM_EVENT_REJECTED, error %d\n", event->status); + conn = talloc_get_type(cma_id->context, struct ibw_conn); + if (conn) { + if ((rc=rdma_ack_cm_event(event))) + DEBUG(0, ("reject/rdma_ack_cm_event failed with %d\n", rc)); + event = NULL; + pconn = talloc_get_type(conn->internal, struct ibw_conn_priv); + ibw_conn_priv_destruct(pconn); + } goto error; case RDMA_CM_EVENT_DISCONNECTED: DEBUG(11, ("RDMA_CM_EVENT_DISCONNECTED\n")); + if ((rc=rdma_ack_cm_event(event))) + DEBUG(0, ("disc/rdma_ack_cm_event failed with %d\n", rc)); + event = NULL; /* don't ack more */ + if (cma_id!=pctx->cm_id) { DEBUG(0, ("client DISCONNECT event cm_id=%p\n", cma_id)); conn = talloc_get_type(cma_id->context, struct ibw_conn); @@ -518,14 +549,20 @@ static void ibw_event_handler_cm(struct event_context *ev, goto error; } - if ((rc=rdma_ack_cm_event(event))) { + if (event!=NULL && (rc=rdma_ack_cm_event(event))) { sprintf(ibw_lasterr, "rdma_ack_cm_event failed with %d\n", rc); goto error; } return; error: + if (event!=NULL && (rc=rdma_ack_cm_event(event))) { + sprintf(ibw_lasterr, "rdma_ack_cm_event failed with %d\n", rc); + goto error; + } + DEBUG(0, ("cm event handler: %s", ibw_lasterr)); + if (cma_id!=pctx->cm_id) { conn = talloc_get_type(cma_id->context, struct ibw_conn); if (conn) @@ -569,8 +606,8 @@ static void ibw_event_handler_verbs(struct event_context *ev, while((rc=ibv_poll_cq(pconn->cq, 1, &wc))==1) { if (wc.status) { - sprintf(ibw_lasterr, "cq completion failed status %d rc %d\n", - wc.status, rc); + sprintf(ibw_lasterr, "cq completion failed status=%d, opcode=%d, rc=%d\n", + wc.status, wc.opcode, rc); goto error; } @@ -605,11 +642,57 @@ static void ibw_event_handler_verbs(struct event_context *ev, goto error; } + ibv_ack_cq_events(pconn->cq, 1); + return; error: + ibv_ack_cq_events(pconn->cq, 1); + DEBUG(0, (ibw_lasterr)); - conn->state = IBWC_ERROR; - pctx->connstate_func(NULL, conn); + + if (conn->state!=IBWC_ERROR) { + conn->state = IBWC_ERROR; + pctx->connstate_func(NULL, conn); + } +} + +static int ibw_process_queue(struct ibw_conn *conn) +{ + struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv); + struct ibw_ctx_priv *pctx; + struct ibw_wr *p; + int rc; + uint32_t msg_size; + + if (pconn->queue==NULL) + return 0; /* NOP */ + + p = pconn->queue; + + /* we must have at least 1 fragment to send */ + assert(p->queued_ref_cnt>0); + p->queued_ref_cnt--; + + pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv); + msg_size = (p->queued_ref_cnt) ? pctx->opts.recv_bufsize : p->queued_rlen; + + assert(p->queued_msg!=NULL); + assert(msg_size!=0); + + DEBUG(10, ("ibw_process_queue refcnt=%d msgsize=%u\n", + p->queued_ref_cnt, msg_size)); + + rc = ibw_send_packet(conn, p->queued_msg, p, msg_size); + + /* was this the last fragment? */ + if (p->queued_ref_cnt) { + p->queued_msg += pctx->opts.recv_bufsize; + } else { + DLIST_REMOVE2(pconn->queue, p, qprev, qnext); + p->queued_msg = NULL; + } + + return rc; } static int ibw_wc_send(struct ibw_conn *conn, struct ibv_wc *wc) @@ -618,7 +701,6 @@ static int ibw_wc_send(struct ibw_conn *conn, struct ibv_wc *wc) struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv); struct ibw_wr *p; int send_index; - int rc = 0; DEBUG(10, ("ibw_wc_send(cmid: %p, wr_id: %u, bl: %u)\n", pconn->cm_id, (uint32_t)wc->wr_id, (uint32_t)wc->byte_len)); @@ -662,30 +744,7 @@ static int ibw_wc_send(struct ibw_conn *conn, struct ibv_wc *wc) } } - if (pconn->queue) { - uint32_t msg_size; - - DEBUG(10, ("ibw_wc_send#queue %u\n", (int)wc->wr_id)); - - p = pconn->queue; - - assert(p->queued_ref_cnt>0); - p->queued_ref_cnt--; - - msg_size = (p->queued_ref_cnt) ? pctx->opts.recv_bufsize : p->queued_rlen; - - assert(p->queued_msg!=NULL); - assert(msg_size!=0); - rc = ibw_send_packet(conn, p->queued_msg, p, msg_size); - if (p->queued_ref_cnt) { - p->queued_msg += pctx->opts.recv_bufsize; - } else { - DLIST_REMOVE2(pconn->queue, p, qprev, qnext); - p->queued_msg = NULL; - } - } - - return rc; + return ibw_process_queue(conn); } static int ibw_append_to_part(struct ibw_conn_priv *pconn, @@ -874,8 +933,7 @@ struct ibw_ctx *ibw_init(struct ibw_initattr *attr, int nattr, struct ibw_ctx_priv *pctx; int rc; - DEBUG(10, ("ibw_init(ctx_userdata: %u, ectx: %u)\n", - (uint32_t)ctx_userdata, (uint32_t)ectx)); + DEBUG(10, ("ibw_init(ctx_userdata: %p, ectx: %p)\n", ctx_userdata, ectx)); /* initialize basic data structures */ memset(ibw_lasterr, 0, IBW_LASTERR_BUFSIZE); @@ -1010,19 +1068,25 @@ int ibw_accept(struct ibw_ctx *ctx, struct ibw_conn *conn, void *conn_userdata) return 0; } -int ibw_connect(struct ibw_ctx *ctx, struct sockaddr_in *serv_addr, void *conn_userdata) +int ibw_connect(struct ibw_conn *conn, struct sockaddr_in *serv_addr, void *conn_userdata) { - struct ibw_ctx_priv *pctx = talloc_get_type(ctx->internal, struct ibw_ctx_priv); - struct ibw_conn *conn = NULL; + struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv); struct ibw_conn_priv *pconn = NULL; int rc; - conn = ibw_conn_new(ctx); + assert(conn!=NULL); + conn->conn_userdata = conn_userdata; pconn = talloc_get_type(conn->internal, struct ibw_conn_priv); DEBUG(10, ("ibw_connect: addr=%s, port=%u\n", inet_ntoa(serv_addr->sin_addr), ntohs(serv_addr->sin_port))); + /* clean previous - probably half - initialization */ + if (ibw_conn_priv_destruct(pconn)) { + DEBUG(0, ("ibw_connect/ibw_pconn_destruct failed for cm_id=%p\n", pconn->cm_id)); + return -1; + } + /* init cm */ rc = rdma_create_id(pctx->cm_channel, &pconn->cm_id, conn, RDMA_PS_TCP); if (rc) { @@ -1053,11 +1117,23 @@ int ibw_disconnect(struct ibw_conn *conn) DEBUG(10, ("ibw_disconnect: cmid=%p\n", pconn->cm_id)); - rc = rdma_disconnect(pconn->cm_id); - if (rc) { - sprintf(ibw_lasterr, "ibw_disconnect failed with %d\n", rc); - DEBUG(0, (ibw_lasterr)); - return rc; + assert(pconn!=NULL); + + switch(conn->state) { + case IBWC_ERROR: + ibw_conn_priv_destruct(pconn); /* do this here right now */ + break; + case IBWC_CONNECTED: + rc = rdma_disconnect(pconn->cm_id); + if (rc) { + sprintf(ibw_lasterr, "ibw_disconnect failed with %d\n", rc); + DEBUG(0, (ibw_lasterr)); + return rc; + } + break; + default: + DEBUG(9, ("invalid state for disconnect: %d\n", conn->state)); + break; } return 0; @@ -1092,6 +1168,7 @@ int ibw_alloc_send_buf(struct ibw_conn *conn, void **buf, void **key, uint32_t l p = pconn->extra_avail; if (!p) { p = pconn->extra_avail = talloc_zero(pconn, struct ibw_wr); + talloc_set_destructor(p, ibw_wr_destruct); if (p==NULL) { sprintf(ibw_lasterr, "talloc_zero failed (emax: %u)\n", pconn->extra_max); goto error; @@ -1174,6 +1251,8 @@ static int ibw_send_packet(struct ibw_conn *conn, void *buf, struct ibw_wr *p, u DEBUG(10, ("ibw_send#queued(cmid: %p, len: %u)\n", pconn->cm_id, len)); + /* TODO: clarify how to continue when state==IBWC_STOPPED */ + /* to be sent by ibw_wc_send */ /* regardless "normal" or [a part of] "large" packet */ if (!p->queued_ref_cnt) { diff --git a/ctdb/ib/ibwrapper.h b/ctdb/ib/ibwrapper.h index 52018cb7cb5..36385d6f46b 100644 --- a/ctdb/ib/ibwrapper.h +++ b/ctdb/ib/ibwrapper.h @@ -154,6 +154,14 @@ int ibw_listen(struct ibw_ctx *ctx, int backlog); int ibw_accept(struct ibw_ctx *ctx, struct ibw_conn *conn, void *conn_userdata); /* + * Create a new connection structure + * available for queueing ibw_send + * + * <parent> is needed to be notified by talloc destruct action. + */ +struct ibw_conn *ibw_conn_new(struct ibw_ctx *ctx, TALLOC_CTX *mem_ctx); + +/* * Needs a normal internet address here * can be called within IBWS_READY|IBWS_CONNECT_REQUEST * @@ -162,7 +170,7 @@ int ibw_accept(struct ibw_ctx *ctx, struct ibw_conn *conn, void *conn_userdata); * You have +1 waiting here: you will get ibw_conn (having the * same <conn_userdata> member) structure in ibw_connstate_fn_t. */ -int ibw_connect(struct ibw_ctx *ctx, struct sockaddr_in *serv_addr, void *conn_userdata); +int ibw_connect(struct ibw_conn *conn, struct sockaddr_in *serv_addr, void *conn_userdata); /* * Sends out a disconnect request. diff --git a/ctdb/ib/ibwrapper_test.c b/ctdb/ib/ibwrapper_test.c index 2fa590588ce..2ab4c97158b 100644 --- a/ctdb/ib/ibwrapper_test.c +++ b/ctdb/ib/ibwrapper_test.c @@ -81,11 +81,13 @@ enum testopcode { int ibwtest_connect_everybody(struct ibwtest_ctx *tcx) { - struct ibwtest_conn *pconn = talloc_zero(tcx, struct ibwtest_conn); + struct ibw_conn *conn; + struct ibwtest_conn *tconn = talloc_zero(tcx, struct ibwtest_conn); int i; for(i=0; i<tcx->naddrs; i++) { - if (ibw_connect(tcx->ibwctx, &tcx->addrs[i], pconn)) { + conn = ibw_conn_new(tcx->ibwctx, tconn); + if (ibw_connect(conn, &tcx->addrs[i], tconn)) { fprintf(stderr, "ibw_connect error at %d\n", i); return -1; } @@ -237,7 +239,7 @@ int ibwtest_do_varsize_scenario_conn(struct ibwtest_ctx *tcx, struct ibw_conn *c int ibwtest_connstate_handler(struct ibw_ctx *ctx, struct ibw_conn *conn) { struct ibwtest_ctx *tcx = NULL; /* userdata */ - struct ibwtest_conn *pconn = NULL; /* userdata */ + struct ibwtest_conn *tconn = NULL; /* userdata */ if (ctx) { tcx = talloc_get_type(ctx->ctx_userdata, struct ibwtest_ctx); @@ -251,8 +253,8 @@ int ibwtest_connstate_handler(struct ibw_ctx *ctx, struct ibw_conn *conn) break; case IBWS_CONNECT_REQUEST: DEBUG(10, ("test IBWS_CONNECT_REQUEST\n")); - pconn = talloc_zero(conn, struct ibwtest_conn); - if (ibw_accept(ctx, conn, pconn)) { + tconn = talloc_zero(conn, struct ibwtest_conn); + if (ibw_accept(ctx, conn, tconn)) { DEBUG(0, ("error accepting the connect request\n")); } break; @@ -271,7 +273,7 @@ int ibwtest_connstate_handler(struct ibw_ctx *ctx, struct ibw_conn *conn) } if (conn) { - pconn = talloc_get_type(conn->conn_userdata, struct ibwtest_conn); + tconn = talloc_get_type(conn->conn_userdata, struct ibwtest_conn); switch(conn->state) { case IBWC_INIT: DEBUG(10, ("test IBWC_INIT\n")); @@ -300,22 +302,22 @@ int ibwtest_connstate_handler(struct ibw_ctx *ctx, struct ibw_conn *conn) int ibwtest_receive_handler(struct ibw_conn *conn, void *buf, int n) { - struct ibwtest_conn *pconn; + struct ibwtest_conn *tconn; enum testopcode op; struct ibwtest_ctx *tcx = talloc_get_type(conn->ctx->ctx_userdata, struct ibwtest_ctx); int rc = 0; assert(conn!=NULL); assert(n>=sizeof(uint32_t)+1); - pconn = talloc_get_type(conn->conn_userdata, struct ibwtest_conn); + tconn = talloc_get_type(conn->conn_userdata, struct ibwtest_conn); op = (enum testopcode)((char *)buf)[sizeof(uint32_t)]; if (op==TESTOP_SEND_ID) { - pconn->id = talloc_strdup(pconn, ((char *)buf)+sizeof(uint32_t)+1); + tconn->id = talloc_strdup(tconn, ((char *)buf)+sizeof(uint32_t)+1); } if (op==TESTOP_SEND_ID || op==TESTOP_SEND_TEXT) { DEBUG(11, ("[%d]msg from %s: \"%s\"(%d)\n", op, - pconn->id ? pconn->id : "NULL", ((char *)buf)+sizeof(uint32_t)+1, n)); + tconn->id ? tconn->id : "NULL", ((char *)buf)+sizeof(uint32_t)+1, n)); } if (tcx->is_server) { @@ -327,7 +329,7 @@ int ibwtest_receive_handler(struct ibw_conn *conn, void *buf, int n) op, n - sizeof(uint32_t) - 2, (uint32_t)sum, - pconn->id ? pconn->id : "NULL")); + tconn->id ? tconn->id : "NULL")); if (sum!=((unsigned char *)buf)[n-1]) { DEBUG(0, ("ERROR: checksum mismatch %u!=%u\n", (uint32_t)sum, (uint32_t)((unsigned char *)buf)[n-1])); |