class SalesforceBulkClient::Job
Constants
- BATCH_CHARACTER_LIMIT
Attributes
job_id[R]
Public Class Methods
new(args)
click to toggle source
# File lib/salesforce_bulk_client/job.rb, line 12
#
# Stores the job settings found in +args+ and starts with an empty list of
# batch ids.
#
# @param args [Hash] looked up with the symbol keys :job_id, :operation,
#   :sobject, :external_field, :records and :connection
def initialize(args)
  %i[job_id operation sobject external_field records connection].each do |key|
    instance_variable_set(:"@#{key}", args[key])
  end
  @batch_ids = []
end
Public Instance Methods
add_batch(batch)
click to toggle source
# File lib/salesforce_bulk_client/job.rb, line 49
#
# Posts +batch+ to this job's batch endpoint. When the JSON form of the whole
# batch is longer than BATCH_CHARACTER_LIMIT, the records are regrouped into
# consecutive sub-batches whose estimated JSON length stays within the limit,
# and each sub-batch is posted separately.
#
# A single record that is by itself longer than the limit is still posted,
# alone in its own sub-batch; no empty sub-batch is ever produced by the
# splitting path.
#
# @param batch [Array] records to upload
# @return [Array] the +id+ of every post response, in posting order
def add_batch(batch)
  batch_groups = []

  if MultiJson.dump(batch).size <= BATCH_CHARACTER_LIMIT
    batch_groups << batch
  else
    # Length of the enclosing "[]" that every serialized group carries.
    empty_group_size = MultiJson.dump([]).size
    batch_group = []
    batch_group_size = empty_group_size

    batch.each do |record|
      record_size = MultiJson.dump(record).size

      # The +1 accounts for the separating comma. Never flush an empty group:
      # an oversized first record would otherwise post an empty batch.
      if !batch_group.empty? && batch_group_size + record_size + 1 > BATCH_CHARACTER_LIMIT
        batch_groups << batch_group
        batch_group = []
        batch_group_size = empty_group_size
      end

      batch_group << record
      batch_group_size += record_size + 1
    end

    # Whatever is left over after the last flush.
    batch_groups << batch_group unless batch_group.empty?
  end

  batch_groups.map do |group|
    @connection.post_request("job/#{@job_id}/batch", group).id
  end
end
add_batches()
click to toggle source
# File lib/salesforce_bulk_client/job.rb, line 42
#
# Uploads @records in slices of @batch_size records, appending the ids
# returned by add_batch for each slice to @batch_ids.
#
# @raise [RuntimeError] when @records is not an Array
def add_batches
  records_are_array = @records.is_a?(Array)
  raise 'Records must be an array of hashes.' unless records_are_array

  @records.each_slice(@batch_size) do |slice|
    new_ids = add_batch(slice)
    @batch_ids.concat(new_ids)
  end
end
add_query()
click to toggle source
# File lib/salesforce_bulk_client/job.rb, line 37
#
# Posts @records to this job's batch endpoint and remembers the id of the
# response in @batch_ids.
#
# NOTE(review): the third post_request argument is passed as +false+; its
# meaning is defined by the connection class — confirm there.
#
# @return [Array] @batch_ids after the new id has been appended
def add_query
  batch_path = "job/#{@job_id}/batch"
  query_batch = @connection.post_request(batch_path, @records, false)
  @batch_ids.push(query_batch.id)
end
check_batch_status(batch_id)
click to toggle source
# File lib/salesforce_bulk_client/job.rb, line 91
#
# Requests the current information for one batch of this job.
#
# @param batch_id [#to_s] id of the batch, interpolated into the request path
# @return whatever the connection's get_request returns for that path
def check_batch_status(batch_id)
  status_path = "job/#{@job_id}/batch/#{batch_id}"
  @connection.get_request(status_path)
end
check_job_status()
click to toggle source
# File lib/salesforce_bulk_client/job.rb, line 87
#
# Requests the current information for this job.
#
# @return whatever the connection's get_request returns for the job path
def check_job_status
  job_path = "job/#{@job_id}"
  @connection.get_request(job_path)
end
close_job()
click to toggle source
# File lib/salesforce_bulk_client/job.rb, line 32
#
# Asks the service to close this job by posting a 'Closed' state to the job
# path.
#
# @return whatever the connection's post_request returns
def close_job
  @connection.post_request("job/#{@job_id}", { state: 'Closed' })
end
create_job(batch_size)
click to toggle source
# File lib/salesforce_bulk_client/job.rb, line 22
#
# Creates the job on the service and records its id.
#
# The request carries the lower-cased operation, the sobject name and a JSON
# content type; the external id field name is included only when
# @external_field is not nil.
#
# @param batch_size [Integer] stored in @batch_size for later use by
#   add_batches
# @return the +id+ of the create response, also stored in @job_id
def create_job(batch_size)
  @batch_size = batch_size

  payload = {
    operation: @operation.to_s.downcase,
    object: @sobject,
    contentType: 'JSON'
  }
  payload[:externalIdFieldName] = @external_field unless @external_field.nil?

  @job_id = @connection.post_request('job', payload).id
