Intel(R) Threading Building Blocks Doxygen Documentation  version 4.2.3
flow_graph.h
Go to the documentation of this file.
1 /*
2  Copyright (c) 2005-2019 Intel Corporation
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15 */
16 
17 #ifndef __TBB_flow_graph_H
18 #define __TBB_flow_graph_H
19 
20 #define __TBB_flow_graph_H_include_area
22 
23 #include "tbb_stddef.h"
24 #include "atomic.h"
25 #include "spin_mutex.h"
26 #include "null_mutex.h"
27 #include "spin_rw_mutex.h"
28 #include "null_rw_mutex.h"
29 #include "task.h"
31 #include "tbb_exception.h"
34 #include "tbb_profiling.h"
35 #include "task_arena.h"
36 
37 #if TBB_USE_THREADING_TOOLS && TBB_PREVIEW_FLOW_GRAPH_TRACE && ( __linux__ || __APPLE__ )
38  #if __INTEL_COMPILER
39  // Disabled warning "routine is both inline and noinline"
40  #pragma warning (push)
41  #pragma warning( disable: 2196 )
42  #endif
43  #define __TBB_NOINLINE_SYM __attribute__((noinline))
44 #else
45  #define __TBB_NOINLINE_SYM
46 #endif
47 
48 #if __TBB_PREVIEW_ASYNC_MSG
49 #include <vector> // std::vector in internal::async_storage
50 #include <memory> // std::shared_ptr in async_msg
51 #endif
52 
53 #if __TBB_PREVIEW_STREAMING_NODE
54 // For streaming_node
55 #include <array> // std::array
56 #include <unordered_map> // std::unordered_map
57 #include <type_traits> // std::decay, std::true_type, std::false_type
58 #endif // __TBB_PREVIEW_STREAMING_NODE
59 
60 #if TBB_DEPRECATED_FLOW_ENQUEUE
61 #define FLOW_SPAWN(a) tbb::task::enqueue((a))
62 #else
63 #define FLOW_SPAWN(a) tbb::task::spawn((a))
64 #endif
65 
66 // use the VC10 or gcc version of tuple if it is available.
67 #if __TBB_CPP11_TUPLE_PRESENT
68  #include <tuple>
69 namespace tbb {
70  namespace flow {
71  using std::tuple;
72  using std::tuple_size;
73  using std::tuple_element;
74  using std::get;
75  }
76 }
77 #else
78  #include "compat/tuple"
79 #endif
80 
81 #include<list>
82 #include<queue>
83 
94 namespace tbb {
95 namespace flow {
96 
98 enum concurrency { unlimited = 0, serial = 1 };
99 
100 namespace interface11 {
103 struct null_type {};
104 
106 class continue_msg {};
107 
109 template< typename T > class sender;
110 template< typename T > class receiver;
113 template< typename T, typename U > class limiter_node; // needed for resetting decrementer
115 template< typename R, typename B > class run_and_put_task;
117 namespace internal {
118 
119 template<typename T, typename M> class successor_cache;
120 template<typename T, typename M> class broadcast_cache;
121 template<typename T, typename M> class round_robin_cache;
122 template<typename T, typename M> class predecessor_cache;
123 template<typename T, typename M> class reservable_predecessor_cache;
125 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
126 namespace order {
127 struct following;
128 struct preceding;
129 }
130 template<typename Order, typename... Args> struct node_set;
131 #endif
133 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
134 // Holder of edges both for caches and for those nodes which do not have predecessor caches.
135 // C == receiver< ... > or sender< ... >, depending.
// The container keeps raw C* pointers in a std::list that uses tbb::tbb_allocator.
// None of the visible methods deletes the pointed-to objects, and no locking is
// visible in this listing.
136 template<typename C>
137 class edge_container {
139 public:
140  typedef std::list<C *, tbb::tbb_allocator<C *> > edge_list_type;
141 
  // Records the address of s at the back of the list. Duplicates are not checked.
142  void add_edge(C &s) {
143  built_edges.push_back(&s);
144  }
  // Erases the first list entry equal to &s; does nothing when s is not present.
  // Returning right after erase() also avoids touching the invalidated iterator.
146  void delete_edge(C &s) {
147  for (typename edge_list_type::iterator i = built_edges.begin(); i != built_edges.end(); ++i) {
148  if (*i == &s) {
149  (void)built_edges.erase(i);
150  return; // only remove one predecessor per request
151  }
152  }
153  }
  // Overwrites v with a copy of the current edge list.
155  void copy_edges(edge_list_type &v) {
156  v = built_edges;
157  }
158 
  // Number of recorded edges.
159  size_t edge_count() {
160  return (size_t)(built_edges.size());
161  }
  // Forgets every recorded edge.
163  void clear() {
164  built_edges.clear();
165  }
166 
167  // methods remove the statement from all predecessors/successors lists in the edge
168  // container.
  // Only declared here; the definitions are not part of this listing.
169  template< typename S > void sender_extract(S &s);
170  template< typename R > void receiver_extract(R &r);
172 private:
173  edge_list_type built_edges;
174 }; // class edge_container
175 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
177 } // namespace internal
179 } // namespace interfaceX
180 } // namespace flow
181 } // namespace tbb
186 namespace tbb {
187 namespace flow {
188 namespace interface11 {
190 // enqueue left task if necessary. Returns the non-enqueued task if there is one.
// Result by case, as visible here:
//   right == NULL                        -> left
//   left == NULL or SUCCESSFULLY_ENQUEUED -> right
//   both are real tasks                  -> right
//   left real, right SUCCESSFULLY_ENQUEUED -> left
191 static inline tbb::task *combine_tasks(graph& g, tbb::task * left, tbb::task * right) {
192  // if no RHS task, don't change left.
193  if (right == NULL) return left;
194  // right != NULL
195  if (left == NULL) return right;
196  if (left == SUCCESSFULLY_ENQUEUED) return right;
197  // left contains a task
198  if (right != SUCCESSFULLY_ENQUEUED) {
199  // both are valid tasks
 // NOTE(review): listing line 200 is absent here. As shown, `g` is never used and
 // `left` is neither enqueued nor returned in this branch, which contradicts the
 // header comment. Presumably the missing line hands `left` to the graph; confirm
 // against the real header before relying on this text.
201  return right;
202  }
203  return left;
204 }
206 #if __TBB_PREVIEW_ASYNC_MSG
208 template < typename T > class async_msg;
210 namespace internal {
212 template < typename T > class async_storage;
214 template< typename T, typename = void >
217  typedef T filtered_type;
218 
219  static const bool is_async_type = false;
221  static const void* to_void_ptr(const T& t) {
222  return static_cast<const void*>(&t);
223  }
224 
225  static void* to_void_ptr(T& t) {
226  return static_cast<void*>(&t);
227  }
229  static const T& from_void_ptr(const void* p) {
230  return *static_cast<const T*>(p);
231  }
233  static T& from_void_ptr(void* p) {
234  return *static_cast<T*>(p);
235  }
236 
237  static task* try_put_task_wrapper_impl(receiver<T>* const this_recv, const void *p, bool is_async) {
238  if (is_async) {
239  // This (T) is NOT async and incoming 'A<X> t' IS async
240  // Get data from async_msg
242  task* const new_task = msg.my_storage->subscribe(*this_recv, this_recv->graph_reference());
243  // finalize() must be called after subscribe() because set() can be called in finalize()
244  // and 'this_recv' client must be subscribed by this moment
245  msg.finalize();
246  return new_task;
247  }
248  else {
249  // Incoming 't' is NOT async
250  return this_recv->try_put_task(from_void_ptr(p));
251  }
252  }
253 };
// Partial specialization chosen when T derives from
// async_msg<typename T::async_msg_data_type> (see the enable_if condition).
// The void* conversions always pass through the async_msg<filtered_type> base,
// so both directions agree on which subobject address is carried.
255 template< typename T >
256 struct async_helpers< T, typename std::enable_if< std::is_base_of<async_msg<typename T::async_msg_data_type>, T>::value >::type > {
257  typedef T async_type;
258  typedef typename T::async_msg_data_type filtered_type;
260  static const bool is_async_type = true;
262  // Receiver-classes use const interfaces
263  static const void* to_void_ptr(const T& t) {
264  return static_cast<const void*>(&static_cast<const async_msg<filtered_type>&>(t));
265  }
266 
267  static void* to_void_ptr(T& t) {
268  return static_cast<void*>(&static_cast<async_msg<filtered_type>&>(t));
269  }
270 
271  // Sender-classes use non-const interfaces
272  static const T& from_void_ptr(const void* p) {
273  return *static_cast<const T*>(static_cast<const async_msg<filtered_type>*>(p));
274  }
275 
276  static T& from_void_ptr(void* p) {
277  return *static_cast<T*>(static_cast<async_msg<filtered_type>*>(p));
278  }
280  // Used in receiver<T> class
  // Forwards the type-erased message p to this_recv->try_put_task. When is_async is
  // true, p is read back as a T; otherwise a T is built from the plain value first.
281  static task* try_put_task_wrapper_impl(receiver<T>* const this_recv, const void *p, bool is_async) {
282  if (is_async) {
283  // Both are async
284  return this_recv->try_put_task(from_void_ptr(p));
285  }
286  else {
287  // This (T) is async and incoming 'X t' is NOT async
288  // Create async_msg for X
  // NOTE(review): listing line 289, which must declare `t`, is absent here.
  // Presumably it reads the filtered_type value out of p; confirm.
290  const T msg(t);
291  return this_recv->try_put_task(msg);
292  }
293  }
294 };
297 
299  template< typename, typename > friend class internal::predecessor_cache;
300  template< typename, typename > friend class internal::reservable_predecessor_cache;
301 public:
304 
305  virtual ~untyped_sender() {}
307  // NOTE: Following part of PUBLIC section is copy-paste from original sender<T> class
308 
309  // TODO: Prevent untyped successor registration
310 
312  virtual bool register_successor( successor_type &r ) = 0;
315  virtual bool remove_successor( successor_type &r ) = 0;
318  virtual bool try_release( ) { return false; }
321  virtual bool try_consume( ) { return false; }
323 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
324  typedef internal::edge_container<successor_type> built_successors_type;
326  typedef built_successors_type::edge_list_type successor_list_type;
327  virtual built_successors_type &built_successors() = 0;
328  virtual void internal_add_built_successor( successor_type & ) = 0;
329  virtual void internal_delete_built_successor( successor_type & ) = 0;
330  virtual void copy_successors( successor_list_type &) = 0;
331  virtual size_t successor_count() = 0;
332 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
333 protected:
335  template< typename X >
336  bool try_get( X &t ) {
338  }
341  template< typename X >
342  bool try_reserve( X &t ) {
344  }
346  virtual bool try_get_wrapper( void* p, bool is_async ) = 0;
347  virtual bool try_reserve_wrapper( void* p, bool is_async ) = 0;
348 };
351  template< typename, typename > friend class run_and_put_task;
353  template< typename, typename > friend class internal::broadcast_cache;
354  template< typename, typename > friend class internal::round_robin_cache;
355  template< typename, typename > friend class internal::successor_cache;
357 #if __TBB_PREVIEW_OPENCL_NODE
358  template< typename, typename > friend class proxy_dependency_receiver;
359 #endif /* __TBB_PREVIEW_OPENCL_NODE */
360 public:
365  virtual ~untyped_receiver() {}
368  template<typename X>
369  bool try_put(const X& t) {
370  task *res = try_put_task(t);
371  if (!res) return false;
373  return true;
374  }
376  // NOTE: Following part of PUBLIC section is copy-paste from original receiver<T> class
377 
378  // TODO: Prevent untyped predecessor registration
381  virtual bool register_predecessor( predecessor_type & ) { return false; }
384  virtual bool remove_predecessor( predecessor_type & ) { return false; }
386 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
387  typedef internal::edge_container<predecessor_type> built_predecessors_type;
388  typedef built_predecessors_type::edge_list_type predecessor_list_type;
389  virtual built_predecessors_type &built_predecessors() = 0;
390  virtual void internal_add_built_predecessor( predecessor_type & ) = 0;
391  virtual void internal_delete_built_predecessor( predecessor_type & ) = 0;
392  virtual void copy_predecessors( predecessor_list_type & ) = 0;
393  virtual size_t predecessor_count() = 0;
394 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
395 protected:
396  template<typename X>
397  task *try_put_task(const X& t) {
399  }
401  virtual task* try_put_task_wrapper( const void* p, bool is_async ) = 0;
403  virtual graph& graph_reference() const = 0;
405  // NOTE: Following part of PROTECTED and PRIVATE sections is copy-paste from original receiver<T> class
410  virtual bool is_continue_receiver() { return false; }
411 };
413 } // namespace internal
416 template< typename T >
418 public:
425  virtual bool try_get( T & ) { return false; }
428  virtual bool try_reserve( T & ) { return false; }
430 protected:
431  virtual bool try_get_wrapper( void* p, bool is_async ) __TBB_override {
432  // Both async OR both are NOT async
435  }
436  // Else: this (T) is async OR incoming 't' is async
437  __TBB_ASSERT(false, "async_msg interface does not support 'pull' protocol in try_get()");
438  return false;
439  }
441  virtual bool try_reserve_wrapper( void* p, bool is_async ) __TBB_override {
442  // Both async OR both are NOT async
445  }
446  // Else: this (T) is async OR incoming 't' is async
447  __TBB_ASSERT(false, "async_msg interface does not support 'pull' protocol in try_reserve()");
448  return false;
449  }
450 }; // class sender<T>
453 template< typename T >
455  template< typename > friend class internal::async_storage;
456  template< typename, typename > friend struct internal::async_helpers;
457 public:
466  }
470  }
472 protected:
473  virtual task* try_put_task_wrapper( const void *p, bool is_async ) __TBB_override {
475  }
478  virtual task *try_put_task(const T& t) = 0;
480 }; // class receiver<T>
482 #else // __TBB_PREVIEW_ASYNC_MSG
483 
// Abstract sender interface used when __TBB_PREVIEW_ASYNC_MSG is off (this is the
// #else branch of that conditional). register_successor is pure virtual; the four
// try_* members default to "not supported" by returning false.
// NOTE(review): successor_type is used below but its typedef is not in this listing
// (lines 490-493 are absent), and other members may be missing as well.
485 template< typename T >
486 class sender {
487 public:
489  __TBB_DEPRECATED typedef T output_type;
494  virtual ~sender() {}
496  // NOTE: Following part of PUBLIC section is partly copy-pasted in sender<T> under #if __TBB_PREVIEW_ASYNC_MSG
497 
499  __TBB_DEPRECATED virtual bool register_successor( successor_type &r ) = 0;
500 
  // Default: no item is handed out.
505  virtual bool try_get( T & ) { return false; }
506 
  // Default: reservation is refused.
508  virtual bool try_reserve( T & ) { return false; }
  // Default: nothing to release.
511  virtual bool try_release( ) { return false; }
  // Default: nothing to consume.
514  virtual bool try_consume( ) { return false; }
516 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
  // Edge bookkeeping hooks for the deprecated extraction feature; all pure virtual.
517  __TBB_DEPRECATED typedef typename internal::edge_container<successor_type> built_successors_type;
519  __TBB_DEPRECATED typedef typename built_successors_type::edge_list_type successor_list_type;
520  __TBB_DEPRECATED virtual built_successors_type &built_successors() = 0;
521  __TBB_DEPRECATED virtual void internal_add_built_successor( successor_type & ) = 0;
522  __TBB_DEPRECATED virtual void internal_delete_built_successor( successor_type & ) = 0;
523  __TBB_DEPRECATED virtual void copy_successors( successor_list_type &) = 0;
524  __TBB_DEPRECATED virtual size_t successor_count() = 0;
525 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
526 }; // class sender<T>
527 
// Abstract receiver interface used when __TBB_PREVIEW_ASYNC_MSG is off.
// Implementations supply try_put_task() and graph_reference(); the public entry
// point try_put() is a non-virtual wrapper around try_put_task().
// NOTE(review): predecessor_type is used below but its typedef is not in this listing.
529 template< typename T >
530 class receiver {
531 public:
534 
539  virtual ~receiver() {}
  // Offers t to this receiver. Returns false when try_put_task() yields a null task
  // pointer and true otherwise.
  // NOTE(review): listing line 545 is absent; whatever is done with a non-null `res`
  // before returning true is not visible here.
542  bool try_put( const T& t ) {
543  task *res = try_put_task(t);
544  if (!res) return false;
546  return true;
547  }
550 protected:
  // Friends that call try_put_task() directly.
551  template< typename R, typename B > friend class run_and_put_task;
552  template< typename X, typename Y > friend class internal::broadcast_cache;
553  template< typename X, typename Y > friend class internal::round_robin_cache;
554  virtual task *try_put_task(const T& t) = 0;
555  virtual graph& graph_reference() const = 0;
556 public:
557  // NOTE: Following part of PUBLIC and PROTECTED sections is copy-pasted in receiver<T> under #if __TBB_PREVIEW_ASYNC_MSG
558 
  // Default: predecessor registration is not supported.
560  __TBB_DEPRECATED virtual bool register_predecessor( predecessor_type & ) { return false; }
  // Default: predecessor removal is not supported.
563  __TBB_DEPRECATED virtual bool remove_predecessor( predecessor_type & ) { return false; }
564 
565 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
  // Edge bookkeeping hooks for the deprecated extraction feature; all pure virtual.
566  __TBB_DEPRECATED typedef typename internal::edge_container<predecessor_type> built_predecessors_type;
567  __TBB_DEPRECATED typedef typename built_predecessors_type::edge_list_type predecessor_list_type;
568  __TBB_DEPRECATED virtual built_predecessors_type &built_predecessors() = 0;
569  __TBB_DEPRECATED virtual void internal_add_built_predecessor( predecessor_type & ) = 0;
570  __TBB_DEPRECATED virtual void internal_delete_built_predecessor( predecessor_type & ) = 0;
571  __TBB_DEPRECATED virtual void copy_predecessors( predecessor_list_type & ) = 0;
572  __TBB_DEPRECATED virtual size_t predecessor_count() = 0;
573 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
574 
575 protected:
579  template<typename TT, typename M> friend class internal::successor_cache;
  // Overridden by continue_receiver to return true.
580  virtual bool is_continue_receiver() { return false; }
582 #if __TBB_PREVIEW_OPENCL_NODE
583  template< typename, typename > friend class proxy_dependency_receiver;
584 #endif /* __TBB_PREVIEW_OPENCL_NODE */
585 }; // class receiver<T>
587 #endif // __TBB_PREVIEW_ASYNC_MSG
588 
// receiver<continue_msg> specialization point whose derived classes provide execute().
// NOTE(review): this listing omits many lines of the class (e.g. 593-598, 600-601,
// 608-610, 615-618, 622-628, 664, 666-668, 670, 681-685, 689-691), including several
// member signatures. The comments below describe only the statements that are visible.
591 class continue_receiver : public receiver< continue_msg > {
592 public:
599 
  // Tail of a constructor (its first line is absent): both predecessor counters start
  // at number_of_predecessors and the current count at zero. The priority is stored
  // through __TBB_FLOW_GRAPH_PRIORITY_EXPR, whose expansion is not visible here.
602  __TBB_FLOW_GRAPH_PRIORITY_ARG1(int number_of_predecessors, node_priority_t priority)) {
603  my_predecessor_count = my_initial_predecessor_count = number_of_predecessors;
604  my_current_count = 0;
605  __TBB_FLOW_GRAPH_PRIORITY_EXPR( my_priority = priority; )
606  }
607 
  // Tail of what appears to be the copy constructor (it reads from `src`); signature absent.
611  my_current_count = 0;
612  __TBB_FLOW_GRAPH_PRIORITY_EXPR( my_priority = src.my_priority; )
613  }
614 
  // Two member bodies follow whose signatures and statements are absent; only their
  // `return true;` lines survive in the listing.
619  return true;
620  }
621 
629  return true;
630  }
632 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
  // Deprecated extraction support: each override forwards to my_built_predecessors.
633  __TBB_DEPRECATED typedef internal::edge_container<predecessor_type> built_predecessors_type;
634  __TBB_DEPRECATED typedef built_predecessors_type::edge_list_type predecessor_list_type;
635  built_predecessors_type &built_predecessors() __TBB_override { return my_built_predecessors; }
637  __TBB_DEPRECATED void internal_add_built_predecessor( predecessor_type &s) __TBB_override {
639  my_built_predecessors.add_edge( s );
640  }
642  __TBB_DEPRECATED void internal_delete_built_predecessor( predecessor_type &s) __TBB_override {
644  my_built_predecessors.delete_edge(s);
645  }
647  __TBB_DEPRECATED void copy_predecessors( predecessor_list_type &v) __TBB_override {
649  my_built_predecessors.copy_edges(v);
650  }
652  __TBB_DEPRECATED size_t predecessor_count() __TBB_override {
654  return my_built_predecessors.edge_count();
655  }
656 
657 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
659 protected:
660  template< typename R, typename B > friend class run_and_put_task;
661  template<typename X, typename Y> friend class internal::broadcast_cache;
662  template<typename X, typename Y> friend class internal::round_robin_cache;
663  // execute body is supposed to be too small to create a task for.
  // Body of a member whose signature (listing line 664) and counting logic are absent.
  // The visible tail calls execute() and returns its task, substituting
  // SUCCESSFULLY_ENQUEUED when execute() returns NULL.
665  {
669  else
671  }
672  task * res = execute();
673  return res? res : SUCCESSFULLY_ENQUEUED;
674  }
675 
676 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
677  // continue_receiver must contain its own built_predecessors because it does
678  // not have a node_cache.
679  built_predecessors_type my_built_predecessors;
680 #endif
686  // the friend declaration in the base class did not eliminate the "protected class"
687  // error in gcc 4.1.2
688  template<typename U, typename V> friend class tbb::flow::interface11::limiter_node;
  // Tail of a reset-style member taking flags `f` (signature absent): when
  // rf_clear_edges is set, the recorded predecessors are dropped.
692  if (f & rf_clear_edges) {
693 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
694  my_built_predecessors.clear();
695 #endif
697  }
698  }
  // Supplied by derived classes; its result is what the put path above returns.
