class Floss::Node

Constants

BROADCAST_TIME

Default broadcast time. @see broadcast_time

DEFAULT_OPTIONS
ELECTION_TIMEOUT

Default election timeout. @see election_timeout

Attributes

current_term[R]
log[R]

@return [Floss::Log] The replicated log.

server[RW]

@return [Floss::RPC::Server]

Public Class Methods

new(options = {}, &handler) click to toggle source

@param [Hash] options @option options [String] :id A string identifying this node, often its RPC address. @option options [Array<String>] :peers Identifiers of all peers in the cluster. @option options [Module,Class] :rpc Namespace containing `Server` and `Client` classes.

Calls superclass method
# File lib/floss/node.rb, line 56
# Builds a node in its initial, not-yet-running state.
#
# The given options are merged over DEFAULT_OPTIONS, the term counter starts
# at zero, and a one-off latch is created that `wait_until_ready` blocks on.
# The optional block is stored as the command handler.
#
# @param options [Hash] node configuration (see the option list above).
# @param handler [Proc] optional block stored in @handler.
def initialize(options = {}, &handler)
  # Bare `super` forwards the original arguments to the superclass.
  super

  @options = DEFAULT_OPTIONS.merge(options)
  @handler = handler
  @running = false
  @current_term = 0
  @ready_latch = Floss::OneOffLatch.new

  # Kick off `run` through `async` when the :run option is set.
  if @options[:run]
    async.run
  end
end

Public Instance Methods

broadcast_time() click to toggle source

The interval between heartbeats (in seconds). See Section 5.7.

> The broadcast time must be an order of magnitude less than the election timeout so that leaders can reliably send the heartbeat messages required to keep followers from starting elections.

@return [Float]

# File lib/floss/node.rb, line 121
# Interval between heartbeats, in seconds.
#
# @return [Float] the :broadcast_time option when set, BROADCAST_TIME otherwise.
def broadcast_time
  configured = @options[:broadcast_time]
  configured || BROADCAST_TIME
end
cluster_quorum() click to toggle source

Returns the cluster's quorum. @return [Fixnum]

# File lib/floss/node.rb, line 105
# Smallest number of nodes that forms a majority of the cluster.
#
# @return [Integer] half of the cluster size (integer division) plus one.
def cluster_quorum
  half = cluster_size / 2
  half + 1
end
cluster_size() click to toggle source

Returns the number of nodes in the cluster. @return [Fixnum]

# File lib/floss/node.rb, line 111
# Number of nodes in the cluster: every peer plus this node itself.
#
# @return [Integer]
def cluster_size
  1 + peers.size
end
enter_new_term(new_term = nil) click to toggle source
# File lib/floss/node.rb, line 139
# Moves the node into a new term and clears the vote cast in the previous one.
#
# @param new_term [Integer, nil] the term to adopt; when omitted the current
#   term is incremented by one.
# @return [nil]
def enter_new_term(new_term = nil)
  if new_term
    @current_term = new_term
  else
    @current_term += 1
  end

  # No vote has been cast yet in the new term.
  @voted_for = nil
end
execute(entry) click to toggle source
# File lib/floss/node.rb, line 156
# Executes a command on the cluster.
#
# On the leader the command is wrapped in a log entry stamped with the
# current term, handed to the log replicator, and then passed to the
# handler block (if one was given). On any other node the command is
# forwarded to the peer whose id matches the remembered leader id.
#
# @param entry [Object] the command to execute.
# @return [Object] the handler's result on the leader (nil without a
#   handler), or whatever the leader peer's `execute` returns.
# @raise [RuntimeError] if no leader is known yet, or if the remembered
#   leader id does not match any configured peer.
def execute(entry)
  if leader?
    entry = Floss::Log::Entry.new(entry, @current_term)

    # Replicate entry to all peers, then apply it.
    # TODO: Failure handling.
    @log_replicator.append(entry)
    @handler.call(entry.command) if @handler
  else
    raise "Cannot redirect command because leader is unknown." unless @leader_id

    leader = peers.find { |peer| peer.id == @leader_id }

    # Without this guard a leader id missing from the peer list surfaces as
    # an opaque NoMethodError on nil.
    raise "Cannot redirect command because leader #{@leader_id} is not a known peer." unless leader

    leader.execute(entry)
  end
end
handle_rpc(command, payload) click to toggle source
# File lib/floss/node.rb, line 177
# Dispatches an incoming RPC to the matching `handle_<command>` method.
#
# The handler is looked up including private and protected methods.
#
# @param command [#to_s] RPC name, e.g. :append_entries.
# @param payload [Object] argument passed through to the handler.
# @return [Object] the handler's return value.
def handle_rpc(command, payload)
  target = "handle_#{command}".to_sym
  return send(target, payload) if respond_to?(target, true)

  # NOTE(review): `abort` is presumably the actor-level abort that reports the
  # error to the caller — confirm against the actor framework in use.
  abort ArgumentError.new('Unknown command.')
