GRPC C++  1.81.0
client_callback.h
Go to the documentation of this file.
1 //
2 //
3 // Copyright 2018 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #ifndef GRPCPP_SUPPORT_CLIENT_CALLBACK_H
20 #define GRPCPP_SUPPORT_CLIENT_CALLBACK_H
21 
22 #include <grpc/grpc.h>
23 #include <grpc/impl/call.h>
24 #include <grpcpp/impl/call.h>
27 #include <grpcpp/impl/sync.h>
29 #include <grpcpp/support/config.h>
30 #include <grpcpp/support/status.h>
31 
32 #include <atomic>
33 #include <cstddef>
34 #include <functional>
35 #include <memory>
36 #include <type_traits>
37 #include <utility>
38 
39 #include "absl/log/absl_check.h"
40 
41 namespace grpc {
42 class Channel;
43 class ClientContext;
44 
45 namespace internal {
46 class RpcMethod;
47 
54 template <class InputMessage, class OutputMessage,
55  class BaseInputMessage = InputMessage,
56  class BaseOutputMessage = OutputMessage>
58  const grpc::internal::RpcMethod& method,
59  grpc::ClientContext* context,
60  const InputMessage* request, OutputMessage* result,
61  std::function<void(grpc::Status)>&& on_completion) {
62  static_assert(std::is_base_of<BaseInputMessage, InputMessage>::value,
63  "Invalid input message specification");
64  static_assert(std::is_base_of<BaseOutputMessage, OutputMessage>::value,
65  "Invalid output message specification");
67  channel, method, context, request, result, std::move(on_completion));
68 }
69 
70 template <class InputMessage, class OutputMessage>
71 class CallbackUnaryCallImpl {
72  public:
74  const grpc::internal::RpcMethod& method,
75  grpc::ClientContext* context,
76  const InputMessage* request, OutputMessage* result,
77  std::function<void(grpc::Status)>&& on_completion) {
78  grpc::CompletionQueue* cq = channel->CallbackCQ();
79  ABSL_CHECK_NE(cq, nullptr);
80  grpc::internal::Call call(channel->CreateCall(method, context, cq));
81 
82  using FullCallOpSet = grpc::internal::CallOpSet<
89 
90  struct OpSetAndTag {
91  FullCallOpSet opset;
93  };
94  const size_t alloc_sz = sizeof(OpSetAndTag);
95  auto* const alloced =
96  static_cast<OpSetAndTag*>(grpc_call_arena_alloc(call.call(), alloc_sz));
97  auto* ops = new (&alloced->opset) FullCallOpSet;
98  auto* tag = new (&alloced->tag) grpc::internal::CallbackWithStatusTag(
99  call.call(), std::move(on_completion), ops);
100 
101  // TODO(vjpai): Unify code with sync API as much as possible
102  grpc::Status s = ops->SendMessagePtr(request, channel->memory_allocator());
103  if (!s.ok()) {
104  tag->force_run(s);
105  return;
106  }
107  ops->SendInitialMetadata(&context->send_initial_metadata_,
108  context->initial_metadata_flags());
109  ops->RecvInitialMetadata(context);
110  ops->RecvMessage(result);
111  ops->AllowNoMessage();
112  ops->ClientSendClose();
113  ops->ClientRecvStatus(context, tag->status_ptr());
114  ops->set_core_cq_tag(tag);
115  ops->FillOps(&call);
116  }
117 };
118 
119 // Base class for public API classes.
121  public:
122  virtual ~ClientReactor() = default;
123 
131  virtual void OnDone(const grpc::Status& /*s*/) = 0;
132 
140  virtual bool InternalTrailersOnly(const grpc_call* call) const;
141 };
142 
143 } // namespace internal
144 
145 namespace experimental {
146 template <class RequestType, class ResponseType>
148 
151 } // namespace experimental
152 
153 // Forward declarations
154 template <class Request, class Response>
156 template <class Response>
158 template <class Request>
160 class ClientUnaryReactor;
161 
162 // NOTE: The streaming objects are not actually implemented in the public API.
163 // These interfaces are provided for mocking only. Typical applications
164 // will interact exclusively with the reactors that they define.
165 template <class Request, class Response>
167  public:
169  virtual void StartCall() = 0;
170  virtual void Write(const Request* req, grpc::WriteOptions options) = 0;
171  virtual void WritesDone() = 0;
172  virtual void Read(Response* resp) = 0;
173  virtual void AddHold(int holds) = 0;
174  virtual void RemoveHold() = 0;
175 
176  protected:
178  reactor->BindStream(this);
179  }
180 };
181 
182 template <class Response>
184  public:
186  virtual void StartCall() = 0;
187  virtual void Read(Response* resp) = 0;
188  virtual void AddHold(int holds) = 0;
189  virtual void RemoveHold() = 0;
190 
191  protected:
193  reactor->BindReader(this);
194  }
195 };
196 
197 template <class Request>
199  public:
201  virtual void StartCall() = 0;
202  void Write(const Request* req) { Write(req, grpc::WriteOptions()); }
203  virtual void Write(const Request* req, grpc::WriteOptions options) = 0;
204  void WriteLast(const Request* req, grpc::WriteOptions options) {
205  Write(req, options.set_last_message());
206  }
207  virtual void WritesDone() = 0;
208 
209  virtual void AddHold(int holds) = 0;
210  virtual void RemoveHold() = 0;
211 
212  protected:
214  reactor->BindWriter(this);
215  }
216 };
217 
219  public:
220  virtual ~ClientCallbackUnary() {}
221  virtual void StartCall() = 0;
222 
223  protected:
224  void BindReactor(ClientUnaryReactor* reactor);
225 };
226 
227 namespace experimental {
229  public:
231  virtual void StartCall() = 0;
232 
233  protected:
234  void BindReactor(ClientSessionReactor* reactor);
235 };
236 } // namespace experimental
237 
238 // The following classes are the reactor interfaces that are to be implemented
239 // by the user. They are passed in to the library as an argument to a call on a
240 // stub (either a codegen-ed call or a generic call). The streaming RPC is
241 // activated by calling StartCall, possibly after initiating StartRead,
242 // StartWrite, or AddHold operations on the streaming object. Note that none of
243 // the classes are pure; all reactions have a default empty reaction so that the
244 // user class only needs to override those reactions that it cares about.
245 // The reactor must be passed to the stub invocation before any of the below
246 // operations can be called and its reactions will be invoked by the library in
247 // response to the completion of various operations. Reactions must not include
248 // blocking operations (such as blocking I/O, starting synchronous RPCs, or
249 // waiting on condition variables). Reactions may be invoked concurrently,
250 // except that OnDone is called after all others (assuming proper API usage).
251 // The reactor may not be deleted until OnDone is called.
252 
254 template <class Request, class Response>
255 class ClientBidiReactor : public internal::ClientReactor {
256  public:
261  void StartCall() { stream_->StartCall(); }
262 
268  void StartRead(Response* resp) { stream_->Read(resp); }
269 
276  void StartWrite(const Request* req) { StartWrite(req, grpc::WriteOptions()); }
277 
284  void StartWrite(const Request* req, grpc::WriteOptions options) {
285  stream_->Write(req, options);
286  }
287 
297  void StartWriteLast(const Request* req, grpc::WriteOptions options) {
298  StartWrite(req, options.set_last_message());
299  }
300 
306  void StartWritesDone() { stream_->WritesDone(); }
307 
330  void AddHold() { AddMultipleHolds(1); }
331  void AddMultipleHolds(int holds) {
332  ABSL_DCHECK_GT(holds, 0);
333  stream_->AddHold(holds);
334  }
335  void RemoveHold() { stream_->RemoveHold(); }
336 
348  void OnDone(const grpc::Status& /*s*/) override {}
349 
357  virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
358 
363  virtual void OnReadDone(bool /*ok*/) {}
364 
370  virtual void OnWriteDone(bool /*ok*/) {}
371 
379  virtual void OnWritesDoneDone(bool /*ok*/) {}
380 
381  private:
382  friend class ClientCallbackReaderWriter<Request, Response>;
383  void BindStream(ClientCallbackReaderWriter<Request, Response>* stream) {
384  stream_ = stream;
385  }
387 };
388 
391 template <class Response>
392 class ClientReadReactor : public internal::ClientReactor {
393  public:
394  void StartCall() { reader_->StartCall(); }
395  void StartRead(Response* resp) { reader_->Read(resp); }
396 
397  void AddHold() { AddMultipleHolds(1); }
398  void AddMultipleHolds(int holds) {
399  ABSL_DCHECK_GT(holds, 0);
400  reader_->AddHold(holds);
401  }
402  void RemoveHold() { reader_->RemoveHold(); }
403 
404  void OnDone(const grpc::Status& /*s*/) override {}
405  virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
406  virtual void OnReadDone(bool /*ok*/) {}
407 
408  private:
409  friend class ClientCallbackReader<Response>;
410  void BindReader(ClientCallbackReader<Response>* reader) { reader_ = reader; }
412 };
413 
416 template <class Request>
417 class ClientWriteReactor : public internal::ClientReactor {
418  public:
419  void StartCall() { writer_->StartCall(); }
420  void StartWrite(const Request* req) { StartWrite(req, grpc::WriteOptions()); }
421  void StartWrite(const Request* req, grpc::WriteOptions options) {
422  writer_->Write(req, options);
423  }
424  void StartWriteLast(const Request* req, grpc::WriteOptions options) {
425  StartWrite(req, options.set_last_message());
426  }
427  void StartWritesDone() { writer_->WritesDone(); }
428 
429  void AddHold() { AddMultipleHolds(1); }
430  void AddMultipleHolds(int holds) {
431  ABSL_DCHECK_GT(holds, 0);
432  writer_->AddHold(holds);
433  }
434  void RemoveHold() { writer_->RemoveHold(); }
435 
436  void OnDone(const grpc::Status& /*s*/) override {}
437  virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
438  virtual void OnWriteDone(bool /*ok*/) {}
439  virtual void OnWritesDoneDone(bool /*ok*/) {}
440 
441  private:
442  friend class ClientCallbackWriter<Request>;
443  void BindWriter(ClientCallbackWriter<Request>* writer) { writer_ = writer; }
444 
446 };
447 
460  public:
461  void StartCall() { call_->StartCall(); }
462  void OnDone(const grpc::Status& /*s*/) override {}
463  virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
464 
465  private:
466  friend class ClientCallbackUnary;
467  void BindCall(ClientCallbackUnary* call) { call_ = call; }
468  ClientCallbackUnary* call_;
469 };
470 
471 // Define function out-of-line from class to avoid forward declaration issue
473  reactor->BindCall(this);
474 }
475 
476 namespace experimental {
481  public:
482  void StartCall() { call_->StartCall(); }
483  void OnDone(const grpc::Status& /*s*/) override {}
484  virtual void OnSessionReady(grpc::internal::Call call) = 0;
485  virtual void OnSessionAcknowledged(bool /*ok*/) {}
486 
487  private:
488  friend class ClientCallbackSession;
489  void BindCall(ClientCallbackSession* call) { call_ = call; }
490  ClientCallbackSession* call_;
491 };
492 
493 // Define function out-of-line from class to avoid forward declaration issue
495  reactor->BindCall(this);
496 }
497 } // namespace experimental
498 
499 namespace internal {
500 
501 // Forward declare factory classes for friendship
502 template <class Request, class Response>
503 class ClientCallbackReaderWriterFactory;
504 template <class Response>
505 class ClientCallbackReaderFactory;
506 template <class Request>
507 class ClientCallbackWriterFactory;
508 
509 template <class Request, class Response>
510 class ClientCallbackReaderWriterImpl
511  : public ClientCallbackReaderWriter<Request, Response> {
512  public:
513  // Sized operator delete. Objects of this class are always allocated against
     // a call arena, so no memory is freed here; the only work done is a check
     // that the size passed in equals sizeof(ClientCallbackReaderWriterImpl).
514  static void operator delete(void* /*ptr*/, std::size_t size) {
515  ABSL_CHECK_EQ(size, sizeof(ClientCallbackReaderWriterImpl));
516  }
517 
518  // This operator should never be called as the memory should be freed as part
519  // of the arena destruction. It only exists to provide a matching operator
520  // delete to the operator new so that some compilers will not complain (see
521  // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
522  // there are no tests catching the compiler warning.
     // The body is ABSL_CHECK(false), so reaching it is always a check failure.
523  static void operator delete(void*, void*) { ABSL_CHECK(false); }
524 
525  void StartCall() ABSL_LOCKS_EXCLUDED(start_mu_) override {
     // Starts the call: issues the start batch, then (under start_mu_) every
     // batch that Read/Write/WritesDone recorded in backlog_ before the call was
     // started, then the finish batch.
526  // This call initiates the following batches, each with a callback:
527  // 1. Send initial metadata (unless corked) + recv initial metadata
528  // 2. Any read backlog
529  // 3. Any write backlog, then any writes-done backlog
530  // 4. The finish batch (finish_ops_). NOTE(review): this comment used to say
     //    "unless corked", but the code below issues it unconditionally.
531  if (!start_corked_) {
532  start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
533  context_->initial_metadata_flags());
534  }
535 
536  start_ops_.FillOps(&call_);
537 
538  {
     // Read/Write/WritesDone take this same mutex before touching backlog_, so
     // an operation either lands in the backlog drained here or observes
     // started_ == true and issues its own batch.
539  grpc::internal::MutexLock lock(&start_mu_);
540 
541  if (backlog_.read_ops) {
542  read_ops_.FillOps(&call_);
543  }
544  if (backlog_.write_ops) {
545  write_ops_.FillOps(&call_);
546  }
547  if (backlog_.writes_done_ops) {
548  writes_done_ops_.FillOps(&call_);
549  }
550  finish_ops_.FillOps(&call_);
551  // The last thing in this critical section is to set started_ so that it
552  // can be used lock-free as well.
553  started_.store(true, std::memory_order_release);
554  }
555  // MaybeFinish outside the lock to make sure that destruction of this object
556  // doesn't take place while holding the lock (which would cause the lock to
557  // be released after destruction)
     // This releases the count that callbacks_outstanding_ pre-registers for
     // StartCall itself; it is not a reaction, hence from_reaction=false.
558  this->MaybeFinish(/*from_reaction=*/false);
559  }
560 
561  void Read(Response* msg) override {
     // Requests one message into *msg. If the call has not been started yet the
     // read is only recorded in backlog_ (StartCall issues it later); otherwise
     // the read batch is issued right away.
562  read_ops_.RecvMessage(msg);
     // One more outstanding callback; the read tag's callback calls MaybeFinish
     // to release it.
563  callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
     // Lock-free check first; only when the call looks unstarted is start_mu_
     // taken and started_ checked again before writing to backlog_.
564  if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
565  grpc::internal::MutexLock lock(&start_mu_);
566  if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
567  backlog_.read_ops = true;
568  return;
569  }
570  }
571  read_ops_.FillOps(&call_);
572  }
573 
574  void Write(const Request* msg, grpc::WriteOptions options)
575  ABSL_LOCKS_EXCLUDED(start_mu_) override {
     // Sends *msg with the given options. If the call has not been started yet
     // the write is only recorded in backlog_ (StartCall issues it later);
     // otherwise the write batch is issued right away.
     // A write flagged as the last message also gets the buffer hint set and
     // carries the client send-close in the same batch.
576  if (options.is_last_message()) {
577  options.set_buffer_hint();
578  write_ops_.ClientSendClose();
579  }
580 
581  // TODO(vjpai): don't assert
     // A non-OK status from SendMessagePtr currently fails this ABSL_CHECK.
582  ABSL_CHECK(
583  write_ops_.SendMessagePtr(msg, options, channel_->memory_allocator())
584  .ok());
     // One more outstanding callback; the write tag's callback calls
     // MaybeFinish to release it.
585  callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
     // When the call was started corked, the initial metadata was left out of
     // the start batch; attach it to the first Write/WritesDone batch instead.
586  if (GPR_UNLIKELY(corked_write_needed_)) {
587  write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
588  context_->initial_metadata_flags());
589  corked_write_needed_ = false;
590  }
591 
     // Lock-free check first; only when the call looks unstarted is start_mu_
     // taken and started_ checked again before writing to backlog_.
