Go to the documentation of this file.
19 #ifndef GRPCPP_SUPPORT_ASYNC_STREAM_H
20 #define GRPCPP_SUPPORT_ASYNC_STREAM_H
29 #include "absl/log/absl_check.h"
105 virtual void Read(R* msg,
void* tag) = 0;
126 virtual void Write(
const W& msg,
void* tag) = 0;
177 class ClientAsyncReaderFactory {
191 const W& request,
bool start,
void* tag) {
204 class ClientAsyncReader final :
public ClientAsyncReaderInterface<R> {
207 static void operator delete(
void* , std::size_t size) {
216 static void operator delete(
void*,
void*) { ABSL_CHECK(
false); }
219 ABSL_CHECK(!started_);
221 StartCallInternal(tag);
233 ABSL_CHECK(started_);
234 ABSL_CHECK(!context_->initial_metadata_received_);
237 meta_ops_.RecvInitialMetadata(context_);
241 void Read(R* msg,
void* tag)
override {
242 ABSL_CHECK(started_);
244 if (!context_->initial_metadata_received_) {
245 read_ops_.RecvInitialMetadata(context_);
247 read_ops_.RecvMessage(msg);
257 ABSL_CHECK(started_);
259 if (!context_->initial_metadata_received_) {
260 finish_ops_.RecvInitialMetadata(context_);
262 finish_ops_.ClientRecvStatus(context_, status);
272 : channel_(channel), context_(context), call_(call), started_(start) {
276 init_ops_.ClientSendClose();
278 StartCallInternal(tag);
280 ABSL_CHECK(tag ==
nullptr);
284 void StartCallInternal(
void* tag) {
285 init_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
286 context_->initial_metadata_flags());
324 class ClientAsyncWriterFactory {
342 bool start,
void* tag) {
355 class ClientAsyncWriter final :
public ClientAsyncWriterInterface<W> {
358 static void operator delete(
void* , std::size_t size) {
367 static void operator delete(
void*,
void*) { ABSL_CHECK(
false); }
370 ABSL_CHECK(!started_);
372 StartCallInternal(tag);
383 ABSL_CHECK(started_);
384 ABSL_CHECK(!context_->initial_metadata_received_);
387 meta_ops_.RecvInitialMetadata(context_);
391 void Write(
const W& msg,
void* tag)
override {
392 ABSL_CHECK(started_);
400 ABSL_CHECK(started_);
404 write_ops_.ClientSendClose();
415 ABSL_CHECK(started_);
417 write_ops_.ClientSendClose();
429 ABSL_CHECK(started_);
431 if (!context_->initial_metadata_received_) {
432 finish_ops_.RecvInitialMetadata(context_);
434 finish_ops_.ClientRecvStatus(context_, status);
444 : channel_(channel), context_(context), call_(call), started_(start) {
445 finish_ops_.RecvMessage(response);
446 finish_ops_.AllowNoMessage();
448 StartCallInternal(tag);
450 ABSL_CHECK(tag ==
nullptr);
454 void StartCallInternal(
void* tag) {
455 write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
456 context_->initial_metadata_flags());
459 if (!context_->initial_metadata_corked_) {
484 template <
class W,
class R>
498 template <
class W,
class R>
499 class ClientAsyncReaderWriterFactory {
511 bool start,
void* tag) {
525 template <
class W,
class R>
526 class ClientAsyncReaderWriter final
527 :
public ClientAsyncReaderWriterInterface<W, R> {
530 static void operator delete(
void* , std::size_t size) {
539 static void operator delete(
void*,
void*) { ABSL_CHECK(
false); }
542 ABSL_CHECK(!started_);
544 StartCallInternal(tag);
555 ABSL_CHECK(started_);
556 ABSL_CHECK(!context_->initial_metadata_received_);
559 meta_ops_.RecvInitialMetadata(context_);
563 void Read(R* msg,
void* tag)
override {
564 ABSL_CHECK(started_);
566 if (!context_->initial_metadata_received_) {
567 read_ops_.RecvInitialMetadata(context_);
569 read_ops_.RecvMessage(msg);
573 void Write(
const W& msg,
void* tag)
override {
574 ABSL_CHECK(started_);
582 ABSL_CHECK(started_);
586 write_ops_.ClientSendClose();
596 ABSL_CHECK(started_);
598 write_ops_.ClientSendClose();
607 ABSL_CHECK(started_);
609 if (!context_->initial_metadata_received_) {
610 finish_ops_.RecvInitialMetadata(context_);
612 finish_ops_.ClientRecvStatus(context_, status);
621 : channel_(channel), context_(context), call_(call), started_(start) {
623 StartCallInternal(tag);
625 ABSL_CHECK(tag ==
nullptr);
629 void StartCallInternal(
void* tag) {
630 write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
631 context_->initial_metadata_flags());
634 if (!context_->initial_metadata_corked_) {
658 template <
class W,
class R>
714 template <
class W,
class R>
718 : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
726 ABSL_CHECK(!ctx_->sent_initial_metadata_);
729 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
730 ctx_->initial_metadata_flags());
734 ctx_->sent_initial_metadata_ =
true;
738 void Read(R* msg,
void* tag)
override {
740 read_ops_.RecvMessage(msg);
757 if (!ctx_->sent_initial_metadata_) {
758 finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
759 ctx_->initial_metadata_flags());
763 ctx_->sent_initial_metadata_ =
true;
767 finish_ops_.ServerSendStatus(
768 &ctx_->trailing_metadata_,
771 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status);
786 ABSL_CHECK(!status.
ok());
788 if (!ctx_->sent_initial_metadata_) {
789 finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
790 ctx_->initial_metadata_flags());
794 ctx_->sent_initial_metadata_ =
true;
796 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status);
866 : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
876 ABSL_CHECK(!ctx_->sent_initial_metadata_);
879 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
880 ctx_->initial_metadata_flags());
884 ctx_->sent_initial_metadata_ =
true;
888 void Write(
const W& msg,
void* tag)
override {
890 EnsureInitialMetadataSent(&write_ops_);
902 EnsureInitialMetadataSent(&write_ops_);
922 EnsureInitialMetadataSent(&write_ops_);
926 write_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status);
943 EnsureInitialMetadataSent(&finish_ops_);
944 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status);
952 void EnsureInitialMetadataSent(T* ops) {
953 if (!ctx_->sent_initial_metadata_) {
954 ops->SendInitialMetadata(&ctx_->initial_metadata_,
955 ctx_->initial_metadata_flags());
959 ctx_->sent_initial_metadata_ =
true;
977 template <
class W,
class R>
1029 template <
class W,
class R>
1034 : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
1044 ABSL_CHECK(!ctx_->sent_initial_metadata_);
1047 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
1048 ctx_->initial_metadata_flags());
1052 ctx_->sent_initial_metadata_ =
true;
1056 void Read(R* msg,
void* tag)
override {
1058 read_ops_.RecvMessage(msg);
1062 void Write(
const W& msg,
void* tag)
override {
1064 EnsureInitialMetadataSent(&write_ops_);
1075 EnsureInitialMetadataSent(&write_ops_);
1095 EnsureInitialMetadataSent(&write_ops_);
1099 write_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status);
1116 EnsureInitialMetadataSent(&finish_ops_);
1118 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status);
1128 void EnsureInitialMetadataSent(T* ops) {
1129 if (!ctx_->sent_initial_metadata_) {
1130 ops->SendInitialMetadata(&ctx_->initial_metadata_,
1131 ctx_->initial_metadata_flags());
1135 ctx_->sent_initial_metadata_ =
true;
1155 #endif // GRPCPP_SUPPORT_ASYNC_STREAM_H
void Finish(grpc::Status *status, void *tag) override
See the ClientAsyncStreamingInterface.Finish method for semantics.
Definition: async_stream.h:606
void Read(R *msg, void *tag) override
Definition: async_stream.h:1056
void StartCall(void *tag) override
Definition: async_stream.h:369
Definition: service_type.h:38
Definition: call_op_set.h:623
GRPCAPI void * grpc_call_arena_alloc(grpc_call *call, size_t size)
Allocate memory in the grpc_call arena: this memory is automatically discarded at call completion.
Definition: call_op_set.h:530
An interface that can be fed a sequence of messages of type W.
Definition: async_stream.h:110
A ServerContext or CallbackServerContext allows the code implementing a service handler to:
Definition: server_context.h:586
Definition: call_op_set.h:658
Represents a gRPC server.
Definition: server.h:58
void ReadInitialMetadata(void *tag) override
See the ClientAsyncStreamingInterface.ReadInitialMetadata method for semantics of this method.
Definition: async_stream.h:554
An Alarm posts the user-provided tag to its associated completion queue or invokes the user-provided ...
Definition: alarm.h:33
Primary implementation of CallOpSetInterface.
Definition: completion_queue.h:97
void Read(R *msg, void *tag) override
Definition: async_stream.h:563
virtual ~ClientAsyncStreamingInterface()
Definition: async_stream.h:37
Definition: call_op_set.h:289
void Write(const W &msg, grpc::WriteOptions options, void *tag) override
Definition: async_stream.h:1070
void Write(const W &msg, void *tag) override
Definition: async_stream.h:391
Async server-side API for doing server streaming RPCs, where the outgoing message stream from the ser...
Definition: server_context.h:58
Definition: channel_interface.h:43
WriteOptions & set_last_message()
last-message bit: indicates this is the last message in a stream client-side: makes Write the equival...
Definition: call_op_set.h:158
void StartCall(void *tag) override
Definition: async_stream.h:218
void Finish(const W &msg, const grpc::Status &status, void *tag) override
See the ServerAsyncReaderInterface.Read method for semantics.
Definition: async_stream.h:755
virtual void WritesDone(void *tag)=0
Signal the client is done with the writes (half-close the client stream).
void Write(const W &msg, void *tag) override
Definition: async_stream.h:888
static ClientAsyncWriter< W > * Create(grpc::ChannelInterface *channel, grpc::CompletionQueue *cq, const grpc::internal::RpcMethod &method, grpc::ClientContext *context, R *response, bool start, void *tag)
Create a stream object.
Definition: async_stream.h:338
Straightforward wrapping of the C call object.
Definition: call.h:36
grpc_event_engine::experimental::MemoryAllocator * memory_allocator()
Definition: server_context.h:300
void WritesDone(void *tag) override
Definition: async_stream.h:595
ServerAsyncReader(grpc::ServerContext *ctx)
Definition: async_stream.h:717
void WriteAndFinish(const W &msg, grpc::WriteOptions options, const grpc::Status &status, void *tag) override
See the ServerAsyncReaderWriterInterface.WriteAndFinish method for semantics.
Definition: async_stream.h:1092
virtual void FinishWithError(const grpc::Status &status, void *tag)=0
Indicate that the stream is to be finished with a certain non-OK status code.
virtual void ReadInitialMetadata(void *tag)=0
Request notification of the reading of the initial metadata.
static ClientAsyncReaderWriter< W, R > * Create(grpc::ChannelInterface *channel, grpc::CompletionQueue *cq, const grpc::internal::RpcMethod &method, grpc::ClientContext *context, bool start, void *tag)
Create a stream object.
Definition: async_stream.h:508
void ReadInitialMetadata(void *tag) override
See the ClientAsyncStreamingInterface.ReadInitialMetadata method for semantics.
Definition: async_stream.h:232
void Write(const W &msg, grpc::WriteOptions options, void *tag) override
Definition: async_stream.h:399
bool ok() const
Is the status OK?
Definition: status.h:125
void Write(const W &msg, void *tag) override
Definition: async_stream.h:1062
void Read(R *msg, void *tag) override
Definition: async_stream.h:738
Async client-side interface for bi-directional streaming, where the client-to-server message stream h...
Definition: async_stream.h:485
void Finish(grpc::Status *status, void *tag) override
See the ClientAsyncStreamingInterface.Finish method for semantics.
Definition: async_stream.h:428
ServerAsyncReaderWriter(grpc::ServerContext *ctx)
Definition: async_stream.h:1033
Definition: async_stream.h:815
void WriteLast(const W &msg, grpc::WriteOptions options, void *tag)
Request the writing of msg and coalesce it with the writing of trailing metadata, using WriteOptions ...
Definition: async_stream.h:163
Did it work? If it didn't, why?
Definition: status.h:34
Definition: channel_interface.h:39
virtual void WriteAndFinish(const W &msg, grpc::WriteOptions options, const grpc::Status &status, void *tag)=0
Request the writing of msg and coalesce it with trailing metadata which contains status,...
Common interface for all client side asynchronous streaming.
Definition: async_stream.h:35
grpc_compression_level compression_level() const
Return the compression algorithm to be used by the server call.
Definition: server_context.h:236
virtual void WriteAndFinish(const W &msg, grpc::WriteOptions options, const grpc::Status &status, void *tag)=0
Request the writing of msg and coalesce it with trailing metadata which contains status,...
void WritesDone(void *tag) override
Definition: async_stream.h:414
A ClientContext allows the person implementing a service client to:
Definition: client_context.h:194
Server-side interface for asynchronous bi-directional streaming.
Definition: async_stream.h:978
virtual void Finish(const W &msg, const grpc::Status &status, void *tag)=0
Indicate that the stream is to be finished with a certain status code and also send out msg response ...
void Write(const W &msg, grpc::WriteOptions options, void *tag) override
Definition: async_stream.h:896
Async client-side API for doing server-streaming RPCs, where the incoming message stream coming from ...
Definition: client_context.h:86
void SendInitialMetadata(void *tag) override
See ServerAsyncStreamingInterface::SendInitialMetadata for semantics.
Definition: async_stream.h:1043
virtual void WritesDone(void *tag)=0
Signal the client is done with the writes (half-close the client stream).
virtual void Finish(grpc::Status *status, void *tag)=0
Indicate that the stream is to be finished and request notification for when the call has been ended.
Codegen interface for grpc::Channel.
Definition: channel_interface.h:72
void ReadInitialMetadata(void *tag) override
See the ClientAsyncStreamingInterface.ReadInitialMetadata method for semantics.
Definition: async_stream.h:382
void Finish(const grpc::Status &status, void *tag) override
See the ServerAsyncReaderWriterInterface.Finish method for semantics.
Definition: async_stream.h:1114
virtual void StartCall(void *tag)=0
Start the call that was set up by the constructor, but only if the constructor was invoked through th...
static ClientAsyncReader< R > * Create(grpc::ChannelInterface *channel, grpc::CompletionQueue *cq, const grpc::internal::RpcMethod &method, grpc::ClientContext *context, const W &request, bool start, void *tag)
Create a stream object.
Definition: async_stream.h:187
virtual void Write(const W &msg, void *tag)=0
Request the writing of msg with identifying tag tag.
Common interface for client side asynchronous writing.
Definition: async_stream.h:311
void Read(R *msg, void *tag) override
Definition: async_stream.h:241
virtual ~AsyncWriterInterface()
Definition: async_stream.h:112
virtual void Read(R *msg, void *tag)=0
Read a message of type R into msg.
void FinishWithError(const grpc::Status &status, void *tag) override
See the ServerAsyncReaderInterface.Read method for semantics.
Definition: async_stream.h:785
void WriteAndFinish(const W &msg, grpc::WriteOptions options, const grpc::Status &status, void *tag) override
See the ServerAsyncWriterInterface.WriteAndFinish method for semantics.
Definition: async_stream.h:919
Async API on the client side for doing client-streaming RPCs, where the outgoing message stream going...
Definition: client_context.h:88
void set_output_tag(void *return_tag)
Definition: call_op_set.h:941
Per-message write options.
Definition: call_op_set.h:81
virtual void Finish(const grpc::Status &status, void *tag)=0
Indicate that the stream is to be finished with a certain status code.
An interface that yields a sequence of messages of type R.
Definition: async_stream.h:88
Async client-side interface for bi-directional streaming, where the outgoing message stream going to ...
Definition: client_context.h:90
void SendInitialMetadata(void *tag) override
See ServerAsyncStreamingInterface::SendInitialMetadata for semantics.
Definition: async_stream.h:725
void Write(const W &msg, void *tag) override
Definition: async_stream.h:573
void PerformOps(CallOpSetInterface *ops)
Definition: call.h:66
void Write(const W &msg, grpc::WriteOptions options, void *tag) override
Definition: async_stream.h:581
grpc_call * call() const
Definition: call.h:70
bool compression_level_set() const
Return a bool indicating whether the compression level for this call has been set (either implicitly ...
Definition: server_context.h:251
Definition: call_op_set.h:773
WriteOptions & set_buffer_hint()
Sets flag indicating that the write may be buffered and need not go out on the wire immediately.
Definition: call_op_set.h:119
A thin wrapper around grpc_completion_queue (see src/core/lib/surface/completion_queue....
Definition: completion_queue.h:104
Definition: channel_interface.h:41
void Finish(grpc::Status *status, void *tag) override
See the ClientAsyncStreamingInterface.Finish method for semantics.
Definition: async_stream.h:256
virtual ~AsyncReaderInterface()
Definition: async_stream.h:90
void StartCall(void *tag) override
Definition: async_stream.h:541
Definition: async_stream.h:659
void Finish(const grpc::Status &status, void *tag) override
See the ServerAsyncWriterInterface.Finish method for semantics.
Definition: async_stream.h:941
void SendInitialMetadata(void *tag) override
See ServerAsyncStreamingInterface::SendInitialMetadata for semantics.
Definition: async_stream.h:875
bool is_last_message() const
Get value for the flag indicating that this is the last message, and should be coalesced with trailin...
Definition: call_op_set.h:174
Async server-side API for doing client-streaming RPCs, where the incoming message stream from the cli...
Definition: server_context.h:56
Definition: call_op_set.h:429
Descriptor of an RPC method.
Definition: rpc_method.h:29
Async server-side API for doing bidirectional streaming RPCs, where the incoming message stream comin...
Definition: server_context.h:62
ServerAsyncWriter(grpc::ServerContext *ctx)
Definition: async_stream.h:865
virtual grpc_event_engine::experimental::MemoryAllocator * memory_allocator() const
Definition: channel_interface.h:106
Definition: async_stream.h:171
virtual void Finish(const grpc::Status &status, void *tag)=0
Indicate that the stream is to be finished with a certain status code.