18 #ifndef GRPCPP_IMPL_SERVER_CALLBACK_HANDLERS_H
19 #define GRPCPP_IMPL_SERVER_CALLBACK_HANDLERS_H
29 #include "absl/log/absl_check.h"
34 template <
class RequestType,
class ResponseType>
39 const RequestType*, ResponseType*)>
41 : get_reactor_(
std::move(get_reactor)) {}
45 allocator_ = allocator;
51 auto* allocator_state =
56 sizeof(ServerCallbackUnaryImpl)))
57 ServerCallbackUnaryImpl(
59 param.call, allocator_state, param.call_requester);
60 param.server_context->BeginCompletionOp(
61 param.call, [call](
bool) { call->MaybeDone(); }, call);
64 if (param.status.ok()) {
65 reactor = grpc::internal::CatchingReactorGetter<ServerUnaryReactor>(
71 if (reactor ==
nullptr) {
80 call->SetupReactor(reactor);
87 RequestType* request =
nullptr;
89 if (allocator_ !=
nullptr) {
96 *handler_data = allocator_state;
97 request = allocator_state->
request();
109 const RequestType*, ResponseType*)>
126 reactor_.load(std::memory_order_relaxed)->InternalInlineable());
129 finish_ops_.set_core_cq_tag(&finish_tag_);
131 if (!ctx_->sent_initial_metadata_) {
133 ctx_->initial_metadata_flags());
134 if (ctx_->compression_level_set()) {
135 finish_ops_.set_compression_level(ctx_->compression_level());
137 ctx_->sent_initial_metadata_ =
true;
141 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
142 finish_ops_.SendMessagePtr(response()));
144 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
146 finish_ops_.set_core_cq_tag(&finish_tag_);
147 call_.PerformOps(&finish_ops_);
150 void SendInitialMetadata()
override {
151 ABSL_CHECK(!ctx_->sent_initial_metadata_);
161 ServerUnaryReactor* reactor =
162 reactor_.load(std::memory_order_relaxed);
163 reactor->OnSendInitialMetadataDone(ok);
164 this->MaybeDone(true);
167 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
168 ctx_->initial_metadata_flags());
169 if (ctx_->compression_level_set()) {
170 meta_ops_.set_compression_level(ctx_->compression_level());
172 ctx_->sent_initial_metadata_ =
true;
173 meta_ops_.set_core_cq_tag(&meta_tag_);
174 call_.PerformOps(&meta_ops_);
180 ServerCallbackUnaryImpl(
182 MessageHolder<RequestType, ResponseType>* allocator_state,
183 std::function<
void()> call_requester)
186 allocator_state_(allocator_state),
187 call_requester_(
std::move(call_requester)) {
188 ctx_->set_message_allocator_state(allocator_state);
197 void SetupReactor(ServerUnaryReactor* reactor) {
198 reactor_.store(reactor, std::memory_order_relaxed);
201 this->
MaybeDone(reactor->InternalInlineable());
// Returns the request message as a pointer-to-const, obtained by forwarding
// to allocator_state_->request().
204 const RequestType* request() {
return allocator_state_->request(); }
// Returns a mutable pointer to the response message, obtained by forwarding
// to allocator_state_->response(). The finish path hands this pointer to
// finish_ops_.SendMessagePtr().
205 ResponseType* response() {
return allocator_state_->response(); }
207 void CallOnDone()
override {
208 reactor_.load(std::memory_order_relaxed)->OnDone();
210 auto call_requester = std::move(call_requester_);
211 allocator_state_->Release();
212 if (ctx_->context_allocator() !=
nullptr) {
213 ctx_->context_allocator()->Release(ctx_);
215 this->~ServerCallbackUnaryImpl();
220 ServerReactor* reactor()
override {
221 return reactor_.load(std::memory_order_relaxed);
235 MessageHolder<RequestType, ResponseType>*
const allocator_state_;
236 std::function<void()> call_requester_;
247 std::atomic<ServerUnaryReactor*> reactor_;
249 std::atomic<intptr_t> callbacks_outstanding_{
254 template <
class RequestType,
class ResponseType>
261 : get_reactor_(
std::move(get_reactor)) {}
267 sizeof(ServerCallbackReaderImpl)))
268 ServerCallbackReaderImpl(
270 param.call, param.call_requester);
274 param.server_context->BeginCompletionOp(
276 [reader](
bool) { reader->MaybeDone(
false); },
280 if (param.status.ok()) {
282 grpc::internal::CatchingReactorGetter<ServerReadReactor<RequestType>>(
288 if (reactor ==
nullptr) {
296 reader->SetupReactor(reactor);
316 this->MaybeDone(false);
319 if (!ctx_->sent_initial_metadata_) {
321 ctx_->initial_metadata_flags());
322 if (ctx_->compression_level_set()) {
323 finish_ops_.set_compression_level(ctx_->compression_level());
325 ctx_->sent_initial_metadata_ =
true;
329 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
330 finish_ops_.SendMessagePtr(&resp_));
332 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
334 finish_ops_.set_core_cq_tag(&finish_tag_);
335 call_.PerformOps(&finish_ops_);
338 void SendInitialMetadata()
override {
339 ABSL_CHECK(!ctx_->sent_initial_metadata_);
347 ServerReadReactor<RequestType>* reactor =
348 reactor_.load(std::memory_order_relaxed);
349 reactor->OnSendInitialMetadataDone(ok);
350 this->MaybeDone(true);
353 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
354 ctx_->initial_metadata_flags());
355 if (ctx_->compression_level_set()) {
356 meta_ops_.set_compression_level(ctx_->compression_level());
358 ctx_->sent_initial_metadata_ =
true;
359 meta_ops_.set_core_cq_tag(&meta_tag_);
360 call_.PerformOps(&meta_ops_);
363 void Read(RequestType* req)
override {
365 read_ops_.RecvMessage(req);
366 call_.PerformOps(&read_ops_);
374 std::function<
void()> call_requester)
375 : ctx_(ctx), call_(*call), call_requester_(
std::move(call_requester)) {}
379 void SetupReactor(ServerReadReactor<RequestType>* reactor) {
380 reactor_.store(reactor, std::memory_order_relaxed);
386 [
this, reactor](
bool ok) {
387 if (GPR_UNLIKELY(!ok)) {
388 ctx_->MaybeMarkCancelledOnRead();
390 reactor->OnReadDone(ok);
391 this->MaybeDone(
true);
394 read_ops_.set_core_cq_tag(&read_tag_);
// Destructor with an empty body. CallOnDone() invokes it explicitly via
// this->~ServerCallbackReaderImpl() rather than through delete.
403 ~ServerCallbackReaderImpl() {}
// Returns the address of the resp_ member, i.e. the response message stored
// inside this object.
405 ResponseType* response() {
return &resp_; }
407 void CallOnDone()
override {
408 reactor_.load(std::memory_order_relaxed)->OnDone();
410 auto call_requester = std::move(call_requester_);
411 if (ctx_->context_allocator() !=
nullptr) {
412 ctx_->context_allocator()->Release(ctx_);
414 this->~ServerCallbackReaderImpl();
419 ServerReactor* reactor()
override {
420 return reactor_.load(std::memory_order_relaxed);
438 std::function<void()> call_requester_;
440 std::atomic<ServerReadReactor<RequestType>*> reactor_;
442 std::atomic<intptr_t> callbacks_outstanding_{
447 template <
class RequestType,
class ResponseType>
454 : get_reactor_(
std::move(get_reactor)) {}
460 sizeof(ServerCallbackWriterImpl)))
461 ServerCallbackWriterImpl(
463 param.call,
static_cast<RequestType*
>(param.request),
464 param.call_requester);
468 param.server_context->BeginCompletionOp(
470 [writer](
bool) { writer->MaybeDone(
false); },
474 if (param.status.ok()) {
481 if (reactor ==
nullptr) {
489 writer->SetupReactor(reactor);
504 request->~RequestType();
525 this->MaybeDone(false);
528 finish_ops_.set_core_cq_tag(&finish_tag_);
530 if (!ctx_->sent_initial_metadata_) {
532 ctx_->initial_metadata_flags());
533 if (ctx_->compression_level_set()) {
534 finish_ops_.set_compression_level(ctx_->compression_level());
536 ctx_->sent_initial_metadata_ =
true;
538 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
539 call_.PerformOps(&finish_ops_);
542 void SendInitialMetadata()
override {
543 ABSL_CHECK(!ctx_->sent_initial_metadata_);
551 ServerWriteReactor<ResponseType>* reactor =
552 reactor_.load(std::memory_order_relaxed);
553 reactor->OnSendInitialMetadataDone(ok);
554 this->MaybeDone(true);
557 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
558 ctx_->initial_metadata_flags());
559 if (ctx_->compression_level_set()) {
560 meta_ops_.set_compression_level(ctx_->compression_level());
562 ctx_->sent_initial_metadata_ =
true;
563 meta_ops_.set_core_cq_tag(&meta_tag_);
564 call_.PerformOps(&meta_ops_);
572 if (!ctx_->sent_initial_metadata_) {
573 write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
574 ctx_->initial_metadata_flags());
575 if (ctx_->compression_level_set()) {
576 write_ops_.set_compression_level(ctx_->compression_level());
578 ctx_->sent_initial_metadata_ =
true;
581 ABSL_CHECK(write_ops_.SendMessagePtr(resp, options).ok());
582 call_.PerformOps(&write_ops_);
589 ABSL_CHECK(finish_ops_.SendMessagePtr(resp, options).ok());
590 Finish(std::move(s));
594 friend class CallbackServerStreamingHandler<RequestType, ResponseType>;
598 std::function<
void()> call_requester)
602 call_requester_(
std::move(call_requester)) {}
606 void SetupReactor(ServerWriteReactor<ResponseType>* reactor) {
607 reactor_.store(reactor, std::memory_order_relaxed);
613 [
this, reactor](
bool ok) {
614 reactor->OnWriteDone(ok);
615 this->MaybeDone(true);
618 write_ops_.set_core_cq_tag(&write_tag_);
619 this->BindReactor(reactor);
620 this->MaybeCallOnCancel(reactor);
624 this->MaybeDone(
false);
626 ~ServerCallbackWriterImpl() {
627 if (req_ !=
nullptr) {
628 req_->~RequestType();
// Returns the stored request pointer req_ as a pointer-to-const.
// NOTE(review): the destructor checks req_ against nullptr before destroying
// it, so this presumably can return nullptr — confirm against callers.
632 const RequestType* request() {
return req_; }
634 void CallOnDone()
override {
635 reactor_.load(std::memory_order_relaxed)->OnDone();
637 auto call_requester = std::move(call_requester_);
638 if (ctx_->context_allocator() !=
nullptr) {
639 ctx_->context_allocator()->Release(ctx_);
641 this->~ServerCallbackWriterImpl();
646 ServerReactor* reactor()
override {
647 return reactor_.load(std::memory_order_relaxed);
665 const RequestType* req_;
666 std::function<void()> call_requester_;
668 std::atomic<ServerWriteReactor<ResponseType>*> reactor_;
670 std::atomic<intptr_t> callbacks_outstanding_{
675 template <
class RequestType,
class ResponseType>
682 : get_reactor_(
std::move(get_reactor)) {}
687 param.call->call(),
sizeof(ServerCallbackReaderWriterImpl)))
688 ServerCallbackReaderWriterImpl(
690 param.call, param.call_requester);
694 param.server_context->BeginCompletionOp(
696 [stream](
bool) { stream->MaybeDone(
false); },
700 if (param.status.ok()) {
707 if (reactor ==
nullptr) {
716 stream->SetupReactor(reactor);
720 std::function<ServerBidiReactor<RequestType, ResponseType>*(
724 class ServerCallbackReaderWriterImpl
737 this->MaybeDone(false);
740 finish_ops_.set_core_cq_tag(&finish_tag_);
742 if (!ctx_->sent_initial_metadata_) {
744 ctx_->initial_metadata_flags());
745 if (ctx_->compression_level_set()) {
746 finish_ops_.set_compression_level(ctx_->compression_level());
748 ctx_->sent_initial_metadata_ =
true;
750 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
751 call_.PerformOps(&finish_ops_);
754 void SendInitialMetadata()
override {
755 ABSL_CHECK(!ctx_->sent_initial_metadata_);
763 ServerBidiReactor<RequestType, ResponseType>* reactor =
764 reactor_.load(std::memory_order_relaxed);
765 reactor->OnSendInitialMetadataDone(ok);
766 this->MaybeDone(true);
769 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
770 ctx_->initial_metadata_flags());
771 if (ctx_->compression_level_set()) {
772 meta_ops_.set_compression_level(ctx_->compression_level());
774 ctx_->sent_initial_metadata_ =
true;
775 meta_ops_.set_core_cq_tag(&meta_tag_);
776 call_.PerformOps(&meta_ops_);
784 if (!ctx_->sent_initial_metadata_) {
785 write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
786 ctx_->initial_metadata_flags());
787 if (ctx_->compression_level_set()) {
788 write_ops_.set_compression_level(ctx_->compression_level());
790 ctx_->sent_initial_metadata_ =
true;
793 ABSL_CHECK(write_ops_.SendMessagePtr(resp, options).ok());
794 call_.PerformOps(&write_ops_);
800 ABSL_CHECK(finish_ops_.SendMessagePtr(resp, options).ok());
801 Finish(std::move(s));
804 void Read(RequestType* req)
override {
806 read_ops_.RecvMessage(req);
807 call_.PerformOps(&read_ops_);
811 friend class CallbackBidiHandler<RequestType, ResponseType>;
815 std::function<
void()> call_requester)
816 : ctx_(ctx), call_(*call), call_requester_(
std::move(call_requester)) {}
820 void SetupReactor(ServerBidiReactor<RequestType, ResponseType>* reactor) {
821 reactor_.store(reactor, std::memory_order_relaxed);
827 [
this, reactor](
bool ok) {
828 reactor->OnWriteDone(ok);
829 this->MaybeDone(true);
832 write_ops_.set_core_cq_tag(&write_tag_);
835 [
this, reactor](
bool ok) {
836 if (GPR_UNLIKELY(!ok)) {
837 ctx_->MaybeMarkCancelledOnRead();
839 reactor->OnReadDone(ok);
840 this->MaybeDone(
true);
843 read_ops_.set_core_cq_tag(&read_tag_);
844 this->BindReactor(reactor);
845 this->MaybeCallOnCancel(reactor);
849 this->MaybeDone(
false);
852 void CallOnDone()
override {
853 reactor_.load(std::memory_order_relaxed)->OnDone();
855 auto call_requester = std::move(call_requester_);
856 if (ctx_->context_allocator() !=
nullptr) {
857 ctx_->context_allocator()->Release(ctx_);
859 this->~ServerCallbackReaderWriterImpl();
864 ServerReactor* reactor()
override {
865 return reactor_.load(std::memory_order_relaxed);
886 std::function<void()> call_requester_;
888 std::atomic<ServerBidiReactor<RequestType, ResponseType>*> reactor_;
890 std::atomic<intptr_t> callbacks_outstanding_{
898 #endif // GRPCPP_IMPL_SERVER_CALLBACK_HANDLERS_H