class SignalFxClient

Constants

EVENT_CATEGORIES
EVENT_ENDPOINT_SUFFIX
HEADER_API_TOKEN_KEY
HEADER_CONTENT_TYPE
HEADER_USER_AGENT_KEY
INGEST_ENDPOINT_SUFFIX

Public Class Methods

new(api_token, enable_aws_unique_id: false, ingest_endpoint: RbConfig::DEFAULT_INGEST_ENDPOINT, stream_endpoint: RbConfig::DEFAULT_STREAM_ENDPOINT, timeout: RbConfig::DEFAULT_TIMEOUT, batch_size: RbConfig::DEFAULT_BATCH_SIZE, user_agents: [], logger: Logger.new(STDOUT, progname: "signalfx")) click to toggle source
# File lib/signalfx/signal_fx_client.rb, line 31
# Creates a client bound to one API token and a set of endpoints.
#
# @param api_token [String] token sent in the HEADER_API_TOKEN_KEY header
#   of every request and handed to the SignalFlow client.
# @param enable_aws_unique_id [Boolean] when true, the AWS identity
#   document is fetched during construction and an id of the form
#   "<instanceId>_<region>_<accountId>" is remembered so it can be attached
#   to datapoints and events as a dimension.
# @param ingest_endpoint [String] base URL that datapoints and events are
#   posted to.
# @param stream_endpoint [String] endpoint handed to SignalFlowClient.
# @param timeout [Numeric] timeout passed to RestClient for ingest posts.
# @param batch_size [Integer] upper bound on the number of queued
#   datapoints drained per send / send_async call.
# @param user_agents [Array<String>] extra entries appended to the
#   User-Agent header.
# @param logger [Logger] sink for informational and error messages.
def initialize(api_token,
               enable_aws_unique_id: false,
               ingest_endpoint: RbConfig::DEFAULT_INGEST_ENDPOINT,
               stream_endpoint: RbConfig::DEFAULT_STREAM_ENDPOINT,
               timeout: RbConfig::DEFAULT_TIMEOUT,
               batch_size: RbConfig::DEFAULT_BATCH_SIZE,
               user_agents: [],
               logger: Logger.new(STDOUT, progname: "signalfx"))
  # Connection settings.
  @api_token = api_token
  @ingest_endpoint = ingest_endpoint
  @stream_endpoint = stream_endpoint
  @timeout = timeout
  @user_agents = user_agents
  @logger = logger

  # Batching state shared by send and send_async.
  @batch_size = batch_size
  @queue = Queue.new
  @async_running = false

  # Stays nil unless the AWS lookup below succeeds.
  @aws_unique_id = nil
  return unless enable_aws_unique_id

  retrieve_aws_unique_id do |response|
    if response.nil?
      @logger.warn('Failed to retrieve AWS unique ID.')
    else
      identity = JSON.parse(response.body)
      @aws_unique_id = identity['instanceId'] + '_' + identity['region'] + '_' + identity['accountId']
      @logger.info("AWS Unique ID loaded: #{@aws_unique_id}")
    end
  end
end

Public Instance Methods

send(cumulative_counters: nil, gauges: nil, counters: nil) click to toggle source

Send the given metrics to SignalFx synchronously. You can use this method to send data from reporters such as Codahale-style metrics libraries.

Args:

cumulative_counters (list): a list of dictionaries representing the
             cumulative counters to report.
gauges (list): a list of dictionaries representing the gauges to report.
counters (list): a list of dictionaries representing the counters to report.
# File lib/signalfx/signal_fx_client.rb, line 74
# Queues the given datapoints and synchronously posts one batch to the
# ingest endpoint.
#
# At most @batch_size queued datapoints are drained per call; anything
# beyond that stays in the queue for a later send / send_async.
#
# @param cumulative_counters [Array<Hash>, nil] cumulative counter datapoints
# @param gauges [Array<Hash>, nil] gauge datapoints
# @param counters [Array<Hash>, nil] counter datapoints
# @return whatever post returns for the batch
def send(cumulative_counters: nil, gauges: nil, counters: nil)
  {
    'cumulative_counter' => cumulative_counters,
    'gauge' => gauges,
    'counter' => counters
  }.each { |metric_type, points| process_datapoint(metric_type, points) }

  batch = []
  batch << @queue.shift until @queue.length.zero? || batch.length >= @batch_size

  payload = batch_data(batch)

  begin
    post(payload, @ingest_endpoint, INGEST_ENDPOINT_SUFFIX)
  ensure
    # The flag shared with send_async is cleared whether or not the post raised.
    @async_running = false
  end
