class Fluent::Plugin::GcloudPubSubOutput

Constants

DEFAULT_BUFFER_TYPE
DEFAULT_FORMATTER_TYPE

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_gcloud_pubsub.rb, line 53
def configure(conf)
  compat_parameters_convert(conf, :buffer, :formatter)
  super
  placeholder_validate!(:topic, @topic)
  @formatter = formatter_create
  @compress = if @compression == 'gzip'
                method(:gzip_compress)
              else
                method(:no_compress)
              end
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_gcloud_pubsub.rb, line 70
def format(tag, time, record)
  record = inject_values_to_record(tag, time, record)
  attributes = {}
  @attribute_keys.each do |key|
    attributes[key] = record.delete(key)
  end
  attributes.merge! @attribute_key_values
  [@compress.call(@formatter.format(tag, time, record)), attributes].to_msgpack
end
formatted_to_msgpack_binary?() click to toggle source
# File lib/fluent/plugin/out_gcloud_pubsub.rb, line 80
def formatted_to_msgpack_binary?
  true
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/out_gcloud_pubsub.rb, line 84
def multi_workers_ready?
  true
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_gcloud_pubsub.rb, line 65
def start
  super
  @publisher = Fluent::GcloudPubSub::Publisher.new @project, @key, @autocreate_topic, @dest_project, @endpoint, @timeout
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_gcloud_pubsub.rb, line 88
def write(chunk)
  topic = extract_placeholders(@topic, chunk.metadata)

  messages = []
  size = 0

  chunk.msgpack_each do |msg, attr|
    msg = Fluent::GcloudPubSub::Message.new(msg, attr)
    if msg.bytesize > @max_message_size
      log.warn 'Drop a message because its size exceeds `max_message_size`', size: msg.bytesize
      next
    end
    if messages.length + 1 > @max_messages || size + msg.bytesize > @max_total_size
      publish(topic, messages)
      messages = []
      size = 0
    end
    messages << msg
    size += msg.bytesize
  end

  if messages.length > 0
    publish(topic, messages)
  end
rescue Fluent::GcloudPubSub::RetryableError => ex
  log.warn "Retryable error occurs. Fluentd will retry.", error_message: ex.to_s, error_class: ex.class.to_s
  raise ex
rescue => ex
  log.error "unexpected error", error_message: ex.to_s, error_class: ex.class.to_s
  log.error_backtrace
  raise ex
end

Private Instance Methods

gzip_compress(message) click to toggle source
# File lib/fluent/plugin/out_gcloud_pubsub.rb, line 128
def gzip_compress(message)
  compress message
end
no_compress(message) click to toggle source
# File lib/fluent/plugin/out_gcloud_pubsub.rb, line 132
def no_compress(message)
  message
end
publish(topic, messages) click to toggle source
# File lib/fluent/plugin/out_gcloud_pubsub.rb, line 123
def publish(topic, messages)
  log.debug "send message topic:#{topic} length:#{messages.length} size:#{messages.map(&:bytesize).inject(:+)}"
  @publisher.publish(topic, messages)
end