class Fluent::Plugin::GcloudPubSubInput
Constants
- DEFAULT_PARSER_TYPE
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_gcloud_pubsub.rb, line 108
#
# Applies the configuration and prepares per-instance state.
#
# Converts compat parameters for the :parser section, delegates to the
# superclass, resets the RPC/pull state, and selects the tag-extraction and
# decompression strategies as bound Method objects so the per-message path
# does not have to re-check the configuration.
#
# @param conf the configuration element handed over by the framework
# @return [Method] the selected decompression method (last expression)
def configure(conf)
  compat_parameters_convert(conf, :parser)
  super

  @rpc_srv = nil
  @rpc_thread = nil
  @stop_pull = false

  # Without a tag key every record gets the static @tag.
  tag_strategy = @tag_key.nil? ? :static_tag : :dynamic_tag
  @extract_tag = method(tag_strategy)

  @parser = parser_create

  # Only 'gzip' enables decompression; any other value passes data through.
  decompress_strategy = @decompression == 'gzip' ? :gzip_decompress : :no_decompress
  @decompress = method(decompress_strategy)
end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_gcloud_pubsub.rb, line 144
#
# Stops the plugin: shuts down the RPC server when one is running, tells the
# subscriber threads to stop, waits for them to finish, and then delegates to
# the superclass.
#
# @return whatever the superclass implementation returns
def shutdown
  if @rpc_srv
    @rpc_srv.shutdown
    @rpc_srv = nil
  end
  # Only the reference is dropped here; the thread itself is not joined.
  @rpc_thread = nil

  @stop_subscribing = true
  # @subscribe_threads is only assigned in #start. If shutdown runs before
  # start got that far it is still nil; Array() turns that into an empty list
  # so the superclass shutdown below is still reached instead of raising
  # NoMethodError.
  Array(@subscribe_threads).each(&:join)
  super
end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_gcloud_pubsub.rb, line 129
#
# Starts the plugin: runs the superclass start, optionally starts the HTTP
# RPC server, creates the Pub/Sub subscriber and spawns @pull_threads worker
# threads that each run #subscribe.
#
# @return [Integer] @pull_threads (the result of Integer#times)
def start
  super
  start_rpc if @enable_rpc

  @subscriber = Fluent::GcloudPubSub::Subscriber.new(@project, @key, @topic, @subscription)
  log.debug "connected subscription:#{@subscription} in project #{@project}"

  # Shared by all worker threads to serialise emits (see #process).
  @emit_guard = Mutex.new
  @stop_subscribing = false
  @subscribe_threads = []
  @pull_threads.times do |idx|
    worker = thread_create(:"in_gcloud_pubsub_subscribe_#{idx}", &method(:subscribe))
    @subscribe_threads << worker
  end
end
start_pull()
click to toggle source
# File lib/fluent/plugin/in_gcloud_pubsub.rb, line 162
#
# Resumes pulling by clearing the @stop_pull flag that #subscribe checks on
# every iteration, and logs the change.
#
# @return the return value of log.info
def start_pull
  @stop_pull = false
  log.info("start pull from subscription:#{@subscription}")
end
status_of_pull()
click to toggle source
# File lib/fluent/plugin/in_gcloud_pubsub.rb, line 167
#
# Reports whether pulling is currently paused.
#
# @return [String] 'stopped' when @stop_pull is truthy, otherwise 'started'
def status_of_pull
  return 'stopped' if @stop_pull

  'started'
end
stop_pull()
click to toggle source
# File lib/fluent/plugin/in_gcloud_pubsub.rb, line 157
#
# Pauses pulling by setting the @stop_pull flag that #subscribe checks on
# every iteration, and logs the change.
#
# @return the return value of log.info
def stop_pull
  @stop_pull = true
  log.info("stop pull from subscription:#{@subscription}")
end
Private Instance Methods
_subscribe()
click to toggle source
# File lib/fluent/plugin/in_gcloud_pubsub.rb, line 218
#
# Performs one pull/process/acknowledge cycle.
#
# Messages are acknowledged only after #process returns without raising.
# Every StandardError is logged here and not re-raised, so one failed cycle
# does not end the calling loop.
#
# @return [nil] when nothing was pulled; otherwise the logger's return value
def _subscribe
  pulled = @subscriber.pull(@return_immediately, @max_messages)
  if pulled.length.zero?
    log.debug "no messages are pulled"
    return
  end

  process(pulled)
  @subscriber.acknowledge(pulled)
  log.debug "#{pulled.length} message(s) processed"
rescue Fluent::GcloudPubSub::RetryableError => e
  # Logged at warn level only; the caller's loop will simply try again.
  log.warn "Retryable error occurs. Fluentd will retry.", error_message: e.to_s, error_class: e.class.to_s
rescue => e
  log.error "unexpected error", error_message: e.to_s, error_class: e.class.to_s
  log.error_backtrace e.backtrace
