connector_feature!()
click to toggle source
# Guard: fail fast if the connector feature is used without being enabled
# in additional_features.
def connector_feature!
  return if @additional_responsibilities[:connector]
  raise "The sequel persistence adapter connector feature used but not enabled in additional_features"
end
coordinator_feature!()
click to toggle source
# Guard: fail fast if the coordinator feature is used without being enabled
# in additional_features.
def coordinator_feature!
  return if @additional_responsibilities[:coordinator]
  raise "The sequel persistence adapter coordinator feature used but not enabled in additional_features"
end
delete_coordinator_record(class_name, record_id)
click to toggle source
# Deletes the coordinator record identified by +class_name+ and +record_id+.
# Requires the coordinator feature to be enabled.
def delete_coordinator_record(class_name, record_id)
  coordinator_feature!
  scoped = table(:coordinator_record).where(class: class_name, id: record_id)
  scoped.delete
end
delete_delayed_plans(filters, batch_size = 1000)
click to toggle source
# Deletes delayed-plan rows matching +filters+, working in batches of
# +batch_size+ (each batch in its own transaction). Returns the number
# of rows deleted.
def delete_delayed_plans(filters, batch_size = 1000)
  deleted = 0
  matching = filter(:delayed, table(:delayed), filters)
  matching.each_slice(batch_size) do |batch|
    uuids = batch.map { |plan| plan.fetch(:execution_plan_uuid) }
    @db.transaction do
      deleted += table(:delayed).where(execution_plan_uuid: uuids).delete
    end
  end
  deleted
end
delete_execution_plans(filters, batch_size = 1000, backup_dir = nil)
click to toggle source
# Deletes execution plans matching +filters+ together with their delayed,
# step and action rows, in batches of +batch_size+ (one transaction per
# batch). When +backup_dir+ is given, steps, actions and plans are appended
# to CSV files there before deletion. Returns the number of plans deleted.
def delete_execution_plans(filters, batch_size = 1000, backup_dir = nil)
  deleted = 0
  filter(:execution_plan, table(:execution_plan), filters).each_slice(batch_size) do |batch|
    uuids = batch.map { |plan| plan.fetch(:uuid) }
    @db.transaction do
      # Delayed rows are never backed up, matching previous behavior.
      table(:delayed).where(execution_plan_uuid: uuids).delete
      { step: 'steps.csv', action: 'actions.csv' }.each do |table_name, csv_name|
        rows = table(table_name).where(execution_plan_uuid: uuids)
        backup_to_csv(rows, backup_dir, csv_name) if backup_dir
        rows.delete
      end
      plans = table(:execution_plan).where(uuid: uuids)
      backup_to_csv(plans, backup_dir, 'execution_plans.csv') if backup_dir
      deleted += plans.delete
    end
  end
  deleted
end
filtering_by()
click to toggle source
# Columns that execution plans may be filtered by (from META_DATA).
def filtering_by
  META_DATA.fetch(:execution_plan)
end
find_coordinator_records(options)
click to toggle source
# Returns coordinator records matching <tt>options[:filters]</tt>, loaded via
# load_data. The special filter key :exclude_owner_id excludes records with
# the given owner_id instead of matching a column.
def find_coordinator_records(options)
  coordinator_feature!
  filters = (options[:filters] || {}).dup
  excluded_owner = filters.delete(:exclude_owner_id)
  records = filter(:coordinator_record, table(:coordinator_record), filters)
  records = records.exclude(:owner_id => excluded_owner) if excluded_owner
  records.all.map { |record| load_data(record) }
end
find_execution_plan_counts(options = {})
click to toggle source
# Counts execution plans matching <tt>options[:filters]</tt>.
def find_execution_plan_counts(options = {})
  data_set = filter(:execution_plan, table(:execution_plan), options[:filters])
  data_set.count
end
find_execution_plans(options = {})
click to toggle source
# Returns execution plans matching +options+ (:filters, :page, :per_page,
# :order_by, :desc), paginated and ordered, with each record passed through
# load_data and execution_plan_column_map.
#
# Fix: the original mutated the caller's options hash via
# <tt>options[:order_by] ||=</tt>; we now work on a merged copy so the
# default ordering column never leaks back to the caller.
def find_execution_plans(options = {})
  table_name = :execution_plan
  options = options.merge(order_by: options[:order_by] || :started_at)
  data_set = filter(table_name,
                    order(table_name,
                          paginate(table(table_name), options),
                          options),
                    options[:filters])
  data_set.all.map { |record| execution_plan_column_map(load_data(record, table_name)) }
