class DirtyPipeline::PG::Queue
Constants
- DELETE_ACTIVE
- DELETE_ACTIVE_EVENT
- DELETE_EVENT
- DELETE_EVENTS
- PUSH_EVENT
- SELECT_ACTIVE_EVENT
- SELECT_ALL_EVENTS
- SELECT_LAST_EVENT
- SET_EVENT_ACTIVE
- UNSHIFT_EVENT
Public Class Methods
create!(connection)
click to toggle source
decoder = PG::TextDecoder::Array.new — see https://stackoverflow.com/questions/34886260/how-do-you-decode-a-json-field-using-the-pg-gem
# File lib/dirty_pipeline/pg/queue.rb, line 6
# Sends the DDL that creates the queue storage: the dp_active_events table,
# the dp_event_queues_id_seq sequence and the dp_event_queues table.
# None of the statements use IF NOT EXISTS.
#
# @param connection [#exec] object the DDL string is handed to
# @return whatever +connection.exec+ returns
def self.create!(connection)
  schema_ddl = <<~SQL
    CREATE TABLE dp_active_events (
      key TEXT CONSTRAINT primary_event_queues_key PRIMARY KEY,
      payload TEXT,
      created_at TIMESTAMP NOT NULL DEFAULT now()
    );

    CREATE SEQUENCE dp_event_queues_id_seq START 1;
    CREATE TABLE dp_event_queues (
      id BIGINT PRIMARY KEY DEFAULT nextval('dp_event_queues_id_seq'),
      key TEXT NOT NULL,
      payload TEXT,
      created_at TIMESTAMP NOT NULL DEFAULT now()
    );
  SQL

  connection.exec(schema_ddl)
end
destroy!(connection)
click to toggle source
# File lib/dirty_pipeline/pg/queue.rb, line 24
# Sends the DDL that drops both queue tables and then the id sequence.
# Every statement uses IF EXISTS.
#
# @param connection [#exec] object the DDL string is handed to
# @return whatever +connection.exec+ returns
def self.destroy!(connection)
  teardown_ddl = <<~SQL
    DROP TABLE IF EXISTS dp_active_events;
    DROP TABLE IF EXISTS dp_event_queues;
    DROP SEQUENCE IF EXISTS dp_event_queues_id_seq;
  SQL

  connection.exec(teardown_ddl)
end
new(operation, subject_class, subject_id, transaction_id)
click to toggle source
# File lib/dirty_pipeline/pg/queue.rb, line 32
# Builds the root key that every storage key of this queue is derived from.
#
# @param operation      interpolated after the "op_" prefix
# @param subject_class  interpolated as the first key segment
# @param subject_id     interpolated as the second key segment
# @param transaction_id interpolated after the "txid_" prefix
def initialize(operation, subject_class, subject_id, transaction_id)
  subject_segment = "#{subject_class}:#{subject_id}"
  operation_segment = "op_#{operation}:txid_#{transaction_id}"
  @root = "dirty-pipeline-queue:#{subject_segment}:#{operation_segment}"
end
Public Instance Methods
clear!()
click to toggle source
# File lib/dirty_pipeline/pg/queue.rb, line 48
# Runs DELETE_ACTIVE (bound to the active-event key) and DELETE_EVENTS
# (bound to the events-queue key) inside a single transaction.
#
# @return whatever the +with_postgres+ block call returns
def clear!
  with_postgres do |connection|
    connection.transaction do |tx|
      tx.exec(DELETE_ACTIVE, [active_event_key])
      tx.exec(DELETE_EVENTS, [events_queue_key])
    end
  end
end
pop()
click to toggle source
# File lib/dirty_pipeline/pg/queue.rb, line 106
# Inside one transaction: reads a row with SELECT_LAST_EVENT and splits it
# into an id and a raw payload via PG.multi. When there is no payload the
# active-event record is deleted; otherwise the selected row is deleted from
# the queue and its payload is stored with SET_EVENT_ACTIVE.
#
# @return the result of +unpack+ on the raw payload (nil when it is nil)
def pop
  with_postgres do |connection|
    connection.transaction do |tx|
      selected = tx.exec(SELECT_LAST_EVENT, [events_queue_key])
      event_id, raw_event = PG.multi(selected)

      if raw_event.nil?
        # Nothing queued: drop the active-event record as well.
        tx.exec(DELETE_ACTIVE_EVENT, [active_event_key])
      else
        tx.exec(DELETE_EVENT, [events_queue_key, event_id])
        tx.exec(SET_EVENT_ACTIVE, [active_event_key, raw_event])
      end

      unpack(raw_event)
    end
  end
