qtconcurrentreducekernel.h 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250
  1. /****************************************************************************
  2. **
  3. ** Copyright (C) 2016 The Qt Company Ltd.
  4. ** Contact: https://www.qt.io/licensing/
  5. **
  6. ** This file is part of the QtCore module of the Qt Toolkit.
  7. **
  8. ** $QT_BEGIN_LICENSE:LGPL$
  9. ** Commercial License Usage
  10. ** Licensees holding valid commercial Qt licenses may use this file in
  11. ** accordance with the commercial license agreement provided with the
  12. ** Software or, alternatively, in accordance with the terms contained in
  13. ** a written agreement between you and The Qt Company. For licensing terms
  14. ** and conditions see https://www.qt.io/terms-conditions. For further
  15. ** information use the contact form at https://www.qt.io/contact-us.
  16. **
  17. ** GNU Lesser General Public License Usage
  18. ** Alternatively, this file may be used under the terms of the GNU Lesser
  19. ** General Public License version 3 as published by the Free Software
  20. ** Foundation and appearing in the file LICENSE.LGPL3 included in the
  21. ** packaging of this file. Please review the following information to
  22. ** ensure the GNU Lesser General Public License version 3 requirements
  23. ** will be met: https://www.gnu.org/licenses/lgpl-3.0.html.
  24. **
  25. ** GNU General Public License Usage
  26. ** Alternatively, this file may be used under the terms of the GNU
  27. ** General Public License version 2.0 or (at your option) the GNU General
  28. ** Public license version 3 or any later version approved by the KDE Free
  29. ** Qt Foundation. The licenses are as published by the Free Software
  30. ** Foundation and appearing in the file LICENSE.GPL2 and LICENSE.GPL3
  31. ** included in the packaging of this file. Please review the following
  32. ** information to ensure the GNU General Public License requirements will
  33. ** be met: https://www.gnu.org/licenses/gpl-2.0.html and
  34. ** https://www.gnu.org/licenses/gpl-3.0.html.
  35. **
  36. ** $QT_END_LICENSE$
  37. **
  38. ****************************************************************************/
  39. #ifndef QTCONCURRENT_REDUCEKERNEL_H
  40. #define QTCONCURRENT_REDUCEKERNEL_H
  41. #include <QtConcurrent/qtconcurrent_global.h>
  42. #ifndef QT_NO_CONCURRENT
  43. #include <QtCore/qatomic.h>
  44. #include <QtCore/qlist.h>
  45. #include <QtCore/qmap.h>
  46. #include <QtCore/qmutex.h>
  47. #include <QtCore/qthread.h>
  48. #include <QtCore/qthreadpool.h>
  49. #include <QtCore/qvector.h>
  50. QT_BEGIN_NAMESPACE
  51. namespace QtConcurrent {
  52. #ifndef Q_QDOC
  53. /*
  54. The ReduceQueueStartLimit and ReduceQueueThrottleLimit constants
  55. limit the reduce queue size for MapReduce. When the number of
  56. reduce blocks in the queue exceeds ReduceQueueStartLimit,
  57. MapReduce won't start any new threads, and when it exceeds
  58. ReduceQueueThrottleLimit running threads will be stopped.
  59. */
  60. enum {
  61. ReduceQueueStartLimit = 20,
  62. ReduceQueueThrottleLimit = 30
  63. };
  64. // IntermediateResults holds a block of intermediate results from a
  65. // map or filter functor. The begin/end offsets indicates the origin
  66. // and range of the block.
  67. template <typename T>
  68. class IntermediateResults
  69. {
  70. public:
  71. int begin, end;
  72. QVector<T> vector;
  73. };
  74. #endif // Q_QDOC
  75. enum ReduceOption {
  76. UnorderedReduce = 0x1,
  77. OrderedReduce = 0x2,
  78. SequentialReduce = 0x4
  79. // ParallelReduce = 0x8
  80. };
  81. Q_DECLARE_FLAGS(ReduceOptions, ReduceOption)
  82. Q_DECLARE_OPERATORS_FOR_FLAGS(ReduceOptions)
  83. #ifndef Q_QDOC
  84. // supports both ordered and out-of-order reduction
  85. template <typename ReduceFunctor, typename ReduceResultType, typename T>
  86. class ReduceKernel
  87. {
  88. typedef QMap<int, IntermediateResults<T> > ResultsMap;
  89. const ReduceOptions reduceOptions;
  90. QMutex mutex;
  91. int progress, resultsMapSize, threadCount;
  92. ResultsMap resultsMap;
  93. bool canReduce(int begin) const
  94. {
  95. return (((reduceOptions & UnorderedReduce)
  96. && progress == 0)
  97. || ((reduceOptions & OrderedReduce)
  98. && progress == begin));
  99. }
  100. void reduceResult(ReduceFunctor &reduce,
  101. ReduceResultType &r,
  102. const IntermediateResults<T> &result)
  103. {
  104. for (int i = 0; i < result.vector.size(); ++i) {
  105. reduce(r, result.vector.at(i));
  106. }
  107. }
  108. void reduceResults(ReduceFunctor &reduce,
  109. ReduceResultType &r,
  110. ResultsMap &map)
  111. {
  112. typename ResultsMap::iterator it = map.begin();
  113. while (it != map.end()) {
  114. reduceResult(reduce, r, it.value());
  115. ++it;
  116. }
  117. }
  118. public:
  119. ReduceKernel(ReduceOptions _reduceOptions)
  120. : reduceOptions(_reduceOptions), progress(0), resultsMapSize(0),
  121. threadCount(QThreadPool::globalInstance()->maxThreadCount())
  122. { }
  123. void runReduce(ReduceFunctor &reduce,
  124. ReduceResultType &r,
  125. const IntermediateResults<T> &result)
  126. {
  127. QMutexLocker locker(&mutex);
  128. if (!canReduce(result.begin)) {
  129. ++resultsMapSize;
  130. resultsMap.insert(result.begin, result);
  131. return;
  132. }
  133. if (reduceOptions & UnorderedReduce) {
  134. // UnorderedReduce
  135. progress = -1;
  136. // reduce this result
  137. locker.unlock();
  138. reduceResult(reduce, r, result);
  139. locker.relock();
  140. // reduce all stored results as well
  141. while (!resultsMap.isEmpty()) {
  142. ResultsMap resultsMapCopy = resultsMap;
  143. resultsMap.clear();
  144. locker.unlock();
  145. reduceResults(reduce, r, resultsMapCopy);
  146. locker.relock();
  147. resultsMapSize -= resultsMapCopy.size();
  148. }
  149. progress = 0;
  150. } else {
  151. // reduce this result
  152. locker.unlock();
  153. reduceResult(reduce, r, result);
  154. locker.relock();
  155. // OrderedReduce
  156. progress += result.end - result.begin;
  157. // reduce as many other results as possible
  158. typename ResultsMap::iterator it = resultsMap.begin();
  159. while (it != resultsMap.end()) {
  160. if (it.value().begin != progress)
  161. break;
  162. locker.unlock();
  163. reduceResult(reduce, r, it.value());
  164. locker.relock();
  165. --resultsMapSize;
  166. progress += it.value().end - it.value().begin;
  167. it = resultsMap.erase(it);
  168. }
  169. }
  170. }
  171. // final reduction
  172. void finish(ReduceFunctor &reduce, ReduceResultType &r)
  173. {
  174. reduceResults(reduce, r, resultsMap);
  175. }
  176. inline bool shouldThrottle()
  177. {
  178. return (resultsMapSize > (ReduceQueueThrottleLimit * threadCount));
  179. }
  180. inline bool shouldStartThread()
  181. {
  182. return (resultsMapSize <= (ReduceQueueStartLimit * threadCount));
  183. }
  184. };
  185. template <typename Sequence, typename Base, typename Functor1, typename Functor2>
  186. struct SequenceHolder2 : public Base
  187. {
  188. SequenceHolder2(const Sequence &_sequence,
  189. Functor1 functor1,
  190. Functor2 functor2,
  191. ReduceOptions reduceOptions)
  192. : Base(_sequence.begin(), _sequence.end(), functor1, functor2, reduceOptions),
  193. sequence(_sequence)
  194. { }
  195. Sequence sequence;
  196. void finish()
  197. {
  198. Base::finish();
  199. // Clear the sequence to make sure all temporaries are destroyed
  200. // before finished is signaled.
  201. sequence = Sequence();
  202. }
  203. };
  204. #endif //Q_QDOC
  205. } // namespace QtConcurrent
  206. QT_END_NAMESPACE
  207. #endif // QT_NO_CONCURRENT
  208. #endif