class Dwf::Item

Attributes

callback_type[R]
enqueued_at[R]
failed_at[R]
finished_at[R]
id[R]
incoming[RW]
klass[R]
outgoing[RW]
output_payload[R]
params[R]
queue[R]
started_at[R]
workflow_id[R]

Public Class Methods

from_hash(hash) click to toggle source
# File lib/dwf/item.rb, line 15
def self.from_hash(hash)
  Module.const_get(hash[:klass]).new(hash)
end
new(options = {}) click to toggle source
# File lib/dwf/item.rb, line 11
def initialize(options = {})
  assign_attributes(options)
end

Public Instance Methods

as_json() click to toggle source
# File lib/dwf/item.rb, line 159
def as_json
  to_hash.to_json
end
cb_build_in?() click to toggle source
# File lib/dwf/item.rb, line 27
def cb_build_in?
  callback_type == Dwf::Workflow::BUILD_IN
end
current_timestamp() click to toggle source
# File lib/dwf/item.rb, line 128
def current_timestamp
  Time.now.to_i
end
enqueue!() click to toggle source
# File lib/dwf/item.rb, line 70
def enqueue!
  @enqueued_at = current_timestamp
  @started_at = nil
  @finished_at = nil
  @failed_at = nil
end
enqueue_outgoing_jobs() click to toggle source
# File lib/dwf/item.rb, line 132
def enqueue_outgoing_jobs
  outgoing.each do |job_name|
    client.check_or_lock(workflow_id, job_name)
    out = client.find_job(workflow_id, job_name)
    out.persist_and_perform_async! if out.ready_to_start?
    client.release_lock(workflow_id, job_name)
  end
end
enqueued?() click to toggle source
# File lib/dwf/item.rb, line 100
def enqueued?
  !enqueued_at.nil?
end
fail!() click to toggle source
# File lib/dwf/item.rb, line 96
def fail!
  @finished_at = @failed_at = current_timestamp
end
failed?() click to toggle source
# File lib/dwf/item.rb, line 108
def failed?
  !failed_at.nil?
end
finish!() click to toggle source
# File lib/dwf/item.rb, line 92
def finish!
  @finished_at = current_timestamp
end
finished?() click to toggle source
# File lib/dwf/item.rb, line 104
def finished?
  !finished_at.nil?
end
mark_as_finished() click to toggle source
# File lib/dwf/item.rb, line 82
def mark_as_finished
  finish!
  persist!
end
mark_as_started() click to toggle source
# File lib/dwf/item.rb, line 77
def mark_as_started
  start!
  persist!
end
name() click to toggle source
# File lib/dwf/item.rb, line 41
def name
  @name ||= "#{klass}|#{id}"
end
no_dependencies?() click to toggle source
# File lib/dwf/item.rb, line 49
def no_dependencies?
  incoming.empty?
end
output(data) click to toggle source
# File lib/dwf/item.rb, line 45
def output(data)
  @output_payload = data
end
parents_succeeded?() click to toggle source
# File lib/dwf/item.rb, line 53
def parents_succeeded?
  incoming.all? do |name|
    client.find_job(workflow_id, name).succeeded?
  end
end
payloads() click to toggle source
# File lib/dwf/item.rb, line 59
def payloads
  incoming.map do |job_name|
    job = client.find_job(workflow_id, job_name)
    {
      id: job.name,
      class: job.klass.to_s,
      output: job.output_payload
    }
  end
end
perform() click to toggle source
# File lib/dwf/item.rb, line 25
def perform; end
perform_async() click to toggle source
# File lib/dwf/item.rb, line 36
def perform_async
  Dwf::Worker.set(queue: queue || client.config.namespace)
             .perform_async(workflow_id, name)
end
persist!() click to toggle source
# File lib/dwf/item.rb, line 163
def persist!
  client.persist_job(self)
end
persist_and_perform_async!() click to toggle source
# File lib/dwf/item.rb, line 19
def persist_and_perform_async!
  enqueue!
  persist!
  perform_async
end
ready_to_start?() click to toggle source
# File lib/dwf/item.rb, line 124
def ready_to_start?
  !running? && !enqueued? && !finished? && !failed? && parents_succeeded?
end
reload() click to toggle source
# File lib/dwf/item.rb, line 31
def reload
  item = client.find_job(workflow_id, name)
  assign_attributes(item.to_hash)
end
running?() click to toggle source
# File lib/dwf/item.rb, line 120
def running?
  started? && !finished?
end
start!() click to toggle source
# File lib/dwf/item.rb, line 87
def start!
  @started_at = current_timestamp
  @failed_at = nil
end
started?() click to toggle source
# File lib/dwf/item.rb, line 116
def started?
  !started_at.nil?
end
succeeded?() click to toggle source
# File lib/dwf/item.rb, line 112
def succeeded?
  finished? && !failed?
end
to_hash() click to toggle source
# File lib/dwf/item.rb, line 141
def to_hash
  {
    id: id,
    klass: klass.to_s,
    queue: queue,
    incoming: incoming,
    outgoing: outgoing,
    finished_at: finished_at,
    enqueued_at: enqueued_at,
    started_at: started_at,
    failed_at: failed_at,
    params: params,
    workflow_id: workflow_id,
    callback_type: callback_type,
    output_payload: output_payload
  }
end

Private Instance Methods

assign_attributes(options) click to toggle source
# File lib/dwf/item.rb, line 173
def assign_attributes(options)
  @workflow_id = options[:workflow_id]
  @id = options[:id]
  @params = options[:params]
  @queue = options[:queue]
  @incoming = options[:incoming] || []
  @outgoing = options[:outgoing] || []
  @klass = options[:klass] || self.class
  @failed_at = options[:failed_at]
  @finished_at = options[:finished_at]
  @enqueued_at = options[:enqueued_at]
  @started_at = options[:started_at]
  @callback_type = options[:callback_type]
  @output_payload = options[:output_payload]
end
client() click to toggle source
# File lib/dwf/item.rb, line 169
def client
  @client ||= Dwf::Client.new
end