class Message::Job

Public Class Methods

filter(name, &block) click to toggle source
# File lib/message/job.rb, line 10
# Registers a filter by appending the pair [name, block] to the
# collection returned by +filters+.
#
# name::  identifier stored alongside the block.
# block:: the filter body; stored as-is (nil when no block is given).
#
# Returns whatever the collection's << returns.
def filter(name, &block)
  entry = [name, block]
  filters << entry
end
filters() click to toggle source
# File lib/message/job.rb, line 6
# Returns the memoized filter collection, creating a new Filters
# instance on first access (or after @filters has been cleared).
def filters
  return @filters if @filters

  @filters = Filters.new
end
new(queue, &processor) click to toggle source
# File lib/message/job.rb, line 19
# Builds a job around +queue+.
#
# queue::     stored in @queue; the other methods of this class call
#             enq, deq, name and size on it.
# processor:: optional block stored in @processor. When omitted, an
#             identity lambda (returns its argument) is used instead.
def initialize(queue, &processor)
  @queue = queue
  @processor =
    if processor
      processor
    else
      lambda { |message| message }
    end
end
reset() click to toggle source
# File lib/message/job.rb, line 14
# Clears the stored filter collection by setting @filters to nil.
# Returns nil.
def reset
  @filters = nil
end

Public Instance Methods

<<(msg)
Alias for: enq
enq(msg) click to toggle source
# File lib/message/job.rb, line 32
# Enqueues +msg+ on the underlying queue.
#
# The raw @queue.enq call is wrapped by +chain+ with the :enq action,
# and the resulting callable is invoked with +msg+.
#
# Returns whatever the chained callable returns.
def enq(msg)
  enqueue = lambda { |message| @queue.enq(message) }
  chain(:enq, enqueue).call(msg)
end
Also aliased as: <<
name() click to toggle source
# File lib/message/job.rb, line 24
# Returns the name reported by the underlying queue (@queue.name).
def name
  @queue.name
end
process(size=1) click to toggle source
# File lib/message/job.rb, line 37
# Dequeues messages from the underlying queue and runs each one
# through the processor.
#
# size:: passed to @queue.deq (defaults to 1).
#
# The dequeue step is wrapped by +chain+ with the :deq action. For
# every message the queue yields, a fresh :process chain is built
# around @processor and called with that message; the chain is rebuilt
# per message on purpose, so filters are consulted for each message.
#
# Returns whatever the chained dequeue callable returns.
def process(size=1)
  dequeue = lambda do |count|
    @queue.deq(count) { |message| chain(:process, @processor).call(message) }
  end
  chain(:deq, dequeue).call(size)
end
size() click to toggle source
# File lib/message/job.rb, line 28
# Returns the size reported by the underlying queue (@queue.size).
def size
  @queue.size
end

Private Instance Methods

chain(action, base) click to toggle source
# File lib/message/job.rb, line 47
# Wraps +base+ with every registered filter and returns the result.
#
# action:: symbol passed through to each filter (:enq, :deq or :process
#          in this class).
# base::   the innermost callable.
#
# Filters are applied in reverse registration order, so the first
# registered filter ends up outermost. Each filter entry's second
# element is called with (callable_so_far, self, action) and its return
# value becomes the new callable. With no filters, +base+ is returned
# unchanged.
def chain(action, base)
  wrapped = base
  Job.filters.to_a.reverse.each do |entry|
    wrapped = entry[1].call(wrapped, self, action)
  end
  wrapped
end