class Messagebus::Consumer

Consumer client class. Provides a single access thread for all messagebus servers. Takes in a list of messagebus servers to receive from, opens connections to all servers, and does round-robin receives across all servers.

parameters:
dest      (String, required value, name of the queue/topic)
host_params      (list<string>, required value,  eg. '[localhost:61613]')
options : A hash map for optional values.
  user     (String,  default : '')
  passwd  (String,  default : '')
  ack_type      (String, required value: Messagebus::ACK_TYPE_AUTO_CLIENT OR Messagebus::ACK_TYPE_CLIENT)
                autoClient: module acks internally automatically for each receive.
                client: User should explicit ack *each* message.
  conn_lifetime_sec      (Int, default:300 secs)
  subscription_id      (String, required for topic, Each subscription is identified by a unique Id,
                               for a topic different subscriptions means each subscription gets copy of
                               message each, same subscription_id across multiple Consumers means load-balancing
                               messages for that subscription.)
  enable_dynamic_serverlist_fetch:  (Boolean, enables the consumer to actively fetch the list of brokers; default: true)
  dynamic_serverlist_fetch_url_override      (String, the override url used to fetch the list of brokers dynamically.)
  dynamic_fetch_timeout_ms  (Integer, milliseconds to wait for http response of dynamic serverlist fetch)
  receipt_wait_timeout_ms (Int, optional value, default: 5 seconds)

Attributes

received_messages[RW]
servers_running[RW]
state[RW]

Public Class Methods

new(host_params, options = {}) click to toggle source
Calls superclass method Messagebus::Connection::new
# File lib/messagebus/consumer.rb, line 75
# Builds a consumer for the given brokers; no connection is opened here
# (see #start).
#
# host_params - broker address(es), forwarded unchanged to the superclass.
# options     - optional settings. :ack_type defaults to
#               Messagebus::ACK_TYPE_CLIENT and
#               :enable_dynamic_serverlist_fetch defaults to true; values
#               passed by the caller win over these two defaults, and a
#               :cluster_defaults entry, when present, is merged on top.
#
# The destination and connection settings are checked with
# validate_destination_config / validate_connection_config (defined outside
# this method). A dedicated Logger is created only when :log_file is given.
def initialize(host_params, options = {})
  defaults = DottableHash.new({
    :ack_type => Messagebus::ACK_TYPE_CLIENT,
    :enable_dynamic_serverlist_fetch => true
  })
  options = defaults.merge(options)
  options.merge!(options.cluster_defaults) if options.cluster_defaults

  super(host_params, options)

  # presumably @options and @host_params are populated by the superclass
  # constructor -- TODO confirm against Messagebus::Connection.
  validate_destination_config(@options.destination_name, true, options)
  validate_connection_config(@host_params, options)

  @received_messages = Queue.new
  @servers_running = {}

  log_file = options[:log_file]
  @logger = Logger.new(log_file) if log_file
end
start(host_params, options={}) { |consumer| ... } click to toggle source
# File lib/messagebus/consumer.rb, line 62
# Creates a consumer, starts it and returns it.
#
# When a block is given the started consumer is yielded to it and is always
# stopped once the block exits, even if the block raises. Without a block
# the caller is responsible for calling #stop.
#
# host_params - forwarded to new.
# options     - forwarded to new.
#
# Returns the consumer instance.
def self.start(host_params, options={})
  consumer = new(host_params, options)
  consumer.start
  return consumer unless block_given?

  begin
    yield consumer
  ensure
    consumer.stop
  end
  consumer
end

Public Instance Methods

ack(safe_mode = false) click to toggle source

Ack the last received message. The message broker will keep resending a message (after retry_wait and up to retry_max_times) until it sees an ack for that message.

