class Anyt::Client
Synchronous websocket client Based on github.com/rails/rails/blob/v5.0.1/actioncable/test/client_test.rb
Constants
- WAIT_WHEN_EXPECTING_EVENT
- WAIT_WHEN_NOT_EXPECTING_EVENT
Public Class Methods
new( ignore: [], url: Anyt.config.target_url, qs: "", cookies: "", headers: {}, timeout_multiplier: Anyt.config.timeout_multiplier )
click to toggle source
rubocop: disable Metrics/AbcSize rubocop: disable Metrics/MethodLength rubocop: disable Metrics/BlockLength
# File lib/anyt/client.rb, line 18 def initialize( ignore: [], url: Anyt.config.target_url, qs: "", cookies: "", headers: {}, timeout_multiplier: Anyt.config.timeout_multiplier ) ignore_message_types = @ignore_message_types = ignore messages = @messages = Queue.new closed = @closed = Concurrent::Event.new has_messages = @has_messages = Concurrent::Semaphore.new(0) @timeout_multiplier = timeout_multiplier headers = headers.merge("cookie" => cookies) open = Concurrent::Promise.new @ws = WebSocket::Client::Simple.connect( url + "?#{qs}", headers: headers ) do |ws| ws.on(:error) do |event| event = RuntimeError.new(event.message) unless event.is_a?(Exception) if open.pending? open.fail(event) else messages << event has_messages.release end end ws.on(:open) do |_event| open.set(true) end ws.on(:message) do |event| next if event.type == :ping if event.type == :close closed.set else message = JSON.parse(event.data) next if ignore_message_types.include?(message["type"]) AnyCable.logger.debug "Message received: #{message}" messages << message has_messages.release end end ws.on(:close) do |_event| closed.set end end open.wait!(WAIT_WHEN_EXPECTING_EVENT * @timeout_multiplier) end
Public Instance Methods
close(allow_messages: false)
click to toggle source
# File lib/anyt/client.rb, line 96 def close(allow_messages: false) sleep WAIT_WHEN_NOT_EXPECTING_EVENT * @timeout_multiplier raise "#{@messages.size} messages unprocessed" unless allow_messages || @messages.empty? @ws.close wait_for_close end
closed?()
click to toggle source
# File lib/anyt/client.rb, line 109 def closed? @closed.set? end
receive(timeout: WAIT_WHEN_EXPECTING_EVENT)
click to toggle source
rubocop: enable Metrics/BlockLength rubocop: enable Metrics/AbcSize rubocop: enable Metrics/MethodLength
# File lib/anyt/client.rb, line 80 def receive(timeout: WAIT_WHEN_EXPECTING_EVENT) timeout *= @timeout_multiplier raise TimeoutError, "Timed out to receive message" unless @has_messages.try_acquire(1, timeout) msg = @messages.pop(true) raise msg if msg.is_a?(Exception) msg end
send(message)
click to toggle source
# File lib/anyt/client.rb, line 92 def send(message) @ws.send(JSON.generate(message)) end
wait_for_close()
click to toggle source
# File lib/anyt/client.rb, line 105 def wait_for_close @closed.wait(WAIT_WHEN_EXPECTING_EVENT * @timeout_multiplier) end