class Cassandra::Tasks::SeedRegistry

Public Class Methods

new(cluster_name) click to toggle source

Create a new SeedRegistry task

@param cluster_name [String] unique name (in Consul) for the Cassandra cluster

@return [SeedRegistry]

# File lib/cassandra/tasks/seedregistry.rb, line 13
def initialize cluster_name
  @cluster_name = cluster_name.to_s
  raise ArgumentError.new('cluster_name must not be empty') if @cluster_name.empty?
  @semaphore = nil
  @renew_thread = nil
  @nodetool_info_cache = nil
end

Public Instance Methods

can_seed?() click to toggle source

Return true if the Cassandra node is a valid seed, false otherwise

@return [Boolean]

# File lib/cassandra/tasks/seedregistry.rb, line 42
def can_seed?
  return false unless state == :normal

  results = (nodetool_info_cached || '').split("\n")
  results.map! { |line| line.strip }

  filter_results = lambda do |key|
    potential = results.select { |line| line.include? key }
    potential.map! { |line| line.split(':')[1] }
    potential.compact!
    potential.size == 1 && potential.first.strip == 'true'
  end

  return false unless filter_results.call('Gossip active')
  return false unless filter_results.call('Thrift active')
  return false unless filter_results.call('Native Transport active')

  true
end
data_center() click to toggle source

Return the data center the Cassandra node is in

The returned data center is reported by “nodetool info”.

@return [String, nil]

# File lib/cassandra/tasks/seedregistry.rb, line 84
def data_center
  results = (nodetool_info_cached || '').split("\n")
  results.map! { |line| line.strip }
  results.select! { |line| line.include?('Data Center') }
  results.map! { |line| line.split(':')[1] }
  results.compact!
  return nil if results.size != 1
  results.first.strip
end
rack() click to toggle source

Return the rack the Cassandra node is in

The returned rack is reported by “nodetool info”.

@return [String, nil]

# File lib/cassandra/tasks/seedregistry.rb, line 100
def rack
  results = (nodetool_info_cached || '').split("\n")
  results.map! { |line| line.strip }
  results.select! { |line| line.include?('Rack') }
  results.map! { |line| line.split(':')[1] }
  results.compact!
  return nil if results.size != 1
  results.first.strip
end
run!() click to toggle source

Get a lock in Consul registering the Cassandra node as a seed

# File lib/cassandra/tasks/seedregistry.rb, line 29
def run!
  @nodetool_info_cache = nil
  if can_seed?
    try_get_seed_lock
  else
    release_seed_lock
  end
end
schedule() click to toggle source

Schedule the seed registration process to run periodically

# File lib/cassandra/tasks/seedregistry.rb, line 23
def schedule
  [:interval, '10s']
end
state() click to toggle source

Return the state of the Cassandra node

The returned state is reported by “nodetool netstats”.

@return [state, nil]

# File lib/cassandra/tasks/seedregistry.rb, line 68
def state
  results = (nodetool_netstats || '').split("\n")
  results.map! { |line| line.strip }
  results.select! { |line| line.include? 'Mode:' }
  results.map! { |line| line.split(':')[1] }
  results.compact!
  return nil if results.size != 1
  results.first.strip.downcase.to_sym
end

Private Instance Methods

nodetool_info() click to toggle source

Run the “nodetool info” command and return the output

@return [String, nil] Output from the “nodetool info” command

# File lib/cassandra/tasks/seedregistry.rb, line 116
def nodetool_info
  @nodetool_info ||= DaemonRunner::ShellOut.new(command: 'nodetool info', timeout: 300)
  @nodetool_info.run!
  @nodetool_info.stdout
end
nodetool_info_cached() click to toggle source

Return cached output from “nodetool info” command

This will attempt to populate an empty cache by calling “nodetool info”.

@return [String, nil] Cached output from the “nodetool info” command

# File lib/cassandra/tasks/seedregistry.rb, line 128
def nodetool_info_cached
  @nodetool_info_cache ||= nodetool_info
end
nodetool_netstats() click to toggle source

Run the “nodetool netstats” command and return the output

@return [String, nil] Output from the “nodetool netstats” command

# File lib/cassandra/tasks/seedregistry.rb, line 136
def nodetool_netstats
  @nodetool_netstats ||= DaemonRunner::ShellOut.new(command: 'nodetool netstats', timeout: 300)
  @nodetool_netstats.run!
  @nodetool_netstats.stdout
end
release_seed_lock() click to toggle source

Release the lock in Consul for this node as a seed

# File lib/cassandra/tasks/seedregistry.rb, line 157
def release_seed_lock
  unless @renew_thread.nil?
    @renew_thread.kill
    @renew_thread = nil
  end

  unless @semaphore.nil?
    while @semaphore.locked?
      @semaphore.try_release
      sleep 0.1
    end
    @semaphore = nil
  end
end
try_get_seed_lock() click to toggle source

Try to get the lock in Consul for this node as a seed

# File lib/cassandra/tasks/seedregistry.rb, line 144
def try_get_seed_lock
  if @semaphore.nil?
    name = "#{@cluster_name}/#{data_center}-#{rack}"
    @semaphore = DaemonRunner::Semaphore.lock(name, 1)
  end

  if @renew_thread.nil?
    @renew_thread = @semaphore.renew
  end
end