class Async::Bus::Protocol::Connection
Attributes
objects[R]
packer[R]
proxies[R]
transactions[R]
unpacker[R]
Public Class Methods
client(peer)
click to toggle source
# File lib/async/bus/protocol/connection.rb, line 38
# Build a connection around +peer+, passing 1 as the starting id.
# Since #next_id advances in steps of 2, ids issued by this side stay odd.
#
# @param peer the transport object forwarded to the constructor
# @return a new instance of this class
def self.client(peer)
  new(peer, 1)
end
new(peer, id)
click to toggle source
# File lib/async/bus/protocol/connection.rb, line 46
# Set up a connection over +peer+.
#
# @param peer the transport; handed to the wrapper's unpacker/packer factories and closed by #close
# @param id starting value of the counter that #next_id advances
def initialize(peer, id)
  @peer = peer

  # Reading and writing both go through a Wrapper bound to this connection.
  @wrapper = Wrapper.new(self)
  @unpacker = @wrapper.unpacker(peer)
  @packer = @wrapper.packer(peer)

  # Identifier counter, and the transactions keyed by those identifiers.
  @id = id
  @transactions = {}

  # Objects registered via #bind, proxies handed out by #[], and the queue
  # of proxy names that finalizers push to once a proxy has been collected.
  @objects = {}
  @proxies = ::ObjectSpace::WeakMap.new
  @finalized = Thread::Queue.new
end
server(peer)
click to toggle source
# File lib/async/bus/protocol/connection.rb, line 42
# Build a connection around +peer+, passing 2 as the starting id.
# Since #next_id advances in steps of 2, ids issued by this side stay even.
#
# @param peer the transport object forwarded to the constructor
# @return a new instance of this class
def self.server(peer)
  new(peer, 2)
end
Public Instance Methods
[](name)
click to toggle source
# File lib/async/bus/protocol/connection.rb, line 92
# Fetch the proxy registered under +name+, creating and caching one on a miss.
#
# A newly created proxy gets a finalizer (built by #finalize) that queues
# +name+ on @finalized once the proxy has been garbage collected.
#
# @param name key used for the @proxies lookup and passed to Proxy.new
# @return the cached or newly created proxy
def [](name)
  cached = @proxies[name]
  return cached if cached

  Proxy.new(self, name).tap do |proxy|
    @proxies[name] = proxy
    ObjectSpace.define_finalizer(proxy, finalize(name))
  end
end
bind(name, object)
click to toggle source
# File lib/async/bus/protocol/connection.rb, line 84
# Register +object+ under +name+ in @objects, replacing any previous entry.
#
# @param name key to register the object under
# @param object the object to register
# @return the object that was stored
def bind(name, object)
  @objects.store(name, object)
end
close()
click to toggle source
# File lib/async/bus/protocol/connection.rb, line 157
# Close every transaction currently held in @transactions, then close the peer.
# The @transactions hash itself is not emptied by this method.
#
# @return whatever @peer.close returns
def close
  @transactions.each_value(&:close)
  @peer.close
end
invoke(name, arguments, options = {}, &block)
click to toggle source
# File lib/async/bus/protocol/connection.rb, line 103
# Start a new transaction and use it to invoke +name+.
#
# A fresh id is taken from #next_id, a Transaction is created for it and
# recorded in @transactions, and the call is delegated to Transaction#invoke.
#
# @param name forwarded to Transaction#invoke
# @param arguments forwarded to Transaction#invoke
# @param options forwarded to Transaction#invoke (defaults to an empty Hash)
# @param block forwarded to Transaction#invoke
# @return whatever Transaction#invoke returns
def invoke(name, arguments, options = {}, &block)
  transaction_id = next_id
  transaction = Transaction.new(self, transaction_id)
  @transactions[transaction_id] = transaction

  transaction.invoke(name, arguments, options, &block)
end
next_id()
click to toggle source
# File lib/async/bus/protocol/connection.rb, line 67
# Hand out the current identifier and advance the counter by 2.
#
# @return the value @id held before the increment
def next_id
  current = @id
  @id = current + 2
  current
end
proxy(object)
click to toggle source
# File lib/async/bus/protocol/connection.rb, line 76
# Register +object+ under a freshly generated name and return that name.
# The name is the next id rendered in base 16, as a frozen String.
#
# @param object the object to register via #bind
# @return [String] the frozen name the object was bound to
def proxy(object)
  next_id.to_s(16).freeze.tap { |name| bind(name, object) }
end
run()
click to toggle source
# File lib/async/bus/protocol/connection.rb, line 113
# Process incoming messages until the unpacker is exhausted or an error occurs.
#
# A background task drains @finalized and writes a [:release, name] message
# for every name popped from it. Each incoming message is then dispatched on
# its first element:
#
# - :release        -> the next element is removed from @objects when it is a String;
# - a known id      -> the remainder is enqueued on that transaction's `received`;
# - otherwise, when the next element is :invoke, a new Transaction is created
#   for the id and asked (in its own task) to accept the call on the object
#   looked up in @objects; it is closed when that task finishes;
# - anything else raises.
#
# On exit (normal or not) the background task is stopped, every transaction
# is closed and forgotten, and @proxies is replaced by an empty WeakMap.
#
# @raise [RuntimeError] for a message whose id is unknown and which is not an :invoke
def run
  finalizer_task = Async do
    while name = @finalized.pop
      @packer.write([:release, name])
    end
  end

  @unpacker.each do |message|
    id = message.shift

    if id == :release
      name = message.shift
      # Non-String names are ignored.
      @objects.delete(name) if name.is_a?(String)
    elsif transaction = @transactions[id]
      # Continuation of a transaction that is already registered.
      transaction.received.enqueue(message)
    elsif message.first == :invoke
      message.shift

      transaction = Transaction.new(self, id)
      @transactions[id] = transaction

      name = message.shift
      object = @objects[name]

      Async do
        transaction.accept(object, *message)
      ensure
        transaction.close
      end
    else
      raise "Out of order message: #{message}"
    end
  end
ensure
  # finalizer_task is still nil if the Async call above raised before the
  # assignment completed; calling #stop on nil would mask the real error.
  finalizer_task&.stop

  @transactions.each_value(&:close)
  @transactions.clear
  @proxies = ::ObjectSpace::WeakMap.new
end
Private Instance Methods
finalize(name)
click to toggle source
# File lib/async/bus/protocol/connection.rb, line 88
# Build the finalizer callback for the proxy registered under +name+.
#
# The proc closes over this connection and +name+ only; when called it
# appends +name+ to @finalized.
#
# @param name the proxy name to queue when the callback runs
# @return [Proc] callback suitable for ObjectSpace.define_finalizer
def finalize(name)
  proc do |_object_id|
    @finalized << name
  end
end