17#ifndef __TBB__concurrent_queue_impl_H
18#define __TBB__concurrent_queue_impl_H
20#ifndef __TBB_concurrent_queue_H
21#error Do not #include this internal file directly; use public TBB headers instead.
24#include "../tbb_stddef.h"
25#include "../tbb_machine.h"
27#include "../spin_mutex.h"
28#include "../cache_aligned_allocator.h"
29#include "../tbb_exception.h"
30#include "../tbb_profiling.h"
32#include __TBB_STD_SWAP_HEADER
37#if !__TBB_TEMPLATE_FRIENDS_BROKEN
41template<
typename T,
typename A>
class concurrent_queue;
44template<
typename T,
typename A>
class concurrent_bounded_queue;
73 static const size_t phi = 3;
103 return uintptr_t(
p)>1;
121#if _MSC_VER && !defined(__INTEL_COMPILER)
123#pragma warning( push )
124#pragma warning( disable: 4146 )
146 construct_item( &
get_ref(dst, dindex), src );
152 T& src_item =
get_ref(
const_cast<page&
>(src), sindex );
153 construct_item( &
get_ref(dst, dindex),
static_cast<const void*
>(&src_item) );
216 item_constructor_t construct_item )
226 ++base.
my_rep->n_invalid_entries;
227 invalidate_page_and_rethrow( k );
233 if( tail_counter != k ) spin_wait_until_my_turn( tail_counter, k, *base.
my_rep );
249 copy_item( *
p, index, item, construct_item );
255 ++base.
my_rep->n_invalid_entries;
272 bool success =
false;
275 if(
p->mask & uintptr_t(1)<<index ) {
277 assign_and_destroy_item( dst, *
p, index );
279 --base.
my_rep->n_invalid_entries;
287 item_constructor_t construct_item )
294 ticket g_index = head_counter;
298 size_t end_in_first_page = (index+n_items<base.
my_rep->items_per_page)?(index+n_items):base.
my_rep->items_per_page;
300 head_page = make_copy( base, srcp, index, end_in_first_page, g_index, construct_item );
301 page* cur_page = head_page;
305 cur_page->
next = make_copy( base, srcp, 0, base.
my_rep->items_per_page, g_index, construct_item );
306 cur_page = cur_page->
next;
311 if( last_index==0 ) last_index = base.
my_rep->items_per_page;
313 cur_page->
next = make_copy( base, srcp, 0, last_index, g_index, construct_item );
314 cur_page = cur_page->
next;
316 tail_page = cur_page;
318 invalidate_page_and_rethrow( g_index );
321 head_page = tail_page = NULL;
329 page* invalid_page = (
page*)uintptr_t(1);
335 q->
next = invalid_page;
337 head_page = invalid_page;
338 tail_page = invalid_page;
346 ticket& g_index, item_constructor_t construct_item )
350 new_page->
next = NULL;
352 for( ; begin_in_page!=end_in_page; ++begin_in_page, ++g_index )
353 if( new_page->
mask & uintptr_t(1)<<begin_in_page )
354 copy_item( *new_page, begin_in_page, *src_page, begin_in_page, construct_item );
378 my_queue.head_page = q;
380 my_queue.tail_page = NULL;
385 allocator.deallocate_page(
p );
389#if _MSC_VER && !defined(__INTEL_COMPILER)
390#pragma warning( pop )
461 size_t nq =
my_rep->n_queue;
462 for(
size_t i=0; i<nq; i++ )
472 r.
choose(k).push( src, k, *
this, construct_item );
497#if __TBB_CPP11_RVALUE_REF_PRESENT
507 const size_t item_size =
sizeof(T);
514 my_rep->item_size = item_size;
515 my_rep->items_per_page = item_size<= 8 ? 32 :
516 item_size<= 16 ? 16 :
536#if defined(_MSC_VER) && defined(_Wp64)
537 #pragma warning (push)
538 #pragma warning (disable: 4267)
541#if defined(_MSC_VER) && defined(_Wp64)
542 #pragma warning (pop)
548 }
while( !r.
choose( k ).pop( dst, k, *
this ) );
555 __TBB_ASSERT(
sizeof(ptrdiff_t)<=
sizeof(
size_t), NULL );
560 ptrdiff_t sz = tc-hc-nie;
561 return sz<0 ? 0 : size_t(sz);
577 for(
size_t i=0; i<nq; ++i ) {
581 deallocate_page( tp );
582 r.
array[i].tail_page = NULL;
601 for(
size_t i = 0; i < r.
n_queue; ++i )
602 r.
array[i].assign( src.
my_rep->array[i], *
this, construct_item);
605 "the source concurrent queue should not be concurrently modified." );
621 for(
size_t k=0; k<concurrent_queue_rep<T>::n_queue; ++k )
631 if( k==my_queue.my_rep->tail_counter ) {
639 return (
p->mask & uintptr_t(1)<<i)!=0;
645template<
typename Value>
651 template<
typename C,
typename T,
typename U>
654 template<
typename C,
typename T,
typename U>
662#if __TBB_GCC_OPTIMIZER_ORDERING_BROKEN
694template<
typename Value>
699 if( !my_rep->get_item(my_item, k) ) advance();
702template<
typename Value>
704 if( my_rep!=other.
my_rep ) {
717template<
typename Value>
719 __TBB_ASSERT( my_item,
"attempt to increment iterator past end of queue" );
720 size_t k = my_rep->head_counter;
724 my_rep->get_item(tmp,k);
733 my_rep->head_counter = ++k;
734 if( !my_rep->get_item(my_item, k) ) advance();
747template<
typename Container,
typename Value>
749 public std::iterator<std::forward_iterator_tag,Value> {
750#if !__TBB_TEMPLATE_FRIENDS_BROKEN
751 template<
typename T,
class A>
752 friend class ::tbb::strict_ppl::concurrent_queue;
779 return *
static_cast<Value*
>(this->
my_item);
799template<
typename C,
typename T,
typename U>
804template<
typename C,
typename T,
typename U>
854#if __TBB_PROTECTED_NESTED_CLASS_BROKEN
915#if __TBB_CPP11_RVALUE_REF_PRESENT
965 template<
typename C,
typename T,
typename U>
968 template<
typename C,
typename T,
typename U>
1011template<
typename Container,
typename Value>
1013 public std::iterator<std::forward_iterator_tag,Value> {
1015#if !__TBB_TEMPLATE_FRIENDS_BROKEN
1016 template<
typename T,
class A>
1017 friend class ::tbb::concurrent_bounded_queue;
1045 return *
static_cast<Value*
>(
my_item);
1065template<
typename C,
typename T,
typename U>
1070template<
typename C,
typename T,
typename U>
#define __TBB_EXPORTED_METHOD
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
#define __TBB_offsetof(class_name, member_name)
Extended variant of the standard offsetof macro.
#define __TBB_compiler_fence()
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void * lock
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long value
void const char const char int ITT_FORMAT __itt_group_sync p
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_id __itt_string_handle __itt_metadata_type type
const size_t NFS_MaxLineSize
Compile-time constant that is upper bound on cache line/sector size.
size_t __TBB_EXPORTED_FUNC NFS_GetLineSize()
Cache/sector line size.
void move(tbb_thread &t1, tbb_thread &t2)
Identifiers declared inside namespace internal should never be used directly by client code.
void swap(atomic< T > &lhs, atomic< T > &rhs)
bool operator!=(const vector_iterator< Container, T > &i, const vector_iterator< Container, U > &j)
bool operator==(const vector_iterator< Container, T > &i, const vector_iterator< Container, U > &j)
void throw_exception(exception_id eid)
Versionless convenience wrapper for throw_exception_v4()
void itt_hide_store_word(T &dst, T src)
void spin_wait_until_eq(const volatile T &location, const U value)
Spin UNTIL the value of the variable is equal to a given value.
void itt_store_word_with_release(tbb::atomic< T > &dst, U src)
auto last(Container &c) -> decltype(begin(c))
concurrent_queue_iterator_base_v3 concurrent_queue_iterator_base
argument_integer_type modulo_power_of_two(argument_integer_type arg, divisor_integer_type divisor)
A function to compute arg modulo divisor where divisor is a power of 2.
void call_itt_notify(notify_type, void *)
void spin_wait_while_eq(const volatile T &location, U value)
Spin WHILE the value of the variable is equal to a given value.
bool operator==(const concurrent_queue_iterator< C, T > &i, const concurrent_queue_iterator< C, U > &j)
bool is_valid_page(const concurrent_queue_rep_base::page *p)
bool operator!=(const concurrent_queue_iterator< C, T > &i, const concurrent_queue_iterator< C, U > &j)
Meets "allocator" requirements of ISO C++ Standard, Section 20.1.5.
A queue using simple locking.
atomic< ticket > head_counter
static T & get_ref(page &p, size_t index)
concurrent_queue_rep_base::page page
void copy_item(page &dst, size_t dindex, const page &src, size_t sindex, item_constructor_t construct_item)
void assign_and_destroy_item(void *dst, page &src, size_t index)
void spin_wait_until_my_turn(atomic< ticket > &counter, ticket k, concurrent_queue_rep_base &rb) const
atomic< page * > head_page
atomic< page * > tail_page
void push(const void *item, ticket k, concurrent_queue_base_v3< T > &base, item_constructor_t construct_item)
void(* item_constructor_t)(T *location, const void *src)
page * make_copy(concurrent_queue_base_v3< T > &base, const page *src_page, size_t begin_in_page, size_t end_in_page, ticket &g_index, item_constructor_t construct_item)
micro_queue & assign(const micro_queue &src, concurrent_queue_base_v3< T > &base, item_constructor_t construct_item)
bool pop(void *dst, ticket k, concurrent_queue_base_v3< T > &base)
atomic< ticket > tail_counter
void invalidate_page_and_rethrow(ticket k)
void copy_item(page &dst, size_t dindex, const void *src, item_constructor_t construct_item)
micro_queue< T > & my_queue
~micro_queue_pop_finalizer()
concurrent_queue_page_allocator & allocator
concurrent_queue_rep_base::page page
micro_queue_pop_finalizer(micro_queue< T > &queue, concurrent_queue_base_v3< T > &b, ticket k, page *p)
base class of concurrent_queue
concurrent_queue_base_v3()
void internal_push(const void *src, item_constructor_t construct_item)
Enqueue item at tail of queue.
void internal_finish_clear()
free any remaining pages
void internal_swap(concurrent_queue_base_v3 &src)
swap internal representation
virtual ~concurrent_queue_base_v3()
micro_queue< T >::padded_page padded_page
micro_queue< T >::item_constructor_t item_constructor_t
void internal_throw_exception() const
Obsolete.
virtual void * allocate_block(size_t n)=0
custom allocator
void assign(const concurrent_queue_base_v3 &src, item_constructor_t construct_item)
copy or move internal representation
bool internal_empty() const
check if the queue is empty; thread safe
size_t internal_size() const
Get size of queue; result may be invalid if queue is modified concurrently.
concurrent_queue_rep< T > * my_rep
Internal representation.
virtual void deallocate_block(void *p, size_t n)=0
custom de-allocator
virtual page * allocate_page() __TBB_override
bool internal_try_pop(void *dst)
Attempt to dequeue item from queue.
virtual void deallocate_page(concurrent_queue_rep_base::page *p) __TBB_override
concurrent_queue_rep< T >::page page
representation of concurrent_queue_base
static size_t index(ticket k)
Map ticket to an array index.
micro_queue< T > array[n_queue]
micro_queue< T > & choose(ticket k)
Parts of concurrent_queue_rep that do not have references to micro_queue.
atomic< ticket > tail_counter
static const size_t phi
Approximately n_queue/golden ratio.
size_t items_per_page
Always a power of 2.
static const size_t n_queue
atomic< size_t > n_invalid_entries
number of invalid entries in the queue
atomic< ticket > head_counter
char pad1[NFS_MaxLineSize-sizeof(atomic< ticket >)]
size_t item_size
Size of an item.
char pad2[NFS_MaxLineSize-sizeof(atomic< ticket >)]
char pad3[NFS_MaxLineSize-sizeof(size_t) -sizeof(size_t) -sizeof(atomic< size_t >)]
Abstract class to define interface for page allocation/deallocation.
virtual ~concurrent_queue_page_allocator()
virtual concurrent_queue_rep_base::page * allocate_page()=0
virtual void deallocate_page(concurrent_queue_rep_base::page *p)=0
Class used to ensure exception-safety of method "pop".
padded_page()
Not defined anywhere - exists to quiet warnings.
T last
Must be last field.
void operator=(const padded_page &)
Not defined anywhere - exists to quiet warnings.
const concurrent_queue_base_v3< T > & my_queue
concurrent_queue_base_v3< T >::page * array[concurrent_queue_rep< T >::n_queue]
concurrent_queue_iterator_rep(const concurrent_queue_base_v3< T > &queue)
micro_queue< T >::padded_page padded_page
bool get_item(T *&item, size_t k)
Set item to point to kth element. Return true if at end of queue or item is marked valid; false otherwise.
Constness-independent portion of concurrent_queue_iterator.
void assign(const concurrent_queue_iterator_base_v3< Value > &other)
Assignment.
concurrent_queue_iterator_base_v3 & operator=(const concurrent_queue_iterator_base_v3 &i)
concurrent_queue_iterator_base_v3()
Default constructor.
concurrent_queue_iterator_base_v3(const concurrent_queue_iterator_base_v3 &i)
Copy constructor.
Value * my_item
Pointer to current item.
concurrent_queue_iterator_rep< Value > * my_rep
Represents concurrent_queue over which we are iterating.
friend bool operator!=(const concurrent_queue_iterator< C, T > &i, const concurrent_queue_iterator< C, U > &j)
friend bool operator==(const concurrent_queue_iterator< C, T > &i, const concurrent_queue_iterator< C, U > &j)
void advance()
Advance iterator one step towards tail of queue.
concurrent_queue_iterator_base_v3(const concurrent_queue_base_v3< Value > &queue)
Construct iterator pointing to head of queue.
~concurrent_queue_iterator_base_v3()
Destructor.
Meets requirements of a forward iterator for STL.
concurrent_queue_iterator()
concurrent_queue_iterator(const concurrent_queue_base_v3< typename tbb_remove_cv< Value >::type > &queue)
Construct iterator pointing to head of queue.
concurrent_queue_iterator & operator++()
Advance to next item in queue.
concurrent_queue_iterator(const concurrent_queue_iterator< Container, typename Container::value_type > &other)
concurrent_queue_iterator & operator=(const concurrent_queue_iterator< Container, typename Container::value_type > &other)
Iterator assignment.
Value & operator*() const
Reference to current item.
Value * operator->() const
Value * operator++(int)
Post increment.
Similar to C++0x std::remove_cv.
Meets requirements of a forward iterator for STL.
concurrent_queue_iterator()
Value * operator++(int)
Post increment.
concurrent_queue_iterator(const concurrent_queue_iterator< Container, typename Container::value_type > &other)
concurrent_queue_iterator & operator++()
Advance to next item in queue.
Value & operator*() const
Reference to current item.
concurrent_queue_iterator & operator=(const concurrent_queue_iterator< Container, typename Container::value_type > &other)
Iterator assignment.
Value * operator->() const
concurrent_queue_iterator(const concurrent_queue_base_v3 &queue)
Construct iterator pointing to head of queue.
void __TBB_EXPORTED_METHOD internal_pop(void *dst)
Dequeue item from head of queue.
void __TBB_EXPORTED_METHOD internal_abort()
Abort all pending queue operations.
virtual void copy_page_item(page &dst, size_t dindex, const page &src, size_t sindex)=0
virtual void assign_and_destroy_item(void *dst, page &src, size_t index)=0
virtual __TBB_EXPORTED_METHOD ~concurrent_queue_base_v3()
void __TBB_EXPORTED_METHOD assign(const concurrent_queue_base_v3 &src)
copy internal representation
concurrent_queue_rep * my_rep
Internal representation.
virtual void copy_item(page &dst, size_t index, const void *src)=0
void __TBB_EXPORTED_METHOD internal_set_capacity(ptrdiff_t capacity, size_t element_size)
Set the queue capacity.
size_t item_size
Size of an item.
void __TBB_EXPORTED_METHOD internal_push(const void *src)
Enqueue item at tail of queue using copy operation.
bool __TBB_EXPORTED_METHOD internal_push_if_not_full(const void *src)
Attempt to enqueue item onto queue using copy operation.
void __TBB_EXPORTED_METHOD internal_throw_exception() const
throw an exception
void internal_assign(const concurrent_queue_base_v3 &src, copy_specifics op_type)
Assigns one queue to another using specified operation (copy or move)
void internal_insert_item(const void *src, copy_specifics op_type)
Enqueues item at tail of queue using specified operation (copy or move)
bool __TBB_EXPORTED_METHOD internal_pop_if_present(void *dst)
Attempt to dequeue item from queue.
__TBB_EXPORTED_METHOD concurrent_queue_base_v3(size_t item_size)
ptrdiff_t my_capacity
Capacity of the queue.
virtual void deallocate_page(page *p)=0
custom de-allocator
virtual page * allocate_page()=0
custom allocator
bool internal_insert_if_not_full(const void *src, copy_specifics op_type)
Attempts to enqueue at tail of queue using specified operation (copy or move)
void internal_swap(concurrent_queue_base_v3 &src)
swap queues
void __TBB_EXPORTED_METHOD internal_finish_clear()
free any remaining pages
ptrdiff_t __TBB_EXPORTED_METHOD internal_size() const
Get size of queue.
bool __TBB_EXPORTED_METHOD internal_empty() const
Check if the queue is empty.
size_t items_per_page
Always a power of 2.
T last
Must be last field.
padded_page()
Not defined anywhere - exists to quiet warnings.
void operator=(const padded_page &)
Not defined anywhere - exists to quiet warnings.
virtual void move_item(page &dst, size_t index, const void *src)=0
bool __TBB_EXPORTED_METHOD internal_push_move_if_not_full(const void *src)
Attempt to enqueue item onto queue using move operation.
concurrent_queue_base_v8(size_t item_sz)
void __TBB_EXPORTED_METHOD internal_push_move(const void *src)
Enqueue item at tail of queue using move operation.
virtual void move_page_item(page &dst, size_t dindex, const page &src, size_t sindex)=0
void __TBB_EXPORTED_METHOD move_content(concurrent_queue_base_v8 &src)
move items
Type-independent portion of concurrent_queue_iterator.
void initialize(const concurrent_queue_base_v3 &queue, size_t offset_of_data)
void * my_item
Pointer to current item.
concurrent_queue_iterator_rep * my_rep
concurrent_queue over which we are iterating.
friend bool operator!=(const concurrent_queue_iterator< C, T > &i, const concurrent_queue_iterator< C, U > &j)
friend bool operator==(const concurrent_queue_iterator< C, T > &i, const concurrent_queue_iterator< C, U > &j)
concurrent_queue_iterator_base_v3()
Default constructor.
void __TBB_EXPORTED_METHOD assign(const concurrent_queue_iterator_base_v3 &i)
Assignment.
void __TBB_EXPORTED_METHOD advance()
Advance iterator one step towards tail of queue.
__TBB_EXPORTED_METHOD ~concurrent_queue_iterator_base_v3()
Destructor.
concurrent_queue_iterator_base_v3(const concurrent_queue_iterator_base_v3 &i)
Copy constructor.
concurrent_queue_iterator_base_v3 & operator=(const concurrent_queue_iterator_base_v3 &i)
A lock that occupies a single byte.
Represents acquisition of a mutex.
Class that implements exponential backoff.
void pause()
Pause for a while.
Base class for types that should not be assigned.
Base class for types that should not be copied or assigned.
A queue using simple locking.
Internal representation of a ConcurrentQueue.