class Fluent::MetricSenseOutput::Backends::RDBTSDBBackend
Constants
- INSERT_SUPPRESS_RING_BUFFER_SIZE
- ROW_TIME_WINDOW
# Shutdown hook. The body is empty: no cleanup is performed here.
#
# @return [nil]
def shutdown; end
- ROW_TIME_WINDOW_BITS
- ROW_TIME_WINDOW_MASK
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/backends/rdb_tsdb_backend.rb, line 29
# Sets up a new backend instance.
#
# Runs the superclass initializer, loads the Sequel library (required at
# construction time rather than at file load), and resets the
# insert-suppression ring to an empty list with its cursor at index 0.
def initialize
  super
  require 'sequel'
  @insup_ring, @insup_ring_index = [], 0
end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/backends/rdb_tsdb_backend.rb, line 36
# Applies the plugin configuration and derives everything that depends on it:
# the read URL fallback, the table/view names built from @rdb_table_prefix,
# and the SQL dialect settings selected from the scheme of @rdb_url.
#
# @param conf the configuration object, forwarded unchanged to the superclass
# @return [Boolean] the value assigned to @sql_insert_returns_last_id
def configure(conf)
  super

  # Reads fall back to the write URL unless a read URL was already set.
  @rdb_read_url ||= @rdb_url

  prefix = @rdb_table_prefix
  @metric_tag_table    = "#{prefix}_metric_tags"
  @segment_value_table = "#{prefix}_segment_values"
  @data_table          = "#{prefix}_data"
  @metric_view         = "#{prefix}_metrics"
  @metric_json_view    = "#{prefix}_json"

  @sql_type =
    case @rdb_url
    when /^mysql/i    then :mysql
    when /^postgres/i then :postgresql
    when /^sqlite/i   then :sqlite
    else :unknown
    end

  # Per-dialect DDL/DML fragments. Unrecognised URLs get MySQL-style
  # fragments but are not assumed to return the last insert id.
  # (The @sql_concat / @sql_surround helpers of the original are disabled.)
  dialects = {
    :mysql => {
      :autoincr_type => "INT", :autoincr_ref_type => "INT",
      :autoincr_suffix => " AUTO_INCREMENT",
      :value_type => "SMALLINT", :name_type => "VARCHAR(255)", :time_type => "INT",
      :insert_ignore => "INSERT IGNORE", :insert_returns_last_id => true
    },
    :postgresql => {
      :autoincr_type => "SERIAL", :autoincr_ref_type => "INT",
      :autoincr_suffix => "",
      :value_type => "SMALLINT", :name_type => "VARCHAR(255)", :time_type => "INT",
      :insert_ignore => "INSERT", :insert_returns_last_id => false
    },
    :sqlite => {
      :autoincr_type => "INTEGER", :autoincr_ref_type => "INTEGER",
      :autoincr_suffix => " AUTOINCREMENT",
      :value_type => "INTEGER", :name_type => "TEXT", :time_type => "INTEGER",
      :insert_ignore => "INSERT OR IGNORE", :insert_returns_last_id => false
    },
    :unknown => {
      :autoincr_type => "INT", :autoincr_ref_type => "INT",
      :autoincr_suffix => " AUTO_INCREMENT",
      :value_type => "SMALLINT", :name_type => "VARCHAR(255)", :time_type => "INT",
      :insert_ignore => "INSERT IGNORE", :insert_returns_last_id => false
    }
  }
  dialect = dialects[@sql_type]

  @sql_autoincr_type          = dialect[:autoincr_type]
  @sql_autoincr_ref_type      = dialect[:autoincr_ref_type]
  @sql_autoincr_suffix        = dialect[:autoincr_suffix]
  @sql_value_type             = dialect[:value_type]
  @sql_name_type              = dialect[:name_type]
  @sql_time_type              = dialect[:time_type]
  @sql_insert_ignore          = dialect[:insert_ignore]
  @sql_insert_returns_last_id = dialect[:insert_returns_last_id]
