class Blazer::Adapters::SqlAdapter
Attributes
connection_model[R]
Public Class Methods
name()
click to toggle source
# File lib/blazer/adapters/sql_adapter.rb, line 11 def self.name "Blazer::Connection::Adapter#{object_id}" end
new(data_source)
click to toggle source
Calls superclass method
Blazer::Adapters::BaseAdapter::new
# File lib/blazer/adapters/sql_adapter.rb, line 6 def initialize(data_source) super @connection_model = Class.new(Blazer::Connection) do def self.name "Blazer::Connection::Adapter#{object_id}" end establish_connection(data_source.settings["url"]) if data_source.settings["url"] end end
Public Instance Methods
cachable?(statement)
click to toggle source
# File lib/blazer/adapters/sql_adapter.rb, line 141 def cachable?(statement) !%w[CREATE ALTER UPDATE INSERT DELETE].include?(statement.split.first.to_s.upcase) end
cancel(run_id)
click to toggle source
# File lib/blazer/adapters/sql_adapter.rb, line 130 def cancel(run_id) if postgresql? select_all("SELECT pg_cancel_backend(pid) FROM pg_stat_activity WHERE pid <> pg_backend_pid() AND query LIKE ?", ["%,run_id:#{run_id}%"]) elsif redshift? first_row = select_all("SELECT pid FROM stv_recents WHERE status = 'Running' AND query LIKE ?", ["%,run_id:#{run_id}%"]).first if first_row select_all("CANCEL #{first_row["pid"].to_i}") end end end
cohort_analysis_statement(statement, period:, days:)
click to toggle source
TODO treat date columns as already in time zone
# File lib/blazer/adapters/sql_adapter.rb, line 150 def cohort_analysis_statement(statement, period:, days:) raise "Cohort analysis not supported" unless supports_cohort_analysis? cohort_column = statement.match?(/\bcohort_time\b/) ? "cohort_time" : "conversion_time" tzname = Blazer.time_zone.tzinfo.name if mysql? time_sql = "CONVERT_TZ(cohorts.cohort_time, '+00:00', ?)" case period when "day" date_sql = "CAST(DATE_FORMAT(#{time_sql}, '%Y-%m-%d') AS DATE)" date_params = [tzname] when "week" date_sql = "CAST(DATE_FORMAT(#{time_sql} - INTERVAL ((5 + DAYOFWEEK(#{time_sql})) % 7) DAY, '%Y-%m-%d') AS DATE)" date_params = [tzname, tzname] else date_sql = "CAST(DATE_FORMAT(#{time_sql}, '%Y-%m-01') AS DATE)" date_params = [tzname] end bucket_sql = "CAST(CEIL(TIMESTAMPDIFF(SECOND, cohorts.cohort_time, query.conversion_time) / ?) AS SIGNED)" else date_sql = "date_trunc(?, cohorts.cohort_time::timestamptz AT TIME ZONE ?)::date" date_params = [period, tzname] bucket_sql = "CEIL(EXTRACT(EPOCH FROM query.conversion_time - cohorts.cohort_time) / ?)::int" end # WITH not an optimization fence in Postgres 12+ statement = <<~SQL WITH query AS ( {placeholder} ), cohorts AS ( SELECT user_id, MIN(#{cohort_column}) AS cohort_time FROM query WHERE user_id IS NOT NULL AND #{cohort_column} IS NOT NULL GROUP BY 1 ) SELECT #{date_sql} AS period, 0 AS bucket, COUNT(DISTINCT cohorts.user_id) FROM cohorts GROUP BY 1 UNION ALL SELECT #{date_sql} AS period, #{bucket_sql} AS bucket, COUNT(DISTINCT query.user_id) FROM cohorts INNER JOIN query ON query.user_id = cohorts.user_id WHERE query.conversion_time IS NOT NULL AND query.conversion_time >= cohorts.cohort_time #{cohort_column == "conversion_time" ? "AND query.conversion_time != cohorts.cohort_time" : ""} GROUP BY 1, 2 SQL params = [statement] + date_params + date_params + [days.to_i * 86400] connection_model.send(:sanitize_sql_array, params) end
cost(statement)
click to toggle source
# File lib/blazer/adapters/sql_adapter.rb, line 104 def cost(statement) result = explain(statement) if sqlserver? result["TotalSubtreeCost"] else match = /cost=\d+\.\d+..(\d+\.\d+) /.match(result) match[1] if match end end
explain(statement)
click to toggle source
# File lib/blazer/adapters/sql_adapter.rb, line 114 def explain(statement) if postgresql? || redshift? select_all("EXPLAIN #{statement}").rows.first.first elsif sqlserver? begin execute("SET SHOWPLAN_ALL ON") result = select_all(statement).each.first ensure execute("SET SHOWPLAN_ALL OFF") end result end rescue nil end
parameter_binding()
click to toggle source
Redshift adapter silently ignores binds
# File lib/blazer/adapters/sql_adapter.rb, line 211 def parameter_binding if postgresql? :numeric elsif sqlite? && prepared_statements? # Active Record silently ignores binds with SQLite when prepared statements are disabled :numeric elsif mysql? && prepared_statements? # Active Record silently ignores binds with MySQL when prepared statements are disabled :positional elsif sqlserver? proc do |statement, variables| variables.each_with_index do |(k, _), i| statement = statement.gsub("{#{k}}", "@#{i} ") end [statement, variables.values] end end end
preview_statement()
click to toggle source
# File lib/blazer/adapters/sql_adapter.rb, line 92 def preview_statement if sqlserver? "SELECT TOP (10) * FROM {table}" else "SELECT * FROM {table} LIMIT 10" end end
quoting()
click to toggle source
# File lib/blazer/adapters/sql_adapter.rb, line 206 def quoting ->(value) { connection_model.connection.quote(value) } end
reconnect()
click to toggle source
# File lib/blazer/adapters/sql_adapter.rb, line 100 def reconnect connection_model.establish_connection(settings["url"]) end
run_statement(statement, comment, bind_params = [])
click to toggle source
# File lib/blazer/adapters/sql_adapter.rb, line 18 def run_statement(statement, comment, bind_params = []) columns = [] rows = [] error = nil begin result = nil in_transaction do set_timeout(data_source.timeout) if data_source.timeout binds = bind_params.map { |v| ActiveRecord::Relation::QueryAttribute.new(nil, v, ActiveRecord::Type::Value.new) } result = connection_model.connection.select_all("#{statement} /*#{comment}*/", nil, binds) end columns = result.columns rows = result.rows # cast values if result.column_types.any? types = columns.map { |c| result.column_types[c] } rows = rows.map do |row| row.map.with_index do |v, i| v && (t = types[i]) ? t.send(:cast_value, v) : v end end end # fix for non-ASCII column names and charts if adapter_name == "Trilogy" columns = columns.map { |k| k.dup.force_encoding(Encoding::UTF_8) } end rescue => e error = e.message.sub(/.+ERROR: /, "") error = Blazer::TIMEOUT_MESSAGE if Blazer::TIMEOUT_ERRORS.any? { |e| error.include?(e) } error = Blazer::VARIABLE_MESSAGE if error.include?("syntax error at or near \"$") || error.include?("Incorrect syntax near '@") || error.include?("your MySQL server version for the right syntax to use near '?") if error.include?("could not determine data type of parameter") error += " - try adding casting to variables and make sure none are inside a string literal" end reconnect if error.include?("PG::ConnectionBad") end [columns, rows, error] end
schema()
click to toggle source
# File lib/blazer/adapters/sql_adapter.rb, line 86 def schema sql = add_schemas("SELECT table_schema, table_name, column_name, data_type, ordinal_position FROM information_schema.columns") result = data_source.run_statement(sql) result.rows.group_by { |r| [r[0], r[1]] }.map { |k, vs| {schema: k[0], table: k[1], columns: vs.sort_by { |v| v[2] }.map { |v| {name: v[2], data_type: v[3]} }} }.sort_by { |t| [t[:schema] == default_schema ? "" : t[:schema], t[:table]] } end
supports_cohort_analysis?()
click to toggle source
# File lib/blazer/adapters/sql_adapter.rb, line 145 def supports_cohort_analysis? postgresql? || mysql? end
tables()
click to toggle source
# File lib/blazer/adapters/sql_adapter.rb, line 62 def tables sql = add_schemas("SELECT table_schema, table_name FROM information_schema.tables") result = data_source.run_statement(sql, refresh_cache: true) if postgresql? || redshift? || snowflake? result.rows.sort_by { |r| [r[0] == default_schema ? "" : r[0], r[1]] }.map do |row| table = if row[0] == default_schema row[1] else "#{row[0]}.#{row[1]}" end table = table.downcase if snowflake? { table: table, value: connection_model.connection.quote_table_name(table) } end else result.rows.map(&:second).sort end end
Protected Instance Methods
adapter_name()
click to toggle source
# File lib/blazer/adapters/sql_adapter.rb, line 266 def adapter_name # prevent bad data source from taking down queries/new connection_model.connection.adapter_name rescue nil end
add_schemas(query)
click to toggle source
# File lib/blazer/adapters/sql_adapter.rb, line 285 def add_schemas(query) if settings["schemas"] where = "table_schema IN (?)" schemas = settings["schemas"] elsif mysql? where = "table_schema IN (?)" schemas = [default_schema] else where = "table_schema NOT IN (?)" schemas = ["information_schema"] schemas.map!(&:upcase) if snowflake? schemas << "pg_catalog" if postgresql? || redshift? end connection_model.send(:sanitize_sql_array, ["#{query} WHERE #{where}", schemas]) end
default_schema()
click to toggle source
# File lib/blazer/adapters/sql_adapter.rb, line 271 def default_schema @default_schema ||= begin if postgresql? || redshift? "public" elsif sqlserver? "dbo" elsif connection_model.respond_to?(:connection_db_config) connection_model.connection_db_config.database else connection_model.connection_config[:database] end end end
execute(statement)
click to toggle source
seperate from select_all
to prevent mysql error
# File lib/blazer/adapters/sql_adapter.rb, line 238 def execute(statement) connection_model.connection.execute(statement) end
in_transaction() { || ... }
click to toggle source
# File lib/blazer/adapters/sql_adapter.rb, line 321 def in_transaction connection_model.connection_pool.with_connection do if use_transaction? connection_model.transaction do yield raise ActiveRecord::Rollback end else yield end end end
mysql?()
click to toggle source
# File lib/blazer/adapters/sql_adapter.rb, line 250 def mysql? ["MySQL", "Mysql2", "Mysql2Spatial", "Trilogy"].include?(adapter_name) end
postgresql?()
click to toggle source
# File lib/blazer/adapters/sql_adapter.rb, line 242 def postgresql? ["PostgreSQL", "PostGIS"].include?(adapter_name) end
prepared_statements?()
click to toggle source
# File lib/blazer/adapters/sql_adapter.rb, line 334 def prepared_statements? connection_model.connection.prepared_statements end
redshift?()
click to toggle source
# File lib/blazer/adapters/sql_adapter.rb, line 246 def redshift? ["Redshift"].include?(adapter_name) end
select_all(statement, params = [])
click to toggle source
# File lib/blazer/adapters/sql_adapter.rb, line 232 def select_all(statement, params = []) statement = connection_model.send(:sanitize_sql_array, [statement] + params) if params.any? connection_model.connection.select_all(statement) end
set_timeout(timeout)
click to toggle source
# File lib/blazer/adapters/sql_adapter.rb, line 301 def set_timeout(timeout) if postgresql? || redshift? execute("SET #{use_transaction? ? "LOCAL " : ""}statement_timeout = #{timeout.to_i * 1000}") elsif mysql? # use send as this method is private in Rails 4.2 mariadb = connection_model.connection.send(:mariadb?) rescue false if mariadb execute("SET max_statement_time = #{timeout.to_i * 1000}") else execute("SET max_execution_time = #{timeout.to_i * 1000}") end else raise Blazer::TimeoutNotSupported, "Timeout not supported for #{adapter_name} adapter" end end
snowflake?()
click to toggle source
# File lib/blazer/adapters/sql_adapter.rb, line 262 def snowflake? data_source.adapter == "snowflake" end
sqlite?()
click to toggle source
# File lib/blazer/adapters/sql_adapter.rb, line 254 def sqlite? ["SQLite"].include?(adapter_name) end
sqlserver?()
click to toggle source
# File lib/blazer/adapters/sql_adapter.rb, line 258 def sqlserver? ["SQLServer", "tinytds", "mssql"].include?(adapter_name) end
use_transaction?()
click to toggle source
# File lib/blazer/adapters/sql_adapter.rb, line 317 def use_transaction? settings.key?("use_transaction") ? settings["use_transaction"] : true end