class Fluent::Plugin::GcloudPubSubOutput

Constants

DEFAULT_BUFFER_TYPE
DEFAULT_FORMATTER_TYPE

Public Instance Methods

configure(conf) click to toggle source

rubocop:disable Metrics/MethodLength

Calls superclass method
# File lib/fluent/plugin/out_gcloud_pubsub.rb, line 51
def configure(conf)
  compat_parameters_convert(conf, :buffer, :formatter)
  super
  placeholder_validate!(:topic, @topic)
  @formatter = formatter_create

  if @compress_batches && !@attribute_keys.empty?
    # The attribute_keys option is implemented by extracting keys from the
    # record and setting them on the Pub/Sub message.
    # This is not possible in compressed mode, because we're sending just a
    # single Pub/Sub message that comprises many records, therefore the
    # attribute keys would clash.
    raise Fluent::ConfigError, ":attribute_keys cannot be used when compression is enabled"
  end

  @messages_published =
    Fluent::GcloudPubSub::Metrics.register_or_existing(:"#{@metric_prefix}_messages_published_per_batch") do
      ::Prometheus::Client.registry.histogram(
        :"#{@metric_prefix}_messages_published_per_batch",
        "Number of records published to Pub/Sub per buffer flush",
        {},
        [1, 10, 50, 100, 250, 500, 1000],
      )
    end

  @bytes_published =
    Fluent::GcloudPubSub::Metrics.register_or_existing(:"#{@metric_prefix}_messages_published_bytes") do
      ::Prometheus::Client.registry.histogram(
        :"#{@metric_prefix}_messages_published_bytes",
        "Total size in bytes of the records published to Pub/Sub",
        {},
        [100, 1000, 10_000, 100_000, 1_000_000, 5_000_000, 10_000_000],
      )
    end

  @compression_enabled =
    Fluent::GcloudPubSub::Metrics.register_or_existing(:"#{@metric_prefix}_compression_enabled") do
      ::Prometheus::Client.registry.gauge(
        :"#{@metric_prefix}_compression_enabled",
        "Whether compression/batching is enabled",
        {},
      )
    end
  @compression_enabled.set(common_labels, @compress_batches ? 1 : 0)
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_gcloud_pubsub.rb, line 103
def format(tag, time, record)
  record = inject_values_to_record(tag, time, record)
  attributes = {}
  @attribute_keys.each do |key|
    attributes[key] = record.delete(key)
  end
  [@formatter.format(tag, time, record), attributes].to_msgpack
end
formatted_to_msgpack_binary?() click to toggle source
# File lib/fluent/plugin/out_gcloud_pubsub.rb, line 112
def formatted_to_msgpack_binary?
  true
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/out_gcloud_pubsub.rb, line 116
def multi_workers_ready?
  true
end
start() click to toggle source

rubocop:enable Metrics/MethodLength

Calls superclass method
# File lib/fluent/plugin/out_gcloud_pubsub.rb, line 98
def start
  super
  @publisher = Fluent::GcloudPubSub::Publisher.new @project, @key, @autocreate_topic, @metric_prefix
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_gcloud_pubsub.rb, line 120
def write(chunk)
  topic = extract_placeholders(@topic, chunk.metadata)

  messages = []
  size = 0

  chunk.msgpack_each do |msg, attr|
    msg = Fluent::GcloudPubSub::Message.new(msg, attr)
    if msg.bytesize > @max_message_size
      log.warn "Drop a message because its size exceeds `max_message_size`", size: msg.bytesize
      next
    end
    if messages.length + 1 > @max_messages || size + msg.bytesize > @max_total_size
      publish(topic, messages)
      messages = []
      size = 0
    end
    messages << msg
    size += msg.bytesize
  end

  publish(topic, messages) unless messages.empty?
rescue Fluent::GcloudPubSub::RetryableError => e
  log.warn "Retryable error occurs. Fluentd will retry.", error_message: e.to_s, error_class: e.class.to_s
  raise e
rescue StandardError => e
  log.error "unexpected error", error_message: e.to_s, error_class: e.class.to_s
  log.error_backtrace
  raise e
end

Private Instance Methods

common_labels() click to toggle source
# File lib/fluent/plugin/out_gcloud_pubsub.rb, line 163
def common_labels
  { topic: @topic }
end
publish(topic, messages) click to toggle source
# File lib/fluent/plugin/out_gcloud_pubsub.rb, line 153
def publish(topic, messages)
  size = messages.map(&:bytesize).inject(:+)
  log.debug "send message topic:#{topic} length:#{messages.length} size:#{size}"

  @messages_published.observe(common_labels, messages.length)
  @bytes_published.observe(common_labels, size)

  @publisher.publish(topic, messages, @compress_batches)
end