class Fluent::GcloudPubSubInput
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_gcloud_pubsub.rb, line 26 def configure(conf) super raise Fluent::ConfigError, "'topic' must be specified." unless @topic raise Fluent::ConfigError, "'subscription' must be specified." unless @subscription configure_parser(conf) end
configure_parser(conf)
click to toggle source
# File lib/fluent/plugin/in_gcloud_pubsub.rb, line 35 def configure_parser(conf) @parser = Fluent::TextParser.new @parser.configure(conf) end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_gcloud_pubsub.rb, line 50 def shutdown super @stop_subscribing = true @subscribe_thread.join end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_gcloud_pubsub.rb, line 40 def start super pubsub = (Gcloud.new @project, @key).pubsub topic = pubsub.topic @topic @client = topic.subscription @subscription @stop_subscribing = false @subscribe_thread = Thread.new(&method(:subscribe)) end
Private Instance Methods
convert_line_to_event(line, es)
click to toggle source
# File lib/fluent/plugin/in_gcloud_pubsub.rb, line 97 def convert_line_to_event(line, es) line.chomp! # remove \n @parser.parse(line) { |time, record| if time && record es.add(time, record) else log.warn "pattern not match: #{line.inspect}" end } rescue => e log.warn line.dump, :error => e.to_s log.debug_backtrace(e.backtrace) end
parse_messages(messages)
click to toggle source
# File lib/fluent/plugin/in_gcloud_pubsub.rb, line 89 def parse_messages(messages) es = MultiEventStream.new messages.each do |m| convert_line_to_event(m.message.data, es) end es end
subscribe()
click to toggle source
# File lib/fluent/plugin/in_gcloud_pubsub.rb, line 63 def subscribe until @stop_subscribing messages = @client.pull max: @max_messages, immediate: @return_immediately if messages.length > 0 es = parse_messages(messages) unless es.empty? begin router.emit_stream(@tag, es) rescue # ignore errors. Engine shows logs and backtraces. end @client.acknowledge messages log.debug "#{messages.length} message(s) processed" end end if @return_immediately sleep @pull_interval end end rescue log.error "unexpected error", :error=>$!.to_s log.error_backtrace end