1// Copyright (C) 2011-2013 Tim Blechmann
2//
3// Distributed under the Boost Software License, Version 1.0. (See
4// accompanying file LICENSE_1_0.txt or copy at
5// http://www.boost.org/LICENSE_1_0.txt)
6
7#include <boost/lockfree/spsc_queue.hpp>
8#include <boost/thread.hpp>
9
10#define BOOST_TEST_MAIN
11#ifdef BOOST_LOCKFREE_INCLUDE_TESTS
12#include <boost/test/included/unit_test.hpp>
13#else
14#include <boost/test/unit_test.hpp>
15#endif
16
17#include <iostream>
18#include <memory>
19
20#include "test_helpers.hpp"
21#include "test_common.hpp"
22
23using namespace boost;
24using namespace boost::lockfree;
25using namespace std;
26
27#ifndef BOOST_LOCKFREE_STRESS_TEST
28static const boost::uint32_t nodes_per_thread = 100000;
29#else
30static const boost::uint32_t nodes_per_thread = 100000000;
31#endif
32
33struct spsc_queue_tester
34{
35 spsc_queue<int, capacity<128> > sf;
36
37 boost::lockfree::detail::atomic<long> spsc_queue_cnt, received_nodes;
38
39// In VxWorks one RTP just supports 65535 objects
40#ifndef __VXWORKS__
41 static_hashed_set<int, 1<<16 > working_set;
42#else
43 static_hashed_set<int, 1<<15 > working_set;
44#endif
45
46 spsc_queue_tester(void):
47 spsc_queue_cnt(0), received_nodes(0)
48 {}
49
50 void add(void)
51 {
52 for (boost::uint32_t i = 0; i != nodes_per_thread; ++i) {
53 int id = generate_id<int>();
54 working_set.insert(id);
55
56 while (sf.push(t: id) == false)
57 {}
58
59 ++spsc_queue_cnt;
60 }
61 running = false;
62 }
63
64 bool get_element(void)
65 {
66 int data;
67 bool success = sf.pop(ret&: data);
68
69 if (success) {
70 ++received_nodes;
71 --spsc_queue_cnt;
72 bool erased = working_set.erase(id: data);
73 assert(erased);
74 return true;
75 } else
76 return false;
77 }
78
79 boost::lockfree::detail::atomic<bool> running;
80
81 void get(void)
82 {
83 for(;;) {
84 bool success = get_element();
85 if (!running && !success)
86 break;
87 }
88
89 while ( get_element() );
90 }
91
92 void run(void)
93 {
94 running = true;
95
96 BOOST_REQUIRE(sf.empty());
97
98 boost::thread reader(boost::bind(f: &spsc_queue_tester::get, a1: this));
99 boost::thread writer(boost::bind(f: &spsc_queue_tester::add, a1: this));
100 cout << "reader and writer threads created" << endl;
101
102 writer.join();
103 cout << "writer threads joined. waiting for readers to finish" << endl;
104
105 reader.join();
106
107 BOOST_REQUIRE_EQUAL(received_nodes, nodes_per_thread);
108 BOOST_REQUIRE_EQUAL(spsc_queue_cnt, 0);
109 BOOST_REQUIRE(sf.empty());
110 BOOST_REQUIRE(working_set.count_nodes() == 0);
111 }
112};
113
114BOOST_AUTO_TEST_CASE( spsc_queue_test_caching )
115{
116 boost::shared_ptr<spsc_queue_tester> test1(new spsc_queue_tester);
117 test1->run();
118}
119
120struct spsc_queue_tester_buffering
121{
122 spsc_queue<int, capacity<128> > sf;
123
124 boost::lockfree::detail::atomic<long> spsc_queue_cnt;
125
126// In VxWorks one RTP just supports 65535 objects
127#ifndef __VXWORKS__
128 static_hashed_set<int, 1<<16 > working_set;
129#else
130 static_hashed_set<int, 1<<15 > working_set;
131#endif
132
133 boost::lockfree::detail::atomic<size_t> received_nodes;
134
135 spsc_queue_tester_buffering(void):
136 spsc_queue_cnt(0), received_nodes(0)
137 {}
138
139 static const size_t buf_size = 5;
140
141 void add(void)
142 {
143 boost::array<int, buf_size> input_buffer;
144 for (boost::uint32_t i = 0; i != nodes_per_thread; i+=buf_size) {
145 for (size_t i = 0; i != buf_size; ++i) {
146 int id = generate_id<int>();
147 working_set.insert(id);
148 input_buffer[i] = id;
149 }
150
151 size_t pushed = 0;
152
153 do {
154 pushed += sf.push(t: input_buffer.c_array() + pushed,
155 size: input_buffer.size() - pushed);
156 } while (pushed != buf_size);
157
158 spsc_queue_cnt+=buf_size;
159 }
160 running = false;
161 }
162
163 bool get_elements(void)
164 {
165 boost::array<int, buf_size> output_buffer;
166
167 size_t popd = sf.pop(ret: output_buffer.c_array(), size: output_buffer.size());
168
169 if (popd) {
170 received_nodes += popd;
171 spsc_queue_cnt -= popd;
172
173 for (size_t i = 0; i != popd; ++i) {
174 bool erased = working_set.erase(id: output_buffer[i]);
175 assert(erased);
176 }
177
178 return true;
179 } else
180 return false;
181 }
182
183 boost::lockfree::detail::atomic<bool> running;
184
185 void get(void)
186 {
187 for(;;) {
188 bool success = get_elements();
189 if (!running && !success)
190 break;
191 }
192
193 while ( get_elements() );
194 }
195
196 void run(void)
197 {
198 running = true;
199
200 boost::thread reader(boost::bind(f: &spsc_queue_tester_buffering::get, a1: this));
201 boost::thread writer(boost::bind(f: &spsc_queue_tester_buffering::add, a1: this));
202 cout << "reader and writer threads created" << endl;
203
204 writer.join();
205 cout << "writer threads joined. waiting for readers to finish" << endl;
206
207 reader.join();
208
209 BOOST_REQUIRE_EQUAL(received_nodes, nodes_per_thread);
210 BOOST_REQUIRE_EQUAL(spsc_queue_cnt, 0);
211 BOOST_REQUIRE(sf.empty());
212 BOOST_REQUIRE(working_set.count_nodes() == 0);
213 }
214};
215
216
217BOOST_AUTO_TEST_CASE( spsc_queue_test_buffering )
218{
219 boost::shared_ptr<spsc_queue_tester_buffering> test1(new spsc_queue_tester_buffering);
220 test1->run();
221}
222
223

source code of boost/libs/lockfree/test/spsc_queue_stress_test.cpp