class InfluxDB2::FluxCsvParser

This class is used to construct a FluxResult from CSV.

Constants

ANNOTATIONS
ANNOTATION_DATATYPE
ANNOTATION_DEFAULT
ANNOTATION_GROUP

Attributes

closed[R]
tables[R]

Public Class Methods

new(response, stream: false) click to toggle source
# File lib/influxdb2/client/flux_csv_parser.rb, line 48
# Sets up a parser over +response+ with all parsing state reset.
#
# @param response the value later handed to CSV.new by #parse (its +body+ is
#   used instead when it is exactly a Net::HTTPOK)
# @param stream [Boolean] when true, #parse yields records to its block and
#   tables are not collected into +tables+
def initialize(response, stream: false)
  # Input and operating mode.
  @response = response
  @stream = stream

  # Error-detection flag flipped by #parse when it sees an error header row.
  @parsing_state_error = false

  # Table bookkeeping: collected tables, the table currently being filled,
  # its pending group annotation row and whether a header row is expected.
  @tables = {}
  @table = nil
  @groups = []
  @start_new_table = false

  # Position of the next table slot and the id of the table being read
  # (-1 means "no data row seen yet").
  @table_index = 0
  @table_id = -1

  @closed = false
end

Public Instance Methods

each() { |record| ... } click to toggle source
# File lib/influxdb2/client/flux_csv_parser.rb, line 92
# Runs #parse, passing every record it yields on to the given block, and
# closes the CSV reader afterwards (also when the block or the parser raises).
#
# Without a block an Enumerator over +each+ is returned and nothing is parsed
# or closed until that enumerator is consumed.
#
# @yield [record] each record yielded by #parse
# @return [self] when a block is given
# @return [Enumerator] when no block is given
def each
  return enum_for(:each) unless block_given?

  # The ensure is scoped to the real iteration only: with a method-level
  # ensure the block-less call above would also run _close_connection,
  # before #parse has ever created the CSV reader.
  begin
    parse do |record|
      yield record
    end

    self
  ensure
    _close_connection
  end
end
parse() { |result| ... } click to toggle source
# File lib/influxdb2/client/flux_csv_parser.rb, line 65
# Reads the response row by row as CSV and feeds each row to _parse_line.
#
# The body of the response is used when the response is exactly a
# Net::HTTPOK; otherwise the response itself is handed to CSV.new.
#
# @yield [result] in stream mode, every FluxRecord returned by _parse_line
# @return [self]
# @raise [FluxQueryError] when the row following an "error,reference" header
#   row is read; carries the error text and the reference as an Integer
#   (0 when the reference cell is nil or empty)
def parse
  source = @response.instance_of?(Net::HTTPOK) ? @response.body : @response
  @csv_file = CSV.new(source)

  while (row = @csv_file.shift)
    next if row.empty?

    if row[1] == 'error' && row[2] == 'reference'
      # Response has HTTP status ok, but the payload is an error table;
      # the next non-empty row carries the details.
      @parsing_state_error = true
    elsif @parsing_state_error
      message = row[1]
      reference = row[2]
      code = reference.nil? || reference.empty? ? 0 : reference.to_i
      raise FluxQueryError.new(message, code)
    else
      parsed = _parse_line(row)
      yield parsed if @stream && parsed.instance_of?(InfluxDB2::FluxRecord)
    end
  end

  self
end

Private Instance Methods

_add_column_names_and_tags(table, csv) click to toggle source
# File lib/influxdb2/client/flux_csv_parser.rb, line 168
# Assigns a label to every column of +table+ from the header row +csv+.
# The first cell of the row is skipped, so column N takes cell N + 1.
#
# @param table object exposing +columns+
# @param csv [Array] header row
def _add_column_names_and_tags(table, csv)
  table.columns.each_with_index do |column, position|
    column.label = csv[position + 1]
  end