end
send_async(cumulative_counters: nil, gauges: nil, counters: nil) click to toggle source

Send the given metrics to SignalFx asynchronously.

Args:

cumulative_counters (list): a list of dictionaries representing the
             cumulative counters to report.
gauges (list): a list of dictionaries representing the gauges to report.
counters (list): a list of dictionaries representing the counters to report.
# File lib/signalfx/signal_fx_client.rb, line 100
# Queues the given datapoints and posts one batch from a background thread.
#
# The datapoints are always queued. If a previous asynchronous post is
# still marked as running, nothing else happens and they are picked up by
# a later call. Otherwise up to @batch_size queued datapoints are drained
# and posted on a new thread.
#
# Note: this sets the process-wide Thread.abort_on_exception to true.
#
# @param cumulative_counters [Array<Hash>, nil] cumulative counter datapoints
# @param gauges [Array<Hash>, nil] gauge datapoints
# @param counters [Array<Hash>, nil] counter datapoints
# @return [Thread, nil] the posting thread, or nil when a post was already
#   in flight
def send_async(cumulative_counters: nil, gauges: nil, counters: nil)
  process_datapoint('cumulative_counter', cumulative_counters)
  process_datapoint('gauge', gauges)
  process_datapoint('counter', counters)

  return if @async_running

  batch = []
  until @queue.length.zero? || batch.length >= @batch_size
    batch << @queue.shift
  end
  payload = batch_data(batch)

  @async_running = true
  Thread.abort_on_exception = true

  Thread.start do
    begin
      # post invokes the callback on both success and failure paths.
      post(payload, @ingest_endpoint, INGEST_ENDPOINT_SUFFIX) { @async_running = false }
    ensure
      @async_running = false
    end
  end
end
send_event(event_type, event_category: EVENT_CATEGORIES[:USER_DEFINED], dimensions: {}, properties: {}, timestamp: (Time.now.to_i * 1000).to_i) click to toggle source

Send an event to SignalFx.

Args:

event_type (string): the event type (name of the event time series).
event_category (string): the category of the event. Choose one from the EVENT_CATEGORIES list.
dimensions (dict): a map of event dimensions.
properties (dict): a map of extra properties on that event.
timestamp (int64): the event timestamp in milliseconds; defaults to the current time.
# File lib/signalfx/signal_fx_client.rb, line 139
# Builds an event and posts it to the event ingest endpoint.
#
# When an AWS unique id has been loaded it is added to the event's
# dimensions. The caller's +dimensions+ hash is never modified; the id is
# merged into a copy.
#
# NOTE(review): +blank?+ is not core Ruby; presumably it comes from
# ActiveSupport or a project extension loaded elsewhere — confirm.
#
# @param event_type [String] the event type; must not be blank
# @param event_category [String] one of the EVENT_CATEGORIES values; a blank
#   value falls back to EVENT_CATEGORIES[:USER_DEFINED]
# @param dimensions [Hash] event dimensions
# @param properties [Hash] extra properties attached to the event
# @param timestamp [Integer] event time in milliseconds
# @raise [RuntimeError] if +event_type+ is blank or +event_category+ is not
#   one of the EVENT_CATEGORIES values
# @return whatever post returns
def send_event(event_type, event_category: EVENT_CATEGORIES[:USER_DEFINED],
               dimensions: {}, properties: {}, timestamp: (Time.now.to_i * 1000).to_i)
  raise 'Type of event should not be empty!' if event_type.blank?

  event_cat = event_category.blank? ? EVENT_CATEGORIES[:USER_DEFINED] : event_category

  if !event_cat.blank? && !EVENT_CATEGORIES.has_value?(event_cat)
    # Interpolate so a non-String category (e.g. a Symbol) yields this error
    # rather than a TypeError from String#+.
    raise "Unsupported event category: #{event_cat}"
  end

  event_dimensions = dimensions
  if @aws_unique_id
    # Merge into a fresh hash so the caller's object is left untouched.
    event_dimensions = (dimensions || {}).merge(RbConfig::AWS_UNIQUE_ID_DIMENSION_NAME => @aws_unique_id)
  end

  data = {
    category: event_cat,
    eventType: event_type,
    dimensions: event_dimensions,
    properties: properties,
    timestamp: timestamp
  }

  post(build_event(data), @ingest_endpoint, EVENT_ENDPOINT_SUFFIX)