# File lib/messagebus/consumer.rb, line 170
# Acknowledges the last received message on the connection it arrived on.
#
# safe_mode - false (default): send the ack; a failure is logged, not raised.
#             true: send the ack with a callback and wait up to
#             @options.receipt_wait_timeout_ms for the broker's answer.
#
# Returns true when the ack was sent (or, in safe mode, when the callback
# reported a non-ERROR frame), false when sending failed or no answer arrived
# in time, and nil when there is no message waiting to be acked.
# Raises RuntimeError in safe mode when the broker answers with an ERROR frame.
def ack(safe_mode = false)
  return nil if @last_received.nil?

  logger.info("Sending ack() for message with id:#{@last_received[:decoded_msg].message_id}")

  unless safe_mode
    begin
      @servers_running[@last_received[:host_param]].acknowledge(@last_received[:msg])
      @last_received = nil
      return true
    rescue => e
      logger.error("Failed to ack message. Was the connection removed? #{e.message} #{e.backtrace.join("|")}")
      # Logger#error returns true, so without an explicit value a failed ack
      # would be indistinguishable from a successful one.
      return false
    end
  end

  receipt_received = false
  errors_received = nil
  @servers_running[@last_received[:host_param]].acknowledge(@last_received[:msg]) do |msg|
    if msg.command == 'ERROR'
      errors_received = msg
      raise "Failed to ack message with Error: #{msg.body.to_s} #{caller}"
    else
      receipt_received = true
      @last_received = nil
    end
  end

  # wait for receipt up to given timeout.
  do_with_timeout(@options.receipt_wait_timeout_ms) do
    if errors_received
      raise "Failed to ack message in safe mode with Error: " + errors_received.body.to_s
    end

    return true if receipt_received

    sleep 0.005
  end

  # The wait ended without a receipt.
  false
end
credit() click to toggle source

Send consumer credit back for last received message.

# File lib/messagebus/consumer.rb, line 155
# Sends consumer credit back to the broker for the last received message.
#
# Does nothing when no message is pending. Any StandardError raised while
# sending (missing connection, I/O failure, ...) is logged rather than
# raised, matching #ack and #nack; previously only NameError was rescued, so
# other failures escaped to the caller of receive after the message had
# already been taken off the queue.
def credit()
  return if @last_received.nil?

  begin
    logger.info("Sending consumer credit for message with id:#{@last_received[:decoded_msg].message_id}")

    @servers_running[@last_received[:host_param]].credit(@last_received[:msg])
  rescue => e
    logger.error("Failed to credit message. Was the connection removed?. #{e.message} #{e.backtrace.join("|")}")
  end
end
delete_subscription() click to toggle source
# File lib/messagebus/consumer.rb, line 275
# Unsubscribes every running connection from @options.destination_name,
# logging one line per server.
#
# Returns the list of server keys that were visited.
def delete_subscription()
  @servers_running.keys.each do |server_key|
    logger.info("Unsubscribing #{@options.destination_name} consumer client for #{server_key}")
    @servers_running[server_key].unsubscribe(@options.destination_name)
  end
end
fetch_serverlist() click to toggle source
# File lib/messagebus/consumer.rb, line 238
# Fetches the current broker list over HTTP.
#
# The URL is @options.dynamic_serverlist_fetch_url_override when set,
# otherwise it is derived from @host_params via get_dynamic_fetch_url. The
# response body is expected to be a comma separated list of servers; spaces
# are ignored and every entry must match SERVER_REGEX.
#
# Returns an Array of server strings, or nil when nothing could be fetched
# or the data is malformed (the reason is logged).
def fetch_serverlist
  dynamic_serverlist_fetch_url =
    @options.dynamic_serverlist_fetch_url_override || get_dynamic_fetch_url(@host_params)

  logger.info("trying to fetch dynamic url #{dynamic_serverlist_fetch_url}")
  begin
    data = fetch_uri(dynamic_serverlist_fetch_url)
    if data.nil?
      # fetch_uri signals failure with nil; handle it explicitly instead of
      # relying on a rescued NoMethodError and its misleading backtrace.
      logger.error("Failed to fetch server list from url:#{dynamic_serverlist_fetch_url}: no data returned")
      return nil
    end

    data = data.gsub(' ', '')
    serverlist = data.split(',')
    serverlist.each do |server|
      raise "bad data returned from dynamic url: #{data}" if SERVER_REGEX.match(server).nil?
    end
    serverlist
  rescue => e
    logger.error("Failed to fetch server list from url:#{dynamic_serverlist_fetch_url} with exception: #{e.message}, #{e.backtrace.join("|")}")
    nil
  end
end
get_dynamic_fetch_url(host_params) click to toggle source
# File lib/messagebus/consumer.rb, line 263
# Builds the URL queried for the broker list (attribute ListOfBrokers,
# always on port 8081 of the chosen host).
#
# host_params - a "host:port" String, or an Array of such strings from which
#               one entry is picked at random. Only the host part is used.
#
# Returns the URL as a String.
# Raises ArgumentError when no host can be determined (nil, empty array,
# empty string or an unsupported type) instead of failing later with an
# unrelated NoMethodError/TypeError.
def get_dynamic_fetch_url(host_params)
  host_param =
    case host_params
    when Array
      host_params[rand(host_params.length)] unless host_params.empty?
    when String
      host_params
    end

  host = host_param.to_s.split(':').first
  if host.nil? || host.empty?
    raise ArgumentError, "cannot build dynamic fetch url from host_params: #{host_params.inspect}"
  end

  "http://#{host}:8081/jmx?command=get_attribute&args=org.hornetq%3Amodule%3DCore%2Ctype%3DServer%20ListOfBrokers"
