class Rodimus::Step

Attributes

incoming[RW]

The incoming data stream. Can be anything that quacks like an IO

outgoing[RW]

The outgoing data stream. Can be anything that quacks like an IO

shared_data[RW]

Shared user-data accessible across all running transformation steps. This is initialized by the Transformation when the step begins to run.

Public Class Methods

new() click to toggle source
# File lib/rodimus/step.rb, line 18
def initialize
  observers << self
  observers << Benchmark.new if Rodimus.configuration.benchmarking
end

Public Instance Methods

close_descriptors() click to toggle source
# File lib/rodimus/step.rb, line 23
def close_descriptors
  [incoming, outgoing].reject(&:nil?).each do |descriptor|
    descriptor.close if descriptor.respond_to?(:close)
  end
end
handle_output(transformed_row) click to toggle source

Override this for custom output handling functionality per-row.

# File lib/rodimus/step.rb, line 30
def handle_output(transformed_row)
  outgoing.puts(transformed_row)
end
process_row(row) click to toggle source

Override this for custom transformation functionality

# File lib/rodimus/step.rb, line 35
def process_row(row)
  row.to_s
end
run() click to toggle source
# File lib/rodimus/step.rb, line 39
def run
  notify(self, :before_run)
  @row_count = 1
  incoming.each do |row|
    notify(self, :before_row)
    transformed_row = process_row(row)
    handle_output(transformed_row)
    Rodimus.logger.info(self) { "#{@row_count} rows processed" } if @row_count % 50000 == 0
    @row_count += 1
    notify(self, :after_row)
  end
  notify(self, :after_run)
ensure
  close_descriptors
end
to_s() click to toggle source
# File lib/rodimus/step.rb, line 55
def to_s
  "#{self.class} connected to input: #{incoming || 'nil'} and output: #{outgoing || 'nil'}"
end