class Fluent::Plugin::S3Output::ArrowCompressor

Constants

SUPPORTED_COMPRESSION

Public Instance Methods

compress(chunk, tmp)
# File lib/fluent/plugin/s3_compressor_arrow.rb, line 53
# Parses the JSON contained in a buffered chunk with Arrow's JSONReader
# (using the read options prepared in #configure) and writes the resulting
# table to +tmp+ using the configured format, chunk size and compression.
#
# @param chunk buffer chunk whose #read returns the raw JSON payload
# @param tmp   output destination handed to the table's #save
# @return whatever the table's #save returns
def compress(chunk, tmp)
  input = Arrow::BufferInputStream.new(Arrow::Buffer.new(chunk.read))
  reader = Arrow::JSONReader.new(input, @options)
  parsed = reader.read

  save_options = {
    format: @arrow.format,
    chunk_size: @arrow.chunk_size,
    compression: @arrow.compression,
  }
  parsed.save(tmp, **save_options)
end
configure(conf)
Calls the superclass method.
# File lib/fluent/plugin/s3_compressor_arrow.rb, line 33
# Validates the @arrow settings and prepares the Arrow JSON read options
# used when compressing chunks.
#
# @param conf plugin configuration, forwarded to the superclass via super
# @raise [Fluent::ConfigError] when the configured compression is listed in
#   INVALID_COMBINATIONS for the configured format
def configure(conf)
  super

  if INVALID_COMBINATIONS[@arrow.format]&.include?(@arrow.compression)
    # Report both the format and the offending compression so the user can
    # tell which setting to change.
    raise Fluent::ConfigError, "#{@arrow.format} unsupported with #{@arrow.compression}"
  end

  @options = Arrow::JSONReadOptions.new
  @options.schema = resolve_schema
  # Ignore JSON fields that are not part of the resolved schema.
  @options.unexpected_field_behavior = :ignore
end
content_type()
# File lib/fluent/plugin/s3_compressor_arrow.rb, line 49
# MIME type reported for uploaded objects.
#
# NOTE(review): this fixed Arrow-file type is returned regardless of
# @arrow.format — confirm that is intended for non-Arrow output formats.
#
# @return [String] frozen content type string
def content_type
  %q(application/x-apache-arrow-file).freeze
end
ext()
# File lib/fluent/plugin/s3_compressor_arrow.rb, line 45
# File extension for uploaded objects: the configured @arrow.format value
# itself, frozen in place and returned.
def ext
  fmt = @arrow.format
  fmt.freeze
end

Private Instance Methods

resolve_schema()
# File lib/fluent/plugin/s3_compressor_arrow.rb, line 67
# Builds the Arrow schema used to parse incoming JSON, based on
# @arrow.schema_from:
#   :static - an Arrow::Schema built from @arrow.static.schema
#   :glue   - the result of FluentPluginS3Arrow::Schemas::AWSGlue#to_arrow for
#             the configured table/catalog/database (presumably looked up in
#             the AWS Glue Data Catalog; that helper is not visible here)
#   other   - nil
def resolve_schema
  source = @arrow.schema_from
  return Arrow::Schema.new(@arrow.static.schema) if source == :static
  return nil unless source == :glue

  glue = @arrow.glue
  lookup_options = {
    catalog_id: glue.catalog,
    database_name: glue.database,
  }
  FluentPluginS3Arrow::Schemas::AWSGlue.new(glue.table, lookup_options).to_arrow
end