class RP::EMR::CLI

Public Instance Methods

add_pig_script_step(job_id, script_path, *)
# File lib/rp/emr/cli.rb, line 229
# Appends a Pig script step to an existing EMR job flow.
#
# job_id      - id used to look up the target flow in AWS::EMR#job_flows.
# script_path - script location handed to RP::EMR::Step::Pig as +script_path+.
#
# Options consulted: :script_bucket (always forwarded), :pig_params (forwarded
# only when present), :keep_alive (switches action_on_failure to
# 'CANCEL_AND_WAIT'), :dry_run (forwarded to the step and suppresses
# submission), and :verbose (pretty-prints the step hash). The summary line is
# printed even in dry-run mode.
def add_pig_script_step(job_id, script_path, *)
  flow = AWS::EMR.new.job_flows[job_id]

  pig_step = RP::EMR::Step::Pig.new(
    name: 'Pig',
    script_path: script_path,
    script_bucket: options[:script_bucket],
  ) do |step_config|
    pig_params = options[:pig_params]
    step_config.pig_params = pig_params if pig_params
    # Only override the failure action when the cluster is meant to stay alive.
    step_config.action_on_failure = 'CANCEL_AND_WAIT' if options[:keep_alive]
    step_config.dry_run = options[:dry_run]
  end

  # to_hash is invoked separately for submission and for display, as before.
  flow.add_steps([pig_step.to_hash]) unless options[:dry_run]

  puts '-----------'
  puts "Added pig script step to #{flow.id} with #{args}, #{options}"
  pp pig_step.to_hash if options[:verbose]
end
add_rollup_step(job_id, input, output, *)
# File lib/rp/emr/cli.rb, line 207
# Appends an S3DistCp step named 'Rollup' to an existing EMR job flow.
#
# job_id - id used to look up the target flow in AWS::EMR#job_flows.
# input  - forwarded to RP::EMR::Step::S3DistCp as +src+.
# output - forwarded to RP::EMR::Step::S3DistCp as +dest+.
#
# Optional S3DistCp settings are applied only when present:
# :rollup_input_pattern -> srcPattern, :rollup_group_by -> groupBy,
# :rollup_target_size -> targetSize. :keep_alive switches action_on_failure to
# 'CANCEL_AND_WAIT'. :dry_run suppresses submission. :verbose pretty-prints the
# step hash. The summary line is printed even in dry-run mode.
def add_rollup_step(job_id, input, output, *)
  flow = AWS::EMR.new.job_flows[job_id]

  rollup = RP::EMR::Step::S3DistCp.new(name: 'Rollup', src: input, dest: output) do |cfg|
    pattern = options[:rollup_input_pattern]
    group_by = options[:rollup_group_by]
    target_size = options[:rollup_target_size]

    # Setter names mirror the S3DistCp argument names, hence the camelCase.
    cfg.srcPattern = pattern if pattern
    cfg.groupBy = group_by if group_by
    cfg.targetSize = target_size if target_size
    cfg.action_on_failure = 'CANCEL_AND_WAIT' if options[:keep_alive]
  end

  flow.add_steps([rollup.to_hash]) unless options[:dry_run]

  puts '-----------'
  puts "Added rollup step to #{flow.id} with #{args}, #{options}"
  pp rollup.to_hash if options[:verbose]
end
add_setup_hive_step(job_id, *)
# File lib/rp/emr/cli.rb, line 191
# Appends an RP::EMR::Step::SetupHive step to an existing EMR job flow.
#
# job_id - id used to look up the target flow in AWS::EMR#job_flows.
#
# :keep_alive switches action_on_failure to 'CANCEL_AND_WAIT'. :hive_version is
# forwarded only when present. :dry_run suppresses submission. :verbose
# pretty-prints the step hash. The summary line is printed even in dry-run mode.
def add_setup_hive_step(job_id, *)
  target_flow = AWS::EMR.new.job_flows[job_id]

  hive_step = RP::EMR::Step::SetupHive.new do |cfg|
    cfg.action_on_failure = 'CANCEL_AND_WAIT' if options[:keep_alive]
    requested_version = options[:hive_version]
    cfg.hive_version = requested_version if requested_version
  end

  submit = !options[:dry_run]
  target_flow.add_steps([hive_step.to_hash]) if submit

  puts '-----------'
  puts "Added setup hive step to #{target_flow.id} with #{args}, #{options}"
  pp hive_step.to_hash if options[:verbose]
