class Floss::Node
Constants
- BROADCAST_TIME
Default broadcast time. @see broadcast_time
- DEFAULT_OPTIONS
- ELECTION_TIMEOUT
Default election timeout range. @see random_timeout
Attributes
@return [Floss::Log] The replicated log.
@return [Floss::RPC::Server]
Public Class Methods
@param [Hash] options @option options [String] :id A string identifying this node, often its RPC address. @option options [Array<String>] :peers Identifiers of all peers in the cluster. @option options [Module,Class] :rpc Namespace containing `Server` and `Client` classes.
# File lib/floss/node.rb, line 56
# Sets up a node in its initial, not-yet-running state.
#
# @param options [Hash] configuration; merged over DEFAULT_OPTIONS
# @param handler [Proc] optional block, stored and later called with the
#   command of each entry that gets applied
def initialize(options = {}, &handler)
  # Bare `super` forwards both the options and the block to the superclass.
  super

  @options = DEFAULT_OPTIONS.merge(options)
  @handler = handler
  @current_term = 0
  @running = false
  @ready_latch = Floss::OneOffLatch.new

  # Start asynchronously right away when the :run option is set.
  async.run if @options[:run]
end
Public Instance Methods
The interval between heartbeats (in seconds). See Section 5.7.
> The broadcast time must be an order of magnitude less than the election timeout so that leaders can reliably send > the heartbeat messages required to keep followers from starting elections.
@return [Float]
# File lib/floss/node.rb, line 121
# Heartbeat interval: the :broadcast_time option when it is set,
# BROADCAST_TIME otherwise.
#
# @return [Float]
def broadcast_time
  configured = @options[:broadcast_time]
  return configured if configured

  BROADCAST_TIME
end
Returns the cluster's quorum. @return [Fixnum]
# File lib/floss/node.rb, line 105
# Smallest number of nodes that forms a majority of the cluster.
#
# @return [Integer]
def cluster_quorum
  half = cluster_size / 2
  half + 1
end
Returns the number of nodes in the cluster. @return [Fixnum]
# File lib/floss/node.rb, line 111
# Total node count: every peer plus this node itself.
#
# @return [Integer]
def cluster_size
  1 + peers.size
end
# File lib/floss/node.rb, line 139
# Moves the node into another term and forgets the vote cast in the old one.
#
# @param new_term [Integer, nil] term to adopt; when omitted the current
#   term is incremented by one
def enter_new_term(new_term = nil)
  if new_term
    @current_term = new_term
  else
    @current_term += 1
  end

  @voted_for = nil
end
# File lib/floss/node.rb, line 156
# Executes a command on the cluster.
#
# On the leader the command is wrapped in a log entry for the current term,
# handed to the log replicator and then passed to the handler (if any).
# On any other node the command is forwarded to the peer whose id matches
# the remembered leader id.
#
# @param entry [Object] the command to execute
# @raise [RuntimeError] when no leader is known, or when the remembered
#   leader id does not match any configured peer
def execute(entry)
  if leader?
    log_entry = Floss::Log::Entry.new(entry, @current_term)

    # Replicate entry to all peers, then apply it.
    # TODO: Failure handling.
    @log_replicator.append(log_entry)
    @handler.call(log_entry.command) if @handler
  else
    raise "Cannot redirect command because leader is unknown." unless @leader_id

    leader = peers.find { |peer| peer.id == @leader_id }

    # Without this check an unknown leader id surfaced as a NoMethodError on nil.
    raise "Cannot redirect command because leader #{@leader_id} is not a known peer." unless leader

    leader.execute(entry)
  end
end
# File lib/floss/node.rb, line 177
# Dispatches an incoming RPC to the matching `handle_<command>` method
# (private and protected methods included).
#
# @param command [String, Symbol] RPC name
# @param payload [Object] argument passed on to the handler method
# @return [Object] whatever the handler method returns
def handle_rpc(command, payload)
  target = :"handle_#{command}"
  return send(target, payload) if respond_to?(target, true)

  abort ArgumentError.new('Unknown command.')
end
Returns this node's id. @return [String]
# File lib/floss/node.rb, line 93
# Identifier of this node, read from the :id option.
#
# @return [String]
def id
  @options[:id]
end
Returns peers in the cluster. @return [Array<Floss::Peer>]
# File lib/floss/node.rb, line 99
# Peer objects for every identifier in the :peers option. The list is built
# on first use and cached afterwards.
#
# @return [Array<Floss::Peer>]
def peers
  return @peers if @peers

  @peers = @options[:peers].map do |peer_id|
    Floss::Peer.new(peer_id, rpc_client_class: rpc_client_class)
  end
