DPDK  24.11.6
rte_ring_rts_elem_pvt.h
Go to the documentation of this file.
1 /* SPDX-License-Identifier: BSD-3-Clause
2  *
3  * Copyright (c) 2010-2020 Intel Corporation
4  * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
5  * All rights reserved.
6  * Derived from FreeBSD's bufring.h
7  * Used as BSD-3 Licensed with permission from Kip Macy.
8  */
9 
10 #ifndef _RTE_RING_RTS_ELEM_PVT_H_
11 #define _RTE_RING_RTS_ELEM_PVT_H_
12 
/**
 * @internal Record the completion of one enqueue/dequeue on @p ht by
 * updating its tail.
 *
 * The tail counter (tail.val.cnt) is incremented on every call. The tail
 * position (tail.val.pos) is moved to the current head position only when
 * the incremented tail counter equals the head counter; otherwise only the
 * counter changes. The new tail value is installed with a compare-and-swap
 * that is retried until it succeeds.
 *
 * @param ht
 *   Head/tail structure to update. The callers in this file pass either
 *   &r->rts_prod (enqueue) or &r->rts_cons (dequeue).
 */
24 static __rte_always_inline void
25 __rte_ring_rts_update_tail(struct rte_ring_rts_headtail *ht)
26 {
27  union __rte_ring_rts_poscnt h, ot, nt;
28 
29  /*
30  * If there are other enqueues/dequeues in progress that
31  * might have preceded us, then don't update tail with new value.
32  */
33 
34  /*
35  * A0 = {A0.a, A0.b}: Synchronizes with the CAS at R0.
36  * The CAS at R0 in same typed thread establishes a happens-before
37  * relationship with this load acquire. Ensures that this thread
38  * observes the same or later values for h.raw/h.val.cnt
39  * observed by the other thread when it updated ht->tail.raw.
40  * If not, ht->tail.raw may get updated out of sync (e.g. getting
41  * updated to the same value twice). A0.a makes sure this condition
42  * holds when CAS succeeds and A0.b when it fails.
43  */
44  /* A0.a */
45  ot.raw = rte_atomic_load_explicit(&ht->tail.raw, rte_memory_order_acquire);
46 
47  do {
48  /* on 32-bit systems we have to do atomic read here */
49  h.raw = rte_atomic_load_explicit(&ht->head.raw, rte_memory_order_relaxed);
50 
 /*
  * Build the candidate tail from the tail value last observed in ot
  * (the initial load above or, assuming C11 compare-exchange semantics,
  * the value a failed CAS wrote back into ot): bump its counter and
  * adopt the head position only when the counters now match.
  */
51  nt.raw = ot.raw;
52  if (++nt.val.cnt == h.val.cnt)
53  nt.val.pos = h.val.pos;
54 
55  /*
56  * R0: Synchronizes with A2 of a different thread of the opposite type and A0.b
57  * of a different thread of the same type.
58  */
59  /* A0.b */
 /* Success order is release (R0); failure order is acquire (A0.b). */
60  } while (rte_atomic_compare_exchange_strong_explicit(&ht->tail.raw,
61  (uint64_t *)(uintptr_t)&ot.raw, nt.raw,
62  rte_memory_order_release, rte_memory_order_acquire) == 0);
63 }
64 
/**
 * @internal Load the head of @p ht and spin until the head position is no
 * more than ht->htd_max ahead of the tail position.
 *
 * @param ht
 *   Head/tail structure to inspect; it is not modified.
 * @param memorder
 *   Memory order passed to every atomic load of ht->head.raw.
 * @return
 *   The most recently loaded head value (position and counter).
 */
69 static __rte_always_inline union __rte_ring_rts_poscnt
70 __rte_ring_rts_head_wait(const struct rte_ring_rts_headtail *ht,
71  int memorder)
72 {
73  union __rte_ring_rts_poscnt h;
 /* Maximum allowed head/tail distance, read once before spinning. */
74  uint32_t max = ht->htd_max;
75 
76 
77  h.raw = rte_atomic_load_explicit(&ht->head.raw, memorder);
78 
 /*
  * The tail position is read in the loop condition with a plain access;
  * only the head is reloaded through an explicit atomic load after each
  * rte_pause().
  */
79  while (h.val.pos - ht->tail.val.pos > max) {
80  rte_pause();
81  h.raw = rte_atomic_load_explicit(&ht->head.raw, memorder);
82  }
83 
84  return h;
85 }
86 
/**
 * @internal Reserve room for an enqueue by advancing the producer head
 * (r->rts_prod.head) with a compare-and-swap loop.
 *
 * Each iteration waits on the producer head/tail distance, reads the
 * consumer tail, computes the free space, and tries to install a new head
 * whose position is advanced by the reserved count and whose counter is
 * incremented by one.
 *
 * @param r
 *   Ring whose producer head is moved.
 * @param num
 *   Number of elements the caller wants to enqueue.
 * @param behavior
 *   RTE_RING_QUEUE_FIXED: reserve nothing when fewer than @p num entries are
 *   free. Any other value: reserve as many entries as are free.
 * @param old_head
 *   Output: producer head position observed in the last iteration, i.e. the
 *   position before the move. Written even when 0 is returned.
 * @param free_entries
 *   Output: number of free entries computed in the last iteration, before
 *   the reservation is subtracted.
 * @return
 *   Number of entries reserved: @p num, a smaller count, or 0.
 */
