class RP::EMR::CLI
Public Instance Methods
add_pig_script_step(job_id, script_path, *)
click to toggle source
# File lib/rp/emr/cli.rb, line 229
#
# Adds an RP::EMR::Step::Pig step named 'Pig' to the existing job flow
# identified by +job_id+.
#
# Options read:
#   :script_bucket - passed to the step as :script_bucket
#   :pig_params    - assigned to the step's pig_params when present
#   :keep_alive    - when set, the step's action_on_failure is 'CANCEL_AND_WAIT'
#   :dry_run       - forwarded to the step builder, and skips add_steps
#   :verbose       - pretty-prints the step hash after reporting
#
# @param job_id [String] id of the job flow to add the step to
# @param script_path [String] passed to the step as :script_path
def add_pig_script_step(job_id, script_path, *)
  job_flow = AWS::EMR.new.job_flows[job_id]

  pig_step = RP::EMR::Step::Pig.new(
    name: 'Pig',
    script_path: script_path,
    script_bucket: options[:script_bucket],
  ) do |config|
    if options[:pig_params]
      config.pig_params = options[:pig_params]
    end
    if options[:keep_alive]
      config.action_on_failure = 'CANCEL_AND_WAIT'
    end
    config.dry_run = options[:dry_run]
  end

  # to_hash is evaluated separately for submission and for the verbose dump,
  # exactly as many times as the submission/verbose flags require.
  unless options[:dry_run]
    job_flow.add_steps([pig_step.to_hash])
  end

  puts '-----------', "Added pig script step to #{job_flow.id} with #{args}, #{options}"
  pp pig_step.to_hash if options[:verbose]
end
add_rollup_step(job_id, input, output, *)
click to toggle source
# File lib/rp/emr/cli.rb, line 207
#
# Adds an RP::EMR::Step::S3DistCp step named 'Rollup' to the existing job
# flow identified by +job_id+, with +input+ as src and +output+ as dest.
#
# Options read:
#   :rollup_input_pattern - assigned to the step's srcPattern when present
#   :rollup_group_by      - assigned to the step's groupBy when present
#   :rollup_target_size   - assigned to the step's targetSize when present
#   :keep_alive           - when set, action_on_failure is 'CANCEL_AND_WAIT'
#   :dry_run              - skips the add_steps call
#   :verbose              - pretty-prints the step hash after reporting
#
# @param job_id [String] id of the job flow to add the step to
# @param input [String] passed to the step as :src
# @param output [String] passed to the step as :dest
def add_rollup_step(job_id, input, output, *)
  job_flow = AWS::EMR.new.job_flows[job_id]

  rollup_step = RP::EMR::Step::S3DistCp.new(name: 'Rollup', src: input, dest: output) do |config|
    # Ordered table of step setter => CLI option; a setter is only called
    # when its option is present (truthy).
    {
      srcPattern: :rollup_input_pattern,
      groupBy: :rollup_group_by,
      targetSize: :rollup_target_size,
    }.each do |setter, option_key|
      value = options[option_key]
      config.public_send("#{setter}=", value) if value
    end

    if options[:keep_alive]
      config.action_on_failure = 'CANCEL_AND_WAIT'
    end
  end

  unless options[:dry_run]
    job_flow.add_steps([rollup_step.to_hash])
  end

  puts '-----------', "Added rollup step to #{job_flow.id} with #{args}, #{options}"
  pp rollup_step.to_hash if options[:verbose]
end
add_setup_hive_step(job_id, *)
click to toggle source
# File lib/rp/emr/cli.rb, line 191
#
# Adds an RP::EMR::Step::SetupHive step to the existing job flow identified
# by +job_id+.
#
# Options read:
#   :keep_alive   - when set, the step's action_on_failure is 'CANCEL_AND_WAIT'
#   :hive_version - assigned to the step's hive_version when present
#   :dry_run      - skips the add_steps call
#   :verbose      - pretty-prints the step hash after reporting
#
# @param job_id [String] id of the job flow to add the step to
def add_setup_hive_step(job_id, *)
  job_flow = AWS::EMR.new.job_flows[job_id]

  hive_step = RP::EMR::Step::SetupHive.new do |config|
    if options[:keep_alive]
      config.action_on_failure = 'CANCEL_AND_WAIT'
    end
    if (version = options[:hive_version])
      config.hive_version = version
    end
  end

  unless options[:dry_run]
    job_flow.add_steps([hive_step.to_hash])
  end

  puts '-----------', "Added setup hive step to #{job_flow.id} with #{args}, #{options}"
  pp hive_step.to_hash if options[:verbose]
