GRPC C++  1.46.2
client_callback.h
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2019 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 
18 #ifndef GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_H
19 #define GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_H
20 
21 // IWYU pragma: private, include <grpcpp/support/client_callback.h>
22 
23 #include <atomic>
24 #include <functional>
25 
34 
35 namespace grpc {
36 class Channel;
37 class ClientContext;
38 
39 namespace internal {
40 class RpcMethod;
41 
48 template <class InputMessage, class OutputMessage,
49  class BaseInputMessage = InputMessage,
50  class BaseOutputMessage = OutputMessage>
52  const grpc::internal::RpcMethod& method,
53  grpc::ClientContext* context,
54  const InputMessage* request, OutputMessage* result,
55  std::function<void(grpc::Status)> on_completion) {
56  static_assert(std::is_base_of<BaseInputMessage, InputMessage>::value,
57  "Invalid input message specification");
58  static_assert(std::is_base_of<BaseOutputMessage, OutputMessage>::value,
59  "Invalid output message specification");
61  channel, method, context, request, result, on_completion);
62 }
63 
64 template <class InputMessage, class OutputMessage>
65 class CallbackUnaryCallImpl {
66  public:
68  const grpc::internal::RpcMethod& method,
69  grpc::ClientContext* context,
70  const InputMessage* request, OutputMessage* result,
71  std::function<void(grpc::Status)> on_completion) {
72  grpc::CompletionQueue* cq = channel->CallbackCQ();
73  GPR_CODEGEN_ASSERT(cq != nullptr);
74  grpc::internal::Call call(channel->CreateCall(method, context, cq));
75 
76  using FullCallOpSet = grpc::internal::CallOpSet<
83 
84  struct OpSetAndTag {
85  FullCallOpSet opset;
87  };
88  const size_t alloc_sz = sizeof(OpSetAndTag);
89  auto* const alloced = static_cast<OpSetAndTag*>(
91  alloc_sz));
92  auto* ops = new (&alloced->opset) FullCallOpSet;
93  auto* tag = new (&alloced->tag)
94  grpc::internal::CallbackWithStatusTag(call.call(), on_completion, ops);
95 
96  // TODO(vjpai): Unify code with sync API as much as possible
97  grpc::Status s = ops->SendMessagePtr(request);
98  if (!s.ok()) {
99  tag->force_run(s);
100  return;
101  }
102  ops->SendInitialMetadata(&context->send_initial_metadata_,
103  context->initial_metadata_flags());
104  ops->RecvInitialMetadata(context);
105  ops->RecvMessage(result);
106  ops->AllowNoMessage();
107  ops->ClientSendClose();
108  ops->ClientRecvStatus(context, tag->status_ptr());
109  ops->set_core_cq_tag(tag);
110  call.PerformOps(ops);
111  }
112 };
113 
114 // Base class for public API classes.
116  public:
117  virtual ~ClientReactor() = default;
118 
126  virtual void OnDone(const grpc::Status& /*s*/) = 0;
127 
135  virtual void InternalScheduleOnDone(grpc::Status s);
136 
144  virtual bool InternalTrailersOnly(const grpc_call* call) const;
145 };
146 
147 } // namespace internal
148 
149 // Forward declarations
150 template <class Request, class Response>
152 template <class Response>
154 template <class Request>
156 class ClientUnaryReactor;
157 
158 // NOTE: The streaming objects are not actually implemented in the public API.
159 // These interfaces are provided for mocking only. Typical applications
160 // will interact exclusively with the reactors that they define.
161 template <class Request, class Response>
163  public:
165  virtual void StartCall() = 0;
166  virtual void Write(const Request* req, grpc::WriteOptions options) = 0;
167  virtual void WritesDone() = 0;
168  virtual void Read(Response* resp) = 0;
169  virtual void AddHold(int holds) = 0;
170  virtual void RemoveHold() = 0;
171 
172  protected:
174  reactor->BindStream(this);
175  }
176 };
177 
178 template <class Response>
180  public:
182  virtual void StartCall() = 0;
183  virtual void Read(Response* resp) = 0;
184  virtual void AddHold(int holds) = 0;
185  virtual void RemoveHold() = 0;
186 
187  protected:
189  reactor->BindReader(this);
190  }
191 };
192 
193 template <class Request>
195  public:
197  virtual void StartCall() = 0;
198  void Write(const Request* req) { Write(req, grpc::WriteOptions()); }
199  virtual void Write(const Request* req, grpc::WriteOptions options) = 0;
200  void WriteLast(const Request* req, grpc::WriteOptions options) {
201  Write(req, options.set_last_message());
202  }
203  virtual void WritesDone() = 0;
204 
205  virtual void AddHold(int holds) = 0;
206  virtual void RemoveHold() = 0;
207 
208  protected:
210  reactor->BindWriter(this);
211  }
212 };
213 
215  public:
216  virtual ~ClientCallbackUnary() {}
217  virtual void StartCall() = 0;
218 
219  protected:
220  void BindReactor(ClientUnaryReactor* reactor);
221 };
222 
223 // The following classes are the reactor interfaces that are to be implemented
224 // by the user. They are passed in to the library as an argument to a call on a
225 // stub (either a codegen-ed call or a generic call). The streaming RPC is
226 // activated by calling StartCall, possibly after initiating StartRead,
227 // StartWrite, or AddHold operations on the streaming object. Note that none of
228 // the classes are pure; all reactions have a default empty reaction so that the
229 // user class only needs to override those reactions that it cares about.
230 // The reactor must be passed to the stub invocation before any of the below
231 // operations can be called and its reactions will be invoked by the library in
232 // response to the completion of various operations. Reactions must not include
233 // blocking operations (such as blocking I/O, starting synchronous RPCs, or
234 // waiting on condition variables). Reactions may be invoked concurrently,
235 // except that OnDone is called after all others (assuming proper API usage).
236 // The reactor may not be deleted until OnDone is called.
237 
239 template <class Request, class Response>
    // User-implemented reactor for a bidirectional-streaming call. Every
    // Start*/hold method below simply forwards to the bound stream_ object, and
    // every On* reaction has an empty default body so that a subclass only
    // overrides the reactions it cares about.
240 class ClientBidiReactor : public internal::ClientReactor {
241  public:
     // Forwards to stream_->StartCall().
246  void StartCall() { stream_->StartCall(); }
247 
     // Forwards to stream_->Read(resp).
253  void StartRead(Response* resp) { stream_->Read(resp); }
254 
     // Initiates a write operation (or posts it for later initiation if
     // StartCall has not yet been invoked), using default-constructed
     // WriteOptions.
261  void StartWrite(const Request* req) { StartWrite(req, grpc::WriteOptions()); }
262 
     // Same as above with caller-supplied options; forwards to stream_->Write.
269  void StartWrite(const Request* req, grpc::WriteOptions options) {
270  stream_->Write(req, options);
271  }
272 
     // Marks `options` as the last message and then issues the write through
     // StartWrite.
282  void StartWriteLast(const Request* req, grpc::WriteOptions options) {
283  StartWrite(req, options.set_last_message());
284  }
285 
     // Forwards to stream_->WritesDone().
291  void StartWritesDone() { stream_->WritesDone(); }
292 
     // AddHold() is shorthand for AddMultipleHolds(1). AddMultipleHolds
     // debug-asserts that `holds` is positive before forwarding the count to
     // the stream; RemoveHold forwards to stream_->RemoveHold().
315  void AddHold() { AddMultipleHolds(1); }
316  void AddMultipleHolds(int holds) {
317  GPR_CODEGEN_DEBUG_ASSERT(holds > 0);
318  stream_->AddHold(holds);
319  }
320  void RemoveHold() { stream_->RemoveHold(); }
321 
     // Default (empty) implementation of the pure-virtual
     // internal::ClientReactor::OnDone.
329  void OnDone(const grpc::Status& /*s*/) override {}
330 
     // Default (empty) reactions; each receives a success flag that the
     // default body ignores.
339  virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
340 
345  virtual void OnReadDone(bool /*ok*/) {}
346 
352  virtual void OnWriteDone(bool /*ok*/) {}
353 
361  virtual void OnWritesDoneDone(bool /*ok*/) {}
362 
363  private:
     // ClientCallbackReaderWriter is a friend so that it can call BindStream
     // with itself as the argument.
364  friend class ClientCallbackReaderWriter<Request, Response>;
     // Records the stream object that all forwarding methods above use.
365  void BindStream(ClientCallbackReaderWriter<Request, Response>* stream) {
366  stream_ = stream;
367  }
     // NOTE(review): the declaration of stream_ (original line 368) is not
     // visible in this listing.
369 };
370 
373 template <class Response>
    // User-implemented reactor for a server-streaming (client-read) call. The
    // Start*/hold methods forward to the bound reader_ object; the On*
    // reactions are empty by default.
