class Message::Job
Public Class Methods
filter(name, &block)
click to toggle source
# File lib/message/job.rb, line 10
#
# Registers a filter by appending a [name, block] pair to +filters+.
#
# name  - identifier stored alongside the block.
# block - the filter body; it is later invoked by Job#chain as
#         block.call(next_callable, job, action).
#
# Returns whatever +filters << pair+ returns.
# Raises ArgumentError when no block is given, so a nil filter is never
# stored (it would otherwise only fail later, when the chain is built).
def filter(name, &block)
  raise ArgumentError, 'filter requires a block' unless block

  filters << [name, block]
end
filters()
click to toggle source
# File lib/message/job.rb, line 6
#
# Returns the registered filter collection, building a fresh Filters
# instance on first use (or after @filters has been cleared) and caching
# it in @filters.
def filters
  @filters = Filters.new unless @filters
  @filters
end
new(queue, &processor)
click to toggle source
# File lib/message/job.rb, line 19
#
# Builds a job around +queue+.
#
# queue     - stored in @queue; the other methods call name, size, enq
#             and deq on it.
# processor - optional block stored in @processor. When omitted, an
#             identity lambda (returns the message unchanged) is used.
def initialize(queue, &processor)
  @queue = queue
  @processor =
    if processor
      processor
    else
      lambda { |message| message }
    end
end
reset()
click to toggle source
# File lib/message/job.rb, line 14
#
# Drops the cached filter collection by setting @filters to nil, so the
# next call to +filters+ starts from a new Filters instance.
#
# Returns nil.
def reset
  @filters = nil
end
Public Instance Methods
enq(msg)
click to toggle source
# File lib/message/job.rb, line 32
#
# Enqueues +msg+ on @queue, routed through the filter chain built for the
# :enq action.
#
# msg - the message handed to the outermost callable of the chain.
#
# Returns whatever the chained callable returns.
def enq(msg)
  # Distinct parameter name: the lambda must not shadow the method's +msg+.
  enqueue = lambda { |message| @queue.enq(message) }
  chain(:enq, enqueue).call(msg)
end
Also aliased as: <<
name()
click to toggle source
# File lib/message/job.rb, line 24
#
# Returns the name reported by the underlying queue (delegates to
# @queue.name).
def name
  @queue.name
end
process(size=1)
click to toggle source
# File lib/message/job.rb, line 37
#
# Dequeues messages from @queue and runs each one through the processor.
#
# size - argument passed to the :deq chain and, by the base step, on to
#        @queue.deq (default: 1).
#
# The dequeue step is wrapped by the :deq filter chain; every message the
# queue yields is passed to a :process filter chain wrapped around
# @processor (that chain is rebuilt for each message).
#
# Returns whatever the :deq chain returns.
def process(size = 1)
  # +batch_size+ rather than +size+: the lambda parameter must not shadow
  # the method parameter.
  dequeue = lambda do |batch_size|
    @queue.deq(batch_size) do |msg|
      chain(:process, @processor).call(msg)
    end
  end
  chain(:deq, dequeue).call(size)
end
size()
click to toggle source
# File lib/message/job.rb, line 28
#
# Returns the size reported by the underlying queue (delegates to
# @queue.size).
def size
  @queue.size
end
Private Instance Methods
chain(action, base)
click to toggle source
# File lib/message/job.rb, line 47
#
# Wraps +base+ in every registered filter and returns the result.
#
# action - passed through to each filter (callers in this class use :enq,
#          :deq and :process).
# base   - the innermost callable.
#
# Filters are folded from last-registered to first-registered, so the
# first-registered filter produces the outermost wrapper. Each filter
# block is called as block.call(wrapped_so_far, self, action) and its
# return value becomes the next +wrapped_so_far+. Callers invoke +call+ on
# the final result, so filters are presumably expected to return a
# callable.
#
# Returns +base+ itself when no filters are registered.
def chain(action, base)
  # reverse_each walks the array backwards without allocating a reversed copy.
  Job.filters.to_a.reverse_each.inject(base) do |wrapped, pair|
    pair[1].call(wrapped, self, action)
  end
end