class RubyEventStore::Projection
Attributes
handlers[R]
streams[R]
Public Class Methods
from_all_streams()
click to toggle source
# File lib/ruby_event_store/projection.rb, line 13
#
# Builds a projection without naming any stream: it simply calls +new+ with
# no arguments, so the constructor's own default for +streams+ applies.
#
# @return [RubyEventStore::Projection] a new projection instance
def self.from_all_streams
  new
end
from_stream(stream_or_streams)
click to toggle source
# File lib/ruby_event_store/projection.rb, line 7
#
# Builds a projection bound to one or more streams.
#
# The argument is normalised with Kernel#Array, so a single value is wrapped
# in a one-element array, an array is used as given, and +nil+ becomes [].
#
# @param stream_or_streams [Object, Array] a single stream or a list of streams
# @return [RubyEventStore::Projection] result of +new(streams: ...)+
# @raise [ArgumentError] when the normalised list is empty
def self.from_stream(stream_or_streams)
  stream_names = Array(stream_or_streams)
  if stream_names.empty?
    raise ArgumentError, "At least one stream must be given"
  end

  new(streams: stream_names)
end
new(streams: [])
click to toggle source
# File lib/ruby_event_store/projection.rb, line 17
#
# Sets up a projection with no registered handlers.
#
# @param streams [Array] stored as-is in @streams; defaults to an empty array
def initialize(streams: [])
  @streams = streams
  # Event handlers, filled in later by #when.
  @handlers = {}
  # Default initial-state factory: every call yields a new empty Hash.
  @init = lambda { {} }
end
Public Instance Methods
call(event)
click to toggle source
# File lib/ruby_event_store/projection.rb, line 44
#
# Applies a single event to the memoized #current_state.
#
# The handler is looked up under +event.event_type+ with +fetch+, so an event
# type with no registered handler raises instead of being silently ignored.
#
# @param event [#event_type] the event to apply
# @return [Object] whatever the handler returns
def call(event)
  handler = handlers.fetch(event.event_type)
  handler.call(current_state, event)
end
current_state()
click to toggle source
# File lib/ruby_event_store/projection.rb, line 40
#
# Returns the state used by #call, creating it from #initial_state on first
# use and caching it in @current_state afterwards.
#
# @return [Object] the cached state
def current_state
  return @current_state if @current_state

  @current_state = initial_state
end
handled_events()
click to toggle source
# File lib/ruby_event_store/projection.rb, line 48
#
# Lists the event types that currently have a handler registered, i.e. the
# keys of +handlers+ (stored as strings by #when).
#
# @return [Array] the registered event type keys
def handled_events
  handlers.keys
end
init(handler)
click to toggle source
# File lib/ruby_event_store/projection.rb, line 25
#
# Replaces the callable that produces the initial state.
#
# @param handler [#call] invoked with no arguments by #initial_state
# @return [self] the projection itself, so calls can be chained
def init(handler)
  tap { @init = handler }
end
initial_state()
click to toggle source
# File lib/ruby_event_store/projection.rb, line 36
#
# Produces a starting state by invoking the callable stored in @init
# (set by the constructor and replaceable through #init).
#
# @return [Object] whatever the @init callable returns
def initial_state
  @init.call
end
run(event_store, start: nil, count: PAGE_SIZE)
click to toggle source
# File lib/ruby_event_store/projection.rb, line 52
#
# Runs the projection against an event store and returns the resulting state.
#
# With no handlers registered nothing is read and #initial_state is returned.
# Otherwise events are reduced per stream when +streams+ is non-empty, or
# without a stream restriction when it is empty.
#
# @param event_store [Object] passed through to the reducing helpers
# @param start [Object, nil] optional starting point, validated by the helpers
# @param count [Integer] batch size; defaults to PAGE_SIZE, which is defined
#   outside this excerpt
# @return [Object] the reduced state
def run(event_store, start: nil, count: PAGE_SIZE)
  return initial_state if handled_events.empty?

  if streams.any?
    reduce_from_streams(event_store, start, count)
  else
    reduce_from_all_streams(event_store, start, count)
  end
end
when(events, handler)
click to toggle source
# File lib/ruby_event_store/projection.rb, line 30
#
# Registers +handler+ for one or more event types.
#
# Each entry is keyed by its +to_s+ form, so classes, symbols and strings all
# end up as string keys. Registering the same type again overwrites the
# previous handler.
#
# @param events [Object, Array] a single event type or a list of them
# @param handler [#call] callable later invoked with +(state, event)+
# @return [self] the projection itself, so calls can be chained
def when(events, handler)
  Array(events).each do |event_type|
    handlers.store(event_type.to_s, handler)
  end
  self
