class Hasta::EmrJobDefinition
Defines the EMR job that is being tested.
Attributes
emr_node [R] (read-only attribute)
Public Class Methods
load(file_path, id, scheduled_start_time = Time.now)
click to toggle source
# File lib/hasta/emr_job_definition.rb, line 21
#
# Builds a job definition from a pipeline definition JSON file.
#
# Scans the top-level 'objects' array for the first node whose 'type' is
# 'EmrActivity' and whose 'id' equals +id+, converts it with
# EmrNode.from_json and wraps the result in a new instance.
#
# @param file_path [String] path to the pipeline definition JSON file
# @param id [String] id of the EmrActivity node to load
# @param scheduled_start_time [Time] forwarded to EmrNode.from_json
#   (defaults to Time.now)
# @return [Hasta::EmrJobDefinition]
# @raise [ArgumentError] if no matching EmrActivity node exists, including
#   when the file has no 'objects' array at all
def self.load(file_path, id, scheduled_start_time = Time.now)
  definition = JSON.parse(File.read(file_path))
  # A missing or null 'objects' key used to blow up as NoMethodError on nil;
  # treat it as "no nodes" so callers get the documented ArgumentError.
  nodes = definition['objects'] || []
  emr_node = nodes.find { |node| node['type'] == 'EmrActivity' && node['id'] == id }
  raise ArgumentError, "No EmrActivity for id: #{id} in file: #{file_path}" unless emr_node

  new(EmrNode.from_json(emr_node, scheduled_start_time))
end
new(emr_node)
click to toggle source
# File lib/hasta/emr_job_definition.rb, line 32
#
# Wraps an already-built EMR node; every other accessor derives its value
# from this node.
#
# @param emr_node [EmrNode] the node returned by EmrNode.from_json in .load
def initialize(emr_node)
  @emr_node = emr_node
end
Public Instance Methods
data_sink()
click to toggle source
# File lib/hasta/emr_job_definition.rb, line 76
#
# S3 data sink for #output_path, created on first call and cached.
#
# @return [S3DataSink]
def data_sink
  return @data_sink if @data_sink

  @data_sink = S3DataSink.new(output_path)
end
data_sources()
click to toggle source
# File lib/hasta/emr_job_definition.rb, line 72
#
# One S3 data source per entry of #input_paths, created on first call and
# cached.
#
# @return [Array<S3DataSource>]
def data_sources
  return @data_sources if @data_sources

  @data_sources = input_paths.map do |s3_uri|
    S3DataSource.new(s3_uri)
  end
end
env()
click to toggle source
# File lib/hasta/emr_job_definition.rb, line 44
#
# Environment for the job, built on first call and cached.
#
# Combines the EMR node's own env with one "<TAG>_FILE_PATH" entry per
# non-Ruby cache file. <TAG> is the cache-file tag up to its first '.',
# upcased, and the value is the S3URI parsed from the file's URI.
# Cache files ending in '.rb' are skipped here (see #ruby_files).
#
# @return [Env]
def env
  @env ||= Env.new(
    emr_node.env,
    emr_node.
      cache_files.
      reject { |_tag, uri| uri.end_with?('.rb') }.
      map { |tag, uri| ["#{tag.split('.').first.upcase}_FILE_PATH", S3URI.parse(uri)] }.
      to_h
  )
end
input_paths()
click to toggle source
# File lib/hasta/emr_job_definition.rb, line 36
#
# The EMR node's input paths parsed into S3URI objects, computed on first
# call and cached.
#
# @return [Array<S3URI>]
def input_paths
  @input_paths ||= begin
    raw_paths = emr_node.input_paths
    raw_paths.map { |raw| S3URI.parse(raw) }
  end
end
mapper()
click to toggle source
# File lib/hasta/emr_job_definition.rb, line 64
#
# The job's mapper, resolved from the EMR node's mapper command by
# #parse_mapper on first call and cached.
#
# @return [IdentityMapper, Mapper]
def mapper
  return @mapper if @mapper

  @mapper = parse_mapper(emr_node.mapper)
end
output_path()
click to toggle source
# File lib/hasta/emr_job_definition.rb, line 40
#
# The EMR node's output path parsed into an S3URI, computed on first call
# and cached.
#
# @return [S3URI]
def output_path
  @output_path ||= begin
    raw_output = emr_node.output_path
    S3URI.parse(raw_output)
  end
end
reducer()
click to toggle source
# File lib/hasta/emr_job_definition.rb, line 68
#
# The job's reducer, resolved from the EMR node's reducer command by
# #parse_reducer on first call and cached.
#
# @return [IdentityReducer, Reducer]
def reducer
  return @reducer if @reducer

  @reducer = parse_reducer(emr_node.reducer)
end
ruby_files()
click to toggle source
# File lib/hasta/emr_job_definition.rb, line 56
#
# Local step-file paths for every cache file whose URI ends in '.rb',
# computed on first call and cached.
#
# @return [Array<String>]
def ruby_files
  @ruby_files ||= begin
    ruby_uris = emr_node.cache_files.values.select { |uri| uri.end_with?('.rb') }
    ruby_uris.map { |uri| local_path_to_step_file(S3URI.parse(uri)) }
  end
end
Private Instance Methods
local_path_to_step_file(s3_uri)
click to toggle source
# File lib/hasta/emr_job_definition.rb, line 84
#
# Maps an S3 URI to the local file with the same basename inside the
# project's steps directory (Hasta.project_root / Hasta.project_steps).
#
# @param s3_uri [S3URI] URI whose basename names the step file
# @return [String] local filesystem path
def local_path_to_step_file(s3_uri)
  segments = [Hasta.project_root, Hasta.project_steps, s3_uri.basename]
  File.join(*segments)
end
parse_mapper(mapper_command)
click to toggle source
# File lib/hasta/emr_job_definition.rb, line 88
#
# Turns an EMR mapper command into a mapper object.
#
# 'cat' and Hadoop's IdentityMapper class name both map to IdentityMapper;
# any other command is treated as an S3 URI of a step file, resolved to the
# local copy of that file and wrapped in a Mapper.
#
# @param mapper_command [String]
# @return [IdentityMapper, Mapper]
def parse_mapper(mapper_command)
  case mapper_command
  when 'cat', 'org.apache.hadoop.mapred.lib.IdentityMapper'
    IdentityMapper
  else
    step_file = local_path_to_step_file(S3URI.parse(mapper_command))
    Mapper.new(step_file)
  end
end
parse_reducer(reducer_command)
click to toggle source
# File lib/hasta/emr_job_definition.rb, line 96
#
# Turns an EMR reducer command into a reducer object.
#
# 'cat' and Hadoop's IdentityReducer class name both map to IdentityReducer;
# any other command is treated as an S3 URI of a step file, resolved to the
# local copy of that file and wrapped in a Reducer.
#
# @param reducer_command [String]
# @return [IdentityReducer, Reducer]
def parse_reducer(reducer_command)
  case reducer_command
  when 'cat', 'org.apache.hadoop.mapred.lib.IdentityReducer'
    IdentityReducer
  else
    step_file = local_path_to_step_file(S3URI.parse(reducer_command))
    Reducer.new(step_file)
  end
end