module RxRuby::Scheduler
Module for scheduling actions
Public Class Methods
Normalizes the specified time span to a non-negative value: negative values become 0, all other values are returned unchanged.
# File lib/rx_ruby/concurrency/scheduler.rb, line 78
# Clamps a time span so that it is never negative.
#
# time_span - a value comparable with 0 via `<`.
#
# Returns 0 when time_span is below zero; otherwise returns time_span itself.
def self.normalize(time_span)
  return 0 if time_span < 0

  time_span
end
Gets the current time according to the local machine’s system clock.
# File lib/rx_ruby/concurrency/scheduler.rb, line 11
# Reports the scheduler's notion of "now".
#
# Returns the value of Time.now at the moment of the call.
def self.now
  Time.now
end
Public Instance Methods
Schedules an action to be executed.
# File lib/rx_ruby/concurrency/scheduler.rb, line 16
# Schedules +action+ for execution.
#
# The action itself is handed to schedule_with_state as the state, and the
# private #invoke method is used as the callback that runs it.
#
# action - a callable; must be truthy.
#
# Raises RuntimeError ('action cannot be nil') when action is nil or false.
# Returns whatever schedule_with_state returns.
def schedule(action)
  raise 'action cannot be nil' unless action

  runner = method(:invoke)
  schedule_with_state(action, runner)
end
Schedules an action to be executed at the specified absolute due time.
# File lib/rx_ruby/concurrency/scheduler.rb, line 28
# Schedules +action+ for execution at the absolute time +due_time+.
#
# The action is passed to schedule_absolute_with_state as the state, with the
# private #invoke method as the callback that runs it.
#
# due_time - forwarded untouched to schedule_absolute_with_state.
# action   - a callable; must be truthy.
#
# Raises RuntimeError ('action cannot be nil') when action is nil or false.
# Returns whatever schedule_absolute_with_state returns.
def schedule_absolute(due_time, action)
  raise 'action cannot be nil' unless action

  runner = method(:invoke)
  schedule_absolute_with_state(action, due_time, runner)
end
Schedules an action to be executed recursively.
# File lib/rx_ruby/concurrency/scheduler.rb, line 34
# Schedules +action+ so that it can re-schedule itself.
#
# action - a callable taking one argument: a zero-argument lambda that, when
#          called, requests another run of the same action. Must be truthy.
#
# Raises RuntimeError ('action cannot be nil') when action is nil or false.
# Returns whatever schedule_recursive_with_state returns.
def schedule_recursive(action)
  raise 'action cannot be nil' unless action

  # The action travels as the "state"; the adapter turns the stateful
  # two-argument protocol into the single-argument one the caller's action sees.
  adapter = lambda do |work, reschedule|
    work.call(-> { reschedule.call(work) })
  end
  schedule_recursive_with_state(action, adapter)
end
Schedules an action to be executed recursively at a specified absolute due time.
# File lib/rx_ruby/concurrency/scheduler.rb, line 62
# Schedules +action+ at the absolute time +due_time+ and lets it re-schedule
# itself for later absolute times.
#
# due_time - forwarded untouched to schedule_recursive_absolute_with_state.
# action   - a callable taking one argument: a one-argument lambda that, when
#            called with a new due time, requests another run of the same
#            action. Must be truthy.
#
# Raises RuntimeError ('action cannot be nil') when action is nil or false.
# Returns whatever schedule_recursive_absolute_with_state returns.
def schedule_recursive_absolute(due_time, action)
  raise 'action cannot be nil' unless action

  # The action travels as the "state"; the adapter hides that from the caller.
  adapter = lambda do |work, reschedule|
    work.call(->(next_due) { reschedule.call(work, next_due) })
  end
  schedule_recursive_absolute_with_state(action, due_time, adapter)