end
id() click to toggle source

Returns this node's id. @return [String]

# File lib/floss/node.rb, line 93
# Identifier of this node, read from the :id option.
#
# @return [String]
def id
  node_id = @options[:id]
  node_id
end
peers() click to toggle source

Returns peers in the cluster. @return [Array<Floss::Peer>]

# File lib/floss/node.rb, line 99
# Peer objects for every identifier in the :peers option.
#
# The list is built on first use and memoized in @peers; each peer is given
# the configured RPC client class.
#
# @return [Array<Floss::Peer>]
def peers
  return @peers if @peers

  @peers = @options[:peers].map do |peer_id|
    Floss::Peer.new(peer_id, rpc_client_class: rpc_client_class)
  end
end
random_timeout() click to toggle source

Randomized election timeout as defined in Section 5.2.

This timeout is used in multiple ways:

* If a follower does not receive any activity, it starts a new election.
* As a candidate, if the election does not resolve within this time, it is restarted.

@return [Float]

# File lib/floss/node.rb, line 133
# Picks a randomized election timeout from the configured range.
#
# The range comes from the :election_timeout option, falling back to
# ELECTION_TIMEOUT. Integer ranges yield an Integer in [min, max); ranges
# with Float bounds yield a Float in [min, max).
#
# @return [Numeric] a value of at least `min` and below `max`; `min` itself
#   when the range is empty or reversed.
def random_timeout
  range = @options[:election_timeout] || ELECTION_TIMEOUT
  min, max = range.first, range.last
  span = max - min

  # A degenerate (empty or reversed) range leaves nothing to randomize.
  return min unless span > 0

  # Kernel#rand(max) truncates a Float max with to_i, so a sub-second span
  # such as 0.15 behaves like rand(0) and yields a value in [0, 1) that
  # overshoots the range. Random.rand honours Float limits.
  jitter = span.is_a?(Integer) ? rand(span) : Random.rand(span.to_f)
  min + jitter
end
rpc_client_class() click to toggle source
# File lib/floss/node.rb, line 87
# Looks up the `Client` constant inside the namespace given as the :rpc option.
#
# @return [Class] the RPC client class.
def rpc_client_class
  rpc_namespace = @options[:rpc]
  rpc_namespace.const_get(:Client)
end
rpc_server_class() click to toggle source
# File lib/floss/node.rb, line 83
# Looks up the `Server` constant inside the namespace given as the :rpc option.
#
# @return [Class] the RPC server class.
def rpc_server_class
  rpc_namespace = @options[:rpc]
  rpc_namespace.const_get(:Server)
end
run() click to toggle source
# File lib/floss/node.rb, line 68
# Starts the node: creates the log, brings up the RPC server and arms the
# election timer.
#
# @raise [RuntimeError] if the node has already been started.
# @return [Object] the timer returned by `after`.
def run
  raise 'Already running' if @running

  @running = true
  @log = @options[:log].new

  # Incoming RPCs are routed to #handle_rpc; the server is linked to this node.
  rpc_server = rpc_server_class.new(id, &method(:handle_rpc))
  self.server = link(rpc_server)

  # The election timer fires #on_election_timeout after a randomized delay.
  delay = random_timeout
  @election_timeout = after(delay) { on_election_timeout }
end
wait_for_quorum_commit(index) click to toggle source
# File lib/floss/node.rb, line 171
# Blocks until enough peers have reported the entry at `index` as committed.
#
# A count-down latch sized to the cluster quorum is handed to every peer via
# `signal_on_commit`, then waited on.
#
# NOTE(review): the latch is sized to the full cluster quorum, yet only peers
# are asked to signal it — confirm whether this node's own copy should count.
#
# @param index [Integer] log index to wait for.
# @return [Object] whatever the latch's `wait` returns.
def wait_for_quorum_commit(index)
  commit_latch = Floss::CountDownLatch.new(cluster_quorum)

  peers.each do |peer|
    peer.signal_on_commit(index, commit_latch)
  end

  commit_latch.wait
end
wait_until_ready() click to toggle source

Blocks until the node is ready for executing commands.

# File lib/floss/node.rb, line 79
# Blocks on the ready latch, i.e. until the node has been marked ready for
# executing commands.
#
# @return [Object] whatever the latch's `wait` returns.
def wait_until_ready
  latch = @ready_latch
  latch.wait
end

Protected Instance Methods

collect_votes() click to toggle source
# File lib/floss/node.rb, line 311
# Asks every peer for its vote in the current term.
#
# One vote request is dispatched per peer through `async`, all carrying the
# same payload: current term, last log term and index, and this node's id.
#
# @return [Array] the peer list that was iterated.
def collect_votes
  last_term = log.last_term
  last_index = log.last_index

  ballot = {
    term: @current_term,
    last_log_term: last_term,
    last_log_index: last_index,
    candidate_id: id
  }

  peers.each { |peer| async.request_vote(peer, ballot) }
