class Riak::Client::BeefcakeProtobuffsBackend::CrdtOperator

Serializes CRDT operations from {Riak::Crdt::Operation} members into protobuffs and writes them to a Riak cluster. @api private

Attributes

backend[R]

Public Class Methods

new(backend) click to toggle source
# File lib/riak/client/beefcake/crdt_operator.rb, line 22
# Creates an operator bound to a backend.
#
# @param backend the object exposed through the +backend+ reader; it is
#   stored as-is and not validated here
def initialize(backend)
  @backend = backend
end

Public Instance Methods

operate(bucket, key, bucket_type, operation, options = {}) click to toggle source

Serializes and writes CRDT operations.

# File lib/riak/client/beefcake/crdt_operator.rb, line 27
# Serializes +operation+ and sends it to Riak as a DtUpdateReq.
#
# @param bucket value sent as the request's +bucket+ field
# @param key value sent as the request's +key+ field
# @param bucket_type value sent as the request's +type+ field
# @param operation a single CRDT operation or an Enumerable of them; it is
#   handed to +serialize+
# @param options [Hash] extra request fields; entries here win over the
#   defaults built in this method (including +return_body: true+)
# @return the return value of +backend.protocol+
# @raise [CrdtError::PreconditionError] when an error response's message
#   matches /precondition/
# @raise [ProtobuffsErrorResponse] re-raised unchanged for any other error
#   response
def operate(bucket, key, bucket_type, operation, options = {})
  defaults = {
    bucket: bucket,
    key: key,
    type: bucket_type,
    op: serialize(operation),
    return_body: true
  }
  request = DtUpdateReq.new(defaults.merge(options))

  # Only the network exchange is guarded; serialization errors propagate as-is.
  begin
    backend.protocol do |protocol|
      protocol.write :DtUpdateReq, request
      protocol.expect :DtUpdateResp, DtUpdateResp, empty_body_acceptable: true
    end
  rescue ProtobuffsErrorResponse => error
    raise CrdtError::PreconditionError.new(error.message) if error.message =~ /precondition/

    raise
  end
end
serialize(operations) click to toggle source

Serializes CRDT operations without writing them.

# File lib/riak/client/beefcake/crdt_operator.rb, line 49
# Serializes CRDT operations without writing them.
#
# @param operations a single operation or an Enumerable of operations; a
#   non-Enumerable argument is wrapped in a one-element Array first
# @return the result of +serialize_wrap+ for the (possibly wrapped) operations
def serialize(operations)
  if operations.is_a?(Enumerable)
    serialize_wrap(operations)
  else
    serialize([operations])
  end
end

Private Instance Methods

inner_serialize(operation) click to toggle source
# File lib/riak/client/beefcake/crdt_operator.rb, line 96
# Serializes one operation that lives inside a map, dispatching on
# +operation.type+.
#
# @param operation an object responding to +type+
# @return the result of the matching per-type serializer
# @raise [ArgumentError] when the type is not :counter, :flag, :register,
#   :set or :map
def inner_serialize(operation)
  handler =
    case operation.type
    when :counter  then :serialize_inner_counter
    when :flag     then :serialize_flag
    when :register then :serialize_register
    when :set      then :serialize_inner_set
    when :map      then :serialize_inner_map
    end

  unless handler
    raise ArgumentError, t('crdt.unknown_inner_field',
                           symbol: operation.type.inspect)
  end

  send(handler, operation)
end
inner_serialize_delete(operation) click to toggle source
# File lib/riak/client/beefcake/crdt_operator.rb, line 114
# Builds the MapField that identifies a map entry to remove.
#
# @param operation an object responding to +name+ and +type+
# @return [MapField] carrying the operation's name and the enum value that
#   +type_symbol_to_type_enum+ returns for its type
def inner_serialize_delete(operation)
  field_type = type_symbol_to_type_enum(operation.type)

  MapField.new(name: operation.name, type: field_type)
