module Stackdriver::Core::AsyncActor

# AsyncActor

@private An module that provides a class asynchronous capability when

included. It can create a child thread to run background jobs, and
help make sure the child thread terminates properly when process
is interrupted.

To use AsyncActor, the classes that are including this module need to define a run_backgrounder method that does the background job. The classes can then control the child thread job through instance methods like async_start, async_stop, etc.

@example

class Foo
  include AsyncActor

  def initialize
    super()
  end

  def run_backgrounder
    # run async job
  end
end

foo = Foo.new
foo.async_start

Constants

CLEANUP_TIMEOUT
WAIT_INTERVAL

Attributes

async_state[R]

@private The async actor state

Public Class Methods

new() click to toggle source

@private Constructor to initialize MonitorMixin

Calls superclass method
# File lib/stackdriver/core/async_actor.rb, line 273
def initialize
  super()
  @startup_lock = Mutex.new
  @cleanup_options = {
    wait_interval: WAIT_INTERVAL,
    timeout:       CLEANUP_TIMEOUT,
    force:         true
  }
end
register_for_cleanup(actor) click to toggle source

@private Cleanup this async job when process exists

# File lib/stackdriver/core/async_actor.rb, line 237
def self.register_for_cleanup actor
  @exit_lock.synchronize do
    unless @cleanup_list
      @cleanup_list = []
      at_exit { run_cleanup }
    end
    @cleanup_list.push actor
  end
end
unregister_for_cleanup(actor) click to toggle source

@private Take this async job off exit cleanup list

# File lib/stackdriver/core/async_actor.rb, line 250
def self.unregister_for_cleanup actor
  @exit_lock.synchronize do
    @cleanup_list&.delete actor
  end
end

Private Class Methods

run_cleanup() click to toggle source

@private Cleanup the async job

# File lib/stackdriver/core/async_actor.rb, line 259
def self.run_cleanup
  @exit_lock.synchronize do
    return unless @cleanup_list
    @cleanup_list.shift.async_stop! until @cleanup_list.empty?
  end
end

Public Instance Methods

async_resume() click to toggle source

Set the state to :running if the current state is :suspended.

@return [Boolean] True if the state has been set to :running.

Otherwise return false.
# File lib/stackdriver/core/async_actor.rb, line 112
def async_resume
  ensure_thread
  synchronize do
    if async_state == :suspended
      @async_state = :running
      async_state_change
      true
    else
      false
    end
  end
end
async_running?() click to toggle source

Check if async job is running

@return [Boolean] True if state equals :running. Otherwise false.

# File lib/stackdriver/core/async_actor.rb, line 129
def async_running?
  synchronize do
    async_state == :running
  end
end
async_start() click to toggle source

Starts the child thread and asynchronous job

# File lib/stackdriver/core/async_actor.rb, line 64
def async_start
  ensure_thread
end
async_stop() click to toggle source

Nicely ask the child thread to stop by setting the state to :stopping if it's not stopped already.

@return [Boolean] False if child thread is already stopped. Otherwise

true.
# File lib/stackdriver/core/async_actor.rb, line 74
def async_stop
  ensure_thread
  synchronize do
    if async_state == :stopped
      false
    else
      @async_state = :stopping
      async_state_change
      true
    end
  end
end
async_stop!() click to toggle source

Ask async job to stop. Then forcefully kill thread if it doesn't stop after timeout if needed.

@return [Symbol] :stopped if async job already stopped. :waited if

async job terminates within timeout range. :timeout if async job
doesn't terminate after timeout. :forced if thread is killed by
force after timeout.
# File lib/stackdriver/core/async_actor.rb, line 184
def async_stop!
  ensure_thread

  timeout = @cleanup_options[:timeout]
  force = @cleanup_options[:force]

  return :stopped unless async_stop

  return :waited if timeout.to_f.positive? && wait_until_async_stopped(timeout)

  return :timeout unless force

  @thread.kill
  @thread.join
  :forced
end
async_stopped?() click to toggle source

Check if async job has stopped

@return [Boolean] True if state equals :stopped. Otherwise false.

# File lib/stackdriver/core/async_actor.rb, line 160
def async_stopped?
  synchronize do
    async_state == :stopped
  end
end
async_stopping?() click to toggle source

Check if async job is stopping

@return [Boolean] True if state equals :stopping. Otherwise false.

# File lib/stackdriver/core/async_actor.rb, line 170
def async_stopping?
  synchronize do
    async_state == :stopping
  end
