class Tresse::Group

Attributes

name[RW]

Public Class Methods

new(name=nil) click to toggle source
# File lib/tresse.rb, line 130
def initialize(name=nil)

  @name = name

  @batches = []
  @launched = false
  @maps = [ nil ]

  @reduce = nil
  @reduce_mutex = Mutex.new
  @reduction_queue = Queue.new
end

Public Instance Methods

each(&block) click to toggle source

mapping

# File lib/tresse.rb, line 169
def each(&block)

  do_map(:each, block)
end
flatten() click to toggle source
# File lib/tresse.rb, line 188
def flatten

  do_reduce(
    [],
    lambda { |a, e|
      if e.respond_to?(:to_a) && ! e.is_a?(Hash)
        a.concat(e.to_a)
      else
        a.push(e)
      end })
end
Also aliased as: values
inject(target, &block)
Alias for: reduce
map(&block) click to toggle source
# File lib/tresse.rb, line 174
def map(&block)

  do_map(:map, block)
end
reduce(target, &block) click to toggle source

reducing

# File lib/tresse.rb, line 182
def reduce(target, &block)

  do_reduce(target, block)
end
Also aliased as: inject
source(&block) click to toggle source

sourcing methods

# File lib/tresse.rb, line 146
def source(&block)

  @batches << Tresse::Batch.new(self, block)

  self
end
source_each(collection, &block) click to toggle source
# File lib/tresse.rb, line 153
def source_each(collection, &block)

  if collection.is_a?(Hash)
    collection.each { |k, v|
      source { Tresse.call_block(block, [ k, v ]) } }
  else
    collection.each_with_index { |e, i|
      source { Tresse.call_block(block, [ e, i ]) } }
  end

  self
end
values()
Alias for: flatten

Protected Instance Methods

do_map(type, block) click to toggle source
# File lib/tresse.rb, line 203
def do_map(type, block)

  @maps << [ type, block ]

  launch

  self
end
do_reduce(target, block) click to toggle source
# File lib/tresse.rb, line 212
def do_reduce(target, block)

  @reduce = [ target, block ]

  launch

  r = @reduction_queue.pop

  raise r.error if r.is_a?(Tresse::Batch)

  r
end
launch() click to toggle source
# File lib/tresse.rb, line 225
def launch

  return if @launched == true
  @launched = true

  @batches.each { |b| Tresse.enqueue(b) }
end
queue_for_reduction(batch) click to toggle source
# File lib/tresse.rb, line 248
def queue_for_reduction(batch)

  @reduce_mutex.synchronize do

    batch.complete

    return unless @reduce
    return if @batches.find { |b| ! b.completed }

    es = @batches.collect(&:value)
    target, block = @reduce

    @reduction_queue << es.inject(target, &block)
  end
end
receive(batch) click to toggle source
# File lib/tresse.rb, line 233
def receive(batch)

  if batch.error
    @reduction_queue << batch
  elsif batch.map_index == 0
    batch.source
    Tresse.enqueue(batch)
  elsif m = @maps[batch.map_index]
    batch.map(*m)
    Tresse.enqueue(batch)
  else
    queue_for_reduction(batch)
  end
end