class Freddy

Constants

DEFAULT_MAX_CONCURRENCY
FREDDY_TOPIC_EXCHANGE_NAME
VERSION

Public Class Methods

build(logger = Logger.new($stdout), max_concurrency: DEFAULT_MAX_CONCURRENCY, **config) click to toggle source

Creates a new freddy instance

@param [Logger] logger

instance of a logger, defaults to the STDOUT logger

@param [Hash] config

rabbitmq connection information

@option config [String] :host (‘localhost’) @option config [Integer] :port (5672) @option config [String] :user (‘guest’) @option config [String] :pass (‘guest’) @option config [Integer] :max_concurrency (4)

@return [Freddy]

@example

Freddy.build(Logger.new($stdout), user: 'thumper', pass: 'howdy')
# File lib/freddy.rb, line 32
# Factory entry point: asks the adapter chosen by `Adapters.determine` to
# connect with the remaining keyword options, then wraps that connection
# in a new instance together with the logger and the concurrency limit.
def self.build(logger = Logger.new($stdout), max_concurrency: DEFAULT_MAX_CONCURRENCY, **config)
  adapter = Adapters.determine
  new(adapter.connect(config), logger, max_concurrency)
end
new(connection, logger, max_concurrency) click to toggle source
# File lib/freddy.rb, line 42
# Keeps the connection, logger and concurrency limit (stored as the
# prefetch buffer size), then builds the two producers. Each producer
# receives the result of its own `connection.create_channel` call.
def initialize(connection, logger, max_concurrency)
  @connection = connection
  @logger = logger
  @prefetch_buffer_size = max_concurrency

  forget_channel = connection.create_channel
  @send_and_forget_producer = Producers::SendAndForgetProducer.new(forget_channel, logger)

  response_channel = connection.create_channel
  @send_and_wait_response_producer = Producers::SendAndWaitResponseProducer.new(response_channel, logger)
end
tracer() click to toggle source

@private

# File lib/freddy.rb, line 38
# Memoized tracer lookup: the tracer named 'freddy' (tagged with
# Freddy::VERSION) is requested from the OpenTelemetry tracer provider on
# first use and cached in @tracer for later calls.
def self.tracer
  return @tracer if @tracer

  @tracer = OpenTelemetry.tracer_provider.tracer('freddy', Freddy::VERSION)
end

Public Instance Methods

close() click to toggle source

Closes the connection with message queue

@return [void]

@example

freddy.close
# File lib/freddy.rb, line 229
# Closes the message queue connection by delegating to `@connection.close`.
#
# @return [void]
def close
  @connection.close
end
deliver(destination, payload, options = {}) click to toggle source

Sends a message to given destination

This is *send and forget* type of delivery. It sends a message to given destination and does not wait for response. This is useful when there are multiple consumers that are using tap_into or you just do not care about the response.

@param [String] destination

the queue name

@param [Hash] payload

the payload that can be serialized to json

@param [Hash] options

the options for delivery

@option options [Integer] :timeout (0)

discards the message after given seconds if nobody consumes it. Message
won't be discarded if timeout is set to 0 (default).

@option options [String] :compress (nil)

- 'zlib' - compresses the payload with zlib

@option options [Hash] :headers (nil)

Arbitrary headers to add as message metadata

@return [void]

@example

freddy.deliver 'Metrics', user_id: 5, metric: 'signed_in'
# File lib/freddy.rb, line 175
# Publishes a send-and-forget message through the send-and-forget producer.
#
# Only options that are actually set are forwarded to the producer:
# - `:timeout` (seconds) becomes `:expiration` in milliseconds when positive
# - `:compress` becomes `:content_encoding`
# - `:headers` is passed through unchanged
#
# An explicit `timeout: nil` is treated like the default of 0 (no
# expiration) instead of raising NoMethodError, matching how nil `:compress`
# and `:headers` values are already ignored.
#
# @param [String] destination the queue name
# @param [Hash] payload the payload handed to the producer
# @param [Hash] options delivery options (`:timeout`, `:compress`, `:headers`)
# @return the result of the producer's `produce` call
def deliver(destination, payload, options = {})
  # `fetch` only falls back when the key is absent; `|| 0` also covers nil.
  timeout = options.fetch(:timeout, 0) || 0
  compression_algorithm = options.fetch(:compress, nil)
  headers = options[:headers]

  opts = {}
  opts[:expiration] = (timeout * 1000).to_i if timeout.positive?
  opts[:content_encoding] = compression_algorithm if compression_algorithm
  opts[:headers] = headers if headers

  @send_and_forget_producer.produce(destination, payload, opts)
end
deliver_with_response(destination, payload, options = {}) click to toggle source

Sends a message and waits for the response

@param [String] destination

the queue name

@param [Hash] payload

the payload that can be serialized to json

@param [Hash] options

the options for delivery

@option options [Integer] :timeout (3)

throws a time out exception after given seconds when there is no response

@option options [Boolean] :delete_on_timeout (true)

discards the message when timeout error is raised

@raise [Freddy::TimeoutError]

if nobody responded to the request

