GRPC C++  1.81.0
server_callback.h
Go to the documentation of this file.
1 //
2 //
3 // Copyright 2018 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #ifndef GRPCPP_SUPPORT_SERVER_CALLBACK_H
20 #define GRPCPP_SUPPORT_SERVER_CALLBACK_H
21 
22 #include <grpc/impl/call.h>
23 #include <grpcpp/impl/call.h>
25 #include <grpcpp/impl/sync.h>
27 #include <grpcpp/support/config.h>
29 #include <grpcpp/support/status.h>
30 
31 #include <atomic>
32 #include <functional>
33 #include <type_traits>
34 
35 #include "absl/functional/any_invocable.h"
36 
37 namespace grpc {
38 
39 class Server;
40 
41 // Declare base class of all reactors as internal
42 namespace experimental {
43 namespace internal {
44 void BindSessionToInnerServer(grpc_call* call, grpc::Server* inner_server);
45 } // namespace internal
46 } // namespace experimental
47 
48 namespace internal {
49 
50 // Forward declarations
51 template <class Request, class Response>
52 class CallbackUnaryHandler;
53 template <class Request, class Response>
54 class CallbackClientStreamingHandler;
55 template <class Request, class Response>
56 class CallbackServerStreamingHandler;
57 template <class Request, class Response>
58 class CallbackBidiHandler;
59 
61  public:
62  virtual ~ServerReactor() = default;
63  virtual void OnDone() = 0;
64  virtual void OnCancel() = 0;
65 
66  // The following is not API. It is for internal use only and specifies whether
67  // all reactions of this Reactor can be run without extra EventEngine
68  // scheduling. This should only be used for internally-defined reactors with
69  // trivial reactions.
70  virtual bool InternalInlineable() { return false; }
71 
72  private:
73  template <class Request, class Response>
74  friend class CallbackUnaryHandler;
75  template <class Request, class Response>
77  template <class Request, class Response>
79  template <class Request, class Response>
80  friend class CallbackBidiHandler;
81 };
82 
85  public:
86  virtual ~ServerCallbackCall() {}
87 
88  // This object is responsible for tracking when it is safe to call OnDone and
89  // OnCancel. OnDone should not be called until the method handler is complete,
90  // Finish has been called, the ServerContext CompletionOp (which tracks
91  // cancellation or successful completion) has completed, and all outstanding
92  // Read/Write actions have seen their reactions. OnCancel should not be called
93  // until after the method handler is done and the RPC has completed with a
94  // cancellation. This is tracked by counting how many of these conditions have
95  // been met and calling OnCancel when none remain unmet.
96 
97  // Public versions of MaybeDone: one where we don't know the reactor in
98  // advance (used for the ServerContext CompletionOp), and one for where we
99  // know the inlineability of the OnDone reaction. You should set the inline
100  // flag to true if either the Reactor is InternalInlineable() or if this
101  // callback is already being forced to run dispatched to an EventEngine thread
102  // (typically because it contains additional work than just the MaybeDone).
103 
104  void MaybeDone() {
105  if (GPR_UNLIKELY(Unref() == 1)) {
106  ScheduleOnDone(reactor()->InternalInlineable());
107  }
108  }
109 
110  void MaybeDone(bool inline_ondone) {
111  if (GPR_UNLIKELY(Unref() == 1)) {
112  ScheduleOnDone(inline_ondone);
113  }
114  }
115 
116  // Fast version called with known reactor passed in, used from derived
117  // classes, typically in non-cancel case
119  if (GPR_UNLIKELY(UnblockCancellation())) {
120  CallOnCancel(reactor);
121  }
122  }
123 
124  // Slower version called from object that doesn't know the reactor a priori
125  // (such as the ServerContext CompletionOp which is formed before the
126  // reactor). This is used in cancel cases only, so it's ok to be slower and
127  // invoke a virtual function.
129  if (GPR_UNLIKELY(UnblockCancellation())) {
130  CallOnCancel(reactor());
131  }
132  }
133 
134  protected:
136  void Ref() { callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed); }
137 
138  private:
139  virtual ServerReactor* reactor() = 0;
140 
141  virtual grpc_call* call() = 0;
142 
143  virtual void RunAsync(absl::AnyInvocable<void()> cb) {
144  grpc_call_run_in_event_engine(call(), std::move(cb));
145  }
146 
147  // CallOnDone performs the work required at completion of the RPC: invoking
148  // the OnDone function and doing all necessary cleanup. This function is only
149  // ever invoked on a fully-Unref'fed ServerCallbackCall.
150  virtual void CallOnDone() = 0;
151 
152  // If the OnDone reaction is inlineable, execute it inline. Otherwise run it
153  // async on EventEngine.
154  void ScheduleOnDone(bool inline_ondone);
155 
156  // If the OnCancel reaction is inlineable, execute it inline. Otherwise run it
157  // async on EventEngine.
158  void CallOnCancel(ServerReactor* reactor);
159 
160  // Implement the cancellation constraint counter. Return true if OnCancel
161  // should be called, false otherwise.
162  bool UnblockCancellation() {
163  return on_cancel_conditions_remaining_.fetch_sub(
164  1, std::memory_order_acq_rel) == 1;
165  }
166 
168  int Unref() {
169  return callbacks_outstanding_.fetch_sub(1, std::memory_order_acq_rel);
170  }
171 
172  std::atomic_int on_cancel_conditions_remaining_{2};
173  std::atomic_int callbacks_outstanding_{
174  3}; // reserve for start, Finish, and CompletionOp
175 };
176 
177 template <class Request, class Response>
178 class DefaultMessageHolder : public MessageHolder<Request, Response> {
179  public:
181  this->set_request(&request_obj_);
182  this->set_response(&response_obj_);
183  }
184  void Release() override {
185  // the object is allocated in the call arena.
187  }
188 
189  private:
190  Request request_obj_;
191  Response response_obj_;
192 };
193 
194 } // namespace internal
195 
196 // Forward declarations
197 class ServerUnaryReactor;
198 template <class Request>
200 template <class Response>
202 template <class Request, class Response>
204 
205 namespace experimental {
206 class ServerSessionReactor;
207 class ServerCallbackSession;
208 } // namespace experimental
209 
210 // NOTE: The actual call/stream object classes are provided as API only to
211 // support mocking. There are no implementations of these class interfaces in
212 // the API.
214  public:
215  ~ServerCallbackUnary() override {}
216  virtual void Finish(grpc::Status s) = 0;
217  virtual void SendInitialMetadata() = 0;
218 
219  protected:
220  // Use a template rather than explicitly specifying ServerUnaryReactor to
221  // delay binding and avoid a circular forward declaration issue
222  template <class Reactor>
223  void BindReactor(Reactor* reactor) {
224  reactor->InternalBindCall(this);
225  }
226 };
227 
228 namespace experimental {
230  public:
232 
233  virtual void Finish(grpc::Status s) = 0;
234  virtual void SendInitialMetadata() = 0;
235  virtual void BindInnerServer(grpc::Server* inner_server) = 0;
236 
237  protected:
238  template <class Reactor>
239  void BindReactor(Reactor* reactor) {
240  reactor->InternalBindSession(this);
241  }
242 };
243 } // namespace experimental
244 
245 template <class Request>
247  public:
248  ~ServerCallbackReader() override {}
249  virtual void Finish(grpc::Status s) = 0;
250  virtual void SendInitialMetadata() = 0;
251  virtual void Read(Request* msg) = 0;
252 
253  protected:
255  reactor->InternalBindReader(this);
256  }
257 };
258 
259 template <class Response>
261  public:
262  ~ServerCallbackWriter() override {}
263 
264  virtual void Finish(grpc::Status s) = 0;
265  virtual void SendInitialMetadata() = 0;
266  virtual void Write(const Response* msg, grpc::WriteOptions options) = 0;
267  virtual void WriteAndFinish(const Response* msg, grpc::WriteOptions options,
268  grpc::Status s) = 0;
269 
270  protected:
272  reactor->InternalBindWriter(this);
273  }
274 };
275 
276 template <class Request, class Response>
278  public:
280 
281  virtual void Finish(grpc::Status s) = 0;
282  virtual void SendInitialMetadata() = 0;
283  virtual void Read(Request* msg) = 0;
284  virtual void Write(const Response* msg, grpc::WriteOptions options) = 0;
285  virtual void WriteAndFinish(const Response* msg, grpc::WriteOptions options,
286  grpc::Status s) = 0;
287 
288  protected:
290  reactor->InternalBindStream(this);
291  }
292 };
293 
294 // The following classes are the reactor interfaces that are to be implemented
295 // by the user, returned as the output parameter of the method handler for a
296 // callback method. Note that none of the classes are pure; all reactions have a
297 // default empty reaction so that the user class only needs to override those
298 // reactions that it cares about. The reaction methods will be invoked by the
299 // library in response to the completion of various operations. Reactions must
300 // not include blocking operations (such as blocking I/O, starting synchronous
301 // RPCs, or waiting on condition variables). Reactions may be invoked
302 // concurrently, except that OnDone is called after all others (assuming proper
303 // API usage). The reactor may not be deleted until OnDone is called.
304 
306 template <class Request, class Response>
307 class ServerBidiReactor : public internal::ServerReactor {
308  public:
309  // NOTE: Initializing stream_ as a constructor initializer rather than a
310  // default initializer because gcc-4.x requires a copy constructor for
311  // default initializing a templated member, which isn't ok for atomic.
312  // TODO(vjpai): Switch to default constructor and default initializer when
313  // gcc-4.x is no longer supported
314  ServerBidiReactor() : stream_(nullptr) {}
315  ~ServerBidiReactor() override = default;
316 
320  void StartSendInitialMetadata() ABSL_LOCKS_EXCLUDED(stream_mu_) {
322  stream_.load(std::memory_order_acquire);
323  if (stream == nullptr) {
324  grpc::internal::MutexLock l(&stream_mu_);
325  stream = stream_.load(std::memory_order_relaxed);
326  if (stream == nullptr) {
327  backlog_.send_initial_metadata_wanted = true;
328  return;
329  }
330  }
331  stream->SendInitialMetadata();
332  }
333 
338  void StartRead(Request* req) ABSL_LOCKS_EXCLUDED(stream_mu_) {
340  stream_.load(std::memory_order_acquire);
341  if (stream == nullptr) {
342  grpc::internal::MutexLock l(&stream_mu_);
343  stream = stream_.load(std::memory_order_relaxed);
344  if (stream == nullptr) {
345  backlog_.read_wanted = req;
346  return;
347  }
348  }
349  stream->Read(req);
350  }
351 
357  void StartWrite(const Response* resp) {
359  }
360 
367  void StartWrite(const Response* resp, grpc::WriteOptions options)
368  ABSL_LOCKS_EXCLUDED(stream_mu_) {
370  stream_.load(std::memory_order_acquire);
371  if (stream == nullptr) {
372  grpc::internal::MutexLock l(&stream_mu_);
373  stream = stream_.load(std::memory_order_relaxed);
374  if (stream == nullptr) {
375  backlog_.write_wanted = resp;
376  backlog_.write_options_wanted = options;
377  return;
378  }
379  }
380  stream->Write(resp, options);
381  }
382 
396  void StartWriteAndFinish(const Response* resp, grpc::WriteOptions options,
397  grpc::Status s) ABSL_LOCKS_EXCLUDED(stream_mu_) {
399  stream_.load(std::memory_order_acquire);
400  if (stream == nullptr) {
401  grpc::internal::MutexLock l(&stream_mu_);
402  stream = stream_.load(std::memory_order_relaxed);
403  if (stream == nullptr) {
404  backlog_.write_and_finish_wanted = true;
405  backlog_.write_wanted = resp;
406  backlog_.write_options_wanted = options;
407  backlog_.status_wanted = std::move(s);
408  return;
409  }
410  }
411  stream->WriteAndFinish(resp, options, std::move(s));
412  }
413 
422  void StartWriteLast(const Response* resp, grpc::WriteOptions options) {
423  StartWrite(resp, options.set_last_message());
424  }
425 
432  void Finish(grpc::Status s) ABSL_LOCKS_EXCLUDED(stream_mu_) {
434  stream_.load(std::memory_order_acquire);
435  if (stream == nullptr) {
436  grpc::internal::MutexLock l(&stream_mu_);
437  stream = stream_.load(std::memory_order_relaxed);
438  if (stream == nullptr) {
439  backlog_.finish_wanted = true;
440  backlog_.status_wanted = std::move(s);
441  return;
442  }
443  }
444  stream->Finish(std::move(s));
445  }
446 
453  virtual void OnSendInitialMetadataDone(bool /*ok*/) {}
454 
459  virtual void OnReadDone(bool /*ok*/) {}
460 
466  virtual void OnWriteDone(bool /*ok*/) {}
467 
471  void OnDone() override = 0;
472 
476  void OnCancel() override {}
477 
478  private:
479  friend class ServerCallbackReaderWriter<Request, Response>;
480  // May be overridden by internal implementation details. This is not a public
481  // customization point.
482  virtual void InternalBindStream(
484  grpc::internal::MutexLock l(&stream_mu_);
485 
486  if (GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) {
487  stream->SendInitialMetadata();
488  }
489  if (GPR_UNLIKELY(backlog_.read_wanted != nullptr)) {
490  stream->Read(backlog_.read_wanted);
491  }
492  if (GPR_UNLIKELY(backlog_.write_and_finish_wanted)) {
493  stream->WriteAndFinish(backlog_.write_wanted,
494  std::move(backlog_.write_options_wanted),
495  std::move(backlog_.status_wanted));
496  } else {
497  if (GPR_UNLIKELY(backlog_.write_wanted != nullptr)) {
498  stream->Write(backlog_.write_wanted,
499  std::move(backlog_.write_options_wanted));
500  }
501  if (GPR_UNLIKELY(backlog_.finish_wanted)) {
502  stream->Finish(std::move(backlog_.status_wanted));
503  }
504  }
505  // Set stream_ last so that other functions can use it lock-free
506  stream_.store(stream, std::memory_order_release);
507  }
508 
509  grpc::internal::Mutex stream_mu_;
510  // TODO(vjpai): Make stream_or_backlog_ into an std::variant once C++17 or
511  // ABSL is supported since stream and backlog are mutually exclusive in this
512  // class. Do likewise with the remaining reactor classes and their backlogs
513  // as well.
514  std::atomic<ServerCallbackReaderWriter<Request, Response>*> stream_{nullptr};
515  struct PreBindBacklog {
516  bool send_initial_metadata_wanted = false;
517  bool write_and_finish_wanted = false;
518  bool finish_wanted = false;
519  Request* read_wanted = nullptr;
520  const Response* write_wanted = nullptr;
521  grpc::WriteOptions write_options_wanted;
522  grpc::Status status_wanted;
523  };
524  PreBindBacklog backlog_ ABSL_GUARDED_BY(stream_mu_);
525 };
526 
528 template <class Request>
529 class ServerReadReactor : public internal::ServerReactor {
530  public:
531  ServerReadReactor() : reader_(nullptr) {}
532  ~ServerReadReactor() override = default;
533 
535  void StartSendInitialMetadata() ABSL_LOCKS_EXCLUDED(reader_mu_) {
537  reader_.load(std::memory_order_acquire);
538  if (reader == nullptr) {
539  grpc::internal::MutexLock l(&reader_mu_);
540  reader = reader_.load(std::memory_order_relaxed);
541  if (reader == nullptr) {
542  backlog_.send_initial_metadata_wanted = true;
543  return;
544  }
545  }
546  reader->SendInitialMetadata();
547  }
548  void StartRead(Request* req) ABSL_LOCKS_EXCLUDED(reader_mu_) {
550  reader_.load(std::memory_order_acquire);
551  if (reader == nullptr) {
552  grpc::internal::MutexLock l(&reader_mu_);
553  reader = reader_.load(std::memory_order_relaxed);
554  if (reader == nullptr) {
555  backlog_.read_wanted = req;
556  return;
557  }
558  }
559  reader->Read(req);
560  }
561  void Finish(grpc::Status s) ABSL_LOCKS_EXCLUDED(reader_mu_) {
563  reader_.load(std::memory_order_acquire);
564  if (reader == nullptr) {
565  grpc::internal::MutexLock l(&reader_mu_);
566  reader = reader_.load(std::memory_order_relaxed);
567  if (reader == nullptr) {
568  backlog_.finish_wanted = true;
569  backlog_.status_wanted = std::move(s);
570  return;
571  }
572  }
573  reader->Finish(std::move(s));
574  }
575 
577  virtual void OnSendInitialMetadataDone(bool /*ok*/) {}
578  virtual void OnReadDone(bool /*ok*/) {}
579  void OnDone() override = 0;
580  void OnCancel() override {}
581 
582  private:
583  friend class ServerCallbackReader<Request>;
584 
585  // May be overridden by internal implementation details. This is not a public
586  // customization point.
587  virtual void InternalBindReader(ServerCallbackReader<Request>* reader)
588  ABSL_LOCKS_EXCLUDED(reader_mu_) {
589  grpc::internal::MutexLock l(&reader_mu_);
590 
591  if (GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) {
592  reader->SendInitialMetadata();
593  }
594  if (GPR_UNLIKELY(backlog_.read_wanted != nullptr)) {
595  reader->Read(backlog_.read_wanted);
596  }
597  if (GPR_UNLIKELY(backlog_.finish_wanted)) {
598  reader->Finish(std::move(backlog_.status_wanted));
599  }
600  // Set reader_ last so that other functions can use it lock-free
601  reader_.store(reader, std::memory_order_release);
602  }
603 
604  grpc::internal::Mutex reader_mu_;
605  std::atomic<ServerCallbackReader<Request>*> reader_{nullptr};
606  struct PreBindBacklog {
607  bool send_initial_metadata_wanted = false;
608  bool finish_wanted = false;
609  Request* read_wanted = nullptr;
610  grpc::Status status_wanted;
611  };
612  PreBindBacklog backlog_ ABSL_GUARDED_BY(reader_mu_);
613 };
614 
616 template <class Response>
617 class ServerWriteReactor : public internal::ServerReactor {
618  public:
619  ServerWriteReactor() : writer_(nullptr) {}
620  ~ServerWriteReactor() override = default;
621 
623  void StartSendInitialMetadata() ABSL_LOCKS_EXCLUDED(writer_mu_) {
625  writer_.load(std::memory_order_acquire);
626  if (writer == nullptr) {
627  grpc::internal::MutexLock l(&writer_mu_);
628  writer = writer_.load(std::memory_order_relaxed);
629  if (writer == nullptr) {
630  backlog_.send_initial_metadata_wanted = true;
631  return;
632  }
633  }
634  writer->SendInitialMetadata();
635  }
636  void StartWrite(const Response* resp) {
638  }
639  void StartWrite(const Response* resp, grpc::WriteOptions options)
640  ABSL_LOCKS_EXCLUDED(writer_mu_) {
642  writer_.load(std::memory_order_acquire);
643  if (writer == nullptr) {
644  grpc::internal::MutexLock l(&writer_mu_);
645  writer = writer_.load(std::memory_order_relaxed);
646  if (writer == nullptr) {
647  backlog_.write_wanted = resp;
648  backlog_.write_options_wanted = options;
649  return;
650  }
651  }
652  writer->Write(resp, options);
653  }
654  void StartWriteAndFinish(const Response* resp, grpc::WriteOptions options,
655  grpc::Status s) ABSL_LOCKS_EXCLUDED(writer_mu_) {
657  writer_.load(std::memory_order_acquire);
658  if (writer == nullptr) {
659  grpc::internal::MutexLock l(&writer_mu_);
660  writer = writer_.load(std::memory_order_relaxed);
661  if (writer == nullptr) {
662  backlog_.write_and_finish_wanted = true;
663  backlog_.write_wanted = resp;
664  backlog_.write_options_wanted = options;
665  backlog_.status_wanted = std::move(s);
666  return;
667  }
668  }
669  writer->WriteAndFinish(resp, options, std::move(s));
670  }
671  void StartWriteLast(const Response* resp, grpc::WriteOptions options) {
672  StartWrite(resp, options.set_last_message());
673  }
674  void Finish(grpc::Status s) ABSL_LOCKS_EXCLUDED(writer_mu_) {
676  writer_.load(std::memory_order_acquire);
677  if (writer == nullptr) {
678  grpc::internal::MutexLock l(&writer_mu_);
679  writer = writer_.load(std::memory_order_relaxed);
680  if (writer == nullptr) {
681  backlog_.finish_wanted = true;
682  backlog_.status_wanted = std::move(s);
683  return;
684  }
685  }
686  writer->Finish(std::move(s));
687  }
688 
690  virtual void OnSendInitialMetadataDone(bool /*ok*/) {}
691  virtual void OnWriteDone(bool /*ok*/) {}
692  void OnDone() override = 0;
693  void OnCancel() override {}
694 
695  private:
696  friend class ServerCallbackWriter<Response>;
697  // May be overridden by internal implementation details. This is not a public
698  // customization point.
699  virtual void InternalBindWriter(ServerCallbackWriter<Response>* writer)
700  ABSL_LOCKS_EXCLUDED(writer_mu_) {
701  grpc::internal::MutexLock l(&writer_mu_);
702 
703  if (GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) {
704  writer->SendInitialMetadata();
705  }
706  if (GPR_UNLIKELY(backlog_.write_and_finish_wanted)) {
707  writer->WriteAndFinish(backlog_.write_wanted,
708  std::move(backlog_.write_options_wanted),
709  std::move(backlog_.status_wanted));
710  } else {
711  if (GPR_UNLIKELY(backlog_.write_wanted != nullptr)) {
712  writer->Write(backlog_.write_wanted,
713  std::move(backlog_.write_options_wanted));
714  }
715  if (GPR_UNLIKELY(backlog_.finish_wanted)) {
716  writer->Finish(std::move(backlog_.status_wanted));
717  }
718  }
719  // Set writer_ last so that other functions can use it lock-free
720  writer_.store(writer, std::memory_order_release);
721  }
722 
723  grpc::internal::Mutex writer_mu_;
724  std::atomic<ServerCallbackWriter<Response>*> writer_{nullptr};
725  struct PreBindBacklog {
726  bool send_initial_metadata_wanted = false;
727  bool write_and_finish_wanted = false;
728  bool finish_wanted = false;
729  const Response* write_wanted = nullptr;
730  grpc::WriteOptions write_options_wanted;
731  grpc::Status status_wanted;
732  };
733  PreBindBacklog backlog_ ABSL_GUARDED_BY(writer_mu_);
734 };
735 
737  public:
738  ServerUnaryReactor() : call_(nullptr) {}
739  ~ServerUnaryReactor() override = default;
740 
742  void StartSendInitialMetadata() ABSL_LOCKS_EXCLUDED(call_mu_) {
743  ServerCallbackUnary* call = call_.load(std::memory_order_acquire);
744  if (call == nullptr) {
745  grpc::internal::MutexLock l(&call_mu_);
746  call = call_.load(std::memory_order_relaxed);
747  if (call == nullptr) {
748  backlog_.send_initial_metadata_wanted = true;
749  return;
750  }
751  }
752  call->SendInitialMetadata();
753  }
757  void Finish(grpc::Status s) ABSL_LOCKS_EXCLUDED(call_mu_) {
758  ServerCallbackUnary* call = call_.load(std::memory_order_acquire);
759  if (call == nullptr) {
760  grpc::internal::MutexLock l(&call_mu_);
761  call = call_.load(std::memory_order_relaxed);
762  if (call == nullptr) {
763  backlog_.finish_wanted = true;
764  backlog_.status_wanted = std::move(s);
765  return;
766  }
767  }
768  call->Finish(std::move(s));
769  }
770 
772  virtual void OnSendInitialMetadataDone(bool /*ok*/) {}
773  void OnDone() override = 0;
774  void OnCancel() override {}
775 
776  private:
777  friend class ServerCallbackUnary;
778  // May be overridden by internal implementation details. This is not a public
779  // customization point.
780  virtual void InternalBindCall(ServerCallbackUnary* call)
781  ABSL_LOCKS_EXCLUDED(call_mu_) {
782  grpc::internal::MutexLock l(&call_mu_);
783 
784  if (GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) {
785  call->SendInitialMetadata();
786  }
787  if (GPR_UNLIKELY(backlog_.finish_wanted)) {
788  call->Finish(std::move(backlog_.status_wanted));
789  }
790  // Set call_ last so that other functions can use it lock-free
791  call_.store(call, std::memory_order_release);
792  }
793 
794  grpc::internal::Mutex call_mu_;
795  std::atomic<ServerCallbackUnary*> call_{nullptr};
796  struct PreBindBacklog {
797  bool send_initial_metadata_wanted = false;
798  bool finish_wanted = false;
799  grpc::Status status_wanted;
800  };
801  PreBindBacklog backlog_ ABSL_GUARDED_BY(call_mu_);
802 };
803 
804 namespace experimental {
806  public:
807  ServerSessionReactor() : session_(nullptr) {}
808  ~ServerSessionReactor() override = default;
809 
812  void StartVirtualRPCs() ABSL_LOCKS_EXCLUDED(session_mu_) {
813  ServerCallbackSession* session = session_.load(std::memory_order_acquire);
814  if (session == nullptr) {
815  grpc::internal::MutexLock l(&session_mu_);
816  session = session_.load(std::memory_order_relaxed);
817  if (session == nullptr) {
818  backlog_.send_initial_metadata_wanted = true;
819  return;
820  }
821  }
822  session->SendInitialMetadata();
823  }
824 
828  void Finish(grpc::Status s) ABSL_LOCKS_EXCLUDED(session_mu_) {
829  ServerCallbackSession* session = session_.load(std::memory_order_acquire);
830  if (session == nullptr) {
831  grpc::internal::MutexLock l(&session_mu_);
832  session = session_.load(std::memory_order_relaxed);
833  if (session == nullptr) {
834  backlog_.finish_wanted = true;
835  backlog_.status_wanted = std::move(s);
836  return;
837  }
838  }
839  session->Finish(std::move(s));
840  }
841 
843  virtual void OnSendInitialMetadataDone(bool /*ok*/) {}
844  void OnDone() override = 0;
845  void OnCancel() override {}
846 
847  private:
848  friend class ServerCallbackSession;
849  // May be overridden by internal implementation details. This is not a public
850  // customization point.
851  virtual void InternalBindSession(ServerCallbackSession* session)
852  ABSL_LOCKS_EXCLUDED(session_mu_) {
853  grpc::internal::MutexLock l(&session_mu_);
854 
855  if (GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) {
856  session->SendInitialMetadata();
857  }
858  if (GPR_UNLIKELY(backlog_.finish_wanted)) {
859  session->Finish(std::move(backlog_.status_wanted));
860  }
861  // Set session_ last so that other functions can use it lock-free
862  session_.store(session, std::memory_order_release);
863  }
864 
865  grpc::internal::Mutex session_mu_;
866  std::atomic<ServerCallbackSession*> session_{nullptr};
867  struct PreBindBacklog {
868  bool send_initial_metadata_wanted = false;
869  bool finish_wanted = false;
870  grpc::Status status_wanted;
871  };
872 
873  // Resolves a race condition where application code calls reactor methods
874  // before gRPC binds the session to the reactor.
875  // These early operations are stored here and executed during
876  // InternalBindSession().
877  PreBindBacklog backlog_ ABSL_GUARDED_BY(session_mu_);
878 };
879 } // namespace experimental
880 
881 namespace internal {
882 
883 template <class Base>
884 class FinishOnlyReactor : public Base {
885  public:
886  explicit FinishOnlyReactor(grpc::Status s) { this->Finish(std::move(s)); }
887  void OnDone() override { this->~FinishOnlyReactor(); }
888 };
889 
891 template <class Request>
893 template <class Response>
896 template <class Request, class Response>
899 
900 } // namespace internal
901 
902 namespace experimental {
903 namespace internal {
906 } // namespace internal
907 } // namespace experimental
908 
909 // TODO(vjpai): Remove namespace experimental when last known users are migrated
910 // off.
911 namespace experimental {
912 
913 template <class Request, class Response>
915 
916 } // namespace experimental
917 
918 } // namespace grpc
919 
920 #endif // GRPCPP_SUPPORT_SERVER_CALLBACK_H
grpc::ServerReadReactor
ServerReadReactor is the interface for a client-streaming RPC.
Definition: server_callback.h:199
grpc::ServerCallbackReaderWriter::SendInitialMetadata
virtual void SendInitialMetadata()=0
grpc::ServerCallbackReader::~ServerCallbackReader
~ServerCallbackReader() override
Definition: server_callback.h:248
grpc::MessageHolder< Request, Response >::set_response
void set_response(Response *response)
Definition: message_allocator.h:50
grpc::ServerReadReactor::OnCancel
void OnCancel() override
Definition: server_callback.h:580
grpc::ServerBidiReactor::StartSendInitialMetadata
void StartSendInitialMetadata() ABSL_LOCKS_EXCLUDED(stream_mu_)
Send any initial metadata stored in the RPC context.
Definition: server_callback.h:320
grpc::experimental::ServerSessionReactor::StartVirtualRPCs
void StartVirtualRPCs() ABSL_LOCKS_EXCLUDED(session_mu_)
StartVirtualRPCs is exactly like ServerBidiReactor's StartSendInitialMetadata.
Definition: server_callback.h:812
grpc::MessageHolder< Request, Response >::set_request
void set_request(Request *request)
Definition: message_allocator.h:49
message_allocator.h
grpc::ServerReadReactor::ServerReadReactor
ServerReadReactor()
Definition: server_callback.h:531
grpc::ServerReadReactor::OnDone
void OnDone() override=0
grpc::ServerCallbackReaderWriter::Finish
virtual void Finish(grpc::Status s)=0
grpc::ServerWriteReactor::StartWrite
void StartWrite(const Response *resp)
Definition: server_callback.h:636
grpc::Server
Represents a gRPC server.
Definition: server.h:58
grpc::ServerCallbackWriter::Finish
virtual void Finish(grpc::Status s)=0
grpc
An Alarm posts the user-provided tag to its associated completion queue or invokes the user-provided ...
Definition: alarm.h:33
grpc::ServerUnaryReactor::OnDone
void OnDone() override=0
grpc::ServerBidiReactor::StartWriteAndFinish
void StartWriteAndFinish(const Response *resp, grpc::WriteOptions options, grpc::Status s) ABSL_LOCKS_EXCLUDED(stream_mu_)
Initiate a write operation with specified options and final RPC Status, which also causes any trailin...
Definition: server_callback.h:396
grpc::WriteOptions::set_last_message
WriteOptions & set_last_message()
last-message bit: indicates this is the last message in a stream client-side: makes Write the equival...
Definition: call_op_set.h:156
grpc::experimental::internal::BindSessionToInnerServer
void BindSessionToInnerServer(grpc_call *call, grpc::Server *inner_server)
grpc::internal::ServerReactor::~ServerReactor
virtual ~ServerReactor()=default
grpc::internal::ServerCallbackCall::MaybeCallOnCancel
void MaybeCallOnCancel()
Definition: server_callback.h:128
grpc::ServerWriteReactor::StartWrite
void StartWrite(const Response *resp, grpc::WriteOptions options) ABSL_LOCKS_EXCLUDED(writer_mu_)
Definition: server_callback.h:639
grpc::ServerWriteReactor::OnWriteDone
virtual void OnWriteDone(bool)
Definition: server_callback.h:691
grpc::ServerBidiReactor::ServerBidiReactor
ServerBidiReactor()
Definition: server_callback.h:314
grpc::ServerWriteReactor::OnSendInitialMetadataDone
virtual void OnSendInitialMetadataDone(bool)
The following notifications are exactly like ServerBidiReactor.
Definition: server_callback.h:690
grpc::ServerCallbackReader::Finish
virtual void Finish(grpc::Status s)=0
grpc::ServerReadReactor::StartRead
void StartRead(Request *req) ABSL_LOCKS_EXCLUDED(reader_mu_)
Definition: server_callback.h:548
grpc::internal::DefaultMessageHolder::DefaultMessageHolder
DefaultMessageHolder()
Definition: server_callback.h:180
grpc::experimental::ServerSessionReactor::~ServerSessionReactor
~ServerSessionReactor() override=default
grpc::ServerCallbackReaderWriter::~ServerCallbackReaderWriter
~ServerCallbackReaderWriter() override
Definition: server_callback.h:279
status.h
grpc::ServerCallbackUnary::BindReactor
void BindReactor(Reactor *reactor)
Definition: server_callback.h:223
grpc::internal::ServerCallbackCall::MaybeCallOnCancel
void MaybeCallOnCancel(ServerReactor *reactor)
Definition: server_callback.h:118
grpc::ServerWriteReactor::StartWriteLast
void StartWriteLast(const Response *resp, grpc::WriteOptions options)
Definition: server_callback.h:671
grpc::ServerReadReactor::~ServerReadReactor
~ServerReadReactor() override=default
grpc::ServerCallbackReaderWriter
Definition: server_callback.h:277
grpc::ServerBidiReactor::StartWrite
void StartWrite(const Response *resp)
Initiate a write operation.
Definition: server_callback.h:357
grpc::experimental::ServerCallbackSession::BindInnerServer
virtual void BindInnerServer(grpc::Server *inner_server)=0
grpc::ServerBidiReactor::OnCancel
void OnCancel() override
Notifies the application that this RPC has been cancelled.
Definition: server_callback.h:476
grpc::ServerCallbackReaderWriter::Write
virtual void Write(const Response *msg, grpc::WriteOptions options)=0
grpc::experimental::ServerCallbackSession::~ServerCallbackSession
~ServerCallbackSession() override
Definition: server_callback.h:231
grpc::ServerBidiReactor::StartRead
void StartRead(Request *req) ABSL_LOCKS_EXCLUDED(stream_mu_)
Initiate a read operation.
Definition: server_callback.h:338
grpc::Status
Did it work? If it didn't, why?
Definition: status.h:34
GPR_UNLIKELY
#define GPR_UNLIKELY(x)
Definition: port_platform.h:861
grpc::ServerWriteReactor::OnDone
void OnDone() override=0
grpc::ServerCallbackUnary::~ServerCallbackUnary
~ServerCallbackUnary() override
Definition: server_callback.h:215
grpc::ServerUnaryReactor::OnSendInitialMetadataDone
virtual void OnSendInitialMetadataDone(bool)
The following notifications are exactly like ServerBidiReactor.
Definition: server_callback.h:772
grpc::experimental::ServerSessionReactor::ServerSessionReactor
ServerSessionReactor()
Definition: server_callback.h:807
grpc::experimental::ServerSessionReactor
Definition: server_callback.h:805
grpc::internal::ServerCallbackCall
The base class of ServerCallbackUnary etc.
Definition: server_callback.h:84
grpc::experimental::ServerSessionReactor::OnSendInitialMetadataDone
virtual void OnSendInitialMetadataDone(bool)
The following notifications are exactly like ServerBidiReactor.
Definition: server_callback.h:843
grpc::ServerBidiReactor::OnWriteDone
virtual void OnWriteDone(bool)
Notifies the application that a StartWrite (or StartWriteLast) operation completed.
Definition: server_callback.h:466
grpc::ServerBidiReactor::StartWrite
void StartWrite(const Response *resp, grpc::WriteOptions options) ABSL_LOCKS_EXCLUDED(stream_mu_)
Initiate a write operation with specified options.
Definition: server_callback.h:367
callback_common.h
grpc::experimental::ServerSessionReactor::Finish
void Finish(grpc::Status s) ABSL_LOCKS_EXCLUDED(session_mu_)
Finish is similar to ServerBidiReactor except for one detail.
Definition: server_callback.h:828
grpc::ServerCallbackReaderWriter::WriteAndFinish
virtual void WriteAndFinish(const Response *msg, grpc::WriteOptions options, grpc::Status s)=0
grpc::ServerBidiReactor::OnDone
void OnDone() override=0
Notifies the application that all operations associated with this RPC have completed.
grpc_call
struct grpc_call grpc_call
A Call represents an RPC.
Definition: grpc_types.h:68
grpc::ServerBidiReactor
ServerBidiReactor is the interface for a bidirectional streaming RPC.
Definition: server_callback.h:203
grpc::internal::CallbackUnaryHandler
Definition: server_callback_handlers.h:37
grpc::ServerCallbackWriter::SendInitialMetadata
virtual void SendInitialMetadata()=0
grpc::ServerBidiReactor::~ServerBidiReactor
~ServerBidiReactor() override=default
grpc::internal::DefaultMessageHolder
Definition: server_callback.h:178
grpc::internal::FinishOnlyReactor::FinishOnlyReactor
FinishOnlyReactor(grpc::Status s)
Definition: server_callback.h:886
grpc::ServerBidiReactor::OnReadDone
virtual void OnReadDone(bool)
Notifies the application that a StartRead operation completed.
Definition: server_callback.h:459
grpc::internal::ServerReactor::OnDone
virtual void OnDone()=0
grpc::ServerWriteReactor::StartWriteAndFinish
void StartWriteAndFinish(const Response *resp, grpc::WriteOptions options, grpc::Status s) ABSL_LOCKS_EXCLUDED(writer_mu_)
Definition: server_callback.h:654
grpc::internal::CallbackServerStreamingHandler
Definition: server_callback_handlers.h:451
grpc::experimental::ServerCallbackSession
Definition: server_callback.h:229
grpc::ServerCallbackReaderWriter::Read
virtual void Read(Request *msg)=0
grpc::ServerBidiReactor::Finish
void Finish(grpc::Status s) ABSL_LOCKS_EXCLUDED(stream_mu_)
Indicate that the stream is to be finished and the trailing metadata and RPC status are to be sent.
Definition: server_callback.h:432
grpc::ServerCallbackReader::BindReactor
void BindReactor(ServerReadReactor< Request > *reactor)
Definition: server_callback.h:254
grpc::ServerCallbackWriter::BindReactor
void BindReactor(ServerWriteReactor< Response > *reactor)
Definition: server_callback.h:271
grpc::internal::FinishOnlyReactor
Definition: server_context.h:88
grpc::ServerCallbackWriter::~ServerCallbackWriter
~ServerCallbackWriter() override
Definition: server_callback.h:262
grpc_call_run_in_event_engine
void grpc_call_run_in_event_engine(const grpc_call *call, absl::AnyInvocable< void()> cb)
grpc::ServerReadReactor::StartSendInitialMetadata
void StartSendInitialMetadata() ABSL_LOCKS_EXCLUDED(reader_mu_)
The following operation initiations are exactly like ServerBidiReactor.
Definition: server_callback.h:535
grpc::internal::ServerCallbackCall::~ServerCallbackCall
virtual ~ServerCallbackCall()
Definition: server_callback.h:86
grpc::WriteOptions
Per-message write options.
Definition: call_op_set.h:79
grpc::experimental::ServerCallbackSession::BindReactor
void BindReactor(Reactor *reactor)
Definition: server_callback.h:239
grpc::internal::ServerCallbackCall::MaybeDone
void MaybeDone(bool inline_ondone)
Definition: server_callback.h:110
grpc::ServerWriteReactor
ServerWriteReactor is the interface for a server-streaming RPC.
Definition: server_callback.h:201
grpc::ServerReadReactor::OnReadDone
virtual void OnReadDone(bool)
Definition: server_callback.h:578
grpc::ServerCallbackReader::Read
virtual void Read(Request *msg)=0
grpc::internal::CallbackBidiHandler
Definition: server_callback_handlers.h:682
grpc::internal::MutexLock
Definition: sync.h:80
grpc::ServerWriteReactor::OnCancel
void OnCancel() override
Definition: server_callback.h:693
grpc::ServerWriteReactor::~ServerWriteReactor
~ServerWriteReactor() override=default
grpc::ServerCallbackReaderWriter::BindReactor
void BindReactor(ServerBidiReactor< Request, Response > *reactor)
Definition: server_callback.h:289
config.h
grpc::ServerCallbackWriter
Definition: server_callback.h:260
call.h
grpc::experimental::ServerSessionReactor::OnDone
void OnDone() override=0
grpc::ServerWriteReactor::StartSendInitialMetadata
void StartSendInitialMetadata() ABSL_LOCKS_EXCLUDED(writer_mu_)
The following operation initiations are exactly like ServerBidiReactor.
Definition: server_callback.h:623
call_op_set.h
call.h
grpc::experimental::internal::UnimplementedSessionReactor
grpc::internal::FinishOnlyReactor< ServerSessionReactor > UnimplementedSessionReactor
Definition: server_callback.h:905
grpc::internal::ServerReactor::InternalInlineable
virtual bool InternalInlineable()
Definition: server_callback.h:70
grpc::ServerCallbackUnary::SendInitialMetadata
virtual void SendInitialMetadata()=0
grpc::ServerReadReactor::OnSendInitialMetadataDone
virtual void OnSendInitialMetadataDone(bool)
The following notifications are exactly like ServerBidiReactor.
Definition: server_callback.h:577
grpc::ServerBidiReactor::StartWriteLast
void StartWriteLast(const Response *resp, grpc::WriteOptions options)
Inform system of a planned write operation with specified options, but allow the library to schedule the actual write coalesced with the writing of trailing metadata (which takes place on a Finish call).
Definition: server_callback.h:422
grpc::internal::FinishOnlyReactor::OnDone
void OnDone() override
Definition: server_callback.h:887
grpc::internal::ServerReactor
Definition: server_callback.h:60
grpc::experimental::ServerCallbackSession::SendInitialMetadata
virtual void SendInitialMetadata()=0
grpc::internal::CallbackClientStreamingHandler
Definition: server_callback_handlers.h:257
grpc::ServerUnaryReactor::OnCancel
void OnCancel() override
Definition: server_callback.h:774
grpc::internal::ServerCallbackCall::Ref
void Ref()
Increases the reference count.
Definition: server_callback.h:136
grpc::internal::ServerReactor::OnCancel
virtual void OnCancel()=0
grpc::ServerCallbackReader
Definition: server_callback.h:246
grpc::ServerReadReactor::Finish
void Finish(grpc::Status s) ABSL_LOCKS_EXCLUDED(reader_mu_)
Definition: server_callback.h:561
grpc::ServerCallbackWriter::Write
virtual void Write(const Response *msg, grpc::WriteOptions options)=0
grpc::MessageHolder
Definition: message_allocator.h:39
grpc::internal::Mutex
Definition: sync.h:57
grpc::ServerUnaryReactor::StartSendInitialMetadata
void StartSendInitialMetadata() ABSL_LOCKS_EXCLUDED(call_mu_)
StartSendInitialMetadata is exactly like ServerBidiReactor.
Definition: server_callback.h:742
grpc::ServerWriteReactor::Finish
void Finish(grpc::Status s) ABSL_LOCKS_EXCLUDED(writer_mu_)
Definition: server_callback.h:674
grpc::ServerUnaryReactor::Finish
void Finish(grpc::Status s) ABSL_LOCKS_EXCLUDED(call_mu_)
Finish is similar to ServerBidiReactor except for one detail.
Definition: server_callback.h:757
grpc::ServerBidiReactor::OnSendInitialMetadataDone
virtual void OnSendInitialMetadataDone(bool)
Notifies the application that an explicit StartSendInitialMetadata operation completed.
Definition: server_callback.h:453
grpc::ServerUnaryReactor::~ServerUnaryReactor
~ServerUnaryReactor() override=default
grpc::ServerCallbackUnary
Definition: server_callback.h:213
grpc::ServerUnaryReactor::ServerUnaryReactor
ServerUnaryReactor()
Definition: server_callback.h:738
grpc::experimental::ServerCallbackSession::Finish
virtual void Finish(grpc::Status s)=0
grpc::ServerWriteReactor::ServerWriteReactor
ServerWriteReactor()
Definition: server_callback.h:619
grpc::ServerUnaryReactor
Definition: server_callback.h:736
grpc::ServerCallbackWriter::WriteAndFinish
virtual void WriteAndFinish(const Response *msg, grpc::WriteOptions options, grpc::Status s)=0
sync.h
grpc::ServerCallbackUnary::Finish
virtual void Finish(grpc::Status s)=0
grpc::ServerCallbackReader::SendInitialMetadata
virtual void SendInitialMetadata()=0
grpc::internal::ServerCallbackCall::MaybeDone
void MaybeDone()
Definition: server_callback.h:104
grpc::experimental::ServerSessionReactor::OnCancel
void OnCancel() override
Definition: server_callback.h:845
grpc::internal::DefaultMessageHolder::Release
void Release() override
Definition: server_callback.h:184
grpc::experimental::ServerBidiReactor
::grpc::ServerBidiReactor< Request, Response > ServerBidiReactor
Definition: server_callback.h:914