class Kafka::TransactionStateMachine

Constants

STATES
TRANSITIONS

Public Class Methods

new(logger:) click to toggle source
# File lib/kafka/transaction_state_machine.rb, line 27
# Sets up a state machine whose initial state is UNINITIALIZED.
#
# @param logger [Object] handed to TaggedLogger.new; the wrapped logger is
#   the one used for debug output on state changes
def initialize(logger:)
  @logger = TaggedLogger.new(logger)
  @mutex  = Mutex.new
  @state  = UNINITIALIZED
end

Public Instance Methods

aborting_transaction?() click to toggle source
# File lib/kafka/transaction_state_machine.rb, line 58
# Tells whether the current state is ABORTING_TRANSACTION.
#
# @return [Boolean]
def aborting_transaction?
  wanted = ABORTING_TRANSACTION
  in_state?(wanted)
end
committing_transaction?() click to toggle source
# File lib/kafka/transaction_state_machine.rb, line 54
# Tells whether the current state is COMMITTING_TRANSACTION.
#
# @return [Boolean]
def committing_transaction?
  wanted = COMMITTING_TRANSACTION
  in_state?(wanted)
end
error?() click to toggle source
# File lib/kafka/transaction_state_machine.rb, line 62
# Tells whether the current state is ERROR.
#
# @return [Boolean]
def error?
  wanted = ERROR
  in_state?(wanted)
end
in_transaction?() click to toggle source
# File lib/kafka/transaction_state_machine.rb, line 50
# Tells whether the current state is IN_TRANSACTION.
#
# @return [Boolean]
def in_transaction?
  wanted = IN_TRANSACTION
  in_state?(wanted)
end
ready?() click to toggle source
# File lib/kafka/transaction_state_machine.rb, line 46
# Tells whether the current state is READY.
#
# @return [Boolean]
def ready?
  wanted = READY
  in_state?(wanted)
end
transition_to!(next_state) click to toggle source
# File lib/kafka/transaction_state_machine.rb, line 33
# Moves the state machine to +next_state+ when that transition is allowed.
#
# The permission check and the assignment happen under a single mutex
# acquisition, so another thread cannot change the state between the two.
#
# @param next_state [Object] the target state; must be a member of STATES
# @return [Object] +next_state+
# @raise [InvalidStateError] if +next_state+ is not listed in STATES
# @raise [InvalidTransitionError] if TRANSITIONS[next_state] does not
#   include the current state
def transition_to!(next_state)
  # Does not depend on the current state, so it needs no lock.
  raise InvalidStateError unless STATES.include?(next_state)

  @mutex.synchronize do
    unless TRANSITIONS[next_state].include?(@state)
      raise InvalidTransitionError, "Could not transition from state '#{@state}' to state '#{next_state}'"
    end
    @state = next_state
  end

  # Logged once the lock is released so logger I/O never blocks other threads
  # that are querying or changing the state.
  @logger.debug("Transaction state changed to '#{next_state}'!")
  next_state
end
uninitialized?() click to toggle source
# File lib/kafka/transaction_state_machine.rb, line 42
# Tells whether the current state is UNINITIALIZED.
#
# @return [Boolean]
def uninitialized?
  wanted = UNINITIALIZED
  in_state?(wanted)
end

Private Instance Methods

in_state?(state) click to toggle source
# File lib/kafka/transaction_state_machine.rb, line 68
# Compares the current state with +state+ while holding the mutex.
#
# @param state [Object] the state to compare against
# @return [Boolean] result of `@state == state`
def in_state?(state)
  @mutex.synchronize do
    matches = (@state == state)
    matches
  end
end