class Poseidon::SyncProducer

Used by Producer for sending messages to the Kafka cluster.

You should not use this interface directly.

Fetches metadata at appropriate times, builds MessagesToSend, and handles the MessageBatchToSend lifecycle.

Who is responsible for fetching metadata from broker seed list?

Do we want to be fetching from real live brokers eventually?

@api private

Constants

OPTION_DEFAULTS

Attributes

ack_timeout_ms[R]
client_id[R]
max_send_retries[R]
metadata_refresh_interval_ms[R]
required_acks[R]
retry_backoff_ms[R]
socket_timeout_ms[R]

Public Class Methods

new(client_id, seed_brokers, options = {}) click to toggle source
# File lib/poseidon/sync_producer.rb, line 29
# Sets up a producer: stores the client id, consumes the option hash, and
# builds the collaborators used when sending.
#
# @param client_id [Object] identifier stored on the producer and handed to the broker pool
# @param seed_brokers [Object] passed through unchanged to BrokerPool.new
# @param options [Hash] producer options; consumed by handle_options
def initialize(client_id, seed_brokers, options = {})
  @client_id = client_id

  # Work on a copy: handle_options deletes keys as it consumes them, and the
  # caller's hash must stay untouched.
  private_options = options.dup
  handle_options(private_options)

  # handle_options has populated @partitioner and the socket timeout by now.
  @cluster_metadata = ClusterMetadata.new
  @message_conductor = MessageConductor.new(@cluster_metadata, @partitioner)
  @broker_pool = BrokerPool.new(client_id, seed_brokers, socket_timeout_ms)
end

Public Instance Methods

close() click to toggle source
# File lib/poseidon/sync_producer.rb, line 72
# Closes the broker pool held by this producer.
#
# @return [Object] whatever the broker pool's #close returns
def close
  pool = @broker_pool
  pool.close
end
Also aliased as: shutdown
send_messages(messages) click to toggle source
# File lib/poseidon/sync_producer.rb, line 39
# Sends a batch of messages, retrying up to @max_send_retries extra times.
#
# @param messages [#empty?] messages to deliver
# @return [nil, true] nil when +messages+ is empty, true once nothing is pending
# @raise [RuntimeError] when messages are still pending after the last attempt
def send_messages(messages)
  return if messages.empty?

  batch = MessagesToSend.new(messages, @cluster_metadata)

  refresh_metadata(batch.topic_set) if refresh_interval_elapsed?
  ensure_metadata_available_for_topics(batch)

  attempts = @max_send_retries + 1
  attempts.times do
    batch.messages_for_brokers(@message_conductor).each do |messages_for_broker|
      delivered = send_to_broker(messages_for_broker)
      batch.successfully_sent(delivered) if delivered
    end

    # Stop once everything is delivered, or right away when retries are disabled.
    break if !batch.pending_messages? || @max_send_retries == 0

    # Back off, then refresh metadata before the next pass.
    Kernel.sleep retry_backoff_ms / 1000.0
    refresh_metadata(batch.topic_set)
  end

  raise "Failed to send all messages: #{batch.messages} remaining" if batch.pending_messages?

  true
end
shutdown()
Alias for: close

Private Instance Methods

ensure_metadata_available_for_topics(messages_to_send) click to toggle source
# File lib/poseidon/sync_producer.rb, line 80
# Makes up to three metadata fetches until the batch no longer needs metadata.
# The second and third fetches are each preceded by a 5 second sleep.
#
# @param messages_to_send [#needs_metadata?, #topic_set] batch being prepared
# @return [nil] as soon as metadata is no longer needed
# @raise [Errors::UnableToFetchMetadata] if metadata is still needed after the third fetch
def ensure_metadata_available_for_topics(messages_to_send)
  return unless messages_to_send.needs_metadata?

  1.upto(3) do |attempt|
    # Only the retries wait; the first fetch happens immediately.
    sleep 5 if attempt > 1

    Poseidon.logger.debug { "Fetching metadata for #{messages_to_send.topic_set}. (Attempt #{attempt})" }
    refresh_metadata(messages_to_send.topic_set)
    return unless messages_to_send.needs_metadata?
  end

  raise Errors::UnableToFetchMetadata
