class InfluxDB2::FluxCsvParser
This class is used to construct FluxResult from CSV.
Constants
- ANNOTATIONS
- ANNOTATION_DATATYPE
- ANNOTATION_DEFAULT
- ANNOTATION_GROUP
Attributes
closed[R]
tables[R]
Public Class Methods
new(response, stream: false)
click to toggle source
# File lib/influxdb2/client/flux_csv_parser.rb, line 48
# Builds a parser around +response+ with every piece of parsing state reset.
#
# @param response the query response; #parse reads its +body+ when it is
#   exactly a Net::HTTPOK and otherwise passes the object itself to CSV.new
# @param stream [Boolean] when true, tables are not stored in +@tables+ and
#   #parse yields records to its block instead
def initialize(response, stream: false)
  # Input and mode.
  @response, @stream = response, stream

  # Lifecycle and error flags.
  @closed = false
  @parsing_state_error = false

  # Table bookkeeping; -1 means "no table id seen yet".
  @tables = {}
  @table = nil
  @table_index = 0
  @table_id = -1
  @start_new_table = false
  @groups = []
end
Public Instance Methods
each() { |record| ... }
click to toggle source
# File lib/influxdb2/client/flux_csv_parser.rb, line 92
# Iterates over everything #parse yields and closes the CSV reader afterwards,
# whether parsing finished normally or raised.
#
# Without a block an Enumerator is returned and nothing is parsed or closed
# yet; the enumerator calls back into this method with a block when consumed.
#
# @yield [record] each value yielded by #parse
# @return [self, Enumerator] self when a block is given, otherwise an Enumerator
def each
  return enum_for(:each) unless block_given?

  # Cleanup is scoped to the parsing path only. A method-level `ensure` would
  # also run on the early return above, where @csv_file has not been assigned.
  begin
    parse { |record| yield record }
    self
  ensure
    _close_connection
  end
end
parse() { |result| ... }
click to toggle source
# File lib/influxdb2/client/flux_csv_parser.rb, line 65
# Reads the response as CSV and hands every non-empty row to #_parse_line.
#
# The CSV source is +@response.body+ when the response is exactly a
# Net::HTTPOK, otherwise +@response+ itself.
#
# @yield [result] in stream mode, each parsed InfluxDB2::FluxRecord
# @return [self]
# @raise [FluxQueryError] when the row following an error header is reached
def parse
  payload = @response.instance_of?(Net::HTTPOK) ? @response.body : @response
  @csv_file = CSV.new(payload)

  while (row = @csv_file.shift)
    next if row.empty?

    # The HTTP status may be OK while the body carries an error table: a row
    # with "error"/"reference" in cells 1 and 2 announces it.
    if row[1] == 'error' && row[2] == 'reference'
      @parsing_state_error = true
      next
    end

    if @parsing_state_error
      message = row[1]
      # A missing or blank reference becomes 0: nil.to_i and ''.to_i are both 0.
      reference_code = row[2].to_i
      raise FluxQueryError.new(message, reference_code)
    end

    parsed = _parse_line(row)
    yield parsed if @stream && parsed.instance_of?(InfluxDB2::FluxRecord)
  end

  self
end
Private Instance Methods
_add_data_types(table, data_types)
click to toggle source
# File lib/influxdb2/client/flux_csv_parser.rb, line 143
# Appends one InfluxDB2::FluxColumn to +table.columns+ for every cell of
# +data_types+ after the first.
#
# @param table object exposing a +columns+ collection that responds to +push+
# @param data_types [Array] row whose cell 0 is skipped; cell N describes the
#   column with index N - 1
def _add_data_types(table, data_types)
  last_cell = data_types.length - 1

  (1..last_cell).each do |cell|
    table.columns.push(
      InfluxDB2::FluxColumn.new(index: cell - 1, data_type: data_types[cell])
    )
  end
end
_add_default_empty_values(table, default_values)
click to toggle source
# File lib/influxdb2/client/flux_csv_parser.rb, line 159
# Assigns each column its default value from the matching cell of
# +default_values+.
#
# @param table object whose +columns+ respond to +default_value=+
# @param default_values [Array] row whose cell 0 is skipped; the column at
#   position N receives cell N + 1 (nil when the row is shorter)
def _add_default_empty_values(table, default_values)
  # Counting from 1 skips the leading cell of the row.
  table.columns.each.with_index(1) do |column, cell|
    column.default_value = default_values[cell]
  end
end
_add_groups(table, csv)
click to toggle source
# File lib/influxdb2/client/flux_csv_parser.rb, line 150
# Sets +group+ on every column to whether its matching cell in +csv+ is the
# exact string 'true'.
#
# @param table object whose +columns+ respond to +group=+
# @param csv [Array] row whose cell 0 is skipped; the column at position N is
#   matched with cell N + 1
def _add_groups(table, csv)
  # Counting from 1 skips the leading cell of the row.
  table.columns.each.with_index(1) do |column, cell|
    column.group = csv[cell] == 'true'
  end
