class Emque::Consuming::Router

Constants

ConfigurationError

Attributes

mappings[RW]

Public Class Methods

new() click to toggle source
# File lib/emque/consuming/router.rb, line 6
# Builds a router with no topics registered: +mappings+ starts out as an
# empty Hash.
def initialize
  self.mappings = Hash.new
end

Public Instance Methods

map(&block) click to toggle source
# File lib/emque/consuming/router.rb, line 10
# Evaluates +block+ with this router as +self+, so bare calls inside the
# block (for example +topic+) are sent to the router.
#
# Returns whatever the block returns.
def map(&block)
  instance_eval(&block)
end
route(topic, type, message) click to toggle source
# File lib/emque/consuming/router.rb, line 20
# Delivers +message+ to every mapping registered under +topic+ that has a
# consumer method for +type+.
#
# Each mapping is asked (via +mapping.route+) for the consumer method bound to
# +type+; mappings that return nil/false are skipped. When a mapping declares
# middleware, its callables are applied in order starting from
# +message.original+, the final result is parsed with Oj (symbol keys) and
# becomes the +:values+ of the message handed to that mapping's consumer.
#
# topic   - topic name; converted with +to_sym+ for the lookup.
# type    - message type; converted with +to_s+ before matching.
# message - object responding to +original+ and +with+.
#
# NOTE(review): a topic with no registered mappings raises NoMethodError on
# the nil lookup result; that behavior is unchanged here.
def route(topic, type, message)
  mappings[topic.to_sym].each do |mapping|
    handler = mapping.route(type.to_s)
    next unless handler

    consumer = mapping.consumer

    # Use a per-iteration local instead of reassigning the +message+
    # parameter: otherwise the middleware output of one mapping would leak
    # into every later mapping for the same topic.
    delivered = message

    if mapping.middleware?
      compiled = mapping.middleware.inject(message.original) do |payload, callable|
        callable.call(payload)
      end

      delivered = message.with(
        :values => Oj.load(compiled, :symbol_keys => true)
      )
    end

    consumer.new.consume(handler, delivered)
  end
end
topic(mapping, &block) click to toggle source
# File lib/emque/consuming/router.rb, line 14
# Registers a mapping: builds a Mapping from +mapping+ and +block+ and appends
# it to the list stored under the mapping's topic (as a symbol), creating the
# list on first use.
#
# Returns the list of mappings for that topic.
def topic(mapping, &block)
  entry = Mapping.new(mapping, &block)
  key = entry.topic.to_sym
  (mappings[key] ||= []) << entry
end
topic_mapping() click to toggle source
# File lib/emque/consuming/router.rb, line 46
# Summarizes the registered mappings as a new Hash of
# topic => array of each mapping's +consumer+, in registration order.
def topic_mapping
  mappings.each_with_object({}) do |(topic, maps), consumers_by_topic|
    consumers_by_topic[topic] = maps.map { |mapping| mapping.consumer }
  end
end
workers(topic) click to toggle source
# File lib/emque/consuming/router.rb, line 54
# Returns the largest +workers+ value among the mappings registered under
# +topic+ (looked up as a symbol).
def workers(topic)
  topic_mappings = mappings[topic.to_sym]
  worker_counts = topic_mappings.map { |mapping| mapping.workers }
  worker_counts.max
end