GRPC C++  1.62.0
proto_buffer_reader.h
Go to the documentation of this file.
1 //
2 //
3 // Copyright 2018 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #ifndef GRPCPP_SUPPORT_PROTO_BUFFER_READER_H
20 #define GRPCPP_SUPPORT_PROTO_BUFFER_READER_H
21 
22 #include <type_traits>
23 
24 #include "absl/strings/cord.h"
25 
26 #include <grpc/byte_buffer.h>
28 #include <grpc/impl/grpc_types.h>
29 #include <grpc/slice.h>
30 #include <grpc/support/log.h>
34 #include <grpcpp/support/status.h>
35 
38 
39 namespace grpc {
40 
48  public:
51  explicit ProtoBufferReader(ByteBuffer* buffer)
52  : byte_count_(0), backup_count_(0), status_() {
55  if (!buffer->Valid() ||
56  !grpc_byte_buffer_reader_init(&reader_, buffer->c_buffer())) {
57  status_ = Status(StatusCode::INTERNAL,
58  "Couldn't initialize byte buffer reader");
59  }
60  }
61 
62  ~ProtoBufferReader() override {
63  if (status_.ok()) {
65  }
66  }
67 
70  bool Next(const void** data, int* size) override {
71  if (!status_.ok()) {
72  return false;
73  }
75  if (backup_count_ > 0) {
76  *data = GRPC_SLICE_START_PTR(*slice_) + GRPC_SLICE_LENGTH(*slice_) -
77  backup_count_;
78  GPR_ASSERT(backup_count_ <= INT_MAX);
79  *size = static_cast<int>(backup_count_);
80  backup_count_ = 0;
81  return true;
82  }
84  if (!grpc_byte_buffer_reader_peek(&reader_, &slice_)) {
85  return false;
86  }
87  *data = GRPC_SLICE_START_PTR(*slice_);
88  // On win x64, int is only 32bit
89  GPR_ASSERT(GRPC_SLICE_LENGTH(*slice_) <= INT_MAX);
90  byte_count_ += * size = static_cast<int>(GRPC_SLICE_LENGTH(*slice_));
91  return true;
92  }
93 
95  Status status() const { return status_; }
96 
100  void BackUp(int count) override {
101  GPR_ASSERT(count <= static_cast<int>(GRPC_SLICE_LENGTH(*slice_)));
102  backup_count_ = count;
103  }
104 
107  bool Skip(int count) override {
108  const void* data;
109  int size;
110  while (Next(&data, &size)) {
111  if (size >= count) {
112  BackUp(size - count);
113  return true;
114  }
115  // size < count;
116  count -= size;
117  }
118  // error or we have too large count;
119  return false;
120  }
121 
123  int64_t ByteCount() const override { return byte_count_ - backup_count_; }
124 
#ifdef GRPC_PROTOBUF_CORD_SUPPORT_ENABLED
/// Reads the next \a count bytes and appends them to \a cord.
///
/// Backed-up bytes of the current slice are consumed first; after that whole
/// slices are peeked from the byte buffer and appended until \a count bytes
/// have been delivered. If the final slice is longer than what is still
/// needed, only its head is appended and the rest is recorded as backed up.
///
/// \param cord the Cord to append to.
/// \param count number of bytes to read.
/// \return false if the reader's status is not OK or the byte buffer runs out
/// of slices before \a count bytes were read; true otherwise.
// (override is conditionally omitted here to support old Protobuf which
// doesn't have ReadCord method)
// NOLINTBEGIN(modernize-use-override,
// clang-diagnostic-inconsistent-missing-override)
virtual bool ReadCord(absl::Cord* cord, int count)
#if GOOGLE_PROTOBUF_VERSION >= 4022000
    override
#endif
// NOLINTEND(modernize-use-override,
// clang-diagnostic-inconsistent-missing-override)
{
  if (!status().ok()) {
    return false;
  }
  // check for backed up data
  if (backup_count() > 0) {
    if (backup_count() <= count) {
      // The entire backed-up tail is wanted: split it off the current slice.
      cord->Append(MakeCordFromSlice(grpc_slice_split_tail(
          slice(), GRPC_SLICE_LENGTH(*slice()) - backup_count())));
    } else {
      // Only the first `count` bytes of the backed-up tail are wanted.
      cord->Append(MakeCordFromSlice(grpc_slice_sub(
          *slice(), GRPC_SLICE_LENGTH(*slice()) - backup_count(),
          GRPC_SLICE_LENGTH(*slice()) - backup_count() + count)));
    }
    int64_t take = (std::min)(backup_count(), static_cast<int64_t>(count));
    set_backup_count(backup_count() - take);
    // This cast is safe as the size of a serialized protobuf message
    // should be smaller than 2GiB.
    // (https://protobuf.dev/programming-guides/encoding/#size-limit)
    count -= static_cast<int>(take);
    if (count == 0) {
      return true;
    }
  }
  while (count > 0) {
    // Fetch the next slice; fail if the byte buffer has no more data.
    if (!grpc_byte_buffer_reader_peek(reader(), mutable_slice_ptr())) {
      return false;
    }
    uint64_t slice_length = GRPC_SLICE_LENGTH(*slice());
    set_byte_count(ByteCount() + slice_length);
    if (slice_length <= static_cast<uint64_t>(count)) {
      // The whole slice is consumed; the Cord takes its own reference.
      cord->Append(MakeCordFromSlice(grpc_slice_ref(*slice())));
      // This cast is safe as above.
      count -= static_cast<int>(slice_length);
    } else {
      // Only the head of the slice is needed; the rest stays backed up.
      cord->Append(MakeCordFromSlice(grpc_slice_split_head(slice(), count)));
      set_backup_count(slice_length - count);
      return true;
    }
  }
  GPR_ASSERT(count == 0);
  return true;
}
#endif  // GRPC_PROTOBUF_CORD_SUPPORT_ENABLED
181 
182  // These protected members are needed to support internal optimizations.
183  // They expose internal bits of grpc core that are NOT stable. If you have
184  // a use case that needs one of these functions, please send an email to
185  // https://groups.google.com/forum/#!forum/grpc-io.
186  protected:
187  void set_byte_count(int64_t byte_count) { byte_count_ = byte_count; }
188  int64_t backup_count() { return backup_count_; }
189  void set_backup_count(int64_t backup_count) { backup_count_ = backup_count; }
190  grpc_byte_buffer_reader* reader() { return &reader_; }
191  grpc_slice* slice() { return slice_; }
192  grpc_slice** mutable_slice_ptr() { return &slice_; }
193 
194  private:
195 #ifdef GRPC_PROTOBUF_CORD_SUPPORT_ENABLED
196  // This function takes ownership of slice and return a newly created Cord off
197  // of it.
198  static absl::Cord MakeCordFromSlice(grpc_slice slice) {
199  // slice_for_cord is created to keep inlined data of the given slice
200  grpc_slice* slice_for_cord = new grpc_slice;
201  *slice_for_cord = slice;
202  return absl::MakeCordFromExternal(
203  absl::string_view(
204  reinterpret_cast<char*>(GRPC_SLICE_START_PTR(*slice_for_cord)),
205  GRPC_SLICE_LENGTH(*slice_for_cord)),
206  [slice_for_cord](absl::string_view /* view */) {
207  grpc_slice_unref(*slice_for_cord);
208  delete slice_for_cord;
209  });
210  }
211 #endif // GRPC_PROTOBUF_CORD_SUPPORT_ENABLED
212 
213  int64_t byte_count_;  // Sum of the lengths of the slices fetched so far.
214  int64_t backup_count_;  // Trailing bytes of *slice_ to be handed out again.
215  grpc_byte_buffer_reader reader_;  // Reader set up over the ByteBuffer by the constructor.
216  grpc_slice* slice_;  // Slice most recently obtained via grpc_byte_buffer_reader_peek().
218  Status status_;  // OK unless the constructor failed to initialize reader_.
219 };
220 
221 } // namespace grpc
222 
223 #endif // GRPCPP_SUPPORT_PROTO_BUFFER_READER_H
GRPC_SLICE_START_PTR
#define GRPC_SLICE_START_PTR(slice)
Definition: slice_type.h:99
grpc
An Alarm posts the user-provided tag to its associated completion queue or invokes the user-provided ...
Definition: alarm.h:33
grpc::ProtoBufferReader::~ProtoBufferReader
~ProtoBufferReader() override
Definition: proto_buffer_reader.h:62
grpc::ProtoBufferReader::ProtoBufferReader
ProtoBufferReader(ByteBuffer *buffer)
Constructs buffer reader from buffer.
Definition: proto_buffer_reader.h:51
GPR_ASSERT
#define GPR_ASSERT(x)
abort() the process if x is zero, having written a line to the log.
Definition: log.h:95
grpc::protobuf::io::ZeroCopyInputStream
::google::protobuf::io::ZeroCopyInputStream ZeroCopyInputStream
Definition: config_protobuf.h:105
status.h
grpc::ProtoBufferReader::backup_count
int64_t backup_count()
Definition: proto_buffer_reader.h:188
grpc::Status::ok
bool ok() const
Is the status OK?
Definition: status.h:126
grpc::ProtoBufferReader::BackUp
void BackUp(int count) override
The proto library calls this to indicate that we should back up count bytes that have already been re...
Definition: proto_buffer_reader.h:100
grpc_slice
struct grpc_slice grpc_slice
Definition: slice_type.h:28
byte_buffer_reader.h
grpc_types.h
grpc_slice_ref
GPRAPI grpc_slice grpc_slice_ref(grpc_slice s)
Increment the refcount of s.
grpc::Status
Did it work? If it didn't, why?
Definition: status.h:35
grpc::ProtoBufferReader::reader
grpc_byte_buffer_reader * reader()
Definition: proto_buffer_reader.h:190
log.h
grpc::ProtoBufferReader::ByteCount
int64_t ByteCount() const override
Returns the total number of bytes read since this object was created.
Definition: proto_buffer_reader.h:123
grpc::ProtoBufferReader::Skip
bool Skip(int count) override
The proto library calls this to skip over count bytes.
Definition: proto_buffer_reader.h:107
grpc::ByteBuffer
A sequence of bytes.
Definition: byte_buffer.h:60
grpc::ProtoBufferReader
This is a specialization of the protobuf class ZeroCopyInputStream The principle is to get one chunk ...
Definition: proto_buffer_reader.h:47
grpc_slice
A grpc_slice s, if initialized, represents the byte range s.bytes[0..s.length-1].
Definition: slice_type.h:63
grpc::ProtoBufferReader::set_byte_count
void set_byte_count(int64_t byte_count)
Definition: proto_buffer_reader.h:187
grpc_byte_buffer_reader_destroy
GRPCAPI void grpc_byte_buffer_reader_destroy(grpc_byte_buffer_reader *reader)
Cleanup and destroy reader.
grpc::ProtoBufferReader::mutable_slice_ptr
grpc_slice ** mutable_slice_ptr()
Definition: proto_buffer_reader.h:192
grpc_slice_sub
GPRAPI grpc_slice grpc_slice_sub(grpc_slice s, size_t begin, size_t end)
Return a result slice derived from s, which shares a ref count with s, where result....
grpc_slice_split_head
GPRAPI grpc_slice grpc_slice_split_head(grpc_slice *s, size_t split)
Splits s into two: modifies s to be s[split:s.length], and returns a new slice, sharing a refcount wi...
byte_buffer.h
grpc::ProtoBufferReader::status
Status status() const
Returns the status of the buffer reader.
Definition: proto_buffer_reader.h:95
serialization_traits.h
config_protobuf.h
slice.h
grpc_slice_split_tail
GPRAPI grpc_slice grpc_slice_split_tail(grpc_slice *s, size_t split)
Splits s into two: modifies s to be s[0:split], and returns a new slice, sharing a refcount with s,...
byte_buffer.h
grpc_byte_buffer_reader_peek
GRPCAPI int grpc_byte_buffer_reader_peek(grpc_byte_buffer_reader *reader, grpc_slice **slice)
EXPERIMENTAL API - This function may be removed and changed, in the future.
grpc_byte_buffer_reader_init
GRPCAPI int grpc_byte_buffer_reader_init(grpc_byte_buffer_reader *reader, grpc_byte_buffer *buffer)
Initialize reader to read over buffer.
grpc_slice_unref
GPRAPI void grpc_slice_unref(grpc_slice s)
Decrement the ref count of s.
grpc::ByteBuffer::Valid
bool Valid() const
Is this ByteBuffer valid?
Definition: byte_buffer.h:159
GRPC_SLICE_LENGTH
#define GRPC_SLICE_LENGTH(slice)
Definition: slice_type.h:102
grpc::protobuf::util::Status
::absl::Status Status
Definition: config_protobuf.h:97
grpc::ProtoBufferReader::slice
grpc_slice * slice()
Definition: proto_buffer_reader.h:191
grpc_byte_buffer_reader
Definition: byte_buffer_reader.h:30
grpc::ProtoBufferReader::Next
bool Next(const void **data, int *size) override
Give the proto library a chunk of data from the stream.
Definition: proto_buffer_reader.h:70
grpc::ProtoBufferReader::set_backup_count
void set_backup_count(int64_t backup_count)
Definition: proto_buffer_reader.h:189
grpc::INTERNAL
@ INTERNAL
Internal errors.
Definition: status_code_enum.h:121