703  virtual task * execute() = 0;
704  template<typename TT, typename M> friend class internal::successor_cache;
705  bool is_continue_receiver() __TBB_override { return true; }
707 }; // class continue_receiver
709 } // interfaceX
711 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
// Default key extractor: returns t.key() converted to K.
// K appears only in the return type, so callers must name it explicitly
// (key_from_message<K>(msg)); T is deduced from the argument.
712  template <typename K, typename T>
713  K key_from_message( const T &t ) {
714  return t.key();
715  }
716 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
717 
721 } // flow
722 } // tbb
726 
727 namespace tbb {
728 namespace flow {
729 namespace interface11 {
734 #if __TBB_PREVIEW_ASYNC_MSG
736 #endif
738 
// Builds an iterator over the nodes of graph g.
// begin == true  : starts at g->my_nodes (which may itself be NULL).
// begin == false : current_node stays NULL, which represents the end iterator.
739 template <typename C, typename N>
740 graph_iterator<C,N>::graph_iterator(C *g, bool begin) : my_graph(g), current_node(NULL)
741 {
742  if (begin) current_node = my_graph->my_nodes;
743  //else it is an end iterator by default
744 }
745 
746 template <typename C, typename N>
748  __TBB_ASSERT(current_node, "graph_iterator at end");
749  return *operator->();
750 }
751 
752 template <typename C, typename N>
754  return current_node;
755 }
757 template <typename C, typename N>
759  if (current_node) current_node = current_node->next;
760 }
762 } // namespace interfaceX
763 
764 namespace interface10 {
// Default constructor: the graph allocates its own task_group_context
// (own_context = true, released in ~graph) and starts active with an empty node list.
// NOTE(review): listing lines 767 and 772-774 are absent. The creation of
// my_root_task, which ~graph destroys, is presumably among them; confirm.
766 inline graph::graph() : my_nodes(NULL), my_nodes_last(NULL), my_task_arena(NULL) {
768  own_context = true;
769  cancelled = false;
770  caught_exception = false;
771  my_context = new task_group_context(tbb::internal::FLOW_TASKS);
775  my_is_active = true;
776 }
// Constructs a graph that borrows the caller's task_group_context:
// own_context = false, so ~graph does not delete it.
// NOTE(review): listing lines 780 and 783-786 are absent; unlike the default
// constructor, no caught_exception initialization is visible here. Confirm against
// the real header.
778 inline graph::graph(task_group_context& use_this_context) :
779  my_context(&use_this_context), my_nodes(NULL), my_nodes_last(NULL), my_task_arena(NULL) {
781  own_context = false;
782  cancelled = false;
787  my_is_active = true;
788 }
789 
// Destructor: calls wait_for_all() first, then destroys the root task, deletes the
// context only if this graph allocated it, and deletes the task arena object.
// NOTE(review): listing line 792 is absent.
790 inline graph::~graph() {
791  wait_for_all();
793  tbb::task::destroy(*my_root_task);
794  if (own_context) delete my_context;
795  delete my_task_arena;
796 }
// Does nothing when there is no root task.
// NOTE(review): the statements inside the `if` (listing lines 800-801) are absent
// from this listing, so what is done with my_root_task cannot be seen here.
798 inline void graph::reserve_wait() {
799  if (my_root_task) {
802  }
803 }
804 
// Does nothing when there is no root task.
// NOTE(review): the statements inside the `if` (listing lines 807-808) are absent
// from this listing, so what is done with my_root_task cannot be seen here.
805 inline void graph::release_wait() {
806  if (my_root_task) {
809  }
810 }
811 
813  n->next = NULL;
814  {
816  n->prev = my_nodes_last;
818  my_nodes_last = n;
819  if (!my_nodes) my_nodes = n;
820  }
821 }
822 
824  {
826  __TBB_ASSERT(my_nodes && my_nodes_last, "graph::remove_node: Error: no registered nodes");
827  if (n->prev) n->prev->next = n->next;
828  if (n->next) n->next->prev = n->prev;
829  if (my_nodes_last == n) my_nodes_last = n->prev;
830  if (my_nodes == n) my_nodes = n->next;
831  }
832  n->prev = n->next = NULL;
833 }
834 
836  // reset context
838 
839  if(my_context) my_context->reset();
840  cancelled = false;
841  caught_exception = false;
842  // reset all the nodes comprising the graph
843  for(iterator ii = begin(); ii != end(); ++ii) {
844  tbb::flow::interface11::graph_node *my_p = &(*ii);
845  my_p->reset_node(f);
846  }
847  // Reattach the arena. Might be useful to run the graph in a particular task_arena
848  // while not limiting graph lifetime to a single task_arena::execute() call.
849  prepare_task_arena( /*reinit=*/true );
851  // now spawn the tasks necessary to start the graph
852  for(task_list_type::iterator rti = my_reset_task_list.begin(); rti != my_reset_task_list.end(); ++rti) {
854  }
855  my_reset_task_list.clear();
856 }
// Node-iteration accessors. Each one constructs an iterator from `this` and a flag:
// true for the begin position, false for the end position.
// NOTE(review): iterator/const_iterator are presumably typedefs of graph_iterator,
// whose constructor takes (graph pointer, bool begin); the typedefs are not in this listing.
858 inline graph::iterator graph::begin() { return iterator(this, true); }
860 inline graph::iterator graph::end() { return iterator(this, false); }
861 
862 inline graph::const_iterator graph::begin() const { return const_iterator(this, true); }
863 
864 inline graph::const_iterator graph::end() const { return const_iterator(this, false); }
865 
// cbegin()/cend() build the same iterators as the const begin()/end() overloads.
866 inline graph::const_iterator graph::cbegin() const { return const_iterator(this, true); }
868 inline graph::const_iterator graph::cend() const { return const_iterator(this, false); }
869 
870 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
871 inline void graph::set_name(const char *name) {
873 }
874 #endif
876 } // namespace interface10
878 namespace interface11 {
// Binds the node to graph g and registers it with that graph's node list.
// The destructor body visible further down undoes this with my_graph.remove_node(this).
880 inline graph_node::graph_node(graph& g) : my_graph(g) {
881  my_graph.register_node(this);
882 }
883 
885  my_graph.remove_node(this);
886 }
889 
890 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
891 using internal::node_set;
892 #endif
893 
// Graph node with no input that produces Output items by invoking a user body.
// It caches at most one produced item (my_cached_item / my_has_cached_item) and
// supports a reserve -> consume/release protocol guarded by my_reserved.
// NOTE(review): this listing omits many lines of the class, including most member
// signatures, the data-member declarations and any locking statements. The comments
// below describe only the statements that are visible.
895 template < typename Output >
896 class source_node : public graph_node, public sender< Output > {
897 public:
899  typedef Output output_type;
900 
903 
904  //Source node has no input type
906 
907 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
908  typedef typename sender<output_type>::built_successors_type built_successors_type;
909  typedef typename sender<output_type>::successor_list_type successor_list_type;
910 #endif
  // Constructs the node in graph g. `body` is wrapped in a heap-allocated
  // source_body_leaf kept as my_init_body; is_active seeds both my_active and
  // init_my_active. Part of the initializer list (lines 916, 918) is absent here.
913  template< typename Body >
914  __TBB_NOINLINE_SYM source_node( graph &g, Body body, bool is_active = true )
915  : graph_node(g), my_active(is_active), init_my_active(is_active),
917  my_init_body( new internal::source_body_leaf< output_type, Body>(body) ),
919  {
921  tbb::internal::fgt_node_with_body( CODEPTR(), tbb::internal::FLOW_SOURCE_NODE, &this->my_graph,
922  static_cast<sender<output_type> *>(this), this->my_body );
923  }
924 
925 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
  // Delegates to the constructor above, then connects this node to every successor in the set.
926  template <typename Body, typename... Successors>
927  source_node( const node_set<internal::order::preceding, Successors...>& successors, Body body, bool is_active = true )
928  : source_node(successors.graph_reference(), body, is_active) {
929  make_edges(*this, successors);
930  }
931 #endif
  // Copy constructor (signature line absent): both bodies are cloned from
  // src.my_init_body, and the copy starts with no reservation and no cached item.
935  graph_node(src.my_graph), sender<Output>(),
937  init_my_active(src.init_my_active), my_body( src.my_init_body->clone() ), my_init_body(src.my_init_body->clone() ),
938  my_reserved(false), my_has_cached_item(false)
939  {
941  tbb::internal::fgt_node_with_body(CODEPTR(), tbb::internal::FLOW_SOURCE_NODE, &this->my_graph,
942  static_cast<sender<output_type> *>(this), this->my_body );
943  }
944 
  // Releases both body objects owned by the node.
946  ~source_node() { delete my_body; delete my_init_body; }
948 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
949  void set_name( const char *name ) __TBB_override {
951  }
952 #endif
  // Tail of a member whose signature is absent: when the node is active a put task
  // is spawned; the member always reports success.
958  if ( my_active )
959  spawn_put();
960  return true;
961  }
962 
  // Tail of another member whose signature and statements are absent.
967  return true;
968  }
969 
970 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
  // Deprecated extraction support: each override forwards to my_successors.
972  built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }
974  void internal_add_built_successor( successor_type &r) __TBB_override {
976  my_successors.internal_add_built_successor(r);
977  }
979  void internal_delete_built_successor( successor_type &r) __TBB_override {
981  my_successors.internal_delete_built_successor(r);
982  }
983 
984  size_t successor_count() __TBB_override {
986  return my_successors.successor_count();
987  }
988 
989  void copy_successors(successor_list_type &v) __TBB_override {
991  my_successors.copy_successors(v);
992  }
993 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
994 
  // Body of a get-style member writing into `v` (signature absent): fails while a
  // reservation is outstanding; hands out and clears the cached item when there is
  // one; otherwise spawns a put task and fails.
998  if ( my_reserved )
999  return false;
1000 
1001  if ( my_has_cached_item ) {
1002  v = my_cached_item;
1003  my_has_cached_item = false;
1004  return true;
1005  }
1006  // we've been asked to provide an item, but we have none. enqueue a task to
1007  // provide one.
1008  spawn_put();
1009  return false;
1010  }
1011 
  // Body of a reserve-style member (signature and the condition on line 1019 absent):
  // fails if already reserved; on success copies the cached item to `v` and marks it
  // reserved without clearing the cache.
1015  if ( my_reserved ) {
1016  return false;
1017  }
1018 
1020  v = my_cached_item;
1021  my_reserved = true;
1022  return true;
1023  } else {
1024  return false;
1025  }
1026  }
  // Body of a release-style member (signature absent): drops the reservation, keeps
  // the cached item, and spawns a put task. Line 1034 is absent.
1032  __TBB_ASSERT( my_reserved && my_has_cached_item, "releasing non-existent reservation" );
1033  my_reserved = false;
1035  spawn_put();
1036  return true;
1037  }
1038 
  // Body of a consume-style member (signature absent): drops both the reservation
  // and the cached item. The statement run when successors exist (line 1046) is absent.
1042  __TBB_ASSERT( my_reserved && my_has_cached_item, "consuming non-existent reservation" );
1043  my_reserved = false;
1044  my_has_cached_item = false;
1045  if ( !my_successors.empty() ) {
1047  }
1048  return true;
1049  }
  // Marks the node active. Lines 1053 and 1055-1056 are absent.
1052  void activate() {
1054  my_active = true;
1057  }
1058 
  // Returns the user's body object from inside a source_body_leaf. dynamic_cast to a
  // reference throws std::bad_cast if Body does not match the stored leaf type.
  // The signature and the definition of body_ref are absent from this listing.
1059  template<typename Body>
1062  return dynamic_cast< internal::source_body_leaf<output_type, Body> & >(body_ref).get_body();
1063  }
1065 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1066  void extract( ) __TBB_override {
1067  my_successors.built_successors().sender_extract(*this); // removes "my_owner" == this from each successor
1069  my_reserved = false;
1071  }
1072 #endif
1074 protected:
  // Body of a reset-style member taking flags `f` (signature absent): clears the
  // reservation and, with rf_reset_bodies, replaces my_body by `tmp`, whose
  // definition (line 1085) is absent.
1079  my_reserved =false;
1080  if(my_has_cached_item) {
1082  }
1084  if(f & rf_reset_bodies) {
1086  delete my_body;
1087  my_body = tmp;
1088  }
1091  }
1092 
1093 private:
1103 
1104  // used by apply_body_bypass, can invoke body of node.
  // Signature absent. Fails while reserved. Otherwise, if nothing is cached, calls
  // (*my_body)(my_cached_item) and keeps the item when the body returns true.
  // On success copies the cached item to `v` and marks it reserved.
1107  if ( my_reserved ) {
1108  return false;
1109  }
1110  if ( !my_has_cached_item ) {
1112  bool r = (*my_body)(my_cached_item);
1114  if (r) {
1115  my_has_cached_item = true;
1116  }
1117  }
1118  if ( my_has_cached_item ) {
1119  v = my_cached_item;
1120  my_reserved = true;
1121  return true;
1122  } else {
1123  return false;
1124  }
1125  }
1126 
1127  // when resetting, and if the source_node was created with my_active == true, then
1128  // when we reset the node we must store a task to run the node, and spawn it only
1129  // after the reset is complete and is_active() is again true. This is why we don't
1130  // test for is_active() here.
  // Allocates a task as an additional child of the graph's root task. The signature
  // and the task type being constructed (line 1133) are absent.
1132  return ( new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
1134  }
1135 
  // Acts only while the graph is active; the statement inside the `if` (line 1139) is absent.
1137  void spawn_put( ) {
1138  if(internal::is_graph_active(this->my_graph)) {
1140  }
1141  }
1142 
  // Signature absent (the comment above names apply_body_bypass). Reserves an item,
  // offers it to the successors, then consumes it if a task came back or releases it
  // otherwise. Returns NULL when no item could be reserved.
1146  output_type v;
1147  if ( !try_reserve_apply_body(v) )
1148  return NULL;
1149 
1150  task *last_task = my_successors.try_put_task(v);
1151  if ( last_task )
1152  try_consume();
1153  else
1154  try_release();
1155  return last_task;
1156  }
1157 }; // class source_node
1158 
// Graph node combining internal::function_input (receiving side) and
// internal::function_output (sending side) for a user body mapping Input to Output.
// NOTE(review): this listing omits several lines of the class, including the
// input_impl_type / fOutput_type typedefs used below and the signatures of the
// primary and copy constructors. The comments describe only what is visible.
1160 template < typename Input, typename Output = continue_msg, typename Policy = queueing, typename Allocator=cache_aligned_allocator<Input> >
1161 class function_node : public graph_node, public internal::function_input<Input,Output,Policy,Allocator>, public internal::function_output<Output> {
1162 public:
1163  typedef Input input_type;
1164  typedef Output output_type;
1170 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1171  typedef typename input_impl_type::predecessor_list_type predecessor_list_type;
1172  typedef typename fOutput_type::successor_list_type successor_list_type;
1173 #endif
1175 
1177  // input_queue_type is allocated here, but destroyed in the function_input_base.
1178  // TODO: pass the graph_buffer_policy to the function_input_base so it can all
1179  // be done in one place. This would be an interface-breaking change.
  // Primary constructor: its signature and both preprocessor branches of the
  // initializer list are absent. The visible body only reports the node to the
  // tracing layer.
1180  template< typename Body >
1184 #else
1186 #endif
1188  fOutput_type(g) {
1189  tbb::internal::fgt_node_with_body( CODEPTR(), tbb::internal::FLOW_FUNCTION_NODE, &this->my_graph,
1190  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this), this->my_body );
1191  }
1192 
1193 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES && __TBB_CPP11_PRESENT
  // Convenience overload: same as the primary constructor with a default-constructed Policy.
1194  template <typename Body>
1195  function_node( graph& g, size_t concurrency, Body body, node_priority_t priority )
1196  : function_node(g, concurrency, body, Policy(), priority) {}
1197 #endif // __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES && __TBB_CPP11_PRESENT
1198 
1199 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
  // node_set overload: after construction, edges to or from `nodes` are created by
  // make_edges_in_order. The rest of the signature and the delegation (lines
  // 1202-1203) are absent.
1200  template <typename Body, typename... Args>
1201  function_node( const node_set<Args...>& nodes, size_t concurrency, Body body,
1204  make_edges_in_order(nodes, *this);
1205  }
1206 
1207 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
1208  template <typename Body, typename... Args>
1209  function_node( const node_set<Args...>& nodes, size_t concurrency, Body body, node_priority_t priority )
1210  : function_node(nodes, concurrency, body, Policy(), priority) {}
1211 #endif // __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
1212 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
  // Copy constructor (signature absent): the copy is attached to the same graph as
  // `src`. No copying of edges is visible in these lines.
1216  graph_node(src.my_graph),
1218  fOutput_type(src.my_graph) {
1219  tbb::internal::fgt_node_with_body( CODEPTR(), tbb::internal::FLOW_FUNCTION_NODE, &this->my_graph,
1220  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this), this->my_body );
1221  }
1222 
1223 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1224  void set_name( const char *name ) __TBB_override {
1226  }
1227 #endif
1228 
1229 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
  // Detaches this node from both its recorded predecessors and its recorded successors.
1230  void extract( ) __TBB_override {
1231  my_predecessors.built_predecessors().receiver_extract(*this);
1232  successors().built_successors().sender_extract(*this);
1233  }
1234 #endif
1235 
1236 protected:
1237  template< typename R, typename B > friend class run_and_put_task;
1238  template<typename X, typename Y> friend class internal::broadcast_cache;
1239  template<typename X, typename Y> friend class internal::round_robin_cache;
1241 
1243 
  // Body of a reset-style member taking flags `f` (signature absent): with
  // rf_clear_edges the successor list is cleared. The assertions then check that
  // successors are empty in that case and that predecessors are always empty here.
1246  // TODO: use clear() instead.
1247  if(f & rf_clear_edges) {
1248  successors().clear();
1250  }
1251  __TBB_ASSERT(!(f & rf_clear_edges) || successors().empty(), "function_node successors not empty");
1252  __TBB_ASSERT(this->my_predecessors.empty(), "function_node predecessors not empty");
1253  }
1255 }; // class function_node
1256 
1258 // Output is a tuple of output types.
1259 template < typename Input, typename Output, typename Policy = queueing, typename Allocator=cache_aligned_allocator<Input> >
1261  public graph_node,
1263  <
1264  Input,
1265  typename internal::wrap_tuple_elements<
1266  tbb::flow::tuple_size<Output>::value, // #elements in tuple
1267  internal::multifunction_output, // wrap this around each element
1268  Output // the tuple providing the types
1269  >::type,
1270  Policy,
1271  Allocator
1272  > {
1273 protected:
1275 public:
1276  typedef Input input_type;
1281 private:
1284 public:
1285  template<typename Body>
1287  graph &g, size_t concurrency,
1290 #else
1292 #endif
1294  tbb::internal::fgt_multioutput_node_with_body<N>(
1295  CODEPTR(), tbb::internal::FLOW_MULTIFUNCTION_NODE,
1296  &this->my_graph, static_cast<receiver<input_type> *>(this),
1297  this->output_ports(), this->my_body
1298  );
1299  }
1300 
1301 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES && __TBB_CPP11_PRESENT
      // Delegating constructor: forwards g, concurrency, body and priority to the
      // overload that also takes a policy object, passing a default-constructed Policy.
      // NOTE(review): the signature line (listing line 1303) is absent from this
      // listing, so the exact parameter list cannot be confirmed from here.
1302  template <typename Body>
1304  : multifunction_node(g, concurrency, body, Policy(), priority) {}
1305 #endif // __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES && __TBB_CPP11_PRESENT
1306 
1307 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1308  template <typename Body, typename... Args>
1309  __TBB_NOINLINE_SYM multifunction_node(const node_set<Args...>& nodes, size_t concurrency, Body body,
1312  make_edges_in_order(nodes, *this);
1313  }
1314 
1315 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
      //! node_set constructor taking a priority but no policy object.
      // Delegates to the node_set overload that accepts a policy, passing a
      // default-constructed Policy together with the caller's priority.
1316  template <typename Body, typename... Args>
1317  __TBB_NOINLINE_SYM multifunction_node(const node_set<Args...>& nodes, size_t concurrency, Body body, node_priority_t priority)
1318  : multifunction_node(nodes, concurrency, body, Policy(), priority) {}
1319 #endif // __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
1320 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1321 
1323  graph_node(other.my_graph), base_type(other) {
1324  tbb::internal::fgt_multioutput_node_with_body<N>( CODEPTR(), tbb::internal::FLOW_MULTIFUNCTION_NODE,
1325  &this->my_graph, static_cast<receiver<input_type> *>(this),
1326  this->output_ports(), this->my_body );
1327  }
1328 
1329 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1330  void set_name( const char *name ) __TBB_override {
1332  }
1333 #endif
1334 
1335 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
      //! Extraction hook; compiled only when TBB_DEPRECATED_FLOW_NODE_EXTRACTION is set.
      // Calls receiver_extract() on the built-predecessor container, then hands the
      // rest of the work to base_type::extract().
1336  void extract( ) __TBB_override {
1337  my_predecessors.built_predecessors().receiver_extract(*this);
1338  base_type::extract();
1339  }
1340 #endif
1341  // all the guts are in multifunction_input...
1342 protected:
1344 }; // multifunction_node
1345 
1347 // successors. The node has unlimited concurrency, so it does not reject inputs.
1348 template<typename TupleType, typename Allocator=cache_aligned_allocator<TupleType> >
1349 class split_node : public graph_node, public receiver<TupleType> {
1352 public:
1353  typedef TupleType input_type;
1354  typedef Allocator allocator_type;
1355 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1357  typedef typename base_type::predecessor_list_type predecessor_list_type;
1358  typedef internal::predecessor_cache<input_type, null_mutex > predecessor_cache_type;
1359  typedef typename predecessor_cache_type::built_predecessors_type built_predecessors_type;
1360 #endif
1361 
1362  typedef typename internal::wrap_tuple_elements<
1363  N, // #elements in tuple
1364  internal::multifunction_output, // wrap this around each element
1365  TupleType // the tuple providing the types
1367 
1369  : graph_node(g),
1371  {
1372  tbb::internal::fgt_multioutput_node<N>(CODEPTR(), tbb::internal::FLOW_SPLIT_NODE, &this->my_graph,
1373  static_cast<receiver<input_type> *>(this), this->output_ports());
1374  }
1375 
1376 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
      //! Constructs the node from a node_set (preview feature).
      // Delegates to the graph-taking constructor using nodes.graph_reference(),
      // then calls make_edges_in_order(nodes, *this) to connect the set to this node.
1377  template <typename... Args>
1378  __TBB_NOINLINE_SYM split_node(const node_set<Args...>& nodes) : split_node(nodes.graph_reference()) {
1379  make_edges_in_order(nodes, *this);
1380  }
1381 #endif
1382 
1384  : graph_node(other.my_graph), base_type(other),
1386  {
1387  tbb::internal::fgt_multioutput_node<N>(CODEPTR(), tbb::internal::FLOW_SPLIT_NODE, &this->my_graph,
1388  static_cast<receiver<input_type> *>(this), this->output_ports());
1389  }
1390 
1391 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1392  void set_name( const char *name ) __TBB_override {
1394  }
1395 #endif
1396 
1398 
1399 protected:
1400  task *try_put_task(const TupleType& t) __TBB_override {
1401  // Sending split messages in parallel is not justified, as overheads would prevail.
1402  // Also, we do not have successors here. So we just tell the task returned here is successful.
1404  }
1406  if (f & rf_clear_edges)
1408 
1410  }
1413  return my_graph;
1414  }
1415 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
      // Deprecated extraction interface for split_node. Every override below is an
      // empty stub: nothing is recorded, and the reported predecessor count is always 0.
1416 private:
      // No-op: there is nothing to detach here.
1417  void extract() __TBB_override {}
1418 
      // No-op: the predecessor argument is ignored.
1420  void internal_add_built_predecessor(predecessor_type&) __TBB_override {}
1421 
      // No-op: the predecessor argument is ignored.
1423  void internal_delete_built_predecessor(predecessor_type&) __TBB_override {}
1424 
      // Always reports zero predecessors.
1425  size_t predecessor_count() __TBB_override { return 0; }
1426 
      // No-op: the output list is left untouched.
1427  void copy_predecessors(predecessor_list_type&) __TBB_override {}
      // Returns the container below; none of the stubs above ever writes to it.
1429  built_predecessors_type &built_predecessors() __TBB_override { return my_predessors; }
1430 
      // Backing object for built_predecessors(). The spelling "my_predessors" is the
      // identifier's actual name and is left unchanged.
1432  built_predecessors_type my_predessors;
1433 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
1434 
1435 private:
1437 };
1438 
1440 template <typename Output, typename Policy = internal::Policy<void> >
1441 class continue_node : public graph_node, public internal::continue_input<Output, Policy>,
1442  public internal::function_output<Output> {
1443 public:
1445  typedef Output output_type;
1450 
1452  template <typename Body >
1454  graph &g,
1456  Body body, __TBB_FLOW_GRAPH_PRIORITY_ARG1( Policy = Policy(), node_priority_t priority = tbb::flow::internal::no_priority )
1457 #else
1459 #endif
1460  ) : graph_node(g), input_impl_type( g, __TBB_FLOW_GRAPH_PRIORITY_ARG1(body, priority) ),
1461  fOutput_type(g) {
1462  tbb::internal::fgt_node_with_body( CODEPTR(), tbb::internal::FLOW_CONTINUE_NODE, &this->my_graph,
1463 
1464  static_cast<receiver<input_type> *>(this),
1465  static_cast<sender<output_type> *>(this), this->my_body );
1466  }
1467 
1468 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES && __TBB_CPP11_PRESENT
      //! Constructor taking a graph, a body and a priority, without a policy object.
      // Delegates to the overload that accepts a policy, passing a
      // default-constructed Policy together with the caller's priority.
1469  template <typename Body>
1470  continue_node( graph& g, Body body, node_priority_t priority )
1471  : continue_node(g, body, Policy(), priority) {}
1472 #endif
1473 
1474 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1475  template <typename Body, typename... Args>
1476  continue_node( const node_set<Args...>& nodes, Body body,
1479  make_edges_in_order(nodes, *this);
1480  }
1481 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
      //! node_set constructor taking a body and a priority, without a policy object.
      // Delegates to the node_set overload that accepts a policy, passing a
      // default-constructed Policy together with the caller's priority.
1482  template <typename Body, typename... Args>
1483  continue_node( const node_set<Args...>& nodes, Body body, node_priority_t priority)
1484  : continue_node(nodes, body, Policy(), priority) {}
1485 #endif // __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
1486 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1487 
1489  template <typename Body >
1491  graph &g, int number_of_predecessors,
1494 #else
1496 #endif
1497  ) : graph_node(g)
1498  , input_impl_type(g, number_of_predecessors, __TBB_FLOW_GRAPH_PRIORITY_ARG1(body, priority)),
1499  fOutput_type(g) {
1500  tbb::internal::fgt_node_with_body( CODEPTR(), tbb::internal::FLOW_CONTINUE_NODE, &this->my_graph,
1501  static_cast<receiver<input_type> *>(this),
1502  static_cast<sender<output_type> *>(this), this->my_body );
1503  }
1504 
1505 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES && __TBB_CPP11_PRESENT
      //! Constructor taking an explicit predecessor count and a priority, without a policy object.
      // Delegates to the overload that accepts a policy, passing a
      // default-constructed Policy; number_of_predecessors is forwarded unchanged.
