GRPC C++  1.30.0
method_handler_impl.h
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #ifndef GRPCPP_IMPL_CODEGEN_METHOD_HANDLER_IMPL_H
20 #define GRPCPP_IMPL_CODEGEN_METHOD_HANDLER_IMPL_H
21 
26 
27 namespace grpc_impl {
28 
29 namespace internal {
30 
31 // Invoke the method handler, fill in the status, and
32 // return whether or not we finished safely (without an exception).
33 // Note that exception handling is 0-cost in most compiler/library
34 // implementations (except when an exception is actually thrown),
35 // so this process doesn't require additional overhead in the common case.
36 // Additionally, we don't need to report whether or not we caught an
37 // exception; the handling is the same in either case.
38 template <class Callable>
40 #if GRPC_ALLOW_EXCEPTIONS
41  try {
42  return handler();
43  } catch (...) {
45  "Unexpected error in RPC handling");
46  }
47 #else // GRPC_ALLOW_EXCEPTIONS
48  return handler();
49 #endif // GRPC_ALLOW_EXCEPTIONS
50 }
51 
53 template <class ServiceType, class RequestType, class ResponseType>
55  public:
57  std::function<::grpc::Status(ServiceType*, ::grpc_impl::ServerContext*,
58  const RequestType*, ResponseType*)>
59  func,
60  ServiceType* service)
61  : func_(func), service_(service) {}
62 
63  void RunHandler(const HandlerParameter& param) final {
64  ResponseType rsp;
65  ::grpc::Status status = param.status;
66  if (status.ok()) {
67  status = CatchingFunctionHandler([this, &param, &rsp] {
68  return func_(
69  service_,
70  static_cast<::grpc_impl::ServerContext*>(param.server_context),
71  static_cast<RequestType*>(param.request), &rsp);
72  });
73  static_cast<RequestType*>(param.request)->~RequestType();
74  }
75 
76  GPR_CODEGEN_ASSERT(!param.server_context->sent_initial_metadata_);
80  ops;
81  ops.SendInitialMetadata(&param.server_context->initial_metadata_,
82  param.server_context->initial_metadata_flags());
83  if (param.server_context->compression_level_set()) {
84  ops.set_compression_level(param.server_context->compression_level());
85  }
86  if (status.ok()) {
87  status = ops.SendMessagePtr(&rsp);
88  }
89  ops.ServerSendStatus(&param.server_context->trailing_metadata_, status);
90  param.call->PerformOps(&ops);
91  param.call->cq()->Pluck(&ops);
92  }
93 
95  ::grpc::Status* status, void** /*handler_data*/) final {
97  buf.set_buffer(req);
98  auto* request =
100  call, sizeof(RequestType))) RequestType();
101  *status =
103  buf.Release();
104  if (status->ok()) {
105  return request;
106  }
107  request->~RequestType();
108  return nullptr;
109  }
110 
111  private:
113  std::function<::grpc::Status(ServiceType*, ::grpc_impl::ServerContext*,
114  const RequestType*, ResponseType*)>
115  func_;
116  // The class the above handler function lives in.
117  ServiceType* service_;
118 };
119 
121 template <class ServiceType, class RequestType, class ResponseType>
123  public:
125  std::function<::grpc::Status(ServiceType*, ::grpc_impl::ServerContext*,
127  ResponseType*)>
128  func,
129  ServiceType* service)
130  : func_(func), service_(service) {}
131 
132  void RunHandler(const HandlerParameter& param) final {
134  param.call,
135  static_cast<::grpc_impl::ServerContext*>(param.server_context));
136  ResponseType rsp;
137  ::grpc::Status status =
138  CatchingFunctionHandler([this, &param, &reader, &rsp] {
139  return func_(
140  service_,
141  static_cast<::grpc_impl::ServerContext*>(param.server_context),
142  &reader, &rsp);
143  });
144 
148  ops;
149  if (!param.server_context->sent_initial_metadata_) {
150  ops.SendInitialMetadata(&param.server_context->initial_metadata_,
151  param.server_context->initial_metadata_flags());
152  if (param.server_context->compression_level_set()) {
153  ops.set_compression_level(param.server_context->compression_level());
154  }
155  }
156  if (status.ok()) {
157  status = ops.SendMessagePtr(&rsp);
158  }
159  ops.ServerSendStatus(&param.server_context->trailing_metadata_, status);
160  param.call->PerformOps(&ops);
161  param.call->cq()->Pluck(&ops);
162  }
163 
164  private:
165  std::function<::grpc::Status(ServiceType*, ::grpc_impl::ServerContext*,
167  ResponseType*)>
168  func_;
169  ServiceType* service_;
170 };
171 
173 template <class ServiceType, class RequestType, class ResponseType>
175  public:
177  std::function<::grpc::Status(ServiceType*, ::grpc_impl::ServerContext*,
178  const RequestType*,
180  func,
181  ServiceType* service)
182  : func_(func), service_(service) {}
183 
184  void RunHandler(const HandlerParameter& param) final {
185  ::grpc::Status status = param.status;
186  if (status.ok()) {
188  param.call,
189  static_cast<::grpc_impl::ServerContext*>(param.server_context));
190  status = CatchingFunctionHandler([this, &param, &writer] {
191  return func_(
192  service_,
193  static_cast<::grpc_impl::ServerContext*>(param.server_context),
194  static_cast<RequestType*>(param.request), &writer);
195  });
196  static_cast<RequestType*>(param.request)->~RequestType();
197  }
198 
201  ops;
202  if (!param.server_context->sent_initial_metadata_) {
203  ops.SendInitialMetadata(&param.server_context->initial_metadata_,
204  param.server_context->initial_metadata_flags());
205  if (param.server_context->compression_level_set()) {
206  ops.set_compression_level(param.server_context->compression_level());
207  }
208  }
209  ops.ServerSendStatus(&param.server_context->trailing_metadata_, status);
210  param.call->PerformOps(&ops);
211  if (param.server_context->has_pending_ops_) {
212  param.call->cq()->Pluck(&param.server_context->pending_ops_);
213  }
214  param.call->cq()->Pluck(&ops);
215  }
216 
218  ::grpc::Status* status, void** /*handler_data*/) final {
219  ::grpc::ByteBuffer buf;
220  buf.set_buffer(req);
221  auto* request =
223  call, sizeof(RequestType))) RequestType();
224  *status =
226  buf.Release();
227  if (status->ok()) {
228  return request;
229  }
230  request->~RequestType();
231  return nullptr;
232  }
233 
234  private:
235  std::function<::grpc::Status(ServiceType*, ::grpc_impl::ServerContext*,
236  const RequestType*,
238  func_;
239  ServiceType* service_;
240 };
241 
249 template <class Streamer, bool WriteNeeded>
251  public:
253  std::function<::grpc::Status(::grpc_impl::ServerContext*, Streamer*)>
254  func)
255  : func_(func), write_needed_(WriteNeeded) {}
256 
257  void RunHandler(const HandlerParameter& param) final {
258  Streamer stream(param.call, static_cast<::grpc_impl::ServerContext*>(
259  param.server_context));
260  ::grpc::Status status = CatchingFunctionHandler([this, &param, &stream] {
261  return func_(
262  static_cast<::grpc_impl::ServerContext*>(param.server_context),
263  &stream);
264  });
265 
268  ops;
269  if (!param.server_context->sent_initial_metadata_) {
270  ops.SendInitialMetadata(&param.server_context->initial_metadata_,
271  param.server_context->initial_metadata_flags());
272  if (param.server_context->compression_level_set()) {
273  ops.set_compression_level(param.server_context->compression_level());
274  }
275  if (write_needed_ && status.ok()) {
276  // If we needed a write but never did one, we need to mark the
277  // status as a fail
279  "Service did not provide response message");
280  }
281  }
282  ops.ServerSendStatus(&param.server_context->trailing_metadata_, status);
283  param.call->PerformOps(&ops);
284  if (param.server_context->has_pending_ops_) {
285  param.call->cq()->Pluck(&param.server_context->pending_ops_);
286  }
287  param.call->cq()->Pluck(&ops);
288  }
289 
290  private:
291  std::function<::grpc::Status(::grpc_impl::ServerContext*, Streamer*)> func_;
292  const bool write_needed_;
293 };
294 
295 template <class ServiceType, class RequestType, class ResponseType>
298  ::grpc_impl::ServerReaderWriter<ResponseType, RequestType>, false> {
299  public:
301  std::function<::grpc::Status(
302  ServiceType*, ::grpc_impl::ServerContext*,
304  func,
305  ServiceType* service)
306  // TODO(vjpai): When gRPC supports C++14, move-capture func in the below
308  ::grpc_impl::ServerReaderWriter<ResponseType, RequestType>, false>(
309  [func, service](
310  ::grpc_impl::ServerContext* ctx,
311  ::grpc_impl::ServerReaderWriter<ResponseType, RequestType>*
312  streamer) { return func(service, ctx, streamer); }) {}
313 };
314 
315 template <class RequestType, class ResponseType>
318  ::grpc_impl::ServerUnaryStreamer<RequestType, ResponseType>, true> {
319  public:
321  std::function<::grpc::Status(
324  func)
326  ::grpc_impl::ServerUnaryStreamer<RequestType, ResponseType>, true>(
327  std::move(func)) {}
328 };
329 
330 template <class RequestType, class ResponseType>
333  ::grpc_impl::ServerSplitStreamer<RequestType, ResponseType>, false> {
334  public:
336  std::function<::grpc::Status(
339  func)
341  ::grpc_impl::ServerSplitStreamer<RequestType, ResponseType>, false>(
342  std::move(func)) {}
343 };
344 
347 template <::grpc::StatusCode code>
349  public:
// Fills `ops` with the operations that answer a call with the error status
// `code` (the enclosing template's non-type parameter) and an empty message.
//
// This function only populates `ops`; it does not start or wait for the
// batch itself.
//
// T must provide SendInitialMetadata(), set_compression_level() and
// ServerSendStatus(), since those are the members invoked on `ops` below.
//
// context: server context whose initial/trailing metadata, metadata flags
//          and compression level are read; its sent_initial_metadata_ flag
//          is set to true when initial metadata is queued here.
// ops:     operation set to populate.
350 template <class T>
351 static void FillOps(::grpc_impl::ServerContextBase* context, T* ops) {
// The status carries only the code; the error message is left empty.
352 ::grpc::Status status(code, "");
// Queue initial metadata only when the context does not already record it
// as sent.
353 if (!context->sent_initial_metadata_) {
354 ops->SendInitialMetadata(&context->initial_metadata_,
355 context->initial_metadata_flags());
// Forward the compression level only when one has been set on the context.
356 if (context->compression_level_set()) {
357 ops->set_compression_level(context->compression_level());
358 }
// Record on the context that initial metadata is now accounted for, so the
// guard above is false on any later check of this flag.
359 context->sent_initial_metadata_ = true;
360 }
// The status is always queued, together with the context's trailing metadata.
361 ops->ServerSendStatus(&context->trailing_metadata_, status);
362 }
363 
364  void RunHandler(const HandlerParameter& param) final {
367  ops;
368  FillOps(param.server_context, &ops);
369  param.call->PerformOps(&ops);
370  param.call->cq()->Pluck(&ops);
371  }
372 
373  void* Deserialize(grpc_call* /*call*/, grpc_byte_buffer* req,
374  ::grpc::Status* /*status*/, void** /*handler_data*/) final {
375  // We have to destroy any request payload
376  if (req != nullptr) {
378  }
379  return nullptr;
380  }
381 };
382 
383 typedef ErrorMethodHandler<::grpc::StatusCode::UNIMPLEMENTED>
387 
388 } // namespace internal
389 } // namespace grpc_impl
390 
391 #endif // GRPCPP_IMPL_CODEGEN_METHOD_HANDLER_IMPL_H
grpc_impl::internal::RpcMethodHandler::RpcMethodHandler
RpcMethodHandler(std::function<::grpc::Status(ServiceType *, ::grpc_impl::ServerContext *, const RequestType *, ResponseType *)> func, ServiceType *service)
Definition: method_handler_impl.h:56
grpc_impl::internal::ErrorMethodHandler
General method handler class for errors that prevent real method use e.g., handle unknown method by r...
Definition: byte_buffer.h:44
grpc_impl::internal::BidiStreamingHandler::BidiStreamingHandler
BidiStreamingHandler(std::function<::grpc::Status(ServiceType *, ::grpc_impl::ServerContext *, ::grpc_impl::ServerReaderWriter< ResponseType, RequestType > *)> func, ServiceType *service)
Definition: method_handler_impl.h:300
grpc::internal::RpcMethodHandler
::grpc_impl::internal::RpcMethodHandler< ServiceType, RequestType, ResponseType > RpcMethodHandler
Definition: method_handler.h:36
grpc_impl::internal::StreamedUnaryHandler::StreamedUnaryHandler
StreamedUnaryHandler(std::function<::grpc::Status(::grpc_impl::ServerContext *, ::grpc_impl::ServerUnaryStreamer< RequestType, ResponseType > *)> func)
Definition: method_handler_impl.h:320
grpc::internal::CallOpServerSendStatus
Definition: call_op_set.h:652
rpc_service_method.h
grpc::internal::TemplatedBidiStreamingHandler
::grpc_impl::internal::TemplatedBidiStreamingHandler< Streamer, WriteNeeded > TemplatedBidiStreamingHandler
Definition: method_handler.h:50
grpc::internal::CallOpSet
Primary implementation of CallOpSetInterface.
Definition: call_op_set.h:849
grpc_impl::internal::BidiStreamingHandler
Definition: method_handler_impl.h:296
grpc::internal::CallOpSendMessage
Definition: call_op_set.h:286
grpc::internal::MethodHandler::HandlerParameter
Definition: rpc_service_method.h:44
grpc::CoreCodegenInterface::grpc_call_arena_alloc
virtual void * grpc_call_arena_alloc(grpc_call *call, size_t length)=0
sync_stream_impl.h
grpc_impl::ServerUnaryStreamer
A class to represent a flow-controlled unary call.
Definition: sync_stream_impl.h:823
grpc_impl::ServerReaderWriter
Synchronous (blocking) server-side API for a bidirectional streaming call, where the incoming message...
Definition: sync_stream_impl.h:781
core_codegen_interface.h
grpc_impl::ServerContextBase
Base class of ServerContext. Experimental until callback API is final.
Definition: server_context_impl.h:123
grpc::internal::CallOpSendInitialMetadata
Definition: call_op_set.h:216
grpc_impl::internal::SplitServerStreamingHandler::SplitServerStreamingHandler
SplitServerStreamingHandler(std::function<::grpc::Status(::grpc_impl::ServerContext *, ::grpc_impl::ServerSplitStreamer< RequestType, ResponseType > *)> func)
Definition: method_handler_impl.h:335
grpc::Status::ok
bool ok() const
Is the status OK?
Definition: status.h:118
grpc::CoreCodegenInterface::grpc_byte_buffer_destroy
virtual void grpc_byte_buffer_destroy(grpc_byte_buffer *bb)=0
byte_buffer.h
grpc::Status
Did it work? If it didn't, why?
Definition: status.h:31
grpc_impl::ServerContext
A ServerContext or CallbackServerContext allows the code implementing a service handler to:
Definition: server_context_impl.h:510
grpc_impl::internal::SplitServerStreamingHandler
Definition: method_handler_impl.h:331
grpc::internal::ServerStreamingHandler
::grpc_impl::internal::ServerStreamingHandler< ServiceType, RequestType, ResponseType > ServerStreamingHandler
Definition: method_handler.h:46
grpc_call
struct grpc_call grpc_call
A Call represents an RPC.
Definition: grpc_types.h:70
grpc_impl::internal::ResourceExhaustedHandler
ErrorMethodHandler<::grpc::StatusCode::RESOURCE_EXHAUSTED > ResourceExhaustedHandler
Definition: method_handler_impl.h:386
grpc_byte_buffer
Definition: grpc_types.h:40
grpc::ByteBuffer
A sequence of bytes.
Definition: byte_buffer.h:67
grpc_impl::internal::RpcMethodHandler::Deserialize
void * Deserialize(grpc_call *call, grpc_byte_buffer *req, ::grpc::Status *status, void **) final
Definition: method_handler_impl.h:94
grpc_impl::ServerWriter
Synchronous (blocking) server-side API for doing server-streaming RPCs,...
Definition: completion_queue_impl.h:61
grpc_impl::internal::ServerStreamingHandler::RunHandler
void RunHandler(const HandlerParameter &param) final
Definition: method_handler_impl.h:184
grpc::internal::CallOpServerSendStatus::ServerSendStatus
void ServerSendStatus(std::multimap< grpc::string, grpc::string > *trailing_metadata, const Status &status)
Definition: call_op_set.h:656
grpc_impl::ServerSplitStreamer
A class to represent a flow-controlled server-side streaming call.
Definition: sync_stream_impl.h:890
grpc::SerializationTraits
Defines how to serialize and deserialize some type.
Definition: serialization_traits.h:58
grpc::internal::MethodHandler
Base class for running an RPC handler.
Definition: rpc_service_method.h:41
grpc_impl::internal::RpcMethodHandler::RunHandler
void RunHandler(const HandlerParameter &param) final
Definition: method_handler_impl.h:63
grpc::protobuf::util::Status
::google::protobuf::util::Status Status
Definition: config_protobuf.h:90
grpc_impl::internal::ServerStreamingHandler::ServerStreamingHandler
ServerStreamingHandler(std::function<::grpc::Status(ServiceType *, ::grpc_impl::ServerContext *, const RequestType *, ::grpc_impl::ServerWriter< ResponseType > *)> func, ServiceType *service)
Definition: method_handler_impl.h:176
grpc_impl::internal::UnknownMethodHandler
ErrorMethodHandler<::grpc::StatusCode::UNIMPLEMENTED > UnknownMethodHandler
Definition: method_handler_impl.h:384
grpc_impl::internal::ClientStreamingHandler::RunHandler
void RunHandler(const HandlerParameter &param) final
Definition: method_handler_impl.h:132
grpc_impl::internal::ErrorMethodHandler::FillOps
static void FillOps(::grpc_impl::ServerContextBase *context, T *ops)
Definition: method_handler_impl.h:351
grpc_impl::ServerContextBase::compression_level
grpc_compression_level compression_level() const
Return the compression algorithm to be used by the server call.
Definition: server_context_impl.h:215
grpc_impl::internal::TemplatedBidiStreamingHandler
A wrapper class of an application provided bidi-streaming handler.
Definition: completion_queue_impl.h:74
grpc_impl::internal::TemplatedBidiStreamingHandler::RunHandler
void RunHandler(const HandlerParameter &param) final
Definition: method_handler_impl.h:257
std
Definition: async_unary_call_impl.h:301
grpc_impl::internal::TemplatedBidiStreamingHandler::TemplatedBidiStreamingHandler
TemplatedBidiStreamingHandler(std::function<::grpc::Status(::grpc_impl::ServerContext *, Streamer *)> func)
Definition: method_handler_impl.h:252
grpc_impl::ServerReader
Synchronous (blocking) server-side API for doing client-streaming RPCs, where the incoming message st...
Definition: completion_queue_impl.h:59
grpc_impl::internal::ErrorMethodHandler::Deserialize
void * Deserialize(grpc_call *, grpc_byte_buffer *req, ::grpc::Status *, void **) final
Definition: method_handler_impl.h:373
grpc_impl
An Alarm posts the user-provided tag to its associated completion queue or invokes the user-provided ...
Definition: alarm_impl.h:33
grpc_impl::internal::ClientStreamingHandler::ClientStreamingHandler
ClientStreamingHandler(std::function<::grpc::Status(ServiceType *, ::grpc_impl::ServerContext *, ::grpc_impl::ServerReader< RequestType > *, ResponseType *)> func, ServiceType *service)
Definition: method_handler_impl.h:124
grpc_impl::internal::ServerStreamingHandler::Deserialize
void * Deserialize(grpc_call *call, grpc_byte_buffer *req, ::grpc::Status *status, void **) final
Definition: method_handler_impl.h:217
grpc_impl::internal::CatchingFunctionHandler
::grpc::Status CatchingFunctionHandler(Callable &&handler)
Definition: method_handler_impl.h:39
grpc::g_core_codegen_interface
CoreCodegenInterface * g_core_codegen_interface
Definition: completion_queue_impl.h:93
GPR_CODEGEN_ASSERT
#define GPR_CODEGEN_ASSERT(x)
Codegen specific version of GPR_ASSERT.
Definition: core_codegen_interface.h:146
grpc::UNKNOWN
Unknown error.
Definition: status_code_enum.h:35
grpc::ByteBuffer::Release
void Release()
Forget underlying byte buffer without destroying it. Use this only for un-owned byte buffers.
Definition: byte_buffer.h:146
grpc_impl::internal::ErrorMethodHandler::RunHandler
void RunHandler(const HandlerParameter &param) final
Definition: method_handler_impl.h:364
grpc::internal::ErrorMethodHandler
::grpc_impl::internal::ErrorMethodHandler< code > ErrorMethodHandler
Definition: method_handler.h:62
grpc::internal::ClientStreamingHandler
::grpc_impl::internal::ClientStreamingHandler< ServiceType, RequestType, ResponseType > ClientStreamingHandler
Definition: method_handler.h:41
grpc_impl::internal::StreamedUnaryHandler
Definition: method_handler_impl.h:316
grpc_impl::ServerContextBase::compression_level_set
bool compression_level_set() const
Return a bool indicating whether the compression level for this call has been set (either implicitly ...
Definition: server_context_impl.h:230
grpc::INTERNAL
Internal errors.
Definition: status_code_enum.h:119