class DaemonRunner::Session

Manage distributed locks with Consul

Attributes

session[R]
behavior[R]

Behavior when a session is invalidated, can be set to either release or delete

delay[R]

Period, in seconds, that a session's locks will be

id[R]

Consul session ID

name[R]

Session name

ttl[R]

Period, in seconds, after which session expires

Public Class Methods

lock(path) click to toggle source

Acquire a lock with the current session, or initialize a new session

@param path [String] A path in the Consul key-value space to lock @param lock_session [Session] The Session instance to lock the lock to @return [Boolean] `true` if the lock was acquired

# File lib/daemon_runner/session.rb, line 29
def lock(path)
  Diplomat::Lock.wait_to_acquire(path, session.id)
end
new(name, **options) click to toggle source

@param name [String] Session name @option options [Fixnum] ttl (15) Session TTL in seconds @option options [Fixnum] delay (15) Session release dealy in seconds @option options [String] behavior (release) Session release behavior

# File lib/daemon_runner/session.rb, line 62
def initialize(name, **options)
  logger.info('Initializing a Consul session')

  @name = name
  @ttl = options.fetch(:ttl, 15)
  @delay = options.fetch(:delay, 15)
  @behavior = options.fetch(:behavior, 'release')

  init
end
release(path) click to toggle source

Release a lock held by the current session

@param path [String] A path in the Consul key-value space to release @param lock_session [Session] The Session instance that the lock was acquired with

# File lib/daemon_runner/session.rb, line 38
def release(path)
  Diplomat::Lock.release(path, session.id)
end
start(name, **options) click to toggle source
# File lib/daemon_runner/session.rb, line 16
def start(name, **options)
  @session = Session.new(name, options).renew!
  raise CreateSessionError, 'Failed to create session' if @session == false
  @session.verify_session
  @session
end

Public Instance Methods

destroy!() click to toggle source

Stop the renew thread and destroy the session

# File lib/daemon_runner/session.rb, line 110
def destroy!
  @renew.kill if renew?
  Diplomat::Session.destroy(id)
end
renew!() click to toggle source

Create a thread to periodically renew the lock session

# File lib/daemon_runner/session.rb, line 82
def renew!
  return if renew?

  @renew = Thread.new do
    ## Wakeup every TTL/2 seconds and renew the session
    loop do
      sleep ttl / 2

      begin
        logger.debug(" - Renewing Consul session #{id}")
        Diplomat::Session.renew(id)

      rescue Faraday::ResourceNotFound
        logger.warn("Consul session #{id} has expired!")

        init
      rescue StandardError => e
        ## Keep the thread from exiting
        logger.error(e)
      end
    end
  end

  self
end
renew?() click to toggle source

Check if there is an active renew thread

@return [Boolean] `true` if the thread is alive

# File lib/daemon_runner/session.rb, line 76
def renew?
  @renew.is_a?(Thread) && @renew.alive?
end
verify_session(wait_time = 2) click to toggle source

Verify wheather the session exists after a period of time

# File lib/daemon_runner/session.rb, line 116
def verify_session(wait_time = 2)
  logger.info(" - Wait until Consul session #{id} exists")
  wait_time.times do
    exists = session_exist?
    raise CreateSessionError, 'Error creating session' unless exists
    sleep 1
  end
  logger.info(" - Found Consul session #{id}")
rescue CreateSessionError
  init
end

Private Instance Methods

init() click to toggle source

Initialize a session and store it's ID

# File lib/daemon_runner/session.rb, line 132
def init
  @id = Diplomat::Session.create(
    :Name => name,
    :TTL => "#{ttl}s",
    :LockDelay => "#{delay}s",
    :Behavior => behavior
  )
  logger.info(" - Initialized a Consul session #{id}")
end
session_exist?() click to toggle source

Does the session exist

# File lib/daemon_runner/session.rb, line 143
def session_exist?
  sessions = Diplomat::Session.list
  sessions.any? { |s| s['ID'] == id }
end