class PubSubModelSync::ServiceRabbit
Constants
- LISTEN_SETTINGS
- PUBLISH_SETTINGS
- QUEUE_SETTINGS
Attributes
channels[RW]
@!attribute topic_names (Array): ['Topic 1', 'Topic 2']
@!attribute channels (Array): [Channel1]
@!attribute exchanges (Hash<key: Exchange>): {topic_name: Exchange1}
exchanges[RW]
@!attribute topic_names (Array): ['Topic 1', 'Topic 2']
@!attribute channels (Array): [Channel1]
@!attribute exchanges (Hash<key: Exchange>): {topic_name: Exchange1}
service[RW]
@!attribute topic_names (Array): ['Topic 1', 'Topic 2']
@!attribute channels (Array): [Channel1]
@!attribute exchanges (Hash<key: Exchange>): {topic_name: Exchange1}
topic_names[RW]
@!attribute topic_names (Array): ['Topic 1', 'Topic 2']
@!attribute channels (Array): [Channel1]
@!attribute exchanges (Hash<key: Exchange>): {topic_name: Exchange1}
Public Class Methods
new()
click to toggle source
# File lib/pub_sub_model_sync/service_rabbit.rb, line 19
# Builds the Bunny connection from the configured connection arguments and
# resets the per-instance topic list, channel list and exchange cache.
# Also re-invoked by #publish to rebuild this state before a retry.
def initialize
  connection_args = config.bunny_connection
  @service = Bunny.new(*connection_args)

  configured_topic = config.topic_name
  @topic_names = Array(configured_topic || 'model_sync')

  @channels = []
  @exchanges = {}
end
Public Instance Methods
listen_messages()
click to toggle source
# File lib/pub_sub_model_sync/service_rabbit.rb, line 26
# Subscribes #process_message to every queue yielded by #subscribe_to_queues,
# then keeps the calling thread alive until an exception interrupts the loop.
# A PubSubModelSync::Runner::ShutDown is logged as a normal stop; any other
# StandardError is logged (message + backtrace) at :error level and swallowed.
def listen_messages
  log('Listener starting...')
  subscribe_to_queues do |queue|
    queue.subscribe(LISTEN_SETTINGS, &method(:process_message))
  end
  log('Listener started')

  # Block forever; the rescue clauses below are the only way out.
  loop do
    sleep 5
  end
rescue PubSubModelSync::Runner::ShutDown
  log('Listener stopped')
rescue StandardError => e
  log("Error listening message: #{[e.message, e.backtrace]}", :error)
end
publish(payload)
click to toggle source
# File lib/pub_sub_model_sync/service_rabbit.rb, line 37
# Delivers +payload+ through #deliver_data.
# On Timeout::Error the instance state is rebuilt via #initialize and the
# delivery is retried, at most twice; the third timeout is re-raised.
# Any other StandardError propagates to the caller untouched.
def publish(payload)
  # `||=` keeps the counter across `retry`, which re-runs the method body.
  qty_retry ||= 0
  deliver_data(payload)
rescue Timeout::Error => e
  qty_retry += 1
  raise if qty_retry > 2

  log("Error publishing (retrying....): #{e.message}", :error)
  initialize
  retry
end
stop()
click to toggle source
# File lib/pub_sub_model_sync/service_rabbit.rb, line 49
# Logs the shutdown, closes every tracked channel and then the connection.
def stop
  log('Listener stopping...')
  channels.each { |open_channel| open_channel.close }
  service.close
end
Private Instance Methods
deliver_data(payload)
click to toggle source
# File lib/pub_sub_model_sync/service_rabbit.rb, line 92
# Publishes the encoded +payload+ to every target topic's exchange.
# Targets come from the payload's :topic_name header (single value or list),
# falling back to config.default_topic_name when the header is absent.
def deliver_data(payload)
  requested_topics = payload.headers[:topic_name]
  target_topics = Array(requested_topics || config.default_topic_name)

  target_topics.each do |target|
    subscribe_to_exchange(target) do |_channel, exchange|
      # Encoded body and settings are computed per topic, as separate calls.
      exchange.publish(encode_payload(payload), message_settings(payload))
    end
  end
end
message_settings(payload)
click to toggle source
# File lib/pub_sub_model_sync/service_rabbit.rb, line 57
# Builds the publish options for +payload+: routing key taken from the
# :ordering_key header, message type SERVICE_KEY, persistent delivery.
# Entries in PUBLISH_SETTINGS override these defaults on key collision.
def message_settings(payload)
  base_settings = {
    routing_key: payload.headers[:ordering_key],
    type: SERVICE_KEY,
    persistent: true
  }
  base_settings.merge(PUBLISH_SETTINGS)
end
process_message(_delivery_info, meta_info, payload)
click to toggle source
Calls superclass method
PubSubModelSync::ServiceBase#process_message
# File lib/pub_sub_model_sync/service_rabbit.rb, line 65
# Forwards +payload+ to the superclass handler, but only for messages whose
# metadata :type equals SERVICE_KEY; anything else is ignored (returns nil).
def process_message(_delivery_info, meta_info, payload)
  return unless meta_info[:type] == SERVICE_KEY

  super(payload)
end
subscribe_to_exchange(topic_name, &block)
click to toggle source
# File lib/pub_sub_model_sync/service_rabbit.rb, line 82
# Yields the channel and fanout exchange for +topic_name+.
# The exchange is created on first use (starting the connection and opening
# a channel) and cached in #exchanges under the stringified topic name.
#
# @param topic_name [#to_s] topic whose exchange is requested
# @yieldparam channel the channel the exchange was declared on
# @yieldparam exchange the cached or newly created fanout exchange
# @return whatever the given block returns
def subscribe_to_exchange(topic_name, &block)
  topic_name = topic_name.to_s
  exchange = exchanges[topic_name] ||= begin
    service.start
    service.create_channel.fanout(topic_name)
  end
  # Take the channel from the exchange itself: a local set only while
  # creating the exchange would be nil whenever the cached one is reused.
  block.call(exchange.channel, exchange)
end
subscribe_to_queues(&block)
click to toggle source
# File lib/pub_sub_model_sync/service_rabbit.rb, line 69
# For each configured topic: declares the subscriber queue (named after
# config.subscription_key), binds it to the topic's exchange, records the
# channel in @channels and hands the queue to the given block.
# The channel list is reset on every call.
def subscribe_to_queues(&block)
  @channels = []
  topic_names.each do |topic|
    subscribe_to_exchange(topic) do |channel, exchange|
      bound_queue = channel.queue(config.subscription_key, QUEUE_SETTINGS)
      bound_queue.bind(exchange)
      @channels.push(channel)
      log("Subscribed to topic: #{topic} as #{bound_queue.name}")
      block.call(bound_queue)
    end
  end
end