90 static __rte_always_inline uint32_t
91 __rte_ring_rts_move_prod_head(struct rte_ring *r, uint32_t num,
92  enum rte_ring_queue_behavior behavior, uint32_t *old_head,
93  uint32_t *free_entries)
94 {
95  uint32_t n, cons_tail;
96  union __rte_ring_rts_poscnt nh, oh;
97 
98  const uint32_t capacity = r->capacity;
99 
100  do {
101  /* Reset n to the initial burst count */
102  n = num;
103 
104  /*
105  * wait for prod head/tail distance,
106  * make sure that we read prod head *before*
107  * reading cons tail.
108  */
109  /*
110  * A1: Synchronizes with the CAS at R1.
111  * Establishes a happens-before relationship with a thread of the same
112  * type that released the ht.raw, ensuring this thread observes all of
113  * its memory effects needed to maintain a safe partial order.
114  */
115  oh = __rte_ring_rts_head_wait(&r->rts_prod, rte_memory_order_acquire);
116 
117  /*
118  * A2: Establish a synchronizes-with edge using a store-release at R0.
119  * This ensures that all memory effects from the preceding opposing
120  * thread are observed.
121  */
122  cons_tail = rte_atomic_load_explicit(&r->cons.tail, rte_memory_order_acquire);
123 
124  /*
125  * The subtraction is done between two unsigned 32-bit values
126  * (the result is always modulo 32 bits even if we have
127  * oh.val.pos > cons_tail). So 'free_entries' is always between 0
128  * and capacity (which is < size).
129  */
130  *free_entries = capacity + cons_tail - oh.val.pos;
131 
132  /* check that we have enough room in ring */
133  if (unlikely(n > *free_entries))
134  n = (behavior == RTE_RING_QUEUE_FIXED) ?
135  0 : *free_entries;
136 
 /*
  * Nothing can be reserved: leave the loop without attempting the CAS,
  * so the producer head is not modified on this path.
  */
137  if (n == 0)
138  break;
139 
 /* New head: position advanced by n, counter incremented by one. */
140  nh.val.pos = oh.val.pos + n;
141  nh.val.cnt = oh.val.cnt + 1;
142 
143  /*
144  * R1: Establishes a synchronizes-with edge with the load-acquire
145  * of ht.raw at A1. Ensures that the store-release to the tail by
146  * this thread, if it was of the opposite type, becomes
147  * visible to another thread of the current type. That thread will
148  * then observe the updates in the same order, keeping a safe
149  * partial order.
150  */
 /*
  * When the CAS fails the loop restarts and oh is reloaded by
  * __rte_ring_rts_head_wait() at the top of the next iteration.
  */
151  } while (rte_atomic_compare_exchange_strong_explicit(&r->rts_prod.head.raw,
152  (uint64_t *)(uintptr_t)&oh.raw, nh.raw,
153  rte_memory_order_release, rte_memory_order_relaxed) == 0);
154 
155  *old_head = oh.val.pos;
156  return n;
157 }
158 
/**
 * @internal Claim entries for a dequeue by advancing the consumer head
 * (r->rts_cons.head) with a compare-and-swap loop.
 *
 * Each iteration waits on the consumer head/tail distance, reads the
 * producer tail, computes the number of available entries, and tries to
 * install a new head whose position is advanced by the claimed count and
 * whose counter is incremented by one.
 *
 * @param r
 *   Ring whose consumer head is moved.
 * @param num
 *   Number of elements the caller wants to dequeue.
 * @param behavior
 *   RTE_RING_QUEUE_FIXED: claim nothing when fewer than @p num entries are
 *   available. Any other value: claim as many entries as are available.
 * @param old_head
 *   Output: consumer head position observed in the last iteration, i.e. the
 *   position before the move. Written even when 0 is returned.
 * @param entries
 *   Output: number of available entries computed in the last iteration,
 *   before the claim is subtracted.
 * @return
 *   Number of entries claimed: @p num, a smaller count, or 0.
 */
