class DruidDB::Query

Attributes

aggregations[R]
broker[R]
dimensions[R]
end_interval[R]
fill_value[R]
granularity[R]
query_opts[R]
query_type[R]
range[R]
result_key[R]
start_interval[R]

Public Class Methods

new(opts) click to toggle source
# File lib/druiddb/query.rb, line 15
# Captures the query options and precomputes the values derived from them.
#
# opts - Hash of query options. The keys read here are :aggregations (a list
#        of hashes, each with a :name), :broker, :dimensions, :fill_value,
#        :granularity, :intervals and :queryType.
def initialize(opts)
  # Values copied straight from the options.
  @broker      = opts[:broker]
  @query_type  = opts[:queryType]
  @dimensions  = opts[:dimensions]
  @granularity = opts[:granularity]
  @fill_value  = opts[:fill_value]

  # Only the aggregation names are kept.
  @aggregations = opts[:aggregations].map { |aggregation| aggregation[:name] }

  # Derived values. The range (and the granularity above) must be assigned
  # before the interval bounds are calculated from them.
  @range          = parse_range(opts[:intervals])
  @end_interval   = calculate_end_interval
  @start_interval = calculate_start_interval
  @query_opts     = opts_for_query(opts)
end

Private Class Methods

create(opts) click to toggle source
# File lib/druiddb/query.rb, line 176
# Builds a query from the given options and runs it right away.
#
# opts - options passed unchanged to `new`.
#
# Returns whatever the new query's `execute` returns.
def create(opts)
  query = new(opts)
  query.execute
end

Public Instance Methods

execute() click to toggle source
# File lib/druiddb/query.rb, line 28
# Sends the prepared query options to the broker and passes the broker's
# response through `fill_query_results`.
#
# Returns the value produced by `fill_query_results`.
def execute
  fill_query_results(broker.query(query_opts))
end

Private Instance Methods

advance_interval(time) click to toggle source

TODO: Can this be made smarter? Prefer to avoid case statements. Cases found here: druid.io/docs/latest/querying/granularities.html

# File lib/druiddb/query.rb, line 37
# Maps each supported granularity string to the [unit, amount] pair that is
# handed to `time.advance` to step to the next bucket.
ADVANCE_STEP_BY_GRANULARITY = {
  'second' => [:seconds, 1],
  'minute' => [:minutes, 1],
  'fifteen_minute' => [:minutes, 15],
  'thirty_minute' => [:minutes, 30],
  'hour' => [:hours, 1],
  'day' => [:days, 1],
  'week' => [:weeks, 1],
  'month' => [:months, 1],
  'quarter' => [:months, 3],
  'year' => [:years, 1]
}.freeze

# Moves `time` forward by one bucket of the query's granularity.
#
# time - object responding to `advance` with an options hash.
#
# Returns the result of `time.advance`.
# Raises DruidDB::QueryError when the granularity is not in the table above.
def advance_interval(time)
  unit, amount = ADVANCE_STEP_BY_GRANULARITY[granularity]
  raise DruidDB::QueryError, 'Unsupported granularity' unless unit

  # A fresh options hash is built per call so the shared table is never
  # handed to (and possibly modified by) `advance`.
  time.advance(unit => amount)
end
calculate_end_interval() click to toggle source
# File lib/druiddb/query.rb, line 64
# Derives the end bound of the query from its range by delegating to
# `iso8601_duration_end_interval`. Unlike the start bound, the result is not
# passed through `start_of_interval`.
def calculate_end_interval
  interval_string = range
  iso8601_duration_end_interval(interval_string)
end
calculate_start_interval() click to toggle source
# File lib/druiddb/query.rb, line 68
# Derives the start bound of the query: the start of the range, passed
# through `start_of_interval` so it is adjusted for the query's granularity.
def calculate_start_interval
  start_of_interval(iso8601_duration_start_interval(range))
end
fill_empty_intervals(points, opts = {}) click to toggle source
# File lib/druiddb/query.rb, line 73
# Walks every granularity bucket from `start_interval` up to (but not
# including) `end_interval` and returns one point per bucket.
#
# Existing points are reused and modified in place: any aggregation whose
# value is blank is set to `fill_value`. Buckets with no matching point get
# a newly created point from `find_or_create_point`.
#
# points - Array of result points to search for existing buckets.
# opts   - Hash merged into every point's result hash (used by the caller to
#          stamp the group-by dimension value onto filled points).
#
# Returns an Array of points in ascending bucket order.
def fill_empty_intervals(points, opts = {})
  filled = []
  interval = start_interval

  # Druid query intervals are half-open (start inclusive, end exclusive), so
  # a bucket that begins exactly at the end bound is not part of the query
  # and must not be fabricated.
  while interval < end_interval
    # TODO: This will search the points every time, could be more performant if
    # we track the 'current point' in the points and only compare the
    # current point's timestamp
    point = find_or_create_point(interval, points)
    values = point[result_key]

    aggregations.each do |aggregation|
      values[aggregation] = fill_value if values[aggregation].blank?
    end

    # Merged once per point, outside the aggregation loop, so the extra
    # attributes are applied even when there are no aggregations.
    values.merge!(opts)

    filled << point
    interval = advance_interval(interval)
  end

  filled
