class Fluent::GcloudPubSub::Publisher

Public Class Methods

new(project, key, autocreate_topic, dest_project, endpoint, timeout) click to toggle source
# File lib/fluent/plugin/gcloud_pubsub/client.rb, line 27
def initialize(project, key, autocreate_topic, dest_project, endpoint, timeout)
  @pubsub = Google::Cloud::Pubsub.new project_id: project, credentials: key, endpoint: endpoint, timeout: timeout
  @autocreate_topic = autocreate_topic
  @dest_project = dest_project
  @topics = {}
end

Public Instance Methods

publish(topic_name, messages) click to toggle source
# File lib/fluent/plugin/gcloud_pubsub/client.rb, line 53
def publish(topic_name, messages)
  topic(topic_name).publish do |batch|
    messages.each do |m|
      batch.publish m.message, m.attributes
    end
  end
rescue Google::Cloud::UnavailableError, Google::Cloud::DeadlineExceededError, Google::Cloud::InternalError => ex
  raise RetryableError.new "Google api returns error:#{ex.class.to_s} message:#{ex.to_s}"
end
topic(topic_name) click to toggle source
# File lib/fluent/plugin/gcloud_pubsub/client.rb, line 34
def topic(topic_name)
  return @topics[topic_name] if @topics.has_key? topic_name

  if @dest_project.nil?
    client = @pubsub.topic topic_name
    if client.nil? && @autocreate_topic
      client = @pubsub.create_topic topic_name
    end
  else
    client = @pubsub.topic topic_name, project: @dest_project
  end
  if client.nil?
    raise Error.new "topic:#{topic_name} does not exist."
  end

  @topics[topic_name] = client
  client
end