class Fluent::GoogleCloudOutput

Fluentd output plugin for the Stackdriver Logging API.

Constants

GRPC_SEVERITY_MAPPING
PLUGIN_NAME
PLUGIN_VERSION

Follows semver.org format.

SEVERITY_TRANSLATIONS

Translates other severity strings to one of the valid values above.

VALID_SEVERITIES

Values permitted by the API for ‘severity’ (which is an enum).

Attributes

common_labels[R]

Expose attr_readers to make testing of metadata more direct than only testing it indirectly through metadata sent with logs.

monitoring_resource[R]

Expose attr_readers to make testing of metadata more direct than only testing it indirectly through metadata sent with logs.

resource[R]

Expose attr_readers to make testing of metadata more direct than only testing it indirectly through metadata sent with logs.

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_google_cloud.rb, line 436
# Plugin constructor. Routes logging through Fluentd's global logger and
# resets the state that configure fills in later.
def initialize
  super
  # use the global logger
  @log = $log # rubocop:disable Style/GlobalVars

  # Metric handles and the success code; they stay nil unless configure
  # enables monitoring.
  @failed_requests_count, @successful_requests_count,
    @dropped_entries_count, @ingested_entries_count,
    @retried_entries_count, @ok_code = nil

  # Baseline (epoch seconds) for the uptime counter increments.
  @uptime_update_time = Time.now.to_i
end
version_string() click to toggle source
# File lib/fluent/plugin/out_google_cloud.rb, line 811
# User-agent style version string, e.g. "google-fluentd/<PLUGIN_VERSION>".
# It is computed once and cached on the class.
def self.version_string
  return @version_string if @version_string

  @version_string = "google-fluentd/#{PLUGIN_VERSION}"
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_google_cloud.rb, line 451
# Fluentd configuration hook. Validates and normalizes the plugin settings,
# resolves instance metadata and the agent-level monitored resource and
# common labels, optionally registers monitoring metrics and timers, and
# picks the gRPC or REST implementations that write uses later.
#
# @param conf the configuration element passed in by Fluentd
#   (NOTE(review): the exact type comes from the Fluentd plugin API — confirm).
# @raise [Fluent::ConfigError] if deprecated authentication parameters are
#   set or metrics_resource is malformed.
def configure(conf)
  # Presumably populates the config_param instance variables read below
  # (Fluentd convention) — confirm against the superclass.
  super

  # TODO(qingling128): Remove this warning after the support is added. Also
  # remove the comment in the description of this configuration.
  unless @logging_api_url == DEFAULT_LOGGING_API_URL || @use_grpc
    @log.warn 'Detected customized logging_api_url while use_grpc is not' \
              ' enabled. Customized logging_api_url for the non-gRPC path' \
              ' is not supported. The logging_api_url option will be' \
              ' ignored.'
  end

  # Alert on old authentication configuration.
  unless @auth_method.nil? && @private_key_email.nil? &&
         @private_key_path.nil? && @private_key_passphrase.nil?
    # Collect every deprecated parameter that was set, so the error lists
    # all of them at once.
    extra = []
    extra << 'auth_method' unless @auth_method.nil?
    extra << 'private_key_email' unless @private_key_email.nil?
    extra << 'private_key_path' unless @private_key_path.nil?
    extra << 'private_key_passphrase' unless @private_key_passphrase.nil?

    raise Fluent::ConfigError,
          "#{PLUGIN_NAME} no longer supports auth_method.\n" \
          "Please remove configuration parameters: #{extra.join(' ')}"
  end

  set_regexp_patterns

  @utils = Common::Utils.new(@log)

  @platform = @utils.detect_platform(@use_metadata_service)

  # Treat an empty setting of the credentials file path environment variable
  # as unset. This way the googleauth lib could fetch the credentials
  # following the fallback path.
  ENV.delete(CREDENTIALS_PATH_ENV_VAR) if
    ENV[CREDENTIALS_PATH_ENV_VAR] == ''

  # Set required variables: @project_id, @vm_id, @vm_name and @zone.
  @project_id = @utils.get_project_id(@platform, @project_id)
  @vm_id = @utils.get_vm_id(@platform, @vm_id)
  @vm_name = @utils.get_vm_name(@vm_name)
  @zone = @utils.get_location(@platform, @zone, @use_aws_availability_zone)

  # All metadata parameters must now be set.
  @utils.check_required_metadata_variables(
    @platform, @project_id, @zone, @vm_id
  )

  # Retrieve monitored resource.
  # Fail over to retrieve monitored resource via the legacy path if we fail
  # to get it from Metadata Agent.
  # NOTE(review): only the legacy lookup is visible here; ||= keeps any
  # @resource that is already set.
  @resource ||= @utils.determine_agent_level_monitored_resource_via_legacy(
    @platform, @subservice_name, @detect_subservice, @vm_id, @zone
  )

  # metrics_resource may be given either nested, as
  # { type:, labels: { ... } }, or flat, as
  # { type:, 'labels.<key>': value, ... }. Validate both shapes and
  # normalize the flat one into { type:, labels: { '<key>' => value } }.
  if @metrics_resource
    unless @metrics_resource[:type].is_a?(String)
      raise Fluent::ConfigError,
            'metrics_resource.type must be a string:' \
            " #{@metrics_resource}."
    end
    if @metrics_resource.key?(:labels)
      unless @metrics_resource[:labels].is_a?(Hash)
        raise Fluent::ConfigError,
              'metrics_resource.labels must be a hash:' \
              " #{@metrics_resource}."
      end
      extra_keys = @metrics_resource.reject do |k, _|
        %i[type labels].include?(k)
      end
      unless extra_keys.empty?
        raise Fluent::ConfigError,
              "metrics_resource has unrecognized keys: #{extra_keys.keys}."
      end
    else
      extra_keys = @metrics_resource.reject do |k, _|
        k == :type || k.to_s.start_with?('labels.')
      end
      unless extra_keys.empty?
        raise Fluent::ConfigError,
              "metrics_resource has unrecognized keys: #{extra_keys.keys}."
      end
      # Transform the Hash form of the metrics_resource config if necessary.
      resource_type = @metrics_resource[:type]
      resource_labels = @metrics_resource.each_with_object({}) \
        do |(k, v), h|
          h[k.to_s.sub('labels.', '')] = v if k.to_s.start_with? 'labels.'
        end
      @metrics_resource = { type: resource_type, labels: resource_labels }
    end
  end

  # If monitoring is enabled, register metrics in the default registry
  # and store metric objects for future use.
  if @enable_monitoring
    # An unknown monitoring_type only triggers a warning; the registry
    # below is still created.
    unless Monitoring::MonitoringRegistryFactory.supports_monitoring_type(
      @monitoring_type
    )
      @log.warn "monitoring_type '#{@monitoring_type}' is unknown; "\
                'there will be no metrics'
    end
    # Metrics are attributed to the explicit metrics_resource if one was
    # configured, otherwise to the logging resource.
    @monitoring_resource = if @metrics_resource
                             @utils.create_monitored_resource(
                               @metrics_resource[:type], @metrics_resource[:labels]
                             )
                           else
                             @resource
                           end
    @registry = Monitoring::MonitoringRegistryFactory
                .create(@monitoring_type, @project_id,
                        @monitoring_resource, @gcm_service_address)
    # Export metrics every 60 seconds.
    timer_execute(:export_metrics, 60) { @registry.export }
    # Uptime should be a gauge, but the metric definition is a counter and
    # we can't change it.
    @uptime_metric = @registry.counter(
      :uptime, [:version], 'Uptime of Logging agent',
      'agent.googleapis.com/agent', 'CUMULATIVE'
    )
    # Record once now, then refresh every second.
    update_uptime
    timer_execute(:update_uptime, 1) { update_uptime }
    @successful_requests_count = @registry.counter(
      :stackdriver_successful_requests_count,
      %i[grpc code],
      'A number of successful requests to the Stackdriver Logging API',
      'agent.googleapis.com/agent', 'CUMULATIVE'
    )
    @failed_requests_count = @registry.counter(
      :stackdriver_failed_requests_count,
      %i[grpc code],
      'A number of failed requests to the Stackdriver Logging '\
      'API, broken down by the error code',
      'agent.googleapis.com/agent', 'CUMULATIVE'
    )
    @ingested_entries_count = @registry.counter(
      :stackdriver_ingested_entries_count,
      %i[grpc code],
      'A number of log entries ingested by Stackdriver Logging',
      'agent.googleapis.com/agent', 'CUMULATIVE'
    )
    @dropped_entries_count = @registry.counter(
      :stackdriver_dropped_entries_count,
      %i[grpc code],
      'A number of log entries dropped by the Stackdriver output plugin',
      'agent.googleapis.com/agent', 'CUMULATIVE'
    )
    @retried_entries_count = @registry.counter(
      :stackdriver_retried_entries_count,
      %i[grpc code],
      'The number of log entries that failed to be ingested by '\
      'the Stackdriver output plugin due to a transient error '\
      'and were retried',
      'agent.googleapis.com/agent', 'CUMULATIVE'
    )
    # Status code that denotes success for the active transport.
    @ok_code = @use_grpc ? GRPC::Core::StatusCodes::OK : 200
  end

  # Set regexp that we should match tags against later on. Using a list
  # instead of a map to ensure order.
  # Only GKE container resources derive their labels from the tag.
  @tag_regexp_list = []
  if @resource.type == GKE_CONSTANTS[:resource_type]
    @tag_regexp_list << [
      GKE_CONSTANTS[:resource_type], @compiled_kubernetes_tag_regexp
    ]
  end

  # Determine the common labels that should be added to all log entries
  # processed by this logging agent.
  @common_labels = determine_agent_level_common_labels(@resource)

  # The resource and labels are now set up; ensure they can't be modified
  # without first duping them.
  @resource.freeze
  @resource.labels.freeze
  @common_labels.freeze

  # Bind the transport-specific entry builder and request sender once, so
  # that write does not need to branch on @use_grpc.
  if @use_grpc
    @construct_log_entry = method(:construct_log_entry_in_grpc_format)
    @write_request = method(:write_request_via_grpc)
  else
    @construct_log_entry = method(:construct_log_entry_in_rest_format)
    @write_request = method(:write_request_via_rest)
  end

  # The Logs viewer hint below is only logged on GCE and EC2.
  return unless [Common::Platform::GCE, Common::Platform::EC2].include?(@platform)

  # Log an informational message containing the Logs viewer URL
  # NOTE(review): the URL is passed as three separate arguments; this relies
  # on the logger concatenating positional message parts — confirm.
  @log.info 'Logs viewer address: https://console.cloud.google.com/logs/',
            "viewer?project=#{@project_id}&resource=#{@resource.type}/",
            "instance_id/#{@vm_id}"
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/out_google_cloud.rb, line 807
# Fluentd hook: reports that this output plugin supports Fluentd's
# multi-process workers.
#
# @return [true]
def multi_workers_ready?
  true
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_google_cloud.rb, line 664
# Fluentd shutdown hook. After the superclass shutdown, exports the metrics
# one final time if a registry was created. This is a best-effort attempt;
# it might fail, for instance if there was a recent write to the same time
# series.
def shutdown
  super
  @registry.export unless @registry.nil?
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_google_cloud.rb, line 644
# Fluentd start hook. Builds the API client and resets per-run state. When
# statusz_port is positive, it also serves a status page bound to
# 127.0.0.1.
def start
  super
  init_api_client
  @timenanos_warning = false
  @successful_call = false

  if @statusz_port.positive?
    @log.info "Starting statusz server on port #{@statusz_port}"
    server_create(:out_google_cloud_statusz,
                  @statusz_port,
                  bind: '127.0.0.1') do |data, conn|
      # Only /statusz is served. It is matched against the second
      # whitespace-separated token (the path, for an HTTP request line).
      is_statusz = data.split(' ')[1] == '/statusz'
      status, body = is_statusz ? [200, Statusz.response(self)] : [404, "Not found\n"]
      write_html_response(data, conn, status, body)
    end
  end
