class Grumlin::Transport

Attributes

url[R]

Public Class Methods

new(url, parent: Async::Task.current, **client_options) click to toggle source

A transport is not reusable: once closed, it cannot be reconnected and a new instance must be created.

# File lib/grumlin/transport.rb, line 13
# Stores the connection settings and creates the request/response channels.
# No connection is opened here.
#
# @param url endpoint address; stored as-is in @url
# @param parent object kept in @parent; defaults to the current Async task
# @param client_options extra keyword options, stored for later use
def initialize(url, parent: Async::Task.current, **client_options)
  @url = url
  @parent = parent
  @client_options = client_options
  @request_channel = Async::Channel.new
  @response_channel = Async::Channel.new

  # Declare the lifecycle state up front instead of relying on the implicit
  # nil of never-assigned instance variables.
  @closed = false
  @connection = nil
  @request_task = nil
  @response_task = nil
end

Public Instance Methods

close() click to toggle source
# File lib/grumlin/transport.rb, line 45
# Shuts the transport down: closes both channels, closes the connection
# (best effort) and stops the reader and writer tasks. Only the first call
# does any work; later calls return immediately.
#
# @return [void]
def close
  return if @closed

  @closed = true

  @request_channel.close
  @response_channel.close

  begin
    # Safe navigation: when there is no connection (never connected) there is
    # nothing to close, so no NoMethodError is raised just to be swallowed.
    @connection&.close
  rescue StandardError
    # Best effort: a failure while closing the connection must not prevent
    # the remaining cleanup below.
    nil
  end
  @connection = nil

  # NOTE(review): `true` is presumably the `later` flag of Async::Task#stop,
  # deferring the stop when close runs inside one of these tasks - confirm
  # against the async gem.
  @request_task&.stop(true)
  @response_task&.stop(true)
end
connect() click to toggle source
# File lib/grumlin/transport.rb, line 25
# Opens the WebSocket connection and spawns the reader and writer tasks on
# the parent.
#
# @return the response channel
# @raise [RuntimeError] with message "ClientClosed" when the transport was closed
# @raise [AlreadyConnectedError] when a connection is already open
def connect
  if @closed
    raise "ClientClosed"
  elsif connected?
    raise AlreadyConnectedError
  end

  endpoint = Async::HTTP::Endpoint.parse(@url)
  @connection = Async::WebSocket::Client.connect(endpoint, **@client_options)
  logger.debug(self) { "Connected to #{@url}." }

  # The reader is spawned before the writer, as in the original ordering.
  @response_task = @parent.async { run_response_task }
  @request_task = @parent.async { run_request_task }

  @response_channel
end
connected?() click to toggle source
# File lib/grumlin/transport.rb, line 21
# Reports whether a connection object is currently held.
#
# @return [Boolean] false when @connection is nil, true otherwise
def connected?
  return false if @connection.nil?

  true
end
wait() click to toggle source
# File lib/grumlin/transport.rb, line 64
# Waits for the writer task and then the reader task.
#
# When no tasks have been assigned yet there is nothing to wait for, so the
# call returns nil instead of failing with NoMethodError on nil.
#
# @return the result of waiting on the reader task, or nil when there is none
def wait
  @request_task&.wait
  @response_task&.wait
end
write(message) click to toggle source
# File lib/grumlin/transport.rb, line 39
# Queues a message on the request channel.
#
# @param message the payload to enqueue
# @return the result of pushing onto the request channel
# @raise [NotConnectedError] when no connection is open
def write(message)
  if connected?
    @request_channel << message
  else
    raise NotConnectedError
  end
end

Private Instance Methods

run_request_task() click to toggle source
# File lib/grumlin/transport.rb, line 80
# Writer loop: takes each message from the request channel, writes it to the
# connection and flushes. Runs under with_guard.
def run_request_task
  # A proc (not a lambda) keeps ordinary block argument semantics when it is
  # handed to #each.
  send_and_flush = proc do |outgoing|
    @connection.write(outgoing)
    @connection.flush
  end

  with_guard { @request_channel.each(&send_and_flush) }
end
run_response_task() click to toggle source
# File lib/grumlin/transport.rb, line 71
# Reader loop: repeatedly reads from the connection and pushes whatever was
# read onto the response channel. Runs under with_guard; the loop itself has
# no exit condition of its own.
def run_response_task
  with_guard do
    loop { @response_channel << @connection.read }
  end
end
with_guard() { || ... } click to toggle source
# File lib/grumlin/transport.rb, line 89
# Runs the given block. If it raises Async::Stop, Async::TimeoutError or any
# StandardError, the error is logged, passed to the response channel and the
# transport is closed.
#
# @yield the work to protect
# @return the block's value, or the result of #close after a failure
def with_guard
  yield
rescue Async::Stop, Async::TimeoutError, StandardError => failure
  logger.debug(self) { "Guard error, closing." }
  forward_to_response_channel(failure)
  close
end

# Passes the failure to the response channel, ignoring a channel that is
# already closed.
def forward_to_response_channel(failure)
  @response_channel.exception(failure)
rescue Async::Channel::ChannelClosedError
  nil
end