end
ensure_connect(&block)
click to toggle source
# File lib/fluent/plugin/backends/rdb_tsdb_backend.rb, line 296
# Opens a Sequel connection to @rdb_url (limited to one pooled connection),
# hands it to the given block, and disconnects afterwards whether the block
# returns normally or raises.
#
# @yieldparam connection the object returned by Sequel.connect
# @return whatever the block returns
def ensure_connect(&block)
  connection = Sequel.connect(@rdb_url, :max_connections => 1)
  begin
    block.call(connection)
  ensure
    connection.disconnect
  end
end
get_metric_id(db, tag, seg_key)
click to toggle source
# File lib/fluent/plugin/backends/rdb_tsdb_backend.rb, line 252
# Returns the id of the (metric name, segment name) pair, inserting a row
# into the metric tag table when the pair is not in the in-memory cache.
#
# The INSERT is executed with Dataset#insert so that, on databases flagged
# with @sql_insert_returns_last_id, the value cached is the generated key.
# (Dataset#update would return the number of affected rows instead.)
#
# @param db the database handle; must support db[sql, *args]
# @param tag the metric name
# @param seg_key the segment name, or nil (stored as an empty string)
# @return the metric id, or nil if it cannot be found even after a reload
# @raise re-raises the INSERT error when the pair is still unknown after
#   reloading the cache
def get_metric_id(db, tag, seg_key)
  key = "#{tag}\0#{seg_key}"
  cached = @metric_names[key]
  return cached if cached

  begin
    id = db["INSERT INTO `#{@metric_tag_table}` (metric_name,segment_name) VALUES (?,?)", tag, seg_key || ''].insert
    if @sql_insert_returns_last_id
      @metric_names[key] = id
      return id
    end

    # The driver does not report the new id: re-read the whole table.
    reload_metric_names!(db)
    @metric_names[key]
  rescue => e
    # The INSERT may have failed because the row already exists (the table
    # has a UNIQUE constraint); a reload then yields its id.
    reload_metric_names!(db)
    id = @metric_names[key]
    return id if id
    raise e
  end
end
get_segment_id(db, seg_val)
click to toggle source
# File lib/fluent/plugin/backends/rdb_tsdb_backend.rb, line 274
# Returns the id of a segment value, inserting a row into the segment value
# table when the value is not in the in-memory cache.
#
# The INSERT is executed with Dataset#insert so that, on databases flagged
# with @sql_insert_returns_last_id, the value cached is the generated key.
# (Dataset#update would return the number of affected rows instead.)
#
# @param db the database handle; must support db[sql, *args]
# @param seg_val the segment value; nil/false is stored as an empty string,
#   anything else as its #to_s
# @return the segment id, or nil if it cannot be found even after a reload
# @raise re-raises the INSERT error when the value is still unknown after
#   reloading the cache
def get_segment_id(db, seg_val)
  key = seg_val ? seg_val.to_s : ''
  cached = @segment_names[key]
  return cached if cached

  begin
    id = db["INSERT INTO `#{@segment_value_table}` (name) VALUES (?)", key].insert
    if @sql_insert_returns_last_id
      @segment_names[key] = id
      return id
    end

    # The driver does not report the new id: re-read the whole table.
    reload_segment_names!(db)
    @segment_names[key]
  rescue => e
    # The INSERT may have failed because the row already exists (the table
    # has a UNIQUE constraint); a reload then yields its id.
    reload_segment_names!(db)
    id = @segment_names[key]
    return id if id
    raise e
  end
end
reload_metric_names!(db)
click to toggle source
# File lib/fluent/plugin/backends/rdb_tsdb_backend.rb, line 235
# Rebuilds the metric id cache from the metric tag table.
#
# Every row is keyed as "<metric_name>\0<segment_name>" and mapped to its id.
# The new map replaces @metric_names only after all rows have been read.
#
# @param db the database handle; must support fetch(sql) { |row| ... }
# @return [Hash] the freshly built cache
def reload_metric_names!(db)
  fresh = {}
  query = "SELECT id, metric_name, segment_name FROM `#{@metric_tag_table}`"
  db.fetch(query) do |record|
    fresh["#{record[:metric_name]}\0#{record[:segment_name]}"] = record[:id]
  end
  @metric_names = fresh
