class Floss::LogReplicator

Used by the leader to manage the replicated log.

Attributes

node[RW]

@return [Floss::Node]

Public Class Methods

new(node) click to toggle source
# File lib/floss/log_replicator.rb, line 58
# Sets up replication state for every peer of the given node.
#
# Each peer starts out credited with the leader's current last log index and
# gets its own timer that triggers replication to that peer.
#
# @param node [Floss::Node] the node this replicator works for
def initialize(node)
  @node = node

  # Lets callers block until a given index has been written to a peer.
  @write_waiters = IndexWaiter.new

  # peer => index of the last log entry that peer agrees with.
  @write_indices = {}

  # peer => timer that triggers replication to that peer.
  @pacemakers = {}

  starting_index = log.last_index

  peers.each do |peer|
    @write_indices[peer] = starting_index

    pacemaker = after(broadcast_time) { replicate(peer) }
    @pacemakers[peer] = pacemaker
  end
end

Public Instance Methods

append(entry) click to toggle source
# File lib/floss/log_replicator.rb, line 78
# Appends a single entry to the log and blocks until the write latch has been
# released, then records the entry's index as the commit index.
#
# The pacemakers are paused while the entry is appended and the per-peer
# waiters are registered, and resumed right before waiting on the latch.
#
# @param entry the log entry to append
# @return the index returned by the log for the appended entry
def append(entry)
  pause

  appended_index = log.append([entry])
  latch = Floss::CountDownLatch.new(cluster_quorum)

  peers.each do |peer|
    signal_on_write(peer, appended_index, latch)
  end

  resume
  latch.wait

  # TODO: Ensure there's at least one write in the leader's current term.
  @commit_index = appended_index
end
construct_payload(index) click to toggle source

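Constructs the payload for an AppendEntries RPC from a peer's write index. All entries after the given index are included in the payload; when the index is nil, the payload contains every entry starting at index 0 and the previous index and term are nil.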

# File lib/floss/log_replicator.rb, line 124
# Builds the argument hash for an AppendEntries call.
#
# With a non-nil index, the payload names that index and the term of the log
# entry stored there as the "previous" entry and carries every entry after
# it. With a nil index, the previous index and term are nil and the payload
# carries every entry starting at index 0.
#
# @param index [Integer, nil] the peer's write index
# @return [Hash] with keys :leader_id, :term, :prev_log_index,
#   :prev_log_term, :commit_index and :entries
def construct_payload(index)
  previous_index = nil
  previous_term = nil
  first_unsent = 0

  if index
    previous_index = index
    previous_term = log[index].term
    first_unsent = index + 1
  end

  unsent_entries = log.starting_with(first_unsent)

  {
    leader_id: node.id,
    term: current_term,
    prev_log_index: previous_index,
    prev_log_term: previous_term,
    commit_index: @commit_index,
    entries: unsent_entries
  }
end
finalize() click to toggle source
# File lib/floss/log_replicator.rb, line 145
# Stops replication by cancelling every pacemaker timer (delegates to #pause).
def finalize
  pause()
end
pause() click to toggle source
# File lib/floss/log_replicator.rb, line 96
# Cancels the pacemaker timer of every peer.
#
# @return [Array] the timers that were cancelled
def pause
  timers = @pacemakers.values
  timers.each { |timer| timer.cancel }
end
replicate(peer) click to toggle source
# File lib/floss/log_replicator.rb, line 104
# Sends one AppendEntries call to +peer+ and records the outcome.
#
# On success the peer is credited with the leader's last log index as it was
# when the payload was built, and waiters registered for that peer are
# signalled with that index. On failure the peer's write index is moved back
# by one entry (index 0 becomes nil, a nil index stays nil) so that the next
# attempt starts earlier in the peer's history. In both cases the peer's
# pacemaker timer is reset afterwards.
#
# @param peer the peer to replicate to; must respond to +append_entries+ and
#   is the key into the write-index and pacemaker tables
# @return whatever the pacemaker's +reset+ returns
def replicate(peer)
  write_index = @write_indices[peer]

  # Snapshot the last index BEFORE the call goes out. The log may grow while
  # the call is outstanding; reading it afterwards would credit the peer with
  # entries that were never part of this payload. Reading it early can only
  # under-report, which the next replication round corrects.
  # nil if the log is still empty, last log index otherwise.
  replicated_index = log.last_index

  response = peer.append_entries(construct_payload(write_index))

  if response[:success]
    @write_indices[peer] = replicated_index
    @write_waiters.signal(peer, replicated_index)
  elsif write_index
    # Walk back in the peer's history.
    @write_indices[peer] = write_index > 0 ? write_index - 1 : nil
  end

  @pacemakers[peer].reset
end
resume() click to toggle source
# File lib/floss/log_replicator.rb, line 100
# Fires the pacemaker timer of every peer.
#
# @return [Array] the timers that were fired
def resume
  timers = @pacemakers.values
  timers.each { |timer| timer.fire }
end
signal_on_write(peer, index, condition) click to toggle source
# File lib/floss/log_replicator.rb, line 92
# Registers +condition+ with the write waiters for the given peer and index.
#
# @param peer the peer whose writes are being watched
# @param index the log index to wait for
# @param condition the object handed to the waiter registry for that index
# @return whatever the waiter registry's +register+ returns
def signal_on_write(peer, index, condition)
  waiters = @write_waiters
  waiters.register(peer, index, condition)
end