1506  template <typename Body>
1507  continue_node( graph& g, int number_of_predecessors, Body body, node_priority_t priority)
1508  : continue_node(g, number_of_predecessors, body, Policy(), priority) {}
1509 #endif
1511 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1512  template <typename Body, typename... Args>
1513  continue_node( const node_set<Args...>& nodes, int number_of_predecessors,
1515  : continue_node(nodes.graph_reference(), number_of_predecessors, body, __TBB_FLOW_GRAPH_PRIORITY_ARG1(p, priority)) {
1516  make_edges_in_order(nodes, *this);
1517  }
1518 
1519 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
      //! node_set constructor taking an explicit predecessor count and a priority.
      // Delegates to the node_set overload that accepts a policy, passing a
      // default-constructed Policy; the remaining arguments are forwarded unchanged.
1520  template <typename Body, typename... Args>
1521  continue_node( const node_set<Args...>& nodes, int number_of_predecessors,
1522  Body body, node_priority_t priority )
1523  : continue_node(nodes, number_of_predecessors, body, Policy(), priority) {}
1524 #endif
1525 #endif
1526 
1531  tbb::internal::fgt_node_with_body( CODEPTR(), tbb::internal::FLOW_CONTINUE_NODE, &this->my_graph,
1532  static_cast<receiver<input_type> *>(this),
1533  static_cast<sender<output_type> *>(this), this->my_body );
1534  }
1535 
1536 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1537  void set_name( const char *name ) __TBB_override {
1539  }
1540 #endif
1541 
1542 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
      //! Extraction hook; compiled only when TBB_DEPRECATED_FLOW_NODE_EXTRACTION is set.
      // Calls receiver_extract() on the input implementation's my_built_predecessors,
      // then sender_extract() on the built-successor container, passing this node to both.
1543  void extract() __TBB_override {
1544  input_impl_type::my_built_predecessors.receiver_extract(*this);
1545  successors().built_successors().sender_extract(*this);
1546  }
1547 #endif
1549 protected:
1550  template< typename R, typename B > friend class run_and_put_task;
1551  template<typename X, typename Y> friend class internal::broadcast_cache;
1552  template<typename X, typename Y> friend class internal::round_robin_cache;
1555 
1558  if(f & rf_clear_edges)successors().clear();
1559  __TBB_ASSERT(!(f & rf_clear_edges) || successors().empty(), "continue_node not reset");
1560  }
1561 }; // continue_node
1564 template <typename T>
1565 class broadcast_node : public graph_node, public receiver<T>, public sender<T> {
1566 public:
1567  typedef T input_type;
1568  typedef T output_type;
1571 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1572  typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
1573  typedef typename sender<output_type>::successor_list_type successor_list_type;
1574 #endif
1575 private:
1577 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1578  internal::edge_container<predecessor_type> my_built_predecessors;
1579  spin_mutex pred_mutex; // serialize accesses on edge_container
1580 #endif
1581 public:
1585  tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_BROADCAST_NODE, &this->my_graph,
1586  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
1587  }
1588 
1589 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
      //! Constructs the node from a node_set (preview feature).
      // Delegates to the graph-taking constructor using nodes.graph_reference(),
      // then calls make_edges_in_order(nodes, *this) to connect the set to this node.
1590  template <typename... Args>
1591  broadcast_node(const node_set<Args...>& nodes) : broadcast_node(nodes.graph_reference()) {
1592  make_edges_in_order(nodes, *this);
1593  }
1594 #endif
1595 
1596  // Copy constructor
1599  {
1600  my_successors.set_owner( this );
1601  tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_BROADCAST_NODE, &this->my_graph,
1602  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
1603  }
1604 
1605 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1606  void set_name( const char *name ) __TBB_override {
1608  }
1609 #endif
1614  return true;
1615  }
1616 
1620  return true;
1621  }
1623 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1624  typedef typename sender<T>::built_successors_type built_successors_type;
1626  built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }
1627 
//! Records successor r by forwarding the call to my_successors.
1628  void internal_add_built_successor(successor_type &r) __TBB_override {
1629  my_successors.internal_add_built_successor(r);
1630  }
1631 
//! Drops the record of successor r by forwarding the call to my_successors.
1632  void internal_delete_built_successor(successor_type &r) __TBB_override {
1633  my_successors.internal_delete_built_successor(r);
1634  }
1635 
//! Returns the successor count as reported by my_successors.
1636  size_t successor_count() __TBB_override {
1637  return my_successors.successor_count();
1638  }
1639 
//! Asks my_successors to copy its successor list into the caller-supplied list v.
1640  void copy_successors(successor_list_type &v) __TBB_override {
1641  my_successors.copy_successors(v);
1642  }
1643 
1644  typedef typename receiver<T>::built_predecessors_type built_predecessors_type;
1645 
1646  built_predecessors_type &built_predecessors() __TBB_override { return my_built_predecessors; }
//! Adds predecessor p to my_built_predecessors.
// pred_mutex is held for the duration of the call so that accesses to the
// edge container are serialized.
1648  void internal_add_built_predecessor( predecessor_type &p) __TBB_override {
1649  spin_mutex::scoped_lock l(pred_mutex);
1650  my_built_predecessors.add_edge(p);
1651  }
1652 
1653  void internal_delete_built_predecessor( predecessor_type &p) __TBB_override {
1655  my_built_predecessors.delete_edge(p);
1656  }
1658  size_t predecessor_count() __TBB_override {
1660  return my_built_predecessors.edge_count();
1661  }
1663  void copy_predecessors(predecessor_list_type &v) __TBB_override {
1665  my_built_predecessors.copy_edges(v);
1666  }
//! Extraction hook for broadcast_node.
// Calls receiver_extract() on my_built_predecessors, then sender_extract() on the
// built-successor container of my_successors, passing this node to both.
1668  void extract() __TBB_override {
1669  my_built_predecessors.receiver_extract(*this);
1670  my_successors.built_successors().sender_extract(*this);
1671  }
1672 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
1673 
1674 protected:
1675  template< typename R, typename B > friend class run_and_put_task;
1676  template<typename X, typename Y> friend class internal::broadcast_cache;
1677  template<typename X, typename Y> friend class internal::round_robin_cache;
1680  task *new_task = my_successors.try_put_task(t);
1681  if (!new_task) new_task = SUCCESSFULLY_ENQUEUED;
1682  return new_task;
1683  }
1684 
1686  return my_graph;
1687  }
1688 
1690 
1693  my_successors.clear();
1694 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1695  my_built_predecessors.clear();
1696 #endif
1697  }
1698  __TBB_ASSERT(!(f & rf_clear_edges) || my_successors.empty(), "Error resetting broadcast_node");
1699  }
1700 }; // broadcast_node
1703 template <typename T, typename A=cache_aligned_allocator<T> >
1704 class buffer_node : public graph_node, public internal::reservable_item_buffer<T, A>, public receiver<T>, public sender<T> {
1705 public:
1706  typedef T input_type;
1707  typedef T output_type;
1711 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1712  typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
1713  typedef typename sender<output_type>::successor_list_type successor_list_type;
1714 #endif
1715 protected:
1716  typedef size_t size_type;
1718 
1719 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1720  internal::edge_container<predecessor_type> my_built_predecessors;
1721 #endif
1722 
1724 
1726 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1727  , add_blt_succ, del_blt_succ,
1728  add_blt_pred, del_blt_pred,
1729  blt_succ_cnt, blt_pred_cnt,
1730  blt_succ_cpy, blt_pred_cpy // create vector copies of preds and succs
1731 #endif
1732  };
1733 
1734  // implements the aggregator_operation concept
1735  class buffer_operation : public internal::aggregated_operation< buffer_operation > {
1736  public:
1737  char type;
1738 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1739  task * ltask;
1740  union {
1741  input_type *elem;
1744  size_t cnt_val;
1745  successor_list_type *svec;
1746  predecessor_list_type *pvec;
1747  };
1748 #else
1749  T *elem;
1752 #endif
1753  buffer_operation(const T& e, op_type t) : type(char(t))
1755 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1756  , ltask(NULL), elem(const_cast<T*>(&e))
1757 #else
1758  , elem(const_cast<T*>(&e)) , ltask(NULL)
1759 #endif
1760  {}
1761  buffer_operation(op_type t) : type(char(t)), ltask(NULL) {}
1762  };
1765  typedef internal::aggregating_functor<class_type, buffer_operation> handler_type;
1766  friend class internal::aggregating_functor<class_type, buffer_operation>;
1767  internal::aggregator< handler_type, buffer_operation> my_aggregator;
//! Virtual entry point for processing a list of pending buffer operations.
// Forwards to handle_operations_impl, passing `this` as the derived pointer so the
// template calls this class's own order() helper.
1769  virtual void handle_operations(buffer_operation *op_list) {
1770  handle_operations_impl(op_list, this);
1771  }
1773  template<typename derived_type>
1774  void handle_operations_impl(buffer_operation *op_list, derived_type* derived) {
1775  __TBB_ASSERT(static_cast<class_type*>(derived) == this, "'this' is not a base class for derived");
1776 
1777  buffer_operation *tmp = NULL;
1778  bool try_forwarding = false;
1779  while (op_list) {
1780  tmp = op_list;
1781  op_list = op_list->next;
1782  switch (tmp->type) {
1783  case reg_succ: internal_reg_succ(tmp); try_forwarding = true; break;
1784  case rem_succ: internal_rem_succ(tmp); break;
1785  case req_item: internal_pop(tmp); break;
1786  case res_item: internal_reserve(tmp); break;
1787  case rel_res: internal_release(tmp); try_forwarding = true; break;
1788  case con_res: internal_consume(tmp); try_forwarding = true; break;
1789  case put_item: try_forwarding = internal_push(tmp); break;
1790  case try_fwd_task: internal_forward_task(tmp); break;
1791 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1792  // edge recording
1793  case add_blt_succ: internal_add_built_succ(tmp); break;
1794  case del_blt_succ: internal_del_built_succ(tmp); break;
1795  case add_blt_pred: internal_add_built_pred(tmp); break;
1796  case del_blt_pred: internal_del_built_pred(tmp); break;
1797  case blt_succ_cnt: internal_succ_cnt(tmp); break;
1798  case blt_pred_cnt: internal_pred_cnt(tmp); break;
1799  case blt_succ_cpy: internal_copy_succs(tmp); break;
1800  case blt_pred_cpy: internal_copy_preds(tmp); break;
1801 #endif
1802  }
1803  }
1805  derived->order();
1807  if (try_forwarding && !forwarder_busy) {
1809  forwarder_busy = true;
1810  task *new_task = new(task::allocate_additional_child_of(*(this->my_graph.root_task()))) internal::
1813  // tmp should point to the last item handled by the aggregator. This is the operation
1814  // the handling thread enqueued. So modifying that record will be okay.
1815  // workaround for icc bug
1816  tbb::task *z = tmp->ltask;
1817  graph &g = this->my_graph;
1818  tmp->ltask = combine_tasks(g, z, new_task); // in case the op generated a task
1819  }
1820  }
1821  } // handle_operations
1824  return op_data.ltask;
1825  }
1827  inline bool enqueue_forwarding_task(buffer_operation &op_data) {
1828  task *ft = grab_forwarding_task(op_data);
1829  if(ft) {
1831  return true;
1832  }
1833  return false;
1834  }
1835 
//! Repeatedly submits a try_fwd_task operation and gathers the tasks it produces.
// Each iteration resets the operation (status = WAIT, ltask = NULL), runs it through
// my_aggregator, and merges any task left in op_data.ltask into last_task with
// combine_tasks(). The loop repeats for as long as the operation reports SUCCEEDED.
// Returns the accumulated task pointer; it stays NULL if no iteration produced a
// task (assuming combine_tasks() yields NULL for two NULL inputs; TODO confirm).
1837  virtual task *forward_task() {
1838  buffer_operation op_data(try_fwd_task);
1839  task *last_task = NULL;
1840  do {
       // The same operation record is reused, so it must be re-armed on every pass.
1841  op_data.status = internal::WAIT;
1842  op_data.ltask = NULL;
1843  my_aggregator.execute(&op_data);
1844 
1845  // workaround for icc bug
1846  tbb::task *xtask = op_data.ltask;
1847  graph& g = this->my_graph;
1848  last_task = combine_tasks(g, last_task, xtask);
1849  } while (op_data.status ==internal::SUCCEEDED);
1850  return last_task;
1851  }
1852 
1855  my_successors.register_successor(*(op->r));
1857  }
1861  my_successors.remove_successor(*(op->r));
1863  }
1865 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1866  typedef typename sender<T>::built_successors_type built_successors_type;
1868  built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }
1870  virtual void internal_add_built_succ(buffer_operation *op) {
1871  my_successors.internal_add_built_successor(*(op->r));
1873  }
1875  virtual void internal_del_built_succ(buffer_operation *op) {
1876  my_successors.internal_delete_built_successor(*(op->r));
1878  }
1880  typedef typename receiver<T>::built_predecessors_type built_predecessors_type;
1882  built_predecessors_type &built_predecessors() __TBB_override { return my_built_predecessors; }
1884  virtual void internal_add_built_pred(buffer_operation *op) {
1885  my_built_predecessors.add_edge(*(op->p));
1887  }
1889  virtual void internal_del_built_pred(buffer_operation *op) {
1890  my_built_predecessors.delete_edge(*(op->p));
1892  }
1893 
1894  virtual void internal_succ_cnt(buffer_operation *op) {
1895  op->cnt_val = my_successors.successor_count();
1897  }
1898 
1899  virtual void internal_pred_cnt(buffer_operation *op) {
1900  op->cnt_val = my_built_predecessors.edge_count();
1902  }
1903 
1904  virtual void internal_copy_succs(buffer_operation *op) {
1905  my_successors.copy_successors(*(op->svec));
1907  }
1909  virtual void internal_copy_preds(buffer_operation *op) {
1910  my_built_predecessors.copy_edges(*(op->pvec));
1912  }
1913 
1914 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
1915 
1916 private:
// Ordering hook called as derived->order() from handle_operations_impl.
// It is a no-op for the plain buffer; priority_queue_node supplies its own order().
1917  void order() {}
1918 
// Reports whether the slot just before my_tail (the back item) holds a valid item.
// Called through the derived pointer in internal_forward_task_impl.
1919  bool is_item_valid() {
1920  return this->my_item_valid(this->my_tail - 1);
1921  }
// Offers the back item to the successors via my_successors.try_put_task().
// If that returns a non-NULL task, the task is merged into last_task with
// combine_tasks() and the back item is destroyed. If it returns NULL, the buffer
// and last_task are left unchanged.
1923  void try_put_and_add_task(task*& last_task) {
1924  task *new_task = my_successors.try_put_task(this->back());
1925  if (new_task) {
1926  // workaround for icc bug
1927  graph& g = this->my_graph;
1928  last_task = combine_tasks(g, last_task, new_task);
1929  this->destroy_back();
1930  }
1931  }
1933 protected:
1935  virtual void internal_forward_task(buffer_operation *op) {
1937  }
1939  template<typename derived_type>
1940  void internal_forward_task_impl(buffer_operation *op, derived_type* derived) {
1941  __TBB_ASSERT(static_cast<class_type*>(derived) == this, "'this' is not a base class for derived");
1943  if (this->my_reserved || !derived->is_item_valid()) {
1945  this->forwarder_busy = false;
1946  return;
1947  }
1948  // Try forwarding, giving each successor a chance
1949  task * last_task = NULL;
1950  size_type counter = my_successors.size();
1951  for (; counter > 0 && derived->is_item_valid(); --counter)
1952  derived->try_put_and_add_task(last_task);
1954  op->ltask = last_task; // return task
1955  if (last_task && !counter) {
1957  }
1958  else {
1960  forwarder_busy = false;
1961  }
1962  }
1963 
1964  virtual bool internal_push(buffer_operation *op) {
1965  this->push_back(*(op->elem));
1967  return true;
1968  }
1969 
1970  virtual void internal_pop(buffer_operation *op) {
1971  if(this->pop_back(*(op->elem))) {
1973  }
1974  else {
1976  }
1977  }
1978 
1980  if(this->reserve_front(*(op->elem))) {
1982  }
1983  else {
1985  }
1986  }
1987 
1989  this->consume_front();
1991  }
1992 
1994  this->release_front();
1996  }
1997 
1998 public:
2001  forwarder_busy(false) {
2002  my_successors.set_owner(this);
2003  my_aggregator.initialize_handler(handler_type(this));
2004  tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_BUFFER_NODE, &this->my_graph,
2005  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
2006  }
2007 
2008 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
      //! Constructs the node from a node_set (preview feature).
      // Delegates to the graph-taking constructor using nodes.graph_reference(),
      // then calls make_edges_in_order(nodes, *this) to connect the set to this node.
2009  template <typename... Args>
2010  buffer_node(const node_set<Args...>& nodes) : buffer_node(nodes.graph_reference()) {
2011  make_edges_in_order(nodes, *this);
2012  }
2013 #endif
2014 
2017  internal::reservable_item_buffer<T>(), receiver<T>(), sender<T>() {
2018  forwarder_busy = false;
2019  my_successors.set_owner(this);
2020  my_aggregator.initialize_handler(handler_type(this));
2021  tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_BUFFER_NODE, &this->my_graph,
2022  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
2023  }
2024 
2025 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2026  void set_name( const char *name ) __TBB_override {
2028  }
2029 #endif
2030 
2031  //
2032  // message sender implementation
2033  //
2034 
2036 
2038  buffer_operation op_data(reg_succ);
2039  op_data.r = &r;
2040  my_aggregator.execute(&op_data);
2041  (void)enqueue_forwarding_task(op_data);
2042  return true;
2043  }
2044 
2045 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
//! Records successor r by running an add_blt_succ operation through my_aggregator.
// handle_operations_impl dispatches that operation type to internal_add_built_succ.
2046  void internal_add_built_successor( successor_type &r) __TBB_override {
2047  buffer_operation op_data(add_blt_succ);
2048  op_data.r = &r;
2049  my_aggregator.execute(&op_data);
2050  }
2051 
//! Drops the record of successor r by running a del_blt_succ operation through my_aggregator.
// handle_operations_impl dispatches that operation type to internal_del_built_succ.
2052  void internal_delete_built_successor( successor_type &r) __TBB_override {
2053  buffer_operation op_data(del_blt_succ);
2054  op_data.r = &r;
2055  my_aggregator.execute(&op_data);
2056  }
2057 
//! Records predecessor p by running an add_blt_pred operation through my_aggregator.
// handle_operations_impl dispatches that operation type to internal_add_built_pred.
2058  void internal_add_built_predecessor( predecessor_type &p) __TBB_override {
2059  buffer_operation op_data(add_blt_pred);
2060  op_data.p = &p;
2061  my_aggregator.execute(&op_data);
2062  }
2063 
//! Drops the record of predecessor p by running a del_blt_pred operation through my_aggregator.
// handle_operations_impl dispatches that operation type to internal_del_built_pred.
2064  void internal_delete_built_predecessor( predecessor_type &p) __TBB_override {
2065  buffer_operation op_data(del_blt_pred);
2066  op_data.p = &p;
2067  my_aggregator.execute(&op_data);
2068  }
2069 
//! Returns the number of recorded predecessors.
// Runs a blt_pred_cnt operation through my_aggregator; its handler (internal_pred_cnt)
// stores my_built_predecessors.edge_count() in cnt_val, which is returned here.
2070  size_t predecessor_count() __TBB_override {
2071  buffer_operation op_data(blt_pred_cnt);
2072  my_aggregator.execute(&op_data);
2073  return op_data.cnt_val;
2074  }
2075 
//! Returns the number of successors.
// Runs a blt_succ_cnt operation through my_aggregator; its handler (internal_succ_cnt)
// stores my_successors.successor_count() in cnt_val, which is returned here.
2076  size_t successor_count() __TBB_override {
2077  buffer_operation op_data(blt_succ_cnt);
2078  my_aggregator.execute(&op_data);
2079  return op_data.cnt_val;
2080  }
2081 
//! Copies the recorded predecessors into the caller-supplied list v.
// Runs a blt_pred_cpy operation through my_aggregator; its handler
// (internal_copy_preds) calls my_built_predecessors.copy_edges() on the list.
2082  void copy_predecessors( predecessor_list_type &v ) __TBB_override {
2083  buffer_operation op_data(blt_pred_cpy);
2084  op_data.pvec = &v;
2085  my_aggregator.execute(&op_data);
2086  }
2087 
//! Copies the successors into the caller-supplied list v.
// Runs a blt_succ_cpy operation through my_aggregator; its handler
// (internal_copy_succs) calls my_successors.copy_successors() on the list.
2088  void copy_successors( successor_list_type &v ) __TBB_override {
2089  buffer_operation op_data(blt_succ_cpy);
2090  op_data.svec = &v;
2091  my_aggregator.execute(&op_data);
2092  }
2093 
2094 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
2095 
2097 
2100  r.remove_predecessor(*this);
2101  buffer_operation op_data(rem_succ);
2102  op_data.r = &r;
2103  my_aggregator.execute(&op_data);
2104  // even though this operation does not cause a forward, if we are the handler, and
2105  // a forward is scheduled, we may be the first to reach this point after the aggregator,
2106  // and so should check for the task.
2107  (void)enqueue_forwarding_task(op_data);
2108  return true;
2109  }
2110 
2112 
//! Requests an item from the buffer into v.
// Runs a req_item operation (handled by internal_pop) through my_aggregator with
// elem pointing at v, then calls enqueue_forwarding_task() for any forwarding task
// attached to the operation; that call's result is deliberately ignored.
// Returns true only if the operation's final status is SUCCEEDED.
2114  bool try_get( T &v ) __TBB_override {
2115  buffer_operation op_data(req_item);
2116  op_data.elem = &v;
2117  my_aggregator.execute(&op_data);
2118  (void)enqueue_forwarding_task(op_data);
2119  return (op_data.status==internal::SUCCEEDED);
2120  }
2121 
2123 
2126  buffer_operation op_data(res_item);
2127  op_data.elem = &v;
2128  my_aggregator.execute(&op_data);
2129  (void)enqueue_forwarding_task(op_data);
2130  return (op_data.status==internal::SUCCEEDED);
2131  }
2132 
2134 
2136  buffer_operation op_data(rel_res);
2137  my_aggregator.execute(&op_data);
2138  (void)enqueue_forwarding_task(op_data);
2139  return true;
2140  }
2141 
2143 
2145  buffer_operation op_data(con_res);
2146  my_aggregator.execute(&op_data);
2147  (void)enqueue_forwarding_task(op_data);
2148  return true;
2149  }
2150 
2151 protected:
2152 
2153  template< typename R, typename B > friend class run_and_put_task;
2154  template<typename X, typename Y> friend class internal::broadcast_cache;
2155  template<typename X, typename Y> friend class internal::round_robin_cache;
2158  buffer_operation op_data(t, put_item);
2159  my_aggregator.execute(&op_data);
2160  task *ft = grab_forwarding_task(op_data);
2161  // sequencer_nodes can return failure (if an item has been previously inserted)
2162  // We have to spawn the returned task if our own operation fails.
2163 
2164  if(ft && op_data.status ==internal::FAILED) {
2165  // we haven't succeeded queueing the item, but for some reason the
2166  // call returned a task (if another request resulted in a successful
2167  // forward this could happen.) Queue the task and reset the pointer.
2169  }
2170  else if(!ft && op_data.status ==internal::SUCCEEDED) {
2171  ft = SUCCESSFULLY_ENQUEUED;
2172  }
2173  return ft;
2174  }
2175 
2177  return my_graph;
2178  }
2179 
2181 
2182 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2183 public:
      //! Extraction hook; compiled only when TBB_DEPRECATED_FLOW_NODE_EXTRACTION is set.
      // Calls receiver_extract() on my_built_predecessors, then sender_extract() on the
      // built-successor container of my_successors, passing this node to both.
