GRPC Core  9.0.0
subchannel.h
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_H
20 #define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_H
21 
23 
31 #include "src/core/lib/gprpp/map.h"
39 
40 // Channel arg holding the address to connect to. CreateSubchannelAddressArg()
// (declared in Subchannel below) documents the value as the subchannel address
// encoded as a URI string.
41 #define GRPC_ARG_SUBCHANNEL_ADDRESS "grpc.subchannel_address"
42 
43 // Refcounting helper macros (p is the object the call is made on, r the
// reason string). When NDEBUG is not defined, the REF/UNREF/WEAK_REF/WEAK_UNREF
// macros also pass __FILE__ and __LINE__ together with the reason, to help
// debug refcounting; otherwise they expand to the no-argument calls.
// GRPC_SUBCHANNEL_REF_FROM_WEAK_REF ignores r in both configurations.
// The *_EXTRA_ARGS / *_REASON / *_MUTATE_* macros supply the matching extra
// parameters and arguments, and expand to nothing (or "") when NDEBUG is
// defined.
44 #ifndef NDEBUG
45 #define GRPC_SUBCHANNEL_REF(p, r) (p)->Ref(__FILE__, __LINE__, (r))
46 #define GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(p, r) (p)->RefFromWeakRef()
47 #define GRPC_SUBCHANNEL_UNREF(p, r) (p)->Unref(__FILE__, __LINE__, (r))
48 #define GRPC_SUBCHANNEL_WEAK_REF(p, r) (p)->WeakRef(__FILE__, __LINE__, (r))
49 #define GRPC_SUBCHANNEL_WEAK_UNREF(p, r) (p)->WeakUnref(__FILE__, __LINE__, (r))
50 #define GRPC_SUBCHANNEL_REF_EXTRA_ARGS \
51  const char *file, int line, const char *reason
52 #define GRPC_SUBCHANNEL_REF_REASON reason
53 #define GRPC_SUBCHANNEL_REF_MUTATE_EXTRA_ARGS \
54  , GRPC_SUBCHANNEL_REF_EXTRA_ARGS, const char* purpose
55 #define GRPC_SUBCHANNEL_REF_MUTATE_PURPOSE(x) , file, line, reason, x
56 #else  // NDEBUG defined
57 #define GRPC_SUBCHANNEL_REF(p, r) (p)->Ref()
58 #define GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(p, r) (p)->RefFromWeakRef()
59 #define GRPC_SUBCHANNEL_UNREF(p, r) (p)->Unref()
60 #define GRPC_SUBCHANNEL_WEAK_REF(p, r) (p)->WeakRef()
61 #define GRPC_SUBCHANNEL_WEAK_UNREF(p, r) (p)->WeakUnref()
62 #define GRPC_SUBCHANNEL_REF_EXTRA_ARGS
63 #define GRPC_SUBCHANNEL_REF_REASON ""
64 #define GRPC_SUBCHANNEL_REF_MUTATE_EXTRA_ARGS
65 #define GRPC_SUBCHANNEL_REF_MUTATE_PURPOSE(x)
66 #endif  // NDEBUG
67 
68 namespace grpc_core {
69 
70 class SubchannelCall;
71 
// Ref-counted object (via RefCounted<ConnectedSubchannel>) that exposes a
// channel stack, the channel args, and the channelz node of the owning
// subchannel, and offers StartWatch() and Ping() operations.
// NOTE(review): some declaration lines are missing from this listing (gaps in
// the line numbers), e.g. the constructor/destructor and the tail of
// StartWatch()'s parameter list.
72 class ConnectedSubchannel : public RefCounted<ConnectedSubchannel> {
73  public:
78 
79  void StartWatch(grpc_pollset_set* interested_parties,
81 
82  void Ping(grpc_closure* on_initiate, grpc_closure* on_ack);
83 
    // Simple accessors for the members declared in the private section.
84  grpc_channel_stack* channel_stack() const { return channel_stack_; }
85  const grpc_channel_args* args() const { return args_; }
    // Body of the channelz_subchannel() accessor; its declaration line is
    // missing from this listing. Returns the raw pointer held by
    // channelz_subchannel_.
87  return channelz_subchannel_.get();
88  }
89 
90  size_t GetInitialCallSizeEstimate(size_t parent_data_size) const;
91 
92  private:
93  grpc_channel_stack* channel_stack_;
94  grpc_channel_args* args_;
95  // Ref-counted pointer to the channelz node in this connected subchannel's
96  // owning subchannel.
97  RefCountedPtr<channelz::SubchannelNode> channelz_subchannel_;
98 };
99 
100 // SubchannelCall: implements the interface of RefCounted<>.
// NOTE(review): the class declaration line and several member declarations
// are missing from this listing (gaps in the line numbers).
102  public:
     // Arguments for Create(). Only start_time is visible here; the other
     // fields are on lines missing from this listing.
103  struct Args {
107  gpr_cycle_counter start_time;
113  };
114  static RefCountedPtr<SubchannelCall> Create(Args args, grpc_error** error);
115 
116  // Continues processing a transport stream op batch.
118 
119  // Returns a pointer to the parent data associated with the subchannel call.
120  // The data will be of the size specified in the \a parent_data_size field
121  // of the \a Args passed to \a SubchannelCall::Create().
122  void* GetParentData();
123 
124  // Returns the call stack of the subchannel call.
126 
127  // Sets the 'then_schedule_closure' argument for call stack destruction.
128  // Must be called once per call.
129  void SetAfterCallStackDestroy(grpc_closure* closure);
130 
131  // Interface of RefCounted<>.
134  const char* reason) GRPC_MUST_USE_RESULT;
135  // When refcount drops to 0, destroys itself and the associated call stack,
136  // but does NOT free the memory because it's in the call arena.
137  void Unref();
138  void Unref(const DebugLocation& location, const char* reason);
139 
140  static void Destroy(void* arg, grpc_error* error);
141 
142  private:
143  // Allow RefCountedPtr<> to access IncrementRefCount().
144  template <typename T>
145  friend class RefCountedPtr;
146 
147  SubchannelCall(Args args, grpc_error** error);
148 
149  // If channelz is enabled, intercepts recv_trailing_metadata so that we may
150  // check the status and associate it with a subchannel.
151  void MaybeInterceptRecvTrailingMetadata(
153 
154  static void RecvTrailingMetadataReady(void* arg, grpc_error* error);
155 
156  // Interface of RefCounted<>.
157  void IncrementRefCount();
158  void IncrementRefCount(const DebugLocation& location, const char* reason);
159 
160  RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
     // Closure stored by SetAfterCallStackDestroy() (presumably; the setter's
     // definition is not in this header -- confirm in subchannel.cc).
161  grpc_closure* after_call_stack_destroy_ = nullptr;
162  // State needed to support channelz interception of recv trailing metadata.
163  grpc_closure recv_trailing_metadata_ready_;
164  grpc_closure* original_recv_trailing_metadata_ = nullptr;
165  grpc_metadata_batch* recv_trailing_metadata_ = nullptr;
166  grpc_millis deadline_;
167 };
168 
169 // A subchannel that knows how to connect to exactly one target address. It
170 // provides a target for load balancing.
171 //
172 // Note that this is the "real" subchannel implementation, whose API is
173 // different from the SubchannelInterface that is exposed to LB policy
174 // implementations. The client channel provides an adaptor class
175 // (SubchannelWrapper) that "converts" between the two.
//
// NOTE(review): several declaration lines are missing from this listing (gaps
// in the line numbers), including the header of the nested
// ConnectivityStateWatcherInterface class and the Ref/Unref/WeakRef
// declarations.
176 class Subchannel {
177  public:
     // Interface for objects that are notified of connectivity state changes
     // (nested class; its declaration line is missing from this listing).
179  : public InternallyRefCounted<ConnectivityStateWatcherInterface> {
180  public:
181  virtual ~ConnectivityStateWatcherInterface() = default;
182 
183  // Will be invoked whenever the subchannel's connectivity state
184  // changes. There will be only one invocation of this method on a
185  // given watcher instance at any given time.
186  //
187  // When the state changes to READY, connected_subchannel will
188  // contain a ref to the connected subchannel. When it changes from
189  // READY to some other state, the implementation must release its
190  // ref to the connected subchannel.
191  virtual void OnConnectivityStateChange(
192  grpc_connectivity_state new_state,
193  RefCountedPtr<ConnectedSubchannel> connected_subchannel) // NOLINT
194  = 0;
195 
196  virtual grpc_pollset_set* interested_parties() = 0;
197  };
198 
199  // The ctor and dtor are not intended to be used directly.
201  const grpc_channel_args* args);
202  ~Subchannel();
203 
204  // Creates a subchannel given \a connector and \a args.
206  const grpc_channel_args* args);
207 
208  // Strong and weak refcounting.
212  void WeakUnref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
213  // Attempts to return a strong ref when only the weak refcount is guaranteed
214  // non-zero. If the strong refcount is zero, does not alter the refcount and
215  // returns null.
216  Subchannel* RefFromWeakRef();
217 
218  // Gets the string representing the subchannel address.
219  // Caller doesn't take ownership.
220  const char* GetTargetAddress();
221 
222  const grpc_channel_args* channel_args() const { return args_; }
223 
224  channelz::SubchannelNode* channelz_node();
225 
226  // Returns the current connectivity state of the subchannel.
227  // If health_check_service_name is non-null, the returned connectivity
228  // state will be based on the state reported by the backend for that
229  // service name.
230  // If the return value is GRPC_CHANNEL_READY, also sets *connected_subchannel.
231  grpc_connectivity_state CheckConnectivityState(
232  const char* health_check_service_name,
233  RefCountedPtr<ConnectedSubchannel>* connected_subchannel);
234 
235  // Starts watching the subchannel's connectivity state.
236  // The first callback to the watcher will be delivered when the
237  // subchannel's connectivity state becomes a value other than
238  // initial_state, which may happen immediately.
239  // Subsequent callbacks will be delivered as the subchannel's state
240  // changes.
241  // The watcher will be destroyed either when the subchannel is
242  // destroyed or when CancelConnectivityStateWatch() is called.
243  void WatchConnectivityState(
244  grpc_connectivity_state initial_state,
245  grpc_core::UniquePtr<char> health_check_service_name,
247 
248  // Cancels a connectivity state watch.
249  // If the watcher has already been destroyed, this is a no-op.
250  void CancelConnectivityStateWatch(const char* health_check_service_name,
252 
253  // Attempts to connect to the backend. Has no effect if already connected.
254  void AttemptToConnect();
255 
256  // Resets the connection backoff of the subchannel.
257  // TODO(roth): Move connection backoff out of subchannels and up into LB
258  // policy code (probably by adding a SubchannelGroup between
259  // SubchannelList and SubchannelData), at which point this method can
260  // go away.
261  void ResetBackoff();
262 
263  // Returns a new channel arg encoding the subchannel address as a URI
264  // string. Caller is responsible for freeing the string.
265  static grpc_arg CreateSubchannelAddressArg(const grpc_resolved_address* addr);
266 
267  // Returns the URI string from the subchannel address arg in \a args.
268  static const char* GetUriFromSubchannelAddressArg(
269  const grpc_channel_args* args);
270 
271  // Sets \a addr from the subchannel address arg in \a args.
272  static void GetAddressFromSubchannelAddressArg(const grpc_channel_args* args,
273  grpc_resolved_address* addr);
274 
275  private:
276  // A collection of ConnectivityStateWatcherInterfaces that are monitoring
277  // the subchannel's state (stored in a std::map keyed by watcher pointer).
278  class ConnectivityStateWatcherList {
279  public:
280  ~ConnectivityStateWatcherList() { Clear(); }
281 
282  void AddWatcherLocked(
283  OrphanablePtr<ConnectivityStateWatcherInterface> watcher);
284  void RemoveWatcherLocked(ConnectivityStateWatcherInterface* watcher);
285 
286  // Notifies all watchers in the list about a change to state.
287  void NotifyLocked(Subchannel* subchannel, grpc_connectivity_state state);
288 
     // Removes all watchers; also invoked by the destructor.
289  void Clear() { watchers_.clear(); }
290 
291  bool empty() const { return watchers_.empty(); }
292 
293  private:
294  // TODO(roth): Once we can use C++14 heterogeneous lookups, this can
295  // be a set instead of a map.
296  std::map<ConnectivityStateWatcherInterface*,
297  OrphanablePtr<ConnectivityStateWatcherInterface>>
298  watchers_;
299  };
300 
301  // A map that tracks ConnectivityStateWatcherInterfaces using a particular
302  // health check service name.
303  //
304  // There is one entry in the map for each health check service name.
305  // Entries exist only as long as there are watchers using the
306  // corresponding service name.
307  //
308  // A health check client is maintained only while the subchannel is in
309  // state READY.
310  class HealthWatcherMap {
311  public:
312  void AddWatcherLocked(
313  Subchannel* subchannel, grpc_connectivity_state initial_state,
314  grpc_core::UniquePtr<char> health_check_service_name,
315  OrphanablePtr<ConnectivityStateWatcherInterface> watcher);
316  void RemoveWatcherLocked(const char* health_check_service_name,
317  ConnectivityStateWatcherInterface* watcher);
318 
319  // Notifies the watcher when the subchannel's state changes.
320  void NotifyLocked(grpc_connectivity_state state);
321 
322  grpc_connectivity_state CheckConnectivityStateLocked(
323  Subchannel* subchannel, const char* health_check_service_name);
324 
325  void ShutdownLocked();
326 
327  private:
328  class HealthWatcher;
329 
     // Keyed by health check service name; StringLess is the key comparator.
330  std::map<const char*, OrphanablePtr<HealthWatcher>, StringLess> map_;
331  };
332 
333  class ConnectedSubchannelStateWatcher;
334 
335  // Sets the subchannel's connectivity state to \a state.
336  void SetConnectivityStateLocked(grpc_connectivity_state state);
337 
338  // Methods for connection.
339  void MaybeStartConnectingLocked();
340  static void OnRetryAlarm(void* arg, grpc_error* error);
341  void ContinueConnectingLocked();
342  static void OnConnectingFinished(void* arg, grpc_error* error);
343  bool PublishTransportLocked();
344  void Disconnect();
345 
346  gpr_atm RefMutate(gpr_atm delta,
348 
349  // The subchannel pool this subchannel is in.
350  RefCountedPtr<SubchannelPoolInterface> subchannel_pool_;
351  // TODO(juanlishen): Consider using args_ as key_ directly.
352  // Subchannel key that identifies this subchannel in the subchannel pool.
353  SubchannelKey* key_;
354  // Channel args.
355  grpc_channel_args* args_;
356  // pollset_set tracking who's interested in a connection being set up.
357  grpc_pollset_set* pollset_set_;
358  // Protects the other members.
359  Mutex mu_;
360  // Refcount
361  // - lower INTERNAL_REF_BITS bits are for internal references:
362  // these do not keep the subchannel open.
363  // - upper remaining bits are for public references: these do
364  // keep the subchannel open
     // NOTE(review): INTERNAL_REF_BITS is not visible in this listing --
     // presumably defined in subchannel.cc; confirm.
365  gpr_atm ref_pair_;
366 
367  // Connection states.
368  OrphanablePtr<SubchannelConnector> connector_;
369  // Set during connection.
370  SubchannelConnector::Result connecting_result_;
371  grpc_closure on_connecting_finished_;
372  // Active connection, or null.
373  RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
374  bool connecting_ = false;
375  bool disconnected_ = false;
376 
377  // Connectivity state tracking.
379  // The list of watchers without a health check service name.
380  ConnectivityStateWatcherList watcher_list_;
381  // The map of watchers with health check service names.
382  HealthWatcherMap health_watcher_map_;
383 
384  // Backoff state.
385  BackOff backoff_;
386  grpc_millis next_attempt_deadline_;
387  grpc_millis min_connect_timeout_ms_;
388  bool backoff_begun_ = false;
389 
390  // Retry alarm.
391  grpc_timer retry_alarm_;
392  grpc_closure on_retry_alarm_;
393  bool have_retry_alarm_ = false;
394  // ResetBackoff() was called while the retry alarm was pending.
395  bool retry_immediately_ = false;
396 
397  // Channelz tracking.
398  RefCountedPtr<channelz::SubchannelNode> channelz_node_;
399 };
400 
401 } // namespace grpc_core
402 
403 #endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_H */
Definition: channel_stack.h:175
int64_t grpc_millis
Definition: exec_ctx.h:35
Definition: arena.h:44
void Ping(grpc_closure *on_initiate, grpc_closure *on_ack)
Definition: subchannel.cc:109
RefCountedPtr< SubchannelCall > Ref() GRPC_MUST_USE_RESULT
Definition: subchannel.cc:201
grpc_transport_stream_op_batch batch
Definition: client_channel.cc:471
Definition: connectivity_state.h:45
An array of arguments that can be passed around.
Definition: grpc_types.h:132
#define GRPC_SUBCHANNEL_REF_EXTRA_ARGS
Definition: subchannel.h:50
Definition: metadata_batch.h:49
const grpc_channel_args * args() const
Definition: subchannel.h:85
static RefCountedPtr< SubchannelCall > Create(Args args, grpc_error **error)
Definition: subchannel.cc:137
Definition: resolve_address.h:44
Definition: debug_location.h:31
Definition: call_combiner.h:49
A grpc_slice s, if initialized, represents the byte range s.bytes[0..s.length-1]. ...
Definition: slice.h:60
Definition: error_internal.h:39
grpc_connectivity_state
Connectivity state of a channel.
Definition: connectivity_state.h:27
~ConnectedSubchannel()
Definition: subchannel.cc:93
A single argument...
Definition: grpc_types.h:103
const grpc_channel_args * channel_args() const
Definition: subchannel.h:222
std::unique_ptr< T, Deleter > OrphanablePtr
Definition: orphanable.h:68
grpc_slice path
Definition: subchannel.h:106
void Unref()
Definition: subchannel.cc:212
Definition: subchannel.h:72
Arena * arena
Definition: subchannel.h:109
RefCountedPtr< ConnectedSubchannel > connected_subchannel
Definition: subchannel.h:104
Definition: subchannel_pool_interface.h:35
Definition: orphanable.h:77
Definition: polling_entity.h:37
Round Robin Policy.
Definition: backend_metric.cc:24
void * GetParentData()
Definition: subchannel.cc:185
size_t GetInitialCallSizeEstimate(size_t parent_data_size) const
Definition: subchannel.cc:119
grpc_call_context_element * context
Definition: subchannel.h:110
Definition: transport.h:163
Definition: ref_counted_ptr.h:35
grpc_millis deadline
Definition: subchannel.h:108
Definition: client_channel_channelz.h:37
Definition: subchannel.h:101
#define GRPC_MUST_USE_RESULT
Definition: port_platform.h:570
intptr_t gpr_atm
Definition: atm_gcc_atomic.h:30
Definition: subchannel.h:103
CallCombiner * call_combiner
Definition: subchannel.h:111
Definition: context.h:44
struct grpc_pollset_set grpc_pollset_set
Definition: pollset_set.h:31
std::unique_ptr< T, DefaultDeleteChar > UniquePtr
Definition: memory.h:45
void StartWatch(grpc_pollset_set *interested_parties, OrphanablePtr< ConnectivityStateWatcherInterface > watcher)
Definition: subchannel.cc:98
Definition: timer.h:30
A closure over a grpc_iomgr_cb_func.
Definition: closure.h:56
Definition: channel_stack.h:185
channelz::SubchannelNode * channelz_subchannel() const
Definition: subchannel.h:86
ConnectedSubchannel(grpc_channel_stack *channel_stack, const grpc_channel_args *args, RefCountedPtr< channelz::SubchannelNode > channelz_subchannel)
Definition: subchannel.cc:85
channel is idle
Definition: connectivity_state.h:29
Definition: ref_counted.h:248
void StartTransportStreamOpBatch(grpc_transport_stream_op_batch *batch)
Definition: subchannel.cc:175
grpc_call_stack * GetCallStack()
Definition: subchannel.cc:191
grpc_channel_stack * channel_stack() const
Definition: subchannel.h:84
grpc_polling_entity * pollent
Definition: subchannel.h:105
#define GRPC_SUBCHANNEL_REF_MUTATE_EXTRA_ARGS
Definition: subchannel.h:53
size_t parent_data_size
Definition: subchannel.h:112
gpr_cycle_counter start_time
Definition: subchannel.h:107
static void Destroy(void *arg, grpc_error *error)
Definition: subchannel.cc:221
Definition: subchannel.h:176
void SetAfterCallStackDestroy(grpc_closure *closure)
Definition: subchannel.cc:195