GRPC C++  1.62.0
method_handler.h
Go to the documentation of this file.
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #ifndef GRPCPP_SUPPORT_METHOD_HANDLER_H
20 #define GRPCPP_SUPPORT_METHOD_HANDLER_H
21 
22 #include <grpc/byte_buffer.h>
23 #include <grpc/support/log.h>
27 
28 namespace grpc {
29 
30 namespace internal {
31 
32 // Invoke the method handler, fill in the status, and
33 // return whether or not we finished safely (without an exception).
34 // Note that exception handling is 0-cost in most compiler/library
35 // implementations (except when an exception is actually thrown),
36 // so this process doesn't require additional overhead in the common case.
37 // Additionally, we don't need to return if we caught an exception or not;
38 // the handling is the same in either case.
// NOTE(review): listing line 40, which carries the function signature, is
// missing from this extract. The symbol index at the end of this page gives
// it as `::grpc::Status CatchingFunctionHandler(Callable &&handler)`.
39 template <class Callable>
41 #if GRPC_ALLOW_EXCEPTIONS
// Exceptions enabled: run the handler and convert anything it throws into an
// error status instead of letting the exception propagate.
42  try {
43  return handler();
44  } catch (...) {
// NOTE(review): listing line 45 is missing here; presumably it opens the
// `return grpc::Status(...)` expression whose message argument follows --
// confirm against the upstream header.
46  "Unexpected error in RPC handling");
47  }
48 #else // GRPC_ALLOW_EXCEPTIONS
// Exceptions disabled: the handler's result is returned directly.
49  return handler();
50 #endif // GRPC_ALLOW_EXCEPTIONS
51 }
52 
56 
// Helper with reduced templating that does the common work of sending the
// server response for a unary call: it queues initial metadata, the response
// message (only when `status` is OK) and the final status, then runs the ops
// and plucks their completion from the call's completion queue.
//
// `status` is taken by non-const reference because it is overwritten with the
// result of SendMessagePtr() when it was OK on entry.
//
// NOTE(review): listing line 58 (function name and first parameter) is missing
// from this extract; the symbol index gives the declaration as
// `void UnaryRunHandlerHelper(const grpc::internal::MethodHandler::
// HandlerParameter &, ResponseType *, grpc::Status &)`.
57 template <class ResponseType>
59  ResponseType* rsp, grpc::Status& status) {
// Aborts the process if initial metadata has already been sent.
60  GPR_ASSERT(!param.server_context->sent_initial_metadata_);
// NOTE(review): listing lines 61-63 (the declared type of `ops`) are missing;
// presumably a CallOpSet over the send-initial-metadata, send-message and
// server-send-status ops used below -- confirm against the upstream header.
64  ops;
65  ops.SendInitialMetadata(&param.server_context->initial_metadata_,
66  param.server_context->initial_metadata_flags());
// NOTE(review): listing line 67 is missing; presumably the
// `if (param.server_context->compression_level_set()) {` guard that the other
// handlers in this file use, closed by the brace on line 69.
68  ops.set_compression_level(param.server_context->compression_level());
69  }
// Only attempt to send the response when the handler succeeded; a failure to
// send replaces the status that is reported to the client.
70  if (status.ok()) {
71  status = ops.SendMessagePtr(rsp);
72  }
73  ops.ServerSendStatus(&param.server_context->trailing_metadata_, status);
74  param.call->PerformOps(&ops);
75  param.call->cq()->Pluck(&ops);
76 }
77 
79 
// Helper with reduced templating that deserializes a request payload into an
// already-constructed `request` object.
//
// Returns `request` when `*status` is OK afterwards; otherwise runs the
// request's destructor explicitly and returns nullptr.
//
// NOTE(review): listing line 81 (function name and leading parameters) is
// missing from this extract; the symbol index gives the declaration as
// `void * UnaryDeserializeHelper(grpc_byte_buffer *, grpc::Status *,
// RequestType *)`.
80 template <class RequestType>
82  RequestType* request) {
// Wrap the raw byte buffer without taking ownership (see Release() below).
83  grpc::ByteBuffer buf;
84  buf.set_buffer(req);
// NOTE(review): listing line 85 is missing; presumably it assigns `*status`
// from a SerializationTraits<RequestType> deserialize call whose arguments
// follow -- confirm against the upstream header.
86  &buf, static_cast<RequestType*>(request));
// Forget the wrapped buffer so that `buf`'s destructor does not destroy it.
87  buf.Release();
88  if (status->ok()) {
89  return request;
90  }
// Deserialization failed: only the destructor is run here; the storage itself
// is not freed by this function.
91  request->~RequestType();
92  return nullptr;
93 }
94 
// Method handler for a unary RPC: wraps an application-provided function
// taking a request and filling in a response.
//
// BaseRequestType / BaseResponseType default to the request/response types and
// are the types handed to the reduced-templating helper functions.
96 template <class ServiceType, class RequestType, class ResponseType,
97  class BaseRequestType = RequestType,
98  class BaseResponseType = ResponseType>
99 class RpcMethodHandler : public grpc::internal::MethodHandler {
100  public:
// Stores the handler function and the service instance it is invoked on.
// NOTE(review): listing line 101 (the constructor name) is missing from this
// extract; the symbol index gives it as `RpcMethodHandler(std::function<...>
// func, ServiceType *service)`.
102  std::function<grpc::Status(ServiceType*, grpc::ServerContext*,
103  const RequestType*, ResponseType*)>
104  func,
105  ServiceType* service)
106  : func_(func), service_(service) {}
107 
// Runs the handler (only when the incoming status is OK) and sends the
// response and status through UnaryRunHandlerHelper.
108  void RunHandler(const HandlerParameter& param) final {
109  ResponseType rsp;
110  grpc::Status status = param.status;
111  if (status.ok()) {
112  status = CatchingFunctionHandler([this, &param, &rsp] {
113  return func_(service_,
114  static_cast<grpc::ServerContext*>(param.server_context),
115  static_cast<RequestType*>(param.request), &rsp);
116  });
// Only the destructor is run on the request; presumably it is the object that
// Deserialize() placement-constructed in call-arena memory -- TODO confirm.
117  static_cast<RequestType*>(param.request)->~RequestType();
118  }
119  UnaryRunHandlerHelper(param, static_cast<BaseResponseType*>(&rsp), status);
120  }
121 
// Placement-constructs a RequestType in memory obtained from
// grpc_call_arena_alloc() and deserializes `req` into it via
// UnaryDeserializeHelper.
// NOTE(review): listing line 122 is missing; the symbol index gives the
// signature as `void * Deserialize(grpc_call *call, grpc_byte_buffer *req,
// grpc::Status *status, void **) final`.
123  grpc::Status* status, void** /*handler_data*/) final {
124  auto* request =
125  new (grpc_call_arena_alloc(call, sizeof(RequestType))) RequestType;
126  return UnaryDeserializeHelper(req, status,
127  static_cast<BaseRequestType*>(request));
128  }
129 
130  private:
// The handler function invoked by RunHandler().
132  std::function<grpc::Status(ServiceType*, grpc::ServerContext*,
133  const RequestType*, ResponseType*)>
134  func_;
135  // The class the above handler function lives in.
136  ServiceType* service_;
137 };
138 
// Method handler for a client-streaming RPC: the application function reads
// requests through a ServerReader and fills in a single response.
140 template <class ServiceType, class RequestType, class ResponseType>
141 class ClientStreamingHandler : public grpc::internal::MethodHandler {
142  public:
// Stores the handler function and the service instance it is invoked on.
// NOTE(review): listing line 143 (the constructor name) is missing from this
// extract; the symbol index gives it as `ClientStreamingHandler(
// std::function<...> func, ServiceType *service)`.
144  std::function<grpc::Status(ServiceType*, grpc::ServerContext*,
145  ServerReader<RequestType>*, ResponseType*)>
146  func,
147  ServiceType* service)
148  : func_(func), service_(service) {}
149 
// Runs the handler, then sends initial metadata (if not already sent), the
// response (only when the handler returned OK) and the final status.
150  void RunHandler(const HandlerParameter& param) final {
// NOTE(review): listing line 151 is missing; presumably it declares the
// ServerReader<RequestType> named `reader` whose constructor arguments follow.
152  param.call, static_cast<grpc::ServerContext*>(param.server_context));
153  ResponseType rsp;
154  grpc::Status status =
155  CatchingFunctionHandler([this, &param, &reader, &rsp] {
156  return func_(service_,
157  static_cast<grpc::ServerContext*>(param.server_context),
158  &reader, &rsp);
159  });
160 
// NOTE(review): listing lines 161-163 (the declared type of `ops`) are
// missing; presumably a CallOpSet over the three send ops used below.
164  ops;
165  if (!param.server_context->sent_initial_metadata_) {
166  ops.SendInitialMetadata(&param.server_context->initial_metadata_,
167  param.server_context->initial_metadata_flags());
168  if (param.server_context->compression_level_set()) {
169  ops.set_compression_level(param.server_context->compression_level());
170  }
171  }
// A failure to send the response replaces the status reported to the client.
172  if (status.ok()) {
173  status = ops.SendMessagePtr(&rsp);
174  }
175  ops.ServerSendStatus(&param.server_context->trailing_metadata_, status);
176  param.call->PerformOps(&ops);
177  param.call->cq()->Pluck(&ops);
178  }
179 
180  private:
// The handler function invoked by RunHandler().
181  std::function<grpc::Status(ServiceType*, grpc::ServerContext*,
182  ServerReader<RequestType>*, ResponseType*)>
183  func_;
// The service instance passed as the first argument to func_.
184  ServiceType* service_;
185 };
186 
// Method handler for a server-streaming RPC: the application function receives
// one request and writes responses through a ServerWriter.
188 template <class ServiceType, class RequestType, class ResponseType>
189 class ServerStreamingHandler : public grpc::internal::MethodHandler {
190  public:
// Stores the handler function and the service instance it is invoked on.
// NOTE(review): listing line 191 (constructor name and the start of the
// std::function parameter) is missing from this extract; the symbol index
// gives it as `ServerStreamingHandler(std::function<grpc::Status(ServiceType *,
// grpc::ServerContext *, const RequestType *, ServerWriter<ResponseType> *)>
// func, ServiceType *service)`.
192  ServiceType*, grpc::ServerContext*,
193  const RequestType*, ServerWriter<ResponseType>*)>
194  func,
195  ServiceType* service)
196  : func_(func), service_(service) {}
197 
// Runs the handler (only when the incoming status is OK), then sends initial
// metadata if it has not been sent yet, followed by the final status.
198  void RunHandler(const HandlerParameter& param) final {
199  grpc::Status status = param.status;
200  if (status.ok()) {
// NOTE(review): listing line 201 is missing; presumably it declares the
// ServerWriter<ResponseType> named `writer` whose constructor arguments follow.
202  param.call, static_cast<grpc::ServerContext*>(param.server_context));
203  status = CatchingFunctionHandler([this, &param, &writer] {
204  return func_(service_,
205  static_cast<grpc::ServerContext*>(param.server_context),
206  static_cast<RequestType*>(param.request), &writer);
207  });
// Only the destructor is run on the request; presumably it is the object that
// Deserialize() placement-constructed in call-arena memory -- TODO confirm.
208  static_cast<RequestType*>(param.request)->~RequestType();
209  }
210 
// NOTE(review): listing lines 211-212 (the declared type of `ops`) are
// missing; presumably a CallOpSet over the two send ops used below.
213  ops;
214  if (!param.server_context->sent_initial_metadata_) {
215  ops.SendInitialMetadata(&param.server_context->initial_metadata_,
216  param.server_context->initial_metadata_flags());
217  if (param.server_context->compression_level_set()) {
218  ops.set_compression_level(param.server_context->compression_level());
219  }
220  }
221  ops.ServerSendStatus(&param.server_context->trailing_metadata_, status);
222  param.call->PerformOps(&ops);
// If the context still has pending ops, pluck those before the status ops.
223  if (param.server_context->has_pending_ops_) {
224  param.call->cq()->Pluck(&param.server_context->pending_ops_);
225  }
226  param.call->cq()->Pluck(&ops);
227  }
228 
// Placement-constructs a RequestType in memory obtained from
// grpc_call_arena_alloc() and deserializes `req` into it. Returns the request
// on success; on failure runs its destructor and returns nullptr.
// NOTE(review): listing line 229 is missing; the symbol index gives the
// signature as `void * Deserialize(grpc_call *call, grpc_byte_buffer *req,
// grpc::Status *status, void **) final`.
230  grpc::Status* status, void** /*handler_data*/) final {
231  grpc::ByteBuffer buf;
232  buf.set_buffer(req);
233  auto* request =
234  new (grpc_call_arena_alloc(call, sizeof(RequestType))) RequestType();
235  *status =
// NOTE(review): listing line 236 (the right-hand side of this assignment) is
// missing; presumably a SerializationTraits<RequestType> deserialize call on
// `buf` and `request` -- confirm against the upstream header.
// Forget the wrapped buffer so that `buf`'s destructor does not destroy it.
237  buf.Release();
238  if (status->ok()) {
239  return request;
240  }
241  request->~RequestType();
242  return nullptr;
243  }
244 
245  private:
// The handler function invoked by RunHandler().
246  std::function<grpc::Status(ServiceType*, grpc::ServerContext*,
247  const RequestType*, ServerWriter<ResponseType>*)>
248  func_;
// The service instance passed as the first argument to func_.
249  ServiceType* service_;
250 };
251 
// A wrapper class of an application-provided bidi-streaming handler.
//
// `Streamer` is the stream object type handed to the handler; `WriteNeeded`
// is copied into write_needed_ and marks handlers that must produce a
// response before finishing.
259 template <class Streamer, bool WriteNeeded>
260 class TemplatedBidiStreamingHandler : public grpc::internal::MethodHandler {
261  public:
// NOTE(review): listing line 262 (the constructor name) is missing from this
// extract; the symbol index gives it as `TemplatedBidiStreamingHandler(
// std::function<grpc::Status(grpc::ServerContext *, Streamer *)> func)`.
263  std::function<grpc::Status(grpc::ServerContext*, Streamer*)> func)
264  : func_(func), write_needed_(WriteNeeded) {}
265 
// Runs the handler on a freshly constructed Streamer, then sends initial
// metadata (if not already sent) and the final status.
266  void RunHandler(const HandlerParameter& param) final {
267  Streamer stream(param.call,
268  static_cast<grpc::ServerContext*>(param.server_context));
269  grpc::Status status = CatchingFunctionHandler([this, &param, &stream] {
270  return func_(static_cast<grpc::ServerContext*>(param.server_context),
271  &stream);
272  });
273 
// NOTE(review): listing lines 274-275 (the declared type of `ops`) are
// missing; presumably a CallOpSet over the two send ops used below.
276  ops;
277  if (!param.server_context->sent_initial_metadata_) {
278  ops.SendInitialMetadata(&param.server_context->initial_metadata_,
279  param.server_context->initial_metadata_flags());
280  if (param.server_context->compression_level_set()) {
281  ops.set_compression_level(param.server_context->compression_level());
282  }
// Reaching this point means initial metadata was never sent, which the
// original comment below treats as "never did a write".
283  if (write_needed_ && status.ok()) {
284  // If we needed a write but never did one, we need to mark the
285  // status as a fail
// NOTE(review): listing line 286 is missing; presumably it begins the
// assignment of an error grpc::Status to `status`, whose message argument
// follows -- confirm the status code against the upstream header.
287  "Service did not provide response message");
288  }
289  }
290  ops.ServerSendStatus(&param.server_context->trailing_metadata_, status);
291  param.call->PerformOps(&ops);
// If the context still has pending ops, pluck those before the status ops.
292  if (param.server_context->has_pending_ops_) {
293  param.call->cq()->Pluck(&param.server_context->pending_ops_);
294  }
295  param.call->cq()->Pluck(&ops);
296  }
297 
298  private:
// The handler function invoked by RunHandler().
299  std::function<grpc::Status(grpc::ServerContext*, Streamer*)> func_;
// Copy of the WriteNeeded template argument.
300  const bool write_needed_;
301 };
302 
// Bidi-streaming handler bound to a service instance: adapts a function taking
// (service, context, streamer) to the (context, streamer) form expected by
// TemplatedBidiStreamingHandler, with WriteNeeded = false.
303 template <class ServiceType, class RequestType, class ResponseType>
304 class BidiStreamingHandler
305  : public TemplatedBidiStreamingHandler<
306  ServerReaderWriter<ResponseType, RequestType>, false> {
307  public:
// NOTE(review): listing lines 308, 310 and 314 are missing from this extract
// (constructor name, the last std::function argument, and the start of the
// base-class initializer). The symbol index gives the constructor as
// `BidiStreamingHandler(std::function<grpc::Status(ServiceType *,
// grpc::ServerContext *, ServerReaderWriter<ResponseType, RequestType> *)>
// func, ServiceType *service)`.
309  ServiceType*, grpc::ServerContext*,
311  func,
312  ServiceType* service)
313  // TODO(vjpai): When gRPC supports C++14, move-capture func in the below
315  ServerReaderWriter<ResponseType, RequestType>, false>(
// The lambda copies `func` and `service` and forwards to
// func(service, ctx, streamer).
316  [func, service](
317  grpc::ServerContext* ctx,
318  ServerReaderWriter<ResponseType, RequestType>* streamer) {
319  return func(service, ctx, streamer);
320  }) {}
321 };
322 
// Handler built on TemplatedBidiStreamingHandler with a ServerUnaryStreamer
// and WriteNeeded = true.
//
// NOTE(review): listing lines 324-325 (the class header), 328 (constructor
// name), 330-331 (std::function signature) and 333 (start of the base-class
// initializer) are missing from this extract. The symbol index names the class
// `grpc::internal::StreamedUnaryHandler` and gives the constructor as
// `StreamedUnaryHandler(std::function<grpc::Status(grpc::ServerContext *,
// ServerUnaryStreamer<RequestType, ResponseType> *)> func)`.
323 template <class RequestType, class ResponseType>
326  ServerUnaryStreamer<RequestType, ResponseType>, true> {
327  public:
329  std::function<
332  func)
// `func` is moved into the base-class constructor.
334  ServerUnaryStreamer<RequestType, ResponseType>, true>(
335  std::move(func)) {}
336 };
337 
// Handler built on TemplatedBidiStreamingHandler with a ServerSplitStreamer
// and WriteNeeded = false.
//
// NOTE(review): listing lines 339-340 (the class header), 343 (constructor
// name), 345-346 (std::function signature) and 348 (start of the base-class
// initializer) are missing from this extract. The symbol index names the class
// `grpc::internal::SplitServerStreamingHandler` and gives the constructor as
// `SplitServerStreamingHandler(std::function<grpc::Status(
// grpc::ServerContext *, ServerSplitStreamer<RequestType, ResponseType> *)>
// func)`.
338 template <class RequestType, class ResponseType>
341  ServerSplitStreamer<RequestType, ResponseType>, false> {
342  public:
344  std::function<
347  func)
// `func` is moved into the base-class constructor.
349  ServerSplitStreamer<RequestType, ResponseType>, false>(
350  std::move(func)) {}
351 };
352 
// General method handler for errors that prevent real method use: instead of
// invoking application code it replies with a fixed status built from the
// `code` template argument and the stored message.
355 template <grpc::StatusCode code>
356 class ErrorMethodHandler : public grpc::internal::MethodHandler {
357  public:
// `message` is copied and used as the error message of every reply.
358  explicit ErrorMethodHandler(const std::string& message) : message_(message) {}
359 
// Queues the error reply on `ops`: initial metadata (and compression level,
// when one is set) if it has not been sent yet, then a status of
// (code, message) together with the context's trailing metadata.
// Side effect: sets context->sent_initial_metadata_ to true when it queues
// initial metadata.
360  template <class T>
361  static void FillOps(grpc::ServerContextBase* context,
362  const std::string& message, T* ops) {
363  grpc::Status status(code, message);
364  if (!context->sent_initial_metadata_) {
365  ops->SendInitialMetadata(&context->initial_metadata_,
366  context->initial_metadata_flags());
367  if (context->compression_level_set()) {
368  ops->set_compression_level(context->compression_level());
369  }
370  context->sent_initial_metadata_ = true;
371  }
372  ops->ServerSendStatus(&context->trailing_metadata_, status);
373  }
374 
// Sends the error reply and waits for the ops to complete on the call's
// completion queue.
375  void RunHandler(const HandlerParameter& param) final {
// NOTE(review): listing lines 376-377 (the declared type of `ops`) are missing
// from this extract; presumably a CallOpSet over the two send ops that
// FillOps() uses.
378  ops;
379  FillOps(param.server_context, message_, &ops);
380  param.call->PerformOps(&ops);
381  param.call->cq()->Pluck(&ops);
382  }
383 
// Never produces a request object: always returns nullptr.
384  void* Deserialize(grpc_call* /*call*/, grpc_byte_buffer* req,
385  grpc::Status* /*status*/, void** /*handler_data*/) final {
386  // We have to destroy any request payload
387  if (req != nullptr) {
// NOTE(review): listing line 388 is missing; presumably the call that destroys
// `req` (the symbol index lists grpc_byte_buffer_destroy) -- TODO confirm.
389  }
390  return nullptr;
391  }
392 
393  private:
// Error message sent with every reply.
394  const std::string message_;
395 };
396 
// Error handler alias for the UNIMPLEMENTED status code.
// NOTE(review): listing lines 398-400 are missing from this extract. Per the
// symbol index, line 398 names this alias `UnknownMethodHandler`, and line 400
// defines `ResourceExhaustedHandler` as
// `ErrorMethodHandler< grpc::StatusCode::RESOURCE_EXHAUSTED >`.
397 typedef ErrorMethodHandler<grpc::StatusCode::UNIMPLEMENTED>
401 
402 } // namespace internal
403 } // namespace grpc
404 
405 #endif // GRPCPP_SUPPORT_METHOD_HANDLER_H
grpc::internal::Call::cq
grpc::CompletionQueue * cq() const
Definition: call.h:71
grpc::internal::ClientStreamingHandler::RunHandler
void RunHandler(const HandlerParameter &param) final
Definition: method_handler.h:150
grpc::internal::ServerStreamingHandler::ServerStreamingHandler
ServerStreamingHandler(std::function< grpc::Status(ServiceType *, grpc::ServerContext *, const RequestType *, ServerWriter< ResponseType > *)> func, ServiceType *service)
Definition: method_handler.h:191
grpc_call_arena_alloc
GRPCAPI void * grpc_call_arena_alloc(grpc_call *call, size_t size)
Allocate memory in the grpc_call arena: this memory is automatically discarded at call completion.
grpc::ServerContext
A ServerContext or CallbackServerContext allows the code implementing a service handler to:
Definition: server_context.h:572
grpc::internal::CallOpServerSendStatus
Definition: call_op_set.h:654
grpc
An Alarm posts the user-provided tag to its associated completion queue or invokes the user-provided ...
Definition: alarm.h:33
grpc::internal::CallOpSet
Primary implementation of CallOpSetInterface.
Definition: completion_queue.h:96
grpc::internal::CallOpSendMessage
Definition: call_op_set.h:286
grpc::internal::TemplatedBidiStreamingHandler::TemplatedBidiStreamingHandler
TemplatedBidiStreamingHandler(std::function< grpc::Status(grpc::ServerContext *, Streamer *)> func)
Definition: method_handler.h:262
grpc::internal::ErrorMethodHandler
General method handler class for errors that prevent real method use e.g., handle unknown method by r...
Definition: completion_queue.h:80
grpc::internal::MethodHandler::HandlerParameter
Definition: rpc_service_method.h:41
grpc::internal::ServerStreamingHandler::RunHandler
void RunHandler(const HandlerParameter &param) final
Definition: method_handler.h:198
grpc::ServerWriter
Synchronous (blocking) server-side API for doing server-streaming RPCs,...
Definition: completion_queue.h:61
grpc::internal::SplitServerStreamingHandler
Definition: method_handler.h:339
grpc::internal::ClientStreamingHandler::ClientStreamingHandler
ClientStreamingHandler(std::function< grpc::Status(ServiceType *, grpc::ServerContext *, ServerReader< RequestType > *, ResponseType *)> func, ServiceType *service)
Definition: method_handler.h:143
grpc_byte_buffer_destroy
GRPCAPI void grpc_byte_buffer_destroy(grpc_byte_buffer *bb)
Destroys byte_buffer deallocating all its memory.
GPR_ASSERT
#define GPR_ASSERT(x)
abort() the process if x is zero, having written a line to the log.
Definition: log.h:95
grpc::internal::TemplatedBidiStreamingHandler::RunHandler
void RunHandler(const HandlerParameter &param) final
Definition: method_handler.h:266
grpc::internal::RpcMethodHandler::RpcMethodHandler
RpcMethodHandler(std::function< grpc::Status(ServiceType *, grpc::ServerContext *, const RequestType *, ResponseType *)> func, ServiceType *service)
Definition: method_handler.h:101
grpc::ServerContextBase
Base class of ServerContext.
Definition: server_context.h:124
rpc_service_method.h
grpc::internal::RpcMethodHandler::Deserialize
void * Deserialize(grpc_call *call, grpc_byte_buffer *req, grpc::Status *status, void **) final
Definition: method_handler.h:122
grpc::internal::CallOpSendInitialMetadata
Definition: call_op_set.h:216
grpc::internal::CallOpServerSendStatus::ServerSendStatus
void ServerSendStatus(std::multimap< std::string, std::string > *trailing_metadata, const Status &status)
Definition: call_op_set.h:658
grpc::internal::MethodHandler::HandlerParameter::call
Call *const call
Definition: rpc_service_method.h:62
grpc::Status::ok
bool ok() const
Is the status OK?
Definition: status.h:126
grpc::ServerSplitStreamer
A class to represent a flow-controlled server-side streaming call.
Definition: sync_stream.h:892
grpc::Status
Did it work? If it didn't, why?
Definition: status.h:35
grpc::internal::ResourceExhaustedHandler
ErrorMethodHandler< grpc::StatusCode::RESOURCE_EXHAUSTED > ResourceExhaustedHandler
Definition: method_handler.h:400
log.h
grpc::ServerReader
Synchronous (blocking) server-side API for doing client-streaming RPCs, where the incoming message st...
Definition: completion_queue.h:59
grpc::internal::UnaryDeserializeHelper
void * UnaryDeserializeHelper(grpc_byte_buffer *, grpc::Status *, RequestType *)
A helper function with reduced templating to do deserializing.
Definition: method_handler.h:81
grpc_call
struct grpc_call grpc_call
A Call represents an RPC.
Definition: grpc_types.h:70
grpc_byte_buffer
Definition: grpc_types.h:43
grpc::ByteBuffer
A sequence of bytes.
Definition: byte_buffer.h:60
grpc::internal::BidiStreamingHandler::BidiStreamingHandler
BidiStreamingHandler(std::function< grpc::Status(ServiceType *, grpc::ServerContext *, ServerReaderWriter< ResponseType, RequestType > *)> func, ServiceType *service)
Definition: method_handler.h:308
grpc::ServerContextBase::compression_level_set
bool compression_level_set() const
Return a bool indicating whether the compression level for this call has been set (either implicitly ...
Definition: server_context.h:251
grpc::SerializationTraits
Defines how to serialize and deserialize some type.
Definition: serialization_traits.h:58
grpc::internal::MethodHandler
Base class for running an RPC handler.
Definition: rpc_service_method.h:38
byte_buffer.h
grpc::ServerReaderWriter
Synchronous (blocking) server-side API for a bidirectional streaming call, where the incoming message...
Definition: sync_stream.h:785
grpc::internal::MethodHandler::HandlerParameter::server_context
grpc::ServerContextBase *const server_context
Definition: rpc_service_method.h:63
grpc::internal::TemplatedBidiStreamingHandler
A wrapper class of an application provided bidi-streaming handler.
Definition: completion_queue.h:78
sync_stream.h
byte_buffer.h
grpc::internal::Call::PerformOps
void PerformOps(CallOpSetInterface *ops)
Definition: call.h:66
grpc::internal::ErrorMethodHandler::ErrorMethodHandler
ErrorMethodHandler(const std::string &message)
Definition: method_handler.h:358
std
Definition: async_unary_call.h:405
grpc::internal::ErrorMethodHandler::RunHandler
void RunHandler(const HandlerParameter &param) final
Definition: method_handler.h:375
grpc::internal::StreamedUnaryHandler::StreamedUnaryHandler
StreamedUnaryHandler(std::function< grpc::Status(grpc::ServerContext *, ServerUnaryStreamer< RequestType, ResponseType > *)> func)
Definition: method_handler.h:328
grpc::internal::ErrorMethodHandler::FillOps
static void FillOps(grpc::ServerContextBase *context, const std::string &message, T *ops)
Definition: method_handler.h:361
grpc::internal::CatchingFunctionHandler
::grpc::Status CatchingFunctionHandler(Callable &&handler)
Definition: method_handler.h:40
grpc::ServerUnaryStreamer
A class to represent a flow-controlled unary call.
Definition: sync_stream.h:826
grpc::internal::RpcMethodHandler::RunHandler
void RunHandler(const HandlerParameter &param) final
Definition: method_handler.h:108
grpc::UNKNOWN
@ UNKNOWN
Unknown error.
Definition: status_code_enum.h:37
grpc::internal::UnknownMethodHandler
ErrorMethodHandler< grpc::StatusCode::UNIMPLEMENTED > UnknownMethodHandler
Definition: method_handler.h:398
grpc::ByteBuffer::Release
void Release()
Forget the underlying byte buffer without destroying it. Use this only for un-owned byte buffers.
Definition: byte_buffer.h:144
grpc::internal::StreamedUnaryHandler
Definition: method_handler.h:324
grpc::internal::UnaryRunHandlerHelper
void UnaryRunHandlerHelper(const grpc::internal::MethodHandler::HandlerParameter &, ResponseType *, grpc::Status &)
A helper function with reduced templating to do the common work needed to actually send the server re...
Definition: method_handler.h:58
grpc::protobuf::util::Status
::absl::Status Status
Definition: config_protobuf.h:97
grpc::internal::SplitServerStreamingHandler::SplitServerStreamingHandler
SplitServerStreamingHandler(std::function< grpc::Status(grpc::ServerContext *, ServerSplitStreamer< RequestType, ResponseType > *)> func)
Definition: method_handler.h:343
grpc::internal::ErrorMethodHandler::Deserialize
void * Deserialize(grpc_call *, grpc_byte_buffer *req, grpc::Status *, void **) final
Definition: method_handler.h:384
grpc::internal::ServerStreamingHandler::Deserialize
void * Deserialize(grpc_call *call, grpc_byte_buffer *req, grpc::Status *status, void **) final
Definition: method_handler.h:229
grpc::INTERNAL
@ INTERNAL
Internal errors.
Definition: status_code_enum.h:121
grpc::ServerContextBase::compression_level
grpc_compression_level compression_level() const
Return the compression algorithm to be used by the server call.
Definition: server_context.h:236