end
fill_query_results(query_result) click to toggle source

NOTE: This responsibility really lies in Druid, but until the feature works reliably in Druid, this serves the purpose. github.com/druid-io/druid/issues/2106

# File lib/druiddb/query.rb, line 97
# Fills the gaps in a broker response so every bucket has a point.
#
# The response is returned untouched when it is blank or when no fill value
# is configured. Otherwise the result key is detected from the first point
# and the points are filled; for group-by queries they are filled separately
# per value of the first dimension.
#
# query_result - the broker response (a collection of points).
#
# Returns the untouched response, or an Array of filled points.
def fill_query_results(query_result)
  return query_result if query_result.blank? || fill_value.blank?

  parse_result_key(query_result.first)
  return fill_empty_intervals(query_result) unless group_by?

  # TODO: handle multi-dimensional group by
  dimension_key = dimensions.first
  by_dimension = query_result.group_by { |point| point[result_key][dimension_key] }
  by_dimension.flat_map do |dimension_value, dimension_points|
    fill_empty_intervals(dimension_points, dimension_key => dimension_value)
  end
end
find_or_create_point(interval, points) click to toggle source
# File lib/druiddb/query.rb, line 115
# Looks up the point whose 'timestamp' matches the given bucket time.
#
# interval - the bucket time being searched for.
# points   - collection of points, each exposing a 'timestamp' entry.
#
# Returns the first matching point, or a new point for the bucket with a
# millisecond-precision ISO 8601 timestamp and an empty result hash.
def find_or_create_point(interval, points)
  target = interval.to_time
  existing = points.find { |candidate| candidate['timestamp'].to_s.to_time == target }
  return existing if existing.present?

  { 'timestamp' => interval.iso8601(3), result_key => {} }
end
group_by?() click to toggle source
# File lib/druiddb/query.rb, line 120
# Tells whether this query's type is exactly the string 'groupBy'.
def group_by?
  type = query_type
  type == 'groupBy'
end
iso8601_duration_end_interval(duration) click to toggle source
# File lib/druiddb/query.rb, line 128
# Extracts the end of a 'start/end' interval string.
#
# duration - String whose parts are separated by '/'.
#
# Returns the last part converted with `to_time` and switched to UTC.
def iso8601_duration_end_interval(duration)
  end_part = duration.split('/')[-1]
  end_part.to_time.utc
end
iso8601_duration_start_interval(duration) click to toggle source
# File lib/druiddb/query.rb, line 124
# Extracts the start of a 'start/end' interval string.
#
# duration - String whose parts are separated by '/'.
#
# Returns the first part converted with `to_time` and switched to UTC.
def iso8601_duration_start_interval(duration)
  start_part = duration.split('/')[0]
  start_part.to_time.utc
end
opts_for_query(opts) click to toggle source
# File lib/druiddb/query.rb, line 132
# Produces the options that are sent to the broker: a copy of `opts`
# without the :fill_value and :broker entries.
#
# opts - the options given to the query.
def opts_for_query(opts)
  internal_keys = %i[fill_value broker]
  opts.except(*internal_keys)
end
parse_range(range) click to toggle source
# File lib/druiddb/query.rb, line 136
# Normalizes the intervals option to a single value.
#
# range - either a single value or an Array of them.
#
# Returns the first element when given an Array, otherwise the value itself.
def parse_range(range)
  case range
  when Array then range.first
  else range
  end
end
parse_result_key(point) click to toggle source
# File lib/druiddb/query.rb, line 140
# Detects which key of a point holds its values and remembers the answer in
# @result_key.
#
# point - a result point; 'event' is chosen when the point has a present
#         'event' entry, 'result' otherwise.
#
# Returns the chosen key.
def parse_result_key(point)
  @result_key =
    if point['event'].present?
      'event'
    else
      'result'
    end
end
start_of_interval(time) click to toggle source

TODO: Can this be made smarter? Prefer to avoid case statements. Cases found here: druid.io/docs/latest/querying/granularities.html

# File lib/druiddb/query.rb, line 146
# Rounds `time` down to the start of the bucket it falls in for the query's
# granularity.
#
# time - the time to round down.
#
# Returns the adjusted time, or `time` itself when the granularity is not
# one of the strings handled below.
def start_of_interval(time)
  case granularity
  when 'second'         then time.change(usec: 0)
  when 'minute'         then time.beginning_of_minute
  # For the sub-hour buckets, drop the remainder to land on the bucket start
  # (0/15/30/45 and 0/30 respectively).
  when 'fifteen_minute' then time.change(min: time.min - (time.min % 15))
  when 'thirty_minute'  then time.change(min: time.min - (time.min % 30))
  when 'hour'           then time.beginning_of_hour
  when 'day'            then time.beginning_of_day
  when 'week'           then time.beginning_of_week
  when 'month'          then time.beginning_of_month
  when 'quarter'        then time.beginning_of_quarter
  when 'year'           then time.beginning_of_year
  else time
  end
end