module StartHer::Subscriber
Constants
- DEFAULT_ERROR_BLOCK
- DEFAULT_OPTS
Attributes
channels[R]
options[R]
subscriber_heartbeat_block[R]
subscriber_on_psubscribe_block[R]
Public Class Methods
included(base)
click to toggle source
# File lib/start_her/subscriber.rb, line 13 def self.included(base) base.extend(ClassMethods) end
new(channels)
click to toggle source
# File lib/start_her/subscriber.rb, line 42 def initialize(channels) @channels = channels @options = self.class.subscriber_options_hash || DEFAULT_OPTS @subscriber_heartbeat_block = self.class.subscriber_heartbeat_block || ->(_response) {} @subscriber_on_psubscribe_block = self.class.subscriber_on_psubscribe_block || ->(_chan) {} end
Public Instance Methods
psubscribe(channels, &block)
click to toggle source
rubocop:disable Metrics/AbcSize,Metrics/MethodLength
# File lib/start_her/subscriber.rb, line 64 def psubscribe(channels, &block) exponential_backoff({}, ::Redis::BaseConnectionError) do client.psubscribe(options[:heartbeat][:in], *channels) do |on| on.psubscribe do |channel, _| StartHer.logger.info "PSuscribe on #{channel}" resync_messages(client, channel, &block) subscriber_on_psubscribe_block.call(unamespace(channel)) end on.pmessage do |_pattern, channel, message| chan = unamespace(channel) msg = MessagePack.unpack(message) if chan == options[:heartbeat][:in] Heartbeat.call(msg, service_klass, heartbeat: options[:heartbeat], &subscriber_heartbeat_block) else block.call(chan, msg) end end end end rescue => e StartHer.logger.error e raise e end
resync_messages(client, channel, &block)
click to toggle source
rubocop:enable Metrics/AbcSize,Metrics/MethodLength
# File lib/start_her/subscriber.rb, line 93 def resync_messages(client, channel, &block) StartHer.logger.info "Re-synchronize message from #{channel}" # Retrieve all backlogs for a given channel client.keys(unamespace(channel) + '*').each do |backlog| # Retrieve all messages for a given backlog client.lrange(backlog, 0, -1).each do |message| block.call(backlog.split(':').first, MessagePack.unpack(message)) end end end
run!()
click to toggle source
# File lib/start_her/subscriber.rb, line 49 def run! psubscribe(channels) do |channel, message| begin process_message(channel, message) rescue => e if block_given? subscriber_error_block.call(e) else DEFAULT_ERROR_BLOCK.call(e) end end end end
Private Instance Methods
unamespace(channel)
click to toggle source
# File lib/start_her/subscriber.rb, line 107 def unamespace(channel) channel.split(':').last end