class Rookout::ComWs::AgentComWs

Constants

ACCEPTED_MESSAGE_TYPES

Attributes

pending_messages[R]

Public Class Methods

new(output, agent_host, agent_port, proxy, token, labels) click to toggle source
# File lib/rookout/com_ws/agent_com_ws.rb, line 28
def initialize output, agent_host, agent_port, proxy, token, labels
  agent_host_with_protocl = agent_host.include?("://") ? agent_host : "ws://#{agent_host}"
  @uri = "#{agent_host_with_protocl}:#{agent_port}/v1"
  if proxy.nil? || proxy.empty?
    @proxy = nil
  else
    @proxy = proxy.include?("://") ? proxy : "http://#{proxy}"
  end

  @token = token
  @token_valid = false

  @output = output
  @info = Information.new labels
  reset_id

  @main_thread = nil
  @outgoing_thread = nil
  @pending_messages = Queue.new

  @running = false
  @ready_event = Concurrent::Event.new
  once("Com::Rookout::InitialAugsCommand") { @ready_event.set }
end

Public Instance Methods

add(message) click to toggle source
# File lib/rookout/com_ws/agent_com_ws.rb, line 53
def add message
  buffer = wrap_in_envelope message
  if buffer.length > Config.agent_com_max_message_limit
    exc = Exceptions::RookMessageSizeExceeded.new buffer.length, Config.agent_com_max_message_limit
    warning = Processor::RookError.new exc
    UserWarnings.notify_warning warning

    Logger.instance.warning "Dropping message, size was #{buffer.length} which is over the message size limit"
    return
  end

  @pending_messages.push buffer if @pending_messages.length < Config.agent_com_max_queued_messages
end
connect() click to toggle source
# File lib/rookout/com_ws/agent_com_ws.rb, line 71
def connect
  @running = true

  @main_thread = Thread.new { connection_thread }
  @main_thread.name = "rookout-connection-thread"
end
flush_all_messages() click to toggle source
# File lib/rookout/com_ws/agent_com_ws.rb, line 96
def flush_all_messages
  flush = FlushMessage
  @pending_messages.push flush
  flush.event.wait Config.agent_com_flush_timeout
end
queue_full?() click to toggle source
# File lib/rookout/com_ws/agent_com_ws.rb, line 67
def queue_full?
  @pending_messages.length >= Config.agent_com_max_queued_messages
end
stop() click to toggle source
# File lib/rookout/com_ws/agent_com_ws.rb, line 78
def stop
  @running = false

  # Ask outgoing thread to exit (if running)
  @pending_messages << ExitMessage.new(@outgoing_thread)

  @main_thread.join
end
wait_for_ready() click to toggle source
# File lib/rookout/com_ws/agent_com_ws.rb, line 87
def wait_for_ready
  is_finished = @ready_event.wait Config.agent_com_timeout
  # We didn't finish - will keep trying in the background
  raise Exceptions::RookCommunicationException unless is_finished

  # We finished - raise if we failed
  raise @connection_error if @connection_error
end

Private Instance Methods

connection_pump(client) click to toggle source
# File lib/rookout/com_ws/agent_com_ws.rb, line 151
def connection_pump client
  on_outgoing_exit = proc { client.close }
  @outgoing_thread = Thread.new { outgoing client, on_outgoing_exit }
  @outgoing_thread.name = "rookout-outgoing-thread"

  message_handler = proc do |raw_message|
    envelope = Com::Rookout::Envelope.decode raw_message.pack("c*")

    ACCEPTED_MESSAGE_TYPES.each do |klass|
      next unless envelope.msg.is klass
      emit klass.name, envelope.msg.unpack(klass)
    end
  end

  client.connection_pump message_handler

  Logger.instance.debug "Incoming loop - socket disconnected"

  @pending_messages.push ExitMessage.new(send_thread)
  @outgoing_thread.join
  @outgoing_thread = nil
end
connection_thread() click to toggle source
# File lib/rookout/com_ws/agent_com_ws.rb, line 104
def connection_thread
  backoff = Backoff.new

  while @running
    begin
      client = open_new_connection

      Logger.instance.debug "WebSocket connected successfully"
      Logger.instance.info "Finished initialization"

      @token_valid = true
      backoff.after_connect

      connection_pump client
    rescue Exception => e
      if !@token_valid && e.message.include?("403")
        @connection_error = Exceptions::RookInvalidToken.new @token
        @ready_event.set
      end

      Logger.instance.info "Connection failed; reason = #{e.message}"
    end

    backoff.after_disconnect
    Logger.instance.debug "Reconnecting"
  end
end
open_new_connection() click to toggle source
# File lib/rookout/com_ws/agent_com_ws.rb, line 132
def open_new_connection
  client = WebsocketClient.new @uri, @proxy, @token
  client.connect

  Logger.instance.info "Registering agent with id #{@agent_id}"
  msg = Com::Rookout::NewAgentMessage.new agent_info: @info.pack
  client.send_frame wrap_in_envelope(msg)

  client
end
outgoing(client, on_exit) click to toggle source
# File lib/rookout/com_ws/agent_com_ws.rb, line 174
def outgoing client, on_exit
  Pinger.new(client, Config.agent_com_ping_interval, Config.agent_com_ping_timeout).repeat do
    begin
      msg = @pending_messages.pop true
    rescue ThreadError
      sleep 0.25
      next
    end

    if msg.is_a? ExitMessage
      break if msg.thread == Thread.current
    elsif msg.is_a? FlushMessage
      msg.event.set
    else
      begin
        client.send_frame msg
      rescue RuntimeError
        @queue << msg
        break
      end
    end
  end
rescue Exception => e
  Logger.instance.exception "Outgoing thread failed", e
ensure
  Logger.instance.debug "Outgoing thread exiting"
  on_exit.call
end
reset_id() click to toggle source
# File lib/rookout/com_ws/agent_com_ws.rb, line 211
def reset_id
  @agent_id = Utils.uuid
  @output.agent_id = @agent_id
  @info.agent_id = @agent_id
end
wrap_in_envelope(message) click to toggle source
# File lib/rookout/com_ws/agent_com_ws.rb, line 203
def wrap_in_envelope message
  any_message = Google::Protobuf::Any.pack message
  timestamp = Google::Protobuf::Timestamp.new
  timestamp.from_time Time.new
  envelope = Com::Rookout::Envelope.new msg: any_message, timestamp: timestamp
  envelope.to_proto.bytes
end