// SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
/*
 * Copyright (c) 2015-2018 Oracle. All rights reserved.
 * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
 * Copyright (c) 2005-2007 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses. You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *
 * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following
 * disclaimer in the documentation and/or other materials provided
 * with the distribution.
 *
 * Neither the name of the Network Appliance, Inc. nor the names of
 * its contributors may be used to endorse or promote products
 * derived from this software without specific prior written
 * permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * Author: Tom Tucker <tom@opengridcomputing.com>
 */
44
45#include <linux/interrupt.h>
46#include <linux/sched.h>
47#include <linux/slab.h>
48#include <linux/spinlock.h>
49#include <linux/workqueue.h>
50#include <linux/export.h>
51
52#include <rdma/ib_verbs.h>
53#include <rdma/rdma_cm.h>
54#include <rdma/rw.h>
55
56#include <linux/sunrpc/addr.h>
57#include <linux/sunrpc/debug.h>
58#include <linux/sunrpc/svc_xprt.h>
59#include <linux/sunrpc/svc_rdma.h>
60
61#include "xprt_rdma.h"
62#include <trace/events/rpcrdma.h>
63
/*
 * Debug facility selector for this file.
 * NOTE(review): presumably consulted by dprintk() — confirm against
 * <linux/sunrpc/debug.h>.
 */
#define RPCDBG_FACILITY	RPCDBG_SVCXPRT
65
/*
 * Forward declarations for the transport methods and helpers defined
 * later in this file; they are needed by the svc_rdma_ops table below.
 */
static struct svcxprt_rdma *svc_rdma_create_xprt(struct svc_serv *serv,
						 struct net *net, int node);
static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
					struct net *net,
					struct sockaddr *sa, int salen,
					int flags);
