class LogStash::Inputs::Nakadi

Public Instance Methods

register() click to toggle source
# File lib/logstash/inputs/nakadi.rb, line 18
def register
  if defined?(LogStash::Logger) && LogStash::Logger.respond_to?(:setup_log4j)
    LogStash::Logger.setup_log4j(@logger)
  end
  @oauth = OAuth.new
  @subscriptions = {}
  @client = org.zalando.nakadi.client.utils.ClientBuilder.new.withHost(@host).
      withSecuredConnection(true).
      withVerifiedSslCertificate(false).
      withTokenProvider4Java { @oauth.token }.
      buildJavaClient()
end
run(logstash_queue) click to toggle source
# File lib/logstash/inputs/nakadi.rb, line 32
def run(logstash_queue)
  optional = java.util.Optional
  params = org.zalando.nakadi.client.java.StreamParameters.new(
      optional::of(org.zalando.nakadi.client.java.model.Cursor.new("0", "BEGIN")),
      optional::empty, # batchLimit,
      optional::empty, # streamLimit,
      optional::empty, # batchFlushTimeout,
      optional::of(java.lang.Integer.new(0)), # YOU MUST SET streamTimeout=0,
      optional::empty, # streamKeepAliveLimit,
      optional::empty  # flowId
  )

  @event_types.each do |event_type_name|
    listener = NakadiListener.new("#{event_type_name}-listener", logstash_queue, @codec)
    mapper = com.fasterxml.jackson.databind.ObjectMapper.new
    event_type = mapper.constructType(java.util.HashMap.java_class)
    event_batch_type = mapper.getTypeFactory.constructParametricType(
        org.zalando.nakadi.client.java.model.EventStreamBatch.java_class,
        event_type)
    deserializer = org.zalando.nakadi.client.java.model.JavaJacksonJsonMarshaller.deserializer(event_batch_type)
    @client.subscribe(event_type_name, params, listener, deserializer)
    @subscriptions[event_type_name] = listener
    puts("Subscribed to #{event_type_name}")
  end

  until stop?
    sleep 1
  end
end
stop() click to toggle source
# File lib/logstash/inputs/nakadi.rb, line 63
def stop
  @subscriptions.each do |event_type_name, listener|
    @client.unsubscribe(event_type_name, java.util.Optional::of(0), listener)
  end
  @client.stop
  @oauth.stop
end