374 class ClientReadReactor : public internal::ClientReactor {
375  public:
     // Forward to reader_->StartCall() and reader_->Read(resp) respectively.
376  void StartCall() { reader_->StartCall(); }
377  void StartRead(Response* resp) { reader_->Read(resp); }
378 
     // AddHold() is shorthand for AddMultipleHolds(1); AddMultipleHolds
     // debug-asserts holds > 0 before forwarding the count to the reader.
379  void AddHold() { AddMultipleHolds(1); }
380  void AddMultipleHolds(int holds) {
381  GPR_CODEGEN_DEBUG_ASSERT(holds > 0);
382  reader_->AddHold(holds);
383  }
384  void RemoveHold() { reader_->RemoveHold(); }
385 
     // Empty default reactions. OnDone overrides the pure-virtual
     // internal::ClientReactor::OnDone.
386  void OnDone(const grpc::Status& /*s*/) override {}
387  virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
388  virtual void OnReadDone(bool /*ok*/) {}
389 
390  private:
     // ClientCallbackReader is a friend so that it can call BindReader.
391  friend class ClientCallbackReader<Response>;
     // Records the reader object used by the forwarding methods above.
392  void BindReader(ClientCallbackReader<Response>* reader) { reader_ = reader; }
     // NOTE(review): the declaration of reader_ (original line 393) is not
     // visible in this listing.
394 };
395 
398 template <class Request>
    // User-implemented reactor for a client-streaming (client-write) call. The
    // Start*/hold methods forward to the bound writer_ object; the On*
    // reactions are empty by default.
399 class ClientWriteReactor : public internal::ClientReactor {
400  public:
     // Forwards to writer_->StartCall().
401  void StartCall() { writer_->StartCall(); }
     // Write with default-constructed WriteOptions.
402  void StartWrite(const Request* req) { StartWrite(req, grpc::WriteOptions()); }
     // Write with caller-supplied options; forwards to writer_->Write.
403  void StartWrite(const Request* req, grpc::WriteOptions options) {
404  writer_->Write(req, options);
405  }
     // Marks `options` as the last message, then issues the write through
     // StartWrite.
406  void StartWriteLast(const Request* req, grpc::WriteOptions options) {
407  StartWrite(req, options.set_last_message());
408  }
     // Forwards to writer_->WritesDone().
409  void StartWritesDone() { writer_->WritesDone(); }
410 
     // AddHold() is shorthand for AddMultipleHolds(1); AddMultipleHolds
     // debug-asserts holds > 0 before forwarding the count to the writer.
411  void AddHold() { AddMultipleHolds(1); }
412  void AddMultipleHolds(int holds) {
413  GPR_CODEGEN_DEBUG_ASSERT(holds > 0);
414  writer_->AddHold(holds);
415  }
416  void RemoveHold() { writer_->RemoveHold(); }
417 
     // Empty default reactions. OnDone overrides the pure-virtual
     // internal::ClientReactor::OnDone.
418  void OnDone(const grpc::Status& /*s*/) override {}
419  virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
420  virtual void OnWriteDone(bool /*ok*/) {}
421  virtual void OnWritesDoneDone(bool /*ok*/) {}
422 
423  private:
     // ClientCallbackWriter is a friend so that it can call BindWriter.
424  friend class ClientCallbackWriter<Request>;
     // Records the writer object used by the forwarding methods above.
425  void BindWriter(ClientCallbackWriter<Request>* writer) { writer_ = writer; }
426 
     // NOTE(review): the declaration of writer_ (original line 427) is not
     // visible in this listing.
428 };
429 
442  public:
443  void StartCall() { call_->StartCall(); }
444  void OnDone(const grpc::Status& /*s*/) override {}
445  virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
446 
447  private:
448  friend class ClientCallbackUnary;
449  void BindCall(ClientCallbackUnary* call) { call_ = call; }
450  ClientCallbackUnary* call_;
451 };
452 
453 // Define function out-of-line from class to avoid forward declaration issue
455  reactor->BindCall(this);
456 }
457 
458 namespace internal {
459 
460 // Forward declare factory classes for friendship
461 template <class Request, class Response>
462 class ClientCallbackReaderWriterFactory;
463 template <class Response>
464 class ClientCallbackReaderFactory;
465 template <class Request>
466 class ClientCallbackWriterFactory;
467 
468 template <class Request, class Response>
470  : public ClientCallbackReaderWriter<Request, Response> {
471  public:
472  // always allocated against a call arena, no memory free required
473  static void operator delete(void* /*ptr*/, std::size_t size) {
475  }
476 
477  // This operator should never be called as the memory should be freed as part
478  // of the arena destruction. It only exists to provide a matching operator
479  // delete to the operator new so that some compilers will not complain (see
480  // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
481  // there are no tests catching the compiler warning.
482  static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); }
483 
484  void StartCall() ABSL_LOCKS_EXCLUDED(start_mu_) override {
     // Starts the RPC: issues the start batch, replays any read / write /
     // writes-done operations that were requested before the call was
     // started, issues the finish batch, and finally publishes started_.
485  // This call initiates two batches, plus any backlog, each with a callback
486  // 1. Send initial metadata (unless corked) + recv initial metadata
487  // 2. Any read backlog
488  // 3. Any write backlog
489  // 4. Recv trailing metadata (unless corked)
     // NOTE(review): finish_ops_ is issued unconditionally below, so the
     // "(unless corked)" qualifier on item 4 has no matching condition in this
     // function.
     // When corked, initial metadata is left out of the start batch; Write and
     // WritesDone attach it to their own batch via corked_write_needed_.
490  if (!start_corked_) {
491  start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
492  context_->initial_metadata_flags());
493  }
494 
495  call_.PerformOps(&start_ops_);
496 
497  {
     // Read/Write/WritesDone re-check started_ under this same mutex, so while
     // it is held here they either record themselves in backlog_ beforehand or
     // observe started_ == true afterwards.
498  grpc::internal::MutexLock lock(&start_mu_);
499 
500  if (backlog_.read_ops) {
501  call_.PerformOps(&read_ops_);
502  }
503  if (backlog_.write_ops) {
504  call_.PerformOps(&write_ops_);
505  }
506  if (backlog_.writes_done_ops) {
507  call_.PerformOps(&writes_done_ops_);
508  }
509  call_.PerformOps(&finish_ops_);
510  // The last thing in this critical section is to set started_ so that it
511  // can be used lock-free as well.
512  started_.store(true, std::memory_order_release);
513  }
514  // MaybeFinish outside the lock to make sure that destruction of this object
515  // doesn't take place while holding the lock (which would cause the lock to
516  // be released after destruction)
     // from_reaction=false: if this turns out to be the last outstanding
     // callback, MaybeFinish hands OnDone to InternalScheduleOnDone instead of
     // calling it inline.