end
_add_data_types(table, data_types) click to toggle source
# File lib/influxdb2/client/flux_csv_parser.rb, line 143
# Appends one FluxColumn to +table+ for every cell of the datatype annotation
# row except the first. Columns are numbered from 0, i.e. the cell position
# minus one.
#
# @param table object exposing +columns+
# @param data_types [Array] datatype annotation row
def _add_data_types(table, data_types)
  last_position = data_types.length - 1

  (1..last_position).each do |position|
    table.columns << InfluxDB2::FluxColumn.new(index: position - 1, data_type: data_types[position])
  end
end
_add_default_empty_values(table, default_values) click to toggle source
# File lib/influxdb2/client/flux_csv_parser.rb, line 159
# Stores the default value of every column of +table+ from the default
# annotation row. The first cell of the row is skipped, so column N takes
# cell N + 1.
#
# @param table object exposing +columns+
# @param default_values [Array] default annotation row
def _add_default_empty_values(table, default_values)
  table.columns.each_with_index do |column, position|
    column.default_value = default_values[position + 1]
  end
end
_add_groups(table, csv) click to toggle source
# File lib/influxdb2/client/flux_csv_parser.rb, line 150
# Sets the +group+ flag of every column of +table+ from the group annotation
# row: true only when the matching cell is exactly the string 'true'. The
# first cell of the row is skipped, so column N takes cell N + 1.
#
# @param table object exposing +columns+
# @param csv [Array] group annotation row
def _add_groups(table, csv)
  table.columns.each_with_index do |column, position|
    column.group = (csv[position + 1] == 'true')
  end
end
_close_connection() click to toggle source
# File lib/influxdb2/client/flux_csv_parser.rb, line 260
# Closes the CSV reader, if one was created, and marks the parser as closed.
def _close_connection
  # @csv_file is only assigned inside #parse. Guard against nil so that
  # closing before (or after a failed) parse does not raise NoMethodError,
  # which would otherwise mask the original error when run from an ensure.
  @csv_file.close unless @csv_file.nil?
  @closed = true
end
_parse_line(csv) click to toggle source
# File lib/influxdb2/client/flux_csv_parser.rb, line 111
# Handles a single CSV row, dispatching on its first cell.
#
# The first annotation row seen while no table start is pending opens a new
# FluxTable (stored in +@tables+ unless streaming). After that the row is
# routed to the datatype / group / default handler, or treated as a header or
# data row by _parse_values.
#
# @param csv [Array] one parsed CSV row
# @return the value of whichever handler processed the row
# @raise [FluxCsvParserError] when a non-annotation row arrives before any
#   table has been started
def _parse_line(csv)
  token = csv[0]
  opens_table = ANNOTATIONS.include?(token) && !@start_new_table

  if opens_table
    # Begin a fresh table definition; the following annotation rows and the
    # header row describe its columns.
    @start_new_table = true
    @table = InfluxDB2::FluxTable.new
    @groups = []

    @tables[@table_index] = @table unless @stream

    @table_index += 1
    @table_id = -1
  elsif @table.nil?
    raise FluxCsvParserError, 'Unable to parse CSV response. FluxTable definition was not found.'
  end

  # e.g. #datatype,string,long,dateTime:RFC3339,double,string
  return _add_data_types(@table, csv) if token == ANNOTATION_DATATYPE
  # The group row is only remembered here; it is applied to the columns once
  # the header row is reached (see _parse_values).
  return (@groups = csv) if token == ANNOTATION_GROUP
  return _add_default_empty_values(@table, csv) if token == ANNOTATION_DEFAULT

  _parse_values(csv)
