class ActiveRecordTransactioner

Constants

ALLOWED_ARGS
DEFAULT_ARGS
EMPTY_ARGS

Public Class Methods

new(args = {}) { |self| ... } click to toggle source
# File lib/active-record-transactioner.rb, line 18
# Builds a transactioner from an options hash.
#
# Every key in +args+ must be listed in ALLOWED_ARGS, otherwise a RuntimeError
# is raised. The given options are merged on top of DEFAULT_ARGS.
#
# When a block is given the new instance is yielded to it; once the block
# finishes (normally or through an exception) the queue is flushed and, in
# threaded mode, the running threads are joined.
def initialize(args = {})
  args.each_key do |key|
    next if ALLOWED_ARGS.include?(key)

    raise "Invalid key: '#{key}'."
  end

  @args = DEFAULT_ARGS.merge(args)
  parse_and_set_args

  if block_given?
    begin
      yield self
    ensure
      # Always push out whatever was queued, even when the block raised.
      flush
      join if threadded?
    end
  end
end

Public Instance Methods

bulk_create!(model) click to toggle source
# File lib/active-record-transactioner.rb, line 40
# Remembers the attributes of +model+ so they can be written with a bulk
# insert when the queue is flushed.
#
# The "id", "created_at" and "updated_at" entries are removed from the
# attributes hash before it is stored. Returns the new queue count.
def bulk_create!(model)
  row = model.attributes
  %w[id created_at updated_at].each { |column| row.delete(column) }

  # One batch of rows per model class.
  (@bulk_creates[model.class] ||= []) << row

  @count += 1
end
destroy!(model) click to toggle source
# File lib/active-record-transactioner.rb, line 61
# Queues +model+ so that +destroy!+ is called on it during a later flush.
def destroy!(model)
  job_options = {type: :destroy!}
  queue(model, job_options)
end
flush() click to toggle source

Flushes all queued work: pending bulk inserts and queued model operations are executed per model class, each in its own thread when threading is enabled.

# File lib/active-record-transactioner.rb, line 91
# Executes everything that has been queued so far.
#
# In threaded mode this first waits until a new thread may be started. Then,
# while holding the queue lock, every non-empty batch of bulk-create rows and
# every non-empty batch of queued model operations is handed over for
# execution (in a new thread when threaded, inline otherwise).
#
# Each batch is detached from the queue before it is dispatched, so a batch is
# executed exactly once: a later flush only sees rows/models queued afterwards,
# and a worker thread never shares its array with new incoming work.
#
# Returns the hash of queued models.
def flush
  wait_for_threads if threadded?

  @lock.synchronize do
    @bulk_creates.each do |klass, attribute_array|
      next if attribute_array.empty?

      # Replace the batch with a fresh array (assigning to an existing key is
      # safe while iterating); without this every flush would insert the same
      # rows again and decrement the counter a second time.
      @bulk_creates[klass] = []

      if threadded?
        bulk_insert_attribute_array_threadded(klass, attribute_array)
      else
        bulk_insert_attribute_array(klass, attribute_array)
      end
    end

    @models.each do |klass, models|
      next if models.empty?

      @models[klass] = []
      @count -= models.length

      if threadded?
        work_threadded(klass, models)
      else
        work_models_through_transaction(klass, models)
      end
    end
  end
end
join() click to toggle source

Waits for any remaining running threads.

# File lib/active-record-transactioner.rb, line 119
# Blocks until every thread that is currently registered has finished.
#
# The thread list is copied while holding the thread lock, because finishing
# threads remove themselves from the list. Returns the copied list.
def join
  pending = @lock_threads.synchronize do
    @threads.clone
  end

  debug "Threads to join: #{pending}" if @debug
  pending.each { |thread| thread.join }
end
queue(model, args = {}) click to toggle source

Adds another model to the queue and calls ‘flush’ if it is over the limit.

# File lib/active-record-transactioner.rb, line 66
# Adds a unit of work for +model+ to the queue and flushes when the queue has
# reached the configured transaction size.
#
# Recognised keys in +args+:
# * +:type+ - the operation to run on flush; defaults to +:save!+.
# * +:validate+ - stored with the job; defaults to +true+ when the key is absent.
# * +:method_args+ - arguments stored with the job; defaults to EMPTY_ARGS.
#
# The +args+ hash is only read, never modified, so callers may pass a shared
# or frozen options hash.
def queue(model, args = {})
  job = {
    model: model,
    type: args[:type] || :save!,
    validate: args.fetch(:validate, true),
    method_args: args[:method_args] || EMPTY_ARGS
  }

  @lock.synchronize do
    klass = model.class

    # One monitor per model class, created lazily on first use.
    @lock_models[klass] ||= Monitor.new

    (@models[klass] ||= []) << job
    @count += 1
  end

  flush if should_flush?
end
save!(model) click to toggle source

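Validates the model, raising ActiveRecord::RecordInvalid if it is invalid, then adds it to the queue (to be saved without re-validation) and calls ‘flush’ if the queue is over the limit.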

# File lib/active-record-transactioner.rb, line 35
# Validates +model+ right away and queues it for saving.
#
# Raises ActiveRecord::RecordInvalid when the model is not valid. Since the
# validation already happened here, the job is queued with validation off.
def save!(model)
  if model.valid?
    queue(model, type: :save!, validate: false)
  else
    raise ActiveRecord::RecordInvalid, model
  end
end
threadded?() click to toggle source
# File lib/active-record-transactioner.rb, line 126
# Returns the value of the +:threadded+ option this instance was configured with.
def threadded?
  threaded_option = @args[:threadded]
  threaded_option
end
update_column(model, column_name, new_value) click to toggle source
# File lib/active-record-transactioner.rb, line 57
# Queues an update of a single column; shorthand for #update_columns with a
# one-entry hash.
def update_column(model, column_name, new_value)
  single_update = {column_name => new_value}
  update_columns(model, single_update)
end
update_columns(model, updates) click to toggle source
# File lib/active-record-transactioner.rb, line 53
# Queues a call to +update_columns+ on +model+ with the given +updates+ hash.
# The job is queued with validation turned off.
def update_columns(model, updates)
  job_options = {
    type: :update_columns,
    validate: false,
    method_args: [updates]
  }

  queue(model, job_options)
end

Private Instance Methods

allowed_to_start_new_thread?() click to toggle source
# File lib/active-record-transactioner.rb, line 225
# True while the number of registered threads is below the configured
# maximum. The thread list is read while holding the thread lock.
def allowed_to_start_new_thread?
  @lock_threads.synchronize do
    running = @threads.length
    running < @max_running_threads
  end
end
bulk_insert_attribute_array(klass, attribute_array) click to toggle source
# File lib/active-record-transactioner.rb, line 249
# Writes all rows in +attribute_array+ to the table of +klass+ using a single
# multi-row INSERT statement, then lowers the queue counter by the number of
# rows.
#
# * +klass+ - must respond to +table_name+ and +connection+.
# * +attribute_array+ - array of attribute hashes (column name => value).
#
# The column list is taken from the first row. Every row is then emitted in
# that same column order (looked up by key), so rows whose hashes are ordered
# differently still line up with the column list; a key missing from a row is
# passed to the connection's +quote+ as +nil+.
#
# Table and column names are quoted through the connection instead of
# hard-coded backticks, so the statement is valid for whichever adapter is in use.
#
# Does nothing when +attribute_array+ is empty.
def bulk_insert_attribute_array(klass, attribute_array)
  # An INSERT without rows would be invalid SQL (and there is no first row to
  # take the columns from).
  return if attribute_array.empty?

  connection = klass.connection
  columns = attribute_array.first.keys

  column_list = columns.map { |column| connection.quote_column_name(column) }.join(", ")

  value_lists = attribute_array.map do |attributes|
    quoted = columns.map { |column| connection.quote(attributes[column]) }
    "(#{quoted.join(', ')})"
  end

  table = connection.quote_table_name(klass.table_name)
  connection.execute("INSERT INTO #{table} (#{column_list}) VALUES #{value_lists.join(', ')}")

  @lock.synchronize do
    @count -= attribute_array.length
  end
end
bulk_insert_attribute_array_threadded(klass, attribute_array) click to toggle source
# File lib/active-record-transactioner.rb, line 229
# Runs #bulk_insert_attribute_array for the given batch in a new thread.
#
# The thread is created and registered in +@threads+ while holding the thread
# lock; the thread removes itself from the list again when it ends.
#
# The insert runs inside +connection_pool.with_connection+ (the same way
# #work_threadded does it) so the database connection used by the thread is
# handed back to the pool when the thread is done instead of staying checked out.
#
# Errors are printed and then re-raised inside the thread.
def bulk_insert_attribute_array_threadded(klass, attribute_array)
  @lock_threads.synchronize do
    @threads << Thread.new do
      begin
        ActiveRecord::Base.connection_pool.with_connection do
          bulk_insert_attribute_array(klass, attribute_array)
        end
      rescue => e
        puts e.inspect
        puts e.backtrace

        raise e
      ensure
        debug "Removing thread #{Thread.current.__id__}" if @debug
        @lock_threads.synchronize { @threads.delete(Thread.current) }

        debug "Threads count after remove: #{@threads.length}" if @debug
      end
    end
  end
end
debug(str) click to toggle source
# File lib/active-record-transactioner.rb, line 145
# Prints +str+ with the class-name prefix when debugging is enabled; does
# nothing otherwise. Prefix, text and newline are written with one +print+ call.
def debug(str)
  return unless @debug

  print "{ActiveRecordTransactioner}: #{str}\n"
end
parse_and_set_args() click to toggle source
# File lib/active-record-transactioner.rb, line 132
# Resets the internal state (queues, thread list, counter, locks) and copies
# the numeric and debug options out of +@args+ into instance variables.
# The two size options are converted with +to_i+.
def parse_and_set_args
  # Queues, keyed by model class.
  @models, @bulk_creates = {}, {}
  @count = 0

  # Thread bookkeeping.
  @threads = []
  @lock_threads = Monitor.new

  # Locks: one for the queues, plus one per model class (filled lazily).
  @lock = Monitor.new
  @lock_models = {}

  # Options.
  @max_running_threads = @args[:max_running_threads].to_i
  @transaction_size = @args[:transaction_size].to_i
  @debug = @args[:debug]
end
should_flush?() click to toggle source
# File lib/active-record-transactioner.rb, line 221
# True once the number of queued items has reached the transaction size.
def should_flush?
  limit = @transaction_size
  limit <= @count
end
wait_for_threads() click to toggle source
# File lib/active-record-transactioner.rb, line 149
# Polls every 0.2 seconds until a new thread is allowed to start.
def wait_for_threads
  loop do
    debug "Running threads: #{@threads.length} / #{@max_running_threads}" if @debug
    break if allowed_to_start_new_thread?

    debug "Waiting for threads #{@threads.length} / #{@max_running_threads}" if @debug
    sleep 0.2
  end

  debug "Done waiting." if @debug
end
work_models(models) click to toggle source
# File lib/active-record-transactioner.rb, line 176
# Executes each queued job in +models+ in order.
#
# Every job is a hash with +:model+ and +:type+:
# * +:save!+ calls +save!+ on the model, passing the job's +:validate+ value
#   (true when the key is absent).
# * +:update_columns+ and +:destroy!+ call that method with the job's +:method_args+.
# * any other type raises a RuntimeError.
def work_models(models)
  debug "Going through models." if @debug

  models.each do |job|
    debug job if @debug

    operation = job.fetch(:type)
    record = job.fetch(:model)

    case operation
    when :save!
      record.save!(validate: job.fetch(:validate, true))
    when :update_columns, :destroy!
      record.__send__(operation, *job.fetch(:method_args))
    else
      raise "Invalid type: '#{job[:type]}'."
    end
  end

  debug "Done working with models." if @debug
end
work_models_through_transaction(klass, models) click to toggle source
# File lib/active-record-transactioner.rb, line 164
# Runs the given jobs for +klass+ while holding that class's monitor, wrapped
# in a call to the configured +:transaction_method+ on +klass+.
def work_models_through_transaction(klass, models)
  debug "Synchronizing model: #{klass.name}"

  class_lock = @lock_models[klass]
  class_lock.synchronize do
    debug "Opening new transaction by using '#{@args[:transaction_method]}'." if @debug

    transaction_method = @args[:transaction_method]
    klass.__send__(transaction_method) { work_models(models) }
  end
end
work_threadded(klass, models) click to toggle source
# File lib/active-record-transactioner.rb, line 197
# Processes the given jobs for +klass+ in a new thread.
#
# The thread is created and added to +@threads+ while holding the thread
# lock, and removes itself from that list when it ends. Its work runs inside
# +ActiveRecord::Base.connection_pool.with_connection+. Errors are printed and
# then re-raised inside the thread.
def work_threadded(klass, models)
  @lock_threads.synchronize do
    worker = Thread.new do
      begin
        pool = ActiveRecord::Base.connection_pool
        pool.with_connection { work_models_through_transaction(klass, models) }
      rescue => error
        puts error.inspect
        puts error.backtrace

        raise error
      ensure
        debug "Removing thread #{Thread.current.__id__}" if @debug
        @lock_threads.synchronize do
          @threads.delete(Thread.current)
        end

        debug "Threads count after remove: #{@threads.length}" if @debug
      end
    end

    @threads << worker
  end

  debug "Threads-count after started to work: #{@threads.length}"
end