class ActiveRecord::Write

Constants

DEFAULT_SERIALIZER
DEFAULT_SIZE
EMPTY_HASH
VERSION

Public Class Methods

new(query:, columns:, target:, size:, serializer:, &transaction) click to toggle source

`query` is either an ActiveRecord query object or arel. `columns` is a list of columns you want to have during the transaction. `target` is the table you want to talk to. `size` is the maximum number of running iterations in the pool, default: 24. `serializer` is the dump duck for Array & Hash values, default: JSON. `transaction` is the process you want to run against your database.

# File lib/active_record/write.rb, line 21
# Stores the configuration and eagerly builds the work queue: one array of
# column values per source row.
#
# @param query the source rows; one of the shapes recognised by the
#   `activerecord?`, `arel?`, `tuple?` and `twodimensional?` predicates
# @param columns list of columns handed to the transaction block; used with
#   `pluck` for ActiveRecord queries and `slice` for arrays of hashes
# @param target name of the table the Arel table (`@table`) is built from
# @param size number of worker threads `pool` starts; optional, falls back
#   to DEFAULT_SIZE (documented as 24)
# @param serializer object used to dump Array/Hash values; optional, falls
#   back to DEFAULT_SERIALIZER (documented as JSON)
# @param transaction block later run once per queued row by `pool`
# @raise [ArgumentError] when `query` matches none of the recognised shapes
#   (this includes an empty Array, whose first element is nil)
def initialize(query:, columns:, target:, size: DEFAULT_SIZE, serializer: DEFAULT_SERIALIZER, &transaction)
  @query = query
  @columns = columns
  @target = target
  @size = size
  @serializer = serializer
  @transaction = transaction
  @table = Arel::Table.new(@target)
  @queue =
    if activerecord?
      @query.pluck(*@columns)
    elsif arel?
      # Raw execution: each result row is reduced to its values.
      ActiveRecord::Base.connection.execute(@query.to_sql).map(&:values)
    elsif tuple?
      @query.map { |result| result.slice(*@columns).values }
    elsif twodimensional?
      # Already an array of rows; used as-is (and consumed by `pool`).
      @query
    else
      raise ArgumentError, 'query wasn\'t recognizable, please use some that looks like a: ActiveRecord::Base, Arel::SelectManager, Array<*Hash>, Array<*Array>'
    end
  puts "Migrating #{@queue.count} #{@target} records"
end

Public Instance Methods

pool(qutex = Mutex.new) click to toggle source
# File lib/active_record/write.rb, line 44
# Drains `@queue` using `@size` worker threads and waits for all of them.
#
# Each worker repeatedly takes one row off the queue (guarded by `qutex`),
# runs the transaction block against it via `instance_exec`, and executes
# every statement the block returns inside one database transaction.
# The block may return a single statement or an Array of statements; each
# statement must respond to `to_sql`.
#
# @param qutex [Mutex] lock serialising access to the shared queue
# @return [Array<Thread>] the worker threads, after they have been joined
def pool(qutex = Mutex.new)
  # Spin up `@size` worker threads
  workers = 1.upto(@size).map do
    Thread.new do
      loop do
        # Try to get a new queue item
        item = qutex.synchronize { @queue.shift }

        # There is no more work
        break if item.nil?

        # Wait for a free connection
        ActiveRecord::Base.connection_pool.with_connection do
          ActiveRecord::Base.transaction do
            result = instance_exec(*item, &@transaction)
            # `Array[result]` would wrap an Array result a second time and
            # end up calling `to_sql` on the Array itself.
            statements = result.is_a?(Array) ? result : [result]

            # Execute each statement coming back
            statements.each do |instruction|
              ActiveRecord::Base.connection.execute(instruction.to_sql)
            end
          end
        end
      end
    end
  end

  workers.map(&:join)
end

Private Instance Methods

activerecord?() click to toggle source
# File lib/active_record/write.rb, line 71
        # Whether `@query` should be read through ActiveRecord (`pluck`).
        #
        # True for a record instance or a relation (as before), and also for
        # ActiveRecord::Base itself or any subclass of it, so a model class
        # can be passed directly as the query.
        #
        # @return [Boolean]
        def activerecord?
  @query.kind_of?(ActiveRecord::Base) ||
    @query.kind_of?(ActiveRecord::Relation) ||
    (@query.kind_of?(Class) && @query.ancestors.include?(ActiveRecord::Base))
end
arel?() click to toggle source
# File lib/active_record/write.rb, line 75
        # Whether `@query` is an Arel select manager (or a subclass instance).
        #
        # @return [Boolean]
        def arel?
  select_manager = Arel::SelectManager
  @query.is_a?(select_manager)
end
insert(data) click to toggle source
# File lib/active_record/write.rb, line 91
        # Builds an insert statement for `data`.
        #
        # @param data key/value pairs to insert; passed through `serialize`
        # @return [Arel::InsertManager] the manager, after `insert` was called on it
        def insert(data)
  manager = Arel::InsertManager.new(ActiveRecord::Base)
  manager.insert(serialize(data))
  manager
end
serialize(data) click to toggle source
# File lib/active_record/write.rb, line 95
        # Maps `data` onto table attributes, dumping nested structures.
        #
        # Every key is looked up on `@table`; Array and Hash values are dumped
        # with the configured serializer (`@serializer`, falling back to JSON
        # when none is set), all other values are passed through unchanged.
        #
        # @param data enumerable of key/value pairs
        # @return [Hash] `@table[key]` => (possibly dumped) value
        def serialize(data)
  dumper = @serializer || JSON

  data.each_with_object({}) do |(key, value), row|
    row[@table[key]] =
      case value
      when Array, Hash then dumper.dump(value)
      else value
      end
  end
end
tuple?() click to toggle source
# File lib/active_record/write.rb, line 79
        # Whether `@query` is an Array whose first element is a Hash.
        #
        # @return [Boolean]
        def tuple?
  return false unless @query.is_a?(Array)

  @query.first.is_a?(Hash)
end
twodimensional?() click to toggle source
# File lib/active_record/write.rb, line 83
        # Whether `@query` is an Array whose first element is itself an Array.
        #
        # @return [Boolean]
        def twodimensional?
  return false unless @query.is_a?(Array)

  @query.first.is_a?(Array)
end
update(id, data) click to toggle source
# File lib/active_record/write.rb, line 87
        # Builds an update statement for the row of `@table` whose `id`
        # column equals `id`, setting the serialized `data`.
        #
        # @param id value matched against `@table[:id]`
        # @param data key/value pairs to set; passed through `serialize`
        # @return whatever the final `set` call on the update manager returns
        def update(id, data)
  manager = Arel::UpdateManager.new(ActiveRecord::Base)
  scoped = manager.table(@table).where(@table[:id].eq(id))
  scoped.set(serialize(data))
end