class Raft::Goliath::HttpJsonRpcProvider

Attributes

uri_generator[R]

Public Class Methods

new(uri_generator)
# File lib/raft/goliath.rb, line 107
# Creates a provider bound to the given URI generator.
#
# @param uri_generator [#call] stored as-is in @uri_generator; the RPC methods
#   of this class invoke it as uri_generator.call(node_id, action) and pass the
#   result to EventMachine::HttpRequest.new as the request target.
def initialize(uri_generator)
  @uri_generator = uri_generator
end

Public Instance Methods

append_entries(request, cluster, &block)
# File lib/raft/goliath.rb, line 139
# Issues an append_entries request to every node of the cluster except the
# leader, then waits for each of them in turn.
#
# One request is created per id in cluster.node_ids (skipping
# request.leader_id) via create_append_entries_to_follower_request; the given
# block is forwarded to each of those calls. Afterwards every created request
# is passed to EM::Synchrony.sync, in creation order.
#
# @param request the append_entries request; must respond to #leader_id
# @param cluster must respond to #node_ids
# @return [Array] the request objects that were created
def append_entries(request, cluster, &block)
  pending = []
  EM.synchrony do
    cluster.node_ids.each do |node_id|
      pending << create_append_entries_to_follower_request(request, node_id, &block) unless node_id == request.leader_id
    end
  end
  pending.each { |http| EM::Synchrony.sync http }
end
append_entries_to_follower(request, node_id, &block)
# File lib/raft/goliath.rb, line 152
      # Sends a single append_entries request to one follower.
      #
      # This is a thin delegate to create_append_entries_to_follower_request and
      # returns whatever that method returns. The call is made directly: it is
      # not wrapped in EM.synchrony and nothing here waits for the request.
      #
      # @param request forwarded unchanged
      # @param node_id forwarded unchanged
      # @yield forwarded unchanged to the delegate
      def append_entries_to_follower(request, node_id, &block)
        create_append_entries_to_follower_request(request, node_id, &block)
      end
command(request, node_id)
# File lib/raft/goliath.rb, line 181
# Forwards a client command to the given node and waits for the reply.
#
# The request's `command` attribute is serialized to JSON and POSTed to the
# URL produced by uri_generator.call(node_id, 'command'). The call blocks the
# current fiber through EM::Synchrony.sync until the HTTP request completes.
#
# @param request object exposing the `command` attribute
# @param node_id identifier of the target node, passed to uri_generator
# @return [Raft::CommandResponse] the decoded response on HTTP 200; otherwise
#   the failure is logged and Raft::CommandResponse.new(false) is returned
def command(request, node_id)
  sent_hash = HashMarshalling.object_to_hash(request, %w(command))
  sent_json = MultiJson.dump(sent_hash)
  http = EventMachine::HttpRequest.new(uri_generator.call(node_id, 'command')).apost(
      :body => sent_json,
      :head => { 'Content-Type' => 'application/json' })
  http = EM::Synchrony.sync(http)
  if http.response_header.status == 200
    received_hash = MultiJson.load(http.response)
    HashMarshalling.hash_to_object(received_hash, Raft::CommandResponse)
  else
    Raft::Goliath.log("command failed for node '#{node_id}' with code #{http.response_header.status}")
    # Fully qualified, like the success branch above, so the constant resolves
    # regardless of the lexical scope this method is defined in.
    Raft::CommandResponse.new(false)
  end
end
create_append_entries_to_follower_request(request, node_id) { |node_id, response| ... }
# File lib/raft/goliath.rb, line 158
# Starts an asynchronous append_entries POST to one follower and returns the
# in-flight request without waiting for it.
#
# The request (and each of its entries) is converted to a hash, dumped to JSON
# and POSTed to the URL given by uri_generator.call(node_id, 'append_entries').
# A callback is registered on the request: on HTTP 200 the body is decoded into
# a Raft::AppendEntriesResponse and yielded together with node_id; any other
# status is only logged.
#
# @param request object exposing term, leader_id, prev_log_index,
#   prev_log_term, entries and commit_index; each entry exposes term, index
#   and command
# @param node_id identifier of the follower
# @yield [node_id, response] from the request's callback on HTTP 200
# @return the object returned by EventMachine::HttpRequest#apost
# @raise [RuntimeError] when node_id equals request.leader_id
def create_append_entries_to_follower_request(request, node_id, &block)
  payload = HashMarshalling.object_to_hash(
    request, %w(term leader_id prev_log_index prev_log_term entries commit_index)
  )
  payload['entries'] = payload['entries'].map do |entry|
    HashMarshalling.object_to_hash(entry, %w(term index command))
  end
  json_body = MultiJson.dump(payload)
  raise "replicating to self!" if request.leader_id == node_id

  endpoint = uri_generator.call(node_id, 'append_entries')
  http = EventMachine::HttpRequest.new(endpoint).apost(
    body: json_body,
    head: { 'Content-Type' => 'application/json' }
  )
  http.callback do
    if http.response_header.status != 200
      Raft::Goliath.log("append_entries failed for node '#{node_id}' with code #{http.response_header.status}")
    else
      decoded = MultiJson.load(http.response)
      response = HashMarshalling.hash_to_object(decoded, Raft::AppendEntriesResponse)
      yield node_id, response
    end
  end
  http
end
request_votes(request, cluster) { |node_id, request, response| ... }
# File lib/raft/goliath.rb, line 111
# Sends a request_vote POST to every node of the cluster except the candidate
# itself, then waits for each request in turn.
#
# The request is serialized once to JSON and POSTed to
# uri_generator.call(node_id, 'request_vote') for each id in cluster.node_ids
# other than request.candidate_id. Each request gets a callback: on HTTP 200
# the body is decoded into a Raft::RequestVoteResponse and yielded; any other
# status is only logged. Finally every request is passed to
# EM::Synchrony.sync, in creation order.
#
# @param request object exposing term, candidate_id, last_log_index and
#   last_log_term
# @param cluster must respond to #node_ids
# @yield [node_id, request, response] from a request's callback on HTTP 200
# @return [Array] the request objects that were created
def request_votes(request, cluster, &block)
  payload = HashMarshalling.object_to_hash(request, %w(term candidate_id last_log_index last_log_term))
  json_body = MultiJson.dump(payload)
  pending = []
  EM.synchrony do
    cluster.node_ids.each do |node_id|
      next if node_id == request.candidate_id

      endpoint = uri_generator.call(node_id, 'request_vote')
      http = EventMachine::HttpRequest.new(endpoint).apost(
        body: json_body,
        head: { 'Content-Type' => 'application/json' }
      )
      http.callback do
        if http.response_header.status != 200
          Raft::Goliath.log("request_vote failed for node '#{node_id}' with code #{http.response_header.status}")
        else
          decoded = MultiJson.load(http.response)
          response = HashMarshalling.hash_to_object(decoded, Raft::RequestVoteResponse)
          yield node_id, request, response
        end
      end
      pending << http
    end
  end
  pending.each { |http| EM::Synchrony.sync http }
end