class SignalFlowWebsocketTransport
A WebSocket transport for SignalFlow. This should not be used directly by end-users.
Constants
- COMPUTATION_START_TIMEOUT_SECONDS
A lower bound on the amount of time to wait for a computation to start
- DETACHED
Public Class Methods
new(api_token, stream_endpoint, proxy_url: nil, logger: Logger.new(STDOUT, progname: "signalfx"), debug: false)
click to toggle source
# File lib/signalfx/signalflow/websocket.rb, line 26 def initialize(api_token, stream_endpoint, proxy_url: nil, logger: Logger.new(STDOUT, progname: "signalfx"), debug: false) @api_token = api_token @stream_endpoint = stream_endpoint @logger = logger @compress = true @proxy_url = proxy_url @debug = debug @lock = Mutex.new @close_reason = nil @last_error = nil reinit end
Public Instance Methods
attach(handle, filters: nil, resolution: nil)
click to toggle source
This doesn't actually work on the backend yet
# File lib/signalfx/signalflow/websocket.rb, line 150 def attach(handle, filters: nil, resolution: nil) channel = make_new_channel send_msg({ :type => "attach", :channel => channel.name, :handle => handle, :filters => filters, :resolution => resolution, :compress => @compress, }.reject!{|k,v| v.nil?}.to_json) channel end
close()
click to toggle source
# File lib/signalfx/signalflow/websocket.rb, line 180 def close if @ws @ws.close end end
detach(channel, reason=nil)
click to toggle source
# File lib/signalfx/signalflow/websocket.rb, line 165 def detach(channel, reason=nil) send_msg({ :type => "detach", :channel => channel, :reason => reason, }.to_json) # There is no response message from the server signifying detach complete # and there could be messages coming in even after the detach request is # sent. Therefore, use a sentinal value in place of the callback block so # that the message receiver logic can distinguish this case from some # anomolous case (say, due to bad logic in the code). @chan_callbacks[channel] = DETACHED end
execute(program, start: nil, stop: nil, resolution: nil, max_delay: nil, persistent: nil, immediate: false)
click to toggle source
# File lib/signalfx/signalflow/websocket.rb, line 95 def execute(program, start: nil, stop: nil, resolution: nil, max_delay: nil, persistent: nil, immediate: false) start_job do |channel_name| send_msg({ :type => "execute", :channel => channel_name, :program => program, :start => start, :stop => stop, :resolution => resolution, :max_delay => max_delay, :persistent => persistent, :immediate => immediate, :compress => @compress, }.reject!{|k,v| v.nil?}.to_json) end end
on_close(msg)
click to toggle source
# File lib/signalfx/signalflow/websocket.rb, line 214 def on_close(msg) if @debug @logger.info("Websocket on_close: #{msg}") end @close_reason = "(#{msg.code}, #{msg.reason})" @chan_callbacks.keys.each do |channel_name| invoke_callback_for_channel({ :event => "CONNECTION_CLOSED" }, channel_name) end reinit end
on_error(e)
click to toggle source
# File lib/signalfx/signalflow/websocket.rb, line 227 def on_error(e) @logger.error("ERROR #{e.inspect}") @last_error = e end
on_message(m)
click to toggle source
# File lib/signalfx/signalflow/websocket.rb, line 233 def on_message(m) if @debug @logger.info("Websocket on_message: #{m}") end is_text = m.data.kind_of?(String) begin message_received(m.data, is_text) rescue Exception => e @logger.error("Error processing SignalFlow message: #{e.backtrace.first}: #{e.message} (#{e.class})") end end
on_open()
click to toggle source
# File lib/signalfx/signalflow/websocket.rb, line 247 def on_open if @debug @logger.info("Websocket on_open") end @ws.send({ :type => "authenticate", :token => @api_token, }.to_json) end
preflight(program, start, stop, resolution: nil, max_delay: nil)
click to toggle source
# File lib/signalfx/signalflow/websocket.rb, line 112 def preflight(program, start, stop, resolution: nil, max_delay: nil) start_job do |channel_name| send_msg({ :type => "preflight", :channel => channel_name, :program => program, :start => start, :stop => stop, :resolution => resolution, :max_delay => max_delay, :compress => @compress, }.reject!{|k,v| v.nil?}.to_json) end end
start(program, start: nil, stop: nil, resolution: nil, max_delay: nil)
click to toggle source
# File lib/signalfx/signalflow/websocket.rb, line 127 def start(program, start: nil, stop: nil, resolution: nil, max_delay: nil) start_job do |channel_name| send_msg({ :type => "start", :channel => channel_name, :program => program, :start => start, :stop => stop, :resolution => resolution, :max_delay => max_delay, }.reject!{|k,v| v.nil?}.to_json) end end
start_job() { |name| ... }
click to toggle source
Starts a job (either execute or preflight) and waits until the JOB_START message is received with the computation handle arrives so that we can create a properly initialized computation object. Yields to the given block which should send the WS message to start the job.
# File lib/signalfx/signalflow/websocket.rb, line 61 def start_job computation = nil channel = make_new_channel yield channel.name while true begin msg = channel.pop(COMPUTATION_START_TIMEOUT_SECONDS) rescue ChannelTimeout raise "Computation did not start after at least #{COMPUTATION_START_TIMEOUT_SECONDS} seconds" end if msg[:type] == "error" raise ComputationFailure.new(msg[:message]) end # STREAM_START comes before this but contains no useful information if msg[:event] == "JOB_START" computation = Computation.new(msg[:handle], method(:attach), method(:stop)) computation.channel = channel elsif msg[:type] == "computation-started" computation = Computation.new(msg[:computationId], method(:attach), method(:stop)) # Start jobs only use the channel to get error messages and can # detach from the channel once the job has started. channel.detach else next end return computation end end
stop(handle, reason)
click to toggle source
# File lib/signalfx/signalflow/websocket.rb, line 141 def stop(handle, reason) send_msg({ :type => "stop", :handle => handle, :reason => reason, }.reject!{|k,v| v.nil?}.to_json) end
Private Instance Methods
add_parsed_timestamp!(msg)
click to toggle source
# File lib/signalfx/signalflow/websocket.rb, line 341 def add_parsed_timestamp!(msg) if msg.has_key?(:timestampMs) msg[:timestamp] = Time.at(msg[:timestampMs] / 1000.0) end msg end
invoke_callback_for_channel(msg, channel_name)
click to toggle source
# File lib/signalfx/signalflow/websocket.rb, line 297 def invoke_callback_for_channel(msg, channel_name) chan = @chan_callbacks[channel_name] raise "Callback for channel #{channel_name} is missing!" unless chan if chan == DETACHED return else chan.inject_message(msg) end end
make_new_channel()
click to toggle source
# File lib/signalfx/signalflow/websocket.rb, line 349 def make_new_channel name = @channel_namer.() channel = Channel.new(name, ->(){ detach(name) }) @chan_callbacks[name] = channel channel end
message_received(raw_msg, is_text)
click to toggle source
# File lib/signalfx/signalflow/websocket.rb, line 310 def message_received(raw_msg, is_text) msg = add_parsed_timestamp!(parse_message(raw_msg, is_text)) if msg[:type] == "authenticated" @authenticated = true return end if msg[:channel] invoke_callback_for_channel(msg, msg[:channel]) else # Ignore keep-alives if msg[:event] == "KEEP_ALIVE" return else raise "Unknown SignalFlow message: #{msg}" end end end
parse_message(raw_msg, is_text)
click to toggle source
# File lib/signalfx/signalflow/websocket.rb, line 331 def parse_message(raw_msg, is_text) if is_text JSON.parse(raw_msg, {:symbolize_names => true}) else # Convert the byte array to a string BinaryMessageParser.parse(raw_msg.pack("c*")) end end
reinit()
click to toggle source
# File lib/signalfx/signalflow/websocket.rb, line 40 def reinit @ws = nil @authenticated = false @chan_callbacks = {} name_lock = Mutex.new num = 0 # Returns a unique channel name each time it is called @channel_namer = ->{ name_lock.synchronize do num += 1 "channel-#{num}" end } end
send_msg(msg)
click to toggle source
# File lib/signalfx/signalflow/websocket.rb, line 186 def send_msg(msg) @lock.synchronize do if @ws.nil? startup_client # Polling is the simplest and most robust way to handle blocking until # authenticated. Using ConditionVariable requires more complex logic # that gains very little in terms of efficiecy given how quick auth # should be. start_time = Time.now while !@authenticated # The socket will be closed by the server if auth isn't successful # within 5 seconds so no point in waiting longer if Time.now - start_time > 5 || @close_reason if @last_error raise WebsocketError.new(@last_error) end raise "Could not authenticate to SignalFlow WebSocket: #{@close_reason}" end sleep 0.1 end end @ws.send(msg) end end
startup_client()
click to toggle source
Start up a new WS client in its own thread that runs an EventMachine reactor.
# File lib/signalfx/signalflow/websocket.rb, line 260 def startup_client this = self options = { :tls => { :verify_peer => true, } } if @proxy_url options[:proxy] = { :origin => @proxy_url, } end Thread.new { EM.run { @ws = Faye::WebSocket::Client.new("#{@stream_endpoint}/v2/signalflow/connect", [], options) @ws.on :error do |e| this.on_error(e) end @ws.on :close do |e| this.on_close(e) EM.stop_event_loop end @ws.on :message do |m| this.on_message(m) end @ws.on :open do this.on_open end } } end