class Awscli::Emr::EMR

Public Class Methods

new(connection) click to toggle source
# File lib/awscli/emr.rb, line 4
def initialize(connection)
  @conn = connection
end

Public Instance Methods

add_instance_group(options) click to toggle source
# File lib/awscli/emr.rb, line 145
def add_instance_group(options)
  opts = Marshal.load(Marshal.dump(options))
  opts.reject! { |key| key == 'job_flow_id' }
  opts.reject! { |key| key == 'region' }
  abort 'invalid job id' unless @conn.describe_job_flows.body['JobFlows'].map { |job| job['JobFlowId'] }.include?(options[:job_flow_id])
  abort 'invalid instance type' unless Awscli::Instances::INSTANCE_SIZES.include?(options[:instance_type])
  if instance_count = opts.delete(:instance_count)
    opts.merge!('InstanceCount' => instance_count)
  end
  if instance_type = opts.delete(:instance_type)
    opts.merge!('InstanceType' => instance_type)
  end
  if instance_role = opts.delete(:instance_role)
    opts.merge!('InstanceRole' => instance_role)
  end
  if name = opts.delete(:name)
    opts.merge!('Name' => name)
  end
  if bid_price = opts.delete(:bid_price)
    opts.merge!('BidPrice' => bid_price)
    opts.merge!('MarketType' => 'SPOT')
  else
    opts.merge!('MarketType' => 'ON_DEMAND')
  end
  (instance_groups ||= []) << opts
  @conn.add_instance_groups(options[:job_flow_id], 'InstanceGroups' => instance_groups)
  puts "Added instance group to job flow(with id): #{options[:job_flow_id]}"
end
add_instance_groups(job_flow_id, groups) click to toggle source
# File lib/awscli/emr.rb, line 207
def add_instance_groups(job_flow_id, groups)
  validate_job_ids job_flow_id
  instance_groups = parse_instance_groups(groups)
  @conn.add_instance_groups(job_flow_id, 'InstanceGroups' => instance_groups)
end
add_steps(job_flow_id, job_steps) click to toggle source
# File lib/awscli/emr.rb, line 174
def add_steps(job_flow_id, job_steps)
  validate_job_ids job_flow_id
  @conn.add_job_flow_steps(job_flow_id, 'Steps' => parse_custom_jar(job_steps))
  puts "Added step to job flow id: #{job_flow_id}"
