class Kinetic::Worker
Attributes
app[R]
channel[R]
exchanges[R]
id[R]
Public Class Methods
new(id, app)
click to toggle source
# File lib/kinetic/worker.rb, line 8 def initialize(id, app) @app = app logger.debug "Initializing worker #{id}" @id = id @to_io, @master = Kgio::Pipe.new.each { |io| io.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC) } end
Public Instance Methods
atfork_child()
click to toggle source
# File lib/kinetic/worker.rb, line 20 def atfork_child logger.debug "Worker #{id} closing pipe to master" @master = @master.close end
atfork_parent()
click to toggle source
# File lib/kinetic/worker.rb, line 15 def atfork_parent logger.debug "Worker #{id} closing pipe to child" @to_io = @to_io.close end
run()
click to toggle source
# File lib/kinetic/worker.rb, line 25 def run logger.debug "Establishing connection host: '#{config.host}', port: '#{config.port}'" AMQP.start(host: config.host, port: config.port) do |connection| logger.debug "AMQP started with conneciton #{connection}" initialize_channel!(connection) initialize_exchanges! initialize_subscribers! initialize_timed_events! end end
Private Instance Methods
initialize_channel!(connection)
click to toggle source
# File lib/kinetic/worker.rb, line 55 def initialize_channel!(connection) logger.debug 'Intitializing channel' @channel = AMQP::Channel.new(connection) end
initialize_exchanges!()
click to toggle source
# File lib/kinetic/worker.rb, line 60 def initialize_exchanges! logger.debug 'Initializing exchanges' @exchanges = {} prefix = config.name app.send(:exchanges).each_key do |name| logger.debug " Initializing #{name} exchange" @exchanges[name] = channel.send(name, "#{prefix}.#{name}") end end
initialize_subscribers!()
click to toggle source
# File lib/kinetic/worker.rb, line 70 def initialize_subscribers! logger.debug 'Initializing subscribers' app.send(:exchanges).each do |name, value| logger.debug " Building subscribers for '#{name}' exchange" value.each do |queue, block| logger.debug " Initializing queue for '#{queue}'" q = channel.queue(queue) logger.debug " Binding queue to '#{name}' exchange with routing_key '#{queue}'" q.bind(exchanges[name], routing_key: queue) logger.debug " Subscribing to messages for '#{queue}'" q.subscribe do |meta, payload| begin block.call(deserialize(payload)) rescue Exception => e logger.error e end end end end end
initialize_timed_events!()
click to toggle source
# File lib/kinetic/worker.rb, line 91 def initialize_timed_events! logger.debug 'Initializing timed events' app.send(:timed_events).each do |label, event| logger.debug " Creating timer for #{label}" event[:timer] = EM.add_periodic_timer(event[:interval]) { event[:block].call } end end