Intel(R) Threading Building Blocks Doxygen Documentation version 4.2.3
Loading...
Searching...
No Matches
task_stream.h
Go to the documentation of this file.
1/*
2 Copyright (c) 2005-2020 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16
17#ifndef _TBB_task_stream_H
18#define _TBB_task_stream_H
19
20#include "tbb/tbb_stddef.h"
21#include <deque>
22#include <climits>
23#include "tbb/atomic.h" // for __TBB_Atomic*
24#include "tbb/spin_mutex.h"
25#include "tbb/tbb_allocator.h"
26#include "scheduler_common.h"
27#include "tbb_misc.h" // for FastRandom
28
29namespace tbb {
30namespace internal {
31
33
35template< typename T, typename mutex_t >
37 typedef std::deque< T, tbb_allocator<T> > queue_base_t;
38
41
44};
45
46typedef uintptr_t population_t;
47const population_t one = 1;
48
49inline void set_one_bit( population_t& dest, int pos ) {
50 __TBB_ASSERT( pos>=0, NULL );
51 __TBB_ASSERT( pos<int(sizeof(population_t)*CHAR_BIT), NULL );
52 __TBB_AtomicOR( &dest, one<<pos );
53}
54
55inline void clear_one_bit( population_t& dest, int pos ) {
56 __TBB_ASSERT( pos>=0, NULL );
57 __TBB_ASSERT( pos<int(sizeof(population_t)*CHAR_BIT), NULL );
58 __TBB_AtomicAND( &dest, ~(one<<pos) );
59}
60
61inline bool is_bit_set( population_t val, int pos ) {
62 __TBB_ASSERT( pos>=0, NULL );
63 __TBB_ASSERT( pos<int(sizeof(population_t)*CHAR_BIT), NULL );
64 return (val & (one<<pos)) != 0;
65}
66
68template<int Levels>
70 typedef queue_and_mutex <task*, spin_mutex> lane_t;
73 unsigned N;
74
75public:
76 task_stream() : N() {
77 for(int level = 0; level < Levels; level++) {
78 population[level] = 0;
79 lanes[level] = NULL;
80 }
81 }
82
83 void initialize( unsigned n_lanes ) {
84 const unsigned max_lanes = sizeof(population_t) * CHAR_BIT;
85
86 N = n_lanes>=max_lanes ? max_lanes : n_lanes>2 ? 1<<(__TBB_Log2(n_lanes-1)+1) : 2;
87 __TBB_ASSERT( N==max_lanes || N>=n_lanes && ((N-1)&N)==0, "number of lanes miscalculated");
88 __TBB_ASSERT( N <= sizeof(population_t) * CHAR_BIT, NULL );
89 for(int level = 0; level < Levels; level++) {
90 lanes[level] = new padded<lane_t>[N];
91 __TBB_ASSERT( !population[level], NULL );
92 }
93 }
94
96 for(int level = 0; level < Levels; level++)
97 if (lanes[level]) delete[] lanes[level];
98 }
99
101 void push( task* source, int level, FastRandom& random ) {
102 // Lane selection is random. Each thread should keep a separate seed value.
103 unsigned idx;
104 for( ; ; ) {
105 idx = random.get() & (N-1);
107 if( lock.try_acquire(lanes[level][idx].my_mutex) ) {
108 lanes[level][idx].my_queue.push_back(source);
109 set_one_bit( population[level], idx ); //TODO: avoid atomic op if the bit is already set
110 break;
111 }
112 }
113 }
114
116 task* pop( int level, unsigned& last_used_lane ) {
117 task* result = NULL;
118 // Lane selection is round-robin. Each thread should keep its last used lane.
119 unsigned idx = (last_used_lane+1)&(N-1);
120 for( ; population[level]; idx=(idx+1)&(N-1) ) {
121 if( is_bit_set( population[level], idx ) ) {
122 lane_t& lane = lanes[level][idx];
124 if( lock.try_acquire(lane.my_mutex) && !lane.my_queue.empty() ) {
125 result = lane.my_queue.front();
126 lane.my_queue.pop_front();
127 if( lane.my_queue.empty() )
128 clear_one_bit( population[level], idx );
129 break;
130 }
131 }
132 }
133 last_used_lane = idx;
134 return result;
135 }
136
138 bool empty(int level) {
139 return !population[level];
140 }
141
143
145 intptr_t drain() {
146 intptr_t result = 0;
147 for(int level = 0; level < Levels; level++)
148 for(unsigned i=0; i<N; ++i) {
149 lane_t& lane = lanes[level][i];
151 for(lane_t::queue_base_t::iterator it=lane.my_queue.begin();
152 it!=lane.my_queue.end(); ++it, ++result)
153 {
154 __TBB_ASSERT( is_bit_set( population[level], i ), NULL );
155 task* t = *it;
156 tbb::task::destroy(*t);
157 }
158 lane.my_queue.clear();
159 clear_one_bit( population[level], i );
160 }
161 return result;
162 }
163}; // task_stream
164
165} // namespace internal
166} // namespace tbb
167
168#endif /* _TBB_task_stream_H */
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition tbb_stddef.h:165
#define __TBB_AtomicAND(P, V)
#define __TBB_AtomicOR(P, V)
#define __TBB_Log2(V)
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void * lock
CRITICAL_SECTION mutex_t
The graph class.
bool is_bit_set(population_t val, int pos)
Definition task_stream.h:61
void clear_one_bit(population_t &dest, int pos)
Definition task_stream.h:55
uintptr_t population_t
Definition task_stream.h:46
void set_one_bit(population_t &dest, int pos)
Definition task_stream.h:49
const population_t one
Definition task_stream.h:47
Represents acquisition of a mutex.
Definition spin_mutex.h:53
Base class for user-defined tasks.
Definition task.h:615
Pads type T to fill out to a multiple of cache line size.
Definition tbb_stddef.h:261
Base class for types that should not be copied or assigned.
Definition tbb_stddef.h:330
Essentially, this is just a pair of a queue and a mutex to protect the queue.
Definition task_stream.h:36
std::deque< T, tbb_allocator< T > > queue_base_t
Definition task_stream.h:37
The container for "fairness-oriented" aka "enqueued" tasks.
Definition task_stream.h:69
void push(task *source, int level, FastRandom &random)
Push a task into a lane.
padded< lane_t > * lanes[Levels]
Definition task_stream.h:72
void initialize(unsigned n_lanes)
Definition task_stream.h:83
population_t population[Levels]
Definition task_stream.h:71
task * pop(int level, unsigned &last_used_lane)
Try finding and popping a task.
queue_and_mutex< task *, spin_mutex > lane_t
Definition task_stream.h:70
intptr_t drain()
Destroys all remaining tasks in every lane. Returns the number of destroyed tasks.
bool empty(int level)
Checks existence of a task.
A fast random number generator.
Definition tbb_misc.h:135
unsigned short get()
Get a random number.
Definition tbb_misc.h:146

Copyright © 2005-2020 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.