summary | refs | log | tree | commit | diff | stats
path: root/ctdb/ib
diff options
context:
space:
mode:
Diffstat (limited to 'ctdb/ib')
-rw-r--r-- ctdb/ib/ibwrapper.c 397
-rw-r--r-- ctdb/ib/ibwrapper.h 5
-rw-r--r-- ctdb/ib/ibwrapper_internal.h 31
3 files changed, 326 insertions, 107 deletions
diff --git a/ctdb/ib/ibwrapper.c b/ctdb/ib/ibwrapper.c
index b70b6caad6b..db6e303638b 100644
--- a/ctdb/ib/ibwrapper.c
+++ b/ctdb/ib/ibwrapper.c
@@ -49,7 +49,40 @@ static char ibw_lasterr[IBW_LASTERR_BUFSIZE];
static void ibw_event_handler_verbs(struct event_context *ev,
struct fd_event *fde, uint16_t flags, void *private_data);
static int ibw_fill_cq(struct ibw_conn *conn);
+static inline int ibw_wc_recv(struct ibw_conn *conn, struct ibv_wc *wc);
+static inline int ibw_wc_send(struct ibw_conn *conn, struct ibv_wc *wc);
+/*
+ * Allocate a buffer of n bytes aligned to pctx->pagesize and register it
+ * as a memory region on pctx->pd with local write access.
+ *
+ * On success the buffer is returned and *ppmr holds the registered region.
+ * On failure NULL is returned and ibw_lasterr describes the problem; when
+ * only the registration failed the buffer is released again.
+ * pconn is not used by this function.
+ */
+static void *ibw_alloc_mr(struct ibw_ctx_priv *pctx, struct ibw_conn_priv *pconn,
+	int n, struct ibv_mr **ppmr)
+{
+	void *region = memalign(pctx->pagesize, n);
+
+	if (region == NULL) {
+		sprintf(ibw_lasterr, "couldn't allocate memory\n");
+		return NULL;
+	}
+
+	*ppmr = ibv_reg_mr(pctx->pd, region, n, IBV_ACCESS_LOCAL_WRITE);
+	if (*ppmr == NULL) {
+		sprintf(ibw_lasterr, "couldn't allocate mr\n");
+		free(region); /* memalign-ed memory is released with free() */
+		return NULL;
+	}
+
+	return region;
+}
+
+/*
+ * Undo ibw_alloc_mr(): deregister the memory region in *ppmr and free the
+ * buffer in *ppbuf. Either one may already be NULL, in which case it is
+ * skipped. Both pointers are reset to NULL once released.
+ */
+static void ibw_free_mr(char **ppbuf, struct ibv_mr **ppmr)
+{
+	struct ibv_mr *region = *ppmr;
+	char *mem;
+
+	/* the region is deregistered before its backing memory is freed */
+	if (region != NULL) {
+		ibv_dereg_mr(region);
+		*ppmr = NULL;
+	}
+
+	mem = *ppbuf;
+	if (mem != NULL) {
+		free(mem);
+		*ppbuf = NULL;
+	}
+}
static int ibw_init_memory(struct ibw_conn *conn)
{
@@ -59,23 +92,26 @@ static int ibw_init_memory(struct ibw_conn *conn)
int i;
struct ibw_wr *p;
- pconn->buf = memalign(pctx->pagesize, pctx->max_msg_size);
- if (!pconn->buf) {
- sprintf(ibw_lasterr, "couldn't allocate work buf\n");
+ pconn->buf_send = ibw_alloc_mr(pctx, pconn,
+ pctx->opts.max_send_wr * pctx->opts.avg_send_size, &pconn->mr_send);
+ if (!pconn->buf_send) {
+ sprintf(ibw_lasterr, "couldn't allocate work send buf\n");
return -1;
}
- pconn->mr = ibv_reg_mr(pctx->pd, pconn->buf,
- pctx->qsize * pctx->max_msg_size, IBV_ACCESS_LOCAL_WRITE);
- if (!pconn->mr) {
- sprintf(ibw_lasterr, "couldn't allocate mr\n");
+
+ pconn->buf_recv = ibw_alloc_mr(pctx, pconn,
+ pctx->opts.max_recv_wr * pctx->opts.recv_bufsize, &pconn->mr_recv);
+ if (!pconn->buf_recv) {
+ sprintf(ibw_lasterr, "couldn't allocate work recv buf\n");
return -1;
}
- pconn->wr_index = talloc_size(pconn, pctx->qsize * sizeof(struct ibw_wr *));
+ pconn->wr_index = talloc_size(pconn, pctx->opts.max_send_wr * sizeof(struct ibw_wr *));
+ assert(pconn->wr_index!=NULL);
- for(i=0; i<pctx->qsize; i++) {
+ for(i=0; i<pctx->opts.max_send_wr; i++) {
p = pconn->wr_index[i] = talloc_zero(pconn, struct ibw_wr);
- p->msg = pconn->buf + (i * pctx->max_msg_size);
+ p->msg = pconn->buf_send + (i * pctx->opts.avg_send_size);
p->wr_id = i;
DLIST_ADD(pconn->wr_list_avail, p);
@@ -117,14 +153,8 @@ static int ibw_ctx_destruct(struct ibw_ctx *ctx)
static int ibw_conn_priv_destruct(struct ibw_conn_priv *pconn)
{
/* free memory regions */
- if (pconn->mr) {
- ibv_dereg_mr(pconn->mr);
- pconn->mr = NULL;
- }
- if (pconn->buf) {
- free(pconn->buf); /* memalign-ed */
- pconn->buf = NULL;
- }
+ ibw_free_mr(&pconn->buf_send, &pconn->mr_send);
+ ibw_free_mr(&pconn->buf_recv, &pconn->mr_recv);
/* pconn->wr_index is freed by talloc */
/* pconn->wr_index[i] are freed by talloc */
@@ -204,7 +234,8 @@ static int ibw_setup_cq_qp(struct ibw_conn *conn)
pconn->verbs_channel->fd, EVENT_FD_READ, ibw_event_handler_verbs, conn);
/* init cq */
- pconn->cq = ibv_create_cq(pconn->cm_id->verbs, pctx->qsize,
+ pconn->cq = ibv_create_cq(pconn->cm_id->verbs,
+ pctx->opts.max_recv_wr + pctx->opts.max_send_wr,
conn, pconn->verbs_channel, 0);
if (pconn->cq==NULL) {
sprintf(ibw_lasterr, "ibv_create_cq failed\n");
@@ -244,8 +275,8 @@ static int ibw_refill_cq_recv(struct ibw_conn *conn)
int rc;
struct ibv_sge list = {
.addr = (uintptr_t) NULL,
- .length = pctx->max_msg_size,
- .lkey = pconn->mr->lkey
+ .length = pctx->opts.recv_bufsize,
+ .lkey = pconn->mr_recv->lkey
};
struct ibv_recv_wr wr = {
.wr_id = 0,
@@ -253,17 +284,10 @@ static int ibw_refill_cq_recv(struct ibw_conn *conn)
.num_sge = 1,
};
struct ibv_recv_wr *bad_wr;
- struct ibw_wr *p = pconn->wr_list_avail;
- if (p==NULL) {
- sprintf(ibw_lasterr, "out of wr_list_avail");
- DEBUG(0, (ibw_lasterr));
- return -1;
- }
- DLIST_REMOVE(pconn->wr_list_avail, p);
- DLIST_ADD(pconn->wr_list_used, p);
- list.addr = (uintptr_t) p->msg;
- wr.wr_id = p->wr_id;
+ list.addr = (uintptr_t) pconn->buf_recv + pctx->opts.recv_bufsize * pconn->recv_index;
+ wr.wr_id = pctx->opts.max_send_wr + pconn->recv_index;
+ pconn->recv_index = (pconn->recv_index + 1) % pctx->opts.max_recv_wr;
rc = ibv_post_recv(pconn->cm_id->qp, &wr, &bad_wr);
if (rc) {
@@ -282,8 +306,8 @@ static int ibw_fill_cq(struct ibw_conn *conn)
int i, rc;
struct ibv_sge list = {
.addr = (uintptr_t) NULL,
- .length = pctx->max_msg_size,
- .lkey = pconn->mr->lkey
+ .length = pctx->opts.recv_bufsize,
+ .lkey = pconn->mr_recv->lkey
};
struct ibv_recv_wr wr = {
.wr_id = 0,
@@ -291,19 +315,11 @@ static int ibw_fill_cq(struct ibw_conn *conn)
.num_sge = 1,
};
struct ibv_recv_wr *bad_wr;
- struct ibw_wr *p;
for(i = pctx->opts.max_recv_wr; i!=0; i--) {
- p = pconn->wr_list_avail;
- if (p==NULL) {
- sprintf(ibw_lasterr, "out of wr_list_avail");
- DEBUG(0, (ibw_lasterr));
- return -1;
- }
- DLIST_REMOVE(pconn->wr_list_avail, p);
- DLIST_ADD(pconn->wr_list_used, p);
- list.addr = (uintptr_t) p->msg;
- wr.wr_id = p->wr_id;
+ list.addr = (uintptr_t) pconn->buf_recv + pctx->opts.recv_bufsize * pconn->recv_index;
+ wr.wr_id = pctx->opts.max_send_wr + pconn->recv_index;
+ pconn->recv_index = (pconn->recv_index + 1) % pctx->opts.max_recv_wr;
rc = ibv_post_recv(pconn->cm_id->qp, &wr, &bad_wr);
if (rc) {
@@ -489,62 +505,61 @@ static void ibw_event_handler_verbs(struct event_context *ev,
struct ibv_wc wc;
int rc;
+ struct ibv_cq *ev_cq;
+ void *ev_ctx;
- rc = ibv_poll_cq(pconn->cq, 1, &wc);
- if (rc!=1) {
- sprintf(ibw_lasterr, "ibv_poll_cq error %d\n", rc);
+ /* TODO: check whether if it's good to have more channels here... */
+ rc = ibv_get_cq_event(pconn->verbs_channel, &ev_cq, &ev_ctx);
+ if (rc) {
+ sprintf(ibw_lasterr, "Failed to get cq_event with %d\n", rc);
goto error;
}
- if (wc.status) {
- sprintf(ibw_lasterr, "cq completion failed status %d\n",
- wc.status);
+ if (ev_cq != pconn->cq) {
+ sprintf(ibw_lasterr, "ev_cq(%u) != pconn->cq(%u)\n",
+ (unsigned int)ev_cq, (unsigned int)pconn->cq);
+ goto error;
+ }
+ rc = ibv_req_notify_cq(pconn->cq, 0);
+ if (rc) {
+ sprintf(ibw_lasterr, "Couldn't request CQ notification (%d)\n", rc);
goto error;
}
- switch(wc.opcode) {
- case IBV_WC_SEND:
- {
- struct ibw_wr *p;
-
- DEBUG(10, ("send completion\n"));
- assert(pconn->cm_id->qp->qp_num==wc.qp_num);
- assert(wc.wr_id < pctx->qsize);
- p = pconn->wr_index[wc.wr_id];
- DLIST_REMOVE(pconn->wr_list_used, p);
- DLIST_ADD(pconn->wr_list_avail, p);
+ while((rc=ibv_poll_cq(pconn->cq, 1, &wc))==1) {
+ if (wc.status) {
+ sprintf(ibw_lasterr, "cq completion failed status %d\n",
+ wc.status);
+ goto error;
}
- break;
-
- case IBV_WC_RDMA_WRITE:
- DEBUG(10, ("rdma write completion\n"));
- break;
- case IBV_WC_RDMA_READ:
- DEBUG(10, ("rdma read completion\n"));
- break;
+ switch(wc.opcode) {
+ case IBV_WC_SEND:
+ DEBUG(10, ("send completion\n"));
+ if (ibw_wc_send(conn, &wc))
+ goto error;
+ break;
- case IBV_WC_RECV:
- {
- struct ibw_wr *p;
-
- assert(pconn->cm_id->qp->qp_num==wc.qp_num);
- assert(wc.wr_id < pctx->qsize);
- p = pconn->wr_index[wc.wr_id];
-
- DLIST_REMOVE(pconn->wr_list_used, p);
- DLIST_ADD(pconn->wr_list_avail, p);
+ case IBV_WC_RDMA_WRITE:
+ DEBUG(10, ("rdma write completion\n"));
+ break;
+ case IBV_WC_RDMA_READ:
+ DEBUG(10, ("rdma read completion\n"));
+ break;
+
+ case IBV_WC_RECV:
DEBUG(10, ("recv completion\n"));
- assert(wc.byte_len <= pctx->max_msg_size);
-
- pctx->receive_func(conn, p->msg, wc.byte_len);
- if (ibw_refill_cq_recv(conn))
+ if (ibw_wc_recv(conn, &wc))
goto error;
- }
- break;
+ break;
- default:
- sprintf(ibw_lasterr, "unknown completion %d\n", wc.opcode);
+ default:
+ sprintf(ibw_lasterr, "unknown completion %d\n", wc.opcode);
+ goto error;
+ }
+ }
+ if (rc!=0) {
+ sprintf(ibw_lasterr, "ibv_poll_cq error %d\n", rc);
goto error;
}
@@ -555,6 +570,163 @@ error:
pctx->connstate_func(NULL, conn);
}
+/*
+ * Handle a send completion: look up the work request named by wc->wr_id,
+ * release its dedicated "large" buffer (if one was attached) and move the
+ * request from the used list back to the available list.
+ *
+ * Always returns 0.
+ */
+static inline int ibw_wc_send(struct ibw_conn *conn, struct ibv_wc *wc)
+{
+	struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
+	struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
+	struct ibw_wr *done;
+
+	assert(pconn->cm_id->qp->qp_num==wc->qp_num);
+	assert(wc->wr_id < pctx->opts.max_send_wr);
+
+	/* wr_id doubles as the index into wr_index */
+	done = pconn->wr_index[wc->wr_id];
+
+	if (done->msg_large != NULL)
+		ibw_free_mr(&done->msg_large, &done->mr_large);
+
+	DLIST_REMOVE(pconn->wr_list_used, done);
+	DLIST_ADD(pconn->wr_list_avail, done);
+
+	return 0;
+}
+
+/*
+ * Append add_len bytes from *pp to the partial-message buffer in part.
+ *
+ * The buffer is allocated on first use and enlarged when
+ * part->len + add_len exceeds part->bufsize. On success *pp is advanced
+ * past the consumed bytes, part->len grows by add_len and part->to_read
+ * shrinks by add_len; the caller must make sure add_len <= part->to_read.
+ *
+ * memctx is the talloc parent for the buffer, info is a tag that only
+ * appears in error messages.
+ *
+ * Returns 0 on success, -1 (with ibw_lasterr set) when memory runs out.
+ * A failed enlargement leaves part->buf and the data collected so far
+ * untouched.
+ */
+static inline int ibw_append_to_part(void *memctx, struct ibw_part *part,
+	char **pp, uint32_t add_len, int info)
+{
+	/* allocate more if necessary - it's an "evergrowing" buffer... */
+	if (part->len + add_len > part->bufsize) {
+		if (part->buf==NULL) {
+			assert(part->len==0);
+			part->buf = talloc_size(memctx, add_len);
+			if (part->buf==NULL) {
+				sprintf(ibw_lasterr, "recv talloc_size error (%u) #%d\n",
+					add_len, info);
+				return -1;
+			}
+		} else {
+			/*
+			 * keep the old pointer until the realloc succeeded, so
+			 * a failure does not lose the buffer
+			 */
+			char *grown = talloc_realloc_size(memctx,
+				part->buf, part->len + add_len);
+			if (grown==NULL) {
+				sprintf(ibw_lasterr, "recv realloc error (%u + %u) #%d\n",
+					part->len, add_len, info);
+				return -1;
+			}
+			part->buf = grown;
+		}
+		part->bufsize = part->len + add_len;
+	}
+
+	/* consume pp */
+	memcpy(part->buf + part->len, *pp, add_len);
+	*pp += add_len;
+	part->len += add_len;
+	part->to_read -= add_len;
+
+	return 0;
+}
+
+/*
+ * Cut the partial-message buffer back once it has grown beyond threshold:
+ * the old buffer is freed and a fresh one of exactly threshold bytes is
+ * allocated under memctx. The old contents are not preserved.
+ *
+ * Returns 0 when nothing had to be done or the replacement succeeded,
+ * -1 (with ibw_lasterr set) when the new allocation failed.
+ */
+static inline int ibw_wc_mem_threshold(void *memctx, struct ibw_part *part, uint32_t threshold)
+{
+	if (part->bufsize <= threshold)
+		return 0;
+
+	talloc_free(part->buf);
+	part->buf = talloc_size(memctx, threshold);
+	if (!part->buf) {
+		sprintf(ibw_lasterr, "talloc_size failed\n");
+		return -1;
+	}
+
+	part->bufsize = threshold;
+	return 0;
+}
+
+/*
+ * Handle one receive completion: split the bytes of the completed receive
+ * buffer into length-prefixed messages, hand every complete message to
+ * pctx->receive_func and keep an unfinished tail in pconn->part until later
+ * completions supply the rest. Afterwards a receive buffer is posted again
+ * through ibw_refill_cq_recv().
+ *
+ * Each message starts with a uint32_t length that counts the prefix itself,
+ * so values below sizeof(uint32_t) are rejected.
+ * NOTE(review): the prefix is read in host byte order here, while ibw_send()
+ * stores it with htonl() - confirm the intended wire format.
+ *
+ * Returns 0 on success; on failure logs ibw_lasterr, sets conn->state to
+ * IBWC_ERROR and returns -1.
+ */
+static inline int ibw_wc_recv(struct ibw_conn *conn, struct ibv_wc *wc)
+{
+	struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
+	struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
+	int recv_index;
+	char *p;
+	uint32_t remain;
+	struct ibw_part *part;
+
+	assert(pconn->cm_id->qp->qp_num==wc->qp_num);
+	/*
+	 * receive wr ids are posted as max_send_wr + recv_index with
+	 * recv_index starting at 0, so wr_id == max_send_wr is valid
+	 */
+	assert((int)wc->wr_id >= pctx->opts.max_send_wr);
+	recv_index = (int)wc->wr_id - pctx->opts.max_send_wr;
+	assert(recv_index < pctx->opts.max_recv_wr);
+	assert(wc->byte_len <= pctx->opts.recv_bufsize);
+
+	/* start of the slot inside the receive ring this completion refers to */
+	p = pconn->buf_recv + (recv_index * pctx->opts.recv_bufsize);
+	part = &pconn->part;
+
+	remain = wc->byte_len;
+	while(remain) {
+		/* here always true: (part->len!=0 && part->to_read!=0) ||
+			(part->len==0 && part->to_read==0) */
+		if (part->len) { /* is there a partial msg to be continued? */
+			int read_len = (part->to_read<=remain) ? part->to_read : remain;
+			if (ibw_append_to_part(pconn, part, &p, read_len, 421))
+				goto error;
+			remain -= read_len;
+
+			/* the length prefix itself has just been completed */
+			if (part->len<=sizeof(uint32_t) && part->to_read==0) {
+				assert(part->len==sizeof(uint32_t));
+				/* set it again now... */
+				part->to_read = *((uint32_t *)(part->buf));
+				if (part->to_read<sizeof(uint32_t)) {
+					sprintf(ibw_lasterr, "got msglen=%u #2\n", part->to_read);
+					goto error;
+				}
+				part->to_read -= sizeof(uint32_t); /* it's already read */
+			}
+
+			if (part->to_read==0) {
+				pctx->receive_func(conn, part->buf, part->len);
+				part->len = 0; /* tells not having partial data (any more) */
+				if (ibw_wc_mem_threshold(pconn, part, pctx->opts.recv_threshold))
+					goto error;
+			}
+		} else {
+			if (remain>=sizeof(uint32_t)) {
+				uint32_t msglen = *(uint32_t *)p;
+				if (msglen<sizeof(uint32_t)) {
+					sprintf(ibw_lasterr, "got msglen=%u\n", msglen);
+					goto error;
+				}
+
+				/* mostly awaited case: */
+				if (msglen<=remain) {
+					/* whole message is in this buffer: deliver without copying */
+					pctx->receive_func(conn, p, msglen);
+					p += msglen;
+					remain -= msglen;
+				} else {
+					part->to_read = msglen;
+					/* part->len is already 0 */
+					if (ibw_append_to_part(pconn, part, &p, remain, 422))
+						goto error;
+					remain = 0; /* to be continued ... */
+					/* part->to_read > 0 here */
+				}
+			} else { /* edge case: */
+				/* not even the length prefix is complete yet */
+				part->to_read = sizeof(uint32_t);
+				/* part->len is already 0 */
+				if (ibw_append_to_part(pconn, part, &p, remain, 423))
+					goto error;
+				remain = 0;
+				/* part->to_read > 0 here */
+			}
+		}
+	} /* <remain> is always decreased at least by 1 */
+
+	if (ibw_refill_cq_recv(conn))
+		goto error;
+
+	return 0;
+
+error:
+	DEBUG(0, ("ibw_wc_recv error: %s", ibw_lasterr));
+	conn->state = IBWC_ERROR;
+	return -1;
+}
+
static int ibw_process_init_attrs(struct ibw_initattr *attr, int nattr, struct ibw_opts *opts)
{
int i;
@@ -562,6 +734,9 @@ static int ibw_process_init_attrs(struct ibw_initattr *attr, int nattr, struct i
opts->max_send_wr = 256;
opts->max_recv_wr = 1024;
+ opts->avg_send_size = 1024;
+ opts->recv_bufsize = 256;
+ opts->recv_threshold = 1 * 1024 * 1024;
for(i=0; i<nattr; i++) {
name = attr[i].name;
@@ -572,6 +747,12 @@ static int ibw_process_init_attrs(struct ibw_initattr *attr, int nattr, struct i
opts->max_send_wr = atoi(value);
else if (strcmp(name, "max_recv_wr")==0)
opts->max_recv_wr = atoi(value);
+ else if (strcmp(name, "avg_send_size")==0)
+ opts->avg_send_size = atoi(value);
+ else if (strcmp(name, "recv_bufsize")==0)
+ opts->recv_bufsize = atoi(value);
+ else if (strcmp(name, "recv_threshold")==0)
+ opts->recv_threshold = atoi(value);
else {
sprintf(ibw_lasterr, "ibw_init: unknown name %s\n", name);
return -1;
@@ -584,8 +765,7 @@ struct ibw_ctx *ibw_init(struct ibw_initattr *attr, int nattr,
void *ctx_userdata,
ibw_connstate_fn_t ibw_connstate,
ibw_receive_fn_t ibw_receive,
- struct event_context *ectx,
- int max_msg_size)
+ struct event_context *ectx)
{
struct ibw_ctx *ctx = talloc_zero(NULL, struct ibw_ctx);
struct ibw_ctx_priv *pctx;
@@ -640,8 +820,6 @@ struct ibw_ctx *ibw_init(struct ibw_initattr *attr, int nattr,
DEBUG(10, ("created pd %p\n", pctx->pd));
pctx->pagesize = sysconf(_SC_PAGESIZE);
- pctx->qsize = pctx->opts.max_send_wr + pctx->opts.max_recv_wr;
- pctx->max_msg_size = max_msg_size;
return ctx;
/* don't put code here */
@@ -772,8 +950,9 @@ int ibw_disconnect(struct ibw_conn *conn)
return 0;
}
-int ibw_alloc_send_buf(struct ibw_conn *conn, void **buf, void **key)
+int ibw_alloc_send_buf(struct ibw_conn *conn, void **buf, void **key, int n)
{
+ struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
struct ibw_wr *p = pconn->wr_list_avail;
@@ -785,8 +964,18 @@ int ibw_alloc_send_buf(struct ibw_conn *conn, void **buf, void **key)
DLIST_REMOVE(pconn->wr_list_avail, p);
DLIST_ADD(pconn->wr_list_used, p);
- *buf = (void *)p->msg;
- *key = (void *)p;
+ if (n + sizeof(long) <= pctx->opts.avg_send_size) {
+ *buf = (void *)(p->msg + sizeof(long));
+ *key = (void *)p;
+ } else {
+ p->msg_large = ibw_alloc_mr(pctx, pconn, n + sizeof(long), &p->mr_large);
+ if (!p->msg_large) {
+ sprintf(ibw_lasterr, "ibw_alloc_send_buf alloc error\n");
+ DEBUG(0, (ibw_lasterr));
+ return -1;
+ }
+ *buf = (void *)(p->msg_large + sizeof(long));
+ }
return 0;
}
@@ -797,9 +986,9 @@ int ibw_send(struct ibw_conn *conn, void *buf, void *key, int n)
struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
struct ibw_wr *p = talloc_get_type(key, struct ibw_wr);
struct ibv_sge list = {
- .addr = (uintptr_t) p->msg,
+ .addr = (uintptr_t) NULL,
.length = n,
- .lkey = pconn->mr->lkey
+ .lkey = 0
};
struct ibv_send_wr wr = {
.wr_id = p->wr_id,
@@ -810,8 +999,20 @@ int ibw_send(struct ibw_conn *conn, void *buf, void *key, int n)
};
struct ibv_send_wr *bad_wr;
- assert(p->msg==(char *)buf);
- assert(n<=pctx->max_msg_size);
+ if (n + sizeof(uint32_t)<=pctx->opts.avg_send_size) {
+ assert((p->msg + sizeof(long))==(char *)buf);
+ list.lkey = pconn->mr_send->lkey;
+ list.addr = (uintptr_t) p->msg;
+
+ *((uint32_t *)p->msg) = htonl(n);
+ } else {
+ assert((p->msg_large + sizeof(long))==(char *)buf);
+ assert(p->mr_large!=NULL);
+ list.lkey = p->mr_large->lkey;
+ list.addr = (uintptr_t) p->msg_large;
+
+ *((uint32_t *)p->msg_large) = htonl(n);
+ }
return ibv_post_send(pconn->cm_id->qp, &wr, &bad_wr);
}
diff --git a/ctdb/ib/ibwrapper.h b/ctdb/ib/ibwrapper.h
index 31f7a4f170b..8934e68d9fb 100644
--- a/ctdb/ib/ibwrapper.h
+++ b/ctdb/ib/ibwrapper.h
@@ -107,8 +107,7 @@ struct ibw_ctx *ibw_init(struct ibw_initattr *attr, int nattr,
void *ctx_userdata,
ibw_connstate_fn_t ibw_connstate,
ibw_receive_fn_t ibw_receive,
- struct event_context *ectx,
- int max_msg_size);
+ struct event_context *ectx);
/*
* Must be called in states of (IBWS_ERROR, IBWS_READY, IBWS_CONNECT_REQUEST)
@@ -186,7 +185,7 @@ int ibw_disconnect(struct ibw_conn *conn);
*
* Returns 0 on success.
*/
-int ibw_alloc_send_buf(struct ibw_conn *conn, void **buf, void **key);
+int ibw_alloc_send_buf(struct ibw_conn *conn, void **buf, void **key, int n);
/*
* Send the message in one
diff --git a/ctdb/ib/ibwrapper_internal.h b/ctdb/ib/ibwrapper_internal.h
index 04d82f9565d..6e34917755b 100644
--- a/ctdb/ib/ibwrapper_internal.h
+++ b/ctdb/ib/ibwrapper_internal.h
@@ -22,13 +22,20 @@
*/
struct ibw_opts { /* tunables filled in by ibw_process_init_attrs */
- int max_send_wr;
- int max_recv_wr;
+ uint32_t max_send_wr; /* number of send work requests per connection (default 256) */
+ uint32_t max_recv_wr; /* number of receive buffers per connection (default 1024) */
+ uint32_t avg_send_size; /* size of each preallocated send buffer (default 1024) */
+ uint32_t recv_bufsize; /* size of each receive buffer (default 256) */
+ uint32_t recv_threshold; /* partial-message buffer is cut back to this size once it grew larger (default 1 MiB) */
};
struct ibw_wr { /* one send work request slot of a connection */
char *msg; /* slot inside buf_send; initialized in ibw_init_memory once per connection */
int wr_id; /* position in wr_index list; also used as wr id */
+
+ char *msg_large; /* separate buffer allocated by ibw_alloc_send_buf when the message does not fit into msg; released in ibw_wc_send */
+ struct ibv_mr *mr_large; /* memory region registered for msg_large */
+
struct ibw_wr *next, *prev; /* in wr_list_avail or wr_list_used */
};
@@ -48,8 +55,13 @@ struct ibw_ctx_priv {
ibw_receive_fn_t receive_func; /* see ibw_init */
long pagesize; /* sysconf result for memalign */
- int qsize; /* opts.max_send_wr + opts.max_recv_wr */
- int max_msg_size; /* see ibw_init */
+};
+
+struct ibw_part { /* reassembly state of a message received in several pieces (see ibw_wc_recv) */
+ char *buf; /* talloced memory buffer */
+ uint32_t bufsize; /* allocated size of buf - grown by ibw_append_to_part, cut back by ibw_wc_mem_threshold */
+ uint32_t len; /* number of bytes collected in buf so far; 0 means no partial message pending */
+ uint32_t to_read; /* bytes still missing: rest of the 4-byte length prefix first, then rest of the message */
};
struct ibw_conn_priv {
@@ -60,10 +72,17 @@ struct ibw_conn_priv {
int is_accepted;
struct ibv_cq *cq; /* qp is in cm_id */
- struct ibv_mr *mr;
- char *buf; /* fixed size (qsize * opts.max_msg_size) buffer for send/recv */
+
+ char *buf_send; /* max_send_wr * avg_send_size */
+ struct ibv_mr *mr_send;
struct ibw_wr *wr_list_avail;
struct ibw_wr *wr_list_used;
struct ibw_wr **wr_index; /* array[0..(qsize-1)] of (ibw_wr *) */
+
+ /* buf_recv is a ring buffer */
+ char *buf_recv; /* max_recv_wr * avg_recv_size */
+ struct ibv_mr *mr_recv;
+ int recv_index; /* index of the next recv buffer when refilling */
+ struct ibw_part part;
};