class DirtyPipeline::PG::Queue

Constants

DELETE_ACTIVE
DELETE_ACTIVE_EVENT
DELETE_EVENT
DELETE_EVENTS
PUSH_EVENT
SELECT_ACTIVE_EVENT
SELECT_ALL_EVENTS
SELECT_LAST_EVENT
SET_EVENT_ACTIVE
UNSHIFT_EVENT

Public Class Methods

create!(connection) click to toggle source

Note: `decoder = PG::TextDecoder::Array.new` — see https://stackoverflow.com/questions/34886260/how-do-you-decode-a-json-field-using-the-pg-gem for how to decode a JSON field using the pg gem.

# File lib/dirty_pipeline/pg/queue.rb, line 6
      # Creates the database objects used by the queue: the dp_active_events
      # table, the dp_event_queues_id_seq sequence and the dp_event_queues table.
      #
      # All statements are sent to the connection in a single #exec call. The
      # CREATE statements carry no IF NOT EXISTS guard.
      #
      # connection - object responding to #exec(sql).
      #
      # Returns whatever connection.exec returns.
      def self.create!(connection)
        schema_sql = <<~SQL
          CREATE TABLE dp_active_events (
            key TEXT CONSTRAINT primary_event_queues_key PRIMARY KEY,
            payload TEXT,
            created_at TIMESTAMP NOT NULL DEFAULT now()
          );

          CREATE SEQUENCE dp_event_queues_id_seq START 1;
          CREATE TABLE dp_event_queues (
            id BIGINT PRIMARY KEY DEFAULT nextval('dp_event_queues_id_seq'),
            key TEXT NOT NULL,
            payload TEXT,
            created_at TIMESTAMP NOT NULL DEFAULT now()
          );
        SQL
        connection.exec(schema_sql)
      end
destroy!(connection) click to toggle source
# File lib/dirty_pipeline/pg/queue.rb, line 24
      # Drops the objects created for the queue: both tables and the id
      # sequence. Every statement uses IF EXISTS.
      #
      # connection - object responding to #exec(sql).
      #
      # Returns whatever connection.exec returns.
      def self.destroy!(connection)
        teardown_sql = <<~SQL
          DROP TABLE IF EXISTS dp_active_events;
          DROP TABLE IF EXISTS dp_event_queues;
          DROP SEQUENCE IF EXISTS dp_event_queues_id_seq;
        SQL
        connection.exec(teardown_sql)
      end
new(operation, subject_class, subject_id, transaction_id) click to toggle source
# File lib/dirty_pipeline/pg/queue.rb, line 32
# Builds the root key under which this queue stores its data.
#
# The key has the form
#   dirty-pipeline-queue:<subject_class>:<subject_id>:op_<operation>:txid_<transaction_id>
# and is kept in @root; the per-purpose keys are derived from it.
#
# operation      - interpolated after the "op_" prefix.
# subject_class  - interpolated as the second key segment.
# subject_id     - interpolated as the third key segment.
# transaction_id - interpolated after the "txid_" prefix.
def initialize(operation, subject_class, subject_id, transaction_id)
  subject_part = "#{subject_class}:#{subject_id}"
  operation_part = "op_#{operation}:txid_#{transaction_id}"
  @root = "dirty-pipeline-queue:#{subject_part}:#{operation_part}"
end

Public Instance Methods

<<(event)
Alias for: push
clear!() click to toggle source
# File lib/dirty_pipeline/pg/queue.rb, line 48
# Removes this queue's stored data.
#
# Inside one `transaction` block it executes DELETE_ACTIVE with the
# active-event key, then DELETE_EVENTS with the events-queue key.
#
# Returns whatever with_postgres returns for the block.
def clear!
  with_postgres do |connection|
    connection.transaction do |tx|
      tx.exec(DELETE_ACTIVE, [active_event_key])
      tx.exec(DELETE_EVENTS, [events_queue_key])
    end
  end
end
pop() click to toggle source
# File lib/dirty_pipeline/pg/queue.rb, line 106
# Takes the event selected by SELECT_LAST_EVENT off the queue and marks it
# as the active one, all inside a single `transaction` block.
#
# When the select yields no payload, the active-event record is deleted
# instead (DELETE_ACTIVE_EVENT) and nothing is unpacked.
#
# Returns the result of unpack for the selected payload (nil when there was
# no payload).
def pop
  with_postgres do |connection|
    connection.transaction do |tx|
      selected = PG.multi(tx.exec(SELECT_LAST_EVENT, [events_queue_key]))
      event_id, payload = selected

      if payload.nil?
        # Queue is empty: drop the active-event record.
        tx.exec(DELETE_ACTIVE_EVENT, [active_event_key])
      else
        # Move the payload from the queue into the active slot.
        tx.exec(DELETE_EVENT, [events_queue_key, event_id])
        tx.exec(SET_EVENT_ACTIVE, [active_event_key, payload])
      end

      unpack(payload)
    end
  end
