GRPC C++  1.81.0
server_callback_handlers.h
Go to the documentation of this file.
1 //
2 //
3 // Copyright 2019 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 
18 #ifndef GRPCPP_IMPL_SERVER_CALLBACK_HANDLERS_H
19 #define GRPCPP_IMPL_SERVER_CALLBACK_HANDLERS_H
20 
21 #include <grpc/grpc.h>
22 #include <grpc/impl/call.h>
25 #include <grpcpp/server.h>
26 #include <grpcpp/server_context.h>
29 #include <grpcpp/support/status.h>
30 
31 #include "absl/log/absl_check.h"
32 
33 namespace grpc {
34 namespace internal {
35 
36 template <class RequestType, class ResponseType>
38  public:
41  const RequestType*, ResponseType*)>
42  get_reactor)
43  : get_reactor_(std::move(get_reactor)) {}
44 
47  allocator_ = allocator;
48  }
49 
50  void RunHandler(const HandlerParameter& param) final {
51  // Arena allocate a controller structure (that includes request/response)
52  grpc_call_ref(param.call->call());
53  auto* allocator_state =
55  param.internal_data);
56 
57  auto* call = new (grpc_call_arena_alloc(param.call->call(),
58  sizeof(ServerCallbackUnaryImpl)))
59  ServerCallbackUnaryImpl(
60  static_cast<grpc::CallbackServerContext*>(param.server_context),
61  param.call, allocator_state, param.call_requester);
62  param.server_context->BeginCompletionOp(
63  param.call, [call](bool) { call->MaybeDone(); }, call);
64 
65  ServerUnaryReactor* reactor = nullptr;
66  if (param.status.ok()) {
67  reactor = grpc::internal::CatchingReactorGetter<ServerUnaryReactor>(
68  get_reactor_,
69  static_cast<grpc::CallbackServerContext*>(param.server_context),
70  call->request(), call->response());
71  }
72 
73  if (reactor == nullptr) {
74  // if deserialization or reactor creator failed, we need to fail the call
75  reactor = new (grpc_call_arena_alloc(param.call->call(),
79  }
80 
82  call->SetupReactor(reactor);
83  }
84 
86  grpc::Status* status, void** handler_data) final {
87  grpc::ByteBuffer buf;
88  buf.set_buffer(req);
89  RequestType* request = nullptr;
91  if (allocator_ != nullptr) {
92  allocator_state = allocator_->AllocateMessages();
93  } else {
94  allocator_state = new (grpc_call_arena_alloc(
97  }
98  *handler_data = allocator_state;
99  request = allocator_state->request();
100  *status = grpc::Deserialize(&buf, request);
101  buf.Release();
102  if (status->ok()) {
103  return request;
104  }
105  return nullptr;
106  }
107 
108  private:
110  const RequestType*, ResponseType*)>
111  get_reactor_;
112  MessageAllocator<RequestType, ResponseType>* allocator_ = nullptr;
113 
114  class ServerCallbackUnaryImpl : public ServerCallbackUnary {
115  public:
116  void Finish(grpc::Status s) override {
117  // A callback that only contains a call to MaybeDone can be run as an
118  // inline callback regardless of whether or not OnDone is inlineable
119  // because if the actual OnDone callback needs to be scheduled, MaybeDone
120  // is responsible for dispatching to an EventEngine thread if needed.
121  // Thus, when setting up the finish_tag_, we can set its own callback to
122  // inlineable.
123  finish_tag_.Set(
124  call_.call(),
125  [this](bool) {
126  this->MaybeDone(
127  reactor_.load(std::memory_order_relaxed)->InternalInlineable());
128  },
129  &finish_ops_, /*can_inline=*/true);
130  finish_ops_.set_core_cq_tag(&finish_tag_);
131 
132  if (!ctx_->sent_initial_metadata_) {
133  finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
134  ctx_->initial_metadata_flags());
135  if (ctx_->compression_level_set()) {
136  finish_ops_.set_compression_level(ctx_->compression_level());
137  }
138  ctx_->sent_initial_metadata_ = true;
139  }
140  // The response is dropped if the status is not OK.
141  if (s.ok()) {
142  finish_ops_.ServerSendStatus(
143  &ctx_->trailing_metadata_,
144  finish_ops_.SendMessagePtr(response(), ctx_->memory_allocator()));
145  } else {
146  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
147  }
148  finish_ops_.set_core_cq_tag(&finish_tag_);
149  finish_ops_.FillOps(&call_);
150  }
151 
152  void SendInitialMetadata() override {
153  ABSL_CHECK(!ctx_->sent_initial_metadata_);
154  this->Ref();
155  // The callback for this function should not be marked inline because it
156  // is directly invoking a user-controlled reaction
157  // (OnSendInitialMetadataDone). Thus it must be dispatched to an
158  // EventEngine thread. However, any OnDone needed after that can be
159  // inlined because it is already running on an EventEngine thread.
160  meta_tag_.Set(
161  call_.call(),
162  [this](bool ok) {
163  ServerUnaryReactor* reactor =
164  reactor_.load(std::memory_order_relaxed);
165  reactor->OnSendInitialMetadataDone(ok);
166  this->MaybeDone(/*inlineable_ondone=*/true);
167  },
168  &meta_ops_, /*can_inline=*/false);
169  meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
170  ctx_->initial_metadata_flags());
171  if (ctx_->compression_level_set()) {
172  meta_ops_.set_compression_level(ctx_->compression_level());
173  }
174  ctx_->sent_initial_metadata_ = true;
175  meta_ops_.set_core_cq_tag(&meta_tag_);
176  meta_ops_.FillOps(&call_);
177  }
178 
179  private:
180  friend class CallbackUnaryHandler<RequestType, ResponseType>;
181 
182  ServerCallbackUnaryImpl(
184  MessageHolder<RequestType, ResponseType>* allocator_state,
185  std::function<void()> call_requester)
186  : ctx_(ctx),
187  call_(*call),
188  allocator_state_(allocator_state),
189  call_requester_(std::move(call_requester)) {
190  ctx_->set_message_allocator_state(allocator_state);
191  }
192 
193  grpc_call* call() override { return call_.call(); }
194 
199  void SetupReactor(ServerUnaryReactor* reactor) {
200  reactor_.store(reactor, std::memory_order_relaxed);
201  this->BindReactor(reactor);
202  this->MaybeCallOnCancel(reactor);
203  this->MaybeDone(reactor->InternalInlineable());
204  }
205 
206  const RequestType* request() { return allocator_state_->request(); }
207  ResponseType* response() { return allocator_state_->response(); }
208 
209  void CallOnDone() override {
210  reactor_.load(std::memory_order_relaxed)->OnDone();
211  grpc_call* call = call_.call();
212  auto call_requester = std::move(call_requester_);
213  allocator_state_->Release();
214  if (ctx_->context_allocator() != nullptr) {
215  ctx_->context_allocator()->Release(ctx_);
216  }
217  this->~ServerCallbackUnaryImpl(); // explicitly call destructor
218  grpc_call_unref(call);
219  call_requester();
220  }
221 
222  ServerReactor* reactor() override {
223  return reactor_.load(std::memory_order_relaxed);
224  }
225 
227  meta_ops_;
232  finish_ops_;
234 
235  grpc::CallbackServerContext* const ctx_;
236  grpc::internal::Call call_;
237  MessageHolder<RequestType, ResponseType>* const allocator_state_;
238  std::function<void()> call_requester_;
239  // reactor_ can always be loaded/stored with relaxed memory ordering because
240  // its value is only set once, independently of other data in the object,
241  // and the loads that use it will always actually come provably later even
242  // though they are from different threads since they are triggered by
243  // actions initiated only by the setting up of the reactor_ variable. In
244  // a sense, it's a delayed "const": it gets its value from the SetupReactor
245  // method (not the constructor, so it's not a true const), but it doesn't
246  // change after that and it only gets used by actions caused, directly or
247  // indirectly, by that setup. This comment also applies to the reactor_
248  // variables of the other streaming objects in this file.
249  std::atomic<ServerUnaryReactor*> reactor_;
250  // callbacks_outstanding_ follows a refcount pattern
251  std::atomic<intptr_t> callbacks_outstanding_{
252  3}; // reserve for start, Finish, and CompletionOp
253  };
254 };
255 
256 template <class RequestType, class ResponseType>
258  public:
260  std::function<ServerReadReactor<RequestType>*(
261  grpc::CallbackServerContext*, ResponseType*)>
262  get_reactor)
263  : get_reactor_(std::move(get_reactor)) {}
264  void RunHandler(const HandlerParameter& param) final {
265  // Arena allocate a reader structure (that includes response)
266  grpc_call_ref(param.call->call());
267 
268  auto* reader = new (grpc_call_arena_alloc(param.call->call(),
269  sizeof(ServerCallbackReaderImpl)))
270  ServerCallbackReaderImpl(
271  static_cast<grpc::CallbackServerContext*>(param.server_context),
272  param.call, param.call_requester);
273  // Inlineable OnDone can be false in the CompletionOp callback because there
274  // is no read reactor that has an inlineable OnDone; this only applies to
275  // the DefaultReactor (which is unary).
276  param.server_context->BeginCompletionOp(
277  param.call,
278  [reader](bool) { reader->MaybeDone(/*inlineable_ondone=*/false); },
279  reader);
280 
281  ServerReadReactor<RequestType>* reactor = nullptr;
282  if (param.status.ok()) {
283  reactor =
284  grpc::internal::CatchingReactorGetter<ServerReadReactor<RequestType>>(
285  get_reactor_,
286  static_cast<grpc::CallbackServerContext*>(param.server_context),
287  reader->response());
288  }
289 
290  if (reactor == nullptr) {
291  // if deserialization or reactor creator failed, we need to fail the call
292  reactor = new (grpc_call_arena_alloc(
293  param.call->call(), sizeof(UnimplementedReadReactor<RequestType>)))
296  }
297 
298  reader->SetupReactor(reactor);
299  }
300 
301  private:
302  std::function<ServerReadReactor<RequestType>*(grpc::CallbackServerContext*,
303  ResponseType*)>
304  get_reactor_;
305 
306  class ServerCallbackReaderImpl : public ServerCallbackReader<RequestType> {
307  public:
308  void Finish(grpc::Status s) override {
309  // A finish tag with only MaybeDone can have its callback inlined
310  // regardless even if OnDone is not inlineable because this callback just
311  // checks a ref and then decides whether or not to dispatch OnDone.
312  finish_tag_.Set(
313  call_.call(),
314  [this](bool) {
315  // Inlineable OnDone can be false here because there is
316  // no read reactor that has an inlineable OnDone; this
317  // only applies to the DefaultReactor (which is unary).
318  this->MaybeDone(/*inlineable_ondone=*/false);
319  },
320  &finish_ops_, /*can_inline=*/true);
321  if (!ctx_->sent_initial_metadata_) {
322  finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
323  ctx_->initial_metadata_flags());
324  if (ctx_->compression_level_set()) {
325  finish_ops_.set_compression_level(ctx_->compression_level());
326  }
327  ctx_->sent_initial_metadata_ = true;
328  }
329  // The response is dropped if the status is not OK.
330  if (s.ok()) {
331  finish_ops_.ServerSendStatus(
332  &ctx_->trailing_metadata_,
333  finish_ops_.SendMessagePtr(&resp_, ctx_->memory_allocator()));
334  } else {
335  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
336  }
337  finish_ops_.set_core_cq_tag(&finish_tag_);
338  finish_ops_.FillOps(&call_);
339  }
340 
341  void SendInitialMetadata() override {
342  ABSL_CHECK(!ctx_->sent_initial_metadata_);
343  this->Ref();
344  // The callback for this function should not be inlined because it invokes
345  // a user-controlled reaction, but any resulting OnDone can be inlined in
346  // the EventEngine thread to which this callback is dispatched.
347  meta_tag_.Set(
348  call_.call(),
349  [this](bool ok) {
350  ServerReadReactor<RequestType>* reactor =
351  reactor_.load(std::memory_order_relaxed);
352  reactor->OnSendInitialMetadataDone(ok);
353  this->MaybeDone(/*inlineable_ondone=*/true);
354  },
355  &meta_ops_, /*can_inline=*/false);
356  meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
357  ctx_->initial_metadata_flags());
358  if (ctx_->compression_level_set()) {
359  meta_ops_.set_compression_level(ctx_->compression_level());
360  }
361  ctx_->sent_initial_metadata_ = true;
362  meta_ops_.set_core_cq_tag(&meta_tag_);
363  meta_ops_.FillOps(&call_);
364  }
365 
366  void Read(RequestType* req) override {
367  this->Ref();
368  read_ops_.RecvMessage(req);
369  read_ops_.FillOps(&call_);
370  }
371 
372  private:
373  friend class CallbackClientStreamingHandler<RequestType, ResponseType>;
374 
375  ServerCallbackReaderImpl(grpc::CallbackServerContext* ctx,
376  grpc::internal::Call* call,
377  std::function<void()> call_requester)
378  : ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)) {}
379 
380  grpc_call* call() override { return call_.call(); }
381 
382  void SetupReactor(ServerReadReactor<RequestType>* reactor) {
383  reactor_.store(reactor, std::memory_order_relaxed);
384  // The callback for this function should not be inlined because it invokes
385  // a user-controlled reaction, but any resulting OnDone can be inlined in
386  // the EventEngine thread to which this callback is dispatched.
387  read_tag_.Set(
388  call_.call(),
389  [this, reactor](bool ok) {
390  if (GPR_UNLIKELY(!ok)) {
391  ctx_->MaybeMarkCancelledOnRead();
392  }
393  reactor->OnReadDone(ok);
394  this->MaybeDone(/*inlineable_ondone=*/true);
395  },
396  &read_ops_, /*can_inline=*/false);
397  read_ops_.set_core_cq_tag(&read_tag_);
398  this->BindReactor(reactor);
399  this->MaybeCallOnCancel(reactor);
400  // Inlineable OnDone can be false here because there is no read
401  // reactor that has an inlineable OnDone; this only applies to the
402  // DefaultReactor (which is unary).
403  this->MaybeDone(/*inlineable_ondone=*/false);
404  }
405 
406  ~ServerCallbackReaderImpl() {}
407 
408  ResponseType* response() { return &resp_; }
409 
410  void CallOnDone() override {
411  reactor_.load(std::memory_order_relaxed)->OnDone();
412  grpc_call* call = call_.call();
413  auto call_requester = std::move(call_requester_);
414  if (ctx_->context_allocator() != nullptr) {
415  ctx_->context_allocator()->Release(ctx_);
416  }
417  this->~ServerCallbackReaderImpl(); // explicitly call destructor
418  grpc_call_unref(call);
419  call_requester();
420  }
421 
422  ServerReactor* reactor() override {
423  return reactor_.load(std::memory_order_relaxed);
424  }
425 
427  meta_ops_;
432  finish_ops_;
435  read_ops_;
437 
438  grpc::CallbackServerContext* const ctx_;
439  grpc::internal::Call call_;
440  ResponseType resp_;
441  std::function<void()> call_requester_;
442  // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
443  std::atomic<ServerReadReactor<RequestType>*> reactor_;
444  // callbacks_outstanding_ follows a refcount pattern
445  std::atomic<intptr_t> callbacks_outstanding_{
446  3}; // reserve for OnStarted, Finish, and CompletionOp
447  };
448 };
449 
450 template <class RequestType, class ResponseType>
452  public:
454  std::function<ServerWriteReactor<ResponseType>*(
455  grpc::CallbackServerContext*, const RequestType*)>
456  get_reactor)
457  : get_reactor_(std::move(get_reactor)) {}
458  void RunHandler(const HandlerParameter& param) final {
459  // Arena allocate a writer structure
460  grpc_call_ref(param.call->call());
461 
462  auto* writer = new (grpc_call_arena_alloc(param.call->call(),
463  sizeof(ServerCallbackWriterImpl)))
464  ServerCallbackWriterImpl(
465  static_cast<grpc::CallbackServerContext*>(param.server_context),
466  param.call, static_cast<RequestType*>(param.request),
467  param.call_requester);
468  // Inlineable OnDone can be false in the CompletionOp callback because there
469  // is no write reactor that has an inlineable OnDone; this only applies to
470  // the DefaultReactor (which is unary).
471  param.server_context->BeginCompletionOp(
472  param.call,
473  [writer](bool) { writer->MaybeDone(/*inlineable_ondone=*/false); },
474  writer);
475 
476  ServerWriteReactor<ResponseType>* reactor = nullptr;
477  if (param.status.ok()) {
480  get_reactor_,
481  static_cast<grpc::CallbackServerContext*>(param.server_context),
482  writer->request());
483  }
484  if (reactor == nullptr) {
485  // if deserialization or reactor creator failed, we need to fail the call
486  reactor = new (grpc_call_arena_alloc(
487  param.call->call(), sizeof(UnimplementedWriteReactor<ResponseType>)))
490  }
491 
492  writer->SetupReactor(reactor);
493  }
494 
496  grpc::Status* status, void** /*handler_data*/) final {
497  grpc::ByteBuffer buf;
498  buf.set_buffer(req);
499  auto* request =
500  new (grpc_call_arena_alloc(call, sizeof(RequestType))) RequestType();
501  *status = grpc::Deserialize(&buf, request);
502  buf.Release();
503  if (status->ok()) {
504  return request;
505  }
506  request->~RequestType();
507  return nullptr;
508  }
509 
510  private:
511  std::function<ServerWriteReactor<ResponseType>*(grpc::CallbackServerContext*,
512  const RequestType*)>
513  get_reactor_;
514 
515  class ServerCallbackWriterImpl : public ServerCallbackWriter<ResponseType> {
516  public:
517  void Finish(grpc::Status s) override {
518  // A finish tag with only MaybeDone can have its callback inlined
519  // regardless even if OnDone is not inlineable because this callback just
520  // checks a ref and then decides whether or not to dispatch OnDone.
521  finish_tag_.Set(
522  call_.call(),
523  [this](bool) {
524  // Inlineable OnDone can be false here because there is
525  // no write reactor that has an inlineable OnDone; this
526  // only applies to the DefaultReactor (which is unary).
527  this->MaybeDone(/*inlineable_ondone=*/false);
528  },
529  &finish_ops_, /*can_inline=*/true);
530  finish_ops_.set_core_cq_tag(&finish_tag_);
531 
532  if (!ctx_->sent_initial_metadata_) {
533  finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
534  ctx_->initial_metadata_flags());
535  if (ctx_->compression_level_set()) {
536  finish_ops_.set_compression_level(ctx_->compression_level());
537  }
538  ctx_->sent_initial_metadata_ = true;
539  }
540  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
541  finish_ops_.FillOps(&call_);
542  }
543 
544  void SendInitialMetadata() override {
545  ABSL_CHECK(!ctx_->sent_initial_metadata_);
546  this->Ref();
547  // The callback for this function should not be inlined because it invokes
548  // a user-controlled reaction, but any resulting OnDone can be inlined in
549  // the EventEngine thread to which this callback is dispatched.
550  meta_tag_.Set(
551  call_.call(),
552  [this](bool ok) {
553  ServerWriteReactor<ResponseType>* reactor =
554  reactor_.load(std::memory_order_relaxed);
555  reactor->OnSendInitialMetadataDone(ok);
556  this->MaybeDone(/*inlineable_ondone=*/true);
557  },
558  &meta_ops_, /*can_inline=*/false);
559  meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
560  ctx_->initial_metadata_flags());
561  if (ctx_->compression_level_set()) {
562  meta_ops_.set_compression_level(ctx_->compression_level());
563  }
564  ctx_->sent_initial_metadata_ = true;
565  meta_ops_.set_core_cq_tag(&meta_tag_);
566  meta_ops_.FillOps(&call_);
567  }
568 
569  void Write(const ResponseType* resp, grpc::WriteOptions options) override {
570  this->Ref();
571  if (options.is_last_message()) {
572  options.set_buffer_hint();
573  }
574  if (!ctx_->sent_initial_metadata_) {
575  write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
576  ctx_->initial_metadata_flags());
577  if (ctx_->compression_level_set()) {
578  write_ops_.set_compression_level(ctx_->compression_level());
579  }
580  ctx_->sent_initial_metadata_ = true;
581  }
582  // TODO(vjpai): don't assert
583  ABSL_CHECK(
584  write_ops_.SendMessagePtr(resp, options, ctx_->memory_allocator())
585  .ok());
586  write_ops_.FillOps(&call_);
587  }
588 
589  void WriteAndFinish(const ResponseType* resp, grpc::WriteOptions options,
590  grpc::Status s) override {
591  // This combines the write into the finish callback
592  // TODO(vjpai): don't assert
593  ABSL_CHECK(
594  finish_ops_.SendMessagePtr(resp, options, ctx_->memory_allocator())
595  .ok());
596  Finish(std::move(s));
597  }
598 
599  private:
600  friend class CallbackServerStreamingHandler<RequestType, ResponseType>;
601 
602  ServerCallbackWriterImpl(grpc::CallbackServerContext* ctx,
603  grpc::internal::Call* call, const RequestType* req,
604  std::function<void()> call_requester)
605  : ctx_(ctx),
606  call_(*call),
607  req_(req),
608  call_requester_(std::move(call_requester)) {}
609 
610  grpc_call* call() override { return call_.call(); }
611 
612  void SetupReactor(ServerWriteReactor<ResponseType>* reactor) {
613  reactor_.store(reactor, std::memory_order_relaxed);
614  // The callback for this function should not be inlined because it invokes
615  // a user-controlled reaction, but any resulting OnDone can be inlined in
616  // the EventEngine thread to which this callback is dispatched.
617  write_tag_.Set(
618  call_.call(),
619  [this, reactor](bool ok) {
620  reactor->OnWriteDone(ok);
621  this->MaybeDone(/*inlineable_ondone=*/true);
622  },
623  &write_ops_, /*can_inline=*/false);
624  write_ops_.set_core_cq_tag(&write_tag_);
625  this->BindReactor(reactor);
626  this->MaybeCallOnCancel(reactor);
627  // Inlineable OnDone can be false here because there is no write
628  // reactor that has an inlineable OnDone; this only applies to the
629  // DefaultReactor (which is unary).
630  this->MaybeDone(/*inlineable_ondone=*/false);
631  }
632  ~ServerCallbackWriterImpl() {
633  if (req_ != nullptr) {
634  req_->~RequestType();
635  }
636  }
637 
638  const RequestType* request() { return req_; }
639 
640  void CallOnDone() override {
641  reactor_.load(std::memory_order_relaxed)->OnDone();
642  grpc_call* call = call_.call();
643  auto call_requester = std::move(call_requester_);
644  if (ctx_->context_allocator() != nullptr) {
645  ctx_->context_allocator()->Release(ctx_);
646  }
647  this->~ServerCallbackWriterImpl(); // explicitly call destructor
648  grpc_call_unref(call);
649  call_requester();
650  }
651 
652  ServerReactor* reactor() override {
653  return reactor_.load(std::memory_order_relaxed);
654  }
655 
657  meta_ops_;
662  finish_ops_;
666  write_ops_;
668 
669  grpc::CallbackServerContext* const ctx_;
670  grpc::internal::Call call_;
671  const RequestType* req_;
672  std::function<void()> call_requester_;
673  // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
674  std::atomic<ServerWriteReactor<ResponseType>*> reactor_;
675  // callbacks_outstanding_ follows a refcount pattern
676  std::atomic<intptr_t> callbacks_outstanding_{
677  3}; // reserve for OnStarted, Finish, and CompletionOp
678  };
679 };
680 
681 template <class RequestType, class ResponseType>
683  public:
687  get_reactor)
688  : get_reactor_(std::move(get_reactor)) {}
689  void RunHandler(const HandlerParameter& param) final {
690  grpc_call_ref(param.call->call());
691 
692  auto* stream = new (grpc_call_arena_alloc(
693  param.call->call(), sizeof(ServerCallbackReaderWriterImpl)))
694  ServerCallbackReaderWriterImpl(
695  static_cast<grpc::CallbackServerContext*>(param.server_context),
696  param.call, param.call_requester);
697  // Inlineable OnDone can be false in the CompletionOp callback because there
698  // is no bidi reactor that has an inlineable OnDone; this only applies to
699  // the DefaultReactor (which is unary).
700  param.server_context->BeginCompletionOp(
701  param.call,
702  [stream](bool) { stream->MaybeDone(/*inlineable_ondone=*/false); },
703  stream);
704 
706  if (param.status.ok()) {
709  get_reactor_,
710  static_cast<grpc::CallbackServerContext*>(param.server_context));
711  }
712 
713  if (reactor == nullptr) {
714  // if deserialization or reactor creator failed, we need to fail the call
715  reactor = new (grpc_call_arena_alloc(
716  param.call->call(),
720  }
721 
722  stream->SetupReactor(reactor);
723  }
724 
725  private:
726  std::function<ServerBidiReactor<RequestType, ResponseType>*(
728  get_reactor_;
729 
730  class ServerCallbackReaderWriterImpl
731  : public ServerCallbackReaderWriter<RequestType, ResponseType> {
732  public:
733  void Finish(grpc::Status s) override {
734  // A finish tag with only MaybeDone can have its callback inlined
735  // regardless even if OnDone is not inlineable because this callback just
736  // checks a ref and then decides whether or not to dispatch OnDone.
737  finish_tag_.Set(
738  call_.call(),
739  [this](bool) {
740  // Inlineable OnDone can be false here because there is
741  // no bidi reactor that has an inlineable OnDone; this
742  // only applies to the DefaultReactor (which is unary).
743  this->MaybeDone(/*inlineable_ondone=*/false);
744  },
745  &finish_ops_, /*can_inline=*/true);
746  finish_ops_.set_core_cq_tag(&finish_tag_);
747 
748  if (!ctx_->sent_initial_metadata_) {
749  finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
750  ctx_->initial_metadata_flags());
751  if (ctx_->compression_level_set()) {
752  finish_ops_.set_compression_level(ctx_->compression_level());
753  }
754  ctx_->sent_initial_metadata_ = true;
755  }
756  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
757  finish_ops_.FillOps(&call_);
758  }
759 
760  void SendInitialMetadata() override {
761  ABSL_CHECK(!ctx_->sent_initial_metadata_);
762  this->Ref();
763  // The callback for this function should not be inlined because it invokes
764  // a user-controlled reaction, but any resulting OnDone can be inlined in
765  // the EventEngine thread to which this callback is dispatched.
766  meta_tag_.Set(
767  call_.call(),
768  [this](bool ok) {
769  ServerBidiReactor<RequestType, ResponseType>* reactor =
770  reactor_.load(std::memory_order_relaxed);
771  reactor->OnSendInitialMetadataDone(ok);
772  this->MaybeDone(/*inlineable_ondone=*/true);
773  },
774  &meta_ops_, /*can_inline=*/false);
775  meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
776  ctx_->initial_metadata_flags());
777  if (ctx_->compression_level_set()) {
778  meta_ops_.set_compression_level(ctx_->compression_level());
779  }
780  ctx_->sent_initial_metadata_ = true;
781  meta_ops_.set_core_cq_tag(&meta_tag_);
782  meta_ops_.FillOps(&call_);
783  }
784 
785  void Write(const ResponseType* resp, grpc::WriteOptions options) override {
786  this->Ref();
787  if (options.is_last_message()) {
788  options.set_buffer_hint();
789  }
790  if (!ctx_->sent_initial_metadata_) {
791  write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
792  ctx_->initial_metadata_flags());
793  if (ctx_->compression_level_set()) {
794  write_ops_.set_compression_level(ctx_->compression_level());
795  }
796  ctx_->sent_initial_metadata_ = true;
797  }
798  // TODO(vjpai): don't assert
799  ABSL_CHECK(
800  write_ops_.SendMessagePtr(resp, options, ctx_->memory_allocator())
801  .ok());
802  write_ops_.FillOps(&call_);
803  }
804 
805  void WriteAndFinish(const ResponseType* resp, grpc::WriteOptions options,
806  grpc::Status s) override {
807  // TODO(vjpai): don't assert
808  ABSL_CHECK(
809  finish_ops_.SendMessagePtr(resp, options, ctx_->memory_allocator())
810  .ok());
811  Finish(std::move(s));
812  }
813 
814  void Read(RequestType* req) override {
815  this->Ref();
816  read_ops_.RecvMessage(req);
817  read_ops_.FillOps(&call_);
818  }
819 
820  private:
821  friend class CallbackBidiHandler<RequestType, ResponseType>;
822 
823  ServerCallbackReaderWriterImpl(grpc::CallbackServerContext* ctx,
824  grpc::internal::Call* call,
825  std::function<void()> call_requester)
826  : ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)) {}
827 
828  grpc_call* call() override { return call_.call(); }
829 
830  void SetupReactor(ServerBidiReactor<RequestType, ResponseType>* reactor) {
831  reactor_.store(reactor, std::memory_order_relaxed);
832  // The callbacks for these functions should not be inlined because they
833  // invoke user-controlled reactions, but any resulting OnDones can be
834  // inlined in the EventEngine thread to which a callback is dispatched.
835  write_tag_.Set(
836  call_.call(),
837  [this, reactor](bool ok) {
838  reactor->OnWriteDone(ok);
839  this->MaybeDone(/*inlineable_ondone=*/true);
840  },
841  &write_ops_, /*can_inline=*/false);
842  write_ops_.set_core_cq_tag(&write_tag_);
843  read_tag_.Set(
844  call_.call(),
845  [this, reactor](bool ok) {
846  if (GPR_UNLIKELY(!ok)) {
847  ctx_->MaybeMarkCancelledOnRead();
848  }
849  reactor->OnReadDone(ok);
850  this->MaybeDone(/*inlineable_ondone=*/true);
851  },
852  &read_ops_, /*can_inline=*/false);
853  read_ops_.set_core_cq_tag(&read_tag_);
854  this->BindReactor(reactor);
855  this->MaybeCallOnCancel(reactor);
856  // Inlineable OnDone can be false here because there is no bidi
857  // reactor that has an inlineable OnDone; this only applies to the
858  // DefaultReactor (which is unary).
859  this->MaybeDone(/*inlineable_ondone=*/false);
860  }
861 
862  void CallOnDone() override {
863  reactor_.load(std::memory_order_relaxed)->OnDone();
864  grpc_call* call = call_.call();
865  auto call_requester = std::move(call_requester_);
866  if (ctx_->context_allocator() != nullptr) {
867  ctx_->context_allocator()->Release(ctx_);
868  }
869  this->~ServerCallbackReaderWriterImpl(); // explicitly call destructor
870  grpc_call_unref(call);
871  call_requester();
872  }
873 
874  ServerReactor* reactor() override {
875  return reactor_.load(std::memory_order_relaxed);
876  }
877 
879  meta_ops_;
884  finish_ops_;
888  write_ops_;
891  read_ops_;
893 
894  grpc::CallbackServerContext* const ctx_;
895  grpc::internal::Call call_;
896  std::function<void()> call_requester_;
897  // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
898  std::atomic<ServerBidiReactor<RequestType, ResponseType>*> reactor_;
899  // callbacks_outstanding_ follows a refcount pattern
900  std::atomic<intptr_t> callbacks_outstanding_{
901  3}; // reserve for OnStarted, Finish, and CompletionOp
902  };
903 };
904 
905 } // namespace internal
906 
907 namespace experimental {
908 namespace internal {
909 
910 template <class RequestType>
912  public:
915  grpc::CallbackServerContext*, const RequestType*)>
916  get_reactor,
917  grpc::Service* service = nullptr)
918  : get_reactor_(std::move(get_reactor)), service_(service) {
919  ABSL_CHECK(service_ != nullptr && service_->is_virtual_service_);
920  }
921 
922  void RunHandler(const HandlerParameter& param) final {
923  // Arena allocate a controller structure (that includes request/response)
924  grpc_call_ref(param.call->call());
925  auto* allocator_state =
927  param.internal_data);
928 
929  grpc::Server* inner_server = nullptr;
930  inner_server = static_cast<grpc::Server*>(service_->server_);
931  ABSL_CHECK(inner_server != nullptr);
932 
933  auto* call = new (grpc_call_arena_alloc(param.call->call(),
934  sizeof(ServerCallbackSessionImpl)))
935  ServerCallbackSessionImpl(
936  static_cast<grpc::CallbackServerContext*>(param.server_context),
937  param.call, allocator_state, param.call_requester, inner_server);
938 
939  param.server_context->BeginCompletionOp(
940  param.call, [call](bool) { call->MaybeDone(); }, call);
941 
942  grpc::experimental::ServerSessionReactor* reactor = nullptr;
943  if (param.status.ok()) {
946  get_reactor_,
947  static_cast<grpc::CallbackServerContext*>(param.server_context),
948  call->request());
949  }
950 
951  if (reactor == nullptr) {
952  // if deserialization or reactor creator failed, we need to fail the call.
953  reactor = new (grpc_call_arena_alloc(param.call->call(),
957  }
958 
960  call->SetupReactor(reactor);
961  }
962 
964  grpc::Status* status, void** handler_data) final {
// Deserializes the request bytes in `req` into a RequestType held by an
// arena-allocated message holder. Writes the holder to *handler_data and the
// outcome to *status. Returns the request on success, nullptr on failure.
965  grpc::ByteBuffer buf;
// `buf` only wraps `req`; Release() below makes it forget the buffer again
// without destroying it.
966  buf.set_buffer(req);
967  RequestType* request = nullptr;
// The holder lives in the call arena, so it is not freed individually.
969  allocator_state = new (grpc_call_arena_alloc(
970  call, sizeof(grpc::internal::DefaultMessageHolder<RequestType,
971  grpc::ByteBuffer>)))
// Set before deserializing, so the caller receives the holder even when
// deserialization fails. Presumably this is what RunHandler reads back
// through param.internal_data - confirm.
973  *handler_data = allocator_state;
974  request = allocator_state->request();
975  *status = grpc::Deserialize(&buf, request);
976  buf.Release();
977  if (status->ok()) {
978  return request;
979  }
980  return nullptr;
981  }
982 
983  private:
985  grpc::CallbackServerContext*, const RequestType*)>
986  get_reactor_;
987  grpc::Service* service_;
988 
989  class ServerCallbackSessionImpl
991  public:
// Finishes the session with status `s`. Unless the context is already
// cancelled, this fills a batch that sends the status and trailing metadata,
// plus the initial metadata if it has not been sent yet. The batch's
// completion callback calls MaybeDone().
992  void Finish(grpc::Status s) override {
// Already cancelled: no ops are filled; only MaybeDone() is invoked.
993  if (ctx_->IsCancelled()) {
994  MaybeDone(
995  reactor_.load(std::memory_order_relaxed)->InternalInlineable());
996  return;
997  }
998  // A callback that only contains a call to MaybeDone can be run as an
999  // inline callback regardless of whether or not OnDone is inlineable
1000  // because if the actual OnDone callback needs to be scheduled, MaybeDone
1001  // is responsible for dispatching to an EventEngine thread if needed.
1002  // Thus, when setting up the finish_tag_, we can set its own callback to
1003  // inlineable.
1004  finish_tag_.Set(
1005  call_.call(),
1006  [this](bool) {
1007  this->MaybeDone(
1008  reactor_.load(std::memory_order_relaxed)->InternalInlineable());
1009  },
1010  &finish_ops_, /*can_inline=*/true);
1011  finish_ops_.set_core_cq_tag(&finish_tag_);
1012 
// Piggyback the initial metadata (and the compression level, when one is
// set) on the finish batch if it has not been sent already, and record that
// it is now sent.
1013  bool is_first_metadata = !ctx_->sent_initial_metadata_;
1014  if (is_first_metadata) {
1015  finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
1016  ctx_->initial_metadata_flags());
1017  if (ctx_->compression_level_set()) {
1018  finish_ops_.set_compression_level(ctx_->compression_level());
1019  }
1020  ctx_->sent_initial_metadata_ = true;
1021  }
1022  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
// NOTE(review): the same tag was already installed right after
// finish_tag_.Set() above, so this second call looks redundant.
1023  finish_ops_.set_core_cq_tag(&finish_tag_);
1024  finish_ops_.FillOps(&call_);
1025  }
1026 
// Sends the initial metadata as its own batch and then binds the inner
// server to this session. Must be called before any initial metadata has
// been sent; the ABSL_CHECK below enforces this. On completion the reactor's
// OnSendInitialMetadataDone(ok) is invoked.
1027  void SendInitialMetadata() override {
1028  ABSL_CHECK(!ctx_->sent_initial_metadata_);
// Extra reference for this op; the tag callback below calls MaybeDone()
// once the reaction has run.
1029  this->Ref();
1030  // The callback for this function should not be marked inline because it
1031  // is directly invoking a user-controlled reaction
1032  // (OnSendInitialMetadataDone). Thus it must be dispatched to an
1033  // EventEngine thread. However, any OnDone needed after that can be
1034  // inlined because it is already running on an EventEngine thread.
1035  meta_tag_.Set(
1036  call_.call(),
1037  [this](bool ok) {
1038  grpc::experimental::ServerSessionReactor* reactor =
1039  reactor_.load(std::memory_order_relaxed);
1040  reactor->OnSendInitialMetadataDone(ok);
1041  this->MaybeDone(/*inlineable_ondone=*/true);
1042  },
1043  &meta_ops_, /*can_inline=*/false);
1044  meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
1045  ctx_->initial_metadata_flags());
1046  if (ctx_->compression_level_set()) {
1047  meta_ops_.set_compression_level(ctx_->compression_level());
1048  }
// Marked as sent here so that a later Finish() does not add the initial
// metadata to its own batch.
1049  ctx_->sent_initial_metadata_ = true;
1050  meta_ops_.set_core_cq_tag(&meta_tag_);
1051  meta_ops_.FillOps(&call_);
1052  // We bind the inner server only when sending initial metadata because
1053  // this signals that the session handler has completed context
1054  // establishment and virtual RPCs can be started.
1055  BindInnerServer(inner_server_);
1056  }
1057 
// Binds `inner_server` to this session. Called from SendInitialMetadata()
// with inner_server_.
// NOTE(review): the callee line is missing from this listing; presumably it
// forwards this call's grpc_call and `inner_server` to
// BindSessionToInnerServer - confirm against the original header.
1058  void BindInnerServer(grpc::Server* inner_server) override {
1060  inner_server);
1061  }
1062 
1063  private:
1064  friend class CallbackSessionHandler<RequestType>;
1065 
// Private constructor; CallbackSessionHandler (a friend) builds instances
// with placement new in RunHandler(). Stores the context, a copy of the Call
// wrapper, the message holder, the call requester and the inner server.
1066  ServerCallbackSessionImpl(
1068  MessageHolder<RequestType, grpc::ByteBuffer>* allocator_state,
1069  std::function<void()> call_requester, grpc::Server* inner_server)
1070  : ctx_(ctx),
1071  call_(*call),
1072  allocator_state_(allocator_state),
1073  call_requester_(std::move(call_requester)),
1074  inner_server_(inner_server) {
// A session cannot exist without an inner server to bind later.
1075  ABSL_CHECK(inner_server_ != nullptr);
// Registers the message holder with the server context.
1076  ctx_->set_message_allocator_state(allocator_state);
1077  }
1078 
// Returns the underlying core call handle held by call_.
1079  grpc_call* call() override { return call_.call(); }
1080 
// Attaches `reactor` to this call: stores it in reactor_, binds it, gives
// it the chance to observe an earlier cancellation, and finally calls
// MaybeDone().
// NOTE(review): the final MaybeDone() presumably consumes the "start" slot
// reserved in callbacks_outstanding_ - confirm.
1085  void SetupReactor(grpc::experimental::ServerSessionReactor* reactor) {
1086  reactor_.store(reactor, std::memory_order_relaxed);
1087  this->BindReactor(reactor);
1088  this->MaybeCallOnCancel(reactor);
1089  this->MaybeDone(reactor->InternalInlineable());
1090  }
1091 
// Returns the request message stored in the message holder.
1092  const RequestType* request() { return allocator_state_->request(); }
1093 
// Final teardown: runs the reactor's OnDone(), releases the message holder
// and (if a context allocator exists) the context, destroys this object in
// place, drops the call reference and finally invokes the call requester.
// The statement order matters because members must not be touched after the
// explicit destructor call.
1094  void CallOnDone() override {
1095  reactor_.load(std::memory_order_relaxed)->OnDone();
// Copy/move what is still needed into locals before `this` is destroyed.
1096  grpc_call* call = call_.call();
1097  auto call_requester = std::move(call_requester_);
1098  allocator_state_->Release();
1099  if (ctx_->context_allocator() != nullptr) {
1100  ctx_->context_allocator()->Release(ctx_);
1101  }
// The object was placement-new'ed into call arena memory in RunHandler(),
// so only the destructor is run here; there is no matching delete.
1102  this->~ServerCallbackSessionImpl(); // explicitly call destructor
// Balances the grpc_call_ref() taken at the start of RunHandler().
1103  grpc_call_unref(call);
1104  call_requester();
1105  }
1106 
// Returns the reactor stored by SetupReactor(), as a base-class pointer.
1107  grpc::internal::ServerReactor* reactor() override {
1108  return reactor_.load(std::memory_order_relaxed);
1109  }
1110 
1112  meta_ops_;
1116  finish_ops_;
1118 
// Server context of this call; CallOnDone() hands it back to its context
// allocator when one is set.
1119  grpc::CallbackServerContext* const ctx_;
// Copy of the Call wrapper passed to the constructor.
1120  grpc::internal::Call call_;
// Holder of the request message; released in CallOnDone().
1121  MessageHolder<RequestType, grpc::ByteBuffer>* const allocator_state_;
// Invoked as the very last step of CallOnDone(), after this object has been
// destroyed.
1122  std::function<void()> call_requester_;
// Server bound to the session in SendInitialMetadata(); checked non-null in
// the constructor.
1123  grpc::Server* inner_server_;
1124  // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
1125  std::atomic<grpc::experimental::ServerSessionReactor*> reactor_;
1126  // callbacks_outstanding_ follows a refcount pattern
1127  std::atomic<intptr_t> callbacks_outstanding_{
1128  3}; // reserve for start, Finish, and CompletionOp
1129  };
1130 };
1131 
1132 } // namespace internal
1133 } // namespace experimental
1134 } // namespace grpc
1135 
1136 #endif // GRPCPP_IMPL_SERVER_CALLBACK_HANDLERS_H
grpc::ServerReadReactor
ServerReadReactor is the interface for a client-streaming RPC.
Definition: server_callback.h:199
grpc::internal::CallbackWithSuccessTag
CallbackWithSuccessTag can be reused multiple times, and will be used in this fashion for streaming operations.
Definition: callback_common.h:153
grpc::ServerCallbackReaderWriter::SendInitialMetadata
virtual void SendInitialMetadata()=0
message_allocator.h
grpc_call_arena_alloc
GRPCAPI void * grpc_call_arena_alloc(grpc_call *call, size_t size)
Allocate memory in the grpc_call arena: this memory is automatically discarded at call completion.
grpc::internal::CallOpServerSendStatus
Definition: call_op_set.h:656
grpc::Server
Represents a gRPC server.
Definition: server.h:58
grpc
An Alarm posts the user-provided tag to its associated completion queue or invokes the user-provided ...
Definition: alarm.h:33
grpc::internal::CallOpSet< grpc::internal::CallOpSendInitialMetadata >
grpc::CallbackServerContext
Definition: server_context.h:632
grpc::Deserialize
auto Deserialize(BufferPtr buffer, Message *msg)
Definition: serialization_traits.h:120
grpc::internal::CallOpSendMessage
Definition: call_op_set.h:287
grpc::experimental::internal::BindSessionToInnerServer
void BindSessionToInnerServer(grpc_call *call, grpc::Server *inner_server)
grpc::internal::MethodHandler::HandlerParameter
Definition: rpc_service_method.h:43
grpc::MessageHolder::response
ResponseT * response()
Definition: message_allocator.h:46
grpc::internal::Call
Straightforward wrapping of the C call object.
Definition: call.h:34
rpc_service_method.h
grpc::Service
Descriptor of an RPC service and its various RPC methods.
Definition: service_type.h:66
grpc::internal::CallbackClientStreamingHandler::RunHandler
void RunHandler(const HandlerParameter &param) final
Definition: server_callback_handlers.h:264
status.h
grpc::ServerCallbackUnary::BindReactor
void BindReactor(Reactor *reactor)
Definition: server_callback.h:223
grpc::internal::CallOpSendInitialMetadata
Definition: call_op_set.h:217
grpc::internal::ServerCallbackCall::MaybeCallOnCancel
void MaybeCallOnCancel(ServerReactor *reactor)
Definition: server_callback.h:118
grpc::internal::CallbackClientStreamingHandler::CallbackClientStreamingHandler
CallbackClientStreamingHandler(std::function< ServerReadReactor< RequestType > *(grpc::CallbackServerContext *, ResponseType *)> get_reactor)
Definition: server_callback_handlers.h:259
grpc::Status::ok
bool ok() const
Is the status OK?
Definition: status.h:124
grpc_call_ref
GRPCAPI void grpc_call_ref(grpc_call *call)
Ref a call.
grpc::ServerCallbackReaderWriter
Definition: server_callback.h:277
grpc::internal::CallbackUnaryHandler::CallbackUnaryHandler
CallbackUnaryHandler(std::function< ServerUnaryReactor *(grpc::CallbackServerContext *, const RequestType *, ResponseType *)> get_reactor)
Definition: server_callback_handlers.h:39
grpc::Status
Did it work? If it didn't, why?
Definition: status.h:34
grpc::experimental::ServerSessionReactor
Definition: server_callback.h:805
grpc_call_unref
GRPCAPI void grpc_call_unref(grpc_call *call)
Unref a call.
grpc::internal::CallbackServerStreamingHandler::Deserialize
void * Deserialize(grpc_call *call, grpc_byte_buffer *req, grpc::Status *status, void **) final
Definition: server_callback_handlers.h:495
grpc::experimental::internal::CallbackSessionHandler::Deserialize
void * Deserialize(grpc_call *call, grpc_byte_buffer *req, grpc::Status *status, void **handler_data) final
Definition: server_callback_handlers.h:963
grpc::MessageAllocator< RequestType, ResponseType >
grpc.h
grpc_call
struct grpc_call grpc_call
A Call represents an RPC.
Definition: grpc_types.h:68
grpc::ServerBidiReactor
ServerBidiReactor is the interface for a bidirectional streaming RPC.
Definition: server_callback.h:203
grpc_byte_buffer
Definition: grpc_types.h:41
grpc::ByteBuffer
A sequence of bytes.
Definition: byte_buffer.h:67
grpc::internal::CallbackUnaryHandler
Definition: server_callback_handlers.h:37
grpc::ServerCallbackWriter::SendInitialMetadata
virtual void SendInitialMetadata()=0
grpc::internal::DefaultMessageHolder
Definition: server_callback.h:178
grpc::internal::CallbackServerStreamingHandler
Definition: server_callback_handlers.h:451
grpc::experimental::ServerCallbackSession
Definition: server_callback.h:229
grpc::internal::CallbackServerStreamingHandler::CallbackServerStreamingHandler
CallbackServerStreamingHandler(std::function< ServerWriteReactor< ResponseType > *(grpc::CallbackServerContext *, const RequestType *)> get_reactor)
Definition: server_callback_handlers.h:453
grpc::internal::CallbackBidiHandler::RunHandler
void RunHandler(const HandlerParameter &param) final
Definition: server_callback_handlers.h:689
grpc::internal::CallbackUnaryHandler::RunHandler
void RunHandler(const HandlerParameter &param) final
Definition: server_callback_handlers.h:50
grpc::ServerCallbackReader< RequestType >::BindReactor
void BindReactor(ServerReadReactor< RequestType > *reactor)
Definition: server_callback.h:254
grpc::MessageAllocator::AllocateMessages
virtual MessageHolder< RequestT, ResponseT > * AllocateMessages()=0
grpc::experimental::internal::CallbackSessionHandler::CallbackSessionHandler
CallbackSessionHandler(std::function< grpc::experimental::ServerSessionReactor *(grpc::CallbackServerContext *, const RequestType *)> get_reactor, grpc::Service *service=nullptr)
Definition: server_callback_handlers.h:913
grpc::internal::FinishOnlyReactor
Definition: server_context.h:88
grpc::internal::MethodHandler
Base class for running an RPC handler.
Definition: rpc_service_method.h:40
server_callback.h
grpc::experimental::internal::CallbackSessionHandler
Definition: server_callback_handlers.h:911
grpc::MessageHolder::request
RequestT * request()
Definition: message_allocator.h:45
grpc::WriteOptions
Per-message write options.
Definition: call_op_set.h:79
grpc::ServerWriteReactor
ServerWriteReactor is the interface for a server-streaming RPC.
Definition: server_callback.h:201
grpc::internal::UnimplementedUnaryReactor
FinishOnlyReactor< ServerUnaryReactor > UnimplementedUnaryReactor
Definition: server_callback.h:890
grpc::internal::CallbackBidiHandler
Definition: server_callback_handlers.h:682
grpc::UNIMPLEMENTED
@ UNIMPLEMENTED
Operation is not implemented or not supported/enabled in this service.
Definition: status_code_enum.h:117
grpc::ServerCallbackWriter
Definition: server_callback.h:260
call.h
grpc::internal::Call::call
grpc_call * call() const
Definition: call.h:55
server_context.h
std
Definition: async_unary_call.h:410
grpc::experimental::internal::UnimplementedSessionReactor
grpc::internal::FinishOnlyReactor< ServerSessionReactor > UnimplementedSessionReactor
Definition: server_callback.h:905
grpc::internal::ServerReactor::InternalInlineable
virtual bool InternalInlineable()
Definition: server_callback.h:70
grpc::WriteOptions::set_buffer_hint
WriteOptions & set_buffer_hint()
Sets flag indicating that the write may be buffered and need not go out on the wire immediately.
Definition: call_op_set.h:117
grpc::internal::CallbackUnaryHandler::SetMessageAllocator
void SetMessageAllocator(MessageAllocator< RequestType, ResponseType > *allocator)
Definition: server_callback_handlers.h:45
grpc::ServerCallbackUnary::SendInitialMetadata
virtual void SendInitialMetadata()=0
service_type.h
grpc::internal::ServerReactor
Definition: server_callback.h:60
grpc::experimental::internal::CallbackSessionHandler::RunHandler
void RunHandler(const HandlerParameter &param) final
Definition: server_callback_handlers.h:922
grpc::internal::CallbackClientStreamingHandler
Definition: server_callback_handlers.h:257
grpc::internal::ServerCallbackCall::Ref
void Ref()
Increases the reference count.
Definition: server_callback.h:136
grpc::internal::CallbackBidiHandler::CallbackBidiHandler
CallbackBidiHandler(std::function< ServerBidiReactor< RequestType, ResponseType > *(grpc::CallbackServerContext *)> get_reactor)
Definition: server_callback_handlers.h:684
grpc::ServerCallbackReader
Definition: server_callback.h:246
server.h
grpc::WriteOptions::is_last_message
bool is_last_message() const
Get value for the flag indicating that this is the last message, and should be coalesced with trailing metadata.
Definition: call_op_set.h:172
grpc::MessageHolder< RequestType, ResponseType >
grpc::ByteBuffer::Release
void Release()
Forget underlying byte buffer without destroying. Use this only for un-owned byte buffers.
Definition: byte_buffer.h:151
grpc::internal::CatchingReactorGetter
Reactor * CatchingReactorGetter(Func &&func, Args &&... args)
Definition: callback_common.h:57
grpc::ServerCallbackUnary
Definition: server_callback.h:213
grpc::ServerUnaryReactor
Definition: server_callback.h:736
grpc::internal::CallbackServerStreamingHandler::RunHandler
void RunHandler(const HandlerParameter &param) final
Definition: server_callback_handlers.h:458
grpc::ServerCallbackReader::SendInitialMetadata
virtual void SendInitialMetadata()=0
grpc::internal::ServerCallbackCall::MaybeDone
void MaybeDone()
Definition: server_callback.h:104
grpc::internal::CallbackUnaryHandler::Deserialize
void * Deserialize(grpc_call *call, grpc_byte_buffer *req, grpc::Status *status, void **handler_data) final
Definition: server_callback_handlers.h:85