517  this->MaybeFinish(/*from_reaction=*/false);
518  }
519 
520  void Read(Response* msg) override {
     // Requests one message into `msg`. If StartCall has not run yet the
     // request is only recorded in backlog_ and StartCall issues it later;
     // otherwise the read batch is issued immediately.
521  read_ops_.RecvMessage(msg);
     // Count the pending read callback up front; the read tag's completion
     // lambda balances it with a MaybeFinish call.
522  callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
     // Lock-free check first; only when the call looks unstarted is start_mu_
     // taken and started_ checked again before deferring to the backlog.
523  if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
524  grpc::internal::MutexLock lock(&start_mu_);
525  if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
526  backlog_.read_ops = true;
527  return;
528  }
529  }
530  call_.PerformOps(&read_ops_);
531  }
532 
533  void Write(const Request* msg, grpc::WriteOptions options)
534  ABSL_LOCKS_EXCLUDED(start_mu_) override {
     // Queues `msg` for sending with `options`. If StartCall has not run yet
     // the write is only recorded in backlog_; otherwise the write batch is
     // issued immediately.
     // A last-message write sets the buffer hint on the (by-value) options and
     // adds the client send-close op to the same batch.
535  if (options.is_last_message()) {
536  options.set_buffer_hint();
537  write_ops_.ClientSendClose();
538  }
539  // TODO(vjpai): don't assert
540  GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(msg, options).ok());
     // Count the pending write callback; the write tag's completion lambda
     // balances it with a MaybeFinish call.
541  callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
     // On a corked call StartCall skipped sending initial metadata, so attach
     // it to this batch and clear the flag so that it is attached only once.
542  if (GPR_UNLIKELY(corked_write_needed_)) {
543  write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
544  context_->initial_metadata_flags());
545  corked_write_needed_ = false;
546  }
547 
     // Lock-free check first; only when the call looks unstarted is start_mu_
     // taken and started_ checked again before deferring to the backlog.
548  if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
549  grpc::internal::MutexLock lock(&start_mu_);
550  if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
551  backlog_.write_ops = true;
552  return;
553  }
554  }
555  call_.PerformOps(&write_ops_);
556  }
557  void WritesDone() ABSL_LOCKS_EXCLUDED(start_mu_) override {
     // Prepares a batch containing the client send-close op. If StartCall has
     // not run yet the batch is only recorded in backlog_; otherwise it is
     // issued immediately.
558  writes_done_ops_.ClientSendClose();
     // Unlike the start/read/write tags (configured in the constructor), the
     // writes-done tag is configured here. Its completion runs the reactor's
     // OnWritesDoneDone and then MaybeFinish.
559  writes_done_tag_.Set(
560  call_.call(),
561  [this](bool ok) {
562  reactor_->OnWritesDoneDone(ok);
563  MaybeFinish(/*from_reaction=*/true);
564  },
565  &writes_done_ops_, /*can_inline=*/false);
566  writes_done_ops_.set_core_cq_tag(&writes_done_tag_);
     // Count the pending callback that the lambda above will balance.
567  callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
     // On a corked call with no earlier Write, initial metadata has not been
     // sent yet, so attach it to this batch.
568  if (GPR_UNLIKELY(corked_write_needed_)) {
569  writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
570  context_->initial_metadata_flags());
571  corked_write_needed_ = false;
572  }
     // Lock-free check first; only when the call looks unstarted is start_mu_
     // taken and started_ checked again before deferring to the backlog.
573  if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
574  grpc::internal::MutexLock lock(&start_mu_);
575  if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
576  backlog_.writes_done_ops = true;
577  return;
578  }
579  }
580  call_.PerformOps(&writes_done_ops_);
581  }
582 
583  void AddHold(int holds) override {
     // Adds `holds` to the outstanding-callback count that MaybeFinish
     // decrements.
584  callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
585  }
     // Releases one hold by running MaybeFinish as a user-initiated (not
     // reaction-initiated) call.
586  void RemoveHold() override { MaybeFinish(/*from_reaction=*/false); }
587 
588  private:
589  friend class ClientCallbackReaderWriterFactory<Request, Response>;
590 
592  grpc::ClientContext* context,
594  : context_(context),
595  call_(call),
596  reactor_(reactor),
597  start_corked_(context_->initial_metadata_corked_),
598  corked_write_needed_(start_corked_) {
599  this->BindReactor(reactor);
600 
601  // Set up the unchanging parts of the start, read, and write tags and ops.
602  start_tag_.Set(
603  call_.call(),
604  [this](bool ok) {
605  reactor_->OnReadInitialMetadataDone(
606  ok && !reactor_->InternalTrailersOnly(call_.call()));
607  MaybeFinish(/*from_reaction=*/true);
608  },
609  &start_ops_, /*can_inline=*/false);
610  start_ops_.RecvInitialMetadata(context_);
611  start_ops_.set_core_cq_tag(&start_tag_);
612 
613  write_tag_.Set(
614  call_.call(),
615  [this](bool ok) {
616  reactor_->OnWriteDone(ok);
617  MaybeFinish(/*from_reaction=*/true);
618  },
619  &write_ops_, /*can_inline=*/false);
620  write_ops_.set_core_cq_tag(&write_tag_);
621 
622  read_tag_.Set(
623  call_.call(),
624  [this](bool ok) {
625  reactor_->OnReadDone(ok);
626  MaybeFinish(/*from_reaction=*/true);
627  },
628  &read_ops_, /*can_inline=*/false);
629  read_ops_.set_core_cq_tag(&read_tag_);
630 
631  // Also set up the Finish tag and op set.
632  finish_tag_.Set(
633  call_.call(),
634  [this](bool /*ok*/) { MaybeFinish(/*from_reaction=*/true); },
635  &finish_ops_,
636  /*can_inline=*/false);
637  finish_ops_.ClientRecvStatus(context_, &finish_status_);
638  finish_ops_.set_core_cq_tag(&finish_tag_);
639  }
640 
641  // MaybeFinish can be called from reactions or from user-initiated operations
642  // like StartCall or RemoveHold. If this is the last operation or hold on this
643  // object, it will invoke the OnDone reaction. If MaybeFinish was called from
644  // a reaction, it can call OnDone directly. If not, it would need to schedule
645  // OnDone onto an executor thread to avoid the possibility of deadlocking with
646  // any locks in the user code that invoked it.
647  void MaybeFinish(bool from_reaction) {
648  if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
649  1, std::memory_order_acq_rel) == 1)) {
650  grpc::Status s = std::move(finish_status_);
651  auto* reactor = reactor_;
652  auto* call = call_.call();
653  this->~ClientCallbackReaderWriterImpl();
655  if (GPR_LIKELY(from_reaction)) {
656  reactor->OnDone(s);
657  } else {
658  reactor->InternalScheduleOnDone(std::move(s));
659  }
660  }
661  }
662 
663  grpc::ClientContext* const context_;
664  grpc::internal::Call call_;
665  ClientBidiReactor<Request, Response>* const reactor_;
666 
669  start_ops_;
671  const bool start_corked_;
672  bool corked_write_needed_; // no lock needed since only accessed in
673  // Write/WritesDone which cannot be concurrent
674 
677  grpc::Status finish_status_;
678 
682  write_ops_;
684 
687  writes_done_ops_;
689 
691  read_ops_;
693 
694  struct StartCallBacklog {
695  bool write_ops = false;
696  bool writes_done_ops = false;
697  bool read_ops = false;
698  };
699  StartCallBacklog backlog_ ABSL_GUARDED_BY(start_mu_);
700 
701  // Minimum of 3 callbacks to pre-register for start ops, StartCall, and finish
702  std::atomic<intptr_t> callbacks_outstanding_{3};
703  std::atomic_bool started_{false};
704  grpc::internal::Mutex start_mu_;
705 };
706 
707 template <class Request, class Response>
708 class ClientCallbackReaderWriterFactory {
709  public:
710  static void Create(grpc::ChannelInterface* channel,
711  const grpc::internal::RpcMethod& method,
712  grpc::ClientContext* context,
714  grpc::internal::Call call =
715  channel->CreateCall(method, context, channel->CallbackCQ());
716 
721  reactor);
722  }
723 };
724 
725 template <class Response>
727  public:
728  // always allocated against a call arena, no memory free required
729  static void operator delete(void* /*ptr*/, std::size_t size) {
731  }
732 
733  // This operator should never be called as the memory should be freed as part
734  // of the arena destruction. It only exists to provide a matching operator
735  // delete to the operator new so that some compilers will not complain (see
736  // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
737  // there are no tests catching the compiler warning.
738  static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); }
739 
740  void StartCall() override {
     // Starts the RPC: configures and issues the start batch, configures the
     // read tag, replays a read requested before the call was started, and
     // then configures and issues the finish batch.
741  // This call initiates two batches, plus any backlog, each with a callback
742  // 1. Send initial metadata (unless corked) + recv initial metadata
743  // 2. Any backlog
744  // 3. Recv trailing metadata
     // NOTE(review): initial metadata is added to start_ops_ unconditionally
     // below; no corked check is visible in this function.
745 
     // The start tag's completion reports ok only when the batch succeeded and
     // the reactor does not classify the call as trailers-only.
746  start_tag_.Set(
747  call_.call(),
748  [this](bool ok) {
749  reactor_->OnReadInitialMetadataDone(
750  ok && !reactor_->InternalTrailersOnly(call_.call()));
751  MaybeFinish(/*from_reaction=*/true);
752  },
753  &start_ops_, /*can_inline=*/false);
754  start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
755  context_->initial_metadata_flags());
756  start_ops_.RecvInitialMetadata(context_);
757  start_ops_.set_core_cq_tag(&start_tag_);
758  call_.PerformOps(&start_ops_);
759 
760  // Also set up the read tag so it doesn't have to be set up each time
761  read_tag_.Set(
762  call_.call(),
763  [this](bool ok) {
764  reactor_->OnReadDone(ok);
765  MaybeFinish(/*from_reaction=*/true);
766  },
767  &read_ops_, /*can_inline=*/false);
768  read_ops_.set_core_cq_tag(&read_tag_);
769 
770  {
     // Read() re-checks started_ under this same mutex, so it either recorded
     // itself in backlog_ before this point or will see started_ == true.
771  grpc::internal::MutexLock lock(&start_mu_);
772  if (backlog_.read_ops) {
773  call_.PerformOps(&read_ops_);
774  }
775  started_.store(true, std::memory_order_release);
776  }
777 
     // The finish batch receives the final status into finish_status_; its
     // completion only runs MaybeFinish.
