1/*
2 * Copyright Andrey Semashev 2007 - 2021.
3 * Distributed under the Boost Software License, Version 1.0.
4 * (See accompanying file LICENSE_1_0.txt or copy at
5 * http://www.boost.org/LICENSE_1_0.txt)
6 */
7/*!
8 * \file threadsafe_queue.cpp
9 * \author Andrey Semashev
10 * \date 05.11.2010
11 *
12 * \brief This header is the Boost.Log library implementation, see the library documentation
13 * at http://www.boost.org/doc/libs/release/libs/log/doc/html/index.html.
14 *
15 * The implementation is based on algorithms published in the "Simple, Fast,
16 * and Practical Non-Blocking and Blocking Concurrent Queue Algorithms" article
17 * in PODC96 by Maged M. Michael and Michael L. Scott. Pseudocode is available here:
18 * http://www.cs.rochester.edu/research/synchronization/pseudocode/queues.html
19 *
20 * The lock-free version of the mentioned algorithms contain a race condition and therefore
21 * were not included here.
22 */
23
24#include <boost/log/detail/config.hpp>
25#include <boost/log/detail/threadsafe_queue.hpp>
26
27#ifndef BOOST_LOG_NO_THREADS
28
29#include <new>
30#include <boost/assert.hpp>
31#include <boost/throw_exception.hpp>
32#include <boost/memory_order.hpp>
33#include <boost/atomic/atomic.hpp>
34#include <boost/align/aligned_alloc.hpp>
35#include <boost/type_traits/alignment_of.hpp>
36#include <boost/log/detail/adaptive_mutex.hpp>
37#include <boost/log/detail/locks.hpp>
38#include <boost/log/detail/header.hpp>
39
40namespace boost {
41
42BOOST_LOG_OPEN_NAMESPACE
43
44namespace aux {
45
46//! Generic queue implementation with two locks
47class threadsafe_queue_impl_generic :
48 public threadsafe_queue_impl
49{
50private:
51 //! Mutex type to be used
52 typedef adaptive_mutex mutex_type;
53
54 /*!
55 * A structure that contains a pointer to the node and the associated mutex.
56 * The alignment below allows to eliminate false sharing, it should not be less than CPU cache line size.
57 */
58 struct BOOST_ALIGNMENT(BOOST_LOG_CPU_CACHE_LINE_SIZE) pointer
59 {
60 //! Pointer to the either end of the queue
61 node_base* node;
62 //! Lock for access synchronization
63 mutex_type mutex;
64 // 128 bytes padding is chosen to mitigate false sharing for NetBurst CPUs, which load two cache lines in one go.
65 unsigned char padding[128U - (sizeof(node_base*) + sizeof(mutex_type)) % 128U];
66 };
67
68private:
69 //! Pointer to the beginning of the queue
70 pointer m_Head;
71 //! Pointer to the end of the queue
72 pointer m_Tail;
73
74public:
75 explicit threadsafe_queue_impl_generic(node_base* first_node)
76 {
77 set_next(p: first_node, NULL);
78 m_Head.node = m_Tail.node = first_node;
79 }
80
81 static void* operator new (std::size_t size)
82 {
83 void* p = alignment::aligned_alloc(BOOST_LOG_CPU_CACHE_LINE_SIZE, size);
84 if (BOOST_UNLIKELY(!p))
85 BOOST_THROW_EXCEPTION(std::bad_alloc());
86 return p;
87 }
88
89 static void operator delete (void* p, std::size_t) BOOST_NOEXCEPT
90 {
91 alignment::aligned_free(ptr: p);
92 }
93
94 node_base* reset_last_node() BOOST_NOEXCEPT
95 {
96 BOOST_ASSERT(m_Head.node == m_Tail.node);
97 node_base* p = m_Head.node;
98 m_Head.node = m_Tail.node = NULL;
99 return p;
100 }
101
102 bool unsafe_empty() const BOOST_NOEXCEPT
103 {
104 return m_Head.node == m_Tail.node;
105 }
106
107 void push(node_base* p)
108 {
109 set_next(p, NULL);
110 exclusive_lock_guard< mutex_type > lock(m_Tail.mutex);
111 set_next(p: m_Tail.node, next: p);
112 m_Tail.node = p;
113 }
114
115 bool try_pop(node_base*& node_to_free, node_base*& node_with_value)
116 {
117 exclusive_lock_guard< mutex_type > lock(m_Head.mutex);
118 node_base* next = get_next(p: m_Head.node);
119 if (next)
120 {
121 // We have a node to pop
122 node_to_free = m_Head.node;
123 node_with_value = m_Head.node = next;
124 return true;
125 }
126 else
127 return false;
128 }
129
130private:
131 BOOST_FORCEINLINE static void set_next(node_base* p, node_base* next)
132 {
133 p->next.store(v: next, order: boost::memory_order_relaxed);
134 }
135 BOOST_FORCEINLINE static node_base* get_next(node_base* p)
136 {
137 return p->next.load(order: boost::memory_order_relaxed);
138 }
139
140 // Copying and assignment are closed
141 BOOST_DELETED_FUNCTION(threadsafe_queue_impl_generic(threadsafe_queue_impl_generic const&))
142 BOOST_DELETED_FUNCTION(threadsafe_queue_impl_generic& operator= (threadsafe_queue_impl_generic const&))
143};
144
145inline threadsafe_queue_impl::threadsafe_queue_impl()
146{
147}
148
149inline threadsafe_queue_impl::~threadsafe_queue_impl()
150{
151}
152
153BOOST_LOG_API threadsafe_queue_impl* threadsafe_queue_impl::create(node_base* first_node)
154{
155 return new threadsafe_queue_impl_generic(first_node);
156}
157
158BOOST_LOG_API void threadsafe_queue_impl::destroy(threadsafe_queue_impl* impl) BOOST_NOEXCEPT
159{
160 delete static_cast< threadsafe_queue_impl_generic* >(impl);
161}
162
163BOOST_LOG_API threadsafe_queue_impl::node_base* threadsafe_queue_impl::reset_last_node(threadsafe_queue_impl* impl) BOOST_NOEXCEPT
164{
165 return static_cast< threadsafe_queue_impl_generic* >(impl)->reset_last_node();
166}
167
168BOOST_LOG_API bool threadsafe_queue_impl::unsafe_empty(const threadsafe_queue_impl* impl) BOOST_NOEXCEPT
169{
170 return static_cast< const threadsafe_queue_impl_generic* >(impl)->unsafe_empty();
171}
172
173BOOST_LOG_API void threadsafe_queue_impl::push(threadsafe_queue_impl* impl, node_base* p)
174{
175 static_cast< threadsafe_queue_impl_generic* >(impl)->push(p);
176}
177
178BOOST_LOG_API bool threadsafe_queue_impl::try_pop(threadsafe_queue_impl* impl, node_base*& node_to_free, node_base*& node_with_value)
179{
180 return static_cast< threadsafe_queue_impl_generic* >(impl)->try_pop(node_to_free, node_with_value);
181}
182
183} // namespace aux
184
185BOOST_LOG_CLOSE_NAMESPACE // namespace log
186
187} // namespace boost
188
189#include <boost/log/detail/footer.hpp>
190
191#endif // BOOST_LOG_NO_THREADS
192

source code of boost/libs/log/src/threadsafe_queue.cpp