qtconcurrentiteratekernel.h 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332
  1. /****************************************************************************
  2. **
  3. ** Copyright (C) 2016 The Qt Company Ltd.
  4. ** Contact: https://www.qt.io/licensing/
  5. **
  6. ** This file is part of the QtCore module of the Qt Toolkit.
  7. **
  8. ** $QT_BEGIN_LICENSE:LGPL$
  9. ** Commercial License Usage
  10. ** Licensees holding valid commercial Qt licenses may use this file in
  11. ** accordance with the commercial license agreement provided with the
  12. ** Software or, alternatively, in accordance with the terms contained in
  13. ** a written agreement between you and The Qt Company. For licensing terms
  14. ** and conditions see https://www.qt.io/terms-conditions. For further
  15. ** information use the contact form at https://www.qt.io/contact-us.
  16. **
  17. ** GNU Lesser General Public License Usage
  18. ** Alternatively, this file may be used under the terms of the GNU Lesser
  19. ** General Public License version 3 as published by the Free Software
  20. ** Foundation and appearing in the file LICENSE.LGPL3 included in the
  21. ** packaging of this file. Please review the following information to
  22. ** ensure the GNU Lesser General Public License version 3 requirements
  23. ** will be met: https://www.gnu.org/licenses/lgpl-3.0.html.
  24. **
  25. ** GNU General Public License Usage
  26. ** Alternatively, this file may be used under the terms of the GNU
  27. ** General Public License version 2.0 or (at your option) the GNU General
  28. ** Public license version 3 or any later version approved by the KDE Free
  29. ** Qt Foundation. The licenses are as published by the Free Software
  30. ** Foundation and appearing in the file LICENSE.GPL2 and LICENSE.GPL3
  31. ** included in the packaging of this file. Please review the following
  32. ** information to ensure the GNU General Public License requirements will
  33. ** be met: https://www.gnu.org/licenses/gpl-2.0.html and
  34. ** https://www.gnu.org/licenses/gpl-3.0.html.
  35. **
  36. ** $QT_END_LICENSE$
  37. **
  38. ****************************************************************************/
  39. #ifndef QTCONCURRENT_ITERATEKERNEL_H
  40. #define QTCONCURRENT_ITERATEKERNEL_H
  41. #include <QtConcurrent/qtconcurrent_global.h>
  42. #ifndef QT_NO_CONCURRENT
  43. #include <QtCore/qatomic.h>
  44. #include <QtConcurrent/qtconcurrentmedian.h>
  45. #include <QtConcurrent/qtconcurrentthreadengine.h>
  46. #include <iterator>
  47. QT_BEGIN_NAMESPACE
  48. #ifndef Q_QDOC
  49. namespace QtConcurrent {
  50. /*
  51. The BlockSizeManager class manages how many iterations a thread should
  52. reserve and process at a time. This is done by measuring the time spent
  53. in the user code versus the control part code, and then increasing
  54. the block size if the ratio between them is too small. The block size
  55. management is done on the basis of the median of several timing measurements,
  56. and it is done individually for each thread.
  57. */
  58. class Q_CONCURRENT_EXPORT BlockSizeManager
  59. {
  60. public:
  61. BlockSizeManager(int iterationCount);
  62. void timeBeforeUser();
  63. void timeAfterUser();
  64. int blockSize();
  65. private:
  66. inline bool blockSizeMaxed()
  67. {
  68. return (m_blockSize >= maxBlockSize);
  69. }
  70. const int maxBlockSize;
  71. qint64 beforeUser;
  72. qint64 afterUser;
  73. Median<double> controlPartElapsed;
  74. Median<double> userPartElapsed;
  75. int m_blockSize;
  76. Q_DISABLE_COPY(BlockSizeManager)
  77. };
  78. // ### Qt6: Replace BlockSizeManager with V2 implementation
  79. class Q_CONCURRENT_EXPORT BlockSizeManagerV2
  80. {
  81. public:
  82. explicit BlockSizeManagerV2(int iterationCount);
  83. void timeBeforeUser();
  84. void timeAfterUser();
  85. int blockSize();
  86. private:
  87. inline bool blockSizeMaxed()
  88. {
  89. return (m_blockSize >= maxBlockSize);
  90. }
  91. const int maxBlockSize;
  92. qint64 beforeUser;
  93. qint64 afterUser;
  94. MedianDouble controlPartElapsed;
  95. MedianDouble userPartElapsed;
  96. int m_blockSize;
  97. Q_DISABLE_COPY(BlockSizeManagerV2)
  98. };
  99. template <typename T>
  100. class ResultReporter
  101. {
  102. public:
  103. ResultReporter(ThreadEngine<T> *_threadEngine)
  104. :threadEngine(_threadEngine)
  105. {
  106. }
  107. void reserveSpace(int resultCount)
  108. {
  109. currentResultCount = resultCount;
  110. vector.resize(qMax(resultCount, vector.count()));
  111. }
  112. void reportResults(int begin)
  113. {
  114. const int useVectorThreshold = 4; // Tunable parameter.
  115. if (currentResultCount > useVectorThreshold) {
  116. vector.resize(currentResultCount);
  117. threadEngine->reportResults(vector, begin);
  118. } else {
  119. for (int i = 0; i < currentResultCount; ++i)
  120. threadEngine->reportResult(&vector.at(i), begin + i);
  121. }
  122. }
  123. inline T * getPointer()
  124. {
  125. return vector.data();
  126. }
  127. int currentResultCount;
  128. ThreadEngine<T> *threadEngine;
  129. QVector<T> vector;
  130. };
  131. template <>
  132. class ResultReporter<void>
  133. {
  134. public:
  135. inline ResultReporter(ThreadEngine<void> *) { }
  136. inline void reserveSpace(int) { }
  137. inline void reportResults(int) { }
  138. inline void * getPointer() { return Q_NULLPTR; }
  139. };
  140. inline bool selectIteration(std::bidirectional_iterator_tag)
  141. {
  142. return false; // while
  143. }
  144. inline bool selectIteration(std::forward_iterator_tag)
  145. {
  146. return false; // while
  147. }
  148. inline bool selectIteration(std::random_access_iterator_tag)
  149. {
  150. return true; // for
  151. }
  152. template <typename Iterator, typename T>
  153. class IterateKernel : public ThreadEngine<T>
  154. {
  155. public:
  156. typedef T ResultType;
  157. IterateKernel(Iterator _begin, Iterator _end)
  158. : begin(_begin), end(_end), current(_begin), currentIndex(0),
  159. forIteration(selectIteration(typename std::iterator_traits<Iterator>::iterator_category())), progressReportingEnabled(true)
  160. {
  161. iterationCount = forIteration ? std::distance(_begin, _end) : 0;
  162. }
  163. virtual ~IterateKernel() { }
  164. virtual bool runIteration(Iterator it, int index , T *result)
  165. { Q_UNUSED(it); Q_UNUSED(index); Q_UNUSED(result); return false; }
  166. virtual bool runIterations(Iterator _begin, int beginIndex, int endIndex, T *results)
  167. { Q_UNUSED(_begin); Q_UNUSED(beginIndex); Q_UNUSED(endIndex); Q_UNUSED(results); return false; }
  168. void start()
  169. {
  170. progressReportingEnabled = this->isProgressReportingEnabled();
  171. if (progressReportingEnabled && iterationCount > 0)
  172. this->setProgressRange(0, iterationCount);
  173. }
  174. bool shouldStartThread()
  175. {
  176. if (forIteration)
  177. return (currentIndex.load() < iterationCount) && !this->shouldThrottleThread();
  178. else // whileIteration
  179. return (iteratorThreads.load() == 0);
  180. }
  181. ThreadFunctionResult threadFunction()
  182. {
  183. if (forIteration)
  184. return this->forThreadFunction();
  185. else // whileIteration
  186. return this->whileThreadFunction();
  187. }
  188. ThreadFunctionResult forThreadFunction()
  189. {
  190. BlockSizeManagerV2 blockSizeManager(iterationCount);
  191. ResultReporter<T> resultReporter(this);
  192. for(;;) {
  193. if (this->isCanceled())
  194. break;
  195. const int currentBlockSize = blockSizeManager.blockSize();
  196. if (currentIndex.load() >= iterationCount)
  197. break;
  198. // Atomically reserve a block of iterationCount for this thread.
  199. const int beginIndex = currentIndex.fetchAndAddRelease(currentBlockSize);
  200. const int endIndex = qMin(beginIndex + currentBlockSize, iterationCount);
  201. if (beginIndex >= endIndex) {
  202. // No more work
  203. break;
  204. }
  205. this->waitForResume(); // (only waits if the qfuture is paused.)
  206. if (shouldStartThread())
  207. this->startThread();
  208. const int finalBlockSize = endIndex - beginIndex; // block size adjusted for possible end-of-range
  209. resultReporter.reserveSpace(finalBlockSize);
  210. // Call user code with the current iteration range.
  211. blockSizeManager.timeBeforeUser();
  212. const bool resultsAvailable = this->runIterations(begin, beginIndex, endIndex, resultReporter.getPointer());
  213. blockSizeManager.timeAfterUser();
  214. if (resultsAvailable)
  215. resultReporter.reportResults(beginIndex);
  216. // Report progress if progress reporting enabled.
  217. if (progressReportingEnabled) {
  218. completed.fetchAndAddAcquire(finalBlockSize);
  219. this->setProgressValue(this->completed.load());
  220. }
  221. if (this->shouldThrottleThread())
  222. return ThrottleThread;
  223. }
  224. return ThreadFinished;
  225. }
  226. ThreadFunctionResult whileThreadFunction()
  227. {
  228. if (iteratorThreads.testAndSetAcquire(0, 1) == false)
  229. return ThreadFinished;
  230. ResultReporter<T> resultReporter(this);
  231. resultReporter.reserveSpace(1);
  232. while (current != end) {
  233. // The following two lines breaks support for input iterators according to
  234. // the sgi docs: dereferencing prev after calling ++current is not allowed
  235. // on input iterators. (prev is dereferenced inside user.runIteration())
  236. Iterator prev = current;
  237. ++current;
  238. int index = currentIndex.fetchAndAddRelaxed(1);
  239. iteratorThreads.testAndSetRelease(1, 0);
  240. this->waitForResume(); // (only waits if the qfuture is paused.)
  241. if (shouldStartThread())
  242. this->startThread();
  243. const bool resultAavailable = this->runIteration(prev, index, resultReporter.getPointer());
  244. if (resultAavailable)
  245. resultReporter.reportResults(index);
  246. if (this->shouldThrottleThread())
  247. return ThrottleThread;
  248. if (iteratorThreads.testAndSetAcquire(0, 1) == false)
  249. return ThreadFinished;
  250. }
  251. return ThreadFinished;
  252. }
  253. public:
  254. const Iterator begin;
  255. const Iterator end;
  256. Iterator current;
  257. QAtomicInt currentIndex;
  258. bool forIteration;
  259. QAtomicInt iteratorThreads;
  260. int iterationCount;
  261. bool progressReportingEnabled;
  262. QAtomicInt completed;
  263. };
  264. } // namespace QtConcurrent
  265. #endif //Q_QDOC
  266. QT_END_NAMESPACE
  267. #endif // QT_NO_CONCURRENT
  268. #endif