class Hawkei::Processor::Async

Public Class Methods

new() click to toggle source
# File lib/hawkei/processor/async.rb, line 5
def initialize
  @queue = Queue.new
  @state_worker = Concurrent::AtomicBoolean.new(true)
  @worker = Worker.new(@queue, @state_worker)

  at_exit do
    shutdown_worker if worker_running?
  end
end

Public Instance Methods

enqueue(attributes) click to toggle source
# File lib/hawkei/processor/async.rb, line 15
def enqueue(attributes)
  ensure_worker_running

  @queue << attributes

  true
end

Private Instance Methods

ensure_worker_running() click to toggle source
# File lib/hawkei/processor/async.rb, line 38
def ensure_worker_running
  return if worker_running?

  @worker_thread = Concurrent::Future.execute { @worker.run }
end
executor() click to toggle source
# File lib/hawkei/processor/async.rb, line 34
def executor
  @executor ||= Concurrent.global_io_executor
end
shutdown_worker() click to toggle source
# File lib/hawkei/processor/async.rb, line 25
def shutdown_worker
  @state_worker.make_false
  @queue << Hawkei::Processor::Worker::SHUTDOWN_MESSAGE
  @worker_thread.wait

  executor.shutdown
  executor.wait_for_termination
end
worker_running?() click to toggle source
# File lib/hawkei/processor/async.rb, line 44
def worker_running?
  @worker_thread && @worker_thread.incomplete?
end