class Signalwire::Blade::Connection
Attributes
connected[R]
connection[R]
node_id[R]
session_id[R]
Public Class Methods
new(**options)
click to toggle source
# File lib/signalwire/blade/connection.rb, line 17
# Builds a connection object in the disconnected state.
#
# Option keys read here (each is optional because it is read with +fetch+
# and a default):
# * +:url+            - endpoint, defaults to 'wss://relay.signalwire.com'
# * +:log_traffic+    - flag consulted by #log_traffic, defaults to true
# * +:authentication+ - value placed into the connect request, defaults to nil
def initialize(**options)
  @options = options

  # Configuration taken from the options hash.
  @url            = options.fetch(:url, 'wss://relay.signalwire.com')
  @log_traffic    = options.fetch(:log_traffic, true)
  @authentication = options.fetch(:authentication, nil)

  # Session state; the ids are assigned when the connect request is answered.
  @session_id = nil
  @node_id    = nil
  @connected  = false

  # Queues drained by #flush_queues.
  @inbound_queue  = EM::Queue.new
  @outbound_queue = EM::Queue.new

  # Bookkeeping used by #keep_alive.
  @pong             = Concurrent::AtomicBoolean.new
  @ping_is_sent     = Concurrent::AtomicBoolean.new
  @keep_alive_timer = nil

  # Objects registered through #register_for_shutdown.
  @shutdown_list = []
end
Public Instance Methods
clear_connections()
click to toggle source
# File lib/signalwire/blade/connection.rb, line 154
# Drops the socket reference, marks the connection as not connected and
# cancels the keep-alive timer when one has been created.
#
# Returns whatever the timer's +cancel+ returns, or nil when there is no timer.
def clear_connections
  @ws = nil
  @connected = false
  @keep_alive_timer&.cancel
end
connect!()
click to toggle source
# File lib/signalwire/blade/connection.rb, line 38
# Entry point: runs the one-time setup steps in order and then enters the
# main loop. Returns whatever #main_loop! returns.
def connect!
  # Register the :started handler before any socket can open.
  setup_started_event
  enable_epoll
  handle_signals

  main_loop!
end
connect_request()
click to toggle source
# File lib/signalwire/blade/connection.rb, line 182
# Builds a fresh Connect request. When an authentication value was supplied
# at construction time it is stored under params/authentication.
#
# Returns the Connect instance.
def connect_request
  Connect.new.tap do |request|
    request[:params][:authentication] = @authentication if @authentication
  end
end
connected?()
click to toggle source
# File lib/signalwire/blade/connection.rb, line 188
# Reports the connection flag.
#
# Returns true only when @connected equals true; any other value yields false.
def connected?
  flag = @connected
  flag == true
end
disconnect!()
click to toggle source
# File lib/signalwire/blade/connection.rb, line 160
# Tears the connection state down via #clear_connections and then stops
# the EventMachine reactor.
def disconnect!
  clear_connections
  EM.stop
end
enable_epoll()
click to toggle source
# File lib/signalwire/blade/connection.rb, line 97
# Asks EventMachine to use epoll and logs whether it is active.
def enable_epoll
  # This is only enabled on Linux
  EM.epoll
  logger.debug "Running with epoll #{EM.epoll?}"
end
enqueue_inbound(message)
click to toggle source
# File lib/signalwire/blade/connection.rb, line 174
# Pushes a received message onto the inbound queue; it is handed to
# #receive the next time #flush_queues runs.
def enqueue_inbound(message)
  @inbound_queue.push(message)
end
enqueue_outbound(message)
click to toggle source
# File lib/signalwire/blade/connection.rb, line 178
# Pushes a message onto the outbound queue; it is handed to #write the next
# time #flush_queues runs while connected.
def enqueue_outbound(message)
  @outbound_queue.push(message)
end
execute(params, &block)
click to toggle source
# File lib/signalwire/blade/connection.rb, line 135
# Wraps +params+ in an Execute command and sends it through #write_command.
# A block, when given, is forwarded unchanged; passing no block forwards none.
def execute(params, &block)
  write_command(Execute.new(params), &block)
end
flush_queues()
click to toggle source
# File lib/signalwire/blade/connection.rb, line 165
# Drains the queues and re-arms itself through #schedule_flush_queues.
#
# Inbound messages are always handed to #receive. Outbound messages are
# only handed to #write while #connected? is true; otherwise they stay queued.
def flush_queues
  until @inbound_queue.empty?
    @inbound_queue.pop { |incoming| receive(incoming) }
  end

  if connected?
    until @outbound_queue.empty?
      @outbound_queue.pop { |outgoing| write(outgoing) }
    end
  end

  schedule_flush_queues
