Intel(R) Threading Building Blocks Doxygen Documentation  version 4.2.3
concurrent_monitor.h
Go to the documentation of this file.
1 /*
2  Copyright (c) 2005-2019 Intel Corporation
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15 
16 
17 
18 
19 */
20 
21 #ifndef __TBB_concurrent_monitor_H
22 #define __TBB_concurrent_monitor_H
23 
24 #include "tbb/tbb_stddef.h"
25 #include "tbb/atomic.h"
26 #include "tbb/spin_mutex.h"
27 #include "tbb/tbb_exception.h"
28 #include "tbb/aligned_space.h"
29 
30 #include "semaphore.h"
31 
32 namespace tbb {
33 namespace internal {
34 
36 
38 public:
39  struct node_t {
42  explicit node_t() : next((node_t*)(uintptr_t)0xcdcdcdcd), prev((node_t*)(uintptr_t)0xcdcdcdcd) {}
43  };
44 
45  // ctor
47  // dtor
49 
50  inline size_t size() const {return count;}
51  inline bool empty() const {return size()==0;}
52  inline node_t* front() const {return head.next;}
53  inline node_t* last() const {return head.prev;}
54  inline node_t* begin() const {return front();}
55  inline const node_t* end() const {return &head;}
56 
58  inline void add( node_t* n ) {
60  n->prev = head.prev;
61  n->next = &head;
62  head.prev->next = n;
63  head.prev = n;
64  }
65 
67  inline void remove( node_t& n ) {
68  __TBB_ASSERT( count > 0, "attempt to remove an item from an empty list" );
70  n.prev->next = n.next;
71  n.next->prev = n.prev;
72  }
73 
76  if( const size_t l_count = __TBB_load_relaxed(count) ) {
77  __TBB_store_relaxed(lst.count, l_count);
78  lst.head.next = head.next;
79  lst.head.prev = head.prev;
80  head.next->prev = &lst.head;
81  head.prev->next = &lst.head;
82  clear();
83  }
84  }
85 
87 private:
90 };
91 
94 
96 
98 public:
101  friend class concurrent_monitor;
102  public:
103  thread_context() : skipped_wakeup(false), aborted(false), ready(false), context(0) {
104  epoch = 0;
105  in_waitset = false;
106  }
108  if (ready) {
109  if( skipped_wakeup ) semaphore().P();
111  }
112  }
113  binary_semaphore& semaphore() { return *sema.begin(); }
114  private:
116  // Inlining of the method is undesirable, due to extra instructions for
117  // exception support added at caller side.
118  __TBB_NOINLINE( void init() );
120  __TBB_atomic unsigned epoch;
123  bool aborted;
124  bool ready;
125  uintptr_t context;
126  };
127 
130 
133 
135  void prepare_wait( thread_context& thr, uintptr_t ctx = 0 );
136 
138 
139  inline bool commit_wait( thread_context& thr ) {
140  const bool do_it = thr.epoch == __TBB_load_relaxed(epoch);
141  // this check is just an optimization
142  if( do_it ) {
143  __TBB_ASSERT( thr.ready, "use of commit_wait() without prior prepare_wait()");
144  thr.semaphore().P();
145  __TBB_ASSERT( !thr.in_waitset, "still in the queue?" );
146  if( thr.aborted )
148  } else {
149  cancel_wait( thr );
150  }
151  return do_it;
152  }
154  void cancel_wait( thread_context& thr );
155 
157  template<typename WaitUntil, typename Context>
158  void wait( WaitUntil until, Context on );
159 
162 
164  void notify_one_relaxed();
165 
168 
170  void notify_all_relaxed();
171 
173  template<typename P> void notify( const P& predicate ) {atomic_fence(); notify_relaxed( predicate );}
174 
176  template<typename P> void notify_relaxed( const P& predicate );
177 
180 
182  void abort_all_relaxed();
183 
184 private:
187  __TBB_atomic unsigned epoch;
188  thread_context* to_thread_context( waitset_node_t* n ) { return static_cast<thread_context*>(n); }
189 };
190 
191 template<typename WaitUntil, typename Context>
192 void concurrent_monitor::wait( WaitUntil until, Context on )
193 {
194  bool slept = false;
195  thread_context thr_ctx;
196  prepare_wait( thr_ctx, on() );
197  while( !until() ) {
198  if( (slept = commit_wait( thr_ctx ) )==true )
199  if( until() ) break;
200  slept = false;
201  prepare_wait( thr_ctx, on() );
202  }
203  if( !slept )
204  cancel_wait( thr_ctx );
205 }
206 
207 template<typename P>
208 void concurrent_monitor::notify_relaxed( const P& predicate ) {
209  if( waitset_ec.empty() )
210  return;
211  waitset_t temp;
212  waitset_node_t* nxt;
213  const waitset_node_t* end = waitset_ec.end();
214  {
217  for( waitset_node_t* n=waitset_ec.last(); n!=end; n=nxt ) {
218  nxt = n->prev;
219  thread_context* thr = to_thread_context( n );
220  if( predicate( thr->context ) ) {
221  waitset_ec.remove( *n );
222  thr->in_waitset = false;
223  temp.add( n );
224  }
225  }
226  }
227 
228  end = temp.end();
229  for( waitset_node_t* n=temp.front(); n!=end; n=nxt ) {
230  nxt = n->next;
231  to_thread_context(n)->semaphore().V();
232  }
233 #if TBB_USE_ASSERT
234  temp.clear();
235 #endif
236 }
237 
238 } // namespace internal
239 } // namespace tbb
240 
241 #endif /* __TBB_concurrent_monitor_H */
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp end
__TBB_NOINLINE(void init())
The method for lazy initialization of the thread_context's semaphore.
void notify_all_relaxed()
Notify all waiting threads of the event; Relaxed version.
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:169
circular_doubly_linked_list_with_sentinel::node_t waitset_node_t
Base class for types that should not be copied or assigned.
Definition: tbb_stddef.h:335
void notify_all()
Notify all waiting threads of the event.
void notify_one()
Notify one thread about the event.
void add(node_t *n)
add to the back of the list
thread_context * to_thread_context(waitset_node_t *n)
void atomic_fence()
Sequentially consistent full memory fence.
Definition: tbb_machine.h:343
void prepare_wait(thread_context &thr, uintptr_t ctx=0)
prepare wait by inserting 'thr' into the wait queue
A lock that occupies a single byte.
Definition: spin_mutex.h:40
binary_semaphore for concurrent monitor
Definition: semaphore.h:226
Circular doubly-linked list with sentinel.
void throw_exception(exception_id eid)
Versionless convenience wrapper for throw_exception_v4()
void notify_one_relaxed()
Notify one thread about the event. Relaxed version.
void __TBB_store_relaxed(volatile T &location, V value)
Definition: tbb_machine.h:743
bool commit_wait(thread_context &thr)
Commit wait if event count has not changed; otherwise, cancel wait.
void notify_relaxed(const P &predicate)
Notify waiting threads of the event that satisfies the given predicate; Relaxed version.
The graph class.
void wait(WaitUntil until, Context on)
Wait for a condition to be satisfied with waiting-on context.
Represents acquisition of a mutex.
Definition: spin_mutex.h:54
void abort_all_relaxed()
Abort any sleeping threads at the time of the call; Relaxed version.
void abort_all()
Abort any sleeping threads at the time of the call.
void cancel_wait(thread_context &thr)
Cancel the wait. Removes the thread from the wait queue if not removed yet.
void notify(const P &predicate)
Notify waiting threads of the event that satisfies the given predicate.
T __TBB_load_relaxed(const volatile T &location)
Definition: tbb_machine.h:739
#define __TBB_atomic
Definition: tbb_stddef.h:241
tbb::aligned_space< binary_semaphore > sema
circular_doubly_linked_list_with_sentinel waitset_t
Block of space aligned sufficiently to construct an array T with N elements.
Definition: aligned_space.h:33
void flush_to(circular_doubly_linked_list_with_sentinel &lst)
move all elements to 'lst' and initialize the 'this' list

Copyright © 2005-2019 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.