class FFWD::Plugin::GoogleCloud::Hook
Constants
- HEADER_BASE
Public Class Methods
new(c)
click to toggle source
# File lib/ffwd/plugin/google_cloud/hook.rb, line 32
# Build a hook from the configuration hash +c+.
#
# +c+ is read with the symbol keys :api_url, :metadata_url, :project_id,
# :project, :client_id, :scope and :debug. No connection is opened here;
# the pipelines stay nil until #connect assigns them.
def initialize(c)
  # Settings copied straight from the configuration.
  @api_url      = c[:api_url]
  @metadata_url = c[:metadata_url]
  @project_id   = c[:project_id]
  @project      = c[:project]
  @client_id    = c[:client_id]
  @scope        = c[:scope]
  @debug        = c[:debug]

  # Connection handles and token state start out empty.
  @api_pipeline = @metadata_pipeline = nil
  @token = @expires_at = nil

  # Queue of [block, proxy] pairs waiting for a token (filled by #with_token).
  @pending = []

  # Request paths; the two API paths embed the configured project id.
  @write_api = "/cloudmonitoring/v2beta2/projects/#{@project_id}/timeseries:write"
  @md_api = "/cloudmonitoring/v2beta2/projects/#{@project_id}/metricDescriptors"
  @acquire = "/0.1/meta-data/service-accounts/default/acquire"

  # Added to Time.now in #with_token before comparing against @expires_at,
  # so a token is treated as stale slightly before it actually expires.
  @expire_threshold = 10

  # Metric descriptor names already known to exist (name => true).
  @md_cache = {}
end
Public Instance Methods
active?()
click to toggle source
# File lib/ffwd/plugin/google_cloud/hook.rb, line 103
# True only while both the API pipeline and the metadata pipeline are set,
# i.e. between #connect and #close.
def active?
  !(@api_pipeline.nil? || @metadata_pipeline.nil?)
end
api_request()
click to toggle source
Avoids pipelining requests by creating a new request handle for each one.
# File lib/ffwd/plugin/google_cloud/hook.rb, line 177
# Create a new request handle for @api_url.
#
# With @debug set, a DebugRequest is returned instead of a real
# EM::HttpRequest. It is given canned responses for the two calls this
# class makes against the API: GET on the descriptor path and POST on the
# write path.
def api_request
  return EM::HttpRequest.new(@api_url) unless @debug

  canned = {
    [:get, @md_api] => {"metrics" => []},
    [:post, @write_api] => {}
  }

  DebugRequest.new(@api_url, :responses => canned)
end
bind_proxy(http, p)
click to toggle source
# File lib/ffwd/plugin/google_cloud/hook.rb, line 236
# Forward the outcome of +http+ to the proxy +p+ and return +p+.
#
# A transport error fails the proxy with the request's error text. A
# completed request succeeds the proxy only on status 200; any other
# status fails it with "<status>: <response body>".
def bind_proxy(http, p)
  http.errback { p.err "#{http.error}" }

  http.callback do
    status = http.response_header.status
    next p.call if status == 200

    p.err "#{status}: #{http.response}"
  end

  p
end
close()
click to toggle source
# File lib/ffwd/plugin/google_cloud/hook.rb, line 194
# Close whichever pipelines are open (metadata first, then API) and forget
# both, after which #active? reports false. Safe to call when nothing is
# connected. Returns nil.
def close
  [@metadata_pipeline, @api_pipeline].each do |pipeline|
    pipeline.close if pipeline
  end

  @metadata_pipeline = @api_pipeline = nil
end
connect()
click to toggle source
# File lib/ffwd/plugin/google_cloud/hook.rb, line 188
# Create the metadata and API request handles, then start filling the
# descriptor cache. The cache download uses both handles, so it has to
# come last. Returns whatever populate_metadata_cache returns.
def connect
  @metadata_pipeline = metadata_request()
  @api_pipeline = api_request()

  populate_metadata_cache()
