1 | // (C) Copyright 2014 Vicente Botet |
2 | // |
3 | // Distributed under the Boost Software License, Version 1.0. (See accompanying |
4 | // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) |
5 | |
6 | // adapted from the example given by Howard Hinnant in |
7 | |
8 | #include <boost/config.hpp> |
9 | |
10 | #define BOOST_THREAD_VERSION 4 |
11 | //#define BOOST_THREAD_QUEUE_DEPRECATE_OLD |
12 | #if ! defined BOOST_NO_CXX11_DECLTYPE |
13 | #define BOOST_RESULT_OF_USE_DECLTYPE |
14 | #endif |
15 | #include <iostream> |
16 | #include <boost/thread/scoped_thread.hpp> |
17 | #ifdef XXXX |
18 | #include <boost/thread/externally_locked_stream.hpp> |
19 | typedef boost::externally_locked_stream<std::ostream> the_ostream; |
20 | #else |
21 | typedef std::ostream the_ostream; |
22 | typedef std::istream the_istream; |
23 | #endif |
24 | #include <boost/thread/concurrent_queues/sync_queue.hpp> |
25 | #include <boost/thread/concurrent_queues/queue_adaptor.hpp> |
26 | #include <boost/thread/concurrent_queues/queue_views.hpp> |
27 | #include <boost/static_assert.hpp> |
28 | #include <boost/type_traits.hpp> |
29 | |
30 | void producer(the_ostream &/*mos*/, boost::queue_back<int> sbq) |
31 | { |
32 | using namespace boost; |
33 | try { |
34 | for(int i=0; ;++i) |
35 | { |
36 | sbq.push(x: i); |
37 | //sbq << i; |
38 | //mos << "push(" << i << ") " << sbq.size() <<"\n"; |
39 | this_thread::sleep_for(d: chrono::milliseconds(200)); |
40 | } |
41 | } |
42 | catch(sync_queue_is_closed&) |
43 | { |
44 | //mos << "closed !!!\n"; |
45 | } |
46 | catch(...) |
47 | { |
48 | //mos << "exception !!!\n"; |
49 | } |
50 | } |
51 | |
52 | void consumer( |
53 | the_ostream &/*mos*/, |
54 | boost::queue_front<int> sbq) |
55 | { |
56 | using namespace boost; |
57 | try { |
58 | for(int i=0; ;++i) |
59 | { |
60 | int r; |
61 | sbq.pull(x&: r); |
62 | //sbq >> r; |
63 | //mos << i << " pull(" << r << ") " << sbq.size() <<"\n"; |
64 | |
65 | this_thread::sleep_for(d: chrono::milliseconds(250)); |
66 | } |
67 | } |
68 | catch(sync_queue_is_closed&) |
69 | { |
70 | //mos << "closed !!!\n"; |
71 | } |
72 | catch(...) |
73 | { |
74 | //mos << "exception !!!\n"; |
75 | } |
76 | } |
77 | void consumer2(the_ostream &/*mos*/, boost::queue_front<int> sbq) |
78 | { |
79 | using namespace boost; |
80 | try { |
81 | for(int i=0; ;++i) |
82 | { |
83 | int r; |
84 | queue_op_status st = sbq.try_pull(x&: r); |
85 | if (queue_op_status::closed == st) break; |
86 | if (queue_op_status::success == st) { |
87 | //mos << i << " try_pull(" << r << ")\n"; |
88 | } |
89 | this_thread::sleep_for(d: chrono::milliseconds(250)); |
90 | } |
91 | } |
92 | catch(...) |
93 | { |
94 | //mos << "exception !!!\n"; |
95 | } |
96 | } |
97 | void consumer3(the_ostream &/*mos*/, boost::queue_front<int> sbq) |
98 | { |
99 | using namespace boost; |
100 | try { |
101 | for(int i=0; ;++i) |
102 | { |
103 | int r; |
104 | queue_op_status res = sbq.wait_pull(x&: r); |
105 | if (res==queue_op_status::closed) break; |
106 | //mos << i << " wait_pull(" << r << ")\n"; |
107 | this_thread::sleep_for(d: chrono::milliseconds(250)); |
108 | } |
109 | } |
110 | catch(...) |
111 | { |
112 | //mos << "exception !!!\n"; |
113 | } |
114 | } |
115 | |
116 | int main() |
117 | { |
118 | using namespace boost; |
119 | |
120 | #ifdef XXXX |
121 | recursive_mutex terminal_mutex; |
122 | |
123 | externally_locked_stream<std::ostream> mcerr(std::cerr, terminal_mutex); |
124 | externally_locked_stream<std::ostream> mcout(std::cout, terminal_mutex); |
125 | externally_locked_stream<std::istream> mcin(std::cin, terminal_mutex); |
126 | #else |
127 | the_ostream &mcerr = std::cout; |
128 | the_ostream &mcout = std::cerr; |
129 | //the_istream &mcin = std::cin; |
130 | #endif |
131 | |
132 | queue_adaptor<sync_queue<int> > sbq; |
133 | |
134 | { |
135 | mcout << "begin of main" << std::endl; |
136 | scoped_thread<> t11(boost::thread(producer, boost::ref(t&: mcerr), concurrent::queue_back<int>(sbq))); |
137 | scoped_thread<> t12(boost::thread(producer, boost::ref(t&: mcerr), concurrent::queue_back<int>(sbq))); |
138 | scoped_thread<> t2(boost::thread(consumer, boost::ref(t&: mcout), concurrent::queue_front<int>(sbq))); |
139 | |
140 | this_thread::sleep_for(d: chrono::seconds(1)); |
141 | |
142 | mcout << "closed()" << std::endl; |
143 | sbq.close(); |
144 | mcout << "closed()" << std::endl; |
145 | |
146 | } // all threads joined here. |
147 | mcout << "end of main" << std::endl; |
148 | return 0; |
149 | } |
150 | |
151 | |