class Cyclop::Worker
Attributes
actions[RW]
Path to actions directory
die_after[RW]
Number of jobs to process before exiting
job_opts[RW]
Options passed to Cyclop.next
to get next job
logger[RW]
Logger for master
processed_jobs[RW]
Number of jobs processed by this worker
queues[RW]
Queues to process
sleep_interval[RW]
How long to sleep between polls when no job is available
Public Class Methods
new(config={})
click to toggle source
# File lib/cyclop/worker.rb, line 18
#
# Build a worker from a configuration hash and point Cyclop at its MongoDB
# database.
#
# config - Hash with String keys:
#   "mongo"          - Hash (required). Must contain "database". When it
#                      has "hosts", a replica-set connection is opened using
#                      "rs_name" and "read_secondary"; otherwise a single
#                      connection to "host" (default "127.0.0.1") and "port"
#                      (default 27017). A truthy "log" hands the worker's
#                      logger to the Mongo connection.
#   "queues"         - queues to process (default []).
#   "log_file"       - log destination (default $stdout).
#   "sleep_interval" - pause between polls (default 1).
#   "actions"        - path to the actions directory (default "./actions").
#   "die_after"      - number of jobs to process before exiting (optional).
#   "limit_to_host"  - stored as job_opts[:host]; the literal "localhost"
#                      is replaced by Cyclop.host.
#
# Raises ArgumentError when config["mongo"]["database"] is missing.
def initialize(config={})
  mongo = config["mongo"]
  unless mongo && mongo["database"]
    raise ArgumentError, 'mongo["database"] is required'
  end

  self.queues = config["queues"] || []
  self.logger = Logger.new(config["log_file"] || $stdout)
  self.sleep_interval = config["sleep_interval"] || 1
  self.actions = config["actions"] || "./actions"
  self.processed_jobs = 0
  self.die_after = config["die_after"]

  @job_opts = {}
  host_limit = config["limit_to_host"]
  if host_limit
    @job_opts[:host] = host_limit == "localhost" ? Cyclop.host : host_limit
  end

  # Only pass our logger down to the Mongo driver when explicitly requested.
  mongo_logger = mongo["log"] ? logger : nil

  connection =
    if mongo["hosts"]
      Mongo::ReplSetConnection.new(
        *mongo["hosts"],
        rs_name: mongo["rs_name"],
        read_secondary: !!mongo["read_secondary"],
        logger: mongo_logger
      )
    else
      Mongo::Connection.new(
        mongo["host"] || "127.0.0.1",
        mongo["port"] || 27017,
        logger: mongo_logger
      )
    end

  Cyclop.db = connection.db(mongo["database"])
end
Public Instance Methods
perform(job)
click to toggle source
Called inside forked process
Parameters:
- (Cyclop::Job) job - the job to process
# File lib/cyclop/worker.rb, line 89
#
# Run a single job: load the action files, look up the action registered
# for the job's queue and invoke it with the job's parameters.
#
# job - the job to process; must respond to queue, job_params and release!.
#
# Returns 0 on success and 1 on failure. The value is used by #run as the
# exit status of the forked child.
def perform(job)
  begin
    load_actions
    action = Cyclop::Action.find_by_queue(job.queue)
    action.perform(*job.job_params)
    0
  rescue Exception => error
    # Exception (not just StandardError) is rescued so that any failure is
    # logged, the job is released with the error, and a status is returned.
    log(error.to_s)
    job.release!(error)
    1
  end
