class Messagebus::Producer

Producer client class. Provides a simple API to publish events. Connections are refreshed every connectionLifetime interval, so a user can specify a load balancer and the producer will connect to a different server each interval, effectively rotating the publish load across all machines.

parameters:
dest      (String, required value, name of the queue/topic)
host_params      (list<string>, required value,  eg. '[localhost:61613]')
options : A hash map for optional values.
  user     (String,  default : '')
  passwd  (String,  default : '')
  conn_lifetime_sec      (Int, default:300 secs)
  receipt_wait_timeout_ms (Int, optional value, default: 5 seconds)

Constants

PUBLISH_HEADERS
SCHEDULED_DELIVERY_TIME_MS_HEADER

Attributes

state[RW]

Public Class Methods

new(host_params, options={}) click to toggle source
Calls superclass method Messagebus::Connection::new
# File lib/messagebus/producer.rb, line 53
# Builds a producer from the given host list and optional settings.
#
# host_params - passed straight through to the superclass constructor.
# options     - plain Hash of optional values; wrapped in a DottableHash
#               before being handed to the superclass and to
#               validate_connection_config.
def initialize(host_params, options={})
  dottable_options = DottableHash.new(options)
  super(host_params, dottable_options)
  # Validation reads @host_params rather than the raw argument.
  # NOTE(review): presumably the superclass sets @host_params - confirm.
  validate_connection_config(@host_params, dottable_options)
end

Public Instance Methods

actual_publish(dest, message, connect_headers={}, safe=true) click to toggle source

This is the actual publish method. See publish for why this is designed this way.

# File lib/messagebus/producer.rb, line 94
# Checks that the producer is started, validates the destination and then
# delegates the send to publish_internal.
#
# dest            - destination handed to validate_destination_config and
#                   publish_internal.
# message         - message handed to publish_internal.
# connect_headers - headers hash forwarded to publish_internal (default: {}).
# safe            - forwarded to publish_internal as its safe_mode argument
#                   (default: true).
#
# Returns whatever publish_internal returns on success. Returns false when
# the producer is not started or when any StandardError is raised while
# validating or publishing; in both cases the problem is logged and nothing
# is re-raised.
def actual_publish(dest, message, connect_headers={}, safe=true)
  unless started?
    logger.error "Cannot publish without first starting the producer. Current state is '#{@state}'"
    # Explicit false (not nil) so both failure paths report the same value.
    return false
  end

  validate_destination_config(dest)
  publish_internal(dest, message, connect_headers, safe)
rescue => e
  logger.error "Error occurred while publishing the message: #{e}\n #{e.backtrace.join("\n")}"
  false
end
publish(*args) click to toggle source

This is implemented with *args to work around the historical API, which required a dest_type parameter. That parameter has been removed, but the API has been kept backwards compatible for now.

Historical version

def publish(dest, dest_type, message, connect_headers={}, safe=true)

For the current version see actual_publish.

# File lib/messagebus/producer.rb, line 85
# Backwards-compatible entry point for publishing.
#
# Accepts the same arguments as actual_publish. When more than two arguments
# are given and the second one is the String 'topic' or 'queue' (any case),
# it is treated as the deprecated dest_type argument: a warning is logged and
# that argument is dropped before delegating.
#
# Returns the result of actual_publish.
def publish(*args)
  legacy_dest_type = args[1]
  has_legacy_dest_type = args.size > 2 &&
                         legacy_dest_type.is_a?(String) &&
                         %w[topic queue].include?(legacy_dest_type.downcase)
  return actual_publish(*args) unless has_legacy_dest_type

  logger.warn "Passing dest_type to Producer#publish is deprecated (it isn't needed). Please update your usage."
  # Keep the destination, skip the dest_type slot, forward everything else.
  forwarded = [args.first, *args.drop(2)]
  actual_publish(*forwarded)
end
start() click to toggle source

Start the producer client

# File lib/messagebus/producer.rb, line 60
# Starts the producer client.
#
# Marks the producer as STARTED, records the connection start time and opens
# a connection through start_server, storing it in @stomp. The state is set
# before the connection attempt, so it remains STARTED even when connecting
# fails.
#
# Any StandardError raised along the way is logged and swallowed.
def start
  @state = STARTED
  logger.info("Starting producer with host_params:#{@host_params}")
  @connection_start_time = Time.now
  user, passwd = @options.user, @options.passwd
  @stomp = start_server(@host_params, user, passwd)
rescue StandardError => error
  trace = error.backtrace.join("\n")
  logger.error("Error occurred while starting a connection: #{error}\n #{trace}")
end
stop() click to toggle source

Close the producer client

