class LogStash::Inputs::LogService
Constants
- Processor
Attributes
worker[RW]
Public Class Methods
new(*args)
click to toggle source
Calls superclass method
# File lib/logstash/inputs/logservice.rb, line 17 def initialize(*args) super(*args) end
Public Instance Methods
consume(queue)
click to toggle source
# File lib/logstash/inputs/logservice.rb, line 63 def consume(queue) while !stop? while !Processor.LogstashLogHubProcessor.queueCache.isEmpty begin textmap = Processor.LogstashLogHubProcessor.queueCache.poll event = LogStash::Event.new(textmap) decorate(event) queue << event rescue Exception => e @logger.error("Consume logstash-input-logservice", :endpoint => @endpoint, :project => @project, :logstore => @logstore, :consumer_group => @consumer_group, :consumer_name => @consumer_name, :position => @position, :checkpoint_second => @checkpoint_second, :include_meta => @include_meta, :consumer_name_with_ip => @consumer_name_with_ip, :exception => e) retry end end Stud.stoppable_sleep(@checkpoint_second) { stop? } end # loop end
register()
click to toggle source
# File lib/logstash/inputs/logservice.rb, line 39 def register @logger.info("Init logstash-input-logservice", :endpoint => @endpoint, :project => @project, :logstore => @logstore, :consumer_group => @consumer_group, :consumer_name => @consumer_name, :position => @position, :checkpoint_second => @checkpoint_second, :include_meta => @include_meta ,:consumer_name_with_ip => @consumer_name_with_ip) end
run(queue)
click to toggle source
# File lib/logstash/inputs/logservice.rb, line 45 def run(queue) @local_address = java.net.InetAddress.getLocalHost().getHostAddress(); @ip_suffix = '' if @consumer_name_with_ip @ip_suffix = '_' + @local_address end @process_pid = "_#{Process.pid}" @logger.info("Running logstash-input-logservice",:local_address => @local_address) LogHubStarter.startWorker(@endpoint, @access_id, @access_key, @project, @logstore, @consumer_group, @consumer_name + @ip_suffix + @process_pid, @position, @checkpoint_second, @include_meta, @queue_size) consume(queue) rescue Exception => e @logger.error("Start logstash-input-logservice", :endpoint => @endpoint, :project => @project, :logstore => @logstore, :consumer_group => @consumer_group, :consumer_name => @consumer_name, :position => @position, :checkpoint_second => @checkpoint_second, :include_meta => @include_meta, :consumer_name_with_ip => @consumer_name_with_ip, :exception => e) end
stop()
click to toggle source
# File lib/logstash/inputs/logservice.rb, line 83 def stop # nothing to do in this case so it is not necessary to define stop # examples of common "stop" tasks: # * close sockets (unblocking blocking reads/accepts) # * cleanup temporary files # * terminate spawned threads end
teardown()
click to toggle source
# File lib/logstash/inputs/logservice.rb, line 91 def teardown @interrupted = true finished end