end
inner_serialize_group(operations) click to toggle source
# File lib/riak/client/beefcake/crdt_operator.rb, line 80
# Splits map operations into updates and removals and serializes each side.
#
# An operation counts as an update when its +value+ is a
# Riak::Crdt::Operation::Update; everything else is treated as a removal.
#
# @param operations an Enumerable of objects responding to +value+
# @return [Hash] with +:updates+ (results of +inner_serialize+) and
#   +:removes+ (results of +inner_serialize_delete+)
def inner_serialize_group(operations)
  update_ops, delete_ops = operations.partition do |wrapped|
    wrapped.value.is_a?(Riak::Crdt::Operation::Update)
  end

  {
    updates: update_ops.map { |wrapped| inner_serialize(wrapped.value) },
    removes: delete_ops.map { |wrapped| inner_serialize_delete(wrapped.value) }
  }
end
serialize_counter(counter_ops) click to toggle source
# File lib/riak/client/beefcake/crdt_operator.rb, line 121
# Collapses a group of counter operations into a single CounterOp whose
# increment is the sum of the operations' values.
#
# @param counter_ops an Enumerable of objects responding to +value+
# @return [CounterOp]
def serialize_counter(counter_ops)
  total = counter_ops.map(&:value).inject(0, :+)

  CounterOp.new(increment: total)
end
serialize_flag(flag_op) click to toggle source
# File lib/riak/client/beefcake/crdt_operator.rb, line 138
# Serializes a flag operation inside a map.
#
# @param flag_op an object responding to +name+ and +value+; a truthy value
#   maps to FlagOp::ENABLE, a falsy one to FlagOp::DISABLE
# @return [MapUpdate] targeting a FLAG-typed field named +flag_op.name+
def serialize_flag(flag_op)
  flag_state =
    if flag_op.value
      MapUpdate::FlagOp::ENABLE
    else
      MapUpdate::FlagOp::DISABLE
    end
  flag_field = MapField.new(name: flag_op.name,
                            type: MapField::MapFieldType::FLAG)

  MapUpdate.new(field: flag_field, flag_op: flag_state)
end
serialize_group(operations) click to toggle source
# File lib/riak/client/beefcake/crdt_operator.rb, line 67
# Serializes a group of top-level operations, choosing the serializer from
# the type of the first operation.
#
# @param operations a non-empty collection responding to +first+; the first
#   element must respond to +type+
# @return the result of +serialize_counter+, +serialize_set+ or
#   +serialize_map+
# @raise [ArgumentError] when the first operation's type is not :counter,
#   :set or :map
def serialize_group(operations)
  type = operations.first.type

  case type
  when :counter
    serialize_counter operations
  when :set
    serialize_set operations
  when :map
    serialize_map operations
  else
    # The original referenced an undefined local (`operation`) here, so an
    # unknown type surfaced as NameError instead of this ArgumentError.
    raise ArgumentError, t('crdt.unknown_field', symbol: type.inspect)
  end
end
serialize_inner_counter(counter_op) click to toggle source
# File lib/riak/client/beefcake/crdt_operator.rb, line 126
# Serializes a counter operation inside a map.
#
# @param counter_op an object responding to +name+ and +value+
# @return [MapUpdate] targeting a COUNTER-typed field named
#   +counter_op.name+, carrying a CounterOp that increments by
#   +counter_op.value+
def serialize_inner_counter(counter_op)
  counter_field = MapField.new(name: counter_op.name,
                               type: MapField::MapFieldType::COUNTER)
  increment_op = CounterOp.new(increment: counter_op.value)

  MapUpdate.new(field: counter_field, counter_op: increment_op)
end
serialize_inner_map(map_op) click to toggle source
# File lib/riak/client/beefcake/crdt_operator.rb, line 194
# Serializes an operation on a map nested inside another map.
#
# +map_op.name+ names the nested map; +map_op.value+ is the operation to
# apply within it. A Riak::Crdt::Operation::Delete removes an entry from the
# nested map; anything else is serialized with +inner_serialize+ and sent as
# a single update.
#
# @param map_op an object responding to +name+ and +value+
# @return [MapUpdate] targeting a MAP-typed field named +map_op.name+
def serialize_inner_map(map_op)
  inner_op = map_op.value
  nested_map = MapField.new(
                            name: map_op.name,
                            type: MapField::MapFieldType::MAP
                            )

  if inner_op.is_a? Riak::Crdt::Operation::Delete
    # Removals are described by MapField messages (name plus type), exactly
    # as inner_serialize_group builds them; the original passed only the bare
    # field name here.
    return MapUpdate.new(
                         field: nested_map,
                         map_op: MapOp.new(
                                           removes: [inner_serialize_delete(inner_op)]
                                           )
                         )
  end

  MapUpdate.new(
                field: nested_map,
                map_op: MapOp.new(
                                  updates: [inner_serialize(inner_op)]
                                  )
                )
