class Shamu::Events::ActiveRecord::Service

Store events in a database using ActiveRecord persistence layer.

## Runner IDs

A globally unique id (may be a UUID or a well-defined internal convention that guarantees uniqueness). The runner id is used by the system to track which messages have been delivered to the subscribers hosted by that runner process. This allows dispatching to resume should the host or process die.

Attributes

channels[R]
mutex[R]

Public Class Methods

ensure_records!() click to toggle source

Ensure that the tables are present in the database and have been initialized.

@return [void]

# File lib/shamu/events/active_record/service.rb, line 23
# Run the `Migration` upwards the first time this is called. Later calls do
# nothing because the class-level `@ensure_records` flag is already set.
#
# @return [void]
def self.ensure_records!
  unless @ensure_records
    # The flag is raised before migrating, so the migration is attempted at
    # most once per process even if it raises.
    @ensure_records = true
    Migration.new.migrate( :up )
  end
end
new() click to toggle source
Calls superclass method
# File lib/shamu/events/active_record/service.rb, line 30
# Make sure the backing tables exist, then set up the in-memory channel
# registry and the mutex that guards it before handing off to the superclass.
def initialize
  self.class.ensure_records!

  # `||=` keeps any value already assigned to these ivars.
  @mutex    ||= Mutex.new
  @channels ||= {}

  super
end

Public Instance Methods

channel_stats( name, runner_id: nil ) click to toggle source

