class TrRMIte::Consumers::Scheduler

Public Class Methods

default_name()
# File lib/TrRMIte/consumers/scheduler.rb, line 8
# Returns the fixed default name string for this consumer class.
#
# @return [String] the literal 'EVENT_SCHEDULER'
def self.default_name
  'EVENT_SCHEDULER'
end
new(topic, registry_uri, queue_uri, interval, state_store)
Calls superclass method TrRMIte::Consumer::new
# File lib/TrRMIte/consumers/scheduler.rb, line 12
# Builds a scheduler consumer with an empty list of scheduled events.
#
# NOTE(review): `default_name` is defined as a class method in this file; the
# bare call below only resolves if an instance-level `default_name` is also
# available (e.g. from the superclass) — confirm.
#
# @param topic [Object] forwarded unchanged to the superclass constructor
# @param registry_uri [Object] forwarded unchanged to the superclass constructor
# @param queue_uri [Object] handed to Producer.new when #run starts
# @param interval [Numeric] argument given to `sleep` between publishing passes in #run
# @param state_store [String] path read by #restore_state and written by #write_state_store
def initialize(topic, registry_uri, queue_uri, interval, state_store)
  super(default_name, topic, registry_uri)

  # Collaborators and settings supplied by the caller.
  @state_store = state_store
  @interval = interval
  @queue_uri = queue_uri

  # Mutable state: @semaphore is the lock #run and #restore_state take
  # around their accesses to @scheduled_events.
  @semaphore = Mutex.new
  @scheduled_events = []
end

Public Instance Methods

run()
Calls superclass method TrRMIte::Consumer#run
# File lib/TrRMIte/consumers/scheduler.rb, line 24
# Starts the background publisher thread, runs the superclass's #run, and
# stops the publisher thread once the superclass returns or raises.
#
# The publisher thread builds a Producer for @queue_uri and then, in a loop,
# calls #publish_overdue_events while holding @semaphore and sleeps for
# @interval between passes.
#
# @return [Thread] the terminated publisher thread
# @raise whatever the superclass #run raises (re-raised after cleanup)
def run
  @scheduler = Thread.new do
    Thread.current.name = 'TRMT/publisher'
    producer = Producer.new(@queue_uri)
    loop do
      @semaphore.synchronize { publish_overdue_events(producer) }
      sleep(@interval)
    end
  end

  begin
    super
  ensure
    # Runs on the error path too, so a failing consumer never leaves the
    # publisher thread running. The kill is issued while holding the same
    # lock the publisher takes around publish_overdue_events, so it cannot
    # be issued while a publishing pass is in progress.
    @semaphore.synchronize { @scheduler.terminate }
  end

  @scheduler
end

Private Instance Methods

finalize()
# File lib/TrRMIte/consumers/scheduler.rb, line 72
# Intentionally does nothing: this class writes its state store from
# #persist_state and #publish_overdue_events, so there is nothing left to
# flush here.
# NOTE(review): presumably a hook invoked by the superclass — confirm against
# TrRMIte::Consumer.
def finalize
  # we already persisted the state... nothing to do!
end
persist_state(_event)
# File lib/TrRMIte/consumers/scheduler.rb, line 64
# Writes the current list of scheduled events to the state store; the event
# argument is ignored.
# NOTE(review): unlike the publishing pass started from #run, this method does
# not take @semaphore itself — confirm the caller serializes access to
# @scheduled_events.
#
# @param _event [Object] unused
def persist_state(_event)
  write_state_store
end
publish_overdue_events(producer)
# File lib/TrRMIte/consumers/scheduler.rb, line 76
# Publishes every scheduled event whose :publish_at is at or before the
# current TrRMIte.wall_clock_time. Each event is removed from
# @scheduled_events right after it has been handed to the producer, and the
# state store is rewritten only if at least one event was published.
#
# Does no locking of its own; #run calls it while holding @semaphore.
#
# @param producer [#publish_scheduled] receives each overdue event
def publish_overdue_events(producer)
  now = TrRMIte.wall_clock_time
  due = @scheduled_events.select { |scheduled| scheduled[:publish_at] <= now }

  due.each do |scheduled|
    producer.publish_scheduled(scheduled)
    @scheduled_events.delete(scheduled)
  end

  write_state_store unless due.empty?
end
restore_state(_sequence)
# File lib/TrRMIte/consumers/scheduler.rb, line 51
# Loads previously persisted events from the state store file and appends
# them to @scheduled_events.
#
# Nothing happens when the file is missing, contains only whitespace, or
# holds JSON `null`. Otherwise each parsed entry (keys symbolized) is turned
# into an event via Event.from, and the list is extended while holding
# @semaphore.
#
# @param _sequence [Object] unused
# @return [Array, nil] the updated event list, or nil when nothing was restored
def restore_state(_sequence)
  return unless File.exist?(@state_store)

  raw = File.read(@state_store)
  return if raw.strip.empty?

  parsed = JSON.parse(raw, symbolize_names: true)
  return if parsed.nil?

  revived = parsed.map { |attributes| Event.from(attributes) }
  @semaphore.synchronize { @scheduled_events += revived }
end
write_state_store()
# File lib/TrRMIte/consumers/scheduler.rb, line 68
# Serializes @scheduled_events to JSON and overwrites the file at
# @state_store with it.
#
# @return [Integer] number of bytes written, as returned by File.write
def write_state_store
  # Serialize first so the file is only opened once the JSON exists.
  serialized = @scheduled_events.to_json
  File.write(@state_store, serialized)
end