1 | /* |
2 | * Definitions for the 'struct ptr_ring' datastructure. |
3 | * |
4 | * Author: |
5 | * Michael S. Tsirkin <mst@redhat.com> |
6 | * |
7 | * Copyright (C) 2016 Red Hat, Inc. |
8 | * |
9 | * This program is free software; you can redistribute it and/or modify it |
10 | * under the terms of the GNU General Public License as published by the |
11 | * Free Software Foundation; either version 2 of the License, or (at your |
12 | * option) any later version. |
13 | * |
14 | * This is a limited-size FIFO maintaining pointers in FIFO order, with |
15 | * one CPU producing entries and another consuming entries from a FIFO. |
16 | * |
17 | * This implementation tries to minimize cache-contention when there is a |
18 | * single producer and a single consumer CPU. |
19 | */ |
20 | |
21 | #ifndef _LINUX_PTR_RING_H |
22 | #define _LINUX_PTR_RING_H 1 |
23 | |
24 | #ifdef __KERNEL__ |
25 | #include <linux/spinlock.h> |
26 | #include <linux/cache.h> |
27 | #include <linux/types.h> |
28 | #include <linux/compiler.h> |
29 | #include <linux/slab.h> |
30 | #include <asm/errno.h> |
31 | #endif |
32 | |
33 | struct ptr_ring { |
34 | int producer ____cacheline_aligned_in_smp; |
35 | spinlock_t producer_lock; |
36 | int consumer_head ____cacheline_aligned_in_smp; /* next valid entry */ |
37 | int consumer_tail; /* next entry to invalidate */ |
38 | spinlock_t consumer_lock; |
39 | /* Shared consumer/producer data */ |
40 | /* Read-only by both the producer and the consumer */ |
41 | int size ____cacheline_aligned_in_smp; /* max entries in queue */ |
42 | int batch; /* number of entries to consume in a batch */ |
43 | void **queue; |
44 | }; |
45 | |
46 | /* Note: callers invoking this in a loop must use a compiler barrier, |
47 | * for example cpu_relax(). |
48 | * |
49 | * NB: this is unlike __ptr_ring_empty in that callers must hold producer_lock: |
50 | * see e.g. ptr_ring_full. |
51 | */ |
52 | static inline bool __ptr_ring_full(struct ptr_ring *r) |
53 | { |
54 | return r->queue[r->producer]; |
55 | } |
56 | |
57 | static inline bool ptr_ring_full(struct ptr_ring *r) |
58 | { |
59 | bool ret; |
60 | |
61 | spin_lock(&r->producer_lock); |
62 | ret = __ptr_ring_full(r); |
63 | spin_unlock(&r->producer_lock); |
64 | |
65 | return ret; |
66 | } |
67 | |
68 | static inline bool ptr_ring_full_irq(struct ptr_ring *r) |
69 | { |
70 | bool ret; |
71 | |
72 | spin_lock_irq(&r->producer_lock); |
73 | ret = __ptr_ring_full(r); |
74 | spin_unlock_irq(&r->producer_lock); |
75 | |
76 | return ret; |
77 | } |
78 | |
79 | static inline bool ptr_ring_full_any(struct ptr_ring *r) |
80 | { |
81 | unsigned long flags; |
82 | bool ret; |
83 | |
84 | spin_lock_irqsave(&r->producer_lock, flags); |
85 | ret = __ptr_ring_full(r); |
86 | spin_unlock_irqrestore(&r->producer_lock, flags); |
87 | |
88 | return ret; |
89 | } |
90 | |
91 | static inline bool ptr_ring_full_bh(struct ptr_ring *r) |
92 | { |
93 | bool ret; |
94 | |
95 | spin_lock_bh(&r->producer_lock); |
96 | ret = __ptr_ring_full(r); |
97 | spin_unlock_bh(&r->producer_lock); |
98 | |
99 | return ret; |
100 | } |
101 | |
102 | /* Note: callers invoking this in a loop must use a compiler barrier, |
103 | * for example cpu_relax(). Callers must hold producer_lock. |
104 | * Callers are responsible for making sure pointer that is being queued |
105 | * points to a valid data. |
106 | */ |
107 | static inline int __ptr_ring_produce(struct ptr_ring *r, void *ptr) |
108 | { |
109 | if (unlikely(!r->size) || r->queue[r->producer]) |
110 | return -ENOSPC; |
111 | |
112 | /* Make sure the pointer we are storing points to a valid data. */ |
113 | /* Pairs with smp_read_barrier_depends in __ptr_ring_consume. */ |
114 | smp_wmb(); |
115 | |
116 | WRITE_ONCE(r->queue[r->producer++], ptr); |
117 | if (unlikely(r->producer >= r->size)) |
118 | r->producer = 0; |
119 | return 0; |
120 | } |
121 | |
122 | /* |
123 | * Note: resize (below) nests producer lock within consumer lock, so if you |
124 | * consume in interrupt or BH context, you must disable interrupts/BH when |
125 | * calling this. |
126 | */ |
127 | static inline int ptr_ring_produce(struct ptr_ring *r, void *ptr) |
128 | { |
129 | int ret; |
130 | |
131 | spin_lock(&r->producer_lock); |
132 | ret = __ptr_ring_produce(r, ptr); |
133 | spin_unlock(&r->producer_lock); |
134 | |
135 | return ret; |
136 | } |
137 | |
138 | static inline int ptr_ring_produce_irq(struct ptr_ring *r, void *ptr) |
139 | { |
140 | int ret; |
141 | |
142 | spin_lock_irq(&r->producer_lock); |
143 | ret = __ptr_ring_produce(r, ptr); |
144 | spin_unlock_irq(&r->producer_lock); |
145 | |
146 | return ret; |
147 | } |
148 | |
149 | static inline int ptr_ring_produce_any(struct ptr_ring *r, void *ptr) |
150 | { |
151 | unsigned long flags; |
152 | int ret; |
153 | |
154 | spin_lock_irqsave(&r->producer_lock, flags); |
155 | ret = __ptr_ring_produce(r, ptr); |
156 | spin_unlock_irqrestore(&r->producer_lock, flags); |
157 | |
158 | return ret; |
159 | } |
160 | |
161 | static inline int ptr_ring_produce_bh(struct ptr_ring *r, void *ptr) |
162 | { |
163 | int ret; |
164 | |
165 | spin_lock_bh(&r->producer_lock); |
166 | ret = __ptr_ring_produce(r, ptr); |
167 | spin_unlock_bh(&r->producer_lock); |
168 | |
169 | return ret; |
170 | } |
171 | |
172 | static inline void *__ptr_ring_peek(struct ptr_ring *r) |
173 | { |
174 | if (likely(r->size)) |
175 | return READ_ONCE(r->queue[r->consumer_head]); |
176 | return NULL; |
177 | } |
178 | |
179 | /* |
180 | * Test ring empty status without taking any locks. |
181 | * |
182 | * NB: This is only safe to call if ring is never resized. |
183 | * |
184 | * However, if some other CPU consumes ring entries at the same time, the value |
185 | * returned is not guaranteed to be correct. |
186 | * |
187 | * In this case - to avoid incorrectly detecting the ring |
188 | * as empty - the CPU consuming the ring entries is responsible |
189 | * for either consuming all ring entries until the ring is empty, |
190 | * or synchronizing with some other CPU and causing it to |
191 | * re-test __ptr_ring_empty and/or consume the ring enteries |
192 | * after the synchronization point. |
193 | * |
194 | * Note: callers invoking this in a loop must use a compiler barrier, |
195 | * for example cpu_relax(). |
196 | */ |
197 | static inline bool __ptr_ring_empty(struct ptr_ring *r) |
198 | { |
199 | if (likely(r->size)) |
200 | return !r->queue[READ_ONCE(r->consumer_head)]; |
201 | return true; |
202 | } |
203 | |
204 | static inline bool ptr_ring_empty(struct ptr_ring *r) |
205 | { |
206 | bool ret; |
207 | |
208 | spin_lock(&r->consumer_lock); |
209 | ret = __ptr_ring_empty(r); |
210 | spin_unlock(&r->consumer_lock); |
211 | |
212 | return ret; |
213 | } |
214 | |
215 | static inline bool ptr_ring_empty_irq(struct ptr_ring *r) |
216 | { |
217 | bool ret; |
218 | |
219 | spin_lock_irq(&r->consumer_lock); |
220 | ret = __ptr_ring_empty(r); |
221 | spin_unlock_irq(&r->consumer_lock); |
222 | |
223 | return ret; |
224 | } |
225 | |
226 | static inline bool ptr_ring_empty_any(struct ptr_ring *r) |
227 | { |
228 | unsigned long flags; |
229 | bool ret; |
230 | |
231 | spin_lock_irqsave(&r->consumer_lock, flags); |
232 | ret = __ptr_ring_empty(r); |
233 | spin_unlock_irqrestore(&r->consumer_lock, flags); |
234 | |
235 | return ret; |
236 | } |
237 | |
238 | static inline bool ptr_ring_empty_bh(struct ptr_ring *r) |
239 | { |
240 | bool ret; |
241 | |
242 | spin_lock_bh(&r->consumer_lock); |
243 | ret = __ptr_ring_empty(r); |
244 | spin_unlock_bh(&r->consumer_lock); |
245 | |
246 | return ret; |
247 | } |
248 | |
249 | /* Must only be called after __ptr_ring_peek returned !NULL */ |
250 | static inline void __ptr_ring_discard_one(struct ptr_ring *r) |
251 | { |
252 | /* Fundamentally, what we want to do is update consumer |
253 | * index and zero out the entry so producer can reuse it. |
254 | * Doing it naively at each consume would be as simple as: |
255 | * consumer = r->consumer; |
256 | * r->queue[consumer++] = NULL; |
257 | * if (unlikely(consumer >= r->size)) |
258 | * consumer = 0; |
259 | * r->consumer = consumer; |
260 | * but that is suboptimal when the ring is full as producer is writing |
261 | * out new entries in the same cache line. Defer these updates until a |
262 | * batch of entries has been consumed. |
263 | */ |
264 | /* Note: we must keep consumer_head valid at all times for __ptr_ring_empty |
265 | * to work correctly. |
266 | */ |
267 | int consumer_head = r->consumer_head; |
268 | int head = consumer_head++; |
269 | |
270 | /* Once we have processed enough entries invalidate them in |
271 | * the ring all at once so producer can reuse their space in the ring. |
272 | * We also do this when we reach end of the ring - not mandatory |
273 | * but helps keep the implementation simple. |
274 | */ |
275 | if (unlikely(consumer_head - r->consumer_tail >= r->batch || |
276 | consumer_head >= r->size)) { |
277 | /* Zero out entries in the reverse order: this way we touch the |
278 | * cache line that producer might currently be reading the last; |
279 | * producer won't make progress and touch other cache lines |
280 | * besides the first one until we write out all entries. |
281 | */ |
282 | while (likely(head >= r->consumer_tail)) |
283 | r->queue[head--] = NULL; |
284 | r->consumer_tail = consumer_head; |
285 | } |
286 | if (unlikely(consumer_head >= r->size)) { |
287 | consumer_head = 0; |
288 | r->consumer_tail = 0; |
289 | } |
290 | /* matching READ_ONCE in __ptr_ring_empty for lockless tests */ |
291 | WRITE_ONCE(r->consumer_head, consumer_head); |
292 | } |
293 | |
294 | static inline void *__ptr_ring_consume(struct ptr_ring *r) |
295 | { |
296 | void *ptr; |
297 | |
298 | /* The READ_ONCE in __ptr_ring_peek guarantees that anyone |
299 | * accessing data through the pointer is up to date. Pairs |
300 | * with smp_wmb in __ptr_ring_produce. |
301 | */ |
302 | ptr = __ptr_ring_peek(r); |
303 | if (ptr) |
304 | __ptr_ring_discard_one(r); |
305 | |
306 | return ptr; |
307 | } |
308 | |
309 | static inline int __ptr_ring_consume_batched(struct ptr_ring *r, |
310 | void **array, int n) |
311 | { |
312 | void *ptr; |
313 | int i; |
314 | |
315 | for (i = 0; i < n; i++) { |
316 | ptr = __ptr_ring_consume(r); |
317 | if (!ptr) |
318 | break; |
319 | array[i] = ptr; |
320 | } |
321 | |
322 | return i; |
323 | } |
324 | |
325 | /* |
326 | * Note: resize (below) nests producer lock within consumer lock, so if you |
327 | * call this in interrupt or BH context, you must disable interrupts/BH when |
328 | * producing. |
329 | */ |
330 | static inline void *ptr_ring_consume(struct ptr_ring *r) |
331 | { |
332 | void *ptr; |
333 | |
334 | spin_lock(&r->consumer_lock); |
335 | ptr = __ptr_ring_consume(r); |
336 | spin_unlock(&r->consumer_lock); |
337 | |
338 | return ptr; |
339 | } |
340 | |
341 | static inline void *ptr_ring_consume_irq(struct ptr_ring *r) |
342 | { |
343 | void *ptr; |
344 | |
345 | spin_lock_irq(&r->consumer_lock); |
346 | ptr = __ptr_ring_consume(r); |
347 | spin_unlock_irq(&r->consumer_lock); |
348 | |
349 | return ptr; |
350 | } |
351 | |
352 | static inline void *ptr_ring_consume_any(struct ptr_ring *r) |
353 | { |
354 | unsigned long flags; |
355 | void *ptr; |
356 | |
357 | spin_lock_irqsave(&r->consumer_lock, flags); |
358 | ptr = __ptr_ring_consume(r); |
359 | spin_unlock_irqrestore(&r->consumer_lock, flags); |
360 | |
361 | return ptr; |
362 | } |
363 | |
364 | static inline void *ptr_ring_consume_bh(struct ptr_ring *r) |
365 | { |
366 | void *ptr; |
367 | |
368 | spin_lock_bh(&r->consumer_lock); |
369 | ptr = __ptr_ring_consume(r); |
370 | spin_unlock_bh(&r->consumer_lock); |
371 | |
372 | return ptr; |
373 | } |
374 | |
375 | static inline int ptr_ring_consume_batched(struct ptr_ring *r, |
376 | void **array, int n) |
377 | { |
378 | int ret; |
379 | |
380 | spin_lock(&r->consumer_lock); |
381 | ret = __ptr_ring_consume_batched(r, array, n); |
382 | spin_unlock(&r->consumer_lock); |
383 | |
384 | return ret; |
385 | } |
386 | |
387 | static inline int ptr_ring_consume_batched_irq(struct ptr_ring *r, |
388 | void **array, int n) |
389 | { |
390 | int ret; |
391 | |
392 | spin_lock_irq(&r->consumer_lock); |
393 | ret = __ptr_ring_consume_batched(r, array, n); |
394 | spin_unlock_irq(&r->consumer_lock); |
395 | |
396 | return ret; |
397 | } |
398 | |
399 | static inline int ptr_ring_consume_batched_any(struct ptr_ring *r, |
400 | void **array, int n) |
401 | { |
402 | unsigned long flags; |
403 | int ret; |
404 | |
405 | spin_lock_irqsave(&r->consumer_lock, flags); |
406 | ret = __ptr_ring_consume_batched(r, array, n); |
407 | spin_unlock_irqrestore(&r->consumer_lock, flags); |
408 | |
409 | return ret; |
410 | } |
411 | |
412 | static inline int ptr_ring_consume_batched_bh(struct ptr_ring *r, |
413 | void **array, int n) |
414 | { |
415 | int ret; |
416 | |
417 | spin_lock_bh(&r->consumer_lock); |
418 | ret = __ptr_ring_consume_batched(r, array, n); |
419 | spin_unlock_bh(&r->consumer_lock); |
420 | |
421 | return ret; |
422 | } |
423 | |
424 | /* Cast to structure type and call a function without discarding from FIFO. |
425 | * Function must return a value. |
426 | * Callers must take consumer_lock. |
427 | */ |
428 | #define __PTR_RING_PEEK_CALL(r, f) ((f)(__ptr_ring_peek(r))) |
429 | |
430 | #define PTR_RING_PEEK_CALL(r, f) ({ \ |
431 | typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \ |
432 | \ |
433 | spin_lock(&(r)->consumer_lock); \ |
434 | __PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \ |
435 | spin_unlock(&(r)->consumer_lock); \ |
436 | __PTR_RING_PEEK_CALL_v; \ |
437 | }) |
438 | |
439 | #define PTR_RING_PEEK_CALL_IRQ(r, f) ({ \ |
440 | typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \ |
441 | \ |
442 | spin_lock_irq(&(r)->consumer_lock); \ |
443 | __PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \ |
444 | spin_unlock_irq(&(r)->consumer_lock); \ |
445 | __PTR_RING_PEEK_CALL_v; \ |
446 | }) |
447 | |
448 | #define PTR_RING_PEEK_CALL_BH(r, f) ({ \ |
449 | typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \ |
450 | \ |
451 | spin_lock_bh(&(r)->consumer_lock); \ |
452 | __PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \ |
453 | spin_unlock_bh(&(r)->consumer_lock); \ |
454 | __PTR_RING_PEEK_CALL_v; \ |
455 | }) |
456 | |
457 | #define PTR_RING_PEEK_CALL_ANY(r, f) ({ \ |
458 | typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \ |
459 | unsigned long __PTR_RING_PEEK_CALL_f;\ |
460 | \ |
461 | spin_lock_irqsave(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \ |
462 | __PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \ |
463 | spin_unlock_irqrestore(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \ |
464 | __PTR_RING_PEEK_CALL_v; \ |
465 | }) |
466 | |
467 | /* Not all gfp_t flags (besides GFP_KERNEL) are allowed. See |
468 | * documentation for vmalloc for which of them are legal. |
469 | */ |
470 | static inline void **__ptr_ring_init_queue_alloc(unsigned int size, gfp_t gfp) |
471 | { |
472 | if (size > KMALLOC_MAX_SIZE / sizeof(void *)) |
473 | return NULL; |
474 | return kvmalloc_array(size, sizeof(void *), gfp | __GFP_ZERO); |
475 | } |
476 | |
477 | static inline void __ptr_ring_set_size(struct ptr_ring *r, int size) |
478 | { |
479 | r->size = size; |
480 | r->batch = SMP_CACHE_BYTES * 2 / sizeof(*(r->queue)); |
481 | /* We need to set batch at least to 1 to make logic |
482 | * in __ptr_ring_discard_one work correctly. |
483 | * Batching too much (because ring is small) would cause a lot of |
484 | * burstiness. Needs tuning, for now disable batching. |
485 | */ |
486 | if (r->batch > r->size / 2 || !r->batch) |
487 | r->batch = 1; |
488 | } |
489 | |
490 | static inline int ptr_ring_init(struct ptr_ring *r, int size, gfp_t gfp) |
491 | { |
492 | r->queue = __ptr_ring_init_queue_alloc(size, gfp); |
493 | if (!r->queue) |
494 | return -ENOMEM; |
495 | |
496 | __ptr_ring_set_size(r, size); |
497 | r->producer = r->consumer_head = r->consumer_tail = 0; |
498 | spin_lock_init(&r->producer_lock); |
499 | spin_lock_init(&r->consumer_lock); |
500 | |
501 | return 0; |
502 | } |
503 | |
504 | /* |
505 | * Return entries into ring. Destroy entries that don't fit. |
506 | * |
507 | * Note: this is expected to be a rare slow path operation. |
508 | * |
509 | * Note: producer lock is nested within consumer lock, so if you |
510 | * resize you must make sure all uses nest correctly. |
511 | * In particular if you consume ring in interrupt or BH context, you must |
512 | * disable interrupts/BH when doing so. |
513 | */ |
514 | static inline void ptr_ring_unconsume(struct ptr_ring *r, void **batch, int n, |
515 | void (*destroy)(void *)) |
516 | { |
517 | unsigned long flags; |
518 | int head; |
519 | |
520 | spin_lock_irqsave(&r->consumer_lock, flags); |
521 | spin_lock(&r->producer_lock); |
522 | |
523 | if (!r->size) |
524 | goto done; |
525 | |
526 | /* |
527 | * Clean out buffered entries (for simplicity). This way following code |
528 | * can test entries for NULL and if not assume they are valid. |
529 | */ |
530 | head = r->consumer_head - 1; |
531 | while (likely(head >= r->consumer_tail)) |
532 | r->queue[head--] = NULL; |
533 | r->consumer_tail = r->consumer_head; |
534 | |
535 | /* |
536 | * Go over entries in batch, start moving head back and copy entries. |
537 | * Stop when we run into previously unconsumed entries. |
538 | */ |
539 | while (n) { |
540 | head = r->consumer_head - 1; |
541 | if (head < 0) |
542 | head = r->size - 1; |
543 | if (r->queue[head]) { |
544 | /* This batch entry will have to be destroyed. */ |
545 | goto done; |
546 | } |
547 | r->queue[head] = batch[--n]; |
548 | r->consumer_tail = head; |
549 | /* matching READ_ONCE in __ptr_ring_empty for lockless tests */ |
550 | WRITE_ONCE(r->consumer_head, head); |
551 | } |
552 | |
553 | done: |
554 | /* Destroy all entries left in the batch. */ |
555 | while (n) |
556 | destroy(batch[--n]); |
557 | spin_unlock(&r->producer_lock); |
558 | spin_unlock_irqrestore(&r->consumer_lock, flags); |
559 | } |
560 | |
561 | static inline void **__ptr_ring_swap_queue(struct ptr_ring *r, void **queue, |
562 | int size, gfp_t gfp, |
563 | void (*destroy)(void *)) |
564 | { |
565 | int producer = 0; |
566 | void **old; |
567 | void *ptr; |
568 | |
569 | while ((ptr = __ptr_ring_consume(r))) |
570 | if (producer < size) |
571 | queue[producer++] = ptr; |
572 | else if (destroy) |
573 | destroy(ptr); |
574 | |
575 | if (producer >= size) |
576 | producer = 0; |
577 | __ptr_ring_set_size(r, size); |
578 | r->producer = producer; |
579 | r->consumer_head = 0; |
580 | r->consumer_tail = 0; |
581 | old = r->queue; |
582 | r->queue = queue; |
583 | |
584 | return old; |
585 | } |
586 | |
587 | /* |
588 | * Note: producer lock is nested within consumer lock, so if you |
589 | * resize you must make sure all uses nest correctly. |
590 | * In particular if you consume ring in interrupt or BH context, you must |
591 | * disable interrupts/BH when doing so. |
592 | */ |
593 | static inline int ptr_ring_resize(struct ptr_ring *r, int size, gfp_t gfp, |
594 | void (*destroy)(void *)) |
595 | { |
596 | unsigned long flags; |
597 | void **queue = __ptr_ring_init_queue_alloc(size, gfp); |
598 | void **old; |
599 | |
600 | if (!queue) |
601 | return -ENOMEM; |
602 | |
603 | spin_lock_irqsave(&(r)->consumer_lock, flags); |
604 | spin_lock(&(r)->producer_lock); |
605 | |
606 | old = __ptr_ring_swap_queue(r, queue, size, gfp, destroy); |
607 | |
608 | spin_unlock(&(r)->producer_lock); |
609 | spin_unlock_irqrestore(&(r)->consumer_lock, flags); |
610 | |
611 | kvfree(old); |
612 | |
613 | return 0; |
614 | } |
615 | |
616 | /* |
617 | * Note: producer lock is nested within consumer lock, so if you |
618 | * resize you must make sure all uses nest correctly. |
619 | * In particular if you consume ring in interrupt or BH context, you must |
620 | * disable interrupts/BH when doing so. |
621 | */ |
622 | static inline int ptr_ring_resize_multiple(struct ptr_ring **rings, |
623 | unsigned int nrings, |
624 | int size, |
625 | gfp_t gfp, void (*destroy)(void *)) |
626 | { |
627 | unsigned long flags; |
628 | void ***queues; |
629 | int i; |
630 | |
631 | queues = kmalloc_array(nrings, sizeof(*queues), gfp); |
632 | if (!queues) |
633 | goto noqueues; |
634 | |
635 | for (i = 0; i < nrings; ++i) { |
636 | queues[i] = __ptr_ring_init_queue_alloc(size, gfp); |
637 | if (!queues[i]) |
638 | goto nomem; |
639 | } |
640 | |
641 | for (i = 0; i < nrings; ++i) { |
642 | spin_lock_irqsave(&(rings[i])->consumer_lock, flags); |
643 | spin_lock(&(rings[i])->producer_lock); |
644 | queues[i] = __ptr_ring_swap_queue(rings[i], queues[i], |
645 | size, gfp, destroy); |
646 | spin_unlock(&(rings[i])->producer_lock); |
647 | spin_unlock_irqrestore(&(rings[i])->consumer_lock, flags); |
648 | } |
649 | |
650 | for (i = 0; i < nrings; ++i) |
651 | kvfree(queues[i]); |
652 | |
653 | kfree(queues); |
654 | |
655 | return 0; |
656 | |
657 | nomem: |
658 | while (--i >= 0) |
659 | kvfree(queues[i]); |
660 | |
661 | kfree(queues); |
662 | |
663 | noqueues: |
664 | return -ENOMEM; |
665 | } |
666 | |
667 | static inline void ptr_ring_cleanup(struct ptr_ring *r, void (*destroy)(void *)) |
668 | { |
669 | void *ptr; |
670 | |
671 | if (destroy) |
672 | while ((ptr = ptr_ring_consume(r))) |
673 | destroy(ptr); |
674 | kvfree(r->queue); |
675 | } |
676 | |
677 | #endif /* _LINUX_PTR_RING_H */ |
678 | |