class WCC::Contentful::Store::PostgresStore
Implements the store interface where all Contentful
entries are stored in a JSONB table.
Constants
- EXPECTED_VERSION
Attributes
connection_pool[R]
logger[RW]
Public Class Methods
new(_config = nil, connection_options = nil, pool_options = nil)
click to toggle source
Calls superclass method
WCC::Contentful::Store::Base::new
# File lib/wcc/contentful/store/postgres_store.rb, line 20
# Sets up the store and its pool of Postgres connections.
#
# _config            - accepted but not used by this method.
# connection_options - passed to PostgresStore.build_connection_pool;
#                      { dbname: 'postgres' } is used when nil/false.
# pool_options       - passed to PostgresStore.build_connection_pool;
#                      {} is used when nil/false.
def initialize(_config = nil, connection_options = nil, pool_options = nil)
  super()
  @schema_ensured = false
  @connection_pool = PostgresStore.build_connection_pool(
    connection_options || { dbname: 'postgres' },
    pool_options || {}
  )
  # A new store starts out clean: no view refresh is pending.
  @dirty = false
end
Private Class Methods
build_connection_pool(connection_options, pool_options)
click to toggle source
# File lib/wcc/contentful/store/postgres_store.rb, line 305
# Creates a ConnectionPool whose factory block opens a PG connection,
# makes sure the schema is present, and prepares the named statements.
#
# connection_options - passed to PG.connect.
# pool_options       - passed to ConnectionPool.new.
#
# Returns the ConnectionPool. The factory block returns the ready connection.
# If schema setup or statement preparation raises, the connection is closed
# before the error is re-raised so the socket is not left open.
def build_connection_pool(connection_options, pool_options)
  ConnectionPool.new(pool_options) do
    conn = PG.connect(connection_options)
    begin
      unless schema_ensured?(conn)
        # Check again while holding the mutex so that only one caller at a
        # time runs the schema scripts.
        @@schema_mutex.synchronize do
          ensure_schema(conn) unless schema_ensured?(conn)
        end
      end
      prepare_statements(conn)
      conn
    rescue StandardError
      # Do not leak a half-initialized connection.
      conn.close unless conn.finished?
      raise
    end
  end
end
ensure_schema(conn)
click to toggle source
# File lib/wcc/contentful/store/postgres_store.rb, line 329
# Runs every schema script from version 1 up to EXPECTED_VERSION that is not
# yet listed in wcc_contentful_schema_version.
#
# conn - a connection responding to #exec.
#
# A missing version table (PG::UndefinedTable) is treated as "no versions
# applied". Scripts are read from postgres_store/schema_<n>.sql next to this
# file.
def ensure_schema(conn)
  rows =
    begin
      conn.exec('SELECT version FROM wcc_contentful_schema_version' \
                ' ORDER BY version DESC')
    rescue PG::UndefinedTable
      []
    end
  # Compare as strings, since the rows may carry the version as text.
  applied = rows.map { |row| row['version'].to_s }

  1.upto(EXPECTED_VERSION) do |version_num|
    next if applied.include?(version_num.to_s)

    script = File.join(__dir__, "postgres_store/schema_#{version_num}.sql")
    conn.exec(File.read(script))
  end
end
prepare_statements(conn)
click to toggle source
# File lib/wcc/contentful/store/postgres_store.rb, line 293
# Registers the named prepared statements used by this store on +conn+.
#
# conn - a connection responding to #prepare(name, sql).
#
# Returns whatever the last #prepare call returns.
def prepare_statements(conn)
  # Hash literals keep insertion order, so statements are prepared top-down.
  statements = {
    'upsert_entry' => 'SELECT * FROM fn_contentful_upsert_entry($1,$2,$3)',
    'select_entry' => 'SELECT * FROM contentful_raw WHERE id = $1',
    'select_ids' => 'SELECT id FROM contentful_raw',
    'delete_by_id' => 'DELETE FROM contentful_raw WHERE id = $1 RETURNING *',
    'refresh_views_concurrently' =>
      'REFRESH MATERIALIZED VIEW CONCURRENTLY contentful_raw_includes_ids_jointable'
  }
  statements.map { |name, sql| conn.prepare(name, sql) }.last
