class Fluent::GcloudPubSub::Publisher
Public Class Methods
new(project, key, autocreate_topic, dest_project, endpoint, timeout)
click to toggle source
# File lib/fluent/plugin/gcloud_pubsub/client.rb, line 27 def initialize(project, key, autocreate_topic, dest_project, endpoint, timeout) @pubsub = Google::Cloud::Pubsub.new project_id: project, credentials: key, endpoint: endpoint, timeout: timeout @autocreate_topic = autocreate_topic @dest_project = dest_project @topics = {} end
Public Instance Methods
publish(topic_name, messages)
click to toggle source
# File lib/fluent/plugin/gcloud_pubsub/client.rb, line 53 def publish(topic_name, messages) topic(topic_name).publish do |batch| messages.each do |m| batch.publish m.message, m.attributes end end rescue Google::Cloud::UnavailableError, Google::Cloud::DeadlineExceededError, Google::Cloud::InternalError => ex raise RetryableError.new "Google api returns error:#{ex.class.to_s} message:#{ex.to_s}" end
topic(topic_name)
click to toggle source
# File lib/fluent/plugin/gcloud_pubsub/client.rb, line 34 def topic(topic_name) return @topics[topic_name] if @topics.has_key? topic_name if @dest_project.nil? client = @pubsub.topic topic_name if client.nil? && @autocreate_topic client = @pubsub.create_topic topic_name end else client = @pubsub.topic topic_name, project: @dest_project end if client.nil? raise Error.new "topic:#{topic_name} does not exist." end @topics[topic_name] = client client end