class Spark::Accumulator::Server

Attributes

host[R]
port[R]
server[R]

Public Class Methods

host() click to toggle source
# File lib/spark/accumulator.rb, line 211
def self.host
  start
  @instance.host
end
new() click to toggle source
# File lib/spark/accumulator.rb, line 221
def initialize
  @server = TCPServer.new(0)
  @host = @server.hostname
  @port = @server.port

  @threads = []
  handle_accept
end
port() click to toggle source
# File lib/spark/accumulator.rb, line 216
def self.port
  start
  @instance.port
end
start() click to toggle source
# File lib/spark/accumulator.rb, line 203
def self.start
  @instance ||= Spark::Accumulator::Server.new
end
stop() click to toggle source
# File lib/spark/accumulator.rb, line 207
def self.stop
  @instance && @instance.stop
end

Public Instance Methods

handle_accept() click to toggle source
# File lib/spark/accumulator.rb, line 236
def handle_accept
  @threads << Thread.new do
    loop {
      handle_connection(@server.accept)
    }
  end

end
handle_connection(socket) click to toggle source
# File lib/spark/accumulator.rb, line 245
def handle_connection(socket)
  @threads << Thread.new do
    until socket.closed?
      count = socket.read_int
      count.times do
        data = socket.read_data
        accum = Spark::Accumulator.instances[data[0]]
        if accum
          accum.add(data[1])
        else
          Spark.logger.warn("Accumulator with id #{data[0]} does not exist.")
        end
      end

      # http://stackoverflow.com/questions/28560133/ruby-server-java-scala-client-deadlock
      # socket.write_int(Spark::Constant::ACCUMULATOR_ACK)
    end

  end
end
stop() click to toggle source
# File lib/spark/accumulator.rb, line 230
def stop
  @threads.each(&:kill)
rescue
  nil
end