end
handle_close()
click to toggle source
# File lib/signalwire/blade/connection.rb, line 149
# Socket :close callback: logs a warning and starts a reconnection attempt.
# Returns whatever #reconnect! returns.
def handle_close
  logger.warn "WS Socket closed!"
  reconnect!
end
handle_execute_response(event, &block)
click to toggle source
# File lib/signalwire/blade/connection.rb, line 130
# Delivers the reply for a command to the caller's block.
#
# When the event reports an error, its code and message are logged first;
# the block is called with the event either way.
def handle_execute_response(event, &block)
  if event.error?
    logger.error("Blade error occurred, code #{event.error_code}: #{event.error_message}")
  end

  block.call(event)
end
handle_signals()
click to toggle source
# File lib/signalwire/blade/connection.rb, line 223
# Installs handlers for INT and TERM that both run #shutdown_from_signal.
def handle_signals
  Signal.trap('INT') { shutdown_from_signal }
  Signal.trap('TERM') { shutdown_from_signal }
end
keep_alive()
click to toggle source
# File lib/signalwire/blade/connection.rb, line 192
# One step of the keep-alive cycle; re-arms itself with a timer of
# Signalwire::Relay::PING_TIMEOUT on every call.
#
# The steps alternate:
# * no ping outstanding: send a ping whose reply sets @pong;
# * ping outstanding: if @pong was not set, log the failure and reconnect
#   (only while connected), then allow the next ping.
#
# @pong is cleared before each ping is sent. Without that, a reply to an
# earlier ping would leave the flag true forever and a later missed ping
# would never be detected.
def keep_alive
  if @ping_is_sent.false?
    # Forget any earlier reply so the next check only sees this ping's answer.
    @pong.make_false
    ping { @pong.make_true }
    @ping_is_sent.make_true
  else
    if @pong.false?
      logger.error "KEEPALIVE: Ping failed"
      reconnect! if connected?
    end
    @ping_is_sent.make_false
  end

  @keep_alive_timer = EventMachine::Timer.new(Signalwire::Relay::PING_TIMEOUT) { keep_alive }
end
log_traffic(direction, message)
click to toggle source
# File lib/signalwire/blade/connection.rb, line 211
# Writes a pretty-printed copy of a message to the debug log.
#
# direction - :send (message is a JSON string, parsed before printing) or
#             :recv (message is passed to JSON.pretty_generate as is).
# message   - the payload to print.
#
# Nothing is logged when traffic logging was disabled through the
# +:log_traffic+ option; previously an empty "SEND: " / "RECV: " line was
# still emitted in that case.
def log_traffic(direction, message)
  return unless @log_traffic

  pretty = case direction
           when :send then JSON.pretty_generate(JSON.parse(message))
           when :recv then JSON.pretty_generate(message)
           end
  logger.debug "#{direction.to_s.upcase}: #{pretty}"
end
main_loop!()
click to toggle source
# File lib/signalwire/blade/connection.rb, line 54
# Opens the WebSocket inside EM.run, wires up its callbacks and starts the
# periodic queue flush.
#
# Callbacks:
# * :open    - broadcasts :started (handled by the block registered in
#              #setup_started_event)
# * :message - queued for #receive
# * :close   - #handle_close
# * :error   - logged only
def main_loop!
  EM.run do
    logger.info "CREATING SOCKET"
    @ws = Faye::WebSocket::Client.new(@url)

    @ws.on(:open)    { |opened| broadcast :started, opened }
    @ws.on(:message) { |incoming| enqueue_inbound incoming }
    @ws.on(:close)   { handle_close }
    @ws.on(:error)   { |error| logger.error "Error occurred: #{error.message}" }

    schedule_flush_queues
  end
end
ping(&block)
click to toggle source
# File lib/signalwire/blade/connection.rb, line 139
# Sends a Ping command through #write_command, forwarding the block when one
# is given.
#
# Returns the Ping command that was sent.
def ping(&block)
  Ping.new.tap { |ping_cmd| write_command(ping_cmd, &block) }
end
receive(message)
click to toggle source
# File lib/signalwire/blade/connection.rb, line 112
# Turns a raw socket message into a Message event, logs its payload and
# broadcasts it as :message from inside EM.defer.
def receive(message)
  event = Message.from_json(message.data)
  log_traffic :recv, event.payload

  EM.defer { broadcast :message, event }