end
finalize() click to toggle source
# File lib/floss/node.rb, line 363
# Cleanup hook: terminates the log replicator if one is running.
#
# @return [Object, nil] the result of `terminate`, or nil when there is no
#   replicator.
def finalize
  return unless @log_replicator

  @log_replicator.terminate
end
handle_append_entries(payload) click to toggle source
# File lib/floss/node.rb, line 232
# Handles an AppendEntries RPC (log replication and heartbeat) from a leader.
#
# Requests from an older term are rejected outright. Otherwise the node
# adopts a newer term if there is one, steps down to follower, remembers the
# sender as leader, resets the election timer, appends the supplied entries
# when the log accepts the previous index/term, and finally applies newly
# committed entries through the handler block.
#
# @param payload [Hash] request with the keys read below: :term, :leader_id,
#   :prev_log_index, :prev_log_term, :entries and :commit_index.
# @return [Hash] `{term: Integer, success: Boolean}`.
def handle_append_entries(payload)
  info("[RPC] Received AppendEntries: #{payload}")

  term = payload[:term]

  # Reject RPCs with a lesser term.
  return {term: @current_term, success: false} if term < @current_term

  # Accept terms greater than the local one.
  enter_new_term(term) if term > @current_term

  # Step down if another node sends a valid AppendEntries RPC.
  stop_log_replication if leader?
  transition(:follower) if candidate? || leader?

  # Remember the leader.
  @leader_id = payload[:leader_id]

  # Marks the node as ready for accepting commands. Signalled only once a
  # leader is known, so a stale-term RPC cannot mark the node ready while
  # commands still have nowhere to be redirected.
  @ready_latch.signal

  # A valid AppendEntries RPC resets the election timeout.
  @election_timeout.reset

  # A heartbeat (no entries) always succeeds; otherwise the log must accept
  # the previous index/term before the entries are appended.
  success = true
  if payload[:entries].any?
    if log.validate(payload[:prev_log_index], payload[:prev_log_term])
      log.append(payload[:entries])
    else
      success = false
    end
  end

  if success
    apply_committed_entries(payload[:commit_index])
  else
    # A rejected append means the local log may not match the leader's, so
    # nothing is applied and the commit index stays where it is.
    debug("[RPC] I did not accept AppendEntries: #{payload}")
  end

  {term: @current_term, success: success}
end

# Applies entries committed by the leader that were not applied yet, and
# advances @commit_index. The commit index only ever moves forward: a missing
# or lower leader commit index is ignored so that already applied commands
# are never replayed.
#
# @param leader_commit [Integer, nil] commit index reported by the leader.
# @return [void]
def apply_committed_entries(leader_commit)
  return unless leader_commit

  next_index = @commit_index ? @commit_index + 1 : 0
  return if leader_commit < next_index

  # Without a handler there is nothing to apply; just track the index.
  unless @handler
    @commit_index = leader_commit
    return
  end

  next_index.upto(leader_commit) do |index|
    entry = log[index]

    # NOTE(review): assumes log[index] returns nil for an index the local log
    # does not hold yet — confirm against Floss::Log.
    break unless entry

    @handler.call(entry.command)

    # Advance per entry so a failing handler cannot cause earlier commands
    # to be applied twice on the next RPC.
    @commit_index = index
  end
end
handle_execute(entry) click to toggle source
# File lib/floss/node.rb, line 189
# RPC entry point for commands forwarded by other nodes.
#
# @param entry [Object] the command to execute.
# @return [Object] the result of #execute.
# @raise [RuntimeError] when this node is not the leader.
def handle_execute(entry)
  return execute(entry) if leader?

  raise 'Only the leader can accept commands.'
end
handle_vote_request(request) click to toggle source

@param [Hash] request @option request [Fixnum] :term The candidate's term. @option request [String] :candidate_id The candidate requesting the vote. @option request [Fixnum] :last_log_index Index of the candidate's last log entry. @option request [Fixnum] :last_log_term Term of the candidate's last log entry.

@return [Hash] response @option response [Boolean] :vote_granted Whether the candidate receives the vote. @option response [Fixnum] :term This node's current term.

