class Delfos::Neo4j::Batch::Execution

Constants

BATCH_MUTEX

Attributes

batch[W]
commit_url[R]
current_transaction_url[R]
expires[R]
queries[R]
size[R]

Public Class Methods

execute!(query, params: {}, size: nil) click to toggle source
# File lib/delfos/neo4j/batch/execution.rb, line 11
def execute!(query, params: {}, size: nil)
  batch = @batch || new_batch(size || 1_000)

  batch.execute!(query, params: params)
end
flush!() click to toggle source
# File lib/delfos/neo4j/batch/execution.rb, line 21
def flush!
  @batch&.flush!
rescue
  reset!
end
new(size:, clock: Time) click to toggle source
# File lib/delfos/neo4j/batch/execution.rb, line 34
def initialize(size:, clock: Time)
  @size                    = size
  @clock                   = clock
  @queries                 = []
  @expires                 = nil
  @commit_url              = nil
  @current_transaction_url = nil
end
new_batch(size) click to toggle source
# File lib/delfos/neo4j/batch/execution.rb, line 17
def new_batch(size)
  @batch = new(size: size)
end
reset!() click to toggle source
# File lib/delfos/neo4j/batch/execution.rb, line 27
def reset!
  @batch = nil
end

Public Instance Methods

execute!(query, params: {}, retrying: false) click to toggle source
# File lib/delfos/neo4j/batch/execution.rb, line 45
def execute!(query, params: {}, retrying: false)
  queries.push([query, params]) unless retrying

  with_retry(retrying) do
    BATCH_MUTEX.synchronize do
      check_for_expiry!

      perform_query(query, params)
      flush_if_required!
    end
  end
end
flush!() click to toggle source
# File lib/delfos/neo4j/batch/execution.rb, line 58
def flush!
  return unless query_count.positive?
  return unless @commit_url
  QueryExecution::Transactional.flush!(@commit_url)

  reset!
end
query_count() click to toggle source
# File lib/delfos/neo4j/batch/execution.rb, line 66
def query_count
  queries.length
end

Private Instance Methods

batch_full?() click to toggle source
# File lib/delfos/neo4j/batch/execution.rb, line 128
def batch_full?
  query_count >= size
end
check_for_expiry!() click to toggle source
# File lib/delfos/neo4j/batch/execution.rb, line 113
def check_for_expiry!
  return unless @expires

  if @clock.now > @expires
    self.class.batch = nil
    raise QueryExecution::ExpiredTransaction.new(@comit_url, "")
  end
end
expires_soon?() click to toggle source
# File lib/delfos/neo4j/batch/execution.rb, line 132
def expires_soon?
  @expires && (@clock.now + 2 > @expires)
end
flush_if_required!() click to toggle source
# File lib/delfos/neo4j/batch/execution.rb, line 122
def flush_if_required!
  check_for_expiry!

  flush! if batch_full? || expires_soon?
end
new_transaction_url() click to toggle source
# File lib/delfos/neo4j/batch/execution.rb, line 109
def new_transaction_url
  Delfos.neo4j.uri_for("/db/data/transaction")
end
perform_query(query, params) click to toggle source
# File lib/delfos/neo4j/batch/execution.rb, line 72
def perform_query(query, params)
  transactional_query = QueryExecution::Transactional.new(query, params, url)
  transaction_url, @commit_url, @expires = transactional_query.perform
  @current_transaction_url ||= transaction_url # the transaction_url is only returned with the first request
end
reset!() click to toggle source
# File lib/delfos/neo4j/batch/execution.rb, line 136
def reset!
  @queries = []
  reset_transaction!
end
reset_transaction!() click to toggle source
# File lib/delfos/neo4j/batch/execution.rb, line 141
def reset_transaction!
  @current_transaction_url = nil
  @commit_url = nil
  @expires = nil
end
retry_batch!() click to toggle source
# File lib/delfos/neo4j/batch/execution.rb, line 97
def retry_batch!
  queries.each do |q, p|
    execute!(q, params: p, retrying: true)
  end
end
url() click to toggle source
# File lib/delfos/neo4j/batch/execution.rb, line 103
def url
  return @commit_url if @commit_url && batch_full? || expires_soon?

  current_transaction_url || new_transaction_url
end
with_retry(retrying) { || ... } click to toggle source
# File lib/delfos/neo4j/batch/execution.rb, line 78
def with_retry(retrying)
  yield
rescue QueryExecution::ExpiredTransaction
  @retry_count ||= 0

  if retrying
    @retry_count += 1

    if @retry_count > 5
      @retry_count = 0
      raise
    end
  end

  Delfos.logger.error "Transaction expired - retrying batch. #{query_count} queries retry_count: #{@retry_count}"
  reset_transaction!
  retry_batch!
end