module IOPromise::Dalli::AsyncServer

Constants

FORMAT
OPCODES
REQUEST

Public Class Methods

new(attribs, options = {}) click to toggle source
Calls superclass method
# File lib/iopromise/dalli/patch_dalli.rb, line 29
def initialize(attribs, options = {})
  @async = options.delete(:iopromise_async) == true

  if @async
    @write_buffer = +""
    @read_buffer = +""
    async_reset

    @next_opaque_id = 0
    @pending_ops = {}

    @executor_pool = DalliExecutorPool.for(self)
  end

  super
end

Public Instance Methods

async?() click to toggle source
# File lib/iopromise/dalli/patch_dalli.rb, line 46
def async?
  @async
end
async_io_ready(readable, writable) click to toggle source
# File lib/iopromise/dalli/patch_dalli.rb, line 76
def async_io_ready(readable, writable)
  async_sock_write_nonblock if writable
  async_sock_read_nonblock if readable
end
async_reset() click to toggle source
# File lib/iopromise/dalli/patch_dalli.rb, line 66
def async_reset
  @write_buffer.clear
  @write_offset = 0

  @read_buffer.clear
  @read_offset = 0

  @executor_pool.close_socket if defined? @executor_pool
end
close() click to toggle source
Calls superclass method
# File lib/iopromise/dalli/patch_dalli.rb, line 50
def close
  if async?
    async_reset
  end

  super
end
connect() click to toggle source
Calls superclass method
# File lib/iopromise/dalli/patch_dalli.rb, line 58
def connect
  super

  if async?
    @executor_pool.connected_socket(@sock)
  end
end
execute_continue() click to toggle source

called by ExecutorPool to continue processing for this server

# File lib/iopromise/dalli/patch_dalli.rb, line 82
def execute_continue
  timeout = @options[:socket_timeout]
  @pending_ops.select! do |key, op|
    if op.timeout?
      op.reject(Timeout::Error.new)
      next false # this op is done
    end

    # let all pending operations know that they are seeing the
    # select loop. this starts the timer for the operation, because
    # it guarantees we're now working on it.
    # this is more accurate than starting the timer when we buffer
    # the write.
    op.in_select_loop

    remaining = op.timeout_remaining
    timeout = remaining if remaining < timeout

    true # keep
  end

  @executor_pool.select_timeout = timeout
  @executor_pool.set_interest(:r, !@pending_ops.empty?)
end

Private Instance Methods

add(key, value, ttl, options) click to toggle source
Calls superclass method
# File lib/iopromise/dalli/patch_dalli.rb, line 152
def add(key, value, ttl, options)
  return super unless async?

  async_generic_write_op(:add, key, value, ttl, 0, options)
end
append(key, value) click to toggle source
Calls superclass method
# File lib/iopromise/dalli/patch_dalli.rb, line 178
def append(key, value)
  return super unless async?

  async_append_prepend_op(:append, key, value)
end
async_append_prepend_op(op, key, value) click to toggle source
# File lib/iopromise/dalli/patch_dalli.rb, line 172
def async_append_prepend_op(op, key, value)
  promised_request(key) do |opaque|
    [REQUEST, OPCODES[op], key.bytesize, 0, 0, 0, value.bytesize + key.bytesize, opaque, 0, key, value].pack(FORMAT[op])
  end
end
async_buffered_write(data) click to toggle source
# File lib/iopromise/dalli/patch_dalli.rb, line 220
def async_buffered_write(data)
  @write_buffer << data
  async_sock_write_nonblock
end
async_decr_incr(opcode, key, count, ttl, default) click to toggle source
# File lib/iopromise/dalli/patch_dalli.rb, line 198
def async_decr_incr(opcode, key, count, ttl, default)
  expiry = default ? sanitize_ttl(ttl) : 0xFFFFFFFF
  default ||= 0
  (h, l) = split(count)
  (dh, dl) = split(default)
  promised_request(key) do |opaque|
    req = [REQUEST, OPCODES[opcode], key.bytesize, 20, 0, 0, key.bytesize + 20, opaque, 0, h, l, dh, dl, expiry, key].pack(FORMAT[opcode])
  end
end
async_generic_write_op(op, key, value, ttl, cas, options) click to toggle source
# File lib/iopromise/dalli/patch_dalli.rb, line 134
def async_generic_write_op(op, key, value, ttl, cas, options)
  value.then do |value|
    (value, flags) = serialize(key, value, options)
    ttl = sanitize_ttl(ttl)
    
    guard_max_value_with_raise(key, value)

    promised_request(key) do |opaque|
      [REQUEST, OPCODES[op], key.bytesize, 8, 0, 0, value.bytesize + key.bytesize + 8, opaque, cas, flags, ttl, key, value].pack(FORMAT[op])
    end
  end
end
async_sock_read_nonblock() click to toggle source
# File lib/iopromise/dalli/patch_dalli.rb, line 263
def async_sock_read_nonblock
  read_available

  buf = @read_buffer
  pos = @read_offset

  while buf.bytesize - pos >= 24
    header = buf.byteslice(pos, 24)
    (magic, opcode, key_length, extra_length, data_type, status, body_length, opaque, cas) = header.unpack(FULL_HEADER)

    if buf.bytesize - pos >= 24 + body_length
      exists = (status != 1) # Key not found
      this_pos = pos

      # key = buf.byteslice(this_pos + 24 + extra_length, key_length)
      value = buf.byteslice(this_pos + 24 + extra_length + key_length, body_length - key_length - extra_length) if exists

      pos = pos + 24 + body_length

      promise = @pending_ops.delete(opaque)
      next if promise.nil?

      begin
        raise Dalli::DalliError, "Response error #{status}: #{Dalli::RESPONSE_CODES[status]}" unless status == 0 || status == 1 || status == 2 || status == 5
        
        final_value = nil
        if opcode == OPCODES[:incr] || opcode == OPCODES[:decr]
          final_value = value.unpack1("Q>")
        elsif exists
          flags = if extra_length >= 4
            buf.byteslice(this_pos + 24, 4).unpack1("N")
          else
            0
          end
          final_value = deserialize(value, flags)
        end

        response = ::IOPromise::Dalli::Response.new(
          key: promise.key,
          value: final_value,
          exists: exists,
          stored: !(status == 2 || status == 5), # Key exists or Item not stored
          cas: cas,
        )

        promise.fulfill(response)
      rescue => ex
        promise.reject(ex)
      end
    else
      # not enough data yet, wait for more
      break
    end
  end

  if pos == @read_buffer.length
    @read_buffer.clear
    @read_offset = 0
  else
    @read_offset = pos
  end

