class Rookout::ComWs::AgentComWs
Constants
- ACCEPTED_MESSAGE_TYPES
Attributes
pending_messages[R]
Public Class Methods
new(output, agent_host, agent_port, proxy, token, labels)
click to toggle source
# File lib/rookout/com_ws/agent_com_ws.rb, line 28
# Builds the agent URI and proxy URL and initializes the communication state
# (token, agent information, message queue, thread slots, ready event).
#
# @param output      object whose +agent_id=+ is set by reset_id
# @param agent_host  agent host; "ws://" is prepended when it contains no "://"
# @param agent_port  port appended to the host when building the URI
# @param proxy       proxy address, or nil/empty for none; "http://" is
#                    prepended when it contains no "://"
# @param token       stored in @token
# @param labels      passed to Information.new
def initialize output, agent_host, agent_port, proxy, token, labels
  host_has_scheme = agent_host.include? "://"
  base_url = host_has_scheme ? agent_host : "ws://#{agent_host}"
  @uri = "#{base_url}:#{agent_port}/v1"

  @proxy =
    if proxy.nil? || proxy.empty?
      nil
    elsif proxy.include?("://")
      proxy
    else
      "http://#{proxy}"
    end

  @token = token
  @token_valid = false

  @output = output
  @info = Information.new labels
  # reset_id writes the new id to both @output and @info, so it runs after them.
  reset_id

  @main_thread = nil
  @outgoing_thread = nil
  @pending_messages = Queue.new

  @running = false
  @ready_event = Concurrent::Event.new
  # Signal readiness when an InitialAugsCommand event is emitted.
  once("Com::Rookout::InitialAugsCommand") { @ready_event.set }
end
Public Instance Methods
add(message)
click to toggle source
# File lib/rookout/com_ws/agent_com_ws.rb, line 53
# Wraps +message+ in an envelope and queues it for sending.
#
# A message whose wrapped size exceeds Config.agent_com_max_message_limit is
# dropped: a user warning is raised and a log line is written. A message is
# also skipped (without any warning) when the queue already holds
# Config.agent_com_max_queued_messages entries.
#
# @param message  the message to wrap and enqueue
def add message
  payload = wrap_in_envelope message
  payload_size = payload.length
  size_limit = Config.agent_com_max_message_limit

  if payload_size > size_limit
    size_error = Exceptions::RookMessageSizeExceeded.new payload_size, size_limit
    UserWarnings.notify_warning Processor::RookError.new(size_error)
    Logger.instance.warning "Dropping message, size was #{payload_size} which is over the message size limit"
    return
  end

  has_room = @pending_messages.length < Config.agent_com_max_queued_messages
  @pending_messages.push payload if has_room
end
connect()
click to toggle source
# File lib/rookout/com_ws/agent_com_ws.rb, line 71
# Marks the communicator as running and starts the background thread that
# executes connection_thread. Does not wait for the connection itself.
def connect
  @running = true

  @main_thread = Thread.new do
    connection_thread
  end
  @main_thread.name = "rookout-connection-thread"
end
flush_all_messages()
click to toggle source
# File lib/rookout/com_ws/agent_com_ws.rb, line 96
# Queues a flush marker behind the currently pending messages and waits, for
# at most Config.agent_com_flush_timeout, until the marker's event is set.
#
# @return whatever the marker event's +wait+ returns
def flush_all_messages
  # A fresh instance per call: the outgoing loop recognises the marker with
  # `is_a? FlushMessage` and sets its own event, so the class itself cannot be queued.
  flush = FlushMessage.new
  @pending_messages.push flush
  flush.event.wait Config.agent_com_flush_timeout
end
queue_full?()
click to toggle source
# File lib/rookout/com_ws/agent_com_ws.rb, line 67
# Tells whether the pending-message queue has reached its configured capacity.
#
# @return [Boolean] true when the queue length is at least
#   Config.agent_com_max_queued_messages
def queue_full?
  queued = @pending_messages.length
  queued >= Config.agent_com_max_queued_messages
end
stop()
click to toggle source
# File lib/rookout/com_ws/agent_com_ws.rb, line 78
# Stops the communicator: clears the running flag, asks the outgoing thread to
# exit and waits for the connection thread to finish.
#
# Safe to call before connect: when no connection thread was ever started
# there is nothing to join.
def stop
  @running = false

  # Ask outgoing thread to exit (if running)
  @pending_messages << ExitMessage.new(@outgoing_thread)

  # @main_thread stays nil until connect has been called.
  @main_thread.join if @main_thread
end
wait_for_ready()
click to toggle source
# File lib/rookout/com_ws/agent_com_ws.rb, line 87
# Waits up to Config.agent_com_timeout for the ready event.
#
# @raise [Exceptions::RookCommunicationException] when the event was not set
#   within the timeout
# @raise the stored @connection_error when the event was set but a connection
#   error had been recorded
def wait_for_ready
  # Not ready within the timeout - the original notes the connection keeps
  # being retried in the background.
  unless @ready_event.wait(Config.agent_com_timeout)
    raise Exceptions::RookCommunicationException
  end

  # Ready was signalled - surface a recorded failure, if any.
  failure = @connection_error
  raise failure if failure
