class RubyEventStore::Client

Constants

EMPTY_HASH

Attributes

broker[R]
clock[R]
correlation_id_generator[R]
event_type_resolver[R]
mapper[R]
repository[R]
subscriptions[R]

Public Class Methods

new( repository: InMemoryRepository.new, mapper: Mappers::Default.new, subscriptions: Subscriptions.new, dispatcher: Dispatcher.new, clock: default_clock, correlation_id_generator: default_correlation_id_generator, event_type_resolver: EventTypeResolver.new ) click to toggle source
# File lib/ruby_event_store/client.rb, line 7
# Builds a client and wires its collaborators together.
#
# Every keyword is optional; an omitted one falls back to the default
# expression given in the signature.
#
# @param repository object the client delegates persistence calls to
# @param mapper object converting between events and records
# @param subscriptions subscription registry, also handed to the broker
# @param dispatcher handed to the broker together with the subscriptions
# @param clock callable invoked to timestamp events
# @param correlation_id_generator callable invoked to produce correlation ids
# @param event_type_resolver callable turning an event class into its type
def initialize(
  repository: InMemoryRepository.new,
  mapper: Mappers::Default.new,
  subscriptions: Subscriptions.new,
  dispatcher: Dispatcher.new,
  clock: default_clock,
  correlation_id_generator: default_correlation_id_generator,
  event_type_resolver: EventTypeResolver.new
)
  # Collaborators stored as given.
  @repository = repository
  @mapper = mapper
  @clock = clock
  @correlation_id_generator = correlation_id_generator
  @event_type_resolver = event_type_resolver

  # The broker receives the very same subscriptions object the client keeps.
  @subscriptions = subscriptions
  @broker = Broker.new(dispatcher: dispatcher, subscriptions: subscriptions)

  # Container for the ambient metadata read by #metadata.
  @metadata = Concurrent::ThreadLocalVar.new
end

Public Instance Methods

append(events, stream_name: GLOBAL_STREAM, expected_version: :any) click to toggle source

Persists new event(s) without notifying any subscribed handlers

@param (see publish) @return [self]

# File lib/ruby_event_store/client.rb, line 48
# Stores the given event(s) in a stream. The broker is not involved, so no
# handler is invoked.
#
# @param events [Array<Event>, Event] event(s) to persist
# @param stream_name [String] stream the records are appended to
# @param expected_version [:any, :auto, :none, Integer] optimistic locking strategy
# @return [self]
def append(events, stream_name: GLOBAL_STREAM, expected_version: :any)
  enriched = enrich_events_metadata(events)
  records = transform(enriched)
  append_records_to_stream(records, stream_name: stream_name, expected_version: expected_version)
  self
end
delete_stream(stream_name) click to toggle source

Deletes a stream. All events from the stream remain intact but they are no longer linked to the stream.

@param stream_name [String] name of the stream to be cleared. @return [self]

# File lib/ruby_event_store/client.rb, line 75
# Asks the repository to delete the stream with the given name.
#
# @param stream_name [String] name of the stream to delete
# @return [self]
def delete_stream(stream_name)
  stream = Stream.new(stream_name)
  repository.delete_stream(stream)
  self
end
deserialize(serializer:, event_type:, event_id:, data:, metadata:, timestamp: nil, valid_at: nil) click to toggle source

