GRPC C++  1.58.0
client_callback.h
Go to the documentation of this file.
1 //
2 //
3 // Copyright 2018 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #ifndef GRPCPP_SUPPORT_CLIENT_CALLBACK_H
20 #define GRPCPP_SUPPORT_CLIENT_CALLBACK_H
21 
22 #include <atomic>
23 #include <functional>
24 
25 #include <grpc/grpc.h>
26 #include <grpc/support/log.h>
27 #include <grpcpp/impl/call.h>
29 #include <grpcpp/impl/sync.h>
31 #include <grpcpp/support/config.h>
32 #include <grpcpp/support/status.h>
33 
34 namespace grpc {
35 class Channel;
36 class ClientContext;
37 
38 namespace internal {
39 class RpcMethod;
40 
47 template <class InputMessage, class OutputMessage,
48  class BaseInputMessage = InputMessage,
49  class BaseOutputMessage = OutputMessage>
51  const grpc::internal::RpcMethod& method,
52  grpc::ClientContext* context,
53  const InputMessage* request, OutputMessage* result,
54  std::function<void(grpc::Status)> on_completion) {
55  static_assert(std::is_base_of<BaseInputMessage, InputMessage>::value,
56  "Invalid input message specification");
57  static_assert(std::is_base_of<BaseOutputMessage, OutputMessage>::value,
58  "Invalid output message specification");
60  channel, method, context, request, result, on_completion);
61 }
62 
63 template <class InputMessage, class OutputMessage>
64 class CallbackUnaryCallImpl {
65  public:
67  const grpc::internal::RpcMethod& method,
68  grpc::ClientContext* context,
69  const InputMessage* request, OutputMessage* result,
70  std::function<void(grpc::Status)> on_completion) {
71  grpc::CompletionQueue* cq = channel->CallbackCQ();
72  GPR_ASSERT(cq != nullptr);
73  grpc::internal::Call call(channel->CreateCall(method, context, cq));
74 
75  using FullCallOpSet = grpc::internal::CallOpSet<
82 
83  struct OpSetAndTag {
84  FullCallOpSet opset;
86  };
87  const size_t alloc_sz = sizeof(OpSetAndTag);
88  auto* const alloced =
89  static_cast<OpSetAndTag*>(grpc_call_arena_alloc(call.call(), alloc_sz));
90  auto* ops = new (&alloced->opset) FullCallOpSet;
91  auto* tag = new (&alloced->tag)
92  grpc::internal::CallbackWithStatusTag(call.call(), on_completion, ops);
93 
94  // TODO(vjpai): Unify code with sync API as much as possible
95  grpc::Status s = ops->SendMessagePtr(request);
96  if (!s.ok()) {
97  tag->force_run(s);
98  return;
99  }
100  ops->SendInitialMetadata(&context->send_initial_metadata_,
101  context->initial_metadata_flags());
102  ops->RecvInitialMetadata(context);
103  ops->RecvMessage(result);
104  ops->AllowNoMessage();
105  ops->ClientSendClose();
106  ops->ClientRecvStatus(context, tag->status_ptr());
107  ops->set_core_cq_tag(tag);
108  call.PerformOps(ops);
109  }
110 };
111 
112 // Base class for public API classes.
114  public:
115  virtual ~ClientReactor() = default;
116 
124  virtual void OnDone(const grpc::Status& /*s*/) = 0;
125 
133  virtual void InternalScheduleOnDone(grpc::Status s);
134 
142  virtual bool InternalTrailersOnly(const grpc_call* call) const;
143 };
144 
145 } // namespace internal
146 
147 // Forward declarations
148 template <class Request, class Response>
150 template <class Response>
152 template <class Request>
154 class ClientUnaryReactor;
155 
156 // NOTE: The streaming objects are not actually implemented in the public API.
157 // These interfaces are provided for mocking only. Typical applications
158 // will interact exclusively with the reactors that they define.
159 template <class Request, class Response>
161  public:
163  virtual void StartCall() = 0;
164  virtual void Write(const Request* req, grpc::WriteOptions options) = 0;
165  virtual void WritesDone() = 0;
166  virtual void Read(Response* resp) = 0;
167  virtual void AddHold(int holds) = 0;
168  virtual void RemoveHold() = 0;
169 
170  protected:
172  reactor->BindStream(this);
173  }
174 };
175 
176 template <class Response>
178  public:
180  virtual void StartCall() = 0;
181  virtual void Read(Response* resp) = 0;
182  virtual void AddHold(int holds) = 0;
183  virtual void RemoveHold() = 0;
184 
185  protected:
187  reactor->BindReader(this);
188  }
189 };
190 
191 template <class Request>
193  public:
195  virtual void StartCall() = 0;
196  void Write(const Request* req) { Write(req, grpc::WriteOptions()); }
197  virtual void Write(const Request* req, grpc::WriteOptions options) = 0;
198  void WriteLast(const Request* req, grpc::WriteOptions options) {
199  Write(req, options.set_last_message());
200  }
201  virtual void WritesDone() = 0;
202 
203  virtual void AddHold(int holds) = 0;
204  virtual void RemoveHold() = 0;
205 
206  protected:
208  reactor->BindWriter(this);
209  }
210 };
211 
213  public:
214  virtual ~ClientCallbackUnary() {}
215  virtual void StartCall() = 0;
216 
217  protected:
218  void BindReactor(ClientUnaryReactor* reactor);
219 };
220 
221 // The following classes are the reactor interfaces that are to be implemented
222 // by the user. They are passed in to the library as an argument to a call on a
223 // stub (either a codegen-ed call or a generic call). The streaming RPC is
224 // activated by calling StartCall, possibly after initiating StartRead,
225 // StartWrite, or AddHold operations on the streaming object. Note that none of
226 // the classes are pure; all reactions have a default empty reaction so that the
227 // user class only needs to override those reactions that it cares about.
228 // The reactor must be passed to the stub invocation before any of the below
229 // operations can be called and its reactions will be invoked by the library in
230 // response to the completion of various operations. Reactions must not include
231 // blocking operations (such as blocking I/O, starting synchronous RPCs, or
232 // waiting on condition variables). Reactions may be invoked concurrently,
233 // except that OnDone is called after all others (assuming proper API usage).
234 // The reactor may not be deleted until OnDone is called.
235 
237 template <class Request, class Response>
238 class ClientBidiReactor : public internal::ClientReactor {
239  public:
244  void StartCall() { stream_->StartCall(); }
245 
251  void StartRead(Response* resp) { stream_->Read(resp); }
252 
259  void StartWrite(const Request* req) { StartWrite(req, grpc::WriteOptions()); }
260 
267  void StartWrite(const Request* req, grpc::WriteOptions options) {
268  stream_->Write(req, options);
269  }
270 
280  void StartWriteLast(const Request* req, grpc::WriteOptions options) {
281  StartWrite(req, options.set_last_message());
282  }
283 
289  void StartWritesDone() { stream_->WritesDone(); }
290 
313  void AddHold() { AddMultipleHolds(1); }
314  void AddMultipleHolds(int holds) {
315  GPR_DEBUG_ASSERT(holds > 0);
316  stream_->AddHold(holds);
317  }
318  void RemoveHold() { stream_->RemoveHold(); }
319 
327  void OnDone(const grpc::Status& /*s*/) override {}
328 
337  virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
338 
343  virtual void OnReadDone(bool /*ok*/) {}
344 
350  virtual void OnWriteDone(bool /*ok*/) {}
351 
359  virtual void OnWritesDoneDone(bool /*ok*/) {}
360 
361  private:
362  friend class ClientCallbackReaderWriter<Request, Response>;
363  void BindStream(ClientCallbackReaderWriter<Request, Response>* stream) {
364  stream_ = stream;
365  }
367 };
368 
371 template <class Response>
372 class ClientReadReactor : public internal::ClientReactor {
373  public:
374  void StartCall() { reader_->StartCall(); }
375  void StartRead(Response* resp) { reader_->Read(resp); }
376 
377  void AddHold() { AddMultipleHolds(1); }
378  void AddMultipleHolds(int holds) {
379  GPR_DEBUG_ASSERT(holds > 0);
380  reader_->AddHold(holds);
381  }
382  void RemoveHold() { reader_->RemoveHold(); }
383 
384  void OnDone(const grpc::Status& /*s*/) override {}
385  virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
386  virtual void OnReadDone(bool /*ok*/) {}
387 
388  private:
389  friend class ClientCallbackReader<Response>;
390  void BindReader(ClientCallbackReader<Response>* reader) { reader_ = reader; }
392 };
393 
396 template <class Request>
397 class ClientWriteReactor : public internal::ClientReactor {
398  public:
399  void StartCall() { writer_->StartCall(); }
400  void StartWrite(const Request* req) { StartWrite(req, grpc::WriteOptions()); }
401  void StartWrite(const Request* req, grpc::WriteOptions options) {
402  writer_->Write(req, options);
403  }
404  void StartWriteLast(const Request* req, grpc::WriteOptions options) {
405  StartWrite(req, options.set_last_message());
406  }
407  void StartWritesDone() { writer_->WritesDone(); }
408 
409  void AddHold() { AddMultipleHolds(1); }
410  void AddMultipleHolds(int holds) {
411  GPR_DEBUG_ASSERT(holds > 0);
412  writer_->AddHold(holds);
413  }
414  void RemoveHold() { writer_->RemoveHold(); }
415 
416  void OnDone(const grpc::Status& /*s*/) override {}
417  virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
418  virtual void OnWriteDone(bool /*ok*/) {}
419  virtual void OnWritesDoneDone(bool /*ok*/) {}
420 
421  private:
422  friend class ClientCallbackWriter<Request>;
423  void BindWriter(ClientCallbackWriter<Request>* writer) { writer_ = writer; }
424 
426 };
427 
440  public:
441  void StartCall() { call_->StartCall(); }
442  void OnDone(const grpc::Status& /*s*/) override {}
443  virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
444 
445  private:
446  friend class ClientCallbackUnary;
447  void BindCall(ClientCallbackUnary* call) { call_ = call; }
448  ClientCallbackUnary* call_;
449 };
450 
451 // Define function out-of-line from class to avoid forward declaration issue
453  reactor->BindCall(this);
454 }
455 
456 namespace internal {
457 
458 // Forward declare factory classes for friendship
459 template <class Request, class Response>
460 class ClientCallbackReaderWriterFactory;
461 template <class Response>
462 class ClientCallbackReaderFactory;
463 template <class Request>
464 class ClientCallbackWriterFactory;
465 
466 template <class Request, class Response>
467 class ClientCallbackReaderWriterImpl
468  : public ClientCallbackReaderWriter<Request, Response> {
469  public:
470  // always allocated against a call arena, no memory free required
471  static void operator delete(void* /*ptr*/, std::size_t size) {
473  }
474 
475  // This operator should never be called as the memory should be freed as part
476  // of the arena destruction. It only exists to provide a matching operator
477  // delete to the operator new so that some compilers will not complain (see
478  // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
479  // there are no tests catching the compiler warning.
480  static void operator delete(void*, void*) { GPR_ASSERT(false); }
481 
482  void StartCall() ABSL_LOCKS_EXCLUDED(start_mu_) override {
483  // This call initiates two batches, plus any backlog, each with a callback
484  // 1. Send initial metadata (unless corked) + recv initial metadata
485  // 2. Any read backlog
486  // 3. Any write backlog
487  // 4. Recv trailing metadata (unless corked)
488  if (!start_corked_) {
489  start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
490  context_->initial_metadata_flags());
491  }
492 
493  call_.PerformOps(&start_ops_);
494 
495  {
496  grpc::internal::MutexLock lock(&start_mu_);
497 
498  if (backlog_.read_ops) {
499  call_.PerformOps(&read_ops_);
500  }
501  if (backlog_.write_ops) {
502  call_.PerformOps(&write_ops_);
503  }
504  if (backlog_.writes_done_ops) {
505  call_.PerformOps(&writes_done_ops_);
506  }
507  call_.PerformOps(&finish_ops_);
508  // The last thing in this critical section is to set started_ so that it
509  // can be used lock-free as well.
510  started_.store(true, std::memory_order_release);
511  }
512  // MaybeFinish outside the lock to make sure that destruction of this object
513  // doesn't take place while holding the lock (which would cause the lock to
514  // be released after destruction)
515  this->MaybeFinish(/*from_reaction=*/false);
516  }
517 
518  void Read(Response* msg) override {
519  read_ops_.RecvMessage(msg);
520  callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
521  if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
522  grpc::internal::MutexLock lock(&start_mu_);
523  if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
524  backlog_.read_ops = true;
525  return;
526  }
527  }
528  call_.PerformOps(&read_ops_);
529  }
530 
531  void Write(const Request* msg, grpc::WriteOptions options)
532  ABSL_LOCKS_EXCLUDED(start_mu_) override {
533  if (options.is_last_message()) {
534  options.set_buffer_hint();
535  write_ops_.ClientSendClose();
536  }
537  // TODO(vjpai): don't assert
538  GPR_ASSERT(write_ops_.SendMessagePtr(msg, options).ok());
539  callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
540  if (GPR_UNLIKELY(corked_write_needed_)) {
541  write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
542  context_->initial_metadata_flags());
543  corked_write_needed_ = false;
544  }
545 
546  if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
547  grpc::internal::MutexLock lock(&start_mu_);
548  if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
549  backlog_.write_ops = true;
550  return;
551  }
552  }
553  call_.PerformOps(&write_ops_);
554  }
555  void WritesDone() ABSL_LOCKS_EXCLUDED(start_mu_) override {
556  writes_done_ops_.ClientSendClose();
557  writes_done_tag_.Set(
558  call_.call(),
559  [this](bool ok) {
560  reactor_->OnWritesDoneDone(ok);
561  MaybeFinish(/*from_reaction=*/true);
562  },
563  &writes_done_ops_, /*can_inline=*/false);
564  writes_done_ops_.set_core_cq_tag(&writes_done_tag_);
565  callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
566  if (GPR_UNLIKELY(corked_write_needed_)) {
567  writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
568  context_->initial_metadata_flags());
569  corked_write_needed_ = false;
570  }
571  if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
572  grpc::internal::MutexLock lock(&start_mu_);
573  if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
574  backlog_.writes_done_ops = true;
575  return;
576  }
577  }
578  call_.PerformOps(&writes_done_ops_);
579  }
580 
581  void AddHold(int holds) override {
582  callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
583  }
584  void RemoveHold() override { MaybeFinish(/*from_reaction=*/false); }
585 
586  private:
587  friend class ClientCallbackReaderWriterFactory<Request, Response>;
588 
590  grpc::ClientContext* context,
592  : context_(context),
593  call_(call),
594  reactor_(reactor),
595  start_corked_(context_->initial_metadata_corked_),
596  corked_write_needed_(start_corked_) {
597  this->BindReactor(reactor);
598 
599  // Set up the unchanging parts of the start, read, and write tags and ops.
600  start_tag_.Set(
601  call_.call(),
602  [this](bool ok) {
603  reactor_->OnReadInitialMetadataDone(
604  ok && !reactor_->InternalTrailersOnly(call_.call()));
605  MaybeFinish(/*from_reaction=*/true);
606  },
607  &start_ops_, /*can_inline=*/false);
608  start_ops_.RecvInitialMetadata(context_);
609  start_ops_.set_core_cq_tag(&start_tag_);
610 
611  write_tag_.Set(
612  call_.call(),
613  [this](bool ok) {
614  reactor_->OnWriteDone(ok);
615  MaybeFinish(/*from_reaction=*/true);
616  },
617  &write_ops_, /*can_inline=*/false);
618  write_ops_.set_core_cq_tag(&write_tag_);
619 
620  read_tag_.Set(
621  call_.call(),
622  [this](bool ok) {
623  reactor_->OnReadDone(ok);
624  MaybeFinish(/*from_reaction=*/true);
625  },
626  &read_ops_, /*can_inline=*/false);
627  read_ops_.set_core_cq_tag(&read_tag_);
628 
629  // Also set up the Finish tag and op set.
630  finish_tag_.Set(
631  call_.call(),
632  [this](bool /*ok*/) { MaybeFinish(/*from_reaction=*/true); },
633  &finish_ops_,
634  /*can_inline=*/false);
635  finish_ops_.ClientRecvStatus(context_, &finish_status_);
636  finish_ops_.set_core_cq_tag(&finish_tag_);
637  }
638 
639  // MaybeFinish can be called from reactions or from user-initiated operations
640  // like StartCall or RemoveHold. If this is the last operation or hold on this
641  // object, it will invoke the OnDone reaction. If MaybeFinish was called from
642  // a reaction, it can call OnDone directly. If not, it would need to schedule
643  // OnDone onto an executor thread to avoid the possibility of deadlocking with
644  // any locks in the user code that invoked it.
645  void MaybeFinish(bool from_reaction) {
646  if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
647  1, std::memory_order_acq_rel) == 1)) {
648  grpc::Status s = std::move(finish_status_);
649  auto* reactor = reactor_;
650  auto* call = call_.call();
651  this->~ClientCallbackReaderWriterImpl();
652  grpc_call_unref(call);
653  if (GPR_LIKELY(from_reaction)) {
654  reactor->OnDone(s);
655  } else {
656  reactor->InternalScheduleOnDone(std::move(s));
657  }
658  }
659  }
660 
661  grpc::ClientContext* const context_;
662  grpc::internal::Call call_;
663  ClientBidiReactor<Request, Response>* const reactor_;
664 
667  start_ops_;
669  const bool start_corked_;
670  bool corked_write_needed_; // no lock needed since only accessed in
671  // Write/WritesDone which cannot be concurrent
672 
675  grpc::Status finish_status_;
676 
680  write_ops_;
682 
685  writes_done_ops_;
687 
689  read_ops_;
691 
692  struct StartCallBacklog {
693  bool write_ops = false;
694  bool writes_done_ops = false;
695  bool read_ops = false;
696  };
697  StartCallBacklog backlog_ ABSL_GUARDED_BY(start_mu_);
698 
699  // Minimum of 3 callbacks to pre-register for start ops, StartCall, and finish
700  std::atomic<intptr_t> callbacks_outstanding_{3};
701  std::atomic_bool started_{false};
702  grpc::internal::Mutex start_mu_;
703 };
704 
705 template <class Request, class Response>
706 class ClientCallbackReaderWriterFactory {
707  public:
708  static void Create(grpc::ChannelInterface* channel,
709  const grpc::internal::RpcMethod& method,
710  grpc::ClientContext* context,
712  grpc::internal::Call call =
713  channel->CreateCall(method, context, channel->CallbackCQ());
714 
715  grpc_call_ref(call.call());
719  reactor);
720  }
721 };
722 
723 template <class Response>
724 class ClientCallbackReaderImpl : public ClientCallbackReader<Response> {
725  public:
726  // always allocated against a call arena, no memory free required
727  static void operator delete(void* /*ptr*/, std::size_t size) {
728  GPR_ASSERT(size == sizeof(ClientCallbackReaderImpl));
729  }
730 
731  // This operator should never be called as the memory should be freed as part
732  // of the arena destruction. It only exists to provide a matching operator
733  // delete to the operator new so that some compilers will not complain (see
734  // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
735  // there are no tests catching the compiler warning.
736  static void operator delete(void*, void*) { GPR_ASSERT(false); }
737 
738  void StartCall() override {
739  // This call initiates two batches, plus any backlog, each with a callback
740  // 1. Send initial metadata (unless corked) + recv initial metadata
741  // 2. Any backlog
742  // 3. Recv trailing metadata
743 
744  start_tag_.Set(
745  call_.call(),
746  [this](bool ok) {
747  reactor_->OnReadInitialMetadataDone(
748  ok && !reactor_->InternalTrailersOnly(call_.call()));
749  MaybeFinish(/*from_reaction=*/true);
750  },
751  &start_ops_, /*can_inline=*/false);
752  start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
753  context_->initial_metadata_flags());
754  start_ops_.RecvInitialMetadata(context_);
755  start_ops_.set_core_cq_tag(&start_tag_);
756  call_.PerformOps(&start_ops_);
757 
758  // Also set up the read tag so it doesn't have to be set up each time
759  read_tag_.Set(
760  call_.call(),
761  [this](bool ok) {
762  reactor_->OnReadDone(ok);
763  MaybeFinish(/*from_reaction=*/true);
764  },
765  &read_ops_, /*can_inline=*/false);
766  read_ops_.set_core_cq_tag(&read_tag_);
767 
768  {
769  grpc::internal::MutexLock lock(&start_mu_);
770  if (backlog_.read_ops) {
771  call_.PerformOps(&read_ops_);
772  }
773  started_.store(true, std::memory_order_release);
774  }
775 
776  finish_tag_.Set(
777  call_.call(),
778  [this](bool /*ok*/) { MaybeFinish(/*from_reaction=*/true); },
779  &finish_ops_, /*can_inline=*/false);
780  finish_ops_.ClientRecvStatus(context_, &finish_status_);
781  finish_ops_.set_core_cq_tag(&finish_tag_);
782  call_.PerformOps(&finish_ops_);
783  }
784 
785  void Read(Response* msg) override {
786  read_ops_.RecvMessage(msg);
787  callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
788  if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
789  grpc::internal::MutexLock lock(&start_mu_);
790  if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
791  backlog_.read_ops = true;
792  return;
793  }
794  }
795  call_.PerformOps(&read_ops_);
796  }
797 
798  void AddHold(int holds) override {
799  callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
800  }
801  void RemoveHold() override { MaybeFinish(/*from_reaction=*/false); }
802 
803  private:
804  friend class ClientCallbackReaderFactory<Response>;
805 
806  template <class Request>
808  grpc::ClientContext* context, Request* request,
810  : context_(context), call_(call), reactor_(reactor) {
811  this->BindReactor(reactor);
812  // TODO(vjpai): don't assert
813  GPR_ASSERT(start_ops_.SendMessagePtr(request).ok());
814  start_ops_.ClientSendClose();
815  }
816 
817  // MaybeFinish behaves as in ClientCallbackReaderWriterImpl.
818  void MaybeFinish(bool from_reaction) {
819  if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
820  1, std::memory_order_acq_rel) == 1)) {
821  grpc::Status s = std::move(finish_status_);
822  auto* reactor = reactor_;
823  auto* call = call_.call();
824  this->~ClientCallbackReaderImpl();
825  grpc_call_unref(call);
826  if (GPR_LIKELY(from_reaction)) {
827  reactor->OnDone(s);
828  } else {
829  reactor->InternalScheduleOnDone(std::move(s));
830  }
831  }
832  }
833 
834  grpc::ClientContext* const context_;
835  grpc::internal::Call call_;
836  ClientReadReactor<Response>* const reactor_;
837 
842  start_ops_;
844 
847  grpc::Status finish_status_;
848 
850  read_ops_;
852 
853  struct StartCallBacklog {
854  bool read_ops = false;
855  };
856  StartCallBacklog backlog_ ABSL_GUARDED_BY(start_mu_);
857 
858  // Minimum of 2 callbacks to pre-register for start and finish
859  std::atomic<intptr_t> callbacks_outstanding_{2};
860  std::atomic_bool started_{false};
861  grpc::internal::Mutex start_mu_;
862 };
863 
864 template <class Response>
865 class ClientCallbackReaderFactory {
866  public:
867  template <class Request>
868  static void Create(grpc::ChannelInterface* channel,
869  const grpc::internal::RpcMethod& method,
870  grpc::ClientContext* context, const Request* request,
871  ClientReadReactor<Response>* reactor) {
872  grpc::internal::Call call =
873  channel->CreateCall(method, context, channel->CallbackCQ());
874 
875  grpc_call_ref(call.call());
876  new (grpc_call_arena_alloc(call.call(),
878  ClientCallbackReaderImpl<Response>(call, context, request, reactor);
879  }
880 };
881 
882 template <class Request>
883 class ClientCallbackWriterImpl : public ClientCallbackWriter<Request> {
884  public:
885  // always allocated against a call arena, no memory free required
886  static void operator delete(void* /*ptr*/, std::size_t size) {
887  GPR_ASSERT(size == sizeof(ClientCallbackWriterImpl));
888  }
889 
890  // This operator should never be called as the memory should be freed as part
891  // of the arena destruction. It only exists to provide a matching operator
892  // delete to the operator new so that some compilers will not complain (see
893  // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
894  // there are no tests catching the compiler warning.
895  static void operator delete(void*, void*) { GPR_ASSERT(false); }
896 
897  void StartCall() ABSL_LOCKS_EXCLUDED(start_mu_) override {
898  // This call initiates two batches, plus any backlog, each with a callback
899  // 1. Send initial metadata (unless corked) + recv initial metadata
900  // 2. Any backlog
901  // 3. Recv trailing metadata
902 
903  if (!start_corked_) {
904  start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
905  context_->initial_metadata_flags());
906  }
907  call_.PerformOps(&start_ops_);
908 
909  {
910  grpc::internal::MutexLock lock(&start_mu_);
911 
912  if (backlog_.write_ops) {
913  call_.PerformOps(&write_ops_);
914  }
915  if (backlog_.writes_done_ops) {
916  call_.PerformOps(&writes_done_ops_);
917  }
918  call_.PerformOps(&finish_ops_);
919  // The last thing in this critical section is to set started_ so that it
920  // can be used lock-free as well.
921  started_.store(true, std::memory_order_release);
922  }
923  // MaybeFinish outside the lock to make sure that destruction of this object
924  // doesn't take place while holding the lock (which would cause the lock to
925  // be released after destruction)
926  this->MaybeFinish(/*from_reaction=*/false);
927  }
928 
929  void Write(const Request* msg, grpc::WriteOptions options)
930  ABSL_LOCKS_EXCLUDED(start_mu_) override {
931  if (GPR_UNLIKELY(options.is_last_message())) {
932  options.set_buffer_hint();
933  write_ops_.ClientSendClose();
934  }
935  // TODO(vjpai): don't assert
936  GPR_ASSERT(write_ops_.SendMessagePtr(msg, options).ok());
937  callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
938 
939  if (GPR_UNLIKELY(corked_write_needed_)) {
940  write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
941  context_->initial_metadata_flags());
942  corked_write_needed_ = false;
943  }
944 
945  if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
946  grpc::internal::MutexLock lock(&start_mu_);
947  if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
948  backlog_.write_ops = true;
949  return;
950  }
951  }
952  call_.PerformOps(&write_ops_);
953  }
954 
955  void WritesDone() ABSL_LOCKS_EXCLUDED(start_mu_) override {
956  writes_done_ops_.ClientSendClose();
957  writes_done_tag_.Set(
958  call_.call(),
959  [this](bool ok) {
960  reactor_->OnWritesDoneDone(ok);
961  MaybeFinish(/*from_reaction=*/true);
962  },
963  &writes_done_ops_, /*can_inline=*/false);
964  writes_done_ops_.set_core_cq_tag(&writes_done_tag_);
965  callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
966 
967  if (GPR_UNLIKELY(corked_write_needed_)) {
968  writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
969  context_->initial_metadata_flags());
970  corked_write_needed_ = false;
971  }
972 
973  if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
974  grpc::internal::MutexLock lock(&start_mu_);
975  if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
976  backlog_.writes_done_ops = true;
977  return;
978  }
979  }
980  call_.PerformOps(&writes_done_ops_);
981  }
982 
983  void AddHold(int holds) override {
984  callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
985  }
986  void RemoveHold() override { MaybeFinish(/*from_reaction=*/false); }
987 
988  private:
989  friend class ClientCallbackWriterFactory<Request>;
990 
991  template <class Response>
993  grpc::ClientContext* context, Response* response,
995  : context_(context),
996  call_(call),
997  reactor_(reactor),
998  start_corked_(context_->initial_metadata_corked_),
999  corked_write_needed_(start_corked_) {
1000  this->BindReactor(reactor);
1001 
1002  // Set up the unchanging parts of the start and write tags and ops.
1003  start_tag_.Set(
1004  call_.call(),
1005  [this](bool ok) {
1006  reactor_->OnReadInitialMetadataDone(
1007  ok && !reactor_->InternalTrailersOnly(call_.call()));
1008  MaybeFinish(/*from_reaction=*/true);
1009  },
1010  &start_ops_, /*can_inline=*/false);
1011  start_ops_.RecvInitialMetadata(context_);
1012  start_ops_.set_core_cq_tag(&start_tag_);
1013 
1014  write_tag_.Set(
1015  call_.call(),
1016  [this](bool ok) {
1017  reactor_->OnWriteDone(ok);
1018  MaybeFinish(/*from_reaction=*/true);
1019  },
1020  &write_ops_, /*can_inline=*/false);
1021  write_ops_.set_core_cq_tag(&write_tag_);
1022 
1023  // Also set up the Finish tag and op set.
1024  finish_ops_.RecvMessage(response);
1025  finish_ops_.AllowNoMessage();
1026  finish_tag_.Set(
1027  call_.call(),
1028  [this](bool /*ok*/) { MaybeFinish(/*from_reaction=*/true); },
1029  &finish_ops_,
1030  /*can_inline=*/false);
1031  finish_ops_.ClientRecvStatus(context_, &finish_status_);
1032  finish_ops_.set_core_cq_tag(&finish_tag_);
1033  }
1034 
1035  // MaybeFinish behaves as in ClientCallbackReaderWriterImpl.
1036  void MaybeFinish(bool from_reaction) {
1037  if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
1038  1, std::memory_order_acq_rel) == 1)) {
1039  grpc::Status s = std::move(finish_status_);
1040  auto* reactor = reactor_;
1041  auto* call = call_.call();
1042  this->~ClientCallbackWriterImpl();
1043  grpc_call_unref(call);
1044  if (GPR_LIKELY(from_reaction)) {
1045  reactor->OnDone(s);
1046  } else {
1047  reactor->InternalScheduleOnDone(std::move(s));
1048  }
1049  }
1050  }
1051 
1052  grpc::ClientContext* const context_;
1053  grpc::internal::Call call_;
1054  ClientWriteReactor<Request>* const reactor_;
1055 
1058  start_ops_;
1060  const bool start_corked_;
1061  bool corked_write_needed_; // no lock needed since only accessed in
1062  // Write/WritesDone which cannot be concurrent
1063 
1066  finish_ops_;
1068  grpc::Status finish_status_;
1069 
1073  write_ops_;
1075 
1078  writes_done_ops_;
1079  grpc::internal::CallbackWithSuccessTag writes_done_tag_;
1080 
1081  struct StartCallBacklog {
1082  bool write_ops = false;
1083  bool writes_done_ops = false;
1084  };
1085  StartCallBacklog backlog_ ABSL_GUARDED_BY(start_mu_);
1086 
1087  // Minimum of 3 callbacks to pre-register for start ops, StartCall, and finish
1088  std::atomic<intptr_t> callbacks_outstanding_{3};
1089  std::atomic_bool started_{false};
1090  grpc::internal::Mutex start_mu_;
1091 };
1092 
1093 template <class Request>
1094 class ClientCallbackWriterFactory {
1095  public:
1096  template <class Response>
1097  static void Create(grpc::ChannelInterface* channel,
1098  const grpc::internal::RpcMethod& method,
1099  grpc::ClientContext* context, Response* response,
1100  ClientWriteReactor<Request>* reactor) {
1101  grpc::internal::Call call =
1102  channel->CreateCall(method, context, channel->CallbackCQ());
1103 
1104  grpc_call_ref(call.call());
1105  new (grpc_call_arena_alloc(call.call(),
1107  ClientCallbackWriterImpl<Request>(call, context, response, reactor);
1108  }
1109 };
1110 
1112  public:
1113  // always allocated against a call arena, no memory free required
1114  static void operator delete(void* /*ptr*/, std::size_t size) {
1115  GPR_ASSERT(size == sizeof(ClientCallbackUnaryImpl));
1116  }
1117 
1118  // This operator should never be called as the memory should be freed as part
1119  // of the arena destruction. It only exists to provide a matching operator
1120  // delete to the operator new so that some compilers will not complain (see
1121  // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
1122  // there are no tests catching the compiler warning.
1123  static void operator delete(void*, void*) { GPR_ASSERT(false); }
1124 
1125  void StartCall() override {
1126  // This call initiates two batches, each with a callback
1127  // 1. Send initial metadata + write + writes done + recv initial metadata
1128  // 2. Read message, recv trailing metadata
1129 
1130  start_tag_.Set(
1131  call_.call(),
1132  [this](bool ok) {
1133  reactor_->OnReadInitialMetadataDone(
1134  ok && !reactor_->InternalTrailersOnly(call_.call()));
1135  MaybeFinish();
1136  },
1137  &start_ops_, /*can_inline=*/false);
1138  start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
1139  context_->initial_metadata_flags());
1140  start_ops_.RecvInitialMetadata(context_);
1141  start_ops_.set_core_cq_tag(&start_tag_);
1142  call_.PerformOps(&start_ops_);
1143 
1144  finish_tag_.Set(
1145  call_.call(), [this](bool /*ok*/) { MaybeFinish(); }, &finish_ops_,
1146  /*can_inline=*/false);
1147  finish_ops_.ClientRecvStatus(context_, &finish_status_);
1148  finish_ops_.set_core_cq_tag(&finish_tag_);
1149  call_.PerformOps(&finish_ops_);
1150  }
1151 
1152  private:
1154 
1155  template <class Request, class Response>
1157  grpc::ClientContext* context, Request* request,
1158  Response* response, ClientUnaryReactor* reactor)
1159  : context_(context), call_(call), reactor_(reactor) {
1160  this->BindReactor(reactor);
1161  // TODO(vjpai): don't assert
1162  GPR_ASSERT(start_ops_.SendMessagePtr(request).ok());
1163  start_ops_.ClientSendClose();
1164  finish_ops_.RecvMessage(response);
1165  finish_ops_.AllowNoMessage();
1166  }
1167 
1168  // In the unary case, MaybeFinish is only ever invoked from a
1169  // library-initiated reaction, so it will just directly call OnDone if this is
1170  // the last reaction for this RPC.
1171  void MaybeFinish() {
1172  if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
1173  1, std::memory_order_acq_rel) == 1)) {
1174  grpc::Status s = std::move(finish_status_);
1175  auto* reactor = reactor_;
1176  auto* call = call_.call();
1177  this->~ClientCallbackUnaryImpl();
1178  grpc_call_unref(call);
1179  reactor->OnDone(s);
1180  }
1181  }
1182 
1183  grpc::ClientContext* const context_;
1184  grpc::internal::Call call_;
1185  ClientUnaryReactor* const reactor_;
1186 
1191  start_ops_;
1193 
1196  finish_ops_;
1198  grpc::Status finish_status_;
1199 
1200  // This call will have 2 callbacks: start and finish
1201  std::atomic<intptr_t> callbacks_outstanding_{2};
1202 };
1203 
1205  public:
1206  template <class Request, class Response, class BaseRequest = Request,
1207  class BaseResponse = Response>
1208  static void Create(grpc::ChannelInterface* channel,
1209  const grpc::internal::RpcMethod& method,
1210  grpc::ClientContext* context, const Request* request,
1211  Response* response, ClientUnaryReactor* reactor) {
1212  grpc::internal::Call call =
1213  channel->CreateCall(method, context, channel->CallbackCQ());
1214 
1215  grpc_call_ref(call.call());
1216 
1217  new (grpc_call_arena_alloc(call.call(), sizeof(ClientCallbackUnaryImpl)))
1218  ClientCallbackUnaryImpl(call, context,
1219  static_cast<const BaseRequest*>(request),
1220  static_cast<BaseResponse*>(response), reactor);
1221  }
1222 };
1223 
1224 } // namespace internal
1225 } // namespace grpc
1226 
1227 #endif // GRPCPP_SUPPORT_CLIENT_CALLBACK_H
grpc::internal::ClientCallbackUnaryImpl
Definition: client_callback.h:1111
grpc::internal::CallbackWithSuccessTag
CallbackWithSuccessTag can be reused multiple times, and will be used in this fashion for streaming o...
Definition: callback_common.h:136
grpc::ClientReadReactor::OnReadInitialMetadataDone
virtual void OnReadInitialMetadataDone(bool)
Definition: client_callback.h:385
grpc::ClientWriteReactor::OnReadInitialMetadataDone
virtual void OnReadInitialMetadataDone(bool)
Definition: client_callback.h:417
grpc::ClientWriteReactor::StartWritesDone
void StartWritesDone()
Definition: client_callback.h:407
grpc::internal::CallOpRecvInitialMetadata
Definition: call_op_set.h:721
grpc::ClientCallbackWriter::~ClientCallbackWriter
virtual ~ClientCallbackWriter()
Definition: client_callback.h:194
grpc::internal::ClientCallbackReaderWriterImpl::Read
void Read(Response *msg) override
Definition: client_callback.h:518
grpc::internal::ClientCallbackWriterImpl::AddHold
void AddHold(int holds) override
Definition: client_callback.h:983
grpc::internal::ClientCallbackReaderWriterImpl::StartCall
void StartCall() ABSL_LOCKS_EXCLUDED(start_mu_) override
Definition: client_callback.h:482
grpc::internal::CallOpClientSendClose
Definition: call_op_set.h:619
grpc::ClientBidiReactor::StartWrite
void StartWrite(const Request *req)
Initiate a write operation (or post it for later initiation if StartCall has not yet been invoked).
Definition: client_callback.h:259
grpc_call_arena_alloc
GRPCAPI void * grpc_call_arena_alloc(grpc_call *call, size_t size)
Allocate memory in the grpc_call arena: this memory is automatically discarded at call completion.
grpc::internal::CallOpGenericRecvMessage
Definition: call_op_set.h:526
grpc::ClientBidiReactor::AddHold
void AddHold()
Holds are needed if (and only if) this stream has operations that take place on it after StartCall bu...
Definition: client_callback.h:313
grpc::internal::ClientCallbackReaderWriterImpl
Definition: client_context.h:68
grpc::ClientReadReactor::OnReadDone
virtual void OnReadDone(bool)
Definition: client_callback.h:386
grpc
An Alarm posts the user-provided tag to its associated completion queue or invokes the user-provided ...
Definition: alarm.h:33
grpc::internal::CallOpSet
Primary implementation of CallOpSetInterface.
Definition: completion_queue.h:96
GPR_DEBUG_ASSERT
#define GPR_DEBUG_ASSERT(x)
Definition: log.h:103
grpc::internal::ClientCallbackReaderWriterImpl::Write
void Write(const Request *msg, grpc::WriteOptions options) ABSL_LOCKS_EXCLUDED(start_mu_) override
Definition: client_callback.h:531
grpc::internal::ClientCallbackReaderImpl::RemoveHold
void RemoveHold() override
Definition: client_callback.h:801
grpc::internal::CallOpSendMessage
Definition: call_op_set.h:286
grpc::internal::CallbackUnaryCallImpl::CallbackUnaryCallImpl
CallbackUnaryCallImpl(grpc::ChannelInterface *channel, const grpc::internal::RpcMethod &method, grpc::ClientContext *context, const InputMessage *request, OutputMessage *result, std::function< void(grpc::Status)> on_completion)
Definition: client_callback.h:66
grpc::WriteOptions::set_last_message
WriteOptions & set_last_message()
last-message bit: indicates this is the last message in a stream client-side: makes Write the equival...
Definition: call_op_set.h:155
grpc::ClientUnaryReactor
ClientUnaryReactor is a reactor-style interface for a unary RPC.
Definition: client_callback.h:439
grpc::internal::ClientReactor::InternalScheduleOnDone
virtual void InternalScheduleOnDone(grpc::Status s)
InternalScheduleOnDone is not part of the API and is not meant to be overridden.
grpc::ClientCallbackReaderWriter::Read
virtual void Read(Response *resp)=0
grpc::internal::ClientCallbackReaderWriterFactory::Create
static void Create(grpc::ChannelInterface *channel, const grpc::internal::RpcMethod &method, grpc::ClientContext *context, ClientBidiReactor< Request, Response > *reactor)
Definition: client_callback.h:708
grpc::ClientWriteReactor::StartWrite
void StartWrite(const Request *req, grpc::WriteOptions options)
Definition: client_callback.h:401
grpc::ClientWriteReactor::StartCall
void StartCall()
Definition: client_callback.h:399
grpc::ClientWriteReactor::OnDone
void OnDone(const grpc::Status &) override
Definition: client_callback.h:416
grpc::internal::ClientReactor::OnDone
virtual void OnDone(const grpc::Status &)=0
Called by the library when all operations associated with this RPC have completed and all Holds have ...
grpc::internal::ClientCallbackReaderWriterImpl::RemoveHold
void RemoveHold() override
Definition: client_callback.h:584
grpc::ClientReadReactor::OnDone
void OnDone(const grpc::Status &) override
Definition: client_callback.h:384
grpc::internal::Call
Straightforward wrapping of the C call object.
Definition: call.h:36
grpc::internal::CallbackUnaryCall
void CallbackUnaryCall(grpc::ChannelInterface *channel, const grpc::internal::RpcMethod &method, grpc::ClientContext *context, const InputMessage *request, OutputMessage *result, std::function< void(grpc::Status)> on_completion)
Perform a callback-based unary call.
Definition: client_callback.h:50
GPR_ASSERT
#define GPR_ASSERT(x)
abort() the process if x is zero, having written a line to the log.
Definition: log.h:95
grpc::ClientCallbackReaderWriter::WritesDone
virtual void WritesDone()=0
grpc::internal::ClientCallbackWriterImpl::RemoveHold
void RemoveHold() override
Definition: client_callback.h:986
status.h
grpc::internal::CallOpSendInitialMetadata
Definition: call_op_set.h:216
grpc::internal::ClientCallbackWriterFactory::Create
static void Create(grpc::ChannelInterface *channel, const grpc::internal::RpcMethod &method, grpc::ClientContext *context, Response *response, ClientWriteReactor< Request > *reactor)
Definition: client_callback.h:1097
grpc::ClientBidiReactor::OnReadInitialMetadataDone
virtual void OnReadInitialMetadataDone(bool)
Notifies the application that a read of initial metadata from the server is done.
Definition: client_callback.h:337
grpc::internal::CallbackWithStatusTag
Definition: callback_common.h:70
grpc::ClientCallbackReaderWriter::BindReactor
void BindReactor(ClientBidiReactor< Request, Response > *reactor)
Definition: client_callback.h:171
grpc::Status::ok
bool ok() const
Is the status OK?
Definition: status.h:126
grpc::ClientCallbackUnary::BindReactor
void BindReactor(ClientUnaryReactor *reactor)
Definition: client_callback.h:452
grpc_call_ref
GRPCAPI void grpc_call_ref(grpc_call *call)
Ref a call.
grpc::ClientBidiReactor::StartRead
void StartRead(Response *resp)
Initiate a read operation (or post it for later initiation if StartCall has not yet been invoked).
Definition: client_callback.h:251
grpc::ClientReadReactor::AddMultipleHolds
void AddMultipleHolds(int holds)
Definition: client_callback.h:378
grpc::internal::ClientCallbackWriterImpl::StartCall
void StartCall() ABSL_LOCKS_EXCLUDED(start_mu_) override
Definition: client_callback.h:897
grpc::internal::ClientCallbackWriterImpl::Write
void Write(const Request *msg, grpc::WriteOptions options) ABSL_LOCKS_EXCLUDED(start_mu_) override
Definition: client_callback.h:929
grpc::ClientBidiReactor::StartWrite
void StartWrite(const Request *req, grpc::WriteOptions options)
Initiate/post a write operation with specified options.
Definition: client_callback.h:267
grpc::ClientBidiReactor::OnDone
void OnDone(const grpc::Status &) override
Notifies the application that all operations associated with this RPC have completed and all Holds ha...
Definition: client_callback.h:327
grpc::internal::ClientReactor::~ClientReactor
virtual ~ClientReactor()=default
grpc::ClientCallbackUnary::StartCall
virtual void StartCall()=0
grpc::ClientCallbackReaderWriter::AddHold
virtual void AddHold(int holds)=0
grpc::ClientBidiReactor::OnReadDone
virtual void OnReadDone(bool)
Notifies the application that a StartRead operation completed.
Definition: client_callback.h:343
grpc::Status
Did it work? If it didn't, why?
Definition: status.h:35
GPR_UNLIKELY
#define GPR_UNLIKELY(x)
Definition: port_platform.h:763
grpc::ClientCallbackWriter::WritesDone
virtual void WritesDone()=0
log.h
grpc_call_unref
GRPCAPI void grpc_call_unref(grpc_call *call)
Unref a call.
grpc::internal::ClientCallbackWriterFactory
Definition: channel_interface.h:49
grpc::ClientReadReactor::StartCall
void StartCall()
Definition: client_callback.h:374
callback_common.h
grpc::ClientBidiReactor::OnWriteDone
virtual void OnWriteDone(bool)
Notifies the application that a StartWrite or StartWriteLast operation completed.
Definition: client_callback.h:350
grpc::ClientContext
A ClientContext allows the person implementing a service client to:
Definition: client_context.h:193
grpc::internal::ClientCallbackReaderImpl::StartCall
void StartCall() override
Definition: client_callback.h:738
grpc::internal::ClientCallbackReaderWriterFactory
Definition: channel_interface.h:45
grpc.h
grpc_call
struct grpc_call grpc_call
A Call represents an RPC.
Definition: grpc_types.h:70
grpc::internal::CallbackUnaryCallImpl
Definition: client_context.h:66
grpc::ClientUnaryReactor::OnReadInitialMetadataDone
virtual void OnReadInitialMetadataDone(bool)
Definition: client_callback.h:443
grpc::ClientCallbackWriter::BindReactor
void BindReactor(ClientWriteReactor< Request > *reactor)
Definition: client_callback.h:207
grpc::ClientCallbackReader::RemoveHold
virtual void RemoveHold()=0
grpc::ClientCallbackReaderWriter
Definition: client_callback.h:160
grpc::ClientBidiReactor::StartWritesDone
void StartWritesDone()
Indicate that the RPC will have no more write operations.
Definition: client_callback.h:289
grpc::ClientCallbackReader::Read
virtual void Read(Response *resp)=0
grpc::ClientReadReactor::StartRead
void StartRead(Response *resp)
Definition: client_callback.h:375
grpc::ChannelInterface
Codegen interface for grpc::Channel.
Definition: channel_interface.h:71
grpc::internal::ClientReactor
Definition: client_callback.h:113
grpc::internal::ClientCallbackUnaryFactory
Definition: client_callback.h:1204
grpc::internal::ClientCallbackReaderImpl::Read
void Read(Response *msg) override
Definition: client_callback.h:785
grpc::ClientWriteReactor::StartWriteLast
void StartWriteLast(const Request *req, grpc::WriteOptions options)
Definition: client_callback.h:404
grpc::internal::ClientCallbackWriterImpl
Definition: client_context.h:72
grpc::ClientCallbackReaderWriter::Write
virtual void Write(const Request *req, grpc::WriteOptions options)=0
grpc::ClientWriteReactor
ClientWriteReactor is the interface for a client-streaming RPC.
Definition: client_callback.h:153
grpc::ClientCallbackUnary
Definition: client_callback.h:212
grpc::ClientBidiReactor::AddMultipleHolds
void AddMultipleHolds(int holds)
Definition: client_callback.h:314
grpc::ClientReadReactor::RemoveHold
void RemoveHold()
Definition: client_callback.h:382
grpc::internal::ClientCallbackUnaryFactory::Create
static void Create(grpc::ChannelInterface *channel, const grpc::internal::RpcMethod &method, grpc::ClientContext *context, const Request *request, Response *response, ClientUnaryReactor *reactor)
Definition: client_callback.h:1208
grpc::ClientBidiReactor::StartWriteLast
void StartWriteLast(const Request *req, grpc::WriteOptions options)
Initiate/post a write operation with specified options and an indication that this is the last write ...
Definition: client_callback.h:280
grpc::WriteOptions
Per-message write options.
Definition: call_op_set.h:78
grpc::ClientCallbackWriter::RemoveHold
virtual void RemoveHold()=0
grpc::ClientBidiReactor::OnWritesDoneDone
virtual void OnWritesDoneDone(bool)
Notifies the application that a StartWritesDone operation completed.
Definition: client_callback.h:359
grpc::internal::ClientCallbackWriterImpl::WritesDone
void WritesDone() ABSL_LOCKS_EXCLUDED(start_mu_) override
Definition: client_callback.h:955
grpc::ClientCallbackReader::~ClientCallbackReader
virtual ~ClientCallbackReader()
Definition: client_callback.h:179
grpc::internal::CallbackWithSuccessTag::Set
void Set(grpc_call *call, std::function< void(bool)> f, CompletionQueueTag *ops, bool can_inline)
Definition: callback_common.h:163
grpc::internal::ClientCallbackReaderImpl::AddHold
void AddHold(int holds) override
Definition: client_callback.h:798
grpc::ClientCallbackReader::BindReactor
void BindReactor(ClientReadReactor< Response > *reactor)
Definition: client_callback.h:186
grpc::internal::MutexLock
Definition: sync.h:80
grpc::ClientCallbackWriter
Definition: client_callback.h:192
grpc::ClientCallbackWriter::AddHold
virtual void AddHold(int holds)=0
grpc::ClientBidiReactor::RemoveHold
void RemoveHold()
Definition: client_callback.h:318
grpc::ClientCallbackReaderWriter::~ClientCallbackReaderWriter
virtual ~ClientCallbackReaderWriter()
Definition: client_callback.h:162
grpc::ClientCallbackReaderWriter::RemoveHold
virtual void RemoveHold()=0
grpc::ClientCallbackWriter::Write
void Write(const Request *req)
Definition: client_callback.h:196
config.h
grpc::ClientCallbackReader::StartCall
virtual void StartCall()=0
grpc::ClientWriteReactor::AddHold
void AddHold()
Definition: client_callback.h:409
grpc::internal::Call::PerformOps
void PerformOps(CallOpSetInterface *ops)
Definition: call.h:66
grpc::internal::Call::call
grpc_call * call() const
Definition: call.h:70
grpc::ClientUnaryReactor::StartCall
void StartCall()
Definition: client_callback.h:441
grpc::ClientCallbackReaderWriter::StartCall
virtual void StartCall()=0
call_op_set.h
call.h
grpc::internal::CallOpClientRecvStatus
Definition: call_op_set.h:769
grpc::CompletionQueue
A thin wrapper around grpc_completion_queue (see src/core/lib/surface/completion_queue....
Definition: completion_queue.h:103
grpc::ClientBidiReactor
ClientBidiReactor is the interface for a bidirectional streaming RPC.
Definition: client_callback.h:149
grpc::internal::ClientCallbackReaderFactory
Definition: channel_interface.h:47
grpc::ClientCallbackWriter::WriteLast
void WriteLast(const Request *req, grpc::WriteOptions options)
Definition: client_callback.h:198
grpc::internal::ClientCallbackReaderImpl
Definition: client_context.h:70
grpc::internal::ClientReactor::InternalTrailersOnly
virtual bool InternalTrailersOnly(const grpc_call *call) const
InternalTrailersOnly is not part of the API and is not meant to be overridden.
grpc::ClientCallbackWriter::StartCall
virtual void StartCall()=0
grpc::internal::ClientCallbackUnaryImpl::StartCall
void StartCall() override
Definition: client_callback.h:1125
grpc::ClientWriteReactor::StartWrite
void StartWrite(const Request *req)
Definition: client_callback.h:400
grpc::ClientBidiReactor::StartCall
void StartCall()
Activate the RPC and initiate any reads or writes that have been Start'ed before this call.
Definition: client_callback.h:244
grpc::ClientWriteReactor::OnWriteDone
virtual void OnWriteDone(bool)
Definition: client_callback.h:418
grpc::internal::CallOpRecvMessage
Definition: call_op_set.h:424
grpc::internal::Mutex
Definition: sync.h:57
grpc::internal::RpcMethod
Descriptor of an RPC method.
Definition: rpc_method.h:29
grpc::ClientCallbackReader::AddHold
virtual void AddHold(int holds)=0
grpc::ClientWriteReactor::AddMultipleHolds
void AddMultipleHolds(int holds)
Definition: client_callback.h:410
grpc::ClientCallbackReader
Definition: client_callback.h:177
GPR_LIKELY
#define GPR_LIKELY(x)
Definition: port_platform.h:762
grpc::ClientCallbackUnary::~ClientCallbackUnary
virtual ~ClientCallbackUnary()
Definition: client_callback.h:214
grpc::internal::CallOpSet::set_core_cq_tag
void set_core_cq_tag(void *core_cq_tag)
set_core_cq_tag is used to provide a different core CQ tag than "this".
Definition: call_op_set.h:943
sync.h
grpc::internal::ClientCallbackReaderWriterImpl::AddHold
void AddHold(int holds) override
Definition: client_callback.h:581
grpc::ClientWriteReactor::RemoveHold
void RemoveHold()
Definition: client_callback.h:414
grpc::ClientReadReactor
ClientReadReactor is the interface for a server-streaming RPC.
Definition: client_callback.h:151
grpc::internal::ClientCallbackReaderWriterImpl::WritesDone
void WritesDone() ABSL_LOCKS_EXCLUDED(start_mu_) override
Definition: client_callback.h:555
grpc::ClientReadReactor::AddHold
void AddHold()
Definition: client_callback.h:377
grpc::ClientUnaryReactor::OnDone
void OnDone(const grpc::Status &) override
Called by the library when all operations associated with this RPC have completed and all Holds have ...
Definition: client_callback.h:442
grpc::internal::ClientCallbackReaderFactory::Create
static void Create(grpc::ChannelInterface *channel, const grpc::internal::RpcMethod &method, grpc::ClientContext *context, const Request *request, ClientReadReactor< Response > *reactor)
Definition: client_callback.h:868
grpc::ClientWriteReactor::OnWritesDoneDone
virtual void OnWritesDoneDone(bool)
Definition: client_callback.h:419