class Hasta::EmrNode

Models the AWS Data Pipeline configuration of the EMR job under test.

Attributes

attributes[R]

Public Class Methods

from_json(json, scheduled_start_time = Time.now) click to toggle source
# File lib/hasta/emr_node.rb, line 9
# Builds an EmrNode from one EMR activity object of a pipeline definition.
#
# json                 - Hash providing the activity's 'id' and its 'step'
#                        command line (parsed by parse_step_line).
# scheduled_start_time - value later supplied as scheduledStartTime when paths
#                        are interpolated (defaults to Time.now).
#
# Only the first -output, -mapper and -reducer values are kept, while all
# -input, -cacheFile and -cmdenv values are kept as lists.
def from_json(json, scheduled_start_time = Time.now)
  switches = parse_step_line(json['step'])
  first_value = lambda { |name| switches[name].first }

  new(
    :id => json['id'],
    :input_paths => switches['input'],
    :output_path => first_value.call('output'),
    :mapper => first_value.call('mapper'),
    :reducer => first_value.call('reducer'),
    :cache_files => switches['cacheFile'],
    :env => switches['cmdenv'],
    :scheduled_start_time => scheduled_start_time
  )
end
new(attributes) click to toggle source
# File lib/hasta/emr_node.rb, line 63
# Stores the given attribute Hash (as built by from_json: :id, :input_paths,
# :output_path, :mapper, :reducer, :cache_files, :env, :scheduled_start_time).
# The Hash is kept as-is, not copied, and is exposed via the attributes reader.
def initialize(attrs)
  @attributes = attrs
end

Private Class Methods

parse_step_line(step) click to toggle source

Parses the 'step' attribute of an EMR configuration into a Hash. Sample step line (wrapped here for readability; the actual value is a single comma-separated string):

"/home/hadoop/contrib/streaming/hadoop-streaming.jar,
 -input,s3n://data-bucket/input1/,
 -output,s3://data-bucket/output/,
 -mapper,cat,
 -reducer,s3n://steps-bucket/path/to/reducer.rb,
 -cacheFile,s3://data-bucket/path/to/mappings.yml#mappings.yml,
 -cacheFile,s3://data-bucket/path/to/ignored.yml#ignored.yml,
 -cmdenv,API_KEY=123456,
 -cmdenv,ENVIRONMENT_NAME=uat"

Sample output:

{
  "input" => ["s3n://data-bucket/input1/"],
  "output" => ["s3://data-bucket/output/"],
  "mapper" => ["cat"],
  "reducer" => ["s3n://steps-bucket/path/to/reducer.rb"],
  "cacheFile" => ["s3://data-bucket/path/to/mappings.yml#mappings.yml",
    "s3://data-bucket/path/to/ignored.yml#ignored.yml"],
  "cmdenv" => ["API_KEY=123456", "ENVIRONMENT_NAME=uat"]
}
# File lib/hasta/emr_node.rb, line 49
def parse_step_line(step)
  parsed = Hash.new { |h, k| h[k] = [] }
  step.
    split(',-').
    drop(1).
    map { |value| i = value.index(','); [value[0...i], value[i+1..-1]] }.
    each do |switch, arg|
      parsed[switch] << arg
    end

  parsed
end

Public Instance Methods

cache_files() click to toggle source
# File lib/hasta/emr_node.rb, line 87
# Returns the job's cache files as a Hash mapping each link name (the URI
# fragment, i.e. everything after the first '#') to its interpolated source
# path. The result is memoized.
#
# Only the first '#' separates path from link name. Previously a value with
# more than one '#' produced a 3+ element pair and Hash[] raised
# ArgumentError. A value without '#' still yields a one-element pair, which
# Hash[] maps to {path => nil}, as before.
def cache_files
  @cache_files ||= Hash[
    attributes[:cache_files].map { |value| interpolate(value).split('#', 2).reverse }
  ]
end
env() click to toggle source
# File lib/hasta/emr_node.rb, line 92
# Returns the -cmdenv settings as a Hash of variable name => value.
# The result is memoized.
#
# Each "NAME=VALUE" entry is split at its first '=' only, so values may
# themselves contain '=' (e.g. "JAVA_OPTS=-Dkey=value"). Previously such
# entries raised ArgumentError in Hash[]. "NAME=" maps to "" and an entry
# with no '=' at all maps to nil.
def env
  @env ||= Hash[attributes[:env].map { |setting| setting.split('=', 2) }]
end
id() click to toggle source
# File lib/hasta/emr_node.rb, line 67
# Returns the node's :id attribute (from_json copies it from the JSON 'id').
def id
  attributes.values_at(:id).first
end
input_paths() click to toggle source
# File lib/hasta/emr_node.rb, line 71
# Returns the job's -input paths, each passed through interpolate (which
# supplies scheduledStartTime).
#
# Memoized in @input_paths so the ivar matches the method name, like the
# other memoized readers; it was previously stored in @input_path.
def input_paths
  @input_paths ||= attributes[:input_paths].map { |path| interpolate(path) }
end
mapper() click to toggle source
# File lib/hasta/emr_node.rb, line 79
# Returns the :mapper attribute exactly as stored (it is not interpolated).
def mapper
  attributes.values_at(:mapper).first
end
output_path() click to toggle source
# File lib/hasta/emr_node.rb, line 75
# Returns the job's -output path after interpolation. The value is computed
# once and cached; a nil/false result is recomputed on the next call.
def output_path
  return @output_path if @output_path

  @output_path = interpolate(attributes[:output_path])
end
reducer() click to toggle source
# File lib/hasta/emr_node.rb, line 83
# Returns the :reducer attribute exactly as stored (it is not interpolated).
def reducer
  attributes.values_at(:reducer).first
end

Private Instance Methods

interpolate(path) click to toggle source
# File lib/hasta/emr_node.rb, line 100
# Delegates to InterpolateString.evaluate with a context whose only entry,
# 'scheduledStartTime', is this node's :scheduled_start_time attribute.
def interpolate(path)
  context = { 'scheduledStartTime' => attributes[:scheduled_start_time] }
  InterpolateString.evaluate(path, context)
end