class Asynchronic::QueueEngine::Ost
Attributes
default_queue[R]
queues[R]
redis[R]
Public Class Methods
new(options={})
click to toggle source
# File lib/asynchronic/queue_engine/ost.rb, line 7
#
# Builds the engine: opens the Redis connection, resolves the default queue
# name, prepares the lazy queue registry and starts the keep-alive thread.
#
# options - Hash forwarded to Asynchronic.establish_redis_connection. Its
#           :default_queue entry, when present, overrides
#           Asynchronic.default_queue.
def initialize(options={})
  @redis = Asynchronic.establish_redis_connection(options)
  @default_queue = options.fetch(:default_queue, Asynchronic.default_queue)

  # Queues are created on first access and memoized under their name.
  @queues ||= Hash.new do |registry, queue_name|
    registry[queue_name] = Queue.new(queue_name, redis)
  end

  @keep_alive_thread = notify_keep_alive
end
Public Instance Methods
[](name)
click to toggle source
# File lib/asynchronic/queue_engine/ost.rb, line 14
#
# Looks up a queue by name in the +queues+ registry.
#
# name - key of the queue to look up.
#
# Returns whatever +queues+ holds (or builds) for that name.
def [](name)
  queues[name]
end
active_connections()
click to toggle source
# File lib/asynchronic/queue_engine/ost.rb, line 35
#
# Lists the distinct, non-empty client names reported by Redis CLIENT LIST.
#
# Each line of the reply is treated as a space-separated list of
# +key=value+ fields. Only the field whose key is exactly +name+ is used;
# look-alike fields such as +lib-name=...+ are ignored, so a client line can
# never contribute a truncated value taken from a different attribute.
#
# Returns an Array of String connection names in first-seen order.
def active_connections
  name_prefix = 'name='

  names = redis.call!('CLIENT', 'LIST').split("\n").map do |connection_info|
    name_field = connection_info.split(' ').find { |field| field.start_with?(name_prefix) }
    # Strip the "name=" prefix; lines without a name field yield nil.
    name_field && name_field[name_prefix.length..-1]
  end

  # Drop clients without a name field or with an empty (unset) name.
  names.compact.reject(&:empty?).uniq
end
asynchronic?()
click to toggle source
# File lib/asynchronic/queue_engine/ost.rb, line 31
#
# Predicate describing this engine; it unconditionally answers +true+.
def asynchronic?
  true
end
clear()
click to toggle source
# File lib/asynchronic/queue_engine/ost.rb, line 22
#
# Empties the in-memory queue registry and deletes every Redis key that
# matches +ost:*+, one DEL call per key.
#
# Returns the Array of keys that the KEYS call reported.
def clear
  queues.clear

  stale_keys = redis.call!('KEYS', 'ost:*')
  stale_keys.each do |key|
    redis.call!('DEL', key)
  end
end
listener()
click to toggle source
# File lib/asynchronic/queue_engine/ost.rb, line 27
#
# Builds a fresh Listener instance on every call.
def listener
  Listener.new
end
queue_names()
click to toggle source
# File lib/asynchronic/queue_engine/ost.rb, line 18
#
# Collects the names of all known queues: those already present in the
# local registry plus every Redis key matching +ost:*+, without duplicates.
#
# Returns an Array of Symbols; each one is the key with its first four
# characters (the "ost:" prefix) removed.
def queue_names
  local_keys = queues.values.map(&:key)
  remote_keys = redis.call!('KEYS', 'ost:*')

  (local_keys | remote_keys).map do |key|
    key.to_s[4..-1].to_sym
  end
end
Private Instance Methods
notify_keep_alive()
click to toggle source
# File lib/asynchronic/queue_engine/ost.rb, line 46
#
# Starts a background thread that, in an endless loop, sets this
# connection's Redis client name to Asynchronic.connection_name and then
# sleeps for Asynchronic.keep_alive_timeout.
#
# Returns the started Thread.
def notify_keep_alive
  Thread.new do
    loop do
      redis.call!('CLIENT', 'SETNAME', Asynchronic.connection_name)
      sleep(Asynchronic.keep_alive_timeout)
    end
  end
end