Go to the documentation of this file.
19 #ifndef GRPCPP_SUPPORT_ASYNC_STREAM_H
20 #define GRPCPP_SUPPORT_ASYNC_STREAM_H
22 #include "absl/log/absl_check.h"
102 virtual void Read(R* msg,
void* tag) = 0;
123 virtual void Write(
const W& msg,
void* tag) = 0;
174 class ClientAsyncReaderFactory {
188 const W& request,
bool start,
void* tag) {
201 class ClientAsyncReader final :
public ClientAsyncReaderInterface<R> {
204 static void operator delete(
void* , std::size_t size) {
213 static void operator delete(
void*,
void*) { ABSL_CHECK(
false); }
216 ABSL_CHECK(!started_);
218 StartCallInternal(tag);
230 ABSL_CHECK(started_);
231 ABSL_CHECK(!context_->initial_metadata_received_);
234 meta_ops_.RecvInitialMetadata(context_);
238 void Read(R* msg,
void* tag)
override {
239 ABSL_CHECK(started_);
241 if (!context_->initial_metadata_received_) {
242 read_ops_.RecvInitialMetadata(context_);
244 read_ops_.RecvMessage(msg);
254 ABSL_CHECK(started_);
256 if (!context_->initial_metadata_received_) {
257 finish_ops_.RecvInitialMetadata(context_);
259 finish_ops_.ClientRecvStatus(context_, status);
267 const W& request,
bool start,
void* tag)
268 : context_(context), call_(call), started_(start) {
270 ABSL_CHECK(init_ops_.SendMessage(request).ok());
271 init_ops_.ClientSendClose();
273 StartCallInternal(tag);
275 ABSL_CHECK(tag ==
nullptr);
279 void StartCallInternal(
void* tag) {
280 init_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
281 context_->initial_metadata_flags());
318 class ClientAsyncWriterFactory {
336 bool start,
void* tag) {
349 class ClientAsyncWriter final :
public ClientAsyncWriterInterface<W> {
352 static void operator delete(
void* , std::size_t size) {
361 static void operator delete(
void*,
void*) { ABSL_CHECK(
false); }
364 ABSL_CHECK(!started_);
366 StartCallInternal(tag);
377 ABSL_CHECK(started_);
378 ABSL_CHECK(!context_->initial_metadata_received_);
381 meta_ops_.RecvInitialMetadata(context_);
385 void Write(
const W& msg,
void* tag)
override {
386 ABSL_CHECK(started_);
389 ABSL_CHECK(write_ops_.SendMessage(msg).ok());
394 ABSL_CHECK(started_);
398 write_ops_.ClientSendClose();
401 ABSL_CHECK(write_ops_.SendMessage(msg, options).ok());
406 ABSL_CHECK(started_);
408 write_ops_.ClientSendClose();
420 ABSL_CHECK(started_);
422 if (!context_->initial_metadata_received_) {
423 finish_ops_.RecvInitialMetadata(context_);
425 finish_ops_.ClientRecvStatus(context_, status);
433 R* response,
bool start,
void* tag)
434 : context_(context), call_(call), started_(start) {
435 finish_ops_.RecvMessage(response);
436 finish_ops_.AllowNoMessage();
438 StartCallInternal(tag);
440 ABSL_CHECK(tag ==
nullptr);
444 void StartCallInternal(
void* tag) {
445 write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
446 context_->initial_metadata_flags());
449 if (!context_->initial_metadata_corked_) {
473 template <
class W,
class R>
487 template <
class W,
class R>
488 class ClientAsyncReaderWriterFactory {
500 bool start,
void* tag) {
514 template <
class W,
class R>
515 class ClientAsyncReaderWriter final
516 :
public ClientAsyncReaderWriterInterface<W, R> {
519 static void operator delete(
void* , std::size_t size) {
528 static void operator delete(
void*,
void*) { ABSL_CHECK(
false); }
531 ABSL_CHECK(!started_);
533 StartCallInternal(tag);
544 ABSL_CHECK(started_);
545 ABSL_CHECK(!context_->initial_metadata_received_);
548 meta_ops_.RecvInitialMetadata(context_);
552 void Read(R* msg,
void* tag)
override {
553 ABSL_CHECK(started_);
555 if (!context_->initial_metadata_received_) {
556 read_ops_.RecvInitialMetadata(context_);
558 read_ops_.RecvMessage(msg);
562 void Write(
const W& msg,
void* tag)
override {
563 ABSL_CHECK(started_);
566 ABSL_CHECK(write_ops_.SendMessage(msg).ok());
571 ABSL_CHECK(started_);
575 write_ops_.ClientSendClose();
578 ABSL_CHECK(write_ops_.SendMessage(msg, options).ok());
583 ABSL_CHECK(started_);
585 write_ops_.ClientSendClose();
594 ABSL_CHECK(started_);
596 if (!context_->initial_metadata_received_) {
597 finish_ops_.RecvInitialMetadata(context_);
599 finish_ops_.ClientRecvStatus(context_, status);
607 : context_(context), call_(call), started_(start) {
609 StartCallInternal(tag);
611 ABSL_CHECK(tag ==
nullptr);
615 void StartCallInternal(
void* tag) {
616 write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
617 context_->initial_metadata_flags());
620 if (!context_->initial_metadata_corked_) {
643 template <
class W,
class R>
699 template <
class W,
class R>
703 : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
711 ABSL_CHECK(!ctx_->sent_initial_metadata_);
714 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
715 ctx_->initial_metadata_flags());
719 ctx_->sent_initial_metadata_ =
true;
723 void Read(R* msg,
void* tag)
override {
725 read_ops_.RecvMessage(msg);
742 if (!ctx_->sent_initial_metadata_) {
743 finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
744 ctx_->initial_metadata_flags());
748 ctx_->sent_initial_metadata_ =
true;
752 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
753 finish_ops_.SendMessage(msg));
755 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status);
770 ABSL_CHECK(!status.
ok());
772 if (!ctx_->sent_initial_metadata_) {
773 finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
774 ctx_->initial_metadata_flags());
778 ctx_->sent_initial_metadata_ =
true;
780 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status);
850 : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
860 ABSL_CHECK(!ctx_->sent_initial_metadata_);
863 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
864 ctx_->initial_metadata_flags());
868 ctx_->sent_initial_metadata_ =
true;
872 void Write(
const W& msg,
void* tag)
override {
874 EnsureInitialMetadataSent(&write_ops_);
876 ABSL_CHECK(write_ops_.SendMessage(msg).ok());
886 EnsureInitialMetadataSent(&write_ops_);
888 ABSL_CHECK(write_ops_.SendMessage(msg, options).ok());
905 EnsureInitialMetadataSent(&write_ops_);
907 ABSL_CHECK(write_ops_.SendMessage(msg, options).ok());
908 write_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status);
925 EnsureInitialMetadataSent(&finish_ops_);
926 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status);
934 void EnsureInitialMetadataSent(T* ops) {
935 if (!ctx_->sent_initial_metadata_) {
936 ops->SendInitialMetadata(&ctx_->initial_metadata_,
937 ctx_->initial_metadata_flags());
941 ctx_->sent_initial_metadata_ =
true;
959 template <
class W,
class R>
1011 template <
class W,
class R>
1016 : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
1026 ABSL_CHECK(!ctx_->sent_initial_metadata_);
1029 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
1030 ctx_->initial_metadata_flags());
1034 ctx_->sent_initial_metadata_ =
true;
1038 void Read(R* msg,
void* tag)
override {
1040 read_ops_.RecvMessage(msg);
1044 void Write(
const W& msg,
void* tag)
override {
1046 EnsureInitialMetadataSent(&write_ops_);
1048 ABSL_CHECK(write_ops_.SendMessage(msg).ok());
1057 EnsureInitialMetadataSent(&write_ops_);
1058 ABSL_CHECK(write_ops_.SendMessage(msg, options).ok());
1076 EnsureInitialMetadataSent(&write_ops_);
1078 ABSL_CHECK(write_ops_.SendMessage(msg, options).ok());
1079 write_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status);
1096 EnsureInitialMetadataSent(&finish_ops_);
1098 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status);
1108 void EnsureInitialMetadataSent(T* ops) {
1109 if (!ctx_->sent_initial_metadata_) {
1110 ops->SendInitialMetadata(&ctx_->initial_metadata_,
1111 ctx_->initial_metadata_flags());
1115 ctx_->sent_initial_metadata_ =
true;
1135 #endif // GRPCPP_SUPPORT_ASYNC_STREAM_H
void Finish(grpc::Status *status, void *tag) override
See the ClientAsyncStreamingInterface.Finish method for semantics.
Definition: async_stream.h:593
void Read(R *msg, void *tag) override
Definition: async_stream.h:1038
void StartCall(void *tag) override
Definition: async_stream.h:363
Definition: service_type.h:39
Definition: call_op_set.h:621
GRPCAPI void * grpc_call_arena_alloc(grpc_call *call, size_t size)
Allocate memory in the grpc_call arena: this memory is automatically discarded at call completion.
Definition: call_op_set.h:528
An interface that can be fed a sequence of messages of type W.
Definition: async_stream.h:107
A ServerContext or CallbackServerContext allows the code implementing a service handler to:
Definition: server_context.h:578
Definition: call_op_set.h:656
Represents a gRPC server.
Definition: server.h:57
void ReadInitialMetadata(void *tag) override
See the ClientAsyncStreamingInterface.ReadInitialMetadata method for semantics of this method.
Definition: async_stream.h:543
An Alarm posts the user-provided tag to its associated completion queue or invokes the user-provided ...
Definition: alarm.h:33
Primary implementation of CallOpSetInterface.
Definition: completion_queue.h:98
void Read(R *msg, void *tag) override
Definition: async_stream.h:552
virtual ~ClientAsyncStreamingInterface()
Definition: async_stream.h:38
Definition: call_op_set.h:288
void Write(const W &msg, grpc::WriteOptions options, void *tag) override
Definition: async_stream.h:1052
void Write(const W &msg, void *tag) override
Definition: async_stream.h:385
Async server-side API for doing server streaming RPCs, where the outgoing message stream from the ser...
Definition: server_context.h:58
Definition: channel_interface.h:42
WriteOptions & set_last_message()
last-message bit: indicates this is the last message in a stream. Client-side: makes Write the equival...
Definition: call_op_set.h:157
void StartCall(void *tag) override
Definition: async_stream.h:215
void Finish(const W &msg, const grpc::Status &status, void *tag) override
See the ServerAsyncReaderInterface.Finish method for semantics.
Definition: async_stream.h:740
virtual void WritesDone(void *tag)=0
Signal the client is done with the writes (half-close the client stream).
void Write(const W &msg, void *tag) override
Definition: async_stream.h:872
static ClientAsyncWriter< W > * Create(grpc::ChannelInterface *channel, grpc::CompletionQueue *cq, const grpc::internal::RpcMethod &method, grpc::ClientContext *context, R *response, bool start, void *tag)
Create a stream object.
Definition: async_stream.h:332
Straightforward wrapping of the C call object.
Definition: call.h:36
void WritesDone(void *tag) override
Definition: async_stream.h:582
ServerAsyncReader(grpc::ServerContext *ctx)
Definition: async_stream.h:702
void WriteAndFinish(const W &msg, grpc::WriteOptions options, const grpc::Status &status, void *tag) override
See the ServerAsyncReaderWriterInterface.WriteAndFinish method for semantics.
Definition: async_stream.h:1073
virtual void FinishWithError(const grpc::Status &status, void *tag)=0
Indicate that the stream is to be finished with a certain non-OK status code.
virtual void ReadInitialMetadata(void *tag)=0
Request notification of the reading of the initial metadata.
static ClientAsyncReaderWriter< W, R > * Create(grpc::ChannelInterface *channel, grpc::CompletionQueue *cq, const grpc::internal::RpcMethod &method, grpc::ClientContext *context, bool start, void *tag)
Create a stream object.
Definition: async_stream.h:497
void ReadInitialMetadata(void *tag) override
See the ClientAsyncStreamingInterface.ReadInitialMetadata method for semantics.
Definition: async_stream.h:229
void Write(const W &msg, grpc::WriteOptions options, void *tag) override
Definition: async_stream.h:393
bool ok() const
Is the status OK?
Definition: status.h:125
void Write(const W &msg, void *tag) override
Definition: async_stream.h:1044
void Read(R *msg, void *tag) override
Definition: async_stream.h:723
Async client-side interface for bi-directional streaming, where the client-to-server message stream h...
Definition: async_stream.h:474
void Finish(grpc::Status *status, void *tag) override
See the ClientAsyncStreamingInterface.Finish method for semantics.
Definition: async_stream.h:419
ServerAsyncReaderWriter(grpc::ServerContext *ctx)
Definition: async_stream.h:1015
Definition: async_stream.h:799
void WriteLast(const W &msg, grpc::WriteOptions options, void *tag)
Request the writing of msg and coalesce it with the writing of trailing metadata, using WriteOptions ...
Definition: async_stream.h:160
Did it work? If it didn't, why?
Definition: status.h:34
Definition: channel_interface.h:38
virtual void WriteAndFinish(const W &msg, grpc::WriteOptions options, const grpc::Status &status, void *tag)=0
Request the writing of msg and coalesce it with trailing metadata which contains status,...
Common interface for all client side asynchronous streaming.
Definition: async_stream.h:36
grpc_compression_level compression_level() const
Return the compression level to be used by the server call.
Definition: server_context.h:236
virtual void WriteAndFinish(const W &msg, grpc::WriteOptions options, const grpc::Status &status, void *tag)=0
Request the writing of msg and coalesce it with trailing metadata which contains status,...
void WritesDone(void *tag) override
Definition: async_stream.h:405
A ClientContext allows the person implementing a service client to:
Definition: client_context.h:195
Server-side interface for asynchronous bi-directional streaming.
Definition: async_stream.h:960
virtual void Finish(const W &msg, const grpc::Status &status, void *tag)=0
Indicate that the stream is to be finished with a certain status code and also send out msg response ...
void Write(const W &msg, grpc::WriteOptions options, void *tag) override
Definition: async_stream.h:880
Async client-side API for doing server-streaming RPCs, where the incoming message stream coming from ...
Definition: client_context.h:87
void SendInitialMetadata(void *tag) override
See ServerAsyncStreamingInterface::SendInitialMetadata for semantics.
Definition: async_stream.h:1025
virtual void WritesDone(void *tag)=0
Signal the client is done with the writes (half-close the client stream).
virtual void Finish(grpc::Status *status, void *tag)=0
Indicate that the stream is to be finished and request notification for when the call has been ended.
Codegen interface for grpc::Channel.
Definition: channel_interface.h:71
void ReadInitialMetadata(void *tag) override
See the ClientAsyncStreamingInterface.ReadInitialMetadata method for semantics.
Definition: async_stream.h:376
void Finish(const grpc::Status &status, void *tag) override
See the ServerAsyncReaderWriterInterface.Finish method for semantics.
Definition: async_stream.h:1094
virtual void StartCall(void *tag)=0
Start the call that was set up by the constructor, but only if the constructor was invoked through th...
static ClientAsyncReader< R > * Create(grpc::ChannelInterface *channel, grpc::CompletionQueue *cq, const grpc::internal::RpcMethod &method, grpc::ClientContext *context, const W &request, bool start, void *tag)
Create a stream object.
Definition: async_stream.h:184
virtual void Write(const W &msg, void *tag)=0
Request the writing of msg with identifying tag tag.
Common interface for client side asynchronous writing.
Definition: async_stream.h:305
void Read(R *msg, void *tag) override
Definition: async_stream.h:238
virtual ~AsyncWriterInterface()
Definition: async_stream.h:109
virtual void Read(R *msg, void *tag)=0
Read a message of type R into msg.
void FinishWithError(const grpc::Status &status, void *tag) override
See the ServerAsyncReaderInterface.FinishWithError method for semantics.
Definition: async_stream.h:769
void WriteAndFinish(const W &msg, grpc::WriteOptions options, const grpc::Status &status, void *tag) override
See the ServerAsyncWriterInterface.WriteAndFinish method for semantics.
Definition: async_stream.h:902
Async API on the client side for doing client-streaming RPCs, where the outgoing message stream going...
Definition: client_context.h:89
void set_output_tag(void *return_tag)
Definition: call_op_set.h:939
Per-message write options.
Definition: call_op_set.h:80
virtual void Finish(const grpc::Status &status, void *tag)=0
Indicate that the stream is to be finished with a certain status code.
An interface that yields a sequence of messages of type R.
Definition: async_stream.h:85
Async client-side interface for bi-directional streaming, where the outgoing message stream going to ...
Definition: client_context.h:91
void SendInitialMetadata(void *tag) override
See ServerAsyncStreamingInterface::SendInitialMetadata for semantics.
Definition: async_stream.h:710
void Write(const W &msg, void *tag) override
Definition: async_stream.h:562
void PerformOps(CallOpSetInterface *ops)
Definition: call.h:66
void Write(const W &msg, grpc::WriteOptions options, void *tag) override
Definition: async_stream.h:570
grpc_call * call() const
Definition: call.h:70
bool compression_level_set() const
Return a bool indicating whether the compression level for this call has been set (either implicitly ...
Definition: server_context.h:251
Definition: call_op_set.h:771
WriteOptions & set_buffer_hint()
Sets flag indicating that the write may be buffered and need not go out on the wire immediately.
Definition: call_op_set.h:118
A thin wrapper around grpc_completion_queue (see src/core/lib/surface/completion_queue....
Definition: completion_queue.h:105
Definition: channel_interface.h:40
void Finish(grpc::Status *status, void *tag) override
See the ClientAsyncStreamingInterface.Finish method for semantics.
Definition: async_stream.h:253
virtual ~AsyncReaderInterface()
Definition: async_stream.h:87
void StartCall(void *tag) override
Definition: async_stream.h:530
Definition: async_stream.h:644
void Finish(const grpc::Status &status, void *tag) override
See the ServerAsyncWriterInterface.Finish method for semantics.
Definition: async_stream.h:923
void SendInitialMetadata(void *tag) override
See ServerAsyncStreamingInterface::SendInitialMetadata for semantics.
Definition: async_stream.h:859
bool is_last_message() const
Get value for the flag indicating that this is the last message, and should be coalesced with trailin...
Definition: call_op_set.h:173
Async server-side API for doing client-streaming RPCs, where the incoming message stream from the cli...
Definition: server_context.h:56
Definition: call_op_set.h:426
Descriptor of an RPC method.
Definition: rpc_method.h:29
Async server-side API for doing bidirectional streaming RPCs, where the incoming message stream comin...
Definition: server_context.h:62
ServerAsyncWriter(grpc::ServerContext *ctx)
Definition: async_stream.h:849
Definition: async_stream.h:168
virtual void Finish(const grpc::Status &status, void *tag)=0
Indicate that the stream is to be finished with a certain status code.