end
create_job_flow(options) click to toggle source
# File lib/awscli/emr.rb, line 41
def create_job_flow(options)
  # => BOOTSTRAP ACTIONS
  boot_strap_actions = []
  if options[:bootstrap_actions]
    options[:bootstrap_actions].each do |step|
      boot_strap_actions << parse_boot_strap_actions(step)
    end
  end

  # => STEPS
  steps = []
  if options[:custom_jar_steps]
    options[:custom_jar_steps].each do |step|
      steps << parse_custom_jar(step)
    end
  end
  if options[:hive_interactive]
    steps << hive_install(options[:hadoop_version])
  end
  if options[:pig_interactive]
    steps << pig_install
  end
  if options[:hive_steps]
    steps << hive_install(options[:hadoop_version]) unless options[:hive_interactive]
    options[:hive_steps].each do |step|
      steps << parse_hive_steps(step)
    end
  end
  if options[:pig_steps]
    steps << pig_install unless options[:pig_interactive]
    options[:pig_steps].each do |step|
      steps << parse_pig_steps(step)
    end
  end
  if options[:streaming_steps]
    options[:streaming_steps].each do |step|
      steps << parse_streaming_steps(step)
    end
  end
  if options[:hbase_install]
    boot_strap_actions << hbase_install_boot_strap
    steps << hbase_install_steps
    #validate hadoop version and instance size
    abort "Invalid hadoop version #{options[:hadoop_version]}, supported Hadoop Versions for HBase are: #{Awscli::EMR::HBASE_SUPPORTED_HADOOP.join(',')}" unless Awscli::EMR::HBASE_SUPPORTED_HADOOP.include?(options[:hadoop_version])
    options[:instance_groups] && parse_instance_groups(options[:instance_groups]).each do |group|
      unless is_valid_instance_type?(group['InstanceType'])
        abort "Instance type #{group['InstanceType']} is not compatible with HBase, instance size should be equal or greater than m1.large"
      end
    end
    if options[:master_instance_type]
      unless is_valid_instance_type?(options[:master_instance_type])
        abort "Instance type #{options[:master_instance_type]} is not compatible with HBase, instance size should be equal or greater than m1.large"
      end
    end
    if options[:slave_instance_type]
      unless is_valid_instance_type?(options[:slave_instance_type])
        abort "Instance type #{options[:slave_instance_type]} is not compatible with HBase, instance size should be equal or greater than m1.large"
      end
    end
    # => HBase backups
    if options[:hbase_backup_schedule]
      # Backup
      if options[:hbase_consistent_backup]
        steps << parse_hbase_backup(options[:hbase_backup_schedule], true)
      else
        steps << parse_hbase_backup(options[:hbase_backup_schedule])
      end
    elsif options[:hbase_backup_restore]
      # Restore
      steps << parse_hbase_restore(options[:hbase_backup_restore])
    end
  end

  # => INSTANCES
  instances = Hash.new
  instances['HadoopVersion'] = options[:hadoop_version]
  if options[:hive_interactive] or options[:pig_interactive] or options[:hbase_install]  #then job flow should not be terminated
    instances['KeepJobFlowAliveWhenNoSteps'] = true
  else
    instances['KeepJobFlowAliveWhenNoSteps'] = options[:alive]
  end
  instances['Ec2KeyName'] = options[:instance_ec2_key_name] if options[:instance_ec2_key_name]
  instances['InstanceCount'] = options[:instance_count] if options[:instance_count]
  instances['MasterInstanceType'] = options[:master_instance_type] if options[:master_instance_type]
  instances['SlaveInstanceType'] = options[:slave_instance_type] if options[:slave_instance_type]
  instances['TerminationProtected'] = options[:termination_protection] if options[:termination_protection]
  # => Instance Groups
  instances['InstanceGroups'] = parse_instance_groups(options[:instance_groups]) if options[:instance_groups]

  # => Build final request
  job_flow = Hash.new
  job_flow['AmiVersion'] = Awscli::EMR::HADOOP_AMI_MAPPING[options[:hadoop_version]]
  job_flow['LogUri'] = options[:log_uri] if options[:log_uri]
  job_flow['BootstrapActions'] = boot_strap_actions if options[:bootstrap_actions] or options[:hbase_install]
  job_flow['Instances'] = instances
  job_flow['Steps'] = steps
  if options[:alive] or options[:hive_interactive] or options[:pig_interactive] or options[:hbase_install]
    @conn.run_job_flow("#{options[:name]} (requires manual termination)", job_flow)
  else
    @conn.run_job_flow(options[:name], job_flow)
  end
  puts "Create JobFlow '#{options[:name]}' Successfully!"
end
delete(job_ids) click to toggle source
# File lib/awscli/emr.rb, line 213
def delete(job_ids)
  validate_job_ids job_ids
  @conn.terminate_job_flows('JobFlowIds' => job_ids)
  puts "Terminated Job Flows: #{job_ids.join(',')}"
end
list(options) click to toggle source
# File lib/awscli/emr.rb, line 8
def list(options)
  validate_job_ids options[:job_flow_ids] if options[:job_flow_ids]
  opts = Marshal.load(Marshal.dump(options))
  opts.reject! { |k| k == 'table' } if options[:table]
  if job_flow_ids = opts.delete(:job_flow_ids)
    opts.merge!('JobFlowIds' => job_flow_ids)
  end
  if job_flow_status = opts.delete(:job_flow_status)
    opts.merge!('JobFlowStates' => job_flow_status)
  end
  if options[:table]
    puts 'For detailed information, dont pass --table option'
    job_flows = @conn.describe_job_flows(opts).body['JobFlows']
    table_data = Array.new
    unless job_flows.empty?
      job_flows.each do |job_flow|
        table_data << {
                        :job_flow_id => job_flow['JobFlowId'],
                        :name => job_flow['Name'],
                        :instance_count => job_flow['Instances']['InstanceCount'],
                        :master_dns => job_flow['Instances']['MasterPublicDnsName'],
                        :ec2_key_name => job_flow['Instances']['Ec2KeyName'],
                        :state => job_flow['ExecutionStatusDetail']['State']
                      }
      end
    end
    Formatador.display_table(table_data, [:job_flow_id, :name, :state, :instance_count, :master_dns, :ec2_key_name])
  else
    puts 'For less information, pass --table option'
    puts @conn.describe_job_flows(opts).body['JobFlows'].to_yaml
  end
end
modify_instance_group(options) click to toggle source
# File lib/awscli/emr.rb, line 180
def modify_instance_group(options)
  abort "Invalid instance group id: #{options[:instance_group_id]}" unless validate_instance_group_id?(options[:instance_group_id])
  @conn.modify_instance_groups(
      'InstanceGroups' => [
        'InstanceCount' => options[:instance_count],
        'InstanceGroupId' => options[:instance_group_id]
      ]
  )
