class DaemonRunner::Session
Manage distributed locks with Consul
Attributes
Behavior when a session is invalidated, can be set to either release or delete
Period, in seconds, that a session's locks will be
Consul session ID
Session
name
Period, in seconds, after which session expires
Public Class Methods
Acquire a lock with the current session, or initialize a new session
@param path [String] A path in the Consul key-value space to lock @param lock_session [Session] The Session
instance to lock the lock to @return [Boolean] `true` if the lock was acquired
# File lib/daemon_runner/session.rb, line 29 def lock(path) Diplomat::Lock.wait_to_acquire(path, session.id) end
@param name [String] Session
name @option options [Fixnum] ttl (15) Session
TTL in seconds @option options [Fixnum] delay (15) Session
release dealy in seconds @option options [String] behavior (release) Session
release behavior
# File lib/daemon_runner/session.rb, line 62 def initialize(name, **options) logger.info('Initializing a Consul session') @name = name @ttl = options.fetch(:ttl, 15) @delay = options.fetch(:delay, 15) @behavior = options.fetch(:behavior, 'release') init end
Release a lock held by the current session
@param path [String] A path in the Consul key-value space to release @param lock_session [Session] The Session
instance that the lock was acquired with
# File lib/daemon_runner/session.rb, line 38 def release(path) Diplomat::Lock.release(path, session.id) end
# File lib/daemon_runner/session.rb, line 16 def start(name, **options) @session = Session.new(name, options).renew! raise CreateSessionError, 'Failed to create session' if @session == false @session.verify_session @session end
Public Instance Methods
Stop the renew thread and destroy the session
# File lib/daemon_runner/session.rb, line 110 def destroy! @renew.kill if renew? Diplomat::Session.destroy(id) end
Create a thread to periodically renew the lock session
# File lib/daemon_runner/session.rb, line 82 def renew! return if renew? @renew = Thread.new do ## Wakeup every TTL/2 seconds and renew the session loop do sleep ttl / 2 begin logger.debug(" - Renewing Consul session #{id}") Diplomat::Session.renew(id) rescue Faraday::ResourceNotFound logger.warn("Consul session #{id} has expired!") init rescue StandardError => e ## Keep the thread from exiting logger.error(e) end end end self end
Check if there is an active renew thread
@return [Boolean] `true` if the thread is alive
# File lib/daemon_runner/session.rb, line 76 def renew? @renew.is_a?(Thread) && @renew.alive? end
Verify wheather the session exists after a period of time
# File lib/daemon_runner/session.rb, line 116 def verify_session(wait_time = 2) logger.info(" - Wait until Consul session #{id} exists") wait_time.times do exists = session_exist? raise CreateSessionError, 'Error creating session' unless exists sleep 1 end logger.info(" - Found Consul session #{id}") rescue CreateSessionError init end
Private Instance Methods
Initialize a session and store it's ID
# File lib/daemon_runner/session.rb, line 132 def init @id = Diplomat::Session.create( :Name => name, :TTL => "#{ttl}s", :LockDelay => "#{delay}s", :Behavior => behavior ) logger.info(" - Initialized a Consul session #{id}") end
Does the session exist
# File lib/daemon_runner/session.rb, line 143 def session_exist? sessions = Diplomat::Session.list sessions.any? { |s| s['ID'] == id } end