1// Copyright (C) 2011 Tim Blechmann
2//
3// Distributed under the Boost Software License, Version 1.0. (See
4// accompanying file LICENSE_1_0.txt or copy at
5// http://www.boost.org/LICENSE_1_0.txt)
6
7#include <cassert>
8#include <iostream>
9#include "test_helpers.hpp"
10
11#include <boost/array.hpp>
12#include <boost/thread.hpp>
13
14namespace impl {
15
16using boost::array;
17using namespace boost;
18using namespace std;
19
20template <bool Bounded = false>
21struct queue_stress_tester
22{
23 static const unsigned int buckets = 1<<13;
24#ifndef BOOST_LOCKFREE_STRESS_TEST
25 static const long node_count = 5000;
26#else
27 static const long node_count = 500000;
28#endif
29 const int reader_threads;
30 const int writer_threads;
31
32 boost::lockfree::detail::atomic<int> writers_finished;
33
34 static_hashed_set<long, buckets> data;
35 static_hashed_set<long, buckets> dequeued;
36 array<std::set<long>, buckets> returned;
37
38 boost::lockfree::detail::atomic<int> push_count, pop_count;
39
40 queue_stress_tester(int reader, int writer):
41 reader_threads(reader), writer_threads(writer), push_count(0), pop_count(0)
42 {}
43
44 template <typename queue>
45 void add_items(queue & stk)
46 {
47 for (long i = 0; i != node_count; ++i) {
48 long id = generate_id<long>();
49
50 bool inserted = data.insert(id);
51 assert(inserted);
52
53 if (Bounded)
54 while(stk.bounded_push(id) == false) {
55#ifdef __VXWORKS__
56 thread::yield();
57#endif
58 }
59 else
60 while(stk.push(id) == false) {
61#ifdef __VXWORKS__
62 thread::yield();
63#endif
64 }
65 ++push_count;
66 }
67 writers_finished += 1;
68 }
69
70 boost::lockfree::detail::atomic<bool> running;
71
72 template <typename queue>
73 bool consume_element(queue & q)
74 {
75 long id;
76 bool ret = q.pop(id);
77
78 if (!ret)
79 return false;
80
81 bool erased = data.erase(id);
82 bool inserted = dequeued.insert(id);
83 assert(erased);
84 assert(inserted);
85 ++pop_count;
86 return true;
87 }
88
89 template <typename queue>
90 void get_items(queue & q)
91 {
92 for (;;) {
93 bool received_element = consume_element(q);
94 if (received_element)
95 continue;
96
97 if ( writers_finished.load() == writer_threads )
98 break;
99
100#ifdef __VXWORKS__
101 thread::yield();
102#endif
103 }
104
105 while (consume_element(q));
106 }
107
108 template <typename queue>
109 void run(queue & stk)
110 {
111 BOOST_WARN(stk.is_lock_free());
112 writers_finished.store(i: 0);
113
114 thread_group writer;
115 thread_group reader;
116
117 BOOST_REQUIRE(stk.empty());
118
119 for (int i = 0; i != reader_threads; ++i)
120 reader.create_thread(boost::bind(&queue_stress_tester::template get_items<queue>, this, boost::ref(stk)));
121
122 for (int i = 0; i != writer_threads; ++i)
123 writer.create_thread(boost::bind(&queue_stress_tester::template add_items<queue>, this, boost::ref(stk)));
124
125 std::cout << "threads created" << std::endl;
126
127 writer.join_all();
128
129 std::cout << "writer threads joined, waiting for readers" << std::endl;
130
131 reader.join_all();
132
133 std::cout << "reader threads joined" << std::endl;
134
135 BOOST_REQUIRE_EQUAL(data.count_nodes(), (size_t)0);
136 BOOST_REQUIRE(stk.empty());
137
138 BOOST_REQUIRE_EQUAL(push_count, pop_count);
139 BOOST_REQUIRE_EQUAL(push_count, writer_threads * node_count);
140 }
141};
142
143}
144
145using impl::queue_stress_tester;
146

// Source: boost/libs/lockfree/test/test_common.hpp