class Que::Listener
Constants
- MESSAGE_FORMATS
Attributes
channel[R]
connection[R]
Public Class Methods
new(connection:, channel: nil)
click to toggle source
# File lib/que/listener.rb, line 9 def initialize(connection:, channel: nil) @connection = connection @channel = channel || "que_listener_#{connection.backend_pid}" Que.internal_log :listener_instantiate, self do { backend_pid: connection.backend_pid, } end end
Public Instance Methods
listen()
click to toggle source
# File lib/que/listener.rb, line 20 def listen connection.execute "LISTEN #{channel}" end
unlisten()
click to toggle source
# File lib/que/listener.rb, line 129 def unlisten # Be sure to drain all notifications so that any code that uses this # connection later doesn't receive any nasty surprises. connection.execute "UNLISTEN *" connection.drain_notifications Que.internal_log :listener_unlisten, self do { backend_pid: connection.backend_pid, channel: channel, } end end
wait_for_grouped_messages(timeout)
click to toggle source
# File lib/que/listener.rb, line 24 def wait_for_grouped_messages(timeout) messages = wait_for_messages(timeout) output = {} messages.each do |message| message_type = message.delete(:message_type) (output[message_type.to_sym] ||= []) << message.freeze end output end
wait_for_messages(timeout)
click to toggle source
# File lib/que/listener.rb, line 38 def wait_for_messages(timeout) # Make sure we never pass nil to this method, so we don't hang the thread. Que.assert(Numeric, timeout) Que.internal_log :listener_waiting, self do { backend_pid: connection.backend_pid, channel: channel, timeout: timeout, } end accumulated_messages = [] # Notifications often come in batches (especially when a transaction that # inserted many jobs commits), so we want to loop and pick up all the # received notifications before continuing. loop do notification_received = connection.wait_for_notify(timeout) do |channel, pid, payload| # We've received a notification, so zero out the timeout before we # loop again to check for another message. This ensures that we # don't wait an additional `timeout` seconds after processing the # final message before this method returns. timeout = 0 Que.internal_log(:listener_received_notification, self) do { channel: channel, backend_pid: connection.backend_pid, source_pid: pid, payload: payload, } end # Be very defensive about the message we receive - it may not be # valid JSON or have the structure we expect. next unless message = parse_payload(payload) case message when Array then accumulated_messages.concat(message) when Hash then accumulated_messages << message else raise Error, "Unexpected parse_payload output: #{message.class}" end end break unless notification_received end return accumulated_messages if accumulated_messages.empty? Que.internal_log(:listener_received_messages, self) do { backend_pid: connection.backend_pid, channel: channel, messages: accumulated_messages, } end accumulated_messages.keep_if do |message| next unless message.is_a?(Hash) next unless type = message[:message_type] next unless type.is_a?(String) next unless format = MESSAGE_FORMATS[type.to_sym] if message_matches_format?(message, format) true else error_message = [ "Message of type '#{type}' doesn't match format!", # Massage message and format a bit to make these errors more readable. "Message: #{Hash[message.reject{|k,v| k == :message_type}.sort_by{|k,v| k}].inspect}", "Format: #{Hash[format.sort_by{|k,v| k}].inspect}", ].join("\n") Que.notify_error_async(Error.new(error_message)) false end end Que.internal_log(:listener_filtered_messages, self) do { backend_pid: connection.backend_pid, channel: channel, messages: accumulated_messages, } end accumulated_messages end
Private Instance Methods
message_has_all_keys?(message, format)
click to toggle source
# File lib/que/listener.rb, line 158 def message_has_all_keys?(message, format) format.all? { |k,v| message.has_key?(k) } end
message_has_no_excess_keys?(message, format)
click to toggle source
# File lib/que/listener.rb, line 162 def message_has_no_excess_keys?(message, format) message.all? { |k,v| format.has_key?(k) || k == :message_type } end
message_keys_all_valid?(message, format)
click to toggle source
# File lib/que/listener.rb, line 166 def message_keys_all_valid?(message, format) message.all? do |key, value| if type = format[key] Que.assert?(type, value) else true end end end
message_matches_format?(message, format)
click to toggle source
# File lib/que/listener.rb, line 152 def message_matches_format?(message, format) message_has_all_keys?(message, format) && message_has_no_excess_keys?(message, format) && message_keys_all_valid?(message, format) end
parse_payload(payload)
click to toggle source
# File lib/que/listener.rb, line 145 def parse_payload(payload) Que.deserialize_json(payload) rescue JSON::ParserError => e Que.notify_error_async(e) nil end