class Cyclop::Worker

Attributes

actions[RW]

Path to actions directory

die_after[RW]

Number of jobs to process before exiting

job_opts[RW]

Options passed to Cyclop.next to get next job

logger[RW]

Logger used by this worker

processed_jobs[RW]

Number of jobs processed by this worker

queues[RW]

Queues to process

sleep_interval[RW]

How long to sleep (in seconds) between polls when no job is available

Public Class Methods

new(config={}) click to toggle source
# File lib/cyclop/worker.rb, line 18
# Builds a worker from a configuration hash with string keys and connects
# Cyclop to MongoDB.
#
# Recognised keys:
#   "mongo"          - required hash; "database" is mandatory. When "hosts" is
#                      present a replica-set connection is opened (using
#                      "rs_name" and "read_secondary"), otherwise a single
#                      connection to "host" (default "127.0.0.1") and "port"
#                      (default 27017). A truthy "log" hands the worker's
#                      logger to the Mongo driver.
#   "queues"         - queues to process (default: []).
#   "log_file"       - log destination (default: $stdout).
#   "sleep_interval" - pause between polls (default: 1).
#   "actions"        - path to the actions directory (default: "./actions").
#   "die_after"      - number of jobs to process before exiting (optional).
#   "limit_to_host"  - stored as job_opts[:host]; the value "localhost" is
#                      replaced by Cyclop.host.
#
# Raises ArgumentError when config["mongo"]["database"] is missing.
# Side effect: assigns Cyclop.db.
def initialize(config={})
  mongo = config["mongo"]
  raise ArgumentError, 'mongo["database"] is required' unless mongo && mongo["database"]

  self.queues = config["queues"] || []
  self.logger = Logger.new(config["log_file"] || $stdout)
  self.sleep_interval = config["sleep_interval"] || 1
  self.actions = config["actions"] || "./actions"
  self.processed_jobs = 0
  self.die_after = config["die_after"]

  @job_opts = {}
  if (host = config["limit_to_host"])
    @job_opts[:host] = host == "localhost" ? Cyclop.host : host
  end

  # The Mongo driver only gets a logger when mongo["log"] is truthy.
  mongo_logger = mongo["log"] ? logger : nil
  connection =
    if (hosts = mongo["hosts"])
      Mongo::ReplSetConnection.new(
        *hosts,
        rs_name: mongo["rs_name"],
        read_secondary: !!mongo["read_secondary"],
        logger: mongo_logger
      )
    else
      Mongo::Connection.new(
        mongo["host"] || "127.0.0.1",
        mongo["port"] || 27017,
        logger: mongo_logger
      )
    end

  Cyclop.db = connection.db(mongo["database"])
end

Public Instance Methods

perform(job) click to toggle source

Called inside forked process

Parameters: job - the job to process

# File lib/cyclop/worker.rb, line 89
# Runs a single job: loads the action files, looks up the action registered
# for the job's queue and calls it with the job's parameters.
#
# Returns 0 on success and 1 on failure; #run passes this value to exit! in
# the forked child. Exception (not only StandardError) is rescued, so any
# failure raised while loading or performing is logged, handed to
# job.release! and reported as status 1.
def perform(job)
  begin
    load_actions
    action = Cyclop::Action.find_by_queue(job.queue)
    action.perform(*job.job_params)
    status = 0
  rescue Exception => failure
    log failure.to_s
    job.release! failure
    status = 1
  end
  status
end
run() click to toggle source

Start processing jobs

