class Promiscuous::BlackHole::Operation
Attributes
message[R]
Public Class Methods
new(message)
click to toggle source
# File lib/promiscuous_black_hole/operation.rb, line 8 def initialize(message) @message = message end
process(message)
click to toggle source
# File lib/promiscuous_black_hole/operation.rb, line 12 def self.process(message) new(message).process end
Public Instance Methods
persist()
click to toggle source
# File lib/promiscuous_black_hole/operation.rb, line 26 def persist Promiscuous.debug "Processing op: #{ message }" case message.operation when 'create', 'update' then upsert when 'destroy' then destroy end end
process()
click to toggle source
# File lib/promiscuous_black_hole/operation.rb, line 16 def process return unless Promiscuous::BlackHole.subscribing_to?(message.base_type) with_wrapped_error { process! } end
update_schema()
click to toggle source
# File lib/promiscuous_black_hole/operation.rb, line 21 def update_schema table.update_schema embedded_operations.each(&:update_schema) end
Private Instance Methods
destroy()
click to toggle source
# File lib/promiscuous_black_hole/operation.rb, line 53 def destroy record.destroy persist_embedded_records end
embedded_operations()
click to toggle source
# File lib/promiscuous_black_hole/operation.rb, line 71 def embedded_operations @embedded_operations ||= message.embedded_messages.map { |em| Operation.new(em) } end
persist_embedded_records()
click to toggle source
# File lib/promiscuous_black_hole/operation.rb, line 58 def persist_embedded_records StaleEmbeddingsDestroyer.new(message.table_name, message.id).process embedded_operations.each(&:persist) end
process!()
click to toggle source
# File lib/promiscuous_black_hole/operation.rb, line 39 def process! Locker.new(message.id).with_lock do update_schema DB.transaction { persist } end end
record()
click to toggle source
# File lib/promiscuous_black_hole/operation.rb, line 63 def record Record.new(message.table_name, message.attributes) end
table()
click to toggle source
# File lib/promiscuous_black_hole/operation.rb, line 67 def table Table.new(message.table_name, message.attributes) end
upsert()
click to toggle source
# File lib/promiscuous_black_hole/operation.rb, line 46 def upsert if record.message_version_newer_than_persisted? record.upsert persist_embedded_records end end
with_wrapped_error(&block)
click to toggle source
# File lib/promiscuous_black_hole/operation.rb, line 75 def with_wrapped_error(&block) block.call rescue => orig_e e = Promiscuous::Error::Subscriber.new(orig_e, :payload => message.raw_message) Promiscuous.warn "[receive] #{message.raw_message} #{e}\n#{e.backtrace.join("\n")}" raise e end