GRPC C++  1.78.1
async_unary_call.h
Go to the documentation of this file.
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #ifndef GRPCPP_SUPPORT_ASYNC_UNARY_CALL_H
20 #define GRPCPP_SUPPORT_ASYNC_UNARY_CALL_H
21 
22 #include <grpc/grpc.h>
23 #include <grpcpp/client_context.h>
24 #include <grpcpp/impl/call.h>
29 #include <grpcpp/server_context.h>
30 #include <grpcpp/support/status.h>
31 
32 #include "absl/log/absl_check.h"
33 
34 namespace grpc {
35 
36 // Forward declaration for use in Helper class
37 template <class R>
38 class ClientAsyncResponseReader;
39 
42 template <class R>
44  public:
46 
50  virtual void StartCall() = 0;
51 
58  virtual void ReadInitialMetadata(void* tag) = 0;
59 
74  virtual void Finish(R* msg, grpc::Status* status, void* tag) = 0;
75 };
76 
77 namespace internal {
78 
80  public:
93  template <class R, class W, class BaseR = R, class BaseW = W>
96  const grpc::internal::RpcMethod& method, grpc::ClientContext* context,
97  const W& request) /* __attribute__((noinline)) */ {
98  grpc::internal::Call call = channel->CreateCall(method, context, cq);
100  call.call(), sizeof(ClientAsyncResponseReader<R>)))
101  ClientAsyncResponseReader<R>(call, context);
102  SetupRequest<BaseR, BaseW>(channel, call.call(), &result->single_buf_,
103  &result->read_initial_metadata_,
104  &result->finish_,
105  static_cast<const BaseW&>(request));
106 
107  return result;
108  }
109 
110  // Various helper functions to reduce templating use
111 
112  template <class R, class W>
113  static void SetupRequest(
114  grpc::ChannelInterface* channel, grpc_call* call,
116  std::function<void(ClientContext*, internal::Call*,
118  read_initial_metadata,
119  std::function<
120  void(ClientContext*, internal::Call*, bool initial_metadata_read,
122  internal::CallOpSetInterface**, void*, Status*, void*)>* finish,
123  const W& request) {
124  using SingleBufType =
131  SingleBufType* single_buf =
132  new (grpc_call_arena_alloc(call, sizeof(SingleBufType))) SingleBufType;
133  *single_buf_ptr = single_buf;
134 
135  // TODO(ctiller): don't assert
136  ABSL_CHECK(
137  single_buf->SendMessage(request, channel->memory_allocator()).ok());
138  single_buf->ClientSendClose();
139 
140  // The purpose of the following functions is to type-erase the actual
141  // templated type of the CallOpSet being used by hiding that type inside the
142  // function definition rather than specifying it as an argument of the
143  // function or a member of the class. The type-erased CallOpSet will get
144  // static_cast'ed back to the real type so that it can be used properly.
145  *read_initial_metadata =
146  [](ClientContext* context, internal::Call* call,
147  internal::CallOpSendInitialMetadata* single_buf_view, void* tag) {
148  auto* single_buf = static_cast<SingleBufType*>(single_buf_view);
149  single_buf->set_output_tag(tag);
150  single_buf->RecvInitialMetadata(context);
151  call->PerformOps(single_buf);
152  };
153 
154  // Note that this function goes one step further than the previous one
155  // because it type-erases the message being written down to a void*. This
156  // will be static-cast'ed back to the class specified here by hiding that
157  // class information inside the function definition. Note that this feature
158  // expects the class being specified here for R to be a base-class of the
159  // "real" R without any multiple-inheritance (as applies in protobuf wrt
160  // MessageLite)
161  *finish = [](ClientContext* context, internal::Call* call,
162  bool initial_metadata_read,
163  internal::CallOpSendInitialMetadata* single_buf_view,
164  internal::CallOpSetInterface** finish_buf_ptr, void* msg,
165  Status* status, void* tag) {
166  if (initial_metadata_read) {
167  using FinishBufType =
170  FinishBufType* finish_buf =
171  new (grpc_call_arena_alloc(call->call(), sizeof(FinishBufType)))
172  FinishBufType;
173  *finish_buf_ptr = finish_buf;
174  finish_buf->set_output_tag(tag);
175  finish_buf->RecvMessage(static_cast<R*>(msg));
176  finish_buf->AllowNoMessage();
177  finish_buf->ClientRecvStatus(context, status);
178  call->PerformOps(finish_buf);
179  } else {
180  auto* single_buf = static_cast<SingleBufType*>(single_buf_view);
181  single_buf->set_output_tag(tag);
182  single_buf->RecvInitialMetadata(context);
183  single_buf->RecvMessage(static_cast<R*>(msg));
184  single_buf->AllowNoMessage();
185  single_buf->ClientRecvStatus(context, status);
186  call->PerformOps(single_buf);
187  }
188  };
189  }
190 
191  static void StartCall(grpc::ClientContext* context,
193  single_buf->SendInitialMetadata(&context->send_initial_metadata_,
194  context->initial_metadata_flags());
195  }
196 };
197 
198 // TODO(vjpai): This templated factory is deprecated and will be replaced by
199 //. the non-templated helper as soon as possible.
200 template <class R>
202  public:
203  template <class W>
206  const grpc::internal::RpcMethod& method, grpc::ClientContext* context,
207  const W& request, bool start) {
208  auto* result = ClientAsyncResponseReaderHelper::Create<R>(
209  channel, cq, method, context, request);
210  if (start) {
211  result->StartCall();
212  }
213  return result;
214  }
215 };
216 
217 } // namespace internal
218 
221 template <class R>
222 class ClientAsyncResponseReader final
223  : public ClientAsyncResponseReaderInterface<R> {
224  public:
225  // always allocated against a call arena, no memory free required
226  static void operator delete(void* /*ptr*/, std::size_t size) {
227  ABSL_CHECK_EQ(size, sizeof(ClientAsyncResponseReader));
228  }
229 
230  // This operator should never be called as the memory should be freed as part
231  // of the arena destruction. It only exists to provide a matching operator
232  // delete to the operator new so that some compilers will not complain (see
233  // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
234  // there are no tests catching the compiler warning.
235  static void operator delete(void*, void*) { ABSL_CHECK(false); }
236 
237  void StartCall() override {
238  ABSL_DCHECK(!started_);
239  started_ = true;
241  }
242 
249  void ReadInitialMetadata(void* tag) override {
250  ABSL_DCHECK(started_);
251  ABSL_DCHECK(!context_->initial_metadata_received_);
252  read_initial_metadata_(context_, &call_, single_buf_, tag);
253  initial_metadata_read_ = true;
254  }
255 
261  void Finish(R* msg, grpc::Status* status, void* tag) override {
262  ABSL_DCHECK(started_);
263  finish_(context_, &call_, initial_metadata_read_, single_buf_, &finish_buf_,
264  static_cast<void*>(msg), status, tag);
265  }
266 
267  private:
269  grpc::ClientContext* const context_;
270  grpc::internal::Call call_;
271  bool started_ = false;
272  bool initial_metadata_read_ = false;
273 
275  grpc::ClientContext* context)
276  : context_(context), call_(call) {}
277 
278  // disable operator new
279  static void* operator new(std::size_t size);
280  static void* operator new(std::size_t /*size*/, void* p) { return p; }
281 
282  internal::CallOpSendInitialMetadata* single_buf_;
283  internal::CallOpSetInterface* finish_buf_ = nullptr;
284  std::function<void(ClientContext*, internal::Call*,
285  internal::CallOpSendInitialMetadata*, void*)>
286  read_initial_metadata_;
287  std::function<void(ClientContext*, internal::Call*,
288  bool initial_metadata_read,
289  internal::CallOpSendInitialMetadata*,
290  internal::CallOpSetInterface**, void*, Status*, void*)>
291  finish_;
292 };
293 
296 template <class W>
297 class ServerAsyncResponseWriter final
299  public:
301  : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
302 
310  void SendInitialMetadata(void* tag) override {
311  ABSL_CHECK(!ctx_->sent_initial_metadata_);
312 
313  meta_buf_.set_output_tag(tag);
314  meta_buf_.SendInitialMetadata(&ctx_->initial_metadata_,
315  ctx_->initial_metadata_flags());
316  if (ctx_->compression_level_set()) {
317  meta_buf_.set_compression_level(ctx_->compression_level());
318  }
319  ctx_->sent_initial_metadata_ = true;
320  call_.PerformOps(&meta_buf_);
321  }
322 
342  void Finish(const W& msg, const grpc::Status& status, void* tag) {
343  finish_buf_.set_output_tag(tag);
344  finish_buf_.set_core_cq_tag(&finish_buf_);
345  if (!ctx_->sent_initial_metadata_) {
346  finish_buf_.SendInitialMetadata(&ctx_->initial_metadata_,
347  ctx_->initial_metadata_flags());
348  if (ctx_->compression_level_set()) {
349  finish_buf_.set_compression_level(ctx_->compression_level());
350  }
351  ctx_->sent_initial_metadata_ = true;
352  }
353  // The response is dropped if the status is not OK.
354  if (status.ok()) {
355  finish_buf_.ServerSendStatus(
356  &ctx_->trailing_metadata_,
357  finish_buf_.SendMessage(msg, ctx_->memory_allocator()));
358  } else {
359  finish_buf_.ServerSendStatus(&ctx_->trailing_metadata_, status);
360  }
361  call_.PerformOps(&finish_buf_);
362  }
363 
380  void FinishWithError(const grpc::Status& status, void* tag) {
381  ABSL_CHECK(!status.ok());
382  finish_buf_.set_output_tag(tag);
383  if (!ctx_->sent_initial_metadata_) {
384  finish_buf_.SendInitialMetadata(&ctx_->initial_metadata_,
385  ctx_->initial_metadata_flags());
386  if (ctx_->compression_level_set()) {
387  finish_buf_.set_compression_level(ctx_->compression_level());
388  }
389  ctx_->sent_initial_metadata_ = true;
390  }
391  finish_buf_.ServerSendStatus(&ctx_->trailing_metadata_, status);
392  call_.PerformOps(&finish_buf_);
393  }
394 
395  private:
396  void BindCall(grpc::internal::Call* call) override { call_ = *call; }
397 
398  grpc::internal::Call call_;
399  grpc::ServerContext* ctx_;
401  meta_buf_;
405  finish_buf_;
406 };
407 
408 } // namespace grpc
409 
410 namespace std {
411 template <class R>
412 class default_delete<grpc::ClientAsyncResponseReader<R>> {
413  public:
414  void operator()(void* /*p*/) {}
415 };
416 template <class R>
417 class default_delete<grpc::ClientAsyncResponseReaderInterface<R>> {
418  public:
419  void operator()(void* /*p*/) {}
420 };
421 } // namespace std
422 
423 #endif // GRPCPP_SUPPORT_ASYNC_UNARY_CALL_H
grpc::ClientAsyncResponseReader::StartCall
void StartCall() override
Definition: async_unary_call.h:237
grpc::internal::CallOpRecvInitialMetadata
Definition: call_op_set.h:725
grpc::internal::ServerAsyncStreamingInterface
Definition: service_type.h:38
grpc::internal::CallOpClientSendClose
Definition: call_op_set.h:623
grpc_call_arena_alloc
GRPCAPI void * grpc_call_arena_alloc(grpc_call *call, size_t size)
Allocate memory in the grpc_call arena: this memory is automatically discarded at call completion.
grpc::ServerContext
A ServerContext or CallbackServerContext allows the code implementing a service handler to:
Definition: server_context.h:586
grpc::internal::CallOpServerSendStatus
Definition: call_op_set.h:658
grpc
An Alarm posts the user-provided tag to its associated completion queue or invokes the user-provided ...
Definition: alarm.h:33
grpc::internal::CallOpSet
Primary implementation of CallOpSetInterface.
Definition: completion_queue.h:97
grpc::internal::CallOpSendMessage
Definition: call_op_set.h:289
grpc::internal::ClientAsyncResponseReaderHelper::Create
static ClientAsyncResponseReader< R > * Create(grpc::ChannelInterface *channel, grpc::CompletionQueue *cq, const grpc::internal::RpcMethod &method, grpc::ClientContext *context, const W &request)
Start a call and write the request out if start is set.
Definition: async_unary_call.h:94
grpc::internal::ClientAsyncResponseReaderFactory
Definition: async_unary_call.h:201
grpc::internal::Call
Straightforward wrapping of the C call object.
Definition: call.h:36
grpc::ServerContextBase::memory_allocator
grpc_event_engine::experimental::MemoryAllocator * memory_allocator()
Definition: server_context.h:300
status.h
grpc::internal::CallOpSendInitialMetadata
Definition: call_op_set.h:219
grpc::Status::ok
bool ok() const
Is the status OK?
Definition: status.h:125
grpc::ServerAsyncResponseWriter::Finish
void Finish(const W &msg, const grpc::Status &status, void *tag)
Indicate that the stream is to be finished and request notification when the server has sent the appr...
Definition: async_unary_call.h:342
std::default_delete< grpc::ClientAsyncResponseReader< R > >::operator()
void operator()(void *)
Definition: async_unary_call.h:414
grpc::ClientAsyncResponseReaderInterface::Finish
virtual void Finish(R *msg, grpc::Status *status, void *tag)=0
Request to receive the server's response msg and final status for the call, and to notify tag on this...
grpc::Status
Did it work? If it didn't, why?
Definition: status.h:34
grpc::ClientAsyncResponseReaderInterface
An interface relevant for async client side unary RPCs (which send one request message to a server an...
Definition: async_unary_call.h:43
grpc::ClientAsyncResponseReaderInterface::StartCall
virtual void StartCall()=0
Start the call that was set up by the constructor, but only if the constructor was invoked through th...
grpc::ClientAsyncResponseReaderInterface::ReadInitialMetadata
virtual void ReadInitialMetadata(void *tag)=0
Request notification of the reading of initial metadata.
grpc::ServerContext::compression_level
grpc_compression_level compression_level() const
Return the compression algorithm to be used by the server call.
Definition: server_context.h:236
grpc::internal::CallOpClientRecvStatus::ClientRecvStatus
void ClientRecvStatus(grpc::ClientContext *context, Status *status)
Definition: call_op_set.h:780
grpc::ClientContext
A ClientContext allows the person implementing a service client to:
Definition: client_context.h:194
grpc::ClientAsyncResponseReader
Async API for client-side unary RPCs, where the message response received from the server is of type ...
Definition: client_context.h:92
grpc.h
grpc_call
struct grpc_call grpc_call
A Call represents an RPC.
Definition: grpc_types.h:68
std::default_delete< grpc::ClientAsyncResponseReaderInterface< R > >::operator()
void operator()(void *)
Definition: async_unary_call.h:419
grpc::internal::CallOpSetInterface
An abstract collection of call ops, used to generate the grpc_call_op structure to pass down to the l...
Definition: call_op_set_interface.h:36
grpc::internal::ClientAsyncResponseReaderHelper::SetupRequest
static void SetupRequest(grpc::ChannelInterface *channel, grpc_call *call, grpc::internal::CallOpSendInitialMetadata **single_buf_ptr, std::function< void(ClientContext *, internal::Call *, internal::CallOpSendInitialMetadata *, void *)> *read_initial_metadata, std::function< void(ClientContext *, internal::Call *, bool initial_metadata_read, internal::CallOpSendInitialMetadata *, internal::CallOpSetInterface **, void *, Status *, void *)> *finish, const W &request)
Definition: async_unary_call.h:113
grpc::ServerAsyncResponseWriter::ServerAsyncResponseWriter
ServerAsyncResponseWriter(grpc::ServerContext *ctx)
Definition: async_unary_call.h:300
channel_interface.h
grpc::ChannelInterface
Codegen interface for grpc::Channel.
Definition: channel_interface.h:72
grpc::ServerAsyncResponseWriter::FinishWithError
void FinishWithError(const grpc::Status &status, void *tag)
Indicate that the stream is to be finished with a non-OK status, and request notification for when th...
Definition: async_unary_call.h:380
grpc::internal::ClientAsyncResponseReaderHelper
Definition: async_unary_call.h:79
grpc::ClientAsyncResponseReaderInterface::~ClientAsyncResponseReaderInterface
virtual ~ClientAsyncResponseReaderInterface()
Definition: async_unary_call.h:45
call_op_set_interface.h
grpc::ClientAsyncResponseReader::Finish
void Finish(R *msg, grpc::Status *status, void *tag) override
See ClientAsyncResponseReaderInterface::Finish for semantics.
Definition: async_unary_call.h:261
grpc::ClientAsyncResponseReader::ReadInitialMetadata
void ReadInitialMetadata(void *tag) override
See ClientAsyncResponseReaderInterface::ReadInitialMetadata for semantics.
Definition: async_unary_call.h:249
grpc::internal::CallOpSet::set_output_tag
void set_output_tag(void *return_tag)
Definition: call_op_set.h:941
grpc::internal::ClientAsyncResponseReaderFactory::Create
static ClientAsyncResponseReader< R > * Create(grpc::ChannelInterface *channel, grpc::CompletionQueue *cq, const grpc::internal::RpcMethod &method, grpc::ClientContext *context, const W &request, bool start)
Definition: async_unary_call.h:204
client_context.h
grpc::internal::ClientAsyncResponseReaderHelper::StartCall
static void StartCall(grpc::ClientContext *context, grpc::internal::CallOpSendInitialMetadata *single_buf)
Definition: async_unary_call.h:191
grpc::ServerAsyncResponseWriter::SendInitialMetadata
void SendInitialMetadata(void *tag) override
See ServerAsyncStreamingInterface::SendInitialMetadata for semantics.
Definition: async_unary_call.h:310
grpc::internal::Call::PerformOps
void PerformOps(CallOpSetInterface *ops)
Definition: call.h:66
grpc::internal::Call::call
grpc_call * call() const
Definition: call.h:70
server_context.h
call_op_set.h
std
Definition: async_unary_call.h:410
call.h
grpc::ServerContext::compression_level_set
bool compression_level_set() const
Return a bool indicating whether the compression level for this call has been set (either implicitly ...
Definition: server_context.h:251
grpc::internal::CallOpClientRecvStatus
Definition: call_op_set.h:773
service_type.h
grpc::CompletionQueue
A thin wrapper around grpc_completion_queue (see src/core/lib/surface/completion_queue....
Definition: completion_queue.h:104
grpc::internal::CallOpRecvMessage
Definition: call_op_set.h:429
grpc::internal::RpcMethod
Descriptor of an RPC method.
Definition: rpc_method.h:29
grpc::internal::CallOpSendInitialMetadata::SendInitialMetadata
void SendInitialMetadata(std::multimap< std::string, std::string > *metadata, uint32_t flags)
Definition: call_op_set.h:225
grpc::protobuf::util::Status
::absl::Status Status
Definition: config_protobuf.h:107
grpc::internal::CallOpSet::set_core_cq_tag
void set_core_cq_tag(void *core_cq_tag)
set_core_cq_tag is used to provide a different core CQ tag than "this".
Definition: call_op_set.h:949
grpc::ChannelInterface::memory_allocator
virtual grpc_event_engine::experimental::MemoryAllocator * memory_allocator() const
Definition: channel_interface.h:106