class Metacrunch::Job

Attributes

dsl[R]

Public Class Methods

define(file_content = nil, &block) click to toggle source
# File lib/metacrunch/job.rb, line 8
# Convenience constructor: builds a job from either a job definition string
# or a block and returns the new instance.
#
# @param file_content [String, nil] job definition source, forwarded to the
#   constructor unchanged
# @param block [Proc] optional definition block, forwarded to the constructor
# @return the newly created instance
def define(file_content = nil, &block)
  new(file_content, &block)
end
new(file_content = nil, &block) click to toggle source
# File lib/metacrunch/job.rb, line 13
# Sets up the DSL object and the deprecator for this job, then evaluates the
# job definition inside the DSL object.
#
# A definition string takes precedence over a block; when neither is given
# nothing is evaluated.
#
# @param file_content [String, nil] job definition source code
# @param block [Proc] job definition given as a block
def initialize(file_content = nil, &block)
  @dsl = Dsl.new(self)
  @deprecator = ActiveSupport::Deprecation.new("5.0.0", "metacrunch")

  if file_content
    # The second argument is the file name Ruby reports in backtraces for
    # errors raised while evaluating the string.
    @dsl.instance_eval(file_content, "Check your metacrunch Job at Line")
  elsif block
    @dsl.instance_eval(&block)
  end
end

Public Instance Methods

add_transformation(callable, buffer_size: nil, buffer: nil) click to toggle source
# File lib/metacrunch/job.rb, line 65
# Appends a transformation step to this job, optionally preceded by a buffer.
#
# @param callable [#call] the transformation; checked with ensure_callable!
# @param buffer_size [Numeric, nil] deprecated spelling of +buffer+; a numeric
#   value is reported through @deprecator and then replaces +buffer+
# @param buffer [Object, nil] when truthy, a Metacrunch::Job::Buffer built
#   from this value is inserted right before the callable
# @return [Array] the list of transformations after the additions
def add_transformation(callable, buffer_size: nil, buffer: nil)
  ensure_callable!(callable)

  # nil is not a Numeric, so no separate nil guard is needed here.
  if buffer_size.is_a?(Numeric)
    @deprecator.deprecation_warning(:buffer_size, :buffer)
    buffer = buffer_size
  end

  transformations << Metacrunch::Job::Buffer.new(buffer) if buffer
  transformations << callable
end
destination() click to toggle source
# File lib/metacrunch/job.rb, line 34
# Returns the destination currently stored on this job, or nil when none has
# been assigned.
def destination
  @destination
end
destination=(destination) click to toggle source
# File lib/metacrunch/job.rb, line 38
# Assigns the destination for this job.
#
# The object is validated with ensure_destination! first, so an object that
# fails that check is never stored.
#
# @param destination [Object] the object to store in @destination
def destination=(destination)
  ensure_destination!(destination)
  @destination = destination
end
post_process() click to toggle source
# File lib/metacrunch/job.rb, line 52
# Returns the post-process callable stored on this job, or nil when none has
# been assigned.
def post_process
  @post_process
end
post_process=(callable) click to toggle source
# File lib/metacrunch/job.rb, line 56
# Assigns the post-process hook for this job.
#
# The object is validated with ensure_callable! first, so an object that
# fails that check is never stored.
#
# @param callable [Object] the object to store in @post_process
def post_process=(callable)
  ensure_callable!(callable)
  @post_process = callable
end
pre_process() click to toggle source
# File lib/metacrunch/job.rb, line 43
# Returns the pre-process callable stored on this job, or nil when none has
# been assigned.
def pre_process
  @pre_process
end
pre_process=(callable) click to toggle source
# File lib/metacrunch/job.rb, line 47
# Assigns the pre-process hook for this job.
#
# The object is validated with ensure_callable! first, so an object that
# fails that check is never stored.
#
# @param callable [Object] the object to store in @pre_process
def pre_process=(callable)
  ensure_callable!(callable)
  @pre_process = callable
end
run() click to toggle source
# File lib/metacrunch/job.rb, line 80
# Executes the job: pre-process hook, the source/transformation/destination
# pipeline, then the post-process hook.
#
# When a source is set, every object it yields is run through the
# transformations and the result is handed to write_destination. Afterwards
# the transformations are run once more with +flush_buffers: true+ so that
# buffered data is emitted as well.
#
# The destination (if any) is closed in an +ensure+ clause, so it is released
# even when the source, a transformation or a write raises. The exception
# still propagates and the post-process hook is not run in that case.
#
# @return [self]
def run
  run_pre_process

  if source
    begin
      # Run transformations for each data object available in source
      source.each do |record|
        write_destination(run_transformations(transformations, record))
      end

      # Run all transformations a last time to flush existing buffers
      write_destination(run_transformations(transformations, nil, flush_buffers: true))
    ensure
      # Close destination, also on failure, so it is never left open
      destination.close if destination
    end
  end

  run_post_process

  self
