class SBM::Coordinator
Attributes
name[R]
redis[R]
Public Class Methods
defaults()
click to toggle source
# File lib/sbm/coordinator.rb, line 9
# Builds a coordinator and a worker from environment variables.
#
# SBM_WORKER is required and supplies the worker name. SBM_COORDINATOR is
# optional and falls back to "worker-coordinator" when unset.
#
# @return [Array] two elements: the new coordinator, then the Worker
# @raise [RuntimeError] when SBM_WORKER is not set
def self.defaults
  # Look up the worker first so a missing SBM_WORKER fails before anything else.
  worker_name = ENV.fetch('SBM_WORKER') { raise "Please ensure SBM_WORKER is set" }
  coordinator_name = ENV.fetch('SBM_COORDINATOR', "worker-coordinator")
  [new(coordinator_name), Worker.new(worker_name)]
end
new(name)
click to toggle source
# File lib/sbm/coordinator.rb, line 17
# Creates a coordinator identified by +name+.
#
# @param name [#to_s] converted with to_s and stored in @name
def initialize(name)
  # The connection comes from Redis.current rather than from an argument.
  @redis = Redis.current
  @name = name.to_s
end
Public Instance Methods
batches()
click to toggle source
# File lib/sbm/coordinator.rb, line 32
# Lists the batches recorded in this coordinator's :batches set.
#
# @return [Array<Batch>] one Batch built from each member of the set
def batches
  batch_names = redis.smembers(key(:batches))
  batch_names.map { |batch_name| Batch.new(batch_name) }
end
clear(batch)
click to toggle source
# File lib/sbm/coordinator.rb, line 67
# Forgets a single batch: removes it from the :batches set, then deletes
# its :completed and :started keys.
#
# @param batch [#to_s] the batch to clear
def clear(batch)
  batches_key = key(:batches)
  redis.srem(batches_key, batch.to_s)

  completed_key = key(:batches, batch, :completed)
  redis.del(completed_key)

  started_key = key(:batches, batch, :started)
  redis.del(started_key)
end
clear_batches()
click to toggle source
# File lib/sbm/coordinator.rb, line 73
# Clears every batch returned by #batches, then deletes the :batches key itself.
def clear_batches
  batches.each do |batch|
    clear(batch)
  end
  redis.del(key(:batches))
end
clear_workers()
click to toggle source
# File lib/sbm/coordinator.rb, line 78
# Deletes this coordinator's :workers key.
def clear_workers
  workers_key = key(:workers)
  redis.del(workers_key)
end
complete(batch, worker)
click to toggle source
# File lib/sbm/coordinator.rb, line 54
# Records that +worker+ has completed +batch+.
#
# Registers the worker and the batch first, then adds the worker to the
# batch's :completed set.
#
# @param batch [#to_s] the batch that was completed
# @param worker [#to_s] the worker that completed it
def complete(batch, worker)
  prepare(worker, batch)
  completed_key = key(:batches, batch, :completed)
  redis.sadd(completed_key, worker.to_s)
end
completed_workers_for_batch(batch)
click to toggle source
# File lib/sbm/coordinator.rb, line 44
# Lists the workers in the :completed set of +batch+.
#
# @param batch [#to_s] the batch to inspect
# @return [Array<Worker>] one Worker built from each member of the set
def completed_workers_for_batch(batch)
  worker_names = redis.smembers(key(:batches, batch, :completed))
  worker_names.map { |worker_name| Worker.new(worker_name) }
end
start(batch, worker)
click to toggle source
# File lib/sbm/coordinator.rb, line 48
# Records that +worker+ has started +batch+.
#
# Registers the worker and the batch, adds the worker to the batch's
# :started set, and removes it from the batch's :completed set.
#
# @param batch [#to_s] the batch being started
# @param worker [#to_s] the worker starting it
def start(batch, worker)
  prepare(worker, batch)
  worker_id = worker.to_s

  started_key = key(:batches, batch, :started)
  redis.sadd(started_key, worker_id)

  completed_key = key(:batches, batch, :completed)
  redis.srem(completed_key, worker_id)
end
started_workers_for_batch(batch)
click to toggle source
# File lib/sbm/coordinator.rb, line 40
# Lists the workers in the :started set of +batch+.
#
# @param batch [#to_s] the batch to inspect
# @return [Array<Worker>] one Worker built from each member of the set
def started_workers_for_batch(batch)
  worker_names = redis.smembers(key(:batches, batch, :started))
  worker_names.map { |worker_name| Worker.new(worker_name) }
end
wait_for(batch, worker_count, wait_time = 15) { || ... }
click to toggle source
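Blocks until at least worker_count workers have completed the batch, sleeping wait_time seconds (15 by default) between checks and yielding to the optional block after each sleep.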
# File lib/sbm/coordinator.rb, line 60
# Blocks until the :completed set of +batch+ holds at least +worker_count+
# members. The set size is re-read before every iteration; this method
# applies no timeout of its own.
#
# @param batch [#to_s] the batch to wait on
# @param worker_count [Integer] number of completed workers to wait for
# @param wait_time [Numeric] seconds to sleep between checks
# @yield called after each sleep, when a block is given
def wait_for(batch, worker_count, wait_time = 15)
  until redis.scard(key(:batches, batch, :completed)) >= worker_count
    sleep(wait_time)
    yield if block_given?
  end
end
workers()
click to toggle source
# File lib/sbm/coordinator.rb, line 36
# Lists the workers recorded in this coordinator's :workers set.
#
# @return [Array<Worker>] one Worker built from each member of the set
def workers
  worker_names = redis.smembers(key(:workers))
  worker_names.map { |worker_name| Worker.new(worker_name) }
end
Private Instance Methods
key(*args)
click to toggle source
# File lib/sbm/coordinator.rb, line 97
# Builds a key by joining the coordinator name and +args+ with ":".
#
# @param args [Array] segments appended after the coordinator name
# @return [String] the joined key
def key(*args)
  segments = [name].concat(args)
  segments.join(":")
end
prepare(worker, batch)
click to toggle source
# File lib/sbm/coordinator.rb, line 84
# Registers +worker+ and then +batch+ with this coordinator.
#
# @param worker [#to_s] the worker to register
# @param batch [#to_s] the batch to register
def prepare(worker, batch)
  register_worker(worker)
  register_batch(batch)
end
register_batch(batch)
click to toggle source
# File lib/sbm/coordinator.rb, line 93
# Adds +batch+ to this coordinator's :batches set.
#
# @param batch [#to_s] the batch to record
def register_batch(batch)
  batches_key = key(:batches)
  redis.sadd(batches_key, batch.to_s)
end
register_worker(worker)
click to toggle source
# File lib/sbm/coordinator.rb, line 89
# Adds +worker+ to this coordinator's :workers set.
#
# @param worker [#to_s] the worker to record
def register_worker(worker)
  workers_key = key(:workers)
  redis.sadd(workers_key, worker.to_s)
end