class RxRuby::VirtualTimeScheduler
Base class for virtual time schedulers using a priority queue for scheduled items.
Attributes
Public Class Methods
# File lib/rx_ruby/concurrency/virtual_time_scheduler.rb, line 16
# Creates a virtual time scheduler.
#
# initial_clock - starting value of the virtual clock; it is coerced with
#                 #to_i before being stored.
#
# The scheduler starts with an empty PriorityQueue and in the disabled
# (not running) state.
def initialize(initial_clock)
  starting_time = initial_clock.to_i
  @clock = starting_time
  @queue = PriorityQueue.new
  @enabled = false
end
Public Instance Methods
Advances the scheduler’s clock by the specified relative time, running all work scheduled for that timespan.
# File lib/rx_ruby/concurrency/virtual_time_scheduler.rb, line 130
# Moves the clock forward by +time+ (relative to the current clock) by
# delegating to #advance_to with the computed absolute target.
#
# Raises 'Time is out of range' when the target lies before the current
# clock, and 'Cannot advance while running' when the scheduler is enabled.
# A zero-length advance returns immediately without doing anything.
def advance_by(time)
  target = @clock + time
  comparison = target <=> clock

  raise 'Time is out of range' if comparison < 0
  return if comparison.zero?
  raise 'Cannot advance while running' if @enabled

  advance_to(target)
end
Advances the scheduler’s clock to the specified time, running all work till that point.
# File lib/rx_ruby/concurrency/virtual_time_scheduler.rb, line 102
# Advances the clock to the absolute +time+, invoking every queued item
# whose due_time is at or before +time+.
#
# Raises 'Time is out of range' when +time+ is before the current clock and
# 'Cannot advance while running' when the scheduler is already enabled.
# Returns nil without doing anything when +time+ equals the current clock;
# otherwise the clock is set to +time+ once the loop finishes.
def advance_to(time)
  due_to_clock = time <=> clock
  raise 'Time is out of range' if due_to_clock < 0
  return if due_to_clock == 0
  raise 'Cannot advance while running' if @enabled

  @enabled = true
  begin
    while @enabled
      next_item = get_next
      if !next_item.nil? && next_item.due_time <= time
        # The clock only ever moves forward; items that are already overdue
        # run at the current clock value.
        @clock = next_item.due_time if next_item.due_time > @clock
        next_item.invoke
      else
        @enabled = false
      end
    end
  ensure
    # If an item raises, the scheduler must not stay flagged as running;
    # otherwise every later advance would fail with 'Cannot advance while
    # running' and #start would silently do nothing.
    @enabled = false
  end

  @clock = time
end
Gets whether the scheduler is enabled to run work.
# File lib/rx_ruby/concurrency/virtual_time_scheduler.rb, line 28
# Reports the current value of the scheduler's enabled flag, which #start
# and #advance_to set while they are running work.
def enabled?
  @enabled
end
Gets the next scheduled item to be executed, discarding any cancelled items found at the head of the queue. Returns nil when no item remains.
# File lib/rx_ruby/concurrency/virtual_time_scheduler.rb, line 153
# Returns the item at the head of the queue without removing it, first
# shifting off any cancelled items found there. Returns nil once the queue
# has nothing left to offer.
def get_next
  while (head = @queue.peek)
    return head unless head.cancelled?

    # Cancelled work is dropped so the loop can inspect the next entry.
    @queue.shift
  end

  nil
end
# File lib/rx_ruby/concurrency/virtual_time_scheduler.rb, line 165
# Adapter used by the stateless schedule_* methods, which pass the action
# itself as the scheduled state.
#
# scheduler - accepted for signature compatibility; not used here.
# action    - callable invoked with no arguments.
#
# Returns Subscription.empty.
def invoke(scheduler, action)
  action.call

  Subscription.empty
end
Gets the scheduler’s notion of current time.
# File lib/rx_ruby/concurrency/virtual_time_scheduler.rb, line 23
# Returns the scheduler's current virtual time, as reported by +clock+.
def now
  clock
end
Schedules an action to be executed at due_time.
# File lib/rx_ruby/concurrency/virtual_time_scheduler.rb, line 78
# Schedules a stateless +action+ at the absolute time +due_time+.
#
# The action is passed along as the scheduled state and #invoke is used as
# the callback, so the action ends up being called with no arguments.
#
# Raises 'action cannot be nil' when +action+ is nil or false.
# Returns whatever #schedule_at_absolute_with_state returns.
def schedule_at_absolute(due_time, action)
  raise 'action cannot be nil' unless action

  invoker = method(:invoke)
  schedule_at_absolute_with_state(action, due_time, invoker)
