class Fluent::Plugin::CassandraJsonOutput

Public Instance Methods

close() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_cassandra_json.rb, line 89
def close
  super

  @cluster.close
end
configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_cassandra_json.rb, line 64
def configure(conf)
  super

  if @hosts.empty?
    raise Fluent::ConfigError, "`hosts` has at least one host"
  end

  @cluster_options = @cluster_options.map { |k, v| [k.to_sym, v] }.to_h
  @cluster_options.merge!(hosts: @hosts, port: @port)
  @cluster_options.merge!(username: @username) if @username
  @cluster_options.merge!(password: @password) if @password
  formatter_config = conf.elements("format")[0]
  @formatter = formatter_create(usage: 'out_cassandra_for_insert', type: 'json', conf: formatter_config)
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_cassandra_json.rb, line 95
def format(tag, time, record)
  record = inject_values_to_record(tag, time, record)
  @formatter.format(tag, time, record)
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/out_cassandra_json.rb, line 100
def multi_workers_ready?
  true
end
session() click to toggle source
# File lib/fluent/plugin/out_cassandra_json.rb, line 85
def session
  Thread.current[:session] ||= @cluster.connect(@keyspace)
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_cassandra_json.rb, line 79
def start
  super

  @cluster = Cassandra.cluster(@cluster_options)
end
try_write(chunk) click to toggle source
# File lib/fluent/plugin/out_cassandra_json.rb, line 104
def try_write(chunk)
  if chunk.empty?
    commit_write(chunk.unique_id)
    return
  end
  keyspace = extract_placeholders(@keyspace, chunk.metadata)
  table = extract_placeholders(@table, chunk.metadata)

  futures = chunk.open do |io|
    io.map do |line|
      line.chomp!
      cql = "INSERT INTO #{keyspace}.#{table} JSON '#{line.gsub("'", "''")}'#{@default_unset ? " DEFAULT UNSET" : ""}"
      cql << " IF NOT EXISTS" if @if_not_exists
      cql << " USING TTL #{@ttl}" if @ttl && @ttl > 0
      @log.debug(cql)
      future = session.execute_async(cql, consistency: @consistency, idempotent: @idempotent)
      future.on_failure do |error|
        if @skip_invalid_rows
          @log.warn("failed to insert", record: line, error: error)
        else
          @log.error("failed to insert", record: line, error: error)
        end
      end
    end
  end
  combined = Cassandra::Future.all(futures)

  combined.on_complete do |value, error|
    if error.nil? || @skip_invalid_rows
      commit_write(chunk.unique_id)
    else
      rollback_write(chunk.unique_id)
    end
  end
end