end
update_uptime() click to toggle source
# File lib/fluent/plugin/out_google_cloud.rb, line 815
# Adds the whole seconds elapsed since the previous call to the uptime
# counter, labelled with the plugin version, and then moves the baseline
# forward.
def update_uptime
  current_time = Time.now.to_i
  elapsed = current_time - @uptime_update_time
  version_label = { version: Fluent::GoogleCloudOutput.version_string }
  @uptime_metric.increment(by: elapsed, labels: version_label)
  @uptime_update_time = current_time
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_google_cloud.rb, line 671
# Fluentd buffered-output hook: converts every event in +chunk+ into a log
# entry and sends the entries through @write_request.
#
# Events are grouped by (tag, local_resource_id). Each non-empty group
# becomes one request carrying its log name and its group-level resource
# and labels. With split_logs_by_tag the requests are sent one at a time.
# Otherwise all entries are combined into a single request, and each entry
# carries its own log name.
#
# @param chunk the buffer chunk to flush (NOTE(review): assumed to be a
#   Fluentd buffer chunk as consumed by
#   group_log_entries_by_tag_and_local_resource_id — confirm).
def write(chunk)
  grouped_entries = group_log_entries_by_tag_and_local_resource_id(chunk)

  requests_to_send = []
  grouped_entries.each do |(tag, local_resource_id), arr|
    entries = []
    group_level_resource, group_level_common_labels =
      determine_group_level_monitored_resource_and_labels(
        tag, local_resource_id
      )

    arr.each do |time, record|
      # Per-entry resource and labels, derived from copies of the
      # group-level ones (the helper may move fields out of +record+).
      entry_level_resource, entry_level_common_labels =
        determine_entry_level_monitored_resource_and_labels(
          group_level_resource, group_level_common_labels, record
        )

      is_json = false
      if @detect_json
        # Save the following fields if available, then clear them out to
        # allow for determining whether we should parse the log or message
        # field.
        # This list should be in sync with
        # https://cloud.google.com/logging/docs/agent/configuration#special-fields.
        preserved_keys = [
          'time',
          'timeNanos',
          'timestamp',
          'timestampNanos',
          'timestampSeconds',
          'severity',
          @http_request_key,
          @insert_id_key,
          @labels_key,
          @operation_key,
          @source_location_key,
          @span_id_key,
          @trace_key,
          @trace_sampled_key
        ]

        # If the log is json, we want to export it as a structured log
        # unless there is additional metadata that would be lost.
        # Concretely, parsing is only attempted when exactly one
        # non-preserved field remains and that field is log, message or msg.
        record_json = nil
        if (record.keys - preserved_keys).length == 1
          %w[log message msg].each do |field|
            record_json = parse_json_or_nil(record[field]) if record.key?(field)
          end
        end
        unless record_json.nil?
          # Propagate these if necessary. Note that we don't want to
          # override these keys in the JSON we've just parsed.
          preserved_keys.each do |key|
            record_json[key] ||= record[key] if
              record.key?(key) && !record_json.key?(key)
          end

          # From here on the parsed JSON replaces the original record.
          record = record_json
          is_json = true
        end
      end

      ts_secs, ts_nanos, timestamp = compute_timestamp(record, time)
      # When enabled, correct timestamps that lie too far in the future.
      ts_secs, ts_nanos = adjust_timestamp_if_invalid(timestamp, Time.now) \
        if @adjust_invalid_timestamps && timestamp

      severity = compute_severity(
        entry_level_resource.type, record, entry_level_common_labels
      )

      dynamic_labels_from_payload = parse_labels(record)

      # Labels parsed from the record override common labels with the same
      # key (merge! keeps the right-hand value).
      if dynamic_labels_from_payload
        entry_level_common_labels.merge!(
          dynamic_labels_from_payload
        )
      end

      entry = @construct_log_entry.call(entry_level_common_labels,
                                        entry_level_resource,
                                        severity,
                                        ts_secs,
                                        ts_nanos)

      # Move the special fields from the record onto the entry. Deleting
      # them keeps them out of the record handed to set_payload below.
      insert_id = record.delete(@insert_id_key)
      entry.insert_id = insert_id if insert_id
      span_id = record.delete(@span_id_key)
      entry.span_id = span_id if span_id
      trace = record.delete(@trace_key)
      entry.trace = compute_trace(trace) if trace
      trace_sampled = record.delete(@trace_sampled_key)
      entry.trace_sampled = parse_bool(trace_sampled) unless
        trace_sampled.nil?

      set_log_entry_fields(record, entry)
      set_payload(entry_level_resource.type, record, entry, is_json)

      entries.push(entry)
    end
    # Don't send an empty request if we rejected all the entries.
    next if entries.empty?

    log_name = "projects/#{@project_id}/logs/#{log_name(
      tag, group_level_resource
    )}"

    requests_to_send << {
      entries: entries,
      log_name: log_name,
      resource: group_level_resource,
      labels: group_level_common_labels
    }
  end

  if @split_logs_by_tag
    requests_to_send.each do |request|
      @write_request.call(**request)
    end
  else
    # Combine all requests into one. The request level "log_name" will be
    # ported to the entry level. The request level "resource" and "labels"
    # are ignored as they should have been folded into the entry level
    # "resource" and "labels" already anyway.
    combined_entries = []
    requests_to_send.each do |request|
      request[:entries].each do |entry|
        # Modify entries in-place as they are not needed later on.
        entry.log_name = request[:log_name]
      end
      combined_entries.concat(request[:entries])
    end
    @write_request.call(entries: combined_entries) unless
      combined_entries.empty?
  end
end

Private Instance Methods

adjust_timestamp_if_invalid(timestamp, current_time) click to toggle source

Adjust timestamps from the future. The base case is:

  0. The parsed timestamp is less than one day into the future.

This is allowed by the API, and should be left unchanged.

Beyond that, there are two cases:

  1. The parsed timestamp is later in the current year:

This can happen when system log lines from previous years are missing the year, so the date parser assumes the current year. We treat these lines as coming from last year. Logs that are two or more years old will still be dated incorrectly, but this is an acceptable trade-off.

  2. The parsed timestamp is past the end of the current year:

Since the year is different from the current year, this isn’t the missing year in system logs. It is unlikely that users explicitly write logs at a future date. This could result from an unsynchronized clock on a VM, or some random value being parsed as the timestamp. We reset the timestamp on those lines to the default value and let the downstream API handle it.

# File lib/fluent/plugin/out_google_cloud.rb, line 1427
# Corrects timestamps that lie in the future relative to +current_time+.
#
# - Case 0, less than one day ahead: returned unchanged.
# - Case 1, later in the current year: treated as a yearless system log
#   line from last year. The seconds are shifted back one year and the
#   nanoseconds are kept.
# - Case 2, at or after the start of next year: reset to 0/0 so that the
#   downstream API decides how to handle it.
#
# @param timestamp [Time] the parsed entry timestamp
# @param current_time [Time] the reference "now"
# @return [Array(Integer, Integer)] seconds and nanoseconds to use
def adjust_timestamp_if_invalid(timestamp, current_time)
  nanos = timestamp.tv_nsec
  secs = timestamp.tv_sec

  start_of_next_year = Time.mktime(current_time.year + 1)
  tomorrow = current_time.to_datetime.next_day.to_time

  # Case 0: close enough to now; leave as-is.
  return [secs, nanos] if timestamp < tomorrow
  # Case 2: not in the current year, so not a missing-year line.
  return [0, 0] if timestamp >= start_of_next_year

  # Case 1: subtracting a year does not change the nanoseconds.
  [timestamp.to_datetime.prev_year.to_time.tv_sec, nanos]
end
api_client() click to toggle source
# File lib/fluent/plugin/out_google_cloud.rb, line 1853
# Returns the API client. On the REST path, an expired access token is
# refreshed first. On the gRPC path the Channel takes care of tokens and
# their renewal (https://grpc.io/docs/guides/auth.html#authentication-api).
#
# @return the cached @client
# @raise [Google::APIClient::ClientError] if the token refresh fails with
#   MultiJson::ParseError
def api_client
  return @client if @use_grpc || !@client.authorization.expired?

  begin
    @client.authorization.fetch_access_token!
  rescue MultiJson::ParseError
    # Workaround an issue in the API client; just re-raise a more
    # descriptive error for the user (which will still cause a retry).
    raise Google::APIClient::ClientError,
          'Unable to fetch access token (no scopes configured?)'
  end
  @client
