class CelluloidPubsub::Reactor

The reactor handles new connections. Based on what the client sends it either subscribes to a channel or will publish to a channel or just dispatch to the server if command is neither subscribe, publish or unsubscribe

@!attribute websocket

@return [Reel::WebSocket] websocket connection

@!attribute server

@return [CelluloidPubsub::Webserver] the server actor to which the reactor is connected to

@!attribute channels

@return [Array] array of channels to which the current reactor has subscribed to

Constants

AVAILABLE_ACTIONS

available actions that can be delegated

Attributes

channels[RW]

The channels to which this reactor has subscribed to @return [Array] array of channels to which the current reactor has subscribed to

options[R]

The same options passed to the server are available on the reactor too @return [Hash] Hash with all the options passed to the server

server[RW]

The server instance to which this reactor is linked to @return [CelluloidPubsub::Webserver] the server actor to which the reactor is connected to

websocket[RW]

The websocket connection received from the server @return [Reel::WebSocket] websocket connection

Public Instance Methods

actor_died(actor, reason) click to toggle source

method called when the actor is exiting

@param [actor] actor - the current actor @param [Hash] reason - the reason it crashed

@return [void]

@api public

# File lib/celluloid_pubsub/reactor.rb, line 521
def actor_died(actor, reason)
  @shutting_down = true
  log_debug "Oh no! #{actor.inspect} has died because of a #{reason.class}"
end
adapter_options() click to toggle source

the method will return options needed when configuring an adapter @see celluloid_pubsub_redis_adapter for more information

@return [Hash] returns options needed by the adapter

@api public

# File lib/celluloid_pubsub/reactor.rb, line 102
def adapter_options
  @adapter_options ||= options['adapter_options'] || {}
end
add_subscriber_to_channel(channel, message) click to toggle source

adds the curent actor the list of the subscribers for a particular channel and registers the new channel

@param [String] channel @param [Object] message

@return [void]

@api public

# File lib/celluloid_pubsub/reactor.rb, line 412
def add_subscriber_to_channel(channel, message)
  registry_channels = CelluloidPubsub::Registry.channels
  @channels << channel
  registry_channels << channel unless registry_channels.include?(channel)
  @server.mutex.synchronize do
    @server.subscribers[channel] = channel_subscribers(channel).push(reactor: Actor.current, message: message)
  end
end
channel_subscribers(channel) click to toggle source

this method will return a list of all subscribers to a particular channel or a empty array

@param [String] channel The channel that will be used to fetch all subscribers from this channel

@return [Array] returns a list of all subscribers to a particular channel or a empty array

@api public

# File lib/celluloid_pubsub/reactor.rb, line 399
def channel_subscribers(channel)
  @server.subscribers[channel] || []
end
clear_unpublished_messages(channel) click to toggle source

the method clears all the messages left unpublished in a channel

@param [String] channel

@return [void]

@api public

# File lib/celluloid_pubsub/reactor.rb, line 376
def clear_unpublished_messages(channel)
  CelluloidPubsub::Registry.messages[channel] = []
end
debug_enabled?() click to toggle source

the method will return true if debug is enabled

@return [Boolean] returns true if debug is enabled otherwise false

@api public

# File lib/celluloid_pubsub/reactor.rb, line 122
def debug_enabled?
  @debug_enabled = options.fetch('enable_debug', false)
  @debug_enabled == true
end
delegate_action(json_data) click to toggle source

method that checks if the data is a Hash