end
signalflow(proxy_url: nil, debug: false) click to toggle source

Create a new SignalFlow client. A single client can execute multiple computations that will be multiplexed over the same WebSocket connection.

@return [SignalFlowClient] a newly instantiated client, configured with the API token and stream endpoint of this client.
# File lib/signalfx/signal_fx_client.rb, line 174
# Creates a SignalFlow client that shares this client's API token and
# stream endpoint.
#
# @param proxy_url [String, nil] proxy to use; when nil, the value of the
#   http_proxy environment variable is used if it is set
# @param debug [Boolean] forwarded to SignalFlowClient
# @return [SignalFlowClient] a new client instance
def signalflow(proxy_url: nil, debug: false)
  env_proxy = ENV["http_proxy"]
  proxy_url = env_proxy if proxy_url.nil? && env_proxy

  SignalFlowClient.new(@api_token, @stream_endpoint, proxy_url: proxy_url, debug: debug)
end

Protected Instance Methods

add_to_queue(metric_type, datapoint) click to toggle source
# File lib/signalfx/signal_fx_client.rb, line 191
# Hook for subclasses: enqueue one datapoint of the given metric type.
# Called by process_datapoint for every datapoint it receives.
#
# @param metric_type [String] 'cumulative_counter', 'gauge' or 'counter'
# @param datapoint [Hash] the datapoint to enqueue
# @raise [RuntimeError] always, in this base implementation
def add_to_queue(metric_type, datapoint)
  raise RuntimeError, 'Subclasses should implement this!'
end
batch_data(data_point_list) click to toggle source
# File lib/signalfx/signal_fx_client.rb, line 195
# Hook for subclasses: turn a list of dequeued datapoints into the payload
# that send / send_async hand to post.
#
# @param data_point_list [Array] datapoints drained from the queue
# @raise [RuntimeError] always, in this base implementation
def batch_data(data_point_list)
  raise RuntimeError, 'Subclasses should implement this!'
end
build_event(event) click to toggle source
# File lib/signalfx/signal_fx_client.rb, line 199
# Hook for subclasses: turn the event hash assembled by send_event into the
# payload handed to post.
#
# @param event [Hash] event with :category, :eventType, :dimensions,
#   :properties and :timestamp keys
# @raise [RuntimeError] always, in this base implementation
def build_event(event)
  raise RuntimeError, 'Subclasses should implement this!'
end
get_queue() click to toggle source
# File lib/signalfx/signal_fx_client.rb, line 183
# Returns the internal datapoint queue (@queue) that send and send_async
# drain.
# NOTE(review): presumably intended for subclasses' add_to_queue
# implementations — confirm against the subclasses.
def get_queue
  @queue
end
header_content_type() click to toggle source
# File lib/signalfx/signal_fx_client.rb, line 187
# Hook for subclasses: the value post sends in the HEADER_CONTENT_TYPE
# request header.
#
# @raise [RuntimeError] always, in this base implementation
def header_content_type
  raise RuntimeError, 'Subclasses should implement this!'
end

Private Instance Methods

