module RxRuby::Scheduler

Module for scheduling actions

Public Class Methods

normalize(time_span) click to toggle source

Normalizes the specified TimeSpan value to a positive value.

# File lib/rx_ruby/concurrency/scheduler.rb, line 78
def self.normalize(time_span)
  time_span < 0 ? 0 : time_span
end
now() click to toggle source

Gets the current time according to the local machine’s system clock.

# File lib/rx_ruby/concurrency/scheduler.rb, line 11
def self.now
  Time.now
end

Public Instance Methods

schedule(action) click to toggle source

Schedules an action to be executed.

# File lib/rx_ruby/concurrency/scheduler.rb, line 16
def schedule(action)
  raise 'action cannot be nil' unless action
  schedule_with_state(action, method(:invoke))
end
schedule_absolute(due_time, action) click to toggle source

Schedules an action to be executed at the specified absolute due time.

# File lib/rx_ruby/concurrency/scheduler.rb, line 28
def schedule_absolute(due_time, action)
  raise 'action cannot be nil' unless action
  schedule_absolute_with_state(action, due_time, method(:invoke))
end
schedule_recursive(action) click to toggle source

Schedules an action to be executed recursively.

# File lib/rx_ruby/concurrency/scheduler.rb, line 34
def schedule_recursive(action)
  raise 'action cannot be nil' unless action
  schedule_recursive_with_state(action, lambda {|_action, _self| _action.call(lambda { _self.call(_action) }) })
end
schedule_recursive_absolute(due_time, action) click to toggle source

Schedules an action to be executed recursively after a specified absolute due time.

# File lib/rx_ruby/concurrency/scheduler.rb, line 62
def schedule_recursive_absolute(due_time, action)
  raise 'action cannot be nil' unless action
  schedule_recursive_absolute_with_state(action, due_time, lambda {|_action, _self| _action.call(lambda {|dt| _self.call(_action, dt) }) })
end
schedule_recursive_absolute_with_state(state, due_time, action) click to toggle source

Schedules an action to be executed recursively after a specified absolute due time.

# File lib/rx_ruby/concurrency/scheduler.rb, line 68
def schedule_recursive_absolute_with_state(state, due_time, action)
  raise 'action cannot be nil' unless action
  schedule_absolute_with_state(
    { :state => state, :action => action}, 
    due_time,
    lambda { |sched, pair| invoke_recursive_time(sched, pair, 'schedule_absolute_with_state') }
  )
end
schedule_recursive_relative(due_time, action) click to toggle source

Schedules an action to be executed recursively after a specified relative due time.

# File lib/rx_ruby/concurrency/scheduler.rb, line 46
def schedule_recursive_relative(due_time, action)
  raise 'action cannot be nil' unless action
  schedule_recursive_relative_with_state(action, due_time, lambda {|_action, _self| _action.call(lambda {|dt| _self.call(_action, dt) }) })
end
schedule_recursive_relative_with_state(state, due_time, action) click to toggle source

Schedules an action to be executed recursively after a specified relative due time.

# File lib/rx_ruby/concurrency/scheduler.rb, line 52
def schedule_recursive_relative_with_state(state, due_time, action)
  raise 'action cannot be nil' unless action
  schedule_relative_with_state(
    { :state => state, :action => action}, 
    due_time,
    lambda { |sched, pair| invoke_recursive_time(sched, pair, 'schedule_relative_with_state') }
  )
end
schedule_recursive_with_state(state, action) click to toggle source

Schedules an action to be executed recursively.

# File lib/rx_ruby/concurrency/scheduler.rb, line 40
def schedule_recursive_with_state(state, action)
  raise 'action cannot be nil' unless action
  schedule_with_state({ :state => state, :action => action}, method(:invoke_recursive))
end
schedule_relative(due_time, action) click to toggle source

Schedules an action to be executed after the specified relative due time.

# File lib/rx_ruby/concurrency/scheduler.rb, line 22
def schedule_relative(due_time, action)
  raise 'action cannot be nil' unless action
  schedule_relative_with_state(action, due_time, method(:invoke))
end

Private Instance Methods

invoke(scheduler, action) click to toggle source
# File lib/rx_ruby/concurrency/scheduler.rb, line 84
def invoke(scheduler, action)
  action.call()
  Subscription.empty
end
invoke_recursive(scheduler, pair) click to toggle source
# File lib/rx_ruby/concurrency/scheduler.rb, line 89
def invoke_recursive(scheduler, pair)
  group = CompositeSubscription.new
  gate = Mutex.new
  state = pair[:state]
  action = pair[:action]

  recursive_action = lambda {|state1|
    action.call(state1, lambda {|state2|  
      is_added = false
      is_done = false
      d = nil
      d = scheduler.schedule_with_state(state2, lambda do |_, state3|
        gate.synchronize do
          if is_added
            group.delete(d)
          else
            is_done = true
          end
        end

        recursive_action.call(state3)
        Subscription.empty
      end)

      gate.synchronize do
        unless is_done
          group.push(d)
          is_added = true
        end
      end
    })
  }

  recursive_action.call(state)
  group
end
invoke_recursive_time(scheduler, pair, method) click to toggle source
# File lib/rx_ruby/concurrency/scheduler.rb, line 126
def invoke_recursive_time(scheduler, pair, method)
  group = CompositeSubscription.new
  gate = Mutex.new
  state = pair[:state]
  action = pair[:action]

  recursive_action = ->(state1) do
    internal_action = ->(state2, due_time1) do

      d = scheduler.send(method, state2, due_time1, lambda do |_, state3|
        gate.synchronize do
          group.delete(d)
        end
        recursive_action.call(state3)
        Subscription.empty
      end)
    end
    action.call(state1, internal_action)
  end

  recursive_action.call(state)
  group            
end