class Spectator::Publisher

Internal class used to publish measurements to an aggregator service

Constants

ADD_OP
COUNTER_STATS
MAX_OP

Public Class Methods

new(registry) click to toggle source
# File lib/spectator/registry.rb, line 121
# Set up a publisher bound to the given registry.
#
# The publisher starts out neither started nor flagged to stop. The
# publish interval is read from the registry config entry :frequency and
# falls back to 5 when that entry is nil or false. An Http helper is
# created for the same registry.
def initialize(registry)
  @registry = registry
  @started = @should_stop = false
  configured_frequency = registry.config[:frequency]
  @frequency = configured_frequency || 5
  @http = Http.new(registry)
end

Public Instance Methods

append_measurement(payload, table, measure) click to toggle source

Add a measurement to our payload table. The serialization for a measurement is:

- length of tags
- indexes for the tags based on the string table
- operation (add (0), max (10))
- floating point value
# File lib/spectator/registry.rb, line 233
# Serialize one measurement onto the end of +payload+.
#
# The values pushed, in order, are:
# - the tag count (common tags + the measurement's own tags + 1 for the
#   name pseudo-tag)
# - a pair of string-table indexes for every common tag, then for every
#   tag of the measurement, then for :name => the measurement name
# - the operation returned by op_for_measurement
# - the measurement value
#
# +table+ maps each word to its index. Returns +payload+.
def append_measurement(payload, table, measure)
  operation = op_for_measurement(measure)
  shared_tags = @registry.common_tags
  own_tags = measure.id.tags

  payload.push(own_tags.length + 1 + shared_tags.length)

  emit_pair = ->(key, value) { payload.push(table[key], table[value]) }
  shared_tags.each { |key, value| emit_pair.call(key, value) }
  own_tags.each { |key, value| emit_pair.call(key, value) }
  # The measurement name is encoded as one more tag keyed by :name
  emit_pair.call(:name, measure.id.name)

  payload.push(operation, measure.value)
end
build_string_table(measurements) click to toggle source

Build a string table from the list of measurements. Unique words are identified and assigned a number, starting from 0, based on their lexicographical order.

# File lib/spectator/registry.rb, line 205
# Build the string table for a batch of measurements.
#
# Every distinct word is collected: the keys and values of the registry's
# common tags, the :name key, and each measurement's name and tag
# keys/values. Each word is then mapped to its position in the sorted
# list of words, starting at 0.
#
# Returns a Hash of word => index.
def build_string_table(measurements)
  table = {}
  # Register a word with a placeholder index; repeated words collapse onto
  # the same hash key.
  remember = ->(word) { table[word] = 0 }

  @registry.common_tags.each do |key, value|
    remember.call(key)
    remember.call(value)
  end
  remember.call(:name)
  measurements.each do |measurement|
    remember.call(measurement.id.name)
    measurement.id.tags.each do |key, value|
      remember.call(key)
      remember.call(value)
    end
  end

  # Swap each placeholder for the word's rank in sorted order
  table.keys.sort.each_with_index { |word, position| table[word] = position }
  table
end
op_for_measurement(measure) click to toggle source

Get the operation to be used for the given Measure. Gauges are aggregated using MAX_OP, counters with ADD_OP.

# File lib/spectator/registry.rb, line 184
# Pick the aggregation operation for a measurement.
#
# The measurement's :statistic tag (an empty string when the tag is
# absent) is looked up in COUNTER_STATS: a match yields ADD_OP, anything
# else yields MAX_OP.
def op_for_measurement(measure)
  statistic = measure.id.tags.fetch(:statistic, '')
  COUNTER_STATS.include?(statistic) ? ADD_OP : MAX_OP
end
payload_for_measurements(measurements) click to toggle source

Generate a payload from the list of measurements. The payload is an array containing the number of elements in the string table, followed by the string table itself, followed by the measurements.

# File lib/spectator/registry.rb, line 256
# Build the payload array for a batch of measurements.
#
# Layout: the size of the string table, the table's words in sorted
# order, then whatever append_measurement adds for each measurement.
def payload_for_measurements(measurements)
  table = build_string_table(measurements)
  sorted_words = table.keys.sort
  payload = [table.length, *sorted_words]
  measurements.each do |measurement|
    append_measurement(payload, table, measurement)
  end
  payload
end
publish() click to toggle source

Publish loop:

