class WCC::Contentful::Store::PostgresStore

Implements the store interface where all Contentful entries are stored in a JSONB table.

Constants

EXPECTED_VERSION

Attributes

connection_pool[R]
logger[RW]

Public Class Methods

new(_config = nil, connection_options = nil, pool_options = nil) click to toggle source
Calls superclass method WCC::Contentful::Store::Base::new
# File lib/wcc/contentful/store/postgres_store.rb, line 20
# Sets up the store's state flags and its pool of Postgres connections.
#
# _config            - accepted for interface compatibility; not read here.
# connection_options - handed to the pool builder; when nil (or false) the
#                      fallback { dbname: 'postgres' } is used.
# pool_options       - handed to the pool builder; when nil (or false) an
#                      empty Hash is used.
def initialize(_config = nil, connection_options = nil, pool_options = nil)
  super()
  @schema_ensured = false
  @dirty = false
  @connection_pool = PostgresStore.build_connection_pool(
    connection_options || { dbname: 'postgres' },
    pool_options || {}
  )
end

Private Class Methods

build_connection_pool(connection_options, pool_options) click to toggle source
# File lib/wcc/contentful/store/postgres_store.rb, line 305
# Creates a ConnectionPool whose factory block opens a PG connection,
# makes sure the schema is present, and registers the prepared statements.
#
# connection_options - forwarded to PG.connect.
# pool_options       - forwarded to ConnectionPool.new.
#
# Returns the ConnectionPool instance.
def build_connection_pool(connection_options, pool_options)
  ConnectionPool.new(pool_options) do
    conn = PG.connect(connection_options)

    # Check once without the lock, then again while holding it, so that
    # ensure_schema is only invoked when the check still fails under the mutex.
    unless schema_ensured?(conn)
      @@schema_mutex.synchronize do
        ensure_schema(conn) unless schema_ensured?(conn)
      end
    end

    prepare_statements(conn)
    # The factory block must hand back the connection itself.
    conn
  end
end
ensure_schema(conn) click to toggle source
# File lib/wcc/contentful/store/postgres_store.rb, line 329
# Runs every schema script from version 1 up to EXPECTED_VERSION whose
# version number is not already listed in wcc_contentful_schema_version.
#
# conn - a connection responding to #exec.
#
# A missing version table (PG::UndefinedTable) is treated as "no versions
# applied yet", so every script is run.
def ensure_schema(conn)
  version_rows =
    begin
      conn.exec('SELECT version FROM wcc_contentful_schema_version' \
        ' ORDER BY version DESC')
    rescue PG::UndefinedTable
      []
    end
  # Compare as strings so that rows holding either text or integers match.
  applied = version_rows.map { |row| row['version'].to_s }

  1.upto(EXPECTED_VERSION) do |version_num|
    next if applied.include?(version_num.to_s)

    schema_file = File.join(__dir__, "postgres_store/schema_#{version_num}.sql")
    conn.exec(File.read(schema_file))
  end
end
prepare_statements(conn) click to toggle source
# File lib/wcc/contentful/store/postgres_store.rb, line 293
# Registers the named prepared statements used by this store on +conn+.
#
# conn - a connection responding to #prepare(name, sql).
#
# Returns whatever the final #prepare call returned.
def prepare_statements(conn)
  statements = {
    'upsert_entry' => 'SELECT * FROM fn_contentful_upsert_entry($1,$2,$3)',
    'select_entry' => 'SELECT * FROM contentful_raw WHERE id = $1',
    'select_ids' => 'SELECT id FROM contentful_raw',
    'delete_by_id' => 'DELETE FROM contentful_raw WHERE id = $1 RETURNING *',
    'refresh_views_concurrently' =>
      'REFRESH MATERIALIZED VIEW CONCURRENTLY contentful_raw_includes_ids_jointable'
  }

  # Hash literals iterate in insertion order, so the statements are prepared
  # in the order listed; the last result is returned to the caller.
  statements.map { |name, sql| conn.prepare(name, sql) }.last
end
schema_ensured?(conn) click to toggle source
# File lib/wcc/contentful/store/postgres_store.rb, line 318
# Reports whether the highest recorded schema version is at least
# EXPECTED_VERSION.
#
# conn - a connection responding to #exec.
#
# Returns false when the version table has no rows or does not exist.
def schema_ensured?(conn)
  newest = conn.exec('SELECT version FROM wcc_contentful_schema_version' \
    ' ORDER BY version DESC LIMIT 1')

  newest.num_tuples != 0 && newest[0]['version'].to_i >= EXPECTED_VERSION
rescue PG::UndefinedTable
  # The version table itself is missing, so no schema script has been run.
  false
end

Public Instance Methods

delete(key) click to toggle source
# File lib/wcc/contentful/store/postgres_store.rb, line 68
# Removes the row with the given id via the 'delete_by_id' prepared statement.
#
# key - the id bound to the statement's single parameter.
#
# Returns the parsed JSON taken from column index 1 of the first returned
# row, or nil when no row was returned.
def delete(key)
  deleted = @connection_pool.with do |conn|
    conn.exec_prepared('delete_by_id', [key])
  end

  JSON.parse(deleted.getvalue(0, 1)) unless deleted.num_tuples == 0
