123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210 |
- /*
- Copyright 2005-2013 Intel Corporation. All Rights Reserved.
- This file is part of Threading Building Blocks.
- Threading Building Blocks is free software; you can redistribute it
- and/or modify it under the terms of the GNU General Public License
- version 2 as published by the Free Software Foundation.
- Threading Building Blocks is distributed in the hope that it will be
- useful, but WITHOUT ANY WARRANTY; without even the implied warranty
- of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
- You should have received a copy of the GNU General Public License
- along with Threading Building Blocks; if not, write to the Free Software
- Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
- As a special exception, you may use this file as part of a free software
- library without restriction. Specifically, if other files instantiate
- templates or use macros or inline functions from this file, or you compile
- this file and link it with other files to produce an executable, this
- file does not by itself cause the resulting executable to be covered by
- the GNU General Public License. This exception does not however
- invalidate any other reasons why the executable file might be covered by
- the GNU General Public License.
- */
- #ifndef __TBB__aggregator_H
- #define __TBB__aggregator_H
- #if !TBB_PREVIEW_AGGREGATOR
- #error Set TBB_PREVIEW_AGGREGATOR before including aggregator.h
- #endif
- #include "atomic.h"
- #include "tbb_profiling.h"
- namespace tbb {
- namespace interface6 {
- using namespace tbb::internal;
- class aggregator_operation {
- template<typename handler_type> friend class aggregator_ext;
- uintptr_t status;
- aggregator_operation* my_next;
- public:
- enum aggregator_operation_status { agg_waiting=0, agg_finished };
- aggregator_operation() : status(agg_waiting), my_next(NULL) {}
- /// Call start before handling this operation
- void start() { call_itt_notify(acquired, &status); }
- /// Call finish when done handling this operation
- /** The operation will be released to its originating thread, and possibly deleted. */
- void finish() { itt_store_word_with_release(status, uintptr_t(agg_finished)); }
- aggregator_operation* next() { return itt_hide_load_word(my_next);}
- void set_next(aggregator_operation* n) { itt_hide_store_word(my_next, n); }
- };
- namespace internal {
- class basic_operation_base : public aggregator_operation {
- friend class basic_handler;
- virtual void apply_body() = 0;
- public:
- basic_operation_base() : aggregator_operation() {}
- virtual ~basic_operation_base() {}
- };
- template<typename Body>
- class basic_operation : public basic_operation_base, no_assign {
- const Body& my_body;
- /*override*/ void apply_body() { my_body(); }
- public:
- basic_operation(const Body& b) : basic_operation_base(), my_body(b) {}
- };
- class basic_handler {
- public:
- basic_handler() {}
- void operator()(aggregator_operation* op_list) const {
- while (op_list) {
- // ITT note: &(op_list->status) tag is used to cover accesses to the operation data.
- // The executing thread "acquires" the tag (see start()) and then performs
- // the associated operation w/o triggering a race condition diagnostics.
- // A thread that created the operation is waiting for its status (see execute_impl()),
- // so when this thread is done with the operation, it will "release" the tag
- // and update the status (see finish()) to give control back to the waiting thread.
- basic_operation_base& request = static_cast<basic_operation_base&>(*op_list);
- // IMPORTANT: need to advance op_list to op_list->next() before calling request.finish()
- op_list = op_list->next();
- request.start();
- request.apply_body();
- request.finish();
- }
- }
- };
- } // namespace internal
- //! Aggregator base class and expert interface
- /** An aggregator for collecting operations coming from multiple sources and executing
- them serially on a single thread. */
- template <typename handler_type>
- class aggregator_ext : tbb::internal::no_copy {
- public:
- aggregator_ext(const handler_type& h) : handler_busy(0), handle_operations(h) { mailbox = NULL; }
- //! EXPERT INTERFACE: Enter a user-made operation into the aggregator's mailbox.
- /** Details of user-made operations must be handled by user-provided handler */
- void process(aggregator_operation *op) { execute_impl(*op); }
- protected:
- /** Place operation in mailbox, then either handle mailbox or wait for the operation
- to be completed by a different thread. */
- void execute_impl(aggregator_operation& op) {
- aggregator_operation* res;
- // ITT note: &(op.status) tag is used to cover accesses to this operation. This
- // thread has created the operation, and now releases it so that the handler
- // thread may handle the associated operation w/o triggering a race condition;
- // thus this tag will be acquired just before the operation is handled in the
- // handle_operations functor.
- call_itt_notify(releasing, &(op.status));
- // insert the operation in the queue
- do {
- // ITT may flag the following line as a race; it is a false positive:
- // This is an atomic read; we don't provide itt_hide_load_word for atomics
- op.my_next = res = mailbox; // NOT A RACE
- } while (mailbox.compare_and_swap(&op, res) != res);
- if (!res) { // first in the list; handle the operations
- // ITT note: &mailbox tag covers access to the handler_busy flag, which this
- // waiting handler thread will try to set before entering handle_operations.
- call_itt_notify(acquired, &mailbox);
- start_handle_operations();
- __TBB_ASSERT(op.status, NULL);
- }
- else { // not first; wait for op to be ready
- call_itt_notify(prepare, &(op.status));
- spin_wait_while_eq(op.status, uintptr_t(aggregator_operation::agg_waiting));
- itt_load_word_with_acquire(op.status);
- }
- }
- private:
- //! An atomically updated list (aka mailbox) of aggregator_operations
- atomic<aggregator_operation *> mailbox;
- //! Controls thread access to handle_operations
- /** Behaves as boolean flag where 0=false, 1=true */
- uintptr_t handler_busy;
- handler_type handle_operations;
- //! Trigger the handling of operations when the handler is free
- void start_handle_operations() {
- aggregator_operation *pending_operations;
- // ITT note: &handler_busy tag covers access to mailbox as it is passed
- // between active and waiting handlers. Below, the waiting handler waits until
- // the active handler releases, and the waiting handler acquires &handler_busy as
- // it becomes the active_handler. The release point is at the end of this
- // function, when all operations in mailbox have been handled by the
- // owner of this aggregator.
- call_itt_notify(prepare, &handler_busy);
- // get handler_busy: only one thread can possibly spin here at a time
- spin_wait_until_eq(handler_busy, uintptr_t(0));
- call_itt_notify(acquired, &handler_busy);
- // acquire fence not necessary here due to causality rule and surrounding atomics
- __TBB_store_with_release(handler_busy, uintptr_t(1));
- // ITT note: &mailbox tag covers access to the handler_busy flag itself.
- // Capturing the state of the mailbox signifies that handler_busy has been
- // set and a new active handler will now process that list's operations.
- call_itt_notify(releasing, &mailbox);
- // grab pending_operations
- pending_operations = mailbox.fetch_and_store(NULL);
- // handle all the operations
- handle_operations(pending_operations);
- // release the handler
- itt_store_word_with_release(handler_busy, uintptr_t(0));
- }
- };
- //! Basic aggregator interface
- class aggregator : private aggregator_ext<internal::basic_handler> {
- public:
- aggregator() : aggregator_ext<internal::basic_handler>(internal::basic_handler()) {}
- //! BASIC INTERFACE: Enter a function for exclusvie execution by the aggregator.
- /** The calling thread stores the function object in a basic_operation and
- places the operation in the aggregator's mailbox */
- template<typename Body>
- void execute(const Body& b) {
- internal::basic_operation<Body> op(b);
- this->execute_impl(op);
- }
- };
- } // namespace interface6
- using interface6::aggregator;
- using interface6::aggregator_ext;
- using interface6::aggregator_operation;
- } // namespace tbb
- #endif // __TBB__aggregator_H
|