1 | /**************************************************************************** |
2 | ** |
3 | ** Copyright (C) 2014 Digia Plc and/or its subsidiary(-ies). |
4 | ** Contact: http://www.qt-project.org/legal |
5 | ** |
6 | ** This file is part of the QtCore module of the Qt Toolkit. |
7 | ** |
8 | ** $QT_BEGIN_LICENSE:LGPL$ |
9 | ** Commercial License Usage |
10 | ** Licensees holding valid commercial Qt licenses may use this file in |
11 | ** accordance with the commercial license agreement provided with the |
12 | ** Software or, alternatively, in accordance with the terms contained in |
13 | ** a written agreement between you and Digia. For licensing terms and |
14 | ** conditions see http://qt.digia.com/licensing. For further information |
15 | ** use the contact form at http://qt.digia.com/contact-us. |
16 | ** |
17 | ** GNU Lesser General Public License Usage |
18 | ** Alternatively, this file may be used under the terms of the GNU Lesser |
19 | ** General Public License version 2.1 as published by the Free Software |
20 | ** Foundation and appearing in the file LICENSE.LGPL included in the |
21 | ** packaging of this file. Please review the following information to |
22 | ** ensure the GNU Lesser General Public License version 2.1 requirements |
23 | ** will be met: http://www.gnu.org/licenses/old-licenses/lgpl-2.1.html. |
24 | ** |
25 | ** In addition, as a special exception, Digia gives you certain additional |
26 | ** rights. These rights are described in the Digia Qt LGPL Exception |
27 | ** version 1.1, included in the file LGPL_EXCEPTION.txt in this package. |
28 | ** |
29 | ** GNU General Public License Usage |
30 | ** Alternatively, this file may be used under the terms of the GNU |
31 | ** General Public License version 3.0 as published by the Free Software |
32 | ** Foundation and appearing in the file LICENSE.GPL included in the |
33 | ** packaging of this file. Please review the following information to |
34 | ** ensure the GNU General Public License version 3.0 requirements will be |
35 | ** met: http://www.gnu.org/copyleft/gpl.html. |
36 | ** |
37 | ** |
38 | ** $QT_END_LICENSE$ |
39 | ** |
40 | ****************************************************************************/ |
41 | |
42 | #ifndef QTCONCURRENT_ITERATEKERNEL_H |
43 | #define QTCONCURRENT_ITERATEKERNEL_H |
44 | |
45 | #include <QtCore/qglobal.h> |
46 | |
47 | #ifndef QT_NO_CONCURRENT |
48 | |
49 | #include <QtCore/qatomic.h> |
50 | #include <QtCore/qtconcurrentmedian.h> |
51 | #include <QtCore/qtconcurrentthreadengine.h> |
52 | |
53 | #ifndef QT_NO_STL |
54 | # include <iterator> |
55 | #endif |
56 | |
57 | QT_BEGIN_HEADER |
58 | QT_BEGIN_NAMESPACE |
59 | |
60 | QT_MODULE(Core) |
61 | |
62 | #ifndef qdoc |
63 | |
64 | namespace QtConcurrent { |
65 | |
66 | #ifndef QT_NO_STL |
67 | using std::advance; |
68 | #else |
// Fallback used when the STL is unavailable: moves `it` forward by `value`
// positions using the iterator's own operator+=.
template <typename Iterator, typename Distance>
void advance(Iterator &it, Distance value)
{
    it += value;
}
74 | #endif |
75 | |
/*
    The BlockSizeManager class manages how many iterations a thread should
    reserve and process at a time. This is done by measuring the time spent
    in the user code versus the control part code, and then increasing
    the block size if the ratio between them is too small. The block size
    management is done on the basis of the median of several timing measurements,
    and it is done individually for each thread.
*/
// Helper that tells a worker how many iterations to reserve per block.
// Only the declaration lives in this header; the member functions other than
// blockSizeMaxed() are defined out of line.
class Q_CORE_EXPORT BlockSizeManager
{
public:
    // iterationCount: total number of iterations in the job.
    // NOTE(review): presumably used to derive maxBlockSize - confirm in the
    // out-of-line definition.
    BlockSizeManager(int iterationCount);
    // Call immediately before entering user code.
    void timeBeforeUser();
    // Call immediately after user code returns.
    void timeAfterUser();
    // Returns the number of iterations to reserve for the next block.
    int blockSize();
private:
    // True once the current block size has reached the upper limit.
    inline bool blockSizeMaxed()
    {
        return (m_blockSize >= maxBlockSize);
    }

    const int maxBlockSize;             // upper limit for m_blockSize
    qint64 beforeUser;                  // presumably the timestamp taken by timeBeforeUser() - confirm
    qint64 afterUser;                   // presumably the timestamp taken by timeAfterUser() - confirm
    Median<double> controlPartElapsed;  // median filter over control-code timings
    Median<double> userPartElapsed;     // median filter over user-code timings
    int m_blockSize;                    // current block size
};
104 | |
105 | template <typename T> |
106 | class ResultReporter |
107 | { |
108 | public: |
109 | ResultReporter(ThreadEngine<T> *_threadEngine) |
110 | :threadEngine(_threadEngine) |
111 | { |
112 | |
113 | } |
114 | |
115 | void reserveSpace(int resultCount) |
116 | { |
117 | currentResultCount = resultCount; |
118 | vector.resize(qMax(resultCount, vector.count())); |
119 | } |
120 | |
121 | void reportResults(int begin) |
122 | { |
123 | const int useVectorThreshold = 4; // Tunable parameter. |
124 | if (currentResultCount > useVectorThreshold) { |
125 | vector.resize(currentResultCount); |
126 | threadEngine->reportResults(vector, begin); |
127 | } else { |
128 | for (int i = 0; i < currentResultCount; ++i) |
129 | threadEngine->reportResult(&vector.at(i), begin + i); |
130 | } |
131 | } |
132 | |
133 | inline T * getPointer() |
134 | { |
135 | return vector.data(); |
136 | } |
137 | |
138 | int currentResultCount; |
139 | ThreadEngine<T> *threadEngine; |
140 | QVector<T> vector; |
141 | }; |
142 | |
143 | template <> |
144 | class ResultReporter<void> |
145 | { |
146 | public: |
147 | inline ResultReporter(ThreadEngine<void> *) { } |
148 | inline void reserveSpace(int) { }; |
149 | inline void reportResults(int) { }; |
150 | inline void * getPointer() { return 0; } |
151 | }; |
152 | |
153 | #ifndef QT_NO_STL |
// Tag-dispatched choice of iteration strategy. The result is true when the
// kernel should use index-based "for" iteration and false when it should
// step through the range with "while" iteration.

// Forward iterators: while iteration.
inline bool selectIteration(std::forward_iterator_tag)
{
    return false;
}

// Bidirectional iterators: while iteration.
inline bool selectIteration(std::bidirectional_iterator_tag)
{
    return false;
}

// Random access iterators: for iteration.
inline bool selectIteration(std::random_access_iterator_tag)
{
    return true;
}
168 | #else |
// Without STL support the iterator category cannot be inspected, so every
// argument type selects while iteration.
template <typename Category>
inline bool selectIteration(Category)
{
    return false;
}
175 | #endif |
176 | |
/*
    IterateKernel is a ThreadEngine that distributes the items of the range
    [begin, end) over the worker threads.

    Two strategies are used, chosen once in the constructor:

    - "for" iteration: each thread reserves a block of indices by atomically
      advancing currentIndex and passes the block to runIterations().
    - "while" iteration: the threads take turns advancing the shared
      iterator `current`, one item at a time, and pass each item to
      runIteration(). iteratorThreads is the 0/1 flag that says whether a
      thread currently holds the iterator.

    Subclasses override runIteration() and/or runIterations(); the default
    implementations do nothing and report that no result is available.
*/
template <typename Iterator, typename T>
class IterateKernel : public ThreadEngine<T>
{
public:
    typedef T ResultType;

    // Stores the range and selects the iteration strategy. Without the STL
    // the strategy is always while iteration; otherwise it follows the
    // iterator category (see selectIteration()).
    IterateKernel(Iterator _begin, Iterator _end)
#if defined (QT_NO_STL)
        : begin(_begin), end(_end), current(_begin), currentIndex(0),
          forIteration(false), progressReportingEnabled(true)
#else
        : begin(_begin), end(_end), current(_begin), currentIndex(0),
          forIteration(selectIteration(typename std::iterator_traits<Iterator>::iterator_category())), progressReportingEnabled(true)
#endif
    {
        // The item count is only computed for for iteration; while iteration
        // leaves it at 0.
#if defined (QT_NO_STL)
        iterationCount = 0;
#else
        iterationCount = forIteration ? std::distance(_begin, _end) : 0;

#endif
    }

    virtual ~IterateKernel() { }

    // Processes the single item at `it`, whose position in the range is
    // `index`. Return true if a result was written to *result.
    virtual bool runIteration(Iterator it, int index , T *result)
        { Q_UNUSED(it); Q_UNUSED(index); Q_UNUSED(result); return false; }
    // Processes the items with indices [beginIndex, endIndex), counted from
    // _begin. Return true if results were written to `results`.
    virtual bool runIterations(Iterator _begin, int beginIndex, int endIndex, T *results)
        { Q_UNUSED(_begin); Q_UNUSED(beginIndex); Q_UNUSED(endIndex); Q_UNUSED(results); return false; }

    // Caches the progress-reporting setting and, when it is enabled and the
    // item count is known, publishes the progress range.
    void start()
    {
        progressReportingEnabled = this->isProgressReportingEnabled();
        if (progressReportingEnabled && iterationCount > 0)
            this->setProgressRange(0, iterationCount);
    }

    // For iteration: another thread is useful while indices remain and the
    // engine is not throttling. While iteration: another thread is useful
    // only when no thread currently holds the iterator.
    bool shouldStartThread()
    {
        if (forIteration)
            return (currentIndex < iterationCount) && !this->shouldThrottleThread();
        else // whileIteration
            return (iteratorThreads == 0);
    }

    // Dispatches to the worker loop that matches the chosen strategy.
    ThreadFunctionResult threadFunction()
    {
        if (forIteration)
            return this->forThreadFunction();
        else // whileIteration
            return this->whileThreadFunction();
    }

    // Worker loop for for iteration. Returns ThrottleThread when the engine
    // asks this thread to throttle, and ThreadFinished when the job is
    // canceled or no indices are left.
    ThreadFunctionResult forThreadFunction()
    {
        BlockSizeManager blockSizeManager(iterationCount);
        ResultReporter<T> resultReporter(this);

        for(;;) {
            if (this->isCanceled())
                break;

            const int currentBlockSize = blockSizeManager.blockSize();

            if (currentIndex >= iterationCount)
                break;

            // Atomically reserve a block of iterations for this thread.
            const int beginIndex = currentIndex.fetchAndAddRelease(currentBlockSize);
            const int endIndex = qMin(beginIndex + currentBlockSize, iterationCount);

            // Another thread may have reserved the remaining indices between
            // the check above and the reservation.
            if (beginIndex >= endIndex) {
                // No more work
                break;
            }

            this->waitForResume(); // (only waits if the qfuture is paused.)

            if (shouldStartThread())
                this->startThread();

            const int finalBlockSize = endIndex - beginIndex; // block size adjusted for possible end-of-range
            resultReporter.reserveSpace(finalBlockSize);

            // Call user code with the current iteration range. The two timing
            // calls bracket the user code for the block size manager.
            blockSizeManager.timeBeforeUser();
            const bool resultsAvailable = this->runIterations(begin, beginIndex, endIndex, resultReporter.getPointer());
            blockSizeManager.timeAfterUser();

            if (resultsAvailable)
                resultReporter.reportResults(beginIndex);

            // Report progress if progress reporting enabled.
            if (progressReportingEnabled) {
                completed.fetchAndAddAcquire(finalBlockSize);
                this->setProgressValue(this->completed);
            }

            if (this->shouldThrottleThread())
                return ThrottleThread;
        }
        return ThreadFinished;
    }

    // Worker loop for while iteration. `current` is only read or advanced by
    // the thread that has switched iteratorThreads from 0 to 1.
    ThreadFunctionResult whileThreadFunction()
    {
        // Claim the iterator; if another thread holds it, this thread is done.
        if (iteratorThreads.testAndSetAcquire(0, 1) == false)
            return ThreadFinished;

        ResultReporter<T> resultReporter(this);
        resultReporter.reserveSpace(1);

        while (current != end) {
            // The following two lines break support for input iterators according to
            // the sgi docs: dereferencing prev after calling ++current is not allowed
            // on input iterators. (prev is dereferenced inside user.runIteration())
            Iterator prev = current;
            ++current;
            int index = currentIndex.fetchAndAddRelaxed(1);
            // Give up the iterator before running user code, so that another
            // thread can pick up the next item in the meantime.
            iteratorThreads.testAndSetRelease(1, 0);

            this->waitForResume(); // (only waits if the qfuture is paused.)

            if (shouldStartThread())
                this->startThread();

            const bool resultAavailable = this->runIteration(prev, index, resultReporter.getPointer());
            if (resultAavailable)
                resultReporter.reportResults(index);

            if (this->shouldThrottleThread())
                return ThrottleThread;

            // Claim the iterator again before the loop condition reads
            // `current`; if another thread holds it, this thread is done.
            if (iteratorThreads.testAndSetAcquire(0, 1) == false)
                return ThreadFinished;
        }

        return ThreadFinished;
    }


public:
    const Iterator begin;           // first item of the range
    const Iterator end;             // one past the last item of the range
    Iterator current;               // next item to hand out (while iteration)
    QAtomicInt currentIndex;        // next index to hand out
    bool forIteration;              // true: for iteration, false: while iteration
    QAtomicInt iteratorThreads;     // 1 while a thread holds `current`, otherwise 0
    int iterationCount;             // number of items for for iteration, otherwise 0

    bool progressReportingEnabled;  // refreshed from the engine in start()
    QAtomicInt completed;           // iterations finished so far (for iteration progress)
};
330 | |
331 | } // namespace QtConcurrent |
332 | |
333 | #endif //qdoc |
334 | |
335 | QT_END_NAMESPACE |
336 | QT_END_HEADER |
337 | |
338 | #endif // QT_NO_CONCURRENT |
339 | |
340 | #endif |
341 | |