class Rodimus::Transformation

Attributes

drb_server[R]
ids[R]

Contains the handles of the workers currently in use: Thread objects when running with threads, process IDs when running with forked processes.

shared_data[R]

User data accessible across all running steps.

steps[R]

Public Class Methods

new() click to toggle source
# File lib/rodimus/transformation.rb, line 18
# Builds an empty transformation: no steps, no worker ids, an empty
# shared-data hash. The new instance then appends itself to +observers+.
def initialize
  # TODO: This needs to be thread safe (applies to @shared_data)
  @steps, @ids, @shared_data = [], [], {}
  observers << self
end

Public Instance Methods

run() click to toggle source

Run the transformation: launch every step in parallel, then wait for all of them to finish.

# File lib/rodimus/transformation.rb, line 26
# Runs every step of the transformation in parallel.
#
# Observers are notified with :before_run first. When steps are not run in
# threads, a DRb service fronting +shared_data+ is started. The id list is
# then emptied, +prepare+ is called, and each step is launched through
# +in_parallel+; whatever +in_parallel+ returns is recorded in +ids+.
#
# Regardless of success or failure, +cleanup+ runs and observers are then
# notified with :after_run.
def run
  notify(self, :before_run)
  @drb_server = DRb.start_service(nil, shared_data) unless using_threads?
  ids.clear
  prepare

  steps.each do |step|
    worker = in_parallel do
      step.shared_data = step_shared_data
      step.run
    end
    ids << worker

    # In process mode, close_descriptors is called on the step in this
    # (launching) process once the step has been handed to in_parallel.
    step.close_descriptors unless using_threads?
  end
ensure
  cleanup
  notify(self, :after_run)
end
to_s() click to toggle source
# File lib/rodimus/transformation.rb, line 44
# Short human-readable summary: the class followed by the number of steps.
def to_s
  step_count = steps.length
  "#{self.class} with #{step_count} steps"
end

Private Instance Methods

cleanup() click to toggle source
# File lib/rodimus/transformation.rb, line 50
# Waits for all launched steps to finish and shuts down the DRb service.
#
# In thread mode every entry in +ids+ is joined. Otherwise all child
# processes are waited for and the DRb server, if one was started, is
# stopped.
def cleanup
  if using_threads?
    ids.each(&:join)
  else
    Process.waitall
    # run invokes cleanup from its ensure clause, so an error raised before
    # the DRb service was assigned leaves drb_server nil. Skip the stop in
    # that case so the original exception is not masked by a NoMethodError.
    drb_server.stop_service if drb_server
  end
end
in_parallel() { || ... } click to toggle source
# File lib/rodimus/transformation.rb, line 59
# Executes the given block concurrently: in a new thread when threads are
# enabled, otherwise in a forked child process. Returns whatever
# Thread.start or fork returns.
def in_parallel
  return Thread.start { yield } if using_threads?

  fork { yield }
end
prepare() click to toggle source
# File lib/rodimus/transformation.rb, line 67
# Connects each pair of neighbouring steps with a pipe: the earlier step's
# +outgoing+ is the write end and the later step's +incoming+ is the read
# end. Returns the last step (nil when there are no steps).
def prepare
  # Consecutive pairs: [1, 2, 3, 4] => [1, 2], [2, 3], [3, 4]
  steps.each_cons(2) do |upstream, downstream|
    reader, writer = IO.pipe
    upstream.outgoing = writer
    downstream.incoming = reader
  end
  steps.last
end
step_shared_data() click to toggle source
# File lib/rodimus/transformation.rb, line 77
# Returns the shared-data handle a step should use: the +shared_data+
# object itself in thread mode, otherwise a DRbObject pointing at the URI
# of the transformation's DRb server.
def step_shared_data
  return shared_data if using_threads?

  DRb.start_service # service dies across forked process
  DRbObject.new_with_uri(drb_server.uri)
end
using_threads?() click to toggle source
# File lib/rodimus/transformation.rb, line 86
# Reports the +use_threads+ value from the global Rodimus configuration.
def using_threads?
  configuration = Rodimus.configuration
  configuration.use_threads
end