# File lib/cyclop/worker.rb, line 50
# Main loop: installs the signal handlers, then repeatedly fetches the next
# job and processes it in a forked child until #stop? becomes true.
#
# Parent side: waits for the child, bumps processed_jobs, then marks the job
# complete when the child exited with status 0 and releases it otherwise
# (including when the child was killed by a signal).
# Child side: sets the process title and exits immediately with the status
# returned by #perform.
#
# When no job is available the worker sleeps for sleep_interval and polls
# again; the "start sleeping" message is logged once per idle period.
#
# NOTE(review): #perform already calls job.release! on failure in the child,
# and the parent releases again on a non-zero status - confirm that a double
# release is harmless.
def run
  register_signal_handlers
  loop do
    if stop?
      log "Shutting down..."
      break
    end
    if job = next_job
      @sleeping = false
      if @pid = fork
        msg = "Forked process #{@pid} to work on job #{job.queue}-#{job._id}..."
        log msg
        procline msg
        # Wait for the child we just forked rather than for any child.
        Process.wait @pid
        status = $?
        log "Child process #{@pid} ended with status: #{status}"
        # The child has been reaped: forget its pid so a later forced shutdown
        # (#stop!) cannot signal a dead or recycled process id.
        @pid = nil
        self.processed_jobs += 1
        if status.exitstatus == 0
          job.complete!
        else
          job.release!
        end
      else
        procline "Processing #{job.queue}-#{job._id} (started at #{Time.now.utc})"
        exit! perform job
      end
    else
      log "No more job to process, start sleeping..." unless @sleeping
      @sleeping = true
      sleep sleep_interval
    end
  end
end
stop() click to toggle source

Graceful shutdown

# File lib/cyclop/worker.rb, line 100
# Requests a graceful shutdown by setting the @stop flag; #stop? reports it
# and the #run loop checks #stop? before fetching another job.
def stop
  @stop = true
end
stop!() click to toggle source

Forced shutdown

# File lib/cyclop/worker.rb, line 105
# Forced shutdown: sends TERM to the current child (if a pid is recorded),
# waits for it, then terminates this process immediately via exit!.
#
# The recorded pid may refer to a child that has already exited and been
# reaped. In that case Process.kill raises Errno::ESRCH or Process.wait raises
# Errno::ECHILD; both are ignored so the worker still exits instead of dying
# with an unhandled exception inside a signal handler.
def stop!
  if @pid
    begin
      Process.kill "TERM", @pid
      Process.wait @pid
    rescue Errno::ESRCH, Errno::ECHILD
      # Child is already gone; nothing left to terminate or reap.
    end
  end
  exit!
end

Private Instance Methods

load_actions() click to toggle source
# File lib/cyclop/worker.rb, line 138
# Requires every "*.rb" file found directly inside the actions directory,
# using absolute paths. Returns the list of matched paths.
def load_actions
  pattern = "#{actions}/*.rb"
  Dir.glob(pattern).each do |path|
    require File.absolute_path(path)
  end
end
log(message) click to toggle source
# File lib/cyclop/worker.rb, line 134
# Appends one line to the logger, prefixed with the current time.
# The line is written with <<, i.e. exactly as built here.
def log(message)
  timestamp = Time.now
  entry = "#{timestamp}: #{message}\n"
  logger << entry
end
next_job() click to toggle source
# File lib/cyclop/worker.rb, line 126
# Asks Cyclop for the next job, passing the configured queues followed by
# the job options hash. Returns whatever Cyclop.next returns.
def next_job
  Cyclop.next(*queues, job_opts)
end
procline(line) click to toggle source
# File lib/cyclop/worker.rb, line 130
# Sets the process title to "cyclop-<version>: <line>" so the worker's current
# activity is visible in process listings.
def procline(line)
  $PROGRAM_NAME = "cyclop-#{Cyclop::VERSION}: #{line}"
end
register_signal_handlers() click to toggle source

Trap signals

QUIT - graceful shutdown; INT - graceful shutdown the first time, forced shutdown the second time; TERM - forced shutdown

# File lib/cyclop/worker.rb, line 120
# Installs the worker's signal handlers:
#   QUIT - graceful shutdown (#stop)
#   INT  - graceful shutdown, or forced shutdown (#stop!) when a stop has
#          already been requested
#   TERM - forced shutdown (#stop!)
def register_signal_handlers
  Signal.trap("QUIT") { stop }
  Signal.trap("INT") do
    if @stop
      stop!
    else
      stop
    end
  end
  Signal.trap("TERM") { stop! }
end
stop?() click to toggle source
# File lib/cyclop/worker.rb, line 142
# Tells the run loop whether to quit: truthy once a stop has been requested,
# or once die_after is set and processed_jobs has reached it (die_after is
# converted with to_i before comparing).
def stop?
  return @stop if @stop

  limit = die_after
  limit && processed_jobs >= limit.to_i
end