class SBM::Coordinator

Attributes

name[R]
redis[R]

Public Class Methods

defaults() click to toggle source
# File lib/sbm/coordinator.rb, line 9
def self.defaults
  worker_name      = (ENV['SBM_WORKER'] or raise "Please ensure SBM_WORKER is set")
  coordinator_name = (ENV['SBM_COORDINATOR'] || "worker-coordinator")
  return new(coordinator_name), Worker.new(worker_name)
end
new(name) click to toggle source
# File lib/sbm/coordinator.rb, line 17
def initialize(name)
  @name = name.to_s
  @redis = Redis.current
end

Public Instance Methods

batches() click to toggle source
# File lib/sbm/coordinator.rb, line 32
def batches
  redis.smembers(key(:batches)).map { |w| Batch.new(w) }
end
clear(batch) click to toggle source
# File lib/sbm/coordinator.rb, line 67
def clear(batch)
  redis.srem key(:batches), batch.to_s
  redis.del key(:batches, batch, :completed)
  redis.del key(:batches, batch, :started)
end
clear_batches() click to toggle source
# File lib/sbm/coordinator.rb, line 73
def clear_batches
  batches.each { |b| clear b }
  redis.del key(:batches)
end
clear_workers() click to toggle source
# File lib/sbm/coordinator.rb, line 78
def clear_workers
  redis.del key(:workers)
end
complete(batch, worker) click to toggle source
# File lib/sbm/coordinator.rb, line 54
def complete(batch, worker)
  prepare worker, batch
  redis.sadd key(:batches, batch, :completed), worker.to_s
end
completed_workers_for_batch(batch) click to toggle source
# File lib/sbm/coordinator.rb, line 44
def completed_workers_for_batch(batch)
  redis.smembers(key(:batches, batch, :completed)).map { |w| Worker.new(w) }
end
start(batch, worker) click to toggle source
# File lib/sbm/coordinator.rb, line 48
def start(batch, worker)
  prepare worker, batch
  redis.sadd key(:batches, batch, :started),   worker.to_s
  redis.srem key(:batches, batch, :completed), worker.to_s
end
started_workers_for_batch(batch) click to toggle source
# File lib/sbm/coordinator.rb, line 40
def started_workers_for_batch(batch)
  redis.smembers(key(:batches, batch, :started)).map { |w| Worker.new(w) }
end
wait_for(batch, worker_count, wait_time = 15) { || ... } click to toggle source

Waits on batch to reach a count, waiting for 15 seconds at a time.

# File lib/sbm/coordinator.rb, line 60
def wait_for(batch, worker_count, wait_time = 15)
  while redis.scard(key(:batches, batch, :completed)) < worker_count
    sleep wait_time
    yield if block_given?
  end
end
workers() click to toggle source
# File lib/sbm/coordinator.rb, line 36
def workers
  redis.smembers(key(:workers)).map { |w| Worker.new(w) }
end

Private Instance Methods

key(*args) click to toggle source
# File lib/sbm/coordinator.rb, line 97
def key(*args)
  [name, *args].join(":")
end
prepare(worker, batch) click to toggle source
# File lib/sbm/coordinator.rb, line 84
def prepare(worker, batch)
  register_worker worker
  register_batch  batch
end
register_batch(batch) click to toggle source
# File lib/sbm/coordinator.rb, line 93
def register_batch(batch)
  redis.sadd key(:batches), batch.to_s
end
register_worker(worker) click to toggle source
# File lib/sbm/coordinator.rb, line 89
def register_worker(worker)
  redis.sadd key(:workers), worker.to_s
end