// SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
/*
 * Copyright (c) 2016-2018 Oracle. All rights reserved.
 * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
 * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses. You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *
 * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following
 * disclaimer in the documentation and/or other materials provided
 * with the distribution.
 *
 * Neither the name of the Network Appliance, Inc. nor the names of
 * its contributors may be used to endorse or promote products
 * derived from this software without specific prior written
 * permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * Author: Tom Tucker <tom@opengridcomputing.com>
 */

/* Operation
 *
 * The main entry point is svc_rdma_recvfrom. This is called from
 * svc_recv when the transport indicates there is incoming data to
 * be read. "Data Ready" is signaled when an RDMA Receive completes,
 * or when a set of RDMA Reads complete.
 *
 * An svc_rqst is passed in. This structure contains an array of
 * free pages (rq_pages) that will contain the incoming RPC message.
 *
 * Short messages are moved directly into svc_rqst::rq_arg, and
 * the RPC Call is ready to be processed by the Upper Layer.
 * svc_rdma_recvfrom returns the length of the RPC Call message,
 * completing the reception of the RPC Call.
 *
 * However, when an incoming message has Read chunks,
 * svc_rdma_recvfrom must post RDMA Reads to pull the RPC Call's
 * data payload from the client. svc_rdma_recvfrom sets up the
 * RDMA Reads using pages in svc_rqst::rq_pages, which are
 * transferred to an svc_rdma_recv_ctxt for the duration of the
 * I/O. svc_rdma_recvfrom then returns zero, since the RPC message
 * is not yet ready when this first call returns.
 *
 * When the Read chunk payloads have become available on the
 * server, "Data Ready" is raised again, and svc_recv calls
 * svc_rdma_recvfrom again. This second call may use a different
 * svc_rqst than the first one, thus any information that needs
 * to be preserved across these two calls is kept in an
 * svc_rdma_recv_ctxt.
 *
 * The second call to svc_rdma_recvfrom performs final assembly
 * of the RPC Call message, using the RDMA Read sink pages kept in
 * the svc_rdma_recv_ctxt. The xdr_buf is copied from the
 * svc_rdma_recv_ctxt to the second svc_rqst. The second call returns
 * the length of the completed RPC Call message.
 *
 * Page Management
 *
 * Pages under I/O must be transferred from the first svc_rqst to an
 * svc_rdma_recv_ctxt before the first svc_rdma_recvfrom call returns.
 *
 * The first svc_rqst supplies pages for RDMA Reads. These are moved
 * from rqstp::rq_pages into ctxt::rc_pages. The consumed elements of
 * the rq_pages array are set to NULL and refilled when the first
 * svc_rdma_recvfrom call returns.
 *
 * During the second svc_rdma_recvfrom call, RDMA Read sink pages
 * are transferred from the svc_rdma_recv_ctxt to the second svc_rqst.
 */
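
/* A rough sketch of the two-call sequence used when a Call carries
 * Read chunks (illustrative only; not a literal call chain):
 *
 *	svc_recv(rqstp1)
 *	  svc_rdma_recvfrom(rqstp1)	// posts RDMA Reads, returns 0
 *	... RDMA Read completions raise "Data Ready" ...
 *	svc_recv(rqstp2)
 *	  svc_rdma_recvfrom(rqstp2)	// assembles rq_arg, returns its length
 */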

#include <linux/slab.h>
#include <linux/spinlock.h>
#include <asm/unaligned.h>
#include <rdma/ib_verbs.h>
#include <rdma/rdma_cm.h>

#include <linux/sunrpc/xdr.h>
#include <linux/sunrpc/debug.h>
#include <linux/sunrpc/rpc_rdma.h>
#include <linux/sunrpc/svc_rdma.h>

#include "xprt_rdma.h"
#include <trace/events/rpcrdma.h>

static void svc_rdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc);

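/* Return the first recv_ctxt on @list, or NULL if @list is empty.
 * The entry is not removed from the list.
 */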
static inline struct svc_rdma_recv_ctxt *
svc_rdma_next_recv_ctxt(struct list_head *list)
{
	return list_first_entry_or_null(list, struct svc_rdma_recv_ctxt,
					rc_list);
}

static struct svc_rdma_recv_ctxt *
svc_rdma_recv_ctxt_alloc(struct svcxprt_rdma *rdma)
{
	int node = ibdev_to_node(rdma->sc_cm_id->device);
	struct svc_rdma_recv_ctxt *ctxt;
	dma_addr_t addr;
	void *buffer;

	ctxt = kzalloc_node(sizeof(*ctxt), GFP_KERNEL, node);
	if (!ctxt)
		goto fail0;
	buffer = kmalloc_node(rdma->sc_max_req_size, GFP_KERNEL, node);
	if (!buffer)
		goto fail1;
	addr = ib_dma_map_single(rdma->sc_pd->device, buffer,
				 rdma->sc_max_req_size, DMA_FROM_DEVICE);
	if (ib_dma_mapping_error(rdma->sc_pd->device, addr))
		goto fail2;

	svc_rdma_recv_cid_init(rdma, &ctxt->rc_cid);
	pcl_init(&ctxt->rc_call_pcl);
	pcl_init(&ctxt->rc_read_pcl);
	pcl_init(&ctxt->rc_write_pcl);
	pcl_init(&ctxt->rc_reply_pcl);

	ctxt->rc_recv_wr.next = NULL;
	ctxt->rc_recv_wr.wr_cqe = &ctxt->rc_cqe;
	ctxt->rc_recv_wr.sg_list = &ctxt->rc_recv_sge;
	ctxt->rc_recv_wr.num_sge = 1;
	ctxt->rc_cqe.done = svc_rdma_wc_receive;
	ctxt->rc_recv_sge.addr = addr;
	ctxt->rc_recv_sge.length = rdma->sc_max_req_size;
	ctxt->rc_recv_sge.lkey = rdma->sc_pd->local_dma_lkey;
	ctxt->rc_recv_buf = buffer;
	svc_rdma_cc_init(rdma, &ctxt->rc_cc);
	return ctxt;

fail2:
	kfree(buffer);
fail1:
	kfree(ctxt);
fail0:
	return NULL;
}

static void svc_rdma_recv_ctxt_destroy(struct svcxprt_rdma *rdma,
				       struct svc_rdma_recv_ctxt *ctxt)
{
	ib_dma_unmap_single(rdma->sc_pd->device, ctxt->rc_recv_sge.addr,
			    ctxt->rc_recv_sge.length, DMA_FROM_DEVICE);
	kfree(ctxt->rc_recv_buf);
	kfree(ctxt);
}

/**
 * svc_rdma_recv_ctxts_destroy - Release all recv_ctxt's for an xprt
 * @rdma: svcxprt_rdma being torn down
 *
 */
void svc_rdma_recv_ctxts_destroy(struct svcxprt_rdma *rdma)
{
	struct svc_rdma_recv_ctxt *ctxt;
	struct llist_node *node;

	while ((node = llist_del_first(&rdma->sc_recv_ctxts))) {
		ctxt = llist_entry(node, struct svc_rdma_recv_ctxt, rc_node);
		svc_rdma_recv_ctxt_destroy(rdma, ctxt);
	}
}

/**
 * svc_rdma_recv_ctxt_get - Allocate a recv_ctxt
 * @rdma: controlling svcxprt_rdma
 *
 * Returns a recv_ctxt or (rarely) NULL if none are available.
 */
struct svc_rdma_recv_ctxt *svc_rdma_recv_ctxt_get(struct svcxprt_rdma *rdma)
{
	struct svc_rdma_recv_ctxt *ctxt;
	struct llist_node *node;

	node = llist_del_first(&rdma->sc_recv_ctxts);
	if (!node)
		return NULL;

	ctxt = llist_entry(node, struct svc_rdma_recv_ctxt, rc_node);
	ctxt->rc_page_count = 0;
	return ctxt;
}

/**
 * svc_rdma_recv_ctxt_put - Return recv_ctxt to free list
 * @rdma: controlling svcxprt_rdma
 * @ctxt: object to return to the free list
 *
 */
void svc_rdma_recv_ctxt_put(struct svcxprt_rdma *rdma,
			    struct svc_rdma_recv_ctxt *ctxt)
{
	svc_rdma_cc_release(rdma, &ctxt->rc_cc, DMA_FROM_DEVICE);

	/* @rc_page_count is normally zero here, but error flows
	 * can leave pages in @rc_pages.
	 */
	release_pages(ctxt->rc_pages, ctxt->rc_page_count);

	pcl_free(&ctxt->rc_call_pcl);
	pcl_free(&ctxt->rc_read_pcl);
	pcl_free(&ctxt->rc_write_pcl);
	pcl_free(&ctxt->rc_reply_pcl);

	llist_add(&ctxt->rc_node, &rdma->sc_recv_ctxts);
}

/**
 * svc_rdma_release_ctxt - Release transport-specific per-rqst resources
 * @xprt: the transport which owned the context
 * @vctxt: the context from rqstp->rq_xprt_ctxt or dr->xprt_ctxt
 *
 * Ensure that the recv_ctxt is released whether or not a Reply
 * was sent. For example, the client could close the connection,
 * or svc_process could drop an RPC, before the Reply is sent.
 */
void svc_rdma_release_ctxt(struct svc_xprt *xprt, void *vctxt)
{
	struct svc_rdma_recv_ctxt *ctxt = vctxt;
	struct svcxprt_rdma *rdma =
		container_of(xprt, struct svcxprt_rdma, sc_xprt);

	if (ctxt)
		svc_rdma_recv_ctxt_put(rdma, ctxt);
}

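/* Try to post @wanted Receive WRs to keep the Receive Queue filled.
 * Fewer may be posted if the recv_ctxt free list runs short.
 *
 * Returns false if the transport is closing or if ib_post_recv()
 * fails; callers treat that as a lost connection.
 */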
static bool svc_rdma_refresh_recvs(struct svcxprt_rdma *rdma,
				   unsigned int wanted)
{
	const struct ib_recv_wr *bad_wr = NULL;
	struct svc_rdma_recv_ctxt *ctxt;
	struct ib_recv_wr *recv_chain;
	int ret;

	if (test_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags))
		return false;

	recv_chain = NULL;
	while (wanted--) {
		ctxt = svc_rdma_recv_ctxt_get(rdma);
		if (!ctxt)
			break;

		trace_svcrdma_post_recv(&ctxt->rc_cid);
		ctxt->rc_recv_wr.next = recv_chain;
		recv_chain = &ctxt->rc_recv_wr;
		rdma->sc_pending_recvs++;
	}
	if (!recv_chain)
		return true;

	ret = ib_post_recv(rdma->sc_qp, recv_chain, &bad_wr);
	if (ret)
		goto err_free;
	return true;

err_free:
	trace_svcrdma_rq_post_err(rdma, ret);
	while (bad_wr) {
		ctxt = container_of(bad_wr, struct svc_rdma_recv_ctxt,
				    rc_recv_wr);
		bad_wr = bad_wr->next;
		svc_rdma_recv_ctxt_put(rdma, ctxt);
	}
	/* Since we're destroying the xprt, no need to reset
	 * sc_pending_recvs.
	 */
	return false;
}

/**
 * svc_rdma_post_recvs - Post initial set of Recv WRs
 * @rdma: fresh svcxprt_rdma
 *
 * Return values:
 *   %true: Receive Queue initialization successful
 *   %false: memory allocation or DMA error
 */
bool svc_rdma_post_recvs(struct svcxprt_rdma *rdma)
{
	unsigned int total;

	/* For each credit, allocate enough recv_ctxts for one
	 * posted Receive and one RPC in process.
	 */
	total = (rdma->sc_max_requests * 2) + rdma->sc_recv_batch;
	while (total--) {
		struct svc_rdma_recv_ctxt *ctxt;

		ctxt = svc_rdma_recv_ctxt_alloc(rdma);
		if (!ctxt)
			return false;
		llist_add(&ctxt->rc_node, &rdma->sc_recv_ctxts);
	}

	return svc_rdma_refresh_recvs(rdma, rdma->sc_max_requests);
}

/**
 * svc_rdma_wc_receive - Invoked by RDMA provider for each polled Receive WC
 * @cq: Completion Queue context
 * @wc: Work Completion object
 *
 */
static void svc_rdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
{
	struct svcxprt_rdma *rdma = cq->cq_context;
	struct ib_cqe *cqe = wc->wr_cqe;
	struct svc_rdma_recv_ctxt *ctxt;

	rdma->sc_pending_recvs--;

	/* WARNING: Only wc->wr_cqe and wc->status are reliable */
	ctxt = container_of(cqe, struct svc_rdma_recv_ctxt, rc_cqe);

	if (wc->status != IB_WC_SUCCESS)
		goto flushed;
	trace_svcrdma_wc_recv(wc, &ctxt->rc_cid);

	/* If receive posting fails, the connection is about to be
	 * lost anyway. The server will not be able to send a reply
	 * for this RPC, and the client will retransmit this RPC
	 * anyway when it reconnects.
	 *
	 * Therefore we drop the Receive, even if status was SUCCESS,
	 * to reduce the likelihood of replayed requests once the
	 * client reconnects.
	 */
	if (rdma->sc_pending_recvs < rdma->sc_max_requests)
		if (!svc_rdma_refresh_recvs(rdma, rdma->sc_recv_batch))
			goto dropped;

	/* All wc fields are now known to be valid */
	ctxt->rc_byte_len = wc->byte_len;

	spin_lock(&rdma->sc_rq_dto_lock);
	list_add_tail(&ctxt->rc_list, &rdma->sc_rq_dto_q);
	/* Note the unlock pairs with the smp_rmb in svc_xprt_ready: */
	set_bit(XPT_DATA, &rdma->sc_xprt.xpt_flags);
	spin_unlock(&rdma->sc_rq_dto_lock);
	if (!test_bit(RDMAXPRT_CONN_PENDING, &rdma->sc_flags))
		svc_xprt_enqueue(&rdma->sc_xprt);
	return;

flushed:
	if (wc->status == IB_WC_WR_FLUSH_ERR)
		trace_svcrdma_wc_recv_flush(wc, &ctxt->rc_cid);
	else
		trace_svcrdma_wc_recv_err(wc, &ctxt->rc_cid);
dropped:
	svc_rdma_recv_ctxt_put(rdma, ctxt);
	svc_xprt_deferred_close(&rdma->sc_xprt);
}

/**
 * svc_rdma_flush_recv_queues - Drain pending Receive work
 * @rdma: svcxprt_rdma being shut down
 *
 */
void svc_rdma_flush_recv_queues(struct svcxprt_rdma *rdma)
{
	struct svc_rdma_recv_ctxt *ctxt;

	while ((ctxt = svc_rdma_next_recv_ctxt(&rdma->sc_read_complete_q))) {
		list_del(&ctxt->rc_list);
		svc_rdma_recv_ctxt_put(rdma, ctxt);
	}
	while ((ctxt = svc_rdma_next_recv_ctxt(&rdma->sc_rq_dto_q))) {
		list_del(&ctxt->rc_list);
		svc_rdma_recv_ctxt_put(rdma, ctxt);
	}
}

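/* Map the just-received message into rqstp::rq_arg: head[0] covers
 * exactly the bytes that arrived in the Receive buffer, and the
 * page list and tail stay empty until chunk payloads are assembled.
 */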
static void svc_rdma_build_arg_xdr(struct svc_rqst *rqstp,
				   struct svc_rdma_recv_ctxt *ctxt)
{
	struct xdr_buf *arg = &rqstp->rq_arg;

	arg->head[0].iov_base = ctxt->rc_recv_buf;
	arg->head[0].iov_len = ctxt->rc_byte_len;
	arg->tail[0].iov_base = NULL;
	arg->tail[0].iov_len = 0;
	arg->page_len = 0;
	arg->page_base = 0;
	arg->buflen = ctxt->rc_byte_len;
	arg->len = ctxt->rc_byte_len;
}

/**
 * xdr_count_read_segments - Count number of Read segments in Read list
 * @rctxt: Ingress receive context
 * @p: Start of an un-decoded Read list
 *
 * Before allocating anything, ensure the ingress Read list is safe
 * to use.
 *
 * The segment count is limited to how many segments can fit in the
 * transport header without overflowing the buffer. That's about 40
 * Read segments for a 1KB inline threshold.
 *
 * Return values:
 *   %true: Read list is valid. @rctxt's xdr_stream is updated to point
 *          to the first byte past the Read list. rc_read_pcl and
 *          rc_call_pcl cl_count fields are set to the number of
 *          Read segments in the list.
 *   %false: Read list is corrupt. @rctxt's xdr_stream is left in an
 *           unknown state.
 */
static bool xdr_count_read_segments(struct svc_rdma_recv_ctxt *rctxt, __be32 *p)
{
	rctxt->rc_call_pcl.cl_count = 0;
	rctxt->rc_read_pcl.cl_count = 0;
	while (xdr_item_is_present(p)) {
		u32 position, handle, length;
		u64 offset;

		p = xdr_inline_decode(&rctxt->rc_stream,
				      rpcrdma_readseg_maxsz * sizeof(*p));
		if (!p)
			return false;

		xdr_decode_read_segment(p, &position, &handle,
					&length, &offset);
		if (position) {
			if (position & 3)
				return false;
			++rctxt->rc_read_pcl.cl_count;
		} else {
			++rctxt->rc_call_pcl.cl_count;
		}

		p = xdr_inline_decode(&rctxt->rc_stream, sizeof(*p));
		if (!p)
			return false;
	}
	return true;
}

/* Sanity check the Read list.
 *
 * Sanity checks:
 * - Read list does not overflow Receive buffer.
 * - Chunk size limited by largest NFS data payload.
 *
 * Return values:
 *   %true: Read list is valid. @rctxt's xdr_stream is updated
 *          to point to the first byte past the Read list.
 *   %false: Read list is corrupt. @rctxt's xdr_stream is left
 *           in an unknown state.
 */
static bool xdr_check_read_list(struct svc_rdma_recv_ctxt *rctxt)
{
	__be32 *p;

	p = xdr_inline_decode(&rctxt->rc_stream, sizeof(*p));
	if (!p)
		return false;
	if (!xdr_count_read_segments(rctxt, p))
		return false;
	if (!pcl_alloc_call(rctxt, p))
		return false;
	return pcl_alloc_read(rctxt, p);
}

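/* Decode past one Write chunk's segment array, verifying that the
 * whole array fits within the Receive buffer: xdr_inline_decode()
 * returns NULL if decoding would run past the end of the buffer.
 */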
static bool xdr_check_write_chunk(struct svc_rdma_recv_ctxt *rctxt)
{
	u32 segcount;
	__be32 *p;

	if (xdr_stream_decode_u32(&rctxt->rc_stream, &segcount))
		return false;

	/* A bogus segcount causes this buffer overflow check to fail. */
	p = xdr_inline_decode(&rctxt->rc_stream,
			      segcount * rpcrdma_segment_maxsz * sizeof(*p));
	return p != NULL;
}

/**
 * xdr_count_write_chunks - Count number of Write chunks in Write list
 * @rctxt: Received header and decoding state
 * @p: start of an un-decoded Write list
 *
 * Before allocating anything, ensure the ingress Write list is
 * safe to use.
 *
 * Return values:
 *   %true: Write list is valid. @rctxt's xdr_stream is updated
 *          to point to the first byte past the Write list, and
 *          the number of Write chunks is in rc_write_pcl.cl_count.
 *   %false: Write list is corrupt. @rctxt's xdr_stream is left
 *           in an indeterminate state.
 */
static bool xdr_count_write_chunks(struct svc_rdma_recv_ctxt *rctxt, __be32 *p)
{
	rctxt->rc_write_pcl.cl_count = 0;
	while (xdr_item_is_present(p)) {
		if (!xdr_check_write_chunk(rctxt))
			return false;
		++rctxt->rc_write_pcl.cl_count;
		p = xdr_inline_decode(&rctxt->rc_stream, sizeof(*p));
		if (!p)
			return false;
	}
	return true;
}

/* Sanity check the Write list.
 *
 * Implementation limits:
 * - This implementation currently supports only one Write chunk.
 *
 * Sanity checks:
 * - Write list does not overflow Receive buffer.
 * - Chunk size limited by largest NFS data payload.
 *
 * Return values:
 *   %true: Write list is valid. @rctxt's xdr_stream is updated
 *          to point to the first byte past the Write list.
 *   %false: Write list is corrupt. @rctxt's xdr_stream is left
 *           in an unknown state.
 */
static bool xdr_check_write_list(struct svc_rdma_recv_ctxt *rctxt)
{
	__be32 *p;

	p = xdr_inline_decode(&rctxt->rc_stream, sizeof(*p));
	if (!p)
		return false;
	if (!xdr_count_write_chunks(rctxt, p))
		return false;
	if (!pcl_alloc_write(rctxt, &rctxt->rc_write_pcl, p))
		return false;

	rctxt->rc_cur_result_payload = pcl_first_chunk(&rctxt->rc_write_pcl);
	return true;
}

/* Sanity check the Reply chunk.
 *
 * Sanity checks:
 * - Reply chunk does not overflow Receive buffer.
 * - Chunk size limited by largest NFS data payload.
 *
 * Return values:
 *   %true: Reply chunk is valid. @rctxt's xdr_stream is updated
 *          to point to the first byte past the Reply chunk.
 *   %false: Reply chunk is corrupt. @rctxt's xdr_stream is left
 *           in an unknown state.
 */
static bool xdr_check_reply_chunk(struct svc_rdma_recv_ctxt *rctxt)
{
	__be32 *p;

	p = xdr_inline_decode(&rctxt->rc_stream, sizeof(*p));
	if (!p)
		return false;

	if (!xdr_item_is_present(p))
		return true;
	if (!xdr_check_write_chunk(rctxt))
		return false;

	rctxt->rc_reply_pcl.cl_count = 1;
	return pcl_alloc_write(rctxt, &rctxt->rc_reply_pcl, p);
}

/* RPC-over-RDMA Version One private extension: Remote Invalidation.
 * Responder's choice: requester signals it can handle Send With
 * Invalidate, and responder chooses one R_key to invalidate.
 *
 * If there is exactly one distinct R_key in the received transport
 * header, set rc_inv_rkey to that R_key. Otherwise, set it to zero.
 */
static void svc_rdma_get_inv_rkey(struct svcxprt_rdma *rdma,
				  struct svc_rdma_recv_ctxt *ctxt)
{
	struct svc_rdma_segment *segment;
	struct svc_rdma_chunk *chunk;
	u32 inv_rkey;

	ctxt->rc_inv_rkey = 0;

	if (!rdma->sc_snd_w_inv)
		return;

	inv_rkey = 0;
	pcl_for_each_chunk(chunk, &ctxt->rc_call_pcl) {
		pcl_for_each_segment(segment, chunk) {
			if (inv_rkey == 0)
				inv_rkey = segment->rs_handle;
			else if (inv_rkey != segment->rs_handle)
				return;
		}
	}
	pcl_for_each_chunk(chunk, &ctxt->rc_read_pcl) {
		pcl_for_each_segment(segment, chunk) {
			if (inv_rkey == 0)
				inv_rkey = segment->rs_handle;
			else if (inv_rkey != segment->rs_handle)
				return;
		}
	}
	pcl_for_each_chunk(chunk, &ctxt->rc_write_pcl) {
		pcl_for_each_segment(segment, chunk) {
			if (inv_rkey == 0)
				inv_rkey = segment->rs_handle;
			else if (inv_rkey != segment->rs_handle)
				return;
		}
	}
	pcl_for_each_chunk(chunk, &ctxt->rc_reply_pcl) {
		pcl_for_each_segment(segment, chunk) {
			if (inv_rkey == 0)
				inv_rkey = segment->rs_handle;
			else if (inv_rkey != segment->rs_handle)
				return;
		}
	}
	ctxt->rc_inv_rkey = inv_rkey;
}

/**
 * svc_rdma_xdr_decode_req - Decode the transport header
 * @rq_arg: xdr_buf containing ingress RPC/RDMA message
 * @rctxt: state of decoding
 *
 * On entry, xdr->head[0].iov_base points to first byte of the
 * RPC-over-RDMA transport header.
 *
 * On successful exit, head[0] points to first byte past the
 * RPC-over-RDMA header. For RDMA_MSG, this is the RPC message.
 *
 * The length of the RPC-over-RDMA header is returned.
 *
 * Assumptions:
 * - The transport header is entirely contained in the head iovec.
 */
static int svc_rdma_xdr_decode_req(struct xdr_buf *rq_arg,
				   struct svc_rdma_recv_ctxt *rctxt)
{
	__be32 *p, *rdma_argp;
	unsigned int hdr_len;

	rdma_argp = rq_arg->head[0].iov_base;
	xdr_init_decode(&rctxt->rc_stream, rq_arg, rdma_argp, NULL);

	p = xdr_inline_decode(&rctxt->rc_stream,
			      rpcrdma_fixed_maxsz * sizeof(*p));
	if (unlikely(!p))
		goto out_short;
	p++;
	if (*p != rpcrdma_version)
		goto out_version;
	p += 2;
	rctxt->rc_msgtype = *p;
	switch (rctxt->rc_msgtype) {
	case rdma_msg:
		break;
	case rdma_nomsg:
		break;
	case rdma_done:
		goto out_drop;
	case rdma_error:
		goto out_drop;
	default:
		goto out_proc;
	}

	if (!xdr_check_read_list(rctxt))
		goto out_inval;
	if (!xdr_check_write_list(rctxt))
		goto out_inval;
	if (!xdr_check_reply_chunk(rctxt))
		goto out_inval;

	rq_arg->head[0].iov_base = rctxt->rc_stream.p;
	hdr_len = xdr_stream_pos(&rctxt->rc_stream);
	rq_arg->head[0].iov_len -= hdr_len;
	rq_arg->len -= hdr_len;
	trace_svcrdma_decode_rqst(rctxt, rdma_argp, hdr_len);
	return hdr_len;

out_short:
	trace_svcrdma_decode_short_err(rctxt, rq_arg->len);
	return -EINVAL;

out_version:
	trace_svcrdma_decode_badvers_err(rctxt, rdma_argp);
	return -EPROTONOSUPPORT;

out_drop:
	trace_svcrdma_decode_drop_err(rctxt, rdma_argp);
	return 0;

out_proc:
	trace_svcrdma_decode_badproc_err(rctxt, rdma_argp);
	return -EINVAL;

out_inval:
	trace_svcrdma_decode_parse_err(rctxt, rdma_argp);
	return -EINVAL;
}

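/* Send an RDMA_ERROR message describing @status to the client. If
 * no send_ctxt is available, the error message is silently dropped.
 */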
static void svc_rdma_send_error(struct svcxprt_rdma *rdma,
				struct svc_rdma_recv_ctxt *rctxt,
				int status)
{
	struct svc_rdma_send_ctxt *sctxt;

	sctxt = svc_rdma_send_ctxt_get(rdma);
	if (!sctxt)
		return;
	svc_rdma_send_error_msg(rdma, sctxt, rctxt, status);
}

/* By convention, backchannel calls arrive via rdma_msg type
 * messages, and never populate the chunk lists. This makes
 * the RPC/RDMA header small and fixed in size, so it is
 * straightforward to check the RPC header's direction field.
 */
static bool svc_rdma_is_reverse_direction_reply(struct svc_xprt *xprt,
						struct svc_rdma_recv_ctxt *rctxt)
{
	__be32 *p = rctxt->rc_recv_buf;

	if (!xprt->xpt_bc_xprt)
		return false;

	if (rctxt->rc_msgtype != rdma_msg)
		return false;

	if (!pcl_is_empty(&rctxt->rc_call_pcl))
		return false;
	if (!pcl_is_empty(&rctxt->rc_read_pcl))
		return false;
	if (!pcl_is_empty(&rctxt->rc_write_pcl))
		return false;
	if (!pcl_is_empty(&rctxt->rc_reply_pcl))
		return false;

	/* RPC call direction */
	if (*(p + 8) == cpu_to_be32(RPC_CALL))
		return false;

	return true;
}

/* Finish constructing the RPC Call message in rqstp::rq_arg.
 *
 * The incoming RPC/RDMA message is an RDMA_MSG type message
 * with a single Read chunk (only the upper layer data payload
 * was conveyed via RDMA Read).
 */
static void svc_rdma_read_complete_one(struct svc_rqst *rqstp,
				       struct svc_rdma_recv_ctxt *ctxt)
{
	struct svc_rdma_chunk *chunk = pcl_first_chunk(&ctxt->rc_read_pcl);
	struct xdr_buf *buf = &rqstp->rq_arg;
	unsigned int length;

	/* Split the Receive buffer between the head and tail
	 * buffers at the Read chunk's position. XDR roundup of the
	 * chunk is not included in either the pagelist or in
	 * the tail.
	 */
	buf->tail[0].iov_base = buf->head[0].iov_base + chunk->ch_position;
	buf->tail[0].iov_len = buf->head[0].iov_len - chunk->ch_position;
	buf->head[0].iov_len = chunk->ch_position;

	/* Read chunk may need XDR roundup (see RFC 8166, s. 3.4.5.2).
	 *
	 * If the client already rounded up the chunk length, the
	 * length does not change. Otherwise, the length of the page
	 * list is increased to include XDR round-up.
	 *
	 * Currently these chunks always start at page offset 0,
	 * thus the rounded-up length never crosses a page boundary.
	 */
	buf->pages = &rqstp->rq_pages[0];
	length = xdr_align_size(chunk->ch_length);
	buf->page_len = length;
	buf->len += length;
	buf->buflen += length;
}

/* Finish constructing the RPC Call message in rqstp::rq_arg.
 *
 * The incoming RPC/RDMA message is an RDMA_MSG type message
 * with payload in multiple Read chunks and no PZRC.
 */
static void svc_rdma_read_complete_multiple(struct svc_rqst *rqstp,
					    struct svc_rdma_recv_ctxt *ctxt)
{
	struct xdr_buf *buf = &rqstp->rq_arg;

	buf->len += ctxt->rc_readbytes;
	buf->buflen += ctxt->rc_readbytes;

	buf->head[0].iov_base = page_address(rqstp->rq_pages[0]);
	buf->head[0].iov_len = min_t(size_t, PAGE_SIZE, ctxt->rc_readbytes);
	buf->pages = &rqstp->rq_pages[1];
	buf->page_len = ctxt->rc_readbytes - buf->head[0].iov_len;
}

/* Finish constructing the RPC Call message in rqstp::rq_arg.
 *
 * The incoming RPC/RDMA message is an RDMA_NOMSG type message
 * (the RPC message body was conveyed via RDMA Read).
 */
static void svc_rdma_read_complete_pzrc(struct svc_rqst *rqstp,
					struct svc_rdma_recv_ctxt *ctxt)
{
	struct xdr_buf *buf = &rqstp->rq_arg;

	buf->len += ctxt->rc_readbytes;
	buf->buflen += ctxt->rc_readbytes;

	buf->head[0].iov_base = page_address(rqstp->rq_pages[0]);
	buf->head[0].iov_len = min_t(size_t, PAGE_SIZE, ctxt->rc_readbytes);
	buf->pages = &rqstp->rq_pages[1];
	buf->page_len = ctxt->rc_readbytes - buf->head[0].iov_len;
}

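/* Second-call completion: move the RDMA Read results saved in @ctxt
 * by the first svc_rdma_recvfrom call into @rqstp, then finish
 * rebuilding the RPC Call message in rqstp::rq_arg.
 */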
static noinline void svc_rdma_read_complete(struct svc_rqst *rqstp,
					    struct svc_rdma_recv_ctxt *ctxt)
{
	unsigned int i;

	/* Transfer the Read chunk pages into @rqstp.rq_pages, replacing
	 * the rq_pages that were already allocated for this rqstp.
	 */
	release_pages(rqstp->rq_respages, ctxt->rc_page_count);
	for (i = 0; i < ctxt->rc_page_count; i++)
		rqstp->rq_pages[i] = ctxt->rc_pages[i];

	/* Update @rqstp's result send buffer to start after the
	 * last page in the RDMA Read payload.
	 */
	rqstp->rq_respages = &rqstp->rq_pages[ctxt->rc_page_count];
	rqstp->rq_next_page = rqstp->rq_respages + 1;

	/* Prevent svc_rdma_recv_ctxt_put() from releasing the
	 * pages in ctxt::rc_pages a second time.
	 */
	ctxt->rc_page_count = 0;

	/* Finish constructing the RPC Call message. The exact
	 * procedure for that depends on what kind of RPC/RDMA
	 * chunks were provided by the client.
	 */
	rqstp->rq_arg = ctxt->rc_saved_arg;
	if (pcl_is_empty(&ctxt->rc_call_pcl)) {
		if (ctxt->rc_read_pcl.cl_count == 1)
			svc_rdma_read_complete_one(rqstp, ctxt);
		else
			svc_rdma_read_complete_multiple(rqstp, ctxt);
	} else {
		svc_rdma_read_complete_pzrc(rqstp, ctxt);
	}

	trace_svcrdma_read_finished(&ctxt->rc_cid);
}

/**
 * svc_rdma_recvfrom - Receive an RPC call
 * @rqstp: request structure into which to receive an RPC Call
 *
 * Returns:
 *	The positive number of bytes in the RPC Call message,
 *	%0 if there were no Calls ready to return,
 *	%-EINVAL if the Read chunk data is too large,
 *	%-ENOMEM if rdma_rw context pool was exhausted,
 *	%-ENOTCONN if posting failed (connection is lost),
 *	%-EIO if rdma_rw initialization failed (DMA mapping, etc).
 *
 * Called in a loop when XPT_DATA is set. XPT_DATA is cleared only
 * when there are no remaining ctxt's to process.
 *
 * The next ctxt is removed from the "receive" lists.
 *
 * - If the ctxt completes a Receive, then construct the Call
 *   message from the contents of the Receive buffer.
 *
 *   - If there are no Read chunks in this message, then finish
 *     assembling the Call message and return the number of bytes
 *     in the message.
 *
 *   - If there are Read chunks in this message, post Read WRs to
 *     pull that payload. When the Read WRs complete, build the
 *     full message and return the number of bytes in it.
 */
int svc_rdma_recvfrom(struct svc_rqst *rqstp)
{
	struct svc_xprt *xprt = rqstp->rq_xprt;
	struct svcxprt_rdma *rdma_xprt =
		container_of(xprt, struct svcxprt_rdma, sc_xprt);
	struct svc_rdma_recv_ctxt *ctxt;
	int ret;

	/* Prevent svc_xprt_release() from releasing pages in rq_pages
	 * when returning 0 or an error.
	 */
	rqstp->rq_respages = rqstp->rq_pages;
	rqstp->rq_next_page = rqstp->rq_respages;

	rqstp->rq_xprt_ctxt = NULL;

	spin_lock(&rdma_xprt->sc_rq_dto_lock);
	ctxt = svc_rdma_next_recv_ctxt(&rdma_xprt->sc_read_complete_q);
	if (ctxt) {
		list_del(&ctxt->rc_list);
		spin_unlock(&rdma_xprt->sc_rq_dto_lock);
		svc_xprt_received(xprt);
		svc_rdma_read_complete(rqstp, ctxt);
		goto complete;
	}
	ctxt = svc_rdma_next_recv_ctxt(&rdma_xprt->sc_rq_dto_q);
	if (ctxt)
		list_del(&ctxt->rc_list);
	else
		/* No new incoming requests, terminate the loop */
		clear_bit(XPT_DATA, &xprt->xpt_flags);
	spin_unlock(&rdma_xprt->sc_rq_dto_lock);

	/* Unblock the transport for the next receive */
	svc_xprt_received(xprt);
	if (!ctxt)
		return 0;

	percpu_counter_inc(&svcrdma_stat_recv);
	ib_dma_sync_single_for_cpu(rdma_xprt->sc_pd->device,
				   ctxt->rc_recv_sge.addr, ctxt->rc_byte_len,
				   DMA_FROM_DEVICE);
	svc_rdma_build_arg_xdr(rqstp, ctxt);

	ret = svc_rdma_xdr_decode_req(&rqstp->rq_arg, ctxt);
	if (ret < 0)
		goto out_err;
	if (ret == 0)
		goto out_drop;

	if (svc_rdma_is_reverse_direction_reply(xprt, ctxt))
		goto out_backchannel;

	svc_rdma_get_inv_rkey(rdma_xprt, ctxt);

	if (!pcl_is_empty(&ctxt->rc_read_pcl) ||
	    !pcl_is_empty(&ctxt->rc_call_pcl))
		goto out_readlist;

complete:
	rqstp->rq_xprt_ctxt = ctxt;
	rqstp->rq_prot = IPPROTO_MAX;
	svc_xprt_copy_addrs(rqstp, xprt);
	set_bit(RQ_SECURE, &rqstp->rq_flags);
	return rqstp->rq_arg.len;

out_err:
	svc_rdma_send_error(rdma_xprt, ctxt, ret);
	svc_rdma_recv_ctxt_put(rdma_xprt, ctxt);
	return 0;

out_readlist:
	/* This @rqstp is about to be recycled. Save the work
	 * already done constructing the Call message in rq_arg
	 * so it can be restored when the RDMA Reads have
	 * completed.
	 */
	ctxt->rc_saved_arg = rqstp->rq_arg;

	ret = svc_rdma_process_read_list(rdma_xprt, rqstp, ctxt);
	if (ret < 0) {
		if (ret == -EINVAL)
			svc_rdma_send_error(rdma_xprt, ctxt, ret);
		svc_rdma_recv_ctxt_put(rdma_xprt, ctxt);
		svc_xprt_deferred_close(xprt);
		return ret;
	}
	return 0;

out_backchannel:
	svc_rdma_handle_bc_reply(rqstp, ctxt);
out_drop:
	svc_rdma_recv_ctxt_put(rdma_xprt, ctxt);
	return 0;
}