class Fluent::GcloudPubSubInput

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_gcloud_pubsub.rb, line 26
def configure(conf)
  super

  raise Fluent::ConfigError, "'topic' must be specified." unless @topic
  raise Fluent::ConfigError, "'subscription' must be specified." unless @subscription

  configure_parser(conf)
end
configure_parser(conf) click to toggle source
# File lib/fluent/plugin/in_gcloud_pubsub.rb, line 35
def configure_parser(conf)
  @parser = Fluent::TextParser.new
  @parser.configure(conf)
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_gcloud_pubsub.rb, line 50
def shutdown
  super

  @stop_subscribing = true
  @subscribe_thread.join
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_gcloud_pubsub.rb, line 40
def start
  super

  pubsub = (Gcloud.new @project, @key).pubsub
  topic = pubsub.topic @topic
  @client = topic.subscription @subscription
  @stop_subscribing = false
  @subscribe_thread = Thread.new(&method(:subscribe))
end

Private Instance Methods

convert_line_to_event(line, es) click to toggle source
# File lib/fluent/plugin/in_gcloud_pubsub.rb, line 97
def convert_line_to_event(line, es)
  line.chomp!  # remove \n
  @parser.parse(line) { |time, record|
    if time && record
      es.add(time, record)
    else
      log.warn "pattern not match: #{line.inspect}"
    end
  }
rescue => e
  log.warn line.dump, :error => e.to_s
  log.debug_backtrace(e.backtrace)
end
parse_messages(messages) click to toggle source
# File lib/fluent/plugin/in_gcloud_pubsub.rb, line 89
def parse_messages(messages)
  es = MultiEventStream.new
  messages.each do |m|
    convert_line_to_event(m.message.data, es)
  end
  es
end
subscribe() click to toggle source
# File lib/fluent/plugin/in_gcloud_pubsub.rb, line 63
def subscribe
  until @stop_subscribing
    messages = @client.pull max: @max_messages, immediate: @return_immediately

    if messages.length > 0
      es = parse_messages(messages)
      unless es.empty?
        begin
          router.emit_stream(@tag, es)
        rescue
          # ignore errors. Engine shows logs and backtraces.
        end
        @client.acknowledge messages
        log.debug "#{messages.length} message(s) processed"
      end
    end

    if @return_immediately
      sleep @pull_interval
    end
  end
rescue
  log.error "unexpected error", :error=>$!.to_s
  log.error_backtrace
end