class Queuel::Base::Queue

Attributes

client[RW]
name[RW]

Public Class Methods

new(client, queue_name) click to toggle source
# File lib/queuel/base/queue.rb, line 6
def initialize(client, queue_name)
  self.client = client
  self.name = queue_name
end

Public Instance Methods

max_pool_tasks() click to toggle source
# File lib/queuel/base/queue.rb, line 39
def max_pool_tasks
  Queuel.max_pool_tasks || nil
end
peek(options = {}) click to toggle source
# File lib/queuel/base/queue.rb, line 11
def peek(options = {})
  raise NotImplementedError, "must implement #peek"
end
pop(options = {}) { |message| ... } click to toggle source
# File lib/queuel/base/queue.rb, line 19
def pop(options = {}, &block)
  message_options, engine_options = Queuel::Hash.new(options).partition { |(k,_)| message_option_keys.include? k.to_s }
  bare_message = pop_bare_message(engine_options)
  unless bare_message.nil?
    build_new_message(bare_message, message_options).tap { |message|
      if block_given? && message.present?
        message.delete if yield(message)
      end
    }
  end
end
push(message, options = {}) click to toggle source
# File lib/queuel/base/queue.rb, line 15
def push(message, options = {})
  raise NotImplementedError, "must implement #push"
end
receive(options = {}, &block) click to toggle source
# File lib/queuel/base/queue.rb, line 31
def receive(options = {}, &block)
  poller_klass.new(self, block, options, thread_count).poll
end
size() click to toggle source
# File lib/queuel/base/queue.rb, line 35
def size
  raise NotImplementedError, "must implement #size"
end

Private Instance Methods

build_new_message(bare_message, options = {}) click to toggle source
# File lib/queuel/base/queue.rb, line 65
def build_new_message(bare_message, options = {})
  message_klass.new(bare_message, options)
end
build_push_message(message, options = {}) click to toggle source
# File lib/queuel/base/queue.rb, line 51
def build_push_message(message, options = {})
  message_klass.new(nil, options).tap { |m|
    m.body = message
  }.raw_body
end
message_klass() click to toggle source
# File lib/queuel/base/queue.rb, line 69
def message_klass
  self.class.const_with_nesting("Message")
end
message_option_keys() click to toggle source
# File lib/queuel/base/queue.rb, line 47
def message_option_keys
  %w[encode encoder decode decoder]
end
poller_klass() click to toggle source
# File lib/queuel/base/queue.rb, line 73
def poller_klass
  self.class.const_with_nesting("Poller")
end
pop_bare_message(options = {}) click to toggle source
# File lib/queuel/base/queue.rb, line 61
def pop_bare_message(options = {})
  raise NotImplementedError, "must implement bare Message getter"
end
thread_count() click to toggle source
# File lib/queuel/base/queue.rb, line 57
def thread_count
  Queuel.receiver_threads || 1
end