class PgSync::DataSource

Attributes

url[R]

Public Class Methods

new(url, name:, debug:) click to toggle source
# File lib/pgsync/data_source.rb, line 7
# Stores the settings for a data source. No connection is opened here;
# that happens lazily on the first call to `conn`.
#
# @param url [String] a postgres:// / postgresql:// URL, or a database name
# @param name [String] label shown in debug SQL log lines
# @param debug [Boolean] when truthy, executed SQL is logged
def initialize(url, name:, debug:)
  @url, @name, @debug = url, name, debug
end

Public Instance Methods

close() click to toggle source
# File lib/pgsync/data_source.rb, line 120
# Closes the current connection, if one is open, and forgets it so the
# next call to `conn` opens a new one. Always returns nil.
def close
  return unless @conn

  @conn.close
  @conn = nil
end
conn() click to toggle source
# File lib/pgsync/data_source.rb, line 103
# Returns the memoized PG connection, opening it on first use.
#
# A value starting with postgres:// or postgresql:// is passed to
# PG::Connection as-is; anything else is treated as a database name.
# PGCONNECT_TIMEOUT defaults to "3" unless the environment already sets it.
#
# @return [PG::Connection]
# @raise [Error] when the connection string is not a valid URI
def conn
  return @conn if @conn

  ENV["PGCONNECT_TIMEOUT"] ||= "3"
  config = @url.start_with?("postgres://", "postgresql://") ? @url : {dbname: @url}
  # remember which process/thread opened the connection (see reconnect_if_needed)
  @concurrent_id = concurrent_id
  @conn = PG::Connection.new(config)
rescue URI::InvalidURIError
  raise Error, "Invalid connection string. Make sure it works with `psql`"
end
create_schema(schema) click to toggle source
# File lib/pgsync/data_source.rb, line 84
# Issues CREATE SCHEMA for the given name, quoted with quote_ident.
#
# @param schema [String] schema name
# @return [Array] whatever `execute` returns for the statement
def create_schema(schema)
  statement = "CREATE SCHEMA #{quote_ident(schema)}"
  execute(statement)
end
dbname() click to toggle source
# File lib/pgsync/data_source.rb, line 29
# Database name taken from the connection info (:dbname), memoized.
def dbname
  return @dbname if @dbname

  @dbname = conninfo[:dbname]
end
execute(query, params = []) click to toggle source
# File lib/pgsync/data_source.rb, line 140
# Logs the statement (when debugging) and runs it with bound parameters.
#
# @param query [String] SQL text, may contain $1-style placeholders
# @param params [Array] values bound to the placeholders
# @return [Array] the result converted with `to_a`
def execute(query, params = [])
  log_sql(query, params)
  result = conn.exec_params(query, params)
  result.to_a
end
exists?() click to toggle source
# File lib/pgsync/data_source.rb, line 13
# Whether a connection target was configured.
#
# Truthy only for a non-empty url; returns nil (not false) when no url
# was given at all.
def exists?
  @url && @url.size.positive?
end
host() click to toggle source
# File lib/pgsync/data_source.rb, line 21
# Host from the connection info, passed through dedup_localhost, memoized.
def host
  return @host if @host

  @host = dedup_localhost(conninfo[:host])
end
last_value(seq) click to toggle source
# File lib/pgsync/data_source.rb, line 63
# Reads the `last_value` column of a sequence.
#
# @param seq the sequence, quoted via quote_ident_full
# @return the "last_value" field of the first result row
def last_value(seq)
  rows = execute("SELECT last_value FROM #{quote_ident_full(seq)}")
  rows.first["last_value"]
end
local?() click to toggle source
# File lib/pgsync/data_source.rb, line 17
# True when there is no host, or when the host is localhost / 127.0.0.1.
def local?
  return true unless host

  ["localhost", "127.0.0.1"].include?(host)
end
log_sql(query, params = {}) click to toggle source

TODO: log the execution time of each statement.

