class Dwf::Client

Attributes

config[R]

Public Class Methods

new(config = Dwf.configuration) click to toggle source
# File lib/dwf/client.rb, line 7
# Builds a client bound to a configuration object.
#
# @param config the configuration to use; when omitted, the value returned
#   by Dwf.configuration at call time is used. The object is stored as-is
#   in @config.
def initialize(config = Dwf.configuration)
  @config = config
end

Public Instance Methods

build_job_id(workflow_id, job_klass) click to toggle source
# File lib/dwf/client.rb, line 57
# Generates a job id that is not yet used inside the workflow's job hash for
# the given job class.
#
# Random UUIDs are drawn until one is found that is not a field of the Redis
# hash "dwf.jobs.<workflow_id>.<job_klass>".
#
# @param workflow_id value interpolated into the hash key
# @param job_klass value interpolated into the hash key
# @return [String] the first UUID that was not present in the hash
def build_job_id(workflow_id, job_klass)
  jobs_key = "dwf.jobs.#{workflow_id}.#{job_klass}"

  loop do
    candidate = SecureRandom.uuid
    # Leave as soon as the candidate does not collide with a stored job.
    return candidate unless redis.hexists(jobs_key, candidate)
  end
end
build_workflow_id() click to toggle source
# File lib/dwf/client.rb, line 73
# Generates a workflow id that does not collide with a stored workflow.
#
# Random UUIDs are drawn until one is found for which the key
# "dwf.workflows.<uuid>" does not exist in Redis. That is the same key
# prefix persist_workflow writes and find_workflow reads, so the check
# actually sees existing workflows.
#
# @return [String] the first UUID whose workflow key was absent
def build_workflow_id
  loop do
    candidate = SecureRandom.uuid
    return candidate unless redis.exists?("dwf.workflows.#{candidate}")
  end
end
check_or_lock(workflow_id, job_name) click to toggle source
# File lib/dwf/client.rb, line 39
# Marks the (workflow, job) pair as being processed, or pauses when it is
# already marked.
#
# If the key "wf_enqueue_outgoing_jobs_<workflow_id>-<job_name>" exists, the
# method sleeps for two seconds and returns without writing or re-checking
# anything. Otherwise it stores 'running' under that key.
#
# @param workflow_id value interpolated into the key
# @param job_name value interpolated into the key
# @return the result of sleep when the key existed, otherwise the result of set
def check_or_lock(workflow_id, job_name)
  lock_key = "wf_enqueue_outgoing_jobs_#{workflow_id}-#{job_name}"
  return sleep(2) if key_exists?(lock_key)

  set(lock_key, 'running')
end
delete(key) click to toggle source
# File lib/dwf/client.rb, line 93
# Removes a key from Redis.
#
# @param key the Redis key passed to the client's del call
# @return whatever the Redis client's del call returns
def delete(key)
  redis.del(key)
end
find_job(workflow_id, job_name) click to toggle source
# File lib/dwf/client.rb, line 11
# Looks up a persisted job and rebuilds it as a Dwf::Item.
#
# A name containing '|' is treated as "<klass>|<id>" and resolved with
# find_job_by_klass_and_id; any other name is treated as a bare class name
# and resolved with find_job_by_klass.
#
# @param workflow_id id of the workflow the job belongs to
# @param job_name either "<klass>|<id>" or just "<klass>"
# @return the object built by Dwf::Item.from_hash, or nil when no job data
#   was found
def find_job(workflow_id, job_name)
  # The id-based lookup splits the name on '|', so the dispatch must test for
  # that same separator. to_s keeps nil and Symbol names on the class-only
  # path instead of raising here.
  raw = if job_name.to_s.include?('|')
          find_job_by_klass_and_id(workflow_id, job_name)
        else
          find_job_by_klass(workflow_id, job_name)
        end
  return nil if raw.nil?

  Dwf::Item.from_hash(Dwf::Utils.symbolize_keys(JSON.parse(raw)))
end
find_workflow(id) click to toggle source
# File lib/dwf/client.rb, line 25
# Loads a workflow and its jobs from Redis.
#
# Reads the JSON stored under "dwf.workflows.<id>", symbolizes its keys,
# collects the workflow's job hashes via parse_nodes and hands both to
# workflow_from_hash.
#
# @param id the workflow id
# @return the object built by workflow_from_hash
# @raise [WorkflowNotFound] when nothing is stored under the workflow key
def find_workflow(id)
  raw = redis.get("dwf.workflows.#{id}")
  raise WorkflowNotFound, "Workflow with given id doesn't exist" if raw.nil?

  attributes = Dwf::Utils.symbolize_keys(JSON.parse(raw))
  workflow_from_hash(attributes, parse_nodes(id))
end
key_exists?(key) click to toggle source
# File lib/dwf/client.rb, line 85
# Tells whether a key is present in Redis.
#
# @param key the Redis key passed to the client's exists? call
# @return whatever the Redis client's exists? call returns
def key_exists?(key)
  redis.exists?(key)
end
persist_job(job) click to toggle source
# File lib/dwf/client.rb, line 35
# Stores a job in the Redis hash of its workflow and class.
#
# The hash key is "dwf.jobs.<workflow_id>.<klass>", the field is the job's
# id and the value is whatever job.as_json returns.
#
# @param job an object responding to workflow_id, klass, id and as_json
# @return whatever the Redis client's hset call returns
def persist_job(job)
  jobs_key = "dwf.jobs.#{job.workflow_id}.#{job.klass}"
  redis.hset(jobs_key, job.id, job.as_json)