# File lib/messagebus/producer.rb, line 70
# Closes the producer client.
#
# Marks the producer as STOPPED and hands the current @stomp connection to
# stop_server. The state is set before the shutdown attempt, so it remains
# STOPPED even when stop_server fails.
#
# Any StandardError raised along the way is logged and swallowed.
def stop
  @state = STOPPED
  logger.info("Stopping producer with host_params:#{@host_params}")
  stop_server(@stomp)
rescue StandardError => error
  trace = error.backtrace.join("\n")
  logger.error("Error occurred while stopping a connection: #{error}\n #{trace}")
end

Private Instance Methods

check_and_refresh_connection() click to toggle source
# File lib/messagebus/producer.rb, line 174
# Re-establishes the connection when check_refresh_required says so.
#
# Does nothing (and returns nil) when no refresh is needed. Otherwise the
# producer is stopped first, unless it is already STOPPED, and then started
# again.
#
# Returns the result of start when a refresh happened.
def check_and_refresh_connection
  return unless check_refresh_required

  logger.debug("Connection status = #{@state}")
  logger.debug("Refreshing connection...")

  stop unless @state == STOPPED
  start
end
check_refresh_required() click to toggle source
# File lib/messagebus/producer.rb, line 151
# Decides whether the connection has to be re-established.
#
# A refresh is required when any of the following holds:
# * the producer state is STOPPED;
# * the connection is older than @options.conn_lifetime_sec seconds
#   (logged at info level);
# * there is no usable connection: no start time was ever recorded, @stomp
#   is nil, @stomp does not respond to +running+, or +running+ is falsy.
#
# Returns true when a refresh is required, false otherwise.
def check_refresh_required
  logger.debug("Checking if we need to refresh connections....")

  # Without a recorded start time the age cannot be computed (Time - nil
  # raises TypeError), so treat that case as "no connection" rather than
  # failing.
  never_connected = @connection_start_time.nil?

  stale_connection = !never_connected &&
                     (Time.new - @connection_start_time) > @options.conn_lifetime_sec
  logger.info("Stale connection found, need to refresh connection.") if stale_connection

  broken_connection = never_connected ||
                      @stomp.nil? ||
                      !@stomp.respond_to?(:running) ||
                      !@stomp.running

  @state == STOPPED || stale_connection || broken_connection
end
publish_internal(dest, message, connect_headers, safe_mode=false) click to toggle source
# File lib/messagebus/producer.rb, line 108
# Sends +message+ to +dest+ over the current @stomp connection.
#
# dest            - destination passed to @stomp.publish.
# message         - must be a Messagebus::Message; its to_thrift_binary
#                   output is the payload.
# connect_headers - Hash merged with PUBLISH_HEADERS (PUBLISH_HEADERS wins
#                   on key clashes) to form the headers that are sent.
# safe_mode       - when falsy the message is sent and true is returned
#                   immediately; when truthy the method waits for a receipt
#                   (default: false).
#
# Returns true once the message was sent (unsafe mode) or a receipt was
# seen (safe mode).
# Raises RuntimeError when +message+ has the wrong type, when an ERROR frame
# is reported for the publish, or when no receipt is seen before
# do_with_timeout gives up.
def publish_internal(dest, message, connect_headers, safe_mode=false)
  unless message.is_a?(Messagebus::Message)
    raise "ERROR: message should be a Messagebus::Message type."
  end

  check_and_refresh_connection
  logger.debug "publishing message with message Id:#{message.message_id} safe_mode:#{safe_mode}"
  logger.debug { "message: #{message.inspect}" }

  headers = connect_headers.merge(PUBLISH_HEADERS)
  payload = message.to_thrift_binary

  unless safe_mode
    @stomp.publish(dest, payload, headers)
    return true
  end

  got_receipt = false
  server_error = nil
  # NOTE(review): presumably the client invokes this block when a receipt or
  # error frame for the publish arrives - confirm against the stomp client.
  @stomp.publish(dest, payload, headers) do |frame|
    if frame.command == 'ERROR'
      server_error = frame
      raise "Failed to publish the message while publishing to #{dest} to the server with Error: #{frame.body.to_s} #{caller}"
    end
    got_receipt = true
  end

  # Poll the flags set by the block above; the wait budget is handed to
  # do_with_timeout.
  do_with_timeout(@options.receipt_wait_timeout_ms) do
    if server_error
      raise "Failed to publish the message while publishing to #{dest} to the server with Error:\n" + server_error.body.to_s
    end

    return true if got_receipt

    sleep 0.005
  end

  logger.error "Publish to #{dest} may have failed, publish_safe() timeout while waiting for receipt"
  raise "publish_safe() timeout while waiting for receipt"
end