class Fluent::Plugin::GcloudPubSubOutput
Constants
- DEFAULT_BUFFER_TYPE
- DEFAULT_FORMATTER_TYPE
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_gcloud_pubsub.rb, line 53
# Applies the plugin configuration and prepares the helpers used per record.
#
# Steps, in order: convert compat-style :buffer / :formatter parameters,
# defer to the superclass, validate the placeholders used in @topic, create
# the formatter, and pick the compression callable that #format invokes.
#
# @param conf the configuration object passed to the plugin
def configure(conf)
  compat_parameters_convert(conf, :buffer, :formatter)
  super
  placeholder_validate!(:topic, @topic)
  @formatter = formatter_create
  # Resolve the compressor once here so #format only has to do `@compress.call`.
  # Any @compression value other than 'gzip' falls back to the pass-through.
  @compress = if @compression == 'gzip'
                method(:gzip_compress)
              else
                method(:no_compress)
              end
end
format(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_gcloud_pubsub.rb, line 70
# Serializes one event into the msgpack pair consumed by #write.
#
# Every key listed in @attribute_keys is removed from the record and moved
# into the attributes hash (a key that is absent from the record yields a nil
# attribute value). The static @attribute_key_values are merged on top, so
# they win on a key clash. The remaining record is formatted, passed through
# the compressor chosen in #configure, and packed together with the attributes.
#
# @param tag    the event tag
# @param time   the event time
# @param record the event record
# @return the result of Array#to_msgpack on [payload, attributes]
def format(tag, time, record)
  record = inject_values_to_record(tag, time, record)
  attributes = @attribute_keys.each_with_object({}) do |key, attrs|
    # delete (not fetch): promoted keys must not also appear in the payload.
    attrs[key] = record.delete(key)
  end
  attributes.merge!(@attribute_key_values)
  payload = @compress.call(@formatter.format(tag, time, record))
  [payload, attributes].to_msgpack
end
formatted_to_msgpack_binary?()
click to toggle source
# File lib/fluent/plugin/out_gcloud_pubsub.rb, line 80
# Declares that #format emits msgpack data (it ends with `to_msgpack`),
# which is what lets #write iterate the chunk with `msgpack_each`.
#
# @return [true]
def formatted_to_msgpack_binary?
  true
end
multi_workers_ready?()
click to toggle source
# File lib/fluent/plugin/out_gcloud_pubsub.rb, line 84
# Reports that this plugin is ready to be used with multiple workers.
#
# @return [true]
def multi_workers_ready?
  true
end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_gcloud_pubsub.rb, line 65
# Runs the superclass start hook, then builds the publisher used by #publish.
#
# The publisher is constructed from the configured @project, @key,
# @autocreate_topic, @dest_project, @endpoint and @timeout, in that order.
def start
  super
  @publisher = Fluent::GcloudPubSub::Publisher.new(
    @project, @key, @autocreate_topic, @dest_project, @endpoint, @timeout
  )
end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_gcloud_pubsub.rb, line 88
# Publishes the records of a buffer chunk to the resolved topic in batches.
#
# Each (payload, attributes) pair in the chunk is wrapped in a
# Fluent::GcloudPubSub::Message. A message larger than @max_message_size is
# dropped with a warning. Messages accumulate until adding the next one would
# exceed @max_messages or @max_total_size, at which point the pending batch is
# published and a new one is started. Whatever remains is published at the end.
#
# @param chunk the buffer chunk; must respond to `metadata` and `msgpack_each`
# @raise [Fluent::GcloudPubSub::RetryableError] logged as a warning, then re-raised
# @raise [StandardError] any other error is logged with a backtrace, then re-raised
def write(chunk)
  topic = extract_placeholders(@topic, chunk.metadata)

  messages = []
  size = 0

  chunk.msgpack_each do |msg, attr|
    msg = Fluent::GcloudPubSub::Message.new(msg, attr)
    if msg.bytesize > @max_message_size
      log.warn 'Drop a message because its size exceeds `max_message_size`', size: msg.bytesize
      next
    end

    batch_full = messages.length + 1 > @max_messages || size + msg.bytesize > @max_total_size
    # Only flush when something is pending: a single message that by itself
    # trips a limit must not trigger a publish of an empty batch.
    if batch_full && !messages.empty?
      publish(topic, messages)
      messages = []
      size = 0
    end

    messages << msg
    size += msg.bytesize
  end

  publish(topic, messages) unless messages.empty?
rescue Fluent::GcloudPubSub::RetryableError => ex
  log.warn "Retryable error occurs. Fluentd will retry.", error_message: ex.to_s, error_class: ex.class.to_s
  raise
rescue => ex
  log.error "unexpected error", error_message: ex.to_s, error_class: ex.class.to_s
  log.error_backtrace
  raise
end
Private Instance Methods
gzip_compress(message)
click to toggle source
# File lib/fluent/plugin/out_gcloud_pubsub.rb, line 128
# Compression callable selected by #configure when @compression is 'gzip'.
# Delegates to `compress`, which is defined outside this method.
#
# @param message the formatted payload
# @return whatever `compress` returns for +message+
def gzip_compress(message)
  compress(message)
end
no_compress(message)
click to toggle source
# File lib/fluent/plugin/out_gcloud_pubsub.rb, line 132
# Pass-through counterpart of #gzip_compress, selected by #configure when
# @compression is anything other than 'gzip'.
#
# @param message the formatted payload
# @return the same +message+ object, untouched
def no_compress(message)
  message
end
publish(topic, messages)
click to toggle source
# File lib/fluent/plugin/out_gcloud_pubsub.rb, line 123
# Logs a debug summary of the batch and hands it to the publisher built in #start.
#
# @param topic    the destination topic
# @param messages the batch; each element must respond to `bytesize`
# @return whatever `@publisher.publish` returns
def publish(topic, messages)
  # Seed the sum with 0 so an empty batch logs "size:0" rather than a blank.
  total_bytes = messages.map(&:bytesize).inject(0, :+)
  log.debug "send message topic:#{topic} length:#{messages.length} size:#{total_bytes}"
  @publisher.publish(topic, messages)
end