post(data_to_send, url, suffix, &block) click to toggle source
# File lib/signalfx/signal_fx_client.rb, line 205
# POSTs a payload to "<url>/<suffix>" with the content-type, API-token and
# User-Agent headers.
#
# The optional block is a completion callback: it receives the response on
# HTTP 200 and nil on any other status or on a request error. Request
# errors are logged rather than raised.
#
# Only StandardError is rescued, so process-level exceptions such as
# Interrupt and SystemExit propagate to the caller instead of being
# swallowed and logged.
#
# @param data_to_send [String] request body
# @param url [String] endpoint base URL
# @param suffix [String] path appended to +url+
# @yieldparam response the response on HTTP 200, otherwise nil
# @return the callback's return value, or nil when no block was given
def post(data_to_send, url, suffix, &block)
  # Library identifier first, then any caller-supplied user agents.
  user_agent = ["#{SignalFx::Version::NAME}/#{SignalFx::Version::VERSION}", *@user_agents].join(', ')

  headers = {
    HEADER_CONTENT_TYPE => header_content_type,
    HEADER_API_TOKEN_KEY => @api_token,
    HEADER_USER_AGENT_KEY => user_agent
  }

  RestClient::Request.execute(
    method: :post,
    url: url + '/' + suffix,
    headers: headers,
    payload: data_to_send,
    verify_ssl: OpenSSL::SSL::VERIFY_PEER,
    timeout: @timeout
  ) do |response|
    case response.code
    when 200
      block.call(response) if block
    else
      @logger.error("Failed to send datapoints. Response code: #{response.code}")
      block.call(nil) if block
    end
  end
rescue StandardError => e
  @logger.error("Failed to send datapoints. Error: #{e}")
  block.call(nil) if block
end
process_datapoint(metric_type, data_points) click to toggle source
# File lib/signalfx/signal_fx_client.rb, line 262
# Hands each datapoint to add_to_queue, adding the AWS unique-id dimension
# when one has been loaded.
#
# The caller's datapoints are not modified: the AWS dimension is added to a
# shallow copy, so a datapoint hash reused across several send calls does
# not accumulate duplicate AWS dimensions.
#
# @param metric_type [String] 'cumulative_counter', 'gauge' or 'counter'
# @param data_points [Array<Hash>, nil] datapoints; anything that is not an
#   Array is ignored
# @return [Array, nil] +data_points+, or nil when it was ignored
def process_datapoint(metric_type, data_points)
  return unless data_points.kind_of?(Array)

  data_points.each do |datapoint|
    if @aws_unique_id
      aws_dimension = { :key => RbConfig::AWS_UNIQUE_ID_DIMENSION_NAME, :value => @aws_unique_id }
      # Copy before decorating; Array#+ also builds a new dimensions list
      # instead of appending to the caller's.
      datapoint = datapoint.dup
      datapoint[:dimensions] = (datapoint[:dimensions] || []) + [aws_dimension]
    end

    add_to_queue(metric_type, datapoint)
  end
end
retrieve_aws_unique_id(&block) click to toggle source
# File lib/signalfx/signal_fx_client.rb, line 243
# Requests the document at RbConfig::AWS_UNIQUE_ID_URL (one-second timeout)
# and reports the outcome to the given block.
#
# The block receives the response on HTTP 200 and nil on any other status
# or on a request error. Failures are logged as warnings, not raised.
#
# Only StandardError is rescued, so process-level exceptions such as
# Interrupt and SystemExit propagate instead of being swallowed.
#
# @yieldparam response the HTTP response on success, otherwise nil
# @return the block's return value
def retrieve_aws_unique_id(&block)
  RestClient::Request.execute(method: :get,
                              url: RbConfig::AWS_UNIQUE_ID_URL,
                              timeout: 1) do |response|
    case response.code
    when 200
      # `return` inside the block exits this method with the callback's result.
      return block.call(response)
    else
      @logger.warn("Failed to retrieve AWS unique ID. Response code: #{response.code}")
      return block.call(nil)
    end
  end
rescue StandardError => e
  @logger.warn("Failed to retrieve AWS unique ID. Error: #{e}")
  block.call(nil)
end