end
compute_severity(resource_type, record, entry_level_common_labels) click to toggle source
# File lib/fluent/plugin/out_google_cloud.rb, line 1447
# Picks the severity for an entry.
#
# An explicit 'severity' field always wins; it is removed from +record+ and
# normalized via parse_severity. GKE containers otherwise map the
# stdout/stderr stream label through the stream severity map. Everything
# else is 'DEFAULT'.
#
# @return [String] the severity name
def compute_severity(resource_type, record, entry_level_common_labels)
  return parse_severity(record.delete('severity')) if record.key?('severity')
  return 'DEFAULT' unless resource_type == GKE_CONSTANTS[:resource_type]

  stream_label_key = "#{GKE_CONSTANTS[:service]}/stream"
  stream = entry_level_common_labels[stream_label_key]
  GKE_CONSTANTS[:stream_severity_map].fetch(stream, 'DEFAULT')
end
compute_timestamp(record, time) click to toggle source
# File lib/fluent/plugin/out_google_cloud.rb, line 1353
# Derives the entry timestamp from +record+ and deletes the field(s) used.
#
# Sources are tried in this order:
#   1. 'timestamp' => { 'seconds' => ..., 'nanos' => ... }
#   2. 'timestampSeconds' together with 'timestampNanos'
#   3. 'timeNanos' (deprecated; a warning is logged once)
#   4. 'time' as an ISO 8601 string (the event +time+ is used if unparsable)
#   5. the event +time+
#
# @param record [Hash] the log record
# @param time the Fluentd event time used as the fallback
# @return [Array] [seconds, nanos, timestamp]. Seconds and nanos are
#   converted to Integer when possible and otherwise returned unchanged.
#   timestamp is a Time, or whatever time_or_nil returned.
def compute_timestamp(record, time)
  as_integer = lambda do |value|
    begin
      Integer(value)
    rescue ArgumentError, TypeError
      value
    end
  end

  nested = record.fetch('timestamp', nil)
  if nested.is_a?(Hash) && nested.key?('seconds') && nested.key?('nanos')
    secs = nested['seconds']
    nanos = nested['nanos']
    record.delete('timestamp')
    parsed = time_or_nil(secs, nanos)
  elsif record.key?('timestampSeconds') && record.key?('timestampNanos')
    secs = record.delete('timestampSeconds')
    nanos = record.delete('timestampNanos')
    parsed = time_or_nil(secs, nanos)
  elsif record.key?('timeNanos')
    # This is deprecated since the precision is insufficient.
    # Use timestampSeconds/timestampNanos instead
    total_nanos = record.delete('timeNanos')
    secs = (total_nanos / 1_000_000_000).to_i
    nanos = total_nanos % 1_000_000_000
    unless @timenanos_warning
      # Warn the user this is deprecated, but only once to avoid spam.
      @timenanos_warning = true
      @log.warn 'timeNanos is deprecated - please use ' \
        'timestampSeconds and timestampNanos instead.'
    end
    parsed = time_or_nil(secs, nanos)
  else
    parsed =
      if record.key?('time')
        # k8s ISO8601 timestamp
        begin
          Time.iso8601(record.delete('time'))
        rescue StandardError
          Time.at(time)
        end
      else
        Time.at(time)
      end
    secs = parsed.tv_sec
    nanos = parsed.tv_nsec
  end

  [as_integer.call(secs), as_integer.call(nanos), parsed]
end
compute_trace(trace) click to toggle source
# File lib/fluent/plugin/out_google_cloud.rb, line 837
# Returns +trace+ unchanged unless autoformat_stackdriver_trace is enabled
# and +trace+ matches STACKDRIVER_TRACE_ID_REGEXP (presumably a bare trace
# ID). In that case it is expanded to "projects/<project>/traces/<trace>".
def compute_trace(trace)
  if @autoformat_stackdriver_trace && STACKDRIVER_TRACE_ID_REGEXP.match(trace)
    "projects/#{@project_id}/traces/#{trace}"
  else
    trace
  end
end
construct_error_details_map(error) click to toggle source

Extract a map of error details from a potentially partially successful REST request.

The keys in this map are [error_code, error_message] pairs, and the values are a list of stringified indexes of log entries that failed due to this error.

A sample error.body looks like: {

"error": {
  "code": 403,
  "message": "User not authorized.",
  "status": "PERMISSION_DENIED",
  "details": [
    {
      "@type": "type.googleapis.com/google.logging.v2.WriteLogEntriesPartialErrors",
      "logEntryErrors": {
        "0": {
          "code": 7,
          "message": "User not authorized."
        },
        "1": {
          "code": 3,
          "message": "Log name contains illegal character :"
        },
        "3": {
          "code": 3,
          "message": "Log name contains illegal character :"
        }
      }
    },
    {
      "@type": "type.googleapis.com/google.rpc.DebugInfo",
      "detail": ...
    }
  ]
}

}

The root level “code”, “message”, and “status” simply match the root cause of the first failed log entry. For example, if we switched the order of the log entries, then we would get: {

"error" : {
   "code" : 400,
   "message" : "Log name contains illegal character :",
   "status" : "INVALID_ARGUMENT",
   "details": ...
}

}

We ignore these root-level fields and instead look at the details, which include information about every failed log entry.

In this example, the logEntryErrors that we care about are: {

"0": {
  "code": 7,
  "message": "User not authorized."
},
"1": {
  "code": 3,
  "message": "Log name contains illegal character :"
},
"3": {
  "code": 3,
  "message": "Log name contains illegal character :"
}

}

The ultimate map that is constructed is: {

[7, 'User not authorized.']: ['0'],
[3, 'Log name contains illegal character :']: ['1', '3']

}

# File lib/fluent/plugin/out_google_cloud.rb, line 1969
# Groups the per-entry failures of a partially successful REST write.
#
# Parses error.body as JSON and locates the detail whose '@type' is
# PARTIAL_ERROR_FIELD. Its 'logEntryErrors' are then grouped by
# [code, message].
#
# @param error an object whose #body is the JSON error payload
# @return [Hash{Array => Array<String>}] [code, message] => entry indexes
#   (strings). Returns {} and logs a warning if the body cannot be
#   interpreted.
def construct_error_details_map(error)
  parsed_body = ensure_hash(JSON.parse(error.body))
  details = ensure_array(ensure_hash(parsed_body['error'])['details'])
  no_partial_errors = -> { raise JSON::ParserError, "No type #{PARTIAL_ERROR_FIELD}." }
  partial_errors = details.find(no_partial_errors) do |detail|
    ensure_hash(detail)['@type'] == PARTIAL_ERROR_FIELD
  end
  entry_errors = ensure_hash(ensure_hash(partial_errors)['logEntryErrors'])

  entry_errors.each_with_object(Hash.new { |h, k| h[k] = [] }) do |(index, entry_error), by_error|
    status = ensure_hash(entry_error)
    code = status['code']
    message = status['message']
    unless code && message
      raise JSON::ParserError,
            "Entry #{index} is missing 'code' or 'message'."
    end
    # TODO(qingling128): Convert indexes to integers.
    by_error[[code, message].freeze] << index
  end
rescue JSON::ParserError => e
  @log.warn 'Failed to extract log entry errors from the error details:' \
            " #{error.body}.", error: e
  {}
end
construct_error_details_map_grpc(gax_error) click to toggle source

Extract a map of error details from a potentially partially successful gRPC request.

The keys in this map are [error_code, error_message] pairs, and the values are a list of indexes of log entries that failed due to this error.

A sample error looks like: <Google::Cloud::PermissionDeniedError:

message: 'User not authorized.',
details: [
  <Google::Cloud::Logging::V2::WriteLogEntriesPartialErrors:
    log_entry_errors: {
      0 => <Google::Rpc::Status:
             code: 7,
             message: "User not authorized.",
             details: []>,
      1 => <Google::Rpc::Status:
             code: 3,
             message: "Log name contains illegal character :",
             details: []>,
      3 => <Google::Rpc::Status:
             code: 3,
             message: "Log name contains illegal character :",
             details: []>
    }
  >,
  <Google::Rpc::DebugInfo:
    stack_entries: [],
    detail: "..."
  >
]
cause: <GRPC::PermissionDenied: 7:User not authorized.>

}

The ultimate map that is constructed is: {

[7, 'User not authorized.']: [0],
[3, 'Log name contains illegal character :']: [1, 3]

}

# File lib/fluent/plugin/out_google_cloud.rb, line 2039
# Groups the per-entry failures of a partially successful gRPC write.
#
# Expects the first element of gax_error.status_details to be a
# Google::Cloud::Logging::V2::WriteLogEntriesPartialErrors. Its
# log_entry_errors are grouped by [code, message].
#
# @param gax_error the error raised by the gRPC client
# @return [Hash{Array => Array}] [code, message] => entry indexes. Returns
#   {} and logs a warning if the details cannot be interpreted.
def construct_error_details_map_grpc(gax_error)
  # Trace output only. This used to be logged at ERROR level on every failed
  # request, which looked like leftover debugging.
  @log.debug "construct_error_details_map_grpc: #{gax_error}"
  error_details_map = Hash.new { |h, k| h[k] = [] }
  error_details = ensure_array(gax_error.status_details)
  raise JSON::ParserError, 'The error details are empty.' if
    error_details.empty?
  raise JSON::ParserError, 'No partial error info in error details.' unless
    error_details[0].is_a?(
      Google::Cloud::Logging::V2::WriteLogEntriesPartialErrors
    )

  log_entry_errors = ensure_hash(error_details[0].log_entry_errors)
  log_entry_errors.each do |index, log_entry_error|
    error_key = [log_entry_error[:code], log_entry_error[:message]].freeze
    error_details_map[error_key] << index
  end
  error_details_map
rescue JSON::ParserError => e
  @log.warn 'Failed to extract log entry errors from the error details:' \
            " #{gax_error.details.inspect}.", error: e
  {}
end
construct_log_entry_in_grpc_format(labels, resource, severity, ts_secs, ts_nanos) click to toggle source
# File lib/fluent/plugin/out_google_cloud.rb, line 844
# Builds a gRPC LogEntry from the given labels, resource and severity.
#
# The timestamp is only set when +ts_secs+ is an Integer. Otherwise it is
# omitted and the downstream Logging API decides how to handle it. A
# non-Integer +ts_nanos+ is replaced with 0.
#
# @return [Google::Cloud::Logging::V2::LogEntry]
def construct_log_entry_in_grpc_format(labels,
                                       resource,
                                       severity,
                                       ts_secs,
                                       ts_nanos)
  grpc_resource = Google::Api::MonitoredResource.new(
    type: resource.type,
    labels: resource.labels.to_h
  )
  entry = Google::Cloud::Logging::V2::LogEntry.new(
    labels: labels,
    resource: grpc_resource,
    severity: grpc_severity(severity)
  )
  return entry unless ts_secs.is_a?(Integer)

  nanos = ts_nanos.is_a?(Integer) ? ts_nanos : 0
  entry.timestamp = Google::Protobuf::Timestamp.new(seconds: ts_secs, nanos: nanos)
  entry
