class ActiveRecordTransactioner
Constants
- ALLOWED_ARGS
- DEFAULT_ARGS
- EMPTY_ARGS
Public Class Methods
new(args = {}) { |self| ... }
click to toggle source
# File lib/active-record-transactioner.rb, line 18
# Builds a transactioner from an options hash.
#
# Every key of +args+ must be listed in ALLOWED_ARGS; the given options are
# merged over DEFAULT_ARGS. When a block is given, the new instance is yielded
# to it and, once the block finishes (normally or by raising), the queue is
# flushed and - in threaded mode - the running threads are joined.
#
# Raises a RuntimeError ("Invalid key: '...'.") for an unknown option key.
def initialize(args = {})
  args.each_key do |option|
    next if ALLOWED_ARGS.include?(option)

    raise "Invalid key: '#{option}'."
  end

  @args = DEFAULT_ARGS.merge(args)
  parse_and_set_args

  if block_given?
    begin
      yield self
    ensure
      # Always push out whatever was queued, even if the block raised.
      flush
      join if threadded?
    end
  end
end
Public Instance Methods
bulk_create!(model)
click to toggle source
# File lib/active-record-transactioner.rb, line 40
# Queues the attributes of +model+ for a multi-row INSERT on the next flush.
#
# The "id", "created_at" and "updated_at" attributes are dropped before the
# row is stored. The row is appended to the per-class list in @bulk_creates
# and the pending counter is incremented; this method does not itself trigger
# a flush.
#
# The shared state is updated while holding @lock, the same monitor used by
# +queue+ and +flush+, so a concurrent flush or worker thread cannot observe
# or clobber a half-applied update.
#
# Returns the pending counter after the increment.
def bulk_create!(model)
  # Build a filtered copy rather than deleting keys from the hash returned by
  # model.attributes.
  row = model.attributes.reject do |name, _value|
    name == "id" || name == "created_at" || name == "updated_at"
  end

  @lock.synchronize do
    (@bulk_creates[model.class] ||= []) << row
    @count += 1
  end
end
destroy!(model)
click to toggle source
# File lib/active-record-transactioner.rb, line 61
# Queues +model+ so that +destroy!+ is called on it when the queue is worked.
# Delegates to +queue+, so a flush is triggered if the queue is over the limit.
def destroy!(model)
  options = { type: :destroy! }
  queue(model, options)
end
flush()
click to toggle source
Runs the queued operation for every queued model and inserts the queued bulk-create rows, one batch per model class. In threaded mode each class's batch is handed to its own thread.
# File lib/active-record-transactioner.rb, line 91
# Works off everything that is currently queued.
#
# In threaded mode this first waits until a new thread may be started. Then,
# while holding @lock, it
#   * hands every non-empty bulk-create batch to the (threaded) bulk insert,
#   * hands every non-empty model batch to a transaction (in a thread when
#     threaded).
#
# Each batch is detached from the queue before it is processed: a fresh empty
# array takes its place. This keeps a later flush from processing the same
# rows again, and keeps producers from appending to an array a worker thread
# is still reading. The pending counter is reduced here for model batches;
# for bulk-create batches it is reduced by bulk_insert_attribute_array.
def flush
  wait_for_threads if threadded?

  @lock.synchronize do
    @bulk_creates.each do |klass, attribute_array|
      next if attribute_array.empty?

      # Replacing the value of an existing key is safe while iterating.
      @bulk_creates[klass] = []

      if threadded?
        bulk_insert_attribute_array_threadded(klass, attribute_array)
      else
        bulk_insert_attribute_array(klass, attribute_array)
      end
    end

    @models.each do |klass, models|
      next if models.empty?

      @models[klass] = []
      @count -= models.length

      if threadded?
        work_threadded(klass, models)
      else
        work_models_through_transaction(klass, models)
      end
    end
  end
end
join()
click to toggle source
Waits for any remaining running threads.
# File lib/active-record-transactioner.rb, line 119
# Blocks until every thread that is registered at the time of the call has
# finished. The thread list is copied under @lock_threads so that workers
# removing themselves do not disturb the iteration.
def join
  pending = @lock_threads.synchronize do
    @threads.clone
  end

  debug "Threads to join: #{pending}" if @debug

  pending.each { |thread| thread.join }
end
queue(model, args = {})
click to toggle source
Adds another model to the queue and calls ‘flush’ if it is over the limit.
# File lib/active-record-transactioner.rb, line 66
# Adds a unit of work for +model+ to the queue and flushes when the number of
# pending items has reached the transaction size.
#
# Recognised keys in +args+:
#   :type        - operation to run later; defaults to :save! when missing or nil
#   :validate    - passed on for :save!; defaults to true when the key is absent
#   :method_args - extra arguments for the operation; defaults to EMPTY_ARGS
#
# +args+ is only read, never modified, so callers may pass a shared or frozen
# hash.
#
# Returns the result of +flush+ when a flush was triggered, otherwise nil.
def queue(model, args = {})
  work = {
    model: model,
    type: args[:type] || :save!,
    validate: args.fetch(:validate, true),
    method_args: args[:method_args] || EMPTY_ARGS
  }

  @lock.synchronize do
    klass = model.class

    # One monitor per model class; used when the batch is worked.
    @lock_models[klass] ||= Monitor.new
    (@models[klass] ||= []) << work
    @count += 1
  end

  flush if should_flush?
end
save!(model)
click to toggle source
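Validates the model immediately, raising ActiveRecord::RecordInvalid if it is invalid, then adds it to the queue to be saved without re-validating; calls ‘flush’ if the queue is over the limit.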
# File lib/active-record-transactioner.rb, line 35
# Validates +model+ right away and, if it is valid, queues it for a later
# +save!+ with validation switched off (it has already been checked here).
#
# Raises ActiveRecord::RecordInvalid when the model is not valid.
def save!(model)
  unless model.valid?
    raise ActiveRecord::RecordInvalid, model
  end

  options = { type: :save!, validate: false }
  queue(model, options)
end
threadded?()
click to toggle source
# File lib/active-record-transactioner.rb, line 126
# Tells whether work is handed to threads.
# Returns the raw value stored under :threadded in the options hash.
def threadded?
  @args[:threadded]
end
update_column(model, column_name, new_value)
click to toggle source
# File lib/active-record-transactioner.rb, line 57
# Queues an update of a single column on +model+.
# Convenience wrapper that forwards a one-entry hash to +update_columns+.
def update_column(model, column_name, new_value)
  changes = { column_name => new_value }
  update_columns(model, changes)
end
update_columns(model, updates)
click to toggle source
# File lib/active-record-transactioner.rb, line 53
# Queues an +update_columns+ call on +model+ with the +updates+ hash as its
# single argument. The work item is queued with validate: false.
def update_columns(model, updates)
  options = {
    type: :update_columns,
    validate: false,
    method_args: [updates]
  }
  queue(model, options)
end
Private Instance Methods
allowed_to_start_new_thread?()
click to toggle source
# File lib/active-record-transactioner.rb, line 225
# True while fewer threads are registered than @max_running_threads allows.
# The thread list is read under @lock_threads.
def allowed_to_start_new_thread?
  @lock_threads.synchronize do
    @threads.length < @max_running_threads
  end
end
bulk_insert_attribute_array(klass, attribute_array)
click to toggle source
# File lib/active-record-transactioner.rb, line 249
# Inserts all rows of +attribute_array+ into the table of +klass+ with one
# multi-row INSERT statement and then reduces the pending counter by the
# number of rows.
#
# klass           - object responding to +table_name+ and +connection+
# attribute_array - array of attribute hashes; the keys of the first hash
#                   define the column list, and every row's values are looked
#                   up by those keys (a missing key is quoted as nil)
#
# Table and column names are quoted through the connection adapter instead of
# hard-coded backticks, so the statement follows the adapter's quoting rules.
# Values are quoted with the connection's +quote+.
#
# An empty +attribute_array+ is a no-op (nothing is executed) and returns nil;
# otherwise the pending counter after the decrement is returned.
def bulk_insert_attribute_array(klass, attribute_array)
  return if attribute_array.empty?

  connection = klass.connection
  columns = attribute_array.first.keys

  column_list = columns.map { |column| connection.quote_column_name(column) }.join(", ")

  value_lists = attribute_array.map do |attributes|
    quoted = columns.map { |column| connection.quote(attributes[column]) }
    "(#{quoted.join(', ')})"
  end

  table = connection.quote_table_name(klass.table_name)
  connection.execute("INSERT INTO #{table} (#{column_list}) VALUES #{value_lists.join(', ')}")

  @lock.synchronize do
    @count -= attribute_array.length
  end
end
bulk_insert_attribute_array_threadded(klass, attribute_array)
click to toggle source
# File lib/active-record-transactioner.rb, line 229
# Runs +bulk_insert_attribute_array+ for the given batch in a new thread and
# registers that thread in @threads (under @lock_threads).
#
# The insert runs inside +klass.connection_pool.with_connection+, so the
# database connection used by the worker is handed back to the pool when the
# block ends - the same pattern +work_threadded+ uses for model batches.
#
# Any StandardError raised by the insert is printed (inspect + backtrace) and
# re-raised inside the thread. The thread always removes itself from @threads
# when it ends.
def bulk_insert_attribute_array_threadded(klass, attribute_array)
  @lock_threads.synchronize do
    @threads << Thread.new do
      begin
        klass.connection_pool.with_connection do
          bulk_insert_attribute_array(klass, attribute_array)
        end
      rescue => e
        puts e.inspect
        puts e.backtrace
        raise e
      ensure
        debug "Removing thread #{Thread.current.__id__}" if @debug

        @lock_threads.synchronize { @threads.delete(Thread.current) }

        debug "Threads count after remove: #{@threads.length}" if @debug
      end
    end
  end
end
debug(str)
click to toggle source
# File lib/active-record-transactioner.rb, line 145
# Prints +str+ with a "{ActiveRecordTransactioner}: " prefix and a trailing
# newline, but only when debugging is enabled (@debug is truthy).
def debug(str)
  return unless @debug

  print "{ActiveRecordTransactioner}: #{str}\n"
end
parse_and_set_args()
click to toggle source
# File lib/active-record-transactioner.rb, line 132
# Initialises the internal state from the options in @args:
# empty queues and thread list, a zeroed pending counter, the monitors that
# guard them, and the numeric limits (converted with +to_i+) plus the debug
# flag.
def parse_and_set_args
  # Queues and bookkeeping.
  @count = 0
  @models = {}
  @bulk_creates = {}
  @threads = []

  # Monitors: one for the queues, one for the thread list, plus a per-class map.
  @lock = Monitor.new
  @lock_threads = Monitor.new
  @lock_models = {}

  # Settings taken from the options hash.
  @transaction_size = @args[:transaction_size].to_i
  @max_running_threads = @args[:max_running_threads].to_i
  @debug = @args[:debug]
end
should_flush?()
click to toggle source
# File lib/active-record-transactioner.rb, line 221
# True once the number of pending items has reached the transaction size.
def should_flush?
  @transaction_size <= @count
end
wait_for_threads()
click to toggle source
# File lib/active-record-transactioner.rb, line 149
# Polls every 0.2 seconds until a new thread may be started, emitting
# progress messages when debugging is enabled.
def wait_for_threads
  loop do
    debug "Running threads: #{@threads.length} / #{@max_running_threads}" if @debug

    break if allowed_to_start_new_thread?

    debug "Waiting for threads #{@threads.length} / #{@max_running_threads}" if @debug
    sleep 0.2
  end

  debug "Done waiting." if @debug
end
work_models(models)
click to toggle source
# File lib/active-record-transactioner.rb, line 176
# Executes every queued work item in +models+.
#
# Each item is a hash with :model and :type, plus :validate (for :save!) or
# :method_args (for :update_columns and :destroy!).
#   :save!                     - model.save! with the item's validate flag
#                                (true when the key is absent)
#   :update_columns, :destroy! - the method is sent to the model with the
#                                item's :method_args
#
# Raises a RuntimeError ("Invalid type: '...'.") for any other type.
def work_models(models)
  debug "Going through models." if @debug

  models.each do |work|
    debug work if @debug

    operation = work.fetch(:type)
    record = work.fetch(:model)

    case operation
    when :save!
      record.save!(validate: work.fetch(:validate, true))
    when :update_columns, :destroy!
      record.__send__(operation, *work.fetch(:method_args))
    else
      raise "Invalid type: '#{work[:type]}'."
    end
  end

  debug "Done working with models." if @debug
end
work_models_through_transaction(klass, models)
click to toggle source
# File lib/active-record-transactioner.rb, line 164
# Works the given batch inside a transaction on +klass+.
#
# The per-class monitor from @lock_models is held for the duration. The
# transaction is opened by sending the method named in
# @args[:transaction_method] to +klass+ with a block that runs +work_models+.
def work_models_through_transaction(klass, models)
  debug "Synchronizing model: #{klass.name}"

  class_lock = @lock_models[klass]
  class_lock.synchronize do
    debug "Opening new transaction by using '#{@args[:transaction_method]}'." if @debug

    klass.__send__(@args[:transaction_method]) { work_models(models) }
  end
end
work_threadded(klass, models)
click to toggle source
# File lib/active-record-transactioner.rb, line 197
# Works the given batch in a new thread and registers that thread in @threads
# (under @lock_threads).
#
# Inside the thread the batch is processed through
# +work_models_through_transaction+ within
# ActiveRecord::Base.connection_pool.with_connection. A StandardError is
# printed (inspect + backtrace) and re-raised inside the thread. The thread
# always removes itself from @threads when it ends.
def work_threadded(klass, models)
  @lock_threads.synchronize do
    worker = Thread.new do
      begin
        ActiveRecord::Base.connection_pool.with_connection do
          work_models_through_transaction(klass, models)
        end
      rescue => error
        puts error.inspect
        puts error.backtrace
        raise error
      ensure
        debug "Removing thread #{Thread.current.__id__}" if @debug

        @lock_threads.synchronize do
          @threads.delete(Thread.current)
        end

        debug "Threads count after remove: #{@threads.length}" if @debug
      end
    end

    @threads << worker
  end

  debug "Threads-count after started to work: #{@threads.length}"
end