end
metadata_request()
click to toggle source
# File lib/ffwd/plugin/google_cloud/hook.rb, line 167
# Create a new request handle for @metadata_url.
#
# With @debug set, a DebugRequest is returned instead of a real
# EM::HttpRequest. Its canned response is a token named "FAKE" whose
# expiry is one hour after the handle is created.
def metadata_request
  return EM::HttpRequest.new(@metadata_url) unless @debug

  fake_token = {
    :accessToken => "FAKE",
    :expiresAt => (Time.now.to_i + 3600)
  }

  DebugRequest.new(@metadata_url, :response => fake_token)
end
new_proxy(http)
click to toggle source
Set up a new proxy object for an HTTP request. This ensures that the callback/errback/error API is consistent with what flushing_output expects.
# File lib/ffwd/plugin/google_cloud/hook.rb, line 232
# Wrap +http+ in a fresh CallbackProxy (wired up by #bind_proxy) and return
# that proxy.
def new_proxy(http)
  proxy = CallbackProxy.new
  bind_proxy(http, proxy)
end
reporter_meta()
click to toggle source
# File lib/ffwd/plugin/google_cloud/hook.rb, line 252
# Metadata hash identifying this component as :google_cloud.
def reporter_meta
  { component: :google_cloud }
end
send(metrics)
click to toggle source
# File lib/ffwd/plugin/google_cloud/hook.rb, line 202
# Write +metrics+ to the timeseries API.
#
# The metrics are converted with Utils.make_timeseries and
# Utils.make_common_labels, serialized to JSON once, and posted to
# @write_api after #verify_descriptors has run and a token is available.
# Returns whatever #verify_descriptors returns.
#
# NOTE: inside this class the method name shadows Object#send.
def send(metrics)
  dropped, timeseries = Utils.make_timeseries(metrics)
  common_labels = Utils.make_common_labels(metrics)

  log.warning "Dropped #{dropped} points (duplicate entries)" if dropped > 0

  request = {:commonLabels => common_labels, :timeseries => timeseries}
  payload = JSON.dump(request)

  verify_descriptors(request) do
    with_token do |token|
      # Copy the shared base headers before adding the per-request token.
      head = Hash[HEADER_BASE]
      head['Authorization'] = "Bearer #{token}"

      log.debug "Sending: #{payload}" if log.debug?

      http = api_request.post(
        :path => @write_api, :head => head, :body => payload)
      new_proxy http
    end
  end
end
verify_descriptors(metrics, &block)
click to toggle source
# File lib/ffwd/plugin/google_cloud/hook.rb, line 107
# Make sure a metric descriptor exists for every timeseries in +metrics+,
# then run +block+.
#
# +metrics+ is the request hash built by #send; each entry of
# metrics[:timeseries] carries a :timeseriesDesc with :metric and :labels.
# If every metric name is already in @md_cache the block is called
# immediately and its result returned. Otherwise one create request is
# posted per missing descriptor, the block is called from the combined
# callback, and a SingleProxy wrapping the combined EM::All is returned.
def verify_descriptors(metrics, &block)
  descs = metrics[:timeseries].map { |ts| ts[:timeseriesDesc] }
  missing = descs.reject { |desc| @md_cache[desc[:metric]] }

  return block.call if missing.empty?

  all = EM::All.new

  # Each entry looks like:
  #   {:metric=>"custom.cloudmonitoring.googleapis.com/lol", :labels=>{}}
  missing.each do |m|
    name = m[:metric]

    spec = {
      :name => name,
      :description => "",
      :labels => Utils.extract_labels(m[:labels]),
      :typeDescriptor => {:metricType => "gauge", :valueType => "double"}
    }
    payload = JSON.dump(spec)

    log.info "Creating descriptor for: #{name}"

    creation = with_token do |token|
      head = Hash[HEADER_BASE]
      head['Authorization'] = "Bearer #{token}"

      p = new_proxy(api_request.post(
        :path => @md_api, :head => head, :body => payload))

      # Only remember the descriptor once the API confirmed it.
      p.callback do
        log.info "Created (caching): #{name}"
        @md_cache[name] = true
      end

      p.errback do |error|
        log.error "Failed to create descriptor (#{error}): #{payload}"
      end

      p
    end

    all << creation
  end

  # TODO (carried over from the original): call block _after_ descriptors
  # have been verified.
  all.callback { block.call }

  all.errback do |errors|
    log.error "Failed to create descriptors: #{errors}"
  end

  SingleProxy.new all
