class Reader

Constants

BLOCKER_PORT_START
CTX

Public Class Methods

new(num_readers:, num_blocks:, rank:, file:, debug: false, addr: "tcp://localhost") click to toggle source
# File lib/rpareia/reader.rb, line 10
# Stores the reader's configuration; no I/O happens here.
#
# @param num_readers [Integer] total number of readers sharing the input file
# @param num_blocks [Integer] number of block connections to open
# @param rank [Integer] this reader's index; it handles the lines whose
#   zero-based index modulo +num_readers+ equals +rank+
# @param file [String] path of the input file read by #read_database
# @param debug [Boolean] forwarded to the sockets' +verbose+ flag and to the
#   heartbeat check
# @param addr [String] address prefix; ":<port>" is appended when connecting
def initialize(num_readers:, num_blocks:, rank:, file:, debug: false, addr: "tcp://localhost")
  # Work-partitioning parameters.
  @num_readers, @num_blocks, @rank = num_readers, num_blocks, rank
  # Input source and transport settings.
  @file, @debug, @addr = file, debug, addr
end

Public Instance Methods

close_connections() click to toggle source
# File lib/rpareia/reader.rb, line 56
# Tells every open connection that no more data will follow, closes it,
# and finally destroys the shared context.
#
# Each socket receives the literal message "EOF" immediately before it is
# closed; the context is destroyed only after all sockets were handled.
def close_connections
  @connections.each do |socket|
    socket.send("EOF") # end-of-stream marker for the receiving side
    socket.close
  end

  CTX.destroy
end
create_hash(input) click to toggle source
# File lib/rpareia/reader.rb, line 70
# Builds the key for a record by concatenating its fields with no separator.
#
# @param input [Array<String>] the record's fields (everything after the
#   first column, as passed by #create_pair)
# @return [String] the concatenated fields; "" for an empty array
def create_hash(input)
  input.join('')
end
create_pair(input) click to toggle source
# File lib/rpareia/reader.rb, line 78
# Splits a comma-separated line into its first column and a key built from
# the remaining columns.
#
# @param input [String] one line of the input file, already stripped
# @return [Array] two elements: the first column, and the result of
#   #create_hash applied to the remaining columns
def create_pair(input)
  columns = input.split(',')
  first_column = columns.first
  remaining = columns[1..-1]

  [first_column, create_hash(remaining)]
end
create_socket(port) click to toggle source
# File lib/rpareia/reader.rb, line 36
# Creates a DEALER socket from the shared context and connects it to the
# given port on the configured address.
#
# @param port [Integer] port appended to @addr as "<addr>:<port>"
# @return the connected socket, with its verbose flag set from @debug
def create_socket(port)
  CTX.socket(:DEALER).tap do |dealer|
    dealer.verbose = @debug
    dealer.connect("#{@addr}:#{port}")
  end
end
read_database() click to toggle source
# File lib/rpareia/reader.rb, line 43
# Streams the input file and forwards this reader's share of the lines.
#
# Lines are assigned round-robin: a line is handled only when its zero-based
# index modulo @num_readers equals @rank. Each handled line is stripped,
# turned into a pair by #create_pair and dispatched with #send_pair.
# Blank lines still count toward the index but are not sent, because they
# carry no columns to build a pair from.
#
# After the whole file was consumed the connections are closed and "EOF" is
# printed to standard output.
def read_database
  # Block form guarantees the file handle is closed, even if sending raises.
  File.open(@file, 'r') do |db|
    db.each_line.with_index do |line, index|
      next unless index % @num_readers == @rank

      record = line.strip
      next if record.empty?

      send_pair(create_pair(record))
    end
  end

  close_connections
  puts "EOF"
end
send_pair(pair) click to toggle source
# File lib/rpareia/reader.rb, line 64
# Sends a pair to the connection chosen by its last element.
#
# The block index comes from #which_block applied to the pair's last
# element; the payload is the pair's elements joined with commas.
#
# @param pair [Array] pair as produced by #create_pair
def send_pair(pair)
  target = @connections[which_block(pair.last)]
  target.send(pair.join(','))
end
start() click to toggle source
# File lib/rpareia/reader.rb, line 19
# Runs the reader: opens the block connections, then reads and dispatches
# the input file.
def start
  # Connections must exist before read_database sends anything over them.
  start_connections
  read_database
end
start_connections() click to toggle source
# File lib/rpareia/reader.rb, line 24
# Opens one socket per block and stores them, in block order, in @connections.
#
# Block i uses port BLOCKER_PORT_START + i. Before each socket is created a
# heartbeat check is run against twice that port number on the same address.
def start_connections
  @connections = []
  last_block = @num_blocks - 1

  0.upto(last_block) do |block_index|
    blocker_port = BLOCKER_PORT_START + block_index
    heartbeat_addr = "#{@addr}:#{2 * blocker_port}"

    Heartbeat.check(addr: heartbeat_addr, ctx: CTX, type: 'client', debug: @debug)
    @connections.push(create_socket(blocker_port))
  end
end
which_block(key) click to toggle source
# File lib/rpareia/reader.rb, line 74
# Picks the block index for a key from the key's first byte.
#
# @param key [String] key produced by #create_hash
# @return [Integer] first byte of +key+ modulo @num_blocks; 0 for an empty
#   key, so records without any key bytes are all routed to the same block
#   instead of failing on a nil byte
def which_block(key)
  # getbyte reads a single byte without materialising the whole byte array.
  first_byte = key.getbyte(0)
  return 0 unless first_byte

  first_byte % @num_blocks
end