@raise [Freddy::InvalidRequestError]

if the responder responded with an error response

@return [Hash] the response

@example

begin
  response = freddy.deliver_with_response 'Users', type: 'fetch_all'
  puts "Got response #{response}"
rescue Freddy::TimeoutError
  puts "Service unavailable"
rescue Freddy::InvalidRequestError => e
  puts "Got error response: #{e.response}"
end
# File lib/freddy.rb, line 215
# Publishes a request through the send-and-wait-response producer and
# returns whatever that producer's `produce` call returns.
#
# `:timeout` (default 3) is forwarded as `timeout_in_seconds`, and
# `:delete_on_timeout` (default true) is forwarded under the same name.
def deliver_with_response(destination, payload, options = {})
  produce_options = {
    timeout_in_seconds: options.fetch(:timeout, 3),
    delete_on_timeout: options.fetch(:delete_on_timeout, true)
  }

  @send_and_wait_response_producer.produce(destination, payload, **produce_options)
end
respond_to(destination, &callback) click to toggle source

Listens and responds to messages

This consumes messages on a given destination. It is useful for messages that have to be processed once and then a result must be sent.

@param [String] destination

the queue name

@yieldparam [Hash<Symbol => Object>] message

Received message as a ruby hash with symbolized keys

@yieldparam [#success, #error] handler

Handler for responding to messages. Use handler#success for successful
response and handler#error for error response.

@return [#shutdown]

@example

freddy.respond_to 'RegistrationService' do |attributes, handler|
  if id = register(attributes)
    handler.success(id: id)
  else
    handler.error(message: 'Can not do')
  end
end
# File lib/freddy.rb, line 80
# Starts a request/response consumer for +destination+.
#
# Logs at info level, opens a channel whose prefetch equals the configured
# prefetch buffer size, wires a reply producer and handler-adapter factory
# onto that channel, and hands everything (plus a fixed thread pool of the
# same size) to Consumers::RespondToConsumer.consume along with the block.
#
# Returns whatever Consumers::RespondToConsumer.consume returns.
def respond_to(destination, &callback)
  @logger.info "Listening for requests on #{destination}"

  reply_channel = @connection.create_channel(prefetch: @prefetch_buffer_size)
  reply_producer = Producers::ReplyProducer.new(reply_channel, @logger)
  adapter_factory = MessageHandlerAdapters::Factory.new(reply_producer)
  worker_pool = Concurrent::FixedThreadPool.new(@prefetch_buffer_size)

  Consumers::RespondToConsumer.consume(
    thread_pool: worker_pool,
    destination: destination,
    channel: reply_channel,
    handler_adapter_factory: adapter_factory,
    &callback
  )
end
tap_into(pattern_or_patterns, options = {}, &callback) click to toggle source

Listens for messages without consuming them

This listens for messages on a given destination or destinations without consuming them. It is useful for general messages that two or more clients are interested in.

@param [String] pattern_or_patterns

the destination pattern. Use `#` wildcard for matching 0 or more words.
Use `*` to match exactly one word.

@param [Hash] options @option options [String] :group

only one of the listeners in given group will receive a message. All
listeners will receive a message if the group is not specified.

@option options [Boolean] :durable

Should the consumer queue be durable? Default is `false`. This option can
be used only in combination with option `:group`.

@option options [Symbol] :on_exception

Defines consumer's behaviour when the callback fails to process a message
and raises an exception. Can be one of `:ack`, `:reject` or `:requeue`.
`:ack` simply acknowledges the message and re-raises the exception. `:reject`
rejects the message without requeueing it. `:requeue` rejects the message with
`requeue` flag.

@option options [String] :exchange_name

Exchange to bind to. Default is `freddy-topic`.

@yield [message] Yields received message to the block. @yieldparam [Object] payload

Yields the received message's payload.

@yieldparam [String] routing_key

Yields the received message's routing key.

@yieldparam [Time] timestamp

Yields received message's timestamp.

@return [#shutdown]

@example

freddy.tap_into 'notifications.*' do |message|
  puts "Notification showed #{message.inspect}"
end
# File lib/freddy.rb, line 137
# Starts a tap-into consumer for one pattern or a list of patterns.
#
# Logs at debug level, builds a fixed thread pool sized by the prefetch
# buffer size, normalizes the patterns into an array, opens a channel with
# the same prefetch value, and passes all of it (together with the options
# hash and the block) to Consumers::TapIntoConsumer.consume.
#
# Returns whatever Consumers::TapIntoConsumer.consume returns.
def tap_into(pattern_or_patterns, options = {}, &callback)
  @logger.debug "Tapping into messages that match #{pattern_or_patterns}"

  # Creation order (pool, then channel) is kept as in the original call.
  worker_pool = Concurrent::FixedThreadPool.new(@prefetch_buffer_size)
  patterns = Array(pattern_or_patterns)
  tap_channel = @connection.create_channel(prefetch: @prefetch_buffer_size)

  Consumers::TapIntoConsumer.consume(
    thread_pool: worker_pool,
    patterns: patterns,
    channel: tap_channel,
    options: options,
    &callback
  )
end