end
schema_ensured?(conn)
click to toggle source
# File lib/wcc/contentful/store/postgres_store.rb, line 318
# Tells whether the highest recorded schema version is at least
# EXPECTED_VERSION.
#
# conn - a connection responding to #exec.
#
# Returns false when the version table has no rows or does not exist.
def schema_ensured?(conn)
  newest = conn.exec('SELECT version FROM wcc_contentful_schema_version' \
                     ' ORDER BY version DESC LIMIT 1')
  newest.num_tuples != 0 && newest[0]['version'].to_i >= EXPECTED_VERSION
rescue PG::UndefinedTable
  # No version table yet: the v1 schema migration still has to run.
  false
end
Public Instance Methods
delete(key)
click to toggle source
# File lib/wcc/contentful/store/postgres_store.rb, line 68
# Deletes the row whose id is +key+ using the 'delete_by_id' statement.
#
# Returns the JSON parsed from column 1 of the deleted row, or nil when no
# row was deleted.
def delete(key)
  deleted = @connection_pool.with do |conn|
    conn.exec_prepared('delete_by_id', [key])
  end
  deleted.num_tuples.zero? ? nil : JSON.parse(deleted.getvalue(0, 1))
end
exec_query(statement, params = [])
click to toggle source
# File lib/wcc/contentful/store/postgres_store.rb, line 92
# Executes +statement+ with +params+ on a pooled connection, first refreshing
# the materialized view if a previous write marked the store dirty.
#
# statement - SQL text.
# params    - bind parameters passed to conn.exec (default: []).
#
# Returns the result of conn.exec.
# Raises whatever the refresh or the query raises. When the refresh fails,
# the dirty flag is restored so that a later call retries the refresh.
def exec_query(statement, params = [])
  if mutex.with_read_lock { @dirty }
    # Claim the refresh under the write lock; only the caller that saw the
    # flag set goes on to refresh.
    was_dirty = mutex.with_write_lock do
      previously_dirty = @dirty
      @dirty = false
      previously_dirty
    end

    if was_dirty
      begin
        _instrument 'refresh_views' do
          @connection_pool.with { |conn| conn.exec_prepared('refresh_views_concurrently') }
        end
      rescue StandardError
        # The view is still stale: put the flag back before propagating.
        mutex.with_write_lock { @dirty = true }
        raise
      end
    end
  end

  logger&.debug("[PostgresStore] #{statement}\n#{params.inspect}")
  _instrument 'exec' do
    @connection_pool.with { |conn| conn.exec(statement, params) }
  end
end
find(key, **_options)
click to toggle source
# File lib/wcc/contentful/store/postgres_store.rb, line 75
# Looks up the row whose id is +key+ using the 'select_entry' statement.
#
# Returns the JSON parsed from column 1 of the first row, or nil when no row
# matched or the connection is bad (PG::ConnectionBad).
def find(key, **_options)
  rows = @connection_pool.with do |conn|
    conn.exec_prepared('select_entry', [key])
  end
  rows.num_tuples.zero? ? nil : JSON.parse(rows.getvalue(0, 1))
rescue PG::ConnectionBad
  nil
end
find_all(content_type:, options: nil)
click to toggle source
# File lib/wcc/contentful/store/postgres_store.rb, line 84
# Builds a Query bound to this store for the given content type.
#
# content_type - passed through to Query.new.
# options      - passed through to Query.new (default: nil).
#
# Returns the new Query.
def find_all(content_type:, options: nil)
  query_args = { content_type: content_type, options: options }
  Query.new(self, **query_args)