end
construct_log_entry_in_rest_format(labels, resource, severity, ts_secs, ts_nanos) click to toggle source
# File lib/fluent/plugin/out_google_cloud.rb, line 871
# Builds a REST LogEntry from the given labels, resource, severity and
# timestamp parts.
#
# Note: if the resource has no labels, its labels are set to nil in place
# so that the empty map is omitted from the request.
#
# @return [Google::Apis::LoggingV2::LogEntry]
def construct_log_entry_in_rest_format(labels,
                                       resource,
                                       severity,
                                       ts_secs,
                                       ts_nanos)
  if resource.labels.empty?
    resource.labels = nil
  end
  timestamp = { seconds: ts_secs, nanos: ts_nanos }
  Google::Apis::LoggingV2::LogEntry.new(
    timestamp: timestamp,
    severity: severity,
    resource: resource,
    labels: labels
  )
end
convert_to_utf8(input) click to toggle source

Encode as UTF-8. If ‘coerce_to_utf8’ is set to true in the config, any non-UTF-8 character is replaced by the string specified by ‘non_utf8_replacement_string’. If ‘coerce_to_utf8’ is set to false, any non-UTF-8 character causes the plugin to raise an error.

# File lib/fluent/plugin/out_google_cloud.rb, line 1873
# Encodes +input+ as UTF-8.
#
# With coerce_to_utf8 enabled, invalid or unconvertible characters are
# replaced with @non_utf8_replacement_string. Otherwise any content that
# is not valid UTF-8 is logged and re-raised as an EncodingError. This
# includes strings already tagged UTF-8 that contain invalid bytes, which
# String#encode returns unchanged when the source and target encodings
# match.
#
# @param input [String]
# @return [String] the UTF-8 encoded string
# @raise [EncodingError] when coerce_to_utf8 is false and +input+ is not
#   representable as valid UTF-8
def convert_to_utf8(input)
  if @coerce_to_utf8
    input.encode(
      'utf-8',
      invalid: :replace,
      undef: :replace,
      replace: @non_utf8_replacement_string
    )
  else
    begin
      encoded = input.encode('utf-8')
      # encode does nothing for input already tagged UTF-8, so invalid bytes
      # would slip through; reject them here with the message below.
      unless encoded.valid_encoding?
        raise Encoding::InvalidByteSequenceError,
              'invalid byte sequence in UTF-8'
      end
      encoded
    rescue EncodingError
      @log.error 'Encountered encoding issues potentially due to non ' \
                 'UTF-8 characters. To allow non-UTF-8 characters and ' \
                 'replace them with spaces, please set "coerce_to_utf8" ' \
                 'to true.'
      raise
    end
  end
end
delete_and_extract_labels(hash, label_map) click to toggle source

For every original_label => new_label pair in the label_map, delete the original_label from the hash map if it exists, and extract the value to form a map with the new_label as the key.

# File lib/fluent/plugin/out_google_cloud.rb, line 1697
# Moves fields out of +hash+ into a new labels map.
#
# For each original_label => new_label pair in +label_map+, original_label
# is deleted from +hash+. If its value was truthy, the value is stored
# stringified and UTF-8 converted under new_label. Returns {} if either
# argument is not a Hash.
#
# @return [Hash] new_label => value
def delete_and_extract_labels(hash, label_map)
  return {} unless label_map.is_a?(Hash) && hash.is_a?(Hash)

  label_map.each_with_object({}) do |(source_key, target_key), extracted|
    value = hash.delete(source_key)
    next unless value

    extracted[target_key] = convert_to_utf8(value.to_s)
  end
end
determine_agent_level_common_labels(resource) click to toggle source

Determine the common labels that should be added to all log entries processed by this logging agent.

# File lib/fluent/plugin/out_google_cloud.rb, line 1117
# Builds the common labels attached to every entry from this agent. It
# starts from the user-configured @labels (copied) and adds instance labels
# that depend on the monitored resource type.
#
# @param resource the agent-level monitored resource (only #type is read)
# @return [Hash] label name => value
def determine_agent_level_common_labels(resource)
  common = @labels ? {}.merge(@labels) : {}
  compute_service = COMPUTE_CONSTANTS[:service]

  case resource.type
  when APPENGINE_CONSTANTS[:resource_type],
       DATAFLOW_CONSTANTS[:resource_type],
       DATAPROC_CONSTANTS[:resource_type],
       ML_CONSTANTS[:resource_type]
    # GAE, Cloud Dataflow, Cloud Dataproc and Cloud ML: VM id, name and zone.
    common["#{compute_service}/resource_id"] = @vm_id
    common["#{compute_service}/resource_name"] = @vm_name
    common["#{compute_service}/zone"] = @zone
  when COMPUTE_CONSTANTS[:resource_type],
       GKE_CONSTANTS[:resource_type]
    # GCE instance and GKE container: only the VM name.
    common["#{compute_service}/resource_name"] = @vm_name
  when EC2_CONSTANTS[:resource_type]
    common["#{EC2_CONSTANTS[:service]}/resource_name"] = @vm_name
  end
  common
end
determine_entry_level_monitored_resource_and_labels( group_level_resource, group_level_common_labels, record ) click to toggle source

Extract entry level monitored resource and common labels that should be applied to individual entries.

# File lib/fluent/plugin/out_google_cloud.rb, line 1274
# Derives the monitored resource and common labels for a single entry.
#
# Works on copies of the group-level resource, its labels and the common
# labels, and moves matching fields out of +record+ (the record is
# mutated).
#
# @param group_level_resource resource shared by the entry's group (copied,
#   not modified)
# @param group_level_common_labels [Hash] labels shared by the group (copied)
# @param record [Hash] the log record; consumed fields are deleted from it
# @return [Array] [resource, common_labels] for this entry
def determine_entry_level_monitored_resource_and_labels(
  group_level_resource, group_level_common_labels, record
)
  resource = group_level_resource.dup
  resource.labels = group_level_resource.labels.dup
  common_labels = group_level_common_labels.dup

  case resource.type
  # GKE container.
  when GKE_CONSTANTS[:resource_type]
    # Move the stdout/stderr annotation from the record into a label.
    common_labels.merge!(
      delete_and_extract_labels(
        record, 'stream' => "#{GKE_CONSTANTS[:service]}/stream"
      )
    )

    # If the record has been annotated by the kubernetes_metadata_filter
    # plugin, then use that metadata. Otherwise, rely on commonLabels
    # populated from the group's tag.
    if record.key?('kubernetes')
      # Resource labels keep their original names.
      resource.labels.merge!(
        delete_and_extract_labels(
          record['kubernetes'], GKE_CONSTANTS[:extra_resource_labels]
            .map { |l| [l, l] }.to_h
        )
      )
      # Common labels get the GKE service prefix.
      common_labels.merge!(
        delete_and_extract_labels(
          record['kubernetes'], GKE_CONSTANTS[:extra_common_labels]
            .map { |l| [l, "#{GKE_CONSTANTS[:service]}/#{l}"] }.to_h
        )
      )
      # Prepend label/ to all user-defined labels' keys.
      # NOTE(review): assumes record['kubernetes'] and its 'labels' value
      # are Hashes; anything else raises NoMethodError here.
      if record['kubernetes'].key?('labels')
        common_labels.merge!(
          delete_and_extract_labels(
            record['kubernetes']['labels'], record['kubernetes']['labels']
              .map { |key, _| [key, "label/#{key}"] }.to_h
          )
        )
      end
      # We've explicitly consumed all the fields we care about -- don't
      # litter the log entries with the remaining fields that the kubernetes
      # metadata filter plugin includes (or an empty 'kubernetes' field).
      record.delete('kubernetes')
      record.delete('docker')
    end
  end

  # If the name of a field in the record is present in the @label_map
  # configured by users, report its value as a label and do not send that
  # field as part of the payload.
  common_labels.merge!(delete_and_extract_labels(record, @label_map))

  # Cloud Dataflow and Cloud ML.
  # These labels can be set via the 'labels' or 'label_map' options.
  # Report them as monitored resource labels instead of common labels.
  # e.g. "dataflow.googleapis.com/job_id" => "job_id"
  [DATAFLOW_CONSTANTS, ML_CONSTANTS].each do |service_constants|
    next unless resource.type == service_constants[:resource_type]

    resource.labels.merge!(
      delete_and_extract_labels(
        common_labels, service_constants[:extra_resource_labels]
          .map { |l| ["#{service_constants[:service]}/#{l}", l] }.to_h
      )
    )
  end

  [resource, common_labels]
end
determine_group_level_monitored_resource_and_labels(tag, local_resource_id) click to toggle source

Determine the group level monitored resource and common labels shared by a collection of entries.

