GRPC C++  1.26.0
call_combiner.h
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2017 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #ifndef GRPC_CORE_LIB_IOMGR_CALL_COMBINER_H
20 #define GRPC_CORE_LIB_IOMGR_CALL_COMBINER_H
21 
23 
24 #include <stddef.h>
25 
26 #include <grpc/support/atm.h>
27 
35 
36 // A simple, lock-free mechanism for serializing activity related to a
37 // single call. This is similar to a combiner but is more lightweight.
38 //
39 // It requires the callback (or, in the common case where the callback
40 // actually kicks off a chain of callbacks, the last callback in that
41 // chain) to explicitly indicate (by calling GRPC_CALL_COMBINER_STOP())
42 // when it is done with the action that was kicked off by the original
43 // callback.
44 
45 namespace grpc_core {
46 
48 
49 class CallCombiner {
50  public:
51  CallCombiner();
52  ~CallCombiner();
53 
54 #ifndef NDEBUG
55 #define GRPC_CALL_COMBINER_START(call_combiner, closure, error, reason) \
56  (call_combiner)->Start((closure), (error), __FILE__, __LINE__, (reason))
57 #define GRPC_CALL_COMBINER_STOP(call_combiner, reason) \
58  (call_combiner)->Stop(__FILE__, __LINE__, (reason))
59  void Start(grpc_closure* closure, grpc_error* error, const char* file,
61  int line, const char* reason);
63  void Stop(const char* file, int line, const char* reason);
64 #else
65 #define GRPC_CALL_COMBINER_START(call_combiner, closure, error, reason) \
66  (call_combiner)->Start((closure), (error), (reason))
67 #define GRPC_CALL_COMBINER_STOP(call_combiner, reason) \
68  (call_combiner)->Stop((reason))
69  void Start(grpc_closure* closure, grpc_error* error, const char* reason);
72  void Stop(const char* reason);
73 #endif
74 
100  void SetNotifyOnCancel(grpc_closure* closure);
101 
103  void Cancel(grpc_error* error);
104 
105  private:
106  void ScheduleClosure(grpc_closure* closure, grpc_error* error);
107 #ifdef GRPC_TSAN_ENABLED
108  static void TsanClosure(void* arg, grpc_error* error);
109 #endif
110 
111  gpr_atm size_ = 0; // size_t, num closures in queue or currently executing
112  MultiProducerSingleConsumerQueue queue_;
113  // Either 0 (if not cancelled and no cancellation closure set),
114  // a grpc_closure* (if the lowest bit is 0),
115  // or a grpc_error* (if the lowest bit is 1).
116  gpr_atm cancel_state_ = 0;
117 #ifdef GRPC_TSAN_ENABLED
118  // A fake ref-counted lock that is kept alive after the destruction of
119  // grpc_call_combiner, when we are running the original closure.
120  //
121  // Ideally we want to lock and unlock the call combiner as a pointer, when the
122  // callback is called. However, original_closure is free to trigger
123  // anything on the call combiner (including destruction of grpc_call).
124  // Thus, we need a ref-counted structure that can outlive the call combiner.
125  struct TsanLock : public RefCounted<TsanLock, NonPolymorphicRefCount> {
126  TsanLock() { TSAN_ANNOTATE_RWLOCK_CREATE(&taken); }
127  ~TsanLock() { TSAN_ANNOTATE_RWLOCK_DESTROY(&taken); }
128  // To avoid double-locking by the same thread, we should acquire/release
129  // the lock only when taken is false. On each acquire taken must be set to
130  // true.
131  std::atomic<bool> taken{false};
132  };
133  RefCountedPtr<TsanLock> tsan_lock_ = MakeRefCounted<TsanLock>();
134  grpc_closure tsan_closure_;
135  grpc_closure* original_closure_;
136 #endif
137 };
138 
139 // Helper for running a list of closures in a call combiner.
140 //
141 // Each callback running in the call combiner will eventually be
142 // returned to the surface, at which point the surface will yield the
143 // call combiner. So when we are running in the call combiner and have
144 // more than one callback to return to the surface, we need to re-enter
145 // the call combiner for all but one of those callbacks.
147  public:
149 
150  // Adds a closure to the list. The closure must eventually result in
151  // the call combiner being yielded.
152  void Add(grpc_closure* closure, grpc_error* error, const char* reason) {
153  closures_.emplace_back(closure, error, reason);
154  }
155 
156  // Runs all closures in the call combiner and yields the call combiner.
157  //
158  // All but one of the closures in the list will be scheduled via
159  // GRPC_CALL_COMBINER_START(), and the remaining closure will be
160  // scheduled via ExecCtx::Run(), which will eventually result
161  // in yielding the call combiner. If the list is empty, then the call
162  // combiner will be yielded immediately.
163  void RunClosures(CallCombiner* call_combiner) {
164  if (closures_.empty()) {
165  GRPC_CALL_COMBINER_STOP(call_combiner, "no closures to schedule");
166  return;
167  }
168  for (size_t i = 1; i < closures_.size(); ++i) {
169  auto& closure = closures_[i];
170  GRPC_CALL_COMBINER_START(call_combiner, closure.closure, closure.error,
171  closure.reason);
172  }
175  "CallCombinerClosureList executing closure while already "
176  "holding call_combiner %p: closure=%p error=%s reason=%s",
177  call_combiner, closures_[0].closure,
178  grpc_error_string(closures_[0].error), closures_[0].reason);
179  }
180  // This will release the call combiner.
181  ExecCtx::Run(DEBUG_LOCATION, closures_[0].closure, closures_[0].error);
182  closures_.clear();
183  }
184 
185  // Runs all closures in the call combiner, but does NOT yield the call
186  // combiner. All closures will be scheduled via GRPC_CALL_COMBINER_START().
188  for (size_t i = 0; i < closures_.size(); ++i) {
189  auto& closure = closures_[i];
190  GRPC_CALL_COMBINER_START(call_combiner, closure.closure, closure.error,
191  closure.reason);
192  }
193  closures_.clear();
194  }
195 
196  size_t size() const { return closures_.size(); }
197 
198  private:
199  struct CallCombinerClosure {
200  grpc_closure* closure;
201  grpc_error* error;
202  const char* reason;
203 
204  CallCombinerClosure(grpc_closure* closure, grpc_error* error,
205  const char* reason)
206  : closure(closure), error(error), reason(reason) {}
207  };
208 
209  // There are generally a maximum of 6 closures to run in the call
210  // combiner, one for each pending op.
211  InlinedVector<CallCombinerClosure, 6> closures_;
212 };
213 
214 } // namespace grpc_core
215 
216 #endif /* GRPC_CORE_LIB_IOMGR_CALL_COMBINER_H */
void Add(grpc_closure *closure, grpc_error *error, const char *reason)
Definition: call_combiner.h:152
#define GRPC_CALL_COMBINER_STOP(call_combiner, reason)
Definition: call_combiner.h:57
#define GRPC_CALL_COMBINER_START(call_combiner, closure, error, reason)
Definition: call_combiner.h:55
#define GPR_INFO
Definition: log.h:56
DebugOnlyTraceFlag grpc_call_combiner_trace
Definition: call_combiner.h:49
Definition: error_internal.h:39
GPRAPI void gpr_log(const char *file, int line, gpr_log_severity severity, const char *format,...) GPR_PRINT_FORMAT_CHECK(4
Log a message.
const char * grpc_error_string(grpc_error *error)
#define DEBUG_LOCATION
Definition: debug_location.h:41
size_t size() const
Definition: call_combiner.h:196
Internal thread interface.
Definition: backoff.h:26
void Cancel(grpc_error *error)
Indicates that the call has been cancelled.
Definition: call_combiner.h:146
void SetNotifyOnCancel(grpc_closure *closure)
Registers closure to be invoked when Cancel() is called.
bool empty() const
Definition: inlined_vector.h:166
static void Run(const DebugLocation &location, grpc_closure *closure, grpc_error *error)
intptr_t gpr_atm
Definition: atm_gcc_atomic.h:30
size_t size() const
Definition: inlined_vector.h:165
CallCombinerClosureList()
Definition: call_combiner.h:148
#define TSAN_ANNOTATE_RWLOCK_CREATE(addr)
Definition: dynamic_annotations.h:60
#define GRPC_TRACE_FLAG_ENABLED(f)
Definition: trace.h:112
A closure over a grpc_iomgr_cb_func.
Definition: closure.h:56
void RunClosuresWithoutYielding(CallCombiner *call_combiner)
Definition: call_combiner.h:187
void clear()
Definition: inlined_vector.h:170
void Stop(const char *file, int line, const char *reason)
Yields the call combiner to the next closure in the queue, if any.
void Start(grpc_closure *closure, grpc_error *error, const char *file, int line, const char *reason)
Starts processing closure.
#define TSAN_ANNOTATE_RWLOCK_DESTROY(addr)
Definition: dynamic_annotations.h:61
void RunClosures(CallCombiner *call_combiner)
Definition: call_combiner.h:163
void emplace_back(Args &&... args)
Definition: inlined_vector.h:145
TraceFlag DebugOnlyTraceFlag
Definition: trace.h:115