GRPC C++  1.78.1
server_callback_handlers.h
Go to the documentation of this file.
1 //
2 //
3 // Copyright 2019 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 
18 #ifndef GRPCPP_IMPL_SERVER_CALLBACK_HANDLERS_H
19 #define GRPCPP_IMPL_SERVER_CALLBACK_HANDLERS_H
20 
21 #include <grpc/grpc.h>
22 #include <grpc/impl/call.h>
24 #include <grpcpp/server_context.h>
27 #include <grpcpp/support/status.h>
28 
29 #include "absl/log/absl_check.h"
30 
31 namespace grpc {
32 namespace internal {
33 
34 template <class RequestType, class ResponseType>
36  public:
39  const RequestType*, ResponseType*)>
40  get_reactor)
41  : get_reactor_(std::move(get_reactor)) {}
42 
45  allocator_ = allocator;
46  }
47 
48  void RunHandler(const HandlerParameter& param) final {
49  // Arena allocate a controller structure (that includes request/response)
50  grpc_call_ref(param.call->call());
51  auto* allocator_state =
53  param.internal_data);
54 
55  auto* call = new (grpc_call_arena_alloc(param.call->call(),
56  sizeof(ServerCallbackUnaryImpl)))
57  ServerCallbackUnaryImpl(
58  static_cast<grpc::CallbackServerContext*>(param.server_context),
59  param.call, allocator_state, param.call_requester);
60  param.server_context->BeginCompletionOp(
61  param.call, [call](bool) { call->MaybeDone(); }, call);
62 
63  ServerUnaryReactor* reactor = nullptr;
64  if (param.status.ok()) {
65  reactor = grpc::internal::CatchingReactorGetter<ServerUnaryReactor>(
66  get_reactor_,
67  static_cast<grpc::CallbackServerContext*>(param.server_context),
68  call->request(), call->response());
69  }
70 
71  if (reactor == nullptr) {
72  // if deserialization or reactor creator failed, we need to fail the call
73  reactor = new (grpc_call_arena_alloc(param.call->call(),
77  }
78 
80  call->SetupReactor(reactor);
81  }
82 
84  grpc::Status* status, void** handler_data) final {
85  grpc::ByteBuffer buf;
86  buf.set_buffer(req);
87  RequestType* request = nullptr;
89  if (allocator_ != nullptr) {
90  allocator_state = allocator_->AllocateMessages();
91  } else {
92  allocator_state = new (grpc_call_arena_alloc(
95  }
96  *handler_data = allocator_state;
97  request = allocator_state->request();
98  *status = grpc::Deserialize(&buf, request);
99  buf.Release();
100  if (status->ok()) {
101  return request;
102  }
103  return nullptr;
104  }
105 
106  private:
108  const RequestType*, ResponseType*)>
109  get_reactor_;
110  MessageAllocator<RequestType, ResponseType>* allocator_ = nullptr;
111 
112  class ServerCallbackUnaryImpl : public ServerCallbackUnary {
113  public:
114  void Finish(grpc::Status s) override {
115  // A callback that only contains a call to MaybeDone can be run as an
116  // inline callback regardless of whether or not OnDone is inlineable
117  // because if the actual OnDone callback needs to be scheduled, MaybeDone
118  // is responsible for dispatching to an EventEngine thread if needed.
119  // Thus, when setting up the finish_tag_, we can set its own callback to
120  // inlineable.
121  finish_tag_.Set(
122  call_.call(),
123  [this](bool) {
124  this->MaybeDone(
125  reactor_.load(std::memory_order_relaxed)->InternalInlineable());
126  },
127  &finish_ops_, /*can_inline=*/true);
128  finish_ops_.set_core_cq_tag(&finish_tag_);
129 
130  if (!ctx_->sent_initial_metadata_) {
131  finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
132  ctx_->initial_metadata_flags());
133  if (ctx_->compression_level_set()) {
134  finish_ops_.set_compression_level(ctx_->compression_level());
135  }
136  ctx_->sent_initial_metadata_ = true;
137  }
138  // The response is dropped if the status is not OK.
139  if (s.ok()) {
140  finish_ops_.ServerSendStatus(
141  &ctx_->trailing_metadata_,
142  finish_ops_.SendMessagePtr(response(), ctx_->memory_allocator()));
143  } else {
144  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
145  }
146  finish_ops_.set_core_cq_tag(&finish_tag_);
147  call_.PerformOps(&finish_ops_);
148  }
149 
150  void SendInitialMetadata() override {
151  ABSL_CHECK(!ctx_->sent_initial_metadata_);
152  this->Ref();
153  // The callback for this function should not be marked inline because it
154  // is directly invoking a user-controlled reaction
155  // (OnSendInitialMetadataDone). Thus it must be dispatched to an
156  // EventEngine thread. However, any OnDone needed after that can be
157  // inlined because it is already running on an EventEngine thread.
158  meta_tag_.Set(
159  call_.call(),
160  [this](bool ok) {
161  ServerUnaryReactor* reactor =
162  reactor_.load(std::memory_order_relaxed);
163  reactor->OnSendInitialMetadataDone(ok);
164  this->MaybeDone(/*inlineable_ondone=*/true);
165  },
166  &meta_ops_, /*can_inline=*/false);
167  meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
168  ctx_->initial_metadata_flags());
169  if (ctx_->compression_level_set()) {
170  meta_ops_.set_compression_level(ctx_->compression_level());
171  }
172  ctx_->sent_initial_metadata_ = true;
173  meta_ops_.set_core_cq_tag(&meta_tag_);
174  call_.PerformOps(&meta_ops_);
175  }
176 
177  private:
178  friend class CallbackUnaryHandler<RequestType, ResponseType>;
179 
180  ServerCallbackUnaryImpl(
182  MessageHolder<RequestType, ResponseType>* allocator_state,
183  std::function<void()> call_requester)
184  : ctx_(ctx),
185  call_(*call),
186  allocator_state_(allocator_state),
187  call_requester_(std::move(call_requester)) {
188  ctx_->set_message_allocator_state(allocator_state);
189  }
190 
191  grpc_call* call() override { return call_.call(); }
192 
197  void SetupReactor(ServerUnaryReactor* reactor) {
198  reactor_.store(reactor, std::memory_order_relaxed);
199  this->BindReactor(reactor);
200  this->MaybeCallOnCancel(reactor);
201  this->MaybeDone(reactor->InternalInlineable());
202  }
203 
204  const RequestType* request() { return allocator_state_->request(); }
205  ResponseType* response() { return allocator_state_->response(); }
206 
207  void CallOnDone() override {
208  reactor_.load(std::memory_order_relaxed)->OnDone();
209  grpc_call* call = call_.call();
210  auto call_requester = std::move(call_requester_);
211  allocator_state_->Release();
212  if (ctx_->context_allocator() != nullptr) {
213  ctx_->context_allocator()->Release(ctx_);
214  }
215  this->~ServerCallbackUnaryImpl(); // explicitly call destructor
216  grpc_call_unref(call);
217  call_requester();
218  }
219 
220  ServerReactor* reactor() override {
221  return reactor_.load(std::memory_order_relaxed);
222  }
223 
225  meta_ops_;
230  finish_ops_;
232 
233  grpc::CallbackServerContext* const ctx_;
234  grpc::internal::Call call_;
235  MessageHolder<RequestType, ResponseType>* const allocator_state_;
236  std::function<void()> call_requester_;
237  // reactor_ can always be loaded/stored with relaxed memory ordering because
238  // its value is only set once, independently of other data in the object,
239  // and the loads that use it will always actually come provably later even
240  // though they are from different threads since they are triggered by
241  // actions initiated only by the setting up of the reactor_ variable. In
242  // a sense, it's a delayed "const": it gets its value from the SetupReactor
243  // method (not the constructor, so it's not a true const), but it doesn't
244  // change after that and it only gets used by actions caused, directly or
245  // indirectly, by that setup. This comment also applies to the reactor_
246  // variables of the other streaming objects in this file.
247  std::atomic<ServerUnaryReactor*> reactor_;
248  // callbacks_outstanding_ follows a refcount pattern
249  std::atomic<intptr_t> callbacks_outstanding_{
250  3}; // reserve for start, Finish, and CompletionOp
251  };
252 };
253 
254 template <class RequestType, class ResponseType>
256  public:
258  std::function<ServerReadReactor<RequestType>*(
259  grpc::CallbackServerContext*, ResponseType*)>
260  get_reactor)
261  : get_reactor_(std::move(get_reactor)) {}
262  void RunHandler(const HandlerParameter& param) final {
263  // Arena allocate a reader structure (that includes response)
264  grpc_call_ref(param.call->call());
265 
266  auto* reader = new (grpc_call_arena_alloc(param.call->call(),
267  sizeof(ServerCallbackReaderImpl)))
268  ServerCallbackReaderImpl(
269  static_cast<grpc::CallbackServerContext*>(param.server_context),
270  param.call, param.call_requester);
271  // Inlineable OnDone can be false in the CompletionOp callback because there
272  // is no read reactor that has an inlineable OnDone; this only applies to
273  // the DefaultReactor (which is unary).
274  param.server_context->BeginCompletionOp(
275  param.call,
276  [reader](bool) { reader->MaybeDone(/*inlineable_ondone=*/false); },
277  reader);
278 
279  ServerReadReactor<RequestType>* reactor = nullptr;
280  if (param.status.ok()) {
281  reactor =
282  grpc::internal::CatchingReactorGetter<ServerReadReactor<RequestType>>(
283  get_reactor_,
284  static_cast<grpc::CallbackServerContext*>(param.server_context),
285  reader->response());
286  }
287 
288  if (reactor == nullptr) {
289  // if deserialization or reactor creator failed, we need to fail the call
290  reactor = new (grpc_call_arena_alloc(
291  param.call->call(), sizeof(UnimplementedReadReactor<RequestType>)))
294  }
295 
296  reader->SetupReactor(reactor);
297  }
298 
299  private:
300  std::function<ServerReadReactor<RequestType>*(grpc::CallbackServerContext*,
301  ResponseType*)>
302  get_reactor_;
303 
304  class ServerCallbackReaderImpl : public ServerCallbackReader<RequestType> {
305  public:
306  void Finish(grpc::Status s) override {
307  // A finish tag with only MaybeDone can have its callback inlined
308  // regardless even if OnDone is not inlineable because this callback just
309  // checks a ref and then decides whether or not to dispatch OnDone.
310  finish_tag_.Set(
311  call_.call(),
312  [this](bool) {
313  // Inlineable OnDone can be false here because there is
314  // no read reactor that has an inlineable OnDone; this
315  // only applies to the DefaultReactor (which is unary).
316  this->MaybeDone(/*inlineable_ondone=*/false);
317  },
318  &finish_ops_, /*can_inline=*/true);
319  if (!ctx_->sent_initial_metadata_) {
320  finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
321  ctx_->initial_metadata_flags());
322  if (ctx_->compression_level_set()) {
323  finish_ops_.set_compression_level(ctx_->compression_level());
324  }
325  ctx_->sent_initial_metadata_ = true;
326  }
327  // The response is dropped if the status is not OK.
328  if (s.ok()) {
329  finish_ops_.ServerSendStatus(
330  &ctx_->trailing_metadata_,
331  finish_ops_.SendMessagePtr(&resp_, ctx_->memory_allocator()));
332  } else {
333  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
334  }
335  finish_ops_.set_core_cq_tag(&finish_tag_);
336  call_.PerformOps(&finish_ops_);
337  }
338 
339  void SendInitialMetadata() override {
340  ABSL_CHECK(!ctx_->sent_initial_metadata_);
341  this->Ref();
342  // The callback for this function should not be inlined because it invokes
343  // a user-controlled reaction, but any resulting OnDone can be inlined in
344  // the EventEngine thread to which this callback is dispatched.
345  meta_tag_.Set(
346  call_.call(),
347  [this](bool ok) {
348  ServerReadReactor<RequestType>* reactor =
349  reactor_.load(std::memory_order_relaxed);
350  reactor->OnSendInitialMetadataDone(ok);
351  this->MaybeDone(/*inlineable_ondone=*/true);
352  },
353  &meta_ops_, /*can_inline=*/false);
354  meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
355  ctx_->initial_metadata_flags());
356  if (ctx_->compression_level_set()) {
357  meta_ops_.set_compression_level(ctx_->compression_level());
358  }
359  ctx_->sent_initial_metadata_ = true;
360  meta_ops_.set_core_cq_tag(&meta_tag_);
361  call_.PerformOps(&meta_ops_);
362  }
363 
364  void Read(RequestType* req) override {
365  this->Ref();
366  read_ops_.RecvMessage(req);
367  call_.PerformOps(&read_ops_);
368  }
369 
370  private:
371  friend class CallbackClientStreamingHandler<RequestType, ResponseType>;
372 
373  ServerCallbackReaderImpl(grpc::CallbackServerContext* ctx,
374  grpc::internal::Call* call,
375  std::function<void()> call_requester)
376  : ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)) {}
377 
378  grpc_call* call() override { return call_.call(); }
379 
380  void SetupReactor(ServerReadReactor<RequestType>* reactor) {
381  reactor_.store(reactor, std::memory_order_relaxed);
382  // The callback for this function should not be inlined because it invokes
383  // a user-controlled reaction, but any resulting OnDone can be inlined in
384  // the EventEngine thread to which this callback is dispatched.
385  read_tag_.Set(
386  call_.call(),
387  [this, reactor](bool ok) {
388  if (GPR_UNLIKELY(!ok)) {
389  ctx_->MaybeMarkCancelledOnRead();
390  }
391  reactor->OnReadDone(ok);
392  this->MaybeDone(/*inlineable_ondone=*/true);
393  },
394  &read_ops_, /*can_inline=*/false);
395  read_ops_.set_core_cq_tag(&read_tag_);
396  this->BindReactor(reactor);
397  this->MaybeCallOnCancel(reactor);
398  // Inlineable OnDone can be false here because there is no read
399  // reactor that has an inlineable OnDone; this only applies to the
400  // DefaultReactor (which is unary).
401  this->MaybeDone(/*inlineable_ondone=*/false);
402  }
403 
404  ~ServerCallbackReaderImpl() {}
405 
406  ResponseType* response() { return &resp_; }
407 
408  void CallOnDone() override {
409  reactor_.load(std::memory_order_relaxed)->OnDone();
410  grpc_call* call = call_.call();
411  auto call_requester = std::move(call_requester_);
412  if (ctx_->context_allocator() != nullptr) {
413  ctx_->context_allocator()->Release(ctx_);
414  }
415  this->~ServerCallbackReaderImpl(); // explicitly call destructor
416  grpc_call_unref(call);
417  call_requester();
418  }
419 
420  ServerReactor* reactor() override {
421  return reactor_.load(std::memory_order_relaxed);
422  }
423 
425  meta_ops_;
430  finish_ops_;
433  read_ops_;
435 
436  grpc::CallbackServerContext* const ctx_;
437  grpc::internal::Call call_;
438  ResponseType resp_;
439  std::function<void()> call_requester_;
440  // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
441  std::atomic<ServerReadReactor<RequestType>*> reactor_;
442  // callbacks_outstanding_ follows a refcount pattern
443  std::atomic<intptr_t> callbacks_outstanding_{
444  3}; // reserve for OnStarted, Finish, and CompletionOp
445  };
446 };
447 
448 template <class RequestType, class ResponseType>
450  public:
452  std::function<ServerWriteReactor<ResponseType>*(
453  grpc::CallbackServerContext*, const RequestType*)>
454  get_reactor)
455  : get_reactor_(std::move(get_reactor)) {}
456  void RunHandler(const HandlerParameter& param) final {
457  // Arena allocate a writer structure
458  grpc_call_ref(param.call->call());
459 
460  auto* writer = new (grpc_call_arena_alloc(param.call->call(),
461  sizeof(ServerCallbackWriterImpl)))
462  ServerCallbackWriterImpl(
463  static_cast<grpc::CallbackServerContext*>(param.server_context),
464  param.call, static_cast<RequestType*>(param.request),
465  param.call_requester);
466  // Inlineable OnDone can be false in the CompletionOp callback because there
467  // is no write reactor that has an inlineable OnDone; this only applies to
468  // the DefaultReactor (which is unary).
469  param.server_context->BeginCompletionOp(
470  param.call,
471  [writer](bool) { writer->MaybeDone(/*inlineable_ondone=*/false); },
472  writer);
473 
474  ServerWriteReactor<ResponseType>* reactor = nullptr;
475  if (param.status.ok()) {
478  get_reactor_,
479  static_cast<grpc::CallbackServerContext*>(param.server_context),
480  writer->request());
481  }
482  if (reactor == nullptr) {
483  // if deserialization or reactor creator failed, we need to fail the call
484  reactor = new (grpc_call_arena_alloc(
485  param.call->call(), sizeof(UnimplementedWriteReactor<ResponseType>)))
488  }
489 
490  writer->SetupReactor(reactor);
491  }
492 
494  grpc::Status* status, void** /*handler_data*/) final {
495  grpc::ByteBuffer buf;
496  buf.set_buffer(req);
497  auto* request =
498  new (grpc_call_arena_alloc(call, sizeof(RequestType))) RequestType();
499  *status = grpc::Deserialize(&buf, request);
500  buf.Release();
501  if (status->ok()) {
502  return request;
503  }
504  request->~RequestType();
505  return nullptr;
506  }
507 
508  private:
509  std::function<ServerWriteReactor<ResponseType>*(grpc::CallbackServerContext*,
510  const RequestType*)>
511  get_reactor_;
512 
513  class ServerCallbackWriterImpl : public ServerCallbackWriter<ResponseType> {
514  public:
515  void Finish(grpc::Status s) override {
516  // A finish tag with only MaybeDone can have its callback inlined
517  // regardless even if OnDone is not inlineable because this callback just
518  // checks a ref and then decides whether or not to dispatch OnDone.
519  finish_tag_.Set(
520  call_.call(),
521  [this](bool) {
522  // Inlineable OnDone can be false here because there is
523  // no write reactor that has an inlineable OnDone; this
524  // only applies to the DefaultReactor (which is unary).
525  this->MaybeDone(/*inlineable_ondone=*/false);
526  },
527  &finish_ops_, /*can_inline=*/true);
528  finish_ops_.set_core_cq_tag(&finish_tag_);
529 
530  if (!ctx_->sent_initial_metadata_) {
531  finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
532  ctx_->initial_metadata_flags());
533  if (ctx_->compression_level_set()) {
534  finish_ops_.set_compression_level(ctx_->compression_level());
535  }
536  ctx_->sent_initial_metadata_ = true;
537  }
538  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
539  call_.PerformOps(&finish_ops_);
540  }
541 
542  void SendInitialMetadata() override {
543  ABSL_CHECK(!ctx_->sent_initial_metadata_);
544  this->Ref();
545  // The callback for this function should not be inlined because it invokes
546  // a user-controlled reaction, but any resulting OnDone can be inlined in
547  // the EventEngine thread to which this callback is dispatched.
548  meta_tag_.Set(
549  call_.call(),
550  [this](bool ok) {
551  ServerWriteReactor<ResponseType>* reactor =
552  reactor_.load(std::memory_order_relaxed);
553  reactor->OnSendInitialMetadataDone(ok);
554  this->MaybeDone(/*inlineable_ondone=*/true);
555  },
556  &meta_ops_, /*can_inline=*/false);
557  meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
558  ctx_->initial_metadata_flags());
559  if (ctx_->compression_level_set()) {
560  meta_ops_.set_compression_level(ctx_->compression_level());
561  }
562  ctx_->sent_initial_metadata_ = true;
563  meta_ops_.set_core_cq_tag(&meta_tag_);
564  call_.PerformOps(&meta_ops_);
565  }
566 
567  void Write(const ResponseType* resp, grpc::WriteOptions options) override {
568  this->Ref();
569  if (options.is_last_message()) {
570  options.set_buffer_hint();
571  }
572  if (!ctx_->sent_initial_metadata_) {
573  write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
574  ctx_->initial_metadata_flags());
575  if (ctx_->compression_level_set()) {
576  write_ops_.set_compression_level(ctx_->compression_level());
577  }
578  ctx_->sent_initial_metadata_ = true;
579  }
580  // TODO(vjpai): don't assert
581  ABSL_CHECK(
582  write_ops_.SendMessagePtr(resp, options, ctx_->memory_allocator())
583  .ok());
584  call_.PerformOps(&write_ops_);
585  }
586 
587  void WriteAndFinish(const ResponseType* resp, grpc::WriteOptions options,
588  grpc::Status s) override {
589  // This combines the write into the finish callback
590  // TODO(vjpai): don't assert
591  ABSL_CHECK(
592  finish_ops_.SendMessagePtr(resp, options, ctx_->memory_allocator())
593  .ok());
594  Finish(std::move(s));
595  }
596 
597  private:
598  friend class CallbackServerStreamingHandler<RequestType, ResponseType>;
599 
600  ServerCallbackWriterImpl(grpc::CallbackServerContext* ctx,
601  grpc::internal::Call* call, const RequestType* req,
602  std::function<void()> call_requester)
603  : ctx_(ctx),
604  call_(*call),
605  req_(req),
606  call_requester_(std::move(call_requester)) {}
607 
608  grpc_call* call() override { return call_.call(); }
609 
610  void SetupReactor(ServerWriteReactor<ResponseType>* reactor) {
611  reactor_.store(reactor, std::memory_order_relaxed);
612  // The callback for this function should not be inlined because it invokes
613  // a user-controlled reaction, but any resulting OnDone can be inlined in
614  // the EventEngine thread to which this callback is dispatched.
615  write_tag_.Set(
616  call_.call(),
617  [this, reactor](bool ok) {
618  reactor->OnWriteDone(ok);
619  this->MaybeDone(/*inlineable_ondone=*/true);
620  },
621  &write_ops_, /*can_inline=*/false);
622  write_ops_.set_core_cq_tag(&write_tag_);
623  this->BindReactor(reactor);
624  this->MaybeCallOnCancel(reactor);
625  // Inlineable OnDone can be false here because there is no write
626  // reactor that has an inlineable OnDone; this only applies to the
627  // DefaultReactor (which is unary).
628  this->MaybeDone(/*inlineable_ondone=*/false);
629  }
630  ~ServerCallbackWriterImpl() {
631  if (req_ != nullptr) {
632  req_->~RequestType();
633  }
634  }
635 
636  const RequestType* request() { return req_; }
637 
638  void CallOnDone() override {
639  reactor_.load(std::memory_order_relaxed)->OnDone();
640  grpc_call* call = call_.call();
641  auto call_requester = std::move(call_requester_);
642  if (ctx_->context_allocator() != nullptr) {
643  ctx_->context_allocator()->Release(ctx_);
644  }
645  this->~ServerCallbackWriterImpl(); // explicitly call destructor
646  grpc_call_unref(call);
647  call_requester();
648  }
649 
650  ServerReactor* reactor() override {
651  return reactor_.load(std::memory_order_relaxed);
652  }
653 
655  meta_ops_;
660  finish_ops_;
664  write_ops_;
666 
667  grpc::CallbackServerContext* const ctx_;
668  grpc::internal::Call call_;
669  const RequestType* req_;
670  std::function<void()> call_requester_;
671  // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
672  std::atomic<ServerWriteReactor<ResponseType>*> reactor_;
673  // callbacks_outstanding_ follows a refcount pattern
674  std::atomic<intptr_t> callbacks_outstanding_{
675  3}; // reserve for OnStarted, Finish, and CompletionOp
676  };
677 };
678 
679 template <class RequestType, class ResponseType>
681  public:
685  get_reactor)
686  : get_reactor_(std::move(get_reactor)) {}
687  void RunHandler(const HandlerParameter& param) final {
688  grpc_call_ref(param.call->call());
689 
690  auto* stream = new (grpc_call_arena_alloc(
691  param.call->call(), sizeof(ServerCallbackReaderWriterImpl)))
692  ServerCallbackReaderWriterImpl(
693  static_cast<grpc::CallbackServerContext*>(param.server_context),
694  param.call, param.call_requester);
695  // Inlineable OnDone can be false in the CompletionOp callback because there
696  // is no bidi reactor that has an inlineable OnDone; this only applies to
697  // the DefaultReactor (which is unary).
698  param.server_context->BeginCompletionOp(
699  param.call,
700  [stream](bool) { stream->MaybeDone(/*inlineable_ondone=*/false); },
701  stream);
702 
704  if (param.status.ok()) {
707  get_reactor_,
708  static_cast<grpc::CallbackServerContext*>(param.server_context));
709  }
710 
711  if (reactor == nullptr) {
712  // if deserialization or reactor creator failed, we need to fail the call
713  reactor = new (grpc_call_arena_alloc(
714  param.call->call(),
718  }
719 
720  stream->SetupReactor(reactor);
721  }
722 
723  private:
724  std::function<ServerBidiReactor<RequestType, ResponseType>*(
726  get_reactor_;
727 
728  class ServerCallbackReaderWriterImpl
729  : public ServerCallbackReaderWriter<RequestType, ResponseType> {
730  public:
731  void Finish(grpc::Status s) override {
732  // A finish tag with only MaybeDone can have its callback inlined
733  // regardless even if OnDone is not inlineable because this callback just
734  // checks a ref and then decides whether or not to dispatch OnDone.
735  finish_tag_.Set(
736  call_.call(),
737  [this](bool) {
738  // Inlineable OnDone can be false here because there is
739  // no bidi reactor that has an inlineable OnDone; this
740  // only applies to the DefaultReactor (which is unary).
741  this->MaybeDone(/*inlineable_ondone=*/false);
742  },
743  &finish_ops_, /*can_inline=*/true);
744  finish_ops_.set_core_cq_tag(&finish_tag_);
745 
746  if (!ctx_->sent_initial_metadata_) {
747  finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
748  ctx_->initial_metadata_flags());
749  if (ctx_->compression_level_set()) {
750  finish_ops_.set_compression_level(ctx_->compression_level());
751  }
752  ctx_->sent_initial_metadata_ = true;
753  }
754  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
755  call_.PerformOps(&finish_ops_);
756  }
757 
758  void SendInitialMetadata() override {
759  ABSL_CHECK(!ctx_->sent_initial_metadata_);
760  this->Ref();
761  // The callback for this function should not be inlined because it invokes
762  // a user-controlled reaction, but any resulting OnDone can be inlined in
763  // the EventEngine thread to which this callback is dispatched.
764  meta_tag_.Set(
765  call_.call(),
766  [this](bool ok) {
767  ServerBidiReactor<RequestType, ResponseType>* reactor =
768  reactor_.load(std::memory_order_relaxed);
769  reactor->OnSendInitialMetadataDone(ok);
770  this->MaybeDone(/*inlineable_ondone=*/true);
771  },
772  &meta_ops_, /*can_inline=*/false);
773  meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
774  ctx_->initial_metadata_flags());
775  if (ctx_->compression_level_set()) {
776  meta_ops_.set_compression_level(ctx_->compression_level());
777  }
778  ctx_->sent_initial_metadata_ = true;
779  meta_ops_.set_core_cq_tag(&meta_tag_);
780  call_.PerformOps(&meta_ops_);
781  }
782 
783  void Write(const ResponseType* resp, grpc::WriteOptions options) override {
784  this->Ref();
785  if (options.is_last_message()) {
786  options.set_buffer_hint();
787  }
788  if (!ctx_->sent_initial_metadata_) {
789  write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
790  ctx_->initial_metadata_flags());
791  if (ctx_->compression_level_set()) {
792  write_ops_.set_compression_level(ctx_->compression_level());
793  }
794  ctx_->sent_initial_metadata_ = true;
795  }
796  // TODO(vjpai): don't assert
797  ABSL_CHECK(
798  write_ops_.SendMessagePtr(resp, options, ctx_->memory_allocator())
799  .ok());
800  call_.PerformOps(&write_ops_);
801  }
802 
803  void WriteAndFinish(const ResponseType* resp, grpc::WriteOptions options,
804  grpc::Status s) override {
805  // TODO(vjpai): don't assert
806  ABSL_CHECK(
807  finish_ops_.SendMessagePtr(resp, options, ctx_->memory_allocator())
808  .ok());
809  Finish(std::move(s));
810  }
811 
812  void Read(RequestType* req) override {
813  this->Ref();
814  read_ops_.RecvMessage(req);
815  call_.PerformOps(&read_ops_);
816  }
817 
818  private:
819  friend class CallbackBidiHandler<RequestType, ResponseType>;
820 
821  ServerCallbackReaderWriterImpl(grpc::CallbackServerContext* ctx,
822  grpc::internal::Call* call,
823  std::function<void()> call_requester)
824  : ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)) {}
825 
826  grpc_call* call() override { return call_.call(); }
827 
828  void SetupReactor(ServerBidiReactor<RequestType, ResponseType>* reactor) {
829  reactor_.store(reactor, std::memory_order_relaxed);
830  // The callbacks for these functions should not be inlined because they
831  // invoke user-controlled reactions, but any resulting OnDones can be
832  // inlined in the EventEngine thread to which a callback is dispatched.
833  write_tag_.Set(
834  call_.call(),
835  [this, reactor](bool ok) {
836  reactor->OnWriteDone(ok);
837  this->MaybeDone(/*inlineable_ondone=*/true);
838  },
839  &write_ops_, /*can_inline=*/false);
840  write_ops_.set_core_cq_tag(&write_tag_);
841  read_tag_.Set(
842  call_.call(),
843  [this, reactor](bool ok) {
844  if (GPR_UNLIKELY(!ok)) {
845  ctx_->MaybeMarkCancelledOnRead();
846  }
847  reactor->OnReadDone(ok);
848  this->MaybeDone(/*inlineable_ondone=*/true);
849  },
850  &read_ops_, /*can_inline=*/false);
851  read_ops_.set_core_cq_tag(&read_tag_);
852  this->BindReactor(reactor);
853  this->MaybeCallOnCancel(reactor);
854  // Inlineable OnDone can be false here because there is no bidi
855  // reactor that has an inlineable OnDone; this only applies to the
856  // DefaultReactor (which is unary).
857  this->MaybeDone(/*inlineable_ondone=*/false);
858  }
859 
860  void CallOnDone() override {
861  reactor_.load(std::memory_order_relaxed)->OnDone();
862  grpc_call* call = call_.call();
863  auto call_requester = std::move(call_requester_);
864  if (ctx_->context_allocator() != nullptr) {
865  ctx_->context_allocator()->Release(ctx_);
866  }
867  this->~ServerCallbackReaderWriterImpl(); // explicitly call destructor
868  grpc_call_unref(call);
869  call_requester();
870  }
871 
872  ServerReactor* reactor() override {
873  return reactor_.load(std::memory_order_relaxed);
874  }
875 
877  meta_ops_;
882  finish_ops_;
886  write_ops_;
889  read_ops_;
891 
892  grpc::CallbackServerContext* const ctx_;
893  grpc::internal::Call call_;
894  std::function<void()> call_requester_;
895  // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
896  std::atomic<ServerBidiReactor<RequestType, ResponseType>*> reactor_;
897  // callbacks_outstanding_ follows a refcount pattern
898  std::atomic<intptr_t> callbacks_outstanding_{
899  3}; // reserve for OnStarted, Finish, and CompletionOp
900  };
901 };
902 
903 } // namespace internal
904 } // namespace grpc
905 
906 #endif // GRPCPP_IMPL_SERVER_CALLBACK_HANDLERS_H
grpc::ServerReadReactor
ServerReadReactor is the interface for a client-streaming RPC.
Definition: server_callback.h:191
grpc::internal::CallbackWithSuccessTag
CallbackWithSuccessTag can be reused multiple times, and will be used in this fashion for streaming o...
Definition: callback_common.h:148
grpc::ServerCallbackReaderWriter::SendInitialMetadata
virtual void SendInitialMetadata()=0
message_allocator.h
grpc_call_arena_alloc
GRPCAPI void * grpc_call_arena_alloc(grpc_call *call, size_t size)
Allocate memory in the grpc_call arena: this memory is automatically discarded at call completion.
grpc::internal::CallOpServerSendStatus
Definition: call_op_set.h:658
grpc
An Alarm posts the user-provided tag to its associated completion queue or invokes the user-provided ...
Definition: alarm.h:33
grpc::internal::CallOpSet< grpc::internal::CallOpSendInitialMetadata >
grpc::CallbackServerContext
Definition: server_context.h:626
grpc::Deserialize
auto Deserialize(BufferPtr buffer, Message *msg)
Definition: serialization_traits.h:120
grpc::internal::CallOpSendMessage
Definition: call_op_set.h:289
grpc::internal::MethodHandler::HandlerParameter
Definition: rpc_service_method.h:43
grpc::MessageHolder::response
ResponseT * response()
Definition: message_allocator.h:46
grpc::internal::Call
Straightforward wrapping of the C call object.
Definition: call.h:36
rpc_service_method.h
grpc::internal::CallbackClientStreamingHandler::RunHandler
void RunHandler(const HandlerParameter &param) final
Definition: server_callback_handlers.h:262
status.h
grpc::ServerCallbackUnary::BindReactor
void BindReactor(Reactor *reactor)
Definition: server_callback.h:210
grpc::internal::CallOpSendInitialMetadata
Definition: call_op_set.h:219
grpc::internal::ServerCallbackCall::MaybeCallOnCancel
void MaybeCallOnCancel(ServerReactor *reactor)
Definition: server_callback.h:110
grpc::internal::CallbackClientStreamingHandler::CallbackClientStreamingHandler
CallbackClientStreamingHandler(std::function< ServerReadReactor< RequestType > *(grpc::CallbackServerContext *, ResponseType *)> get_reactor)
Definition: server_callback_handlers.h:257
grpc::Status::ok
bool ok() const
Is the status OK?
Definition: status.h:125
grpc_call_ref
GRPCAPI void grpc_call_ref(grpc_call *call)
Ref a call.
grpc::ServerCallbackReaderWriter
Definition: server_callback.h:247
grpc::internal::CallbackUnaryHandler::CallbackUnaryHandler
CallbackUnaryHandler(std::function< ServerUnaryReactor *(grpc::CallbackServerContext *, const RequestType *, ResponseType *)> get_reactor)
Definition: server_callback_handlers.h:37
grpc::Status
Did it work? If it didn't, why?
Definition: status.h:34
grpc_call_unref
GRPCAPI void grpc_call_unref(grpc_call *call)
Unref a call.
grpc::internal::CallbackServerStreamingHandler::Deserialize
void * Deserialize(grpc_call *call, grpc_byte_buffer *req, grpc::Status *status, void **) final
Definition: server_callback_handlers.h:493
grpc::MessageAllocator< RequestType, ResponseType >
grpc.h
grpc_call
struct grpc_call grpc_call
A Call represents an RPC.
Definition: grpc_types.h:68
grpc::ServerBidiReactor
ServerBidiReactor is the interface for a bidirectional streaming RPC.
Definition: server_callback.h:195
grpc_byte_buffer
Definition: grpc_types.h:41
grpc::ByteBuffer
A sequence of bytes.
Definition: byte_buffer.h:59
grpc::internal::CallbackUnaryHandler
Definition: server_callback_handlers.h:35
grpc::ServerCallbackWriter::SendInitialMetadata
virtual void SendInitialMetadata()=0
grpc::internal::DefaultMessageHolder
Definition: server_callback.h:170
grpc::internal::CallbackServerStreamingHandler
Definition: server_callback_handlers.h:449
grpc::internal::CallbackServerStreamingHandler::CallbackServerStreamingHandler
CallbackServerStreamingHandler(std::function< ServerWriteReactor< ResponseType > *(grpc::CallbackServerContext *, const RequestType *)> get_reactor)
Definition: server_callback_handlers.h:451
grpc::internal::CallbackBidiHandler::RunHandler
void RunHandler(const HandlerParameter &param) final
Definition: server_callback_handlers.h:687
grpc::internal::CallbackUnaryHandler::RunHandler
void RunHandler(const HandlerParameter &param) final
Definition: server_callback_handlers.h:48
grpc::ServerCallbackReader< RequestType >::BindReactor
void BindReactor(ServerReadReactor< RequestType > *reactor)
Definition: server_callback.h:224
grpc::MessageAllocator::AllocateMessages
virtual MessageHolder< RequestT, ResponseT > * AllocateMessages()=0
grpc::internal::FinishOnlyReactor
Definition: server_context.h:88
grpc::internal::MethodHandler
Base class for running an RPC handler.
Definition: rpc_service_method.h:40
server_callback.h
grpc::MessageHolder::request
RequestT * request()
Definition: message_allocator.h:45
grpc::WriteOptions
Per-message write options.
Definition: call_op_set.h:81
grpc::ServerWriteReactor
ServerWriteReactor is the interface for a server-streaming RPC.
Definition: server_callback.h:193
grpc::internal::UnimplementedUnaryReactor
FinishOnlyReactor< ServerUnaryReactor > UnimplementedUnaryReactor
Definition: server_callback.h:783
grpc::internal::CallbackBidiHandler
Definition: server_callback_handlers.h:680
grpc::UNIMPLEMENTED
@ UNIMPLEMENTED
Operation is not implemented or not supported/enabled in this service.
Definition: status_code_enum.h:117
grpc::ServerCallbackWriter
Definition: server_callback.h:230
call.h
grpc::internal::Call::call
grpc_call * call() const
Definition: call.h:70
server_context.h
std
Definition: async_unary_call.h:410
grpc::WriteOptions::set_buffer_hint
WriteOptions & set_buffer_hint()
Sets flag indicating that the write may be buffered and need not go out on the wire immediately.
Definition: call_op_set.h:119
grpc::internal::CallbackUnaryHandler::SetMessageAllocator
void SetMessageAllocator(MessageAllocator< RequestType, ResponseType > *allocator)
Definition: server_callback_handlers.h:43
grpc::ServerCallbackUnary::SendInitialMetadata
virtual void SendInitialMetadata()=0
grpc::internal::CallbackClientStreamingHandler
Definition: server_callback_handlers.h:255
grpc::internal::ServerCallbackCall::Ref
void Ref()
Increases the reference count.
Definition: server_callback.h:128
grpc::internal::CallbackBidiHandler::CallbackBidiHandler
CallbackBidiHandler(std::function< ServerBidiReactor< RequestType, ResponseType > *(grpc::CallbackServerContext *)> get_reactor)
Definition: server_callback_handlers.h:682
grpc::ServerCallbackReader
Definition: server_callback.h:216
grpc::WriteOptions::is_last_message
bool is_last_message() const
Get value for the flag indicating that this is the last message, and should be coalesced with trailing metadata.
Definition: call_op_set.h:174
grpc::MessageHolder< RequestType, ResponseType >
grpc::ByteBuffer::Release
void Release()
Forget the underlying byte buffer without destroying it. Use this only for un-owned byte buffers.
Definition: byte_buffer.h:143
grpc::internal::CatchingReactorGetter
Reactor * CatchingReactorGetter(Func &&func, Args &&... args)
Definition: callback_common.h:55
grpc::ServerCallbackUnary
Definition: server_callback.h:200
grpc::ServerUnaryReactor
Definition: server_callback.h:706
grpc::internal::CallbackServerStreamingHandler::RunHandler
void RunHandler(const HandlerParameter &param) final
Definition: server_callback_handlers.h:456
grpc::ServerCallbackReader::SendInitialMetadata
virtual void SendInitialMetadata()=0
grpc::internal::ServerCallbackCall::MaybeDone
void MaybeDone()
Definition: server_callback.h:96
grpc::internal::CallbackUnaryHandler::Deserialize
void * Deserialize(grpc_call *call, grpc_byte_buffer *req, grpc::Status *status, void **handler_data) final
Definition: server_callback_handlers.h:83