class PgSync::DataSource
Attributes
url[R]
Public Class Methods
new(url, name:, debug:)
click to toggle source
# File lib/pgsync/data_source.rb, line 7
#
# Records the connection target and logging settings. No connection is
# opened here; #conn establishes it on first use.
#
# url   - stored in @url (read by #conn and #exists?)
# name  - stored in @name (label printed by #log_sql)
# debug - stored in @debug (when truthy, #log_sql emits output)
def initialize(url, name:, debug:)
  @url, @name, @debug = url, name, debug
end
Public Instance Methods
close()
click to toggle source
# File lib/pgsync/data_source.rb, line 120
#
# Closes the cached connection, if there is one, and forgets it so that
# the next #conn call opens a fresh connection. Does nothing when no
# connection is cached. Always returns nil.
def close
  return unless @conn

  @conn.close
  @conn = nil
end
conn()
click to toggle source
# File lib/pgsync/data_source.rb, line 103
#
# Returns the cached PG connection, opening it on first use.
#
# Side effects on first use:
# * sets ENV["PGCONNECT_TIMEOUT"] to "3" unless it is already set
# * records the current #concurrent_id in @concurrent_id
#
# A @url starting with "postgres://" or "postgresql://" is handed to
# PG::Connection as-is; anything else is treated as a database name.
#
# Raises Error when URI::InvalidURIError is raised while connecting.
def conn
  return @conn if @conn

  begin
    ENV["PGCONNECT_TIMEOUT"] ||= "3"
    target = @url.start_with?("postgres://", "postgresql://") ? @url : {dbname: @url}
    @concurrent_id = concurrent_id
    @conn = PG::Connection.new(target)
  rescue URI::InvalidURIError
    raise Error, "Invalid connection string. Make sure it works with `psql`"
  end
end
create_schema(schema)
click to toggle source
# File lib/pgsync/data_source.rb, line 84
#
# Issues CREATE SCHEMA for +schema+. The name is passed through
# quote_ident before being placed in the statement.
# Returns whatever #execute returns.
def create_schema(schema)
  quoted = quote_ident(schema)
  execute("CREATE SCHEMA #{quoted}")
end
dbname()
click to toggle source
# File lib/pgsync/data_source.rb, line 29
#
# Database name taken from #conninfo (the :dbname entry), memoized in
# @dbname after the first lookup.
def dbname
  return @dbname if @dbname

  @dbname = conninfo[:dbname]
end
execute(query, params = [])
click to toggle source
# File lib/pgsync/data_source.rb, line 140
#
# Runs +query+ with bind +params+ on the connection and returns the
# result converted to an Array via #to_a.
#
# The statement and its parameters are passed to #log_sql before the
# query is sent.
def execute(query, params = [])
  log_sql(query, params)
  result = conn.exec_params(query, params)
  result.to_a
end
exists?()
click to toggle source
# File lib/pgsync/data_source.rb, line 13
#
# Truthy when a non-empty url was supplied.
# Returns @url itself (nil/false) when it is unset, otherwise whether
# its size is greater than zero.
def exists?
  return @url unless @url

  @url.size.positive?
end
host()
click to toggle source
# File lib/pgsync/data_source.rb, line 21
#
# Host taken from #conninfo (the :host entry) after it has been passed
# through #dedup_localhost. Memoized in @host.
def host
  return @host if @host

  @host = dedup_localhost(conninfo[:host])
end
last_value(seq)
click to toggle source
# File lib/pgsync/data_source.rb, line 63
#
# Reads the last_value column of the sequence +seq+ and returns it as
# found in the first result row. The sequence name is quoted with
# quote_ident_full.
def last_value(seq)
  rows = execute("SELECT last_value FROM #{quote_ident_full(seq)}")
  rows.first["last_value"]
end
local?()
click to toggle source
# File lib/pgsync/data_source.rb, line 17
#
# True when #host is unset, or is exactly "localhost" or "127.0.0.1".
def local?
  current = host
  return true unless current

  ["localhost", "127.0.0.1"].include?(current)
