class PubSubModelSync::ServiceGoogle
Constants
- LISTEN_SETTINGS
- PUBLISH_SETTINGS
- SUBSCRIPTION_SETTINGS
- TOPIC_SETTINGS
Attributes
publish_topics[RW]
@!attribute topics (Hash): { key: Topic1, … } @!attribute publish_topics
(Hash): { key: Topic1, … }
service[RW]
@!attribute topics (Hash): { key: Topic1, … } @!attribute publish_topics
(Hash): { key: Topic1, … }
subscribers[RW]
@!attribute topics (Hash): { key: Topic1, … } @!attribute publish_topics
(Hash): { key: Topic1, … }
topics[RW]
@!attribute topics (Hash): { key: Topic1, … } @!attribute publish_topics
(Hash): { key: Topic1, … }
Public Class Methods
new()
click to toggle source
# File lib/pub_sub_model_sync/service_google.rb, line 19 def initialize @service = Google::Cloud::Pubsub.new(project: config.project, credentials: config.credentials) Array(config.topic_name || 'model_sync').each(&method(:init_topic)) end
Public Instance Methods
listen_messages()
click to toggle source
# File lib/pub_sub_model_sync/service_google.rb, line 25 def listen_messages log('Listener starting...') @subscribers = subscribe_to_topics log('Listener started') sleep subscribers.each { |subscriber| subscriber.stop.wait! } log('Listener stopped') end
publish(payload)
click to toggle source
@param payload (PubSubModelSync::Payload
)
# File lib/pub_sub_model_sync/service_google.rb, line 35 def publish(payload) p_topic_names = Array(payload.headers[:topic_name] || config.default_topic_name) message_topics = p_topic_names.map(&method(:find_topic)) message_topics.each do |topic| topic.publish_async(encode_payload(payload), message_headers(payload)) do |res| raise StandardError, 'Failed to publish the message.' unless res.succeeded? end end end
stop()
click to toggle source
# File lib/pub_sub_model_sync/service_google.rb, line 45 def stop log('Listener stopping...') (subscribers || []).each(&:stop!) end
Private Instance Methods
find_topic(topic_name)
click to toggle source
# File lib/pub_sub_model_sync/service_google.rb, line 52 def find_topic(topic_name) topic_name = topic_name.to_s return topics.values.first unless topic_name.present? topics[topic_name] || publish_topics[topic_name] || init_topic(topic_name, only_publish: true) end
init_topic(topic_name, only_publish: false)
click to toggle source
@param only_publish (Boolean): if false is used to listen and publish messages @return (Topic): returns created or loaded topic
# File lib/pub_sub_model_sync/service_google.rb, line 61 def init_topic(topic_name, only_publish: false) topic_name = topic_name.to_s @topics ||= {} @publish_topics ||= {} topic = service.topic(topic_name) || service.create_topic(topic_name, TOPIC_SETTINGS) topic.enable_message_ordering! publish_topics[topic_name] = topic if only_publish topics[topic_name] = topic unless only_publish topic end
message_headers(payload)
click to toggle source
@param payload (PubSubModelSync::Payload
)
# File lib/pub_sub_model_sync/service_google.rb, line 73 def message_headers(payload) { SERVICE_KEY => true, ordering_key: payload.headers[:ordering_key] }.merge(PUBLISH_SETTINGS) end
process_message(received_message)
click to toggle source
Calls superclass method
PubSubModelSync::ServiceBase#process_message
# File lib/pub_sub_model_sync/service_google.rb, line 92 def process_message(received_message) message = received_message.message super(message.data) if message.attributes[SERVICE_KEY] ensure received_message.acknowledge! end
subscribe_to_topics()
click to toggle source
@return [Subscriber]
# File lib/pub_sub_model_sync/service_google.rb, line 81 def subscribe_to_topics topics.map do |key, topic| subs_name = "#{config.subscription_key}_#{key}" subscription = topic.subscription(subs_name) || topic.subscribe(subs_name, SUBSCRIPTION_SETTINGS) subscriber = subscription.listen(LISTEN_SETTINGS, &method(:process_message)) subscriber.start log("Subscribed to topic: #{topic.name} as: #{subs_name}") subscriber end end