1/****************************************************************************
2**
3** Copyright (C) 2016 The Qt Company Ltd.
4** Contact: https://www.qt.io/licensing/
5**
6** This file is part of the QtConcurrent module of the Qt Toolkit.
7**
8** $QT_BEGIN_LICENSE:LGPL$
9** Commercial License Usage
10** Licensees holding valid commercial Qt licenses may use this file in
11** accordance with the commercial license agreement provided with the
12** Software or, alternatively, in accordance with the terms contained in
13** a written agreement between you and The Qt Company. For licensing terms
14** and conditions see https://www.qt.io/terms-conditions. For further
15** information use the contact form at https://www.qt.io/contact-us.
16**
17** GNU Lesser General Public License Usage
18** Alternatively, this file may be used under the terms of the GNU Lesser
19** General Public License version 3 as published by the Free Software
20** Foundation and appearing in the file LICENSE.LGPL3 included in the
21** packaging of this file. Please review the following information to
22** ensure the GNU Lesser General Public License version 3 requirements
23** will be met: https://www.gnu.org/licenses/lgpl-3.0.html.
24**
25** GNU General Public License Usage
26** Alternatively, this file may be used under the terms of the GNU
27** General Public License version 2.0 or (at your option) the GNU General
28** Public license version 3 or any later version approved by the KDE Free
29** Qt Foundation. The licenses are as published by the Free Software
30** Foundation and appearing in the file LICENSE.GPL2 and LICENSE.GPL3
31** included in the packaging of this file. Please review the following
32** information to ensure the GNU General Public License requirements will
33** be met: https://www.gnu.org/licenses/gpl-2.0.html and
34** https://www.gnu.org/licenses/gpl-3.0.html.
35**
36** $QT_END_LICENSE$
37**
38****************************************************************************/
39
40#ifndef QTCONCURRENT_ITERATEKERNEL_H
41#define QTCONCURRENT_ITERATEKERNEL_H
42
43#include <QtConcurrent/qtconcurrent_global.h>
44
45#if !defined(QT_NO_CONCURRENT) || defined(Q_CLANG_QDOC)
46
47#include <QtCore/qatomic.h>
48#include <QtConcurrent/qtconcurrentmedian.h>
49#include <QtConcurrent/qtconcurrentthreadengine.h>
50
51#include <iterator>
52
53QT_BEGIN_NAMESPACE
54
55
56
57namespace QtConcurrent {
58
59/*
60 The BlockSizeManager class manages how many iterations a thread should
61 reserve and process at a time. This is done by measuring the time spent
62 in the user code versus the control part code, and then increasing
63 the block size if the ratio between them is to small. The block size
64 management is done on the basis of the median of several timing measurements,
65 and it is done individually for each thread.
66*/
67class Q_CONCURRENT_EXPORT BlockSizeManager
68{
69public:
70 explicit BlockSizeManager(QThreadPool *pool, int iterationCount);
71
72 void timeBeforeUser();
73 void timeAfterUser();
74 int blockSize();
75
76private:
77 inline bool blockSizeMaxed()
78 {
79 return (m_blockSize >= maxBlockSize);
80 }
81
82 const int maxBlockSize;
83 qint64 beforeUser;
84 qint64 afterUser;
85 Median controlPartElapsed;
86 Median userPartElapsed;
87 int m_blockSize;
88
89 Q_DISABLE_COPY(BlockSizeManager)
90};
91
92template <typename T>
93class ResultReporter
94{
95public:
96 ResultReporter(ThreadEngine<T> *_threadEngine, T &_defaultValue)
97 : threadEngine(_threadEngine), defaultValue(_defaultValue)
98 {
99 }
100
101 void reserveSpace(int resultCount)
102 {
103 currentResultCount = resultCount;
104 resizeList(qMax(resultCount, vector.count()));
105 }
106
107 void reportResults(int begin)
108 {
109 const int useVectorThreshold = 4; // Tunable parameter.
110 if (currentResultCount > useVectorThreshold) {
111 resizeList(currentResultCount);
112 threadEngine->reportResults(vector, begin);
113 } else {
114 for (int i = 0; i < currentResultCount; ++i)
115 threadEngine->reportResult(&vector.at(i), begin + i);
116 }
117 }
118
119 inline T * getPointer()
120 {
121 return vector.data();
122 }
123
124 int currentResultCount;
125 ThreadEngine<T> *threadEngine;
126 QList<T> vector;
127
128private:
129 void resizeList(qsizetype size)
130 {
131 if constexpr (std::is_default_constructible_v<T>)
132 vector.resize(size);
133 else
134 vector.resize(size, defaultValue);
135 }
136
137 T &defaultValue;
138};
139
140template <>
141class ResultReporter<void>
142{
143public:
144 inline ResultReporter(ThreadEngine<void> *) { }
145 inline void reserveSpace(int) { }
146 inline void reportResults(int) { }
147 inline void * getPointer() { return nullptr; }
148};
149
150template<typename T>
151struct DefaultValueContainer
152{
153 template<typename U = T>
154 DefaultValueContainer(U &&_value) : value(std::forward<U>(_value))
155 {
156 }
157
158 T value;
159};
160
161template<>
162struct DefaultValueContainer<void>
163{
164};
165
166inline bool selectIteration(std::bidirectional_iterator_tag)
167{
168 return false; // while
169}
170
171inline bool selectIteration(std::forward_iterator_tag)
172{
173 return false; // while
174}
175
176inline bool selectIteration(std::random_access_iterator_tag)
177{
178 return true; // for
179}
180
181template <typename Iterator, typename T>
182class IterateKernel : public ThreadEngine<T>
183{
184 using IteratorCategory = typename std::iterator_traits<Iterator>::iterator_category;
185
186public:
187 typedef T ResultType;
188
189 template<typename U = T, std::enable_if_t<std::is_same_v<U, void>, bool> = true>
190 IterateKernel(QThreadPool *pool, Iterator _begin, Iterator _end)
191 : ThreadEngine<U>(pool),
192 begin(_begin),
193 end(_end),
194 current(_begin),
195 iterationCount(selectIteration(IteratorCategory()) ? std::distance(_begin, _end) : 0),
196 forIteration(selectIteration(IteratorCategory())),
197 progressReportingEnabled(true)
198 {
199 }
200
201 template<typename U = T, std::enable_if_t<!std::is_same_v<U, void>, bool> = true>
202 IterateKernel(QThreadPool *pool, Iterator _begin, Iterator _end)
203 : ThreadEngine<U>(pool),
204 begin(_begin),
205 end(_end),
206 current(_begin),
207 iterationCount(selectIteration(IteratorCategory()) ? std::distance(_begin, _end) : 0),
208 forIteration(selectIteration(IteratorCategory())),
209 progressReportingEnabled(true),
210 defaultValue(U())
211 {
212 }
213
214 template<typename U = T, std::enable_if_t<!std::is_same_v<U, void>, bool> = true>
215 IterateKernel(QThreadPool *pool, Iterator _begin, Iterator _end, U &&_defaultValue)
216 : ThreadEngine<U>(pool),
217 begin(_begin),
218 end(_end),
219 current(_begin),
220 iterationCount(selectIteration(IteratorCategory()) ? std::distance(_begin, _end) : 0),
221 forIteration(selectIteration(IteratorCategory())),
222 progressReportingEnabled(true),
223 defaultValue(std::forward<U>(_defaultValue))
224 {
225 }
226
227 virtual ~IterateKernel() { }
228
229 virtual bool runIteration(Iterator, int , T *) { return false; }
230 virtual bool runIterations(Iterator, int, int, T *) { return false; }
231
232 void start() override
233 {
234 progressReportingEnabled = this->isProgressReportingEnabled();
235 if (progressReportingEnabled && iterationCount > 0)
236 this->setProgressRange(0, iterationCount);
237 }
238
239 bool shouldStartThread() override
240 {
241 if (forIteration)
242 return (currentIndex.loadRelaxed() < iterationCount) && !this->shouldThrottleThread();
243 else // whileIteration
244 return (iteratorThreads.loadRelaxed() == 0);
245 }
246
247 ThreadFunctionResult threadFunction() override
248 {
249 if (forIteration)
250 return this->forThreadFunction();
251 else // whileIteration
252 return this->whileThreadFunction();
253 }
254
255 ThreadFunctionResult forThreadFunction()
256 {
257 BlockSizeManager blockSizeManager(ThreadEngineBase::threadPool, iterationCount);
258 ResultReporter<T> resultReporter = createResultsReporter();
259
260 for(;;) {
261 if (this->isCanceled())
262 break;
263
264 const int currentBlockSize = blockSizeManager.blockSize();
265
266 if (currentIndex.loadRelaxed() >= iterationCount)
267 break;
268
269 // Atomically reserve a block of iterationCount for this thread.
270 const int beginIndex = currentIndex.fetchAndAddRelease(currentBlockSize);
271 const int endIndex = qMin(beginIndex + currentBlockSize, iterationCount);
272
273 if (beginIndex >= endIndex) {
274 // No more work
275 break;
276 }
277
278 this->waitForResume(); // (only waits if the qfuture is paused.)
279
280 if (shouldStartThread())
281 this->startThread();
282
283 const int finalBlockSize = endIndex - beginIndex; // block size adjusted for possible end-of-range
284 resultReporter.reserveSpace(finalBlockSize);
285
286 // Call user code with the current iteration range.
287 blockSizeManager.timeBeforeUser();
288 const bool resultsAvailable = this->runIterations(begin, beginIndex, endIndex, resultReporter.getPointer());
289 blockSizeManager.timeAfterUser();
290
291 if (resultsAvailable)
292 resultReporter.reportResults(beginIndex);
293
294 // Report progress if progress reporting enabled.
295 if (progressReportingEnabled) {
296 completed.fetchAndAddAcquire(finalBlockSize);
297 this->setProgressValue(this->completed.loadRelaxed());
298 }
299
300 if (this->shouldThrottleThread())
301 return ThrottleThread;
302 }
303 return ThreadFinished;
304 }
305
306 ThreadFunctionResult whileThreadFunction()
307 {
308 if (iteratorThreads.testAndSetAcquire(0, 1) == false)
309 return ThreadFinished;
310
311 ResultReporter<T> resultReporter = createResultsReporter();
312 resultReporter.reserveSpace(1);
313
314 while (current != end) {
315 // The following two lines breaks support for input iterators according to
316 // the sgi docs: dereferencing prev after calling ++current is not allowed
317 // on input iterators. (prev is dereferenced inside user.runIteration())
318 Iterator prev = current;
319 ++current;
320 int index = currentIndex.fetchAndAddRelaxed(1);
321 iteratorThreads.testAndSetRelease(1, 0);
322
323 this->waitForResume(); // (only waits if the qfuture is paused.)
324
325 if (shouldStartThread())
326 this->startThread();
327
328 const bool resultAavailable = this->runIteration(prev, index, resultReporter.getPointer());
329 if (resultAavailable)
330 resultReporter.reportResults(index);
331
332 if (this->shouldThrottleThread())
333 return ThrottleThread;
334
335 if (iteratorThreads.testAndSetAcquire(0, 1) == false)
336 return ThreadFinished;
337 }
338
339 return ThreadFinished;
340 }
341
342private:
343 ResultReporter<T> createResultsReporter()
344 {
345 if constexpr (!std::is_same_v<T, void>)
346 return ResultReporter<T>(this, defaultValue.value);
347 else
348 return ResultReporter<T>(this);
349 }
350
351public:
352 const Iterator begin;
353 const Iterator end;
354 Iterator current;
355 QAtomicInt currentIndex;
356 QAtomicInt iteratorThreads;
357 QAtomicInt completed;
358 const int iterationCount;
359 const bool forIteration;
360 bool progressReportingEnabled;
361 DefaultValueContainer<ResultType> defaultValue;
362};
363
364} // namespace QtConcurrent
365
366
367QT_END_NAMESPACE
368
369#endif // QT_NO_CONCURRENT
370
371#endif
372