class ThomasUtils::Observation

Public Class Methods

new(executor, observable) click to toggle source
# File lib/thomas_utils/observation.rb, line 7
# Stores the executor and the observable that this observation wraps.
#
# executor   - kept in @executor; other methods call #post on it.
# observable - kept in @observable; other methods call #add_observer and
#              #value on it.
def initialize(executor, observable)
  @executor, @observable = executor, observable
end

Public Instance Methods

ensure(&block) click to toggle source
# File lib/thomas_utils/observation.rb, line 58
# Chains +block+ through #successive using the :on_complete_ensure handler
# and returns whatever #successive returns.
def ensure(&block)
  handler = :on_complete_ensure
  successive(handler, &block)
end
fallback(&block) click to toggle source
# File lib/thomas_utils/observation.rb, line 54
# Chains +block+ through #successive using the :on_complete_fallback handler
# and returns whatever #successive returns.
def fallback(&block)
  handler = :on_complete_fallback
  successive(handler, &block)
end
join() click to toggle source
# File lib/thomas_utils/observation.rb, line 39
# Reads @observable.value, discards the result, and returns self so calls
# can be chained.
# NOTE(review): presumably #value blocks until the observable is resolved;
# confirm against the observable implementation in use.
def join
  tap { @observable.value }
end
none_fallback() { || ... } click to toggle source
# File lib/thomas_utils/observation.rb, line 48
# Chains a #then step that keeps a truthy result as-is and otherwise
# substitutes the value produced by the block given to this method.
# Both nil and false count as "no result".
def none_fallback
  self.then { |value| value ? value : yield }
end
on_complete() { |value, error| ... } click to toggle source
# File lib/thomas_utils/observation.rb, line 30
# Registers an observer on @observable. When the observer fires, the given
# block is run via @executor.post with the observed value and error.
# The observer's first argument is ignored. Returns self.
def on_complete
  tap do
    @observable.add_observer do |_, value, error|
      @executor.post { yield value, error }
    end
  end
end
on_failure() { |error| ... } click to toggle source
# File lib/thomas_utils/observation.rb, line 21
# Registers an observer on @observable that yields the error to the given
# block, but only when an error is present. The work is always posted to
# @executor; the error check happens inside the posted job. Returns self.
def on_failure
  tap do
    @observable.add_observer do |_, _, error|
      @executor.post do
        next unless error

        yield error
      end
    end
  end
end
on_failure_ensure(&block) click to toggle source
# File lib/thomas_utils/observation.rb, line 73
# Runs +block+ with the error when this observation fails, then re-issues
# the original error.
#
# The step is chained via #fallback. After calling +block+, it builds an
# Observation over ConstantVar.error(error). If the block's result looks
# like an observation (see #result_is_observation?), the error observation
# is returned from a #then on that result; otherwise it is returned directly.
def on_failure_ensure(&block)
  fallback do |error|
    outcome = block.call(error)
    rethrown = Observation.new(@executor, ConstantVar.error(error))
    result_is_observation?(outcome) ? outcome.then { rethrown } : rethrown
  end
end
on_success() { |value| ... } click to toggle source
# File lib/thomas_utils/observation.rb, line 12
# Registers an observer on @observable that yields the value to the given
# block, but only when no error is present. The work is always posted to
# @executor; the error check happens inside the posted job. Returns self.
def on_success
  tap do
    @observable.add_observer do |_, value, error|
      @executor.post do
        next if error

        yield value
      end
    end
  end
end
on_success_ensure(&block) click to toggle source
# File lib/thomas_utils/observation.rb, line 62
# Runs +block+ with the successful result, then passes the original result
# on.
#
# The step is chained via #then. If the block's return value looks like an
# observation (see #result_is_observation?), the original result is returned
# from a #then on that value; otherwise it is returned directly.
def on_success_ensure(&block)
  self.then do |value|
    side_result = block.call(value)
    next value unless result_is_observation?(side_result)

    side_result.then { value }
  end
end
then(&block) click to toggle source
# File lib/thomas_utils/observation.rb, line 44
# Chains +block+ through #successive using the :on_complete_then handler
# and returns whatever #successive returns.
def then(&block)
  handler = :on_complete_then
  successive(handler, &block)
end

Private Instance Methods

