class Hasta::EmrJobDefinition

Defines the EMR job that is being tested.

Attributes

emr_node [R] — the EMR node wrapped by this job definition (read-only)

Public Class Methods

load(file_path, id, scheduled_start_time = Time.now) click to toggle source
# File lib/hasta/emr_job_definition.rb, line 21
# Builds a job definition from one EmrActivity node of a JSON pipeline
# definition file.
#
# file_path            - path of the JSON file; its top-level 'objects' entry
#                        is searched for the node
# id                   - value the node's 'id' must equal
# scheduled_start_time - passed through to EmrNode.from_json
#                        (defaults to Time.now)
#
# Returns a new instance wrapping the node built by EmrNode.from_json.
# Raises ArgumentError when no object has type 'EmrActivity' and the given id,
# including when the file has no 'objects' entry at all.
def self.load(file_path, id, scheduled_start_time = Time.now)
  # A definition without an 'objects' entry is treated as having no nodes, so
  # the caller gets the descriptive ArgumentError below rather than a
  # NoMethodError raised on nil.
  objects = JSON.parse(File.read(file_path))['objects'] || []
  emr_node = objects.find { |node| node['type'] == 'EmrActivity' && node['id'] == id }

  raise ArgumentError, "No EmrActivity for id: #{id} in file: #{file_path}" unless emr_node

  new(EmrNode.from_json(emr_node, scheduled_start_time))
end
new(emr_node) click to toggle source
# File lib/hasta/emr_job_definition.rb, line 32
# Wraps an already-built EMR node.
#
# emr_node - stored as-is in @emr_node; no validation is performed here.
def initialize(emr_node)
  @emr_node = emr_node
end

Public Instance Methods

data_sink() click to toggle source
# File lib/hasta/emr_job_definition.rb, line 76
# Returns the S3DataSink built from output_path. The sink is created on the
# first call and the same object is returned afterwards.
def data_sink
  @data_sink ||= begin
    sink_uri = output_path
    S3DataSink.new(sink_uri)
  end
end
data_sources() click to toggle source
# File lib/hasta/emr_job_definition.rb, line 72
# Returns one S3DataSource per entry of input_paths, in the same order.
# The list is built on the first call and reused afterwards.
def data_sources
  @data_sources ||= begin
    source_uris = input_paths
    source_uris.map { |source_uri| S3DataSource.new(source_uri) }
  end
end
env() click to toggle source
# File lib/hasta/emr_job_definition.rb, line 44
# Returns the Env for this job, built on first call and reused afterwards.
#
# Env.new receives two arguments:
#   1. emr_node.env, unchanged
#   2. a Hash with one entry per cache file whose URI does not end in '.rb'.
#      The key is the part of the cache-file tag before its first '.',
#      upcased, with '_FILE_PATH' appended; the value is S3URI.parse(uri).
def env
  @env ||= begin
    variables = emr_node.env
    # Ruby step files are excluded here; only the remaining cache files are
    # exposed through *_FILE_PATH entries.
    data_files = emr_node.cache_files.reject { |_tag, uri| uri.end_with?('.rb') }
    file_path_pairs = data_files.map do |tag, uri|
      prefix = tag.split('.').first.upcase
      ["#{prefix}_FILE_PATH", S3URI.parse(uri)]
    end
    Env.new(variables, Hash[file_path_pairs])
  end
end
input_paths() click to toggle source
# File lib/hasta/emr_job_definition.rb, line 36
# Returns emr_node.input_paths with each entry run through S3URI.parse,
# in the same order. Computed on the first call and reused afterwards.
def input_paths
  @input_paths ||= begin
    raw_paths = emr_node.input_paths
    raw_paths.map { |raw_path| S3URI.parse(raw_path) }
  end
end
mapper() click to toggle source
# File lib/hasta/emr_job_definition.rb, line 64
# Returns the result of parse_mapper for emr_node.mapper. Resolved on the
# first call and reused afterwards.
def mapper
  @mapper ||= begin
    mapper_command = emr_node.mapper
    parse_mapper(mapper_command)
  end
end
output_path() click to toggle source
# File lib/hasta/emr_job_definition.rb, line 40
# Returns S3URI.parse applied to emr_node.output_path. Parsed on the first
# call and reused afterwards.
def output_path
  @output_path ||= begin
    raw_output = emr_node.output_path
    S3URI.parse(raw_output)
  end
end
reducer() click to toggle source
# File lib/hasta/emr_job_definition.rb, line 68
# Returns the result of parse_reducer for emr_node.reducer. Resolved on the
# first call and reused afterwards.
def reducer
  @reducer ||= begin
    reducer_command = emr_node.reducer
    parse_reducer(reducer_command)
  end
end
ruby_files() click to toggle source
# File lib/hasta/emr_job_definition.rb, line 56
def ruby_files
  @ruby_files ||= emr_node.
    cache_files.
    values.
    select { |uri| uri.end_with?('.rb') }.
    map { |uri| local_path_to_step_file(S3URI.parse(uri)) }
end

Private Instance Methods

local_path_to_step_file(s3_uri) click to toggle source
# File lib/hasta/emr_job_definition.rb, line 84
# Returns the path formed by joining Hasta.project_root, Hasta.project_steps
# and the basename of the given URI.
#
# s3_uri - object responding to #basename
def local_path_to_step_file(s3_uri)
  root_dir = Hasta.project_root
  steps_dir = Hasta.project_steps
  file_name = s3_uri.basename
  File.join(root_dir, steps_dir, file_name)
end
parse_mapper(mapper_command) click to toggle source
# File lib/hasta/emr_job_definition.rb, line 88
# Resolves a mapper command to a mapper object.
#
# mapper_command - 'cat' or 'org.apache.hadoop.mapped.lib.IdentityMapper'
#                  style identity commands are matched literally (see the
#                  list below); anything else is parsed with S3URI.parse
#
# Returns IdentityMapper for an identity command; otherwise a Mapper built
# from the local step-file path for the parsed URI.
def parse_mapper(mapper_command)
  identity_commands = ['cat', 'org.apache.hadoop.mapred.lib.IdentityMapper']
  return IdentityMapper if identity_commands.include?(mapper_command)

  step_file = local_path_to_step_file(S3URI.parse(mapper_command))
  Mapper.new(step_file)
end
parse_reducer(reducer_command) click to toggle source
# File lib/hasta/emr_job_definition.rb, line 96
# Resolves a reducer command to a reducer object.
#
# reducer_command - compared literally against the identity commands listed
#                   below; anything else is parsed with S3URI.parse
#
# Returns IdentityReducer for an identity command; otherwise a Reducer built
# from the local step-file path for the parsed URI.
def parse_reducer(reducer_command)
  identity_commands = ['cat', 'org.apache.hadoop.mapred.lib.IdentityReducer']
  return IdentityReducer if identity_commands.include?(reducer_command)

  step_file = local_path_to_step_file(S3URI.parse(reducer_command))
  Reducer.new(step_file)
end