class Async::Bus::Protocol::Transaction
Attributes
id[R]
received[R]
Public Class Methods
new(connection, id)
click to toggle source
# File lib/async/bus/protocol/transaction.rb, line 29 def initialize(connection, id) @connection = connection @id = id @received = Async::Queue.new @accept = nil end
Public Instance Methods
accept(object, arguments, options, block)
click to toggle source
Accept a remote procedure invokation.
# File lib/async/bus/protocol/transaction.rb, line 91 def accept(object, arguments, options, block) if block result = object.public_send(*arguments, **options) do |*yield_arguments| self.write(:yield, yield_arguments) what, result = self.read case what when :next result when :close return when :error raise(result) end end else result = object.public_send(*arguments, **options) end self.write(:return, result) rescue UncaughtThrowError => error self.write(:throw, error.tag) rescue => error self.write(:error, error) # ensure # self.write(:close) end
close()
click to toggle source
# File lib/async/bus/protocol/transaction.rb, line 53 def close if @connection connection = @connection @connection = nil connection.transactions.delete(@id) end end
invoke(name, arguments, options) { |*result| ... }
click to toggle source
Invoke a remote procedure.
# File lib/async/bus/protocol/transaction.rb, line 63 def invoke(name, arguments, options, &block) Console.logger.debug(self) {[name, arguments, options, block]} self.write(:invoke, name, arguments, options, block_given?) while response = self.read what, result = response case what when :error raise(result) when :return return(result) when :yield begin result = yield(*result) self.write(:next, result) rescue => error self.write(:error, error) end end end # ensure # self.write(:close) end
read()
click to toggle source
# File lib/async/bus/protocol/transaction.rb, line 40 def read if @received.empty? @connection.packer.flush end @received.dequeue end
write(*arguments)
click to toggle source
# File lib/async/bus/protocol/transaction.rb, line 48 def write(*arguments) @connection.packer.write([id, *arguments]) @connection.packer.flush end