class Expeditor::Command

Public Class Methods

const(value) click to toggle source
# File lib/expeditor/command.rb, line 121
def self.const(value)
  ConstCommand.new(value)
end
new(opts = {}, &block) click to toggle source
# File lib/expeditor/command.rb, line 13
def initialize(opts = {}, &block)
  @service = opts.fetch(:service, Expeditor::Services.default)
  @timeout = opts[:timeout]
  @dependencies = opts.fetch(:dependencies, [])

  @normal_future = nil
  @retryable_options = Concurrent::IVar.new
  @normal_block = block
  @fallback_block = nil
  @ivar = Concurrent::IVar.new
end
start(opts = {}, &block) click to toggle source
# File lib/expeditor/command.rb, line 125
def self.start(opts = {}, &block)
  Command.new(opts, &block).start
end

Public Instance Methods

chain(opts = {}, &block) click to toggle source

XXX: Raise ArgumentError when given `opts` has :dependencies because this forcefully change given :dependencies.

`chain` returns new command that has self as dependencies

# File lib/expeditor/command.rb, line 116
def chain(opts = {}, &block)
  opts[:dependencies] = [self]
  Command.new(opts, &block)
end
get() click to toggle source
# File lib/expeditor/command.rb, line 51
def get
  raise NotStartedError unless started?
  @normal_future.get_or_else do
    if @fallback_block && @service.fallback_enabled?
      @ivar.wait
      if @ivar.rejected?
        raise @ivar.reason
      else
        @ivar.value
      end
    else
      raise @normal_future.reason
    end
  end
end
on_complete(&block) click to toggle source

command.on_complete do |success, value, reason|

...

end

# File lib/expeditor/command.rb, line 88
def on_complete(&block)
  on do |_, value, reason|
    block.call(reason == nil, value, reason)
  end
end
on_failure(&block) click to toggle source

command.on_failure do |e|

...

end

# File lib/expeditor/command.rb, line 106
def on_failure(&block)
  on do |_, _, reason|
    block.call(reason) if reason
  end
end
on_success(&block) click to toggle source

command.on_success do |value|

...

end

# File lib/expeditor/command.rb, line 97
def on_success(&block)
  on do |_, value, reason|
    block.call(value) unless reason
  end
end
set_fallback(&block) click to toggle source
# File lib/expeditor/command.rb, line 67
def set_fallback(&block)
  if started?
    raise AlreadyStartedError, "Do not allow set_fallback call after command is started"
  end
  reset_fallback(&block)
  self
end
start(current_thread: false) click to toggle source

@param current_thread [Boolean] Execute the task on current thread(blocking)

# File lib/expeditor/command.rb, line 26
def start(current_thread: false)
  unless started?
    if current_thread
      prepare(Concurrent::ImmediateExecutor.new)
    else
      prepare
    end
    @normal_future.safe_execute
  end
  self
end
start_with_retry(current_thread: false, **retryable_options) click to toggle source

Equivalent to retryable gem options

# File lib/expeditor/command.rb, line 39
def start_with_retry(current_thread: false, **retryable_options)
  unless started?
    @retryable_options.set(retryable_options)
    start(current_thread: current_thread)
  end
  self
end
started?() click to toggle source
# File lib/expeditor/command.rb, line 47
def started?
  @normal_future && @normal_future.executed?
end
wait() click to toggle source
# File lib/expeditor/command.rb, line 80
def wait
  raise NotStartedError unless started?
  @ivar.wait
end
with_fallback(&block) click to toggle source
# File lib/expeditor/command.rb, line 75
def with_fallback(&block)
  warn 'Expeditor::Command#with_fallback is deprecated. Please use set_fallback instead'
  set_fallback(&block)
end

Private Instance Methods

breakable_block(args, &block) click to toggle source
# File lib/expeditor/command.rb, line 229
def breakable_block(args, &block)
  @service.run_if_allowed do
    block.call(*args)
  end
end
initial_normal(executor, &block) click to toggle source

timeout_block do

retryable_block do
  breakable_block do
    block.call
  end
end

end

# File lib/expeditor/command.rb, line 166
def initial_normal(executor, &block)
  future = RichFuture.new(executor: executor) do
    args = wait_dependencies
    timeout_block(args, &block)
  end
  future.add_observer do |_, _, reason|
    metrics(reason)
  end
  future
end
metrics(reason) click to toggle source
# File lib/expeditor/command.rb, line 235
def metrics(reason)
  case reason
  when nil
    @service.success
  when Timeout::Error
    @service.timeout
  when RejectedExecutionError
    @service.rejection
  when CircuitBreakError
    @service.break
  when DependencyError
    @service.dependency
  else
    @service.failure
  end
end
on(&callback) click to toggle source
# File lib/expeditor/command.rb, line 256
def on(&callback)
  @ivar.add_observer(&callback)
end
prepare(executor = @service.executor) click to toggle source

set future set fallback future as an observer start dependencies

# File lib/expeditor/command.rb, line 134
def prepare(executor = @service.executor)
  @normal_future = initial_normal(executor, &@normal_block)
  @normal_future.add_observer do |_, value, reason|
    if reason # failure
      if @fallback_block
        future = RichFuture.new(executor: executor) do
          success, value, reason = Concurrent::SafeTaskExecutor.new(@fallback_block, rescue_exception: true).execute(reason)
          if success
            @ivar.set(value)
          else
            @ivar.fail(reason)
          end
        end
        future.safe_execute
      else
        @ivar.fail(reason)
      end
    else # success
      @ivar.set(value)
    end
  end

  @dependencies.each(&:start)
end
reset_fallback(&block) click to toggle source
# File lib/expeditor/command.rb, line 252
def reset_fallback(&block)
  @fallback_block = block
end
retryable_block(args, &block) click to toggle source
# File lib/expeditor/command.rb, line 218
def retryable_block(args, &block)
  if @retryable_options.fulfilled?
    Retryable.retryable(@retryable_options.value) do |retries, exception|
      metrics(exception) if retries > 0
      breakable_block(args, &block)
    end
  else
    breakable_block(args, &block)
  end
end
timeout_block(args, &block) click to toggle source
# File lib/expeditor/command.rb, line 208
def timeout_block(args, &block)
  if @timeout
    Timeout::timeout(@timeout) do
      retryable_block(args, &block)
    end
  else
    retryable_block(args, &block)
  end
end
wait_dependencies() click to toggle source
# File lib/expeditor/command.rb, line 177
def wait_dependencies
  if @dependencies.count > 0
    current = Thread.current
    executor = Concurrent::ThreadPoolExecutor.new(
      min_threads: 0,
      max_threads: 5,
      max_queue: 0,
    )
    error = Concurrent::IVar.new
    error.add_observer do |_, e, _|
      executor.shutdown
      current.raise(DependencyError.new(e))
    end
    args = []
    @dependencies.each_with_index do |c, i|
      executor.post do
        begin
          args[i] = c.get
        rescue => e
          error.set(e)
        end
      end
    end
    executor.shutdown
    executor.wait_for_termination
    args
  else
    []
  end
end