GRPC C++  1.78.1
proto_buffer_writer.h
Go to the documentation of this file.
1 //
2 //
3 // Copyright 2018 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #ifndef GRPCPP_SUPPORT_PROTO_BUFFER_WRITER_H
20 #define GRPCPP_SUPPORT_PROTO_BUFFER_WRITER_H
21 
22 #include <grpc/byte_buffer.h>
24 #include <grpc/impl/grpc_types.h>
25 #include <grpc/slice.h>
26 #include <grpc/slice_buffer.h>
30 #include <grpcpp/support/status.h>
31 
32 #include <type_traits>
33 
34 #include "absl/log/absl_check.h"
35 #include "absl/strings/cord.h"
36 
39 
40 namespace grpc {
41 
42 // Forward declaration for testing use only
43 namespace internal {
44 class ProtoBufferWriterPeer;
45 } // namespace internal
46 
47 const int kProtoBufferWriterMaxBufferLength = 1024 * 1024;
48 
57  public:
64  ByteBuffer* byte_buffer, int block_size, int total_size,
66  : block_size_(block_size),
67  total_size_(total_size),
68  byte_count_(0),
69  allocator_(allocator),
70  have_backup_(false) {
71  ABSL_CHECK(!byte_buffer->Valid());
74  byte_buffer->set_buffer(bp);
75  slice_buffer_ = &bp->data.raw.slice_buffer;
76  }
77 
78  ~ProtoBufferWriter() override {
79  if (have_backup_) {
80  grpc_slice_unref(backup_slice_);
81  }
82  }
83 
86  bool Next(void** data, int* size) override {
87  // Protobuf should not ask for more memory than total_size_.
88  ABSL_CHECK_LT(byte_count_, total_size_);
89  // 1. Use the remaining backup slice if we have one
90  // 2. Otherwise allocate a slice, up to the remaining length needed
91  // or our maximum allocation size
92  // 3. Provide the slice start and size available
93  // 4. Add the slice being returned to the slice buffer
94  size_t remain = static_cast<size_t>(total_size_ - byte_count_);
95  if (have_backup_) {
97  slice_ = backup_slice_;
98  have_backup_ = false;
99  if (GRPC_SLICE_LENGTH(slice_) > remain) {
100  GRPC_SLICE_SET_LENGTH(slice_, remain);
101  }
102  } else {
103  // When less than a whole block is needed, only allocate that much.
104  // But make sure the allocated slice is not inlined.
105  const size_t allocate_length =
106  remain > static_cast<size_t>(block_size_) ? block_size_ : remain;
107  const size_t slice_size = allocate_length > GRPC_SLICE_INLINED_SIZE
108  ? allocate_length
110  if (allocator_ == nullptr) {
111  slice_ = grpc_slice_malloc(slice_size);
112  } else {
113  slice_ = allocator_->MakeSlice(slice_size);
114  }
115  }
116  *data = GRPC_SLICE_START_PTR(slice_);
117  // On win x64, int is only 32bit
118  ABSL_CHECK(GRPC_SLICE_LENGTH(slice_) <= static_cast<size_t>(INT_MAX));
119  byte_count_ += * size = static_cast<int>(GRPC_SLICE_LENGTH(slice_));
120  // Using grpc_slice_buffer_add could modify slice_ and merge it with the
121  // previous slice. Therefore, use grpc_slice_buffer_add_indexed method to
122  // ensure the slice gets added at a separate index. It can then be kept
123  // around and popped later in the BackUp function.
124  grpc_slice_buffer_add_indexed(slice_buffer_, slice_);
125  return true;
126  }
127 
131  void BackUp(int count) override {
132  // count == 0 is invoked by ZeroCopyOutputStream users indicating that any
133  // potential buffer obtained through a previous call to Next() is final.
134  // ZeroCopyOutputStream implementations such as streaming output can use
135  // these calls to flush any temporary buffer and flush the output. The logic
136  // below is not robust against count == 0 invocations, so directly return.
137  if (count == 0) return;
138 
143  ABSL_CHECK_LE(count, static_cast<int>(GRPC_SLICE_LENGTH(slice_)));
144  grpc_slice_buffer_pop(slice_buffer_);
145  if (static_cast<size_t>(count) == GRPC_SLICE_LENGTH(slice_)) {
146  backup_slice_ = slice_;
147  } else {
148  backup_slice_ =
149  grpc_slice_split_tail(&slice_, GRPC_SLICE_LENGTH(slice_) - count);
150  grpc_slice_buffer_add(slice_buffer_, slice_);
151  }
152  // It's dangerous to keep an inlined grpc_slice as the backup slice, since
153  // on a following Next() call, a reference will be returned to this slice
154  // via GRPC_SLICE_START_PTR, which will not be an address held by
155  // slice_buffer_.
156  have_backup_ = backup_slice_.refcount != nullptr;
157  byte_count_ -= count;
158  }
159 
161  int64_t ByteCount() const override { return byte_count_; }
162 
163 #ifdef GRPC_PROTOBUF_CORD_SUPPORT_ENABLED
164  // (override is conditionally omitted here to support old Protobuf which
167  // doesn't have ReadCord method)
168  // NOLINTBEGIN(modernize-use-override,
169  // clang-diagnostic-inconsistent-missing-override)
170  virtual bool WriteCord(const absl::Cord& cord)
171 #if GOOGLE_PROTOBUF_VERSION >= 4022000
172  override
173 #endif
174  // NOLINTEND(modernize-use-override,
175  // clang-diagnostic-inconsistent-missing-override)
176  {
177  grpc_slice_buffer* buffer = slice_buffer();
178  size_t cur = 0;
179  for (absl::string_view chunk : cord.Chunks()) {
180  // TODO(veblush): Revisit this 512 threadhold which could be smaller.
181  if (chunk.size() < 512) {
182  // If chunk is small enough, just copy it.
183  grpc_slice slice =
184  grpc_slice_from_copied_buffer(chunk.data(), chunk.size());
185  grpc_slice_buffer_add(buffer, slice);
186  } else {
187  // If chunk is large, just use the pointer instead of copying.
188  // To make sure it's alive while being used, a subcord for chunk is
189  // created and attached to a grpc_slice instance.
190  absl::Cord* subcord = new absl::Cord(cord.Subcord(cur, chunk.size()));
192  const_cast<uint8_t*>(
193  reinterpret_cast<const uint8_t*>(chunk.data())),
194  chunk.size(), [](void* p) { delete static_cast<absl::Cord*>(p); },
195  subcord);
196  grpc_slice_buffer_add(buffer, slice);
197  }
198  cur += chunk.size();
199  }
200  set_byte_count(ByteCount() + cur);
201  return true;
202  }
203 #endif // GRPC_PROTOBUF_CORD_SUPPORT_ENABLED
204 
205  // These protected members are needed to support internal optimizations.
206  // they expose internal bits of grpc core that are NOT stable. If you have
207  // a use case needs to use one of these functions, please send an email to
208  // https://groups.google.com/forum/#!forum/grpc-io.
209  protected:
210  grpc_slice_buffer* slice_buffer() { return slice_buffer_; }
211  void set_byte_count(int64_t byte_count) { byte_count_ = byte_count; }
212 
213  private:
214  // friend for testing purposes only
216  const int block_size_;
217  const int total_size_;
218  int64_t byte_count_;
221  slice_buffer_;
222  bool have_backup_;
223  grpc_slice backup_slice_;
224  grpc_slice slice_;
226 };
227 
228 } // namespace grpc
229 
230 #endif // GRPCPP_SUPPORT_PROTO_BUFFER_WRITER_H
grpc_slice_new_with_user_data
GPRAPI grpc_slice grpc_slice_new_with_user_data(void *p, size_t len, void(*destroy)(void *), void *user_data)
Equivalent to grpc_slice_new, but with a separate pointer that is passed to the destroy function.
grpc_event_engine::experimental::MemoryAllocator::MakeSlice
grpc_slice MakeSlice(MemoryRequest request)
Allocate a slice, using MemoryRequest to size the number of returned bytes.
Definition: memory_allocator.h:137
grpc::ProtoBufferWriter
This is a specialization of the protobuf class ZeroCopyOutputStream.
Definition: proto_buffer_writer.h:56
grpc_slice::refcount
struct grpc_slice_refcount * refcount
Definition: slice_type.h:63
grpc::ProtoBufferWriter::Next
bool Next(void **data, int *size) override
Give the proto library the next buffer of bytes and its size.
Definition: proto_buffer_writer.h:86
GRPC_SLICE_START_PTR
#define GRPC_SLICE_START_PTR(slice)
Definition: slice_type.h:98
grpc
An Alarm posts the user-provided tag to its associated completion queue or invokes the user-provided ...
Definition: alarm.h:33
grpc::protobuf::io::ZeroCopyOutputStream
::google::protobuf::io::ZeroCopyOutputStream ZeroCopyOutputStream
Definition: config_protobuf.h:114
grpc_byte_buffer::grpc_byte_buffer_data::raw
struct grpc_byte_buffer::grpc_byte_buffer_data::grpc_compressed_buffer raw
status.h
grpc_slice_malloc
GPRAPI grpc_slice grpc_slice_malloc(size_t length)
Equivalent to grpc_slice_new(malloc(len), len, free), but saves one malloc() call.
grpc_slice_buffer_add_indexed
GPRAPI size_t grpc_slice_buffer_add_indexed(grpc_slice_buffer *sb, grpc_slice slice)
add an element to a slice buffer - takes ownership of the slice and returns the index of the slice.
grpc_types.h
grpc_slice_from_copied_buffer
GPRAPI grpc_slice grpc_slice_from_copied_buffer(const char *source, size_t len)
Create a slice by copying a buffer.
grpc::ProtoBufferWriter::ProtoBufferWriter
ProtoBufferWriter(ByteBuffer *byte_buffer, int block_size, int total_size, grpc_event_engine::experimental::MemoryAllocator *allocator=nullptr)
Constructor for this derived class.
Definition: proto_buffer_writer.h:63
grpc_byte_buffer
Definition: grpc_types.h:41
grpc::ByteBuffer
A sequence of bytes.
Definition: byte_buffer.h:59
grpc_slice_buffer_pop
GPRAPI void grpc_slice_buffer_pop(grpc_slice_buffer *sb)
pop the last buffer, but don't unref it
grpc_slice
A grpc_slice s, if initialized, represents the byte range s.bytes[0..s.length-1].
Definition: slice_type.h:62
grpc_slice_buffer_add
GPRAPI void grpc_slice_buffer_add(grpc_slice_buffer *sb, grpc_slice slice)
Add an element to a slice buffer - takes ownership of the slice.
grpc::kProtoBufferWriterMaxBufferLength
const int kProtoBufferWriterMaxBufferLength
Definition: proto_buffer_writer.h:47
slice_buffer.h
grpc::ProtoBufferWriter::slice_buffer
grpc_slice_buffer * slice_buffer()
Definition: proto_buffer_writer.h:210
grpc_byte_buffer::data
union grpc_byte_buffer::grpc_byte_buffer_data data
byte_buffer.h
serialization_traits.h
config_protobuf.h
slice.h
grpc::ProtoBufferWriter::ByteCount
int64_t ByteCount() const override
Returns the total number of bytes written since this object was created.
Definition: proto_buffer_writer.h:161
grpc::ProtoBufferWriter::~ProtoBufferWriter
~ProtoBufferWriter() override
Definition: proto_buffer_writer.h:78
grpc_slice_split_tail
GPRAPI grpc_slice grpc_slice_split_tail(grpc_slice *s, size_t split)
Splits s into two: modifies s to be s[0:split], and returns a new slice, sharing a refcount with s,...
byte_buffer.h
GRPC_SLICE_SET_LENGTH
#define GRPC_SLICE_SET_LENGTH(slice, newlen)
Definition: slice_type.h:104
grpc_event_engine::experimental::MemoryAllocator
Definition: memory_allocator.h:33
grpc_byte_buffer::grpc_byte_buffer_data::grpc_compressed_buffer::slice_buffer
grpc_slice_buffer slice_buffer
Definition: grpc_types.h:50
grpc_slice_unref
GPRAPI void grpc_slice_unref(grpc_slice s)
Decrement the ref count of s.
grpc::ByteBuffer::Valid
bool Valid() const
Is this ByteBuffer valid?
Definition: byte_buffer.h:158
GRPC_SLICE_LENGTH
#define GRPC_SLICE_LENGTH(slice)
Definition: slice_type.h:101
memory_allocator.h
grpc_slice_buffer
Represents an expandable array of slices, to be interpreted as a single item.
Definition: slice_type.h:80
grpc::ProtoBufferWriter::BackUp
void BackUp(int count) override
Backup by count bytes because Next returned more bytes than needed (only used in the last buffer).
Definition: proto_buffer_writer.h:131
grpc::ProtoBufferWriter::set_byte_count
void set_byte_count(int64_t byte_count)
Definition: proto_buffer_writer.h:211
grpc::ProtoBufferWriter::ProtoBufferWriterPeer
friend class internal::ProtoBufferWriterPeer
Definition: proto_buffer_writer.h:215
GRPC_SLICE_INLINED_SIZE
#define GRPC_SLICE_INLINED_SIZE
Definition: slice_type.h:46
grpc_raw_byte_buffer_create
GRPCAPI grpc_byte_buffer * grpc_raw_byte_buffer_create(grpc_slice *slices, size_t nslices)
Returns a RAW byte buffer instance over the given slices (up to nslices).