/*
    Copyright 2005-2013 Intel Corporation.  All Rights Reserved.

    This file is part of Threading Building Blocks.

    Threading Building Blocks is free software; you can redistribute it
    and/or modify it under the terms of the GNU General Public License
    version 2 as published by the Free Software Foundation.

    Threading Building Blocks is distributed in the hope that it will be
    useful, but WITHOUT ANY WARRANTY; without even the implied warranty
    of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with Threading Building Blocks; if not, write to the Free Software
    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA

    As a special exception, you may use this file as part of a free software
    library without restriction.  Specifically, if other files instantiate
    templates or use macros or inline functions from this file, or you compile
    this file and link it with other files to produce an executable, this
    file does not by itself cause the resulting executable to be covered by
    the GNU General Public License.  This exception does not however
    invalidate any other reasons why the executable file might be covered by
    the GNU General Public License.
*/
  23. #ifndef __TBB_concurrent_priority_queue_H
  24. #define __TBB_concurrent_priority_queue_H
  25. #include "atomic.h"
  26. #include "cache_aligned_allocator.h"
  27. #include "tbb_exception.h"
  28. #include "tbb_stddef.h"
  29. #include "tbb_profiling.h"
  30. #include "internal/_aggregator_impl.h"
  31. #include <vector>
  32. #include <iterator>
  33. #include <functional>
  34. namespace tbb {
  35. namespace interface5 {
  36. using namespace tbb::internal;
  37. //! Concurrent priority queue
  38. template <typename T, typename Compare=std::less<T>, typename A=cache_aligned_allocator<T> >
  39. class concurrent_priority_queue {
  40. public:
  41. //! Element type in the queue.
  42. typedef T value_type;
  43. //! Reference type
  44. typedef T& reference;
  45. //! Const reference type
  46. typedef const T& const_reference;
  47. //! Integral type for representing size of the queue.
  48. typedef size_t size_type;
  49. //! Difference type for iterator
  50. typedef ptrdiff_t difference_type;
  51. //! Allocator type
  52. typedef A allocator_type;
  53. //! Constructs a new concurrent_priority_queue with default capacity
  54. explicit concurrent_priority_queue(const allocator_type& a = allocator_type()) : mark(0), my_size(0), data(a)
  55. {
  56. my_aggregator.initialize_handler(my_functor_t(this));
  57. }
  58. //! Constructs a new concurrent_priority_queue with init_sz capacity
  59. explicit concurrent_priority_queue(size_type init_capacity, const allocator_type& a = allocator_type()) :
  60. mark(0), my_size(0), data(a)
  61. {
  62. data.reserve(init_capacity);
  63. my_aggregator.initialize_handler(my_functor_t(this));
  64. }
  65. //! [begin,end) constructor
  66. template<typename InputIterator>
  67. concurrent_priority_queue(InputIterator begin, InputIterator end, const allocator_type& a = allocator_type()) :
  68. data(begin, end, a)
  69. {
  70. mark = 0;
  71. my_aggregator.initialize_handler(my_functor_t(this));
  72. heapify();
  73. my_size = data.size();
  74. }
  75. //! Copy constructor
  76. /** This operation is unsafe if there are pending concurrent operations on the src queue. */
  77. explicit concurrent_priority_queue(const concurrent_priority_queue& src) : mark(src.mark),
  78. my_size(src.my_size), data(src.data.begin(), src.data.end(), src.data.get_allocator())
  79. {
  80. my_aggregator.initialize_handler(my_functor_t(this));
  81. heapify();
  82. }
  83. //! Copy constructor with specific allocator
  84. /** This operation is unsafe if there are pending concurrent operations on the src queue. */
  85. concurrent_priority_queue(const concurrent_priority_queue& src, const allocator_type& a) : mark(src.mark),
  86. my_size(src.my_size), data(src.data.begin(), src.data.end(), a)
  87. {
  88. my_aggregator.initialize_handler(my_functor_t(this));
  89. heapify();
  90. }
  91. //! Assignment operator
  92. /** This operation is unsafe if there are pending concurrent operations on the src queue. */
  93. concurrent_priority_queue& operator=(const concurrent_priority_queue& src) {
  94. if (this != &src) {
  95. std::vector<value_type, allocator_type>(src.data.begin(), src.data.end(), src.data.get_allocator()).swap(data);
  96. mark = src.mark;
  97. my_size = src.my_size;
  98. }
  99. return *this;
  100. }
  101. //! Returns true if empty, false otherwise
  102. /** Returned value may not reflect results of pending operations.
  103. This operation reads shared data and will trigger a race condition. */
  104. bool empty() const { return size()==0; }
  105. //! Returns the current number of elements contained in the queue
  106. /** Returned value may not reflect results of pending operations.
  107. This operation reads shared data and will trigger a race condition. */
  108. size_type size() const { return __TBB_load_with_acquire(my_size); }
  109. //! Pushes elem onto the queue, increasing capacity of queue if necessary
  110. /** This operation can be safely used concurrently with other push, try_pop or reserve operations. */
  111. void push(const_reference elem) {
  112. cpq_operation op_data(elem, PUSH_OP);
  113. my_aggregator.execute(&op_data);
  114. if (op_data.status == FAILED) // exception thrown
  115. throw_exception(eid_bad_alloc);
  116. }
  117. //! Gets a reference to and removes highest priority element
  118. /** If a highest priority element was found, sets elem and returns true,
  119. otherwise returns false.
  120. This operation can be safely used concurrently with other push, try_pop or reserve operations. */
  121. bool try_pop(reference elem) {
  122. cpq_operation op_data(POP_OP);
  123. op_data.elem = &elem;
  124. my_aggregator.execute(&op_data);
  125. return op_data.status==SUCCEEDED;
  126. }
  127. //! Clear the queue; not thread-safe
  128. /** This operation is unsafe if there are pending concurrent operations on the queue.
  129. Resets size, effectively emptying queue; does not free space.
  130. May not clear elements added in pending operations. */
  131. void clear() {
  132. data.clear();
  133. mark = 0;
  134. my_size = 0;
  135. }
  136. //! Swap this queue with another; not thread-safe
  137. /** This operation is unsafe if there are pending concurrent operations on the queue. */
  138. void swap(concurrent_priority_queue& q) {
  139. data.swap(q.data);
  140. std::swap(mark, q.mark);
  141. std::swap(my_size, q.my_size);
  142. }
  143. //! Return allocator object
  144. allocator_type get_allocator() const { return data.get_allocator(); }
  145. private:
  146. enum operation_type {INVALID_OP, PUSH_OP, POP_OP};
  147. enum operation_status { WAIT=0, SUCCEEDED, FAILED };
  148. class cpq_operation : public aggregated_operation<cpq_operation> {
  149. public:
  150. operation_type type;
  151. union {
  152. value_type *elem;
  153. size_type sz;
  154. };
  155. cpq_operation(const_reference e, operation_type t) :
  156. type(t), elem(const_cast<value_type*>(&e)) {}
  157. cpq_operation(operation_type t) : type(t) {}
  158. };
  159. class my_functor_t {
  160. concurrent_priority_queue<T, Compare, A> *cpq;
  161. public:
  162. my_functor_t() {}
  163. my_functor_t(concurrent_priority_queue<T, Compare, A> *cpq_) : cpq(cpq_) {}
  164. void operator()(cpq_operation* op_list) {
  165. cpq->handle_operations(op_list);
  166. }
  167. };
  168. aggregator< my_functor_t, cpq_operation> my_aggregator;
  169. //! Padding added to avoid false sharing
  170. char padding1[NFS_MaxLineSize - sizeof(aggregator< my_functor_t, cpq_operation >)];
  171. //! The point at which unsorted elements begin
  172. size_type mark;
  173. __TBB_atomic size_type my_size;
  174. Compare compare;
  175. //! Padding added to avoid false sharing
  176. char padding2[NFS_MaxLineSize - (2*sizeof(size_type)) - sizeof(Compare)];
  177. //! Storage for the heap of elements in queue, plus unheapified elements
  178. /** data has the following structure:
  179. binary unheapified
  180. heap elements
  181. ____|_______|____
  182. | | |
  183. v v v
  184. [_|...|_|_|...|_| |...| ]
  185. 0 ^ ^ ^
  186. | | |__capacity
  187. | |__my_size
  188. |__mark
  189. Thus, data stores the binary heap starting at position 0 through
  190. mark-1 (it may be empty). Then there are 0 or more elements
  191. that have not yet been inserted into the heap, in positions
  192. mark through my_size-1. */
  193. std::vector<value_type, allocator_type> data;
  194. void handle_operations(cpq_operation *op_list) {
  195. cpq_operation *tmp, *pop_list=NULL;
  196. __TBB_ASSERT(mark == data.size(), NULL);
  197. // First pass processes all constant (amortized; reallocation may happen) time pushes and pops.
  198. while (op_list) {
  199. // ITT note: &(op_list->status) tag is used to cover accesses to op_list
  200. // node. This thread is going to handle the operation, and so will acquire it
  201. // and perform the associated operation w/o triggering a race condition; the
  202. // thread that created the operation is waiting on the status field, so when
  203. // this thread is done with the operation, it will perform a
  204. // store_with_release to give control back to the waiting thread in
  205. // aggregator::insert_operation.
  206. call_itt_notify(acquired, &(op_list->status));
  207. __TBB_ASSERT(op_list->type != INVALID_OP, NULL);
  208. tmp = op_list;
  209. op_list = itt_hide_load_word(op_list->next);
  210. if (tmp->type == PUSH_OP) {
  211. __TBB_TRY {
  212. data.push_back(*(tmp->elem));
  213. __TBB_store_with_release(my_size, my_size+1);
  214. itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
  215. } __TBB_CATCH(...) {
  216. itt_store_word_with_release(tmp->status, uintptr_t(FAILED));
  217. }
  218. }
  219. else { // tmp->type == POP_OP
  220. __TBB_ASSERT(tmp->type == POP_OP, NULL);
  221. if (mark < data.size() &&
  222. compare(data[0], data[data.size()-1])) {
  223. // there are newly pushed elems and the last one
  224. // is higher than top
  225. *(tmp->elem) = data[data.size()-1]; // copy the data
  226. __TBB_store_with_release(my_size, my_size-1);
  227. itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
  228. data.pop_back();
  229. __TBB_ASSERT(mark<=data.size(), NULL);
  230. }
  231. else { // no convenient item to pop; postpone
  232. itt_hide_store_word(tmp->next, pop_list);
  233. pop_list = tmp;
  234. }
  235. }
  236. }
  237. // second pass processes pop operations
  238. while (pop_list) {
  239. tmp = pop_list;
  240. pop_list = itt_hide_load_word(pop_list->next);
  241. __TBB_ASSERT(tmp->type == POP_OP, NULL);
  242. if (data.empty()) {
  243. itt_store_word_with_release(tmp->status, uintptr_t(FAILED));
  244. }
  245. else {
  246. __TBB_ASSERT(mark<=data.size(), NULL);
  247. if (mark < data.size() &&
  248. compare(data[0], data[data.size()-1])) {
  249. // there are newly pushed elems and the last one is
  250. // higher than top
  251. *(tmp->elem) = data[data.size()-1]; // copy the data
  252. __TBB_store_with_release(my_size, my_size-1);
  253. itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
  254. data.pop_back();
  255. }
  256. else { // extract top and push last element down heap
  257. *(tmp->elem) = data[0]; // copy the data
  258. __TBB_store_with_release(my_size, my_size-1);
  259. itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
  260. reheap();
  261. }
  262. }
  263. }
  264. // heapify any leftover pushed elements before doing the next
  265. // batch of operations
  266. if (mark<data.size()) heapify();
  267. __TBB_ASSERT(mark == data.size(), NULL);
  268. }
  269. //! Merge unsorted elements into heap
  270. void heapify() {
  271. if (!mark && data.size()>0) mark = 1;
  272. for (; mark<data.size(); ++mark) {
  273. // for each unheapified element under size
  274. size_type cur_pos = mark;
  275. value_type to_place = data[mark];
  276. do { // push to_place up the heap
  277. size_type parent = (cur_pos-1)>>1;
  278. if (!compare(data[parent], to_place)) break;
  279. data[cur_pos] = data[parent];
  280. cur_pos = parent;
  281. } while( cur_pos );
  282. data[cur_pos] = to_place;
  283. }
  284. }
  285. //! Re-heapify after an extraction
  286. /** Re-heapify by pushing last element down the heap from the root. */
  287. void reheap() {
  288. size_type cur_pos=0, child=1;
  289. while (child < mark) {
  290. size_type target = child;
  291. if (child+1 < mark && compare(data[child], data[child+1]))
  292. ++target;
  293. // target now has the higher priority child
  294. if (compare(data[target], data[data.size()-1])) break;
  295. data[cur_pos] = data[target];
  296. cur_pos = target;
  297. child = (cur_pos<<1)+1;
  298. }
  299. data[cur_pos] = data[data.size()-1];
  300. data.pop_back();
  301. if (mark > data.size()) mark = data.size();
  302. }
  303. };
  304. } // namespace interface5
  305. using interface5::concurrent_priority_queue;
  306. } // namespace tbb
  307. #endif /* __TBB_concurrent_priority_queue_H */