class Asynchronic::QueueEngine::Ost

Attributes

default_queue[R]
queues[R]
redis[R]

Public Class Methods

new(options={}) click to toggle source
# File lib/asynchronic/queue_engine/ost.rb, line 7
# Sets up the engine state.
#
# options - Hash passed as-is to Asynchronic.establish_redis_connection;
#           :default_queue, when present, overrides Asynchronic.default_queue.
#
# Also prepares a lazily-populated queue registry (a Queue is created and
# cached the first time a name is looked up) and stores the thread returned
# by notify_keep_alive.
def initialize(options={})
  @redis = Asynchronic.establish_redis_connection(options)
  @default_queue = options.fetch(:default_queue, Asynchronic.default_queue)
  @queues ||= Hash.new do |registry, queue_name|
    registry[queue_name] = Queue.new(queue_name, redis)
  end
  @keep_alive_thread = notify_keep_alive
end

Public Instance Methods

[](name) click to toggle source
# File lib/asynchronic/queue_engine/ost.rb, line 14
# Looks up a queue by name in the `queues` hash.
#
# The hash built in `initialize` has a default block, so a name that has not
# been seen before gets a new Queue created and stored on first access.
#
# name - key identifying the queue.
#
# Returns whatever `queues` holds (or creates) for that name.
def [](name)
  queues[name]
end
active_connections() click to toggle source
# File lib/asynchronic/queue_engine/ost.rb, line 35
# Lists the distinct, non-empty client names reported by Redis `CLIENT LIST`.
#
# The reply is split into lines, each line into space-separated fields, and
# the value of the field that *starts with* `name=` is taken. Anchoring the
# match matters: a field such as `lib-name=...` also contains "name=", and
# slicing it at offset 5 would yield garbage instead of the client name.
#
# Returns an Array of Strings; connections without a name (missing field or
# empty value) are omitted and duplicates are collapsed.
def active_connections
  redis.call!('CLIENT', 'LIST').split("\n").map do |connection_info|
    name_attr = connection_info.split(' ').detect { |attr| attr.start_with?('name=') }
    # 5 == 'name='.length, so this keeps only the value part.
    name_attr ? name_attr[5..-1] : nil
  end.uniq.compact.reject(&:empty?)
end
asynchronic?() click to toggle source
# File lib/asynchronic/queue_engine/ost.rb, line 31
# Predicate for this engine; it unconditionally answers true.
# NOTE(review): presumably used by callers to tell this engine apart from a
# synchronous one — confirm against the other queue engines.
#
# Returns true.
def asynchronic?
  true
end
clear() click to toggle source
# File lib/asynchronic/queue_engine/ost.rb, line 22
# Forgets every cached queue and removes all Redis keys matching `ost:*`.
#
# The keys are fetched with a single KEYS call and then deleted one at a
# time with DEL.
#
# Returns the Array of keys that KEYS reported.
def clear
  queues.clear
  ost_keys = redis.call!('KEYS', 'ost:*')
  ost_keys.each do |ost_key|
    redis.call!('DEL', ost_key)
  end
end
listener() click to toggle source
# File lib/asynchronic/queue_engine/ost.rb, line 27
# Builds a fresh Listener on every call (no instance is cached here).
# `Listener` is defined outside this view.
#
# Returns the new Listener.
def listener
  Listener.new
end
queue_names() click to toggle source
# File lib/asynchronic/queue_engine/ost.rb, line 18
# Names of every known queue, as Symbols.
#
# Combines the keys of the queues already instantiated in this process with
# the `ost:*` keys present in Redis, then strips the 4-character `ost:`
# prefix from each.
#
# Keys are converted to Strings *before* the union: a local queue key is not
# necessarily a plain String, and mixing key objects with the Strings
# returned by Redis would defeat de-duplication and list a queue twice.
#
# Returns an Array of Symbols, local queues first, without duplicates.
def queue_names
  local_keys = queues.values.map { |queue| queue.key.to_s }
  remote_keys = redis.call!('KEYS', 'ost:*').map(&:to_s)
  (local_keys | remote_keys).map { |key| key[4..-1].to_sym }
end

Private Instance Methods

notify_keep_alive() click to toggle source
# File lib/asynchronic/queue_engine/ost.rb, line 46
# Spawns a background thread that, in an endless loop, sets this
# connection's Redis client name to Asynchronic.connection_name and then
# sleeps for Asynchronic.keep_alive_timeout.
# NOTE(review): nothing here rescues errors from `redis.call!`, so an
# exception would end the loop — confirm that is acceptable.
#
# Returns the started Thread.
def notify_keep_alive
  Thread.new do
    loop do
      redis.call!('CLIENT', 'SETNAME', Asynchronic.connection_name)
      sleep(Asynchronic.keep_alive_timeout)
    end
  end
end