1// SPDX-License-Identifier: GPL-2.0
2/*
3 * Copyright (c) 2016-2018 Oracle. All rights reserved.
4 *
5 * Use the core R/W API to move RPC-over-RDMA Read and Write chunks.
6 */
7
8#include <rdma/rw.h>
9
10#include <linux/sunrpc/xdr.h>
11#include <linux/sunrpc/rpc_rdma.h>
12#include <linux/sunrpc/svc_rdma.h>
13
14#include "xprt_rdma.h"
15#include <trace/events/rpcrdma.h>
16
17static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc);
18static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc);
19
20/* Each R/W context contains state for one chain of RDMA Read or
21 * Write Work Requests.
22 *
23 * Each WR chain handles a single contiguous server-side buffer,
24 * because scatterlist entries after the first have to start on
25 * page alignment. xdr_buf iovecs cannot guarantee alignment.
26 *
27 * Each WR chain handles only one R_key. Each RPC-over-RDMA segment
28 * from a client may contain a unique R_key, so each WR chain moves
29 * up to one segment at a time.
30 *
31 * The scatterlist makes this data structure over 4KB in size. To
32 * make it less likely to fail, and to handle the allocation for
33 * smaller I/O requests without disabling bottom-halves, these
34 * contexts are created on demand, but cached and reused until the
35 * controlling svcxprt_rdma is destroyed.
36 */
struct svc_rdma_rw_ctxt {
	/* Linkage on a lockless list of idle contexts (see the put helpers) */
	struct llist_node	rw_node;
	/* Linkage on a chunk context's cc_rwctxts list while I/O is set up */
	struct list_head	rw_list;
	/* Core R/W API state handed to rdma_rw_ctx_init() and friends */
	struct rdma_rw_ctx	rw_ctx;
	/* Number of scatterlist entries in use; passed as the sg count */
	unsigned int		rw_nents;
	/* Number of entries allocated in rw_first_sgl[] */
	unsigned int		rw_first_sgl_nents;
	/* Chained scatterlist table; its first chunk is rw_first_sgl[] */
	struct sg_table		rw_sg_table;
	/* Inline first chunk of the scatterlist, sized at allocation time */
	struct scatterlist	rw_first_sgl[];
};
46
47static inline struct svc_rdma_rw_ctxt *
48svc_rdma_next_ctxt(struct list_head *list)
49{
50 return list_first_entry_or_null(list, struct svc_rdma_rw_ctxt,
51 rw_list);
52}
53
54static struct svc_rdma_rw_ctxt *
55svc_rdma_get_rw_ctxt(struct svcxprt_rdma *rdma, unsigned int sges)
56{
57 struct ib_device *dev = rdma->sc_cm_id->device;
58 unsigned int first_sgl_nents = dev->attrs.max_send_sge;
59 struct svc_rdma_rw_ctxt *ctxt;
60 struct llist_node *node;
61
62 spin_lock(lock: &rdma->sc_rw_ctxt_lock);
63 node = llist_del_first(head: &rdma->sc_rw_ctxts);
64 spin_unlock(lock: &rdma->sc_rw_ctxt_lock);
65 if (node) {
66 ctxt = llist_entry(node, struct svc_rdma_rw_ctxt, rw_node);
67 } else {
68 ctxt = kmalloc_node(struct_size(ctxt, rw_first_sgl, first_sgl_nents),
69 GFP_KERNEL, node: ibdev_to_node(ibdev: dev));
70 if (!ctxt)
71 goto out_noctx;
72
73 INIT_LIST_HEAD(list: &ctxt->rw_list);
74 ctxt->rw_first_sgl_nents = first_sgl_nents;
75 }
76
77 ctxt->rw_sg_table.sgl = ctxt->rw_first_sgl;
78 if (sg_alloc_table_chained(table: &ctxt->rw_sg_table, nents: sges,
79 first_chunk: ctxt->rw_sg_table.sgl,
80 nents_first_chunk: first_sgl_nents))
81 goto out_free;
82 return ctxt;
83
84out_free:
85 kfree(objp: ctxt);
86out_noctx:
87 trace_svcrdma_rwctx_empty(rdma, num_sges: sges);
88 return NULL;
89}
90
91static void __svc_rdma_put_rw_ctxt(struct svc_rdma_rw_ctxt *ctxt,
92 struct llist_head *list)
93{
94 sg_free_table_chained(table: &ctxt->rw_sg_table, nents_first_chunk: ctxt->rw_first_sgl_nents);
95 llist_add(new: &ctxt->rw_node, head: list);
96}
97
98static void svc_rdma_put_rw_ctxt(struct svcxprt_rdma *rdma,
99 struct svc_rdma_rw_ctxt *ctxt)
100{
101 __svc_rdma_put_rw_ctxt(ctxt, list: &rdma->sc_rw_ctxts);
102}
103
104/**
105 * svc_rdma_destroy_rw_ctxts - Free accumulated R/W contexts
106 * @rdma: transport about to be destroyed
107 *
108 */
109void svc_rdma_destroy_rw_ctxts(struct svcxprt_rdma *rdma)
110{
111 struct svc_rdma_rw_ctxt *ctxt;
112 struct llist_node *node;
113
114 while ((node = llist_del_first(head: &rdma->sc_rw_ctxts)) != NULL) {
115 ctxt = llist_entry(node, struct svc_rdma_rw_ctxt, rw_node);
116 kfree(objp: ctxt);
117 }
118}
119
120/**
121 * svc_rdma_rw_ctx_init - Prepare a R/W context for I/O
122 * @rdma: controlling transport instance
123 * @ctxt: R/W context to prepare
124 * @offset: RDMA offset
125 * @handle: RDMA tag/handle
126 * @direction: I/O direction
127 *
128 * Returns on success, the number of WQEs that will be needed
129 * on the workqueue, or a negative errno.
130 */
131static int svc_rdma_rw_ctx_init(struct svcxprt_rdma *rdma,
132 struct svc_rdma_rw_ctxt *ctxt,
133 u64 offset, u32 handle,
134 enum dma_data_direction direction)
135{
136 int ret;
137
138 ret = rdma_rw_ctx_init(ctx: &ctxt->rw_ctx, qp: rdma->sc_qp, port_num: rdma->sc_port_num,
139 sg: ctxt->rw_sg_table.sgl, sg_cnt: ctxt->rw_nents,
140 sg_offset: 0, remote_addr: offset, rkey: handle, dir: direction);
141 if (unlikely(ret < 0)) {
142 trace_svcrdma_dma_map_rw_err(rdma, offset, handle,
143 nents: ctxt->rw_nents, status: ret);
144 svc_rdma_put_rw_ctxt(rdma, ctxt);
145 }
146 return ret;
147}
148
149/**
150 * svc_rdma_cc_init - Initialize an svc_rdma_chunk_ctxt
151 * @rdma: controlling transport instance
152 * @cc: svc_rdma_chunk_ctxt to be initialized
153 */
154void svc_rdma_cc_init(struct svcxprt_rdma *rdma,
155 struct svc_rdma_chunk_ctxt *cc)
156{
157 struct rpc_rdma_cid *cid = &cc->cc_cid;
158
159 if (unlikely(!cid->ci_completion_id))
160 svc_rdma_send_cid_init(rdma, cid);
161
162 INIT_LIST_HEAD(list: &cc->cc_rwctxts);
163 cc->cc_sqecount = 0;
164}
165
166/**
167 * svc_rdma_cc_release - Release resources held by a svc_rdma_chunk_ctxt
168 * @rdma: controlling transport instance
169 * @cc: svc_rdma_chunk_ctxt to be released
170 * @dir: DMA direction
171 */
172void svc_rdma_cc_release(struct svcxprt_rdma *rdma,
173 struct svc_rdma_chunk_ctxt *cc,
174 enum dma_data_direction dir)
175{
176 struct llist_node *first, *last;
177 struct svc_rdma_rw_ctxt *ctxt;
178 LLIST_HEAD(free);
179
180 trace_svcrdma_cc_release(cid: &cc->cc_cid, sqecount: cc->cc_sqecount);
181
182 first = last = NULL;
183 while ((ctxt = svc_rdma_next_ctxt(list: &cc->cc_rwctxts)) != NULL) {
184 list_del(entry: &ctxt->rw_list);
185
186 rdma_rw_ctx_destroy(ctx: &ctxt->rw_ctx, qp: rdma->sc_qp,
187 port_num: rdma->sc_port_num, sg: ctxt->rw_sg_table.sgl,
188 sg_cnt: ctxt->rw_nents, dir);
189 __svc_rdma_put_rw_ctxt(ctxt, list: &free);
190
191 ctxt->rw_node.next = first;
192 first = &ctxt->rw_node;
193 if (!last)
194 last = first;
195 }
196 if (first)
197 llist_add_batch(new_first: first, new_last: last, head: &rdma->sc_rw_ctxts);
198}
199
200static struct svc_rdma_write_info *
201svc_rdma_write_info_alloc(struct svcxprt_rdma *rdma,
202 const struct svc_rdma_chunk *chunk)
203{
204 struct svc_rdma_write_info *info;
205
206 info = kzalloc_node(size: sizeof(*info), GFP_KERNEL,
207 node: ibdev_to_node(ibdev: rdma->sc_cm_id->device));
208 if (!info)
209 return info;
210
211 info->wi_rdma = rdma;
212 info->wi_chunk = chunk;
213 svc_rdma_cc_init(rdma, cc: &info->wi_cc);
214 info->wi_cc.cc_cqe.done = svc_rdma_write_done;
215 return info;
216}
217
218static void svc_rdma_write_info_free_async(struct work_struct *work)
219{
220 struct svc_rdma_write_info *info;
221
222 info = container_of(work, struct svc_rdma_write_info, wi_work);
223 svc_rdma_cc_release(rdma: info->wi_rdma, cc: &info->wi_cc, dir: DMA_TO_DEVICE);
224 kfree(objp: info);
225}
226
227static void svc_rdma_write_info_free(struct svc_rdma_write_info *info)
228{
229 INIT_WORK(&info->wi_work, svc_rdma_write_info_free_async);
230 queue_work(wq: svcrdma_wq, work: &info->wi_work);
231}
232
233/**
234 * svc_rdma_reply_chunk_release - Release Reply chunk I/O resources
235 * @rdma: controlling transport
236 * @ctxt: Send context that is being released
237 */
238void svc_rdma_reply_chunk_release(struct svcxprt_rdma *rdma,
239 struct svc_rdma_send_ctxt *ctxt)
240{
241 struct svc_rdma_chunk_ctxt *cc = &ctxt->sc_reply_info.wi_cc;
242
243 if (!cc->cc_sqecount)
244 return;
245 svc_rdma_cc_release(rdma, cc, dir: DMA_TO_DEVICE);
246}
247
248/**
249 * svc_rdma_reply_done - Reply chunk Write completion handler
250 * @cq: controlling Completion Queue
251 * @wc: Work Completion report
252 *
253 * Pages under I/O are released by a subsequent Send completion.
254 */
255static void svc_rdma_reply_done(struct ib_cq *cq, struct ib_wc *wc)
256{
257 struct ib_cqe *cqe = wc->wr_cqe;
258 struct svc_rdma_chunk_ctxt *cc =
259 container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
260 struct svcxprt_rdma *rdma = cq->cq_context;
261
262 switch (wc->status) {
263 case IB_WC_SUCCESS:
264 trace_svcrdma_wc_reply(cid: &cc->cc_cid);
265 return;
266 case IB_WC_WR_FLUSH_ERR:
267 trace_svcrdma_wc_reply_flush(wc, cid: &cc->cc_cid);
268 break;
269 default:
270 trace_svcrdma_wc_reply_err(wc, cid: &cc->cc_cid);
271 }
272
273 svc_xprt_deferred_close(xprt: &rdma->sc_xprt);
274}
275
276/**
277 * svc_rdma_write_done - Write chunk completion
278 * @cq: controlling Completion Queue
279 * @wc: Work Completion
280 *
281 * Pages under I/O are freed by a subsequent Send completion.
282 */
283static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc)
284{
285 struct svcxprt_rdma *rdma = cq->cq_context;
286 struct ib_cqe *cqe = wc->wr_cqe;
287 struct svc_rdma_chunk_ctxt *cc =
288 container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
289 struct svc_rdma_write_info *info =
290 container_of(cc, struct svc_rdma_write_info, wi_cc);
291
292 switch (wc->status) {
293 case IB_WC_SUCCESS:
294 trace_svcrdma_wc_write(cid: &cc->cc_cid);
295 break;
296 case IB_WC_WR_FLUSH_ERR:
297 trace_svcrdma_wc_write_flush(wc, cid: &cc->cc_cid);
298 break;
299 default:
300 trace_svcrdma_wc_write_err(wc, cid: &cc->cc_cid);
301 }
302
303 svc_rdma_wake_send_waiters(rdma, avail: cc->cc_sqecount);
304
305 if (unlikely(wc->status != IB_WC_SUCCESS))
306 svc_xprt_deferred_close(xprt: &rdma->sc_xprt);
307
308 svc_rdma_write_info_free(info);
309}
310
311/**
312 * svc_rdma_wc_read_done - Handle completion of an RDMA Read ctx
313 * @cq: controlling Completion Queue
314 * @wc: Work Completion
315 *
316 */
317static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc)
318{
319 struct svcxprt_rdma *rdma = cq->cq_context;
320 struct ib_cqe *cqe = wc->wr_cqe;
321 struct svc_rdma_chunk_ctxt *cc =
322 container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
323 struct svc_rdma_recv_ctxt *ctxt;
324
325 svc_rdma_wake_send_waiters(rdma, avail: cc->cc_sqecount);
326
327 ctxt = container_of(cc, struct svc_rdma_recv_ctxt, rc_cc);
328 switch (wc->status) {
329 case IB_WC_SUCCESS:
330 trace_svcrdma_wc_read(wc, cid: &cc->cc_cid, totalbytes: ctxt->rc_readbytes,
331 posttime: cc->cc_posttime);
332
333 spin_lock(lock: &rdma->sc_rq_dto_lock);
334 list_add_tail(new: &ctxt->rc_list, head: &rdma->sc_read_complete_q);
335 /* the unlock pairs with the smp_rmb in svc_xprt_ready */
336 set_bit(nr: XPT_DATA, addr: &rdma->sc_xprt.xpt_flags);
337 spin_unlock(lock: &rdma->sc_rq_dto_lock);
338 svc_xprt_enqueue(xprt: &rdma->sc_xprt);
339 return;
340 case IB_WC_WR_FLUSH_ERR:
341 trace_svcrdma_wc_read_flush(wc, cid: &cc->cc_cid);
342 break;
343 default:
344 trace_svcrdma_wc_read_err(wc, cid: &cc->cc_cid);
345 }
346
347 /* The RDMA Read has flushed, so the incoming RPC message
348 * cannot be constructed and must be dropped. Signal the
349 * loss to the client by closing the connection.
350 */
351 svc_rdma_cc_release(rdma, cc, dir: DMA_FROM_DEVICE);
352 svc_rdma_recv_ctxt_put(rdma, ctxt);
353 svc_xprt_deferred_close(xprt: &rdma->sc_xprt);
354}
355
356/*
357 * Assumptions:
358 * - If ib_post_send() succeeds, only one completion is expected,
359 * even if one or more WRs are flushed. This is true when posting
360 * an rdma_rw_ctx or when posting a single signaled WR.
361 */
362static int svc_rdma_post_chunk_ctxt(struct svcxprt_rdma *rdma,
363 struct svc_rdma_chunk_ctxt *cc)
364{
365 struct ib_send_wr *first_wr;
366 const struct ib_send_wr *bad_wr;
367 struct list_head *tmp;
368 struct ib_cqe *cqe;
369 int ret;
370
371 might_sleep();
372
373 if (cc->cc_sqecount > rdma->sc_sq_depth)
374 return -EINVAL;
375
376 first_wr = NULL;
377 cqe = &cc->cc_cqe;
378 list_for_each(tmp, &cc->cc_rwctxts) {
379 struct svc_rdma_rw_ctxt *ctxt;
380
381 ctxt = list_entry(tmp, struct svc_rdma_rw_ctxt, rw_list);
382 first_wr = rdma_rw_ctx_wrs(ctx: &ctxt->rw_ctx, qp: rdma->sc_qp,
383 port_num: rdma->sc_port_num, cqe, chain_wr: first_wr);
384 cqe = NULL;
385 }
386
387 do {
388 if (atomic_sub_return(i: cc->cc_sqecount,
389 v: &rdma->sc_sq_avail) > 0) {
390 cc->cc_posttime = ktime_get();
391 ret = ib_post_send(qp: rdma->sc_qp, send_wr: first_wr, bad_send_wr: &bad_wr);
392 if (ret)
393 break;
394 return 0;
395 }
396
397 percpu_counter_inc(fbc: &svcrdma_stat_sq_starve);
398 trace_svcrdma_sq_full(rdma, cid: &cc->cc_cid);
399 atomic_add(i: cc->cc_sqecount, v: &rdma->sc_sq_avail);
400 wait_event(rdma->sc_send_wait,
401 atomic_read(&rdma->sc_sq_avail) > cc->cc_sqecount);
402 trace_svcrdma_sq_retry(rdma, cid: &cc->cc_cid);
403 } while (1);
404
405 trace_svcrdma_sq_post_err(rdma, cid: &cc->cc_cid, status: ret);
406 svc_xprt_deferred_close(xprt: &rdma->sc_xprt);
407
408 /* If even one was posted, there will be a completion. */
409 if (bad_wr != first_wr)
410 return 0;
411
412 atomic_add(i: cc->cc_sqecount, v: &rdma->sc_sq_avail);
413 wake_up(&rdma->sc_send_wait);
414 return -ENOTCONN;
415}
416
417/* Build and DMA-map an SGL that covers one kvec in an xdr_buf
418 */
419static void svc_rdma_vec_to_sg(struct svc_rdma_write_info *info,
420 unsigned int len,
421 struct svc_rdma_rw_ctxt *ctxt)
422{
423 struct scatterlist *sg = ctxt->rw_sg_table.sgl;
424
425 sg_set_buf(sg: &sg[0], buf: info->wi_base, buflen: len);
426 info->wi_base += len;
427
428 ctxt->rw_nents = 1;
429}
430
431/* Build and DMA-map an SGL that covers part of an xdr_buf's pagelist.
432 */
433static void svc_rdma_pagelist_to_sg(struct svc_rdma_write_info *info,
434 unsigned int remaining,
435 struct svc_rdma_rw_ctxt *ctxt)
436{
437 unsigned int sge_no, sge_bytes, page_off, page_no;
438 const struct xdr_buf *xdr = info->wi_xdr;
439 struct scatterlist *sg;
440 struct page **page;
441
442 page_off = info->wi_next_off + xdr->page_base;
443 page_no = page_off >> PAGE_SHIFT;
444 page_off = offset_in_page(page_off);
445 page = xdr->pages + page_no;
446 info->wi_next_off += remaining;
447 sg = ctxt->rw_sg_table.sgl;
448 sge_no = 0;
449 do {
450 sge_bytes = min_t(unsigned int, remaining,
451 PAGE_SIZE - page_off);
452 sg_set_page(sg, page: *page, len: sge_bytes, offset: page_off);
453
454 remaining -= sge_bytes;
455 sg = sg_next(sg);
456 page_off = 0;
457 sge_no++;
458 page++;
459 } while (remaining);
460
461 ctxt->rw_nents = sge_no;
462}
463
464/* Construct RDMA Write WRs to send a portion of an xdr_buf containing
465 * an RPC Reply.
466 */
467static int
468svc_rdma_build_writes(struct svc_rdma_write_info *info,
469 void (*constructor)(struct svc_rdma_write_info *info,
470 unsigned int len,
471 struct svc_rdma_rw_ctxt *ctxt),
472 unsigned int remaining)
473{
474 struct svc_rdma_chunk_ctxt *cc = &info->wi_cc;
475 struct svcxprt_rdma *rdma = info->wi_rdma;
476 const struct svc_rdma_segment *seg;
477 struct svc_rdma_rw_ctxt *ctxt;
478 int ret;
479
480 do {
481 unsigned int write_len;
482 u64 offset;
483
484 if (info->wi_seg_no >= info->wi_chunk->ch_segcount)
485 goto out_overflow;
486
487 seg = &info->wi_chunk->ch_segments[info->wi_seg_no];
488 write_len = min(remaining, seg->rs_length - info->wi_seg_off);
489 if (!write_len)
490 goto out_overflow;
491 ctxt = svc_rdma_get_rw_ctxt(rdma,
492 sges: (write_len >> PAGE_SHIFT) + 2);
493 if (!ctxt)
494 return -ENOMEM;
495
496 constructor(info, write_len, ctxt);
497 offset = seg->rs_offset + info->wi_seg_off;
498 ret = svc_rdma_rw_ctx_init(rdma, ctxt, offset, handle: seg->rs_handle,
499 direction: DMA_TO_DEVICE);
500 if (ret < 0)
501 return -EIO;
502 percpu_counter_inc(fbc: &svcrdma_stat_write);
503
504 list_add(new: &ctxt->rw_list, head: &cc->cc_rwctxts);
505 cc->cc_sqecount += ret;
506 if (write_len == seg->rs_length - info->wi_seg_off) {
507 info->wi_seg_no++;
508 info->wi_seg_off = 0;
509 } else {
510 info->wi_seg_off += write_len;
511 }
512 remaining -= write_len;
513 } while (remaining);
514
515 return 0;
516
517out_overflow:
518 trace_svcrdma_small_wrch_err(cid: &cc->cc_cid, remaining, seg_no: info->wi_seg_no,
519 num_segs: info->wi_chunk->ch_segcount);
520 return -E2BIG;
521}
522
523/**
524 * svc_rdma_iov_write - Construct RDMA Writes from an iov
525 * @info: pointer to write arguments
526 * @iov: kvec to write
527 *
528 * Returns:
529 * On success, returns zero
530 * %-E2BIG if the client-provided Write chunk is too small
531 * %-ENOMEM if a resource has been exhausted
532 * %-EIO if an rdma-rw error occurred
533 */
534static int svc_rdma_iov_write(struct svc_rdma_write_info *info,
535 const struct kvec *iov)
536{
537 info->wi_base = iov->iov_base;
538 return svc_rdma_build_writes(info, constructor: svc_rdma_vec_to_sg,
539 remaining: iov->iov_len);
540}
541
542/**
543 * svc_rdma_pages_write - Construct RDMA Writes from pages
544 * @info: pointer to write arguments
545 * @xdr: xdr_buf with pages to write
546 * @offset: offset into the content of @xdr
547 * @length: number of bytes to write
548 *
549 * Returns:
550 * On success, returns zero
551 * %-E2BIG if the client-provided Write chunk is too small
552 * %-ENOMEM if a resource has been exhausted
553 * %-EIO if an rdma-rw error occurred
554 */
555static int svc_rdma_pages_write(struct svc_rdma_write_info *info,
556 const struct xdr_buf *xdr,
557 unsigned int offset,
558 unsigned long length)
559{
560 info->wi_xdr = xdr;
561 info->wi_next_off = offset - xdr->head[0].iov_len;
562 return svc_rdma_build_writes(info, constructor: svc_rdma_pagelist_to_sg,
563 remaining: length);
564}
565
566/**
567 * svc_rdma_xb_write - Construct RDMA Writes to write an xdr_buf
568 * @xdr: xdr_buf to write
569 * @data: pointer to write arguments
570 *
571 * Returns:
572 * On success, returns zero
573 * %-E2BIG if the client-provided Write chunk is too small
574 * %-ENOMEM if a resource has been exhausted
575 * %-EIO if an rdma-rw error occurred
576 */
577static int svc_rdma_xb_write(const struct xdr_buf *xdr, void *data)
578{
579 struct svc_rdma_write_info *info = data;
580 int ret;
581
582 if (xdr->head[0].iov_len) {
583 ret = svc_rdma_iov_write(info, iov: &xdr->head[0]);
584 if (ret < 0)
585 return ret;
586 }
587
588 if (xdr->page_len) {
589 ret = svc_rdma_pages_write(info, xdr, offset: xdr->head[0].iov_len,
590 length: xdr->page_len);
591 if (ret < 0)
592 return ret;
593 }
594
595 if (xdr->tail[0].iov_len) {
596 ret = svc_rdma_iov_write(info, iov: &xdr->tail[0]);
597 if (ret < 0)
598 return ret;
599 }
600
601 return xdr->len;
602}
603
604static int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma,
605 const struct svc_rdma_chunk *chunk,
606 const struct xdr_buf *xdr)
607{
608 struct svc_rdma_write_info *info;
609 struct svc_rdma_chunk_ctxt *cc;
610 struct xdr_buf payload;
611 int ret;
612
613 if (xdr_buf_subsegment(xdr, &payload, chunk->ch_position,
614 chunk->ch_payload_length))
615 return -EMSGSIZE;
616
617 info = svc_rdma_write_info_alloc(rdma, chunk);
618 if (!info)
619 return -ENOMEM;
620 cc = &info->wi_cc;
621
622 ret = svc_rdma_xb_write(xdr: &payload, data: info);
623 if (ret != payload.len)
624 goto out_err;
625
626 trace_svcrdma_post_write_chunk(cid: &cc->cc_cid, sqecount: cc->cc_sqecount);
627 ret = svc_rdma_post_chunk_ctxt(rdma, cc);
628 if (ret < 0)
629 goto out_err;
630 return 0;
631
632out_err:
633 svc_rdma_write_info_free(info);
634 return ret;
635}
636
637/**
638 * svc_rdma_send_write_list - Send all chunks on the Write list
639 * @rdma: controlling RDMA transport
640 * @rctxt: Write list provisioned by the client
641 * @xdr: xdr_buf containing an RPC Reply message
642 *
643 * Returns zero on success, or a negative errno if one or more
644 * Write chunks could not be sent.
645 */
646int svc_rdma_send_write_list(struct svcxprt_rdma *rdma,
647 const struct svc_rdma_recv_ctxt *rctxt,
648 const struct xdr_buf *xdr)
649{
650 struct svc_rdma_chunk *chunk;
651 int ret;
652
653 pcl_for_each_chunk(chunk, &rctxt->rc_write_pcl) {
654 if (!chunk->ch_payload_length)
655 break;
656 ret = svc_rdma_send_write_chunk(rdma, chunk, xdr);
657 if (ret < 0)
658 return ret;
659 }
660 return 0;
661}
662
663/**
664 * svc_rdma_prepare_reply_chunk - Construct WR chain for writing the Reply chunk
665 * @rdma: controlling RDMA transport
666 * @write_pcl: Write chunk list provided by client
667 * @reply_pcl: Reply chunk provided by client
668 * @sctxt: Send WR resources
669 * @xdr: xdr_buf containing an RPC Reply
670 *
671 * Returns a non-negative number of bytes the chunk consumed, or
672 * %-E2BIG if the payload was larger than the Reply chunk,
673 * %-EINVAL if client provided too many segments,
674 * %-ENOMEM if rdma_rw context pool was exhausted,
675 * %-ENOTCONN if posting failed (connection is lost),
676 * %-EIO if rdma_rw initialization failed (DMA mapping, etc).
677 */
678int svc_rdma_prepare_reply_chunk(struct svcxprt_rdma *rdma,
679 const struct svc_rdma_pcl *write_pcl,
680 const struct svc_rdma_pcl *reply_pcl,
681 struct svc_rdma_send_ctxt *sctxt,
682 const struct xdr_buf *xdr)
683{
684 struct svc_rdma_write_info *info = &sctxt->sc_reply_info;
685 struct svc_rdma_chunk_ctxt *cc = &info->wi_cc;
686 struct ib_send_wr *first_wr;
687 struct list_head *pos;
688 struct ib_cqe *cqe;
689 int ret;
690
691 info->wi_rdma = rdma;
692 info->wi_chunk = pcl_first_chunk(pcl: reply_pcl);
693 info->wi_seg_off = 0;
694 info->wi_seg_no = 0;
695 info->wi_cc.cc_cqe.done = svc_rdma_reply_done;
696
697 ret = pcl_process_nonpayloads(pcl: write_pcl, xdr,
698 actor: svc_rdma_xb_write, data: info);
699 if (ret < 0)
700 return ret;
701
702 first_wr = sctxt->sc_wr_chain;
703 cqe = &cc->cc_cqe;
704 list_for_each(pos, &cc->cc_rwctxts) {
705 struct svc_rdma_rw_ctxt *rwc;
706
707 rwc = list_entry(pos, struct svc_rdma_rw_ctxt, rw_list);
708 first_wr = rdma_rw_ctx_wrs(ctx: &rwc->rw_ctx, qp: rdma->sc_qp,
709 port_num: rdma->sc_port_num, cqe, chain_wr: first_wr);
710 cqe = NULL;
711 }
712 sctxt->sc_wr_chain = first_wr;
713 sctxt->sc_sqecount += cc->cc_sqecount;
714
715 trace_svcrdma_post_reply_chunk(cid: &cc->cc_cid, sqecount: cc->cc_sqecount);
716 return xdr->len;
717}
718
719/**
720 * svc_rdma_build_read_segment - Build RDMA Read WQEs to pull one RDMA segment
721 * @rqstp: RPC transaction context
722 * @head: context for ongoing I/O
723 * @segment: co-ordinates of remote memory to be read
724 *
725 * Returns:
726 * %0: the Read WR chain was constructed successfully
727 * %-EINVAL: there were not enough rq_pages to finish
728 * %-ENOMEM: allocating a local resources failed
729 * %-EIO: a DMA mapping error occurred
730 */
731static int svc_rdma_build_read_segment(struct svc_rqst *rqstp,
732 struct svc_rdma_recv_ctxt *head,
733 const struct svc_rdma_segment *segment)
734{
735 struct svcxprt_rdma *rdma = svc_rdma_rqst_rdma(rqstp);
736 struct svc_rdma_chunk_ctxt *cc = &head->rc_cc;
737 unsigned int sge_no, seg_len, len;
738 struct svc_rdma_rw_ctxt *ctxt;
739 struct scatterlist *sg;
740 int ret;
741
742 len = segment->rs_length;
743 sge_no = PAGE_ALIGN(head->rc_pageoff + len) >> PAGE_SHIFT;
744 ctxt = svc_rdma_get_rw_ctxt(rdma, sges: sge_no);
745 if (!ctxt)
746 return -ENOMEM;
747 ctxt->rw_nents = sge_no;
748
749 sg = ctxt->rw_sg_table.sgl;
750 for (sge_no = 0; sge_no < ctxt->rw_nents; sge_no++) {
751 seg_len = min_t(unsigned int, len,
752 PAGE_SIZE - head->rc_pageoff);
753
754 if (!head->rc_pageoff)
755 head->rc_page_count++;
756
757 sg_set_page(sg, page: rqstp->rq_pages[head->rc_curpage],
758 len: seg_len, offset: head->rc_pageoff);
759 sg = sg_next(sg);
760
761 head->rc_pageoff += seg_len;
762 if (head->rc_pageoff == PAGE_SIZE) {
763 head->rc_curpage++;
764 head->rc_pageoff = 0;
765 }
766 len -= seg_len;
767
768 if (len && ((head->rc_curpage + 1) > ARRAY_SIZE(rqstp->rq_pages)))
769 goto out_overrun;
770 }
771
772 ret = svc_rdma_rw_ctx_init(rdma, ctxt, offset: segment->rs_offset,
773 handle: segment->rs_handle, direction: DMA_FROM_DEVICE);
774 if (ret < 0)
775 return -EIO;
776 percpu_counter_inc(fbc: &svcrdma_stat_read);
777
778 list_add(new: &ctxt->rw_list, head: &cc->cc_rwctxts);
779 cc->cc_sqecount += ret;
780 return 0;
781
782out_overrun:
783 trace_svcrdma_page_overrun_err(cid: &cc->cc_cid, pageno: head->rc_curpage);
784 return -EINVAL;
785}
786
787/**
788 * svc_rdma_build_read_chunk - Build RDMA Read WQEs to pull one RDMA chunk
789 * @rqstp: RPC transaction context
790 * @head: context for ongoing I/O
791 * @chunk: Read chunk to pull
792 *
793 * Return values:
794 * %0: the Read WR chain was constructed successfully
795 * %-EINVAL: there were not enough resources to finish
796 * %-ENOMEM: allocating a local resources failed
797 * %-EIO: a DMA mapping error occurred
798 */
799static int svc_rdma_build_read_chunk(struct svc_rqst *rqstp,
800 struct svc_rdma_recv_ctxt *head,
801 const struct svc_rdma_chunk *chunk)
802{
803 const struct svc_rdma_segment *segment;
804 int ret;
805
806 ret = -EINVAL;
807 pcl_for_each_segment(segment, chunk) {
808 ret = svc_rdma_build_read_segment(rqstp, head, segment);
809 if (ret < 0)
810 break;
811 head->rc_readbytes += segment->rs_length;
812 }
813 return ret;
814}
815
816/**
817 * svc_rdma_copy_inline_range - Copy part of the inline content into pages
818 * @rqstp: RPC transaction context
819 * @head: context for ongoing I/O
820 * @offset: offset into the Receive buffer of region to copy
821 * @remaining: length of region to copy
822 *
823 * Take a page at a time from rqstp->rq_pages and copy the inline
824 * content from the Receive buffer into that page. Update
825 * head->rc_curpage and head->rc_pageoff so that the next RDMA Read
826 * result will land contiguously with the copied content.
827 *
828 * Return values:
829 * %0: Inline content was successfully copied
830 * %-EINVAL: offset or length was incorrect
831 */
832static int svc_rdma_copy_inline_range(struct svc_rqst *rqstp,
833 struct svc_rdma_recv_ctxt *head,
834 unsigned int offset,
835 unsigned int remaining)
836{
837 unsigned char *dst, *src = head->rc_recv_buf;
838 unsigned int page_no, numpages;
839
840 numpages = PAGE_ALIGN(head->rc_pageoff + remaining) >> PAGE_SHIFT;
841 for (page_no = 0; page_no < numpages; page_no++) {
842 unsigned int page_len;
843
844 page_len = min_t(unsigned int, remaining,
845 PAGE_SIZE - head->rc_pageoff);
846
847 if (!head->rc_pageoff)
848 head->rc_page_count++;
849
850 dst = page_address(rqstp->rq_pages[head->rc_curpage]);
851 memcpy(dst + head->rc_curpage, src + offset, page_len);
852
853 head->rc_readbytes += page_len;
854 head->rc_pageoff += page_len;
855 if (head->rc_pageoff == PAGE_SIZE) {
856 head->rc_curpage++;
857 head->rc_pageoff = 0;
858 }
859 remaining -= page_len;
860 offset += page_len;
861 }
862
863 return -EINVAL;
864}
865
866/**
867 * svc_rdma_read_multiple_chunks - Construct RDMA Reads to pull data item Read chunks
868 * @rqstp: RPC transaction context
869 * @head: context for ongoing I/O
870 *
871 * The chunk data lands in rqstp->rq_arg as a series of contiguous pages,
872 * like an incoming TCP call.
873 *
874 * Return values:
875 * %0: RDMA Read WQEs were successfully built
876 * %-EINVAL: client provided too many chunks or segments,
877 * %-ENOMEM: rdma_rw context pool was exhausted,
878 * %-ENOTCONN: posting failed (connection is lost),
879 * %-EIO: rdma_rw initialization failed (DMA mapping, etc).
880 */
881static noinline int
882svc_rdma_read_multiple_chunks(struct svc_rqst *rqstp,
883 struct svc_rdma_recv_ctxt *head)
884{
885 const struct svc_rdma_pcl *pcl = &head->rc_read_pcl;
886 struct svc_rdma_chunk *chunk, *next;
887 unsigned int start, length;
888 int ret;
889
890 start = 0;
891 chunk = pcl_first_chunk(pcl);
892 length = chunk->ch_position;
893 ret = svc_rdma_copy_inline_range(rqstp, head, offset: start, remaining: length);
894 if (ret < 0)
895 return ret;
896
897 pcl_for_each_chunk(chunk, pcl) {
898 ret = svc_rdma_build_read_chunk(rqstp, head, chunk);
899 if (ret < 0)
900 return ret;
901
902 next = pcl_next_chunk(pcl, chunk);
903 if (!next)
904 break;
905
906 start += length;
907 length = next->ch_position - head->rc_readbytes;
908 ret = svc_rdma_copy_inline_range(rqstp, head, offset: start, remaining: length);
909 if (ret < 0)
910 return ret;
911 }
912
913 start += length;
914 length = head->rc_byte_len - start;
915 return svc_rdma_copy_inline_range(rqstp, head, offset: start, remaining: length);
916}
917
918/**
919 * svc_rdma_read_data_item - Construct RDMA Reads to pull data item Read chunks
920 * @rqstp: RPC transaction context
921 * @head: context for ongoing I/O
922 *
923 * The chunk data lands in the page list of rqstp->rq_arg.pages.
924 *
925 * Currently NFSD does not look at the rqstp->rq_arg.tail[0] kvec.
926 * Therefore, XDR round-up of the Read chunk and trailing
927 * inline content must both be added at the end of the pagelist.
928 *
929 * Return values:
930 * %0: RDMA Read WQEs were successfully built
931 * %-EINVAL: client provided too many chunks or segments,
932 * %-ENOMEM: rdma_rw context pool was exhausted,
933 * %-ENOTCONN: posting failed (connection is lost),
934 * %-EIO: rdma_rw initialization failed (DMA mapping, etc).
935 */
936static int svc_rdma_read_data_item(struct svc_rqst *rqstp,
937 struct svc_rdma_recv_ctxt *head)
938{
939 return svc_rdma_build_read_chunk(rqstp, head,
940 chunk: pcl_first_chunk(pcl: &head->rc_read_pcl));
941}
942
943/**
944 * svc_rdma_read_chunk_range - Build RDMA Read WRs for portion of a chunk
945 * @rqstp: RPC transaction context
946 * @head: context for ongoing I/O
947 * @chunk: parsed Call chunk to pull
948 * @offset: offset of region to pull
949 * @length: length of region to pull
950 *
951 * Return values:
952 * %0: RDMA Read WQEs were successfully built
953 * %-EINVAL: there were not enough resources to finish
954 * %-ENOMEM: rdma_rw context pool was exhausted,
955 * %-ENOTCONN: posting failed (connection is lost),
956 * %-EIO: rdma_rw initialization failed (DMA mapping, etc).
957 */
958static int svc_rdma_read_chunk_range(struct svc_rqst *rqstp,
959 struct svc_rdma_recv_ctxt *head,
960 const struct svc_rdma_chunk *chunk,
961 unsigned int offset, unsigned int length)
962{
963 const struct svc_rdma_segment *segment;
964 int ret;
965
966 ret = -EINVAL;
967 pcl_for_each_segment(segment, chunk) {
968 struct svc_rdma_segment dummy;
969
970 if (offset > segment->rs_length) {
971 offset -= segment->rs_length;
972 continue;
973 }
974
975 dummy.rs_handle = segment->rs_handle;
976 dummy.rs_length = min_t(u32, length, segment->rs_length) - offset;
977 dummy.rs_offset = segment->rs_offset + offset;
978
979 ret = svc_rdma_build_read_segment(rqstp, head, segment: &dummy);
980 if (ret < 0)
981 break;
982
983 head->rc_readbytes += dummy.rs_length;
984 length -= dummy.rs_length;
985 offset = 0;
986 }
987 return ret;
988}
989
990/**
991 * svc_rdma_read_call_chunk - Build RDMA Read WQEs to pull a Long Message
992 * @rqstp: RPC transaction context
993 * @head: context for ongoing I/O
994 *
995 * Return values:
996 * %0: RDMA Read WQEs were successfully built
997 * %-EINVAL: there were not enough resources to finish
998 * %-ENOMEM: rdma_rw context pool was exhausted,
999 * %-ENOTCONN: posting failed (connection is lost),
1000 * %-EIO: rdma_rw initialization failed (DMA mapping, etc).
1001 */
1002static int svc_rdma_read_call_chunk(struct svc_rqst *rqstp,
1003 struct svc_rdma_recv_ctxt *head)
1004{
1005 const struct svc_rdma_chunk *call_chunk =
1006 pcl_first_chunk(pcl: &head->rc_call_pcl);
1007 const struct svc_rdma_pcl *pcl = &head->rc_read_pcl;
1008 struct svc_rdma_chunk *chunk, *next;
1009 unsigned int start, length;
1010 int ret;
1011
1012 if (pcl_is_empty(pcl))
1013 return svc_rdma_build_read_chunk(rqstp, head, chunk: call_chunk);
1014
1015 start = 0;
1016 chunk = pcl_first_chunk(pcl);
1017 length = chunk->ch_position;
1018 ret = svc_rdma_read_chunk_range(rqstp, head, chunk: call_chunk,
1019 offset: start, length);
1020 if (ret < 0)
1021 return ret;
1022
1023 pcl_for_each_chunk(chunk, pcl) {
1024 ret = svc_rdma_build_read_chunk(rqstp, head, chunk);
1025 if (ret < 0)
1026 return ret;
1027
1028 next = pcl_next_chunk(pcl, chunk);
1029 if (!next)
1030 break;
1031
1032 start += length;
1033 length = next->ch_position - head->rc_readbytes;
1034 ret = svc_rdma_read_chunk_range(rqstp, head, chunk: call_chunk,
1035 offset: start, length);
1036 if (ret < 0)
1037 return ret;
1038 }
1039
1040 start += length;
1041 length = call_chunk->ch_length - start;
1042 return svc_rdma_read_chunk_range(rqstp, head, chunk: call_chunk,
1043 offset: start, length);
1044}
1045
1046/**
1047 * svc_rdma_read_special - Build RDMA Read WQEs to pull a Long Message
1048 * @rqstp: RPC transaction context
1049 * @head: context for ongoing I/O
1050 *
1051 * The start of the data lands in the first page just after the
1052 * Transport header, and the rest lands in rqstp->rq_arg.pages.
1053 *
1054 * Assumptions:
1055 * - A PZRC is never sent in an RDMA_MSG message, though it's
1056 * allowed by spec.
1057 *
1058 * Return values:
1059 * %0: RDMA Read WQEs were successfully built
1060 * %-EINVAL: client provided too many chunks or segments,
1061 * %-ENOMEM: rdma_rw context pool was exhausted,
1062 * %-ENOTCONN: posting failed (connection is lost),
1063 * %-EIO: rdma_rw initialization failed (DMA mapping, etc).
1064 */
1065static noinline int svc_rdma_read_special(struct svc_rqst *rqstp,
1066 struct svc_rdma_recv_ctxt *head)
1067{
1068 return svc_rdma_read_call_chunk(rqstp, head);
1069}
1070
1071/* Pages under I/O have been copied to head->rc_pages. Ensure that
1072 * svc_xprt_release() does not put them when svc_rdma_recvfrom()
1073 * returns. This has to be done after all Read WRs are constructed
1074 * to properly handle a page that happens to be part of I/O on behalf
1075 * of two different RDMA segments.
1076 *
1077 * Note: if the subsequent post_send fails, these pages have already
1078 * been moved to head->rc_pages and thus will be cleaned up by
1079 * svc_rdma_recv_ctxt_put().
1080 */
1081static void svc_rdma_clear_rqst_pages(struct svc_rqst *rqstp,
1082 struct svc_rdma_recv_ctxt *head)
1083{
1084 unsigned int i;
1085
1086 for (i = 0; i < head->rc_page_count; i++) {
1087 head->rc_pages[i] = rqstp->rq_pages[i];
1088 rqstp->rq_pages[i] = NULL;
1089 }
1090}
1091
1092/**
1093 * svc_rdma_process_read_list - Pull list of Read chunks from the client
1094 * @rdma: controlling RDMA transport
1095 * @rqstp: set of pages to use as Read sink buffers
1096 * @head: pages under I/O collect here
1097 *
1098 * The RPC/RDMA protocol assumes that the upper layer's XDR decoders
1099 * pull each Read chunk as they decode an incoming RPC message.
1100 *
1101 * On Linux, however, the server needs to have a fully-constructed RPC
1102 * message in rqstp->rq_arg when there is a positive return code from
1103 * ->xpo_recvfrom. So the Read list is safety-checked immediately when
1104 * it is received, then here the whole Read list is pulled all at once.
1105 * The ingress RPC message is fully reconstructed once all associated
1106 * RDMA Reads have completed.
1107 *
1108 * Return values:
1109 * %1: all needed RDMA Reads were posted successfully,
1110 * %-EINVAL: client provided too many chunks or segments,
1111 * %-ENOMEM: rdma_rw context pool was exhausted,
1112 * %-ENOTCONN: posting failed (connection is lost),
1113 * %-EIO: rdma_rw initialization failed (DMA mapping, etc).
1114 */
1115int svc_rdma_process_read_list(struct svcxprt_rdma *rdma,
1116 struct svc_rqst *rqstp,
1117 struct svc_rdma_recv_ctxt *head)
1118{
1119 struct svc_rdma_chunk_ctxt *cc = &head->rc_cc;
1120 int ret;
1121
1122 cc->cc_cqe.done = svc_rdma_wc_read_done;
1123 cc->cc_sqecount = 0;
1124 head->rc_pageoff = 0;
1125 head->rc_curpage = 0;
1126 head->rc_readbytes = 0;
1127
1128 if (pcl_is_empty(pcl: &head->rc_call_pcl)) {
1129 if (head->rc_read_pcl.cl_count == 1)
1130 ret = svc_rdma_read_data_item(rqstp, head);
1131 else
1132 ret = svc_rdma_read_multiple_chunks(rqstp, head);
1133 } else
1134 ret = svc_rdma_read_special(rqstp, head);
1135 svc_rdma_clear_rqst_pages(rqstp, head);
1136 if (ret < 0)
1137 return ret;
1138
1139 trace_svcrdma_post_read_chunk(cid: &cc->cc_cid, sqecount: cc->cc_sqecount);
1140 ret = svc_rdma_post_chunk_ctxt(rdma, cc);
1141 return ret < 0 ? ret : 1;
1142}
1143

/* source code of linux/net/sunrpc/xprtrdma/svc_rdma_rw.c */