class GCSLock::Semaphore

Public Class Methods

new(bucket, object, count, client: nil, uuid: nil, min_backoff: nil, max_backoff: nil) click to toggle source
# File lib/gcslock/semaphore.rb, line 10
# Sets up a semaphore made of +count+ permits, each backed by its own mutex.
#
# @param bucket [Object] bucket handed to every per-permit mutex
# @param object [String] base object name; the permit index is appended to it
# @param count [Integer] total number of permits managed by this semaphore
# @param client [Object, nil] storage client; Google::Cloud::Storage.new is called when none is given
# @param uuid [String, nil] identifier passed to the mutexes; SecureRandom.uuid is used when none is given
# @param min_backoff [Numeric, nil] lower backoff bound (0.01 when omitted)
# @param max_backoff [Numeric, nil] upper backoff bound (5.0 when omitted)
def initialize(bucket, object, count, client: nil, uuid: nil, min_backoff: nil, max_backoff: nil)
  storage = client || Google::Cloud::Storage.new
  owner_id = uuid || SecureRandom.uuid

  @client, @bucket, @object, @count = storage, bucket, object, count
  @uuid = owner_id
  @min_backoff, @max_backoff = (min_backoff || 0.01), (max_backoff || 5.0)

  # Mutexes currently held through this instance, in acquisition order.
  @permits = []
end

Public Instance Methods

acquire(permits: 1, timeout: nil, permits_to_check: nil) click to toggle source

Attempts to grab permits and waits if they aren't available.

@param permits [Integer] the number of permits to acquire

@param timeout [Integer] the duration to wait before cancelling the operation if the lock was not obtained (unlimited if _nil_).

@param permits_to_check [Integer] the number of permits to check for acquisition until the required number of permits is secured for each iteration (defaults to _nil_, all permits if _nil_)

@return [Boolean] `true` if the lock was obtained.

@raise [LockAlreadyOwnedError] if the permit is already owned by the current instance.

@raise [LockTimeoutError] if the permits were not obtained before reaching the timeout.

# File lib/gcslock/semaphore.rb, line 36
# Repeatedly calls #try_acquire through Utils.backoff until it succeeds or
# the backoff helper gives up.
#
# @param permits [Integer] number of permits to acquire
# @param timeout [Numeric, nil] forwarded to Utils.backoff as its timeout
# @param permits_to_check [Integer, nil] forwarded to #try_acquire
# @return [Object] whatever Utils.backoff returns (presumably +true+ once the
#   permits are held; the helper is not visible here - TODO confirm)
# @raise [LockTimeoutError] re-raised with a semaphore-specific message when
#   Utils.backoff raises LockTimeoutError
def acquire(permits: 1, timeout: nil, permits_to_check: nil)
  backoff_options = { min_backoff: @min_backoff, max_backoff: @max_backoff, timeout: timeout }

  Utils.backoff(**backoff_options) { try_acquire(permits: permits, permits_to_check: permits_to_check) }
rescue LockTimeoutError
  raise LockTimeoutError, "Unable to get semaphore permit for #{@object} before timeout"
end
available_permits() click to toggle source

Returns the current number of permits available for this semaphore.

@return [Integer] The number of permits available

# File lib/gcslock/semaphore.rb, line 134
# Counts the permits whose mutex currently reports itself as unlocked.
#
# @return [Integer] number of permit slots for which +locked?+ is falsy
def available_permits
  # Build every slot's mutex first, then probe them in index order.
  candidates = Array.new(@count) { |slot| mutex_object(index: slot) }
  candidates.count { |candidate| !candidate.locked? }
end
drain_permits() click to toggle source

Acquires and returns all permits that are immediately available.

@return [Integer] The number of permits acquired

# File lib/gcslock/semaphore.rb, line 122
# Tries to lock every permit slot once, keeping each one that succeeds.
#
# Each obtained mutex is recorded in @permits as soon as its +try_lock+
# succeeds. If a later attempt raises, the locks already taken therefore
# remain tracked and can still be released through #release / #release_all
# instead of being orphaned.
#
# @return [Integer] number of permits acquired by this call
def drain_permits
  drained = 0

  @count.times do |index|
    mutex = mutex_object(index: index)
    next unless mutex.try_lock

    # Track immediately: an exception on a later slot must not lose this lock.
    @permits.push(mutex)
    drained += 1
  end

  drained
end
owned_permits() click to toggle source

Returns the current number of permits owned by this process for this semaphore.

@return [Integer] The number of permits owned by this process

# File lib/gcslock/semaphore.rb, line 144
# Drops tracked permits whose mutex no longer reports +owned?+ and returns
# how many remain. The @permits array is pruned in place.
#
# @return [Integer] number of permits still tracked after pruning
def owned_permits
  @permits.keep_if(&:owned?).size
