1/****************************************************************************
2**
3** Copyright (C) 2014 Digia Plc and/or its subsidiary(-ies).
4** Contact: http://www.qt-project.org/legal
5**
6** This file is part of the QtCore module of the Qt Toolkit.
7**
8** $QT_BEGIN_LICENSE:LGPL$
9** Commercial License Usage
10** Licensees holding valid commercial Qt licenses may use this file in
11** accordance with the commercial license agreement provided with the
12** Software or, alternatively, in accordance with the terms contained in
13** a written agreement between you and Digia. For licensing terms and
14** conditions see http://qt.digia.com/licensing. For further information
15** use the contact form at http://qt.digia.com/contact-us.
16**
17** GNU Lesser General Public License Usage
18** Alternatively, this file may be used under the terms of the GNU Lesser
19** General Public License version 2.1 as published by the Free Software
20** Foundation and appearing in the file LICENSE.LGPL included in the
21** packaging of this file. Please review the following information to
22** ensure the GNU Lesser General Public License version 2.1 requirements
23** will be met: http://www.gnu.org/licenses/old-licenses/lgpl-2.1.html.
24**
25** In addition, as a special exception, Digia gives you certain additional
26** rights. These rights are described in the Digia Qt LGPL Exception
27** version 1.1, included in the file LGPL_EXCEPTION.txt in this package.
28**
29** GNU General Public License Usage
30** Alternatively, this file may be used under the terms of the GNU
31** General Public License version 3.0 as published by the Free Software
32** Foundation and appearing in the file LICENSE.GPL included in the
33** packaging of this file. Please review the following information to
34** ensure the GNU General Public License version 3.0 requirements will be
35** met: http://www.gnu.org/copyleft/gpl.html.
36**
37**
38** $QT_END_LICENSE$
39**
40****************************************************************************/
41
42#ifndef QTCONCURRENT_REDUCEKERNEL_H
43#define QTCONCURRENT_REDUCEKERNEL_H
44
45#include <QtCore/qglobal.h>
46
47#ifndef QT_NO_CONCURRENT
48
49#include <QtCore/qatomic.h>
50#include <QtCore/qlist.h>
51#include <QtCore/qmap.h>
52#include <QtCore/qmutex.h>
53#include <QtCore/qthread.h>
54#include <QtCore/qthreadpool.h>
55#include <QtCore/qvector.h>
56
57QT_BEGIN_HEADER
58QT_BEGIN_NAMESPACE
59
60QT_MODULE(Core)
61
62namespace QtConcurrent {
63
64#ifndef qdoc
65
// Bounds on the number of intermediate result blocks that may wait in
// the reduce queue during MapReduce. ReduceKernel multiplies each limit
// by its thread count before comparing it with the queue size.
enum {
    // Once the queue holds more blocks than this limit allows,
    // MapReduce won't start any new threads.
    ReduceQueueStartLimit = 20,

    // Once the queue holds more blocks than this limit allows,
    // running threads will be stopped.
    ReduceQueueThrottleLimit = 30
};
77
78// IntermediateResults holds a block of intermediate results from a
79// map or filter functor. The begin/end offsets indicates the origin
80// and range of the block.
81template <typename T>
82class IntermediateResults
83{
84public:
85 int begin, end;
86 QVector<T> vector;
87};
88
89#endif // qdoc
90
91enum ReduceOption {
92 UnorderedReduce = 0x1,
93 OrderedReduce = 0x2,
94 SequentialReduce = 0x4
95 // ParallelReduce = 0x8
96};
97Q_DECLARE_FLAGS(ReduceOptions, ReduceOption)
98Q_DECLARE_OPERATORS_FOR_FLAGS(ReduceOptions)
99
100#ifndef qdoc
101
102// supports both ordered and out-of-order reduction
103template <typename ReduceFunctor, typename ReduceResultType, typename T>
104class ReduceKernel
105{
106 typedef QMap<int, IntermediateResults<T> > ResultsMap;
107
108 const ReduceOptions reduceOptions;
109
110 QMutex mutex;
111 int progress, resultsMapSize, threadCount;
112 ResultsMap resultsMap;
113
114 bool canReduce(int begin) const
115 {
116 return (((reduceOptions & UnorderedReduce)
117 && progress == 0)
118 || ((reduceOptions & OrderedReduce)
119 && progress == begin));
120 }
121
122 void reduceResult(ReduceFunctor &reduce,
123 ReduceResultType &r,
124 const IntermediateResults<T> &result)
125 {
126 for (int i = 0; i < result.vector.size(); ++i) {
127 reduce(r, result.vector.at(i));
128 }
129 }
130
131 void reduceResults(ReduceFunctor &reduce,
132 ReduceResultType &r,
133 ResultsMap &map)
134 {
135 typename ResultsMap::iterator it = map.begin();
136 while (it != map.end()) {
137 reduceResult(reduce, r, it.value());
138 ++it;
139 }
140 }
141
142public:
143 ReduceKernel(ReduceOptions _reduceOptions)
144 : reduceOptions(_reduceOptions), progress(0), resultsMapSize(0),
145 threadCount(QThreadPool::globalInstance()->maxThreadCount())
146 { }
147
148 void runReduce(ReduceFunctor &reduce,
149 ReduceResultType &r,
150 const IntermediateResults<T> &result)
151 {
152 QMutexLocker locker(&mutex);
153 if (!canReduce(result.begin)) {
154 ++resultsMapSize;
155 resultsMap.insert(result.begin, result);
156 return;
157 }
158
159 if (reduceOptions & UnorderedReduce) {
160 // UnorderedReduce
161 progress = -1;
162
163 // reduce this result
164 locker.unlock();
165 reduceResult(reduce, r, result);
166 locker.relock();
167
168 // reduce all stored results as well
169 while (!resultsMap.isEmpty()) {
170 ResultsMap resultsMapCopy = resultsMap;
171 resultsMap.clear();
172
173 locker.unlock();
174 reduceResults(reduce, r, resultsMapCopy);
175 locker.relock();
176
177 resultsMapSize -= resultsMapCopy.size();
178 }
179
180 progress = 0;
181 } else {
182 // reduce this result
183 locker.unlock();
184 reduceResult(reduce, r, result);
185 locker.relock();
186
187 // OrderedReduce
188 progress += result.end - result.begin;
189
190 // reduce as many other results as possible
191 typename ResultsMap::iterator it = resultsMap.begin();
192 while (it != resultsMap.end()) {
193 if (it.value().begin != progress)
194 break;
195
196 locker.unlock();
197 reduceResult(reduce, r, it.value());
198 locker.relock();
199
200 --resultsMapSize;
201 progress += it.value().end - it.value().begin;
202 it = resultsMap.erase(it);
203 }
204 }
205 }
206
207 // final reduction
208 void finish(ReduceFunctor &reduce, ReduceResultType &r)
209 {
210 reduceResults(reduce, r, resultsMap);
211 }
212
213 inline bool shouldThrottle()
214 {
215 return (resultsMapSize > (ReduceQueueThrottleLimit * threadCount));
216 }
217
218 inline bool shouldStartThread()
219 {
220 return (resultsMapSize <= (ReduceQueueStartLimit * threadCount));
221 }
222};
223
224template <typename Sequence, typename Base, typename Functor1, typename Functor2>
225struct SequenceHolder2 : public Base
226{
227 SequenceHolder2(const Sequence &_sequence,
228 Functor1 functor1,
229 Functor2 functor2,
230 ReduceOptions reduceOptions)
231 : Base(_sequence.begin(), _sequence.end(), functor1, functor2, reduceOptions),
232 sequence(_sequence)
233 { }
234
235 Sequence sequence;
236
237 void finish()
238 {
239 Base::finish();
240 // Clear the sequence to make sure all temporaries are destroyed
241 // before finished is signaled.
242 sequence = Sequence();
243 }
244};
245
246#endif //qdoc
247
248} // namespace QtConcurrent
249
250QT_END_NAMESPACE
251QT_END_HEADER
252
253#endif // QT_NO_CONCURRENT
254
255#endif
256