ensure_complete_then(error, observable, result, value) click to toggle source
# File lib/thomas_utils/observation.rb, line 143
# Waits for +result+ to complete, then resolves +observable+.
#
# A child error takes precedence over the upstream +error+. When neither is
# present, +observable+ is set to the upstream +value+. The child's own
# value is ignored.
def ensure_complete_then(error, observable, result, value)
  result.on_complete do |_, child_error|
    failure = child_error || error
    failure ? observable.fail(failure) : observable.set(value)
  end
end
ensure_then(error, observable, value) click to toggle source
# File lib/thomas_utils/observation.rb, line 155
# Resolves +observable+: fails it with +error+ when one is present,
# otherwise sets it to +value+.
def ensure_then(error, observable, value)
  return observable.fail(error) if error

  observable.set(value)
end
on_complete_ensure(observable) { |value, error| ... } click to toggle source
# File lib/thomas_utils/observation.rb, line 128
# Completion handler behind #ensure.
#
# Once this observation completes, the given block is called with
# (value, error). If the block returns something that looks like an
# observation, resolution of +observable+ is handed to
# #ensure_complete_then; otherwise +observable+ is resolved straight away
# from the upstream outcome via #ensure_then.
#
# Anything raised along the way fails +observable+ instead. This rescues
# Exception, not just StandardError.
def on_complete_ensure(observable)
  on_complete do |value, error|
    begin
      outcome = yield value, error
      next ensure_then(error, observable, value) unless result_is_observation?(outcome)

      ensure_complete_then(error, observable, outcome, value)
    rescue Exception => raised
      observable.fail(raised)
    end
  end
end
on_complete_fallback(observable, &block) click to toggle source
# File lib/thomas_utils/observation.rb, line 103
# Completion handler behind #fallback.
#
# A successful value is written to +observable+ untouched. An error is
# handed, together with +block+, to #on_failure_fallback.
def on_complete_fallback(observable, &block)
  on_complete do |value, error|
    next observable.set(value) unless error

    on_failure_fallback(observable, error, &block)
  end
end
on_complete_then(observable, &block) click to toggle source
# File lib/thomas_utils/observation.rb, line 93
# Completion handler behind #then.
#
# An error fails +observable+ without calling +block+. A successful value
# is handed, together with +block+, to #on_success_then.
def on_complete_then(observable, &block)
  on_complete do |value, error|
    next observable.fail(error) if error

    on_success_then(observable, value, &block)
  end
end
on_failure_fallback(observable, value)
Alias for: on_success_then
on_success_then(observable, value) { |value| ... } click to toggle source
# File lib/thomas_utils/observation.rb, line 113
# Calls the given block with +value+ and resolves +observable+ from the
# outcome.
#
# A plain return value is set on +observable+ directly. A return value that
# looks like an observation is awaited via #on_complete, and its value or
# error is forwarded through #ensure_then.
#
# Anything raised (Exception, not just StandardError) fails +observable+.
# The surrounding page lists this method as also aliased to
# on_failure_fallback.
def on_success_then(observable, value)
  outcome = yield value
  return observable.set(outcome) unless result_is_observation?(outcome)

  outcome.on_complete do |child_value, child_error|
    ensure_then(child_error, observable, child_value)
  end
rescue Exception => raised
  observable.fail(raised)
end
Also aliased as: on_failure_fallback
result_is_observation?(result) click to toggle source
# File lib/thomas_utils/observation.rb, line 163
# Duck-type check: true when +result+ responds to both #on_complete and
# #then.
# NOTE(review): on Ruby 2.6+ every Object responds to #then via Kernel#then,
# so in practice #on_complete is the deciding check — confirm the targeted
# Ruby versions.
def result_is_observation?(result)
  %i[on_complete then].all? { |name| result.respond_to?(name) }
end
successive(method, &block) click to toggle source
# File lib/thomas_utils/observation.rb, line 87
# Builds the next link in a chain.
#
# Creates a fresh Concurrent::IVar, passes it with +block+ to the private
# handler named by +method+, and returns a new Observation over that IVar
# using this observation's executor.
def successive(method, &block)
  pending = Concurrent::IVar.new.tap { |target| send(method, target, &block) }
  Observation.new(@executor, pending)
end