GRPC C++  1.26.0
call_op_set.h
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2018 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #ifndef GRPCPP_IMPL_CODEGEN_CALL_OP_SET_H
20 #define GRPCPP_IMPL_CODEGEN_CALL_OP_SET_H
21 
22 #include <cstring>
23 #include <map>
24 #include <memory>
25 
42 
43 namespace grpc {
44 
45 extern CoreCodegenInterface* g_core_codegen_interface;
46 
47 namespace internal {
48 class Call;
49 class CallHook;
50 
51 // TODO(yangg) if the map is changed before we send, the pointers will be a
52 // mess. Make sure it does not happen.
54  const std::multimap<grpc::string, grpc::string>& metadata,
55  size_t* metadata_count, const grpc::string& optional_error_details) {
56  *metadata_count = metadata.size() + (optional_error_details.empty() ? 0 : 1);
57  if (*metadata_count == 0) {
58  return nullptr;
59  }
60  grpc_metadata* metadata_array =
61  (grpc_metadata*)(g_core_codegen_interface->gpr_malloc(
62  (*metadata_count) * sizeof(grpc_metadata)));
63  size_t i = 0;
64  for (auto iter = metadata.cbegin(); iter != metadata.cend(); ++iter, ++i) {
65  metadata_array[i].key = SliceReferencingString(iter->first);
66  metadata_array[i].value = SliceReferencingString(iter->second);
67  }
68  if (!optional_error_details.empty()) {
69  metadata_array[i].key =
70  g_core_codegen_interface->grpc_slice_from_static_buffer(
72  metadata_array[i].value = SliceReferencingString(optional_error_details);
73  }
74  return metadata_array;
75 }
76 } // namespace internal
77 
79 class WriteOptions {
80  public:
81  WriteOptions() : flags_(0), last_message_(false) {}
82  WriteOptions(const WriteOptions& other)
83  : flags_(other.flags_), last_message_(other.last_message_) {}
84 
86  WriteOptions& operator=(const WriteOptions& other) = default;
87 
89  inline void Clear() { flags_ = 0; }
90 
92  inline uint32_t flags() const { return flags_; }
93 
98  SetBit(GRPC_WRITE_NO_COMPRESS);
99  return *this;
100  }
101 
106  ClearBit(GRPC_WRITE_NO_COMPRESS);
107  return *this;
108  }
109 
114  inline bool get_no_compression() const {
115  return GetBit(GRPC_WRITE_NO_COMPRESS);
116  }
117 
123  SetBit(GRPC_WRITE_BUFFER_HINT);
124  return *this;
125  }
126 
132  ClearBit(GRPC_WRITE_BUFFER_HINT);
133  return *this;
134  }
135 
140  inline bool get_buffer_hint() const { return GetBit(GRPC_WRITE_BUFFER_HINT); }
141 
145  SetBit(GRPC_WRITE_BUFFER_HINT);
146  return *this;
147  }
148 
150  ClearBit(GRPC_WRITE_BUFFER_HINT);
151  return *this;
152  }
153 
154  inline bool is_corked() const { return GetBit(GRPC_WRITE_BUFFER_HINT); }
155 
162  last_message_ = true;
163  return *this;
164  }
165 
169  last_message_ = false;
170  return *this;
171  }
172 
176  SetBit(GRPC_WRITE_THROUGH);
177  return *this;
178  }
179 
180  inline bool is_write_through() const { return GetBit(GRPC_WRITE_THROUGH); }
181 
186  bool is_last_message() const { return last_message_; }
187 
188  private:
189  void SetBit(const uint32_t mask) { flags_ |= mask; }
190 
191  void ClearBit(const uint32_t mask) { flags_ &= ~mask; }
192 
193  bool GetBit(const uint32_t mask) const { return (flags_ & mask) != 0; }
194 
195  uint32_t flags_;
196  bool last_message_;
197 };
198 
199 namespace internal {
200 
203 template <int I>
204 class CallNoOp {
205  protected:
206  void AddOp(grpc_op* /*ops*/, size_t* /*nops*/) {}
207  void FinishOp(bool* /*status*/) {}
209  InterceptorBatchMethodsImpl* /*interceptor_methods*/) {}
211  InterceptorBatchMethodsImpl* /*interceptor_methods*/) {}
212  void SetHijackingState(InterceptorBatchMethodsImpl* /*interceptor_methods*/) {
213  }
214 };
215 
217  public:
218  CallOpSendInitialMetadata() : send_(false) {
219  maybe_compression_level_.is_set = false;
220  }
221 
222  void SendInitialMetadata(std::multimap<grpc::string, grpc::string>* metadata,
223  uint32_t flags) {
224  maybe_compression_level_.is_set = false;
225  send_ = true;
226  flags_ = flags;
227  metadata_map_ = metadata;
228  }
229 
231  maybe_compression_level_.is_set = true;
232  maybe_compression_level_.level = level;
233  }
234 
235  protected:
236  void AddOp(grpc_op* ops, size_t* nops) {
237  if (!send_ || hijacked_) return;
238  grpc_op* op = &ops[(*nops)++];
240  op->flags = flags_;
241  op->reserved = NULL;
242  initial_metadata_ =
243  FillMetadataArray(*metadata_map_, &initial_metadata_count_, "");
244  op->data.send_initial_metadata.count = initial_metadata_count_;
245  op->data.send_initial_metadata.metadata = initial_metadata_;
247  maybe_compression_level_.is_set;
248  if (maybe_compression_level_.is_set) {
250  maybe_compression_level_.level;
251  }
252  }
253  void FinishOp(bool* /*status*/) {
254  if (!send_ || hijacked_) return;
255  g_core_codegen_interface->gpr_free(initial_metadata_);
256  send_ = false;
257  }
258 
260  InterceptorBatchMethodsImpl* interceptor_methods) {
261  if (!send_) return;
262  interceptor_methods->AddInterceptionHookPoint(
264  interceptor_methods->SetSendInitialMetadata(metadata_map_);
265  }
266 
268  InterceptorBatchMethodsImpl* /*interceptor_methods*/) {}
269 
270  void SetHijackingState(InterceptorBatchMethodsImpl* /*interceptor_methods*/) {
271  hijacked_ = true;
272  }
273 
274  bool hijacked_ = false;
275  bool send_;
276  uint32_t flags_;
278  std::multimap<grpc::string, grpc::string>* metadata_map_;
280  struct {
281  bool is_set;
283  } maybe_compression_level_;
284 };
285 
287  public:
288  CallOpSendMessage() : send_buf_() {}
289 
292  template <class M>
293  Status SendMessage(const M& message,
295 
296  template <class M>
297  Status SendMessage(const M& message) GRPC_MUST_USE_RESULT;
298 
302  template <class M>
303  Status SendMessagePtr(const M* message,
305 
308  template <class M>
309  Status SendMessagePtr(const M* message) GRPC_MUST_USE_RESULT;
310 
311  protected:
312  void AddOp(grpc_op* ops, size_t* nops) {
313  if (msg_ == nullptr && !send_buf_.Valid()) return;
314  if (hijacked_) {
315  serializer_ = nullptr;
316  return;
317  }
318  if (msg_ != nullptr) {
319  GPR_CODEGEN_ASSERT(serializer_(msg_).ok());
320  }
321  serializer_ = nullptr;
322  grpc_op* op = &ops[(*nops)++];
323  op->op = GRPC_OP_SEND_MESSAGE;
324  op->flags = write_options_.flags();
325  op->reserved = NULL;
326  op->data.send_message.send_message = send_buf_.c_buffer();
327  // Flags are per-message: clear them after use.
328  write_options_.Clear();
329  }
330  void FinishOp(bool* status) {
331  if (msg_ == nullptr && !send_buf_.Valid()) return;
332  if (hijacked_ && failed_send_) {
333  // Hijacking interceptor failed this Op
334  *status = false;
335  } else if (!*status) {
336  // This Op was passed down to core and the Op failed
337  failed_send_ = true;
338  }
339  }
340 
342  InterceptorBatchMethodsImpl* interceptor_methods) {
343  if (msg_ == nullptr && !send_buf_.Valid()) return;
344  interceptor_methods->AddInterceptionHookPoint(
346  interceptor_methods->SetSendMessage(&send_buf_, &msg_, &failed_send_,
347  serializer_);
348  }
349 
351  InterceptorBatchMethodsImpl* interceptor_methods) {
352  if (msg_ != nullptr || send_buf_.Valid()) {
353  interceptor_methods->AddInterceptionHookPoint(
355  }
356  send_buf_.Clear();
357  msg_ = nullptr;
358  // The contents of the SendMessage value that was previously set
359  // has had its references stolen by core's operations
360  interceptor_methods->SetSendMessage(nullptr, nullptr, &failed_send_,
361  nullptr);
362  }
363 
364  void SetHijackingState(InterceptorBatchMethodsImpl* /*interceptor_methods*/) {
365  hijacked_ = true;
366  }
367 
368  private:
369  const void* msg_ = nullptr; // The original non-serialized message
370  bool hijacked_ = false;
371  bool failed_send_ = false;
372  ByteBuffer send_buf_;
373  WriteOptions write_options_;
374  std::function<Status(const void*)> serializer_;
375 };
376 
377 template <class M>
379  write_options_ = options;
380  serializer_ = [this](const void* message) {
381  bool own_buf;
382  send_buf_.Clear();
383  // TODO(vjpai): Remove the void below when possible
384  // The void in the template parameter below should not be needed
385  // (since it should be implicit) but is needed due to an observed
386  // difference in behavior between clang and gcc for certain internal users
388  *static_cast<const M*>(message), send_buf_.bbuf_ptr(), &own_buf);
389  if (!own_buf) {
390  send_buf_.Duplicate();
391  }
392  return result;
393  };
394  // Serialize immediately only if we do not have access to the message pointer
395  if (msg_ == nullptr) {
396  Status result = serializer_(&message);
397  serializer_ = nullptr;
398  return result;
399  }
400  return Status();
401 }
402 
403 template <class M>
405  return SendMessage(message, WriteOptions());
406 }
407 
408 template <class M>
410  WriteOptions options) {
411  msg_ = message;
412  return SendMessage(*message, options);
413 }
414 
415 template <class M>
417  msg_ = message;
418  return SendMessage(*message, WriteOptions());
419 }
420 
421 template <class R>
422 class CallOpRecvMessage {
423  public:
425  : got_message(false),
426  message_(nullptr),
427  allow_not_getting_message_(false) {}
428 
429  void RecvMessage(R* message) { message_ = message; }
430 
431  // Do not change status if no message is received.
432  void AllowNoMessage() { allow_not_getting_message_ = true; }
433 
435 
436  protected:
437  void AddOp(grpc_op* ops, size_t* nops) {
438  if (message_ == nullptr || hijacked_) return;
439  grpc_op* op = &ops[(*nops)++];
440  op->op = GRPC_OP_RECV_MESSAGE;
441  op->flags = 0;
442  op->reserved = NULL;
443  op->data.recv_message.recv_message = recv_buf_.c_buffer_ptr();
444  }
445 
446  void FinishOp(bool* status) {
447  if (message_ == nullptr || hijacked_) return;
448  if (recv_buf_.Valid()) {
449  if (*status) {
450  got_message = *status =
451  SerializationTraits<R>::Deserialize(recv_buf_.bbuf_ptr(), message_)
452  .ok();
453  recv_buf_.Release();
454  } else {
455  got_message = false;
456  recv_buf_.Clear();
457  }
458  } else {
459  got_message = false;
460  if (!allow_not_getting_message_) {
461  *status = false;
462  }
463  }
464  }
465 
467  InterceptorBatchMethodsImpl* interceptor_methods) {
468  if (message_ == nullptr) return;
469  interceptor_methods->SetRecvMessage(message_, &got_message);
470  }
471 
473  InterceptorBatchMethodsImpl* interceptor_methods) {
474  if (message_ == nullptr) return;
475  interceptor_methods->AddInterceptionHookPoint(
477  if (!got_message) interceptor_methods->SetRecvMessage(nullptr, nullptr);
478  }
479  void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
480  hijacked_ = true;
481  if (message_ == nullptr) return;
482  interceptor_methods->AddInterceptionHookPoint(
484  got_message = true;
485  }
486 
487  private:
488  R* message_;
489  ByteBuffer recv_buf_;
490  bool allow_not_getting_message_;
491  bool hijacked_ = false;
492 };
493 
495  public:
496  virtual Status Deserialize(ByteBuffer* buf) = 0;
497  virtual ~DeserializeFunc() {}
498 };
499 
500 template <class R>
501 class DeserializeFuncType final : public DeserializeFunc {
502  public:
503  DeserializeFuncType(R* message) : message_(message) {}
504  Status Deserialize(ByteBuffer* buf) override {
505  return SerializationTraits<R>::Deserialize(buf->bbuf_ptr(), message_);
506  }
507 
508  ~DeserializeFuncType() override {}
509 
510  private:
511  R* message_; // Not a managed pointer because management is external to this
512 };
513 
515  public:
517  : got_message(false), allow_not_getting_message_(false) {}
518 
519  template <class R>
520  void RecvMessage(R* message) {
521  // Use an explicit base class pointer to avoid resolution error in the
522  // following unique_ptr::reset for some old implementations.
523  DeserializeFunc* func = new DeserializeFuncType<R>(message);
524  deserialize_.reset(func);
525  message_ = message;
526  }
527 
528  // Do not change status if no message is received.
529  void AllowNoMessage() { allow_not_getting_message_ = true; }
530 
532 
533  protected:
534  void AddOp(grpc_op* ops, size_t* nops) {
535  if (!deserialize_ || hijacked_) return;
536  grpc_op* op = &ops[(*nops)++];
537  op->op = GRPC_OP_RECV_MESSAGE;
538  op->flags = 0;
539  op->reserved = NULL;
540  op->data.recv_message.recv_message = recv_buf_.c_buffer_ptr();
541  }
542 
543  void FinishOp(bool* status) {
544  if (!deserialize_ || hijacked_) return;
545  if (recv_buf_.Valid()) {
546  if (*status) {
547  got_message = true;
548  *status = deserialize_->Deserialize(&recv_buf_).ok();
549  recv_buf_.Release();
550  } else {
551  got_message = false;
552  recv_buf_.Clear();
553  }
554  } else {
555  got_message = false;
556  if (!allow_not_getting_message_) {
557  *status = false;
558  }
559  }
560  }
561 
563  InterceptorBatchMethodsImpl* interceptor_methods) {
564  if (!deserialize_) return;
565  interceptor_methods->SetRecvMessage(message_, &got_message);
566  }
567 
569  InterceptorBatchMethodsImpl* interceptor_methods) {
570  if (!deserialize_) return;
571  interceptor_methods->AddInterceptionHookPoint(
573  if (!got_message) interceptor_methods->SetRecvMessage(nullptr, nullptr);
574  deserialize_.reset();
575  }
576  void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
577  hijacked_ = true;
578  if (!deserialize_) return;
579  interceptor_methods->AddInterceptionHookPoint(
581  got_message = true;
582  }
583 
584  private:
585  void* message_;
586  bool hijacked_ = false;
587  std::unique_ptr<DeserializeFunc> deserialize_;
588  ByteBuffer recv_buf_;
589  bool allow_not_getting_message_;
590 };
591 
593  public:
594  CallOpClientSendClose() : send_(false) {}
595 
596  void ClientSendClose() { send_ = true; }
597 
598  protected:
599  void AddOp(grpc_op* ops, size_t* nops) {
600  if (!send_ || hijacked_) return;
601  grpc_op* op = &ops[(*nops)++];
603  op->flags = 0;
604  op->reserved = NULL;
605  }
606  void FinishOp(bool* /*status*/) { send_ = false; }
607 
609  InterceptorBatchMethodsImpl* interceptor_methods) {
610  if (!send_) return;
611  interceptor_methods->AddInterceptionHookPoint(
613  }
614 
616  InterceptorBatchMethodsImpl* /*interceptor_methods*/) {}
617 
618  void SetHijackingState(InterceptorBatchMethodsImpl* /*interceptor_methods*/) {
619  hijacked_ = true;
620  }
621 
622  private:
623  bool hijacked_ = false;
624  bool send_;
625 };
626 
628  public:
629  CallOpServerSendStatus() : send_status_available_(false) {}
630 
632  std::multimap<grpc::string, grpc::string>* trailing_metadata,
633  const Status& status) {
634  send_error_details_ = status.error_details();
635  metadata_map_ = trailing_metadata;
636  send_status_available_ = true;
637  send_status_code_ = static_cast<grpc_status_code>(status.error_code());
638  send_error_message_ = status.error_message();
639  }
640 
641  protected:
642  void AddOp(grpc_op* ops, size_t* nops) {
643  if (!send_status_available_ || hijacked_) return;
644  trailing_metadata_ = FillMetadataArray(
645  *metadata_map_, &trailing_metadata_count_, send_error_details_);
646  grpc_op* op = &ops[(*nops)++];
649  trailing_metadata_count_;
650  op->data.send_status_from_server.trailing_metadata = trailing_metadata_;
651  op->data.send_status_from_server.status = send_status_code_;
652  error_message_slice_ = SliceReferencingString(send_error_message_);
654  send_error_message_.empty() ? nullptr : &error_message_slice_;
655  op->flags = 0;
656  op->reserved = NULL;
657  }
658 
659  void FinishOp(bool* /*status*/) {
660  if (!send_status_available_ || hijacked_) return;
661  g_core_codegen_interface->gpr_free(trailing_metadata_);
662  send_status_available_ = false;
663  }
664 
666  InterceptorBatchMethodsImpl* interceptor_methods) {
667  if (!send_status_available_) return;
668  interceptor_methods->AddInterceptionHookPoint(
670  interceptor_methods->SetSendTrailingMetadata(metadata_map_);
671  interceptor_methods->SetSendStatus(&send_status_code_, &send_error_details_,
672  &send_error_message_);
673  }
674 
676  InterceptorBatchMethodsImpl* /*interceptor_methods*/) {}
677 
678  void SetHijackingState(InterceptorBatchMethodsImpl* /*interceptor_methods*/) {
679  hijacked_ = true;
680  }
681 
682  private:
683  bool hijacked_ = false;
684  bool send_status_available_;
685  grpc_status_code send_status_code_;
686  grpc::string send_error_details_;
687  grpc::string send_error_message_;
688  size_t trailing_metadata_count_;
689  std::multimap<grpc::string, grpc::string>* metadata_map_;
690  grpc_metadata* trailing_metadata_;
691  grpc_slice error_message_slice_;
692 };
693 
695  public:
696  CallOpRecvInitialMetadata() : metadata_map_(nullptr) {}
697 
699  context->initial_metadata_received_ = true;
700  metadata_map_ = &context->recv_initial_metadata_;
701  }
702 
703  protected:
704  void AddOp(grpc_op* ops, size_t* nops) {
705  if (metadata_map_ == nullptr || hijacked_) return;
706  grpc_op* op = &ops[(*nops)++];
708  op->data.recv_initial_metadata.recv_initial_metadata = metadata_map_->arr();
709  op->flags = 0;
710  op->reserved = NULL;
711  }
712 
713  void FinishOp(bool* /*status*/) {
714  if (metadata_map_ == nullptr || hijacked_) return;
715  }
716 
718  InterceptorBatchMethodsImpl* interceptor_methods) {
719  interceptor_methods->SetRecvInitialMetadata(metadata_map_);
720  }
721 
723  InterceptorBatchMethodsImpl* interceptor_methods) {
724  if (metadata_map_ == nullptr) return;
725  interceptor_methods->AddInterceptionHookPoint(
727  metadata_map_ = nullptr;
728  }
729 
730  void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
731  hijacked_ = true;
732  if (metadata_map_ == nullptr) return;
733  interceptor_methods->AddInterceptionHookPoint(
735  }
736 
737  private:
738  bool hijacked_ = false;
739  MetadataMap* metadata_map_;
740 };
741 
743  public:
745  : recv_status_(nullptr), debug_error_string_(nullptr) {}
746 
748  client_context_ = context;
749  metadata_map_ = &client_context_->trailing_metadata_;
750  recv_status_ = status;
751  error_message_ = g_core_codegen_interface->grpc_empty_slice();
752  }
753 
754  protected:
755  void AddOp(grpc_op* ops, size_t* nops) {
756  if (recv_status_ == nullptr || hijacked_) return;
757  grpc_op* op = &ops[(*nops)++];
759  op->data.recv_status_on_client.trailing_metadata = metadata_map_->arr();
760  op->data.recv_status_on_client.status = &status_code_;
761  op->data.recv_status_on_client.status_details = &error_message_;
762  op->data.recv_status_on_client.error_string = &debug_error_string_;
763  op->flags = 0;
764  op->reserved = NULL;
765  }
766 
767  void FinishOp(bool* /*status*/) {
768  if (recv_status_ == nullptr || hijacked_) return;
769  if (static_cast<StatusCode>(status_code_) == StatusCode::OK) {
770  *recv_status_ = Status();
771  GPR_CODEGEN_DEBUG_ASSERT(debug_error_string_ == nullptr);
772  } else {
773  *recv_status_ =
774  Status(static_cast<StatusCode>(status_code_),
775  GRPC_SLICE_IS_EMPTY(error_message_)
776  ? grpc::string()
777  : grpc::string(GRPC_SLICE_START_PTR(error_message_),
778  GRPC_SLICE_END_PTR(error_message_)),
779  metadata_map_->GetBinaryErrorDetails());
780  if (debug_error_string_ != nullptr) {
781  client_context_->set_debug_error_string(debug_error_string_);
782  g_core_codegen_interface->gpr_free((void*)debug_error_string_);
783  }
784  }
785  // TODO(soheil): Find callers that set debug string even for status OK,
786  // and fix them.
787  g_core_codegen_interface->grpc_slice_unref(error_message_);
788  }
789 
791  InterceptorBatchMethodsImpl* interceptor_methods) {
792  interceptor_methods->SetRecvStatus(recv_status_);
793  interceptor_methods->SetRecvTrailingMetadata(metadata_map_);
794  }
795 
797  InterceptorBatchMethodsImpl* interceptor_methods) {
798  if (recv_status_ == nullptr) return;
799  interceptor_methods->AddInterceptionHookPoint(
801  recv_status_ = nullptr;
802  }
803 
804  void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
805  hijacked_ = true;
806  if (recv_status_ == nullptr) return;
807  interceptor_methods->AddInterceptionHookPoint(
809  }
810 
811  private:
812  bool hijacked_ = false;
813  ::grpc_impl::ClientContext* client_context_;
814  MetadataMap* metadata_map_;
815  Status* recv_status_;
816  const char* debug_error_string_;
817  grpc_status_code status_code_;
818  grpc_slice error_message_;
819 };
820 
821 template <class Op1 = CallNoOp<1>, class Op2 = CallNoOp<2>,
822  class Op3 = CallNoOp<3>, class Op4 = CallNoOp<4>,
823  class Op5 = CallNoOp<5>, class Op6 = CallNoOp<6>>
824 class CallOpSet;
825 
832 template <class Op1, class Op2, class Op3, class Op4, class Op5, class Op6>
833 class CallOpSet : public CallOpSetInterface,
834  public Op1,
835  public Op2,
836  public Op3,
837  public Op4,
838  public Op5,
839  public Op6 {
840  public:
841  CallOpSet() : core_cq_tag_(this), return_tag_(this) {}
842  // The copy constructor and assignment operator reset the value of
843  // core_cq_tag_, return_tag_, done_intercepting_ and interceptor_methods_
844  // since those are only meaningful on a specific object, not across objects.
845  CallOpSet(const CallOpSet& other)
846  : core_cq_tag_(this),
847  return_tag_(this),
848  call_(other.call_),
849  done_intercepting_(false),
850  interceptor_methods_(InterceptorBatchMethodsImpl()) {}
851 
852  CallOpSet& operator=(const CallOpSet& other) {
853  core_cq_tag_ = this;
854  return_tag_ = this;
855  call_ = other.call_;
856  done_intercepting_ = false;
857  interceptor_methods_ = InterceptorBatchMethodsImpl();
858  return *this;
859  }
860 
861  void FillOps(Call* call) override {
862  done_intercepting_ = false;
863  g_core_codegen_interface->grpc_call_ref(call->call());
864  call_ =
865  *call; // It's fine to create a copy of call since it's just pointers
866 
867  if (RunInterceptors()) {
868  ContinueFillOpsAfterInterception();
869  } else {
870  // After the interceptors are run, ContinueFillOpsAfterInterception will
871  // be run
872  }
873  }
874 
875  bool FinalizeResult(void** tag, bool* status) override {
876  if (done_intercepting_) {
877  // Complete the avalanching since we are done with this batch of ops
878  call_.cq()->CompleteAvalanching();
879  // We have already finished intercepting and filling in the results. This
880  // round trip from the core needed to be made because interceptors were
881  // run
882  *tag = return_tag_;
883  *status = saved_status_;
884  g_core_codegen_interface->grpc_call_unref(call_.call());
885  return true;
886  }
887 
888  this->Op1::FinishOp(status);
889  this->Op2::FinishOp(status);
890  this->Op3::FinishOp(status);
891  this->Op4::FinishOp(status);
892  this->Op5::FinishOp(status);
893  this->Op6::FinishOp(status);
894  saved_status_ = *status;
895  if (RunInterceptorsPostRecv()) {
896  *tag = return_tag_;
897  g_core_codegen_interface->grpc_call_unref(call_.call());
898  return true;
899  }
900  // Interceptors are going to be run, so we can't return the tag just yet.
901  // After the interceptors are run, ContinueFinalizeResultAfterInterception
902  return false;
903  }
904 
905  void set_output_tag(void* return_tag) { return_tag_ = return_tag; }
906 
907  void* core_cq_tag() override { return core_cq_tag_; }
908 
913  void set_core_cq_tag(void* core_cq_tag) { core_cq_tag_ = core_cq_tag; }
914 
915  // This will be called while interceptors are run if the RPC is a hijacked
916  // RPC. This should set hijacking state for each of the ops.
917  void SetHijackingState() override {
918  this->Op1::SetHijackingState(&interceptor_methods_);
919  this->Op2::SetHijackingState(&interceptor_methods_);
920  this->Op3::SetHijackingState(&interceptor_methods_);
921  this->Op4::SetHijackingState(&interceptor_methods_);
922  this->Op5::SetHijackingState(&interceptor_methods_);
923  this->Op6::SetHijackingState(&interceptor_methods_);
924  }
925 
926  // Should be called after interceptors are done running
928  static const size_t MAX_OPS = 6;
929  grpc_op ops[MAX_OPS];
930  size_t nops = 0;
931  this->Op1::AddOp(ops, &nops);
932  this->Op2::AddOp(ops, &nops);
933  this->Op3::AddOp(ops, &nops);
934  this->Op4::AddOp(ops, &nops);
935  this->Op5::AddOp(ops, &nops);
936  this->Op6::AddOp(ops, &nops);
937 
938  grpc_call_error err = g_core_codegen_interface->grpc_call_start_batch(
939  call_.call(), ops, nops, core_cq_tag(), nullptr);
940 
941  if (err != GRPC_CALL_OK) {
942  // A failure here indicates an API misuse; for example, doing a Write
943  // while another Write is already pending on the same RPC or invoking
944  // WritesDone multiple times
945  // gpr_log(GPR_ERROR, "API misuse of type %s observed",
946  // g_core_codegen_interface->grpc_call_error_to_string(err));
947  GPR_CODEGEN_ASSERT(false);
948  }
949  }
950 
951  // Should be called after interceptors are done running on the finalize result
952  // path
954  done_intercepting_ = true;
955  // The following call_start_batch is internally-generated so no need for an
956  // explanatory log on failure.
957  GPR_CODEGEN_ASSERT(g_core_codegen_interface->grpc_call_start_batch(
958  call_.call(), nullptr, 0, core_cq_tag(), nullptr) ==
959  GRPC_CALL_OK);
960  }
961 
962  private:
963  // Returns true if no interceptors need to be run
964  bool RunInterceptors() {
965  interceptor_methods_.ClearState();
966  interceptor_methods_.SetCallOpSetInterface(this);
967  interceptor_methods_.SetCall(&call_);
968  this->Op1::SetInterceptionHookPoint(&interceptor_methods_);
969  this->Op2::SetInterceptionHookPoint(&interceptor_methods_);
970  this->Op3::SetInterceptionHookPoint(&interceptor_methods_);
971  this->Op4::SetInterceptionHookPoint(&interceptor_methods_);
972  this->Op5::SetInterceptionHookPoint(&interceptor_methods_);
973  this->Op6::SetInterceptionHookPoint(&interceptor_methods_);
974  if (interceptor_methods_.InterceptorsListEmpty()) {
975  return true;
976  }
977  // This call will go through interceptors and would need to
978  // schedule new batches, so delay completion queue shutdown
979  call_.cq()->RegisterAvalanching();
980  return interceptor_methods_.RunInterceptors();
981  }
982  // Returns true if no interceptors need to be run
983  bool RunInterceptorsPostRecv() {
984  // Call and OpSet had already been set on the set state.
985  // SetReverse also clears previously set hook points
986  interceptor_methods_.SetReverse();
987  this->Op1::SetFinishInterceptionHookPoint(&interceptor_methods_);
988  this->Op2::SetFinishInterceptionHookPoint(&interceptor_methods_);
989  this->Op3::SetFinishInterceptionHookPoint(&interceptor_methods_);
990  this->Op4::SetFinishInterceptionHookPoint(&interceptor_methods_);
991  this->Op5::SetFinishInterceptionHookPoint(&interceptor_methods_);
992  this->Op6::SetFinishInterceptionHookPoint(&interceptor_methods_);
993  return interceptor_methods_.RunInterceptors();
994  }
995 
996  void* core_cq_tag_;
997  void* return_tag_;
998  Call call_;
999  bool done_intercepting_ = false;
1000  InterceptorBatchMethodsImpl interceptor_methods_;
1001  bool saved_status_;
1002 };
1003 
1004 } // namespace internal
1005 } // namespace grpc
1006 
1007 #endif // GRPCPP_IMPL_CODEGEN_CALL_OP_SET_H
void ContinueFillOpsAfterInterception() override
Definition: call_op_set.h:927
everything went ok
Definition: grpc_types.h:409
grpc_op_type op
Operation type, as defined by grpc_op_type.
Definition: grpc_types.h:592
grpc_metadata_array * recv_initial_metadata
Definition: grpc_types.h:635
union grpc_op::grpc_op_data data
bool get_no_compression() const
Get value for the flag indicating whether compression for the next message write is forcefully disabl...
Definition: call_op_set.h:114
void * reserved
Reserved for future usage.
Definition: grpc_types.h:596
WriteOptions & clear_buffer_hint()
Clears flag indicating that the write may be buffered and need not go out on the wire immediately...
Definition: call_op_set.h:131
void SetFinishInterceptionHookPoint(InterceptorBatchMethodsImpl *)
Definition: call_op_set.h:675
void ClientRecvStatus(::grpc_impl::ClientContext *context, Status *status)
Definition: call_op_set.h:747
grpc_status_code
Definition: status.h:26
void FinishOp(bool *)
Definition: call_op_set.h:713
WriteOptions & set_buffer_hint()
Sets flag indicating that the write may be buffered and need not go out on the wire immediately...
Definition: call_op_set.h:122
#define GPR_CODEGEN_ASSERT(x)
Codegen specific version of GPR_ASSERT.
Definition: core_codegen_interface.h:146
void SetHijackingState() override
Definition: call_op_set.h:917
void AddOp(grpc_op *ops, size_t *nops)
Definition: call_op_set.h:704
void ClientSendClose()
Definition: call_op_set.h:596
void RecvInitialMetadata(::grpc_impl::ClientContext *context)
Definition: call_op_set.h:698
void SetHijackingState(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:804
CallOpRecvMessage()
Definition: call_op_set.h:424
std::string string
Definition: config.h:35
void SetHijackingState(InterceptorBatchMethodsImpl *)
Definition: call_op_set.h:270
bool get_buffer_hint() const
Get value for the flag indicating that the write may be buffered and need not go out on the wire imme...
Definition: call_op_set.h:140
WriteOptions & clear_no_compression()
Clears flag for the disabling of compression for the next message write.
Definition: call_op_set.h:105
struct grpc_byte_buffer ** recv_message
Definition: grpc_types.h:643
struct grpc_byte_buffer * send_message
This op takes ownership of the slices in send_message.
Definition: grpc_types.h:618
void FinishOp(bool *)
Definition: call_op_set.h:207
Send a close from the client: one and only one instance MUST be sent from the client, unless the call was cancelled - in which case this can be skipped.
Definition: grpc_types.h:557
CallOpSet(const CallOpSet &other)
Definition: call_op_set.h:845
Send status from the server: one and only one instance MUST be sent from the server unless the call w...
Definition: grpc_types.h:562
void AddOp(grpc_op *ops, size_t *nops)
Definition: call_op_set.h:312
void AllowNoMessage()
Definition: call_op_set.h:529
void SetHijackingState(InterceptorBatchMethodsImpl *)
Definition: call_op_set.h:618
Definition: metadata_map.h:33
WriteOptions & set_last_message()
last-message bit: indicates this is the last message in a stream client-side: makes Write the equival...
Definition: call_op_set.h:161
#define GRPC_WRITE_NO_COMPRESS
Force compression to be disabled for a particular write (start_write/add_metadata).
Definition: grpc_types.h:456
grpc_slice key
the key, value values are expected to line up with grpc_mdelem: if changing them, update metadata...
Definition: grpc_types.h:488
Status Deserialize(ByteBuffer *buf) override
Definition: call_op_set.h:504
CallOpRecvInitialMetadata()
Definition: call_op_set.h:696
void SetRecvMessage(void *message, bool *got_message)
Definition: interceptor_common.h:169
struct grpc_op::grpc_op_data::grpc_op_recv_message recv_message
void SetRecvStatus(Status *status)
Definition: interceptor_common.h:178
Primary implementation of CallOpSetInterface.
Definition: call_op_set.h:824
#define GRPC_SLICE_IS_EMPTY(slice)
Definition: slice.h:107
grpc_slice value
Definition: grpc_types.h:489
#define GRPC_WRITE_THROUGH
Force this message to be written to the socket before completing it.
Definition: grpc_types.h:458
const char ** error_string
If this is not nullptr, it will be populated with the full fidelity error string for debugging purpos...
Definition: grpc_types.h:657
grpc_slice * status_details
Definition: grpc_types.h:653
void Clear()
Clear all flags.
Definition: call_op_set.h:89
WriteOptions & set_write_through()
Guarantee that all bytes have been written to the socket before completing this write (usually writes...
Definition: call_op_set.h:175
void SetInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:562
A grpc_slice s, if initialized, represents the byte range s.bytes[0..s.length-1]. ...
Definition: slice.h:60
bool send_
Definition: call_op_set.h:275
#define GRPC_WRITE_BUFFER_HINT
Write Flags:
Definition: grpc_types.h:453
Send a message: 0 or more of these operations can occur for each call.
Definition: grpc_types.h:552
bool got_message
Definition: call_op_set.h:434
Not an error; returned on success.
Definition: status_code_enum.h:26
The first three in this list are for clients and servers.
bool is_write_through() const
Definition: call_op_set.h:180
void SetFinishInterceptionHookPoint(InterceptorBatchMethodsImpl *)
Definition: call_op_set.h:615
WriteOptions()
Definition: call_op_set.h:81
WriteOptions & clear_last_message()
Clears flag indicating that this is the last message in a stream, disabling coalescing.
Definition: call_op_set.h:168
grpc_slice SliceReferencingString(const grpc::string &str)
Definition: slice.h:131
struct grpc_op::grpc_op_data::grpc_op_send_initial_metadata::grpc_op_send_initial_metadata_maybe_compression_level maybe_compression_level
grpc_compression_level
Compression levels allow a party with knowledge of its peer&#39;s accepted encodings to request compressi...
Definition: compression_types.h:71
grpc_call * call() const
Definition: call.h:72
#define GRPC_SLICE_START_PTR(slice)
Definition: slice.h:96
void ContinueFinalizeResultAfterInterception() override
Definition: call_op_set.h:953
bool is_set
Definition: call_op_set.h:281
grpc_metadata_array * trailing_metadata
ownership of the array is with the caller, but ownership of the elements stays with the call object (...
Definition: grpc_types.h:651
WriteOptions & clear_corked()
Definition: call_op_set.h:149
WriteOptions & set_no_compression()
Sets flag for the disabling of compression for the next message write.
Definition: call_op_set.h:97
#define GRPC_SLICE_END_PTR(slice)
Definition: slice.h:105
grpc_call_error
Result of a grpc call.
Definition: grpc_types.h:407
CallOpClientSendClose()
Definition: call_op_set.h:594
void AddOp(grpc_op *ops, size_t *nops)
Definition: call_op_set.h:755
Definition: byte_buffer.h:62
::google::protobuf::util::Status Status
Definition: config_protobuf.h:90
uint32_t flags_
Definition: call_op_set.h:276
Default argument for CallOpSet.
Definition: call_op_set.h:204
void FinishOp(bool *)
Definition: call_op_set.h:767
void SetFinishInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:568
void SetHijackingState(InterceptorBatchMethodsImpl *)
Definition: call_op_set.h:678
grpc_compression_level level
Definition: call_op_set.h:282
bool got_message
Definition: call_op_set.h:531
grpc::string error_message() const
Return the instance&#39;s error message.
Definition: status.h:112
Definition: call_op_set.h:494
Defines how to serialize and deserialize some type.
Definition: serialization_traits.h:58
The following three are for hijacked clients only.
StatusCode error_code() const
Return the instance&#39;s error code.
Definition: status.h:110
CallOpServerSendStatus()
Definition: call_op_set.h:629
bool FinalizeResult(void **tag, bool *status) override
FinalizeResult must be called before informing user code that the operation bound to the underlying c...
Definition: call_op_set.h:875
Definition: call_op_set.h:627
void set_compression_level(grpc_compression_level level)
Definition: call_op_set.h:230
Definition: call_op_set.h:216
bool is_corked() const
Definition: call_op_set.h:154
void AddOp(grpc_op *ops, size_t *nops)
Definition: call_op_set.h:534
void SetHijackingState(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:730
Definition: call_op_set.h:694
Status SendMessage(const M &message, WriteOptions options) GRPC_MUST_USE_RESULT
Send message using options for the write.
Definition: call_op_set.h:378
std::multimap< grpc::string, grpc::string > * metadata_map_
Definition: call_op_set.h:278
Status SendMessagePtr(const M *message, WriteOptions options) GRPC_MUST_USE_RESULT
Send message using options for the write.
Definition: call_op_set.h:409
uint32_t flags() const
Returns raw flags bitset.
Definition: call_op_set.h:92
void SetHijackingState(InterceptorBatchMethodsImpl *)
Definition: call_op_set.h:212
grpc_metadata * FillMetadataArray(const std::multimap< grpc::string, grpc::string > &metadata, size_t *metadata_count, const grpc::string &optional_error_details)
Definition: call_op_set.h:53
struct grpc_op::grpc_op_data::grpc_op_recv_initial_metadata recv_initial_metadata
A single metadata element.
Definition: grpc_types.h:485
Definition: call_op_set.h:286
struct grpc_op::grpc_op_data::grpc_op_send_initial_metadata send_initial_metadata
void AddOp(grpc_op *ops, size_t *nops)
Definition: call_op_set.h:236
void FinishOp(bool *status)
Definition: call_op_set.h:543
Operation data: one field for each op type (except SEND_CLOSE_FROM_CLIENT which has no arguments) ...
Definition: grpc_types.h:590
void SetInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:608
This header provides an object that reads bytes directly from a grpc::ByteBuffer, via the ZeroCopyInp...
Definition: alarm.h:24
Receive initial metadata: one and only one MUST be made on the client, must not be made on the server...
Definition: grpc_types.h:567
void RecvMessage(R *message)
Definition: call_op_set.h:520
void FillOps(Call *call) override
Fills in grpc_op, starting from ops[*nops] and moving upwards.
Definition: call_op_set.h:861
void SetFinishInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:350
struct grpc_op::grpc_op_data::grpc_op_send_message send_message
void SendInitialMetadata(std::multimap< grpc::string, grpc::string > *metadata, uint32_t flags)
Definition: call_op_set.h:222
void SetFinishInterceptionHookPoint(InterceptorBatchMethodsImpl *)
Definition: call_op_set.h:267
CoreCodegenInterface * g_core_codegen_interface
Definition: completion_queue_impl.h:90
void AddOp(grpc_op *ops, size_t *nops)
Definition: call_op_set.h:642
void SetFinishInterceptionHookPoint(InterceptorBatchMethodsImpl *)
Definition: call_op_set.h:210
void * core_cq_tag() override
Get the tag to be used at the core completion queue.
Definition: call_op_set.h:907
#define GRPC_MUST_USE_RESULT
Definition: port_platform.h:570
Send initial metadata: one and only one instance MUST be sent for each call, unless the call was canc...
Definition: grpc_types.h:548
WriteOptions(const WriteOptions &other)
Definition: call_op_set.h:82
void AddOp(grpc_op *ops, size_t *nops)
Definition: call_op_set.h:599
CallOpClientRecvStatus()
Definition: call_op_set.h:744
Definition: interceptor_common.h:36
Definition: byte_buffer.h:58
void ServerSendStatus(std::multimap< grpc::string, grpc::string > *trailing_metadata, const Status &status)
Definition: call_op_set.h:631
Per-message write options.
Definition: call_op_set.h:79
grpc_slice * status_details
optional: set to NULL if no details need sending, non-NULL if they do pointer will not be retained pa...
Definition: grpc_types.h:627
CallOpSet & operator=(const CallOpSet &other)
Definition: call_op_set.h:852
void AllowNoMessage()
Definition: call_op_set.h:432
size_t trailing_metadata_count
Definition: grpc_types.h:621
CallOpSendMessage()
Definition: call_op_set.h:288
grpc_metadata * metadata
Definition: grpc_types.h:604
void SetInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:341
void SetHijackingState(InterceptorBatchMethodsImpl *)
Definition: call_op_set.h:364
void FinishOp(bool *status)
Definition: call_op_set.h:446
Definition: call_op_set.h:592
void SetFinishInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:472
void FinishOp(bool *)
Definition: call_op_set.h:606
void AddOp(grpc_op *, size_t *)
Definition: call_op_set.h:206
An abstract collection of call ops, used to generate the grpc_call_op structure to pass down to the l...
Definition: call_op_set_interface.h:34
void SetSendStatus(grpc_status_code *code, grpc::string *error_details, grpc::string *error_message)
Definition: interceptor_common.h:157
void FinishOp(bool *)
Definition: call_op_set.h:659
void SetSendTrailingMetadata(std::multimap< grpc::string, grpc::string > *metadata)
Definition: interceptor_common.h:164
virtual ~DeserializeFunc()
Definition: call_op_set.h:497
void SetRecvInitialMetadata(MetadataMap *map)
Definition: interceptor_common.h:174
void RecvMessage(R *message)
Definition: call_op_set.h:429
struct grpc_op::grpc_op_data::grpc_op_send_status_from_server send_status_from_server
void set_output_tag(void *return_tag)
Definition: call_op_set.h:905
CallOpSendInitialMetadata()
Definition: call_op_set.h:218
void SetSendInitialMetadata(std::multimap< grpc::string, grpc::string > *metadata)
Definition: interceptor_common.h:152
WriteOptions & set_corked()
corked bit: aliases set_buffer_hint currently, with the intent that set_buffer_hint will be removed i...
Definition: call_op_set.h:144
A ClientContext allows the person implementing a service client to:
Definition: client_context_impl.h:184
Did it work? If it didn&#39;t, why?
Definition: status.h:31
void FinishOp(bool *status)
Definition: call_op_set.h:330
Receive status on the client: one and only one must be made on the client.
Definition: grpc_types.h:577
grpc_metadata * initial_metadata_
Definition: call_op_set.h:279
void AddInterceptionHookPoint(experimental::InterceptionHookPoints type)
Definition: interceptor_common.h:78
void SetHijackingState(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:576
void SetFinishInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:722
DeserializeFuncType(R *message)
Definition: call_op_set.h:503
size_t initial_metadata_count_
Definition: call_op_set.h:277
void SetFinishInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:796
void SetRecvTrailingMetadata(MetadataMap *map)
Definition: interceptor_common.h:180
grpc_status_code status
Definition: grpc_types.h:623
grpc_status_code * status
Definition: grpc_types.h:652
size_t count
Definition: grpc_types.h:603
Definition: call_op_set.h:514
uint32_t flags
Write flags bitset for grpc_begin_messages.
Definition: grpc_types.h:594
bool is_last_message() const
Get value for the flag indicating that this is the last message, and should be coalesced with trailin...
Definition: call_op_set.h:186
void SetInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:717
void SetInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:790
Definition: call_op_set.h:742
void FinishOp(bool *)
Definition: call_op_set.h:253
void SetInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:665
void SetSendMessage(ByteBuffer *buf, const void **msg, bool *fail_send_message, std::function< Status(const void *)> serializer)
Definition: interceptor_common.h:143
void SetInterceptionHookPoint(InterceptorBatchMethodsImpl *)
Definition: call_op_set.h:208
void SetHijackingState(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:479
Receive a message: 0 or more of these operations can occur for each call.
Definition: grpc_types.h:571
A sequence of bytes.
Definition: byte_buffer.h:67
CallOpGenericRecvMessage()
Definition: call_op_set.h:516
void AddOp(grpc_op *ops, size_t *nops)
Definition: call_op_set.h:437
#define GPR_CODEGEN_DEBUG_ASSERT(x)
Codegen specific version of GPR_DEBUG_ASSERT.
Definition: core_codegen_interface.h:155
void SetInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:466
CallOpSet()
Definition: call_op_set.h:841
const char kBinaryErrorDetailsKey[]
Definition: metadata_map.h:31
The following two are for all clients and servers.
void SetInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:259
void set_core_cq_tag(void *core_cq_tag)
set_core_cq_tag is used to provide a different core CQ tag than "this".
Definition: call_op_set.h:913
grpc::string error_details() const
Return the (binary) error details.
Definition: status.h:115
Straightforward wrapping of the C call object.
Definition: call.h:38
grpc_metadata * trailing_metadata
Definition: grpc_types.h:622
struct grpc_op::grpc_op_data::grpc_op_recv_status_on_client recv_status_on_client
~DeserializeFuncType() override
Definition: call_op_set.h:508