SUNRPC: More fixes for backlog congestion
authorTrond Myklebust <trond.myklebust@hammerspace.com>
Tue, 25 May 2021 22:43:38 +0000 (18:43 -0400)
committerGreg Kroah-Hartman <gregkh@linuxfoundation.org>
Thu, 3 Jun 2021 07:00:51 +0000 (09:00 +0200)
commit e86be3a04bc4aeaf12f93af35f08f8d4385bcd98 upstream.

Ensure that we fix the XPRT_CONGESTED starvation issue for RDMA as well
as socket based transports.
Ensure we always initialise the request after waking up from the backlog
list.

Fixes: e877a88d1f06 ("SUNRPC in case of backlog, hand free slots directly to waiting task")
Signed-off-by: Trond Myklebust <trond.myklebust@hammerspace.com>
Signed-off-by: Greg Kroah-Hartman <gregkh@linuxfoundation.org>
include/linux/sunrpc/xprt.h
net/sunrpc/xprt.c
net/sunrpc/xprtrdma/transport.c
net/sunrpc/xprtrdma/verbs.c
net/sunrpc/xprtrdma/xprt_rdma.h

index 3ac5037d1c3da2e86de6a07fb1c498bd50c68187..cad1fa2b6baa2b25bd708c77219c33dc66ffbe0b 100644 (file)
@@ -367,6 +367,8 @@ struct rpc_xprt *   xprt_alloc(struct net *net, size_t size,
                                unsigned int num_prealloc,
                                unsigned int max_req);
 void                   xprt_free(struct rpc_xprt *);
+void                   xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task);
+bool                   xprt_wake_up_backlog(struct rpc_xprt *xprt, struct rpc_rqst *req);
 
 static inline int
 xprt_enable_swap(struct rpc_xprt *xprt)
index efa33fb471b5cdd6d4f0d2da04712d59ef902c57..9a50764be916003652af8c337b40a3aa39eb6153 100644 (file)
@@ -1575,11 +1575,18 @@ xprt_transmit(struct rpc_task *task)
        spin_unlock(&xprt->queue_lock);
 }
 
-static void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task)
+static void xprt_complete_request_init(struct rpc_task *task)
+{
+       if (task->tk_rqstp)
+               xprt_request_init(task);
+}
+
+void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task)
 {
        set_bit(XPRT_CONGESTED, &xprt->state);
-       rpc_sleep_on(&xprt->backlog, task, NULL);
+       rpc_sleep_on(&xprt->backlog, task, xprt_complete_request_init);
 }
+EXPORT_SYMBOL_GPL(xprt_add_backlog);
 
 static bool __xprt_set_rq(struct rpc_task *task, void *data)
 {
@@ -1587,14 +1594,13 @@ static bool __xprt_set_rq(struct rpc_task *task, void *data)
 
        if (task->tk_rqstp == NULL) {
                memset(req, 0, sizeof(*req));   /* mark unused */
-               task->tk_status = -EAGAIN;
                task->tk_rqstp = req;
                return true;
        }
        return false;
 }
 
-static bool xprt_wake_up_backlog(struct rpc_xprt *xprt, struct rpc_rqst *req)
+bool xprt_wake_up_backlog(struct rpc_xprt *xprt, struct rpc_rqst *req)
 {
        if (rpc_wake_up_first(&xprt->backlog, __xprt_set_rq, req) == NULL) {
                clear_bit(XPRT_CONGESTED, &xprt->state);
@@ -1602,6 +1608,7 @@ static bool xprt_wake_up_backlog(struct rpc_xprt *xprt, struct rpc_rqst *req)
        }
        return true;
 }
