class Isimud::EventListener

Daemon process manager for monitoring event queues. Known EventObserver models and their instances are automatically registered upon startup. In addition, ad-hoc queues and handlers may be set up by overriding bind_event_queues() and making the appropriate subscribe calls directly.
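
As a rough illustration of the hook mechanism, the sketch below uses a stand-in class (SketchListener, hypothetical and not part of Isimud) that mirrors how bind_queues calls bind_observer_queues and then bind_event_queues. A real application would subclass Isimud::EventListener instead and make its subscribe calls inside the overridden hook.

```ruby
# Stand-in for Isimud::EventListener (hypothetical; for illustration only).
class SketchListener
  attr_reader :bound

  def initialize
    @bound = []
  end

  # Mirrors the documented order: observer queues first, then the custom hook.
  def bind_queues
    bind_observer_queues
    bind_event_queues
  end

  # Hook: does nothing unless a subclass overrides it.
  def bind_event_queues
  end

  private

  def bind_observer_queues
    @bound << :observer_queues
  end
end

# Application subclass that adds an ad-hoc queue through the hook.
class AuditListener < SketchListener
  def bind_event_queues
    # A real listener would create a queue and subscribe to it here.
    @bound << :audit_queue
  end
end

listener = AuditListener.new
listener.bind_queues
p listener.bound # => [:observer_queues, :audit_queue]
```

Because the hook runs every time bind_queues is called, custom queues are presumably re-bound whenever the event thread reinitializes after an error.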

Threads created by the daemon process

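Upon startup, EventListener operates using the following threads:

An event processing thread, which binds queues and consumes messages.
An error counter thread, which resets the error counter every error_interval seconds.
A shutdown thread, which waits for an INT or TERM signal and then wakes the main thread so that the process can exit.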

Registering Queues

All active instances of all known EventObserver classes (which are assumed to be ActiveRecord instances) are automatically loaded by the event processing thread, and their associated queues are bound. Note that queues and associated routing key bindings are established at the time the instance is created or modified. @see EventObserver.find_active_observers

Each EventListener process creates an exclusive queue for monitoring the creation, modification, and destruction of EventObserver instances, using ModelWatcher messages.
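
The naming of the per-process queue and its routing key bindings can be sketched as follows. The listener name, schema, and the Subscription class name are hypothetical placeholders; the join and interpolation patterns follow the source of observer_queue and register_observer_class.

```ruby
require 'socket'

# Hypothetical values standing in for the daemon name and Isimud.model_watcher_schema.
listener_name = 'myapp-listener'
schema        = 'myapp'

# One exclusive queue per process: name, 'listener', host name, and process ID.
queue_name = [listener_name, 'listener', Socket.gethostname, Process.pid].join('.')

# Routing key pattern bound for one observer class; the trailing wildcard
# matches any single word in the final position.
routing_key = "#{schema}.Subscription.*"

puts queue_name
puts routing_key
```

Including the host name and process ID keeps queue names distinct when several listener processes run at once.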

Error Handling

Whenever an uncaught exception is rescued from a consumer handling a message, it is logged and the error counter is incremented. The error counter is reset periodically according to the value of error_interval. If the number of errors counted since the last reset reaches error_limit, the process is terminated immediately. @see BunnyClient#subscribe()

There are certain situations that may cause a Bunny exception to occur, such as a loss of network connection. Whenever a Bunny exception is rescued in the event processing thread, the error is counted, the Bunny session is reset (closing all channels and canceling all queue consumers), and the queues are reinitialized.
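
The error accounting described in this section can be approximated with a small thread-safe counter. This is a minimal sketch under stated assumptions, not the Isimud implementation: ErrorBudget and its methods are hypothetical names, and the periodic reset is triggered by hand here rather than by a timer thread.

```ruby
# Minimal stand-in for the error accounting (hypothetical class).
class ErrorBudget
  attr_reader :count

  def initialize(limit:)
    @limit     = limit
    @count     = 0
    @exhausted = false
    @mutex     = Mutex.new
  end

  # Record one error; returns true once the limit has been reached.
  def record
    @mutex.synchronize do
      @count += 1
      @exhausted = true if @count >= @limit
      @exhausted
    end
  end

  # Called periodically (every error_interval seconds in the daemon).
  def reset
    @mutex.synchronize { @count = 0 }
  end

  def exhausted?
    @mutex.synchronize { @exhausted }
  end
end

budget = ErrorBudget.new(limit: 3)
budget.record          # 1 error
budget.record          # 2 errors
budget.reset           # interval elapsed, counter cleared
budget.record          # 1 error again
puts budget.exhausted? # false: the limit was never reached within one interval
```

The mutex matters because errors may be counted from consumer threads while the reset runs on a separate thread.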

Constants

DEFAULT_ERROR_INTERVAL
DEFAULT_ERROR_LIMIT
DEFAULT_EVENTS_EXCHANGE
DEFAULT_MODELS_EXCHANGE
STATUS_INITIALIZE
STATUS_RUNNING
STATUS_SHUTDOWN

Attributes

error_count[R]
error_interval[R]
error_limit[R]
events_exchange[R]
models_exchange[R]
name[R]
queues[R]
status[R]

Public Class Methods

new(options = {})

Initialize a new EventListener daemon instance.

@param [Hash] options daemon options
@option options [Integer] :error_limit (10) maximum number of errors that are allowed to occur within error_interval before the process terminates
@option options [Integer] :error_interval (3600) time interval, in seconds, before the error counter is cleared
@option options [String] :events_exchange ('events') name of AMQP exchange used for listening to event messages
@option options [String] :models_exchange ('models') name of AMQP exchange used for listening to EventObserver instance create, update, and destroy messages
@option options [String] :name ("#{Rails.application.class.parent_name.downcase}-listener") daemon instance name

# File lib/isimud/event_listener.rb, line 70
def initialize(options = {})
  default_options = {
      error_limit:     Isimud.listener_error_limit || DEFAULT_ERROR_LIMIT,
      error_interval:  DEFAULT_ERROR_INTERVAL,
      events_exchange: Isimud.events_exchange || DEFAULT_EVENTS_EXCHANGE,
      models_exchange: Isimud.model_watcher_exchange || DEFAULT_MODELS_EXCHANGE,
      name:            "#{Rails.application.class.parent_name.downcase}-listener"
  }
  options.reverse_merge!(default_options)
  @error_count         = 0
  @observers           = Hash.new
  @observed_models     = Set.new
  @error_limit         = options[:error_limit]
  @error_interval      = options[:error_interval]
  @events_exchange     = options[:events_exchange]
  @models_exchange     = options[:models_exchange]
  @name                = options[:name]
  @observer_mutex      = Thread::Mutex.new
  @error_counter_mutex = Thread::Mutex.new
  @status              = STATUS_INITIALIZE
end
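
The way caller-supplied options take precedence over defaults can be sketched in plain Ruby. The constant and helper below are hypothetical; the default values are the ones listed in the option documentation, and Hash#merge is used as a plain-Ruby stand-in for ActiveSupport's reverse_merge.

```ruby
# Assumed defaults, taken from the documented option values.
LISTENER_DEFAULTS = {
  error_limit:     10,
  error_interval:  3600,
  events_exchange: 'events',
  models_exchange: 'models'
}.freeze

# Caller-supplied values win; anything omitted falls back to the default.
def resolve_listener_options(options = {})
  LISTENER_DEFAULTS.merge(options)
end

opts = resolve_listener_options(error_limit: 50, name: 'billing-listener')
p opts[:error_limit]    # => 50
p opts[:error_interval] # => 3600
```

In a Rails application, the corresponding call would presumably look like Isimud::EventListener.new(error_limit: 50, name: 'billing-listener').run.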

Public Instance Methods

bind_event_queues()

Hook for setting up custom queues in your application. Override in your subclass.

# File lib/isimud/event_listener.rb, line 110
def bind_event_queues
end

bind_queues()

@private

# File lib/isimud/event_listener.rb, line 114
def bind_queues
  bind_observer_queues
  bind_event_queues
end

find_observer(klass, id)

@private

# File lib/isimud/event_listener.rb, line 125
def find_observer(klass, id)
  @observers[observer_key_for(klass, id)]
end

has_observer?(observer)

@private

# File lib/isimud/event_listener.rb, line 120
def has_observer?(observer)
  @observers.has_key?(observer_key_for(observer.class, observer.id))
end

run()

Run the daemon process. This creates the event, error counter, and shutdown threads, then suspends the main thread until shutdown is requested.

# File lib/isimud/event_listener.rb, line 93
def run
  bind_queues and return if test_env?
  start_shutdown_thread
  start_error_counter_thread
  client.on_exception do |e|
    count_error(e)
  end
  client.connect
  start_event_thread

  puts 'EventListener started. Hit Ctrl-C to exit'
  Thread.stop
  puts 'Main thread wakeup - exiting.'
  client.close
end

Private Instance Methods

bind_observer_queues()
# File lib/isimud/event_listener.rb, line 135
def bind_observer_queues
  Isimud::EventObserver.observed_models.each do |model_class|
    log "EventListener: registering observers for #{model_class}"
    register_observer_class(model_class)
    count = 0
    model_class.find_active_observers.each do |model|
      register_observer(model)
      count += 1
    end
    log "EventListener: registered #{count} observers for #{model_class}"
  end
  client.subscribe(observer_queue) do |payload|
    handle_observer_event(payload)
  end
end

client()
# File lib/isimud/event_listener.rb, line 161
def client
  Isimud.client
end

count_error(exception)
# File lib/isimud/event_listener.rb, line 196
def count_error(exception)
  @error_counter_mutex.synchronize do
    @error_count += 1
    log "EventListener#count_error count = #{@error_count} limit=#{error_limit}", :warn
    if (@error_count >= error_limit)
      log 'EventListener: too many errors, exiting', :fatal
      @status = STATUS_SHUTDOWN
      Thread.main.run unless test_env?
    end
  end
end

handle_observer_event(payload)
# File lib/isimud/event_listener.rb, line 222
def handle_observer_event(payload)
  event  = JSON.parse(payload).with_indifferent_access
  action = event[:action]
  log "EventListener: received observer model message: #{event.inspect}"
  if %w(update destroy).include?(action)
    unregister_observer(event[:type], event[:id])
  end
  if %w(create update).include?(action)
    observer = event[:type].constantize.find(event[:id])
    register_observer(observer) if observer.enable_listener?
  end
end
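
The dispatch logic in handle_observer_event can be illustrated with a simplified, self-contained sketch. The registry hash and the :consumer placeholder are hypothetical stand-ins for the listener's observer table and Bunny consumers; the database lookup and the enable_listener? check are deliberately omitted.

```ruby
require 'json'

# Stand-in registry keyed the same way as observer_key_for ("Type:id").
registry = {}

# Simplified dispatcher: update/destroy drop the old entry,
# create/update add a fresh one.
handle = lambda do |payload|
  event  = JSON.parse(payload)
  key    = [event['type'], event['id']].join(':')
  action = event['action']
  registry.delete(key) if %w(update destroy).include?(action)
  registry[key] = :consumer if %w(create update).include?(action)
end

handle.call({ type: 'Subscription', id: 7, action: 'create' }.to_json)
p registry.keys # => ["Subscription:7"]
handle.call({ type: 'Subscription', id: 7, action: 'destroy' }.to_json)
p registry.keys # => []
```

Treating update as an unregister followed by a register means a modified observer always picks up its current queue bindings.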

initializing?()
# File lib/isimud/event_listener.rb, line 165
def initializing?
  status == STATUS_INITIALIZE
end

observer_key_for(type, id)
# File lib/isimud/event_listener.rb, line 272
def observer_key_for(type, id)
  [type.to_s, id.to_s].join(':')
end

observer_queue()

Create or return the observer queue, which listens for ModelWatcher events.

# File lib/isimud/event_listener.rb, line 265
def observer_queue
  @observer_queue ||= client.create_queue([name, 'listener', Socket.gethostname, Process.pid].join('.'),
                                          models_exchange,
                                          queue_options:     {exclusive: true},
                                          subscribe_options: {manual_ack: true})
end

register_observer(observer)

Register an observer instance, and start listening for events on its associated queue. Also ensure that we are listening for observer class update events.

# File lib/isimud/event_listener.rb, line 247
def register_observer(observer)
  @observer_mutex.synchronize do
    log "EventListener: registering observer #{observer.class} #{observer.id}"
    @observers[observer_key_for(observer.class, observer.id)] = observer.observe_events(client)
  end
end

register_observer_class(observer_class)

Register the observer class watcher by binding the observer queue to the class's ModelWatcher routing key.

# File lib/isimud/event_listener.rb, line 236
def register_observer_class(observer_class)
  @observer_mutex.synchronize do
    return if @observed_models.include?(observer_class)
    @observed_models << observer_class
    log "EventListener: registering observer class #{observer_class}"
    observer_queue.bind(models_exchange, routing_key: "#{Isimud.model_watcher_schema}.#{observer_class.base_class.name}.*")
  end
end

running?()
# File lib/isimud/event_listener.rb, line 169
def running?
  status == STATUS_RUNNING
end

shutdown?()
# File lib/isimud/event_listener.rb, line 173
def shutdown?
  status == STATUS_SHUTDOWN
end

start_error_counter_thread()
# File lib/isimud/event_listener.rb, line 208
def start_error_counter_thread
  log 'EventListener: starting error counter', :info
  @error_count = 0
  Thread.new do
    while true
      sleep(error_interval)
      @error_counter_mutex.synchronize do
        log('EventListener: resetting error counter') if @error_count > 0
        @error_count = 0
      end
    end
  end
end

start_event_thread()
# File lib/isimud/event_listener.rb, line 177
def start_event_thread
  Thread.new do
    log 'EventListener: starting event_thread', :info
    until shutdown? do
      begin
        bind_queues
        log 'EventListener: event_thread bind_queues finished', :info
        @status = STATUS_RUNNING
        Thread.stop
      rescue => e
        log "EventListener: error in event thread: #{e.message}\n  #{e.backtrace.join("\n  ")}", :warn
        count_error(e)
        @observer_queue = nil
        client.reset
      end
    end
  end
end

start_shutdown_thread()
# File lib/isimud/event_listener.rb, line 151
def start_shutdown_thread
  shutdown_thread = Thread.new do
    Thread.stop # wait until we get a TERM or INT signal.
    log 'EventListener: shutdown requested.  Shutting down AMQP...', :info
    @status = STATUS_SHUTDOWN
    Thread.main.run
  end
  %w(SIGINT SIGTERM).each { |sig| trap(sig) { shutdown_thread.wakeup } }
end

test_env?()
# File lib/isimud/event_listener.rb, line 131
def test_env?
  ['cucumber', 'test'].include?(Rails.env)
end

unregister_observer(observer_class, observer_id)

Unregister an observer instance, and cancel consumption of messages. Any pre-fetched messages will be returned to the queue.

# File lib/isimud/event_listener.rb, line 255
def unregister_observer(observer_class, observer_id)
  @observer_mutex.synchronize do
    log "EventListener: un-registering observer #{observer_class} #{observer_id}"
    if (consumer = @observers.delete(observer_key_for(observer_class, observer_id)))
      consumer.cancel
    end
  end
end