class Dwf::Client
Attributes
config[R]
Public Class Methods
new(config = Dwf.configuration)
click to toggle source
# File lib/dwf/client.rb, line 7 def initialize(config = Dwf.configuration) @config = config end
Public Instance Methods
build_job_id(workflow_id, job_klass)
click to toggle source
# File lib/dwf/client.rb, line 57 def build_job_id(workflow_id, job_klass) jid = nil loop do jid = SecureRandom.uuid available = !redis.hexists( "dwf.jobs.#{workflow_id}.#{job_klass}", jid ) break if available end jid end
build_workflow_id()
click to toggle source
# File lib/dwf/client.rb, line 73 def build_workflow_id wid = nil loop do wid = SecureRandom.uuid available = !redis.exists?("dwf.workflow.#{wid}") break if available end wid end
check_or_lock(workflow_id, job_name)
click to toggle source
# File lib/dwf/client.rb, line 39 def check_or_lock(workflow_id, job_name) key = "wf_enqueue_outgoing_jobs_#{workflow_id}-#{job_name}" if key_exists?(key) sleep 2 else set(key, 'running') end end
delete(key)
click to toggle source
# File lib/dwf/client.rb, line 93 def delete(key) redis.del(key) end
find_job(workflow_id, job_name)
click to toggle source
# File lib/dwf/client.rb, line 11 def find_job(workflow_id, job_name) job_name_match = /(?<klass>\w*[^-])-(?<identifier>.*)/.match(job_name) data = if job_name_match find_job_by_klass_and_id(workflow_id, job_name) else find_job_by_klass(workflow_id, job_name) end return nil if data.nil? data = JSON.parse(data) Dwf::Item.from_hash(Dwf::Utils.symbolize_keys(data)) end
find_workflow(id)
click to toggle source
# File lib/dwf/client.rb, line 25 def find_workflow(id) data = redis.get("dwf.workflows.#{id}") raise WorkflowNotFound, "Workflow with given id doesn't exist" if data.nil? hash = JSON.parse(data) hash = Dwf::Utils.symbolize_keys(hash) nodes = parse_nodes(id) workflow_from_hash(hash, nodes) end
key_exists?(key)
click to toggle source
# File lib/dwf/client.rb, line 85 def key_exists?(key) redis.exists?(key) end
persist_job(job)
click to toggle source
# File lib/dwf/client.rb, line 35 def persist_job(job) redis.hset("dwf.jobs.#{job.workflow_id}.#{job.klass}", job.id, job.as_json) end
persist_workflow(workflow)
click to toggle source
# File lib/dwf/client.rb, line 53 def persist_workflow(workflow) redis.set("dwf.workflows.#{workflow.id}", workflow.as_json) end
release_lock(workflow_id, job_name)
click to toggle source
# File lib/dwf/client.rb, line 49 def release_lock(workflow_id, job_name) delete("dwf_enqueue_outgoing_jobs_#{workflow_id}-#{job_name}") end
set(key, value)
click to toggle source
# File lib/dwf/client.rb, line 89 def set(key, value) redis.set(key, value) end
Private Instance Methods
find_job_by_klass(workflow_id, job_name)
click to toggle source
# File lib/dwf/client.rb, line 105 def find_job_by_klass(workflow_id, job_name) _new_cursor, result = redis.hscan("dwf.jobs.#{workflow_id}.#{job_name}", 0, count: 1) return nil if result.empty? _job_id, job = *result[0] job end
find_job_by_klass_and_id(workflow_id, job_name)
click to toggle source
# File lib/dwf/client.rb, line 99 def find_job_by_klass_and_id(workflow_id, job_name) job_klass, job_id = job_name.split('|') redis.hget("dwf.jobs.#{workflow_id}.#{job_klass}", job_id) end
parse_nodes(id)
click to toggle source
# File lib/dwf/client.rb, line 114 def parse_nodes(id) keys = redis.scan_each(match: "dwf.jobs.#{id}.*") keys.map do |key| redis.hvals(key).map do |json| Dwf::Utils.symbolize_keys JSON.parse(json) end end.flatten end
redis()
click to toggle source
# File lib/dwf/client.rb, line 136 def redis @redis ||= Redis.new(config.redis_opts) end
workflow_from_hash(hash, nodes = [])
click to toggle source
# File lib/dwf/client.rb, line 124 def workflow_from_hash(hash, nodes = []) flow = Module.const_get(hash[:klass]).new(*hash[:arguments]) flow.jobs = [] flow.stopped = hash.fetch(:stopped, false) flow.id = hash[:id] flow.jobs = nodes.map do |node| Dwf::Item.from_hash(node) end flow end