class LogStash::Inputs::Nakadi
Public Instance Methods
register()
click to toggle source
# File lib/logstash/inputs/nakadi.rb, line 18
# Prepares the plugin before #run is invoked: wires up log4j when the
# running Logstash exposes that hook, creates the OAuth helper, resets the
# subscription registry and builds the Nakadi Java client for @host.
def register
  # Only some Logstash versions expose Logger.setup_log4j, so probe first.
  log4j_hook = defined?(LogStash::Logger) && LogStash::Logger.respond_to?(:setup_log4j)
  LogStash::Logger.setup_log4j(@logger) if log4j_hook

  @oauth = OAuth.new
  # event type name => listener; filled by #run and drained by #stop.
  @subscriptions = {}

  # NOTE(review): withVerifiedSslCertificate(false) looks like it turns off
  # TLS certificate verification -- confirm this is intended for production.
  @client = org.zalando.nakadi.client.utils.ClientBuilder.new
    .withHost(@host)
    .withSecuredConnection(true)
    .withVerifiedSslCertificate(false)
    .withTokenProvider4Java { @oauth.token }
    .buildJavaClient
end
run(logstash_queue)
click to toggle source
# File lib/logstash/inputs/nakadi.rb, line 32
# Subscribes one NakadiListener per entry in @event_types and then blocks,
# polling once per second, until #stop? reports true.
#
# @param logstash_queue the queue object handed to every NakadiListener
# @return [nil]
def run(logstash_queue)
  optional = java.util.Optional
  params = org.zalando.nakadi.client.java.StreamParameters.new(
    optional::of(org.zalando.nakadi.client.java.model.Cursor.new("0", "BEGIN")),
    optional::empty, # batchLimit
    optional::empty, # streamLimit
    optional::empty, # batchFlushTimeout
    optional::of(java.lang.Integer.new(0)), # YOU MUST SET streamTimeout=0
    optional::empty, # streamKeepAliveLimit
    optional::empty  # flowId
  )

  # The mapper and the batch type do not depend on the event type name,
  # so they are built once instead of once per subscription.
  mapper = com.fasterxml.jackson.databind.ObjectMapper.new
  event_type = mapper.constructType(java.util.HashMap.java_class)
  event_batch_type = mapper.getTypeFactory.constructParametricType(
    org.zalando.nakadi.client.java.model.EventStreamBatch.java_class, event_type
  )

  @event_types.each do |event_type_name|
    listener = NakadiListener.new("#{event_type_name}-listener", logstash_queue, @codec)
    deserializer = org.zalando.nakadi.client.java.model.JavaJacksonJsonMarshaller.deserializer(event_batch_type)
    @client.subscribe(event_type_name, params, listener, deserializer)
    # Remember the listener so #stop can unsubscribe it later.
    @subscriptions[event_type_name] = listener
    # Report through the plugin logger rather than raw stdout.
    @logger.info("Subscribed to #{event_type_name}")
  end

  # Keep this method alive until a stop is requested.
  sleep 1 until stop?
end
stop()
click to toggle source
# File lib/logstash/inputs/nakadi.rb, line 63
# Unsubscribes every listener recorded in @subscriptions, then stops the
# Nakadi client and the OAuth helper.
#
# The client and the OAuth helper are stopped even when an unsubscribe call
# (or the client shutdown itself) raises; the exception still propagates to
# the caller after cleanup has run.
def stop
  @subscriptions.each do |event_type_name, listener|
    # NOTE(review): #run wraps its numeric argument in java.lang.Integer,
    # while this passes a plain Ruby 0 -- confirm the client accepts it.
    @client.unsubscribe(event_type_name, java.util.Optional::of(0), listener)
  end
ensure
  begin
    @client.stop
  ensure
    @oauth.stop
  end
end