end
Schedules an action to be executed at due_time.
# File lib/rx_ruby/concurrency/virtual_time_scheduler.rb, line 85
# Queues +action+ to run at the absolute time +due_time+ with +state+.
#
# A ScheduledItem wrapping the work is pushed onto the queue. When the item
# runs it first deletes itself from the queue and then calls +action+ with
# the scheduler and state it is given.
#
# Raises 'action cannot be nil' when +action+ is nil or false.
# Returns a Subscription created with a block that cancels the item.
def schedule_at_absolute_with_state(state, due_time, action)
  raise 'action cannot be nil' unless action

  # Declared before the lambda so the lambda can refer to the item that is
  # only created afterwards.
  item = nil
  runner = lambda do |scheduler, item_state|
    @queue.delete item
    action.call(scheduler, item_state)
  end

  item = ScheduledItem.new(self, state, due_time, &runner)
  @queue.push item

  Subscription.create { item.cancel }
end
Schedules an action to be executed at due_time, measured relative to the scheduler’s current clock.
# File lib/rx_ruby/concurrency/virtual_time_scheduler.rb, line 63
# Schedules a stateless +action+ to run +due_time+ after the current clock.
#
# The action is passed along as the scheduled state and #invoke is used as
# the callback, so the action ends up being called with no arguments.
#
# Raises 'action cannot be nil' when +action+ is nil or false.
# Returns whatever #schedule_at_relative_with_state returns.
def schedule_at_relative(due_time, action)
  raise 'action cannot be nil' unless action

  invoker = method(:invoke)
  schedule_at_relative_with_state(action, due_time, invoker)
end
Schedules an action, with state, to be executed at due_time, measured relative to the scheduler’s current clock.
# File lib/rx_ruby/concurrency/virtual_time_scheduler.rb, line 70
# Schedules +action+ with +state+ to run +due_time+ after the current clock.
#
# The relative offset is added to @clock and the result is handed to
# #schedule_at_absolute_with_state.
#
# Raises 'action cannot be nil' when +action+ is nil or false.
# Returns whatever #schedule_at_absolute_with_state returns.
def schedule_at_relative_with_state(state, due_time, action)
  raise 'action cannot be nil' unless action

  absolute_due = @clock + due_time
  schedule_at_absolute_with_state(state, absolute_due, action)
end
Schedules an action, with state, to be executed at the scheduler’s current clock time.
# File lib/rx_ruby/concurrency/virtual_time_scheduler.rb, line 57
# Schedules +action+ with +state+ at the scheduler's current clock value.
#
# Raises 'action cannot be nil' when +action+ is nil or false.
# Returns whatever #schedule_at_absolute_with_state returns.
def schedule_with_state(state, action)
  raise 'action cannot be nil' unless action

  due_now = clock
  schedule_at_absolute_with_state(state, due_now, action)
end
Advances the scheduler’s clock by the specified relative time without running any scheduled work.
# File lib/rx_ruby/concurrency/virtual_time_scheduler.rb, line 143
# Moves the clock forward by +time+ without running any queued work.
#
# Raises 'Time is out of range' when the resulting time would be earlier
# than the current clock. Returns the new clock value.
def sleep(time)
  wake_at = @clock + time
  comparison = wake_at <=> @clock
  raise 'Time is out of range' if comparison < 0

  @clock = wake_at
end
Starts the virtual time scheduler.
# File lib/rx_ruby/concurrency/virtual_time_scheduler.rb, line 33
# Runs queued items in order until the queue has no runnable item left or
# the enabled flag is cleared (for example by #stop) while an item runs.
#
# Does nothing when the scheduler is already enabled. Before each item is
# invoked the clock is moved forward to the item's due_time if that is later
# than the current clock. Returns nil.
def start
  return if @enabled

  @enabled = true
  begin
    while @enabled
      next_item = get_next
      if next_item.nil?
        @enabled = false
      else
        # The clock only ever moves forward; overdue items run at the
        # current clock value.
        @clock = next_item.due_time if next_item.due_time > @clock
        next_item.invoke
      end
    end
  ensure
    # If an item raises, the scheduler must not stay flagged as running;
    # otherwise later calls to #start would silently do nothing.
    @enabled = false
  end
end
Stops the virtual time scheduler.
# File lib/rx_ruby/concurrency/virtual_time_scheduler.rb, line 52
# Clears the enabled flag, which ends the run loops in #start and
# #advance_to after the item currently being invoked returns.
def stop
  @enabled = false
end