class FFWD::Plugin::GoogleCloud::Hook

Constants

HEADER_BASE

Public Class Methods

new(c) click to toggle source
# File lib/ffwd/plugin/google_cloud/hook.rb, line 32
def initialize c
  @api_url = c[:api_url]
  @metadata_url = c[:metadata_url]
  @project_id = c[:project_id]
  @project = c[:project]
  @client_id = c[:client_id]
  @scope = c[:scope]
  @debug = c[:debug]

  @api_pipeline = nil
  @metadata_pipeline = nil
  @token = nil
  @expires_at = nil
  # list of blocks waiting for a token.
  @pending = []

  @write_api = "/cloudmonitoring/v2beta2/projects/#{@project_id}/timeseries:write"
  @md_api = "/cloudmonitoring/v2beta2/projects/#{@project_id}/metricDescriptors"
  @acquire = "/0.1/meta-data/service-accounts/default/acquire"
  @expire_threshold = 10

  # cache of seen metric descriptors
  @md_cache = {}
end

Public Instance Methods

active?() click to toggle source
# File lib/ffwd/plugin/google_cloud/hook.rb, line 103
def active?
  not @api_pipeline.nil? and not @metadata_pipeline.nil?
end
api_request() click to toggle source

avoid pipelining requests by creating a new handle for each

# File lib/ffwd/plugin/google_cloud/hook.rb, line 177
def api_request
  if @debug
    return DebugRequest.new(@api_url, :responses => {
      [:get, @md_api] => {"metrics" => []},
      [:post, @write_api] => {}
    })
  end

  EM::HttpRequest.new(@api_url)
end
bind_proxy(http, p) click to toggle source
# File lib/ffwd/plugin/google_cloud/hook.rb, line 236
def bind_proxy http, p
  http.errback do
    p.err "#{http.error}"
  end

  http.callback do
    if http.response_header.status == 200
      p.call
    else
      p.err "#{http.response_header.status}: #{http.response}"
    end
  end

  p
end
close() click to toggle source
# File lib/ffwd/plugin/google_cloud/hook.rb, line 194
def close
  @metadata_pipeline.close if @metadata_pipeline
  @api_pipeline.close if @api_pipeline

  @metadata_pipeline = nil
  @api_pipeline = nil
end
connect() click to toggle source
# File lib/ffwd/plugin/google_cloud/hook.rb, line 188
def connect
  @metadata_pipeline = metadata_request
  @api_pipeline = api_request
  populate_metadata_cache
end
metadata_request() click to toggle source
# File lib/ffwd/plugin/google_cloud/hook.rb, line 167
def metadata_request
  if @debug
    return DebugRequest.new(@metadata_url, :response => {
      :accessToken => "FAKE", :expiresAt => (Time.now.to_i + 3600)})
  end

  EM::HttpRequest.new(@metadata_url)
end
new_proxy(http) click to toggle source

Setup a new proxy object for http request. This makes sure that the callback/errback/error api as is expected by flushing_output to be consistent.

# File lib/ffwd/plugin/google_cloud/hook.rb, line 232
def new_proxy http
  bind_proxy http, CallbackProxy.new
end
reporter_meta() click to toggle source
# File lib/ffwd/plugin/google_cloud/hook.rb, line 252
def reporter_meta
  {:component => :google_cloud}
end
send(metrics) click to toggle source
# File lib/ffwd/plugin/google_cloud/hook.rb, line 202
def send metrics
  dropped, timeseries = Utils.make_timeseries(metrics)

  common_labels = Utils.make_common_labels(metrics)

  if dropped > 0
    log.warning "Dropped #{dropped} points (duplicate entries)"
  end

  request = {:commonLabels => common_labels, :timeseries => timeseries}
  metrics = JSON.dump(request)

  verify_descriptors(request) do
    with_token do |token|
      head = Hash[HEADER_BASE]
      head['Authorization'] = "Bearer #{token}"

      if log.debug?
        log.debug "Sending: #{metrics}"
      end

      new_proxy api_request.post(
        :path => @write_api, :head => head, :body => metrics)
    end
  end
end
verify_descriptors(metrics, &block) click to toggle source
# File lib/ffwd/plugin/google_cloud/hook.rb, line 107
def verify_descriptors(metrics, &block)
  missing = []

  metrics[:timeseries].each do |ts|
    desc = ts[:timeseriesDesc]
    name = desc[:metric]
    missing << desc unless @md_cache[name]
  end

  if missing.empty?
    return block.call
  end

  all = EM::All.new

  # Example: {:metric=>"custom.cloudmonitoring.googleapis.com/lol", :labels=>{}}
  missing.each do |m|
    name = m[:metric]

    descriptor = {
     :name => m[:metric], :description => "",
     :labels => Utils.extract_labels(m[:labels]),
     :typeDescriptor => {:metricType => "gauge", :valueType => "double"}
    }

    descriptor = JSON.dump(descriptor)

    log.info "Creating descriptor for: #{name}"

    all << with_token{|token|
      head = Hash[HEADER_BASE]
      head['Authorization'] = "Bearer #{token}"
      p = new_proxy(api_request.post(
        :path => @md_api, :head => head, :body => descriptor))

      p.callback do
        log.info "Created (caching): #{name}"
        @md_cache[name] = true
      end

      p.errback do |error|
        log.error "Failed to create descriptor (#{error}): #{descriptor}"
      end

      p
    }
  end

  # TODO: call block _after_ descriptors have been verified.
  all.callback do
    block.call
  end

  all.errback do |errors|
    log.error "Failed to create descriptors: #{errors}"
  end

  SingleProxy.new all
end
with_token(&block) click to toggle source
# File lib/ffwd/plugin/google_cloud/hook.rb, line 57
def with_token &block
  # join a pending call
  unless @pending.empty?
    proxy = CallbackProxy.new
    @pending << [block, proxy]
    return proxy
  end

  # cached, valid token
  if @token and Time.now + @expire_threshold < @expires_at
    return block.call(@token)
  end

  current_p = CallbackProxy.new
  @pending << [block, current_p]

  log.debug "Requesting token"

  http = @metadata_pipeline.get(
    :path => @acquire,
    :query => {:client_id => @client_id, :scope => @scope})

  token_p = new_proxy(http)

  token_p.errback do
    @pending.each do |b, block_p|
      block_p.err "Token request failed: #{token_p.error}"
    end.clear
  end

  token_p.callback do
    result = JSON.load(http.response)

    @token = result['accessToken']
    @expires_at = Time.at(result['expiresAt'])

    log.debug "Got token: #{@token} (expires_at: #{@expires_at}}"

    @pending.each do |b, block_p|
      b.call(@token).into block_p
    end.clear
  end

  return current_p
end

Private Instance Methods

populate_metadata_cache() click to toggle source
# File lib/ffwd/plugin/google_cloud/hook.rb, line 258
def populate_metadata_cache
  with_token do |token|
    log.info "Downloading metric descriptors"

    head = Hash[HEADER_BASE]
    head['Authorization'] = "Bearer #{token}"
    req = @api_pipeline.get(:path => @md_api, :head => head)
    p = new_proxy(req)

    p.callback do
      res = JSON.load(req.response)

      log.info "Downloaded #{res["metrics"].length} descriptor(s)"

      res["metrics"].each do |d|
        @md_cache[d["name"]] = true
      end
    end

    p.errback do |error|
      log.error "Failed to download metric descriptors: #{error}"
    end

    p
  end
end