class NoSE::Backend::CassandraBackend

A backend which communicates with Cassandra via CQL

Public Class Methods

new(model, indexes, plans, update_plans, config) click to toggle source
Calls superclass method NoSE::Backend::Backend::new
# File lib/nose/backend/cassandra.rb, line 12
def initialize(model, indexes, plans, update_plans, config)
  super

  @hosts = config[:hosts]
  @port = config[:port]
  @keyspace = config[:keyspace]
  @generator = Cassandra::Uuid::Generator.new
end

Public Instance Methods

drop_index(index) click to toggle source

Check if a given index exists in the target database

# File lib/nose/backend/cassandra.rb, line 85
def drop_index(index)
  client.execute "DROP TABLE \"#{index.key}\""
end
generate_id() click to toggle source

Generate a random UUID

# File lib/nose/backend/cassandra.rb, line 22
def generate_id
  @generator.uuid
end
index_empty?(index) click to toggle source

Check if the given index is empty

# File lib/nose/backend/cassandra.rb, line 73
def index_empty?(index)
  query = "SELECT COUNT(*) FROM \"#{index.key}\" LIMIT 1"
  client.execute(query).first.values.first.zero?
end
index_exists?(index) click to toggle source

Check if a given index exists in the target database

# File lib/nose/backend/cassandra.rb, line 79
def index_exists?(index)
  client
  @cluster.keyspace(@keyspace).has_table? index.key
end
index_insert_chunk(index, chunk) click to toggle source

Insert a chunk of rows into an index @return [Array<Array<Cassandra::Uuid>>]

# File lib/nose/backend/cassandra.rb, line 51
def index_insert_chunk(index, chunk)
  fields = index.all_fields.to_a
  prepared = "INSERT INTO \"#{index.key}\" (" \
             "#{field_names fields}" \
             ") VALUES (#{(['?'] * fields.length).join ', '})"
  prepared = client.prepare prepared

  ids = []
  client.execute(client.batch do |batch|
    chunk.each do |row|
      index_row = index_row(row, fields)
      ids << (index.hash_fields.to_a + index.order_fields).map do |field|
        index_row[fields.index field]
      end
      batch.add prepared, arguments: index_row
    end
  end)

  ids
end
index_sample(index, count) click to toggle source

Sample a number of values from the given index

# File lib/nose/backend/cassandra.rb, line 90
def index_sample(index, count)
  field_list = index.all_fields.map { |f| "\"#{f.id}\"" }
  query = "SELECT #{field_list.join ', '} " \
          "FROM \"#{index.key}\" LIMIT #{count}"
  rows = client.execute(query).rows

  # XXX Ignore null values for now
  # fail if rows.any? { |row| row.values.any?(&:nil?) }

  rows
end
indexes_ddl(execute = false, skip_existing = false, drop_existing = false) click to toggle source

Produce the DDL necessary for column families for the given indexes and optionally execute them against the server

# File lib/nose/backend/cassandra.rb, line 28
def indexes_ddl(execute = false, skip_existing = false,
                drop_existing = false)
  Enumerator.new do |enum|
    @indexes.map do |index|
      ddl = index_cql index
      enum.yield ddl

      begin
        drop_index(index) if drop_existing && index_exists?(index)
        client.execute(ddl) if execute
      rescue Cassandra::Errors::AlreadyExistsError => exc
        next if skip_existing

        new_exc = IndexAlreadyExists.new exc.message
        new_exc.set_backtrace exc.backtrace
        raise new_exc
      end
    end
  end
end

Private Instance Methods

cassandra_type(field_class) click to toggle source

Return the datatype to use in Cassandra for a given field @return [Symbol]

# File lib/nose/backend/cassandra.rb, line 160
def cassandra_type(field_class)
  case [field_class]
  when [Fields::IntegerField]
    :int
  when [Fields::FloatField]
    :float
  when [Fields::StringField]
    :text
  when [Fields::DateField]
    :timestamp
  when [Fields::IDField],
       [Fields::ForeignKeyField]
    :uuid
  end
end
client() click to toggle source

Get a Cassandra client, connecting if not done already

# File lib/nose/backend/cassandra.rb, line 151
def client
  return @client unless @client.nil?
  @cluster = Cassandra.cluster hosts: @hosts, port: @port,
                               timeout: nil
  @client = @cluster.connect @keyspace
end
field_names(fields, types = false) click to toggle source

Get a comma-separated list of field names with optional types @return [String]

# File lib/nose/backend/cassandra.rb, line 142
def field_names(fields, types = false)
  fields.map do |field|
    name = "\"#{field.id}\""
    name += ' ' + cassandra_type(field.class).to_s if types
    name
  end.join ', '
end
index_cql(index) click to toggle source

Produce the CQL to create the definition for a given index @return [String]

# File lib/nose/backend/cassandra.rb, line 128
def index_cql(index)
  ddl = "CREATE COLUMNFAMILY \"#{index.key}\" (" \
    "#{field_names index.all_fields, true}, " \
    "PRIMARY KEY((#{field_names index.hash_fields})"

  cluster_key = index.order_fields
  ddl += ", #{field_names cluster_key}" unless cluster_key.empty?
  ddl += '));'

  ddl
end
index_row(row, fields) click to toggle source

Produce an array of fields in the correct order for a CQL insert @return [Array]

# File lib/nose/backend/cassandra.rb, line 106
def index_row(row, fields)
  fields.map do |field|
    value = row[field.id]
    if field.is_a?(Fields::IDField)
      value = case value
              when Numeric
                Cassandra::Uuid.new value.to_i
              when String
                Cassandra::Uuid.new value
              when nil
                Cassandra::Uuid::Generator.new.uuid
              else
                value
              end
    end

    value
  end
end