end
Randomized election timeout as defined in Section 5.2.
This timeout is used in multiple ways:
* If a follower does not receive any activity, it starts a new election. * As a candidate, if the election does not resolve within this time, it is restarted.
@return [Float]
# File lib/floss/node.rb, line 133
# Picks a random election timeout from the :election_timeout option, or from
# ELECTION_TIMEOUT when the option is not set.
#
# The result lies in [first, last) of the configured range. Integer bounds
# yield an Integer, Float bounds yield a Float.
#
# @return [Numeric]
def random_timeout
  range = @options[:election_timeout] || ELECTION_TIMEOUT
  min, max = range.first, range.last
  span = max - min

  # Nothing to randomize for an empty (or reversed) range.
  return min unless span > 0

  # Random.rand honours Float limits. Kernel#rand truncates its argument to
  # an Integer, so a sub-second span such as 0.15 became rand(0) and produced
  # an offset anywhere in [0, 1).
  min + Random.rand(span)
end
# File lib/floss/node.rb, line 87
# Looks up the `Client` constant inside the namespace given by the :rpc option.
#
# @return [Class]
def rpc_client_class
  rpc_namespace = @options[:rpc]
  rpc_namespace.const_get('Client')
end
# File lib/floss/node.rb, line 83
# Looks up the `Server` constant inside the namespace given by the :rpc option.
#
# @return [Class]
def rpc_server_class
  rpc_namespace = @options[:rpc]
  rpc_namespace.const_get('Server')
end
# File lib/floss/node.rb, line 68
# Starts the node: instantiates the log, links an RPC server that routes
# requests to #handle_rpc, and schedules the first election timeout.
#
# @raise [RuntimeError] when the node has already been started
def run
  raise 'Already running' if @running

  @running = true
  @log = @options[:log].new

  # The server is created with this node's id and calls back into #handle_rpc.
  rpc_server = rpc_server_class.new(id, &method(:handle_rpc))
  self.server = link(rpc_server)

  @election_timeout = after(random_timeout) { on_election_timeout }
end
# File lib/floss/node.rb, line 171
# Blocks until enough peers have signalled that they committed +index+.
#
# NOTE(review): the latch is sized to the full cluster quorum, yet only peers
# are asked to signal it; confirm whether the local node is meant to count
# towards the quorum.
#
# @param index [Integer] log index to wait for
def wait_for_quorum_commit(index)
  quorum_latch = Floss::CountDownLatch.new(cluster_quorum)

  peers.each do |peer|
    peer.signal_on_commit(index, quorum_latch)
  end

  quorum_latch.wait
end
Blocks until the node is ready for executing commands.
# File lib/floss/node.rb, line 79
# Waits on the ready latch, which is signalled when the node wins an election
# or receives an AppendEntries RPC.
def wait_until_ready
  @ready_latch.wait
end
Protected Instance Methods
# File lib/floss/node.rb, line 311
# Asynchronously sends a vote request for the current term to every peer.
# All peers receive the same payload object.
def collect_votes
  vote_request = {
    term: @current_term,
    last_log_term: log.last_term,
    last_log_index: log.last_index,
    candidate_id: id
  }

  peers.each { |peer| async.request_vote(peer, vote_request) }
end
# File lib/floss/node.rb, line 363
# Cleanup hook: terminates the log replicator when one exists.
def finalize
  return unless @log_replicator

  @log_replicator.terminate
end
# File lib/floss/node.rb, line 232
# Handles an AppendEntries RPC (heartbeat or log replication) from a leader.
#
# @param payload [Hash] request; the keys read here are :term, :leader_id,
#   :entries, :prev_log_index, :prev_log_term and :commit_index
# @return [Hash] `{term: <current term>, success: <Boolean>}`
def handle_append_entries(payload)
  info("[RPC] Received AppendEntries: #{payload}")

  # Marks the node as ready for accepting commands.
  @ready_latch.signal

  term = payload[:term]

  # Reject RPCs with a lesser term.
  return {term: @current_term, success: false} if term < @current_term

  # Accept terms greater than the local one.
  enter_new_term(term) if term > @current_term

  # Step down if another node sends a valid AppendEntries RPC.
  stop_log_replication if leader?
  transition(:follower) if candidate? || leader?

  # Remember the leader.
  @leader_id = payload[:leader_id]

  # A valid AppendEntries RPC resets the election timeout.
  @election_timeout.reset

  # A heartbeat (no entries) always succeeds; entries are only appended
  # when the log accepts the previous index/term.
  success = if payload[:entries].any?
    if log.validate(payload[:prev_log_index], payload[:prev_log_term])
      log.append(payload[:entries])
      true
    else
      false
    end
  else
    true
  end

  leader_commit = payload[:commit_index]

  if leader_commit
    if @handler
      # Apply everything between the last applied index and the leader's commit index.
      first_unapplied = @commit_index ? @commit_index + 1 : 0
      first_unapplied.upto(leader_commit) do |index|
        @handler.call(log[index].command)
      end
    end

    # Only ever move the commit index forward. Resetting it to nil or to a
    # lower value made later RPCs re-apply commands that were already applied.
    @commit_index = leader_commit if @commit_index.nil? || leader_commit > @commit_index
  end

  debug("[RPC] I did not accept AppendEntries: #{payload}") unless success

  {term: @current_term, success: success}
