class Redact
A distributed, dependency-aware job scheduler for Redis. It works like a distributed make: you define the dependencies between the parts of your job, and Redact handles the scheduling.
Attributes
Public Class Methods
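Creates a scheduler that keeps its state in the given `redis` connection. Options:
- `namespace`: prefix for Redis keys, e.g. "redact/". The prefix is prepended without a separator, so include one at the end.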
```ruby
# File lib/redact.rb, line 23
def initialize redis, opts={}
  @namespace = opts[:namespace]
  @redis = redis
  @dag = TSortHash.new
  @queue = [@namespace, "q"].join
  @processing_list = [@namespace, "processing"].join
  @done_list = [@namespace, "done"].join
  @dag_key = [@namespace, "dag"].join
  @metadata_prefix = [@namespace, "metadata"].join
  @params_key = [@namespace, "params"].join
end
```
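The following sketch shows how `namespace` shapes key names. It repeats the key construction from `initialize` outside the class, and `redact_keys` is a hypothetical helper that is not part of Redact. `Array#join` is called with no separator, so the namespace is concatenated as-is and a nil namespace contributes nothing. That is why the example namespace ends in a slash.

```ruby
# Hypothetical helper reproducing the key construction in initialize.
# Array#join with no separator concatenates the parts; nil joins as "".
def redact_keys(namespace)
  {
    queue:           [namespace, "q"].join,
    processing_list: [namespace, "processing"].join,
    done_list:       [namespace, "done"].join,
    dag_key:         [namespace, "dag"].join,
    params_key:      [namespace, "params"].join,
    metadata_prefix: [namespace, "metadata"].join
  }
end

p redact_keys("redact/")[:queue] # => "redact/q"
p redact_keys("redact")[:queue]  # => "redactq" (no separator is inserted)
p redact_keys(nil)[:queue]       # => "q"

# Per-task metadata keys are joined with ":" by metadata_key_for.
metadata_key = [redact_keys("redact/")[:metadata_prefix], "html", "run-1"].join(":")
p metadata_key                   # => "redact/metadata:html:run-1"
```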
Public Instance Methods
Adds a task with dependencies. `what` is the name of the task (either a symbol or a string). `deps` are the tasks that `what` depends on. They must be symbols and can be passed individually or as arrays. `deps` may refer to tasks not yet added with `add_task`; these are treated as tasks without dependencies.
Raises a `CyclicDependencyError` if adding these dependencies would create a cyclic dependency, and an `ArgumentError` if any dependency is not a symbol.
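```ruby
# File lib/redact.rb, line 69
def add_task what, *deps
  deps = deps.flatten # be nice and allow arrays to be passed in
  raise ArgumentError, "expecting dependencies to be zero or more task ids" unless deps.all? { |x| x.is_a?(Symbol) }
  # a self-dependency is a one-task cycle, which the component-size check below can't see
  raise CyclicDependencyError, "cyclic dependency #{[what].inspect}" if deps.include?(what.to_sym)

  had_task = @dag.member?(what)
  old_deps = @dag[what]
  @dag[what] = deps
  @dag.strongly_connected_components.each do |x|
    next if x.size == 1
    # roll back so that a rejected call leaves the graph unchanged
    had_task ? (@dag[what] = old_deps) : @dag.delete(what)
    raise CyclicDependencyError, "cyclic dependency #{x.inspect}"
  end
end
```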
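The cycle check relies on Ruby's `TSort` module. In an acyclic graph, every strongly connected component holds exactly one task, so a component with more than one task signals a cycle. This documentation does not show the definition of `TSortHash`; the class below is an assumed minimal version, and `multi_task_cycle?` is a hypothetical helper. A task listed as its own dependency still forms a one-task component, so the size check alone does not flag it.

```ruby
require "tsort"

# Assumed minimal TSortHash: a Hash of task => [dependencies] that TSort can traverse.
class TSortHash < Hash
  include TSort
  alias tsort_each_node each_key

  # Tasks that appear only as dependencies are treated as having none.
  def tsort_each_child(node, &block)
    fetch(node, []).each(&block)
  end
end

# The condition add_task checks: some strongly connected component has more than one task.
def multi_task_cycle?(dag)
  dag.strongly_connected_components.any? { |component| component.size != 1 }
end

dag = TSortHash.new
dag[:deploy] = [:test, :build]
dag[:test] = [:build]
before = multi_task_cycle?(dag)
order = dag.tsort        # dependencies are listed before the tasks that need them
p before                 # => false
p order

dag[:build] = [:deploy]  # closes the loop deploy -> build -> deploy
after = multi_task_cycle?(dag)
p after                  # => true

self_loop = TSortHash.new
self_loop[:a] = [:a]     # a task that depends on itself
p multi_task_cycle?(self_loop) # => false: still a one-task component
```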
Schedules `target` for completion by worker processes listening with `each`. Returns immediately.
Targets scheduled with `do!` have their tasks dispatched in roughly FIFO order: work for earlier targets is generally scheduled before work for later targets. The actual completion order of targets still depends on the completion order of their dependent tasks, the time those tasks take, and so on.
You must call `publish_graph!` at least once before calling this method. Otherwise it raises an `ArgumentError`, as it also does when `target` is not a recognized task.
- `run_id`: the unique identifier for this run. Don't reuse run IDs.
- `run_params`: parameters that will be passed to all tasks in this run. They go through a JSON round trip, so they should contain only values that JSON can express.
```ruby
# File lib/redact.rb, line 100
def do! target, run_id, run_params=nil
  raise ArgumentError, "you haven't called publish_graph!" unless @redis.exists(@dag_key)

  dag = load_dag
  target = target.to_s
  raise ArgumentError, "#{target.inspect} is not a recognized task" unless dag.member?(target)

  @redis.hset @params_key, run_id, run_params.to_json if run_params
  dag.each_strongly_connected_component_from(target) do |tasks|
    task = tasks.first # all single-element arrays by this point
    next unless dag[task].nil? || dag[task].empty? # only push tasks without dependencies
    enqueue_task! task, target, run_id, true
  end
end
```
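The following in-memory sketch covers the first step of `do!`. It walks the components reachable from the target, where dependencies come out before dependents, and selects the tasks that have no dependencies. It then pushes those tasks onto the left of the queue, while workers pop from the right. A Ruby Array stands in for the Redis list (index 0 is the left end). `TSortHash` is an assumed definition, and `initial_tasks` is hypothetical. Pushing on the left is what produces the roughly FIFO ordering across targets described above.

```ruby
require "tsort"

# Assumed minimal TSortHash (its real definition is not shown in this documentation).
class TSortHash < Hash
  include TSort
  alias tsort_each_node each_key

  # Tasks that appear only as dependencies are treated as having none.
  def tsort_each_child(node, &block)
    fetch(node, []).each(&block)
  end
end

# Hypothetical helper: tasks on the path to `target` with no dependencies,
# in the order do! visits them (dependencies before dependents).
def initial_tasks(dag, target)
  leaves = []
  dag.each_strongly_connected_component_from(target) do |tasks|
    task = tasks.first
    leaves << task if dag[task].nil? || dag[task].empty?
  end
  leaves
end

dag = TSortHash.new
dag["site"] = ["html", "css"]
dag["html"] = ["markdown"]
dag["docs"] = ["markdown"]

queue = [] # stand-in for the Redis list; index 0 is the left end
initial_tasks(dag, "site").each { |t| queue.unshift([t, "site", "run-1"]) } # lpush
initial_tasks(dag, "docs").each { |t| queue.unshift([t, "docs", "run-2"]) } # lpush

# Workers pop from the right, so run-1's tasks come out before run-2's.
popped = Array.new(queue.size) { queue.pop }
p popped
```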
Returns the total number of completed tasks we have information about.
```ruby
# File lib/redact.rb, line 126
def done_list_size; @redis.llen @done_list end
```
Returns information about the tasks that have been completed. The return value is an array of hashes. Each hash includes the keys from `in_progress_tasks`, plus these keys:
- `worker_id`: the worker_id of the worker that processed this task
- `time_waiting`: the approximate number of seconds this task was enqueued for
- `ts`: the timestamp at the end of processing
- `state`: one of "done", "skipped", or "error"
- `error`, `backtrace`: debugging information for tasks in state "error"
- `time_processing`: the approximate number of seconds this task was processed for
```ruby
# File lib/redact.rb, line 159
def done_tasks start_idx=0, end_idx=-1
  @redis.lrange(@done_list, start_idx, end_idx).map { |t| task_summary_for t }
end
```
Yields tasks from the queue that are ready for execution. For each task, it passes the task name (as a symbol), its `run_id`, and the run's parameters. Callers should then perform the work for those tasks. An exception raised by the block causes the task to be reinserted into the queue and retried later (possibly by another process), unless the task's retry limit has been exceeded. In either case, the exception is then re-raised to the caller.
This method downloads the task graph as necessary, so live updates of the graph are possible without restarting worker processes.
`opts` are:
- `blocking`: if true, `each` blocks until items are available and, with the default `block_timeout`, never returns. If false, `each` returns once the queue is empty.
- `block_timeout`: when `blocking` is true, the timeout (in seconds) before stopping. A value of nil or 0 blocks forever.
- `retries`: how many times an individual task is retried before it ends in an error state. Default is 2 (so 3 tries total).
- `worker_id`: the id of this worker process, for debugging. If nil, a default built from the hostname, process id, and program name is used.
```ruby
# File lib/redact.rb, line 177
def each opts={}
  worker_id = opts[:worker_id] || [Socket.gethostname, $$, $0].join("-")
  retries = opts[:retries] || 2
  blocking = opts[:blocking]
  block_timeout = opts[:block_timeout] || 0

  while true
    token = if blocking
      @redis.brpoplpush @queue, @processing_list, block_timeout
    else
      @redis.rpoplpush @queue, @processing_list
    end
    break unless token # no more tokens!

    ## decompose the token
    task, target, run_id, insertion_time = parse_token token

    ## record that we've seen this
    set_metadata! task, run_id, worker_id: worker_id, time_waiting: (Time.now - insertion_time).to_i

    ## load the target state. abort if we don't need to do anything
    target_state = get_state target, run_id
    if (target_state == :error) || (target_state == :done)
      #log "skipping #{task}##{run_id} because #{target}##{run_id} is in state #{target_state}"
      set_metadata! task, run_id, state: :skipped
      commit! token
      next
    end

    ## get any run params
    params = @redis.hget @params_key, run_id
    params = JSON.parse(params) if params

    ## ok, let's finally try to perform the task
    begin
      #log "performing #{task}##{run_id}"

      ## the task is now in progress
      set_metadata! task, run_id, state: :in_progress

      ## do it
      startt = Time.now
      yield task.to_sym, run_id, params
      elapsed = Time.now - startt

      ## update total running time
      total_time_processing = elapsed + (get_metadata(task, run_id)[:time_processing] || 0).to_f
      set_metadata! task, run_id, time_processing: total_time_processing
      set_metadata! task, run_id, state: :done
      enqueue_next_tasks! task, target, run_id
      commit! token
    rescue Exception => e
      num_tries = inc_num_tries! task, run_id
      if num_tries > retries # we fail
        set_metadata! target, run_id, state: :error
        set_metadata! task, run_id, state: :error, error: "(#{e.class}) #{e.message}", backtrace: e.backtrace
        commit! token
      else # we'll retry
        uncommit! token
      end
      raise
    end
  end
end
```
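The retry behavior of `each` follows a reliable-queue pattern. A token is moved atomically from the queue to a processing list. It is then either committed to the done list or moved back onto the queue. The class below is a hypothetical in-memory model of that flow. It uses Arrays in place of Redis lists and per-token counts in place of the `tries` metadata field. Unlike `each`, it rescues only `StandardError` and returns a status instead of re-raising, and it records no metadata. With the default of 2 retries, an always-failing task is attempted three times.

```ruby
# Hypothetical in-memory model of the reliable-queue flow in each
# (Arrays stand in for Redis lists; index 0 is the left end).
class MiniQueue
  attr_reader :queue, :processing, :done, :tries

  def initialize(tokens)
    @queue = tokens.dup
    @processing = []
    @done = []
    @tries = Hash.new(0)
  end

  # Processes one token with the given block and returns :done, :retry, or :error,
  # or nil when the queue is empty.
  def work_one(retries: 2)
    token = @queue.pop                 # rpoplpush: pop from the right of the queue...
    return nil if token.nil?
    @processing.unshift(token)         # ...and push onto the left of the processing list

    begin
      yield token
      commit(token)
      :done
    rescue StandardError
      @tries[token] += 1
      if @tries[token] > retries       # out of retries: record as finished, in error
        commit(token)
        :error
      else                             # uncommit!: rpush back where the next pop finds it
        @queue.push(token)
        remove_from_processing(token)
        :retry
      end
    end
  end

  private

  # commit!: lrem from the processing list, lpush onto the done list
  def commit(token)
    remove_from_processing(token)
    @done.unshift(token)
  end

  def remove_from_processing(token)
    @processing.delete_at(@processing.index(token))
  end
end

q = MiniQueue.new(["flaky"])
results = []
while (result = q.work_one { raise "boom" })
  results << result
end
p results # => [:retry, :retry, :error]

ok = MiniQueue.new(["fine"])
ok_result = ok.work_one { |token| token.upcase }
p ok_result # => :done
```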
Returns information about the tasks currently in the queue. The return value is an array of hashes. Each hash includes, among other things, these keys:
- `task`: the name of the task
- `run_id`: the run_id of the task
- `target`: the target of the task
- `ts`: the timestamp of queue insertion
```ruby
# File lib/redact.rb, line 134
def enqueued_tasks start_idx=0, end_idx=-1
  @redis.lrange(@queue, start_idx, end_idx).map { |t| task_summary_for t }
end
```
Returns information about the tasks currently being processed by worker processes. The return value is an array of hashes. Each hash includes the keys from `enqueued_tasks`, plus these keys:
- `worker_id`: the worker_id of the worker processing this task
- `time_waiting`: the approximate number of seconds this task was enqueued for
- `ts`: the timestamp at the start of processing
```ruby
# File lib/redact.rb, line 145
def in_progress_tasks start_idx=0, end_idx=-1
  @redis.lrange(@processing_list, start_idx, end_idx).map { |t| task_summary_for t }
end
```
```ruby
# File lib/redact.rb, line 162
def num_done_tasks; @redis.llen @done_list end
```
```ruby
# File lib/redact.rb, line 137
def num_enqueued_tasks; @redis.llen @queue end
```
```ruby
# File lib/redact.rb, line 148
def num_in_progress_tasks; @redis.llen @processing_list end
```
Returns the total number of outstanding tasks currently being processed.
```ruby
# File lib/redact.rb, line 123
def processing_list_size; @redis.llen @processing_list end
```
Publish the dependency graph. Must be called at least once before `do!`.
```ruby
# File lib/redact.rb, line 80
def publish_graph!
  @redis.set @dag_key, @dag.to_json
end
```
Drop all data and reset the planner.
```ruby
# File lib/redact.rb, line 39
def reset!
  keys = [@queue, @processing_list, @done_list, @dag_key, @params_key]
  # metadata_key_for joins with ":", so match "<prefix>:*" rather than "<prefix>/*"
  keys += @redis.keys("#{@metadata_prefix}:*")
  keys.each { |k| @redis.del k }
end
```
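`metadata_key_for` builds per-task metadata keys as `<prefix>:<task>:<run_id>`, so a key pattern meant to find them has to put a colon after the prefix. The sketch below checks two candidate patterns. It uses Ruby's `File.fnmatch` as an approximation of the glob matching done by the Redis `KEYS` command. The two behave alike for a simple trailing `*`, but they are not the same implementation.

```ruby
# File.fnmatch is used here only as an approximation of Redis KEYS glob matching.
metadata_prefix = "redact/metadata"
key = [metadata_prefix, "html", "run-1"].join(":") # how metadata_key_for builds a key

slash_pattern_matches = File.fnmatch("#{metadata_prefix}/*", key)
colon_pattern_matches = File.fnmatch("#{metadata_prefix}:*", key)
p key                   # => "redact/metadata:html:run-1"
p slash_pattern_matches # => false
p colon_pattern_matches # => true
```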
Returns the total number of outstanding tasks in the queue. This counts only tasks whose dependencies are satisfied (i.e., only those that are currently ready to be performed). Queue size may fluctuate in both directions as targets are built.
```ruby
# File lib/redact.rb, line 120
def size; @redis.llen @queue end
```
```ruby
# File lib/redact.rb, line 45
def visualize stream=$stdout
  sorted = @dag.tsort
  # leaves are the tasks with no dependencies
  leaves = sorted.select { |k| @dag[k].nil? || @dag[k].empty? }
  pos = {} # column where each leaf's label starts (not used further in this method)
  curpos = 0
  leaves.each do |l|
    string = " #{l} "
    pos[l] = curpos
    stream.print string
    curpos += string.length
  end
  stream.puts
end
```
Private Instance Methods
move from processing list to done list
```ruby
# File lib/redact.rb, line 260
def commit! token
  # queue both commands on the transaction object that multi yields
  @redis.multi do |tx|
    tx.lrem @processing_list, 1, token
    tx.lpush @done_list, token
  end
end
```
enqueue all tasks for `target` that are unblocked by virtue of having completed `task`
```ruby
# File lib/redact.rb, line 285
def enqueue_next_tasks! task, target, run_id
  ## gimme dag
  dag = load_dag

  ## find all tasks that we block
  blocked = dag.inject([]) { |a, (k, v)| a << k if v.member?(task.to_s); a }

  ## find all tasks in the path to target
  in_path_to_target = [] # sigh... ancient interfaces
  dag.each_strongly_connected_component_from(target) { |x| in_path_to_target << x.first }

  (blocked & in_path_to_target).each do |btask|
    deps = dag[btask]
    dep_states = deps.map { |t| get_state t, run_id }
    if dep_states.all? { |s| s == :done } # we've unblocked it!
      #log "unblocked task #{btask}##{run_id}"
      enqueue_task! btask, target, run_id
    end
  end
end
```
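You can try out the unblocking rule without Redis. In this sketch, `unblocked_tasks` is a hypothetical in-memory counterpart of `enqueue_next_tasks!`. A plain Hash of task states replaces the `get_state` lookups, and the function returns task names rather than enqueuing them. `TSortHash` is again an assumed minimal definition. A task is released only when three conditions hold:
- it depends on the task that just finished,
- it lies on the path to the target,
- all of its dependencies are in state `:done`.

```ruby
require "tsort"

# Assumed minimal TSortHash (its real definition is not shown in this documentation).
class TSortHash < Hash
  include TSort
  alias tsort_each_node each_key

  def tsort_each_child(node, &block)
    fetch(node, []).each(&block)
  end
end

# Hypothetical in-memory version of enqueue_next_tasks!: once `task` is done, return
# the tasks on the path to `target` whose dependencies are now all :done.
# `states` stands in for per-task metadata; missing tasks count as :unstarted.
def unblocked_tasks(dag, task, target, states)
  blocked = dag.select { |_, deps| deps.include?(task) }.keys
  in_path_to_target = []
  dag.each_strongly_connected_component_from(target) { |c| in_path_to_target << c.first }
  (blocked & in_path_to_target).select do |t|
    dag[t].all? { |dep| states.fetch(dep, :unstarted) == :done }
  end
end

dag = TSortHash.new
dag["site"] = ["html", "css"]
dag["html"] = ["markdown"]
dag["docs"] = ["markdown"]

states = { "markdown" => :done }
after_markdown = unblocked_tasks(dag, "markdown", "site", states) # "docs" is not on the path to "site"
states["html"] = :done
after_html = unblocked_tasks(dag, "html", "site", states)         # "css" is not done yet
states["css"] = :done
after_css = unblocked_tasks(dag, "css", "site", states)
p [after_markdown, after_html, after_css] # => [["html"], [], ["site"]]
```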
tasks are popped from the right, so at_the_end means lpush, otherwise rpush.
```ruby
# File lib/redact.rb, line 253
def enqueue_task! task, target, run_id, at_the_end=false
  set_metadata! task, run_id, state: :in_queue
  token = make_token task, target, run_id
  at_the_end ? @redis.lpush(@queue, token) : @redis.rpush(@queue, token)
end
```
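Workers pop from the right. As a result, `rpush` (the default) places a token where it will be taken next, while `lpush` (`at_the_end = true`, as used by `do!`) places it behind everything already queued. The lambdas below model the Redis list with a Ruby Array (index 0 is the left end). They are illustrative stand-ins, not Redact methods. The sketch shows that follow-up tasks unblocked during a run jump ahead of the initial tasks of later targets.

```ruby
# Illustrative model of the Redact queue as a Ruby Array (index 0 = left end of the list).
queue = []
lpush = ->(token) { queue.unshift(token) } # enqueue_task! with at_the_end = true (do!)
rpush = ->(token) { queue.push(token) }    # enqueue_task! with at_the_end = false
rpop  = -> { queue.pop }                   # workers take tokens from the right

lpush.("run-1:markdown") # initial task for an earlier target
lpush.("run-2:markdown") # initial task for a later target
rpush.("run-1:html")     # follow-up task unblocked during run-1

order = Array.new(3) { rpop.() }
p order # => ["run-1:html", "run-1:markdown", "run-2:markdown"]
```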
```ruby
# File lib/redact.rb, line 334
def get_metadata task, run_id
  key = metadata_key_for task, run_id
  md = @redis.hgetall(key).inject({}) { |h, (k, v)| h[k.to_sym] = v; h }
  md[:ts] = Time.at md[:ts].to_i
  md[:tries] = md[:tries].to_i
  md
end
```
```ruby
# File lib/redact.rb, line 317
def get_state task, run_id
  key = metadata_key_for task, run_id
  (@redis.hget(key, "state") || "unstarted").to_sym
end
```
```ruby
# File lib/redact.rb, line 322
def inc_num_tries! task, run_id
  key = metadata_key_for task, run_id
  @redis.hincrby key, "tries", 1
end
```
```ruby
# File lib/redact.rb, line 246
def load_dag
  dag = JSON.parse @redis.get(@dag_key)
  # copy the parsed Hash into a TSortHash so it can be traversed with TSort
  dag.inject(TSortHash.new) { |h, (k, v)| h[k] = v; h }
end
```
```ruby
# File lib/redact.rb, line 306
def make_token task, target, run_id; [task, target, run_id, Time.now.to_i].to_json end
```
```ruby
# File lib/redact.rb, line 315
def metadata_key_for task, run_id; [@metadata_prefix, task, run_id].join(":") end
```
```ruby
# File lib/redact.rb, line 310
def parse_token token
  task, target, run_id, ts = JSON.parse token
  [task, target, run_id, Time.at(ts)]
end
```
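A token is a JSON array of task, target, run id, and insertion time in whole seconds. The functions below reproduce `make_token` and `parse_token` as standalone methods. The extra `now` parameter is an addition for repeatable output and is not in the original. JSON has no symbol type, so task names come back as strings. This is why `each` converts them with `to_sym` before yielding.

```ruby
require "json"

# Standalone copies of make_token / parse_token. The `now` parameter is an
# addition for repeatable examples; the original always uses Time.now.
def make_token(task, target, run_id, now = Time.now)
  [task, target, run_id, now.to_i].to_json
end

def parse_token(token)
  task, target, run_id, ts = JSON.parse(token)
  [task, target, run_id, Time.at(ts)]
end

token = make_token(:html, "site", "run-1", Time.at(1_700_000_000))
p token   # => "[\"html\",\"site\",\"run-1\",1700000000]"

task, target, run_id, ts = parse_token(token)
p task    # => "html" (a String: the symbol did not survive the JSON round trip)
p ts.to_i # => 1700000000
```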
```ruby
# File lib/redact.rb, line 327
def set_metadata! task, run_id, metadata
  metadata.each { |k, v| metadata[k] = v.to_json unless v.is_a?(String) || v.is_a?(Symbol) }
  metadata[:ts] = Time.now.to_i
  key = metadata_key_for task, run_id
  @redis.mapped_hmset key, metadata
end
```
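Redis stores hash field values as strings. For this reason, `set_metadata!` JSON-encodes everything except Strings and Symbols, and readers must convert values back themselves. The sketch below uses a plain Ruby Hash as a stand-in for the Redis hash. `store_metadata` and `load_metadata` are hypothetical helpers. The first applies the same encoding rule as `set_metadata!`; the second applies the same symbol-key conversion as `get_metadata`.

```ruby
require "json"

# Hypothetical stand-in for writing to a Redis hash: every stored value is a String.
# Strings and Symbols are kept as-is; everything else is JSON-encoded first.
def store_metadata(redis_hash, metadata)
  metadata.each do |key, value|
    value = value.to_json unless value.is_a?(String) || value.is_a?(Symbol)
    redis_hash[key.to_s] = value.to_s
  end
  redis_hash
end

# Reads fields back the way get_metadata does: symbol keys, string values.
def load_metadata(redis_hash)
  redis_hash.each_with_object({}) { |(k, v), h| h[k.to_sym] = v }
end

stored = store_metadata({}, { state: :done, time_processing: 1.5, tries: 3,
                              error: "(RuntimeError) boom" })
md = load_metadata(stored)
p md[:state]           # => "done" (a String, which is why get_state calls to_sym)
p md[:time_processing] # => "1.5" (which is why each calls to_f)
p md[:tries].to_i      # => 3
```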
Builds a readable summary of a token for debugging: the task's metadata merged with the token fields and the run's params.
```ruby
# File lib/redact.rb, line 276
def task_summary_for token
  task, target, run_id, ts = parse_token token
  params = @redis.hget @params_key, run_id
  params = JSON.parse(params) if params
  get_metadata(task, run_id).merge task: task, run_id: run_id, target: target, insertion_time: ts, params: params
end
```
move from processing list back to queue
```ruby
# File lib/redact.rb, line 268
def uncommit! token
  @redis.multi do |tx| # rewind the rpoplpush
    tx.rpush @queue, token
    tx.lrem @processing_list, 1, token
  end
end
```