class Asynchronic::Process

Constants

ATTRIBUTE_NAMES
AUTOMATIC_ABORTED_ERROR_MESSAGE
CANCELED_ERROR_MESSAGE
DEAD_ERROR_MESSAGE
STATUSES
TIME_TRACKING_MAP

Attributes

environment[R]
id[R]

Public Class Methods

all(environment) click to toggle source
# File lib/asynchronic/process.rb, line 38
def self.all(environment)
  environment.data_store.keys
                        .select { |k| k.sections.count == 2 && k.match(/created_at$/) }
                        .sort_by { |k| environment.data_store[k] }
                        .reverse
                        .map { |k| Process.new environment, k.remove_last }
end
create(environment, type, params={}) click to toggle source
# File lib/asynchronic/process.rb, line 22
def self.create(environment, type, params={})
  id = params.delete(:id) || SecureRandom.uuid

  Asynchronic.logger.debug('Asynchronic') { "Created process #{type} - #{id} - #{params}" }

  new(environment, id) do
    self.type = type
    self.name = (params.delete(:alias) || type).to_s
    self.queue = params.delete(:queue) || type.queue || parent_queue
    self.dependencies = Array(params.delete(:dependencies)) | Array(params.delete(:dependency)) | infer_dependencies(params)
    self.params = params
    self.data = {}
    pending!
  end
end
new(environment, id, &block) click to toggle source
# File lib/asynchronic/process.rb, line 46
def initialize(environment, id, &block)
  @environment = environment
  @id = DataStore::Key[id]
  instance_eval(&block) if block_given?
end

Public Instance Methods

[](process_name) click to toggle source
# File lib/asynchronic/process.rb, line 106
def [](process_name)
  processes.detect { |p| p.name == process_name.to_s }
end
abort_if_dead() click to toggle source
# File lib/asynchronic/process.rb, line 80
def abort_if_dead
  abort! DEAD_ERROR_MESSAGE if dead?
end
cancel!() click to toggle source
# File lib/asynchronic/process.rb, line 72
def cancel!
  abort! CANCELED_ERROR_MESSAGE
end
dead?() click to toggle source
# File lib/asynchronic/process.rb, line 76
def dead?
  (running? && !connected?) || processes.any?(&:dead?)
end
dependencies() click to toggle source
# File lib/asynchronic/process.rb, line 134
def dependencies
  return [] if parent.nil? || data_store[:dependencies].empty?

  parent_processes = parent.processes.each_with_object({}) do |process, hash|
    hash[process.name] = process
  end

  data_store[:dependencies].map { |d| parent_processes[d.to_s] }
end
destroy() click to toggle source
# File lib/asynchronic/process.rb, line 84
def destroy
  data_store.delete_cascade
end
enqueue() click to toggle source
# File lib/asynchronic/process.rb, line 144
def enqueue
  queued!
  environment.enqueue id, queue
end
execute() click to toggle source
# File lib/asynchronic/process.rb, line 149
def execute
  run
  Asynchronic.retry_execution(self.class, 'wakeup') do
    wakeup
  end
end
finalized?() click to toggle source
# File lib/asynchronic/process.rb, line 68
def finalized?
  completed? || aborted?
end
full_status() click to toggle source
# File lib/asynchronic/process.rb, line 88
def full_status
  processes.each_with_object(name => status) do |process, hash|
    hash.update(process.full_status)
  end
end
get(key) click to toggle source
# File lib/asynchronic/process.rb, line 172
def get(key)
  self.data[key]
end
job() click to toggle source
# File lib/asynchronic/process.rb, line 102
def job
  @job ||= type.new self
end
nest(type, params={}) click to toggle source
# File lib/asynchronic/process.rb, line 168
def nest(type, params={})
  self.class.create environment, type, params.merge(id: id[:processes][processes.count])
end
params() click to toggle source
# File lib/asynchronic/process.rb, line 94
def params
  data_store.scoped(:params).no_lazy.readonly
end
parent() click to toggle source
# File lib/asynchronic/process.rb, line 118
def parent
  Process.new environment, id.remove_last(2) if id.nested?
end
processes() click to toggle source
# File lib/asynchronic/process.rb, line 110
def processes
  data_store.scoped(:processes)
            .keys
            .select { |k| k.sections.count == 2 && k.match(/\|name$/) }
            .sort
            .map { |k| Process.new environment, id[:processes][k.remove_last] }
