18 #ifndef GRPCPP_IMPL_SERVER_CALLBACK_HANDLERS_H
19 #define GRPCPP_IMPL_SERVER_CALLBACK_HANDLERS_H
21 #include "absl/log/absl_check.h"
35 template <
class RequestType,
class ResponseType>
40 const RequestType*, ResponseType*)>
42 : get_reactor_(
std::move(get_reactor)) {}
46 allocator_ = allocator;
52 auto* allocator_state =
57 sizeof(ServerCallbackUnaryImpl)))
58 ServerCallbackUnaryImpl(
60 param.call, allocator_state, param.call_requester);
61 param.server_context->BeginCompletionOp(
62 param.call, [call](
bool) { call->MaybeDone(); }, call);
65 if (param.status.ok()) {
66 reactor = grpc::internal::CatchingReactorGetter<ServerUnaryReactor>(
72 if (reactor ==
nullptr) {
81 call->SetupReactor(reactor);
88 RequestType* request =
nullptr;
90 if (allocator_ !=
nullptr) {
97 *handler_data = allocator_state;
98 request = allocator_state->
request();
110 const RequestType*, ResponseType*)>
127 reactor_.load(std::memory_order_relaxed)->InternalInlineable());
130 finish_ops_.set_core_cq_tag(&finish_tag_);
132 if (!ctx_->sent_initial_metadata_) {
134 ctx_->initial_metadata_flags());
135 if (ctx_->compression_level_set()) {
136 finish_ops_.set_compression_level(ctx_->compression_level());
138 ctx_->sent_initial_metadata_ =
true;
142 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
143 finish_ops_.SendMessagePtr(response()));
145 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
147 finish_ops_.set_core_cq_tag(&finish_tag_);
148 call_.PerformOps(&finish_ops_);
151 void SendInitialMetadata()
override {
152 ABSL_CHECK(!ctx_->sent_initial_metadata_);
162 ServerUnaryReactor* reactor =
163 reactor_.load(std::memory_order_relaxed);
164 reactor->OnSendInitialMetadataDone(ok);
165 this->MaybeDone(true);
168 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
169 ctx_->initial_metadata_flags());
170 if (ctx_->compression_level_set()) {
171 meta_ops_.set_compression_level(ctx_->compression_level());
173 ctx_->sent_initial_metadata_ =
true;
174 meta_ops_.set_core_cq_tag(&meta_tag_);
175 call_.PerformOps(&meta_ops_);
181 ServerCallbackUnaryImpl(
183 MessageHolder<RequestType, ResponseType>* allocator_state,
184 std::function<
void()> call_requester)
187 allocator_state_(allocator_state),
188 call_requester_(
std::move(call_requester)) {
189 ctx_->set_message_allocator_state(allocator_state);
198 void SetupReactor(ServerUnaryReactor* reactor) {
199 reactor_.store(reactor, std::memory_order_relaxed);
202 this->
MaybeDone(reactor->InternalInlineable());
// Returns the request message by forwarding to allocator_state_->request().
205 const RequestType* request() {
return allocator_state_->request(); }
// Returns the response message by forwarding to allocator_state_->response().
// Finish() passes this pointer to finish_ops_.SendMessagePtr().
206 ResponseType* response() {
return allocator_state_->response(); }
208 void CallOnDone()
override {
209 reactor_.load(std::memory_order_relaxed)->OnDone();
211 auto call_requester = std::move(call_requester_);
212 allocator_state_->Release();
213 if (ctx_->context_allocator() !=
nullptr) {
214 ctx_->context_allocator()->Release(ctx_);
216 this->~ServerCallbackUnaryImpl();
221 ServerReactor* reactor()
override {
222 return reactor_.load(std::memory_order_relaxed);
// Holder for the request/response messages. Assigned once from the
// constructor argument (the pointer itself is const) and released in
// CallOnDone().
236 MessageHolder<RequestType, ResponseType>*
const allocator_state_;
// Callback received by the constructor. CallOnDone() moves it into a local
// before this object invokes its own destructor.
237 std::function<void()> call_requester_;
// Reactor bound to this call. Written by SetupReactor() and read by the
// callbacks; every access visible here uses std::memory_order_relaxed.
248 std::atomic<ServerUnaryReactor*> reactor_;
250 std::atomic<intptr_t> callbacks_outstanding_{
255 template <
class RequestType,
class ResponseType>
262 : get_reactor_(
std::move(get_reactor)) {}
268 sizeof(ServerCallbackReaderImpl)))
269 ServerCallbackReaderImpl(
271 param.call, param.call_requester);
275 param.server_context->BeginCompletionOp(
277 [reader](
bool) { reader->MaybeDone(
false); },
281 if (param.status.ok()) {
283 grpc::internal::CatchingReactorGetter<ServerReadReactor<RequestType>>(
289 if (reactor ==
nullptr) {
297 reader->SetupReactor(reactor);
317 this->MaybeDone(false);
320 if (!ctx_->sent_initial_metadata_) {
322 ctx_->initial_metadata_flags());
323 if (ctx_->compression_level_set()) {
324 finish_ops_.set_compression_level(ctx_->compression_level());
326 ctx_->sent_initial_metadata_ =
true;
330 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
331 finish_ops_.SendMessagePtr(&resp_));
333 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
335 finish_ops_.set_core_cq_tag(&finish_tag_);
336 call_.PerformOps(&finish_ops_);
339 void SendInitialMetadata()
override {
340 ABSL_CHECK(!ctx_->sent_initial_metadata_);
348 ServerReadReactor<RequestType>* reactor =
349 reactor_.load(std::memory_order_relaxed);
350 reactor->OnSendInitialMetadataDone(ok);
351 this->MaybeDone(true);
354 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
355 ctx_->initial_metadata_flags());
356 if (ctx_->compression_level_set()) {
357 meta_ops_.set_compression_level(ctx_->compression_level());
359 ctx_->sent_initial_metadata_ =
true;
360 meta_ops_.set_core_cq_tag(&meta_tag_);
361 call_.PerformOps(&meta_ops_);
364 void Read(RequestType* req)
override {
366 read_ops_.RecvMessage(req);
367 call_.PerformOps(&read_ops_);
375 std::function<
void()> call_requester)
376 : ctx_(ctx), call_(*call), call_requester_(
std::move(call_requester)) {}
380 void SetupReactor(ServerReadReactor<RequestType>* reactor) {
381 reactor_.store(reactor, std::memory_order_relaxed);
387 [
this, reactor](
bool ok) {
388 if (GPR_UNLIKELY(!ok)) {
389 ctx_->MaybeMarkCancelledOnRead();
391 reactor->OnReadDone(ok);
392 this->MaybeDone(
true);
395 read_ops_.set_core_cq_tag(&read_tag_);
// Empty destructor body. CallOnDone() invokes this destructor explicitly via
// this->~ServerCallbackReaderImpl().
404 ~ServerCallbackReaderImpl() {}
// Returns the address of the resp_ member, i.e. the response message is
// stored inside this object. Finish() sends it via
// finish_ops_.SendMessagePtr(&resp_).
406 ResponseType* response() {
return &resp_; }
408 void CallOnDone()
override {
409 reactor_.load(std::memory_order_relaxed)->OnDone();
411 auto call_requester = std::move(call_requester_);
412 if (ctx_->context_allocator() !=
nullptr) {
413 ctx_->context_allocator()->Release(ctx_);
415 this->~ServerCallbackReaderImpl();
420 ServerReactor* reactor()
override {
421 return reactor_.load(std::memory_order_relaxed);
// Callback received by the constructor. CallOnDone() moves it into a local
// before this object invokes its own destructor.
439 std::function<void()> call_requester_;
// Reactor bound to this call. Written by SetupReactor() and read by the
// callbacks; every access visible here uses std::memory_order_relaxed.
441 std::atomic<ServerReadReactor<RequestType>*> reactor_;
443 std::atomic<intptr_t> callbacks_outstanding_{
448 template <
class RequestType,
class ResponseType>
455 : get_reactor_(
std::move(get_reactor)) {}
461 sizeof(ServerCallbackWriterImpl)))
462 ServerCallbackWriterImpl(
464 param.call,
static_cast<RequestType*
>(param.request),
465 param.call_requester);
469 param.server_context->BeginCompletionOp(
471 [writer](
bool) { writer->MaybeDone(
false); },
475 if (param.status.ok()) {
482 if (reactor ==
nullptr) {
490 writer->SetupReactor(reactor);
505 request->~RequestType();
526 this->MaybeDone(false);
529 finish_ops_.set_core_cq_tag(&finish_tag_);
531 if (!ctx_->sent_initial_metadata_) {
533 ctx_->initial_metadata_flags());
534 if (ctx_->compression_level_set()) {
535 finish_ops_.set_compression_level(ctx_->compression_level());
537 ctx_->sent_initial_metadata_ =
true;
539 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
540 call_.PerformOps(&finish_ops_);
543 void SendInitialMetadata()
override {
544 ABSL_CHECK(!ctx_->sent_initial_metadata_);
552 ServerWriteReactor<ResponseType>* reactor =
553 reactor_.load(std::memory_order_relaxed);
554 reactor->OnSendInitialMetadataDone(ok);
555 this->MaybeDone(true);
558 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
559 ctx_->initial_metadata_flags());
560 if (ctx_->compression_level_set()) {
561 meta_ops_.set_compression_level(ctx_->compression_level());
563 ctx_->sent_initial_metadata_ =
true;
564 meta_ops_.set_core_cq_tag(&meta_tag_);
565 call_.PerformOps(&meta_ops_);
573 if (!ctx_->sent_initial_metadata_) {
574 write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
575 ctx_->initial_metadata_flags());
576 if (ctx_->compression_level_set()) {
577 write_ops_.set_compression_level(ctx_->compression_level());
579 ctx_->sent_initial_metadata_ =
true;
582 ABSL_CHECK(write_ops_.SendMessagePtr(resp, options).ok());
583 call_.PerformOps(&write_ops_);
590 ABSL_CHECK(finish_ops_.SendMessagePtr(resp, options).ok());
591 Finish(std::move(s));
// Gives the matching CallbackServerStreamingHandler instantiation access to
// this class's non-public members.
595 friend class CallbackServerStreamingHandler<RequestType, ResponseType>;
599 std::function<
void()> call_requester)
603 call_requester_(
std::move(call_requester)) {}
607 void SetupReactor(ServerWriteReactor<ResponseType>* reactor) {
608 reactor_.store(reactor, std::memory_order_relaxed);
614 [
this, reactor](
bool ok) {
615 reactor->OnWriteDone(ok);
616 this->MaybeDone(true);
619 write_ops_.set_core_cq_tag(&write_tag_);
620 this->BindReactor(reactor);
621 this->MaybeCallOnCancel(reactor);
625 this->MaybeDone(
false);
627 ~ServerCallbackWriterImpl() {
628 if (req_ !=
nullptr) {
629 req_->~RequestType();
// Returns the stored request pointer req_. The destructor null-checks req_
// before destroying it, so callers should presumably tolerate nullptr here
// (NOTE(review): confirm against the handler's error path).
633 const RequestType* request() {
return req_; }
635 void CallOnDone()
override {
636 reactor_.load(std::memory_order_relaxed)->OnDone();
638 auto call_requester = std::move(call_requester_);
639 if (ctx_->context_allocator() !=
nullptr) {
640 ctx_->context_allocator()->Release(ctx_);
642 this->~ServerCallbackWriterImpl();
647 ServerReactor* reactor()
override {
648 return reactor_.load(std::memory_order_relaxed);
// Request message for this call. When non-null, the destructor runs
// ~RequestType() on it explicitly (it does not call delete).
666 const RequestType* req_;
// Callback received by the constructor. CallOnDone() moves it into a local
// before this object invokes its own destructor.
667 std::function<void()> call_requester_;
// Reactor bound to this call. Written by SetupReactor() and read by the
// callbacks; every access visible here uses std::memory_order_relaxed.
669 std::atomic<ServerWriteReactor<ResponseType>*> reactor_;
671 std::atomic<intptr_t> callbacks_outstanding_{
676 template <
class RequestType,
class ResponseType>
683 : get_reactor_(
std::move(get_reactor)) {}
688 param.call->call(),
sizeof(ServerCallbackReaderWriterImpl)))
689 ServerCallbackReaderWriterImpl(
691 param.call, param.call_requester);
695 param.server_context->BeginCompletionOp(
697 [stream](
bool) { stream->MaybeDone(
false); },
701 if (param.status.ok()) {
708 if (reactor ==
nullptr) {
717 stream->SetupReactor(reactor);
721 std::function<ServerBidiReactor<RequestType, ResponseType>*(
725 class ServerCallbackReaderWriterImpl
738 this->MaybeDone(false);
741 finish_ops_.set_core_cq_tag(&finish_tag_);
743 if (!ctx_->sent_initial_metadata_) {
745 ctx_->initial_metadata_flags());
746 if (ctx_->compression_level_set()) {
747 finish_ops_.set_compression_level(ctx_->compression_level());
749 ctx_->sent_initial_metadata_ =
true;
751 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
752 call_.PerformOps(&finish_ops_);
755 void SendInitialMetadata()
override {
756 ABSL_CHECK(!ctx_->sent_initial_metadata_);
764 ServerBidiReactor<RequestType, ResponseType>* reactor =
765 reactor_.load(std::memory_order_relaxed);
766 reactor->OnSendInitialMetadataDone(ok);
767 this->MaybeDone(true);
770 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
771 ctx_->initial_metadata_flags());
772 if (ctx_->compression_level_set()) {
773 meta_ops_.set_compression_level(ctx_->compression_level());
775 ctx_->sent_initial_metadata_ =
true;
776 meta_ops_.set_core_cq_tag(&meta_tag_);
777 call_.PerformOps(&meta_ops_);
785 if (!ctx_->sent_initial_metadata_) {
786 write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
787 ctx_->initial_metadata_flags());
788 if (ctx_->compression_level_set()) {
789 write_ops_.set_compression_level(ctx_->compression_level());
791 ctx_->sent_initial_metadata_ =
true;
794 ABSL_CHECK(write_ops_.SendMessagePtr(resp, options).ok());
795 call_.PerformOps(&write_ops_);
801 ABSL_CHECK(finish_ops_.SendMessagePtr(resp, options).ok());
802 Finish(std::move(s));
805 void Read(RequestType* req)
override {
807 read_ops_.RecvMessage(req);
808 call_.PerformOps(&read_ops_);
// Gives the matching CallbackBidiHandler instantiation access to this
// class's non-public members.
812 friend class CallbackBidiHandler<RequestType, ResponseType>;
816 std::function<
void()> call_requester)
817 : ctx_(ctx), call_(*call), call_requester_(
std::move(call_requester)) {}
821 void SetupReactor(ServerBidiReactor<RequestType, ResponseType>* reactor) {
822 reactor_.store(reactor, std::memory_order_relaxed);
828 [
this, reactor](
bool ok) {
829 reactor->OnWriteDone(ok);
830 this->MaybeDone(true);
833 write_ops_.set_core_cq_tag(&write_tag_);
836 [
this, reactor](
bool ok) {
837 if (GPR_UNLIKELY(!ok)) {
838 ctx_->MaybeMarkCancelledOnRead();
840 reactor->OnReadDone(ok);
841 this->MaybeDone(
true);
844 read_ops_.set_core_cq_tag(&read_tag_);
845 this->BindReactor(reactor);
846 this->MaybeCallOnCancel(reactor);
850 this->MaybeDone(
false);
853 void CallOnDone()
override {
854 reactor_.load(std::memory_order_relaxed)->OnDone();
856 auto call_requester = std::move(call_requester_);
857 if (ctx_->context_allocator() !=
nullptr) {
858 ctx_->context_allocator()->Release(ctx_);
860 this->~ServerCallbackReaderWriterImpl();
865 ServerReactor* reactor()
override {
866 return reactor_.load(std::memory_order_relaxed);
// Callback received by the constructor. CallOnDone() moves it into a local
// before this object invokes its own destructor.
887 std::function<void()> call_requester_;
// Reactor bound to this call. Written by SetupReactor() and read by the
// read/write callbacks; every access visible here uses
// std::memory_order_relaxed.
889 std::atomic<ServerBidiReactor<RequestType, ResponseType>*> reactor_;
891 std::atomic<intptr_t> callbacks_outstanding_{
899 #endif // GRPCPP_IMPL_SERVER_CALLBACK_HANDLERS_H