class Google::Cloud::Spanner::Pool

@private

# Pool

Implements a pool for managing and reusing {Google::Cloud::Spanner::Session} instances.

Attributes

all_sessions[RW]
session_queue[RW]
transaction_queue[RW]

Public Class Methods

new(client, min: 10, max: 100, keepalive: 1800, write_ratio: 0.3, fail: true, threads: nil) click to toggle source
# File lib/google/cloud/spanner/pool.rb, line 36
def initialize client, min: 10, max: 100, keepalive: 1800,
               write_ratio: 0.3, fail: true, threads: nil
  @client = client
  @min = min
  @max = max
  @keepalive = keepalive
  @write_ratio = write_ratio
  @write_ratio = 0 if write_ratio.negative?
  @write_ratio = 1 if write_ratio > 1
  @fail = fail
  @threads = threads || [2, Concurrent.processor_count * 2].max

  @mutex = Mutex.new
  @resource = ConditionVariable.new

  # initialize pool and availability queue
  init
end

Public Instance Methods

checkin_session(session) click to toggle source
# File lib/google/cloud/spanner/pool.rb, line 91
def checkin_session session
  @mutex.synchronize do
    unless all_sessions.include? session
      raise ArgumentError, "Cannot checkin session"
    end

    session_queue.push session

    @resource.signal
  end

  nil
end
checkin_transaction(txn) click to toggle source
# File lib/google/cloud/spanner/pool.rb, line 148
def checkin_transaction txn
  @mutex.synchronize do
    unless all_sessions.include? txn.session
      raise ArgumentError, "Cannot checkin session"
    end

    transaction_queue.push txn

    @resource.signal
  end

  nil
end
checkout_session() click to toggle source
# File lib/google/cloud/spanner/pool.rb, line 64
def checkout_session
  action = nil
  @mutex.synchronize do
    loop do
      raise ClientClosedError if @closed

      # Use LIFO to ensure sessions are used from backend caches, which
      # will reduce the read / write latencies on user requests.
      read_session = session_queue.pop # LIFO
      return read_session if read_session
      write_transaction = transaction_queue.pop # LIFO
      return write_transaction.session if write_transaction

      if can_allocate_more_sessions?
        action = :new
        break
      end

      raise SessionLimitError if @fail

      @resource.wait @mutex
    end
  end

  return new_session! if action == :new
end
checkout_transaction() click to toggle source
# File lib/google/cloud/spanner/pool.rb, line 118
def checkout_transaction
  action = nil
  @mutex.synchronize do
    loop do
      raise ClientClosedError if @closed

      write_transaction = transaction_queue.pop # LIFO
      return write_transaction if write_transaction
      read_session = session_queue.pop
      if read_session
        action = read_session
        break
      end

      if can_allocate_more_sessions?
        action = :new
        break
      end

      raise SessionLimitError if @fail

      @resource.wait @mutex
    end
  end
  if action.is_a? Google::Cloud::Spanner::Session
    return action.create_transaction
  end
  return new_transaction! if action == :new
end
close() click to toggle source
# File lib/google/cloud/spanner/pool.rb, line 173
def close
  shutdown
  @thread_pool.wait_for_termination

  true
end
keepalive_or_release!() click to toggle source
# File lib/google/cloud/spanner/pool.rb, line 180
def keepalive_or_release!
  to_keepalive = []
  to_release = []

  @mutex.synchronize do
    available_count = session_queue.count + transaction_queue.count
    release_count = @min - available_count
    release_count = 0 if release_count.negative?

    to_keepalive += (session_queue + transaction_queue).select do |x|
      x.idle_since? @keepalive
    end

    # Remove a random portion of the sessions and transactions
    to_release = to_keepalive.sample release_count
    to_keepalive -= to_release

    # Remove those to be released from circulation
    @all_sessions -= to_release.map(&:session)
    @session_queue -= to_release
    @transaction_queue -= to_release
  end

  to_release.each { |x| future { x.release! } }
  to_keepalive.each { |x| future { x.keepalive! } }
end
reset() click to toggle source
# File lib/google/cloud/spanner/pool.rb, line 162
def reset
  close
  init

  @mutex.synchronize do
    @closed = false
  end

  true
end
with_session() { |session| ... } click to toggle source
# File lib/google/cloud/spanner/pool.rb, line 55
def with_session
  session = checkout_session
  begin
    yield session
  ensure
    checkin_session session
  end
end
with_transaction() { |tx| ... } click to toggle source
# File lib/google/cloud/spanner/pool.rb, line 105
def with_transaction
  tx = checkout_transaction
  begin
    yield tx
  ensure
    future do
      # Create and checkin a new transaction
      tx = tx.session.create_transaction
      checkin_transaction tx
    end
  end
end

Private Instance Methods

can_allocate_more_sessions?() click to toggle source
# File lib/google/cloud/spanner/pool.rb, line 274
def can_allocate_more_sessions?
  # This is expected to be called from within a synchronize block
  all_sessions.size + @new_sessions_in_process < @max
end
create_keepalive_task!() click to toggle source
# File lib/google/cloud/spanner/pool.rb, line 279
def create_keepalive_task!
  @keepalive_task = Concurrent::TimerTask.new(execution_interval: 300,
                                              timeout_interval: 60) do
    keepalive_or_release!
  end
  @keepalive_task.execute
end
future(&block) click to toggle source
# File lib/google/cloud/spanner/pool.rb, line 287
def future &block
  Concurrent::Future.new(executor: @thread_pool, &block).execute
end
init() click to toggle source
# File lib/google/cloud/spanner/pool.rb, line 209
def init
  # init the thread pool
  @thread_pool = Concurrent::ThreadPoolExecutor.new \
    max_threads: @threads
  # init the queues
  @new_sessions_in_process = 0
  @transaction_queue = []
  # init the keepalive task
  create_keepalive_task!
  # init session queue
  @all_sessions = @client.batch_create_new_sessions @min
  sessions = @all_sessions.dup
  num_transactions = (@min * @write_ratio).round
  pending_transactions = sessions.shift num_transactions
  # init transaction queue
  pending_transactions.each do |transaction|
    future { checkin_transaction transaction.create_transaction }
  end
  @session_queue = sessions
end
new_session!() click to toggle source
# File lib/google/cloud/spanner/pool.rb, line 248
def new_session!
  @mutex.synchronize do
    @new_sessions_in_process += 1
  end

  begin
    session = @client.create_new_session
  rescue StandardError => e
    @mutex.synchronize do
      @new_sessions_in_process -= 1
    end
    raise e
  end

  @mutex.synchronize do
    @new_sessions_in_process -= 1
    all_sessions << session
  end

  session
end
new_transaction!() click to toggle source
# File lib/google/cloud/spanner/pool.rb, line 270
def new_transaction!
  new_session!.create_transaction
end
shutdown() click to toggle source
# File lib/google/cloud/spanner/pool.rb, line 230
def shutdown
  @mutex.synchronize do
    @closed = true
  end
  @keepalive_task.shutdown
  # Unblock all waiting threads
  @resource.broadcast
  # Delete all sessions
  @mutex.synchronize do
    @all_sessions.each { |s| future { s.release! } }
    @all_sessions = []
    @session_queue = []
    @transaction_queue = []
  end
  # shutdown existing thread pool
  @thread_pool.shutdown
end