class Messagebus::ClusterMap
Attributes
address[R]
destinations[R]
Public Class Methods
new(config)
click to toggle source
# File lib/messagebus/cluster_map.rb, line 38 def initialize(config) config = DottableHash.new(config) Messagebus::Client.logger.debug { "Initializing ClusterMap with config: #{config.inspect}" } if clusters = config.clusters config.clusters.each do |cluster_config| # Merge cluster config with top level config. # cluster level values should override top level values. cluster = config.merge(cluster_config) create_cluster(cluster) end end end
Public Instance Methods
find(destination_name)
click to toggle source
# File lib/messagebus/cluster_map.rb, line 70 def find(destination_name) destinations[destination_name] end
start()
click to toggle source
# File lib/messagebus/cluster_map.rb, line 52 def start cluster_producer_map.each do |cluster_name, producer| Messagebus::Client.logger.info "Starting producer for cluster: #{cluster_name} with host_params: #{producer.host_params}" producer.start end end
stop()
click to toggle source
# File lib/messagebus/cluster_map.rb, line 59 def stop cluster_producer_map.each do |cluster_name, producer| Messagebus::Client.logger.info "Stopping producer for cluster: #{cluster_name} with host_params: #{producer.host_params}" if producer.started? producer.stop else Messagebus::Client.logger.warn "#{producer.host_params} was not active, ignoring stop request." end end end
update_config(config)
click to toggle source
# File lib/messagebus/cluster_map.rb, line 78 def update_config(config) Messagebus::Client.logger.debug { "Reloading ClusterMap with config: #{config.inspect}" } config = DottableHash.new(config) if clusters = config.clusters config.clusters.each do |cluster_config| cluster = config.merge(cluster_config) #cluster exists - check and update configs if cluster_producer_map.has_key?(cluster.name) #check for prodcuer config update_cluster(cluster) else #new cluster => create it create_cluster(cluster, true) end end end end
Private Instance Methods
cluster_producer_map()
click to toggle source
# File lib/messagebus/cluster_map.rb, line 151 def cluster_producer_map @cluster_producer_map ||= {} end
create_cluster(cluster, producer_start = false)
click to toggle source
# File lib/messagebus/cluster_map.rb, line 127 def create_cluster(cluster, producer_start = false) Messagebus::Client.logger.debug "Initializing cluster: #{cluster.inspect}" producer = Messagebus::Producer.new( cluster.producer_address, :user => cluster.user, :passwd => cluster.passwd, :receipt_wait_timeout_ms => cluster.receipt_wait_timeout_ms || 5000, :conn_lifetime_sec => cluster.conn_lifetime_sec || 300 ) cluster_producer_map[cluster.name] = producer if cluster.destinations && !cluster.destinations.empty? cluster.destinations.each do |destination_name| load_destination(destination_name, producer) end else raise Client::InitializationError.new("no destinations defined") end if producer_start producer.start end end
load_destination(destination_name, producer)
click to toggle source
# File lib/messagebus/cluster_map.rb, line 155 def load_destination(destination_name, producer) validate_destination_config(destination_name) destinations[destination_name] = producer Messagebus::Client.logger.info "loaded #{destination_name} => #{producer.host_params}" end
update_cluster(cluster)
click to toggle source
# File lib/messagebus/cluster_map.rb, line 98 def update_cluster(cluster) producer = cluster_producer_map[cluster.name] #check for new producer address =>add to exisiting host params list #do nothing if producer not found in new config cluster_host_params = [cluster.producer_address] unless cluster.producer_address.is_a?(Array) producer_host_params = producer.host_params cluster_host_params.each do |address| if !producer_host_params.include?(address) producer_host_params = producer_host_params.to_a.push address end end producer.host_params=(producer_host_params) options = producer.options options['receipt_wait_timeout_ms'] = cluster.receipt_wait_timeout_ms || options['receipt_wait_timeout_ms'] options['conn_lifetime_sec'] = cluster.conn_lifetime_sec || options['conn_lifetime_sec'] producer.options=(options) #load new destination, same producer reference used if cluster.destinations && !cluster.destinations.empty? cluster.destinations.each do |destination_name| load_destination(destination_name, producer) end else raise Client::InitializationError.new("no destinations defined") end end