GRPC C++  1.66.0
server_callback_handlers.h
Go to the documentation of this file.
1 //
2 //
3 // Copyright 2019 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 
18 #ifndef GRPCPP_IMPL_SERVER_CALLBACK_HANDLERS_H
19 #define GRPCPP_IMPL_SERVER_CALLBACK_HANDLERS_H
20 
21 #include "absl/log/absl_check.h"
22 
23 #include <grpc/grpc.h>
24 #include <grpc/impl/call.h>
25 #include <grpc/support/log.h>
27 #include <grpcpp/server_context.h>
30 #include <grpcpp/support/status.h>
31 
32 namespace grpc {
33 namespace internal {
34 
35 template <class RequestType, class ResponseType>
37  public:
40  const RequestType*, ResponseType*)>
41  get_reactor)
42  : get_reactor_(std::move(get_reactor)) {}
43 
46  allocator_ = allocator;
47  }
48 
49  void RunHandler(const HandlerParameter& param) final {
50  // Arena allocate a controller structure (that includes request/response)
51  grpc_call_ref(param.call->call());
52  auto* allocator_state =
54  param.internal_data);
55 
56  auto* call = new (grpc_call_arena_alloc(param.call->call(),
57  sizeof(ServerCallbackUnaryImpl)))
58  ServerCallbackUnaryImpl(
59  static_cast<grpc::CallbackServerContext*>(param.server_context),
60  param.call, allocator_state, param.call_requester);
61  param.server_context->BeginCompletionOp(
62  param.call, [call](bool) { call->MaybeDone(); }, call);
63 
64  ServerUnaryReactor* reactor = nullptr;
65  if (param.status.ok()) {
66  reactor = grpc::internal::CatchingReactorGetter<ServerUnaryReactor>(
67  get_reactor_,
68  static_cast<grpc::CallbackServerContext*>(param.server_context),
69  call->request(), call->response());
70  }
71 
72  if (reactor == nullptr) {
73  // if deserialization or reactor creator failed, we need to fail the call
74  reactor = new (grpc_call_arena_alloc(param.call->call(),
78  }
79 
81  call->SetupReactor(reactor);
82  }
83 
85  grpc::Status* status, void** handler_data) final {
86  grpc::ByteBuffer buf;
87  buf.set_buffer(req);
88  RequestType* request = nullptr;
90  if (allocator_ != nullptr) {
91  allocator_state = allocator_->AllocateMessages();
92  } else {
93  allocator_state = new (grpc_call_arena_alloc(
96  }
97  *handler_data = allocator_state;
98  request = allocator_state->request();
99  *status =
101  buf.Release();
102  if (status->ok()) {
103  return request;
104  }
105  return nullptr;
106  }
107 
108  private:
110  const RequestType*, ResponseType*)>
111  get_reactor_;
112  MessageAllocator<RequestType, ResponseType>* allocator_ = nullptr;
113 
114  class ServerCallbackUnaryImpl : public ServerCallbackUnary {
115  public:
116  void Finish(grpc::Status s) override {
117  // A callback that only contains a call to MaybeDone can be run as an
118  // inline callback regardless of whether or not OnDone is inlineable
119  // because if the actual OnDone callback needs to be scheduled, MaybeDone
120  // is responsible for dispatching to an executor thread if needed. Thus,
121  // when setting up the finish_tag_, we can set its own callback to
122  // inlineable.
123  finish_tag_.Set(
124  call_.call(),
125  [this](bool) {
126  this->MaybeDone(
127  reactor_.load(std::memory_order_relaxed)->InternalInlineable());
128  },
129  &finish_ops_, /*can_inline=*/true);
130  finish_ops_.set_core_cq_tag(&finish_tag_);
131 
132  if (!ctx_->sent_initial_metadata_) {
133  finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
134  ctx_->initial_metadata_flags());
135  if (ctx_->compression_level_set()) {
136  finish_ops_.set_compression_level(ctx_->compression_level());
137  }
138  ctx_->sent_initial_metadata_ = true;
139  }
140  // The response is dropped if the status is not OK.
141  if (s.ok()) {
142  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
143  finish_ops_.SendMessagePtr(response()));
144  } else {
145  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
146  }
147  finish_ops_.set_core_cq_tag(&finish_tag_);
148  call_.PerformOps(&finish_ops_);
149  }
150 
151  void SendInitialMetadata() override {
152  ABSL_CHECK(!ctx_->sent_initial_metadata_);
153  this->Ref();
154  // The callback for this function should not be marked inline because it
155  // is directly invoking a user-controlled reaction
156  // (OnSendInitialMetadataDone). Thus it must be dispatched to an executor
157  // thread. However, any OnDone needed after that can be inlined because it
158  // is already running on an executor thread.
159  meta_tag_.Set(
160  call_.call(),
161  [this](bool ok) {
162  ServerUnaryReactor* reactor =
163  reactor_.load(std::memory_order_relaxed);
164  reactor->OnSendInitialMetadataDone(ok);
165  this->MaybeDone(/*inlineable_ondone=*/true);
166  },
167  &meta_ops_, /*can_inline=*/false);
168  meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
169  ctx_->initial_metadata_flags());
170  if (ctx_->compression_level_set()) {
171  meta_ops_.set_compression_level(ctx_->compression_level());
172  }
173  ctx_->sent_initial_metadata_ = true;
174  meta_ops_.set_core_cq_tag(&meta_tag_);
175  call_.PerformOps(&meta_ops_);
176  }
177 
178  private:
179  friend class CallbackUnaryHandler<RequestType, ResponseType>;
180 
181  ServerCallbackUnaryImpl(
183  MessageHolder<RequestType, ResponseType>* allocator_state,
184  std::function<void()> call_requester)
185  : ctx_(ctx),
186  call_(*call),
187  allocator_state_(allocator_state),
188  call_requester_(std::move(call_requester)) {
189  ctx_->set_message_allocator_state(allocator_state);
190  }
191 
192  grpc_call* call() override { return call_.call(); }
193 
198  void SetupReactor(ServerUnaryReactor* reactor) {
199  reactor_.store(reactor, std::memory_order_relaxed);
200  this->BindReactor(reactor);
201  this->MaybeCallOnCancel(reactor);
202  this->MaybeDone(reactor->InternalInlineable());
203  }
204 
205  const RequestType* request() { return allocator_state_->request(); }
206  ResponseType* response() { return allocator_state_->response(); }
207 
208  void CallOnDone() override {
209  reactor_.load(std::memory_order_relaxed)->OnDone();
210  grpc_call* call = call_.call();
211  auto call_requester = std::move(call_requester_);
212  allocator_state_->Release();
213  if (ctx_->context_allocator() != nullptr) {
214  ctx_->context_allocator()->Release(ctx_);
215  }
216  this->~ServerCallbackUnaryImpl(); // explicitly call destructor
217  grpc_call_unref(call);
218  call_requester();
219  }
220 
221  ServerReactor* reactor() override {
222  return reactor_.load(std::memory_order_relaxed);
223  }
224 
226  meta_ops_;
231  finish_ops_;
233 
234  grpc::CallbackServerContext* const ctx_;
235  grpc::internal::Call call_;
236  MessageHolder<RequestType, ResponseType>* const allocator_state_;
237  std::function<void()> call_requester_;
238  // reactor_ can always be loaded/stored with relaxed memory ordering because
239  // its value is only set once, independently of other data in the object,
240  // and the loads that use it will always actually come provably later even
241  // though they are from different threads since they are triggered by
242  // actions initiated only by the setting up of the reactor_ variable. In
243  // a sense, it's a delayed "const": it gets its value from the SetupReactor
244  // method (not the constructor, so it's not a true const), but it doesn't
245  // change after that and it only gets used by actions caused, directly or
246  // indirectly, by that setup. This comment also applies to the reactor_
247  // variables of the other streaming objects in this file.
248  std::atomic<ServerUnaryReactor*> reactor_;
249  // callbacks_outstanding_ follows a refcount pattern
250  std::atomic<intptr_t> callbacks_outstanding_{
251  3}; // reserve for start, Finish, and CompletionOp
252  };
253 };
254 
255 template <class RequestType, class ResponseType>
257  public:
259  std::function<ServerReadReactor<RequestType>*(
260  grpc::CallbackServerContext*, ResponseType*)>
261  get_reactor)
262  : get_reactor_(std::move(get_reactor)) {}
263  void RunHandler(const HandlerParameter& param) final {
264  // Arena allocate a reader structure (that includes response)
265  grpc_call_ref(param.call->call());
266 
267  auto* reader = new (grpc_call_arena_alloc(param.call->call(),
268  sizeof(ServerCallbackReaderImpl)))
269  ServerCallbackReaderImpl(
270  static_cast<grpc::CallbackServerContext*>(param.server_context),
271  param.call, param.call_requester);
272  // Inlineable OnDone can be false in the CompletionOp callback because there
273  // is no read reactor that has an inlineable OnDone; this only applies to
274  // the DefaultReactor (which is unary).
275  param.server_context->BeginCompletionOp(
276  param.call,
277  [reader](bool) { reader->MaybeDone(/*inlineable_ondone=*/false); },
278  reader);
279 
280  ServerReadReactor<RequestType>* reactor = nullptr;
281  if (param.status.ok()) {
282  reactor =
283  grpc::internal::CatchingReactorGetter<ServerReadReactor<RequestType>>(
284  get_reactor_,
285  static_cast<grpc::CallbackServerContext*>(param.server_context),
286  reader->response());
287  }
288 
289  if (reactor == nullptr) {
290  // if deserialization or reactor creator failed, we need to fail the call
291  reactor = new (grpc_call_arena_alloc(
292  param.call->call(), sizeof(UnimplementedReadReactor<RequestType>)))
295  }
296 
297  reader->SetupReactor(reactor);
298  }
299 
300  private:
301  std::function<ServerReadReactor<RequestType>*(grpc::CallbackServerContext*,
302  ResponseType*)>
303  get_reactor_;
304 
305  class ServerCallbackReaderImpl : public ServerCallbackReader<RequestType> {
306  public:
307  void Finish(grpc::Status s) override {
308  // A finish tag with only MaybeDone can have its callback inlined
309  // regardless even if OnDone is not inlineable because this callback just
310  // checks a ref and then decides whether or not to dispatch OnDone.
311  finish_tag_.Set(
312  call_.call(),
313  [this](bool) {
314  // Inlineable OnDone can be false here because there is
315  // no read reactor that has an inlineable OnDone; this
316  // only applies to the DefaultReactor (which is unary).
317  this->MaybeDone(/*inlineable_ondone=*/false);
318  },
319  &finish_ops_, /*can_inline=*/true);
320  if (!ctx_->sent_initial_metadata_) {
321  finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
322  ctx_->initial_metadata_flags());
323  if (ctx_->compression_level_set()) {
324  finish_ops_.set_compression_level(ctx_->compression_level());
325  }
326  ctx_->sent_initial_metadata_ = true;
327  }
328  // The response is dropped if the status is not OK.
329  if (s.ok()) {
330  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
331  finish_ops_.SendMessagePtr(&resp_));
332  } else {
333  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
334  }
335  finish_ops_.set_core_cq_tag(&finish_tag_);
336  call_.PerformOps(&finish_ops_);
337  }
338 
339  void SendInitialMetadata() override {
340  ABSL_CHECK(!ctx_->sent_initial_metadata_);
341  this->Ref();
342  // The callback for this function should not be inlined because it invokes
343  // a user-controlled reaction, but any resulting OnDone can be inlined in
344  // the executor to which this callback is dispatched.
345  meta_tag_.Set(
346  call_.call(),
347  [this](bool ok) {
348  ServerReadReactor<RequestType>* reactor =
349  reactor_.load(std::memory_order_relaxed);
350  reactor->OnSendInitialMetadataDone(ok);
351  this->MaybeDone(/*inlineable_ondone=*/true);
352  },
353  &meta_ops_, /*can_inline=*/false);
354  meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
355  ctx_->initial_metadata_flags());
356  if (ctx_->compression_level_set()) {
357  meta_ops_.set_compression_level(ctx_->compression_level());
358  }
359  ctx_->sent_initial_metadata_ = true;
360  meta_ops_.set_core_cq_tag(&meta_tag_);
361  call_.PerformOps(&meta_ops_);
362  }
363 
364  void Read(RequestType* req) override {
365  this->Ref();
366  read_ops_.RecvMessage(req);
367  call_.PerformOps(&read_ops_);
368  }
369 
370  private:
371  friend class CallbackClientStreamingHandler<RequestType, ResponseType>;
372 
373  ServerCallbackReaderImpl(grpc::CallbackServerContext* ctx,
374  grpc::internal::Call* call,
375  std::function<void()> call_requester)
376  : ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)) {}
377 
378  grpc_call* call() override { return call_.call(); }
379 
380  void SetupReactor(ServerReadReactor<RequestType>* reactor) {
381  reactor_.store(reactor, std::memory_order_relaxed);
382  // The callback for this function should not be inlined because it invokes
383  // a user-controlled reaction, but any resulting OnDone can be inlined in
384  // the executor to which this callback is dispatched.
385  read_tag_.Set(
386  call_.call(),
387  [this, reactor](bool ok) {
388  if (GPR_UNLIKELY(!ok)) {
389  ctx_->MaybeMarkCancelledOnRead();
390  }
391  reactor->OnReadDone(ok);
392  this->MaybeDone(/*inlineable_ondone=*/true);
393  },
394  &read_ops_, /*can_inline=*/false);
395  read_ops_.set_core_cq_tag(&read_tag_);
396  this->BindReactor(reactor);
397  this->MaybeCallOnCancel(reactor);
398  // Inlineable OnDone can be false here because there is no read
399  // reactor that has an inlineable OnDone; this only applies to the
400  // DefaultReactor (which is unary).
401  this->MaybeDone(/*inlineable_ondone=*/false);
402  }
403 
404  ~ServerCallbackReaderImpl() {}
405 
406  ResponseType* response() { return &resp_; }
407 
408  void CallOnDone() override {
409  reactor_.load(std::memory_order_relaxed)->OnDone();
410  grpc_call* call = call_.call();
411  auto call_requester = std::move(call_requester_);
412  if (ctx_->context_allocator() != nullptr) {
413  ctx_->context_allocator()->Release(ctx_);
414  }
415  this->~ServerCallbackReaderImpl(); // explicitly call destructor
416  grpc_call_unref(call);
417  call_requester();
418  }
419 
420  ServerReactor* reactor() override {
421  return reactor_.load(std::memory_order_relaxed);
422  }
423 
425  meta_ops_;
430  finish_ops_;
433  read_ops_;
435 
436  grpc::CallbackServerContext* const ctx_;
437  grpc::internal::Call call_;
438  ResponseType resp_;
439  std::function<void()> call_requester_;
440  // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
441  std::atomic<ServerReadReactor<RequestType>*> reactor_;
442  // callbacks_outstanding_ follows a refcount pattern
443  std::atomic<intptr_t> callbacks_outstanding_{
444  3}; // reserve for OnStarted, Finish, and CompletionOp
445  };
446 };
447 
448 template <class RequestType, class ResponseType>
450  public:
452  std::function<ServerWriteReactor<ResponseType>*(
453  grpc::CallbackServerContext*, const RequestType*)>
454  get_reactor)
455  : get_reactor_(std::move(get_reactor)) {}
456  void RunHandler(const HandlerParameter& param) final {
457  // Arena allocate a writer structure
458  grpc_call_ref(param.call->call());
459 
460  auto* writer = new (grpc_call_arena_alloc(param.call->call(),
461  sizeof(ServerCallbackWriterImpl)))
462  ServerCallbackWriterImpl(
463  static_cast<grpc::CallbackServerContext*>(param.server_context),
464  param.call, static_cast<RequestType*>(param.request),
465  param.call_requester);
466  // Inlineable OnDone can be false in the CompletionOp callback because there
467  // is no write reactor that has an inlineable OnDone; this only applies to
468  // the DefaultReactor (which is unary).
469  param.server_context->BeginCompletionOp(
470  param.call,
471  [writer](bool) { writer->MaybeDone(/*inlineable_ondone=*/false); },
472  writer);
473 
474  ServerWriteReactor<ResponseType>* reactor = nullptr;
475  if (param.status.ok()) {
478  get_reactor_,
479  static_cast<grpc::CallbackServerContext*>(param.server_context),
480  writer->request());
481  }
482  if (reactor == nullptr) {
483  // if deserialization or reactor creator failed, we need to fail the call
484  reactor = new (grpc_call_arena_alloc(
485  param.call->call(), sizeof(UnimplementedWriteReactor<ResponseType>)))
488  }
489 
490  writer->SetupReactor(reactor);
491  }
492 
494  grpc::Status* status, void** /*handler_data*/) final {
495  grpc::ByteBuffer buf;
496  buf.set_buffer(req);
497  auto* request =
498  new (grpc_call_arena_alloc(call, sizeof(RequestType))) RequestType();
499  *status =
501  buf.Release();
502  if (status->ok()) {
503  return request;
504  }
505  request->~RequestType();
506  return nullptr;
507  }
508 
509  private:
510  std::function<ServerWriteReactor<ResponseType>*(grpc::CallbackServerContext*,
511  const RequestType*)>
512  get_reactor_;
513 
514  class ServerCallbackWriterImpl : public ServerCallbackWriter<ResponseType> {
515  public:
516  void Finish(grpc::Status s) override {
517  // A finish tag with only MaybeDone can have its callback inlined
518  // regardless even if OnDone is not inlineable because this callback just
519  // checks a ref and then decides whether or not to dispatch OnDone.
520  finish_tag_.Set(
521  call_.call(),
522  [this](bool) {
523  // Inlineable OnDone can be false here because there is
524  // no write reactor that has an inlineable OnDone; this
525  // only applies to the DefaultReactor (which is unary).
526  this->MaybeDone(/*inlineable_ondone=*/false);
527  },
528  &finish_ops_, /*can_inline=*/true);
529  finish_ops_.set_core_cq_tag(&finish_tag_);
530 
531  if (!ctx_->sent_initial_metadata_) {
532  finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
533  ctx_->initial_metadata_flags());
534  if (ctx_->compression_level_set()) {
535  finish_ops_.set_compression_level(ctx_->compression_level());
536  }
537  ctx_->sent_initial_metadata_ = true;
538  }
539  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
540  call_.PerformOps(&finish_ops_);
541  }
542 
543  void SendInitialMetadata() override {
544  ABSL_CHECK(!ctx_->sent_initial_metadata_);
545  this->Ref();
546  // The callback for this function should not be inlined because it invokes
547  // a user-controlled reaction, but any resulting OnDone can be inlined in
548  // the executor to which this callback is dispatched.
549  meta_tag_.Set(
550  call_.call(),
551  [this](bool ok) {
552  ServerWriteReactor<ResponseType>* reactor =
553  reactor_.load(std::memory_order_relaxed);
554  reactor->OnSendInitialMetadataDone(ok);
555  this->MaybeDone(/*inlineable_ondone=*/true);
556  },
557  &meta_ops_, /*can_inline=*/false);
558  meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
559  ctx_->initial_metadata_flags());
560  if (ctx_->compression_level_set()) {
561  meta_ops_.set_compression_level(ctx_->compression_level());
562  }
563  ctx_->sent_initial_metadata_ = true;
564  meta_ops_.set_core_cq_tag(&meta_tag_);
565  call_.PerformOps(&meta_ops_);
566  }
567 
568  void Write(const ResponseType* resp, grpc::WriteOptions options) override {
569  this->Ref();
570  if (options.is_last_message()) {
571  options.set_buffer_hint();
572  }
573  if (!ctx_->sent_initial_metadata_) {
574  write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
575  ctx_->initial_metadata_flags());
576  if (ctx_->compression_level_set()) {
577  write_ops_.set_compression_level(ctx_->compression_level());
578  }
579  ctx_->sent_initial_metadata_ = true;
580  }
581  // TODO(vjpai): don't assert
582  ABSL_CHECK(write_ops_.SendMessagePtr(resp, options).ok());
583  call_.PerformOps(&write_ops_);
584  }
585 
586  void WriteAndFinish(const ResponseType* resp, grpc::WriteOptions options,
587  grpc::Status s) override {
588  // This combines the write into the finish callback
589  // TODO(vjpai): don't assert
590  ABSL_CHECK(finish_ops_.SendMessagePtr(resp, options).ok());
591  Finish(std::move(s));
592  }
593 
594  private:
595  friend class CallbackServerStreamingHandler<RequestType, ResponseType>;
596 
597  ServerCallbackWriterImpl(grpc::CallbackServerContext* ctx,
598  grpc::internal::Call* call, const RequestType* req,
599  std::function<void()> call_requester)
600  : ctx_(ctx),
601  call_(*call),
602  req_(req),
603  call_requester_(std::move(call_requester)) {}
604 
605  grpc_call* call() override { return call_.call(); }
606 
607  void SetupReactor(ServerWriteReactor<ResponseType>* reactor) {
608  reactor_.store(reactor, std::memory_order_relaxed);
609  // The callback for this function should not be inlined because it invokes
610  // a user-controlled reaction, but any resulting OnDone can be inlined in
611  // the executor to which this callback is dispatched.
612  write_tag_.Set(
613  call_.call(),
614  [this, reactor](bool ok) {
615  reactor->OnWriteDone(ok);
616  this->MaybeDone(/*inlineable_ondone=*/true);
617  },
618  &write_ops_, /*can_inline=*/false);
619  write_ops_.set_core_cq_tag(&write_tag_);
620  this->BindReactor(reactor);
621  this->MaybeCallOnCancel(reactor);
622  // Inlineable OnDone can be false here because there is no write
623  // reactor that has an inlineable OnDone; this only applies to the
624  // DefaultReactor (which is unary).
625  this->MaybeDone(/*inlineable_ondone=*/false);
626  }
627  ~ServerCallbackWriterImpl() {
628  if (req_ != nullptr) {
629  req_->~RequestType();
630  }
631  }
632 
633  const RequestType* request() { return req_; }
634 
635  void CallOnDone() override {
636  reactor_.load(std::memory_order_relaxed)->OnDone();
637  grpc_call* call = call_.call();
638  auto call_requester = std::move(call_requester_);
639  if (ctx_->context_allocator() != nullptr) {
640  ctx_->context_allocator()->Release(ctx_);
641  }
642  this->~ServerCallbackWriterImpl(); // explicitly call destructor
643  grpc_call_unref(call);
644  call_requester();
645  }
646 
647  ServerReactor* reactor() override {
648  return reactor_.load(std::memory_order_relaxed);
649  }
650 
652  meta_ops_;
657  finish_ops_;
661  write_ops_;
663 
664  grpc::CallbackServerContext* const ctx_;
665  grpc::internal::Call call_;
666  const RequestType* req_;
667  std::function<void()> call_requester_;
668  // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
669  std::atomic<ServerWriteReactor<ResponseType>*> reactor_;
670  // callbacks_outstanding_ follows a refcount pattern
671  std::atomic<intptr_t> callbacks_outstanding_{
672  3}; // reserve for OnStarted, Finish, and CompletionOp
673  };
674 };
675 
676 template <class RequestType, class ResponseType>
678  public:
682  get_reactor)
683  : get_reactor_(std::move(get_reactor)) {}
684  void RunHandler(const HandlerParameter& param) final {
685  grpc_call_ref(param.call->call());
686 
687  auto* stream = new (grpc_call_arena_alloc(
688  param.call->call(), sizeof(ServerCallbackReaderWriterImpl)))
689  ServerCallbackReaderWriterImpl(
690  static_cast<grpc::CallbackServerContext*>(param.server_context),
691  param.call, param.call_requester);
692  // Inlineable OnDone can be false in the CompletionOp callback because there
693  // is no bidi reactor that has an inlineable OnDone; this only applies to
694  // the DefaultReactor (which is unary).
695  param.server_context->BeginCompletionOp(
696  param.call,
697  [stream](bool) { stream->MaybeDone(/*inlineable_ondone=*/false); },
698  stream);
699 
701  if (param.status.ok()) {
704  get_reactor_,
705  static_cast<grpc::CallbackServerContext*>(param.server_context));
706  }
707 
708  if (reactor == nullptr) {
709  // if deserialization or reactor creator failed, we need to fail the call
710  reactor = new (grpc_call_arena_alloc(
711  param.call->call(),
715  }
716 
717  stream->SetupReactor(reactor);
718  }
719 
720  private:
721  std::function<ServerBidiReactor<RequestType, ResponseType>*(
723  get_reactor_;
724 
725  class ServerCallbackReaderWriterImpl
726  : public ServerCallbackReaderWriter<RequestType, ResponseType> {
727  public:
728  void Finish(grpc::Status s) override {
729  // A finish tag with only MaybeDone can have its callback inlined
730  // regardless even if OnDone is not inlineable because this callback just
731  // checks a ref and then decides whether or not to dispatch OnDone.
732  finish_tag_.Set(
733  call_.call(),
734  [this](bool) {
735  // Inlineable OnDone can be false here because there is
736  // no bidi reactor that has an inlineable OnDone; this
737  // only applies to the DefaultReactor (which is unary).
738  this->MaybeDone(/*inlineable_ondone=*/false);
739  },
740  &finish_ops_, /*can_inline=*/true);
741  finish_ops_.set_core_cq_tag(&finish_tag_);
742 
743  if (!ctx_->sent_initial_metadata_) {
744  finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
745  ctx_->initial_metadata_flags());
746  if (ctx_->compression_level_set()) {
747  finish_ops_.set_compression_level(ctx_->compression_level());
748  }
749  ctx_->sent_initial_metadata_ = true;
750  }
751  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
752  call_.PerformOps(&finish_ops_);
753  }
754 
755  void SendInitialMetadata() override {
756  ABSL_CHECK(!ctx_->sent_initial_metadata_);
757  this->Ref();
758  // The callback for this function should not be inlined because it invokes
759  // a user-controlled reaction, but any resulting OnDone can be inlined in
760  // the executor to which this callback is dispatched.
761  meta_tag_.Set(
762  call_.call(),
763  [this](bool ok) {
764  ServerBidiReactor<RequestType, ResponseType>* reactor =
765  reactor_.load(std::memory_order_relaxed);
766  reactor->OnSendInitialMetadataDone(ok);
767  this->MaybeDone(/*inlineable_ondone=*/true);
768  },
769  &meta_ops_, /*can_inline=*/false);
770  meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
771  ctx_->initial_metadata_flags());
772  if (ctx_->compression_level_set()) {
773  meta_ops_.set_compression_level(ctx_->compression_level());
774  }
775  ctx_->sent_initial_metadata_ = true;
776  meta_ops_.set_core_cq_tag(&meta_tag_);
777  call_.PerformOps(&meta_ops_);
778  }
779 
780  void Write(const ResponseType* resp, grpc::WriteOptions options) override {
781  this->Ref();
782  if (options.is_last_message()) {
783  options.set_buffer_hint();
784  }
785  if (!ctx_->sent_initial_metadata_) {
786  write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
787  ctx_->initial_metadata_flags());
788  if (ctx_->compression_level_set()) {
789  write_ops_.set_compression_level(ctx_->compression_level());
790  }
791  ctx_->sent_initial_metadata_ = true;
792  }
793  // TODO(vjpai): don't assert
794  ABSL_CHECK(write_ops_.SendMessagePtr(resp, options).ok());
795  call_.PerformOps(&write_ops_);
796  }
797 
798  void WriteAndFinish(const ResponseType* resp, grpc::WriteOptions options,
799  grpc::Status s) override {
800  // TODO(vjpai): don't assert
801  ABSL_CHECK(finish_ops_.SendMessagePtr(resp, options).ok());
802  Finish(std::move(s));
803  }
804 
805  void Read(RequestType* req) override {
806  this->Ref();
807  read_ops_.RecvMessage(req);
808  call_.PerformOps(&read_ops_);
809  }
810 
811  private:
812  friend class CallbackBidiHandler<RequestType, ResponseType>;
813 
814  ServerCallbackReaderWriterImpl(grpc::CallbackServerContext* ctx,
815  grpc::internal::Call* call,
816  std::function<void()> call_requester)
817  : ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)) {}
818 
819  grpc_call* call() override { return call_.call(); }
820 
821  void SetupReactor(ServerBidiReactor<RequestType, ResponseType>* reactor) {
822  reactor_.store(reactor, std::memory_order_relaxed);
823  // The callbacks for these functions should not be inlined because they
824  // invoke user-controlled reactions, but any resulting OnDones can be
825  // inlined in the executor to which a callback is dispatched.
826  write_tag_.Set(
827  call_.call(),
828  [this, reactor](bool ok) {
829  reactor->OnWriteDone(ok);
830  this->MaybeDone(/*inlineable_ondone=*/true);
831  },
832  &write_ops_, /*can_inline=*/false);
833  write_ops_.set_core_cq_tag(&write_tag_);
834  read_tag_.Set(
835  call_.call(),
836  [this, reactor](bool ok) {
837  if (GPR_UNLIKELY(!ok)) {
838  ctx_->MaybeMarkCancelledOnRead();
839  }
840  reactor->OnReadDone(ok);
841  this->MaybeDone(/*inlineable_ondone=*/true);
842  },
843  &read_ops_, /*can_inline=*/false);
844  read_ops_.set_core_cq_tag(&read_tag_);
845  this->BindReactor(reactor);
846  this->MaybeCallOnCancel(reactor);
847  // Inlineable OnDone can be false here because there is no bidi
848  // reactor that has an inlineable OnDone; this only applies to the
849  // DefaultReactor (which is unary).
850  this->MaybeDone(/*inlineable_ondone=*/false);
851  }
852 
853  void CallOnDone() override {
854  reactor_.load(std::memory_order_relaxed)->OnDone();
855  grpc_call* call = call_.call();
856  auto call_requester = std::move(call_requester_);
857  if (ctx_->context_allocator() != nullptr) {
858  ctx_->context_allocator()->Release(ctx_);
859  }
860  this->~ServerCallbackReaderWriterImpl(); // explicitly call destructor
861  grpc_call_unref(call);
862  call_requester();
863  }
864 
865  ServerReactor* reactor() override {
866  return reactor_.load(std::memory_order_relaxed);
867  }
868 
870  meta_ops_;
875  finish_ops_;
879  write_ops_;
882  read_ops_;
884 
885  grpc::CallbackServerContext* const ctx_;
886  grpc::internal::Call call_;
887  std::function<void()> call_requester_;
888  // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
889  std::atomic<ServerBidiReactor<RequestType, ResponseType>*> reactor_;
890  // callbacks_outstanding_ follows a refcount pattern
891  std::atomic<intptr_t> callbacks_outstanding_{
892  3}; // reserve for OnStarted, Finish, and CompletionOp
893  };
894 };
895 
896 } // namespace internal
897 } // namespace grpc
898 
899 #endif // GRPCPP_IMPL_SERVER_CALLBACK_HANDLERS_H
grpc::ServerReadReactor
ServerReadReactor is the interface for a client-streaming RPC.
Definition: server_callback.h:191
grpc::internal::CallbackWithSuccessTag
CallbackWithSuccessTag can be reused multiple times, and will be used in this fashion for streaming operations.
Definition: callback_common.h:138
grpc::ServerCallbackReaderWriter::SendInitialMetadata
virtual void SendInitialMetadata()=0
message_allocator.h
grpc_call_arena_alloc
GRPCAPI void * grpc_call_arena_alloc(grpc_call *call, size_t size)
Allocate memory in the grpc_call arena: this memory is automatically discarded at call completion.
grpc::internal::CallOpServerSendStatus
Definition: call_op_set.h:656
grpc
An Alarm posts the user-provided tag to its associated completion queue or invokes the user-provided function on expiry or cancellation.
Definition: alarm.h:33
grpc::internal::CallOpSet< grpc::internal::CallOpSendInitialMetadata >
grpc::CallbackServerContext
Definition: server_context.h:618
grpc::internal::CallOpSendMessage
Definition: call_op_set.h:288
grpc::internal::MethodHandler::HandlerParameter
Definition: rpc_service_method.h:43
grpc::MessageHolder::response
ResponseT * response()
Definition: message_allocator.h:46
grpc::internal::Call
Straightforward wrapping of the C call object.
Definition: call.h:36
rpc_service_method.h
grpc::internal::CallbackClientStreamingHandler::RunHandler
void RunHandler(const HandlerParameter &param) final
Definition: server_callback_handlers.h:263
status.h
grpc::ServerCallbackUnary::BindReactor
void BindReactor(Reactor *reactor)
Definition: server_callback.h:210
grpc::internal::CallOpSendInitialMetadata
Definition: call_op_set.h:218
grpc::internal::ServerCallbackCall::MaybeCallOnCancel
void MaybeCallOnCancel(ServerReactor *reactor)
Definition: server_callback.h:110
grpc::internal::CallbackClientStreamingHandler::CallbackClientStreamingHandler
CallbackClientStreamingHandler(std::function< ServerReadReactor< RequestType > *(grpc::CallbackServerContext *, ResponseType *)> get_reactor)
Definition: server_callback_handlers.h:258
grpc::Status::ok
bool ok() const
Is the status OK?
Definition: status.h:125
grpc_call_ref
GRPCAPI void grpc_call_ref(grpc_call *call)
Ref a call.
grpc::ServerCallbackReaderWriter
Definition: server_callback.h:247
grpc::internal::CallbackUnaryHandler::CallbackUnaryHandler
CallbackUnaryHandler(std::function< ServerUnaryReactor *(grpc::CallbackServerContext *, const RequestType *, ResponseType *)> get_reactor)
Definition: server_callback_handlers.h:38
grpc::Status
Did it work? If it didn't, why?
Definition: status.h:34
log.h
grpc_call_unref
GRPCAPI void grpc_call_unref(grpc_call *call)
Unref a call.
grpc::internal::CallbackServerStreamingHandler::Deserialize
void * Deserialize(grpc_call *call, grpc_byte_buffer *req, grpc::Status *status, void **) final
Definition: server_callback_handlers.h:493
grpc::MessageAllocator< RequestType, ResponseType >
grpc.h
grpc_call
struct grpc_call grpc_call
A Call represents an RPC.
Definition: grpc_types.h:69
grpc::ServerBidiReactor
ServerBidiReactor is the interface for a bidirectional streaming RPC.
Definition: server_callback.h:195
grpc_byte_buffer
Definition: grpc_types.h:42
grpc::ByteBuffer
A sequence of bytes.
Definition: byte_buffer.h:60
grpc::internal::CallbackUnaryHandler
Definition: server_callback_handlers.h:36
grpc::ServerCallbackWriter::SendInitialMetadata
virtual void SendInitialMetadata()=0
grpc::internal::DefaultMessageHolder
Definition: server_callback.h:170
grpc::internal::CallbackServerStreamingHandler
Definition: server_callback_handlers.h:449
grpc::internal::CallbackServerStreamingHandler::CallbackServerStreamingHandler
CallbackServerStreamingHandler(std::function< ServerWriteReactor< ResponseType > *(grpc::CallbackServerContext *, const RequestType *)> get_reactor)
Definition: server_callback_handlers.h:451
grpc::internal::CallbackBidiHandler::RunHandler
void RunHandler(const HandlerParameter &param) final
Definition: server_callback_handlers.h:684
grpc::SerializationTraits
Defines how to serialize and deserialize some type.
Definition: serialization_traits.h:58
grpc::internal::CallbackUnaryHandler::RunHandler
void RunHandler(const HandlerParameter &param) final
Definition: server_callback_handlers.h:49
grpc::ServerCallbackReader< RequestType >::BindReactor
void BindReactor(ServerReadReactor< RequestType > *reactor)
Definition: server_callback.h:224
grpc::MessageAllocator::AllocateMessages
virtual MessageHolder< RequestT, ResponseT > * AllocateMessages()=0
grpc::internal::FinishOnlyReactor
Definition: server_context.h:88
grpc::internal::MethodHandler
Base class for running an RPC handler.
Definition: rpc_service_method.h:40
server_callback.h
grpc::MessageHolder::request
RequestT * request()
Definition: message_allocator.h:45
grpc::WriteOptions
Per-message write options.
Definition: call_op_set.h:80
grpc::ServerWriteReactor
ServerWriteReactor is the interface for a server-streaming RPC.
Definition: server_callback.h:193
grpc::internal::UnimplementedUnaryReactor
FinishOnlyReactor< ServerUnaryReactor > UnimplementedUnaryReactor
Definition: server_callback.h:783
grpc::internal::CallbackBidiHandler
Definition: server_callback_handlers.h:677
grpc::UNIMPLEMENTED
@ UNIMPLEMENTED
Operation is not implemented or not supported/enabled in this service.
Definition: status_code_enum.h:117
grpc::ServerCallbackWriter
Definition: server_callback.h:230
call.h
grpc::internal::Call::call
grpc_call * call() const
Definition: call.h:70
server_context.h
std
Definition: async_unary_call.h:407
grpc::WriteOptions::set_buffer_hint
WriteOptions & set_buffer_hint()
Sets flag indicating that the write may be buffered and need not go out on the wire immediately.
Definition: call_op_set.h:118
grpc::internal::CallbackUnaryHandler::SetMessageAllocator
void SetMessageAllocator(MessageAllocator< RequestType, ResponseType > *allocator)
Definition: server_callback_handlers.h:44
grpc::ServerCallbackUnary::SendInitialMetadata
virtual void SendInitialMetadata()=0
grpc::internal::CallbackClientStreamingHandler
Definition: server_callback_handlers.h:256
grpc::internal::ServerCallbackCall::Ref
void Ref()
Increases the reference count.
Definition: server_callback.h:128
grpc::internal::CallbackBidiHandler::CallbackBidiHandler
CallbackBidiHandler(std::function< ServerBidiReactor< RequestType, ResponseType > *(grpc::CallbackServerContext *)> get_reactor)
Definition: server_callback_handlers.h:679
grpc::ServerCallbackReader
Definition: server_callback.h:216
grpc::WriteOptions::is_last_message
bool is_last_message() const
Get value for the flag indicating that this is the last message, and should be coalesced with trailing metadata.
Definition: call_op_set.h:173
grpc::MessageHolder< RequestType, ResponseType >
grpc::ByteBuffer::Release
void Release()
Forget underlying byte buffer without destroying it. Use this only for un-owned byte buffers.
Definition: byte_buffer.h:144
grpc::internal::CatchingReactorGetter
Reactor * CatchingReactorGetter(Func &&func, Args &&... args)
Definition: callback_common.h:55
grpc::ServerCallbackUnary
Definition: server_callback.h:200
grpc::ServerUnaryReactor
Definition: server_callback.h:706
grpc::internal::CallbackServerStreamingHandler::RunHandler
void RunHandler(const HandlerParameter &param) final
Definition: server_callback_handlers.h:456
grpc::ServerCallbackReader::SendInitialMetadata
virtual void SendInitialMetadata()=0
grpc::internal::ServerCallbackCall::MaybeDone
void MaybeDone()
Definition: server_callback.h:96
grpc::internal::CallbackUnaryHandler::Deserialize
void * Deserialize(grpc_call *call, grpc_byte_buffer *req, grpc::Status *status, void **handler_data) final
Definition: server_callback_handlers.h:84