class Rworkflow::Flow
Constants
- REDIS_NS
- STATES_FAILED
- STATES_TERMINAL
- STATE_FAILED
- STATE_SUCCESSFUL
- WORKFLOW_REGISTRY
Attributes
id[RW]
lifecycle[R]
Public Class Methods
all(options = {})
click to toggle source
# File lib/rworkflow/flow.rb, line 393
# Loads every workflow the registry lists for this flow class.
#
# @param options [Hash] registry filters; +:parent_class+ defaults to this
#   class unless the caller already supplied one.
# @return [Array] one entry per registered id, each produced by +load+
#   (an entry is nil when +load+ cannot resolve a flow class).
def all(options = {})
  query = options.reverse_merge(parent_class: self)
  registry.all(query).map { |workflow_id| load(workflow_id) }
end
cleanup(id)
click to toggle source
# File lib/rworkflow/flow.rb, line 380
# Builds a handle for the workflow +id+ and deletes its stored state.
#
# @param id [String] workflow id
# @return whatever the instance-level +cleanup+ returns
def cleanup(id)
  new(id).cleanup
end
create(lifecycle, name = '', options = {})
click to toggle source
# File lib/rworkflow/flow.rb, line 359
# Creates a workflow with a freshly generated id, stores its metadata and
# adds it to the registry.
#
# @param lifecycle the lifecycle the workflow will follow
# @param name [String] human-readable name (also embedded in the id)
# @param options [Hash] +:public+ (default false) and +:logging+ (default true)
# @return the new workflow instance
def create(lifecycle, name = '', options = {})
  new(generate_id(name)).tap do |flow|
    flow.name = name
    flow.lifecycle = lifecycle
    flow.set(:created_at, Time.now.to_i)
    flow.set(:public, options.fetch(:public, false))
    flow.set(:logging, options.fetch(:logging, true))
    register(flow)
  end
end
failure?(state)
click to toggle source
# File lib/rworkflow/flow.rb, line 436
# Tells whether +state+ is one of this flow class's failure states.
#
# @param state a state name
# @return [Boolean]
def failure?(state)
  self::STATES_FAILED.member?(state)
end
get_private_workflows(options = {})
click to toggle source
# File lib/rworkflow/flow.rb, line 389
# Loads the workflows the registry reports as private for this flow class.
#
# @param options [Hash] registry filters; +:parent_class+ defaults to self.
# @return [Array] results of +load+ for each private workflow id
def get_private_workflows(options = {})
  query = options.reverse_merge(parent_class: self)
  registry.private_flows(query).map { |workflow_id| load(workflow_id) }
end
get_public_workflows(options = {})
click to toggle source
# File lib/rworkflow/flow.rb, line 385
# Loads the workflows the registry reports as public for this flow class.
#
# @param options [Hash] registry filters; +:parent_class+ defaults to self.
# @return [Array] results of +load+ for each public workflow id
def get_public_workflows(options = {})
  query = options.reverse_merge(parent_class: self)
  registry.public_flows(query).map { |workflow_id| load(workflow_id) }
end
load(id, klass = nil)
click to toggle source
# File lib/rworkflow/flow.rb, line 397
# Instantiates the workflow stored under +id+.
#
# @param id [String] workflow id
# @param klass [Class, nil] class to instantiate; when nil it is derived
#   from the id via +read_flow_class+.
# @return an instance of +klass+, or nil when no instantiable class exists
def load(id, klass = nil)
  klass = klass.nil? ? read_flow_class(id) : klass
  return nil unless klass.respond_to?(:new)

  klass.new(id)
end
new(id)
click to toggle source
# File lib/rworkflow/flow.rb, line 14
# Binds the workflow to its Redis keys and loads any stored lifecycle.
#
# Three RedisRds hashes are used: the base key (flow storage), "<key>__data"
# (flow data) and "<key>__processing" (per-fetcher processing counts).
#
# @param id [String] workflow id
def initialize(id)
  @id = id
  @redis_key = "#{REDIS_NS}:#{id}"
  @storage = RedisRds::Hash.new(@redis_key)
  @flow_data, @processing = %w[__data __processing].map do |suffix|
    RedisRds::Hash.new("#{@redis_key}#{suffix}")
  end
  load_lifecycle
end
read_flow_class(id)
click to toggle source
# File lib/rworkflow/flow.rb, line 405
# Resolves the flow class encoded as the first "__"-separated segment of +id+.
#
# NOTE(review): +constantize+ resolves any constant name, and +load+ will call
# +new(id)+ on whatever class comes back. If ids can come from untrusted
# input, consider checking that the result is a flow class.
#
# @param id [String] workflow id
# @return [Class, Module, nil] the constant, or nil when the id has no
#   segment or the name cannot be resolved (a warning is logged).
def read_flow_class(id)
  class_name = id.split('__').first
  return nil if class_name.nil?

  begin
    class_name.constantize
  rescue NameError
    Rails.logger.warn("Unknown flow class for workflow id #{id}")
    nil
  end
end
register(workflow)
click to toggle source
# File lib/rworkflow/flow.rb, line 424
# Adds +workflow+ to this class's registry.
#
# @param workflow the workflow to register
# @return the registry's +add+ result
def register(workflow)
  flow_registry = registry
  flow_registry.add(workflow)
end
registered?(workflow)
click to toggle source
# File lib/rworkflow/flow.rb, line 420
# Tells whether +workflow+ is present in this class's registry.
#
# @param workflow the workflow to look up
# @return the registry's +include?+ answer
def registered?(workflow)
  flow_registry = registry
  flow_registry.include?(workflow)
end
registry()
click to toggle source
# File lib/rworkflow/flow.rb, line 440
# Lazily builds and memoizes this class's FlowRegistry, keyed by the
# Rworkflow version string.
#
# @return [FlowRegistry]
def registry
  @registry ||= FlowRegistry.new(Rworkflow::VERSION.to_s)
end
serializer()
click to toggle source
# File lib/rworkflow/flow.rb, line 446
# Serializer used to dump and load everything this flow stores in Redis.
#
# @return [Module] the YAML module
def serializer
  YAML
end
terminal?(state)
click to toggle source
# File lib/rworkflow/flow.rb, line 432
# Tells whether +state+ is one of this flow class's terminal states.
#
# @param state a state name
# @return [Boolean]
def terminal?(state)
  self::STATES_TERMINAL.member?(state)
end
unregister(workflow)
click to toggle source
# File lib/rworkflow/flow.rb, line 428
# Removes +workflow+ from this class's registry.
#
# @param workflow the workflow to remove
# @return the registry's +remove+ result
def unregister(workflow)
  flow_registry = registry
  flow_registry.remove(workflow)
end
Private Class Methods
generate_id(workflow_name)
click to toggle source
# File lib/rworkflow/flow.rb, line 373
# Builds a new workflow id of the form
#   "<FlowClassName>__<workflow_name>__<epoch milliseconds>__<random integer>".
# +read_flow_class+ splits the first segment back off, so the layout must stay
# stable.
#
# The random suffix comes from Ruby's default, entropy-seeded generator.
# Seeding a fresh Random from the clock made the suffix reproducible within
# the same second, so two ids for the same name requested in the same
# millisecond were identical.
#
# @param workflow_name [String] human-readable name embedded in the id
# @return [String]
def generate_id(workflow_name)
  now = Time.now.to_f
  millis = (now * 1000).to_i
  # Same numeric range as before ([0, now)), but not derivable from the clock.
  suffix = Random.rand(now.to_i)
  "#{name}__#{workflow_name}__#{millis}__#{suffix}"
end
Public Instance Methods
cleaned_up?()
click to toggle source
# File lib/rworkflow/flow.rb, line 204
# Tells whether none of this workflow's state lists still exist.
#
# @return [Boolean]
def cleaned_up?
  states_list.none? { |state| get_list(state).exists? }
end
cleanup()
click to toggle source
# File lib/rworkflow/flow.rb, line 296
# Deletes everything this workflow stores and unregisters it.
# This is a no-op in the Rails test environment.
#
# @return the result of deleting the base storage hash (nil in test env)
def cleanup
  return if Rails.env.test?

  states_cleanup
  [logger, @processing].each(&:delete)
  self.class.unregister(self)
  @flow_data.delete
  @storage.delete
end
count(state)
click to toggle source
# File lib/rworkflow/flow.rb, line 89
# Number of objects currently queued in +state+'s list.
#
# @param state a state name
# @return [Integer]
def count(state)
  list = get_list(state)
  list.size
end
counters()
click to toggle source
# File lib/rworkflow/flow.rb, line 93
# Returns the per-state counters. It prefers the snapshot stored under
# +:counters+ and falls back to a live +counters!+ read when no snapshot
# exists or it cannot be deserialized (that failure is logged).
#
# @return [Hash] state name => count, plus +:processing+
def counters
  raw = @storage.get(:counters)
  cached = nil
  unless raw.nil?
    begin
      cached = self.class.serializer.load(raw)
    rescue => e
      Rails.logger.error("Error loading stored flow counters: #{e.message}")
    end
  end
  cached || counters!
end
created_at()
click to toggle source
# File lib/rworkflow/flow.rb, line 57
# Creation time read from the stored +:created_at+ epoch seconds (0 when
# absent), converted with Time.zone and memoized.
#
# @return [Time]
def created_at
  @created_at ||= Time.zone.at(get(:created_at, 0))
end
expected_duration()
click to toggle source
# File lib/rworkflow/flow.rb, line 81
# Expected run time of the workflow. The base class returns infinity, meaning
# no expectation.
#
# @return [Float]
def expected_duration
  Float::INFINITY
end
failed?()
click to toggle source
# File lib/rworkflow/flow.rb, line 349
# A workflow has failed when it is finished and at least one object ended in
# a failure state.
#
# @return [Boolean]
def failed?
  return false unless finished?

  total_objects_failed.positive?
end
fetch(fetcher_id, state_name) { |objects| ... }
click to toggle source
# File lib/rworkflow/flow.rb, line 127
# Pops the next batch of objects from +state_name+'s list, deserializes it and
# yields the objects.
#
# The batch size is the state's cardinality, resolved by
# +get_state_cardinality+. That helper also translates the "all started"
# sentinel into the stored start count, so the rule lives in one place.
# Entries that fail to deserialize are pushed, still raw, to STATE_FAILED and
# logged. Objects that deserialize to nil are dropped by +compact+.
#
# Whatever happens, the fetcher's processing entry is removed afterwards, and
# the workflow is terminated once +finished?+ reports true.
#
# @param fetcher_id key identifying this fetcher in the processing hash
# @param state_name lifecycle state to pop from
# @yieldparam objects [Array] deserialized objects (yielded only when the
#   popped batch is non-empty)
def fetch(fetcher_id, state_name)
  # Mark this fetcher as busy; counters! sums these entries into :processing.
  @processing.set(fetcher_id, 1)
  list = get_state_list(state_name)
  unless list.nil?
    failed = []
    cardinality = get_state_cardinality(state_name)
    force_list_complete = @lifecycle.states[state_name].policy == State::STATE_POLICY_WAIT
    raw_objects = list.lpop(cardinality, force_list_complete)
    unless raw_objects.empty?
      objects = raw_objects.map do |raw_object|
        begin
          self.class.serializer.load(raw_object)
        rescue StandardError
          failed << raw_object
          nil
        end
      end.compact
      @processing.set(fetcher_id, objects.size)

      unless failed.empty?
        push(failed, STATE_FAILED)
        Rails.logger.error("Failed to parse #{failed.size} in workflow #{@id} for fetcher id #{fetcher_id} at state #{state_name}")
      end

      yield(objects) if block_given?
    end
  end
ensure
  @processing.remove(fetcher_id)
  terminate if finished?
end
finish_time()
click to toggle source
# File lib/rworkflow/flow.rb, line 77
# Finish time read from the stored +:finish_time+ epoch seconds (0 when absent).
#
# @return [Time]
def finish_time
  seconds = get(:finish_time, 0)
  Time.zone.at(seconds)
end
finished?()
click to toggle source
# File lib/rworkflow/flow.rb, line 41
# A started workflow is finished when every non-terminal counter is zero.
# This includes +:processing+, which is not a terminal state.
#
# @return [Boolean]
def finished?
  return false unless started?

  pending = counters.sum { |state, count| self.class.terminal?(state) ? 0 : count.to_i }
  pending.zero?
end
get(key, default = nil)
click to toggle source
# File lib/rworkflow/flow.rb, line 268
# Reads and deserializes +key+ from the flow data hash.
#
# @param key field name
# @param default value returned when the field is absent
# @return the deserialized value, or +default+
def get(key, default = nil)
  raw = @flow_data.get(key)
  return default if raw.nil?

  self.class.serializer.load(raw)
end
get_state_cardinality(state_name)
click to toggle source
# File lib/rworkflow/flow.rb, line 258
# Batch size for +state_name+: the state's declared cardinality, or the stored
# +:start_count+ when the state uses the CARDINALITY_ALL_STARTED sentinel.
#
# @param state_name lifecycle state name
# @return the cardinality
def get_state_cardinality(state_name)
  declared = @lifecycle.states[state_name].cardinality
  return declared unless declared == Rworkflow::Lifecycle::CARDINALITY_ALL_STARTED

  get(:start_count).to_i
end
incr(key, value = 1)
click to toggle source
# File lib/rworkflow/flow.rb, line 275
# Increments integer field +key+ of the flow data hash by +value+.
#
# @return the hash's +incrby+ result
def incr(key, value = 1)
  data = @flow_data
  data.incrby(key, value)
end
lifecycle=(new_lifecycle)
click to toggle source
# File lib/rworkflow/flow.rb, line 36
# Replaces the in-memory lifecycle and persists its serialized form under
# +:lifecycle+ in flow storage.
#
# @param new_lifecycle lifecycle object responding to +serialize+
def lifecycle=(new_lifecycle)
  @lifecycle = new_lifecycle
  payload = self.class.serializer.dump(@lifecycle.serialize)
  @storage.set(:lifecycle, payload)
end
list_objects(state_name, limit = -1)
click to toggle source
# File lib/rworkflow/flow.rb, line 160
# Returns the deserialized objects queued in +state_name+'s list, without
# removing them.
#
# NOTE(review): +limit+ is passed as the end index of RedisRds::List#get.
# If that maps to Redis LRANGE (inclusive stop), a positive +limit+ returns
# limit + 1 items; -1 returns the whole list. Confirm this is intended.
#
# @param state_name state whose list to read
# @param limit [Integer] end index passed to the list read
# @return [Array]
def list_objects(state_name, limit = -1)
  raw_entries = get_list(state_name).get(0, limit)
  raw_entries.map { |raw| self.class.serializer.load(raw) }
end
log(from_state, transition, num_objects)
click to toggle source
# File lib/rworkflow/flow.rb, line 234
# Adds +num_objects+ to the "<from_state>__<transition>" counter when logging
# is enabled for this workflow.
#
# @return the logger's +incrby+ result, or nil when logging is off
def log(from_state, transition, num_objects)
  return unless logging?

  logger.incrby("#{from_state}__#{transition}", num_objects.to_i)
end
logger()
click to toggle source
# File lib/rworkflow/flow.rb, line 238
# Memoized RedisRds hash ("<key>__logger") holding transition counters.
#
# @return [RedisRds::Hash]
def logger
  @logger ||= RedisRds::Hash.new("#{@redis_key}__logger")
end
logging?()
click to toggle source
# File lib/rworkflow/flow.rb, line 230
# Whether transition logging is enabled. Reads the stored +:logging+ flag,
# defaulting to false.
def logging?
  get(:logging, false)
end
logs()
click to toggle source
# File lib/rworkflow/flow.rb, line 244
# Groups the logged transition counters by state.
#
# @return [Hash] { state => { transition => Integer } }; empty when the flow is
#   invalid or logging is disabled.
def logs
  return {} unless valid? && logging?

  logger.getall.each_with_object({}) do |(state_transition, counter), grouped|
    state, transition = state_transition.split('__')
    (grouped[state] ||= {})[transition] = counter.to_i
  end
end
metadata_string()
click to toggle source
# File lib/rworkflow/flow.rb, line 200
# Short label identifying this workflow: "Rworkflow: <name>".
#
# @return [String]
def metadata_string
  label = name
  "Rworkflow: #{label}"
end
name()
click to toggle source
# File lib/rworkflow/flow.rb, line 65
# Stored workflow name, falling back to the workflow id.
def name
  fallback = @id
  get(:name, fallback)
end
name=(name)
click to toggle source
# File lib/rworkflow/flow.rb, line 69
# Persists the workflow name under +:name+.
#
# @param name [String]
def name=(name)
  set(:name, name)
end
public?()
click to toggle source
# File lib/rworkflow/flow.rb, line 354
# Whether the workflow is public. Reads the stored +:public+ flag (default
# false) once and memoizes it.
#
# +@public ||= …+ would never memoize +false+ (it is falsy), so every call on
# a private workflow re-read storage. Checking definedness makes both true
# and false stick.
#
# @return [Boolean]
def public?
  return @public if defined?(@public)

  @public = get(:public, false)
end
set(key, value)
click to toggle source
# File lib/rworkflow/flow.rb, line 264
# Serializes +value+ and stores it under +key+ in the flow data hash.
#
# @return the hash's +set+ result
def set(key, value)
  serialized = self.class.serializer.dump(value)
  @flow_data.set(key, serialized)
end
start(objects)
click to toggle source
# File lib/rworkflow/flow.rb, line 312
# Starts the workflow. Records the start time and object count, enqueues the
# objects in the lifecycle's initial state, and logs an 'initial' transition.
#
# @param objects a single object or an array of objects
def start(objects)
  batch = Array.wrap(objects)
  set(:start_time, Time.now.to_i)
  set(:start_count, batch.size)
  initial_state = lifecycle.initial
  push(batch, initial_state)
  log(initial_state, 'initial', batch.size)
end
start_time()
click to toggle source
# File lib/rworkflow/flow.rb, line 73
# Start time read from the stored +:start_time+ epoch seconds (0 when absent).
#
# @return [Time]
def start_time
  seconds = get(:start_time, 0)
  Time.zone.at(seconds)
end
started?()
click to toggle source
# File lib/rworkflow/flow.rb, line 61
# A workflow has started once a +:start_time+ has been stored.
#
# @return [Boolean]
def started?
  recorded = get(:start_time)
  !recorded.nil?
end
states_list()
click to toggle source
# File lib/rworkflow/flow.rb, line 208
# All state names this workflow may hold lists for: the class's terminal
# states, followed by the lifecycle's states when a lifecycle is loaded.
#
# @return [Array]
def states_list
  terminal = self.class::STATES_TERMINAL
  valid? ? terminal + @lifecycle.states.keys : terminal
end
status()
click to toggle source
# File lib/rworkflow/flow.rb, line 50
# Human-readable status: 'Running' until finished, then 'Finished' or 'Failed'.
#
# @return [String]
def status
  return 'Running' unless finished?

  successful? ? 'Finished' : 'Failed'
end
successful?()
click to toggle source
# File lib/rworkflow/flow.rb, line 344
# A workflow is successful when it is finished and has not failed.
#
# @return [Boolean]
def successful?
  return false unless finished?

  !failed?
end
terminate()
click to toggle source
# File lib/rworkflow/flow.rb, line 178
# Finalizes the workflow under a RedisRds mutex keyed by the workflow id.
# If the state lists still exist, it records the finish time and runs
# +post_process+. A public workflow then keeps a final counters snapshot
# (written with setnx) and drops only its state lists; a private workflow is
# fully cleaned up.
def terminate
  lock = RedisRds::Mutex.new(id)
  lock.synchronize do
    next if cleaned_up?

    set(:finish_time, Time.now.to_i)
    post_process
    next cleanup unless public?

    final_counters = counters!
    # A worker may have raised its processing flag at this point even though
    # no jobs remain; force it to zero in the persisted snapshot.
    final_counters[:processing] = 0
    @storage.setnx(:counters, self.class.serializer.dump(final_counters))
    states_cleanup
  end
end
total_objects(counters = nil)
click to toggle source
# File lib/rworkflow/flow.rb, line 330
# Sum of every counter value, including +:processing+.
#
# @param counters [Hash, nil] counters to sum; defaults to +self.counters+
# @return [Integer]
def total_objects(counters = nil)
  source = counters || self.counters
  source.sum { |_state, amount| amount }
end
total_objects_failed(counters = nil)
click to toggle source
# File lib/rworkflow/flow.rb, line 334
# Sum of the counters whose state the class considers a failure state.
#
# @param counters [Hash, nil] counters to sum; defaults to +self.counters+
# @return [Integer]
def total_objects_failed(counters = nil)
  source = counters || self.counters
  source.sum { |state, amount| self.class.failure?(state) ? amount : 0 }
end
total_objects_processed(counters = nil)
click to toggle source
# File lib/rworkflow/flow.rb, line 320
# Sum of the counters whose state the class considers terminal.
#
# @param counters [Hash, nil] counters to sum; defaults to +self.counters+
# @return [Integer]
def total_objects_processed(counters = nil)
  source = counters || self.counters
  source.sum { |state, amount| self.class.terminal?(state) ? amount : 0 }
end
transition(from_state, name, objects)
click to toggle source
# File lib/rworkflow/flow.rb, line 215
# Moves +objects+ along the lifecycle transition +name+ out of +from_state+.
# An invalid transition (Rworkflow::StateError) is logged and the objects
# are not pushed anywhere.
#
# @return the +log+ result on success, nil otherwise
def transition(from_state, name, objects)
  batch = Array.wrap(objects)
  to_state =
    begin
      lifecycle.transition(from_state, name)
    rescue Rworkflow::StateError => e
      Rails.logger.error("Error transitioning: #{e}")
      nil
    end
  return if to_state.nil?

  push(batch, to_state)
  log(from_state, name, batch.size)
end
valid?()
click to toggle source
# File lib/rworkflow/flow.rb, line 85
# A workflow is valid once a lifecycle has been loaded or assigned.
#
# @return [Boolean]
def valid?
  lifecycle_missing = @lifecycle.nil?
  !lifecycle_missing
end
Protected Instance Methods
post_process()
click to toggle source
# File lib/rworkflow/flow.rb, line 197
# Hook invoked by +terminate+ before final cleanup. It does nothing here;
# presumably subclasses override it.
def post_process
  nil
end
states_cleanup()
click to toggle source
# File lib/rworkflow/flow.rb, line 306
# Deletes every state list of this workflow (skipped in the Rails test env).
#
# @return the states list that was iterated, or nil in the test env
def states_cleanup
  return if Rails.env.test?

  states_list.each { |state| get_list(state).delete }
end
Private Instance Methods
counters!()
click to toggle source
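Fetches every state-list size plus the per-fetcher processing counts inside a single Redis MULTI transaction, so all counters are read atomically.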
# File lib/rworkflow/flow.rb, line 107
# Reads live counters in one Redis MULTI block: the size of each terminal and
# lifecycle state list, plus the per-fetcher processing entries summed into
# +:processing+.
#
# @return [Hash] { processing: Integer, state => Integer, ... }
def counters!
  tracked = self.class::STATES_TERMINAL + @lifecycle.states.keys
  replies = RedisRds::Object.connection.multi do
    tracked.each { |state| get_list(state).size }
    @processing.getall
  end

  snapshot = { processing: 0 }
  tracked.each_with_index { |state, idx| snapshot[state] = replies[idx].to_i }
  # The reply after the list sizes is the processing hash.
  snapshot[:processing] = replies[tracked.size].sum { |_fetcher, count| count.to_i }
  snapshot
end
get_list(name)
click to toggle source
# File lib/rworkflow/flow.rb, line 291
# RedisRds list backing state +name+ ("<key>:lists:<name>").
#
# @return [RedisRds::List]
def get_list(name)
  list_key = "#{@redis_key}:lists:#{name}"
  RedisRds::List.new(list_key)
end
get_state_list(state_name)
click to toggle source
# File lib/rworkflow/flow.rb, line 165
# List for +state_name+, or nil (with an error logged) when the lifecycle has
# no such state.
def get_state_list(state_name)
  return get_list(state_name) unless @lifecycle.states[state_name].nil?

  Rails.logger.error("Tried accessing invalid state #{state_name} for workflow #{id}")
  nil
end
load_lifecycle()
click to toggle source
# File lib/rworkflow/flow.rb, line 25
# Restores @lifecycle from the serialized copy kept under +:lifecycle+.
#
# This is best effort: when nothing is stored @lifecycle is left untouched,
# and when deserialization fails it is reset to nil, which makes +valid?+
# false. Failures used to be swallowed silently; they are now logged, as
# +counters+ does, so a corrupt lifecycle is diagnosable.
def load_lifecycle
  serialized = @storage.get(:lifecycle)
  return if serialized.nil?

  raw = self.class.serializer.load(serialized)
  @lifecycle = Rworkflow::Lifecycle.unserialize(raw) unless raw.nil?
rescue StandardError => e
  Rails.logger.error("Error loading lifecycle for workflow #{@id}: #{e.message}")
  @lifecycle = nil
end
push(objects, state)
click to toggle source
# File lib/rworkflow/flow.rb, line 279
# Serializes +objects+ and appends them to +state+'s list.
#
# @param objects a single object or an array of objects
# @param state target state name
# @return [Integer] number of objects pushed (0 when there is nothing to push)
def push(objects, state)
  batch = Array.wrap(objects)
  return 0 if batch.empty?

  target = get_list(state)
  target.rpush(batch.map { |obj| self.class.serializer.dump(obj) })
  batch.size
end