18 #ifndef GRPCPP_IMPL_SERVER_CALLBACK_HANDLERS_H
19 #define GRPCPP_IMPL_SERVER_CALLBACK_HANDLERS_H
29 #include "absl/log/absl_check.h"
34 template <
class RequestType,
class ResponseType>
39 const RequestType*, ResponseType*)>
41 : get_reactor_(
std::move(get_reactor)) {}
45 allocator_ = allocator;
51 auto* allocator_state =
56 sizeof(ServerCallbackUnaryImpl)))
57 ServerCallbackUnaryImpl(
59 param.call, allocator_state, param.call_requester);
60 param.server_context->BeginCompletionOp(
61 param.call, [call](
bool) { call->MaybeDone(); }, call);
64 if (param.status.ok()) {
65 reactor = grpc::internal::CatchingReactorGetter<ServerUnaryReactor>(
71 if (reactor ==
nullptr) {
80 call->SetupReactor(reactor);
87 RequestType* request =
nullptr;
89 if (allocator_ !=
nullptr) {
96 *handler_data = allocator_state;
97 request = allocator_state->
request();
108 const RequestType*, ResponseType*)>
125 reactor_.load(std::memory_order_relaxed)->InternalInlineable());
128 finish_ops_.set_core_cq_tag(&finish_tag_);
130 if (!ctx_->sent_initial_metadata_) {
132 ctx_->initial_metadata_flags());
133 if (ctx_->compression_level_set()) {
134 finish_ops_.set_compression_level(ctx_->compression_level());
136 ctx_->sent_initial_metadata_ =
true;
140 finish_ops_.ServerSendStatus(
141 &ctx_->trailing_metadata_,
142 finish_ops_.SendMessagePtr(response(), ctx_->memory_allocator()));
144 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
146 finish_ops_.set_core_cq_tag(&finish_tag_);
147 call_.PerformOps(&finish_ops_);
150 void SendInitialMetadata()
override {
151 ABSL_CHECK(!ctx_->sent_initial_metadata_);
161 ServerUnaryReactor* reactor =
162 reactor_.load(std::memory_order_relaxed);
163 reactor->OnSendInitialMetadataDone(ok);
164 this->MaybeDone(true);
167 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
168 ctx_->initial_metadata_flags());
169 if (ctx_->compression_level_set()) {
170 meta_ops_.set_compression_level(ctx_->compression_level());
172 ctx_->sent_initial_metadata_ =
true;
173 meta_ops_.set_core_cq_tag(&meta_tag_);
174 call_.PerformOps(&meta_ops_);
180 ServerCallbackUnaryImpl(
182 MessageHolder<RequestType, ResponseType>* allocator_state,
183 std::function<
void()> call_requester)
186 allocator_state_(allocator_state),
187 call_requester_(
std::move(call_requester)) {
188 ctx_->set_message_allocator_state(allocator_state);
197 void SetupReactor(ServerUnaryReactor* reactor) {
198 reactor_.store(reactor, std::memory_order_relaxed);
201 this->
MaybeDone(reactor->InternalInlineable());
204 const RequestType* request() {
return allocator_state_->request(); }
205 ResponseType* response() {
return allocator_state_->response(); }
207 void CallOnDone()
override {
208 reactor_.load(std::memory_order_relaxed)->OnDone();
210 auto call_requester = std::move(call_requester_);
211 allocator_state_->Release();
212 if (ctx_->context_allocator() !=
nullptr) {
213 ctx_->context_allocator()->Release(ctx_);
215 this->~ServerCallbackUnaryImpl();
220 ServerReactor* reactor()
override {
221 return reactor_.load(std::memory_order_relaxed);
235 MessageHolder<RequestType, ResponseType>*
const allocator_state_;
236 std::function<void()> call_requester_;
247 std::atomic<ServerUnaryReactor*> reactor_;
249 std::atomic<intptr_t> callbacks_outstanding_{
254 template <
class RequestType,
class ResponseType>
261 : get_reactor_(
std::move(get_reactor)) {}
267 sizeof(ServerCallbackReaderImpl)))
268 ServerCallbackReaderImpl(
270 param.call, param.call_requester);
274 param.server_context->BeginCompletionOp(
276 [reader](
bool) { reader->MaybeDone(
false); },
280 if (param.status.ok()) {
282 grpc::internal::CatchingReactorGetter<ServerReadReactor<RequestType>>(
288 if (reactor ==
nullptr) {
296 reader->SetupReactor(reactor);
316 this->MaybeDone(false);
319 if (!ctx_->sent_initial_metadata_) {
321 ctx_->initial_metadata_flags());
322 if (ctx_->compression_level_set()) {
323 finish_ops_.set_compression_level(ctx_->compression_level());
325 ctx_->sent_initial_metadata_ =
true;
329 finish_ops_.ServerSendStatus(
330 &ctx_->trailing_metadata_,
331 finish_ops_.SendMessagePtr(&resp_, ctx_->memory_allocator()));
333 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
335 finish_ops_.set_core_cq_tag(&finish_tag_);
336 call_.PerformOps(&finish_ops_);
339 void SendInitialMetadata()
override {
340 ABSL_CHECK(!ctx_->sent_initial_metadata_);
348 ServerReadReactor<RequestType>* reactor =
349 reactor_.load(std::memory_order_relaxed);
350 reactor->OnSendInitialMetadataDone(ok);
351 this->MaybeDone(true);
354 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
355 ctx_->initial_metadata_flags());
356 if (ctx_->compression_level_set()) {
357 meta_ops_.set_compression_level(ctx_->compression_level());
359 ctx_->sent_initial_metadata_ =
true;
360 meta_ops_.set_core_cq_tag(&meta_tag_);
361 call_.PerformOps(&meta_ops_);
364 void Read(RequestType* req)
override {
366 read_ops_.RecvMessage(req);
367 call_.PerformOps(&read_ops_);
375 std::function<
void()> call_requester)
376 : ctx_(ctx), call_(*call), call_requester_(
std::move(call_requester)) {}
380 void SetupReactor(ServerReadReactor<RequestType>* reactor) {
381 reactor_.store(reactor, std::memory_order_relaxed);
387 [
this, reactor](
bool ok) {
388 if (GPR_UNLIKELY(!ok)) {
389 ctx_->MaybeMarkCancelledOnRead();
391 reactor->OnReadDone(ok);
392 this->MaybeDone(
true);
395 read_ops_.set_core_cq_tag(&read_tag_);
404 ~ServerCallbackReaderImpl() {}
406 ResponseType* response() {
return &resp_; }
408 void CallOnDone()
override {
409 reactor_.load(std::memory_order_relaxed)->OnDone();
411 auto call_requester = std::move(call_requester_);
412 if (ctx_->context_allocator() !=
nullptr) {
413 ctx_->context_allocator()->Release(ctx_);
415 this->~ServerCallbackReaderImpl();
420 ServerReactor* reactor()
override {
421 return reactor_.load(std::memory_order_relaxed);
439 std::function<void()> call_requester_;
441 std::atomic<ServerReadReactor<RequestType>*> reactor_;
443 std::atomic<intptr_t> callbacks_outstanding_{
448 template <
class RequestType,
class ResponseType>
455 : get_reactor_(
std::move(get_reactor)) {}
461 sizeof(ServerCallbackWriterImpl)))
462 ServerCallbackWriterImpl(
464 param.call,
static_cast<RequestType*
>(param.request),
465 param.call_requester);
469 param.server_context->BeginCompletionOp(
471 [writer](
bool) { writer->MaybeDone(
false); },
475 if (param.status.ok()) {
482 if (reactor ==
nullptr) {
490 writer->SetupReactor(reactor);
504 request->~RequestType();
525 this->MaybeDone(false);
528 finish_ops_.set_core_cq_tag(&finish_tag_);
530 if (!ctx_->sent_initial_metadata_) {
532 ctx_->initial_metadata_flags());
533 if (ctx_->compression_level_set()) {
534 finish_ops_.set_compression_level(ctx_->compression_level());
536 ctx_->sent_initial_metadata_ =
true;
538 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
539 call_.PerformOps(&finish_ops_);
542 void SendInitialMetadata()
override {
543 ABSL_CHECK(!ctx_->sent_initial_metadata_);
551 ServerWriteReactor<ResponseType>* reactor =
552 reactor_.load(std::memory_order_relaxed);
553 reactor->OnSendInitialMetadataDone(ok);
554 this->MaybeDone(true);
557 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
558 ctx_->initial_metadata_flags());
559 if (ctx_->compression_level_set()) {
560 meta_ops_.set_compression_level(ctx_->compression_level());
562 ctx_->sent_initial_metadata_ =
true;
563 meta_ops_.set_core_cq_tag(&meta_tag_);
564 call_.PerformOps(&meta_ops_);
572 if (!ctx_->sent_initial_metadata_) {
573 write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
574 ctx_->initial_metadata_flags());
575 if (ctx_->compression_level_set()) {
576 write_ops_.set_compression_level(ctx_->compression_level());
578 ctx_->sent_initial_metadata_ =
true;
582 write_ops_.SendMessagePtr(resp, options, ctx_->memory_allocator())
584 call_.PerformOps(&write_ops_);
592 finish_ops_.SendMessagePtr(resp, options, ctx_->memory_allocator())
594 Finish(std::move(s));
598 friend class CallbackServerStreamingHandler<RequestType, ResponseType>;
602 std::function<
void()> call_requester)
606 call_requester_(
std::move(call_requester)) {}
610 void SetupReactor(ServerWriteReactor<ResponseType>* reactor) {
611 reactor_.store(reactor, std::memory_order_relaxed);
617 [
this, reactor](
bool ok) {
618 reactor->OnWriteDone(ok);
619 this->MaybeDone(true);
622 write_ops_.set_core_cq_tag(&write_tag_);
623 this->BindReactor(reactor);
624 this->MaybeCallOnCancel(reactor);
628 this->MaybeDone(
false);
630 ~ServerCallbackWriterImpl() {
631 if (req_ !=
nullptr) {
632 req_->~RequestType();
636 const RequestType* request() {
return req_; }
638 void CallOnDone()
override {
639 reactor_.load(std::memory_order_relaxed)->OnDone();
641 auto call_requester = std::move(call_requester_);
642 if (ctx_->context_allocator() !=
nullptr) {
643 ctx_->context_allocator()->Release(ctx_);
645 this->~ServerCallbackWriterImpl();
650 ServerReactor* reactor()
override {
651 return reactor_.load(std::memory_order_relaxed);
669 const RequestType* req_;
670 std::function<void()> call_requester_;
672 std::atomic<ServerWriteReactor<ResponseType>*> reactor_;
674 std::atomic<intptr_t> callbacks_outstanding_{
679 template <
class RequestType,
class ResponseType>
686 : get_reactor_(
std::move(get_reactor)) {}
691 param.call->call(),
sizeof(ServerCallbackReaderWriterImpl)))
692 ServerCallbackReaderWriterImpl(
694 param.call, param.call_requester);
698 param.server_context->BeginCompletionOp(
700 [stream](
bool) { stream->MaybeDone(
false); },
704 if (param.status.ok()) {
711 if (reactor ==
nullptr) {
720 stream->SetupReactor(reactor);
724 std::function<ServerBidiReactor<RequestType, ResponseType>*(
728 class ServerCallbackReaderWriterImpl
741 this->MaybeDone(false);
744 finish_ops_.set_core_cq_tag(&finish_tag_);
746 if (!ctx_->sent_initial_metadata_) {
748 ctx_->initial_metadata_flags());
749 if (ctx_->compression_level_set()) {
750 finish_ops_.set_compression_level(ctx_->compression_level());
752 ctx_->sent_initial_metadata_ =
true;
754 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
755 call_.PerformOps(&finish_ops_);
758 void SendInitialMetadata()
override {
759 ABSL_CHECK(!ctx_->sent_initial_metadata_);
767 ServerBidiReactor<RequestType, ResponseType>* reactor =
768 reactor_.load(std::memory_order_relaxed);
769 reactor->OnSendInitialMetadataDone(ok);
770 this->MaybeDone(true);
773 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
774 ctx_->initial_metadata_flags());
775 if (ctx_->compression_level_set()) {
776 meta_ops_.set_compression_level(ctx_->compression_level());
778 ctx_->sent_initial_metadata_ =
true;
779 meta_ops_.set_core_cq_tag(&meta_tag_);
780 call_.PerformOps(&meta_ops_);
788 if (!ctx_->sent_initial_metadata_) {
789 write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
790 ctx_->initial_metadata_flags());
791 if (ctx_->compression_level_set()) {
792 write_ops_.set_compression_level(ctx_->compression_level());
794 ctx_->sent_initial_metadata_ =
true;
798 write_ops_.SendMessagePtr(resp, options, ctx_->memory_allocator())
800 call_.PerformOps(&write_ops_);
807 finish_ops_.SendMessagePtr(resp, options, ctx_->memory_allocator())
809 Finish(std::move(s));
812 void Read(RequestType* req)
override {
814 read_ops_.RecvMessage(req);
815 call_.PerformOps(&read_ops_);
819 friend class CallbackBidiHandler<RequestType, ResponseType>;
823 std::function<
void()> call_requester)
824 : ctx_(ctx), call_(*call), call_requester_(
std::move(call_requester)) {}
828 void SetupReactor(ServerBidiReactor<RequestType, ResponseType>* reactor) {
829 reactor_.store(reactor, std::memory_order_relaxed);
835 [
this, reactor](
bool ok) {
836 reactor->OnWriteDone(ok);
837 this->MaybeDone(true);
840 write_ops_.set_core_cq_tag(&write_tag_);
843 [
this, reactor](
bool ok) {
844 if (GPR_UNLIKELY(!ok)) {
845 ctx_->MaybeMarkCancelledOnRead();
847 reactor->OnReadDone(ok);
848 this->MaybeDone(
true);
851 read_ops_.set_core_cq_tag(&read_tag_);
852 this->BindReactor(reactor);
853 this->MaybeCallOnCancel(reactor);
857 this->MaybeDone(
false);
860 void CallOnDone()
override {
861 reactor_.load(std::memory_order_relaxed)->OnDone();
863 auto call_requester = std::move(call_requester_);
864 if (ctx_->context_allocator() !=
nullptr) {
865 ctx_->context_allocator()->Release(ctx_);
867 this->~ServerCallbackReaderWriterImpl();
872 ServerReactor* reactor()
override {
873 return reactor_.load(std::memory_order_relaxed);
894 std::function<void()> call_requester_;
896 std::atomic<ServerBidiReactor<RequestType, ResponseType>*> reactor_;
898 std::atomic<intptr_t> callbacks_outstanding_{
906 #endif // GRPCPP_IMPL_SERVER_CALLBACK_HANDLERS_H