2184  void extract() __TBB_override {
2185  my_built_predecessors.receiver_extract(*this);
2186  my_successors.built_successors().sender_extract(*this);
2187  }
2188 #endif
2189 
2190 protected:
2193  // TODO: just clear structures
2194  if (f&rf_clear_edges) {
2195  my_successors.clear();
2196 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2197  my_built_predecessors.clear();
2198 #endif
2199  }
2200  forwarder_busy = false;
2201  }
2202 }; // buffer_node
2203 
2205 template <typename T, typename A=cache_aligned_allocator<T> >
2206 class queue_node : public buffer_node<T, A> {
2207 protected:
2212 
2213 private:
2214  template<typename, typename> friend class buffer_node;
2215 
// Queue variant of the validity check: inspects the slot at my_head (the front item),
// whereas buffer_node's version inspects the slot before my_tail.
2216  bool is_item_valid() {
2217  return this->my_item_valid(this->my_head);
2218  }
2219 
// Offers the front item to the successors via my_successors.try_put_task().
// If that returns a non-NULL task, the task is merged into last_task with
// combine_tasks() and the front item is destroyed. If it returns NULL, the queue
// and last_task are left unchanged.
2220  void try_put_and_add_task(task*& last_task) {
2221  task *new_task = this->my_successors.try_put_task(this->front());
2222  if (new_task) {
2223  // workaround for icc bug
2224  graph& graph_ref = this->graph_reference();
2225  last_task = combine_tasks(graph_ref, last_task, new_task);
2226  this->destroy_front();
2227  }
2228  }
2229 
2230 protected:
// Overrides the forwarding handler so that the shared internal_forward_task_impl
// template runs with `this` typed as queue_node, and therefore uses queue_node's
// is_item_valid() and try_put_and_add_task().
2231  void internal_forward_task(queue_operation *op) __TBB_override {
2232  this->internal_forward_task_impl(op, this);
2233  }
2234 
2236  if ( this->my_reserved || !this->my_item_valid(this->my_head)){
2238  }
2239  else {
2240  this->pop_front(*(op->elem));
2242  }
2243  }
2245  if (this->my_reserved || !this->my_item_valid(this->my_head)) {
2247  }
2248  else {
2249  this->reserve_front(*(op->elem));
2251  }
2252  }
2254  this->consume_front();
2256  }
2257 
2258 public:
2259  typedef T input_type;
2260  typedef T output_type;
2263 
2266  tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_QUEUE_NODE, &(this->my_graph),
2267  static_cast<receiver<input_type> *>(this),
2268  static_cast<sender<output_type> *>(this) );
2269  }
2270 
2271 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
      //! Constructs the node from a node_set (preview feature).
      // Delegates to the graph-taking constructor using nodes.graph_reference(),
      // then calls make_edges_in_order(nodes, *this) to connect the set to this node.
2272  template <typename... Args>
2273  queue_node( const node_set<Args...>& nodes) : queue_node(nodes.graph_reference()) {
2274  make_edges_in_order(nodes, *this);
2275  }
2276 #endif
2277 
2280  tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_QUEUE_NODE, &(this->my_graph),
2281  static_cast<receiver<input_type> *>(this),
2282  static_cast<sender<output_type> *>(this) );
2283  }
2284 
2285 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2286  void set_name( const char *name ) __TBB_override {
2288  }
2289 #endif
2290 
2291 protected:
2294  }
2295 }; // queue_node
2296 
2298 template< typename T, typename A=cache_aligned_allocator<T> >
2299 class sequencer_node : public queue_node<T, A> {
2301  // my_sequencer should be a benign function and must be callable
2302  // from a parallel context. Does this mean it needn't be reset?
2303 public:
2304  typedef T input_type;
2305  typedef T output_type;
2308 
//! Constructs a sequencer_node in graph g using sequencer functor s.
// The functor is copied into a heap-allocated internal::function_body_leaf
// (mapping T to size_t) held in my_sequencer. The node is then reported to the
// tracing layer through fgt_node() as a FLOW_SEQUENCER_NODE.
2310  template< typename Sequencer >
2311  __TBB_NOINLINE_SYM sequencer_node( graph &g, const Sequencer& s ) : queue_node<T, A>(g),
2312  my_sequencer(new internal::function_body_leaf< T, size_t, Sequencer>(s) ) {
2313  tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_SEQUENCER_NODE, &(this->my_graph),
2314  static_cast<receiver<input_type> *>(this),
2315  static_cast<sender<output_type> *>(this) );
2316  }
2317 
2318 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
      //! Constructs the node from a node_set and a sequencer functor (preview feature).
      // Delegates to the (graph, sequencer) constructor using nodes.graph_reference(),
      // then calls make_edges_in_order(nodes, *this) to connect the set to this node.
2319  template <typename Sequencer, typename... Args>
2320  sequencer_node( const node_set<Args...>& nodes, const Sequencer& s)
2321  : sequencer_node(nodes.graph_reference(), s) {
2322  make_edges_in_order(nodes, *this);
2323  }
2324 #endif
2325 
2328  my_sequencer( src.my_sequencer->clone() ) {
2329  tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_SEQUENCER_NODE, &(this->my_graph),
2330  static_cast<receiver<input_type> *>(this),
2331  static_cast<sender<output_type> *>(this) );
2332  }
2333 
2336 
2337 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2338  void set_name( const char *name ) __TBB_override {
2340  }
2341 #endif
2342 
2343 protected:
2346 
2347 private:
2349  size_type tag = (*my_sequencer)(*(op->elem));
2350 #if !TBB_DEPRECATED_SEQUENCER_DUPLICATES
2351  if (tag < this->my_head) {
2352  // have already emitted a message with this tag
2354  return false;
2355  }
2356 #endif
2357  // cannot modify this->my_tail now; the buffer would be inconsistent.
2358  size_t new_tail = (tag+1 > this->my_tail) ? tag+1 : this->my_tail;
2359 
2360  if (this->size(new_tail) > this->capacity()) {
2361  this->grow_my_array(this->size(new_tail));
2362  }
2363  this->my_tail = new_tail;
2364 
2365  const internal::op_stat res = this->place_item(tag, *(op->elem)) ? internal::SUCCEEDED : internal::FAILED;
2366  __TBB_store_with_release(op->status, res);
2367  return res ==internal::SUCCEEDED;
2368  }
2369 }; // sequencer_node
2370 
2372 template< typename T, typename Compare = std::less<T>, typename A=cache_aligned_allocator<T> >
2373 class priority_queue_node : public buffer_node<T, A> {
2374 public:
2375  typedef T input_type;
2376  typedef T output_type;
2381 
//! Constructs a priority_queue_node in graph g with comparison functor comp.
// comp defaults to a default-constructed Compare. The constructor stores comp in
// `compare`, starts `mark` at 0, and reports the node to the tracing layer through
// fgt_node() as a FLOW_PRIORITY_QUEUE_NODE.
2383  __TBB_NOINLINE_SYM explicit priority_queue_node( graph &g, const Compare& comp = Compare() )
2384  : buffer_node<T, A>(g), compare(comp), mark(0) {
2385  tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_PRIORITY_QUEUE_NODE, &(this->my_graph),
2386  static_cast<receiver<input_type> *>(this),
2387  static_cast<sender<output_type> *>(this) );
2388  }
2389 
2390 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
      //! Constructs the node from a node_set and an optional comparison functor (preview feature).
      // Delegates to the (graph, comp) constructor using nodes.graph_reference(),
      // then calls make_edges_in_order(nodes, *this) to connect the set to this node.
