class EsReadModel::Subscriber

Attributes

state[R]
status[R]

Public Class Methods

new(options) click to toggle source
# File lib/es_readmodel/subscriber.rb, line 10
def initialize(options)
  @listener = options[:listener]
  @initial_state = options[:initial]
  url = options[:es_url]
  @status = {
    available: false,
    startedAt: Time.now,
    eventsReceived: 0,
    eventStore: {
      url: url,
      connected: true,
      disconnects: 0
    }
  }
  @connection = Connection.new(url, options[:es_username], options[:es_password])
  @reducer = options[:reducer]
end

Public Instance Methods

subscribe() click to toggle source
# File lib/es_readmodel/subscriber.rb, line 28
def subscribe
  loop do
    begin
      @status[:available] = false
      @status[:eventStore][:connected] = false
      @state = @initial_state
      @stream = Stream.open("$all", @connection, @listener)
      @status[:eventStore][:connected] = true
      @status[:eventStore][:lastConnect] = Time.now
      subscribe_to_all_events
    rescue Exception => ex
      @listener.call({
        level: 'error',
        tag:   'connection.error',
        msg:   "#{ex.class}: #{ex.message}"
      })
      @status[:eventStore][:disconnects] = @status[:eventStore][:disconnects] + 1
      @status[:eventStore][:lastDisconnect] = Time.now
    end
  end
end

Private Instance Methods

subscribe_to_all_events() click to toggle source
# File lib/es_readmodel/subscriber.rb, line 52
def subscribe_to_all_events
  loop do
    @status[:available] = true
    @stream.wait_for_new_events
    @status[:available] = false
    num_events_processed = 0
    @stream.each_event do |evt|
      begin
        @state = @reducer.call(@state, evt)
      rescue Exception => ex
        @listener.call({
          level: 'error',
          tag:   'reducer.error',
          msg:   "Error in reducer: #{ex.class}: #{ex.message}. Read model state not updated.",
          event: evt.type
        })
      end
      @status[:eventsReceived] = @status[:eventsReceived] + 1
      num_events_processed += 1
    end
    @listener.call({
      level: 'info',
      tag:   'subscription.caughtUp',
      msg:   "Subscription to $all caught up",
      eventsProcessed: num_events_processed
    })
  end
end