GRPC C++  1.70.1
method_handler.h
Go to the documentation of this file.
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #ifndef GRPCPP_SUPPORT_METHOD_HANDLER_H
20 #define GRPCPP_SUPPORT_METHOD_HANDLER_H
21 
22 #include <grpc/byte_buffer.h>
26 
27 #include "absl/log/absl_check.h"
28 
29 namespace grpc {
30 
31 namespace internal {
32 
33 // Invoke the method handler, fill in the status, and
34 // return whether or not we finished safely (without an exception).
35 // Note that exception handling is 0-cost in most compiler/library
36 // implementations (except when an exception is actually thrown),
37 // so this process doesn't require additional overhead in the common case.
38 // Additionally, we don't need to return if we caught an exception or not;
39 // the handling is the same in either case.
40 template <class Callable>
42 #if GRPC_ALLOW_EXCEPTIONS
43  try {
44  return handler();
45  } catch (...) {
47  "Unexpected error in RPC handling");
48  }
49 #else // GRPC_ALLOW_EXCEPTIONS
50  return handler();
51 #endif // GRPC_ALLOW_EXCEPTIONS
52 }
53 
57 
58 template <class ResponseType>
60  ResponseType* rsp, grpc::Status& status) {
61  ABSL_CHECK(!param.server_context->sent_initial_metadata_);
65  ops;
66  ops.SendInitialMetadata(&param.server_context->initial_metadata_,
67  param.server_context->initial_metadata_flags());
69  ops.set_compression_level(param.server_context->compression_level());
70  }
71  if (status.ok()) {
72  status = ops.SendMessagePtr(rsp);
73  }
74  ops.ServerSendStatus(&param.server_context->trailing_metadata_, status);
75  param.call->PerformOps(&ops);
76  param.call->cq()->Pluck(&ops);
77 }
78 
80 
81 template <class RequestType>
83  RequestType* request) {
84  grpc::ByteBuffer buf;
85  buf.set_buffer(req);
87  &buf, static_cast<RequestType*>(request));
88  buf.Release();
89  if (status->ok()) {
90  return request;
91  }
92  request->~RequestType();
93  return nullptr;
94 }
95 
97 template <class ServiceType, class RequestType, class ResponseType,
98  class BaseRequestType = RequestType,
99  class BaseResponseType = ResponseType>
100 class RpcMethodHandler : public grpc::internal::MethodHandler {
101  public:
103  std::function<grpc::Status(ServiceType*, grpc::ServerContext*,
104  const RequestType*, ResponseType*)>
105  func,
106  ServiceType* service)
107  : func_(func), service_(service) {}
108 
109  void RunHandler(const HandlerParameter& param) final {
110  ResponseType rsp;
111  grpc::Status status = param.status;
112  if (status.ok()) {
113  status = CatchingFunctionHandler([this, &param, &rsp] {
114  return func_(service_,
115  static_cast<grpc::ServerContext*>(param.server_context),
116  static_cast<RequestType*>(param.request), &rsp);
117  });
118  static_cast<RequestType*>(param.request)->~RequestType();
119  }
120  UnaryRunHandlerHelper(param, static_cast<BaseResponseType*>(&rsp), status);
121  }
122 
124  grpc::Status* status, void** /*handler_data*/) final {
125  auto* request =
126  new (grpc_call_arena_alloc(call, sizeof(RequestType))) RequestType;
127  return UnaryDeserializeHelper(req, status,
128  static_cast<BaseRequestType*>(request));
129  }
130 
131  private:
133  std::function<grpc::Status(ServiceType*, grpc::ServerContext*,
134  const RequestType*, ResponseType*)>
135  func_;
136  // The class the above handler function lives in.
137  ServiceType* service_;
138 };
139 
141 template <class ServiceType, class RequestType, class ResponseType>
142 class ClientStreamingHandler : public grpc::internal::MethodHandler {
143  public:
145  std::function<grpc::Status(ServiceType*, grpc::ServerContext*,
146  ServerReader<RequestType>*, ResponseType*)>
147  func,
148  ServiceType* service)
149  : func_(func), service_(service) {}
150 
151  void RunHandler(const HandlerParameter& param) final {
153  param.call, static_cast<grpc::ServerContext*>(param.server_context));
154  ResponseType rsp;
155  grpc::Status status =
156  CatchingFunctionHandler([this, &param, &reader, &rsp] {
157  return func_(service_,
158  static_cast<grpc::ServerContext*>(param.server_context),
159  &reader, &rsp);
160  });
161 
165  ops;
166  if (!param.server_context->sent_initial_metadata_) {
167  ops.SendInitialMetadata(&param.server_context->initial_metadata_,
168  param.server_context->initial_metadata_flags());
169  if (param.server_context->compression_level_set()) {
170  ops.set_compression_level(param.server_context->compression_level());
171  }
172  }
173  if (status.ok()) {
174  status = ops.SendMessagePtr(&rsp);
175  }
176  ops.ServerSendStatus(&param.server_context->trailing_metadata_, status);
177  param.call->PerformOps(&ops);
178  param.call->cq()->Pluck(&ops);
179  }
180 
181  private:
182  std::function<grpc::Status(ServiceType*, grpc::ServerContext*,
183  ServerReader<RequestType>*, ResponseType*)>
184  func_;
185  ServiceType* service_;
186 };
187 
189 template <class ServiceType, class RequestType, class ResponseType>
190 class ServerStreamingHandler : public grpc::internal::MethodHandler {
191  public:
193  ServiceType*, grpc::ServerContext*,
194  const RequestType*, ServerWriter<ResponseType>*)>
195  func,
196  ServiceType* service)
197  : func_(func), service_(service) {}
198 
199  void RunHandler(const HandlerParameter& param) final {
200  grpc::Status status = param.status;
201  if (status.ok()) {
203  param.call, static_cast<grpc::ServerContext*>(param.server_context));
204  status = CatchingFunctionHandler([this, &param, &writer] {
205  return func_(service_,
206  static_cast<grpc::ServerContext*>(param.server_context),
207  static_cast<RequestType*>(param.request), &writer);
208  });
209  static_cast<RequestType*>(param.request)->~RequestType();
210  }
211 
214  ops;
215  if (!param.server_context->sent_initial_metadata_) {
216  ops.SendInitialMetadata(&param.server_context->initial_metadata_,
217  param.server_context->initial_metadata_flags());
218  if (param.server_context->compression_level_set()) {
219  ops.set_compression_level(param.server_context->compression_level());
220  }
221  }
222  ops.ServerSendStatus(&param.server_context->trailing_metadata_, status);
223  param.call->PerformOps(&ops);
224  if (param.server_context->has_pending_ops_) {
225  param.call->cq()->Pluck(&param.server_context->pending_ops_);
226  }
227  param.call->cq()->Pluck(&ops);
228  }
229 
231  grpc::Status* status, void** /*handler_data*/) final {
232  grpc::ByteBuffer buf;
233  buf.set_buffer(req);
234  auto* request =
235  new (grpc_call_arena_alloc(call, sizeof(RequestType))) RequestType();
236  *status =
238  buf.Release();
239  if (status->ok()) {
240  return request;
241  }
242  request->~RequestType();
243  return nullptr;
244  }
245 
246  private:
247  std::function<grpc::Status(ServiceType*, grpc::ServerContext*,
248  const RequestType*, ServerWriter<ResponseType>*)>
249  func_;
250  ServiceType* service_;
251 };
252 
260 template <class Streamer, bool WriteNeeded>
261 class TemplatedBidiStreamingHandler : public grpc::internal::MethodHandler {
262  public:
264  std::function<grpc::Status(grpc::ServerContext*, Streamer*)> func)
265  : func_(func), write_needed_(WriteNeeded) {}
266 
267  void RunHandler(const HandlerParameter& param) final {
268  Streamer stream(param.call,
269  static_cast<grpc::ServerContext*>(param.server_context));
270  grpc::Status status = CatchingFunctionHandler([this, &param, &stream] {
271  return func_(static_cast<grpc::ServerContext*>(param.server_context),
272  &stream);
273  });
274 
277  ops;
278  if (!param.server_context->sent_initial_metadata_) {
279  ops.SendInitialMetadata(&param.server_context->initial_metadata_,
280  param.server_context->initial_metadata_flags());
281  if (param.server_context->compression_level_set()) {
282  ops.set_compression_level(param.server_context->compression_level());
283  }
284  if (write_needed_ && status.ok()) {
285  // If we needed a write but never did one, we need to mark the
286  // status as a fail
288  "Service did not provide response message");
289  }
290  }
291  ops.ServerSendStatus(&param.server_context->trailing_metadata_, status);
292  param.call->PerformOps(&ops);
293  if (param.server_context->has_pending_ops_) {
294  param.call->cq()->Pluck(&param.server_context->pending_ops_);
295  }
296  param.call->cq()->Pluck(&ops);
297  }
298 
299  private:
300  std::function<grpc::Status(grpc::ServerContext*, Streamer*)> func_;
301  const bool write_needed_;
302 };
303 
304 template <class ServiceType, class RequestType, class ResponseType>
305 class BidiStreamingHandler
306  : public TemplatedBidiStreamingHandler<
307  ServerReaderWriter<ResponseType, RequestType>, false> {
308  public:
310  ServiceType*, grpc::ServerContext*,
312  func,
313  ServiceType* service)
314  // TODO(vjpai): When gRPC supports C++14, move-capture func in the below
316  ServerReaderWriter<ResponseType, RequestType>, false>(
317  [func, service](
318  grpc::ServerContext* ctx,
319  ServerReaderWriter<ResponseType, RequestType>* streamer) {
320  return func(service, ctx, streamer);
321  }) {}
322 };
323 
324 template <class RequestType, class ResponseType>
327  ServerUnaryStreamer<RequestType, ResponseType>, true> {
328  public:
330  std::function<
333  func)
335  ServerUnaryStreamer<RequestType, ResponseType>, true>(
336  std::move(func)) {}
337 };
338 
339 template <class RequestType, class ResponseType>
342  ServerSplitStreamer<RequestType, ResponseType>, false> {
343  public:
345  std::function<
348  func)
350  ServerSplitStreamer<RequestType, ResponseType>, false>(
351  std::move(func)) {}
352 };
353 
356 template <grpc::StatusCode code>
357 class ErrorMethodHandler : public grpc::internal::MethodHandler {
358  public:
359  explicit ErrorMethodHandler(const std::string& message) : message_(message) {}
360 
361  template <class T>
362  static void FillOps(grpc::ServerContextBase* context,
363  const std::string& message, T* ops) {
364  grpc::Status status(code, message);
365  if (!context->sent_initial_metadata_) {
366  ops->SendInitialMetadata(&context->initial_metadata_,
367  context->initial_metadata_flags());
368  if (context->compression_level_set()) {
369  ops->set_compression_level(context->compression_level());
370  }
371  context->sent_initial_metadata_ = true;
372  }
373  ops->ServerSendStatus(&context->trailing_metadata_, status);
374  }
375 
376  void RunHandler(const HandlerParameter& param) final {
379  ops;
380  FillOps(param.server_context, message_, &ops);
381  param.call->PerformOps(&ops);
382  param.call->cq()->Pluck(&ops);
383  }
384 
385  void* Deserialize(grpc_call* /*call*/, grpc_byte_buffer* req,
386  grpc::Status* /*status*/, void** /*handler_data*/) final {
387  // We have to destroy any request payload
388  if (req != nullptr) {
390  }
391  return nullptr;
392  }
393 
394  private:
395  const std::string message_;
396 };
397 
398 typedef ErrorMethodHandler<grpc::StatusCode::UNIMPLEMENTED>
402 
403 } // namespace internal
404 } // namespace grpc
405 
406 #endif // GRPCPP_SUPPORT_METHOD_HANDLER_H
grpc::internal::Call::cq
grpc::CompletionQueue * cq() const
Definition: call.h:71
grpc::internal::ClientStreamingHandler::RunHandler
void RunHandler(const HandlerParameter &param) final
Definition: method_handler.h:151
grpc::internal::ServerStreamingHandler::ServerStreamingHandler
ServerStreamingHandler(std::function< grpc::Status(ServiceType *, grpc::ServerContext *, const RequestType *, ServerWriter< ResponseType > *)> func, ServiceType *service)
Definition: method_handler.h:192
grpc_call_arena_alloc
GRPCAPI void * grpc_call_arena_alloc(grpc_call *call, size_t size)
Allocate memory in the grpc_call arena: this memory is automatically discarded at call completion.
grpc::ServerContext
A ServerContext or CallbackServerContext allows the code implementing a service handler to:
Definition: server_context.h:578
grpc::internal::CallOpServerSendStatus
Definition: call_op_set.h:656
grpc
An Alarm posts the user-provided tag to its associated completion queue or invokes the user-provided ...
Definition: alarm.h:33
grpc::internal::CallOpSet
Primary implementation of CallOpSetInterface.
Definition: completion_queue.h:97
grpc::internal::CallOpSendMessage
Definition: call_op_set.h:288
grpc::internal::TemplatedBidiStreamingHandler::TemplatedBidiStreamingHandler
TemplatedBidiStreamingHandler(std::function< grpc::Status(grpc::ServerContext *, Streamer *)> func)
Definition: method_handler.h:263
grpc::internal::ErrorMethodHandler
General method handler class for errors that prevent real method use e.g., handle unknown method by r...
Definition: completion_queue.h:81
grpc::internal::MethodHandler::HandlerParameter
Definition: rpc_service_method.h:43
grpc::internal::ServerStreamingHandler::RunHandler
void RunHandler(const HandlerParameter &param) final
Definition: method_handler.h:199
grpc::ServerWriter
Synchronous (blocking) server-side API for doing for doing a server-streaming RPCs,...
Definition: completion_queue.h:62
grpc::internal::SplitServerStreamingHandler
Definition: method_handler.h:340
grpc::internal::ClientStreamingHandler::ClientStreamingHandler
ClientStreamingHandler(std::function< grpc::Status(ServiceType *, grpc::ServerContext *, ServerReader< RequestType > *, ResponseType *)> func, ServiceType *service)
Definition: method_handler.h:144
grpc_byte_buffer_destroy
GRPCAPI void grpc_byte_buffer_destroy(grpc_byte_buffer *bb)
Destroys byte_buffer deallocating all its memory.
grpc::internal::TemplatedBidiStreamingHandler::RunHandler
void RunHandler(const HandlerParameter &param) final
Definition: method_handler.h:267
grpc::internal::RpcMethodHandler::RpcMethodHandler
RpcMethodHandler(std::function< grpc::Status(ServiceType *, grpc::ServerContext *, const RequestType *, ResponseType *)> func, ServiceType *service)
Definition: method_handler.h:102
grpc::ServerContextBase
Base class of ServerContext.
Definition: server_context.h:124
rpc_service_method.h
grpc::internal::RpcMethodHandler::Deserialize
void * Deserialize(grpc_call *call, grpc_byte_buffer *req, grpc::Status *status, void **) final
Definition: method_handler.h:123
grpc::internal::CallOpSendInitialMetadata
Definition: call_op_set.h:218
grpc::internal::CallOpServerSendStatus::ServerSendStatus
void ServerSendStatus(std::multimap< std::string, std::string > *trailing_metadata, const Status &status)
Definition: call_op_set.h:660
grpc::internal::MethodHandler::HandlerParameter::call
Call *const call
Definition: rpc_service_method.h:64
grpc::Status::ok
bool ok() const
Is the status OK?
Definition: status.h:125
grpc::ServerSplitStreamer
A class to represent a flow-controlled server-side streaming call.
Definition: sync_stream.h:893
grpc::Status
Did it work? If it didn't, why?
Definition: status.h:34
grpc::internal::ResourceExhaustedHandler
ErrorMethodHandler< grpc::StatusCode::RESOURCE_EXHAUSTED > ResourceExhaustedHandler
Definition: method_handler.h:401
grpc::ServerReader
Synchronous (blocking) server-side API for doing client-streaming RPCs, where the incoming message st...
Definition: completion_queue.h:60
grpc::internal::UnaryDeserializeHelper
void * UnaryDeserializeHelper(grpc_byte_buffer *, grpc::Status *, RequestType *)
A helper function with reduced templating to do deserializing.
Definition: method_handler.h:82
grpc_call
struct grpc_call grpc_call
A Call represents an RPC.
Definition: grpc_types.h:68
grpc_byte_buffer
Definition: grpc_types.h:41
grpc::ByteBuffer
A sequence of bytes.
Definition: byte_buffer.h:59
grpc::internal::BidiStreamingHandler::BidiStreamingHandler
BidiStreamingHandler(std::function< grpc::Status(ServiceType *, grpc::ServerContext *, ServerReaderWriter< ResponseType, RequestType > *)> func, ServiceType *service)
Definition: method_handler.h:309
grpc::ServerContextBase::compression_level_set
bool compression_level_set() const
Return a bool indicating whether the compression level for this call has been set (either implicitly ...
Definition: server_context.h:251
grpc::SerializationTraits
Defines how to serialize and deserialize some type.
Definition: serialization_traits.h:58
grpc::internal::MethodHandler
Base class for running an RPC handler.
Definition: rpc_service_method.h:40
byte_buffer.h
grpc::ServerReaderWriter
Synchronous (blocking) server-side API for a bidirectional streaming call, where the incoming message...
Definition: sync_stream.h:786
grpc::internal::MethodHandler::HandlerParameter::server_context
grpc::ServerContextBase *const server_context
Definition: rpc_service_method.h:65
grpc::internal::TemplatedBidiStreamingHandler
A wrapper class of an application provided bidi-streaming handler.
Definition: completion_queue.h:79
sync_stream.h
byte_buffer.h
grpc::internal::Call::PerformOps
void PerformOps(CallOpSetInterface *ops)
Definition: call.h:66
grpc::internal::ErrorMethodHandler::ErrorMethodHandler
ErrorMethodHandler(const std::string &message)
Definition: method_handler.h:359
std
Definition: async_unary_call.h:406
grpc::internal::ErrorMethodHandler::RunHandler
void RunHandler(const HandlerParameter &param) final
Definition: method_handler.h:376
grpc::internal::StreamedUnaryHandler::StreamedUnaryHandler
StreamedUnaryHandler(std::function< grpc::Status(grpc::ServerContext *, ServerUnaryStreamer< RequestType, ResponseType > *)> func)
Definition: method_handler.h:329
grpc::internal::ErrorMethodHandler::FillOps
static void FillOps(grpc::ServerContextBase *context, const std::string &message, T *ops)
Definition: method_handler.h:362
grpc::internal::CatchingFunctionHandler
::grpc::Status CatchingFunctionHandler(Callable &&handler)
Definition: method_handler.h:41
grpc::ServerUnaryStreamer
A class to represent a flow-controlled unary call.
Definition: sync_stream.h:827
grpc::internal::RpcMethodHandler::RunHandler
void RunHandler(const HandlerParameter &param) final
Definition: method_handler.h:109
grpc::UNKNOWN
@ UNKNOWN
Unknown error.
Definition: status_code_enum.h:37
grpc::internal::UnknownMethodHandler
ErrorMethodHandler< grpc::StatusCode::UNIMPLEMENTED > UnknownMethodHandler
Definition: method_handler.h:399
grpc::ByteBuffer::Release
void Release()
Forget underlying byte buffer without destroying Use this only for un-owned byte buffers.
Definition: byte_buffer.h:143
grpc::internal::StreamedUnaryHandler
Definition: method_handler.h:325
grpc::internal::UnaryRunHandlerHelper
void UnaryRunHandlerHelper(const grpc::internal::MethodHandler::HandlerParameter &, ResponseType *, grpc::Status &)
A helper function with reduced templating to do the common work needed to actually send the server re...
Definition: method_handler.h:59
grpc::protobuf::util::Status
::absl::Status Status
Definition: config_protobuf.h:107
grpc::internal::SplitServerStreamingHandler::SplitServerStreamingHandler
SplitServerStreamingHandler(std::function< grpc::Status(grpc::ServerContext *, ServerSplitStreamer< RequestType, ResponseType > *)> func)
Definition: method_handler.h:344
grpc::internal::ErrorMethodHandler::Deserialize
void * Deserialize(grpc_call *, grpc_byte_buffer *req, grpc::Status *, void **) final
Definition: method_handler.h:385
grpc::internal::ServerStreamingHandler::Deserialize
void * Deserialize(grpc_call *call, grpc_byte_buffer *req, grpc::Status *status, void **) final
Definition: method_handler.h:230
grpc::INTERNAL
@ INTERNAL
Internal errors.
Definition: status_code_enum.h:121
grpc::ServerContextBase::compression_level
grpc_compression_level compression_level() const
Return the compression algorithm to be used by the server call.
Definition: server_context.h:236