class Messagebus::Producer
Producer client class. Provides a simple API to publish events. Connections are refreshed every connectionLifetime: the user can specify a load balancer, and the producer will connect to a different server each connectionLifetime interval, effectively rotating the publish load across all machines.
parameters: dest (String, required value, name of the queue/topic) host_params (list<string>, required value, eg. '[localhost:61613]') options : A hash map for optional values. user (String, default : '') passwd (String, default : '') conn_lifetime_sec (Int, default:300 secs) receipt_wait_timeout_ms (Int, optional value, default: 5 seconds)
Constants
- PUBLISH_HEADERS
- SCHEDULED_DELIVERY_TIME_MS_HEADER
Attributes
state[RW]
Public Class Methods
new(host_params, options={})
click to toggle source
Calls superclass method
Messagebus::Connection::new
# File lib/messagebus/producer.rb, line 53
# Builds a producer.
#
# The raw options hash is wrapped in a DottableHash, handed to the
# superclass constructor together with host_params, and then the connection
# configuration is validated.
#
# host_params - forwarded unchanged to the superclass constructor.
# options     - optional settings; wrapped in a DottableHash before use.
def initialize(host_params, options={})
  dottable_options = DottableHash.new(options)
  super(host_params, dottable_options)
  # NOTE(review): @host_params is presumably assigned by the superclass
  # constructor - confirm against Messagebus::Connection.
  validate_connection_config(@host_params, dottable_options)
end
Public Instance Methods
actual_publish(dest, message, connect_headers={}, safe=true)
click to toggle source
This is the actual publish method. See publish for why this is designed this way.
# File lib/messagebus/producer.rb, line 94
# Publishes a message to the given destination.
#
# dest            - destination name, checked by validate_destination_config.
# message         - the message to publish (type-checked in publish_internal).
# connect_headers - extra headers passed through to publish_internal.
# safe            - passed through to publish_internal as its safe-mode flag.
#
# Returns the result of publish_internal on success. Returns false when the
# producer has not been started or when any StandardError is raised while
# validating or publishing; the error is logged rather than propagated.
def actual_publish(dest, message, connect_headers={}, safe=true)
  unless started?
    logger.error "Cannot publish without first starting the producer. Current state is '#{@state}'"
    # Return false (not nil) so every failure path yields the same value.
    return false
  end

  validate_destination_config(dest)
  publish_internal(dest, message, connect_headers, safe)
rescue => e
  logger.error "Error occurred while publishing the message: #{e}\n #{e.backtrace.join("\n")}"
  false
end
publish(*args)
click to toggle source
This is implemented with *args to work around the historical API requiring a dest_type parameter. That parameter has been removed, but the API has been kept backwards compatible for now.
Historical version
def publish(dest, dest_type, message, connect_headers={}, safe=true)
For the current version see actual_publish.
# File lib/messagebus/producer.rb, line 85
# Backwards-compatible entry point for publishing.
#
# Older callers passed a dest_type ('topic' or 'queue', any letter case) as
# the second argument. When more than two arguments are given and the second
# one is such a string, it is dropped with a deprecation warning. The
# remaining arguments are forwarded to actual_publish.
def publish(*args)
  legacy_dest_type = args.size > 2 &&
                     args[1].is_a?(String) &&
                     %w[topic queue].include?(args[1].downcase)

  if legacy_dest_type
    logger.warn "Passing dest_type to Producer#publish is deprecated (it isn't needed). Please update your usage."
    args.delete_at(1)
  end

  actual_publish(*args)
end
start()
click to toggle source
Start the producer client
# File lib/messagebus/producer.rb, line 60
# Starts the producer client.
#
# Marks the producer as STARTED, records the connection start time and opens
# a connection through start_server. Any StandardError raised along the way
# is logged and swallowed; the state assigned at the top is left as is.
def start
  begin
    @state = STARTED
    logger.info "Starting producer with host_params:#{@host_params}"
    @connection_start_time = Time.now
    @stomp = start_server(@host_params, @options.user, @options.passwd)
  rescue => error
    logger.error "Error occurred while starting a connection: #{error}\n #{error.backtrace.join("\n")}"
  end
