class Pontoon::Node
Constants
- CANDIDATE_ROLE
- FOLLOWER_ROLE
- LEADER_ROLE
Attributes
cluster[R]
config[R]
election_timer[R]
id[R]
persistent_state[R]
role[R]
temporary_state[R]
Public Class Methods
new(id, config, cluster, commit_handler=nil, &block)
click to toggle source
# File lib/pontoon.rb, line 186 def initialize(id, config, cluster, commit_handler=nil, &block) @id = id @role = FOLLOWER_ROLE @config = config @cluster = cluster @persistent_state = PersistentState.new @temporary_state = TemporaryState.new(nil, nil) @election_timer = Timer.new(config.election_timeout, config.election_splay) @commit_handler = commit_handler || (block.to_proc if block_given?) end
Public Instance Methods
handle_append_entries(request)
click to toggle source
# File lib/pontoon.rb, line 409 def handle_append_entries(request) #STDOUT.write("\n\nnode #{@id} handle_append_entries: #{request.entries.pretty_inspect}\n\n") #if request.prev_log_index.nil? response = AppendEntriesResponse.new response.term = @persistent_state.current_term response.success = false #STDOUT.write("\n\nnode #{@id} handle_append_entries for term #{request.term} (current is #{@persistent_state.current_term})\n")# if request.prev_log_index.nil? return response if request.term < @persistent_state.current_term #STDOUT.write("\n\nnode #{@id} handle_append_entries stage 2\n") if request.prev_log_index.nil? step_down_if_new_term(request.term) reset_election_timeout @temporary_state.leader_id = request.leader_id abs_log_index = abs_log_index_for(request.prev_log_index, request.prev_log_term) return response if abs_log_index.nil? && !request.prev_log_index.nil? && !request.prev_log_term.nil? #STDOUT.write("\n\nnode #{@id} handle_append_entries stage 3\n") if request.prev_log_index.nil? if @temporary_state.commit_index && abs_log_index && abs_log_index < @temporary_state.commit_index raise "Cannot truncate committed logs; @temporary_state.commit_index = #{@temporary_state.commit_index}; abs_log_index = #{abs_log_index}" end truncate_and_update_log(abs_log_index, request.entries) return response unless update_commit_index(request.commit_index) #STDOUT.write("\n\nnode #{@id} handle_append_entries stage 4\n") if request.prev_log_index.nil? response.success = true response end
handle_command(request)
click to toggle source
# File lib/pontoon.rb, line 443 def handle_command(request) response = CommandResponse.new(false) case @role when FOLLOWER_ROLE await_leader if @role == LEADER_ROLE handle_command(request) else # forward the command to the leader response = @config.rpc_provider.command(request, @temporary_state.leader_id) end when CANDIDATE_ROLE await_leader response = handle_command(request) when LEADER_ROLE last_log = @persistent_state.log.last log_entry = LogEntry.new(@persistent_state.current_term, last_log.index ? last_log.index + 1 : 0, request.command) @persistent_state.log << log_entry await_consensus(log_entry) response = CommandResponse.new(true) end response end
handle_request_vote(request)
click to toggle source
# File lib/pontoon.rb, line 373 def handle_request_vote(request) #STDOUT.write("\nnode #{@id} handling vote request from #{request.candidate_id} (request.last_log_index: #{request.last_log_index}, vs #{@persistent_state.log.last.index}\n") response = RequestVoteResponse.new response.term = @persistent_state.current_term response.vote_granted = false return response if request.term < @persistent_state.current_term @temporary_state.leader_id = nil if request.term > @persistent_state.current_term step_down_if_new_term(request.term) if FOLLOWER_ROLE == @role if @persistent_state.voted_for == request.candidate_id response.vote_granted = true elsif @persistent_state.voted_for.nil? if @persistent_state.log.empty? # this node has no log so it can't be ahead @persistent_state.voted_for = request.candidate_id response.vote_granted = true elsif request.last_log_term == @persistent_state.log.last.term && (request.last_log_index || -1) < @persistent_state.log.last.index # candidate's log is incomplete compared to this node elsif (request.last_log_term || -1) < @persistent_state.log.last.term # candidate's log is incomplete compared to this node else @persistent_state.voted_for = request.candidate_id response.vote_granted = true end end reset_election_timeout if response.vote_granted end response end
update()
click to toggle source
# File lib/pontoon.rb, line 197 def update return if @updating @updating = true indent = "\t" * (@id.to_i % 3) #STDOUT.write("\n\n#{indent}update #{@id}, role #{@role}, log length #{@persistent_state.log.count}\n\n") case @role when FOLLOWER_ROLE follower_update when CANDIDATE_ROLE candidate_update when LEADER_ROLE leader_update end @updating = false end
Protected Instance Methods
abs_log_index_for(prev_log_index, prev_log_term)
click to toggle source
# File lib/pontoon.rb, line 505 def abs_log_index_for(prev_log_index, prev_log_term) @persistent_state.log.rindex { |log_entry| log_entry.index == prev_log_index && log_entry.term == prev_log_term } end
append_entries_to_follower(node_id, request, response)
click to toggle source
# File lib/pontoon.rb, line 335 def append_entries_to_follower(node_id, request, response) if @role != LEADER_ROLE # we lost the leadership elsif response.success #STDOUT.write("\nappend_entries_to_follower #{node_id} request #{request.pretty_inspect} succeeded\n") @leadership_state.followers[node_id].next_index = (request.prev_log_index || -1) + request.entries.count + 1 @leadership_state.followers[node_id].succeeded = true elsif response.term <= @persistent_state.current_term #STDOUT.write("\nappend_entries_to_follower #{node_id} request failed (#{request.pretty_inspect}) and responded with #{response.pretty_inspect}\n") @config.rpc_provider.append_entries_to_follower(request, node_id) do |node_id, response| if @role == LEADER_ROLE # make sure leadership wasn't lost since the request #STDOUT.write("\nappend_entries_to_follower #{node_id} callback...\n") prev_log_index = (request.prev_log_index.nil? || request.prev_log_index <= 0) ? nil : request.prev_log_index - 1 prev_log_term = nil entries = @persistent_state.log unless prev_log_index.nil? prev_log_term = @persistent_state.log[prev_log_index].term entries = @persistent_state.log.slice((prev_log_index + 1)..-1) end next_request = AppendEntriesRequest.new( @persistent_state.current_term, @id, prev_log_index, prev_log_term, entries, @temporary_state.commit_index) #STDOUT.write("\nappend_entries_to_follower #{node_id} request #{request.pretty_inspect} failed...\n") #STDOUT.write("sending updated request #{next_request.pretty_inspect}\n") @config.rpc_provider.append_entries_to_follower(next_request, node_id) do |node_id, response| append_entries_to_follower(node_id, next_request, response) end end end end end
await_consensus(log_entry)
click to toggle source
# File lib/pontoon.rb, line 467 def await_consensus(log_entry) @config.async_provider.await do persisted_log_entry = @persistent_state.log[log_entry.index] !@temporary_state.commit_index.nil? && @temporary_state.commit_index >= log_entry.index && persisted_log_entry.term == log_entry.term && persisted_log_entry.command == log_entry.command end end
await_leader()
click to toggle source
# File lib/pontoon.rb, line 479 def await_leader if @temporary_state.leader_id.nil? @role = CANDIDATE_ROLE end @config.async_provider.await do @role != CANDIDATE_ROLE && !@temporary_state.leader_id.nil? end end
candidate_update()
click to toggle source
# File lib/pontoon.rb, line 223 def candidate_update if @election_timer.timed_out? #STDOUT.write("candidate node #{@id} election timed out at #{Time.now.strftime('%H:%M:%S:%L')}\n") @persistent_state.current_term += 1 @persistent_state.voted_for = @id reset_election_timeout last_log_entry = @persistent_state.log.last log_index = last_log_entry ? last_log_entry.index : nil log_term = last_log_entry ? last_log_entry.term : nil request = RequestVoteRequest.new(@persistent_state.current_term, @id, log_index, log_term) votes_for = 1 # candidate always votes for self votes_against = 0 quorum = @cluster.quorum #STDOUT.write("\n\t\t#{@id} requests votes for term #{@persistent_state.current_term}\n\n") @config.rpc_provider.request_votes(request, @cluster) do |voter_id, request, response| #STDOUT.write("\n\t\t#{@id} receives vote #{response.vote_granted} from #{voter_id}\n\n") elected = nil # no majority result yet if request.term != @persistent_state.current_term # this is a response to an out-of-date request, just ignore it elsif response.term > @persistent_state.current_term @role = FOLLOWER_ROLE elected = false elsif response.vote_granted votes_for += 1 elected = true if votes_for >= quorum else votes_against += 1 elected = false if votes_against >= quorum end #STDOUT.write("\n\t\t#{@id} receives vote #{response.vote_granted}, elected is #{elected.inspect}\n\n") elected end if votes_for >= quorum #STDOUT.write("\n#{@id} becomes leader for term #{@persistent_state.current_term}\n\n") @role = LEADER_ROLE establish_leadership else #STDOUT.write("\n\t\t#{@id} not elected leader (for #{votes_for}, against #{votes_against})\n\n") end end end
establish_leadership()
click to toggle source
# File lib/pontoon.rb, line 301 def establish_leadership @leadership_state = LeadershipState.new(@config.update_interval) @temporary_state.leader_id = @id @cluster.node_ids.each do |node_id| next if node_id == @id follower_state = (@leadership_state.followers[node_id] ||= FollowerState.new) follower_state.next_index = @persistent_state.log.size follower_state.succeeded = false end send_heartbeats end
follower_update()
click to toggle source
# File lib/pontoon.rb, line 213 def follower_update if @election_timer.timed_out? #STDOUT.write("follower node #{@id} election timed out at #{Time.now.strftime('%H:%M:%S:%L')}\n") @role = CANDIDATE_ROLE candidate_update end end
handle_commits(new_commit_index)
click to toggle source
# File lib/pontoon.rb, line 287 def handle_commits(new_commit_index) #STDOUT.write("\nnode #{@id} handle_commits(new_commit_index = #{new_commit_index}) (@temporary_state.commit_index = #{@temporary_state.commit_index}\n") return if new_commit_index == @temporary_state.commit_index next_commit = @temporary_state.commit_index.nil? ? 0 : @temporary_state.commit_index + 1 while next_commit <= new_commit_index @commit_handler.call(@persistent_state.log[next_commit].command) if @commit_handler @temporary_state.commit_index = next_commit next_commit += 1 #STDOUT.write("\n\tnode #{@id} handle_commits(new_commit_index = #{new_commit_index}) (new @temporary_state.commit_index = #{@temporary_state.commit_index}\n") end end
leader_update()
click to toggle source
# File lib/pontoon.rb, line 267 def leader_update #STDOUT.write("\nLEADER UPDATE BEGINS\n") if @leadership_state.update_timer.timed_out? @leadership_state.update_timer.reset! send_heartbeats end if @leadership_state.followers.any? new_commit_index = @leadership_state.followers.values. select { |follower_state| follower_state.succeeded }. map { |follower_state| follower_state.next_index - 1 }. sort[@cluster.quorum - 1] else new_commit_index = @persistent_state.log.size - 1 end handle_commits(new_commit_index) #STDOUT.write("\nLEADER UPDATE ENDS\n") end
reset_election_timeout()
click to toggle source
# File lib/pontoon.rb, line 499 def reset_election_timeout @election_timer.reset! end
send_heartbeats()
click to toggle source
# File lib/pontoon.rb, line 315 def send_heartbeats #STDOUT.write("\nnode #{@id} sending heartbeats at #{Time.now.strftime('%H:%M:%S:%L')}\n") last_log_entry = @persistent_state.log.last log_index = last_log_entry ? last_log_entry.index : nil log_term = last_log_entry ? last_log_entry.term : nil request = AppendEntriesRequest.new( @persistent_state.current_term, @id, log_index, log_term, [], @temporary_state.commit_index) @config.rpc_provider.append_entries(request, @cluster) do |node_id, response| append_entries_to_follower(node_id, request, response) end end
step_down_if_new_term(request_term)
click to toggle source
# File lib/pontoon.rb, line 490 def step_down_if_new_term(request_term) if request_term > @persistent_state.current_term @persistent_state.current_term = request_term @role = FOLLOWER_ROLE end end
truncate_and_update_log(abs_log_index, entries)
click to toggle source
# File lib/pontoon.rb, line 511 def truncate_and_update_log(abs_log_index, entries) log = @persistent_state.log if abs_log_index.nil? log = [] elsif log.length == abs_log_index + 1 # no truncation required, past log is the same else log = log.slice(0..abs_log_index) end #STDOUT.write("\n\nentries is: #{entries.pretty_inspect}\n\n") log = log.concat(entries) unless entries.empty? @persistent_state.log = log end
update_commit_index(new_commit_index)
click to toggle source
# File lib/pontoon.rb, line 527 def update_commit_index(new_commit_index) #STDOUT.write("\n\n%%%%%%%%%%%%%%%%%%%%% node #{@id} update_commit_index(new_commit_index = #{new_commit_index})\n") return false if @temporary_state.commit_index && @temporary_state.commit_index > new_commit_index handle_commits(new_commit_index) true end