592  if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
593  grpc::internal::MutexLock lock(&start_mu_);
594  if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
595  backlog_.write_ops = true;
596  return;
597  }
598  }
599  write_ops_.FillOps(&call_);
600  }
601  void WritesDone() ABSL_LOCKS_EXCLUDED(start_mu_) override {
     // Sends the client send-close as its own batch. If the call has not been
     // started yet the batch is only recorded in backlog_ (StartCall issues it
     // later); otherwise it is issued right away.
602  writes_done_ops_.ClientSendClose();
     // Unlike the start/read/write tags, which the constructor sets up, this
     // tag is configured here. Its callback reports OnWritesDoneDone to the
     // reactor and then releases the outstanding count added below.
603  writes_done_tag_.Set(
604  call_.call(),
605  [this](bool ok) {
606  reactor_->OnWritesDoneDone(ok);
607  MaybeFinish(/*from_reaction=*/true);
608  },
609  &writes_done_ops_, /*can_inline=*/false);
610  writes_done_ops_.set_core_cq_tag(&writes_done_tag_);
611  callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
     // When the call was started corked and no Write has sent the initial
     // metadata yet, attach it to this batch.
612  if (GPR_UNLIKELY(corked_write_needed_)) {
613  writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
614  context_->initial_metadata_flags());
615  corked_write_needed_ = false;
616  }
     // Lock-free check first; only when the call looks unstarted is start_mu_
     // taken and started_ checked again before writing to backlog_.
617  if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
618  grpc::internal::MutexLock lock(&start_mu_);
619  if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
620  backlog_.writes_done_ops = true;
621  return;
622  }
623  }
624  writes_done_ops_.FillOps(&call_);
625  }
626 
627  void AddHold(int holds) override {
     // Adds `holds` to the outstanding-callback count. MaybeFinish only
     // finishes the call when that count drops to zero, so each hold must later
     // be matched by a RemoveHold.
628  callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
629  }
630  void RemoveHold() override { MaybeFinish(/*from_reaction=*/false); }
     // ^ Releases one hold: MaybeFinish decrements the outstanding-callback
     //   count by one. Called by the user rather than from a reaction, hence
     //   from_reaction=false.