end
processing_event()
click to toggle source
# File lib/dirty_pipeline/pg/queue.rb, line 125
# Runs SELECT_ACTIVE_EVENT for this queue's active-event key, reduces the
# result with PG.single and unpacks it.
#
# @return the result of +unpack+ (nil when there is no raw payload)
def processing_event
  with_postgres do |connection|
    active_result = connection.exec(SELECT_ACTIVE_EVENT, [active_event_key])
    unpack(PG.single(active_result))
  end
end
push(event)
click to toggle source
# File lib/dirty_pipeline/pg/queue.rb, line 72
# Runs PUSH_EVENT with the events-queue key and the packed event.
#
# @param event object accepted by +pack+
# @return [self] so calls can be chained
def push(event)
  with_postgres { |conn| conn.exec(PUSH_EVENT, [events_queue_key, pack(event)]) }
  self
end
Also aliased as: <<
to_a()
click to toggle source
# File lib/dirty_pipeline/pg/queue.rb, line 60
# Runs SELECT_ALL_EVENTS for the events-queue key and unpacks the first
# column value of every returned row.
#
# @return [Array] one +unpack+ result per row
def to_a
  with_postgres do |connection|
    rows = connection.exec(SELECT_ALL_EVENTS, [events_queue_key]).to_a
    rows.map { |row| unpack(row.values.first) }
  end
end
unshift(event)
click to toggle source
# File lib/dirty_pipeline/pg/queue.rb, line 84
# Runs UNSHIFT_EVENT with the events-queue key and the packed event.
#
# @param event object accepted by +pack+
# @return [self] so calls can be chained
def unshift(event)
  with_postgres { |conn| conn.exec(UNSHIFT_EVENT, [events_queue_key, pack(event)]) }
  self
end
with_postgres(&block)
click to toggle source
# File lib/dirty_pipeline/pg/queue.rb, line 37
# Hands the given block to DirtyPipeline.with_postgres unchanged.
#
# @return whatever DirtyPipeline.with_postgres returns
def with_postgres(&block)
  DirtyPipeline.with_postgres(&block)
end
Private Instance Methods
active_event_key()
click to toggle source
# File lib/dirty_pipeline/pg/queue.rb, line 166
# Key for this queue's active event: the root key with ":active" appended.
#
# @return [String]
def active_event_key
  format("%s:active", @root)
end
events_queue_key()
click to toggle source
# File lib/dirty_pipeline/pg/queue.rb, line 162
# Key for this queue's pending events: the root key with ":events" appended.
#
# @return [String]
def events_queue_key
  format("%s:events", @root)
end
pack(event)
click to toggle source
# File lib/dirty_pipeline/pg/queue.rb, line 136
# Serializes an event to a JSON string using short field names.
#
# @param event object responding to +id+, +tx_id+, +transition+, +args+,
#   +source+ and +destination+
# @return [String] JSON document with the keys "evid", "txid", "transit",
#   "args", "source" and "destination"
def pack(event)
  payload = {
    "evid" => event.id,
    "txid" => event.tx_id,
    "transit" => event.transition,
    "args" => event.args,
    "source" => event.source,
    "destination" => event.destination
  }
  JSON.dump(payload)
end
unpack(packed_event)
click to toggle source
# File lib/dirty_pipeline/pg/queue.rb, line 147
# Rebuilds an Event from a JSON string in the format written by #pack,
# mapping the short stored keys back to the Event data keys
# ("evid" => "uuid", "txid" => "transaction_uuid", "transit" => "transition").
#
# @param packed_event [String, nil] JSON payload, or nil/false for "nothing"
# @return [Event, nil] nil when +packed_event+ is nil or false
# @raise [JSON::ParserError] when the payload is blank or not valid JSON
def unpack(packed_event)
  return unless packed_event

  # JSON.parse instead of JSON.load: JSON.load may honor "json_class"
  # additions and instantiate arbitrary classes from stored data, while
  # JSON.parse only ever yields plain hashes, arrays and scalars.
  # allow_nan / max_nesting: false keep accepting everything JSON.dump
  # (used by #pack) is able to emit.
  unpacked_event = JSON.parse(packed_event, allow_nan: true, max_nesting: false)

  Event.new(
    data: {
      "uuid" => unpacked_event["evid"],
      "transaction_uuid" => unpacked_event["txid"],
      "transition" => unpacked_event["transit"],
      "args" => unpacked_event["args"],
      "source" => unpacked_event["source"],
      "destination" => unpacked_event["destination"]
    }
  )
end