class Kinetic::Worker

Attributes

app[R]
channel[R]
exchanges[R]
id[R]

Public Class Methods

new(id, app) click to toggle source
# File lib/kinetic/worker.rb, line 8
def initialize(id, app)
  @app = app
  logger.debug "Initializing worker #{id}"
  @id = id
  @to_io, @master = Kgio::Pipe.new.each { |io| io.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC) }
end

Public Instance Methods

atfork_child() click to toggle source
# File lib/kinetic/worker.rb, line 20
def atfork_child
  logger.debug "Worker #{id} closing pipe to master"
  @master = @master.close
end
atfork_parent() click to toggle source
# File lib/kinetic/worker.rb, line 15
def atfork_parent
  logger.debug "Worker #{id} closing pipe to child"
  @to_io = @to_io.close
end
run() click to toggle source
# File lib/kinetic/worker.rb, line 25
def run
  logger.debug "Establishing connection host: '#{config.host}', port: '#{config.port}'"
  AMQP.start(host: config.host, port: config.port) do |connection|
    logger.debug "AMQP started with conneciton #{connection}"
    initialize_channel!(connection)
    initialize_exchanges!
    initialize_subscribers!
    initialize_timed_events!
  end
end

Private Instance Methods

initialize_channel!(connection) click to toggle source
# File lib/kinetic/worker.rb, line 55
def initialize_channel!(connection)
  logger.debug 'Intitializing channel'
  @channel = AMQP::Channel.new(connection)
end
initialize_exchanges!() click to toggle source
# File lib/kinetic/worker.rb, line 60
def initialize_exchanges!
  logger.debug 'Initializing exchanges'
  @exchanges = {}
  prefix = config.name
  app.send(:exchanges).each_key do |name|
    logger.debug "   Initializing #{name} exchange"
    @exchanges[name] = channel.send(name, "#{prefix}.#{name}")
  end
end
initialize_subscribers!() click to toggle source
# File lib/kinetic/worker.rb, line 70
def initialize_subscribers!
  logger.debug 'Initializing subscribers'
  app.send(:exchanges).each do |name, value|
    logger.debug "   Building subscribers for '#{name}' exchange"
    value.each do |queue, block|
      logger.debug "   Initializing queue for '#{queue}'"
      q = channel.queue(queue)
      logger.debug "   Binding queue to '#{name}' exchange with routing_key '#{queue}'"
      q.bind(exchanges[name], routing_key: queue)
      logger.debug "   Subscribing to messages for '#{queue}'"
      q.subscribe do |meta, payload|
        begin
          block.call(deserialize(payload))
        rescue Exception => e
          logger.error e
        end
      end
    end
  end
end
initialize_timed_events!() click to toggle source
# File lib/kinetic/worker.rb, line 91
def initialize_timed_events!
  logger.debug 'Initializing timed events'
  app.send(:timed_events).each do |label, event|
    logger.debug "   Creating timer for #{label}"
    event[:timer] = EM.add_periodic_timer(event[:interval]) { event[:block].call }
  end
end