end
reconnect!()
click to toggle source
# File lib/signalwire/blade/connection.rb, line 46
# Resets the connection state and, unless a shutdown has been flagged,
# waits Signalwire::Blade::RECONNECT_PERIOD and re-enters #main_loop!.
#
# Returns nil when shutting down, otherwise whatever #main_loop! returns.
def reconnect!
  clear_connections
  # @shutdown is set by #shutdown_from_signal; do not reopen the socket then.
  return if @shutdown

  sleep Signalwire::Blade::RECONNECT_PERIOD
  logger.info "Attempting reconnection"
  main_loop!
end
register_for_shutdown(obj)
click to toggle source
# File lib/signalwire/blade/connection.rb, line 241
# Remembers +obj+ so that #shutdown_registered calls +stop+ on it later.
#
# Returns the list of registered objects.
def register_for_shutdown(obj)
  @shutdown_list.push(obj)
end
schedule_flush_queues()
click to toggle source
# File lib/signalwire/blade/connection.rb, line 71
# Arms an EventMachine timer that runs #flush_queues after 0.005 seconds.
# #flush_queues calls this again, which keeps the cycle going.
def schedule_flush_queues
  EM.add_timer(0.005) do
    flush_queues
  end
end
setup_started_event()
click to toggle source
# File lib/signalwire/blade/connection.rb, line 75
# Registers the :started handler, which runs when the socket opens.
#
# The handler marks the connection as connected, clears the pong flag and
# sends the connect request. When the reply arrives it records the session
# and node ids, broadcasts :connected and starts the keep-alive cycle.
#
# Both ids are only assigned while still unset, so values from the first
# successful connect survive later replies. The node id check previously
# read a misspelled variable (@node_d), which was always nil and caused the
# node id to be overwritten on every reply.
#
# Any StandardError raised while handling :started is logged, not re-raised.
def setup_started_event
  on :started do |_event|
    begin
      @connected = true
      myreq = connect_request
      @pong.make_false

      write_command(myreq) do |event|
        @session_id ||= event.dig(:result, :sessionid)
        @node_id ||= event.dig(:result, :nodeid)
        logger.info "Blade Session connected with id: #{@session_id}"
        broadcast :connected, event
        keep_alive
      end
    rescue StandardError => e
      logger.error e.inspect
      logger.error e.backtrace
    end
  end
end
shutdown_from_signal()
click to toggle source
# File lib/signalwire/blade/connection.rb, line 234
# Handler body for the signals trapped in #handle_signals.
#
# Sets the shutdown flag (which makes #reconnect! return early), stops every
# registered object, disconnects and exits the process.
def shutdown_from_signal
  @shutdown = true

  shutdown_registered
  disconnect!
  exit
end
shutdown_registered()
click to toggle source
# File lib/signalwire/blade/connection.rb, line 245
# Calls +stop+ on every object added through #register_for_shutdown, in
# registration order. Returns the list of registered objects.
def shutdown_registered
  @shutdown_list.each(&:stop)
end
subscribe(params, &block)
click to toggle source
# File lib/signalwire/blade/connection.rb, line 145
# Wraps +params+ in a Subscribe command and sends it through #write_command.
# A block, when given, is forwarded unchanged; passing no block forwards none.
def subscribe(params, &block)
  write_command(Subscribe.new(params), &block)
end
transmit(message)
click to toggle source
# File lib/signalwire/blade/connection.rb, line 103
# Queues +message+ for sending; the actual socket write happens later in
# #flush_queues. Returns whatever #enqueue_outbound returns.
def transmit(message)
  enqueue_outbound(message)
end
write(message)
click to toggle source
# File lib/signalwire/blade/connection.rb, line 107
# Logs +message+ as outgoing traffic and sends it over the socket.
# Returns the result of the socket's +send+.
def write(message)
  log_traffic(:send, message)
  @ws.send(message)
end
write_command(command, &block)
click to toggle source
# File lib/signalwire/blade/connection.rb, line 120
# Serialises +command+ to JSON and queues it through #transmit.
#
# When a block is given, a one-shot :message listener keyed on the command's
# id is registered first; it passes the matching event to
# #handle_execute_response together with the block.
#
# Returns whatever #transmit returns.
def write_command(command, &block)
  once(:message, id: command.id) { |event| handle_execute_response(event, &block) } if block_given?

  transmit(command.build_request.to_json)
end