static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt);
static void svc_rdma_detach(struct svc_xprt *xprt);
static void svc_rdma_free(struct svc_xprt *xprt);
static int svc_rdma_has_wspace(struct svc_xprt *xprt);
static void svc_rdma_kill_temp_xprt(struct svc_xprt *xprt);
77
78static const struct svc_xprt_ops svc_rdma_ops = {
79 .xpo_create = svc_rdma_create,
80 .xpo_recvfrom = svc_rdma_recvfrom,
81 .xpo_sendto = svc_rdma_sendto,
82 .xpo_result_payload = svc_rdma_result_payload,
83 .xpo_release_ctxt = svc_rdma_release_ctxt,
84 .xpo_detach = svc_rdma_detach,
85 .xpo_free = svc_rdma_free,
86 .xpo_has_wspace = svc_rdma_has_wspace,
87 .xpo_accept = svc_rdma_accept,
88 .xpo_kill_temp_xprt = svc_rdma_kill_temp_xprt,
89};
90
91struct svc_xprt_class svc_rdma_class = {
92 .xcl_name = "rdma",
93 .xcl_owner = THIS_MODULE,
94 .xcl_ops = &svc_rdma_ops,
95 .xcl_max_payload = RPCSVC_MAXPAYLOAD_RDMA,
96 .xcl_ident = XPRT_TRANSPORT_RDMA,
97};
98
99/* QP event handler */
100static void qp_event_handler(struct ib_event *event, void *context)
101{
102 struct svc_xprt *xprt = context;
103
104 trace_svcrdma_qp_error(event, sap: (struct sockaddr *)&xprt->xpt_remote);
105 switch (event->event) {
106 /* These are considered benign events */
107 case IB_EVENT_PATH_MIG:
108 case IB_EVENT_COMM_EST:
109 case IB_EVENT_SQ_DRAINED:
110 case IB_EVENT_QP_LAST_WQE_REACHED:
111 break;
112
113 /* These are considered fatal events */
114 case IB_EVENT_PATH_MIG_ERR:
115 case IB_EVENT_QP_FATAL:
116 case IB_EVENT_QP_REQ_ERR:
117 case IB_EVENT_QP_ACCESS_ERR:
118 case IB_EVENT_DEVICE_FATAL:
119 default:
120 svc_xprt_deferred_close(xprt);
121 break;
122 }
123}
124
125static struct svcxprt_rdma *svc_rdma_create_xprt(struct svc_serv *serv,
126 struct net *net, int node)
127{
128 static struct lock_class_key svcrdma_rwctx_lock;
129 static struct lock_class_key svcrdma_sctx_lock;
130 static struct lock_class_key svcrdma_dto_lock;
131 struct svcxprt_rdma *cma_xprt;
132
133 cma_xprt = kzalloc_node(size: sizeof(*cma_xprt), GFP_KERNEL, node);
134 if (!cma_xprt)
135 return NULL;
136
137 svc_xprt_init(net, &svc_rdma_class, &cma_xprt->sc_xprt, serv);
138 INIT_LIST_HEAD(list: &cma_xprt->sc_accept_q);
139 INIT_LIST_HEAD(list: &cma_xprt->sc_rq_dto_q);
140 INIT_LIST_HEAD(list: &cma_xprt->sc_read_complete_q);
141 init_llist_head(list: &cma_xprt->sc_send_ctxts);
142 init_llist_head(list: &cma_xprt->sc_recv_ctxts);
143 init_llist_head(list: &cma_xprt->sc_rw_ctxts);
144 init_waitqueue_head(&cma_xprt->sc_send_wait);
145
146 spin_lock_init(&cma_xprt->sc_lock);
147 spin_lock_init(&cma_xprt->sc_rq_dto_lock);
148 lockdep_set_class(&cma_xprt->sc_rq_dto_lock, &svcrdma_dto_lock);
149 spin_lock_init(&cma_xprt->sc_send_lock);
150 lockdep_set_class(&cma_xprt->sc_send_lock, &svcrdma_sctx_lock);
151 spin_lock_init(&cma_xprt->sc_rw_ctxt_lock);
152 lockdep_set_class(&cma_xprt->sc_rw_ctxt_lock, &svcrdma_rwctx_lock);
153
154 /*
155 * Note that this implies that the underlying transport support
156 * has some form of congestion control (see RFC 7530 section 3.1
157 * paragraph 2). For now, we assume that all supported RDMA
158 * transports are suitable here.
159 */
160 set_bit(nr: XPT_CONG_CTRL, addr: &cma_xprt->sc_xprt.xpt_flags);
161
162 return cma_xprt;
163}
164
165static void
166svc_rdma_parse_connect_private(struct svcxprt_rdma *newxprt,
167 struct rdma_conn_param *param)
168{
169 const struct rpcrdma_connect_private *pmsg = param->private_data;
170
171 if (pmsg &&
172 pmsg->cp_magic == rpcrdma_cmp_magic &&
173 pmsg->cp_version == RPCRDMA_CMP_VERSION) {
174 newxprt->sc_snd_w_inv = pmsg->cp_flags &
175 RPCRDMA_CMP_F_SND_W_INV_OK;
176
177 dprintk("svcrdma: client send_size %u, recv_size %u "
178 "remote inv %ssupported\n",
179 rpcrdma_decode_buffer_size(pmsg->cp_send_size),
180 rpcrdma_decode_buffer_size(pmsg->cp_recv_size),
181 newxprt->sc_snd_w_inv ? "" : "un");
182 }
183}
184
185/*
186 * This function handles the CONNECT_REQUEST event on a listening
187 * endpoint. It is passed the cma_id for the _new_ connection. The context in
188 * this cma_id is inherited from the listening cma_id and is the svc_xprt
189 * structure for the listening endpoint.
190 *
191 * This function creates a new xprt for the new connection and enqueues it on
192 * the accept queue for the listent xprt. When the listen thread is kicked, it
193 * will call the recvfrom method on the listen xprt which will accept the new
194 * connection.
195 */
196static void handle_connect_req(struct rdma_cm_id *new_cma_id,
197 struct rdma_conn_param *param)
198{
199 struct svcxprt_rdma *listen_xprt = new_cma_id->context;
200 struct svcxprt_rdma *newxprt;
201 struct sockaddr *sa;
202
203 newxprt = svc_rdma_create_xprt(serv: listen_xprt->sc_xprt.xpt_server,
204 net: listen_xprt->sc_xprt.xpt_net,
205 node: ibdev_to_node(ibdev: new_cma_id->device));
206 if (!newxprt)
207 return;
208 newxprt->sc_cm_id = new_cma_id;
209 new_cma_id->context = newxprt;
210 svc_rdma_parse_connect_private(newxprt, param);
211
212 /* Save client advertised inbound read limit for use later in accept. */
213 newxprt->sc_ord = param->initiator_depth;
214
215 sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.dst_addr;
216 newxprt->sc_xprt.xpt_remotelen = svc_addr_len(sa);
217 memcpy(&newxprt->sc_xprt.xpt_remote, sa,
218 newxprt->sc_xprt.xpt_remotelen);
219 snprintf(buf: newxprt->sc_xprt.xpt_remotebuf,
220 size: sizeof(newxprt->sc_xprt.xpt_remotebuf) - 1, fmt: "%pISc", sa);
221
222 /* The remote port is arbitrary and not under the control of the
223 * client ULP. Set it to a fixed value so that the DRC continues
224 * to be effective after a reconnect.
225 */
226 rpc_set_port(sap: (struct sockaddr *)&newxprt->sc_xprt.xpt_remote, port: 0);
227
228 sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.src_addr;
229 svc_xprt_set_local(xprt: &newxprt->sc_xprt, sa, salen: svc_addr_len(sa));
230
231 /*
232 * Enqueue the new transport on the accept queue of the listening
233 * transport
234 */
235 spin_lock(lock: &listen_xprt->sc_lock);
236 list_add_tail(new: &newxprt->sc_accept_q, head: &listen_xprt->sc_accept_q);
237 spin_unlock(lock: &listen_xprt->sc_lock);
238
239 set_bit(nr: XPT_CONN, addr: &listen_xprt->sc_xprt.xpt_flags);
240 svc_xprt_enqueue(xprt: &listen_xprt->sc_xprt);
241}
242
243/**
244 * svc_rdma_listen_handler - Handle CM events generated on a listening endpoint
245 * @cma_id: the server's listener rdma_cm_id
246 * @event: details of the event
247 *
248 * Return values:
249 * %0: Do not destroy @cma_id
250 * %1: Destroy @cma_id (never returned here)
251 *
252 * NB: There is never a DEVICE_REMOVAL event for INADDR_ANY listeners.
253 */
254static int svc_rdma_listen_handler(struct rdma_cm_id *cma_id,
255 struct rdma_cm_event *event)
256{
257 switch (event->event) {
258 case RDMA_CM_EVENT_CONNECT_REQUEST:
259 handle_connect_req(new_cma_id: cma_id, param: &event->param.conn);
260 break;
261 default:
262 break;
263 }
264 return 0;
265}
266
267/**
268 * svc_rdma_cma_handler - Handle CM events on client connections
269 * @cma_id: the server's listener rdma_cm_id
270 * @event: details of the event
271 *
272 * Return values:
273 * %0: Do not destroy @cma_id
274 * %1: Destroy @cma_id (never returned here)
275 */
276static int svc_rdma_cma_handler(struct rdma_cm_id *cma_id,
277 struct rdma_cm_event *event)
278{
279 struct svcxprt_rdma *rdma = cma_id->context;
280 struct svc_xprt *xprt = &rdma->sc_xprt;
281
282 switch (event->event) {
283 case RDMA_CM_EVENT_ESTABLISHED:
284 clear_bit(RDMAXPRT_CONN_PENDING, addr: &rdma->sc_flags);
285
286 /* Handle any requests that were received while
287 * CONN_PENDING was set. */
288 svc_xprt_enqueue(xprt);
289 break;
290 case RDMA_CM_EVENT_DISCONNECTED:
291 case RDMA_CM_EVENT_DEVICE_REMOVAL:
292 svc_xprt_deferred_close(xprt);
293 break;
294 default:
295 break;
296 }
297 return 0;
298}
299
300/*
301 * Create a listening RDMA service endpoint.
302 */
303static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
304 struct net *net,
305 struct sockaddr *sa, int salen,
306 int flags)
307{
308 struct rdma_cm_id *listen_id;
309 struct svcxprt_rdma *cma_xprt;
310 int ret;
311
312 if (sa->sa_family != AF_INET && sa->sa_family != AF_INET6)
313 return ERR_PTR(error: -EAFNOSUPPORT);
314 cma_xprt = svc_rdma_create_xprt(serv, net, NUMA_NO_NODE);
315 if (!cma_xprt)
316 return ERR_PTR(error: -ENOMEM);
317 set_bit(nr: XPT_LISTENER, addr: &cma_xprt->sc_xprt.xpt_flags);
318 strcpy(p: cma_xprt->sc_xprt.xpt_remotebuf, q: "listener");
319
320 listen_id = rdma_create_id(net, svc_rdma_listen_handler, cma_xprt,
321 RDMA_PS_TCP, IB_QPT_RC);
322 if (IS_ERR(ptr: listen_id)) {
323 ret = PTR_ERR(ptr: listen_id);
324 goto err0;
325 }
326
327 /* Allow both IPv4 and IPv6 sockets to bind a single port
328 * at the same time.
329 */
330#if IS_ENABLED(CONFIG_IPV6)
331 ret = rdma_set_afonly(id: listen_id, afonly: 1);
332 if (ret)
333 goto err1;
334#endif
335 ret = rdma_bind_addr(id: listen_id, addr: sa);
336 if (ret)
337 goto err1;
338 cma_xprt->sc_cm_id = listen_id;
339
340 ret = rdma_listen(id: listen_id, backlog: RPCRDMA_LISTEN_BACKLOG);
341 if (ret)
342 goto err1;
343
344 /*
345 * We need to use the address from the cm_id in case the
346 * caller specified 0 for the port number.
347 */
348 sa = (struct sockaddr *)&cma_xprt->sc_cm_id->route.addr.src_addr;
349 svc_xprt_set_local(xprt: &cma_xprt->sc_xprt, sa, salen);
350
351 return &cma_xprt->sc_xprt;
352
353 err1:
354 rdma_destroy_id(id: listen_id);
355 err0:
356 kfree(objp: cma_xprt);
357 return ERR_PTR(error: ret);
358}
359
360/*
361 * This is the xpo_recvfrom function for listening endpoints. Its
362 * purpose is to accept incoming connections. The CMA callback handler
363 * has already created a new transport and attached it to the new CMA
364 * ID.
365 *
366 * There is a queue of pending connections hung on the listening
367 * transport. This queue contains the new svc_xprt structure. This
368 * function takes svc_xprt structures off the accept_q and completes
369 * the connection.
370 */
371static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
372{
373 struct svcxprt_rdma *listen_rdma;
374 struct svcxprt_rdma *newxprt = NULL;
375 struct rdma_conn_param conn_param;
376 struct rpcrdma_connect_private pmsg;
377 struct ib_qp_init_attr qp_attr;
378 unsigned int ctxts, rq_depth;
379 struct ib_device *dev;
380 int ret = 0;
381 RPC_IFDEBUG(struct sockaddr *sap);
382
383 listen_rdma = container_of(xprt, struct svcxprt_rdma, sc_xprt);
384 clear_bit(nr: XPT_CONN, addr: &xprt->xpt_flags);
385 /* Get the next entry off the accept list */
386 spin_lock(lock: &listen_rdma->sc_lock);
387 if (!list_empty(head: &listen_rdma->sc_accept_q)) {
388 newxprt = list_entry(listen_rdma->sc_accept_q.next,
389 struct svcxprt_rdma, sc_accept_q);
390 list_del_init(entry: &newxprt->sc_accept_q);
391 }
392 if (!list_empty(head: &listen_rdma->sc_accept_q))
393 set_bit(nr: XPT_CONN, addr: &listen_rdma->sc_xprt.xpt_flags);
394 spin_unlock(lock: &listen_rdma->sc_lock);
395 if (!newxprt)
396 return NULL;
397
398 dev = newxprt->sc_cm_id->device;
399 newxprt->sc_port_num = newxprt->sc_cm_id->port_num;
400
401 newxprt->sc_max_req_size = svcrdma_max_req_size;
402 newxprt->sc_max_requests = svcrdma_max_requests;
403 newxprt->sc_max_bc_requests = svcrdma_max_bc_requests;
404 newxprt->sc_recv_batch = RPCRDMA_MAX_RECV_BATCH;
405 newxprt->sc_fc_credits = cpu_to_be32(newxprt->sc_max_requests);
406
407 /* Qualify the transport's resource defaults with the
408 * capabilities of this particular device.
409 */
410
411 /* Transport header, head iovec, tail iovec */
412 newxprt->sc_max_send_sges = 3;
413 /* Add one SGE per page list entry */
414 newxprt->sc_max_send_sges += (svcrdma_max_req_size / PAGE_SIZE) + 1;
415 if (newxprt->sc_max_send_sges > dev->attrs.max_send_sge)
416 newxprt->sc_max_send_sges = dev->attrs.max_send_sge;
417 rq_depth = newxprt->sc_max_requests + newxprt->sc_max_bc_requests +
418 newxprt->sc_recv_batch + 1 /* drain */;
419 if (rq_depth > dev->attrs.max_qp_wr) {
420 rq_depth = dev->attrs.max_qp_wr;
421 newxprt->sc_recv_batch = 1;
422 newxprt->sc_max_requests = rq_depth - 2;
423 newxprt->sc_max_bc_requests = 2;
424 }
425
426 /* Arbitrarily estimate the number of rw_ctxs needed for
427 * this transport. This is enough rw_ctxs to make forward
428 * progress even if the client is using one rkey per page
429 * in each Read chunk.
430 */
431 ctxts = 3 * RPCSVC_MAXPAGES;
432 newxprt->sc_sq_depth = rq_depth + ctxts;
433 if (newxprt->sc_sq_depth > dev->attrs.max_qp_wr)
434 newxprt->sc_sq_depth = dev->attrs.max_qp_wr;
435 atomic_set(v: &newxprt->sc_sq_avail, i: newxprt->sc_sq_depth);
436
437 newxprt->sc_pd = ib_alloc_pd(dev, 0);
438 if (IS_ERR(ptr: newxprt->sc_pd)) {
439 trace_svcrdma_pd_err(rdma: newxprt, status: PTR_ERR(ptr: newxprt->sc_pd));
440 goto errout;
441 }
442 newxprt->sc_sq_cq = ib_alloc_cq_any(dev, private: newxprt, nr_cqe: newxprt->sc_sq_depth,
443 poll_ctx: IB_POLL_WORKQUEUE);
444 if (IS_ERR(ptr: newxprt->sc_sq_cq))
445 goto errout;
446 newxprt->sc_rq_cq =
447 ib_alloc_cq_any(dev, private: newxprt, nr_cqe: rq_depth, poll_ctx: IB_POLL_WORKQUEUE);
448 if (IS_ERR(ptr: newxprt->sc_rq_cq))
449 goto errout;
450
451 memset(&qp_attr, 0, sizeof qp_attr);
452 qp_attr.event_handler = qp_event_handler;
453 qp_attr.qp_context = &newxprt->sc_xprt;
454 qp_attr.port_num = newxprt->sc_port_num;
455 qp_attr.cap.max_rdma_ctxs = ctxts;
456 qp_attr.cap.max_send_wr = newxprt->sc_sq_depth - ctxts;
457 qp_attr.cap.max_recv_wr = rq_depth;
458 qp_attr.cap.max_send_sge = newxprt->sc_max_send_sges;
459 qp_attr.cap.max_recv_sge = 1;
460 qp_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
461 qp_attr.qp_type = IB_QPT_RC;
462 qp_attr.send_cq = newxprt->sc_sq_cq;
463 qp_attr.recv_cq = newxprt->sc_rq_cq;
464 dprintk(" cap.max_send_wr = %d, cap.max_recv_wr = %d\n",
465 qp_attr.cap.max_send_wr, qp_attr.cap.max_recv_wr);
466 dprintk(" cap.max_send_sge = %d, cap.max_recv_sge = %d\n",
467 qp_attr.cap.max_send_sge, qp_attr.cap.max_recv_sge);
468 dprintk(" send CQ depth = %u, recv CQ depth = %u\n",
469 newxprt->sc_sq_depth, rq_depth);
470 ret = rdma_create_qp(id: newxprt->sc_cm_id, pd: newxprt->sc_pd, qp_init_attr: &qp_attr);
471 if (ret) {
472 trace_svcrdma_qp_err(rdma: newxprt, status: ret);
473 goto errout;
474 }
475 newxprt->sc_max_send_sges = qp_attr.cap.max_send_sge;
476 newxprt->sc_qp = newxprt->sc_cm_id->qp;
477
478 if (!(dev->attrs.device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS))
479 newxprt->sc_snd_w_inv = false;
480 if (!rdma_protocol_iwarp(device: dev, port_num: newxprt->sc_port_num) &&
481 !rdma_ib_or_roce(device: dev, port_num: newxprt->sc_port_num)) {
482 trace_svcrdma_fabric_err(rdma: newxprt, status: -EINVAL);
483 goto errout;
484 }
485
486 if (!svc_rdma_post_recvs(rdma: newxprt))
487 goto errout;
488
489 /* Construct RDMA-CM private message */
490 pmsg.cp_magic = rpcrdma_cmp_magic;
491 pmsg.cp_version = RPCRDMA_CMP_VERSION;
492 pmsg.cp_flags = 0;
493 pmsg.cp_send_size = pmsg.cp_recv_size =
494 rpcrdma_encode_buffer_size(size: newxprt->sc_max_req_size);
495
496 /* Accept Connection */
497 set_bit(RDMAXPRT_CONN_PENDING, addr: &newxprt->sc_flags);
498 memset(&conn_param, 0, sizeof conn_param);
499 conn_param.responder_resources = 0;
500 conn_param.initiator_depth = min_t(int, newxprt->sc_ord,
501 dev->attrs.max_qp_init_rd_atom);
502 if (!conn_param.initiator_depth) {
503 ret = -EINVAL;
504 trace_svcrdma_initdepth_err(rdma: newxprt, status: ret);
505 goto errout;
506 }
507 conn_param.private_data = &pmsg;
508 conn_param.private_data_len = sizeof(pmsg);
509 rdma_lock_handler(id: newxprt->sc_cm_id);
510 newxprt->sc_cm_id->event_handler = svc_rdma_cma_handler;
511 ret = rdma_accept(id: newxprt->sc_cm_id, conn_param: &conn_param);
512 rdma_unlock_handler(id: newxprt->sc_cm_id);
513 if (ret) {
514 trace_svcrdma_accept_err(rdma: newxprt, status: ret);
515 goto errout;
516 }
517
518#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
519 dprintk("svcrdma: new connection accepted on device %s:\n", dev->name);
520 sap = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.src_addr;
521 dprintk(" local address : %pIS:%u\n", sap, rpc_get_port(sap));
522 sap = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.dst_addr;
523 dprintk(" remote address : %pIS:%u\n", sap, rpc_get_port(sap));
524 dprintk(" max_sge : %d\n", newxprt->sc_max_send_sges);
525 dprintk(" sq_depth : %d\n", newxprt->sc_sq_depth);
526 dprintk(" rdma_rw_ctxs : %d\n", ctxts);
527 dprintk(" max_requests : %d\n", newxprt->sc_max_requests);
528 dprintk(" ord : %d\n", conn_param.initiator_depth);
529#endif
530
531 return &newxprt->sc_xprt;
532
533 errout:
534 /* Take a reference in case the DTO handler runs */
535 svc_xprt_get(xprt: &newxprt->sc_xprt);
536 if (newxprt->sc_qp && !IS_ERR(ptr: newxprt->sc_qp))
537 ib_destroy_qp(qp: newxprt->sc_qp);
538 rdma_destroy_id(id: newxprt->sc_cm_id);
539 /* This call to put will destroy the transport */
540 svc_xprt_put(xprt: &newxprt->sc_xprt);
541 return NULL;
542}
543
544static void svc_rdma_detach(struct svc_xprt *xprt)
545{
546 struct svcxprt_rdma *rdma =
547 container_of(xprt, struct svcxprt_rdma, sc_xprt);
548
549 rdma_disconnect(id: rdma->sc_cm_id);
550}
551
552static void __svc_rdma_free(struct work_struct *work)
553{
554 struct svcxprt_rdma *rdma =
555 container_of(work, struct svcxprt_rdma, sc_work);
556
557 /* This blocks until the Completion Queues are empty */
558 if (rdma->sc_qp && !IS_ERR(ptr: rdma->sc_qp))
559 ib_drain_qp(qp: rdma->sc_qp);
560 flush_workqueue(svcrdma_wq);
561
562 svc_rdma_flush_recv_queues(rdma);
563
564 svc_rdma_destroy_rw_ctxts(rdma);
565 svc_rdma_send_ctxts_destroy(rdma);
566 svc_rdma_recv_ctxts_destroy(rdma);
567
568 /* Destroy the QP if present (not a listener) */
569 if (rdma->sc_qp && !IS_ERR(ptr: rdma->sc_qp))
570 ib_destroy_qp(qp: rdma->sc_qp);
571
572 if (rdma->sc_sq_cq && !IS_ERR(ptr: rdma->sc_sq_cq))
573 ib_free_cq(cq: rdma->sc_sq_cq);
574
575 if (rdma->sc_rq_cq && !IS_ERR(ptr: rdma->sc_rq_cq))
576 ib_free_cq(cq: rdma->sc_rq_cq);
577
578 if (rdma->sc_pd && !IS_ERR(ptr: rdma->sc_pd))
579 ib_dealloc_pd(pd: rdma->sc_pd);
580
581 /* Destroy the CM ID */
582 rdma_destroy_id(id: rdma->sc_cm_id);
583
584 kfree(objp: rdma);
585}
586
587static void svc_rdma_free(struct svc_xprt *xprt)
588{
589 struct svcxprt_rdma *rdma =
590 container_of(xprt, struct svcxprt_rdma, sc_xprt);
591
592 INIT_WORK(&rdma->sc_work, __svc_rdma_free);
593 schedule_work(work: &rdma->sc_work);
594}
595
596static int svc_rdma_has_wspace(struct svc_xprt *xprt)
597{
598 struct svcxprt_rdma *rdma =
599 container_of(xprt, struct svcxprt_rdma, sc_xprt);
600
601 /*
602 * If there are already waiters on the SQ,
603 * return false.
604 */
605 if (waitqueue_active(wq_head: &rdma->sc_send_wait))
606 return 0;
607
608 /* Otherwise return true. */
609 return 1;
610}
611
/*
 * xpo_kill_temp_xprt method. This transport has nothing to do here,
 * so the body is intentionally empty.
 */
static void svc_rdma_kill_temp_xprt(struct svc_xprt *xprt)
{
	/* nothing to do */
}
615

/* Source: linux/net/sunrpc/xprtrdma/svc_rdma_transport.c */