if the data is a hash then will stringify the keys and will call the method {#delegate_action} that will handle the message, otherwise will call the method {#handle_unknown_action}

@see delegate_action @see handle_unknown_action

@param [Hash] json_data @option json_data [String] :client_action The action based on which the reactor will decide what action should make

Possible values are:
  unsubscribe_all
  unsubscribe_clients
  unsubscribe
  subscribe
  publish

@return [void]

@api public

# File lib/celluloid_pubsub/reactor.rb, line 247
def delegate_action(json_data)
  async.send(json_data['client_action'], json_data['channel'], json_data)
end
delete_server_subscribers(channel) click to toggle source

the method will delete the reactor from the channel list on the server

@param [String] channel

@return [void]

@api public

# File lib/celluloid_pubsub/reactor.rb, line 302
def delete_server_subscribers(channel)
  @server.mutex.synchronize do
    (@server.subscribers[channel] || []).delete_if do |hash|
      hash[:reactor] == Actor.current
    end
  end
end
forget_channel(channel) click to toggle source

if the reactor has unsubscribed from all his channels will close the websocket connection, otherwise will delete the channel from his channel list

@param [String] channel The channel that needs to be deleted from the reactor's list of subscribed channels

@return [void]

@api public

# File lib/celluloid_pubsub/reactor.rb, line 272
def forget_channel(channel)
  if @channels.blank?
    @websocket.close
  else
    @channels.delete(channel)
  end
end
handle_parsed_websocket_message(json_data) click to toggle source

method that checks if the data is a Hash

if the data is a hash then will stringify the keys and will call the method {#delegate_action} that will handle the message, otherwise will call the method {#handle_unknown_action}

@see delegate_action @see handle_unknown_action

@param [Hash] json_data

@return [void]

@api public

# File lib/celluloid_pubsub/reactor.rb, line 215
def handle_parsed_websocket_message(json_data)
  data = json_data.is_a?(Hash) ? json_data.stringify_keys : {}
  if CelluloidPubsub::Reactor::AVAILABLE_ACTIONS.include?(data['client_action'].to_s)
    log_debug "#{self.class} finds actions for  #{json_data}"
    delegate_action(data) if data['client_action'].present?
  else
    handle_unknown_action(data['channel'], json_data)
  end
end
handle_unknown_action(channel, json_data) click to toggle source

the method will delegate the message to the server in an asyncronous way by sending the current actor and the message @see CelluloidPubsub::WebServer#handle_dispatched_message

@param [Hash] json_data

@return [void]

@api public

# File lib/celluloid_pubsub/reactor.rb, line 259
def handle_unknown_action(channel, json_data)
  log_debug "Trying to dispatch   to server  #{json_data} on channel #{channel}"
  @server.async.handle_dispatched_message(Actor.current, json_data)
end
handle_websocket_message(message) click to toggle source

method that handles the message received from the websocket connection first will try to parse the message {#parse_json_data} and then it will dispatch it to another method that will decide depending the message what action should the reactor take {#handle_parsed_websocket_message}

@see parse_json_data @see handle_parsed_websocket_message

@param [JSON] message

@return [void]

@api public

# File lib/celluloid_pubsub/reactor.rb, line 196
def handle_websocket_message(message)
  log_debug "#{reactor_class} read message  #{message}"
  json_data = parse_json_data(message)
  handle_parsed_websocket_message(json_data)
end
initialize_data(websocket, server) { |websocket, server| ... } click to toggle source

initializes the actor

@param [Reel::WebSocket] websocket @param [CelluloidPubsub::WebServer] server

@return [Celluloid::Actor] returns the actor

@api public

# File lib/celluloid_pubsub/reactor.rb, line 65
def initialize_data(websocket, server)
  @websocket = websocket
  @server = server
  @options = @server.server_options
  @channels = []
  @shutting_down = false
  setup_celluloid_logger
  log_debug "#{self.class} Streaming changes for #{websocket.url} #{websocket.class.name}"
  yield(websocket, server) if block_given?
  cell_actor
end
log_file_path() click to toggle source

the method will return the file path of the log file where debug messages will be printed

@return [String] returns the file path of the log file where debug messages will be printed

@api public

# File lib/celluloid_pubsub/reactor.rb, line 83
def log_file_path
  @log_file_path ||= options.fetch('log_file_path', nil)
end
log_level() click to toggle source

the method will return the log level of the logger

@return [Integer, nil] return the log level used by the logger ( default is 1 - info)

@api public

# File lib/celluloid_pubsub/reactor.rb, line 92
def log_level
  @log_level ||= options['log_level'] || ::Logger::Severity::INFO
end
parse_json_data(message) click to toggle source

method used to parse a JSON object into a Hash object

@param [JSON] message

@return [Hash]

@api public

# File lib/celluloid_pubsub/reactor.rb, line 175
def parse_json_data(message)
  log_debug "#{reactor_class} received #{message}"
  JSON.parse(message)
rescue StandardError => e
  log_debug "#{reactor_class} could not parse #{message} because of #{e.inspect}"
  message
end
publish(current_topic, json_data) click to toggle source
method for publishing data to a channel

@param [String] current_topic The Channel to which the reactor instance {CelluloidPubsub::Reactor} will publish the message to @param [Object] json_data The additional data that contains the message that needs to be sent

@return [void]

@api public

# File lib/celluloid_pubsub/reactor.rb, line 429
def publish(current_topic, json_data)
  message = json_data['data'].to_json
  return if current_topic.blank? || message.blank?
  server_publish_event(current_topic, message)
rescue StandardError => e
  log_debug("could not publish message #{message} into topic #{current_topic} because of #{e.inspect}")
end
reactor_class() click to toggle source

the method will return the reactor's class name used in debug messages

@return [Class] returns the reactor's class name used in debug messages

@api public

# File lib/celluloid_pubsub/reactor.rb, line 164
def reactor_class
  self.class
end
run() click to toggle source

reads from the socket the message and dispatches it to the handle_websocket_message method @see handle_websocket_message

@return [void]

@api public

:nocov:

# File lib/celluloid_pubsub/reactor.rb, line 136
def run
  loop do
    break if shutting_down? || actor_dead?(Actor.current) || @websocket.closed? || actor_dead?(@server)
    message = try_read_websocket
    handle_websocket_message(message) if message.present?
  end
end
save_unpublished_message(current_topic, message) click to toggle source

the method save the message for a specific channel if there are no subscribers

@param [String] current_topic @param [#to_s] message

@return [void]

@api public

# File lib/celluloid_pubsub/reactor.rb, line 463
def save_unpublished_message(current_topic, message)
  @server.timers_mutex.synchronize do
    (CelluloidPubsub::Registry.messages[current_topic] ||= []) << message
  end
end
send_unpublished(channel) click to toggle source

this method will write to the socket all messages that were published to that channel before the actor subscribed

@param [String] channel @return [void]

@api public

# File lib/celluloid_pubsub/reactor.rb, line 362
def send_unpublished(channel)
  return if (messages = unpublished_messages(channel)).blank?
  messages.each do |msg|
    @websocket << msg.to_json
  end
end
server_kill_reactors(channel) click to toggle source

kills all reactors registered on a channel and closes their websocket connection

@param [String] channel @return [void]

@api public

# File lib/celluloid_pubsub/reactor.rb, line 503
def server_kill_reactors(channel)
  @server.mutex.synchronize do
    (@server.subscribers[channel] || []).dup.pmap do |hash|
      reactor = hash[:reactor]
      reactor.websocket.close
      Celluloid::Actor.kill(reactor)
    end
  end
end
server_publish_event(current_topic, message) click to toggle source

the method will publish to all subsribers of a channel a message

@param [String] current_topic @param [#to_s] message

@return [void]

@api public

# File lib/celluloid_pubsub/reactor.rb, line 445
def server_publish_event(current_topic, message)
  if (subscribers = @server.subscribers[current_topic]).present?
    subscribers.dup.pmap do |hash|
      hash[:reactor].websocket << message
    end
  else
    save_unpublished_message(current_topic, message)
  end
end
shutdown() click to toggle source

the method will terminate the current actor

@return [void]

@api public

# File lib/celluloid_pubsub/reactor.rb, line 330
def shutdown
  @shutting_down = true
  log_debug "#{self.class} tries to 'shutdown'"
  @websocket.close if @websocket.present? && !@websocket.closed?
  terminate
end
shutting_down?() click to toggle source

the method will return true if the actor is shutting down

@return [Boolean] returns true if the actor is shutting down

@api public

# File lib/celluloid_pubsub/reactor.rb, line 112
def shutting_down?
  @shutting_down == true
end
subscribe(channel, message) click to toggle source

this method will add the current actor to the list of the subscribers {#add_subscriber_to_channel} and will write to the socket a message for succesful subscription

@see add_subscriber_to_channel

@param [String] channel @param [Object] message

@return [void]

@api public

# File lib/celluloid_pubsub/reactor.rb, line 348
def subscribe(channel, message)
  return unless channel.present?
  add_subscriber_to_channel(channel, message)
  log_debug "#{self.class} subscribed to #{channel} with #{message}"
  @websocket << message.merge('client_action' => 'successful_subscription', 'channel' => channel).to_json if @server.adapter == CelluloidPubsub::WebServer::CLASSIC_ADAPTER
end
try_read_websocket() click to toggle source

will try to read the message from the websocket and if it fails will log the exception if debug is enabled

@return [void]

@api public

# File lib/celluloid_pubsub/reactor.rb, line 152
def try_read_websocket
  @websocket.closed? ? nil : @websocket.read
rescue StandardError
  nil
end
unpublished_messages(channel) click to toggle source

the method will return a list of all unpublished messages in a channel

@param [String] channel

@return [Array] the list of messages that were not published

@api public

# File lib/celluloid_pubsub/reactor.rb, line 387
def unpublished_messages(channel)
  (messages = CelluloidPubsub::Registry.messages[channel]).present? ? messages : []
end
unsubscribe(channel, _json_data) click to toggle source

the method will unsubscribe a client by closing the websocket connection if has unscribed from all channels and deleting the reactor from the channel list on the server

@param [String] channel

@return [void]

@api public

# File lib/celluloid_pubsub/reactor.rb, line 288
def unsubscribe(channel, _json_data)
  log_debug "#{self.class} runs 'unsubscribe' method with  #{channel}"
  return unless channel.present?
  forget_channel(channel)
  delete_server_subscribers(channel)
end
unsubscribe_all(_channel, json_data) click to toggle source

unsubscribes all actors from all channels and terminates the current actor

@param [String] _channel NOT USED - needed to maintain compatibility with the other methods @param [Object] json_data NOT USED - needed to maintain compatibility with the other methods

@return [void]

@api public

# File lib/celluloid_pubsub/reactor.rb, line 477
def unsubscribe_all(_channel, json_data)
  log_debug "#{self.class} runs 'unsubscribe_all' method"
  CelluloidPubsub::Registry.channels.dup.pmap do |channel|
    unsubscribe_clients(channel, json_data)
  end
  log_debug 'clearing connections'
  shutdown
end
unsubscribe_clients(channel, _json_data) click to toggle source

the method will unsubscribe all clients subscribed to a channel by closing the

@param [String] channel

@return [void]

@api public

# File lib/celluloid_pubsub/reactor.rb, line 317
def unsubscribe_clients(channel, _json_data)
  log_debug "#{self.class} runs 'unsubscribe_clients' method with  #{channel}"
  return if channel.blank?
  unsubscribe_from_channel(channel)
  @server.subscribers[channel] = []
end
unsubscribe_from_channel(channel) click to toggle source

unsubscribes all actors from the specified chanel

@param [String] channel @return [void]

@api public

# File lib/celluloid_pubsub/reactor.rb, line 492
def unsubscribe_from_channel(channel)
  log_debug "#{self.class} runs 'unsubscribe_from_channel' method with #{channel}"
  server_kill_reactors(channel)
end
work(websocket, server) click to toggle source
rececives a new socket connection from the server
and listens for messages

@param [Reel::WebSocket] websocket

@return [void]

@api public

# File lib/celluloid_pubsub/reactor.rb, line 51
def work(websocket, server)
  initialize_data(websocket, server)
  server.reactors << Actor.current
  async.run
end