end
exec_query(statement, params = []) click to toggle source
# File lib/wcc/contentful/store/postgres_store.rb, line 92
# Executes +statement+ with +params+ on a pooled connection, first running
# the 'refresh_views_concurrently' prepared statement if the store has been
# marked dirty.
#
# statement - the SQL text passed to conn.exec.
# params    - the bind parameters passed to conn.exec (default: []).
#
# Returns the result of conn.exec.
# Raises whatever the refresh or the query raises. If the refresh fails,
# the dirty flag is set again so a later call retries it.
def exec_query(statement, params = [])
  if mutex.with_read_lock { @dirty }
    # Re-read and clear the flag under the write lock so that only the
    # caller that actually observed it set goes on to refresh.
    was_dirty =
      mutex.with_write_lock do
        observed = @dirty
        @dirty = false
        observed
      end

    if was_dirty
      begin
        _instrument 'refresh_views' do
          @connection_pool.with { |conn| conn.exec_prepared('refresh_views_concurrently') }
        end
      rescue StandardError
        # The flag was cleared before the refresh ran. Put it back, otherwise
        # a failed refresh would be forgotten and never retried.
        mutex.with_write_lock { @dirty = true }
        raise
      end
    end
  end

  logger&.debug("[PostgresStore] #{statement}\n#{params.inspect}")
  _instrument 'exec' do
    @connection_pool.with { |conn| conn.exec(statement, params) }
  end
end
find(key, **_options) click to toggle source
# File lib/wcc/contentful/store/postgres_store.rb, line 75
# Looks up a single row by id via the 'select_entry' prepared statement.
#
# key      - the id bound to the statement's single parameter.
# _options - accepted but not read by this method.
#
# Returns the parsed JSON taken from column index 1 of the first returned
# row, or nil when no row matched or PG::ConnectionBad was raised.
def find(key, **_options)
  found = @connection_pool.with do |conn|
    conn.exec_prepared('select_entry', [key])
  end

  JSON.parse(found.getvalue(0, 1)) unless found.num_tuples == 0
rescue PG::ConnectionBad
  nil
end
find_all(content_type:, options: nil) click to toggle source
# File lib/wcc/contentful/store/postgres_store.rb, line 84
# Builds a Query bound to this store.
#
# content_type - forwarded to Query.new as the content_type keyword.
# options      - forwarded to Query.new as the options keyword (default: nil).
#
# Returns the new Query; nothing is executed here.
def find_all(content_type:, options: nil)
  Query.new(self, content_type: content_type, options: options)
end
keys() click to toggle source
# File lib/wcc/contentful/store/postgres_store.rb, line 59
# Lists the ids returned by the 'select_ids' prepared statement.
#
# Returns an Array of id strings with surrounding whitespace stripped, or an
# empty Array when PG::ConnectionBad is raised.
def keys
  rows = @connection_pool.with { |conn| conn.exec_prepared('select_ids') }
  rows.map { |row| row['id'].strip }
rescue PG::ConnectionBad
  []
end
set(key, value) click to toggle source
# File lib/wcc/contentful/store/postgres_store.rb, line 29
# Upserts +value+ under +key+ via the 'upsert_entry' prepared statement and
# flags the store as dirty when the entry's links changed.
#
# key   - the id, bound as the first statement parameter.
# value - the entry; checked with ensure_hash, then bound as JSON together
#         with the encoded result of extract_links(value).
#
# Returns the parsed JSON from the first column of the first returned row
# (the previous value), or nil when there is no row or that column is NULL.
def set(key, value)
  ensure_hash value

  upserted =
    @connection_pool.with do |conn|
      bind_params = [key, value.to_json, quote_array(extract_links(value))]
      conn.exec_prepared('upsert_entry', bind_params)
    end

  previous_value = nil
  unless upserted.num_tuples == 0
    raw_previous = upserted.getvalue(0, 0)
    previous_value = JSON.parse(raw_previous) if raw_previous
  end

  return previous_value unless views_need_update?(value, previous_value)
  # Already flagged: skip the instrumentation and the write lock.
  return previous_value if mutex.with_read_lock { @dirty }

  # Links changed, so the materialized view must be refreshed before it is
  # read again; exec_query picks this flag up.
  _instrument 'mark_dirty'
  mutex.with_write_lock { @dirty = true }

  previous_value
end

Private Instance Methods

quote_array(arr) click to toggle source
# File lib/wcc/contentful/store/postgres_store.rb, line 137
# Encodes +arr+ with PG::TextEncoder::Array.
#
# arr - the value to encode; nil or false short-circuits.
#
# Returns the encoder's output, or nil when +arr+ is nil or false.
def quote_array(arr)
  PG::TextEncoder::Array.new.encode(arr) if arr
end
views_need_update?(value, previous_value) click to toggle source
# File lib/wcc/contentful/store/postgres_store.rb, line 144
# Reports whether the links extracted from +value+ differ from those
# extracted from +previous_value+, in which case
# contentful_raw_includes_ids_jointable needs to be refreshed.
#
# value          - the new entry.
# previous_value - the entry it replaces (may be nil).
#
# Returns true when the extracted links differ, false otherwise.
def views_need_update?(value, previous_value)
  extract_links(value) != extract_links(previous_value)
end