+EXPORT_SYMBOL_GPL(xprt_wake_up_backlog);
 
 static bool xprt_throttle_congested(struct rpc_xprt *xprt, struct rpc_task *task)
 {
@@ -1611,7 +1618,7 @@ static bool xprt_throttle_congested(struct rpc_xprt *xprt, struct rpc_task *task
                goto out;
        spin_lock(&xprt->reserve_lock);
        if (test_bit(XPRT_CONGESTED, &xprt->state)) {
-               rpc_sleep_on(&xprt->backlog, task, NULL);
+               xprt_add_backlog(xprt, task);
                ret = true;
        }
        spin_unlock(&xprt->reserve_lock);
@@ -1780,10 +1787,6 @@ xprt_request_init(struct rpc_task *task)
        struct rpc_xprt *xprt = task->tk_xprt;
        struct rpc_rqst *req = task->tk_rqstp;
 
-       if (req->rq_task)
-               /* Already initialized */
-               return;
-
        req->rq_task    = task;
        req->rq_xprt    = xprt;
        req->rq_buffer  = NULL;
@@ -1844,10 +1847,8 @@ void xprt_retry_reserve(struct rpc_task *task)
        struct rpc_xprt *xprt = task->tk_xprt;
 
        task->tk_status = 0;
-       if (task->tk_rqstp != NULL) {
-               xprt_request_init(task);
+       if (task->tk_rqstp != NULL)
                return;
-       }
 
        task->tk_status = -EAGAIN;
        xprt_do_reserve(xprt, task);
@@ -1872,24 +1873,21 @@ void xprt_release(struct rpc_task *task)
        }
 
        xprt = req->rq_xprt;
-       if (xprt) {
-               xprt_request_dequeue_xprt(task);
-               spin_lock(&xprt->transport_lock);
-               xprt->ops->release_xprt(xprt, task);
-               if (xprt->ops->release_request)
-                       xprt->ops->release_request(task);
-               xprt_schedule_autodisconnect(xprt);
-               spin_unlock(&xprt->transport_lock);
-               if (req->rq_buffer)
-                       xprt->ops->buf_free(task);
-               xdr_free_bvec(&req->rq_rcv_buf);
-               xdr_free_bvec(&req->rq_snd_buf);
-               if (req->rq_cred != NULL)
-                       put_rpccred(req->rq_cred);
-               if (req->rq_release_snd_buf)
-                       req->rq_release_snd_buf(req);
-       } else
-               xprt = task->tk_xprt;
+       xprt_request_dequeue_xprt(task);
+       spin_lock(&xprt->transport_lock);
+       xprt->ops->release_xprt(xprt, task);
+       if (xprt->ops->release_request)
+               xprt->ops->release_request(task);
+       xprt_schedule_autodisconnect(xprt);
+       spin_unlock(&xprt->transport_lock);
+       if (req->rq_buffer)
+               xprt->ops->buf_free(task);
+       xdr_free_bvec(&req->rq_rcv_buf);
+       xdr_free_bvec(&req->rq_snd_buf);
+       if (req->rq_cred != NULL)
+               put_rpccred(req->rq_cred);
+       if (req->rq_release_snd_buf)
+               req->rq_release_snd_buf(req);
 
        task->tk_rqstp = NULL;
        if (likely(!bc_prealloc(req)))
index f93ff4282bf4f82d6bb5dce77253dec7d42ba25d..c26db0a37996754ad7e371456e97f622ab167bd4 100644 (file)
@@ -520,9 +520,8 @@ xprt_rdma_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task)
        return;
 
 out_sleep:
-       set_bit(XPRT_CONGESTED, &xprt->state);
-       rpc_sleep_on(&xprt->backlog, task, NULL);
        task->tk_status = -EAGAIN;
+       xprt_add_backlog(xprt, task);
 }
 
 /**
@@ -537,10 +536,11 @@ xprt_rdma_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *rqst)
        struct rpcrdma_xprt *r_xprt =
                container_of(xprt, struct rpcrdma_xprt, rx_xprt);
 
-       memset(rqst, 0, sizeof(*rqst));
-       rpcrdma_buffer_put(&r_xprt->rx_buf, rpcr_to_rdmar(rqst));
-       if (unlikely(!rpc_wake_up_next(&xprt->backlog)))
-               clear_bit(XPRT_CONGESTED, &xprt->state);
+       rpcrdma_reply_put(&r_xprt->rx_buf, rpcr_to_rdmar(rqst));
+       if (!xprt_wake_up_backlog(xprt, rqst)) {
+               memset(rqst, 0, sizeof(*rqst));
+               rpcrdma_buffer_put(&r_xprt->rx_buf, rpcr_to_rdmar(rqst));
+       }
 }
 
 static bool rpcrdma_check_regbuf(struct rpcrdma_xprt *r_xprt,
index 04325f0267c1c8ef0aa3bb78764f2273dd7a9305..25554260a5931541288c2abb72184fa44e847462 100644 (file)
@@ -1197,6 +1197,20 @@ void rpcrdma_mr_put(struct rpcrdma_mr *mr)
        rpcrdma_mr_push(mr, &mr->mr_req->rl_free_mrs);
 }
 
+/**
+ * rpcrdma_reply_put - Put reply buffers back into pool
+ * @buffers: buffer pool
+ * @req: object to return
+ *
+ */
+void rpcrdma_reply_put(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req)
+{
+       if (req->rl_reply) {
+               rpcrdma_rep_put(buffers, req->rl_reply);
+               req->rl_reply = NULL;
+       }
+}
+
 /**
  * rpcrdma_buffer_get - Get a request buffer
  * @buffers: Buffer pool from which to obtain a buffer
@@ -1225,9 +1239,7 @@ rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
  */
 void rpcrdma_buffer_put(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req)
 {
-       if (req->rl_reply)
-               rpcrdma_rep_put(buffers, req->rl_reply);
-       req->rl_reply = NULL;
+       rpcrdma_reply_put(buffers, req);
 
        spin_lock(&buffers->rb_lock);
        list_add(&req->rl_list, &buffers->rb_send_bufs);
index 3cacc6f4c5271fd0a06edffaa765f23a932d934f..702f0344523cc933881d06440a8278dd45c67898 100644 (file)
@@ -472,6 +472,7 @@ void rpcrdma_mrs_refresh(struct rpcrdma_xprt *r_xprt);
 struct rpcrdma_req *rpcrdma_buffer_get(struct rpcrdma_buffer *);
 void rpcrdma_buffer_put(struct rpcrdma_buffer *buffers,
                        struct rpcrdma_req *req);
+void rpcrdma_reply_put(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req);
 void rpcrdma_recv_buffer_put(struct rpcrdma_rep *);
 
 bool rpcrdma_regbuf_realloc(struct rpcrdma_regbuf *rb, size_t size,