class PubSubModelSync::ServiceKafka

Constants

LISTEN_SETTINGS
PRODUCER_SETTINGS
PUBLISH_SETTINGS
QTY_WORKERS

Attributes

consumer[RW]

Kafka consumer for this service; assigned in start_consumer from service.consumer(group_id: config.subscription_key).

service[RW]

Kafka client built in initialize with Kafka.new from config.kafka_connection.

topic_names[RW]

Names of the topics this service subscribes to (Array<String>), e.g. ['topic 1', 'topic 2'].

Public Class Methods

new() click to toggle source
# File lib/pub_sub_model_sync/service_kafka.rb, line 19
# Builds the Kafka client and makes sure the configured topics exist.
#
# `config.kafka_connection` is read as `[seed_brokers, options]`. The options
# hash receives `config.subscription_key` as `:client_id` unless a client id
# is already present. Topics default to `'model_sync'` when
# `config.topic_name` is not set.
def initialize
  settings = config.kafka_connection
  settings[1][:client_id] ||= config.subscription_key
  # Hand the options over as real keywords: since Ruby 3 a hash coming out of
  # a splatted array (`Kafka.new(*settings)`) is a positional argument and is
  # no longer converted into keyword arguments.
  @service = Kafka.new(settings[0], **settings[1])
  @topic_names = ensure_topics(Array(config.topic_name || 'model_sync'))
end

Public Instance Methods

listen_messages() click to toggle source
# File lib/pub_sub_model_sync/service_kafka.rb, line 26
# Starts the consumer and hands every message yielded by
# `consumer.each_message` to #process_message.
#
# A PubSubModelSync::Runner::ShutDown is treated as a regular stop and only
# logged. Any other StandardError is logged with level :error and swallowed.
def listen_messages
  log('Listener starting...')
  start_consumer
  # LISTEN_SETTINGS holds keyword options, so it has to be double-splatted:
  # since Ruby 3 a positional hash is not converted into keyword arguments,
  # and the resulting ArgumentError would be swallowed by the rescue below.
  consumer.each_message(**LISTEN_SETTINGS, &method(:process_message))
rescue PubSubModelSync::Runner::ShutDown
  log('Listener stopped')
rescue => e
  log("Error listening message: #{[e.message, e.backtrace]}", :error)
end
publish(payload) click to toggle source
# File lib/pub_sub_model_sync/service_kafka.rb, line 36
# Publishes the encoded payload to every target topic.
#
# Target topics come from `payload.headers[:topic_name]` (a single name or a
# list), falling back to `config.default_topic_name`.
#
# @param payload object responding to `headers`
# @return [Array] the list of topics the payload was produced to
def publish(payload)
  message_topics = Array(payload.headers[:topic_name] || config.default_topic_name)
  message_topics.each do |topic_name|
    # The settings are keyword options (`topic:`, `partition_key:`, ...), so
    # they are double-splatted; since Ruby 3 a positional hash is not turned
    # into keyword arguments.
    producer.produce(encode_payload(payload), **message_settings(payload, topic_name))
  end
end
stop() click to toggle source
# File lib/pub_sub_model_sync/service_kafka.rb, line 43
# Logs that the listener is stopping and then stops the current consumer.
#
# @return whatever `consumer.stop` returns
def stop
  log 'Listener stopping...'
  active_consumer = consumer
  active_consumer.stop
end

Private Instance Methods

ensure_topics(names) click to toggle source

Checks that the given topics exist and creates any that are missing. @param names (Array<String>, String) topic name or list of topic names @return (Array<String>, String) the given names, unchanged

# File lib/pub_sub_model_sync/service_kafka.rb, line 81
# Makes sure every given topic exists, creating the ones that are missing.
#
# The topic list is fetched from the service only once and kept in
# @known_topics, so later calls are answered from that cache.
#
# @param names [Array<String>, String] topic name or list of topic names
# @return [Array<String>, String] the given +names+, unchanged
def ensure_topics(names)
  # Seed the cache with the topics that already exist. Caching only the
  # requested names would make an existing-but-not-yet-requested topic look
  # missing on a later call and trigger a create_topic for it.
  @known_topics ||= service.topics.dup

  (Array(names).uniq - @known_topics).each do |name|
    service.create_topic(name)
    @known_topics << name
  end
  names
end
message_settings(payload, topic_name) click to toggle source
# File lib/pub_sub_model_sync/service_kafka.rb, line 50
# Builds the options used to produce +payload+ to +topic_name+.
#
# The topic is passed through #ensure_topics, the partition key is read from
# `payload.headers[:ordering_key]`, and the message headers carry SERVICE_KEY.
# Entries in PUBLISH_SETTINGS win over these values on key clashes.
#
# @param payload object responding to `headers`
# @param topic_name topic the message is produced to
# @return [Hash]
def message_settings(payload, topic_name)
  verified_topic = ensure_topics(topic_name)
  ordering_key = payload.headers[:ordering_key]
  defaults = { topic: verified_topic, partition_key: ordering_key, headers: { SERVICE_KEY => true } }
  defaults.merge(PUBLISH_SETTINGS)
end
process_message(message) click to toggle source
# File lib/pub_sub_model_sync/service_kafka.rb, line 74
# Forwards the message value to the parent implementation, but only for
# messages whose headers carry SERVICE_KEY; other messages are ignored.
#
# @param message object responding to `headers` and `value`
# @return the parent implementation's result, or nil when the message is skipped
def process_message(message)
  return unless message.headers[SERVICE_KEY]

  super(message.value)
end
producer() click to toggle source
# File lib/pub_sub_model_sync/service_kafka.rb, line 67
# Returns the producer stored on the class, creating it on first use.
#
# The producer is built with `service.async_producer` using PRODUCER_SETTINGS
# and stored through the class-level `producer` accessor. An at_exit hook
# shuts the created producer down when the process ends.
#
# @return the class-level producer
def producer
  return self.class.producer if self.class.producer

  # PRODUCER_SETTINGS holds keyword options; since Ruby 3 a positional hash
  # is not converted into keyword arguments, hence the double splat.
  created = service.async_producer(**PRODUCER_SETTINGS)
  # Register the hook only once the producer exists, so a failed creation
  # does not leave behind a hook that calls `shutdown` on nil.
  at_exit { created.shutdown }
  self.class.producer = created
end
start_consumer() click to toggle source
# File lib/pub_sub_model_sync/service_kafka.rb, line 58
# Creates the consumer for this service and subscribes it to every topic in
# `topic_names`, logging each subscription.
#
# The consumer group id is `config.subscription_key`; the new consumer is
# stored in @consumer.
#
# @return the value of `topic_names`
def start_consumer
  group_id = config.subscription_key
  @consumer = service.consumer(group_id: group_id)
  topic_names.each do |name|
    log("Subscribed to topic: #{name} as #{group_id}")
    consumer.subscribe(name)
  end
end