end
insert_sentinel_value(final_message=nil) click to toggle source

This is used to insert an unblock message into the consumer. A use case is when you're using a blocking receive, and you want to unblock a separate thread or tell a consumer to unblock from a signal handler. See also Messagebus::Swarm::Drone#stop

en.wikipedia.org/wiki/Sentinel_value

# File lib/messagebus/consumer.rb, line 118
# Queues a marker entry so that a receive currently blocked on the internal
# queue wakes up and stops processing.
#
# final_message - value handed back to the receiver for this entry
#                 (default: nil).
def insert_sentinel_value(final_message=nil)
  sentinel = {:stop_processing_sentinel => true, :msg => final_message}
  @received_messages << sentinel
end
keepalive() click to toggle source
# File lib/messagebus/consumer.rb, line 226
# Sends a keepalive on every running connection.
#
# A failure on one server is logged (now including the exception message and
# backtrace, like the other rescue sites in this class) and does not prevent
# the remaining servers from being pinged.
def keepalive
  @servers_running.each do |host_param, client|
    begin
      client.keepalive()
      # NOTE(review): a successful keepalive also forgets the last received
      # message, so a later ack/nack/credit becomes a no-op -- confirm this
      # is intended.
      @last_received = nil
    rescue => e
      logger.error("Failed to send keepalive to #{host_param} with exception: #{e.message}, #{e.backtrace.join("|")}")
    end
  end
end
logger() click to toggle source
# File lib/messagebus/consumer.rb, line 89
# Logger used by this consumer: the instance's own logger when one has been
# set, otherwise Client.logger, which is then remembered for later calls.
def logger
  return @logger if @logger

  @logger = Client.logger
end
nack() click to toggle source
# File lib/messagebus/consumer.rb, line 213
# Negatively acknowledges the last received message on the connection it
# came from and then forgets it.
#
# Does nothing when no message is pending. Failures are logged, not raised,
# and leave the pending message in place.
def nack
  return if @last_received.nil?

  begin
    message_id = @last_received[:decoded_msg].message_id
    logger.info("Sending nack() for message with id:#{message_id}")

    broker = @servers_running[@last_received[:host_param]]
    broker.nack(@last_received[:msg])
    @last_received = nil
  rescue => e
    logger.error("Failed to nack message. Was the connection removed? #{e.message} #{e.backtrace.join("|")}")
  end
end
receive() click to toggle source

Blocking receive: blocks until a value is available. Returns the message (Messagebus::Message) received, or blocks indefinitely.

# File lib/messagebus/consumer.rb, line 126
# Blocking receive: delegates to receive_internal in blocking mode and
# returns whatever it returns.
def receive
  receive_internal(false)
end
receive_immediate() click to toggle source

Non-Blocking receive. Returns the message(Messagebus::Message) received or nil immediately.

# File lib/messagebus/consumer.rb, line 146
# Non-blocking receive.
#
# Returns nil straight away when nothing is queued; otherwise hands over to
# receive_internal in non-blocking mode and returns its result.
def receive_immediate()
  return nil if @received_messages.empty?

  receive_internal(true)
end
receive_timeout(timeout_ms=1000) click to toggle source

Blocking receive with timeout: block till a value is available for passed timeout. Returns the message(Messagebus::Message) received or raise MessageReceiveTimeout(“timeout”)

# File lib/messagebus/consumer.rb, line 132
# Receive with a deadline.
#
# Polls the internal queue (10 ms between checks) inside do_with_timeout and
# returns the next message as soon as one is queued.
#
# timeout_ms - how long to wait, in milliseconds (default: 1000).
#
# Raises MessageReceiveTimeout when do_with_timeout finishes without a
# message having arrived.
def receive_timeout(timeout_ms=1000)
  do_with_timeout(timeout_ms) do
    return receive_internal(true) unless @received_messages.empty?

    sleep 0.01
  end

  raise MessageReceiveTimeout, "receive timeout(#{timeout_ms}) while waiting for message to arrive."
