18 #ifndef GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H
19 #define GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H
30 template <
class RequestType,
class ResponseType>
35 const RequestType*, ResponseType*)>
37 : get_reactor_(
std::move(get_reactor)) {}
42 allocator_ = allocator;
48 auto* allocator_state =
static_cast<
53 param.call->call(),
sizeof(ServerCallbackUnaryImpl)))
54 ServerCallbackUnaryImpl(
55 static_cast<::grpc_impl::CallbackServerContext*>(
56 param.server_context),
57 param.call, allocator_state, std::move(param.call_requester));
58 param.server_context->BeginCompletionOp(
59 param.call, [call](
bool) { call->MaybeDone(); }, call);
62 if (param.status.ok()) {
63 reactor = ::grpc::internal::CatchingReactorGetter<ServerUnaryReactor>(
65 static_cast<::grpc_impl::CallbackServerContext*>(
66 param.server_context),
70 if (reactor ==
nullptr) {
79 call->SetupReactor(reactor);
86 RequestType* request =
nullptr;
88 allocator_state =
nullptr;
89 if (allocator_ !=
nullptr) {
97 *handler_data = allocator_state;
98 request = allocator_state->
request();
112 const RequestType*, ResponseType*)>
115 allocator_ =
nullptr;
130 reactor_.load(std::memory_order_relaxed)->InternalInlineable());
133 finish_ops_.set_core_cq_tag(&finish_tag_);
135 if (!ctx_->sent_initial_metadata_) {
137 ctx_->initial_metadata_flags());
138 if (ctx_->compression_level_set()) {
139 finish_ops_.set_compression_level(ctx_->compression_level());
141 ctx_->sent_initial_metadata_ =
true;
145 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
146 finish_ops_.SendMessagePtr(response()));
148 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
150 finish_ops_.set_core_cq_tag(&finish_tag_);
151 call_.PerformOps(&finish_ops_);
154 void SendInitialMetadata()
override {
162 meta_tag_.Set(call_.call(),
165 reactor_.load(std::memory_order_relaxed);
170 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
171 ctx_->initial_metadata_flags());
172 if (ctx_->compression_level_set()) {
173 meta_ops_.set_compression_level(ctx_->compression_level());
175 ctx_->sent_initial_metadata_ =
true;
176 meta_ops_.set_core_cq_tag(&meta_tag_);
177 call_.PerformOps(&meta_ops_);
183 ServerCallbackUnaryImpl(
187 std::function<
void()> call_requester)
190 allocator_state_(allocator_state),
191 call_requester_(
std::move(call_requester)) {
192 ctx_->set_message_allocator_state(allocator_state);
200 reactor_.store(reactor, std::memory_order_relaxed);
203 this->
MaybeDone(reactor->InternalInlineable());
// Read-only accessor for the request message: forwards to
// allocator_state_->request() and hands the result back as pointer-to-const.
// NOTE(review): allocator_state_ is dereferenced without a null check; the
// visible constructor fragment initializes it from its argument — confirm it
// can never be null here.
206 const RequestType* request() {
return allocator_state_->request(); }
// Mutable accessor for the response message: forwards to
// allocator_state_->response(). Elsewhere in this file the returned pointer is
// passed to finish_ops_.SendMessagePtr(...).
// NOTE(review): same unchecked dereference of allocator_state_ as request().
207 ResponseType* response() {
return allocator_state_->response(); }
209 void CallOnDone()
override {
210 reactor_.load(std::memory_order_relaxed)->OnDone();
212 auto call_requester = std::move(call_requester_);
213 allocator_state_->Release();
214 this->~ServerCallbackUnaryImpl();
219 ServerReactor* reactor()
override {
220 return reactor_.load(std::memory_order_relaxed);
236 std::function<void()> call_requester_;
247 std::atomic<ServerUnaryReactor*> reactor_;
249 std::atomic<intptr_t> callbacks_outstanding_{
254 template <
class RequestType,
class ResponseType>
261 : get_reactor_(
std::move(get_reactor)) {}
267 param.call->call(),
sizeof(ServerCallbackReaderImpl)))
268 ServerCallbackReaderImpl(
269 static_cast<::grpc_impl::CallbackServerContext*>(
270 param.server_context),
271 param.call, std::move(param.call_requester));
275 param.server_context->BeginCompletionOp(
277 [reader](
bool) { reader->MaybeDone(false); },
281 if (param.status.ok()) {
285 static_cast<::grpc_impl::CallbackServerContext*>(
286 param.server_context),
290 if (reactor ==
nullptr) {
298 reader->SetupReactor(reactor);
302 std::function<ServerReadReactor<RequestType>*(
312 finish_tag_.Set(call_.call(),
317 this->MaybeDone(
false);
320 if (!ctx_->sent_initial_metadata_) {
322 ctx_->initial_metadata_flags());
323 if (ctx_->compression_level_set()) {
324 finish_ops_.set_compression_level(ctx_->compression_level());
326 ctx_->sent_initial_metadata_ =
true;
330 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
331 finish_ops_.SendMessagePtr(&resp_));
333 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
335 finish_ops_.set_core_cq_tag(&finish_tag_);
336 call_.PerformOps(&finish_ops_);
339 void SendInitialMetadata()
override {
345 meta_tag_.Set(call_.call(),
347 ServerReadReactor<RequestType>* reactor =
348 reactor_.load(std::memory_order_relaxed);
349 reactor->OnSendInitialMetadataDone(ok);
353 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
354 ctx_->initial_metadata_flags());
355 if (ctx_->compression_level_set()) {
356 meta_ops_.set_compression_level(ctx_->compression_level());
358 ctx_->sent_initial_metadata_ =
true;
359 meta_ops_.set_core_cq_tag(&meta_tag_);
360 call_.PerformOps(&meta_ops_);
363 void Read(RequestType* req)
override {
365 read_ops_.RecvMessage(req);
366 call_.PerformOps(&read_ops_);
374 std::function<
void()> call_requester)
375 : ctx_(ctx), call_(*call), call_requester_(
std::move(call_requester)) {}
377 void SetupReactor(ServerReadReactor<RequestType>* reactor) {
378 reactor_.store(reactor, std::memory_order_relaxed);
382 read_tag_.Set(call_.call(),
383 [
this, reactor](
bool ok) {
384 reactor->OnReadDone(ok);
388 read_ops_.set_core_cq_tag(&read_tag_);
// Destructor with an empty body; only the members' own destructors run.
// CallOnDone() in this file invokes it explicitly via
// this->~ServerCallbackReaderImpl().
// NOTE(review): presumably the object's storage is owned by something other
// than operator new/delete (explicit destructor call, no visible delete) —
// confirm against the allocation site.
397 ~ServerCallbackReaderImpl() {}
// Returns the address of the resp_ member, giving the caller a mutable
// pointer to the response message held by this object.
399 ResponseType* response() {
return &resp_; }
401 void CallOnDone()
override {
402 reactor_.load(std::memory_order_relaxed)->OnDone();
404 auto call_requester = std::move(call_requester_);
405 this->~ServerCallbackReaderImpl();
410 ServerReactor* reactor()
override {
411 return reactor_.load(std::memory_order_relaxed);
430 std::function<void()> call_requester_;
432 std::atomic<ServerReadReactor<RequestType>*> reactor_;
434 std::atomic<intptr_t> callbacks_outstanding_{
439 template <
class RequestType,
class ResponseType>
446 : get_reactor_(
std::move(get_reactor)) {}
452 param.call->call(),
sizeof(ServerCallbackWriterImpl)))
453 ServerCallbackWriterImpl(
454 static_cast<::grpc_impl::CallbackServerContext*>(
455 param.server_context),
456 param.call, static_cast<RequestType*>(param.request),
457 std::move(param.call_requester));
461 param.server_context->BeginCompletionOp(
463 [writer](
bool) { writer->MaybeDone(false); },
467 if (param.status.ok()) {
471 static_cast<::grpc_impl::CallbackServerContext*>(
472 param.server_context),
475 if (reactor ==
nullptr) {
483 writer->SetupReactor(reactor);
492 call,
sizeof(RequestType))) RequestType();
499 request->~RequestType();
504 std::function<ServerWriteReactor<ResponseType>*(
514 finish_tag_.Set(call_.call(),
519 this->MaybeDone(
false);
522 finish_ops_.set_core_cq_tag(&finish_tag_);
524 if (!ctx_->sent_initial_metadata_) {
526 ctx_->initial_metadata_flags());
527 if (ctx_->compression_level_set()) {
528 finish_ops_.set_compression_level(ctx_->compression_level());
530 ctx_->sent_initial_metadata_ =
true;
532 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
533 call_.PerformOps(&finish_ops_);
536 void SendInitialMetadata()
override {
542 meta_tag_.Set(call_.call(),
544 ServerWriteReactor<ResponseType>* reactor =
545 reactor_.load(std::memory_order_relaxed);
546 reactor->OnSendInitialMetadataDone(ok);
550 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
551 ctx_->initial_metadata_flags());
552 if (ctx_->compression_level_set()) {
553 meta_ops_.set_compression_level(ctx_->compression_level());
555 ctx_->sent_initial_metadata_ =
true;
556 meta_ops_.set_core_cq_tag(&meta_tag_);
557 call_.PerformOps(&meta_ops_);
560 void Write(
const ResponseType* resp,
566 if (!ctx_->sent_initial_metadata_) {
567 write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
568 ctx_->initial_metadata_flags());
569 if (ctx_->compression_level_set()) {
570 write_ops_.set_compression_level(ctx_->compression_level());
572 ctx_->sent_initial_metadata_ =
true;
576 call_.PerformOps(&write_ops_);
587 Finish(std::move(s));
595 const RequestType* req,
596 std::function<
void()> call_requester)
600 call_requester_(
std::move(call_requester)) {}
602 void SetupReactor(ServerWriteReactor<ResponseType>* reactor) {
603 reactor_.store(reactor, std::memory_order_relaxed);
607 write_tag_.Set(call_.call(),
608 [
this, reactor](
bool ok) {
609 reactor->OnWriteDone(ok);
613 write_ops_.set_core_cq_tag(&write_tag_);
// Destructor: explicitly runs the RequestType destructor on the object that
// req_ points to. It does not free that object's storage.
// NOTE(review): this looks paired with the `RequestType()` construction seen
// elsewhere in this file (presumably placement-new into call-owned memory) —
// confirm, and confirm req_ is never null when this runs.
621 ~ServerCallbackWriterImpl() { req_->~RequestType(); }
// Read-only accessor for the request message: returns the stored req_
// pointer unchanged.
623 const RequestType* request() {
return req_; }
625 void CallOnDone()
override {
626 reactor_.load(std::memory_order_relaxed)->OnDone();
628 auto call_requester = std::move(call_requester_);
629 this->~ServerCallbackWriterImpl();
634 ServerReactor* reactor()
override {
635 return reactor_.load(std::memory_order_relaxed);
653 const RequestType* req_;
654 std::function<void()> call_requester_;
656 std::atomic<ServerWriteReactor<ResponseType>*> reactor_;
658 std::atomic<intptr_t> callbacks_outstanding_{
663 template <
class RequestType,
class ResponseType>
670 : get_reactor_(
std::move(get_reactor)) {}
675 param.call->call(),
sizeof(ServerCallbackReaderWriterImpl)))
676 ServerCallbackReaderWriterImpl(
677 static_cast<::grpc_impl::CallbackServerContext*>(
678 param.server_context),
679 param.call, std::move(param.call_requester));
683 param.server_context->BeginCompletionOp(
685 [stream](
bool) { stream->MaybeDone(false); },
689 if (param.status.ok()) {
692 get_reactor_, static_cast<::grpc_impl::CallbackServerContext*>(
693 param.server_context));
696 if (reactor ==
nullptr) {
705 stream->SetupReactor(reactor);
709 std::function<ServerBidiReactor<RequestType, ResponseType>*(
713 class ServerCallbackReaderWriterImpl
720 finish_tag_.Set(call_.call(),
725 this->MaybeDone(
false);
728 finish_ops_.set_core_cq_tag(&finish_tag_);
730 if (!ctx_->sent_initial_metadata_) {
732 ctx_->initial_metadata_flags());
733 if (ctx_->compression_level_set()) {
734 finish_ops_.set_compression_level(ctx_->compression_level());
736 ctx_->sent_initial_metadata_ =
true;
738 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
739 call_.PerformOps(&finish_ops_);
742 void SendInitialMetadata()
override {
748 meta_tag_.Set(call_.call(),
750 ServerBidiReactor<RequestType, ResponseType>* reactor =
751 reactor_.load(std::memory_order_relaxed);
752 reactor->OnSendInitialMetadataDone(ok);
756 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
757 ctx_->initial_metadata_flags());
758 if (ctx_->compression_level_set()) {
759 meta_ops_.set_compression_level(ctx_->compression_level());
761 ctx_->sent_initial_metadata_ =
true;
762 meta_ops_.set_core_cq_tag(&meta_tag_);
763 call_.PerformOps(&meta_ops_);
766 void Write(
const ResponseType* resp,
772 if (!ctx_->sent_initial_metadata_) {
773 write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
774 ctx_->initial_metadata_flags());
775 if (ctx_->compression_level_set()) {
776 write_ops_.set_compression_level(ctx_->compression_level());
778 ctx_->sent_initial_metadata_ =
true;
782 call_.PerformOps(&write_ops_);
792 Finish(std::move(s));
795 void Read(RequestType* req)
override {
797 read_ops_.RecvMessage(req);
798 call_.PerformOps(&read_ops_);
806 std::function<
void()> call_requester)
807 : ctx_(ctx), call_(*call), call_requester_(
std::move(call_requester)) {}
809 void SetupReactor(ServerBidiReactor<RequestType, ResponseType>* reactor) {
810 reactor_.store(reactor, std::memory_order_relaxed);
814 write_tag_.Set(call_.call(),
815 [
this, reactor](
bool ok) {
816 reactor->OnWriteDone(ok);
820 write_ops_.set_core_cq_tag(&write_tag_);
821 read_tag_.Set(call_.call(),
822 [
this, reactor](
bool ok) {
823 reactor->OnReadDone(ok);
827 read_ops_.set_core_cq_tag(&read_tag_);
836 void CallOnDone()
override {
837 reactor_.load(std::memory_order_relaxed)->OnDone();
839 auto call_requester = std::move(call_requester_);
840 this->~ServerCallbackReaderWriterImpl();
845 ServerReactor* reactor()
override {
846 return reactor_.load(std::memory_order_relaxed);
868 std::function<void()> call_requester_;
870 std::atomic<ServerBidiReactor<RequestType, ResponseType>*> reactor_;
872 std::atomic<intptr_t> callbacks_outstanding_{
880 #endif // GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H