// SPDX-License-Identifier: (LGPL-2.1 OR BSD-2-Clause)
/*
 * Ring buffer operations.
 *
 * Copyright (C) 2020 Facebook, Inc.
 */
#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <unistd.h>
#include <linux/err.h>
#include <linux/bpf.h>
#include <asm/barrier.h>
#include <sys/mman.h>
#include <sys/epoll.h>
#include <time.h>

#include "libbpf.h"
#include "libbpf_internal.h"
#include "bpf.h"

struct ring {
	ring_buffer_sample_fn sample_cb;
	void *ctx;
	void *data;
	unsigned long *consumer_pos;
	unsigned long *producer_pos;
	unsigned long mask;
	int map_fd;
};

struct ring_buffer {
	struct epoll_event *events;
	struct ring **rings;
	size_t page_size;
	int epoll_fd;
	int ring_cnt;
};

struct user_ring_buffer {
	struct epoll_event event;
	unsigned long *consumer_pos;
	unsigned long *producer_pos;
	void *data;
	unsigned long mask;
	size_t page_size;
	int map_fd;
	int epoll_fd;
};

/* 8-byte ring buffer header structure */
struct ringbuf_hdr {
	__u32 len;
	__u32 pad;
};

static void ringbuf_free_ring(struct ring_buffer *rb, struct ring *r)
{
	if (r->consumer_pos) {
		munmap(r->consumer_pos, rb->page_size);
		r->consumer_pos = NULL;
	}
	if (r->producer_pos) {
		munmap(r->producer_pos, rb->page_size + 2 * (r->mask + 1));
		r->producer_pos = NULL;
	}

	free(r);
}

/* Add extra RINGBUF maps to this ring buffer manager */
int ring_buffer__add(struct ring_buffer *rb, int map_fd,
		     ring_buffer_sample_fn sample_cb, void *ctx)
{
	struct bpf_map_info info;
	__u32 len = sizeof(info);
	struct epoll_event *e;
	struct ring *r;
	__u64 mmap_sz;
	void *tmp;
	int err;

	memset(&info, 0, sizeof(info));

	err = bpf_map_get_info_by_fd(map_fd, &info, &len);
	if (err) {
		err = -errno;
		pr_warn("ringbuf: failed to get map info for fd=%d: %d\n",
			map_fd, err);
		return libbpf_err(err);
	}

	if (info.type != BPF_MAP_TYPE_RINGBUF) {
		pr_warn("ringbuf: map fd=%d is not BPF_MAP_TYPE_RINGBUF\n",
			map_fd);
		return libbpf_err(-EINVAL);
	}

	tmp = libbpf_reallocarray(rb->rings, rb->ring_cnt + 1, sizeof(*rb->rings));
	if (!tmp)
		return libbpf_err(-ENOMEM);
	rb->rings = tmp;

	tmp = libbpf_reallocarray(rb->events, rb->ring_cnt + 1, sizeof(*rb->events));
	if (!tmp)
		return libbpf_err(-ENOMEM);
	rb->events = tmp;

	r = calloc(1, sizeof(*r));
	if (!r)
		return libbpf_err(-ENOMEM);
	rb->rings[rb->ring_cnt] = r;

	r->map_fd = map_fd;
	r->sample_cb = sample_cb;
	r->ctx = ctx;
	r->mask = info.max_entries - 1;

	/* Map writable consumer page */
	tmp = mmap(NULL, rb->page_size, PROT_READ | PROT_WRITE, MAP_SHARED, map_fd, 0);
	if (tmp == MAP_FAILED) {
		err = -errno;
		pr_warn("ringbuf: failed to mmap consumer page for map fd=%d: %d\n",
			map_fd, err);
		goto err_out;
	}
	r->consumer_pos = tmp;

	/* Map read-only producer page and data pages. We map twice as big
	 * data size to allow simple reading of samples that wrap around the
	 * end of a ring buffer. See kernel implementation for details.
	 */
	mmap_sz = rb->page_size + 2 * (__u64)info.max_entries;
	if (mmap_sz != (__u64)(size_t)mmap_sz) {
		err = -E2BIG;
		pr_warn("ringbuf: ring buffer size (%u) is too big\n", info.max_entries);
		goto err_out;
	}
	tmp = mmap(NULL, (size_t)mmap_sz, PROT_READ, MAP_SHARED, map_fd, rb->page_size);
	if (tmp == MAP_FAILED) {
		err = -errno;
		pr_warn("ringbuf: failed to mmap data pages for map fd=%d: %d\n",
			map_fd, err);
		goto err_out;
	}
	r->producer_pos = tmp;
	r->data = tmp + rb->page_size;
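
	/* Because the data area is mapped twice back to back, a sample that
	 * wraps past the end of the ring can still be handed to the callback
	 * as one contiguous chunk. For example (illustrative numbers), with
	 * max_entries = 4096, a record starting at data offset 4088 simply
	 * continues at mapped offsets 4096 and up, which alias ring offsets 0
	 * and up, so no copying or splitting of wrapped samples is needed.
	 */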

	e = &rb->events[rb->ring_cnt];
	memset(e, 0, sizeof(*e));

	e->events = EPOLLIN;
	e->data.fd = rb->ring_cnt;
	if (epoll_ctl(rb->epoll_fd, EPOLL_CTL_ADD, map_fd, e) < 0) {
		err = -errno;
		pr_warn("ringbuf: failed to epoll add map fd=%d: %d\n",
			map_fd, err);
		goto err_out;
	}

	rb->ring_cnt++;
	return 0;

err_out:
	ringbuf_free_ring(rb, r);
	return libbpf_err(err);
}

void ring_buffer__free(struct ring_buffer *rb)
{
	int i;

	if (!rb)
		return;

	for (i = 0; i < rb->ring_cnt; ++i)
		ringbuf_free_ring(rb, rb->rings[i]);
	if (rb->epoll_fd >= 0)
		close(rb->epoll_fd);

	free(rb->events);
	free(rb->rings);
	free(rb);
}

struct ring_buffer *
ring_buffer__new(int map_fd, ring_buffer_sample_fn sample_cb, void *ctx,
		 const struct ring_buffer_opts *opts)
{
	struct ring_buffer *rb;
	int err;

	if (!OPTS_VALID(opts, ring_buffer_opts))
		return errno = EINVAL, NULL;

	rb = calloc(1, sizeof(*rb));
	if (!rb)
		return errno = ENOMEM, NULL;

	rb->page_size = getpagesize();

	rb->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
	if (rb->epoll_fd < 0) {
		err = -errno;
		pr_warn("ringbuf: failed to create epoll instance: %d\n", err);
		goto err_out;
	}

	err = ring_buffer__add(rb, map_fd, sample_cb, ctx);
	if (err)
		goto err_out;

	return rb;

err_out:
	ring_buffer__free(rb);
	return errno = -err, NULL;
}
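
/* Typical setup (illustrative sketch, not part of this file): create a manager
 * for one BPF_MAP_TYPE_RINGBUF map and register a second one with
 * ring_buffer__add(). The skeleton, map and callback names are assumptions of
 * the example.
 *
 *	static int handle_event(void *ctx, void *data, size_t size)
 *	{
 *		return 0;	// non-negative return keeps consumption going
 *	}
 *
 *	struct ring_buffer *rb;
 *	int err;
 *
 *	rb = ring_buffer__new(bpf_map__fd(skel->maps.events_a), handle_event, NULL, NULL);
 *	if (!rb)
 *		return -errno;
 *	err = ring_buffer__add(rb, bpf_map__fd(skel->maps.events_b), handle_event, NULL);
 *	if (err)
 *		goto cleanup;
 */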

static inline int roundup_len(__u32 len)
{
	/* clear out top 2 bits (discard and busy, if set) */
	len <<= 2;
	len >>= 2;
	/* add length prefix */
	len += BPF_RINGBUF_HDR_SZ;
	/* round up to 8 byte alignment */
	return (len + 7) / 8 * 8;
}
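
/* Illustrative arithmetic for how ringbuf_process_ring() below advances the
 * consumer position using roundup_len(): a committed 100-byte sample occupies
 * 8 bytes of header (BPF_RINGBUF_HDR_SZ) plus 100 bytes of payload, i.e. 108
 * bytes, which rounds up to 112, so cons_pos advances by 112.
 */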

static int64_t ringbuf_process_ring(struct ring *r)
{
	int *len_ptr, len, err;
	/* 64-bit to avoid overflow in case of extreme application behavior */
	int64_t cnt = 0;
	unsigned long cons_pos, prod_pos;
	bool got_new_data;
	void *sample;

	cons_pos = smp_load_acquire(r->consumer_pos);
	do {
		got_new_data = false;
		prod_pos = smp_load_acquire(r->producer_pos);
		while (cons_pos < prod_pos) {
			len_ptr = r->data + (cons_pos & r->mask);
			len = smp_load_acquire(len_ptr);

			/* sample not committed yet, bail out for now */
			if (len & BPF_RINGBUF_BUSY_BIT)
				goto done;

			got_new_data = true;
			cons_pos += roundup_len(len);

			if ((len & BPF_RINGBUF_DISCARD_BIT) == 0) {
				sample = (void *)len_ptr + BPF_RINGBUF_HDR_SZ;
				err = r->sample_cb(r->ctx, sample, len);
				if (err < 0) {
					/* update consumer pos and bail out */
					smp_store_release(r->consumer_pos,
							  cons_pos);
					return err;
				}
				cnt++;
			}

			smp_store_release(r->consumer_pos, cons_pos);
		}
	} while (got_new_data);
done:
	return cnt;
}

/* Consume available ring buffer(s) data without event polling.
 * Returns number of records consumed across all registered ring buffers (or
 * INT_MAX, whichever is less), or negative number if any of the callbacks
 * return error.
 */
int ring_buffer__consume(struct ring_buffer *rb)
{
	int64_t err, res = 0;
	int i;

	for (i = 0; i < rb->ring_cnt; i++) {
		struct ring *ring = rb->rings[i];

		err = ringbuf_process_ring(ring);
		if (err < 0)
			return libbpf_err(err);
		res += err;
	}
	if (res > INT_MAX)
		return INT_MAX;
	return res;
}
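
/* Illustrative use of ring_buffer__consume() (assumes rb was set up with
 * ring_buffer__new()): drain whatever is already pending without blocking,
 * e.g. right before tearing the manager down.
 *
 *	int n = ring_buffer__consume(rb);
 *
 *	if (n < 0)
 *		fprintf(stderr, "ringbuf callback failed: %d\n", n);
 *	ring_buffer__free(rb);
 */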

/* Poll for available data and consume records, if any are available.
 * Returns number of records consumed (or INT_MAX, whichever is less), or
 * negative number, if any of the registered callbacks returned error.
 */
int ring_buffer__poll(struct ring_buffer *rb, int timeout_ms)
{
	int i, cnt;
	int64_t err, res = 0;

	cnt = epoll_wait(rb->epoll_fd, rb->events, rb->ring_cnt, timeout_ms);
	if (cnt < 0)
		return libbpf_err(-errno);

	for (i = 0; i < cnt; i++) {
		__u32 ring_id = rb->events[i].data.fd;
		struct ring *ring = rb->rings[ring_id];

		err = ringbuf_process_ring(ring);
		if (err < 0)
			return libbpf_err(err);
		res += err;
	}
	if (res > INT_MAX)
		return INT_MAX;
	return res;
}
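
/* Typical consumption loop (illustrative sketch; exiting is a flag assumed to
 * be set by the application's signal handler, not something defined here):
 *
 *	while (!exiting) {
 *		int n = ring_buffer__poll(rb, 100);	// timeout in ms
 *
 *		if (n == -EINTR)
 *			continue;
 *		if (n < 0)
 *			break;	// a sample callback returned an error
 *	}
 */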

/* Get an fd that can be used to sleep until data is available in the ring(s) */
int ring_buffer__epoll_fd(const struct ring_buffer *rb)
{
	return rb->epoll_fd;
}
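
/* Illustrative integration sketch: the epoll fd can be registered in a
 * caller-owned poll(2)/epoll loop alongside other descriptors; once it becomes
 * readable, ring_buffer__consume() drains the data. Error handling is omitted
 * and other_fd is a placeholder.
 *
 *	struct pollfd fds[] = {
 *		{ .fd = ring_buffer__epoll_fd(rb), .events = POLLIN },
 *		{ .fd = other_fd, .events = POLLIN },
 *	};
 *
 *	if (poll(fds, 2, -1) > 0 && (fds[0].revents & POLLIN))
 *		ring_buffer__consume(rb);
 */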

struct ring *ring_buffer__ring(struct ring_buffer *rb, unsigned int idx)
{
	if (idx >= rb->ring_cnt)
		return errno = ERANGE, NULL;

	return rb->rings[idx];
}

unsigned long ring__consumer_pos(const struct ring *r)
{
	/* Synchronizes with smp_store_release() in ringbuf_process_ring(). */
	return smp_load_acquire(r->consumer_pos);
}

unsigned long ring__producer_pos(const struct ring *r)
{
	/* Synchronizes with smp_store_release() in __bpf_ringbuf_reserve() in
	 * the kernel.
	 */
	return smp_load_acquire(r->producer_pos);
}

size_t ring__avail_data_size(const struct ring *r)
{
	unsigned long cons_pos, prod_pos;

	cons_pos = ring__consumer_pos(r);
	prod_pos = ring__producer_pos(r);
	return prod_pos - cons_pos;
}

size_t ring__size(const struct ring *r)
{
	return r->mask + 1;
}

int ring__map_fd(const struct ring *r)
{
	return r->map_fd;
}
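
/* Illustrative sketch: the per-ring accessors above make it easy to monitor
 * how full an individual ring is, e.g. from a periodic stats routine:
 *
 *	struct ring *ring = ring_buffer__ring(rb, 0);
 *
 *	if (ring)
 *		printf("ring 0: %zu of %zu bytes unconsumed\n",
 *		       ring__avail_data_size(ring), ring__size(ring));
 */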

int ring__consume(struct ring *r)
{
	int64_t res;

	res = ringbuf_process_ring(r);
	if (res < 0)
		return libbpf_err(res);

	return res > INT_MAX ? INT_MAX : res;
}

static void user_ringbuf_unmap_ring(struct user_ring_buffer *rb)
{
	if (rb->consumer_pos) {
		munmap(rb->consumer_pos, rb->page_size);
		rb->consumer_pos = NULL;
	}
	if (rb->producer_pos) {
		munmap(rb->producer_pos, rb->page_size + 2 * (rb->mask + 1));
		rb->producer_pos = NULL;
	}
}

void user_ring_buffer__free(struct user_ring_buffer *rb)
{
	if (!rb)
		return;

	user_ringbuf_unmap_ring(rb);

	if (rb->epoll_fd >= 0)
		close(rb->epoll_fd);

	free(rb);
}

static int user_ringbuf_map(struct user_ring_buffer *rb, int map_fd)
{
	struct bpf_map_info info;
	__u32 len = sizeof(info);
	__u64 mmap_sz;
	void *tmp;
	struct epoll_event *rb_epoll;
	int err;

	memset(&info, 0, sizeof(info));

	err = bpf_map_get_info_by_fd(map_fd, &info, &len);
	if (err) {
		err = -errno;
		pr_warn("user ringbuf: failed to get map info for fd=%d: %d\n", map_fd, err);
		return err;
	}

	if (info.type != BPF_MAP_TYPE_USER_RINGBUF) {
		pr_warn("user ringbuf: map fd=%d is not BPF_MAP_TYPE_USER_RINGBUF\n", map_fd);
		return -EINVAL;
	}

	rb->map_fd = map_fd;
	rb->mask = info.max_entries - 1;

	/* Map read-only consumer page */
	tmp = mmap(NULL, rb->page_size, PROT_READ, MAP_SHARED, map_fd, 0);
	if (tmp == MAP_FAILED) {
		err = -errno;
		pr_warn("user ringbuf: failed to mmap consumer page for map fd=%d: %d\n",
			map_fd, err);
		return err;
	}
	rb->consumer_pos = tmp;

	/* Map read-write the producer page and data pages. We map the data
	 * region as twice the total size of the ring buffer to allow the
	 * simple reading and writing of samples that wrap around the end of
	 * the buffer. See the kernel implementation for details.
	 */
	mmap_sz = rb->page_size + 2 * (__u64)info.max_entries;
	if (mmap_sz != (__u64)(size_t)mmap_sz) {
		pr_warn("user ringbuf: ring buf size (%u) is too big\n", info.max_entries);
		return -E2BIG;
	}
	tmp = mmap(NULL, (size_t)mmap_sz, PROT_READ | PROT_WRITE, MAP_SHARED,
		   map_fd, rb->page_size);
	if (tmp == MAP_FAILED) {
		err = -errno;
		pr_warn("user ringbuf: failed to mmap data pages for map fd=%d: %d\n",
			map_fd, err);
		return err;
	}

	rb->producer_pos = tmp;
	rb->data = tmp + rb->page_size;

	rb_epoll = &rb->event;
	rb_epoll->events = EPOLLOUT;
	if (epoll_ctl(rb->epoll_fd, EPOLL_CTL_ADD, map_fd, rb_epoll) < 0) {
		err = -errno;
		pr_warn("user ringbuf: failed to epoll add map fd=%d: %d\n", map_fd, err);
		return err;
	}

	return 0;
}

struct user_ring_buffer *
user_ring_buffer__new(int map_fd, const struct user_ring_buffer_opts *opts)
{
	struct user_ring_buffer *rb;
	int err;

	if (!OPTS_VALID(opts, user_ring_buffer_opts))
		return errno = EINVAL, NULL;

	rb = calloc(1, sizeof(*rb));
	if (!rb)
		return errno = ENOMEM, NULL;

	rb->page_size = getpagesize();

	rb->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
	if (rb->epoll_fd < 0) {
		err = -errno;
		pr_warn("user ringbuf: failed to create epoll instance: %d\n", err);
		goto err_out;
	}

	err = user_ringbuf_map(rb, map_fd);
	if (err)
		goto err_out;

	return rb;

err_out:
	user_ring_buffer__free(rb);
	return errno = -err, NULL;
}

static void user_ringbuf_commit(struct user_ring_buffer *rb, void *sample, bool discard)
{
	__u32 new_len;
	struct ringbuf_hdr *hdr;
	uintptr_t hdr_offset;

	hdr_offset = rb->mask + 1 + (sample - rb->data) - BPF_RINGBUF_HDR_SZ;
	hdr = rb->data + (hdr_offset & rb->mask);
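
	/* Illustrative arithmetic for the offset computation above: with a
	 * 4096-byte ring (rb->mask == 4095), a sample pointer at data offset 8
	 * gives hdr_offset = 4096 + 8 - 8 = 4096, which masks down to header
	 * offset 0; the same sample seen through the mirrored mapping at
	 * offset 4104 gives 8192, which also masks to 0. Adding one full ring
	 * size up front keeps the value non-negative even when the sample
	 * pointer wrapped to offset 0 while its header sits at the end of the
	 * ring.
	 */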

	new_len = hdr->len & ~BPF_RINGBUF_BUSY_BIT;
	if (discard)
		new_len |= BPF_RINGBUF_DISCARD_BIT;

	/* Synchronizes with smp_load_acquire() in __bpf_user_ringbuf_peek() in
	 * the kernel.
	 */
	__atomic_exchange_n(&hdr->len, new_len, __ATOMIC_ACQ_REL);
}

void user_ring_buffer__discard(struct user_ring_buffer *rb, void *sample)
{
	user_ringbuf_commit(rb, sample, true);
}

void user_ring_buffer__submit(struct user_ring_buffer *rb, void *sample)
{
	user_ringbuf_commit(rb, sample, false);
}

void *user_ring_buffer__reserve(struct user_ring_buffer *rb, __u32 size)
{
	__u32 avail_size, total_size, max_size;
	/* 64-bit to avoid overflow in case of extreme application behavior */
	__u64 cons_pos, prod_pos;
	struct ringbuf_hdr *hdr;

	/* The top two bits are used as special flags */
	if (size & (BPF_RINGBUF_BUSY_BIT | BPF_RINGBUF_DISCARD_BIT))
		return errno = E2BIG, NULL;

	/* Synchronizes with smp_store_release() in
	 * __bpf_user_ringbuf_sample_release() in the kernel.
	 */
	cons_pos = smp_load_acquire(rb->consumer_pos);
	/* Synchronizes with smp_store_release() in user_ring_buffer__reserve() */
	prod_pos = smp_load_acquire(rb->producer_pos);

	max_size = rb->mask + 1;
	avail_size = max_size - (prod_pos - cons_pos);
	/* Round up total size to a multiple of 8. */
	total_size = (size + BPF_RINGBUF_HDR_SZ + 7) / 8 * 8;

	if (total_size > max_size)
		return errno = E2BIG, NULL;

	if (avail_size < total_size)
		return errno = ENOSPC, NULL;

	hdr = rb->data + (prod_pos & rb->mask);
	hdr->len = size | BPF_RINGBUF_BUSY_BIT;
	hdr->pad = 0;

	/* Synchronizes with smp_load_acquire() in __bpf_user_ringbuf_peek() in
	 * the kernel.
	 */
	smp_store_release(rb->producer_pos, prod_pos + total_size);

	return (void *)rb->data + ((prod_pos + BPF_RINGBUF_HDR_SZ) & rb->mask);
}
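
/* Producer-side usage sketch (illustrative; assumes map_fd refers to a
 * BPF_MAP_TYPE_USER_RINGBUF map and struct my_msg is a hypothetical payload):
 *
 *	struct user_ring_buffer *urb = user_ring_buffer__new(map_fd, NULL);
 *	struct my_msg *msg;
 *
 *	if (!urb)
 *		return -errno;
 *	msg = user_ring_buffer__reserve(urb, sizeof(*msg));
 *	if (!msg)
 *		return -errno;	// E2BIG, ENOSPC, ...
 *	msg->value = 42;
 *	user_ring_buffer__submit(urb, msg);	// or user_ring_buffer__discard()
 */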

static __u64 ns_elapsed_timespec(const struct timespec *start, const struct timespec *end)
{
	__u64 start_ns, end_ns, ns_per_s = 1000000000;

	start_ns = (__u64)start->tv_sec * ns_per_s + start->tv_nsec;
	end_ns = (__u64)end->tv_sec * ns_per_s + end->tv_nsec;

	return end_ns - start_ns;
}

void *user_ring_buffer__reserve_blocking(struct user_ring_buffer *rb, __u32 size, int timeout_ms)
{
	void *sample;
	int err, ms_remaining = timeout_ms;
	struct timespec start;

	if (timeout_ms < 0 && timeout_ms != -1)
		return errno = EINVAL, NULL;

	if (timeout_ms != -1) {
		err = clock_gettime(CLOCK_MONOTONIC, &start);
		if (err)
			return NULL;
	}

	do {
		int cnt, ms_elapsed;
		struct timespec curr;
		__u64 ns_per_ms = 1000000;

		sample = user_ring_buffer__reserve(rb, size);
		if (sample)
			return sample;
		else if (errno != ENOSPC)
			return NULL;

		/* The kernel guarantees at least one event notification
		 * delivery whenever at least one sample is drained from the
		 * ring buffer in an invocation to bpf_ringbuf_drain(). Other
		 * additional events may be delivered at any time, but only one
		 * event is guaranteed per bpf_ringbuf_drain() invocation,
		 * provided that a sample is drained, and the BPF program did
		 * not pass BPF_RB_NO_WAKEUP to bpf_ringbuf_drain(). If
		 * BPF_RB_FORCE_WAKEUP is passed to bpf_ringbuf_drain(), a
		 * wakeup event will be delivered even if no samples are
		 * drained.
		 */
		cnt = epoll_wait(rb->epoll_fd, &rb->event, 1, ms_remaining);
		if (cnt < 0)
			return NULL;

		if (timeout_ms == -1)
			continue;

		err = clock_gettime(CLOCK_MONOTONIC, &curr);
		if (err)
			return NULL;

		ms_elapsed = ns_elapsed_timespec(&start, &curr) / ns_per_ms;
		ms_remaining = timeout_ms - ms_elapsed;
	} while (ms_remaining > 0);

	/* Try one more time to reserve a sample after the specified timeout has elapsed. */
	return user_ring_buffer__reserve(rb, size);
}
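
/* Blocking variant sketch (illustrative, reusing the urb/msg names from the
 * example above): wait up to 100 ms for the BPF program to drain space before
 * giving up; NULL with errno == ENOSPC means the ring was still full when the
 * timeout expired.
 *
 *	void *sample = user_ring_buffer__reserve_blocking(urb, sizeof(*msg), 100);
 *
 *	if (!sample && errno == ENOSPC)
 *		fprintf(stderr, "user ringbuf still full after 100 ms\n");
 */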