end
processing_event() click to toggle source
# File lib/dirty_pipeline/pg/queue.rb, line 125
# Fetches the currently active event.
#
# Executes SELECT_ACTIVE_EVENT with the active-event key, reduces the result
# with PG.single and passes that value to unpack.
#
# Returns the result of unpack (nil when unpack receives a falsy payload).
def processing_event
  with_postgres do |connection|
    result = connection.exec(SELECT_ACTIVE_EVENT, [active_event_key])
    payload = PG.single(result)
    unpack(payload)
  end
end
push(event) click to toggle source
# File lib/dirty_pipeline/pg/queue.rb, line 72
# Stores an event in the queue by executing PUSH_EVENT with the events-queue
# key and the packed event.
#
# event - the event to store; serialized with pack.
#
# Returns self, so calls can be chained.
def push(event)
  with_postgres { |connection| connection.exec(PUSH_EVENT, [events_queue_key, pack(event)]) }
  self
end
Also aliased as: <<
to_a() click to toggle source
# File lib/dirty_pipeline/pg/queue.rb, line 60
# Lists the queued events.
#
# Executes SELECT_ALL_EVENTS with the events-queue key and unpacks the first
# column value of every returned row.
#
# Returns an Array with one unpack result per row.
def to_a
  with_postgres do |connection|
    rows = connection.exec(SELECT_ALL_EVENTS, [events_queue_key]).to_a
    rows.map { |row| unpack(row.values.first) }
  end
end
unshift(event) click to toggle source
# File lib/dirty_pipeline/pg/queue.rb, line 84
# Stores an event in the queue by executing UNSHIFT_EVENT with the
# events-queue key and the packed event.
#
# event - the event to store; serialized with pack.
#
# Returns self, so calls can be chained.
def unshift(event)
  with_postgres { |connection| connection.exec(UNSHIFT_EVENT, [events_queue_key, pack(event)]) }
  self
end
with_postgres(&block) click to toggle source
# File lib/dirty_pipeline/pg/queue.rb, line 37
# Runs the given block through DirtyPipeline.with_postgres.
#
# The block is forwarded unchanged, and the return value is whatever
# DirtyPipeline.with_postgres returns.
#
# NOTE(review): presumably the block is yielded a Postgres connection (the
# other methods of this class call #exec / #transaction on the yielded
# object) — confirm against DirtyPipeline.with_postgres.
def with_postgres(&block)
  DirtyPipeline.with_postgres(&block)
end

Private Instance Methods

active_event_key() click to toggle source
# File lib/dirty_pipeline/pg/queue.rb, line 166
# Key identifying this queue's active event: the root key followed by
# ":active".
#
# Returns a String.
def active_event_key
  format("%s:active", @root)
end
events_queue_key() click to toggle source
# File lib/dirty_pipeline/pg/queue.rb, line 162
# Key identifying this queue's pending events: the root key followed by
# ":events".
#
# Returns a String.
def events_queue_key
  format("%s:events", @root)
end
pack(event) click to toggle source
# File lib/dirty_pipeline/pg/queue.rb, line 136
# Serializes an event to a JSON string for storage.
#
# The JSON object uses short keys: "evid" (event.id), "txid" (event.tx_id),
# "transit" (event.transition), plus "args", "source" and "destination".
#
# event - object responding to id, tx_id, transition, args, source and
#         destination.
#
# Returns the JSON String.
def pack(event)
  payload = {
    "evid" => event.id,
    "txid" => event.tx_id,
    "transit" => event.transition,
    "args" => event.args,
    "source" => event.source,
    "destination" => event.destination
  }
  JSON.dump(payload)
end
unpack(packed_event) click to toggle source
# File lib/dirty_pipeline/pg/queue.rb, line 147
# Rebuilds an Event from its packed JSON form.
#
# The short keys of the packed form are mapped onto the keys of the Event's
# data hash: "evid" -> "uuid", "txid" -> "transaction_uuid",
# "transit" -> "transition"; "args", "source" and "destination" keep their
# names.
#
# packed_event - JSON String, or nil/false when there is nothing to unpack.
#
# Returns an Event, or nil when packed_event is falsy.
def unpack(packed_event)
  return unless packed_event

  # NOTE(review): JSON.load is used here rather than JSON.parse. If payloads
  # could ever be written by an untrusted party, consider JSON.parse — confirm
  # that nothing relies on JSON.load's more permissive defaults first.
  decoded = JSON.load(packed_event)

  key_map = [
    %w[uuid evid],
    %w[transaction_uuid txid],
    %w[transition transit],
    %w[args args],
    %w[source source],
    %w[destination destination]
  ]
  data = key_map.each_with_object({}) do |(data_key, packed_key), result|
    result[data_key] = decoded[packed_key]
  end

  Event.new(data: data)
end