class Lhm::Chunker

Attributes

connection[R]

Public Class Methods

new(migration, connection = nil, options = {})

Copy rows from origin to destination in chunks of size `stride`, using the `throttler` to sleep between chunks.

# File lib/lhm/chunker.rb, line 18
# Prepares a chunker for +migration+ using +connection+.
#
# Keys read from +options+ here:
#   :raise_on_warnings - stored as @raise_on_warnings (defaults to false)
#   :verifier          - stored as @verifier (may be absent)
#   :throttler         - stored as @throttler (may be absent); it is given
#                        the connection when it responds to #connection=
#   :printer           - progress printer (defaults to Printer::Percentage.new)
#   :retriable         - extra options merged over the SqlRetry defaults
#
# The whole +options+ hash is also kept in @options and forwarded to
# ChunkFinder.
def initialize(migration, connection = nil, options = {})
  @migration = migration
  @connection = connection
  @chunk_finder = ChunkFinder.new(migration, connection, options)
  @options = options
  @raise_on_warnings = options.fetch(:raise_on_warnings, false)
  @verifier = options[:verifier]

  # Hand the throttler our connection only if one was supplied and it
  # exposes a writer for it.
  @throttler = options[:throttler]
  @throttler.connection = @connection if @throttler && @throttler.respond_to?(:connection=)

  # Bounds as reported by the chunk finder.
  @start = @chunk_finder.start
  @limit = @chunk_finder.limit
  @printer = options[:printer] || Printer::Percentage.new

  # Caller-supplied :retriable entries win over the default log prefix.
  retry_options = { log_prefix: "Chunker" }.merge(options.fetch(:retriable, {}))
  @retry_helper = SqlRetry.new(@connection, retry_options)
end

Public Instance Methods

execute()
# File lib/lhm/chunker.rb, line 39
# Copies rows chunk by chunk, from @start up to and including @limit.
#
# Does nothing (returns nil) when the chunk finder reports an empty table.
# For every chunk it: picks the chunk's upper id, runs the optional
# verifier, inserts the chunk, checks warnings when fewer rows than the id
# span were created, runs the throttler if any row was created, and
# notifies the printer. Returns whatever @printer.end returns.
#
# Any StandardError is reported to the printer (when it responds to
# #exception) and then re-raised unchanged.
#
# NOTE(review): @throttler.stride is called unconditionally, so a chunker
# built without a :throttler option fails here — confirm callers always
# supply one.
def execute
  return if @chunk_finder.table_empty?

  @next_to_insert = @start
  while @next_to_insert <= @limit || @start == @limit
    upper = upper_id(@next_to_insert, @throttler.stride)
    verify_can_run

    inserter = ChunkInsert.new(@migration, @connection, bottom, upper, @options)
    created = inserter.insert_and_return_count_of_rows_created
    id_span = upper - bottom + 1

    # Fewer rows than ids in the range: inspect the warnings.
    raise_on_non_pk_duplicate_warning if created < id_span

    @throttler.run if @throttler && created > 0

    @printer.notify(bottom, @limit)
    @next_to_insert = upper + 1

    # A range holding a single id is copied exactly once.
    break if @start == @limit
  end
  @printer.end
rescue => error
  @printer.exception(error) if @printer.respond_to?(:exception)
  raise
end

Private Instance Methods

bottom()
# File lib/lhm/chunker.rb, line 79
# Lowest id of the chunk currently being processed, i.e. the current value
# of @next_to_insert.
def bottom
  @next_to_insert
end
raise_on_non_pk_duplicate_warning()
# File lib/lhm/chunker.rb, line 69
# Inspects the warnings of the last statement ("show warnings") and reports
# every warning that is not a duplicate-entry warning on the primary key.
#
# Each such warning is logged through Lhm.logger; when @raise_on_warnings
# is truthy the first one is also raised as Error.
#
# Primary-key duplicates are recognised in both server spellings:
#   Duplicate entry '1' for key 'PRIMARY'        (older MySQL)
#   Duplicate entry '1' for key 'users.PRIMARY'  (MySQL 8.0.19 and later,
#                                                 which prefix the table name)
#
# Returns the warning rows that were iterated.
def raise_on_non_pk_duplicate_warning
  @connection.query("show warnings").each do |_level, _code, message|
    # The optional group accepts the "<table>." prefix newer servers add.
    next if message.match?(/Duplicate entry .+ for key '(?:[^']+\.)?PRIMARY'/)

    m = "Unexpected warning found for inserted row: #{message}"
    Lhm.logger.warn(m)
    raise Error, m if @raise_on_warnings
  end
end
upper_id(next_id, stride)
# File lib/lhm/chunker.rb, line 90
# Returns the highest id of the chunk that starts at +next_id+.
#
# Looks up, through the retry helper, the id found +stride+ - 1 rows after
# +next_id+ in id order on the origin table. The result is capped at
# @limit; @limit is also returned when the lookup finds no row.
def upper_id(next_id, stride)
  query = format(
    "select id from `%s` where id >= %s order by id limit 1 offset %s",
    @migration.origin_name, next_id, stride - 1
  )
  found = @retry_helper.with_retries { |conn| conn.select_value(query) }

  candidate =
    if found
      found.to_i
    else
      @limit
    end
  [candidate, @limit].min
end
validate()
# File lib/lhm/chunker.rb, line 99
# Delegates validation to the chunk finder, skipping it (and returning nil)
# when the chunk finder reports an empty table.
def validate
  @chunk_finder.validate unless @chunk_finder.table_empty?
end
verify_can_run()
# File lib/lhm/chunker.rb, line 83
# Runs the optional verifier before a chunk is copied.
#
# Without a verifier this is a no-op. Otherwise the verifier is called,
# inside the retry helper, with the connection the helper yields; a falsy
# answer raises RuntimeError ("Verification failed, aborting early").
def verify_can_run
  return unless @verifier

  @retry_helper.with_retries do |conn|
    passed = @verifier.call(conn)
    raise "Verification failed, aborting early" unless passed
  end
end