class Taskflow::Flow

Public Class Methods

can_launch?(klass,opts={}) click to toggle source

opts support :params,

# File lib/taskflow/flow.rb, line 26
def can_launch?(klass,opts={})
    opts = HashWithIndifferentAccess.new opts
    !Taskflow::Flow.ne(state: 'stopped').where(klass: klass,input: opts[:params]).exists?
end
launch(klass,opts={}) click to toggle source
# File lib/taskflow/flow.rb, line 31
def launch(klass,opts={})
    opts = HashWithIndifferentAccess.new opts
    flow_klass = Kernel.const_get klass
    name = flow_klass.const_get 'NAME'
    opts[:launched_by] ||= 'task-flow-engine'
    flow = flow_klass.create name: name,input: opts[:params],launched_by: opts[:launched_by]
    if opts[:next_workflow_config]
        flow.update next_config: opts[:next_workflow_config]
    end
    flow.create_logger name: name,description: opts[:workflow_description]
    flow.schedule
end

Public Instance Methods

resume() click to toggle source
# File lib/taskflow/flow.rb, line 54
def resume
    self.tasks.where(state: 'paused',result: 'error').each do |task|
        task.resume
    end
end
running_steps() click to toggle source
# File lib/taskflow/flow.rb, line 45
def running_steps
    self.tasks.in(state: ['running','paused'])
end
schedule() click to toggle source
# File lib/taskflow/flow.rb, line 60
def schedule
    return if self.halt_by || self.state == 'stopped'
    self.update_attributes! state: 'running',started_at: Time.now if self.state == 'pending'
    task_list = []
    self.reload.tasks.where(state: 'pending').each do |task|
        # 上游全部完成
        if task.upstream.empty? || task.upstream.all?{|t| %w(skipped stopped).include? t.state }
            task_list << task.id.to_s
        end
    end
    task_list.each{|tid| Taskflow::Worker.perform_async self.id.to_s,tid }
    self
end
stop!(user_id=nil) click to toggle source
# File lib/taskflow/flow.rb, line 49
def stop!(user_id=nil)
    percent = self.tasks.map(&:progress).sum / self.tasks.size
    self.update_attributes! progress: percent,halt_by: user_id,ended_at: Time.now, state: 'stopped',result: 'warning'
end

Private Instance Methods

configure_tasks() click to toggle source
# File lib/taskflow/flow.rb, line 76
def configure_tasks
    begin
        @task_list = []
        configure
        sort_index  1,[]
        TaskFlow::Task.collection.insert @task_list
        @task_list = nil
    rescue=>exception
        self.destroy
        raise exception
    end
    reload
end
run(klass,opts={}) click to toggle source

opts support :name,:params

# File lib/taskflow/flow.rb, line 91
def run(klass,opts={})
    task_data = {
        klass: klass.to_s,
        name: opts[:name] || klass.to_s,
        input: opts[:params] || {},
        index: @task_list.size + 1,
        _type: klass.to_s,
        state: 'pending',
        output: {},
        progress: 0,
        data: {},
        flow_id: self.id,
        _id: BSON::ObjectId.new,
        downstream_ids: [],
        upstream_ids: []
    }.select{|k,v| v }
    if opts[:before]
        if opts[:before].is_a? Array
            opts[:before].flatten!
            opts[:before].each do |b|
                b[:upstream_ids] << task_data[:_id]
                task_data[:downstream_ids]  << b[:_id]
            end
        else
            task_data[:downstream_ids]  << opts[:before][:_id]
            opts[:before][:upstream_ids] << task_data[:_id]
        end
    end
    if opts[:after]
        if opts[:after].is_a? Array
            opts[:after].flatten!
            opts[:after].each do |a|
                task_data[:upstream_ids]  << a[:_id]
                a[:downstream_ids] << task_data[:_id]
            end
        else
            task_data[:upstream_ids]  << opts[:after][:_id]
            opts[:after][:downstream_ids] << task_data[:_id]
        end
    end
    if opts[:before].nil? && opts[:after].nil? && @task_list.last
        @task_list.last[:downstream_ids]  << task_data
        task_data[:upstream_ids] << @task_list.last[:_id]
    end
    @task_list << task_data
    task_data
end
sort_index(i,scanned) click to toggle source
# File lib/taskflow/flow.rb, line 139
def sort_index(i,scanned)
    queue = @task_list.select{|t| !scanned.include?(t[:_id]) && (t[:upstream_ids].nil? || t[:upstream_ids].empty? || t[:upstream_ids].all?{|uid| scanned.include?(uid)}) }
    return if queue.empty?
    queue.each do |task|
        task[:index] = i
        scanned << task[:_id]
    end
    sort_index i + 1,scanned
end