class Fluent::Plugin::GcloudPubSubInput

Constants

DEFAULT_PARSER_TYPE

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_gcloud_pubsub.rb, line 108
# Configures this plugin instance from +conf+.
#
# Runs compat_parameters_convert for the :parser section, hands +conf+ to the
# superclass, resets the RPC handles and the pull-pause flag, and then picks
# the tag-extraction and decompression callables once so they can simply be
# invoked per message later on.
#
# @param conf the configuration object; passed through to the superclass
def configure(conf)
  compat_parameters_convert(conf, :parser)
  super

  @rpc_srv = nil
  @rpc_thread = nil
  @stop_pull = false

  # No tag_key configured: every record gets @tag. Otherwise the tag is
  # looked up in the record itself.
  @extract_tag = method(@tag_key.nil? ? :static_tag : :dynamic_tag)

  @parser = parser_create

  # Only the exact string 'gzip' selects decompression; any other value
  # leaves the payload untouched.
  @decompress = method(@decompression == 'gzip' ? :gzip_decompress : :no_decompress)
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_gcloud_pubsub.rb, line 144
# Stops the RPC server (if one was started), asks the subscribe loops to
# finish, waits for them, and then delegates to the superclass.
#
# @subscribe_threads is only assigned inside #start, so it is still nil when
# shutdown runs on an instance whose #start never completed. Array() turns
# that nil into an empty list so the superclass shutdown is still reached
# instead of failing with NoMethodError.
def shutdown
  if @rpc_srv
    @rpc_srv.shutdown
    @rpc_srv = nil
  end
  # Drop the handle to the RPC thread; it is not joined here.
  @rpc_thread = nil

  # Signal the subscribe loops to exit, then wait for each of them.
  @stop_subscribing = true
  Array(@subscribe_threads).each(&:join)
  super
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_gcloud_pubsub.rb, line 129
# Starts the plugin after the superclass has started.
#
# Optionally brings up the RPC server (when @enable_rpc is set), builds the
# subscriber from the project/key/topic/subscription settings, and creates
# @pull_threads worker threads that each run #subscribe.
def start
  super
  start_rpc if @enable_rpc

  @subscriber = Fluent::GcloudPubSub::Subscriber.new(@project, @key, @topic, @subscription)
  log.debug("connected subscription:#{@subscription} in project #{@project}")

  # Shared by all workers to serialize emits, see #process.
  @emit_guard = Mutex.new
  @stop_subscribing = false
  @subscribe_threads = []
  @pull_threads.times do |worker_no|
    thread_name = :"in_gcloud_pubsub_subscribe_#{worker_no}"
    @subscribe_threads.push(thread_create(thread_name) { subscribe })
  end
end
start_pull() click to toggle source
# File lib/fluent/plugin/in_gcloud_pubsub.rb, line 162
# Resumes pulling: clears the @stop_pull flag that #subscribe checks on each
# iteration, then logs the change at info level.
def start_pull
  @stop_pull = false
  log.info("start pull from subscription:#{@subscription}")
end
status_of_pull() click to toggle source
# File lib/fluent/plugin/in_gcloud_pubsub.rb, line 167
# Reports whether pulling is currently paused.
#
# @return [String] 'stopped' when @stop_pull is truthy, 'started' otherwise
def status_of_pull
  return 'stopped' if @stop_pull

  'started'
end
stop_pull() click to toggle source
# File lib/fluent/plugin/in_gcloud_pubsub.rb, line 157
# Pauses pulling: sets the @stop_pull flag that #subscribe checks on each
# iteration, then logs the change at info level.
def stop_pull
  @stop_pull = true
  log.info("stop pull from subscription:#{@subscription}")
end

Private Instance Methods

_subscribe() click to toggle source
# File lib/fluent/plugin/in_gcloud_pubsub.rb, line 218
# Performs one pull cycle: pull a batch, process it, acknowledge it.
#
# The batch is acknowledged only after #process returns, so a batch whose
# processing raised is left unacknowledged. Every StandardError is logged
# here and not re-raised; retryable errors are logged at warn level, anything
# else at error level together with its backtrace.
def _subscribe
  pulled = @subscriber.pull(@return_immediately, @max_messages)
  if pulled.length.zero?
    log.debug("no messages are pulled")
    return
  end

  process(pulled)
  @subscriber.acknowledge(pulled)

  log.debug("#{pulled.length} message(s) processed")
rescue Fluent::GcloudPubSub::RetryableError => retryable
  log.warn("Retryable error occurs. Fluentd will retry.",
           error_message: retryable.to_s, error_class: retryable.class.to_s)
rescue => unexpected
  log.error("unexpected error", error_message: unexpected.to_s, error_class: unexpected.class.to_s)
  log.error_backtrace(unexpected.backtrace)
