1/****************************************************************************
2**
3** Copyright (C) 2016 The Qt Company Ltd.
4** Contact: https://www.qt.io/licensing/
5**
6** This file is part of the QtConcurrent module of the Qt Toolkit.
7**
8** $QT_BEGIN_LICENSE:LGPL$
9** Commercial License Usage
10** Licensees holding valid commercial Qt licenses may use this file in
11** accordance with the commercial license agreement provided with the
12** Software or, alternatively, in accordance with the terms contained in
13** a written agreement between you and The Qt Company. For licensing terms
14** and conditions see https://www.qt.io/terms-conditions. For further
15** information use the contact form at https://www.qt.io/contact-us.
16**
17** GNU Lesser General Public License Usage
18** Alternatively, this file may be used under the terms of the GNU Lesser
19** General Public License version 3 as published by the Free Software
20** Foundation and appearing in the file LICENSE.LGPL3 included in the
21** packaging of this file. Please review the following information to
22** ensure the GNU Lesser General Public License version 3 requirements
23** will be met: https://www.gnu.org/licenses/lgpl-3.0.html.
24**
25** GNU General Public License Usage
26** Alternatively, this file may be used under the terms of the GNU
27** General Public License version 2.0 or (at your option) the GNU General
28** Public license version 3 or any later version approved by the KDE Free
29** Qt Foundation. The licenses are as published by the Free Software
30** Foundation and appearing in the file LICENSE.GPL2 and LICENSE.GPL3
31** included in the packaging of this file. Please review the following
32** information to ensure the GNU General Public License requirements will
33** be met: https://www.gnu.org/licenses/gpl-2.0.html and
34** https://www.gnu.org/licenses/gpl-3.0.html.
35**
36** $QT_END_LICENSE$
37**
38****************************************************************************/
39
40#ifndef QTCONCURRENT_REDUCEKERNEL_H
41#define QTCONCURRENT_REDUCEKERNEL_H
42
43#include <QtConcurrent/qtconcurrent_global.h>
44
45#if !defined(QT_NO_CONCURRENT) || defined(Q_CLANG_QDOC)
46
47#include <QtCore/qatomic.h>
48#include <QtCore/qlist.h>
49#include <QtCore/qmap.h>
50#include <QtCore/qmutex.h>
51#include <QtCore/qthread.h>
52#include <QtCore/qthreadpool.h>
53#include <QtCore/qvector.h>
54
55QT_BEGIN_NAMESPACE
56
57
58namespace QtConcurrent {
59
/*
    The ReduceQueueStartLimit and ReduceQueueThrottleLimit constants
    limit the reduce queue size for MapReduce. Both limits are scaled by
    the thread count: when the number of reduce blocks in the queue
    exceeds ReduceQueueStartLimit times the thread count, MapReduce
    won't start any new threads, and when it exceeds
    ReduceQueueThrottleLimit times the thread count, running threads
    will be stopped.
*/
// Per-thread limits on the number of queued reduce blocks. The enum is
// only given a name (ReduceQueueLimits) when the documentation tool parses
// the header; regular builds see an anonymous enum with the same values.
enum
#ifdef Q_CLANG_QDOC
    ReduceQueueLimits
#endif
{
    ReduceQueueStartLimit = 20,    // above this, no new threads are started
    ReduceQueueThrottleLimit = 30  // above this, running threads are throttled
};
78
// IntermediateResults holds a block of intermediate results from a
// map or filter functor. The begin/end offsets indicate the origin
// and range of the block.
82template <typename T>
83class IntermediateResults
84{
85public:
86 int begin, end;
87 QVector<T> vector;
88};
89
90enum ReduceOption {
91 UnorderedReduce = 0x1,
92 OrderedReduce = 0x2,
93 SequentialReduce = 0x4
94 // ParallelReduce = 0x8
95};
96Q_DECLARE_FLAGS(ReduceOptions, ReduceOption)
97#ifndef Q_CLANG_QDOC
98Q_DECLARE_OPERATORS_FOR_FLAGS(ReduceOptions)
99#endif
100// supports both ordered and out-of-order reduction
101template <typename ReduceFunctor, typename ReduceResultType, typename T>
102class ReduceKernel
103{
104 typedef QMap<int, IntermediateResults<T> > ResultsMap;
105
106 const ReduceOptions reduceOptions;
107
108 QMutex mutex;
109 int progress, resultsMapSize, threadCount;
110 ResultsMap resultsMap;
111
112 bool canReduce(int begin) const
113 {
114 return (((reduceOptions & UnorderedReduce)
115 && progress == 0)
116 || ((reduceOptions & OrderedReduce)
117 && progress == begin));
118 }
119
120 void reduceResult(ReduceFunctor &reduce,
121 ReduceResultType &r,
122 const IntermediateResults<T> &result)
123 {
124 for (int i = 0; i < result.vector.size(); ++i) {
125 reduce(r, result.vector.at(i));
126 }
127 }
128
129 void reduceResults(ReduceFunctor &reduce,
130 ReduceResultType &r,
131 ResultsMap &map)
132 {
133 typename ResultsMap::iterator it = map.begin();
134 while (it != map.end()) {
135 reduceResult(reduce, r, it.value());
136 ++it;
137 }
138 }
139
140public:
141 ReduceKernel(ReduceOptions _reduceOptions)
142 : reduceOptions(_reduceOptions), progress(0), resultsMapSize(0),
143 threadCount(QThreadPool::globalInstance()->maxThreadCount())
144 { }
145
146 void runReduce(ReduceFunctor &reduce,
147 ReduceResultType &r,
148 const IntermediateResults<T> &result)
149 {
150 QMutexLocker locker(&mutex);
151 if (!canReduce(result.begin)) {
152 ++resultsMapSize;
153 resultsMap.insert(result.begin, result);
154 return;
155 }
156
157 if (reduceOptions & UnorderedReduce) {
158 // UnorderedReduce
159 progress = -1;
160
161 // reduce this result
162 locker.unlock();
163 reduceResult(reduce, r, result);
164 locker.relock();
165
166 // reduce all stored results as well
167 while (!resultsMap.isEmpty()) {
168 ResultsMap resultsMapCopy = resultsMap;
169 resultsMap.clear();
170
171 locker.unlock();
172 reduceResults(reduce, r, resultsMapCopy);
173 locker.relock();
174
175 resultsMapSize -= resultsMapCopy.size();
176 }
177
178 progress = 0;
179 } else {
180 // reduce this result
181 locker.unlock();
182 reduceResult(reduce, r, result);
183 locker.relock();
184
185 // OrderedReduce
186 progress += result.end - result.begin;
187
188 // reduce as many other results as possible
189 typename ResultsMap::iterator it = resultsMap.begin();
190 while (it != resultsMap.end()) {
191 if (it.value().begin != progress)
192 break;
193
194 locker.unlock();
195 reduceResult(reduce, r, it.value());
196 locker.relock();
197
198 --resultsMapSize;
199 progress += it.value().end - it.value().begin;
200 it = resultsMap.erase(it);
201 }
202 }
203 }
204
205 // final reduction
206 void finish(ReduceFunctor &reduce, ReduceResultType &r)
207 {
208 reduceResults(reduce, r, resultsMap);
209 }
210
211 inline bool shouldThrottle()
212 {
213 return (resultsMapSize > (ReduceQueueThrottleLimit * threadCount));
214 }
215
216 inline bool shouldStartThread()
217 {
218 return (resultsMapSize <= (ReduceQueueStartLimit * threadCount));
219 }
220};
221
222template <typename Sequence, typename Base, typename Functor1, typename Functor2>
223struct SequenceHolder2 : public Base
224{
225 SequenceHolder2(const Sequence &_sequence,
226 Functor1 functor1,
227 Functor2 functor2,
228 ReduceOptions reduceOptions)
229 : Base(_sequence.begin(), _sequence.end(), functor1, functor2, reduceOptions),
230 sequence(_sequence)
231 { }
232
233 Sequence sequence;
234
235 void finish() override
236 {
237 Base::finish();
238 // Clear the sequence to make sure all temporaries are destroyed
239 // before finished is signaled.
240 sequence = Sequence();
241 }
242};
243
244} // namespace QtConcurrent
245
246QT_END_NAMESPACE
247
248#endif // QT_NO_CONCURRENT
249
250#endif
251