end
reload_segment_names!(db)
click to toggle source
# File lib/fluent/plugin/backends/rdb_tsdb_backend.rb, line 244
# Rebuilds the segment id cache from the segment value table.
#
# Every row's name is mapped to its id. The new map replaces @segment_names
# only after all rows have been read.
#
# @param db the database handle; must support fetch(sql) { |row| ... }
# @return [Hash] the freshly built cache
def reload_segment_names!(db)
  fresh = {}
  query = "SELECT id, name FROM `#{@segment_value_table}`"
  db.fetch(query) do |record|
    fresh[record[:name]] = record[:id]
  end
  @segment_names = fresh
end
start()
click to toggle source
# File lib/fluent/plugin/backends/rdb_tsdb_backend.rb, line 102
# Prepares the database: creates the metric tag, segment value and data
# tables if they do not exist, installs the duplicate-insert rule when the
# dialect is PostgreSQL, and loads both id caches.
#
# NOTE(review): identifiers are quoted with backticks and the rule refers to
# OLD inside an ON INSERT rule; confirm both are accepted by the PostgreSQL
# versions this backend targets.
#
# @return the result of reload_segment_names! (as returned by ensure_connect)
def start
  ensure_connect do |db|
    id_column = "id #{@sql_autoincr_type} PRIMARY KEY#{@sql_autoincr_suffix}"

    db.run(
      " CREATE TABLE IF NOT EXISTS `#{@metric_tag_table}` (" \
      " #{id_column}," \
      " metric_name #{@sql_name_type} NOT NULL," \
      " segment_name #{@sql_name_type} NOT NULL," \
      " UNIQUE (metric_name, segment_name) );"
    )

    db.run(
      " CREATE TABLE IF NOT EXISTS `#{@segment_value_table}` (" \
      " #{id_column}," \
      " name #{@sql_name_type} NOT NULL," \
      " UNIQUE (name) );"
    )

    # One counter column per minute slot: m0 .. m59.
    minute_columns = Array.new(60) { |i| "m#{i} #{@sql_value_type} NOT NULL DEFAULT 0" }.join(', ')
    db.run(
      " CREATE TABLE IF NOT EXISTS `#{@data_table}` (" \
      " base_time #{@sql_time_type} NOT NULL," \
      " metric_id #{@sql_autoincr_ref_type} NOT NULL," \
      " segment_id #{@sql_autoincr_ref_type}," \
      " #{minute_columns}," \
      " PRIMARY KEY (base_time, metric_id, segment_id) );"
    )

    if @sql_type == :postgresql
      # ignore duplication error on data_table
      db.run(
        " CREATE OR REPLACE RULE ignore_duplicated_insert AS" \
        " ON INSERT TO `#{@data_table}`" \
        " WHERE NEW.base_time = OLD.base_time" \
        " AND NEW.metric_id = OLD.metric_id" \
        " AND NEW.segment_id = OLD.segment_id" \
        " DO INSTEAD NOTHING;"
      )
    end

    # View creation is disabled; the statements are kept for reference.
    #minutes = (0..59).to_a.map {|m| "m#{m}" }.join(', ')
    #db.run %[
    #  CREATE VIEW IF NOT EXISTS `#{@metric_view}` AS
    #  SELECT
    #    base_time * 60 AS time,
    #    M.metric_name AS metric_name,
    #    CASE M.segment_name WHEN '' THEN NULL ELSE M.segment_name END AS segment_name,
    #    S.name AS segment_value,
    #    #{minutes}
    #  FROM `#{@data_table}` T
    #  LEFT JOIN `#{@metric_tag_table}` M ON T.metric_id = M.id
    #  LEFT JOIN `#{@segment_value_table}` S ON T.segment_id = S.id;]

    #minutes = (0..59).to_a.map {|m| ["m#{m}", "','"] }.flatten!
    #minutes.pop
    #minutes = @sql_concat.call(["'['"]+minutes+["']'"])
    #db.run %[
    #  CREATE VIEW IF NOT EXISTS `#{@metric_json_view}` AS
    #  SELECT
    #    base_time * 60 AS time,
    #    #{@sql_surround.call("M.metric_name")} AS metric_name,
    #    CASE WHEN M.segment_name IS NULL OR M.segment_name = '' THEN 'null' ELSE #{@sql_surround.call("M.segment_name")} END AS segment_name,
    #    CASE WHEN S.name IS NULL OR S.name = '' THEN 'null' ELSE #{@sql_surround.call("S.name")} END AS segment_value,
    #    #{minutes}
    #  FROM `#{@data_table}` T
    #  LEFT JOIN `#{@metric_tag_table}` M ON T.metric_id = M.id
    #  LEFT JOIN `#{@segment_value_table}` S ON T.segment_id = S.id;]

    reload_metric_names!(db)
    reload_segment_names!(db)
  end
