class Messagebus::Client

Attributes

cluster_map[R]
config[R]
last_reload_time[R]
logger[R]

Public Class Methods

logger() click to toggle source
# File lib/messagebus/client.rb, line 161
def logger
  @@logger ||= Logger.new(Messagebus::LOG_DEFAULT_FILE)
end
new(config_hash) click to toggle source
# File lib/messagebus/client.rb, line 56
def initialize(config_hash)

  @@logger = if provided_logger = config_hash.delete(:logger)
    provided_logger
  elsif log_file = config_hash["log_file"]
    Logger.new(log_file)
  else
    Logger.new(Messagebus::LOG_DEFAULT_FILE)
  end

  # This is required to do a deep clone of config hash object
  @config = DottableHash.new(Marshal.load(Marshal.dump(config_hash)))
  @config.merge!(@config.cluster_defaults) if @config.cluster_defaults
  @@logger.level = Logger::Severity.const_get(@config.log_level.upcase) if @config.log_level

  @enable_client_logger_thread_debugging = config.enable_client_logger_thread_debugging

  logger.debug "Initializing Messagebus client."
  @cluster_map = ClusterMap.new(@config)
  #added for reloading config on interval
  @last_reload_time = Time.now
end
start(config_hash) { |client| ... } click to toggle source
# File lib/messagebus/client.rb, line 42
def self.start(config_hash)
  client = new(config_hash)
  client.start

  if block_given?
    begin
      yield client
    ensure
      client.stop
    end
  end
  client
end

Public Instance Methods

headers(delay) click to toggle source
# File lib/messagebus/client.rb, line 149
def headers(delay)
  headers = {}
  unless delay == 0
    schedule_time = (Time.now.to_i * 1000 + delay).to_s
    headers.merge!({
      Messagebus::Producer::SCHEDULED_DELIVERY_TIME_MS_HEADER => schedule_time
    })
  end
  headers
end
publish(destination_name, object, delay_ms = 0, safe = true, binary = false, headers = {}) click to toggle source
# File lib/messagebus/client.rb, line 99
def publish(destination_name, object, delay_ms = 0, safe = true, binary = false, headers = {})

  if !(@config.enable_auto_init_connections)
    logger.warn "Config['enable_auto_init_connections'] is false, not publishing destination_name=#{destination_name}, message_contents=#{object.inspect}"
    false
  else
    producer = cluster_map.find(destination_name)
    if producer.nil?
      logger.error "Not publishing due to unconfigured destionation name. destination_name=#{destination_name}, message=#{object.inspect}"
      raise InvalidDestinationError, "Destination #{destination_name} not found"
    end

    if binary
      message = Messagebus::Message.create(object, nil, binary)
    else
      message = Messagebus::Message.create(object)
    end

    logger.info "Publishing to destination_name=#{destination_name}, message_id=#{message.message_id}, message_contents=#{object.inspect}"

    begin
      publish_result = nil
      duration = Benchmark.realtime do
        publish_result = producer.publish(destination_name, message, headers(delay_ms).merge(headers), safe)
      end
      duration = (duration * 1_000).round

      if publish_result
        logger.info "Message publishing to #{destination_name} took #{duration} result=success destination_name=#{destination_name}, message_id=#{message.message_id}, duration=#{duration}ms"
        true
      else
        logger.error "Failed to publish message result=fail destination_name=#{destination_name}, message_id=#{message.message_id}, duration=#{duration}ms, message_contents=#{object.inspect}"
        false
      end
    rescue => e
      logger.error "Failed to publish message result=error destination_name=#{destination_name}, message_id=#{message.message_id}, duration=#{duration}ms, message_contents=#{object.inspect}, error=#{e.inspect}, backtrace=#{e.backtrace.join("|")}"
      false
    end
  end
end
reload_config_on_interval(config, interval = 300) click to toggle source
# File lib/messagebus/client.rb, line 140
def reload_config_on_interval(config, interval = 300)
  logger.info "Relaoding client configs after interval=#{interval}"
  now = Time.now
  if(now - @last_reload_time) >= interval
    @cluster_map.update_config(config)
    @last_reload_time = now
  end
end
start() click to toggle source

Starts up all the connections to the bus. Optionally takes a block to which it yields self. When the block is passed, it will auto close the connections after the block finishes.

# File lib/messagebus/client.rb, line 82
def start
  if @config.enable_auto_init_connections
    logger.info "auto enable connections set, starting clusters."
    @cluster_map.start
  else
    logger.info "Config['enable_auto_init_connections'] is false, will not start any messagebus producers."
  end
end
stop() click to toggle source
# File lib/messagebus/client.rb, line 91
def stop
  cluster_map.stop
end