GRPC C++  1.62.0
server_callback_handlers.h
Go to the documentation of this file.
1 //
2 //
3 // Copyright 2019 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 
18 #ifndef GRPCPP_IMPL_SERVER_CALLBACK_HANDLERS_H
19 #define GRPCPP_IMPL_SERVER_CALLBACK_HANDLERS_H
20 
21 #include <grpc/grpc.h>
22 #include <grpc/support/log.h>
24 #include <grpcpp/server_context.h>
27 #include <grpcpp/support/status.h>
28 
29 namespace grpc {
30 namespace internal {
31 
32 template <class RequestType, class ResponseType>
34  public:
37  const RequestType*, ResponseType*)>
38  get_reactor)
39  : get_reactor_(std::move(get_reactor)) {}
40 
43  allocator_ = allocator;
44  }
45 
46  void RunHandler(const HandlerParameter& param) final {
47  // Arena allocate a controller structure (that includes request/response)
48  grpc_call_ref(param.call->call());
49  auto* allocator_state =
51  param.internal_data);
52 
53  auto* call = new (grpc_call_arena_alloc(param.call->call(),
54  sizeof(ServerCallbackUnaryImpl)))
55  ServerCallbackUnaryImpl(
56  static_cast<grpc::CallbackServerContext*>(param.server_context),
57  param.call, allocator_state, param.call_requester);
58  param.server_context->BeginCompletionOp(
59  param.call, [call](bool) { call->MaybeDone(); }, call);
60 
61  ServerUnaryReactor* reactor = nullptr;
62  if (param.status.ok()) {
63  reactor = grpc::internal::CatchingReactorGetter<ServerUnaryReactor>(
64  get_reactor_,
65  static_cast<grpc::CallbackServerContext*>(param.server_context),
66  call->request(), call->response());
67  }
68 
69  if (reactor == nullptr) {
70  // if deserialization or reactor creator failed, we need to fail the call
71  reactor = new (grpc_call_arena_alloc(param.call->call(),
75  }
76 
78  call->SetupReactor(reactor);
79  }
80 
82  grpc::Status* status, void** handler_data) final {
83  grpc::ByteBuffer buf;
84  buf.set_buffer(req);
85  RequestType* request = nullptr;
87  if (allocator_ != nullptr) {
88  allocator_state = allocator_->AllocateMessages();
89  } else {
90  allocator_state = new (grpc_call_arena_alloc(
93  }
94  *handler_data = allocator_state;
95  request = allocator_state->request();
96  *status =
98  buf.Release();
99  if (status->ok()) {
100  return request;
101  }
102  return nullptr;
103  }
104 
105  private:
107  const RequestType*, ResponseType*)>
108  get_reactor_;
109  MessageAllocator<RequestType, ResponseType>* allocator_ = nullptr;
110 
111  class ServerCallbackUnaryImpl : public ServerCallbackUnary {
112  public:
113  void Finish(grpc::Status s) override {
114  // A callback that only contains a call to MaybeDone can be run as an
115  // inline callback regardless of whether or not OnDone is inlineable
116  // because if the actual OnDone callback needs to be scheduled, MaybeDone
117  // is responsible for dispatching to an executor thread if needed. Thus,
118  // when setting up the finish_tag_, we can set its own callback to
119  // inlineable.
120  finish_tag_.Set(
121  call_.call(),
122  [this](bool) {
123  this->MaybeDone(
124  reactor_.load(std::memory_order_relaxed)->InternalInlineable());
125  },
126  &finish_ops_, /*can_inline=*/true);
127  finish_ops_.set_core_cq_tag(&finish_tag_);
128 
129  if (!ctx_->sent_initial_metadata_) {
130  finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
131  ctx_->initial_metadata_flags());
132  if (ctx_->compression_level_set()) {
133  finish_ops_.set_compression_level(ctx_->compression_level());
134  }
135  ctx_->sent_initial_metadata_ = true;
136  }
137  // The response is dropped if the status is not OK.
138  if (s.ok()) {
139  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
140  finish_ops_.SendMessagePtr(response()));
141  } else {
142  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
143  }
144  finish_ops_.set_core_cq_tag(&finish_tag_);
145  call_.PerformOps(&finish_ops_);
146  }
147 
148  void SendInitialMetadata() override {
149  GPR_ASSERT(!ctx_->sent_initial_metadata_);
150  this->Ref();
151  // The callback for this function should not be marked inline because it
152  // is directly invoking a user-controlled reaction
153  // (OnSendInitialMetadataDone). Thus it must be dispatched to an executor
154  // thread. However, any OnDone needed after that can be inlined because it
155  // is already running on an executor thread.
156  meta_tag_.Set(
157  call_.call(),
158  [this](bool ok) {
159  ServerUnaryReactor* reactor =
160  reactor_.load(std::memory_order_relaxed);
161  reactor->OnSendInitialMetadataDone(ok);
162  this->MaybeDone(/*inlineable_ondone=*/true);
163  },
164  &meta_ops_, /*can_inline=*/false);
165  meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
166  ctx_->initial_metadata_flags());
167  if (ctx_->compression_level_set()) {
168  meta_ops_.set_compression_level(ctx_->compression_level());
169  }
170  ctx_->sent_initial_metadata_ = true;
171  meta_ops_.set_core_cq_tag(&meta_tag_);
172  call_.PerformOps(&meta_ops_);
173  }
174 
175  private:
176  friend class CallbackUnaryHandler<RequestType, ResponseType>;
177 
178  ServerCallbackUnaryImpl(
180  MessageHolder<RequestType, ResponseType>* allocator_state,
181  std::function<void()> call_requester)
182  : ctx_(ctx),
183  call_(*call),
184  allocator_state_(allocator_state),
185  call_requester_(std::move(call_requester)) {
186  ctx_->set_message_allocator_state(allocator_state);
187  }
188 
193  void SetupReactor(ServerUnaryReactor* reactor) {
194  reactor_.store(reactor, std::memory_order_relaxed);
195  this->BindReactor(reactor);
196  this->MaybeCallOnCancel(reactor);
197  this->MaybeDone(reactor->InternalInlineable());
198  }
199 
200  const RequestType* request() { return allocator_state_->request(); }
201  ResponseType* response() { return allocator_state_->response(); }
202 
203  void CallOnDone() override {
204  reactor_.load(std::memory_order_relaxed)->OnDone();
205  grpc_call* call = call_.call();
206  auto call_requester = std::move(call_requester_);
207  allocator_state_->Release();
208  if (ctx_->context_allocator() != nullptr) {
209  ctx_->context_allocator()->Release(ctx_);
210  }
211  this->~ServerCallbackUnaryImpl(); // explicitly call destructor
212  grpc_call_unref(call);
213  call_requester();
214  }
215 
216  ServerReactor* reactor() override {
217  return reactor_.load(std::memory_order_relaxed);
218  }
219 
221  meta_ops_;
226  finish_ops_;
228 
229  grpc::CallbackServerContext* const ctx_;
230  grpc::internal::Call call_;
231  MessageHolder<RequestType, ResponseType>* const allocator_state_;
232  std::function<void()> call_requester_;
233  // reactor_ can always be loaded/stored with relaxed memory ordering because
234  // its value is only set once, independently of other data in the object,
235  // and the loads that use it will always actually come provably later even
236  // though they are from different threads since they are triggered by
237  // actions initiated only by the setting up of the reactor_ variable. In
238  // a sense, it's a delayed "const": it gets its value from the SetupReactor
239  // method (not the constructor, so it's not a true const), but it doesn't
240  // change after that and it only gets used by actions caused, directly or
241  // indirectly, by that setup. This comment also applies to the reactor_
242  // variables of the other streaming objects in this file.
243  std::atomic<ServerUnaryReactor*> reactor_;
244  // callbacks_outstanding_ follows a refcount pattern
245  std::atomic<intptr_t> callbacks_outstanding_{
246  3}; // reserve for start, Finish, and CompletionOp
247  };
248 };
249 
250 template <class RequestType, class ResponseType>
252  public:
254  std::function<ServerReadReactor<RequestType>*(
255  grpc::CallbackServerContext*, ResponseType*)>
256  get_reactor)
257  : get_reactor_(std::move(get_reactor)) {}
258  void RunHandler(const HandlerParameter& param) final {
259  // Arena allocate a reader structure (that includes response)
260  grpc_call_ref(param.call->call());
261 
262  auto* reader = new (grpc_call_arena_alloc(param.call->call(),
263  sizeof(ServerCallbackReaderImpl)))
264  ServerCallbackReaderImpl(
265  static_cast<grpc::CallbackServerContext*>(param.server_context),
266  param.call, param.call_requester);
267  // Inlineable OnDone can be false in the CompletionOp callback because there
268  // is no read reactor that has an inlineable OnDone; this only applies to
269  // the DefaultReactor (which is unary).
270  param.server_context->BeginCompletionOp(
271  param.call,
272  [reader](bool) { reader->MaybeDone(/*inlineable_ondone=*/false); },
273  reader);
274 
275  ServerReadReactor<RequestType>* reactor = nullptr;
276  if (param.status.ok()) {
277  reactor =
278  grpc::internal::CatchingReactorGetter<ServerReadReactor<RequestType>>(
279  get_reactor_,
280  static_cast<grpc::CallbackServerContext*>(param.server_context),
281  reader->response());
282  }
283 
284  if (reactor == nullptr) {
285  // if deserialization or reactor creator failed, we need to fail the call
286  reactor = new (grpc_call_arena_alloc(
287  param.call->call(), sizeof(UnimplementedReadReactor<RequestType>)))
290  }
291 
292  reader->SetupReactor(reactor);
293  }
294 
295  private:
296  std::function<ServerReadReactor<RequestType>*(grpc::CallbackServerContext*,
297  ResponseType*)>
298  get_reactor_;
299 
300  class ServerCallbackReaderImpl : public ServerCallbackReader<RequestType> {
301  public:
302  void Finish(grpc::Status s) override {
303  // A finish tag with only MaybeDone can have its callback inlined
304  // regardless even if OnDone is not inlineable because this callback just
305  // checks a ref and then decides whether or not to dispatch OnDone.
306  finish_tag_.Set(
307  call_.call(),
308  [this](bool) {
309  // Inlineable OnDone can be false here because there is
310  // no read reactor that has an inlineable OnDone; this
311  // only applies to the DefaultReactor (which is unary).
312  this->MaybeDone(/*inlineable_ondone=*/false);
313  },
314  &finish_ops_, /*can_inline=*/true);
315  if (!ctx_->sent_initial_metadata_) {
316  finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
317  ctx_->initial_metadata_flags());
318  if (ctx_->compression_level_set()) {
319  finish_ops_.set_compression_level(ctx_->compression_level());
320  }
321  ctx_->sent_initial_metadata_ = true;
322  }
323  // The response is dropped if the status is not OK.
324  if (s.ok()) {
325  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
326  finish_ops_.SendMessagePtr(&resp_));
327  } else {
328  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
329  }
330  finish_ops_.set_core_cq_tag(&finish_tag_);
331  call_.PerformOps(&finish_ops_);
332  }
333 
334  void SendInitialMetadata() override {
335  GPR_ASSERT(!ctx_->sent_initial_metadata_);
336  this->Ref();
337  // The callback for this function should not be inlined because it invokes
338  // a user-controlled reaction, but any resulting OnDone can be inlined in
339  // the executor to which this callback is dispatched.
340  meta_tag_.Set(
341  call_.call(),
342  [this](bool ok) {
343  ServerReadReactor<RequestType>* reactor =
344  reactor_.load(std::memory_order_relaxed);
345  reactor->OnSendInitialMetadataDone(ok);
346  this->MaybeDone(/*inlineable_ondone=*/true);
347  },
348  &meta_ops_, /*can_inline=*/false);
349  meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
350  ctx_->initial_metadata_flags());
351  if (ctx_->compression_level_set()) {
352  meta_ops_.set_compression_level(ctx_->compression_level());
353  }
354  ctx_->sent_initial_metadata_ = true;
355  meta_ops_.set_core_cq_tag(&meta_tag_);
356  call_.PerformOps(&meta_ops_);
357  }
358 
359  void Read(RequestType* req) override {
360  this->Ref();
361  read_ops_.RecvMessage(req);
362  call_.PerformOps(&read_ops_);
363  }
364 
365  private:
366  friend class CallbackClientStreamingHandler<RequestType, ResponseType>;
367 
368  ServerCallbackReaderImpl(grpc::CallbackServerContext* ctx,
369  grpc::internal::Call* call,
370  std::function<void()> call_requester)
371  : ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)) {}
372 
373  void SetupReactor(ServerReadReactor<RequestType>* reactor) {
374  reactor_.store(reactor, std::memory_order_relaxed);
375  // The callback for this function should not be inlined because it invokes
376  // a user-controlled reaction, but any resulting OnDone can be inlined in
377  // the executor to which this callback is dispatched.
378  read_tag_.Set(
379  call_.call(),
380  [this, reactor](bool ok) {
381  if (GPR_UNLIKELY(!ok)) {
382  ctx_->MaybeMarkCancelledOnRead();
383  }
384  reactor->OnReadDone(ok);
385  this->MaybeDone(/*inlineable_ondone=*/true);
386  },
387  &read_ops_, /*can_inline=*/false);
388  read_ops_.set_core_cq_tag(&read_tag_);
389  this->BindReactor(reactor);
390  this->MaybeCallOnCancel(reactor);
391  // Inlineable OnDone can be false here because there is no read
392  // reactor that has an inlineable OnDone; this only applies to the
393  // DefaultReactor (which is unary).
394  this->MaybeDone(/*inlineable_ondone=*/false);
395  }
396 
397  ~ServerCallbackReaderImpl() {}
398 
399  ResponseType* response() { return &resp_; }
400 
401  void CallOnDone() override {
402  reactor_.load(std::memory_order_relaxed)->OnDone();
403  grpc_call* call = call_.call();
404  auto call_requester = std::move(call_requester_);
405  if (ctx_->context_allocator() != nullptr) {
406  ctx_->context_allocator()->Release(ctx_);
407  }
408  this->~ServerCallbackReaderImpl(); // explicitly call destructor
409  grpc_call_unref(call);
410  call_requester();
411  }
412 
413  ServerReactor* reactor() override {
414  return reactor_.load(std::memory_order_relaxed);
415  }
416 
418  meta_ops_;
423  finish_ops_;
426  read_ops_;
428 
429  grpc::CallbackServerContext* const ctx_;
430  grpc::internal::Call call_;
431  ResponseType resp_;
432  std::function<void()> call_requester_;
433  // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
434  std::atomic<ServerReadReactor<RequestType>*> reactor_;
435  // callbacks_outstanding_ follows a refcount pattern
436  std::atomic<intptr_t> callbacks_outstanding_{
437  3}; // reserve for OnStarted, Finish, and CompletionOp
438  };
439 };
440 
441 template <class RequestType, class ResponseType>
443  public:
445  std::function<ServerWriteReactor<ResponseType>*(
446  grpc::CallbackServerContext*, const RequestType*)>
447  get_reactor)
448  : get_reactor_(std::move(get_reactor)) {}
449  void RunHandler(const HandlerParameter& param) final {
450  // Arena allocate a writer structure
451  grpc_call_ref(param.call->call());
452 
453  auto* writer = new (grpc_call_arena_alloc(param.call->call(),
454  sizeof(ServerCallbackWriterImpl)))
455  ServerCallbackWriterImpl(
456  static_cast<grpc::CallbackServerContext*>(param.server_context),
457  param.call, static_cast<RequestType*>(param.request),
458  param.call_requester);
459  // Inlineable OnDone can be false in the CompletionOp callback because there
460  // is no write reactor that has an inlineable OnDone; this only applies to
461  // the DefaultReactor (which is unary).
462  param.server_context->BeginCompletionOp(
463  param.call,
464  [writer](bool) { writer->MaybeDone(/*inlineable_ondone=*/false); },
465  writer);
466 
467  ServerWriteReactor<ResponseType>* reactor = nullptr;
468  if (param.status.ok()) {
471  get_reactor_,
472  static_cast<grpc::CallbackServerContext*>(param.server_context),
473  writer->request());
474  }
475  if (reactor == nullptr) {
476  // if deserialization or reactor creator failed, we need to fail the call
477  reactor = new (grpc_call_arena_alloc(
478  param.call->call(), sizeof(UnimplementedWriteReactor<ResponseType>)))
481  }
482 
483  writer->SetupReactor(reactor);
484  }
485 
487  grpc::Status* status, void** /*handler_data*/) final {
488  grpc::ByteBuffer buf;
489  buf.set_buffer(req);
490  auto* request =
491  new (grpc_call_arena_alloc(call, sizeof(RequestType))) RequestType();
492  *status =
494  buf.Release();
495  if (status->ok()) {
496  return request;
497  }
498  request->~RequestType();
499  return nullptr;
500  }
501 
502  private:
503  std::function<ServerWriteReactor<ResponseType>*(grpc::CallbackServerContext*,
504  const RequestType*)>
505  get_reactor_;
506 
507  class ServerCallbackWriterImpl : public ServerCallbackWriter<ResponseType> {
508  public:
509  void Finish(grpc::Status s) override {
510  // A finish tag with only MaybeDone can have its callback inlined
511  // regardless even if OnDone is not inlineable because this callback just
512  // checks a ref and then decides whether or not to dispatch OnDone.
513  finish_tag_.Set(
514  call_.call(),
515  [this](bool) {
516  // Inlineable OnDone can be false here because there is
517  // no write reactor that has an inlineable OnDone; this
518  // only applies to the DefaultReactor (which is unary).
519  this->MaybeDone(/*inlineable_ondone=*/false);
520  },
521  &finish_ops_, /*can_inline=*/true);
522  finish_ops_.set_core_cq_tag(&finish_tag_);
523 
524  if (!ctx_->sent_initial_metadata_) {
525  finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
526  ctx_->initial_metadata_flags());
527  if (ctx_->compression_level_set()) {
528  finish_ops_.set_compression_level(ctx_->compression_level());
529  }
530  ctx_->sent_initial_metadata_ = true;
531  }
532  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
533  call_.PerformOps(&finish_ops_);
534  }
535 
536  void SendInitialMetadata() override {
537  GPR_ASSERT(!ctx_->sent_initial_metadata_);
538  this->Ref();
539  // The callback for this function should not be inlined because it invokes
540  // a user-controlled reaction, but any resulting OnDone can be inlined in
541  // the executor to which this callback is dispatched.
542  meta_tag_.Set(
543  call_.call(),
544  [this](bool ok) {
545  ServerWriteReactor<ResponseType>* reactor =
546  reactor_.load(std::memory_order_relaxed);
547  reactor->OnSendInitialMetadataDone(ok);
548  this->MaybeDone(/*inlineable_ondone=*/true);
549  },
550  &meta_ops_, /*can_inline=*/false);
551  meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
552  ctx_->initial_metadata_flags());
553  if (ctx_->compression_level_set()) {
554  meta_ops_.set_compression_level(ctx_->compression_level());
555  }
556  ctx_->sent_initial_metadata_ = true;
557  meta_ops_.set_core_cq_tag(&meta_tag_);
558  call_.PerformOps(&meta_ops_);
559  }
560 
561  void Write(const ResponseType* resp, grpc::WriteOptions options) override {
562  this->Ref();
563  if (options.is_last_message()) {
564  options.set_buffer_hint();
565  }
566  if (!ctx_->sent_initial_metadata_) {
567  write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
568  ctx_->initial_metadata_flags());
569  if (ctx_->compression_level_set()) {
570  write_ops_.set_compression_level(ctx_->compression_level());
571  }
572  ctx_->sent_initial_metadata_ = true;
573  }
574  // TODO(vjpai): don't assert
575  GPR_ASSERT(write_ops_.SendMessagePtr(resp, options).ok());
576  call_.PerformOps(&write_ops_);
577  }
578 
579  void WriteAndFinish(const ResponseType* resp, grpc::WriteOptions options,
580  grpc::Status s) override {
581  // This combines the write into the finish callback
582  // TODO(vjpai): don't assert
583  GPR_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok());
584  Finish(std::move(s));
585  }
586 
587  private:
588  friend class CallbackServerStreamingHandler<RequestType, ResponseType>;
589 
590  ServerCallbackWriterImpl(grpc::CallbackServerContext* ctx,
591  grpc::internal::Call* call, const RequestType* req,
592  std::function<void()> call_requester)
593  : ctx_(ctx),
594  call_(*call),
595  req_(req),
596  call_requester_(std::move(call_requester)) {}
597 
598  void SetupReactor(ServerWriteReactor<ResponseType>* reactor) {
599  reactor_.store(reactor, std::memory_order_relaxed);
600  // The callback for this function should not be inlined because it invokes
601  // a user-controlled reaction, but any resulting OnDone can be inlined in
602  // the executor to which this callback is dispatched.
603  write_tag_.Set(
604  call_.call(),
605  [this, reactor](bool ok) {
606  reactor->OnWriteDone(ok);
607  this->MaybeDone(/*inlineable_ondone=*/true);
608  },
609  &write_ops_, /*can_inline=*/false);
610  write_ops_.set_core_cq_tag(&write_tag_);
611  this->BindReactor(reactor);
612  this->MaybeCallOnCancel(reactor);
613  // Inlineable OnDone can be false here because there is no write
614  // reactor that has an inlineable OnDone; this only applies to the
615  // DefaultReactor (which is unary).
616  this->MaybeDone(/*inlineable_ondone=*/false);
617  }
618  ~ServerCallbackWriterImpl() {
619  if (req_ != nullptr) {
620  req_->~RequestType();
621  }
622  }
623 
624  const RequestType* request() { return req_; }
625 
626  void CallOnDone() override {
627  reactor_.load(std::memory_order_relaxed)->OnDone();
628  grpc_call* call = call_.call();
629  auto call_requester = std::move(call_requester_);
630  if (ctx_->context_allocator() != nullptr) {
631  ctx_->context_allocator()->Release(ctx_);
632  }
633  this->~ServerCallbackWriterImpl(); // explicitly call destructor
634  grpc_call_unref(call);
635  call_requester();
636  }
637 
638  ServerReactor* reactor() override {
639  return reactor_.load(std::memory_order_relaxed);
640  }
641 
643  meta_ops_;
648  finish_ops_;
652  write_ops_;
654 
655  grpc::CallbackServerContext* const ctx_;
656  grpc::internal::Call call_;
657  const RequestType* req_;
658  std::function<void()> call_requester_;
659  // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
660  std::atomic<ServerWriteReactor<ResponseType>*> reactor_;
661  // callbacks_outstanding_ follows a refcount pattern
662  std::atomic<intptr_t> callbacks_outstanding_{
663  3}; // reserve for OnStarted, Finish, and CompletionOp
664  };
665 };
666 
667 template <class RequestType, class ResponseType>
669  public:
673  get_reactor)
674  : get_reactor_(std::move(get_reactor)) {}
675  void RunHandler(const HandlerParameter& param) final {
676  grpc_call_ref(param.call->call());
677 
678  auto* stream = new (grpc_call_arena_alloc(
679  param.call->call(), sizeof(ServerCallbackReaderWriterImpl)))
680  ServerCallbackReaderWriterImpl(
681  static_cast<grpc::CallbackServerContext*>(param.server_context),
682  param.call, param.call_requester);
683  // Inlineable OnDone can be false in the CompletionOp callback because there
684  // is no bidi reactor that has an inlineable OnDone; this only applies to
685  // the DefaultReactor (which is unary).
686  param.server_context->BeginCompletionOp(
687  param.call,
688  [stream](bool) { stream->MaybeDone(/*inlineable_ondone=*/false); },
689  stream);
690 
692  if (param.status.ok()) {
695  get_reactor_,
696  static_cast<grpc::CallbackServerContext*>(param.server_context));
697  }
698 
699  if (reactor == nullptr) {
700  // if deserialization or reactor creator failed, we need to fail the call
701  reactor = new (grpc_call_arena_alloc(
702  param.call->call(),
706  }
707 
708  stream->SetupReactor(reactor);
709  }
710 
711  private:
712  std::function<ServerBidiReactor<RequestType, ResponseType>*(
714  get_reactor_;
715 
716  class ServerCallbackReaderWriterImpl
717  : public ServerCallbackReaderWriter<RequestType, ResponseType> {
718  public:
719  void Finish(grpc::Status s) override {
720  // A finish tag with only MaybeDone can have its callback inlined
721  // regardless even if OnDone is not inlineable because this callback just
722  // checks a ref and then decides whether or not to dispatch OnDone.
723  finish_tag_.Set(
724  call_.call(),
725  [this](bool) {
726  // Inlineable OnDone can be false here because there is
727  // no bidi reactor that has an inlineable OnDone; this
728  // only applies to the DefaultReactor (which is unary).
729  this->MaybeDone(/*inlineable_ondone=*/false);
730  },
731  &finish_ops_, /*can_inline=*/true);
732  finish_ops_.set_core_cq_tag(&finish_tag_);
733 
734  if (!ctx_->sent_initial_metadata_) {
735  finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
736  ctx_->initial_metadata_flags());
737  if (ctx_->compression_level_set()) {
738  finish_ops_.set_compression_level(ctx_->compression_level());
739  }
740  ctx_->sent_initial_metadata_ = true;
741  }
742  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
743  call_.PerformOps(&finish_ops_);
744  }
745 
746  void SendInitialMetadata() override {
747  GPR_ASSERT(!ctx_->sent_initial_metadata_);
748  this->Ref();
749  // The callback for this function should not be inlined because it invokes
750  // a user-controlled reaction, but any resulting OnDone can be inlined in
751  // the executor to which this callback is dispatched.
752  meta_tag_.Set(
753  call_.call(),
754  [this](bool ok) {
755  ServerBidiReactor<RequestType, ResponseType>* reactor =
756  reactor_.load(std::memory_order_relaxed);
757  reactor->OnSendInitialMetadataDone(ok);
758  this->MaybeDone(/*inlineable_ondone=*/true);
759  },
760  &meta_ops_, /*can_inline=*/false);
761  meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
762  ctx_->initial_metadata_flags());
763  if (ctx_->compression_level_set()) {
764  meta_ops_.set_compression_level(ctx_->compression_level());
765  }
766  ctx_->sent_initial_metadata_ = true;
767  meta_ops_.set_core_cq_tag(&meta_tag_);
768  call_.PerformOps(&meta_ops_);
769  }
770 
771  void Write(const ResponseType* resp, grpc::WriteOptions options) override {
772  this->Ref();
773  if (options.is_last_message()) {
774  options.set_buffer_hint();
775  }
776  if (!ctx_->sent_initial_metadata_) {
777  write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
778  ctx_->initial_metadata_flags());
779  if (ctx_->compression_level_set()) {
780  write_ops_.set_compression_level(ctx_->compression_level());
781  }
782  ctx_->sent_initial_metadata_ = true;
783  }
784  // TODO(vjpai): don't assert
785  GPR_ASSERT(write_ops_.SendMessagePtr(resp, options).ok());
786  call_.PerformOps(&write_ops_);
787  }
788 
789  void WriteAndFinish(const ResponseType* resp, grpc::WriteOptions options,
790  grpc::Status s) override {
791  // TODO(vjpai): don't assert
792  GPR_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok());
793  Finish(std::move(s));
794  }
795 
796  void Read(RequestType* req) override {
797  this->Ref();
798  read_ops_.RecvMessage(req);
799  call_.PerformOps(&read_ops_);
800  }
801 
802  private:
803  friend class CallbackBidiHandler<RequestType, ResponseType>;
804 
805  ServerCallbackReaderWriterImpl(grpc::CallbackServerContext* ctx,
806  grpc::internal::Call* call,
807  std::function<void()> call_requester)
808  : ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)) {}
809 
810  void SetupReactor(ServerBidiReactor<RequestType, ResponseType>* reactor) {
811  reactor_.store(reactor, std::memory_order_relaxed);
812  // The callbacks for these functions should not be inlined because they
813  // invoke user-controlled reactions, but any resulting OnDones can be
814  // inlined in the executor to which a callback is dispatched.
815  write_tag_.Set(
816  call_.call(),
817  [this, reactor](bool ok) {
818  reactor->OnWriteDone(ok);
819  this->MaybeDone(/*inlineable_ondone=*/true);
820  },
821  &write_ops_, /*can_inline=*/false);
822  write_ops_.set_core_cq_tag(&write_tag_);
823  read_tag_.Set(
824  call_.call(),
825  [this, reactor](bool ok) {
826  if (GPR_UNLIKELY(!ok)) {
827  ctx_->MaybeMarkCancelledOnRead();
828  }
829  reactor->OnReadDone(ok);
830  this->MaybeDone(/*inlineable_ondone=*/true);
831  },
832  &read_ops_, /*can_inline=*/false);
833  read_ops_.set_core_cq_tag(&read_tag_);
834  this->BindReactor(reactor);
835  this->MaybeCallOnCancel(reactor);
836  // Inlineable OnDone can be false here because there is no bidi
837  // reactor that has an inlineable OnDone; this only applies to the
838  // DefaultReactor (which is unary).
839  this->MaybeDone(/*inlineable_ondone=*/false);
840  }
841 
842  void CallOnDone() override {
843  reactor_.load(std::memory_order_relaxed)->OnDone();
844  grpc_call* call = call_.call();
845  auto call_requester = std::move(call_requester_);
846  if (ctx_->context_allocator() != nullptr) {
847  ctx_->context_allocator()->Release(ctx_);
848  }
849  this->~ServerCallbackReaderWriterImpl(); // explicitly call destructor
850  grpc_call_unref(call);
851  call_requester();
852  }
853 
854  ServerReactor* reactor() override {
855  return reactor_.load(std::memory_order_relaxed);
856  }
857 
859  meta_ops_;
864  finish_ops_;
868  write_ops_;
871  read_ops_;
873 
874  grpc::CallbackServerContext* const ctx_;
875  grpc::internal::Call call_;
876  std::function<void()> call_requester_;
877  // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
878  std::atomic<ServerBidiReactor<RequestType, ResponseType>*> reactor_;
879  // callbacks_outstanding_ follows a refcount pattern
880  std::atomic<intptr_t> callbacks_outstanding_{
881  3}; // reserve for OnStarted, Finish, and CompletionOp
882  };
883 };
884 
885 } // namespace internal
886 } // namespace grpc
887 
888 #endif // GRPCPP_IMPL_SERVER_CALLBACK_HANDLERS_H
grpc::ServerReadReactor
ServerReadReactor is the interface for a client-streaming RPC.
Definition: server_callback.h:182
grpc::internal::CallbackWithSuccessTag
CallbackWithSuccessTag can be reused multiple times, and will be used in this fashion for streaming operations.
Definition: callback_common.h:136
grpc::ServerCallbackReaderWriter::SendInitialMetadata
virtual void SendInitialMetadata()=0
message_allocator.h
grpc_call_arena_alloc
GRPCAPI void * grpc_call_arena_alloc(grpc_call *call, size_t size)
Allocate memory in the grpc_call arena: this memory is automatically discarded at call completion.
grpc::internal::CallOpServerSendStatus
Definition: call_op_set.h:654
grpc
An Alarm posts the user-provided tag to its associated completion queue or invokes the user-provided function on expiry or cancellation.
Definition: alarm.h:33
grpc::internal::CallOpSet< grpc::internal::CallOpSendInitialMetadata >
grpc::CallbackServerContext
Definition: server_context.h:612
grpc::internal::CallOpSendMessage
Definition: call_op_set.h:286
grpc::internal::MethodHandler::HandlerParameter
Definition: rpc_service_method.h:41
grpc::MessageHolder::response
ResponseT * response()
Definition: message_allocator.h:46
grpc::internal::Call
Straightforward wrapping of the C call object.
Definition: call.h:36
GPR_ASSERT
#define GPR_ASSERT(x)
abort() the process if x is zero, having written a line to the log.
Definition: log.h:95
rpc_service_method.h
grpc::internal::CallbackClientStreamingHandler::RunHandler
void RunHandler(const HandlerParameter &param) final
Definition: server_callback_handlers.h:258
status.h
grpc::ServerCallbackUnary::BindReactor
void BindReactor(Reactor *reactor)
Definition: server_callback.h:201
grpc::internal::CallOpSendInitialMetadata
Definition: call_op_set.h:216
grpc::internal::ServerCallbackCall::MaybeCallOnCancel
void MaybeCallOnCancel(ServerReactor *reactor)
Definition: server_callback.h:107
grpc::internal::CallbackClientStreamingHandler::CallbackClientStreamingHandler
CallbackClientStreamingHandler(std::function< ServerReadReactor< RequestType > *(grpc::CallbackServerContext *, ResponseType *)> get_reactor)
Definition: server_callback_handlers.h:253
grpc::Status::ok
bool ok() const
Is the status OK?
Definition: status.h:126
grpc_call_ref
GRPCAPI void grpc_call_ref(grpc_call *call)
Ref a call.
grpc::ServerCallbackReaderWriter
Definition: server_callback.h:238
grpc::internal::CallbackUnaryHandler::CallbackUnaryHandler
CallbackUnaryHandler(std::function< ServerUnaryReactor *(grpc::CallbackServerContext *, const RequestType *, ResponseType *)> get_reactor)
Definition: server_callback_handlers.h:35
grpc::Status
Did it work? If it didn't, why?
Definition: status.h:35
log.h
grpc_call_unref
GRPCAPI void grpc_call_unref(grpc_call *call)
Unref a call.
grpc::internal::CallbackServerStreamingHandler::Deserialize
void * Deserialize(grpc_call *call, grpc_byte_buffer *req, grpc::Status *status, void **) final
Definition: server_callback_handlers.h:486
grpc::MessageAllocator< RequestType, ResponseType >
grpc.h
grpc_call
struct grpc_call grpc_call
A Call represents an RPC.
Definition: grpc_types.h:70
grpc::ServerBidiReactor
ServerBidiReactor is the interface for a bidirectional streaming RPC.
Definition: server_callback.h:186
grpc_byte_buffer
Definition: grpc_types.h:43
grpc::ByteBuffer
A sequence of bytes.
Definition: byte_buffer.h:60
grpc::internal::CallbackUnaryHandler
Definition: server_callback_handlers.h:33
grpc::ServerCallbackWriter::SendInitialMetadata
virtual void SendInitialMetadata()=0
grpc::internal::DefaultMessageHolder
Definition: server_callback.h:161
grpc::internal::CallbackServerStreamingHandler
Definition: server_callback_handlers.h:442
grpc::internal::CallbackServerStreamingHandler::CallbackServerStreamingHandler
CallbackServerStreamingHandler(std::function< ServerWriteReactor< ResponseType > *(grpc::CallbackServerContext *, const RequestType *)> get_reactor)
Definition: server_callback_handlers.h:444
grpc::internal::CallbackBidiHandler::RunHandler
void RunHandler(const HandlerParameter &param) final
Definition: server_callback_handlers.h:675
grpc::SerializationTraits
Defines how to serialize and deserialize some type.
Definition: serialization_traits.h:58
grpc::internal::CallbackUnaryHandler::RunHandler
void RunHandler(const HandlerParameter &param) final
Definition: server_callback_handlers.h:46
grpc::ServerCallbackReader< RequestType >::BindReactor
void BindReactor(ServerReadReactor< RequestType > *reactor)
Definition: server_callback.h:215
grpc::MessageAllocator::AllocateMessages
virtual MessageHolder< RequestT, ResponseT > * AllocateMessages()=0
grpc::internal::FinishOnlyReactor
Definition: server_context.h:88
grpc::internal::MethodHandler
Base class for running an RPC handler.
Definition: rpc_service_method.h:38
server_callback.h
grpc::MessageHolder::request
RequestT * request()
Definition: message_allocator.h:45
grpc::WriteOptions
Per-message write options.
Definition: call_op_set.h:78
grpc::ServerWriteReactor
ServerWriteReactor is the interface for a server-streaming RPC.
Definition: server_callback.h:184
grpc::internal::UnimplementedUnaryReactor
FinishOnlyReactor< ServerUnaryReactor > UnimplementedUnaryReactor
Definition: server_callback.h:774
grpc::internal::CallbackBidiHandler
Definition: server_callback_handlers.h:668
grpc::UNIMPLEMENTED
@ UNIMPLEMENTED
Operation is not implemented or not supported/enabled in this service.
Definition: status_code_enum.h:117
grpc::ServerCallbackWriter
Definition: server_callback.h:221
grpc::internal::Call::call
grpc_call * call() const
Definition: call.h:70
server_context.h
std
Definition: async_unary_call.h:405
grpc::WriteOptions::set_buffer_hint
WriteOptions & set_buffer_hint()
Sets flag indicating that the write may be buffered and need not go out on the wire immediately.
Definition: call_op_set.h:116
grpc::internal::CallbackUnaryHandler::SetMessageAllocator
void SetMessageAllocator(MessageAllocator< RequestType, ResponseType > *allocator)
Definition: server_callback_handlers.h:41
grpc::ServerCallbackUnary::SendInitialMetadata
virtual void SendInitialMetadata()=0
grpc::internal::CallbackClientStreamingHandler
Definition: server_callback_handlers.h:251
grpc::internal::ServerCallbackCall::Ref
void Ref()
Increases the reference count.
Definition: server_callback.h:125
grpc::internal::CallbackBidiHandler::CallbackBidiHandler
CallbackBidiHandler(std::function< ServerBidiReactor< RequestType, ResponseType > *(grpc::CallbackServerContext *)> get_reactor)
Definition: server_callback_handlers.h:670
grpc::ServerCallbackReader
Definition: server_callback.h:207
grpc::WriteOptions::is_last_message
bool is_last_message() const
Get value for the flag indicating that this is the last message, and should be coalesced with trailing metadata.
Definition: call_op_set.h:171
grpc::MessageHolder< RequestType, ResponseType >
grpc::ByteBuffer::Release
void Release()
Forget the underlying byte buffer without destroying it. Use this only for un-owned byte buffers.
Definition: byte_buffer.h:144
grpc::internal::CatchingReactorGetter
Reactor * CatchingReactorGetter(Func &&func, Args &&... args)
Definition: callback_common.h:53
grpc::ServerCallbackUnary
Definition: server_callback.h:191
grpc::ServerUnaryReactor
Definition: server_callback.h:697
grpc::internal::CallbackServerStreamingHandler::RunHandler
void RunHandler(const HandlerParameter &param) final
Definition: server_callback_handlers.h:449
grpc::ServerCallbackReader::SendInitialMetadata
virtual void SendInitialMetadata()=0
grpc::internal::ServerCallbackCall::MaybeDone
void MaybeDone()
Definition: server_callback.h:93
grpc::internal::CallbackUnaryHandler::Deserialize
void * Deserialize(grpc_call *call, grpc_byte_buffer *req, grpc::Status *status, void **handler_data) final
Definition: server_callback_handlers.h:81