module Abstractivator::FiberDefer

Provides a pair of functions for handling long-running requests in a thread pool. It uses fibers to maintain a somewhat normal coding style by hiding the explicit continuations. with_fiber_defer defines the lexical scope of all the work being done; fiber_defer defines the portion of that work to be done in the worker thread. Control passes from the calling thread to the worker thread and then back to the calling thread. The code is capable of propagating thread variables (e.g., Mongoid::Threaded.database_override) across these thread/fiber transitions. See EventMachine.defer for more information.

Constants

ROOT_FIBER

Public Instance Methods

fiber_defer(thread_var_guard=nil, &action) click to toggle source
# File lib/abstractivator/fiber_defer.rb, line 40
# Runs the given block on a background thread (via basic_fiber_defer) and
# returns control to the caller once it has finished.
#
# thread_var_guard - optional Proc, Hash or nil describing thread variables to
#                    re-establish; it is converted with make_guard_proc and
#                    combined with the guard installed by with_fiber_defer.
# action           - the block to defer (required).
#
# Returns whatever basic_fiber_defer returns for the deferred block.
# Raises RuntimeError when called inside another fiber_defer, outside of a
# with_fiber_defer block, or without a block.
def fiber_defer(thread_var_guard=nil, &action)
  # Still on the calling thread at this point.
  scope_guard = Thread.current[:fiber_defer_guard_proc]
  if Thread.current[:inside_fiber_defer]
    raise 'fiber_defer cannot be nested within fiber_defer'
  end
  unless scope_guard
    raise 'fiber_defer must be called within a with_fiber_defer block'
  end
  unless action
    raise 'fiber_defer must be passed an action to defer (the block)'
  end

  call_guard = make_guard_proc(thread_var_guard)
  # Applies the enclosing scope's guard first, then this call's own guard.
  apply_guards = proc do
    scope_guard.call
    call_guard.call
  end

  guarded_action = proc do
    # Executed on the background thread.
    begin
      Thread.current[:inside_fiber_defer] = true
      apply_guards.call
      action.call
    ensure
      Thread.current[:inside_fiber_defer] = false
    end
  end

  begin
    basic_fiber_defer(&guarded_action)
  ensure
    # Back on the calling thread: re-apply the guards whether or not the
    # deferred work raised.
    apply_guards.call
  end
end
mongoid_fiber_defer(&action) click to toggle source
# File lib/abstractivator/fiber_defer.rb, line 68
# Convenience wrapper around fiber_defer that carries the current Mongoid
# database override across the thread/fiber transitions.
#
# The value of Mongoid::Threaded.database_override is read when this method is
# called; the guard re-applies it through Mongoid.override_database.
#
# action - the block to defer; passed straight through to fiber_defer.
def mongoid_fiber_defer(&action)
  reapply_override = proc { |db| Mongoid.override_database(db) }
  fiber_defer({ Mongoid::Threaded.database_override => reapply_override }, &action)
end
with_fiber_defer(thread_var_guard=nil, &block) click to toggle source
# File lib/abstractivator/fiber_defer.rb, line 22
# Establishes the scope inside which fiber_defer may be called. The block is
# run inside a new Fiber with the guard proc published for fiber_defer to use.
#
# thread_var_guard - optional Proc, Hash or nil; converted with make_guard_proc
#                    and invoked once before the block starts.
# block            - the work to run; when omitted the method returns nil
#                    without doing anything further.
#
# Returns the value produced by the fiber's initial resume.
# Raises RuntimeError when no EventMachine reactor is running, or when called
# from within another with_fiber_defer or fiber_defer.
def with_fiber_defer(thread_var_guard=nil, &block)
  unless EM.reactor_running?
    raise 'this method requires an eventmachine reactor to be running'
  end
  if Thread.current[:fiber_defer_guard_proc]
    raise 'with_fiber_defer cannot be nested within with_fiber_defer'
  end
  if Thread.current[:inside_fiber_defer]
    raise 'with_fiber_defer cannot be nested within fiber_defer'
  end
  return if block.nil?

  scope_guard = make_guard_proc(thread_var_guard)
  worker_fiber = Fiber.new do
    scope_guard.call
    begin
      # Publish the guard so nested fiber_defer calls can pick it up.
      Thread.current[:fiber_defer_guard_proc] = scope_guard
      block.call
    ensure
      # Always clear the marker so later with_fiber_defer calls are not
      # mistaken for nested ones.
      Thread.current[:fiber_defer_guard_proc] = nil
    end
  end
  worker_fiber.resume
end

Private Instance Methods

basic_fiber_defer(&action) click to toggle source
# File lib/abstractivator/fiber_defer.rb, line 94
# Hands the block to EM.defer, suspends the current fiber, and resumes it with
# the block's outcome once the deferred work has completed.
#
# action - the block to run via EM.defer.
#
# Returns the block's return value.
# Raises RuntimeError when called from ROOT_FIBER; re-raises, in the calling
# fiber, any non-signal exception the block raised.
def basic_fiber_defer(&action)
  waiting_fiber = Fiber.current
  if waiting_fiber == ROOT_FIBER
    raise 'fiber_defer must be called within a with_fiber_defer block'
  end

  # Wraps the action so its outcome is always a [value, exception] pair.
  capture_outcome = proc do
    begin
      [action.call, nil]
    rescue SignalException
      # The thread is being told to shut down, so let that happen instead of
      # forwarding the exception to another thread/fiber. Whatever is stopping
      # the thread is assumed to deal with the suspended fiber as well.
      raise
    rescue Exception => raised
      [nil, raised]
    end
  end

  # Wakes the suspended fiber, handing the outcome pair to Fiber.yield below.
  wake_fiber = proc { |value, failure| waiting_fiber.resume([value, failure]) }

  EM.defer(capture_outcome, wake_fiber)
  value, failure = Fiber.yield
  raise failure if failure
  value
end
make_guard_proc(x) click to toggle source
# File lib/abstractivator/fiber_defer.rb, line 77
# Normalizes a thread-variable guard specification into a callable proc.
#
# x - one of:
#     nil  - produces a proc that does nothing;
#     Proc - returned unchanged;
#     Hash - each key is a value and each entry a setter; the resulting proc
#            calls setter.call(value) for every pair, in hash order.
#
# Returns a Proc.
# Raises RuntimeError for any other kind of argument.
def make_guard_proc(x)
  case x
  when nil then proc { }
  when Proc then x
  when Hash
    proc { x.each { |captured, restore| restore.call(captured) } }
  else
    raise "Cannot turn #{x.inspect} into a guard proc"
  end
end