end
_close_connection()
click to toggle source
# File lib/influxdb2/client/flux_csv_parser.rb, line 260
# Closes the CSV reader, if one was ever opened, and marks the parser closed.
#
# @return [true]
def _close_connection
  # @csv_file is only assigned once #parse starts. Cleanup can run before that
  # point, and a nil receiver here would raise NoMethodError and hide the
  # error that actually triggered the cleanup.
  @csv_file&.close
  @closed = true
end
_parse_line(csv)
click to toggle source
# File lib/influxdb2/client/flux_csv_parser.rb, line 111
# Routes one CSV row: annotation rows update the current table definition,
# every other row goes to #_parse_values.
#
# @param csv [Array] one row; cell 0 is the annotation token (if any)
# @return whatever the handler selected for the row returns
# @raise [FluxCsvParserError] when a row arrives before any table definition
def _parse_line(csv)
  token = csv[0]

  if !@start_new_table && ANNOTATIONS.include?(token)
    # First annotation row of a definition: open a fresh table.
    @start_new_table = true
    @table = InfluxDB2::FluxTable.new
    @groups = []
    @tables[@table_index] = @table unless @stream
    @table_index += 1
    @table_id = -1
  elsif @table.nil?
    raise FluxCsvParserError, 'Unable to parse CSV response. FluxTable definition was not found.'
  end

  # e.g. #datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,double,string
  return _add_data_types(@table, csv) if token == ANNOTATION_DATATYPE
  # Group flags are only remembered here; #_parse_values applies them.
  return (@groups = csv) if token == ANNOTATION_GROUP
  return _add_default_empty_values(@table, csv) if token == ANNOTATION_DEFAULT

  _parse_values(csv)
end
_parse_record(table_index, table, csv)
click to toggle source
# File lib/influxdb2/client/flux_csv_parser.rb, line 212
# Builds an InfluxDB2::FluxRecord for +table_index+ whose +values+ map every
# column label to the converted cell of +csv+.
#
# @param table_index passed straight to InfluxDB2::FluxRecord.new
# @param table object whose +columns+ expose +label+ and +index+
# @param csv [Array] one data row; a column's cell sits at +index + 1+
# @return [InfluxDB2::FluxRecord]
def _parse_record(table_index, table, csv)
  fresh_record = InfluxDB2::FluxRecord.new(table_index)

  table.columns.each_with_object(fresh_record) do |flux_column, record|
    raw_cell = csv[flux_column.index + 1]
    record.values[flux_column.label] = _to_value(raw_cell, flux_column)
  end
end
_parse_values(csv)
click to toggle source
# File lib/influxdb2/client/flux_csv_parser.rb, line 177
# Handles a non-annotation row: the first one after a table definition is the
# column-name row, every later one is a data row.
#
# @param csv [Array] one row; cell 2 of a data row is read as the table id
# @return [nil] for the column-name row; the record in stream mode; otherwise
#   the result of pushing the record onto the stored table's +records+
def _parse_values(csv)
  if @start_new_table
    # Column-name row: finish the definition and wait for data rows.
    _add_groups(@table, @groups)
    _add_column_names_and_tags(@table, csv)
    @start_new_table = false
    return
  end

  row_table_id = csv[2].to_i
  @table_id = row_table_id if @table_id == -1

  unless @table_id == row_table_id
    # The table id changed without a new definition: open another table that
    # reuses the previous table's column objects.
    @flux_columns = @table.columns
    @table = InfluxDB2::FluxTable.new
    @flux_columns.each { |column| @table.columns.push(column) }
    @tables[@table_index] = @table unless @stream
    @table_index += 1
    @table_id = row_table_id
  end

  flux_record = _parse_record(@table_index - 1, @table, csv)
  return flux_record if @stream

  @tables[@table_index - 1].records.push(flux_record)
end
_to_value(str_val, column)
click to toggle source
# File lib/influxdb2/client/flux_csv_parser.rb, line 224
# Converts one CSV cell to a Ruby value according to the column's data type.
#
# A nil or empty cell falls back to the column's default value; when that is
# nil or empty as well, the result is nil.
#
# @param str_val [String, nil] raw cell text
# @param column object exposing +data_type+ and +default_value+
# @return [Boolean, Integer, Float, String, nil] Integer for
#   unsignedLong/long/duration, Float for double, decoded bytes for
#   base64Binary, an RFC 3339 string with nine fractional digits for the
#   dateTime types, and the text unchanged for any other type
def _to_value(str_val, column)
  if str_val.nil? || str_val.empty?
    default_value = column.default_value
    return nil if default_value.nil? || default_value.empty?

    # Convert the default in place of the missing cell. Without `return` the
    # converted default is discarded and the empty cell itself gets converted
    # (0 for long, true for boolean, an exception for dateTime).
    return _to_value(default_value, column)
  end

  case column.data_type
  when 'boolean'
    str_val.casecmp('true').zero?
  when 'unsignedLong', 'long', 'duration'
    str_val.to_i
  when 'double'
    case str_val
    when '+Inf'
      Float::INFINITY
    when '-Inf'
      -Float::INFINITY
    else
      str_val.to_f
    end
  when 'base64Binary'
    Base64.decode64(str_val)
  when 'dateTime:RFC3339', 'dateTime:RFC3339Nano'
    Time.parse(str_val).to_datetime.rfc3339(9)
  else
    str_val
  end
end