631 
632  private:
633  friend class ClientCallbackReaderWriterFactory<Request, Response>;
634 
637  grpc::ClientContext* context,
639  : channel_(channel),
640  context_(context),
641  call_(call),
642  reactor_(reactor),
643  start_corked_(context_->initial_metadata_corked_),
644  corked_write_needed_(start_corked_) {
645  this->BindReactor(reactor);
646 
647  // Set up the unchanging parts of the start, read, and write tags and ops.
648  start_tag_.Set(
649  call_.call(),
650  [this](bool ok) {
651  reactor_->OnReadInitialMetadataDone(
652  ok && !reactor_->InternalTrailersOnly(call_.call()));
653  MaybeFinish(/*from_reaction=*/true);
654  },
655  &start_ops_, /*can_inline=*/false);
656  start_ops_.RecvInitialMetadata(context_);
657  start_ops_.set_core_cq_tag(&start_tag_);
658 
659  write_tag_.Set(
660  call_.call(),
661  [this](bool ok) {
662  reactor_->OnWriteDone(ok);
663  MaybeFinish(/*from_reaction=*/true);
664  },
665  &write_ops_, /*can_inline=*/false);
666  write_ops_.set_core_cq_tag(&write_tag_);
667 
668  read_tag_.Set(
669  call_.call(),
670  [this](bool ok) {
671  reactor_->OnReadDone(ok);
672  MaybeFinish(/*from_reaction=*/true);
673  },
674  &read_ops_, /*can_inline=*/false);
675  read_ops_.set_core_cq_tag(&read_tag_);
676 
677  // Also set up the Finish tag and op set.
678  finish_tag_.Set(
679  call_.call(),
680  [this](bool /*ok*/) { MaybeFinish(/*from_reaction=*/true); },
681  &finish_ops_,
682  /*can_inline=*/false);
683  finish_ops_.ClientRecvStatus(context_, &finish_status_);
684  finish_ops_.set_core_cq_tag(&finish_tag_);
685  }
686 
687  // MaybeFinish can be called from reactions or from user-initiated operations
688  // like StartCall or RemoveHold. If this is the last operation or hold on this
689  // object, it will invoke the OnDone reaction. If MaybeFinish was called from
690  // a reaction, it can call OnDone directly. If not, it would need to schedule
691  // OnDone onto an EventEngine thread to avoid the possibility of deadlocking
692  // with any locks in the user code that invoked it.
693  void MaybeFinish(bool from_reaction) {
694  if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
695  1, std::memory_order_acq_rel) == 1)) {
696  grpc::Status s = std::move(finish_status_);
697  auto* reactor = reactor_;
698  auto* call = call_.call();
699  this->~ClientCallbackReaderWriterImpl();
700  if (GPR_LIKELY(from_reaction)) {
701  grpc_call_unref(call);
702  reactor->OnDone(s);
703  } else {
705  call, [reactor, s = std::move(s)]() { reactor->OnDone(s); });
706  grpc_call_unref(call);
707  }
708  }
709  }
710 
711  grpc::ChannelInterface* const channel_;
712  grpc::ClientContext* const context_;
713  grpc::internal::Call call_;
714  ClientBidiReactor<Request, Response>* const reactor_;
715 
718  start_ops_;
720  const bool start_corked_;
721  bool corked_write_needed_; // no lock needed since only accessed in
722  // Write/WritesDone which cannot be concurrent
723 
726  grpc::Status finish_status_;
727 
731  write_ops_;
733 
736  writes_done_ops_;
738 
740  read_ops_;
742 
743  struct StartCallBacklog {
744  bool write_ops = false;
745  bool writes_done_ops = false;
746  bool read_ops = false;
747  };
748  StartCallBacklog backlog_ ABSL_GUARDED_BY(start_mu_);
749 
750  // Minimum of 3 callbacks to pre-register for start ops, StartCall, and finish
751  std::atomic<intptr_t> callbacks_outstanding_{3};
752  std::atomic_bool started_{false};
753  grpc::internal::Mutex start_mu_;
754 };
755 
756 template <class Request, class Response>
757 class ClientCallbackReaderWriterFactory {
758  public:
759  static void Create(grpc::ChannelInterface* channel,
760  const grpc::internal::RpcMethod& method,
761  grpc::ClientContext* context,
763  grpc::internal::Call call =
764  channel->CreateCall(method, context, channel->CallbackCQ());
765 
766  grpc_call_ref(call.call());
770  context, reactor);
771  }
772 };
773 
774 template <class Response>
775 class ClientCallbackReaderImpl : public ClientCallbackReader<Response> {
776  public:
777  // Sized operator delete. Objects of this class are always allocated against
     // a call arena, so no memory is freed here; the only work done is a check
     // that the size passed in equals sizeof(ClientCallbackReaderImpl).
778  static void operator delete(void* /*ptr*/, std::size_t size) {
779  ABSL_CHECK_EQ(size, sizeof(ClientCallbackReaderImpl));
780  }
781 
782  // This operator should never be called as the memory should be freed as part
783  // of the arena destruction. It only exists to provide a matching operator
784  // delete to the operator new so that some compilers will not complain (see
785  // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
786  // there are no tests catching the compiler warning.
     // The body is ABSL_CHECK(false), so reaching it is always a check failure.
787  static void operator delete(void*, void*) { ABSL_CHECK(false); }
788 
789  void StartCall() override {
     // Starts the call: configures and issues the start batch, configures the
     // read tag, issues a read recorded in backlog_ (if any), then configures
     // and issues the finish batch.
790  // This call initiates two batches, plus any backlog, each with a callback
791  // 1. Send initial metadata + recv initial metadata. NOTE(review): this
     //    comment used to say "unless corked", but no cork check is made here.
     //    The constructor has already attached the request message and the
     //    client send-close to start_ops_.
792  // 2. Any backlog
793  // 3. Recv trailing metadata
794 
     // The initial-metadata reaction is told ok only when the batch succeeded
     // and InternalTrailersOnly() is false for this call.
795  start_tag_.Set(
796  call_.call(),
797  [this](bool ok) {
798  reactor_->OnReadInitialMetadataDone(
799  ok && !reactor_->InternalTrailersOnly(call_.call()));
800  MaybeFinish(/*from_reaction=*/true);
801  },
802  &start_ops_, /*can_inline=*/false);
803  start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
804  context_->initial_metadata_flags());
805  start_ops_.RecvInitialMetadata(context_);
806  start_ops_.set_core_cq_tag(&start_tag_);
807  start_ops_.FillOps(&call_);
808 
809  // Also set up the read tag so it doesn't have to be set up each time
810  read_tag_.Set(
811  call_.call(),
812  [this](bool ok) {
813  reactor_->OnReadDone(ok);
814  MaybeFinish(/*from_reaction=*/true);
815  },
816  &read_ops_, /*can_inline=*/false);
817  read_ops_.set_core_cq_tag(&read_tag_);
818 
819  {
     // Read takes this same mutex before touching backlog_, so a read either
     // lands in the backlog drained here or observes started_ == true and
     // issues its own batch.
820  grpc::internal::MutexLock lock(&start_mu_);
821  if (backlog_.read_ops) {
822  read_ops_.FillOps(&call_);
823  }
824  started_.store(true, std::memory_order_release);
825  }
826 
     // Finish batch: receives the final status into finish_status_. Its
     // callback only calls MaybeFinish; OnDone is reached through MaybeFinish
     // once no callbacks or holds remain outstanding.
827  finish_tag_.Set(
828  call_.call(),
829  [this](bool /*ok*/) { MaybeFinish(/*from_reaction=*/true); },
830  &finish_ops_, /*can_inline=*/false);
831  finish_ops_.ClientRecvStatus(context_, &finish_status_);
832  finish_ops_.set_core_cq_tag(&finish_tag_);
833  finish_ops_.FillOps(&call_);
834  }
835 
836  void Read(Response* msg) override {
     // Requests one message into *msg. If the call has not been started yet the
     // read is only recorded in backlog_ (StartCall issues it later); otherwise
     // the read batch is issued right away.
837  read_ops_.RecvMessage(msg);
     // One more outstanding callback; the read tag's callback calls MaybeFinish
     // to release it.
838  callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
     // Lock-free check first; only when the call looks unstarted is start_mu_
     // taken and started_ checked again before writing to backlog_.
839  if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
840  grpc::internal::MutexLock lock(&start_mu_);
841  if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
842  backlog_.read_ops = true;
843  return;
844  }
845  }
846  read_ops_.FillOps(&call_);
847  }
848 
849  void AddHold(int holds) override {
     // Adds `holds` to the outstanding-callback count. MaybeFinish only
     // finishes the call when that count drops to zero, so each hold must later
     // be matched by a RemoveHold.
850  callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
851  }
852  void RemoveHold() override { MaybeFinish(/*from_reaction=*/false); }
     // ^ Releases one hold: MaybeFinish decrements the outstanding-callback
     //   count by one. Called by the user rather than from a reaction, hence
     //   from_reaction=false.
