class Hoze::PubSubSource
Attributes
max_tries[R]
subscription[R]
topic[R]
Public Class Methods
new(configuration)
click to toggle source
# File lib/hoze/pubsub/source.rb, line 11 def initialize configuration @engine = Google::Cloud::Pubsub.new( project: configuration.connector.project, credentials: configuration.connector.path_to_key ) @channel = configuration.channel @key = configuration.key @topic = ensure_topic @subscription = ensure_subscription @max_tries = configuration.max_tries end
Public Instance Methods
listen() { |msg| ... }
click to toggle source
# File lib/hoze/pubsub/source.rb, line 27 def listen &block raise HozeSourceError.new("Tryng to listen source but no key configured") if @key.nil? subscriber = @subscription.listen do |received_message| begin msg = Hoze::PubSubMessage.new(received_message, self) yield msg rescue Exception => e puts "Exception: #{e.message}" raise # always reraise end end subscriber.start begin puts "Starts listening" while true do sleep 10 end rescue Interrupt puts "Interrupted, cleaning ok" ensure subscriber.stop.wait! end end
push(payload, metadata)
click to toggle source
# File lib/hoze/pubsub/source.rb, line 51 def push payload, metadata @topic.publish_async payload, metadata end
Private Instance Methods
ensure_subscription()
click to toggle source
# File lib/hoze/pubsub/source.rb, line 62 def ensure_subscription return nil if @key.nil? subname = [@channel,@key].join('+') @topic.subscription(subname) || @topic.subscribe(subname) end
ensure_topic()
click to toggle source
# File lib/hoze/pubsub/source.rb, line 57 def ensure_topic raise HozeSourceError.new("No topic configured for source") if @channel.nil? @engine.topic(@channel) || @engine.create_topic(@channel) end