class Fluent::Plugin::GcloudPubSubOutput
Constants
- DEFAULT_BUFFER_TYPE
- DEFAULT_FORMATTER_TYPE
Public Instance Methods
configure(conf)
Calls superclass method
# File lib/fluent/plugin/out_gcloud_pubsub.rb, line 51
def configure(conf)
  compat_parameters_convert(conf, :buffer, :formatter)
  super
  placeholder_validate!(:topic, @topic)
  @formatter = formatter_create

  if @compress_batches && !@attribute_keys.empty?
    # The attribute_keys option is implemented by extracting keys from the
    # record and setting them on the Pub/Sub message.
    # This is not possible in compressed mode, because we're sending just a
    # single Pub/Sub message that comprises many records, therefore the
    # attribute keys would clash.
    raise Fluent::ConfigError, ":attribute_keys cannot be used when compression is enabled"
  end

  @messages_published =
    Fluent::GcloudPubSub::Metrics.register_or_existing(:"#{@metric_prefix}_messages_published_per_batch") do
      ::Prometheus::Client.registry.histogram(
        :"#{@metric_prefix}_messages_published_per_batch",
        "Number of records published to Pub/Sub per buffer flush",
        {},
        [1, 10, 50, 100, 250, 500, 1000],
      )
    end

  @bytes_published =
    Fluent::GcloudPubSub::Metrics.register_or_existing(:"#{@metric_prefix}_messages_published_bytes") do
      ::Prometheus::Client.registry.histogram(
        :"#{@metric_prefix}_messages_published_bytes",
        "Total size in bytes of the records published to Pub/Sub",
        {},
        [100, 1000, 10_000, 100_000, 1_000_000, 5_000_000, 10_000_000],
      )
    end

  @compression_enabled =
    Fluent::GcloudPubSub::Metrics.register_or_existing(:"#{@metric_prefix}_compression_enabled") do
      ::Prometheus::Client.registry.gauge(
        :"#{@metric_prefix}_compression_enabled",
        "Whether compression/batching is enabled",
        {},
      )
    end
  @compression_enabled.set(common_labels, @compress_batches ? 1 : 0)
end
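configure validates the topic placeholder, creates the formatter, rejects the combination of compress_batches with attribute_keys, and registers three Prometheus metrics. Registration goes through Fluent::GcloudPubSub::Metrics.register_or_existing so that running configure more than once (for example with multiple workers or plugin instances) does not try to register the same metric twice. A hypothetical sketch of what such a helper can look like, using the prometheus-client registry; the gem's actual implementation may differ:

require "prometheus/client"

# Hypothetical helper, not the gem's actual code: return the metric if it is
# already registered under this name, otherwise create it via the block.
def register_or_existing(metric_name)
  registry = Prometheus::Client.registry
  return registry.get(metric_name) if registry.exist?(metric_name)

  yield
end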
format(tag, time, record)
# File lib/fluent/plugin/out_gcloud_pubsub.rb, line 103
def format(tag, time, record)
  record = inject_values_to_record(tag, time, record)
  attributes = {}
  @attribute_keys.each do |key|
    attributes[key] = record.delete(key)
  end
  [@formatter.format(tag, time, record), attributes].to_msgpack
end
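format is called per event: it injects tag/time values if configured, pulls the attribute_keys out of the record, and serializes the formatted payload together with the extracted attributes as a msgpack array. A minimal round-trip sketch, using JSON in place of the configured formatter and a hypothetical "severity" attribute key:

require "json"
require "msgpack"

record         = { "message" => "hello", "severity" => "INFO" }
attribute_keys = ["severity"]                       # stands in for @attribute_keys

# Move the attribute keys out of the record and into the attributes hash.
attributes = {}
attribute_keys.each { |key| attributes[key] = record.delete(key) }

encoded = [record.to_json, attributes].to_msgpack   # record.to_json stands in for @formatter.format
payload, attrs = MessagePack.unpack(encoded)
# payload => "{\"message\":\"hello\"}"
# attrs   => { "severity" => "INFO" }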
formatted_to_msgpack_binary?()
# File lib/fluent/plugin/out_gcloud_pubsub.rb, line 112
def formatted_to_msgpack_binary?
  true
end
multi_workers_ready?()
# File lib/fluent/plugin/out_gcloud_pubsub.rb, line 116
def multi_workers_ready?
  true
end
start()
Calls superclass method
# File lib/fluent/plugin/out_gcloud_pubsub.rb, line 98
def start
  super
  @publisher = Fluent::GcloudPubSub::Publisher.new @project, @key, @autocreate_topic, @metric_prefix
end
write(chunk)
# File lib/fluent/plugin/out_gcloud_pubsub.rb, line 120
def write(chunk)
  topic = extract_placeholders(@topic, chunk.metadata)

  messages = []
  size = 0
  chunk.msgpack_each do |msg, attr|
    msg = Fluent::GcloudPubSub::Message.new(msg, attr)
    if msg.bytesize > @max_message_size
      log.warn "Drop a message because its size exceeds `max_message_size`", size: msg.bytesize
      next
    end
    if messages.length + 1 > @max_messages || size + msg.bytesize > @max_total_size
      publish(topic, messages)
      messages = []
      size = 0
    end
    messages << msg
    size += msg.bytesize
  end

  publish(topic, messages) unless messages.empty?
rescue Fluent::GcloudPubSub::RetryableError => e
  log.warn "Retryable error occurs. Fluentd will retry.", error_message: e.to_s, error_class: e.class.to_s
  raise e
rescue StandardError => e
  log.error "unexpected error", error_message: e.to_s, error_class: e.class.to_s
  log.error_backtrace
  raise e
end
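write splits the chunk's events into batches so that no single publish call exceeds max_messages records or max_total_size bytes, after dropping any individual message larger than max_message_size. The same splitting rule in isolation (the limits and inputs below are illustrative, not the plugin's defaults, and the oversized-message drop is omitted):

MAX_MESSAGES   = 3    # illustrative only; the plugin uses @max_messages
MAX_TOTAL_SIZE = 16   # bytes; illustrative only; the plugin uses @max_total_size

def split_into_batches(messages)
  batches = []
  current = []
  size = 0
  messages.each do |msg|
    # Flush the current batch whenever adding this message would break either limit.
    if current.length + 1 > MAX_MESSAGES || size + msg.bytesize > MAX_TOTAL_SIZE
      batches << current
      current = []
      size = 0
    end
    current << msg
    size += msg.bytesize
  end
  batches << current unless current.empty?
  batches
end

split_into_batches(%w[aaaa bbbb cccc dddd eeeeeeeeee])
# => [["aaaa", "bbbb", "cccc"], ["dddd", "eeeeeeeeee"]]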
Private Instance Methods
common_labels()
# File lib/fluent/plugin/out_gcloud_pubsub.rb, line 163
def common_labels
  { topic: @topic }
end
publish(topic, messages)
# File lib/fluent/plugin/out_gcloud_pubsub.rb, line 153
def publish(topic, messages)
  size = messages.map(&:bytesize).inject(:+)
  log.debug "send message topic:#{topic} length:#{messages.length} size:#{size}"

  @messages_published.observe(common_labels, messages.length)
  @bytes_published.observe(common_labels, size)

  @publisher.publish(topic, messages, @compress_batches)
end
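publish records two histogram observations (batch size in messages and in bytes) before handing the batch to the Publisher. A minimal sketch of those metric calls, assuming the older prometheus-client positional-argument API that the code above uses; the metric name and labels here are illustrative:

require "prometheus/client"

registry  = Prometheus::Client.registry
histogram = registry.histogram(
  :example_messages_published_per_batch,            # illustrative name
  "Number of records published per buffer flush",
  {},
  [1, 10, 50, 100, 250, 500, 1000],
)

labels = { topic: "my-topic" }   # mirrors what common_labels returns
histogram.observe(labels, 42)    # one flush that published 42 records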