class Asynchronic::Process
Constants
- ATTRIBUTE_NAMES
- AUTOMATIC_ABORTED_ERROR_MESSAGE
- CANCELED_ERROR_MESSAGE
- DEAD_ERROR_MESSAGE
- STATUSES
- TIME_TRACKING_MAP
Attributes
environment[R]
id[R]
Public Class Methods
all(environment)
click to toggle source
# File lib/asynchronic/process.rb, line 38
# Lists the processes recorded in the environment's data store.
#
# Only keys with exactly two sections that end in "created_at" are used.
# They are ordered by the value stored under each key, descending, and each
# one yields a Process whose id is the key without its last section.
def self.all(environment)
  creation_keys = environment.data_store.keys.select do |key|
    key.sections.count == 2 && key.match(/created_at$/)
  end

  newest_first = creation_keys.sort_by { |key| environment.data_store[key] }.reverse

  newest_first.map { |key| Process.new(environment, key.remove_last) }
end
create(environment, type, params={})
click to toggle source
# File lib/asynchronic/process.rb, line 22
# Creates a new process of the given type and leaves it in the pending state.
#
# Recognised option keys are taken out of +params+ before the remainder is
# stored as the process params:
#   :id           - process id; SecureRandom.uuid is used when absent
#   :alias        - process name; defaults to the type
#   :queue        - queue name; falls back to type.queue, then parent_queue
#   :dependencies - list of dependency names
#   :dependency   - single dependency name
#
# The caller's hash is never modified: the option keys are removed from a
# shallow copy.
def self.create(environment, type, params={})
  # Shallow copy so the deletes below do not strip keys from the caller's hash.
  params = params.dup
  id = params.delete(:id) || SecureRandom.uuid

  Asynchronic.logger.debug('Asynchronic') { "Created process #{type} - #{id} - #{params}" }

  new(environment, id) do
    self.type = type
    self.name = (params.delete(:alias) || type).to_s
    self.queue = params.delete(:queue) || type.queue || parent_queue
    # Explicit dependencies are merged with the ones inferred from the params.
    self.dependencies = Array(params.delete(:dependencies)) | Array(params.delete(:dependency)) | infer_dependencies(params)
    self.params = params
    self.data = {}
    pending!
  end
end
new(environment, id, &block)
click to toggle source
# File lib/asynchronic/process.rb, line 46
# Stores the environment, wraps +id+ in a DataStore::Key and, when a block is
# supplied, evaluates it in the context of the new instance.
def initialize(environment, id, &block)
  @environment = environment
  @id = DataStore::Key[id]
  return unless block

  instance_eval(&block)
end
Public Instance Methods
[](process_name)
click to toggle source
# File lib/asynchronic/process.rb, line 106
# Returns the first child process whose name equals +process_name+ (compared
# as a string), or nil when there is none.
def [](process_name)
  processes.find { |child| child.name == process_name.to_s }
end
abort_if_dead()
click to toggle source
# File lib/asynchronic/process.rb, line 80
# Aborts the process with DEAD_ERROR_MESSAGE when #dead? reports true;
# otherwise does nothing.
def abort_if_dead
  return unless dead?

  abort! DEAD_ERROR_MESSAGE
end
cancel!()
click to toggle source
# File lib/asynchronic/process.rb, line 72
# Aborts the process using CANCELED_ERROR_MESSAGE as the error.
def cancel!
  abort!(CANCELED_ERROR_MESSAGE)
end
dead?()
click to toggle source
# File lib/asynchronic/process.rb, line 76
# A process is dead when it is running without being connected, or when any
# of its child processes is dead.
def dead?
  return true if running? && !connected?

  processes.any?(&:dead?)
end
dependencies()
click to toggle source
# File lib/asynchronic/process.rb, line 134
# Resolves the stored dependency names into sibling processes.
#
# Returns an empty array when there is no parent or no dependency is stored.
# Otherwise each stored name is looked up among the parent's processes; a
# name with no matching sibling yields nil in the result.
def dependencies
  return [] if parent.nil? || data_store[:dependencies].empty?

  siblings_by_name = {}
  parent.processes.each { |sibling| siblings_by_name[sibling.name] = sibling }

  data_store[:dependencies].map { |dependency| siblings_by_name[dependency.to_s] }
end
destroy()
click to toggle source
# File lib/asynchronic/process.rb, line 84
# Removes this process by calling delete_cascade on its scoped data store.
def destroy
  store = data_store
  store.delete_cascade
end
enqueue()
click to toggle source
# File lib/asynchronic/process.rb, line 144
# Marks the process as queued and then hands its id and queue to the
# environment for enqueueing.
def enqueue
  queued!
  environment.enqueue(id, queue)
