class QPush::Server::Worker

The Worker manages our actions: Queue, Delay, Perform and Heartbeat. Each of these actions is allotted a number of threads. Each action object maintains control of its threads through the aptly named start and shutdown methods.
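
The start/shutdown contract described above can be sketched as follows. SketchAction is a hypothetical class, not part of QPush, and the real actions may be implemented quite differently; the sketch only assumes that start loops until shutdown is called.

```ruby
# Hypothetical action: not part of QPush. It only illustrates the
# start/shutdown contract described above.
class SketchAction
  attr_reader :ticks

  def initialize
    @done = false
    @ticks = 0
  end

  # Loops until shutdown has been requested.
  def start
    until @done
      @ticks += 1
      sleep 0.01
    end
  end

  # Asks the loop in #start to stop.
  def shutdown
    @done = true
  end
end

action = SketchAction.new
thread = Thread.new { action.start } # run the action on its own thread
sleep 0.05                           # let it work briefly
action.shutdown                      # request a stop
thread.join                          # wait for the loop to exit
```

Stopping through a flag lets the loop finish its current iteration before the thread ends, rather than being cut off mid-step.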

Attributes

config[R]
id[R]
pid[R]

Public Class Methods

new(id, config)
# File lib/qpush/server/worker.rb, line 24
def initialize(id, config)
  @id = id
  @pid = Process.pid
  @config = config
  @actions = []
  @threads = []
  at_exit { shutdown }
end

Public Instance Methods

shutdown()

Shuts down our worker as well as its threads.

# File lib/qpush/server/worker.rb, line 45
def shutdown
  shutdown_message
  @actions.each(&:shutdown)
  @threads.each(&:exit)
end

start()

Starts our new worker.

# File lib/qpush/server/worker.rb, line 35
def start
  assign_globals
  register_space
  start_message
  build_actions
  start_threads
end

Private Instance Methods

assign_globals()

Assign the globals that are required for our worker to function.

# File lib/qpush/server/worker.rb, line 55
def assign_globals
  Server.keys = Server.build_keys(@config.namespace, @config.priorities)
  Server.worker = self
end

base_actions()
# File lib/qpush/server/worker.rb, line 86
def base_actions
  [
    { klass: Perform, count: @config.perform_threads },
    { klass: Queue, count: @config.queue_threads },
    { klass: Delay, count: @config.delay_threads },
    { klass: Heartbeat, count: 1 }
  ]
end

build_actions()

Instantiates our Queue, Perform, Delay and Heartbeat objects based on the number of threads specified for each action type. We store these objects as an array in @actions.

# File lib/qpush/server/worker.rb, line 78
def build_actions
  base_actions.each do |action|
    action[:count].times do
      @actions << action[:klass].new
    end
  end
end
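
The expansion from a count table to a flat list of objects can be tried in isolation. In this sketch, SketchPerform and SketchHeartbeat are stand-in classes and the counts are made-up values in place of the ones read from @config; none of these names come from QPush.

```ruby
# Stand-in classes; the real Perform, Queue, Delay and Heartbeat
# live in QPush::Server and are not reproduced here.
class SketchPerform; end
class SketchHeartbeat; end

# Hypothetical thread counts, standing in for values read from @config.
spec = [
  { klass: SketchPerform, count: 3 },
  { klass: SketchHeartbeat, count: 1 }
]

# Build one instance per configured thread, flattened into a single array.
actions = spec.flat_map do |entry|
  Array.new(entry[:count]) { entry[:klass].new }
end

# Tally instances per class to check the expansion.
counts = actions.group_by(&:class).transform_values(&:size)
```

Here flat_map plays the role of the nested each/times loops: both produce one object per requested thread, in the order the table lists them.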

register_space()

Registers our worker's namespace in Redis.

# File lib/qpush/server/worker.rb, line 62
def register_space
  Server.redis do |c|
    c.sadd("#{QPush::Base::KEY}:namespaces", @config.namespace)
  end
end

shutdown_message()

Logs information about the shutdown process.

# File lib/qpush/server/worker.rb, line 107
def shutdown_message
  Server.log.info("* Worker #{@id} shutdown | pid: #{@pid}")
end

start_message()

Logs information about the start process.

# File lib/qpush/server/worker.rb, line 70
def start_message
  Server.log.info("* Worker #{@id} started | pid: #{@pid} | namespace: #{@config.namespace}")
end

start_threads()

Creates a thread for each of the action objects and starts the action inside it. We then join every thread, so the calling thread blocks until all of them have finished.

# File lib/qpush/server/worker.rb, line 98
def start_threads
  @actions.each do |action|
    @threads << Thread.new { action.start }
  end
  @threads.map(&:join)
end
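
The thread-per-object pattern and the blocking effect of join can be seen in a small standalone sketch. The jobs array and the results queue are hypothetical stand-ins for the action objects. Queue here is Ruby's built-in thread-safe queue, not the QPush action of the same name.

```ruby
results = Queue.new # Ruby's built-in thread-safe queue

# Hypothetical jobs standing in for the action objects.
jobs = [1, 2, 3]

# One thread per job, as start_threads creates one thread per action.
threads = jobs.map do |n|
  Thread.new { results << n * 10 }
end

# join blocks the calling thread until each thread has finished.
threads.each(&:join)

# Drain the queue once every thread is done.
collected = []
collected << results.pop until results.empty?
```

If an action's start method loops until shutdown is called, the join calls keep the calling thread blocked for as long as the worker runs.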