Deserialize event which was serialized for async event handlers {railseventstore.org/docs/subscribe/#async-handlers Read more}

@return [Event] deserialized event

# File lib/ruby_event_store/client.rb, line 273
# Rebuilds an event from the serialized attributes handed to an async handler.
#
# When +timestamp+ is omitted it is taken from the deserialized metadata
# (+:timestamp+ key, or the string key "timestamp" parsed with Time.parse)
# and rendered as ISO8601. +valid_at+ falls back to the resolved timestamp,
# whether that timestamp was passed in or extracted.
#
# @param serializer [#load] loads the serialized metadata; it is also
#   passed on to SerializedRecord#deserialize
# @param event_type [String]
# @param event_id [String]
# @param data [String] serialized event data
# @param metadata [String] serialized event metadata
# @param timestamp [String, nil] serialized timestamp, if already known
# @param valid_at [String, nil] serialized valid_at, defaults to the timestamp
# @raise [KeyError] if no timestamp is given and the metadata has none
# @return [Event] deserialized event
def deserialize(serializer:, event_type:, event_id:, data:, metadata:, timestamp: nil, valid_at: nil)
  # Only load the serialized metadata when the caller supplied no timestamp.
  resolved_timestamp =
    timestamp ||
      begin
        loaded = serializer.load(metadata)
        (loaded[:timestamp] || Time.parse(loaded.fetch("timestamp"))).iso8601
      end

  record =
    SerializedRecord.new(
      event_type: event_type,
      event_id: event_id,
      data: data,
      metadata: metadata,
      timestamp: resolved_timestamp,
      # Previously this fallback was only populated when the timestamp had
      # been extracted, leaving valid_at nil for an explicit timestamp.
      valid_at: valid_at || resolved_timestamp
    )

  mapper.record_to_event(record.deserialize(serializer))
end
event_in_stream?(event_id, stream_name) click to toggle source

Checks whether event is linked in given stream

@param event_id [String] @param stream_name [String] @return [Boolean] true if event is linked to given stream, false otherwise

# File lib/ruby_event_store/client.rb, line 127
# Tells whether the event is linked to the named stream.
#
# For the global stream the check is simply whether the repository has the
# event at all; for any other stream the repository is asked about that stream.
#
# @param event_id [String]
# @param stream_name [String]
# @return [Boolean] the repository's answer
def event_in_stream?(event_id, stream_name)
  stream = Stream.new(stream_name)
  return repository.has_event?(event_id) if stream.global?

  repository.event_in_stream?(event_id, stream)
end
global_position(event_id) click to toggle source

Gets position of the event in global stream

The position is always nonnegative. Global position may have gaps, meaning, there may be event at position 40, but no event at position 39.

@param event_id [String] @raise [EventNotFound] @return [Integer] nonnegative integer position of event in global stream

# File lib/ruby_event_store/client.rb, line 118
# Looks up the event's position in the global stream by delegating to the
# repository.
#
# @param event_id [String] id of the event to look up
# @return [Integer] whatever the repository reports as the global position
def global_position(event_id)
  repository.global_position(event_id)
end
inspect() click to toggle source
# File lib/ruby_event_store/client.rb, line 332
# Short representation made of the class name and the hexadecimal object id
# only; instance variables are not included.
#
# @return [String]
def inspect
  Kernel.format("#<%s:0x%s>", self.class, __id__.to_s(16))
end
metadata() click to toggle source

Read additional metadata which will be added for published events {railseventstore.org/docs/request_metadata#passing-your-own-metadata-using-with_metadata-method Read more}

@return [Hash]

# File lib/ruby_event_store/client.rb, line 294
# Reads the metadata currently held in the @metadata container.
#
# @return [Hash] the stored value, or EMPTY_HASH when nothing is stored
def metadata
  current = @metadata.value
  current || EMPTY_HASH
end
overwrite(events_or_event) click to toggle source

Overwrite existing event(s) with the same ID.

Does not notify any subscribed handlers. Does not enrich with additional current metadata. Does not allow changing which streams these events are in. {railseventstore.org/docs/migrating_messages Read more}

@example Add data and metadata to existing events

events = event_store.read.limit(10).to_a
events.each do |ev|
  ev.data[:tenant_id] = 1
  ev.metadata[:server_id] = "eu-west-2"
end
event_store.overwrite(events)

@example Change event type

events = event_store.read.limit(10).each.select{|ev| OldType === ev }.map do |ev|
  NewType.new(
    event_id: ev.event_id,
    data: ev.data,
    metadata: ev.metadata,
  )
end
event_store.overwrite(events)

@param events_or_event [Array<Event>, Event] event(s) to serialize and overwrite again @return [self]

# File lib/ruby_event_store/client.rb, line 327
# Re-serializes the given event(s) and hands the records to the repository's
# update_messages. No metadata enrichment and no broker call happen here.
#
# @param events_or_event [Array<Event>, Event] event(s) to overwrite
# @return [self]
def overwrite(events_or_event)
  events = Array(events_or_event)
  records = transform(events)
  repository.update_messages(records)
  self
end
position_in_stream(event_id, stream_name) click to toggle source

Gets position of the event in given stream

The position is always nonnegative. Returns nil if the event has no specific position in stream. Raise error if event is not present in stream.

@param event_id [String] @param stream_name [String] @return [Integer] nonnegative integer position of event in stream @raise [EventNotInStream]

# File lib/ruby_event_store/client.rb, line 105
# Asks the repository for the event's position within the named stream.
#
# @param event_id [String]
# @param stream_name [String]
# @return whatever the repository reports for that event and stream
def position_in_stream(event_id, stream_name)
  stream = Stream.new(stream_name)
  repository.position_in_stream(event_id, stream)
end
publish(events, stream_name: GLOBAL_STREAM, expected_version: :any) click to toggle source

Persists events and notifies subscribed handlers about them

@param events [Array<Event>, Event] event(s) @param stream_name [String] name of the stream for persisting events. @param expected_version [:any, :auto, :none, Integer] controls optimistic locking strategy. {railseventstore.org/docs/expected_version/ Read more} @return [self]

# File lib/ruby_event_store/client.rb, line 32
# Persists the event(s) and then passes each event, paired with its record,
# to the broker.
#
# Each broker call runs inside with_metadata, with correlation_id taken from
# the event's metadata and causation_id set to the event's id.
#
# @param events [Array<Event>, Event] event(s) to publish
# @param stream_name [String] stream the records are appended to
# @param expected_version [:any, :auto, :none, Integer] optimistic locking strategy
# @raise [KeyError] if an enriched event carries no :correlation_id
# @return [self]
def publish(events, stream_name: GLOBAL_STREAM, expected_version: :any)
  enriched = enrich_events_metadata(events)
  records = transform(enriched)
  append_records_to_stream(records, stream_name: stream_name, expected_version: expected_version)

  enriched.zip(records) do |event, record|
    tracing = { correlation_id: event.metadata.fetch(:correlation_id), causation_id: event.event_id }
    with_metadata(tracing) { broker.call(event, record) }
  end

  self
end
read() click to toggle source

Starts building a query specification for reading events. {railseventstore.org/docs/read/ More info.}

@return [Specification]

# File lib/ruby_event_store/client.rb, line 84
# Entry point for building a read query: returns a new Specification backed
# by a SpecificationReader over this client's repository and mapper.
#
# @return [Specification]
def read
  reader = SpecificationReader.new(repository, mapper)
  Specification.new(reader)
end
streams_of(event_id) click to toggle source

Gets list of streams where event is stored or linked

@return [Array<Stream>] where event is stored or linked

# File lib/ruby_event_store/client.rb, line 91
# Asks the repository which streams the given event belongs to.
#
# @param event_id id of the event, passed to the repository unchanged
# @return [Array<Stream>] the repository's answer
def streams_of(event_id)
  repository.streams_of(event_id)
end
subscribe(subscriber = nil, to:, &proc) click to toggle source

Subscribes a handler (subscriber) that will be invoked for published events of provided type.

@overload subscribe(subscriber, to:)

@param to [Array<Class>] types of events to subscribe
@param subscriber [Object, Class] handler
@return [Proc] - unsubscribe proc. Call to unsubscribe.
@raise [ArgumentError, SubscriberNotExist]

@overload subscribe(to:, &subscriber)

@param to [Array<Class>] types of events to subscribe
@param subscriber [Proc] handler
@return [Proc] - unsubscribe proc. Call to unsubscribe.
@raise [ArgumentError, SubscriberNotExist]
# File lib/ruby_event_store/client.rb, line 144
# Registers a handler with the broker for the given event types.
#
# The handler is either the first positional argument or the block, never
# both. Each entry of +to+ is run through the event type resolver before it
# is passed to the broker.
#
# @param subscriber [Object, Class, nil] handler, when not given as a block
# @param to [Array<Class>] event types to subscribe to
# @raise [ArgumentError] when both a subscriber and a block are given
# @return whatever the broker's add_subscription returns
def subscribe(subscriber = nil, to:, &proc)
  if subscriber && proc
    raise ArgumentError, "subscriber must be first argument or block, cannot be both"
  end

  handler = subscriber || proc
  event_types = to.map { |event_klass| event_type_resolver.call(event_klass) }
  broker.add_subscription(handler, event_types)
end
subscribe_to_all_events(subscriber = nil, &proc) click to toggle source

Subscribes a handler (subscriber) that will be invoked for all published events

@overload subscribe_to_all_events(subscriber)

@param subscriber [Object, Class] handler
@return [Proc] - unsubscribe proc. Call to unsubscribe.
@raise [ArgumentError, SubscriberNotExist]

@overload subscribe_to_all_events(&subscriber)

@param subscriber [Proc] handler
@return [Proc] - unsubscribe proc. Call to unsubscribe.
@raise [ArgumentError, SubscriberNotExist]
# File lib/ruby_event_store/client.rb, line 160
# Registers a handler with the broker as a global subscription.
#
# The handler is either the positional argument or the block, never both.
#
# @param subscriber [Object, Class, nil] handler, when not given as a block
# @raise [ArgumentError] when both a subscriber and a block are given
# @return whatever the broker's add_global_subscription returns
def subscribe_to_all_events(subscriber = nil, &proc)
  if subscriber && proc
    raise ArgumentError, "subscriber must be first argument or block, cannot be both"
  end

  handler = subscriber || proc
  broker.add_global_subscription(handler)
end
subscribers_for(event_class) click to toggle source

Get list of handlers subscribed to an event

@param event_class [Class, String] type of events to get list of subscribed handlers @return [Array<Object, Class>]

# File lib/ruby_event_store/client.rb, line 169
# Lists the handlers registered for an event type.
#
# The argument is first run through the event type resolver; the result is
# then looked up in the subscriptions registry.
#
# @param event_class [Class, String] event type to look up
# @return [Array<Object, Class>] result of subscriptions.all_for
def subscribers_for(event_class)
  event_type = event_type_resolver.call(event_class)
  subscriptions.all_for(event_type)
end
with_metadata(metadata_for_block, &block) click to toggle source

Set additional metadata for all events published within the provided block {railseventstore.org/docs/request_metadata#passing-your-own-metadata-using-with_metadata-method Read more}

@param metadata_for_block [Hash] metadata to set for events @param block [Proc] block of code during which the metadata will be added @return [Object] last value returned by the provided block

# File lib/ruby_event_store/client.rb, line 261
# Runs the block with +metadata_for_block+ merged on top of the current
# metadata, then puts the previous metadata back.
#
# The restore happens in an ensure clause, so it also runs when the block
# raises.
#
# @param metadata_for_block [Hash] entries merged over the current metadata
# @param block [Proc] code to run while the merged metadata is in effect
# @return [Object, nil] the block's value, or nil when no block is given
def with_metadata(metadata_for_block, &block)
  outer_metadata = metadata
  self.metadata = outer_metadata.merge(metadata_for_block)
  return unless block

  block.call
ensure
  self.metadata = outer_metadata
end
within(&block) click to toggle source

Use for starting temporary subscriptions. {railseventstore.org/docs/subscribe/#temporary-subscriptions Read more}

@param block [Proc] block of code during which the temporary subscriptions will be active @return [Within] builder object which collects temporary subscriptions

# File lib/ruby_event_store/client.rb, line 250
# Wraps the block in a Within builder, together with this client's broker
# and event type resolver.
#
# @param block [Proc] required block
# @raise [ArgumentError] when no block is given
# @return [Within]
def within(&block)
  raise ArgumentError unless block

  Within.new(block, broker, event_type_resolver)
end

Protected Instance Methods

default_clock() click to toggle source
# File lib/ruby_event_store/client.rb, line 368
# Clock used when none is passed to the constructor.
#
# @return [Proc] lambda returning the current UTC time rounded to
#   TIMESTAMP_PRECISION fractional digits
def default_clock
  lambda { Time.now.utc.round(TIMESTAMP_PRECISION) }
end
default_correlation_id_generator() click to toggle source
# File lib/ruby_event_store/client.rb, line 372
# Correlation id generator used when none is passed to the constructor.
#
# @return [Proc] lambda returning a fresh SecureRandom.uuid on every call
def default_correlation_id_generator
  lambda { SecureRandom.uuid }
end
metadata=(value) click to toggle source
# File lib/ruby_event_store/client.rb, line 364
# Stores +value+ in the @metadata container that #metadata reads from.
#
# @param value [Hash, nil] metadata to store
def metadata=(value)
  @metadata.value = value
end

Private Instance Methods

append_records_to_stream(records, stream_name:, expected_version:) click to toggle source
# File lib/ruby_event_store/client.rb, line 358
# Wraps the raw stream name and expected version in their value objects and
# passes the records to the repository.
#
# @param records records to append
# @param stream_name [String] wrapped in Stream
# @param expected_version wrapped in ExpectedVersion
# @return whatever the repository's append_to_stream returns
def append_records_to_stream(records, stream_name:, expected_version:)
  stream = Stream.new(stream_name)
  version = ExpectedVersion.new(expected_version)
  repository.append_to_stream(records, stream, version)
end
enrich_event_metadata(event) click to toggle source
# File lib/ruby_event_store/client.rb, line 351
# Fills in metadata on a single event, mutating the event's metadata.
#
# Entries from the current ambient metadata are copied first, then
# :timestamp (from the clock), :valid_at (copied from :timestamp) and
# :correlation_id (from the generator). Every write uses `||=`, so truthy
# values already on the event are kept.
# NOTE(review): `||=` also replaces values that are explicitly false or nil
# on the event — confirm this is intended.
#
# @param event event whose metadata is enriched
# @return the event's resulting :correlation_id
def enrich_event_metadata(event)
  metadata.each do |name, ambient_value|
    event.metadata[name] ||= ambient_value
  end

  event.metadata[:timestamp] ||= clock.call
  event.metadata[:valid_at] ||= event.metadata.fetch(:timestamp)
  event.metadata[:correlation_id] ||= correlation_id_generator.call
end
enrich_events_metadata(events) click to toggle source
# File lib/ruby_event_store/client.rb, line 345
# Normalizes the argument to an array and enriches every event in it.
#
# @param events [Array<Event>, Event] one event or a list of events
# @return [Array<Event>] the (possibly wrapped) array of enriched events
def enrich_events_metadata(events)
  Array(events).each { |event| enrich_event_metadata(event) }
end
transform(events) click to toggle source
# File lib/ruby_event_store/client.rb, line 341
# Converts each event into a record using the mapper.
#
# @param events [Array<Event>] events to convert
# @return [Array] one record per event, in the same order
def transform(events)
  events.map do |event|
    mapper.event_to_record(event)
  end
end