diff options
Diffstat (limited to 'ctdb/ib')
-rw-r--r-- | ctdb/ib/ibwrapper.c | 397 | ||||
-rw-r--r-- | ctdb/ib/ibwrapper.h | 5 | ||||
-rw-r--r-- | ctdb/ib/ibwrapper_internal.h | 31 |
3 files changed, 326 insertions, 107 deletions
diff --git a/ctdb/ib/ibwrapper.c b/ctdb/ib/ibwrapper.c index b70b6caad6b..db6e303638b 100644 --- a/ctdb/ib/ibwrapper.c +++ b/ctdb/ib/ibwrapper.c @@ -49,7 +49,40 @@ static char ibw_lasterr[IBW_LASTERR_BUFSIZE]; static void ibw_event_handler_verbs(struct event_context *ev, struct fd_event *fde, uint16_t flags, void *private_data); static int ibw_fill_cq(struct ibw_conn *conn); +static inline int ibw_wc_recv(struct ibw_conn *conn, struct ibv_wc *wc); +static inline int ibw_wc_send(struct ibw_conn *conn, struct ibv_wc *wc); +static void *ibw_alloc_mr(struct ibw_ctx_priv *pctx, struct ibw_conn_priv *pconn, + int n, struct ibv_mr **ppmr) +{ + void *buf; + buf = memalign(pctx->pagesize, n); + if (!buf) { + sprintf(ibw_lasterr, "couldn't allocate memory\n"); + return NULL; + } + + *ppmr = ibv_reg_mr(pctx->pd, buf, n, IBV_ACCESS_LOCAL_WRITE); + if (!*ppmr) { + sprintf(ibw_lasterr, "couldn't allocate mr\n"); + free(buf); + return NULL; + } + + return buf; +} + +static void ibw_free_mr(char **ppbuf, struct ibv_mr **ppmr) +{ + if (*ppmr!=NULL) { + ibv_dereg_mr(*ppmr); + *ppmr = NULL; + } + if (*ppbuf) { + free(*ppbuf); + *ppbuf = NULL; + } +} static int ibw_init_memory(struct ibw_conn *conn) { @@ -59,23 +92,26 @@ static int ibw_init_memory(struct ibw_conn *conn) int i; struct ibw_wr *p; - pconn->buf = memalign(pctx->pagesize, pctx->max_msg_size); - if (!pconn->buf) { - sprintf(ibw_lasterr, "couldn't allocate work buf\n"); + pconn->buf_send = ibw_alloc_mr(pctx, pconn, + pctx->opts.max_send_wr * pctx->opts.avg_send_size, &pconn->mr_send); + if (!pconn->buf_send) { + sprintf(ibw_lasterr, "couldn't allocate work send buf\n"); return -1; } - pconn->mr = ibv_reg_mr(pctx->pd, pconn->buf, - pctx->qsize * pctx->max_msg_size, IBV_ACCESS_LOCAL_WRITE); - if (!pconn->mr) { - sprintf(ibw_lasterr, "couldn't allocate mr\n"); + + pconn->buf_recv = ibw_alloc_mr(pctx, pconn, + pctx->opts.max_recv_wr * pctx->opts.recv_bufsize, &pconn->mr_recv); + if (!pconn->buf_recv) { + sprintf(ibw_lasterr, "couldn't allocate work recv buf\n"); return -1; } - pconn->wr_index = talloc_size(pconn, pctx->qsize * sizeof(struct ibw_wr *)); + pconn->wr_index = talloc_size(pconn, pctx->opts.max_send_wr * sizeof(struct ibw_wr *)); + assert(pconn->wr_index!=NULL); - for(i=0; i<pctx->qsize; i++) { + for(i=0; i<pctx->opts.max_send_wr; i++) { p = pconn->wr_index[i] = talloc_zero(pconn, struct ibw_wr); - p->msg = pconn->buf + (i * pctx->max_msg_size); + p->msg = pconn->buf_send + (i * pctx->opts.avg_send_size); p->wr_id = i; DLIST_ADD(pconn->wr_list_avail, p); @@ -117,14 +153,8 @@ static int ibw_ctx_destruct(struct ibw_ctx *ctx) static int ibw_conn_priv_destruct(struct ibw_conn_priv *pconn) { /* free memory regions */ - if (pconn->mr) { - ibv_dereg_mr(pconn->mr); - pconn->mr = NULL; - } - if (pconn->buf) { - free(pconn->buf); /* memalign-ed */ - pconn->buf = NULL; - } + ibw_free_mr(&pconn->buf_send, &pconn->mr_send); + ibw_free_mr(&pconn->buf_recv, &pconn->mr_recv); /* pconn->wr_index is freed by talloc */ /* pconn->wr_index[i] are freed by talloc */ @@ -204,7 +234,8 @@ static int ibw_setup_cq_qp(struct ibw_conn *conn) pconn->verbs_channel->fd, EVENT_FD_READ, ibw_event_handler_verbs, conn); /* init cq */ - pconn->cq = ibv_create_cq(pconn->cm_id->verbs, pctx->qsize, + pconn->cq = ibv_create_cq(pconn->cm_id->verbs, + pctx->opts.max_recv_wr + pctx->opts.max_send_wr, conn, pconn->verbs_channel, 0); if (pconn->cq==NULL) { sprintf(ibw_lasterr, "ibv_create_cq failed\n"); @@ -244,8 +275,8 @@ static int ibw_refill_cq_recv(struct ibw_conn *conn) int rc; struct ibv_sge list = { .addr = (uintptr_t) NULL, - .length = pctx->max_msg_size, - .lkey = pconn->mr->lkey + .length = pctx->opts.recv_bufsize, + .lkey = pconn->mr_recv->lkey }; struct ibv_recv_wr wr = { .wr_id = 0, @@ -253,17 +284,10 @@ static int ibw_refill_cq_recv(struct ibw_conn *conn) .num_sge = 1, }; struct ibv_recv_wr *bad_wr; - struct ibw_wr *p = pconn->wr_list_avail; - if (p==NULL) { - sprintf(ibw_lasterr, "out of wr_list_avail"); - DEBUG(0, (ibw_lasterr)); - return -1; - } - DLIST_REMOVE(pconn->wr_list_avail, p); - DLIST_ADD(pconn->wr_list_used, p); - list.addr = (uintptr_t) p->msg; - wr.wr_id = p->wr_id; + list.addr = (uintptr_t) pconn->buf_recv + pctx->opts.recv_bufsize * pconn->recv_index; + wr.wr_id = pctx->opts.max_send_wr + pconn->recv_index; + pconn->recv_index = (pconn->recv_index + 1) % pctx->opts.max_recv_wr; rc = ibv_post_recv(pconn->cm_id->qp, &wr, &bad_wr); if (rc) { @@ -282,8 +306,8 @@ static int ibw_fill_cq(struct ibw_conn *conn) int i, rc; struct ibv_sge list = { .addr = (uintptr_t) NULL, - .length = pctx->max_msg_size, - .lkey = pconn->mr->lkey + .length = pctx->opts.recv_bufsize, + .lkey = pconn->mr_recv->lkey }; struct ibv_recv_wr wr = { .wr_id = 0, @@ -291,19 +315,11 @@ static int ibw_fill_cq(struct ibw_conn *conn) .num_sge = 1, }; struct ibv_recv_wr *bad_wr; - struct ibw_wr *p; for(i = pctx->opts.max_recv_wr; i!=0; i--) { - p = pconn->wr_list_avail; - if (p==NULL) { - sprintf(ibw_lasterr, "out of wr_list_avail"); - DEBUG(0, (ibw_lasterr)); - return -1; - } - DLIST_REMOVE(pconn->wr_list_avail, p); - DLIST_ADD(pconn->wr_list_used, p); - list.addr = (uintptr_t) p->msg; - wr.wr_id = p->wr_id; + list.addr = (uintptr_t) pconn->buf_recv + pctx->opts.recv_bufsize * pconn->recv_index; + wr.wr_id = pctx->opts.max_send_wr + pconn->recv_index; + pconn->recv_index = (pconn->recv_index + 1) % pctx->opts.max_recv_wr; rc = ibv_post_recv(pconn->cm_id->qp, &wr, &bad_wr); if (rc) { @@ -489,62 +505,61 @@ static void ibw_event_handler_verbs(struct event_context *ev, struct ibv_wc wc; int rc; + struct ibv_cq *ev_cq; + void *ev_ctx; - rc = ibv_poll_cq(pconn->cq, 1, &wc); - if (rc!=1) { - sprintf(ibw_lasterr, "ibv_poll_cq error %d\n", rc); + /* TODO: check whether if it's good to have more channels here... */ + rc = ibv_get_cq_event(pconn->verbs_channel, &ev_cq, &ev_ctx); + if (rc) { + sprintf(ibw_lasterr, "Failed to get cq_event with %d\n", rc); goto error; } - if (wc.status) { - sprintf(ibw_lasterr, "cq completion failed status %d\n", - wc.status); + if (ev_cq != pconn->cq) { + sprintf(ibw_lasterr, "ev_cq(%u) != pconn->cq(%u)\n", + (unsigned int)ev_cq, (unsigned int)pconn->cq); + goto error; + } + rc = ibv_req_notify_cq(pconn->cq, 0); + if (rc) { + sprintf(ibw_lasterr, "Couldn't request CQ notification (%d)\n", rc); goto error; } - switch(wc.opcode) { - case IBV_WC_SEND: - { - struct ibw_wr *p; - - DEBUG(10, ("send completion\n")); - assert(pconn->cm_id->qp->qp_num==wc.qp_num); - assert(wc.wr_id < pctx->qsize); - p = pconn->wr_index[wc.wr_id]; - DLIST_REMOVE(pconn->wr_list_used, p); - DLIST_ADD(pconn->wr_list_avail, p); + while((rc=ibv_poll_cq(pconn->cq, 1, &wc))==1) { + if (wc.status) { + sprintf(ibw_lasterr, "cq completion failed status %d\n", + wc.status); + goto error; } - break; - - case IBV_WC_RDMA_WRITE: - DEBUG(10, ("rdma write completion\n")); - break; - case IBV_WC_RDMA_READ: - DEBUG(10, ("rdma read completion\n")); - break; + switch(wc.opcode) { + case IBV_WC_SEND: + DEBUG(10, ("send completion\n")); + if (ibw_wc_send(conn, &wc)) + goto error; + break; - case IBV_WC_RECV: - { - struct ibw_wr *p; - - assert(pconn->cm_id->qp->qp_num==wc.qp_num); - assert(wc.wr_id < pctx->qsize); - p = pconn->wr_index[wc.wr_id]; - - DLIST_REMOVE(pconn->wr_list_used, p); - DLIST_ADD(pconn->wr_list_avail, p); + case IBV_WC_RDMA_WRITE: + DEBUG(10, ("rdma write completion\n")); + break; + case IBV_WC_RDMA_READ: + DEBUG(10, ("rdma read completion\n")); + break; + + case IBV_WC_RECV: DEBUG(10, ("recv completion\n")); - assert(wc.byte_len <= pctx->max_msg_size); - - pctx->receive_func(conn, p->msg, wc.byte_len); - if (ibw_refill_cq_recv(conn)) + if (ibw_wc_recv(conn, &wc)) goto error; - } - break; + break; - default: - sprintf(ibw_lasterr, "unknown completion %d\n", wc.opcode); + default: + sprintf(ibw_lasterr, "unknown completion %d\n", wc.opcode); + goto error; + } + } + if (rc!=0) { + sprintf(ibw_lasterr, "ibv_poll_cq error %d\n", rc); goto error; } @@ -555,6 +570,163 @@ error: pctx->connstate_func(NULL, conn); } +static inline int ibw_wc_send(struct ibw_conn *conn, struct ibv_wc *wc) +{ + struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv); + struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv); + struct ibw_wr *p; + + assert(pconn->cm_id->qp->qp_num==wc->qp_num); + assert(wc->wr_id < pctx->opts.max_send_wr); + + p = pconn->wr_index[wc->wr_id]; + if (p->msg_large) { + ibw_free_mr(&p->msg_large, &p->mr_large); + } + + DLIST_REMOVE(pconn->wr_list_used, p); + DLIST_ADD(pconn->wr_list_avail, p); + + return 0; +} + +static inline int ibw_append_to_part(void *memctx, struct ibw_part *part, + char **pp, uint32_t add_len, int info) +{ + /* allocate more if necessary - it's an "evergrowing" buffer... */ + if (part->len + add_len > part->bufsize) { + if (part->buf==NULL) { + assert(part->len==0); + part->buf = talloc_size(memctx, add_len); + if (part->buf==NULL) { + sprintf(ibw_lasterr, "recv talloc_size error (%u) #%d\n", + add_len, info); + return -1; + } + part->bufsize = add_len; + } else { + part->buf = talloc_realloc_size(memctx, + part->buf, part->len + add_len); + if (part->buf==NULL) { + sprintf(ibw_lasterr, "recv realloc error (%u + %u) #%d\n", + part->len, add_len, info); + return -1; + } + } + part->bufsize = part->len + add_len; + } + + /* consume pp */ + memcpy(part->buf + part->len, *pp, add_len); + *pp += add_len; + part->len += add_len; + part->to_read -= add_len; + + return 0; +} + +static inline int ibw_wc_mem_threshold(void *memctx, struct ibw_part *part, uint32_t threshold) +{ + if (part->bufsize > threshold) { + talloc_free(part->buf); + part->buf = talloc_size(memctx, threshold); + if (part->buf==NULL) { + sprintf(ibw_lasterr, "talloc_size failed\n"); + return -1; + } + part->bufsize = threshold; + } + return 0; +} + +static inline int ibw_wc_recv(struct ibw_conn *conn, struct ibv_wc *wc) +{ + struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv); + struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv); + int recv_index; + char *p; + uint32_t remain; + struct ibw_part *part; + + assert(pconn->cm_id->qp->qp_num==wc->qp_num); + assert((int)wc->wr_id > pctx->opts.max_send_wr); + recv_index = (int)wc->wr_id - pctx->opts.max_send_wr; + assert(recv_index < pctx->opts.max_recv_wr); + assert(wc->byte_len <= pctx->opts.recv_bufsize); + + p = pconn->buf_recv + (recv_index * pctx->opts.recv_bufsize); + part = &pconn->part; + + remain = wc->byte_len; + while(remain) { + /* here always true: (part->len!=0 && part->to_read!=0) || + (part->len==0 && part->to_read==0) */ + if (part->len) { /* is there a partial msg to be continued? */ + int read_len = (part->to_read<=remain) ? part->to_read : remain; + if (ibw_append_to_part(pconn, part, &p, read_len, 421)) + goto error; + remain -= read_len; + + if (part->len<=sizeof(uint32_t) && part->to_read==0) { + assert(part->len==sizeof(uint32_t)); + /* set it again now... */ + part->to_read = *((uint32_t *)(part->buf)); + if (part->to_read<sizeof(uint32_t)) { + sprintf(ibw_lasterr, "got msglen=%u #2\n", part->to_read); + goto error; + } + part->to_read -= sizeof(uint32_t); /* it's already read */ + } + + if (part->to_read==0) { + pctx->receive_func(conn, part->buf, part->len); + part->len = 0; /* tells not having partial data (any more) */ + if (ibw_wc_mem_threshold(pconn, part, pctx->opts.recv_threshold)) + goto error; + } + } else { + if (remain>=sizeof(uint32_t)) { + uint32_t msglen = *(uint32_t *)p; + if (msglen<sizeof(uint32_t)) { + sprintf(ibw_lasterr, "got msglen=%u\n", msglen); + goto error; + } + + /* mostly awaited case: */ + if (msglen<=remain) { + pctx->receive_func(conn, p, msglen); + p += msglen; + remain -= msglen; + } else { + part->to_read = msglen; + /* part->len is already 0 */ + if (ibw_append_to_part(pconn, part, &p, remain, 422)) + goto error; + remain = 0; /* to be continued ... */ + /* part->to_read > 0 here */ + } + } else { /* edge case: */ + part->to_read = sizeof(uint32_t); + /* part->len is already 0 */ + if (ibw_append_to_part(pconn, part, &p, remain, 423)) + goto error; + remain = 0; + /* part->to_read > 0 here */ + } + } + } /* <remain> is always decreased at least by 1 */ + + if (ibw_refill_cq_recv(conn)) + goto error; + + return 0; + +error: + DEBUG(0, ("ibw_wc_recv error: %s", ibw_lasterr)); + conn->state = IBWC_ERROR; + return -1; +} + static int ibw_process_init_attrs(struct ibw_initattr *attr, int nattr, struct ibw_opts *opts) { int i; @@ -562,6 +734,9 @@ static int ibw_process_init_attrs(struct ibw_initattr *attr, int nattr, struct i opts->max_send_wr = 256; opts->max_recv_wr = 1024; + opts->avg_send_size = 1024; + opts->recv_bufsize = 256; + opts->recv_threshold = 1 * 1024 * 1024; for(i=0; i<nattr; i++) { name = attr[i].name; @@ -572,6 +747,12 @@ static int ibw_process_init_attrs(struct ibw_initattr *attr, int nattr, struct i opts->max_send_wr = atoi(value); else if (strcmp(name, "max_recv_wr")==0) opts->max_recv_wr = atoi(value); + else if (strcmp(name, "avg_send_size")==0) + opts->avg_send_size = atoi(value); + else if (strcmp(name, "recv_bufsize")==0) + opts->recv_bufsize = atoi(value); + else if (strcmp(name, "recv_threshold")==0) + opts->recv_threshold = atoi(value); else { sprintf(ibw_lasterr, "ibw_init: unknown name %s\n", name); return -1; @@ -584,8 +765,7 @@ struct ibw_ctx *ibw_init(struct ibw_initattr *attr, int nattr, void *ctx_userdata, ibw_connstate_fn_t ibw_connstate, ibw_receive_fn_t ibw_receive, - struct event_context *ectx, - int max_msg_size) + struct event_context *ectx) { struct ibw_ctx *ctx = talloc_zero(NULL, struct ibw_ctx); struct ibw_ctx_priv *pctx; @@ -640,8 +820,6 @@ struct ibw_ctx *ibw_init(struct ibw_initattr *attr, int nattr, DEBUG(10, ("created pd %p\n", pctx->pd)); pctx->pagesize = sysconf(_SC_PAGESIZE); - pctx->qsize = pctx->opts.max_send_wr + pctx->opts.max_recv_wr; - pctx->max_msg_size = max_msg_size; return ctx; /* don't put code here */ @@ -772,8 +950,9 @@ int ibw_disconnect(struct ibw_conn *conn) return 0; } -int ibw_alloc_send_buf(struct ibw_conn *conn, void **buf, void **key) +int ibw_alloc_send_buf(struct ibw_conn *conn, void **buf, void **key, int n) { + struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv); struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv); struct ibw_wr *p = pconn->wr_list_avail; @@ -785,8 +964,18 @@ int ibw_alloc_send_buf(struct ibw_conn *conn, void **buf, void **key) DLIST_REMOVE(pconn->wr_list_avail, p); DLIST_ADD(pconn->wr_list_used, p); - *buf = (void *)p->msg; - *key = (void *)p; + if (n + sizeof(long) <= pctx->opts.avg_send_size) { + *buf = (void *)(p->msg + sizeof(long)); + *key = (void *)p; + } else { + p->msg_large = ibw_alloc_mr(pctx, pconn, n + sizeof(long), &p->mr_large); + if (!p->msg_large) { + sprintf(ibw_lasterr, "ibw_alloc_send_buf alloc error\n"); + DEBUG(0, (ibw_lasterr)); + return -1; + } + *buf = (void *)(p->msg_large + sizeof(long)); + } return 0; } @@ -797,9 +986,9 @@ int ibw_send(struct ibw_conn *conn, void *buf, void *key, int n) struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv); struct ibw_wr *p = talloc_get_type(key, struct ibw_wr); struct ibv_sge list = { - .addr = (uintptr_t) p->msg, + .addr = (uintptr_t) NULL, .length = n, - .lkey = pconn->mr->lkey + .lkey = 0 }; struct ibv_send_wr wr = { .wr_id = p->wr_id, @@ -810,8 +999,20 @@ int ibw_send(struct ibw_conn *conn, void *buf, void *key, int n) }; struct ibv_send_wr *bad_wr; - assert(p->msg==(char *)buf); - assert(n<=pctx->max_msg_size); + if (n + sizeof(uint32_t)<=pctx->opts.avg_send_size) { + assert((p->msg + sizeof(long))==(char *)buf); + list.lkey = pconn->mr_send->lkey; + list.addr = (uintptr_t) p->msg; + + *((uint32_t *)p->msg) = htonl(n); + } else { + assert((p->msg_large + sizeof(long))==(char *)buf); + assert(p->mr_large!=NULL); + list.lkey = p->mr_large->lkey; + list.addr = (uintptr_t) p->msg_large; + + *((uint32_t *)p->msg_large) = htonl(n); + } return ibv_post_send(pconn->cm_id->qp, &wr, &bad_wr); } diff --git a/ctdb/ib/ibwrapper.h b/ctdb/ib/ibwrapper.h index 31f7a4f170b..8934e68d9fb 100644 --- a/ctdb/ib/ibwrapper.h +++ b/ctdb/ib/ibwrapper.h @@ -107,8 +107,7 @@ struct ibw_ctx *ibw_init(struct ibw_initattr *attr, int nattr, void *ctx_userdata, ibw_connstate_fn_t ibw_connstate, ibw_receive_fn_t ibw_receive, - struct event_context *ectx, - int max_msg_size); + struct event_context *ectx); /* * Must be called in states of (IBWS_ERROR, IBWS_READY, IBWS_CONNECT_REQUEST) @@ -186,7 +185,7 @@ int ibw_disconnect(struct ibw_conn *conn); * * Returns 0 on success. */ -int ibw_alloc_send_buf(struct ibw_conn *conn, void **buf, void **key); +int ibw_alloc_send_buf(struct ibw_conn *conn, void **buf, void **key, int n); /* * Send the message in one diff --git a/ctdb/ib/ibwrapper_internal.h b/ctdb/ib/ibwrapper_internal.h index 04d82f9565d..6e34917755b 100644 --- a/ctdb/ib/ibwrapper_internal.h +++ b/ctdb/ib/ibwrapper_internal.h @@ -22,13 +22,20 @@ */ struct ibw_opts { - int max_send_wr; - int max_recv_wr; + uint32_t max_send_wr; + uint32_t max_recv_wr; + uint32_t avg_send_size; + uint32_t recv_bufsize; + uint32_t recv_threshold; }; struct ibw_wr { char *msg; /* initialized in ibw_init_memory once per connection */ int wr_id; /* position in wr_index list; also used as wr id */ + + char *msg_large; /* allocated specially for "large" message */ + struct ibv_mr *mr_large; + struct ibw_wr *next, *prev; /* in wr_list_avail or wr_list_used */ }; @@ -48,8 +55,13 @@ struct ibw_ctx_priv { ibw_receive_fn_t receive_func; /* see ibw_init */ long pagesize; /* sysconf result for memalign */ - int qsize; /* opts.max_send_wr + opts.max_recv_wr */ - int max_msg_size; /* see ibw_init */ +}; + +struct ibw_part { + char *buf; /* talloced memory buffer */ + uint32_t bufsize; /* allocated size of buf - always grows */ + uint32_t len; /* message part length */ + uint32_t to_read; /* 4 or *((uint32_t)buf) if len>=sizeof(uint32_t) */ }; struct ibw_conn_priv { @@ -60,10 +72,17 @@ struct ibw_conn_priv { int is_accepted; struct ibv_cq *cq; /* qp is in cm_id */ - struct ibv_mr *mr; - char *buf; /* fixed size (qsize * opts.max_msg_size) buffer for send/recv */ + + char *buf_send; /* max_send_wr * avg_send_size */ + struct ibv_mr *mr_send; struct ibw_wr *wr_list_avail; struct ibw_wr *wr_list_used; struct ibw_wr **wr_index; /* array[0..(qsize-1)] of (ibw_wr *) */ + + /* buf_recv is a ring buffer */ + char *buf_recv; /* max_recv_wr * avg_recv_size */ + struct ibv_mr *mr_recv; + int recv_index; /* index of the next recv buffer when refilling */ + struct ibw_part part; }; |