class SalesforceBulkQuery::Job

Represents a Salesforce Bulk API job, which contains multiple batches. A Query can contain many jobs.

Constants

BATCH_COUNT
JOB_TIME_LIMIT

Attributes

job_id[R]

Public Class Methods

new(sobject, connection, options={})
# File lib/salesforce_bulk_query/job.rb, line 14
def initialize(sobject, connection, options={})
  @sobject = sobject
  @connection = connection
  @logger = options[:logger]
  @job_time_limit = options[:job_time_limit] || JOB_TIME_LIMIT
  @date_field = options[:date_field] or fail "date_field must be given when creating a batch"
  @batch_count = options[:batch_count] || BATCH_COUNT

  # all batches (static)
  @batches = []

  # unfinished batches as of last get_available_results call
  @unfinished_batches = []

  # filenames for the already downloaded and verified batches
  @filenames = []
end
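
A hedged usage sketch of the constructor and the typical job lifecycle, using the instance methods documented below. It assumes `connection` is an already authenticated connection wrapper supplied by the surrounding library; the datetime strings are SOQL literals that get_extended_soql interpolates verbatim (single-batch path).

require 'logger'

job = SalesforceBulkQuery::Job.new(
  'Opportunity',                 # sobject to export
  connection,                    # assumed: authenticated connection wrapper
  :logger => Logger.new(STDOUT),
  :date_field => 'CreatedDate',  # required -- initialize fails without it
  :batch_count => 10,            # optional, defaults to BATCH_COUNT
  :job_time_limit => 15 * 60     # optional (seconds), defaults to JOB_TIME_LIMIT
)

job.create_job(true)             # request CSV results
job.generate_batches(
  'SELECT Id, Name FROM Opportunity',
  '2015-01-01T00:00:00Z',
  '2015-02-01T00:00:00Z',
  true                           # single batch, so the bounds are used as-is
)
job.close_job                    # no more batches will be added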

Public Instance Methods

add_query(query, options={})
# File lib/salesforce_bulk_query/job.rb, line 84
def add_query(query, options={})
  # build and create a batch
  batch = SalesforceBulkQuery::Batch.new(
    :sobject => @sobject,
    :soql => query,
    :job_id => @job_id,
    :connection => @connection,
    :start => options[:start],
    :stop => options[:stop],
    :logger => @logger,
    :date_field => @date_field
  )
  batch.create

  # add the batch to the list
  @batches.push(batch)
  @unfinished_batches.push(batch)
end
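
A hedged sketch of adding one batch by hand (generate_batches below is the usual caller). The SOQL already carries the date restriction; :start and :stop are passed through to Batch.new. The literals here are illustrative only.

soql = 'SELECT Id, Name FROM Opportunity ' \
       'WHERE CreatedDate >= 2015-01-01T00:00:00Z AND CreatedDate < 2015-02-01T00:00:00Z'

job.add_query(soql,
  :start => '2015-01-01T00:00:00Z',
  :stop  => '2015-02-01T00:00:00Z'
)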
check_status()
# File lib/salesforce_bulk_query/job.rb, line 114
def check_status
  path = "job/#{@job_id}"
  response_parsed = @connection.get_xml(path)
  @completed_count = Integer(response_parsed["numberBatchesCompleted"][0])
  @succeeded = @completed_count == Integer(response_parsed["numberBatchesTotal"][0])

  return {
    :succeeded => @succeeded,
    :some_records_failed => Integer(response_parsed["numberRecordsFailed"][0]) > 0,
    :some_batches_failed => Integer(response_parsed["numberBatchesFailed"][0]) > 0,
    :response => response_parsed
  }
end
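
A small sketch of interpreting the returned hash; the keys and the counts are exactly the ones built above, nothing else is assumed.

status = job.check_status
if status[:succeeded]
  puts "all #{status[:response]['numberBatchesTotal'][0]} batches completed"
  warn 'some records failed' if status[:some_records_failed]
  warn 'some batches failed' if status[:some_batches_failed]
else
  sleep 10  # arbitrary pause before polling again
end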
close_job()
# File lib/salesforce_bulk_query/job.rb, line 103
def close_job
  xml = "#{@@xml_header}<jobInfo xmlns=\"http://www.force.com/2009/06/asyncapi/dataload\">"
  xml += "<state>Closed</state>"
  xml += "</jobInfo>"

  path = "job/#{@job_id}"

  response_parsed = @connection.post_xml(path, xml)
  @job_closed_time = Time.now
end
create_job(csv=true)

Does the API request that creates the job on the Salesforce Bulk API

# File lib/salesforce_bulk_query/job.rb, line 35
def create_job(csv=true)
  content_type = csv ? "CSV" : "XML"
  xml = "#{@@xml_header}<jobInfo xmlns=\"http://www.force.com/2009/06/asyncapi/dataload\">"
  xml += "<operation>#{@@operation}</operation>"
  xml += "<object>#{@sobject}</object>"
  xml += "<contentType>#{content_type}</contentType>"
  xml += "</jobInfo>"

  response_parsed = @connection.post_xml("job", xml)
  @job_id = response_parsed['id'][0]
end
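
Minimal usage sketch: the boolean only switches the requested contentType, and the id Salesforce assigns is what every later request in this class is keyed on.

job.create_job(true)   # request CSV results; create_job(false) would request XML
job.job_id             # Salesforce-assigned id, used in the job/#{@job_id} paths above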
generate_batches(soql, start, stop, single_batch=false)
# File lib/salesforce_bulk_query/job.rb, line 51
def generate_batches(soql, start, stop, single_batch=false)
  # if there's just one batch wanted, add it and we're done
  if single_batch
    soql_extended = get_extended_soql(soql, start, stop)
    @logger.info "Adding soql #{soql_extended} as a batch to job" if @logger

    add_query(soql_extended,
      :start => start,
      :stop => stop
    )
    return
  end

  # otherwise generate the time intervals and create a batch for each
  step_size = (stop - start) / @batch_count

  interval_beginings = start.step(stop - step_size, step_size).map{|f|f}
  interval_ends = interval_beginings.clone
  interval_ends.shift
  interval_ends.push(stop)

  interval_beginings.zip(interval_ends).each do |from, to|

    soql_extended = get_extended_soql(soql, from, to)
    @logger.info "Adding soql #{soql_extended} as a batch to job" if @logger

    add_query(soql_extended,
      :start => from,
      :stop => to
    )
  end
end
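
The interval arithmetic in isolation, with plain numbers standing in for whatever time representation the caller passes (the values only need to support -, / and step). With start=0, stop=100 and batch_count=4:

start, stop, batch_count = 0, 100, 4

step_size = (stop - start) / batch_count                  # => 25

interval_beginings = start.step(stop - step_size, step_size).map { |f| f }
# => [0, 25, 50, 75]
interval_ends = interval_beginings.clone
interval_ends.shift
interval_ends.push(stop)
# => [25, 50, 75, 100]

interval_beginings.zip(interval_ends)
# => [[0, 25], [25, 50], [50, 75], [75, 100]]
# each pair becomes one batch via add_query(get_extended_soql(soql, from, to))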
get_available_results(options={})

Downloads the results of whatever batches are finished and returns the rest as unfinished

# File lib/salesforce_bulk_query/job.rb, line 134
def get_available_results(options={})
  downloaded_filenames = []
  unfinished_batches = []
  verification_fail_batches = []
  failed_batches = []

  # get result for each batch in the job
  @unfinished_batches.each do |batch|
    batch_status = batch.check_status

    # if the result is ready
    if batch_status[:succeeded]
      # each finished batch should go here only once

      # download the result
      result = batch.get_result(options)
      @logger.info "get_result result: #{result}" if @logger

      # if the verification failed, put it among the verification failures;
      # we will never ask about this one again.
      if result[:verification] == false
        verification_fail_batches << batch
      else
        # if verification ok and finished put it to filenames
        downloaded_filenames << result[:filename]
      end
    elsif batch_status[:failed]
      # put it to failed and raise error at the end
      failed_batches << batch
    else
      # otherwise put it to unfinished
      unfinished_batches << batch
    end
  end

  unless failed_batches.empty?
    details = failed_batches.map{ |b| "#{b.batch_id}: #{b.fail_message}"}.join("\n")
    fail ArgumentError, "#{failed_batches.length} batches failed. Details: #{details}"
  end

  # cache the unfinished_batches till the next run
  @unfinished_batches = unfinished_batches

  # accumulate the filenames downloaded so far
  @filenames += downloaded_filenames

  @logger.info "unfinished batches: #{unfinished_batches}\nverification_fail_batches: #{verification_fail_batches}" if @logger

  return {
    :finished => @unfinished_batches.empty?,
    :filenames => @filenames,
    :unfinished_batches => @unfinished_batches,
    :verification_fail_batches => verification_fail_batches
  }
end
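
A hedged polling sketch around the return value, assuming `job` was set up and closed as in the constructor sketch above. The :directory_path option is an assumption here about what Batch#get_result accepts for the download location.

results = nil
loop do
  results = job.get_available_results(:directory_path => '/tmp/sf_export')
  break if results[:finished]
  sleep 10
end

results[:filenames]                   # CSVs downloaded and verified so far
results[:verification_fail_batches]   # batches whose download did not verify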
get_extended_soql(soql, from, to)
# File lib/salesforce_bulk_query/job.rb, line 47
def get_extended_soql(soql, from, to)
  return "#{soql} WHERE #{@date_field} >= #{from} AND #{@date_field} < #{to}"
end
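
Worked example of the interpolation above, assuming @date_field is "CreatedDate" and the from/to values are already SOQL datetime literals:

job.get_extended_soql('SELECT Id FROM Opportunity',
                      '2015-01-01T00:00:00Z', '2015-02-01T00:00:00Z')
# => "SELECT Id FROM Opportunity WHERE CreatedDate >= 2015-01-01T00:00:00Z AND CreatedDate < 2015-02-01T00:00:00Z"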
over_limit?()
# File lib/salesforce_bulk_query/job.rb, line 128
def over_limit?
  (Time.now - @job_closed_time) > @job_time_limit
end
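
over_limit? measures elapsed time from close_job, so it is only meaningful once the job has been closed; the comparison is in seconds, since Time subtraction yields seconds. A short sketch:

job.close_job
# ... while polling for results ...
if job.over_limit?
  # more than @job_time_limit seconds have passed since closing --
  # a caller would typically stop waiting for the remaining batches
end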
to_log()
# File lib/salesforce_bulk_query/job.rb, line 190
def to_log
  return {
    :sobject => @sobject,
    :connection => @connection.to_log,
    :batches => @batches.map {|b| b.to_log},
    :unfinished_batches => @unfinished_batches.map {|b| b.to_log}
  }
end