# File lib/pgsync/data_source.rb, line 161
# Writes a statement to the log when debug mode is on; does nothing otherwise.
#
# The line is prefixed with the cyan-colored data source name, whitespace
# runs in the query are collapsed to single spaces, and non-empty params
# are appended in `inspect` form.
#
# @param query [String] SQL text
# @param params [Array, Hash] bound parameters, if any
def log_sql(query, params = {})
  return unless @debug

  tag = colorize("[#{@name}]", :cyan)
  statement = query.gsub(/\s+/, " ").strip
  line = "#{tag} #{statement}"
  line = "#{line} #{params.inspect}" if params.any?
  log line
end
max_id(table, primary_key, sql_clause = nil) click to toggle source
# File lib/pgsync/data_source.rb, line 55
# Largest value of the primary key column in a table.
#
# @param table the table, quoted via quote_ident_full
# @param primary_key [String] column name, quoted via quote_ident
# @param sql_clause [String, nil] raw SQL appended verbatim after the table name
# @return [Integer] the maximum, or 0 when the query yields NULL (nil.to_i)
def max_id(table, primary_key, sql_clause = nil)
  sql = "SELECT MAX(#{quote_ident(primary_key)}) FROM #{quote_ident_full(table)}#{sql_clause}"
  row = execute(sql).first
  row["max"].to_i
end
min_id(table, primary_key, sql_clause = nil) click to toggle source
# File lib/pgsync/data_source.rb, line 59
# Smallest value of the primary key column in a table.
#
# @param table the table, quoted via quote_ident_full
# @param primary_key [String] column name, quoted via quote_ident
# @param sql_clause [String, nil] raw SQL appended verbatim after the table name
# @return [Integer] the minimum, or 0 when the query yields NULL (nil.to_i)
def min_id(table, primary_key, sql_clause = nil)
  sql = "SELECT MIN(#{quote_ident(primary_key)}) FROM #{quote_ident_full(table)}#{sql_clause}"
  row = execute(sql).first
  row["min"].to_i
end
port() click to toggle source
# File lib/pgsync/data_source.rb, line 25
# Port from the connection info, passed through dedup_localhost, memoized.
def port
  return @port if @port

  @port = dedup_localhost(conninfo[:port])
end
reconnect_if_needed() click to toggle source

Reconnects when called from a different thread or process than the one that opened the connection.

# File lib/pgsync/data_source.rb, line 128
# Reconnects when the current process/thread pair differs from the one
# recorded when the connection was last (re)established.
def reconnect_if_needed
  return if @concurrent_id == concurrent_id

  reconnect
end
schemas() click to toggle source
# File lib/pgsync/data_source.rb, line 71
    # Names of all schemas listed in information_schema.schemata, sorted by
    # name. The result is memoized for the lifetime of this object.
    #
    # @return [Array<String>]
    def schemas
      return @schemas if @schemas

      sql = <<~SQL
        SELECT
          schema_name
        FROM
          information_schema.schemata
        ORDER BY 1
      SQL
      rows = execute(sql)
      @schemas = rows.map { |row| row["schema_name"] }
    end
search_path() click to toggle source
# File lib/pgsync/data_source.rb, line 132
# Schemas returned by current_schemas(true), one entry per schema, memoized.
#
# @return [Array<String>]
def search_path
  return @search_path if @search_path

  rows = execute("SELECT unnest(current_schemas(true)) AS schema")
  @search_path = rows.map { |row| row["schema"] }
end
server_version_num() click to toggle source
# File lib/pgsync/data_source.rb, line 136
# Integer value of the server's `server_version_num` setting, memoized.
#
# @return [Integer]
def server_version_num
  return @server_version_num if @server_version_num

  row = execute("SHOW server_version_num").first
  @server_version_num = row["server_version_num"].to_i
end
table_exists?(table) click to toggle source
# File lib/pgsync/data_source.rb, line 51
# Whether the given table is among the tables reported by `tables`.
#
# @param table the table object to look up in the cached set
def table_exists?(table)
  table_set.member?(table)
end
tables() click to toggle source

Lists the base tables visible to this connection, excluding the information_schema and pg_catalog schemas.

# File lib/pgsync/data_source.rb, line 34
    # Base tables outside the information_schema and pg_catalog schemas,
    # ordered by schema and then table name. The result is memoized.
    #
    # @return [Array<Table>]
    def tables
      return @tables if @tables

      sql = <<~SQL
        SELECT
          table_schema AS schema,
          table_name AS table
        FROM
          information_schema.tables
        WHERE
          table_type = 'BASE TABLE' AND
          table_schema NOT IN ('information_schema', 'pg_catalog')
        ORDER BY 1, 2
      SQL
      rows = execute(sql)
      @tables = rows.map { |row| Table.new(row["schema"], row["table"]) }
    end
