1 | // SPDX-License-Identifier: GPL-2.0 |
2 | /* |
3 | * Copyright (c) 2015-2018 Oracle. All rights reserved. |
4 | * |
5 | * Support for reverse-direction RPCs on RPC/RDMA (server-side). |
6 | */ |
7 | |
8 | #include <linux/sunrpc/svc_rdma.h> |
9 | |
10 | #include "xprt_rdma.h" |
11 | #include <trace/events/rpcrdma.h> |
12 | |
13 | /** |
14 | * svc_rdma_handle_bc_reply - Process incoming backchannel Reply |
15 | * @rqstp: resources for handling the Reply |
16 | * @rctxt: Received message |
17 | * |
18 | */ |
19 | void svc_rdma_handle_bc_reply(struct svc_rqst *rqstp, |
20 | struct svc_rdma_recv_ctxt *rctxt) |
21 | { |
22 | struct svc_xprt *sxprt = rqstp->rq_xprt; |
23 | struct rpc_xprt *xprt = sxprt->xpt_bc_xprt; |
24 | struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); |
25 | struct xdr_buf *rcvbuf = &rqstp->rq_arg; |
26 | struct kvec *dst, *src = &rcvbuf->head[0]; |
27 | __be32 *rdma_resp = rctxt->rc_recv_buf; |
28 | struct rpc_rqst *req; |
29 | u32 credits; |
30 | |
31 | spin_lock(lock: &xprt->queue_lock); |
32 | req = xprt_lookup_rqst(xprt, xid: *rdma_resp); |
33 | if (!req) |
34 | goto out_unlock; |
35 | |
36 | dst = &req->rq_private_buf.head[0]; |
37 | memcpy(&req->rq_private_buf, &req->rq_rcv_buf, sizeof(struct xdr_buf)); |
38 | if (dst->iov_len < src->iov_len) |
39 | goto out_unlock; |
40 | memcpy(dst->iov_base, src->iov_base, src->iov_len); |
41 | xprt_pin_rqst(req); |
42 | spin_unlock(lock: &xprt->queue_lock); |
43 | |
44 | credits = be32_to_cpup(p: rdma_resp + 2); |
45 | if (credits == 0) |
46 | credits = 1; /* don't deadlock */ |
47 | else if (credits > r_xprt->rx_buf.rb_bc_max_requests) |
48 | credits = r_xprt->rx_buf.rb_bc_max_requests; |
49 | spin_lock(lock: &xprt->transport_lock); |
50 | xprt->cwnd = credits << RPC_CWNDSHIFT; |
51 | spin_unlock(lock: &xprt->transport_lock); |
52 | |
53 | spin_lock(lock: &xprt->queue_lock); |
54 | xprt_complete_rqst(task: req->rq_task, copied: rcvbuf->len); |
55 | xprt_unpin_rqst(req); |
56 | rcvbuf->len = 0; |
57 | |
58 | out_unlock: |
59 | spin_unlock(lock: &xprt->queue_lock); |
60 | } |
61 | |
62 | /* Send a reverse-direction RPC Call. |
63 | * |
64 | * Caller holds the connection's mutex and has already marshaled |
65 | * the RPC/RDMA request. |
66 | * |
67 | * This is similar to svc_rdma_send_reply_msg, but takes a struct |
68 | * rpc_rqst instead, does not support chunks, and avoids blocking |
69 | * memory allocation. |
70 | * |
71 | * XXX: There is still an opportunity to block in svc_rdma_send() |
72 | * if there are no SQ entries to post the Send. This may occur if |
73 | * the adapter has a small maximum SQ depth. |
74 | */ |
75 | static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma, |
76 | struct rpc_rqst *rqst, |
77 | struct svc_rdma_send_ctxt *sctxt) |
78 | { |
79 | struct svc_rdma_pcl empty_pcl; |
80 | int ret; |
81 | |
82 | pcl_init(pcl: &empty_pcl); |
83 | ret = svc_rdma_map_reply_msg(rdma, sctxt, write_pcl: &empty_pcl, reply_pcl: &empty_pcl, |
84 | xdr: &rqst->rq_snd_buf); |
85 | if (ret < 0) |
86 | return -EIO; |
87 | |
88 | /* Bump page refcnt so Send completion doesn't release |
89 | * the rq_buffer before all retransmits are complete. |
90 | */ |
91 | get_page(virt_to_page(rqst->rq_buffer)); |
92 | sctxt->sc_send_wr.opcode = IB_WR_SEND; |
93 | return svc_rdma_post_send(rdma, ctxt: sctxt); |
94 | } |
95 | |
96 | /* Server-side transport endpoint wants a whole page for its send |
97 | * buffer. The client RPC code constructs the RPC header in this |
98 | * buffer before it invokes ->send_request. |
99 | */ |
100 | static int |
101 | xprt_rdma_bc_allocate(struct rpc_task *task) |
102 | { |
103 | struct rpc_rqst *rqst = task->tk_rqstp; |
104 | size_t size = rqst->rq_callsize; |
105 | struct page *page; |
106 | |
107 | if (size > PAGE_SIZE) { |
108 | WARN_ONCE(1, "svcrdma: large bc buffer request (size %zu)\n" , |
109 | size); |
110 | return -EINVAL; |
111 | } |
112 | |
113 | page = alloc_page(GFP_NOIO | __GFP_NOWARN); |
114 | if (!page) |
115 | return -ENOMEM; |
116 | rqst->rq_buffer = page_address(page); |
117 | |
118 | rqst->rq_rbuffer = kmalloc(size: rqst->rq_rcvsize, GFP_NOIO | __GFP_NOWARN); |
119 | if (!rqst->rq_rbuffer) { |
120 | put_page(page); |
121 | return -ENOMEM; |
122 | } |
123 | return 0; |
124 | } |
125 | |
126 | static void |
127 | xprt_rdma_bc_free(struct rpc_task *task) |
128 | { |
129 | struct rpc_rqst *rqst = task->tk_rqstp; |
130 | |
131 | put_page(virt_to_page(rqst->rq_buffer)); |
132 | kfree(objp: rqst->rq_rbuffer); |
133 | } |
134 | |
135 | static int |
136 | rpcrdma_bc_send_request(struct svcxprt_rdma *rdma, struct rpc_rqst *rqst) |
137 | { |
138 | struct rpc_xprt *xprt = rqst->rq_xprt; |
139 | struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); |
140 | struct svc_rdma_send_ctxt *ctxt; |
141 | __be32 *p; |
142 | int rc; |
143 | |
144 | ctxt = svc_rdma_send_ctxt_get(rdma); |
145 | if (!ctxt) |
146 | goto drop_connection; |
147 | |
148 | p = xdr_reserve_space(xdr: &ctxt->sc_stream, RPCRDMA_HDRLEN_MIN); |
149 | if (!p) |
150 | goto put_ctxt; |
151 | *p++ = rqst->rq_xid; |
152 | *p++ = rpcrdma_version; |
153 | *p++ = cpu_to_be32(r_xprt->rx_buf.rb_bc_max_requests); |
154 | *p++ = rdma_msg; |
155 | *p++ = xdr_zero; |
156 | *p++ = xdr_zero; |
157 | *p = xdr_zero; |
158 | |
159 | rqst->rq_xtime = ktime_get(); |
160 | rc = svc_rdma_bc_sendto(rdma, rqst, sctxt: ctxt); |
161 | if (rc) |
162 | goto put_ctxt; |
163 | return 0; |
164 | |
165 | put_ctxt: |
166 | svc_rdma_send_ctxt_put(rdma, ctxt); |
167 | |
168 | drop_connection: |
169 | return -ENOTCONN; |
170 | } |
171 | |
172 | /** |
173 | * xprt_rdma_bc_send_request - Send a reverse-direction Call |
174 | * @rqst: rpc_rqst containing Call message to be sent |
175 | * |
176 | * Return values: |
177 | * %0 if the message was sent successfully |
178 | * %ENOTCONN if the message was not sent |
179 | */ |
180 | static int xprt_rdma_bc_send_request(struct rpc_rqst *rqst) |
181 | { |
182 | struct svc_xprt *sxprt = rqst->rq_xprt->bc_xprt; |
183 | struct svcxprt_rdma *rdma = |
184 | container_of(sxprt, struct svcxprt_rdma, sc_xprt); |
185 | int ret; |
186 | |
187 | if (test_bit(XPT_DEAD, &sxprt->xpt_flags)) |
188 | return -ENOTCONN; |
189 | |
190 | ret = rpcrdma_bc_send_request(rdma, rqst); |
191 | if (ret == -ENOTCONN) |
192 | svc_xprt_close(xprt: sxprt); |
193 | return ret; |
194 | } |
195 | |
196 | static void |
197 | xprt_rdma_bc_close(struct rpc_xprt *xprt) |
198 | { |
199 | xprt_disconnect_done(xprt); |
200 | xprt->cwnd = RPC_CWNDSHIFT; |
201 | } |
202 | |
/* ->destroy method of the backchannel rpc_xprt: calls
 * xprt_rdma_free_addresses() and then frees the rpc_xprt itself.
 */
