class Magellan::Gcs::Proxy::PubsubSubscription

Attributes

delay[R]
name[R]

Public Class Methods

new(name, delay: 1)
# File lib/magellan/gcs/proxy/pubsub_subscription.rb, line 9
# Builds a subscription handle from its two settings.
#
# name  - kept in @name; it is the first argument given to
#         GCP.pubsub.pull_subscription when a message is pulled.
# delay - kept in @delay; it is the value passed to sleep by #listen
#         whenever a pull comes back without a message (default: 1).
def initialize(name, delay: 1)
  @name, @delay = name, delay
end

Public Instance Methods

listen() { |msg| ... }
# File lib/magellan/gcs/proxy/pubsub_subscription.rb, line 14
# Polls the subscription in an endless loop.
#
# Each iteration calls wait_for_message once. A truthy result is yielded
# to the caller's block; a nil/false result makes the loop sleep for
# `delay` before polling again. The loop has no explicit exit condition,
# so the caller leaves it by breaking or raising from the block.
def listen
  loop do
    received = wait_for_message

    # Nothing available right now: back off before the next pull.
    unless received
      sleep delay
      next
    end

    yield received
  end
end
pull_req()
# File lib/magellan/gcs/proxy/pubsub_subscription.rb, line 24
# Returns the pull request object used for every pull, building it on
# first use and reusing the same instance afterwards.
#
# The request is constructed with max_messages: 1 and
# return_immediately: true.
def pull_req
  return @pull_req if @pull_req

  @pull_req = Google::Apis::PubsubV1::PullRequest.new(max_messages: 1, return_immediately: true)
end
wait_for_message()
# File lib/magellan/gcs/proxy/pubsub_subscription.rb, line 28
# Performs one pull against the subscription and returns the first
# received message wrapped in a MessageWrapper, or nil when the response
# carries no messages (received_messages is nil or empty).
#
# Example of a raw received message as shown by #inspect:
#
#   #<Google::Apis::PubsubV1::ReceivedMessage:0x007fdc440b58d8
#     @ack_id="...",
#     @message=#<Google::Apis::PubsubV1::Message:0x007fdc440be140
#       @attributes={"download_files"=>"[\"gs://bucket1/path/to/file1\"]"},
#       @message_id="50414480536440",
#       @publish_time="2016-12-17T08:08:35.599Z">>
def wait_for_message
  response = GCP.pubsub.pull_subscription(name, pull_req)

  # received_messages may be nil; treat that the same as an empty list.
  batch = response.received_messages || []
  received = batch.first
  return nil if received.nil?

  MessageWrapper.new(self, received)
end