class Rodimus::Transformation
Attributes
drb_server[R]
ids[R]
Contains the thread or process identifiers currently in use
steps[R]
Public Class Methods
new()
click to toggle source
# File lib/rodimus/transformation.rb, line 18 def initialize @steps = [] @ids = [] @shared_data = {} # TODO: This needs to be thread safe observers << self end
Public Instance Methods
run()
click to toggle source
Run the transformation
# File lib/rodimus/transformation.rb, line 26 def run notify(self, :before_run) @drb_server = DRb.start_service(nil, shared_data) unless using_threads? ids.clear prepare steps.each do |step| ids << in_parallel do step.shared_data = step_shared_data step.run end step.close_descriptors unless using_threads? end ensure cleanup notify(self, :after_run) end
to_s()
click to toggle source
# File lib/rodimus/transformation.rb, line 44 def to_s "#{self.class} with #{steps.length} steps" end
Private Instance Methods
cleanup()
click to toggle source
# File lib/rodimus/transformation.rb, line 50 def cleanup if using_threads? ids.each { |t| t.join } else Process.waitall drb_server.stop_service end end
in_parallel() { || ... }
click to toggle source
# File lib/rodimus/transformation.rb, line 59 def in_parallel if using_threads? Thread.start { yield } else fork { yield } end end
prepare()
click to toggle source
# File lib/rodimus/transformation.rb, line 67 def prepare # [1, 2, 3, 4] => [1, 2], [2, 3], [3, 4] steps.inject do |first, second| read, write = IO.pipe first.outgoing = write second.incoming = read second end end
using_threads?()
click to toggle source
# File lib/rodimus/transformation.rb, line 86 def using_threads? Rodimus.configuration.use_threads end