class Raft4r::RaftHandler
Constants
- ELECTION_TIMEOUT_MIN_MS
HEARTBEAT_TIMEOUT = 5 HEARTBEAT_TIMEOUT = 3 REELECT_TIMEOUT_MAX = 0.4
- HEARTBEAT_PER_TIMEOUT
Attributes
config[R]
node_id[R]
Public Class Methods
new(config, node_id)
click to toggle source
# File lib/raft4r.rb, line 21 def initialize config, node_id @config = config @node_id = node_id @node_config = @config[node_id] @last_heartbeat = 0 @cluster = {} # XXX debug only @election_timeout = (ELECTION_TIMEOUT_MIN_MS + rand(ELECTION_TIMEOUT_MIN_MS) ) / 1000.0 # persistent state @current_term = 0 @vote_for = nil @log = [] @log << LogEntry.new(@current_term, nil) # volatile states @commit_index = 0 @last_applied = 0 # leader volatile @next_index = [] @match_index = [] # vote state @get_votes = {} @current_leader = nil create_fsm end
Public Instance Methods
AppendEntries(req)
click to toggle source
on RPC
request
# File lib/raft4r.rb, line 209 def AppendEntries req #info "AppendEntries from #{req.node_id}" # check req return [@current_term, false] if req.arguments[0] < @current_term on_rpc_common req # heartbeat from new leader if req.arguments[0] == @current_term # if state is candidate on :discover_current_leader end reset_election_timer @current_leader = req.node_id return [@current_term, true] end
RequestVote(req)
click to toggle source
# File lib/raft4r.rb, line 225 def RequestVote req info "RequestVote from #{req.node_id}" return [@current_term, false] if req.arguments[0] < @current_term on_rpc_common req # or @vote_for == candidateId?? candidateId = req.arguments[1] if @vote_for.nil? || @vote_for == candidateId # if candidate is 'up-to-date' vote = false if @log.last.term < req.arguments[3] vote = true elsif @log.last.term == req.arguments[3] # longer log wins vote = req.arguments[2] >= @log.size end if vote info "Vote for #{candidateId}" @vote_for = candidateId reset_election_timer return [@current_term, true] else return [@current_term, false] end else # this node already voted return [@current_term, false] end end
on_init()
click to toggle source
# File lib/raft4r.rb, line 53 def on_init @config.each {|k,v| next if k == @node_id c = RaftCluster.new v, RPC::EMRPCClient.new(v['bind'], v['port'], @node_id) @cluster[k] = c } info "init: election_timeout #{@election_timeout}s" #p @cluster EM::PeriodicTimer.new(5) { print_state } # reset FSM reset end
Private Instance Methods
create_fsm()
click to toggle source
# File lib/raft4r.rb, line 68 def create_fsm init :follower state :follower do enter do info "become follower" reset_election_timer end trigger :election_timeout do info "election timout by follower" goto :candidate end trigger [:discover_higher_term, :discover_current_leader] do # do nothing end end state :candidate do enter do info "become candidate" reset_election_timer start_new_election @current_leader = nil end trigger :election_timeout do info "election timout by candidate" start_new_election end trigger [:discover_higher_term, :discover_current_leader] do goto :follower end trigger :get_majority do goto :leader end end state :leader do enter do info 'become leader' @current_leader = @node_id on_timer_heartbeat @heartbeat_timer = EM::PeriodicTimer.new(ELECTION_TIMEOUT_MIN_MS / 1000.0 / HEARTBEAT_PER_TIMEOUT) { on :heartbeat_timer } end trigger :election_timeout do # do nothing end trigger :heartbeat_timer do on_timer_heartbeat end trigger :discover_higher_term do goto :follower end leave do @heartbeat_timer.cancel end end end
info(str)
click to toggle source
# File lib/raft4r.rb, line 135 def info str LOGGER.info "#{@node_id}: #{str}" end
on_rpc_common(req)
click to toggle source
# File lib/raft4r.rb, line 200 def on_rpc_common req if req.arguments[0] > @current_term set_term req.arguments[0] on :discover_higher_term end end
on_timer_heartbeat()
click to toggle source
# File lib/raft4r.rb, line 189 def on_timer_heartbeat return unless current_state == :leader #print_state @cluster.each { |k,v| # TODO v.conn.AppendEntries @current_term, @node_id, 0, 0, nil, 0 } end
on_vote(node_id)
click to toggle source
# File lib/raft4r.rb, line 156 def on_vote node_id @get_votes[node_id] = true if @get_votes.size > @cluster.size / 2 info "get majority" on :get_majority end end
print_state()
click to toggle source
# File lib/raft4r.rb, line 139 def print_state info "State: #{current_state}, leader: #{@current_leader}, term: #{@current_term}" end
reset_election_timer()
click to toggle source
# File lib/raft4r.rb, line 151 def reset_election_timer @election_timer.cancel if @election_timer @election_timer = EM::PeriodicTimer.new(@election_timeout) { on :election_timeout } end
set_term(term)
click to toggle source
# File lib/raft4r.rb, line 143 def set_term term @current_term = term @vote_for = nil @get_votes = {} #reset_election_timer info "set term to #{@current_term}" end
start_new_election()
click to toggle source
# File lib/raft4r.rb, line 164 def start_new_election fail 'ILLEGAL STATE' unless current_state == :candidate info "start new election" # no leader... @current_leader = nil @get_votes = 0 # reset election timer set_term @current_term + 1 # random step back #timeout = (100 + rand(200)) / 1000.0 # vote for self @vote_for = @node_id on_vote @node_id @cluster.each {|k,v| # XXX what if get reply in the future? v.conn.RequestVote @current_term, @node_id, @log.size, @log.last.term do |req, resp| next unless current_state == :candidate info "Get vote from #{k}: #{resp.response[1]}" on_vote resp.node_id if resp.response[1] end } end