end
log_sql(query, params = {})
click to toggle source
TODO: log the time taken by each statement.
# File lib/pgsync/data_source.rb, line 161
#
# Writes a one-line description of a SQL statement through +log+ when
# @debug is truthy; does nothing (and returns nil) otherwise.
#
# query  - SQL text; runs of whitespace are collapsed to single spaces
#          and the result is stripped before logging.
# params - bind parameters (Array or Hash). Their #inspect is appended
#          whenever the collection is non-empty.
#
# The line is prefixed with "[<@name>]" colorized in cyan.
def log_sql(query, params = {})
  return unless @debug

  message = "#{colorize("[#{@name}]", :cyan)} #{query.gsub(/\s+/, " ").strip}"
  # Test emptiness rather than `any?`: `[nil].any?` and `[false].any?`
  # are false, which would hide NULL/false bind parameters from the log.
  message = "#{message} #{params.inspect}" unless params.empty?
  log message
end
max_id(table, primary_key, sql_clause = nil)
click to toggle source
# File lib/pgsync/data_source.rb, line 55
#
# Returns MAX(primary_key) for +table+ as an Integer.
#
# table       - quoted with quote_ident_full
# primary_key - quoted with quote_ident
# sql_clause  - optional text appended verbatim after the table name
#               (it is not quoted or escaped here)
#
# The value is converted with #to_i, so a nil result becomes 0.
def max_id(table, primary_key, sql_clause = nil)
  column = quote_ident(primary_key)
  relation = quote_ident_full(table)
  row = execute("SELECT MAX(#{column}) FROM #{relation}#{sql_clause}").first
  row["max"].to_i
end
min_id(table, primary_key, sql_clause = nil)
click to toggle source
# File lib/pgsync/data_source.rb, line 59
#
# Returns MIN(primary_key) for +table+ as an Integer.
#
# table       - quoted with quote_ident_full
# primary_key - quoted with quote_ident
# sql_clause  - optional text appended verbatim after the table name
#               (it is not quoted or escaped here)
#
# The value is converted with #to_i, so a nil result becomes 0.
def min_id(table, primary_key, sql_clause = nil)
  column = quote_ident(primary_key)
  relation = quote_ident_full(table)
  row = execute("SELECT MIN(#{column}) FROM #{relation}#{sql_clause}").first
  row["min"].to_i
end
port()
click to toggle source
# File lib/pgsync/data_source.rb, line 25
#
# Port taken from #conninfo (the :port entry) after it has been passed
# through #dedup_localhost. Memoized in @port.
def port
  return @port if @port

  @port = dedup_localhost(conninfo[:port])
end
reconnect_if_needed()
click to toggle source
Reconnects when called from a different thread or process than the one recorded when the connection was opened.
# File lib/pgsync/data_source.rb, line 128
#
# Calls #reconnect when the recorded @concurrent_id differs from the
# current #concurrent_id (process id + thread identity); otherwise
# does nothing and returns nil.
def reconnect_if_needed
  return if @concurrent_id == concurrent_id

  reconnect
end
schemas()
click to toggle source
# File lib/pgsync/data_source.rb, line 71
#
# Names of all schemas listed in information_schema.schemata, ordered
# by name. The list is fetched once and memoized in @schemas.
def schemas
  return @schemas if @schemas

  sql = <<~SQL
    SELECT
      schema_name
    FROM
      information_schema.schemata
    ORDER BY 1
  SQL
  @schemas = execute(sql).map { |entry| entry["schema_name"] }
end
search_path()
click to toggle source
# File lib/pgsync/data_source.rb, line 132
#
# Schema names returned by current_schemas(true), one per element.
# Fetched once and memoized in @search_path.
def search_path
  @search_path ||= begin
    rows = execute("SELECT unnest(current_schemas(true)) AS schema")
    rows.map { |row| row["schema"] }
  end
end
server_version_num()
click to toggle source
# File lib/pgsync/data_source.rb, line 136
#
# The server's server_version_num setting as an Integer.
# Fetched once with SHOW and memoized in @server_version_num.
def server_version_num
  @server_version_num ||= begin
    row = execute("SHOW server_version_num").first
    row["server_version_num"].to_i
  end
end
table_exists?(table)
click to toggle source
# File lib/pgsync/data_source.rb, line 51
#
# Whether +table+ is a member of #table_set (the set built from #tables).
def table_exists?(table)
  known = table_set
  known.include?(table)
end
tables()
click to toggle source
Gets the visible tables (base tables outside the `information_schema` and `pg_catalog` schemas).
# File lib/pgsync/data_source.rb, line 34
#
# Returns a Table object for every base table that is not in the
# information_schema or pg_catalog schemas, ordered by schema and then
# table name. The list is fetched once and memoized in @tables.
def tables
  return @tables if @tables

  sql = <<~SQL
    SELECT
      table_schema AS schema,
      table_name AS table
    FROM
      information_schema.tables
    WHERE
      table_type = 'BASE TABLE' AND
      table_schema NOT IN ('information_schema', 'pg_catalog')
    ORDER BY 1, 2
  SQL
  @tables = execute(sql).map { |entry| Table.new(entry["schema"], entry["table"]) }
end
transaction() { || ... }
click to toggle source
# File lib/pgsync/data_source.rb, line 145
#
# Runs the given block inside a database transaction and returns the
# block's result.
#
# When the connection reports a transaction_status other than 0 (i.e.
# it is already in a transaction), the block is simply yielded to
# without opening another one. Otherwise "BEGIN" is logged, the block
# runs inside conn.transaction, and "COMMIT" is logged once that call
# returns.
def transaction
  # status 0: not currently in transaction
  return yield unless conn.transaction_status == 0

  log_sql "BEGIN"
  outcome = conn.transaction { yield }
  log_sql "COMMIT"
  outcome
end
triggers(table)
click to toggle source
# File lib/pgsync/data_source.rb, line 88
#
# Lists the pg_trigger rows attached to +table+.
#
# Each returned row has the columns:
#   name      - tgname
#   internal  - tgisinternal
#   enabled   - tgenabled != 'D'
#   integrity - tgconstraint != 0
#
# The table is identified by passing its quote_ident_full form as a
# bind parameter cast to regclass.
def triggers(table)
  relation = quote_ident_full(table)
  sql = <<~SQL
    SELECT
      tgname AS name,
      tgisinternal AS internal,
      tgenabled != 'D' AS enabled,
      tgconstraint != 0 AS integrity
    FROM
      pg_trigger
    WHERE
      pg_trigger.tgrelid = $1::regclass
  SQL
  execute(sql, [relation])
end
truncate(table)
click to toggle source
# File lib/pgsync/data_source.rb, line 67
#
# Issues TRUNCATE ... CASCADE for +table+ (quoted with
# quote_ident_full). Returns whatever #execute returns.
def truncate(table)
  relation = quote_ident_full(table)
  execute("TRUNCATE #{relation} CASCADE")
end
Private Instance Methods
concurrent_id()
click to toggle source
# File lib/pgsync/data_source.rb, line 171
#
# Identifies the calling execution context as a two-element Array:
# [current process id, object_id of the current Thread].
def concurrent_id
  pid = Process.pid
  thread_key = Thread.current.object_id
  [pid, thread_key]
end
conninfo()
click to toggle source
# File lib/pgsync/data_source.rb, line 184
#
# Connection parameters as returned by the connection's conninfo_hash,
# memoized in @conninfo.
#
# Raises Error when the connection object does not respond to
# conninfo_hash.
def conninfo
  return @conninfo if @conninfo

  connection = conn
  unless connection.respond_to?(:conninfo_hash)
    raise Error, "libpq is too old. Upgrade it and run `gem install pg`"
  end
  @conninfo = connection.conninfo_hash
end
dedup_localhost(value)
click to toggle source
Workaround for pg 1.4.4; see https://github.com/ged/ruby-pg/issues/490
# File lib/pgsync/data_source.rb, line 195
#
# Returns only the first comma-separated entry of +value+ when the
# connection info reports the host as exactly "localhost,localhost"
# and every comma-separated port entry is the same. In every other
# case +value+ is returned unchanged.
def dedup_localhost(value)
  info = conninfo
  doubled = info[:host] == "localhost,localhost" &&
            info[:port].to_s.split(",").uniq.length == 1
  doubled ? value.split(",").first : value
end
reconnect()
click to toggle source
# File lib/pgsync/data_source.rb, line 175
#
# Resets the cached connection and records the current #concurrent_id
# as its owner.
#
# When no connection is cached (nothing has called #conn yet, or #close
# was called) there is nothing to reset, so the reset is skipped rather
# than raising NoMethodError on nil; #conn will open a connection on
# its next use.
def reconnect
  @conn&.reset
  @concurrent_id = concurrent_id
end
table_set()
click to toggle source
# File lib/pgsync/data_source.rb, line 180
#
# A Set built from #tables, memoized in @table_set and used for
# membership checks.
def table_set
  return @table_set if @table_set

  @table_set = Set.new(tables)
end