class RSpec::Buildkite::Analytics::Session

Constants

CONFIRMATION_TIMEOUT

75 seconds was picked as the confirmation timeout because it is longer than the TCP timeout of 60s 🤷‍♀️

MAX_RECONNECTION_ATTEMPTS
WAIT_BETWEEN_RECONNECTIONS

Public Class Methods

new(url, authorization_header, channel) click to toggle source
# File lib/rspec/buildkite/analytics/session.rb, line 14
# Sets up the session state and immediately tries to connect.
#
# url                  - handed to SocketConnection.new by #connect.
# authorization_header - sent as the "Authorization" header by #connect.
# channel              - identifier used when subscribing and on every
#                        message transmitted afterwards.
#
# A TimeoutError raised while connecting is not propagated; a notice is
# written to $stderr instead.
def initialize(url, authorization_header, channel)
  # Connection settings, read again by #connect on every (re)connection.
  @url = url
  @authorization_header = authorization_header
  @channel = channel

  # "welcome" / "confirm_subscription" messages are pushed here by #handle
  # and popped by the wait_for_* helpers.
  @queue = Queue.new

  # Results that were sent but not confirmed yet, keyed by ident. Access is
  # guarded by @idents_mutex; @empty is broadcast when the hash drains.
  @unconfirmed_idents = {}
  @idents_mutex = Mutex.new
  @empty = ConditionVariable.new

  @closing = false
  @reconnection_mutex = Mutex.new

  connect
rescue TimeoutError
  $stderr.puts "rspec-buildkite-analytics could not establish an initial connection with Buildkite. Please contact support."
end

Public Instance Methods

close() click to toggle source
# File lib/rspec/buildkite/analytics/session.rb, line 62
# Flushes the session and closes the connection.
#
# Marks the session as closing, sends the end-of-transmission message, waits
# at most CONFIRMATION_TIMEOUT seconds for @unconfirmed_idents to drain, and
# then closes the connection whether or not everything was confirmed.
#
# Returns whatever @connection.close returns.
def close
  @closing = true

  # Because the server only sends us confirmations after every 10mb of
  # data it uploads to S3, we'll never get confirmation of the
  # identifiers of the last upload part unless we send an explicit finish,
  # to which the server will respond with the last bits of data
  send_eot

  # A fixed deadline keeps the total wait bounded by CONFIRMATION_TIMEOUT no
  # matter how many times we are woken up. The monotonic clock is used so
  # wall-clock adjustments cannot stretch or shorten the wait.
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + CONFIRMATION_TIMEOUT

  @idents_mutex.synchronize do
    # ConditionVariable#wait can return while idents are still pending
    # (for example when the thread is woken for another reason), so the
    # condition is re-checked after every wake-up instead of waiting once.
    until @unconfirmed_idents.empty?
      remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
      break unless remaining.positive?

      @empty.wait(@idents_mutex, remaining)
    end
  end

  # Then we always disconnect, because we can't wait forever.
  @connection.close
end
disconnected(connection) click to toggle source
# File lib/rspec/buildkite/analytics/session.rb, line 32
# Reacts to a dropped connection: reconnects, then resends everything that
# is still unconfirmed.
#
# connection - the connection object on which the caller observed the
#              disconnection.
#
# Re-raises the last connection error (after writing a notice to $stderr)
# once the attempt counter exceeds MAX_RECONNECTION_ATTEMPTS.
def disconnected(connection)
  attempts = 0

  @reconnection_mutex.synchronize do
    # Several threads may notice the same disconnection. Only the first one
    # to take the mutex reconnects, which replaces @connection. Any thread
    # that arrives later is still holding the old connection object, sees
    # that it no longer matches, and leaves without reconnecting again.
    return if connection != @connection

    begin
      attempts += 1
      connect
    rescue SocketConnection::HandshakeError, RejectedSubscription, TimeoutError, SocketConnection::SocketError => e
      # Still within budget: pause, then run the begin block again.
      unless attempts > MAX_RECONNECTION_ATTEMPTS
        sleep(WAIT_BETWEEN_RECONNECTIONS)
        retry
      end

      $stderr.puts "rspec-buildkite-analytics experienced a disconnection and could not reconnect to Buildkite due to #{e.message}. Please contact support."
      raise e
    end
  end

  # Runs outside the reconnection mutex, on the fresh connection.
  retransmit