rescue SystemCallError, Timeout::Error, EOFError => e
  failure!(e)
end
async_sock_write_nonblock() click to toggle source
# File lib/iopromise/dalli/patch_dalli.rb, line 225
def async_sock_write_nonblock
  remaining = @write_buffer.byteslice(@write_offset, @write_buffer.length)
  begin
    bytes_written = @sock.write_nonblock(remaining, exception: false)
  rescue Errno::EINTR
    retry
  end

  return if bytes_written == :wait_writable
  
  @write_offset += bytes_written
  completed = (@write_offset == @write_buffer.length)
  if completed
    @write_buffer.clear
    @write_offset = 0
  end
  @executor_pool.set_interest(:w, !completed)
rescue SystemCallError, Timeout::Error => e
  failure!(e)
end
decr(key, count, ttl, default) click to toggle source
Calls superclass method
# File lib/iopromise/dalli/patch_dalli.rb, line 208
def decr(key, count, ttl, default)
  return super unless async?

  async_decr_incr :decr, key, count, ttl, default
end
delete(key, cas) click to toggle source
Calls superclass method
# File lib/iopromise/dalli/patch_dalli.rb, line 164
def delete(key, cas)
  return super unless async?

  promised_request(key) do |opaque|
    [REQUEST, OPCODES[:delete], key.bytesize, 0, 0, 0, key.bytesize, opaque, cas, key].pack(FORMAT[:delete])
  end
end
failure!(ex) click to toggle source
Calls superclass method
# File lib/iopromise/dalli/patch_dalli.rb, line 329
def failure!(ex)
  if async?
    # all pending operations need to be rejected when a failure occurs
    @pending_ops.each do |op|
      op.reject(ex)
    end
    @pending_ops = {}
  end

  super
end
flush() click to toggle source
Calls superclass method
# File lib/iopromise/dalli/patch_dalli.rb, line 190
def flush
  return super unless async?

  promised_request(nil) do |opaque|
    [REQUEST, OPCODES[:flush], 0, 4, 0, 0, 4, opaque, 0, 0].pack(FORMAT[:flush])
  end
end
get(key, options = nil) click to toggle source
Calls superclass method
# File lib/iopromise/dalli/patch_dalli.rb, line 126
def get(key, options = nil)
  return super unless async?

  promised_request(key) do |opaque|
    [REQUEST, OPCODES[:get], key.bytesize, 0, 0, 0, key.bytesize, opaque, 0, key].pack(FORMAT[:get])
  end
end
guard_max_value_with_raise(key, value) click to toggle source

this is guard_max_value from the master version, rather than using the yield block.

# File lib/iopromise/dalli/patch_dalli.rb, line 342
def guard_max_value_with_raise(key, value)
  return if value.bytesize <= @options[:value_max_bytes]
  
  message = "Value for #{key} over max size: #{@options[:value_max_bytes]} <= #{value.bytesize}"
  raise Dalli::ValueOverMaxSize, message
end
incr(key, count, ttl, default) click to toggle source
Calls superclass method
# File lib/iopromise/dalli/patch_dalli.rb, line 214
def incr(key, count, ttl, default)
  return super unless async?
  
  async_decr_incr :incr, key, count, ttl, default
end
prepend(key, value) click to toggle source
Calls superclass method
# File lib/iopromise/dalli/patch_dalli.rb, line 184
def prepend(key, value)
  return super unless async?

  async_append_prepend_op(:prepend, key, value)
end
promised_request(key, &block) click to toggle source
# File lib/iopromise/dalli/patch_dalli.rb, line 114
def promised_request(key, &block)
  promise = ::IOPromise::Dalli::DalliPromise.new(self, key)
  
  new_id = @next_opaque_id
  @pending_ops[new_id] = promise
  @next_opaque_id = (@next_opaque_id + 1) & 0xffff_ffff
  
  async_buffered_write(block.call(new_id))

  promise
end
read_available() click to toggle source
# File lib/iopromise/dalli/patch_dalli.rb, line 248
def read_available
  loop do
    result = @sock.read_nonblock(8196, exception: false)
    if result == :wait_readable
      break
    elsif result == :wait_writable
      break
    elsif result
      @read_buffer << result
    else
      raise Errno::ECONNRESET, "Connection reset: #{safe_options.inspect}"
    end
  end
end
replace(key, value, ttl, cas, options) click to toggle source
Calls superclass method
# File lib/iopromise/dalli/patch_dalli.rb, line 158
def replace(key, value, ttl, cas, options)
  return super unless async?

  async_generic_write_op(:replace, key, value, ttl, cas, options)
end
set(key, value, ttl, cas, options) click to toggle source
Calls superclass method
# File lib/iopromise/dalli/patch_dalli.rb, line 147
def set(key, value, ttl, cas, options)
  return super unless async?
  async_generic_write_op(:set, key, value, ttl, cas, options)
end