Go to the documentation of this file.
19 #ifndef GRPCPP_SUPPORT_SERVER_CALLBACK_H
20 #define GRPCPP_SUPPORT_SERVER_CALLBACK_H
24 #include <type_traits>
26 #include "absl/functional/any_invocable.h"
// Forward declarations of the four callback handler class templates, one per
// RPC shape as indicated by their names (unary, client-streaming,
// server-streaming, bidi). Each is parameterized on Request and Response.
// Only the names are introduced here; the definitions are not in this listing.
43 template <
class Request,
class Response>
44 class CallbackUnaryHandler;
45 template <
class Request,
class Response>
46 class CallbackClientStreamingHandler;
47 template <
class Request,
class Response>
48 class CallbackServerStreamingHandler;
49 template <
class Request,
class Response>
50 class CallbackBidiHandler;
65 template <
class Request,
class Response>
67 template <
class Request,
class Response>
69 template <
class Request,
class Response>
71 template <
class Request,
class Response>
98 ScheduleOnDone(reactor()->InternalInlineable());
104 ScheduleOnDone(inline_ondone);
112 CallOnCancel(reactor);
122 CallOnCancel(reactor());
// Increases the reference count: atomically adds 1 to callbacks_outstanding_.
// The increment uses relaxed memory ordering, so by itself it does not order
// any other memory accesses.
128 void Ref() { callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed); }
135 virtual void RunAsync(absl::AnyInvocable<
void()> cb) {
142 virtual void CallOnDone() = 0;
146 void ScheduleOnDone(
bool inline_ondone);
150 void CallOnCancel(ServerReactor* reactor);
154 bool UnblockCancellation() {
155 return on_cancel_conditions_remaining_.fetch_sub(
156 1, std::memory_order_acq_rel) == 1;
161 return callbacks_outstanding_.fetch_sub(1, std::memory_order_acq_rel);
// Countdown that starts at 2. UnblockCancellation() decrements it by one
// (acq_rel) and returns true only to the caller that observed the value 1
// before its decrement, i.e. the caller that brings the counter to zero.
164 std::atomic_int on_cancel_conditions_remaining_{2};
165 std::atomic_int callbacks_outstanding_{
169 template <
class Request,
class Response>
182 Request request_obj_;
183 Response response_obj_;
189 class ServerUnaryReactor;
190 template <
class Request>
192 template <
class Response>
194 template <
class Request,
class Response>
209 template <
class Reactor>
211 reactor->InternalBindCall(
this);
215 template <
class Request>
221 virtual void Read(Request* msg) = 0;
225 reactor->InternalBindReader(
this);
229 template <
class Response>
242 reactor->InternalBindWriter(
this);
246 template <
class Request,
class Response>
253 virtual void Read(Request* msg) = 0;
260 reactor->InternalBindStream(
this);
276 template <
class Request,
class Response>
292 stream_.load(std::memory_order_acquire);
293 if (stream ==
nullptr) {
295 stream = stream_.load(std::memory_order_relaxed);
296 if (stream ==
nullptr) {
297 backlog_.send_initial_metadata_wanted =
true;
308 void StartRead(Request* req) ABSL_LOCKS_EXCLUDED(stream_mu_) {
310 stream_.load(std::memory_order_acquire);
311 if (stream ==
nullptr) {
313 stream = stream_.load(std::memory_order_relaxed);
314 if (stream ==
nullptr) {
315 backlog_.read_wanted = req;
338 ABSL_LOCKS_EXCLUDED(stream_mu_) {
340 stream_.load(std::memory_order_acquire);
341 if (stream ==
nullptr) {
343 stream = stream_.load(std::memory_order_relaxed);
344 if (stream ==
nullptr) {
345 backlog_.write_wanted = resp;
346 backlog_.write_options_wanted = options;
350 stream->
Write(resp, options);
369 stream_.load(std::memory_order_acquire);
370 if (stream ==
nullptr) {
372 stream = stream_.load(std::memory_order_relaxed);
373 if (stream ==
nullptr) {
374 backlog_.write_and_finish_wanted =
true;
375 backlog_.write_wanted = resp;
376 backlog_.write_options_wanted = options;
377 backlog_.status_wanted = std::move(s);
404 stream_.load(std::memory_order_acquire);
405 if (stream ==
nullptr) {
407 stream = stream_.load(std::memory_order_relaxed);
408 if (stream ==
nullptr) {
409 backlog_.finish_wanted =
true;
410 backlog_.status_wanted = std::move(s);
414 stream->
Finish(std::move(s));
441 void OnDone()
override = 0;
452 virtual void InternalBindStream(
456 if (
GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) {
460 stream->
Read(backlog_.read_wanted);
464 std::move(backlog_.write_options_wanted),
465 std::move(backlog_.status_wanted));
468 stream->
Write(backlog_.write_wanted,
469 std::move(backlog_.write_options_wanted));
472 stream->
Finish(std::move(backlog_.status_wanted));
476 stream_.store(stream, std::memory_order_release);
484 std::atomic<ServerCallbackReaderWriter<Request, Response>*> stream_{
nullptr};
485 struct PreBindBacklog {
486 bool send_initial_metadata_wanted =
false;
487 bool write_and_finish_wanted =
false;
488 bool finish_wanted =
false;
489 Request* read_wanted =
nullptr;
490 const Response* write_wanted =
nullptr;
494 PreBindBacklog backlog_ ABSL_GUARDED_BY(stream_mu_);
498 template <
class Request>
499 class ServerReadReactor :
public internal::ServerReactor {
507 reader_.load(std::memory_order_acquire);
508 if (reader ==
nullptr) {
510 reader = reader_.load(std::memory_order_relaxed);
511 if (reader ==
nullptr) {
512 backlog_.send_initial_metadata_wanted =
true;
518 void StartRead(Request* req) ABSL_LOCKS_EXCLUDED(reader_mu_) {
520 reader_.load(std::memory_order_acquire);
521 if (reader ==
nullptr) {
523 reader = reader_.load(std::memory_order_relaxed);
524 if (reader ==
nullptr) {
525 backlog_.read_wanted = req;
533 reader_.load(std::memory_order_acquire);
534 if (reader ==
nullptr) {
536 reader = reader_.load(std::memory_order_relaxed);
537 if (reader ==
nullptr) {
538 backlog_.finish_wanted =
true;
539 backlog_.status_wanted = std::move(s);
543 reader->
Finish(std::move(s));
549 void OnDone()
override = 0;
558 ABSL_LOCKS_EXCLUDED(reader_mu_) {
561 if (
GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) {
562 reader->SendInitialMetadata();
565 reader->Read(backlog_.read_wanted);
568 reader->Finish(std::move(backlog_.status_wanted));
571 reader_.store(reader, std::memory_order_release);
575 std::atomic<ServerCallbackReader<Request>*> reader_{
nullptr};
576 struct PreBindBacklog {
577 bool send_initial_metadata_wanted =
false;
578 bool finish_wanted =
false;
579 Request* read_wanted =
nullptr;
582 PreBindBacklog backlog_ ABSL_GUARDED_BY(reader_mu_);
586 template <
class Response>
587 class ServerWriteReactor :
public internal::ServerReactor {
595 writer_.load(std::memory_order_acquire);
596 if (writer ==
nullptr) {
598 writer = writer_.load(std::memory_order_relaxed);
599 if (writer ==
nullptr) {
600 backlog_.send_initial_metadata_wanted =
true;
610 ABSL_LOCKS_EXCLUDED(writer_mu_) {
612 writer_.load(std::memory_order_acquire);
613 if (writer ==
nullptr) {
615 writer = writer_.load(std::memory_order_relaxed);
616 if (writer ==
nullptr) {
617 backlog_.write_wanted = resp;
618 backlog_.write_options_wanted = options;
622 writer->
Write(resp, options);
627 writer_.load(std::memory_order_acquire);
628 if (writer ==
nullptr) {
630 writer = writer_.load(std::memory_order_relaxed);
631 if (writer ==
nullptr) {
632 backlog_.write_and_finish_wanted =
true;
633 backlog_.write_wanted = resp;
634 backlog_.write_options_wanted = options;
635 backlog_.status_wanted = std::move(s);
646 writer_.load(std::memory_order_acquire);
647 if (writer ==
nullptr) {
649 writer = writer_.load(std::memory_order_relaxed);
650 if (writer ==
nullptr) {
651 backlog_.finish_wanted =
true;
652 backlog_.status_wanted = std::move(s);
656 writer->
Finish(std::move(s));
662 void OnDone()
override = 0;
670 ABSL_LOCKS_EXCLUDED(writer_mu_) {
673 if (
GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) {
674 writer->SendInitialMetadata();
677 writer->WriteAndFinish(backlog_.write_wanted,
678 std::move(backlog_.write_options_wanted),
679 std::move(backlog_.status_wanted));
682 writer->Write(backlog_.write_wanted,
683 std::move(backlog_.write_options_wanted));
686 writer->Finish(std::move(backlog_.status_wanted));
690 writer_.store(writer, std::memory_order_release);
694 std::atomic<ServerCallbackWriter<Response>*> writer_{
nullptr};
695 struct PreBindBacklog {
696 bool send_initial_metadata_wanted =
false;
697 bool write_and_finish_wanted =
false;
698 bool finish_wanted =
false;
699 const Response* write_wanted =
nullptr;
703 PreBindBacklog backlog_ ABSL_GUARDED_BY(writer_mu_);
714 if (call ==
nullptr) {
716 call = call_.load(std::memory_order_relaxed);
717 if (call ==
nullptr) {
718 backlog_.send_initial_metadata_wanted =
true;
729 if (call ==
nullptr) {
731 call = call_.load(std::memory_order_relaxed);
732 if (call ==
nullptr) {
733 backlog_.finish_wanted =
true;
734 backlog_.status_wanted = std::move(s);
738 call->
Finish(std::move(s));
743 void OnDone()
override = 0;
751 ABSL_LOCKS_EXCLUDED(call_mu_) {
754 if (
GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) {
755 call->SendInitialMetadata();
758 call->Finish(std::move(backlog_.status_wanted));
761 call_.store(call, std::memory_order_release);
765 std::atomic<ServerCallbackUnary*> call_{
nullptr};
766 struct PreBindBacklog {
767 bool send_initial_metadata_wanted =
false;
768 bool finish_wanted =
false;
771 PreBindBacklog backlog_ ABSL_GUARDED_BY(call_mu_);
776 template <
class Base>
777 class FinishOnlyReactor :
public Base {
784 template <
class Request>
786 template <
class Response>
789 template <
class Request,
class Response>
797 namespace experimental {
799 template <
class Request,
class Response>
806 #endif // GRPCPP_SUPPORT_SERVER_CALLBACK_H
ServerReadReactor is the interface for a client-streaming RPC.
Definition: server_callback.h:191
virtual void SendInitialMetadata()=0
~ServerCallbackReader() override
Definition: server_callback.h:218
void set_response(Response *response)
Definition: message_allocator.h:50
void OnCancel() override
Definition: server_callback.h:550
void StartSendInitialMetadata() ABSL_LOCKS_EXCLUDED(stream_mu_)
Send any initial metadata stored in the RPC context.
Definition: server_callback.h:290
void set_request(Request *request)
Definition: message_allocator.h:49
ServerReadReactor()
Definition: server_callback.h:501
virtual void Finish(grpc::Status s)=0
void StartWrite(const Response *resp)
Definition: server_callback.h:606
virtual void Finish(grpc::Status s)=0
An Alarm posts the user-provided tag to its associated completion queue or invokes the user-provided ...
Definition: alarm.h:33
void StartWriteAndFinish(const Response *resp, grpc::WriteOptions options, grpc::Status s) ABSL_LOCKS_EXCLUDED(stream_mu_)
Initiate a write operation with specified options and final RPC Status, which also causes any trailin...
Definition: server_callback.h:366
WriteOptions & set_last_message()
last-message bit: indicates this is the last message in a stream. Client-side: makes Write the equival...
Definition: call_op_set.h:157
virtual ~ServerReactor()=default
void MaybeCallOnCancel()
Definition: server_callback.h:120
void StartWrite(const Response *resp, grpc::WriteOptions options) ABSL_LOCKS_EXCLUDED(writer_mu_)
Definition: server_callback.h:609
virtual void OnWriteDone(bool)
Definition: server_callback.h:661
ServerBidiReactor()
Definition: server_callback.h:284
virtual void OnSendInitialMetadataDone(bool)
The following notifications are exactly like ServerBidiReactor.
Definition: server_callback.h:660
virtual void Finish(grpc::Status s)=0
void StartRead(Request *req) ABSL_LOCKS_EXCLUDED(reader_mu_)
Definition: server_callback.h:518
DefaultMessageHolder()
Definition: server_callback.h:172
~ServerCallbackReaderWriter() override
Definition: server_callback.h:249
void BindReactor(Reactor *reactor)
Definition: server_callback.h:210
void MaybeCallOnCancel(ServerReactor *reactor)
Definition: server_callback.h:110
void StartWriteLast(const Response *resp, grpc::WriteOptions options)
Definition: server_callback.h:641
~ServerReadReactor() override=default
Definition: server_callback.h:247
void StartWrite(const Response *resp)
Initiate a write operation.
Definition: server_callback.h:327
void OnCancel() override
Notifies the application that this RPC has been cancelled.
Definition: server_callback.h:446
virtual void Write(const Response *msg, grpc::WriteOptions options)=0
void StartRead(Request *req) ABSL_LOCKS_EXCLUDED(stream_mu_)
Initiate a read operation.
Definition: server_callback.h:308
Did it work? If it didn't, why?
Definition: status.h:34
~ServerCallbackUnary() override
Definition: server_callback.h:202
virtual void OnSendInitialMetadataDone(bool)
The following notifications are exactly like ServerBidiReactor.
Definition: server_callback.h:742
The base class of ServerCallbackUnary etc.
Definition: server_callback.h:76
virtual void OnWriteDone(bool)
Notifies the application that a StartWrite (or StartWriteLast) operation completed.
Definition: server_callback.h:436
void StartWrite(const Response *resp, grpc::WriteOptions options) ABSL_LOCKS_EXCLUDED(stream_mu_)
Initiate a write operation with specified options.
Definition: server_callback.h:337
virtual void WriteAndFinish(const Response *msg, grpc::WriteOptions options, grpc::Status s)=0
void OnDone() override=0
Notifies the application that all operations associated with this RPC have completed.
struct grpc_call grpc_call
A Call represents an RPC.
Definition: grpc_types.h:69
ServerBidiReactor is the interface for a bidirectional streaming RPC.
Definition: server_callback.h:195
Definition: server_callback_handlers.h:36
virtual void SendInitialMetadata()=0
~ServerBidiReactor() override=default
Definition: server_callback.h:170
FinishOnlyReactor(grpc::Status s)
Definition: server_callback.h:779
virtual void OnReadDone(bool)
Notifies the application that a StartRead operation completed.
Definition: server_callback.h:429
void StartWriteAndFinish(const Response *resp, grpc::WriteOptions options, grpc::Status s) ABSL_LOCKS_EXCLUDED(writer_mu_)
Definition: server_callback.h:624
Definition: server_callback_handlers.h:449
virtual void Read(Request *msg)=0
void Finish(grpc::Status s) ABSL_LOCKS_EXCLUDED(stream_mu_)
Indicate that the stream is to be finished and the trailing metadata and RPC status are to be sent.
Definition: server_callback.h:402
void BindReactor(ServerReadReactor< Request > *reactor)
Definition: server_callback.h:224
void BindReactor(ServerWriteReactor< Response > *reactor)
Definition: server_callback.h:241
Definition: server_context.h:88
~ServerCallbackWriter() override
Definition: server_callback.h:232
void grpc_call_run_in_event_engine(const grpc_call *call, absl::AnyInvocable< void()> cb)
void StartSendInitialMetadata() ABSL_LOCKS_EXCLUDED(reader_mu_)
The following operation initiations are exactly like ServerBidiReactor.
Definition: server_callback.h:505
virtual ~ServerCallbackCall()
Definition: server_callback.h:78
Per-message write options.
Definition: call_op_set.h:80
void MaybeDone(bool inline_ondone)
Definition: server_callback.h:102
ServerWriteReactor is the interface for a server-streaming RPC.
Definition: server_callback.h:193
virtual void OnReadDone(bool)
Definition: server_callback.h:548
virtual void Read(Request *msg)=0
Definition: server_callback_handlers.h:677
void OnCancel() override
Definition: server_callback.h:663
~ServerWriteReactor() override=default
void BindReactor(ServerBidiReactor< Request, Response > *reactor)
Definition: server_callback.h:259
Definition: server_callback.h:230
void StartSendInitialMetadata() ABSL_LOCKS_EXCLUDED(writer_mu_)
The following operation initiations are exactly like ServerBidiReactor.
Definition: server_callback.h:593
virtual bool InternalInlineable()
Definition: server_callback.h:62
virtual void SendInitialMetadata()=0
virtual void OnSendInitialMetadataDone(bool)
The following notifications are exactly like ServerBidiReactor.
Definition: server_callback.h:547
void StartWriteLast(const Response *resp, grpc::WriteOptions options)
Inform system of a planned write operation with specified options, but allow the library to schedule ...
Definition: server_callback.h:392
void OnDone() override
Definition: server_callback.h:780
Definition: server_callback.h:52
Definition: server_callback_handlers.h:256
void OnCancel() override
Definition: server_callback.h:744
void Ref()
Increases the reference count.
Definition: server_callback.h:128
virtual void OnCancel()=0
Definition: server_callback.h:216
void Finish(grpc::Status s) ABSL_LOCKS_EXCLUDED(reader_mu_)
Definition: server_callback.h:531
virtual void Write(const Response *msg, grpc::WriteOptions options)=0
Definition: message_allocator.h:39
void StartSendInitialMetadata() ABSL_LOCKS_EXCLUDED(call_mu_)
StartSendInitialMetadata is exactly like ServerBidiReactor.
Definition: server_callback.h:712
void Finish(grpc::Status s) ABSL_LOCKS_EXCLUDED(writer_mu_)
Definition: server_callback.h:644
void Finish(grpc::Status s) ABSL_LOCKS_EXCLUDED(call_mu_)
Finish is similar to ServerBidiReactor except for one detail.
Definition: server_callback.h:727
virtual void OnSendInitialMetadataDone(bool)
Notifies the application that an explicit StartSendInitialMetadata operation completed.
Definition: server_callback.h:423
~ServerUnaryReactor() override=default
Definition: server_callback.h:200
ServerUnaryReactor()
Definition: server_callback.h:708
ServerWriteReactor()
Definition: server_callback.h:589
Definition: server_callback.h:706
virtual void WriteAndFinish(const Response *msg, grpc::WriteOptions options, grpc::Status s)=0
virtual void Finish(grpc::Status s)=0
virtual void SendInitialMetadata()=0
void MaybeDone()
Definition: server_callback.h:96
void Release() override
Definition: server_callback.h:176
::grpc::ServerBidiReactor< Request, Response > ServerBidiReactor
Definition: server_callback.h:800