853 
854  private:
855  friend class ClientCallbackReaderFactory<Response>;
856 
857  template <class Request>
860  grpc::ClientContext* context, Request* request,
862  : context_(context), call_(call), reactor_(reactor) {
863  this->BindReactor(reactor);
864  // TODO(vjpai): don't assert
865  ABSL_CHECK(
866  start_ops_.SendMessagePtr(request, channel->memory_allocator()).ok());
867  start_ops_.ClientSendClose();
868  }
869 
870  // MaybeFinish behaves as in ClientCallbackReaderWriterImpl.
871  void MaybeFinish(bool from_reaction) {
872  if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
873  1, std::memory_order_acq_rel) == 1)) {
874  grpc::Status s = std::move(finish_status_);
875  auto* reactor = reactor_;
876  auto* call = call_.call();
877  this->~ClientCallbackReaderImpl();
878  if (GPR_LIKELY(from_reaction)) {
879  grpc_call_unref(call);
880  reactor->OnDone(s);
881  } else {
883  call, [reactor, s = std::move(s)]() { reactor->OnDone(s); });
884  grpc_call_unref(call);
885  }
886  }
887  }
888 
889  grpc::ClientContext* const context_;
890  grpc::internal::Call call_;
891  ClientReadReactor<Response>* const reactor_;
892 
897  start_ops_;
899 
902  grpc::Status finish_status_;
903 
905  read_ops_;
907 
908  struct StartCallBacklog {
909  bool read_ops = false;
910  };
911  StartCallBacklog backlog_ ABSL_GUARDED_BY(start_mu_);
912 
913  // Minimum of 2 callbacks to pre-register for start and finish
914  std::atomic<intptr_t> callbacks_outstanding_{2};
915  std::atomic_bool started_{false};
916  grpc::internal::Mutex start_mu_;
917 };
918 
919 template <class Response>
920 class ClientCallbackReaderFactory {
921  public:
922  template <class Request>
923  static void Create(grpc::ChannelInterface* channel,
924  const grpc::internal::RpcMethod& method,
925  grpc::ClientContext* context, const Request* request,
926  ClientReadReactor<Response>* reactor) {
927  grpc::internal::Call call =
928  channel->CreateCall(method, context, channel->CallbackCQ());
929 
930  grpc_call_ref(call.call());
931  new (grpc_call_arena_alloc(call.call(),
933  ClientCallbackReaderImpl<Response>(channel, call, context, request,
934  reactor);
935  }
936 };
937 
938 template <class Request>
939 class ClientCallbackWriterImpl : public ClientCallbackWriter<Request> {
940  public:
941  // Sized operator delete. Objects of this class are always allocated against
     // a call arena, so no memory is freed here; the only work done is a check
     // that the size passed in equals sizeof(ClientCallbackWriterImpl).
942  static void operator delete(void* /*ptr*/, std::size_t size) {
943  ABSL_CHECK_EQ(size, sizeof(ClientCallbackWriterImpl));
944  }
945 
946  // This operator should never be called as the memory should be freed as part
947  // of the arena destruction. It only exists to provide a matching operator
948  // delete to the operator new so that some compilers will not complain (see
949  // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
950  // there are no tests catching the compiler warning.
     // The body is ABSL_CHECK(false), so reaching it is always a check failure.
951  static void operator delete(void*, void*) { ABSL_CHECK(false); }
952 
953  void StartCall() ABSL_LOCKS_EXCLUDED(start_mu_) override {
     // Starts the call: issues the start batch, then (under start_mu_) every
     // batch that Write/WritesDone recorded in backlog_ before the call was
     // started, then the finish batch.
954  // This call initiates two batches, plus any backlog, each with a callback
955  // 1. Send initial metadata (unless corked) + recv initial metadata
956  // 2. Any backlog (a write first, then a writes-done)
957  // 3. Recv trailing metadata
958 
959  if (!start_corked_) {
960  start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
961  context_->initial_metadata_flags());
962  }
963  start_ops_.FillOps(&call_);
964 
965  {
     // Write/WritesDone take this same mutex before touching backlog_, so an
     // operation either lands in the backlog drained here or observes
     // started_ == true and issues its own batch.
966  grpc::internal::MutexLock lock(&start_mu_);
967 
968  if (backlog_.write_ops) {
969  write_ops_.FillOps(&call_);
970  }
971  if (backlog_.writes_done_ops) {
972  writes_done_ops_.FillOps(&call_);
973  }
974  finish_ops_.FillOps(&call_);
975  // The last thing in this critical section is to set started_ so that it
976  // can be used lock-free as well.
977  started_.store(true, std::memory_order_release);
978  }
979  // MaybeFinish outside the lock to make sure that destruction of this object
980  // doesn't take place while holding the lock (which would cause the lock to
981  // be released after destruction)
     // This releases the count that callbacks_outstanding_ pre-registers for
     // StartCall itself; it is not a reaction, hence from_reaction=false.
982  this->MaybeFinish(/*from_reaction=*/false);
983  }
984 
985  void Write(const Request* msg, grpc::WriteOptions options)
986  ABSL_LOCKS_EXCLUDED(start_mu_) override {
     // Sends *msg with the given options. If the call has not been started yet
     // the write is only recorded in backlog_ (StartCall issues it later);
     // otherwise the write batch is issued right away.
     // A write flagged as the last message also gets the buffer hint set and
     // carries the client send-close in the same batch.
987  if (GPR_UNLIKELY(options.is_last_message())) {
988  options.set_buffer_hint();
989  write_ops_.ClientSendClose();
990  }
991 
992  // TODO(vjpai): don't assert
     // A non-OK status from SendMessagePtr currently fails this ABSL_CHECK.
993  ABSL_CHECK(
994  write_ops_.SendMessagePtr(msg, options, channel_->memory_allocator())
995  .ok());
     // One more outstanding callback; the write tag's callback calls
     // MaybeFinish to release it.
996  callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
997 
     // When the call was started corked, the initial metadata was left out of
     // the start batch; attach it to the first Write/WritesDone batch instead.
998  if (GPR_UNLIKELY(corked_write_needed_)) {
999  write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
1000  context_->initial_metadata_flags());
1001  corked_write_needed_ = false;
1002  }
1003 
      // Lock-free check first; only when the call looks unstarted is start_mu_
      // taken and started_ checked again before writing to backlog_.
1004  if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
1005  grpc::internal::MutexLock lock(&start_mu_);
1006  if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
1007  backlog_.write_ops = true;
1008  return;
1009  }
1010  }
1011  write_ops_.FillOps(&call_);
1012  }
1013 
1014  void WritesDone() ABSL_LOCKS_EXCLUDED(start_mu_) override {
      // Sends the client send-close as its own batch. If the call has not been
      // started yet the batch is only recorded in backlog_ (StartCall issues it
      // later); otherwise it is issued right away.
1015  writes_done_ops_.ClientSendClose();
      // Unlike the start/write tags, which the constructor sets up, this tag is
      // configured here. Its callback reports OnWritesDoneDone to the reactor
      // and then releases the outstanding count added below.
1016  writes_done_tag_.Set(
1017  call_.call(),
1018  [this](bool ok) {
1019  reactor_->OnWritesDoneDone(ok);
1020  MaybeFinish(/*from_reaction=*/true);
1021  },
1022  &writes_done_ops_, /*can_inline=*/false);
1023  writes_done_ops_.set_core_cq_tag(&writes_done_tag_);
1024  callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
1025 
      // When the call was started corked and no Write has sent the initial
      // metadata yet, attach it to this batch.
1026  if (GPR_UNLIKELY(corked_write_needed_)) {
1027  writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
1028  context_->initial_metadata_flags());
1029  corked_write_needed_ = false;
1030  }
1031 
      // Lock-free check first; only when the call looks unstarted is start_mu_
      // taken and started_ checked again before writing to backlog_.
1032  if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
1033  grpc::internal::MutexLock lock(&start_mu_);
1034  if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
1035  backlog_.writes_done_ops = true;
1036  return;
1037  }
1038  }
1039  writes_done_ops_.FillOps(&call_);
1040  }
1041 
1042  void AddHold(int holds) override {
      // Adds `holds` to the outstanding-callback count. MaybeFinish only
      // finishes the call when that count drops to zero, so each hold must
      // later be matched by a RemoveHold.
1043  callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
1044  }
1045  void RemoveHold() override { MaybeFinish(/*from_reaction=*/false); }
      // ^ Releases one hold: MaybeFinish decrements the outstanding-callback
      //   count by one. Called by the user rather than from a reaction, hence
      //   from_reaction=false.
