class PubSubModelSync::ServiceRabbit

Constants

LISTEN_SETTINGS
PUBLISH_SETTINGS
QUEUE_SETTINGS

Attributes

channels[RW]

@!attribute topic_names (Array): ['Topic 1', 'Topic 2'] @!attribute channels (Array): [Channel1] @!attribute exchanges (Hash<key: Exchange>): {topic_name: Exchange1}

exchanges[RW]

@!attribute topic_names (Array): ['Topic 1', 'Topic 2'] @!attribute channels (Array): [Channel1] @!attribute exchanges (Hash<key: Exchange>): {topic_name: Exchange1}

service[RW]

@!attribute topic_names (Array): ['Topic 1', 'Topic 2'] @!attribute channels (Array): [Channel1] @!attribute exchanges (Hash<key: Exchange>): {topic_name: Exchange1}

topic_names[RW]

@!attribute topic_names (Array): ['Topic 1', 'Topic 2'] @!attribute channels (Array): [Channel1] @!attribute exchanges (Hash<key: Exchange>): {topic_name: Exchange1}

Public Class Methods

new() click to toggle source
# File lib/pub_sub_model_sync/service_rabbit.rb, line 19
def initialize
  @service = Bunny.new(*config.bunny_connection)
  @topic_names = Array(config.topic_name || 'model_sync')
  @channels = []
  @exchanges = {}
end

Public Instance Methods

listen_messages() click to toggle source
# File lib/pub_sub_model_sync/service_rabbit.rb, line 26
def listen_messages
  log('Listener starting...')
  subscribe_to_queues { |queue| queue.subscribe(LISTEN_SETTINGS, &method(:process_message)) }
  log('Listener started')
  loop { sleep 5 }
rescue PubSubModelSync::Runner::ShutDown
  log('Listener stopped')
rescue => e
  log("Error listening message: #{[e.message, e.backtrace]}", :error)
end
publish(payload) click to toggle source
# File lib/pub_sub_model_sync/service_rabbit.rb, line 37
def publish(payload)
  qty_retry ||= 0
  deliver_data(payload)
rescue => e
  if e.is_a?(Timeout::Error) && (qty_retry += 1) <= 2
    log("Error publishing (retrying....): #{e.message}", :error)
    initialize
    retry
  end
  raise
end
stop() click to toggle source
# File lib/pub_sub_model_sync/service_rabbit.rb, line 49
def stop
  log('Listener stopping...')
  channels.each(&:close)
  service.close
end

Private Instance Methods

deliver_data(payload) click to toggle source
# File lib/pub_sub_model_sync/service_rabbit.rb, line 92
def deliver_data(payload)
  message_topics = Array(payload.headers[:topic_name] || config.default_topic_name)
  message_topics.each do |topic_name|
    subscribe_to_exchange(topic_name) do |_channel, exchange|
      exchange.publish(encode_payload(payload), message_settings(payload))
    end
  end
end
message_settings(payload) click to toggle source
# File lib/pub_sub_model_sync/service_rabbit.rb, line 57
def message_settings(payload)
  {
    routing_key: payload.headers[:ordering_key],
    type: SERVICE_KEY,
    persistent: true
  }.merge(PUBLISH_SETTINGS)
end
process_message(_delivery_info, meta_info, payload) click to toggle source
# File lib/pub_sub_model_sync/service_rabbit.rb, line 65
def process_message(_delivery_info, meta_info, payload)
  super(payload) if meta_info[:type] == SERVICE_KEY
end
subscribe_to_exchange(topic_name, &block) click to toggle source
# File lib/pub_sub_model_sync/service_rabbit.rb, line 82
def subscribe_to_exchange(topic_name, &block)
  topic_name = topic_name.to_s
  exchanges[topic_name] ||= begin
    service.start
    channel = service.create_channel
    channel.fanout(topic_name)
  end
  block.call(channel, exchanges[topic_name])
end
subscribe_to_queues(&block) click to toggle source
# File lib/pub_sub_model_sync/service_rabbit.rb, line 69
def subscribe_to_queues(&block)
  @channels = []
  topic_names.each do |topic_name|
    subscribe_to_exchange(topic_name) do |channel, exchange|
      queue = channel.queue(config.subscription_key, QUEUE_SETTINGS)
      queue.bind(exchange)
      @channels << channel
      log("Subscribed to topic: #{topic_name} as #{queue.name}")
      block.call(queue)
    end
  end
end