GNU Radio C++ API Reference  g65ab7ea
The Free & Open Software Radio Ecosystem
block_detail.h
Go to the documentation of this file.
1 /* -*- c++ -*- */
2 /*
3  * Copyright 2004,2009,2010,2013 Free Software Foundation, Inc.
4  *
5  * This file is part of GNU Radio
6  *
7  * SPDX-License-Identifier: GPL-3.0-or-later
8  *
9  */
10 
11 #ifndef INCLUDED_GR_RUNTIME_BLOCK_DETAIL_H
12 #define INCLUDED_GR_RUNTIME_BLOCK_DETAIL_H
13 
14 #include <gnuradio/api.h>
15 #include <gnuradio/buffer.h>
16 #include <gnuradio/buffer_reader.h>
18 #include <gnuradio/logger.h>
19 #include <gnuradio/runtime_types.h>
20 #include <gnuradio/tags.h>
21 #include <gnuradio/tpb_detail.h>
22 #include <stdexcept>
23 
24 namespace gr {
25 
26 /*!
27  * \brief Implementation details to support the signal processing abstraction
28  * \ingroup internal
29  *
30  * This class contains implementation detail that should be "out of
31  * sight" of almost all users of GNU Radio. This decoupling also
32  * means that we can make changes to the guts without having to
33  * recompile everything.
34  */
36 {
37 public:
39 
40  int ninputs() const { return d_ninputs; } // returns the stored input count (d_ninputs)
41  int noutputs() const { return d_noutputs; } // returns the stored output count (d_noutputs)
42  bool sink_p() const { return d_noutputs == 0; } // true when the block has no outputs
43  bool source_p() const { return d_ninputs == 0; } // true when the block has no inputs
44 
45  void set_done(bool done);
46  bool done() const { return d_done; } // returns the current value of the d_done flag
47 
48  void set_input(unsigned int which, buffer_reader_sptr reader);
49  buffer_reader_sptr input(unsigned int which) // returns d_input[which]
50  {
51  if (which >= d_ninputs) // valid indices are 0 .. d_ninputs-1
52  throw std::invalid_argument("block_detail::input"); // out-of-range index is rejected
53  return d_input[which];
54  }
55 
56  void set_output(unsigned int which, buffer_sptr buffer);
57  buffer_sptr output(unsigned int which) // returns d_output[which]
58  {
59  if (which >= d_noutputs) // valid indices are 0 .. d_noutputs-1
60  throw std::invalid_argument("block_detail::output"); // out-of-range index is rejected
61  return d_output[which];
62  }
63 
64  /*!
65  * \brief Tell the scheduler \p how_many_items of input stream \p
66  * which_input were consumed.
67  */
68  void consume(int which_input, int how_many_items);
69 
70  /*!
71  * \brief Tell the scheduler \p how_many_items were consumed on
72  * each input stream.
73  */
74  void consume_each(int how_many_items);
75 
76  /*!
77  * \brief Tell the scheduler \p how_many_items were produced on
78  * output stream \p which_output.
79  */
80  void produce(int which_output, int how_many_items);
81 
82  /*!
83  * \brief Tell the scheduler \p how_many_items were produced on
84  * each output stream.
85  */
86  void produce_each(int how_many_items);
87 
88  // Return the number of items read on input stream which_input
89  uint64_t nitems_read(unsigned int which_input);
90 
91  // Return the number of items written on output stream which_output
92  uint64_t nitems_written(unsigned int which_output);
93 
94  // sets nitems_read and nitems_written to 0 for all input/output
95  // buffers.
97 
98  // Clears all tags from the input buffers.
99  void clear_tags();
100 
101  /*!
102  * \brief Adds a new tag to the given output stream.
103  *
104  * Calls gr::buffer::add_item_tag(),
105  * which appends the tag onto its deque.
106  *
107  * \param which_output an integer of which output stream to attach the tag
108  * \param tag the tag object to add
109  */
110  void add_item_tag(unsigned int which_output, const tag_t& tag);
111 
112  /*!
113  * \brief Given a [start,end), returns a vector of all tags in the range.
114  *
115  * Pass-through function to gr::buffer_reader to get a vector of
116  * tags in given range. Range of counts is from start to end-1.
117  *
118  * Tags are tuples of:
119  * (item count, source id, key, value)
120  *
121  * \param v a vector reference to return tags into
122  * \param which_input an integer of which input stream to pull from
123  * \param abs_start a uint64 count of the start of the range of interest
124  * \param abs_end a uint64 count of the end of the range of interest
125  */
126  void get_tags_in_range(std::vector<tag_t>& v,
127  unsigned int which_input,
128  uint64_t abs_start,
129  uint64_t abs_end);
130 
131  /*!
132  * \brief Given a [start,end), returns a vector of all tags in the
133  * range with a given key.
134  *
135  * Calls get_tags_in_range(which_input, abs_start, abs_end) to get
136  * a vector of tags from the buffers. This function then provides
137  * a secondary filter to the tags to extract only tags with the
138  * given 'key'.
139  *
140  * The vector is sorted in ascending order by the offset of the tags.
141  *
142  * Tags are tuples of:
143  * (item count, source id, key, value)
144  *
145  * \param v a vector reference to return tags into
146  * \param which_input an integer of which input stream to pull from
147  * \param abs_start a uint64 count of the start of the range of interest
148  * \param abs_end a uint64 count of the end of the range of interest
149  * \param key a PMT symbol to select only tags of this key
150  */
151  void get_tags_in_range(std::vector<tag_t>& v,
152  unsigned int which_input,
153  uint64_t abs_start,
154  uint64_t abs_end,
155  const pmt::pmt_t& key);
156 
157  /*!
158  * \brief Get the first tag in specified range (if any), fulfilling criterion
159  *
160  * \details
161  * This function returns the lowest-offset tag in the range for which the predicate
162  * function returns true.
163  *
164  * The predicate function hence needs to map tags to booleans; its signature is
165  * bool function(const tag_t& tag_to_check);
166  *
167  * A sensible choice is a side-effect-free lambda, e.g., you'd use this as:
168  *
169  * auto timestamp = get_first_tag_in_range(
170  * 0, // which input
171  * nitems_read(0), // start index
172  * nitems_read(0) + something, // end
173  * [this](const gr::tag_t& tag) {
174  * return pmt::eqv(tag.key, d_time_tag) && !pmt::is_null(tag.value);
175  * });
176  * if (timestamp) {
177  * d_logger->info("got time tag {} at offset {}",
178  * timestamp->value,
179  * timestamp->offset);
180  * }
181  *
182  * \param which_input an integer of which input stream to pull from
183  * \param start a uint64 count of the start of the range of interest
184  * \param end a uint64 count of the end of the range of interest
185  * \param predicate a function of tag_t, returning a boolean
186  */
187  [[nodiscard]] std::optional<gr::tag_t> get_first_tag_in_range(
188  unsigned which_input,
189  uint64_t start,
190  uint64_t end,
191  std::function<bool(const gr::tag_t&)> predicate = [](const gr::tag_t&) {
192  return true;
193  });
194 
195  /*!
196  * \brief Set core affinity of block to the cores in the vector
197  * mask.
198  *
199  * \param mask a vector of ints of the core numbers available to
200  * this block.
201  */
202  void set_processor_affinity(const std::vector<int>& mask);
203 
204  /*!
205  * \brief Unset core affinity.
206  */
208 
209  /*!
210  * \brief Get the current thread priority
211  */
213 
214  /*!
215  * \brief Set the current thread priority
216  *
217  * \param priority the new thread priority to set
218  */
219  int set_thread_priority(int priority);
220 
221  /*!
222  * Post general_work() cleanup to decrement the active counts for all inputs
223  * and outputs.
224  */
226  {
227  // Decrement active counts for all inputs and outputs
228  for (int i = 0; i < noutputs(); i++)
229  output(i)->decrement_active();
230  for (int i = 0; i < ninputs(); i++)
231  input(i)->buffer()->decrement_active();
232  }
233 
234  bool threaded; // set if thread is currently running.
235  gr::thread::gr_thread_t thread; // portable thread handle
236 
238  void stop_perf_counters(int noutput_items, int nproduced);
240 
241  // Calls to get performance counter items
243  float pc_nproduced();
244  float pc_input_buffers_full(size_t which);
245  std::vector<float> pc_input_buffers_full();
246  float pc_output_buffers_full(size_t which);
247  std::vector<float> pc_output_buffers_full();
248  float pc_work_time();
249 
252  float pc_input_buffers_full_avg(size_t which);
253  std::vector<float> pc_input_buffers_full_avg();
254  float pc_output_buffers_full_avg(size_t which);
255  std::vector<float> pc_output_buffers_full_avg();
258 
261  float pc_input_buffers_full_var(size_t which);
262  std::vector<float> pc_input_buffers_full_var();
263  float pc_output_buffers_full_var(size_t which);
264  std::vector<float> pc_output_buffers_full_var();
266 
268 
269  tpb_detail d_tpb; // used by thread-per-block scheduler
271 
272  int consumed() const;
273 
274  // necessary because block_executor.cc's "propagate_tags" is a free function, not
275  // a class member
277 
278  // ----------------------------------------------------------------------------
279 
280 private:
281  unsigned int d_ninputs;
282  unsigned int d_noutputs;
283  std::vector<buffer_reader_sptr> d_input;
284  std::vector<buffer_sptr> d_output;
285  bool d_done;
286  int d_consumed;
287 
288  // Performance counters
289  float d_ins_noutput_items;
290  float d_avg_noutput_items;
291  float d_var_noutput_items;
292  float d_total_noutput_items;
293  gr::high_res_timer_type d_pc_start_time;
294  gr::high_res_timer_type d_pc_last_work_time;
295  float d_ins_nproduced;
296  float d_avg_nproduced;
297  float d_var_nproduced;
298  std::vector<float> d_ins_input_buffers_full;
299  std::vector<float> d_avg_input_buffers_full;
300  std::vector<float> d_var_input_buffers_full;
301  std::vector<float> d_ins_output_buffers_full;
302  std::vector<float> d_avg_output_buffers_full;
303  std::vector<float> d_var_output_buffers_full;
304  gr::high_res_timer_type d_start_of_work, d_end_of_work;
305  float d_ins_work_time;
306  float d_avg_work_time;
307  float d_var_work_time;
308  float d_total_work_time;
309  float d_avg_throughput;
310  float d_pc_counter;
311 
312  block_detail(unsigned int ninputs, unsigned int noutputs);
313 
314  friend struct tpb_detail;
315 
316  friend GR_RUNTIME_API block_detail_sptr make_block_detail(unsigned int ninputs,
317  unsigned int noutputs);
318 };
319 
320 GR_RUNTIME_API block_detail_sptr make_block_detail(unsigned int ninputs,
321  unsigned int noutputs);
322 
324 
325 } /* namespace gr */
326 
327 #endif /* INCLUDED_GR_RUNTIME_BLOCK_DETAIL_H */
Implementation details to support the signal processing abstraction.
Definition: block_detail.h:36
float pc_work_time()
std::optional< gr::tag_t > get_first_tag_in_range(unsigned which_input, uint64_t start, uint64_t end, std::function< bool(const gr::tag_t &)> predicate=[](const gr::tag_t &) { return true;})
Get the first tag in specified range (if any), fulfilling criterion.
float pc_work_time_total()
bool threaded
Definition: block_detail.h:234
std::vector< float > pc_output_buffers_full_avg()
void set_processor_affinity(const std::vector< int > &mask)
Set core affinity of block to the cores in the vector mask.
friend GR_RUNTIME_API block_detail_sptr make_block_detail(unsigned int ninputs, unsigned int noutputs)
bool done() const
Definition: block_detail.h:46
float pc_noutput_items_avg()
float pc_nproduced_avg()
float pc_input_buffers_full_avg(size_t which)
int thread_priority()
Get the current thread priority.
buffer_reader_sptr input(unsigned int which)
Definition: block_detail.h:49
tpb_detail d_tpb
Definition: block_detail.h:269
void start_perf_counters()
float pc_output_buffers_full_avg(size_t which)
gr::logger_ptr d_debug_logger
Definition: block_detail.h:276
void set_output(unsigned int which, buffer_sptr buffer)
float pc_nproduced()
std::vector< float > pc_input_buffers_full_avg()
void produce_each(int how_many_items)
Tell the scheduler how_many_items were produced on each output stream.
float pc_nproduced_var()
std::vector< float > pc_output_buffers_full()
float pc_work_time_avg()
bool sink_p() const
Definition: block_detail.h:42
void get_tags_in_range(std::vector< tag_t > &v, unsigned int which_input, uint64_t abs_start, uint64_t abs_end)
Given a [start,end), returns a vector of all tags in the range.
float pc_output_buffers_full(size_t which)
int noutputs() const
Definition: block_detail.h:41
float pc_noutput_items_var()
uint64_t nitems_written(unsigned int which_output)
std::vector< float > pc_input_buffers_full_var()
uint64_t nitems_read(unsigned int which_input)
void add_item_tag(unsigned int which_output, const tag_t &tag)
Adds a new tag to the given output stream.
void get_tags_in_range(std::vector< tag_t > &v, unsigned int which_input, uint64_t abs_start, uint64_t abs_end, const pmt::pmt_t &key)
Given a [start,end), returns a vector of all tags in the range with a given key.
float pc_input_buffers_full(size_t which)
std::vector< float > pc_output_buffers_full_var()
float pc_noutput_items()
void consume(int which_input, int how_many_items)
Tell the scheduler how_many_items of input stream which_input were consumed.
int d_produce_or
Definition: block_detail.h:270
void consume_each(int how_many_items)
Tell the scheduler how_many_items were consumed on each input stream.
bool source_p() const
Definition: block_detail.h:43
std::vector< float > pc_input_buffers_full()
gr::thread::gr_thread_t thread
Definition: block_detail.h:235
int consumed() const
float pc_work_time_var()
float pc_output_buffers_full_var(size_t which)
int set_thread_priority(int priority)
Set the current thread priority.
void produce(int which_output, int how_many_items)
Tell the scheduler how_many_items were produced on output stream which_output.
void set_input(unsigned int which, buffer_reader_sptr reader)
void reset_perf_counters()
float pc_throughput_avg()
void reset_nitem_counters()
buffer_sptr output(unsigned int which)
Definition: block_detail.h:57
void unset_processor_affinity()
Unset core affinity.
float pc_input_buffers_full_var(size_t which)
void stop_perf_counters(int noutput_items, int nproduced)
void post_work_cleanup()
Definition: block_detail.h:225
void set_done(bool done)
int ninputs() const
Definition: block_detail.h:40
Single writer, multiple reader fifo.
Definition: buffer.h:67
#define GR_RUNTIME_API
Definition: gnuradio-runtime/include/gnuradio/api.h:18
pthread_t gr_thread_t
a system-dependent typedef for the underlying thread type.
Definition: thread.h:47
GNU Radio logging wrapper.
Definition: basic_block.h:29
std::shared_ptr< logger > logger_ptr
Definition: logger.h:248
GR_RUNTIME_API block_detail_sptr make_block_detail(unsigned int ninputs, unsigned int noutputs)
signed long long high_res_timer_type
Typedef for the timer tick count.
Definition: high_res_timer.h:41
GR_RUNTIME_API long block_detail_ncurrently_allocated()
std::shared_ptr< pmt_base > pmt_t
typedef for shared pointer (transparent reference counting).
Definition: pmt.h:85
Definition: tags.h:28
used by thread-per-block scheduler
Definition: tpb_detail.h:26
Definition: cc_common.h:35