1046 
1047  private:
1048  friend class ClientCallbackWriterFactory<Request>;
1049 
1050  template <class Response>
1052  grpc::internal::Call call,
1053  grpc::ClientContext* context, Response* response,
1054  ClientWriteReactor<Request>* reactor)
1055  : channel_(channel),
1056  context_(context),
1057  call_(call),
1058  reactor_(reactor),
1059  start_corked_(context_->initial_metadata_corked_),
1060  corked_write_needed_(start_corked_) {
1061  this->BindReactor(reactor);
1062 
1063  // Set up the unchanging parts of the start and write tags and ops.
1064  start_tag_.Set(
1065  call_.call(),
1066  [this](bool ok) {
1067  reactor_->OnReadInitialMetadataDone(
1068  ok && !reactor_->InternalTrailersOnly(call_.call()));
1069  MaybeFinish(/*from_reaction=*/true);
1070  },
1071  &start_ops_, /*can_inline=*/false);
1072  start_ops_.RecvInitialMetadata(context_);
1073  start_ops_.set_core_cq_tag(&start_tag_);
1074 
1075  write_tag_.Set(
1076  call_.call(),
1077  [this](bool ok) {
1078  reactor_->OnWriteDone(ok);
1079  MaybeFinish(/*from_reaction=*/true);
1080  },
1081  &write_ops_, /*can_inline=*/false);
1082  write_ops_.set_core_cq_tag(&write_tag_);
1083 
1084  // Also set up the Finish tag and op set.
1085  finish_ops_.RecvMessage(response);
1086  finish_ops_.AllowNoMessage();
1087  finish_tag_.Set(
1088  call_.call(),
1089  [this](bool /*ok*/) { MaybeFinish(/*from_reaction=*/true); },
1090  &finish_ops_,
1091  /*can_inline=*/false);
1092  finish_ops_.ClientRecvStatus(context_, &finish_status_);
1093  finish_ops_.set_core_cq_tag(&finish_tag_);
1094  }
1095 
1096  // MaybeFinish behaves as in ClientCallbackReaderWriterImpl.
1097  void MaybeFinish(bool from_reaction) {
1098  if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
1099  1, std::memory_order_acq_rel) == 1)) {
1100  grpc::Status s = std::move(finish_status_);
1101  auto* reactor = reactor_;
1102  auto* call = call_.call();
1103  this->~ClientCallbackWriterImpl();
1104  if (GPR_LIKELY(from_reaction)) {
1105  grpc_call_unref(call);
1106  reactor->OnDone(s);
1107  } else {
1109  call, [reactor, s = std::move(s)]() { reactor->OnDone(s); });
1110  grpc_call_unref(call);
1111  }
1112  }
1113  }
1114 
1115  grpc::ChannelInterface* const channel_;
1116  grpc::ClientContext* const context_;
1117  grpc::internal::Call call_;
1118  ClientWriteReactor<Request>* const reactor_;
1119 
1122  start_ops_;
1124  const bool start_corked_;
1125  bool corked_write_needed_; // no lock needed since only accessed in
1126  // Write/WritesDone which cannot be concurrent
1127 
1130  finish_ops_;
1132  grpc::Status finish_status_;
1133 
1137  write_ops_;
1139 
1142  writes_done_ops_;
1143  grpc::internal::CallbackWithSuccessTag writes_done_tag_;
1144 
1145  struct StartCallBacklog {
      // One flag per op set (write / writes-done); both start out false.
      // NOTE(review): presumably set when the op is requested before StartCall
      // so it can be issued later -- the code that uses these flags is outside
      // this excerpt, confirm there.
1146  bool write_ops = false;
1147  bool writes_done_ops = false;
1148  };
      // Access to the backlog is annotated as guarded by start_mu_.
1149  StartCallBacklog backlog_ ABSL_GUARDED_BY(start_mu_);
1150 
1151  // Minimum of 3 callbacks to pre-register for start ops, StartCall, and finish
1152  std::atomic<intptr_t> callbacks_outstanding_{3};
1153  std::atomic_bool started_{false};
1154  grpc::internal::Mutex start_mu_;
1155 };
1156 
1157 template <class Request>
1158 class ClientCallbackWriterFactory {
1159  public:
      // Creates the call on the channel's callback completion queue, takes an
      // extra reference on it, and constructs a ClientCallbackWriterImpl in
      // memory obtained from the call arena. Nothing is returned; the new
      // object is handed the reactor in its constructor.
1160  template <class Response>
1161  static void Create(grpc::ChannelInterface* channel,
1162  const grpc::internal::RpcMethod& method,
1163  grpc::ClientContext* context, Response* response,
1164  ClientWriteReactor<Request>* reactor) {
1165  grpc::internal::Call call =
1166  channel->CreateCall(method, context, channel->CallbackCQ());
1167
      // ClientCallbackWriterImpl::MaybeFinish calls grpc_call_unref once the
      // last callback completes; presumably that releases this reference.
1168  grpc_call_ref(call.call());
      // Placement-new into arena storage sized for the implementation object.
1169  new (grpc_call_arena_alloc(call.call(),
1170  sizeof(ClientCallbackWriterImpl<Request>)))
1171  ClientCallbackWriterImpl<Request>(channel, call, context, response,
1172  reactor);
1173  }
1174 };
1175 
1177  public:
1178  // Sized operator delete. Objects of this class are always allocated
      // against a call arena, so no memory is freed here; the body only checks
      // that the size being deleted matches this class.
1179  static void operator delete(void* /*ptr*/, std::size_t size) {
1180  ABSL_CHECK_EQ(size, sizeof(ClientCallbackUnaryImpl));
1181  }
1182 
1183  // Placement operator delete. This should never be called, because the
1184  // memory is freed as part of the arena destruction. It exists only to give
1185  // the placement operator new a matching operator delete so that some
1186  // compilers do not complain (see https://github.com/grpc/grpc/issues/11301).
1187  // Note: at the time of adding this, no tests catch the compiler warning.
1188  static void operator delete(void*, void*) { ABSL_CHECK(false); }
1189 
1190  void StartCall() override {
1191  // This call initiates two batches, each with its own callback tag:
1192  // 1. start_ops_: send initial metadata + write + writes done + recv initial metadata
1193  // 2. finish_ops_: read message, recv trailing metadata (status)
      // The request message, send-close and response receive were already
      // attached to these op sets by the constructor.
1194
1195  start_tag_.Set(
1196  call_.call(),
1197  [this](bool ok) {
      // Report initial metadata as read only if the batch succeeded and the
      // reactor does not report the call as trailers-only.
1198  reactor_->OnReadInitialMetadataDone(
1199  ok && !reactor_->InternalTrailersOnly(call_.call()));
1200  MaybeFinish();
1201  },
1202  &start_ops_, /*can_inline=*/false);
1203  start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
1204  context_->initial_metadata_flags());
1205  start_ops_.RecvInitialMetadata(context_);
1206  start_ops_.set_core_cq_tag(&start_tag_);
1207  start_ops_.FillOps(&call_);
1208
      // Second batch: its completion only counts towards MaybeFinish; the
      // status itself is stored in finish_status_.
1209  finish_tag_.Set(
1210  call_.call(), [this](bool /*ok*/) { MaybeFinish(); }, &finish_ops_,
1211  /*can_inline=*/false);
1212  finish_ops_.ClientRecvStatus(context_, &finish_status_);
1213  finish_ops_.set_core_cq_tag(&finish_tag_);
1214  finish_ops_.FillOps(&call_);
1215  }
1216 
1217  private:
1219 
1220  template <class Request, class Response>
      // NOTE(review): listing line 1221 is absent from this extract; presumably
      // it carries the constructor name and the leading `channel` parameter that
      // the body dereferences below -- confirm against the real header.
      //
      // Stores the context, call and reactor, binds the reactor to this object,
      // and prepares both op sets: the request and send-close go on start_ops_,
      // the response receive goes on finish_ops_.
1222  grpc::internal::Call call,
1223  grpc::ClientContext* context, Request* request,
1224  Response* response, ClientUnaryReactor* reactor)
1225  : context_(context), call_(call), reactor_(reactor) {
1226  this->BindReactor(reactor);
1227
1228  // TODO(vjpai): don't assert
      // A non-ok result from SendMessagePtr currently fails the ABSL_CHECK.
1229  ABSL_CHECK(
1230  start_ops_.SendMessagePtr(request, channel->memory_allocator()).ok());
1231  start_ops_.ClientSendClose();
1232  finish_ops_.RecvMessage(response);
      // The finish batch is allowed to complete without a response message.
1233  finish_ops_.AllowNoMessage();
1234  }
1235 
1236  // In the unary case, MaybeFinish is only ever invoked from a
1237  // library-initiated reaction, so it calls OnDone directly when this is the
1238  // last reaction for this RPC (i.e. the outstanding-callback count hits 0).
1239  void MaybeFinish() {
1240  if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
1241  1, std::memory_order_acq_rel) == 1)) {
      // Copy out the status, reactor and call first: members must not be
      // touched after the explicit destructor call below.
1242  grpc::Status s = std::move(finish_status_);
1243  auto* reactor = reactor_;
1244  auto* call = call_.call();
1245  this->~ClientCallbackUnaryImpl();
1246  grpc_call_unref(call);
1247  reactor->OnDone(s);
1248  }
1249  }
1250 
1251  grpc::ClientContext* const context_;
1252  grpc::internal::Call call_;
1253  ClientUnaryReactor* const reactor_;
1254 
1259  start_ops_;
1261 
1264  finish_ops_;
1266  grpc::Status finish_status_;
1267 
1268  // This call will have 2 callbacks: start and finish
1269  std::atomic<intptr_t> callbacks_outstanding_{2};
1270 };
1271 
1273  public:
1274  template <class Request, class Response, class BaseRequest = Request,
1275  class BaseResponse = Response>
      // Creates the call on the channel's callback completion queue, takes an
      // extra reference on it, and constructs a ClientCallbackUnaryImpl in
      // memory obtained from the call arena. The request and response pointers
      // are converted to BaseRequest / BaseResponse before being passed on.