end
with_token(&block)
click to toggle source
# File lib/ffwd/plugin/google_cloud/hook.rb, line 57
# Run +block+ with a valid access token.
#
# * If a token request is already in flight, the block is queued behind it
#   and a CallbackProxy for its eventual result is returned.
# * If a cached token is still valid for more than @expire_threshold
#   seconds, the block is called right away and its result returned.
# * Otherwise a token is requested from the metadata pipeline, the block is
#   queued, and a CallbackProxy is returned. When the token arrives every
#   queued block is called with it and its result is chained into the
#   corresponding proxy; when the request fails every queued proxy is failed.
#
# The queue is always emptied before waiters are notified, and a token
# response that cannot be parsed (invalid JSON, missing 'accessToken' or
# 'expiresAt') fails the waiters instead of raising out of the callback.
# Either way a bad response cannot leave @pending non-empty, which would
# make every later call queue forever without requesting a new token.
def with_token &block
  # join a pending call
  unless @pending.empty?
    proxy = CallbackProxy.new
    @pending << [block, proxy]
    return proxy
  end

  # cached, valid token
  if @token && Time.now + @expire_threshold < @expires_at
    return block.call(@token)
  end

  current_p = CallbackProxy.new
  @pending << [block, current_p]

  log.debug "Requesting token"

  http = @metadata_pipeline.get(
    :path => @acquire,
    :query => {:client_id => @client_id, :scope => @scope})

  token_p = new_proxy(http)

  # Detach the queue first so it is empty even if a waiter raises.
  fail_waiters = lambda do |message|
    waiting, @pending = @pending, []
    waiting.each { |_, block_p| block_p.err message }
  end

  token_p.errback do
    fail_waiters.call "Token request failed: #{token_p.error}"
  end

  token_p.callback do
    begin
      result = JSON.load(http.response)
      token = result.fetch('accessToken')
      expires_at = Time.at(result.fetch('expiresAt'))
    rescue StandardError => e
      fail_waiters.call(
        "Token request failed: invalid response (#{e.class}: #{e.message})")
      next
    end

    @token = token
    @expires_at = expires_at

    # NOTE(review): this writes the bearer token to the debug log; confirm
    # that is acceptable for deployments that enable debug logging.
    log.debug "Got token: #{@token} (expires_at: #{@expires_at})"

    waiting, @pending = @pending, []
    waiting.each do |b, block_p|
      b.call(@token).into block_p
    end
  end

  return current_p
end
Private Instance Methods
populate_metadata_cache()
click to toggle source
# File lib/ffwd/plugin/google_cloud/hook.rb, line 258
# Download the existing metric descriptors and record their names in
# @md_cache, so #verify_descriptors does not try to create them again.
#
# Returns whatever #with_token returns. The download is best effort: a
# failed request, a body that is not valid JSON, or a body without a
# "metrics" list is logged (or treated as zero descriptors) and leaves the
# cache as it was instead of raising inside the callback.
def populate_metadata_cache
  with_token do |token|
    log.info "Downloading metric descriptors"

    # Copy the shared base headers before adding the per-request token.
    head = Hash[HEADER_BASE]
    head['Authorization'] = "Bearer #{token}"

    req = @api_pipeline.get(:path => @md_api, :head => head)

    p = new_proxy(req)

    p.callback do
      begin
        res = JSON.load(req.response)
      rescue JSON::ParserError => e
        log.error "Failed to download metric descriptors: #{e.message}"
        next
      end

      # The "metrics" key may be absent when nothing is listed, and the body
      # may not be a JSON object at all; both count as an empty list.
      descriptors = res.is_a?(Hash) ? (res["metrics"] || []) : []

      log.info "Downloaded #{descriptors.length} descriptor(s)"

      descriptors.each do |d|
        @md_cache[d["name"]] = true
      end
    end

    p.errback do |error|
      log.error "Failed to download metric descriptors: #{error}"
    end

    p
  end
end