end
find_old_execution_plans(age)
click to toggle source
# Returns stopped execution plans whose ended_at is at or before +age+.
def find_old_execution_plans(age)
  table_name = :execution_plan
  condition = ::Sequel.lit('ended_at <= ? AND state = ?', age, 'stopped')
  table(table_name).where(condition).all.map do |plan|
    execution_plan_column_map(load_data(plan, table_name))
  end
end
find_past_delayed_plans(time)
click to toggle source
# Returns non-frozen delayed plans that are due at +time+ (start_at has
# passed, or start_before is set and has passed), ordered by start_at.
def find_past_delayed_plans(time)
  table_name = :delayed
  due = ::Sequel.lit('start_at <= ? OR (start_before IS NOT NULL AND start_before <= ?)', time, time)
  table(table_name)
    .where(due)
    .where(:frozen => false)
    .order_by(:start_at)
    .all
    .map { |plan| load_data(plan, table_name) }
end
insert_coordinator_record(value)
click to toggle source
# Inserts a new coordinator record with the given +value+.
def insert_coordinator_record(value)
  coordinator_feature!
  save(:coordinator_record, {}, value)
end
load_action(execution_plan_id, action_id)
click to toggle source
# Loads a single action by its execution plan uuid and action id.
# Raises KeyError when not found (via #load).
def load_action(execution_plan_id, action_id)
  load(:action, execution_plan_uuid: execution_plan_id, id: action_id)
end
load_actions(execution_plan_id, action_ids)
click to toggle source
# Loads the actions with ids +action_ids+ belonging to the given plan.
def load_actions(execution_plan_id, action_ids)
  condition = { execution_plan_uuid: execution_plan_id, id: action_ids }
  load_records(:action, condition)
end
load_actions_attributes(execution_plan_id, attributes)
click to toggle source
# Loads only the given +attributes+ of all actions of a plan.
def load_actions_attributes(execution_plan_id, attributes)
  load_records(:action, { execution_plan_uuid: execution_plan_id }, attributes)
end
load_delayed_plan(execution_plan_id)
click to toggle source
# Loads the delayed plan for the given execution plan, or nil when there
# is none (KeyError from #load is swallowed deliberately).
def load_delayed_plan(execution_plan_id)
  load(:delayed, execution_plan_uuid: execution_plan_id)
rescue KeyError
  nil
end
load_execution_plan(execution_plan_id)
click to toggle source
# Loads an execution plan by uuid, mapping the uuid column to :id.
def load_execution_plan(execution_plan_id)
  record = load(:execution_plan, uuid: execution_plan_id)
  execution_plan_column_map(record)
end
load_step(execution_plan_id, step_id)
click to toggle source
# Loads a single step by its execution plan uuid and step id.
def load_step(execution_plan_id, step_id)
  load(:step, execution_plan_uuid: execution_plan_id, id: step_id)
end
load_steps(execution_plan_id)
click to toggle source
# Loads all steps belonging to the given execution plan.
def load_steps(execution_plan_id)
  load_records(:step, execution_plan_uuid: execution_plan_id)
end
ordering_by()
click to toggle source
# Columns that execution plans may be ordered by (from META_DATA).
def ordering_by
  META_DATA.fetch(:execution_plan)
end
pull_envelopes(receiver_id)
click to toggle source
# Atomically fetches and removes all envelopes addressed to +receiver_id+.
# Returns the loaded envelopes; the rows are deleted in the same transaction.
def pull_envelopes(receiver_id)
  connector_feature!
  db.transaction do
    records = table(:envelope).where(receiver_id: receiver_id).all
    envelopes = records.map { |record| load_data(record) }
    table(:envelope).where(id: records.map { |record| record[:id] }).delete
    return envelopes
  end
end
push_envelope(envelope)
click to toggle source
# Inserts a new envelope row for the connector.
def push_envelope(envelope)
  connector_feature!
  record = prepare_record(:envelope, envelope)
  table(:envelope).insert(record)