end
add_setup_pig_step(job_id, *)
click to toggle source
# File lib/rp/emr/cli.rb, line 175
#
# Adds an RP::EMR::Step::SetupPig step to the existing job flow identified
# by +job_id+.
#
# Options read:
#   :keep_alive  - when set, the step's action_on_failure is 'CANCEL_AND_WAIT'
#   :pig_version - assigned to the step's pig_version when present
#   :dry_run     - skips the add_steps call
#   :verbose     - pretty-prints the step hash after reporting
#
# @param job_id [String] id of the job flow to add the step to
def add_setup_pig_step(job_id, *)
  job_flow = AWS::EMR.new.job_flows[job_id]

  pig_setup_step = RP::EMR::Step::SetupPig.new do |config|
    if options[:keep_alive]
      config.action_on_failure = 'CANCEL_AND_WAIT'
    end
    if (version = options[:pig_version])
      config.pig_version = version
    end
  end

  unless options[:dry_run]
    job_flow.add_steps([pig_setup_step.to_hash])
  end

  puts '-----------', "Added setup pig step to #{job_flow.id} with #{args}, #{options}"
  pp pig_setup_step.to_hash if options[:verbose]
end
create_job(job_name, *)
click to toggle source
# File lib/rp/emr/cli.rb, line 128
#
# Creates a new EMR job flow named +job_name+ whose only initial step is an
# RP::EMR::Step::SetupDebugging step, then returns the job flow id.
#
# The instance configuration uses Hadoop '2.2.0'. Logs go to
# "s3://oib-mapreduce/logs/mosaic_analysis/<job_name.underscore>".
#
# Options read (all optional):
#   :ec2_key_name            - EC2 key pair name for the instances
#   :keep_alive              - keep_job_flow_alive_when_no_steps; also makes the
#                              debugging step use 'CANCEL_AND_WAIT' on failure
#   :default_instance_type, :master_instance_type, :master_instance_count,
#   :core_instance_type, :core_instance_count, :task_instance_type,
#   :task_instance_count, :task_bid_price
#                            - forwarded to RP::EMR::InstanceGroups setters of
#                              the same name
#   :job_flow_role, :service_role - IAM roles set on the job when present
#   :dry_run                 - do not create anything; use the placeholder id
#                              'job_flow_id'
#   :verbose                 - pretty-print the job hash after reporting
#
# @param job_name [String] name of the job flow; its #underscore form is the
#   last segment of the log URI
# @return [String] id of the created job flow ('job_flow_id' on a dry run)
def create_job(job_name, *)
  instances = RP::EMR::Instances.new do |i|
    i.hadoop_version = '2.2.0'
    i.ec2_key_name = options[:ec2_key_name] if options[:ec2_key_name]
    i.keep_job_flow_alive_when_no_steps = options[:keep_alive]
    i.instance_groups = RP::EMR::InstanceGroups.new do |ig|
      # Each CLI option maps 1:1 onto the InstanceGroups setter of the same
      # name. The setter is only called when the option is present (truthy).
      # Order matches the original hand-written assignments.
      %i[
        default_instance_type
        master_instance_type master_instance_count
        core_instance_type core_instance_count
        task_instance_type task_instance_count
        task_bid_price
      ].each do |key|
        ig.public_send("#{key}=", options[key]) if options[key]
      end
    end.to_a
  end

  # When :keep_alive is set, a failure of this step uses CANCEL_AND_WAIT
  # instead of the step builder's default action.
  setup_debugging_step = RP::EMR::Step::SetupDebugging.new do |s|
    s.action_on_failure = 'CANCEL_AND_WAIT' if options[:keep_alive]
  end

  # The block parameter is +j+, not +job+, so it does not shadow the local
  # variable that this very expression assigns.
  job = RP::EMR::Job.new do |j|
    j.log_uri = "s3://oib-mapreduce/logs/mosaic_analysis/#{job_name.underscore}"
    j.instances = instances.to_hash
    j.steps = [setup_debugging_step.to_hash]
    j.job_flow_role = options[:job_flow_role] if options[:job_flow_role]
    j.service_role = options[:service_role] if options[:service_role]
  end

  job_flow =
    if options[:dry_run]
      # Placeholder exposing only #id, so reporting below works unchanged.
      OpenStruct.new(id: 'job_flow_id')
    else
      AWS::EMR.new.job_flows.create(job_name, job.to_hash)
    end

  puts '-----------'
  puts "Created job flow #{job_flow.id} with #{args}, #{options}"
  pp job.to_hash if options[:verbose]

  job_flow.id
end