162 static __rte_always_inline unsigned int
163 __rte_ring_rts_move_cons_head(struct rte_ring *r, uint32_t num,
164  enum rte_ring_queue_behavior behavior, uint32_t *old_head,
165  uint32_t *entries)
166 {
167  uint32_t n, prod_tail;
168  union __rte_ring_rts_poscnt nh, oh;
169 
170  /* move cons.head atomically */
171  do {
172  /* Restore n as it may change every loop */
173  n = num;
174 
175  /*
176  * wait for cons head/tail distance,
177  * make sure that we read cons head *before*
178  * reading prod tail.
179  */
180  /*
181  * A3: Synchronizes with the CAS at R2.
182  * Establishes a happens-before relationship with a thread of the same
183  * type that released the ht.raw, ensuring this thread observes all of
184  * its memory effects needed to maintain a safe partial order.
185  */
186  oh = __rte_ring_rts_head_wait(&r->rts_cons, rte_memory_order_acquire);
187 
188  /*
189  * A4: Establish a synchronizes-with edge using a store-release at R0.
190  * This ensures that all memory effects from the preceding opposing
191  * thread are observed.
192  */
193  prod_tail = rte_atomic_load_explicit(&r->prod.tail, rte_memory_order_acquire);
194 
195  /* The subtraction is done between two unsigned 32-bit values
196  * (the result is always modulo 32 bits even if we have
197  * oh.val.pos > prod_tail). So 'entries' is always between 0
198  * and size(ring)-1.
199  */
200  *entries = prod_tail - oh.val.pos;
201 
202  /* Set the actual entries for dequeue */
203  if (n > *entries)
204  n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
205 
 /*
  * Nothing can be claimed: leave the loop without attempting the CAS,
  * so the consumer head is not modified on this path.
  */
206  if (unlikely(n == 0))
207  break;
208 
 /* New head: position advanced by n, counter incremented by one. */
209  nh.val.pos = oh.val.pos + n;
210  nh.val.cnt = oh.val.cnt + 1;
211 
212  /*
213  * R2: Establishes a synchronizes-with edge with the load-acquire
214  * of ht.raw at A3. Ensures that the store-release to the tail by
215  * this thread, if it was of the opposite type, becomes
216  * visible to another thread of the current type. That thread will
217  * then observe the updates in the same order, keeping a safe
218  * partial order.
219  */
 /*
  * When the CAS fails the loop restarts and oh is reloaded by
  * __rte_ring_rts_head_wait() at the top of the next iteration.
  */
220  } while (rte_atomic_compare_exchange_strong_explicit(&r->rts_cons.head.raw,
221  (uint64_t *)(uintptr_t)&oh.raw, nh.raw,
222  rte_memory_order_release, rte_memory_order_relaxed) == 0);
223 
224  *old_head = oh.val.pos;
225  return n;
226 }
227 
250 static __rte_always_inline unsigned int
251 __rte_ring_do_rts_enqueue_elem(struct rte_ring *r, const void *obj_table,
252  uint32_t esize, uint32_t n, enum rte_ring_queue_behavior behavior,
253  uint32_t *free_space)
254 {
255  uint32_t free, head;
256 
257  n = __rte_ring_rts_move_prod_head(r, n, behavior, &head, &free);
258 
259  if (n != 0) {
260  __rte_ring_enqueue_elems(r, head, obj_table, esize, n);
261  __rte_ring_rts_update_tail(&r->rts_prod);
262  }
263 
264  if (free_space != NULL)
265  *free_space = free - n;
266  return n;
267 }
268 
291 static __rte_always_inline unsigned int
292 __rte_ring_do_rts_dequeue_elem(struct rte_ring *r, void *obj_table,
293  uint32_t esize, uint32_t n, enum rte_ring_queue_behavior behavior,
294  uint32_t *available)
295 {
296  uint32_t entries, head;
297 
298  n = __rte_ring_rts_move_cons_head(r, n, behavior, &head, &entries);
299 
300  if (n != 0) {
301  __rte_ring_dequeue_elems(r, head, obj_table, esize, n);
302  __rte_ring_rts_update_tail(&r->rts_cons);
303  }
304 
305  if (available != NULL)
306  *available = entries - n;
307  return n;
308 }
309 
310 #endif /* _RTE_RING_RTS_ELEM_PVT_H_ */
#define __rte_always_inline
Definition: rte_common.h:413
rte_ring_queue_behavior
Definition: rte_ring_core.h:40
#define unlikely(x)
static void rte_pause(void)
uint32_t capacity