static void
xprt_rdma_bc_put(struct rpc_xprt *xprt)
{
	xprt_rdma_free_addresses(xprt);

	/* xprt must not be touched after this point */
	xprt_free(xprt);
}
209 | |
210 | static const struct rpc_xprt_ops xprt_rdma_bc_procs = { |
211 | .reserve_xprt = xprt_reserve_xprt_cong, |
212 | .release_xprt = xprt_release_xprt_cong, |
213 | .alloc_slot = xprt_alloc_slot, |
214 | .free_slot = xprt_free_slot, |
215 | .release_request = xprt_release_rqst_cong, |
216 | .buf_alloc = xprt_rdma_bc_allocate, |
217 | .buf_free = xprt_rdma_bc_free, |
218 | .send_request = xprt_rdma_bc_send_request, |
219 | .wait_for_reply_request = xprt_wait_for_reply_request_def, |
220 | .close = xprt_rdma_bc_close, |
221 | .destroy = xprt_rdma_bc_put, |
222 | .print_stats = xprt_rdma_print_stats |
223 | }; |
224 | |
225 | static const struct rpc_timeout xprt_rdma_bc_timeout = { |
226 | .to_initval = 60 * HZ, |
227 | .to_maxval = 60 * HZ, |
228 | }; |
229 | |
230 | /* It shouldn't matter if the number of backchannel session slots |
231 | * doesn't match the number of RPC/RDMA credits. That just means |
232 | * one or the other will have extra slots that aren't used. |
233 | */ |
234 | static struct rpc_xprt * |
235 | xprt_setup_rdma_bc(struct xprt_create *args) |
236 | { |
237 | struct rpc_xprt *xprt; |
238 | struct rpcrdma_xprt *new_xprt; |
239 | |
240 | if (args->addrlen > sizeof(xprt->addr)) |
241 | return ERR_PTR(error: -EBADF); |
242 | |
243 | xprt = xprt_alloc(net: args->net, size: sizeof(*new_xprt), |
244 | num_prealloc: RPCRDMA_MAX_BC_REQUESTS, |
245 | max_req: RPCRDMA_MAX_BC_REQUESTS); |
246 | if (!xprt) |
247 | return ERR_PTR(error: -ENOMEM); |
248 | |
249 | xprt->timeout = &xprt_rdma_bc_timeout; |
250 | xprt_set_bound(xprt); |
251 | xprt_set_connected(xprt); |
252 | xprt->bind_timeout = 0; |
253 | xprt->reestablish_timeout = 0; |
254 | xprt->idle_timeout = 0; |
255 | |
256 | xprt->prot = XPRT_TRANSPORT_BC_RDMA; |
257 | xprt->ops = &xprt_rdma_bc_procs; |
258 | |
259 | memcpy(&xprt->addr, args->dstaddr, args->addrlen); |
260 | xprt->addrlen = args->addrlen; |
261 | xprt_rdma_format_addresses(xprt, sap: (struct sockaddr *)&xprt->addr); |
262 | xprt->resvport = 0; |
263 | |
264 | xprt->max_payload = xprt_rdma_max_inline_read; |
265 | |
266 | new_xprt = rpcx_to_rdmax(xprt); |
267 | new_xprt->rx_buf.rb_bc_max_requests = xprt->max_reqs; |
268 | |
269 | xprt_get(xprt); |
270 | args->bc_xprt->xpt_bc_xprt = xprt; |
271 | xprt->bc_xprt = args->bc_xprt; |
272 | |
273 | /* Final put for backchannel xprt is in __svc_rdma_free */ |
274 | xprt_get(xprt); |
275 | return xprt; |
276 | } |
277 | |
278 | struct xprt_class xprt_rdma_bc = { |
279 | .list = LIST_HEAD_INIT(xprt_rdma_bc.list), |
280 | .name = "rdma backchannel" , |
281 | .owner = THIS_MODULE, |
282 | .ident = XPRT_TRANSPORT_BC_RDMA, |
283 | .setup = xprt_setup_rdma_bc, |
284 | }; |
285 | |