class Sumac::Scheduler

Public Class Methods

new(connection, worker_count) click to toggle source
# File lib/sumac/scheduler.rb, line 4
def initialize(connection, worker_count)
  raise "argument 'connection' must be a Connection" unless connection.is_a?(Connection)
  @connection = connection
  @worker_pool = WorkerPool.new(worker_count)
  @dispatch_worker = nil
end

Public Instance Methods

dispatch(message_string) click to toggle source
# File lib/sumac/scheduler.rb, line 39
def dispatch(message_string)
  #@worker_pool.run do
    @connection.mutex.synchronize do
      break if @connection.at?([:kill, :close])
      @connection.messenger.receive(message_string)
    end
  #end
  nil
end
join() click to toggle source
# File lib/sumac/scheduler.rb, line 49
def join
  @dispatch_worker.join
  @worker_pool.join
  nil
end
receiver_loop() click to toggle source
# File lib/sumac/scheduler.rb, line 22
def receiver_loop
  loop do
    begin
      message_string = @connection.messenger_adapter.receive
    rescue Adapter::ClosedError
      @connection.mutex.synchronize do
        unless @connection.at?([:kill, :close])
          @connection.to(:kill)
        end
      end
    end
    break if @connection.mutex.synchronize { @connection.at?([:kill, :close]) }
    dispatch(message_string)
  end
  nil
end
run() click to toggle source
# File lib/sumac/scheduler.rb, line 11
def run
  @dispatch_worker = Thread.new do
    @connection.mutex.synchronize do
      raise unless @connection.at?(:initial)
      @connection.to(:compatibility_handshake)
    end
    receiver_loop
  end
  nil
end