class Wamp::Worker::Queue

Attributes

default_timeout[R]
redis[R]

Public Class Methods

new(name) click to toggle source

Constructor

# File lib/wamp/worker/queue.rb, line 44
# Builds a queue helper bound to the named worker configuration.
#
# @param name - Identifier handed to Wamp::Worker.config to look up the settings
def initialize(name)
  # Client that the queue operations in this class (rpop/brpop/lpush/del) are issued on
  @redis = Wamp::Worker.config.redis(name)
  # Fallback timeout used by a blocking pop when the caller supplies none
  @default_timeout = Wamp::Worker.config.timeout(name)
end

Public Instance Methods

pop(queue_name, wait: false, delete: false, timeout: nil) click to toggle source

Pops a command off of the queue

@param queue_name [String, Array] - The name of the queue (or multiple queues if brpop)
@param wait [Bool] - True if we want to block waiting for the response
@param delete [Bool] - True if we want the queue deleted (only applicable if wait)
@param timeout [Int] - Number of seconds to wait before timing out

# File lib/wamp/worker/queue.rb, line 73
# Pops a command off of the queue and parses it into a descriptor.
#
# @param queue_name [String, Array] - The queue to pop from (several queues may be given for a blocking pop)
# @param wait [Bool] - When true a blocking pop (brpop) is issued, otherwise a plain rpop
# @param delete [Bool] - When true the queue is deleted after the pop
# @param timeout [Int] - Seconds handed to the blocking pop; default_timeout is used when none is given
# @return [Descriptor, nil] The parsed descriptor, or nil when nothing was popped
def pop(queue_name, wait: false, delete: false, timeout: nil)
  if wait
    # Fall back to the default timeout if none is specified
    timeout ||= default_timeout

    # The blocking pop answers with [queue, value], or nil when nothing arrived
    reply = redis.brpop(queue_name, timeout: timeout)
    queue_name, payload = reply[0], reply[1] unless reply.nil?
  else
    # Non-blocking: take whatever is there right now
    payload = redis.rpop(queue_name)
  end

  # Drop the queue when the caller asked for it
  redis.del(queue_name) if delete

  # Only parse when something was actually popped
  descriptor = payload.nil? ? nil : Descriptor.from_json(payload)

  log(:pop, queue_name, descriptor)

  # The descriptor (or nil) is the return value
  descriptor
end
push(queue_name, command, params, handle=nil) click to toggle source

Pushes a command onto the queue

@param queue_name [String] - The name of the queue
@param command [Symbol] - The command
@param params [Hash] - The params for the request
@param handle [String] - The response handle

# File lib/wamp/worker/queue.rb, line 55
# Pushes a command onto the queue.
#
# @param queue_name [String] - The name of the queue
# @param command [Symbol] - The command
# @param params [Hash] - The params for the request
# @param handle [String] - The response handle
# @return The value returned by the lpush call
def push(queue_name, command, params, handle = nil)
  # Build the descriptor and log it before anything is queued
  request = Descriptor.new(command, handle, params).tap do |built|
    log(:push, queue_name, built)
  end

  # Queue the serialized command
  redis.lpush(queue_name, request.to_json)
end

Private Instance Methods

log(type, queue_name, descriptor) click to toggle source

Logs the info

# File lib/wamp/worker/queue.rb, line 118
# Logs a queue operation at debug level.
#
# Nothing is written unless the logger reports that debug output is enabled.
# The check uses `debug?` rather than comparing the level to Logger::DEBUG,
# so a logger whose threshold is lower than DEBUG still gets the messages.
#
# @param type [Symbol] - The operation being logged (upcased in the output)
# @param queue_name [String, Array] - The queue the operation was performed on
# @param descriptor [Descriptor, nil] - The descriptor involved, or nil when the queue was empty
def log(type, queue_name, descriptor)
  return unless logger.debug?

  prefix = "#{self.class.name} #{type.upcase} : #{queue_name}"

  if descriptor
    logger.debug(prefix)
    logger.debug("   command: #{descriptor.command}")
    logger.debug("   params: #{descriptor.params}")
    logger.debug("   handle: #{descriptor.handle}")
  else
    logger.debug("#{prefix} : EMPTY")
  end
end
logger() click to toggle source

Returns the logger

# File lib/wamp/worker/queue.rb, line 112
# Returns the logger by delegating to Wamp::Worker.logger.
def logger
  Wamp::Worker.logger
end