//
// detail/impl/epoll_reactor.ipp
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2015 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//

#ifndef BOOST_ASIO_DETAIL_IMPL_EPOLL_REACTOR_IPP
#define BOOST_ASIO_DETAIL_IMPL_EPOLL_REACTOR_IPP

#if defined(_MSC_VER) && (_MSC_VER >= 1200)
# pragma once
#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)

#include <boost/asio/detail/config.hpp>

#if defined(BOOST_ASIO_HAS_EPOLL)

#include <cstddef>
#include <sys/epoll.h>
#include <boost/asio/detail/epoll_reactor.hpp>
#include <boost/asio/detail/throw_error.hpp>
#include <boost/asio/error.hpp>

#if defined(BOOST_ASIO_HAS_TIMERFD)
# include <sys/timerfd.h>
#endif // defined(BOOST_ASIO_HAS_TIMERFD)

#include <boost/asio/detail/push_options.hpp>

namespace boost {
namespace asio {
namespace detail {

epoll_reactor::epoll_reactor(boost::asio::io_service& io_service)
  : boost::asio::detail::service_base<epoll_reactor>(io_service),
    io_service_(use_service<io_service_impl>(io_service)),
    mutex_(),
    interrupter_(),
    epoll_fd_(do_epoll_create()),
    timer_fd_(do_timerfd_create()),
    shutdown_(false)
{
  // Add the interrupter's descriptor to epoll.
  epoll_event ev = { 0, { 0 } };
  ev.events = EPOLLIN | EPOLLERR | EPOLLET;
  ev.data.ptr = &interrupter_;
  epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, interrupter_.read_descriptor(), &ev);
  interrupter_.interrupt();
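  // The interrupter is registered edge-triggered and immediately made
  // readable. It is left in that ready-to-read state, so epoll_wait() only
  // reports it again when interrupt() updates its registration (see run()
  // and interrupt() below).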

  // Add the timer descriptor to epoll.
  if (timer_fd_ != -1)
  {
    ev.events = EPOLLIN | EPOLLERR;
    ev.data.ptr = &timer_fd_;
    epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &ev);
  }
}

epoll_reactor::~epoll_reactor()
{
  if (epoll_fd_ != -1)
    close(epoll_fd_);
  if (timer_fd_ != -1)
    close(timer_fd_);
}

void epoll_reactor::shutdown_service()
{
  mutex::scoped_lock lock(mutex_);
  shutdown_ = true;
  lock.unlock();

  op_queue<operation> ops;

  while (descriptor_state* state = registered_descriptors_.first())
  {
    for (int i = 0; i < max_ops; ++i)
      ops.push(state->op_queue_[i]);
    state->shutdown_ = true;
    registered_descriptors_.free(state);
  }

  timer_queues_.get_all_timers(ops);

  io_service_.abandon_operations(ops);
}

void epoll_reactor::fork_service(boost::asio::io_service::fork_event fork_ev)
{
  if (fork_ev == boost::asio::io_service::fork_child)
  {
    if (epoll_fd_ != -1)
      ::close(epoll_fd_);
    epoll_fd_ = -1;
    epoll_fd_ = do_epoll_create();

    if (timer_fd_ != -1)
      ::close(timer_fd_);
    timer_fd_ = -1;
    timer_fd_ = do_timerfd_create();

    interrupter_.recreate();

    // Add the interrupter's descriptor to epoll.
    epoll_event ev = { 0, { 0 } };
    ev.events = EPOLLIN | EPOLLERR | EPOLLET;
    ev.data.ptr = &interrupter_;
    epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, interrupter_.read_descriptor(), &ev);
    interrupter_.interrupt();

    // Add the timer descriptor to epoll.
    if (timer_fd_ != -1)
    {
      ev.events = EPOLLIN | EPOLLERR;
      ev.data.ptr = &timer_fd_;
      epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &ev);
    }

    update_timeout();

    // Re-register all descriptors with epoll.
    mutex::scoped_lock descriptors_lock(registered_descriptors_mutex_);
    for (descriptor_state* state = registered_descriptors_.first();
        state != 0; state = state->next_)
    {
      ev.events = state->registered_events_;
      ev.data.ptr = state;
      int result = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, state->descriptor_, &ev);
      if (result != 0)
      {
        boost::system::error_code ec(errno,
            boost::asio::error::get_system_category());
        boost::asio::detail::throw_error(ec, "epoll re-registration");
      }
    }
  }
}

void epoll_reactor::init_task()
{
  io_service_.init_task();
}

int epoll_reactor::register_descriptor(socket_type descriptor,
    epoll_reactor::per_descriptor_data& descriptor_data)
{
  descriptor_data = allocate_descriptor_state();

  {
    mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);

    descriptor_data->reactor_ = this;
    descriptor_data->descriptor_ = descriptor;
    descriptor_data->shutdown_ = false;
  }

  epoll_event ev = { 0, { 0 } };
  ev.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLPRI | EPOLLET;
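  // Note: EPOLLOUT is not part of the initial registration; start_op() adds
  // it the first time a write operation cannot be completed speculatively.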
  descriptor_data->registered_events_ = ev.events;
  ev.data.ptr = descriptor_data;
  int result = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, descriptor, &ev);
  if (result != 0)
    return errno;

  return 0;
}

int epoll_reactor::register_internal_descriptor(
    int op_type, socket_type descriptor,
    epoll_reactor::per_descriptor_data& descriptor_data, reactor_op* op)
{
  descriptor_data = allocate_descriptor_state();

  {
    mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);

    descriptor_data->reactor_ = this;
    descriptor_data->descriptor_ = descriptor;
    descriptor_data->shutdown_ = false;
    descriptor_data->op_queue_[op_type].push(op);
  }

  epoll_event ev = { 0, { 0 } };
  ev.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLPRI | EPOLLET;
  descriptor_data->registered_events_ = ev.events;
  ev.data.ptr = descriptor_data;
  int result = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, descriptor, &ev);
  if (result != 0)
    return errno;

  return 0;
}

void epoll_reactor::move_descriptor(socket_type,
    epoll_reactor::per_descriptor_data& target_descriptor_data,
    epoll_reactor::per_descriptor_data& source_descriptor_data)
{
  target_descriptor_data = source_descriptor_data;
  source_descriptor_data = 0;
}

void epoll_reactor::start_op(int op_type, socket_type descriptor,
    epoll_reactor::per_descriptor_data& descriptor_data, reactor_op* op,
    bool is_continuation, bool allow_speculative)
{
  if (!descriptor_data)
  {
    op->ec_ = boost::asio::error::bad_descriptor;
    post_immediate_completion(op, is_continuation);
    return;
  }

  mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);

  if (descriptor_data->shutdown_)
  {
    post_immediate_completion(op, is_continuation);
    return;
  }

  if (descriptor_data->op_queue_[op_type].empty())
  {
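    // No operation of this type is pending, so the new operation may be
    // attempted speculatively, i.e. performed immediately without waiting
    // for epoll to report readiness. Reads are only attempted speculatively
    // when there is no pending exception (out-of-band) operation.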
    if (allow_speculative
        && (op_type != read_op
          || descriptor_data->op_queue_[except_op].empty()))
    {
      if (op->perform())
      {
        descriptor_lock.unlock();
        io_service_.post_immediate_completion(op, is_continuation);
        return;
      }

      if (op_type == write_op)
      {
        if ((descriptor_data->registered_events_ & EPOLLOUT) == 0)
        {
          epoll_event ev = { 0, { 0 } };
          ev.events = descriptor_data->registered_events_ | EPOLLOUT;
          ev.data.ptr = descriptor_data;
          if (epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, descriptor, &ev) == 0)
          {
            descriptor_data->registered_events_ |= ev.events;
          }
          else
          {
            op->ec_ = boost::system::error_code(errno,
                boost::asio::error::get_system_category());
            io_service_.post_immediate_completion(op, is_continuation);
            return;
          }
        }
      }
    }
    else
    {
      if (op_type == write_op)
      {
        descriptor_data->registered_events_ |= EPOLLOUT;
      }

      epoll_event ev = { 0, { 0 } };
      ev.events = descriptor_data->registered_events_;
      ev.data.ptr = descriptor_data;
      epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, descriptor, &ev);
    }
  }

  descriptor_data->op_queue_[op_type].push(op);
  io_service_.work_started();
}

void epoll_reactor::cancel_ops(socket_type,
    epoll_reactor::per_descriptor_data& descriptor_data)
{
  if (!descriptor_data)
    return;

  mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);

  op_queue<operation> ops;
  for (int i = 0; i < max_ops; ++i)
  {
    while (reactor_op* op = descriptor_data->op_queue_[i].front())
    {
      op->ec_ = boost::asio::error::operation_aborted;
      descriptor_data->op_queue_[i].pop();
      ops.push(op);
    }
  }

  descriptor_lock.unlock();

  io_service_.post_deferred_completions(ops);
}

void epoll_reactor::deregister_descriptor(socket_type descriptor,
    epoll_reactor::per_descriptor_data& descriptor_data, bool closing)
{
  if (!descriptor_data)
    return;

  mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);

  if (!descriptor_data->shutdown_)
  {
    if (closing)
    {
      // The descriptor will be automatically removed from the epoll set when
      // it is closed.
    }
    else
    {
      epoll_event ev = { 0, { 0 } };
      epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, descriptor, &ev);
    }

    op_queue<operation> ops;
    for (int i = 0; i < max_ops; ++i)
    {
      while (reactor_op* op = descriptor_data->op_queue_[i].front())
      {
        op->ec_ = boost::asio::error::operation_aborted;
        descriptor_data->op_queue_[i].pop();
        ops.push(op);
      }
    }

    descriptor_data->descriptor_ = -1;
    descriptor_data->shutdown_ = true;

    descriptor_lock.unlock();

    free_descriptor_state(descriptor_data);
    descriptor_data = 0;

    io_service_.post_deferred_completions(ops);
  }
}

void epoll_reactor::deregister_internal_descriptor(socket_type descriptor,
    epoll_reactor::per_descriptor_data& descriptor_data)
{
  if (!descriptor_data)
    return;

  mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);

  if (!descriptor_data->shutdown_)
  {
    epoll_event ev = { 0, { 0 } };
    epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, descriptor, &ev);

    op_queue<operation> ops;
    for (int i = 0; i < max_ops; ++i)
      ops.push(descriptor_data->op_queue_[i]);

    descriptor_data->descriptor_ = -1;
    descriptor_data->shutdown_ = true;

    descriptor_lock.unlock();

    free_descriptor_state(descriptor_data);
    descriptor_data = 0;
  }
}

void epoll_reactor::run(bool block, op_queue<operation>& ops)
{
  // This code relies on the fact that the task_io_service queues the reactor
  // task behind all descriptor operations generated by this function. This
  // means that, by the time we reach this point, any previously returned
  // descriptor operations have already been dequeued. Therefore it is now safe
  // for us to reuse and return them for the task_io_service to queue again.

  // Calculate a timeout only if timerfd is not used.
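  // When a timerfd is in use, timer expiry is delivered as a readiness event
  // on timer_fd_, so epoll_wait() can simply block indefinitely.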
  int timeout;
  if (timer_fd_ != -1)
    timeout = block ? -1 : 0;
  else
  {
    mutex::scoped_lock lock(mutex_);
    timeout = block ? get_timeout() : 0;
  }

  // Block on the epoll descriptor.
  epoll_event events[128];
  int num_events = epoll_wait(epoll_fd_, events, 128, timeout);

#if defined(BOOST_ASIO_HAS_TIMERFD)
  bool check_timers = (timer_fd_ == -1);
#else // defined(BOOST_ASIO_HAS_TIMERFD)
  bool check_timers = true;
#endif // defined(BOOST_ASIO_HAS_TIMERFD)

  // Dispatch the waiting events.
  for (int i = 0; i < num_events; ++i)
  {
    void* ptr = events[i].data.ptr;
    if (ptr == &interrupter_)
    {
      // No need to reset the interrupter since we're leaving the descriptor
      // in a ready-to-read state and relying on edge-triggered notifications
      // to make it so that we only get woken up when the descriptor's epoll
      // registration is updated.

#if defined(BOOST_ASIO_HAS_TIMERFD)
      if (timer_fd_ == -1)
        check_timers = true;
#else // defined(BOOST_ASIO_HAS_TIMERFD)
      check_timers = true;
#endif // defined(BOOST_ASIO_HAS_TIMERFD)
    }
#if defined(BOOST_ASIO_HAS_TIMERFD)
    else if (ptr == &timer_fd_)
    {
      check_timers = true;
    }
#endif // defined(BOOST_ASIO_HAS_TIMERFD)
    else
    {
      // The descriptor operation doesn't count as work in and of itself, so we
      // don't call work_started() here. This still allows the io_service to
      // stop if the only remaining operations are descriptor operations.
      descriptor_state* descriptor_data = static_cast<descriptor_state*>(ptr);
      descriptor_data->set_ready_events(events[i].events);
      ops.push(descriptor_data);
    }
  }

  if (check_timers)
  {
    mutex::scoped_lock common_lock(mutex_);
    timer_queues_.get_ready_timers(ops);

#if defined(BOOST_ASIO_HAS_TIMERFD)
    if (timer_fd_ != -1)
    {
      itimerspec new_timeout;
      itimerspec old_timeout;
      int flags = get_timeout(new_timeout);
      timerfd_settime(timer_fd_, flags, &new_timeout, &old_timeout);
    }
#endif // defined(BOOST_ASIO_HAS_TIMERFD)
  }
}

void epoll_reactor::interrupt()
{
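  // Re-registering the interrupter's descriptor (which is kept permanently
  // ready to read) with EPOLL_CTL_MOD re-arms its edge-triggered notification,
  // which is enough to wake up a thread blocked in epoll_wait().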
  epoll_event ev = { 0, { 0 } };
  ev.events = EPOLLIN | EPOLLERR | EPOLLET;
  ev.data.ptr = &interrupter_;
  epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, interrupter_.read_descriptor(), &ev);
}

int epoll_reactor::do_epoll_create()
{
#if defined(EPOLL_CLOEXEC)
  int fd = epoll_create1(EPOLL_CLOEXEC);
#else // defined(EPOLL_CLOEXEC)
  int fd = -1;
  errno = EINVAL;
#endif // defined(EPOLL_CLOEXEC)

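  // Fall back to the older epoll_create() interface if epoll_create1() is
  // unavailable on the running kernel (reported as EINVAL or ENOSYS), and set
  // the close-on-exec flag manually in that case.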
  if (fd == -1 && (errno == EINVAL || errno == ENOSYS))
  {
    fd = epoll_create(epoll_size);
    if (fd != -1)
      ::fcntl(fd, F_SETFD, FD_CLOEXEC);
  }

  if (fd == -1)
  {
    boost::system::error_code ec(errno,
        boost::asio::error::get_system_category());
    boost::asio::detail::throw_error(ec, "epoll");
  }

  return fd;
}

int epoll_reactor::do_timerfd_create()
{
#if defined(BOOST_ASIO_HAS_TIMERFD)
# if defined(TFD_CLOEXEC)
  int fd = timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC);
# else // defined(TFD_CLOEXEC)
  int fd = -1;
  errno = EINVAL;
# endif // defined(TFD_CLOEXEC)

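  // As in do_epoll_create(), fall back to creating the descriptor without
  // TFD_CLOEXEC and set close-on-exec via fcntl(). Unlike the epoll
  // descriptor, failure here is not fatal: the reactor simply runs without a
  // timerfd (timer_fd_ == -1) and computes epoll_wait() timeouts instead.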
  if (fd == -1 && errno == EINVAL)
  {
    fd = timerfd_create(CLOCK_MONOTONIC, 0);
    if (fd != -1)
      ::fcntl(fd, F_SETFD, FD_CLOEXEC);
  }

  return fd;
#else // defined(BOOST_ASIO_HAS_TIMERFD)
  return -1;
#endif // defined(BOOST_ASIO_HAS_TIMERFD)
}

epoll_reactor::descriptor_state* epoll_reactor::allocate_descriptor_state()
{
  mutex::scoped_lock descriptors_lock(registered_descriptors_mutex_);
  return registered_descriptors_.alloc();
}

void epoll_reactor::free_descriptor_state(epoll_reactor::descriptor_state* s)
{
  mutex::scoped_lock descriptors_lock(registered_descriptors_mutex_);
  registered_descriptors_.free(s);
}

void epoll_reactor::do_add_timer_queue(timer_queue_base& queue)
{
  mutex::scoped_lock lock(mutex_);
  timer_queues_.insert(&queue);
}

void epoll_reactor::do_remove_timer_queue(timer_queue_base& queue)
{
  mutex::scoped_lock lock(mutex_);
  timer_queues_.erase(&queue);
}

void epoll_reactor::update_timeout()
{
#if defined(BOOST_ASIO_HAS_TIMERFD)
  if (timer_fd_ != -1)
  {
    itimerspec new_timeout;
    itimerspec old_timeout;
    int flags = get_timeout(new_timeout);
    timerfd_settime(timer_fd_, flags, &new_timeout, &old_timeout);
    return;
  }
#endif // defined(BOOST_ASIO_HAS_TIMERFD)
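  // Without timerfd support, interrupt the reactor so that the next call to
  // run() recomputes its epoll_wait() timeout via get_timeout().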
  interrupt();
}

int epoll_reactor::get_timeout()
{
  // By default we will wait no longer than 5 minutes. This will ensure that
  // any changes to the system clock are detected after no longer than this.
  return timer_queues_.wait_duration_msec(5 * 60 * 1000);
}

#if defined(BOOST_ASIO_HAS_TIMERFD)
int epoll_reactor::get_timeout(itimerspec& ts)
{
  ts.it_interval.tv_sec = 0;
  ts.it_interval.tv_nsec = 0;

  long usec = timer_queues_.wait_duration_usec(5 * 60 * 1000 * 1000);
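  // An it_value of all zeroes would disarm the timerfd, so when a timer is
  // already due (usec == 0) the timerfd is instead armed with an absolute
  // expiry of one nanosecond, a time that is already in the past and
  // therefore fires immediately.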
  ts.it_value.tv_sec = usec / 1000000;
  ts.it_value.tv_nsec = usec ? (usec % 1000000) * 1000 : 1;

  return usec ? 0 : TFD_TIMER_ABSTIME;
}
#endif // defined(BOOST_ASIO_HAS_TIMERFD)

struct epoll_reactor::perform_io_cleanup_on_block_exit
{
  explicit perform_io_cleanup_on_block_exit(epoll_reactor* r)
    : reactor_(r), first_op_(0)
  {
  }

  ~perform_io_cleanup_on_block_exit()
  {
    if (first_op_)
    {
      // Post the remaining completed operations for invocation.
      if (!ops_.empty())
        reactor_->io_service_.post_deferred_completions(ops_);

      // A user-initiated operation has completed, but there's no need to
      // explicitly call work_finished() here. Instead, we'll take advantage of
      // the fact that the task_io_service will call work_finished() once we
      // return.
    }
    else
    {
      // No user-initiated operations have completed, so we need to compensate
      // for the work_finished() call that the task_io_service will make once
      // this operation returns.
      reactor_->io_service_.work_started();
    }
  }

  epoll_reactor* reactor_;
  op_queue<operation> ops_;
  operation* first_op_;
};

epoll_reactor::descriptor_state::descriptor_state()
  : operation(&epoll_reactor::descriptor_state::do_complete)
{
}

operation* epoll_reactor::descriptor_state::perform_io(uint32_t events)
{
  mutex_.lock();
  perform_io_cleanup_on_block_exit io_cleanup(reactor_);
  mutex::scoped_lock descriptor_lock(mutex_, mutex::scoped_lock::adopt_lock);

  // Exception operations must be processed first to ensure that any
  // out-of-band data is read before normal data.
  static const int flag[max_ops] = { EPOLLIN, EPOLLOUT, EPOLLPRI };
  for (int j = max_ops - 1; j >= 0; --j)
  {
    if (events & (flag[j] | EPOLLERR | EPOLLHUP))
    {
      while (reactor_op* op = op_queue_[j].front())
      {
        if (op->perform())
        {
          op_queue_[j].pop();
          io_cleanup.ops_.push(op);
        }
        else
          break;
      }
    }
  }

  // The first operation will be returned for completion now. The others will
  // be posted for later by the io_cleanup object's destructor.
  io_cleanup.first_op_ = io_cleanup.ops_.front();
  io_cleanup.ops_.pop();
  return io_cleanup.first_op_;
}

void epoll_reactor::descriptor_state::do_complete(
    io_service_impl* owner, operation* base,
    const boost::system::error_code& ec, std::size_t bytes_transferred)
{
  if (owner)
  {
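    // The ready event mask recorded by run() via set_ready_events() arrives
    // here through the bytes_transferred argument, so it is recovered before
    // performing the pending I/O operations for this descriptor.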
    descriptor_state* descriptor_data = static_cast<descriptor_state*>(base);
    uint32_t events = static_cast<uint32_t>(bytes_transferred);
    if (operation* op = descriptor_data->perform_io(events))
    {
      op->complete(*owner, ec, 0);
    }
  }
}

} // namespace detail
} // namespace asio
} // namespace boost

#include <boost/asio/detail/pop_options.hpp>

#endif // defined(BOOST_ASIO_HAS_EPOLL)

#endif // BOOST_ASIO_DETAIL_IMPL_EPOLL_REACTOR_IPP
