class Grumlin::Transport
Attributes
url[R]
Public Class Methods
new(url, parent: Async::Task.current, **client_options)
click to toggle source
Transport
is not reusable. Once closed, it should be recreated.
# File lib/grumlin/transport.rb, line 13
# Prepares a transport for +url+. No connection is opened here; that happens in #connect.
#
# @param url [String] endpoint address, kept for #connect and exposed via the +url+ reader
# @param parent [Async::Task] task on which #connect later calls +async+ (defaults to the current task)
# @param client_options [Hash] extra keyword options, forwarded untouched to the websocket client in #connect
def initialize(url, parent: Async::Task.current, **client_options)
  # One channel per direction: #write pushes into the request channel,
  # #connect hands the response channel back to the caller.
  @request_channel = Async::Channel.new
  @response_channel = Async::Channel.new

  @url = url
  @parent = parent
  @client_options = client_options
end
Public Instance Methods
close()
click to toggle source
# File lib/grumlin/transport.rb, line 45
# Shuts the transport down. Calling it again after the first time is a no-op.
#
# Order of operations: mark as closed, close both channels, close the connection
# (ignoring any StandardError it raises), drop the connection reference, then call
# +stop(true)+ on the request and response tasks when they exist.
#
# @return [Object, nil] nil when already closed or when there is no response task;
#   otherwise whatever the response task's +stop+ returns
def close
  return if @closed

  @closed = true

  [@request_channel, @response_channel].each(&:close)

  begin
    @connection.close
  rescue StandardError
    # Best effort only: a failing (or missing) connection must not prevent the rest of the teardown.
    nil
  end
  @connection = nil

  @request_task&.stop(true)
  @response_task&.stop(true)
end
connect()
click to toggle source
# File lib/grumlin/transport.rb, line 25
# Opens the websocket connection and starts the request and response tasks on the parent task.
#
# @return [Async::Channel] the response channel
# @raise [RuntimeError] with message "ClientClosed" when the transport was already closed
# @raise [AlreadyConnectedError] when a connection is already present
def connect
  raise "ClientClosed" if @closed
  raise AlreadyConnectedError if connected?

  endpoint = Async::HTTP::Endpoint.parse(@url)
  @connection = Async::WebSocket::Client.connect(endpoint, **@client_options)
  logger.debug(self) { "Connected to #{@url}." }

  # The response task is spawned before the request task.
  @response_task = @parent.async { run_response_task }
  @request_task = @parent.async { run_request_task }

  @response_channel
end
connected?()
click to toggle source
# File lib/grumlin/transport.rb, line 21
# Tells whether a connection object is currently held.
#
# @return [Boolean] false only when the stored connection is nil
def connected?
  return false if @connection.nil?

  true
end
wait()
click to toggle source
# File lib/grumlin/transport.rb, line 64
# Calls +wait+ on the request task and then on the response task.
#
# Either task may be absent (they are only assigned by #connect), in which case it is
# skipped instead of raising NoMethodError; this mirrors how #close treats the tasks.
#
# @return [Object, nil] the result of the response task's +wait+, or nil when there is no response task
def wait
  @request_task&.wait
  @response_task&.wait
end
write(message)
click to toggle source
# File lib/grumlin/transport.rb, line 39
# Queues +message+ on the request channel.
#
# @param message [Object] payload pushed onto the request channel
# @return [Object] whatever the channel's +<<+ returns
# @raise [NotConnectedError] when no connection is present
def write(message)
  return @request_channel << message if connected?

  raise NotConnectedError
end
Private Instance Methods
run_request_task()
click to toggle source
# File lib/grumlin/transport.rb, line 80
# Body of the request task: takes each message from the request channel,
# writes it to the connection and flushes. Runs under #with_guard.
def run_request_task
  with_guard do
    @request_channel.each do |outgoing|
      @connection.write(outgoing)
      @connection.flush
    end
  end
end
run_response_task()
click to toggle source
# File lib/grumlin/transport.rb, line 71
# Body of the response task: repeatedly reads from the connection and pushes
# whatever was read onto the response channel. Runs under #with_guard.
def run_response_task
  with_guard do
    loop { @response_channel << @connection.read }
  end
end
with_guard() { || ... }
click to toggle source
# File lib/grumlin/transport.rb, line 89
# Runs the given block. If it raises Async::Stop, Async::TimeoutError or any StandardError,
# the error is logged at debug level, passed to the response channel via +exception+
# (unless that channel is already closed), and the transport is closed.
#
# @yield the work to protect
# @return [Object] the block's value, or the result of #close after a rescued error
def with_guard
  yield
rescue Async::Stop, Async::TimeoutError, StandardError => error
  logger.debug(self) { "Guard error, closing." }

  begin
    @response_channel.exception(error)
  rescue Async::Channel::ChannelClosedError
    # The channel is already closed, so there is nowhere to deliver the error.
    nil
  end

  close
end