778  finish_tag_.Set(
779  call_.call(),
780  [this](bool /*ok*/) { MaybeFinish(/*from_reaction=*/true); },
781  &finish_ops_, /*can_inline=*/false);
782  finish_ops_.ClientRecvStatus(context_, &finish_status_);
783  finish_ops_.set_core_cq_tag(&finish_tag_);
784  call_.PerformOps(&finish_ops_);
785  }
786 
787  void Read(Response* msg) override {
     // Requests one message into `msg`. If StartCall has not run yet the
     // request is only recorded in backlog_ and StartCall issues it later;
     // otherwise the read batch is issued immediately.
788  read_ops_.RecvMessage(msg);
     // Count the pending read callback; the read tag's completion lambda
     // balances it with a MaybeFinish call.
789  callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
     // Lock-free check first; only when the call looks unstarted is start_mu_
     // taken and started_ checked again before deferring to the backlog.
790  if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
791  grpc::internal::MutexLock lock(&start_mu_);
792  if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
793  backlog_.read_ops = true;
794  return;
795  }
796  }
797  call_.PerformOps(&read_ops_);
798  }
799 
800  void AddHold(int holds) override {
     // Adds `holds` to the outstanding-callback count that MaybeFinish
     // decrements.
801  callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
802  }
     // Releases one hold by running MaybeFinish as a user-initiated (not
     // reaction-initiated) call.
803  void RemoveHold() override { MaybeFinish(/*from_reaction=*/false); }
804 
805  private:
806  friend class ClientCallbackReaderFactory<Response>;
807 
808  template <class Request>
810  grpc::ClientContext* context, Request* request,
812  : context_(context), call_(call), reactor_(reactor) {
813  this->BindReactor(reactor);
814  // TODO(vjpai): don't assert
815  GPR_CODEGEN_ASSERT(start_ops_.SendMessagePtr(request).ok());
816  start_ops_.ClientSendClose();
817  }
818 
819  // MaybeFinish behaves as in ClientCallbackReaderWriterImpl.
820  void MaybeFinish(bool from_reaction) {
821  if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
822  1, std::memory_order_acq_rel) == 1)) {
823  grpc::Status s = std::move(finish_status_);
824  auto* reactor = reactor_;
825  auto* call = call_.call();
826  this->~ClientCallbackReaderImpl();
828  if (GPR_LIKELY(from_reaction)) {
829  reactor->OnDone(s);
830  } else {
831  reactor->InternalScheduleOnDone(std::move(s));
832  }
833  }
834  }
835 
836  grpc::ClientContext* const context_;
837  grpc::internal::Call call_;
838  ClientReadReactor<Response>* const reactor_;
839 
844  start_ops_;
846 
849  grpc::Status finish_status_;
850 
852  read_ops_;
854 
855  struct StartCallBacklog {
856  bool read_ops = false;
857  };
858  StartCallBacklog backlog_ ABSL_GUARDED_BY(start_mu_);
859 
860  // Minimum of 2 callbacks to pre-register for start and finish
861  std::atomic<intptr_t> callbacks_outstanding_{2};
862  std::atomic_bool started_{false};
863  grpc::internal::Mutex start_mu_;
864 };
865 
866 template <class Response>
867 class ClientCallbackReaderFactory {
868  public:
869  template <class Request>
870  static void Create(grpc::ChannelInterface* channel,
871  const grpc::internal::RpcMethod& method,
872  grpc::ClientContext* context, const Request* request,
873  ClientReadReactor<Response>* reactor) {
874  grpc::internal::Call call =
875  channel->CreateCall(method, context, channel->CallbackCQ());
876 
879  call.call(), sizeof(ClientCallbackReaderImpl<Response>)))
880  ClientCallbackReaderImpl<Response>(call, context, request, reactor);
881  }
882 };
883 
884 template <class Request>
886  public:
887  // always allocated against a call arena, no memory free required
888  static void operator delete(void* /*ptr*/, std::size_t size) {
890  }
891 
892  // This operator should never be called as the memory should be freed as part
893  // of the arena destruction. It only exists to provide a matching operator
894  // delete to the operator new so that some compilers will not complain (see
895  // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
896  // there are no tests catching the compiler warning.
897  static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); }
898 
899  void StartCall() ABSL_LOCKS_EXCLUDED(start_mu_) override {
     // Starts the RPC: issues the start batch, replays any write /
     // writes-done operations requested before the call was started, issues
     // the finish batch, and finally publishes started_.
900  // This call initiates two batches, plus any backlog, each with a callback
901  // 1. Send initial metadata (unless corked) + recv initial metadata
902  // 2. Any backlog
903  // 3. Recv trailing metadata
904 
     // When corked, initial metadata is left out of the start batch; Write and
     // WritesDone attach it to their own batch via corked_write_needed_.
905  if (!start_corked_) {
906  start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
907  context_->initial_metadata_flags());
908  }
909  call_.PerformOps(&start_ops_);
910 
911  {
     // Write/WritesDone re-check started_ under this same mutex, so while it
     // is held here they either record themselves in backlog_ beforehand or
     // observe started_ == true afterwards.
912  grpc::internal::MutexLock lock(&start_mu_);
913 
914  if (backlog_.write_ops) {
915  call_.PerformOps(&write_ops_);
916  }
917  if (backlog_.writes_done_ops) {
918  call_.PerformOps(&writes_done_ops_);
919  }
920  call_.PerformOps(&finish_ops_);
921  // The last thing in this critical section is to set started_ so that it
922  // can be used lock-free as well.
923  started_.store(true, std::memory_order_release);
924  }
925  // MaybeFinish outside the lock to make sure that destruction of this object
926  // doesn't take place while holding the lock (which would cause the lock to
927  // be released after destruction)
     // from_reaction=false: if this turns out to be the last outstanding
     // callback, MaybeFinish hands OnDone to InternalScheduleOnDone instead of
     // calling it inline.
