Go to the documentation of this file.
18 #ifndef GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H
19 #define GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H
25 #include <type_traits>
42 template <
class Request,
class Response>
44 template <
class Request,
class Response>
46 template <
class Request,
class Response>
48 template <
class Request,
class Response>
64 template <
class Request,
class Response>
66 template <
class Request,
class Response>
68 template <
class Request,
class Response>
70 template <
class Request,
class Response>
97 ScheduleOnDone(reactor()->InternalInlineable());
103 ScheduleOnDone(inline_ondone);
111 CallOnCancel(reactor);
121 CallOnCancel(reactor());
127 void Ref() { callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed); }
135 virtual void CallOnDone() = 0;
139 void ScheduleOnDone(
bool inline_ondone);
147 bool UnblockCancellation() {
148 return on_cancel_conditions_remaining_.fetch_sub(
149 1, std::memory_order_acq_rel) == 1;
154 return callbacks_outstanding_.fetch_sub(1, std::memory_order_acq_rel);
157 std::atomic_int on_cancel_conditions_remaining_{2};
158 std::atomic_int callbacks_outstanding_{
162 template <
class Request,
class Response>
175 Request request_obj_;
176 Response response_obj_;
182 class ServerUnaryReactor;
183 template <
class Request>
185 template <
class Response>
187 template <
class Request,
class Response>
202 template <
class Reactor>
204 reactor->InternalBindCall(
this);
208 template <
class Request>
214 virtual void Read(Request* msg) = 0;
218 reactor->InternalBindReader(
this);
222 template <
class Response>
235 reactor->InternalBindWriter(
this);
239 template <
class Request,
class Response>
246 virtual void Read(Request* msg) = 0;
253 reactor->InternalBindStream(
this);
269 template <
class Request,
class Response>
285 stream_.load(std::memory_order_acquire);
286 if (stream ==
nullptr) {
288 stream = stream_.load(std::memory_order_relaxed);
289 if (stream ==
nullptr) {
290 backlog_.send_initial_metadata_wanted =
true;
301 void StartRead(Request* req) ABSL_LOCKS_EXCLUDED(stream_mu_) {
303 stream_.load(std::memory_order_acquire);
304 if (stream ==
nullptr) {
306 stream = stream_.load(std::memory_order_relaxed);
307 if (stream ==
nullptr) {
308 backlog_.read_wanted = req;
331 ABSL_LOCKS_EXCLUDED(stream_mu_) {
333 stream_.load(std::memory_order_acquire);
334 if (stream ==
nullptr) {
336 stream = stream_.load(std::memory_order_relaxed);
337 if (stream ==
nullptr) {
338 backlog_.write_wanted = resp;
339 backlog_.write_options_wanted = options;
343 stream->
Write(resp, options);
362 stream_.load(std::memory_order_acquire);
363 if (stream ==
nullptr) {
365 stream = stream_.load(std::memory_order_relaxed);
366 if (stream ==
nullptr) {
367 backlog_.write_and_finish_wanted =
true;
368 backlog_.write_wanted = resp;
369 backlog_.write_options_wanted = options;
370 backlog_.status_wanted = std::move(s);
397 stream_.load(std::memory_order_acquire);
398 if (stream ==
nullptr) {
400 stream = stream_.load(std::memory_order_relaxed);
401 if (stream ==
nullptr) {
402 backlog_.finish_wanted =
true;
403 backlog_.status_wanted = std::move(s);
407 stream->
Finish(std::move(s));
434 void OnDone()
override = 0;
445 virtual void InternalBindStream(
449 if (
GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) {
453 stream->
Read(backlog_.read_wanted);
457 std::move(backlog_.write_options_wanted),
458 std::move(backlog_.status_wanted));
461 stream->
Write(backlog_.write_wanted,
462 std::move(backlog_.write_options_wanted));
465 stream->
Finish(std::move(backlog_.status_wanted));
469 stream_.store(stream, std::memory_order_release);
477 std::atomic<ServerCallbackReaderWriter<Request, Response>*> stream_{
nullptr};
478 struct PreBindBacklog {
479 bool send_initial_metadata_wanted =
false;
480 bool write_and_finish_wanted =
false;
481 bool finish_wanted =
false;
482 Request* read_wanted =
nullptr;
483 const Response* write_wanted =
nullptr;
487 PreBindBacklog backlog_ ABSL_GUARDED_BY(stream_mu_);
491 template <
class Request>
492 class ServerReadReactor :
public internal::ServerReactor {
500 reader_.load(std::memory_order_acquire);
501 if (reader ==
nullptr) {
503 reader = reader_.load(std::memory_order_relaxed);
504 if (reader ==
nullptr) {
505 backlog_.send_initial_metadata_wanted =
true;
511 void StartRead(Request* req) ABSL_LOCKS_EXCLUDED(reader_mu_) {
513 reader_.load(std::memory_order_acquire);
514 if (reader ==
nullptr) {
516 reader = reader_.load(std::memory_order_relaxed);
517 if (reader ==
nullptr) {
518 backlog_.read_wanted = req;
526 reader_.load(std::memory_order_acquire);
527 if (reader ==
nullptr) {
529 reader = reader_.load(std::memory_order_relaxed);
530 if (reader ==
nullptr) {
531 backlog_.finish_wanted =
true;
532 backlog_.status_wanted = std::move(s);
536 reader->
Finish(std::move(s));
542 void OnDone()
override = 0;
551 ABSL_LOCKS_EXCLUDED(reader_mu_) {
554 if (
GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) {
555 reader->SendInitialMetadata();
558 reader->Read(backlog_.read_wanted);
561 reader->Finish(std::move(backlog_.status_wanted));
564 reader_.store(reader, std::memory_order_release);
568 std::atomic<ServerCallbackReader<Request>*> reader_{
nullptr};
569 struct PreBindBacklog {
570 bool send_initial_metadata_wanted =
false;
571 bool finish_wanted =
false;
572 Request* read_wanted =
nullptr;
575 PreBindBacklog backlog_ ABSL_GUARDED_BY(reader_mu_);
579 template <
class Response>
580 class ServerWriteReactor :
public internal::ServerReactor {
588 writer_.load(std::memory_order_acquire);
589 if (writer ==
nullptr) {
591 writer = writer_.load(std::memory_order_relaxed);
592 if (writer ==
nullptr) {
593 backlog_.send_initial_metadata_wanted =
true;
603 ABSL_LOCKS_EXCLUDED(writer_mu_) {
605 writer_.load(std::memory_order_acquire);
606 if (writer ==
nullptr) {
608 writer = writer_.load(std::memory_order_relaxed);
609 if (writer ==
nullptr) {
610 backlog_.write_wanted = resp;
611 backlog_.write_options_wanted = options;
615 writer->
Write(resp, options);
620 writer_.load(std::memory_order_acquire);
621 if (writer ==
nullptr) {
623 writer = writer_.load(std::memory_order_relaxed);
624 if (writer ==
nullptr) {
625 backlog_.write_and_finish_wanted =
true;
626 backlog_.write_wanted = resp;
627 backlog_.write_options_wanted = options;
628 backlog_.status_wanted = std::move(s);
639 writer_.load(std::memory_order_acquire);
640 if (writer ==
nullptr) {
642 writer = writer_.load(std::memory_order_relaxed);
643 if (writer ==
nullptr) {
644 backlog_.finish_wanted =
true;
645 backlog_.status_wanted = std::move(s);
649 writer->
Finish(std::move(s));
655 void OnDone()
override = 0;
663 ABSL_LOCKS_EXCLUDED(writer_mu_) {
666 if (
GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) {
667 writer->SendInitialMetadata();
670 writer->WriteAndFinish(backlog_.write_wanted,
671 std::move(backlog_.write_options_wanted),
672 std::move(backlog_.status_wanted));
675 writer->Write(backlog_.write_wanted,
676 std::move(backlog_.write_options_wanted));
679 writer->Finish(std::move(backlog_.status_wanted));
683 writer_.store(writer, std::memory_order_release);
687 std::atomic<ServerCallbackWriter<Response>*> writer_{
nullptr};
688 struct PreBindBacklog {
689 bool send_initial_metadata_wanted =
false;
690 bool write_and_finish_wanted =
false;
691 bool finish_wanted =
false;
692 const Response* write_wanted =
nullptr;
696 PreBindBacklog backlog_ ABSL_GUARDED_BY(writer_mu_);
707 if (call ==
nullptr) {
709 call = call_.load(std::memory_order_relaxed);
710 if (call ==
nullptr) {
711 backlog_.send_initial_metadata_wanted =
true;
722 if (call ==
nullptr) {
724 call = call_.load(std::memory_order_relaxed);
725 if (call ==
nullptr) {
726 backlog_.finish_wanted =
true;
727 backlog_.status_wanted = std::move(s);
731 call->
Finish(std::move(s));
736 void OnDone()
override = 0;
744 ABSL_LOCKS_EXCLUDED(call_mu_) {
747 if (
GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) {
748 call->SendInitialMetadata();
751 call->Finish(std::move(backlog_.status_wanted));
754 call_.store(call, std::memory_order_release);
758 std::atomic<ServerCallbackUnary*> call_{
nullptr};
759 struct PreBindBacklog {
760 bool send_initial_metadata_wanted =
false;
761 bool finish_wanted =
false;
764 PreBindBacklog backlog_ ABSL_GUARDED_BY(call_mu_);
769 template <
class Base>
777 template <
class Request>
779 template <
class Response>
782 template <
class Request,
class Response>
790 namespace experimental {
792 template <
class Request,
class Response>
799 #endif // GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H
ServerReadReactor is the interface for a client-streaming RPC.
Definition: server_callback.h:184
virtual void SendInitialMetadata()=0
~ServerCallbackReader() override
Definition: server_callback.h:211
void set_response(Response *response)
Definition: message_allocator.h:52
void OnCancel() override
Definition: server_callback.h:543
void StartSendInitialMetadata() ABSL_LOCKS_EXCLUDED(stream_mu_)
Send any initial metadata stored in the RPC context.
Definition: server_callback.h:283
void set_request(Request *request)
Definition: message_allocator.h:51
ServerReadReactor()
Definition: server_callback.h:494
virtual void Finish(grpc::Status s)=0
void StartWrite(const Response *resp)
Definition: server_callback.h:599
virtual void Finish(grpc::Status s)=0
An Alarm posts the user-provided tag to its associated completion queue or invokes the user-provided ...
Definition: alarm.h:33
void StartWriteAndFinish(const Response *resp, grpc::WriteOptions options, grpc::Status s) ABSL_LOCKS_EXCLUDED(stream_mu_)
Initiate a write operation with specified options and final RPC Status, which also causes any trailin...
Definition: server_callback.h:359
WriteOptions & set_last_message()
last-message bit: indicates this is the last message in a stream client-side: makes Write the equival...
Definition: call_op_set.h:158
virtual ~ServerReactor()=default
void MaybeCallOnCancel()
Definition: server_callback.h:119
void StartWrite(const Response *resp, grpc::WriteOptions options) ABSL_LOCKS_EXCLUDED(writer_mu_)
Definition: server_callback.h:602
virtual void OnWriteDone(bool)
Definition: server_callback.h:654
ServerBidiReactor()
Definition: server_callback.h:277
virtual void OnSendInitialMetadataDone(bool)
The following notifications are exactly like ServerBidiReactor.
Definition: server_callback.h:653
virtual void Finish(grpc::Status s)=0
void StartRead(Request *req) ABSL_LOCKS_EXCLUDED(reader_mu_)
Definition: server_callback.h:511
DefaultMessageHolder()
Definition: server_callback.h:165
~ServerCallbackReaderWriter() override
Definition: server_callback.h:242
void BindReactor(Reactor *reactor)
Definition: server_callback.h:203
void MaybeCallOnCancel(ServerReactor *reactor)
Definition: server_callback.h:109
void StartWriteLast(const Response *resp, grpc::WriteOptions options)
Definition: server_callback.h:634
~ServerReadReactor() override=default
Definition: server_callback.h:240
void StartWrite(const Response *resp)
Initiate a write operation.
Definition: server_callback.h:320
void OnCancel() override
Notifies the application that this RPC has been cancelled.
Definition: server_callback.h:439
virtual void Write(const Response *msg, grpc::WriteOptions options)=0
void StartRead(Request *req) ABSL_LOCKS_EXCLUDED(stream_mu_)
Initiate a read operation.
Definition: server_callback.h:301
Did it work? If it didn't, why?
Definition: status.h:35
~ServerCallbackUnary() override
Definition: server_callback.h:195
virtual void OnSendInitialMetadataDone(bool)
The following notifications are exactly like ServerBidiReactor.
Definition: server_callback.h:735
The base class of ServerCallbackUnary etc.
Definition: server_callback.h:75
virtual void OnWriteDone(bool)
Notifies the application that a StartWrite (or StartWriteLast) operation completed.
Definition: server_callback.h:429
void StartWrite(const Response *resp, grpc::WriteOptions options) ABSL_LOCKS_EXCLUDED(stream_mu_)
Initiate a write operation with specified options.
Definition: server_callback.h:330
virtual void WriteAndFinish(const Response *msg, grpc::WriteOptions options, grpc::Status s)=0
void OnDone() override=0
Notifies the application that all operations associated with this RPC have completed.
ServerBidiReactor is the interface for a bidirectional streaming RPC.
Definition: server_callback.h:188
Definition: server_callback.h:43
virtual void SendInitialMetadata()=0
~ServerBidiReactor() override=default
Definition: server_callback.h:163
FinishOnlyReactor(grpc::Status s)
Definition: server_callback.h:772
virtual void OnReadDone(bool)
Notifies the application that a StartRead operation completed.
Definition: server_callback.h:422
void StartWriteAndFinish(const Response *resp, grpc::WriteOptions options, grpc::Status s) ABSL_LOCKS_EXCLUDED(writer_mu_)
Definition: server_callback.h:617
Definition: server_callback.h:47
virtual void Read(Request *msg)=0
void Finish(grpc::Status s) ABSL_LOCKS_EXCLUDED(stream_mu_)
Indicate that the stream is to be finished and the trailing metadata and RPC status are to be sent.
Definition: server_callback.h:395
void BindReactor(ServerReadReactor< Request > *reactor)
Definition: server_callback.h:217
void BindReactor(ServerWriteReactor< Response > *reactor)
Definition: server_callback.h:234
Definition: server_callback.h:770
~ServerCallbackWriter() override
Definition: server_callback.h:225
void StartSendInitialMetadata() ABSL_LOCKS_EXCLUDED(reader_mu_)
The following operation initiations are exactly like ServerBidiReactor.
Definition: server_callback.h:498
virtual ~ServerCallbackCall()
Definition: server_callback.h:77
Per-message write options.
Definition: call_op_set.h:81
void MaybeDone(bool inline_ondone)
Definition: server_callback.h:101
ServerWriteReactor is the interface for a server-streaming RPC.
Definition: server_callback.h:186
virtual void OnReadDone(bool)
Definition: server_callback.h:541
virtual void Read(Request *msg)=0
Definition: server_callback.h:49
void OnCancel() override
Definition: server_callback.h:656
~ServerWriteReactor() override=default
void BindReactor(ServerBidiReactor< Request, Response > *reactor)
Definition: server_callback.h:252
Definition: server_callback.h:223
void StartSendInitialMetadata() ABSL_LOCKS_EXCLUDED(writer_mu_)
The following operation initiations are exactly like ServerBidiReactor.
Definition: server_callback.h:586
virtual bool InternalInlineable()
Definition: server_callback.h:61
virtual void SendInitialMetadata()=0
virtual void OnSendInitialMetadataDone(bool)
The following notifications are exactly like ServerBidiReactor.
Definition: server_callback.h:540
void StartWriteLast(const Response *resp, grpc::WriteOptions options)
Inform system of a planned write operation with specified options, but allow the library to schedule ...
Definition: server_callback.h:385
void OnDone() override
Definition: server_callback.h:773
Definition: server_callback.h:51
Definition: server_callback.h:45
void OnCancel() override
Definition: server_callback.h:737
void Ref()
Increases the reference count.
Definition: server_callback.h:127
virtual void OnCancel()=0
Definition: server_callback.h:209
void Finish(grpc::Status s) ABSL_LOCKS_EXCLUDED(reader_mu_)
Definition: server_callback.h:524
virtual void Write(const Response *msg, grpc::WriteOptions options)=0
Definition: message_allocator.h:41
void StartSendInitialMetadata() ABSL_LOCKS_EXCLUDED(call_mu_)
StartSendInitialMetadata is exactly like ServerBidiReactor.
Definition: server_callback.h:705
void Finish(grpc::Status s) ABSL_LOCKS_EXCLUDED(writer_mu_)
Definition: server_callback.h:637
void Finish(grpc::Status s) ABSL_LOCKS_EXCLUDED(call_mu_)
Finish is similar to ServerBidiReactor except for one detail.
Definition: server_callback.h:720
virtual void OnSendInitialMetadataDone(bool)
Notifies the application that an explicit StartSendInitialMetadata operation completed.
Definition: server_callback.h:416
~ServerUnaryReactor() override=default
Definition: server_callback.h:193
ServerUnaryReactor()
Definition: server_callback.h:701
ServerWriteReactor()
Definition: server_callback.h:582
Definition: server_callback.h:699
virtual void WriteAndFinish(const Response *msg, grpc::WriteOptions options, grpc::Status s)=0
virtual void Finish(grpc::Status s)=0
virtual void SendInitialMetadata()=0
void MaybeDone()
Definition: server_callback.h:95
void Release() override
Definition: server_callback.h:169
::grpc::ServerBidiReactor< Request, Response > ServerBidiReactor
Definition: server_callback.h:793