123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194 |
- #ifndef __TBB_parallel_while
- #define __TBB_parallel_while
- #include "task.h"
- #include <new>
- namespace tbb {
- template<typename Body>
- class parallel_while;
- namespace internal {
- template<typename Stream, typename Body> class while_task;
-
-
- template<typename Body>
- class while_iteration_task: public task {
- const Body& my_body;
- typename Body::argument_type my_value;
- task* execute() {
- my_body(my_value);
- return NULL;
- }
- while_iteration_task( const typename Body::argument_type& value, const Body& body ) :
- my_body(body), my_value(value)
- {}
- template<typename Body_> friend class while_group_task;
- friend class tbb::parallel_while<Body>;
- };
-
-
- template<typename Body>
- class while_group_task: public task {
- static const size_t max_arg_size = 4;
- const Body& my_body;
- size_t size;
- typename Body::argument_type my_arg[max_arg_size];
- while_group_task( const Body& body ) : my_body(body), size(0) {}
- task* execute() {
- typedef while_iteration_task<Body> iteration_type;
- __TBB_ASSERT( size>0, NULL );
- task_list list;
- task* t;
- size_t k=0;
- for(;;) {
- t = new( allocate_child() ) iteration_type(my_arg[k],my_body);
- if( ++k==size ) break;
- list.push_back(*t);
- }
- set_ref_count(int(k+1));
- spawn(list);
- spawn_and_wait_for_all(*t);
- return NULL;
- }
- template<typename Stream, typename Body_> friend class while_task;
- };
-
-
-
- template<typename Stream, typename Body>
- class while_task: public task {
- Stream& my_stream;
- const Body& my_body;
- empty_task& my_barrier;
- task* execute() {
- typedef while_group_task<Body> block_type;
- block_type& t = *new( allocate_additional_child_of(my_barrier) ) block_type(my_body);
- size_t k=0;
- while( my_stream.pop_if_present(t.my_arg[k]) ) {
- if( ++k==block_type::max_arg_size ) {
-
- recycle_to_reexecute();
- break;
- }
- }
- if( k==0 ) {
- destroy(t);
- return NULL;
- } else {
- t.size = k;
- return &t;
- }
- }
- while_task( Stream& stream, const Body& body, empty_task& barrier ) :
- my_stream(stream),
- my_body(body),
- my_barrier(barrier)
- {}
- friend class tbb::parallel_while<Body>;
- };
- }
- template<typename Body>
- class parallel_while: internal::no_copy {
- public:
-
- parallel_while() : my_body(NULL), my_barrier(NULL) {}
-
- ~parallel_while() {
- if( my_barrier ) {
- my_barrier->destroy(*my_barrier);
- my_barrier = NULL;
- }
- }
-
- typedef typename Body::argument_type value_type;
-
-
- template<typename Stream>
- void run( Stream& stream, const Body& body );
-
-
- void add( const value_type& item );
- private:
- const Body* my_body;
- empty_task* my_barrier;
- };
- template<typename Body>
- template<typename Stream>
- void parallel_while<Body>::run( Stream& stream, const Body& body ) {
- using namespace internal;
- empty_task& barrier = *new( task::allocate_root() ) empty_task();
- my_body = &body;
- my_barrier = &barrier;
- my_barrier->set_ref_count(2);
- while_task<Stream,Body>& w = *new( my_barrier->allocate_child() ) while_task<Stream,Body>( stream, body, barrier );
- my_barrier->spawn_and_wait_for_all(w);
- my_barrier->destroy(*my_barrier);
- my_barrier = NULL;
- my_body = NULL;
- }
- template<typename Body>
- void parallel_while<Body>::add( const value_type& item ) {
- __TBB_ASSERT(my_barrier,"attempt to add to parallel_while that is not running");
- typedef internal::while_iteration_task<Body> iteration_type;
- iteration_type& i = *new( task::allocate_additional_child_of(*my_barrier) ) iteration_type(item,*my_body);
- task::self().spawn( i );
- }
- }
- #endif
|