class EventHub::Processor2

Processor2 class

Constants

ALL_SIGNALS
SIGNALS_FOR_RELOAD_CONFIG
SIGNALS_FOR_TERMINATION

Attributes

started_at[R]
statistics[R]

Public Class Methods

new(args = {}) click to toggle source
# File lib/eventhub/processor2.rb, line 15
def initialize(args = {})
  # Set processor name
  EventHub::Configuration.name = args[:name] ||
    get_name_from_class(self)

  # Parse comand line options
  EventHub::Configuration.parse_options

  # Load configuration file
  EventHub::Configuration.load!(args)

  @command_queue = []

  @sleeper = EventHub::Sleeper.new
  @started_at = Time.now
  @statistics = EventHub::Statistics.new
end

Public Instance Methods

after_stop() click to toggle source
# File lib/eventhub/processor2.rb, line 73
def after_stop
  # can be implemented in derived class
end
before_start() click to toggle source
# File lib/eventhub/processor2.rb, line 69
def before_start
  # can be implemented in derived class
end
handle_message(_message, _args = {}) click to toggle source

get message as EventHub::Message class instance args contain :queue_name, :content_type, :priority, :delivery_tag

# File lib/eventhub/processor2.rb, line 56
def handle_message(_message, _args = {})
  raise "need to be implemented in derived class"
end
publish(args = {}) click to toggle source

pass message as string like: '{ “header”: … , “body”: { .. }}' and optionally exchange_name: 'your exchange name'

# File lib/eventhub/processor2.rb, line 62
def publish(args = {})
  Celluloid::Actor[:actor_listener].publish(args)
rescue => error
  EventHub.logger.error("Unexpected exeption while publish: #{error}")
  raise
end
start() click to toggle source
# File lib/eventhub/processor2.rb, line 33
def start
  EventHub.logger.info("#{Configuration.name} (#{version}): has been started")

  before_start
  main_event_loop
  after_stop

  EventHub.logger.info("#{Configuration.name} (#{version}): has been stopped")
rescue => ex
  EventHub.logger.error("Unexpected error in Processor2.start: #{ex}")
end
stop() click to toggle source
# File lib/eventhub/processor2.rb, line 45
def stop
  # used by rspec
  @command_queue << :TERM
end
version() click to toggle source
# File lib/eventhub/processor2.rb, line 50
def version
  EventHub::VERSION
end

Private Instance Methods

main_event_loop() click to toggle source
# File lib/eventhub/processor2.rb, line 103
def main_event_loop
  Celluloid.boot
  setup_signal_handler
  start_supervisor

  loop do
    command = @command_queue.pop
    if SIGNALS_FOR_TERMINATION.include?(command)
      EventHub.logger.info("Command [#{command}] received")
      @sleeper.stop
      break
    elsif SIGNALS_FOR_RELOAD_CONFIG.include?(command)
      EventHub::Configuration.load!
      EventHub.logger.info("Configuration file reloaded")

      # restart listener when actor is known
      if Celluloid::Actor[:actor_listener]
        Celluloid::Actor[:actor_listener].async.restart
      else
        EventHub.logger.info("Was unable to get a valid listener actor to restart... check!!!")
      end
    else
      sleep 0.5
    end
  end

  Celluloid.shutdown
  # make sure all actors are gone
  while Celluloid.running?
    sleep 0.1
  end
end
setup_signal_handler() click to toggle source
# File lib/eventhub/processor2.rb, line 79
def setup_signal_handler
  # have a re-entrant signal handler by just using a simple array
  # https://www.sitepoint.com/the-self-pipe-trick-explained/
  ALL_SIGNALS.each do |signal|
    Signal.trap(signal) { @command_queue << signal }
  end
end
start_supervisor() click to toggle source
# File lib/eventhub/processor2.rb, line 87
def start_supervisor
  @config = Celluloid::Supervision::Configuration.define([
    {type: ActorHeartbeat, as: :actor_heartbeat, args: [self]},
    {type: ActorListener, as: :actor_listener, args: [self]}
  ])

  sleeper = @sleeper
  @config.injection!(:before_restart, proc do
    restart_in_s = Configuration.processor[:restart_in_s]
    EventHub.logger.info("Restarting in #{restart_in_s} seconds...")
    sleeper.start(restart_in_s)
  end)

  @config.deploy
end