class ActiveRecord::Pool
Constants
- DEFAULT_SERIALIZER
- DEFAULT_SIZE
- EMPTY_HASH
- VERSION
Public Class Methods
new(query:, columns:, table:, size:, serializer:, model:, &transaction)
click to toggle source
‘query` is either an ActiveRecord
query object or arel `columns` is a list of columns you want to have during the transaction `table` is the Arel::Table that operations are cast against `size` is the maximum number of running iterations in the pool, default: 24 `serializer` is the dump duck for Array & Hash values, default: JSON `model` is an ActiveRecord
model `transaction` is the process you want to run against your database
# File lib/active_record/pool.rb, line 18 def initialize(query:, columns:, table:, size:, serializer:, model:, &transaction) @query = query @serializer = serializer @table = table qutex = Mutex.new queue = case when activerecord? @query.pluck(*@columns) when arel? ActiveRecord::Base.connection.execute(@query.to_sql).map(&:values) when tuple? @query.map { |result| result.slice(*columns).values } when twodimensional? @query else raise ArgumentError, 'query wasn\'t recognizable, please use some that looks like a: ActiveRecord::Base, Arel::SelectManager, Array[Hash], Array[Array]' end puts "Migrating #{queue.count} #{table} records" # Spin up a number of threads based on the `maximum` given 1.upto(size).map do Thread.new do loop do # Try to get a new queue item item = qutex.synchronize { queue.shift } if item.nil? # There is no more work break else # Wait for a free connection model.connection_pool.with_connection do model.transaction do # Execute each statement coming back Array[instance_exec(*item, &transaction)].each do |instruction| next if instruction.nil? model.connection.execute(instruction.to_sql) end end end end end end end.map(&:join) end
Private Instance Methods
activerecord?()
click to toggle source
# File lib/active_record/pool.rb, line 66 def activerecord? @query.kind_of?(ActiveRecord::Base) || @query.kind_of?(ActiveRecord::Relation) || @query.try(:<, ActiveRecord::Base) end
arel?()
click to toggle source
# File lib/active_record/pool.rb, line 70 def arel? @query.kind_of?(Arel::SelectManager) end
delete(id)
click to toggle source
# File lib/active_record/pool.rb, line 90 def delete(id) Arel::DeleteManager.new.from(@table).where(@table[:id].eq(id)) end
insert(data)
click to toggle source
# File lib/active_record/pool.rb, line 86 def insert(data) Arel::InsertManager.new.tap { |m| m.insert(serialize(data)) } end
serialize(data)
click to toggle source
# File lib/active_record/pool.rb, line 94 def serialize(data) data.inject(EMPTY_HASH) do |state, (key, value)| if value.is_a?(Array) || value.is_a?(Hash) state.merge(@table[key] => @serializer.dump(value)) else state.merge(@table[key] => value) end end end
tuple?()
click to toggle source
# File lib/active_record/pool.rb, line 74 def tuple? @query.kind_of?(Array) && @query.first.kind_of?(Hash) end
twodimensional?()
click to toggle source
# File lib/active_record/pool.rb, line 78 def twodimensional? @query.kind_of?(Array) && @query.first.kind_of?(Array) end
update(id, data)
click to toggle source
# File lib/active_record/pool.rb, line 82 def update(id, data) Arel::UpdateManager.new.table(@table).where(@table[:id].eq(id)).set(serialize(data)) end