class SalesforceChunker::Job

Constants

DEFAULT_RETRY_SECONDS
DEFAULT_TIMEOUT_SECONDS
QUERY_OPERATIONS

Attributes

batches_count[R]

Public Class Methods

new(connection:, object:, operation:, **options)
# File lib/salesforce_chunker/job.rb, line 11
# Builds a job wrapper and immediately creates the job through the connection.
#
# @param connection the object used for all HTTP calls made by this job
# @param object passed through to create_job as the job's "object" field
# @param operation [String] stored as @operation and sent as the job's operation
# @param options [Hash] :logger (used as-is when given), :log_output (handed to
#   Logger.new otherwise), plus :headers and :external_id, which are the only
#   keys forwarded to create_job
def initialize(connection:, object:, operation:, **options)
  @connection = connection
  @operation = operation
  # Stays nil here; download_results only stops once this holds a number.
  @batches_count = nil

  @log = options[:logger] || Logger.new(options[:log_output])
  @log.progname = "salesforce_chunker"
  @log.info "Creating Bulk API Job"

  job_options = options.slice(:headers, :external_id)
  @job_id = create_job(object, job_options)
end

Public Instance Methods

close()
# File lib/salesforce_chunker/job.rb, line 107
# Marks the job as closed by posting a {state: "Closed"} body to the job's
# endpoint.
#
# @return whatever the connection's post_json returns
def close
  @connection.post_json("job/#{@job_id}/", { state: "Closed" })
end
create_batch(payload)
# File lib/salesforce_chunker/job.rb, line 81
# Adds a batch to this job and returns the new batch's id.
#
# For query operations the payload is sent as plain text via the connection's
# post; for every other operation it is sent via post_json.
#
# @param payload the query text (any object responding to to_s) for query
#   operations, otherwise the data handed unchanged to post_json
# @return the "id" entry of the connection's response
def create_batch(payload)
  if QUERY_OPERATIONS.include?(@operation)
    # Coerce once so the log line and the request body are built from the same
    # string; a non-String payload no longer fails in the log call.
    query = payload.to_s
    @log.info "Creating #{@operation.capitalize} Batch: \"#{query.tr("\n", " ").strip}\""
    @connection.post("job/#{@job_id}/batch", query)["id"]
  else
    @log.info "Creating #{@operation.capitalize} Batch"
    @connection.post_json("job/#{@job_id}/batch", payload)["id"]
  end
end
download_results(**options) { |result| ... }
# File lib/salesforce_chunker/job.rb, line 23
# Polls the job's batches and yields every record of each completed batch.
#
# Returns nil straight away when @operation is not one of QUERY_OPERATIONS.
# Without a block an Enumerator over the same records is returned. Polling
# stops once the number of handled batches equals @batches_count; this method
# never assigns @batches_count itself, so it must be set elsewhere for the
# loop to end other than by timeout.
#
# @param options [Hash]
#   :retry_seconds   pause between polls that yielded nothing
#                    (default DEFAULT_RETRY_SECONDS)
#   :timeout_seconds overall time budget (default DEFAULT_TIMEOUT_SECONDS)
# @yield [result] each record produced by get_batch_results
# @return [nil, Enumerator, Object] nil for non-query operations, an Enumerator
#   without a block, otherwise the result of the final log call
# @raise [TimeoutError] when a poll yields no results after the budget elapsed
def download_results(**options)
  return nil unless QUERY_OPERATIONS.include?(@operation)
  return to_enum(:download_results, **options) unless block_given?

  retry_seconds = options[:retry_seconds] || DEFAULT_RETRY_SECONDS
  timeout_seconds = options[:timeout_seconds] || DEFAULT_TIMEOUT_SECONDS
  # Use the monotonic clock so wall-clock adjustments cannot shorten or
  # extend the timeout.
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout_seconds
  # Hash used as a set of batch ids: constant-time membership checks.
  downloaded_batches = {}

  loop do
    results_downloaded = false
    @log.info "Retrieving batch status information"
    get_completed_batches.each do |batch|
      batch_id = batch["id"]
      next if downloaded_batches.key?(batch_id)
      @log.info "Batch #{downloaded_batches.length + 1} of #{@batches_count || '?'}: " \
        "retrieving #{batch["numberRecordsProcessed"]} records"
      # Batches that processed no records are marked handled without a fetch.
      if batch["numberRecordsProcessed"].to_i > 0
        get_batch_results(batch_id) { |result| yield(result) }
        results_downloaded = true
      end
      downloaded_batches[batch_id] = true
    end

    break if @batches_count && downloaded_batches.length == @batches_count

    # Only wait (and only check the timeout) when this poll produced nothing;
    # otherwise poll again immediately.
    unless results_downloaded
      if Process.clock_gettime(Process::CLOCK_MONOTONIC) > deadline
        raise TimeoutError, "Timeout during batch processing"
      end

      @log.info "Waiting #{retry_seconds} seconds"
      sleep(retry_seconds)
    end
  end

  @log.info "Completed"