end
Private Instance Methods
read_scope(event_store, stream, count, start)
click to toggle source
# File lib/ruby_event_store/projection.rb, line 78
#
# Builds the read query used by the reducers.
#
# Starts from +event_store.read+, batches by +count+ and restricts the result
# to #handled_events. The stream and starting-point restrictions are applied
# only when the corresponding argument is truthy.
#
# @param event_store [#read] source of the query object
# @param stream [Object, nil] stream to restrict to, or nil for no restriction
# @param count [Integer] batch size passed to +in_batches+
# @param start [Object, nil] value passed to +from+ when given
# @return [Object] the composed query object
def read_scope(event_store, stream, count, start)
  base = event_store.read.in_batches(count).of_type(handled_events)
  scoped = stream ? base.stream(stream) : base
  start ? scoped.from(start) : scoped
end
reduce_from_all_streams(event_store, start, count)
click to toggle source
# File lib/ruby_event_store/projection.rb, line 73
#
# Folds every matching event, with no stream restriction, into a fresh
# #initial_state using #transition.
#
# @param event_store [Object] passed to #read_scope
# @param start [String, nil] optional starting point; checked with
#   #valid_starting_point? before anything is read
# @param count [Integer] batch size passed to #read_scope
# @return [Object] the state after all events were applied
# @raise [ArgumentError] when +start+ fails validation
def reduce_from_all_streams(event_store, start, count)
  raise ArgumentError, "Start must be valid event id" unless valid_starting_point?(start)

  # nil stream => the query is not restricted to a stream.
  read_scope(event_store, nil, count, start).reduce(initial_state) do |state, event|
    transition(state, event)
  end
end
reduce_from_streams(event_store, start, count)
click to toggle source
# File lib/ruby_event_store/projection.rb, line 64
#
# Folds events stream by stream, in the order of +streams+, carrying the
# state produced by one stream into the next.
#
# Each stream is paired positionally with its entry in +start+; when +start+
# is nil every stream is paired with nil and no starting point is applied.
#
# @param event_store [Object] passed to #read_scope
# @param start [Array, nil] optional per-stream starting points; checked with
#   #valid_starting_point? before anything is read
# @param count [Integer] batch size passed to #read_scope
# @return [Object] the state after all streams were processed
# @raise [ArgumentError] when +start+ fails validation
def reduce_from_streams(event_store, start, count)
  raise ArgumentError, "Start must be an array with event ids" unless valid_starting_point?(start)

  stream_starts = streams.zip(start_events(start))
  stream_starts.reduce(initial_state) do |state, (stream_name, start_event_id)|
    read_scope(event_store, stream_name, count, start_event_id).reduce(state) do |acc, event|
      transition(acc, event)
    end
  end
end
start_events(start)
click to toggle source
# File lib/ruby_event_store/projection.rb, line 86
#
# Normalises the optional list of starting points.
#
# @param start [Array, nil] starting points, or nil when none were given
# @return [Array] +start+ itself when truthy, otherwise a new empty array
def start_events(start)
  start || []
end
transition(state, event)
click to toggle source
# File lib/ruby_event_store/projection.rb, line 90
#
# Reducer step: applies one event to +state+ through its registered handler.
#
# The handler's return value is discarded and the same +state+ object is
# returned, so handlers are expected to mutate the state in place.
#
# @param state [Object] the accumulated state
# @param event [#event_type] the event to apply
# @return [Object] the +state+ object that was passed in
def transition(state, event)
  handler = handlers.fetch(event.event_type)
  handler.call(state, event)
  state
end
valid_starting_point?(start)
click to toggle source
# File lib/ruby_event_store/projection.rb, line 59
#
# Checks that +start+ has the shape required by the current configuration.
#
# A nil or false +start+ is always valid. Otherwise:
# * with streams configured, +start+ must be exactly an Array with one entry
#   per stream;
# * with no streams configured, +start+ must be exactly a String.
#
# +instance_of?+ is used deliberately, so subclasses of Array or String are
# rejected.
#
# @param start [Array, String, nil] the starting point to validate
# @return [Boolean] true when +start+ is acceptable
def valid_starting_point?(start)
  return true unless start

  if streams.any?
    start.instance_of?(Array) && start.size == streams.size
  else
    start.instance_of?(String)
  end
end