Go to the documentation of this file.
19 #ifndef GRPCPP_SUPPORT_SYNC_STREAM_H
20 #define GRPCPP_SUPPORT_SYNC_STREAM_H
100 virtual bool Read(R* msg) = 0;
177 class ClientReader final :
public ClientReaderInterface<R> {
187 GPR_ASSERT(!context_->initial_metadata_received_);
190 ops.RecvInitialMetadata(context_);
197 *sz = (result > 0) ? result : UINT32_MAX;
210 if (!context_->initial_metadata_received_) {
211 ops.RecvInitialMetadata(context_);
215 return cq_.Pluck(&ops) && ops.got_message;
227 if (!context_->initial_metadata_received_) {
228 ops.RecvInitialMetadata(context_);
231 ops.ClientRecvStatus(context_, &status);
254 call_(channel->CreateCall(method, context, &cq_)) {
259 ops.SendInitialMetadata(&context->send_initial_metadata_,
260 context->initial_metadata_flags());
262 GPR_ASSERT(ops.SendMessagePtr(&request).ok());
263 ops.ClientSendClose();
300 class ClientWriter :
public ClientWriterInterface<W> {
309 GPR_ASSERT(!context_->initial_metadata_received_);
312 ops.RecvInitialMetadata(context_);
332 ops.ClientSendClose();
334 if (context_->initial_metadata_corked_) {
335 ops.SendInitialMetadata(&context_->send_initial_metadata_,
336 context_->initial_metadata_flags());
337 context_->set_initial_metadata_corked(
false);
339 if (!ops.SendMessagePtr(&msg, options).ok()) {
344 return cq_.Pluck(&ops);
349 ops.ClientSendClose();
351 return cq_.Pluck(&ops);
362 if (!context_->initial_metadata_received_) {
363 finish_ops_.RecvInitialMetadata(context_);
365 finish_ops_.ClientRecvStatus(context_, &status);
387 call_(channel->CreateCall(method, context, &cq_)) {
388 finish_ops_.RecvMessage(response);
389 finish_ops_.AllowNoMessage();
391 if (!context_->initial_metadata_corked_) {
393 ops.SendInitialMetadata(&context->send_initial_metadata_,
394 context->initial_metadata_flags());
412 template <
class W,
class R>
433 template <
class W,
class R>
448 template <
class W,
class R>
449 class ClientReaderWriter final :
public ClientReaderWriterInterface<W, R> {
458 GPR_ASSERT(!context_->initial_metadata_received_);
461 ops.RecvInitialMetadata(context_);
468 *sz = (result > 0) ? result : UINT32_MAX;
480 if (!context_->initial_metadata_received_) {
481 ops.RecvInitialMetadata(context_);
483 ops.RecvMessage(msg);
485 return cq_.Pluck(&ops) && ops.got_message;
502 ops.ClientSendClose();
504 if (context_->initial_metadata_corked_) {
505 ops.SendInitialMetadata(&context_->send_initial_metadata_,
506 context_->initial_metadata_flags());
507 context_->set_initial_metadata_corked(
false);
509 if (!ops.SendMessagePtr(&msg, options).ok()) {
514 return cq_.Pluck(&ops);
519 ops.ClientSendClose();
521 return cq_.Pluck(&ops);
533 if (!context_->initial_metadata_received_) {
534 ops.RecvInitialMetadata(context_);
537 ops.ClientRecvStatus(context_, &status);
560 call_(channel->CreateCall(method, context, &cq_)) {
561 if (!context_->initial_metadata_corked_) {
563 ops.SendInitialMetadata(&context->send_initial_metadata_,
564 context->initial_metadata_flags());
589 ops.SendInitialMetadata(&ctx_->initial_metadata_,
590 ctx_->initial_metadata_flags());
594 ctx_->sent_initial_metadata_ =
true;
596 call_->
cq()->Pluck(&ops);
601 *sz = (result > 0) ? result : UINT32_MAX;
607 ops.RecvMessage(msg);
609 bool ok = call_->
cq()->Pluck(&ops) && ops.got_message;
611 ctx_->MaybeMarkCancelledOnRead();
620 template <
class ServiceType,
class RequestType,
class ResponseType>
624 : call_(call), ctx_(ctx) {}
646 ops.SendInitialMetadata(&ctx_->initial_metadata_,
647 ctx_->initial_metadata_flags());
651 ctx_->sent_initial_metadata_ =
true;
653 call_->
cq()->Pluck(&ops);
667 if (!ctx_->pending_ops_.SendMessagePtr(&msg, options).ok()) {
670 if (!ctx_->sent_initial_metadata_) {
671 ctx_->pending_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
672 ctx_->initial_metadata_flags());
676 ctx_->sent_initial_metadata_ =
true;
683 ctx_->has_pending_ops_ =
true;
686 ctx_->has_pending_ops_ =
false;
687 return call_->
cq()->Pluck(&ctx_->pending_ops_);
694 template <
class ServiceType,
class RequestType,
class ResponseType>
698 : call_(call), ctx_(ctx) {}
702 template <
class W,
class R>
709 template <
class W,
class R>
710 class ServerReaderWriterBody final {
713 : call_(call), ctx_(ctx) {}
719 ops.SendInitialMetadata(&ctx_->initial_metadata_,
720 ctx_->initial_metadata_flags());
724 ctx_->sent_initial_metadata_ =
true;
726 call_->
cq()->Pluck(&ops);
731 *sz = (result > 0) ? result : UINT32_MAX;
737 ops.RecvMessage(msg);
739 bool ok = call_->
cq()->Pluck(&ops) && ops.got_message;
741 ctx_->MaybeMarkCancelledOnRead();
750 if (!ctx_->pending_ops_.SendMessagePtr(&msg, options).ok()) {
753 if (!ctx_->sent_initial_metadata_) {
754 ctx_->pending_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
755 ctx_->initial_metadata_flags());
759 ctx_->sent_initial_metadata_ =
true;
766 ctx_->has_pending_ops_ =
true;
769 ctx_->has_pending_ops_ =
false;
770 return call_->
cq()->Pluck(&ctx_->pending_ops_);
784 template <
class W,
class R>
793 return body_.NextMessageSize(sz);
/// Block to read a message and parse it into \a msg.
/// Forwards the call to body_.Read(msg) and returns that result unchanged.
796 bool Read(R* msg)
override {
return body_.Read(msg); }
805 return body_.Write(msg, options);
814 : body_(call, ctx) {}
825 template <
class RequestType,
class ResponseType>
850 bool Read(RequestType* request)
override {
855 return body_.
Read(request);
866 bool Write(
const ResponseType& response,
868 if (write_done_ || !read_done_) {
872 return body_.
Write(response, options);
883 : body_(call, ctx), read_done_(false), write_done_(false) {}
891 template <
class RequestType,
class ResponseType>
916 bool Read(RequestType* request)
override {
921 return body_.
Read(request);
932 bool Write(
const ResponseType& response,
934 return read_done_ && body_.
Write(response, options);
944 : body_(call, ctx), read_done_(false) {}
949 #endif // GRPCPP_SUPPORT_SYNC_STREAM_H
bool Write(const ResponseType &response, grpc::WriteOptions options) override
Block to write msg to the stream with WriteOptions options.
Definition: sync_stream.h:866
grpc::CompletionQueue * cq() const
Definition: call.h:71
ServerReaderWriterBody(grpc::internal::Call *call, grpc::ServerContext *ctx)
Definition: sync_stream.h:712
Server-side interface for streaming writes of message of type W.
Definition: sync_stream.h:629
void SendInitialMetadata()
Definition: sync_stream.h:715
Definition: call_op_set.h:619
Definition: call_op_set.h:526
bool WritesDone() override
Definition: sync_stream.h:517
Synchronous (blocking) client-side API for doing client-streaming RPCs, where the outgoing message st...
Definition: client_context.h:81
A ServerContext or CallbackServerContext allows the code implementing a service handler to:
Definition: server_context.h:572
void WaitForInitialMetadata() override
See the ClientStreamingInterface.WaitForInitialMetadata method for semantics.
Definition: sync_stream.h:186
int max_receive_message_size() const
Definition: call.h:73
An Alarm posts the user-provided tag to its associated completion queue or invokes the user-provided ...
Definition: alarm.h:33
static ClientWriter< W > * Create(grpc::ChannelInterface *channel, const grpc::internal::RpcMethod &method, grpc::ClientContext *context, R *response)
Definition: sync_stream.h:288
Definition: call_op_set.h:286
bool Read(R *msg) override
See the ReaderInterface.Read method for semantics.
Definition: sync_stream.h:476
WriteOptions & set_last_message()
last-message bit: indicates this is the last message in a stream client-side: makes Write the equival...
Definition: call_op_set.h:155
bool Write(const W &msg)
Block to write msg to the stream with default write options.
Definition: sync_stream.h:124
Synchronous (blocking) server-side API for doing server-streaming RPCs,...
Definition: completion_queue.h:61
bool Write(const ResponseType &response, grpc::WriteOptions options) override
Block to write msg to the stream with WriteOptions options.
Definition: sync_stream.h:932
Definition: sync_stream.h:161
Straightforward wrapping of the C call object.
Definition: call.h:36
#define GPR_ASSERT(x)
abort() the process if x is zero, having written a line to the log.
Definition: log.h:95
virtual bool NextMessageSize(uint32_t *sz)=0
Get an upper bound on the next message size available for reading on this stream.
grpc::Status Finish() override
See the ClientStreamingInterface.Finish method for semantics.
Definition: sync_stream.h:223
Client-side interface for bi-directional streaming with client-to-server stream messages of type W an...
Definition: sync_stream.h:413
bool Write(const W &msg, grpc::WriteOptions options) override
Definition: sync_stream.h:324
grpc::Status Finish() override
See the ClientStreamingInterface.Finish method for semantics.
Definition: sync_stream.h:360
bool NextMessageSize(uint32_t *sz) override
Get an upper bound on the request message size from the client.
Definition: sync_stream.h:902
A class to represent a flow-controlled server-side streaming call.
Definition: sync_stream.h:892
Definition: completion_queue.h:64
virtual ~ServerStreamingInterface()
Definition: sync_stream.h:69
bool NextMessageSize(uint32_t *sz) override
Definition: sync_stream.h:466
bool NextMessageSize(uint32_t *sz) override
Definition: sync_stream.h:599
bool Read(RequestType *request) override
Read a message of type RequestType into request.
Definition: sync_stream.h:850
Did it work? If it didn't, why?
Definition: status.h:35
@ GRPC_CQ_DEFAULT_POLLING
The completion queue will have an associated pollset and there is no restriction on the type of file ...
Definition: grpc_types.h:415
virtual void WaitForInitialMetadata()=0
Block to wait for initial metadata from server.
void set_compression_level(grpc_compression_level level)
Set level to be the compression level used for the server call.
Definition: server_context.h:243
Synchronous (blocking) client-side API for doing server-streaming RPCs, where the stream of messages ...
Definition: client_context.h:79
Definition: sync_stream.h:434
grpc_compression_level compression_level() const
Return the compression level to be used by the server call.
Definition: server_context.h:236
An interface that can be fed a sequence of messages of type W.
Definition: sync_stream.h:105
Synchronous (blocking) server-side API for doing client-streaming RPCs, where the incoming message st...
Definition: completion_queue.h:59
A ClientContext allows the person implementing a service client to:
Definition: client_context.h:193
bool Write(const W &msg, grpc::WriteOptions options) override
Block to write msg to the stream with WriteOptions options.
Definition: sync_stream.h:804
bool Write(const W &msg, grpc::WriteOptions options) override
Definition: sync_stream.h:662
Server-side interface for bi-directional streaming.
Definition: sync_stream.h:703
virtual bool Read(R *msg)=0
Block to read a message and parse to msg.
Client-side interface for streaming writes of message type W.
Definition: sync_stream.h:271
bool Read(R *msg) override
See the ReaderInterface.Read method for semantics.
Definition: sync_stream.h:206
Codegen interface for grpc::Channel.
Definition: channel_interface.h:71
bool NextMessageSize(uint32_t *sz) override
Definition: sync_stream.h:195
bool NextMessageSize(uint32_t *sz) override
Get an upper bound on the request message size from the client.
Definition: sync_stream.h:836
bool NextMessageSize(uint32_t *sz) override
Get an upper bound on the next message size available for reading on this stream.
Definition: sync_stream.h:792
bool Write(const W &msg, grpc::WriteOptions options)
Definition: sync_stream.h:746
void WaitForInitialMetadata()
See the ClientStreamingInterface.WaitForInitialMetadata method for semantics.
Definition: sync_stream.h:308
@ GRPC_CQ_PLUCK
Events are popped out by calling grpc_completion_queue_pluck() API ONLY.
Definition: grpc_types.h:435
Definition: grpc_types.h:463
Common interface for all synchronous client side streaming.
Definition: sync_stream.h:35
bool NextMessageSize(uint32_t *sz)
Definition: sync_stream.h:729
void SendInitialMetadata() override
Block to send initial metadata to client.
Definition: sync_stream.h:899
bool Read(R *msg) override
Block to read a message and parse to msg.
Definition: sync_stream.h:796
virtual void SendInitialMetadata()=0
Block to send initial metadata to client.
Per-message write options.
Definition: call_op_set.h:78
Synchronous (blocking) server-side API for a bidirectional streaming call, where the incoming message...
Definition: sync_stream.h:785
A wrapper class of an application provided server streaming handler.
Definition: completion_queue.h:76
virtual ~WriterInterface()
Definition: sync_stream.h:107
bool Write(const W &msg, grpc::WriteOptions options) override
Definition: sync_stream.h:494
static ClientReader< R > * Create(grpc::ChannelInterface *channel, const grpc::internal::RpcMethod &method, grpc::ClientContext *context, const W &request)
Definition: sync_stream.h:164
grpc::Status Finish() override
See the ClientStreamingInterface.Finish method for semantics.
Definition: sync_stream.h:529
A wrapper class of an application provided client streaming handler.
Definition: completion_queue.h:74
Client-side interface for streaming reads of message of type R.
Definition: sync_stream.h:149
virtual bool Write(const W &msg, grpc::WriteOptions options)=0
Block to write msg to the stream with WriteOptions options.
A wrapper class of an application provided bidi-streaming handler.
Definition: completion_queue.h:78
#define GRPC_CQ_CURRENT_VERSION
Definition: grpc_types.h:461
virtual void WaitForInitialMetadata()=0
Block to wait for initial metadata from server.
void SendInitialMetadata() override
See the ServerStreamingInterface.SendInitialMetadata method for semantics.
Definition: sync_stream.h:585
void PerformOps(CallOpSetInterface *ops)
Definition: call.h:66
bool Read(R *msg)
Definition: sync_stream.h:735
Synchronous (blocking) client-side API for bi-directional streaming RPCs, where the outgoing message ...
Definition: client_context.h:83
virtual ~ReaderInterface()
Definition: sync_stream.h:84
bool compression_level_set() const
Return a bool indicating whether the compression level for this call has been set (either implicitly ...
Definition: server_context.h:251
Server-side interface for streaming reads of message of type R.
Definition: sync_stream.h:573
void WaitForInitialMetadata() override
Block waiting to read initial metadata from the server.
Definition: sync_stream.h:457
static ClientReaderWriter< W, R > * Create(grpc::ChannelInterface *channel, const grpc::internal::RpcMethod &method, grpc::ClientContext *context)
Definition: sync_stream.h:436
Definition: call_op_set.h:769
WriteOptions & set_buffer_hint()
Sets flag indicating that the write may be buffered and need not go out on the wire immediately.
Definition: call_op_set.h:116
A thin wrapper around grpc_completion_queue (see src/core/lib/surface/completion_queue....
Definition: completion_queue.h:103
virtual grpc::Status Finish()=0
Block waiting until the stream finishes and a final status of the call is available.
void SendInitialMetadata() override
See the ServerStreamingInterface.SendInitialMetadata method for semantics.
Definition: sync_stream.h:790
An interface that yields a sequence of messages of type R.
Definition: sync_stream.h:82
void SendInitialMetadata() override
See the ServerStreamingInterface.SendInitialMetadata method for semantics.
Definition: sync_stream.h:642
A class to represent a flow-controlled unary call.
Definition: sync_stream.h:826
void SendInitialMetadata() override
Block to send initial metadata to client.
Definition: sync_stream.h:833
bool is_last_message() const
Get value for the flag indicating that this is the last message, and should be coalesced with trailin...
Definition: call_op_set.h:171
Definition: call_op_set.h:424
Descriptor of an RPC method.
Definition: rpc_method.h:29
Definition: sync_stream.h:285
void WriteLast(const W &msg, grpc::WriteOptions options)
Write msg and coalesce it with the writing of trailing metadata, using WriteOptions options.
Definition: sync_stream.h:140
virtual bool WritesDone()=0
Half close writing from the client.
bool Read(RequestType *request) override
Read a message of type RequestType into request.
Definition: sync_stream.h:916
virtual bool WritesDone()=0
Half close writing from the client.
virtual ~ClientStreamingInterface()
Definition: sync_stream.h:37
bool WritesDone() override
Definition: sync_stream.h:347
void RecvMessage(R *message)
Definition: call_op_set.h:426
Common interface for all synchronous server side streaming.
Definition: sync_stream.h:67
bool Read(R *msg) override
Definition: sync_stream.h:605