GRPC C++  1.26.0
server_callback_handlers.h
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2019 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 
18 #ifndef GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H
19 #define GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H
20 
26 
27 namespace grpc_impl {
28 namespace internal {
29 
30 template <class RequestType, class ResponseType>
31 class CallbackUnaryHandler : public ::grpc::internal::MethodHandler {
32  public:
35  const RequestType*, ResponseType*)>
36  get_reactor)
37  : get_reactor_(std::move(get_reactor)) {}
38 
41  allocator) {
42  allocator_ = allocator;
43  }
44 
45  void RunHandler(const HandlerParameter& param) final {
46  // Arena allocate a controller structure (that includes request/response)
48  auto* allocator_state = static_cast<
50  param.internal_data);
51 
53  param.call->call(), sizeof(ServerCallbackUnaryImpl)))
54  ServerCallbackUnaryImpl(
55  static_cast<::grpc_impl::CallbackServerContext*>(
56  param.server_context),
57  param.call, allocator_state, std::move(param.call_requester));
58  param.server_context->BeginCompletionOp(
59  param.call, [call](bool) { call->MaybeDone(); }, call);
60 
61  ServerUnaryReactor* reactor = nullptr;
62  if (param.status.ok()) {
63  reactor = ::grpc::internal::CatchingReactorGetter<ServerUnaryReactor>(
64  get_reactor_,
65  static_cast<::grpc_impl::CallbackServerContext*>(
66  param.server_context),
67  call->request(), call->response());
68  }
69 
70  if (reactor == nullptr) {
71  // if deserialization or reactor creator failed, we need to fail the call
73  param.call->call(), sizeof(UnimplementedUnaryReactor)))
76  }
77 
79  call->SetupReactor(reactor);
80  }
81 
83  ::grpc::Status* status, void** handler_data) final {
85  buf.set_buffer(req);
86  RequestType* request = nullptr;
88  allocator_state = nullptr;
89  if (allocator_ != nullptr) {
90  allocator_state = allocator_->AllocateMessages();
91  } else {
92  allocator_state =
96  }
97  *handler_data = allocator_state;
98  request = allocator_state->request();
99  *status =
101  buf.Release();
102  if (status->ok()) {
103  return request;
104  }
105  // Clean up on deserialization failure.
106  allocator_state->Release();
107  return nullptr;
108  }
109 
110  private:
112  const RequestType*, ResponseType*)>
113  get_reactor_;
115  allocator_ = nullptr;
116 
117  class ServerCallbackUnaryImpl : public ServerCallbackUnary {
118  public:
// Sends the final status of the unary call in a single batch (finish_ops_).
// Initial metadata is added to the same batch if it has not been sent yet,
// and on an OK status the response message is added as well.
// No Ref() is taken here: the initial value of callbacks_outstanding_
// already reserves one reference for Finish.
119  void Finish(::grpc::Status s) override {
// The completion callback only drops a reference via MaybeDone(); the last
// argument to Set comes from the reactor's InternalInlineable().
120  finish_tag_.Set(
121  call_.call(), [this](bool) { MaybeDone(); }, &finish_ops_,
122  reactor_.load(std::memory_order_relaxed)->InternalInlineable());
123  finish_ops_.set_core_cq_tag(&finish_tag_);
124 
125  if (!ctx_->sent_initial_metadata_) {
126  finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
127  ctx_->initial_metadata_flags());
128  if (ctx_->compression_level_set()) {
129  finish_ops_.set_compression_level(ctx_->compression_level());
130  }
131  ctx_->sent_initial_metadata_ = true;
132  }
133  // The response is dropped if the status is not OK.
134  if (s.ok()) {
// The status returned by SendMessagePtr is what gets sent, presumably so
// that a serialization failure replaces the OK status.
135  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
136  finish_ops_.SendMessagePtr(response()));
137  } else {
138  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
139  }
// NOTE(review): the core CQ tag was already set to &finish_tag_ right after
// finish_tag_.Set above; this second call looks redundant - confirm.
140  finish_ops_.set_core_cq_tag(&finish_tag_);
141  call_.PerformOps(&finish_ops_);
142  }
143 
// Sends the initial metadata as its own batch (meta_ops_). Asserts that
// initial metadata has not been sent before.
144  void SendInitialMetadata() override {
145  GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
// Reference for the outstanding op; released by MaybeDone() in the
// completion callback below.
146  this->Ref();
147  meta_tag_.Set(call_.call(),
148  [this](bool ok) {
// Report the outcome to the reactor first, then drop the reference.
149  reactor_.load(std::memory_order_relaxed)
150  ->OnSendInitialMetadataDone(ok);
151  MaybeDone();
152  },
153  &meta_ops_, false);
154  meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
155  ctx_->initial_metadata_flags());
156  if (ctx_->compression_level_set()) {
157  meta_ops_.set_compression_level(ctx_->compression_level());
158  }
// Marked as sent before the op is started so Finish will not add the
// metadata to its own batch.
159  ctx_->sent_initial_metadata_ = true;
160  meta_ops_.set_core_cq_tag(&meta_tag_);
161  call_.PerformOps(&meta_ops_);
162  }
163 
164  private:
165  friend class CallbackUnaryHandler<RequestType, ResponseType>;
166 
167  ServerCallbackUnaryImpl(
170  allocator_state,
171  std::function<void()> call_requester)
172  : ctx_(ctx),
173  call_(*call),
174  allocator_state_(allocator_state),
175  call_requester_(std::move(call_requester)) {
176  ctx_->set_message_allocator_state(allocator_state);
177  }
178 
// Records the reactor for this call, passes it to BindReactor and
// MaybeCallOnCancel, and finally drops one reference via MaybeDone()
// (presumably the "start" reservation in callbacks_outstanding_).
183  void SetupReactor(ServerUnaryReactor* reactor) {
// Relaxed ordering is sufficient; see the comment on the reactor_ member.
184  reactor_.store(reactor, std::memory_order_relaxed);
185  this->BindReactor(reactor);
186  this->MaybeCallOnCancel(reactor);
187  this->MaybeDone();
188  }
189 
// Accessors for the request and response messages held by allocator_state_.
190  const RequestType* request() { return allocator_state_->request(); }
191  ResponseType* response() { return allocator_state_->response(); }
192 
193  void MaybeDone() override {
194  if (GPR_UNLIKELY(this->Unref() == 1)) {
195  reactor_.load(std::memory_order_relaxed)->OnDone();
196  grpc_call* call = call_.call();
197  auto call_requester = std::move(call_requester_);
198  allocator_state_->Release();
199  this->~ServerCallbackUnaryImpl(); // explicitly call destructor
201  call_requester();
202  }
203  }
204 
// Returns the reactor stored by SetupReactor (relaxed load of reactor_).
205  ServerReactor* reactor() override {
206  return reactor_.load(std::memory_order_relaxed);
207  }
208 
210  meta_ops_;
215  finish_ops_;
217 
221  allocator_state_;
222  std::function<void()> call_requester_;
223  // reactor_ can always be loaded/stored with relaxed memory ordering because
224  // its value is only set once, independently of other data in the object,
225  // and the loads that use it will always actually come provably later even
226  // though they are from different threads since they are triggered by
227  // actions initiated only by the setting up of the reactor_ variable. In
228  // a sense, it's a delayed "const": it gets its value from the SetupReactor
229  // method (not the constructor, so it's not a true const), but it doesn't
230  // change after that and it only gets used by actions caused, directly or
231  // indirectly, by that setup. This comment also applies to the reactor_
232  // variables of the other streaming objects in this file.
233  std::atomic<ServerUnaryReactor*> reactor_;
234  // callbacks_outstanding_ follows a refcount pattern
235  std::atomic<intptr_t> callbacks_outstanding_{
236  3}; // reserve for start, Finish, and CompletionOp
237  };
238 };
239 
240 template <class RequestType, class ResponseType>
242  public:
244  std::function<ServerReadReactor<RequestType>*(
245  ::grpc_impl::CallbackServerContext*, ResponseType*)>
246  get_reactor)
247  : get_reactor_(std::move(get_reactor)) {}
248  void RunHandler(const HandlerParameter& param) final {
249  // Arena allocate a reader structure (that includes response)
250  ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
251 
253  param.call->call(), sizeof(ServerCallbackReaderImpl)))
254  ServerCallbackReaderImpl(
255  static_cast<::grpc_impl::CallbackServerContext*>(
256  param.server_context),
257  param.call, std::move(param.call_requester));
258  param.server_context->BeginCompletionOp(
259  param.call, [reader](bool) { reader->MaybeDone(); }, reader);
260 
261  ServerReadReactor<RequestType>* reactor = nullptr;
262  if (param.status.ok()) {
265  get_reactor_,
266  static_cast<::grpc_impl::CallbackServerContext*>(
267  param.server_context),
268  reader->response());
269  }
270 
271  if (reactor == nullptr) {
272  // if deserialization or reactor creator failed, we need to fail the call
274  param.call->call(), sizeof(UnimplementedReadReactor<RequestType>)))
277  }
278 
279  reader->SetupReactor(reactor);
280  }
281 
282  private:
283  std::function<ServerReadReactor<RequestType>*(
284  ::grpc_impl::CallbackServerContext*, ResponseType*)>
285  get_reactor_;
286 
287  class ServerCallbackReaderImpl : public ServerCallbackReader<RequestType> {
288  public:
// Sends the final status of the client-streaming call in a single batch
// (finish_ops_). Initial metadata is added to the batch if it has not been
// sent yet, and on an OK status the response stored in resp_ is added too.
// No Ref() is taken here: the initial value of callbacks_outstanding_
// already reserves one reference for Finish.
289  void Finish(::grpc::Status s) override {
// The completion callback only drops a reference via MaybeDone().
290  finish_tag_.Set(call_.call(), [this](bool) { MaybeDone(); }, &finish_ops_,
291  false);
292  if (!ctx_->sent_initial_metadata_) {
293  finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
294  ctx_->initial_metadata_flags());
295  if (ctx_->compression_level_set()) {
296  finish_ops_.set_compression_level(ctx_->compression_level());
297  }
298  ctx_->sent_initial_metadata_ = true;
299  }
300  // The response is dropped if the status is not OK.
301  if (s.ok()) {
// The status returned by SendMessagePtr is what gets sent, presumably so
// that a serialization failure replaces the OK status.
302  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
303  finish_ops_.SendMessagePtr(&resp_));
304  } else {
305  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
306  }
307  finish_ops_.set_core_cq_tag(&finish_tag_);
308  call_.PerformOps(&finish_ops_);
309  }
310 
// Sends the initial metadata as its own batch (meta_ops_). Asserts that
// initial metadata has not been sent before.
311  void SendInitialMetadata() override {
312  GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
// Reference for the outstanding op; released by MaybeDone() in the
// completion callback below.
313  this->Ref();
314  meta_tag_.Set(call_.call(),
315  [this](bool ok) {
// Report the outcome to the reactor first, then drop the reference.
316  reactor_.load(std::memory_order_relaxed)
317  ->OnSendInitialMetadataDone(ok);
318  MaybeDone();
319  },
320  &meta_ops_, false);
321  meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
322  ctx_->initial_metadata_flags());
323  if (ctx_->compression_level_set()) {
324  meta_ops_.set_compression_level(ctx_->compression_level());
325  }
// Marked as sent before the op is started so Finish will not add the
// metadata to its own batch.
326  ctx_->sent_initial_metadata_ = true;
327  meta_ops_.set_core_cq_tag(&meta_tag_);
328  call_.PerformOps(&meta_ops_);
329  }
330 
// Starts a read of the next request message into *req.
// The reference taken here is released by the read_tag_ callback that
// SetupReactor installs (it calls OnReadDone and then MaybeDone).
331  void Read(RequestType* req) override {
332  this->Ref();
333  read_ops_.RecvMessage(req);
334  call_.PerformOps(&read_ops_);
335  }
336 
337  private:
338  friend class CallbackClientStreamingHandler<RequestType, ResponseType>;
339 
// Private constructor (the enclosing handler is a friend). Stores the server
// context pointer, initializes call_ from *call, and takes over
// call_requester by move.
340  ServerCallbackReaderImpl(::grpc_impl::CallbackServerContext* ctx,
341  ::grpc::internal::Call* call,
342  std::function<void()> call_requester)
343  : ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)) {}
344 
// Records the reactor, prepares the reusable read tag, then passes the
// reactor to BindReactor and MaybeCallOnCancel and drops one reference via
// MaybeDone() (presumably the "OnStarted" reservation in
// callbacks_outstanding_).
345  void SetupReactor(ServerReadReactor<RequestType>* reactor) {
// Relaxed ordering follows the reasoning given for ServerCallbackUnaryImpl.
346  reactor_.store(reactor, std::memory_order_relaxed);
// read_tag_ is set up once here and used by every Read(): its callback
// reports the result to the reactor and releases the reference Read() took.
347  read_tag_.Set(call_.call(),
348  [this](bool ok) {
349  reactor_.load(std::memory_order_relaxed)->OnReadDone(ok);
350  MaybeDone();
351  },
352  &read_ops_, false);
353  read_ops_.set_core_cq_tag(&read_tag_);
354  this->BindReactor(reactor);
355  this->MaybeCallOnCancel(reactor);
356  this->MaybeDone();
357  }
358 
// Empty destructor body; MaybeDone() invokes this destructor explicitly.
359  ~ServerCallbackReaderImpl() {}
360 
// Returns the response message stored inside this object (resp_); Finish
// sends it when the status is OK.
361  ResponseType* response() { return &resp_; }
362 
363  void MaybeDone() override {
364  if (GPR_UNLIKELY(this->Unref() == 1)) {
365  reactor_.load(std::memory_order_relaxed)->OnDone();
366  grpc_call* call = call_.call();
367  auto call_requester = std::move(call_requester_);
368  this->~ServerCallbackReaderImpl(); // explicitly call destructor
370  call_requester();
371  }
372  }
373 
// Returns the reactor stored by SetupReactor (relaxed load of reactor_).
374  ServerReactor* reactor() override {
375  return reactor_.load(std::memory_order_relaxed);
376  }
377 
379  meta_ops_;
384  finish_ops_;
388  read_ops_;
390 
393  ResponseType resp_;
394  std::function<void()> call_requester_;
395  // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
396  std::atomic<ServerReadReactor<RequestType>*> reactor_;
397  // callbacks_outstanding_ follows a refcount pattern
398  std::atomic<intptr_t> callbacks_outstanding_{
399  3}; // reserve for OnStarted, Finish, and CompletionOp
400  };
401 };
402 
403 template <class RequestType, class ResponseType>
404 class CallbackServerStreamingHandler : public ::grpc::internal::MethodHandler {
405  public:
407  std::function<ServerWriteReactor<ResponseType>*(
408  ::grpc_impl::CallbackServerContext*, const RequestType*)>
409  get_reactor)
410  : get_reactor_(std::move(get_reactor)) {}
411  void RunHandler(const HandlerParameter& param) final {
412  // Arena allocate a writer structure
413  ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
414 
416  param.call->call(), sizeof(ServerCallbackWriterImpl)))
417  ServerCallbackWriterImpl(
418  static_cast<::grpc_impl::CallbackServerContext*>(
419  param.server_context),
420  param.call, static_cast<RequestType*>(param.request),
421  std::move(param.call_requester));
422  param.server_context->BeginCompletionOp(
423  param.call, [writer](bool) { writer->MaybeDone(); }, writer);
424 
425  ServerWriteReactor<ResponseType>* reactor = nullptr;
426  if (param.status.ok()) {
429  get_reactor_,
430  static_cast<::grpc_impl::CallbackServerContext*>(
431  param.server_context),
432  writer->request());
433  }
434  if (reactor == nullptr) {
435  // if deserialization or reactor creator failed, we need to fail the call
437  param.call->call(), sizeof(UnimplementedWriteReactor<ResponseType>)))
440  }
441 
442  writer->SetupReactor(reactor);
443  }
444 
446  ::grpc::Status* status, void** /*handler_data*/) final {
447  ::grpc::ByteBuffer buf;
448  buf.set_buffer(req);
449  auto* request =
451  call, sizeof(RequestType))) RequestType();
452  *status =
454  buf.Release();
455  if (status->ok()) {
456  return request;
457  }
458  request->~RequestType();
459  return nullptr;
460  }
461 
462  private:
463  std::function<ServerWriteReactor<ResponseType>*(
464  ::grpc_impl::CallbackServerContext*, const RequestType*)>
465  get_reactor_;
466 
467  class ServerCallbackWriterImpl : public ServerCallbackWriter<ResponseType> {
468  public:
// Sends the final status of the server-streaming call in a single batch
// (finish_ops_). Initial metadata is added to the batch if it has not been
// sent yet. A message attached to finish_ops_ beforehand by WriteAndFinish
// is part of the same batch.
// No Ref() is taken here: the initial value of callbacks_outstanding_
// already reserves one reference for Finish.
469  void Finish(::grpc::Status s) override {
// The completion callback only drops a reference via MaybeDone().
470  finish_tag_.Set(call_.call(), [this](bool) { MaybeDone(); }, &finish_ops_,
471  false);
472  finish_ops_.set_core_cq_tag(&finish_tag_);
473 
474  if (!ctx_->sent_initial_metadata_) {
475  finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
476  ctx_->initial_metadata_flags());
477  if (ctx_->compression_level_set()) {
478  finish_ops_.set_compression_level(ctx_->compression_level());
479  }
480  ctx_->sent_initial_metadata_ = true;
481  }
482  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
483  call_.PerformOps(&finish_ops_);
484  }
485 
// Sends the initial metadata as its own batch (meta_ops_). Asserts that
// initial metadata has not been sent before.
486  void SendInitialMetadata() override {
487  GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
// Reference for the outstanding op; released by MaybeDone() in the
// completion callback below.
488  this->Ref();
489  meta_tag_.Set(call_.call(),
490  [this](bool ok) {
// Report the outcome to the reactor first, then drop the reference.
491  reactor_.load(std::memory_order_relaxed)
492  ->OnSendInitialMetadataDone(ok);
493  MaybeDone();
494  },
495  &meta_ops_, false);
496  meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
497  ctx_->initial_metadata_flags());
498  if (ctx_->compression_level_set()) {
499  meta_ops_.set_compression_level(ctx_->compression_level());
500  }
// Marked as sent before the op is started so Write/Finish will not add the
// metadata to their own batches.
501  ctx_->sent_initial_metadata_ = true;
502  meta_ops_.set_core_cq_tag(&meta_tag_);
503  call_.PerformOps(&meta_ops_);
504  }
505 
// Starts a write of *resp with the given options, using write_ops_.
// Initial metadata is added to the same batch if it has not been sent yet.
// The reference taken here is released by the write_tag_ callback that
// SetupReactor installs (it calls OnWriteDone and then MaybeDone).
506  void Write(const ResponseType* resp,
507  ::grpc::WriteOptions options) override {
508  this->Ref();
// A message flagged as the last one also gets the buffer hint set.
509  if (options.is_last_message()) {
510  options.set_buffer_hint();
511  }
512  if (!ctx_->sent_initial_metadata_) {
513  write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
514  ctx_->initial_metadata_flags());
515  if (ctx_->compression_level_set()) {
516  write_ops_.set_compression_level(ctx_->compression_level());
517  }
518  ctx_->sent_initial_metadata_ = true;
519  }
520  // TODO(vjpai): don't assert
// SendMessagePtr returns a status; a non-OK result trips the assertion.
521  GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(resp, options).ok());
522  call_.PerformOps(&write_ops_);
523  }
524 
// Sends a final message together with the status: when the status is OK the
// message is attached to finish_ops_ (not write_ops_), and Finish then
// starts that batch. With a non-OK status *resp is not sent at all.
525  void WriteAndFinish(const ResponseType* resp, ::grpc::WriteOptions options,
526  ::grpc::Status s) override {
527  // This combines the write into the finish callback
528  // Don't send any message if the status is bad
529  if (s.ok()) {
530  // TODO(vjpai): don't assert
531  GPR_CODEGEN_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok());
532  }
533  Finish(std::move(s));
534  }
535 
536  private:
537  friend class CallbackServerStreamingHandler<RequestType, ResponseType>;
538 
// Private constructor (the enclosing handler is a friend). Stores the server
// context pointer and the request pointer, initializes call_ from *call,
// and takes over call_requester by move. The destructor later runs
// ~RequestType() on req.
539  ServerCallbackWriterImpl(::grpc_impl::CallbackServerContext* ctx,
540  ::grpc::internal::Call* call,
541  const RequestType* req,
542  std::function<void()> call_requester)
543  : ctx_(ctx),
544  call_(*call),
545  req_(req),
546  call_requester_(std::move(call_requester)) {}
547 
// Records the reactor, prepares the reusable write tag, then passes the
// reactor to BindReactor and MaybeCallOnCancel and drops one reference via
// MaybeDone() (presumably the "OnStarted" reservation in
// callbacks_outstanding_).
548  void SetupReactor(ServerWriteReactor<ResponseType>* reactor) {
// Relaxed ordering follows the reasoning given for ServerCallbackUnaryImpl.
549  reactor_.store(reactor, std::memory_order_relaxed);
// write_tag_ is set up once here and used by every Write(): its callback
// reports the result to the reactor and releases the reference Write() took.
550  write_tag_.Set(
551  call_.call(),
552  [this](bool ok) {
553  reactor_.load(std::memory_order_relaxed)->OnWriteDone(ok);
554  MaybeDone();
555  },
556  &write_ops_, false);
557  write_ops_.set_core_cq_tag(&write_tag_);
558  this->BindReactor(reactor);
559  this->MaybeCallOnCancel(reactor);
560  this->MaybeDone();
561  }
// Runs the destructor of the request object that Deserialize constructed.
// req_ can be null: Deserialize returns nullptr when deserialization fails
// (after destroying the request itself), and RunHandler still constructs this
// object from param.request in order to fail the call. The explicit
// destructor call is therefore guarded against a null pointer.
562  ~ServerCallbackWriterImpl() { if (req_ != nullptr) req_->~RequestType(); }
563 
// Returns the request pointer given to the constructor (req_).
564  const RequestType* request() { return req_; }
565 
566  void MaybeDone() override {
567  if (GPR_UNLIKELY(this->Unref() == 1)) {
568  reactor_.load(std::memory_order_relaxed)->OnDone();
569  grpc_call* call = call_.call();
570  auto call_requester = std::move(call_requester_);
571  this->~ServerCallbackWriterImpl(); // explicitly call destructor
573  call_requester();
574  }
575  }
576 
// Returns the reactor stored by SetupReactor (relaxed load of reactor_).
577  ServerReactor* reactor() override {
578  return reactor_.load(std::memory_order_relaxed);
579  }
580 
582  meta_ops_;
587  finish_ops_;
591  write_ops_;
593 
596  const RequestType* req_;
597  std::function<void()> call_requester_;
598  // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
599  std::atomic<ServerWriteReactor<ResponseType>*> reactor_;
600  // callbacks_outstanding_ follows a refcount pattern
601  std::atomic<intptr_t> callbacks_outstanding_{
602  3}; // reserve for OnStarted, Finish, and CompletionOp
603  };
604 };
605 
606 template <class RequestType, class ResponseType>
608  public:
612  get_reactor)
613  : get_reactor_(std::move(get_reactor)) {}
614  void RunHandler(const HandlerParameter& param) final {
615  ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
616 
618  param.call->call(), sizeof(ServerCallbackReaderWriterImpl)))
619  ServerCallbackReaderWriterImpl(
620  static_cast<::grpc_impl::CallbackServerContext*>(
621  param.server_context),
622  param.call, std::move(param.call_requester));
623  param.server_context->BeginCompletionOp(
624  param.call, [stream](bool) { stream->MaybeDone(); }, stream);
625 
627  if (param.status.ok()) {
630  get_reactor_, static_cast<::grpc_impl::CallbackServerContext*>(
631  param.server_context));
632  }
633 
634  if (reactor == nullptr) {
635  // if deserialization or reactor creator failed, we need to fail the call
637  param.call->call(),
641  }
642 
643  stream->SetupReactor(reactor);
644  }
645 
646  private:
647  std::function<ServerBidiReactor<RequestType, ResponseType>*(
649  get_reactor_;
650 
651  class ServerCallbackReaderWriterImpl
652  : public ServerCallbackReaderWriter<RequestType, ResponseType> {
653  public:
// Sends the final status of the bidirectional call in a single batch
// (finish_ops_). Initial metadata is added to the batch if it has not been
// sent yet. A message attached to finish_ops_ beforehand by WriteAndFinish
// is part of the same batch.
// No Ref() is taken here: the initial value of callbacks_outstanding_
// already reserves one reference for Finish.
654  void Finish(::grpc::Status s) override {
// The completion callback only drops a reference via MaybeDone().
655  finish_tag_.Set(call_.call(), [this](bool) { MaybeDone(); }, &finish_ops_,
656  false);
657  finish_ops_.set_core_cq_tag(&finish_tag_);
658 
659  if (!ctx_->sent_initial_metadata_) {
660  finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
661  ctx_->initial_metadata_flags());
662  if (ctx_->compression_level_set()) {
663  finish_ops_.set_compression_level(ctx_->compression_level());
664  }
665  ctx_->sent_initial_metadata_ = true;
666  }
667  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
668  call_.PerformOps(&finish_ops_);
669  }
670 
// Sends the initial metadata as its own batch (meta_ops_). Asserts that
// initial metadata has not been sent before.
671  void SendInitialMetadata() override {
672  GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
// Reference for the outstanding op; released by MaybeDone() in the
// completion callback below.
673  this->Ref();
674  meta_tag_.Set(call_.call(),
675  [this](bool ok) {
// Report the outcome to the reactor first, then drop the reference.
676  reactor_.load(std::memory_order_relaxed)
677  ->OnSendInitialMetadataDone(ok);
678  MaybeDone();
679  },
680  &meta_ops_, false);
681  meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
682  ctx_->initial_metadata_flags());
683  if (ctx_->compression_level_set()) {
684  meta_ops_.set_compression_level(ctx_->compression_level());
685  }
// Marked as sent before the op is started so Write/Finish will not add the
// metadata to their own batches.
686  ctx_->sent_initial_metadata_ = true;
687  meta_ops_.set_core_cq_tag(&meta_tag_);
688  call_.PerformOps(&meta_ops_);
689  }
690 
// Starts a write of *resp with the given options, using write_ops_.
// Initial metadata is added to the same batch if it has not been sent yet.
// The reference taken here is released by the write_tag_ callback that
// SetupReactor installs (it calls OnWriteDone and then MaybeDone).
691  void Write(const ResponseType* resp,
692  ::grpc::WriteOptions options) override {
693  this->Ref();
// A message flagged as the last one also gets the buffer hint set.
694  if (options.is_last_message()) {
695  options.set_buffer_hint();
696  }
697  if (!ctx_->sent_initial_metadata_) {
698  write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
699  ctx_->initial_metadata_flags());
700  if (ctx_->compression_level_set()) {
701  write_ops_.set_compression_level(ctx_->compression_level());
702  }
703  ctx_->sent_initial_metadata_ = true;
704  }
705  // TODO(vjpai): don't assert
// SendMessagePtr returns a status; a non-OK result trips the assertion.
706  GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(resp, options).ok());
707  call_.PerformOps(&write_ops_);
708  }
709 
// Sends a final message together with the status: when the status is OK the
// message is attached to finish_ops_ (not write_ops_), and Finish then
// starts that batch. With a non-OK status *resp is not sent at all.
710  void WriteAndFinish(const ResponseType* resp, ::grpc::WriteOptions options,
711  ::grpc::Status s) override {
712  // Don't send any message if the status is bad
713  if (s.ok()) {
714  // TODO(vjpai): don't assert
715  GPR_CODEGEN_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok());
716  }
717  Finish(std::move(s));
718  }
719 
// Starts a read of the next request message into *req.
// The reference taken here is released by the read_tag_ callback that
// SetupReactor installs (it calls OnReadDone and then MaybeDone).
720  void Read(RequestType* req) override {
721  this->Ref();
722  read_ops_.RecvMessage(req);
723  call_.PerformOps(&read_ops_);
724  }
725 
726  private:
727  friend class CallbackBidiHandler<RequestType, ResponseType>;
728 
// Private constructor (the enclosing handler is a friend). Stores the server
// context pointer, initializes call_ from *call, and takes over
// call_requester by move.
729  ServerCallbackReaderWriterImpl(::grpc_impl::CallbackServerContext* ctx,
730  ::grpc::internal::Call* call,
731  std::function<void()> call_requester)
732  : ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)) {}
733 
// Records the reactor, prepares the reusable write and read tags, then
// passes the reactor to BindReactor and MaybeCallOnCancel and drops one
// reference via MaybeDone() (presumably the "OnStarted" reservation in
// callbacks_outstanding_).
734  void SetupReactor(ServerBidiReactor<RequestType, ResponseType>* reactor) {
// Relaxed ordering follows the reasoning given for ServerCallbackUnaryImpl.
735  reactor_.store(reactor, std::memory_order_relaxed);
// write_tag_ serves every Write(): its callback reports the result to the
// reactor and releases the reference Write() took.
736  write_tag_.Set(
737  call_.call(),
738  [this](bool ok) {
739  reactor_.load(std::memory_order_relaxed)->OnWriteDone(ok);
740  MaybeDone();
741  },
742  &write_ops_, false);
743  write_ops_.set_core_cq_tag(&write_tag_);
// read_tag_ serves every Read() in the same way.
744  read_tag_.Set(call_.call(),
745  [this](bool ok) {
746  reactor_.load(std::memory_order_relaxed)->OnReadDone(ok);
747  MaybeDone();
748  },
749  &read_ops_, false);
750  read_ops_.set_core_cq_tag(&read_tag_);
751  this->BindReactor(reactor);
752  this->MaybeCallOnCancel(reactor);
753  this->MaybeDone();
754  }
755 
756  void MaybeDone() override {
757  if (GPR_UNLIKELY(this->Unref() == 1)) {
758  reactor_.load(std::memory_order_relaxed)->OnDone();
759  grpc_call* call = call_.call();
760  auto call_requester = std::move(call_requester_);
761  this->~ServerCallbackReaderWriterImpl(); // explicitly call destructor
763  call_requester();
764  }
765  }
766 
// Returns the reactor stored by SetupReactor (relaxed load of reactor_).
767  ServerReactor* reactor() override {
768  return reactor_.load(std::memory_order_relaxed);
769  }
770 
772  meta_ops_;
777  finish_ops_;
781  write_ops_;
785  read_ops_;
787 
790  std::function<void()> call_requester_;
791  // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
792  std::atomic<ServerBidiReactor<RequestType, ResponseType>*> reactor_;
793  // callbacks_outstanding_ follows a refcount pattern
794  std::atomic<intptr_t> callbacks_outstanding_{
795  3}; // reserve for OnStarted, Finish, and CompletionOp
796  };
797 };
798 
799 } // namespace internal
800 } // namespace grpc_impl
801 
802 #endif // GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H
struct grpc_call grpc_call
A Call represents an RPC.
Definition: grpc_types.h:70
void RunHandler(const HandlerParameter &param) final
Definition: server_callback_handlers.h:45
Definition: server_context_impl.h:528
FinishOnlyReactor< ServerUnaryReactor > UnimplementedUnaryReactor
Definition: server_callback_impl.h:733
CoreCodegenInterface * g_core_codegen_interface
Null-initializes the global gRPC variables for the codegen library.
Definition: completion_queue_impl.h:90
ServerWriteReactor is the interface for a server-streaming RPC.
Definition: server_callback_impl.h:144
WriteOptions & set_buffer_hint()
Sets flag indicating that the write may be buffered and need not go out on the wire immediately...
Definition: call_op_set.h:122
#define GPR_CODEGEN_ASSERT(x)
Codegen specific version of GPR_ASSERT.
Definition: core_codegen_interface.h:146
virtual void grpc_call_ref(grpc_call *call)=0
virtual void SendInitialMetadata()=0
Definition: server_callback_impl.h:120
virtual void SendInitialMetadata()=0
Reactor * CatchingReactorGetter(Func &&func, Args &&... args)
Definition: callback_common.h:51
void RunHandler(const HandlerParameter &param) final
Definition: server_callback_handlers.h:248
Definition: server_callback_impl.h:151
Primary implementation of CallOpSetInterface.
Definition: call_op_set.h:824
void MaybeCallOnCancel(ServerReactor *reactor)
Definition: server_callback_impl.h:84
void SetMessageAllocator(::grpc::experimental::MessageAllocator< RequestType, ResponseType > *allocator)
Definition: server_callback_handlers.h:39
void * Deserialize(grpc_call *call, grpc_byte_buffer *req, ::grpc::Status *status, void **) final
Definition: server_callback_handlers.h:445
#define GPR_UNLIKELY(x)
Definition: port_platform.h:702
virtual void grpc_call_unref(grpc_call *call)=0
ServerBidiReactor is the interface for a bidirectional streaming RPC.
Definition: server_callback_impl.h:146
Definition: async_unary_call_impl.h:301
Definition: grpc_types.h:40
grpc_call * call() const
Definition: call.h:72
Definition: server_callback_impl.h:181
CallbackBidiHandler(std::function< ServerBidiReactor< RequestType, ResponseType > *(::grpc_impl::CallbackServerContext *)> get_reactor)
Definition: server_callback_handlers.h:609
void BindReactor(ServerBidiReactor< RequestType, ResponseType > *reactor)
Definition: server_callback_impl.h:210
CallbackServerStreamingHandler(std::function< ServerWriteReactor< ResponseType > *(::grpc_impl::CallbackServerContext *, const RequestType *)> get_reactor)
Definition: server_callback_handlers.h:406
Defines how to serialize and deserialize some type.
Definition: serialization_traits.h:58
ResponseT * response()
Definition: message_allocator.h:47
Definition: call_op_set.h:627
RequestT * request()
Definition: message_allocator.h:46
virtual void SendInitialMetadata()=0
Definition: server_callback_impl.h:198
void RunHandler(const HandlerParameter &param) final
Definition: server_callback_handlers.h:411
Definition: call_op_set.h:216
void Release()
Forget underlying byte buffer without destroying Use this only for un-owned byte buffers.
Definition: byte_buffer.h:146
void BindReactor(Reactor *reactor)
Definition: server_callback_impl.h:161
Definition: call_op_set.h:286
ServerReadReactor is the interface for a client-streaming RPC.
Definition: server_callback_impl.h:142
::grpc_impl::ServerUnaryReactor ServerUnaryReactor
Definition: server_callback.h:35
Definition: rpc_service_method.h:44
Definition: byte_buffer.h:58
int Unref()
Decreases the reference count and returns the previous value.
Definition: server_callback_impl.h:102
Per-message write options.
Definition: call_op_set.h:79
CallbackUnaryHandler(std::function< ServerUnaryReactor *(::grpc_impl::CallbackServerContext *, const RequestType *, ResponseType *)> get_reactor)
Definition: server_callback_handlers.h:33
void BindReactor(ServerWriteReactor< ResponseType > *reactor)
Definition: server_callback_impl.h:192
void BindReactor(ServerReadReactor< RequestType > *reactor)
Definition: server_callback_impl.h:175
Definition: server_callback_impl.h:660
An Alarm posts the user-provided tag to its associated completion queue or invokes the user-provided ...
Definition: alarm_impl.h:33
Definition: server_callback_handlers.h:607
virtual void * grpc_call_arena_alloc(grpc_call *call, size_t length)=0
CallbackWithSuccessTag can be reused multiple times, and will be used in this fashion for streaming o...
Definition: callback_common.h:136
bool ok() const
Is the status OK?
Definition: status.h:118
virtual MessageHolder< RequestT, ResponseT > * AllocateMessages()=0
Base class for running an RPC handler.
Definition: rpc_service_method.h:41
void Ref()
Increases the reference count.
Definition: server_callback_impl.h:99
void * Deserialize(grpc_call *call, grpc_byte_buffer *req, ::grpc::Status *status, void **handler_data) final
Definition: server_callback_handlers.h:82
Did it work? If it didn't, why?
Definition: status.h:31
Operation is not implemented or not supported/enabled in this service.
Definition: status_code_enum.h:115
CallbackClientStreamingHandler(std::function< ServerReadReactor< RequestType > *(::grpc_impl::CallbackServerContext *, ResponseType *)> get_reactor)
Definition: server_callback_handlers.h:243
bool is_last_message() const
Get value for the flag indicating that this is the last message, and should be coalesced with trailin...
Definition: call_op_set.h:186
Definition: server_callback_impl.h:727
Definition: server_callback_impl.h:167
Definition: server_callback_handlers.h:241
void RunHandler(const HandlerParameter &param) final
Definition: server_callback_handlers.h:614
A sequence of bytes.
Definition: byte_buffer.h:67
Straightforward wrapping of the C call object.
Definition: call.h:38