class Sumac::CallDispatcher

Public Class Methods

new(connection) click to toggle source
# File lib/sumac/call_dispatcher.rb, line 4
# Sets up a dispatcher bound to +connection+, starting with an empty
# pending-request table and a fresh IDAllocator.
#
# Raises a RuntimeError when +connection+ is not a Connection.
def initialize(connection)
  unless connection.is_a?(Connection)
    raise "argument 'connection' must be a Connection"
  end

  @connection = connection
  @pending_requests = {}
  @id_allocator = IDAllocator.new
end

Public Instance Methods

any_calls_pending?() click to toggle source
# File lib/sumac/call_dispatcher.rb, line 11
# Reports whether the pending-request table currently holds at least one entry.
def any_calls_pending?
  !@pending_requests.empty?
end
kill_all() click to toggle source
# File lib/sumac/call_dispatcher.rb, line 15
# Aborts every pending call: each waiter is removed from the pending-request
# table and then resumed with +nil+.
#
# Raises a RuntimeError (bare +raise+) unless the connection is at +:kill+.
# Returns the (now empty) pending-request table.
def kill_all
  raise unless @connection.at?(:kill)

  # Drain with shift instead of deleting inside Hash#each: the table is never
  # mutated mid-iteration, so an entry registered while a waiter is being
  # resumed is drained too rather than triggering
  # "can't add a new key into hash during iteration".
  until @pending_requests.empty?
    _id, waiter = @pending_requests.shift
    waiter.resume(nil)
  end
  @pending_requests
end
make_call(remote_object, method_name, arguments) click to toggle source
# File lib/sumac/call_dispatcher.rb, line 23
# Sends a call request for +method_name+ on +remote_object+ and blocks until
# a response arrives, returning the response's return value.
#
# remote_object - a RemoteObject or RemoteObjectChild
# method_name   - assigned to the request's +method_name+
# arguments     - assigned to the request's +arguments+
#
# Raises:
#   RuntimeError  - (bare +raise+) when +remote_object+ is neither a
#                   RemoteObject nor a RemoteObjectChild
#   ClosedError   - when the connection is not +:active+ on entry, is at
#                   +:kill+ or +:close+ right after sending, or the waiter is
#                   resumed with +nil+
#   StandardError - anything raised while populating the request is re-raised
#                   after both reference transactions are rolled back
#   the response's own exception, when it carries one
def make_call(remote_object, method_name, arguments)
  raise unless remote_object.is_a?(RemoteObject) || remote_object.is_a?(RemoteObjectChild)
  raise ClosedError unless @connection.at?(:active)

  id = @id_allocator.allocate
  request = Message::Exchange::CallRequest.new(@connection)
  request.id = id

  # Populate the request inside a pair of reference transactions so a failed
  # assignment leaves neither reference table modified.
  @connection.local_references.start_transaction
  @connection.remote_references.start_transaction
  begin
    request.exposed_object = remote_object
    request.method_name = method_name
    request.arguments = arguments
  rescue StandardError # MessageError, StaleObjectError
    @connection.local_references.rollback_transaction
    @connection.remote_references.rollback_transaction
    raise
  else
    @connection.local_references.commit_transaction
    @connection.remote_references.commit_transaction
  end

  @connection.messenger.send(request)
  raise ClosedError if @connection.at?([:kill, :close])

  waiter = QuackConcurrency::Waiter.new
  @pending_requests[id] = waiter

  # The connection mutex is released for the duration of the wait and taken
  # again afterwards.
  # NOTE(review): presumably the caller holds this mutex and the thread that
  # delivers the response needs it — confirm against the callers.
  @connection.mutex.unlock
  response = waiter.wait
  @connection.mutex.lock

  @id_allocator.free(id)
  @connection.closer.job_finished

  # A nil response means the waiter was resumed without an exchange.
  raise ClosedError if response.nil?
  raise response.exception if response.exception

  response.return_value
ensure
  # Covers the paths that exit before the explicit free above.
  @id_allocator.free(id) if id && @id_allocator.allocated?(id)
end
receive(exchange) click to toggle source
# File lib/sumac/call_dispatcher.rb, line 59
# Delivers an incoming call response to the waiter registered under its id.
#
# The waiter is removed from the pending-request table and resumed with
# +exchange+. Always returns +nil+.
#
# Raises MessageError when the connection is not at +:active+,
# +:initiate_shutdown+ or +:shutdown+, when +exchange+ is not a
# Message::Exchange::CallResponse, or when no waiter is registered for
# +exchange.id+.
def receive(exchange)
  accepting = @connection.at?([:active, :initiate_shutdown, :shutdown])
  raise MessageError unless accepting
  raise MessageError unless exchange.is_a?(Message::Exchange::CallResponse)

  # Hash#delete hands back the removed waiter, or nil for an unknown id.
  waiter = @pending_requests.delete(exchange.id)
  raise MessageError unless waiter

  waiter.resume(exchange)
  nil
end