class Fluent::GcloudPubSub::MessageUnpacker

Public Class Methods

unpack(message) click to toggle source
# File lib/fluent/plugin/gcloud_pubsub/client.rb, line 153
def self.unpack(message)
  attributes = message.attributes
  algorithm = attributes["compression_algorithm"]

  case algorithm
  when nil
    # For an uncompressed message return the single line and attributes
    [[message.message.data.chomp, message.attributes]]
  when COMPRESSION_ALGORITHM_ZLIB
    # Return all of the lines in the message, with empty attributes
    Zlib::Inflate
      .inflate(message.message.data)
      .split(BATCHED_RECORD_SEPARATOR)
      .map { |line| [line, {}] }
  else
    raise ArgumentError, "unknown compression algorithm: '#{algorithm}'"
  end
end