class Fluent::CassandraCqlOutput

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_cassandra_driver.rb, line 26
def configure(conf)
  super

  # perform validations
  raise ConfigError, "'Hosts' is required by Cassandra output (ex: localhost, 127.0.0.1, ec2-54-242-141-252.compute-1.amazonaws.com" if self.hosts.nil?
  raise ConfigError, "'Keyspace' is required by Cassandra output (ex: FluentdLoggers)" if self.keyspace.nil?
  raise ConfigError, "'ColumnFamily' is required by Cassandra output (ex: events)" if self.column_family.nil?
  raise ConfigError, "'Schema' is required by Cassandra output" if self.schema.nil?

  # convert schema from string to hash
  # NOTE: ok to use eval b/c this isn't this isn't a user
  #       supplied string
  self.schema = eval(self.schema)

  raise ConfigError, "'Schema' must contain at least one column" if self.schema.keys.length < 1

  # split hosts to array
  self.hosts = self.hosts.split(',')
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_cassandra_driver.rb, line 56
def format(tag, time, record)
  record.to_msgpack
end
session() click to toggle source
# File lib/fluent/plugin/out_cassandra_driver.rb, line 22
def session
  @session ||= get_session(self.hosts, self.keyspace)
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_cassandra_driver.rb, line 51
def shutdown
  super
  @session.close if @session
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_cassandra_driver.rb, line 46
def start
  super
  session
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_cassandra_driver.rb, line 60
def write(chunk)
  chunk.msgpack_each { |record|
    $log.debug "Sending a new record to Cassandra: #{record.to_json}"

    values = build_insert_values(record)

    cql = "INSERT INTO #{self.column_family} (#{values.keys.join(',')}) VALUES (#{values.keys.map { |key| ":#{key}" }.join(',')}) USING TTL #{self.ttl}"

    $log.debug "CQL query: #{cql}"
    $log.debug "Running with arguments: #{values.to_json}"

    begin
      @session.execute(cql, arguments: values)
    rescue Exception => e
      $log.error "Cannot send record to Cassandra: #{e.message}\nTrace: #{e.backtrace.to_s}"

      raise e
    end
  }
end

Private Instance Methods

build_insert_values(record) click to toggle source
# File lib/fluent/plugin/out_cassandra_driver.rb, line 89
def build_insert_values(record)
  values = self.schema.map { |column_family_key, mapping|
    if mapping.class == Hash
      record_key, type = mapping.first
    else
      record_key, type = column_family_key, mapping
    end

    value = record[record_key.to_s]

    case type
      when :integer
        value = value.to_i
      when :timeuuid
        value = ::Cassandra::Uuid::Generator.new.at(Time.parse(value))
      when :time
        value = Time.parse(value)
      when :string
      else
        value = value.to_s
    end

    [column_family_key.to_s, value]
  }.to_h

  self.schema.each { |column_family_key, mapping|
    record_key = mapping.class == Hash ? mapping.first.first : column_family_key

    record.delete(record_key.to_s)
  } if self.pop_data_keys

  # if we have one more data in record and json column
  # then store all remaining data into that column
  values[self.json_column] = record.to_json if self.json_column and record.length > 0

  values
end
get_session(hosts, keyspace) click to toggle source
# File lib/fluent/plugin/out_cassandra_driver.rb, line 83
def get_session(hosts, keyspace)
  cluster = ::Cassandra.cluster(hosts: hosts)

  cluster.connect(keyspace)
end