18 #ifndef GRPCPP_IMPL_SERVER_CALLBACK_HANDLERS_H
19 #define GRPCPP_IMPL_SERVER_CALLBACK_HANDLERS_H
32 template <
class RequestType,
class ResponseType>
37 const RequestType*, ResponseType*)>
39 : get_reactor_(
std::move(get_reactor)) {}
43 allocator_ = allocator;
49 auto* allocator_state =
54 sizeof(ServerCallbackUnaryImpl)))
55 ServerCallbackUnaryImpl(
57 param.call, allocator_state, param.call_requester);
58 param.server_context->BeginCompletionOp(
59 param.call, [call](
bool) { call->MaybeDone(); }, call);
62 if (param.status.ok()) {
63 reactor = grpc::internal::CatchingReactorGetter<ServerUnaryReactor>(
69 if (reactor ==
nullptr) {
78 call->SetupReactor(reactor);
85 RequestType* request =
nullptr;
87 if (allocator_ !=
nullptr) {
94 *handler_data = allocator_state;
95 request = allocator_state->
request();
107 const RequestType*, ResponseType*)>
124 reactor_.load(std::memory_order_relaxed)->InternalInlineable());
127 finish_ops_.set_core_cq_tag(&finish_tag_);
129 if (!ctx_->sent_initial_metadata_) {
131 ctx_->initial_metadata_flags());
132 if (ctx_->compression_level_set()) {
133 finish_ops_.set_compression_level(ctx_->compression_level());
135 ctx_->sent_initial_metadata_ =
true;
139 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
140 finish_ops_.SendMessagePtr(response()));
142 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
144 finish_ops_.set_core_cq_tag(&finish_tag_);
145 call_.PerformOps(&finish_ops_);
148 void SendInitialMetadata()
override {
159 ServerUnaryReactor* reactor =
160 reactor_.load(std::memory_order_relaxed);
161 reactor->OnSendInitialMetadataDone(ok);
162 this->MaybeDone(true);
165 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
166 ctx_->initial_metadata_flags());
167 if (ctx_->compression_level_set()) {
168 meta_ops_.set_compression_level(ctx_->compression_level());
170 ctx_->sent_initial_metadata_ =
true;
171 meta_ops_.set_core_cq_tag(&meta_tag_);
172 call_.PerformOps(&meta_ops_);
178 ServerCallbackUnaryImpl(
180 MessageHolder<RequestType, ResponseType>* allocator_state,
181 std::function<
void()> call_requester)
184 allocator_state_(allocator_state),
185 call_requester_(
std::move(call_requester)) {
186 ctx_->set_message_allocator_state(allocator_state);
193 void SetupReactor(ServerUnaryReactor* reactor) {
194 reactor_.store(reactor, std::memory_order_relaxed);
197 this->
MaybeDone(reactor->InternalInlineable());
200 const RequestType* request() {
return allocator_state_->request(); }
201 ResponseType* response() {
return allocator_state_->response(); }
203 void CallOnDone()
override {
204 reactor_.load(std::memory_order_relaxed)->OnDone();
206 auto call_requester = std::move(call_requester_);
207 allocator_state_->Release();
208 if (ctx_->context_allocator() !=
nullptr) {
209 ctx_->context_allocator()->Release(ctx_);
211 this->~ServerCallbackUnaryImpl();
216 ServerReactor* reactor()
override {
217 return reactor_.load(std::memory_order_relaxed);
231 MessageHolder<RequestType, ResponseType>*
const allocator_state_;
232 std::function<void()> call_requester_;
243 std::atomic<ServerUnaryReactor*> reactor_;
245 std::atomic<intptr_t> callbacks_outstanding_{
250 template <
class RequestType,
class ResponseType>
257 : get_reactor_(
std::move(get_reactor)) {}
263 sizeof(ServerCallbackReaderImpl)))
264 ServerCallbackReaderImpl(
266 param.call, param.call_requester);
270 param.server_context->BeginCompletionOp(
272 [reader](
bool) { reader->MaybeDone(
false); },
276 if (param.status.ok()) {
278 grpc::internal::CatchingReactorGetter<ServerReadReactor<RequestType>>(
284 if (reactor ==
nullptr) {
292 reader->SetupReactor(reactor);
312 this->MaybeDone(false);
315 if (!ctx_->sent_initial_metadata_) {
317 ctx_->initial_metadata_flags());
318 if (ctx_->compression_level_set()) {
319 finish_ops_.set_compression_level(ctx_->compression_level());
321 ctx_->sent_initial_metadata_ =
true;
325 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
326 finish_ops_.SendMessagePtr(&resp_));
328 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
330 finish_ops_.set_core_cq_tag(&finish_tag_);
331 call_.PerformOps(&finish_ops_);
334 void SendInitialMetadata()
override {
343 ServerReadReactor<RequestType>* reactor =
344 reactor_.load(std::memory_order_relaxed);
345 reactor->OnSendInitialMetadataDone(ok);
346 this->MaybeDone(true);
349 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
350 ctx_->initial_metadata_flags());
351 if (ctx_->compression_level_set()) {
352 meta_ops_.set_compression_level(ctx_->compression_level());
354 ctx_->sent_initial_metadata_ =
true;
355 meta_ops_.set_core_cq_tag(&meta_tag_);
356 call_.PerformOps(&meta_ops_);
359 void Read(RequestType* req)
override {
361 read_ops_.RecvMessage(req);
362 call_.PerformOps(&read_ops_);
370 std::function<
void()> call_requester)
371 : ctx_(ctx), call_(*call), call_requester_(
std::move(call_requester)) {}
373 void SetupReactor(ServerReadReactor<RequestType>* reactor) {
374 reactor_.store(reactor, std::memory_order_relaxed);
380 [
this, reactor](
bool ok) {
381 if (GPR_UNLIKELY(!ok)) {
382 ctx_->MaybeMarkCancelledOnRead();
384 reactor->OnReadDone(ok);
385 this->MaybeDone(
true);
388 read_ops_.set_core_cq_tag(&read_tag_);
397 ~ServerCallbackReaderImpl() {}
399 ResponseType* response() {
return &resp_; }
401 void CallOnDone()
override {
402 reactor_.load(std::memory_order_relaxed)->OnDone();
404 auto call_requester = std::move(call_requester_);
405 if (ctx_->context_allocator() !=
nullptr) {
406 ctx_->context_allocator()->Release(ctx_);
408 this->~ServerCallbackReaderImpl();
413 ServerReactor* reactor()
override {
414 return reactor_.load(std::memory_order_relaxed);
432 std::function<void()> call_requester_;
434 std::atomic<ServerReadReactor<RequestType>*> reactor_;
436 std::atomic<intptr_t> callbacks_outstanding_{
441 template <
class RequestType,
class ResponseType>
448 : get_reactor_(
std::move(get_reactor)) {}
454 sizeof(ServerCallbackWriterImpl)))
455 ServerCallbackWriterImpl(
457 param.call,
static_cast<RequestType*
>(param.request),
458 param.call_requester);
462 param.server_context->BeginCompletionOp(
464 [writer](
bool) { writer->MaybeDone(
false); },
468 if (param.status.ok()) {
475 if (reactor ==
nullptr) {
483 writer->SetupReactor(reactor);
498 request->~RequestType();
519 this->MaybeDone(false);
522 finish_ops_.set_core_cq_tag(&finish_tag_);
524 if (!ctx_->sent_initial_metadata_) {
526 ctx_->initial_metadata_flags());
527 if (ctx_->compression_level_set()) {
528 finish_ops_.set_compression_level(ctx_->compression_level());
530 ctx_->sent_initial_metadata_ =
true;
532 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
533 call_.PerformOps(&finish_ops_);
536 void SendInitialMetadata()
override {
545 ServerWriteReactor<ResponseType>* reactor =
546 reactor_.load(std::memory_order_relaxed);
547 reactor->OnSendInitialMetadataDone(ok);
548 this->MaybeDone(true);
551 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
552 ctx_->initial_metadata_flags());
553 if (ctx_->compression_level_set()) {
554 meta_ops_.set_compression_level(ctx_->compression_level());
556 ctx_->sent_initial_metadata_ =
true;
557 meta_ops_.set_core_cq_tag(&meta_tag_);
558 call_.PerformOps(&meta_ops_);
566 if (!ctx_->sent_initial_metadata_) {
567 write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
568 ctx_->initial_metadata_flags());
569 if (ctx_->compression_level_set()) {
570 write_ops_.set_compression_level(ctx_->compression_level());
572 ctx_->sent_initial_metadata_ =
true;
575 GPR_ASSERT(write_ops_.SendMessagePtr(resp, options).ok());
576 call_.PerformOps(&write_ops_);
583 GPR_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok());
584 Finish(std::move(s));
588 friend class CallbackServerStreamingHandler<RequestType, ResponseType>;
592 std::function<
void()> call_requester)
596 call_requester_(
std::move(call_requester)) {}
598 void SetupReactor(ServerWriteReactor<ResponseType>* reactor) {
599 reactor_.store(reactor, std::memory_order_relaxed);
605 [
this, reactor](
bool ok) {
606 reactor->OnWriteDone(ok);
607 this->MaybeDone(true);
610 write_ops_.set_core_cq_tag(&write_tag_);
611 this->BindReactor(reactor);
612 this->MaybeCallOnCancel(reactor);
616 this->MaybeDone(
false);
618 ~ServerCallbackWriterImpl() {
619 if (req_ !=
nullptr) {
620 req_->~RequestType();
624 const RequestType* request() {
return req_; }
626 void CallOnDone()
override {
627 reactor_.load(std::memory_order_relaxed)->OnDone();
629 auto call_requester = std::move(call_requester_);
630 if (ctx_->context_allocator() !=
nullptr) {
631 ctx_->context_allocator()->Release(ctx_);
633 this->~ServerCallbackWriterImpl();
638 ServerReactor* reactor()
override {
639 return reactor_.load(std::memory_order_relaxed);
657 const RequestType* req_;
658 std::function<void()> call_requester_;
660 std::atomic<ServerWriteReactor<ResponseType>*> reactor_;
662 std::atomic<intptr_t> callbacks_outstanding_{
667 template <
class RequestType,
class ResponseType>
674 : get_reactor_(
std::move(get_reactor)) {}
679 param.call->call(),
sizeof(ServerCallbackReaderWriterImpl)))
680 ServerCallbackReaderWriterImpl(
682 param.call, param.call_requester);
686 param.server_context->BeginCompletionOp(
688 [stream](
bool) { stream->MaybeDone(
false); },
692 if (param.status.ok()) {
699 if (reactor ==
nullptr) {
708 stream->SetupReactor(reactor);
712 std::function<ServerBidiReactor<RequestType, ResponseType>*(
716 class ServerCallbackReaderWriterImpl
729 this->MaybeDone(false);
732 finish_ops_.set_core_cq_tag(&finish_tag_);
734 if (!ctx_->sent_initial_metadata_) {
736 ctx_->initial_metadata_flags());
737 if (ctx_->compression_level_set()) {
738 finish_ops_.set_compression_level(ctx_->compression_level());
740 ctx_->sent_initial_metadata_ =
true;
742 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
743 call_.PerformOps(&finish_ops_);
746 void SendInitialMetadata()
override {
755 ServerBidiReactor<RequestType, ResponseType>* reactor =
756 reactor_.load(std::memory_order_relaxed);
757 reactor->OnSendInitialMetadataDone(ok);
758 this->MaybeDone(true);
761 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
762 ctx_->initial_metadata_flags());
763 if (ctx_->compression_level_set()) {
764 meta_ops_.set_compression_level(ctx_->compression_level());
766 ctx_->sent_initial_metadata_ =
true;
767 meta_ops_.set_core_cq_tag(&meta_tag_);
768 call_.PerformOps(&meta_ops_);
776 if (!ctx_->sent_initial_metadata_) {
777 write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
778 ctx_->initial_metadata_flags());
779 if (ctx_->compression_level_set()) {
780 write_ops_.set_compression_level(ctx_->compression_level());
782 ctx_->sent_initial_metadata_ =
true;
785 GPR_ASSERT(write_ops_.SendMessagePtr(resp, options).ok());
786 call_.PerformOps(&write_ops_);
792 GPR_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok());
793 Finish(std::move(s));
796 void Read(RequestType* req)
override {
798 read_ops_.RecvMessage(req);
799 call_.PerformOps(&read_ops_);
803 friend class CallbackBidiHandler<RequestType, ResponseType>;
807 std::function<
void()> call_requester)
808 : ctx_(ctx), call_(*call), call_requester_(
std::move(call_requester)) {}
810 void SetupReactor(ServerBidiReactor<RequestType, ResponseType>* reactor) {
811 reactor_.store(reactor, std::memory_order_relaxed);
817 [
this, reactor](
bool ok) {
818 reactor->OnWriteDone(ok);
819 this->MaybeDone(true);
822 write_ops_.set_core_cq_tag(&write_tag_);
825 [
this, reactor](
bool ok) {
826 if (GPR_UNLIKELY(!ok)) {
827 ctx_->MaybeMarkCancelledOnRead();
829 reactor->OnReadDone(ok);
830 this->MaybeDone(
true);
833 read_ops_.set_core_cq_tag(&read_tag_);
834 this->BindReactor(reactor);
835 this->MaybeCallOnCancel(reactor);
839 this->MaybeDone(
false);
842 void CallOnDone()
override {
843 reactor_.load(std::memory_order_relaxed)->OnDone();
845 auto call_requester = std::move(call_requester_);
846 if (ctx_->context_allocator() !=
nullptr) {
847 ctx_->context_allocator()->Release(ctx_);
849 this->~ServerCallbackReaderWriterImpl();
854 ServerReactor* reactor()
override {
855 return reactor_.load(std::memory_order_relaxed);
876 std::function<void()> call_requester_;
878 std::atomic<ServerBidiReactor<RequestType, ResponseType>*> reactor_;
880 std::atomic<intptr_t> callbacks_outstanding_{
888 #endif // GRPCPP_IMPL_SERVER_CALLBACK_HANDLERS_H