class TensorStream::Evaluator::BaseEvaluator

Evaluator base class

Base class to be used by all tensor_stream evaluators; provides common support functions.

Public Class Methods

default_device()

Selects the best device available on the system for this evaluator.

# File lib/tensor_stream/evaluator/base_evaluator.rb, line 48
def self.default_device
  Device.new("cpu", :cpu, self)
end
fetch_device(_query = [])

Selects the best device matching the specified query; the query format can be evaluator-specific.

# File lib/tensor_stream/evaluator/base_evaluator.rb, line 55
def self.fetch_device(_query = [])
  Device.new("cpu", :cpu, self)
end
new(session, _device, thread_pool: nil, log_intermediates: false)
# File lib/tensor_stream/evaluator/base_evaluator.rb, line 33
def initialize(session, _device, thread_pool: nil, log_intermediates: false)
  @session = session
  @log_intermediates = log_intermediates
  @thread_pool = thread_pool || Concurrent::ImmediateExecutor.new
  @context[:compute_history] = [] if log_intermediates
end
ops()

Returns all supported ops for this evaluator class.

# File lib/tensor_stream/evaluator/base_evaluator.rb, line 113
def self.ops
  @ops ||= {}
end
query_device(query)

Selects a device using a URI-style query string.

# File lib/tensor_stream/evaluator/base_evaluator.rb, line 61
def self.query_device(query)
  return default_device if query.nil? || query == :default

  all_devices = query_supported_devices
  substrs = query.split("/")
  substrs.each do |q|
    components = q.split(":")
    next if components.size.zero?

    if components[0] == "device" # use tensorflow convention
      device_type = components[1]
      select_index = components[2].to_i

      devices = all_devices.select { |d| d.type == device_type.downcase.to_sym }
      return nil if devices.empty?

      select_index = [devices.size - 1, select_index].min
      return devices[select_index]
    elsif %w[cpu gpu].include?(components[0])
      device_type = components[0].to_sym
      select_index = components[1].to_i

      devices = all_devices.select { |d| d.type == device_type.downcase.to_sym }
      return nil if devices.empty?

      select_index = [devices.size - 1, select_index].min
      return devices[select_index]
    elsif components[0] == "ts" # tensorstream specific
      evaluator_class = TensorStream::Evaluator.evaluators[components[1]][:class]
      return nil unless self == evaluator_class
      return evaluator_class.fetch_device(components[2..components.size]) if evaluator_class.respond_to?(:fetch_device)

      return nil
    end
  end
end
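
query_device understands three query forms, mirroring the branches above: the TensorFlow /device:type:index convention, a bare type:index shorthand, and a tensor_stream-specific /ts:evaluator:… form. A minimal sketch of these formats; RubyEvaluator and the "ruby" registration key are assumptions here, so substitute whatever classes and keys TensorStream::Evaluator.evaluators actually contains.

# Sketch of the query formats parsed by query_device.
evaluator = TensorStream::Evaluator::RubyEvaluator

evaluator.query_device(:default)        # falls back to default_device
evaluator.query_device("/device:cpu:0") # TensorFlow convention: device type, then index
evaluator.query_device("/cpu:0")        # shorthand form: type and index only
evaluator.query_device("/ts:ruby:0")    # tensor_stream form: evaluator key, then an evaluator-specific query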
query_supported_devices()

Queries all supported devices.

# File lib/tensor_stream/evaluator/base_evaluator.rb, line 42
def self.query_supported_devices
  [Device.new("cpu", :cpu, self)]
end
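
Against the base class this reports a single CPU device; subclasses override it to enumerate their own devices. A short sketch of inspecting the result; the name/type accessors are assumed from the Device.new arguments used above.

# Sketch: devices reported by the base implementation.
devices = TensorStream::Evaluator::BaseEvaluator.query_supported_devices
devices.size        # => 1
devices.first.name  # => "cpu"  (assumed accessor, from Device.new("cpu", :cpu, self))
devices.first.type  # => :cpu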
register_op(opcode, options = {}, &block)

Registers an op for the current evaluator class.

# File lib/tensor_stream/evaluator/base_evaluator.rb, line 100
def self.register_op(opcode, options = {}, &block)
  @ops ||= {}
  if opcode.is_a?(Array)
    opcode.each do |op|
      @ops[op.to_sym] = {options: options, block: block}
    end
  else
    @ops[opcode.to_sym] = {options: options, block: block}
  end
end
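
A minimal sketch of a hypothetical subclass registering ops. The block receives the execution context, the operation tensor, and its resolved inputs, matching the instance_exec call in #invoke below; passing an array of opcodes registers the same block under several names, and options such as noop: true are consulted by #invoke when resolving inputs.

# Hypothetical evaluator subclass; op implementations are illustrative only.
class MyEvaluator < TensorStream::Evaluator::BaseEvaluator
  register_op :add do |_context, _tensor, inputs|
    a, b = inputs
    a + b
  end

  # Several opcodes can share one implementation by passing an array.
  # noop: true tells #invoke to pass the inputs through unresolved.
  register_op %i[flow_group group], noop: true do |_context, _tensor, inputs|
    inputs
  end
end

MyEvaluator.ops.keys # => [:add, :flow_group, :group]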

Public Instance Methods

invoke(tensor, execution_context)
# File lib/tensor_stream/evaluator/base_evaluator.rb, line 117
def invoke(tensor, execution_context)
  return eval_tensor(tensor, execution_context) unless tensor.is_a?(Operation)
  raise UnsupportedOp.new(tensor), "op #{tensor.operation} is not yet supported" unless self.class.ops.key?(tensor.operation.to_sym)

  op = self.class.ops[tensor.operation.to_sym]
  op_options = op[:options]

  resolved_inputs = tensor.inputs.map { |i|
    next if i.nil?
    next i if op_options[:noop]

    if i.is_a?(Array)
      next i.collect { |sub_item| sub_item.is_a?(Tensor) ? global_eval(tensor, sub_item, execution_context) : sub_item }
    end

    global_eval(tensor, i, execution_context, op_options)
  }

  start_time = if profile_enabled?
    time = Time.now
    time.to_i * (10**9) + time.nsec
  end

  instance_exec(execution_context, tensor, resolved_inputs, &op[:block]).tap do |result|
    if profile_enabled?
      time = Time.now
      end_time = time.to_i * (10**9) + time.nsec
      @context[:profile] ||= {step: 0, operations: {}}
      @context[:profile][:step] += 1
      @context[:profile][:operations][tensor.name] = {op: tensor.operation,
                                                       step: @context[:profile][:step],
                                                       eval_time: end_time - start_time,
                                                       shape: tensor.shape ? tensor.shape.shape : nil,
                                                       data_type: tensor.data_type,
                                                       tensor: tensor,}
    end
  end
end
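
When profiling is enabled, #invoke timestamps each op in nanoseconds and stores a record in @context[:profile][:operations], keyed by tensor name. The field names below come straight from the hash literal above; the values are illustrative only.

# Illustrative shape of one profile record built by #invoke:
{
  op: :matmul,         # tensor.operation
  step: 12,            # running step counter across the profiled run
  eval_time: 184_000,  # end_time - start_time, in nanoseconds
  shape: [2, 2],       # tensor.shape.shape, or nil if unknown
  data_type: :float32,
  tensor: nil,         # in practice, the evaluated Operation itself
}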

Protected Instance Methods

convert_from_buffer(_tensor, _result)

Converts from a Ruby Buffer object to the evaluator's native buffer format. Subclasses must provide an implementation.

# File lib/tensor_stream/evaluator/base_evaluator.rb, line 216
def convert_from_buffer(_tensor, _result)
  raise "need implementation"
end
get_broadcast_gradient_args(input_a, input_b)
# File lib/tensor_stream/evaluator/base_evaluator.rb, line 192
def get_broadcast_gradient_args(input_a, input_b)
  return [[], []] if input_a == input_b

  input_a_args = []
  input_b_args = []

  input_a = Array.new(input_b.size) { |i| i < input_a.size ? input_a[i] : nil }.reverse if input_a.size < input_b.size
  input_b = Array.new(input_a.size) { |i| i < input_b.size ? input_b[i] : nil }.reverse if input_a.size > input_b.size

  input_a.reverse.zip(input_b.reverse).each_with_index do |item, index|
    a, b = item

    if a.nil? || b && (a < b)
      input_a_args << input_b.size - index - 1
    elsif b.nil? || a && (a > b)
      input_b_args << input_a.size - index - 1
    end
  end

  [input_a_args.reverse, input_b_args.reverse]
end
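
Given two broadcast-compatible shapes, this returns the axes along which the gradients of input_a and input_b, respectively, must be summed to undo broadcasting. A few traced examples; the method is protected, so these calls would run from inside an evaluator subclass.

# Worked examples, traced from the implementation above:
get_broadcast_gradient_args([2, 3], [2, 3]) # => [[], []]   identical shapes, nothing to reduce
get_broadcast_gradient_args([2, 3], [3])    # => [[], [0]]  b was broadcast along axis 0
get_broadcast_gradient_args([2, 3], [2, 1]) # => [[], [1]]  b was broadcast along axis 1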
global_eval(tensor, input, execution_context, op_options = {})
# File lib/tensor_stream/evaluator/base_evaluator.rb, line 174
def global_eval(tensor, input, execution_context, op_options = {})
  return nil unless input
  return input unless input.is_a?(Tensor)

  # puts "global eval #{tensor.name}"
  @context[:_cache][:placement][input.name] = @session.assign_evaluator(input) if @context[:_cache][:placement][input.name].nil?
  if !on_same_device?(input) # tensor is on another device or evaluator
    # puts "transition #{object_id} -> #{@context[:_cache][:placement][input.name][1].object_id}"
    perform_transition(tensor, input, @context[:_cache][:placement][input.name][1], execution_context)
  else
    prepare_input(input, execution_context, op_options)
  end
end
on_same_device?(tensor)
# File lib/tensor_stream/evaluator/base_evaluator.rb, line 188
def on_same_device?(tensor)
  object_id == @context[:_cache][:placement][tensor.name][1].object_id
end
perform_transition(tensor, input, _next_evaluator, execution_context)

Called when passing control to another evaluator.

# File lib/tensor_stream/evaluator/base_evaluator.rb, line 164
def perform_transition(tensor, input, _next_evaluator, execution_context)
  cache_key = "#{tensor.graph.object_id}_#{input.name}:#{object_id}"
  return @context[:_cache][cache_key] if @context[:_cache].key?(cache_key)

  result = @session.delegate_to_evaluator(input, @context, execution_context)
  convert_from_buffer(input, result).tap do |buffer|
    @context[:_cache][cache_key] = buffer if input.is_const
  end
end
prepare_input(_tensor, _context, _options = {})
# File lib/tensor_stream/evaluator/base_evaluator.rb, line 220
def prepare_input(_tensor, _context, _options = {})
  raise "need implementation"
end
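
convert_from_buffer and prepare_input are the two hooks a concrete evaluator must supply; the base class only raises. Continuing the hypothetical MyEvaluator sketch from the register_op example above, for an evaluator whose native format is plain Ruby arrays; the to_ruby call is an assumption about the incoming buffer, not a tensor_stream API.

# Hypothetical implementations of the two required hooks.
class MyEvaluator < TensorStream::Evaluator::BaseEvaluator
  protected

  # Unwrap a buffer handed over by another evaluator into this
  # evaluator's native representation (plain Ruby arrays here).
  def convert_from_buffer(_tensor, result)
    result.respond_to?(:to_ruby) ? result.to_ruby : result
  end

  # Resolve an input tensor into a value this evaluator can operate on.
  def prepare_input(tensor, context, _options = {})
    invoke(tensor, context)
  end
end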
profile_enabled?()
# File lib/tensor_stream/evaluator/base_evaluator.rb, line 158
def profile_enabled?
  @context[:_options][:profile_enabled]
end