end
save_action(execution_plan_id, action_id, value)
click to toggle source
# Upserts (or deletes, when +value+ is falsy) the action identified by
# plan uuid and action id; stores data in dedicated columns (with_data false).
def save_action(execution_plan_id, action_id, value)
  save(:action, { execution_plan_uuid: execution_plan_id, id: action_id }, value, false)
end
save_delayed_plan(execution_plan_id, value)
click to toggle source
# Upserts (or deletes, when +value+ is falsy) the delayed plan of the given
# execution plan; stores data in dedicated columns (with_data false).
def save_delayed_plan(execution_plan_id, value)
  save(:delayed, { execution_plan_uuid: execution_plan_id }, value, false)
end
save_envelope(data)
click to toggle source
# Persists a new envelope record.
def save_envelope(data)
  connector_feature!
  save(:envelope, {}, data)
end
save_execution_plan(execution_plan_id, value)
click to toggle source
# Upserts (or deletes, when +value+ is falsy) the execution plan with the
# given uuid; stores data in dedicated columns (with_data false).
def save_execution_plan(execution_plan_id, value)
  save(:execution_plan, { uuid: execution_plan_id }, value, false)
end
save_step(execution_plan_id, step_id, value)
click to toggle source
# Upserts (or deletes, when +value+ is falsy) the step identified by plan
# uuid and step id; stores data in dedicated columns (with_data false).
def save_step(execution_plan_id, step_id, value)
  save(:step, { execution_plan_uuid: execution_plan_id, id: step_id }, value, false)
end
to_hash()
click to toggle source
# Dumps the main tables into a Hash of arrays (raw rows, not load_data'd).
def to_hash
  {
    execution_plans: table(:execution_plan).all.to_a,
    steps: table(:step).all.to_a,
    actions: table(:action).all.to_a,
    envelopes: table(:envelope).all.to_a
  }
end
transaction(&block)
click to toggle source
# Runs the given block inside a database transaction (delegates to Sequel's
# Database#transaction).
def transaction(&block)
db.transaction(&block)
end
update_coordinator_record(class_name, record_id, value)
click to toggle source
# Updates the coordinator record identified by +class_name+ and +record_id+
# with +value+ (deletes it when +value+ is falsy, via #save).
def update_coordinator_record(class_name, record_id, value)
  coordinator_feature!
  save(:coordinator_record, { class: class_name, id: record_id }, value)
end
backup_to_csv(dataset, backup_dir, file_name)
click to toggle source
# Appends every row of +dataset+ to <backup_dir>/<file_name> as CSV,
# writing a header row only when the file does not exist yet.
# Returns +dataset+ so the call can be chained before deletion.
def backup_to_csv(dataset, backup_dir, file_name)
  ensure_backup_dir(backup_dir)
  path = File.join(backup_dir, file_name)
  write_header = !File.exist?(path)
  columns = dataset.columns
  File.open(path, 'a') do |io|
    io << columns.to_csv if write_header
    dataset.each do |row|
      io << columns.map { |col| row[col] }.to_csv
    end
  end
  dataset
end
delete(what, condition)
click to toggle source
# Deletes rows of table +what+ matching +condition+ (keys symbolized first).
# Returns the number of deleted rows.
def delete(what, condition)
  scoped = table(what).where(Utils.symbolize_keys(condition))
  scoped.delete
end
dump_data(value)
click to toggle source
# Serializes +value+ (must be a Hash or Array) to a JSON string;
# returns nil for nil input.
def dump_data(value)
  return nil if value.nil?
  MultiJson.dump(Type!(value, Hash, Array))
end
ensure_backup_dir(backup_dir)
click to toggle source
# Creates +backup_dir+ (including parents) if it does not exist yet.
def ensure_backup_dir(backup_dir)
  return if File.directory?(backup_dir)
  FileUtils.mkdir_p(backup_dir)
end
execution_plan_column_map(plan)
click to toggle source
# Mirrors the :uuid column into :id (in place) so execution plan records
# expose the id key callers expect. Returns the same hash.
def execution_plan_column_map(plan)
  uuid = plan[:uuid]
  plan[:id] = uuid unless uuid.nil?
  plan
