Go to the documentation of this file.
19 #ifndef GRPCPP_SUPPORT_SERVER_CALLBACK_H
20 #define GRPCPP_SUPPORT_SERVER_CALLBACK_H
24 #include <type_traits>
40 template <
class Request,
class Response>
41 class CallbackUnaryHandler;
42 template <
class Request,
class Response>
43 class CallbackClientStreamingHandler;
44 template <
class Request,
class Response>
45 class CallbackServerStreamingHandler;
46 template <
class Request,
class Response>
47 class CallbackBidiHandler;
62 template <
class Request,
class Response>
64 template <
class Request,
class Response>
66 template <
class Request,
class Response>
68 template <
class Request,
class Response>
95 ScheduleOnDone(reactor()->InternalInlineable());
101 ScheduleOnDone(inline_ondone);
109 CallOnCancel(reactor);
119 CallOnCancel(reactor());
125 void Ref() { callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed); }
133 virtual void CallOnDone() = 0;
137 void ScheduleOnDone(
bool inline_ondone);
145 bool UnblockCancellation() {
146 return on_cancel_conditions_remaining_.fetch_sub(
147 1, std::memory_order_acq_rel) == 1;
152 return callbacks_outstanding_.fetch_sub(1, std::memory_order_acq_rel);
155 std::atomic_int on_cancel_conditions_remaining_{2};
156 std::atomic_int callbacks_outstanding_{
160 template <
class Request,
class Response>
173 Request request_obj_;
174 Response response_obj_;
180 class ServerUnaryReactor;
181 template <
class Request>
183 template <
class Response>
185 template <
class Request,
class Response>
200 template <
class Reactor>
202 reactor->InternalBindCall(
this);
206 template <
class Request>
212 virtual void Read(Request* msg) = 0;
216 reactor->InternalBindReader(
this);
220 template <
class Response>
233 reactor->InternalBindWriter(
this);
237 template <
class Request,
class Response>
244 virtual void Read(Request* msg) = 0;
251 reactor->InternalBindStream(
this);
267 template <
class Request,
class Response>
283 stream_.load(std::memory_order_acquire);
284 if (stream ==
nullptr) {
286 stream = stream_.load(std::memory_order_relaxed);
287 if (stream ==
nullptr) {
288 backlog_.send_initial_metadata_wanted =
true;
299 void StartRead(Request* req) ABSL_LOCKS_EXCLUDED(stream_mu_) {
301 stream_.load(std::memory_order_acquire);
302 if (stream ==
nullptr) {
304 stream = stream_.load(std::memory_order_relaxed);
305 if (stream ==
nullptr) {
306 backlog_.read_wanted = req;
329 ABSL_LOCKS_EXCLUDED(stream_mu_) {
331 stream_.load(std::memory_order_acquire);
332 if (stream ==
nullptr) {
334 stream = stream_.load(std::memory_order_relaxed);
335 if (stream ==
nullptr) {
336 backlog_.write_wanted = resp;
337 backlog_.write_options_wanted = options;
341 stream->
Write(resp, options);
360 stream_.load(std::memory_order_acquire);
361 if (stream ==
nullptr) {
363 stream = stream_.load(std::memory_order_relaxed);
364 if (stream ==
nullptr) {
365 backlog_.write_and_finish_wanted =
true;
366 backlog_.write_wanted = resp;
367 backlog_.write_options_wanted = options;
368 backlog_.status_wanted = std::move(s);
395 stream_.load(std::memory_order_acquire);
396 if (stream ==
nullptr) {
398 stream = stream_.load(std::memory_order_relaxed);
399 if (stream ==
nullptr) {
400 backlog_.finish_wanted =
true;
401 backlog_.status_wanted = std::move(s);
405 stream->
Finish(std::move(s));
432 void OnDone()
override = 0;
443 virtual void InternalBindStream(
447 if (
GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) {
451 stream->
Read(backlog_.read_wanted);
455 std::move(backlog_.write_options_wanted),
456 std::move(backlog_.status_wanted));
459 stream->
Write(backlog_.write_wanted,
460 std::move(backlog_.write_options_wanted));
463 stream->
Finish(std::move(backlog_.status_wanted));
467 stream_.store(stream, std::memory_order_release);
475 std::atomic<ServerCallbackReaderWriter<Request, Response>*> stream_{
nullptr};
476 struct PreBindBacklog {
477 bool send_initial_metadata_wanted =
false;
478 bool write_and_finish_wanted =
false;
479 bool finish_wanted =
false;
480 Request* read_wanted =
nullptr;
481 const Response* write_wanted =
nullptr;
485 PreBindBacklog backlog_ ABSL_GUARDED_BY(stream_mu_);
489 template <
class Request>
490 class ServerReadReactor :
public internal::ServerReactor {
498 reader_.load(std::memory_order_acquire);
499 if (reader ==
nullptr) {
501 reader = reader_.load(std::memory_order_relaxed);
502 if (reader ==
nullptr) {
503 backlog_.send_initial_metadata_wanted =
true;
509 void StartRead(Request* req) ABSL_LOCKS_EXCLUDED(reader_mu_) {
511 reader_.load(std::memory_order_acquire);
512 if (reader ==
nullptr) {
514 reader = reader_.load(std::memory_order_relaxed);
515 if (reader ==
nullptr) {
516 backlog_.read_wanted = req;
524 reader_.load(std::memory_order_acquire);
525 if (reader ==
nullptr) {
527 reader = reader_.load(std::memory_order_relaxed);
528 if (reader ==
nullptr) {
529 backlog_.finish_wanted =
true;
530 backlog_.status_wanted = std::move(s);
534 reader->
Finish(std::move(s));
540 void OnDone()
override = 0;
549 ABSL_LOCKS_EXCLUDED(reader_mu_) {
552 if (
GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) {
553 reader->SendInitialMetadata();
556 reader->Read(backlog_.read_wanted);
559 reader->Finish(std::move(backlog_.status_wanted));
562 reader_.store(reader, std::memory_order_release);
566 std::atomic<ServerCallbackReader<Request>*> reader_{
nullptr};
567 struct PreBindBacklog {
568 bool send_initial_metadata_wanted =
false;
569 bool finish_wanted =
false;
570 Request* read_wanted =
nullptr;
573 PreBindBacklog backlog_ ABSL_GUARDED_BY(reader_mu_);
577 template <
class Response>
578 class ServerWriteReactor :
public internal::ServerReactor {
586 writer_.load(std::memory_order_acquire);
587 if (writer ==
nullptr) {
589 writer = writer_.load(std::memory_order_relaxed);
590 if (writer ==
nullptr) {
591 backlog_.send_initial_metadata_wanted =
true;
601 ABSL_LOCKS_EXCLUDED(writer_mu_) {
603 writer_.load(std::memory_order_acquire);
604 if (writer ==
nullptr) {
606 writer = writer_.load(std::memory_order_relaxed);
607 if (writer ==
nullptr) {
608 backlog_.write_wanted = resp;
609 backlog_.write_options_wanted = options;
613 writer->
Write(resp, options);
618 writer_.load(std::memory_order_acquire);
619 if (writer ==
nullptr) {
621 writer = writer_.load(std::memory_order_relaxed);
622 if (writer ==
nullptr) {
623 backlog_.write_and_finish_wanted =
true;
624 backlog_.write_wanted = resp;
625 backlog_.write_options_wanted = options;
626 backlog_.status_wanted = std::move(s);
637 writer_.load(std::memory_order_acquire);
638 if (writer ==
nullptr) {
640 writer = writer_.load(std::memory_order_relaxed);
641 if (writer ==
nullptr) {
642 backlog_.finish_wanted =
true;
643 backlog_.status_wanted = std::move(s);
647 writer->
Finish(std::move(s));
653 void OnDone()
override = 0;
661 ABSL_LOCKS_EXCLUDED(writer_mu_) {
664 if (
GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) {
665 writer->SendInitialMetadata();
668 writer->WriteAndFinish(backlog_.write_wanted,
669 std::move(backlog_.write_options_wanted),
670 std::move(backlog_.status_wanted));
673 writer->Write(backlog_.write_wanted,
674 std::move(backlog_.write_options_wanted));
677 writer->Finish(std::move(backlog_.status_wanted));
681 writer_.store(writer, std::memory_order_release);
685 std::atomic<ServerCallbackWriter<Response>*> writer_{
nullptr};
686 struct PreBindBacklog {
687 bool send_initial_metadata_wanted =
false;
688 bool write_and_finish_wanted =
false;
689 bool finish_wanted =
false;
690 const Response* write_wanted =
nullptr;
694 PreBindBacklog backlog_ ABSL_GUARDED_BY(writer_mu_);
705 if (call ==
nullptr) {
707 call = call_.load(std::memory_order_relaxed);
708 if (call ==
nullptr) {
709 backlog_.send_initial_metadata_wanted =
true;
720 if (call ==
nullptr) {
722 call = call_.load(std::memory_order_relaxed);
723 if (call ==
nullptr) {
724 backlog_.finish_wanted =
true;
725 backlog_.status_wanted = std::move(s);
729 call->
Finish(std::move(s));
734 void OnDone()
override = 0;
742 ABSL_LOCKS_EXCLUDED(call_mu_) {
745 if (
GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) {
746 call->SendInitialMetadata();
749 call->Finish(std::move(backlog_.status_wanted));
752 call_.store(call, std::memory_order_release);
756 std::atomic<ServerCallbackUnary*> call_{
nullptr};
757 struct PreBindBacklog {
758 bool send_initial_metadata_wanted =
false;
759 bool finish_wanted =
false;
762 PreBindBacklog backlog_ ABSL_GUARDED_BY(call_mu_);
767 template <
class Base>
768 class FinishOnlyReactor :
public Base {
775 template <
class Request>
777 template <
class Response>
780 template <
class Request,
class Response>
788 namespace experimental {
790 template <
class Request,
class Response>
797 #endif // GRPCPP_SUPPORT_SERVER_CALLBACK_H
ServerReadReactor is the interface for a client-streaming RPC.
Definition: server_callback.h:182
virtual void SendInitialMetadata()=0
~ServerCallbackReader() override
Definition: server_callback.h:209
void set_response(Response *response)
Definition: message_allocator.h:50
void OnCancel() override
Definition: server_callback.h:541
void StartSendInitialMetadata() ABSL_LOCKS_EXCLUDED(stream_mu_)
Send any initial metadata stored in the RPC context.
Definition: server_callback.h:281
void set_request(Request *request)
Definition: message_allocator.h:49
ServerReadReactor()
Definition: server_callback.h:492
virtual void Finish(grpc::Status s)=0
void StartWrite(const Response *resp)
Definition: server_callback.h:597
virtual void Finish(grpc::Status s)=0
An Alarm posts the user-provided tag to its associated completion queue or invokes the user-provided ...
Definition: alarm.h:33
void StartWriteAndFinish(const Response *resp, grpc::WriteOptions options, grpc::Status s) ABSL_LOCKS_EXCLUDED(stream_mu_)
Initiate a write operation with specified options and final RPC Status, which also causes any trailin...
Definition: server_callback.h:357
WriteOptions & set_last_message()
last-message bit: indicates this is the last message in a stream client-side: makes Write the equival...
Definition: call_op_set.h:155
virtual ~ServerReactor()=default
void MaybeCallOnCancel()
Definition: server_callback.h:117
void StartWrite(const Response *resp, grpc::WriteOptions options) ABSL_LOCKS_EXCLUDED(writer_mu_)
Definition: server_callback.h:600
virtual void OnWriteDone(bool)
Definition: server_callback.h:652
ServerBidiReactor()
Definition: server_callback.h:275
virtual void OnSendInitialMetadataDone(bool)
The following notifications are exactly like ServerBidiReactor.
Definition: server_callback.h:651
virtual void Finish(grpc::Status s)=0
void StartRead(Request *req) ABSL_LOCKS_EXCLUDED(reader_mu_)
Definition: server_callback.h:509
DefaultMessageHolder()
Definition: server_callback.h:163
~ServerCallbackReaderWriter() override
Definition: server_callback.h:240
void BindReactor(Reactor *reactor)
Definition: server_callback.h:201
void MaybeCallOnCancel(ServerReactor *reactor)
Definition: server_callback.h:107
void StartWriteLast(const Response *resp, grpc::WriteOptions options)
Definition: server_callback.h:632
~ServerReadReactor() override=default
Definition: server_callback.h:238
void StartWrite(const Response *resp)
Initiate a write operation.
Definition: server_callback.h:318
void OnCancel() override
Notifies the application that this RPC has been cancelled.
Definition: server_callback.h:437
virtual void Write(const Response *msg, grpc::WriteOptions options)=0
void StartRead(Request *req) ABSL_LOCKS_EXCLUDED(stream_mu_)
Initiate a read operation.
Definition: server_callback.h:299
Did it work? If it didn't, why?
Definition: status.h:35
~ServerCallbackUnary() override
Definition: server_callback.h:193
virtual void OnSendInitialMetadataDone(bool)
The following notifications are exactly like ServerBidiReactor.
Definition: server_callback.h:733
The base class of ServerCallbackUnary etc.
Definition: server_callback.h:73
virtual void OnWriteDone(bool)
Notifies the application that a StartWrite (or StartWriteLast) operation completed.
Definition: server_callback.h:427
void StartWrite(const Response *resp, grpc::WriteOptions options) ABSL_LOCKS_EXCLUDED(stream_mu_)
Initiate a write operation with specified options.
Definition: server_callback.h:328
virtual void WriteAndFinish(const Response *msg, grpc::WriteOptions options, grpc::Status s)=0
void OnDone() override=0
Notifies the application that all operations associated with this RPC have completed.
ServerBidiReactor is the interface for a bidirectional streaming RPC.
Definition: server_callback.h:186
Definition: server_callback_handlers.h:33
virtual void SendInitialMetadata()=0
~ServerBidiReactor() override=default
Definition: server_callback.h:161
FinishOnlyReactor(grpc::Status s)
Definition: server_callback.h:770
virtual void OnReadDone(bool)
Notifies the application that a StartRead operation completed.
Definition: server_callback.h:420
void StartWriteAndFinish(const Response *resp, grpc::WriteOptions options, grpc::Status s) ABSL_LOCKS_EXCLUDED(writer_mu_)
Definition: server_callback.h:615
Definition: server_callback_handlers.h:442
virtual void Read(Request *msg)=0
void Finish(grpc::Status s) ABSL_LOCKS_EXCLUDED(stream_mu_)
Indicate that the stream is to be finished and the trailing metadata and RPC status are to be sent.
Definition: server_callback.h:393
void BindReactor(ServerReadReactor< Request > *reactor)
Definition: server_callback.h:215
void BindReactor(ServerWriteReactor< Response > *reactor)
Definition: server_callback.h:232
Definition: server_context.h:88
~ServerCallbackWriter() override
Definition: server_callback.h:223
void StartSendInitialMetadata() ABSL_LOCKS_EXCLUDED(reader_mu_)
The following operation initiations are exactly like ServerBidiReactor.
Definition: server_callback.h:496
virtual ~ServerCallbackCall()
Definition: server_callback.h:75
Per-message write options.
Definition: call_op_set.h:78
void MaybeDone(bool inline_ondone)
Definition: server_callback.h:99
ServerWriteReactor is the interface for a server-streaming RPC.
Definition: server_callback.h:184
virtual void OnReadDone(bool)
Definition: server_callback.h:539
virtual void Read(Request *msg)=0
Definition: server_callback_handlers.h:668
void OnCancel() override
Definition: server_callback.h:654
~ServerWriteReactor() override=default
void BindReactor(ServerBidiReactor< Request, Response > *reactor)
Definition: server_callback.h:250
Definition: server_callback.h:221
void StartSendInitialMetadata() ABSL_LOCKS_EXCLUDED(writer_mu_)
The following operation initiations are exactly like ServerBidiReactor.
Definition: server_callback.h:584
virtual bool InternalInlineable()
Definition: server_callback.h:59
virtual void SendInitialMetadata()=0
virtual void OnSendInitialMetadataDone(bool)
The following notifications are exactly like ServerBidiReactor.
Definition: server_callback.h:538
void StartWriteLast(const Response *resp, grpc::WriteOptions options)
Inform system of a planned write operation with specified options, but allow the library to schedule ...
Definition: server_callback.h:383
void OnDone() override
Definition: server_callback.h:771
Definition: server_callback.h:49
Definition: server_callback_handlers.h:251
void OnCancel() override
Definition: server_callback.h:735
void Ref()
Increases the reference count.
Definition: server_callback.h:125
virtual void OnCancel()=0
Definition: server_callback.h:207
void Finish(grpc::Status s) ABSL_LOCKS_EXCLUDED(reader_mu_)
Definition: server_callback.h:522
virtual void Write(const Response *msg, grpc::WriteOptions options)=0
Definition: message_allocator.h:39
void StartSendInitialMetadata() ABSL_LOCKS_EXCLUDED(call_mu_)
StartSendInitialMetadata is exactly like ServerBidiReactor.
Definition: server_callback.h:703
void Finish(grpc::Status s) ABSL_LOCKS_EXCLUDED(writer_mu_)
Definition: server_callback.h:635
void Finish(grpc::Status s) ABSL_LOCKS_EXCLUDED(call_mu_)
Finish is similar to ServerBidiReactor except for one detail.
Definition: server_callback.h:718
virtual void OnSendInitialMetadataDone(bool)
Notifies the application that an explicit StartSendInitialMetadata operation completed.
Definition: server_callback.h:414
~ServerUnaryReactor() override=default
Definition: server_callback.h:191
ServerUnaryReactor()
Definition: server_callback.h:699
ServerWriteReactor()
Definition: server_callback.h:580
Definition: server_callback.h:697
virtual void WriteAndFinish(const Response *msg, grpc::WriteOptions options, grpc::Status s)=0
virtual void Finish(grpc::Status s)=0
virtual void SendInitialMetadata()=0
void MaybeDone()
Definition: server_callback.h:93
void Release() override
Definition: server_callback.h:167
::grpc::ServerBidiReactor< Request, Response > ServerBidiReactor
Definition: server_callback.h:791