GRPC C++  1.30.0
server_callback_handlers.h
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2019 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 
18 #ifndef GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H
19 #define GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H
20 
26 
27 namespace grpc_impl {
28 namespace internal {
29 
30 template <class RequestType, class ResponseType>
31 class CallbackUnaryHandler : public ::grpc::internal::MethodHandler {
32  public:
35  const RequestType*, ResponseType*)>
36  get_reactor)
37  : get_reactor_(std::move(get_reactor)) {}
38 
41  allocator) {
42  allocator_ = allocator;
43  }
44 
45  void RunHandler(const HandlerParameter& param) final {
46  // Arena allocate a controller structure (that includes request/response)
48  auto* allocator_state = static_cast<
50  param.internal_data);
51 
53  param.call->call(), sizeof(ServerCallbackUnaryImpl)))
54  ServerCallbackUnaryImpl(
55  static_cast<::grpc_impl::CallbackServerContext*>(
56  param.server_context),
57  param.call, allocator_state, std::move(param.call_requester));
58  param.server_context->BeginCompletionOp(
59  param.call, [call](bool) { call->MaybeDone(); }, call);
60 
61  ServerUnaryReactor* reactor = nullptr;
62  if (param.status.ok()) {
63  reactor = ::grpc::internal::CatchingReactorGetter<ServerUnaryReactor>(
64  get_reactor_,
65  static_cast<::grpc_impl::CallbackServerContext*>(
66  param.server_context),
67  call->request(), call->response());
68  }
69 
70  if (reactor == nullptr) {
71  // if deserialization or reactor creator failed, we need to fail the call
73  param.call->call(), sizeof(UnimplementedUnaryReactor)))
76  }
77 
79  call->SetupReactor(reactor);
80  }
81 
83  ::grpc::Status* status, void** handler_data) final {
85  buf.set_buffer(req);
86  RequestType* request = nullptr;
88  allocator_state = nullptr;
89  if (allocator_ != nullptr) {
90  allocator_state = allocator_->AllocateMessages();
91  } else {
92  allocator_state =
96  }
97  *handler_data = allocator_state;
98  request = allocator_state->request();
99  *status =
101  buf.Release();
102  if (status->ok()) {
103  return request;
104  }
105  // Clean up on deserialization failure.
106  allocator_state->Release();
107  return nullptr;
108  }
109 
110  private:
112  const RequestType*, ResponseType*)>
113  get_reactor_;
115  allocator_ = nullptr;
116 
117  class ServerCallbackUnaryImpl : public ServerCallbackUnary {
118  public:
119  void Finish(::grpc::Status s) override {
120  // A callback that only contains a call to MaybeDone can be run as an
121  // inline callback regardless of whether or not OnDone is inlineable
122  // because if the actual OnDone callback needs to be scheduled, MaybeDone
123  // is responsible for dispatching to an executor thread if needed. Thus,
124  // when setting up the finish_tag_, we can set its own callback to
125  // inlineable.
126  finish_tag_.Set(
127  call_.call(),
128  [this](bool) {
129  this->MaybeDone(
130  reactor_.load(std::memory_order_relaxed)->InternalInlineable());
131  },
132  &finish_ops_, /*can_inline=*/true);
133  finish_ops_.set_core_cq_tag(&finish_tag_);
134 
135  if (!ctx_->sent_initial_metadata_) {
136  finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
137  ctx_->initial_metadata_flags());
138  if (ctx_->compression_level_set()) {
139  finish_ops_.set_compression_level(ctx_->compression_level());
140  }
141  ctx_->sent_initial_metadata_ = true;
142  }
143  // The response is dropped if the status is not OK.
144  if (s.ok()) {
145  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
146  finish_ops_.SendMessagePtr(response()));
147  } else {
148  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
149  }
150  finish_ops_.set_core_cq_tag(&finish_tag_);
151  call_.PerformOps(&finish_ops_);
152  }
153 
154  void SendInitialMetadata() override {
155  GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
156  this->Ref();
157  // The callback for this function should not be marked inline because it
158  // is directly invoking a user-controlled reaction
159  // (OnSendInitialMetadataDone). Thus it must be dispatched to an executor
160  // thread. However, any OnDone needed after that can be inlined because it
161  // is already running on an executor thread.
162  meta_tag_.Set(call_.call(),
163  [this](bool ok) {
164  ServerUnaryReactor* reactor =
165  reactor_.load(std::memory_order_relaxed);
166  reactor->OnSendInitialMetadataDone(ok);
167  this->MaybeDone(/*inlineable_ondone=*/true);
168  },
169  &meta_ops_, /*can_inline=*/false);
170  meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
171  ctx_->initial_metadata_flags());
172  if (ctx_->compression_level_set()) {
173  meta_ops_.set_compression_level(ctx_->compression_level());
174  }
175  ctx_->sent_initial_metadata_ = true;
176  meta_ops_.set_core_cq_tag(&meta_tag_);
177  call_.PerformOps(&meta_ops_);
178  }
179 
180  private:
181  friend class CallbackUnaryHandler<RequestType, ResponseType>;
182 
183  ServerCallbackUnaryImpl(
186  allocator_state,
187  std::function<void()> call_requester)
188  : ctx_(ctx),
189  call_(*call),
190  allocator_state_(allocator_state),
191  call_requester_(std::move(call_requester)) {
192  ctx_->set_message_allocator_state(allocator_state);
193  }
194 
199  void SetupReactor(ServerUnaryReactor* reactor) {
200  reactor_.store(reactor, std::memory_order_relaxed);
201  this->BindReactor(reactor);
202  this->MaybeCallOnCancel(reactor);
203  this->MaybeDone(reactor->InternalInlineable());
204  }
205 
206  const RequestType* request() { return allocator_state_->request(); }
207  ResponseType* response() { return allocator_state_->response(); }
208 
209  void CallOnDone() override {
210  reactor_.load(std::memory_order_relaxed)->OnDone();
211  grpc_call* call = call_.call();
212  auto call_requester = std::move(call_requester_);
213  allocator_state_->Release();
214  this->~ServerCallbackUnaryImpl(); // explicitly call destructor
216  call_requester();
217  }
218 
219  ServerReactor* reactor() override {
220  return reactor_.load(std::memory_order_relaxed);
221  }
222 
224  meta_ops_;
229  finish_ops_;
231 
235  allocator_state_;
236  std::function<void()> call_requester_;
237  // reactor_ can always be loaded/stored with relaxed memory ordering because
238  // its value is only set once, independently of other data in the object,
239  // and the loads that use it will always actually come provably later even
240  // though they are from different threads since they are triggered by
241  // actions initiated only by the setting up of the reactor_ variable. In
242  // a sense, it's a delayed "const": it gets its value from the SetupReactor
243  // method (not the constructor, so it's not a true const), but it doesn't
244  // change after that and it only gets used by actions caused, directly or
245  // indirectly, by that setup. This comment also applies to the reactor_
246  // variables of the other streaming objects in this file.
247  std::atomic<ServerUnaryReactor*> reactor_;
248  // callbacks_outstanding_ follows a refcount pattern
249  std::atomic<intptr_t> callbacks_outstanding_{
250  3}; // reserve for start, Finish, and CompletionOp
251  };
252 };
253 
254 template <class RequestType, class ResponseType>
256  public:
258  std::function<ServerReadReactor<RequestType>*(
259  ::grpc_impl::CallbackServerContext*, ResponseType*)>
260  get_reactor)
261  : get_reactor_(std::move(get_reactor)) {}
262  void RunHandler(const HandlerParameter& param) final {
263  // Arena allocate a reader structure (that includes response)
264  ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
265 
267  param.call->call(), sizeof(ServerCallbackReaderImpl)))
268  ServerCallbackReaderImpl(
269  static_cast<::grpc_impl::CallbackServerContext*>(
270  param.server_context),
271  param.call, std::move(param.call_requester));
272  // Inlineable OnDone can be false in the CompletionOp callback because there
273  // is no read reactor that has an inlineable OnDone; this only applies to
274  // the DefaultReactor (which is unary).
275  param.server_context->BeginCompletionOp(
276  param.call,
277  [reader](bool) { reader->MaybeDone(/*inlineable_ondone=*/false); },
278  reader);
279 
280  ServerReadReactor<RequestType>* reactor = nullptr;
281  if (param.status.ok()) {
284  get_reactor_,
285  static_cast<::grpc_impl::CallbackServerContext*>(
286  param.server_context),
287  reader->response());
288  }
289 
290  if (reactor == nullptr) {
291  // if deserialization or reactor creator failed, we need to fail the call
293  param.call->call(), sizeof(UnimplementedReadReactor<RequestType>)))
296  }
297 
298  reader->SetupReactor(reactor);
299  }
300 
301  private:
302  std::function<ServerReadReactor<RequestType>*(
303  ::grpc_impl::CallbackServerContext*, ResponseType*)>
304  get_reactor_;
305 
306  class ServerCallbackReaderImpl : public ServerCallbackReader<RequestType> {
307  public:
308  void Finish(::grpc::Status s) override {
309  // A finish tag with only MaybeDone can have its callback inlined
310  // regardless even if OnDone is not inlineable because this callback just
311  // checks a ref and then decides whether or not to dispatch OnDone.
312  finish_tag_.Set(call_.call(),
313  [this](bool) {
314  // Inlineable OnDone can be false here because there is
315  // no read reactor that has an inlineable OnDone; this
316  // only applies to the DefaultReactor (which is unary).
317  this->MaybeDone(/*inlineable_ondone=*/false);
318  },
319  &finish_ops_, /*can_inline=*/true);
320  if (!ctx_->sent_initial_metadata_) {
321  finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
322  ctx_->initial_metadata_flags());
323  if (ctx_->compression_level_set()) {
324  finish_ops_.set_compression_level(ctx_->compression_level());
325  }
326  ctx_->sent_initial_metadata_ = true;
327  }
328  // The response is dropped if the status is not OK.
329  if (s.ok()) {
330  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
331  finish_ops_.SendMessagePtr(&resp_));
332  } else {
333  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
334  }
335  finish_ops_.set_core_cq_tag(&finish_tag_);
336  call_.PerformOps(&finish_ops_);
337  }
338 
339  void SendInitialMetadata() override {
340  GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
341  this->Ref();
342  // The callback for this function should not be inlined because it invokes
343  // a user-controlled reaction, but any resulting OnDone can be inlined in
344  // the executor to which this callback is dispatched.
345  meta_tag_.Set(call_.call(),
346  [this](bool ok) {
347  ServerReadReactor<RequestType>* reactor =
348  reactor_.load(std::memory_order_relaxed);
349  reactor->OnSendInitialMetadataDone(ok);
350  this->MaybeDone(/*inlineable_ondone=*/true);
351  },
352  &meta_ops_, /*can_inline=*/false);
353  meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
354  ctx_->initial_metadata_flags());
355  if (ctx_->compression_level_set()) {
356  meta_ops_.set_compression_level(ctx_->compression_level());
357  }
358  ctx_->sent_initial_metadata_ = true;
359  meta_ops_.set_core_cq_tag(&meta_tag_);
360  call_.PerformOps(&meta_ops_);
361  }
362 
363  void Read(RequestType* req) override {
364  this->Ref();
365  read_ops_.RecvMessage(req);
366  call_.PerformOps(&read_ops_);
367  }
368 
369  private:
370  friend class CallbackClientStreamingHandler<RequestType, ResponseType>;
371 
372  ServerCallbackReaderImpl(::grpc_impl::CallbackServerContext* ctx,
373  ::grpc::internal::Call* call,
374  std::function<void()> call_requester)
375  : ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)) {}
376 
377  void SetupReactor(ServerReadReactor<RequestType>* reactor) {
378  reactor_.store(reactor, std::memory_order_relaxed);
379  // The callback for this function should not be inlined because it invokes
380  // a user-controlled reaction, but any resulting OnDone can be inlined in
381  // the executor to which this callback is dispatched.
382  read_tag_.Set(call_.call(),
383  [this, reactor](bool ok) {
384  reactor->OnReadDone(ok);
385  this->MaybeDone(/*inlineable_ondone=*/true);
386  },
387  &read_ops_, /*can_inline=*/false);
388  read_ops_.set_core_cq_tag(&read_tag_);
389  this->BindReactor(reactor);
390  this->MaybeCallOnCancel(reactor);
391  // Inlineable OnDone can be false here because there is no read
392  // reactor that has an inlineable OnDone; this only applies to the
393  // DefaultReactor (which is unary).
394  this->MaybeDone(/*inlineable_ondone=*/false);
395  }
396 
397  ~ServerCallbackReaderImpl() {}
398 
399  ResponseType* response() { return &resp_; }
400 
401  void CallOnDone() override {
402  reactor_.load(std::memory_order_relaxed)->OnDone();
403  grpc_call* call = call_.call();
404  auto call_requester = std::move(call_requester_);
405  this->~ServerCallbackReaderImpl(); // explicitly call destructor
407  call_requester();
408  }
409 
410  ServerReactor* reactor() override {
411  return reactor_.load(std::memory_order_relaxed);
412  }
413 
415  meta_ops_;
420  finish_ops_;
424  read_ops_;
426 
429  ResponseType resp_;
430  std::function<void()> call_requester_;
431  // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
432  std::atomic<ServerReadReactor<RequestType>*> reactor_;
433  // callbacks_outstanding_ follows a refcount pattern
434  std::atomic<intptr_t> callbacks_outstanding_{
435  3}; // reserve for OnStarted, Finish, and CompletionOp
436  };
437 };
438 
439 template <class RequestType, class ResponseType>
440 class CallbackServerStreamingHandler : public ::grpc::internal::MethodHandler {
441  public:
443  std::function<ServerWriteReactor<ResponseType>*(
444  ::grpc_impl::CallbackServerContext*, const RequestType*)>
445  get_reactor)
446  : get_reactor_(std::move(get_reactor)) {}
447  void RunHandler(const HandlerParameter& param) final {
448  // Arena allocate a writer structure
449  ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
450 
452  param.call->call(), sizeof(ServerCallbackWriterImpl)))
453  ServerCallbackWriterImpl(
454  static_cast<::grpc_impl::CallbackServerContext*>(
455  param.server_context),
456  param.call, static_cast<RequestType*>(param.request),
457  std::move(param.call_requester));
458  // Inlineable OnDone can be false in the CompletionOp callback because there
459  // is no write reactor that has an inlineable OnDone; this only applies to
460  // the DefaultReactor (which is unary).
461  param.server_context->BeginCompletionOp(
462  param.call,
463  [writer](bool) { writer->MaybeDone(/*inlineable_ondone=*/false); },
464  writer);
465 
466  ServerWriteReactor<ResponseType>* reactor = nullptr;
467  if (param.status.ok()) {
470  get_reactor_,
471  static_cast<::grpc_impl::CallbackServerContext*>(
472  param.server_context),
473  writer->request());
474  }
475  if (reactor == nullptr) {
476  // if deserialization or reactor creator failed, we need to fail the call
478  param.call->call(), sizeof(UnimplementedWriteReactor<ResponseType>)))
481  }
482 
483  writer->SetupReactor(reactor);
484  }
485 
487  ::grpc::Status* status, void** /*handler_data*/) final {
488  ::grpc::ByteBuffer buf;
489  buf.set_buffer(req);
490  auto* request =
492  call, sizeof(RequestType))) RequestType();
493  *status =
495  buf.Release();
496  if (status->ok()) {
497  return request;
498  }
499  request->~RequestType();
500  return nullptr;
501  }
502 
503  private:
504  std::function<ServerWriteReactor<ResponseType>*(
505  ::grpc_impl::CallbackServerContext*, const RequestType*)>
506  get_reactor_;
507 
508  class ServerCallbackWriterImpl : public ServerCallbackWriter<ResponseType> {
509  public:
510  void Finish(::grpc::Status s) override {
511  // A finish tag with only MaybeDone can have its callback inlined
512  // regardless even if OnDone is not inlineable because this callback just
513  // checks a ref and then decides whether or not to dispatch OnDone.
514  finish_tag_.Set(call_.call(),
515  [this](bool) {
516  // Inlineable OnDone can be false here because there is
517  // no write reactor that has an inlineable OnDone; this
518  // only applies to the DefaultReactor (which is unary).
519  this->MaybeDone(/*inlineable_ondone=*/false);
520  },
521  &finish_ops_, /*can_inline=*/true);
522  finish_ops_.set_core_cq_tag(&finish_tag_);
523 
524  if (!ctx_->sent_initial_metadata_) {
525  finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
526  ctx_->initial_metadata_flags());
527  if (ctx_->compression_level_set()) {
528  finish_ops_.set_compression_level(ctx_->compression_level());
529  }
530  ctx_->sent_initial_metadata_ = true;
531  }
532  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
533  call_.PerformOps(&finish_ops_);
534  }
535 
536  void SendInitialMetadata() override {
537  GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
538  this->Ref();
539  // The callback for this function should not be inlined because it invokes
540  // a user-controlled reaction, but any resulting OnDone can be inlined in
541  // the executor to which this callback is dispatched.
542  meta_tag_.Set(call_.call(),
543  [this](bool ok) {
544  ServerWriteReactor<ResponseType>* reactor =
545  reactor_.load(std::memory_order_relaxed);
546  reactor->OnSendInitialMetadataDone(ok);
547  this->MaybeDone(/*inlineable_ondone=*/true);
548  },
549  &meta_ops_, /*can_inline=*/false);
550  meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
551  ctx_->initial_metadata_flags());
552  if (ctx_->compression_level_set()) {
553  meta_ops_.set_compression_level(ctx_->compression_level());
554  }
555  ctx_->sent_initial_metadata_ = true;
556  meta_ops_.set_core_cq_tag(&meta_tag_);
557  call_.PerformOps(&meta_ops_);
558  }
559 
560  void Write(const ResponseType* resp,
561  ::grpc::WriteOptions options) override {
562  this->Ref();
563  if (options.is_last_message()) {
564  options.set_buffer_hint();
565  }
566  if (!ctx_->sent_initial_metadata_) {
567  write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
568  ctx_->initial_metadata_flags());
569  if (ctx_->compression_level_set()) {
570  write_ops_.set_compression_level(ctx_->compression_level());
571  }
572  ctx_->sent_initial_metadata_ = true;
573  }
574  // TODO(vjpai): don't assert
575  GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(resp, options).ok());
576  call_.PerformOps(&write_ops_);
577  }
578 
579  void WriteAndFinish(const ResponseType* resp, ::grpc::WriteOptions options,
580  ::grpc::Status s) override {
581  // This combines the write into the finish callback
582  // Don't send any message if the status is bad
583  if (s.ok()) {
584  // TODO(vjpai): don't assert
585  GPR_CODEGEN_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok());
586  }
587  Finish(std::move(s));
588  }
589 
590  private:
591  friend class CallbackServerStreamingHandler<RequestType, ResponseType>;
592 
593  ServerCallbackWriterImpl(::grpc_impl::CallbackServerContext* ctx,
594  ::grpc::internal::Call* call,
595  const RequestType* req,
596  std::function<void()> call_requester)
597  : ctx_(ctx),
598  call_(*call),
599  req_(req),
600  call_requester_(std::move(call_requester)) {}
601 
602  void SetupReactor(ServerWriteReactor<ResponseType>* reactor) {
603  reactor_.store(reactor, std::memory_order_relaxed);
604  // The callback for this function should not be inlined because it invokes
605  // a user-controlled reaction, but any resulting OnDone can be inlined in
606  // the executor to which this callback is dispatched.
607  write_tag_.Set(call_.call(),
608  [this, reactor](bool ok) {
609  reactor->OnWriteDone(ok);
610  this->MaybeDone(/*inlineable_ondone=*/true);
611  },
612  &write_ops_, /*can_inline=*/false);
613  write_ops_.set_core_cq_tag(&write_tag_);
614  this->BindReactor(reactor);
615  this->MaybeCallOnCancel(reactor);
616  // Inlineable OnDone can be false here because there is no write
617  // reactor that has an inlineable OnDone; this only applies to the
618  // DefaultReactor (which is unary).
619  this->MaybeDone(/*inlineable_ondone=*/false);
620  }
621  ~ServerCallbackWriterImpl() { req_->~RequestType(); }
622 
623  const RequestType* request() { return req_; }
624 
625  void CallOnDone() override {
626  reactor_.load(std::memory_order_relaxed)->OnDone();
627  grpc_call* call = call_.call();
628  auto call_requester = std::move(call_requester_);
629  this->~ServerCallbackWriterImpl(); // explicitly call destructor
631  call_requester();
632  }
633 
634  ServerReactor* reactor() override {
635  return reactor_.load(std::memory_order_relaxed);
636  }
637 
639  meta_ops_;
644  finish_ops_;
648  write_ops_;
650 
653  const RequestType* req_;
654  std::function<void()> call_requester_;
655  // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
656  std::atomic<ServerWriteReactor<ResponseType>*> reactor_;
657  // callbacks_outstanding_ follows a refcount pattern
658  std::atomic<intptr_t> callbacks_outstanding_{
659  3}; // reserve for OnStarted, Finish, and CompletionOp
660  };
661 };
662 
663 template <class RequestType, class ResponseType>
665  public:
669  get_reactor)
670  : get_reactor_(std::move(get_reactor)) {}
671  void RunHandler(const HandlerParameter& param) final {
672  ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
673 
675  param.call->call(), sizeof(ServerCallbackReaderWriterImpl)))
676  ServerCallbackReaderWriterImpl(
677  static_cast<::grpc_impl::CallbackServerContext*>(
678  param.server_context),
679  param.call, std::move(param.call_requester));
680  // Inlineable OnDone can be false in the CompletionOp callback because there
681  // is no bidi reactor that has an inlineable OnDone; this only applies to
682  // the DefaultReactor (which is unary).
683  param.server_context->BeginCompletionOp(
684  param.call,
685  [stream](bool) { stream->MaybeDone(/*inlineable_ondone=*/false); },
686  stream);
687 
689  if (param.status.ok()) {
692  get_reactor_, static_cast<::grpc_impl::CallbackServerContext*>(
693  param.server_context));
694  }
695 
696  if (reactor == nullptr) {
697  // if deserialization or reactor creator failed, we need to fail the call
699  param.call->call(),
703  }
704 
705  stream->SetupReactor(reactor);
706  }
707 
708  private:
709  std::function<ServerBidiReactor<RequestType, ResponseType>*(
711  get_reactor_;
712 
713  class ServerCallbackReaderWriterImpl
714  : public ServerCallbackReaderWriter<RequestType, ResponseType> {
715  public:
716  void Finish(::grpc::Status s) override {
717  // A finish tag with only MaybeDone can have its callback inlined
718  // regardless even if OnDone is not inlineable because this callback just
719  // checks a ref and then decides whether or not to dispatch OnDone.
720  finish_tag_.Set(call_.call(),
721  [this](bool) {
722  // Inlineable OnDone can be false here because there is
723  // no bidi reactor that has an inlineable OnDone; this
724  // only applies to the DefaultReactor (which is unary).
725  this->MaybeDone(/*inlineable_ondone=*/false);
726  },
727  &finish_ops_, /*can_inline=*/true);
728  finish_ops_.set_core_cq_tag(&finish_tag_);
729 
730  if (!ctx_->sent_initial_metadata_) {
731  finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
732  ctx_->initial_metadata_flags());
733  if (ctx_->compression_level_set()) {
734  finish_ops_.set_compression_level(ctx_->compression_level());
735  }
736  ctx_->sent_initial_metadata_ = true;
737  }
738  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
739  call_.PerformOps(&finish_ops_);
740  }
741 
742  void SendInitialMetadata() override {
743  GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
744  this->Ref();
745  // The callback for this function should not be inlined because it invokes
746  // a user-controlled reaction, but any resulting OnDone can be inlined in
747  // the executor to which this callback is dispatched.
748  meta_tag_.Set(call_.call(),
749  [this](bool ok) {
750  ServerBidiReactor<RequestType, ResponseType>* reactor =
751  reactor_.load(std::memory_order_relaxed);
752  reactor->OnSendInitialMetadataDone(ok);
753  this->MaybeDone(/*inlineable_ondone=*/true);
754  },
755  &meta_ops_, /*can_inline=*/false);
756  meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
757  ctx_->initial_metadata_flags());
758  if (ctx_->compression_level_set()) {
759  meta_ops_.set_compression_level(ctx_->compression_level());
760  }
761  ctx_->sent_initial_metadata_ = true;
762  meta_ops_.set_core_cq_tag(&meta_tag_);
763  call_.PerformOps(&meta_ops_);
764  }
765 
766  void Write(const ResponseType* resp,
767  ::grpc::WriteOptions options) override {
768  this->Ref();
769  if (options.is_last_message()) {
770  options.set_buffer_hint();
771  }
772  if (!ctx_->sent_initial_metadata_) {
773  write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
774  ctx_->initial_metadata_flags());
775  if (ctx_->compression_level_set()) {
776  write_ops_.set_compression_level(ctx_->compression_level());
777  }
778  ctx_->sent_initial_metadata_ = true;
779  }
780  // TODO(vjpai): don't assert
781  GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(resp, options).ok());
782  call_.PerformOps(&write_ops_);
783  }
784 
785  void WriteAndFinish(const ResponseType* resp, ::grpc::WriteOptions options,
786  ::grpc::Status s) override {
787  // Don't send any message if the status is bad
788  if (s.ok()) {
789  // TODO(vjpai): don't assert
790  GPR_CODEGEN_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok());
791  }
792  Finish(std::move(s));
793  }
794 
795  void Read(RequestType* req) override {
796  this->Ref();
797  read_ops_.RecvMessage(req);
798  call_.PerformOps(&read_ops_);
799  }
800 
801  private:
802  friend class CallbackBidiHandler<RequestType, ResponseType>;
803 
804  ServerCallbackReaderWriterImpl(::grpc_impl::CallbackServerContext* ctx,
805  ::grpc::internal::Call* call,
806  std::function<void()> call_requester)
807  : ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)) {}
808 
809  void SetupReactor(ServerBidiReactor<RequestType, ResponseType>* reactor) {
810  reactor_.store(reactor, std::memory_order_relaxed);
811  // The callbacks for these functions should not be inlined because they
812  // invoke user-controlled reactions, but any resulting OnDones can be
813  // inlined in the executor to which a callback is dispatched.
814  write_tag_.Set(call_.call(),
815  [this, reactor](bool ok) {
816  reactor->OnWriteDone(ok);
817  this->MaybeDone(/*inlineable_ondone=*/true);
818  },
819  &write_ops_, /*can_inline=*/false);
820  write_ops_.set_core_cq_tag(&write_tag_);
821  read_tag_.Set(call_.call(),
822  [this, reactor](bool ok) {
823  reactor->OnReadDone(ok);
824  this->MaybeDone(/*inlineable_ondone=*/true);
825  },
826  &read_ops_, /*can_inline=*/false);
827  read_ops_.set_core_cq_tag(&read_tag_);
828  this->BindReactor(reactor);
829  this->MaybeCallOnCancel(reactor);
830  // Inlineable OnDone can be false here because there is no bidi
831  // reactor that has an inlineable OnDone; this only applies to the
832  // DefaultReactor (which is unary).
833  this->MaybeDone(/*inlineable_ondone=*/false);
834  }
835 
836  void CallOnDone() override {
837  reactor_.load(std::memory_order_relaxed)->OnDone();
838  grpc_call* call = call_.call();
839  auto call_requester = std::move(call_requester_);
840  this->~ServerCallbackReaderWriterImpl(); // explicitly call destructor
842  call_requester();
843  }
844 
845  ServerReactor* reactor() override {
846  return reactor_.load(std::memory_order_relaxed);
847  }
848 
850  meta_ops_;
855  finish_ops_;
859  write_ops_;
863  read_ops_;
865 
868  std::function<void()> call_requester_;
869  // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
870  std::atomic<ServerBidiReactor<RequestType, ResponseType>*> reactor_;
871  // callbacks_outstanding_ follows a refcount pattern
872  std::atomic<intptr_t> callbacks_outstanding_{
873  3}; // reserve for OnStarted, Finish, and CompletionOp
874  };
875 };
876 
877 } // namespace internal
878 } // namespace grpc_impl
879 
880 #endif // GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H
grpc::internal::CallbackWithSuccessTag
CallbackWithSuccessTag can be reused multiple times, and will be used in this fashion for streaming o...
Definition: callback_common.h:136
grpc_impl::internal::CallbackBidiHandler::RunHandler
void RunHandler(const HandlerParameter &param) final
Definition: server_callback_handlers.h:671
grpc::internal::CallOpServerSendStatus
Definition: call_op_set.h:652
rpc_service_method.h
grpc::internal::CallOpSet
Primary implementation of CallOpSetInterface.
Definition: call_op_set.h:849
status.h
grpc::CoreCodegenInterface::grpc_call_ref
virtual void grpc_call_ref(grpc_call *call)=0
grpc::internal::CallOpSendMessage
Definition: call_op_set.h:286
grpc_impl::internal::CallbackUnaryHandler::RunHandler
void RunHandler(const HandlerParameter &param) final
Definition: server_callback_handlers.h:45
grpc::internal::MethodHandler::HandlerParameter
Definition: rpc_service_method.h:44
grpc_impl::ServerCallbackUnary::BindReactor
void BindReactor(Reactor *reactor)
Definition: server_callback_impl.h:201
grpc::CoreCodegenInterface::grpc_call_arena_alloc
virtual void * grpc_call_arena_alloc(grpc_call *call, size_t length)=0
grpc_impl::internal::CallbackUnaryHandler::Deserialize
void * Deserialize(grpc_call *call, grpc_byte_buffer *req, ::grpc::Status *status, void **handler_data) final
Definition: server_callback_handlers.h:82
grpc_impl::internal::CallbackServerStreamingHandler::RunHandler
void RunHandler(const HandlerParameter &param) final
Definition: server_callback_handlers.h:447
grpc_impl::ServerUnaryReactor
Definition: server_callback_impl.h:693
server_callback_impl.h
grpc_impl::internal::CallbackBidiHandler::CallbackBidiHandler
CallbackBidiHandler(std::function< ServerBidiReactor< RequestType, ResponseType > *(::grpc_impl::CallbackServerContext *)> get_reactor)
Definition: server_callback_handlers.h:666
grpc_impl::internal::CallbackUnaryHandler::SetMessageAllocator
void SetMessageAllocator(::grpc::experimental::MessageAllocator< RequestType, ResponseType > *allocator)
Definition: server_callback_handlers.h:39
grpc::internal::Call
Straightforward wrapping of the C call object.
Definition: call.h:38
grpc_impl::ServerBidiReactor
ServerBidiReactor is the interface for a bidirectional streaming RPC.
Definition: server_callback_impl.h:186
grpc::experimental::MessageAllocator< RequestType, ResponseType >
grpc::internal::CallOpSendInitialMetadata
Definition: call_op_set.h:216
grpc_impl::CallbackServerContext
Definition: server_context_impl.h:550
grpc_impl::internal::CallbackClientStreamingHandler::CallbackClientStreamingHandler
CallbackClientStreamingHandler(std::function< ServerReadReactor< RequestType > *(::grpc_impl::CallbackServerContext *, ResponseType *)> get_reactor)
Definition: server_callback_handlers.h:257
grpc_impl::internal::UnimplementedUnaryReactor
FinishOnlyReactor< ServerUnaryReactor > UnimplementedUnaryReactor
Definition: server_callback_impl.h:770
grpc::Status::ok
bool ok() const
Is the status OK?
Definition: status.h:118
grpc_impl::internal::ServerCallbackCall::MaybeDone
void MaybeDone()
Definition: server_callback_impl.h:92
grpc_impl::ServerCallbackWriter
Definition: server_callback_impl.h:221
grpc_impl::ServerReadReactor
ServerReadReactor is the interface for a client-streaming RPC.
Definition: server_callback_impl.h:182
grpc_impl::ServerCallbackReader::SendInitialMetadata
virtual void SendInitialMetadata()=0
grpc::Status
Did it work? If it didn't, why?
Definition: status.h:31
grpc_impl::internal::DefaultMessageHolder
Definition: server_callback_impl.h:160
grpc_impl::internal::ServerCallbackCall::MaybeCallOnCancel
void MaybeCallOnCancel(ServerReactor *reactor)
Definition: server_callback_impl.h:106
grpc_impl::ServerCallbackReader< RequestType >::BindReactor
void BindReactor(ServerReadReactor< RequestType > *reactor)
Definition: server_callback_impl.h:215
grpc_impl::internal::CallbackBidiHandler
Definition: server_callback_handlers.h:664
grpc_impl::ServerCallbackWriter< ResponseType >::BindReactor
void BindReactor(ServerWriteReactor< ResponseType > *reactor)
Definition: server_callback_impl.h:232
grpc_call
struct grpc_call grpc_call
A Call represents an RPC.
Definition: grpc_types.h:70
grpc_byte_buffer
Definition: grpc_types.h:40
grpc::ByteBuffer
A sequence of bytes.
Definition: byte_buffer.h:67
grpc_impl::internal::CallbackClientStreamingHandler
Definition: server_callback_handlers.h:255
grpc_impl::ServerCallbackUnary::SendInitialMetadata
virtual void SendInitialMetadata()=0
grpc::experimental::ServerUnaryReactor
::grpc_impl::ServerUnaryReactor ServerUnaryReactor
Definition: server_callback.h:51
grpc::SerializationTraits
Defines how to serialize and deserialize some type.
Definition: serialization_traits.h:58
grpc_impl::ServerCallbackReaderWriter
Definition: server_callback_impl.h:238
grpc::internal::MethodHandler
Base class for running an RPC handler.
Definition: rpc_service_method.h:41
grpc::CoreCodegenInterface::grpc_call_unref
virtual void grpc_call_unref(grpc_call *call)=0
grpc_impl::ServerWriteReactor
ServerWriteReactor is the interface for a server-streaming RPC.
Definition: server_callback_impl.h:184
grpc::WriteOptions
Per-message write options.
Definition: call_op_set.h:79
server_context_impl.h
grpc_impl::internal::CallbackServerStreamingHandler::CallbackServerStreamingHandler
CallbackServerStreamingHandler(std::function< ServerWriteReactor< ResponseType > *(::grpc_impl::CallbackServerContext *, const RequestType *)> get_reactor)
Definition: server_callback_handlers.h:442
grpc::UNIMPLEMENTED
Operation is not implemented or not supported/enabled in this service.
Definition: status_code_enum.h:115
grpc_impl::ServerCallbackUnary
Definition: server_callback_impl.h:191
grpc::experimental::MessageHolder::response
ResponseT * response()
Definition: message_allocator.h:49
grpc::internal::Call::call
grpc_call * call() const
Definition: call.h:72
std
Definition: async_unary_call_impl.h:301
grpc_impl::internal::ServerCallbackCall::Ref
void Ref()
Increases the reference count.
Definition: server_callback_impl.h:124
grpc_impl::internal::CallbackUnaryHandler::CallbackUnaryHandler
CallbackUnaryHandler(std::function< ServerUnaryReactor *(::grpc_impl::CallbackServerContext *, const RequestType *, ResponseType *)> get_reactor)
Definition: server_callback_handlers.h:33
grpc::WriteOptions::set_buffer_hint
WriteOptions & set_buffer_hint()
Sets flag indicating that the write may be buffered and need not go out on the wire immediately.
Definition: call_op_set.h:122
grpc_impl::ServerCallbackReaderWriter::SendInitialMetadata
virtual void SendInitialMetadata()=0
grpc_impl
An Alarm posts the user-provided tag to its associated completion queue or invokes the user-provided ...
Definition: alarm_impl.h:33
grpc_impl::internal::CallbackClientStreamingHandler::RunHandler
void RunHandler(const HandlerParameter &param) final
Definition: server_callback_handlers.h:262
grpc_impl::ServerCallbackReader
Definition: server_callback_impl.h:207
grpc::g_core_codegen_interface
CoreCodegenInterface * g_core_codegen_interface
Definition: completion_queue_impl.h:93
GPR_CODEGEN_ASSERT
#define GPR_CODEGEN_ASSERT(x)
Codegen specific version of GPR_ASSERT.
Definition: core_codegen_interface.h:146
grpc::WriteOptions::is_last_message
bool is_last_message() const
Get value for the flag indicating that this is the last message, and should be coalesced with trailin...
Definition: call_op_set.h:186
grpc::experimental::MessageAllocator::AllocateMessages
virtual MessageHolder< RequestT, ResponseT > * AllocateMessages()=0
grpc::internal::CallOpRecvMessage
Definition: byte_buffer.h:58
grpc_impl::ServerUnaryReactor::OnSendInitialMetadataDone
virtual void OnSendInitialMetadataDone(bool)
The following notifications are exactly like ServerBidiReactor.
Definition: server_callback_impl.h:729
grpc::ByteBuffer::Release
void Release()
Forget underlying byte buffer without destroying Use this only for un-owned byte buffers.
Definition: byte_buffer.h:146
grpc_impl::ServerCallbackReaderWriter< RequestType, ResponseType >::BindReactor
void BindReactor(ServerBidiReactor< RequestType, ResponseType > *reactor)
Definition: server_callback_impl.h:250
grpc_impl::ServerCallbackWriter::SendInitialMetadata
virtual void SendInitialMetadata()=0
grpc::experimental::MessageHolder::Release
virtual void Release()=0
grpc::internal::CatchingReactorGetter
Reactor * CatchingReactorGetter(Func &&func, Args &&... args)
Definition: callback_common.h:51
grpc_impl::internal::CallbackServerStreamingHandler::Deserialize
void * Deserialize(grpc_call *call, grpc_byte_buffer *req, ::grpc::Status *status, void **) final
Definition: server_callback_handlers.h:486
message_allocator.h
grpc::experimental::MessageHolder< RequestType, ResponseType >
grpc::experimental::MessageHolder::request
RequestT * request()
Definition: message_allocator.h:48
grpc_impl::internal::FinishOnlyReactor
Definition: server_callback_impl.h:764