end
ready?() click to toggle source
# File lib/asynchronic/process.rb, line 64
def ready?
  pending? && dependencies.all?(&:completed?)
end
real_error() click to toggle source
# File lib/asynchronic/process.rb, line 126
def real_error
  return nil if !aborted?

  first_aborted_child = processes.select(&:aborted?).sort_by(&:finalized_at).first

  first_aborted_child ? first_aborted_child.real_error : error.message
end
result() click to toggle source
# File lib/asynchronic/process.rb, line 98
def result
  data_store.lazy[:result]
end
root() click to toggle source
# File lib/asynchronic/process.rb, line 122
def root
  id.nested? ? Process.new(environment, id.sections.first) : self
end
set(key, value) click to toggle source
# File lib/asynchronic/process.rb, line 176
def set(key, value)
  self.data = self.data.merge key => value
end
wakeup() click to toggle source
# File lib/asynchronic/process.rb, line 156
def wakeup
  Asynchronic.logger.info('Asynchronic') { "Wakeup started #{type} (#{id})" }
  if environment.queue_engine.asynchronic?
    data_store.synchronize(id) { wakeup_children }
  else
    wakeup_children
  end
  Asynchronic.logger.info('Asynchronic') { "Wakeup finalized #{type} (#{id})" }

  parent.wakeup if parent && finalized?
end

Private Instance Methods

abort!(exception) click to toggle source
# File lib/asynchronic/process.rb, line 220
def abort!(exception)
  self.error = Error.new exception
  aborted!
end
connected?() click to toggle source
# File lib/asynchronic/process.rb, line 270
def connected?
  connection_name && environment.queue_engine.active_connections.include?(connection_name)
rescue => ex
  Asynchronic.logger.error('Asynchronic') { "#{ex.message}\n#{ex.backtrace.join("\n")}" }
  true
end
data_store() click to toggle source
# File lib/asynchronic/process.rb, line 184
def data_store
  @data_store ||= environment.data_store.scoped id
end
infer_dependencies(params) click to toggle source
# File lib/asynchronic/process.rb, line 261
def infer_dependencies(params)
  params.values.select { |v| v.respond_to?(:proxy?) && v.proxy_class == DataStore::LazyValue }
               .map { |v| Process.new(environment, v.data_store.scope).name }
end
params=(params) click to toggle source
# File lib/asynchronic/process.rb, line 194
def params=(params)
  data_store.scoped(:params).merge params
end
parent_queue() click to toggle source
# File lib/asynchronic/process.rb, line 266
def parent_queue
  parent.queue if parent
end
run() click to toggle source
# File lib/asynchronic/process.rb, line 225
def run
  self.connection_name = Asynchronic.connection_name

  if root.aborted?
    abort! AUTOMATIC_ABORTED_ERROR_MESSAGE
  else
    running!
    self.result = job.call
    waiting!
  end

rescue Exception => ex
  message = "Failed process #{type} (#{id})\n#{ex.class} #{ex.message}\n#{ex.backtrace.join("\n")}"
  Asynchronic.logger.error('Asynchronic') { message }
  Asynchronic.retry_execution(self.class, 'abort') do
    abort! ex
  end
end
status=(status) click to toggle source
# File lib/asynchronic/process.rb, line 198
def status=(status)
  Asynchronic.logger.info('Asynchronic') { "#{status.to_s.capitalize} #{type} (#{id})" }

  data_store[:status] = status
  data_store[TIME_TRACKING_MAP[status]] = Time.now if TIME_TRACKING_MAP.key? status

  environment.notifier.publish id, :status_changed, status
  environment.notifier.publish id, :finalized if finalized?
end
wakeup_children() click to toggle source
# File lib/asynchronic/process.rb, line 244
def wakeup_children
  if waiting?
    children = processes # Cached child processes
    if children.any?(&:aborted?)
      childs_with_errors = children.select(&:error)
      error = childs_with_errors.any? ? "Error caused by #{childs_with_errors.map(&:name).join(', ')}" : nil
      abort! error
    elsif children.all?(&:completed?)
      completed!
    else
      children.each do |p|
        p.enqueue if p.ready?
      end
    end
  end
end