Go to the documentation of this file.
19 #ifndef GRPCPP_SUPPORT_ASYNC_STREAM_H
20 #define GRPCPP_SUPPORT_ASYNC_STREAM_H
29 #include "absl/log/absl_check.h"
105 virtual void Read(R* msg,
void* tag) = 0;
126 virtual void Write(
const W& msg,
void* tag) = 0;
177 class ClientAsyncReaderFactory {
191 const W& request,
bool start,
void* tag) {
204 class ClientAsyncReader final :
public ClientAsyncReaderInterface<R> {
207 static void operator delete(
void* , std::size_t size) {
216 static void operator delete(
void*,
void*) { ABSL_CHECK(
false); }
219 ABSL_CHECK(!started_);
221 StartCallInternal(tag);
233 ABSL_CHECK(started_);
234 ABSL_CHECK(!context_->initial_metadata_received_);
237 meta_ops_.RecvInitialMetadata(context_);
241 void Read(R* msg,
void* tag)
override {
242 ABSL_CHECK(started_);
244 if (!context_->initial_metadata_received_) {
245 read_ops_.RecvInitialMetadata(context_);
247 read_ops_.RecvMessage(msg);
257 ABSL_CHECK(started_);
259 if (!context_->initial_metadata_received_) {
260 finish_ops_.RecvInitialMetadata(context_);
262 finish_ops_.ClientRecvStatus(context_, status);
270 const W& request,
bool start,
void* tag)
271 : context_(context), call_(call), started_(start) {
273 ABSL_CHECK(init_ops_.SendMessage(request).ok());
274 init_ops_.ClientSendClose();
276 StartCallInternal(tag);
278 ABSL_CHECK(tag ==
nullptr);
282 void StartCallInternal(
void* tag) {
283 init_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
284 context_->initial_metadata_flags());
321 class ClientAsyncWriterFactory {
339 bool start,
void* tag) {
352 class ClientAsyncWriter final :
public ClientAsyncWriterInterface<W> {
355 static void operator delete(
void* , std::size_t size) {
364 static void operator delete(
void*,
void*) { ABSL_CHECK(
false); }
367 ABSL_CHECK(!started_);
369 StartCallInternal(tag);
380 ABSL_CHECK(started_);
381 ABSL_CHECK(!context_->initial_metadata_received_);
384 meta_ops_.RecvInitialMetadata(context_);
388 void Write(
const W& msg,
void* tag)
override {
389 ABSL_CHECK(started_);
392 ABSL_CHECK(write_ops_.SendMessage(msg).ok());
397 ABSL_CHECK(started_);
401 write_ops_.ClientSendClose();
404 ABSL_CHECK(write_ops_.SendMessage(msg, options).ok());
409 ABSL_CHECK(started_);
411 write_ops_.ClientSendClose();
423 ABSL_CHECK(started_);
425 if (!context_->initial_metadata_received_) {
426 finish_ops_.RecvInitialMetadata(context_);
428 finish_ops_.ClientRecvStatus(context_, status);
436 R* response,
bool start,
void* tag)
437 : context_(context), call_(call), started_(start) {
438 finish_ops_.RecvMessage(response);
439 finish_ops_.AllowNoMessage();
441 StartCallInternal(tag);
443 ABSL_CHECK(tag ==
nullptr);
447 void StartCallInternal(
void* tag) {
448 write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
449 context_->initial_metadata_flags());
452 if (!context_->initial_metadata_corked_) {
476 template <
class W,
class R>
490 template <
class W,
class R>
491 class ClientAsyncReaderWriterFactory {
503 bool start,
void* tag) {
517 template <
class W,
class R>
518 class ClientAsyncReaderWriter final
519 :
public ClientAsyncReaderWriterInterface<W, R> {
522 static void operator delete(
void* , std::size_t size) {
531 static void operator delete(
void*,
void*) { ABSL_CHECK(
false); }
534 ABSL_CHECK(!started_);
536 StartCallInternal(tag);
547 ABSL_CHECK(started_);
548 ABSL_CHECK(!context_->initial_metadata_received_);
551 meta_ops_.RecvInitialMetadata(context_);
555 void Read(R* msg,
void* tag)
override {
556 ABSL_CHECK(started_);
558 if (!context_->initial_metadata_received_) {
559 read_ops_.RecvInitialMetadata(context_);
561 read_ops_.RecvMessage(msg);
565 void Write(
const W& msg,
void* tag)
override {
566 ABSL_CHECK(started_);
569 ABSL_CHECK(write_ops_.SendMessage(msg).ok());
574 ABSL_CHECK(started_);
578 write_ops_.ClientSendClose();
581 ABSL_CHECK(write_ops_.SendMessage(msg, options).ok());
586 ABSL_CHECK(started_);
588 write_ops_.ClientSendClose();
597 ABSL_CHECK(started_);
599 if (!context_->initial_metadata_received_) {
600 finish_ops_.RecvInitialMetadata(context_);
602 finish_ops_.ClientRecvStatus(context_, status);
610 : context_(context), call_(call), started_(start) {
612 StartCallInternal(tag);
614 ABSL_CHECK(tag ==
nullptr);
618 void StartCallInternal(
void* tag) {
619 write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
620 context_->initial_metadata_flags());
623 if (!context_->initial_metadata_corked_) {
646 template <
class W,
class R>
702 template <
class W,
class R>
706 : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
714 ABSL_CHECK(!ctx_->sent_initial_metadata_);
717 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
718 ctx_->initial_metadata_flags());
722 ctx_->sent_initial_metadata_ =
true;
726 void Read(R* msg,
void* tag)
override {
728 read_ops_.RecvMessage(msg);
745 if (!ctx_->sent_initial_metadata_) {
746 finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
747 ctx_->initial_metadata_flags());
751 ctx_->sent_initial_metadata_ =
true;
755 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
756 finish_ops_.SendMessage(msg));
758 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status);
773 ABSL_CHECK(!status.
ok());
775 if (!ctx_->sent_initial_metadata_) {
776 finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
777 ctx_->initial_metadata_flags());
781 ctx_->sent_initial_metadata_ =
true;
783 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status);
853 : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
863 ABSL_CHECK(!ctx_->sent_initial_metadata_);
866 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
867 ctx_->initial_metadata_flags());
871 ctx_->sent_initial_metadata_ =
true;
875 void Write(
const W& msg,
void* tag)
override {
877 EnsureInitialMetadataSent(&write_ops_);
879 ABSL_CHECK(write_ops_.SendMessage(msg).ok());
889 EnsureInitialMetadataSent(&write_ops_);
891 ABSL_CHECK(write_ops_.SendMessage(msg, options).ok());
908 EnsureInitialMetadataSent(&write_ops_);
910 ABSL_CHECK(write_ops_.SendMessage(msg, options).ok());
911 write_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status);
928 EnsureInitialMetadataSent(&finish_ops_);
929 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status);
937 void EnsureInitialMetadataSent(T* ops) {
938 if (!ctx_->sent_initial_metadata_) {
939 ops->SendInitialMetadata(&ctx_->initial_metadata_,
940 ctx_->initial_metadata_flags());
944 ctx_->sent_initial_metadata_ =
true;
962 template <
class W,
class R>
1014 template <
class W,
class R>
1019 : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
1029 ABSL_CHECK(!ctx_->sent_initial_metadata_);
1032 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
1033 ctx_->initial_metadata_flags());
1037 ctx_->sent_initial_metadata_ =
true;
1041 void Read(R* msg,
void* tag)
override {
1043 read_ops_.RecvMessage(msg);
1047 void Write(
const W& msg,
void* tag)
override {
1049 EnsureInitialMetadataSent(&write_ops_);
1051 ABSL_CHECK(write_ops_.SendMessage(msg).ok());
1060 EnsureInitialMetadataSent(&write_ops_);
1061 ABSL_CHECK(write_ops_.SendMessage(msg, options).ok());
1079 EnsureInitialMetadataSent(&write_ops_);
1081 ABSL_CHECK(write_ops_.SendMessage(msg, options).ok());
1082 write_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status);
1099 EnsureInitialMetadataSent(&finish_ops_);
1101 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status);
1111 void EnsureInitialMetadataSent(T* ops) {
1112 if (!ctx_->sent_initial_metadata_) {
1113 ops->SendInitialMetadata(&ctx_->initial_metadata_,
1114 ctx_->initial_metadata_flags());
1118 ctx_->sent_initial_metadata_ =
true;
1138 #endif // GRPCPP_SUPPORT_ASYNC_STREAM_H
void Finish(grpc::Status *status, void *tag) override
See the ClientAsyncStreamingInterface.Finish method for semantics.
Definition: async_stream.h:596
void Read(R *msg, void *tag) override
Definition: async_stream.h:1041
void StartCall(void *tag) override
Definition: async_stream.h:366
Definition: service_type.h:38
Definition: call_op_set.h:621
GRPCAPI void * grpc_call_arena_alloc(grpc_call *call, size_t size)
Allocate memory in the grpc_call arena: this memory is automatically discarded at call completion.
Definition: call_op_set.h:528
An interface that can be fed a sequence of messages of type W.
Definition: async_stream.h:110
A ServerContext or CallbackServerContext allows the code implementing a service handler to:
Definition: server_context.h:578
Definition: call_op_set.h:656
Represents a gRPC server.
Definition: server.h:57
void ReadInitialMetadata(void *tag) override
See the ClientAsyncStreamingInterface.ReadInitialMetadata method for semantics of this method.
Definition: async_stream.h:546
An Alarm posts the user-provided tag to its associated completion queue or invokes the user-provided ...
Definition: alarm.h:33
Primary implementation of CallOpSetInterface.
Definition: completion_queue.h:97
void Read(R *msg, void *tag) override
Definition: async_stream.h:555
virtual ~ClientAsyncStreamingInterface()
Definition: async_stream.h:37
Definition: call_op_set.h:288
void Write(const W &msg, grpc::WriteOptions options, void *tag) override
Definition: async_stream.h:1055
void Write(const W &msg, void *tag) override
Definition: async_stream.h:388
Async server-side API for doing server streaming RPCs, where the outgoing message stream from the ser...
Definition: server_context.h:58
Definition: channel_interface.h:42
WriteOptions & set_last_message()
last-message bit: indicates this is the last message in a stream client-side: makes Write the equival...
Definition: call_op_set.h:157
void StartCall(void *tag) override
Definition: async_stream.h:218
void Finish(const W &msg, const grpc::Status &status, void *tag) override
See the ServerAsyncReaderInterface.Read method for semantics.
Definition: async_stream.h:743
virtual void WritesDone(void *tag)=0
Signal the client is done with the writes (half-close the client stream).
void Write(const W &msg, void *tag) override
Definition: async_stream.h:875
static ClientAsyncWriter< W > * Create(grpc::ChannelInterface *channel, grpc::CompletionQueue *cq, const grpc::internal::RpcMethod &method, grpc::ClientContext *context, R *response, bool start, void *tag)
Create a stream object.
Definition: async_stream.h:335
Straightforward wrapping of the C call object.
Definition: call.h:36
void WritesDone(void *tag) override
Definition: async_stream.h:585
ServerAsyncReader(grpc::ServerContext *ctx)
Definition: async_stream.h:705
void WriteAndFinish(const W &msg, grpc::WriteOptions options, const grpc::Status &status, void *tag) override
See the ServerAsyncReaderWriterInterface.WriteAndFinish method for semantics.
Definition: async_stream.h:1076
virtual void FinishWithError(const grpc::Status &status, void *tag)=0
Indicate that the stream is to be finished with a certain non-OK status code.
virtual void ReadInitialMetadata(void *tag)=0
Request notification of the reading of the initial metadata.
static ClientAsyncReaderWriter< W, R > * Create(grpc::ChannelInterface *channel, grpc::CompletionQueue *cq, const grpc::internal::RpcMethod &method, grpc::ClientContext *context, bool start, void *tag)
Create a stream object.
Definition: async_stream.h:500
void ReadInitialMetadata(void *tag) override
See the ClientAsyncStreamingInterface.ReadInitialMetadata method for semantics.
Definition: async_stream.h:232
void Write(const W &msg, grpc::WriteOptions options, void *tag) override
Definition: async_stream.h:396
bool ok() const
Is the status OK?
Definition: status.h:125
void Write(const W &msg, void *tag) override
Definition: async_stream.h:1047
void Read(R *msg, void *tag) override
Definition: async_stream.h:726
Async client-side interface for bi-directional streaming, where the client-to-server message stream h...
Definition: async_stream.h:477
void Finish(grpc::Status *status, void *tag) override
See the ClientAsyncStreamingInterface.Finish method for semantics.
Definition: async_stream.h:422
ServerAsyncReaderWriter(grpc::ServerContext *ctx)
Definition: async_stream.h:1018
Definition: async_stream.h:802
void WriteLast(const W &msg, grpc::WriteOptions options, void *tag)
Request the writing of msg and coalesce it with the writing of trailing metadata, using WriteOptions ...
Definition: async_stream.h:163
Did it work? If it didn't, why?
Definition: status.h:34
Definition: channel_interface.h:38
virtual void WriteAndFinish(const W &msg, grpc::WriteOptions options, const grpc::Status &status, void *tag)=0
Request the writing of msg and coalesce it with trailing metadata which contains status,...
Common interface for all client side asynchronous streaming.
Definition: async_stream.h:35
grpc_compression_level compression_level() const
Return the compression algorithm to be used by the server call.
Definition: server_context.h:236
virtual void WriteAndFinish(const W &msg, grpc::WriteOptions options, const grpc::Status &status, void *tag)=0
Request the writing of msg and coalesce it with trailing metadata which contains status,...
void WritesDone(void *tag) override
Definition: async_stream.h:408
A ClientContext allows the person implementing a service client to:
Definition: client_context.h:194
Server-side interface for asynchronous bi-directional streaming.
Definition: async_stream.h:963
virtual void Finish(const W &msg, const grpc::Status &status, void *tag)=0
Indicate that the stream is to be finished with a certain status code and also send out msg response ...
void Write(const W &msg, grpc::WriteOptions options, void *tag) override
Definition: async_stream.h:883
Async client-side API for doing server-streaming RPCs, where the incoming message stream coming from ...
Definition: client_context.h:86
void SendInitialMetadata(void *tag) override
See ServerAsyncStreamingInterface::SendInitialMetadata for semantics.
Definition: async_stream.h:1028
virtual void WritesDone(void *tag)=0
Signal the client is done with the writes (half-close the client stream).
virtual void Finish(grpc::Status *status, void *tag)=0
Indicate that the stream is to be finished and request notification for when the call has been ended.
Codegen interface for grpc::Channel.
Definition: channel_interface.h:71
void ReadInitialMetadata(void *tag) override
See the ClientAsyncStreamingInterface.ReadInitialMetadata method for semantics.
Definition: async_stream.h:379
void Finish(const grpc::Status &status, void *tag) override
See the ServerAsyncReaderWriterInterface.Finish method for semantics.
Definition: async_stream.h:1097
virtual void StartCall(void *tag)=0
Start the call that was set up by the constructor, but only if the constructor was invoked through th...
static ClientAsyncReader< R > * Create(grpc::ChannelInterface *channel, grpc::CompletionQueue *cq, const grpc::internal::RpcMethod &method, grpc::ClientContext *context, const W &request, bool start, void *tag)
Create a stream object.
Definition: async_stream.h:187
virtual void Write(const W &msg, void *tag)=0
Request the writing of msg with identifying tag tag.
Common interface for client side asynchronous writing.
Definition: async_stream.h:308
void Read(R *msg, void *tag) override
Definition: async_stream.h:241
virtual ~AsyncWriterInterface()
Definition: async_stream.h:112
virtual void Read(R *msg, void *tag)=0
Read a message of type R into msg.
void FinishWithError(const grpc::Status &status, void *tag) override
See the ServerAsyncReaderInterface.Read method for semantics.
Definition: async_stream.h:772
void WriteAndFinish(const W &msg, grpc::WriteOptions options, const grpc::Status &status, void *tag) override
See the ServerAsyncWriterInterface.WriteAndFinish method for semantics.
Definition: async_stream.h:905
Async API on the client side for doing client-streaming RPCs, where the outgoing message stream going...
Definition: client_context.h:88
void set_output_tag(void *return_tag)
Definition: call_op_set.h:939
Per-message write options.
Definition: call_op_set.h:80
virtual void Finish(const grpc::Status &status, void *tag)=0
Indicate that the stream is to be finished with a certain status code.
An interface that yields a sequence of messages of type R.
Definition: async_stream.h:88
Async client-side interface for bi-directional streaming, where the outgoing message stream going to ...
Definition: client_context.h:90
void SendInitialMetadata(void *tag) override
See ServerAsyncStreamingInterface::SendInitialMetadata for semantics.
Definition: async_stream.h:713
void Write(const W &msg, void *tag) override
Definition: async_stream.h:565
void PerformOps(CallOpSetInterface *ops)
Definition: call.h:66
void Write(const W &msg, grpc::WriteOptions options, void *tag) override
Definition: async_stream.h:573
grpc_call * call() const
Definition: call.h:70
bool compression_level_set() const
Return a bool indicating whether the compression level for this call has been set (either implicitly ...
Definition: server_context.h:251
Definition: call_op_set.h:771
WriteOptions & set_buffer_hint()
Sets flag indicating that the write may be buffered and need not go out on the wire immediately.
Definition: call_op_set.h:118
A thin wrapper around grpc_completion_queue (see src/core/lib/surface/completion_queue....
Definition: completion_queue.h:104
Definition: channel_interface.h:40
void Finish(grpc::Status *status, void *tag) override
See the ClientAsyncStreamingInterface.Finish method for semantics.
Definition: async_stream.h:256
virtual ~AsyncReaderInterface()
Definition: async_stream.h:90
void StartCall(void *tag) override
Definition: async_stream.h:533
Definition: async_stream.h:647
void Finish(const grpc::Status &status, void *tag) override
See the ServerAsyncWriterInterface.Finish method for semantics.
Definition: async_stream.h:926
void SendInitialMetadata(void *tag) override
See ServerAsyncStreamingInterface::SendInitialMetadata for semantics.
Definition: async_stream.h:862
bool is_last_message() const
Get value for the flag indicating that this is the last message, and should be coalesced with trailin...
Definition: call_op_set.h:173
Async server-side API for doing client-streaming RPCs, where the incoming message stream from the cli...
Definition: server_context.h:56
Definition: call_op_set.h:426
Descriptor of an RPC method.
Definition: rpc_method.h:29
Async server-side API for doing bidirectional streaming RPCs, where the incoming message stream comin...
Definition: server_context.h:62
ServerAsyncWriter(grpc::ServerContext *ctx)
Definition: async_stream.h:852
Definition: async_stream.h:171
virtual void Finish(const grpc::Status &status, void *tag)=0
Indicate that the stream is to be finished with a certain status code.