end
execute()
click to toggle source
# File lib/asynchronic/process.rb, line 149
# Runs the process and afterwards triggers #wakeup, wrapped in
# Asynchronic.retry_execution.
def execute
  run
  Asynchronic.retry_execution(self.class, 'wakeup') { wakeup }
end
finalized?()
click to toggle source
# File lib/asynchronic/process.rb, line 68
# True when the process has either completed or been aborted.
def finalized?
  finished = completed?
  finished || aborted?
end
full_status()
click to toggle source
# File lib/asynchronic/process.rb, line 88
# Builds a hash of name => status for this process, merged with the
# full_status of every child process (recursively).
def full_status
  processes.inject(name => status) do |statuses, child|
    statuses.update(child.full_status)
  end
end
get(key)
click to toggle source
# File lib/asynchronic/process.rb, line 172
# Reads the value stored under +key+ in the process data.
def get(key)
  current = self.data
  current[key]
end
job()
click to toggle source
# File lib/asynchronic/process.rb, line 102
# Returns the job for this process: an instance of +type+ built with this
# process as its argument, memoized in @job.
def job
  @job ||= type.new(self)
end
nest(type, params={})
click to toggle source
# File lib/asynchronic/process.rb, line 168
# Creates a child process of the given type. The child's id is placed under
# this process' :processes key, indexed by the current number of children.
def nest(type, params={})
  child_params = params.merge(id: id[:processes][processes.count])
  self.class.create(environment, type, child_params)
end
params()
click to toggle source
# File lib/asynchronic/process.rb, line 94
# Returns the :params scope of this process' data store, obtained through
# no_lazy and readonly.
def params
  params_scope = data_store.scoped(:params)
  params_scope.no_lazy.readonly
end
parent()
click to toggle source
# File lib/asynchronic/process.rb, line 118
# Returns the parent process (id without its last two sections) when this id
# is nested; nil otherwise.
def parent
  return unless id.nested?

  Process.new(environment, id.remove_last(2))
end
processes()
click to toggle source
# File lib/asynchronic/process.rb, line 110
# Returns the child processes of this process.
#
# Children are discovered from the keys under the :processes scope that have
# exactly two sections and end in "|name". The keys are sorted, and each one
# (minus its last section) becomes a child id under id[:processes].
def processes
  name_keys = data_store.scoped(:processes).keys.select do |key|
    key.sections.count == 2 && key.match(/\|name$/)
  end

  name_keys.sort.map do |key|
    Process.new(environment, id[:processes][key.remove_last])
  end
end
ready?()
click to toggle source
# File lib/asynchronic/process.rb, line 64
# A process is ready when it is pending and every dependency has completed.
def ready?
  pending? && dependencies.all? { |dependency| dependency.completed? }
end
real_error()
click to toggle source
# File lib/asynchronic/process.rb, line 126
# Returns the message of the error that originally caused this process to
# abort, or nil when the process is not aborted.
#
# When aborted children exist, the one with the earliest finalized_at is
# followed recursively; otherwise this process' own error message is used.
def real_error
  return nil unless aborted?

  # min_by picks the earliest aborted child in one pass, without sorting them all.
  first_aborted_child = processes.select(&:aborted?).min_by(&:finalized_at)

  first_aborted_child ? first_aborted_child.real_error : error.message
end
result()
click to toggle source
# File lib/asynchronic/process.rb, line 98
# Reads :result through the lazy view of this process' data store.
def result
  lazy_store = data_store.lazy
  lazy_store[:result]
end
root()
click to toggle source
# File lib/asynchronic/process.rb, line 122
# Returns the top-level process: self when the id is not nested, otherwise a
# Process built from the first section of the id.
def root
  return self unless id.nested?

  Process.new(environment, id.sections.first)
end
set(key, value)
click to toggle source
# File lib/asynchronic/process.rb, line 176
# Stores +value+ under +key+ by replacing the process data with a merged copy.
def set(key, value)
  updated = self.data.merge(key => value)
  self.data = updated
end
wakeup()
click to toggle source
# File lib/asynchronic/process.rb, line 156
# Wakes up the children of this process and, once this process is finalized,
# propagates the wakeup to its parent.
#
# When the queue engine reports asynchronic?, the children are woken up
# inside data_store.synchronize(id); otherwise they are woken up directly.
def wakeup
  Asynchronic.logger.info('Asynchronic') { "Wakeup started #{type} (#{id})" }

  if environment.queue_engine.asynchronic?
    data_store.synchronize(id) do
      wakeup_children
    end
  else
    wakeup_children
  end

  Asynchronic.logger.info('Asynchronic') { "Wakeup finalized #{type} (#{id})" }

  if parent && finalized?
    parent.wakeup
  end