end
keys()
click to toggle source
# File lib/wcc/contentful/store/postgres_store.rb, line 59
# Lists the ids of all stored rows using the 'select_ids' statement.
#
# Returns an Array of ids with surrounding whitespace stripped, or an empty
# Array when the connection is bad (PG::ConnectionBad).
def keys
  rows = @connection_pool.with { |conn| conn.exec_prepared('select_ids') }
  rows.map { |row| row['id'].strip }
rescue PG::ConnectionBad
  []
end
set(key, value)
click to toggle source
# File lib/wcc/contentful/store/postgres_store.rb, line 29
# Upserts +value+ under +key+ via the 'upsert_entry' statement and flags the
# store dirty when the entry's links changed.
#
# key   - id of the entry.
# value - the entry; checked with ensure_hash and serialized with #to_json.
#
# Returns the previous value parsed from column 0 of the upsert result, or
# nil when the result has no rows or that column is NULL.
def set(key, value)
  ensure_hash value

  upserted = @connection_pool.with do |conn|
    bind_params = [key, value.to_json, quote_array(extract_links(value))]
    conn.exec_prepared('upsert_entry', bind_params)
  end

  previous_value = nil
  unless upserted.num_tuples == 0
    raw_previous = upserted.getvalue(0, 0)
    previous_value = JSON.parse(raw_previous) if raw_previous
  end

  # Mark dirty - the materialized view has to be refreshed. Skip the write
  # lock when the flag is already set.
  if views_need_update?(value, previous_value) && !mutex.with_read_lock { @dirty }
    _instrument 'mark_dirty'
    mutex.with_write_lock { @dirty = true }
  end

  previous_value
end
Private Instance Methods
extract_links(entry)
click to toggle source
# File lib/wcc/contentful/store/postgres_store.rb, line 116
# Collects the ids of every link found in an entry's fields.
#
# entry - a Hash with a 'fields' key mapping field name => { locale => value },
#         or nil.
#
# A value contributes its 'sys' => 'id' when link? accepts it; an Array value
# contributes the id of each element that link? accepts.
#
# Returns an Array of ids without nils; [] when the entry or its fields are
# missing.
def extract_links(entry)
  fields = entry && entry['fields']
  return [] unless fields

  per_field = fields.map do |_name, locale_hash|
    next if locale_hash.nil?

    locale_hash.flat_map do |_locale, value|
      case value
      when Array
        value.map { |item| link?(item) ? item.dig('sys', 'id') : nil }
      else
        link?(value) ? value.dig('sys', 'id') : nil
      end
    end
  end

  per_field.flatten(1).compact
end
link?(value)
click to toggle source
# File lib/wcc/contentful/store/postgres_store.rb, line 133
# Tells whether +value+ is a link: a Hash whose 'sys' Hash has 'type' equal
# to 'Link'.
#
# Returns true or false. A 'sys' entry that is not a Hash (for example a
# String inside an arbitrary JSON field) yields false instead of raising
# TypeError from Hash#dig.
def link?(value)
  return false unless value.is_a?(Hash)

  sys = value['sys']
  sys.is_a?(Hash) && sys['type'] == 'Link'
end
quote_array(arr)
click to toggle source
# File lib/wcc/contentful/store/postgres_store.rb, line 137
# Encodes +arr+ with PG::TextEncoder::Array.
#
# Returns the encoder's output, or nil when +arr+ is nil/false.
def quote_array(arr)
  arr ? PG::TextEncoder::Array.new.encode(arr) : nil
end
views_need_update?(value, previous_value)
click to toggle source
# File lib/wcc/contentful/store/postgres_store.rb, line 144
# Tells whether the links of an entry changed between its previous and new
# value.
#
# value          - the new entry.
# previous_value - the entry it replaces (may be nil).
#
# Returns true when extract_links differs between the two, false otherwise.
def views_need_update?(value, previous_value)
  # contentful_raw_includes_ids_jointable needs update if any links change
  extract_links(value) != extract_links(previous_value)
end