class TrRMIte::Producer
Public Class Methods
new(queue_uri = nil)
click to toggle source
# File lib/TrRMIte/producer.rb, line 4
#
# Builds a producer bound to a remote queue object reached over DRb.
#
# @param queue_uri [String, nil] URI of the remote queue; when nil (or false)
#   TrRMIte::DEFAULT_QUEUE_URI is used instead.
def initialize(queue_uri = nil)
  # Ensure DRb is running for this producer process. DRb.start_service always
  # creates a brand-new server and makes it the primary one, so it is only
  # called when no live primary server exists; otherwise every additional
  # producer in the same process would spawn another server.
  DRb.start_service unless DRb.primary_server&.alive?

  queue_uri ||= TrRMIte::DEFAULT_QUEUE_URI
  @queue = DRbObject.new_with_uri(queue_uri)
end
Public Instance Methods
publish(event)
click to toggle source
# File lib/TrRMIte/producer.rb, line 10
#
# Serializes +event+ with #to_json and pushes the resulting string onto the
# queue held in @queue.
#
# @param event [#to_json] the event to publish
# @return [Object] whatever @queue.push returns
def publish(event)
  @queue.push(event.to_json)
end
publish_scheduled(event)
click to toggle source
# File lib/TrRMIte/producer.rb, line 28
#
# Publishes the event stored under +event[:scheduled_event]+ and then
# publishes an internal 'ScheduledEventPublished' event recording that it
# happened.
#
# @param event [Object] must respond to #[] (read with :scheduled_event) and
#   to #uuid
# @return [Object] the result of the second #publish call
def publish_scheduled(event)
  scheduled_event = Event.from(event[:scheduled_event])
  publish(scheduled_event)

  publish(
    Event.new(
      topic: TrRMIte::INTERNAL_TOPIC,
      type: 'ScheduledEventPublished',
      payload: {
        published_at: TrRMIte.wall_clock_time,
      },
      causation_id: event.uuid,
      # NOTE(review): the id is read via #[] here but via #uuid on +event+
      # above -- confirm both accessors return the same value on Event.
      correlation_id: scheduled_event[:uuid],
    )
  )
end
schedule(event, publish_at = 0)
click to toggle source
# File lib/TrRMIte/producer.rb, line 14
#
# Publishes an internal 'FutureEventScheduled' event whose payload carries
# +event+ (converted with #to_h), the current TrRMIte.wall_clock_time and the
# requested +publish_at+ value.
#
# @param event [#to_h] the event to be published later
# @param publish_at [Object] stored verbatim in the payload; defaults to 0
#   (presumably a timestamp comparable to TrRMIte.wall_clock_time -- TODO
#   confirm against the consumer)
# @return [Object] the result of #publish
def schedule(event, publish_at = 0)
  publish(
    Event.new(
      topic: TrRMIte::INTERNAL_TOPIC,
      type: 'FutureEventScheduled',
      payload: {
        scheduled_event: event.to_h,
        scheduled_at: TrRMIte.wall_clock_time,
        publish_at: publish_at,
      },
    )
  )
end