class Elasticrawl::CombineJob

Represents an Elastic MapReduce job flow that combines the results of multiple Elasticrawl Parse jobs. Parse jobs write their results per segment. Combine jobs aggregate parse results into a single set of files.

Inherits from Job, which is the ActiveRecord model class.

Public Instance Methods

log_uri()

Returns the S3 location for storing Elastic MapReduce job logs.

# File lib/elasticrawl/combine_job.rb, line 43
def log_uri
  s3_path = "/logs/2-combine/#{self.job_name}/"
  build_s3_uri(s3_path)
end
run()

Runs the job by calling the Elastic MapReduce API. If a job flow ID is returned, it is saved on the job and the result message is returned.

# File lib/elasticrawl/combine_job.rb, line 31
def run
  emr_config = job_config['emr_config']
  job_flow_id = run_job_flow(emr_config)

  if job_flow_id.present?
    self.job_flow_id = job_flow_id
    self.save
    self.result_message
  end
end
set_input_jobs(input_jobs)

Takes an array of parse job names that are to be combined. Creates a single job step whose input paths are the outputs of the parse jobs. Parse jobs that have no job steps are skipped.

# File lib/elasticrawl/combine_job.rb, line 10
def set_input_jobs(input_jobs)
  segment_count = 0
  input_paths = []

  input_jobs.each do |job_name|
    input_job = Job.where(:job_name => job_name,
                          :type => 'Elasticrawl::ParseJob').first_or_initialize
    step_count = input_job.job_steps.count

    if step_count > 0
      segment_count += step_count
      input_paths << set_input_path(input_job)
    end
  end

  self.job_name = set_job_name
  self.job_desc = set_job_desc(segment_count)
  job_steps.push(create_job_step(input_paths.join(',')))
end
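
The aggregation in set_input_jobs can be illustrated without ActiveRecord. The following is a minimal sketch, not the library's code: plain hashes stand in for parse job records, and the method name `sketch_combine_inputs`, the job names, and the `part-*` filter are hypothetical values chosen for illustration.

```ruby
# Mirrors the loop in set_input_jobs: skip parse jobs without steps,
# sum the step counts, and join the remaining input paths with commas.
# Each job is a plain hash standing in for a parse job record.
def sketch_combine_inputs(parse_jobs)
  segment_count = 0
  input_paths = []

  parse_jobs.each do |job|
    next unless job[:step_count] > 0

    segment_count += job[:step_count]
    # 'part-*' is an assumed input filter, not a value from the source.
    input_paths << "/data/1-parse/#{job[:name]}/segments/*/part-*"
  end

  { job_desc: "Combining: #{segment_count} segments",
    input_paths: input_paths.join(',') }
end

parse_jobs = [
  { name: 'example-parse-a', step_count: 2 },
  { name: 'example-parse-b', step_count: 0 }, # no steps, so it is skipped
  { name: 'example-parse-c', step_count: 3 }
]

result = sketch_combine_inputs(parse_jobs)
puts result[:job_desc]
puts result[:input_paths]
```

Because all paths are joined into one string, the combine job ends up with a single job step regardless of how many parse jobs feed it.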

Private Instance Methods

create_job_step(input_paths)

Returns a single job step. The input paths are a comma-separated list of parse job outputs.

# File lib/elasticrawl/combine_job.rb, line 51
def create_job_step(input_paths)
  JobStep.create(:job => self,
                 :input_paths => input_paths,
                 :output_path => set_output_path)
end
job_config()

Returns the combine job configuration from ~/.elasticrawl/jobs.yml.

# File lib/elasticrawl/combine_job.rb, line 81
def job_config
  config = Config.new
  config.load_config('jobs')['steps']['combine']
end
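
As a rough illustration of the structure job_config reads, the sketch below parses an in-memory YAML string. Only the key names `steps`, `combine`, `emr_config` and `input_filter` appear in the code on this page; the values are hypothetical, and the real file may hold more keys and a different shape under `emr_config`.

```ruby
require 'yaml'

# Hypothetical jobs configuration; the values are placeholders.
sample_yaml = <<~YAML
  steps:
    combine:
      emr_config: 'example-emr-config'
      input_filter: 'part-*'
YAML

config = YAML.safe_load(sample_yaml)

# Same lookup as job_config: the 'combine' entry under 'steps'.
combine_config = config['steps']['combine']

puts combine_config['emr_config']
puts combine_config['input_filter']
```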
set_input_path(input_job)

Returns the S3 location for reading the output of a parse job. A wildcard matches all segment names. The input filter is read from the combine job configuration and depends on the parse job's output file type and the compression used.

# File lib/elasticrawl/combine_job.rb, line 60
def set_input_path(input_job)
  job_name = input_job.job_name
  input_filter = job_config['input_filter']

  s3_path = "/data/1-parse/#{job_name}/segments/*/#{input_filter}"
  build_s3_uri(s3_path)
end
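
To show what such a wildcard path selects, the sketch below approximates the matching with Ruby's `File.fnmatch?`. This is only an approximation: Hadoop's glob handling may differ in its details, and the job name, segment names, object keys, and the `part-*` filter are all hypothetical.

```ruby
# Pattern in the same shape as the path built by set_input_path,
# with an assumed job name and input filter.
pattern = 'data/1-parse/example-parse-job/segments/*/part-*'

# Hypothetical object keys under the bucket.
keys = [
  'data/1-parse/example-parse-job/segments/0001/part-00000',
  'data/1-parse/example-parse-job/segments/0002/part-00001',
  'data/1-parse/example-parse-job/segments/0002/_SUCCESS',
  'data/1-parse/other-parse-job/segments/0001/part-00000'
]

# FNM_PATHNAME stops '*' from matching across '/' separators,
# so the wildcard covers exactly one segment directory level.
matched = keys.select do |key|
  File.fnmatch?(pattern, key, File::FNM_PATHNAME)
end

puts matched
```

The segment wildcard picks up every segment of the named parse job, while the filter excludes marker files and other jobs' output.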
set_job_desc(segment_count)

Returns the job description, which forms part of the Elastic MapReduce job flow name.

# File lib/elasticrawl/combine_job.rb, line 76
def set_job_desc(segment_count)
  "Combining: #{segment_count} segments"
end
set_output_path()

Returns the S3 location for storing the combine job results.

# File lib/elasticrawl/combine_job.rb, line 69
def set_output_path
  s3_path = "/data/2-combine/#{self.job_name}/"
  build_s3_uri(s3_path)
end
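
The log and output locations share the `2-combine` prefix and the job name. The sketch below shows how the two paths might resolve for one job. The body of build_s3_uri is not shown on this page, so `sketch_s3_uri` is a hypothetical stand-in that simply prefixes a bucket name, and the bucket and job name are placeholders.

```ruby
# Hypothetical stand-in for build_s3_uri: prefixes the path with
# an s3:// scheme and a bucket name. The real helper may differ.
def sketch_s3_uri(bucket, s3_path)
  "s3://#{bucket}#{s3_path}"
end

bucket = 'example-bucket'        # placeholder bucket name
job_name = 'example-combine-job' # placeholder job name

log_location = sketch_s3_uri(bucket, "/logs/2-combine/#{job_name}/")
output_location = sketch_s3_uri(bucket, "/data/2-combine/#{job_name}/")

puts log_location
puts output_location
```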