class Fluent::GcloudPubSub::Subscriber
Public Class Methods
new(project, key, topic_name, subscription_name)
click to toggle source
# File lib/fluent/plugin/gcloud_pubsub/client.rb, line 65 def initialize(project, key, topic_name, subscription_name) pubsub = Google::Cloud::Pubsub.new project_id: project, credentials: key if topic_name.nil? @client = pubsub.subscription subscription_name else topic = pubsub.topic topic_name @client = topic.subscription subscription_name end raise Error.new "subscription:#{subscription_name} does not exist." if @client.nil? end
Public Instance Methods
acknowledge(messages)
click to toggle source
# File lib/fluent/plugin/gcloud_pubsub/client.rb, line 82 def acknowledge(messages) @client.acknowledge messages rescue Google::Cloud::UnavailableError, Google::Cloud::DeadlineExceededError, Google::Cloud::InternalError => ex raise RetryableError.new "Google acknowledge api returns error:#{ex.class.to_s} message:#{ex.to_s}" end
pull(immediate, max)
click to toggle source
# File lib/fluent/plugin/gcloud_pubsub/client.rb, line 76 def pull(immediate, max) @client.pull immediate: immediate, max: max rescue Google::Cloud::UnavailableError, Google::Cloud::DeadlineExceededError, Google::Cloud::InternalError => ex raise RetryableError.new "Google pull api returns error:#{ex.class.to_s} message:#{ex.to_s}" end