class Retl::ThreadedExecution

Public Instance Methods

each(&block)
# File lib/retl/threaded_execution.rb, line 3
# Feeds every item of @enumerable to +execute+, together with the given block.
#
# A background thread enumerates @enumerable and hands the items over through
# a Queue; the calling thread pops them one by one and passes each to
# +execute+.
#
# The end of the stream is marked with a private sentinel object that is
# compared by identity, so no data value (including the symbol :eoq) can end
# the stream early. The sentinel is pushed from an +ensure+, so an error
# raised while enumerating releases the consumer instead of leaving it blocked
# on an empty queue; that error is then re-raised in the calling thread by
# Thread#join.
#
# Sets @executed to true before any work starts.
#
# Returns the finished producer Thread (the value of Thread#join).
def each(&block)
  @executed = true
  queue = Queue.new
  end_of_queue = Object.new

  producer = Thread.new do
    begin
      @enumerable.each { |item| queue.push item }
    ensure
      # Always unblock the consumer, even when enumeration fails.
      queue.push end_of_queue
    end
  end

  # Identity check on the sentinel: never calls a method on the data itself.
  until end_of_queue.equal?(data = queue.pop)
    execute(data, &block)
  end

  # Re-raises any error that terminated the producer thread.
  producer.join
end
load_into(*destinations)
# File lib/retl/threaded_execution.rb, line 19
# Appends every item produced by +each+ to all of the given destinations,
# then closes the destinations that support it.
#
# A background thread runs +each+ and hands its items over through a Queue;
# the calling thread pops them and sends each one to every destination with
# <tt><<</tt>, in the order the destinations were given.
#
# The end of the stream is marked with a private sentinel object that is
# compared by identity, so no data value (including the symbol :eoq) can cut
# the load short. The sentinel is pushed from an +ensure+, so an error raised
# inside +each+ releases the consumer instead of leaving it blocked on an
# empty queue; that error is then re-raised in the calling thread by
# Thread#join.
#
# Destinations are closed only after a successful run; when an error is
# raised they are left open, as before.
#
# destinations - objects responding to <tt><<</tt>; +close+ is called on
#                those that respond to it.
#
# Returns the Array of destinations.
def load_into(*destinations)
  queue = Queue.new
  end_of_queue = Object.new

  producer = Thread.new do
    begin
      each { |item| queue.push item }
    ensure
      # Always unblock the consumer, even when the pipeline fails.
      queue.push end_of_queue
    end
  end

  # Identity check on the sentinel: never calls a method on the data itself.
  until end_of_queue.equal?(data = queue.pop)
    destinations.each { |destination| destination << data }
  end

  # Re-raises any error that terminated the producer thread.
  producer.join

  destinations.each do |destination|
    destination.close if destination.respond_to?(:close)
  end
end