class Megaphone::Client

Constants

FLUENT_DEFAULT_PORT
VERSION

Attributes

logger[R]
origin[R]

Public Class Methods

new(config) click to toggle source

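Main entry point for apps using this library. The :origin setting is required. If :host and :port are not passed, they default to the MEGAPHONE_FLUENT_HOST and MEGAPHONE_FLUENT_PORT environment variables; if the port is set in neither place, FLUENT_DEFAULT_PORT is used. Note that a missing overflow_handler will result in a default handler being assigned if the FluentLogger is used.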

# File lib/megaphone/client.rb, line 17
# Builds a client from a settings collection that responds to +fetch+.
#
# Settings read from +config+:
# * +:origin+           - required; +fetch+ is called without a fallback.
# * +:host+             - falls back to ENV['MEGAPHONE_FLUENT_HOST'].
# * +:port+             - falls back to ENV['MEGAPHONE_FLUENT_PORT'], then
#                         to FLUENT_DEFAULT_PORT.
# * +:overflow_handler+ - falls back to nil.
#
# The resolved host, port and overflow handler are handed to
# Megaphone::Client::Logger.create, and the result is kept as the logger.
def initialize(config)
  @origin = config.fetch(:origin)
  fluent_host = config.fetch(:host) { ENV['MEGAPHONE_FLUENT_HOST'] }
  fluent_port = config.fetch(:port) { ENV['MEGAPHONE_FLUENT_PORT'] || FLUENT_DEFAULT_PORT }
  on_overflow = config.fetch(:overflow_handler, nil)
  @logger = Megaphone::Client::Logger.create(fluent_host, fluent_port, on_overflow)
end

Public Instance Methods

close() click to toggle source
# File lib/megaphone/client.rb, line 37
# Closes the client by delegating to +close+ on the underlying logger.
def close
  logger.close
end
publish!(topic, subtopic, schema, partition_key, payload) click to toggle source
# File lib/megaphone/client.rb, line 25
# Builds an Event from the arguments (plus this client's origin), validates
# it, and posts its hash form to the logger under +topic+.
#
# Returns nil when the logger accepts the post.
#
# Raises MegaphoneInvalidEventError (message: the event's errors joined with
# ", ") when the event is not valid; nothing is posted in that case.
# Raises MegaphoneMessageDelayWarning when the post fails and
# transient_error? is true for the logger's last error.
# Raises MegaphoneUnavailableError when the post fails for any other reason.
# Both post-failure errors are built from the last error's message and the
# event's stream_id.
def publish!(topic, subtopic, schema, partition_key, payload)
  event = Event.new(topic, subtopic, origin, schema, partition_key, payload)
  raise MegaphoneInvalidEventError.new(event.errors.join(', ')) unless event.valid?

  # Happy path: the logger took the event, nothing more to do.
  return if logger.post(topic, event.to_hash)

  failure_class =
    if transient_error?(logger.last_error)
      MegaphoneMessageDelayWarning
    else
      MegaphoneUnavailableError
    end
  raise failure_class.new(logger.last_error.message, event.stream_id)
end

Private Instance Methods

transient_error?(err) click to toggle source
# File lib/megaphone/client.rb, line 43
# Reports whether +err+ matches one of the error messages this client treats
# as transient.
#
# Returns true when err.message contains "Connection reset by peer" or
# "Broken pipe" (case-sensitive substring match), false otherwise.
def transient_error?(err)
  text = err.message
  ["Connection reset by peer", "Broken pipe"].any? { |fragment| text.include?(fragment) }
end