class Hasta::EmrNode
Models the Amazon Data Pipeline configuration details for the EMR job that is being tested.
Attributes
attributes[R]
Public Class Methods
from_json(json, scheduled_start_time = Time.now)
click to toggle source
# File lib/hasta/emr_node.rb, line 9
# Builds a node from one parsed JSON pipeline object.
#
# Reads json['id'] and json['step']; the step line is broken into its
# switches by parse_step_line. Single-valued switches (output, mapper,
# reducer) keep only their first occurrence, while input, cacheFile and
# cmdenv keep every occurrence.
#
# json                 - Hash-like object exposing 'id' and 'step'.
# scheduled_start_time - stored under :scheduled_start_time; defaults to
#                        the current time.
#
# Returns whatever `new` returns for the assembled attribute Hash.
def from_json(json, scheduled_start_time = Time.now)
  switches = parse_step_line(json['step'])

  node_attributes = {
    :id => json['id'],
    :input_paths => switches['input'],
    :output_path => switches['output'].first,
    :mapper => switches['mapper'].first,
    :reducer => switches['reducer'].first,
    :cache_files => switches['cacheFile'],
    :env => switches['cmdenv'],
    :scheduled_start_time => scheduled_start_time
  }

  new(node_attributes)
end
new(attributes)
click to toggle source
# File lib/hasta/emr_node.rb, line 63
# Stores the given attribute Hash as-is (no copy is made); the reader
# methods on this class look their values up in it.
#
# attributes - Hash of node settings, e.g. :id, :input_paths, :output_path.
def initialize(attributes)
  @attributes = attributes
end
Private Class Methods
parse_step_line(step)
click to toggle source
Parses the 'step' attribute of an EMR configuration into a Hash. Sample step line:
"/home/hadoop/contrib/streaming/hadoop-streaming.jar,
-input,s3n://data-bucket/input1/,
-output,s3://data-bucket/output/,
-mapper,cat,
-reducer,s3n://steps-bucket/path/to/reducer.rb,
-cacheFile,s3://data-bucket/path/to/mappings.yml#mappings.yml,
-cacheFile,s3://data-bucket/path/to/ignored.yml#ignored.yml,
-cmdenv,API_KEY=123456,
-cmdenv,ENVIRONMENT_NAME=uat"
Sample output:
{ "input" => ["s3n://data-bucket/input1/"], "output" => ["s3://data-bucket/output/"], "mapper" => ["cat"], "reducer" => ["s3n://steps-bucket/path/to/reducer.rb"], "cacheFile" => ["s3://data-bucket/path/to/mappings.yml#mappings.yml", "s3://data-bucket/path/to/ignored.yml#ignored.yml"], "cmdenv" => ["API_KEY=123456", "ENVIRONMENT_NAME=uat"] }
# File lib/hasta/emr_node.rb, line 49
# Parses an EMR 'step' line into a Hash of switch name => Array of arguments.
#
# The line is split on ",-"; the first piece (the jar path) is discarded and
# every remaining piece is read as "switch,argument". Repeated switches
# accumulate their arguments in order of appearance. Only the first comma
# separates the switch from its argument, so arguments may contain commas.
#
# A switch that carries no argument (e.g. "-verbose") is recorded with an
# empty Array instead of raising, and empty pieces are skipped.
#
# step - String step line.
#
# Returns a Hash whose default value for an unseen switch is a new empty
# Array (which is stored in the Hash on first access).
def parse_step_line(step)
  parsed = Hash.new { |h, k| h[k] = [] }

  step.split(',-').drop(1).each do |token|
    next if token.empty?

    switch, arg = token.split(',', 2)
    # Touch the key first so argument-less switches are still registered.
    args = parsed[switch]
    args << arg unless arg.nil?
  end

  parsed
end
Public Instance Methods
cache_files()
click to toggle source
# File lib/hasta/emr_node.rb, line 87
# Returns the node's cache files as a Hash, computed once and memoized.
#
# Each configured entry is interpolated, split on '#', and reversed, so an
# entry of the form "location#name" becomes the pair name => location.
def cache_files
  files = attributes[:cache_files]
  @cache_files ||= begin
    pairs = files.map do |entry|
      interpolate(entry).split('#').reverse
    end
    Hash[pairs]
  end
end
env()
click to toggle source
# File lib/hasta/emr_node.rb, line 92
# Returns the node's environment variables as a Hash of name => value,
# computed once and memoized.
#
# Each configured entry is split on its FIRST '=' only, so values that
# themselves contain '=' (base64 strings, URLs with query parameters, ...)
# are kept intact rather than producing a malformed pair. An entry with no
# '=' maps to nil; an entry of the form "NAME=" maps to an empty String.
def env
  @env ||= Hash[attributes[:env].map { |value| value.split('=', 2) }]
end
id()
click to toggle source
# File lib/hasta/emr_node.rb, line 67
# Returns the value stored under :id in the node's attributes
# (nil when the key is absent).
def id
  attributes[:id]
end
input_paths()
click to toggle source
# File lib/hasta/emr_node.rb, line 71
# Returns the node's input paths, each passed through interpolate.
# The resulting Array is computed once and memoized.
def input_paths
  # Memo variable is named after the method, like the other memoized readers.
  @input_paths ||= attributes[:input_paths].map { |path| interpolate(path) }
end
mapper()
click to toggle source
# File lib/hasta/emr_node.rb, line 79
# Returns the value stored under :mapper in the node's attributes,
# exactly as configured (no interpolation is applied).
def mapper
  attributes[:mapper]
end
output_path()
click to toggle source
# File lib/hasta/emr_node.rb, line 75
# Returns the node's output path after passing it through interpolate.
# The result is cached once it is truthy.
def output_path
  return @output_path if @output_path

  @output_path = interpolate(attributes[:output_path])
end
reducer()
click to toggle source
# File lib/hasta/emr_node.rb, line 83
# Returns the value stored under :reducer in the node's attributes,
# exactly as configured (no interpolation is applied).
def reducer
  attributes[:reducer]
end
Private Instance Methods
interpolate(path)
click to toggle source
# File lib/hasta/emr_node.rb, line 100
# Hands the path to InterpolateString.evaluate together with the node's
# scheduled start time, supplied under the 'scheduledStartTime' key.
# NOTE(review): presumably this expands pipeline expressions embedded in the
# path - confirm against InterpolateString.
#
# path - String to evaluate.
#
# Returns whatever InterpolateString.evaluate returns.
def interpolate(path)
  start_time = attributes[:scheduled_start_time]
  InterpolateString.evaluate(path, 'scheduledStartTime' => start_time)
end