end
_parse_record(table_index, table, csv) click to toggle source
# File lib/influxdb2/client/flux_csv_parser.rb, line 212
# Builds a FluxRecord for one data row.
#
# For every column of +table+ the cell at +column.index + 1+ is converted with
# _to_value and stored in the record's +values+ under the column label.
#
# @param table_index value passed to FluxRecord.new
# @param table object exposing +columns+
# @param csv [Array] data row
# @return [InfluxDB2::FluxRecord]
def _parse_record(table_index, table, csv)
  InfluxDB2::FluxRecord.new(table_index).tap do |record|
    table.columns.each do |column|
      cell = csv[column.index + 1]
      record.values[column.label] = _to_value(cell, column)
    end
  end
end
_parse_values(csv) click to toggle source
# File lib/influxdb2/client/flux_csv_parser.rb, line 177
# Handles a non-annotation row: either the header row of a freshly started
# table or a data row.
#
# Header row (a table start is pending): applies the remembered group row and
# the column labels to the current table, clears the pending flag and returns
# nil.
#
# Data row: the table id is read from the third cell (+csv[2]+). When it
# differs from the id of the table being filled, a new FluxTable reusing the
# same column objects is started. The row is then turned into a record.
#
# @param csv [Array] header or data row
# @return [nil] for a header row
# @return the parsed record in stream mode; otherwise the result of pushing
#   the record onto the current table's +records+
def _parse_values(csv)
  # parse column names
  if @start_new_table
    _add_groups(@table, @groups)
    _add_column_names_and_tags(@table, csv)
    @start_new_table = false
    return
  end

  current_id = csv[2].to_i
  # -1 marks "no data row seen yet" for this table definition.
  @table_id = current_id if @table_id == -1

  if @table_id != current_id
    # Create a new table with the previous column header settings. A local is
    # used for the hand-over so no scratch state is left on the parser.
    previous_columns = @table.columns
    @table = InfluxDB2::FluxTable.new

    previous_columns.each do |column|
      @table.columns.push(column)
    end

    @tables[@table_index] = @table unless @stream
    @table_index += 1
    @table_id = current_id
  end

  flux_record = _parse_record(@table_index - 1, @table, csv)

  if @stream
    flux_record
  else
    @tables[@table_index - 1].records.push(flux_record)
  end
end
_to_value(str_val, column) click to toggle source
# File lib/influxdb2/client/flux_csv_parser.rb, line 224
# Converts one CSV cell to a Ruby value according to the column's data type.
#
# A nil or empty cell is replaced by the column's default value; when there
# is no (non-empty) default either, nil is returned.
#
# Conversions by +column.data_type+:
# * 'boolean' - true when the text equals 'true' ignoring case
# * 'unsignedLong', 'long', 'duration' - String#to_i
# * 'double' - '+Inf' / '-Inf' map to +/-Float::INFINITY, otherwise String#to_f
# * 'base64Binary' - Base64-decoded string
# * 'dateTime:RFC3339', 'dateTime:RFC3339Nano' - RFC 3339 string with nine
#   fractional digits
# * anything else - the cell text unchanged
#
# @param str_val [String, nil] raw cell
# @param column object exposing +data_type+ and +default_value+
# @return the converted value, or nil for an empty cell without a default
def _to_value(str_val, column)
  if str_val.nil? || str_val.empty?
    default_value = column.default_value

    return nil if default_value.nil? || default_value.empty?

    # Return the converted default. Without the explicit return the result was
    # discarded and the empty cell itself was converted below.
    return _to_value(default_value, column)
  end

  # From here on str_val is a non-empty string.
  case column.data_type
  when 'boolean'
    str_val.casecmp('true').zero?
  when 'unsignedLong', 'long', 'duration'
    str_val.to_i
  when 'double'
    case str_val
    when '+Inf'
      Float::INFINITY
    when '-Inf'
      -Float::INFINITY
    else
      str_val.to_f
    end
  when 'base64Binary'
    Base64.decode64(str_val)
  when 'dateTime:RFC3339', 'dateTime:RFC3339Nano'
    Time.parse(str_val).to_datetime.rfc3339(9)
  else
    str_val
  end
end