end
add_setup_pig_step(job_id, *)
# File lib/rp/emr/cli.rb, line 175
# Appends an RP::EMR::Step::SetupPig step to an existing EMR job flow.
#
# job_id - id used to look up the target flow in AWS::EMR#job_flows.
#
# :keep_alive switches action_on_failure to 'CANCEL_AND_WAIT'. :pig_version is
# forwarded only when present. :dry_run suppresses submission. :verbose
# pretty-prints the step hash. The summary line is printed even in dry-run mode.
def add_setup_pig_step(job_id, *)
  target_flow = AWS::EMR.new.job_flows[job_id]

  pig_setup = RP::EMR::Step::SetupPig.new do |cfg|
    cfg.action_on_failure = 'CANCEL_AND_WAIT' if options[:keep_alive]
    requested_version = options[:pig_version]
    cfg.pig_version = requested_version if requested_version
  end

  submit = !options[:dry_run]
  target_flow.add_steps([pig_setup.to_hash]) if submit

  puts '-----------'
  puts "Added setup pig step to #{target_flow.id} with #{args}, #{options}"
  pp pig_setup.to_hash if options[:verbose]
end
create_job(job_name, *)
# File lib/rp/emr/cli.rb, line 128
# Creates a new EMR job flow whose only initial step is a debugging setup step.
#
# job_name - name passed to the job-flow create call. Its +underscore+ form
#            (presumably ActiveSupport's String#underscore) is the last path
#            segment of the S3 log URI.
#
# Hadoop version is fixed at '2.2.0'. Instance-group settings
# (:default_instance_type, :master_/:core_/:task_instance_type and _count,
# :task_bid_price) plus :ec2_key_name, :job_flow_role and :service_role are
# forwarded only when present. :keep_alive sets
# keep_job_flow_alive_when_no_steps and switches the debugging step's
# action_on_failure to 'CANCEL_AND_WAIT'.
#
# With :dry_run nothing is created and the placeholder id 'job_flow_id' is used.
# :verbose pretty-prints the job hash.
#
# Returns the id of the created (or placeholder) job flow.
def create_job(job_name, *)
  instances = RP::EMR::Instances.new do |i|
    i.hadoop_version = '2.2.0'
    i.ec2_key_name = options[:ec2_key_name] if options[:ec2_key_name]
    i.keep_job_flow_alive_when_no_steps = options[:keep_alive]

    i.instance_groups = RP::EMR::InstanceGroups.new do |ig|
      ig.default_instance_type = options[:default_instance_type] if options[:default_instance_type]

      ig.master_instance_type = options[:master_instance_type] if options[:master_instance_type]
      ig.master_instance_count = options[:master_instance_count] if options[:master_instance_count]

      ig.core_instance_type = options[:core_instance_type] if options[:core_instance_type]
      ig.core_instance_count = options[:core_instance_count] if options[:core_instance_count]

      ig.task_instance_type = options[:task_instance_type] if options[:task_instance_type]
      ig.task_instance_count = options[:task_instance_count] if options[:task_instance_count]
      ig.task_bid_price = options[:task_bid_price] if options[:task_bid_price]
    end.to_a
  end

  setup_debugging_step = RP::EMR::Step::SetupDebugging.new do |s|
    s.action_on_failure = 'CANCEL_AND_WAIT' if options[:keep_alive]
  end

  # The block parameter is +j+, not +job+, so it does not shadow the local being assigned.
  job = RP::EMR::Job.new do |j|
    j.log_uri = "s3://oib-mapreduce/logs/mosaic_analysis/#{job_name.underscore}"
    j.instances = instances.to_hash
    j.steps = [setup_debugging_step.to_hash]
    j.job_flow_role = options[:job_flow_role] if options[:job_flow_role]
    j.service_role = options[:service_role] if options[:service_role]
  end

  # A dry run uses a stand-in object that only answers #id, so nothing is created.
  job_flow =
    if options[:dry_run]
      OpenStruct.new(id: 'job_flow_id')
    else
      AWS::EMR.new.job_flows.create(job_name, job.to_hash)
    end

  puts '-----------'
  puts "Created job flow #{job_flow.id} with #{args}, #{options}"
  pp job.to_hash if options[:verbose]

  job_flow.id
end