class Delayer::Deferred::Worker

A Worker that executes Deferreds. It manages the Fiber that runs a Deferred chain.

About the object passed to push

The argument passed to Worker#push must implement the activate method.

activate(response)

Args

response

Delayer::Deferred::Response::Base: the value passed to the Deferred

Returns

Delayer::Deferred::Response::Base

Returning this skips automatic value conversion, so the callee can fail deliberately or propagate the Deferred to the next block.

Delayer::Deferred::Chainable

The Worker suspends processing until the returned Deferred finishes. When it resumes, the result is replaced with the result of the returned Deferred.

else

The value is passed as the argument to Delayer::Deferred::Response::Ok.new, and the resulting object is used.
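The pass-through and wrapping rules above can be sketched without the library. This is a minimal illustration only: the `Sketch` module, its classes, and `normalize` are hypothetical stand-ins, not the library's API, and the Chainable case (suspending until another Deferred finishes) is deliberately left out.

```ruby
# Hypothetical stand-ins; these are NOT the library's classes.
module Sketch
  class Response
    attr_reader :value

    def initialize(value)
      @value = value
    end
  end

  class Ok < Response; end
  class Ng < Response; end

  # A Response passes through untouched; anything else is wrapped in Ok.
  def self.normalize(result)
    result.is_a?(Response) ? result : Ok.new(result)
  end
end

# A plain value is converted automatically.
plain = Sketch.normalize(42)

# An explicit Ng is kept as is, which is how a step can fail on purpose.
forced = Sketch.normalize(Sketch::Ng.new("deliberate failure"))
```

In this sketch `plain` ends up as an `Ok` holding 42, while `forced` stays the `Ng` that was handed in.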

Public Class Methods

new(delayer:, initial:)
# File lib/delayer/deferred/worker.rb, line 25
def initialize(delayer:, initial:)
  @delayer, @initial = delayer, initial
end

Public Instance Methods

give_response(response, deferred)

Called when execution returns from an Await.

Args

response

The result of the Await (Delayer::Deferred::Response::Base)

deferred

The Deferred currently being executed

# File lib/delayer/deferred/worker.rb, line 48
def give_response(response, deferred)
  @delayer.new do
    next if deferred.spoiled?
    deferred.exit_await
    fiber.resume(response).accept_request(worker: self,
                                          deferred: deferred)
  end
  nil
end
push(deferred)
# File lib/delayer/deferred/worker.rb, line 29
def push(deferred)
  deferred.reserve_activate
  @delayer.new do
    next if deferred.spoiled?
    begin
      fiber.resume(deferred).accept_request(worker: self,
                                            deferred: deferred)
    rescue Delayer::Deferred::SequenceError => err
      err.deferred = deferred
      raise
    end
  end
  nil
end
resume_pass(deferred)

Called when execution returns from Tools#pass.

Args

deferred

The Deferred currently being executed

# File lib/delayer/deferred/worker.rb, line 61
def resume_pass(deferred)
  deferred.exit_pass
  @delayer.new do
    next if deferred.spoiled?
    fiber.resume(nil).accept_request(worker: self,
                                     deferred: deferred)
  end
end
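The three public methods share one shape: they reserve a job, and that job later resumes the Worker's fiber with a value. The sketch below shows only that shape. `SketchQueue` is a hypothetical stand-in for a Delayer, and the symbols passed to `resume` are made up for illustration.

```ruby
# Hypothetical stand-in for a Delayer: a plain FIFO of blocks that run later.
class SketchQueue
  def initialize
    @jobs = []
  end

  # Reserve a job without running it.
  def enqueue(&job)
    @jobs << job
    nil
  end

  # Run every reserved job in order.
  def run
    @jobs.shift.call until @jobs.empty?
  end
end

log = []
queue = SketchQueue.new

# The fiber parks at Fiber.yield and receives whatever the next resume passes.
fiber = Fiber.new do |first|
  log << first
  loop do
    log << Fiber.yield(:waiting)
  end
end

# Scheduling does not touch the fiber; it only reserves the resume.
queue.enqueue { fiber.resume(:pushed) }
queue.enqueue { fiber.resume(:awaited_response) }
queue.enqueue { fiber.resume(nil) } # resume_pass likewise resumes with nil

log_before = log.dup
queue.run
```

Nothing reaches the fiber until `queue.run` is called, which is why `log_before` is still empty while `log` afterwards holds the three values in order.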

Private Instance Methods

fiber()
# File lib/delayer/deferred/worker.rb, line 72
def fiber
  @fiber ||= Fiber.new{|response|
    loop do
      response = wait_and_activate(response)
      case response.value
      when Delayer::Deferred::SequenceError
        raise response.value
      end
    end
  }.tap{|f| f.resume(@initial); @initial = nil }
end
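The `fiber` method above creates the Fiber lazily and resumes it once with the initial value, so it is already parked at its first yield by the time a caller resumes it. A minimal sketch of that priming pattern follows, assuming a hypothetical `PrimedSketch` class that takes lambdas as steps; it is not how the library represents Deferreds.

```ruby
# Hypothetical class illustrating lazy creation plus a priming resume.
class PrimedSketch
  def initialize(initial)
    @initial = initial
  end

  # Hand one callable step to the fiber; returns the value it yields back.
  def feed(step)
    fiber.resume(step)
  end

  private

  def fiber
    @fiber ||= Fiber.new { |value|
      loop do
        step = Fiber.yield(value) # park, handing the current value outward
        value = step.call(value)  # run the step on the carried value
      end
    }.tap { |f| f.resume(@initial); @initial = nil } # prime up to the first yield
  end
end

worker = PrimedSketch.new(1)
a = worker.feed(->(v) { v + 1 })
b = worker.feed(->(v) { v * 10 })
```

Because the priming resume consumes the initial value, every later `resume` delivers a step rather than a starting value.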
wait_and_activate(argument)
# File lib/delayer/deferred/worker.rb, line 84
def wait_and_activate(argument)
  response = catch(:success) do
    failed = catch(:__deferredable_fail) do
      begin
        if argument.value.is_a? Deferredable::Awaitable
          throw :success, +argument.value
        else
          defer = Fiber.yield(Request::NEXT_WORKER)
          res = defer.activate(argument)
          if res.is_a? Delayer::Deferred::Deferredable::Awaitable
            defer.add_awaited(res)
          end
        end
        throw :success, res
      rescue Exception => err
        throw :__deferredable_fail, err
      end
    end
    Response::Ng.new(failed)
  end
  if response.is_a?(Response::Base)
    response
  else
    Response::Ok.new(response)
  end
end
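The nested `catch` blocks in `wait_and_activate` separate the success path from the failure path: a throw to the outer tag skips the failure handling, while a throw to the inner tag falls through to it. Below is a reduced sketch of that control flow. `run_step`, the `:failure` tag, and the array results are hypothetical simplifications, and the sketch rescues `StandardError` rather than `Exception`.

```ruby
# Hypothetical helper, not part of the library.
def run_step
  catch(:success) do
    failed = catch(:failure) do
      begin
        # Jump straight out of both catch blocks with the block's result.
        throw :success, [:ok, yield]
      rescue StandardError => err
        # Turn a raised error into a value on the failure path.
        throw :failure, err
      end
    end
    # Reached only when :failure was thrown.
    [:ng, failed]
  end
end

ok     = run_step { 6 * 7 }
raised = run_step { raise ArgumentError, "bad input" }
thrown = run_step { throw :failure, "boom" }
```

A raised exception and an explicit `throw :failure` both end up on the failure path, so a step can signal failure either way.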