# File lib/floss/node.rb, line 203
# Handles a RequestVote RPC from a candidate.
#
# Requests from an older term are refused. A newer term is adopted and makes
# a leader or candidate step down. The vote is granted when this node has not
# voted for a different candidate in the term and the log reports the
# candidate's log as complete; granting a vote resets the election timer.
#
# @param request [Hash] request with :term, :candidate_id, :last_log_term and
#   :last_log_index.
# @return [Hash] `{term: Integer, vote_granted: Boolean}`.
def handle_vote_request(request)
  info("[RPC] Received VoteRequest: #{request}")

  candidate_term = request[:term]
  candidate = request[:candidate_id]

  # Candidates from an older term never get the vote.
  return {term: @current_term, vote_granted: false} if candidate_term < @current_term

  # A newer term is adopted; a leader or candidate steps down.
  if candidate_term > @current_term
    enter_new_term(candidate_term)
    stop_log_replication if leader?
    transition(:follower) if candidate? || leader?
  end

  # At most one candidate per term, and only one whose log is complete.
  vote_available = @voted_for.nil? || @voted_for == candidate
  log_up_to_date = log.complete?(request[:last_log_term], request[:last_log_index])
  vote_granted = (vote_available && log_up_to_date)

  if vote_granted
    @voted_for = candidate
    @election_timeout.reset
  end

  {term: @current_term, vote_granted: vote_granted}
end
on_election_timeout() click to toggle source
# File lib/floss/node.rb, line 286
# Fired when the election timer expires.
#
# A follower transitions to candidate. A node that is a candidate at that
# point (including one that has just made that transition) enters a new term
# and transitions to candidate again.
#
# NOTE(review): a follower falls through into the candidate branch once its
# transition has happened — confirm the double transition is intended.
#
# @return [Object, nil] the result of the last transition, or nil when the
#   node is not a candidate.
def on_election_timeout
  transition(:candidate) if follower?

  return unless candidate?

  enter_new_term
  transition(:candidate)
end
request_vote(peer, payload) click to toggle source

Requests a vote from a single peer and processes the response. A request that times out is retried.

# File lib/floss/node.rb, line 325
# Requests a vote from one peer and processes the response.
#
# A timed-out request is retried, but only while the election it belongs to
# is still open: this node must still be a candidate in the term the request
# was issued for. A response carrying a higher term makes the node adopt that
# term and step down; a granted vote from the current term is counted on the
# @votes latch.
#
# @param peer [Floss::Peer] the peer to ask.
# @param payload [Hash] the vote request; :term is the term of the election.
# @return [void]
def request_vote(peer, payload)
  response = begin
               peer.request_vote(payload)
             rescue Floss::TimeoutError
               # Retrying after the election has ended (won, lost, or
               # superseded by a newer term) would loop forever against an
               # unreachable peer for a vote that can no longer count.
               unless candidate? && payload[:term] == @current_term
                 debug("A vote request to #{peer.id} timed out. Giving up because the election is over.")
                 return
               end

               debug("A vote request to #{peer.id} timed out. Retrying.")
               retry
             end

  term = response[:term]

  # Ignore old responses.
  return if @current_term > term

  # Step down when a higher term is detected, the same way the RPC handlers
  # do: a leader stops replicating first, and only a candidate or leader
  # needs to transition.
  if @current_term < term
    enter_new_term(term)
    stop_log_replication if leader?
    transition(:follower) if candidate? || leader?

    return
  end

  # Accept votes from peers in the same term.
  @votes.signal if response[:vote_granted]
end
start_election() click to toggle source

@group Candidate methods

# File lib/floss/node.rb, line 299
# Runs an election: requests votes from all peers, waits until the vote latch
# has been signalled a quorum's worth of times, then becomes leader and marks
# the node as ready for accepting commands.
#
# NOTE(review): the latch is sized to the full cluster quorum and is only
# signalled for votes granted by peers — confirm whether the candidate's own
# vote is meant to count.
#
# @return [Object] whatever the ready latch's `signal` returns.
def start_election
  quorum = cluster_quorum
  @votes = Floss::CountDownLatch.new(quorum)

  # Requests go out only after the latch exists, since responses signal it.
  collect_votes
  @votes.wait

  transition(:leader)

  # Marks the node as ready for accepting commands.
  @ready_latch.signal
end
start_log_replication() click to toggle source

@group Leader methods

# File lib/floss/node.rb, line 353
# Creates a log replicator for this node, links it, and keeps it in
# @log_replicator.
#
# @raise [RuntimeError] if a replicator already exists.
# @return [Object] the linked replicator.
def start_log_replication
  raise "A log replicator is already running." if @log_replicator

  replicator = Floss::LogReplicator.new(current_actor)
  @log_replicator = link(replicator)
end
stop_log_replication() click to toggle source
# File lib/floss/node.rb, line 358
# Stops log replication, if it is running.
#
# Calling this without a running replicator is a no-op, matching the nil
# check in #finalize. The reference is cleared before the replicator is
# terminated so that a failing `terminate` cannot leave a stale replicator
# behind that would block #start_log_replication.
#
# @return [nil]
def stop_log_replication
  replicator = @log_replicator
  @log_replicator = nil

  replicator.terminate if replicator
  nil
end