class Reader
Constants
- BLOCKER_PORT_START
- CTX
Public Class Methods
new(num_readers:, num_blocks:, rank:, file:, debug: false, addr: "tcp://localhost")
click to toggle source
# File lib/rpareia/reader.rb, line 10
# Stores the reader configuration in instance variables; performs no I/O.
#
# @param num_readers [Integer] modulus used by read_database to split lines
#   among readers
# @param num_blocks [Integer] number of connections opened by
#   start_connections and modulus used by which_block
# @param rank [Integer] remainder (line index % num_readers) this reader keeps
# @param file [String] path opened by read_database
# @param debug [Boolean] forwarded to socket.verbose and Heartbeat.check
# @param addr [String] endpoint prefix; ":<port>" is appended when connecting
def initialize(num_readers:, num_blocks:, rank:, file:, debug: false, addr: "tcp://localhost")
  @num_readers, @num_blocks, @rank = num_readers, num_blocks, rank
  @file  = file
  @debug = debug
  @addr  = addr
end
Public Instance Methods
close_connections()
click to toggle source
# File lib/rpareia/reader.rb, line 56
# Sends the literal message "EOF" on every connection, closes it, and then
# destroys the shared CTX once all connections have been processed.
#
# Expects @connections to have been populated (start_connections does this).
def close_connections
  @connections.each do |connection|
    connection.send("EOF")
    connection.close
  end

  CTX.destroy
end
create_hash(input)
click to toggle source
# File lib/rpareia/reader.rb, line 70
# Builds the routing key for a record by concatenating its fields with no
# separator (Array#join with the default separator).
#
# @param input [Array] field values of one record
# @return [String] the fields joined end to end
def create_hash(input)
  input.join
end
create_pair(input)
click to toggle source
# File lib/rpareia/reader.rb, line 78
# Splits a comma-separated record into its first field and the key built by
# create_hash from all remaining fields.
#
# @param input [String] one record, fields separated by ','
# @return [Array] two elements: the first field, then create_hash's result
def create_pair(input)
  fields = input.split(',')
  identifier = fields.first
  remaining = fields[1..-1]
  [identifier, create_hash(remaining)]
end
create_socket(port)
click to toggle source
# File lib/rpareia/reader.rb, line 36
# Requests a :DEALER socket from CTX, sets its verbosity from @debug and
# connects it to "<@addr>:<port>".
#
# @param port [Integer] port appended to @addr to form the endpoint
# @return [Object] the connected socket returned by CTX.socket
def create_socket(port)
  CTX.socket(:DEALER).tap do |dealer|
    dealer.verbose = @debug
    dealer.connect("#{@addr}:#{port}")
  end
end
read_database()
click to toggle source
# File lib/rpareia/reader.rb, line 43
# Streams @file line by line and forwards this reader's share of records.
#
# A line is handled by this reader when its zero-based index modulo
# @num_readers equals @rank. Each handled line is stripped, converted with
# create_pair and passed to send_pair. After the last line the connections
# are closed and "EOF" is printed to stdout.
#
# @return [nil] the result of the final puts
def read_database
  # File.foreach closes the file when iteration finishes or raises; the
  # previous File.open(...).each_line left the handle open.
  File.foreach(@file).with_index do |line, index|
    next unless index % @num_readers == @rank

    record = line.strip
    # Blank lines still count toward the index above, so the partitioning
    # between readers is unchanged; they are skipped here because
    # create_pair cannot build a pair from an empty record.
    next if record.empty?

    send_pair(create_pair(record))
  end

  close_connections
  puts "EOF"
end
send_pair(pair)
click to toggle source
# File lib/rpareia/reader.rb, line 64
# Sends a pair, serialised as its elements joined by ',', on the connection
# selected by which_block for the pair's last element.
#
# @param pair [Array] pair as produced by create_pair; the last element is
#   used for routing
# @return [Object] whatever the connection's send returns
def send_pair(pair)
  target = @connections[which_block(pair.last)]
  target.send(pair.join(','))
end
start()
click to toggle source
# File lib/rpareia/reader.rb, line 19
# Entry point: opens the connections first, then reads and dispatches the
# database file.
#
# @return [Object] the return value of read_database
def start
  start_connections
  read_database
end
start_connections()
click to toggle source
# File lib/rpareia/reader.rb, line 24
# Resets @connections and opens one socket per block.
#
# For block index i the port is BLOCKER_PORT_START + i. Before the socket is
# created, Heartbeat.check is called as a 'client' against port 2 * port on
# the same address.
#
# @return [Integer] @num_blocks (the result of Integer#times)
def start_connections
  @connections = []

  @num_blocks.times do |offset|
    port = BLOCKER_PORT_START + offset
    heartbeat_addr = "#{@addr}:#{2 * port}"
    Heartbeat.check(addr: heartbeat_addr, ctx: CTX, type: 'client', debug: @debug)
    @connections.push(create_socket(port))
  end
end
which_block(key)
click to toggle source
# File lib/rpareia/reader.rb, line 74
# Chooses the block index for a key: the key's first byte modulo @num_blocks.
#
# An empty key has no first byte and is routed to block 0. create_pair
# produces an empty key for a record with no non-empty trailing fields, so
# this case is reachable from read_database.
#
# @param key [String] routing key
# @return [Integer] index in 0...@num_blocks
def which_block(key)
  # getbyte(0) returns nil for an empty string instead of raising, and does
  # not build the whole byte array the way key.bytes.first does.
  (key.getbyte(0) || 0) % @num_blocks
end