end
# File lib/floss/node.rb, line 189
# RPC entry point for commands forwarded by other nodes.
#
# @param entry [Object] the command to execute
# @raise [RuntimeError] when this node is not the leader
def handle_execute(entry)
  return execute(entry) if leader?

  raise 'Only the leader can accept commands.'
end
@param [Hash] request @option request [Fixnum] :term The candidate's term. @option request [String] :candidate_id The candidate requesting the vote. @option request [Fixnum] :last_log_index Index of the candidate's last log entry. @option request [Fixnum] :last_log_term Term of the candidate's last log entry.
@return [Hash] response @option response [Boolean] :vote_granted Whether the candidate receives the vote. @option response [Fixnum] :term This node's current term.
# File lib/floss/node.rb, line 203
# Decides whether to grant a vote to the requesting candidate.
#
# The vote is granted when the request's term is not older than the current
# term, no vote was given to a different candidate in this term, and the log
# reports the candidate's last term/index as complete.
#
# @param request [Hash] reads :term, :candidate_id, :last_log_term and :last_log_index
# @return [Hash] `{term: <current term>, vote_granted: <result>}`
def handle_vote_request(request)
  info("[RPC] Received VoteRequest: #{request}")

  candidate_term = request[:term]
  candidate = request[:candidate_id]

  # Candidates from an older term are turned down right away.
  return {term: @current_term, vote_granted: false} if candidate_term < @current_term

  # A newer term is adopted, and a leader or candidate steps down.
  if candidate_term > @current_term
    enter_new_term(candidate_term)
    stop_log_replication if leader?
    transition(:follower) if candidate? || leader?
  end

  may_vote = @voted_for.nil? || @voted_for == candidate
  up_to_date = log.complete?(request[:last_log_term], request[:last_log_index])
  vote_granted = may_vote && up_to_date

  if vote_granted
    @voted_for = candidate
    # Granting a vote also restarts the election timer.
    @election_timeout.reset
  end

  {term: @current_term, vote_granted: vote_granted}
end
# File lib/floss/node.rb, line 286
# Election timer callback. A follower first becomes a candidate; a node that
# is (or has just become) a candidate enters a new term and transitions to
# :candidate again. Nodes in any other state do nothing.
def on_election_timeout
  transition(:candidate) if follower?

  return unless candidate?

  enter_new_term
  transition(:candidate)
end
TODO: The candidate should retry the RPC if a peer doesn't answer.
# File lib/floss/node.rb, line 325
# Sends one vote request to +peer+ and processes the answer.
#
# A timed-out request is retried, but only while the node is still in the
# term the request was issued for. Once the term has moved on the request is
# abandoned, so the retry loop cannot spin forever against an unreachable peer.
#
# @param peer [Floss::Peer] the peer asked for its vote
# @param payload [Hash] the vote request; :term is compared with the current term
def request_vote(peer, payload)
  response = begin
    peer.request_vote(payload)
  rescue Floss::TimeoutError
    if payload[:term] == @current_term
      debug("A vote request to #{peer.id} timed out. Retrying.")
      retry
    end

    debug("A vote request to #{peer.id} timed out. Giving up because the term has changed.")
    return
  end

  term = response[:term]

  # Ignore old responses.
  return if @current_term > term

  # Step down when a higher term is detected.
  # Accept votes from peers in the same term.
  # Ignore votes from peers with an older term.
  if @current_term < term
    enter_new_term(term)
    transition(:follower)

    return
  end

  @votes.signal if response[:vote_granted]
end
@group Candidate methods
# File lib/floss/node.rb, line 299
# Runs one election: requests votes from all peers, waits until the vote
# latch has counted down, then becomes leader.
#
# NOTE(review): the latch is sized to the full cluster quorum while votes are
# only collected from peers; confirm whether the candidate's own vote is
# supposed to count.
def start_election
  @votes = Floss::CountDownLatch.new(cluster_quorum)
  collect_votes

  # Wait until enough votes have been signalled on the latch.
  @votes.wait

  transition(:leader)

  # Marks the node as ready for accepting commands.
  @ready_latch.signal
end
@group Leader methods
# File lib/floss/node.rb, line 353
# Creates a log replicator for this actor, links it and remembers it.
#
# @raise [RuntimeError] when a replicator already exists
def start_log_replication
  raise "A log replicator is already running." if @log_replicator

  replicator = Floss::LogReplicator.new(current_actor)
  @log_replicator = link(replicator)
end
# File lib/floss/node.rb, line 358
# Terminates the log replicator, if one exists, and forgets it so that a new
# one can be started later.
#
# Calling this without a replicator is a no-op instead of a NoMethodError on
# nil, matching the nil check in #finalize.
def stop_log_replication
  @log_replicator.terminate if @log_replicator
  @log_replicator = nil
end