class Message::Worker
Constants
- DEFAULT_JOB_NAME
- DEFAULT_PROCESS_INTERVAL
- DEFAULT_PROCESS_SIZE
Attributes
default_job[RW]
sync[RW]
job_name[R]
Public Class Methods
callbacks()
click to toggle source
# File lib/message/worker.rb, line 55
# Returns the callback registry, building it on first use: a Hash with the
# keys :start, :crash and :stop, each mapped to its own (initially empty)
# Array. The same Hash object is handed back on every later call.
def callbacks
  @callbacks ||= {
    start: [],
    crash: [],
    stop: []
  }
end
default()
click to toggle source
# File lib/message/worker.rb, line 39
# Builds a new worker for whatever job name is currently stored in
# default_job. A fresh instance is created on every call.
def default
  job_name = default_job
  new(job_name)
end
jobs()
click to toggle source
# File lib/message/worker.rb, line 51
# Returns the memoized job registry. When RUBY_PLATFORM contains "java" the
# registry is a java.util.concurrent.ConcurrentHashMap; on every other
# platform it is a plain Hash.
def jobs
  @jobs ||=
    if RUBY_PLATFORM =~ /java/
      java.util.concurrent.ConcurrentHashMap.new
    else
      {}
    end
end
new(job_name)
click to toggle source
# File lib/message/worker.rb, line 74
# Creates a worker bound to a single job.
#
# job_name - the name stored in @job_name (exposed through the job_name
#            reader listed under Attributes).
def initialize(job_name)
  @job_name = job_name
end
process(*args)
click to toggle source
# File lib/message/worker.rb, line 43
# Class-level shortcut: forwards every argument to #process on the worker
# returned by default and returns that call's result.
def process(*args)
  worker = default
  worker.process(*args)
end
reset()
click to toggle source
# File lib/message/worker.rb, line 59
# Clears all class-level state (default job, sync flag, job registry and
# callback registry) and then registers one logging callback per event
# (:start, :crash, :stop) on the freshly rebuilt callback registry.
#
# Every callback receives the job name as its first argument; the logging
# callbacks ignore it, hence the underscore-prefixed parameter.
def reset
  @default_job = @sync = @jobs = @callbacks = nil

  log_start = ->(_job_name, options) do
    Message.log(:info) { "[Worker] start with options #{options.inspect}" }
  end
  log_crash = ->(_job_name, e) do
    Message.log(:error) { "[Worker] crashed: #{e.message}\n#{e.backtrace.join("\n")}" }
  end
  log_stop = ->(_job_name) do
    Message.log(:info) { "[Worker] stopped" }
  end

  callbacks[:start] << log_start
  callbacks[:crash] << log_crash
  callbacks[:stop] << log_stop
end
start(*args)
click to toggle source
# File lib/message/worker.rb, line 47
# Class-level shortcut: forwards every argument to #start on the worker
# returned by default and returns that call's result.
def start(*args)
  worker = default
  worker.start(*args)
end
Public Instance Methods
enq(work)
click to toggle source
# File lib/message/worker.rb, line 108
# Submits a unit of work.
#
# When the class-level sync flag is truthy the work is executed immediately
# through process_work and its result is returned. Otherwise the work is
# serialized with YAML.dump and handed to the job's enq.
#
# work - the value passed to process_work / YAML.dump.
def enq(work)
  return process_work(work) if self.class.sync

  job.enq(YAML.dump(work))
end
Also aliased as: <<
process(size=1)
click to toggle source
# File lib/message/worker.rb, line 104
# Asks this worker's job to process messages.
#
# size - value forwarded unchanged to the job's process (defaults to 1).
def process(size = 1)
  queue_job = job
  queue_job.process(size)
end
start(options={})
click to toggle source
# File lib/message/worker.rb, line 78
# Starts a new Thread that runs work_in_thread with the given options and
# calls process(size) each time work_in_thread yields. Returns the Thread.
#
# options - Hash forwarded unchanged to work_in_thread (defaults to {}).
def start(options = {})
  Thread.start { work_in_thread(options) { |size| process(size) } }
end
work_in_thread(options) { |size| ... }
click to toggle source
# File lib/message/worker.rb, line 86
# Runs the worker loop in the calling thread.
#
# options - Hash read for three optional keys:
#           :size     - value yielded to the block on every iteration
#                       (falls back to DEFAULT_PROCESS_SIZE).
#           :interval - argument to sleep after each iteration
#                       (falls back to DEFAULT_PROCESS_INTERVAL).
#           :delay    - argument to sleep once before the first iteration
#                       (falls back to 10 + rand(20)); skipped unless > 0.
#
# Fires the :start callback with the options, then loops forever yielding
# the size. Any StandardError ends the loop and is passed to the :crash
# callback; the :stop callback fires on every exit path.
def work_in_thread(options, &block)
  batch_size = options[:size] || DEFAULT_PROCESS_SIZE
  pause = options[:interval] || DEFAULT_PROCESS_INTERVAL
  initial_delay = options[:delay] || (10 + rand(20))

  # The option lookups above stay outside the begin block on purpose: only
  # failures from here on are reported through the :crash/:stop callbacks.
  begin
    callback(:start, options)
    sleep(initial_delay) if initial_delay > 0

    loop do
      yield(batch_size)
      sleep(pause)
    end
  rescue => e
    callback(:crash, e)
  ensure
    callback(:stop)
  end
end
Private Instance Methods
callback(name, *args)
click to toggle source
# File lib/message/worker.rb, line 118
# Invokes, in registration order, every callback the class has stored under
# the given event name.
#
# name - key into the class-level callbacks registry.
# args - extra arguments passed to each callback after the job name.
def callback(name, *args)
  handlers = self.class.callbacks[name]
  handlers.each { |handler| handler.call(job_name, *args) }
end
job()
click to toggle source
# File lib/message/worker.rb, line 124
# Returns the job registered for this worker's job name in the class-level
# jobs registry. If none is stored yet, one is created with Message.job
# (using job_processor as its block) and stored under that name.
def job
  registry = self.class.jobs
  registry[@job_name] ||= Message.job(@job_name, &job_processor)
end
job_processor()
click to toggle source
# File lib/message/worker.rb, line 128
# Returns the lambda used as the job's message handler: it deserializes a
# YAML message produced by enq and passes the result to process_work.
#
# Since Psych 4 (bundled with Ruby 3.1+), YAML.load is a *safe* load and
# raises Psych::DisallowedClass for arbitrary objects, so work items
# serialized by enq with YAML.dump could no longer be read back.
# YAML.unsafe_load restores the full round-trip. Older Psych versions do
# not define it, and there YAML.load already has the permissive behaviour.
#
# NOTE(review): full YAML deserialization can instantiate arbitrary
# objects. This assumes only enq writes to the queue; confirm the queue is
# not reachable by untrusted producers.
def job_processor
  lambda do |msg|
    work = YAML.respond_to?(:unsafe_load) ? YAML.unsafe_load(msg) : YAML.load(msg)
    process_work(work)
  end
end
process_work(work)
click to toggle source
# File lib/message/worker.rb, line 134
# Executes one unit of work.
#
# work - a three-element list: receiver, method name, argument list. The
#        method is invoked on the receiver with the arguments splatted;
#        a missing/nil argument list results in a call with no arguments.
#
# Returns whatever the invoked method returns.
def process_work(work)
  receiver, method_name, arguments = work
  receiver.send(method_name, *arguments)
end