class Message::Worker

Constants

DEFAULT_JOB_NAME
DEFAULT_PROCESS_INTERVAL
DEFAULT_PROCESS_SIZE

Attributes

default_job[RW]
sync[RW]
job_name[R]

Public Class Methods

callbacks() click to toggle source
# File lib/message/worker.rb, line 55
# Lifecycle hook registry, keyed by event name (:start, :crash, :stop).
#
# The Hash is created on first access and memoized in @callbacks, so hooks
# appended to any of the per-event arrays persist across calls.
#
# @return [Hash{Symbol => Array}] the shared registry
def callbacks
  return @callbacks if @callbacks

  @callbacks = {:start => [], :crash => [], :stop => []}
end
default() click to toggle source
# File lib/message/worker.rb, line 39
# Builds a new worker for whatever job name is currently held in the
# default_job attribute.
#
# @return [Message::Worker] a fresh worker instance (not memoized)
def default
  name = default_job
  new(name)
end
jobs() click to toggle source
# File lib/message/worker.rb, line 51
# Memoized registry that maps a job name to its job object.
#
# When RUBY_PLATFORM matches /java/ the registry is a
# java.util.concurrent.ConcurrentHashMap; otherwise it is a plain Hash.
#
# @return [Hash, java.util.concurrent.ConcurrentHashMap]
def jobs
  return @jobs if @jobs

  @jobs =
    if RUBY_PLATFORM =~ /java/
      java.util.concurrent.ConcurrentHashMap.new
    else
      {}
    end
end
new(job_name) click to toggle source
# File lib/message/worker.rb, line 74
# Creates a worker bound to a single named job.
#
# @param job_name [Object] identifier stored in @job_name; #job later uses it
#   as the key into the class-level jobs registry and passes it to Message.job
def initialize(job_name)
  @job_name = job_name
end
process(*args) click to toggle source
# File lib/message/worker.rb, line 43
# Class-level shortcut: forwards every argument to #process on the worker
# returned by .default.
#
# @param args [Array] passed through unchanged
# @return whatever the default worker's #process returns
def process(*args)
  worker = default
  worker.process(*args)
end
reset() click to toggle source
# File lib/message/worker.rb, line 59
# Returns the class-level state to its initial condition.
#
# Clears default_job, sync, the jobs registry and the callbacks registry,
# then installs one logging hook per lifecycle event (:start, :crash, :stop).
# Each hook receives the job name first, followed by the event's own
# arguments, and writes through Message.log.
#
# @return [Array] the :stop hook list (value of the last append)
def reset
  @default_job = @sync = @jobs = @callbacks = nil

  hooks = callbacks

  log_start = lambda { |job_name, options| Message.log(:info) { "[Worker] start with options #{options.inspect}" } }
  hooks[:start] << log_start

  log_crash = lambda { |job_name, e| Message.log(:error) { "[Worker] crashed: #{e.message}\n#{e.backtrace.join("\n")}" } }
  hooks[:crash] << log_crash

  log_stop = lambda { |job_name| Message.log(:info) { "[Worker] stopped" } }
  hooks[:stop] << log_stop
end
start(*args) click to toggle source
# File lib/message/worker.rb, line 47
# Class-level shortcut: forwards every argument to #start on the worker
# returned by .default.
#
# @param args [Array] passed through unchanged
# @return whatever the default worker's #start returns
def start(*args)
  worker = default
  worker.start(*args)
end

Public Instance Methods

<<(work)
Alias for: enq
enq(work) click to toggle source
# File lib/message/worker.rb, line 108
# Submits a unit of work.
#
# When the class-level sync flag is truthy the work is executed immediately
# in the caller via #process_work; otherwise it is serialized with YAML.dump
# and enqueued on this worker's job.
#
# @param work [Array] the unit of work handed to #process_work
# @return the result of #process_work in sync mode, else of the job's #enq
def enq(work)
  return process_work(work) if self.class.sync

  # Resolve the job before serializing, preserving the original evaluation order.
  queue = job
  queue.enq(YAML.dump(work))