end
handle(_connection, data) click to toggle source
# File lib/rspec/buildkite/analytics/session.rb, line 82
# Dispatches one raw message received from the connection.
#
# _connection - unused.
# data        - JSON text; the parsed object's "type" selects the action.
#
# Raises RejectedSubscription for a "reject_subscription" message.
def handle(_connection, data)
  message = JSON.parse(data)
  type = message["type"]

  # In absence of other messages the server pings us every 3 seconds;
  # nothing is done with those.
  return if type == "ping"

  # Handshake messages go onto the queue so the code blocked in the
  # initializing phase can pick them up.
  return @queue.push(message) if type == "welcome" || type == "confirm_subscription"

  raise RejectedSubscription if type == "reject_subscription"

  process_message(message)
end
unconfirmed_idents_count() click to toggle source
# File lib/rspec/buildkite/analytics/session.rb, line 107
# Number of results that were sent but not confirmed yet.
# Read under @idents_mutex.
def unconfirmed_idents_count
  @idents_mutex.synchronize { @unconfirmed_idents.size }
end
write_result(result) click to toggle source
# File lib/rspec/buildkite/analytics/session.rb, line 99
# Records a single result as unconfirmed and transmits it.
#
# result - must respond to #id and #as_json.
#
# Returns whatever #transmit_results returns.
def write_result(result)
  payload = result.as_json

  # Remember the payload first so it can be resent after a reconnection.
  add_unconfirmed_idents(result.id, payload)
  transmit_results([payload])
end

Private Instance Methods

add_unconfirmed_idents(ident, data) click to toggle source
# File lib/rspec/buildkite/analytics/session.rb, line 163
# Stores data under ident in @unconfirmed_idents while holding
# @idents_mutex. An existing entry for the same ident is overwritten.
def add_unconfirmed_idents(ident, data)
  @idents_mutex.synchronize { @unconfirmed_idents.store(ident, data) }
end
connect() click to toggle source
# File lib/rspec/buildkite/analytics/session.rb, line 126
# Opens a new SocketConnection and subscribes to @channel.
#
# Steps, in order: create the connection (with this object as its first
# argument and the Authorization header), wait for the welcome message,
# send the subscribe command, wait for the subscription confirmation.
def connect
  headers = { "Authorization" => @authorization_header }
  @connection = SocketConnection.new(self, @url, headers)
  wait_for_welcome

  subscription = { "command" => "subscribe", "identifier" => @channel }
  @connection.transmit(subscription)
  wait_for_confirm
end
pop_with_timeout() click to toggle source
# File lib/rspec/buildkite/analytics/session.rb, line 141
# Pops the next handshake message from @queue, blocking for at most 30
# seconds.
#
# Raises RSpec::Buildkite::Analytics::TimeoutError ("Waited 30 seconds")
# when nothing arrives in time.
def pop_with_timeout
  seconds = 30
  error_class = RSpec::Buildkite::Analytics::TimeoutError

  Timeout.timeout(seconds, error_class, "Waited 30 seconds") { @queue.pop }