end
persist_workflow(workflow) click to toggle source
# File lib/dwf/client.rb, line 53
# Stores a workflow in Redis under "dwf.workflows.<id>".
#
# @param workflow an object responding to id and as_json; the value written
#   is whatever workflow.as_json returns
# @return whatever the Redis client's set call returns
def persist_workflow(workflow)
  workflow_key = "dwf.workflows.#{workflow.id}"
  redis.set(workflow_key, workflow.as_json)
end
release_lock(workflow_id, job_name) click to toggle source
# File lib/dwf/client.rb, line 49
# Releases the marker that check_or_lock sets for a (workflow, job) pair.
#
# The key must be spelled exactly as check_or_lock writes it
# ("wf_enqueue_outgoing_jobs_<workflow_id>-<job_name>"); deleting any other
# key leaves the marker in place and every later check_or_lock call for the
# same pair would keep sleeping.
#
# @param workflow_id value interpolated into the key
# @param job_name value interpolated into the key
# @return whatever delete returns
def release_lock(workflow_id, job_name)
  delete("wf_enqueue_outgoing_jobs_#{workflow_id}-#{job_name}")
end
set(key, value) click to toggle source
# File lib/dwf/client.rb, line 89
# Writes a value to Redis under the given key.
#
# @param key the Redis key
# @param value the value passed to the client's set call
# @return whatever the Redis client's set call returns
def set(key, value)
  redis.set(key, value)
end

Private Instance Methods

find_job_by_klass(workflow_id, job_name) click to toggle source
# File lib/dwf/client.rb, line 105
# Fetches the stored value of one job of the given class.
#
# Performs a single HSCAN step (cursor 0, count 1) on the hash
# "dwf.jobs.<workflow_id>.<job_name>" and returns the value of the first
# field/value pair it yields.
#
# @param workflow_id value interpolated into the hash key
# @param job_name job class name interpolated into the hash key
# @return the stored value of the first pair, or nil when the scan step
#   returned no pairs
def find_job_by_klass(workflow_id, job_name)
  _cursor, entries = redis.hscan("dwf.jobs.#{workflow_id}.#{job_name}", 0, count: 1)
  return nil if entries.empty?

  # Each entry is a [field, value] pair; only the value is of interest.
  entries.first[1]
end
find_job_by_klass_and_id(workflow_id, job_name) click to toggle source
# File lib/dwf/client.rb, line 99
# Fetches the stored value of a job addressed as "<klass>|<id>".
#
# The name is split on '|'; the first part selects the hash
# "dwf.jobs.<workflow_id>.<klass>" and the second part is the field read
# from it.
#
# @param workflow_id value interpolated into the hash key
# @param job_name [String] name of the form "<klass>|<id>"
# @return whatever the Redis client's hget call returns
def find_job_by_klass_and_id(workflow_id, job_name)
  klass_name, identifier = job_name.split('|')
  jobs_key = "dwf.jobs.#{workflow_id}.#{klass_name}"

  redis.hget(jobs_key, identifier)
end
parse_nodes(id) click to toggle source
# File lib/dwf/client.rb, line 114
# Collects every stored job of a workflow as a symbol-keyed structure.
#
# Iterates over all Redis keys matching "dwf.jobs.<id>.*", reads every value
# of each matching hash, parses it as JSON and passes the result through
# Dwf::Utils.symbolize_keys.
#
# @param id the workflow id interpolated into the key pattern
# @return [Array] the flattened list of parsed jobs
def parse_nodes(id)
  pattern = "dwf.jobs.#{id}.*"

  per_hash = redis.scan_each(match: pattern).map do |hash_key|
    redis.hvals(hash_key).map { |raw| Dwf::Utils.symbolize_keys(JSON.parse(raw)) }
  end

  per_hash.flatten
end
redis() click to toggle source
# File lib/dwf/client.rb, line 136
# Returns the Redis connection used by this client.
#
# The connection is created on first use from config.redis_opts and cached
# in @redis, so later calls reuse the same object.
def redis
  @redis ||= Redis.new(config.redis_opts)
end
workflow_from_hash(hash, nodes = []) click to toggle source
# File lib/dwf/client.rb, line 124
# Rebuilds a workflow object from its stored attributes and job hashes.
#
# The class named by hash[:klass] is resolved with Module.const_get and
# instantiated with hash[:arguments] splatted. The instance then gets an
# empty job list, its stopped flag (false when the key is absent), its id,
# and finally one Dwf::Item per entry of nodes.
#
# @param hash [Hash] symbol-keyed attributes; reads :klass, :arguments,
#   :stopped and :id
# @param nodes [Array] job hashes passed one by one to Dwf::Item.from_hash
# @return the populated workflow instance
def workflow_from_hash(hash, nodes = [])
  workflow_class = Module.const_get(hash[:klass])

  workflow_class.new(*hash[:arguments]).tap do |workflow|
    # Setter order is kept exactly as before: jobs is reset first, then
    # stopped and id are assigned, then the real jobs are attached.
    workflow.jobs = []
    workflow.stopped = hash.fetch(:stopped, false)
    workflow.id = hash[:id]
    workflow.jobs = nodes.map { |node| Dwf::Item.from_hash(node) }
  end
end