class Floss::LogReplicator
Used by the leader to manage the replicated log.
Attributes
node[RW]
@return [Floss::Node]
Public Class Methods
new(node)
click to toggle source
# File lib/floss/log_replicator.rb, line 58
#
# Prepares per-peer replication state for the given node.
#
# @param node [Floss::Node] the node this replicator works for
def initialize(node)
  @node = node

  # Helper used to wait until a certain index has been written to a peer.
  @write_waiters = IndexWaiter.new

  # Per peer: index of the last log entry that the peer agrees with.
  @write_indices = {}

  # Per peer: the Celluloid::Timer that triggers replication for that peer.
  @pacemakers = {}

  # Every peer starts out at the log's current last index.
  starting_index = log.last_index

  peers.each do |peer|
    @write_indices[peer] = starting_index
    @pacemakers[peer] = after(broadcast_time) { replicate(peer) }
  end
end
Public Instance Methods
append(entry)
click to toggle source
# File lib/floss/log_replicator.rb, line 78
#
# Appends a single entry to the log, registers a latch with every peer's
# write waiter, and waits on that latch before recording the commit index.
#
# @param entry the entry to append (passed to the log wrapped in an array)
# @return the value returned by +log.append+, which is also stored in
#   +@commit_index+
def append(entry)
  # Pacemakers are held while the entry is added and the waiters are set up.
  pause

  index = log.append([entry])

  # The latch is sized by cluster_quorum and handed to every peer's waiter.
  latch = Floss::CountDownLatch.new(cluster_quorum)
  peers.each do |peer|
    signal_on_write(peer, index, latch)
  end

  resume
  latch.wait

  # TODO: Ensure there's at least one write in the leader's current term.
  @commit_index = index
end
construct_payload(index)
click to toggle source
Constructs the payload for an AppendEntries RPC given a peer's write index. All entries after the given index are included in the payload; if the index is nil, the payload contains the entire log and no previous index or term.
# File lib/floss/log_replicator.rb, line 124
#
# Builds the AppendEntries payload for a peer from that peer's write index.
#
# @param index the peer's write index; when nil (or false) the payload starts
#   at log position 0 and carries no previous index/term
# @return [Hash] with keys :leader_id, :term, :prev_log_index,
#   :prev_log_term, :commit_index and :entries
def construct_payload(index)
  # With a known index, send everything after it and describe the entry at
  # that index; otherwise send the log from position 0.
  prev_index, prev_term, first_entry =
    if index
      [index, log[index].term, index + 1]
    else
      [nil, nil, 0]
    end

  entries = log.starting_with(first_entry)

  {
    leader_id: node.id,
    term: current_term,
    prev_log_index: prev_index,
    prev_log_term: prev_term,
    commit_index: @commit_index,
    entries: entries
  }
end
finalize()
click to toggle source
# File lib/floss/log_replicator.rb, line 145
#
# Shuts replication down by delegating to #pause.
# NOTE(review): presumably registered as the actor's finalizer; confirm in
# the class definition.
#
# @return whatever #pause returns
def finalize
  pause
end
pause()
click to toggle source
# File lib/floss/log_replicator.rb, line 96
#
# Cancels the pacemaker timer of every peer.
#
# @return [Array] the pacemaker timers that were cancelled
def pause
  timers = @pacemakers.values
  timers.each { |timer| timer.cancel }
end
replicate(peer)
click to toggle source
# File lib/floss/log_replicator.rb, line 104
#
# Runs one replication round for a peer: sends an AppendEntries payload built
# from the peer's write index, updates that index based on the response, and
# re-arms the peer's pacemaker.
#
# @param peer the peer to replicate to; must respond to +append_entries+
# @return whatever the pacemaker's +reset+ returns
def replicate(peer)
  write_index = @write_indices[peer]

  # Snapshot the last index BEFORE the RPC. If the log grows while the call is
  # in flight, reading it afterwards would credit the peer with entries that
  # were never part of this payload and wake waiters too early.
  # nil if the log is still empty, last replicated log index otherwise.
  last_index = log.last_index

  response = peer.append_entries(construct_payload(write_index))

  if response[:success]
    @write_indices[peer] = last_index
    @write_waiters.signal(peer, last_index)
  elsif write_index
    # Walk back in the peer's history; below index 0 there is nothing left,
    # which is represented as nil.
    @write_indices[peer] = write_index > 0 ? write_index - 1 : nil
  end

  @pacemakers[peer].reset
end
resume()
click to toggle source
# File lib/floss/log_replicator.rb, line 100
#
# Fires the pacemaker timer of every peer.
#
# @return [Array] the pacemaker timers that were fired
def resume
  timers = @pacemakers.values
  timers.each { |timer| timer.fire }
end
signal_on_write(peer, index, condition)
click to toggle source
# File lib/floss/log_replicator.rb, line 92
#
# Registers a condition with the write waiters for the given peer and index.
#
# @param peer the peer whose writes are being watched
# @param index the log index to wait for
# @param condition the object handed to the waiter for that peer/index
# @return whatever the waiter's +register+ returns
def signal_on_write(peer, index, condition)
  @write_waiters.register(
    peer,
    index,
    condition
  )
end