class SalesforceBulkQuery::Query

Abstraction of a single user-given query. It contains multiple jobs and is tied to a specific connection.

Constants

DEFAULT_DATE_FIELD
DEFAULT_MIN_CREATED
OFFSET_FROM_NOW

If no date_to is given, we use the current time with this offset subtracted (to make sure the freshest changes, which can be inconsistent, aren't included). The offset is in minutes.

Attributes

jobs_done[R]
jobs_in_progress[R]
jobs_restarted[R]

Public Class Methods

new(sobject, soql, connection, options={}) click to toggle source
# File lib/salesforce_bulk_query/query.rb, line 16
# A single user-supplied query. Spawns one or more Bulk API jobs against
# the given connection and tracks their lifecycle.
#
# sobject    - name of the Salesforce object being queried
# soql       - the SOQL string to run
# connection - connection the jobs are created on
# options    - :logger, :date_field, :date_from / :created_from,
#              :date_to / :created_to, :single_batch
def initialize(sobject, soql, connection, options={})
  @sobject      = sobject
  @soql         = soql
  @connection   = connection
  @logger       = options[:logger]
  @single_batch = options[:single_batch]

  # date-range setup; :created_from / :created_to are the legacy option names
  @date_field = options[:date_field] || DEFAULT_DATE_FIELD
  @date_from  = options[:date_from]  || options[:created_from]
  @date_to    = options[:date_to]    || options[:created_to]

  # job bookkeeping
  @jobs_in_progress = []  # jobs currently running
  @jobs_done        = []  # successfully finished jobs with no batches to split
  @jobs_restarted   = []  # finished or timeouted jobs with some batches split into other jobs

  @finished_batch_filenames = []
  @restarted_subqueries = []
end

Public Instance Methods

get_available_results(options={}) click to toggle source

Get results for all finished jobs. If there are some unfinished batches, skip them and return them as unfinished.

@param options [Hash] passed through to each job's result download and merged into the options of any restarted jobs

# File lib/salesforce_bulk_query/query.rb, line 89
# Get results for all finished jobs. If there are some unfinished batches,
# skip them and return them as unfinished. Batches that are over the limit
# or failed verification are split into new jobs and run again.
#
# @param options [Hash] passed through to each job's result download and
#   merged into the options of any restarted jobs
# @return [Hash] :succeeded (true when no jobs remain in progress),
#   :filenames (all files downloaded so far), :unfinished_subqueries,
#   :jobs_done (ids of completely finished jobs)
def get_available_results(options={})

  unfinished_subqueries = []
  jobs_in_progress = []
  jobs_restarted = []
  jobs_done = []

  # check all jobs statuses and split what should be split
  @jobs_in_progress.each do |job|

    # download what's available
    job_results = job.get_available_results(options)

    # a job over the restart limit is treated as done so its leftover
    # batches get split instead of being waited on indefinitely
    job_over_limit = job.over_limit?
    job_done = job_results[:finished] || job_over_limit

    @logger.debug "job_results: #{job_results}" if @logger

    unfinished_batches = job_results[:unfinished_batches]
    verification_fail_batches = job_results[:verification_fail_batches]

    unfinished_subqueries += unfinished_batches.map {|b| b.soql}

    # split to subqueries what needs to be split
    to_split = verification_fail_batches
    to_split += unfinished_batches if job_over_limit

    # delete files associated with batches that failed verification
    verification_fail_batches.each do |b|
      # guard the optional logger like every other logging call here
      # (previously crashed with NoMethodError when no logger was configured)
      @logger.info "Deleting #{b.filename}, verification failed." if @logger
      File.delete(b.filename)
    end

    to_split.each do |batch|
      # for each unfinished batch create a new job and add it to new jobs
      @logger.info "The following subquery didn't end in time / failed verification: #{batch.soql}. Dividing into multiple and running again" if @logger
      new_job = SalesforceBulkQuery::Job.new(
        @sobject,
        @connection,
        {:logger => @logger, :date_field => @date_field}.merge(options)
      )
      new_job.create_job
      new_job.generate_batches(@soql, batch.start, batch.stop)
      new_job.close_job
      jobs_in_progress.push(new_job)
    end

    # what to do with the current job
    # finish, some stuff restarted
    if job_done
      if to_split.empty?
        # done, nothing left
        jobs_done.push(job)

        @logger.info "#{job.job_id} finished. Nothing to split. unfinished_batches: #{unfinished_batches}, verification_fail_batches: #{verification_fail_batches}" if @logger
      else
        # done, some batches needed to be restarted
        jobs_restarted.push(job)
      end

      # store the filenames and restarted stuff
      @finished_batch_filenames += job_results[:filenames]
      @restarted_subqueries += to_split.map {|b| b.soql}
    else
      # still in progress
      jobs_in_progress.push(job)
    end
  end

  # remove the finished jobs from progress and add there the new ones.
  # Restarted jobs are now accumulated too — previously the local list was
  # collected but never merged, so @jobs_restarted stayed empty forever.
  @jobs_in_progress = jobs_in_progress
  @jobs_restarted += jobs_restarted
  @jobs_done += jobs_done

  # we're done if there're no jobs in progress
  return {
    :succeeded => @jobs_in_progress.empty?,
    :filenames => @finished_batch_filenames,
    :unfinished_subqueries => unfinished_subqueries,
    :jobs_done => @jobs_done.map { |j| j.job_id }
  }
end
start(options={}) click to toggle source

Creates the first job, divides the query to subqueries, puts all the subqueries as batches to the job

# File lib/salesforce_bulk_query/query.rb, line 44
# Creates the first job, divides the query to subqueries, puts all the
# subqueries as batches to the job.
#
# @param options [Hash] passed to the job; :offset_from_now overrides the
#   default offset (in minutes) subtracted from now when no date_to is given
def start(options={})
  # a user-supplied WHERE / ORDER BY would clash with the generated date ranges
  unless @single_batch
    if @soql =~ / WHERE /i || @soql =~ /ORDER BY/i
      raise "You can't have WHERE or ORDER BY in your soql. If you want to download just specific date range use date_from / date_to"
    end
  end

  # create the first job
  job = SalesforceBulkQuery::Job.new(
    @sobject,
    @connection,
    {:logger => @logger, :date_field => @date_field}.merge(options)
  )
  job.create_job

  # interval start: the user-given date or the oldest record's date value
  min_date = get_min_date
  interval_start = min_date.instance_of?(Time) ? DateTime.parse(min_date.to_s) : DateTime.parse(min_date)

  # interval stop: date_to, or "now minus a safety offset" when none given
  interval_stop =
    if @date_to.nil?
      DateTime.now - Rational(options[:offset_from_now] || OFFSET_FROM_NOW, 1440)
    elsif @date_to.instance_of?(Time)
      DateTime.parse(@date_to.to_s)
    else
      DateTime.parse(@date_to)
    end

  job.generate_batches(@soql, interval_start, interval_stop, @single_batch)
  job.close_job

  @jobs_in_progress.push(job)
end

Private Instance Methods

get_min_date() click to toggle source
# File lib/salesforce_bulk_query/query.rb, line 173
# Returns the lower bound of the queried date interval: the user-supplied
# date_from, or the value of the date field on the oldest record of the
# sobject, falling back to DEFAULT_MIN_CREATED when that lookup times out.
def get_min_date
  return @date_from if @date_from

  # ask the API for the date field of the oldest record
  min_created = nil
  begin
    soql = "SELECT #{@date_field} FROM #{@sobject} ORDER BY #{@date_field} LIMIT 1"
    @connection.client.query(soql).each do |record|
      min_created = record[@date_field.to_sym]
    end
  rescue Faraday::Error::TimeoutError => e
    @logger.warn "Timeout getting the oldest object for #{@sobject}. Error: #{e}. Using the default value" if @logger
    min_created = DEFAULT_MIN_CREATED
  rescue Faraday::Error::ClientError => e
    fail ArgumentError, "Error when trying to get the oldest record according to #{@date_field}, looks like #{@date_field} is not on #{@sobject}. Original error: #{e}\n #{e.message} \n #{e.backtrace} "
  end
  min_created
end