class Tresse::Group
Attributes
name[RW]
Public Class Methods
new(name=nil)
click to toggle source
# File lib/tresse.rb, line 130
#
# Sets up an empty, not-yet-launched group.
#
# @param name [Object, nil] optional label stored in @name
def initialize(name = nil)
  @name = name

  # Work units registered so far, and whether they were handed out yet.
  @batches = []
  @launched = false

  # Slot 0 is a nil placeholder; map steps get appended after it.
  @maps = [nil]

  # Reduction state: the pending [target, block] pair (none yet), the lock
  # guarding the reduction, and the queue the result is delivered on.
  @reduce = nil
  @reduce_mutex = Mutex.new
  @reduction_queue = Queue.new
end
Public Instance Methods
each(&block)
click to toggle source
mapping
# File lib/tresse.rb, line 169
#
# Registers +block+ as an :each step by delegating to do_map.
#
# @return whatever do_map returns
def each(&block)
  do_map(
    :each,
    block
  )
end
flatten()
click to toggle source
# File lib/tresse.rb, line 188
#
# Reduces the group into a fresh Array via do_reduce. Elements that respond
# to #to_a (Hashes excepted) are spliced in element by element; anything
# else is appended as a single entry.
#
# @return whatever do_reduce returns
def flatten
  merge = lambda do |acc, item|
    expandable = item.respond_to?(:to_a) && !item.is_a?(Hash)
    expandable ? acc.concat(item.to_a) : acc.push(item)
  end

  do_reduce([], merge)
end
Also aliased as: values
map(&block)
click to toggle source
# File lib/tresse.rb, line 174
#
# Registers +block+ as a :map step by delegating to do_map.
#
# @return whatever do_map returns
def map(&block)
  do_map(
    :map,
    block
  )
end
reduce(target, &block)
click to toggle source
reducing
# File lib/tresse.rb, line 182
#
# Reduces the group starting from +target+, combining with +block+.
# Simply hands both over to do_reduce.
#
# @param target [Object] initial accumulator
# @return whatever do_reduce returns
def reduce(target, &block)
  do_reduce(
    target,
    block
  )
end
Also aliased as: inject
source(&block)
click to toggle source
sourcing methods
# File lib/tresse.rb, line 146
#
# Wraps +block+ in a Tresse::Batch bound to this group and records it in
# @batches.
#
# @return [self] so calls can be chained
def source(&block)
  batch = Tresse::Batch.new(self, block)
  @batches.push(batch)

  self
end
source_each(collection, &block)
click to toggle source
# File lib/tresse.rb, line 153
#
# Registers one source per entry of +collection+. Each source, when run,
# invokes +block+ through Tresse.call_block with a two-element argument
# list: [key, value] for a Hash, [element, index] for anything else.
#
# @param collection [Hash, #each_with_index]
# @return [self] so calls can be chained
def source_each(collection, &block)
  unless collection.is_a?(Hash)
    collection.each_with_index do |element, index|
      source { Tresse.call_block(block, [element, index]) }
    end
    return self
  end

  collection.each do |key, value|
    source { Tresse.call_block(block, [key, value]) }
  end
  self
end
Protected Instance Methods
do_map(type, block)
click to toggle source
# File lib/tresse.rb, line 203
#
# Appends a [type, block] step to @maps and then calls launch.
#
# @param type [Symbol] step kind (:each and :map are what this file passes)
# @param block [Proc] code for the step
# @return [self] so calls can be chained
def do_map(type, block)
  step = [type, block]
  @maps.push(step)

  launch

  self
end
do_reduce(target, block)
click to toggle source
# File lib/tresse.rb, line 212
#
# Records the reduction, makes sure the batches have been launched, then
# blocks until something arrives on the reduction queue.
#
# @param target [Object] initial accumulator handed to inject
# @param block [Proc] combiner called with (accumulator, batch value)
# @return [Object] the reduced value popped from the reduction queue
# @raise the +error+ of a Tresse::Batch if one is popped instead of a result
def do_reduce(target, block)
  @reduce_mutex.synchronize do
    @reduce = [target, block]

    # each/map already call launch, so every batch may have gone through
    # queue_for_reduction while @reduce was still nil. Nobody is left to
    # trigger the reduction in that case (also true for an empty group or a
    # repeated reduce), and the pop below would block forever. Holding the
    # same mutex as queue_for_reduction guarantees that exactly one side
    # performs the reduction.
    if @batches.all?(&:completed)
      @reduction_queue << @batches.collect(&:value).inject(target, &block)
    end
  end

  launch

  outcome = @reduction_queue.pop

  # receive pushes the batch itself onto the queue when it carries an error.
  raise outcome.error if outcome.is_a?(Tresse::Batch)

  outcome
end
launch()
click to toggle source
# File lib/tresse.rb, line 225
#
# Hands every registered batch to Tresse.enqueue, once. Later calls are
# no-ops because @launched has been flipped to true.
def launch
  unless @launched == true
    @launched = true
    @batches.each { |batch| Tresse.enqueue(batch) }
  end
end
queue_for_reduction(batch)
click to toggle source
# File lib/tresse.rb, line 248
#
# Marks +batch+ as complete and, once a reduction has been requested and
# every batch reports completed, folds the batch values with the stored
# [target, block] pair and pushes the result onto the reduction queue.
# The whole check runs while holding @reduce_mutex.
#
# @param batch [#complete] the batch that just ran out of map steps
def queue_for_reduction(batch)
  @reduce_mutex.synchronize do
    batch.complete

    if @reduce && @batches.all? { |candidate| candidate.completed }
      seed, reducer = @reduce
      values = @batches.map(&:value)

      @reduction_queue << values.inject(seed, &reducer)
    end
  end
end
receive(batch)
click to toggle source
# File lib/tresse.rb, line 233
#
# Decides what happens next to +batch+:
#
# * it carries an error: the batch itself goes onto the reduction queue
# * its map_index is 0: its source step is run and it is enqueued again
# * @maps has a step at its map_index: that step is applied and it is
#   enqueued again
# * otherwise: it is handed to queue_for_reduction
#
# @param batch [#error, #map_index, #source, #map]
def receive(batch)
  return @reduction_queue << batch if batch.error

  if batch.map_index == 0
    batch.source
  else
    step = @maps[batch.map_index]
    return queue_for_reduction(batch) unless step

    batch.map(*step)
  end

  Tresse.enqueue(batch)
end