end
stop()
click to toggle source
Close the producer client
# File lib/messagebus/producer.rb, line 70
# Closes the producer client.
#
# Marks the producer as STOPPED and hands the current connection to
# stop_server. Any StandardError raised along the way is logged and
# swallowed.
def stop
  begin
    @state = STOPPED
    logger.info "Stopping producer with host_params:#{@host_params}"
    stop_server(@stomp)
  rescue => error
    logger.error "Error occurred while stopping a connection: #{error}\n #{error.backtrace.join("\n")}"
  end
end
Private Instance Methods
check_and_refresh_connection()
click to toggle source
# File lib/messagebus/producer.rb, line 174
# Re-establishes the connection when check_refresh_required says so.
#
# Does nothing (returns nil) when no refresh is needed. Otherwise the
# producer is stopped first, unless it is already STOPPED, and then started
# again.
def check_and_refresh_connection
  return unless check_refresh_required

  logger.debug("Connection status = #{@state}")
  logger.debug("Refreshing connection...")
  stop unless @state == STOPPED
  start
end
check_refresh_required()
click to toggle source
# File lib/messagebus/producer.rb, line 151
# Decides whether the connection has to be re-established.
#
# A refresh is required when any of the following holds:
# * the producer state is STOPPED,
# * no connection start time has been recorded,
# * the connection is older than @options.conn_lifetime_sec seconds,
# * there is no stomp client, or it does not respond to "running", or its
#   "running" value is falsy.
#
# Returns true when a refresh is required, false otherwise.
def check_refresh_required
  logger.debug("Checking if we need to refresh connections....")

  # Without a recorded start time the age cannot be computed (Time - nil
  # raises TypeError), so treat it as needing a refresh instead of raising.
  never_connected = @connection_start_time.nil?

  stale_connection = !never_connected &&
                     (Time.new - @connection_start_time) > @options.conn_lifetime_sec
  logger.info("Stale connection found, need to refresh connection.") if stale_connection

  broken_connection = @stomp.nil? || !@stomp.respond_to?("running") || !@stomp.running

  @state == STOPPED || never_connected || stale_connection || broken_connection
end
publish_internal(dest, message, connect_headers, safe_mode=false)
click to toggle source
# File lib/messagebus/producer.rb, line 108
# Sends a message through the stomp client, optionally waiting for a receipt.
#
# dest            - destination passed to the stomp client's publish call.
# message         - must be a Messagebus::Message; it is sent as its
#                   to_thrift_binary form.
# connect_headers - caller headers; PUBLISH_HEADERS is merged over them.
# safe_mode       - when falsy the message is sent and true is returned
#                   immediately; when truthy the method waits for a receipt.
#
# Returns true on success. Raises a RuntimeError when message has the wrong
# type, when the server answers with an ERROR frame, or when no receipt
# arrives before the wait ends.
def publish_internal(dest, message, connect_headers, safe_mode=false)
  unless message.is_a?(Messagebus::Message)
    raise "ERROR: message should be a Messagebus::Message type."
  end

  check_and_refresh_connection
  logger.debug "publishing message with message Id:#{message.message_id} safe_mode:#{safe_mode}"
  logger.debug { "message: #{message.inspect}" }

  headers = connect_headers.merge(PUBLISH_HEADERS)

  # Fire-and-forget path: no receipt handling at all.
  unless safe_mode
    @stomp.publish(dest, message.to_thrift_binary, headers)
    return true
  end

  got_receipt = false
  error_frame = nil

  # The block is invoked by the stomp client with the server's reply frame.
  @stomp.publish(dest, message.to_thrift_binary, headers) do |frame|
    if frame.command == 'ERROR'
      # Record the frame before raising so the waiting code below sees it.
      error_frame = frame
      raise "Failed to publish the message while publishing to #{dest} to the server with Error: #{frame.body.to_s} #{caller}"
    else
      got_receipt = true
    end
  end

  # wait for receipt up to given timeout.
  # NOTE(review): do_with_timeout presumably re-runs this block until
  # receipt_wait_timeout_ms elapses - confirm against its definition.
  do_with_timeout(@options.receipt_wait_timeout_ms) do
    if error_frame
      raise "Failed to publish the message while publishing to #{dest} to the server with Error:\n" + error_frame.body.to_s
    end

    return true if got_receipt

    sleep 0.005
  end

  logger.error "Publish to #{dest} may have failed, publish_safe() timeout while waiting for receipt"
  raise "publish_safe() timeout while waiting for receipt"
end