class Pipely::LivePipeline
Represents a pipeline that has been deployed to AWS Data Pipeline.
Attributes
pipeline_id[R]
Public Class Methods
new(pipeline_id)
click to toggle source
# File lib/pipely/live_pipeline.rb, line 9
# Stores the pipeline id, then loads the pipeline's definition JSON and its
# task states grouped by scheduled start time.
#
# @param pipeline_id [Object] identifier forwarded to the data pipeline client
#   calls (presumably the AWS pipeline id string -- confirm against callers)
# @raise [RuntimeError] when no definition is found, or when no runs exist
def initialize(pipeline_id)
  @pipeline_id = pipeline_id

  @definition_json = definition(pipeline_id)
  # Checked before querying instances so a missing definition fails fast.
  # The message interpolates the pipeline_id argument; the earlier
  # `client.pipeline_id` referenced a name this class does not define and
  # would have raised NameError instead of the intended message.
  raise "No definition found for #{pipeline_id}" unless @definition_json

  @task_states_by_scheduled_start = task_states_by_scheduled_start
  if @task_states_by_scheduled_start.empty?
    raise "No runs found for #{pipeline_id}"
  end
end
Public Instance Methods
print_runs_report()
click to toggle source
# File lib/pipely/live_pipeline.rb, line 24
# Builds a RunsReport from the task states loaded at construction time and
# prints it.
def print_runs_report
  report = RunsReport.new(@task_states_by_scheduled_start)
  report.print
end
render_graphs(output_path=nil)
click to toggle source
# File lib/pipely/live_pipeline.rb, line 34
# Renders one graph per scheduled start time.
#
# @param output_path [String, nil] directory handed to render_graph
# @return [Array] the render_graph result for each scheduled start
def render_graphs(output_path=nil)
  @task_states_by_scheduled_start.map { |scheduled_start, states|
    render_graph(scheduled_start, states, output_path)
  }
end
render_latest_graph(output_path=nil)
click to toggle source
# File lib/pipely/live_pipeline.rb, line 28
# Renders the graph for the greatest scheduled start key only.
#
# @param output_path [String, nil] directory handed to render_graph
# @return [Object] whatever render_graph returns
def render_latest_graph(output_path=nil)
  # Keys are unique, so the pair with the greatest key is the same entry that
  # keys.max followed by a lookup would select.
  newest_start, newest_states =
    @task_states_by_scheduled_start.max_by { |scheduled_start, _states| scheduled_start }

  render_graph(newest_start, newest_states, output_path)
end
Private Instance Methods
all_instances()
click to toggle source
# File lib/pipely/live_pipeline.rb, line 87
# Collects the described pipeline objects for every object id returned by
# querying the "INSTANCE" sphere, following the query's marker-based paging.
#
# @return [Array] the pipeline_objects gathered from each describe_objects
#   response, in the order they were returned
def all_instances
  pipeline_objects = []
  marker = nil

  loop do
    result = data_pipeline.query_objects(
      pipeline_id: pipeline_id,
      sphere: "INSTANCE",
      marker: marker,
    )
    marker = result.marker

    # Describe each id exactly once (the second, discarded describe_objects
    # call per page has been removed). Batching also skips the call entirely
    # when a page has no ids.
    # NOTE(review): 25 is the documented per-call id limit for
    # DescribeObjects -- confirm against the current API reference.
    Array(result.ids).each_slice(25) do |ids|
      details = data_pipeline.describe_objects(
        pipeline_id: pipeline_id,
        object_ids: ids
      )
      pipeline_objects.concat(details.pipeline_objects)
    end

    # Keep paging only while the service reports more results and supplied
    # a marker to continue from.
    break unless result.has_more_results && marker
  end

  pipeline_objects
end
data_pipeline()
click to toggle source
# File lib/pipely/live_pipeline.rb, line 42
# Returns the memoized Aws::DataPipeline::Client, creating it on first use.
def data_pipeline
  return @data_pipeline if @data_pipeline

  @data_pipeline = Aws::DataPipeline::Client.new
end
definition(pipeline_id)
click to toggle source
# File lib/pipely/live_pipeline.rb, line 56
# Fetches the pipeline definition and serializes its flattened objects.
#
# @param pipeline_id [Object] identifier passed to get_pipeline_definition
# @return [String] JSON document with a single "objects" key
def definition(pipeline_id)
  response = data_pipeline.get_pipeline_definition(pipeline_id: pipeline_id)
  flattened = flatten_pipeline_objects(response.pipeline_objects)

  { objects: flattened }.to_json
end
flatten_pipeline_objects(objects)
click to toggle source
# File lib/pipely/live_pipeline.rb, line 117
# Converts pipeline objects into plain hashes.
#
# Each hash starts with :id and :name, then gains one entry per field key:
# fields with a ref_value accumulate into an array of { ref: ... } hashes,
# while other fields store their string_value (a later field with the same
# key replaces an earlier string value).
#
# @param objects [Enumerable] items responding to id, name and fields
# @return [Array<Hash>] one hash per input object, in input order
def flatten_pipeline_objects(objects)
  objects.map do |pipeline_object|
    flattened = { id: pipeline_object.id, name: pipeline_object.name }

    pipeline_object.fields.each do |field|
      key = field.key
      reference = field.ref_value

      if reference
        (flattened[key] ||= []) << { ref: reference }
      else
        flattened[key] = field.string_value
      end
    end

    flattened
  end
end
render_graph(start, task_states, output_path)
click to toggle source
# File lib/pipely/live_pipeline.rb, line 46
# Draws the pipeline definition with the given task states into a PNG whose
# name combines the pipeline id, the reformatted start and the current epoch
# seconds.
#
# @param start [String] scheduled start; ':' and '-' are stripped and the
#   first 'T' becomes '-' when building the file name
# @param task_states [Hash] states forwarded to Pipely.draw
# @param output_path [String, nil] target directory; 'graphs' when nil
# @return [Object] whatever Pipely.draw returns
def render_graph(start, task_states, output_path)
  rendered_at = Time.now.to_i
  compact_start = start.gsub(/[:-]/, '').sub('T', '-')
  directory = output_path || 'graphs'

  filename = File.join(directory, "#{@pipeline_id}-#{compact_start}-#{rendered_at}.png")
  Pipely.draw(@definition_json, filename, task_states)
end
task_states_by_scheduled_start()
click to toggle source
# File lib/pipely/live_pipeline.rb, line 61
# Groups instance execution states by scheduled start time.
#
# For each instance, reads the '@componentParent' ref, the '@status' string
# and the '@scheduledStartTime' string from its fields (the last occurrence of
# each key wins) and records { execution_state: status } under
# [scheduled_start][component_id].
#
# @return [Hash] scheduled start => { component id => { execution_state: status } }
def task_states_by_scheduled_start
  all_instances.each_with_object({}) do |instance, states_by_start|
    component_id = status = scheduled_start = nil

    instance.fields.each do |field|
      case field.key
      when '@componentParent' then component_id = field.ref_value
      when '@status' then status = field.string_value
      when '@scheduledStartTime' then scheduled_start = field.string_value
      end
    end

    run_states = (states_by_start[scheduled_start] ||= {})
    run_states[component_id] = { execution_state: status }
  end
end