end
source() click to toggle source
# File lib/metacrunch/job.rb, line 25
# Returns the source currently stored on this job, or nil when none has been
# assigned.
def source
  @source
end
source=(source) click to toggle source
# File lib/metacrunch/job.rb, line 29
# Assigns the source for this job.
#
# The object is validated with ensure_source! first, so an object that fails
# that check is never stored.
#
# @param source [Object] the object to store in @source
def source=(source)
  ensure_source!(source)
  @source = source
end
transformations() click to toggle source
# File lib/metacrunch/job.rb, line 61
# Returns the list of transformation steps registered on this job, creating
# an empty list on first access.
#
# @return [Array] the same array instance on every call
def transformations
  @transformations = [] unless @transformations
  @transformations
end

Private Instance Methods

ensure_callable!(object) click to toggle source
# File lib/metacrunch/job.rb, line 114
# Verifies that the given object can be invoked with #call.
#
# @param object [Object] the candidate to check
# @raise [ArgumentError] when the object does not respond to #call
# @return [nil]
def ensure_callable!(object)
  return if object.respond_to?(:call)

  raise ArgumentError, "#{object} doesn't respond to #call."
end
ensure_destination!(object) click to toggle source
# File lib/metacrunch/job.rb, line 109
# Verifies that the given object offers the destination interface, i.e.
# responds to both #write and #close. #write is checked first.
#
# @param object [Object] the candidate to check
# @raise [ArgumentError] when the object lacks #write or #close
# @return [nil]
def ensure_destination!(object)
  unless object.respond_to?(:write)
    raise ArgumentError, "#{object} doesn't respond to #write."
  end
  return if object.respond_to?(:close)

  raise ArgumentError, "#{object} doesn't respond to #close."
end
ensure_source!(object) click to toggle source
# File lib/metacrunch/job.rb, line 105
# Verifies that the given object can be iterated with #each.
#
# @param object [Object] the candidate to check
# @raise [ArgumentError] when the object does not respond to #each
# @return [nil]
def ensure_source!(object)
  return if object.respond_to?(:each)

  raise ArgumentError, "#{object} doesn't respond to #each."
end
run_post_process() click to toggle source
# File lib/metacrunch/job.rb, line 122
# Invokes the post-process hook when one is set.
#
# @return the hook's return value, or nil when no hook is set
def run_post_process
  return unless post_process

  post_process.call
end
run_pre_process() click to toggle source
# File lib/metacrunch/job.rb, line 118
# Invokes the pre-process hook when one is set.
#
# @return the hook's return value, or nil when no hook is set
def run_pre_process
  return unless pre_process

  pre_process.call
end
run_transformations(transformations, data, flush_buffers: false) click to toggle source
# File lib/metacrunch/job.rb, line 126
# Pushes one data object through the given transformations, in order, and
# returns whatever the last step left behind.
#
# * A Buffer step receives non-nil data via #buffer and, when +flush_buffers+
#   is set, is drained via #flush.
# * Any other step is invoked with #call, but only while data is non-nil, so
#   once a step returns nil the following callables are skipped.
# * A step that returns an Enumerator fans out: every element is run through
#   the remaining steps on its own and the outcome is handed to
#   write_destination. (These outcomes used to be discarded, so fanned-out
#   records never reached the destination.) The current call then continues
#   with nil.
#
# @param transformations [Array] Buffer instances and objects responding to #call
# @param data [Object, nil] the object to transform; nil skips callables
# @param flush_buffers [Boolean] whether Buffer steps should be flushed
# @return [Object, nil] the transformed data, or nil when it was dismissed,
#   buffered or fanned out
def run_transformations(transformations, data, flush_buffers: false)
  transformations.each_with_index do |transformation, i|
    if transformation.is_a?(Buffer)
      data = transformation.buffer(data) if data
      # NOTE(review): when data was just buffered AND flush_buffers is set,
      # the value returned by #buffer is replaced by #flush. Whether that can
      # drop a batch depends on Buffer's implementation - confirm.
      data = transformation.flush if flush_buffers
    else
      data = transformation.call(data) if data

      if data.is_a?(Enumerator)
        remaining = transformations.slice((i + 1)..-1)

        data.each do |element|
          # Each element travels through the rest of the pipeline on its own;
          # its result has to be written here because this call returns nil.
          write_destination(run_transformations(remaining, element, flush_buffers: flush_buffers))
        end

        data = nil
      end
    end
  end

  data
end
write_destination(data) click to toggle source
# File lib/metacrunch/job.rb, line 144
# Hands data to the destination's #write, but only when a destination is set
# and the data answers true to #present?.
#
# @param data [Object, nil] the object to write
# @return the result of the destination's #write, or nil when nothing was
#   written
def write_destination(data)
  return unless destination
  return unless data.present?

  destination.write(data)
end