# File lib/fluent/plugin/out_google_cloud.rb, line 1174
# Build the monitored resource and common labels shared by every entry of
# one group (entries with the same tag and local_resource_id).
#
# Works on copies of @resource and @common_labels so the instance-level
# defaults are never mutated. Resolution order:
#   1. If the tag matches an entry of @tag_regexp_list, switch resource.type
#      to that entry's derived type and remember the MatchData.
#   2. If local_resource_id is given and
#      monitored_resource_from_local_resource_id returns a resource, use that
#      resource instead.
#   3. Adjust labels according to the settled resource type.
#   4. For Dataflow / ML resources, move the service-prefixed common labels
#      into resource labels.
#
# @param tag [String] the tag shared by the group.
# @param local_resource_id [String, nil] locally unique resource id, or nil
#   to use the legacy resource.
# @return [Array] [resource, common_labels]; the resource, its labels hash
#   and the common labels hash are all frozen before being returned.
def determine_group_level_monitored_resource_and_labels(tag,
                                                        local_resource_id)
  resource = @resource.dup
  # Object#dup is shallow: copy the labels hash as well so that merge! below
  # does not modify @resource.labels.
  resource.labels = @resource.labels.dup
  common_labels = @common_labels.dup

  # Change the resource type and set matched_regexp_group if the tag matches
  # certain regexp.
  matched_regexp_group = nil # @tag_regexp_list can be an empty list.
  @tag_regexp_list.each do |derived_type, tag_regexp|
    matched_regexp_group = tag_regexp.match(tag)
    if matched_regexp_group
      resource.type = derived_type
      break
    end
  end

  # Determine the monitored resource based on the local_resource_id.
  # Different monitored resource types have unique ids in different format.
  # We will query Metadata Agent for the monitored resource. Return the
  # legacy monitored resource (either the instance resource or the resource
  # inferred from the tag) if failed to get a monitored resource from
  # Metadata Agent with this key.
  #
  # Examples:
  # // GKE Pod.
  # "k8s_pod.<namespace_name>.<pod_name>"
  # // GKE container.
  # "k8s_container.<namespace_name>.<pod_name>.<container_name>"
  if local_resource_id
    converted_resource = monitored_resource_from_local_resource_id(
      local_resource_id
    )
    # A nil result keeps the resource determined above.
    resource = converted_resource if converted_resource
  end

  # Once the resource type is settled down, determine the labels.
  case resource.type
  # GKE container.
  when GKE_CONSTANTS[:resource_type]
    if matched_regexp_group
      # We only expect one occurrence of each key in the match group.
      resource_labels_candidates =
        matched_regexp_group.names.zip(matched_regexp_group.captures).to_h
      # Separate copy: delete_and_extract_labels is called on each candidate
      # hash independently below.
      common_labels_candidates = resource_labels_candidates.dup
      resource.labels.merge!(
        delete_and_extract_labels(
          resource_labels_candidates,
          # The kubernetes_tag_regexp is poorly named. 'namespace_name' is
          # in fact 'namespace_id'. 'pod_name' is in fact 'pod_id'.
          # TODO(qingling128): Figure out how to put this map into
          # constants like GKE_CONSTANTS[:extra_resource_labels].
          'container_name' => 'container_name',
          'namespace_name' => 'namespace_id',
          'pod_name' => 'pod_id'
        )
      )

      common_labels.merge!(
        delete_and_extract_labels(
          common_labels_candidates,
          GKE_CONSTANTS[:extra_common_labels]
            .map { |l| [l, "#{GKE_CONSTANTS[:service]}/#{l}"] }.to_h
        )
      )
    end

  # TODO(qingling128): Temporary fallback for metadata agent restarts.
  # K8s resources.
  when K8S_CONTAINER_CONSTANTS[:resource_type],
       K8S_POD_CONSTANTS[:resource_type],
       K8S_NODE_CONSTANTS[:resource_type]
    # NOTE(review): presumably the Compute Engine resource_name label does
    # not apply to k8s resources — confirm against the API's label set.
    common_labels.delete("#{COMPUTE_CONSTANTS[:service]}/resource_name")

  end

  # Cloud Dataflow and Cloud ML.
  # These labels can be set via the 'labels' option.
  # Report them as monitored resource labels instead of common labels.
  # e.g. "dataflow.googleapis.com/job_id" => "job_id"
  [DATAFLOW_CONSTANTS, ML_CONSTANTS].each do |service_constants|
    next unless resource.type == service_constants[:resource_type]

    resource.labels.merge!(
      delete_and_extract_labels(
        common_labels, service_constants[:extra_resource_labels]
          .map { |l| ["#{service_constants[:service]}/#{l}", l] }.to_h
      )
    )
  end

  resource.freeze
  resource.labels.freeze
  common_labels.freeze

  [resource, common_labels]
end
ensure_array(value) click to toggle source

Convert the value to a Ruby array.

# File lib/fluent/plugin/out_google_cloud.rb, line 2136
# Coerce value into an Array via its implicit conversion (#to_ary).
#
# @param value [Object] the value to convert.
# @return [Array] the converted array.
# @raise [JSON::ParserError] with the value's class name when value is not
#   array-like.
def ensure_array(value)
  converted = Array.try_convert(value)
  raise JSON::ParserError, value.class.to_s unless converted

  converted
end
ensure_hash(value) click to toggle source

Convert the value to a Ruby hash.

# File lib/fluent/plugin/out_google_cloud.rb, line 2141
# Coerce value into a Hash via its implicit conversion (#to_hash).
#
# @param value [Object] the value to convert.
# @return [Hash] the converted hash.
# @raise [JSON::ParserError] with the value's class name when value is not
#   hash-like.
def ensure_hash(value)
  converted = Hash.try_convert(value)
  raise JSON::ParserError, value.class.to_s unless converted

  converted
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_google_cloud.rb, line 1671
# Serialize one [tag, time, record] event with the Fluentd MessagePack
# engine so it can be buffered in a chunk.
#
# @return [String] the packed bytes.
def format(tag, time, record)
  packer = Fluent::MessagePackFactory.engine_factory.packer
  packer.write([tag, time, record]).to_s
end
group_log_entries_by_tag_and_local_resource_id(chunk) click to toggle source

Group the log entries by (tag, local_resource_id) pairs. Entries whose record is not a Hash, or whose tag is rejected by sanitize_tag, are dropped with a warning.

# File lib/fluent/plugin/out_google_cloud.rb, line 1148
# Split a buffer chunk into groups keyed by [sanitized_tag, local_resource_id].
#
# Each record's LOCAL_RESOURCE_ID_KEY entry is removed from the record and
# becomes part of the group key. Non-Hash records and records whose tag
# sanitize_tag rejects are skipped with a warning.
#
# @param chunk [#msgpack_each] yields (tag, time, record) triples.
# @return [Hash{Array => Array}] frozen [tag, local_resource_id] keys mapped
#   to arrays of [time, record] pairs in arrival order.
def group_log_entries_by_tag_and_local_resource_id(chunk)
  grouped = {}
  chunk.msgpack_each do |tag, time, record|
    unless record.is_a?(Hash)
      @log.warn 'Dropping log entries with malformed record: ' \
                "'#{record.inspect}' from tag '#{tag}' at '#{time}'. " \
                'A log record should be in JSON format.'
      next
    end

    clean_tag = sanitize_tag(tag)
    if clean_tag.nil?
      @log.warn "Dropping log entries with invalid tag: '#{tag.inspect}'." \
                ' A tag should be a string with utf8 characters.'
      next
    end

    # A nil local_resource_id means "fall back to legacy".
    group_key = [clean_tag, record.delete(LOCAL_RESOURCE_ID_KEY)].freeze
    (grouped[group_key] ||= []) << [time, record]
  end
  grouped
end
grpc_severity(severity) click to toggle source
# File lib/fluent/plugin/out_google_cloud.rb, line 1622
# Translate a severity into its gRPC LogSeverity value using
# GRPC_SEVERITY_MAPPING; values without a mapping are returned unchanged.
def grpc_severity(severity)
  # TODO: find out why this doesn't work.
  # if severity.is_a? String
  #   return Google::Cloud::Logging::Type::LogSeverity.resolve(severity)
  # end
  GRPC_SEVERITY_MAPPING.fetch(severity, severity)
end
increment_dropped_entries_count(count, code) click to toggle source

Increment the metric for the number of log entries that were dropped and not ingested by the Stackdriver Logging API.

# File lib/fluent/plugin/out_google_cloud.rb, line 2176
# Add count to the dropped-entries metric, labeled with @use_grpc and code.
# Does nothing (returns nil) when the metric is not configured.
def increment_dropped_entries_count(count, code)
  labels = { grpc: @use_grpc, code: code }
  @dropped_entries_count.increment(labels: labels, by: count) if @dropped_entries_count
end
increment_failed_requests_count(code) click to toggle source

Increment the metric for the number of failed requests, labeled by the provided status code.

# File lib/fluent/plugin/out_google_cloud.rb, line 2156
# Count one failed request, labeled with @use_grpc and the status code.
# Does nothing (returns nil) when the metric is not configured.
def increment_failed_requests_count(code)
  labels = { grpc: @use_grpc, code: code }
  @failed_requests_count.increment(labels: labels) if @failed_requests_count
end
increment_ingested_entries_count(count) click to toggle source

Increment the metric for the number of log entries successfully ingested by the Stackdriver Logging API.

# File lib/fluent/plugin/out_google_cloud.rb, line 2166
# Add count to the ingested-entries metric, labeled with @use_grpc and the
# success code @ok_code. Does nothing (returns nil) when the metric is not
# configured.
def increment_ingested_entries_count(count)
  labels = { grpc: @use_grpc, code: @ok_code }
  @ingested_entries_count.increment(labels: labels, by: count) if @ingested_entries_count
end
increment_retried_entries_count(count, code) click to toggle source

Increment the metric for the number of log entries that failed with a retriable server error and will be retried later, labeled by the provided status code.

# File lib/fluent/plugin/out_google_cloud.rb, line 2186
# Add count to the retried-entries metric, labeled with @use_grpc and code.
# Does nothing (returns nil) when the metric is not configured.
def increment_retried_entries_count(count, code)
  labels = { grpc: @use_grpc, code: code }
  @retried_entries_count.increment(labels: labels, by: count) if @retried_entries_count
end
increment_successful_requests_count() click to toggle source

Increment the metric for the number of successful requests.

# File lib/fluent/plugin/out_google_cloud.rb, line 2146
# Count one successful request, labeled with @use_grpc and @ok_code.
# Does nothing (returns nil) when the metric is not configured.
def increment_successful_requests_count
  labels = { grpc: @use_grpc, code: @ok_code }
  @successful_requests_count.increment(labels: labels) if @successful_requests_count
