diff options
-rw-r--r-- | include/linux/sunrpc/svc.h | 25 | ||||
-rw-r--r-- | include/linux/sunrpc/svcsock.h | 1 | ||||
-rw-r--r-- | net/sunrpc/svc.c | 56 | ||||
-rw-r--r-- | net/sunrpc/svcsock.c | 125 |
4 files changed, 153 insertions, 54 deletions
diff --git a/include/linux/sunrpc/svc.h b/include/linux/sunrpc/svc.h index 5eabded1106..c27d806af31 100644 --- a/include/linux/sunrpc/svc.h +++ b/include/linux/sunrpc/svc.h @@ -17,6 +17,25 @@ #include <linux/wait.h> #include <linux/mm.h> + +/* + * + * RPC service thread pool. + * + * Pool of threads and temporary sockets. Generally there is only + * a single one of these per RPC service, but on NUMA machines those + * services that can benefit from it (i.e. nfs but not lockd) will + * have one pool per NUMA node. This optimisation reduces cross- + * node traffic on multi-node NUMA NFS servers. + */ +struct svc_pool { + unsigned int sp_id; /* pool id; also node id on NUMA */ + spinlock_t sp_lock; /* protects all fields */ + struct list_head sp_threads; /* idle server threads */ + struct list_head sp_sockets; /* pending sockets */ + unsigned int sp_nrthreads; /* # of threads in pool */ +} ____cacheline_aligned_in_smp; + /* * RPC service. * @@ -28,8 +47,6 @@ * We currently do not support more than one RPC program per daemon. */ struct svc_serv { - struct list_head sv_threads; /* idle server threads */ - struct list_head sv_sockets; /* pending sockets */ struct svc_program * sv_program; /* RPC program */ struct svc_stat * sv_stats; /* RPC statistics */ spinlock_t sv_lock; @@ -44,6 +61,9 @@ struct svc_serv { char * sv_name; /* service name */ + unsigned int sv_nrpools; /* number of thread pools */ + struct svc_pool * sv_pools; /* array of thread pools */ + void (*sv_shutdown)(struct svc_serv *serv); /* Callback to use when last thread * exits. @@ -138,6 +158,7 @@ struct svc_rqst { int rq_addrlen; struct svc_serv * rq_server; /* RPC service definition */ + struct svc_pool * rq_pool; /* thread pool */ struct svc_procedure * rq_procinfo; /* procedure info */ struct auth_ops * rq_authop; /* authentication flavour */ struct svc_cred rq_cred; /* auth info */ diff --git a/include/linux/sunrpc/svcsock.h b/include/linux/sunrpc/svcsock.h index 7154e71c6d1..4c296152cbf 100644 --- a/include/linux/sunrpc/svcsock.h +++ b/include/linux/sunrpc/svcsock.h @@ -20,6 +20,7 @@ struct svc_sock { struct socket * sk_sock; /* berkeley socket layer */ struct sock * sk_sk; /* INET layer */ + struct svc_pool * sk_pool; /* current pool iff queued */ struct svc_serv * sk_server; /* service for this socket */ atomic_t sk_inuse; /* use count */ unsigned long sk_flags; diff --git a/net/sunrpc/svc.c b/net/sunrpc/svc.c index 0c2c5227628..6750cd474f8 100644 --- a/net/sunrpc/svc.c +++ b/net/sunrpc/svc.c @@ -32,6 +32,7 @@ svc_create(struct svc_program *prog, unsigned int bufsize, struct svc_serv *serv; int vers; unsigned int xdrsize; + unsigned int i; if (!(serv = kzalloc(sizeof(*serv), GFP_KERNEL))) return NULL; @@ -55,13 +56,33 @@ svc_create(struct svc_program *prog, unsigned int bufsize, prog = prog->pg_next; } serv->sv_xdrsize = xdrsize; - INIT_LIST_HEAD(&serv->sv_threads); - INIT_LIST_HEAD(&serv->sv_sockets); INIT_LIST_HEAD(&serv->sv_tempsocks); INIT_LIST_HEAD(&serv->sv_permsocks); init_timer(&serv->sv_temptimer); spin_lock_init(&serv->sv_lock); + serv->sv_nrpools = 1; + serv->sv_pools = + kcalloc(sizeof(struct svc_pool), serv->sv_nrpools, + GFP_KERNEL); + if (!serv->sv_pools) { + kfree(serv); + return NULL; + } + + for (i = 0; i < serv->sv_nrpools; i++) { + struct svc_pool *pool = &serv->sv_pools[i]; + + dprintk("initialising pool %u for %s\n", + i, serv->sv_name); + + pool->sp_id = i; + INIT_LIST_HEAD(&pool->sp_threads); + INIT_LIST_HEAD(&pool->sp_sockets); + spin_lock_init(&pool->sp_lock); + } + + /* Remove any stale portmap registrations */ svc_register(serv, 0, 0); @@ -69,7 +90,7 @@ svc_create(struct svc_program *prog, unsigned int bufsize, } /* - * Destroy an RPC service + * Destroy an RPC service. Should be called with the BKL held */ void svc_destroy(struct svc_serv *serv) @@ -110,6 +131,7 @@ svc_destroy(struct svc_serv *serv) /* Unregister service with the portmapper */ svc_register(serv, 0, 0); + kfree(serv->sv_pools); kfree(serv); } @@ -158,10 +180,11 @@ svc_release_buffer(struct svc_rqst *rqstp) } /* - * Create a server thread + * Create a thread in the given pool. Caller must hold BKL. */ -int -svc_create_thread(svc_thread_fn func, struct svc_serv *serv) +static int +__svc_create_thread(svc_thread_fn func, struct svc_serv *serv, + struct svc_pool *pool) { struct svc_rqst *rqstp; int error = -ENOMEM; @@ -178,7 +201,11 @@ svc_create_thread(svc_thread_fn func, struct svc_serv *serv) goto out_thread; serv->sv_nrthreads++; + spin_lock_bh(&pool->sp_lock); + pool->sp_nrthreads++; + spin_unlock_bh(&pool->sp_lock); rqstp->rq_server = serv; + rqstp->rq_pool = pool; error = kernel_thread((int (*)(void *)) func, rqstp, 0); if (error < 0) goto out_thread; @@ -193,17 +220,32 @@ out_thread: } /* - * Destroy an RPC server thread + * Create a thread in the default pool. Caller must hold BKL. + */ +int +svc_create_thread(svc_thread_fn func, struct svc_serv *serv) +{ + return __svc_create_thread(func, serv, &serv->sv_pools[0]); +} + +/* + * Called from a server thread as it's exiting. Caller must hold BKL. */ void svc_exit_thread(struct svc_rqst *rqstp) { struct svc_serv *serv = rqstp->rq_server; + struct svc_pool *pool = rqstp->rq_pool; svc_release_buffer(rqstp); kfree(rqstp->rq_resp); kfree(rqstp->rq_argp); kfree(rqstp->rq_auth_data); + + spin_lock_bh(&pool->sp_lock); + pool->sp_nrthreads--; + spin_unlock_bh(&pool->sp_lock); + kfree(rqstp); /* Release the server */ diff --git a/net/sunrpc/svcsock.c b/net/sunrpc/svcsock.c index a38df4589ae..b78659adeff 100644 --- a/net/sunrpc/svcsock.c +++ b/net/sunrpc/svcsock.c @@ -46,7 +46,10 @@ /* SMP locking strategy: * - * svc_serv->sv_lock protects most stuff for that service. + * svc_pool->sp_lock protects most of the fields of that pool. + * svc_serv->sv_lock protects sv_tempsocks, sv_permsocks, sv_tmpcnt. + * when both need to be taken (rare), svc_serv->sv_lock is first. + * BKL protects svc_serv->sv_nrthread. * svc_sock->sk_defer_lock protects the svc_sock->sk_deferred list * svc_sock->sk_flags.SK_BUSY prevents a svc_sock being enqueued multiply. * @@ -82,22 +85,22 @@ static struct cache_deferred_req *svc_defer(struct cache_req *req); static int svc_conn_age_period = 6*60; /* - * Queue up an idle server thread. Must have serv->sv_lock held. + * Queue up an idle server thread. Must have pool->sp_lock held. * Note: this is really a stack rather than a queue, so that we only - * use as many different threads as we need, and the rest don't polute + * use as many different threads as we need, and the rest don't pollute * the cache. */ static inline void -svc_serv_enqueue(struct svc_serv *serv, struct svc_rqst *rqstp) +svc_thread_enqueue(struct svc_pool *pool, struct svc_rqst *rqstp) { - list_add(&rqstp->rq_list, &serv->sv_threads); + list_add(&rqstp->rq_list, &pool->sp_threads); } /* - * Dequeue an nfsd thread. Must have serv->sv_lock held. + * Dequeue an nfsd thread. Must have pool->sp_lock held. */ static inline void -svc_serv_dequeue(struct svc_serv *serv, struct svc_rqst *rqstp) +svc_thread_dequeue(struct svc_pool *pool, struct svc_rqst *rqstp) { list_del(&rqstp->rq_list); } @@ -148,6 +151,7 @@ static void svc_sock_enqueue(struct svc_sock *svsk) { struct svc_serv *serv = svsk->sk_server; + struct svc_pool *pool = &serv->sv_pools[0]; struct svc_rqst *rqstp; if (!(svsk->sk_flags & @@ -156,10 +160,10 @@ svc_sock_enqueue(struct svc_sock *svsk) if (test_bit(SK_DEAD, &svsk->sk_flags)) return; - spin_lock_bh(&serv->sv_lock); + spin_lock_bh(&pool->sp_lock); - if (!list_empty(&serv->sv_threads) && - !list_empty(&serv->sv_sockets)) + if (!list_empty(&pool->sp_threads) && + !list_empty(&pool->sp_sockets)) printk(KERN_ERR "svc_sock_enqueue: threads and sockets both waiting??\n"); @@ -179,6 +183,8 @@ svc_sock_enqueue(struct svc_sock *svsk) dprintk("svc: socket %p busy, not enqueued\n", svsk->sk_sk); goto out_unlock; } + BUG_ON(svsk->sk_pool != NULL); + svsk->sk_pool = pool; set_bit(SOCK_NOSPACE, &svsk->sk_sock->flags); if (((atomic_read(&svsk->sk_reserved) + serv->sv_bufsz)*2 @@ -189,19 +195,20 @@ svc_sock_enqueue(struct svc_sock *svsk) dprintk("svc: socket %p no space, %d*2 > %ld, not enqueued\n", svsk->sk_sk, atomic_read(&svsk->sk_reserved)+serv->sv_bufsz, svc_sock_wspace(svsk)); + svsk->sk_pool = NULL; clear_bit(SK_BUSY, &svsk->sk_flags); goto out_unlock; } clear_bit(SOCK_NOSPACE, &svsk->sk_sock->flags); - if (!list_empty(&serv->sv_threads)) { - rqstp = list_entry(serv->sv_threads.next, + if (!list_empty(&pool->sp_threads)) { + rqstp = list_entry(pool->sp_threads.next, struct svc_rqst, rq_list); dprintk("svc: socket %p served by daemon %p\n", svsk->sk_sk, rqstp); - svc_serv_dequeue(serv, rqstp); + svc_thread_dequeue(pool, rqstp); if (rqstp->rq_sock) printk(KERN_ERR "svc_sock_enqueue: server %p, rq_sock=%p!\n", @@ -210,28 +217,30 @@ svc_sock_enqueue(struct svc_sock *svsk) atomic_inc(&svsk->sk_inuse); rqstp->rq_reserved = serv->sv_bufsz; atomic_add(rqstp->rq_reserved, &svsk->sk_reserved); + BUG_ON(svsk->sk_pool != pool); wake_up(&rqstp->rq_wait); } else { dprintk("svc: socket %p put into queue\n", svsk->sk_sk); - list_add_tail(&svsk->sk_ready, &serv->sv_sockets); + list_add_tail(&svsk->sk_ready, &pool->sp_sockets); + BUG_ON(svsk->sk_pool != pool); } out_unlock: - spin_unlock_bh(&serv->sv_lock); + spin_unlock_bh(&pool->sp_lock); } /* - * Dequeue the first socket. Must be called with the serv->sv_lock held. + * Dequeue the first socket. Must be called with the pool->sp_lock held. */ static inline struct svc_sock * -svc_sock_dequeue(struct svc_serv *serv) +svc_sock_dequeue(struct svc_pool *pool) { struct svc_sock *svsk; - if (list_empty(&serv->sv_sockets)) + if (list_empty(&pool->sp_sockets)) return NULL; - svsk = list_entry(serv->sv_sockets.next, + svsk = list_entry(pool->sp_sockets.next, struct svc_sock, sk_ready); list_del_init(&svsk->sk_ready); @@ -250,6 +259,7 @@ svc_sock_dequeue(struct svc_serv *serv) static inline void svc_sock_received(struct svc_sock *svsk) { + svsk->sk_pool = NULL; clear_bit(SK_BUSY, &svsk->sk_flags); svc_sock_enqueue(svsk); } @@ -322,25 +332,33 @@ svc_sock_release(struct svc_rqst *rqstp) /* * External function to wake up a server waiting for data + * This really only makes sense for services like lockd + * which have exactly one thread anyway. */ void svc_wake_up(struct svc_serv *serv) { struct svc_rqst *rqstp; - - spin_lock_bh(&serv->sv_lock); - if (!list_empty(&serv->sv_threads)) { - rqstp = list_entry(serv->sv_threads.next, - struct svc_rqst, - rq_list); - dprintk("svc: daemon %p woken up.\n", rqstp); - /* - svc_serv_dequeue(serv, rqstp); - rqstp->rq_sock = NULL; - */ - wake_up(&rqstp->rq_wait); + unsigned int i; + struct svc_pool *pool; + + for (i = 0; i < serv->sv_nrpools; i++) { + pool = &serv->sv_pools[i]; + + spin_lock_bh(&pool->sp_lock); + if (!list_empty(&pool->sp_threads)) { + rqstp = list_entry(pool->sp_threads.next, + struct svc_rqst, + rq_list); + dprintk("svc: daemon %p woken up.\n", rqstp); + /* + svc_thread_dequeue(pool, rqstp); + rqstp->rq_sock = NULL; + */ + wake_up(&rqstp->rq_wait); + } + spin_unlock_bh(&pool->sp_lock); } - spin_unlock_bh(&serv->sv_lock); } /* @@ -603,7 +621,10 @@ svc_udp_recvfrom(struct svc_rqst *rqstp) /* udp sockets need large rcvbuf as all pending * requests are still in that buffer. sndbuf must * also be large enough that there is enough space - * for one reply per thread. + * for one reply per thread. We count all threads + * rather than threads in a particular pool, which + * provides an upper bound on the number of threads + * which will access the socket. */ svc_sock_setbufsize(svsk->sk_sock, (serv->sv_nrthreads+3) * serv->sv_bufsz, @@ -948,6 +969,11 @@ svc_tcp_recvfrom(struct svc_rqst *rqstp) /* sndbuf needs to have room for one request * per thread, otherwise we can stall even when the * network isn't a bottleneck. + * + * We count all threads rather than threads in a + * particular pool, which provides an upper bound + * on the number of threads which will access the socket. + * * rcvbuf just needs to be able to hold a few requests. * Normally they will be removed from the queue * as soon a a complete request arrives. @@ -1163,13 +1189,16 @@ svc_sock_update_bufs(struct svc_serv *serv) } /* - * Receive the next request on any socket. + * Receive the next request on any socket. This code is carefully + * organised not to touch any cachelines in the shared svc_serv + * structure, only cachelines in the local svc_pool. */ int svc_recv(struct svc_rqst *rqstp, long timeout) { struct svc_sock *svsk =NULL; struct svc_serv *serv = rqstp->rq_server; + struct svc_pool *pool = rqstp->rq_pool; int len; int pages; struct xdr_buf *arg; @@ -1219,15 +1248,15 @@ svc_recv(struct svc_rqst *rqstp, long timeout) if (signalled()) return -EINTR; - spin_lock_bh(&serv->sv_lock); - if ((svsk = svc_sock_dequeue(serv)) != NULL) { + spin_lock_bh(&pool->sp_lock); + if ((svsk = svc_sock_dequeue(pool)) != NULL) { rqstp->rq_sock = svsk; atomic_inc(&svsk->sk_inuse); rqstp->rq_reserved = serv->sv_bufsz; atomic_add(rqstp->rq_reserved, &svsk->sk_reserved); } else { /* No data pending. Go to sleep */ - svc_serv_enqueue(serv, rqstp); + svc_thread_enqueue(pool, rqstp); /* * We have to be able to interrupt this wait @@ -1235,26 +1264,26 @@ svc_recv(struct svc_rqst *rqstp, long timeout) */ set_current_state(TASK_INTERRUPTIBLE); add_wait_queue(&rqstp->rq_wait, &wait); - spin_unlock_bh(&serv->sv_lock); + spin_unlock_bh(&pool->sp_lock); schedule_timeout(timeout); try_to_freeze(); - spin_lock_bh(&serv->sv_lock); + spin_lock_bh(&pool->sp_lock); remove_wait_queue(&rqstp->rq_wait, &wait); if (!(svsk = rqstp->rq_sock)) { - svc_serv_dequeue(serv, rqstp); - spin_unlock_bh(&serv->sv_lock); + svc_thread_dequeue(pool, rqstp); + spin_unlock_bh(&pool->sp_lock); dprintk("svc: server %p, no data yet\n", rqstp); return signalled()? -EINTR : -EAGAIN; } } - spin_unlock_bh(&serv->sv_lock); + spin_unlock_bh(&pool->sp_lock); - dprintk("svc: server %p, socket %p, inuse=%d\n", - rqstp, svsk, atomic_read(&svsk->sk_inuse)); + dprintk("svc: server %p, pool %u, socket %p, inuse=%d\n", + rqstp, pool->sp_id, svsk, atomic_read(&svsk->sk_inuse)); len = svsk->sk_recvfrom(rqstp); dprintk("svc: got len=%d\n", len); @@ -1553,7 +1582,13 @@ svc_delete_socket(struct svc_sock *svsk) if (!test_and_set_bit(SK_DETACHED, &svsk->sk_flags)) list_del_init(&svsk->sk_list); - list_del_init(&svsk->sk_ready); + /* + * We used to delete the svc_sock from whichever list + * it's sk_ready node was on, but we don't actually + * need to. This is because the only time we're called + * while still attached to a queue, the queue itself + * is about to be destroyed (in svc_destroy). + */ if (!test_and_set_bit(SK_DEAD, &svsk->sk_flags)) if (test_bit(SK_TEMP, &svsk->sk_flags)) serv->sv_tmpcnt--; |