class ActiveRecord::Pool

Constants

DEFAULT_SERIALIZER
DEFAULT_SIZE
EMPTY_HASH
VERSION

Public Class Methods

new(query:, columns:, table:, size:, serializer:, model:, &transaction) click to toggle source

‘query` is either an ActiveRecord query object or arel `columns` is a list of columns you want to have during the transaction `table` is the Arel::Table that operations are cast against `size` is the maximum number of running iterations in the pool, default: 24 `serializer` is the dump duck for Array & Hash values, default: JSON `model` is an ActiveRecord model `transaction` is the process you want to run against your database

# File lib/active_record/pool.rb, line 18
def initialize(query:, columns:, table:, size:, serializer:, model:, &transaction)
  @query = query
  @serializer = serializer
  @table = table
  qutex = Mutex.new

  queue = case
  when activerecord?
    @query.pluck(*@columns)
  when arel?
    ActiveRecord::Base.connection.execute(@query.to_sql).map(&:values)
  when tuple?
    @query.map { |result| result.slice(*columns).values }
  when twodimensional?
    @query
  else
    raise ArgumentError, 'query wasn\'t recognizable, please use some that looks like a: ActiveRecord::Base, Arel::SelectManager, Array[Hash], Array[Array]'
  end

  puts "Migrating #{queue.count} #{table} records"

  # Spin up a number of threads based on the `maximum` given
  1.upto(size).map do
    Thread.new do
      loop do
        # Try to get a new queue item
        item = qutex.synchronize { queue.shift }

        if item.nil?
          # There is no more work
          break
        else
          # Wait for a free connection
          model.connection_pool.with_connection do
            model.transaction do
              # Execute each statement coming back
              Array[instance_exec(*item, &transaction)].each do |instruction|
                next if instruction.nil?
                model.connection.execute(instruction.to_sql)
              end
            end
          end
        end
      end
    end
  end.map(&:join)
end

Private Instance Methods

activerecord?() click to toggle source
# File lib/active_record/pool.rb, line 66
        def activerecord?
  @query.kind_of?(ActiveRecord::Base) || @query.kind_of?(ActiveRecord::Relation) || @query.try(:<, ActiveRecord::Base)
end
arel?() click to toggle source
# File lib/active_record/pool.rb, line 70
        def arel?
  @query.kind_of?(Arel::SelectManager)
end
delete(id) click to toggle source
# File lib/active_record/pool.rb, line 90
        def delete(id)
  Arel::DeleteManager.new.from(@table).where(@table[:id].eq(id))
end
insert(data) click to toggle source
# File lib/active_record/pool.rb, line 86
        def insert(data)
  Arel::InsertManager.new.tap { |m| m.insert(serialize(data)) }
end
serialize(data) click to toggle source
# File lib/active_record/pool.rb, line 94
        def serialize(data)
  data.inject(EMPTY_HASH) do |state, (key, value)|
    if value.is_a?(Array) || value.is_a?(Hash)
      state.merge(@table[key] => @serializer.dump(value))
    else
      state.merge(@table[key] => value)
    end
  end
end
tuple?() click to toggle source
# File lib/active_record/pool.rb, line 74
        def tuple?
  @query.kind_of?(Array) && @query.first.kind_of?(Hash)
end
twodimensional?() click to toggle source
# File lib/active_record/pool.rb, line 78
        def twodimensional?
  @query.kind_of?(Array) && @query.first.kind_of?(Array)
end
update(id, data) click to toggle source
# File lib/active_record/pool.rb, line 82
        def update(id, data)
  Arel::UpdateManager.new.table(@table).where(@table[:id].eq(id)).set(serialize(data))
end