1276  static void Create(grpc::ChannelInterface* channel,
1277  const grpc::internal::RpcMethod& method,
1278  grpc::ClientContext* context, const Request* request,
1279  Response* response, ClientUnaryReactor* reactor) {
1280  grpc::internal::Call call =
1281  channel->CreateCall(method, context, channel->CallbackCQ());
1282
      // ClientCallbackUnaryImpl::MaybeFinish calls grpc_call_unref when the
      // RPC is done; presumably that releases this reference.
1283  grpc_call_ref(call.call());
1284
      // Placement-new into arena storage; the result is intentionally not kept.
1285  new (grpc_call_arena_alloc(call.call(), sizeof(ClientCallbackUnaryImpl)))
1286  ClientCallbackUnaryImpl(channel, call, context,
1287  static_cast<const BaseRequest*>(request),
1288  static_cast<BaseResponse*>(response), reactor);
1289  }
1290 };
1291 
1292 } // namespace internal
1293 
1294 namespace experimental {
1295 namespace internal {
1297  public:
1298  // Sized operator delete. Objects of this class are always allocated
      // against a call arena, so no memory is freed here; the body only checks
      // that the size being deleted matches this class.
1299  static void operator delete(void* /*ptr*/, std::size_t size) {
1300  ABSL_CHECK_EQ(size, sizeof(ClientCallbackSessionImpl));
1301  }
1302 
1303  // Placement operator delete. This should never be called, because the
1304  // memory is freed as part of the arena destruction. It exists only to give
1305  // the placement operator new a matching operator delete so that some
1306  // compilers do not complain (see https://github.com/grpc/grpc/issues/11301).
1307  // Note: at the time of adding this, no tests catch the compiler warning.
1308  static void operator delete(void*, void*) { ABSL_CHECK(false); }
1309 
1310  void StartCall() override {
      // Starts three independent batches, each with its own tag and completion
      // handler: send (OnSendDone), receive initial metadata
      // (OnRecvInitialMetadataDone) and receive status (OnFinishDone).
1311  send_tag_.Set(
1312  call_.call(), [this](bool ok) { OnSendDone(ok); }, &send_ops_,
1313  /*can_inline=*/false);
1314
1315  send_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
1316  context_->initial_metadata_flags());
1317  send_ops_.set_core_cq_tag(&send_tag_);
1318  send_ops_.FillOps(&call_);
1319
1320  // Also start the receive-initial-metadata op
1321  meta_tag_.Set(
1322  call_.call(), [this](bool ok) { OnRecvInitialMetadataDone(ok); },
1323  &meta_ops_,
1324  /*can_inline=*/false);
1325  meta_ops_.RecvInitialMetadata(context_);
1326  meta_ops_.set_core_cq_tag(&meta_tag_);
1327  meta_ops_.FillOps(&call_);
      // ... and the receive-status op, which stores the result in status_.
1328  finish_tag_.Set(
1329  call_.call(), [this](bool ok) { OnFinishDone(ok); }, &finish_ops_,
1330  /*can_inline=*/false);
1331  finish_ops_.ClientRecvStatus(context_, &status_);
1332  finish_ops_.set_core_cq_tag(&finish_tag_);
1333  finish_ops_.FillOps(&call_);
1334  }
1335 
1336  private:
1338 
1339  template <class Request>
      // NOTE(review): listing lines 1340 and 1344 are absent from this extract;
      // presumably they carry the constructor name with its leading `channel`
      // parameter and the trailing `reactor` parameter, both of which the body
      // uses -- confirm against the real header.
      //
      // Stores the context, call and reactor, binds the reactor to this object,
      // and attaches the request to send_ops_.
1341  grpc::internal::Call call,
1342  grpc::ClientContext* context,
1343  const Request* request,
1345  : context_(context), call_(call), reactor_(reactor) {
1346  this->BindReactor(reactor);
      // A non-ok result from SendMessagePtr fails the ABSL_CHECK.
1347  ABSL_CHECK(
1348  send_ops_.SendMessagePtr(request, channel->memory_allocator()).ok());
1349  }
1350 
1351  void OnSendDone(bool ok) {  // completion handler for the send batch
      // The reactor is told the session is ready only when the send succeeded;
      // the outstanding-op count is dropped either way.
1352  if (ok) {
1353  reactor_->OnSessionReady(call_);
1354  }
1355  MaybeFinish();
1356  }
1357 
1358  void OnRecvInitialMetadataDone(bool ok) {  // handler for the metadata batch
      // Acknowledge with true only if the batch succeeded and the reactor does
      // not report the call as trailers-only; then drop one outstanding op.
1359  reactor_->OnSessionAcknowledged(
1360  ok && !reactor_->InternalTrailersOnly(call_.call()));
1361  MaybeFinish();
1362  }
1363 
1364  void OnFinishDone(bool /*ok*/) { MaybeFinish(); }  // status batch done; `ok` is ignored
1365 
1366  void MaybeFinish() {
      // Drops one outstanding op. When the count reaches zero this object is
      // destroyed and the final status is delivered to reactor->OnDone() via
      // grpc_call_run_in_event_engine rather than inline.
1367  if (--ops_outstanding_ == 0) {
      // Copy out everything needed after the destructor runs; members must
      // not be touched once this object has been destroyed.
1368  grpc::Status s = std::move(status_);
1369  auto* reactor = reactor_;
1370  auto* call = call_.call();
1371  this->~ClientCallbackSessionImpl();
      // Move the status into the closure instead of copying it; the closure
      // is scheduled through the call before the reference to it is released.
1372  grpc_call_run_in_event_engine(call,
1373  [reactor, s = std::move(s)]() { reactor->OnDone(s); });
1374  grpc_call_unref(call);
1375  }
1376  }
1377 
1378  grpc::ClientContext* context_;
1379  grpc::internal::Call call_;
1383  send_ops_;
1386  meta_ops_;
1390  grpc::Status status_;
1391  std::atomic<int> ops_outstanding_{3};
1392 };
1393 
1394 // ClientCallbackSessionFactory is an experimental API for creating a
1395 // ClientCallbackSession. It is not part of the public API and may be removed or
1396 // changed without notice.
1398  private:
1399  template <class Request, class BaseRequest = Request>
      // Creates the call on the channel's callback completion queue, takes an
      // extra reference on it, and constructs a ClientCallbackSessionImpl for
      // it, passing the request converted to BaseRequest.
      // NOTE(review): listing lines 1403 and 1409 are absent from this extract;
      // presumably they carry the trailing `reactor` parameter and the
      // placement-new allocation expression -- confirm against the real header.