928  this->MaybeFinish(/*from_reaction=*/false);
929  }
930 
931  void Write(const Request* msg, grpc::WriteOptions options)
932  ABSL_LOCKS_EXCLUDED(start_mu_) override {
     // Queues `msg` for sending with `options`. If StartCall has not run yet
     // the write is only recorded in backlog_; otherwise the write batch is
     // issued immediately.
     // A last-message write sets the buffer hint on the (by-value) options and
     // adds the client send-close op to the same batch.
933  if (GPR_UNLIKELY(options.is_last_message())) {
934  options.set_buffer_hint();
935  write_ops_.ClientSendClose();
936  }
937  // TODO(vjpai): don't assert
938  GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(msg, options).ok());
     // Count the pending write callback; the write tag's completion lambda
     // balances it with a MaybeFinish call.
939  callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
940 
     // On a corked call StartCall skipped sending initial metadata, so attach
     // it to this batch and clear the flag so that it is attached only once.
941  if (GPR_UNLIKELY(corked_write_needed_)) {
942  write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
943  context_->initial_metadata_flags());
944  corked_write_needed_ = false;
945  }
946 
     // Lock-free check first; only when the call looks unstarted is start_mu_
     // taken and started_ checked again before deferring to the backlog.
947  if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
948  grpc::internal::MutexLock lock(&start_mu_);
949  if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
950  backlog_.write_ops = true;
951  return;
952  }
953  }
954  call_.PerformOps(&write_ops_);
955  }
956 
957  void WritesDone() ABSL_LOCKS_EXCLUDED(start_mu_) override {
     // Prepares a batch containing the client send-close op. If StartCall has
     // not run yet the batch is only recorded in backlog_; otherwise it is
     // issued immediately.
958  writes_done_ops_.ClientSendClose();
     // Unlike the start/write tags (configured in the constructor), the
     // writes-done tag is configured here. Its completion runs the reactor's
     // OnWritesDoneDone and then MaybeFinish.
959  writes_done_tag_.Set(
960  call_.call(),
961  [this](bool ok) {
962  reactor_->OnWritesDoneDone(ok);
963  MaybeFinish(/*from_reaction=*/true);
964  },
965  &writes_done_ops_, /*can_inline=*/false);
966  writes_done_ops_.set_core_cq_tag(&writes_done_tag_);
     // Count the pending callback that the lambda above will balance.
967  callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
968 
     // On a corked call with no earlier Write, initial metadata has not been
     // sent yet, so attach it to this batch.
969  if (GPR_UNLIKELY(corked_write_needed_)) {
970  writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
971  context_->initial_metadata_flags());
972  corked_write_needed_ = false;
973  }
974 
     // Lock-free check first; only when the call looks unstarted is start_mu_
     // taken and started_ checked again before deferring to the backlog.
975  if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
976  grpc::internal::MutexLock lock(&start_mu_);
977  if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
978  backlog_.writes_done_ops = true;
979  return;
980  }
981  }
982  call_.PerformOps(&writes_done_ops_);
983  }
984 
985  void AddHold(int holds) override {
     // Adds `holds` to the outstanding-callback count that MaybeFinish
     // decrements.
986  callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
987  }
     // Releases one hold by running MaybeFinish as a user-initiated (not
     // reaction-initiated) call.
988  void RemoveHold() override { MaybeFinish(/*from_reaction=*/false); }
989 
990  private:
991  friend class ClientCallbackWriterFactory<Request>;
992 
993  template <class Response>
995  grpc::ClientContext* context, Response* response,
997  : context_(context),
998  call_(call),
999  reactor_(reactor),
1000  start_corked_(context_->initial_metadata_corked_),
1001  corked_write_needed_(start_corked_) {
1002  this->BindReactor(reactor);
1003 
1004  // Set up the unchanging parts of the start and write tags and ops.
1005  start_tag_.Set(
1006  call_.call(),
1007  [this](bool ok) {
1008  reactor_->OnReadInitialMetadataDone(
1009  ok && !reactor_->InternalTrailersOnly(call_.call()));
1010  MaybeFinish(/*from_reaction=*/true);
1011  },
1012  &start_ops_, /*can_inline=*/false);
1013  start_ops_.RecvInitialMetadata(context_);
1014  start_ops_.set_core_cq_tag(&start_tag_);
1015 
1016  write_tag_.Set(
1017  call_.call(),
1018  [this](bool ok) {
1019  reactor_->OnWriteDone(ok);
1020  MaybeFinish(/*from_reaction=*/true);
1021  },
1022  &write_ops_, /*can_inline=*/false);
1023  write_ops_.set_core_cq_tag(&write_tag_);
1024 
1025  // Also set up the Finish tag and op set.
1026  finish_ops_.RecvMessage(response);
1027  finish_ops_.AllowNoMessage();
1028  finish_tag_.Set(
1029  call_.call(),
1030  [this](bool /*ok*/) { MaybeFinish(/*from_reaction=*/true); },
1031  &finish_ops_,
1032  /*can_inline=*/false);
1033  finish_ops_.ClientRecvStatus(context_, &finish_status_);
1034  finish_ops_.set_core_cq_tag(&finish_tag_);
1035  }
1036 
1037  // MaybeFinish behaves as in ClientCallbackReaderWriterImpl.
1038  void MaybeFinish(bool from_reaction) {
1039  if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
1040  1, std::memory_order_acq_rel) == 1)) {
1041  grpc::Status s = std::move(finish_status_);
1042  auto* reactor = reactor_;
1043  auto* call = call_.call();
1044  this->~ClientCallbackWriterImpl();
1046  if (GPR_LIKELY(from_reaction)) {
1047  reactor->OnDone(s);
1048  } else {
1049  reactor->InternalScheduleOnDone(std::move(s));
1050  }
1051  }
1052  }
1053 
1054  grpc::ClientContext* const context_;
1055  grpc::internal::Call call_;
1056  ClientWriteReactor<Request>* const reactor_;
1057 
1060  start_ops_;
1062  const bool start_corked_;
1063  bool corked_write_needed_; // no lock needed since only accessed in
1064  // Write/WritesDone which cannot be concurrent
1065 
1068  finish_ops_;
1070  grpc::Status finish_status_;
1071 
1075  write_ops_;
1077 
1080  writes_done_ops_;
1081  grpc::internal::CallbackWithSuccessTag writes_done_tag_;
1082 
1083  struct StartCallBacklog {
1084  bool write_ops = false;
1085  bool writes_done_ops = false;
1086  };
1087  StartCallBacklog backlog_ ABSL_GUARDED_BY(start_mu_);
1088 
1089  // Minimum of 3 callbacks to pre-register for start ops, StartCall, and finish
1090  std::atomic<intptr_t> callbacks_outstanding_{3};
1091  std::atomic_bool started_{false};
1092  grpc::internal::Mutex start_mu_;
1093 };
1094 
1095 template <class Request>
1096 class ClientCallbackWriterFactory {
1097  public:
1098  template <class Response>
1099  static void Create(grpc::ChannelInterface* channel,
1100  const grpc::internal::RpcMethod& method,
1101  grpc::ClientContext* context, Response* response,
1102  ClientWriteReactor<Request>* reactor) {
1103  grpc::internal::Call call =
1104  channel->CreateCall(method, context, channel->CallbackCQ());
1105 
1108  call.call(), sizeof(ClientCallbackWriterImpl<Request>)))
1109  ClientCallbackWriterImpl<Request>(call, context, response, reactor);
1110  }
1111 };
1112 
1114  public:
1115  // always allocated against a call arena, no memory free required
1116  static void operator delete(void* /*ptr*/, std::size_t size) {
1118  }
1119 
1120  // This operator should never be called as the memory should be freed as part
1121  // of the arena destruction. It only exists to provide a matching operator
1122  // delete to the operator new so that some compilers will not complain (see
1123  // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
1124  // there are no tests catching the compiler warning.
1125  static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); }
1126 
1127  void StartCall() override {
      // Starts the unary RPC by configuring and issuing the start batch and
      // then the finish batch. There is no backlog or started_ flag in this
      // class; both batches are always issued from here.
1128  // This call initiates two batches, each with a callback
1129  // 1. Send initial metadata + write + writes done + recv initial metadata
1130  // 2. Read message, recv trailing metadata
1131 
      // The start tag's completion reports ok only when the batch succeeded
      // and the reactor does not classify the call as trailers-only.
1132  start_tag_.Set(
1133  call_.call(),
1134  [this](bool ok) {
1135  reactor_->OnReadInitialMetadataDone(
1136  ok && !reactor_->InternalTrailersOnly(call_.call()));
1137  MaybeFinish();
1138  },
1139  &start_ops_, /*can_inline=*/false);
1140  start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
1141  context_->initial_metadata_flags());
1142  start_ops_.RecvInitialMetadata(context_);
1143  start_ops_.set_core_cq_tag(&start_tag_);
1144  call_.PerformOps(&start_ops_);
1145 
      // The finish batch receives the final status into finish_status_; its
      // completion only runs MaybeFinish.
