GRPC C++  1.66.0
proto_buffer_reader.h
Go to the documentation of this file.
1 //
2 //
3 // Copyright 2018 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #ifndef GRPCPP_SUPPORT_PROTO_BUFFER_READER_H
20 #define GRPCPP_SUPPORT_PROTO_BUFFER_READER_H
21 
22 #include <type_traits>
23 
24 #include "absl/log/absl_check.h"
25 #include "absl/strings/cord.h"
26 
27 #include <grpc/byte_buffer.h>
29 #include <grpc/impl/grpc_types.h>
30 #include <grpc/slice.h>
31 #include <grpc/support/log.h>
35 #include <grpcpp/support/status.h>
36 
39 
40 namespace grpc {
41 
49  public:
52  explicit ProtoBufferReader(ByteBuffer* buffer)
53  : byte_count_(0), backup_count_(0), status_() {
56  if (!buffer->Valid() ||
57  !grpc_byte_buffer_reader_init(&reader_, buffer->c_buffer())) {
58  status_ = Status(StatusCode::INTERNAL,
59  "Couldn't initialize byte buffer reader");
60  }
61  }
62 
63  ~ProtoBufferReader() override {
64  if (status_.ok()) {
66  }
67  }
68 
71  bool Next(const void** data, int* size) override {
72  if (!status_.ok()) {
73  return false;
74  }
76  if (backup_count_ > 0) {
77  *data = GRPC_SLICE_START_PTR(*slice_) + GRPC_SLICE_LENGTH(*slice_) -
78  backup_count_;
79  ABSL_CHECK_LE(backup_count_, INT_MAX);
80  *size = static_cast<int>(backup_count_);
81  backup_count_ = 0;
82  return true;
83  }
85  if (!grpc_byte_buffer_reader_peek(&reader_, &slice_)) {
86  return false;
87  }
88  *data = GRPC_SLICE_START_PTR(*slice_);
89  // On win x64, int is only 32bit
90  ABSL_CHECK_LE(GRPC_SLICE_LENGTH(*slice_), static_cast<size_t>(INT_MAX));
91  byte_count_ += * size = static_cast<int>(GRPC_SLICE_LENGTH(*slice_));
92  return true;
93  }
94 
96  Status status() const { return status_; }
97 
101  void BackUp(int count) override {
102  ABSL_CHECK_LE(count, static_cast<int>(GRPC_SLICE_LENGTH(*slice_)));
103  backup_count_ = count;
104  }
105 
108  bool Skip(int count) override {
109  const void* data;
110  int size;
111  while (Next(&data, &size)) {
112  if (size >= count) {
113  BackUp(size - count);
114  return true;
115  }
116  // size < count;
117  count -= size;
118  }
119  // error or we have too large count;
120  return false;
121  }
122 
124  int64_t ByteCount() const override { return byte_count_ - backup_count_; }
125 
126 #ifdef GRPC_PROTOBUF_CORD_SUPPORT_ENABLED
127  // (override is conditionally omitted here to support old Protobuf which
129  // doesn't have ReadCord method)
130  // NOLINTBEGIN(modernize-use-override,
131  // clang-diagnostic-inconsistent-missing-override)
132  virtual bool ReadCord(absl::Cord* cord, int count)
133 #if GOOGLE_PROTOBUF_VERSION >= 4022000
134  override
135 #endif
136  // NOLINTEND(modernize-use-override,
137  // clang-diagnostic-inconsistent-missing-override)
138  {
139  if (!status().ok()) {
140  return false;
141  }
142  // check for backed up data
143  if (backup_count() > 0) {
144  if (backup_count() <= count) {
145  cord->Append(MakeCordFromSlice(grpc_slice_split_tail(
147  } else {
148  cord->Append(MakeCordFromSlice(grpc_slice_sub(
150  GRPC_SLICE_LENGTH(*slice()) - backup_count() + count)));
151  }
152  int64_t take = (std::min)(backup_count(), static_cast<int64_t>(count));
153  set_backup_count(backup_count() - take);
154  // This cast is safe as the size of a serialized protobuf message
155  // should be smaller than 2GiB.
156  // (https://protobuf.dev/programming-guides/encoding/#size-limit)
157  count -= static_cast<int>(take);
158  if (count == 0) {
159  return true;
160  }
161  }
162  while (count > 0) {
164  return false;
165  }
166  uint64_t slice_length = GRPC_SLICE_LENGTH(*slice());
167  set_byte_count(ByteCount() + slice_length);
168  if (slice_length <= static_cast<uint64_t>(count)) {
169  cord->Append(MakeCordFromSlice(grpc_slice_ref(*slice())));
170  // This cast is safe as above.
171  count -= static_cast<int>(slice_length);
172  } else {
173  cord->Append(MakeCordFromSlice(grpc_slice_split_head(slice(), count)));
174  set_backup_count(slice_length - count);
175  return true;
176  }
177  }
178  ABSL_CHECK_EQ(count, 0);
179  return true;
180  }
181 #endif // GRPC_PROTOBUF_CORD_SUPPORT_ENABLED
182 
183  // These protected members are needed to support internal optimizations.
184  // they expose internal bits of grpc core that are NOT stable. If you have
185  // a use case needs to use one of these functions, please send an email to
186  // https://groups.google.com/forum/#!forum/grpc-io.
187  protected:
188  void set_byte_count(int64_t byte_count) { byte_count_ = byte_count; }
189  int64_t backup_count() { return backup_count_; }
190  void set_backup_count(int64_t backup_count) { backup_count_ = backup_count; }
191  grpc_byte_buffer_reader* reader() { return &reader_; }
192  grpc_slice* slice() { return slice_; }
193  grpc_slice** mutable_slice_ptr() { return &slice_; }
194 
195  private:
196 #ifdef GRPC_PROTOBUF_CORD_SUPPORT_ENABLED
197  // This function takes ownership of slice and return a newly created Cord off
198  // of it.
199  static absl::Cord MakeCordFromSlice(grpc_slice slice) {
200  // slice_for_cord is created to keep inlined data of the given slice
201  grpc_slice* slice_for_cord = new grpc_slice;
202  *slice_for_cord = slice;
203  return absl::MakeCordFromExternal(
204  absl::string_view(
205  reinterpret_cast<char*>(GRPC_SLICE_START_PTR(*slice_for_cord)),
206  GRPC_SLICE_LENGTH(*slice_for_cord)),
207  [slice_for_cord](absl::string_view /* view */) {
208  grpc_slice_unref(*slice_for_cord);
209  delete slice_for_cord;
210  });
211  }
212 #endif // GRPC_PROTOBUF_CORD_SUPPORT_ENABLED
213 
214  int64_t byte_count_;
215  int64_t backup_count_;
216  grpc_byte_buffer_reader reader_;
217  grpc_slice* slice_;
219  Status status_;
220 };
221 
222 } // namespace grpc
223 
224 #endif // GRPCPP_SUPPORT_PROTO_BUFFER_READER_H
GRPC_SLICE_START_PTR
#define GRPC_SLICE_START_PTR(slice)
Definition: slice_type.h:99
grpc
An Alarm posts the user-provided tag to its associated completion queue or invokes the user-provided ...
Definition: alarm.h:33
grpc::ProtoBufferReader::~ProtoBufferReader
~ProtoBufferReader() override
Definition: proto_buffer_reader.h:63
grpc::ProtoBufferReader::ProtoBufferReader
ProtoBufferReader(ByteBuffer *buffer)
Constructs buffer reader from buffer.
Definition: proto_buffer_reader.h:52
grpc::protobuf::io::ZeroCopyInputStream
::google::protobuf::io::ZeroCopyInputStream ZeroCopyInputStream
Definition: config_protobuf.h:114
status.h
grpc::ProtoBufferReader::backup_count
int64_t backup_count()
Definition: proto_buffer_reader.h:189
grpc::Status::ok
bool ok() const
Is the status OK?
Definition: status.h:125
grpc::ProtoBufferReader::BackUp
void BackUp(int count) override
The proto library calls this to indicate that we should back up count bytes that have already been re...
Definition: proto_buffer_reader.h:101
grpc_slice
struct grpc_slice grpc_slice
Definition: slice_type.h:28
byte_buffer_reader.h
grpc_types.h
grpc_slice_ref
GPRAPI grpc_slice grpc_slice_ref(grpc_slice s)
Increment the refcount of s.
grpc::Status
Did it work? If it didn't, why?
Definition: status.h:34
grpc::ProtoBufferReader::reader
grpc_byte_buffer_reader * reader()
Definition: proto_buffer_reader.h:191
log.h
grpc::ProtoBufferReader::ByteCount
int64_t ByteCount() const override
Returns the total number of bytes read since this object was created.
Definition: proto_buffer_reader.h:124
grpc::ProtoBufferReader::Skip
bool Skip(int count) override
The proto library calls this to skip over count bytes.
Definition: proto_buffer_reader.h:108
grpc::ByteBuffer
A sequence of bytes.
Definition: byte_buffer.h:60
grpc::ProtoBufferReader
This is a specialization of the protobuf class ZeroCopyInputStream The principle is to get one chunk ...
Definition: proto_buffer_reader.h:48
grpc_slice
A grpc_slice s, if initialized, represents the byte range s.bytes[0..s.length-1].
Definition: slice_type.h:63
grpc::ProtoBufferReader::set_byte_count
void set_byte_count(int64_t byte_count)
Definition: proto_buffer_reader.h:188
grpc_byte_buffer_reader_destroy
GRPCAPI void grpc_byte_buffer_reader_destroy(grpc_byte_buffer_reader *reader)
Cleanup and destroy reader.
grpc::ProtoBufferReader::mutable_slice_ptr
grpc_slice ** mutable_slice_ptr()
Definition: proto_buffer_reader.h:193
grpc_slice_sub
GPRAPI grpc_slice grpc_slice_sub(grpc_slice s, size_t begin, size_t end)
Return a result slice derived from s, which shares a ref count with s, where result....
grpc_slice_split_head
GPRAPI grpc_slice grpc_slice_split_head(grpc_slice *s, size_t split)
Splits s into two: modifies s to be s[split:s.length], and returns a new slice, sharing a refcount wi...
byte_buffer.h
grpc::ProtoBufferReader::status
Status status() const
Returns the status of the buffer reader.
Definition: proto_buffer_reader.h:96
serialization_traits.h
config_protobuf.h
slice.h
grpc_slice_split_tail
GPRAPI grpc_slice grpc_slice_split_tail(grpc_slice *s, size_t split)
Splits s into two: modifies s to be s[0:split], and returns a new slice, sharing a refcount with s,...
byte_buffer.h
grpc_byte_buffer_reader_peek
GRPCAPI int grpc_byte_buffer_reader_peek(grpc_byte_buffer_reader *reader, grpc_slice **slice)
EXPERIMENTAL API - This function may be removed and changed, in the future.
grpc_byte_buffer_reader_init
GRPCAPI int grpc_byte_buffer_reader_init(grpc_byte_buffer_reader *reader, grpc_byte_buffer *buffer)
Initialize reader to read over buffer.
grpc_slice_unref
GPRAPI void grpc_slice_unref(grpc_slice s)
Decrement the ref count of s.
grpc::ByteBuffer::Valid
bool Valid() const
Is this ByteBuffer valid?
Definition: byte_buffer.h:159
GRPC_SLICE_LENGTH
#define GRPC_SLICE_LENGTH(slice)
Definition: slice_type.h:102
grpc::protobuf::util::Status
::absl::Status Status
Definition: config_protobuf.h:106
grpc::ProtoBufferReader::slice
grpc_slice * slice()
Definition: proto_buffer_reader.h:192
grpc_byte_buffer_reader
Definition: byte_buffer_reader.h:30
grpc::ProtoBufferReader::Next
bool Next(const void **data, int *size) override
Give the proto library a chunk of data from the stream.
Definition: proto_buffer_reader.h:71
grpc::ProtoBufferReader::set_backup_count
void set_backup_count(int64_t backup_count)
Definition: proto_buffer_reader.h:190
grpc::INTERNAL
@ INTERNAL
Internal errors.
Definition: status_code_enum.h:121