class PubSubModelSync::ServiceGoogle

Constants

LISTEN_SETTINGS
PUBLISH_SETTINGS
SUBSCRIPTION_SETTINGS
TOPIC_SETTINGS

Attributes

publish_topics[RW]

@!attribute topics (Hash): { key: Topic1, … } @!attribute publish_topics (Hash): { key: Topic1, … }

service[RW]

@!attribute topics (Hash): { key: Topic1, … } @!attribute publish_topics (Hash): { key: Topic1, … }

subscribers[RW]

@!attribute topics (Hash): { key: Topic1, … } @!attribute publish_topics (Hash): { key: Topic1, … }

topics[RW]

@!attribute topics (Hash): { key: Topic1, … } @!attribute publish_topics (Hash): { key: Topic1, … }

Public Class Methods

new() click to toggle source
# File lib/pub_sub_model_sync/service_google.rb, line 19
def initialize
  @service = Google::Cloud::Pubsub.new(project: config.project,
                                       credentials: config.credentials)
  Array(config.topic_name || 'model_sync').each(&method(:init_topic))
end

Public Instance Methods

listen_messages() click to toggle source
# File lib/pub_sub_model_sync/service_google.rb, line 25
def listen_messages
  log('Listener starting...')
  @subscribers = subscribe_to_topics
  log('Listener started')
  sleep
  subscribers.each { |subscriber| subscriber.stop.wait! }
  log('Listener stopped')
end
publish(payload) click to toggle source

@param payload (PubSubModelSync::Payload)

# File lib/pub_sub_model_sync/service_google.rb, line 35
def publish(payload)
  p_topic_names = Array(payload.headers[:topic_name] || config.default_topic_name)
  message_topics = p_topic_names.map(&method(:find_topic))
  message_topics.each do |topic|
    topic.publish_async(encode_payload(payload), message_headers(payload)) do |res|
      raise StandardError, 'Failed to publish the message.' unless res.succeeded?
    end
  end
end
stop() click to toggle source
# File lib/pub_sub_model_sync/service_google.rb, line 45
def stop
  log('Listener stopping...')
  (subscribers || []).each(&:stop!)
end

Private Instance Methods

find_topic(topic_name) click to toggle source
# File lib/pub_sub_model_sync/service_google.rb, line 52
def find_topic(topic_name)
  topic_name = topic_name.to_s
  return topics.values.first unless topic_name.present?

  topics[topic_name] || publish_topics[topic_name] || init_topic(topic_name, only_publish: true)
end
init_topic(topic_name, only_publish: false) click to toggle source

@param only_publish (Boolean): if false is used to listen and publish messages @return (Topic): returns created or loaded topic

# File lib/pub_sub_model_sync/service_google.rb, line 61
def init_topic(topic_name, only_publish: false)
  topic_name = topic_name.to_s
  @topics ||= {}
  @publish_topics ||= {}
  topic = service.topic(topic_name) || service.create_topic(topic_name, TOPIC_SETTINGS)
  topic.enable_message_ordering!
  publish_topics[topic_name] = topic if only_publish
  topics[topic_name] = topic unless only_publish
  topic
end
message_headers(payload) click to toggle source

@param payload (PubSubModelSync::Payload)

# File lib/pub_sub_model_sync/service_google.rb, line 73
def message_headers(payload)
  {
    SERVICE_KEY => true,
    ordering_key: payload.headers[:ordering_key]
  }.merge(PUBLISH_SETTINGS)
end
process_message(received_message) click to toggle source
# File lib/pub_sub_model_sync/service_google.rb, line 92
def process_message(received_message)
  message = received_message.message
  super(message.data) if message.attributes[SERVICE_KEY]
ensure
  received_message.acknowledge!
end
subscribe_to_topics() click to toggle source

@return [Subscriber]

# File lib/pub_sub_model_sync/service_google.rb, line 81
def subscribe_to_topics
  topics.map do |key, topic|
    subs_name = "#{config.subscription_key}_#{key}"
    subscription = topic.subscription(subs_name) || topic.subscribe(subs_name, SUBSCRIPTION_SETTINGS)
    subscriber = subscription.listen(LISTEN_SETTINGS, &method(:process_message))
    subscriber.start
    log("Subscribed to topic: #{topic.name} as: #{subs_name}")
    subscriber
  end
end