class Delfos::Neo4j::Batch::Execution
Constants
- BATCH_MUTEX
Attributes
batch[W]
commit_url[R]
current_transaction_url[R]
expires[R]
queries[R]
size[R]
Public Class Methods
execute!(query, params: {}, size: nil)
click to toggle source
# File lib/delfos/neo4j/batch/execution.rb, line 11 def execute!(query, params: {}, size: nil) batch = @batch || new_batch(size || 1_000) batch.execute!(query, params: params) end
flush!()
click to toggle source
# File lib/delfos/neo4j/batch/execution.rb, line 21 def flush! @batch&.flush! rescue reset! end
new(size:, clock: Time)
click to toggle source
# File lib/delfos/neo4j/batch/execution.rb, line 34 def initialize(size:, clock: Time) @size = size @clock = clock @queries = [] @expires = nil @commit_url = nil @current_transaction_url = nil end
new_batch(size)
click to toggle source
# File lib/delfos/neo4j/batch/execution.rb, line 17 def new_batch(size) @batch = new(size: size) end
reset!()
click to toggle source
# File lib/delfos/neo4j/batch/execution.rb, line 27 def reset! @batch = nil end
Public Instance Methods
execute!(query, params: {}, retrying: false)
click to toggle source
# File lib/delfos/neo4j/batch/execution.rb, line 45 def execute!(query, params: {}, retrying: false) queries.push([query, params]) unless retrying with_retry(retrying) do BATCH_MUTEX.synchronize do check_for_expiry! perform_query(query, params) flush_if_required! end end end
flush!()
click to toggle source
# File lib/delfos/neo4j/batch/execution.rb, line 58 def flush! return unless query_count.positive? return unless @commit_url QueryExecution::Transactional.flush!(@commit_url) reset! end
query_count()
click to toggle source
# File lib/delfos/neo4j/batch/execution.rb, line 66 def query_count queries.length end
Private Instance Methods
batch_full?()
click to toggle source
# File lib/delfos/neo4j/batch/execution.rb, line 128 def batch_full? query_count >= size end
check_for_expiry!()
click to toggle source
# File lib/delfos/neo4j/batch/execution.rb, line 113 def check_for_expiry! return unless @expires if @clock.now > @expires self.class.batch = nil raise QueryExecution::ExpiredTransaction.new(@comit_url, "") end end
expires_soon?()
click to toggle source
# File lib/delfos/neo4j/batch/execution.rb, line 132 def expires_soon? @expires && (@clock.now + 2 > @expires) end
flush_if_required!()
click to toggle source
# File lib/delfos/neo4j/batch/execution.rb, line 122 def flush_if_required! check_for_expiry! flush! if batch_full? || expires_soon? end
new_transaction_url()
click to toggle source
# File lib/delfos/neo4j/batch/execution.rb, line 109 def new_transaction_url Delfos.neo4j.uri_for("/db/data/transaction") end
perform_query(query, params)
click to toggle source
# File lib/delfos/neo4j/batch/execution.rb, line 72 def perform_query(query, params) transactional_query = QueryExecution::Transactional.new(query, params, url) transaction_url, @commit_url, @expires = transactional_query.perform @current_transaction_url ||= transaction_url # the transaction_url is only returned with the first request end
reset!()
click to toggle source
# File lib/delfos/neo4j/batch/execution.rb, line 136 def reset! @queries = [] reset_transaction! end
reset_transaction!()
click to toggle source
# File lib/delfos/neo4j/batch/execution.rb, line 141 def reset_transaction! @current_transaction_url = nil @commit_url = nil @expires = nil end
retry_batch!()
click to toggle source
# File lib/delfos/neo4j/batch/execution.rb, line 97 def retry_batch! queries.each do |q, p| execute!(q, params: p, retrying: true) end end
url()
click to toggle source
# File lib/delfos/neo4j/batch/execution.rb, line 103 def url return @commit_url if @commit_url && batch_full? || expires_soon? current_transaction_url || new_transaction_url end
with_retry(retrying) { || ... }
click to toggle source
# File lib/delfos/neo4j/batch/execution.rb, line 78 def with_retry(retrying) yield rescue QueryExecution::ExpiredTransaction @retry_count ||= 0 if retrying @retry_count += 1 if @retry_count > 5 @retry_count = 0 raise end end Delfos.logger.error "Transaction expired - retrying batch. #{query_count} queries retry_count: #{@retry_count}" reset_transaction! retry_batch! end