class Queuel::Base::Queue
Attributes
client[RW]
name[RW]
Public Class Methods
new(client, queue_name)
click to toggle source
# File lib/queuel/base/queue.rb, line 6
#
# Binds a new queue object to a client and a queue name.
#
# client     - stored unchanged through the +client+ writer.
# queue_name - stored unchanged through the +name+ writer.
def initialize(client, queue_name)
  self.client = client
  self.name = queue_name
end
Public Instance Methods
max_pool_tasks()
click to toggle source
# File lib/queuel/base/queue.rb, line 39
#
# Returns the value of Queuel.max_pool_tasks, or nil when that value is
# falsy. The trailing `|| nil` means a +false+ setting is reported as nil
# rather than false.
def max_pool_tasks
  Queuel.max_pool_tasks || nil
end
peek(options = {})
click to toggle source
# File lib/queuel/base/queue.rb, line 11
#
# Abstract hook: this base implementation never returns.
#
# options - accepted but not used here.
#
# Raises NotImplementedError with the message "must implement #peek".
def peek(options = {})
  raise NotImplementedError, "must implement #peek"
end
pop(options = {}) { |message| ... }
click to toggle source
# File lib/queuel/base/queue.rb, line 19
#
# Fetches one message and optionally hands it to a block.
#
# options - split in two: entries whose stringified key appears in
#           message_option_keys are passed to build_new_message; all
#           remaining entries are passed to pop_bare_message.
#
# Yields the built message when a block is given and the message reports
# present?. If the block returns a truthy value, the message's delete
# method is called.
#
# Returns nil when pop_bare_message returns nil; otherwise returns the
# message built by build_new_message.
def pop(options = {}, &block)
  message_options, engine_options = Queuel::Hash.new(options).partition do |(key, _)|
    message_option_keys.include?(key.to_s)
  end

  bare_message = pop_bare_message(engine_options)
  return if bare_message.nil?

  build_new_message(bare_message, message_options).tap do |message|
    # Short-circuit order matters: never yield without a block or for a
    # message that is not present.
    message.delete if block_given? && message.present? && yield(message)
  end
end
push(message, options = {})
click to toggle source
# File lib/queuel/base/queue.rb, line 15
#
# Abstract hook: this base implementation never returns.
#
# message - accepted but not used here.
# options - accepted but not used here.
#
# Raises NotImplementedError with the message "must implement #push".
def push(message, options = {})
  raise NotImplementedError, "must implement #push"
end
receive(options = {}, &block)
click to toggle source
# File lib/queuel/base/queue.rb, line 31
#
# Builds a poller and starts polling.
#
# The poller is created from poller_klass with, in order: this queue, the
# block passed to receive (nil when none is given), +options+, and
# thread_count.
#
# Returns whatever the poller's poll method returns.
def receive(options = {}, &block)
  poller = poller_klass.new(self, block, options, thread_count)
  poller.poll
end
size()
click to toggle source
# File lib/queuel/base/queue.rb, line 35
#
# Abstract hook: this base implementation never returns.
#
# Raises NotImplementedError with the message "must implement #size".
def size
  raise NotImplementedError, "must implement #size"
end
Private Instance Methods
build_new_message(bare_message, options = {})
click to toggle source
# File lib/queuel/base/queue.rb, line 65
#
# Wraps a bare message in a new instance of message_klass.
#
# bare_message - passed as the first constructor argument.
# options      - passed as the second constructor argument.
#
# Returns the new message_klass instance.
def build_new_message(bare_message, options = {})
  message_klass.new(bare_message, options)
end
build_push_message(message, options = {})
click to toggle source
# File lib/queuel/base/queue.rb, line 51
#
# Produces the raw body for an outgoing message.
#
# A message_klass instance is created with nil as its first argument and
# +options+ as its second, +message+ is assigned through the instance's
# body writer, and the instance's raw_body is returned.
def build_push_message(message, options = {})
  outgoing = message_klass.new(nil, options)
  outgoing.body = message
  outgoing.raw_body
end
message_klass()
click to toggle source
# File lib/queuel/base/queue.rb, line 69
#
# Returns the result of asking this object's class for
# const_with_nesting("Message").
# NOTE(review): const_with_nesting is defined outside this view; presumably
# it resolves the engine-specific Message class — confirm.
def message_klass
  self.class.const_with_nesting("Message")
end
message_option_keys()
click to toggle source
# File lib/queuel/base/queue.rb, line 47
#
# Returns the option names, as strings, that pop routes to the message
# rather than to the engine: "encode", "encoder", "decode", "decoder".
# A new array is returned on every call.
def message_option_keys
  %w[encode encoder decode decoder]
end
poller_klass()
click to toggle source
# File lib/queuel/base/queue.rb, line 73
#
# Returns the result of asking this object's class for
# const_with_nesting("Poller").
# NOTE(review): const_with_nesting is defined outside this view; presumably
# it resolves the engine-specific Poller class — confirm.
def poller_klass
  self.class.const_with_nesting("Poller")
end
pop_bare_message(options = {})
click to toggle source
# File lib/queuel/base/queue.rb, line 61
#
# Abstract hook: this base implementation never returns.
#
# options - accepted but not used here.
#
# Raises NotImplementedError with the message
# "must implement bare Message getter".
def pop_bare_message(options = {})
  raise NotImplementedError, "must implement bare Message getter"
end
thread_count()
click to toggle source
# File lib/queuel/base/queue.rb, line 57
#
# Returns Queuel.receiver_threads, falling back to 1 when that value is
# nil or false.
def thread_count
  Queuel.receiver_threads || 1
end