class Async::Bus::Protocol::Connection

Attributes

objects[R]
packer[R]
proxies[R]
transactions[R]
unpacker[R]

Public Class Methods

client(peer) click to toggle source
# File lib/async/bus/protocol/connection.rb, line 38
def self.client(peer)
        self.new(peer, 1)
end
new(peer, id) click to toggle source
# File lib/async/bus/protocol/connection.rb, line 46
def initialize(peer, id)
        @peer = peer
        
        @wrapper = Wrapper.new(self)
        @unpacker = @wrapper.unpacker(peer)
        @packer = @wrapper.packer(peer)
        
        @transactions = {}
        @id = id
        
        @objects = {}
        @proxies = ::ObjectSpace::WeakMap.new
        @finalized = Thread::Queue.new
end
server(peer) click to toggle source
# File lib/async/bus/protocol/connection.rb, line 42
def self.server(peer)
        self.new(peer, 2)
end

Public Instance Methods

[](name) click to toggle source
# File lib/async/bus/protocol/connection.rb, line 92
def [](name)
        unless proxy = @proxies[name]
                proxy = Proxy.new(self, name)
                @proxies[name] = proxy
                
                ObjectSpace.define_finalizer(proxy, finalize(name))
        end
        
        return proxy
end
bind(name, object) click to toggle source
# File lib/async/bus/protocol/connection.rb, line 84
def bind(name, object)
        @objects[name] = object
end
close() click to toggle source
# File lib/async/bus/protocol/connection.rb, line 157
def close
        @transactions.each do |id, transaction|
                transaction.close
        end
        
        @peer.close
end
invoke(name, arguments, options = {}, &block) click to toggle source
# File lib/async/bus/protocol/connection.rb, line 103
def invoke(name, arguments, options = {}, &block)
        
        id = self.next_id
        
        transaction = Transaction.new(self, id)
        @transactions[id] = transaction
        
        transaction.invoke(name, arguments, options, &block)
end
next_id() click to toggle source
# File lib/async/bus/protocol/connection.rb, line 67
def next_id
        id = @id
        @id += 2
        
        return id
end
proxy(object) click to toggle source
# File lib/async/bus/protocol/connection.rb, line 76
def proxy(object)
        name = next_id.to_s(16).freeze
        
        bind(name, object)
        
        return name
end
run() click to toggle source
# File lib/async/bus/protocol/connection.rb, line 113
def run
        finalizer_task = Async do
                while name = @finalized.pop
                        @packer.write([:release, name])
                end
        end
        
        @unpacker.each do |message|
                id = message.shift
                
                if id == :release
                        name = message.shift
                        @objects.delete(name) if name.is_a?(String)
                elsif transaction = @transactions[id]
                        transaction.received.enqueue(message)
                elsif message.first == :invoke
                        message.shift
                        
                        transaction = Transaction.new(self, id)
                        @transactions[id] = transaction
                        
                        name = message.shift
                        object = @objects[name]
                        
                        Async do
                                transaction.accept(object, *message)
                        ensure
                                transaction.close
                        end
                else
                        raise "Out of order message: #{message}"
                end
        end
ensure
        finalizer_task.stop
        
        @transactions.each do |id, transaction|
                transaction.close
        end
        
        @transactions.clear
        @proxies = ::ObjectSpace::WeakMap.new
end

Private Instance Methods

finalize(name) click to toggle source
# File lib/async/bus/protocol/connection.rb, line 88
        def finalize(name)
        proc{@finalized << name}
end