end
serialize_inner_set(set_op) click to toggle source
# File lib/riak/client/beefcake/crdt_operator.rb, line 173
# Serializes a set operation inside a map.
#
# @param set_op an object responding to +name+ and +value+; +value+ is
#   indexed with +:add+ and +:remove+
# @return [MapUpdate] targeting a SET-typed field named +set_op.name+,
#   carrying a SetOp built from the +:add+ and +:remove+ entries
def serialize_inner_set(set_op)
  changes = set_op.value
  set_field = MapField.new(name: set_op.name,
                           type: MapField::MapFieldType::SET)
  membership = SetOp.new(adds: changes[:add], removes: changes[:remove])

  MapUpdate.new(field: set_field, set_op: membership)
end
serialize_map(map_ops) click to toggle source
# File lib/riak/client/beefcake/crdt_operator.rb, line 188
# Serializes a group of top-level map operations into a single MapOp.
#
# @param map_ops the operations handed to +inner_serialize_group+
# @return [MapOp] constructed from the hash +inner_serialize_group+ returns
def serialize_map(map_ops)
  MapOp.new(inner_serialize_group(map_ops))
end
serialize_register(register_op) click to toggle source
# File lib/riak/client/beefcake/crdt_operator.rb, line 149
# Serializes a register operation inside a map.
#
# @param register_op an object responding to +name+ and +value+
# @return [MapUpdate] targeting a REGISTER-typed field named
#   +register_op.name+, with +register_op.value+ as its +register_op+
def serialize_register(register_op)
  register_field = MapField.new(name: register_op.name,
                                type: MapField::MapFieldType::REGISTER)

  MapUpdate.new(field: register_field, register_op: register_op.value)
end
serialize_set(set_ops) click to toggle source
# File lib/riak/client/beefcake/crdt_operator.rb, line 159
# Merges a group of top-level set operations into a single SetOp.
#
# Each operation's +value+ is indexed with +:add+ and +:remove+; an entry may
# be a single element or an array of elements. Entries are collected in
# insertion order, identical entries are collapsed, and the result is
# flattened into plain lists.
#
# @param set_ops an Enumerable of objects responding to +value+
# @return [SetOp] with flattened +adds+ and +removes+ arrays
def serialize_set(set_ops)
  additions = ::Set.new
  removals = ::Set.new

  set_ops.each do |set_op|
    added = set_op.value[:add]
    removed = set_op.value[:remove]
    additions << added if added
    removals << removed if removed
  end

  SetOp.new(adds: additions.to_a.flatten, removes: removals.to_a.flatten)
end
serialize_wrap(operations) click to toggle source
# File lib/riak/client/beefcake/crdt_operator.rb, line 56
# Serializes a group of operations and wraps the result in a DtOp under the
# field name returned by +wrap_field_for+.
#
# @param operations a collection responding to +empty?+
# @return [DtOp]
# @raise [ArgumentError] when +operations+ is empty
def serialize_wrap(operations)
  if operations.empty?
    raise ArgumentError, t('crdt.serialize_no_ops')
  end

  payload = serialize_group(operations)
  DtOp.new(wrap_field_for(operations) => payload)
end
type_symbol_to_type_enum(sym) click to toggle source
# File lib/riak/client/beefcake/crdt_operator.rb, line 217
# Looks up the MapField::MapFieldType constant whose name is the upper-cased
# string form of +sym+ (e.g. +:counter+ resolves +COUNTER+).
#
# @param sym a Symbol or anything whose +to_s+ names a field type
# @return the constant's value
# @raise [NameError] when no such constant exists
def type_symbol_to_type_enum(sym)
  constant_name = sym.to_s.upcase

  MapField::MapFieldType.const_get(constant_name)
end
wrap_field_for(ops) click to toggle source
# File lib/riak/client/beefcake/crdt_operator.rb, line 63
# Derives the wrapper field name from the first operation's type, e.g. a
# first operation of type +:counter+ yields +:counter_op+.
#
# @param ops a collection responding to +first+; the first element must
#   respond to +type+
# @return [Symbol]
def wrap_field_for(ops)
  :"#{ops.first.type}_op"
end