end
async_suspend() click to toggle source

Set the state to :suspend if the current state is :running.

@return [Boolean] Returns true if the state has been set to

:suspended. Otherwise return false.
# File lib/stackdriver/core/async_actor.rb, line 93
def async_suspend
  ensure_thread
  synchronize do
    if async_state == :running
      @async_state = :suspended
      async_state_change
      true
    else
      false
    end
  end
end
async_suspended?() click to toggle source

Check if async job is suspended

@return [Boolean] True if state equals :suspended. Otherwise false.

# File lib/stackdriver/core/async_actor.rb, line 139
def async_suspended?
  synchronize do
    async_state == :suspended
  end
end
async_working?() click to toggle source

Check if async job is working.

@return [Boolean] True if state is either :running or :suspended.

Otherwise false.
# File lib/stackdriver/core/async_actor.rb, line 150
def async_working?
  synchronize do
    async_state == :suspended || async_state == :running
  end
end
run_backgrounder() click to toggle source

Abstract method that the inheriting classes should implement.

This should be the main task job that will be run asynchronously and repeatly.

# File lib/stackdriver/core/async_actor.rb, line 230
def run_backgrounder
  raise "#{self.class} class should override #run_backgrounder method"
end
wait_until_async_stopped(timeout = nil) click to toggle source

Block current thread until the async job is fully stopped.

@param [Integer] timeout If given, lift the blocking when the time has

exceeded the timeout. If nil, block indefinitely.

@return [Boolean] True if async job is fully stopped. False if timeout.

# File lib/stackdriver/core/async_actor.rb, line 209
def wait_until_async_stopped timeout = nil
  ensure_thread
  deadline = timeout ? ::Time.new.to_f + timeout : nil
  synchronize do
    until async_state == :stopped
      cur_time = ::Time.new.to_f
      return false if deadline && cur_time >= deadline
      max_interval = @cleanup_options[:wait_interval]
      interval = deadline ? deadline - cur_time : max_interval
      interval = max_interval if interval > max_interval
      @lock_cond.wait interval
    end
  end
  true
end

Private Instance Methods

async_run_job() click to toggle source

@private Wrapper method for running the async job. It runs a loop while the condition allows, and it calls the run_backgrounder method. This method also ensures the state variable gets set to :stopped when it exits.

# File lib/stackdriver/core/async_actor.rb, line 288
def async_run_job
  until async_stopped?
    run_backgrounder

    return if async_stopping? && backgrounder_stoppable?
  end
ensure
  @async_state = :stopped
  async_state_change
end
async_state_change() click to toggle source

@private Handler when the async actor's state changes. Call the `on_async_state_change` callback function if it's defined.

# File lib/stackdriver/core/async_actor.rb, line 342
def async_state_change
  on_async_state_change

  synchronize do
    @lock_cond.broadcast
  end
end
backgrounder_stoppable?() click to toggle source

@private Default backgrounder stop condition when asked to be stopped gracefully. Called from async_run_job when async actor state changes to :stopping

# File lib/stackdriver/core/async_actor.rb, line 335
def backgrounder_stoppable?
  true
end
ensure_thread() click to toggle source

@private Ensures the child thread is started and kick off the job

to run async_run_job. Also make calls register_for_cleanup on the
async job to make sure it exits properly.
# File lib/stackdriver/core/async_actor.rb, line 303
def ensure_thread
  raise "async_actor not initialized" if @startup_lock.nil?
  @startup_lock.synchronize do
    if (@thread.nil? || !@thread.alive?) && @async_state != :stopped
      @lock_cond = new_cond
      AsyncActor.register_for_cleanup self

      @async_state = :running
      async_state_change

      @thread = Thread.new do
        async_run_job
        AsyncActor.unregister_for_cleanup self
      end
    end
  end
end
on_async_state_change() click to toggle source

@private Default abstract definition of this function that's a no-op. The extending classes can override this method to handle state changing logic.

# File lib/stackdriver/core/async_actor.rb, line 354
def on_async_state_change
end
set_cleanup_options(**kwargs) click to toggle source

@private Set cleanup options.

@param [Hash] kwargs Hash of cleanup options. `:timeout` is the cleanup

wait timeout. `:wait_interval` is the cleanup wait interval. `:force`
for forcefully terminate actor when all other options fail.
# File lib/stackdriver/core/async_actor.rb, line 327
def set_cleanup_options **kwargs
  @cleanup_options.merge! kwargs
end