class Hivent::Redis::Consumer

Constants

CONSUMER_TTL
LUA_CONSUMER
LUA_HEARTBEAT
SLEEP_TIME

SLEEP_TIME is expressed in milliseconds.

Public Class Methods

new(redis, service_name, name, life_cycle_event_handler) click to toggle source
# File lib/hivent/redis/consumer.rb, line 16
# Sets up a consumer in the "running" state (the stop flag starts out false).
#
# redis                    - client object; this class calls lindex, lpush and
#                            rpop on it.
# service_name             - forwarded to the Lua scripts together with +name+
#                            (presumably a String — confirm with callers).
# name                     - identifier of this consumer, forwarded to the Lua
#                            scripts (presumably a String — confirm).
# life_cycle_event_handler - object notified through event_processing_succeeded
#                            and event_processing_failed.
def initialize(redis, service_name, name, life_cycle_event_handler)
  # Loop control: run! keeps consuming until this becomes true.
  @stop = false

  # Identity handed to the Lua scripts.
  @service_name = service_name
  @name = name

  # Collaborators.
  @redis = redis
  @life_cycle_event_handler = life_cycle_event_handler
end

Public Instance Methods

consume() click to toggle source
# File lib/hivent/redis/consumer.rb, line 38
# Performs one consumption pass over the [queue, item] pairs returned by
# +items+.
#
# For every pair the raw item is parsed as JSON and broadcast through
# Hivent.emitter; the life cycle handler is then told about the success. Any
# StandardError raised along the way causes the raw item to be pushed onto the
# queue's dead letter list and the handler to be told about the failure
# (the payload passed to it is nil when parsing itself failed). In both cases
# the tail element of the queue is popped afterwards.
#
# When there is nothing to process the method sleeps for SLEEP_TIME
# milliseconds and returns whatever Kernel.sleep returns; otherwise it
# returns nil.
def consume
  pending = items

  # Nothing queued: back off instead of spinning.
  return Kernel.sleep(SLEEP_TIME.to_f / 1000) if pending.empty?

  pending.each do |queue, raw_item|
    event = nil
    begin
      event = JSON.parse(raw_item).with_indifferent_access
      Hivent.emitter.broadcast(event)
      @life_cycle_event_handler.event_processing_succeeded(event_name(event), event_version(event), event)
    rescue => error
      dead_letter_queue = dead_letter_queue_name(queue)
      @redis.lpush(dead_letter_queue, raw_item)
      @life_cycle_event_handler.event_processing_failed(error, event, raw_item, dead_letter_queue)
    ensure
      # The item was only peeked at by +items+; remove it now that it has
      # been handled one way or the other.
      @redis.rpop(queue)
    end
  end

  nil
end
queues() click to toggle source
# File lib/hivent/redis/consumer.rb, line 34
# Runs the LUA_CONSUMER script (through +script+) with this consumer's
# service name, consumer name and CONSUMER_TTL, and returns its result.
# A nil/false result is normalised to an empty array.
#
# NOTE(review): the result is presumably the list of queue names assigned to
# this consumer — +items+ passes each element to Redis as a key; confirm
# against the Lua script.
def queues
  assigned = script(LUA_CONSUMER, @service_name, @name, CONSUMER_TTL)
  assigned || []
end
run!() click to toggle source
# File lib/hivent/redis/consumer.rb, line 24
# Starts the heartbeat thread and then calls +consume+ repeatedly on the
# calling thread until the stop flag is set (see stop!). If the flag is
# already set, the heartbeat is still started but +consume+ is never called.
def run!
  start_heartbeat!
  consume until @stop
end
stop!() click to toggle source
# File lib/hivent/redis/consumer.rb, line 29
# Requests shutdown of this consumer: sets the stop flag checked by the run!
# loop and then stops the heartbeat thread. Returns whatever stop_heartbeat!
# returns.
def stop!
  # run! re-checks this flag before each call to consume, so the loop ends
  # once the current consume call returns.
  @stop = true
  # Terminate the heartbeat thread, if one was started.
  stop_heartbeat!
end

Private Instance Methods

dead_letter_queue_name(queue) click to toggle source
# File lib/hivent/redis/consumer.rb, line 97
# Returns the name of the dead letter list that belongs to +queue+: the
# queue's string form followed by ":dead_letter".
def dead_letter_queue_name(queue)
  format("%s:dead_letter", queue)
end
event_name(payload) click to toggle source
# File lib/hivent/redis/consumer.rb, line 89
# Reads the "name" entry from the payload's "meta" section.
# Returns nil when the payload has no "meta" entry.
def event_name(payload)
  meta = payload["meta"]
  meta.try(:[], "name")
end
event_version(payload) click to toggle source
# File lib/hivent/redis/consumer.rb, line 93
# Reads the "version" entry from the payload's "meta" section.
# Returns nil when the payload has no "meta" entry.
def event_version(payload)
  meta = payload["meta"]
  meta.try(:[], "version")
end
heartbeat!() click to toggle source
# File lib/hivent/redis/consumer.rb, line 79
# Runs the LUA_HEARTBEAT script (through +script+) with this consumer's
# service name, consumer name and CONSUMER_TTL, returning the script result.
#
# NOTE(review): presumably this refreshes the consumer's liveness marker in
# Redis for CONSUMER_TTL — confirm against the Lua script.
def heartbeat!
  script(
    LUA_HEARTBEAT,
    @service_name,
    @name,
    CONSUMER_TTL
  )
end
items() click to toggle source
# File lib/hivent/redis/consumer.rb, line 83
# Peeks at the tail element (index -1) of every queue returned by +queues+
# without removing it. Returns an array of [queue, item] pairs, leaving out
# queues for which Redis returned nothing.
def items
  queues.each_with_object([]) do |queue, pairs|
    tail = @redis.lindex(queue, -1)
    pairs << [queue, tail] if tail
  end
end
start_heartbeat!() click to toggle source
# File lib/hivent/redis/consumer.rb, line 63
# (Re)starts the background heartbeat. Any previously started heartbeat
# thread is stopped first; a new thread is then spawned that alternates
# between calling heartbeat! and sleeping for SLEEP_TIME milliseconds,
# indefinitely. Returns the new Thread, which is also kept in @heartbeat.
def start_heartbeat!
  stop_heartbeat!

  @heartbeat = Thread.new do
    loop do
      heartbeat!

      # SLEEP_TIME is in milliseconds; Kernel.sleep takes seconds.
      pause_seconds = SLEEP_TIME.to_f / 1000
      Kernel.sleep(pause_seconds)
    end
  end
end
stop_heartbeat!() click to toggle source
# File lib/hivent/redis/consumer.rb, line 75
# Terminates the heartbeat thread if one has been started; otherwise does
# nothing and returns nil. The thread reference in @heartbeat is left in
# place afterwards.
def stop_heartbeat!
  return unless @heartbeat.present?

  @heartbeat.exit
end