class Taskflow::Flow
Public Class Methods
can_launch?(klass,opts={})
click to toggle source
opts support :params,
# File lib/taskflow/flow.rb, line 26 def can_launch?(klass,opts={}) opts = HashWithIndifferentAccess.new opts !Taskflow::Flow.ne(state: 'stopped').where(klass: klass,input: opts[:params]).exists? end
launch(klass,opts={})
click to toggle source
# File lib/taskflow/flow.rb, line 31 def launch(klass,opts={}) opts = HashWithIndifferentAccess.new opts flow_klass = Kernel.const_get klass name = flow_klass.const_get 'NAME' opts[:launched_by] ||= 'task-flow-engine' flow = flow_klass.create name: name,input: opts[:params],launched_by: opts[:launched_by] if opts[:next_workflow_config] flow.update next_config: opts[:next_workflow_config] end flow.create_logger name: name,description: opts[:workflow_description] flow.schedule end
Public Instance Methods
resume()
click to toggle source
# File lib/taskflow/flow.rb, line 54 def resume self.tasks.where(state: 'paused',result: 'error').each do |task| task.resume end end
running_steps()
click to toggle source
# File lib/taskflow/flow.rb, line 45 def running_steps self.tasks.in(state: ['running','paused']) end
schedule()
click to toggle source
# File lib/taskflow/flow.rb, line 60 def schedule return if self.halt_by || self.state == 'stopped' self.update_attributes! state: 'running',started_at: Time.now if self.state == 'pending' task_list = [] self.reload.tasks.where(state: 'pending').each do |task| # 上游全部完成 if task.upstream.empty? || task.upstream.all?{|t| %w(skipped stopped).include? t.state } task_list << task.id.to_s end end task_list.each{|tid| Taskflow::Worker.perform_async self.id.to_s,tid } self end
stop!(user_id=nil)
click to toggle source
# File lib/taskflow/flow.rb, line 49 def stop!(user_id=nil) percent = self.tasks.map(&:progress).sum / self.tasks.size self.update_attributes! progress: percent,halt_by: user_id,ended_at: Time.now, state: 'stopped',result: 'warning' end
Private Instance Methods
configure_tasks()
click to toggle source
# File lib/taskflow/flow.rb, line 76 def configure_tasks begin @task_list = [] configure sort_index 1,[] TaskFlow::Task.collection.insert @task_list @task_list = nil rescue=>exception self.destroy raise exception end reload end
run(klass,opts={})
click to toggle source
opts support :name,:params
# File lib/taskflow/flow.rb, line 91 def run(klass,opts={}) task_data = { klass: klass.to_s, name: opts[:name] || klass.to_s, input: opts[:params] || {}, index: @task_list.size + 1, _type: klass.to_s, state: 'pending', output: {}, progress: 0, data: {}, flow_id: self.id, _id: BSON::ObjectId.new, downstream_ids: [], upstream_ids: [] }.select{|k,v| v } if opts[:before] if opts[:before].is_a? Array opts[:before].flatten! opts[:before].each do |b| b[:upstream_ids] << task_data[:_id] task_data[:downstream_ids] << b[:_id] end else task_data[:downstream_ids] << opts[:before][:_id] opts[:before][:upstream_ids] << task_data[:_id] end end if opts[:after] if opts[:after].is_a? Array opts[:after].flatten! opts[:after].each do |a| task_data[:upstream_ids] << a[:_id] a[:downstream_ids] << task_data[:_id] end else task_data[:upstream_ids] << opts[:after][:_id] opts[:after][:downstream_ids] << task_data[:_id] end end if opts[:before].nil? && opts[:after].nil? && @task_list.last @task_list.last[:downstream_ids] << task_data task_data[:upstream_ids] << @task_list.last[:_id] end @task_list << task_data task_data end
sort_index(i,scanned)
click to toggle source
# File lib/taskflow/flow.rb, line 139 def sort_index(i,scanned) queue = @task_list.select{|t| !scanned.include?(t[:_id]) && (t[:upstream_ids].nil? || t[:upstream_ids].empty? || t[:upstream_ids].all?{|uid| scanned.include?(uid)}) } return if queue.empty? queue.each do |task| task[:index] = i scanned << task[:_id] end sort_index i + 1,scanned end