class Druid::DataSource
Attributes
dimensions[R]
metrics[R]
name[R]
uri[R]
Public Class Methods
new(name, uri)
click to toggle source
# File lib/druid/data_source.rb, line 9 def initialize(name, uri) @name = name.split('/').last uri = uri.sample if uri.is_a?(Array) if uri.is_a?(String) @uri = URI(uri) else @uri = uri end end
Public Instance Methods
metadata()
click to toggle source
# File lib/druid/data_source.rb, line 19 def metadata @metadata ||= metadata! end
metadata!(opts = {})
click to toggle source
# File lib/druid/data_source.rb, line 23 def metadata!(opts = {}) meta_path = "#{@uri.path}datasources/#{name}" if opts[:interval] from, to = opts[:interval] from = from.respond_to?(:iso8601) ? from.iso8601 : ISO8601::DateTime.new(from).to_s to = to.respond_to?(:iso8601) ? to.iso8601 : ISO8601::DateTime.new(to).to_s meta_path += "?interval=#{from}/#{to}" end req = Net::HTTP::Get.new(meta_path) response = Net::HTTP.new(uri.host, uri.port).start do |http| http.open_timeout = 10 # if druid is down fail fast http.read_timeout = nil # we wait until druid is finished http.request(req) end if response.code != '200' raise "Request failed: #{response.code}: #{response.body}" end MultiJson.load(response.body) end
post(query)
click to toggle source
# File lib/druid/data_source.rb, line 56 def post(query) query = query.query if query.is_a?(Druid::Query::Builder) query = Query.new(MultiJson.load(query)) if query.is_a?(String) query.dataSource = name req = Net::HTTP::Post.new(uri.path, { 'Content-Type' => 'application/json' }) query_as_json = query.as_json req.body = MultiJson.dump(query_as_json) response = ActiveSupport::Notifications.instrument('post.druid', data_source: name, query: query_as_json) do Net::HTTP.new(uri.host, uri.port).start do |http| http.open_timeout = 10 # if druid is down fail fast http.read_timeout = nil # we wait until druid is finished http.request(req) end end if response.code != '200' # ignore GroupBy cache issues and try again without cached results if query.context.useCache != false && response.code == "500" && response.body =~ /Cannot have a null result!/ query.context.useCache = false return self.post(query) end raise Error.new(response) end MultiJson.load(response.body) end