end
run()
click to toggle source
Start processing jobs
# File lib/cyclop/worker.rb, line 50
#
# Main loop: poll for jobs and process each one in a forked child until
# stop? becomes truthy.
#
# For every job the parent forks, waits for that child, increments
# processed_jobs and then marks the job complete (child exit status 0) or
# releases it (any other status). When no job is available the worker
# sleeps for sleep_interval and logs the idle message once per idle period.
def run
  register_signal_handlers
  loop do
    if stop?
      log "Shutting down..."
      break
    end

    if (job = next_job)
      @sleeping = false
      if (@pid = fork)
        child_pid = @pid
        msg = "Forked process #{child_pid} to work on job #{job.queue}-#{job._id}..."
        log msg
        procline msg

        # Wait for this specific child and capture its status right away.
        Process.wait child_pid
        status = $?
        # The child has been reaped: forget its PID so that a later stop!
        # does not signal a stale (possibly recycled) process id.
        @pid = nil

        log "Child process #{child_pid} ended with status: #{status}"
        self.processed_jobs += 1
        if status.exitstatus == 0
          job.complete!
        else
          job.release!
        end
      else
        # Child process: run the job and exit with perform's return value.
        procline "Processing #{job.queue}-#{job._id} (started at #{Time.now.utc})"
        exit! perform job
      end
    else
      log "No more job to process, start sleeping..." unless @sleeping
      @sleeping = true
      sleep sleep_interval
    end
  end
end
stop()
click to toggle source
Graceful shutdown
# File lib/cyclop/worker.rb, line 100
#
# Request a graceful shutdown by raising the @stop flag. Nothing is
# interrupted here; the flag is only recorded.
#
# Returns true.
def stop
  @stop = true
end
stop!()
click to toggle source
Forced shutdown
# File lib/cyclop/worker.rb, line 105
#
# Forced shutdown: send TERM to the recorded child process (if any), wait
# for it, then leave immediately via exit! (no at_exit handlers are run).
#
# The recorded PID may belong to a child that has already exited and been
# reaped. In that case Process.kill raises Errno::ESRCH and Process.wait
# raises Errno::ECHILD; both are ignored so that the worker still exits
# instead of aborting the shutdown with an exception.
def stop!
  if @pid
    begin
      Process.kill "TERM", @pid
      Process.wait @pid
    rescue Errno::ESRCH, Errno::ECHILD
      # Child is already gone: nothing left to terminate or reap.
    end
  end
  exit!
end
Private Instance Methods
load_actions()
click to toggle source
# File lib/cyclop/worker.rb, line 138
#
# Require every "*.rb" file found directly inside the actions directory,
# using absolute paths.
def load_actions
  Dir.glob("#{actions}/*.rb").each do |path|
    require File.absolute_path(path)
  end
end
log(message)
click to toggle source
# File lib/cyclop/worker.rb, line 134
#
# Append a line of the form "<current time>: <message>\n" to the logger
# using its << operator.
#
# message - the text to log (interpolated into the line).
def log(message)
  timestamp = Time.now
  logger << "#{timestamp}: #{message}\n"
end
next_job()
click to toggle source
# File lib/cyclop/worker.rb, line 126
#
# Ask Cyclop for the next job, passing every configured queue followed by
# the job options hash.
#
# Returns whatever Cyclop.next returns; #run treats a falsy value as
# "no job available".
def next_job
  Cyclop.next(*queues, job_opts)
end
procline(line)
click to toggle source
# File lib/cyclop/worker.rb, line 130
#
# Set the process title ($0) to "cyclop-<VERSION>: <line>".
#
# line - status text to show after the version prefix.
def procline(line)
  title = "cyclop-#{Cyclop::VERSION}: #{line}"
  $0 = title
end
register_signal_handlers()
click to toggle source
Trap signals
QUIT - graceful shutdown; INT - graceful shutdown the first time, forced shutdown the second time; TERM - forced shutdown
# File lib/cyclop/worker.rb, line 120
#
# Install the worker's signal handlers:
#   QUIT - graceful shutdown (stop)
#   INT  - graceful shutdown the first time; forced shutdown (stop!) once
#          a stop has already been requested
#   TERM - forced shutdown (stop!)
def register_signal_handlers
  Signal.trap("QUIT") { stop }
  Signal.trap("INT") do
    if @stop
      stop!
    else
      stop
    end
  end
  Signal.trap("TERM") { stop! }
end
stop?()
click to toggle source
# File lib/cyclop/worker.rb, line 142
#
# Tell whether the worker should leave its main loop.
#
# Truthy when a stop has been requested, or when die_after is set and
# processed_jobs has reached die_after (converted with to_i).
def stop?
  return @stop if @stop

  die_after && processed_jobs >= die_after.to_i
end