diff options
Diffstat (limited to 'ctdb/ib/ibwrapper.c')
-rw-r--r-- | ctdb/ib/ibwrapper.c | 213 |
1 files changed, 146 insertions, 67 deletions
diff --git a/ctdb/ib/ibwrapper.c b/ctdb/ib/ibwrapper.c index f3ef0c4c5c..f7b233954d 100644 --- a/ctdb/ib/ibwrapper.c +++ b/ctdb/ib/ibwrapper.c @@ -161,42 +161,57 @@ static int ibw_ctx_destruct(struct ibw_ctx *ctx) static int ibw_conn_priv_destruct(struct ibw_conn_priv *pconn) { - DEBUG(10, ("ibw_conn_priv_destruct(%u, cmid: %p)\n", - (uint32_t)pconn, pconn->cm_id)); - - /* free memory regions */ - ibw_free_mr(&pconn->buf_send, &pconn->mr_send); - ibw_free_mr(&pconn->buf_recv, &pconn->mr_recv); + DEBUG(10, ("ibw_conn_priv_destruct(%p, cmid: %p)\n", + pconn, pconn->cm_id)); /* pconn->wr_index is freed by talloc */ /* pconn->wr_index[i] are freed by talloc */ /* destroy verbs */ - if (pconn->cm_id->qp) { - ibv_destroy_qp(pconn->cm_id->qp); + if (pconn->cm_id!=NULL && pconn->cm_id->qp!=NULL) { + rdma_destroy_qp(pconn->cm_id); pconn->cm_id->qp = NULL; } - if (pconn->cq) { + + if (pconn->cq!=NULL) { ibv_destroy_cq(pconn->cq); pconn->cq = NULL; } - if (pconn->verbs_channel) { + + if (pconn->verbs_channel!=NULL) { ibv_destroy_comp_channel(pconn->verbs_channel); pconn->verbs_channel = NULL; } + + /* must be freed here because its order is important */ if (pconn->verbs_channel_event) { - /* TODO: do we have to do this here? */ talloc_free(pconn->verbs_channel_event); pconn->verbs_channel_event = NULL; } + + /* free memory regions */ + ibw_free_mr(&pconn->buf_send, &pconn->mr_send); + ibw_free_mr(&pconn->buf_recv, &pconn->mr_recv); + if (pconn->pd) { ibv_dealloc_pd(pconn->pd); pconn->pd = NULL; + DEBUG(10, ("pconn=%p pd deallocated\n", pconn)); } + if (pconn->cm_id) { rdma_destroy_id(pconn->cm_id); pconn->cm_id = NULL; + DEBUG(10, ("pconn=%p cm_id destroyed\n", pconn)); } + + return 0; +} + +static int ibw_wr_destruct(struct ibw_wr *wr) +{ + if (wr->buf_large!=NULL) + ibw_free_mr(&wr->buf_large, &wr->mr_large); return 0; } @@ -209,16 +224,18 @@ static int ibw_conn_destruct(struct ibw_conn *conn) return 0; } -static struct ibw_conn *ibw_conn_new(struct ibw_ctx *ctx) +struct ibw_conn *ibw_conn_new(struct ibw_ctx *ctx, TALLOC_CTX *mem_ctx) { struct ibw_conn *conn; struct ibw_conn_priv *pconn; - conn = talloc_zero(ctx, struct ibw_conn); + assert(ctx!=NULL); + + conn = talloc_zero(mem_ctx, struct ibw_conn); assert(conn!=NULL); talloc_set_destructor(conn, ibw_conn_destruct); - pconn = talloc_zero(ctx, struct ibw_conn_priv); + pconn = talloc_zero(conn, struct ibw_conn_priv); assert(pconn!=NULL); talloc_set_destructor(pconn, ibw_conn_priv_destruct); @@ -248,7 +265,7 @@ static int ibw_setup_cq_qp(struct ibw_conn *conn) } DEBUG(10, ("created channel %p\n", pconn->verbs_channel)); - pconn->verbs_channel_event = event_add_fd(pctx->ectx, conn, + pconn->verbs_channel_event = event_add_fd(pctx->ectx, NULL, /* not pconn or conn */ pconn->verbs_channel->fd, EVENT_FD_READ, ibw_event_handler_verbs, conn); pconn->pd = ibv_alloc_pd(pconn->cm_id->verbs); @@ -371,14 +388,15 @@ static int ibw_fill_cq(struct ibw_conn *conn) return 0; } -static int ibw_manage_connect(struct ibw_conn *conn, struct rdma_cm_id *cma_id) +static int ibw_manage_connect(struct ibw_conn *conn) { struct rdma_conn_param conn_param; + struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv); int rc; - DEBUG(10, ("ibw_manage_connect(cmid: %p)\n", cma_id)); - rc = ibw_setup_cq_qp(conn); - if (rc) + DEBUG(10, ("ibw_manage_connect(cmid: %p)\n", pconn->cm_id)); + + if (ibw_setup_cq_qp(conn)) return -1; /* cm connect */ @@ -387,7 +405,7 @@ static int ibw_manage_connect(struct ibw_conn *conn, struct rdma_cm_id *cma_id) conn_param.initiator_depth = 1; conn_param.retry_count = 10; - rc = rdma_connect(cma_id, &conn_param); + rc = rdma_connect(pconn->cm_id, &conn_param); if (rc) sprintf(ibw_lasterr, "rdma_connect error %d\n", rc); @@ -436,7 +454,7 @@ static void ibw_event_handler_cm(struct event_context *ev, assert(cma_id->context!=NULL); conn = talloc_get_type(cma_id->context, struct ibw_conn); - rc = ibw_manage_connect(conn, cma_id); + rc = ibw_manage_connect(conn); if (rc) goto error; @@ -445,7 +463,7 @@ static void ibw_event_handler_cm(struct event_context *ev, case RDMA_CM_EVENT_CONNECT_REQUEST: DEBUG(11, ("RDMA_CM_EVENT_CONNECT_REQUEST\n")); ctx->state = IBWS_CONNECT_REQUEST; - conn = ibw_conn_new(ctx); + conn = ibw_conn_new(ctx, ctx); pconn = talloc_get_type(conn->internal, struct ibw_conn_priv); pconn->cm_id = cma_id; /* !!! event will be freed but id not */ cma_id->context = (void *)conn; @@ -459,6 +477,9 @@ static void ibw_event_handler_cm(struct event_context *ev, /* continued at ibw_accept when invoked by the func above */ if (!pconn->is_accepted) { + rc = rdma_reject(cma_id, NULL, 0); + if (rc) + DEBUG(0, ("rdma_reject failed with rc=%d\n", rc)); talloc_free(conn); DEBUG(10, ("pconn->cm_id %p wasn't accepted\n", pconn->cm_id)); } @@ -476,6 +497,8 @@ static void ibw_event_handler_cm(struct event_context *ev, conn = talloc_get_type(cma_id->context, struct ibw_conn); assert(conn!=NULL); /* important assumption */ + DEBUG(10, ("ibw_setup_cq_qp succeeded (cmid=%p)\n", cma_id)); + /* client conn is up */ conn->state = IBWC_CONNECTED; @@ -485,22 +508,30 @@ static void ibw_event_handler_cm(struct event_context *ev, case RDMA_CM_EVENT_ADDR_ERROR: sprintf(ibw_lasterr, "RDMA_CM_EVENT_ADDR_ERROR, error %d\n", event->status); - goto error; case RDMA_CM_EVENT_ROUTE_ERROR: sprintf(ibw_lasterr, "RDMA_CM_EVENT_ROUTE_ERROR, error %d\n", event->status); - goto error; case RDMA_CM_EVENT_CONNECT_ERROR: sprintf(ibw_lasterr, "RDMA_CM_EVENT_CONNECT_ERROR, error %d\n", event->status); - goto error; case RDMA_CM_EVENT_UNREACHABLE: sprintf(ibw_lasterr, "RDMA_CM_EVENT_UNREACHABLE, error %d\n", event->status); - goto error; case RDMA_CM_EVENT_REJECTED: sprintf(ibw_lasterr, "RDMA_CM_EVENT_REJECTED, error %d\n", event->status); + conn = talloc_get_type(cma_id->context, struct ibw_conn); + if (conn) { + if ((rc=rdma_ack_cm_event(event))) + DEBUG(0, ("reject/rdma_ack_cm_event failed with %d\n", rc)); + event = NULL; + pconn = talloc_get_type(conn->internal, struct ibw_conn_priv); + ibw_conn_priv_destruct(pconn); + } goto error; case RDMA_CM_EVENT_DISCONNECTED: DEBUG(11, ("RDMA_CM_EVENT_DISCONNECTED\n")); + if ((rc=rdma_ack_cm_event(event))) + DEBUG(0, ("disc/rdma_ack_cm_event failed with %d\n", rc)); + event = NULL; /* don't ack more */ + if (cma_id!=pctx->cm_id) { DEBUG(0, ("client DISCONNECT event cm_id=%p\n", cma_id)); conn = talloc_get_type(cma_id->context, struct ibw_conn); @@ -518,14 +549,20 @@ static void ibw_event_handler_cm(struct event_context *ev, goto error; } - if ((rc=rdma_ack_cm_event(event))) { + if (event!=NULL && (rc=rdma_ack_cm_event(event))) { sprintf(ibw_lasterr, "rdma_ack_cm_event failed with %d\n", rc); goto error; } return; error: + if (event!=NULL && (rc=rdma_ack_cm_event(event))) { + sprintf(ibw_lasterr, "rdma_ack_cm_event failed with %d\n", rc); + goto error; + } + DEBUG(0, ("cm event handler: %s", ibw_lasterr)); + if (cma_id!=pctx->cm_id) { conn = talloc_get_type(cma_id->context, struct ibw_conn); if (conn) @@ -569,8 +606,8 @@ static void ibw_event_handler_verbs(struct event_context *ev, while((rc=ibv_poll_cq(pconn->cq, 1, &wc))==1) { if (wc.status) { - sprintf(ibw_lasterr, "cq completion failed status %d rc %d\n", - wc.status, rc); + sprintf(ibw_lasterr, "cq completion failed status=%d, opcode=%d, rc=%d\n", + wc.status, wc.opcode, rc); goto error; } @@ -605,11 +642,57 @@ static void ibw_event_handler_verbs(struct event_context *ev, goto error; } + ibv_ack_cq_events(pconn->cq, 1); + return; error: + ibv_ack_cq_events(pconn->cq, 1); + DEBUG(0, (ibw_lasterr)); - conn->state = IBWC_ERROR; - pctx->connstate_func(NULL, conn); + + if (conn->state!=IBWC_ERROR) { + conn->state = IBWC_ERROR; + pctx->connstate_func(NULL, conn); + } +} + +static int ibw_process_queue(struct ibw_conn *conn) +{ + struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv); + struct ibw_ctx_priv *pctx; + struct ibw_wr *p; + int rc; + uint32_t msg_size; + + if (pconn->queue==NULL) + return 0; /* NOP */ + + p = pconn->queue; + + /* we must have at least 1 fragment to send */ + assert(p->queued_ref_cnt>0); + p->queued_ref_cnt--; + + pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv); + msg_size = (p->queued_ref_cnt) ? pctx->opts.recv_bufsize : p->queued_rlen; + + assert(p->queued_msg!=NULL); + assert(msg_size!=0); + + DEBUG(10, ("ibw_process_queue refcnt=%d msgsize=%u\n", + p->queued_ref_cnt, msg_size)); + + rc = ibw_send_packet(conn, p->queued_msg, p, msg_size); + + /* was this the last fragment? */ + if (p->queued_ref_cnt) { + p->queued_msg += pctx->opts.recv_bufsize; + } else { + DLIST_REMOVE2(pconn->queue, p, qprev, qnext); + p->queued_msg = NULL; + } + + return rc; } static int ibw_wc_send(struct ibw_conn *conn, struct ibv_wc *wc) @@ -618,7 +701,6 @@ static int ibw_wc_send(struct ibw_conn *conn, struct ibv_wc *wc) struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv); struct ibw_wr *p; int send_index; - int rc = 0; DEBUG(10, ("ibw_wc_send(cmid: %p, wr_id: %u, bl: %u)\n", pconn->cm_id, (uint32_t)wc->wr_id, (uint32_t)wc->byte_len)); @@ -662,30 +744,7 @@ static int ibw_wc_send(struct ibw_conn *conn, struct ibv_wc *wc) } } - if (pconn->queue) { - uint32_t msg_size; - - DEBUG(10, ("ibw_wc_send#queue %u\n", (int)wc->wr_id)); - - p = pconn->queue; - - assert(p->queued_ref_cnt>0); - p->queued_ref_cnt--; - - msg_size = (p->queued_ref_cnt) ? pctx->opts.recv_bufsize : p->queued_rlen; - - assert(p->queued_msg!=NULL); - assert(msg_size!=0); - rc = ibw_send_packet(conn, p->queued_msg, p, msg_size); - if (p->queued_ref_cnt) { - p->queued_msg += pctx->opts.recv_bufsize; - } else { - DLIST_REMOVE2(pconn->queue, p, qprev, qnext); - p->queued_msg = NULL; - } - } - - return rc; + return ibw_process_queue(conn); } static int ibw_append_to_part(struct ibw_conn_priv *pconn, @@ -874,8 +933,7 @@ struct ibw_ctx *ibw_init(struct ibw_initattr *attr, int nattr, struct ibw_ctx_priv *pctx; int rc; - DEBUG(10, ("ibw_init(ctx_userdata: %u, ectx: %u)\n", - (uint32_t)ctx_userdata, (uint32_t)ectx)); + DEBUG(10, ("ibw_init(ctx_userdata: %p, ectx: %p)\n", ctx_userdata, ectx)); /* initialize basic data structures */ memset(ibw_lasterr, 0, IBW_LASTERR_BUFSIZE); @@ -1010,19 +1068,25 @@ int ibw_accept(struct ibw_ctx *ctx, struct ibw_conn *conn, void *conn_userdata) return 0; } -int ibw_connect(struct ibw_ctx *ctx, struct sockaddr_in *serv_addr, void *conn_userdata) +int ibw_connect(struct ibw_conn *conn, struct sockaddr_in *serv_addr, void *conn_userdata) { - struct ibw_ctx_priv *pctx = talloc_get_type(ctx->internal, struct ibw_ctx_priv); - struct ibw_conn *conn = NULL; + struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv); struct ibw_conn_priv *pconn = NULL; int rc; - conn = ibw_conn_new(ctx); + assert(conn!=NULL); + conn->conn_userdata = conn_userdata; pconn = talloc_get_type(conn->internal, struct ibw_conn_priv); DEBUG(10, ("ibw_connect: addr=%s, port=%u\n", inet_ntoa(serv_addr->sin_addr), ntohs(serv_addr->sin_port))); + /* clean previous - probably half - initialization */ + if (ibw_conn_priv_destruct(pconn)) { + DEBUG(0, ("ibw_connect/ibw_pconn_destruct failed for cm_id=%p\n", pconn->cm_id)); + return -1; + } + /* init cm */ rc = rdma_create_id(pctx->cm_channel, &pconn->cm_id, conn, RDMA_PS_TCP); if (rc) { @@ -1053,11 +1117,23 @@ int ibw_disconnect(struct ibw_conn *conn) DEBUG(10, ("ibw_disconnect: cmid=%p\n", pconn->cm_id)); - rc = rdma_disconnect(pconn->cm_id); - if (rc) { - sprintf(ibw_lasterr, "ibw_disconnect failed with %d\n", rc); - DEBUG(0, (ibw_lasterr)); - return rc; + assert(pconn!=NULL); + + switch(conn->state) { + case IBWC_ERROR: + ibw_conn_priv_destruct(pconn); /* do this here right now */ + break; + case IBWC_CONNECTED: + rc = rdma_disconnect(pconn->cm_id); + if (rc) { + sprintf(ibw_lasterr, "ibw_disconnect failed with %d\n", rc); + DEBUG(0, (ibw_lasterr)); + return rc; + } + break; + default: + DEBUG(9, ("invalid state for disconnect: %d\n", conn->state)); + break; } return 0; @@ -1092,6 +1168,7 @@ int ibw_alloc_send_buf(struct ibw_conn *conn, void **buf, void **key, uint32_t l p = pconn->extra_avail; if (!p) { p = pconn->extra_avail = talloc_zero(pconn, struct ibw_wr); + talloc_set_destructor(p, ibw_wr_destruct); if (p==NULL) { sprintf(ibw_lasterr, "talloc_zero failed (emax: %u)\n", pconn->extra_max); goto error; @@ -1174,6 +1251,8 @@ static int ibw_send_packet(struct ibw_conn *conn, void *buf, struct ibw_wr *p, u DEBUG(10, ("ibw_send#queued(cmid: %p, len: %u)\n", pconn->cm_id, len)); + /* TODO: clarify how to continue when state==IBWC_STOPPED */ + /* to be sent by ibw_wc_send */ /* regardless "normal" or [a part of] "large" packet */ if (!p->queued_ref_cnt) { |