end
init_api_client() click to toggle source
# File lib/fluent/plugin/out_google_cloud.rb, line 1803
# Create the Logging API client and store it in @client.
#
# With @use_grpc: build a Google::Cloud::Logging::V2 LoggingService client
# over a GRPC::Core::Channel to the host (and port, if any) of
# @logging_api_url. Compression channel args are added when
# @grpc_compression_algorithm is set. An https URL uses SSL channel
# credentials composed with application default credentials; any other
# scheme uses an insecure channel.
#
# Otherwise: build the REST Google::Apis::LoggingV2::LoggingService client,
# authorized with application default credentials for Common::LOGGING_SCOPE.
#
# @raise [Fluent::ConfigError] if @use_grpc is set and @logging_api_url has
#   no host component.
def init_api_client
  # Set up the logger for the auto-generated Google Cloud APIs.
  Google::Apis.logger = @log
  if @use_grpc
    uri = URI.parse(@logging_api_url)
    host = uri.host
    unless host
      raise Fluent::ConfigError,
            'The logging_api_url option specifies an invalid URL:' \
            " #{@logging_api_url}."
    end
    if @grpc_compression_algorithm
      compression_options =
        GRPC::Core::CompressionOptions.new(
          default_algorithm: @grpc_compression_algorithm
        )
      compression_channel_args = compression_options.to_channel_arg_hash
    else
      compression_channel_args = {}
    end
    if uri.scheme == 'https'
      ssl_creds = GRPC::Core::ChannelCredentials.new
      authentication = Google::Auth.get_application_default
      creds = GRPC::Core::CallCredentials.new(authentication.updater_proc)
      # Combine transport (SSL) and per-call (auth token) credentials.
      creds = ssl_creds.compose(creds)
    else
      creds = :this_channel_is_insecure
    end
    # port stays nil (interpolates as "") when the URL has no port.
    port = ":#{uri.port}" if uri.port
    user_agent = \
      "#{PLUGIN_NAME}/#{PLUGIN_VERSION} grpc-ruby/#{GRPC::VERSION} " \
      "#{Google::Apis::OS_VERSION}"
    channel_args = { 'grpc.primary_user_agent' => user_agent }
                   .merge!(compression_channel_args)
    @client = Google::Cloud::Logging::V2::LoggingService::Client.new do |config|
      # A pre-built channel is passed as the credentials so the client uses
      # our host, channel args and credentials.
      config.credentials = GRPC::Core::Channel.new(
        "#{host}#{port}", channel_args, creds
      )
    end
  else
    # TODO: Use a non-default ClientOptions object.
    Google::Apis::ClientOptions.default.application_name = PLUGIN_NAME
    Google::Apis::ClientOptions.default.application_version = PLUGIN_VERSION
    @client = Google::Apis::LoggingV2::LoggingService.new
    @client.authorization = Google::Auth.get_application_default(
      Common::LOGGING_SCOPE
    )
  end
end
list_from_ruby(arr) click to toggle source
# File lib/fluent/plugin/out_google_cloud.rb, line 1736
# Convert a Ruby array into a Google::Protobuf::ListValue, converting each
# element with value_from_ruby.
def list_from_ruby(arr)
  Google::Protobuf::ListValue.new.tap do |list|
    arr.each { |item| list.values << value_from_ruby(item) }
  end
end
log_name(tag, resource) click to toggle source
# File lib/fluent/plugin/out_google_cloud.rb, line 1788
# Compute the URL-encoded log name for a group of entries.
#
# App Engine resources get the App Engine service prefix so Managed VM logs
# do not collide with other logs. GKE container resources use the sanitized
# 'container_name' label when present and valid. Otherwise the tag is used
# as-is.
def log_name(tag, resource)
  name = tag
  if resource.type == APPENGINE_CONSTANTS[:resource_type]
    name = "#{APPENGINE_CONSTANTS[:service]}/#{name}"
  elsif resource.type == GKE_CONSTANTS[:resource_type] &&
        resource.labels&.key?('container_name')
    container_tag = sanitize_tag(resource.labels['container_name'])
    name = container_tag unless container_tag.nil?
  end
  ERB::Util.url_encode(name)
end
monitored_resource_from_local_resource_id(local_resource_id) click to toggle source

Take a locally unique resource id and convert it to the globally unique monitored resource.

# File lib/fluent/plugin/out_google_cloud.rb, line 2064
# Convert a locally unique resource id into a k8s MonitoredResource.
#
# Supported formats:
#   "k8s_container.<namespace_name>.<pod_name>.<container_name>"
#   "k8s_pod.<namespace_name>.<pod_name>"
#   "k8s_node.<node_name>"
#
# The cluster name and location come from @k8s_cluster_name and
# @k8s_cluster_location. When unset (nil or ''), they are fetched from the
# GCE metadata server and memoized in those instance variables.
#
# @param local_resource_id [String, nil]
# @return [Google::Apis::LoggingV2::MonitoredResource, nil] nil when the id
#   matches none of the formats, or when the cluster name or location
#   cannot be determined (an error naming the fallback resource type is
#   logged in that case).
def monitored_resource_from_local_resource_id(local_resource_id)
  # The regex literals must stay on the left of `=~`: only in that form does
  # Ruby assign the named captures (resource_type, namespace_name, pod_name,
  # container_name, node_name) to the local variables used below.
  return unless
    /^
      (?<resource_type>k8s_container)
      \.(?<namespace_name>[0-9a-z-]+)
      \.(?<pod_name>[.0-9a-z-]+)
      \.(?<container_name>[0-9a-z-]+)$/x =~ local_resource_id ||
    /^
      (?<resource_type>k8s_pod)
      \.(?<namespace_name>[0-9a-z-]+)
      \.(?<pod_name>[.0-9a-z-]+)$/x =~ local_resource_id ||
    /^
      (?<resource_type>k8s_node)
      \.(?<node_name>[0-9a-z-]+)$/x =~ local_resource_id

  # Clear name and location if they're explicitly set to empty, so that they
  # are fetched from the metadata server below.
  @k8s_cluster_name = nil if @k8s_cluster_name == ''
  @k8s_cluster_location = nil if @k8s_cluster_location == ''

  begin
    # ||= memoizes the fetched values; a value still nil is fetched again on
    # the next call.
    @k8s_cluster_name ||= @utils.fetch_gce_metadata(
      @platform, 'instance/attributes/cluster-name'
    )
    @k8s_cluster_location ||= @utils.fetch_gce_metadata(
      @platform, 'instance/attributes/cluster-location'
    )
  rescue StandardError => e
    @log.error 'Failed to retrieve k8s cluster name and location.', \
               error: e
  end
  case resource_type
  when K8S_CONTAINER_CONSTANTS[:resource_type]
    labels = {
      'namespace_name' => namespace_name,
      'pod_name' => pod_name,
      'container_name' => container_name,
      'cluster_name' => @k8s_cluster_name,
      'location' => @k8s_cluster_location
    }
    fallback_resource = GKE_CONSTANTS[:resource_type]
  when K8S_POD_CONSTANTS[:resource_type]
    labels = {
      'namespace_name' => namespace_name,
      'pod_name' => pod_name,
      'cluster_name' => @k8s_cluster_name,
      'location' => @k8s_cluster_location
    }
    fallback_resource = GKE_CONSTANTS[:resource_type]
  when K8S_NODE_CONSTANTS[:resource_type]
    labels = {
      'node_name' => node_name,
      'cluster_name' => @k8s_cluster_name,
      'location' => @k8s_cluster_location
    }
    fallback_resource = COMPUTE_CONSTANTS[:resource_type]
  end
  unless @k8s_cluster_name && @k8s_cluster_location
    # `e` is the exception rescued above, or nil if the metadata lookup did
    # not raise (e.g. it returned nil).
    @log.error "Failed to construct #{resource_type} resource locally." \
               ' Falling back to writing logs against' \
               " #{fallback_resource} resource.", error: e
    return
  end
  constructed_resource = Google::Apis::LoggingV2::MonitoredResource.new(
    type: resource_type,
    labels: labels
  )
  @log.debug("Constructed #{resource_type} resource locally: " \
             "#{constructed_resource.inspect}")
  constructed_resource
end
parse_bool(value) click to toggle source
# File lib/fluent/plugin/out_google_cloud.rb, line 1640
# Interpret a payload value as a boolean: only true, 'true' and 1 (compared
# with ==) count as true; everything else is false.
def parse_bool(value)
  case value
  when true, 'true', 1 then true
  else false
  end
end
parse_int(value) click to toggle source
# File lib/fluent/plugin/out_google_cloud.rb, line 1636
# Cast a payload value to an Integer with #to_i, so unparsable strings
# become 0. Values without #to_i raise NoMethodError.
def parse_int(value)
  integer = value.to_i
  integer
end
parse_json_or_nil(input) click to toggle source
# File lib/fluent/plugin/out_google_cloud.rb, line 1084
# Parse input as a JSON object when it looks like one, otherwise return nil.
#
# Parsing is only attempted when the first character that is not JSON
# whitespace (tab U+0009, line feed U+000A, carriage return U+000D,
# space U+0020) is a left curly bracket (U+007B). Non-strings, other
# leading characters, and malformed JSON all yield nil.
def parse_json_or_nil(input)
  return nil unless input.is_a?(String)

  json_whitespace = [9, 10, 13, 32]
  input.each_codepoint do |codepoint|
    next if json_whitespace.include?(codepoint)
    return nil unless codepoint == 123

    begin
      return JSON.parse(input)
    rescue JSON::ParserError
      return nil
    end
  end
  nil
end
parse_labels(record) click to toggle source

Extract and validate the user-supplied labels from the record. Return nil if they are not set or are invalid.

# File lib/fluent/plugin/out_google_cloud.rb, line 1509
# Remove the user-supplied labels (stored under @labels_key) from the record
# and return them if they are valid.
#
# Valid labels are a Hash whose keys and values are all Strings. Invalid
# labels are logged and discarded.
#
# @param record [Hash] the log record; @labels_key is deleted from it.
# @return [Hash{String => String}, nil] the labels, or nil when absent,
#   invalid, or when extraction raised.
def parse_labels(record)
  payload_labels = record.delete(@labels_key)
  return nil unless payload_labels

  unless payload_labels.is_a?(Hash)
    @log.error "Invalid value of '#{@labels_key}' in the payload: " \
               "#{payload_labels}. Labels need to be a JSON object."
    return nil
  end

  # Keys whose key or value is not a String (in insertion order).
  invalid_keys =
    payload_labels.reject { |k, v| k.is_a?(String) && v.is_a?(String) }.keys
  unless invalid_keys.empty?
    @log.error "Invalid value of '#{@labels_key}' in the payload: " \
               "#{payload_labels}. Labels need string values for all " \
               "keys; keys #{invalid_keys} don't."
    return nil
  end
  payload_labels
