GRPC C++  1.26.0
completion_queue_impl.h
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2015-2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
32 #ifndef GRPCPP_IMPL_CODEGEN_COMPLETION_QUEUE_IMPL_H
33 #define GRPCPP_IMPL_CODEGEN_COMPLETION_QUEUE_IMPL_H
34 
35 #include <grpc/impl/codegen/atm.h>
41 
43 
44 namespace grpc_impl {
45 
46 class Channel;
47 class Server;
48 class ServerBuilder;
49 template <class R>
50 class ClientReader;
51 template <class W>
52 class ClientWriter;
53 template <class W, class R>
54 class ClientReaderWriter;
55 template <class R>
57 template <class W>
59 class ServerContextBase;
60 namespace internal {
61 template <class W, class R>
63 
64 template <class ServiceType, class RequestType, class ResponseType>
65 class RpcMethodHandler;
66 template <class ServiceType, class RequestType, class ResponseType>
68 template <class ServiceType, class RequestType, class ResponseType>
70 template <class Streamer, bool WriteNeeded>
72 template <::grpc::StatusCode code>
73 class ErrorMethodHandler;
74 } // namespace internal
75 } // namespace grpc_impl
76 namespace grpc {
77 
78 class ChannelInterface;
79 class ServerInterface;
80 
81 namespace internal {
82 class CompletionQueueTag;
83 class RpcMethod;
84 template <class InputMessage, class OutputMessage>
85 class BlockingUnaryCallImpl;
86 template <class Op1, class Op2, class Op3, class Op4, class Op5, class Op6>
87 class CallOpSet;
88 } // namespace internal
89 
91 
92 } // namespace grpc
93 
94 namespace grpc_impl {
95 
101  public:
107  nullptr}) {}
108 
112  explicit CompletionQueue(grpc_completion_queue* take);
113 
117  }
118 
120  enum NextStatus {
122  GOT_EVENT,
123  TIMEOUT
125  };
126 
175  bool Next(void** tag, bool* ok) {
176  return (AsyncNextInternal(tag, ok,
178  GPR_CLOCK_REALTIME)) != SHUTDOWN);
179  }
180 
// Reads from the queue, blocking up to `deadline` (or the queue's shutdown).
// `deadline` is wrapped in a ::grpc::TimePoint<T>, whose raw_time() value is
// forwarded to AsyncNextInternal() together with the caller's `tag` and `ok`
// out-parameters. The NextStatus from AsyncNextInternal() is returned as-is.
192  template <typename T>
193  NextStatus AsyncNext(void** tag, bool* ok, const T& deadline) {
194  ::grpc::TimePoint<T> deadline_tp(deadline);
195  return AsyncNextInternal(tag, ok, deadline_tp.raw_time());
196  }
197 
// EXPERIMENTAL: first executes `f`, then reads from the queue, blocking up to
// `deadline`. Returns GOT_EVENT directly when the local cache object yields an
// event after `f` has run; otherwise returns whatever AsyncNext() reports.
212  template <typename T, typename F>
213  NextStatus DoThenAsyncNext(F&& f, void** tag, bool* ok, const T& deadline) {
// Cache object bound to this queue; it lives until the end of this call.
// NOTE(review): presumably a thread-local completion cache, judging by the
// class name -- confirm against the out-of-line implementation.
214  CompletionQueueTLSCache cache = CompletionQueueTLSCache(this);
215  f();
// A true result from Flush() is reported as an event without calling
// AsyncNext().
216  if (cache.Flush(tag, ok)) {
217  return GOT_EVENT;
218  } else {
219  return AsyncNext(tag, ok, deadline);
220  }
221  }
222 
233  void Shutdown();
234 
// Returns a raw pointer to the underlying grpc_completion_queue instance
// (the cq_ member, which is marked as owned where it is declared).
240  grpc_completion_queue* cq() { return cq_; }
241 
242  protected:
247  &attributes),
248  &attributes, NULL);
249  InitialAvalanching(); // reserve this for the future shutdown
250  }
251 
252  private:
253  // Friend synchronous wrappers so that they can access Pluck(), which is
254  // a semi-private API geared towards the synchronous implementation.
255  template <class R>
257  template <class W>
259  template <class W, class R>
261  template <class R>
263  template <class W>
265  template <class W, class R>
266  friend class ::grpc_impl::internal::ServerReaderWriterBody;
267  template <class ServiceType, class RequestType, class ResponseType>
269  template <class ServiceType, class RequestType, class ResponseType>
271  template <class ServiceType, class RequestType, class ResponseType>
273  template <class Streamer, bool WriteNeeded>
275  template <::grpc::StatusCode code>
279  friend class ::grpc::ServerInterface;
280  template <class InputMessage, class OutputMessage>
281  friend class ::grpc::internal::BlockingUnaryCallImpl;
282 
283  // Friends that need access to constructor for callback CQ
285 
286  // For access to Register/CompleteAvalanching
287  template <class Op1, class Op2, class Op3, class Op4, class Op5, class Op6>
288  friend class ::grpc::internal::CallOpSet;
289 
// Helper used by DoThenAsyncNext(): constructed with the owning queue before
// the user functor runs, then asked to Flush() afterwards. Only declarations
// are visible here; the member functions are defined elsewhere.
// NOTE(review): the name suggests a thread-local cache of completions --
// confirm against the implementation.
294  class CompletionQueueTLSCache {
295  public:
296  CompletionQueueTLSCache(CompletionQueue* cq);
297  ~CompletionQueueTLSCache();
// Writes through `tag` and `ok`; DoThenAsyncNext() treats a true return as
// "an event is available".
298  bool Flush(void** tag, bool* ok);
299 
300  private:
// The queue passed to the constructor (presumably; TODO confirm).
301  CompletionQueue* cq_;
// NOTE(review): likely records whether Flush() has already run -- verify.
302  bool flushed_;
303  };
304 
305  NextStatus AsyncNextInternal(void** tag, bool* ok, gpr_timespec deadline);
306 
309  bool Pluck(::grpc::internal::CompletionQueueTag* tag) {
310  auto deadline =
312  while (true) {
314  cq_, tag, deadline, nullptr);
315  bool ok = ev.success != 0;
316  void* ignored = tag;
317  if (tag->FinalizeResult(&ignored, &ok)) {
318  GPR_CODEGEN_ASSERT(ignored == tag);
319  return ok;
320  }
321  }
322  }
323 
332  void TryPluck(::grpc::internal::CompletionQueueTag* tag) {
333  auto deadline =
336  cq_, tag, deadline, nullptr);
337  if (ev.type == GRPC_QUEUE_TIMEOUT) return;
338  bool ok = ev.success != 0;
339  void* ignored = tag;
340  // the tag must be swallowed if using TryPluck
341  GPR_CODEGEN_ASSERT(!tag->FinalizeResult(&ignored, &ok));
342  }
343 
349  void TryPluck(::grpc::internal::CompletionQueueTag* tag,
350  gpr_timespec deadline) {
352  cq_, tag, deadline, nullptr);
353  if (ev.type == GRPC_QUEUE_TIMEOUT || ev.type == GRPC_QUEUE_SHUTDOWN) {
354  return;
355  }
356 
357  bool ok = ev.success != 0;
358  void* ignored = tag;
359  GPR_CODEGEN_ASSERT(!tag->FinalizeResult(&ignored, &ok));
360  }
361 
// Sets avalanches_in_flight_ to 1 using a release store. The constructor calls
// this to "reserve" one count for the future shutdown.
368  void InitialAvalanching() {
369  gpr_atm_rel_store(&avalanches_in_flight_, static_cast<gpr_atm>(1));
370  }
// Atomically adds 1 to avalanches_in_flight_ using the no-barrier fetch-add;
// the previous value is discarded.
371  void RegisterAvalanching() {
372  gpr_atm_no_barrier_fetch_add(&avalanches_in_flight_,
373  static_cast<gpr_atm>(1));
374  }
375  void CompleteAvalanching() {
376  if (gpr_atm_no_barrier_fetch_add(&avalanches_in_flight_,
377  static_cast<gpr_atm>(-1)) == 1) {
379  }
380  }
381 
382  grpc_completion_queue* cq_; // owned
383 
384  gpr_atm avalanches_in_flight_;
385 };
386 
390  public:
// Returns true unless this queue's polling_type_ is GRPC_CQ_NON_LISTENING.
391  bool IsFrequentlyPolled() { return polling_type_ != GRPC_CQ_NON_LISTENING; }
392 
393  protected:
396 
397  private:
405  grpc_cq_polling_type polling_type,
408  GRPC_CQ_CURRENT_VERSION, completion_type, polling_type,
409  shutdown_cb}),
410  polling_type_(polling_type) {}
411 
412  grpc_cq_polling_type polling_type_;
415 };
416 
417 } // namespace grpc_impl
418 
419 #endif // GRPCPP_IMPL_CODEGEN_COMPLETION_QUEUE_IMPL_H
::grpc_impl::internal::ErrorMethodHandler< code > ErrorMethodHandler
Definition: method_handler.h:62
~CompletionQueue()
Destructor. Destroys the owned wrapped completion queue / instance.
Definition: completion_queue_impl.h:115
NextStatus DoThenAsyncNext(F &&f, void **tag, bool *ok, const T &deadline)
EXPERIMENTAL First executes F, then reads from the queue, blocking up to deadline (or the queue's shu...
Definition: completion_queue_impl.h:213
#define GPR_CODEGEN_ASSERT(x)
Codegen specific version of GPR_ASSERT.
Definition: core_codegen_interface.h:146
virtual void grpc_completion_queue_shutdown(grpc_completion_queue *cq)=0
::grpc_impl::ServerContextBase ServerContextBase
Definition: server_context.h:30
::grpc_impl::internal::ClientStreamingHandler< ServiceType, RequestType, ResponseType > ClientStreamingHandler
Definition: method_handler.h:41
virtual gpr_timespec gpr_inf_future(gpr_clock_type type)=0
bool Next(void **tag, bool *ok)
Read from the queue, blocking until an event is available or the queue is shutting down...
Definition: completion_queue_impl.h:175
grpc_completion_queue * cq()
Returns a raw pointer to the underlying grpc_completion_queue instance.
Definition: completion_queue_impl.h:240
bool IsFrequentlyPolled()
Definition: completion_queue_impl.h:391
An interface allowing implementors to process and filter event tags.
Definition: completion_queue_tag.h:26
::grpc_impl::internal::TemplatedBidiStreamingHandler< Streamer, WriteNeeded > TemplatedBidiStreamingHandler
Definition: method_handler.h:50
::grpc_impl::Server Server
Definition: server.h:26
virtual bool FinalizeResult(void **tag, bool *status)=0
FinalizeResult must be called before informing user code that the operation bound to the underlying c...
No event before timeout.
Definition: grpc_types.h:506
ServerCompletionQueue()
Default constructor.
Definition: completion_queue_impl.h:395
GPRAPI gpr_timespec gpr_inf_future(gpr_clock_type type)
The zero time interval.
int success
If the grpc_completion_type is GRPC_OP_COMPLETE, this field indicates whether the operation was succe...
Definition: grpc_types.h:522
CompletionQueue(const grpc_completion_queue_attributes &attributes)
Private constructor of CompletionQueue only visible to friend classes.
Definition: completion_queue_impl.h:244
The completion queue has been shutdown and fully-drained.
Definition: completion_queue_impl.h:121
#define GRPC_CQ_CURRENT_VERSION
Definition: grpc_types.h:740
Definition: completion_queue_impl.h:62
Events are popped out by calling grpc_completion_queue_next() API ONLY.
Definition: grpc_types.h:708
gpr_timespec raw_time()
Definition: time.h:43
virtual void grpc_completion_queue_destroy(grpc_completion_queue *cq)=0
Classes that require gRPC to be initialized should inherit from this class.
Definition: grpc_library.h:38
If you are trying to use CompletionQueue::AsyncNext with a time class that isn't either gpr_timespec ...
Definition: time.h:40
Definition: grpc_types.h:742
::grpc_impl::ServerCompletionQueue ServerCompletionQueue
Definition: completion_queue.h:27
General method handler class for errors that prevent real method use e.g., handle unknown method by r...
Definition: byte_buffer.h:44
grpc_cq_completion_type
Specifies the type of APIs to use to pop events from the completion queue.
Definition: grpc_types.h:706
grpc_cq_polling_type
Completion queues internally MAY maintain a set of file descriptors in a structure called 'pollset'...
Definition: grpc_types.h:688
NextStatus
Tri-state return for AsyncNext: SHUTDOWN, GOT_EVENT, TIMEOUT.
Definition: completion_queue_impl.h:120
Synchronous (blocking) server-side API for doing client-streaming RPCs, where the incoming message st...
Definition: completion_queue_impl.h:56
Base class of ServerContext. Experimental until callback API is final.
Definition: server_context_impl.h:117
Shutting down.
Definition: grpc_types.h:504
::grpc_impl::Channel Channel
Definition: channel.h:26
::grpc_impl::ServerReader< R > ServerReader
Definition: sync_stream.h:75
#define gpr_atm_rel_store(p, value)
Definition: atm_gcc_atomic.h:52
::grpc_impl::CompletionQueue CompletionQueue
Definition: completion_queue.h:26
Synchronous (blocking) server-side API for doing server-streaming RPCs, where the outgoing message stream coming from the server has messages of type W.
Definition: completion_queue_impl.h:58
::grpc_impl::ClientReaderWriter< W, R > ClientReaderWriter
Definition: sync_stream.h:69
This header provides an object that reads bytes directly from a grpc::ByteBuffer, via the ZeroCopyInp...
Definition: alarm.h:24
virtual grpc_completion_queue * grpc_completion_queue_create(const grpc_completion_queue_factory *factory, const grpc_completion_queue_attributes *attributes, void *reserved)=0
#define gpr_atm_no_barrier_fetch_add(p, delta)
Definition: atm_gcc_atomic.h:57
::grpc_impl::internal::RpcMethodHandler< ServiceType, RequestType, ResponseType > RpcMethodHandler
Definition: method_handler.h:36
GRPCAPI const grpc_completion_queue_factory * grpc_completion_queue_factory_lookup(const grpc_completion_queue_attributes *attributes)
Returns the completion queue factory based on the attributes.
CoreCodegenInterface * g_core_codegen_interface
Definition: completion_queue_impl.h:90
The completion queue will have an associated pollset and there is no restriction on the type of file ...
Definition: grpc_types.h:691
intptr_t gpr_atm
Definition: atm_gcc_atomic.h:30
Similar to GRPC_CQ_DEFAULT_POLLING except that the completion queues will not contain any 'listening ...
Definition: grpc_types.h:696
::grpc_impl::ClientWriter< W > ClientWriter
Definition: sync_stream.h:62
::grpc_impl::internal::ServerStreamingHandler< ServiceType, RequestType, ResponseType > ServerStreamingHandler
Definition: method_handler.h:46
Realtime clock.
Definition: gpr_types.h:36
An Alarm posts the user-provided tag to its associated completion queue or invokes the user-provided ...
Definition: alarm_impl.h:33
A wrapper class of an application provided server streaming handler.
Definition: byte_buffer.h:42
::grpc_impl::ServerBuilder ServerBuilder
Definition: server_builder.h:26
A wrapper class of an application provided bidi-streaming handler.
Definition: completion_queue_impl.h:71
A wrapper class of an application provided rpc method handler.
Definition: byte_buffer.h:40
Interface between the codegen library and the minimal subset of core features required by the generat...
Definition: core_codegen_interface.h:38
A thin wrapper around grpc_completion_queue (see src/core/lib/surface/completion_queue.h).
Definition: completion_queue_impl.h:100
Analogous to struct timespec.
Definition: gpr_types.h:47
::grpc_impl::ServerWriter< W > ServerWriter
Definition: sync_stream.h:81
CompletionQueue()
Default constructor.
Definition: completion_queue_impl.h:104
A wrapper class of an application provided client streaming handler.
Definition: completion_queue_impl.h:67
A specific type of completion queue used by the processing of notifications by servers.
Definition: completion_queue_impl.h:389
struct grpc_completion_queue grpc_completion_queue
Completion Queues enable notification of the completion of asynchronous actions.
Definition: grpc_types.h:56
virtual gpr_timespec gpr_time_0(gpr_clock_type type)=0
EXPERIMENTAL: Specifies an interface class to be used as a tag for callback-based completion queues...
Definition: grpc_types.h:722
NextStatus AsyncNext(void **tag, bool *ok, const T &deadline)
Read from the queue, blocking up to deadline (or the queue's shutdown).
Definition: completion_queue_impl.h:193
::grpc_impl::ClientReader< R > ClientReader
Definition: sync_stream.h:56
virtual grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cq, void *tag, gpr_timespec deadline, void *reserved)=0