end
refresh_servers() click to toggle source
# File lib/messagebus/consumer.rb, line 286
# Re-evaluates which brokers this consumer should be connected to and opens
# connections to the ones that are not running yet.
#
# Records the refresh time in @refresh_time. When
# @options.enable_dynamic_serverlist_fetch is set, the target list comes
# from fetch_serverlist; if that fails or returns nil the currently running
# servers are kept as the target. Servers missing from the new list are not
# stopped here.
#
# Returns the result of start_servers when servers were added, nil otherwise.
def refresh_servers
  logger.info("refreshing consumer threads.")
  @refresh_time = Time.now
  hosts = @servers_running.keys

  if @options.enable_dynamic_serverlist_fetch
    # Fetch the server list from the dynamic server list fetch url.
    fetched =
      begin
        fetch_serverlist
      rescue => e
        logger.error "Error in refresh server #{e} \n Stack Trace: #{e.backtrace.join("|")}"
        nil
      end

    hosts = fetched unless fetched.nil?
  end

  logger.info("refreshing servers current_list:#{@servers_running.keys.inspect} new list:#{hosts.inspect}")

  servers_added = hosts - @servers_running.keys
  return nil if servers_added.empty?

  # start new servers
  logger.info("Adding new servers in:#{servers_added.inspect}")
  start_servers(servers_added)
end
start() click to toggle source

Start the consumers and all connections. The block form, which yields the consumer and automatically closes the connections after the block finishes, is provided by the class method start(host_params, options), not by this instance method.

# File lib/messagebus/consumer.rb, line 96
# Marks the consumer as STARTED, connects to every configured server (any
# failure is re-raised because start_servers is called with fail_on_error)
# and then runs an initial refresh_servers.
#
# Returns the result of refresh_servers.
def start
  @state = STARTED

  startup_note = "Starting consumers with host_params:#{@host_params.inspect} for destination:#{@options.destination_name}"
  logger.info(startup_note)

  start_servers(@host_params, true)
  refresh_servers
end
stop() click to toggle source

Close the consumers and all connections

# File lib/messagebus/consumer.rb, line 104
# Marks the consumer as STOPPED and stops the connection of every server
# currently registered in @servers_running.
#
# Returns the result of stop_servers.
def stop
  @state = STOPPED

  running = @servers_running.keys
  logger.info("Stopping consumers for running servers:#{running.inspect}")

  stop_servers(running)
end

Private Instance Methods

fetch_uri(uri) click to toggle source
# File lib/messagebus/consumer.rb, line 318
# Performs an HTTP GET and returns the response body.
#
# uri - String or URI to fetch. It is parsed before the rescue below, so a
#       malformed value raises to the caller.
#
# Open and read timeouts are both set to @options.dynamic_fetch_timeout_ms
# (converted to seconds).
#
# Returns the body String when the response is exactly Net::HTTPOK; returns
# nil for any other response or when the request raises (both are logged).
def fetch_uri(uri)
  target = URI(uri)
  begin
    connection = Net::HTTP.new(target.host, target.port)
    timeout_sec = @options.dynamic_fetch_timeout_ms/1000.0
    connection.open_timeout = timeout_sec
    connection.read_timeout = timeout_sec

    connection.start do |session|
      response = session.request_get(target.request_uri)
      unless response.instance_of?(Net::HTTPOK)
        logger.error("fetch_uri got bad response from server:#{response}")
        return nil
      end

      logger.info("fetched_uri: #{response.body}")
      return response.body
    end
  rescue => e
    logger.error("Failed to fetch dynamic server list with exception: #{e.message}, #{e.backtrace.join("|")}")
    return nil
  end
end
receive_internal(non_blocking=false) click to toggle source
# File lib/messagebus/consumer.rb, line 412
# Takes the next entry off the internal queue and turns it into the value
# handed to the caller.
#
# non_blocking - passed to Queue#pop; when true an empty queue raises
#                instead of blocking.
#
# Before popping, the server list is refreshed if more than
# @options.conn_lifetime_sec seconds have passed since @refresh_time.
#
# * A sentinel entry returns its :msg value and leaves @last_received alone.
# * An ERROR frame is logged and raised as ErrorFrameReceived.
# * Otherwise credit is sent, the message is acked when the ack type is
#   Messagebus::ACK_TYPE_AUTO_CLIENT, and the decoded message is returned.
def receive_internal(non_blocking=false)
  refresh_servers if Time.now > @refresh_time + @options.conn_lifetime_sec

  envelope = @received_messages.pop(non_blocking)
  return envelope[:msg] if envelope[:stop_processing_sentinel]

  @last_received = envelope
  frame = envelope[:msg]
  if frame.command == 'ERROR'
    logger.error("received error frame from server:\n#{frame}")
    raise ErrorFrameReceived, "received error frame from server:\n#{frame}"
  end

  # Keep a local reference: credit/ack below may reset @last_received.
  message = envelope[:decoded_msg]
  logger.info("received message with id:#{message.message_id}")

  # send back credits for this message.
  credit

  # send back the ack if user has choosen autoClient ack mode.
  ack if @options.ack_type == Messagebus::ACK_TYPE_AUTO_CLIENT

  message