end
dynamic_tag(record)
click to toggle source
# File lib/fluent/plugin/in_gcloud_pubsub.rb, line 177
#
# Takes the tag from the record itself. The @tag_key entry is removed from
# the record (the record is mutated); when the removed value is nil or false
# the static @tag is used instead.
#
# @param record a Hash-like record responding to #delete
# @return the value removed from the record, or @tag as a fallback
def dynamic_tag(record)
  extracted = record.delete(@tag_key)
  extracted || @tag
end
gzip_decompress(message)
click to toggle source
# File lib/fluent/plugin/in_gcloud_pubsub.rb, line 181
#
# Decompression strategy selected in #configure when @decompression is
# 'gzip'. Delegates to the #decompress helper, which is defined outside this
# block.
#
# @param message the compressed payload
# @return whatever #decompress returns for the payload
def gzip_decompress(message)
  decompress(message)
end
no_decompress(message)
click to toggle source
# File lib/fluent/plugin/in_gcloud_pubsub.rb, line 185
#
# Pass-through strategy selected in #configure when no gzip decompression is
# configured.
#
# @param message the payload
# @return the very same object that was passed in
def no_decompress(message)
  message
end
process(messages)
click to toggle source
# File lib/fluent/plugin/in_gcloud_pubsub.rb, line 236
#
# Parses a batch of pulled messages and emits the resulting events, grouped
# by tag.
#
# For each message the payload is decompressed via @decompress, stripped of
# a trailing newline and handed to @parser. Each parsed record gets the
# configured @attribute_keys copied over from the message attributes and is
# appended to the event stream of the tag chosen by @extract_tag.
#
# @param messages an enumerable of objects responding to #message (with
#   #data) and #attributes
# @raise [FailedParseError] when a line does not parse and
#   @parse_error_action is :exception
# @return [Hash] the tag => event stream mapping that was emitted
def process(messages)
  # One event stream per tag, created the first time the tag is seen.
  streams_by_tag = Hash.new do |streams, tag|
    streams[tag] = Fluent::MultiEventStream.new
  end

  messages.each do |msg|
    line = @decompress.call(msg.message.data).chomp
    attributes = msg.attributes

    @parser.parse(line) do |time, record|
      if time && record
        @attribute_keys.each { |key| record[key] = attributes[key] }
        # Attributes are copied before tag extraction, which may remove a
        # key from the record (see #dynamic_tag).
        streams_by_tag[@extract_tag.call(record)].add(time, record)
      elsif @parse_error_action == :exception
        raise FailedParseError.new "pattern not match: #{line}"
      else
        log.warn 'pattern not match', record: line
      end
    end
  end

  streams_by_tag.each do |tag, es|
    # Some output plugins are not supposed to be called from multiple
    # threads, so emits are serialised. Maybe remove in the future.
    @emit_guard.synchronize { router.emit_stream(tag, es) }
  end
end
start_rpc()
click to toggle source
# File lib/fluent/plugin/in_gcloud_pubsub.rb, line 189
#
# Starts the HTTP RPC server on @rpc_bind:@rpc_port, mounts RPCServlet under
# /api/in_gcloud_pubsub/pull/ with this plugin instance as its argument, and
# runs the server on a dedicated thread.
#
# @return the thread created for the server
def start_rpc
  log.info "listening http rpc server on http://#{@rpc_bind}:#{@rpc_port}/"

  server_options = {
    BindAddress: @rpc_bind,
    Port: @rpc_port,
    # Server logging goes to STDERR at FATAL level; access logging is off.
    Logger: WEBrick::Log.new(STDERR, WEBrick::Log::FATAL),
    AccessLog: []
  }
  @rpc_srv = WEBrick::HTTPServer.new(server_options)
  @rpc_srv.mount('/api/in_gcloud_pubsub/pull/', RPCServlet, self)

  @rpc_thread = thread_create(:in_gcloud_pubsub_rpc_thread) { @rpc_srv.start }
end
static_tag(record)
click to toggle source
# File lib/fluent/plugin/in_gcloud_pubsub.rb, line 173
#
# Tag strategy selected in #configure when no @tag_key is set: every record
# gets the configured @tag. The record is accepted only so that this method
# has the same signature as #dynamic_tag; it is neither read nor modified.
#
# @param record ignored
# @return the configured @tag
def static_tag(record)
  @tag
end
subscribe()
click to toggle source
# File lib/fluent/plugin/in_gcloud_pubsub.rb, line 205
#
# Worker loop: keeps running pull cycles until @stop_subscribing is set.
#
# While @stop_pull is set no cycle is run. The loop sleeps for
# @pull_interval between iterations when pulling is paused or when
# @return_immediately is set. Any StandardError escaping the loop is logged
# and ends the loop.
#
# @return [nil] when the loop ends normally; otherwise the logger's return
#   value
def subscribe
  until @stop_subscribing
    _subscribe unless @stop_pull

    sleep @pull_interval if @return_immediately || @stop_pull
  end
rescue => e
  log.error "unexpected error", error_message: e.to_s, error_class: e.class.to_s
  log.error_backtrace e.backtrace
end