end
write(data)
click to toggle source
# File lib/fluent/plugin/backends/rdb_tsdb_backend.rb, line 177
# Persists a batch of samples.
#
# Samples are grouped by row key [base_time, metric_id, segment_id]. For each
# row key that is not in the insert-suppression ring a row is inserted (using
# the dialect's "insert or ignore" statement); then, inside one transaction,
# the m<N> counter columns of every row are incremented.
#
# Differences from a naive one-placeholder-per-row insert:
# * placeholders are generated only for rows that are actually inserted, so
#   the number of "?" always matches the number of bound parameters;
# * no INSERT is issued when there is nothing to insert (empty batch or all
#   rows already remembered);
# * a row key is remembered in the ring only after its INSERT has run;
# * increments that target the same m<N> column of a row are summed into a
#   single assignment, because a repeated column in one UPDATE is rejected or
#   partially ignored by some databases.
#
# NOTE(review): when seg_val is nil the row key carries a nil segment_id, and
# the UPDATE binds nil to "segment_id=?"; confirm that such rows are matched
# as intended on the target databases.
#
# @param data an enumerable of [tag, time, value, seg_key, seg_val, mode]
#   tuples; value must support << (integer shift)
# @return the result of db.transaction (as returned by ensure_connect)
def write(data)
  ensure_connect do |db|
    # group by row_key (base_time,metric_id,segment_id); each sample is packed
    # as (value << ROW_TIME_WINDOW_BITS) | (time % 60)
    rows = {}
    data.each do |tag, time, value, seg_key, seg_val, _mode|
      # TODO update_mode is not supported yet
      base_time = time / ROW_TIME_WINDOW
      metric_id = get_metric_id(db, tag, seg_key)
      segment_id = get_segment_id(db, seg_val) if seg_val
      row_key = [base_time, metric_id, segment_id]
      (rows[row_key] ||= []) << ((value << ROW_TIME_WINDOW_BITS) | (time % 60))
    end

    # insert rows if not exist
    new_keys = rows.keys.reject { |row_key| @insup_ring.include?(row_key) }
    insert_prefix = "#{@sql_insert_ignore} INTO `#{@data_table}` (base_time,metric_id,segment_id) VALUES "
    remember = lambda do |row_key|
      @insup_ring_index = (@insup_ring_index + 1) % INSERT_SUPPRESS_RING_BUFFER_SIZE
      @insup_ring[@insup_ring_index] = row_key
    end

    if @sql_type == :sqlite
      # sqlite3 < 1.3.7 doesn't allow multiple rows at once
      new_keys.each do |row_key|
        db[insert_prefix + "(?,?,?)", *row_key].update
        remember.call(row_key)
      end
    elsif !new_keys.empty?
      placeholders = (["(?,?,?)"] * new_keys.size).join(', ')
      db[insert_prefix + placeholders, *new_keys.flatten(1)].update
      new_keys.each { |row_key| remember.call(row_key) }
    end

    # increment values
    db.transaction do
      rows.each_pair do |row_key, packed_samples|
        # Sum the increments per minute column (insertion order is kept).
        increments = Hash.new(0)
        packed_samples.each do |packed|
          increments[packed & ROW_TIME_WINDOW_MASK] += packed >> ROW_TIME_WINDOW_BITS
        end

        assignments = increments.keys.map { |minute| "m#{minute} = m#{minute} + ?" }.join(', ')
        update_sql = "UPDATE `#{@data_table}` SET #{assignments}" \
                     " WHERE base_time=? AND metric_id=? AND segment_id=?"
        db[update_sql, *increments.values, *row_key].update
      end
    end
  end
end