class Spark::Command::Base


Parent for all commands (Map, FlatMap, Sort, …)

Constants

DEFAULT_VARIABLE_OPTIONS

Attributes

__objects__[RW]

Bound objects

Public Class Methods

error(message)
# File lib/spark/command/base.rb, line 23
def self.error(message)
  raise Spark::CommandError, message
end
init_settings()

Init empty settings

# File lib/spark/command/base.rb, line 52
def self.init_settings
  if !class_variable_defined?(:@@settings)
    struct = Struct.new(:variables)

    class_variable_set(:@@settings, struct.new)
    settings.variables = {}
  end
end
new(*args)
# File lib/spark/command/base.rb, line 13
def initialize(*args)
  settings.variables.each do |name, options|
    instance_variable_set("@#{name}", args.shift)
  end
end
settings()

Settings for command (variables)

# File lib/spark/command/base.rb, line 42
def self.settings
  init_settings
  class_variable_get(:@@settings)
end
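For illustration, the first call lazily builds an empty registry. A minimal sketch; the inspected values assume no variable has been registered yet:

Spark::Command::Base.settings
# => #<struct variables={}>
Spark::Command::Base.settings.variables
# => {}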
variable(name, options={})

Registers a new variable for the command

Example:

class Map < Spark::Command::Base
  variable :map_function
end

command = Map.new(1)

command.instance_variables
# => [:@map_function]
command.instance_variable_get(:@map_function)
# => 1
# File lib/spark/command/base.rb, line 76
def self.variable(name, options={})
  if settings.variables.has_key?(name)
    error "Function #{name} already exist."
  end

  settings.variables[name] = DEFAULT_VARIABLE_OPTIONS.merge(options)
end
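A further illustrative sketch (not taken from the library's own examples): options passed to variable are merged over DEFAULT_VARIABLE_OPTIONS, and constructor arguments are assigned to variables in declaration order. The Sample class, its variable names, and the function: false option below are hypothetical:

class Sample < Spark::Command::Base
  # Registered first, so it receives the first constructor argument
  variable :sample_function
  # function: false keeps this variable out of #prepare's conversion
  variable :with_replacement, function: false
end

command = Sample.new(lambda{|x| x}, true)
command.instance_variable_get(:@with_replacement)
# => true

Variables registered with function: false are left untouched by prepare, which only converts variables whose options mark them as functions.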

Public Instance Methods

before_run()

This method is called before every execution.

# File lib/spark/command/base.rb, line 141
def before_run
end
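A hypothetical subclass can override this hook for per-task setup; execute calls it before delegating to run. The Timed class below is illustrative, not part of the library:

class Timed < Spark::Command::Base
  def before_run
    # Record when this task started; read later inside #run
    @started_at = Time.now
  end

  def run(iterator, _split_index)
    log("task started at #{@started_at}")
    iterator
  end
end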
error(message)
# File lib/spark/command/base.rb, line 27
def error(message)
  self.class.error(message)
end
execute(iterator, split_index)

Execute command for data and split index

# File lib/spark/command/base.rb, line 89
def execute(iterator, split_index)
  # Implemented on Base but can be overridden
  before_run

  # run must be implemented by the child class
  if iterator.is_a?(Enumerator::Lazy) && respond_to?(:lazy_run)
    return lazy_run(iterator, split_index)
  end

  iterator = iterator.to_a
  run(iterator, split_index)
end
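For example, a minimal subclass only needs to implement run (and, optionally, lazy_run to keep lazy enumerators lazy); execute takes care of the before_run hook and of materializing the iterator. The Upcase class is an illustrative sketch:

class Upcase < Spark::Command::Base
  def run(iterator, _split_index)
    # Receives a plain Array; returns the transformed data
    iterator.map(&:upcase)
  end

  def lazy_run(iterator, _split_index)
    # Receives an Enumerator::Lazy; nothing is materialized here
    iterator.map(&:upcase)
  end
end

Upcase.new.execute(['a', 'b'], 0)
# => ["A", "B"]
Upcase.new.execute(['a', 'b'].lazy, 0).class
# => Enumerator::Lazy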
log(message=nil)
# File lib/spark/command/base.rb, line 31
def log(message=nil)
  $stdout.puts %{==> #{Time.now.strftime("%H:%M:%S")} [#{self.class.name}] #{message}}
  $stdout.flush
end
method_missing(method, *args, &block)
Calls superclass method
# File lib/spark/command/base.rb, line 150
def method_missing(method, *args, &block)
  if __objects__ && __objects__.has_key?(method)
    return __objects__[method]
  end

  super
end
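A hedged usage sketch: keys of the __objects__ hash (the bound objects) become readable as methods, while unknown names still fall through to super and raise NoMethodError. The :broadcast key is purely illustrative:

command = Spark::Command::Base.new
command.__objects__ = { broadcast: [1, 2, 3] }

command.broadcast
# => [1, 2, 3]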
prepare()

This is called before execution. Execution will be stopped if a command contains an error (e.g. a badly serialized lambda).

What does it do?

  • evaluates lambdas

  • evaluates methods

  • makes new lambdas

# File lib/spark/command/base.rb, line 114
def prepare
  return if prepared?

  to_function = settings.variables.select {|_, options| options[:function]}
  to_function.each do |name, options|
    name = "@#{name}"
    data = instance_variable_get(name)

    case data[:type]
    when 'proc'
      result = eval(data[:content])
    when 'symbol'
      result = lambda(&data[:content])
    when 'method'
      # Method must be added to the instance, not the class
      instance_eval(data[:content])
      # Method will be available as a Proc
      result = lambda(&method(data[:name]))
    end

    instance_variable_set(name, result)
  end

  @prepared = true
end
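To make the expected payloads concrete, here is a hedged sketch: a function variable arrives as a hash whose :type is 'proc', 'symbol', or 'method', and prepare replaces it with a callable. The Doubler class, the variable name, and the payload contents are illustrative; in the real pipeline such hashes come from the library's serialization of the user's function:

class Doubler < Spark::Command::Base
  # function: true ensures #prepare converts this variable
  variable :double_function, function: true
end

# The constructor receives the serialized payload, not a live Proc
command = Doubler.new(type: 'proc', content: 'lambda {|x| x * 2}')
command.prepare

command.instance_variable_get(:@double_function).call(21)
# => 42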
prepared?()
# File lib/spark/command/base.rb, line 102
def prepared?
  !!@prepared
end
settings()
# File lib/spark/command/base.rb, line 47
def settings
  self.class.settings
end
to_s()
# File lib/spark/command/base.rb, line 19
def to_s
  self.class.name.split('::').last
end
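For example, with the Map subclass from the variable example above, to_s returns the unqualified class name:

Map.new(1).to_s
# => "Map"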