send measurements to the aggregator endpoint ':uri',
every ':frequency' seconds
# File lib/spectator/registry.rb, line 296
# Publishing loop, run until @should_stop becomes true.
#
# Each cycle logs, sends the current metrics, and then sleeps for
# whatever is left of the @frequency-second interval. When a cycle takes
# at least @frequency seconds the next one starts immediately.
def publish
  clock = @registry.clock
  until @should_stop
    cycle_start = clock.wall_time
    Spectator.logger.info('Publishing')
    send_metrics_now
    spent = clock.wall_time - cycle_start
    # Nothing left to wait for when the cycle used up the whole interval
    next unless spent < @frequency

    sleep(@frequency - spent)
  end
  Spectator.logger.info('Stopping publishing thread')
end
registry_measurements() click to toggle source

Get a list of measurements that should be sent

# File lib/spectator/registry.rb, line 267
# Fetch the registry's measurements and keep only those that should_send
# accepts.
def registry_measurements
  candidates = @registry.measurements
  candidates.select { |candidate| should_send(candidate) }
end
send_metrics_now() click to toggle source

Send the current measurements to our aggregator service

# File lib/spectator/registry.rb, line 272
# Send the measurements that are currently eligible to the aggregator.
#
# Nothing is posted when there are no measurements, or when the registry
# config has a nil or empty :uri (both cases are only logged). Otherwise
# the measurements are split into batches of @registry.batch_size and
# each batch is posted as a payload through @http.post_json.
def send_metrics_now
  pending = registry_measurements
  return Spectator.logger.debug('No measurements to send') if pending.empty?

  uri = @registry.config[:uri]
  uri_missing = uri.nil? || uri.empty?
  if uri_missing
    Spectator.logger.info('Ignoring sending of metrics ' \
      'since Spectator registry has no valid uri')
    return
  end

  pending.each_slice(@registry.batch_size) do |batch|
    body = payload_for_measurements(batch)
    Spectator.logger.info("Sending #{batch.length} measurements to #{uri}")
    @http.post_json(uri, body)
  end
end
should_send(measure) click to toggle source

Gauges are sent if they have a value; counters are sent if they have a number of increments greater than 0.

# File lib/spectator/registry.rb, line 195
# Decide whether a measurement is worth sending.
#
# Measurements whose operation is ADD_OP are sent only when their value
# is positive; all others are sent unless their value is NaN.
def should_send(measure)
  if op_for_measurement(measure) == ADD_OP
    measure.value.positive?
  else
    !measure.value.nan?
  end
end
should_start?() click to toggle source
# File lib/spectator/registry.rb, line 129
# Decide whether a start request should go ahead, and record the start
# when it does.
#
# Returns false (and logs why) when the publisher is already started or
# when the registry config has a nil or empty :uri. Returns true
# otherwise, after marking the publisher as started.
def should_start?
  if @started
    Spectator.logger.info('Ignoring start request. ' \
      'Spectator registry already started')
    return false
  end

  uri = @registry.config[:uri]
  if uri.nil? || uri.empty?
    Spectator.logger.info('Ignoring start request since Spectator ' \
      'registry has no valid uri')
    return false
  end

  # Mark as started only once the request has been accepted; a refused
  # start must not make later requests look like duplicates.
  @started = true
  true
end
start() click to toggle source

Start publishing if the config is acceptable:

uri is non-nil and non-empty
# File lib/spectator/registry.rb, line 149
# Start the background publishing thread, provided should_start? allows
# it.
#
# On success the stop flag is cleared and a new thread running publish is
# stored in @publish_thread and returned. Returns nil when the start
# request is refused.
def start
  if should_start?
    Spectator.logger.info('Starting Spectator registry')

    @should_stop = false
    @publish_thread = Thread.new { publish }
  end
end
stop() click to toggle source

Stop publishing measurements

# File lib/spectator/registry.rb, line 161
# Stop publishing measurements.
#
# When the publisher was never started this only logs a notice.
# Otherwise it raises the stop flag, kills the publishing thread (if
# any), marks the publisher as not started, and sends one final batch of
# metrics from the calling thread.
def stop
  unless @started
    Spectator.logger.info('Attempting to stop Spectator ' \
      'without a previous call to start')
    return
  end

  @should_stop = true
  Spectator.logger.info('Stopping spectator')
  # The thread may be asleep between cycles, so it is killed rather than
  # left to notice the stop flag on its own.
  @publish_thread&.kill

  @started = false
  Spectator.logger.info('Sending last batch of metrics before exiting')
  send_metrics_now
end