class Fluent::Plugin::S3Output::ArrowCompressor
Constants
- SUPPORTED_COMPRESSION
Public Instance Methods
compress(chunk, tmp)
click to toggle source
# File lib/fluent/plugin/s3_compressor_arrow.rb, line 53
# Reads the chunk's contents as JSON into an Arrow table and saves that table
# to +tmp+ using the configured format, chunk size and compression.
#
# @param chunk [#read] source whose +read+ result is wrapped in an Arrow::Buffer
# @param tmp destination handed straight to the table's +save+ call
# @return whatever the table's +save+ call returns
def compress(chunk, tmp)
  # Keep the buffer in a local so it stays referenced while the stream is read.
  payload = Arrow::Buffer.new(chunk.read)
  input = Arrow::BufferInputStream.new(payload)
  reader = Arrow::JSONReader.new(input, @options)

  records = reader.read
  records.save(
    tmp,
    format: @arrow.format,
    chunk_size: @arrow.chunk_size,
    compression: @arrow.compression
  )
end
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/s3_compressor_arrow.rb, line 33
# Applies the superclass configuration, rejects format/compression pairs
# listed in INVALID_COMBINATIONS, and prepares the JSON read options used
# when chunks are compressed.
#
# @param conf configuration object forwarded unchanged to the superclass
# @raise [Fluent::ConfigError] when the configured compression is listed as
#   invalid for the configured format
def configure(conf)
  super

  if INVALID_COMBINATIONS[@arrow.format]&.include?(@arrow.compression)
    # Report both halves of the rejected pair: the format AND the compression.
    raise Fluent::ConfigError, "#{@arrow.format} unsupported with #{@arrow.compression}"
  end

  @options = Arrow::JSONReadOptions.new
  @options.schema = resolve_schema
  # Fields that are absent from the schema are ignored.
  @options.unexpected_field_behavior = :ignore
end
content_type()
click to toggle source
# File lib/fluent/plugin/s3_compressor_arrow.rb, line 49
# Content type reported for objects produced by this compressor.
#
# @return [String] the frozen string 'application/x-apache-arrow-file'
def content_type
  mime_type = 'application/x-apache-arrow-file'
  mime_type.freeze
end
ext()
click to toggle source
# File lib/fluent/plugin/s3_compressor_arrow.rb, line 45
# Returns the configured Arrow format (+@arrow.format+), frozen in place.
# NOTE(review): presumably used as the stored object's file extension - confirm
# against the caller.
#
# @return the value of +@arrow.format+ after +freeze+ has been called on it
def ext
  configured_format = @arrow.format
  configured_format.freeze
end
Private Instance Methods
resolve_schema()
click to toggle source
# File lib/fluent/plugin/s3_compressor_arrow.rb, line 67
# Builds the schema object selected by +@arrow.schema_from+.
#
# * +:static+ - wraps +@arrow.static.schema+ in an Arrow::Schema.
# * +:glue+   - builds a FluentPluginS3Arrow::Schemas::AWSGlue for the
#   configured table, catalog and database, and returns its +to_arrow+ result.
#
# @return the resolved schema, or nil when +schema_from+ matches neither
#   branch (the case expression has no else clause)
def resolve_schema
  case @arrow.schema_from
  when :glue
    glue = @arrow.glue
    table_name = glue.table
    lookup = { catalog_id: glue.catalog, database_name: glue.database }
    FluentPluginS3Arrow::Schemas::AWSGlue.new(table_name, lookup).to_arrow
  when :static
    Arrow::Schema.new(@arrow.static.schema)
  end
end