2391  template <typename... Args>
2392  priority_queue_node(const node_set<Args...>& nodes, const Compare& comp = Compare())
2393  : priority_queue_node(nodes.graph_reference(), comp) {
2394  make_edges_in_order(nodes, *this);
2395  }
2396 #endif
2397 
2400  tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_PRIORITY_QUEUE_NODE, &(this->my_graph),
2401  static_cast<receiver<input_type> *>(this),
2402  static_cast<sender<output_type> *>(this) );
2403  }
2404 
2405 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2406  void set_name( const char *name ) __TBB_override {
2408  }
2409 #endif
2410 
2411 protected:
2412 
2414  mark = 0;
2416  }
2417 
2421 
2424  this->internal_forward_task_impl(op, this);
2425  }
2426 
2428  this->handle_operations_impl(op_list, this);
2429  }
2430 
2432  prio_push(*(op->elem));
2434  return true;
2435  }
2436 
2438  // if empty or already reserved, don't pop
2439  if ( this->my_reserved == true || this->my_tail == 0 ) {
2441  return;
2442  }
2443 
2444  *(op->elem) = prio();
2446  prio_pop();
2447 
2448  }
2449 
2450  // pops the highest-priority item, saves copy
2452  if (this->my_reserved == true || this->my_tail == 0) {
2454  return;
2455  }
2456  this->my_reserved = true;
2457  *(op->elem) = prio();
2458  reserved_item = *(op->elem);
2460  prio_pop();
2461  }
2462 
2465  this->my_reserved = false;
2467  }
2468 
2469  void internal_release(prio_operation *op) __TBB_override {
2472  this->my_reserved = false;
2474  }
2475 
2476 private:
2477  template<typename, typename> friend class buffer_node;
2478 
2479  void order() {
2480  if (mark < this->my_tail) heapify();
2481  __TBB_ASSERT(mark == this->my_tail, "mark unequal after heapify");
2482  }
2483 
2484  bool is_item_valid() {
2485  return this->my_tail > 0;
2486  }
2487 
2488  void try_put_and_add_task(task*& last_task) {
2489  task * new_task = this->my_successors.try_put_task(this->prio());
2490  if (new_task) {
2491  // workaround for icc bug
2492  graph& graph_ref = this->graph_reference();
2493  last_task = combine_tasks(graph_ref, last_task, new_task);
2494  prio_pop();
2495  }
2496  }
2497 
2498 private:
2499  Compare compare;
2501 
2503 
2504  // in case a reheap has not been done after a push, check if the mark item is higher than the 0'th item
2505  bool prio_use_tail() {
2506  __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds before test");
2507  return mark < this->my_tail && compare(this->get_my_item(0), this->get_my_item(this->my_tail - 1));
2508  }
2509 
2510  // prio_push: checks that the item will fit, expand array if necessary, put at end
2511  void prio_push(const T &src) {
2512  if ( this->my_tail >= this->my_array_size )
2513  this->grow_my_array( this->my_tail + 1 );
2514  (void) this->place_item(this->my_tail, src);
2515  ++(this->my_tail);
2516  __TBB_ASSERT(mark < this->my_tail, "mark outside bounds after push");
2517  }
2518 
2519  // prio_pop: deletes highest priority item from the array, and if it is item
2520  // 0, move last item to 0 and reheap. If end of array, just destroy and decrement tail
2521  // and mark. Assumes the array has already been tested for emptiness; no failure.
2522  void prio_pop() {
2523  if (prio_use_tail()) {
2524  // there are newly pushed elements; last one higher than top
2525  // copy the data
2526  this->destroy_item(this->my_tail-1);
2527  --(this->my_tail);
2528  __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds after pop");
2529  return;
2530  }
2531  this->destroy_item(0);
2532  if(this->my_tail > 1) {
2533  // push the last element down heap
2534  __TBB_ASSERT(this->my_item_valid(this->my_tail - 1), NULL);
2535  this->move_item(0,this->my_tail - 1);
2536  }
2537  --(this->my_tail);
2538  if(mark > this->my_tail) --mark;
2539  if (this->my_tail > 1) // don't reheap for heap of size 1
2540  reheap();
2541  __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds after pop");
2542  }
2543 
2544  const T& prio() {
2545  return this->get_my_item(prio_use_tail() ? this->my_tail-1 : 0);
2546  }
2547 
2548  // turn array into heap
2549  void heapify() {
2550  if(this->my_tail == 0) {
2551  mark = 0;
2552  return;
2553  }
2554  if (!mark) mark = 1;
2555  for (; mark<this->my_tail; ++mark) { // for each unheaped element
2556  size_type cur_pos = mark;
2557  input_type to_place;
2558  this->fetch_item(mark,to_place);
2559  do { // push to_place up the heap
2560  size_type parent = (cur_pos-1)>>1;
2561  if (!compare(this->get_my_item(parent), to_place))
2562  break;
2563  this->move_item(cur_pos, parent);
2564  cur_pos = parent;
2565  } while( cur_pos );
2566  (void) this->place_item(cur_pos, to_place);
2567  }
2568  }
2569 
2570  // otherwise heapified array with new root element; rearrange to heap
2571  void reheap() {
2572  size_type cur_pos=0, child=1;
2573  while (child < mark) {
2574  size_type target = child;
2575  if (child+1<mark &&
2576  compare(this->get_my_item(child),
2577  this->get_my_item(child+1)))
2578  ++target;
2579  // target now has the higher priority child
2580  if (compare(this->get_my_item(target),
2581  this->get_my_item(cur_pos)))
2582  break;
2583  // swap
2584  this->swap_items(cur_pos, target);
2585  cur_pos = target;
2586  child = (cur_pos<<1)+1;
2587  }
2588  }
2589 }; // priority_queue_node
2590 
2591 } // interfaceX
2592 
2593 namespace interface11 {
2594 
2596 
2599 template< typename T, typename DecrementType=continue_msg >
2600 class limiter_node : public graph_node, public receiver< T >, public sender< T > {
2601 public:
2602  typedef T input_type;
2603  typedef T output_type;
2606 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2607  typedef typename receiver<input_type>::built_predecessors_type built_predecessors_type;
2608  typedef typename sender<output_type>::built_successors_type built_successors_type;
2609  typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
2610  typedef typename sender<output_type>::successor_list_type successor_list_type;
2611 #endif
2612  //TODO: There is a lack of predefined types for its controlling "decrementer" port. It should be fixed later.
2613 
2614 private:
2616  size_t my_count; //number of successful puts
2617  size_t my_tries; //number of active put attempts
2621  __TBB_DEPRECATED_LIMITER_EXPR( int init_decrement_predecessors; )
2622 
2624 
2625  // Let decrementer call decrement_counter()
2626  friend class internal::decrementer< limiter_node<T,DecrementType>, DecrementType >;
2627 
2628  bool check_conditions() { // always called under lock
2629  return ( my_count + my_tries < my_threshold && !my_predecessors.empty() && !my_successors.empty() );
2630  }
2631 
2632  // only returns a valid task pointer or NULL, never SUCCESSFULLY_ENQUEUED
2634  input_type v;
2635  task *rval = NULL;
2636  bool reserved = false;
2637  {
2639  if ( check_conditions() )
2640  ++my_tries;
2641  else
2642  return NULL;
2643  }
2644 
2645  //SUCCESS
2646  // if we can reserve and can put, we consume the reservation
2647  // we increment the count and decrement the tries
2648  if ( (my_predecessors.try_reserve(v)) == true ){
2649  reserved=true;
2650  if ( (rval = my_successors.try_put_task(v)) != NULL ){
2651  {
2653  ++my_count;
2654  --my_tries;
2655  my_predecessors.try_consume();
2656  if ( check_conditions() ) {
2657  if ( internal::is_graph_active(this->my_graph) ) {
2658  task *rtask = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
2661  }
2662  }
2663  }
2664  return rval;
2665  }
2666  }
2667  //FAILURE
2668  //if we can't reserve, we decrement the tries
2669  //if we can reserve but can't put, we decrement the tries and release the reservation
2670  {
2672  --my_tries;
2673  if (reserved) my_predecessors.try_release();
2674  if ( check_conditions() ) {
2675  if ( internal::is_graph_active(this->my_graph) ) {
2676  task *rtask = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
2678  __TBB_ASSERT(!rval, "Have two tasks to handle");
2679  return rtask;
2680  }
2681  }
2682  return rval;
2683  }
2684  }
2685 
2686  void forward() {
2687  __TBB_ASSERT(false, "Should never be called");
2688  return;
2689  }
2690 
2691  task* decrement_counter( long long delta ) {
2692  {
2694  if( delta > 0 && size_t(delta) > my_count )
2695  my_count = 0;
2696  else if( delta < 0 && size_t(delta) > my_threshold - my_count )
2698  else
2699  my_count -= size_t(delta); // absolute value of delta is sufficiently small
2700  }
2701  return forward_task();
2702  }
2703 
2704  void initialize() {
2705  my_predecessors.set_owner(this);
2706  my_successors.set_owner(this);
2707  decrement.set_owner(this);
2709  CODEPTR(), tbb::internal::FLOW_LIMITER_NODE, &this->my_graph,
2710  static_cast<receiver<input_type> *>(this), static_cast<receiver<DecrementType> *>(&decrement),
2711  static_cast<sender<output_type> *>(this)
2712  );
2713  }
2714 public:
2717 
2718 #if TBB_DEPRECATED_LIMITER_NODE_CONSTRUCTOR
2720  "Deprecated interface of the limiter node can be used only in conjunction "
2721  "with continue_msg as the type of DecrementType template parameter." );
2722 #endif // Check for incompatible interface
2723 
2726  __TBB_DEPRECATED_LIMITER_ARG2(size_t threshold, int num_decrement_predecessors=0))
2727  : graph_node(g), my_threshold(threshold), my_count(0),
2729  my_tries(0), decrement(),
2730  init_decrement_predecessors(num_decrement_predecessors),
2731  decrement(num_decrement_predecessors)) {
2732  initialize();
2733  }
2734 
2735 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
//! Node-set constructor: delegates to the (graph, threshold) constructor using
//! nodes.graph_reference(), then calls make_edges_in_order(nodes, *this).
2736  template <typename... Args>
2737  limiter_node(const node_set<Args...>& nodes, size_t threshold)
2738  : limiter_node(nodes.graph_reference(), threshold) {
2739  make_edges_in_order(nodes, *this);
2740  }
2741 #endif
2742 
2744  limiter_node( const limiter_node& src ) :
2745  graph_node(src.my_graph), receiver<T>(), sender<T>(),
2748  my_tries(0), decrement(),
2749  init_decrement_predecessors(src.init_decrement_predecessors),
2750  decrement(src.init_decrement_predecessors)) {
2751  initialize();
2752  }
2753 
2754 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2755  void set_name( const char *name ) __TBB_override {
2757  }
2758 #endif
2759 
2763  bool was_empty = my_successors.empty();
2765  //spawn a forward task if this is the only successor
2766  if ( was_empty && !my_predecessors.empty() && my_count + my_tries < my_threshold ) {
2767  if ( internal::is_graph_active(this->my_graph) ) {
2768  task* task = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
2771  }
2772  }
2773  return true;
2774  }
2775 
2777 
2779  r.remove_predecessor(*this);
2781  return true;
2782  }
2783 
2784 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
// Deprecated node-extraction interface (compiled only when
// TBB_DEPRECATED_FLOW_NODE_EXTRACTION is set). Each successor-side call below
// forwards to my_successors and each predecessor-side call to my_predecessors.
2785  built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }
2786  built_predecessors_type &built_predecessors() __TBB_override { return my_predecessors.built_predecessors(); }
2787 
2788  void internal_add_built_successor(successor_type &src) __TBB_override {
2789  my_successors.internal_add_built_successor(src);
2790  }
2791 
2792  void internal_delete_built_successor(successor_type &src) __TBB_override {
2793  my_successors.internal_delete_built_successor(src);
2794  }
2795 
2796  size_t successor_count() __TBB_override { return my_successors.successor_count(); }
2797 
2798  void copy_successors(successor_list_type &v) __TBB_override {
2799  my_successors.copy_successors(v);
2800  }
2801 
2802  void internal_add_built_predecessor(predecessor_type &src) __TBB_override {
2803  my_predecessors.internal_add_built_predecessor(src);
2804  }
2805 
2806  void internal_delete_built_predecessor(predecessor_type &src) __TBB_override {
2807  my_predecessors.internal_delete_built_predecessor(src);
2808  }
2809 
2810  size_t predecessor_count() __TBB_override { return my_predecessors.predecessor_count(); }
2811 
2812  void copy_predecessors(predecessor_list_type &v) __TBB_override {
2813  my_predecessors.copy_predecessors(v);
2814  }
2815 
// Zeroes my_count (my_tries is not touched here), then runs the extract calls
// on the built successor list, the built predecessor list, and the built
// predecessor list of the decrement port, in that order.
2816  void extract() __TBB_override {
2817  my_count = 0;
2818  my_successors.built_successors().sender_extract(*this);
2819  my_predecessors.built_predecessors().receiver_extract(*this);
2820  decrement.built_predecessors().receiver_extract(decrement);
2821  }
2822 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
2823 
2827  my_predecessors.add( src );
2829  task* task = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
2832  }
2833  return true;
2834  }
2835 
2838  my_predecessors.remove( src );
2839  return true;
2840  }
2841 
2842 protected:
2843 
2844  template< typename R, typename B > friend class run_and_put_task;
2845  template<typename X, typename Y> friend class internal::broadcast_cache;
2846  template<typename X, typename Y> friend class internal::round_robin_cache;
2849  {
2851  if ( my_count + my_tries >= my_threshold )
2852  return NULL;
2853  else
2854  ++my_tries;
2855  }
2856 
2857  task * rtask = my_successors.try_put_task(t);
2858 
2859  if ( !rtask ) { // try_put_task failed.
2861  --my_tries;
2863  rtask = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
2865  }
2866  }
2867  else {
2869  ++my_count;
2870  --my_tries;
2871  }
2872  return rtask;
2873  }
2874 
2876 
2878  __TBB_ASSERT(false,NULL); // should never be called
2879  }
2880 
2882  my_count = 0;
2883  if(f & rf_clear_edges) {
2884  my_predecessors.clear();
2885  my_successors.clear();
2886  }
2887  else
2888  {
2889  my_predecessors.reset( );
2890  }
2891  decrement.reset_receiver(f);
2892  }
2893 }; // limiter_node
2894 
2896 
2900 using internal::input_port;
2901 using internal::tag_value;
2902 
2903 template<typename OutputTuple, typename JP=queueing> class join_node;
2904 
2905 template<typename OutputTuple>
2906 class join_node<OutputTuple,reserving>: public internal::unfolded_join_node<tbb::flow::tuple_size<OutputTuple>::value, reserving_port, OutputTuple, reserving> {
2907 private:
2910 public:
2911  typedef OutputTuple output_type;
2914  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_RESERVING, &this->my_graph,
2915  this->input_ports(), static_cast< sender< output_type > *>(this) );
2916  }
2917 
2918 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
//! Node-set constructor (reserving policy): delegates to the constructor taking
//! nodes.graph_reference(), then calls make_edges_in_order(nodes, *this).
//! The unnamed reserving parameter is not used in the body.
2919  template <typename... Args>
2920  __TBB_NOINLINE_SYM join_node(const node_set<Args...>& nodes, reserving = reserving()) : join_node(nodes.graph_reference()) {
2921  make_edges_in_order(nodes, *this);
2922  }
2923 #endif
2924 
2926  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_RESERVING, &this->my_graph,
2927  this->input_ports(), static_cast< sender< output_type > *>(this) );
2928  }
2929 
2930 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2931  void set_name( const char *name ) __TBB_override {
2933  }
2934 #endif
2935 
2936 };
2937 
2938 template<typename OutputTuple>
2939 class join_node<OutputTuple,queueing>: public internal::unfolded_join_node<tbb::flow::tuple_size<OutputTuple>::value, queueing_port, OutputTuple, queueing> {
2940 private:
2943 public:
2944  typedef OutputTuple output_type;
2947  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_QUEUEING, &this->my_graph,
2948  this->input_ports(), static_cast< sender< output_type > *>(this) );
2949  }
2950 
2951 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
//! Node-set constructor (queueing policy): delegates to the constructor taking
//! nodes.graph_reference(), then calls make_edges_in_order(nodes, *this).
//! The unnamed queueing parameter is not used in the body.
2952  template <typename... Args>
2953  __TBB_NOINLINE_SYM join_node(const node_set<Args...>& nodes, queueing = queueing()) : join_node(nodes.graph_reference()) {
2954  make_edges_in_order(nodes, *this);
2955  }
2956 #endif
2957 
2959  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_QUEUEING, &this->my_graph,
2960  this->input_ports(), static_cast< sender< output_type > *>(this) );
2961  }
2962 
2963 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2964  void set_name( const char *name ) __TBB_override {
2966  }
2967 #endif
2968 
2969 };
2970 
2971 // template for key_matching join_node
2972 // tag_matching join_node is a specialization of key_matching, and is source-compatible.
2973 template<typename OutputTuple, typename K, typename KHash>
2974 class join_node<OutputTuple, key_matching<K, KHash> > : public internal::unfolded_join_node<tbb::flow::tuple_size<OutputTuple>::value,
2975  key_matching_port, OutputTuple, key_matching<K,KHash> > {
2976 private:
2979 public:
2980  typedef OutputTuple output_type;
2982 
2983 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
2985 
2986 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
//! Node-set constructor (key_matching policy): delegates to the constructor
//! taking nodes.graph_reference(), then calls make_edges_in_order(nodes, *this).
//! The unnamed key_matching parameter is not used in the body.
2987  template <typename... Args>
2988  join_node(const node_set<Args...>& nodes, key_matching<K, KHash> = key_matching<K, KHash>())
2989  : join_node(nodes.graph_reference()) {
2990  make_edges_in_order(nodes, *this);
2991  }
2992 #endif
2993 
2994 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
2995 
// Family of constructors taking the graph plus 2 to 10 body objects
// (b0, b1, ...). Every overload forwards the graph and all bodies unchanged to
// unfolded_type and then calls fgt_multiinput_node<N> with
// FLOW_JOIN_NODE_TAG_MATCHING, the graph, this node's input ports and its
// sender sub-object. Overloads with 6 or more bodies are compiled only when
// __TBB_VARIADIC_MAX is at least that large.
// NOTE(review): the bodies are presumably the per-port key-extraction
// functors - confirm against unfolded_join_node.
2996  template<typename __TBB_B0, typename __TBB_B1>
2997  __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1) : unfolded_type(g, b0, b1) {
2998  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2999  this->input_ports(), static_cast< sender< output_type > *>(this) );
3000  }
3001  template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2>
3002  __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2) : unfolded_type(g, b0, b1, b2) {
3003  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3004  this->input_ports(), static_cast< sender< output_type > *>(this) );
3005  }
3006  template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3>
3007  __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3) : unfolded_type(g, b0, b1, b2, b3) {
3008  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3009  this->input_ports(), static_cast< sender< output_type > *>(this) );
3010  }
3011  template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4>
3012  __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4) :
3013  unfolded_type(g, b0, b1, b2, b3, b4) {
3014  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3015  this->input_ports(), static_cast< sender< output_type > *>(this) );
3016  }
3017 #if __TBB_VARIADIC_MAX >= 6
3018  template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
3019  typename __TBB_B5>
3020  __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5) :
3021  unfolded_type(g, b0, b1, b2, b3, b4, b5) {
3022  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3023  this->input_ports(), static_cast< sender< output_type > *>(this) );
3024  }
3025 #endif
3026 #if __TBB_VARIADIC_MAX >= 7
3027  template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
3028  typename __TBB_B5, typename __TBB_B6>
3029  __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6) :
3030  unfolded_type(g, b0, b1, b2, b3, b4, b5, b6) {
3031  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3032  this->input_ports(), static_cast< sender< output_type > *>(this) );
3033  }
3034 #endif
3035 #if __TBB_VARIADIC_MAX >= 8
3036  template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
3037  typename __TBB_B5, typename __TBB_B6, typename __TBB_B7>
3038  __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
3039  __TBB_B7 b7) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7) {
3040  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3041  this->input_ports(), static_cast< sender< output_type > *>(this) );
3042  }
3043 #endif
3044 #if __TBB_VARIADIC_MAX >= 9
3045  template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
3046  typename __TBB_B5, typename __TBB_B6, typename __TBB_B7, typename __TBB_B8>
3047  __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
3048  __TBB_B7 b7, __TBB_B8 b8) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8) {
3049  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3050  this->input_ports(), static_cast< sender< output_type > *>(this) );
3051  }
3052 #endif
3053 #if __TBB_VARIADIC_MAX >= 10
3054  template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
3055  typename __TBB_B5, typename __TBB_B6, typename __TBB_B7, typename __TBB_B8, typename __TBB_B9>
3056  __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
3057  __TBB_B7 b7, __TBB_B8 b8, __TBB_B9 b9) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8, b9) {
3058  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3059  this->input_ports(), static_cast< sender< output_type > *>(this) );
3060  }
3061 #endif
3062 
3063 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
//! Node-set constructor with bodies: delegates to the (graph, bodies...)
//! constructor using nodes.graph_reference(), then calls
//! make_edges_in_order(nodes, *this).
3064  template <typename... Args, typename... Bodies>
3065  __TBB_NOINLINE_SYM join_node(const node_set<Args...>& nodes, Bodies... bodies)
3066  : join_node(nodes.graph_reference(), bodies...) {
3067  make_edges_in_order(nodes, *this);
3068  }
3069 #endif
3070 
3072  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3073  this->input_ports(), static_cast< sender< output_type > *>(this) );
3074  }
3075 
3076 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3077  void set_name( const char *name ) __TBB_override {
3079  }
3080 #endif
3081 
3082 };
3083 
3084 // indexer node
3086 
3087 // TODO: Implement interface with variadic template or tuple
3088 template<typename T0, typename T1=null_type, typename T2=null_type, typename T3=null_type,
3089  typename T4=null_type, typename T5=null_type, typename T6=null_type,
3090  typename T7=null_type, typename T8=null_type, typename T9=null_type> class indexer_node;
3091 
3092 //indexer node specializations
3093 template<typename T0>
3094 class indexer_node<T0> : public internal::unfolded_indexer_node<tuple<T0> > {
3095 private:
3096  static const int N = 1;
3097 public:
3098  typedef tuple<T0> InputTuple;
3102  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3103  this->input_ports(), static_cast< sender< output_type > *>(this) );
3104  }
3105 
3106 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
//! Node-set constructor: delegates to the constructor taking
//! nodes.graph_reference(), then calls make_edges_in_order(nodes, *this).
3107  template <typename... Args>
3108  indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3109  make_edges_in_order(nodes, *this);
3110  }
3111 #endif
3112 
3113  // Copy constructor
3115  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3116  this->input_ports(), static_cast< sender< output_type > *>(this) );
3117  }
3118 
3119 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3120  void set_name( const char *name ) __TBB_override {
3122  }
3123 #endif
3124 };
3125 
3126 template<typename T0, typename T1>
3127 class indexer_node<T0, T1> : public internal::unfolded_indexer_node<tuple<T0, T1> > {
3128 private:
3129  static const int N = 2;
3130 public:
3131  typedef tuple<T0, T1> InputTuple;
3135  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3136  this->input_ports(), static_cast< sender< output_type > *>(this) );
3137  }
3138 
3139 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
//! Node-set constructor: delegates to the constructor taking
//! nodes.graph_reference(), then calls make_edges_in_order(nodes, *this).
3140  template <typename... Args>
3141  indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3142  make_edges_in_order(nodes, *this);
3143  }
3144 #endif
3145 
3146  // Copy constructor
3148  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3149  this->input_ports(), static_cast< sender< output_type > *>(this) );
3150  }
3151 
3152 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3153  void set_name( const char *name ) __TBB_override {
3155  }
3156 #endif
3157 };
3158 
3159 template<typename T0, typename T1, typename T2>
3160 class indexer_node<T0, T1, T2> : public internal::unfolded_indexer_node<tuple<T0, T1, T2> > {
3161 private:
3162  static const int N = 3;
3163 public:
3164  typedef tuple<T0, T1, T2> InputTuple;
3168  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3169  this->input_ports(), static_cast< sender< output_type > *>(this) );
3170  }
3171 
3172 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
//! Node-set constructor: delegates to the constructor taking
//! nodes.graph_reference(), then calls make_edges_in_order(nodes, *this).
3173  template <typename... Args>
3174  indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3175  make_edges_in_order(nodes, *this);
3176  }
3177 #endif
3178 
3179  // Copy constructor
3181  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3182  this->input_ports(), static_cast< sender< output_type > *>(this) );
3183  }
3184 
3185 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3186  void set_name( const char *name ) __TBB_override {
3188  }
3189 #endif
3190 };
3191 
3192 template<typename T0, typename T1, typename T2, typename T3>
3193 class indexer_node<T0, T1, T2, T3> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3> > {
3194 private:
3195  static const int N = 4;
3196 public:
3197  typedef tuple<T0, T1, T2, T3> InputTuple;
3201  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3202  this->input_ports(), static_cast< sender< output_type > *>(this) );
3203  }
3204 
3205 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
//! Node-set constructor: delegates to the constructor taking
//! nodes.graph_reference(), then calls make_edges_in_order(nodes, *this).
3206  template <typename... Args>
3207  indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3208  make_edges_in_order(nodes, *this);
3209  }
3210 #endif
3211 
3212  // Copy constructor
3214  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3215  this->input_ports(), static_cast< sender< output_type > *>(this) );
3216  }
3217 
3218 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3219  void set_name( const char *name ) __TBB_override {
3221  }
3222 #endif
3223 };
3224 
3225 template<typename T0, typename T1, typename T2, typename T3, typename T4>
3226 class indexer_node<T0, T1, T2, T3, T4> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4> > {
3227 private:
3228  static const int N = 5;
3229 public:
3230  typedef tuple<T0, T1, T2, T3, T4> InputTuple;
3234  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3235  this->input_ports(), static_cast< sender< output_type > *>(this) );
3236  }
3237 
3238 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
//! Node-set constructor: delegates to the constructor taking
//! nodes.graph_reference(), then calls make_edges_in_order(nodes, *this).
3239  template <typename... Args>
3240  indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3241  make_edges_in_order(nodes, *this);
3242  }
3243 #endif
3244 
3245  // Copy constructor
3247  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3248  this->input_ports(), static_cast< sender< output_type > *>(this) );
3249  }
3250 
3251 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3252  void set_name( const char *name ) __TBB_override {
3254  }
3255 #endif
3256 };
3257 
3258 #if __TBB_VARIADIC_MAX >= 6
3259 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5>
3260 class indexer_node<T0, T1, T2, T3, T4, T5> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5> > {
3261 private:
3262  static const int N = 6;
3263 public:
3264  typedef tuple<T0, T1, T2, T3, T4, T5> InputTuple;
3268  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3269  this->input_ports(), static_cast< sender< output_type > *>(this) );
3270  }
3271 
3272 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
//! Node-set constructor: delegates to the constructor taking
//! nodes.graph_reference(), then calls make_edges_in_order(nodes, *this).
3273  template <typename... Args>
3274  indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3275  make_edges_in_order(nodes, *this);
3276  }
3277 #endif
3278 
3279  // Copy constructor
3281  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3282  this->input_ports(), static_cast< sender< output_type > *>(this) );
3283  }
3284 
3285 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3286  void set_name( const char *name ) __TBB_override {
3288  }
3289 #endif
3290 };
3291 #endif //variadic max 6
3292 
3293 #if __TBB_VARIADIC_MAX >= 7
3294 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
3295  typename T6>
3296 class indexer_node<T0, T1, T2, T3, T4, T5, T6> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6> > {
3297 private:
3298  static const int N = 7;
3299 public:
3300  typedef tuple<T0, T1, T2, T3, T4, T5, T6> InputTuple;
3304  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3305  this->input_ports(), static_cast< sender< output_type > *>(this) );
3306  }
3307 
3308 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
//! Node-set constructor: delegates to the constructor taking
//! nodes.graph_reference(), then calls make_edges_in_order(nodes, *this).
3309  template <typename... Args>
3310  indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3311  make_edges_in_order(nodes, *this);
3312  }
3313 #endif
3314 
3315  // Copy constructor
3317  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3318  this->input_ports(), static_cast< sender< output_type > *>(this) );
3319  }
3320 
3321 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3322  void set_name( const char *name ) __TBB_override {
3324  }
3325 #endif
3326 };
3327 #endif //variadic max 7
3328 
3329 #if __TBB_VARIADIC_MAX >= 8
3330 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
3331  typename T6, typename T7>
3332 class indexer_node<T0, T1, T2, T3, T4, T5, T6, T7> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6, T7> > {
3333 private:
3334  static const int N = 8;
3335 public:
3336  typedef tuple<T0, T1, T2, T3, T4, T5, T6, T7> InputTuple;
3340  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3341  this->input_ports(), static_cast< sender< output_type > *>(this) );
3342  }
3343 
3344 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
//! Node-set constructor: delegates to the constructor taking
//! nodes.graph_reference(), then calls make_edges_in_order(nodes, *this).
3345  template <typename... Args>
3346  indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3347  make_edges_in_order(nodes, *this);
3348  }
3349 #endif
3350 
// Copy-constructs the unfolded_type base from other, then calls
// fgt_multiinput_node<N> for the new node with FLOW_INDEXER_NODE, its graph,
// its input ports and its sender sub-object.
3351  // Copy constructor
3352  indexer_node( const indexer_node& other ) : unfolded_type(other) {
3353  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3354  this->input_ports(), static_cast< sender< output_type > *>(this) );
3355  }
3356 
3357 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3358  void set_name( const char *name ) __TBB_override {
3360  }
3361 #endif
3362 };
3363 #endif //variadic max 8
3364 
3365 #if __TBB_VARIADIC_MAX >= 9
3366 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
3367  typename T6, typename T7, typename T8>
3368 class indexer_node<T0, T1, T2, T3, T4, T5, T6, T7, T8> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8> > {
3369 private:
3370  static const int N = 9;
3371 public:
3372  typedef tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8> InputTuple;
3376  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3377  this->input_ports(), static_cast< sender< output_type > *>(this) );
3378  }
3379 
3380 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
//! Node-set constructor: delegates to the constructor taking
//! nodes.graph_reference(), then calls make_edges_in_order(nodes, *this).
3381  template <typename... Args>
3382  indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3383  make_edges_in_order(nodes, *this);
3384  }
3385 #endif
3386 
3387  // Copy constructor
3389  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3390  this->input_ports(), static_cast< sender< output_type > *>(this) );
3391  }
3392 
3393 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3394  void set_name( const char *name ) __TBB_override {
3396  }
3397 #endif
3398 };
3399 #endif //variadic max 9
3400 
3401 #if __TBB_VARIADIC_MAX >= 10
// Primary indexer_node template (marked /*default*/) taking ten input types
// T0..T9. It derives from internal::unfolded_indexer_node instantiated on
// tuple<T0, ..., T9>. Only compiled when __TBB_VARIADIC_MAX >= 10.
// NOTE(review): listing lines 3409-3411, 3424 and 3431 are absent from this
// extract. The remaining typedefs and the constructor headers are presumably
// on those lines — consult the full header before editing.
3402 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
3403  typename T6, typename T7, typename T8, typename T9>
3404 class indexer_node/*default*/ : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> > {
3405 private:
// Port count of the primary template; used as the template argument of
// fgt_multiinput_node in the constructors below.
3406  static const int N = 10;
3407 public:
3408  typedef tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> InputTuple;
// Presumably the tail of the graph constructor (its header is elided).
3412  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3413  this->input_ports(), static_cast< sender< output_type > *>(this) );
3414  }
3415 
3416 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
// Preview constructor: delegates to the graph constructor with
// nodes.graph_reference(), then calls make_edges_in_order(nodes, *this).
3417  template <typename... Args>
3418  indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3419  make_edges_in_order(nodes, *this);
3420  }
3421 #endif
3422 
3423  // Copy constructor
// NOTE(review): the constructor header (listing line 3424) is elided; only the
// fgt_multiinput_node<N> call in its body is visible.
3425  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3426  this->input_ports(), static_cast< sender< output_type > *>(this) );
3427  }
3428 
3429 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
// Body statement (listing line 3431) is elided in this extract.
3430  void set_name( const char *name ) __TBB_override {
3432  }
3433 #endif
3434 };
3435 #endif //variadic max 10
3436 
// Shared implementation behind the make_edge overloads: connects predecessor p
// to successor s by calling p.register_successor(s).
// The function header is selected by the preprocessor: a preview signature when
// __TBB_PREVIEW_ASYNC_MSG is set, otherwise the sender<T>/receiver<T> template.
// NOTE(review): the preview signature (listing line 3438) and the statement at
// listing line 3448 are absent from this extract.
3437 #if __TBB_PREVIEW_ASYNC_MSG
3439 #else
3440 template< typename T >
3441 inline void internal_make_edge( sender<T> &p, receiver<T> &s ) {
3442 #endif
3443 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
// Deprecated extraction support: record the edge on both endpoints before the
// successor is registered.
3444  s.internal_add_built_predecessor(p);
3445  p.internal_add_built_successor(s);
3446 #endif
3447  p.register_successor( s );
3449 }
3450 
// Makes an edge from predecessor p to successor s, both carrying messages of
// type T. Pure forwarder to internal_make_edge(p, s).
3452 template< typename T >
3453 inline void make_edge( sender<T> &p, receiver<T> &s ) {
3454  internal_make_edge( p, s );
3455 }
3456 
3457 #if __TBB_PREVIEW_ASYNC_MSG
// Preview (__TBB_PREVIEW_ASYNC_MSG) overload accepting arbitrary endpoint types
// TS and TR; forwards to internal_make_edge(p, s).
// NOTE(review): the rest of the template parameter list (listing lines
// 3459-3460) is absent from this extract — presumably it constrains TS/TR;
// confirm against the full header.
3458 template< typename TS, typename TR,
3461 inline void make_edge( TS &p, TR &s ) {
3462  internal_make_edge( p, s );
3463 }
3464 
// Preview (__TBB_PREVIEW_ASYNC_MSG) overload that forwards to
// internal_make_edge(p, s).
// NOTE(review): the function header (listing line 3466) is absent from this
// extract, so the parameter types of p and s are not visible here.
3465 template< typename T >
3467  internal_make_edge( p, s );
3468 }
3469 
// Preview (__TBB_PREVIEW_ASYNC_MSG) overload that forwards to
// internal_make_edge(p, s).
// NOTE(review): the function header (listing line 3471) is absent from this
// extract, so the parameter types of p and s are not visible here.
3470 template< typename T >
3472  internal_make_edge( p, s );
3473 }
3474 
3475 #endif // __TBB_PREVIEW_ASYNC_MSG
3476 
3477 #if __TBB_FLOW_GRAPH_CPP11_FEATURES
3478 //Makes an edge from port 0 of a multi-output predecessor to port 0 of a multi-input successor.
// The two unnamed, defaulted template parameters name T::output_ports_type and
// V::input_ports_type, so this overload is only viable for types that declare
// both. It selects element 0 of output.output_ports() and of
// input.input_ports() and passes them to make_edge.
3479 template< typename T, typename V,
3480  typename = typename T::output_ports_type, typename = typename V::input_ports_type >
3481 inline void make_edge( T& output, V& input) {
3482  make_edge(get<0>(output.output_ports()), get<0>(input.input_ports()));
3483 }
3484 
3485 //Makes an edge from port 0 of a multi-output predecessor to a receiver.
// Viable only when T declares output_ports_type (unnamed defaulted template
// parameter). Element 0 of output.output_ports() is connected to `input`.
3486 template< typename T, typename R,
3487  typename = typename T::output_ports_type >
3488 inline void make_edge( T& output, receiver<R>& input) {
3489  make_edge(get<0>(output.output_ports()), input);
3490 }
3491 
3492 //Makes an edge from a sender to port 0 of a multi-input successor.
// Viable only when V declares input_ports_type (unnamed defaulted template
// parameter). `output` is connected to element 0 of input.input_ports().
3493 template< typename S, typename V,
3494  typename = typename V::input_ports_type >
3495 inline void make_edge( sender<S>& output, V& input) {
3496  make_edge(output, get<0>(input.input_ports()));
3497 }
3498 #endif
3499 
// Shared implementation behind the remove_edge overloads: disconnects successor
// s from predecessor p by calling p.remove_successor(s).
// The function header is selected by the preprocessor. The preview signature
// (listing line 3501) is absent here; the cross-reference index at the end of
// this listing gives it as
// internal_remove_edge(internal::untyped_sender &p, internal::untyped_receiver &s).
// NOTE(review): the statement at listing line 3512 is also absent.
3500 #if __TBB_PREVIEW_ASYNC_MSG
3502 #else
3503 template< typename T >
3504 inline void internal_remove_edge( sender<T> &p, receiver<T> &s ) {
3505 #endif
3506  p.remove_successor( s );
3507 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
3508  // TODO: should we try to remove p from the predecessor list of s, in case the edge is reversed?
// Deprecated extraction support: drop the edge record from both endpoints after
// the successor has been removed.
3509  p.internal_delete_built_successor(s);
3510  s.internal_delete_built_predecessor(p);
3511 #endif
3513 }
3514 
// Removes the edge from predecessor p to successor s, both carrying messages of
// type T. Pure forwarder to internal_remove_edge(p, s).
3516 template< typename T >
3517 inline void remove_edge( sender<T> &p, receiver<T> &s ) {
3518  internal_remove_edge( p, s );
3519 }
3520 
3521 #if __TBB_PREVIEW_ASYNC_MSG
// Preview (__TBB_PREVIEW_ASYNC_MSG) overload accepting arbitrary endpoint types
// TS and TR; forwards to internal_remove_edge(p, s).
// NOTE(review): the rest of the template parameter list (listing lines
// 3523-3524) is absent from this extract — presumably it constrains TS/TR;
// confirm against the full header.
3522 template< typename TS, typename TR,
3525 inline void remove_edge( TS &p, TR &s ) {
3526  internal_remove_edge( p, s );
3527 }
3528 
// Preview (__TBB_PREVIEW_ASYNC_MSG) overload that forwards to
// internal_remove_edge(p, s).
// NOTE(review): the function header (listing line 3530) is absent from this
// extract, so the parameter types of p and s are not visible here.
3529 template< typename T >
3531  internal_remove_edge( p, s );
3532 }
3533 
// Preview (__TBB_PREVIEW_ASYNC_MSG) overload that forwards to
// internal_remove_edge(p, s).
// NOTE(review): the function header (listing line 3535) is absent from this
// extract, so the parameter types of p and s are not visible here.
3534 template< typename T >
3536  internal_remove_edge( p, s );
3537 }
3538 #endif // __TBB_PREVIEW_ASYNC_MSG
3539 
3540 #if __TBB_FLOW_GRAPH_CPP11_FEATURES
3541 //Removes an edge between port 0 of a multi-output predecessor and port 0 of a multi-input successor.
// Viable only when T declares output_ports_type and V declares input_ports_type
// (unnamed defaulted template parameters). Element 0 of each port tuple is
// passed on to remove_edge.
3542 template< typename T, typename V,
3543  typename = typename T::output_ports_type, typename = typename V::input_ports_type >
3544 inline void remove_edge( T& output, V& input) {
3545  remove_edge(get<0>(output.output_ports()), get<0>(input.input_ports()));
3546 }
3547 
3548 //Removes an edge between port 0 of a multi-output predecessor and a receiver.
// Viable only when T declares output_ports_type (unnamed defaulted template
// parameter). Element 0 of output.output_ports() is disconnected from `input`.
3549 template< typename T, typename R,
3550  typename = typename T::output_ports_type >
3551 inline void remove_edge( T& output, receiver<R>& input) {
3552  remove_edge(get<0>(output.output_ports()), input);
3553 }
3554 //Removes an edge between a sender and port 0 of a multi-input successor.
// Viable only when V declares input_ports_type (unnamed defaulted template
// parameter). `output` is disconnected from element 0 of input.input_ports().
3555 template< typename S, typename V,
3556  typename = typename V::input_ports_type >
3557 inline void remove_edge( sender<S>& output, V& input) {
3558  remove_edge(output, get<0>(input.input_ports()));
3559 }
3560 #endif
3561 
3562 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
// Out-of-class definition of edge_container<C>::sender_extract: calls
// remove_edge(s, x) for every element x recorded in built_edges, with s as the
// sending side.
// The loop walks a copy (e) of built_edges rather than the member itself —
// presumably because remove_edge ends up editing built_edges through the
// internal_delete_built_* calls; confirm against edge_container.
3563 template<typename C >
3564 template< typename S >
3565 void internal::edge_container<C>::sender_extract( S &s ) {
3566  edge_list_type e = built_edges;
3567  for ( typename edge_list_type::iterator i = e.begin(); i != e.end(); ++i ) {
// *i is a pointer-like entry; **i yields the node on the other end of the edge.
3568  remove_edge(s, **i);
3569  }
3570 }
3571 
// Out-of-class definition of edge_container<C>::receiver_extract: calls
// remove_edge(x, r) for every element x recorded in built_edges, with r as the
// receiving side.
// The loop walks a copy (e) of built_edges rather than the member itself —
// presumably because remove_edge ends up editing built_edges through the
// internal_delete_built_* calls; confirm against edge_container.
3572 template<typename C >
3573 template< typename R >
3574 void internal::edge_container<C>::receiver_extract( R &r ) {
3575  edge_list_type e = built_edges;
3576  for ( typename edge_list_type::iterator i = e.begin(); i != e.end(); ++i ) {
// *i is a pointer-like entry; **i yields the node on the other end of the edge.
3577  remove_edge(**i, r);
3578  }
3579 }
3580 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
3581 
// Returns, by value, the result of n.copy_function_object<Body>().
// Body must be given explicitly by the caller (it is not deducible from the
// argument); Node is deduced from n.
3583 template< typename Body, typename Node >
3584 Body copy_body( Node &n ) {
3585  return n.template copy_function_object<Body>();
3586 }
3587 
3588 #if __TBB_FLOW_GRAPH_CPP11_FEATURES
3589 
3590 //composite_node
3591 template< typename InputTuple, typename OutputTuple > class composite_node;
3592 
// composite_node specialization with both input and output ports. The external
// ports are tuples of references (receiver<InputTypes>&... and
// sender<OutputTypes>&...) held through unique_ptr and installed by
// set_external_ports().
// NOTE(review): listing lines 3608, 3614, 3617, 3629-3630, 3641, 3645 and 3650
// are absent from this extract (a protected member, a constructor header, part
// of set_external_ports, the set_name body, and the headers of the two port
// accessors) — consult the full header before editing.
3593 template< typename... InputTypes, typename... OutputTypes>
3594 class composite_node <tbb::flow::tuple<InputTypes...>, tbb::flow::tuple<OutputTypes...> > : public graph_node{
3595 
3596 public:
3597  typedef tbb::flow::tuple< receiver<InputTypes>&... > input_ports_type;
3598  typedef tbb::flow::tuple< sender<OutputTypes>&... > output_ports_type;
3599 
3600 private:
// Null until set_external_ports() is called; the accessors assert on that.
3601  std::unique_ptr<input_ports_type> my_input_ports;
3602  std::unique_ptr<output_ports_type> my_output_ports;
3603 
3604  static const size_t NUM_INPUTS = sizeof...(InputTypes);
3605  static const size_t NUM_OUTPUTS = sizeof...(OutputTypes);
3606 
3607 protected:
3609 
3610 public:
3611 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
// Trace-enabled constructor: takes an extra type_name (default
// "composite_node"). The statement that presumably uses type_name (listing line
// 3614) is elided.
3612  composite_node( graph &g, const char *type_name = "composite_node" ) : graph_node(g) {
3613  tbb::internal::fgt_multiinput_multioutput_node( CODEPTR(), tbb::internal::FLOW_COMPOSITE_NODE, this, &this->my_graph );
3615  }
3616 #else
// Non-trace constructor; its header (listing line 3617) is elided.
3618  tbb::internal::fgt_multiinput_multioutput_node( CODEPTR(), tbb::internal::FLOW_COMPOSITE_NODE, this, &this->my_graph );
3619  }
3620 #endif
3621 
// Installs the external ports: heap-allocates an input_ports_type and an
// output_ports_type from the forwarded tuples and stores them.
// NOTE(review): both static assertions compare NUM_INPUTS/NUM_OUTPUTS with the
// tuple_size of this class's own port typedefs, which are built from the same
// parameter packs, so they cannot fail and do not inspect T1/T2 — confirm
// whether checking the argument tuples was intended.
3622  template<typename T1, typename T2>
3623  void set_external_ports(T1&& input_ports_tuple, T2&& output_ports_tuple) {
3624  __TBB_STATIC_ASSERT(NUM_INPUTS == tbb::flow::tuple_size<input_ports_type>::value, "number of arguments does not match number of input ports");
3625  __TBB_STATIC_ASSERT(NUM_OUTPUTS == tbb::flow::tuple_size<output_ports_type>::value, "number of arguments does not match number of output ports");
3626  my_input_ports = tbb::internal::make_unique<input_ports_type>(std::forward<T1>(input_ports_tuple));
3627  my_output_ports = tbb::internal::make_unique<output_ports_type>(std::forward<T2>(output_ports_tuple));
3628 
3631  }
3632 
// Forwards the given nodes to internal::add_nodes_impl with the flag set to true.
3633  template< typename... NodeTypes >
3634  void add_visible_nodes(const NodeTypes&... n) { internal::add_nodes_impl(this, true, n...); }
3635 
// Forwards the given nodes to internal::add_nodes_impl with the flag set to false.
3636  template< typename... NodeTypes >
3637  void add_nodes(const NodeTypes&... n) { internal::add_nodes_impl(this, false, n...); }
3638 
3639 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
// Body statement (listing line 3641) is elided in this extract.
3640  void set_name( const char *name ) __TBB_override {
3642  }
3643 #endif
3644 
// Presumably the body of input_ports() (header elided): asserts that
// set_external_ports() has been called, then returns the stored input tuple.
3646  __TBB_ASSERT(my_input_ports, "input ports not set, call set_external_ports to set input ports");
3647  return *my_input_ports;
3648  }
3649 
// Presumably the body of output_ports() (header elided): asserts that
// set_external_ports() has been called, then returns the stored output tuple.
3651  __TBB_ASSERT(my_output_ports, "output ports not set, call set_external_ports to set output ports");
3652  return *my_output_ports;
3653  }
3654 
3655 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
// extract() is not supported for composite_node: the assertion always fires
// when assertions are enabled.
3656  void extract() __TBB_override {
3657  __TBB_ASSERT(false, "Current composite_node implementation does not support extract");
3658  }
3659 #endif
3660 }; // class composite_node
3661 
3662 //composite_node with only input ports
// Specialization selected when the output tuple is empty: only
// input_ports_type and my_input_ports exist.
// NOTE(review): listing lines 3673, 3679, 3682, 3704 and 3708 are absent from
// this extract (a protected member, part of the trace constructor, the
// non-trace constructor header, the set_name body and the accessor header).
3663 template< typename... InputTypes>
3664 class composite_node <tbb::flow::tuple<InputTypes...>, tbb::flow::tuple<> > : public graph_node {
3665 public:
3666  typedef tbb::flow::tuple< receiver<InputTypes>&... > input_ports_type;
3667 
3668 private:
// Null until set_external_ports() is called; the accessor asserts on that.
3669  std::unique_ptr<input_ports_type> my_input_ports;
3670  static const size_t NUM_INPUTS = sizeof...(InputTypes);
3671 
3672 protected:
3674 
3675 public:
3676 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
// Trace-enabled constructor: takes an extra type_name (default
// "composite_node"). The statement that presumably uses type_name (listing line
// 3679) is elided.
3677  composite_node( graph &g, const char *type_name = "composite_node") : graph_node(g) {
3678  tbb::internal::fgt_composite( CODEPTR(), this, &g );
3680  }
3681 #else
// Non-trace constructor; its header (listing line 3682) is elided.
3683  tbb::internal::fgt_composite( CODEPTR(), this, &g );
3684  }
3685 #endif
3686 
// Installs the external input ports, then passes the same tuple to
// fgt_internal_input_alias_helper<T, NUM_INPUTS>::alias_port.
// NOTE(review): the static assertion compares NUM_INPUTS with the tuple_size of
// input_ports_type, which is built from the same pack, so it cannot fail and
// does not inspect T.
// NOTE(review): input_ports_tuple is forwarded twice; for an rvalue argument
// the second use sees an object that was already passed as an rvalue —
// presumably harmless for a tuple of references, but confirm.
3687  template<typename T>
3688  void set_external_ports(T&& input_ports_tuple) {
3689  __TBB_STATIC_ASSERT(NUM_INPUTS == tbb::flow::tuple_size<input_ports_type>::value, "number of arguments does not match number of input ports");
3690 
3691  my_input_ports = tbb::internal::make_unique<input_ports_type>(std::forward<T>(input_ports_tuple));
3692 
3693  tbb::internal::fgt_internal_input_alias_helper<T, NUM_INPUTS>::alias_port( this, std::forward<T>(input_ports_tuple));
3694  }
3695 
// Forwards the given nodes to internal::add_nodes_impl with the flag set to true.
3696  template< typename... NodeTypes >
3697  void add_visible_nodes(const NodeTypes&... n) { internal::add_nodes_impl(this, true, n...); }
3698 
// Forwards the given nodes to internal::add_nodes_impl with the flag set to false.
3699  template< typename... NodeTypes >
3700  void add_nodes( const NodeTypes&... n) { internal::add_nodes_impl(this, false, n...); }
3701 
3702 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
// Body statement (listing line 3704) is elided in this extract.
3703  void set_name( const char *name ) __TBB_override {
3705  }
3706 #endif
3707 
// Presumably the body of input_ports() (header elided): asserts that
// set_external_ports() has been called, then returns the stored input tuple.
3709  __TBB_ASSERT(my_input_ports, "input ports not set, call set_external_ports to set input ports");
3710  return *my_input_ports;
3711  }
3712 
3713 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
// extract() is not supported for composite_node: the assertion always fires
// when assertions are enabled.
3714  void extract() __TBB_override {
3715  __TBB_ASSERT(false, "Current composite_node implementation does not support extract");
3716  }
3717 #endif
3718 
3719 }; // class composite_node
3720 
3721 //composite_nodes with only output_ports
// Specialization selected when the input tuple is empty: only
// output_ports_type and my_output_ports exist.
// NOTE(review): listing lines 3732, 3738, 3741, 3763 and 3767 are absent from
// this extract (a protected member, part of the trace constructor, the
// non-trace constructor header, the set_name body and the accessor header).
3722 template<typename... OutputTypes>
3723 class composite_node <tbb::flow::tuple<>, tbb::flow::tuple<OutputTypes...> > : public graph_node {
3724 public:
3725  typedef tbb::flow::tuple< sender<OutputTypes>&... > output_ports_type;
3726 
3727 private:
// Null until set_external_ports() is called; the accessor asserts on that.
3728  std::unique_ptr<output_ports_type> my_output_ports;
3729  static const size_t NUM_OUTPUTS = sizeof...(OutputTypes);
3730 
3731 protected:
3733 
3734 public:
3735 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
// Trace-enabled constructor: takes an extra type_name (default
// "composite_node"). The statement that presumably uses type_name (listing line
// 3738) is elided.
3736  __TBB_NOINLINE_SYM composite_node( graph &g, const char *type_name = "composite_node") : graph_node(g) {
3737  tbb::internal::fgt_composite( CODEPTR(), this, &g );
3739  }
3740 #else
// Non-trace constructor; its header (listing line 3741) is elided.
3742  tbb::internal::fgt_composite( CODEPTR(), this, &g );
3743  }
3744 #endif
3745 
// Installs the external output ports, then passes the same tuple to
// fgt_internal_output_alias_helper<T, NUM_OUTPUTS>::alias_port.
// NOTE(review): the static assertion compares NUM_OUTPUTS with the tuple_size
// of output_ports_type, which is built from the same pack, so it cannot fail
// and does not inspect T.
// NOTE(review): output_ports_tuple is forwarded twice; for an rvalue argument
// the second use sees an object that was already passed as an rvalue —
// presumably harmless for a tuple of references, but confirm.
3746  template<typename T>
3747  void set_external_ports(T&& output_ports_tuple) {
3748  __TBB_STATIC_ASSERT(NUM_OUTPUTS == tbb::flow::tuple_size<output_ports_type>::value, "number of arguments does not match number of output ports");
3749 
3750  my_output_ports = tbb::internal::make_unique<output_ports_type>(std::forward<T>(output_ports_tuple));
3751 
3752  tbb::internal::fgt_internal_output_alias_helper<T, NUM_OUTPUTS>::alias_port( this, std::forward<T>(output_ports_tuple));
3753  }
3754 
// Forwards the given nodes to internal::add_nodes_impl with the flag set to true.
3755  template<typename... NodeTypes >
3756  void add_visible_nodes(const NodeTypes&... n) { internal::add_nodes_impl(this, true, n...); }
3757 
// Forwards the given nodes to internal::add_nodes_impl with the flag set to false.
3758  template<typename... NodeTypes >
3759  void add_nodes(const NodeTypes&... n) { internal::add_nodes_impl(this, false, n...); }
3760 
3761 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
// Body statement (listing line 3763) is elided in this extract.
3762  void set_name( const char *name ) __TBB_override {
3764  }
3765 #endif
3766 
// Presumably the body of output_ports() (header elided): asserts that
// set_external_ports() has been called, then returns the stored output tuple.
3768  __TBB_ASSERT(my_output_ports, "output ports not set, call set_external_ports to set output ports");
3769  return *my_output_ports;
3770  }
3771 
3772 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
// extract() is not supported for composite_node: the assertion always fires
// when assertions are enabled.
3773  void extract() __TBB_override {
3774  __TBB_ASSERT(false, "Current composite_node implementation does not support extract");
3775  }
3776 #endif
3777 
3778 }; // class composite_node
3779 
3780 #endif // __TBB_FLOW_GRAPH_CPP11_FEATURES
3781 
3782 namespace internal {
3783 
// Base for async bodies: holds a pointer to a gateway object that can be
// replaced after construction through set_gateway().
// NOTE(review): the class header (listing line 3785) and the protected member
// declaration (listing line 3795) are absent from this extract; the class name
// async_body_base is taken from the constructor below, and the member is
// presumably `gateway_type *my_gateway`.
3784 template<typename Gateway>
3786 public:
3787  typedef Gateway gateway_type;
3788 
// Stores the pointer only; the gateway object itself is not accessed here.
3789  async_body_base(gateway_type *gateway): my_gateway(gateway) { }
// Re-points this body at a different gateway object.
3790  void set_gateway(gateway_type *gateway) {
3791  my_gateway = gateway;
3792  }
3793 
3794 protected:
3796 };
3797 
// Adapter that wraps a user Body: it is callable as (input, ports) but calls
// the user body as my_body(input, gateway), ignoring the ports argument.
// NOTE(review): listing line 3801 is absent from this extract — presumably the
// base_type typedef used in the constructor initializer.
3798 template<typename Input, typename Ports, typename Gateway, typename Body>
3799 class async_body: public async_body_base<Gateway> {
3800 public:
3802  typedef Gateway gateway_type;
3803 
// Copies the user body and hands the gateway pointer to the base class.
3804  async_body(const Body &body, gateway_type *gateway)
3805  : base_type(gateway), my_body(body) { }
3806 
// Invokes the stored body with the input and the gateway the base class
// currently points at; the Ports argument is unnamed and unused.
3807  void operator()( const Input &v, Ports & ) {
3808  my_body(v, *this->my_gateway);
3809  }
3810 
// Returns a copy of the stored user body.
3811  Body get_body() { return my_body; }
3812 
3813 private:
3814  Body my_body;
3815 };
3816 
3817 } // namespace internal
3818 
3819 } // namespace interfaceX
3820 namespace interface11 {
3821 
// async_node: a multifunction_node with a single Output port that additionally
// exposes the sender<Output> interface. The user body is wrapped in
// internal::async_body so that it receives a gateway object (my_gateway)
// instead of the output ports.
// NOTE(review): many listing lines are absent from this extract (3827-3828,
// 3833-3838, 3841-3843, 3856, 3861, 3866, 3872, 3878, 3882, 3897, 3899, 3902,
// 3923, 3925, 3946, 3952, 3958-3959, 3963-3964, 3969-3971, 3979, 4004). They
// hold typedefs, member declarations and several function headers — consult
// the full header before editing.
3823 template < typename Input, typename Output,
3824  typename Policy = queueing_lightweight,
3825  typename Allocator=cache_aligned_allocator<Input> >
3826 class async_node : public multifunction_node< Input, tuple< Output >, Policy, Allocator >, public sender< Output > {
3829 
3830 public:
3831  typedef Input input_type;
3832  typedef Output output_type;
3839 
3840 private:
// Tail of the try_put_functor helper (its header and `port` member are
// elided): stores pointers to the port and the value, and operator() records
// the outcome of port->try_put(*value) in `result`.
3844  // TODO: pass value by copy since we do not want to block asynchronous thread.
3845  const Output *value;
3846  bool result;
3847  try_put_functor(output_port_type &p, const Output &v) : port(&p), value(&v), result(false) { }
3848  void operator()() {
3849  result = port->try_put(*value);
3850  }
3851  };
3852 
// Gateway implementation handed to the user body; every operation is routed
// back to the owning node through my_node.
3853  class receiver_gateway_impl: public receiver_gateway<Output> {
3854  public:
3855  receiver_gateway_impl(async_node* node): my_node(node) {}
// Body of a method whose header is elided (presumably reserve_wait()): calls
// fgt_async_reserve and then my_graph.reserve_wait().
3857  tbb::internal::fgt_async_reserve(static_cast<typename async_node::receiver_type *>(my_node), &my_node->my_graph);
3858  my_node->my_graph.reserve_wait();
3859  }
3860 
// Body of a method whose header is elided (presumably release_wait()): calls
// my_graph.release_wait() and then fgt_async_commit.
3862  my_node->my_graph.release_wait();
3863  tbb::internal::fgt_async_commit(static_cast<typename async_node::receiver_type *>(my_node), &my_node->my_graph);
3864  }
3865 
// Delegates to async_node::try_put_impl on the owning node.
3867  bool try_put(const Output &i) __TBB_override {
3868  return my_node->try_put_impl(i);
3869  }
3870 
3871  private:
3873  } my_gateway;
3874 
3875  //The substitute of 'this' for member construction, to prevent compiler warnings
3876  async_node* self() { return this; }
3877 
// Offers `i` to the successors of output port 0. Tasks collected by
// gather_successful_try_puts are enqueued one by one in the graph arena.
// Returns whether at least one successor accepted the item.
// NOTE(review): listing line 3882 (between the two locals and task_list) is
// elided.
3879  bool try_put_impl(const Output &i) {
3880  internal::multifunction_output<Output> &port_0 = internal::output_port<0>(*this);
3881  internal::broadcast_cache<output_type>& port_successors = port_0.successors();
3883  task_list tasks;
3884  bool is_at_least_one_put_successful = port_successors.gather_successful_try_puts(i, tasks);
3885  __TBB_ASSERT( is_at_least_one_put_successful || tasks.empty(),
3886  "Return status is inconsistent with the method operation." );
3887 
3888  while( !tasks.empty() ) {
3889  internal::enqueue_in_graph_arena(this->my_graph, tasks.pop_front());
3890  }
3891  tbb::internal::fgt_async_try_put_end(this, &port_0);
3892  return is_at_least_one_put_successful;
3893  }
3894 
3895 public:
// Main constructor (parts of its header are elided). The base class is built
// from an async_body wrapping `body` and the address of my_gateway. At that
// point my_gateway has not been constructed yet, but async_body_base only
// stores the pointer, so nothing is dereferenced. my_gateway is then
// initialized with self().
3896  template<typename Body>
3898  graph &g, size_t concurrency,
3900  Body body, __TBB_FLOW_GRAPH_PRIORITY_ARG1(Policy = Policy(), node_priority_t priority = tbb::flow::internal::no_priority)
3901 #else
3903 #endif
3904  ) : base_type(
3905  g, concurrency,
3906  internal::async_body<Input, typename base_type::output_ports_type, gateway_type, Body>
3907  (body, &my_gateway) __TBB_FLOW_GRAPH_PRIORITY_ARG0(priority) ), my_gateway(self()) {
3908  tbb::internal::fgt_multioutput_node_with_body<1>(
3909  CODEPTR(), tbb::internal::FLOW_ASYNC_NODE,
3910  &this->my_graph, static_cast<receiver<input_type> *>(this),
3911  this->output_ports(), this->my_body
3912  );
3913  }
3914 
3915 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES && __TBB_CPP11_PRESENT
// Convenience overload taking a priority without a policy; delegates with a
// default-constructed Policy. The Args pack is not used by this signature.
3916  template <typename Body, typename... Args>
3917  __TBB_NOINLINE_SYM async_node(graph& g, size_t concurrency, Body body, node_priority_t priority)
3918  : async_node(g, concurrency, body, Policy(), priority) {}
3919 #endif // __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
3920 
3921 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
// node_set constructor (parts of its header are elided): delegates to the
// graph constructor with nodes.graph_reference(), then calls
// make_edges_in_order(nodes, *this).
3922  template <typename Body, typename... Args>
3924  const node_set<Args...>& nodes, size_t concurrency, Body body,
3926  ) : async_node(nodes.graph_reference(), concurrency, __TBB_FLOW_GRAPH_PRIORITY_ARG1(body, priority)) {
3927  make_edges_in_order(nodes, *this);
3928  }
3929 
3930 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
// node_set overload taking a priority without a policy; delegates with a
// default-constructed Policy.
3931  template <typename Body, typename... Args>
3932  __TBB_NOINLINE_SYM async_node(const node_set<Args...>& nodes, size_t concurrency, Body body, node_priority_t priority)
3933  : async_node(nodes, concurrency, body, Policy(), priority) {}
3934 #endif // __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
3935 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3936 
// Copy constructor: after the base is copied, both my_body and my_init_body are
// re-pointed at this node's own my_gateway via set_gateway().
3937  __TBB_NOINLINE_SYM async_node( const async_node &other ) : base_type(other), sender<Output>(), my_gateway(self()) {
3938  static_cast<async_body_base_type*>(this->my_body->get_body_ptr())->set_gateway(&my_gateway);
3939  static_cast<async_body_base_type*>(this->my_init_body->get_body_ptr())->set_gateway(&my_gateway);
3940 
3941  tbb::internal::fgt_multioutput_node_with_body<1>( CODEPTR(), tbb::internal::FLOW_ASYNC_NODE,
3942  &this->my_graph, static_cast<receiver<input_type> *>(this),
3943  this->output_ports(), this->my_body );
3944  }
3945 
// Body of the gateway accessor (header elided): returns my_gateway.
3947  return my_gateway;
3948  }
3949 
3950 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
// Body statement (listing line 3952) is elided in this extract.
3951  void set_name( const char *name ) __TBB_override {
3953  }
3954 #endif
3955 
3956  // Define sender< Output >
3957 
// Body of a method whose header is elided (presumably register_successor):
// forwards to output port 0.
3960  return internal::output_port<0>(*this).register_successor(r);
3961  }
3962 
// Body of a method whose header is elided (presumably remove_successor):
// forwards to output port 0.
3965  return internal::output_port<0>(*this).remove_successor(r);
3966  }
3967 
// Body of a member template whose header and typedefs are elided: casts the
// stored body to multifunction_body_leaf holding an async_body_type, copies
// that async_body and returns its user body via get_body().
3968  template<typename Body>
3972  mfn_body_type &body_ref = *this->my_body;
3973  async_body_type ab = *static_cast<async_body_type*>(dynamic_cast< internal::multifunction_body_leaf<input_type, typename base_type::output_ports_type, async_body_type> & >(body_ref).get_body_ptr());
3974  return ab.get_body();
3975  }
3976 
3977 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
// Deprecated extraction interface of sender<Output>: every method below simply
// forwards to output port 0.
3978  typedef typename internal::edge_container<successor_type> built_successors_type;
3980  typedef typename built_successors_type::edge_list_type successor_list_type;
3981  built_successors_type &built_successors() __TBB_override {
3982  return internal::output_port<0>(*this).built_successors();
3983  }
3984 
3985  void internal_add_built_successor( successor_type &r ) __TBB_override {
3986  internal::output_port<0>(*this).internal_add_built_successor(r);
3987  }
3988 
3989  void internal_delete_built_successor( successor_type &r ) __TBB_override {
3990  internal::output_port<0>(*this).internal_delete_built_successor(r);
3991  }
3992 
3993  void copy_successors( successor_list_type &l ) __TBB_override {
3994  internal::output_port<0>(*this).copy_successors(l);
3995  }
3996 
3997  size_t successor_count() __TBB_override {
3998  return internal::output_port<0>(*this).successor_count();
3999  }
4000 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
4001 
4002 protected:
4003 
// Body of reset_node (header elided): forwards to base_type::reset_node(f).
4005  base_type::reset_node(f);
4006  }
4007 };
4008 
4009 #if __TBB_PREVIEW_STREAMING_NODE
4011 #endif // __TBB_PREVIEW_STREAMING_NODE
4012 
4014 
// overwrite_node: a graph node that is both receiver<T> and sender<T> and keeps
// one buffered value (my_buffer) together with a validity flag
// (my_buffer_is_valid). Most public methods take my_mutex.
// NOTE(review): many listing lines are absent from this extract (4020-4021,
// 4042-4043, 4051, 4055, 4059, 4083, 4141, 4150-4151, 4155, 4158, 4176-4177,
// 4180-4181, 4189, 4193-4194, 4196, 4199, 4206-4207, 4210-4211, 4215-4217,
// 4219). They hold typedefs, data member declarations and several function
// headers — consult the full header before editing.
4015 template< typename T >
4016 class overwrite_node : public graph_node, public receiver<T>, public sender<T> {
4017 public:
4018  typedef T input_type;
4019  typedef T output_type;
4022 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
4023  typedef typename receiver<input_type>::built_predecessors_type built_predecessors_type;
4024  typedef typename sender<output_type>::built_successors_type built_successors_type;
4025  typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
4026  typedef typename sender<output_type>::successor_list_type successor_list_type;
4027 #endif
4028 
// Constructs an empty node in graph g: the buffer starts invalid and this node
// is set as the owner of my_successors.
4029  __TBB_NOINLINE_SYM explicit overwrite_node(graph &g) : graph_node(g), my_buffer_is_valid(false) {
4030  my_successors.set_owner( this );
4031  tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_OVERWRITE_NODE, &this->my_graph,
4032  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
4033  }
4034 
4035 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
// Preview constructor: delegates to the graph constructor with
// nodes.graph_reference(), then calls make_edges_in_order(nodes, *this).
4036  template <typename... Args>
4037  overwrite_node(const node_set<Args...>& nodes) : overwrite_node(nodes.graph_reference()) {
4038  make_edges_in_order(nodes, *this);
4039  }
4040 #endif
4041 
// Copy constructor (header elided): takes only src.my_graph; the buffer starts
// invalid and the successor set is freshly owned by this node.
4044  graph_node(src.my_graph), receiver<T>(), sender<T>(), my_buffer_is_valid(false)
4045  {
4046  my_successors.set_owner( this );
4047  tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_OVERWRITE_NODE, &this->my_graph,
4048  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
4049  }
4050 
4052 
4053 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
// Body statement (listing line 4055) is elided in this extract.
4054  void set_name( const char *name ) __TBB_override {
4056  }
4057 #endif
4058 
// Body of a method whose header is elided (presumably register_successor(s)).
// Under my_mutex: if a valid value is buffered and the graph is active, the
// value is offered to s first; s is added as a successor only if it accepts,
// otherwise a register_predecessor_task is spawned. Always returns true.
4060  spin_mutex::scoped_lock l( my_mutex );
4061  if (my_buffer_is_valid && internal::is_graph_active( my_graph )) {
4062  // We have a valid value that must be forwarded immediately.
4063  bool ret = s.try_put( my_buffer );
4064  if ( ret ) {
4065  // We add the successor that accepted our put
4066  my_successors.register_successor( s );
4067  } else {
4068  // In case of reservation a race between the moment of reservation and register_successor can appear,
4069  // because failed reserve does not mean that register_successor is not ready to put a message immediately.
4070  // We have some sort of infinite loop: reserving node tries to set pull state for the edge,
4071  // but overwrite_node tries to return push state back. That is why we have to break this loop with task creation.
4072  task *rtask = new ( task::allocate_additional_child_of( *( my_graph.root_task() ) ) )
4073  register_predecessor_task( *this, s );
4074  internal::spawn_in_graph_arena( my_graph, *rtask );
4075  }
4076  } else {
4077  // No valid value yet, just add as successor
4078  my_successors.register_successor( s );
4079  }
4080  return true;
4081  }
4082 
// Body of a method whose header is elided (presumably remove_successor(s)):
// removes s from my_successors under my_mutex and returns true.
4084  spin_mutex::scoped_lock l( my_mutex );
4085  my_successors.remove_successor(s);
4086  return true;
4087  }
4088 
4089 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
// Deprecated extraction interface. The two accessors return references without
// locking; the remaining methods take my_mutex and forward to my_successors or
// my_built_predecessors.
4090  built_predecessors_type &built_predecessors() __TBB_override { return my_built_predecessors; }
4091  built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }
4092 
4093  void internal_add_built_successor( successor_type &s) __TBB_override {
4094  spin_mutex::scoped_lock l( my_mutex );
4095  my_successors.internal_add_built_successor(s);
4096  }
4097 
4098  void internal_delete_built_successor( successor_type &s) __TBB_override {
4099  spin_mutex::scoped_lock l( my_mutex );
4100  my_successors.internal_delete_built_successor(s);
4101  }
4102 
4103  size_t successor_count() __TBB_override {
4104  spin_mutex::scoped_lock l( my_mutex );
4105  return my_successors.successor_count();
4106  }
4107 
4108  void copy_successors(successor_list_type &v) __TBB_override {
4109  spin_mutex::scoped_lock l( my_mutex );
4110  my_successors.copy_successors(v);
4111  }
4112 
4113  void internal_add_built_predecessor( predecessor_type &p) __TBB_override {
4114  spin_mutex::scoped_lock l( my_mutex );
4115  my_built_predecessors.add_edge(p);
4116  }
4117 
4118  void internal_delete_built_predecessor( predecessor_type &p) __TBB_override {
4119  spin_mutex::scoped_lock l( my_mutex );
4120  my_built_predecessors.delete_edge(p);
4121  }
4122 
4123  size_t predecessor_count() __TBB_override {
4124  spin_mutex::scoped_lock l( my_mutex );
4125  return my_built_predecessors.edge_count();
4126  }
4127 
4128  void copy_predecessors( predecessor_list_type &v ) __TBB_override {
4129  spin_mutex::scoped_lock l( my_mutex );
4130  my_built_predecessors.copy_edges(v);
4131  }
4132 
// Invalidates the buffer and removes every recorded successor and predecessor
// edge of this node.
// NOTE(review): my_buffer_is_valid is written here without taking my_mutex,
// whereas clear() and is_valid() lock it — confirm extract() is only called
// while no other thread is using the node.
4133  void extract() __TBB_override {
4134  my_buffer_is_valid = false;
4135  built_successors().sender_extract(*this);
4136  built_predecessors().receiver_extract(*this);
4137  }
4138 
4139 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
4140 
// Body of a method whose header is elided (presumably try_get(v)): under
// my_mutex, copies the buffered value into v and returns true if it is valid,
// otherwise returns false. The buffer stays valid after a successful read.
4142  spin_mutex::scoped_lock l( my_mutex );
4143  if ( my_buffer_is_valid ) {
4144  v = my_buffer;
4145  return true;
4146  }
4147  return false;
4148  }
4149 
// Body of a method whose header is elided (presumably try_reserve(v)):
// identical to try_get(v).
4152  return try_get(v);
4153  }
4154 
// Releasing and consuming a reservation are no-ops that report success.
4156  bool try_release() __TBB_override { return true; }
4157 
4159  bool try_consume() __TBB_override { return true; }
4160 
// Reports, under my_mutex, whether a value is currently buffered.
4161  bool is_valid() {
4162  spin_mutex::scoped_lock l( my_mutex );
4163  return my_buffer_is_valid;
4164  }
4165 
// Marks the buffer invalid under my_mutex; my_buffer itself is left untouched.
4166  void clear() {
4167  spin_mutex::scoped_lock l( my_mutex );
4168  my_buffer_is_valid = false;
4169  }
4170 
4171 protected:
4172 
4173  template< typename R, typename B > friend class run_and_put_task;
4174  template<typename X, typename Y> friend class internal::broadcast_cache;
4175  template<typename X, typename Y> friend class internal::round_robin_cache;
// Tail of try_put_task (header and any preceding statement are elided):
// delegates to try_put_task_impl(v).
4178  return try_put_task_impl(v);
4179  }
4180 
// Body of try_put_task_impl (header elided): overwrites the buffer, marks it
// valid and forwards v to the successors. If the successors return no task,
// SUCCESSFULLY_ENQUEUED is returned instead of a null pointer. No lock is
// taken in the visible lines; write_once_node takes my_mutex before calling it.
4182  my_buffer = v;
4183  my_buffer_is_valid = true;
4184  task * rtask = my_successors.try_put_task(v);
4185  if (!rtask) rtask = SUCCESSFULLY_ENQUEUED;
4186  return rtask;
4187  }
4188 
// Body of an accessor whose header is elided: returns my_graph.
4190  return my_graph;
4191  }
4192 
4195 
// Fragment of the register_predecessor_task helper (class header, constructor
// header, the header of its task method and its members are elided). The
// visible logic asks successor s to register o as its predecessor and, if s
// refuses, registers s as a successor of o again; it returns NULL.
4197  o(owner), s(succ) {};
4198 
4200  if (!s.register_predecessor(o)) {
4201  o.register_successor(s);
4202  }
4203  return NULL;
4204  }
4205 
4208  };
4209 
4212 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
4213  internal::edge_container<predecessor_type> my_built_predecessors;
4214 #endif
4218 
// Body of reset_node (header elided): invalidates the buffer and, when the
// rf_clear_edges bit is set in f, also clears my_successors.
4220  my_buffer_is_valid = false;
4221  if (f&rf_clear_edges) {
4222  my_successors.clear();
4223  }
4224  }
4225 }; // overwrite_node
4226 
// write_once_node: an overwrite_node whose put path stores a value only while
// the buffer is not yet valid; later puts return NULL until the buffer is
// invalidated again.
// NOTE(review): listing lines 4232-4234, 4236-4237, 4250-4251, 4259 and 4267
// are absent from this extract (typedefs, both constructor headers, the
// set_name body and the try_put_task header).
4227 template< typename T >
4228 class write_once_node : public overwrite_node<T> {
4229 public:
4230  typedef T input_type;
4231  typedef T output_type;
4235 
// Presumably the tail of the graph constructor (header elided): calls fgt_node
// with FLOW_WRITE_ONCE_NODE and this node's receiver and sender interfaces.
4238  tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_WRITE_ONCE_NODE, &(this->my_graph),
4239  static_cast<receiver<input_type> *>(this),
4240  static_cast<sender<output_type> *>(this) );
4241  }
4242 
4243 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
// Preview constructor: delegates to the graph constructor with
// nodes.graph_reference(), then calls make_edges_in_order(nodes, *this).
4244  template <typename... Args>
4245  write_once_node(const node_set<Args...>& nodes) : write_once_node(nodes.graph_reference()) {
4246  make_edges_in_order(nodes, *this);
4247  }
4248 #endif
4249 
// Presumably the tail of the copy constructor (header elided): same fgt_node
// call as the graph constructor.
4252  tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_WRITE_ONCE_NODE, &(this->my_graph),
4253  static_cast<receiver<input_type> *>(this),
4254  static_cast<sender<output_type> *>(this) );
4255  }
4256 
4257 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
// Body statement (listing line 4259) is elided in this extract.
4258  void set_name( const char *name ) __TBB_override {
4260  }
4261 #endif
4262 
4263 protected:
4264  template< typename R, typename B > friend class run_and_put_task;
4265  template<typename X, typename Y> friend class internal::broadcast_cache;
4266  template<typename X, typename Y> friend class internal::round_robin_cache;
// Body of the put path (header elided, presumably try_put_task(v)): under the
// inherited my_mutex, returns NULL if a value is already buffered; otherwise
// stores and forwards v through the inherited try_put_task_impl.
4268  spin_mutex::scoped_lock l( this->my_mutex );
4269  return this->my_buffer_is_valid ? NULL : this->try_put_task_impl(v);
4270  }
4271 };
4272 
4273 } // interfaceX
4274 
4279 
// Using-declarations that bring the interface11 names into the enclosing
// namespace (closed further down with the comment "// flow"), so callers can
// refer to them without the interface11 qualifier.
// NOTE(review): the namespace header and listing lines 4288, 4291-4292, 4304,
// 4307-4308 and 4321 are absent from this extract, so this list of exported
// names is incomplete.
4280  using interface11::graph;
4281  using interface11::graph_node;
4282  using interface11::continue_msg;
4283 
4284  using interface11::source_node;
4285  using interface11::function_node;
4286  using interface11::multifunction_node;
4287  using interface11::split_node;
4289  using interface11::indexer_node;
4290  using interface11::internal::tagged_msg;
4293  using interface11::continue_node;
4294  using interface11::overwrite_node;
4295  using interface11::write_once_node;
4296  using interface11::broadcast_node;
4297  using interface11::buffer_node;
4298  using interface11::queue_node;
4299  using interface11::sequencer_node;
4300  using interface11::priority_queue_node;
4301  using interface11::limiter_node;
// A using-directive rather than a using-declaration: every name in
// graph_policy_namespace becomes visible here.
4302  using namespace interface11::internal::graph_policy_namespace;
4303  using interface11::join_node;
4305  using interface11::copy_body;
4306  using interface11::make_edge;
// The remaining names are exported only when the corresponding feature macro
// is enabled.
4309 #if __TBB_FLOW_GRAPH_CPP11_FEATURES
4310  using interface11::composite_node;
4311 #endif
4312  using interface11::async_node;
4313 #if __TBB_PREVIEW_ASYNC_MSG
4314  using interface11::async_msg;
4315 #endif
4316 #if __TBB_PREVIEW_STREAMING_NODE
4317  using interface11::port_ref;
4318  using interface11::streaming_node;
4319 #endif // __TBB_PREVIEW_STREAMING_NODE
4320 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
4322  using internal::no_priority;
4323 #endif
4324 
4325 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
4326  using interface11::internal::follows;
4327  using interface11::internal::precedes;
4328  using interface11::internal::make_node_set;
4329  using interface11::internal::make_edges;
4330 #endif
4331 
4332 } // flow
4333 } // tbb
4334 
4335 // Include deduction guides for node classes
4337 
4338 #undef __TBB_PFG_RESET_ARG
4339 #undef __TBB_COMMA
4340 
4342 #undef __TBB_flow_graph_H_include_area
4343 
4344 #if TBB_USE_THREADING_TOOLS && TBB_PREVIEW_FLOW_GRAPH_TRACE && ( __linux__ || __APPLE__ )
4345  #undef __TBB_NOINLINE_SYM
4346 #endif
4347 
4348 #endif // __TBB_flow_graph_H
__TBB_NOINLINE_SYM function_node(graph &g, size_t concurrency, __TBB_FLOW_GRAPH_PRIORITY_ARG1(Body body, node_priority_t priority=tbb::flow::internal::no_priority))
Constructor.
Definition: flow_graph.h:1181
graph & graph_reference() const __TBB_override
Definition: flow_graph.h:1685
Implements async node.
Definition: flow_graph.h:3826
void internal_forward_task_impl(buffer_operation *op, derived_type *derived)
Definition: flow_graph.h:1940
limiter_node(const limiter_node &src)
Copy constructor.
Definition: flow_graph.h:2744
bool is_continue_receiver() __TBB_override
Definition: flow_graph.h:705
task * try_put_task(const input_type &) __TBB_override
Definition: flow_graph.h:664
interface11::internal::Policy< queueing, lightweight > queueing_lightweight
Definition: flow_graph.h:88
internal::source_body< output_type > * my_init_body
Definition: flow_graph.h:1098
void add_task_to_graph_reset_list(tbb::flow::interface10::graph &g, tbb::task *tp)
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:3166
void handle_operations_impl(buffer_operation *op_list, derived_type *derived)
Definition: flow_graph.h:1774
virtual void internal_release(buffer_operation *op)
Definition: flow_graph.h:1993
Output output_type
The type of the output message, which is complete.
Definition: flow_graph.h:899
virtual bool try_consume()
Consumes the reserved item.
Definition: flow_graph.h:321
receiver< input_type >::predecessor_type predecessor_type
Definition: flow_graph.h:2604
__TBB_NOINLINE_SYM overwrite_node(const overwrite_node &src)
Copy constructor; doesn't take anything from src; default won't work.
Definition: flow_graph.h:4043
receiver_type::predecessor_type predecessor_type
Definition: flow_graph.h:3834
void release_wait() __TBB_override
Deregisters an external entity that may have interacted with the graph.
Definition: flow_graph.h:805
task that does nothing. Useful for synchronization.
Definition: task.h:1031
internal::port_ref_impl< N1, N2 > port_ref()
Definition: flow_graph.h:42
void spawn_put()
Spawns a task that applies the body.
Definition: flow_graph.h:1137
iterator end()
end iterator
Definition: flow_graph.h:860
void internal_reserve(prio_operation *op) __TBB_override
Definition: flow_graph.h:2451
__TBB_NOINLINE_SYM sequencer_node(graph &g, const Sequencer &s)
Constructor.
Definition: flow_graph.h:2311
internal::tagged_msg< size_t, T0 > output_type
Definition: flow_graph.h:3099
Implements methods for an executable node that takes continue_msg as input.
Definition: flow_graph.h:753
void internal_remove_edge(internal::untyped_sender &p, internal::untyped_receiver &s)
Definition: flow_graph.h:3501
internal::tagged_msg< size_t, T0, T1, T2, T3, T4, T5 > output_type
Definition: flow_graph.h:3265
bool try_reserve(output_type &v) __TBB_override
Reserves an item.
Definition: flow_graph.h:1013
internal::function_output< output_type > fOutput_type
Definition: flow_graph.h:1447
bool try_put_impl(const Output &i)
Implements gateway_type::try_put for an external activity to submit a message to the flow graph.
Definition: flow_graph.h:3879
static task * try_put_task_wrapper_impl(receiver< T > *const this_recv, const void *p, bool is_async)
Definition: flow_graph.h:237
int decrement_ref_count()
Atomically decrements reference count and returns its new value.
Definition: task.h:777
Used to form groups of tasks.
Definition: task.h:347
task * try_put_task(const input_type &v) __TBB_override
Put item to successor; return task to run the successor if possible.
Definition: flow_graph.h:4176
__TBB_DEPRECATED continue_receiver(const continue_receiver &src)
Copy constructor.
Definition: flow_graph.h:609
__TBB_NOINLINE_SYM broadcast_node(graph &g)
Definition: flow_graph.h:1583
Implements methods for a function node that takes a type Input as input.
Definition: flow_graph.h:638
#define __TBB_DEPRECATED_LIMITER_ARG2(arg1, arg2)
output_ports_type & output_ports()
Definition: flow_graph.h:1397
bool try_reserve(X &t)
Reserves an item in the sender.
Definition: flow_graph.h:342
size_type size(size_t new_tail=0)
Definition: flow_graph.h:153
bool try_put(const X &t)
Put an item to the receiver.
Definition: flow_graph.h:369
virtual bool remove_successor(successor_type &r)=0
Removes a successor from this node.
void reset_receiver(reset_flags) __TBB_override
put receiver back in initial state
Definition: flow_graph.h:4217
internal::decrementer< limiter_node< T, DecrementType >, DecrementType > decrement
The internal receiver< DecrementType > that decrements the count.
Definition: flow_graph.h:2716
#define __TBB_NOINLINE_SYM
Definition: flow_graph.h:45
buffer_node< T, A >::size_type size_type
Definition: flow_graph.h:2418
virtual bool register_predecessor(predecessor_type &)
Add a predecessor to the node.
Definition: flow_graph.h:381
virtual bool remove_predecessor(predecessor_type &)
Remove a predecessor from the node.
Definition: flow_graph.h:384
A task that calls a node's apply_body_bypass function with no input.
Definition: flow_graph.h:321
void internal_forward_task(queue_operation *op) __TBB_override
Tries to forward valid items to successors.
Definition: flow_graph.h:2231
buffer_node< T, A >::item_type item_type
Definition: flow_graph.h:2419
Base class for tasks generated by graph nodes.
virtual void internal_rem_succ(buffer_operation *op)
Remove successor.
Definition: flow_graph.h:1860
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:2881
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:3374
internal::tagged_msg< size_t, T0, T1, T2, T3, T4, T5, T6, T7, T8 > output_type
Definition: flow_graph.h:3373
async_body_base< Gateway > base_type
Definition: flow_graph.h:3801
__TBB_DEPRECATED typedef T input_type
The input type of this receiver.
Definition: flow_graph.h:459
void internal_consume(queue_operation *op) __TBB_override
Definition: flow_graph.h:2253
internal::unfolded_join_node< N, queueing_port, OutputTuple, queueing > unfolded_type
Definition: flow_graph.h:2942
#define CODEPTR()
#define __TBB_DEPRECATED_LIMITER_ARG4(arg1, arg2, arg3, arg4)
__TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3)
Definition: flow_graph.h:3007
__TBB_NOINLINE_SYM sequencer_node(const sequencer_node &src)
Copy constructor.
Definition: flow_graph.h:2327
virtual bool try_reserve_wrapper(void *p, bool is_async)=0
static tbb::task * combine_tasks(graph &g, tbb::task *left, tbb::task *right)
Definition: flow_graph.h:191
static void fgt_graph(void *)
async_body(const Body &body, gateway_type *gateway)
Definition: flow_graph.h:3804
receiver< input_type >::predecessor_type predecessor_type
Definition: flow_graph.h:4020
static const void * to_void_ptr(const T &t)
Definition: flow_graph.h:221
internal::function_input_queue< input_type, Allocator > input_queue_type
Definition: flow_graph.h:1280
bool try_reserve(T &v) __TBB_override
Reserves an item.
Definition: flow_graph.h:2125
void internal_consume(prio_operation *op) __TBB_override
Definition: flow_graph.h:2463
bool remove_successor(successor_type &s) __TBB_override
Removes a successor from this node.
Definition: flow_graph.h:4083
__TBB_NOINLINE_SYM join_node(const join_node &other)
Definition: flow_graph.h:2925
void __TBB_store_with_release(volatile T &location, V value)
Definition: tbb_machine.h:713
#define __TBB_FLOW_GRAPH_PRIORITY_ARG0(priority)
bool try_get(T &v) __TBB_override
Request an item from the buffer_node.
Definition: flow_graph.h:2114
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:4219
bool try_consume() __TBB_override
Consumes a reserved item.
Definition: flow_graph.h:1040
__TBB_NOINLINE_SYM async_node(graph &g, size_t concurrency,)
Definition: flow_graph.h:3897
void reserve_wait() __TBB_override
Used to register that an external entity may still interact with the graph.
Definition: flow_graph.h:798
virtual bool register_successor(successor_type &r)=0
Add a new successor to this node.
A list of children.
Definition: task.h:1063
graph()
Constructs a graph with isolated task_group_context.
Definition: flow_graph.h:766
graph & graph_reference() const __TBB_override
Definition: flow_graph.h:2176
tbb::flow::interface11::graph_node * my_nodes_last
internal::multifunction_input< input_type, output_ports_type, Policy, Allocator > base_type
Definition: flow_graph.h:1282
internal::reservable_predecessor_cache< T, spin_mutex > my_predecessors
Definition: flow_graph.h:2618
tuple< T0, T1, T2, T3, T4, T5, T6, T7, T8, T9 > InputTuple
Definition: flow_graph.h:3408
internal::tagged_msg< size_t, T0, T1, T2 > output_type
Definition: flow_graph.h:3165
bool is_graph_active(tbb::flow::interface10::graph &g)
void reset_receiver(reset_flags) __TBB_override
put receiver back in initial state
Definition: flow_graph.h:2180
internal::multifunction_input< input_type, output_ports_type, Policy, Allocator > input_impl_type
Definition: flow_graph.h:1279
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:1244
task * grab_forwarding_task(buffer_operation &op_data)
Definition: flow_graph.h:1823
void wait_for_all()
Waits until the graph is idle and the number of decrement_wait_count calls equals the number of increment_wait_count calls.
static const node_priority_t no_priority
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp begin
bool try_consume() __TBB_override
Consumes a reserved item.
Definition: flow_graph.h:2144
internal::function_input< input_type, output_type, Policy, Allocator > input_impl_type
Definition: flow_graph.h:1165
receiver< input_type >::predecessor_type predecessor_type
Definition: flow_graph.h:1569
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:4234
task * try_put_task(const T &t) __TBB_override
Puts an item to this receiver.
Definition: flow_graph.h:2848
bool internal_push(prio_operation *op) __TBB_override
Definition: flow_graph.h:2431
void reset_receiver(reset_flags) __TBB_override
put receiver back in initial state
Definition: flow_graph.h:1411
const V & cast_to(T const &t)
Definition: flow_graph.h:715
__TBB_NOINLINE_SYM indexer_node(const indexer_node &other)
Definition: flow_graph.h:3280
receiver< input_type >::predecessor_type predecessor_type
Definition: flow_graph.h:4233
virtual void internal_reserve(buffer_operation *op)
Definition: flow_graph.h:1979
__TBB_NOINLINE_SYM priority_queue_node(graph &g, const Compare &comp=Compare())
Constructor.
Definition: flow_graph.h:2383
wrap_tuple_elements< N, PT, OutputTuple >::type input_ports_type
Definition: flow_graph.h:1510
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id parent
internal::wrap_tuple_elements< N, internal::multifunction_output, Output >::type output_ports_type
Definition: flow_graph.h:1278
fOutput_type::successor_type successor_type
Definition: flow_graph.h:1169
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_id __itt_string_handle __itt_metadata_type type
register_predecessor_task(predecessor_type &owner, successor_type &succ)
Definition: flow_graph.h:4196
tbb::flow::interface11::graph_node * my_nodes
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:3199
internal::wrap_tuple_elements< N, internal::multifunction_output, TupleType >::type output_ports_type
Definition: flow_graph.h:1366
__TBB_NOINLINE_SYM source_node(graph &g, Body body, bool is_active=true)
Constructor for a node with a successor.
Definition: flow_graph.h:914
Base class for types that should not be assigned.
Definition: tbb_stddef.h:322
void increment_ref_count()
Atomically increment reference count.
Definition: task.h:760
The graph class.
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:2307
bool remove_successor(successor_type &r) __TBB_override
Removes r as a successor.
Definition: flow_graph.h:1618
__TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2)
Definition: flow_graph.h:3002
internal::tagged_msg< size_t, T0, T1 > output_type
Definition: flow_graph.h:3132
internal::function_body< T, size_t > * my_sequencer
Definition: flow_graph.h:2300
void handle_operations(prio_operation *op_list) __TBB_override
Definition: flow_graph.h:2427
buffer_node< T, A >::buffer_operation sequencer_operation
Definition: flow_graph.h:2345
__TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6)
Definition: flow_graph.h:3029
task * try_put_task(const TupleType &t) __TBB_override
Put item to successor; return task to run the successor if possible.
Definition: flow_graph.h:1400
Implements methods for both executable and function nodes that put Output to their successors.
Definition: flow_graph.h:854
__TBB_NOINLINE_SYM indexer_node(const indexer_node &other)
Definition: flow_graph.h:3388
#define __TBB_CPP11_PRESENT
Definition: tbb_config.h:149
virtual void reset_node(reset_flags f=rf_reset_protocol)=0
bool try_get(X &t)
Request an item from the sender.
Definition: flow_graph.h:336
internal::broadcast_cache< input_type, null_rw_mutex > my_successors
Definition: flow_graph.h:4211
bool try_put(const typename internal::async_helpers< T >::filtered_type &t)
Put an item to the receiver.
Definition: flow_graph.h:464
static void fgt_make_edge(void *, void *)
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:2191
base_type::buffer_operation queue_operation
Definition: flow_graph.h:2210
#define __TBB_DEPRECATED
Definition: tbb_config.h:636
receiver< input_type > receiver_type
Definition: flow_graph.h:3833
__TBB_DEPRECATED bool register_predecessor(predecessor_type &) __TBB_override
Increments the trigger threshold.
Definition: flow_graph.h:616
internal::multifunction_output< Output > output_port_type
Definition: flow_graph.h:3842
bool register_successor(successor_type &r) __TBB_override
Adds a new successor.
Definition: flow_graph.h:2037
__TBB_DEPRECATED bool remove_predecessor(predecessor_type &) __TBB_override
Decrements the trigger threshold.
Definition: flow_graph.h:626
buffer_node< T, A >::buffer_operation prio_operation
Definition: flow_graph.h:2420
__TBB_DEPRECATED continue_receiver(__TBB_FLOW_GRAPH_PRIORITY_ARG1(int number_of_predecessors, node_priority_t priority))
Constructor.
Definition: flow_graph.h:601
void try_put_and_add_task(task *&last_task)
Definition: flow_graph.h:2488
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:2605
virtual task * execute()=0
Does whatever should happen when the threshold is reached.
tbb::flow::interface11::graph_iterator< const graph, const tbb::flow::interface11::graph_node > const_iterator
virtual bool try_release()
Releases the reserved item.
Definition: flow_graph.h:318
bool register_successor(successor_type &r) __TBB_override
Replace the current successor with this new successor.
Definition: flow_graph.h:2761
__TBB_NOINLINE_SYM indexer_node(const indexer_node &other)
Definition: flow_graph.h:3213
static void fgt_composite(void *, void *, void *)
static void fgt_remove_edge(void *, void *)
__TBB_NOINLINE_SYM indexer_node(const indexer_node &other)
Definition: flow_graph.h:3316
virtual void finalize() const
Definition: flow_graph.h:147
bool try_release() __TBB_override
Release a reserved item.
Definition: flow_graph.h:2135
bool remove_successor(successor_type &r) __TBB_override
Removes a successor from this node.
Definition: flow_graph.h:964
virtual task * try_put_task(const T &t)=0
Put item to successor; return task to run the successor if possible.
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:1691
virtual bool try_reserve_wrapper(void *p, bool is_async) __TBB_override
Definition: flow_graph.h:441
__TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5)
Definition: flow_graph.h:3020
__TBB_NOINLINE_SYM indexer_node(const indexer_node &other)
Definition: flow_graph.h:3246
__TBB_NOINLINE_SYM split_node(const split_node &other)
Definition: flow_graph.h:1383
static tbb::task *const SUCCESSFULLY_ENQUEUED
internal::aggregator< handler_type, buffer_operation > my_aggregator
Definition: flow_graph.h:1767
static void fgt_node_desc(const NodeType *, const char *)
receiver< input_type >::predecessor_type predecessor_type
Definition: flow_graph.h:2379
base_type::size_type size_type
Definition: flow_graph.h:2209
split_node: accepts a tuple as input, forwards each element of the tuple to its successors.
Definition: flow_graph.h:1349
concurrency
An enumeration that provides the two most common concurrency levels: unlimited and serial.
Definition: flow_graph.h:98
bool try_release() __TBB_override
Releases the reserved item.
Definition: flow_graph.h:4156
task * try_put_task_impl(const input_type &v)
Definition: flow_graph.h:4181
__TBB_NOINLINE_SYM continue_node(const continue_node &src)
Copy constructor.
Definition: flow_graph.h:1528
void call(F &&f, Pack &&p)
Calls the given function with arguments taken from a stored_pack.
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:3410
The base of all graph nodes.
__TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6, __TBB_B7 b7, __TBB_B8 b8, __TBB_B9 b9)
Definition: flow_graph.h:3056
void reset_receiver(reset_flags) __TBB_override
put receiver back in initial state
Definition: flow_graph.h:2877
static const T & from_void_ptr(const void *p)
Definition: flow_graph.h:229
static void fgt_reserve_wait(void *)
void reset_receiver(reset_flags f) __TBB_override
put receiver back in initial state
Definition: flow_graph.h:797
internal::multifunction_input< Input, typename base_type::output_ports_type, Policy, Allocator > mfn_input_type
Definition: flow_graph.h:3828
__TBB_NOINLINE_SYM write_once_node(graph &g)
Constructor.
Definition: flow_graph.h:4237
static void fgt_async_commit(void *, void *)
#define __TBB_STATIC_ASSERT(condition, msg)
Definition: tbb_stddef.h:553
internal::tagged_msg< size_t, T0, T1, T2, T3, T4 > output_type
Definition: flow_graph.h:3231
void internal_forward_task(prio_operation *op) __TBB_override
Tries to forward valid items to successors.
Definition: flow_graph.h:2423
static void fgt_begin_body(void *)
bool remove_successor(successor_type &r) __TBB_override
Removes a successor from this node.
Definition: flow_graph.h:3964
__TBB_NOINLINE_SYM overwrite_node(graph &g)
Definition: flow_graph.h:4029
bool register_successor(successor_type &r) __TBB_override
Add a new successor to this node.
Definition: flow_graph.h:955
void try_put_and_add_task(task *&last_task)
Definition: flow_graph.h:1923
bool try_consume() __TBB_override
Consumes the reserved item.
Definition: flow_graph.h:4159
bool try_put(const Output &i) __TBB_override
Implements gateway_type::try_put for an external activity to submit a message to the flow graph.
Definition: flow_graph.h:3867
__TBB_DEPRECATED typedef internal::async_helpers< T >::filtered_type filtered_type
Definition: flow_graph.h:422
static void fgt_async_try_put_end(void *, void *)
static void fgt_multioutput_node_desc(const NodeType *, const char *)
receiver< input_type >::predecessor_type predecessor_type
Definition: flow_graph.h:2261
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:1405
internal::round_robin_cache< T, null_rw_mutex > my_successors
Definition: flow_graph.h:1717
virtual bool try_reserve(T &)
Reserves an item in the sender.
Definition: flow_graph.h:428
__TBB_NOINLINE_SYM multifunction_node(const multifunction_node &other)
Definition: flow_graph.h:1322
buffer_node< T, A >::size_type size_type
Definition: flow_graph.h:2344
bool try_put(const typename internal::async_helpers< T >::async_type &t)
Definition: flow_graph.h:468
receiver_gateway< output_type > gateway_type
Definition: flow_graph.h:3836
iterator begin()
start iterator
Definition: flow_graph.h:858
virtual void internal_reg_succ(buffer_operation *op)
Register successor.
Definition: flow_graph.h:1854
bool remove_successor(successor_type &r) __TBB_override
Removes a successor.
Definition: flow_graph.h:2099
bool try_get(input_type &v) __TBB_override
Request an item from the sender.
Definition: flow_graph.h:4141
internal::broadcast_cache< T > my_successors
Definition: flow_graph.h:2620
function_body that takes an Input and a set of output ports
Definition: flow_graph.h:193
A cache of predecessors that supports requests and reservations.
Definition: flow_graph.h:123
void activate()
Activates a node that was created in the inactive state.
Definition: flow_graph.h:1052
void register_node(tbb::flow::interface11::graph_node *n)
Definition: flow_graph.h:812
void const char const char int ITT_FORMAT __itt_group_sync x void const char * name
__TBB_NOINLINE_SYM broadcast_node(const broadcast_node &src)
Definition: flow_graph.h:1597
Forwards messages in FIFO order.
Definition: flow_graph.h:2206
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:3302
__TBB_NOINLINE_SYM indexer_node(graph &g)
Definition: flow_graph.h:3411
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_id __itt_string_handle __itt_metadata_type size_t void ITT_FORMAT p const __itt_domain __itt_id __itt_string_handle const wchar_t size_t ITT_FORMAT lu const __itt_domain __itt_id __itt_relation __itt_id ITT_FORMAT p const wchar_t int ITT_FORMAT __itt_group_mark S
void __TBB_EXPORTED_METHOD reset()
Forcefully reinitializes the context after the task tree it was associated with is completed.
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:4004
virtual task * try_put_task_wrapper(const void *p, bool is_async)=0
#define __TBB_FLOW_GRAPH_PRIORITY_ARG1(arg1, priority)
virtual task * forward_task()
This is executed by an enqueued task, the "forwarder".
Definition: flow_graph.h:1837
void try_put_and_add_task(task *&last_task)
Definition: flow_graph.h:2220
internal::tagged_msg< size_t, T0, T1, T2, T3, T4, T5, T6 > output_type
Definition: flow_graph.h:3301
Body copy_body(Node &n)
Returns a copy of the body from a function or continue node.
Definition: flow_graph.h:3584
void fgt_multiinput_multioutput_node_desc(const NodeType *, const char *)
const_iterator cend() const
end const iterator
Definition: flow_graph.h:868
tbb::task * execute() __TBB_override
Should be overridden by derived classes.
Definition: flow_graph.h:4199
__TBB_NOINLINE_SYM queue_node(const queue_node &src)
Copy constructor.
Definition: flow_graph.h:2279
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long value
const item_type & get_my_item(size_t i) const
Definition: flow_graph.h:74
void reset_receiver(reset_flags) __TBB_override
put receiver back in initial state
Definition: flow_graph.h:1689
bool empty() const
True if list is empty; false otherwise.
Definition: task.h:1077
void reset_node(reset_flags f) __TBB_override
resets the source_node to its initial state
Definition: flow_graph.h:1077
void reserve_wait() __TBB_override
Inform a graph that messages may come from outside, to prevent premature graph completion.
Definition: flow_graph.h:3856
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:4021
void set_ref_count(int count)
Set reference count.
Definition: task.h:750
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:3100
void reset(tbb::flow::interface11::reset_flags f=tbb::flow::interface11::rf_reset_protocol)
Definition: flow_graph.h:835
void const char const char int ITT_FORMAT __itt_group_sync s
virtual task * try_put_task_wrapper(const void *p, bool is_async) __TBB_override
Definition: flow_graph.h:473
Forward declaration section.
Definition: flow_graph.h:109
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:3133
void internal_release(prio_operation *op) __TBB_override
Definition: flow_graph.h:2469
__TBB_NOINLINE_SYM buffer_node(graph &g)
Constructor.
Definition: flow_graph.h:2000
__TBB_NOINLINE_SYM function_node(const function_node &src)
Copy constructor.
Definition: flow_graph.h:1215
An executable node that acts as a source, i.e. it has no predecessors.
Definition: flow_graph.h:896
virtual bool try_get(T &)
Request an item from the sender.
Definition: flow_graph.h:425
__TBB_NOINLINE_SYM split_node(graph &g)
Definition: flow_graph.h:1368
bool remove_predecessor(predecessor_type &src) __TBB_override
Removes src from the list of cached predecessors.
Definition: flow_graph.h:2837
internal::tagged_msg< size_t, T0, T1, T2, T3 > output_type
Definition: flow_graph.h:3198
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:1709
void remove_node(tbb::flow::interface11::graph_node *n)
Definition: flow_graph.h:823
void prepare_task_arena(bool reinit=false)
__TBB_DEPRECATED typedef internal::async_helpers< T >::filtered_type filtered_type
Definition: flow_graph.h:461
The two-phase join port.
static task * emit_this(graph &g, const T &t, P &p)
Definition: flow_graph.h:733
tbb::flow::interface11::graph_iterator< graph, tbb::flow::interface11::graph_node > iterator
__TBB_NOINLINE_SYM buffer_node(const buffer_node &src)
Copy constructor.
Definition: flow_graph.h:2016
internal::tagged_msg< size_t, T0, T1, T2, T3, T4, T5, T6, T7 > output_type
Definition: flow_graph.h:3337
void grow_my_array(size_t minimum_size)
Grows the internal array.
Definition: flow_graph.h:160
bool try_reserve_apply_body(output_type &v)
Definition: flow_graph.h:1105
__TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6, __TBB_B7 b7, __TBB_B8 b8)
Definition: flow_graph.h:3047
virtual bool try_get_wrapper(void *p, bool is_async)=0
bool try_release() __TBB_override
Release a reserved item.
Definition: flow_graph.h:1030
__TBB_NOINLINE_SYM indexer_node(graph &g)
Definition: flow_graph.h:3101
Base class for user-defined tasks.
Definition: task.h:604
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:1343
pointer operator->() const
Dereference.
Definition: flow_graph.h:753
internal::source_body< output_type > * my_body
Definition: flow_graph.h:1097
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:2262
Represents acquisition of a mutex.
Definition: spin_mutex.h:53
untyped_receiver successor_type
The successor type for this node.
Definition: flow_graph.h:303
__TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4)
Definition: flow_graph.h:3012
A cache of successors that are broadcast to.
Definition: flow_graph.h:120
#define __TBB_FLOW_GRAPH_PRIORITY_EXPR(expr)
Forwards messages of type T to all successors.
Definition: flow_graph.h:1565
internal::function_output< output_type > fOutput_type
Definition: flow_graph.h:1167
bool register_successor(successor_type &r) __TBB_override
Adds a successor.
Definition: flow_graph.h:1612
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:1570
receiver< TupleType > base_type
Definition: flow_graph.h:1351
Forwards messages only if the threshold has not been reached.
Definition: flow_graph.h:113
__TBB_DEPRECATED typedef T output_type
The output type of this sender.
Definition: flow_graph.h:420
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:2292
__TBB_NOINLINE_SYM queue_node(graph &g)
Constructor.
Definition: flow_graph.h:2265
implements a function node that supports Input -> (set of outputs)
Definition: flow_graph.h:1260
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:2413
tbb::task * root_task()
Returns the root task of the graph.
__TBB_NOINLINE_SYM indexer_node(graph &g)
Definition: flow_graph.h:3134
bool place_item(size_t here, const item_type &me)
Definition: flow_graph.h:108
__TBB_NOINLINE_SYM continue_node(graph &g,)
Constructor for executable node with continue_msg -> Output.
Definition: flow_graph.h:1453
internal::broadcast_cache< input_type > my_successors
Definition: flow_graph.h:1576
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:1556
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:2380
graph & graph_reference() const __TBB_override
Definition: flow_graph.h:847
static internal::allocate_root_proxy allocate_root()
Returns proxy for overloaded new that allocates a root task.
Definition: task.h:652
An empty class used for messages that mean "I'm done".
Definition: flow_graph.h:106
void internal_reserve(queue_operation *op) __TBB_override
Definition: flow_graph.h:2244
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:3835
base_type::output_ports_type output_ports_type
Definition: flow_graph.h:3838
input_impl_type::predecessor_type predecessor_type
Definition: flow_graph.h:1448
static void fgt_async_try_put_begin(void *, void *)
task & pop_front()
Pop the front task from the list.
Definition: task.h:1098
__TBB_DEPRECATED typedef continue_msg input_type
The input type.
Definition: flow_graph.h:595
void deactivate_graph(tbb::flow::interface10::graph &g)
bool try_get(output_type &v) __TBB_override
Request an item from the node.
Definition: flow_graph.h:996
graph & graph_reference() const __TBB_override
Definition: flow_graph.h:2875
__TBB_NOINLINE_SYM multifunction_node(graph &g, size_t concurrency,)
Definition: flow_graph.h:1286
__TBB_NOINLINE_SYM write_once_node(const write_once_node &src)
Copy constructor: call base class copy constructor.
Definition: flow_graph.h:4251
internal::unfolded_join_node< N, key_matching_port, OutputTuple, key_matching< K, KHash > > unfolded_type
Definition: flow_graph.h:2978
__TBB_NOINLINE_SYM indexer_node(const indexer_node &other)
Definition: flow_graph.h:3147
#define __TBB_override
Definition: tbb_stddef.h:240
bool try_reserve(T &v) __TBB_override
Reserves an item.
Definition: flow_graph.h:4151
unsigned int node_priority_t
static void fgt_graph_desc(void *, const char *)
__TBB_NOINLINE_SYM indexer_node(const indexer_node &other)
Definition: flow_graph.h:3114
void activate_graph(tbb::flow::interface10::graph &g)
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:3338
__TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1)
Definition: flow_graph.h:2997
bool enqueue_forwarding_task(buffer_operation &op_data)
Definition: flow_graph.h:1827
item_buffer with reservable front-end. NOTE: if reserving, do not complete the operation with pop_front(); use consume_front().
Definition: flow_graph.h:249
multifunction_node< Input, tuple< Output >, Policy, Allocator > base_type
Definition: flow_graph.h:3827
virtual bool try_get_wrapper(void *p, bool is_async) __TBB_override
Definition: flow_graph.h:431
internal::broadcast_cache< output_type > & successors() __TBB_override
Definition: flow_graph.h:1554
task * decrement_counter(long long delta)
Definition: flow_graph.h:2691
internal::broadcast_cache< output_type > & successors() __TBB_override
Definition: flow_graph.h:1242
tbb::task_group_context * my_context
__TBB_NOINLINE_SYM continue_node(graph &g, int number_of_predecessors,)
Constructor for executable node with continue_msg -> Output.
Definition: flow_graph.h:1490
Forwards messages in priority order.
Definition: flow_graph.h:2373
fOutput_type::successor_type successor_type
Definition: flow_graph.h:1449
void move_item(size_t to, size_t from)
Definition: flow_graph.h:99
void internal_pop(queue_operation *op) __TBB_override
Definition: flow_graph.h:2235
void operator()(const Input &v, Ports &)
Definition: flow_graph.h:3807
virtual void internal_forward_task(buffer_operation *op)
Tries to forward valid items to successors.
Definition: flow_graph.h:1935
__TBB_NOINLINE_SYM indexer_node(const indexer_node &other)
Definition: flow_graph.h:3424
virtual void internal_consume(buffer_operation *op)
Definition: flow_graph.h:1988
void enqueue_in_graph_arena(tbb::flow::interface10::graph &g, tbb::task &arena_task)
Enqueues a task inside graph arena.
static void fgt_release_wait(void *)
K key_from_message(const T &t)
Definition: flow_graph.h:713
A cache of predecessors that only supports try_get.
Definition: flow_graph.h:122
A lock that occupies a single byte.
Definition: spin_mutex.h:39
limiter_node(graph &g, __TBB_DEPRECATED_LIMITER_ARG2(size_t threshold, int num_decrement_predecessors=0))
Constructor.
Definition: flow_graph.h:2725
const_iterator cbegin() const
start const iterator
Definition: flow_graph.h:866
virtual void handle_operations(buffer_operation *op_list)
Definition: flow_graph.h:1769
An abstract cache of successors.
Definition: flow_graph.h:119
bool internal_push(sequencer_operation *op) __TBB_override
Definition: flow_graph.h:2348
Enables one or the other code branches.
bool register_predecessor(predecessor_type &src) __TBB_override
Adds src to the list of cached predecessors.
Definition: flow_graph.h:2825
Implements an executable node that supports continue_msg -> Output.
Definition: flow_graph.h:1441
static void fgt_node(void *, string_index, void *, void *)
internal::broadcast_cache< output_type > my_successors
Definition: flow_graph.h:1099
bool register_successor(successor_type &s) __TBB_override
Add a new successor to this node.
Definition: flow_graph.h:4059
Pure virtual template class that defines a receiver of messages of type T.
Definition: flow_graph.h:110
void remove_edge(sender< T > &p, receiver< T > &s)
Removes an edge between a single predecessor and a single successor.
Definition: flow_graph.h:3517
bool register_successor(successor_type &r) __TBB_override
Add a new successor to this node.
Definition: flow_graph.h:3959
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:165
Implements a function node that supports Input -> Output.
Definition: flow_graph.h:1161
static void fgt_multiinput_multioutput_node(void *, string_index, void *, void *)
virtual bool internal_push(buffer_operation *op)
Definition: flow_graph.h:1964
static void fgt_async_reserve(void *, void *)
~graph()
Destroys the graph.
Definition: flow_graph.h:790
Detects whether two given types are the same.
Implements methods for a function node that takes a type Input as input and sends a type Output to its successors.
Definition: flow_graph.h:421
internal::aggregating_functor< class_type, buffer_operation > handler_type
Definition: flow_graph.h:1765
void internal_pop(prio_operation *op) __TBB_override
Definition: flow_graph.h:2437
try_put_functor(output_port_type &p, const Output &v)
Definition: flow_graph.h:3847
task * apply_body_bypass()
Applies the body. Returning SUCCESSFULLY_ENQUEUED is okay; forward_task_bypass will handle it.
Definition: flow_graph.h:1145
Base class for receivers of completion messages.
Definition: flow_graph.h:591
tbb::internal::uint64_t tag_value
Definition: flow_graph.h:29
Input and scheduling for a function node that takes a type Input as input.
Definition: flow_graph.h:61
#define __TBB_DEPRECATED_LIMITER_EXPR(expr)
__TBB_NOINLINE_SYM async_node(const async_node &other)
Definition: flow_graph.h:3937
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void * lock
void fetch_item(size_t i, item_type &o)
Definition: flow_graph.h:90
friend class scoped_lock
Definition: spin_mutex.h:179
bool gather_successful_try_puts(const X &t, task_list &tasks)
Definition: flow_graph.h:511
__TBB_NOINLINE_SYM indexer_node(const indexer_node &other)
Definition: flow_graph.h:3180
unfolded_join_node : passes input_ports_type to join_node_base. We build the input port type using tuple_element.
Definition: flow_graph.h:1508
internal::function_input_queue< input_type, Allocator > input_queue_type
Definition: flow_graph.h:1166
tbb::flow::tuple_element< N, typename JNT::input_ports_type >::type & input_port(JNT &jn)
templated function to refer to input ports of the join node
Definition: flow_graph.h:1996
static void fgt_node_with_body(void *, string_index, void *, void *, void *)
__TBB_NOINLINE_SYM priority_queue_node(const priority_queue_node &src)
Copy constructor.
Definition: flow_graph.h:2399
internal::async_body_base< gateway_type > async_body_base_type
Definition: flow_graph.h:3837
leaf for multifunction. OutputSet can be a std::tuple or a vector.
Definition: flow_graph.h:203
reference operator*() const
Dereference.
Definition: flow_graph.h:747
Forwards messages in sequence order.
Definition: flow_graph.h:2299
void reset_receiver(reset_flags f) __TBB_override
put receiver back in initial state
Definition: flow_graph.h:690
virtual void reset_receiver(reset_flags f=rf_reset_protocol)=0
put receiver back in initial state
__TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6, __TBB_B7 b7)
Definition: flow_graph.h:3038
void const char const char int ITT_FORMAT __itt_group_sync p
task * try_put_task(const T &t) __TBB_override
Receive an item; return a task* if possible.
Definition: flow_graph.h:2157
internal::continue_input< Output, Policy > input_impl_type
Definition: flow_graph.h:1446
void release_wait() __TBB_override
Inform a graph that a previous call to reserve_wait is no longer in effect.
Definition: flow_graph.h:3861
task * try_put_task(const T &v) __TBB_override
Put item to successor; return task to run the successor if possible.
Definition: flow_graph.h:4267
A cache of successors that are put in a round-robin fashion.
Definition: flow_graph.h:121
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:3266
receiver< input_type >::predecessor_type predecessor_type
Definition: flow_graph.h:1708
graph & graph_reference() const __TBB_override
Definition: flow_graph.h:4189
Forwards messages in arbitrary order.
Definition: flow_graph.h:1704
Breaks an infinite loop between the node reservation and register_successor call.
Definition: flow_graph.h:4194
task * try_put_task(const T &t) __TBB_override
build a task to run the successor if possible. Default is old behavior.
Definition: flow_graph.h:1679
internal::tagged_msg< size_t, T0, T1, T2, T3, T4, T5, T6, T7, T8, T9 > output_type
Definition: flow_graph.h:3409
static void fgt_end_body(void *)
graph & graph_reference() const __TBB_override
Definition: flow_graph.h:1412
internal::unfolded_join_node< N, reserving_port, OutputTuple, reserving > unfolded_type
Definition: flow_graph.h:2909
untyped_sender predecessor_type
The predecessor type for this node.
Definition: flow_graph.h:362
receiver< input_type >::predecessor_type predecessor_type
Definition: flow_graph.h:2306
void internal_make_edge(internal::untyped_sender &p, internal::untyped_receiver &s)
Definition: flow_graph.h:3438
bool remove_successor(successor_type &r) __TBB_override
Removes a successor from this node.
Definition: flow_graph.h:2778
input_impl_type::predecessor_type predecessor_type
Definition: flow_graph.h:1168
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:3232
sender< output_type >::successor_type successor_type
The type of successors of this node.
Definition: flow_graph.h:902
tbb::flow::tuple_element< N, typename MOP::output_ports_type >::type & output_port(MOP &op)
Definition: flow_graph.h:719
void add_nodes_impl(CompositeType *, bool)
Definition: flow_graph.h:958
void make_edge(sender< T > &p, receiver< T > &s)
Makes an edge between a single predecessor and a single successor.
Definition: flow_graph.h:3453
__TBB_DEPRECATED typedef receiver< input_type >::predecessor_type predecessor_type
The predecessor type for this node.
Definition: flow_graph.h:598
Meets "allocator" requirements of ISO C++ Standard, Section 20.1.5.
A task that calls a node's forward_task function.
Definition: flow_graph.h:271
void spawn_in_graph_arena(tbb::flow::interface10::graph &g, tbb::task &arena_task)
Spawns a task inside graph arena.
virtual void internal_pop(buffer_operation *op)
Definition: flow_graph.h:1970
__TBB_NOINLINE_SYM join_node(const join_node &other)
Definition: flow_graph.h:2958

Copyright © 2005-2019 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.