class Kinetic::Publisher
Public Class Methods
method_missing(meth, *args, &block)
click to toggle source
# File lib/kinetic/publisher.rb, line 18
# Forwards class-level calls the class does not define itself to the
# object returned by +instance+, so callers can write
# +Kinetic::Publisher.publish(...)+ instead of going through the instance.
#
# Names the instance cannot handle fall through to +super+, so the usual
# NoMethodError is raised instead of being silently re-dispatched.
# +send+ is kept (rather than +public_send+) so private instance methods
# stay reachable exactly as before.
#
# @param meth [Symbol] name of the missing method
# @param args [Array] positional arguments forwarded unchanged
# @param block [Proc, nil] block forwarded unchanged
# @return [Object] whatever the instance method returns
def method_missing(meth, *args, &block)
  return super unless instance.respond_to?(meth, true)

  instance.send(meth, *args, &block)
end

# Keeps +respond_to?+ (and +method+) consistent with the delegation done by
# +method_missing+.
#
# @param meth [Symbol] method name being queried
# @param include_private [Boolean] whether private methods should count
# @return [Boolean] true when the instance can handle +meth+
def respond_to_missing?(meth, include_private = false)
  instance.respond_to?(meth, include_private) || super
end
new()
click to toggle source
# File lib/kinetic/publisher.rb, line 24
# Sets up the publisher's serializer.
#
# Logs the configured serializer name at debug level, loads the matching
# file from the +serializers+ directory next to this file (the file name is
# the downcased serializer name), then looks the serializer constant up
# inside +Kinetic::Serializers+ and keeps it in +@serializer+.
def initialize
  logger.debug("Using serializer #{config.serializer}")

  serializer_file = "./serializers/#{config.serializer.to_s.downcase}"
  require_relative(serializer_file)

  @serializer = Kinetic::Serializers.const_get(config.serializer)
end
Public Instance Methods
publish_direct(key, message)
click to toggle source
# File lib/kinetic/publisher.rb, line 39
# Publishes +message+ to the direct exchange.
#
# @param key [Object] value passed as the +routing_key+ option
# @param message [Object] payload; run through +serialize+ before publishing
# @return [Object] whatever the exchange's +publish+ call returns
def publish_direct(key, message)
  target = direct_exchange
  payload = serialize(message)
  target.publish(payload, routing_key: key)
end
Also aliased as: publish
publish_fanout(key, message)
click to toggle source
# File lib/kinetic/publisher.rb, line 35
# Publishes +message+ to the fanout exchange.
#
# @param key [Object] value passed as the +routing_key+ option
# @param message [Object] payload; run through +serialize+ before publishing
# @return [Object] whatever the exchange's +publish+ call returns
def publish_fanout(key, message)
  target = fanout_exchange
  payload = serialize(message)
  target.publish(payload, routing_key: key)
end
publish_topic(key, message)
click to toggle source
# File lib/kinetic/publisher.rb, line 30
# Publishes +message+ to the topic exchange.
#
# @param key [Object] value passed as the +routing_key+ option
# @param message [Object] payload; run through +serialize+ before publishing
# @return [true] always +true+ once the publish call has returned
def publish_topic(key, message)
  target = topic_exchange
  payload = serialize(message)
  target.publish(payload, routing_key: key)
  true
end
Private Instance Methods
channel()
click to toggle source
# File lib/kinetic/publisher.rb, line 74
# Returns the channel used for publishing, opening the connection on first
# use.
#
# The connection is built from +config.host+ and +config.port+ with
# +threaded: false+. It is stored in +@connection+ only after +start+ has
# returned: if +start+ raises, nothing is memoized, so the next call tries
# again instead of returning a nil channel forever.
#
# @return [Object] the memoized channel obtained from the connection
# @raise whatever the connection's +start+ or +channel+ call raises
def channel
  return @channel if @channel

  unless @connection
    logger.info "Establishing publisher connection to amqp://#{config.host}:#{config.port}"
    connection = Bunny.new(host: config.host, port: config.port, threaded: false)
    logger.debug 'Starting connection'
    connection.start
    # Remember the connection only once it is usable.
    @connection = connection
  end

  @channel = @connection.channel
end
deserialize(message)
click to toggle source
# File lib/kinetic/publisher.rb, line 63
# Hands +message+ to the serializer stored in +@serializer+ for decoding.
#
# @param message [Object] encoded payload
# @return [Object] whatever the serializer's +deserialize+ returns
def deserialize(message)
  decoder = @serializer
  decoder.deserialize(message)
end
direct_exchange()
click to toggle source
# File lib/kinetic/publisher.rb, line 55
# Shortcut for the exchange of type +:direct+.
#
# @return [Object] result of +exchange(:direct)+
def direct_exchange
  kind = :direct
  exchange(kind)
end
exchange(type)
click to toggle source
# File lib/kinetic/publisher.rb, line 69
# Returns the exchange of the given type, named "<config.name>.<type>".
#
# Exchanges are memoized per type in +@exchanges+, so repeated publishes
# reuse one exchange object instead of constructing a new
# +Bunny::Exchange+ on every call.
#
# @param type [Symbol] exchange type, e.g. +:direct+, +:fanout+ or +:topic+
# @return [Bunny::Exchange] the exchange bound to the publisher's channel
def exchange(type)
  @exchanges ||= {}
  @exchanges[type] ||= Bunny::Exchange.new(channel, type, "#{config.name}.#{type}")
end
fanout_exchange()
click to toggle source
# File lib/kinetic/publisher.rb, line 51
# Shortcut for the exchange of type +:fanout+.
#
# @return [Object] result of +exchange(:fanout)+
def fanout_exchange
  kind = :fanout
  exchange(kind)
end
logger()
click to toggle source
# File lib/kinetic/publisher.rb, line 85
# Returns the publisher's logger, creating it on first use.
#
# The log file path is +config.log_file+ joined onto +config.root+; the
# resulting Logger is cached in +@logger+.
#
# @return [Logger] the memoized logger
def logger
  return @logger if @logger

  log_path = File.join(config.root, config.log_file)
  @logger = Logger.new(log_path)
end
serialize(message)
click to toggle source
# File lib/kinetic/publisher.rb, line 59
# Hands +message+ to the serializer stored in +@serializer+ for encoding.
#
# @param message [Object] payload to encode
# @return [Object] whatever the serializer's +serialize+ returns
def serialize(message)
  encoder = @serializer
  encoder.serialize(message)
end
topic_exchange()
click to toggle source
# File lib/kinetic/publisher.rb, line 47
# Shortcut for the exchange of type +:topic+.
#
# @return [Object] result of +exchange(:topic)+
def topic_exchange
  kind = :topic
  exchange(kind)
end