end
release(permits: 1) click to toggle source

Releases the given number of permits.

@param permits [Integer] the number of permits to release

@return nil

@raise [LockNotOwnedError] if the permit is not owned by the current instance.

# File lib/gcslock/semaphore.rb, line 80
# Unlocks the most recently acquired permits, one at a time.
#
# Permits are released newest-first. If fewer permits are held than requested,
# the ones that are held are released before the error is raised.
#
# @param permits [Integer] number of permits to release
# @return [nil]
# @raise [LockNotOwnedError] when no tracked permit is left to release
def release(permits: 1)
  permits.times do
    if @permits.nil? || @permits.none?
      raise LockNotOwnedError, "No semaphore for #{@object} is owned by this process"
    end

    newest = @permits.pop
    newest.unlock
  end

  nil
end
release_all() click to toggle source

Releases all of the owned permits.

@return nil

@raise [LockNotOwnedError] if the permit is not owned by the current instance.

# File lib/gcslock/semaphore.rb, line 95
# Unlocks every permit tracked by this instance, newest first.
#
# This method raises nothing itself; any error comes from the mutex's
# +unlock+ call (presumably LockNotOwnedError - TODO confirm against Mutex).
#
# @return [nil]
def release_all
  until @permits.nil? || @permits.none?
    newest = @permits.pop
    newest.unlock
  end

  nil
end
release_all!() click to toggle source

Force releases all of the permits in the semaphore, even if not owned.

@return nil

# File lib/gcslock/semaphore.rb, line 106
# Calls +unlock!+ on the mutex of every permit slot, whether or not this
# instance holds it, then forgets all tracked permits.
#
# Slots whose +unlock!+ raises LockNotFoundError are skipped.
#
# @return [nil]
def release_all!
  Array.new(@count) { |slot| mutex_object(index: slot) }.each do |mutex|
    begin
      mutex.unlock!
    rescue LockNotFoundError
      # Slot was not locked; nothing to force-release.
      next
    end
  end

  @permits = []

  nil
end
try_acquire(permits: 1, permits_to_check: nil) click to toggle source

Attempts to obtain a permit and returns immediately.

@param permits [Integer] the number of permits to acquire

@param permits_to_check [Integer] the number of permits to check for acquisition until the required number of permits is secured (defaults to _nil_, all permits if _nil_)

@return [Boolean] `true` if the requested number of permits was granted.

# File lib/gcslock/semaphore.rb, line 53
# Makes a single, non-blocking attempt to take +permits+ permits.
#
# Permit slots are probed in random order. The call is all-or-nothing: when
# fewer than +permits+ locks could be taken, every lock taken during this call
# is released again and +false+ is returned.
#
# @param permits [Integer] number of permits to acquire; a value of zero or
#   less acquires nothing and succeeds immediately
# @param permits_to_check [Integer, nil] maximum number of randomly chosen
#   slots to probe (all slots when +nil+)
# @return [Boolean] +true+ if the requested number of permits was granted
# @raise [StandardError] any error raised while probing is re-raised after the
#   locks taken so far have been released
def try_acquire(permits: 1, permits_to_check: nil)
  # Without this guard the size check in the loop never matches for
  # permits <= 0, so every free permit would be locked and kept.
  return true if permits <= 0

  acquired = []

  begin
    @count.times.to_a.sample(permits_to_check || @count).each do |index|
      mutex = mutex_object(index: index)
      next unless mutex.try_lock

      acquired.push(mutex)
      break if acquired.size >= permits
    end
  rescue StandardError
    # Roll back so a failure on one slot does not orphan locks already taken.
    acquired.each do |mutex|
      begin
        mutex.unlock
      rescue StandardError
        # Best effort: keep releasing the rest; the original error is re-raised.
        next
      end
    end
    raise
  end

  if acquired.size < permits
    acquired.each(&:unlock)
    return false
  end

  @permits.push(*acquired)
  true
end

Private Instance Methods

mutex_object(index: nil) click to toggle source
# File lib/gcslock/semaphore.rb, line 151
# Builds the GCSLock::Mutex for one permit slot. The lock object is named
# "<object>.<index>".
#
# @param index [Integer, nil] slot number; a random slot in 0...@count is
#   chosen when +nil+
# @return [GCSLock::Mutex] a new mutex sharing this semaphore's client, uuid
#   and backoff settings
def mutex_object(index: nil)
  slot =
    if index.nil?
      rand(@count)
    else
      index
    end

  shared_options = {
    client: @client,
    uuid: @uuid,
    min_backoff: @min_backoff,
    max_backoff: @max_backoff
  }

  GCSLock::Mutex.new(@bucket, "#{@object}.#{slot}", **shared_options)
end