class EXEL::Processors::SplitProcessor

Implements the split instruction. Used to concurrently process a large file by splitting it into small chunks to be separately processed.

Supported Context Options

Constants

DEFAULT_CHUNK_SIZE

Number of lines to include in each chunk. Can be overridden by setting :chunk_size in the context.

Attributes

block[RW]
file_name[RW]

Public Class Methods

new(context) click to toggle source

The context must contain a CSV File object in context[:resource].

# File lib/exel/processors/split_processor.rb, line 27
# Sets up the processor's state from the given context.
#
# Reads the source file from context[:resource] and the optional chunk limit
# from context[:max_chunks] (unlimited when absent). Note that the context
# itself is modified: :delete_resource is set to true unless the caller
# already supplied a non-nil value (an explicit false is preserved).
#
# context - hash-like object supporting [] and []=.
def initialize(context)
  @context = context
  @file = context[:resource]
  @max_chunks = context[:max_chunks] || Float::INFINITY

  # Working state: lines awaiting a flush, and the number of chunks emitted.
  @buffer = []
  @tempfile_count = 0

  # nil? (not ||=) so that an explicit false from the caller is kept.
  context[:delete_resource] = true if context[:delete_resource].nil?
end

Public Instance Methods

generate_chunk(content) click to toggle source
# File lib/exel/processors/split_processor.rb, line 54
# Writes +content+ to a new temporary '.csv' file and returns it.
#
# The chunk counter is incremented first, so the name prefix produced by
# chunk_filename and the log message both carry the new chunk number. The
# returned Tempfile is rewound so it can be read from the beginning.
#
# content - the text to store in the chunk.
def generate_chunk(content)
  @tempfile_count += 1

  Tempfile.new([chunk_filename, '.csv']).tap do |tempfile|
    tempfile.write(content)
    tempfile.rewind

    log_info "Generated chunk # #{@tempfile_count} for file #{filename(@file)} in #{tempfile.path}"
  end
end
process(callback) click to toggle source
# File lib/exel/processors/split_processor.rb, line 36
# Splits the source file into chunks, handing each one to +callback+, then
# flushes whatever is left in the buffer.
#
# Cleanup runs whether or not processing raised: the source file is closed
# and, when the context's :delete_resource flag is truthy, removed from disk.
#
# callback - passed through to process_file and finish.
def process(callback)
  process_file(callback)
  finish(callback)
ensure
  @file.close
  remove_source = @context[:delete_resource]
  File.unlink(@file.path) if remove_source
end
process_line(line, callback) click to toggle source
# File lib/exel/processors/split_processor.rb, line 44
# Handles a single parsed row, or the :eof sentinel.
#
# On :eof the buffer is flushed unconditionally. Otherwise the row is
# re-serialized as a CSV line, appended to the buffer, and the buffer is
# flushed once buffer_full? reports true.
#
# line     - a parsed row, or the symbol :eof.
# callback - passed through to flush_buffer.
def process_line(line, callback)
  return flush_buffer(callback) if line == :eof

  @buffer << CSV.generate_line(line)
  flush_buffer(callback) if buffer_full?
end

Private Instance Methods

buffer_full?() click to toggle source
# File lib/exel/processors/split_processor.rb, line 87
# Reports whether the buffer holds at least one chunk's worth of lines.
#
# Uses >= rather than == so a flush is still triggered when the buffer size
# can never exactly equal chunk_size (a zero, negative or fractional
# :chunk_size, or a :chunk_size lowered in the context after lines were
# buffered). With an exact-equality check those cases would keep buffering
# until end of input. For positive integer chunk sizes the result is the same.
def buffer_full?
  @buffer.size >= chunk_size
end
chunk_filename() click to toggle source
# File lib/exel/processors/split_processor.rb, line 95
# Builds the name prefix for the next chunk's temporary file:
# "<source name>_<chunk number>_".
def chunk_filename
  format('%s_%s_', filename(@file), @tempfile_count)
end
chunk_size() click to toggle source
# File lib/exel/processors/split_processor.rb, line 91
# Number of lines per chunk: the context's :chunk_size when it is set to a
# truthy value, otherwise DEFAULT_CHUNK_SIZE.
def chunk_size
  configured = @context[:chunk_size]
  return configured if configured

  DEFAULT_CHUNK_SIZE
end
filename(file) click to toggle source
# File lib/exel/processors/split_processor.rb, line 99
# Returns the name of +file+ without its directory or extension: the last
# '/'-separated segment of file.path, cut at its first '.'.
#
# file - any object responding to #path.
def filename(file)
  file.path.split('/')[-1].split('.')[0]
end
finish(callback) click to toggle source
# File lib/exel/processors/split_processor.rb, line 104
# Signals end of input by sending the :eof sentinel through process_line,
# which flushes any lines still held in the buffer.
#
# callback - passed through to process_line.
def finish(callback)
  process_line(:eof, callback)
end
flush_buffer(callback) click to toggle source
# File lib/exel/processors/split_processor.rb, line 78
# Emits the buffered lines as one chunk and empties the buffer.
#
# When the buffer is non-empty, its lines are concatenated into a chunk file
# via generate_chunk, the context is updated in place so that :resource
# points at that chunk, and the callback is run with the result of that
# merge. The buffer is reset afterwards in either case.
#
# callback - object responding to #run.
def flush_buffer(callback)
  if !@buffer.empty?
    chunk_file = generate_chunk(@buffer.join(''))
    chunk_context = @context.merge!(resource: chunk_file)
    callback.run(chunk_context)
  end

  @buffer = []
end
process_file(callback) click to toggle source
# File lib/exel/processors/split_processor.rb, line 66
# Streams the source file row by row through process_line.
#
# Parsing uses the context's :csv_options when present, otherwise a comma
# column separator. Reading stops early once the number of generated chunks
# equals @max_chunks. A malformed CSV is logged via log_error instead of
# being raised, so rows handled before the error are kept.
#
# callback - passed through to process_line.
def process_file(callback)
  parser_options = @context[:csv_options] || {col_sep: ','}

  CSV.foreach(@file.path, **parser_options) do |row|
    process_line(row, callback)

    # Checked after every row; the count only changes when a chunk is emitted.
    break if @tempfile_count == @max_chunks
  end
rescue CSV::MalformedCSVError => error
  log_error "CSV::MalformedCSVError => #{error.message}"
end