transaction() { || ... } click to toggle source
# File lib/pgsync/data_source.rb, line 145
# Runs the block inside a database transaction.
#
# When the connection is already in a transaction the block is simply
# yielded to. Otherwise the work is wrapped in `conn.transaction`, with
# BEGIN and COMMIT written to the debug log around it.
#
# @return the value produced by the transaction (or by the block when nested)
def transaction
  # non-zero status: a transaction is already in progress
  return yield unless conn.transaction_status == 0

  log_sql "BEGIN"
  result = conn.transaction { yield }
  log_sql "COMMIT"
  result
end
triggers(table) click to toggle source
# File lib/pgsync/data_source.rb, line 88
    # Triggers defined on a table, read from pg_trigger.
    #
    # Each row has the keys "name", "internal", "enabled" (tgenabled is not
    # 'D') and "integrity" (tgconstraint is non-zero).
    #
    # @param table the table; its fully quoted name is bound as $1 and cast to regclass
    # @return [Array] result rows from `execute`
    def triggers(table)
      sql = <<~SQL
        SELECT
          tgname AS name,
          tgisinternal AS internal,
          tgenabled != 'D' AS enabled,
          tgconstraint != 0 AS integrity
        FROM
          pg_trigger
        WHERE
          pg_trigger.tgrelid = $1::regclass
      SQL
      relation = quote_ident_full(table)
      execute(sql, [relation])
    end
truncate(table) click to toggle source
# File lib/pgsync/data_source.rb, line 67
# Empties a table with TRUNCATE ... CASCADE.
#
# @param table the table, quoted via quote_ident_full
def truncate(table)
  target = quote_ident_full(table)
  execute("TRUNCATE #{target} CASCADE")
end

Private Instance Methods

concurrent_id() click to toggle source
# File lib/pgsync/data_source.rb, line 171
# Identifies the calling process and thread as a [pid, thread object id] pair.
def concurrent_id
  pid = Process.pid
  thread_id = Thread.current.object_id
  [pid, thread_id]
end
conninfo() click to toggle source
# File lib/pgsync/data_source.rb, line 184
# Connection parameters as reported by the connection's conninfo_hash,
# memoized.
#
# @return [Hash]
# @raise [Error] when the connection does not respond to conninfo_hash
def conninfo
  return @conninfo if @conninfo

  unless conn.respond_to?(:conninfo_hash)
    raise Error, "libpq is too old. Upgrade it and run `gem install pg`"
  end
  @conninfo = conn.conninfo_hash
end
dedup_localhost(value) click to toggle source

Workaround for pg 1.4.4, where the connection info can report a duplicated "localhost,localhost" host (see github.com/ged/ruby-pg/issues/490).

# File lib/pgsync/data_source.rb, line 195
# Collapses a duplicated comma-separated value to its first entry.
#
# Applies only when the connection info reports the host as
# "localhost,localhost" and every listed port is the same; any other
# value is returned untouched.
#
# @param value [String, nil] a host or port value from the connection info
# @return [String, nil]
def dedup_localhost(value)
  return value unless conninfo[:host] == "localhost,localhost"
  return value unless conninfo[:port].to_s.split(",").uniq.size == 1

  value.split(",").first
end
reconnect() click to toggle source
# File lib/pgsync/data_source.rb, line 175
# Resets the existing connection and records the current process/thread
# as its owner.
#
# Does nothing when no connection has been opened yet: there is nothing
# to reset, and `conn` records the owner itself when it connects lazily.
# Without this guard, calling reconnect_if_needed before the first `conn`
# raised NoMethodError on nil.
def reconnect
  return unless @conn

  @conn.reset
  @concurrent_id = concurrent_id
end
table_set() click to toggle source
# File lib/pgsync/data_source.rb, line 180
# Set built from `tables`, memoized, used for membership checks.
#
# @return [Set]
def table_set
  return @table_set if @table_set

  @table_set = Set.new(tables)
end