class DirtyPipeline::Redis::Queue
Public Class Methods
new(operation, subject_class, subject_id, transaction_id)
click to toggle source
# File lib/dirty_pipeline/redis/queue.rb, line 4
# Builds the Redis key prefix that every key of this queue is derived from.
#
# The prefix has the shape
# "dirty-pipeline-queue:<subject_class>:<subject_id>:op_<operation>:txid_<transaction_id>".
#
# @param operation      interpolated after "op_"
# @param subject_class  interpolated as the second segment
# @param subject_id     interpolated as the third segment
# @param transaction_id interpolated after "txid_"
def initialize(operation, subject_class, subject_id, transaction_id)
  subject_scope = "dirty-pipeline-queue:#{subject_class}:#{subject_id}"
  @root = "#{subject_scope}:op_#{operation}:txid_#{transaction_id}"
end
Public Instance Methods
clear!()
click to toggle source
# File lib/dirty_pipeline/redis/queue.rb, line 9
# Deletes both keys owned by this queue: first the active-event key,
# then the pending-events list.
#
# The two deletions are issued as separate commands, in that order.
def clear!
  DirtyPipeline.with_redis do |redis|
    redis.del(active_event_key)
    redis.del(events_queue_key)
  end
end
pop()
click to toggle source
# File lib/dirty_pipeline/redis/queue.rb, line 35
# Takes the next packed event off the head of the events list and records
# it under the active-event key.
#
# When the list is empty (LPOP yields nil) the active-event key is deleted
# instead, and +unpack+ receives nil.
#
# @return whatever +unpack+ produces for the popped payload (nil when the
#   list was empty) -- assuming with_redis returns the block's value
def pop
  DirtyPipeline.with_redis do |redis|
    payload = redis.lpop(events_queue_key)

    if payload.nil?
      redis.del(active_event_key)
    else
      redis.set(active_event_key, payload)
    end

    unpack(payload)
  end
end
processing_event()
click to toggle source
# File lib/dirty_pipeline/redis/queue.rb, line 43
# Reads the payload stored under the active-event key and unpacks it.
#
# @return whatever +unpack+ produces for the stored payload; +unpack+
#   returns nil when the key holds nothing
def processing_event
  DirtyPipeline.with_redis do |redis|
    stored_payload = redis.get(active_event_key)
    unpack(stored_payload)
  end
end
push(event)
click to toggle source
# File lib/dirty_pipeline/redis/queue.rb, line 24
# Appends the packed event to the tail of the events list (RPUSH).
#
# @param event the event handed to +pack+
# @return [self] so calls can be chained
def push(event)
  DirtyPipeline.with_redis do |redis|
    redis.rpush(events_queue_key, pack(event))
  end

  self
end
Also aliased as: <<
to_a()
click to toggle source
# File lib/dirty_pipeline/redis/queue.rb, line 16
# Reads the whole events list (LRANGE 0..-1) without removing anything and
# unpacks every entry.
#
# @return [Array] one +unpack+ result per stored payload, in list order --
#   assuming with_redis returns the block's value
def to_a
  DirtyPipeline.with_redis do |redis|
    packed_events = redis.lrange(events_queue_key, 0, -1)
    packed_events.map! { |raw| unpack(raw) }
  end
end
unshift(event)
click to toggle source
# File lib/dirty_pipeline/redis/queue.rb, line 30
# Prepends the packed event to the head of the events list (LPUSH), so it
# is the next one +pop+ will take.
#
# @param event the event handed to +pack+
# @return [self] so calls can be chained
def unshift(event)
  DirtyPipeline.with_redis do |redis|
    redis.lpush(events_queue_key, pack(event))
  end

  self
end
Private Instance Methods
active_event_key()
click to toggle source
# File lib/dirty_pipeline/redis/queue.rb, line 79
# Redis key holding the payload of the event most recently popped.
#
# @return [String] the queue prefix followed by ":active"
def active_event_key
  format("%s:active", @root)
end
events_queue_key()
click to toggle source
# File lib/dirty_pipeline/redis/queue.rb, line 75
# Redis key of the list that stores the packed, not-yet-popped events.
#
# @return [String] the queue prefix followed by ":events"
def events_queue_key
  format("%s:events", @root)
end
pack(event)
click to toggle source
# File lib/dirty_pipeline/redis/queue.rb, line 49
# Serializes an event into the JSON string stored in Redis.
#
# Only six attributes are written, under shortened keys:
# id -> "evid", tx_id -> "txid", transition -> "transit", plus "args",
# "source" and "destination" unchanged.
#
# @param event an object responding to id, tx_id, transition, args,
#   source and destination
# @return [String] the JSON document
def pack(event)
  payload = {
    "evid" => event.id,
    "txid" => event.tx_id,
    "transit" => event.transition,
    "args" => event.args,
    "source" => event.source,
    "destination" => event.destination
  }

  JSON.dump(payload)
end
unpack(packed_event)
click to toggle source
# File lib/dirty_pipeline/redis/queue.rb, line 60
# Rebuilds an Event from the JSON string produced by +pack+.
#
# The shortened keys are mapped back to the names passed to Event.new:
# "evid" -> "uuid", "txid" -> "transaction_uuid", "transit" -> "transition";
# "args", "source" and "destination" are copied as-is.
#
# @param packed_event [String, nil] JSON payload read from Redis
# @return [Event, nil] nil when no payload was given
# @raise [JSON::ParserError] when the payload is not valid JSON
def unpack(packed_event)
  return unless packed_event

  # JSON.parse instead of JSON.load: JSON.load can enable create_additions,
  # which lets a payload containing "json_class" instantiate arbitrary
  # classes. allow_nan / max_nesting are kept permissive so anything
  # JSON.dump emitted in +pack+ still round-trips.
  attributes = JSON.parse(
    packed_event,
    create_additions: false,
    allow_nan: true,
    max_nesting: false
  )

  Event.new(
    data: {
      "uuid" => attributes["evid"],
      "transaction_uuid" => attributes["txid"],
      "transition" => attributes["transit"],
      "args" => attributes["args"],
      "source" => attributes["source"],
      "destination" => attributes["destination"]
    }
  )
end