class CycloneLariat::MessagesRepo

Attributes

dataset[R]

Public Class Methods

new(dataset) click to toggle source
# File lib/cyclone_lariat/messages_repo.rb, line 9
def initialize(dataset)
  @dataset = dataset
end

Public Instance Methods

create(msg) click to toggle source
# File lib/cyclone_lariat/messages_repo.rb, line 13
def create(msg)
  dataset.insert(
    uuid: msg.uuid,
    kind: msg.kind,
    type: msg.type,
    publisher: msg.publisher,
    data: JSON.generate(msg.data),
    client_error_message: msg.client_error&.message,
    client_error_details: JSON.generate(msg.client_error&.details),
    version: msg.version,
    sent_at: msg.sent_at
  )
end
each_unprocessed() { |msg| ... } click to toggle source
# File lib/cyclone_lariat/messages_repo.rb, line 49
def each_unprocessed
  dataset.where(processed_at: nil).each do |raw|
    raw[:data]                 = JSON.parse(raw[:data], symbolize_names: true)
    if raw[:client_error_details]
      raw[:client_error_details] = JSON.parse(raw[:client_error_details], symbolize_names: true)
    end
    msg = build raw
    yield(msg)
  end
end
each_with_client_errors() { |msg| ... } click to toggle source
# File lib/cyclone_lariat/messages_repo.rb, line 60
def each_with_client_errors
  dataset.where { (processed_at !~ nil) & (client_error_message !~ nil) }.each do |raw|
    raw[:data] = JSON.parse(raw[:data], symbolize_names: true)
    if raw[:client_error_details]
      raw[:client_error_details] = JSON.parse(raw[:client_error_details], symbolize_names: true)
    end
    msg = build raw
    yield(msg)
  end
end
exists?(uuid:) click to toggle source
# File lib/cyclone_lariat/messages_repo.rb, line 27
def exists?(uuid:)
  dataset.where(uuid: uuid).limit(1).any?
end
find(uuid:) click to toggle source
# File lib/cyclone_lariat/messages_repo.rb, line 38
def find(uuid:)
  raw = dataset.where(uuid: uuid).first
  return nil unless raw

  raw[:data] = JSON.parse(raw[:data], symbolize_names: true)
  if raw[:client_error_details]
    raw[:client_error_details] = JSON.parse(raw[:client_error_details], symbolize_names: true)
  end
  build raw
end
processed!(uuid:, error: nil) click to toggle source
# File lib/cyclone_lariat/messages_repo.rb, line 31
def processed!(uuid:, error: nil)
  data = { processed_at: Sequel.function(:NOW) }
  data.merge!(client_error_message: error.message, client_error_details: JSON.generate(error.details)) if error

  !dataset.where(uuid: uuid).update(data).zero?
end

Private Instance Methods

build(raw) click to toggle source
# File lib/cyclone_lariat/messages_repo.rb, line 73
def build(raw)
  case kind = raw.delete(:kind)
  when 'event'   then Event.wrap raw
  when 'command' then Command.wrap raw
  else raise ArgumentError, "Unknown kind `#{kind}` of message"
  end
end