GRPC C++  1.62.0
completion_queue.h
Go to the documentation of this file.
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
32 #ifndef GRPCPP_COMPLETION_QUEUE_H
33 #define GRPCPP_COMPLETION_QUEUE_H
34 
35 #include <list>
36 
37 #include <grpc/grpc.h>
38 #include <grpc/support/atm.h>
39 #include <grpc/support/log.h>
40 #include <grpc/support/time.h>
47 #include <grpcpp/impl/sync.h>
48 
50 
51 namespace grpc {
52 template <class R>
53 class ClientReader;
54 template <class W>
55 class ClientWriter;
56 template <class W, class R>
57 class ClientReaderWriter;
58 template <class R>
60 template <class W>
62 namespace internal {
63 template <class W, class R>
65 
66 template <class ResponseType>
69  grpc::Status&);
70 template <class ServiceType, class RequestType, class ResponseType,
71  class BaseRequestType, class BaseResponseType>
73 template <class ServiceType, class RequestType, class ResponseType>
75 template <class ServiceType, class RequestType, class ResponseType>
77 template <class Streamer, bool WriteNeeded>
79 template <grpc::StatusCode code>
81 } // namespace internal
82 
83 class Channel;
84 class ChannelInterface;
85 class Server;
86 class ServerBuilder;
87 class ServerContextBase;
88 class ServerInterface;
89 
90 namespace internal {
91 class CompletionQueueTag;
92 class RpcMethod;
93 template <class InputMessage, class OutputMessage>
95 template <class Op1, class Op2, class Op3, class Op4, class Op5, class Op6>
96 class CallOpSet;
97 } // namespace internal
98 
104  public:
110  nullptr}) {}
111 
115  explicit CompletionQueue(grpc_completion_queue* take);
116 
119 
121  enum NextStatus {
124  TIMEOUT
126  };
127 
176  bool Next(void** tag, bool* ok) {
177  // Check return type == GOT_EVENT... cases:
178  // SHUTDOWN - queue has been shutdown, return false.
179  // TIMEOUT - we passed infinity time => queue has been shutdown, return
180  // false.
181  // GOT_EVENT - we actually got an event, return true.
182  return (AsyncNextInternal(tag, ok, gpr_inf_future(GPR_CLOCK_REALTIME)) ==
183  GOT_EVENT);
184  }
185 
197  template <typename T>
198  NextStatus AsyncNext(void** tag, bool* ok, const T& deadline) {
199  grpc::TimePoint<T> deadline_tp(deadline);
200  return AsyncNextInternal(tag, ok, deadline_tp.raw_time());
201  }
202 
217  template <typename T, typename F>
218  NextStatus DoThenAsyncNext(F&& f, void** tag, bool* ok, const T& deadline) {
219  CompletionQueueTLSCache cache = CompletionQueueTLSCache(this);
220  f();
221  if (cache.Flush(tag, ok)) {
222  return GOT_EVENT;
223  } else {
224  return AsyncNext(tag, ok, deadline);
225  }
226  }
227 
238  void Shutdown();
239 
245  grpc_completion_queue* cq() { return cq_; }
246 
247  protected:
249  explicit CompletionQueue(const grpc_completion_queue_attributes& attributes) {
251  grpc_completion_queue_factory_lookup(&attributes), &attributes,
252  nullptr);
253  InitialAvalanching(); // reserve this for the future shutdown
254  }
255 
256  private:
257  // Friends for access to server registration lists that enable checking and
258  // logging on shutdown
259  friend class grpc::ServerBuilder;
260  friend class grpc::Server;
261 
262  // Friend synchronous wrappers so that they can access Pluck(), which is
263  // a semi-private API geared towards the synchronous implementation.
264  template <class R>
265  friend class grpc::ClientReader;
266  template <class W>
267  friend class grpc::ClientWriter;
268  template <class W, class R>
270  template <class R>
271  friend class grpc::ServerReader;
272  template <class W>
273  friend class grpc::ServerWriter;
274  template <class W, class R>
276  template <class ResponseType>
279  grpc::Status&);
280  template <class ServiceType, class RequestType, class ResponseType>
282  template <class ServiceType, class RequestType, class ResponseType>
284  template <class Streamer, bool WriteNeeded>
286  template <grpc::StatusCode code>
289  friend class grpc::ServerInterface;
290  template <class InputMessage, class OutputMessage>
292 
293  // Friends that need access to constructor for callback CQ
294  friend class grpc::Channel;
295 
296  // For access to Register/CompleteAvalanching
297  template <class Op1, class Op2, class Op3, class Op4, class Op5, class Op6>
299 
304  class CompletionQueueTLSCache {
305  public:
306  explicit CompletionQueueTLSCache(CompletionQueue* cq);
307  ~CompletionQueueTLSCache();
308  bool Flush(void** tag, bool* ok);
309 
310  private:
311  CompletionQueue* cq_;
312  bool flushed_;
313  };
314 
315  NextStatus AsyncNextInternal(void** tag, bool* ok, gpr_timespec deadline);
316 
319  bool Pluck(grpc::internal::CompletionQueueTag* tag) {
320  auto deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
321  while (true) {
322  auto ev = grpc_completion_queue_pluck(cq_, tag, deadline, nullptr);
323  bool ok = ev.success != 0;
324  void* ignored = tag;
325  if (tag->FinalizeResult(&ignored, &ok)) {
326  GPR_ASSERT(ignored == tag);
327  return ok;
328  }
329  }
330  }
331 
340  void TryPluck(grpc::internal::CompletionQueueTag* tag) {
341  auto deadline = gpr_time_0(GPR_CLOCK_REALTIME);
342  auto ev = grpc_completion_queue_pluck(cq_, tag, deadline, nullptr);
343  if (ev.type == GRPC_QUEUE_TIMEOUT) return;
344  bool ok = ev.success != 0;
345  void* ignored = tag;
346  // the tag must be swallowed if using TryPluck
347  GPR_ASSERT(!tag->FinalizeResult(&ignored, &ok));
348  }
349 
355  void TryPluck(grpc::internal::CompletionQueueTag* tag,
356  gpr_timespec deadline) {
357  auto ev = grpc_completion_queue_pluck(cq_, tag, deadline, nullptr);
358  if (ev.type == GRPC_QUEUE_TIMEOUT || ev.type == GRPC_QUEUE_SHUTDOWN) {
359  return;
360  }
361 
362  bool ok = ev.success != 0;
363  void* ignored = tag;
364  GPR_ASSERT(!tag->FinalizeResult(&ignored, &ok));
365  }
366 
373  void InitialAvalanching() {
374  gpr_atm_rel_store(&avalanches_in_flight_, gpr_atm{1});
375  }
376  void RegisterAvalanching() {
377  gpr_atm_no_barrier_fetch_add(&avalanches_in_flight_, gpr_atm{1});
378  }
379  void CompleteAvalanching() {
380  if (gpr_atm_no_barrier_fetch_add(&avalanches_in_flight_, gpr_atm{-1}) ==
381  1) {
383  }
384  }
385 
386  void RegisterServer(const grpc::Server* server) {
387  (void)server;
388 #ifndef NDEBUG
389  grpc::internal::MutexLock l(&server_list_mutex_);
390  server_list_.push_back(server);
391 #endif
392  }
393  void UnregisterServer(const grpc::Server* server) {
394  (void)server;
395 #ifndef NDEBUG
396  grpc::internal::MutexLock l(&server_list_mutex_);
397  server_list_.remove(server);
398 #endif
399  }
400  bool ServerListEmpty() const {
401 #ifndef NDEBUG
402  grpc::internal::MutexLock l(&server_list_mutex_);
403  return server_list_.empty();
404 #endif
405  return true;
406  }
407 
408  static CompletionQueue* CallbackAlternativeCQ();
409  static void ReleaseCallbackAlternativeCQ(CompletionQueue* cq);
410 
411  grpc_completion_queue* cq_; // owned
412 
413  gpr_atm avalanches_in_flight_;
414 
415  // List of servers associated with this CQ. Even though this is only used with
416  // NDEBUG, instantiate it in all cases since otherwise the size will be
417  // inconsistent.
418  mutable grpc::internal::Mutex server_list_mutex_;
419  std::list<const grpc::Server*>
420  server_list_ /* GUARDED_BY(server_list_mutex_) */;
421 };
422 
426  public:
427  bool IsFrequentlyPolled() { return polling_type_ != GRPC_CQ_NON_LISTENING; }
428 
429  protected:
432 
433  private:
441  grpc_cq_polling_type polling_type,
442  grpc_completion_queue_functor* shutdown_cb)
444  GRPC_CQ_CURRENT_VERSION, completion_type, polling_type,
445  shutdown_cb}),
446  polling_type_(polling_type) {}
447 
448  grpc_cq_polling_type polling_type_;
449  friend class grpc::ServerBuilder;
450  friend class grpc::Server;
451 };
452 
453 } // namespace grpc
454 
455 #endif // GRPCPP_COMPLETION_QUEUE_H
grpc::CompletionQueue::Shutdown
void Shutdown()
Request the shutdown of the queue.
grpc::ClientWriter
Synchronous (blocking) client-side API for doing client-streaming RPCs, where the outgoing message st...
Definition: client_context.h:81
GRPC_CQ_NEXT
@ GRPC_CQ_NEXT
Events are popped out by calling grpc_completion_queue_next() API ONLY.
Definition: grpc_types.h:432
grpc::Server
Represents a gRPC server.
Definition: server.h:58
time.h
rpc_service_method.h
grpc
An Alarm posts the user-provided tag to its associated completion queue or invokes the user-provided ...
Definition: alarm.h:33
grpc::internal::CallOpSet
Primary implementation of CallOpSetInterface.
Definition: completion_queue.h:96
status.h
grpc_cq_polling_type
grpc_cq_polling_type
Completion queues internally MAY maintain a set of file descriptors in a structure called 'pollset'.
Definition: grpc_types.h:412
grpc::internal::ErrorMethodHandler
General method handler class for errors that prevent real method use e.g., handle unknown method by r...
Definition: completion_queue.h:80
grpc::internal::MethodHandler::HandlerParameter
Definition: rpc_service_method.h:41
grpc::internal::RpcMethodHandler
A wrapper class of an application provided rpc method handler.
Definition: completion_queue.h:72
grpc::ServerWriter
Synchronous (blocking) server-side API for doing for doing a server-streaming RPCs,...
Definition: completion_queue.h:61
grpc_cq_completion_type
grpc_cq_completion_type
Specifies the type of APIs to use to pop events from the completion queue.
Definition: grpc_types.h:430
grpc::CompletionQueue::AsyncNext
NextStatus AsyncNext(void **tag, bool *ok, const T &deadline)
Read from the queue, blocking up to deadline (or the queue's shutdown).
Definition: completion_queue.h:198
grpc::internal::BlockingUnaryCallImpl
Definition: client_context.h:101
GRPC_QUEUE_SHUTDOWN
@ GRPC_QUEUE_SHUTDOWN
Shutting down.
Definition: grpc_types.h:228
GPR_ASSERT
#define GPR_ASSERT(x)
abort() the process if x is zero, having written a line to the log.
Definition: log.h:95
grpc::ServerContextBase
Base class of ServerContext.
Definition: server_context.h:124
completion_queue_tag.h
gpr_atm
intptr_t gpr_atm
Definition: atm_gcc_atomic.h:32
grpc::CompletionQueue::DoThenAsyncNext
NextStatus DoThenAsyncNext(F &&f, void **tag, bool *ok, const T &deadline)
EXPERIMENTAL First executes F, then reads from the queue, blocking up to deadline (or the queue's shu...
Definition: completion_queue.h:218
gpr_inf_future
GPRAPI gpr_timespec gpr_inf_future(gpr_clock_type type)
The far future.
grpc_completion_queue_create
GRPCAPI grpc_completion_queue * grpc_completion_queue_create(const grpc_completion_queue_factory *factory, const grpc_completion_queue_attributes *attributes, void *reserved)
Create a completion queue.
grpc::internal::ServerReaderWriterBody
Definition: completion_queue.h:64
grpc::CompletionQueue::CompletionQueue
CompletionQueue(const grpc_completion_queue_attributes &attributes)
Private constructor of CompletionQueue only visible to friend classes.
Definition: completion_queue.h:249
grpc::Status
Did it work? If it didn't, why?
Definition: status.h:35
GRPC_CQ_DEFAULT_POLLING
@ GRPC_CQ_DEFAULT_POLLING
The completion queue will have an associated pollset and there is no restriction on the type of file ...
Definition: grpc_types.h:415
GRPC_CQ_NON_LISTENING
@ GRPC_CQ_NON_LISTENING
Similar to GRPC_CQ_DEFAULT_POLLING except that the completion queues will not contain any 'listening ...
Definition: grpc_types.h:420
grpc::ClientReader
Synchronous (blocking) client-side API for doing server-streaming RPCs, where the stream of messages ...
Definition: client_context.h:79
log.h
grpc::ServerReader
Synchronous (blocking) server-side API for doing client-streaming RPCs, where the incoming message st...
Definition: completion_queue.h:59
grpc_completion_queue_factory_lookup
const GRPCAPI grpc_completion_queue_factory * grpc_completion_queue_factory_lookup(const grpc_completion_queue_attributes *attributes)
Returns the completion queue factory based on the attributes.
gpr_time_0
GPRAPI gpr_timespec gpr_time_0(gpr_clock_type type)
Time constants.
grpc.h
grpc::CompletionQueue::NextStatus
NextStatus
Tri-state return for AsyncNext: SHUTDOWN, GOT_EVENT, TIMEOUT.
Definition: completion_queue.h:121
grpc::TimePoint::raw_time
gpr_timespec raw_time()=delete
grpc::ChannelInterface
Codegen interface for grpc::Channel.
Definition: channel_interface.h:71
grpc::CompletionQueue::TIMEOUT
@ TIMEOUT
deadline was reached.
Definition: completion_queue.h:125
grpc::CompletionQueue::cq
grpc_completion_queue * cq()
Returns a raw pointer to the underlying grpc_completion_queue instance.
Definition: completion_queue.h:245
sync.h
grpc_completion_queue_attributes
Definition: grpc_types.h:463
grpc::internal::GrpcLibrary
Classes that require gRPC to be initialized should inherit from this class.
Definition: grpc_library.h:32
grpc::internal::CompletionQueueTag
An interface allowing implementors to process and filter event tags.
Definition: completion_queue_tag.h:26
grpc_completion_queue_pluck
GRPCAPI grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cq, void *tag, gpr_timespec deadline, void *reserved)
Blocks until an event with tag 'tag' is available, the completion queue is being shutdown or deadline...
grpc::internal::ServerStreamingHandler
A wrapper class of an application provided server streaming handler.
Definition: completion_queue.h:76
grpc::ServerCompletionQueue::IsFrequentlyPolled
bool IsFrequentlyPolled()
Definition: completion_queue.h:427
grpc::internal::MutexLock
Definition: sync.h:80
grpc_library.h
grpc::internal::ClientStreamingHandler
A wrapper class of an application provided client streaming handler.
Definition: completion_queue.h:74
grpc::CompletionQueue::GOT_EVENT
@ GOT_EVENT
Got a new event; tag will be filled in with its associated value; ok indicating its success.
Definition: completion_queue.h:123
grpc_completion_queue_destroy
GRPCAPI void grpc_completion_queue_destroy(grpc_completion_queue *cq)
Destroy a completion queue.
grpc::internal::TemplatedBidiStreamingHandler
A wrapper class of an application provided bidi-streaming handler.
Definition: completion_queue.h:78
GRPC_CQ_CURRENT_VERSION
#define GRPC_CQ_CURRENT_VERSION
Definition: grpc_types.h:461
grpc::CompletionQueue::CompletionQueue
CompletionQueue()
Default constructor.
Definition: completion_queue.h:107
grpc::CompletionQueue::~CompletionQueue
~CompletionQueue() override
Destructor. Destroys the owned wrapped completion queue / instance.
Definition: completion_queue.h:118
grpc::ClientReaderWriter
Synchronous (blocking) client-side API for bi-directional streaming RPCs, where the outgoing message ...
Definition: client_context.h:83
grpc::ServerCompletionQueue
A specific type of completion queue used by the processing of notifications by servers.
Definition: completion_queue.h:425
grpc_completion_queue
struct grpc_completion_queue grpc_completion_queue
Completion Queues enable notification of the completion of asynchronous actions.
Definition: grpc_types.h:59
grpc::internal::CompletionQueueTag::FinalizeResult
virtual bool FinalizeResult(void **tag, bool *status)=0
FinalizeResult must be called before informing user code that the operation bound to the underlying c...
grpc_completion_queue_shutdown
GRPCAPI void grpc_completion_queue_shutdown(grpc_completion_queue *cq)
Begin destruction of a completion queue.
grpc_completion_queue_functor
Specifies an interface class to be used as a tag for callback-based completion queues.
Definition: grpc_types.h:445
grpc::CompletionQueue
A thin wrapper around grpc_completion_queue (see src/core/lib/surface/completion_queue....
Definition: completion_queue.h:103
grpc::Channel
Channels represent a connection to an endpoint. Created by CreateChannel.
Definition: channel.h:54
gpr_atm_rel_store
#define gpr_atm_rel_store(p, value)
Definition: atm_gcc_atomic.h:40
grpc::internal::Mutex
Definition: sync.h:57
grpc::internal::RpcMethod
Descriptor of an RPC method.
Definition: rpc_method.h:29
grpc::ServerCompletionQueue::ServerCompletionQueue
ServerCompletionQueue()
Default constructor.
Definition: completion_queue.h:431
grpc::ServerBuilder
A builder class for the creation and startup of grpc::Server instances.
Definition: server_builder.h:85
grpc::ServerInterface
Definition: server_interface.h:60
atm.h
gpr_timespec
Analogous to struct timespec.
Definition: time.h:48
grpc::internal::UnaryRunHandlerHelper
void UnaryRunHandlerHelper(const grpc::internal::MethodHandler::HandlerParameter &, ResponseType *, grpc::Status &)
A helper function with reduced templating to do the common work needed to actually send the server re...
Definition: method_handler.h:58
grpc::CompletionQueue::SHUTDOWN
@ SHUTDOWN
The completion queue has been shutdown and fully-drained.
Definition: completion_queue.h:122
sync.h
GRPC_QUEUE_TIMEOUT
@ GRPC_QUEUE_TIMEOUT
No event before timeout.
Definition: grpc_types.h:230
GPR_CLOCK_REALTIME
@ GPR_CLOCK_REALTIME
Realtime clock.
Definition: time.h:37
grpc::TimePoint
If you are trying to use CompletionQueue::AsyncNext with a time class that isn't either gpr_timespec ...
Definition: time.h:40
time.h
grpc::CompletionQueue::Next
bool Next(void **tag, bool *ok)
Read from the queue, blocking until an event is available or the queue is shutting down.
Definition: completion_queue.h:176
gpr_atm_no_barrier_fetch_add
#define gpr_atm_no_barrier_fetch_add(p, delta)
Definition: atm_gcc_atomic.h:45