class Riak::Ledger

Constants

VERSION

Attributes

bucket[RW]
counter[RW]
counter_options[RW]
key[RW]
retry_count[RW]

Public Class Methods

find!(bucket, key, options={}) click to toggle source

Find an existing Ledger object, merge and save it @param [Riak::Bucket] bucket @param [String] key @param [Hash] options

{
  :actor [String]: default Thread.current["name"] || "ACTOR1"
  :history_length [Integer]: default 10
  :retry_count [Integer]: default 10
}

@return [Riak::Ledger]

# File lib/ledger.rb, line 41
def self.find!(bucket, key, options={})
  candidate = new(bucket, key, options)
  vclock = candidate.refresh()
  candidate.save(vclock)

  return candidate
end
new(bucket, key, options={}) click to toggle source

Create a new Ledger object @param [Riak::Bucket] bucket @param [String] key @param [Hash] options

{
  :actor [String]: default Thread.current["name"] || "ACTOR1"
  :history_length [Integer]: default 10
  :retry_count [Integer]: default 10
}
# File lib/ledger.rb, line 18
def initialize(bucket, key, options={})
  raise ArgumentError, 'Argument "bucket" must have "allow_mult" property set to true' unless bucket.allow_mult

  self.bucket = bucket
  self.key = key
  self.retry_count = options[:retry_count] || 10

  self.counter_options = {}
  self.counter_options[:actor] = options[:actor] || Thread.current["name"] || "ACTOR1"
  self.counter_options[:history_length] = options[:history_length] || 10
  self.counter = Riak::CRDT::TPNCounter.new(self.counter_options)
end

Public Instance Methods

credit!(transaction, value) click to toggle source

Increment the counter, merge and save it @param [String] transaction @param [Positive Integer] value @see update!(transaction, value) @return [Boolean]

# File lib/ledger.rb, line 54
def credit!(transaction, value)
  self.update!(transaction, value)
end
debit!(transaction, value) click to toggle source

Decrement the counter, merge and save it @param [String] transaction @param [Positive Integer] value @see update!(transaction, value) @return [Boolean]

# File lib/ledger.rb, line 63
def debit!(transaction, value)
  self.update!(transaction, value * -1)
end
delete() click to toggle source

Delete the counter @return [Boolean]

# File lib/ledger.rb, line 119
def delete()
  begin
    self.bucket.delete(self.key)
    return true
  rescue => e
    return false
  end
end
has_transaction?(transaction) click to toggle source

Check if the counter has transaction @param [String] transaction @return [Boolean]

# File lib/ledger.rb, line 107
def has_transaction?(transaction)
  self.counter.has_transaction?(transaction)
end
refresh() click to toggle source

Get the current state of the counter and merge it @return [String]

# File lib/ledger.rb, line 130
def refresh()
  obj = self.bucket.get_or_new(self.key)
  return if obj.nil?

  self.counter = Riak::CRDT::TPNCounter.new(self.counter_options)

  if obj.siblings.length > 1
    obj.siblings.each do | sibling |
      unless sibling.raw_data.nil? or sibling.raw_data.empty?
        self.counter.merge(Riak::CRDT::TPNCounter.from_json(sibling.raw_data, self.counter_options))
      end
    end
  elsif !obj.raw_data.nil?
    self.counter.merge(Riak::CRDT::TPNCounter.from_json(obj.raw_data, self.counter_options))
  end

  return obj.vclock
end
save(vclock=nil) click to toggle source

Save the counter with an optional vclock @param [String] vclock @return [Boolean]

# File lib/ledger.rb, line 152
def save(vclock=nil)
  object = self.bucket.new(self.key)
  object.vclock = vclock if vclock
  object.content_type = 'application/json'
  object.raw_data = self.to_json()

  begin
    options = {:returnbody => false}
    object.store(options)
    return true
  rescue => e
    return false
  end
end
to_json() click to toggle source
# File lib/ledger.rb, line 167
def to_json()
  self.counter.to_json
end
update!(transaction, value, current_retry=nil) click to toggle source

Update the counter, merge and save it. Retry if unsuccessful @param [String] transaction @param [Integer] value @param [Integer] current_retry @return [Boolean]

# File lib/ledger.rb, line 72
def update!(transaction, value, current_retry=nil)
  # Failure case, not able to successfully complete the operation, retry a.s.a.p.
  if current_retry && current_retry <= 0
    return false
  end

  # Get the current merged state of this counter
  vclock = self.refresh()


  if self.has_transaction?(transaction)
    # If the transaction already exists in the counter, no problem
    return true
  else
    # If the transaction doesn't exist, attempt to add it and save
    if value < 0
      self.counter.decrement(transaction, value * -1)
    else
      self.counter.increment(transaction, value)
    end

    unless self.save(vclock)
      # If the save wasn't successful, retry
      current_retry = self.retry_count unless current_retry
      self.update!(transaction, value, current_retry - 1)
    else
      # If the save succeeded, no problem
      return true
    end
  end
end
value() click to toggle source

Calculate the current value of the counter @return [Integer]

# File lib/ledger.rb, line 113
def value()
  self.counter.value
end