Go to the documentation of this file.
19 #ifndef GRPCPP_SUPPORT_SYNC_STREAM_H
20 #define GRPCPP_SUPPORT_SYNC_STREAM_H
30 #include "absl/log/absl_check.h"
101 virtual bool Read(R* msg) = 0;
178 class ClientReader final :
public ClientReaderInterface<R> {
188 ABSL_CHECK(!context_->initial_metadata_received_);
191 ops.RecvInitialMetadata(context_);
198 *sz = (result > 0) ? result : UINT32_MAX;
211 if (!context_->initial_metadata_received_) {
212 ops.RecvInitialMetadata(context_);
216 return cq_.Pluck(&ops) && ops.got_message;
228 if (!context_->initial_metadata_received_) {
229 ops.RecvInitialMetadata(context_);
232 ops.ClientRecvStatus(context_, &status);
234 ABSL_CHECK(cq_.Pluck(&ops));
255 call_(channel->CreateCall(method, context, &cq_)) {
260 ops.SendInitialMetadata(&context->send_initial_metadata_,
261 context->initial_metadata_flags());
263 ABSL_CHECK(ops.SendMessagePtr(&request).ok());
264 ops.ClientSendClose();
301 class ClientWriter :
public ClientWriterInterface<W> {
310 ABSL_CHECK(!context_->initial_metadata_received_);
313 ops.RecvInitialMetadata(context_);
333 ops.ClientSendClose();
335 if (context_->initial_metadata_corked_) {
336 ops.SendInitialMetadata(&context_->send_initial_metadata_,
337 context_->initial_metadata_flags());
338 context_->set_initial_metadata_corked(
false);
340 if (!ops.SendMessagePtr(&msg, options).ok()) {
345 return cq_.Pluck(&ops);
350 ops.ClientSendClose();
352 return cq_.Pluck(&ops);
363 if (!context_->initial_metadata_received_) {
364 finish_ops_.RecvInitialMetadata(context_);
366 finish_ops_.ClientRecvStatus(context_, &status);
368 ABSL_CHECK(cq_.Pluck(&finish_ops_));
388 call_(channel->CreateCall(method, context, &cq_)) {
389 finish_ops_.RecvMessage(response);
390 finish_ops_.AllowNoMessage();
392 if (!context_->initial_metadata_corked_) {
394 ops.SendInitialMetadata(&context->send_initial_metadata_,
395 context->initial_metadata_flags());
413 template <
class W,
class R>
434 template <
class W,
class R>
449 template <
class W,
class R>
450 class ClientReaderWriter final :
public ClientReaderWriterInterface<W, R> {
459 ABSL_CHECK(!context_->initial_metadata_received_);
462 ops.RecvInitialMetadata(context_);
469 *sz = (result > 0) ? result : UINT32_MAX;
481 if (!context_->initial_metadata_received_) {
482 ops.RecvInitialMetadata(context_);
484 ops.RecvMessage(msg);
486 return cq_.Pluck(&ops) && ops.got_message;
503 ops.ClientSendClose();
505 if (context_->initial_metadata_corked_) {
506 ops.SendInitialMetadata(&context_->send_initial_metadata_,
507 context_->initial_metadata_flags());
508 context_->set_initial_metadata_corked(
false);
510 if (!ops.SendMessagePtr(&msg, options).ok()) {
515 return cq_.Pluck(&ops);
520 ops.ClientSendClose();
522 return cq_.Pluck(&ops);
534 if (!context_->initial_metadata_received_) {
535 ops.RecvInitialMetadata(context_);
538 ops.ClientRecvStatus(context_, &status);
540 ABSL_CHECK(cq_.Pluck(&ops));
561 call_(channel->CreateCall(method, context, &cq_)) {
562 if (!context_->initial_metadata_corked_) {
564 ops.SendInitialMetadata(&context->send_initial_metadata_,
565 context->initial_metadata_flags());
587 ABSL_CHECK(!ctx_->sent_initial_metadata_);
590 ops.SendInitialMetadata(&ctx_->initial_metadata_,
591 ctx_->initial_metadata_flags());
595 ctx_->sent_initial_metadata_ =
true;
597 call_->
cq()->Pluck(&ops);
602 *sz = (result > 0) ? result : UINT32_MAX;
608 ops.RecvMessage(msg);
610 bool ok = call_->
cq()->Pluck(&ops) && ops.got_message;
612 ctx_->MaybeMarkCancelledOnRead();
621 template <
class ServiceType,
class RequestType,
class ResponseType>
625 : call_(call), ctx_(ctx) {}
644 ABSL_CHECK(!ctx_->sent_initial_metadata_);
647 ops.SendInitialMetadata(&ctx_->initial_metadata_,
648 ctx_->initial_metadata_flags());
652 ctx_->sent_initial_metadata_ =
true;
654 call_->
cq()->Pluck(&ops);
668 if (!ctx_->pending_ops_.SendMessagePtr(&msg, options).ok()) {
671 if (!ctx_->sent_initial_metadata_) {
672 ctx_->pending_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
673 ctx_->initial_metadata_flags());
677 ctx_->sent_initial_metadata_ =
true;
684 ctx_->has_pending_ops_ =
true;
687 ctx_->has_pending_ops_ =
false;
688 return call_->
cq()->Pluck(&ctx_->pending_ops_);
695 template <
class ServiceType,
class RequestType,
class ResponseType>
699 : call_(call), ctx_(ctx) {}
703 template <
class W,
class R>
710 template <
class W,
class R>
711 class ServerReaderWriterBody final {
714 : call_(call), ctx_(ctx) {}
717 ABSL_CHECK(!ctx_->sent_initial_metadata_);
720 ops.SendInitialMetadata(&ctx_->initial_metadata_,
721 ctx_->initial_metadata_flags());
725 ctx_->sent_initial_metadata_ =
true;
727 call_->
cq()->Pluck(&ops);
732 *sz = (result > 0) ? result : UINT32_MAX;
738 ops.RecvMessage(msg);
740 bool ok = call_->
cq()->Pluck(&ops) && ops.got_message;
742 ctx_->MaybeMarkCancelledOnRead();
751 if (!ctx_->pending_ops_.SendMessagePtr(&msg, options).ok()) {
754 if (!ctx_->sent_initial_metadata_) {
755 ctx_->pending_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
756 ctx_->initial_metadata_flags());
760 ctx_->sent_initial_metadata_ =
true;
767 ctx_->has_pending_ops_ =
true;
770 ctx_->has_pending_ops_ =
false;
771 return call_->
cq()->Pluck(&ctx_->pending_ops_);
785 template <
class W,
class R>
794 return body_.NextMessageSize(sz);
797 bool Read(R* msg)
override {
return body_.Read(msg); }
806 return body_.Write(msg, options);
815 : body_(call, ctx) {}
826 template <
class RequestType,
class ResponseType>
851 bool Read(RequestType* request)
override {
856 return body_.
Read(request);
867 bool Write(
const ResponseType& response,
869 if (write_done_ || !read_done_) {
873 return body_.
Write(response, options);
884 : body_(call, ctx), read_done_(false), write_done_(false) {}
892 template <
class RequestType,
class ResponseType>
917 bool Read(RequestType* request)
override {
922 return body_.
Read(request);
933 bool Write(
const ResponseType& response,
935 return read_done_ && body_.
Write(response, options);
945 : body_(call, ctx), read_done_(false) {}
950 #endif // GRPCPP_SUPPORT_SYNC_STREAM_H
bool Write(const ResponseType &response, grpc::WriteOptions options) override
Block to write response to the stream with WriteOptions options.
Definition: sync_stream.h:867
grpc::CompletionQueue * cq() const
Definition: call.h:71
ServerReaderWriterBody(grpc::internal::Call *call, grpc::ServerContext *ctx)
Definition: sync_stream.h:713
Server-side interface for streaming writes of message of type W.
Definition: sync_stream.h:630
void SendInitialMetadata()
Definition: sync_stream.h:716
Definition: call_op_set.h:621
Definition: call_op_set.h:528
bool WritesDone() override
Definition: sync_stream.h:518
Synchronous (blocking) client-side API for doing client-streaming RPCs, where the outgoing message st...
Definition: client_context.h:82
A ServerContext or CallbackServerContext allows the code implementing a service handler to:
Definition: server_context.h:578
void WaitForInitialMetadata() override
See the ClientStreamingInterface.WaitForInitialMetadata method for semantics.
Definition: sync_stream.h:187
int max_receive_message_size() const
Definition: call.h:73
An Alarm posts the user-provided tag to its associated completion queue or invokes the user-provided ...
Definition: alarm.h:33
static ClientWriter< W > * Create(grpc::ChannelInterface *channel, const grpc::internal::RpcMethod &method, grpc::ClientContext *context, R *response)
Definition: sync_stream.h:289
Definition: call_op_set.h:288
bool Read(R *msg) override
See the ReaderInterface.Read method for semantics.
Definition: sync_stream.h:477
WriteOptions & set_last_message()
last-message bit: indicates this is the last message in a stream client-side: makes Write the equival...
Definition: call_op_set.h:157
bool Write(const W &msg)
Block to write msg to the stream with default write options.
Definition: sync_stream.h:125
Synchronous (blocking) server-side API for doing server-streaming RPCs,...
Definition: completion_queue.h:62
bool Write(const ResponseType &response, grpc::WriteOptions options) override
Block to write response to the stream with WriteOptions options.
Definition: sync_stream.h:933
Definition: sync_stream.h:162
Straightforward wrapping of the C call object.
Definition: call.h:36
virtual bool NextMessageSize(uint32_t *sz)=0
Get an upper bound on the next message size available for reading on this stream.
grpc::Status Finish() override
See the ClientStreamingInterface.Finish method for semantics.
Definition: sync_stream.h:224
Client-side interface for bi-directional streaming with client-to-server stream messages of type W an...
Definition: sync_stream.h:414
bool Write(const W &msg, grpc::WriteOptions options) override
Definition: sync_stream.h:325
grpc::Status Finish() override
See the ClientStreamingInterface.Finish method for semantics.
Definition: sync_stream.h:361
bool NextMessageSize(uint32_t *sz) override
Get an upper bound on the request message size from the client.
Definition: sync_stream.h:903
A class to represent a flow-controlled server-side streaming call.
Definition: sync_stream.h:893
Definition: completion_queue.h:65
virtual ~ServerStreamingInterface()
Definition: sync_stream.h:70
bool NextMessageSize(uint32_t *sz) override
Definition: sync_stream.h:467
bool NextMessageSize(uint32_t *sz) override
Definition: sync_stream.h:600
bool Read(RequestType *request) override
Read a message of type RequestType into request.
Definition: sync_stream.h:851
Did it work? If it didn't, why?
Definition: status.h:34
@ GRPC_CQ_DEFAULT_POLLING
The completion queue will have an associated pollset and there is no restriction on the type of file ...
Definition: grpc_types.h:413
virtual void WaitForInitialMetadata()=0
Block to wait for initial metadata from server.
void set_compression_level(grpc_compression_level level)
Set level to be the compression level used for the server call.
Definition: server_context.h:243
Synchronous (blocking) client-side API for doing server-streaming RPCs, where the stream of messages ...
Definition: client_context.h:80
Definition: sync_stream.h:435
grpc_compression_level compression_level() const
Return the compression level to be used by the server call.
Definition: server_context.h:236
An interface that can be fed a sequence of messages of type W.
Definition: sync_stream.h:106
Synchronous (blocking) server-side API for doing client-streaming RPCs, where the incoming message st...
Definition: completion_queue.h:60
A ClientContext allows the person implementing a service client to:
Definition: client_context.h:194
bool Write(const W &msg, grpc::WriteOptions options) override
Block to write msg to the stream with WriteOptions options.
Definition: sync_stream.h:805
bool Write(const W &msg, grpc::WriteOptions options) override
Definition: sync_stream.h:663
Server-side interface for bi-directional streaming.
Definition: sync_stream.h:704
virtual bool Read(R *msg)=0
Block to read a message and parse to msg.
Client-side interface for streaming writes of message type W.
Definition: sync_stream.h:272
bool Read(R *msg) override
See the ReaderInterface.Read method for semantics.
Definition: sync_stream.h:207
Codegen interface for grpc::Channel.
Definition: channel_interface.h:71
bool NextMessageSize(uint32_t *sz) override
Definition: sync_stream.h:196
bool NextMessageSize(uint32_t *sz) override
Get an upper bound on the request message size from the client.
Definition: sync_stream.h:837
bool NextMessageSize(uint32_t *sz) override
Get an upper bound on the next message size available for reading on this stream.
Definition: sync_stream.h:793
bool Write(const W &msg, grpc::WriteOptions options)
Definition: sync_stream.h:747
void WaitForInitialMetadata()
See the ClientStreamingInterface.WaitForInitialMetadata method for semantics.
Definition: sync_stream.h:309
@ GRPC_CQ_PLUCK
Events are popped out by calling grpc_completion_queue_pluck() API ONLY.
Definition: grpc_types.h:433
Definition: grpc_types.h:461
Common interface for all synchronous client side streaming.
Definition: sync_stream.h:36
bool NextMessageSize(uint32_t *sz)
Definition: sync_stream.h:730
void SendInitialMetadata() override
Block to send initial metadata to client.
Definition: sync_stream.h:900
bool Read(R *msg) override
Block to read a message and parse to msg.
Definition: sync_stream.h:797
virtual void SendInitialMetadata()=0
Block to send initial metadata to client.
Per-message write options.
Definition: call_op_set.h:80
Synchronous (blocking) server-side API for a bidirectional streaming call, where the incoming message...
Definition: sync_stream.h:786
A wrapper class of an application provided server streaming handler.
Definition: completion_queue.h:77
virtual ~WriterInterface()
Definition: sync_stream.h:108
bool Write(const W &msg, grpc::WriteOptions options) override
Definition: sync_stream.h:495
static ClientReader< R > * Create(grpc::ChannelInterface *channel, const grpc::internal::RpcMethod &method, grpc::ClientContext *context, const W &request)
Definition: sync_stream.h:165
grpc::Status Finish() override
See the ClientStreamingInterface.Finish method for semantics.
Definition: sync_stream.h:530
A wrapper class of an application provided client streaming handler.
Definition: completion_queue.h:75
Client-side interface for streaming reads of message of type R.
Definition: sync_stream.h:150
virtual bool Write(const W &msg, grpc::WriteOptions options)=0
Block to write msg to the stream with WriteOptions options.
A wrapper class of an application provided bidi-streaming handler.
Definition: completion_queue.h:79
#define GRPC_CQ_CURRENT_VERSION
Definition: grpc_types.h:459
virtual void WaitForInitialMetadata()=0
Block to wait for initial metadata from server.
void SendInitialMetadata() override
See the ServerStreamingInterface.SendInitialMetadata method for semantics.
Definition: sync_stream.h:586
void PerformOps(CallOpSetInterface *ops)
Definition: call.h:66
bool Read(R *msg)
Definition: sync_stream.h:736
Synchronous (blocking) client-side API for bi-directional streaming RPCs, where the outgoing message ...
Definition: client_context.h:84
virtual ~ReaderInterface()
Definition: sync_stream.h:85
bool compression_level_set() const
Return a bool indicating whether the compression level for this call has been set (either implicitly ...
Definition: server_context.h:251
Server-side interface for streaming reads of message of type R.
Definition: sync_stream.h:574
void WaitForInitialMetadata() override
Block waiting to read initial metadata from the server.
Definition: sync_stream.h:458
static ClientReaderWriter< W, R > * Create(grpc::ChannelInterface *channel, const grpc::internal::RpcMethod &method, grpc::ClientContext *context)
Definition: sync_stream.h:437
Definition: call_op_set.h:771
WriteOptions & set_buffer_hint()
Sets flag indicating that the write may be buffered and need not go out on the wire immediately.
Definition: call_op_set.h:118
A thin wrapper around grpc_completion_queue (see src/core/lib/surface/completion_queue....
Definition: completion_queue.h:104
virtual grpc::Status Finish()=0
Block waiting until the stream finishes and a final status of the call is available.
void SendInitialMetadata() override
See the ServerStreamingInterface.SendInitialMetadata method for semantics.
Definition: sync_stream.h:791
An interface that yields a sequence of messages of type R.
Definition: sync_stream.h:83
void SendInitialMetadata() override
See the ServerStreamingInterface.SendInitialMetadata method for semantics.
Definition: sync_stream.h:643
A class to represent a flow-controlled unary call.
Definition: sync_stream.h:827
void SendInitialMetadata() override
Block to send initial metadata to client.
Definition: sync_stream.h:834
bool is_last_message() const
Get value for the flag indicating that this is the last message, and should be coalesced with trailin...
Definition: call_op_set.h:173
Definition: call_op_set.h:426
Descriptor of an RPC method.
Definition: rpc_method.h:29
Definition: sync_stream.h:286
void WriteLast(const W &msg, grpc::WriteOptions options)
Write msg and coalesce it with the writing of trailing metadata, using WriteOptions options.
Definition: sync_stream.h:141
virtual bool WritesDone()=0
Half close writing from the client.
bool Read(RequestType *request) override
Read a message of type RequestType into request.
Definition: sync_stream.h:917
virtual bool WritesDone()=0
Half close writing from the client.
virtual ~ClientStreamingInterface()
Definition: sync_stream.h:38
bool WritesDone() override
Definition: sync_stream.h:348
void RecvMessage(R *message)
Definition: call_op_set.h:428
Common interface for all synchronous server side streaming.
Definition: sync_stream.h:68
bool Read(R *msg) override
Definition: sync_stream.h:606