class WorkBatcher
Public Class Methods
new(options = {})
click to toggle source
# File lib/work_batcher.rb, line 27 def initialize(options = {}) @size_limit = get_option(options, :size_limit) @time_limit = get_option(options, :time_limit, 5) @deduplicate = get_option(options, :deduplicate) @deduplicator = get_option(options, :deduplicator, method(:default_deduplicator)) @executor = get_option(options, :executor, Concurrent.global_io_executor) @processor = get_option!(options, :processor) @mutex = Mutex.new if @deduplicate @queue = {} else @queue = [] end @processed = 0 end
Public Instance Methods
add(work_object)
click to toggle source
# File lib/work_batcher.rb, line 54 def add(work_object) @mutex.synchronize do if @deduplicate key = @deduplicator.call(work_object) @queue[key] = work_object else @queue << work_object end schedule_processing end end
add_multiple(work_objects)
click to toggle source
# File lib/work_batcher.rb, line 66 def add_multiple(work_objects) return if work_objects.empty? @mutex.synchronize do if @deduplicate work_objects.each do |work_object| key = @deduplicator.call(work_object) @queue[key] = work_object end else @queue.concat(work_objects) end schedule_processing end end
inspect_queue()
click to toggle source
# File lib/work_batcher.rb, line 94 def inspect_queue @mutex.synchronize do if @deduplicate @queue.values.dup else @queue.dup end end end
shutdown()
click to toggle source
# File lib/work_batcher.rb, line 44 def shutdown task = @mutex.synchronize do @scheduled_processing_task end if task task.reschedule(0) task.wait! end end
status()
click to toggle source
# File lib/work_batcher.rb, line 82 def status result = {} @mutex.synchronize do if @scheduled_processing_task result[:scheduled_processing_time] = @scheduled_processing_time end result[:queue_count] = @queue.size result[:processed_count] = @processed end result end
Private Instance Methods
create_scheduled_processing_task(delay)
click to toggle source
# File lib/work_batcher.rb, line 120 def create_scheduled_processing_task(delay) @scheduled_processing_time = Time.now + delay args = [delay, executor: @executor] Concurrent::ScheduledTask.execute(*args) do handle_uncaught_exception do @mutex.synchronize do begin process_queue ensure @scheduled_processing_task = nil @scheduled_processing_time = nil end end end end end
default_deduplicator(work_object)
click to toggle source
# File lib/work_batcher.rb, line 150 def default_deduplicator(work_object) work_object end
get_option(options, key, default_value = nil)
click to toggle source
# File lib/work_batcher.rb, line 164 def get_option(options, key, default_value = nil) if options.key?(key) options[key] else default_value end end
get_option!(options, key)
click to toggle source
# File lib/work_batcher.rb, line 172 def get_option!(options, key) if options.key?(key) options[key] else raise ArgumentError, "Option required: #{key}" end end
handle_uncaught_exception() { || ... }
click to toggle source
# File lib/work_batcher.rb, line 154 def handle_uncaught_exception begin yield rescue Exception => e STDERR.puts( "Uncaught exception in WorkBatcher: #{e} (#{e.class})\n" \ "#{e.backtrace.join("\n")}") end end
process_queue()
click to toggle source
# File lib/work_batcher.rb, line 137 def process_queue if @deduplicate queue_copy = @queue.values else queue_copy = @queue.dup end @processed += @queue.size @queue.clear @executor.post do @processor.call(queue_copy) end end
schedule_processing()
click to toggle source
# File lib/work_batcher.rb, line 105 def schedule_processing if @scheduled_processing_task if @size_limit && @queue.size >= @size_limit @scheduled_processing_time = Time.now @scheduled_processing_task.reschedule(0) end else if @size_limit && @queue.size >= @size_limit @scheduled_processing_task = create_scheduled_processing_task(0) else @scheduled_processing_task = create_scheduled_processing_task(@time_limit) end end end