class AsyncEmitter

The AsyncEmitter class provides a mechanism for asyncronous communication in Ruby programs. Each instantiation provides notification of events registered using any object that is valid as a Hash key. Multiple events can be registered for each key and listeners can be registered for one or many events. Listeners for a key event can be released.

Where more then one listener is registered for an event they are notified in the order they are recieved.

Example

emitter = AsyncEmitter.new
emitter.on :error, lambda { |e| puts "Error: #{e}" }
emitter.on :data, lambda { |data| puts "Data: #{data}" }

begin
    data = get_data_from_somewhere
    emitter.emit :data, data
rescue Exception => e
    emitter.emit :error, e
end

@author Greg Martin

Public Class Methods

new() click to toggle source
# File lib/async_emitter.rb, line 30
def initialize
        @emissions = {}
end

Public Instance Methods

emit(token, data) click to toggle source

Send notification of an event

@param token [Object] the Hash key representing the event @param data [Object] argument to be passed to the events procedure #####################################################################

# File lib/async_emitter.rb, line 87
def emit (token, data)
        @emissions[token][:semaphore] ||= Mutex.new
        @emissions[token][:cv] ||= ConditionVariable.new 
        @emissions[token][:data] ||= []

        @emissions[token][:semaphore].synchronize do
                @emissions[token][:data].push data
                @emissions[token][:cv].signal
        end

end
on(token, p, once_only=false) click to toggle source

Register for notification

@param token [Object] any valid Hash key representing the event @param p [Proc] a procedure to be called on notification @param once_only [Boolean] defualts to false, if true the notification

is removed after being fired once

######################################################################

# File lib/async_emitter.rb, line 42
def on (token, p, once_only=false)
        @emissions[token] ||= {}
        @emissions[token][:p] ||= []
        @emissions[token][:data] ||= []
        @emissions[token][:semaphore] ||= Mutex.new
        @emissions[token][:cv] ||= ConditionVariable.new 
        
        @emissions[token][:p].push Hash[:p => p, :o => once_only]

        @emissions[token][:thread] ||= Thread.new do
                
                @emissions[token][:active] = true
                
                while @emissions[token][:active]
                        @emissions[token][:semaphore].synchronize do
                                self.post_data_for token
                                @emissions[token][:cv].wait @emissions[token][:semaphore]
                                if @emissions[token][:active]
                                        self.post_data_for token
                                end
                        end
                end

        end

end
once(token, p) click to toggle source

Register for single notification - convenience and self documenting method for: on token, proc, true

@param token [Object] any valid Hash key representing the event @param p [Proc] a procedure to be called on notification ######################################################################

# File lib/async_emitter.rb, line 76
def once (token, p)
        self.on token, p, true
end
release(token) click to toggle source

Remove notification for an event

@param token [Object] Hash key representing the event

# File lib/async_emitter.rb, line 105
def release (token)
        @emissions[token][:active] = false
        Thread.kill @emissions[token][:thread]
end
release_all() click to toggle source

Remove all notifications

# File lib/async_emitter.rb, line 113
def release_all 
        @emissions.each do |key, value|
                value[:active] = false
                Thread.kill value[:thread]
        end
end

Protected Instance Methods

post_data_for(token) click to toggle source
# File lib/async_emitter.rb, line 121
def post_data_for (token)
        @emissions[token][:p].each_index do |i|
                o = @emissions[token][:p][i][:o]
                      p = @emissions[token][:p][i][:p]
                
                if i >= @emissions[token][:p].length - 1     
                        while @emissions[token][:data].length > 0 do
                                data = @emissions[token][:data].shift 
                                p.call data
                                if o 
                                        @emissions[token][:data] = []
                                        @emissions[token][:p].slice! i
                                        break
                                end
                        end
                else
                        @emissions[token][:data].each do |data|
                                p.call data
                                if o 
                                        @emissions[token][:p].slice! i
                                        break
                                end
                        end
                end
                
        end
end