Go to the documentation of this file.
19 #ifndef GRPCPP_SUPPORT_SYNC_STREAM_H
20 #define GRPCPP_SUPPORT_SYNC_STREAM_H
22 #include "absl/log/absl_check.h"
102 virtual bool Read(R* msg) = 0;
179 class ClientReader final :
public ClientReaderInterface<R> {
189 ABSL_CHECK(!context_->initial_metadata_received_);
192 ops.RecvInitialMetadata(context_);
199 *sz = (result > 0) ? result : UINT32_MAX;
212 if (!context_->initial_metadata_received_) {
213 ops.RecvInitialMetadata(context_);
217 return cq_.Pluck(&ops) && ops.got_message;
229 if (!context_->initial_metadata_received_) {
230 ops.RecvInitialMetadata(context_);
233 ops.ClientRecvStatus(context_, &status);
235 ABSL_CHECK(cq_.Pluck(&ops));
256 call_(channel->CreateCall(method, context, &cq_)) {
261 ops.SendInitialMetadata(&context->send_initial_metadata_,
262 context->initial_metadata_flags());
264 ABSL_CHECK(ops.SendMessagePtr(&request).ok());
265 ops.ClientSendClose();
302 class ClientWriter :
public ClientWriterInterface<W> {
311 ABSL_CHECK(!context_->initial_metadata_received_);
314 ops.RecvInitialMetadata(context_);
334 ops.ClientSendClose();
336 if (context_->initial_metadata_corked_) {
337 ops.SendInitialMetadata(&context_->send_initial_metadata_,
338 context_->initial_metadata_flags());
339 context_->set_initial_metadata_corked(
false);
341 if (!ops.SendMessagePtr(&msg, options).ok()) {
346 return cq_.Pluck(&ops);
351 ops.ClientSendClose();
353 return cq_.Pluck(&ops);
364 if (!context_->initial_metadata_received_) {
365 finish_ops_.RecvInitialMetadata(context_);
367 finish_ops_.ClientRecvStatus(context_, &status);
369 ABSL_CHECK(cq_.Pluck(&finish_ops_));
389 call_(channel->CreateCall(method, context, &cq_)) {
390 finish_ops_.RecvMessage(response);
391 finish_ops_.AllowNoMessage();
393 if (!context_->initial_metadata_corked_) {
395 ops.SendInitialMetadata(&context->send_initial_metadata_,
396 context->initial_metadata_flags());
414 template <
class W,
class R>
435 template <
class W,
class R>
450 template <
class W,
class R>
451 class ClientReaderWriter final :
public ClientReaderWriterInterface<W, R> {
460 ABSL_CHECK(!context_->initial_metadata_received_);
463 ops.RecvInitialMetadata(context_);
470 *sz = (result > 0) ? result : UINT32_MAX;
482 if (!context_->initial_metadata_received_) {
483 ops.RecvInitialMetadata(context_);
485 ops.RecvMessage(msg);
487 return cq_.Pluck(&ops) && ops.got_message;
504 ops.ClientSendClose();
506 if (context_->initial_metadata_corked_) {
507 ops.SendInitialMetadata(&context_->send_initial_metadata_,
508 context_->initial_metadata_flags());
509 context_->set_initial_metadata_corked(
false);
511 if (!ops.SendMessagePtr(&msg, options).ok()) {
516 return cq_.Pluck(&ops);
521 ops.ClientSendClose();
523 return cq_.Pluck(&ops);
535 if (!context_->initial_metadata_received_) {
536 ops.RecvInitialMetadata(context_);
539 ops.ClientRecvStatus(context_, &status);
541 ABSL_CHECK(cq_.Pluck(&ops));
562 call_(channel->CreateCall(method, context, &cq_)) {
563 if (!context_->initial_metadata_corked_) {
565 ops.SendInitialMetadata(&context->send_initial_metadata_,
566 context->initial_metadata_flags());
588 ABSL_CHECK(!ctx_->sent_initial_metadata_);
591 ops.SendInitialMetadata(&ctx_->initial_metadata_,
592 ctx_->initial_metadata_flags());
596 ctx_->sent_initial_metadata_ =
true;
598 call_->
cq()->Pluck(&ops);
603 *sz = (result > 0) ? result : UINT32_MAX;
609 ops.RecvMessage(msg);
611 bool ok = call_->
cq()->Pluck(&ops) && ops.got_message;
613 ctx_->MaybeMarkCancelledOnRead();
622 template <
class ServiceType,
class RequestType,
class ResponseType>
626 : call_(call), ctx_(ctx) {}
645 ABSL_CHECK(!ctx_->sent_initial_metadata_);
648 ops.SendInitialMetadata(&ctx_->initial_metadata_,
649 ctx_->initial_metadata_flags());
653 ctx_->sent_initial_metadata_ =
true;
655 call_->
cq()->Pluck(&ops);
669 if (!ctx_->pending_ops_.SendMessagePtr(&msg, options).ok()) {
672 if (!ctx_->sent_initial_metadata_) {
673 ctx_->pending_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
674 ctx_->initial_metadata_flags());
678 ctx_->sent_initial_metadata_ =
true;
685 ctx_->has_pending_ops_ =
true;
688 ctx_->has_pending_ops_ =
false;
689 return call_->
cq()->Pluck(&ctx_->pending_ops_);
696 template <
class ServiceType,
class RequestType,
class ResponseType>
700 : call_(call), ctx_(ctx) {}
704 template <
class W,
class R>
711 template <
class W,
class R>
712 class ServerReaderWriterBody final {
715 : call_(call), ctx_(ctx) {}
718 ABSL_CHECK(!ctx_->sent_initial_metadata_);
721 ops.SendInitialMetadata(&ctx_->initial_metadata_,
722 ctx_->initial_metadata_flags());
726 ctx_->sent_initial_metadata_ =
true;
728 call_->
cq()->Pluck(&ops);
733 *sz = (result > 0) ? result : UINT32_MAX;
739 ops.RecvMessage(msg);
741 bool ok = call_->
cq()->Pluck(&ops) && ops.got_message;
743 ctx_->MaybeMarkCancelledOnRead();
752 if (!ctx_->pending_ops_.SendMessagePtr(&msg, options).ok()) {
755 if (!ctx_->sent_initial_metadata_) {
756 ctx_->pending_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
757 ctx_->initial_metadata_flags());
761 ctx_->sent_initial_metadata_ =
true;
768 ctx_->has_pending_ops_ =
true;
771 ctx_->has_pending_ops_ =
false;
772 return call_->
cq()->Pluck(&ctx_->pending_ops_);
786 template <
class W,
class R>
795 return body_.NextMessageSize(sz);
798 bool Read(R* msg)
override {
return body_.Read(msg); }
807 return body_.Write(msg, options);
816 : body_(call, ctx) {}
827 template <
class RequestType,
class ResponseType>
852 bool Read(RequestType* request)
override {
857 return body_.
Read(request);
868 bool Write(
const ResponseType& response,
870 if (write_done_ || !read_done_) {
874 return body_.
Write(response, options);
885 : body_(call, ctx), read_done_(false), write_done_(false) {}
893 template <
class RequestType,
class ResponseType>
918 bool Read(RequestType* request)
override {
923 return body_.
Read(request);
934 bool Write(
const ResponseType& response,
936 return read_done_ && body_.
Write(response, options);
946 : body_(call, ctx), read_done_(false) {}
951 #endif // GRPCPP_SUPPORT_SYNC_STREAM_H
bool Write(const ResponseType &response, grpc::WriteOptions options) override
Block to write msg to the stream with WriteOptions options.
Definition: sync_stream.h:868
grpc::CompletionQueue * cq() const
Definition: call.h:71
ServerReaderWriterBody(grpc::internal::Call *call, grpc::ServerContext *ctx)
Definition: sync_stream.h:714
Server-side interface for streaming writes of message of type W.
Definition: sync_stream.h:631
void SendInitialMetadata()
Definition: sync_stream.h:717
Definition: call_op_set.h:621
Definition: call_op_set.h:528
bool WritesDone() override
Definition: sync_stream.h:519
Synchronous (blocking) client-side API for doing client-streaming RPCs, where the outgoing message st...
Definition: client_context.h:83
A ServerContext or CallbackServerContext allows the code implementing a service handler to:
Definition: server_context.h:578
void WaitForInitialMetadata() override
See the ClientStreamingInterface.WaitForInitialMetadata method for semantics.
Definition: sync_stream.h:188
int max_receive_message_size() const
Definition: call.h:73
An Alarm posts the user-provided tag to its associated completion queue or invokes the user-provided ...
Definition: alarm.h:33
static ClientWriter< W > * Create(grpc::ChannelInterface *channel, const grpc::internal::RpcMethod &method, grpc::ClientContext *context, R *response)
Definition: sync_stream.h:290
Definition: call_op_set.h:288
bool Read(R *msg) override
See the ReaderInterface.Read method for semantics.
Definition: sync_stream.h:478
WriteOptions & set_last_message()
last-message bit: indicates this is the last message in a stream client-side: makes Write the equival...
Definition: call_op_set.h:157
bool Write(const W &msg)
Block to write msg to the stream with default write options.
Definition: sync_stream.h:126
Synchronous (blocking) server-side API for doing for doing a server-streaming RPCs,...
Definition: completion_queue.h:63
bool Write(const ResponseType &response, grpc::WriteOptions options) override
Block to write msg to the stream with WriteOptions options.
Definition: sync_stream.h:934
Definition: sync_stream.h:163
Straightforward wrapping of the C call object.
Definition: call.h:36
virtual bool NextMessageSize(uint32_t *sz)=0
Get an upper bound on the next message size available for reading on this stream.
grpc::Status Finish() override
See the ClientStreamingInterface.Finish method for semantics.
Definition: sync_stream.h:225
Client-side interface for bi-directional streaming with client-to-server stream messages of type W an...
Definition: sync_stream.h:415
bool Write(const W &msg, grpc::WriteOptions options) override
Definition: sync_stream.h:326
grpc::Status Finish() override
See the ClientStreamingInterface.Finish method for semantics.
Definition: sync_stream.h:362
bool NextMessageSize(uint32_t *sz) override
Get an upper bound on the request message size from the client.
Definition: sync_stream.h:904
A class to represent a flow-controlled server-side streaming call.
Definition: sync_stream.h:894
Definition: completion_queue.h:66
virtual ~ServerStreamingInterface()
Definition: sync_stream.h:71
bool NextMessageSize(uint32_t *sz) override
Definition: sync_stream.h:468
bool NextMessageSize(uint32_t *sz) override
Definition: sync_stream.h:601
bool Read(RequestType *request) override
Read a message of type R into msg.
Definition: sync_stream.h:852
Did it work? If it didn't, why?
Definition: status.h:34
@ GRPC_CQ_DEFAULT_POLLING
The completion queue will have an associated pollset and there is no restriction on the type of file ...
Definition: grpc_types.h:414
virtual void WaitForInitialMetadata()=0
Block to wait for initial metadata from server.
void set_compression_level(grpc_compression_level level)
Set level to be the compression level used for the server call.
Definition: server_context.h:243
Synchronous (blocking) client-side API for doing server-streaming RPCs, where the stream of messages ...
Definition: client_context.h:81
Definition: sync_stream.h:436
grpc_compression_level compression_level() const
Return the compression algorithm to be used by the server call.
Definition: server_context.h:236
An interface that can be fed a sequence of messages of type W.
Definition: sync_stream.h:107
Synchronous (blocking) server-side API for doing client-streaming RPCs, where the incoming message st...
Definition: completion_queue.h:61
A ClientContext allows the person implementing a service client to:
Definition: client_context.h:195
bool Write(const W &msg, grpc::WriteOptions options) override
Block to write msg to the stream with WriteOptions options.
Definition: sync_stream.h:806
bool Write(const W &msg, grpc::WriteOptions options) override
Definition: sync_stream.h:664
Server-side interface for bi-directional streaming.
Definition: sync_stream.h:705
virtual bool Read(R *msg)=0
Block to read a message and parse to msg.
Client-side interface for streaming writes of message type W.
Definition: sync_stream.h:273
bool Read(R *msg) override
See the ReaderInterface.Read method for semantics.
Definition: sync_stream.h:208
Codegen interface for grpc::Channel.
Definition: channel_interface.h:71
bool NextMessageSize(uint32_t *sz) override
Definition: sync_stream.h:197
bool NextMessageSize(uint32_t *sz) override
Get an upper bound on the request message size from the client.
Definition: sync_stream.h:838
bool NextMessageSize(uint32_t *sz) override
Get an upper bound on the next message size available for reading on this stream.
Definition: sync_stream.h:794
bool Write(const W &msg, grpc::WriteOptions options)
Definition: sync_stream.h:748
void WaitForInitialMetadata()
See the ClientStreamingInterface.WaitForInitialMetadata method for semantics.
Definition: sync_stream.h:310
@ GRPC_CQ_PLUCK
Events are popped out by calling grpc_completion_queue_pluck() API ONLY.
Definition: grpc_types.h:434
Definition: grpc_types.h:462
Common interface for all synchronous client side streaming.
Definition: sync_stream.h:37
bool NextMessageSize(uint32_t *sz)
Definition: sync_stream.h:731
void SendInitialMetadata() override
Block to send initial metadata to client.
Definition: sync_stream.h:901
bool Read(R *msg) override
Block to read a message and parse to msg.
Definition: sync_stream.h:798
virtual void SendInitialMetadata()=0
Block to send initial metadata to client.
Per-message write options.
Definition: call_op_set.h:80
Synchronous (blocking) server-side API for a bidirectional streaming call, where the incoming message...
Definition: sync_stream.h:787
A wrapper class of an application provided server streaming handler.
Definition: completion_queue.h:78
virtual ~WriterInterface()
Definition: sync_stream.h:109
bool Write(const W &msg, grpc::WriteOptions options) override
Definition: sync_stream.h:496
static ClientReader< R > * Create(grpc::ChannelInterface *channel, const grpc::internal::RpcMethod &method, grpc::ClientContext *context, const W &request)
Definition: sync_stream.h:166
grpc::Status Finish() override
See the ClientStreamingInterface.Finish method for semantics.
Definition: sync_stream.h:531
A wrapper class of an application provided client streaming handler.
Definition: completion_queue.h:76
Client-side interface for streaming reads of message of type R.
Definition: sync_stream.h:151
virtual bool Write(const W &msg, grpc::WriteOptions options)=0
Block to write msg to the stream with WriteOptions options.
A wrapper class of an application provided bidi-streaming handler.
Definition: completion_queue.h:80
#define GRPC_CQ_CURRENT_VERSION
Definition: grpc_types.h:460
virtual void WaitForInitialMetadata()=0
Block to wait for initial metadata from server.
void SendInitialMetadata() override
See the ServerStreamingInterface.SendInitialMetadata method for semantics.
Definition: sync_stream.h:587
void PerformOps(CallOpSetInterface *ops)
Definition: call.h:66
bool Read(R *msg)
Definition: sync_stream.h:737
Synchronous (blocking) client-side API for bi-directional streaming RPCs, where the outgoing message ...
Definition: client_context.h:85
virtual ~ReaderInterface()
Definition: sync_stream.h:86
bool compression_level_set() const
Return a bool indicating whether the compression level for this call has been set (either implicitly ...
Definition: server_context.h:251
Server-side interface for streaming reads of message of type R.
Definition: sync_stream.h:575
void WaitForInitialMetadata() override
Block waiting to read initial metadata from the server.
Definition: sync_stream.h:459
static ClientReaderWriter< W, R > * Create(grpc::ChannelInterface *channel, const grpc::internal::RpcMethod &method, grpc::ClientContext *context)
Definition: sync_stream.h:438
Definition: call_op_set.h:771
WriteOptions & set_buffer_hint()
Sets flag indicating that the write may be buffered and need not go out on the wire immediately.
Definition: call_op_set.h:118
A thin wrapper around grpc_completion_queue (see src/core/lib/surface/completion_queue....
Definition: completion_queue.h:105
virtual grpc::Status Finish()=0
Block waiting until the stream finishes and a final status of the call is available.
void SendInitialMetadata() override
See the ServerStreamingInterface.SendInitialMetadata method for semantics.
Definition: sync_stream.h:792
An interface that yields a sequence of messages of type R.
Definition: sync_stream.h:84
void SendInitialMetadata() override
See the ServerStreamingInterface.SendInitialMetadata method for semantics.
Definition: sync_stream.h:644
A class to represent a flow-controlled unary call.
Definition: sync_stream.h:828
void SendInitialMetadata() override
Block to send initial metadata to client.
Definition: sync_stream.h:835
bool is_last_message() const
Get value for the flag indicating that this is the last message, and should be coalesced with trailin...
Definition: call_op_set.h:173
Definition: call_op_set.h:426
Descriptor of an RPC method.
Definition: rpc_method.h:29
Definition: sync_stream.h:287
void WriteLast(const W &msg, grpc::WriteOptions options)
Write msg and coalesce it with the writing of trailing metadata, using WriteOptions options.
Definition: sync_stream.h:142
virtual bool WritesDone()=0
Half close writing from the client.
bool Read(RequestType *request) override
Read a message of type R into msg.
Definition: sync_stream.h:918
virtual bool WritesDone()=0
Half close writing from the client.
virtual ~ClientStreamingInterface()
Definition: sync_stream.h:39
bool WritesDone() override
Definition: sync_stream.h:349
void RecvMessage(R *message)
Definition: call_op_set.h:428
Common interface for all synchronous server side streaming.
Definition: sync_stream.h:69
bool Read(R *msg) override
Definition: sync_stream.h:607