class Promiscuous::BlackHole::Operation

Attributes

message[R]

Public Class Methods

new(message) click to toggle source
# File lib/promiscuous_black_hole/operation.rb, line 8
def initialize(message)
  @message = message
end
process(message) click to toggle source
# File lib/promiscuous_black_hole/operation.rb, line 12
def self.process(message)
  new(message).process
end

Public Instance Methods

persist() click to toggle source
# File lib/promiscuous_black_hole/operation.rb, line 26
def persist
  Promiscuous.debug "Processing op: #{ message }"

  case message.operation
  when 'create', 'update' then upsert
  when 'destroy' then destroy
  end
end
process() click to toggle source
# File lib/promiscuous_black_hole/operation.rb, line 16
def process
  return unless Promiscuous::BlackHole.subscribing_to?(message.base_type)
  with_wrapped_error { process! }
end
update_schema() click to toggle source
# File lib/promiscuous_black_hole/operation.rb, line 21
def update_schema
  table.update_schema
  embedded_operations.each(&:update_schema)
end

Private Instance Methods

destroy() click to toggle source
# File lib/promiscuous_black_hole/operation.rb, line 53
def destroy
  record.destroy
  persist_embedded_records
end
embedded_operations() click to toggle source
# File lib/promiscuous_black_hole/operation.rb, line 71
def embedded_operations
  @embedded_operations ||= message.embedded_messages.map { |em| Operation.new(em) }
end
persist_embedded_records() click to toggle source
# File lib/promiscuous_black_hole/operation.rb, line 58
def persist_embedded_records
  StaleEmbeddingsDestroyer.new(message.table_name, message.id).process
  embedded_operations.each(&:persist)
end
process!() click to toggle source
# File lib/promiscuous_black_hole/operation.rb, line 39
def process!
  Locker.new(message.id).with_lock do
    update_schema
    DB.transaction { persist }
  end
end
record() click to toggle source
# File lib/promiscuous_black_hole/operation.rb, line 63
def record
  Record.new(message.table_name, message.attributes)
end
table() click to toggle source
# File lib/promiscuous_black_hole/operation.rb, line 67
def table
  Table.new(message.table_name, message.attributes)
end
upsert() click to toggle source
# File lib/promiscuous_black_hole/operation.rb, line 46
def upsert
  if record.message_version_newer_than_persisted?
    record.upsert
    persist_embedded_records
  end
end
with_wrapped_error(&block) click to toggle source
# File lib/promiscuous_black_hole/operation.rb, line 75
def with_wrapped_error(&block)
  block.call
rescue => orig_e
  e = Promiscuous::Error::Subscriber.new(orig_e, :payload => message.raw_message)
  Promiscuous.warn "[receive] #{message.raw_message} #{e}\n#{e.backtrace.join("\n")}"
  raise e
end