end
get_batch_results(batch_id) { |result| ... }
# File lib/salesforce_chunker/job.rb, line 66
# Fetches every result set of a batch, parses each as JSON and yields its
# records one by one with the "attributes" key removed.
#
# @param batch_id identifier of the batch whose results are fetched
# @yield [record] each parsed record, minus its "attributes" entry
# @return the collection returned by retrieve_batch_results
def get_batch_results(batch_id)
  retrieve_batch_results(batch_id).each do |result_id|
    raw_json = retrieve_raw_results(batch_id, result_id)

    @log.info "Parsing JSON response"
    records = JSON.parse(raw_json)

    @log.info "Yielding records"
    records.each do |record|
      record.delete("attributes")
      yield record
    end
  end
end
get_batch_statuses()
# File lib/salesforce_chunker/job.rb, line 91
# Requests the job's batch listing and returns its "batchInfo" entry.
#
# @return the value stored under "batchInfo" in the connection's response
def get_batch_statuses
  listing = @connection.get_json("job/#{@job_id}/batch")
  listing["batchInfo"]
end
get_completed_batches()
# File lib/salesforce_chunker/job.rb, line 58
# Returns the batches whose state is "Completed", raising as soon as a
# problematic batch is encountered while scanning the status list in order.
#
# @return [Array] the completed batch entries from get_batch_statuses
# @raise [BatchError] when a batch is in the "Failed" state
# @raise [RecordError] when a completed batch reports failed records
def get_completed_batches
  get_batch_statuses.select do |batch|
    case batch["state"]
    when "Failed"
      raise BatchError, "Batch failed: #{batch["stateMessage"]}"
    when "Completed"
      raise RecordError, "Failed records in batch" if batch["numberRecordsFailed"].to_i > 0
      true
    else
      false
    end
  end
end
retrieve_batch_results(batch_id)
# File lib/salesforce_chunker/job.rb, line 95
# Requests the result listing of one batch through the connection's get_json.
#
# @param batch_id identifier interpolated into the request path
# @return whatever get_json returns for the batch's result path
def retrieve_batch_results(batch_id)
  result_path = "job/#{@job_id}/batch/#{batch_id}/result"
  @connection.get_json(result_path)
end
retrieve_raw_results(batch_id, result_id)
# File lib/salesforce_chunker/job.rb, line 103
# Fetches a single result set through the connection's plain get (as opposed
# to get_json); get_batch_results feeds the return value to JSON.parse.
#
# @param batch_id identifier of the batch
# @param result_id identifier of the result set within the batch
# @return whatever the connection's get returns for that path
def retrieve_raw_results(batch_id, result_id)
  result_path = "job/#{@job_id}/batch/#{batch_id}/result/#{result_id}"
  @connection.get(result_path)
end
retrieve_results(batch_id, result_id)
# File lib/salesforce_chunker/job.rb, line 99
# Fetches a single result set through the connection's get_json.
#
# @param batch_id identifier of the batch
# @param result_id identifier of the result set within the batch
# @return whatever get_json returns for that path
def retrieve_results(batch_id, result_id)
  result_path = "job/#{@job_id}/batch/#{batch_id}/result/#{result_id}"
  @connection.get_json(result_path)
end

Private Instance Methods

create_job(object, options)
# File lib/salesforce_chunker/job.rb, line 114
# Posts a job-creation request and returns the id from the response.
#
# @param object value sent as the request's :object field
# @param options [Hash] :content_type (falls back to "JSON"), :external_id
#   (only sent when @operation is "upsert"), :headers (converted with to_h, so
#   nil becomes an empty hash)
# @return the "id" entry of the connection's response
def create_job(object, options)
  content_type = options[:content_type] || "JSON"
  request = { operation: @operation, object: object, contentType: content_type }
  request[:externalIdFieldName] = options[:external_id] if @operation == "upsert"

  headers = options[:headers].to_h
  response = @connection.post_json("job", request, headers)
  response["id"]
end