class Poseidon::SyncProducer
Used by Producer
for sending messages to the kafka cluster.
You should not use this interface directly
Fetches metadata at appropriate times. Builds MessagesToSend
Handle MessageBatchToSend lifecyle
Who is responsible for fetching metadata from broker seed list?
Do we want to be fetching from real live brokers eventually?
@api private
Constants
- OPTION_DEFAULTS
Attributes
ack_timeout_ms[R]
client_id[R]
max_send_retries[R]
metadata_refresh_interval_ms[R]
required_acks[R]
retry_backoff_ms[R]
socket_timeout_ms[R]
Public Class Methods
new(client_id, seed_brokers, options = {})
click to toggle source
# File lib/poseidon/sync_producer.rb, line 29 def initialize(client_id, seed_brokers, options = {}) @client_id = client_id handle_options(options.dup) @cluster_metadata = ClusterMetadata.new @message_conductor = MessageConductor.new(@cluster_metadata, @partitioner) @broker_pool = BrokerPool.new(client_id, seed_brokers, socket_timeout_ms) end
Public Instance Methods
close()
click to toggle source
# File lib/poseidon/sync_producer.rb, line 72 def close @broker_pool.close end
Also aliased as: shutdown
send_messages(messages)
click to toggle source
# File lib/poseidon/sync_producer.rb, line 39 def send_messages(messages) return if messages.empty? messages_to_send = MessagesToSend.new(messages, @cluster_metadata) if refresh_interval_elapsed? refresh_metadata(messages_to_send.topic_set) end ensure_metadata_available_for_topics(messages_to_send) (@max_send_retries+1).times do messages_to_send.messages_for_brokers(@message_conductor).each do |messages_for_broker| if sent = send_to_broker(messages_for_broker) messages_to_send.successfully_sent(sent) end end if !messages_to_send.pending_messages? || @max_send_retries == 0 break else Kernel.sleep retry_backoff_ms / 1000.0 refresh_metadata(messages_to_send.topic_set) end end if messages_to_send.pending_messages? raise "Failed to send all messages: #{messages_to_send.messages} remaining" else true end end
Private Instance Methods
ensure_metadata_available_for_topics(messages_to_send)
click to toggle source
# File lib/poseidon/sync_producer.rb, line 80 def ensure_metadata_available_for_topics(messages_to_send) return if !messages_to_send.needs_metadata? Poseidon.logger.debug { "Fetching metadata for #{messages_to_send.topic_set}. (Attempt 1)" } refresh_metadata(messages_to_send.topic_set) return if !messages_to_send.needs_metadata? 2.times do |n| sleep 5 Poseidon.logger.debug { "Fetching metadata for #{messages_to_send.topic_set}. (Attempt #{n+2})" } refresh_metadata(messages_to_send.topic_set) return if !messages_to_send.needs_metadata? end raise Errors::UnableToFetchMetadata end
handle_option(options, sym)
click to toggle source
# File lib/poseidon/sync_producer.rb, line 117 def handle_option(options, sym) options.delete(sym) || OPTION_DEFAULTS[sym] end
handle_options(options)
click to toggle source
# File lib/poseidon/sync_producer.rb, line 97 def handle_options(options) @ack_timeout_ms = handle_option(options, :ack_timeout_ms) @socket_timeout_ms = handle_option(options, :socket_timeout_ms) @retry_backoff_ms = handle_option(options, :retry_backoff_ms) @metadata_refresh_interval_ms = handle_option(options, :metadata_refresh_interval_ms) @required_acks = handle_option(options, :required_acks) @max_send_retries = handle_option(options, :max_send_retries) @compression_config = ProducerCompressionConfig.new( handle_option(options, :compression_codec), handle_option(options, :compressed_topics)) @partitioner = handle_option(options, :partitioner) raise ArgumentError, "Unknown options: #{options.keys.inspect}" if options.keys.any? end
refresh_interval_elapsed?()
click to toggle source
# File lib/poseidon/sync_producer.rb, line 121 def refresh_interval_elapsed? @cluster_metadata.last_refreshed_at.nil? || (Time.now - @cluster_metadata.last_refreshed_at) > metadata_refresh_interval_ms end
refresh_metadata(topics)
click to toggle source
# File lib/poseidon/sync_producer.rb, line 126 def refresh_metadata(topics) topics_to_refresh = topics.dup @cluster_metadata.topics.each do |topic| topics_to_refresh.add(topic) end @cluster_metadata.update(@broker_pool.fetch_metadata(topics_to_refresh)) @broker_pool.update_known_brokers(@cluster_metadata.brokers) end
send_to_broker(messages_for_broker)
click to toggle source
# File lib/poseidon/sync_producer.rb, line 137 def send_to_broker(messages_for_broker) return false if messages_for_broker.broker_id == -1 to_send = messages_for_broker.build_protocol_objects(@compression_config) Poseidon.logger.debug { "Sending messages to broker #{messages_for_broker.broker_id}" } response = @broker_pool.execute_api_call(messages_for_broker.broker_id, :produce, required_acks, ack_timeout_ms, to_send) if required_acks == 0 messages_for_broker.messages else messages_for_broker.successfully_sent(response) end rescue Connection::ConnectionFailedError false end