Intel(R) Threading Building Blocks Doxygen Documentation  version 4.2.3
aggregator.h
Go to the documentation of this file.
1 /*
2  Copyright (c) 2005-2019 Intel Corporation
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15 
16 
17 
18 
19 */
20 
21 #ifndef __TBB__aggregator_H
22 #define __TBB__aggregator_H
23 
24 #if !TBB_PREVIEW_AGGREGATOR
25 #error Set TBB_PREVIEW_AGGREGATOR before including aggregator.h
26 #endif
27 
28 #include "atomic.h"
29 #include "tbb_profiling.h"
30 
31 namespace tbb {
32 namespace interface6 {
33 
34 using namespace tbb::internal;
35 
37  template<typename handler_type> friend class aggregator_ext;
38  uintptr_t status;
40 public:
41  enum aggregator_operation_status { agg_waiting=0, agg_finished };
42  aggregator_operation() : status(agg_waiting), my_next(NULL) {} // A new operation starts in the agg_waiting state with no next operation linked.
44  void start() { call_itt_notify(acquired, &status); } // Call start before handling this operation; it reports the &status tag to ITT as acquired.
46 
47  void finish() { itt_store_word_with_release(status, uintptr_t(agg_finished)); } // Call finish when done handling this operation; it sets status to agg_finished via itt_store_word_with_release.
50 };
51 
52 namespace internal {
53 
55  friend class basic_handler;
56  virtual void apply_body() = 0;
57 public:
59  virtual ~basic_operation_base() {}
60 };
61 
62 template<typename Body>
64  const Body& my_body;
65  void apply_body() __TBB_override { my_body(); } // Invokes the referenced body with no arguments.
66 public:
67  basic_operation(const Body& b) : basic_operation_base(), my_body(b) {} // Binds my_body (a const Body&) to b; the body is referenced, not copied.
68 };
69 
71 public:
73  void operator()(aggregator_operation* op_list) const { // Handles each operation in op_list in turn, following next() until the list ends.
74  while (op_list) {
75  // ITT note: the &(op_list->status) tag is used to cover accesses to the operation data.
76  // The executing thread "acquires" the tag (see start()) and then performs
77  // the associated operation without triggering race condition diagnostics.
78  // The thread that created the operation is waiting on its status (see execute_impl()),
79  // so when this thread is done with the operation, it will "release" the tag
80  // and update the status (see finish()) to give control back to the waiting thread.
81  basic_operation_base& request = static_cast<basic_operation_base&>(*op_list); // NOTE(review): unchecked downcast; assumes every entry in the list is a basic_operation_base - confirm.
82  // IMPORTANT: need to advance op_list to op_list->next() before calling request.finish()
83  op_list = op_list->next(); // Read the link first: finish() hands control back to the creating thread, after which the operation presumably must not be touched.
84  request.start();
85  request.apply_body();
86  request.finish();
87  }
88  }
89 };
90 
91 } // namespace internal
92 
94 
96 template <typename handler_type>
98 public:
99  aggregator_ext(const handler_type& h) : handler_busy(0), handle_operations(h) { mailbox = NULL; } // Copies h into handle_operations; handler_busy starts at 0 and the mailbox starts as NULL (no pending operations).
100 
102 
103  void process(aggregator_operation *op) { execute_impl(*op); } // EXPERT INTERFACE: enters a user-made operation into the aggregator's mailbox; op is dereferenced unconditionally, so it must not be NULL.
104 
105 protected:
110 
111  // ITT note: &(op.status) tag is used to cover accesses to this operation. This
112  // thread has created the operation, and now releases it so that the handler
113  // thread may handle the associated operation w/o triggering a race condition;
114  // thus this tag will be acquired just before the operation is handled in the
115  // handle_operations functor.
117  // insert the operation into the list
118  do {
119  // ITT may flag the following line as a race; it is a false positive:
120  // This is an atomic read; we don't provide itt_hide_load_word for atomics
121  op.my_next = res = mailbox; // NOT A RACE
122  } while (mailbox.compare_and_swap(&op, res) != res);
123  if (!res) { // first in the list; handle the operations
124  // ITT note: &mailbox tag covers access to the handler_busy flag, which this
125  // waiting handler thread will try to set before entering handle_operations.
126  call_itt_notify(acquired, &mailbox);
127  start_handle_operations();
128  __TBB_ASSERT(op.status, NULL);
129  }
130  else { // not first; wait for op to be ready
134  }
135  }
136 
137 
138 private:
141 
143 
144  uintptr_t handler_busy;
145 
146  handler_type handle_operations;
147 
150  aggregator_operation *pending_operations;
151 
152  // ITT note: &handler_busy tag covers access to mailbox as it is passed
153  // between active and waiting handlers. Below, the waiting handler waits until
154  // the active handler releases, and the waiting handler acquires &handler_busy as
155  // it becomes the active_handler. The release point is at the end of this
156  // function, when all operations in mailbox have been handled by the
157  // owner of this aggregator.
158  call_itt_notify(prepare, &handler_busy);
159  // get handler_busy: only one thread can possibly spin here at a time
160  spin_wait_until_eq(handler_busy, uintptr_t(0));
161  call_itt_notify(acquired, &handler_busy);
162  // acquire fence not necessary here due to causality rule and surrounding atomics
163  __TBB_store_with_release(handler_busy, uintptr_t(1));
164 
165  // ITT note: &mailbox tag covers access to the handler_busy flag itself.
166  // Capturing the state of the mailbox signifies that handler_busy has been
167  // set and a new active handler will now process that list's operations.
168  call_itt_notify(releasing, &mailbox);
169  // grab pending_operations
170  pending_operations = mailbox.fetch_and_store(NULL);
171 
172  // handle all the operations
173  handle_operations(pending_operations);
174 
175  // release the handler
176  itt_store_word_with_release(handler_busy, uintptr_t(0));
177  }
178 };
179 
181 class aggregator : private aggregator_ext<internal::basic_handler> { // Basic aggregator interface; privately inherits aggregator_ext instantiated with internal::basic_handler.
182 public:
183  aggregator() : aggregator_ext<internal::basic_handler>(internal::basic_handler()) {} // Passes a default-constructed basic_handler to the base class.
185 
187  template<typename Body>
188  void execute(const Body& b) { // BASIC INTERFACE: enters function object b for exclusive execution by the aggregator.
189  internal::basic_operation<Body> op(b); // Operation object local to this call; it refers to b rather than copying it.
190  this->execute_impl(op); // Submits the operation through the inherited execute_impl.
191  }
192 };
193 
194 } // namespace interface6
195 
196 using interface6::aggregator;
197 using interface6::aggregator_ext;
198 using interface6::aggregator_operation;
199 
200 } // namespace tbb
201 
202 #endif // __TBB__aggregator_H
T itt_hide_load_word(const T &src)
#define __TBB_override
Definition: tbb_stddef.h:244
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function h
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:169
atomic< aggregator_operation * > mailbox
An atomically updated list (aka mailbox) of aggregator_operations.
Definition: aggregator.h:140
void itt_store_word_with_release(tbb::atomic< T > &dst, U src)
Basic aggregator interface.
Definition: aggregator.h:181
void execute(const Body &b)
BASIC INTERFACE: Enter a function for exclusive execution by the aggregator.
Definition: aggregator.h:188
Base class for types that should not be copied or assigned.
Definition: tbb_stddef.h:335
void execute_impl(aggregator_operation &op)
Definition: aggregator.h:108
T itt_load_word_with_acquire(const tbb::atomic< T > &src)
void operator()(aggregator_operation *op_list) const
Definition: aggregator.h:73
Base class for types that should not be assigned.
Definition: tbb_stddef.h:324
void process(aggregator_operation *op)
EXPERT INTERFACE: Enter a user-made operation into the aggregator's mailbox.
Definition: aggregator.h:103
The graph class.
void spin_wait_while_eq(const volatile T &location, U value)
Spin WHILE the value of the variable is equal to a given value.
Definition: tbb_machine.h:395
void finish()
Call finish when done handling this operation.
Definition: aggregator.h:47
void start_handle_operations()
Trigger the handling of operations when the handler is free.
Definition: aggregator.h:149
void call_itt_notify(notify_type, void *)
aggregator_operation * my_next
Definition: aggregator.h:39
void spin_wait_until_eq(const volatile T &location, const U value)
Spin UNTIL the value of the variable is equal to a given value.
Definition: tbb_machine.h:403
Primary template for atomic.
Definition: atomic.h:407
void itt_hide_store_word(T &dst, T src)
uintptr_t handler_busy
Controls thread access to handle_operations.
Definition: aggregator.h:144
aggregator_ext(const handler_type &h)
Definition: aggregator.h:99
Aggregator base class and expert interface.
Definition: aggregator.h:97
aggregator_operation * next()
Definition: aggregator.h:48
Identifiers declared inside namespace internal should never be used directly by client code.
Definition: atomic.h:55
void set_next(aggregator_operation *n)
Definition: aggregator.h:49
void start()
Call start before handling this operation.
Definition: aggregator.h:44
void __TBB_store_with_release(volatile T &location, V value)
Definition: tbb_machine.h:717
value_type fetch_and_store(value_type value)
Definition: atomic.h:278

Copyright © 2005-2019 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.