Go to the documentation of this file.
19 #ifndef GRPCPP_SUPPORT_SERVER_CALLBACK_H
20 #define GRPCPP_SUPPORT_SERVER_CALLBACK_H
33 #include <type_traits>
35 #include "absl/functional/any_invocable.h"
42 namespace experimental {
51 template <
class Request,
class Response>
52 class CallbackUnaryHandler;
53 template <
class Request,
class Response>
54 class CallbackClientStreamingHandler;
55 template <
class Request,
class Response>
56 class CallbackServerStreamingHandler;
57 template <
class Request,
class Response>
58 class CallbackBidiHandler;
73 template <
class Request,
class Response>
75 template <
class Request,
class Response>
77 template <
class Request,
class Response>
79 template <
class Request,
class Response>
106 ScheduleOnDone(reactor()->InternalInlineable());
112 ScheduleOnDone(inline_ondone);
120 CallOnCancel(reactor);
130 CallOnCancel(reactor());
136 void Ref() { callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed); }
143 virtual void RunAsync(absl::AnyInvocable<
void()> cb) {
150 virtual void CallOnDone() = 0;
154 void ScheduleOnDone(
bool inline_ondone);
158 void CallOnCancel(ServerReactor* reactor);
162 bool UnblockCancellation() {
163 return on_cancel_conditions_remaining_.fetch_sub(
164 1, std::memory_order_acq_rel) == 1;
169 return callbacks_outstanding_.fetch_sub(1, std::memory_order_acq_rel);
172 std::atomic_int on_cancel_conditions_remaining_{2};
173 std::atomic_int callbacks_outstanding_{
177 template <
class Request,
class Response>
190 Request request_obj_;
191 Response response_obj_;
197 class ServerUnaryReactor;
198 template <
class Request>
200 template <
class Response>
202 template <
class Request,
class Response>
205 namespace experimental {
206 class ServerSessionReactor;
207 class ServerCallbackSession;
222 template <
class Reactor>
224 reactor->InternalBindCall(
this);
228 namespace experimental {
238 template <
class Reactor>
240 reactor->InternalBindSession(
this);
245 template <
class Request>
251 virtual void Read(Request* msg) = 0;
255 reactor->InternalBindReader(
this);
259 template <
class Response>
272 reactor->InternalBindWriter(
this);
276 template <
class Request,
class Response>
283 virtual void Read(Request* msg) = 0;
290 reactor->InternalBindStream(
this);
306 template <
class Request,
class Response>
322 stream_.load(std::memory_order_acquire);
323 if (stream ==
nullptr) {
325 stream = stream_.load(std::memory_order_relaxed);
326 if (stream ==
nullptr) {
327 backlog_.send_initial_metadata_wanted =
true;
338 void StartRead(Request* req) ABSL_LOCKS_EXCLUDED(stream_mu_) {
340 stream_.load(std::memory_order_acquire);
341 if (stream ==
nullptr) {
343 stream = stream_.load(std::memory_order_relaxed);
344 if (stream ==
nullptr) {
345 backlog_.read_wanted = req;
368 ABSL_LOCKS_EXCLUDED(stream_mu_) {
370 stream_.load(std::memory_order_acquire);
371 if (stream ==
nullptr) {
373 stream = stream_.load(std::memory_order_relaxed);
374 if (stream ==
nullptr) {
375 backlog_.write_wanted = resp;
376 backlog_.write_options_wanted = options;
380 stream->
Write(resp, options);
399 stream_.load(std::memory_order_acquire);
400 if (stream ==
nullptr) {
402 stream = stream_.load(std::memory_order_relaxed);
403 if (stream ==
nullptr) {
404 backlog_.write_and_finish_wanted =
true;
405 backlog_.write_wanted = resp;
406 backlog_.write_options_wanted = options;
407 backlog_.status_wanted = std::move(s);
434 stream_.load(std::memory_order_acquire);
435 if (stream ==
nullptr) {
437 stream = stream_.load(std::memory_order_relaxed);
438 if (stream ==
nullptr) {
439 backlog_.finish_wanted =
true;
440 backlog_.status_wanted = std::move(s);
444 stream->
Finish(std::move(s));
471 void OnDone()
override = 0;
482 virtual void InternalBindStream(
486 if (
GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) {
490 stream->
Read(backlog_.read_wanted);
494 std::move(backlog_.write_options_wanted),
495 std::move(backlog_.status_wanted));
498 stream->
Write(backlog_.write_wanted,
499 std::move(backlog_.write_options_wanted));
502 stream->
Finish(std::move(backlog_.status_wanted));
506 stream_.store(stream, std::memory_order_release);
514 std::atomic<ServerCallbackReaderWriter<Request, Response>*> stream_{
nullptr};
515 struct PreBindBacklog {
516 bool send_initial_metadata_wanted =
false;
517 bool write_and_finish_wanted =
false;
518 bool finish_wanted =
false;
519 Request* read_wanted =
nullptr;
520 const Response* write_wanted =
nullptr;
524 PreBindBacklog backlog_ ABSL_GUARDED_BY(stream_mu_);
528 template <
class Request>
529 class ServerReadReactor :
public internal::ServerReactor {
537 reader_.load(std::memory_order_acquire);
538 if (reader ==
nullptr) {
540 reader = reader_.load(std::memory_order_relaxed);
541 if (reader ==
nullptr) {
542 backlog_.send_initial_metadata_wanted =
true;
548 void StartRead(Request* req) ABSL_LOCKS_EXCLUDED(reader_mu_) {
550 reader_.load(std::memory_order_acquire);
551 if (reader ==
nullptr) {
553 reader = reader_.load(std::memory_order_relaxed);
554 if (reader ==
nullptr) {
555 backlog_.read_wanted = req;
563 reader_.load(std::memory_order_acquire);
564 if (reader ==
nullptr) {
566 reader = reader_.load(std::memory_order_relaxed);
567 if (reader ==
nullptr) {
568 backlog_.finish_wanted =
true;
569 backlog_.status_wanted = std::move(s);
573 reader->
Finish(std::move(s));
579 void OnDone()
override = 0;
588 ABSL_LOCKS_EXCLUDED(reader_mu_) {
591 if (
GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) {
592 reader->SendInitialMetadata();
595 reader->Read(backlog_.read_wanted);
598 reader->Finish(std::move(backlog_.status_wanted));
601 reader_.store(reader, std::memory_order_release);
605 std::atomic<ServerCallbackReader<Request>*> reader_{
nullptr};
606 struct PreBindBacklog {
607 bool send_initial_metadata_wanted =
false;
608 bool finish_wanted =
false;
609 Request* read_wanted =
nullptr;
612 PreBindBacklog backlog_ ABSL_GUARDED_BY(reader_mu_);
616 template <
class Response>
617 class ServerWriteReactor :
public internal::ServerReactor {
625 writer_.load(std::memory_order_acquire);
626 if (writer ==
nullptr) {
628 writer = writer_.load(std::memory_order_relaxed);
629 if (writer ==
nullptr) {
630 backlog_.send_initial_metadata_wanted =
true;
640 ABSL_LOCKS_EXCLUDED(writer_mu_) {
642 writer_.load(std::memory_order_acquire);
643 if (writer ==
nullptr) {
645 writer = writer_.load(std::memory_order_relaxed);
646 if (writer ==
nullptr) {
647 backlog_.write_wanted = resp;
648 backlog_.write_options_wanted = options;
652 writer->
Write(resp, options);
657 writer_.load(std::memory_order_acquire);
658 if (writer ==
nullptr) {
660 writer = writer_.load(std::memory_order_relaxed);
661 if (writer ==
nullptr) {
662 backlog_.write_and_finish_wanted =
true;
663 backlog_.write_wanted = resp;
664 backlog_.write_options_wanted = options;
665 backlog_.status_wanted = std::move(s);
676 writer_.load(std::memory_order_acquire);
677 if (writer ==
nullptr) {
679 writer = writer_.load(std::memory_order_relaxed);
680 if (writer ==
nullptr) {
681 backlog_.finish_wanted =
true;
682 backlog_.status_wanted = std::move(s);
686 writer->
Finish(std::move(s));
692 void OnDone()
override = 0;
700 ABSL_LOCKS_EXCLUDED(writer_mu_) {
703 if (
GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) {
704 writer->SendInitialMetadata();
707 writer->WriteAndFinish(backlog_.write_wanted,
708 std::move(backlog_.write_options_wanted),
709 std::move(backlog_.status_wanted));
712 writer->Write(backlog_.write_wanted,
713 std::move(backlog_.write_options_wanted));
716 writer->Finish(std::move(backlog_.status_wanted));
720 writer_.store(writer, std::memory_order_release);
724 std::atomic<ServerCallbackWriter<Response>*> writer_{
nullptr};
725 struct PreBindBacklog {
726 bool send_initial_metadata_wanted =
false;
727 bool write_and_finish_wanted =
false;
728 bool finish_wanted =
false;
729 const Response* write_wanted =
nullptr;
733 PreBindBacklog backlog_ ABSL_GUARDED_BY(writer_mu_);
744 if (call ==
nullptr) {
746 call = call_.load(std::memory_order_relaxed);
747 if (call ==
nullptr) {
748 backlog_.send_initial_metadata_wanted =
true;
759 if (call ==
nullptr) {
761 call = call_.load(std::memory_order_relaxed);
762 if (call ==
nullptr) {
763 backlog_.finish_wanted =
true;
764 backlog_.status_wanted = std::move(s);
768 call->
Finish(std::move(s));
773 void OnDone()
override = 0;
781 ABSL_LOCKS_EXCLUDED(call_mu_) {
784 if (
GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) {
785 call->SendInitialMetadata();
788 call->Finish(std::move(backlog_.status_wanted));
791 call_.store(call, std::memory_order_release);
795 std::atomic<ServerCallbackUnary*> call_{
nullptr};
796 struct PreBindBacklog {
797 bool send_initial_metadata_wanted =
false;
798 bool finish_wanted =
false;
801 PreBindBacklog backlog_ ABSL_GUARDED_BY(call_mu_);
804 namespace experimental {
814 if (session ==
nullptr) {
816 session = session_.load(std::memory_order_relaxed);
817 if (session ==
nullptr) {
818 backlog_.send_initial_metadata_wanted =
true;
830 if (session ==
nullptr) {
832 session = session_.load(std::memory_order_relaxed);
833 if (session ==
nullptr) {
834 backlog_.finish_wanted =
true;
835 backlog_.status_wanted = std::move(s);
839 session->
Finish(std::move(s));
844 void OnDone()
override = 0;
852 ABSL_LOCKS_EXCLUDED(session_mu_) {
855 if (
GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) {
856 session->SendInitialMetadata();
859 session->Finish(std::move(backlog_.status_wanted));
862 session_.store(session, std::memory_order_release);
866 std::atomic<ServerCallbackSession*> session_{
nullptr};
867 struct PreBindBacklog {
868 bool send_initial_metadata_wanted =
false;
869 bool finish_wanted =
false;
877 PreBindBacklog backlog_ ABSL_GUARDED_BY(session_mu_);
883 template <
class Base>
884 class FinishOnlyReactor :
public Base {
891 template <
class Request>
893 template <
class Response>
896 template <
class Request,
class Response>
902 namespace experimental {
911 namespace experimental {
913 template <
class Request,
class Response>
920 #endif // GRPCPP_SUPPORT_SERVER_CALLBACK_H
ServerReadReactor is the interface for a client-streaming RPC.
Definition: server_callback.h:199
virtual void SendInitialMetadata()=0
~ServerCallbackReader() override
Definition: server_callback.h:248
void set_response(Response *response)
Definition: message_allocator.h:50
void OnCancel() override
Definition: server_callback.h:580
void StartSendInitialMetadata() ABSL_LOCKS_EXCLUDED(stream_mu_)
Send any initial metadata stored in the RPC context.
Definition: server_callback.h:320
void StartVirtualRPCs() ABSL_LOCKS_EXCLUDED(session_mu_)
StartVirtualRPCs is exactly like ServerBidiReactor's StartSendInitialMetadata.
Definition: server_callback.h:812
void set_request(Request *request)
Definition: message_allocator.h:49
ServerReadReactor()
Definition: server_callback.h:531
virtual void Finish(grpc::Status s)=0
void StartWrite(const Response *resp)
Definition: server_callback.h:636
Represents a gRPC server.
Definition: server.h:58
virtual void Finish(grpc::Status s)=0
An Alarm posts the user-provided tag to its associated completion queue or invokes the user-provided function on expiry or cancellation.
Definition: alarm.h:33
void StartWriteAndFinish(const Response *resp, grpc::WriteOptions options, grpc::Status s) ABSL_LOCKS_EXCLUDED(stream_mu_)
Initiate a write operation with specified options and final RPC Status, which also causes any trailing metadata for this RPC to be sent out.
Definition: server_callback.h:396
WriteOptions & set_last_message()
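last-message bit: indicates this is the last message in a stream. Client-side: makes Write the equivalent of performing Write and WritesDone in a single step. Server-side: holds the Write until the service handler returns (sync API) or until Finish is called (async API).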
Definition: call_op_set.h:156
void BindSessionToInnerServer(grpc_call *call, grpc::Server *inner_server)
virtual ~ServerReactor()=default
void MaybeCallOnCancel()
Definition: server_callback.h:128
void StartWrite(const Response *resp, grpc::WriteOptions options) ABSL_LOCKS_EXCLUDED(writer_mu_)
Definition: server_callback.h:639
virtual void OnWriteDone(bool)
Definition: server_callback.h:691
ServerBidiReactor()
Definition: server_callback.h:314
virtual void OnSendInitialMetadataDone(bool)
The following notifications are exactly like ServerBidiReactor.
Definition: server_callback.h:690
virtual void Finish(grpc::Status s)=0
void StartRead(Request *req) ABSL_LOCKS_EXCLUDED(reader_mu_)
Definition: server_callback.h:548
DefaultMessageHolder()
Definition: server_callback.h:180
~ServerSessionReactor() override=default
~ServerCallbackReaderWriter() override
Definition: server_callback.h:279
void BindReactor(Reactor *reactor)
Definition: server_callback.h:223
void MaybeCallOnCancel(ServerReactor *reactor)
Definition: server_callback.h:118
void StartWriteLast(const Response *resp, grpc::WriteOptions options)
Definition: server_callback.h:671
~ServerReadReactor() override=default
Definition: server_callback.h:277
void StartWrite(const Response *resp)
Initiate a write operation.
Definition: server_callback.h:357
virtual void BindInnerServer(grpc::Server *inner_server)=0
void OnCancel() override
Notifies the application that this RPC has been cancelled.
Definition: server_callback.h:476
virtual void Write(const Response *msg, grpc::WriteOptions options)=0
~ServerCallbackSession() override
Definition: server_callback.h:231
void StartRead(Request *req) ABSL_LOCKS_EXCLUDED(stream_mu_)
Initiate a read operation.
Definition: server_callback.h:338
Did it work? If it didn't, why?
Definition: status.h:34
~ServerCallbackUnary() override
Definition: server_callback.h:215
virtual void OnSendInitialMetadataDone(bool)
The following notifications are exactly like ServerBidiReactor.
Definition: server_callback.h:772
ServerSessionReactor()
Definition: server_callback.h:807
Definition: server_callback.h:805
The base class of ServerCallbackUnary etc.
Definition: server_callback.h:84
virtual void OnSendInitialMetadataDone(bool)
The following notifications are exactly like ServerBidiReactor.
Definition: server_callback.h:843
virtual void OnWriteDone(bool)
Notifies the application that a StartWrite (or StartWriteLast) operation completed.
Definition: server_callback.h:466
void StartWrite(const Response *resp, grpc::WriteOptions options) ABSL_LOCKS_EXCLUDED(stream_mu_)
Initiate a write operation with specified options.
Definition: server_callback.h:367
void Finish(grpc::Status s) ABSL_LOCKS_EXCLUDED(session_mu_)
Finish is similar to ServerBidiReactor except for one detail.
Definition: server_callback.h:828
virtual void WriteAndFinish(const Response *msg, grpc::WriteOptions options, grpc::Status s)=0
void OnDone() override=0
Notifies the application that all operations associated with this RPC have completed.
struct grpc_call grpc_call
A Call represents an RPC.
Definition: grpc_types.h:68
ServerBidiReactor is the interface for a bidirectional streaming RPC.
Definition: server_callback.h:203
Definition: server_callback_handlers.h:37
virtual void SendInitialMetadata()=0
~ServerBidiReactor() override=default
Definition: server_callback.h:178
FinishOnlyReactor(grpc::Status s)
Definition: server_callback.h:886
virtual void OnReadDone(bool)
Notifies the application that a StartRead operation completed.
Definition: server_callback.h:459
void StartWriteAndFinish(const Response *resp, grpc::WriteOptions options, grpc::Status s) ABSL_LOCKS_EXCLUDED(writer_mu_)
Definition: server_callback.h:654
Definition: server_callback_handlers.h:451
Definition: server_callback.h:229
virtual void Read(Request *msg)=0
void Finish(grpc::Status s) ABSL_LOCKS_EXCLUDED(stream_mu_)
Indicate that the stream is to be finished and the trailing metadata and RPC status are to be sent.
Definition: server_callback.h:432
void BindReactor(ServerReadReactor< Request > *reactor)
Definition: server_callback.h:254
void BindReactor(ServerWriteReactor< Response > *reactor)
Definition: server_callback.h:271
Definition: server_context.h:88
~ServerCallbackWriter() override
Definition: server_callback.h:262
void grpc_call_run_in_event_engine(const grpc_call *call, absl::AnyInvocable< void()> cb)
void StartSendInitialMetadata() ABSL_LOCKS_EXCLUDED(reader_mu_)
The following operation initiations are exactly like ServerBidiReactor.
Definition: server_callback.h:535
virtual ~ServerCallbackCall()
Definition: server_callback.h:86
Per-message write options.
Definition: call_op_set.h:79
void BindReactor(Reactor *reactor)
Definition: server_callback.h:239
void MaybeDone(bool inline_ondone)
Definition: server_callback.h:110
ServerWriteReactor is the interface for a server-streaming RPC.
Definition: server_callback.h:201
virtual void OnReadDone(bool)
Definition: server_callback.h:578
virtual void Read(Request *msg)=0
Definition: server_callback_handlers.h:682
void OnCancel() override
Definition: server_callback.h:693
~ServerWriteReactor() override=default
void BindReactor(ServerBidiReactor< Request, Response > *reactor)
Definition: server_callback.h:289
Definition: server_callback.h:260
void StartSendInitialMetadata() ABSL_LOCKS_EXCLUDED(writer_mu_)
The following operation initiations are exactly like ServerBidiReactor.
Definition: server_callback.h:623
grpc::internal::FinishOnlyReactor< ServerSessionReactor > UnimplementedSessionReactor
Definition: server_callback.h:905
virtual bool InternalInlineable()
Definition: server_callback.h:70
virtual void SendInitialMetadata()=0
virtual void OnSendInitialMetadataDone(bool)
The following notifications are exactly like ServerBidiReactor.
Definition: server_callback.h:577
void StartWriteLast(const Response *resp, grpc::WriteOptions options)
Inform system of a planned write operation with specified options, but allow the library to schedule the actual write coalesced with the writing of trailing metadata (which takes place on a Finish call).
Definition: server_callback.h:422
void OnDone() override
Definition: server_callback.h:887
Definition: server_callback.h:60
virtual void SendInitialMetadata()=0
Definition: server_callback_handlers.h:257
void OnCancel() override
Definition: server_callback.h:774
void Ref()
Increases the reference count.
Definition: server_callback.h:136
virtual void OnCancel()=0
Definition: server_callback.h:246
void Finish(grpc::Status s) ABSL_LOCKS_EXCLUDED(reader_mu_)
Definition: server_callback.h:561
virtual void Write(const Response *msg, grpc::WriteOptions options)=0
Definition: message_allocator.h:39
void StartSendInitialMetadata() ABSL_LOCKS_EXCLUDED(call_mu_)
StartSendInitialMetadata is exactly like ServerBidiReactor.
Definition: server_callback.h:742
void Finish(grpc::Status s) ABSL_LOCKS_EXCLUDED(writer_mu_)
Definition: server_callback.h:674
void Finish(grpc::Status s) ABSL_LOCKS_EXCLUDED(call_mu_)
Finish is similar to ServerBidiReactor except for one detail.
Definition: server_callback.h:757
virtual void OnSendInitialMetadataDone(bool)
Notifies the application that an explicit StartSendInitialMetadata operation completed.
Definition: server_callback.h:453
~ServerUnaryReactor() override=default
Definition: server_callback.h:213
ServerUnaryReactor()
Definition: server_callback.h:738
virtual void Finish(grpc::Status s)=0
ServerWriteReactor()
Definition: server_callback.h:619
Definition: server_callback.h:736
virtual void WriteAndFinish(const Response *msg, grpc::WriteOptions options, grpc::Status s)=0
virtual void Finish(grpc::Status s)=0
virtual void SendInitialMetadata()=0
void MaybeDone()
Definition: server_callback.h:104
void OnCancel() override
Definition: server_callback.h:845
void Release() override
Definition: server_callback.h:184
::grpc::ServerBidiReactor< Request, Response > ServerBidiReactor
Definition: server_callback.h:914