rescue Excon::Errors::BadRequest
  puts "[Error]: #{$!}"
else
  puts "Modified instance group #{options[:instance_group_id]} size to #{options[:instance_count]}"
end
set_termination_protection(job_flow_ids, terminate_protection) click to toggle source
# File lib/awscli/emr.rb, line 194
def set_termination_protection(job_flow_ids, terminate_protection)
  validate_job_ids job_flow_ids
  @conn.set_termination_protection(
      terminate_protection,
      {
          'JobFlowIds' => job_flow_ids
      }
  )
  terminate_protection ?
    puts("Termination protection flag added to job_flows: #{job_flow_ids.join(',')}") :
    puts("Termination protection flag removed from job_flows: #{job_flow_ids.join(',')}")
end

Private Instance Methods

hbase_install_boot_strap() click to toggle source
# File lib/awscli/emr.rb, line 408
def hbase_install_boot_strap
  {
    'Name' => 'awscli-emr-install-hbase',
    'ScriptBootstrapAction' => {
        'Args' => [],
        'Path' => 's3://us-west-1.elasticmapreduce/bootstrap-actions/setup-hbase'
    }
  }
end
hbase_install_steps() click to toggle source
# File lib/awscli/emr.rb, line 418
def hbase_install_steps
  {
    'ActionOnFailure' => 'CANCEL_AND_WAIT',
    'Name' => 'awscli-emr-start-hbase',
    'HadoopJarStep' => {
        'Jar' => '/home/hadoop/lib/hbase-0.92.0.jar',
        'Args' => %w(emr.hbase.backup.Main --start-master)
    }
  }
end
hive_install(hadoop_version) click to toggle source
# File lib/awscli/emr.rb, line 380
def hive_install(hadoop_version)
  {
    'ActionOnFailure' => 'TERMINATE_JOB_FLOW',
    'Name' => 'awscli-emr-hive-setup',
    'HadoopJarStep' => {
        'Args' => ['s3://us-east-1.elasticmapreduce/libs/hive/hive-script',
                   '--base-path',
                   's3://us-east-1.elasticmapreduce/libs/hive/',
                   '--install-hive',
                   '--hive-versions',
                   Awscli::EMR::HADOOP_HIVE_COMPATIBILITY[hadoop_version]
                  ],
        'Jar' => 's3://us-east-1.elasticmapreduce/libs/script-runner/script-runner.jar'
    }
  }
end
is_valid_instance_type?(instance_type) click to toggle source
# File lib/awscli/emr.rb, line 230
def is_valid_instance_type?(instance_type)
  ! Awscli::EMR::HBASE_INVALID_INSTANCES.member?(instance_type)
end
parse_boot_strap_actions(step) click to toggle source
# File lib/awscli/emr.rb, line 266
def parse_boot_strap_actions(step)
  #parse => name,bootstrap_action_path,bootstrap_action_args
  name, path, *args = step.split(',')
  if name.empty? or path.empty?
    abort 'name and path are required'
  end
  boot_strap_actions = {
    'Name' => name,
    'ScriptBootstrapAction' => {
      'Args' => args || [],
      'Path' => path
    }
  }
end
parse_custom_jar(steps) click to toggle source
# File lib/awscli/emr.rb, line 281
def parse_custom_jar(steps)
  #parse jar_path(s3)*,name_of_step*,main_class,action_on_failure(TERMINATE_JOB_FLOW | CANCEL_AND_WAIT | CONTINUE),arg1=agr2=arg3,properties(k=v,k=v)
  abort "invalid step pattern, expecting 'jar_path(s3)*,name_of_step*,main_class,action_on_failure,arg1=agr2=arg3,prop_k1=prop_v1,prop_k2=prop_v2)'" unless step =~ /(.*),(.*),(.*),(.*),(.*),(.*),(.*)/
  jar, name, main_class, action_on_failure, extra_args, *job_conf = step.split(',')
  if jar.empty? or name.empty?
    abort 'jar and name are required for a step'
  end
  step_to_run = {
    'ActionOnFailure' => action_on_failure.empty? ? 'TERMINATE_JOB_FLOW' : action_on_failure,
    'Name' => name,
    'HadoopJarStep' => {
      'Jar' => jar,
      'Args' => extra_args.empty? ? [] : extra_args.split('='),
      'Properties' => []
    }
  }
  #steps['HadoopJarStep']['Args'] + extra_args.split('=') unless extra_args
  step_to_run['HadoopJarStep']['MainClass'] = main_class unless main_class.empty?
  unless job_conf.empty?
    job_conf.each do |kv_pair|
      properties = {}
      properties['Key'], properties['Value'] = kv_pair.split('=')
      step_to_run['HadoopJarStep']['Properties'] << properties
    end
  end
  step_to_run
