GRPC C++  1.66.0
method_handler.h
Go to the documentation of this file.
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #ifndef GRPCPP_SUPPORT_METHOD_HANDLER_H
20 #define GRPCPP_SUPPORT_METHOD_HANDLER_H
21 
22 #include "absl/log/absl_check.h"
23 
24 #include <grpc/byte_buffer.h>
25 #include <grpc/support/log.h>
29 
30 namespace grpc {
31 
32 namespace internal {
33 
34 // Invoke the method handler, fill in the status, and
35 // return whether or not we finished safely (without an exception).
36 // Note that exception handling is 0-cost in most compiler/library
37 // implementations (except when an exception is actually thrown),
38 // so this process doesn't require additional overhead in the common case.
39 // Additionally, we don't need to return if we caught an exception or not;
40 // the handling is the same in either case.
41 template <class Callable>
43 #if GRPC_ALLOW_EXCEPTIONS
44  try {
45  return handler();
46  } catch (...) {
48  "Unexpected error in RPC handling");
49  }
50 #else // GRPC_ALLOW_EXCEPTIONS
51  return handler();
52 #endif // GRPC_ALLOW_EXCEPTIONS
53 }
54 
58 
59 template <class ResponseType>
61  ResponseType* rsp, grpc::Status& status) {
62  ABSL_CHECK(!param.server_context->sent_initial_metadata_);
66  ops;
67  ops.SendInitialMetadata(&param.server_context->initial_metadata_,
68  param.server_context->initial_metadata_flags());
70  ops.set_compression_level(param.server_context->compression_level());
71  }
72  if (status.ok()) {
73  status = ops.SendMessagePtr(rsp);
74  }
75  ops.ServerSendStatus(&param.server_context->trailing_metadata_, status);
76  param.call->PerformOps(&ops);
77  param.call->cq()->Pluck(&ops);
78 }
79 
81 
82 template <class RequestType>
84  RequestType* request) {
85  grpc::ByteBuffer buf;
86  buf.set_buffer(req);
88  &buf, static_cast<RequestType*>(request));
89  buf.Release();
90  if (status->ok()) {
91  return request;
92  }
93  request->~RequestType();
94  return nullptr;
95 }
96 
98 template <class ServiceType, class RequestType, class ResponseType,
99  class BaseRequestType = RequestType,
100  class BaseResponseType = ResponseType>
101 class RpcMethodHandler : public grpc::internal::MethodHandler {
102  public:
104  std::function<grpc::Status(ServiceType*, grpc::ServerContext*,
105  const RequestType*, ResponseType*)>
106  func,
107  ServiceType* service)
108  : func_(func), service_(service) {}
109 
110  void RunHandler(const HandlerParameter& param) final {
111  ResponseType rsp;
112  grpc::Status status = param.status;
113  if (status.ok()) {
114  status = CatchingFunctionHandler([this, &param, &rsp] {
115  return func_(service_,
116  static_cast<grpc::ServerContext*>(param.server_context),
117  static_cast<RequestType*>(param.request), &rsp);
118  });
119  static_cast<RequestType*>(param.request)->~RequestType();
120  }
121  UnaryRunHandlerHelper(param, static_cast<BaseResponseType*>(&rsp), status);
122  }
123 
125  grpc::Status* status, void** /*handler_data*/) final {
126  auto* request =
127  new (grpc_call_arena_alloc(call, sizeof(RequestType))) RequestType;
128  return UnaryDeserializeHelper(req, status,
129  static_cast<BaseRequestType*>(request));
130  }
131 
132  private:
134  std::function<grpc::Status(ServiceType*, grpc::ServerContext*,
135  const RequestType*, ResponseType*)>
136  func_;
137  // The class the above handler function lives in.
138  ServiceType* service_;
139 };
140 
142 template <class ServiceType, class RequestType, class ResponseType>
143 class ClientStreamingHandler : public grpc::internal::MethodHandler {
144  public:
146  std::function<grpc::Status(ServiceType*, grpc::ServerContext*,
147  ServerReader<RequestType>*, ResponseType*)>
148  func,
149  ServiceType* service)
150  : func_(func), service_(service) {}
151 
152  void RunHandler(const HandlerParameter& param) final {
154  param.call, static_cast<grpc::ServerContext*>(param.server_context));
155  ResponseType rsp;
156  grpc::Status status =
157  CatchingFunctionHandler([this, &param, &reader, &rsp] {
158  return func_(service_,
159  static_cast<grpc::ServerContext*>(param.server_context),
160  &reader, &rsp);
161  });
162 
166  ops;
167  if (!param.server_context->sent_initial_metadata_) {
168  ops.SendInitialMetadata(&param.server_context->initial_metadata_,
169  param.server_context->initial_metadata_flags());
170  if (param.server_context->compression_level_set()) {
171  ops.set_compression_level(param.server_context->compression_level());
172  }
173  }
174  if (status.ok()) {
175  status = ops.SendMessagePtr(&rsp);
176  }
177  ops.ServerSendStatus(&param.server_context->trailing_metadata_, status);
178  param.call->PerformOps(&ops);
179  param.call->cq()->Pluck(&ops);
180  }
181 
182  private:
183  std::function<grpc::Status(ServiceType*, grpc::ServerContext*,
184  ServerReader<RequestType>*, ResponseType*)>
185  func_;
186  ServiceType* service_;
187 };
188 
190 template <class ServiceType, class RequestType, class ResponseType>
191 class ServerStreamingHandler : public grpc::internal::MethodHandler {
192  public:
194  ServiceType*, grpc::ServerContext*,
195  const RequestType*, ServerWriter<ResponseType>*)>
196  func,
197  ServiceType* service)
198  : func_(func), service_(service) {}
199 
200  void RunHandler(const HandlerParameter& param) final {
201  grpc::Status status = param.status;
202  if (status.ok()) {
204  param.call, static_cast<grpc::ServerContext*>(param.server_context));
205  status = CatchingFunctionHandler([this, &param, &writer] {
206  return func_(service_,
207  static_cast<grpc::ServerContext*>(param.server_context),
208  static_cast<RequestType*>(param.request), &writer);
209  });
210  static_cast<RequestType*>(param.request)->~RequestType();
211  }
212 
215  ops;
216  if (!param.server_context->sent_initial_metadata_) {
217  ops.SendInitialMetadata(&param.server_context->initial_metadata_,
218  param.server_context->initial_metadata_flags());
219  if (param.server_context->compression_level_set()) {
220  ops.set_compression_level(param.server_context->compression_level());
221  }
222  }
223  ops.ServerSendStatus(&param.server_context->trailing_metadata_, status);
224  param.call->PerformOps(&ops);
225  if (param.server_context->has_pending_ops_) {
226  param.call->cq()->Pluck(&param.server_context->pending_ops_);
227  }
228  param.call->cq()->Pluck(&ops);
229  }
230 
232  grpc::Status* status, void** /*handler_data*/) final {
233  grpc::ByteBuffer buf;
234  buf.set_buffer(req);
235  auto* request =
236  new (grpc_call_arena_alloc(call, sizeof(RequestType))) RequestType();
237  *status =
239  buf.Release();
240  if (status->ok()) {
241  return request;
242  }
243  request->~RequestType();
244  return nullptr;
245  }
246 
247  private:
248  std::function<grpc::Status(ServiceType*, grpc::ServerContext*,
249  const RequestType*, ServerWriter<ResponseType>*)>
250  func_;
251  ServiceType* service_;
252 };
253 
261 template <class Streamer, bool WriteNeeded>
262 class TemplatedBidiStreamingHandler : public grpc::internal::MethodHandler {
263  public:
265  std::function<grpc::Status(grpc::ServerContext*, Streamer*)> func)
266  : func_(func), write_needed_(WriteNeeded) {}
267 
268  void RunHandler(const HandlerParameter& param) final {
269  Streamer stream(param.call,
270  static_cast<grpc::ServerContext*>(param.server_context));
271  grpc::Status status = CatchingFunctionHandler([this, &param, &stream] {
272  return func_(static_cast<grpc::ServerContext*>(param.server_context),
273  &stream);
274  });
275 
278  ops;
279  if (!param.server_context->sent_initial_metadata_) {
280  ops.SendInitialMetadata(&param.server_context->initial_metadata_,
281  param.server_context->initial_metadata_flags());
282  if (param.server_context->compression_level_set()) {
283  ops.set_compression_level(param.server_context->compression_level());
284  }
285  if (write_needed_ && status.ok()) {
286  // If we needed a write but never did one, we need to mark the
287  // status as a fail
289  "Service did not provide response message");
290  }
291  }
292  ops.ServerSendStatus(&param.server_context->trailing_metadata_, status);
293  param.call->PerformOps(&ops);
294  if (param.server_context->has_pending_ops_) {
295  param.call->cq()->Pluck(&param.server_context->pending_ops_);
296  }
297  param.call->cq()->Pluck(&ops);
298  }
299 
300  private:
301  std::function<grpc::Status(grpc::ServerContext*, Streamer*)> func_;
302  const bool write_needed_;
303 };
304 
305 template <class ServiceType, class RequestType, class ResponseType>
306 class BidiStreamingHandler
307  : public TemplatedBidiStreamingHandler<
308  ServerReaderWriter<ResponseType, RequestType>, false> {
309  public:
311  ServiceType*, grpc::ServerContext*,
313  func,
314  ServiceType* service)
315  // TODO(vjpai): When gRPC supports C++14, move-capture func in the below
317  ServerReaderWriter<ResponseType, RequestType>, false>(
318  [func, service](
319  grpc::ServerContext* ctx,
320  ServerReaderWriter<ResponseType, RequestType>* streamer) {
321  return func(service, ctx, streamer);
322  }) {}
323 };
324 
325 template <class RequestType, class ResponseType>
328  ServerUnaryStreamer<RequestType, ResponseType>, true> {
329  public:
331  std::function<
334  func)
336  ServerUnaryStreamer<RequestType, ResponseType>, true>(
337  std::move(func)) {}
338 };
339 
340 template <class RequestType, class ResponseType>
343  ServerSplitStreamer<RequestType, ResponseType>, false> {
344  public:
346  std::function<
349  func)
351  ServerSplitStreamer<RequestType, ResponseType>, false>(
352  std::move(func)) {}
353 };
354 
357 template <grpc::StatusCode code>
358 class ErrorMethodHandler : public grpc::internal::MethodHandler {
359  public:
360  explicit ErrorMethodHandler(const std::string& message) : message_(message) {}
361 
362  template <class T>
363  static void FillOps(grpc::ServerContextBase* context,
364  const std::string& message, T* ops) {
365  grpc::Status status(code, message);
366  if (!context->sent_initial_metadata_) {
367  ops->SendInitialMetadata(&context->initial_metadata_,
368  context->initial_metadata_flags());
369  if (context->compression_level_set()) {
370  ops->set_compression_level(context->compression_level());
371  }
372  context->sent_initial_metadata_ = true;
373  }
374  ops->ServerSendStatus(&context->trailing_metadata_, status);
375  }
376 
377  void RunHandler(const HandlerParameter& param) final {
380  ops;
381  FillOps(param.server_context, message_, &ops);
382  param.call->PerformOps(&ops);
383  param.call->cq()->Pluck(&ops);
384  }
385 
386  void* Deserialize(grpc_call* /*call*/, grpc_byte_buffer* req,
387  grpc::Status* /*status*/, void** /*handler_data*/) final {
388  // We have to destroy any request payload
389  if (req != nullptr) {
391  }
392  return nullptr;
393  }
394 
395  private:
396  const std::string message_;
397 };
398 
399 typedef ErrorMethodHandler<grpc::StatusCode::UNIMPLEMENTED>
403 
404 } // namespace internal
405 } // namespace grpc
406 
407 #endif // GRPCPP_SUPPORT_METHOD_HANDLER_H
grpc::internal::Call::cq
grpc::CompletionQueue * cq() const
Definition: call.h:71
grpc::internal::ClientStreamingHandler::RunHandler
void RunHandler(const HandlerParameter &param) final
Definition: method_handler.h:152
grpc::internal::ServerStreamingHandler::ServerStreamingHandler
ServerStreamingHandler(std::function< grpc::Status(ServiceType *, grpc::ServerContext *, const RequestType *, ServerWriter< ResponseType > *)> func, ServiceType *service)
Definition: method_handler.h:193
grpc_call_arena_alloc
GRPCAPI void * grpc_call_arena_alloc(grpc_call *call, size_t size)
Allocate memory in the grpc_call arena: this memory is automatically discarded at call completion.
grpc::ServerContext
A ServerContext or CallbackServerContext allows the code implementing a service handler to:
Definition: server_context.h:578
grpc::internal::CallOpServerSendStatus
Definition: call_op_set.h:656
grpc
An Alarm posts the user-provided tag to its associated completion queue or invokes the user-provided ...
Definition: alarm.h:33
grpc::internal::CallOpSet
Primary implementation of CallOpSetInterface.
Definition: completion_queue.h:98
grpc::internal::CallOpSendMessage
Definition: call_op_set.h:288
grpc::internal::TemplatedBidiStreamingHandler::TemplatedBidiStreamingHandler
TemplatedBidiStreamingHandler(std::function< grpc::Status(grpc::ServerContext *, Streamer *)> func)
Definition: method_handler.h:264
grpc::internal::ErrorMethodHandler
General method handler class for errors that prevent real method use e.g., handle unknown method by r...
Definition: completion_queue.h:82
grpc::internal::MethodHandler::HandlerParameter
Definition: rpc_service_method.h:43
grpc::internal::ServerStreamingHandler::RunHandler
void RunHandler(const HandlerParameter &param) final
Definition: method_handler.h:200
grpc::ServerWriter
Synchronous (blocking) server-side API for doing server-streaming RPCs,...
Definition: completion_queue.h:63
grpc::internal::SplitServerStreamingHandler
Definition: method_handler.h:341
grpc::internal::ClientStreamingHandler::ClientStreamingHandler
ClientStreamingHandler(std::function< grpc::Status(ServiceType *, grpc::ServerContext *, ServerReader< RequestType > *, ResponseType *)> func, ServiceType *service)
Definition: method_handler.h:145
grpc_byte_buffer_destroy
GRPCAPI void grpc_byte_buffer_destroy(grpc_byte_buffer *bb)
Destroys byte_buffer deallocating all its memory.
grpc::internal::TemplatedBidiStreamingHandler::RunHandler
void RunHandler(const HandlerParameter &param) final
Definition: method_handler.h:268
grpc::internal::RpcMethodHandler::RpcMethodHandler
RpcMethodHandler(std::function< grpc::Status(ServiceType *, grpc::ServerContext *, const RequestType *, ResponseType *)> func, ServiceType *service)
Definition: method_handler.h:103
grpc::ServerContextBase
Base class of ServerContext.
Definition: server_context.h:124
rpc_service_method.h
grpc::internal::RpcMethodHandler::Deserialize
void * Deserialize(grpc_call *call, grpc_byte_buffer *req, grpc::Status *status, void **) final
Definition: method_handler.h:124
grpc::internal::CallOpSendInitialMetadata
Definition: call_op_set.h:218
grpc::internal::CallOpServerSendStatus::ServerSendStatus
void ServerSendStatus(std::multimap< std::string, std::string > *trailing_metadata, const Status &status)
Definition: call_op_set.h:660
grpc::internal::MethodHandler::HandlerParameter::call
Call *const call
Definition: rpc_service_method.h:64
grpc::Status::ok
bool ok() const
Is the status OK?
Definition: status.h:125
grpc::ServerSplitStreamer
A class to represent a flow-controlled server-side streaming call.
Definition: sync_stream.h:894
grpc::Status
Did it work? If it didn't, why?
Definition: status.h:34
grpc::internal::ResourceExhaustedHandler
ErrorMethodHandler< grpc::StatusCode::RESOURCE_EXHAUSTED > ResourceExhaustedHandler
Definition: method_handler.h:402
log.h
grpc::ServerReader
Synchronous (blocking) server-side API for doing client-streaming RPCs, where the incoming message st...
Definition: completion_queue.h:61
grpc::internal::UnaryDeserializeHelper
void * UnaryDeserializeHelper(grpc_byte_buffer *, grpc::Status *, RequestType *)
A helper function with reduced templating to do deserializing.
Definition: method_handler.h:83
grpc_call
struct grpc_call grpc_call
A Call represents an RPC.
Definition: grpc_types.h:69
grpc_byte_buffer
Definition: grpc_types.h:42
grpc::ByteBuffer
A sequence of bytes.
Definition: byte_buffer.h:60
grpc::internal::BidiStreamingHandler::BidiStreamingHandler
BidiStreamingHandler(std::function< grpc::Status(ServiceType *, grpc::ServerContext *, ServerReaderWriter< ResponseType, RequestType > *)> func, ServiceType *service)
Definition: method_handler.h:310
grpc::ServerContextBase::compression_level_set
bool compression_level_set() const
Return a bool indicating whether the compression level for this call has been set (either implicitly ...
Definition: server_context.h:251
grpc::SerializationTraits
Defines how to serialize and deserialize some type.
Definition: serialization_traits.h:58
grpc::internal::MethodHandler
Base class for running an RPC handler.
Definition: rpc_service_method.h:40
byte_buffer.h
grpc::ServerReaderWriter
Synchronous (blocking) server-side API for a bidirectional streaming call, where the incoming message...
Definition: sync_stream.h:787
grpc::internal::MethodHandler::HandlerParameter::server_context
grpc::ServerContextBase *const server_context
Definition: rpc_service_method.h:65
grpc::internal::TemplatedBidiStreamingHandler
A wrapper class of an application provided bidi-streaming handler.
Definition: completion_queue.h:80
sync_stream.h
byte_buffer.h
grpc::internal::Call::PerformOps
void PerformOps(CallOpSetInterface *ops)
Definition: call.h:66
grpc::internal::ErrorMethodHandler::ErrorMethodHandler
ErrorMethodHandler(const std::string &message)
Definition: method_handler.h:360
std
Definition: async_unary_call.h:407
grpc::internal::ErrorMethodHandler::RunHandler
void RunHandler(const HandlerParameter &param) final
Definition: method_handler.h:377
grpc::internal::StreamedUnaryHandler::StreamedUnaryHandler
StreamedUnaryHandler(std::function< grpc::Status(grpc::ServerContext *, ServerUnaryStreamer< RequestType, ResponseType > *)> func)
Definition: method_handler.h:330
grpc::internal::ErrorMethodHandler::FillOps
static void FillOps(grpc::ServerContextBase *context, const std::string &message, T *ops)
Definition: method_handler.h:363
grpc::internal::CatchingFunctionHandler
::grpc::Status CatchingFunctionHandler(Callable &&handler)
Definition: method_handler.h:42
grpc::ServerUnaryStreamer
A class to represent a flow-controlled unary call.
Definition: sync_stream.h:828
grpc::internal::RpcMethodHandler::RunHandler
void RunHandler(const HandlerParameter &param) final
Definition: method_handler.h:110
grpc::UNKNOWN
@ UNKNOWN
Unknown error.
Definition: status_code_enum.h:37
grpc::internal::UnknownMethodHandler
ErrorMethodHandler< grpc::StatusCode::UNIMPLEMENTED > UnknownMethodHandler
Definition: method_handler.h:400
grpc::ByteBuffer::Release
void Release()
Forget underlying byte buffer without destroying it. Use this only for un-owned byte buffers.
Definition: byte_buffer.h:144
grpc::internal::StreamedUnaryHandler
Definition: method_handler.h:326
grpc::internal::UnaryRunHandlerHelper
void UnaryRunHandlerHelper(const grpc::internal::MethodHandler::HandlerParameter &, ResponseType *, grpc::Status &)
A helper function with reduced templating to do the common work needed to actually send the server re...
Definition: method_handler.h:60
grpc::protobuf::util::Status
::absl::Status Status
Definition: config_protobuf.h:106
grpc::internal::SplitServerStreamingHandler::SplitServerStreamingHandler
SplitServerStreamingHandler(std::function< grpc::Status(grpc::ServerContext *, ServerSplitStreamer< RequestType, ResponseType > *)> func)
Definition: method_handler.h:345
grpc::internal::ErrorMethodHandler::Deserialize
void * Deserialize(grpc_call *, grpc_byte_buffer *req, grpc::Status *, void **) final
Definition: method_handler.h:386
grpc::internal::ServerStreamingHandler::Deserialize
void * Deserialize(grpc_call *call, grpc_byte_buffer *req, grpc::Status *status, void **) final
Definition: method_handler.h:231
grpc::INTERNAL
@ INTERNAL
Internal errors.
Definition: status_code_enum.h:121
grpc::ServerContextBase::compression_level
grpc_compression_level compression_level() const
Return the compression algorithm to be used by the server call.
Definition: server_context.h:236