end
dynamic_tag(record) click to toggle source
# File lib/fluent/plugin/in_gcloud_pubsub.rb, line 177
# Picks the tag for +record+ from the record itself.
#
# The value stored under @tag_key is removed from +record+ (the record is
# mutated) and used as the tag; when that value is nil or false, @tag is
# used instead.
#
# @param record the parsed record; must respond to #delete
# @return the extracted tag, or @tag as a fallback
def dynamic_tag(record)
  extracted = record.delete(@tag_key)
  extracted || @tag
end
gzip_decompress(message) click to toggle source
# File lib/fluent/plugin/in_gcloud_pubsub.rb, line 181
# Decompression strategy selected when @decompression is 'gzip'.
#
# Simply forwards to +decompress+, which is defined outside this method
# (NOTE(review): presumably provided by a mixin - confirm).
#
# @param message the raw message payload
# @return whatever +decompress+ returns for +message+
def gzip_decompress(message)
  decompress(message)
end
no_decompress(message) click to toggle source
# File lib/fluent/plugin/in_gcloud_pubsub.rb, line 185
# Pass-through decompression strategy, used when no decompression is
# configured. It has the same shape as #gzip_decompress so callers can
# invoke either one uniformly.
#
# @param message the raw message payload
# @return the very same +message+ object, unchanged
def no_decompress(message)
  message
end
process(messages) click to toggle source
# File lib/fluent/plugin/in_gcloud_pubsub.rb, line 236
# Parses a batch of pulled messages and emits the resulting events.
#
# Each message payload is passed through @decompress, chomped and handed to
# @parser. For every successfully parsed record the configured attribute
# keys are copied from the message attributes into the record, and the event
# is added to a per-tag event stream. Once the whole batch has been parsed,
# each stream is emitted through the router.
#
# @param messages an enumerable of pulled messages; each must respond to
#   #message (with #data) and #attributes
# @raise [FailedParseError] when a payload does not parse and
#   @parse_error_action is :exception; nothing from the batch is emitted then
def process(messages)
  # One event stream per tag, created lazily on first access.
  streams_by_tag = Hash.new { |streams, tag| streams[tag] = Fluent::MultiEventStream.new }

  messages.each do |msg|
    payload = @decompress.call(msg.message.data).chomp
    attrs = msg.attributes
    @parser.parse(payload) do |time, record|
      if time && record
        @attribute_keys.each { |key| record[key] = attrs[key] }
        streams_by_tag[@extract_tag.call(record)].add(time, record)
      elsif @parse_error_action == :exception
        raise FailedParseError, "pattern not match: #{payload}"
      else
        log.warn('pattern not match', record: payload)
      end
    end
  end

  streams_by_tag.each_pair do |tag, stream|
    # Some output plugins are not meant to be called from multiple threads,
    # so emits are serialized across the subscribe workers.
    # This guard may be removed in the future.
    @emit_guard.synchronize { router.emit_stream(tag, stream) }
  end
end
start_rpc() click to toggle source
# File lib/fluent/plugin/in_gcloud_pubsub.rb, line 189
# Brings up the HTTP RPC endpoint.
#
# Creates a WEBrick server bound to @rpc_bind:@rpc_port, mounts RPCServlet
# under /api/in_gcloud_pubsub/pull/ (passing this plugin instance to it),
# and runs the server on its own thread created via thread_create.
def start_rpc
  log.info("listening http rpc server on http://#{@rpc_bind}:#{@rpc_port}/")

  server_options = {
    BindAddress: @rpc_bind,
    Port: @rpc_port,
    # WEBrick's own logger writes to STDERR at FATAL level; the access log
    # is given no targets at all.
    Logger: WEBrick::Log.new(STDERR, WEBrick::Log::FATAL),
    AccessLog: []
  }
  @rpc_srv = WEBrick::HTTPServer.new(server_options)
  @rpc_srv.mount('/api/in_gcloud_pubsub/pull/', RPCServlet, self)

  @rpc_thread = thread_create(:in_gcloud_pubsub_rpc_thread) do
    @rpc_srv.start
  end
end
static_tag(record) click to toggle source
# File lib/fluent/plugin/in_gcloud_pubsub.rb, line 173
# Tag strategy used when no tag key is configured: always yields @tag.
#
# @param record ignored; accepted only so this method has the same shape as
#   #dynamic_tag
# @return the value of @tag
def static_tag(record)
  @tag
end
subscribe() click to toggle source
# File lib/fluent/plugin/in_gcloud_pubsub.rb, line 205
# Body of each subscribe worker thread.
#
# Loops until @stop_subscribing becomes truthy. Each iteration runs one pull
# cycle unless pulling is paused, and sleeps for @pull_interval when either
# @return_immediately is set or pulling is paused. Any StandardError that
# escapes the loop is logged and ends the loop.
def subscribe
  until @stop_subscribing
    _subscribe unless @stop_pull

    # @stop_pull is deliberately read again here: it may have changed while
    # the pull cycle above was running.
    sleep @pull_interval if @return_immediately || @stop_pull
  end
rescue => unexpected
  log.error("unexpected error", error_message: unexpected.to_s, error_class: unexpected.class.to_s)
  log.error_backtrace(unexpected.backtrace)
end