end
parse_hbase_backup(backup_step, consistent=false) click to toggle source
# File lib/awscli/emr.rb, line 429
def parse_hbase_backup(backup_step, consistent=false)
  #parse frequency*,frequency_unit*(Days|Hrs|Mins),path(s3)*,start_time*(now|iso-format)
  frequency, frequency_unit, path, start_time = backup_step.split(',')
  abort 'Invalid backup step pattern, expecting frequency,frequency_unit(days|hrs|mins),path(s3),start_time(now|iso-format)' unless backup_step =~ /(.*),(.*),(.*),(.*)/
  if frequency.empty? or frequency_unit.empty? or path.empty? or start_time.empty?
    abort 'frequency, frequency_unit, path, start_time are required to perform a backup'
  end
  abort "Invalid frequency unit : #{frequency_unit}" unless %w(days hrs mins).include?(frequency_unit)
  hbase_backup_step = {
      'Name' => 'awscli-emr-schedule-hbase-backup',
      'ActionOnFailure' => 'CANCEL_AND_WAIT',
      'HadoopJarStep' => {
          'Jar' => '/home/hadoop/lib/hbase-0.92.0.jar',
          'Args' => ['emr.hbase.backup.Main', '--backup-dir', path, '--set-scheduled-backup', true, '--full-backup-time-interval',
                     frequency, '--incremental-backup-time-unit', frequency_unit, '--start-time', start_time]
      }
  }
  hbase_backup_step['HadoopJarStep']['Args'] << '--consistent' if consistent
  hbase_backup_step
end
parse_hbase_restore(restore_step) click to toggle source
# File lib/awscli/emr.rb, line 450
def parse_hbase_restore(restore_step)
  #parse path(s3)*,version
  path, version = restore_step.split(',')
  if path.empty?
    abort 'path is required'
  end
  hbase_restore_step = {
      'Name' => 'awscli-emr-restore-hbase-backup',
      'ActionOnFailure' => 'CANCEL_AND_WAIT',
      'HadoopJarStep' => {
          'Jar' => '/home/hadoop/lib/hbase-0.92.0.jar',
          'Args' => ['emr.hbase.backup.Main', '--restore', '--backup-dir', path]
      }
  }
  if defined?(version).nil?
    hbase_restore_step['HadoopJarStep']['Args'] << '--backup-version' << version unless version.empty?
  end
  hbase_restore_step
end
parse_hive_steps(step) click to toggle source
# File lib/awscli/emr.rb, line 309
def parse_hive_steps(step)
  #parse script_path(s3)*,input_path(s3),output_path(s3),'-d','args1','-d','args2','-d','arg3'
  path, input_path, output_path, *args = step.split(',')
  abort 'path to the hive script is required' if path.empty?
  hive_step = {
    'ActionOnFailure' => 'TERMINATE_JOB_FLOW',
    'Name' => 'awscli-emr-hive-step',
    'HadoopJarStep' => {
      "Jar" => 's3://us-west-1.elasticmapreduce/libs/script-runner/script-runner.jar',
      "Args" => [
        's3://us-west-1.elasticmapreduce/libs/hive/hive-script',
        '--base-path',
        's3://us-west-1.elasticmapreduce/libs/hive/',
        '--run-hive-script',
        '--args',
        '-f',
        path
      ]
    }
  }
  hive_step['HadoopJarStep']['Args'] << '-d' << "INPUT=#{input_path}" unless input_path.empty?
  hive_step['HadoopJarStep']['Args'] << '-d' << "OUTPUT=#{output_path}" unless output_path.empty?
  hive_step['HadoopJarStep']['Args'] += args unless args.empty?
  hive_step
