class Async::Bus::Protocol::Transaction

Attributes

id[R]
received[R]

Public Class Methods

new(connection, id) click to toggle source
# File lib/async/bus/protocol/transaction.rb, line 29
def initialize(connection, id)
        @connection = connection
        @id = id
        
        @received = Async::Queue.new
        @accept = nil
end

Public Instance Methods

accept(object, arguments, options, block) click to toggle source

Accept a remote procedure invokation.

# File lib/async/bus/protocol/transaction.rb, line 91
def accept(object, arguments, options, block)
        if block
                result = object.public_send(*arguments, **options) do |*yield_arguments|
                        self.write(:yield, yield_arguments)
                        what, result = self.read
                        
                        case what
                        when :next
                                result
                        when :close
                                return
                        when :error
                                raise(result)
                        end
                end
        else
                result = object.public_send(*arguments, **options)
        end
        
        self.write(:return, result)
rescue UncaughtThrowError => error
        self.write(:throw, error.tag)
rescue => error
        self.write(:error, error)
# ensure
#   self.write(:close)
end
close() click to toggle source
# File lib/async/bus/protocol/transaction.rb, line 53
def close
        if @connection
                connection = @connection
                @connection = nil
                
                connection.transactions.delete(@id)
        end
end
invoke(name, arguments, options) { |*result| ... } click to toggle source

Invoke a remote procedure.

# File lib/async/bus/protocol/transaction.rb, line 63
def invoke(name, arguments, options, &block)
        Console.logger.debug(self) {[name, arguments, options, block]}
        
        self.write(:invoke, name, arguments, options, block_given?)
        
        while response = self.read
                what, result = response
                
                case what
                when :error
                        raise(result)
                when :return
                        return(result)
                when :yield
                        begin
                                result = yield(*result)
                                self.write(:next, result)
                        rescue => error
                                self.write(:error, error)
                        end
                end
        end
        
# ensure
#   self.write(:close)
end
read() click to toggle source
# File lib/async/bus/protocol/transaction.rb, line 40
def read
        if @received.empty?
                @connection.packer.flush
        end
        
        @received.dequeue
end
write(*arguments) click to toggle source
# File lib/async/bus/protocol/transaction.rb, line 48
def write(*arguments)
        @connection.packer.write([id, *arguments])
        @connection.packer.flush
end