1// SPDX-License-Identifier: GPL-2.0-only
2/******************************************************************************
3
4(c) 2007 Network Appliance, Inc. All Rights Reserved.
5(c) 2009 NetApp. All Rights Reserved.
6
7
8******************************************************************************/
9
10#include <linux/tcp.h>
11#include <linux/slab.h>
12#include <linux/sunrpc/xprt.h>
13#include <linux/export.h>
14#include <linux/sunrpc/bc_xprt.h>
15
16#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
17#define RPCDBG_FACILITY RPCDBG_TRANS
18#endif
19
20#define BC_MAX_SLOTS 64U
21
22unsigned int xprt_bc_max_slots(struct rpc_xprt *xprt)
23{
24 return BC_MAX_SLOTS;
25}
26
27/*
28 * Helper routines that track the number of preallocation elements
29 * on the transport.
30 */
31static inline int xprt_need_to_requeue(struct rpc_xprt *xprt)
32{
33 return xprt->bc_alloc_count < xprt->bc_alloc_max;
34}
35
36/*
37 * Free the preallocated rpc_rqst structure and the memory
38 * buffers hanging off of it.
39 */
40static void xprt_free_allocation(struct rpc_rqst *req)
41{
42 struct xdr_buf *xbufp;
43
44 dprintk("RPC: free allocations for req= %p\n", req);
45 WARN_ON_ONCE(test_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state));
46 xbufp = &req->rq_rcv_buf;
47 free_page((unsigned long)xbufp->head[0].iov_base);
48 xbufp = &req->rq_snd_buf;
49 free_page((unsigned long)xbufp->head[0].iov_base);
50 kfree(objp: req);
51}
52
53static void xprt_bc_reinit_xdr_buf(struct xdr_buf *buf)
54{
55 buf->head[0].iov_len = PAGE_SIZE;
56 buf->tail[0].iov_len = 0;
57 buf->pages = NULL;
58 buf->page_len = 0;
59 buf->flags = 0;
60 buf->len = 0;
61 buf->buflen = PAGE_SIZE;
62}
63
64static int xprt_alloc_xdr_buf(struct xdr_buf *buf, gfp_t gfp_flags)
65{
66 struct page *page;
67 /* Preallocate one XDR receive buffer */
68 page = alloc_page(gfp_flags);
69 if (page == NULL)
70 return -ENOMEM;
71 xdr_buf_init(buf, page_address(page), PAGE_SIZE);
72 return 0;
73}
74
75static struct rpc_rqst *xprt_alloc_bc_req(struct rpc_xprt *xprt)
76{
77 gfp_t gfp_flags = GFP_KERNEL | __GFP_NORETRY | __GFP_NOWARN;
78 struct rpc_rqst *req;
79
80 /* Pre-allocate one backchannel rpc_rqst */
81 req = kzalloc(size: sizeof(*req), flags: gfp_flags);
82 if (req == NULL)
83 return NULL;
84
85 req->rq_xprt = xprt;
86
87 /* Preallocate one XDR receive buffer */
88 if (xprt_alloc_xdr_buf(buf: &req->rq_rcv_buf, gfp_flags) < 0) {
89 printk(KERN_ERR "Failed to create bc receive xbuf\n");
90 goto out_free;
91 }
92 req->rq_rcv_buf.len = PAGE_SIZE;
93
94 /* Preallocate one XDR send buffer */
95 if (xprt_alloc_xdr_buf(buf: &req->rq_snd_buf, gfp_flags) < 0) {
96 printk(KERN_ERR "Failed to create bc snd xbuf\n");
97 goto out_free;
98 }
99 return req;
100out_free:
101 xprt_free_allocation(req);
102 return NULL;
103}
104
105/*
106 * Preallocate up to min_reqs structures and related buffers for use
107 * by the backchannel. This function can be called multiple times
108 * when creating new sessions that use the same rpc_xprt. The
109 * preallocated buffers are added to the pool of resources used by
110 * the rpc_xprt. Any one of these resources may be used by an
111 * incoming callback request. It's up to the higher levels in the
112 * stack to enforce that the maximum number of session slots is not
113 * being exceeded.
114 *
115 * Some callback arguments can be large. For example, a pNFS server
116 * using multiple deviceids. The list can be unbound, but the client
117 * has the ability to tell the server the maximum size of the callback
118 * requests. Each deviceID is 16 bytes, so allocate one page
119 * for the arguments to have enough room to receive a number of these
120 * deviceIDs. The NFS client indicates to the pNFS server that its
121 * callback requests can be up to 4096 bytes in size.
122 */
123int xprt_setup_backchannel(struct rpc_xprt *xprt, unsigned int min_reqs)
124{
125 if (!xprt->ops->bc_setup)
126 return 0;
127 return xprt->ops->bc_setup(xprt, min_reqs);
128}
129EXPORT_SYMBOL_GPL(xprt_setup_backchannel);
130
131int xprt_setup_bc(struct rpc_xprt *xprt, unsigned int min_reqs)
132{
133 struct rpc_rqst *req;
134 struct list_head tmp_list;
135 int i;
136
137 dprintk("RPC: setup backchannel transport\n");
138
139 if (min_reqs > BC_MAX_SLOTS)
140 min_reqs = BC_MAX_SLOTS;
141
142 /*
143 * We use a temporary list to keep track of the preallocated
144 * buffers. Once we're done building the list we splice it
145 * into the backchannel preallocation list off of the rpc_xprt
146 * struct. This helps minimize the amount of time the list
147 * lock is held on the rpc_xprt struct. It also makes cleanup
148 * easier in case of memory allocation errors.
149 */
150 INIT_LIST_HEAD(list: &tmp_list);
151 for (i = 0; i < min_reqs; i++) {
152 /* Pre-allocate one backchannel rpc_rqst */
153 req = xprt_alloc_bc_req(xprt);
154 if (req == NULL) {
155 printk(KERN_ERR "Failed to create bc rpc_rqst\n");
156 goto out_free;
157 }
158
159 /* Add the allocated buffer to the tmp list */
160 dprintk("RPC: adding req= %p\n", req);
161 list_add(new: &req->rq_bc_pa_list, head: &tmp_list);
162 }
163
164 /*
165 * Add the temporary list to the backchannel preallocation list
166 */
167 spin_lock(lock: &xprt->bc_pa_lock);
168 list_splice(list: &tmp_list, head: &xprt->bc_pa_list);
169 xprt->bc_alloc_count += min_reqs;
170 xprt->bc_alloc_max += min_reqs;
171 atomic_add(i: min_reqs, v: &xprt->bc_slot_count);
172 spin_unlock(lock: &xprt->bc_pa_lock);
173
174 dprintk("RPC: setup backchannel transport done\n");
175 return 0;
176
177out_free:
178 /*
179 * Memory allocation failed, free the temporary list
180 */
181 while (!list_empty(head: &tmp_list)) {
182 req = list_first_entry(&tmp_list,
183 struct rpc_rqst,
184 rq_bc_pa_list);
185 list_del(entry: &req->rq_bc_pa_list);
186 xprt_free_allocation(req);
187 }
188
189 dprintk("RPC: setup backchannel transport failed\n");
190 return -ENOMEM;
191}
192
193/**
194 * xprt_destroy_backchannel - Destroys the backchannel preallocated structures.
195 * @xprt: the transport holding the preallocated strucures
196 * @max_reqs: the maximum number of preallocated structures to destroy
197 *
198 * Since these structures may have been allocated by multiple calls
199 * to xprt_setup_backchannel, we only destroy up to the maximum number
200 * of reqs specified by the caller.
201 */
202void xprt_destroy_backchannel(struct rpc_xprt *xprt, unsigned int max_reqs)
203{
204 if (xprt->ops->bc_destroy)
205 xprt->ops->bc_destroy(xprt, max_reqs);
206}
207EXPORT_SYMBOL_GPL(xprt_destroy_backchannel);
208
209void xprt_destroy_bc(struct rpc_xprt *xprt, unsigned int max_reqs)
210{
211 struct rpc_rqst *req = NULL, *tmp = NULL;
212
213 dprintk("RPC: destroy backchannel transport\n");
214
215 if (max_reqs == 0)
216 goto out;
217
218 spin_lock_bh(lock: &xprt->bc_pa_lock);
219 xprt->bc_alloc_max -= min(max_reqs, xprt->bc_alloc_max);
220 list_for_each_entry_safe(req, tmp, &xprt->bc_pa_list, rq_bc_pa_list) {
221 dprintk("RPC: req=%p\n", req);
222 list_del(entry: &req->rq_bc_pa_list);
223 xprt_free_allocation(req);
224 xprt->bc_alloc_count--;
225 atomic_dec(v: &xprt->bc_slot_count);
226 if (--max_reqs == 0)
227 break;
228 }
229 spin_unlock_bh(lock: &xprt->bc_pa_lock);
230
231out:
232 dprintk("RPC: backchannel list empty= %s\n",
233 list_empty(&xprt->bc_pa_list) ? "true" : "false");
234}
235
236static struct rpc_rqst *xprt_get_bc_request(struct rpc_xprt *xprt, __be32 xid,
237 struct rpc_rqst *new)
238{
239 struct rpc_rqst *req = NULL;
240
241 dprintk("RPC: allocate a backchannel request\n");
242 if (list_empty(head: &xprt->bc_pa_list)) {
243 if (!new)
244 goto not_found;
245 if (atomic_read(v: &xprt->bc_slot_count) >= BC_MAX_SLOTS)
246 goto not_found;
247 list_add_tail(new: &new->rq_bc_pa_list, head: &xprt->bc_pa_list);
248 xprt->bc_alloc_count++;
249 atomic_inc(v: &xprt->bc_slot_count);
250 }
251 req = list_first_entry(&xprt->bc_pa_list, struct rpc_rqst,
252 rq_bc_pa_list);
253 req->rq_reply_bytes_recvd = 0;
254 memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
255 sizeof(req->rq_private_buf));
256 req->rq_xid = xid;
257 req->rq_connect_cookie = xprt->connect_cookie;
258 dprintk("RPC: backchannel req=%p\n", req);
259not_found:
260 return req;
261}
262
263/*
264 * Return the preallocated rpc_rqst structure and XDR buffers
265 * associated with this rpc_task.
266 */
267void xprt_free_bc_request(struct rpc_rqst *req)
268{
269 struct rpc_xprt *xprt = req->rq_xprt;
270
271 xprt->ops->bc_free_rqst(req);
272}
273
274void xprt_free_bc_rqst(struct rpc_rqst *req)
275{
276 struct rpc_xprt *xprt = req->rq_xprt;
277
278 dprintk("RPC: free backchannel req=%p\n", req);
279
280 req->rq_connect_cookie = xprt->connect_cookie - 1;
281 smp_mb__before_atomic();
282 clear_bit(RPC_BC_PA_IN_USE, addr: &req->rq_bc_pa_state);
283 smp_mb__after_atomic();
284
285 /*
286 * Return it to the list of preallocations so that it
287 * may be reused by a new callback request.
288 */
289 spin_lock_bh(lock: &xprt->bc_pa_lock);
290 if (xprt_need_to_requeue(xprt)) {
291 xprt_bc_reinit_xdr_buf(buf: &req->rq_snd_buf);
292 xprt_bc_reinit_xdr_buf(buf: &req->rq_rcv_buf);
293 req->rq_rcv_buf.len = PAGE_SIZE;
294 list_add_tail(new: &req->rq_bc_pa_list, head: &xprt->bc_pa_list);
295 xprt->bc_alloc_count++;
296 atomic_inc(v: &xprt->bc_slot_count);
297 req = NULL;
298 }
299 spin_unlock_bh(lock: &xprt->bc_pa_lock);
300 if (req != NULL) {
301 /*
302 * The last remaining session was destroyed while this
303 * entry was in use. Free the entry and don't attempt
304 * to add back to the list because there is no need to
305 * have anymore preallocated entries.
306 */
307 dprintk("RPC: Last session removed req=%p\n", req);
308 xprt_free_allocation(req);
309 }
310 xprt_put(xprt);
311}
312
313/*
314 * One or more rpc_rqst structure have been preallocated during the
315 * backchannel setup. Buffer space for the send and private XDR buffers
316 * has been preallocated as well. Use xprt_alloc_bc_request to allocate
317 * to this request. Use xprt_free_bc_request to return it.
318 *
319 * We know that we're called in soft interrupt context, grab the spin_lock
320 * since there is no need to grab the bottom half spin_lock.
321 *
322 * Return an available rpc_rqst, otherwise NULL if non are available.
323 */
324struct rpc_rqst *xprt_lookup_bc_request(struct rpc_xprt *xprt, __be32 xid)
325{
326 struct rpc_rqst *req, *new = NULL;
327
328 do {
329 spin_lock(lock: &xprt->bc_pa_lock);
330 list_for_each_entry(req, &xprt->bc_pa_list, rq_bc_pa_list) {
331 if (req->rq_connect_cookie != xprt->connect_cookie)
332 continue;
333 if (req->rq_xid == xid)
334 goto found;
335 }
336 req = xprt_get_bc_request(xprt, xid, new);
337found:
338 spin_unlock(lock: &xprt->bc_pa_lock);
339 if (new) {
340 if (req != new)
341 xprt_free_allocation(req: new);
342 break;
343 } else if (req)
344 break;
345 new = xprt_alloc_bc_req(xprt);
346 } while (new);
347 return req;
348}
349
350/*
351 * Add callback request to callback list. Wake a thread
352 * on the first pool (usually the only pool) to handle it.
353 */
354void xprt_complete_bc_request(struct rpc_rqst *req, uint32_t copied)
355{
356 struct rpc_xprt *xprt = req->rq_xprt;
357 struct svc_serv *bc_serv = xprt->bc_serv;
358
359 spin_lock(lock: &xprt->bc_pa_lock);
360 list_del(entry: &req->rq_bc_pa_list);
361 xprt->bc_alloc_count--;
362 spin_unlock(lock: &xprt->bc_pa_lock);
363
364 req->rq_private_buf.len = copied;
365 set_bit(RPC_BC_PA_IN_USE, addr: &req->rq_bc_pa_state);
366
367 dprintk("RPC: add callback request to list\n");
368 xprt_get(xprt);
369 lwq_enqueue(n: &req->rq_bc_list, q: &bc_serv->sv_cb_list);
370 svc_pool_wake_idle_thread(pool: &bc_serv->sv_pools[0]);
371}
372

source code of linux/net/sunrpc/backchannel_rqst.c