GRPC C++  1.66.0
proto_buffer_writer.h
Go to the documentation of this file.
1 //
2 //
3 // Copyright 2018 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #ifndef GRPCPP_SUPPORT_PROTO_BUFFER_WRITER_H
20 #define GRPCPP_SUPPORT_PROTO_BUFFER_WRITER_H
21 
22 #include <type_traits>
23 
24 #include "absl/log/absl_check.h"
25 #include "absl/strings/cord.h"
26 
27 #include <grpc/byte_buffer.h>
28 #include <grpc/impl/grpc_types.h>
29 #include <grpc/slice.h>
30 #include <grpc/slice_buffer.h>
31 #include <grpc/support/log.h>
35 #include <grpcpp/support/status.h>
36 
39 
40 namespace grpc {
41 
42 // Forward declaration for testing use only
43 namespace internal {
44 class ProtoBufferWriterPeer;
45 } // namespace internal
46 
47 const int kProtoBufferWriterMaxBufferLength = 1024 * 1024;
48 
57  public:
63  ProtoBufferWriter(ByteBuffer* byte_buffer, int block_size, int total_size)
64  : block_size_(block_size),
65  total_size_(total_size),
66  byte_count_(0),
67  have_backup_(false) {
68  ABSL_CHECK(!byte_buffer->Valid());
71  byte_buffer->set_buffer(bp);
72  slice_buffer_ = &bp->data.raw.slice_buffer;
73  }
74 
// Destructor: releases the reference held on the leftover backup slice, if
// one is currently being kept (have_backup_ is set by BackUp() and cleared by
// Next()).
75  ~ProtoBufferWriter() override {
76  if (have_backup_) {
77  grpc_slice_unref(backup_slice_);
78  }
79  }
80 
83  bool Next(void** data, int* size) override {
84  // Protobuf should not ask for more memory than total_size_.
85  ABSL_CHECK_LT(byte_count_, total_size_);
86  // 1. Use the remaining backup slice if we have one
87  // 2. Otherwise allocate a slice, up to the remaining length needed
88  // or our maximum allocation size
89  // 3. Provide the slice start and size available
90  // 4. Add the slice being returned to the slice buffer
91  size_t remain = static_cast<size_t>(total_size_ - byte_count_);
92  if (have_backup_) {
94  slice_ = backup_slice_;
95  have_backup_ = false;
96  if (GRPC_SLICE_LENGTH(slice_) > remain) {
97  GRPC_SLICE_SET_LENGTH(slice_, remain);
98  }
99  } else {
100  // When less than a whole block is needed, only allocate that much.
101  // But make sure the allocated slice is not inlined.
102  size_t allocate_length =
103  remain > static_cast<size_t>(block_size_) ? block_size_ : remain;
104  slice_ = grpc_slice_malloc(allocate_length > GRPC_SLICE_INLINED_SIZE
105  ? allocate_length
107  }
108  *data = GRPC_SLICE_START_PTR(slice_);
109  // On win x64, int is only 32bit
110  ABSL_CHECK(GRPC_SLICE_LENGTH(slice_) <= static_cast<size_t>(INT_MAX));
111  byte_count_ += * size = static_cast<int>(GRPC_SLICE_LENGTH(slice_));
112  // Using grpc_slice_buffer_add could modify slice_ and merge it with the
113  // previous slice. Therefore, use grpc_slice_buffer_add_indexed method to
114  // ensure the slice gets added at a separate index. It can then be kept
115  // around and popped later in the BackUp function.
116  grpc_slice_buffer_add_indexed(slice_buffer_, slice_);
117  return true;
118  }
119 
// Gives back the last `count` bytes of the slice most recently handed out by
// Next(), because Next() returned more bytes than were needed. The returned
// bytes are remembered in backup_slice_ (when that slice is refcounted) and
// byte_count_ is reduced by `count`.
123  void BackUp(int count) override {
124  // count == 0 is invoked by ZeroCopyOutputStream users indicating that any
125  // potential buffer obtained through a previous call to Next() is final.
126  // ZeroCopyOutputStream implementations such as streaming output can use
127  // these calls to flush any temporary buffer and flush the output. The logic
128  // below is not robust against count == 0 invocations, so directly return.
129  if (count == 0) return;
130 
// Precondition: cannot give back more bytes than the current slice holds.
135  ABSL_CHECK_LE(count, static_cast<int>(GRPC_SLICE_LENGTH(slice_)));
// Take the most recently added slice back out of the slice buffer; the pop
// does not unref it, so slice_ still refers to live data.
136  grpc_slice_buffer_pop(slice_buffer_);
137  if (static_cast<size_t>(count) == GRPC_SLICE_LENGTH(slice_)) {
// Nothing in the slice was used: the whole slice becomes the backup.
138  backup_slice_ = slice_;
139  } else {
// Partially used: slice_ is trimmed to its first (length - count) bytes and
// put back into the buffer; the unused tail becomes the backup.
140  backup_slice_ =
141  grpc_slice_split_tail(&slice_, GRPC_SLICE_LENGTH(slice_) - count);
142  grpc_slice_buffer_add(slice_buffer_, slice_);
143  }
144  // It's dangerous to keep an inlined grpc_slice as the backup slice, since
145  // on a following Next() call, a reference will be returned to this slice
146  // via GRPC_SLICE_START_PTR, which will not be an address held by
147  // slice_buffer_.
// Hence the backup is only recorded when the slice has a refcount.
148  have_backup_ = backup_slice_.refcount != nullptr;
149  byte_count_ -= count;
150  }
151 
// Returns the running byte count: increased by Next(), decreased by BackUp(),
// and overwritten by set_byte_count().
153  int64_t ByteCount() const override { return byte_count_; }
154 
155 #ifdef GRPC_PROTOBUF_CORD_SUPPORT_ENABLED
156  // (override is conditionally omitted here to support old Protobuf which
159  // doesn't have the WriteCord method)
160  // NOLINTBEGIN(modernize-use-override,
161  // clang-diagnostic-inconsistent-missing-override)
162  virtual bool WriteCord(const absl::Cord& cord)
163 #if GOOGLE_PROTOBUF_VERSION >= 4022000
164  override
165 #endif
166  // NOLINTEND(modernize-use-override,
167  // clang-diagnostic-inconsistent-missing-override)
168  {
169  grpc_slice_buffer* buffer = slice_buffer();
170  size_t cur = 0;
171  for (absl::string_view chunk : cord.Chunks()) {
172  // TODO(veblush): Revisit this 512 threshold which could be smaller.
173  if (chunk.size() < 512) {
174  // If chunk is small enough, just copy it.
175  grpc_slice slice =
176  grpc_slice_from_copied_buffer(chunk.data(), chunk.size());
177  grpc_slice_buffer_add(buffer, slice);
178  } else {
179  // If chunk is large, just use the pointer instead of copying.
180  // To make sure it's alive while being used, a subcord for chunk is
181  // created and attached to a grpc_slice instance.
182  absl::Cord* subcord = new absl::Cord(cord.Subcord(cur, chunk.size()));
184  const_cast<uint8_t*>(
185  reinterpret_cast<const uint8_t*>(chunk.data())),
186  chunk.size(), [](void* p) { delete static_cast<absl::Cord*>(p); },
187  subcord);
188  grpc_slice_buffer_add(buffer, slice);
189  }
190  cur += chunk.size();
191  }
192  set_byte_count(ByteCount() + cur);
193  return true;
194  }
195 #endif // GRPC_PROTOBUF_CORD_SUPPORT_ENABLED
196 
197  // These protected members are needed to support internal optimizations.
198  // They expose internal bits of grpc core that are NOT stable. If you have
199  // a use case that needs one of these functions, please send an email to
200  // https://groups.google.com/forum/#!forum/grpc-io.
201  protected:
// Returns the raw pointer stored in slice_buffer_ (the slice buffer this
// writer appends to); no ownership is transferred.
202  grpc_slice_buffer* slice_buffer() { return slice_buffer_; }
// Overwrites the running byte count reported by ByteCount(); does not touch
// the slice buffer itself.
203  void set_byte_count(int64_t byte_count) { byte_count_ = byte_count; }
204 
205  private:
206  // friend for testing purposes only
208  const int block_size_;
209  const int total_size_;
210  int64_t byte_count_;
212  slice_buffer_;
213  bool have_backup_;
214  grpc_slice backup_slice_;
215  grpc_slice slice_;
217 };
218 
219 } // namespace grpc
220 
221 #endif // GRPCPP_SUPPORT_PROTO_BUFFER_WRITER_H
grpc_slice_new_with_user_data
GPRAPI grpc_slice grpc_slice_new_with_user_data(void *p, size_t len, void(*destroy)(void *), void *user_data)
Equivalent to grpc_slice_new, but with a separate pointer that is passed to the destroy function.
grpc::ProtoBufferWriter
This is a specialization of the protobuf class ZeroCopyOutputStream.
Definition: proto_buffer_writer.h:56
grpc_slice::refcount
struct grpc_slice_refcount * refcount
Definition: slice_type.h:64
grpc::ProtoBufferWriter::Next
bool Next(void **data, int *size) override
Give the proto library the next buffer of bytes and its size.
Definition: proto_buffer_writer.h:83
GRPC_SLICE_START_PTR
#define GRPC_SLICE_START_PTR(slice)
Definition: slice_type.h:99
grpc
An Alarm posts the user-provided tag to its associated completion queue or invokes the user-provided ...
Definition: alarm.h:33
grpc::protobuf::io::ZeroCopyOutputStream
::google::protobuf::io::ZeroCopyOutputStream ZeroCopyOutputStream
Definition: config_protobuf.h:113
grpc_byte_buffer::grpc_byte_buffer_data::raw
struct grpc_byte_buffer::grpc_byte_buffer_data::grpc_compressed_buffer raw
status.h
grpc_slice_malloc
GPRAPI grpc_slice grpc_slice_malloc(size_t length)
Equivalent to grpc_slice_new(malloc(len), len, free), but saves one malloc() call.
grpc_slice_buffer_add_indexed
GPRAPI size_t grpc_slice_buffer_add_indexed(grpc_slice_buffer *sb, grpc_slice slice)
add an element to a slice buffer - takes ownership of the slice and returns the index of the slice.
grpc_types.h
grpc_slice_from_copied_buffer
GPRAPI grpc_slice grpc_slice_from_copied_buffer(const char *source, size_t len)
Create a slice by copying a buffer.
grpc::ProtoBufferWriter::ProtoBufferWriter
ProtoBufferWriter(ByteBuffer *byte_buffer, int block_size, int total_size)
Constructor for this derived class.
Definition: proto_buffer_writer.h:63
log.h
grpc_byte_buffer
Definition: grpc_types.h:42
grpc::ByteBuffer
A sequence of bytes.
Definition: byte_buffer.h:60
grpc_slice_buffer_pop
GPRAPI void grpc_slice_buffer_pop(grpc_slice_buffer *sb)
pop the last buffer, but don't unref it
grpc_slice
A grpc_slice s, if initialized, represents the byte range s.bytes[0..s.length-1].
Definition: slice_type.h:63
grpc_slice_buffer_add
GPRAPI void grpc_slice_buffer_add(grpc_slice_buffer *sb, grpc_slice slice)
Add an element to a slice buffer - takes ownership of the slice.
grpc::kProtoBufferWriterMaxBufferLength
const int kProtoBufferWriterMaxBufferLength
Definition: proto_buffer_writer.h:47
slice_buffer.h
grpc::ProtoBufferWriter::slice_buffer
grpc_slice_buffer * slice_buffer()
Definition: proto_buffer_writer.h:202
grpc_byte_buffer::data
union grpc_byte_buffer::grpc_byte_buffer_data data
byte_buffer.h
serialization_traits.h
config_protobuf.h
slice.h
grpc::ProtoBufferWriter::ByteCount
int64_t ByteCount() const override
Returns the total number of bytes written since this object was created.
Definition: proto_buffer_writer.h:153
grpc::ProtoBufferWriter::~ProtoBufferWriter
~ProtoBufferWriter() override
Definition: proto_buffer_writer.h:75
grpc_slice_split_tail
GPRAPI grpc_slice grpc_slice_split_tail(grpc_slice *s, size_t split)
Splits s into two: modifies s to be s[0:split], and returns a new slice, sharing a refcount with s,...
byte_buffer.h
GRPC_SLICE_SET_LENGTH
#define GRPC_SLICE_SET_LENGTH(slice, newlen)
Definition: slice_type.h:105
grpc_byte_buffer::grpc_byte_buffer_data::grpc_compressed_buffer::slice_buffer
grpc_slice_buffer slice_buffer
Definition: grpc_types.h:51
grpc_slice_unref
GPRAPI void grpc_slice_unref(grpc_slice s)
Decrement the ref count of s.
grpc::ByteBuffer::Valid
bool Valid() const
Is this ByteBuffer valid?
Definition: byte_buffer.h:159
GRPC_SLICE_LENGTH
#define GRPC_SLICE_LENGTH(slice)
Definition: slice_type.h:102
grpc_slice_buffer
Represents an expandable array of slices, to be interpreted as a single item.
Definition: slice_type.h:81
grpc::ProtoBufferWriter::BackUp
void BackUp(int count) override
Backup by count bytes because Next returned more bytes than needed (only used in the last buffer).
Definition: proto_buffer_writer.h:123
grpc::ProtoBufferWriter::set_byte_count
void set_byte_count(int64_t byte_count)
Definition: proto_buffer_writer.h:203
grpc::ProtoBufferWriter::ProtoBufferWriterPeer
friend class internal::ProtoBufferWriterPeer
Definition: proto_buffer_writer.h:207
GRPC_SLICE_INLINED_SIZE
#define GRPC_SLICE_INLINED_SIZE
Definition: slice_type.h:47
grpc_raw_byte_buffer_create
GRPCAPI grpc_byte_buffer * grpc_raw_byte_buffer_create(grpc_slice *slices, size_t nslices)
Returns a RAW byte buffer instance over the given slices (up to nslices).