end
filter(what, data_set, filters)
click to toggle source
# Applies +filters+ (nil or a Hash of column => value) to +data_set+ for
# table +what+, validating filter keys against META_DATA. Execution-plan
# filtering additionally accepts the virtual keys uuid,
# caller_execution_plan_id, caller_action_id and delayed; the caller keys
# require a join to the action table and 'delayed' is a presence filter
# implemented as a join to the delayed table.
# Raises ArgumentError on unknown columns, or when caller_action_id is
# given without caller_execution_plan_id.
#
# Fix: corrected typo in the unknown-columns error message
# ("unkown" -> "unknown").
def filter(what, data_set, filters)
  Type! filters, NilClass, Hash
  return data_set if filters.nil?
  # Normalize keys to strings so they can be validated against META_DATA.
  filters = filters.each.with_object({}) { |(k, v), hash| hash[k.to_s] = v }
  unknown = filters.keys - META_DATA.fetch(what)
  if what == :execution_plan
    unknown -= %w[uuid caller_execution_plan_id caller_action_id delayed]
    if filters.key?('caller_action_id') && !filters.key?('caller_execution_plan_id')
      raise ArgumentError, "caller_action_id given but caller_execution_plan_id missing"
    end
    if filters.key?('caller_execution_plan_id')
      # Caller columns live on the action table; join and de-duplicate plans.
      data_set = data_set.join_table(:inner, TABLES[:action], :execution_plan_uuid => :uuid).
        select_all(TABLES[:execution_plan]).distinct
    end
    if filters.key?('delayed')
      # 'delayed' is not a real column: drop it and filter by join presence.
      filters.delete('delayed')
      data_set = data_set.join_table(:inner, TABLES[:delayed], :execution_plan_uuid => :uuid).
        select_all(TABLES[:execution_plan]).distinct
    end
  end
  unless unknown.empty?
    raise ArgumentError, "unknown columns: #{unknown.inspect}"
  end
  data_set.where Utils.symbolize_keys(filters)
end
initialize_db(db_path)
click to toggle source
# Opens a Sequel connection for the given connection string/path.
def initialize_db(db_path)
  ::Sequel.connect(db_path)
end
load_data(record, what = nil)
click to toggle source
# Deserializes a database row into an indifferent-access hash.
# Two storage layouts are supported:
# - legacy: the whole payload is JSON in the :data column;
# - column-based (:data is nil): each column listed in
#   SERIALIZABLE_COLUMNS for +what+ holds its own JSON and is decoded
#   in place on +record+.
def load_data(record, what = nil)
hash = if record[:data].nil?
SERIALIZABLE_COLUMNS.fetch(what, []).each do |key|
key = key.to_sym
# Decode each serialized column in place, skipping NULLs.
record[key] = MultiJson.load(record[key]) unless record[key].nil?
end
record
else
# Legacy layout: the full record payload lives in :data.
MultiJson.load(record[:data])
end
Utils.indifferent_hash(hash)
end
load_record(what, condition)
click to toggle source
# Loads the first row of table +what+ matching +condition+ and
# deserializes it via load_data. Raises KeyError when no row matches.
def load_record(what, condition)
  record = with_retry { table(what).first(Utils.symbolize_keys(condition)) }
  raise KeyError, "searching: #{what} by: #{condition.inspect}" if record.nil?
  load_data(record, what)
end
load_records(what, condition, keys = nil)
click to toggle source
# Loads all rows of table +what+ matching +condition+, deserialized via
# load_data. When +keys+ is given, only those columns (plus :data) are
# selected and each result hash is trimmed down to exactly those keys.
def load_records(what, condition, keys = nil)
  data_set = table(what)
  rows = with_retry do
    scoped = data_set.filter(Utils.symbolize_keys(condition))
    scoped = scoped.select(:data, *(data_set.columns & keys)) unless keys.nil?
    scoped.all
  end
  loaded = rows.map { |row| load_data(row, what) }
  return loaded if keys.nil?
  loaded.map do |row|
    keys.each_with_object({}) { |key, acc| acc[key] = row[key] }
  end
end
migrate_db()
click to toggle source
# Applies pending Sequel migrations from the adapter's migrations path,
# tracking applied schema versions in the 'dynflow_schema_info' table.
def migrate_db
::Sequel::Migrator.run(db, self.class.migrations_path, table: 'dynflow_schema_info')
end
order(what, data_set, options)
click to toggle source
# Orders +data_set+ by <tt>options[:order_by]</tt> (descending when
# options[:desc] is truthy). Returns the data set unchanged when no
# ordering column is requested; raises ArgumentError for a column not
# listed in META_DATA for +what+.
def order(what, data_set, options)
  column = options[:order_by].to_s
  return data_set if column.empty?
  unless META_DATA.fetch(what).include?(column)
    raise ArgumentError, "unknown column #{column.inspect}"
  end
  column = column.to_sym
  data_set.order_by(options[:desc] ? ::Sequel.desc(column) : column)
