class EXEL::Processors::SplitProcessor
Implements the split instruction. Used to concurrently process a large file by splitting it into small chunks that are processed separately.
Supported Context Options
- :delete_resource
  Defaults to true. Set it to false to preserve the original resource; otherwise the resource is deleted once splitting is complete.
- :chunk_size
  Specifies the number of lines (CSV rows) that each chunk should contain.
- :max_chunks
  Specifies the maximum number of chunks to process. The resource will not be consumed beyond this limit.
- :csv_options
  Options passed to CSV.foreach when reading the resource. Defaults to {col_sep: ','}.
Constants
DEFAULT_CHUNK_SIZE
Number of lines to include in each chunk. It can be overridden by setting :chunk_size in the context.
Attributes
block[RW]
file_name[RW]
Public Class Methods
new(context)
click to toggle source
The context must contain a CSV File object in context[:resource].
# File lib/exel/processors/split_processor.rb, line 27
# Builds a processor for the given context.
#
# @param context hash-like object (only #[] and #[]= are used here). It must
#   hold the CSV File to split under +:resource+. +:max_chunks+ is optional and
#   falls back to Float::INFINITY when missing, nil or false.
#
# Side effect: writes +true+ into context[:delete_resource] when that key is
# nil or absent, so the original resource is deleted by default.
def initialize(context)
  @context = context
  @file = context[:resource]
  @max_chunks = context[:max_chunks] || Float::INFINITY
  @buffer = []
  @tempfile_count = 0
  context[:delete_resource] = true if context[:delete_resource].nil?
end
Public Instance Methods
generate_chunk(content)
click to toggle source
# File lib/exel/processors/split_processor.rb, line 54
# Writes +content+ to a new '.csv' Tempfile and returns it.
#
# The chunk counter is bumped first, so #chunk_filename (used as the Tempfile
# prefix) and the log line both see the new chunk number. The file is rewound
# so the caller reads it from the beginning. This method neither closes nor
# unlinks the Tempfile.
#
# @param content [String] the already-serialized CSV rows for this chunk
# @return [Tempfile] the open, rewound chunk file
def generate_chunk(content)
  @tempfile_count += 1
  Tempfile.new([chunk_filename, '.csv']).tap do |tempfile|
    tempfile.write(content)
    tempfile.rewind
    log_info "Generated chunk # #{@tempfile_count} for file #{filename(@file)} in #{tempfile.path}"
  end
end
process(callback)
click to toggle source
# File lib/exel/processors/split_processor.rb, line 36
# Splits the resource into chunks, handing each one to +callback+.
#
# The resource file is always closed afterwards. It is also removed from disk
# when context[:delete_resource] is truthy. Both happen even if splitting raised.
#
# @param callback [#run] receives the context for every generated chunk
# @return whatever #finish returns
def process(callback)
  process_file(callback)
  finish(callback)
ensure
  source = @file
  source.close
  File.delete(source.path) if @context[:delete_resource]
end
process_line(line, callback)
click to toggle source
# File lib/exel/processors/split_processor.rb, line 44
# Feeds one parsed CSV row into the chunk buffer.
#
# The sentinel +:eof+ flushes whatever is buffered. Any other value is
# serialized with CSV.generate_line and appended to the buffer. The buffer is
# flushed as soon as it holds a full chunk.
#
# @param line [Array, :eof] a parsed CSV row, or the end-of-input marker
# @param callback [#run] forwarded to #flush_buffer
# @return the result of #flush_buffer when a flush happened, otherwise nil
def process_line(line, callback)
  return flush_buffer(callback) if line == :eof

  @buffer << CSV.generate_line(line)
  flush_buffer(callback) if buffer_full?
end
Private Instance Methods
buffer_full?()
click to toggle source
# File lib/exel/processors/split_processor.rb, line 87
# True exactly when the buffer holds #chunk_size rows.
def buffer_full?
  limit = chunk_size
  @buffer.length == limit
end
chunk_filename()
click to toggle source
# File lib/exel/processors/split_processor.rb, line 95
# Tempfile prefix for the current chunk: "<source base name>_<chunk number>_".
def chunk_filename
  format('%<base>s_%<index>s_', base: filename(@file), index: @tempfile_count)
end
chunk_size()
click to toggle source
# File lib/exel/processors/split_processor.rb, line 91
# Number of rows per chunk: context[:chunk_size] when set (truthy), otherwise
# DEFAULT_CHUNK_SIZE.
def chunk_size
  requested = @context[:chunk_size]
  requested || DEFAULT_CHUNK_SIZE
end
filename(file)
click to toggle source
# File lib/exel/processors/split_processor.rb, line 99
# Returns the base name of +file+ without its directory or its extension.
#
# File.basename(path, '.*') strips only the last extension. So "my.data.csv"
# yields "my.data", and a dotfile such as ".hidden" keeps its full name.
# Splitting on '.' and taking the first piece would truncate those to "my"
# and "" respectively.
#
# @param file [#path] object exposing the filesystem path of the resource
# @return [String] the extension-less base name
def filename(file)
  File.basename(file.path, '.*')
end
finish(callback)
click to toggle source
# File lib/exel/processors/split_processor.rb, line 104
# Signals end of input so any partially filled chunk is flushed.
#
# @param callback [#run] forwarded to #process_line
def finish(callback)
  end_of_input = :eof
  process_line(end_of_input, callback)
end
flush_buffer(callback)
click to toggle source
# File lib/exel/processors/split_processor.rb, line 78
# Emits the buffered rows as one chunk, then empties the buffer.
#
# When rows are buffered, they are joined into a chunk file (#generate_chunk).
# The chunk is stored as +:resource+ in the processor's context via merge!,
# which mutates the shared context in place. That context is then passed to
# callback.run. The buffer is reset only after the callback returns.
#
# @param callback [#run] invoked once per non-empty flush
# @return [Array] the fresh, empty buffer
def flush_buffer(callback)
  unless @buffer.empty?
    chunk_file = generate_chunk(@buffer.join(''))
    chunk_context = @context.merge!(resource: chunk_file)
    callback.run(chunk_context)
  end
  @buffer = []
end
process_file(callback)
click to toggle source
# File lib/exel/processors/split_processor.rb, line 66
# Streams the resource row by row through #process_line, honouring
# +:max_chunks+.
#
# Rows are read with CSV.foreach, using context[:csv_options] when present and
# {col_sep: ','} otherwise. A CSV::MalformedCSVError is logged via log_error
# and swallowed; reading simply stops at that point.
#
# @param callback [#run] forwarded to #process_line
def process_file(callback)
  # A non-positive limit allows no chunks at all. Bail out before reading a
  # single row; otherwise that row would sit in the buffer and be emitted as a
  # chunk when #finish flushes it.
  return if @max_chunks <= 0

  csv_options = @context[:csv_options] || {col_sep: ','}
  CSV.foreach(@file.path, **csv_options) do |line|
    process_line(line, callback)
    # Stop consuming the resource as soon as the chunk limit is reached.
    break if @tempfile_count >= @max_chunks
  end
rescue CSV::MalformedCSVError => e
  log_error "CSV::MalformedCSVError => #{e.message}"
end