end
parse_instance_groups(groups) click to toggle source
# File lib/awscli/emr.rb, line 234
def parse_instance_groups(groups)
  #parse instance_groups => instance_count,instance_role(MASTER | CORE | TASK),instance_type,name,bid_price
  instance_groups = []
  groups.each do |group|
    instance_count, instance_role, instance_size, name, bid_price = group.split(',')
    if instance_count.empty? or instance_role.empty? or instance_size.empty?
      abort 'instance_count, instance_role and instance_size are required'
    end
    abort "Invalid instance role: #{instance_role}" unless %w(MASTER CORE TASK).include?(instance_role.upcase)
    abort "Invalid instance type: #{instance_size}" unless Awscli::Instances::INSTANCE_SIZES.include?(instance_size)
    if bid_price
      instance_groups << {
          'BidPrice' => bid_price,
          'InstanceCount' => instance_count.to_i,
          'InstanceRole' => instance_role,
          'InstanceType' => instance_size,
          'MarketType' => 'SPOT',
          'Name' => name || "awscli-emr-#{instance_role}-group",
      }
    else
      instance_groups << {
          'InstanceCount' => instance_count.to_i,
          'InstanceRole' => instance_role,
          'InstanceType' => instance_size,
          'MarketType' => 'ON_DEMAND',
          'Name' => name || "awscli-emr-#{instance_role}-group",
      }
    end
  end
  instance_groups
end
parse_pig_steps(step) click to toggle source
# File lib/awscli/emr.rb, line 335
def parse_pig_steps(step)
  #parse script_path(s3)*,input_path(s3),output_path(s3),'-p','args1','-p','args2','-p','arg3'
  path, input_path, output_path, *args = step.split(',')
  abort 'path to the hive script is required' if path.empty?
  pig_step = {
    'ActionOnFailure' => 'TERMINATE_JOB_FLOW',
    'Name' => 'awscli-emr-pig-step',
    'HadoopJarStep' => {
      "Jar" => 's3://us-west-1.elasticmapreduce/libs/script-runner/script-runner.jar',
      "Args" => %w(s3://us-west-1.elasticmapreduce/libs/pig/pig-script --base-path s3://us-west-1.elasticmapreduce/libs/pig/ --run-pig-script --pig-versions latest --args)
    }
  }
  pig_step['HadoopJarStep']['Args'] << '-p' << "INPUT=#{input_path}" unless input_path.empty?
  pig_step['HadoopJarStep']['Args'] << '-p' << "OUTPUT=#{output_path}" unless output_path.empty?
  pig_step['HadoopJarStep']['Args'] += args unless args.empty?
  pig_step['HadoopJarStep']['Args'] << path
  pig_step
end
parse_streaming_steps(step) click to toggle source
# File lib/awscli/emr.rb, line 354
def parse_streaming_steps(step)
  #parse input*:output*:mapper*:reducer*:extra_arg1:extra_arg2
  input, output, mapper, reducer, *args = step.split(',')
  #input, output, mapper, reducer, args, *job_conf = step.split(',')
  if input.empty? or output.empty? or mapper.empty? or reducer.empty?
    abort 'input, output, mapper and reducer are required'
  end
  streaming_step = {
    'ActionOnFailure' => 'TERMINATE_JOB_FLOW',
    'Name' => 'awscli-emr-streaming-step',
    'HadoopJarStep' => {
      "Jar" => '/home/hadoop/contrib/streaming/hadoop-streaming.jar',
      "Args" => [
        '-input', input,
        '-output', output,
        '-mapper', mapper,
        '-reducer', reducer
      ]
    }
  }
  streaming_step['HadoopJarStep']['Args'] + args unless args.empty?
  #TODO: Add -jobconf params as k=v,k=v,k=v
  #streaming_step['HadoopJarStep']['Args'] << '-job_conf' + job_conf if job_conf.empty?
  streaming_step
end
pig_install() click to toggle source
# File lib/awscli/emr.rb, line 397
def pig_install
  {
    'ActionOnFailure' => 'TERMINATE_JOB_FLOW',
    'Name' => 'awscli-emr-pig-setup',
    'HadoopJarStep' => {
        'Args' => %w(s3://us-east-1.elasticmapreduce/libs/pig/pig-script --base-path s3://us-east-1.elasticmapreduce/libs/pig/ --install-pig --pig-versions latest),
        'Jar' => 's3://us-east-1.elasticmapreduce/libs/script-runner/script-runner.jar'
    }
  }
end
validate_instance_group_id?(group_id) click to toggle source
# File lib/awscli/emr.rb, line 226
def validate_instance_group_id?(group_id)
  @conn.describe_job_flows.body['JobFlows'].map { |j| j['Instances']['InstanceGroups'].map {|g| g['InstanceGroupId']} }.flatten.include?(group_id)
end
validate_job_ids(job_ids) click to toggle source
# File lib/awscli/emr.rb, line 221
def validate_job_ids(job_ids)
  available_job_ids = @conn.describe_job_flows.body['JobFlows'].map { |job| job['JobFlowId'] }
  abort 'invalid job id\'s' unless available_job_ids.each_cons(job_ids.size).include? job_ids
end