class ROM::EventStore::Dataset

Attributes

category[R]

Public Class Methods

new(category, connection, options = {}) click to toggle source
# File lib/rom/event_store/dataset.rb, line 6
# Stores the three collaborators on the new dataset as-is; nothing is
# copied or validated here.
#
# @param category the value exposed through the +category+ reader
# @param connection object the other methods call +append+, +read+ and
#   +subscription+ on
# @param options [Hash] options kept in +@options+; defaults to an empty hash
def initialize(category, connection, options = {})
  @category, @connection, @options = category, connection, options
end

Public Instance Methods

append(events) click to toggle source
# File lib/rom/event_store/dataset.rb, line 33
# Sends +events+ to the connection for the current +stream+, calls +sync+
# on whatever the connection returns, and hands the same +events+ object
# back to the caller.
#
# @param events the events to append (passed through untouched)
# @return the +events+ argument itself
def append(events)
  pending_write = @connection.append(stream, events)
  pending_write.sync
  events
end
each() { |event| ... } click to toggle source
# File lib/rom/event_store/dataset.rb, line 44
# Iterates over the dataset, yielding every item that +with_events+ yields.
#
# Without a block an Enumerator over the same items is returned instead of
# failing on +yield+, matching the usual Ruby +each+ contract.
#
# @yield [event] each item passed along by +with_events+
# @return [Enumerator] when no block is given
# @return the result of +with_events+ when a block is given
def each
  return to_enum(:each) unless block_given?

  with_events { |event| yield(event) }
end
events() click to toggle source
# File lib/rom/event_store/dataset.rb, line 29
# Asks the connection to read the current +stream+ with the dataset's
# options and returns the result of calling +sync+ on that read.
#
# @return whatever +sync+ returns for the read (iterated by +with_events+)
def events
  pending_read = @connection.read(stream, @options)
  pending_read.sync
end
from(id) click to toggle source
# File lib/rom/event_store/dataset.rb, line 16
# Builds a derived dataset through +__new__+ with the +:from+ option set.
#
# @param id value stored under the +:from+ option
# @return the object produced by +__new__+
def from(id)
  start_option = { from: id }
  __new__(start_option)
end
limit(limit) click to toggle source
# File lib/rom/event_store/dataset.rb, line 20
# Builds a derived dataset through +__new__+ with the +:limit+ option set.
#
# @param limit value stored under the +:limit+ option
# @return the object produced by +__new__+
def limit(limit)
  limit_option = { limit: limit }
  __new__(limit_option)
end
select(aggregate) click to toggle source
# File lib/rom/event_store/dataset.rb, line 12
# Builds a derived dataset through +__new__+ with the +:aggregate+ option
# set; +stream+ reads that option to pick the stream name.
#
# @param aggregate value stored under the +:aggregate+ option
# @return the object produced by +__new__+
def select(aggregate)
  aggregate_option = { aggregate: aggregate }
  __new__(aggregate_option)
end
stream() click to toggle source
# File lib/rom/event_store/dataset.rb, line 24
# Name of the stream this dataset addresses.
#
# With a truthy +:aggregate+ option the name is "<category>-<aggregate>";
# otherwise it is "$ce-<category>".
# NOTE(review): "$ce-" is presumably the store's by-category stream
# prefix -- confirm against the event store documentation.
#
# @return [String]
def stream
  aggregate_id = @options[:aggregate]
  return "$ce-#{category}" unless aggregate_id

  "#{category}-#{aggregate_id}"
end
subscribe() { |event| ... } click to toggle source
# File lib/rom/event_store/dataset.rb, line 38
# Opens a subscription on the current +stream+ using the dataset's options,
# registers a handler that yields each incoming event after passing it
# through +dehydrate+, and then starts the subscription.
#
# @yield [event] the +dehydrate+ result for every event delivered
# @return whatever the subscription's +start+ returns
def subscribe
  feed = @connection.subscription(stream, @options)
  feed.on_event do |wrapper|
    yield dehydrate(wrapper)
  end
  feed.start
end

Private Instance Methods

__new__(new_opts = {}) click to toggle source
# File lib/rom/event_store/dataset.rb, line 50
# Creates another instance of the receiver's class with the same category
# and connection, and with +new_opts+ merged over the current options.
# The receiver's own options hash is left untouched (+Hash#merge+ returns
# a new hash).
#
# @param new_opts [Hash] options that override or extend the current ones
# @return a new instance of +self.class+
def __new__(new_opts = {})
  combined = @options.merge(new_opts)
  self.class.new(@category, @connection, combined)
end
dehydrate(wrapper) click to toggle source
# File lib/rom/event_store/dataset.rb, line 62
# Flattens an event wrapper into a plain hash.
#
# The stream id is split on the first "-" only, so everything after it
# (including further dashes) ends up in +:aggregate+.
#
# +created_epoch+ is divided by 1000 before being handed to +Time.at+.
# NOTE(review): this presumes the value is in milliseconds since the Unix
# epoch -- confirm against the event schema. The division uses an exact
# Rational so the sub-second part is kept; integer division would silently
# truncate the timestamp to whole seconds.
#
# @param wrapper object responding to +event+ and +original_event_number+
# @return [Hash] with keys :id, :type, :data, :category, :aggregate,
#   :number, :position and :created_at
def dehydrate(wrapper)
  event = wrapper.event
  category, aggregate = event.event_stream_id.split('-', 2)

  {
    id: Estore::Package.parse_uuid(event.event_id),
    type: event.event_type,
    data: event.data,
    category: category,
    aggregate: aggregate,
    number: event.event_number,
    position: wrapper.original_event_number,
    created_at: Time.at(Rational(event.created_epoch, 1000))
  }
end
option(option, default) click to toggle source
# File lib/rom/event_store/dataset.rb, line 54
# Looks up a single entry of the dataset's options.
#
# @param option key to look up in +@options+
# @param default value returned when the key is absent (a key that is
#   present with a nil value still returns nil)
# @return the stored value, or +default+ when the key is missing
def option(option, default)
  @options.fetch(option) { default }
end
with_events() { |event| ... } click to toggle source
# File lib/rom/event_store/dataset.rb, line 58
# Walks the result of +events+ and yields each entry after passing it
# through +dehydrate+.
#
# @yield [event] the +dehydrate+ result for each entry of +events+
# @return whatever +each+ on the +events+ result returns
def with_events
  events.each do |wrapper|
    yield dehydrate(wrapper)
  end
end