Intel(R) Threading Building Blocks Doxygen Documentation, version 4.2.3
custom_scheduler.h
1 /*
2  Copyright (c) 2005-2019 Intel Corporation
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15 */
16 
17 #ifndef _TBB_custom_scheduler_H
18 #define _TBB_custom_scheduler_H
19 
20 #include "scheduler.h"
21 #include "observer_proxy.h"
22 #include "itt_notify.h"
23 
24 namespace tbb {
25 namespace internal {
26 
27 //------------------------------------------------------------------------
28 //! Traits classes for scheduler
29 //------------------------------------------------------------------------
30 
31 struct DefaultSchedulerTraits {
32  static const bool itt_possible = true;
33  static const bool has_slow_atomic = false;
34 };
35 
36 struct IntelSchedulerTraits {
37  static const bool itt_possible = false;
38 #if __TBB_x86_32||__TBB_x86_64
39  static const bool has_slow_atomic = true;
40 #else
41  static const bool has_slow_atomic = false;
42 #endif /* __TBB_x86_32||__TBB_x86_64 */
43 };
44 
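The traits structs above act as compile-time switches: custom_scheduler<SchedulerTraits> reads itt_possible and has_slow_atomic in ordinary if statements, and the dead branch is optimized away. The following standalone sketch shows the same pattern; it is not part of custom_scheduler.h, and all names in it (InstrumentedTraits, PlainTraits, release_ref) are illustrative.

#include <atomic>
#include <cstdio>

// Illustrative traits mirroring the itt_possible / has_slow_atomic flags above.
struct InstrumentedTraits { static const bool itt_possible = true;  static const bool has_slow_atomic = false; };
struct PlainTraits        { static const bool itt_possible = false; static const bool has_slow_atomic = true;  };

template<typename Traits>
long release_ref( std::atomic<long>& rc ) {
    if( Traits::itt_possible )
        std::printf( "sync_releasing %p\n", static_cast<void*>(&rc) ); // stand-in for ITT_NOTIFY
    // Where the atomic RMW is comparatively expensive, skip it when we hold the last reference.
    if( Traits::has_slow_atomic && rc.load( std::memory_order_relaxed ) == 1 ) {
        rc.store( 0, std::memory_order_relaxed );
        return 0;
    }
    return rc.fetch_sub( 1, std::memory_order_release ) - 1;
}

int main() {
    std::atomic<long> rc( 2 );
    release_ref<PlainTraits>( rc );        // non-instrumented configuration
    release_ref<InstrumentedTraits>( rc ); // instrumented configuration
    return 0;
}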
45 //------------------------------------------------------------------------
46 // custom_scheduler
47 //------------------------------------------------------------------------
48 
49 //! A scheduler with a customized evaluation loop.
50 /** The customization can use SchedulerTraits to make decisions without needing a run-time check. */
51 template<typename SchedulerTraits>
52 class custom_scheduler: private generic_scheduler {
53  typedef custom_scheduler<SchedulerTraits> scheduler_type;
54 
55  custom_scheduler( market& m, bool genuine ) : generic_scheduler(m, genuine) {}
56 
57  //! Scheduler loop that dispatches tasks.
58  /** If child is non-NULL, it is dispatched first. Then, until "parent" has a reference
59  count of 1, other tasks are dispatched or stolen. */
60  void local_wait_for_all( task& parent, task* child ) __TBB_override;
61 
62  //! Entry point from client code to the scheduler loop that dispatches tasks.
63  /** The method is virtual, but *this is used only to dispatch on the correct vtable;
64  the correct object is looked up in TLS. */
65  void wait_for_all( task& parent, task* child ) __TBB_override {
66  static_cast<custom_scheduler*>(governor::local_scheduler())->scheduler_type::local_wait_for_all( parent, child );
67  }
68 
69  //! Decrements ref_count of a predecessor.
70  /** If it achieves 0, the predecessor is scheduled for execution.
71  When changing, remember that this is a hot path function. */
72  void tally_completion_of_predecessor( task& s, __TBB_ISOLATION_ARG( task*& bypass_slot, isolation_tag isolation ) ) {
73  task_prefix& p = s.prefix();
74  __TBB_ASSERT(p.ref_count > 0, NULL);
75  if( SchedulerTraits::itt_possible )
76  ITT_NOTIFY(sync_releasing, &p.ref_count);
77  if( SchedulerTraits::has_slow_atomic && p.ref_count==1 )
78  p.ref_count=0;
79  else {
80  reference_count old_ref_count = __TBB_FetchAndDecrementWrelease(&p.ref_count);
81 #if __TBB_PREVIEW_RESUMABLE_TASKS
82  if (old_ref_count == internal::abandon_flag + 2) {
83  // Remove the abandon flag.
84  p.ref_count = 1;
85  // The wait has been completed. Spawn a resume task.
86  tbb::task::resume(p.abandoned_scheduler);
87  return;
88  }
89 #endif
90  if (old_ref_count > 1) {
91  // more references exist
92  // '__TBB_cl_evict(&p)' degraded performance of parallel_preorder example
93  return;
94  }
95  }
96 
97  // Ordering on p.ref_count (superfluous if SchedulerTraits::has_slow_atomic)
98  __TBB_control_consistency_helper();
99  __TBB_ASSERT(p.ref_count==0, "completion of task caused predecessor's reference count to underflow");
100  if( SchedulerTraits::itt_possible )
101  ITT_NOTIFY(sync_acquired, &p.ref_count);
102 #if TBB_USE_ASSERT
103  p.extra_state &= ~es_ref_count_active;
104 #endif /* TBB_USE_ASSERT */
105 #if __TBB_TASK_ISOLATION
106  if ( isolation != no_isolation ) {
107  // The parent is allowed not to have isolation (even if a child has isolation) because it has never spawned.
108  __TBB_ASSERT(p.isolation == no_isolation || p.isolation == isolation, NULL);
109  p.isolation = isolation;
110  }
111 #endif /* __TBB_TASK_ISOLATION */
112 
113 #if __TBB_RECYCLE_TO_ENQUEUE
114  if (p.state==task::to_enqueue) {
115  // related to __TBB_TASK_ARENA TODO: try to keep the priority of the task,
116  // e.g. rework task_prefix to remember the priority of the received task and use it here
117  my_arena->enqueue_task(s, 0, my_random );
118  } else
119 #endif /*__TBB_RECYCLE_TO_ENQUEUE*/
120  if( bypass_slot==NULL )
121  bypass_slot = &s;
122 #if __TBB_PREVIEW_CRITICAL_TASKS
123  else if( internal::is_critical( s ) ) {
124  local_spawn( bypass_slot, bypass_slot->prefix().next );
125  bypass_slot = &s;
126  }
127 #endif /* __TBB_PREVIEW_CRITICAL_TASKS */
128  else
129  local_spawn( &s, s.prefix().next );
130  }
131 
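tally_completion_of_predecessor above prefers to hand the newly ready predecessor back through bypass_slot rather than spawning it, so the dispatch loop can run it next without touching the task pool. Below is a toy illustration of that preference only; it is not TBB code, and toy_task, toy_dispatcher and std::deque are stand-ins for the real task and task-pool types.

#include <deque>

struct toy_task { int id; };

struct toy_dispatcher {
    std::deque<toy_task*> pool;                        // stand-in for the local task pool
    void spawn( toy_task* t ) { pool.push_back( t ); } // stand-in for local_spawn
    // Mirrors the tail of tally_completion_of_predecessor: keep one ready task "in hand".
    void predecessor_became_ready( toy_task& s, toy_task*& bypass_slot ) {
        if( bypass_slot == nullptr )
            bypass_slot = &s;  // cheapest path: the dispatch loop runs it next
        else
            spawn( &s );       // slot already taken: fall back to the task pool
    }
};

int main() {
    toy_dispatcher d;
    toy_task a{1}, b{2};
    toy_task* bypass_slot = nullptr;
    d.predecessor_became_ready( a, bypass_slot ); // a occupies the bypass slot
    d.predecessor_became_ready( b, bypass_slot ); // b goes to the pool
    return ( bypass_slot == &a && d.pool.size() == 1 ) ? 0 : 1;
}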
132  //! Implements the bypass loop of the dispatch loop (local_wait_for_all).
133  bool process_bypass_loop( context_guard_helper</*report_tasks=*/SchedulerTraits::itt_possible>& context_guard,
134  __TBB_ISOLATION_ARG(task* t, isolation_tag isolation) );
135 
136 public:
137  static generic_scheduler* allocate_scheduler( market& m, bool genuine ) {
138  void* p = NFS_Allocate(1, sizeof(scheduler_type), NULL);
139  std::memset(p, 0, sizeof(scheduler_type));
140  scheduler_type* s = new( p ) scheduler_type( m, genuine );
141  s->assert_task_pool_valid();
142  ITT_SYNC_CREATE(s, SyncType_Scheduler, SyncObj_TaskPoolSpinning);
143  return s;
144  }
145 
146  //! Try getting a task from the mailbox or stealing from another scheduler.
147  /** Returns the stolen task or NULL if all attempts fail. */
148  task* receive_or_steal_task( __TBB_ISOLATION_ARG( __TBB_atomic reference_count& completion_ref_count,
149  isolation_tag isolation ) ) __TBB_override;
150 }; // class custom_scheduler<>
151 
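allocate_scheduler above follows a common TBB pattern: obtain zeroed, cache-aligned storage (NFS_Allocate), then construct the object in place with placement new. The sketch below reproduces that pattern with standard C++17 facilities in place of the TBB allocator; it is not part of this file, and the 64-byte cache-line size is an assumption.

#include <cstdlib>
#include <cstring>
#include <new>
#include <utility>

// aligned_alloc/free stand in for NFS_Allocate/NFS_Free.
template<typename Scheduler, typename... Args>
Scheduler* allocate_in_place( Args&&... args ) {
    const std::size_t line  = 64;                                            // assumed cache-line size
    const std::size_t bytes = ( sizeof(Scheduler) + line - 1 ) / line * line; // aligned_alloc needs a multiple of the alignment
    void* p = std::aligned_alloc( line, bytes );
    if( !p ) throw std::bad_alloc();
    std::memset( p, 0, sizeof(Scheduler) );                    // zero first, as the listing does
    return new( p ) Scheduler( std::forward<Args>(args)... );  // then construct in place
}

template<typename Scheduler>
void destroy_in_place( Scheduler* s ) {
    s->~Scheduler();
    std::free( s );
}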
152 //------------------------------------------------------------------------
153 // custom_scheduler methods
154 //------------------------------------------------------------------------
155 template<typename SchedulerTraits>
156 task* custom_scheduler<SchedulerTraits>::receive_or_steal_task( __TBB_ISOLATION_ARG( __TBB_atomic reference_count& completion_ref_count, isolation_tag isolation ) ) {
157  task* t = NULL;
158  bool outermost_worker_level = worker_outermost_level();
159  bool outermost_dispatch_level = outermost_worker_level || master_outermost_level();
160  bool can_steal_here = can_steal();
161  bool outermost_current_worker_level = outermost_worker_level;
162 #if __TBB_PREVIEW_RESUMABLE_TASKS
163  outermost_current_worker_level &= my_properties.genuine;
164 #endif
165  my_inbox.set_is_idle( true );
166 #if __TBB_HOARD_NONLOCAL_TASKS
167  __TBB_ASSERT(!my_nonlocal_free_list, NULL);
168 #endif
169 #if __TBB_TASK_PRIORITY
170  if ( outermost_dispatch_level ) {
171  if ( intptr_t skipped_priority = my_arena->my_skipped_fifo_priority ) {
172  // This thread can dequeue FIFO tasks, and some priority levels of
173  // FIFO tasks have been bypassed (to prevent deadlock caused by
174  // dynamic priority changes in nested task group hierarchy).
175  if ( my_arena->my_skipped_fifo_priority.compare_and_swap(0, skipped_priority) == skipped_priority
176  && skipped_priority > my_arena->my_top_priority )
177  {
178  my_market->update_arena_priority( *my_arena, skipped_priority );
179  }
180  }
181  }
182 #endif /* !__TBB_TASK_PRIORITY */
183  // TODO: Try to find a place to reset my_limit (under market's lock)
184  // The number of slots potentially used in the arena. Updated once in a while, as my_limit changes rarely.
185  size_t n = my_arena->my_limit-1;
186  int yield_count = 0;
187  // The state "failure_count==-1" is used only when itt_possible is true,
188  // and denotes that a sync_prepare has not yet been issued.
189  for( int failure_count = -static_cast<int>(SchedulerTraits::itt_possible);; ++failure_count) {
190  __TBB_ASSERT( my_arena->my_limit > 0, NULL );
191  __TBB_ASSERT( my_arena_index <= n, NULL );
192  if( completion_ref_count == 1 ) {
193  if( SchedulerTraits::itt_possible ) {
194  if( failure_count!=-1 ) {
195  ITT_NOTIFY(sync_prepare, &completion_ref_count);
196  // Notify Intel(R) Thread Profiler that thread has stopped spinning.
197  ITT_NOTIFY(sync_acquired, this);
198  }
199  ITT_NOTIFY(sync_acquired, &completion_ref_count);
200  }
201  __TBB_ASSERT( !t, NULL );
202  // A worker thread in its outermost dispatch loop (i.e. its execution stack is empty) should
203  // exit it either when there is no more work in the current arena, or when revoked by the market.
204  __TBB_ASSERT( !outermost_worker_level, NULL );
205  __TBB_control_consistency_helper(); // on ref_count
206  break; // exit stealing loop and return;
207  }
208  // Check if the resource manager requires our arena to relinquish some threads
209  if ( outermost_current_worker_level ) {
210  if ( ( my_arena->my_num_workers_allotted < my_arena->num_workers_active() ) ) {
211  if ( SchedulerTraits::itt_possible && failure_count != -1 )
212  ITT_NOTIFY(sync_cancel, this);
213  return NULL;
214  }
215  }
216 #if __TBB_PREVIEW_RESUMABLE_TASKS
217  else if ( *my_arena_slot->my_scheduler_is_recalled ) {
218  // Original scheduler was requested, return from stealing loop and recall.
219  if ( my_inbox.is_idle_state(true) )
220  my_inbox.set_is_idle(false);
221  return NULL;
222  }
223 #endif
224 #if __TBB_TASK_PRIORITY
225  const int p = int(my_arena->my_top_priority);
226 #else /* !__TBB_TASK_PRIORITY */
227  static const int p = 0;
228 #endif
229  // Check if there are tasks mailed to this thread via task-to-thread affinity mechanism.
230  __TBB_ASSERT(my_affinity_id, NULL);
231  if ( n && !my_inbox.empty() ) {
232  t = get_mailbox_task( __TBB_ISOLATION_EXPR( isolation ) );
233 #if __TBB_TASK_ISOLATION
234  // There is a race with a thread adding a new task (possibly with suitable isolation)
235  // to our mailbox, so the below conditions might result in a false positive.
236  // Then set_is_idle(false) allows that task to be stolen; it's OK.
237  if ( isolation != no_isolation && !t && !my_inbox.empty()
238  && my_inbox.is_idle_state( true ) ) {
239  // We have proxy tasks in our mailbox but the isolation blocks their execution.
240  // So publish the proxy tasks in mailbox to be available for stealing from owner's task pool.
241  my_inbox.set_is_idle( false );
242  }
243 #endif /* __TBB_TASK_ISOLATION */
244  }
245  if ( t ) {
246  GATHER_STATISTIC( ++my_counters.mails_received );
247  }
248  // Check if there are tasks in starvation-resistant stream.
249  // Only allowed at the outermost dispatch level without isolation.
250  else if (__TBB_ISOLATION_EXPR(isolation == no_isolation &&) outermost_dispatch_level &&
251  !my_arena->my_task_stream.empty(p) && (
252 #if __TBB_PREVIEW_CRITICAL_TASKS && __TBB_CPF_BUILD
253  t = my_arena->my_task_stream.pop( p, subsequent_lane_selector(my_arena_slot->hint_for_pop) )
254 #else
255  t = my_arena->my_task_stream.pop( p, my_arena_slot->hint_for_pop )
256 #endif
257  ) ) {
258  ITT_NOTIFY(sync_acquired, &my_arena->my_task_stream);
259  // just proceed with the obtained task
260  }
261 #if __TBB_TASK_PRIORITY
262  // Check if any earlier offloaded non-top priority tasks have returned to the top level
263  else if ( my_offloaded_tasks && (t = reload_tasks( __TBB_ISOLATION_EXPR( isolation ) )) ) {
264  __TBB_ASSERT( !is_proxy(*t), "The proxy task cannot be offloaded" );
265  // just proceed with the obtained task
266  }
267 #endif /* __TBB_TASK_PRIORITY */
268  else if ( can_steal_here && n && (t = steal_task( __TBB_ISOLATION_EXPR(isolation) )) ) {
269  // just proceed with the obtained task
270  }
271 #if __TBB_PREVIEW_CRITICAL_TASKS
272  else if( (t = get_critical_task( __TBB_ISOLATION_EXPR(isolation) )) ) {
273  __TBB_ASSERT( internal::is_critical(*t), "Received task must be critical one" );
274  ITT_NOTIFY(sync_acquired, &my_arena->my_critical_task_stream);
275  // just proceed with the obtained task
276  }
277 #endif // __TBB_PREVIEW_CRITICAL_TASKS
278  else
279  goto fail;
280  // A task was successfully obtained somewhere
281  __TBB_ASSERT(t,NULL);
282 #if __TBB_ARENA_OBSERVER
283  my_arena->my_observers.notify_entry_observers( my_last_local_observer, is_worker() );
284 #endif
285 #if __TBB_SCHEDULER_OBSERVER
286  the_global_observer_list.notify_entry_observers( my_last_global_observer, is_worker() );
287 #endif /* __TBB_SCHEDULER_OBSERVER */
288  if ( SchedulerTraits::itt_possible && failure_count != -1 ) {
289  // FIXME - might be victim, or might be selected from a mailbox
290  // Notify Intel(R) Thread Profiler that thread has stopped spinning.
291  ITT_NOTIFY(sync_acquired, this);
292  }
293  break; // exit stealing loop and return
294 fail:
295  GATHER_STATISTIC( ++my_counters.steals_failed );
296  if( SchedulerTraits::itt_possible && failure_count==-1 ) {
297  // The first attempt to steal work failed, so notify Intel(R) Thread Profiler that
298  // the thread has started spinning. Ideally, we would do this notification
299  // *before* the first failed attempt to steal, but at that point we do not
300  // know that the steal will fail.
301  ITT_NOTIFY(sync_prepare, this);
302  failure_count = 0;
303  }
304  // Pause, even if we are going to yield, because the yield might return immediately.
305  prolonged_pause();
306  const int failure_threshold = 2*int(n+1);
307  if( failure_count>=failure_threshold ) {
308 #if __TBB_YIELD2P
309  failure_count = 0;
310 #else
311  failure_count = failure_threshold;
312 #endif
313  __TBB_Yield();
314 #if __TBB_TASK_PRIORITY
315  // Check if there are tasks abandoned by other workers
316  if ( my_arena->my_orphaned_tasks ) {
317  // Epoch must be advanced before seizing the list pointer
318  ++my_arena->my_abandonment_epoch;
319  task* orphans = (task*)__TBB_FetchAndStoreW( &my_arena->my_orphaned_tasks, 0 );
320  if ( orphans ) {
321  task** link = NULL;
322  // Get local counter out of the way (we've just brought in external tasks)
323  my_local_reload_epoch--;
324  t = reload_tasks( orphans, link, __TBB_ISOLATION_ARG( effective_reference_priority(), isolation ) );
325  if ( orphans ) {
326  *link = my_offloaded_tasks;
327  if ( !my_offloaded_tasks )
328  my_offloaded_task_list_tail_link = link;
329  my_offloaded_tasks = orphans;
330  }
331  __TBB_ASSERT( !my_offloaded_tasks == !my_offloaded_task_list_tail_link, NULL );
332  if ( t ) {
333  if( SchedulerTraits::itt_possible )
334  ITT_NOTIFY(sync_cancel, this);
335  __TBB_ASSERT( !is_proxy(*t), "The proxy task cannot be offloaded" );
336  break; // exit stealing loop and return
337  }
338  }
339  }
340 #endif /* __TBB_TASK_PRIORITY */
341  const int yield_threshold = 100;
342  if( yield_count++ >= yield_threshold ) {
343  // When a worker thread has nothing to do, return it to RML.
344  // For purposes of affinity support, the thread is considered idle while in RML.
345 #if __TBB_TASK_PRIORITY
346  if( outermost_current_worker_level || my_arena->my_top_priority > my_arena->my_bottom_priority ) {
347  if ( my_arena->is_out_of_work() && outermost_current_worker_level ) {
348 #else /* !__TBB_TASK_PRIORITY */
349  if ( outermost_current_worker_level && my_arena->is_out_of_work() ) {
350 #endif /* !__TBB_TASK_PRIORITY */
351  if( SchedulerTraits::itt_possible )
352  ITT_NOTIFY(sync_cancel, this);
353  return NULL;
354  }
355 #if __TBB_TASK_PRIORITY
356  }
357  if ( my_offloaded_tasks ) {
358  // Safeguard against any sloppiness in managing reload epoch
359  // counter (e.g. on the hot path because of performance reasons).
360  my_local_reload_epoch--;
361  // Break the deadlock caused by a higher priority dispatch loop
362  // stealing and offloading a lower priority task. Priority check
363  // at the stealing moment cannot completely preclude such cases
364  // because priorities can change dynamically.
365  if ( !outermost_worker_level && *my_ref_top_priority > my_arena->my_top_priority ) {
366  GATHER_STATISTIC( ++my_counters.prio_ref_fixups );
367  my_ref_top_priority = &my_arena->my_top_priority;
368  // it's expected that only outermost workers can use global reload epoch
369  __TBB_ASSERT(my_ref_reload_epoch == &my_arena->my_reload_epoch, NULL);
370  }
371  }
372 #endif /* __TBB_TASK_PRIORITY */
373  } // end of arena snapshot branch
374  // If several attempts did not find work, re-read the arena limit.
375  n = my_arena->my_limit-1;
376  } // end of yielding branch
377  } // end of nonlocal task retrieval loop
378  if ( my_inbox.is_idle_state( true ) )
379  my_inbox.set_is_idle( false );
380  return t;
381 }
382 
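The stealing loop above escalates gradually: it pauses after each failed attempt, starts yielding once failure_count reaches roughly twice the number of arena slots, and gives the thread back to RML after about one hundred yields. The skeleton below captures only that back-off policy; it is not TBB code, and pause_briefly, spin_for_work and the try_get callback are illustrative stand-ins.

#include <thread>

// A real implementation issues a hardware pause (prolonged_pause); a tiny busy loop stands in here.
inline void pause_briefly() {
    for( volatile int i = 0; i < 64; i = i + 1 ) {}
}

// try_get models one pass over mailbox / FIFO stream / stealing; it returns nullptr on failure.
template<typename Task, typename TryGetTask>
Task* spin_for_work( TryGetTask try_get, int arena_slots ) {
    const int failure_threshold = 2 * ( arena_slots + 1 );
    const int yield_threshold = 100;
    int failure_count = 0, yield_count = 0;
    for( ;; ) {
        if( Task* t = try_get() ) return t;          // success: hand the task to the dispatch loop
        pause_briefly();                             // spin first
        if( ++failure_count >= failure_threshold ) { // then start yielding the CPU
            failure_count = failure_threshold;
            std::this_thread::yield();
            if( ++yield_count >= yield_threshold )   // finally let the caller park or return the thread
                return nullptr;
        }
    }
}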
383 template<typename SchedulerTraits>
384 bool custom_scheduler<SchedulerTraits>::process_bypass_loop(
385  context_guard_helper</*report_tasks=*/SchedulerTraits::itt_possible>& context_guard,
386  __TBB_ISOLATION_ARG(task* t, isolation_tag isolation) )
387 {
388  while ( t ) {
389  __TBB_ASSERT( my_inbox.is_idle_state(false), NULL );
390  __TBB_ASSERT(!is_proxy(*t),"unexpected proxy");
391  __TBB_ASSERT( t->prefix().owner, NULL );
392 #if __TBB_TASK_ISOLATION
393  __TBB_ASSERT( isolation == no_isolation || isolation == t->prefix().isolation,
394  "A task from another isolated region is going to be executed" );
395 #endif /* __TBB_TASK_ISOLATION */
397 #if __TBB_TASK_GROUP_CONTEXT && TBB_USE_ASSERT
398  assert_context_valid(t->prefix().context);
400 #endif
401  // TODO: make the assert stronger by prohibiting allocated state.
402  __TBB_ASSERT( 1L<<t->state() & (1L<<task::allocated|1L<<task::ready|1L<<task::reexecute), NULL );
403  assert_task_pool_valid();
404 #if __TBB_PREVIEW_CRITICAL_TASKS
405  // TODO: check performance and optimize if needed for added conditions on the
406  // hot-path.
407  if( !internal::is_critical(*t) && !t->is_enqueued_task() ) {
408  if( task* critical_task = get_critical_task( __TBB_ISOLATION_EXPR(isolation) ) ) {
409  __TBB_ASSERT( internal::is_critical(*critical_task),
410  "Received task must be critical one" );
411  ITT_NOTIFY(sync_acquired, &my_arena->my_critical_task_stream);
413  my_innermost_running_task = t; // required during spawn to propagate isolation
414  local_spawn(t, t->prefix().next);
415  t = critical_task;
416  } else {
417 #endif /* __TBB_PREVIEW_CRITICAL_TASKS */
418 #if __TBB_TASK_PRIORITY
419  intptr_t p = priority(*t);
420  if ( p != *my_ref_top_priority
421  && !t->is_enqueued_task() ) {
422  assert_priority_valid(p);
423  if ( p != my_arena->my_top_priority ) {
424  my_market->update_arena_priority( *my_arena, p );
425  }
426  if ( p < effective_reference_priority() ) {
427  if ( !my_offloaded_tasks ) {
428  my_offloaded_task_list_tail_link = &t->prefix().next_offloaded;
429  // Erase possible reference to the owner scheduler
430  // (next_offloaded is a union member)
431  *my_offloaded_task_list_tail_link = NULL;
432  }
433  offload_task( *t, p );
434  t = NULL;
435  if ( is_task_pool_published() ) {
436  t = winnow_task_pool( __TBB_ISOLATION_EXPR( isolation ) );
437  if ( t )
438  continue;
439  } else {
440  // Mark arena as full to unlock arena priority level adjustment
441  // by arena::is_out_of_work(), and ensure worker's presence.
442  my_arena->advertise_new_work<arena::wakeup>();
443  }
444  break; /* exit bypass loop */
445  }
446  }
447 #endif /* __TBB_TASK_PRIORITY */
448 #if __TBB_PREVIEW_CRITICAL_TASKS
449  }
450  } // if is not critical
451 #endif
452  task* t_next = NULL;
453  my_innermost_running_task = t;
454  t->prefix().owner = this;
456 #if __TBB_TASK_GROUP_CONTEXT
457  context_guard.set_ctx( t->prefix().context );
459 #endif
460  {
461  GATHER_STATISTIC( ++my_counters.tasks_executed );
462  GATHER_STATISTIC( my_counters.avg_arena_concurrency += my_arena->num_workers_active() );
463  GATHER_STATISTIC( my_counters.avg_assigned_workers += my_arena->my_num_workers_allotted );
464 #if __TBB_TASK_PRIORITY
465  GATHER_STATISTIC( my_counters.avg_arena_prio += p );
466  GATHER_STATISTIC( my_counters.avg_market_prio += my_market->my_global_top_priority );
467 #endif /* __TBB_TASK_PRIORITY */
468  ITT_STACK(SchedulerTraits::itt_possible, callee_enter, t->prefix().context->itt_caller);
469  t_next = t->execute();
470  ITT_STACK(SchedulerTraits::itt_possible, callee_leave, t->prefix().context->itt_caller);
471  if (t_next) {
472  assert_task_valid(t_next);
473  __TBB_ASSERT( t_next->state()==task::allocated,
474  "if task::execute() returns task, it must be marked as allocated" );
475  reset_extra_state(t_next);
476  __TBB_ISOLATION_EXPR( t_next->prefix().isolation = t->prefix().isolation );
477 #if TBB_USE_ASSERT
478  affinity_id next_affinity=t_next->prefix().affinity;
479  if (next_affinity != 0 && next_affinity != my_affinity_id)
480  GATHER_STATISTIC( ++my_counters.affinity_ignored );
481 #endif
482  } // if there is bypassed task
483  }
484  assert_task_pool_valid();
485  switch( t->state() ) {
486  case task::executing: {
487  task* s = t->parent();
488  __TBB_ASSERT( my_innermost_running_task==t, NULL );
489  __TBB_ASSERT( t->prefix().ref_count==0, "Task still has children after it has been executed" );
490  t->~task();
491  if( s )
492  tally_completion_of_predecessor( *s, __TBB_ISOLATION_ARG( t_next, t->prefix().isolation ) );
493  free_task<no_hint>( *t );
494  poison_pointer( my_innermost_running_task );
495  assert_task_pool_valid();
496  break;
497  }
498 
499  case task::recycle: // set by recycle_as_safe_continuation()
500  t->prefix().state = task::allocated;
501 #if __TBB_RECYCLE_TO_ENQUEUE
502  __TBB_fallthrough;
503  case task::to_enqueue: // set by recycle_to_enqueue()
504 #endif
505  __TBB_ASSERT( t_next != t, "a task returned from method execute() can not be recycled in another way" );
507  // for safe continuation, need atomically decrement ref_count;
508  tally_completion_of_predecessor(*t, __TBB_ISOLATION_ARG( t_next, t->prefix().isolation ) );
509  assert_task_pool_valid();
510  break;
511 
512  case task::reexecute: // set by recycle_to_reexecute()
513  __TBB_ASSERT( t_next, "reexecution requires that method execute() return another task" );
514  __TBB_ASSERT( t_next != t, "a task returned from method execute() can not be recycled in another way" );
517  local_spawn( t, t->prefix().next );
518  assert_task_pool_valid();
519  break;
520  case task::allocated:
522  break;
523 #if __TBB_PREVIEW_RESUMABLE_TASKS
524  case task::to_resume:
525  __TBB_ASSERT(my_innermost_running_task == t, NULL);
526  __TBB_ASSERT(t->prefix().ref_count == 0, "Task still has children after it has been executed");
527  t->~task();
528  free_task<no_hint>(*t);
529  __TBB_ASSERT(!my_properties.genuine && my_properties.outermost,
530  "Only a coroutine on outermost level can be left.");
531  // Leave the outermost coroutine
532  return false;
533 #endif
534 #if TBB_USE_ASSERT
535  case task::ready:
536  __TBB_ASSERT( false, "task is in READY state upon return from method execute()" );
537  break;
538 #endif
539  default:
540  __TBB_ASSERT( false, "illegal state" );
541  break;
542  }
543  GATHER_STATISTIC( t_next ? ++my_counters.spawns_bypassed : 0 );
544  t = t_next;
545  } // end of scheduler bypass loop
546  return true;
547 }
548 
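process_bypass_loop above keeps executing whatever task::execute() returns, and only falls back to the task pool or stealing when the bypass chain ends. Stripped of priorities, recycling, isolation and statistics, the control flow looks roughly like the sketch below; it is not part of this file, and bypass_task, dispatch and get_next are illustrative names.

// Toy task type: execute() may return a successor to run immediately, bypassing the pool.
struct bypass_task {
    virtual bypass_task* execute() = 0;
    virtual ~bypass_task() {}
};

// get_next is any callable that pops from the local pool or steals; nullptr means "no more work".
template<typename GetNext>
void dispatch( bypass_task* t, GetNext get_next ) {
    while( t ) {
        bypass_task* bypass = t->execute(); // the inner "bypass" step
        delete t;                           // the real scheduler recycles or frees via free_task
        t = bypass ? bypass : get_next();   // only touch the pool when the bypass chain ends
    }
}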
549 // TODO: Rename args 'parent' into 'controlling_task' and 'child' into 't' or consider introducing
550 // a wait object (a la task_handle) to replace the 'parent' logic.
551 template<typename SchedulerTraits>
553  __TBB_ASSERT( governor::is_set(this), NULL );
554  __TBB_ASSERT( parent.ref_count() >= (child && child->parent() == &parent ? 2 : 1), "ref_count is too small" );
555  __TBB_ASSERT( my_innermost_running_task, NULL );
556 #if __TBB_TASK_GROUP_CONTEXT
557  __TBB_ASSERT( parent.prefix().context, "parent task does not have context" );
558 #endif /* __TBB_TASK_GROUP_CONTEXT */
559  assert_task_pool_valid();
560  // Using parent's refcount in sync_prepare (in the stealing loop below) is
561  // a workaround for TP. We need to name it here to display correctly in Ampl.
562  if( SchedulerTraits::itt_possible )
563  ITT_SYNC_CREATE(&parent.prefix().ref_count, SyncType_Scheduler, SyncObj_TaskStealingLoop);
564 
565  // TODO: consider extending the "context" guard to a "dispatch loop" guard to additionally
566  // guard old_innermost_running_task and old_properties states.
567  context_guard_helper</*report_tasks=*/SchedulerTraits::itt_possible> context_guard;
568  task* old_innermost_running_task = my_innermost_running_task;
569  scheduler_properties old_properties = my_properties;
570 
571  task* t = child;
572  bool cleanup = !is_worker() && &parent==my_dummy_task;
573  // Remove outermost property to indicate nested level.
574  __TBB_ASSERT(my_properties.outermost || my_innermost_running_task!=my_dummy_task, "The outermost property should be set out of a dispatch loop");
575  my_properties.outermost &= my_innermost_running_task==my_dummy_task;
576 #if __TBB_PREVIEW_CRITICAL_TASKS
577  my_properties.has_taken_critical_task |= is_critical(*my_innermost_running_task);
578 #endif
579 #if __TBB_TASK_PRIORITY
580  __TBB_ASSERT( (uintptr_t)*my_ref_top_priority < (uintptr_t)num_priority_levels, NULL );
581  volatile intptr_t *old_ref_top_priority = my_ref_top_priority;
582  // When entering a nested parallelism level, the market-level counter
583  // must be replaced with the one local to this arena.
584  volatile uintptr_t *old_ref_reload_epoch = my_ref_reload_epoch;
585  if ( !outermost_level() ) {
586  // We are in a nested dispatch loop.
587  // Market or arena priority must not prevent child tasks from being
588  // executed, so that dynamic priority changes do not cause deadlock.
589  my_ref_top_priority = &parent.prefix().context->my_priority;
590  my_ref_reload_epoch = &my_arena->my_reload_epoch;
591  if (my_ref_reload_epoch != old_ref_reload_epoch)
592  my_local_reload_epoch = *my_ref_reload_epoch - 1;
593  }
594 #endif /* __TBB_TASK_PRIORITY */
595 #if __TBB_TASK_ISOLATION
596  isolation_tag isolation = my_innermost_running_task->prefix().isolation;
597  if (t && isolation != no_isolation) {
598  __TBB_ASSERT(t->prefix().isolation == no_isolation, NULL);
599  // Propagate the isolation to the task executed without spawn.
600  t->prefix().isolation = isolation;
601  }
602 #endif /* __TBB_TASK_ISOLATION */
603 #if __TBB_PREVIEW_RESUMABLE_TASKS
604  // The recall flag for the original owner of this scheduler.
605  // It is used only on outermost level of currently attached arena slot.
606  tbb::atomic<bool> recall_flag;
607  recall_flag = false;
608  if (outermost_level() && my_wait_task == NULL && my_properties.genuine) {
609  __TBB_ASSERT(my_arena_slot->my_scheduler == this, NULL);
610  __TBB_ASSERT(my_arena_slot->my_scheduler_is_recalled == NULL, NULL);
611  my_arena_slot->my_scheduler_is_recalled = &recall_flag;
612  my_current_is_recalled = &recall_flag;
613  }
614  __TBB_ASSERT(my_arena_slot->my_scheduler_is_recalled != NULL, NULL);
615  task* old_wait_task = my_wait_task;
616  my_wait_task = &parent;
617 #endif
618 #if TBB_USE_EXCEPTIONS
619  // Infinite safeguard EH loop
620  for (;;) {
621  try {
622 #endif /* TBB_USE_EXCEPTIONS */
623  // Outer loop receives tasks from global environment (via mailbox, FIFO queue(s),
624  // and by stealing from other threads' task pools).
625  // All exit points from the dispatch loop are located in its immediate scope.
626  for(;;) {
627  // Middle loop retrieves tasks from the local task pool.
628  for(;;) {
629  // Inner loop evaluates tasks coming from nesting loops and those returned
630  // by just executed tasks (bypassing spawn or enqueue calls).
631  if ( !process_bypass_loop( context_guard, __TBB_ISOLATION_ARG(t, isolation) ) ) {
632 #if __TBB_PREVIEW_RESUMABLE_TASKS
633  // Restore the old properties for coroutine reuse (leave it in a valid state)
634  my_innermost_running_task = old_innermost_running_task;
635  my_properties = old_properties;
636  my_wait_task = old_wait_task;
637 #endif
638  return;
639  }
640 
641  // Check "normal" exit condition when parent's work is done.
642  if ( parent.prefix().ref_count == 1 ) {
643  __TBB_ASSERT( !cleanup, NULL );
644  __TBB_control_consistency_helper(); // on ref_count
645  ITT_NOTIFY( sync_acquired, &parent.prefix().ref_count );
646  goto done;
647  }
648 #if __TBB_PREVIEW_RESUMABLE_TASKS
649  // The thread may be outside of its original scheduler. Check the recall request.
650  if ( &recall_flag != my_arena_slot->my_scheduler_is_recalled ) {
651  __TBB_ASSERT( my_arena_slot->my_scheduler_is_recalled != NULL, "A broken recall flag" );
652  if ( *my_arena_slot->my_scheduler_is_recalled ) {
653  if ( !resume_original_scheduler() ) {
654  // We are requested to finish the current coroutine before the resume.
655  __TBB_ASSERT( !my_properties.genuine && my_properties.outermost,
656  "Only a coroutine on outermost level can be left." );
657  // Restore the old properties for coroutine reuse (leave it in a valid state)
658  my_innermost_running_task = old_innermost_running_task;
659  my_properties = old_properties;
660  my_wait_task = old_wait_task;
661  return;
662  }
663  }
664  }
665 #endif
666  // Retrieve the task from local task pool.
667  __TBB_ASSERT( is_task_pool_published() || is_quiescent_local_task_pool_reset(), NULL );
668  t = is_task_pool_published() ? get_task( __TBB_ISOLATION_EXPR( isolation ) ) : NULL;
669  assert_task_pool_valid();
670 
671  if ( !t ) // No tasks in the local task pool. Go to stealing loop.
672  break;
673  }; // end of local task pool retrieval loop
674 
675 #if __TBB_HOARD_NONLOCAL_TASKS
676  // before stealing, previously stolen task objects are returned
677  for (; my_nonlocal_free_list; my_nonlocal_free_list = t ) {
678  t = my_nonlocal_free_list->prefix().next;
679  free_nonlocal_small_task( *my_nonlocal_free_list );
680  }
681 #endif
682  if ( cleanup ) {
683  __TBB_ASSERT( !is_task_pool_published() && is_quiescent_local_task_pool_reset(), NULL );
684  __TBB_ASSERT( !worker_outermost_level(), NULL );
685  my_innermost_running_task = old_innermost_running_task;
686  my_properties = old_properties;
687 #if __TBB_TASK_PRIORITY
688  my_ref_top_priority = old_ref_top_priority;
689  if(my_ref_reload_epoch != old_ref_reload_epoch)
690  my_local_reload_epoch = *old_ref_reload_epoch-1;
691  my_ref_reload_epoch = old_ref_reload_epoch;
692 #endif /* __TBB_TASK_PRIORITY */
693 #if __TBB_PREVIEW_RESUMABLE_TASKS
694  if (&recall_flag != my_arena_slot->my_scheduler_is_recalled) {
695  // The recall point
696  __TBB_ASSERT(!recall_flag, NULL);
697  tbb::task::suspend(recall_functor(&recall_flag));
698  if (my_inbox.is_idle_state(true))
699  my_inbox.set_is_idle(false);
700  continue;
701  }
702  __TBB_ASSERT(&recall_flag == my_arena_slot->my_scheduler_is_recalled, NULL);
703  __TBB_ASSERT(!(my_wait_task->prefix().ref_count & internal::abandon_flag), NULL);
704  my_wait_task = old_wait_task;
705 #endif
706  return;
707  }
708  t = receive_or_steal_task( __TBB_ISOLATION_ARG( parent.prefix().ref_count, isolation ) );
709  if ( !t ) {
710 #if __TBB_PREVIEW_RESUMABLE_TASKS
711  if ( *my_arena_slot->my_scheduler_is_recalled )
712  continue;
713  // Done if either the original thread has entered, or we are on a nested level, or we are attached to the same arena
714  if ( &recall_flag == my_arena_slot->my_scheduler_is_recalled || old_wait_task != NULL )
715  goto done;
716  // The recall point. Continue the dispatch loop because the recalled thread may have tasks in its task pool.
717  __TBB_ASSERT(!recall_flag, NULL);
718  tbb::task::suspend( recall_functor(&recall_flag) );
719  if ( my_inbox.is_idle_state(true) )
720  my_inbox.set_is_idle(false);
721 #else
722  // Just exit dispatch loop
723  goto done;
724 #endif
725  }
726  } // end of infinite stealing loop
727 #if TBB_USE_EXCEPTIONS
728  __TBB_ASSERT( false, "Must never get here" );
729  } // end of try-block
730  TbbCatchAll( my_innermost_running_task->prefix().context );
731  t = my_innermost_running_task;
732  // Complete post-processing ...
733  if( t->state() == task::recycle
734 #if __TBB_RECYCLE_TO_ENQUEUE
735  // TODO: the enqueue semantics gets lost below, consider reimplementing
736  || t->state() == task::to_enqueue
737 #endif
738  ) {
739  // ... for recycled tasks to atomically decrement ref_count
741  if( SchedulerTraits::itt_possible )
742  ITT_NOTIFY(sync_releasing, &t->prefix().ref_count);
743  if( __TBB_FetchAndDecrementWrelease(&t->prefix().ref_count)==1 ) {
744  if( SchedulerTraits::itt_possible )
745  ITT_NOTIFY(sync_acquired, &t->prefix().ref_count);
746  }else{
747  t = NULL;
748  }
749  }
750  } // end of infinite EH loop
751  __TBB_ASSERT( false, "Must never get here too" );
752 #endif /* TBB_USE_EXCEPTIONS */
753 done:
754 #if __TBB_PREVIEW_RESUMABLE_TASKS
755  __TBB_ASSERT(!(parent.prefix().ref_count & internal::abandon_flag), NULL);
756  my_wait_task = old_wait_task;
757  if (my_wait_task == NULL) {
758  __TBB_ASSERT(outermost_level(), "my_wait_task could be NULL only on outermost level");
759  if (&recall_flag != my_arena_slot->my_scheduler_is_recalled) {
760  // The recall point.
761  __TBB_ASSERT(my_properties.genuine, NULL);
762  __TBB_ASSERT(!recall_flag, NULL);
763  tbb::task::suspend(recall_functor(&recall_flag));
764  if (my_inbox.is_idle_state(true))
765  my_inbox.set_is_idle(false);
766  }
767  __TBB_ASSERT(my_arena_slot->my_scheduler == this, NULL);
768  my_arena_slot->my_scheduler_is_recalled = NULL;
769  my_current_is_recalled = NULL;
770  }
771 
772 #endif /* __TBB_PREVIEW_RESUMABLE_TASKS */
773  my_innermost_running_task = old_innermost_running_task;
774  my_properties = old_properties;
775 #if __TBB_TASK_PRIORITY
776  my_ref_top_priority = old_ref_top_priority;
777  if(my_ref_reload_epoch != old_ref_reload_epoch)
778  my_local_reload_epoch = *old_ref_reload_epoch-1;
779  my_ref_reload_epoch = old_ref_reload_epoch;
780 #endif /* __TBB_TASK_PRIORITY */
781  if ( !ConcurrentWaitsEnabled(parent) ) {
782  if ( parent.prefix().ref_count != 1) {
783  // This is a worker that was revoked by the market.
784  __TBB_ASSERT( worker_outermost_level(),
785  "Worker thread exits nested dispatch loop prematurely" );
786  return;
787  }
788  parent.prefix().ref_count = 0;
789  }
790 #if TBB_USE_ASSERT
791  parent.prefix().extra_state &= ~es_ref_count_active;
792 #endif /* TBB_USE_ASSERT */
793 #if __TBB_TASK_GROUP_CONTEXT
794  __TBB_ASSERT(parent.prefix().context && default_context(), NULL);
795  task_group_context* parent_ctx = parent.prefix().context;
796  if ( parent_ctx->my_cancellation_requested ) {
797  task_group_context::exception_container_type *pe = parent_ctx->my_exception;
798  if ( master_outermost_level() && parent_ctx == default_context() ) {
799  // We are in the outermost task dispatch loop of a master thread, and
800  // the whole task tree has been collapsed. So we may clear cancellation data.
801  parent_ctx->my_cancellation_requested = 0;
802  // TODO: Add assertion that master's dummy task context does not have children
803  parent_ctx->my_state &= ~(uintptr_t)task_group_context::may_have_children;
804  }
805  if ( pe ) {
806  // On Windows, FPU control settings changed in the helper destructor are not visible
807  // outside a catch block. So restore the default settings manually before rethrowing
808  // the exception.
809  context_guard.restore_default();
810  TbbRethrowException( pe );
811  }
812  }
813  __TBB_ASSERT(!is_worker() || !CancellationInfoPresent(*my_dummy_task),
814  "Worker's dummy task context modified");
815  __TBB_ASSERT(!master_outermost_level() || !CancellationInfoPresent(*my_dummy_task),
816  "Unexpected exception or cancellation data in the master's dummy task");
817 #endif /* __TBB_TASK_GROUP_CONTEXT */
818  assert_task_pool_valid();
819 }
820 
821 } // namespace internal
822 } // namespace tbb
823 
824 #endif /* _TBB_custom_scheduler_H */
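local_wait_for_all above treats parent.prefix().ref_count == 1 as its completion signal: the waiter itself holds one reference, so reaching 1 means every child has finished. The fragment below is a minimal sketch of that contract with std::atomic standing in for TBB's reference_count and consistency helpers; waiter_node, wait_for_children and child_completed are illustrative names, not TBB internals.

#include <atomic>

struct waiter_node { std::atomic<long> ref_count; };

// The caller is assumed to hold one reference itself, so 1 means "all children done".
inline void wait_for_children( waiter_node& parent ) {
    while( parent.ref_count.load( std::memory_order_acquire ) != 1 ) {
        // A real dispatch loop executes local, mailed, enqueued or stolen tasks here
        // instead of spinning; see local_wait_for_all above.
    }
}

// A child signals completion by dropping its reference with release ordering.
inline void child_completed( waiter_node& parent ) {
    parent.ref_count.fetch_sub( 1, std::memory_order_release );
}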