class PubSubModelSync::ServiceKafka
Constants
- LISTEN_SETTINGS
- PRODUCER_SETTINGS
- PUBLISH_SETTINGS
- QTY_WORKERS
Attributes
consumer[RW]
Kafka consumer assigned by start_consumer via service.consumer(group_id: config.subscription_key).
service[RW]
Kafka client built in the constructor with Kafka.new from config.kafka_connection.
topic_names[RW]
@!attribute topic_names
(Array<String>): names of the topics this service subscribes to, e.g. ['topic 1', 'topic 2']
Public Class Methods
new()
click to toggle source
# File lib/pub_sub_model_sync/service_kafka.rb, line 19
# Builds the Kafka client from `config.kafka_connection` and makes sure the
# configured topic(s) exist.
#
# `config.kafka_connection` is read as a two-element list: the first entry is
# passed positionally to Kafka.new and the second is an options hash.
# NOTE(review): presumably [seed_brokers, options] — confirm against the
# configuration documentation.
def initialize
  brokers, options = config.kafka_connection
  # Work on a copy so the hash owned by the configuration is not modified,
  # and tolerate a connection setting that carries no options hash.
  options = (options || {}).dup
  options[:client_id] ||= config.subscription_key
  # The options are splatted as keywords: on Ruby 3+ a trailing positional
  # hash is no longer converted to keyword arguments.
  @service = Kafka.new(brokers, **options)
  @topic_names = ensure_topics(Array(config.topic_name || 'model_sync'))
end
Public Instance Methods
listen_messages()
click to toggle source
# File lib/pub_sub_model_sync/service_kafka.rb, line 26
# Starts the consumer and passes every received message to #process_message.
#
# A PubSubModelSync::Runner::ShutDown raised while listening is treated as a
# normal stop; any other StandardError is logged at :error level and not
# re-raised.
def listen_messages
  log('Listener starting...')
  start_consumer
  # LISTEN_SETTINGS must be splatted: on Ruby 3+ a positional hash is not
  # converted to keyword arguments, so passing it bare would raise an
  # ArgumentError that the generic rescue below would swallow.
  consumer.each_message(**LISTEN_SETTINGS, &method(:process_message))
rescue PubSubModelSync::Runner::ShutDown
  log('Listener stopped')
rescue => e
  log("Error listening message: #{[e.message, e.backtrace]}", :error)
end
publish(payload)
click to toggle source
# File lib/pub_sub_model_sync/service_kafka.rb, line 36
# Publishes the encoded payload to every target topic.
#
# Target topics come from payload.headers[:topic_name] (a single name or a
# list); when that header is missing, config.default_topic_name is used.
#
# @param payload object responding to #headers (Hash-like)
def publish(payload)
  message_topics = Array(payload.headers[:topic_name] || config.default_topic_name)
  message_topics.each do |topic_name|
    # The settings hash is splatted into keywords: on Ruby 3+ a trailing
    # positional hash is not converted to keyword arguments.
    producer.produce(encode_payload(payload), **message_settings(payload, topic_name))
  end
end
stop()
click to toggle source
# File lib/pub_sub_model_sync/service_kafka.rb, line 43
# Logs that the listener is stopping and asks the current consumer to stop.
def stop
  log('Listener stopping...')
  consumer.stop
end
Private Instance Methods
ensure_topics(names)
click to toggle source
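Checks that each given topic exists and creates any topic that is missing. @param names (Array<String>, String) topic name or list of topic names to verify. @return (Array<String>, String) the same `names` value that was passed in.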
# File lib/pub_sub_model_sync/service_kafka.rb, line 81
# Checks that every given topic exists and creates the ones that are missing.
#
# The list of known topics is fetched from the service once and cached in
# @known_topics, so later calls do not query the service again. The cache is
# seeded with the service's own topic list (not only with the requested
# names); otherwise a later call for a topic that already exists on the
# broker would be treated as missing and created a second time.
#
# @param names (Array<String>,String) topic name or list of topic names
# @return (Array<String>,String) the `names` argument, unchanged
def ensure_topics(names)
  requested = Array(names)
  @known_topics ||= service.topics
  (requested - @known_topics).each { |name| service.create_topic(name) }
  @known_topics = (@known_topics + requested).uniq
  names
end
message_settings(payload, topic_name)
click to toggle source
# File lib/pub_sub_model_sync/service_kafka.rb, line 50
# Builds the options used when producing a message for one topic.
# Entries from PUBLISH_SETTINGS are merged last, so they win on key clashes.
#
# @param payload object responding to #headers (Hash-like)
# @param topic_name (String) topic to publish to; created when missing
# @return (Hash) topic, partition key and headers merged with PUBLISH_SETTINGS
def message_settings(payload, topic_name)
  base_settings = {
    topic: ensure_topics(topic_name),
    partition_key: payload.headers[:ordering_key],
    headers: { SERVICE_KEY => true }
  }
  base_settings.merge(PUBLISH_SETTINGS)
end
process_message(message)
click to toggle source
Calls superclass method
PubSubModelSync::ServiceBase#process_message
# File lib/pub_sub_model_sync/service_kafka.rb, line 74
# Forwards the message value to the superclass implementation, but only for
# messages whose headers contain SERVICE_KEY; other messages are ignored.
#
# @param message object responding to #headers and #value
def process_message(message)
  return unless message.headers[SERVICE_KEY]

  super(message.value)
end
producer()
click to toggle source
# File lib/pub_sub_model_sync/service_kafka.rb, line 67
# Returns the producer stored on the class, creating it on first use.
#
# When a producer has to be created, an at_exit hook is registered to shut it
# down when the process exits.
#
# @return the producer stored in self.class.producer
def producer
  self.class.producer ||= begin
    # Safe navigation: if creating the producer fails below, the hook must
    # not raise NoMethodError on nil at exit.
    at_exit { self.class.producer&.shutdown }
    # Settings are splatted into keywords: on Ruby 3+ a positional hash is
    # not converted to keyword arguments.
    service.async_producer(**PRODUCER_SETTINGS)
  end
end
start_consumer()
click to toggle source
# File lib/pub_sub_model_sync/service_kafka.rb, line 58
# Creates the consumer, using config.subscription_key as its group id, and
# subscribes it to every entry of topic_names, logging each subscription.
def start_consumer
  group = config.subscription_key
  @consumer = service.consumer(group_id: group)
  topic_names.each do |name|
    log("Subscribed to topic: #{name} as #{group}")
    consumer.subscribe(name)
  end
end