class DTK::State::WorkflowInstance
Constants
- WORKFLOW_INSTANCE_CRD_VERSION
Attributes
assembly[R]
attribute_type_info[R]
attributes[R]
name[R]
namespace[R]
status[R]
workflow[RW]
workflow_template[R]
Public Class Methods
find_action(id, workflow = @workflow)
click to toggle source
# File lib/state/workflow_instance.rb, line 134 def self.find_action(id, workflow = @workflow) action = nil subtasks = workflow[:subtasks] subtasks.each do |subtask| if(subtask.id.to_s == id.to_s) action = subtask break elsif subtask[:subtasks] action = find_action(id, subtask) break unless action.nil? end end action end
get(namespace, name, opts = {})
click to toggle source
# File lib/state/workflow_instance.rb, line 29 def self.get(namespace, name, opts = {}) opts[:apiVersion] = WORKFLOW_INSTANCE_CRD_VERSION workflow_instance = ::DTK::CrdClient.get_kubeclient(opts).get_workflow_instance(name, namespace) WorkflowInstance.new(namespace, name, workflow_instance) end
get_action_attributes(namespace, name, action_id, opts = {})
click to toggle source
# File lib/state/workflow_instance.rb, line 81 def self.get_action_attributes(namespace, name, action_id, opts = {}) workflow_instance = get(namespace, name, opts) action = WorkflowInstance.find_action(action_id, workflow_instance.workflow) return nil unless action attributes = action[:attributes] || {} attributes.to_h end
get_attributes(namespace, name, opts = {})
click to toggle source
# File lib/state/workflow_instance.rb, line 76 def self.get_attributes(namespace, name, opts = {}) workflow_instance = get(namespace, name, opts) workflow_instance.attributes.to_h end
get_with_influx_data(namespace, workflow_instance_name, opts = {})
click to toggle source
# File lib/state/workflow_instance.rb, line 35 def self.get_with_influx_data(namespace, workflow_instance_name, opts = {}) workflow_instance = get(namespace, workflow_instance_name, opts) return unless workflow_instance workflow_instance.workflow[:subtasks].each do |subtask| component_name, action_name = subtask[:component].split('.') assembly_name = workflow_instance.assembly[:name] executable_action = ::DTK::State::ExecutableAction.get(namespace, assembly_name, component_name, action_name, opts) attr_type_info = executable_action.attribute_type_info attr_type_info.each do |attr_info| if attr_info.temporal attribute_name = attr_info.name influxdb = ::DTK::State::Component::Attribute::Influxdb.new(:attributes) influxdb_attribute = influxdb.get(namespace, component_name, assembly_name, attribute_name, opts) if valid_attribute = influxdb_attribute.first value = valid_attribute['_value'] subtask[:attributes][attribute_name] = value end end end end workflow_instance end
new(namespace, name, crd_content)
click to toggle source
# File lib/state/workflow_instance.rb, line 12 def initialize(namespace, name, crd_content) @name = name @namespace = namespace @api_version = crd_content.apiVersion @kind = crd_content.kind @metadata = crd_content.metadata @references = crd_content.references @assembly = @references.assembly @workflow_template = @references.workflow @attributes = crd_content.spec.attributes || {} @status = crd_content.spec.status || {} @workflow = crd_content.spec.workflow || {} end
patchError!(patches, message, action_index_steps)
click to toggle source
# File lib/state/workflow_instance.rb, line 111 def self.patchError!(patches, message, action_index_steps) errorPatch = { "op" => "add", "path" => "/spec/status/steps/#{action_index_steps}/errorMsg", "value" => message } patches << errorPatch end
update_action_level_result_attributes(namespace, name, attributes, action_id, opts = {})
click to toggle source
# File lib/state/workflow_instance.rb, line 89 def self.update_action_level_result_attributes(namespace, name, attributes, action_id, opts = {}) return "Dynamic attributes do not exist for action with id #{@action_id}, nothing to update" if attributes.nil? || attributes.empty? attributes.delete_if { |key, value| value.nil? || value.to_s.strip == '' } opts[:apiVersion] = WORKFLOW_INSTANCE_CRD_VERSION workflow_instance = ::DTK::CrdClient.get_kubeclient(opts).get_workflow_instance(name, namespace) workflow = workflow_instance[:spec][:workflow] action = WorkflowInstance.find_action(action_id, workflow) action[:attributes] = {} if !action[:attributes] attributes.each do |attr_name, attr_val| action[:attributes][attr_name.to_sym] = {} unless action[:attributes][attr_name.to_sym] unless action[:attributes][attr_name.to_sym][:hidden] if attr_val.is_a? Hash action[:attributes][attr_name.to_sym][:value] = attr_val[:value] || attr_val else action[:attributes][attr_name.to_sym][:value] = attr_val end end end ::DTK::CrdClient.get_kubeclient(opts).update_workflow_instance(workflow_instance) end
update_action_status(namespace, name, parent_id, action_id, status, error_message = "", opts = {})
click to toggle source
# File lib/state/workflow_instance.rb, line 120 def self.update_action_status(namespace, name, parent_id, action_id, status, error_message = "", opts = {}) opts[:apiVersion] = WORKFLOW_INSTANCE_CRD_VERSION workflow_instance = ::DTK::CrdClient.get_kubeclient(opts).get_workflow_instance(name, namespace) steps = workflow_instance[:spec][:status][:steps] action_index_steps = steps.find_index { |action| action[:id].eql? action_id } patch = [{ "op" => "replace", "path" => "/spec/status/steps/#{action_index_steps}/state", "value" => status }] patchError!(patch, error_message, action_index_steps) unless error_message.empty? || error_message.nil? ::DTK::CrdClient.get_kubeclient(opts).json_patch_workflow_instance(name, patch, namespace) end
Public Instance Methods
attribute_metadata()
click to toggle source
# File lib/state/workflow_instance.rb, line 153 def attribute_metadata attributes = @attributes.to_hash attr_type_info = get_workflow_template.attribute_type_info attribute_metadata = {} attr_type_info.each do |attr_info| attr_info_hash = attr_info.to_hash attribute_name = attr_info_hash[:name].to_sym if attribute = attributes[attribute_name] if attribute.is_a?(String) attribute = { value: attribute } end attribute_metadata[attribute_name] = attr_info_hash.merge(attribute) end end attribute_metadata end
attribute_values()
click to toggle source
# File lib/state/workflow_instance.rb, line 174 def attribute_values attribute_with_values = {} @attributes.each_pair do |name, content| attribute_with_values.merge!(name => content[:value]) end attribute_with_values end
get_workflow_template(opts = {})
click to toggle source
# File lib/state/workflow_instance.rb, line 149 def get_workflow_template(opts = {}) Workflow.get(@workflow_template.namespace, @workflow_template.name, opts) end
to_hash()
click to toggle source
# File lib/state/workflow_instance.rb, line 62 def to_hash { apiVersion: @api_version, kind: @kind, metadata: filter_metadata(@metadata), references: @references.to_hash, spec: { attributes: @attributes.to_hash, status: @status.to_hash, workflow: @workflow.to_hash } } end