class Suo::Client::Base

Constants

BLANK_STR
DEFAULT_OPTIONS

Attributes

client[RW]
key[RW]
options[RW]
resources[RW]
retry_count[RW]

Public Class Methods

new(key, options = {}) click to toggle source
Calls superclass method
# File lib/suo/client/base.rb, line 18
def initialize(key, options = {})
  fail "Client required" unless options[:client]

  @options = DEFAULT_OPTIONS.merge(options)
  @retry_count = (@options[:acquisition_timeout] / @options[:acquisition_delay].to_f).ceil
  @client = @options[:client]
  @resources = @options[:resources].to_i
  @key = key

  super() # initialize Monitor mixin for thread safety
end

Public Instance Methods

clear() click to toggle source
# File lib/suo/client/base.rb, line 88
def clear
  fail NotImplementedError
end
lock(custom_token = nil) { || ... } click to toggle source
# File lib/suo/client/base.rb, line 30
def lock(custom_token = nil)
  token = acquire_lock(custom_token)

  if block_given? && token
    begin
      yield
    ensure
      unlock(token)
    end
  else
    token
  end
end
locked?() click to toggle source
# File lib/suo/client/base.rb, line 44
def locked?
  locks.size >= resources
end
locks() click to toggle source
# File lib/suo/client/base.rb, line 48
def locks
  val, _ = get
  cleared_locks = deserialize_and_clear_locks(val)

  cleared_locks
end
refresh(token) click to toggle source
# File lib/suo/client/base.rb, line 55
def refresh(token)
  retry_with_timeout do
    val, cas = get

    cas = initial_set if val.nil?

    cleared_locks = deserialize_and_clear_locks(val)

    refresh_lock(cleared_locks, token)

    break if set(serialize_locks(cleared_locks), cas, expire: cleared_locks.empty?)
  end
end
unlock(token) click to toggle source
# File lib/suo/client/base.rb, line 69
def unlock(token)
  return unless token

  retry_with_timeout do
    val, cas = get

    break if val.nil?

    cleared_locks = deserialize_and_clear_locks(val)

    acquisition_lock = remove_lock(cleared_locks, token)

    break unless acquisition_lock
    break if set(serialize_locks(cleared_locks), cas, expire: cleared_locks.empty?)
  end
rescue LockClientError => _ # rubocop:disable Lint/HandleExceptions
  # ignore - assume success due to optimistic locking
end

Private Instance Methods

acquire_lock(token = nil) click to toggle source
# File lib/suo/client/base.rb, line 96
def acquire_lock(token = nil)
  token ||= SecureRandom.base64(16)

  retry_with_timeout do
    val, cas = get

    cas = initial_set if val.nil?

    cleared_locks = deserialize_and_clear_locks(val)

    if cleared_locks.size < resources
      add_lock(cleared_locks, token)

      newval = serialize_locks(cleared_locks)

      return token if set(newval, cas)
    end
  end

  nil
end
add_lock(locks, token, time = Time.now.to_f) click to toggle source
# File lib/suo/client/base.rb, line 174
def add_lock(locks, token, time = Time.now.to_f)
  locks << [time, token]
end
clear_expired_locks(locks) click to toggle source
# File lib/suo/client/base.rb, line 169
def clear_expired_locks(locks)
  expired = Time.now - options[:stale_lock_expiration]
  locks.reject { |time, _| time < expired }
end
deserialize_and_clear_locks(val) click to toggle source
# File lib/suo/client/base.rb, line 155
def deserialize_and_clear_locks(val)
  clear_expired_locks(deserialize_locks(val))
end
deserialize_locks(val) click to toggle source
# File lib/suo/client/base.rb, line 159
def deserialize_locks(val)
  unpacked = (val.nil? || val == BLANK_STR) ? [] : MessagePack.unpack(val)

  unpacked.map do |time, token|
    [Time.at(time), token]
  end
rescue EOFError, MessagePack::MalformedFormatError => _
  []
end
get() click to toggle source
# File lib/suo/client/base.rb, line 118
def get
  fail NotImplementedError
end
initial_set(val = BLANK_STR) click to toggle source
# File lib/suo/client/base.rb, line 126
def initial_set(val = BLANK_STR) # rubocop:disable Lint/UnusedMethodArgument
  fail NotImplementedError
end
refresh_lock(locks, acquisition_token) click to toggle source
# File lib/suo/client/base.rb, line 183
def refresh_lock(locks, acquisition_token)
  remove_lock(locks, acquisition_token)
  add_lock(locks, acquisition_token)
end
remove_lock(locks, acquisition_token) click to toggle source
# File lib/suo/client/base.rb, line 178
def remove_lock(locks, acquisition_token)
  lock = locks.find { |_, token| token == acquisition_token }
  locks.delete(lock)
end
retry_with_timeout() { || ... } click to toggle source
# File lib/suo/client/base.rb, line 134
def retry_with_timeout
  start = Process.clock_gettime(Process::CLOCK_MONOTONIC)

  retry_count.times do
    elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start
    break if elapsed >= options[:acquisition_timeout]

    synchronize do
      yield
    end

    sleep(rand(options[:acquisition_delay] * 1000).to_f / 1000)
  end
rescue => _
  raise LockClientError
end
serialize_locks(locks) click to toggle source
# File lib/suo/client/base.rb, line 151
def serialize_locks(locks)
  MessagePack.pack(locks.map { |time, token| [time.to_f, token] })
end
set(newval, cas) click to toggle source
# File lib/suo/client/base.rb, line 122
def set(newval, cas) # rubocop:disable Lint/UnusedMethodArgument
  fail NotImplementedError
end
synchronize() { || ... } click to toggle source
# File lib/suo/client/base.rb, line 130
def synchronize
  mon_synchronize { yield }
end