Go to the documentation of this file.
19 #ifndef GRPCPP_SUPPORT_SYNC_STREAM_H
20 #define GRPCPP_SUPPORT_SYNC_STREAM_H
30 #include "absl/log/absl_check.h"
101 virtual bool Read(R* msg) = 0;
178 class ClientReader final :
public ClientReaderInterface<R> {
188 ABSL_CHECK(!context_->initial_metadata_received_);
191 ops.RecvInitialMetadata(context_);
198 *sz = (result > 0) ? result : UINT32_MAX;
211 if (!context_->initial_metadata_received_) {
212 ops.RecvInitialMetadata(context_);
216 return cq_.Pluck(&ops) && ops.got_message;
228 if (!context_->initial_metadata_received_) {
229 ops.RecvInitialMetadata(context_);
232 ops.ClientRecvStatus(context_, &status);
234 ABSL_CHECK(cq_.Pluck(&ops));
255 call_(channel->CreateCall(method, context, &cq_)) {
260 ops.SendInitialMetadata(&context->send_initial_metadata_,
261 context->initial_metadata_flags());
263 ABSL_CHECK(ops.SendMessagePtr(&request, channel->memory_allocator()).ok());
264 ops.ClientSendClose();
301 class ClientWriter :
public ClientWriterInterface<W> {
310 ABSL_CHECK(!context_->initial_metadata_received_);
313 ops.RecvInitialMetadata(context_);
333 ops.ClientSendClose();
335 if (context_->initial_metadata_corked_) {
336 ops.SendInitialMetadata(&context_->send_initial_metadata_,
337 context_->initial_metadata_flags());
338 context_->set_initial_metadata_corked(
false);
340 if (!ops.SendMessagePtr(&msg, options, channel_->
memory_allocator()).ok()) {
345 return cq_.Pluck(&ops);
350 ops.ClientSendClose();
352 return cq_.Pluck(&ops);
363 if (!context_->initial_metadata_received_) {
364 finish_ops_.RecvInitialMetadata(context_);
366 finish_ops_.ClientRecvStatus(context_, &status);
368 ABSL_CHECK(cq_.Pluck(&finish_ops_));
389 call_(channel->CreateCall(method, context, &cq_)) {
390 finish_ops_.RecvMessage(response);
391 finish_ops_.AllowNoMessage();
393 if (!context_->initial_metadata_corked_) {
395 ops.SendInitialMetadata(&context->send_initial_metadata_,
396 context->initial_metadata_flags());
415 template <
class W,
class R>
436 template <
class W,
class R>
451 template <
class W,
class R>
452 class ClientReaderWriter final :
public ClientReaderWriterInterface<W, R> {
461 ABSL_CHECK(!context_->initial_metadata_received_);
464 ops.RecvInitialMetadata(context_);
471 *sz = (result > 0) ? result : UINT32_MAX;
483 if (!context_->initial_metadata_received_) {
484 ops.RecvInitialMetadata(context_);
486 ops.RecvMessage(msg);
488 return cq_.Pluck(&ops) && ops.got_message;
505 ops.ClientSendClose();
507 if (context_->initial_metadata_corked_) {
508 ops.SendInitialMetadata(&context_->send_initial_metadata_,
509 context_->initial_metadata_flags());
510 context_->set_initial_metadata_corked(
false);
512 if (!ops.SendMessagePtr(&msg, options, channel_->
memory_allocator()).ok()) {
517 return cq_.Pluck(&ops);
522 ops.ClientSendClose();
524 return cq_.Pluck(&ops);
536 if (!context_->initial_metadata_received_) {
537 ops.RecvInitialMetadata(context_);
540 ops.ClientRecvStatus(context_, &status);
542 ABSL_CHECK(cq_.Pluck(&ops));
565 call_(channel->CreateCall(method, context, &cq_)) {
566 if (!context_->initial_metadata_corked_) {
568 ops.SendInitialMetadata(&context->send_initial_metadata_,
569 context->initial_metadata_flags());
591 ABSL_CHECK(!ctx_->sent_initial_metadata_);
594 ops.SendInitialMetadata(&ctx_->initial_metadata_,
595 ctx_->initial_metadata_flags());
599 ctx_->sent_initial_metadata_ =
true;
601 call_->
cq()->Pluck(&ops);
606 *sz = (result > 0) ? result : UINT32_MAX;
612 ops.RecvMessage(msg);
614 bool ok = call_->
cq()->Pluck(&ops) && ops.got_message;
616 ctx_->MaybeMarkCancelledOnRead();
625 template <
class ServiceType,
class RequestType,
class ResponseType>
629 : call_(call), ctx_(ctx) {}
648 ABSL_CHECK(!ctx_->sent_initial_metadata_);
651 ops.SendInitialMetadata(&ctx_->initial_metadata_,
652 ctx_->initial_metadata_flags());
656 ctx_->sent_initial_metadata_ =
true;
658 call_->
cq()->Pluck(&ops);
672 if (!ctx_->pending_ops_
677 if (!ctx_->sent_initial_metadata_) {
678 ctx_->pending_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
679 ctx_->initial_metadata_flags());
683 ctx_->sent_initial_metadata_ =
true;
690 ctx_->has_pending_ops_ =
true;
693 ctx_->has_pending_ops_ =
false;
694 return call_->
cq()->Pluck(&ctx_->pending_ops_);
701 template <
class ServiceType,
class RequestType,
class ResponseType>
705 : call_(call), ctx_(ctx) {}
709 template <
class W,
class R>
716 template <
class W,
class R>
717 class ServerReaderWriterBody final {
720 : call_(call), ctx_(ctx) {}
723 ABSL_CHECK(!ctx_->sent_initial_metadata_);
726 ops.SendInitialMetadata(&ctx_->initial_metadata_,
727 ctx_->initial_metadata_flags());
731 ctx_->sent_initial_metadata_ =
true;
733 call_->
cq()->Pluck(&ops);
738 *sz = (result > 0) ? result : UINT32_MAX;
744 ops.RecvMessage(msg);
746 bool ok = call_->
cq()->Pluck(&ops) && ops.got_message;
748 ctx_->MaybeMarkCancelledOnRead();
757 if (!ctx_->pending_ops_
762 if (!ctx_->sent_initial_metadata_) {
763 ctx_->pending_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
764 ctx_->initial_metadata_flags());
768 ctx_->sent_initial_metadata_ =
true;
775 ctx_->has_pending_ops_ =
true;
778 ctx_->has_pending_ops_ =
false;
779 return call_->
cq()->Pluck(&ctx_->pending_ops_);
793 template <
class W,
class R>
802 return body_.NextMessageSize(sz);
805 bool Read(R* msg)
override {
return body_.Read(msg); }
814 return body_.Write(msg, options);
823 : body_(call, ctx) {}
834 template <
class RequestType,
class ResponseType>
859 bool Read(RequestType* request)
override {
864 return body_.
Read(request);
875 bool Write(
const ResponseType& response,
877 if (write_done_ || !read_done_) {
881 return body_.
Write(response, options);
892 : body_(call, ctx), read_done_(false), write_done_(false) {}
900 template <
class RequestType,
class ResponseType>
925 bool Read(RequestType* request)
override {
930 return body_.
Read(request);
941 bool Write(
const ResponseType& response,
943 return read_done_ && body_.
Write(response, options);
953 : body_(call, ctx), read_done_(false) {}
958 #endif // GRPCPP_SUPPORT_SYNC_STREAM_H
bool Write(const ResponseType &response, grpc::WriteOptions options) override
Block to write msg to the stream with WriteOptions options.
Definition: sync_stream.h:875
grpc::CompletionQueue * cq() const
Definition: call.h:71
ServerReaderWriterBody(grpc::internal::Call *call, grpc::ServerContext *ctx)
Definition: sync_stream.h:719
Server-side interface for streaming writes of message of type W.
Definition: sync_stream.h:634
void SendInitialMetadata()
Definition: sync_stream.h:722
Definition: call_op_set.h:623
Definition: call_op_set.h:530
bool WritesDone() override
Definition: sync_stream.h:520
Synchronous (blocking) client-side API for doing client-streaming RPCs, where the outgoing message st...
Definition: client_context.h:82
A ServerContext or CallbackServerContext allows the code implementing a service handler to:
Definition: server_context.h:586
void WaitForInitialMetadata() override
See the ClientStreamingInterface.WaitForInitialMetadata method for semantics.
Definition: sync_stream.h:187
int max_receive_message_size() const
Definition: call.h:73
An Alarm posts the user-provided tag to its associated completion queue or invokes the user-provided ...
Definition: alarm.h:33
static ClientWriter< W > * Create(grpc::ChannelInterface *channel, const grpc::internal::RpcMethod &method, grpc::ClientContext *context, R *response)
Definition: sync_stream.h:289
Definition: call_op_set.h:289
bool Read(R *msg) override
See the ReaderInterface.Read method for semantics.
Definition: sync_stream.h:479
WriteOptions & set_last_message()
last-message bit: indicates this is the last message in a stream client-side: makes Write the equival...
Definition: call_op_set.h:158
bool Write(const W &msg)
Block to write msg to the stream with default write options.
Definition: sync_stream.h:125
Synchronous (blocking) server-side API for doing for doing a server-streaming RPCs,...
Definition: completion_queue.h:62
bool Write(const ResponseType &response, grpc::WriteOptions options) override
Block to write msg to the stream with WriteOptions options.
Definition: sync_stream.h:941
Definition: sync_stream.h:162
Straightforward wrapping of the C call object.
Definition: call.h:36
virtual bool NextMessageSize(uint32_t *sz)=0
Get an upper bound on the next message size available for reading on this stream.
grpc::Status Finish() override
See the ClientStreamingInterface.Finish method for semantics.
Definition: sync_stream.h:224
Client-side interface for bi-directional streaming with client-to-server stream messages of type W an...
Definition: sync_stream.h:416
grpc_event_engine::experimental::MemoryAllocator * memory_allocator()
Definition: server_context.h:300
bool Write(const W &msg, grpc::WriteOptions options) override
Definition: sync_stream.h:325
grpc::Status Finish() override
See the ClientStreamingInterface.Finish method for semantics.
Definition: sync_stream.h:361
bool NextMessageSize(uint32_t *sz) override
Get an upper bound on the request message size from the client.
Definition: sync_stream.h:911
A class to represent a flow-controlled server-side streaming call.
Definition: sync_stream.h:901
Definition: completion_queue.h:65
virtual ~ServerStreamingInterface()
Definition: sync_stream.h:70
bool NextMessageSize(uint32_t *sz) override
Definition: sync_stream.h:469
bool NextMessageSize(uint32_t *sz) override
Definition: sync_stream.h:604
bool Read(RequestType *request) override
Read a message of type R into msg.
Definition: sync_stream.h:859
Did it work? If it didn't, why?
Definition: status.h:34
@ GRPC_CQ_DEFAULT_POLLING
The completion queue will have an associated pollset and there is no restriction on the type of file ...
Definition: grpc_types.h:413
virtual void WaitForInitialMetadata()=0
Block to wait for initial metadata from server.
void set_compression_level(grpc_compression_level level)
Set level to be the compression level used for the server call.
Definition: server_context.h:243
Synchronous (blocking) client-side API for doing server-streaming RPCs, where the stream of messages ...
Definition: client_context.h:80
Definition: sync_stream.h:437
grpc_compression_level compression_level() const
Return the compression algorithm to be used by the server call.
Definition: server_context.h:236
An interface that can be fed a sequence of messages of type W.
Definition: sync_stream.h:106
Synchronous (blocking) server-side API for doing client-streaming RPCs, where the incoming message st...
Definition: completion_queue.h:60
A ClientContext allows the person implementing a service client to:
Definition: client_context.h:194
bool Write(const W &msg, grpc::WriteOptions options) override
Block to write msg to the stream with WriteOptions options.
Definition: sync_stream.h:813
bool Write(const W &msg, grpc::WriteOptions options) override
Definition: sync_stream.h:667
Server-side interface for bi-directional streaming.
Definition: sync_stream.h:710
virtual bool Read(R *msg)=0
Block to read a message and parse to msg.
Client-side interface for streaming writes of message type W.
Definition: sync_stream.h:272
bool Read(R *msg) override
See the ReaderInterface.Read method for semantics.
Definition: sync_stream.h:207
Codegen interface for grpc::Channel.
Definition: channel_interface.h:72
bool NextMessageSize(uint32_t *sz) override
Definition: sync_stream.h:196
bool NextMessageSize(uint32_t *sz) override
Get an upper bound on the request message size from the client.
Definition: sync_stream.h:845
bool NextMessageSize(uint32_t *sz) override
Get an upper bound on the next message size available for reading on this stream.
Definition: sync_stream.h:801
bool Write(const W &msg, grpc::WriteOptions options)
Definition: sync_stream.h:753
void WaitForInitialMetadata()
See the ClientStreamingInterface.WaitForInitialMetadata method for semantics.
Definition: sync_stream.h:309
@ GRPC_CQ_PLUCK
Events are popped out by calling grpc_completion_queue_pluck() API ONLY.
Definition: grpc_types.h:433
Definition: grpc_types.h:461
Common interface for all synchronous client side streaming.
Definition: sync_stream.h:36
bool NextMessageSize(uint32_t *sz)
Definition: sync_stream.h:736
void SendInitialMetadata() override
Block to send initial metadata to client.
Definition: sync_stream.h:908
bool Read(R *msg) override
Block to read a message and parse to msg.
Definition: sync_stream.h:805
virtual void SendInitialMetadata()=0
Block to send initial metadata to client.
Per-message write options.
Definition: call_op_set.h:81
Synchronous (blocking) server-side API for a bidirectional streaming call, where the incoming message...
Definition: sync_stream.h:794
A wrapper class of an application provided server streaming handler.
Definition: completion_queue.h:77
virtual ~WriterInterface()
Definition: sync_stream.h:108
bool Write(const W &msg, grpc::WriteOptions options) override
Definition: sync_stream.h:497
static ClientReader< R > * Create(grpc::ChannelInterface *channel, const grpc::internal::RpcMethod &method, grpc::ClientContext *context, const W &request)
Definition: sync_stream.h:165
grpc::Status Finish() override
See the ClientStreamingInterface.Finish method for semantics.
Definition: sync_stream.h:532
A wrapper class of an application provided client streaming handler.
Definition: completion_queue.h:75
Client-side interface for streaming reads of message of type R.
Definition: sync_stream.h:150
virtual bool Write(const W &msg, grpc::WriteOptions options)=0
Block to write msg to the stream with WriteOptions options.
A wrapper class of an application provided bidi-streaming handler.
Definition: completion_queue.h:79
#define GRPC_CQ_CURRENT_VERSION
Definition: grpc_types.h:459
virtual void WaitForInitialMetadata()=0
Block to wait for initial metadata from server.
void SendInitialMetadata() override
See the ServerStreamingInterface.SendInitialMetadata method for semantics.
Definition: sync_stream.h:590
void PerformOps(CallOpSetInterface *ops)
Definition: call.h:66
bool Read(R *msg)
Definition: sync_stream.h:742
Synchronous (blocking) client-side API for bi-directional streaming RPCs, where the outgoing message ...
Definition: client_context.h:84
virtual ~ReaderInterface()
Definition: sync_stream.h:85
bool compression_level_set() const
Return a bool indicating whether the compression level for this call has been set (either implicitly ...
Definition: server_context.h:251
Server-side interface for streaming reads of message of type R.
Definition: sync_stream.h:578
void WaitForInitialMetadata() override
Block waiting to read initial metadata from the server.
Definition: sync_stream.h:460
static ClientReaderWriter< W, R > * Create(grpc::ChannelInterface *channel, const grpc::internal::RpcMethod &method, grpc::ClientContext *context)
Definition: sync_stream.h:439
Definition: call_op_set.h:773
WriteOptions & set_buffer_hint()
Sets flag indicating that the write may be buffered and need not go out on the wire immediately.
Definition: call_op_set.h:119
A thin wrapper around grpc_completion_queue (see src/core/lib/surface/completion_queue....
Definition: completion_queue.h:104
virtual grpc::Status Finish()=0
Block waiting until the stream finishes and a final status of the call is available.
void SendInitialMetadata() override
See the ServerStreamingInterface.SendInitialMetadata method for semantics.
Definition: sync_stream.h:799
An interface that yields a sequence of messages of type R.
Definition: sync_stream.h:83
void SendInitialMetadata() override
See the ServerStreamingInterface.SendInitialMetadata method for semantics.
Definition: sync_stream.h:647
A class to represent a flow-controlled unary call.
Definition: sync_stream.h:835
void SendInitialMetadata() override
Block to send initial metadata to client.
Definition: sync_stream.h:842
bool is_last_message() const
Get value for the flag indicating that this is the last message, and should be coalesced with trailin...
Definition: call_op_set.h:174
Definition: call_op_set.h:429
Descriptor of an RPC method.
Definition: rpc_method.h:29
Definition: sync_stream.h:286
void WriteLast(const W &msg, grpc::WriteOptions options)
Write msg and coalesce it with the writing of trailing metadata, using WriteOptions options.
Definition: sync_stream.h:141
virtual bool WritesDone()=0
Half close writing from the client.
bool Read(RequestType *request) override
Read a message of type R into msg.
Definition: sync_stream.h:925
virtual bool WritesDone()=0
Half close writing from the client.
virtual ~ClientStreamingInterface()
Definition: sync_stream.h:38
bool WritesDone() override
Definition: sync_stream.h:348
virtual grpc_event_engine::experimental::MemoryAllocator * memory_allocator() const
Definition: channel_interface.h:106
void RecvMessage(R *message)
Definition: call_op_set.h:431
Common interface for all synchronous server side streaming.
Definition: sync_stream.h:68
bool Read(R *msg) override
Definition: sync_stream.h:610