end
Schedules an action, together with a state value passed to it, to be executed recursively at a specified absolute due time.
# File lib/rx_ruby/concurrency/scheduler.rb, line 68
# Schedules a stateful, self-rescheduling +action+ at the absolute time
# +due_time+.
#
# state    - initial value handed to the action.
# due_time - forwarded untouched to schedule_absolute_with_state.
# action   - a callable invoked by invoke_recursive_time with the current state
#            and a two-argument (state, due_time) rescheduling lambda.
#            Must be truthy.
#
# Raises RuntimeError ('action cannot be nil') when action is nil or false.
# Returns whatever schedule_absolute_with_state returns.
def schedule_recursive_absolute_with_state(state, due_time, action)
  raise 'action cannot be nil' unless action

  bundle = { :state => state, :action => action }
  # Follow-up runs are scheduled through the same absolute-time entry point.
  runner = lambda do |sched, pair|
    invoke_recursive_time(sched, pair, 'schedule_absolute_with_state')
  end
  schedule_absolute_with_state(bundle, due_time, runner)
end
Schedules an action to be executed recursively after a specified relative due time.
# File lib/rx_ruby/concurrency/scheduler.rb, line 46
# Schedules +action+ after the relative delay +due_time+ and lets it
# re-schedule itself with further relative delays.
#
# due_time - forwarded untouched to schedule_recursive_relative_with_state.
# action   - a callable taking one argument: a one-argument lambda that, when
#            called with a new due time, requests another run of the same
#            action. Must be truthy.
#
# Raises RuntimeError ('action cannot be nil') when action is nil or false.
# Returns whatever schedule_recursive_relative_with_state returns.
def schedule_recursive_relative(due_time, action)
  raise 'action cannot be nil' unless action

  # The action travels as the "state"; the adapter hides that from the caller.
  adapter = lambda do |work, reschedule|
    work.call(->(next_due) { reschedule.call(work, next_due) })
  end
  schedule_recursive_relative_with_state(action, due_time, adapter)
end
Schedules an action to be executed recursively after a specified relative due time.
# File lib/rx_ruby/concurrency/scheduler.rb, line 52
# Schedules a stateful, self-rescheduling +action+ after the relative delay
# +due_time+.
#
# state    - initial value handed to the action.
# due_time - forwarded untouched to schedule_relative_with_state.
# action   - a callable invoked by invoke_recursive_time with the current state
#            and a two-argument (state, due_time) rescheduling lambda.
#            Must be truthy.
#
# Raises RuntimeError ('action cannot be nil') when action is nil or false.
# Returns whatever schedule_relative_with_state returns.
def schedule_recursive_relative_with_state(state, due_time, action)
  raise 'action cannot be nil' unless action

  bundle = { :state => state, :action => action }
  # Follow-up runs are scheduled through the same relative-time entry point.
  runner = lambda do |sched, pair|
    invoke_recursive_time(sched, pair, 'schedule_relative_with_state')
  end
  schedule_relative_with_state(bundle, due_time, runner)
end
Schedules an action to be executed recursively.
# File lib/rx_ruby/concurrency/scheduler.rb, line 40
# Schedules a stateful, self-rescheduling +action+.
#
# state  - initial value handed to the action.
# action - a callable invoked by invoke_recursive with the current state and a
#          one-argument rescheduling lambda. Must be truthy.
#
# Raises RuntimeError ('action cannot be nil') when action is nil or false.
# Returns whatever schedule_with_state returns.
def schedule_recursive_with_state(state, action)
  raise 'action cannot be nil' unless action

  bundle = { :state => state, :action => action }
  schedule_with_state(bundle, method(:invoke_recursive))