end
paginate(data_set, options)
click to toggle source
# Applies offset/limit pagination from options[:page] / options[:per_page]
# (both coerced with Integer(), so bad values raise). Returns the data set
# unchanged when no page is requested; raises ArgumentError when :page is
# given without :per_page.
def paginate(data_set, options)
  page = options[:page] && Integer(options[:page])
  per_page = options[:per_page] && Integer(options[:per_page])
  return data_set unless page
  raise ArgumentError, "page specified without per_page attribute" unless per_page
  data_set.limit(per_page, per_page * page)
end
prepare_record(table_name, value, base = {}, with_data = true)
click to toggle source
# Builds the row hash to insert/update for +table_name+ from +value+,
# starting from +base+ (an existing row or the lookup condition).
# When +with_data+ is true and the table has a :data column, the whole
# value is dumped as JSON into :data; otherwise :data is cleared and the
# serializable columns are stored individually.
def prepare_record(table_name, value, base = {}, with_data = true)
record = base.dup
if with_data && table(table_name).columns.include?(:data)
record[:data] = dump_data(value)
else
# Column-based storage: no blob, individual serialized columns instead.
record[:data] = nil
record.merge! serialize_columns(table_name, value)
end
record.merge! extract_metadata(table_name, value)
# Symbols are not database-friendly; store their string form.
record.each { |k, v| record[k] = v.to_s if v.is_a? Symbol }
record
end
save(what, condition, value, with_data = true)
click to toggle source
# Upsert/delete primitive: with a truthy +value+, updates the row matching
# +condition+ if it exists, otherwise inserts a new one; with a falsy
# +value+, deletes the matching row (if any). Returns +value+.
# An empty +condition+ skips the existence lookup and always inserts.
def save(what, condition, value, with_data = true)
table = table(what)
existing_record = with_retry { table.first condition } unless condition.empty?
if value
# Base the new record on the existing row (update) or the condition (insert).
record = prepare_record(what, value, (existing_record || condition), with_data)
if existing_record
with_retry { table.where(condition).update(record) }
else
with_retry { table.insert record }
end
else
# Falsy value means removal; only touch the table if the row exists.
existing_record and with_retry { table.where(condition).delete }
end
value
end
serialize_columns(table_name, record)
click to toggle source
# Returns a hash of the record's serializable columns (per
# SERIALIZABLE_COLUMNS for +table_name+) with their values JSON-dumped
# and keys symbolized. Non-serializable keys are dropped.
def serialize_columns(table_name, record)
  serializable = SERIALIZABLE_COLUMNS.fetch(table_name, [])
  record.each_with_object({}) do |(key, value), acc|
    acc[key.to_sym] = dump_data(value) if serializable.include?(key.to_s)
  end
end
table(which)
click to toggle source
# Returns the Sequel dataset for the logical table name +which+.
# Raises KeyError when +which+ is not a key of TABLES.
def table(which)
db[TABLES.fetch(which)]
end
with_retry() { || ... }
click to toggle source
# Runs the block, retrying up to MAX_RETRIES times (sleeping RETRY_DELAY
# between attempts) on persistence errors. Unique-constraint violations
# are re-raised immediately so upsert logic can react to them. After the
# retries are exhausted the error is wrapped in Errors::PersistenceError.
def with_retry
attempts = 0
begin
yield
rescue ::Sequel::UniqueConstraintViolation => e
# Not transient: let callers handle duplicate-key conflicts directly.
raise e
# NOTE(review): rescuing Exception (not StandardError) also catches
# signals/exit — presumably intentional to survive any driver error,
# but worth confirming before changing.
rescue Exception => e
attempts += 1
log(:error, e)
if attempts > MAX_RETRIES
log(:error, "The number of MAX_RETRIES exceeded")
raise Errors::PersistenceError.delegate(e)
else
log(:error, "Persistence retry no. #{attempts}")
sleep RETRY_DELAY
retry
end
end
end