class TrRMIte::Producer

Public Class Methods

new(queue_uri = nil) click to toggle source
# File lib/TrRMIte/producer.rb, line 4
def initialize(queue_uri = nil)
  DRb.start_service # ensure DRb is running for this producer process
  queue_uri ||= TrRMIte::DEFAULT_QUEUE_URI
  @queue = DRbObject.new_with_uri(queue_uri)
end

Public Instance Methods

publish(event) click to toggle source
# File lib/TrRMIte/producer.rb, line 10
def publish event
  @queue.push event.to_json
end
publish_scheduled(event) click to toggle source
# File lib/TrRMIte/producer.rb, line 28
def publish_scheduled event
  scheduled_event = Event.from(event[:scheduled_event])
  publish(scheduled_event)

  publish(
    Event.new(
      topic:   TrRMIte::INTERNAL_TOPIC,
      type:    'ScheduledEventPublished',
      payload: {
        published_at: TrRMIte.wall_clock_time,
      },
      causation_id:   event.uuid,
      correlation_id: scheduled_event[:uuid],
    )
  )
end
schedule(event, publish_at = 0) click to toggle source
# File lib/TrRMIte/producer.rb, line 14
def schedule event, publish_at = 0
  publish(
    Event.new(
      topic:   TrRMIte::INTERNAL_TOPIC,
      type:    'FutureEventScheduled',
      payload: {
        scheduled_event: event.to_h,
        scheduled_at:    TrRMIte.wall_clock_time,
        publish_at:      publish_at,
      },
    )
  )
end