summaryrefslogtreecommitdiffstats
path: root/ctdb/ib
diff options
context:
space:
mode:
authorPeter Somogyi <psomogyi@gamax.hu>2007-02-08 19:06:14 +0100
committerPeter Somogyi <psomogyi@gamax.hu>2007-02-08 19:06:14 +0100
commitcae71b84d660a52d8c3a2fef346a002c2523a78c (patch)
tree7dcda02e19f4a784a335d821ab6572293756494f /ctdb/ib
parent3222a41b9b94f1dfca54e6d07040da75016d89f1 (diff)
downloadsamba-cae71b84d660a52d8c3a2fef346a002c2523a78c.tar.gz
samba-cae71b84d660a52d8c3a2fef346a002c2523a78c.tar.xz
samba-cae71b84d660a52d8c3a2fef346a002c2523a78c.zip
ib: fragment sent buf + many bugfixes
It came to light that I have to fragment the send buffer so that each fragment fits into the receiver's (destination's) buffers. Additionally, fixed many bugs. Still testing. TODO: clean up the code. (This used to be ctdb commit 2f8876f09bc92169487cb077326579044560a121)
Diffstat (limited to 'ctdb/ib')
-rw-r--r--ctdb/ib/ibwrapper.c241
-rw-r--r--ctdb/ib/ibwrapper_internal.h40
2 files changed, 198 insertions, 83 deletions
diff --git a/ctdb/ib/ibwrapper.c b/ctdb/ib/ibwrapper.c
index 84dcd088831..2f8dd256f91 100644
--- a/ctdb/ib/ibwrapper.c
+++ b/ctdb/ib/ibwrapper.c
@@ -39,6 +39,7 @@
#include "ibwrapper.h"
#include <rdma/rdma_cma.h>
+#include "infiniband/sa-kern-abi.h"
#include "ibwrapper_internal.h"
#include "lib/util/dlinklist.h"
@@ -46,11 +47,17 @@
#define IBW_LASTERR_BUFSIZE 512
static char ibw_lasterr[IBW_LASTERR_BUFSIZE];
+#define IBW_MAX_SEND_WR 256
+#define IBW_MAX_RECV_WR 1024
+#define IBW_RECV_BUFSIZE 256
+#define IBW_RECV_THRESHOLD (1 * 1024 * 1024)
+
static void ibw_event_handler_verbs(struct event_context *ev,
struct fd_event *fde, uint16_t flags, void *private_data);
static int ibw_fill_cq(struct ibw_conn *conn);
-static inline int ibw_wc_recv(struct ibw_conn *conn, struct ibv_wc *wc);
-static inline int ibw_wc_send(struct ibw_conn *conn, struct ibv_wc *wc);
+static int ibw_wc_recv(struct ibw_conn *conn, struct ibv_wc *wc);
+static int ibw_wc_send(struct ibw_conn *conn, struct ibv_wc *wc);
+static int ibw_send_packet(struct ibw_conn *conn, void *buf, struct ibw_wr *p, uint32_t len);
static void *ibw_alloc_mr(struct ibw_ctx_priv *pctx, struct ibw_conn_priv *pconn,
uint32_t n, struct ibv_mr **ppmr)
@@ -97,7 +104,7 @@ static int ibw_init_memory(struct ibw_conn *conn)
DEBUG(10, ("ibw_init_memory(cmid: %p)\n", pconn->cm_id));
pconn->buf_send = ibw_alloc_mr(pctx, pconn,
- opts->max_send_wr * opts->avg_send_size, &pconn->mr_send);
+ opts->max_send_wr * opts->recv_bufsize, &pconn->mr_send);
if (!pconn->buf_send) {
sprintf(ibw_lasterr, "couldn't allocate work send buf\n");
return -1;
@@ -115,7 +122,7 @@ static int ibw_init_memory(struct ibw_conn *conn)
for(i=0; i<opts->max_send_wr; i++) {
p = pconn->wr_index[i] = talloc_zero(pconn, struct ibw_wr);
- p->msg = pconn->buf_send + (i * opts->avg_send_size);
+ p->buf = pconn->buf_send + (i * opts->recv_bufsize);
p->wr_id = i;
DLIST_ADD(pconn->wr_list_avail, p);
@@ -228,6 +235,7 @@ static int ibw_setup_cq_qp(struct ibw_conn *conn)
struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
struct ibv_qp_init_attr init_attr;
+ struct ibv_qp_attr attr;
int rc;
DEBUG(10, ("ibw_setup_cq_qp(cmid: %p)\n", pconn->cm_id));
@@ -286,6 +294,12 @@ static int ibw_setup_cq_qp(struct ibw_conn *conn)
}
/* elase result is in pconn->cm_id->qp */
+ rc = ibv_query_qp(pconn->cm_id->qp, &attr, IBV_QP_PATH_MTU, &init_attr);
+ if (rc) {
+ sprintf(ibw_lasterr, "ibv_query_qp failed with %d\n", rc);
+ return rc;
+ }
+
return ibw_fill_cq(conn);
}
@@ -295,12 +309,12 @@ static int ibw_refill_cq_recv(struct ibw_conn *conn)
struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
int rc;
struct ibv_sge list = {
- .addr = (uintptr_t) NULL,
+ .addr = (uintptr_t) NULL, /* filled below */
.length = pctx->opts.recv_bufsize,
- .lkey = pconn->mr_recv->lkey
+ .lkey = pconn->mr_recv->lkey /* always the same */
};
struct ibv_recv_wr wr = {
- .wr_id = 0,
+ .wr_id = 0, /* filled below */
.sg_list = &list,
.num_sge = 1,
};
@@ -314,7 +328,7 @@ static int ibw_refill_cq_recv(struct ibw_conn *conn)
rc = ibv_post_recv(pconn->cm_id->qp, &wr, &bad_wr);
if (rc) {
- sprintf(ibw_lasterr, "ibv_post_recv failed with %d\n", rc);
+ sprintf(ibw_lasterr, "refill/ibv_post_recv failed with %d\n", rc);
DEBUG(0, (ibw_lasterr));
return -2;
}
@@ -328,12 +342,12 @@ static int ibw_fill_cq(struct ibw_conn *conn)
struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
int i, rc;
struct ibv_sge list = {
- .addr = (uintptr_t) NULL,
+ .addr = (uintptr_t) NULL, /* filled below */
.length = pctx->opts.recv_bufsize,
- .lkey = pconn->mr_recv->lkey
+ .lkey = pconn->mr_recv->lkey /* always the same */
};
struct ibv_recv_wr wr = {
- .wr_id = 0,
+ .wr_id = 0, /* filled below */
.sg_list = &list,
.num_sge = 1,
};
@@ -348,7 +362,7 @@ static int ibw_fill_cq(struct ibw_conn *conn)
rc = ibv_post_recv(pconn->cm_id->qp, &wr, &bad_wr);
if (rc) {
- sprintf(ibw_lasterr, "ibv_post_recv failed with %d\n", rc);
+ sprintf(ibw_lasterr, "fill/ibv_post_recv failed with %d\n", rc);
DEBUG(0, (ibw_lasterr));
return -2;
}
@@ -532,8 +546,7 @@ static void ibw_event_handler_verbs(struct event_context *ev,
goto error;
}
if (ev_cq != pconn->cq) {
- sprintf(ibw_lasterr, "ev_cq(%u) != pconn->cq(%u)\n",
- (unsigned int)ev_cq, (unsigned int)pconn->cq);
+ sprintf(ibw_lasterr, "ev_cq(%p) != pconn->cq(%p)\n", ev_cq, pconn->cq);
goto error;
}
rc = ibv_req_notify_cq(pconn->cq, 0);
@@ -544,8 +557,8 @@ static void ibw_event_handler_verbs(struct event_context *ev,
while((rc=ibv_poll_cq(pconn->cq, 1, &wc))==1) {
if (wc.status) {
- sprintf(ibw_lasterr, "cq completion failed status %d\n",
- wc.status);
+ sprintf(ibw_lasterr, "cq completion failed status %d rc %d\n",
+ wc.status, rc);
goto error;
}
@@ -587,12 +600,13 @@ error:
pctx->connstate_func(NULL, conn);
}
-static inline int ibw_wc_send(struct ibw_conn *conn, struct ibv_wc *wc)
+static int ibw_wc_send(struct ibw_conn *conn, struct ibv_wc *wc)
{
struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
struct ibw_wr *p;
int send_index;
+ int rc = 0;
DEBUG(10, ("ibw_wc_send(cmid: %p, wr_id: %u, bl: %u)\n",
pconn->cm_id, (uint32_t)wc->wr_id, (uint32_t)wc->byte_len));
@@ -605,10 +619,19 @@ static inline int ibw_wc_send(struct ibw_conn *conn, struct ibv_wc *wc)
if (send_index < pctx->opts.max_send_wr) {
DEBUG(10, ("ibw_wc_send#1 %u\n", (int)wc->wr_id));
p = pconn->wr_index[send_index];
- if (p->msg_large)
- ibw_free_mr(&p->msg_large, &p->mr_large);
- DLIST_REMOVE(pconn->wr_list_used, p);
- DLIST_ADD(pconn->wr_list_avail, p);
+ if (p->buf_large!=NULL) {
+ if (p->ref_cnt) {
+ /* awaiting more of it... */
+ p->ref_cnt--;
+ } else {
+ ibw_free_mr(&p->buf_large, &p->mr_large);
+ DLIST_REMOVE(pconn->wr_list_used, p);
+ DLIST_ADD(pconn->wr_list_avail, p);
+ }
+ } else { /* nasty - but necessary */
+ DLIST_REMOVE(pconn->wr_list_used, p);
+ DLIST_ADD(pconn->wr_list_avail, p);
+ }
} else { /* "extra" request - not optimized */
DEBUG(10, ("ibw_wc_send#2 %u\n", (int)wc->wr_id));
for(p=pconn->extra_sent; p!=NULL; p=p->next)
@@ -618,25 +641,42 @@ static inline int ibw_wc_send(struct ibw_conn *conn, struct ibv_wc *wc)
sprintf(ibw_lasterr, "failed to find wr_id %d\n", (int)wc->wr_id);
return -1;
}
- ibw_free_mr(&p->msg_large, &p->mr_large);
- DLIST_REMOVE(pconn->extra_sent, p);
- DLIST_ADD(pconn->extra_avail, p);
+ if (p->ref_cnt) {
+ p->ref_cnt--;
+ } else {
+ ibw_free_mr(&p->buf_large, &p->mr_large);
+ DLIST_REMOVE(pconn->extra_sent, p);
+ DLIST_ADD(pconn->extra_avail, p);
+ }
}
if (pconn->queue) {
+ uint32_t msg_size;
+
DEBUG(10, ("ibw_wc_send#queue %u\n", (int)wc->wr_id));
+
p = pconn->queue;
- DLIST_REMOVE(pconn->queue, p);
+ assert(p->queued_ref_cnt>0);
+ p->queued_ref_cnt--;
+
+ msg_size = (p->queued_ref_cnt) ? pctx->opts.recv_bufsize : p->queued_rlen;
+
assert(p->queued_msg!=NULL);
- ibw_send(conn, p->queued_msg, p, ntohl(*(uint32_t *)p->queued_msg));
- p->queued_msg = NULL;
+ assert(msg_size!=0);
+ rc = ibw_send_packet(conn, p->queued_msg, p, msg_size);
+ if (p->queued_ref_cnt) {
+ p->queued_msg += pctx->opts.recv_bufsize;
+ } else {
+ DLIST_REMOVE2(pconn->queue, p, qprev, qnext);
+ p->queued_msg = NULL;
+ }
}
- return 0;
+ return rc;
}
-static inline int ibw_append_to_part(struct ibw_conn_priv *pconn,
+static int ibw_append_to_part(struct ibw_conn_priv *pconn,
struct ibw_part *part, char **pp, uint32_t add_len, int info)
{
DEBUG(10, ("ibw_append_to_part: cmid=%p, (bs=%u, len=%u, tr=%u), al=%u, i=%u\n",
@@ -674,7 +714,7 @@ static inline int ibw_append_to_part(struct ibw_conn_priv *pconn,
return 0;
}
-static inline int ibw_wc_mem_threshold(struct ibw_conn_priv *pconn,
+static int ibw_wc_mem_threshold(struct ibw_conn_priv *pconn,
struct ibw_part *part, uint32_t threshold)
{
DEBUG(10, ("ibw_wc_mem_threshold: cmid=%p, (bs=%u, len=%u, tr=%u), thr=%u\n",
@@ -694,7 +734,7 @@ static inline int ibw_wc_mem_threshold(struct ibw_conn_priv *pconn,
return 0;
}
-static inline int ibw_wc_recv(struct ibw_conn *conn, struct ibv_wc *wc)
+static int ibw_wc_recv(struct ibw_conn *conn, struct ibv_wc *wc)
{
struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
@@ -786,11 +826,10 @@ static int ibw_process_init_attrs(struct ibw_initattr *attr, int nattr, struct i
DEBUG(10, ("ibw_process_init_attrs: nattr: %d\n", nattr));
- opts->max_send_wr = 256;
- opts->max_recv_wr = 1024;
- opts->avg_send_size = 1024;
- opts->recv_bufsize = 256;
- opts->recv_threshold = 1 * 1024 * 1024;
+ opts->max_send_wr = IBW_MAX_SEND_WR;
+ opts->max_recv_wr = IBW_MAX_RECV_WR;
+ opts->recv_bufsize = IBW_RECV_BUFSIZE;
+ opts->recv_threshold = IBW_RECV_THRESHOLD;
for(i=0; i<nattr; i++) {
name = attr[i].name;
@@ -801,8 +840,6 @@ static int ibw_process_init_attrs(struct ibw_initattr *attr, int nattr, struct i
opts->max_send_wr = atoi(value);
else if (strcmp(name, "max_recv_wr")==0)
opts->max_recv_wr = atoi(value);
- else if (strcmp(name, "avg_send_size")==0)
- opts->avg_send_size = atoi(value);
else if (strcmp(name, "recv_bufsize")==0)
opts->recv_bufsize = atoi(value);
else if (strcmp(name, "recv_threshold")==0)
@@ -1003,7 +1040,7 @@ int ibw_disconnect(struct ibw_conn *conn)
rc = rdma_disconnect(pconn->cm_id);
if (rc) {
- sprintf(ibw_lasterr, "ibw_disconnect failed with %d", rc);
+ sprintf(ibw_lasterr, "ibw_disconnect failed with %d\n", rc);
DEBUG(0, (ibw_lasterr));
return rc;
}
@@ -1023,15 +1060,15 @@ int ibw_alloc_send_buf(struct ibw_conn *conn, void **buf, void **key, uint32_t l
DLIST_REMOVE(pconn->wr_list_avail, p);
DLIST_ADD(pconn->wr_list_used, p);
- if (len <= pctx->opts.avg_send_size) {
- *buf = (void *)p->msg;
+ if (len <= pctx->opts.recv_bufsize) {
+ *buf = (void *)p->buf;
} else {
- p->msg_large = ibw_alloc_mr(pctx, pconn, len, &p->mr_large);
- if (!p->msg_large) {
+ p->buf_large = ibw_alloc_mr(pctx, pconn, len, &p->mr_large);
+ if (p->buf_large==NULL) {
sprintf(ibw_lasterr, "ibw_alloc_mr#1 failed\n");
goto error;
}
- *buf = (void *)p->msg_large;
+ *buf = (void *)p->buf_large;
}
/* p->wr_id is already filled in ibw_init_memory */
} else {
@@ -1041,7 +1078,7 @@ int ibw_alloc_send_buf(struct ibw_conn *conn, void **buf, void **key, uint32_t l
if (!p) {
p = pconn->extra_avail = talloc_zero(pconn, struct ibw_wr);
if (p==NULL) {
- sprintf(ibw_lasterr, "talloc_zero failed (emax: %u)", pconn->extra_max);
+ sprintf(ibw_lasterr, "talloc_zero failed (emax: %u)\n", pconn->extra_max);
goto error;
}
p->wr_id = pctx->opts.max_send_wr + pconn->extra_max;
@@ -1054,40 +1091,42 @@ int ibw_alloc_send_buf(struct ibw_conn *conn, void **buf, void **key, uint32_t l
default: break;
}
}
- DLIST_REMOVE(pconn->extra_avail, p);
- p->msg_large = ibw_alloc_mr(pctx, pconn, len, &p->mr_large);
- if (!p->msg_large) {
- sprintf(ibw_lasterr, "ibw_alloc_mr#2 failed");
+ p->buf_large = ibw_alloc_mr(pctx, pconn, len, &p->mr_large);
+ if (p->buf_large==NULL) {
+ sprintf(ibw_lasterr, "ibw_alloc_mr#2 failed\n");
goto error;
}
- *buf = (void *)p->msg_large;
+ *buf = (void *)p->buf_large;
+
+ DLIST_REMOVE(pconn->extra_avail, p);
+ /* we don't have prepared index for this, so that
+ * we will have to find this by wr_id later on */
+ DLIST_ADD(pconn->extra_sent, p);
}
*key = (void *)p;
return 0;
error:
- DEBUG(0, ("ibw_alloc_send_buf error: %s\n", ibw_lasterr));
+ DEBUG(0, ("ibw_alloc_send_buf error: %s", ibw_lasterr));
return -1;
}
-int ibw_send(struct ibw_conn *conn, void *buf, void *key, uint32_t len)
+static int ibw_send_packet(struct ibw_conn *conn, void *buf, struct ibw_wr *p, uint32_t len)
{
struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
- struct ibw_wr *p = talloc_get_type(key, struct ibw_wr);
int rc;
- *((uint32_t *)buf) = htonl(len);
-
/* can we send it right now? */
if (pconn->wr_sent<pctx->opts.max_send_wr) {
+ struct ibv_send_wr *bad_wr;
struct ibv_sge list = {
- .addr = (uintptr_t) NULL,
+ .addr = (uintptr_t)buf,
.length = len,
- .lkey = 0
+ .lkey = pconn->mr_send->lkey
};
struct ibv_send_wr wr = {
.wr_id = p->wr_id + pctx->opts.max_recv_wr,
@@ -1096,16 +1135,13 @@ int ibw_send(struct ibw_conn *conn, void *buf, void *key, uint32_t len)
.opcode = IBV_WR_SEND,
.send_flags = IBV_SEND_SIGNALED,
};
- struct ibv_send_wr *bad_wr;
-
- DEBUG(10, ("ibw_send#1(cmid: %p, wrid: %u, n: %d)\n",
- pconn->cm_id, (uint32_t)wr.wr_id, len));
- list.addr = (uintptr_t)buf;
- if (p->msg_large==NULL) {
- list.lkey = pconn->mr_send->lkey;
+ if (p->buf_large==NULL) {
+ DEBUG(10, ("ibw_send#normal(cmid: %p, wrid: %u, n: %d)\n",
+ pconn->cm_id, (uint32_t)wr.wr_id, len));
} else {
- assert(p->mr_large!=NULL);
+ DEBUG(10, ("ibw_send#large(cmid: %p, wrid: %u, n: %d)\n",
+ pconn->cm_id, (uint32_t)wr.wr_id, len));
list.lkey = p->mr_large->lkey;
}
@@ -1113,26 +1149,72 @@ int ibw_send(struct ibw_conn *conn, void *buf, void *key, uint32_t len)
if (rc) {
sprintf(ibw_lasterr, "ibv_post_send error %d (%d)\n",
rc, pconn->wr_sent);
- DEBUG(0, (ibw_lasterr));
- } else {
- /* good case */
- if (p->wr_id>=pctx->opts.max_send_wr) {
- /* we don't have prepared index for this, so that
- * we will have to find this later on */
- DLIST_ADD(pconn->extra_sent, p);
- }
- pconn->wr_sent++;
+ goto error;
}
+
+ pconn->wr_sent++;
+
return rc;
} /* else put the request into our own queue: */
- DEBUG(10, ("ibw_send#2(cmid: %p, len: %u)\n", pconn->cm_id, len));
+ DEBUG(10, ("ibw_send#queued(cmid: %p, len: %u)\n", pconn->cm_id, len));
/* to be sent by ibw_wc_send */
- DLIST_ADD_END(pconn->queue, p, struct ibw_wr *); /* TODO: optimize */
- p->queued_msg = buf;
+ /* regardless "normal" or [a part of] "large" packet */
+ if (!p->queued_ref_cnt) {
+ DLIST_ADD_END2(pconn->queue, p, struct ibw_wr *,
+ qprev, qnext); /* TODO: optimize */
+ p->queued_msg = buf;
+ }
+ p->queued_ref_cnt++;
+ p->queued_rlen = len; /* last wins; see ibw_wc_send */
return 0;
+error:
+ DEBUG(0, (ibw_lasterr));
+ return -1;
+}
+
+int ibw_send(struct ibw_conn *conn, void *buf, void *key, uint32_t len)
+{
+ struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
+ struct ibw_wr *p = talloc_get_type(key, struct ibw_wr);
+ int rc;
+
+ assert(len>=sizeof(uint32_t));
+ *((uint32_t *)buf) = htonl(len);
+
+ if (len > pctx->opts.recv_bufsize) {
+ struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
+ int rlen = len;
+ char *packet = (char *)buf;
+ uint32_t recv_bufsize = pctx->opts.recv_bufsize;
+
+ DEBUG(10, ("ibw_send#frag(cmid: %p, buf: %p, len: %u)\n",
+ pconn->cm_id, buf, len));
+
+ /* single threaded => no race here: */
+ assert(p->ref_cnt==0);
+ while(rlen > recv_bufsize) {
+ rc = ibw_send_packet(conn, packet, p, recv_bufsize);
+ if (rc)
+ return rc;
+ packet += recv_bufsize;
+ rlen -= recv_bufsize;
+ p->ref_cnt++; /* not good to have it in ibw_send_packet */
+ }
+ if (rlen) {
+ rc = ibw_send_packet(conn, packet, p, rlen);
+ p->ref_cnt++; /* not good to have it in ibw_send_packet */
+ }
+ p->ref_cnt--; /* for the same handling */
+ } else {
+ assert(p->ref_cnt==0);
+ assert(p->queued_ref_cnt==0);
+
+ rc = ibw_send_packet(conn, buf, p, len);
+ }
+ return rc;
}
int ibw_cancel_send_buf(struct ibw_conn *conn, void *buf, void *key)
@@ -1145,8 +1227,8 @@ int ibw_cancel_send_buf(struct ibw_conn *conn, void *buf, void *key)
assert(buf!=NULL);
assert(conn!=NULL);
- if (p->msg_large)
- ibw_free_mr(&p->msg_large, &p->mr_large);
+ if (p->buf_large!=NULL)
+ ibw_free_mr(&p->buf_large, &p->mr_large);
/* parallel case */
if (p->wr_id < pctx->opts.max_send_wr) {
@@ -1155,6 +1237,7 @@ int ibw_cancel_send_buf(struct ibw_conn *conn, void *buf, void *key)
DLIST_ADD(pconn->wr_list_avail, p);
} else { /* "extra" packet */
DEBUG(10, ("ibw_cancel_send_buf#2 %u", (int)p->wr_id));
+ DLIST_REMOVE(pconn->extra_sent, p);
DLIST_ADD(pconn->extra_avail, p);
}
diff --git a/ctdb/ib/ibwrapper_internal.h b/ctdb/ib/ibwrapper_internal.h
index a879427a115..9c6bfab519e 100644
--- a/ctdb/ib/ibwrapper_internal.h
+++ b/ctdb/ib/ibwrapper_internal.h
@@ -24,21 +24,25 @@
struct ibw_opts {
uint32_t max_send_wr;
uint32_t max_recv_wr;
- uint32_t avg_send_size;
uint32_t recv_bufsize;
uint32_t recv_threshold;
};
struct ibw_wr {
- char *msg; /* initialized in ibw_init_memory once per connection */
+ char *buf; /* initialized in ibw_init_memory once per connection */
int wr_id; /* position in wr_index list; also used as wr id */
- char *msg_large; /* allocated specially for "large" message */
+ char *buf_large; /* allocated specially for "large" message */
struct ibv_mr *mr_large;
+ int ref_cnt; /* reference count for ibw_wc_send to know when to release */
char *queued_msg; /* set at ibw_send - can be different than above */
+ int queued_ref_cnt; /* instead of adding the same to the queue again */
+ uint32_t queued_rlen; /* last wins when queued_ref_cnt>0; or simple msg size */
struct ibw_wr *next, *prev; /* in wr_list_avail or wr_list_used */
+ /* or extra_sent or extra_avail */
+ struct ibw_wr *qnext, *qprev; /* in queue */
};
struct ibw_ctx_priv {
@@ -81,11 +85,12 @@ struct ibw_conn_priv {
struct ibw_wr **wr_index; /* array[0..(qsize-1)] of (ibw_wr *) */
int wr_sent; /* # of send wrs in the CQ */
- struct ibw_wr *queue;
struct ibw_wr *extra_sent;
struct ibw_wr *extra_avail;
int extra_max; /* max wr_id in the queue */
+ struct ibw_wr *queue;
+
/* buf_recv is a ring buffer */
char *buf_recv; /* max_recv_wr * avg_recv_size */
struct ibv_mr *mr_recv;
@@ -93,3 +98,30 @@ struct ibw_conn_priv {
struct ibw_part part;
};
+/* remove an element from a list - element doesn't have to be in list. */
+#define DLIST_REMOVE2(list, p, prev, next) \
+do { \
+ if ((p) == (list)) { \
+ (list) = (p)->next; \
+ if (list) (list)->prev = NULL; \
+ } else { \
+ if ((p)->prev) (p)->prev->next = (p)->next; \
+ if ((p)->next) (p)->next->prev = (p)->prev; \
+ } \
+ if ((p) != (list)) (p)->next = (p)->prev = NULL; \
+} while (0)
+
+/* hook into the end of the list - needs a tmp pointer */
+#define DLIST_ADD_END2(list, p, type, prev, next) \
+do { \
+ if (!(list)) { \
+ (list) = (p); \
+ (p)->next = (p)->prev = NULL; \
+ } else { \
+ type tmp; \
+ for (tmp = (list); tmp->next; tmp = tmp->next) ; \
+ tmp->next = (p); \
+ (p)->next = NULL; \
+ (p)->prev = tmp; \
+ } \
+} while (0)