end
handle_option(options, sym) click to toggle source
# File lib/poseidon/sync_producer.rb, line 117
# Removes +sym+ from +options+ and returns its value, falling back to
# OPTION_DEFAULTS[sym] when the removed value is nil or false (including
# when the key was absent).
#
# @param options [Hash] option hash; mutated (the key is deleted)
# @param sym [Symbol] option name
# @return [Object] supplied value or the default
def handle_option(options, sym)
  supplied = options.delete(sym)
  return supplied if supplied

  OPTION_DEFAULTS[sym]
end
handle_options(options) click to toggle source
# File lib/poseidon/sync_producer.rb, line 97
# Consumes every recognised key from +options+ (via handle_option) into
# instance state, then rejects whatever is left over.
#
# @param options [Hash] option hash; recognised keys are deleted from it
# @return [nil]
# @raise [ArgumentError] if any unrecognised keys remain
def handle_options(options)
  # Options stored as-is in an instance variable of the same name.
  [
    :ack_timeout_ms,
    :socket_timeout_ms,
    :retry_backoff_ms,
    :metadata_refresh_interval_ms,
    :required_acks,
    :max_send_retries
  ].each do |name|
    instance_variable_set("@#{name}", handle_option(options, name))
  end

  codec = handle_option(options, :compression_codec)
  compressed_topics = handle_option(options, :compressed_topics)
  @compression_config = ProducerCompressionConfig.new(codec, compressed_topics)

  @partitioner = handle_option(options, :partitioner)

  # Everything recognised has been deleted above; leftovers are unknown.
  leftover = options.keys
  raise ArgumentError, "Unknown options: #{leftover.inspect}" if leftover.any?
end
refresh_interval_elapsed?() click to toggle source
# File lib/poseidon/sync_producer.rb, line 121
# Reports whether cluster metadata should be refreshed before sending.
#
# True when metadata has never been refreshed, or when more than
# metadata_refresh_interval_ms milliseconds have passed since the last refresh.
#
# @return [Boolean]
def refresh_interval_elapsed?
  last_refreshed_at = @cluster_metadata.last_refreshed_at
  return true if last_refreshed_at.nil?

  # Time#- yields seconds; the configured interval is in milliseconds, so
  # convert before comparing.
  elapsed_ms = (Time.now - last_refreshed_at) * 1000.0
  elapsed_ms > metadata_refresh_interval_ms
end
refresh_metadata(topics) click to toggle source
# File lib/poseidon/sync_producer.rb, line 126
# Fetches metadata for the requested topics plus every topic the cluster
# metadata already lists, applies it, and passes the resulting broker list
# to the broker pool.
#
# @param topics [#dup, #add] topics to refresh; not mutated (a copy is extended)
# @return [Object] whatever BrokerPool#update_known_brokers returns
def refresh_metadata(topics)
  wanted = topics.dup
  @cluster_metadata.topics.each { |known_topic| wanted.add(known_topic) }

  fresh_metadata = @broker_pool.fetch_metadata(wanted)
  @cluster_metadata.update(fresh_metadata)

  @broker_pool.update_known_brokers(@cluster_metadata.brokers)
end
send_to_broker(messages_for_broker) click to toggle source
# File lib/poseidon/sync_producer.rb, line 137
# Issues one produce call for a per-broker group of messages.
#
# @param messages_for_broker [Object] group exposing broker_id, messages,
#   build_protocol_objects and successfully_sent
# @return [false, Object] false when the broker id is -1 or the connection
#   fails; the group's messages when required_acks is 0; otherwise the result
#   of messages_for_broker.successfully_sent(response)
def send_to_broker(messages_for_broker)
  broker_id = messages_for_broker.broker_id
  # NOTE(review): -1 presumably marks messages without a usable broker — confirm.
  return false if broker_id == -1

  payload = messages_for_broker.build_protocol_objects(@compression_config)

  Poseidon.logger.debug { "Sending messages to broker #{broker_id}" }
  response = @broker_pool.execute_api_call(
    broker_id, :produce, required_acks, ack_timeout_ms, payload
  )

  # With required_acks == 0 the response is not consulted; the whole group is
  # reported as sent.
  return messages_for_broker.messages if required_acks == 0

  messages_for_broker.successfully_sent(response)
rescue Connection::ConnectionFailedError
  false
end