Class: Isimud::EventListener

Inherits:
Object
Includes:
Logging
Defined in:
lib/isimud/event_listener.rb

Overview

Daemon process manager for monitoring event queues. Known EventObserver models and their instances are automatically registered upon startup. In addition, ad-hoc event handling may be set up by extending bind_queues() and making the appropriate subscribe calls directly.
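The hook structure described above can be sketched as follows. This is a minimal, self-contained illustration, not the Isimud implementation: SketchListener is a hypothetical stand-in that mirrors only the bind_queues / bind_event_queues methods listed under Instance Method Details, and OrderListener and the queue name 'orders.created' are invented for the example.

```ruby
# Hypothetical stand-in for Isimud::EventListener, reduced to the hook
# methods documented on this page. It does not talk to a message broker.
class SketchListener
  def bind_observer_queues
    # The real class binds queues for known EventObserver instances here.
  end

  # Hook: does nothing by default, meant to be overridden in a subclass.
  def bind_event_queues
  end

  def bind_queues
    bind_observer_queues
    bind_event_queues
  end
end

# Hypothetical application subclass that sets up one ad-hoc queue.
class OrderListener < SketchListener
  attr_reader :bound

  def bind_event_queues
    # A real override would make subscribe calls on the AMQP client here;
    # this sketch only records the (made-up) queue name.
    (@bound ||= []) << 'orders.created'
  end
end

listener = OrderListener.new
listener.bind_queues
```

Overriding bind_event_queues rather than bind_queues itself keeps the automatic observer binding intact, which appears to be the reason the class exposes a separate hook.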

Threads created by the daemon process

Upon startup, EventListener operates using the following threads:

  • An event processing thread that establishes consumers for message queues

  • An error counter thread that manages the error counter

  • A shutdown thread that listens for INT or TERM signals, which will trigger a graceful shutdown.

  • The main thread is put to sleep until a shutdown is required.
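The division of labor between these threads can be sketched with plain Ruby threads. This is an assumption-laden illustration rather than the library's code: a Queue stands in for the wakeup mechanism, a short timer stands in for the INT/TERM signal, and every variable name is hypothetical.

```ruby
# Sketch only: hypothetical names, no AMQP connection, no signal handling.
shutdown = Queue.new # the main thread blocks on this until shutdown is required
log      = Queue.new # thread-safe record of what happened

event_thread = Thread.new do
  log << :event_thread_started
  sleep # stands in for consuming messages until the thread is killed
end

shutdown_thread = Thread.new do
  # A real daemon would wait for INT or TERM here; the sketch uses a timer.
  sleep 0.05
  log << :shutdown_requested
  shutdown << :stop
end

shutdown.pop      # main thread sleeps until the shutdown thread signals it
event_thread.kill # stop consuming
[event_thread, shutdown_thread].each(&:join)
log << :exited

steps = []
steps << log.pop until log.empty?
```

The run method shown later on this page uses Thread.stop to park the main thread; the Queue here is just a convenient substitute for a self-contained example.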

Registering Queues

All active instances of all known EventObserver classes (which are assumed to be ActiveRecord instances) are automatically loaded by the event processing thread, and their associated queues are bound. Note that queues and associated routing key bindings are established at the time the instance is created or modified. Each EventListener process creates an exclusive queue for monitoring the creation, modification, and destruction of EventObserver instances, using ModelWatcher messages.

Error Handling

Whenever an uncaught exception is rescued from a consumer handling a message, it is logged and the error counter is incremented. The error counter is reset periodically according to the value of error_interval. If the total number of errors logged exceeds error_limit, the process is terminated immediately. Certain situations, such as a loss of network connection, may cause a Bunny exception to occur. Whenever a Bunny exception is rescued in the event processing thread, the error is counted, the Bunny session and all of its channels are closed (canceling all queue consumers), and the queues are reinitialized.
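The counting rule described above can be sketched as a small thread-safe counter. This is a hypothetical illustration, not the EventListener source: the class name ErrorCounterSketch and the on_limit callback are assumptions, and a callback is used in place of actually terminating the process.

```ruby
# Hypothetical sketch of the error-limit rule; not the Isimud implementation.
class ErrorCounterSketch
  attr_reader :count

  def initialize(limit:, on_limit:)
    @limit    = limit
    @on_limit = on_limit # called in place of terminating the process
    @count    = 0
    @mutex    = Mutex.new
  end

  # Count one error; invoke the callback once the count exceeds the limit.
  def count_error(exception)
    exceeded = @mutex.synchronize do
      @count += 1
      @count > @limit
    end
    @on_limit.call(exception) if exceeded
    exceeded
  end

  # A timer thread would call this once every error_interval seconds.
  def reset
    @mutex.synchronize { @count = 0 }
  end
end

terminated = []
counter = ErrorCounterSketch.new(limit: 2, on_limit: ->(e) { terminated << e.message })
3.times { |i| counter.count_error(RuntimeError.new("failure #{i}")) }
```

With a limit of 2, the first two errors are tolerated and the third triggers the callback, matching the "exceeds error_limit" wording above.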

Constant Summary

DEFAULT_ERROR_LIMIT = 10
DEFAULT_ERROR_INTERVAL = 3600
DEFAULT_EVENTS_EXCHANGE = 'events'
DEFAULT_MODELS_EXCHANGE = 'models'
STATUS_INITIALIZE = :initialize
STATUS_RUNNING = :running
STATUS_SHUTDOWN = :shutdown

Methods included from Logging

#log, #logger

Constructor Details

- (EventListener) initialize(options = {})

Initialize a new EventListener daemon instance.

Parameters:

  • options (Hash) (defaults to: {})

    daemon options

Options Hash (options):

  • :error_limit (Integer) — default: 10

    maximum number of errors that are allowed to occur within error_interval before the process terminates

  • :error_interval (Integer) — default: 3600

    time interval, in seconds, before the error counter is cleared

  • :events_exchange (String) — default: 'events'

    name of AMQP exchange used for listening to event messages

  • :models_exchange (String) — default: 'models'

    name of AMQP exchange used for listening to EventObserver instance create, update, and destroy messages

  • :name (String) — default: "#{Rails.application.class.parent_name.downcase}-listener"

    daemon instance name.



# File 'lib/isimud/event_listener.rb', line 70

def initialize(options = {})
  default_options = {
      error_limit:     Isimud.listener_error_limit || DEFAULT_ERROR_LIMIT,
      error_interval:  DEFAULT_ERROR_INTERVAL,
      events_exchange: Isimud.events_exchange || DEFAULT_EVENTS_EXCHANGE,
      models_exchange: Isimud.model_watcher_exchange || DEFAULT_MODELS_EXCHANGE,
      name:            "#{Rails.application.class.parent_name.downcase}-listener"
  }
  options.reverse_merge!(default_options)
  @error_count         = 0
  @observers           = Hash.new
  @observed_models     = Set.new
  @error_limit         = options[:error_limit]
  @error_interval      = options[:error_interval]
  @events_exchange     = options[:events_exchange]
  @models_exchange     = options[:models_exchange]
  @name                = options[:name]
  @observer_mutex      = Mutex.new
  @error_counter_mutex = Mutex.new
  @status              = STATUS_INITIALIZE
end
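The constructor fills in missing options with reverse_merge!, which comes from ActiveSupport. The precedence it produces can be sketched in plain Ruby with Hash#merge. The constant SKETCH_DEFAULTS and the method resolve_options are hypothetical names for this illustration, and the sketch uses only the constant defaults, whereas the source above first consults Isimud.listener_error_limit, Isimud.events_exchange, and Isimud.model_watcher_exchange.

```ruby
# Hypothetical sketch of default-option resolution without ActiveSupport.
SKETCH_DEFAULTS = {
  error_limit:     10,
  error_interval:  3600,
  events_exchange: 'events',
  models_exchange: 'models'
}.freeze

# Caller-supplied values win; defaults fill in whatever is missing.
def resolve_options(options = {})
  SKETCH_DEFAULTS.merge(options)
end

resolved = resolve_options(error_limit: 25, name: 'billing-listener')
```

Here resolved keeps the caller's error_limit and name while picking up the remaining defaults, which is the same outcome options.reverse_merge!(default_options) gives in the constructor.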

Instance Attribute Details

- (Object) error_count (readonly)

Returns the value of attribute error_count



# File 'lib/isimud/event_listener.rb', line 49

def error_count
  @error_count
end

- (Object) error_interval (readonly)

Returns the value of attribute error_interval



# File 'lib/isimud/event_listener.rb', line 49

def error_interval
  @error_interval
end

- (Object) error_limit (readonly)

Returns the value of attribute error_limit



# File 'lib/isimud/event_listener.rb', line 49

def error_limit
  @error_limit
end

- (Object) events_exchange (readonly)

Returns the value of attribute events_exchange



# File 'lib/isimud/event_listener.rb', line 49

def events_exchange
  @events_exchange
end

- (Object) models_exchange (readonly)

Returns the value of attribute models_exchange



# File 'lib/isimud/event_listener.rb', line 49

def models_exchange
  @models_exchange
end

- (Object) name (readonly)

Returns the value of attribute name



# File 'lib/isimud/event_listener.rb', line 49

def name
  @name
end

- (Object) queues (readonly)

Returns the value of attribute queues



# File 'lib/isimud/event_listener.rb', line 49

def queues
  @queues
end

- (Object) status (readonly)

Returns the value of attribute status



# File 'lib/isimud/event_listener.rb', line 49

def status
  @status
end

Instance Method Details

- (Object) bind_event_queues

Hook for setting up custom queues in your application. Override in your subclass.



# File 'lib/isimud/event_listener.rb', line 110

def bind_event_queues
end

- (Object) bind_queues



# File 'lib/isimud/event_listener.rb', line 114

def bind_queues
  bind_observer_queues
  bind_event_queues
end

- (Boolean) has_observer?(observer)

Returns:

  • (Boolean)


# File 'lib/isimud/event_listener.rb', line 120

def has_observer?(observer)
  @observers.has_key?(observer_key_for(observer.class, observer.id))
end

- (Object) run

Run the daemon process. This creates the event, error counter, and shutdown threads, then puts the main thread to sleep until shutdown.



# File 'lib/isimud/event_listener.rb', line 93

def run
  bind_queues and return if test_env?
  start_shutdown_thread
  start_error_counter_thread
  client.on_exception do |e|
    count_error(e)
  end
  client.connect
  start_event_thread

  puts 'EventListener started. Hit Ctrl-C to exit'
  Thread.stop
  puts 'Main thread wakeup - exiting.'
  client.close
end