class RubyEventStore::Projection

Attributes

handlers[R]
streams[R]

Public Class Methods

from_all_streams()
# File lib/ruby_event_store/projection.rb, line 13
# Builds a projection without naming any stream.
#
# Delegates to +new+ with no arguments, so the instance receives whatever
# default +initialize+ supplies for +streams+.
#
# Returns the newly constructed instance.
def self.from_all_streams
  new
end
from_stream(stream_or_streams)
# File lib/ruby_event_store/projection.rb, line 7
# Builds a projection bound to one or more streams.
#
# stream_or_streams - a single stream or a collection of streams; it is
#                     normalised with Kernel#Array before use.
#
# Raises ArgumentError when the normalised list is empty (for example when
# +nil+ or +[]+ is given).
# Returns the instance created by +new(streams: ...)+.
def self.from_stream(stream_or_streams)
  stream_names = Array(stream_or_streams)
  if stream_names.empty?
    raise ArgumentError, "At least one stream must be given"
  end

  new(streams: stream_names)
end
new(streams: [])
# File lib/ruby_event_store/projection.rb, line 17
# Sets up an empty projection.
#
# streams - the streams this projection is bound to (default: none). The
#           object is stored as given, without copying.
#
# The handler registry starts empty and the initial-state factory defaults
# to a callable that produces a fresh empty Hash on every call.
def initialize(streams: [])
  @streams = streams
  @handlers = {}
  @init = lambda { {} }
end

Public Instance Methods

call(event)
# File lib/ruby_event_store/projection.rb, line 44
# Applies a single event to the memoized current state.
#
# event - an object responding to +event_type+; that value is the key used
#         to look up the handler.
#
# Raises KeyError (from Hash#fetch) when no handler is registered for the
# event's type.
# Returns whatever the handler returns.
def call(event)
  handler = handlers.fetch(event.event_type)
  handler.call(current_state, event)
end
current_state()
# File lib/ruby_event_store/projection.rb, line 40
# Returns the memoized state used by #call.
#
# On first access (or while the stored value is nil/false) the state is
# obtained from #initial_state and cached in @current_state.
def current_state
  return @current_state if @current_state

  @current_state = initial_state
end
handled_events()
# File lib/ruby_event_store/projection.rb, line 48
# Lists the event types that currently have a handler.
#
# Returns the keys of the handler registry, in registration order. #when
# stores them as Strings.
def handled_events
  handlers.keys
end
init(handler)
# File lib/ruby_event_store/projection.rb, line 25
# Replaces the initial-state factory.
#
# handler - an object responding to +call+ with no arguments; #initial_state
#           invokes it each time a starting state is needed.
#
# Returns self so calls can be chained.
def init(handler)
  tap { @init = handler }
end
initial_state()
# File lib/ruby_event_store/projection.rb, line 36
# Produces a starting state by invoking the factory stored in @init.
#
# The factory is called on every invocation; nothing is cached here.
def initial_state
  @init.()
end
run(event_store, start: nil, count: PAGE_SIZE)
# File lib/ruby_event_store/projection.rb, line 52
# Runs the projection against an event store and returns the final state.
#
# event_store - the store handed on to the reduce helpers.
# start       - optional starting point (default: nil); it is validated by
#               the reduce helpers, not here.
# count       - batch size handed on to the reduce helpers
#               (default: PAGE_SIZE).
#
# With no registered handlers the store is never touched and a fresh initial
# state is returned. Otherwise events are folded from the configured streams
# or, when none are configured, from all streams.
def run(event_store, start: nil, count: PAGE_SIZE)
  return initial_state if handled_events.empty?

  if streams.any?
    reduce_from_streams(event_store, start, count)
  else
    reduce_from_all_streams(event_store, start, count)
  end
