class Megaphone::Client
Constants
- FLUENT_DEFAULT_PORT
- VERSION
Attributes
logger[R]
origin[R]
Public Class Methods
new(config)
click to toggle source
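Main entry point for apps using this library. If :host or :port is not passed in config, the value falls back to the MEGAPHONE_FLUENT_HOST or MEGAPHONE_FLUENT_PORT environment variable (and, for the port, to FLUENT_DEFAULT_PORT when the variable is unset). Note that a missing overflow_handler will result in a default handler being assigned if the FluentLogger is used.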
# File lib/megaphone/client.rb, line 17
# Builds a client from +config+.
#
# Keys read from +config+:
# * +:origin+           - required; fetched without a default, so a Hash
#                         lacking this key raises KeyError.
# * +:host+             - optional; falls back to ENV['MEGAPHONE_FLUENT_HOST'].
# * +:port+             - optional; falls back to ENV['MEGAPHONE_FLUENT_PORT'],
#                         then to FLUENT_DEFAULT_PORT.
# * +:overflow_handler+ - optional; nil when absent.
#
# The resolved host, port and overflow handler are handed to
# Megaphone::Client::Logger.create and the result is kept in @logger.
def initialize(config)
  @origin = config.fetch(:origin)

  env_host = ENV['MEGAPHONE_FLUENT_HOST']
  # NOTE(review): a port taken from ENV is a String, while FLUENT_DEFAULT_PORT
  # is presumably an Integer - confirm Logger.create accepts both.
  env_port = ENV['MEGAPHONE_FLUENT_PORT'] || FLUENT_DEFAULT_PORT

  @logger = Megaphone::Client::Logger.create(
    config.fetch(:host, env_host),
    config.fetch(:port, env_port),
    config.fetch(:overflow_handler, nil)
  )
end
Public Instance Methods
close()
click to toggle source
# File lib/megaphone/client.rb, line 37
# Closes the underlying logger by delegating to +logger.close+ and returns
# whatever that call returns.
def close
  logger.close
end
publish!(topic, subtopic, schema, partition_key, payload)
click to toggle source
# File lib/megaphone/client.rb, line 25
# Wraps the arguments (plus this client's +origin+) in an Event and posts its
# hash form to the logger under +topic+.
#
# Returns nil when the logger reports a successful post.
#
# Raises MegaphoneInvalidEventError (message: the event's errors joined with
# ", ") when the event is not valid; nothing is posted in that case.
# Raises MegaphoneMessageDelayWarning when the post fails and the logger's
# last error is classified as transient by +transient_error?+.
# Raises MegaphoneUnavailableError when the post fails for any other reason.
# Both failure errors carry the last error's message and the event's stream_id.
def publish!(topic, subtopic, schema, partition_key, payload)
  event = Event.new(topic, subtopic, origin, schema, partition_key, payload)

  unless event.valid?
    validation_message = event.errors.join(', ')
    raise MegaphoneInvalidEventError.new(validation_message)
  end

  # A truthy result from post means there is nothing left to report.
  return if logger.post(topic, event.to_hash)

  failure_class =
    if transient_error?(logger.last_error)
      MegaphoneMessageDelayWarning
    else
      MegaphoneUnavailableError
    end

  raise failure_class.new(logger.last_error.message, event.stream_id)
end
Private Instance Methods
transient_error?(err)
click to toggle source
# File lib/megaphone/client.rb, line 43
# Reports whether +err+ looks like a transient connection failure.
#
# +err+ is any object responding to +message+ (typically an exception), or
# nil. Returns true when the message contains "Connection reset by peer" or
# "Broken pipe"; returns false otherwise, including when +err+ is nil.
def transient_error?(err)
  # No recorded error means there is nothing to classify as transient.
  return false if err.nil?

  message = err.message.to_s
  ['Connection reset by peer', 'Broken pipe'].any? do |fragment|
    message.include?(fragment)
  end
end