1146  finish_tag_.Set(
1147  call_.call(), [this](bool /*ok*/) { MaybeFinish(); }, &finish_ops_,
1148  /*can_inline=*/false);
1149  finish_ops_.ClientRecvStatus(context_, &finish_status_);
1150  finish_ops_.set_core_cq_tag(&finish_tag_);
1151  call_.PerformOps(&finish_ops_);
1152  }
1153 
1154  private:
1156 
1157  template <class Request, class Response>
1159  grpc::ClientContext* context, Request* request,
1160  Response* response, ClientUnaryReactor* reactor)
1161  : context_(context), call_(call), reactor_(reactor) {
1162  this->BindReactor(reactor);
1163  // TODO(vjpai): don't assert
1164  GPR_CODEGEN_ASSERT(start_ops_.SendMessagePtr(request).ok());
1165  start_ops_.ClientSendClose();
1166  finish_ops_.RecvMessage(response);
1167  finish_ops_.AllowNoMessage();
1168  }
1169 
1170  // In the unary case, MaybeFinish is only ever invoked from a
1171  // library-initiated reaction, so it will just directly call OnDone if this is
1172  // the last reaction for this RPC.
1173  void MaybeFinish() {
1174  if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
1175  1, std::memory_order_acq_rel) == 1)) {
1176  grpc::Status s = std::move(finish_status_);
1177  auto* reactor = reactor_;
1178  auto* call = call_.call();
1179  this->~ClientCallbackUnaryImpl();
1181  reactor->OnDone(s);
1182  }
1183  }
1184 
1185  grpc::ClientContext* const context_;
1186  grpc::internal::Call call_;
1187  ClientUnaryReactor* const reactor_;
1188 
1193  start_ops_;
1195 
1198  finish_ops_;
1200  grpc::Status finish_status_;
1201 
1202  // This call will have 2 callbacks: start and finish
1203  std::atomic<intptr_t> callbacks_outstanding_{2};
1204 };
1205 
1207  public:
1208  template <class Request, class Response, class BaseRequest = Request,
1209  class BaseResponse = Response>
1210  static void Create(grpc::ChannelInterface* channel,
1211  const grpc::internal::RpcMethod& method,
1212  grpc::ClientContext* context, const Request* request,
1213  Response* response, ClientUnaryReactor* reactor) {
1214  grpc::internal::Call call =
1215  channel->CreateCall(method, context, channel->CallbackCQ());
1216 
1218 
1220  call.call(), sizeof(ClientCallbackUnaryImpl)))
1221  ClientCallbackUnaryImpl(call, context,
1222  static_cast<const BaseRequest*>(request),
1223  static_cast<BaseResponse*>(response), reactor);
1224  }
1225 };
1226 
1227 } // namespace internal
1228 } // namespace grpc
1229 #endif // GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_H
grpc::internal::ClientCallbackUnaryImpl
Definition: client_callback.h:1113
grpc::internal::CallbackWithSuccessTag
CallbackWithSuccessTag can be reused multiple times, and will be used in this fashion for streaming o...
Definition: callback_common.h:137
grpc::ClientReadReactor::OnReadInitialMetadataDone
virtual void OnReadInitialMetadataDone(bool)
Definition: client_callback.h:387
grpc::ClientWriteReactor::OnReadInitialMetadataDone
virtual void OnReadInitialMetadataDone(bool)
Definition: client_callback.h:419
grpc::ClientWriteReactor::StartWritesDone
void StartWritesDone()
Definition: client_callback.h:409
grpc::internal::CallOpRecvInitialMetadata
Definition: call_op_set.h:728
grpc::ClientCallbackWriter::~ClientCallbackWriter
virtual ~ClientCallbackWriter()
Definition: client_callback.h:196
grpc::internal::ClientCallbackReaderWriterImpl::Read
void Read(Response *msg) override
Definition: client_callback.h:520
grpc::internal::ClientCallbackWriterImpl::AddHold
void AddHold(int holds) override
Definition: client_callback.h:985
grpc::internal::ClientCallbackReaderWriterImpl::StartCall
void StartCall() ABSL_LOCKS_EXCLUDED(start_mu_) override
Definition: client_callback.h:484
grpc::internal::CallOpClientSendClose
Definition: call_op_set.h:626
grpc::ClientBidiReactor::StartWrite
void StartWrite(const Request *req)
Initiate a write operation (or post it for later initiation if StartCall has not yet been invoked).
Definition: client_callback.h:261
grpc::internal::CallOpGenericRecvMessage
Definition: call_op_set.h:533
grpc::ClientBidiReactor::AddHold
void AddHold()
Holds are needed if (and only if) this stream has operations that take place on it after StartCall bu...
Definition: client_callback.h:315
grpc::internal::ClientCallbackReaderWriterImpl
Definition: client_callback.h:469
grpc::ClientReadReactor::OnReadDone
virtual void OnReadDone(bool)
Definition: client_callback.h:388
grpc
An Alarm posts the user-provided tag to its associated completion queue or invokes the user-provided ...
Definition: alarm.h:33
grpc::internal::CallOpSet
Primary implementation of CallOpSetInterface.
Definition: call_op_set.h:859
grpc::internal::ClientCallbackReaderWriterImpl::Write
void Write(const Request *msg, grpc::WriteOptions options) ABSL_LOCKS_EXCLUDED(start_mu_) override
Definition: client_callback.h:533
status.h
grpc::internal::ClientCallbackReaderImpl::RemoveHold
void RemoveHold() override
Definition: client_callback.h:803
grpc::CoreCodegenInterface::grpc_call_ref
virtual void grpc_call_ref(grpc_call *call)=0
grpc::internal::CallOpSendMessage
Definition: call_op_set.h:289
grpc::internal::CallbackUnaryCallImpl::CallbackUnaryCallImpl
CallbackUnaryCallImpl(grpc::ChannelInterface *channel, const grpc::internal::RpcMethod &method, grpc::ClientContext *context, const InputMessage *request, OutputMessage *result, std::function< void(grpc::Status)> on_completion)
Definition: client_callback.h:67
grpc::WriteOptions::set_last_message
WriteOptions & set_last_message()
last-message bit: indicates this is the last message in a stream client-side: makes Write the equival...
Definition: call_op_set.h:158
grpc::ClientUnaryReactor
ClientUnaryReactor is a reactor-style interface for a unary RPC.
Definition: client_callback.h:441
grpc::internal::ClientReactor::InternalScheduleOnDone
virtual void InternalScheduleOnDone(grpc::Status s)
InternalScheduleOnDone is not part of the API and is not meant to be overridden.
grpc::ClientCallbackReaderWriter::Read
virtual void Read(Response *resp)=0
grpc::CoreCodegenInterface::grpc_call_arena_alloc
virtual void * grpc_call_arena_alloc(grpc_call *call, size_t length)=0
GPR_LIKELY
#define GPR_LIKELY(x)
Definition: port_platform.h:687
grpc::internal::ClientCallbackReaderWriterFactory::Create
static void Create(grpc::ChannelInterface *channel, const grpc::internal::RpcMethod &method, grpc::ClientContext *context, ClientBidiReactor< Request, Response > *reactor)
Definition: client_callback.h:710
config.h
grpc::ClientWriteReactor::StartWrite
void StartWrite(const Request *req, grpc::WriteOptions options)
Definition: client_callback.h:403
grpc::ClientWriteReactor::StartCall
void StartCall()
Definition: client_callback.h:401
grpc::ClientWriteReactor::OnDone
void OnDone(const grpc::Status &) override
Definition: client_callback.h:418
grpc::internal::ClientReactor::OnDone
virtual void OnDone(const grpc::Status &)=0
Called by the library when all operations associated with this RPC have completed and all Holds have ...
grpc::internal::ClientCallbackReaderWriterImpl::RemoveHold
void RemoveHold() override
Definition: client_callback.h:586
grpc::ClientReadReactor::OnDone
void OnDone(const grpc::Status &) override
Definition: client_callback.h:386
grpc::internal::Call
Straightforward wrapping of the C call object.
Definition: call.h:37
grpc::internal::CallbackUnaryCall
void CallbackUnaryCall(grpc::ChannelInterface *channel, const grpc::internal::RpcMethod &method, grpc::ClientContext *context, const InputMessage *request, OutputMessage *result, std::function< void(grpc::Status)> on_completion)
Perform a callback-based unary call.
Definition: client_callback.h:51
grpc::ClientCallbackReaderWriter::WritesDone
virtual void WritesDone()=0
grpc::internal::ClientCallbackWriterImpl::RemoveHold
void RemoveHold() override
Definition: client_callback.h:988
core_codegen_interface.h
grpc::internal::CallOpSendInitialMetadata
Definition: call_op_set.h:219
grpc::internal::ClientCallbackWriterFactory::Create
static void Create(grpc::ChannelInterface *channel, const grpc::internal::RpcMethod &method, grpc::ClientContext *context, Response *response, ClientWriteReactor< Request > *reactor)
Definition: client_callback.h:1099
grpc::ClientBidiReactor::OnReadInitialMetadataDone
virtual void OnReadInitialMetadataDone(bool)
Notifies the application that a read of initial metadata from the server is done.
Definition: client_callback.h:339
grpc::internal::CallbackWithStatusTag
Definition: callback_common.h:71
grpc::ClientCallbackReaderWriter::BindReactor
void BindReactor(ClientBidiReactor< Request, Response > *reactor)
Definition: client_callback.h:173
grpc::Status::ok
bool ok() const
Is the status OK?
Definition: status.h:120
grpc::ClientCallbackUnary::BindReactor
void BindReactor(ClientUnaryReactor *reactor)
Definition: client_callback.h:454
grpc::ClientBidiReactor::StartRead
void StartRead(Response *resp)
Initiate a read operation (or post it for later initiation if StartCall has not yet been invoked).
Definition: client_callback.h:253
grpc::ClientReadReactor::AddMultipleHolds
void AddMultipleHolds(int holds)
Definition: client_callback.h:380
grpc::internal::ClientCallbackWriterImpl::StartCall
void StartCall() ABSL_LOCKS_EXCLUDED(start_mu_) override
Definition: client_callback.h:899
grpc::internal::ClientCallbackWriterImpl::Write
void Write(const Request *msg, grpc::WriteOptions options) ABSL_LOCKS_EXCLUDED(start_mu_) override
Definition: client_callback.h:931
grpc::ClientBidiReactor::StartWrite
void StartWrite(const Request *req, grpc::WriteOptions options)
Initiate/post a write operation with specified options.
Definition: client_callback.h:269
grpc::ClientBidiReactor::OnDone
void OnDone(const grpc::Status &) override
Notifies the application that all operations associated with this RPC have completed and all Holds ha...
Definition: client_callback.h:329
grpc::internal::ClientReactor::~ClientReactor
virtual ~ClientReactor()=default
grpc::ClientCallbackUnary::StartCall
virtual void StartCall()=0
grpc::ClientCallbackReaderWriter::AddHold
virtual void AddHold(int holds)=0
grpc::ClientBidiReactor::OnReadDone
virtual void OnReadDone(bool)
Notifies the application that a StartRead operation completed.
Definition: client_callback.h:345
grpc::Status
Did it work? If it didn't, why?
Definition: status.h:33
grpc::ClientCallbackWriter::WritesDone
virtual void WritesDone()=0
grpc::internal::ClientCallbackWriterFactory
Definition: channel_interface.h:51
grpc::ClientReadReactor::StartCall
void StartCall()
Definition: client_callback.h:376
grpc::ClientBidiReactor::OnWriteDone
virtual void OnWriteDone(bool)
Notifies the application that a StartWrite or StartWriteLast operation completed.
Definition: client_callback.h:352
grpc::ClientContext
A ClientContext allows the person implementing a service client to:
Definition: client_context.h:195
GPR_UNLIKELY
#define GPR_UNLIKELY(x)
Definition: port_platform.h:688
grpc::internal::ClientCallbackReaderImpl::StartCall
void StartCall() override
Definition: client_callback.h:740
grpc::internal::ClientCallbackReaderWriterFactory
Definition: channel_interface.h:47
grpc_call
struct grpc_call grpc_call
A Call represents an RPC.
Definition: grpc_types.h:69
grpc::internal::CallbackUnaryCallImpl
Definition: channel_interface.h:38
grpc::ClientUnaryReactor::OnReadInitialMetadataDone
virtual void OnReadInitialMetadataDone(bool)
Definition: client_callback.h:445
grpc::ClientCallbackWriter::BindReactor
void BindReactor(ClientWriteReactor< Request > *reactor)
Definition: client_callback.h:209
grpc::ClientCallbackReader::RemoveHold
virtual void RemoveHold()=0
grpc::ClientCallbackReaderWriter
Definition: client_callback.h:162
grpc::ClientBidiReactor::StartWritesDone
void StartWritesDone()
Indicate that the RPC will have no more write operations.
Definition: client_callback.h:291
grpc::ClientCallbackReader::Read
virtual void Read(Response *resp)=0
grpc::ClientReadReactor::StartRead
void StartRead(Response *resp)
Definition: client_callback.h:377
grpc::ChannelInterface
Codegen interface for grpc::Channel.
Definition: channel_interface.h:73
grpc::internal::ClientReactor
Definition: client_callback.h:115
grpc::internal::ClientCallbackUnaryFactory
Definition: client_callback.h:1206
grpc::internal::ClientCallbackReaderImpl::Read
void Read(Response *msg) override
Definition: client_callback.h:787
grpc::ClientWriteReactor::StartWriteLast
void StartWriteLast(const Request *req, grpc::WriteOptions options)
Definition: client_callback.h:406
sync.h
grpc::internal::ClientCallbackWriterImpl
Definition: client_callback.h:885
grpc::ClientCallbackReaderWriter::Write
virtual void Write(const Request *req, grpc::WriteOptions options)=0
grpc::ClientWriteReactor
ClientWriteReactor is the interface for a client-streaming RPC.
Definition: client_callback.h:155
grpc::ClientCallbackUnary
Definition: client_callback.h:214
grpc::ClientBidiReactor::AddMultipleHolds
void AddMultipleHolds(int holds)
Definition: client_callback.h:316
grpc::CoreCodegenInterface::grpc_call_unref
virtual void grpc_call_unref(grpc_call *call)=0
grpc::ClientReadReactor::RemoveHold
void RemoveHold()
Definition: client_callback.h:384
grpc::internal::ClientCallbackUnaryFactory::Create
static void Create(grpc::ChannelInterface *channel, const grpc::internal::RpcMethod &method, grpc::ClientContext *context, const Request *request, Response *response, ClientUnaryReactor *reactor)
Definition: client_callback.h:1210
grpc::ClientBidiReactor::StartWriteLast
void StartWriteLast(const Request *req, grpc::WriteOptions options)
Initiate/post a write operation with specified options and an indication that this is the last write ...
Definition: client_callback.h:282
grpc::WriteOptions
Per-message write options.
Definition: call_op_set.h:81
grpc::ClientCallbackWriter::RemoveHold
virtual void RemoveHold()=0
grpc::ClientBidiReactor::OnWritesDoneDone
virtual void OnWritesDoneDone(bool)
Notifies the application that a StartWritesDone operation completed.
Definition: client_callback.h:361
grpc::internal::ClientCallbackWriterImpl::WritesDone
void WritesDone() ABSL_LOCKS_EXCLUDED(start_mu_) override
Definition: client_callback.h:957
callback_common.h
grpc::ClientCallbackReader::~ClientCallbackReader
virtual ~ClientCallbackReader()
Definition: client_callback.h:181
grpc::internal::CallbackWithSuccessTag::Set
void Set(grpc_call *call, std::function< void(bool)> f, CompletionQueueTag *ops, bool can_inline)
Definition: callback_common.h:164
grpc::internal::ClientCallbackReaderImpl::AddHold
void AddHold(int holds) override
Definition: client_callback.h:800
grpc::ClientCallbackReader::BindReactor
void BindReactor(ClientReadReactor< Response > *reactor)
Definition: client_callback.h:188
grpc::internal::MutexLock
Definition: sync.h:86
grpc::ClientCallbackWriter
Definition: client_callback.h:194
grpc::ClientCallbackWriter::AddHold
virtual void AddHold(int holds)=0
grpc::ClientBidiReactor::RemoveHold
void RemoveHold()
Definition: client_callback.h:320
grpc::ClientCallbackReaderWriter::~ClientCallbackReaderWriter
virtual ~ClientCallbackReaderWriter()
Definition: client_callback.h:164
grpc::ClientCallbackReaderWriter::RemoveHold
virtual void RemoveHold()=0
channel_interface.h
grpc::ClientCallbackWriter::Write
void Write(const Request *req)
Definition: client_callback.h:198
grpc::ClientCallbackReader::StartCall
virtual void StartCall()=0
grpc::ClientWriteReactor::AddHold
void AddHold()
Definition: client_callback.h:411
grpc::internal::Call::PerformOps
void PerformOps(CallOpSetInterface *ops)
Definition: call.h:67
grpc::internal::Call::call
grpc_call * call() const
Definition: call.h:71
grpc::ClientUnaryReactor::StartCall
void StartCall()
Definition: client_callback.h:443
call.h
grpc::ClientCallbackReaderWriter::StartCall
virtual void StartCall()=0
call_op_set.h
grpc::internal::CallOpClientRecvStatus
Definition: call_op_set.h:776
grpc::CompletionQueue
A thin wrapper around grpc_completion_queue (see src/core/lib/surface/completion_queue....
Definition: completion_queue.h:104
grpc::ClientBidiReactor
ClientBidiReactor is the interface for a bidirectional streaming RPC.
Definition: client_callback.h:151
grpc::internal::ClientCallbackReaderFactory
Definition: channel_interface.h:49
grpc::ClientCallbackWriter::WriteLast
void WriteLast(const Request *req, grpc::WriteOptions options)
Definition: client_callback.h:200
grpc::internal::ClientCallbackReaderImpl
Definition: client_callback.h:726
grpc::internal::ClientReactor::InternalTrailersOnly
virtual bool InternalTrailersOnly(const grpc_call *call) const
InternalTrailersOnly is not part of the API and is not meant to be overridden.
grpc::ClientCallbackWriter::StartCall
virtual void StartCall()=0
grpc::internal::ClientCallbackUnaryImpl::StartCall
void StartCall() override
Definition: client_callback.h:1127
grpc::ClientWriteReactor::StartWrite
void StartWrite(const Request *req)
Definition: client_callback.h:402
grpc::ClientBidiReactor::StartCall
void StartCall()
Activate the RPC and initiate any reads or writes that have been Start'ed before this call.
Definition: client_callback.h:246
grpc::g_core_codegen_interface
CoreCodegenInterface * g_core_codegen_interface
Definition: completion_queue.h:98
GPR_CODEGEN_ASSERT
#define GPR_CODEGEN_ASSERT(x)
Codegen specific version of GPR_ASSERT.
Definition: core_codegen_interface.h:149
grpc::ClientWriteReactor::OnWriteDone
virtual void OnWriteDone(bool)
Definition: client_callback.h:420
grpc::internal::CallOpRecvMessage
Definition: byte_buffer.h:52
grpc::internal::Mutex
Definition: sync.h:59
grpc::internal::RpcMethod
Descriptor of an RPC method.
Definition: rpc_method.h:31
grpc::ClientCallbackReader::AddHold
virtual void AddHold(int holds)=0
grpc::ClientWriteReactor::AddMultipleHolds
void AddMultipleHolds(int holds)
Definition: client_callback.h:412
grpc::ClientCallbackReader
Definition: client_callback.h:179
grpc::ClientCallbackUnary::~ClientCallbackUnary
virtual ~ClientCallbackUnary()
Definition: client_callback.h:216
grpc::internal::CallOpSet::set_core_cq_tag
void set_core_cq_tag(void *core_cq_tag)
set_core_cq_tag is used to provide a different core CQ tag than "this".
Definition: call_op_set.h:951
grpc::internal::ClientCallbackReaderWriterImpl::AddHold
void AddHold(int holds) override
Definition: client_callback.h:583
grpc::ClientWriteReactor::RemoveHold
void RemoveHold()
Definition: client_callback.h:416
grpc::ClientReadReactor
ClientReadReactor is the interface for a server-streaming RPC.
Definition: client_callback.h:153
grpc::internal::ClientCallbackReaderWriterImpl::WritesDone
void WritesDone() ABSL_LOCKS_EXCLUDED(start_mu_) override
Definition: client_callback.h:557
grpc::ClientReadReactor::AddHold
void AddHold()
Definition: client_callback.h:379
grpc::ClientUnaryReactor::OnDone
void OnDone(const grpc::Status &) override
Called by the library when all operations associated with this RPC have completed and all Holds have ...
Definition: client_callback.h:444
grpc::internal::ClientCallbackReaderFactory::Create
static void Create(grpc::ChannelInterface *channel, const grpc::internal::RpcMethod &method, grpc::ClientContext *context, const Request *request, ClientReadReactor< Response > *reactor)
Definition: client_callback.h:870
GPR_CODEGEN_DEBUG_ASSERT
#define GPR_CODEGEN_DEBUG_ASSERT(x)
Codegen specific version of GPR_DEBUG_ASSERT.
Definition: core_codegen_interface.h:158
grpc::ClientWriteReactor::OnWritesDoneDone
virtual void OnWritesDoneDone(bool)
Definition: client_callback.h:421