end
each_batch(timeout = 3600, poll_delay = 5) { |batch_info, batch_result| ... }
click to toggle source
# File lib/salesforce_bulk_client/job.rb, line 152
#
# Waits for the job's batches via get_job_result and yields each batch's
# info together with its result. The result is fetched with
# get_batch_result only for batches whose state is 'Completed'; for every
# other state +nil+ is yielded in its place.
#
# Without a block an Enumerator is returned, so no polling happens until the
# enumerator is iterated (previously the job was polled first and the call
# then failed with LocalJumpError).
#
# @param timeout [Numeric] forwarded to get_job_result
# @param poll_delay [Numeric] forwarded to get_job_result
# @yieldparam batch_info the info object of one batch
# @yieldparam batch_result the batch's result, or nil when not 'Completed'
# @return [Enumerator] when no block is given; otherwise the value of
#   iterating the job result's batches
def each_batch(timeout = 3600, poll_delay = 5)
  return enum_for(:each_batch, timeout, poll_delay) unless block_given?

  job_result = get_job_result(false, timeout, poll_delay)
  job_result.batches.each do |batch_info|
    batch_result = get_batch_result(batch_info.id) if batch_info.state == 'Completed'
    yield(batch_info, batch_result)
  end
end
get_batch_result(batch_id)
click to toggle source
# File lib/salesforce_bulk_client/job.rb, line 139
#
# Fetches the result of one batch.
#
# For any operation other than 'query' the response of the batch's result
# path is returned as-is. For a 'query' job that response is treated as a
# list of result ids; each one is fetched in turn and the responses are
# concatenated into a single array.
#
# @param batch_id [#to_s] id of the batch, interpolated into the request path
# @return the raw result response, or for queries an Array of all fetched
#   result entries
def get_batch_result(batch_id)
  result_path = "job/#{@job_id}/batch/#{batch_id}/result"
  batch_results = @connection.get_request(result_path)
  return batch_results unless @operation.to_s == 'query'

  batch_results.each_with_object([]) do |batch_result_id, rows|
    rows.concat(@connection.get_request("#{result_path}/#{batch_result_id}"))
  end
end
get_job_result(return_result, timeout, poll_delay)
click to toggle source
# File lib/salesforce_bulk_client/job.rb, line 99
#
# Polls until the job's batches have finished, then returns the job status
# with the collected batch infos merged in under the 'batches' key.
#
# Each poll round checks the job status. When the job is not 'Closed' the
# polling ends immediately and no batch infos are collected. When it is
# 'Closed', every id in @batch_ids is checked; once none of them is 'Queued'
# or 'InProgress', all infos are collected and @batch_ids is emptied, which
# ends the polling. Because @batch_ids is consumed, a second call without
# new batches reports no batches.
#
# The ids are collected in one pass and cleared afterwards. Deleting from
# @batch_ids while iterating it skipped every other id, which cost extra
# poll rounds and scrambled the order of the infos.
#
# @param return_result [Boolean] when exactly +true+, each 'Completed'
#   batch info gets its get_batch_result merged in under 'response'
# @param timeout [Numeric] passed to FirePoll.poll as its time limit
# @param poll_delay [Numeric] seconds slept before every poll round but the
#   first
# @return the job status fetched after polling, including 'batches'; the
#   infos are ordered newest-submitted first
def get_job_result(return_result, timeout, poll_delay)
  batch_infos = []
  polling_started = false

  FirePoll.poll("Timeout waiting for Salesforce to process job batches #{@batch_ids} of job #{@job_id}.", timeout) do
    sleep poll_delay if polling_started
    polling_started = true

    if check_job_status.state == 'Closed'
      batch_info_map = {}
      # all? stops at the first unfinished batch, so the map is only
      # complete when batches_ready is true.
      batches_ready = @batch_ids.all? do |batch_id|
        batch_info = batch_info_map[batch_id] = check_batch_status(batch_id)
        batch_info.state != 'Queued' && batch_info.state != 'InProgress'
      end

      if batches_ready
        # Prepending keeps the newest-first order callers already receive.
        @batch_ids.each { |batch_id| batch_infos.insert(0, batch_info_map[batch_id]) }
        @batch_ids.clear
      end

      @batch_ids.empty?
    else
      # A job that is not closed ends the polling without batch infos.
      true
    end
  end

  job_status = check_job_status
  if return_result == true
    batch_infos.each do |batch_info|
      next unless batch_info.state == 'Completed'

      batch_info.merge!({ 'response' => get_batch_result(batch_info.id) })
    end
  end
  job_status.merge!({ 'batches' => batch_infos })
  job_status
end
list_batches()
click to toggle source
# File lib/salesforce_bulk_client/job.rb, line 95
#
# Requests the batch listing of this job.
#
# @return the +batchInfo+ of the listing response, or nil when the
#   connection returned nil
def list_batches
  listing = @connection.get_request("job/#{@job_id}/batch")
  return nil if listing.nil?

  listing.batchInfo
end