end
Also aliased as: <<
process(size=1) click to toggle source
# File lib/message/worker.rb, line 104
# Asks this worker's job to process queued messages.
#
# @param size [Integer] passed straight to the job's #process (defaults to 1)
# @return whatever the job's #process returns
def process(size=1)
  queue = job
  queue.process(size)
end
start(options={}) click to toggle source
# File lib/message/worker.rb, line 78
# Launches a background thread that runs #work_in_thread, calling #process
# with the batch size yielded on each iteration.
#
# @param options [Hash] forwarded unchanged to #work_in_thread
# @return [Thread] the thread that was started
def start(options={})
  Thread.start { work_in_thread(options) { |size| process(size) } }
end
work_in_thread(options) { |size| ... } click to toggle source
# File lib/message/worker.rb, line 86
# Runs the processing loop: fires the :start callback, optionally sleeps for
# an initial delay, then repeatedly yields the batch size and sleeps for the
# interval between iterations.
#
# Any StandardError raised inside is reported through the :crash callback
# instead of propagating; the :stop callback always fires on the way out.
#
# @param options [Hash] recognised keys:
#   :size     - value yielded on every iteration (default DEFAULT_PROCESS_SIZE)
#   :interval - argument to sleep between iterations (default DEFAULT_PROCESS_INTERVAL)
#   :delay    - sleep before the first iteration (default: random 10..29)
# @yield [size] invoked once per iteration
def work_in_thread(options, &block)
  batch_size = (options[:size] || DEFAULT_PROCESS_SIZE)
  pause = (options[:interval] || DEFAULT_PROCESS_INTERVAL)
  initial_delay = (options[:delay] || (10 + rand(20)))

  begin
    callback(:start, options)
    sleep(initial_delay) if initial_delay > 0

    loop do
      yield batch_size
      sleep(pause)
    end
  rescue => error
    callback(:crash, error)
  ensure
    callback(:stop)
  end
end

Private Instance Methods

callback(name, *args) click to toggle source
# File lib/message/worker.rb, line 118
# Fires every hook registered on the class for the given event, in
# registration order. Each hook is called with this worker's job_name
# followed by the extra arguments.
#
# @param name [Symbol] event key in the class-level callbacks registry
# @param args [Array] extra arguments appended after job_name
# @return [Array] the hook list for the event
def callback(name, *args)
  hooks = self.class.callbacks[name]
  hooks.each { |hook| hook.call(job_name, *args) }
end
job() click to toggle source
# File lib/message/worker.rb, line 124
# Looks up this worker's job in the class-level registry, creating it via
# Message.job (with #job_processor as the handler block) and storing it on
# first use.
#
# @return the job object registered under @job_name
def job
  registry = self.class.jobs
  registry[@job_name] ||= Message.job(@job_name, &job_processor)
end
job_processor() click to toggle source
# File lib/message/worker.rb, line 128
# Builds the handler given to Message.job: it deserializes a YAML message
# produced by #enq and executes it through #process_work.
#
# Payloads are written with YAML.dump and routinely contain Symbols (the
# method name) and arbitrary receiver objects. Since Psych 4 (Ruby 3.1+)
# YAML.load is a safe load that rejects those with Psych::DisallowedClass,
# so the unrestricted loader is used when it exists; older Psych versions
# have no unsafe_load and their YAML.load is already unrestricted.
#
# NOTE(review): unrestricted loading instantiates whatever objects the
# payload names, and #process_work then invokes a method chosen by the
# payload. Only consume queues whose producers are trusted.
#
# @return [Proc] a one-argument lambda taking the raw YAML message
def job_processor
  lambda do |msg|
    work = YAML.respond_to?(:unsafe_load) ? YAML.unsafe_load(msg) : YAML.load(msg)
    process_work(work)
  end
end
process_work(work) click to toggle source
# File lib/message/worker.rb, line 134
# Executes one unit of work.
#
# @param work [Array] a triple of receiver, method name and argument list;
#   a missing or nil argument list results in a call with no arguments
# @return whatever the invoked method returns
def process_work(work)
  receiver, method_name, arguments = work
  receiver.send(method_name, *arguments)
end