end
start_servers(host_params, fail_on_error = false) click to toggle source
# File lib/messagebus/consumer.rb, line 372
# Opens and subscribes a connection for each given server and registers it
# in @servers_running under its host_param string.
#
# host_params   - a single "host:port" String or a list of them; a falsy
#                 value means there is nothing to do.
# fail_on_error - when true, the first failing server aborts the whole
#                 operation: #stop is called and the error is re-raised.
#                 When false (default) the failure is logged and the
#                 remaining servers are still attempted.
def start_servers(host_params, fail_on_error = false)
  targets = host_params.is_a?(String) ? [host_params] : host_params
  return unless targets

  targets.each do |server_key|
    begin
      logger.info("Starting messagebus consumer client for #{server_key}")

      connection = start_server(server_key, @options.user, @options.passwd, @options.subscription_id)
      subscribe_client(connection)
      @servers_running[server_key] = connection
    rescue => e
      logger.error("Failed to start server #{server_key} with exception: #{e.message}, #{e.backtrace.join("|")}")
      raise if fail_on_error
    end
  end
rescue
  # Only reached when an error was allowed to escape the loop above.
  stop
  raise
end
stop_servers(host_params) click to toggle source
# File lib/messagebus/consumer.rb, line 394
# Stops and unregisters the connections for the given servers.
#
# host_params - list of keys of @servers_running; nil means nothing to do.
#               Keys that are not currently running are skipped.
#
# A connection that fails to stop is logged and stays registered; the other
# servers are still processed. The key is used as-is: the former unused
# host/port split has been removed, since its only effect was to raise for
# non-String keys before the rescue could apply.
#
# Returns host_params.
def stop_servers(host_params)
  return if host_params.nil?

  host_params.each do |host_param|
    next unless @servers_running.has_key?(host_param)

    begin
      logger.info("Stopping messagebus consumer client for #{host_param}")
      client = @servers_running[host_param]
      stop_server(client)
      @servers_running.delete(host_param)
    rescue => e
      logger.error("Failed to stop server #{host_param} with exception: #{e.message}, #{e.backtrace.join("|")}")
    end
  end
end
subscribe_client(client) click to toggle source
# File lib/messagebus/consumer.rb, line 342
# Subscribes the given connection to @options.destination_name and routes
# every incoming frame into the internal queue.
#
# The subscription always uses ack mode "client". With a subscription_id the
# durable-subscriber-name, id and client-id headers are all set to it;
# without one, the id header falls back to the destination name.
#
# For each non-ERROR frame the body is decoded with
# Messagebus::Message.get_message_from_thrift_binary, the frame headers are
# attached as message_properties, and an entry holding the raw frame, the
# "host:port" of the connection and the decoded message is queued. Frames
# that fail to decode are logged and dropped; ERROR frames are only logged.
def subscribe_client(client)
  # ack is 'client' for server in both 'client'/'autoClient' cases.
  headers = {:ack => "client"}
  if @options.subscription_id.nil?
    # We need to set 'id' irrespective here.
    headers[:id] = @options.destination_name
  else
    headers["durable-subscriber-name"] = @options.subscription_id
    headers["id"] = @options.subscription_id
    headers["client-id"] = @options.subscription_id
  end

  client.subscribe(@options.destination_name, headers) do |frame|
    if frame.command == 'ERROR'
      logger.info("ERROR frame received:\n#{frame}" )
    else
      begin
        decoded = Messagebus::Message.get_message_from_thrift_binary(frame.body)
        decoded.message_properties = frame.headers
        origin = client.host + ":" + client.port.to_s
        @received_messages.push({:msg => frame, :host_param => origin, :decoded_msg => decoded})
      rescue => e
        logger.error("Failed to decode message:\n#{frame}\n#{e.message} #{e.backtrace.join("|")}")
      end
    end
  end
end