class Spark::Command::Base
Parent for all commands (Map, FlatMap, Sort, …)
Constants
- DEFAULT_VARIABLE_OPTIONS
Attributes
Public Class Methods
error(message)
click to toggle source
# File lib/spark/command/base.rb, line 23 def self.error(message) raise Spark::CommandError, message end
init_settings()
click to toggle source
Init empty settings
# File lib/spark/command/base.rb, line 52 def self.init_settings if !class_variable_defined?(:@@settings) struct = Struct.new(:variables) class_variable_set(:@@settings, struct.new) settings.variables = {} end end
new(*args)
click to toggle source
# File lib/spark/command/base.rb, line 13 def initialize(*args) settings.variables.each do |name, options| instance_variable_set("@#{name}", args.shift) end end
settings()
click to toggle source
Settings for command (variables)
# File lib/spark/command/base.rb, line 42 def self.settings init_settings class_variable_get(:@@settings) end
variable(name, options={})
click to toggle source
New variable for command
Example:¶ ↑
class Map < Spark::Command::Base variable :map_function end command = Map.new(1) command.instance_variables # => [:@map_function] command.instance_variable_get(:@map_function) # => 1
# File lib/spark/command/base.rb, line 76 def self.variable(name, options={}) if settings.variables.has_key?(name) error "Function #{name} already exist." end settings.variables[name] = DEFAULT_VARIABLE_OPTIONS.merge(options) end
Public Instance Methods
before_run()
click to toggle source
This method is called before every execution.
# File lib/spark/command/base.rb, line 141 def before_run end
error(message)
click to toggle source
# File lib/spark/command/base.rb, line 27 def error(message) self.class.error(message) end
execute(iterator, split_index)
click to toggle source
Execute command for data and split index
# File lib/spark/command/base.rb, line 89 def execute(iterator, split_index) # Implemented on Base but can be override before_run # Run has to be implemented on child if iterator.is_a?(Enumerator::Lazy) && respond_to?(:lazy_run) return lazy_run(iterator, split_index) end iterator = iterator.to_a run(iterator, split_index) end
log(message=nil)
click to toggle source
# File lib/spark/command/base.rb, line 31 def log(message=nil) $stdout.puts %{==> #{Time.now.strftime("%H:%M:%S")} [#{self.class.name}] #{message}} $stdout.flush end
method_missing(method, *args, &block)
click to toggle source
Calls superclass method
# File lib/spark/command/base.rb, line 150 def method_missing(method, *args, &block) if __objects__ && __objects__.has_key?(method) return __objects__[method] end super end
prepare()
click to toggle source
This is called before execution. Executing will be stopped if some command contains error (e.g. badly serialized lambda).
What is doing?¶ ↑
-
evaluate lambda
-
evaluate method
-
make new lambda
# File lib/spark/command/base.rb, line 114 def prepare return if prepared? to_function = settings.variables.select {|_, options| options[:function]} to_function.each do |name, options| name = "@#{name}" data = instance_variable_get(name) case data[:type] when 'proc' result = eval(data[:content]) when 'symbol' result = lambda(&data[:content]) when 'method' # Method must me added to instance not Class instance_eval(data[:content]) # Method will be available as Proc result = lambda(&method(data[:name])) end instance_variable_set(name, result) end @prepared = true end
prepared?()
click to toggle source
# File lib/spark/command/base.rb, line 102 def prepared? !!@prepared end
settings()
click to toggle source
# File lib/spark/command/base.rb, line 47 def settings self.class.settings end
to_s()
click to toggle source
# File lib/spark/command/base.rb, line 19 def to_s self.class.name.split('::').last end