class EventHub::Processor2
Processor2
class
Constants
- ALL_SIGNALS
- SIGNALS_FOR_RELOAD_CONFIG
- SIGNALS_FOR_TERMINATION
Attributes
started_at[R]
statistics[R]
Public Class Methods
new(args = {})
click to toggle source
# File lib/eventhub/processor2.rb, line 15 def initialize(args = {}) # Set processor name EventHub::Configuration.name = args[:name] || get_name_from_class(self) # Parse comand line options EventHub::Configuration.parse_options # Load configuration file EventHub::Configuration.load!(args) @command_queue = [] @sleeper = EventHub::Sleeper.new @started_at = Time.now @statistics = EventHub::Statistics.new end
Public Instance Methods
after_stop()
click to toggle source
# File lib/eventhub/processor2.rb, line 73 def after_stop # can be implemented in derived class end
before_start()
click to toggle source
# File lib/eventhub/processor2.rb, line 69 def before_start # can be implemented in derived class end
handle_message(_message, _args = {})
click to toggle source
get message as EventHub::Message
class instance args contain :queue_name, :content_type, :priority, :delivery_tag
# File lib/eventhub/processor2.rb, line 56 def handle_message(_message, _args = {}) raise "need to be implemented in derived class" end
publish(args = {})
click to toggle source
pass message as string like: '{ “header”: … , “body”: { .. }}' and optionally exchange_name: 'your exchange name'
# File lib/eventhub/processor2.rb, line 62 def publish(args = {}) Celluloid::Actor[:actor_listener].publish(args) rescue => error EventHub.logger.error("Unexpected exeption while publish: #{error}") raise end
start()
click to toggle source
# File lib/eventhub/processor2.rb, line 33 def start EventHub.logger.info("#{Configuration.name} (#{version}): has been started") before_start main_event_loop after_stop EventHub.logger.info("#{Configuration.name} (#{version}): has been stopped") rescue => ex EventHub.logger.error("Unexpected error in Processor2.start: #{ex}") end
stop()
click to toggle source
# File lib/eventhub/processor2.rb, line 45 def stop # used by rspec @command_queue << :TERM end
version()
click to toggle source
# File lib/eventhub/processor2.rb, line 50 def version EventHub::VERSION end
Private Instance Methods
main_event_loop()
click to toggle source
# File lib/eventhub/processor2.rb, line 103 def main_event_loop Celluloid.boot setup_signal_handler start_supervisor loop do command = @command_queue.pop if SIGNALS_FOR_TERMINATION.include?(command) EventHub.logger.info("Command [#{command}] received") @sleeper.stop break elsif SIGNALS_FOR_RELOAD_CONFIG.include?(command) EventHub::Configuration.load! EventHub.logger.info("Configuration file reloaded") # restart listener when actor is known if Celluloid::Actor[:actor_listener] Celluloid::Actor[:actor_listener].async.restart else EventHub.logger.info("Was unable to get a valid listener actor to restart... check!!!") end else sleep 0.5 end end Celluloid.shutdown # make sure all actors are gone while Celluloid.running? sleep 0.1 end end
setup_signal_handler()
click to toggle source
# File lib/eventhub/processor2.rb, line 79 def setup_signal_handler # have a re-entrant signal handler by just using a simple array # https://www.sitepoint.com/the-self-pipe-trick-explained/ ALL_SIGNALS.each do |signal| Signal.trap(signal) { @command_queue << signal } end end
start_supervisor()
click to toggle source
# File lib/eventhub/processor2.rb, line 87 def start_supervisor @config = Celluloid::Supervision::Configuration.define([ {type: ActorHeartbeat, as: :actor_heartbeat, args: [self]}, {type: ActorListener, as: :actor_listener, args: [self]} ]) sleeper = @sleeper @config.injection!(:before_restart, proc do restart_in_s = Configuration.processor[:restart_in_s] EventHub.logger.info("Restarting in #{restart_in_s} seconds...") sleeper.start(restart_in_s) end) @config.deploy end