end
process_message(data) click to toggle source
# File lib/rspec/buildkite/analytics/session.rb, line 198
# Handles a channel message that is not part of the handshake.
#
# data - parsed message (a Hash). Anything whose "identifier" is not
#        @channel is ignored.
#
# Only "confirm" payloads are acted on: the confirmed idents are passed to
# #remove_unconfirmed_idents and its result is returned. Every other
# message returns nil.
def process_message(data)
  # Check we're getting the data we expect
  return unless data["identifier"] == @channel

  # A frame for our channel may carry no "message", or one that is not a
  # Hash (e.g. a plain string). Calling #key? on it would raise
  # NoMethodError, so such frames are treated as unhandled.
  message = data["message"]
  return unless message.is_a?(Hash)

  remove_unconfirmed_idents(message["confirm"]) if message.key?("confirm")
end
remove_unconfirmed_idents(idents) click to toggle source
# File lib/rspec/buildkite/analytics/session.rb, line 169
# Drops the given idents from @unconfirmed_idents and wakes any waiter once
# nothing is left.
#
# idents - collection of keys confirmed by the server; an empty collection
#          is a no-op.
def remove_unconfirmed_idents(idents)
  return if idents.empty?

  @idents_mutex.synchronize do
    # Forget every ident the server has confirmed.
    idents.each do |ident|
      @unconfirmed_idents.delete(ident)
    end

    # Still waiting on something: no broadcast.
    next unless @unconfirmed_idents.empty?

    # The broadcast fires every time the hash drains, which will happen
    # about every 10mb of data as that's when the server sends back
    # confirmations. No thread waits on @empty until after the EOT message
    # has been sent, so the earlier broadcasts shouldn't do anything.
    @empty.broadcast
  end
end
retransmit() click to toggle source
# File lib/rspec/buildkite/analytics/session.rb, line 210
# Resends every result that is still unconfirmed and, when the session is
# closing, the end-of-transmission message as well.
def retransmit
  # Snapshot the buffer under the mutex; the sending happens outside it.
  pending = @idents_mutex.synchronize { @unconfirmed_idents.values }

  # Nothing is transmitted for an empty buffer.
  transmit_results(pending) unless pending.empty?

  # If we were disconnected in the closing phase, the EOT message has to be
  # sent again so the server can persist the last upload part.
  return unless @closing

  send_eot
end
send_eot() click to toggle source
# File lib/rspec/buildkite/analytics/session.rb, line 186
# Sends the "end_of_transmission" action on @channel.
#
# The server is expected to answer with the identifiers of the last upload
# part.
def send_eot
  body = { "action" => "end_of_transmission" }
  envelope = {
    "identifier" => @channel,
    "command" => "message",
    "data" => body.to_json
  }

  @connection.transmit(envelope)
end
transmit_results(results_as_json) click to toggle source
# File lib/rspec/buildkite/analytics/session.rb, line 115
# Sends a "record_results" action on @channel.
#
# results_as_json - the results to embed; serialised with #to_json together
#                   with the action name.
def transmit_results(results_as_json)
  body = { "action" => "record_results", "results" => results_as_json }
  envelope = {
    "identifier" => @channel,
    "command" => "message",
    "data" => body.to_json
  }

  @connection.transmit(envelope)
end
wait_for_confirm() click to toggle source
# File lib/rspec/buildkite/analytics/session.rb, line 155
# Blocks until the next queued handshake message arrives and checks that it
# is the subscription confirmation for @channel.
#
# Raises a RuntimeError ("Not a confirm: ...") for any other message.
# A nil/false message is accepted silently.
def wait_for_confirm
  reply = pop_with_timeout
  return unless reply

  expected = { "type" => "confirm_subscription", "identifier" => @channel }
  return if reply == expected

  raise "Not a confirm: #{reply.inspect}"
end
wait_for_welcome() click to toggle source
# File lib/rspec/buildkite/analytics/session.rb, line 147
# Blocks until the next queued handshake message arrives and checks that it
# is exactly the welcome message.
#
# Raises a RuntimeError ("Not a welcome: ...") for any other message.
# A nil/false message is accepted silently.
def wait_for_welcome
  reply = pop_with_timeout
  return unless reply
  return if reply == { "type" => "welcome" }

  raise "Not a welcome: #{reply.inspect}"
end