1/****************************************************************************
2**
3** Copyright (C) 2014 Digia Plc and/or its subsidiary(-ies).
4** Contact: http://www.qt-project.org/legal
5**
6** This file is part of the QtCore module of the Qt Toolkit.
7**
8** $QT_BEGIN_LICENSE:LGPL$
9** Commercial License Usage
10** Licensees holding valid commercial Qt licenses may use this file in
11** accordance with the commercial license agreement provided with the
12** Software or, alternatively, in accordance with the terms contained in
13** a written agreement between you and Digia. For licensing terms and
14** conditions see http://qt.digia.com/licensing. For further information
15** use the contact form at http://qt.digia.com/contact-us.
16**
17** GNU Lesser General Public License Usage
18** Alternatively, this file may be used under the terms of the GNU Lesser
19** General Public License version 2.1 as published by the Free Software
20** Foundation and appearing in the file LICENSE.LGPL included in the
21** packaging of this file. Please review the following information to
22** ensure the GNU Lesser General Public License version 2.1 requirements
23** will be met: http://www.gnu.org/licenses/old-licenses/lgpl-2.1.html.
24**
25** In addition, as a special exception, Digia gives you certain additional
26** rights. These rights are described in the Digia Qt LGPL Exception
27** version 1.1, included in the file LGPL_EXCEPTION.txt in this package.
28**
29** GNU General Public License Usage
30** Alternatively, this file may be used under the terms of the GNU
31** General Public License version 3.0 as published by the Free Software
32** Foundation and appearing in the file LICENSE.GPL included in the
33** packaging of this file. Please review the following information to
34** ensure the GNU General Public License version 3.0 requirements will be
35** met: http://www.gnu.org/copyleft/gpl.html.
36**
37**
38** $QT_END_LICENSE$
39**
40****************************************************************************/
41
42#ifndef QTCONCURRENT_ITERATEKERNEL_H
43#define QTCONCURRENT_ITERATEKERNEL_H
44
45#include <QtCore/qglobal.h>
46
47#ifndef QT_NO_CONCURRENT
48
49#include <QtCore/qatomic.h>
50#include <QtCore/qtconcurrentmedian.h>
51#include <QtCore/qtconcurrentthreadengine.h>
52
53#ifndef QT_NO_STL
54# include <iterator>
55#endif
56
57QT_BEGIN_HEADER
58QT_BEGIN_NAMESPACE
59
60QT_MODULE(Core)
61
62#ifndef qdoc
63
64namespace QtConcurrent {
65
#ifndef QT_NO_STL
    // With STL support, simply reuse the standard algorithm.
    using std::advance;
#else
    // Fallback for builds without the STL: moves the iterator forward by
    // applying operator+= once, so the iterator type must provide it.
    template <typename IteratorType, typename Distance>
    void advance(IteratorType &it, Distance value)
    {
        it += value;
    }
#endif
75
/*
    The BlockSizeManager class manages how many iterations a thread should
    reserve and process at a time. This is done by measuring the time spent
    in the user code versus the control part code, and then increasing
    the block size if the ratio between them is too small. The block size
    management is done on the basis of the median of several timing measurements,
    and it is done individually for each thread.
*/
84class Q_CORE_EXPORT BlockSizeManager
85{
86public:
87 BlockSizeManager(int iterationCount);
88 void timeBeforeUser();
89 void timeAfterUser();
90 int blockSize();
91private:
92 inline bool blockSizeMaxed()
93 {
94 return (m_blockSize >= maxBlockSize);
95 }
96
97 const int maxBlockSize;
98 qint64 beforeUser;
99 qint64 afterUser;
100 Median<double> controlPartElapsed;
101 Median<double> userPartElapsed;
102 int m_blockSize;
103};
104
105template <typename T>
106class ResultReporter
107{
108public:
109 ResultReporter(ThreadEngine<T> *_threadEngine)
110 :threadEngine(_threadEngine)
111 {
112
113 }
114
115 void reserveSpace(int resultCount)
116 {
117 currentResultCount = resultCount;
118 vector.resize(qMax(resultCount, vector.count()));
119 }
120
121 void reportResults(int begin)
122 {
123 const int useVectorThreshold = 4; // Tunable parameter.
124 if (currentResultCount > useVectorThreshold) {
125 vector.resize(currentResultCount);
126 threadEngine->reportResults(vector, begin);
127 } else {
128 for (int i = 0; i < currentResultCount; ++i)
129 threadEngine->reportResult(&vector.at(i), begin + i);
130 }
131 }
132
133 inline T * getPointer()
134 {
135 return vector.data();
136 }
137
138 int currentResultCount;
139 ThreadEngine<T> *threadEngine;
140 QVector<T> vector;
141};
142
143template <>
144class ResultReporter<void>
145{
146public:
147 inline ResultReporter(ThreadEngine<void> *) { }
148 inline void reserveSpace(int) { };
149 inline void reportResults(int) { };
150 inline void * getPointer() { return 0; }
151};
152
/*
    selectIteration() maps an iterator category to the iteration strategy:
    true selects for-iteration, false selects while-iteration.
*/
#ifdef QT_NO_STL
// No STL support means no iterator category tags to dispatch on, so
// while-iteration is always used.
template <typename Category>
inline bool selectIteration(Category)
{
    return false; // while
}
#else
// Random access iterators use for-iteration.
inline bool selectIteration(std::random_access_iterator_tag)
{
    return true; // for
}

// Forward and bidirectional iterators use while-iteration.
inline bool selectIteration(std::forward_iterator_tag)
{
    return false; // while
}

inline bool selectIteration(std::bidirectional_iterator_tag)
{
    return false; // while
}
#endif
176
177template <typename Iterator, typename T>
178class IterateKernel : public ThreadEngine<T>
179{
180public:
181 typedef T ResultType;
182
183 IterateKernel(Iterator _begin, Iterator _end)
184#if defined (QT_NO_STL)
185 : begin(_begin), end(_end), current(_begin), currentIndex(0),
186 forIteration(false), progressReportingEnabled(true)
187#else
188 : begin(_begin), end(_end), current(_begin), currentIndex(0),
189 forIteration(selectIteration(typename std::iterator_traits<Iterator>::iterator_category())), progressReportingEnabled(true)
190#endif
191 {
192#if defined (QT_NO_STL)
193 iterationCount = 0;
194#else
195 iterationCount = forIteration ? std::distance(_begin, _end) : 0;
196
197#endif
198 }
199
200 virtual ~IterateKernel() { }
201
202 virtual bool runIteration(Iterator it, int index , T *result)
203 { Q_UNUSED(it); Q_UNUSED(index); Q_UNUSED(result); return false; }
204 virtual bool runIterations(Iterator _begin, int beginIndex, int endIndex, T *results)
205 { Q_UNUSED(_begin); Q_UNUSED(beginIndex); Q_UNUSED(endIndex); Q_UNUSED(results); return false; }
206
207 void start()
208 {
209 progressReportingEnabled = this->isProgressReportingEnabled();
210 if (progressReportingEnabled && iterationCount > 0)
211 this->setProgressRange(0, iterationCount);
212 }
213
214 bool shouldStartThread()
215 {
216 if (forIteration)
217 return (currentIndex < iterationCount) && !this->shouldThrottleThread();
218 else // whileIteration
219 return (iteratorThreads == 0);
220 }
221
222 ThreadFunctionResult threadFunction()
223 {
224 if (forIteration)
225 return this->forThreadFunction();
226 else // whileIteration
227 return this->whileThreadFunction();
228 }
229
230 ThreadFunctionResult forThreadFunction()
231 {
232 BlockSizeManager blockSizeManager(iterationCount);
233 ResultReporter<T> resultReporter(this);
234
235 for(;;) {
236 if (this->isCanceled())
237 break;
238
239 const int currentBlockSize = blockSizeManager.blockSize();
240
241 if (currentIndex >= iterationCount)
242 break;
243
244 // Atomically reserve a block of iterationCount for this thread.
245 const int beginIndex = currentIndex.fetchAndAddRelease(currentBlockSize);
246 const int endIndex = qMin(beginIndex + currentBlockSize, iterationCount);
247
248 if (beginIndex >= endIndex) {
249 // No more work
250 break;
251 }
252
253 this->waitForResume(); // (only waits if the qfuture is paused.)
254
255 if (shouldStartThread())
256 this->startThread();
257
258 const int finalBlockSize = endIndex - beginIndex; // block size adjusted for possible end-of-range
259 resultReporter.reserveSpace(finalBlockSize);
260
261 // Call user code with the current iteration range.
262 blockSizeManager.timeBeforeUser();
263 const bool resultsAvailable = this->runIterations(begin, beginIndex, endIndex, resultReporter.getPointer());
264 blockSizeManager.timeAfterUser();
265
266 if (resultsAvailable)
267 resultReporter.reportResults(beginIndex);
268
269 // Report progress if progress reporting enabled.
270 if (progressReportingEnabled) {
271 completed.fetchAndAddAcquire(finalBlockSize);
272 this->setProgressValue(this->completed);
273 }
274
275 if (this->shouldThrottleThread())
276 return ThrottleThread;
277 }
278 return ThreadFinished;
279 }
280
281 ThreadFunctionResult whileThreadFunction()
282 {
283 if (iteratorThreads.testAndSetAcquire(0, 1) == false)
284 return ThreadFinished;
285
286 ResultReporter<T> resultReporter(this);
287 resultReporter.reserveSpace(1);
288
289 while (current != end) {
290 // The following two lines breaks support for input iterators according to
291 // the sgi docs: dereferencing prev after calling ++current is not allowed
292 // on input iterators. (prev is dereferenced inside user.runIteration())
293 Iterator prev = current;
294 ++current;
295 int index = currentIndex.fetchAndAddRelaxed(1);
296 iteratorThreads.testAndSetRelease(1, 0);
297
298 this->waitForResume(); // (only waits if the qfuture is paused.)
299
300 if (shouldStartThread())
301 this->startThread();
302
303 const bool resultAavailable = this->runIteration(prev, index, resultReporter.getPointer());
304 if (resultAavailable)
305 resultReporter.reportResults(index);
306
307 if (this->shouldThrottleThread())
308 return ThrottleThread;
309
310 if (iteratorThreads.testAndSetAcquire(0, 1) == false)
311 return ThreadFinished;
312 }
313
314 return ThreadFinished;
315 }
316
317
318public:
319 const Iterator begin;
320 const Iterator end;
321 Iterator current;
322 QAtomicInt currentIndex;
323 bool forIteration;
324 QAtomicInt iteratorThreads;
325 int iterationCount;
326
327 bool progressReportingEnabled;
328 QAtomicInt completed;
329};
330
331} // namespace QtConcurrent
332
333#endif //qdoc
334
335QT_END_NAMESPACE
336QT_END_HEADER
337
338#endif // QT_NO_CONCURRENT
339
340#endif
341