GRPC C++  1.62.0
proto_buffer_writer.h
Go to the documentation of this file.
1 //
2 //
3 // Copyright 2018 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #ifndef GRPCPP_SUPPORT_PROTO_BUFFER_WRITER_H
20 #define GRPCPP_SUPPORT_PROTO_BUFFER_WRITER_H
21 
22 #include <type_traits>
23 
24 #include "absl/strings/cord.h"
25 
26 #include <grpc/byte_buffer.h>
27 #include <grpc/impl/grpc_types.h>
28 #include <grpc/slice.h>
29 #include <grpc/slice_buffer.h>
30 #include <grpc/support/log.h>
34 #include <grpcpp/support/status.h>
35 
38 
39 namespace grpc {
40 
41 // Forward declaration for testing use only
42 namespace internal {
43 class ProtoBufferWriterPeer;
44 } // namespace internal
45 
46 const int kProtoBufferWriterMaxBufferLength = 1024 * 1024;
47 
56  public:
62  ProtoBufferWriter(ByteBuffer* byte_buffer, int block_size, int total_size)
63  : block_size_(block_size),
64  total_size_(total_size),
65  byte_count_(0),
66  have_backup_(false) {
67  GPR_ASSERT(!byte_buffer->Valid());
70  byte_buffer->set_buffer(bp);
71  slice_buffer_ = &bp->data.raw.slice_buffer;
72  }
73 
74  ~ProtoBufferWriter() override {
75  if (have_backup_) {
76  grpc_slice_unref(backup_slice_);
77  }
78  }
79 
82  bool Next(void** data, int* size) override {
83  // Protobuf should not ask for more memory than total_size_.
84  GPR_ASSERT(byte_count_ < total_size_);
85  // 1. Use the remaining backup slice if we have one
86  // 2. Otherwise allocate a slice, up to the remaining length needed
87  // or our maximum allocation size
88  // 3. Provide the slice start and size available
89  // 4. Add the slice being returned to the slice buffer
90  size_t remain = static_cast<size_t>(total_size_ - byte_count_);
91  if (have_backup_) {
93  slice_ = backup_slice_;
94  have_backup_ = false;
95  if (GRPC_SLICE_LENGTH(slice_) > remain) {
96  GRPC_SLICE_SET_LENGTH(slice_, remain);
97  }
98  } else {
99  // When less than a whole block is needed, only allocate that much.
100  // But make sure the allocated slice is not inlined.
101  size_t allocate_length =
102  remain > static_cast<size_t>(block_size_) ? block_size_ : remain;
103  slice_ = grpc_slice_malloc(allocate_length > GRPC_SLICE_INLINED_SIZE
104  ? allocate_length
106  }
107  *data = GRPC_SLICE_START_PTR(slice_);
108  // On win x64, int is only 32bit
109  GPR_ASSERT(GRPC_SLICE_LENGTH(slice_) <= INT_MAX);
110  byte_count_ += * size = static_cast<int>(GRPC_SLICE_LENGTH(slice_));
111  // Using grpc_slice_buffer_add could modify slice_ and merge it with the
112  // previous slice. Therefore, use grpc_slice_buffer_add_indexed method to
113  // ensure the slice gets added at a separate index. It can then be kept
114  // around and popped later in the BackUp function.
115  grpc_slice_buffer_add_indexed(slice_buffer_, slice_);
116  return true;
117  }
118 
122  void BackUp(int count) override {
123  // count == 0 is invoked by ZeroCopyOutputStream users indicating that any
124  // potential buffer obtained through a previous call to Next() is final.
125  // ZeroCopyOutputStream implementations such as streaming output can use
126  // these calls to flush any temporary buffer and flush the output. The logic
127  // below is not robust against count == 0 invocations, so directly return.
128  if (count == 0) return;
129 
134  GPR_ASSERT(count <= static_cast<int>(GRPC_SLICE_LENGTH(slice_)));
135  grpc_slice_buffer_pop(slice_buffer_);
136  if (static_cast<size_t>(count) == GRPC_SLICE_LENGTH(slice_)) {
137  backup_slice_ = slice_;
138  } else {
139  backup_slice_ =
140  grpc_slice_split_tail(&slice_, GRPC_SLICE_LENGTH(slice_) - count);
141  grpc_slice_buffer_add(slice_buffer_, slice_);
142  }
143  // It's dangerous to keep an inlined grpc_slice as the backup slice, since
144  // on a following Next() call, a reference will be returned to this slice
145  // via GRPC_SLICE_START_PTR, which will not be an address held by
146  // slice_buffer_.
147  have_backup_ = backup_slice_.refcount != nullptr;
148  byte_count_ -= count;
149  }
150 
152  int64_t ByteCount() const override { return byte_count_; }
153 
154 #ifdef GRPC_PROTOBUF_CORD_SUPPORT_ENABLED
155  // (override is conditionally omitted here to support old Protobuf which
158  // doesn't have ReadCord method)
159  // NOLINTBEGIN(modernize-use-override,
160  // clang-diagnostic-inconsistent-missing-override)
161  virtual bool WriteCord(const absl::Cord& cord)
162 #if GOOGLE_PROTOBUF_VERSION >= 4022000
163  override
164 #endif
165  // NOLINTEND(modernize-use-override,
166  // clang-diagnostic-inconsistent-missing-override)
167  {
168  grpc_slice_buffer* buffer = slice_buffer();
169  size_t cur = 0;
170  for (absl::string_view chunk : cord.Chunks()) {
171  // TODO(veblush): Revisit this 512 threadhold which could be smaller.
172  if (chunk.size() < 512) {
173  // If chunk is small enough, just copy it.
174  grpc_slice slice =
175  grpc_slice_from_copied_buffer(chunk.data(), chunk.size());
176  grpc_slice_buffer_add(buffer, slice);
177  } else {
178  // If chunk is large, just use the pointer instead of copying.
179  // To make sure it's alive while being used, a subcord for chunk is
180  // created and attached to a grpc_slice instance.
181  absl::Cord* subcord = new absl::Cord(cord.Subcord(cur, chunk.size()));
183  const_cast<uint8_t*>(
184  reinterpret_cast<const uint8_t*>(chunk.data())),
185  chunk.size(), [](void* p) { delete static_cast<absl::Cord*>(p); },
186  subcord);
187  grpc_slice_buffer_add(buffer, slice);
188  }
189  cur += chunk.size();
190  }
191  set_byte_count(ByteCount() + cur);
192  return true;
193  }
194 #endif // GRPC_PROTOBUF_CORD_SUPPORT_ENABLED
195 
196  // These protected members are needed to support internal optimizations.
197  // they expose internal bits of grpc core that are NOT stable. If you have
198  // a use case needs to use one of these functions, please send an email to
199  // https://groups.google.com/forum/#!forum/grpc-io.
200  protected:
201  grpc_slice_buffer* slice_buffer() { return slice_buffer_; }
202  void set_byte_count(int64_t byte_count) { byte_count_ = byte_count; }
203 
204  private:
205  // friend for testing purposes only
207  const int block_size_;
208  const int total_size_;
209  int64_t byte_count_;
211  slice_buffer_;
212  bool have_backup_;
213  grpc_slice backup_slice_;
214  grpc_slice slice_;
216 };
217 
218 } // namespace grpc
219 
220 #endif // GRPCPP_SUPPORT_PROTO_BUFFER_WRITER_H
grpc_slice_new_with_user_data
GPRAPI grpc_slice grpc_slice_new_with_user_data(void *p, size_t len, void(*destroy)(void *), void *user_data)
Equivalent to grpc_slice_new, but with a separate pointer that is passed to the destroy function.
grpc::ProtoBufferWriter
This is a specialization of the protobuf class ZeroCopyOutputStream.
Definition: proto_buffer_writer.h:55
grpc_slice::refcount
struct grpc_slice_refcount * refcount
Definition: slice_type.h:64
grpc::ProtoBufferWriter::Next
bool Next(void **data, int *size) override
Give the proto library the next buffer of bytes and its size.
Definition: proto_buffer_writer.h:82
GRPC_SLICE_START_PTR
#define GRPC_SLICE_START_PTR(slice)
Definition: slice_type.h:99
grpc
An Alarm posts the user-provided tag to its associated completion queue or invokes the user-provided ...
Definition: alarm.h:33
grpc::protobuf::io::ZeroCopyOutputStream
::google::protobuf::io::ZeroCopyOutputStream ZeroCopyOutputStream
Definition: config_protobuf.h:104
grpc_byte_buffer::grpc_byte_buffer_data::raw
struct grpc_byte_buffer::grpc_byte_buffer_data::grpc_compressed_buffer raw
GPR_ASSERT
#define GPR_ASSERT(x)
abort() the process if x is zero, having written a line to the log.
Definition: log.h:95
status.h
grpc_slice_malloc
GPRAPI grpc_slice grpc_slice_malloc(size_t length)
Equivalent to grpc_slice_new(malloc(len), len, free), but saves one malloc() call.
grpc_slice_buffer_add_indexed
GPRAPI size_t grpc_slice_buffer_add_indexed(grpc_slice_buffer *sb, grpc_slice slice)
add an element to a slice buffer - takes ownership of the slice and returns the index of the slice.
grpc_types.h
grpc_slice_from_copied_buffer
GPRAPI grpc_slice grpc_slice_from_copied_buffer(const char *source, size_t len)
Create a slice by copying a buffer.
grpc::ProtoBufferWriter::ProtoBufferWriter
ProtoBufferWriter(ByteBuffer *byte_buffer, int block_size, int total_size)
Constructor for this derived class.
Definition: proto_buffer_writer.h:62
log.h
grpc_byte_buffer
Definition: grpc_types.h:43
grpc::ByteBuffer
A sequence of bytes.
Definition: byte_buffer.h:60
grpc_slice_buffer_pop
GPRAPI void grpc_slice_buffer_pop(grpc_slice_buffer *sb)
pop the last buffer, but don't unref it
grpc_slice
A grpc_slice s, if initialized, represents the byte range s.bytes[0..s.length-1].
Definition: slice_type.h:63
grpc_slice_buffer_add
GPRAPI void grpc_slice_buffer_add(grpc_slice_buffer *sb, grpc_slice slice)
Add an element to a slice buffer - takes ownership of the slice.
grpc::kProtoBufferWriterMaxBufferLength
const int kProtoBufferWriterMaxBufferLength
Definition: proto_buffer_writer.h:46
slice_buffer.h
grpc::ProtoBufferWriter::slice_buffer
grpc_slice_buffer * slice_buffer()
Definition: proto_buffer_writer.h:201
grpc_byte_buffer::data
union grpc_byte_buffer::grpc_byte_buffer_data data
byte_buffer.h
serialization_traits.h
config_protobuf.h
slice.h
grpc::ProtoBufferWriter::ByteCount
int64_t ByteCount() const override
Returns the total number of bytes written since this object was created.
Definition: proto_buffer_writer.h:152
grpc::ProtoBufferWriter::~ProtoBufferWriter
~ProtoBufferWriter() override
Definition: proto_buffer_writer.h:74
grpc_slice_split_tail
GPRAPI grpc_slice grpc_slice_split_tail(grpc_slice *s, size_t split)
Splits s into two: modifies s to be s[0:split], and returns a new slice, sharing a refcount with s,...
byte_buffer.h
GRPC_SLICE_SET_LENGTH
#define GRPC_SLICE_SET_LENGTH(slice, newlen)
Definition: slice_type.h:105
grpc_byte_buffer::grpc_byte_buffer_data::grpc_compressed_buffer::slice_buffer
grpc_slice_buffer slice_buffer
Definition: grpc_types.h:52
grpc_slice_unref
GPRAPI void grpc_slice_unref(grpc_slice s)
Decrement the ref count of s.
grpc::ByteBuffer::Valid
bool Valid() const
Is this ByteBuffer valid?
Definition: byte_buffer.h:159
GRPC_SLICE_LENGTH
#define GRPC_SLICE_LENGTH(slice)
Definition: slice_type.h:102
grpc_slice_buffer
Represents an expandable array of slices, to be interpreted as a single item.
Definition: slice_type.h:81
grpc::ProtoBufferWriter::BackUp
void BackUp(int count) override
Backup by count bytes because Next returned more bytes than needed (only used in the last buffer).
Definition: proto_buffer_writer.h:122
grpc::ProtoBufferWriter::set_byte_count
void set_byte_count(int64_t byte_count)
Definition: proto_buffer_writer.h:202
grpc::ProtoBufferWriter::ProtoBufferWriterPeer
friend class internal::ProtoBufferWriterPeer
Definition: proto_buffer_writer.h:206
GRPC_SLICE_INLINED_SIZE
#define GRPC_SLICE_INLINED_SIZE
Definition: slice_type.h:47
grpc_raw_byte_buffer_create
GRPCAPI grpc_byte_buffer * grpc_raw_byte_buffer_create(grpc_slice *slices, size_t nslices)
Returns a RAW byte buffer instance over the given slices (up to nslices).