Intel(R) Threading Building Blocks Doxygen Documentation
version 4.2.3
#ifndef __TBB__concurrent_queue_impl_H
#define __TBB__concurrent_queue_impl_H

#ifndef __TBB_concurrent_queue_H
#error Do not #include this internal file directly; use public TBB headers instead.
#endif

#include "../tbb_stddef.h"
#include "../tbb_machine.h"
#include "../atomic.h"
#include "../spin_mutex.h"
#include "../cache_aligned_allocator.h"
#include "../tbb_exception.h"
#include "../tbb_profiling.h"

#include __TBB_STD_SWAP_HEADER
namespace tbb {

#if !__TBB_TEMPLATE_FRIENDS_BROKEN

// forward declarations
namespace strict_ppl {
template<typename T, typename A> class concurrent_queue;
}

template<typename T, typename A> class concurrent_bounded_queue;

#endif

//! For internal use only.
namespace strict_ppl {

// ... (namespace internal opens here; `ticket` is a typedef for size_t)
//! parts of concurrent_queue_rep that do not have references to micro_queue
struct concurrent_queue_rep_base : no_copy {
    // ...

    //! Approximately n_queue/golden ratio
    static const size_t phi = 3;

    //! must be power of 2
    static const size_t n_queue = 8;

    //! Prefix on a page
    struct page {
        page* next;
        uintptr_t mask;
    };

    // ... (head_counter, tail_counter, items_per_page, item_size,
    //      n_invalid_entries, and cache-line padding elided)
};

inline bool is_valid_page( const concurrent_queue_rep_base::page* p ) {
    return uintptr_t(p)>1;
}
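// [Editor's note, not part of the original header] The page pointer doubles
// as a tag: NULL means "no page", address 1 marks a micro_queue poisoned by
// invalidate_page_and_rethrow(), and any larger value is a real allocation.
// A minimal sketch of the convention (illustrative only):
#if 0
concurrent_queue_rep_base::page* none     = NULL;
concurrent_queue_rep_base::page* poisoned = (concurrent_queue_rep_base::page*)uintptr_t(1);
// Neither counts as a valid page; only real allocations do.
__TBB_ASSERT( !is_valid_page(none) && !is_valid_page(poisoned), NULL );
#endif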
//! Abstract class to define interface for page allocation/deallocation
class concurrent_queue_page_allocator {
    // ...
protected:
    virtual ~concurrent_queue_page_allocator() {}
private:
    virtual concurrent_queue_rep_base::page* allocate_page() = 0;
    virtual void deallocate_page( concurrent_queue_rep_base::page* p ) = 0;
};

#if _MSC_VER && !defined(__INTEL_COMPILER)
// unary minus operator applied to unsigned type, result still unsigned
#pragma warning( push )
#pragma warning( disable: 4146 )
#endif
//! A queue using simple locking.
template<typename T>
class micro_queue : no_copy {
public:
    typedef void (*item_constructor_t)(T* location, const void* src);
private:
    typedef concurrent_queue_rep_base::page page;

    // ...

    void copy_item( page& dst, size_t dindex, const void* src, item_constructor_t construct_item ) {
        construct_item( &get_ref(dst, dindex), src );
    }

    void copy_item( page& dst, size_t dindex, const page& src, size_t sindex,
                    item_constructor_t construct_item )
    {
        T& src_item = get_ref( const_cast<page&>(src), sindex );
        construct_item( &get_ref(dst, dindex), static_cast<const void*>(&src_item) );
    }
    void assign_and_destroy_item( void* dst, page& src, size_t index ) {
        T& from = get_ref(src,index);
        // ... (move `from` into *dst, then destroy it)
    }
public:
    struct padded_page: page {
        //! Not defined anywhere - exists to quiet warnings.
        padded_page();
        //! Not defined anywhere - exists to quiet warnings.
        void operator=( const padded_page& );
        //! Must be last field.
        T last;
    };

    static T& get_ref( page& p, size_t index ) {
        return (&static_cast<padded_page*>(static_cast<void*>(&p))->last)[index];
    }
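    // [Editor's note] get_ref treats the memory after the page header as an
    // array of T; padded_page exists only to describe that layout. Its `last`
    // member marks where the first item lands after the header, so
    // (&padded->last)[index] is item number `index` on the page. No
    // padded_page object is ever constructed, which is why its constructor
    // and assignment are declared but deliberately left undefined.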
    atomic<page*> head_page;
    atomic<ticket> head_counter;

    atomic<page*> tail_page;
    atomic<ticket> tail_counter;

    spin_mutex page_mutex;

    void push( const void* item, ticket k, concurrent_queue_base_v3<T>& base,
               item_constructor_t construct_item ) ;

    bool pop( void* dst, ticket k, concurrent_queue_base_v3<T>& base ) ;

    micro_queue& assign( const micro_queue& src, concurrent_queue_base_v3<T>& base,
                         item_constructor_t construct_item ) ;

    page* make_copy( concurrent_queue_base_v3<T>& base, const page* src_page, size_t begin_in_page,
                     size_t end_in_page, ticket& g_index, item_constructor_t construct_item ) ;

    void invalidate_page_and_rethrow( ticket k ) ;
};
template<typename T>
void micro_queue<T>::push( const void* item, ticket k, concurrent_queue_base_v3<T>& base,
                           item_constructor_t construct_item )
{
    // ... (if k falls on a page boundary, allocate a fresh page; on failure:)
        ++base.my_rep->n_invalid_entries;
        invalidate_page_and_rethrow( k );
    // ...

    if( tail_counter != k ) spin_wait_until_my_turn( tail_counter, k, *base.my_rep );

    // ... (link the new page in, then construct the item in slot `index`:)
        copy_item( *p, index, item, construct_item );
    // ... (on exception from the item constructor:)
        ++base.my_rep->n_invalid_entries;
    // ...
}
template<typename T>
bool micro_queue<T>::pop( void* dst, ticket k, concurrent_queue_base_v3<T>& base ) {
    // ... (wait for ticket k, locate its page `p` and slot `index`)
    bool success = false;
    {
        // ... (micro_queue_pop_finalizer releases the slot on scope exit)
        if( p->mask & uintptr_t(1)<<index ) {
            success = true;
            assign_and_destroy_item( dst, *p, index );
        } else {
            --base.my_rep->n_invalid_entries;
        }
    }
    return success;
}
template<typename T>
micro_queue<T>& micro_queue<T>::assign( const micro_queue<T>& src, concurrent_queue_base_v3<T>& base,
                                        item_constructor_t construct_item )
{
    // ... (copy head/tail counters; if the source has pages, clone them:)
        ticket g_index = head_counter;
        // ... (the first page may be copied only partially:)
            size_t end_in_first_page = (index+n_items<base.my_rep->items_per_page)?(index+n_items):base.my_rep->items_per_page;

            head_page = make_copy( base, srcp, index, end_in_first_page, g_index, construct_item );
            page* cur_page = head_page;

            // ... (interior pages are copied whole:)
                cur_page->next = make_copy( base, srcp, 0, base.my_rep->items_per_page, g_index, construct_item );
                cur_page = cur_page->next;

            // ... (the tail page is copied up to last_index:)
                if( last_index==0 ) last_index = base.my_rep->items_per_page;

                cur_page->next = make_copy( base, srcp, 0, last_index, g_index, construct_item );
                cur_page = cur_page->next;
            // ...
            tail_page = cur_page;
        // ... (on exception:)
            invalidate_page_and_rethrow( g_index );
    // ... (source queue empty:)
        head_page = tail_page = NULL;
    // ...
}
template<typename T>
void micro_queue<T>::invalidate_page_and_rethrow( ticket k ) {
    // Append an invalid page at address 1 so that no more pushes are allowed.
    page* invalid_page = (page*)uintptr_t(1);
    {
        // ... (under page_mutex, release the tail ticket, then:)
        page* q = tail_page;
        if( is_valid_page(q) )
            q->next = invalid_page;
        else
            head_page = invalid_page;
        tail_page = invalid_page;
    }
    // ... (rethrow the pending exception)
}
template<typename T>
concurrent_queue_rep_base::page* micro_queue<T>::make_copy( concurrent_queue_base_v3<T>& base,
    const page* src_page, size_t begin_in_page, size_t end_in_page,
    ticket& g_index, item_constructor_t construct_item )
{
    // ... (allocate new_page and copy the source page's mask)
    new_page->next = NULL;
    // ...
    for( ; begin_in_page!=end_in_page; ++begin_in_page, ++g_index )
        if( new_page->mask & uintptr_t(1)<<begin_in_page )
            copy_item( *new_page, begin_in_page, *src_page, begin_in_page, construct_item );
    return new_page;
}
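// [Editor's note] Only slots whose bit is set in page::mask hold constructed
// items; a push whose item constructor threw leaves its bit clear and bumps
// n_invalid_entries instead. The copy loop above therefore skips
// unconstructed slots rather than copying garbage.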
template<typename T>
class micro_queue_pop_finalizer: no_copy {
    // ...
public:
    micro_queue_pop_finalizer( micro_queue<T>& queue, concurrent_queue_base_v3<T>& b, ticket k, page* p ) :
        my_ticket(k), my_queue(queue), my_page(p), allocator(b)
    {}
    ~micro_queue_pop_finalizer() ;
};

template<typename T>
micro_queue_pop_finalizer<T>::~micro_queue_pop_finalizer() {
    // ... (if the consumed page is exhausted, unlink it under page_mutex:)
        my_queue.head_page = q;
        // ... (no successor page:)
        my_queue.tail_page = NULL;
    // ... (release head_counter, then free the unlinked page:)
        allocator.deallocate_page( p );
    // ...
}
#if _MSC_VER && !defined(__INTEL_COMPILER)
#pragma warning( pop )
#endif // warning 4146 is back
//! representation of concurrent_queue_base
template<typename T>
struct concurrent_queue_rep : public concurrent_queue_rep_base {
    micro_queue<T> array[n_queue];

    //! Map ticket to an array index
    static size_t index( ticket k ) {
        return k*phi%n_queue;
    }

    micro_queue<T>& choose( ticket k ) {
        return array[index(k)];
    }
};
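// [Editor's note] Because gcd(phi, n_queue) == gcd(3, 8) == 1, index() is a
// permutation of 0..7: consecutive tickets map to slots 0,3,6,1,4,7,2,5,...
// so back-to-back pushes land on different micro_queues and contention is
// spread across all eight. A standalone check (illustrative, compile apart
// from this header):
#if 0
#include <cstdio>
int main() {
    for( unsigned k=0; k<8; ++k )
        std::printf( "ticket %u -> micro_queue %u\n", k, k*3u%8u );
    return 0;
}
#endif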
//! base class of concurrent_queue
template<typename T>
class concurrent_queue_base_v3: public concurrent_queue_page_allocator {
    //! Internal representation
    concurrent_queue_rep<T>* my_rep;

    // ...

    virtual page* allocate_page() __TBB_override {
        concurrent_queue_rep<T>& r = *my_rep;
        size_t n = sizeof(padded_page) + (r.items_per_page-1)*sizeof(T);
        return reinterpret_cast<page*>(allocate_block( n ));
    }

    virtual void deallocate_page( concurrent_queue_rep_base::page* p ) __TBB_override {
        concurrent_queue_rep<T>& r = *my_rep;
        size_t n = sizeof(padded_page) + (r.items_per_page-1)*sizeof(T);
        deallocate_block( reinterpret_cast<void*>(p), n );
    }

    //! custom allocator
    virtual void* allocate_block( size_t n ) = 0;

    //! custom de-allocator
    virtual void deallocate_block( void* p, size_t n ) = 0;
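    // [Editor's note] A page is allocated as one block holding the page
    // header plus items_per_page items; padded_page already embeds one T
    // (its `last` field), hence the (items_per_page-1) term. For T = double,
    // for example, this is sizeof(padded_page) + 31*sizeof(double) bytes,
    // since 8-byte items get 32 slots per page (see the items_per_page
    // selection in the constructor below).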
protected:
    virtual ~concurrent_queue_base_v3() {
#if TBB_USE_ASSERT
        size_t nq = my_rep->n_queue;
        for( size_t i=0; i<nq; i++ )
            __TBB_ASSERT( my_rep->array[i].tail_page==NULL, "pages were not freed properly" );
#endif /* TBB_USE_ASSERT */
        // ... (deallocate my_rep)
    }
    //! Enqueue item at tail of queue
    void internal_push( const void* src, item_constructor_t construct_item ) {
        concurrent_queue_rep<T>& r = *my_rep;
        ticket k = r.tail_counter++;
        r.choose(k).push( src, k, *this, construct_item );
    }
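    // [Editor's note] The fetch-and-increment of tail_counter is the only
    // global synchronization on the push path: it hands the caller a unique
    // ticket k. choose(k) then picks one of the eight micro_queues, and FIFO
    // order within that micro_queue is enforced by micro_queue::push spinning
    // (via spin_wait_until_my_turn) until its local tail_counter reaches k.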
#if __TBB_CPP11_RVALUE_REF_PRESENT
    //! swap internal representation
    void internal_swap( concurrent_queue_base_v3& src ) {
        std::swap( my_rep, src.my_rep );
    }
#endif /* __TBB_CPP11_RVALUE_REF_PRESENT */

    // ... (declarations of internal_try_pop, internal_size, internal_empty,
    //      internal_finish_clear, and assign elided)
};
template<typename T>
concurrent_queue_base_v3<T>::concurrent_queue_base_v3() {
    const size_t item_size = sizeof(T);
    // ... (allocate and zero my_rep, check cache-line alignment)
    my_rep->item_size = item_size;
    my_rep->items_per_page = item_size<=  8 ? 32 :
                             item_size<= 16 ? 16 :
                             item_size<= 32 ?  8 :
                             item_size<= 64 ?  4 :
                             item_size<=128 ?  2 :
                             1;
}
template<typename T>
bool concurrent_queue_base_v3<T>::internal_try_pop( void* dst ) {
    concurrent_queue_rep<T>& r = *my_rep;
    ticket k;
    do {
        // ... (read candidate ticket k from r.head_counter, then claim it:)
            if( (ptrdiff_t)(r.tail_counter-k)<=0 ) {
                return false;   // queue is empty
            }
            ticket tk = k;
#if defined(_MSC_VER) && defined(_Wp64)
    #pragma warning (push)
    #pragma warning (disable: 4267)
#endif
            k = r.head_counter.compare_and_swap( tk+1, tk );
#if defined(_MSC_VER) && defined(_Wp64)
    #pragma warning (pop)
#endif
        // ... (if the CAS lost, another thread took ticket tk; retry with the new k)
    } while( !r.choose( k ).pop( dst, k, *this ) );
    return true;
}
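// [Editor's note] A consumer first claims ticket k with the compare-and-swap
// on head_counter and only then runs micro_queue::pop for that ticket. pop
// can still return false if slot k was invalidated by a failed push, in
// which case the do-while claims the next ticket; the emptiness test above
// is the only path that reports failure to the caller.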
template<typename T>
size_t concurrent_queue_base_v3<T>::internal_size() const {
    concurrent_queue_rep<T>& r = *my_rep;
    __TBB_ASSERT( sizeof(ptrdiff_t)<=sizeof(size_t), NULL );
    ticket hc = r.head_counter;
    size_t nie = r.n_invalid_entries;
    ticket tc = r.tail_counter;
    ptrdiff_t sz = tc-hc-nie;
    return sz<0 ? 0 : size_t(sz);
}
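// [Editor's note] The three loads above are not one atomic snapshot, so
// under concurrent pushes and pops tc-hc-nie can transiently be negative;
// clamping to zero keeps the result well-defined. This is why the public
// wrapper is named unsafe_size(): it is only exact while the queue is
// quiescent.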
template<typename T>
bool concurrent_queue_base_v3<T>::internal_empty() const {
    concurrent_queue_rep<T>& r = *my_rep;
    ticket tc = r.tail_counter;
    ticket hc = r.head_counter;
    // if tc!=r.tail_counter, the queue was not empty at some point between the two reads
    return tc==r.tail_counter && tc==hc+r.n_invalid_entries ;
}
template<typename T>
void concurrent_queue_base_v3<T>::internal_finish_clear() {
    concurrent_queue_rep<T>& r = *my_rep;
    size_t nq = r.n_queue;
    for( size_t i=0; i<nq; ++i ) {
        page* tp = r.array[i].tail_page;
        if( is_valid_page(tp) ) {
            // ...
            deallocate_page( tp );
            r.array[i].tail_page = NULL;
        }
    }
}
template<typename T>
void concurrent_queue_base_v3<T>::assign( const concurrent_queue_base_v3& src,
                                          item_constructor_t construct_item )
{
    concurrent_queue_rep<T>& r = *my_rep;
    r.items_per_page = src.my_rep->items_per_page;

    // copy concurrent_queue_rep data
    r.head_counter = src.my_rep->head_counter;
    r.tail_counter = src.my_rep->tail_counter;
    r.n_invalid_entries = src.my_rep->n_invalid_entries;

    // copy or move micro_queues
    for( size_t i = 0; i < r.n_queue; ++i )
        r.array[i].assign( src.my_rep->array[i], *this, construct_item );

    __TBB_ASSERT( r.head_counter==src.my_rep->head_counter && r.tail_counter==src.my_rep->tail_counter,
        "the source concurrent queue should not be concurrently modified." );
}
template<typename T>
class concurrent_queue_iterator_rep: no_assign {
    // ...
public:
    ticket head_counter;
    const concurrent_queue_base_v3<T>& my_queue;
    typename concurrent_queue_base_v3<T>::page* array[concurrent_queue_rep<T>::n_queue];

    concurrent_queue_iterator_rep( const concurrent_queue_base_v3<T>& queue ) :
        head_counter(queue.my_rep->head_counter),
        my_queue(queue)
    {
        for( size_t k=0; k<concurrent_queue_rep<T>::n_queue; ++k )
            array[k] = queue.my_rep->array[k].head_page;
    }

    //! Set item to point to kth element. Return true if at end of queue or item is marked valid; false otherwise.
    bool get_item( T*& item, size_t k ) ;
};
template<typename T>
bool concurrent_queue_iterator_rep<T>::get_item( T*& item, size_t k ) {
    if( k==my_queue.my_rep->tail_counter ) {
        item = NULL;
        return true;
    } else {
        // ... (locate the page `p` and slot `i` that hold ticket k)
        return (p->mask & uintptr_t(1)<<i)!=0;
    }
}
//! Constness-independent portion of concurrent_queue_iterator.
template<typename Value>
class concurrent_queue_iterator_base_v3 : no_assign {
    //! Represents concurrent_queue over which we are iterating.
    concurrent_queue_iterator_rep<Value>* my_rep;

    template<typename C, typename T, typename U>
    friend bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );

    template<typename C, typename T, typename U>
    friend bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );
protected:
    //! Pointer to current item
    Value* my_item;

#if __TBB_GCC_OPTIMIZER_ORDERING_BROKEN
    // ...
#endif

    //! Default constructor
    concurrent_queue_iterator_base_v3()
        : my_rep(NULL), my_item(NULL) {
        // ...
    }

    //! Construct iterator pointing to head of queue.
    concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3<Value>& queue ) ;

    // ... (copy constructor, assign(), advance(), destructor elided)
};
template<typename Value>
concurrent_queue_iterator_base_v3<Value>::concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3<Value>& queue ) {
    // ... (allocate my_rep and bind it to `queue`)
    size_t k = my_rep->head_counter;
    if( !my_rep->get_item(my_item, k) ) advance();
}
template<typename Value>
void concurrent_queue_iterator_base_v3<Value>::assign( const concurrent_queue_iterator_base_v3<Value>& other ) {
    if( my_rep!=other.my_rep ) {
        // ... (release our rep, then clone other's rep)
    }
    my_item = other.my_item;
}
template<typename Value>
void concurrent_queue_iterator_base_v3<Value>::advance() {
    __TBB_ASSERT( my_item, "attempt to increment iterator past end of queue" );
    size_t k = my_rep->head_counter;
    const concurrent_queue_base_v3<Value>& queue = my_rep->my_queue;
#if TBB_USE_ASSERT
    Value* tmp;
    my_rep->get_item(tmp,k);
    __TBB_ASSERT( my_item==tmp, NULL );
#endif
    // ... (compute slot `i` of ticket k within its page)
    if( i==queue.my_rep->items_per_page-1 ) {
        // ... (step this micro_queue's cached page pointer to the next page)
    }
    my_rep->head_counter = ++k;
    if( !my_rep->get_item(my_item, k) ) advance();
}
//! Meets requirements of a forward iterator for STL.
template<typename Container, typename Value>
class concurrent_queue_iterator: public concurrent_queue_iterator_base_v3<typename tbb_remove_cv<Value>::type>,
        public std::iterator<std::forward_iterator_tag,Value> {
#if !__TBB_TEMPLATE_FRIENDS_BROKEN
    template<typename T, class A>
    friend class ::tbb::strict_ppl::concurrent_queue;
#endif
    // ...
public:
    //! Reference to current item
    Value& operator*() const {
        return *static_cast<Value*>(this->my_item);
    }
    Value* operator->() const { return &operator*(); }

    //! Advance to next item in queue
    concurrent_queue_iterator& operator++() {
        this->advance();
        return *this;
    }

    //! Post increment
    Value* operator++(int) {
        Value* result = &operator*();
        operator++();
        return result;
    }
};
template<typename C, typename T, typename U>
bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
    return i.my_item==j.my_item;
}

template<typename C, typename T, typename U>
bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
    return i.my_item!=j.my_item;
}
// ... (namespaces internal and strict_ppl close here; the legacy
//      concurrent_bounded_queue internals below live in tbb::internal)

#if __TBB_PROTECTED_NESTED_CLASS_BROKEN
// ...
#endif

// ...

#if __TBB_CPP11_RVALUE_REF_PRESENT
// ...
#endif

    void internal_insert_item( const void* src, copy_specifics op_type );

    bool internal_insert_if_not_full( const void* src, copy_specifics op_type );
    template<typename C, typename T, typename U>
    friend bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );

    template<typename C, typename T, typename U>
    friend bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );

// ...

//! Meets requirements of a forward iterator for STL.
template<typename Container, typename Value>
class concurrent_queue_iterator: public concurrent_queue_iterator_base,
        public std::iterator<std::forward_iterator_tag,Value> {
#if !__TBB_TEMPLATE_FRIENDS_BROKEN
    template<typename T, class A>
    friend class ::tbb::concurrent_bounded_queue;
#endif
    // ...
public:
    //! Reference to current item
    Value& operator*() const {
        return *static_cast<Value*>(my_item);
    }

    // ...
};

template<typename C, typename T, typename U>
bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
    return i.my_item==j.my_item;
}

template<typename C, typename T, typename U>
bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
    return i.my_item!=j.my_item;
}

// ... (remainder of the header elided)
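// [Editor's note] A minimal usage sketch of the public class these internals
// back. User code includes the public header, never this file:
#if 0
#include "tbb/concurrent_queue.h"
#include <cassert>

int main() {
    tbb::concurrent_queue<int> q;
    q.push(1);                    // internal_push: ticket = tail_counter++
    q.push(2);
    int v = 0;
    bool ok = q.try_pop(v);       // internal_try_pop: CAS claims head ticket
    assert( ok && v==1 );         // FIFO: tickets are consumed in order
    assert( q.unsafe_size()==1 ); // internal_size: tail - head - invalid
    return 0;
}
#endif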