class LogStash::Inputs::LogService

Constants

Processor

Attributes

worker[RW]

Public Class Methods

new(*args)
Calls superclass method
# File lib/logstash/inputs/logservice.rb, line 17
# Builds the input plugin by handing every constructor argument, unchanged,
# to the superclass initializer.
#
# @param args [Array] arguments forwarded as-is to the parent class
def initialize(*args)
  # A bare `super` forwards the same argument list this method received.
  super
end

Public Instance Methods

consume(queue)
# File lib/logstash/inputs/logservice.rb, line 63
# Moves entries from the shared LogHub queue cache into the Logstash pipeline
# queue until the plugin is asked to stop.
#
# Each cached entry is wrapped in a LogStash::Event, decorated and appended to
# +queue+. When the cache reports empty, the method waits via
# Stud.stoppable_sleep for @checkpoint_second, passing a block that re-checks
# stop?, and then looks at the cache again.
#
# A failure while handling one entry is logged and processing continues with
# the next cache check. The entry that failed has already been polled from the
# cache and is not processed a second time.
#
# @param queue [#<<] destination that receives the decorated events
# @return [nil]
def consume(queue)
  until stop?
    until Processor.LogstashLogHubProcessor.queueCache.isEmpty
      begin
        textmap = Processor.LogstashLogHubProcessor.queueCache.poll
        event = LogStash::Event.new(textmap)
        decorate(event)
        queue << event
      rescue Exception => e
        # NOTE(review): the broad rescue is kept as in the original; narrowing it
        # may change which Java-side errors are caught under JRuby - confirm
        # before tightening.
        @logger.error("Consume logstash-input-logservice", :endpoint => @endpoint, :project => @project, :logstore => @logstore,
                      :consumer_group => @consumer_group, :consumer_name => @consumer_name, :position => @position,
                      :checkpoint_second => @checkpoint_second, :include_meta => @include_meta, :consumer_name_with_ip => @consumer_name_with_ip, :exception => e)
        # Deliberately no `retry`: it would re-enter the begin block without
        # re-checking isEmpty (polling a possibly empty cache) and would spin
        # forever on a persistent failure. Falling through lets the enclosing
        # loop re-evaluate the cache state instead.
      end
    end
    Stud.stoppable_sleep(@checkpoint_second) { stop? }
  end
end
register()
# File lib/logstash/inputs/logservice.rb, line 39
# Writes the plugin's consumer settings to the logger at info level.
# The access id and access key are not part of the logged fields.
def register
  @logger.info(
    "Init logstash-input-logservice",
    endpoint: @endpoint,
    project: @project,
    logstore: @logstore,
    consumer_group: @consumer_group,
    consumer_name: @consumer_name,
    position: @position,
    checkpoint_second: @checkpoint_second,
    include_meta: @include_meta,
    consumer_name_with_ip: @consumer_name_with_ip
  )
end
run(queue)
# File lib/logstash/inputs/logservice.rb, line 45
# Starts the LogHub worker and then hands control to #consume.
#
# The worker's consumer name is @consumer_name followed by an optional
# "_<local address>" suffix (only when @consumer_name_with_ip is truthy) and
# a "_<pid>" suffix for the current process. Any exception raised while
# starting or consuming is logged with the consumer settings and not re-raised.
#
# @param queue the pipeline queue passed through to #consume
def run(queue)
  @local_address = java.net.InetAddress.getLocalHost.getHostAddress
  @ip_suffix = @consumer_name_with_ip ? '_' + @local_address : ''
  @process_pid = '_' + Process.pid.to_s
  @logger.info("Running logstash-input-logservice", local_address: @local_address)

  worker_name = @consumer_name + @ip_suffix + @process_pid
  LogHubStarter.startWorker(
    @endpoint, @access_id, @access_key, @project, @logstore, @consumer_group,
    worker_name, @position, @checkpoint_second, @include_meta, @queue_size
  )

  consume(queue)
rescue Exception => e
  @logger.error(
    "Start logstash-input-logservice",
    endpoint: @endpoint,
    project: @project,
    logstore: @logstore,
    consumer_group: @consumer_group,
    consumer_name: @consumer_name,
    position: @position,
    checkpoint_second: @checkpoint_second,
    include_meta: @include_meta,
    consumer_name_with_ip: @consumer_name_with_ip,
    exception: e
  )
end
stop()
# File lib/logstash/inputs/logservice.rb, line 83
# Stop hook for the plugin. Intentionally empty: it performs no work and
# returns nil.
def stop
  # nothing to do in this case so it is not necessary to define stop
  # examples of common "stop" tasks:
  #  * close sockets (unblocking blocking reads/accepts)
  #  * cleanup temporary files
  #  * terminate spawned threads
end
teardown()
# File lib/logstash/inputs/logservice.rb, line 91
# Marks the plugin as interrupted and then calls +finished+.
#
# NOTE(review): @interrupted is not read by any code visible in this file, and
# +finished+ is defined elsewhere (presumably by the plugin base class) -
# confirm both against the parent class.
def teardown
  # Set the flag before signalling completion.
  @interrupted = true
  finished
end