(see ChannelStats#channel_stats) @param [String] runner_id if provided, only show stats for the given runner.

# File lib/shamu/events/active_record/service.rb, line 78
# (see ChannelStats#channel_stats)
#
# @param [String] name of the channel to report on.
# @param [String] runner_id if provided, only count the messages that the
#   given runner has not processed yet on this channel.
# @return [Hash] with `:name`, `:subscribers_count`, `:dispatching` and
#   `:queue_size` entries.
def channel_stats( name, runner_id: nil )
  channel = fetch_channel( name )
  queue   = Message.where( channel_id: channel[:id] )

  if runner_id
    # Bookmarks are kept per runner *and* channel under the composite key
    # that #dispatch builds ("<runner_id>::<channel name>"). Looking up the
    # bare runner id would never find the bookmark written while dispatching.
    runner = create_runner( "#{ runner_id }::#{ name }" )

    if runner && runner.last_processed_id
      queue = queue.where( Message.arel_table[ :id ].gt( runner.last_processed_id ) )
    end
  end

  {
    name: name,
    subscribers_count: channel[:subscribers].size,
    dispatching: channel[:dispatching],
    queue_size: queue.count
  }
end
dispatch( runner_id, *names, limit: nil ) click to toggle source

Dispatch queued messages up to the given `limit`. Once all the messages are dispatched, the method returns. A long-running process might periodically call dispatch in a loop, trapping SIGINT to shut down.

@param [String] runner_id that identifies the host and process

responding to events.

@param [Array<String>] names of the channels to dispatch. If empty,

dispatches to all subscribed channels.

@param [Integer] limit the maximum number of messages to dispatch. If

not given, defaults to 100.

@return [Hash<String,Integer>] the number of messages actually

dispatched on each channel.
# File lib/shamu/events/active_record/service.rb, line 66
# Dispatch queued messages on the named channels to their subscribers.
#
# @param [String] runner_id that identifies the host and process responding
#   to events.
# @param [Array<String>] names of the channels to dispatch. If empty,
#   dispatches to all subscribed channels.
# @param [Integer] limit the maximum number of messages to dispatch per
#   channel. NOTE(review): nil is forwarded unchanged from here; confirm where
#   any default limit is applied.
# @return [Hash<String,Integer>] the number of messages actually dispatched
#   on each channel.
# @raise [UnknownRunnerError] if `runner_id` is blank.
def dispatch( runner_id, *names, limit: nil )
  fail UnknownRunnerError unless runner_id.present?

  # Fall back to every known channel when the caller did not name any. The
  # check must be on `names`, not on `channels`.
  names = channels.keys if names.empty?

  names.each_with_object( {} ) do |name, dispatched|
    state = fetch_channel( name )
    # Bookmarks are tracked per runner and channel, hence the composite key.
    dispatched[name] = dispatch_channel( state, "#{ runner_id }::#{ name }", limit )
  end
end
publish( channel, message ) click to toggle source

(see EventsService#publish)

# File lib/shamu/events/active_record/service.rb, line 39
# (see EventsService#publish)
#
# Resolves the channel's id via `fetch_channel` and stores the serialized
# message as a new `Message` row for it.
def publish( channel, message )
  Message.create!(
    channel_id: fetch_channel( channel )[:id],
    message: serialize( message )
  )
end
subscribe( channel, &callback ) click to toggle source

(see EventsService#subscribe)

# File lib/shamu/events/active_record/service.rb, line 45
# (see EventsService#subscribe)
#
# Appends the callback to the channel's subscriber list while holding the
# service mutex.
def subscribe( channel, &callback )
  # The channel is resolved before the lock is taken, as in the original flow.
  subscribers = fetch_channel( channel )[:subscribers]
  mutex.synchronize { subscribers.push( callback ) }
end

Private Instance Methods

bookmark_runner( runner_id, last_message ) click to toggle source
# File lib/shamu/events/active_record/service.rb, line 141
# Record the last message a runner has processed so a later dispatch can
# resume after it. Does nothing when no message was processed.
#
# @param [String] runner_id key of the runner record to update.
# @param [Message, nil] last_message the final message that was handled.
def bookmark_runner( runner_id, last_message )
  if last_message
    create_runner( runner_id ).update_attributes(
      last_processed_id: last_message.id,
      last_processed_at: Time.now.utc
    )
  end
end
create_channel( name ) click to toggle source
# File lib/shamu/events/active_record/service.rb, line 101
# Build the in-memory state for a channel: the id of its persisted record and
# an empty list of subscriber callbacks.
#
# @param [String] name of the channel.
# @return [Hash] with `:id` and `:subscribers` entries.
def create_channel( name )
  record = create_named_channel( name )

  { id: record.id, subscribers: [] }
end
create_named_channel( name ) click to toggle source
# File lib/shamu/events/active_record/service.rb, line 166
# Find the persisted channel with the given name, creating it if missing.
#
# @param [String] name of the channel.
# @return [Channel] the existing or newly created record.
def create_named_channel( name )
  Channel.transaction( requires_new: true ) do
    # The lookup must be scoped by name. An unscoped `first_or_create!`
    # returns whichever channel row comes first, regardless of its name.
    Channel.find_or_create_by!( name: name )
  end
end
create_runner( runner_id ) click to toggle source
# File lib/shamu/events/active_record/service.rb, line 160
# Find the runner record with the given id, creating it if missing.
#
# @param [String] runner_id key of the runner record.
# @return [Runner] the existing or newly created record.
def create_runner( runner_id )
  Runner.transaction( requires_new: true ) do
    # The lookup must be scoped by id. An unscoped `first_or_create!` returns
    # whichever runner row comes first, so every runner would share one
    # bookmark.
    Runner.find_or_create_by!( id: runner_id )
  end
end
dispatch_channel( state, runner_id, limit ) click to toggle source
# File lib/shamu/events/active_record/service.rb, line 108
# Dispatch pending messages for one channel unless a dispatch for that
# channel is already in progress.
#
# @param [Hash] state in-memory channel state; its `:dispatching` entry is
#   used as the in-progress flag.
# @param [String] runner_id key of the runner record used for bookmarking.
# @param [Integer, nil] limit forwarded to `dispatch_messages`.
# @return [Integer, nil] the result of `dispatch_messages`, or nil when the
#   channel was already being dispatched.
def dispatch_channel( state, runner_id, limit )
  claimed = mutex.synchronize do
    next false if state[ :dispatching ]
    state[ :dispatching ] = true
  end

  # Someone else owns the flag: leave it alone. Clearing it here would let a
  # third caller start a concurrent dispatch on the same channel.
  return unless claimed

  begin
    dispatch_messages( state, runner_id, limit )
  ensure
    # Only the caller that set the flag releases it, even if dispatch raised.
    mutex.synchronize { state[ :dispatching ] = false }
  end
end
dispatch_messages( state, runner_id, limit ) click to toggle source
# File lib/shamu/events/active_record/service.rb, line 121
# Deliver each pending message to every subscriber of the channel, then
# bookmark the runner at the last message handled.
#
# @param [Hash] state in-memory channel state holding `:subscribers`.
# @param [String] runner_id key of the runner record used for bookmarking.
# @param [Integer, nil] limit forwarded to `pending_messages`.
# @return [Integer] the number of messages delivered.
def dispatch_messages( state, runner_id, limit )
  delivered   = 0
  last_record = nil

  pending_messages( state, runner_id, limit ).each do |record|
    payload = deserialize( record.message )
    state[ :subscribers ].each { |subscriber| subscriber.call( payload ) }

    last_record = record
    delivered  += 1
  end

  # Reached only when every message was delivered without raising.
  bookmark_runner( runner_id, last_record )

  delivered
end
pending_messages( state, runner_id, limit ) click to toggle source
# File lib/shamu/events/active_record/service.rb, line 148
# Build the query for the channel's messages that the runner has not
# processed yet, oldest first.
#
# @param [Hash] state in-memory channel state holding the channel `:id`.
# @param [String] runner_id key of the runner record holding the bookmark.
# @param [Integer, nil] limit maximum number of messages to return.
# @return [ActiveRecord::Relation] the pending messages.
def pending_messages( state, runner_id, limit )
  # Order by id explicitly: the bookmark is "last id processed", so an
  # unordered, limited result could move the bookmark past rows that were
  # never delivered.
  messages = Message.where( channel_id: state[:id] )
                    .order( :id )
                    .limit( limit )
  runner   = create_runner( runner_id )

  if runner.last_processed_id
    messages = messages.where( Message.arel_table[:id].gt( runner.last_processed_id ) )
  end

  messages
end