end
Private Instance Methods
abort!(exception)
click to toggle source
# File lib/asynchronic/process.rb, line 220
# Records +exception+ wrapped in an Error as this process' error and moves
# the process to the aborted state.
def abort!(exception)
  self.error = Error.new(exception)
  aborted!
end
connected?()
click to toggle source
# File lib/asynchronic/process.rb, line 270
# Checks whether this process' connection name is among the queue engine's
# active connections.
#
# If the check itself raises, the error is logged and the process is treated
# as connected (true is returned).
def connected?
  begin
    connection_name && environment.queue_engine.active_connections.include?(connection_name)
  rescue StandardError => failure
    Asynchronic.logger.error('Asynchronic') { "#{failure.message}\n#{failure.backtrace.join("\n")}" }
    true
  end
end
data_store()
click to toggle source
# File lib/asynchronic/process.rb, line 184
# Returns the environment's data store scoped to this process id, memoized
# in @data_store.
def data_store
  @data_store ||= environment.data_store.scoped(id)
end
infer_dependencies(params)
click to toggle source
# File lib/asynchronic/process.rb, line 261
# Derives dependency names from the param values.
#
# Values that respond to proxy? and whose proxy_class is DataStore::LazyValue
# are kept; each contributes the name of the process located at the scope of
# the value's data store.
def infer_dependencies(params)
  lazy_values = params.values.select do |value|
    value.respond_to?(:proxy?) && value.proxy_class == DataStore::LazyValue
  end

  lazy_values.map do |value|
    Process.new(environment, value.data_store.scope).name
  end
end
params=(params)
click to toggle source
# File lib/asynchronic/process.rb, line 194
# Merges +params+ into the :params scope of this process' data store.
def params=(params)
  params_scope = data_store.scoped(:params)
  params_scope.merge(params)
end
parent_queue()
click to toggle source
# File lib/asynchronic/process.rb, line 266
# Returns the parent's queue, or nil when this process has no parent.
def parent_queue
  owner = parent
  owner.queue if owner
end
run()
click to toggle source
# File lib/asynchronic/process.rb, line 225
# Executes the job of this process.
#
# The current connection name is recorded first. If the root process is
# already aborted, this process is aborted with
# AUTOMATIC_ABORTED_ERROR_MESSAGE. Otherwise it goes to running, stores the
# job's return value as the result and then goes to waiting.
#
# Any exception (the rescue covers Exception, not only StandardError) is
# logged and the process is aborted with it inside Asynchronic.retry_execution.
def run
  self.connection_name = Asynchronic.connection_name

  if root.aborted?
    abort!(AUTOMATIC_ABORTED_ERROR_MESSAGE)
  else
    running!
    self.result = job.call
    waiting!
  end
rescue Exception => failure
  message = "Failed process #{type} (#{id})\n#{failure.class} #{failure.message}\n#{failure.backtrace.join("\n")}"
  Asynchronic.logger.error('Asynchronic') { message }
  Asynchronic.retry_execution(self.class, 'abort') { abort!(failure) }
end
status=(status)
click to toggle source
# File lib/asynchronic/process.rb, line 198
# Persists the new status and notifies listeners.
#
# The status is logged and written to the data store. When TIME_TRACKING_MAP
# has an entry for it, the current time is stored under the mapped key. A
# :status_changed event is always published; :finalized is published as well
# once the process is finalized.
def status=(status)
  Asynchronic.logger.info('Asynchronic') { "#{status.to_s.capitalize} #{type} (#{id})" }

  data_store[:status] = status

  if TIME_TRACKING_MAP.key?(status)
    data_store[TIME_TRACKING_MAP[status]] = Time.now
  end

  environment.notifier.publish(id, :status_changed, status)
  environment.notifier.publish(id, :finalized) if finalized?
end
wakeup_children()
click to toggle source
# File lib/asynchronic/process.rb, line 244
# Advances this process according to the state of its children. Does nothing
# unless the process is waiting.
#
# - Any child aborted: abort this process, naming the children that carry an
#   error (the abort reason is nil when none of them does).
# - All children completed: mark this process as completed.
# - Otherwise: enqueue every child that is ready.
def wakeup_children
  return unless waiting?

  children = processes # Cached child processes

  if children.any?(&:aborted?)
    failed_children = children.select(&:error)
    reason = "Error caused by #{failed_children.map(&:name).join(', ')}" if failed_children.any?
    abort! reason
  elsif children.all?(&:completed?)
    completed!
  else
    children.each do |child|
      child.enqueue if child.ready?
    end
  end
end