class WorkBatcher

Public Class Methods

new(options = {}) click to toggle source
# File lib/work_batcher.rb, line 27
# Builds a batcher from an options hash.
#
# Recognised keys (all read through get_option / get_option!):
#   :size_limit   - queue size at which processing is scheduled immediately
#                   (nil, the default, disables the size trigger).
#   :time_limit   - delay handed to the scheduled task, added to Time.now,
#                   i.e. seconds. Defaults to 5.
#   :deduplicate  - when truthy, the queue is a Hash keyed by the
#                   deduplicator's result instead of an Array.
#   :deduplicator - callable mapping a work object to its key. Defaults to
#                   default_deduplicator (the object itself).
#   :executor     - executor used for the scheduled task and for running the
#                   processor. Defaults to Concurrent.global_io_executor.
#   :processor    - required callable; receives an Array of work objects.
#
# Raises ArgumentError (from get_option!) when :processor is missing.
def initialize(options = {})
  @size_limit   = get_option(options, :size_limit)
  @time_limit   = get_option(options, :time_limit, 5)
  @deduplicate  = get_option(options, :deduplicate)
  @deduplicator = get_option(options, :deduplicator, method(:default_deduplicator))
  @executor     = get_option(options, :executor, Concurrent.global_io_executor)
  @processor    = get_option!(options, :processor)

  @mutex = Mutex.new
  # Hash when deduplicating (key => work object), plain Array otherwise.
  @queue = @deduplicate ? {} : []
  @processed = 0

  # Declared up front: status, shutdown and schedule_processing all read these
  # before any task has been created. Leaving them undefined triggers
  # "instance variable not initialized" warnings on Ruby < 3.0 under -w.
  @scheduled_processing_task = nil
  @scheduled_processing_time = nil
end

Public Instance Methods

add(work_object) click to toggle source
# File lib/work_batcher.rb, line 54
# Enqueues a single work object and makes sure a processing run is scheduled.
#
# In deduplicating mode the object is stored under the key produced by
# @deduplicator, so a later object with the same key replaces the earlier one.
# Otherwise it is appended to the queue array.
#
# The whole operation runs under @mutex. Returns whatever
# schedule_processing returns.
def add(work_object)
  @mutex.synchronize do
    if @deduplicate
      @queue[@deduplicator.call(work_object)] = work_object
    else
      @queue.push(work_object)
    end

    schedule_processing
  end
end
add_multiple(work_objects) click to toggle source
# File lib/work_batcher.rb, line 66
# Enqueues several work objects in one critical section.
#
# Returns nil immediately, without locking or scheduling, when work_objects
# is empty. Otherwise each object is stored under its deduplication key
# (deduplicating mode; later duplicates overwrite earlier ones) or the whole
# collection is appended to the queue array, and a processing run is
# scheduled. Returns whatever schedule_processing returns.
def add_multiple(work_objects)
  return if work_objects.empty?

  @mutex.synchronize do
    if @deduplicate
      work_objects.each { |item| @queue[@deduplicator.call(item)] = item }
    else
      @queue.concat(work_objects)
    end

    schedule_processing
  end
end
inspect_queue() click to toggle source
# File lib/work_batcher.rb, line 94
# Returns a snapshot Array of the work objects currently queued.
#
# The snapshot is taken under @mutex and is a copy: in deduplicating mode it
# is built from the hash's values, otherwise it is a dup of the queue array.
# Mutating the returned array does not affect the queue.
def inspect_queue
  @mutex.synchronize do
    pending = @deduplicate ? @queue.values : @queue
    pending.dup
  end
end
shutdown() click to toggle source
# File lib/work_batcher.rb, line 44
# Forces any pending processing run to happen now and waits for it.
#
# The currently scheduled task is read under @mutex; the reschedule and wait
# happen outside the lock, since the task body itself takes @mutex.
# Returns nil when nothing is scheduled, otherwise the result of task.wait!.
#
# NOTE(review): this waits for the scheduled task only. process_queue hands
# the batch to @executor.post, so whether the processor has finished by the
# time this returns depends on the executor -- confirm if callers rely on it.
def shutdown
  pending_task = @mutex.synchronize { @scheduled_processing_task }
  return unless pending_task

  pending_task.reschedule(0)
  pending_task.wait!
end
status() click to toggle source
# File lib/work_batcher.rb, line 82
# Returns a Hash describing the batcher's current state, read under @mutex.
#
# Keys:
#   :scheduled_processing_time - only present while a processing task is
#                                scheduled; the time it is due to run.
#   :queue_count               - number of entries currently queued.
#   :processed_count           - running total of entries handed to the
#                                processor so far.
def status
  @mutex.synchronize do
    snapshot = {}
    snapshot[:scheduled_processing_time] = @scheduled_processing_time if @scheduled_processing_task
    snapshot.merge!(queue_count: @queue.size, processed_count: @processed)
  end
