class Hoze::PubSubSource

Attributes

max_tries[R]
subscription[R]
topic[R]

Public Class Methods

new(configuration) click to toggle source
# File lib/hoze/pubsub/source.rb, line 11
def initialize configuration
  @engine = Google::Cloud::Pubsub.new(
    project: configuration.connector.project,
    credentials: configuration.connector.path_to_key
  )

  @channel = configuration.channel
  @key = configuration.key

  @topic = ensure_topic
  @subscription = ensure_subscription

  @max_tries = configuration.max_tries
end

Public Instance Methods

listen() { |msg| ... } click to toggle source
# File lib/hoze/pubsub/source.rb, line 27
def listen &block
  raise HozeSourceError.new("Tryng to listen source but no key configured") if @key.nil?
  subscriber = @subscription.listen do |received_message|
    begin
      msg = Hoze::PubSubMessage.new(received_message, self)
      yield msg
    rescue Exception => e
      puts "Exception: #{e.message}"
      raise # always reraise
    end
  end
  subscriber.start
  begin
    puts "Starts listening"
    while true do
      sleep 10
    end
  rescue Interrupt
    puts "Interrupted, cleaning ok"
  ensure
    subscriber.stop.wait!
  end
end
push(payload, metadata) click to toggle source
# File lib/hoze/pubsub/source.rb, line 51
def push payload, metadata
  @topic.publish_async payload, metadata
end

Private Instance Methods

ensure_subscription() click to toggle source
# File lib/hoze/pubsub/source.rb, line 62
def ensure_subscription
  return nil if @key.nil?
  subname = [@channel,@key].join('+')
  @topic.subscription(subname) || @topic.subscribe(subname)
end
ensure_topic() click to toggle source
# File lib/hoze/pubsub/source.rb, line 57
def ensure_topic
  raise HozeSourceError.new("No topic configured for source") if @channel.nil?
  @engine.topic(@channel) || @engine.create_topic(@channel)
end