class Moneta::Server::Connection

@api private

Public Class Methods

new(io, store, max_size) click to toggle source
# File lib/moneta/server.rb, line 14
# Sets up a connection whose message loop runs inside its own fiber.
#
# @param io the peer socket; this class calls `recv_nonblock`,
#   `sendmsg_nonblock`, `closed?` and `close` on it
# @param store the backing store that dispatched requests are forwarded to
# @param max_size [Integer] largest message payload size (in bytes, as
#   announced by the 4-byte length header) that will be accepted
def initialize(io, store, max_size)
  @io, @store, @max_size = io, store, max_size
  # The fiber is only created here; `run` starts on the first `resume`.
  @fiber = Fiber.new do
    run
  end
end

Public Instance Methods

resume(result = nil) click to toggle source
# File lib/moneta/server.rb, line 21
# Resumes the connection's fiber, handing +result+ to it.
#
# @param result value passed into the fiber (it becomes the return value
#   of the `Fiber.yield` the fiber is currently suspended on)
# @return whatever the fiber yields next, or its final value once it ends
def resume(result = nil)
  @fiber.resume(result)
end

Private Instance Methods

dispatch(method, args) click to toggle source
# File lib/moneta/server.rb, line 41
# Routes one decoded request to the store.
#
# Only the method names listed below are forwarded; any other name is
# ignored and yields nil. A StandardError raised while handling the request
# is returned (not raised) so the caller can send it back to the client.
#
# @param method [Symbol] name of the requested store operation
# @param args [Array] arguments for that operation
# @return the store's result for value-returning operations, nil for
#   `:store`, `:clear`, `:each_key` and unknown names, or the rescued
#   exception
def dispatch(method, args)
  begin
    if %i[key? load delete increment create features].include?(method)
      # Operations whose result is sent back to the client as-is.
      @store.public_send(method, *args)
    elsif %i[store clear].include?(method)
      # Operations whose result is deliberately replaced by nil.
      @store.public_send(method, *args)
      nil
    elsif method == :each_key
      # Enumeration is driven interactively by further client messages.
      yield_each(@store.each_key)
      nil
    end
  rescue StandardError => error
    error
  end
end
pack(obj) click to toggle source
# File lib/moneta/server.rb, line 127
# Serializes +obj+ into a length-prefixed frame.
#
# @param obj any object `Marshal.dump` can serialize
# @return [String] a 4-byte big-endian unsigned length ('N') followed by
#   the marshalled bytes
def pack(obj)
  payload = Marshal.dump(obj)
  [payload.bytesize, payload].pack('Na*')
end
read(len) click to toggle source
# File lib/moneta/server.rb, line 68
# Reads exactly +len+ bytes from the socket without blocking the thread.
#
# Whenever the socket is not ready, control is handed back to the reactor
# via `yield_to_reactor(:read)` and the read is retried afterwards.
#
# Throws `:closed` (with the message 'Closed during read') when the peer
# closes the connection, the connection is reset, or the stream has
# already been closed locally. Any other IOError is re-raised.
#
# @param len [Integer] number of bytes to read
# @return [String] the bytes received
def read(len)
  # `String.new` gives an unfrozen, binary (ASCII-8BIT) buffer. A `''`
  # literal would raise FrozenError on `<<` under `frozen_string_literal`
  # and would tag ASCII-only payloads as UTF-8.
  buffer = String.new
  loop do
    begin
      case received = @io.recv_nonblock(len)
      when '', nil
        # An empty read is treated as the peer having closed the connection.
        throw :closed, 'Closed during read'
      else
        buffer << received
        len -= received.bytesize
      end
    rescue IO::WaitReadable, IO::WaitWritable
      yield_to_reactor(:read)
    rescue Errno::ECONNRESET
      throw :closed, 'Closed during read'
    rescue IOError => ex
      # Only a "closed stream" IOError counts as a normal close.
      raise unless ex.message.match?(/closed stream/)

      throw :closed, 'Closed during read'
    end
    break if len == 0
  end
  buffer
end
read_msg() click to toggle source
# File lib/moneta/server.rb, line 62
# Reads one length-prefixed message from the connection and decodes it.
#
# The frame is a 4-byte big-endian unsigned length ('N') followed by that
# many bytes of marshalled data. Throws `:closed` with 'Message too big'
# when the announced length exceeds @max_size.
#
# NOTE(review): `Marshal.load` is applied to bytes supplied by the peer;
# this is only safe when every client that can connect is trusted.
#
# @return the object decoded from the message payload
def read_msg
  header = read(4)
  size = header.unpack1('N')
  if size > @max_size
    throw :closed, 'Message too big'
  end
  payload = read(size)
  Marshal.load(payload)
end
run() click to toggle source

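Runs the connection's message loop inside its fiber. The return value of this method becomes the fiber's final result, which is what the reactor receives from its last call to resume.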

@return [:closed,Exception]

# File lib/moneta/server.rb, line 30
# Body of the connection fiber: reads and answers messages until the
# connection is closed or an error occurs, then closes the socket.
#
# @return [:closed, Exception] :closed after a `throw :closed` (or when the
#   loop ends), otherwise the StandardError that interrupted the loop
def run
  begin
    catch(:closed) do
      loop do
        msg = read_msg
        write_dispatch(msg)
      end
    end
    # The value carried by `throw :closed` is discarded on purpose.
    :closed
  rescue StandardError => error
    error
  ensure
    # The socket is always released, whichever way the loop ended.
    @io.close unless @io.closed?
  end
end
sendmsg(msg) click to toggle source
# File lib/moneta/server.rb, line 112
# Makes one non-blocking send attempt on the socket.
#
# @param msg [String] bytes to send
# @return whatever `@io.sendmsg_nonblock` returns; `write` uses it as the
#   number of bytes that were sent
def sendmsg(msg)
  @io.sendmsg_nonblock msg
end
write(obj) click to toggle source
# File lib/moneta/server.rb, line 95
# Serializes +obj+ with `pack` and sends the whole frame to the peer.
#
# A send may be partial, so the unsent tail is retried until nothing is
# left. When the socket is not writable (or the call is interrupted),
# control goes back to the reactor via `yield_to_reactor(:write)` and the
# send is retried afterwards.
#
# @param obj object to send
# @return [nil]
def write(obj)
  buffer = pack(obj)
  until buffer.empty?
    begin
      sent = sendmsg(buffer)
      # `sent` is a byte count, so the tail must be measured in bytes too.
      # Using `length` (characters) here would drop data for any
      # non-binary buffer containing multi-byte characters.
      buffer = buffer.byteslice(sent...buffer.bytesize)
    rescue IO::WaitWritable, Errno::EINTR
      yield_to_reactor(:write)
    end
  end
  nil
end
write_dispatch(msg) click to toggle source
# File lib/moneta/server.rb, line 56
# Handles one request: the first element of +msg+ is the method name, the
# remaining elements are its arguments. The outcome of `dispatch` is
# written back to the client.
#
# @param msg [Array] decoded request of the form [method, *args]
# @return the return value of `write`
def write_dispatch(msg)
  name, *params = msg
  write(dispatch(name, params))
end
yield_each(enumerator) click to toggle source
# File lib/moneta/server.rb, line 132
# Serves an enumeration to the client one element at a time.
#
# Protocol, as driven by the messages read inside the loop:
# * ["NEXT"]  - the next element of +enumerator+ is written back
# * ["BREAK"] - the client stops the enumeration early
# * anything else is treated as an ordinary request and dispatched, so the
#   client can call other methods from within its `each` block
#
# Unless the client sent BREAK, a StopIteration object is written when the
# method is left, including when it is left by an exception or a throw.
#
# @param enumerator [Enumerator] external enumerator to draw elements from
def yield_each(enumerator)
  client_stopped = false
  loop do
    request = read_msg
    if %w{NEXT} == request
      # `next` raises StopIteration once the enumerator is exhausted;
      # `loop` rescues it, which ends the enumeration.
      write(enumerator.next)
    elsif %w{BREAK} == request
      # The client asked to stop, so no StopIteration is sent afterwards.
      client_stopped = true
      break
    else
      # A nested request issued by the client while it is enumerating.
      write_dispatch(request)
    end
  end
ensure
  # Tell the client that the enumeration is over.
  write(StopIteration.new("Server initiated stop")) unless client_stopped
end
yield_to_reactor(mode = :read) click to toggle source
# File lib/moneta/server.rb, line 121
# Suspends the connection fiber, reporting +mode+ to whoever resumed it.
#
# When the fiber is resumed with `:close`, the connection is torn down by
# throwing `:closed` with the message 'Closed by reactor'.
#
# @param mode [Symbol] value yielded out of the fiber (`:read` or `:write`
#   at the call sites in this class)
# @return [nil] when resumed with anything other than `:close`
def yield_to_reactor(mode = :read)
  reply = Fiber.yield(mode)
  throw :closed, 'Closed by reactor' if reply == :close
end