end
Schedules an action to be executed after the specified relative due time.
# File lib/rx_ruby/concurrency/scheduler.rb, line 22
# Schedules +action+ for execution after the relative delay +due_time+.
#
# The action is passed to schedule_relative_with_state as the state, with the
# private #invoke method as the callback that runs it.
#
# due_time - forwarded untouched to schedule_relative_with_state.
# action   - a callable; must be truthy.
#
# Raises RuntimeError ('action cannot be nil') when action is nil or false.
# Returns whatever schedule_relative_with_state returns.
def schedule_relative(due_time, action)
  raise 'action cannot be nil' unless action

  runner = method(:invoke)
  schedule_relative_with_state(action, due_time, runner)
end
Private Instance Methods
# File lib/rx_ruby/concurrency/scheduler.rb, line 84
# Callback used by the non-recursive schedule methods: runs the action that
# was carried along as the scheduled state.
#
# scheduler - unused here; present so the signature matches the
#             (scheduler, state) callback shape.
# action    - zero-argument callable to run.
#
# Returns Subscription.empty.
def invoke(scheduler, action)
  action.call
  Subscription.empty
end
# File lib/rx_ruby/concurrency/scheduler.rb, line 89
# Callback behind schedule_recursive_with_state: runs the bundled action and
# gives it a lambda through which it can schedule further runs of itself.
#
# scheduler - object responding to schedule_with_state(state, action).
# pair      - hash with :state (initial state) and :action (callable taking
#             the current state and a one-argument rescheduling lambda).
#
# Returns the CompositeSubscription that collects the subscriptions of
# follow-up runs that have been scheduled but have not started yet.
def invoke_recursive(scheduler, pair)
  pending = CompositeSubscription.new
  lock = Mutex.new
  initial_state = pair[:state]
  body = pair[:action]

  step = ->(current_state) do
    reschedule = ->(next_state) do
      # The scheduled work may start before schedule_with_state has returned,
      # so both sides record under the lock how far they got.
      tracked = false
      finished = false
      handle = nil
      handle = scheduler.schedule_with_state(next_state, ->(_, fired_state) do
        lock.synchronize do
          if tracked
            pending.delete(handle)
          else
            finished = true
          end
        end
        step.call(fired_state)
        Subscription.empty
      end)

      lock.synchronize do
        # Work that has already started must not be added after the fact.
        next if finished

        pending.push(handle)
        tracked = true
      end
    end

    body.call(current_state, reschedule)
  end

  step.call(initial_state)
  pending
end
# File lib/rx_ruby/concurrency/scheduler.rb, line 126
# Callback behind the timed recursive schedule methods: runs the bundled
# action and gives it a lambda through which it can schedule further runs of
# itself at a given due time.
#
# scheduler - object responding to the scheduling method named by +method+.
# pair      - hash with :state (initial state) and :action (callable taking
#             the current state and a two-argument (state, due_time)
#             rescheduling lambda).
# method    - name of the scheduler method used for follow-up runs; it is
#             invoked via #send as (state, due_time, action).
#
# Returns the CompositeSubscription that holds the subscriptions of follow-up
# runs that have been scheduled but have not started yet.
def invoke_recursive_time(scheduler, pair, method)
  group = CompositeSubscription.new
  gate = Mutex.new
  state = pair[:state]
  action = pair[:action]

  recursive_action = ->(state1) do
    internal_action = ->(state2, due_time1) do
      # Same bookkeeping as invoke_recursive: the scheduled work may start
      # before the scheduler call has returned, so both sides record under
      # the lock how far they got.
      is_added = false
      is_done = false
      d = nil
      d = scheduler.send(method, state2, due_time1, lambda do |_, state3|
        gate.synchronize do
          if is_added
            group.delete(d)
          else
            is_done = true
          end
        end
        recursive_action.call(state3)
        Subscription.empty
      end)

      # Register the pending run so the returned group actually tracks it;
      # skip registration when the run has already started.
      gate.synchronize do
        unless is_done
          group.push(d)
          is_added = true
        end
      end

      # Keep handing the scheduler's result back to the caller, as before.
      d
    end

    action.call(state1, internal_action)
  end

  recursive_action.call(state)
  group
end