class Spark::Accumulator::Server
Attributes
host[R]
port[R]
server[R]
Public Class Methods
host()
click to toggle source
# File lib/spark/accumulator.rb, line 211
# Host reported by the shared server instance.
#
# Calls +start+ first so the instance exists, then reads +host+ from the
# instance that +start+ returns.
def self.host
  start.host
end
new()
click to toggle source
# File lib/spark/accumulator.rb, line 221
# Opens a listening TCP server on port 0, records its host and port, and
# starts the accept loop.
def initialize
  # Holds every thread this server spawns so that +stop+ can kill them.
  @threads = []

  # Port 0: presumably lets the OS choose a free port -- TODO confirm.
  @server = TCPServer.new(0)

  # NOTE(review): +hostname+ and +port+ are not core TCPServer methods;
  # presumably provided by a project extension -- confirm.
  @host = @server.hostname
  @port = @server.port

  handle_accept
end
port()
click to toggle source
# File lib/spark/accumulator.rb, line 216
# Port reported by the shared server instance.
#
# Calls +start+ first so the instance exists, then reads +port+ from the
# instance that +start+ returns.
def self.port
  start.port
end
start()
click to toggle source
# File lib/spark/accumulator.rb, line 203
# Returns the shared server instance, creating it on the first call.
# Later calls return the instance that is already stored.
def self.start
  @instance = Spark::Accumulator::Server.new unless @instance
  @instance
end
stop()
click to toggle source
# File lib/spark/accumulator.rb, line 207
# Stops the shared server instance, if one exists, and forgets it.
#
# The stored instance is cleared so that a later call to +start+ builds a
# fresh server instead of handing back one whose threads were already killed.
#
# Returns whatever the instance's +stop+ returns, or nil when no instance
# was running.
def self.stop
  return unless @instance

  @instance.stop
ensure
  # Clear the reference even if stopping raised, so the next +start+ begins
  # from a clean state.
  @instance = nil
end
Public Instance Methods
handle_accept()
click to toggle source
# File lib/spark/accumulator.rb, line 236
# Starts a background thread that accepts connections forever and passes
# each accepted socket to +handle_connection+. The thread is recorded in
# +@threads+.
def handle_accept
  acceptor = Thread.new do
    loop do
      client = @server.accept
      handle_connection(client)
    end
  end

  @threads << acceptor
end
handle_connection(socket)
click to toggle source
# File lib/spark/accumulator.rb, line 245
# Serves one client connection on its own thread, recorded in +@threads+.
#
# While the socket is open the thread repeatedly reads an integer count and
# then that many data items. For each item, +data[0]+ is looked up in
# +Spark::Accumulator.instances+; when an accumulator is found, +data[1]+
# is added to it, otherwise a warning is logged.
#
# The socket is always closed when the thread finishes -- on a read error
# or when the thread is killed -- so the descriptor is not leaked.
#
# socket:: connected client socket; must respond to +read_int+ and
#          +read_data+ (presumably project IO extensions -- confirm).
def handle_connection(socket)
  @threads << Thread.new do
    begin
      until socket.closed?
        count = socket.read_int
        count.times do
          data = socket.read_data
          accum = Spark::Accumulator.instances[data[0]]
          if accum
            accum.add(data[1])
          else
            Spark.logger.warn("Accumulator with id #{data[0]} does not exist.")
          end
        end

        # http://stackoverflow.com/questions/28560133/ruby-server-java-scala-client-deadlock
        # socket.write_int(Spark::Constant::ACCUMULATOR_ACK)
      end
    ensure
      # Nothing else closes this socket, so release it here on every exit path.
      socket.close unless socket.closed?
    end
  end
end
stop()
click to toggle source
# File lib/spark/accumulator.rb, line 230
# Shuts the server down: kills every thread recorded in +@threads+ and then
# closes the listening socket.
#
# Shutdown is best-effort: an error raised while killing threads is
# swallowed and nil is returned, but the listening socket is still closed.
#
# Returns +@threads+ on success, nil if killing the threads raised.
def stop
  @threads.each(&:kill)
rescue StandardError
  # Deliberately ignored -- a failure here must not abort the shutdown.
  nil
ensure
  # Release the listening socket; without this it stays open after stop.
  @server.close if @server && !@server.closed?
end