module StartHer::Subscriber

Constants

DEFAULT_ERROR_BLOCK
DEFAULT_OPTS

Attributes

channels[R]
options[R]
subscriber_heartbeat_block[R]
subscriber_on_psubscribe_block[R]

Public Class Methods

included(base) click to toggle source
# File lib/start_her/subscriber.rb, line 13
# Hook invoked by Ruby when this module is included into +base+.
# Extends +base+ with ClassMethods, so the methods defined there become
# singleton (class-level) methods of the including class.
def self.included(base)
  base.extend(ClassMethods)
end
new(channels) click to toggle source
# File lib/start_her/subscriber.rb, line 42
# Builds a subscriber bound to +channels+.
#
# Configuration is read from the including class: its options hash (falling
# back to DEFAULT_OPTS when none is set) and its heartbeat / on-psubscribe
# callbacks (each falling back to a one-argument lambda that does nothing).
def initialize(channels)
  klass = self.class

  @channels = channels
  @options = klass.subscriber_options_hash || DEFAULT_OPTS
  @subscriber_heartbeat_block =
    klass.subscriber_heartbeat_block || lambda { |_response| }
  @subscriber_on_psubscribe_block =
    klass.subscriber_on_psubscribe_block || lambda { |_chan| }
end

Public Instance Methods

psubscribe(channels, &block) click to toggle source

rubocop:disable Metrics/AbcSize,Metrics/MethodLength

# File lib/start_her/subscriber.rb, line 64
# Pattern-subscribes to the configured heartbeat channel plus +channels+ and
# dispatches every message received on them.
#
# Messages on the heartbeat channel are handed to Heartbeat.call together
# with subscriber_heartbeat_block; every other message is passed to +block+
# as (un-namespaced channel, unpacked message).
#
# Any StandardError escaping the subscription is logged and re-raised.
#
# NOTE(review): +block+ is called unconditionally, so callers are expected to
# always supply one — confirm.
def psubscribe(channels, &block)
  # NOTE(review): presumably retries the subscription on Redis connection
  # errors — confirm against exponential_backoff, which is defined elsewhere.
  exponential_backoff({}, ::Redis::BaseConnectionError) do
    # The heartbeat channel is always subscribed alongside the caller's channels.
    client.psubscribe(options[:heartbeat][:in], *channels) do |on|
      on.psubscribe do |channel, _|
        StartHer.logger.info "PSuscribe on #{channel}"

        # Replay stored backlog messages before notifying the on-psubscribe
        # callback with the un-namespaced channel name.
        resync_messages(client, channel, &block)
        subscriber_on_psubscribe_block.call(unamespace(channel))
      end

      on.pmessage do |_pattern, channel, message|
        chan = unamespace(channel)
        msg = MessagePack.unpack(message)

        # Route by channel: heartbeat traffic goes to Heartbeat, everything
        # else to the caller's block.
        if chan == options[:heartbeat][:in]
          Heartbeat.call(msg, service_klass,
            heartbeat: options[:heartbeat], &subscriber_heartbeat_block)
        else
          block.call(chan, msg)
        end
      end
    end
  end
rescue => e
  # Log, then propagate the same exception to the caller.
  StartHer.logger.error e
  raise e
end
resync_messages(client, channel, &block) click to toggle source

rubocop:enable Metrics/AbcSize,Metrics/MethodLength

# File lib/start_her/subscriber.rb, line 93
# Replays every stored backlog message for +channel+ through +block+.
#
# Backlog keys are looked up with the un-namespaced channel name followed by
# a '*' wildcard. For each key, the full list is read and each entry is
# unpacked with MessagePack, then passed to +block+ together with the part of
# the key that precedes its first ':'.
#
# Returns whatever client.keys(...).each returns.
def resync_messages(client, channel, &block)
  StartHer.logger.info "Re-synchronize message from #{channel}"

  backlog_pattern = unamespace(channel) + '*'

  client.keys(backlog_pattern).each do |backlog_key|
    # The key prefix (before the first ':') is reported as the channel name.
    origin = backlog_key.split(':').first
    stored_messages = client.lrange(backlog_key, 0, -1)

    stored_messages.each do |raw|
      block.call(origin, MessagePack.unpack(raw))
    end
  end
end
run!() click to toggle source
# File lib/start_her/subscriber.rb, line 49
# Subscribes to the configured channels and feeds every received message to
# process_message.
#
# A StandardError raised while processing a single message is rescued and
# handed to an error handler, so one failing message does not escape the
# per-message block.
#
# NOTE(review): the handler is subscriber_error_block when run! itself was
# called with a block, otherwise DEFAULT_ERROR_BLOCK. The block passed to
# run! is never invoked here, and subscriber_error_block is not defined in
# this view — confirm this is the intended condition.
def run!
  psubscribe(channels) do |channel, message|
    begin
      process_message(channel, message)
    rescue => error
      handler = block_given? ? subscriber_error_block : DEFAULT_ERROR_BLOCK
      handler.call(error)
    end
  end
end

Private Instance Methods

unamespace(channel) click to toggle source
# File lib/start_her/subscriber.rb, line 107
# Returns the final ':'-separated segment of +channel+ (nil when splitting
# yields no segments, e.g. for an empty string).
def unamespace(channel)
  segments = channel.split(':')
  segments[-1]
end