end

Private Instance Methods

create_scheduled_processing_task(delay) click to toggle source
# File lib/work_batcher.rb, line 120
# Creates a task that will drain the queue after +delay+ (added to Time.now,
# so seconds) on @executor, and records the expected run time in
# @scheduled_processing_time.
#
# When the task fires it takes @mutex, calls process_queue, and then -- even
# if process_queue raised -- clears @scheduled_processing_task and
# @scheduled_processing_time so that the next add schedules a fresh task.
# Anything raised inside is reported by handle_uncaught_exception.
#
# Returns the value of Concurrent::ScheduledTask.execute (presumably the
# task object; the caller stores it in @scheduled_processing_task).
def create_scheduled_processing_task(delay)
  @scheduled_processing_time = Time.now + delay

  # The options are passed as an explicit positional hash.
  Concurrent::ScheduledTask.execute(delay, { executor: @executor }) do
    handle_uncaught_exception do
      @mutex.synchronize do
        begin
          process_queue
        ensure
          @scheduled_processing_task = nil
          @scheduled_processing_time = nil
        end
      end
    end
  end
end
default_deduplicator(work_object) click to toggle source
# File lib/work_batcher.rb, line 150
# Fallback deduplication key function: the work object is its own key.
#
# initialize installs this (via method(:default_deduplicator)) when no
# :deduplicator option is supplied, so two work objects collapse into one
# queue entry only when they are equal as Hash keys.
def default_deduplicator(work_object)
  work_object
end
get_option(options, key, default_value = nil) click to toggle source
# File lib/work_batcher.rb, line 164
# Looks up an optional setting.
#
# Returns options[key] whenever the key is present -- including when its
# value is nil or false -- and default_value only when the key is absent.
def get_option(options, key, default_value = nil)
  options.fetch(key, default_value)
end
get_option!(options, key) click to toggle source
# File lib/work_batcher.rb, line 172
# Looks up a mandatory setting.
#
# Returns options[key] when the key is present (even if the value is nil).
# Raises ArgumentError naming the key when it is absent.
def get_option!(options, key)
  raise ArgumentError, "Option required: #{key}" unless options.key?(key)

  options[key]
end
handle_uncaught_exception() { || ... } click to toggle source
# File lib/work_batcher.rb, line 154
# Runs the given block and reports anything it raises to STDERR instead of
# letting it propagate.
#
# Returns the block's value on success, or nil (the result of STDERR.puts)
# when an exception was caught. Exception itself is rescued, not just
# StandardError, so nothing escapes to the calling thread.
def handle_uncaught_exception
  yield
rescue Exception => error
  report = "Uncaught exception in WorkBatcher: #{error} (#{error.class})\n" \
           "#{error.backtrace.join("\n")}"
  STDERR.puts(report)
end
process_queue() click to toggle source
# File lib/work_batcher.rb, line 137
# Drains the queue and hands its contents to the processor via the executor.
#
# A copy of the queued work objects (the hash's values in deduplicating mode,
# a dup of the array otherwise) is taken first, then the processed counter is
# advanced by the queue size and the queue is emptied in place. The copy is
# passed to @processor inside a job posted to @executor. Returns the result
# of @executor.post.
#
# Takes no lock itself; the caller visible in this file
# (create_scheduled_processing_task) invokes it while holding @mutex.
#
# NOTE(review): exceptions raised by the processor are not routed through
# handle_uncaught_exception here; they are left to the executor -- confirm
# that is intended.
def process_queue
  batch = @deduplicate ? @queue.values : @queue.dup
  @processed += @queue.size
  @queue.clear

  @executor.post { @processor.call(batch) }
end
schedule_processing() click to toggle source
# File lib/work_batcher.rb, line 105
# Ensures a processing task exists and is timed appropriately.
#
# - A task is already scheduled: it is left alone unless the size limit has
#   been reached, in which case it is rescheduled to run immediately and
#   @scheduled_processing_time is updated to now.
# - No task is scheduled: a new one is created, with zero delay when the size
#   limit has been reached and @time_limit otherwise.
#
# The size limit counts as reached only when @size_limit is set and the queue
# holds at least that many entries. Takes no lock itself; add and
# add_multiple call it while holding @mutex.
def schedule_processing
  limit_reached = @size_limit && @queue.size >= @size_limit

  if @scheduled_processing_task
    return unless limit_reached

    @scheduled_processing_time = Time.now
    @scheduled_processing_task.reschedule(0)
  else
    delay = limit_reached ? 0 : @time_limit
    @scheduled_processing_task = create_scheduled_processing_task(delay)
  end
end