GRPC Core  44.2.0
event_engine.h
Go to the documentation of this file.
1 // Copyright 2021 The gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 #ifndef GRPC_EVENT_ENGINE_EVENT_ENGINE_H
15 #define GRPC_EVENT_ENGINE_EVENT_ENGINE_H
16 
20 #include <grpc/event_engine/port.h>
23 
24 #include <vector>
25 
26 #include "absl/functional/any_invocable.h"
27 #include "absl/status/status.h"
28 #include "absl/status/statusor.h"
29 
30 // TODO(vigneshbabu): Define the Endpoint::Write metrics collection system
31 namespace grpc_event_engine {
32 namespace experimental {
33 
102 class EventEngine : public std::enable_shared_from_this<EventEngine>,
103  public Extensible {
104  public:
109  using Duration = std::chrono::duration<int64_t, std::nano>;
116 
117  class Closure {
118  public:
119  Closure() = default;
120  // Closure's are an interface, and thus non-copyable.
121  Closure(const Closure&) = delete;
122  Closure& operator=(const Closure&) = delete;
123  // Polymorphic type => virtual destructor
124  virtual ~Closure() = default;
125  // Run the contained code.
126  virtual void Run() = 0;
127  };
132  struct TaskHandle {
133  intptr_t keys[2];
135  };
140  intptr_t keys[2];
142  };
151  public:
152  static constexpr socklen_t MAX_SIZE_BYTES = 128;
153 
154  ResolvedAddress(const sockaddr* address, socklen_t size);
155  ResolvedAddress() = default;
156  ResolvedAddress(const ResolvedAddress&) = default;
157  const struct sockaddr* address() const;
158  socklen_t size() const;
159 
160  private:
161  char address_[MAX_SIZE_BYTES] = {};
162  socklen_t size_ = 0;
163  };
164 
173  class Endpoint : public Extensible {
174  public:
177  virtual ~Endpoint() = default;
182  struct ReadArgs {
183  // A suggestion to the endpoint implementation to read at-least the
184  // specified number of bytes over the network connection before marking
185  // the endpoint read operation as complete. gRPC may use this argument
186  // to minimize the number of endpoint read API calls over the lifetime
187  // of a connection.
189  };
213  virtual bool Read(absl::AnyInvocable<void(absl::Status)> on_read,
214  SliceBuffer* buffer, const ReadArgs* args) = 0;
219  struct WriteArgs {
220  // Represents private information that may be passed by gRPC for
221  // select endpoints expected to be used only within google.
222  void* google_specific = nullptr;
223  // A suggestion to the endpoint implementation to group data to be written
224  // into frames of the specified max_frame_size. gRPC may use this
225  // argument to dynamically control the max sizes of frames sent to a
226  // receiver in response to high receiver memory pressure.
227  int64_t max_frame_size;
228  };
249  virtual bool Write(absl::AnyInvocable<void(absl::Status)> on_writable,
250  SliceBuffer* data, const WriteArgs* args) = 0;
253  virtual const ResolvedAddress& GetPeerAddress() const = 0;
254  virtual const ResolvedAddress& GetLocalAddress() const = 0;
255  };
256 
263  using OnConnectCallback =
264  absl::AnyInvocable<void(absl::StatusOr<std::unique_ptr<Endpoint>>)>;
265 
268  class Listener : public Extensible {
269  public:
271  using AcceptCallback = absl::AnyInvocable<void(
272  std::unique_ptr<Endpoint>, MemoryAllocator memory_allocator)>;
273  virtual ~Listener() = default;
279  virtual absl::StatusOr<int> Bind(const ResolvedAddress& addr) = 0;
280  virtual absl::Status Start() = 0;
281  };
282 
298  virtual absl::StatusOr<std::unique_ptr<Listener>> CreateListener(
299  Listener::AcceptCallback on_accept,
300  absl::AnyInvocable<void(absl::Status)> on_shutdown,
301  const EndpointConfig& config,
302  std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory) = 0;
314  virtual ConnectionHandle Connect(OnConnectCallback on_connect,
315  const ResolvedAddress& addr,
316  const EndpointConfig& args,
317  MemoryAllocator memory_allocator,
318  Duration timeout) = 0;
319 
328  virtual bool CancelConnect(ConnectionHandle handle) = 0;
336  class DNSResolver {
337  public:
342  std::string dns_server;
343  };
345  struct SRVRecord {
346  std::string host;
347  int port = 0;
348  int priority = 0;
349  int weight = 0;
350  };
353  using LookupHostnameCallback =
354  absl::AnyInvocable<void(absl::StatusOr<std::vector<ResolvedAddress>>)>;
356  using LookupSRVCallback =
357  absl::AnyInvocable<void(absl::StatusOr<std::vector<SRVRecord>>)>;
359  using LookupTXTCallback =
360  absl::AnyInvocable<void(absl::StatusOr<std::vector<std::string>>)>;
361 
362  virtual ~DNSResolver() = default;
363 
374  virtual void LookupHostname(LookupHostnameCallback on_resolve,
375  absl::string_view name,
376  absl::string_view default_port) = 0;
381  virtual void LookupSRV(LookupSRVCallback on_resolve,
382  absl::string_view name) = 0;
387  virtual void LookupTXT(LookupTXTCallback on_resolve,
388  absl::string_view name) = 0;
389  };
390 
398  virtual ~EventEngine() = default;
399 
400  // TODO(nnoble): consider whether we can remove this method before we
401  // de-experimentalize this API.
402  virtual bool IsWorkerThread() = 0;
403 
409  virtual absl::StatusOr<std::unique_ptr<DNSResolver>> GetDNSResolver(
410  const DNSResolver::ResolverOptions& options) = 0;
411 
421  virtual void Run(Closure* closure) = 0;
434  virtual void Run(absl::AnyInvocable<void()> closure) = 0;
446  virtual TaskHandle RunAfter(Duration when, Closure* closure) = 0;
461  virtual TaskHandle RunAfter(Duration when,
462  absl::AnyInvocable<void()> closure) = 0;
471  virtual bool Cancel(TaskHandle handle) = 0;
472 };
473 
484  absl::AnyInvocable<std::unique_ptr<EventEngine>()> factory);
485 
494 std::unique_ptr<EventEngine> CreateEventEngine();
495 
496 bool operator==(const EventEngine::TaskHandle& lhs,
497  const EventEngine::TaskHandle& rhs);
498 bool operator!=(const EventEngine::TaskHandle& lhs,
499  const EventEngine::TaskHandle& rhs);
500 std::ostream& operator<<(std::ostream& out,
501  const EventEngine::TaskHandle& handle);
503  const EventEngine::ConnectionHandle& rhs);
505  const EventEngine::ConnectionHandle& rhs);
506 std::ostream& operator<<(std::ostream& out,
507  const EventEngine::ConnectionHandle& handle);
508 
509 namespace detail {
510 std::string FormatHandleString(uint64_t key1, uint64_t key2);
511 }
512 
513 template <typename Sink>
514 void AbslStringify(Sink& out, const EventEngine::ConnectionHandle& handle) {
515  out.Append(detail::FormatHandleString(handle.keys[0], handle.keys[1]));
516 }
517 
518 template <typename Sink>
519 void AbslStringify(Sink& out, const EventEngine::TaskHandle& handle) {
520  out.Append(detail::FormatHandleString(handle.keys[0], handle.keys[1]));
521 }
522 
523 } // namespace experimental
524 } // namespace grpc_event_engine
525 
526 #endif // GRPC_EVENT_ENGINE_EVENT_ENGINE_H
grpc_event_engine::experimental::EventEngine::OnConnectCallback
absl::AnyInvocable< void(absl::StatusOr< std::unique_ptr< Endpoint > >)> OnConnectCallback
Called when a new connection is established.
Definition: event_engine.h:264
grpc_event_engine::experimental::EventEngine::~EventEngine
virtual ~EventEngine()=default
At time of destruction, the EventEngine must have no active responsibilities.
grpc_event_engine::experimental::operator<<
std::ostream & operator<<(std::ostream &out, const EventEngine::TaskHandle &handle)
grpc_event_engine::experimental::EventEngine::ResolvedAddress::size
socklen_t size() const
grpc_event_engine::experimental::EventEngine::ConnectionHandle
A handle to a cancellable connection attempt.
Definition: event_engine.h:139
port.h
grpc_event_engine::experimental::EventEngine::Endpoint::WriteArgs
A struct representing optional arguments that may be provided to an EventEngine Endpoint Write API ca...
Definition: event_engine.h:219
grpc_event_engine::experimental::EventEngine::ResolvedAddress::MAX_SIZE_BYTES
static constexpr socklen_t MAX_SIZE_BYTES
Definition: event_engine.h:152
extensible.h
grpc_event_engine::experimental::EventEngine::RunAfter
virtual TaskHandle RunAfter(Duration when, Closure *closure)=0
Synonymous with scheduling an alarm to run after duration when.
grpc_event_engine::experimental::EventEngine::Listener::AcceptCallback
absl::AnyInvocable< void(std::unique_ptr< Endpoint >, MemoryAllocator memory_allocator)> AcceptCallback
Called when the listener has accepted a new client connection.
Definition: event_engine.h:272
grpc_event_engine::experimental::EventEngine
The EventEngine Interface.
Definition: event_engine.h:102
grpc_event_engine::experimental::EventEngine::Endpoint::ReadArgs::read_hint_bytes
int64_t read_hint_bytes
Definition: event_engine.h:188
grpc_event_engine::experimental::EventEngine::Endpoint::~Endpoint
virtual ~Endpoint()=default
Shuts down all connections and invokes all pending read or write callbacks with an error status.
grpc_event_engine::experimental::EventEngine::IsWorkerThread
virtual bool IsWorkerThread()=0
grpc_event_engine::experimental::EventEngine::DNSResolver::~DNSResolver
virtual ~DNSResolver()=default
grpc_event_engine::experimental::EventEngine::Endpoint::Read
virtual bool Read(absl::AnyInvocable< void(absl::Status)> on_read, SliceBuffer *buffer, const ReadArgs *args)=0
Reads data from the Endpoint.
grpc_event_engine::experimental::EventEngine::DNSResolver::LookupHostname
virtual void LookupHostname(LookupHostnameCallback on_resolve, absl::string_view name, absl::string_view default_port)=0
Asynchronously resolve an address.
grpc_event_engine::experimental::AbslStringify
void AbslStringify(Sink &out, const EventEngine::ConnectionHandle &handle)
Definition: event_engine.h:514
grpc_event_engine::experimental::EventEngine::DNSResolver::LookupSRVCallback
absl::AnyInvocable< void(absl::StatusOr< std::vector< SRVRecord > >)> LookupSRVCallback
Called with a collection of SRV records.
Definition: event_engine.h:357
grpc_event_engine::experimental::EventEngine::DNSResolver::SRVRecord::port
int port
Definition: event_engine.h:347
grpc_event_engine::experimental::EventEngine::Endpoint::ReadArgs
A struct representing optional arguments that may be provided to an EventEngine Endpoint Read API cal...
Definition: event_engine.h:182
grpc_event_engine::experimental::EventEngine::Listener::Bind
virtual absl::StatusOr< int > Bind(const ResolvedAddress &addr)=0
Bind an address/port to this Listener.
grpc_event_engine::experimental::EventEngine::DNSResolver::SRVRecord::host
std::string host
Definition: event_engine.h:346
grpc_event_engine::experimental::EventEngine::ConnectionHandle::keys
intptr_t keys[2]
Definition: event_engine.h:140
grpc_event_engine::experimental::EventEngineFactoryReset
void EventEngineFactoryReset()
Reset gRPC's EventEngine factory to the built-in default.
grpc_event_engine::experimental::EventEngine::DNSResolver::LookupHostnameCallback
absl::AnyInvocable< void(absl::StatusOr< std::vector< ResolvedAddress > >)> LookupHostnameCallback
Called with the collection of sockaddrs that were resolved from a given target address.
Definition: event_engine.h:354
grpc_event_engine::experimental::EventEngine::Endpoint::Write
virtual bool Write(absl::AnyInvocable< void(absl::Status)> on_writable, SliceBuffer *data, const WriteArgs *args)=0
Writes data out on the connection.
grpc_event_engine::experimental::EventEngine::CancelConnect
virtual bool CancelConnect(ConnectionHandle handle)=0
Request cancellation of a connection attempt.
grpc_event_engine::experimental::operator!=
bool operator!=(const EventEngine::TaskHandle &lhs, const EventEngine::TaskHandle &rhs)
grpc_event_engine::experimental::EventEngine::DNSResolver::ResolverOptions
Optional configuration for DNSResolvers.
Definition: event_engine.h:339
grpc_event_engine::experimental::EventEngine::Endpoint::WriteArgs::max_frame_size
int64_t max_frame_size
Definition: event_engine.h:227
grpc_event_engine::experimental::EventEngine::DNSResolver::SRVRecord
DNS SRV record type.
Definition: event_engine.h:345
grpc_event_engine::experimental::EventEngine::DNSResolver::LookupTXT
virtual void LookupTXT(LookupTXTCallback on_resolve, absl::string_view name)=0
Asynchronously perform a TXT record lookup.
grpc_event_engine::experimental::EndpointConfig
Collection of parameters used to configure client and server endpoints.
Definition: endpoint_config.h:31
grpc_event_engine::experimental::EventEngine::ResolvedAddress
Thin wrapper around a platform-specific sockaddr type.
Definition: event_engine.h:150
grpc_event_engine::experimental::EventEngine::Listener
Listens for incoming connection requests from gRPC clients and initiates request processing once conn...
Definition: event_engine.h:268
grpc_event_engine::experimental::SliceBuffer
A Wrapper around grpc_slice_buffer pointer.
Definition: slice_buffer.h:50
grpc_event_engine::experimental::detail::FormatHandleString
std::string FormatHandleString(uint64_t key1, uint64_t key2)
grpc_event_engine::experimental::EventEngine::Closure
A custom closure type for EventEngine task execution.
Definition: event_engine.h:117
grpc_event_engine::experimental::EventEngine::Closure::Closure
Closure()=default
grpc_event_engine::experimental::EventEngine::ResolvedAddress::ResolvedAddress
ResolvedAddress()=default
grpc_event_engine::experimental::EventEngine::ResolvedAddress::address
const struct sockaddr * address() const
grpc_event_engine::experimental::EventEngine::Listener::Start
virtual absl::Status Start()=0
grpc_event_engine::experimental::EventEngine::Closure::operator=
Closure & operator=(const Closure &)=delete
grpc_event_engine::experimental::EventEngine::Closure::~Closure
virtual ~Closure()=default
grpc_event_engine::experimental::EventEngine::Run
virtual void Run(Closure *closure)=0
Asynchronously executes a task as soon as possible.
slice_buffer.h
grpc_event_engine::experimental::EventEngine::Endpoint::WriteArgs::google_specific
void * google_specific
Definition: event_engine.h:222
grpc_event_engine::experimental::EventEngine::DNSResolver::ResolverOptions::dns_server
std::string dns_server
If empty, default DNS servers will be used.
Definition: event_engine.h:342
grpc_event_engine::experimental::EventEngine::DNSResolver::SRVRecord::weight
int weight
Definition: event_engine.h:349
grpc_event_engine::experimental::EventEngine::Endpoint::GetPeerAddress
virtual const ResolvedAddress & GetPeerAddress() const =0
Returns an address in the format described in DNSResolver.
grpc_event_engine::experimental::CreateEventEngine
std::unique_ptr< EventEngine > CreateEventEngine()
Create an EventEngine using the default factory.
grpc_event_engine::experimental::EventEngine::Closure::Run
virtual void Run()=0
grpc_event_engine::experimental::EventEngine::DNSResolver
Provides asynchronous resolution.
Definition: event_engine.h:336
grpc_event_engine::experimental::Extensible
Definition: extensible.h:25
grpc_event_engine::experimental::EventEngine::TaskHandle
Represents a scheduled task.
Definition: event_engine.h:132
grpc_event_engine::experimental::EventEngine::Cancel
virtual bool Cancel(TaskHandle handle)=0
Request cancellation of a task.
grpc_event_engine::experimental::EventEngine::DNSResolver::SRVRecord::priority
int priority
Definition: event_engine.h:348
grpc_event_engine::experimental::EventEngine::GetDNSResolver
virtual absl::StatusOr< std::unique_ptr< DNSResolver > > GetDNSResolver(const DNSResolver::ResolverOptions &options)=0
Creates and returns an instance of a DNSResolver, optionally configured by the options struct.
grpc_event_engine::experimental::EventEngine::TaskHandle::kInvalid
static const GRPC_DLL TaskHandle kInvalid
Definition: event_engine.h:134
grpc_event_engine
Definition: endpoint_config.h:24
grpc_event_engine::experimental::EventEngine::DNSResolver::LookupTXTCallback
absl::AnyInvocable< void(absl::StatusOr< std::vector< std::string > >)> LookupTXTCallback
Called with the result of a TXT record lookup.
Definition: event_engine.h:360
grpc_event_engine::experimental::EventEngine::ConnectionHandle::kInvalid
static const GRPC_DLL ConnectionHandle kInvalid
Definition: event_engine.h:141
grpc_event_engine::experimental::EventEngine::Endpoint
One end of a connection between a gRPC client and server.
Definition: event_engine.h:173
grpc_event_engine::experimental::MemoryAllocator
Definition: memory_allocator.h:33
grpc_event_engine::experimental::EventEngine::Connect
virtual ConnectionHandle Connect(OnConnectCallback on_connect, const ResolvedAddress &addr, const EndpointConfig &args, MemoryAllocator memory_allocator, Duration timeout)=0
Creates a client network connection to a remote network listener.
grpc_event_engine::experimental::SetEventEngineFactory
void SetEventEngineFactory(absl::AnyInvocable< std::unique_ptr< EventEngine >()> factory)
Replace gRPC's default EventEngine factory.
memory_allocator.h
GRPC_DLL
#define GRPC_DLL
Definition: port_platform.h:127
grpc_event_engine::experimental::EventEngine::TaskHandle::keys
intptr_t keys[2]
Definition: event_engine.h:133
endpoint_config.h
grpc_event_engine::experimental::EventEngine::Listener::~Listener
virtual ~Listener()=default
grpc_event_engine::experimental::EventEngine::DNSResolver::LookupSRV
virtual void LookupSRV(LookupSRVCallback on_resolve, absl::string_view name)=0
Asynchronously perform an SRV record lookup.
grpc_event_engine::experimental::EventEngine::CreateListener
virtual absl::StatusOr< std::unique_ptr< Listener > > CreateListener(Listener::AcceptCallback on_accept, absl::AnyInvocable< void(absl::Status)> on_shutdown, const EndpointConfig &config, std::unique_ptr< MemoryAllocatorFactory > memory_allocator_factory)=0
Factory method to create a network listener / server.
grpc_event_engine::experimental::EventEngine::Duration
std::chrono::duration< int64_t, std::nano > Duration
A duration between two events.
Definition: event_engine.h:109
grpc_event_engine::experimental::EventEngine::Endpoint::GetLocalAddress
virtual const ResolvedAddress & GetLocalAddress() const =0
grpc_event_engine::experimental::operator==
bool operator==(const EventEngine::TaskHandle &lhs, const EventEngine::TaskHandle &rhs)
port_platform.h