end
Private Instance Methods
connection_pump(client)
click to toggle source
# File lib/rookout/com_ws/agent_com_ws.rb, line 151
# Runs one connected session: starts the outgoing thread, hands an
# incoming-message handler to client.connection_pump and, once that call
# returns, shuts the outgoing thread down.
#
# @param client  connected client; must respond to +connection_pump+ and +close+
def connection_pump client
  # Passed to the outgoing thread as its exit callback; closes the client.
  on_outgoing_exit = proc { client.close }
  @outgoing_thread = Thread.new { outgoing client, on_outgoing_exit }
  @outgoing_thread.name = "rookout-outgoing-thread"

  message_handler = proc do |raw_message|
    envelope = Com::Rookout::Envelope.decode raw_message.pack("c*")

    # Emit an event named after every accepted type the payload matches.
    ACCEPTED_MESSAGE_TYPES.each do |klass|
      next unless envelope.msg.is klass
      emit klass.name, envelope.msg.unpack(klass)
    end
  end

  client.connection_pump message_handler

  Logger.instance.debug "Incoming loop - socket disconnected"

  # The outgoing loop only honours an ExitMessage addressed to its own thread,
  # so the request must carry this session's outgoing thread.
  @pending_messages.push ExitMessage.new(@outgoing_thread)
  @outgoing_thread.join
  @outgoing_thread = nil
end
connection_thread()
click to toggle source
# File lib/rookout/com_ws/agent_com_ws.rb, line 104
# Body of the connection thread: while @running is set, opens a connection,
# pumps it until it ends, then backs off and reconnects.
#
# A failure whose message contains "403" before any connection has succeeded
# is recorded as an invalid-token error and the ready event is set, so
# wait_for_ready can raise it.
def connection_thread
  backoff = Backoff.new

  while @running
    begin
      client = open_new_connection

      Logger.instance.debug "WebSocket connected successfully"
      Logger.instance.info "Finished initialization"

      @token_valid = true
      backoff.after_connect

      connection_pump client
    # NOTE(review): rescues Exception rather than StandardError - presumably so
    # no error can ever terminate the reconnect loop; confirm before narrowing.
    rescue Exception => error
      reason = error.message
      token_rejected = !@token_valid && reason.include?("403")

      if token_rejected
        @connection_error = Exceptions::RookInvalidToken.new @token
        @ready_event.set
      end

      Logger.instance.info "Connection failed; reason = #{reason}"
    end

    backoff.after_disconnect
    Logger.instance.debug "Reconnecting"
  end
end
open_new_connection()
click to toggle source
# File lib/rookout/com_ws/agent_com_ws.rb, line 132
# Opens a new WebSocket client for @uri (through @proxy, with @token),
# connects it and sends the agent registration message.
#
# @return the connected client
def open_new_connection
  WebsocketClient.new(@uri, @proxy, @token).tap do |socket|
    socket.connect

    Logger.instance.info "Registering agent with id #{@agent_id}"

    registration = Com::Rookout::NewAgentMessage.new agent_info: @info.pack
    socket.send_frame wrap_in_envelope(registration)
  end
end
outgoing(client, on_exit)
click to toggle source
# File lib/rookout/com_ws/agent_com_ws.rb, line 174
# Body of the outgoing thread: drains @pending_messages and sends each entry
# through +client+, inside the Pinger's repeat loop.
#
# Special queue entries:
# * ExitMessage  - ends the loop, but only when addressed to the current thread
# * FlushMessage - its event is set once everything queued before it was handled
#
# When sending fails with a RuntimeError the message is put back on
# @pending_messages, so it is not lost, and the loop ends.
#
# @param client   client used for +send_frame+ (also handed to the Pinger)
# @param on_exit  callable invoked when this method finishes, however it finishes
def outgoing client, on_exit
  Pinger.new(client, Config.agent_com_ping_interval, Config.agent_com_ping_timeout).repeat do
    begin
      # Non-blocking pop: raises ThreadError when the queue is empty
      msg = @pending_messages.pop true
    rescue ThreadError
      sleep 0.25
      next
    end

    if msg.is_a? ExitMessage
      break if msg.thread == Thread.current
    elsif msg.is_a? FlushMessage
      msg.event.set
    else
      begin
        client.send_frame msg
      rescue RuntimeError
        # Requeue on the real queue; @queue is never assigned in this class.
        @pending_messages << msg
        break
      end
    end
  end
# NOTE(review): rescues Exception rather than StandardError - presumably so the
# exit callback path is always reached quietly; confirm before narrowing.
rescue Exception => e
  Logger.instance.exception "Outgoing thread failed", e
ensure
  Logger.instance.debug "Outgoing thread exiting"
  on_exit.call
end
reset_id()
click to toggle source
# File lib/rookout/com_ws/agent_com_ws.rb, line 211
# Generates a new agent id with Utils.uuid and propagates it to the output
# and to the agent information object.
def reset_id
  fresh_id = Utils.uuid

  @agent_id = fresh_id
  @output.agent_id = fresh_id
  @info.agent_id = fresh_id
end
wrap_in_envelope(message)
click to toggle source
# File lib/rookout/com_ws/agent_com_ws.rb, line 203
# Packs +message+ into a protobuf Any, stamps it with the current time and
# serializes the resulting envelope.
#
# @param message  protobuf message to wrap
# @return the bytes of the serialized Com::Rookout::Envelope
def wrap_in_envelope message
  packed = Google::Protobuf::Any.pack message

  sent_at = Google::Protobuf::Timestamp.new
  sent_at.from_time Time.now

  Com::Rookout::Envelope.new(msg: packed, timestamp: sent_at).to_proto.bytes
end