class DirtyPipeline::Redis::Queue

Public Class Methods

new(operation, subject_class, subject_id, transaction_id) click to toggle source
# File lib/dirty_pipeline/redis/queue.rb, line 4
def initialize(operation, subject_class, subject_id, transaction_id)
  @root = "dirty-pipeline-queue:#{subject_class}:#{subject_id}:" \
          "op_#{operation}:txid_#{transaction_id}"
end

Public Instance Methods

<<(event)
Alias for: push
clear!() click to toggle source
# File lib/dirty_pipeline/redis/queue.rb, line 9
def clear!
  DirtyPipeline.with_redis do |r|
    r.del active_event_key
    r.del events_queue_key
  end
end
pop() click to toggle source
# File lib/dirty_pipeline/redis/queue.rb, line 35
def pop
  DirtyPipeline.with_redis do |r|
    data = r.lpop(events_queue_key)
    data.nil? ? r.del(active_event_key) : r.set(active_event_key, data)
    unpack(data)
  end
end
processing_event() click to toggle source
# File lib/dirty_pipeline/redis/queue.rb, line 43
def processing_event
  DirtyPipeline.with_redis { |r| unpack(r.get(active_event_key)) }
end
push(event) click to toggle source
# File lib/dirty_pipeline/redis/queue.rb, line 24
def push(event)
  DirtyPipeline.with_redis { |r| r.rpush(events_queue_key, pack(event)) }
  self
end
Also aliased as: <<
to_a() click to toggle source
# File lib/dirty_pipeline/redis/queue.rb, line 16
def to_a
  DirtyPipeline.with_redis do |r|
    r.lrange(events_queue_key, 0, -1).map! do |packed_event|
      unpack(packed_event)
    end
  end
end
unshift(event) click to toggle source
# File lib/dirty_pipeline/redis/queue.rb, line 30
def unshift(event)
  DirtyPipeline.with_redis { |r| r.lpush(events_queue_key, pack(event)) }
  self
end

Private Instance Methods

active_event_key() click to toggle source
# File lib/dirty_pipeline/redis/queue.rb, line 79
def active_event_key
  "#{@root}:active"
end
events_queue_key() click to toggle source
# File lib/dirty_pipeline/redis/queue.rb, line 75
def events_queue_key
  "#{@root}:events"
end
pack(event) click to toggle source
# File lib/dirty_pipeline/redis/queue.rb, line 49
def pack(event)
  JSON.dump(
    "evid" => event.id,
    "txid" => event.tx_id,
    "transit" => event.transition,
    "args" => event.args,
    "source" => event.source,
    "destination" => event.destination,
  )
end
unpack(packed_event) click to toggle source
# File lib/dirty_pipeline/redis/queue.rb, line 60
def unpack(packed_event)
  return unless packed_event
  unpacked_event = JSON.load(packed_event)
  Event.new(
    data: {
      "uuid" => unpacked_event["evid"],
      "transaction_uuid" => unpacked_event["txid"],
      "transition" => unpacked_event["transit"],
      "args" => unpacked_event["args"],
      "source" => unpacked_event["source"],
      "destination" => unpacked_event["destination"]
    }
  )
end