1400  static void Create(grpc::ChannelInterface* channel,
1401  const grpc::internal::RpcMethod& method,
1402  grpc::ClientContext* context, const Request* request,
1404  grpc::internal::Call call =
1405  channel->CreateCall(method, context, channel->CallbackCQ());
1406
      // ClientCallbackSessionImpl::MaybeFinish calls grpc_call_unref when the
      // last op completes; presumably that releases this reference.
1407  grpc_call_ref(call.call());
1408
1410  ClientCallbackSessionImpl(channel, call, context,
1411  static_cast<const BaseRequest*>(request),
1412  reactor);
1413  }
1414 
1415  template <class RequestType, class ResponseType>
1417 };
1418 } // namespace internal
1419 } // namespace experimental
1420 } // namespace grpc
1421 
1422 #endif // GRPCPP_SUPPORT_CLIENT_CALLBACK_H
grpc::internal::ClientCallbackUnaryImpl
Definition: client_callback.h:1176
grpc::internal::CallbackWithSuccessTag
CallbackWithSuccessTag can be reused multiple times, and will be used in this fashion for streaming o...
Definition: callback_common.h:153
grpc::ClientReadReactor::OnReadInitialMetadataDone
virtual void OnReadInitialMetadataDone(bool)
Definition: client_callback.h:405
grpc::ClientWriteReactor::OnReadInitialMetadataDone
virtual void OnReadInitialMetadataDone(bool)
Definition: client_callback.h:437
grpc::ClientWriteReactor::StartWritesDone
void StartWritesDone()
Definition: client_callback.h:427
grpc::internal::CallOpRecvInitialMetadata
Definition: call_op_set.h:723
grpc::ClientCallbackWriter::~ClientCallbackWriter
virtual ~ClientCallbackWriter()
Definition: client_callback.h:200
grpc::internal::ClientCallbackReaderWriterImpl::Read
void Read(Response *msg) override
Definition: client_callback.h:561
grpc::internal::ClientCallbackWriterImpl::AddHold
void AddHold(int holds) override
Definition: client_callback.h:1042
grpc::internal::ClientCallbackReaderWriterImpl::StartCall
void StartCall() ABSL_LOCKS_EXCLUDED(start_mu_) override
Definition: client_callback.h:525
grpc::internal::CallbackUnaryCallImpl::CallbackUnaryCallImpl
CallbackUnaryCallImpl(grpc::ChannelInterface *channel, const grpc::internal::RpcMethod &method, grpc::ClientContext *context, const InputMessage *request, OutputMessage *result, std::function< void(grpc::Status)> &&on_completion)
Definition: client_callback.h:73
grpc::internal::CallOpClientSendClose
Definition: call_op_set.h:621
grpc::ClientBidiReactor::StartWrite
void StartWrite(const Request *req)
Initiate a write operation (or post it for later initiation if StartCall has not yet been invoked).
Definition: client_callback.h:276
grpc_call_arena_alloc
GRPCAPI void * grpc_call_arena_alloc(grpc_call *call, size_t size)
Allocate memory in the grpc_call arena: this memory is automatically discarded at call completion.
grpc::internal::CallOpGenericRecvMessage
Definition: call_op_set.h:528
grpc::ClientBidiReactor::AddHold
void AddHold()
Holds are needed if (and only if) this stream has operations that take place on it after StartCall bu...
Definition: client_callback.h:330
grpc::internal::ClientCallbackReaderWriterImpl
Definition: client_context.h:72
grpc::ClientReadReactor::OnReadDone
virtual void OnReadDone(bool)
Definition: client_callback.h:406
grpc
An Alarm posts the user-provided tag to its associated completion queue or invokes the user-provided ...
Definition: alarm.h:33
grpc::internal::CallOpSet
Primary implementation of CallOpSetInterface.
Definition: completion_queue.h:97
grpc::internal::ClientCallbackReaderWriterImpl::Write
void Write(const Request *msg, grpc::WriteOptions options) ABSL_LOCKS_EXCLUDED(start_mu_) override
Definition: client_callback.h:574
grpc::internal::ClientCallbackReaderImpl::RemoveHold
void RemoveHold() override
Definition: client_callback.h:852
grpc::internal::CallOpSendMessage
Definition: call_op_set.h:287
grpc::WriteOptions::set_last_message
WriteOptions & set_last_message()
last-message bit: indicates this is the last message in a stream client-side: makes Write the equival...
Definition: call_op_set.h:156
grpc::ClientUnaryReactor
ClientUnaryReactor is a reactor-style interface for a unary RPC.
Definition: client_callback.h:459
grpc::ClientCallbackReaderWriter::Read
virtual void Read(Response *resp)=0
grpc::internal::ClientCallbackReaderWriterFactory::Create
static void Create(grpc::ChannelInterface *channel, const grpc::internal::RpcMethod &method, grpc::ClientContext *context, ClientBidiReactor< Request, Response > *reactor)
Definition: client_callback.h:759
grpc::ClientWriteReactor::StartWrite
void StartWrite(const Request *req, grpc::WriteOptions options)
Definition: client_callback.h:421
grpc::internal::CallOpSet::FillOps
void FillOps(Call *call) override
Definition: call_op_set.h:895
grpc::ClientWriteReactor::StartCall
void StartCall()
Definition: client_callback.h:419
grpc::ClientWriteReactor::OnDone
void OnDone(const grpc::Status &) override
Definition: client_callback.h:436
grpc::internal::ClientReactor::OnDone
virtual void OnDone(const grpc::Status &)=0
Called by the library when all operations associated with this RPC have completed and all Holds have ...
grpc::internal::ClientCallbackReaderWriterImpl::RemoveHold
void RemoveHold() override
Definition: client_callback.h:630
grpc::ClientReadReactor::OnDone
void OnDone(const grpc::Status &) override
Definition: client_callback.h:404
grpc::experimental::ClientCallbackSession
Definition: client_callback.h:228
grpc::internal::Call
Straightforward wrapping of the C call object.
Definition: call.h:34
grpc::ClientCallbackReaderWriter::WritesDone
virtual void WritesDone()=0
grpc::internal::ClientCallbackWriterImpl::RemoveHold
void RemoveHold() override
Definition: client_callback.h:1045
status.h
grpc::internal::CallOpSendInitialMetadata
Definition: call_op_set.h:217
grpc::experimental::ClientCallbackSession::StartCall
virtual void StartCall()=0
grpc::internal::ClientCallbackWriterFactory::Create
static void Create(grpc::ChannelInterface *channel, const grpc::internal::RpcMethod &method, grpc::ClientContext *context, Response *response, ClientWriteReactor< Request > *reactor)
Definition: client_callback.h:1161
grpc::ClientBidiReactor::OnReadInitialMetadataDone
virtual void OnReadInitialMetadataDone(bool)
Notifies the application that a read of initial metadata from the server is done.
Definition: client_callback.h:357
grpc::internal::CallbackWithStatusTag
Definition: callback_common.h:74
grpc::ClientCallbackReaderWriter::BindReactor
void BindReactor(ClientBidiReactor< Request, Response > *reactor)
Definition: client_callback.h:177
grpc::internal::CallbackUnaryCall
void CallbackUnaryCall(grpc::ChannelInterface *channel, const grpc::internal::RpcMethod &method, grpc::ClientContext *context, const InputMessage *request, OutputMessage *result, std::function< void(grpc::Status)> &&on_completion)
Perform a callback-based unary call.
Definition: client_callback.h:57
grpc::Status::ok
bool ok() const
Is the status OK?
Definition: status.h:124
grpc::ClientCallbackUnary::BindReactor
void BindReactor(ClientUnaryReactor *reactor)
Definition: client_callback.h:472
grpc_call_ref
GRPCAPI void grpc_call_ref(grpc_call *call)
Ref a call.
grpc::ClientBidiReactor::StartRead
void StartRead(Response *resp)
Initiate a read operation (or post it for later initiation if StartCall has not yet been invoked).
Definition: client_callback.h:268
grpc::ClientReadReactor::AddMultipleHolds
void AddMultipleHolds(int holds)
Definition: client_callback.h:398
grpc::internal::ClientCallbackWriterImpl::StartCall
void StartCall() ABSL_LOCKS_EXCLUDED(start_mu_) override
Definition: client_callback.h:953
grpc::internal::ClientCallbackWriterImpl::Write
void Write(const Request *msg, grpc::WriteOptions options) ABSL_LOCKS_EXCLUDED(start_mu_) override
Definition: client_callback.h:985
grpc::ClientBidiReactor::StartWrite
void StartWrite(const Request *req, grpc::WriteOptions options)
Initiate/post a write operation with specified options.
Definition: client_callback.h:284
grpc::ClientBidiReactor::OnDone
void OnDone(const grpc::Status &) override
Notifies the application that all operations associated with this RPC have completed and all Holds ha...
Definition: client_callback.h:348
grpc::experimental::ClientSessionReactor::StartCall
void StartCall()
Definition: client_callback.h:482
grpc::internal::ClientReactor::~ClientReactor
virtual ~ClientReactor()=default
grpc::ClientCallbackUnary::StartCall
virtual void StartCall()=0
grpc::ClientCallbackReaderWriter::AddHold
virtual void AddHold(int holds)=0
grpc::ClientBidiReactor::OnReadDone
virtual void OnReadDone(bool)
Notifies the application that a StartRead operation completed.
Definition: client_callback.h:363
grpc::Status
Did it work? If it didn't, why?
Definition: status.h:34
GPR_UNLIKELY
#define GPR_UNLIKELY(x)
Definition: port_platform.h:861
grpc::ClientCallbackWriter::WritesDone
virtual void WritesDone()=0
grpc_call_unref
GRPCAPI void grpc_call_unref(grpc_call *call)
Unref a call.
grpc::internal::ClientCallbackWriterFactory
Definition: channel_interface.h:52
grpc::ClientReadReactor::StartCall
void StartCall()
Definition: client_callback.h:394
callback_common.h
grpc::ClientBidiReactor::OnWriteDone
virtual void OnWriteDone(bool)
Notifies the application that a StartWrite or StartWriteLast operation completed.
Definition: client_callback.h:370
grpc::ClientContext
A ClientContext allows the person implementing a service client to:
Definition: client_context.h:203
grpc::internal::ClientCallbackReaderImpl::StartCall
void StartCall() override
Definition: client_callback.h:789
grpc::internal::ClientCallbackReaderWriterFactory
Definition: channel_interface.h:48
grpc.h
grpc_call
struct grpc_call grpc_call
A Call represents an RPC.
Definition: grpc_types.h:68
grpc::internal::CallbackUnaryCallImpl
Definition: client_context.h:69
grpc::ClientUnaryReactor::OnReadInitialMetadataDone
virtual void OnReadInitialMetadataDone(bool)
Definition: client_callback.h:463
grpc::ClientCallbackWriter::BindReactor
void BindReactor(ClientWriteReactor< Request > *reactor)
Definition: client_callback.h:213
grpc::ClientCallbackReader::RemoveHold
virtual void RemoveHold()=0
grpc::ClientCallbackReaderWriter
Definition: client_callback.h:166
grpc::ClientBidiReactor::StartWritesDone
void StartWritesDone()
Indicate that the RPC will have no more write operations.
Definition: client_callback.h:306
grpc::ClientCallbackReader::Read
virtual void Read(Response *resp)=0
grpc::ClientReadReactor::StartRead
void StartRead(Response *resp)
Definition: client_callback.h:395
channel_interface.h
grpc::ChannelInterface
Codegen interface for grpc::Channel.
Definition: channel_interface.h:79
grpc::experimental::ClientSessionReactor::OnSessionReady
virtual void OnSessionReady(grpc::internal::Call call)=0
grpc::internal::ClientReactor
Definition: client_callback.h:120
grpc::experimental::internal::ClientCallbackSessionImpl::StartCall
void StartCall() override
Definition: client_callback.h:1310
grpc::internal::ClientCallbackUnaryFactory
Definition: client_callback.h:1272
grpc::internal::ClientCallbackReaderImpl::Read
void Read(Response *msg) override
Definition: client_callback.h:836
grpc::ClientWriteReactor::StartWriteLast
void StartWriteLast(const Request *req, grpc::WriteOptions options)
Definition: client_callback.h:424
grpc::internal::ClientCallbackWriterImpl
Definition: client_context.h:76
grpc::ClientCallbackReaderWriter::Write
virtual void Write(const Request *req, grpc::WriteOptions options)=0
grpc::ClientWriteReactor
ClientWriteReactor is the interface for a client-streaming RPC.
Definition: client_callback.h:159
grpc_call_run_in_event_engine
void grpc_call_run_in_event_engine(const grpc_call *call, absl::AnyInvocable< void()> cb)
grpc::experimental::ClientSessionReactor
ClientSessionReactor is a reactor-style interface for a session RPC.
Definition: client_callback.h:480
grpc::ClientCallbackUnary
Definition: client_callback.h:218
grpc::ClientBidiReactor::AddMultipleHolds
void AddMultipleHolds(int holds)
Definition: client_callback.h:331
grpc::experimental::ClientCallbackSession::~ClientCallbackSession
virtual ~ClientCallbackSession()
Definition: client_callback.h:230
grpc::ClientReadReactor::RemoveHold
void RemoveHold()
Definition: client_callback.h:402
grpc::internal::ClientCallbackUnaryFactory::Create
static void Create(grpc::ChannelInterface *channel, const grpc::internal::RpcMethod &method, grpc::ClientContext *context, const Request *request, Response *response, ClientUnaryReactor *reactor)
Definition: client_callback.h:1276
grpc::ClientBidiReactor::StartWriteLast
void StartWriteLast(const Request *req, grpc::WriteOptions options)
Initiate/post a write operation with specified options and an indication that this is the last write ...
Definition: client_callback.h:297
grpc::WriteOptions
Per-message write options.
Definition: call_op_set.h:79
grpc::ClientCallbackWriter::RemoveHold
virtual void RemoveHold()=0
grpc::ClientBidiReactor::OnWritesDoneDone
virtual void OnWritesDoneDone(bool)
Notifies the application that a StartWritesDone operation completed.
Definition: client_callback.h:379
grpc::internal::ClientCallbackWriterImpl::WritesDone
void WritesDone() ABSL_LOCKS_EXCLUDED(start_mu_) override
Definition: client_callback.h:1014
grpc::ClientCallbackReader::~ClientCallbackReader
virtual ~ClientCallbackReader()
Definition: client_callback.h:185
grpc::experimental::GenericStubSession
Definition: client_callback.h:147
grpc::internal::CallbackWithSuccessTag::Set
void Set(grpc_call *call, std::function< void(bool)> f, CompletionQueueTag *ops, bool can_inline)
Definition: callback_common.h:180
grpc::internal::ClientCallbackReaderImpl::AddHold
void AddHold(int holds) override
Definition: client_callback.h:849
grpc::ClientCallbackReader::BindReactor
void BindReactor(ClientReadReactor< Response > *reactor)
Definition: client_callback.h:192
grpc::internal::MutexLock
Definition: sync.h:80
grpc::ClientCallbackWriter
Definition: client_callback.h:198
grpc::ClientCallbackWriter::AddHold
virtual void AddHold(int holds)=0
grpc::ClientBidiReactor::RemoveHold
void RemoveHold()
Definition: client_callback.h:335
grpc::ClientCallbackReaderWriter::~ClientCallbackReaderWriter
virtual ~ClientCallbackReaderWriter()
Definition: client_callback.h:168
grpc::ClientCallbackReaderWriter::RemoveHold
virtual void RemoveHold()=0
grpc::ClientCallbackWriter::Write
void Write(const Request *req)
Definition: client_callback.h:202
config.h
grpc::ClientCallbackReader::StartCall
virtual void StartCall()=0
call.h
grpc::ClientWriteReactor::AddHold
void AddHold()
Definition: client_callback.h:429
grpc::experimental::ClientCallbackSession::BindReactor
void BindReactor(ClientSessionReactor *reactor)
Definition: client_callback.h:494
grpc::internal::Call::call
grpc_call * call() const
Definition: call.h:55
grpc::ClientUnaryReactor::StartCall
void StartCall()
Definition: client_callback.h:461
grpc::ClientCallbackReaderWriter::StartCall
virtual void StartCall()=0
call_op_set.h
call.h
grpc::internal::CallOpClientRecvStatus
Definition: call_op_set.h:771
grpc::CompletionQueue
A thin wrapper around grpc_completion_queue (see src/core/lib/surface/completion_queue....
Definition: completion_queue.h:104
grpc::ClientBidiReactor
ClientBidiReactor is the interface for a bidirectional streaming RPC.
Definition: client_callback.h:155
grpc::internal::ClientCallbackReaderFactory
Definition: channel_interface.h:50
grpc::experimental::internal::ClientCallbackSessionImpl
Definition: client_callback.h:1296
grpc::experimental::internal::ClientCallbackSessionFactory
Definition: client_callback.h:1397
grpc::ClientCallbackWriter::WriteLast
void WriteLast(const Request *req, grpc::WriteOptions options)
Definition: client_callback.h:204
grpc::internal::ClientCallbackReaderImpl
Definition: client_context.h:74
grpc::internal::ClientReactor::InternalTrailersOnly
virtual bool InternalTrailersOnly(const grpc_call *call) const
InternalTrailersOnly is not part of the API and is not meant to be overridden.
grpc::ClientCallbackWriter::StartCall
virtual void StartCall()=0
grpc::internal::ClientCallbackUnaryImpl::StartCall
void StartCall() override
Definition: client_callback.h:1190
grpc::ClientWriteReactor::StartWrite
void StartWrite(const Request *req)
Definition: client_callback.h:420
grpc::experimental::ClientSessionReactor::OnSessionAcknowledged
virtual void OnSessionAcknowledged(bool)
Definition: client_callback.h:485
grpc::ClientBidiReactor::StartCall
void StartCall()
Activate the RPC and initiate any reads or writes that have been Start'ed before this call.
Definition: client_callback.h:261
grpc::ClientWriteReactor::OnWriteDone
virtual void OnWriteDone(bool)
Definition: client_callback.h:438
grpc::internal::CallOpRecvMessage
Definition: call_op_set.h:427
grpc::internal::Mutex
Definition: sync.h:57
grpc::internal::RpcMethod
Descriptor of an RPC method.
Definition: rpc_method.h:29
grpc::ClientCallbackReader::AddHold
virtual void AddHold(int holds)=0
grpc::ClientWriteReactor::AddMultipleHolds
void AddMultipleHolds(int holds)
Definition: client_callback.h:430
grpc::ClientCallbackReader
Definition: client_callback.h:183
GPR_LIKELY
#define GPR_LIKELY(x)
Definition: port_platform.h:860
grpc::ClientCallbackUnary::~ClientCallbackUnary
virtual ~ClientCallbackUnary()
Definition: client_callback.h:220
grpc::internal::CallOpSet::set_core_cq_tag
void set_core_cq_tag(void *core_cq_tag)
set_core_cq_tag is used to provide a different core CQ tag than "this".
Definition: call_op_set.h:947
sync.h
grpc::internal::ClientCallbackReaderWriterImpl::AddHold
void AddHold(int holds) override
Definition: client_callback.h:627
grpc::experimental::ClientSessionReactor::OnDone
void OnDone(const grpc::Status &) override
Called by the library when all operations associated with this RPC have completed and all Holds have ...
Definition: client_callback.h:483
grpc::ClientWriteReactor::RemoveHold
void RemoveHold()
Definition: client_callback.h:434
grpc::ClientReadReactor
ClientReadReactor is the interface for a server-streaming RPC.
Definition: client_callback.h:157
grpc::internal::ClientCallbackReaderWriterImpl::WritesDone
void WritesDone() ABSL_LOCKS_EXCLUDED(start_mu_) override
Definition: client_callback.h:601
grpc::ClientReadReactor::AddHold
void AddHold()
Definition: client_callback.h:397
grpc::ClientUnaryReactor::OnDone
void OnDone(const grpc::Status &) override
Called by the library when all operations associated with this RPC have completed and all Holds have ...
Definition: client_callback.h:462
grpc::ChannelInterface::memory_allocator
virtual grpc_event_engine::experimental::MemoryAllocator * memory_allocator() const
Definition: channel_interface.h:113
grpc::internal::ClientCallbackReaderFactory::Create
static void Create(grpc::ChannelInterface *channel, const grpc::internal::RpcMethod &method, grpc::ClientContext *context, const Request *request, ClientReadReactor< Response > *reactor)
Definition: client_callback.h:923
grpc::ClientWriteReactor::OnWritesDoneDone
virtual void OnWritesDoneDone(bool)
Definition: client_callback.h:439