rescue StandardError => e
  # Pass the exception as a structured field, as the rest of the plugin does.
  @log.error "Failed to extract '#{@labels_key}' from payload.", error: e
  nil
end
parse_latency(latency) click to toggle source
# File lib/fluent/plugin/out_google_cloud.rb, line 1644
# Parse an HTTP latency string such as "1.42 s" into seconds and nanos.
#
# The accepted format is @compiled_http_latency_regexp: optional
# whitespace, an integer, an optional decimal part, optional whitespace,
# "s", optional whitespace.
#
# @return [Google::Protobuf::Duration, Hash, nil] a Duration when
#   @use_grpc; otherwise a Hash with zero-valued :seconds / :nanos omitted.
#   nil when the input does not match, so the caller can skip latency.
def parse_latency(latency)
  parsed = @compiled_http_latency_regexp.match(latency)
  return nil if parsed.nil?

  # The integer and decimal parts are carried separately as whole seconds
  # and nanoseconds.
  secs = parsed['seconds'].to_i
  nsecs = (parsed['decimal'].to_f * 1000 * 1000 * 1000).round

  return Google::Protobuf::Duration.new(seconds: secs, nanos: nsecs) if @use_grpc

  { seconds: secs, nanos: nsecs }.reject { |_unit, amount| amount.zero? }
end
parse_severity(severity_str) click to toggle source
# File lib/fluent/plugin/out_google_cloud.rb, line 1569
# Normalize a payload severity into a value the API accepts.
#
# Resolution order:
#   1. A valid severity name (case-insensitive, surrounding whitespace
#      ignored) is returned uppercased.
#   2. A string of digits is floored to a multiple of 100 and clamped to the
#      range 0..800, then returned as an Integer.
#   3. A name in SEVERITY_TRANSLATIONS is translated.
#   4. Anything else becomes 'DEFAULT'.
def parse_severity(severity_str)
  # The API is case insensitive; uppercase to simplify the comparisons.
  normalized = severity_str.to_s.upcase.strip

  return normalized if VALID_SEVERITIES.include?(normalized)

  if /\A\d+\z/ =~ normalized
    begin
      rounded = normalized.to_i / 100 * 100
      return 0 if rounded.negative?
      return 800 if rounded > 800

      return rounded
    rescue StandardError
      return 'DEFAULT'
    end
  end

  SEVERITY_TRANSLATIONS.fetch(normalized, 'DEFAULT')
end
parse_string(value) click to toggle source
# File lib/fluent/plugin/out_google_cloud.rb, line 1632
# Cast a payload value to a String with #to_s.
def parse_string(value)
  string = value.to_s
  string
end
sanitize_tag(tag) click to toggle source

Given a tag, returns the corresponding valid tag if possible, or nil if the tag should be rejected. If ‘require_valid_tags’ is false, non-string tags are converted to strings, and invalid characters are sanitized; otherwise such tags are rejected.

# File lib/fluent/plugin/out_google_cloud.rb, line 1683
# Return a usable log tag for tag, or nil if it must be rejected.
#
# With @require_valid_tags, only non-empty Strings that are unchanged by
# convert_to_utf8 are accepted. Otherwise any value is stringified and
# converted to UTF-8, and an empty result becomes '_'.
def sanitize_tag(tag)
  if @require_valid_tags
    acceptable = tag.is_a?(String) && tag != '' && convert_to_utf8(tag) == tag
    return nil unless acceptable
  end

  utf8_tag = convert_to_utf8(tag.to_s)
  utf8_tag == '' ? '_' : utf8_tag
end
set_log_entry_fields(record, entry) click to toggle source
# File lib/fluent/plugin/out_google_cloud.rb, line 1458
# Move the LogEntry sub-structures listed in LOG_ENTRY_FIELDS_MAP (e.g. an
# HTTP request block) out of the record and onto entry.
#
# For each field, the record key is read from the instance variable named in
# the map. When the record's value under that key is a Hash, each configured
# subfield is removed from it, cast with its cast function, and set on a new
# instance of the gRPC or REST class (chosen by @use_grpc). That instance is
# then assigned to entry.<field_name>. A subfield is skipped when its value
# is nil, when its cast raises TypeError (logged), or when the cast returns
# nil. The payload key is removed from the record when its value is nil or
# when all of its keys were consumed.
#
# @param record [Hash] the log record; mutated in place.
# @param entry [Object] the log entry under construction; receives
#   `<field_name>=` calls.
def set_log_entry_fields(record, entry)
  # TODO(qingling128) On the next major after 0.7.4, make all logEntry
  # subfields behave the same way: if the field is not in the correct
  # format, log an error in the Fluentd log and remove this field from
  # payload. This is the preferred behavior per PM decision.
  LOG_ENTRY_FIELDS_MAP.each do |field_name, config|
    payload_key, subfields, grpc_class, non_grpc_class = config
    begin
      payload_key = instance_variable_get(payload_key)
      fields = record[payload_key]
      record.delete(payload_key) if fields.nil?
      next unless fields.is_a?(Hash)

      extracted_subfields = subfields.each_with_object({}) \
        do |(original_key, destination_key, cast_fn), extracted_fields|
          value = fields.delete(original_key)
          next if value.nil?

          begin
            casted_value = send(cast_fn, value)
          rescue TypeError => cast_error
            # Skip only this subfield; the remaining ones are still
            # extracted. (This previously referenced an undefined `err`,
            # whose NameError discarded the whole field.)
            @log.error "Failed to #{cast_fn} for #{field_name}." \
                       "#{original_key} with value #{value.inspect}.",
                       error: cast_error
            next
          end
          next if casted_value.nil?

          extracted_fields[destination_key] = casted_value
        end

      # extracted_subfields is always a Hash (possibly empty), so an output
      # object is always built once the payload value is a Hash.
      output = if @use_grpc
                 Object.const_get(grpc_class).new
               else
                 Object.const_get(non_grpc_class).new
               end
      extracted_subfields.each do |key, value|
        output.send("#{key}=", value)
      end

      record.delete(payload_key) if fields.empty?

      entry.send("#{field_name}=", output)
    rescue StandardError => e
      @log.error "Failed to set log entry field for #{field_name}.",
                 error: e
    end
  end
end
set_payload(resource_type, record, entry, is_json) click to toggle source

TODO(qingling128): Fix the inconsistent behavior of ‘message’, ‘log’ and ‘msg’ in the next major version 1.0.0.

# File lib/fluent/plugin/out_google_cloud.rb, line 1754
# Set exactly one of entry.json_payload / entry.text_payload from record.
#
# A text payload is used when is_json is false and either the resource is a
# GKE container and the record has a 'log' key, or 'message' is the record's
# only key. Otherwise the whole record becomes the JSON payload. With
# @use_grpc, JSON is converted to a protobuf Struct and text to UTF-8.
# A falsy text value or record sets nothing.
def set_payload(resource_type, record, entry, is_json)
  # Key supplying a text payload; nil means "send the record as JSON".
  text_key =
    if is_json
      nil
    elsif GKE_CONSTANTS[:resource_type] == resource_type && record.key?('log')
      'log'
    elsif record.size == 1 && record.key?('message')
      'message'
    end

  if text_key.nil?
    return unless record

    entry.json_payload = @use_grpc ? struct_from_ruby(record) : record
  else
    text = record[text_key]
    return unless text

    text = text.to_s
    entry.text_payload = @use_grpc ? convert_to_utf8(text) : text
  end
end
set_regexp_patterns() click to toggle source

Set regexp patterns to parse tags and logs.

# File lib/fluent/plugin/out_google_cloud.rb, line 1107
# Compile the regexps used to parse tags and HTTP latencies.
#
# @compiled_kubernetes_tag_regexp is only (re)built when
# @kubernetes_tag_regexp is configured. @compiled_http_latency_regexp
# captures the integer seconds and an optional decimal part of strings such
# as "1.42 s".
def set_regexp_patterns
  if @kubernetes_tag_regexp
    @compiled_kubernetes_tag_regexp = Regexp.new(@kubernetes_tag_regexp)
  end

  @compiled_http_latency_regexp =
    /^\s*(?<seconds>\d+)(?<decimal>\.\d+)?\s*s\s*$/
end
struct_from_ruby(hash) click to toggle source
# File lib/fluent/plugin/out_google_cloud.rb, line 1744
# Convert a Ruby hash into a Google::Protobuf::Struct.
#
# Keys are stringified and converted to UTF-8. When two keys stringify to
# the same name, the first one wins.
def struct_from_ruby(hash)
  Google::Protobuf::Struct.new.tap do |struct|
    hash.each do |key, val|
      field_name = convert_to_utf8(key.to_s)
      struct.fields[field_name] ||= value_from_ruby(val)
    end
  end
end
time_or_nil(ts_secs, ts_nanos) click to toggle source
# File lib/fluent/plugin/out_google_cloud.rb, line 1347
# Build a Time from integer seconds and nanoseconds, or return nil when
# either part cannot be converted with Kernel#Integer.
#
# The microsecond argument is passed as an exact Rational (nanos / 1000)
# rather than a Float, so the nanosecond component is preserved exactly.
#
# @param ts_secs [Integer, String] seconds since the epoch.
# @param ts_nanos [Integer, String] nanoseconds within the second.
# @return [Time, nil]
def time_or_nil(ts_secs, ts_nanos)
  Time.at(Integer(ts_secs), Rational(Integer(ts_nanos), 1_000))
rescue ArgumentError, TypeError
  nil
end
value_from_ruby(value) click to toggle source
# File lib/fluent/plugin/out_google_cloud.rb, line 1708
# Wrap a Ruby value in a Google::Protobuf::Value.
#
# nil, numerics, strings (converted to UTF-8) and booleans map to the
# matching scalar field. Protobuf Structs / ListValues are stored as-is.
# Hashes and Arrays are converted recursively.
#
# @raise [Google::Protobuf::Error] for any other type (also logged).
def value_from_ruby(value)
  proto = Google::Protobuf::Value.new
  case value
  when NilClass then proto.null_value = 0
  when Numeric then proto.number_value = value
  when String then proto.string_value = convert_to_utf8(value)
  when TrueClass then proto.bool_value = true
  when FalseClass then proto.bool_value = false
  when Google::Protobuf::Struct then proto.struct_value = value
  when Hash then proto.struct_value = struct_from_ruby(value)
  when Google::Protobuf::ListValue then proto.list_value = value
  when Array then proto.list_value = list_from_ruby(value)
  else
    @log.error "Unknown type: #{value.class}"
    raise Google::Protobuf::Error, "Unknown type: #{value.class}"
  end
  proto
