Go to the documentation of this file.
19 #ifndef GRPCPP_SUPPORT_ASYNC_STREAM_H
20 #define GRPCPP_SUPPORT_ASYNC_STREAM_H
100 virtual void Read(R* msg,
void* tag) = 0;
121 virtual void Write(
const W& msg,
void* tag) = 0;
172 class ClientAsyncReaderFactory {
186 const W& request,
bool start,
void* tag) {
199 class ClientAsyncReader final :
public ClientAsyncReaderInterface<R> {
202 static void operator delete(
void* , std::size_t size) {
211 static void operator delete(
void*,
void*) {
GPR_ASSERT(
false); }
216 StartCallInternal(tag);
229 GPR_ASSERT(!context_->initial_metadata_received_);
232 meta_ops_.RecvInitialMetadata(context_);
236 void Read(R* msg,
void* tag)
override {
239 if (!context_->initial_metadata_received_) {
240 read_ops_.RecvInitialMetadata(context_);
242 read_ops_.RecvMessage(msg);
254 if (!context_->initial_metadata_received_) {
255 finish_ops_.RecvInitialMetadata(context_);
257 finish_ops_.ClientRecvStatus(context_, status);
265 const W& request,
bool start,
void* tag)
266 : context_(context), call_(call), started_(start) {
268 GPR_ASSERT(init_ops_.SendMessage(request).ok());
269 init_ops_.ClientSendClose();
271 StartCallInternal(tag);
277 void StartCallInternal(
void* tag) {
278 init_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
279 context_->initial_metadata_flags());
316 class ClientAsyncWriterFactory {
334 bool start,
void* tag) {
347 class ClientAsyncWriter final :
public ClientAsyncWriterInterface<W> {
350 static void operator delete(
void* , std::size_t size) {
359 static void operator delete(
void*,
void*) {
GPR_ASSERT(
false); }
364 StartCallInternal(tag);
376 GPR_ASSERT(!context_->initial_metadata_received_);
379 meta_ops_.RecvInitialMetadata(context_);
383 void Write(
const W& msg,
void* tag)
override {
396 write_ops_.ClientSendClose();
399 GPR_ASSERT(write_ops_.SendMessage(msg, options).ok());
406 write_ops_.ClientSendClose();
420 if (!context_->initial_metadata_received_) {
421 finish_ops_.RecvInitialMetadata(context_);
423 finish_ops_.ClientRecvStatus(context_, status);
431 R* response,
bool start,
void* tag)
432 : context_(context), call_(call), started_(start) {
433 finish_ops_.RecvMessage(response);
434 finish_ops_.AllowNoMessage();
436 StartCallInternal(tag);
442 void StartCallInternal(
void* tag) {
443 write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
444 context_->initial_metadata_flags());
447 if (!context_->initial_metadata_corked_) {
471 template <
class W,
class R>
485 template <
class W,
class R>
486 class ClientAsyncReaderWriterFactory {
498 bool start,
void* tag) {
512 template <
class W,
class R>
513 class ClientAsyncReaderWriter final
514 :
public ClientAsyncReaderWriterInterface<W, R> {
517 static void operator delete(
void* , std::size_t size) {
526 static void operator delete(
void*,
void*) {
GPR_ASSERT(
false); }
531 StartCallInternal(tag);
543 GPR_ASSERT(!context_->initial_metadata_received_);
546 meta_ops_.RecvInitialMetadata(context_);
550 void Read(R* msg,
void* tag)
override {
553 if (!context_->initial_metadata_received_) {
554 read_ops_.RecvInitialMetadata(context_);
556 read_ops_.RecvMessage(msg);
560 void Write(
const W& msg,
void* tag)
override {
573 write_ops_.ClientSendClose();
576 GPR_ASSERT(write_ops_.SendMessage(msg, options).ok());
583 write_ops_.ClientSendClose();
594 if (!context_->initial_metadata_received_) {
595 finish_ops_.RecvInitialMetadata(context_);
597 finish_ops_.ClientRecvStatus(context_, status);
605 : context_(context), call_(call), started_(start) {
607 StartCallInternal(tag);
613 void StartCallInternal(
void* tag) {
614 write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
615 context_->initial_metadata_flags());
618 if (!context_->initial_metadata_corked_) {
641 template <
class W,
class R>
697 template <
class W,
class R>
701 : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
712 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
713 ctx_->initial_metadata_flags());
717 ctx_->sent_initial_metadata_ =
true;
721 void Read(R* msg,
void* tag)
override {
723 read_ops_.RecvMessage(msg);
740 if (!ctx_->sent_initial_metadata_) {
741 finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
742 ctx_->initial_metadata_flags());
746 ctx_->sent_initial_metadata_ =
true;
750 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
751 finish_ops_.SendMessage(msg));
753 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status);
770 if (!ctx_->sent_initial_metadata_) {
771 finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
772 ctx_->initial_metadata_flags());
776 ctx_->sent_initial_metadata_ =
true;
778 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status);
848 : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
861 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
862 ctx_->initial_metadata_flags());
866 ctx_->sent_initial_metadata_ =
true;
870 void Write(
const W& msg,
void* tag)
override {
872 EnsureInitialMetadataSent(&write_ops_);
884 EnsureInitialMetadataSent(&write_ops_);
886 GPR_ASSERT(write_ops_.SendMessage(msg, options).ok());
903 EnsureInitialMetadataSent(&write_ops_);
905 GPR_ASSERT(write_ops_.SendMessage(msg, options).ok());
906 write_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status);
923 EnsureInitialMetadataSent(&finish_ops_);
924 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status);
932 void EnsureInitialMetadataSent(T* ops) {
933 if (!ctx_->sent_initial_metadata_) {
934 ops->SendInitialMetadata(&ctx_->initial_metadata_,
935 ctx_->initial_metadata_flags());
939 ctx_->sent_initial_metadata_ =
true;
957 template <
class W,
class R>
1009 template <
class W,
class R>
1014 : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
1027 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
1028 ctx_->initial_metadata_flags());
1032 ctx_->sent_initial_metadata_ =
true;
1036 void Read(R* msg,
void* tag)
override {
1038 read_ops_.RecvMessage(msg);
1042 void Write(
const W& msg,
void* tag)
override {
1044 EnsureInitialMetadataSent(&write_ops_);
1046 GPR_ASSERT(write_ops_.SendMessage(msg).ok());
1055 EnsureInitialMetadataSent(&write_ops_);
1056 GPR_ASSERT(write_ops_.SendMessage(msg, options).ok());
1074 EnsureInitialMetadataSent(&write_ops_);
1076 GPR_ASSERT(write_ops_.SendMessage(msg, options).ok());
1077 write_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status);
1094 EnsureInitialMetadataSent(&finish_ops_);
1096 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status);
1106 void EnsureInitialMetadataSent(T* ops) {
1107 if (!ctx_->sent_initial_metadata_) {
1108 ops->SendInitialMetadata(&ctx_->initial_metadata_,
1109 ctx_->initial_metadata_flags());
1113 ctx_->sent_initial_metadata_ =
true;
1133 #endif // GRPCPP_SUPPORT_ASYNC_STREAM_H
void Finish(grpc::Status *status, void *tag) override
See the ClientAsyncStreamingInterface.Finish method for semantics.
Definition: async_stream.h:591
void Read(R *msg, void *tag) override
Definition: async_stream.h:1036
void StartCall(void *tag) override
Definition: async_stream.h:361
Definition: service_type.h:37
Definition: call_op_set.h:619
GRPCAPI void * grpc_call_arena_alloc(grpc_call *call, size_t size)
Allocate memory in the grpc_call arena: this memory is automatically discarded at call completion.
Definition: call_op_set.h:526
An interface that can be fed a sequence of messages of type W.
Definition: async_stream.h:105
A ServerContext or CallbackServerContext allows the code implementing a service handler to:
Definition: server_context.h:573
Definition: call_op_set.h:654
Represents a gRPC server.
Definition: server.h:58
void ReadInitialMetadata(void *tag) override
See the ClientAsyncStreamingInterface.ReadInitialMetadata method for semantics of this method.
Definition: async_stream.h:541
An Alarm posts the user-provided tag to its associated completion queue or invokes the user-provided ...
Definition: alarm.h:33
Primary implementation of CallOpSetInterface.
Definition: completion_queue.h:96
void Read(R *msg, void *tag) override
Definition: async_stream.h:550
virtual ~ClientAsyncStreamingInterface()
Definition: async_stream.h:36
Definition: call_op_set.h:286
void Write(const W &msg, grpc::WriteOptions options, void *tag) override
Definition: async_stream.h:1050
void Write(const W &msg, void *tag) override
Definition: async_stream.h:383
Async server-side API for doing server streaming RPCs, where the outgoing message stream from the ser...
Definition: server_context.h:58
Definition: channel_interface.h:42
WriteOptions & set_last_message()
last-message bit: indicates this is the last message in a stream client-side: makes Write the equival...
Definition: call_op_set.h:155
void StartCall(void *tag) override
Definition: async_stream.h:213
void Finish(const W &msg, const grpc::Status &status, void *tag) override
See the ServerAsyncReaderInterface.Read method for semantics.
Definition: async_stream.h:738
virtual void WritesDone(void *tag)=0
Signal the client is done with the writes (half-close the client stream).
void Write(const W &msg, void *tag) override
Definition: async_stream.h:870
static ClientAsyncWriter< W > * Create(grpc::ChannelInterface *channel, grpc::CompletionQueue *cq, const grpc::internal::RpcMethod &method, grpc::ClientContext *context, R *response, bool start, void *tag)
Create a stream object.
Definition: async_stream.h:330
Straightforward wrapping of the C call object.
Definition: call.h:36
#define GPR_ASSERT(x)
abort() the process if x is zero, having written a line to the log.
Definition: log.h:95
void WritesDone(void *tag) override
Definition: async_stream.h:580
ServerAsyncReader(grpc::ServerContext *ctx)
Definition: async_stream.h:700
void WriteAndFinish(const W &msg, grpc::WriteOptions options, const grpc::Status &status, void *tag) override
See the ServerAsyncReaderWriterInterface.WriteAndFinish method for semantics.
Definition: async_stream.h:1071
virtual void FinishWithError(const grpc::Status &status, void *tag)=0
Indicate that the stream is to be finished with a certain non-OK status code.
virtual void ReadInitialMetadata(void *tag)=0
Request notification of the reading of the initial metadata.
static ClientAsyncReaderWriter< W, R > * Create(grpc::ChannelInterface *channel, grpc::CompletionQueue *cq, const grpc::internal::RpcMethod &method, grpc::ClientContext *context, bool start, void *tag)
Create a stream object.
Definition: async_stream.h:495
void ReadInitialMetadata(void *tag) override
See the ClientAsyncStreamingInterface.ReadInitialMetadata method for semantics.
Definition: async_stream.h:227
void Write(const W &msg, grpc::WriteOptions options, void *tag) override
Definition: async_stream.h:391
bool ok() const
Is the status OK?
Definition: status.h:126
void Write(const W &msg, void *tag) override
Definition: async_stream.h:1042
void Read(R *msg, void *tag) override
Definition: async_stream.h:721
Async client-side interface for bi-directional streaming, where the client-to-server message stream h...
Definition: async_stream.h:472
void Finish(grpc::Status *status, void *tag) override
See the ClientAsyncStreamingInterface.Finish method for semantics.
Definition: async_stream.h:417
ServerAsyncReaderWriter(grpc::ServerContext *ctx)
Definition: async_stream.h:1013
Definition: async_stream.h:797
void WriteLast(const W &msg, grpc::WriteOptions options, void *tag)
Request the writing of msg and coalesce it with the writing of trailing metadata, using WriteOptions ...
Definition: async_stream.h:158
Did it work? If it didn't, why?
Definition: status.h:35
Definition: channel_interface.h:38
virtual void WriteAndFinish(const W &msg, grpc::WriteOptions options, const grpc::Status &status, void *tag)=0
Request the writing of msg and coalesce it with trailing metadata which contains status,...
Common interface for all client side asynchronous streaming.
Definition: async_stream.h:34
grpc_compression_level compression_level() const
Return the compression algorithm to be used by the server call.
Definition: server_context.h:236
virtual void WriteAndFinish(const W &msg, grpc::WriteOptions options, const grpc::Status &status, void *tag)=0
Request the writing of msg and coalesce it with trailing metadata which contains status,...
void WritesDone(void *tag) override
Definition: async_stream.h:403
A ClientContext allows the person implementing a service client to:
Definition: client_context.h:193
Server-side interface for asynchronous bi-directional streaming.
Definition: async_stream.h:958
virtual void Finish(const W &msg, const grpc::Status &status, void *tag)=0
Indicate that the stream is to be finished with a certain status code and also send out msg response ...
void Write(const W &msg, grpc::WriteOptions options, void *tag) override
Definition: async_stream.h:878
Async client-side API for doing server-streaming RPCs, where the incoming message stream coming from ...
Definition: client_context.h:85
void SendInitialMetadata(void *tag) override
See ServerAsyncStreamingInterface::SendInitialMetadata for semantics.
Definition: async_stream.h:1023
virtual void WritesDone(void *tag)=0
Signal the client is done with the writes (half-close the client stream).
virtual void Finish(grpc::Status *status, void *tag)=0
Indicate that the stream is to be finished and request notification for when the call has been ended.
Codegen interface for grpc::Channel.
Definition: channel_interface.h:71
void ReadInitialMetadata(void *tag) override
See the ClientAsyncStreamingInterface.ReadInitialMetadata method for semantics.
Definition: async_stream.h:374
void Finish(const grpc::Status &status, void *tag) override
See the ServerAsyncReaderWriterInterface.Finish method for semantics.
Definition: async_stream.h:1092
virtual void StartCall(void *tag)=0
Start the call that was set up by the constructor, but only if the constructor was invoked through th...
static ClientAsyncReader< R > * Create(grpc::ChannelInterface *channel, grpc::CompletionQueue *cq, const grpc::internal::RpcMethod &method, grpc::ClientContext *context, const W &request, bool start, void *tag)
Create a stream object.
Definition: async_stream.h:182
virtual void Write(const W &msg, void *tag)=0
Request the writing of msg with identifying tag tag.
Common interface for client side asynchronous writing.
Definition: async_stream.h:303
void Read(R *msg, void *tag) override
Definition: async_stream.h:236
virtual ~AsyncWriterInterface()
Definition: async_stream.h:107
virtual void Read(R *msg, void *tag)=0
Read a message of type R into msg.
void FinishWithError(const grpc::Status &status, void *tag) override
See the ServerAsyncReaderInterface.Read method for semantics.
Definition: async_stream.h:767
void WriteAndFinish(const W &msg, grpc::WriteOptions options, const grpc::Status &status, void *tag) override
See the ServerAsyncWriterInterface.WriteAndFinish method for semantics.
Definition: async_stream.h:900
Async API on the client side for doing client-streaming RPCs, where the outgoing message stream going...
Definition: client_context.h:87
void set_output_tag(void *return_tag)
Definition: call_op_set.h:935
Per-message write options.
Definition: call_op_set.h:78
virtual void Finish(const grpc::Status &status, void *tag)=0
Indicate that the stream is to be finished with a certain status code.
An interface that yields a sequence of messages of type R.
Definition: async_stream.h:83
Async client-side interface for bi-directional streaming, where the outgoing message stream going to ...
Definition: client_context.h:89
void SendInitialMetadata(void *tag) override
See ServerAsyncStreamingInterface::SendInitialMetadata for semantics.
Definition: async_stream.h:708
void Write(const W &msg, void *tag) override
Definition: async_stream.h:560
void PerformOps(CallOpSetInterface *ops)
Definition: call.h:66
void Write(const W &msg, grpc::WriteOptions options, void *tag) override
Definition: async_stream.h:568
grpc_call * call() const
Definition: call.h:70
bool compression_level_set() const
Return a bool indicating whether the compression level for this call has been set (either implicitly ...
Definition: server_context.h:251
Definition: call_op_set.h:769
WriteOptions & set_buffer_hint()
Sets flag indicating that the write may be buffered and need not go out on the wire immediately.
Definition: call_op_set.h:116
A thin wrapper around grpc_completion_queue (see src/core/lib/surface/completion_queue....
Definition: completion_queue.h:103
Definition: channel_interface.h:40
void Finish(grpc::Status *status, void *tag) override
See the ClientAsyncStreamingInterface.Finish method for semantics.
Definition: async_stream.h:251
virtual ~AsyncReaderInterface()
Definition: async_stream.h:85
void StartCall(void *tag) override
Definition: async_stream.h:528
Definition: async_stream.h:642
void Finish(const grpc::Status &status, void *tag) override
See the ServerAsyncWriterInterface.Finish method for semantics.
Definition: async_stream.h:921
void SendInitialMetadata(void *tag) override
See ServerAsyncStreamingInterface::SendInitialMetadata for semantics.
Definition: async_stream.h:857
bool is_last_message() const
Get value for the flag indicating that this is the last message, and should be coalesced with trailin...
Definition: call_op_set.h:171
Async server-side API for doing client-streaming RPCs, where the incoming message stream from the cli...
Definition: server_context.h:56
Definition: call_op_set.h:424
Descriptor of an RPC method.
Definition: rpc_method.h:29
Async server-side API for doing bidirectional streaming RPCs, where the incoming message stream comin...
Definition: server_context.h:62
ServerAsyncWriter(grpc::ServerContext *ctx)
Definition: async_stream.h:847
Definition: async_stream.h:166
virtual void Finish(const grpc::Status &status, void *tag)=0
Indicate that the stream is to be finished with a certain status code.