18 #ifndef GRPCPP_IMPL_SERVER_CALLBACK_HANDLERS_H
19 #define GRPCPP_IMPL_SERVER_CALLBACK_HANDLERS_H
31 #include "absl/log/absl_check.h"
36 template <
class RequestType,
class ResponseType>
41 const RequestType*, ResponseType*)>
43 : get_reactor_(
std::move(get_reactor)) {}
47 allocator_ = allocator;
53 auto* allocator_state =
58 sizeof(ServerCallbackUnaryImpl)))
59 ServerCallbackUnaryImpl(
61 param.call, allocator_state, param.call_requester);
62 param.server_context->BeginCompletionOp(
63 param.call, [call](
bool) { call->MaybeDone(); }, call);
66 if (param.status.ok()) {
67 reactor = grpc::internal::CatchingReactorGetter<ServerUnaryReactor>(
73 if (reactor ==
nullptr) {
82 call->SetupReactor(reactor);
89 RequestType* request =
nullptr;
91 if (allocator_ !=
nullptr) {
98 *handler_data = allocator_state;
99 request = allocator_state->
request();
110 const RequestType*, ResponseType*)>
127 reactor_.load(std::memory_order_relaxed)->InternalInlineable());
130 finish_ops_.set_core_cq_tag(&finish_tag_);
132 if (!ctx_->sent_initial_metadata_) {
134 ctx_->initial_metadata_flags());
135 if (ctx_->compression_level_set()) {
136 finish_ops_.set_compression_level(ctx_->compression_level());
138 ctx_->sent_initial_metadata_ =
true;
142 finish_ops_.ServerSendStatus(
143 &ctx_->trailing_metadata_,
144 finish_ops_.SendMessagePtr(response(), ctx_->memory_allocator()));
146 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
148 finish_ops_.set_core_cq_tag(&finish_tag_);
149 finish_ops_.FillOps(&call_);
152 void SendInitialMetadata()
override {
153 ABSL_CHECK(!ctx_->sent_initial_metadata_);
163 ServerUnaryReactor* reactor =
164 reactor_.load(std::memory_order_relaxed);
165 reactor->OnSendInitialMetadataDone(ok);
166 this->MaybeDone(true);
169 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
170 ctx_->initial_metadata_flags());
171 if (ctx_->compression_level_set()) {
172 meta_ops_.set_compression_level(ctx_->compression_level());
174 ctx_->sent_initial_metadata_ =
true;
175 meta_ops_.set_core_cq_tag(&meta_tag_);
176 meta_ops_.FillOps(&call_);
182 ServerCallbackUnaryImpl(
184 MessageHolder<RequestType, ResponseType>* allocator_state,
185 std::function<
void()> call_requester)
188 allocator_state_(allocator_state),
189 call_requester_(
std::move(call_requester)) {
190 ctx_->set_message_allocator_state(allocator_state);
199 void SetupReactor(ServerUnaryReactor* reactor) {
200 reactor_.store(reactor, std::memory_order_relaxed);
203 this->
MaybeDone(reactor->InternalInlineable());
206 const RequestType* request() {
return allocator_state_->request(); }
207 ResponseType* response() {
return allocator_state_->response(); }
209 void CallOnDone()
override {
210 reactor_.load(std::memory_order_relaxed)->OnDone();
212 auto call_requester = std::move(call_requester_);
213 allocator_state_->Release();
214 if (ctx_->context_allocator() !=
nullptr) {
215 ctx_->context_allocator()->Release(ctx_);
217 this->~ServerCallbackUnaryImpl();
222 ServerReactor* reactor()
override {
223 return reactor_.load(std::memory_order_relaxed);
237 MessageHolder<RequestType, ResponseType>*
const allocator_state_;
238 std::function<void()> call_requester_;
249 std::atomic<ServerUnaryReactor*> reactor_;
251 std::atomic<intptr_t> callbacks_outstanding_{
256 template <
class RequestType,
class ResponseType>
263 : get_reactor_(
std::move(get_reactor)) {}
269 sizeof(ServerCallbackReaderImpl)))
270 ServerCallbackReaderImpl(
272 param.call, param.call_requester);
276 param.server_context->BeginCompletionOp(
278 [reader](
bool) { reader->MaybeDone(
false); },
282 if (param.status.ok()) {
284 grpc::internal::CatchingReactorGetter<ServerReadReactor<RequestType>>(
290 if (reactor ==
nullptr) {
298 reader->SetupReactor(reactor);
318 this->MaybeDone(false);
321 if (!ctx_->sent_initial_metadata_) {
323 ctx_->initial_metadata_flags());
324 if (ctx_->compression_level_set()) {
325 finish_ops_.set_compression_level(ctx_->compression_level());
327 ctx_->sent_initial_metadata_ =
true;
331 finish_ops_.ServerSendStatus(
332 &ctx_->trailing_metadata_,
333 finish_ops_.SendMessagePtr(&resp_, ctx_->memory_allocator()));
335 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
337 finish_ops_.set_core_cq_tag(&finish_tag_);
338 finish_ops_.FillOps(&call_);
341 void SendInitialMetadata()
override {
342 ABSL_CHECK(!ctx_->sent_initial_metadata_);
350 ServerReadReactor<RequestType>* reactor =
351 reactor_.load(std::memory_order_relaxed);
352 reactor->OnSendInitialMetadataDone(ok);
353 this->MaybeDone(true);
356 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
357 ctx_->initial_metadata_flags());
358 if (ctx_->compression_level_set()) {
359 meta_ops_.set_compression_level(ctx_->compression_level());
361 ctx_->sent_initial_metadata_ =
true;
362 meta_ops_.set_core_cq_tag(&meta_tag_);
363 meta_ops_.FillOps(&call_);
366 void Read(RequestType* req)
override {
368 read_ops_.RecvMessage(req);
369 read_ops_.FillOps(&call_);
377 std::function<
void()> call_requester)
378 : ctx_(ctx), call_(*call), call_requester_(
std::move(call_requester)) {}
382 void SetupReactor(ServerReadReactor<RequestType>* reactor) {
383 reactor_.store(reactor, std::memory_order_relaxed);
389 [
this, reactor](
bool ok) {
390 if (GPR_UNLIKELY(!ok)) {
391 ctx_->MaybeMarkCancelledOnRead();
393 reactor->OnReadDone(ok);
394 this->MaybeDone(
true);
397 read_ops_.set_core_cq_tag(&read_tag_);
406 ~ServerCallbackReaderImpl() {}
408 ResponseType* response() {
return &resp_; }
410 void CallOnDone()
override {
411 reactor_.load(std::memory_order_relaxed)->OnDone();
413 auto call_requester = std::move(call_requester_);
414 if (ctx_->context_allocator() !=
nullptr) {
415 ctx_->context_allocator()->Release(ctx_);
417 this->~ServerCallbackReaderImpl();
422 ServerReactor* reactor()
override {
423 return reactor_.load(std::memory_order_relaxed);
441 std::function<void()> call_requester_;
443 std::atomic<ServerReadReactor<RequestType>*> reactor_;
445 std::atomic<intptr_t> callbacks_outstanding_{
450 template <
class RequestType,
class ResponseType>
457 : get_reactor_(
std::move(get_reactor)) {}
463 sizeof(ServerCallbackWriterImpl)))
464 ServerCallbackWriterImpl(
466 param.call,
static_cast<RequestType*
>(param.request),
467 param.call_requester);
471 param.server_context->BeginCompletionOp(
473 [writer](
bool) { writer->MaybeDone(
false); },
477 if (param.status.ok()) {
484 if (reactor ==
nullptr) {
492 writer->SetupReactor(reactor);
506 request->~RequestType();
527 this->MaybeDone(false);
530 finish_ops_.set_core_cq_tag(&finish_tag_);
532 if (!ctx_->sent_initial_metadata_) {
534 ctx_->initial_metadata_flags());
535 if (ctx_->compression_level_set()) {
536 finish_ops_.set_compression_level(ctx_->compression_level());
538 ctx_->sent_initial_metadata_ =
true;
540 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
541 finish_ops_.FillOps(&call_);
544 void SendInitialMetadata()
override {
545 ABSL_CHECK(!ctx_->sent_initial_metadata_);
553 ServerWriteReactor<ResponseType>* reactor =
554 reactor_.load(std::memory_order_relaxed);
555 reactor->OnSendInitialMetadataDone(ok);
556 this->MaybeDone(true);
559 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
560 ctx_->initial_metadata_flags());
561 if (ctx_->compression_level_set()) {
562 meta_ops_.set_compression_level(ctx_->compression_level());
564 ctx_->sent_initial_metadata_ =
true;
565 meta_ops_.set_core_cq_tag(&meta_tag_);
566 meta_ops_.FillOps(&call_);
574 if (!ctx_->sent_initial_metadata_) {
575 write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
576 ctx_->initial_metadata_flags());
577 if (ctx_->compression_level_set()) {
578 write_ops_.set_compression_level(ctx_->compression_level());
580 ctx_->sent_initial_metadata_ =
true;
584 write_ops_.SendMessagePtr(resp, options, ctx_->memory_allocator())
586 write_ops_.FillOps(&call_);
594 finish_ops_.SendMessagePtr(resp, options, ctx_->memory_allocator())
596 Finish(std::move(s));
600 friend class CallbackServerStreamingHandler<RequestType, ResponseType>;
604 std::function<
void()> call_requester)
608 call_requester_(
std::move(call_requester)) {}
612 void SetupReactor(ServerWriteReactor<ResponseType>* reactor) {
613 reactor_.store(reactor, std::memory_order_relaxed);
619 [
this, reactor](
bool ok) {
620 reactor->OnWriteDone(ok);
621 this->MaybeDone(true);
624 write_ops_.set_core_cq_tag(&write_tag_);
625 this->BindReactor(reactor);
626 this->MaybeCallOnCancel(reactor);
630 this->MaybeDone(
false);
632 ~ServerCallbackWriterImpl() {
633 if (req_ !=
nullptr) {
634 req_->~RequestType();
638 const RequestType* request() {
return req_; }
640 void CallOnDone()
override {
641 reactor_.load(std::memory_order_relaxed)->OnDone();
643 auto call_requester = std::move(call_requester_);
644 if (ctx_->context_allocator() !=
nullptr) {
645 ctx_->context_allocator()->Release(ctx_);
647 this->~ServerCallbackWriterImpl();
652 ServerReactor* reactor()
override {
653 return reactor_.load(std::memory_order_relaxed);
671 const RequestType* req_;
672 std::function<void()> call_requester_;
674 std::atomic<ServerWriteReactor<ResponseType>*> reactor_;
676 std::atomic<intptr_t> callbacks_outstanding_{
681 template <
class RequestType,
class ResponseType>
688 : get_reactor_(
std::move(get_reactor)) {}
693 param.call->call(),
sizeof(ServerCallbackReaderWriterImpl)))
694 ServerCallbackReaderWriterImpl(
696 param.call, param.call_requester);
700 param.server_context->BeginCompletionOp(
702 [stream](
bool) { stream->MaybeDone(
false); },
706 if (param.status.ok()) {
713 if (reactor ==
nullptr) {
722 stream->SetupReactor(reactor);
726 std::function<ServerBidiReactor<RequestType, ResponseType>*(
730 class ServerCallbackReaderWriterImpl
743 this->MaybeDone(false);
746 finish_ops_.set_core_cq_tag(&finish_tag_);
748 if (!ctx_->sent_initial_metadata_) {
750 ctx_->initial_metadata_flags());
751 if (ctx_->compression_level_set()) {
752 finish_ops_.set_compression_level(ctx_->compression_level());
754 ctx_->sent_initial_metadata_ =
true;
756 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
757 finish_ops_.FillOps(&call_);
760 void SendInitialMetadata()
override {
761 ABSL_CHECK(!ctx_->sent_initial_metadata_);
769 ServerBidiReactor<RequestType, ResponseType>* reactor =
770 reactor_.load(std::memory_order_relaxed);
771 reactor->OnSendInitialMetadataDone(ok);
772 this->MaybeDone(true);
775 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
776 ctx_->initial_metadata_flags());
777 if (ctx_->compression_level_set()) {
778 meta_ops_.set_compression_level(ctx_->compression_level());
780 ctx_->sent_initial_metadata_ =
true;
781 meta_ops_.set_core_cq_tag(&meta_tag_);
782 meta_ops_.FillOps(&call_);
790 if (!ctx_->sent_initial_metadata_) {
791 write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
792 ctx_->initial_metadata_flags());
793 if (ctx_->compression_level_set()) {
794 write_ops_.set_compression_level(ctx_->compression_level());
796 ctx_->sent_initial_metadata_ =
true;
800 write_ops_.SendMessagePtr(resp, options, ctx_->memory_allocator())
802 write_ops_.FillOps(&call_);
809 finish_ops_.SendMessagePtr(resp, options, ctx_->memory_allocator())
811 Finish(std::move(s));
814 void Read(RequestType* req)
override {
816 read_ops_.RecvMessage(req);
817 read_ops_.FillOps(&call_);
821 friend class CallbackBidiHandler<RequestType, ResponseType>;
825 std::function<
void()> call_requester)
826 : ctx_(ctx), call_(*call), call_requester_(
std::move(call_requester)) {}
830 void SetupReactor(ServerBidiReactor<RequestType, ResponseType>* reactor) {
831 reactor_.store(reactor, std::memory_order_relaxed);
837 [
this, reactor](
bool ok) {
838 reactor->OnWriteDone(ok);
839 this->MaybeDone(true);
842 write_ops_.set_core_cq_tag(&write_tag_);
845 [
this, reactor](
bool ok) {
846 if (GPR_UNLIKELY(!ok)) {
847 ctx_->MaybeMarkCancelledOnRead();
849 reactor->OnReadDone(ok);
850 this->MaybeDone(
true);
853 read_ops_.set_core_cq_tag(&read_tag_);
854 this->BindReactor(reactor);
855 this->MaybeCallOnCancel(reactor);
859 this->MaybeDone(
false);
862 void CallOnDone()
override {
863 reactor_.load(std::memory_order_relaxed)->OnDone();
865 auto call_requester = std::move(call_requester_);
866 if (ctx_->context_allocator() !=
nullptr) {
867 ctx_->context_allocator()->Release(ctx_);
869 this->~ServerCallbackReaderWriterImpl();
874 ServerReactor* reactor()
override {
875 return reactor_.load(std::memory_order_relaxed);
896 std::function<void()> call_requester_;
898 std::atomic<ServerBidiReactor<RequestType, ResponseType>*> reactor_;
900 std::atomic<intptr_t> callbacks_outstanding_{
907 namespace experimental {
910 template <
class RequestType>
918 : get_reactor_(
std::move(get_reactor)), service_(service) {
919 ABSL_CHECK(service_ !=
nullptr && service_->is_virtual_service_);
925 auto* allocator_state =
927 param.internal_data);
930 inner_server =
static_cast<grpc::Server*
>(service_->server_);
931 ABSL_CHECK(inner_server !=
nullptr);
934 sizeof(ServerCallbackSessionImpl)))
935 ServerCallbackSessionImpl(
937 param.call, allocator_state, param.call_requester, inner_server);
939 param.server_context->BeginCompletionOp(
940 param.call, [call](
bool) { call->MaybeDone(); }, call);
943 if (param.status.ok()) {
951 if (reactor ==
nullptr) {
960 call->SetupReactor(reactor);
967 RequestType* request =
nullptr;
973 *handler_data = allocator_state;
974 request = allocator_state->
request();
989 class ServerCallbackSessionImpl
993 if (ctx_->IsCancelled()) {
995 reactor_.load(std::memory_order_relaxed)->InternalInlineable());
1008 reactor_.load(std::memory_order_relaxed)->InternalInlineable());
1010 &finish_ops_,
true);
1011 finish_ops_.set_core_cq_tag(&finish_tag_);
1013 bool is_first_metadata = !ctx_->sent_initial_metadata_;
1014 if (is_first_metadata) {
1015 finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
1016 ctx_->initial_metadata_flags());
1017 if (ctx_->compression_level_set()) {
1018 finish_ops_.set_compression_level(ctx_->compression_level());
1020 ctx_->sent_initial_metadata_ =
true;
1022 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
1023 finish_ops_.set_core_cq_tag(&finish_tag_);
1024 finish_ops_.FillOps(&call_);
1027 void SendInitialMetadata()
override {
1028 ABSL_CHECK(!ctx_->sent_initial_metadata_);
1038 grpc::experimental::ServerSessionReactor* reactor =
1039 reactor_.load(std::memory_order_relaxed);
1040 reactor->OnSendInitialMetadataDone(ok);
1041 this->MaybeDone(true);
1044 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
1045 ctx_->initial_metadata_flags());
1046 if (ctx_->compression_level_set()) {
1047 meta_ops_.set_compression_level(ctx_->compression_level());
1049 ctx_->sent_initial_metadata_ =
true;
1050 meta_ops_.set_core_cq_tag(&meta_tag_);
1051 meta_ops_.FillOps(&call_);
1055 BindInnerServer(inner_server_);
1058 void BindInnerServer(
grpc::Server* inner_server)
override {
1064 friend class CallbackSessionHandler<RequestType>;
1066 ServerCallbackSessionImpl(
1068 MessageHolder<RequestType, grpc::ByteBuffer>* allocator_state,
1069 std::function<
void()> call_requester,
grpc::Server* inner_server)
1072 allocator_state_(allocator_state),
1073 call_requester_(
std::move(call_requester)),
1074 inner_server_(inner_server) {
1075 ABSL_CHECK(inner_server_ !=
nullptr);
1076 ctx_->set_message_allocator_state(allocator_state);
1086 reactor_.store(reactor, std::memory_order_relaxed);
1087 this->BindReactor(reactor);
1088 this->MaybeCallOnCancel(reactor);
1092 const RequestType* request() {
return allocator_state_->request(); }
1094 void CallOnDone()
override {
1095 reactor_.load(std::memory_order_relaxed)->OnDone();
1097 auto call_requester = std::move(call_requester_);
1098 allocator_state_->Release();
1099 if (ctx_->context_allocator() !=
nullptr) {
1100 ctx_->context_allocator()->Release(ctx_);
1102 this->~ServerCallbackSessionImpl();
1108 return reactor_.load(std::memory_order_relaxed);
1121 MessageHolder<RequestType, grpc::ByteBuffer>*
const allocator_state_;
1122 std::function<void()> call_requester_;
1125 std::atomic<grpc::experimental::ServerSessionReactor*> reactor_;
1127 std::atomic<intptr_t> callbacks_outstanding_{
1136 #endif // GRPCPP_IMPL_SERVER_CALLBACK_HANDLERS_H