end
write_html_response(data, conn, code, response) click to toggle source
# File lib/fluent/plugin/out_google_cloud.rb, line 826
# Log an access-log style line for the request and write an HTTP/1.1
# response with a text/html body to conn.
#
# @param data [String] raw request data; only its first line (the request
#   line) is logged.
# @param conn [#remote_host, #write] the client connection.
# @param code [String, Integer] status written after "HTTP/1.1 ".
# @param response [String] the response body; Content-Length is its byte
#   size.
def write_html_response(data, conn, code, response)
  # `lines.first` is nil for empty data. Log an empty request line instead
  # of raising NoMethodError before any response has been written.
  request_line = data.lines.first.to_s.strip
  @log.info "#{conn.remote_host} - - " \
            "#{Time.now.strftime('%d/%b/%Y:%H:%M:%S %z')} " \
            "\"#{request_line}\" #{code} #{response.bytesize}"
  conn.write "HTTP/1.1 #{code}\r\n"
  conn.write "Content-Type: text/html\r\n"
  conn.write "Content-Length: #{response.bytesize}\r\n"
  conn.write "\r\n"
  conn.write response
end
write_request_via_grpc(entries:, log_name: '', resource: nil, labels: {}) click to toggle source
# File lib/fluent/plugin/out_google_cloud.rb, line 889
# Send one WriteLogEntries request through the gRPC client and update the
# request/entry metrics according to the outcome.
#
# Internal, Unimplemented, Unavailable and DeadlineExceeded errors are
# re-raised so that Fluentd retries the chunk. Every other failure is logged
# and the affected entries are dropped. Because partial_success is enabled,
# InvalidArgument / PermissionDenied responses may carry per-entry error
# details; in that case only those entries count as dropped.
#
# @param entries [Array] gRPC LogEntry messages to write.
# @param log_name [String] log name shared by the entries.
# @param resource [#type, #labels, nil] group-level monitored resource.
# @param labels [Hash] group-level common labels.
# @raise the underlying gRPC error for the retriable codes listed above.
def write_request_via_grpc(entries:,
                           log_name: '',
                           resource: nil,
                           labels: {})
  # Count entries before anything that can raise, so the rescue handlers
  # below always see a real count (api_client may raise).
  entries_count = entries.length
  client = api_client
  client.write_log_entries(
    entries: entries,
    log_name: log_name,
    # Leave resource nil if it's nil.
    resource: if resource
                Google::Api::MonitoredResource.new(
                  type: resource.type,
                  labels: resource.labels.to_h
                )
              end,
    labels: labels.map do |k, v|
      [k.encode('utf-8'), convert_to_utf8(v)]
    end.to_h,
    partial_success: true
  )
  increment_successful_requests_count
  increment_ingested_entries_count(entries_count)

  # Let the user explicitly know when the first call succeeded, to
  # aid with verification and troubleshooting.
  unless @successful_call
    @successful_call = true
    @log.info 'Successfully sent gRPC to Stackdriver Logging API.'
  end
rescue Google::Cloud::Error => e
  # GRPC::BadStatus is wrapped in error.cause.
  error = e.cause

  # See the mapping between HTTP status and gRPC status code at:
  # https://github.com/grpc/grpc/blob/master/src/core/lib/transport/status_conversion.cc
  case error
  # Server error, so retry via re-raising the error.
  when \
      # HTTP status 500 (Internal Server Error).
      GRPC::Internal,
      # HTTP status 501 (Not Implemented).
      GRPC::Unimplemented,
      # HTTP status 503 (Service Unavailable).
      GRPC::Unavailable,
      # HTTP status 504 (Gateway Timeout).
      GRPC::DeadlineExceeded
    increment_retried_entries_count(entries_count, error.code)
    @log.debug "Retrying #{entries_count} log message(s) later.",
               error: error.to_s, error_code: error.code.to_s
    raise error

  # Most client errors indicate a problem with the request itself and
  # should not be retried.
  when \
      # HTTP status 401 (Unauthorized).
      # These are usually solved via a `gcloud auth` call, or by modifying
      # the permissions on the Google Cloud project.
      GRPC::Unauthenticated,
      # HTTP status 404 (Not Found).
      GRPC::NotFound,
      # HTTP status 409 (Conflict).
      GRPC::Aborted,
      # HTTP status 412 (Precondition Failed).
      GRPC::FailedPrecondition,
      # HTTP status 429 (Too Many Requests).
      GRPC::ResourceExhausted,
      # HTTP status 499 (Client Closed Request).
      GRPC::Cancelled,
      # the remaining http codes in both 4xx and 5xx category.
      # It's debatable whether to retry or drop these log entries.
      # This decision is made to avoid retrying forever due to
      # client errors.
      GRPC::Unknown
    increment_failed_requests_count(error.code)
    increment_dropped_entries_count(entries_count, error.code)
    @log.warn "Dropping #{entries_count} log message(s)",
              error: error.to_s, error_code: error.code.to_s

  # As partial_success is enabled, valid entries should have been
  # written even if some other entries fail due to InvalidArgument or
  # PermissionDenied errors. Only invalid entries will be dropped.
  when \
      # HTTP status 400 (Bad Request).
      GRPC::InvalidArgument,
      # HTTP status 403 (Forbidden).
      GRPC::PermissionDenied
    error_details_map = construct_error_details_map_grpc(e)
    if error_details_map.empty?
      increment_failed_requests_count(error.code)
      increment_dropped_entries_count(entries_count, error.code)
      @log.warn "Dropping #{entries_count} log message(s)",
                error: error.to_s, error_code: error.code.to_s
    else
      error_details_map.each do |(error_code, error_message), indexes|
        partial_errors_count = indexes.length
        increment_dropped_entries_count(partial_errors_count,
                                        error_code)
        entries_count -= partial_errors_count
        @log.warn "Dropping #{partial_errors_count} log message(s)",
                  error: error_message, error_code: error_code.to_s
      end
      # Consider partially successful requests successful.
      increment_successful_requests_count
      increment_ingested_entries_count(entries_count)
    end

  else
    # Assume it's a problem with the request itself and don't retry.
    error_code = if error.respond_to?(:code)
                   error.code
                 else
                   GRPC::Core::StatusCodes::UNKNOWN
                 end
    increment_failed_requests_count(error_code)
    increment_dropped_entries_count(entries_count, error_code)
    @log.error "Unknown response code #{error_code} from the server," \
               " dropping #{entries_count} log message(s)",
               error: error.to_s, error_code: error_code.to_s
  end

# Got an unexpected error (not Google::Cloud::Error) from the
# google-cloud-logging lib.
rescue StandardError => e
  increment_failed_requests_count(GRPC::Core::StatusCodes::UNKNOWN)
  increment_dropped_entries_count(entries_count,
                                  GRPC::Core::StatusCodes::UNKNOWN)
  @log.error "Unexpected error type #{e.class.name} from the client" \
             " library, dropping #{entries_count} log message(s)",
             error: e.to_s
end
write_request_via_rest(entries:, log_name: '', resource: nil, labels: {}) click to toggle source
# File lib/fluent/plugin/out_google_cloud.rb, line 1021
# Send one WriteLogEntries request through the REST client and update the
# request/entry metrics according to the outcome.
#
# Server errors (Google::Apis::ServerError) are re-raised so that Fluentd
# retries the chunk. Authorization and other client errors are logged and
# the affected entries are dropped. Because partial_success is enabled, a
# client error may carry per-entry error details; in that case only those
# entries count as dropped.
#
# @param entries [Array] REST LogEntry objects to write.
# @param log_name [String] log name shared by the entries.
# @param resource [Object, nil] group-level monitored resource.
# @param labels [Hash] group-level common labels.
# @raise [Google::Apis::ServerError] for 5xx responses.
def write_request_via_rest(entries:,
                           log_name: '',
                           resource: nil,
                           labels: {})
  # Count entries before anything that can raise, so every rescue handler
  # below sees a real count.
  entries_count = entries.length
  client = api_client
  client.write_entry_log_entries(
    Google::Apis::LoggingV2::WriteLogEntriesRequest.new(
      entries: entries,
      log_name: log_name,
      resource: resource,
      labels: labels,
      partial_success: true
    ),
    options: { api_format_version: '2' }
  )
  increment_successful_requests_count
  increment_ingested_entries_count(entries_count)

  # Let the user explicitly know when the first call succeeded, to aid
  # with verification and troubleshooting.
  unless @successful_call
    @successful_call = true
    @log.info 'Successfully sent to Stackdriver Logging API.'
  end
rescue Google::Apis::ServerError => e
  # 5xx server errors. Retry via re-raising the error.
  increment_retried_entries_count(entries_count, e.status_code)
  @log.debug "Retrying #{entries_count} log message(s) later.",
             error: e.to_s, error_code: e.status_code.to_s
  raise e
rescue Google::Apis::AuthorizationError => e
  # 401 authorization error.
  # These are usually solved via a `gcloud auth` call, or by modifying
  # the permissions on the Google Cloud project.
  increment_failed_requests_count(e.status_code)
  increment_dropped_entries_count(entries_count, e.status_code)
  @log.warn "Dropping #{entries_count} log message(s)",
            error: e.to_s, error_code: e.status_code.to_s
rescue Google::Apis::ClientError => e
  # 4xx client errors. Most client errors indicate a problem with the
  # request itself and should not be retried.
  error_details_map = construct_error_details_map(e)
  if error_details_map.empty?
    increment_failed_requests_count(e.status_code)
    increment_dropped_entries_count(entries_count, e.status_code)
    @log.warn "Dropping #{entries_count} log message(s)",
              error: e.to_s, error_code: e.status_code.to_s
  else
    error_details_map.each do |(error_code, error_message), indexes|
      partial_errors_count = indexes.length
      increment_dropped_entries_count(partial_errors_count, error_code)
      entries_count -= partial_errors_count
      @log.warn "Dropping #{partial_errors_count} log message(s)",
                error: error_message,
                error_code: "google.rpc.Code[#{error_code}]"
    end
    # Consider partially successful requests successful.
    increment_successful_requests_count
    increment_ingested_entries_count(entries_count)
  end
end