end
when(events, handler)
# File lib/ruby_event_store/projection.rb, line 30
# Registers one handler for one or more event types.
#
# events  - a single event type or a collection of them; each is converted
#           with +to_s+ to form the registry key.
# handler - the callable to store under each key. An existing entry for the
#           same key is overwritten.
#
# Returns self so calls can be chained.
def when(events, handler)
  Array(events).each do |event_class|
    event_type = event_class.to_s
    handlers[event_type] = handler
  end
  self
end

Private Instance Methods

read_scope(event_store, stream, count, start)
# File lib/ruby_event_store/projection.rb, line 78
# Assembles the read query for one pass over the store.
#
# event_store - object whose +read+ result supports +in_batches+, +of_type+,
#               +stream+ and +from+.
# stream      - stream to pass to +stream+, or nil to skip that step.
# count       - value passed to +in_batches+.
# start       - value passed to +from+, or nil to skip that step.
#
# The query is always restricted with +of_type(handled_events)+.
# Returns the resulting query object.
def read_scope(event_store, stream, count, start)
  specification = event_store.read.in_batches(count).of_type(handled_events)
  specification = specification.stream(stream) if stream
  start ? specification.from(start) : specification
end
reduce_from_all_streams(event_store, start, count)
# File lib/ruby_event_store/projection.rb, line 73
# Folds every matching event, regardless of stream, into a fresh state.
#
# event_store - store passed on to #read_scope.
# start       - optional starting point, checked with #valid_starting_point?.
# count       - batch size passed on to #read_scope.
#
# Raises ArgumentError when +start+ fails validation.
# Returns the state after #transition has been applied to each event.
def reduce_from_all_streams(event_store, start, count)
  raise ArgumentError, "Start must be valid event id" unless valid_starting_point?(start)

  read_scope(event_store, nil, count, start).reduce(initial_state) do |state, event|
    transition(state, event)
  end
end
reduce_from_streams(event_store, start, count)
# File lib/ruby_event_store/projection.rb, line 64
# Folds matching events from each configured stream, one stream after another.
#
# event_store - store passed on to #read_scope.
# start       - optional list of starting points, checked with
#               #valid_starting_point?; entries are paired with +streams+ by
#               position (nil for every stream when +start+ is not given).
# count       - batch size passed on to #read_scope.
#
# A single state value is threaded through all streams in the order of
# +streams+.
#
# Raises ArgumentError when +start+ fails validation.
# Returns the state after the last stream has been processed.
def reduce_from_streams(event_store, start, count)
  raise ArgumentError, "Start must be an array with event ids" unless valid_starting_point?(start)

  stream_offsets = streams.zip(start_events(start))
  state = initial_state
  stream_offsets.each do |stream_name, start_event_id|
    scope = read_scope(event_store, stream_name, count, start_event_id)
    state = scope.reduce(state) { |memo, event| transition(memo, event) }
  end
  state
end
start_events(start)
# File lib/ruby_event_store/projection.rb, line 86
# Normalises the optional list of starting points.
#
# Returns +start+ itself when it is truthy, otherwise a new empty Array.
def start_events(start)
  start || []
end
transition(state, event)
# File lib/ruby_event_store/projection.rb, line 90
# Applies one event to the given state during a reduce pass.
#
# state - the accumulator passed to the handler.
# event - an object responding to +event_type+, used to look up the handler.
#
# The handler's return value is discarded, so any change it makes must be
# made by mutating +state+.
#
# Raises KeyError (from Hash#fetch) when no handler is registered for the
# event's type.
# Returns the same +state+ object that was passed in.
def transition(state, event)
  handler = handlers.fetch(event.event_type)
  handler.(state, event)
  state
end
valid_starting_point?(start)
# File lib/ruby_event_store/projection.rb, line 59
# Checks that +start+ has the shape required by the current configuration.
#
# A falsy +start+ is always accepted. With no streams configured, +start+
# must be exactly a String. With streams configured, it must be exactly an
# Array holding one entry per stream.
#
# Returns true or false.
def valid_starting_point?(start)
  return true unless start
  return start.instance_of?(String) unless streams.any?

  start.instance_of?(Array) && start.size == streams.size
end