summary refs log tree commit diff stats
path: root/ctdb/ib
diff options
context:
space:
mode:
author Peter Somogyi <psomogyi@gamax.hu> 2006-12-20 17:42:58 +0100
committer Peter Somogyi <psomogyi@gamax.hu> 2006-12-20 17:42:58 +0100
commit efd2903e0fa4f52f42b9ad33274d224db70db4b9 (patch)
tree 80bc9772e8f29c1f3351d2c88c44e40557ba72bb /ctdb/ib
parent e667345409e0ca2322a4f0dd8e4d358796b8b661 (diff)
parent 6dbaa5abfcde3c6be286ed8b59ff1fe12665c032 (diff)
download samba-efd2903e0fa4f52f42b9ad33274d224db70db4b9.tar.gz
samba-efd2903e0fa4f52f42b9ad33274d224db70db4b9.tar.xz
samba-efd2903e0fa4f52f42b9ad33274d224db70db4b9.zip
Made receiver handle partial packets.
(This used to be ctdb commit 808fd658552e489825fb22453755e225549ebfcc)
Diffstat (limited to 'ctdb/ib')
-rw-r--r-- ctdb/ib/ibwrapper.c 254
-rw-r--r-- ctdb/ib/ibwrapper_internal.h 19
2 files changed, 219 insertions, 54 deletions
diff --git a/ctdb/ib/ibwrapper.c b/ctdb/ib/ibwrapper.c
index c04505bc474..db6e303638b 100644
--- a/ctdb/ib/ibwrapper.c
+++ b/ctdb/ib/ibwrapper.c
@@ -49,6 +49,8 @@ static char ibw_lasterr[IBW_LASTERR_BUFSIZE];
static void ibw_event_handler_verbs(struct event_context *ev,
struct fd_event *fde, uint16_t flags, void *private_data);
static int ibw_fill_cq(struct ibw_conn *conn);
+static inline int ibw_wc_recv(struct ibw_conn *conn, struct ibv_wc *wc);
+static inline int ibw_wc_send(struct ibw_conn *conn, struct ibv_wc *wc);
static void *ibw_alloc_mr(struct ibw_ctx_priv *pctx, struct ibw_conn_priv *pconn,
int n, struct ibv_mr **ppmr)
@@ -503,67 +505,61 @@ static void ibw_event_handler_verbs(struct event_context *ev,
struct ibv_wc wc;
int rc;
+ struct ibv_cq *ev_cq;
+ void *ev_ctx;
- rc = ibv_poll_cq(pconn->cq, 1, &wc);
- if (rc!=1) {
- sprintf(ibw_lasterr, "ibv_poll_cq error %d\n", rc);
+ /* TODO: check whether if it's good to have more channels here... */
+ rc = ibv_get_cq_event(pconn->verbs_channel, &ev_cq, &ev_ctx);
+ if (rc) {
+ sprintf(ibw_lasterr, "Failed to get cq_event with %d\n", rc);
goto error;
}
- if (wc.status) {
- sprintf(ibw_lasterr, "cq completion failed status %d\n",
- wc.status);
+ if (ev_cq != pconn->cq) {
+ sprintf(ibw_lasterr, "ev_cq(%u) != pconn->cq(%u)\n",
+ (unsigned int)ev_cq, (unsigned int)pconn->cq);
+ goto error;
+ }
+ rc = ibv_req_notify_cq(pconn->cq, 0);
+ if (rc) {
+ sprintf(ibw_lasterr, "Couldn't request CQ notification (%d)\n", rc);
goto error;
}
- switch(wc.opcode) {
- case IBV_WC_SEND:
- {
- struct ibw_wr *p;
-
- DEBUG(10, ("send completion\n"));
- assert(pconn->cm_id->qp->qp_num==wc.qp_num);
- assert(wc.wr_id < pctx->opts.max_send_wr);
-
- p = pconn->wr_index[wc.wr_id];
- if (p->msg_large) {
- ibw_free_mr(&p->msg_large, &p->mr_large);
- }
-
- DLIST_REMOVE(pconn->wr_list_used, p);
- DLIST_ADD(pconn->wr_list_avail, p);
+ while((rc=ibv_poll_cq(pconn->cq, 1, &wc))==1) {
+ if (wc.status) {
+ sprintf(ibw_lasterr, "cq completion failed status %d\n",
+ wc.status);
+ goto error;
}
- break;
-
- case IBV_WC_RDMA_WRITE:
- DEBUG(10, ("rdma write completion\n"));
- break;
- case IBV_WC_RDMA_READ:
- DEBUG(10, ("rdma read completion\n"));
- break;
+ switch(wc.opcode) {
+ case IBV_WC_SEND:
+ DEBUG(10, ("send completion\n"));
+ if (ibw_wc_send(conn, &wc))
+ goto error;
+ break;
- case IBV_WC_RECV:
- {
- int recv_index;
+ case IBV_WC_RDMA_WRITE:
+ DEBUG(10, ("rdma write completion\n"));
+ break;
+
+ case IBV_WC_RDMA_READ:
+ DEBUG(10, ("rdma read completion\n"));
+ break;
+ case IBV_WC_RECV:
DEBUG(10, ("recv completion\n"));
- assert(pconn->cm_id->qp->qp_num==wc.qp_num);
- assert((int)wc.wr_id > pctx->opts.max_send_wr);
- recv_index = (int)wc.wr_id - pctx->opts.max_send_wr;
- assert(recv_index < pctx->opts.max_recv_wr);
- assert(wc.byte_len <= pctx->opts.recv_bufsize);
-
-/* TODO: take care of fragmented messages !!! */
- pctx->receive_func(conn,
- pconn->buf_recv + (recv_index * pctx->opts.recv_bufsize),
- wc.byte_len);
- if (ibw_refill_cq_recv(conn))
+ if (ibw_wc_recv(conn, &wc))
goto error;
- }
- break;
+ break;
- default:
- sprintf(ibw_lasterr, "unknown completion %d\n", wc.opcode);
+ default:
+ sprintf(ibw_lasterr, "unknown completion %d\n", wc.opcode);
+ goto error;
+ }
+ }
+ if (rc!=0) {
+ sprintf(ibw_lasterr, "ibv_poll_cq error %d\n", rc);
goto error;
}
@@ -574,6 +570,163 @@ error:
pctx->connstate_func(NULL, conn);
}
+/*
+ * Handle a send completion: release the work request named by wc->wr_id.
+ *
+ * A registered large message attached to the request is freed first, then
+ * the request is moved from the "used" list back to the "available" list.
+ * Always returns 0.
+ */
+static inline int ibw_wc_send(struct ibw_conn *conn, struct ibv_wc *wc)
+{
+	struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
+	struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
+	struct ibw_wr *wr;
+
+	/* the completion must belong to this connection's queue pair
+	   and carry a send-side work request id */
+	assert(pconn->cm_id->qp->qp_num==wc->qp_num);
+	assert(wc->wr_id < pctx->opts.max_send_wr);
+
+	wr = pconn->wr_index[wc->wr_id];
+
+	if (wr->msg_large != NULL)
+		ibw_free_mr(&wr->msg_large, &wr->mr_large);
+
+	/* hand the work request back for reuse */
+	DLIST_REMOVE(pconn->wr_list_used, wr);
+	DLIST_ADD(pconn->wr_list_avail, wr);
+
+	return 0;
+}
+
+/*
+ * Append add_len bytes read from *pp to the partial-message buffer of part,
+ * enlarging the buffer first when it is too small.
+ *
+ * memctx  - talloc context used for (re)allocating part->buf
+ * part    - reassembly state; buf, bufsize, len and to_read are updated
+ * pp      - in/out read cursor, advanced by add_len on success
+ * add_len - number of bytes to copy
+ * info    - call-site tag printed in the error message
+ *
+ * Returns 0 on success. On allocation failure returns -1 with ibw_lasterr
+ * set; part and *pp are left exactly as they were.
+ */
+static inline int ibw_append_to_part(void *memctx, struct ibw_part *part,
+	char **pp, uint32_t add_len, int info)
+{
+	/* allocate more if necessary - it's an "evergrowing" buffer... */
+	if (part->len + add_len > part->bufsize) {
+		uint32_t new_size = part->len + add_len;
+		char *new_buf;
+
+		if (part->buf==NULL) {
+			assert(part->len==0);
+			new_buf = talloc_size(memctx, new_size);
+			if (new_buf==NULL) {
+				sprintf(ibw_lasterr, "recv talloc_size error (%u) #%d\n",
+					add_len, info);
+				return -1;
+			}
+		} else {
+			/* go through a temporary: assigning the result straight
+			   to part->buf would drop the only reference to the
+			   data collected so far when the realloc fails */
+			new_buf = talloc_realloc_size(memctx, part->buf, new_size);
+			if (new_buf==NULL) {
+				sprintf(ibw_lasterr, "recv realloc error (%u + %u) #%d\n",
+					part->len, add_len, info);
+				return -1;
+			}
+		}
+		part->buf = new_buf;
+		part->bufsize = new_size;
+	}
+
+	/* consume pp */
+	memcpy(part->buf + part->len, *pp, add_len);
+	*pp += add_len;
+	part->len += add_len;
+	part->to_read -= add_len;
+
+	return 0;
+}
+
+/*
+ * Cut the reassembly buffer back to threshold bytes once it has grown
+ * beyond that size. The buffer contents are discarded, so this must only
+ * be called when no partial message is stored in part.
+ *
+ * memctx    - talloc context used for the replacement buffer
+ * part      - reassembly state whose buf/bufsize may be replaced
+ * threshold - largest buffer size that is kept between messages
+ *
+ * Returns 0 on success (also when nothing had to be done), -1 with
+ * ibw_lasterr set when the replacement buffer cannot be allocated; part
+ * then describes an empty, unallocated buffer.
+ */
+static inline int ibw_wc_mem_threshold(void *memctx, struct ibw_part *part, uint32_t threshold)
+{
+	if (part->bufsize > threshold) {
+		talloc_free(part->buf);
+		/* forget the freed buffer right away: if the allocation below
+		   fails, a stale bufsize next to a NULL buf would let a later
+		   append skip its allocation and copy into NULL */
+		part->buf = NULL;
+		part->bufsize = 0;
+
+		part->buf = talloc_size(memctx, threshold);
+		if (part->buf==NULL) {
+			sprintf(ibw_lasterr, "talloc_size failed\n");
+			return -1;
+		}
+		part->bufsize = threshold;
+	}
+	return 0;
+}
+
+/*
+ * Handle a receive completion: cut the received bytes into messages and
+ * pass every complete message to pctx->receive_func.
+ *
+ * Each message starts with a uint32_t (read as stored, no byte order
+ * conversion) holding the total message length including the prefix
+ * itself. A message - or even its length prefix - may be spread over
+ * several receive buffers; the incomplete piece is collected in
+ * pconn->part until the rest has arrived.
+ *
+ * Returns 0 on success. On failure logs ibw_lasterr, sets conn->state to
+ * IBWC_ERROR and returns -1.
+ */
+static inline int ibw_wc_recv(struct ibw_conn *conn, struct ibv_wc *wc)
+{
+	struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
+	struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
+	int recv_index;
+	char *p;
+	uint32_t remain;
+	struct ibw_part *part;
+
+	assert(pconn->cm_id->qp->qp_num==wc->qp_num);
+	/* NOTE(review): if receive wr_ids are max_send_wr + index, the strict
+	   '>' rejects index 0 - confirm against ibw_refill_cq_recv */
+	assert((int)wc->wr_id > pctx->opts.max_send_wr);
+	recv_index = (int)wc->wr_id - pctx->opts.max_send_wr;
+	assert(recv_index < pctx->opts.max_recv_wr);
+	assert(wc->byte_len <= pctx->opts.recv_bufsize);
+
+	p = pconn->buf_recv + (recv_index * pctx->opts.recv_bufsize);
+	part = &pconn->part;
+
+	remain = wc->byte_len;
+	while(remain) {
+		/* here always true: (part->len!=0 && part->to_read!=0) ||
+			(part->len==0 && part->to_read==0) */
+		if (part->len) { /* is there a partial msg to be continued? */
+			uint32_t read_len = (part->to_read<=remain) ? part->to_read : remain;
+			if (ibw_append_to_part(pconn, part, &p, read_len, 421))
+				goto error;
+			remain -= read_len;
+
+			if (part->len<=sizeof(uint32_t) && part->to_read==0) {
+				assert(part->len==sizeof(uint32_t));
+				/* the length prefix is complete now: switch to_read
+				   from "rest of prefix" to "rest of message";
+				   memcpy keeps the read free of aliasing issues */
+				memcpy(&part->to_read, part->buf, sizeof(uint32_t));
+				if (part->to_read<sizeof(uint32_t)) {
+					sprintf(ibw_lasterr, "got msglen=%u #2\n", part->to_read);
+					goto error;
+				}
+				part->to_read -= sizeof(uint32_t); /* it's already read */
+			}
+
+			if (part->to_read==0) {
+				pctx->receive_func(conn, part->buf, part->len);
+				part->len = 0; /* tells not having partial data (any more) */
+				if (ibw_wc_mem_threshold(pconn, part, pctx->opts.recv_threshold))
+					goto error;
+			}
+		} else {
+			if (remain>=sizeof(uint32_t)) {
+				uint32_t msglen;
+
+				/* p has been advanced by arbitrary message lengths,
+				   so it may be misaligned for a direct uint32_t load */
+				memcpy(&msglen, p, sizeof(msglen));
+				if (msglen<sizeof(uint32_t)) {
+					sprintf(ibw_lasterr, "got msglen=%u\n", msglen);
+					goto error;
+				}
+
+				/* mostly awaited case: */
+				if (msglen<=remain) {
+					pctx->receive_func(conn, p, msglen);
+					p += msglen;
+					remain -= msglen;
+				} else {
+					part->to_read = msglen;
+					/* part->len is already 0 */
+					if (ibw_append_to_part(pconn, part, &p, remain, 422))
+						goto error;
+					remain = 0; /* to be continued ... */
+					/* part->to_read > 0 here */
+				}
+			} else { /* edge case: not even the length prefix is complete */
+				part->to_read = sizeof(uint32_t);
+				/* part->len is already 0 */
+				if (ibw_append_to_part(pconn, part, &p, remain, 423))
+					goto error;
+				remain = 0;
+				/* part->to_read > 0 here */
+			}
+		}
+	} /* <remain> is always decreased at least by 1 */
+
+	if (ibw_refill_cq_recv(conn))
+		goto error;
+
+	return 0;
+
+error:
+	DEBUG(0, ("ibw_wc_recv error: %s", ibw_lasterr));
+	conn->state = IBWC_ERROR;
+	return -1;
+}
+
static int ibw_process_init_attrs(struct ibw_initattr *attr, int nattr, struct ibw_opts *opts)
{
int i;
@@ -583,6 +736,7 @@ static int ibw_process_init_attrs(struct ibw_initattr *attr, int nattr, struct i
opts->max_recv_wr = 1024;
opts->avg_send_size = 1024;
opts->recv_bufsize = 256;
+ opts->recv_threshold = 1 * 1024 * 1024;
for(i=0; i<nattr; i++) {
name = attr[i].name;
@@ -597,6 +751,8 @@ static int ibw_process_init_attrs(struct ibw_initattr *attr, int nattr, struct i
opts->avg_send_size = atoi(value);
else if (strcmp(name, "recv_bufsize")==0)
opts->recv_bufsize = atoi(value);
+ else if (strcmp(name, "recv_threshold")==0)
+ opts->recv_threshold = atoi(value);
else {
sprintf(ibw_lasterr, "ibw_init: unknown name %s\n", name);
return -1;
@@ -843,7 +999,7 @@ int ibw_send(struct ibw_conn *conn, void *buf, void *key, int n)
};
struct ibv_send_wr *bad_wr;
- if (n + sizeof(long)<=pctx->opts.avg_send_size) {
+ if (n + sizeof(uint32_t)<=pctx->opts.avg_send_size) {
assert((p->msg + sizeof(long))==(char *)buf);
list.lkey = pconn->mr_send->lkey;
list.addr = (uintptr_t) p->msg;
diff --git a/ctdb/ib/ibwrapper_internal.h b/ctdb/ib/ibwrapper_internal.h
index b819c483d3d..6e34917755b 100644
--- a/ctdb/ib/ibwrapper_internal.h
+++ b/ctdb/ib/ibwrapper_internal.h
@@ -22,10 +22,11 @@
*/
struct ibw_opts {
- int max_send_wr;
- int max_recv_wr;
- int avg_send_size;
- int recv_bufsize;
+ uint32_t max_send_wr; /* send completions carry a wr_id below this value */
+ uint32_t max_recv_wr; /* upper bound for the receive buffer index */
+ uint32_t avg_send_size;
+ uint32_t recv_bufsize; /* size of one receive buffer slot inside buf_recv */
+ uint32_t recv_threshold; /* a partial-message buffer larger than this is cut back to this size after delivery */
};
struct ibw_wr {
@@ -56,6 +57,13 @@ struct ibw_ctx_priv {
long pagesize; /* sysconf result for memalign */
};
+/* Reassembly state of a message that is spread over several receive buffers */
+struct ibw_part {
+ char *buf; /* talloced memory buffer */
+ uint32_t bufsize; /* allocated size of buf - grows on demand, cut back when above recv_threshold */
+ uint32_t len; /* bytes collected in buf so far; 0 means no partial message is pending */
+ uint32_t to_read; /* bytes still missing: first of the uint32_t length prefix, then of the message */
+};
+
struct ibw_conn_priv {
struct ibv_comp_channel *verbs_channel;
struct fd_event *verbs_channel_event;
@@ -74,6 +82,7 @@ struct ibw_conn_priv {
/* buf_recv is a ring buffer */
char *buf_recv; /* max_recv_wr * avg_recv_size */
struct ibv_mr *mr_recv;
- int recv_index; /* index of the next recv buffer */
+ int recv_index; /* index of the next recv buffer when refilling */
+ struct ibw_part part;
};