GRPC C++  1.70.1
server_callback_handlers.h
Go to the documentation of this file.
1 //
2 //
3 // Copyright 2019 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 
18 #ifndef GRPCPP_IMPL_SERVER_CALLBACK_HANDLERS_H
19 #define GRPCPP_IMPL_SERVER_CALLBACK_HANDLERS_H
20 
21 #include <grpc/grpc.h>
22 #include <grpc/impl/call.h>
24 #include <grpcpp/server_context.h>
27 #include <grpcpp/support/status.h>
28 
29 #include "absl/log/absl_check.h"
30 
31 namespace grpc {
32 namespace internal {
33 
34 template <class RequestType, class ResponseType>
36  public:
39  const RequestType*, ResponseType*)>
40  get_reactor)
41  : get_reactor_(std::move(get_reactor)) {}
42 
45  allocator_ = allocator;
46  }
47 
48  void RunHandler(const HandlerParameter& param) final {
49  // Arena allocate a controller structure (that includes request/response)
50  grpc_call_ref(param.call->call());
51  auto* allocator_state =
53  param.internal_data);
54 
55  auto* call = new (grpc_call_arena_alloc(param.call->call(),
56  sizeof(ServerCallbackUnaryImpl)))
57  ServerCallbackUnaryImpl(
58  static_cast<grpc::CallbackServerContext*>(param.server_context),
59  param.call, allocator_state, param.call_requester);
60  param.server_context->BeginCompletionOp(
61  param.call, [call](bool) { call->MaybeDone(); }, call);
62 
63  ServerUnaryReactor* reactor = nullptr;
64  if (param.status.ok()) {
65  reactor = grpc::internal::CatchingReactorGetter<ServerUnaryReactor>(
66  get_reactor_,
67  static_cast<grpc::CallbackServerContext*>(param.server_context),
68  call->request(), call->response());
69  }
70 
71  if (reactor == nullptr) {
72  // if deserialization or reactor creator failed, we need to fail the call
73  reactor = new (grpc_call_arena_alloc(param.call->call(),
77  }
78 
80  call->SetupReactor(reactor);
81  }
82 
84  grpc::Status* status, void** handler_data) final {
85  grpc::ByteBuffer buf;
86  buf.set_buffer(req);
87  RequestType* request = nullptr;
89  if (allocator_ != nullptr) {
90  allocator_state = allocator_->AllocateMessages();
91  } else {
92  allocator_state = new (grpc_call_arena_alloc(
95  }
96  *handler_data = allocator_state;
97  request = allocator_state->request();
98  *status =
100  buf.Release();
101  if (status->ok()) {
102  return request;
103  }
104  return nullptr;
105  }
106 
107  private:
109  const RequestType*, ResponseType*)>
110  get_reactor_;
111  MessageAllocator<RequestType, ResponseType>* allocator_ = nullptr;
112 
113  class ServerCallbackUnaryImpl : public ServerCallbackUnary {
114  public:
115  void Finish(grpc::Status s) override {
116  // A callback that only contains a call to MaybeDone can be run as an
117  // inline callback regardless of whether or not OnDone is inlineable
118  // because if the actual OnDone callback needs to be scheduled, MaybeDone
119  // is responsible for dispatching to an executor thread if needed. Thus,
120  // when setting up the finish_tag_, we can set its own callback to
121  // inlineable.
122  finish_tag_.Set(
123  call_.call(),
124  [this](bool) {
125  this->MaybeDone(
126  reactor_.load(std::memory_order_relaxed)->InternalInlineable());
127  },
128  &finish_ops_, /*can_inline=*/true);
129  finish_ops_.set_core_cq_tag(&finish_tag_);
130 
131  if (!ctx_->sent_initial_metadata_) {
132  finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
133  ctx_->initial_metadata_flags());
134  if (ctx_->compression_level_set()) {
135  finish_ops_.set_compression_level(ctx_->compression_level());
136  }
137  ctx_->sent_initial_metadata_ = true;
138  }
139  // The response is dropped if the status is not OK.
140  if (s.ok()) {
141  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
142  finish_ops_.SendMessagePtr(response()));
143  } else {
144  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
145  }
146  finish_ops_.set_core_cq_tag(&finish_tag_);
147  call_.PerformOps(&finish_ops_);
148  }
149 
150  void SendInitialMetadata() override {
151  ABSL_CHECK(!ctx_->sent_initial_metadata_);
152  this->Ref();
153  // The callback for this function should not be marked inline because it
154  // is directly invoking a user-controlled reaction
155  // (OnSendInitialMetadataDone). Thus it must be dispatched to an executor
156  // thread. However, any OnDone needed after that can be inlined because it
157  // is already running on an executor thread.
158  meta_tag_.Set(
159  call_.call(),
160  [this](bool ok) {
161  ServerUnaryReactor* reactor =
162  reactor_.load(std::memory_order_relaxed);
163  reactor->OnSendInitialMetadataDone(ok);
164  this->MaybeDone(/*inlineable_ondone=*/true);
165  },
166  &meta_ops_, /*can_inline=*/false);
167  meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
168  ctx_->initial_metadata_flags());
169  if (ctx_->compression_level_set()) {
170  meta_ops_.set_compression_level(ctx_->compression_level());
171  }
172  ctx_->sent_initial_metadata_ = true;
173  meta_ops_.set_core_cq_tag(&meta_tag_);
174  call_.PerformOps(&meta_ops_);
175  }
176 
177  private:
178  friend class CallbackUnaryHandler<RequestType, ResponseType>;
179 
180  ServerCallbackUnaryImpl(
182  MessageHolder<RequestType, ResponseType>* allocator_state,
183  std::function<void()> call_requester)
184  : ctx_(ctx),
185  call_(*call),
186  allocator_state_(allocator_state),
187  call_requester_(std::move(call_requester)) {
188  ctx_->set_message_allocator_state(allocator_state);
189  }
190 
191  grpc_call* call() override { return call_.call(); }
192 
197  void SetupReactor(ServerUnaryReactor* reactor) {
198  reactor_.store(reactor, std::memory_order_relaxed);
199  this->BindReactor(reactor);
200  this->MaybeCallOnCancel(reactor);
201  this->MaybeDone(reactor->InternalInlineable());
202  }
203 
204  const RequestType* request() { return allocator_state_->request(); }
205  ResponseType* response() { return allocator_state_->response(); }
206 
207  void CallOnDone() override {
208  reactor_.load(std::memory_order_relaxed)->OnDone();
209  grpc_call* call = call_.call();
210  auto call_requester = std::move(call_requester_);
211  allocator_state_->Release();
212  if (ctx_->context_allocator() != nullptr) {
213  ctx_->context_allocator()->Release(ctx_);
214  }
215  this->~ServerCallbackUnaryImpl(); // explicitly call destructor
216  grpc_call_unref(call);
217  call_requester();
218  }
219 
220  ServerReactor* reactor() override {
221  return reactor_.load(std::memory_order_relaxed);
222  }
223 
225  meta_ops_;
230  finish_ops_;
232 
233  grpc::CallbackServerContext* const ctx_;
234  grpc::internal::Call call_;
235  MessageHolder<RequestType, ResponseType>* const allocator_state_;
236  std::function<void()> call_requester_;
237  // reactor_ can always be loaded/stored with relaxed memory ordering because
238  // its value is only set once, independently of other data in the object,
239  // and the loads that use it will always actually come provably later even
240  // though they are from different threads since they are triggered by
241  // actions initiated only by the setting up of the reactor_ variable. In
242  // a sense, it's a delayed "const": it gets its value from the SetupReactor
243  // method (not the constructor, so it's not a true const), but it doesn't
244  // change after that and it only gets used by actions caused, directly or
245  // indirectly, by that setup. This comment also applies to the reactor_
246  // variables of the other streaming objects in this file.
247  std::atomic<ServerUnaryReactor*> reactor_;
248  // callbacks_outstanding_ follows a refcount pattern
249  std::atomic<intptr_t> callbacks_outstanding_{
250  3}; // reserve for start, Finish, and CompletionOp
251  };
252 };
253 
254 template <class RequestType, class ResponseType>
256  public:
258  std::function<ServerReadReactor<RequestType>*(
259  grpc::CallbackServerContext*, ResponseType*)>
260  get_reactor)
261  : get_reactor_(std::move(get_reactor)) {}
262  void RunHandler(const HandlerParameter& param) final {
263  // Arena allocate a reader structure (that includes response)
264  grpc_call_ref(param.call->call());
265 
266  auto* reader = new (grpc_call_arena_alloc(param.call->call(),
267  sizeof(ServerCallbackReaderImpl)))
268  ServerCallbackReaderImpl(
269  static_cast<grpc::CallbackServerContext*>(param.server_context),
270  param.call, param.call_requester);
271  // Inlineable OnDone can be false in the CompletionOp callback because there
272  // is no read reactor that has an inlineable OnDone; this only applies to
273  // the DefaultReactor (which is unary).
274  param.server_context->BeginCompletionOp(
275  param.call,
276  [reader](bool) { reader->MaybeDone(/*inlineable_ondone=*/false); },
277  reader);
278 
279  ServerReadReactor<RequestType>* reactor = nullptr;
280  if (param.status.ok()) {
281  reactor =
282  grpc::internal::CatchingReactorGetter<ServerReadReactor<RequestType>>(
283  get_reactor_,
284  static_cast<grpc::CallbackServerContext*>(param.server_context),
285  reader->response());
286  }
287 
288  if (reactor == nullptr) {
289  // if deserialization or reactor creator failed, we need to fail the call
290  reactor = new (grpc_call_arena_alloc(
291  param.call->call(), sizeof(UnimplementedReadReactor<RequestType>)))
294  }
295 
296  reader->SetupReactor(reactor);
297  }
298 
299  private:
300  std::function<ServerReadReactor<RequestType>*(grpc::CallbackServerContext*,
301  ResponseType*)>
302  get_reactor_;
303 
304  class ServerCallbackReaderImpl : public ServerCallbackReader<RequestType> {
305  public:
306  void Finish(grpc::Status s) override {
307  // A finish tag with only MaybeDone can have its callback inlined
308  // regardless even if OnDone is not inlineable because this callback just
309  // checks a ref and then decides whether or not to dispatch OnDone.
310  finish_tag_.Set(
311  call_.call(),
312  [this](bool) {
313  // Inlineable OnDone can be false here because there is
314  // no read reactor that has an inlineable OnDone; this
315  // only applies to the DefaultReactor (which is unary).
316  this->MaybeDone(/*inlineable_ondone=*/false);
317  },
318  &finish_ops_, /*can_inline=*/true);
319  if (!ctx_->sent_initial_metadata_) {
320  finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
321  ctx_->initial_metadata_flags());
322  if (ctx_->compression_level_set()) {
323  finish_ops_.set_compression_level(ctx_->compression_level());
324  }
325  ctx_->sent_initial_metadata_ = true;
326  }
327  // The response is dropped if the status is not OK.
328  if (s.ok()) {
329  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
330  finish_ops_.SendMessagePtr(&resp_));
331  } else {
332  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
333  }
334  finish_ops_.set_core_cq_tag(&finish_tag_);
335  call_.PerformOps(&finish_ops_);
336  }
337 
338  void SendInitialMetadata() override {
339  ABSL_CHECK(!ctx_->sent_initial_metadata_);
340  this->Ref();
341  // The callback for this function should not be inlined because it invokes
342  // a user-controlled reaction, but any resulting OnDone can be inlined in
343  // the executor to which this callback is dispatched.
344  meta_tag_.Set(
345  call_.call(),
346  [this](bool ok) {
347  ServerReadReactor<RequestType>* reactor =
348  reactor_.load(std::memory_order_relaxed);
349  reactor->OnSendInitialMetadataDone(ok);
350  this->MaybeDone(/*inlineable_ondone=*/true);
351  },
352  &meta_ops_, /*can_inline=*/false);
353  meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
354  ctx_->initial_metadata_flags());
355  if (ctx_->compression_level_set()) {
356  meta_ops_.set_compression_level(ctx_->compression_level());
357  }
358  ctx_->sent_initial_metadata_ = true;
359  meta_ops_.set_core_cq_tag(&meta_tag_);
360  call_.PerformOps(&meta_ops_);
361  }
362 
363  void Read(RequestType* req) override {
364  this->Ref();
365  read_ops_.RecvMessage(req);
366  call_.PerformOps(&read_ops_);
367  }
368 
369  private:
370  friend class CallbackClientStreamingHandler<RequestType, ResponseType>;
371 
372  ServerCallbackReaderImpl(grpc::CallbackServerContext* ctx,
373  grpc::internal::Call* call,
374  std::function<void()> call_requester)
375  : ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)) {}
376 
377  grpc_call* call() override { return call_.call(); }
378 
379  void SetupReactor(ServerReadReactor<RequestType>* reactor) {
380  reactor_.store(reactor, std::memory_order_relaxed);
381  // The callback for this function should not be inlined because it invokes
382  // a user-controlled reaction, but any resulting OnDone can be inlined in
383  // the executor to which this callback is dispatched.
384  read_tag_.Set(
385  call_.call(),
386  [this, reactor](bool ok) {
387  if (GPR_UNLIKELY(!ok)) {
388  ctx_->MaybeMarkCancelledOnRead();
389  }
390  reactor->OnReadDone(ok);
391  this->MaybeDone(/*inlineable_ondone=*/true);
392  },
393  &read_ops_, /*can_inline=*/false);
394  read_ops_.set_core_cq_tag(&read_tag_);
395  this->BindReactor(reactor);
396  this->MaybeCallOnCancel(reactor);
397  // Inlineable OnDone can be false here because there is no read
398  // reactor that has an inlineable OnDone; this only applies to the
399  // DefaultReactor (which is unary).
400  this->MaybeDone(/*inlineable_ondone=*/false);
401  }
402 
403  ~ServerCallbackReaderImpl() {}
404 
405  ResponseType* response() { return &resp_; }
406 
407  void CallOnDone() override {
408  reactor_.load(std::memory_order_relaxed)->OnDone();
409  grpc_call* call = call_.call();
410  auto call_requester = std::move(call_requester_);
411  if (ctx_->context_allocator() != nullptr) {
412  ctx_->context_allocator()->Release(ctx_);
413  }
414  this->~ServerCallbackReaderImpl(); // explicitly call destructor
415  grpc_call_unref(call);
416  call_requester();
417  }
418 
419  ServerReactor* reactor() override {
420  return reactor_.load(std::memory_order_relaxed);
421  }
422 
424  meta_ops_;
429  finish_ops_;
432  read_ops_;
434 
435  grpc::CallbackServerContext* const ctx_;
436  grpc::internal::Call call_;
437  ResponseType resp_;
438  std::function<void()> call_requester_;
439  // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
440  std::atomic<ServerReadReactor<RequestType>*> reactor_;
441  // callbacks_outstanding_ follows a refcount pattern
442  std::atomic<intptr_t> callbacks_outstanding_{
443  3}; // reserve for OnStarted, Finish, and CompletionOp
444  };
445 };
446 
447 template <class RequestType, class ResponseType>
449  public:
451  std::function<ServerWriteReactor<ResponseType>*(
452  grpc::CallbackServerContext*, const RequestType*)>
453  get_reactor)
454  : get_reactor_(std::move(get_reactor)) {}
455  void RunHandler(const HandlerParameter& param) final {
456  // Arena allocate a writer structure
457  grpc_call_ref(param.call->call());
458 
459  auto* writer = new (grpc_call_arena_alloc(param.call->call(),
460  sizeof(ServerCallbackWriterImpl)))
461  ServerCallbackWriterImpl(
462  static_cast<grpc::CallbackServerContext*>(param.server_context),
463  param.call, static_cast<RequestType*>(param.request),
464  param.call_requester);
465  // Inlineable OnDone can be false in the CompletionOp callback because there
466  // is no write reactor that has an inlineable OnDone; this only applies to
467  // the DefaultReactor (which is unary).
468  param.server_context->BeginCompletionOp(
469  param.call,
470  [writer](bool) { writer->MaybeDone(/*inlineable_ondone=*/false); },
471  writer);
472 
473  ServerWriteReactor<ResponseType>* reactor = nullptr;
474  if (param.status.ok()) {
477  get_reactor_,
478  static_cast<grpc::CallbackServerContext*>(param.server_context),
479  writer->request());
480  }
481  if (reactor == nullptr) {
482  // if deserialization or reactor creator failed, we need to fail the call
483  reactor = new (grpc_call_arena_alloc(
484  param.call->call(), sizeof(UnimplementedWriteReactor<ResponseType>)))
487  }
488 
489  writer->SetupReactor(reactor);
490  }
491 
493  grpc::Status* status, void** /*handler_data*/) final {
494  grpc::ByteBuffer buf;
495  buf.set_buffer(req);
496  auto* request =
497  new (grpc_call_arena_alloc(call, sizeof(RequestType))) RequestType();
498  *status =
500  buf.Release();
501  if (status->ok()) {
502  return request;
503  }
504  request->~RequestType();
505  return nullptr;
506  }
507 
508  private:
509  std::function<ServerWriteReactor<ResponseType>*(grpc::CallbackServerContext*,
510  const RequestType*)>
511  get_reactor_;
512 
513  class ServerCallbackWriterImpl : public ServerCallbackWriter<ResponseType> {
514  public:
515  void Finish(grpc::Status s) override {
516  // A finish tag with only MaybeDone can have its callback inlined
517  // regardless even if OnDone is not inlineable because this callback just
518  // checks a ref and then decides whether or not to dispatch OnDone.
519  finish_tag_.Set(
520  call_.call(),
521  [this](bool) {
522  // Inlineable OnDone can be false here because there is
523  // no write reactor that has an inlineable OnDone; this
524  // only applies to the DefaultReactor (which is unary).
525  this->MaybeDone(/*inlineable_ondone=*/false);
526  },
527  &finish_ops_, /*can_inline=*/true);
528  finish_ops_.set_core_cq_tag(&finish_tag_);
529 
530  if (!ctx_->sent_initial_metadata_) {
531  finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
532  ctx_->initial_metadata_flags());
533  if (ctx_->compression_level_set()) {
534  finish_ops_.set_compression_level(ctx_->compression_level());
535  }
536  ctx_->sent_initial_metadata_ = true;
537  }
538  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
539  call_.PerformOps(&finish_ops_);
540  }
541 
542  void SendInitialMetadata() override {
543  ABSL_CHECK(!ctx_->sent_initial_metadata_);
544  this->Ref();
545  // The callback for this function should not be inlined because it invokes
546  // a user-controlled reaction, but any resulting OnDone can be inlined in
547  // the executor to which this callback is dispatched.
548  meta_tag_.Set(
549  call_.call(),
550  [this](bool ok) {
551  ServerWriteReactor<ResponseType>* reactor =
552  reactor_.load(std::memory_order_relaxed);
553  reactor->OnSendInitialMetadataDone(ok);
554  this->MaybeDone(/*inlineable_ondone=*/true);
555  },
556  &meta_ops_, /*can_inline=*/false);
557  meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
558  ctx_->initial_metadata_flags());
559  if (ctx_->compression_level_set()) {
560  meta_ops_.set_compression_level(ctx_->compression_level());
561  }
562  ctx_->sent_initial_metadata_ = true;
563  meta_ops_.set_core_cq_tag(&meta_tag_);
564  call_.PerformOps(&meta_ops_);
565  }
566 
567  void Write(const ResponseType* resp, grpc::WriteOptions options) override {
568  this->Ref();
569  if (options.is_last_message()) {
570  options.set_buffer_hint();
571  }
572  if (!ctx_->sent_initial_metadata_) {
573  write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
574  ctx_->initial_metadata_flags());
575  if (ctx_->compression_level_set()) {
576  write_ops_.set_compression_level(ctx_->compression_level());
577  }
578  ctx_->sent_initial_metadata_ = true;
579  }
580  // TODO(vjpai): don't assert
581  ABSL_CHECK(write_ops_.SendMessagePtr(resp, options).ok());
582  call_.PerformOps(&write_ops_);
583  }
584 
585  void WriteAndFinish(const ResponseType* resp, grpc::WriteOptions options,
586  grpc::Status s) override {
587  // This combines the write into the finish callback
588  // TODO(vjpai): don't assert
589  ABSL_CHECK(finish_ops_.SendMessagePtr(resp, options).ok());
590  Finish(std::move(s));
591  }
592 
593  private:
594  friend class CallbackServerStreamingHandler<RequestType, ResponseType>;
595 
596  ServerCallbackWriterImpl(grpc::CallbackServerContext* ctx,
597  grpc::internal::Call* call, const RequestType* req,
598  std::function<void()> call_requester)
599  : ctx_(ctx),
600  call_(*call),
601  req_(req),
602  call_requester_(std::move(call_requester)) {}
603 
604  grpc_call* call() override { return call_.call(); }
605 
606  void SetupReactor(ServerWriteReactor<ResponseType>* reactor) {
607  reactor_.store(reactor, std::memory_order_relaxed);
608  // The callback for this function should not be inlined because it invokes
609  // a user-controlled reaction, but any resulting OnDone can be inlined in
610  // the executor to which this callback is dispatched.
611  write_tag_.Set(
612  call_.call(),
613  [this, reactor](bool ok) {
614  reactor->OnWriteDone(ok);
615  this->MaybeDone(/*inlineable_ondone=*/true);
616  },
617  &write_ops_, /*can_inline=*/false);
618  write_ops_.set_core_cq_tag(&write_tag_);
619  this->BindReactor(reactor);
620  this->MaybeCallOnCancel(reactor);
621  // Inlineable OnDone can be false here because there is no write
622  // reactor that has an inlineable OnDone; this only applies to the
623  // DefaultReactor (which is unary).
624  this->MaybeDone(/*inlineable_ondone=*/false);
625  }
626  ~ServerCallbackWriterImpl() {
627  if (req_ != nullptr) {
628  req_->~RequestType();
629  }
630  }
631 
632  const RequestType* request() { return req_; }
633 
634  void CallOnDone() override {
635  reactor_.load(std::memory_order_relaxed)->OnDone();
636  grpc_call* call = call_.call();
637  auto call_requester = std::move(call_requester_);
638  if (ctx_->context_allocator() != nullptr) {
639  ctx_->context_allocator()->Release(ctx_);
640  }
641  this->~ServerCallbackWriterImpl(); // explicitly call destructor
642  grpc_call_unref(call);
643  call_requester();
644  }
645 
646  ServerReactor* reactor() override {
647  return reactor_.load(std::memory_order_relaxed);
648  }
649 
651  meta_ops_;
656  finish_ops_;
660  write_ops_;
662 
663  grpc::CallbackServerContext* const ctx_;
664  grpc::internal::Call call_;
665  const RequestType* req_;
666  std::function<void()> call_requester_;
667  // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
668  std::atomic<ServerWriteReactor<ResponseType>*> reactor_;
669  // callbacks_outstanding_ follows a refcount pattern
670  std::atomic<intptr_t> callbacks_outstanding_{
671  3}; // reserve for OnStarted, Finish, and CompletionOp
672  };
673 };
674 
675 template <class RequestType, class ResponseType>
677  public:
681  get_reactor)
682  : get_reactor_(std::move(get_reactor)) {}
683  void RunHandler(const HandlerParameter& param) final {
684  grpc_call_ref(param.call->call());
685 
686  auto* stream = new (grpc_call_arena_alloc(
687  param.call->call(), sizeof(ServerCallbackReaderWriterImpl)))
688  ServerCallbackReaderWriterImpl(
689  static_cast<grpc::CallbackServerContext*>(param.server_context),
690  param.call, param.call_requester);
691  // Inlineable OnDone can be false in the CompletionOp callback because there
692  // is no bidi reactor that has an inlineable OnDone; this only applies to
693  // the DefaultReactor (which is unary).
694  param.server_context->BeginCompletionOp(
695  param.call,
696  [stream](bool) { stream->MaybeDone(/*inlineable_ondone=*/false); },
697  stream);
698 
700  if (param.status.ok()) {
703  get_reactor_,
704  static_cast<grpc::CallbackServerContext*>(param.server_context));
705  }
706 
707  if (reactor == nullptr) {
708  // if deserialization or reactor creator failed, we need to fail the call
709  reactor = new (grpc_call_arena_alloc(
710  param.call->call(),
714  }
715 
716  stream->SetupReactor(reactor);
717  }
718 
719  private:
720  std::function<ServerBidiReactor<RequestType, ResponseType>*(
722  get_reactor_;
723 
724  class ServerCallbackReaderWriterImpl
725  : public ServerCallbackReaderWriter<RequestType, ResponseType> {
726  public:
727  void Finish(grpc::Status s) override {
728  // A finish tag with only MaybeDone can have its callback inlined
729  // regardless even if OnDone is not inlineable because this callback just
730  // checks a ref and then decides whether or not to dispatch OnDone.
731  finish_tag_.Set(
732  call_.call(),
733  [this](bool) {
734  // Inlineable OnDone can be false here because there is
735  // no bidi reactor that has an inlineable OnDone; this
736  // only applies to the DefaultReactor (which is unary).
737  this->MaybeDone(/*inlineable_ondone=*/false);
738  },
739  &finish_ops_, /*can_inline=*/true);
740  finish_ops_.set_core_cq_tag(&finish_tag_);
741 
742  if (!ctx_->sent_initial_metadata_) {
743  finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
744  ctx_->initial_metadata_flags());
745  if (ctx_->compression_level_set()) {
746  finish_ops_.set_compression_level(ctx_->compression_level());
747  }
748  ctx_->sent_initial_metadata_ = true;
749  }
750  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
751  call_.PerformOps(&finish_ops_);
752  }
753 
754  void SendInitialMetadata() override {
755  ABSL_CHECK(!ctx_->sent_initial_metadata_);
756  this->Ref();
757  // The callback for this function should not be inlined because it invokes
758  // a user-controlled reaction, but any resulting OnDone can be inlined in
759  // the executor to which this callback is dispatched.
760  meta_tag_.Set(
761  call_.call(),
762  [this](bool ok) {
763  ServerBidiReactor<RequestType, ResponseType>* reactor =
764  reactor_.load(std::memory_order_relaxed);
765  reactor->OnSendInitialMetadataDone(ok);
766  this->MaybeDone(/*inlineable_ondone=*/true);
767  },
768  &meta_ops_, /*can_inline=*/false);
769  meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
770  ctx_->initial_metadata_flags());
771  if (ctx_->compression_level_set()) {
772  meta_ops_.set_compression_level(ctx_->compression_level());
773  }
774  ctx_->sent_initial_metadata_ = true;
775  meta_ops_.set_core_cq_tag(&meta_tag_);
776  call_.PerformOps(&meta_ops_);
777  }
778 
779  void Write(const ResponseType* resp, grpc::WriteOptions options) override {
780  this->Ref();
781  if (options.is_last_message()) {
782  options.set_buffer_hint();
783  }
784  if (!ctx_->sent_initial_metadata_) {
785  write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
786  ctx_->initial_metadata_flags());
787  if (ctx_->compression_level_set()) {
788  write_ops_.set_compression_level(ctx_->compression_level());
789  }
790  ctx_->sent_initial_metadata_ = true;
791  }
792  // TODO(vjpai): don't assert
793  ABSL_CHECK(write_ops_.SendMessagePtr(resp, options).ok());
794  call_.PerformOps(&write_ops_);
795  }
796 
797  void WriteAndFinish(const ResponseType* resp, grpc::WriteOptions options,
798  grpc::Status s) override {
799  // TODO(vjpai): don't assert
800  ABSL_CHECK(finish_ops_.SendMessagePtr(resp, options).ok());
801  Finish(std::move(s));
802  }
803 
804  void Read(RequestType* req) override {
805  this->Ref();
806  read_ops_.RecvMessage(req);
807  call_.PerformOps(&read_ops_);
808  }
809 
810  private:
811  friend class CallbackBidiHandler<RequestType, ResponseType>;
812 
813  ServerCallbackReaderWriterImpl(grpc::CallbackServerContext* ctx,
814  grpc::internal::Call* call,
815  std::function<void()> call_requester)
816  : ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)) {}
817 
818  grpc_call* call() override { return call_.call(); }
819 
820  void SetupReactor(ServerBidiReactor<RequestType, ResponseType>* reactor) {
821  reactor_.store(reactor, std::memory_order_relaxed);
822  // The callbacks for these functions should not be inlined because they
823  // invoke user-controlled reactions, but any resulting OnDones can be
824  // inlined in the executor to which a callback is dispatched.
825  write_tag_.Set(
826  call_.call(),
827  [this, reactor](bool ok) {
828  reactor->OnWriteDone(ok);
829  this->MaybeDone(/*inlineable_ondone=*/true);
830  },
831  &write_ops_, /*can_inline=*/false);
832  write_ops_.set_core_cq_tag(&write_tag_);
833  read_tag_.Set(
834  call_.call(),
835  [this, reactor](bool ok) {
836  if (GPR_UNLIKELY(!ok)) {
837  ctx_->MaybeMarkCancelledOnRead();
838  }
839  reactor->OnReadDone(ok);
840  this->MaybeDone(/*inlineable_ondone=*/true);
841  },
842  &read_ops_, /*can_inline=*/false);
843  read_ops_.set_core_cq_tag(&read_tag_);
844  this->BindReactor(reactor);
845  this->MaybeCallOnCancel(reactor);
846  // Inlineable OnDone can be false here because there is no bidi
847  // reactor that has an inlineable OnDone; this only applies to the
848  // DefaultReactor (which is unary).
849  this->MaybeDone(/*inlineable_ondone=*/false);
850  }
851 
852  void CallOnDone() override {
853  reactor_.load(std::memory_order_relaxed)->OnDone();
854  grpc_call* call = call_.call();
855  auto call_requester = std::move(call_requester_);
856  if (ctx_->context_allocator() != nullptr) {
857  ctx_->context_allocator()->Release(ctx_);
858  }
859  this->~ServerCallbackReaderWriterImpl(); // explicitly call destructor
860  grpc_call_unref(call);
861  call_requester();
862  }
863 
864  ServerReactor* reactor() override {
865  return reactor_.load(std::memory_order_relaxed);
866  }
867 
869  meta_ops_;
874  finish_ops_;
878  write_ops_;
881  read_ops_;
883 
884  grpc::CallbackServerContext* const ctx_;
885  grpc::internal::Call call_;
886  std::function<void()> call_requester_;
887  // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
888  std::atomic<ServerBidiReactor<RequestType, ResponseType>*> reactor_;
889  // callbacks_outstanding_ follows a refcount pattern
890  std::atomic<intptr_t> callbacks_outstanding_{
891  3}; // reserve for OnStarted, Finish, and CompletionOp
892  };
893 };
894 
895 } // namespace internal
896 } // namespace grpc
897 
898 #endif // GRPCPP_IMPL_SERVER_CALLBACK_HANDLERS_H
grpc::ServerReadReactor
ServerReadReactor is the interface for a client-streaming RPC.
Definition: server_callback.h:191
grpc::internal::CallbackWithSuccessTag
CallbackWithSuccessTag can be reused multiple times, and will be used in this fashion for streaming o...
Definition: callback_common.h:149
grpc::ServerCallbackReaderWriter::SendInitialMetadata
virtual void SendInitialMetadata()=0
message_allocator.h
grpc_call_arena_alloc
GRPCAPI void * grpc_call_arena_alloc(grpc_call *call, size_t size)
Allocate memory in the grpc_call arena: this memory is automatically discarded at call completion.
grpc::internal::CallOpServerSendStatus
Definition: call_op_set.h:656
grpc
An Alarm posts the user-provided tag to its associated completion queue or invokes the user-provided ...
Definition: alarm.h:33
grpc::internal::CallOpSet< grpc::internal::CallOpSendInitialMetadata >
grpc::CallbackServerContext
Definition: server_context.h:618
grpc::internal::CallOpSendMessage
Definition: call_op_set.h:288
grpc::internal::MethodHandler::HandlerParameter
Definition: rpc_service_method.h:43
grpc::MessageHolder::response
ResponseT * response()
Definition: message_allocator.h:46
grpc::internal::Call
Straightforward wrapping of the C call object.
Definition: call.h:36
rpc_service_method.h
grpc::internal::CallbackClientStreamingHandler::RunHandler
void RunHandler(const HandlerParameter &param) final
Definition: server_callback_handlers.h:262
status.h
grpc::ServerCallbackUnary::BindReactor
void BindReactor(Reactor *reactor)
Definition: server_callback.h:210
grpc::internal::CallOpSendInitialMetadata
Definition: call_op_set.h:218
grpc::internal::ServerCallbackCall::MaybeCallOnCancel
void MaybeCallOnCancel(ServerReactor *reactor)
Definition: server_callback.h:110
grpc::internal::CallbackClientStreamingHandler::CallbackClientStreamingHandler
CallbackClientStreamingHandler(std::function< ServerReadReactor< RequestType > *(grpc::CallbackServerContext *, ResponseType *)> get_reactor)
Definition: server_callback_handlers.h:257
grpc::Status::ok
bool ok() const
Is the status OK?
Definition: status.h:125
grpc_call_ref
GRPCAPI void grpc_call_ref(grpc_call *call)
Ref a call.
grpc::ServerCallbackReaderWriter
Definition: server_callback.h:247
grpc::internal::CallbackUnaryHandler::CallbackUnaryHandler
CallbackUnaryHandler(std::function< ServerUnaryReactor *(grpc::CallbackServerContext *, const RequestType *, ResponseType *)> get_reactor)
Definition: server_callback_handlers.h:37
grpc::Status
Did it work? If it didn't, why?
Definition: status.h:34
grpc_call_unref
GRPCAPI void grpc_call_unref(grpc_call *call)
Unref a call.
grpc::internal::CallbackServerStreamingHandler::Deserialize
void * Deserialize(grpc_call *call, grpc_byte_buffer *req, grpc::Status *status, void **) final
Definition: server_callback_handlers.h:492
grpc::MessageAllocator< RequestType, ResponseType >
grpc.h
grpc_call
struct grpc_call grpc_call
A Call represents an RPC.
Definition: grpc_types.h:68
grpc::ServerBidiReactor
ServerBidiReactor is the interface for a bidirectional streaming RPC.
Definition: server_callback.h:195
grpc_byte_buffer
Definition: grpc_types.h:41
grpc::ByteBuffer
A sequence of bytes.
Definition: byte_buffer.h:59
grpc::internal::CallbackUnaryHandler
Definition: server_callback_handlers.h:35
grpc::ServerCallbackWriter::SendInitialMetadata
virtual void SendInitialMetadata()=0
grpc::internal::DefaultMessageHolder
Definition: server_callback.h:170
grpc::internal::CallbackServerStreamingHandler
Definition: server_callback_handlers.h:448
grpc::internal::CallbackServerStreamingHandler::CallbackServerStreamingHandler
CallbackServerStreamingHandler(std::function< ServerWriteReactor< ResponseType > *(grpc::CallbackServerContext *, const RequestType *)> get_reactor)
Definition: server_callback_handlers.h:450
grpc::internal::CallbackBidiHandler::RunHandler
void RunHandler(const HandlerParameter &param) final
Definition: server_callback_handlers.h:683
grpc::SerializationTraits
Defines how to serialize and deserialize some type.
Definition: serialization_traits.h:58
grpc::internal::CallbackUnaryHandler::RunHandler
void RunHandler(const HandlerParameter &param) final
Definition: server_callback_handlers.h:48
grpc::ServerCallbackReader< RequestType >::BindReactor
void BindReactor(ServerReadReactor< RequestType > *reactor)
Definition: server_callback.h:224
grpc::MessageAllocator::AllocateMessages
virtual MessageHolder< RequestT, ResponseT > * AllocateMessages()=0
grpc::internal::FinishOnlyReactor
Definition: server_context.h:88
grpc::internal::MethodHandler
Base class for running an RPC handler.
Definition: rpc_service_method.h:40
server_callback.h
grpc::MessageHolder::request
RequestT * request()
Definition: message_allocator.h:45
grpc::WriteOptions
Per-message write options.
Definition: call_op_set.h:80
grpc::ServerWriteReactor
ServerWriteReactor is the interface for a server-streaming RPC.
Definition: server_callback.h:193
grpc::internal::UnimplementedUnaryReactor
FinishOnlyReactor< ServerUnaryReactor > UnimplementedUnaryReactor
Definition: server_callback.h:783
grpc::internal::CallbackBidiHandler
Definition: server_callback_handlers.h:676
grpc::UNIMPLEMENTED
@ UNIMPLEMENTED
Operation is not implemented or not supported/enabled in this service.
Definition: status_code_enum.h:117
grpc::ServerCallbackWriter
Definition: server_callback.h:230
call.h
grpc::internal::Call::call
grpc_call * call() const
Definition: call.h:70
server_context.h
std
Definition: async_unary_call.h:406
grpc::WriteOptions::set_buffer_hint
WriteOptions & set_buffer_hint()
Sets flag indicating that the write may be buffered and need not go out on the wire immediately.
Definition: call_op_set.h:118
grpc::internal::CallbackUnaryHandler::SetMessageAllocator
void SetMessageAllocator(MessageAllocator< RequestType, ResponseType > *allocator)
Definition: server_callback_handlers.h:43
grpc::ServerCallbackUnary::SendInitialMetadata
virtual void SendInitialMetadata()=0
grpc::internal::CallbackClientStreamingHandler
Definition: server_callback_handlers.h:255
grpc::internal::ServerCallbackCall::Ref
void Ref()
Increases the reference count.
Definition: server_callback.h:128
grpc::internal::CallbackBidiHandler::CallbackBidiHandler
CallbackBidiHandler(std::function< ServerBidiReactor< RequestType, ResponseType > *(grpc::CallbackServerContext *)> get_reactor)
Definition: server_callback_handlers.h:678
grpc::ServerCallbackReader
Definition: server_callback.h:216
grpc::WriteOptions::is_last_message
bool is_last_message() const
Get value for the flag indicating that this is the last message, and should be coalesced with trailing metadata.
Definition: call_op_set.h:173
grpc::MessageHolder< RequestType, ResponseType >
grpc::ByteBuffer::Release
void Release()
Forget the underlying byte buffer without destroying it. Use this only for un-owned byte buffers.
Definition: byte_buffer.h:143
grpc::internal::CatchingReactorGetter
Reactor * CatchingReactorGetter(Func &&func, Args &&... args)
Definition: callback_common.h:55
grpc::ServerCallbackUnary
Definition: server_callback.h:200
grpc::ServerUnaryReactor
Definition: server_callback.h:706
grpc::internal::CallbackServerStreamingHandler::RunHandler
void RunHandler(const HandlerParameter &param) final
Definition: server_callback_handlers.h:455
grpc::ServerCallbackReader::SendInitialMetadata
virtual void SendInitialMetadata()=0
grpc::internal::ServerCallbackCall::MaybeDone
void MaybeDone()
Definition: server_callback.h:96
grpc::internal::CallbackUnaryHandler::Deserialize
void * Deserialize(grpc_call *call, grpc_byte_buffer *req, grpc::Status *status, void **handler_data) final
Definition: server_callback_handlers.h:83