class AwDatapipe::Pipeline
Attributes
id[RW]
objects[R]
parameter_metadata[R]
parameter_values[R]
uuid[RW]
Public Class Methods
build(config, activities, parameter_metadata, parameter_values)
click to toggle source
# File lib/aw_datapipe/pipeline.rb, line 18
# Builds a pipeline seeded with +config+, then appends every activity
# (each preceded by its dependencies) from inside the constructor block.
#
# @param config [PipelineObject] configuration object; presumably the one
#   registered under id :default (see #configuration) — confirm with callers
# @param activities [#each] activities to append along with their dependencies
# @param parameter_metadata stored unchanged on the new pipeline
# @param parameter_values stored unchanged on the new pipeline
# @return [Pipeline] the newly constructed pipeline
def self.build(config, activities, parameter_metadata, parameter_values)
  seed_objects = [config]
  new(seed_objects, parameter_metadata, parameter_values) do |built|
    built.append_objects_with_dependencies(activities)
  end
end
new(objects, parameter_metadata, parameter_values) { |pipeline| ... }
click to toggle source
@param objects [Array] initial pipeline objects; each is appended together with its dependencies.
# File lib/aw_datapipe/pipeline.rb, line 11
# Creates a pipeline from an initial list of objects.
#
# Each object is registered together with its dependencies (dependencies
# first). If a block is given, it receives the pipeline after construction,
# so callers can append further objects before #new returns.
#
# @param objects [Array] initial pipeline objects
# @param parameter_metadata stored as-is; exposed via #parameter_metadata
# @param parameter_values stored as-is; exposed via #parameter_values
# @yieldparam pipeline [Pipeline] this pipeline
def initialize(objects, parameter_metadata, parameter_values)
  @objects = ObjectHash.new
  # Objects are registered before the parameter ivars are assigned, so those
  # ivars are still unset (nil) while #append_object runs.
  append_objects_with_dependencies(objects)
  @parameter_metadata = parameter_metadata
  @parameter_values = parameter_values
  yield self if block_given?
end
Public Instance Methods
append_object(object)
click to toggle source
@return [PipelineObject] appended object
# File lib/aw_datapipe/pipeline.rb, line 25
# Registers a single object with this pipeline, keyed by its id.
#
# The object's back-reference is pointed at this pipeline before it is
# stored. Storing under an id that is already present presumably replaces
# the earlier entry (depends on ObjectHash#[]=) — confirm.
#
# @param object [PipelineObject] object to register
# @return [PipelineObject] the same object
def append_object(object)
  object.pipeline = self
  key = object.id
  objects[key] = object
  object
end
append_object_with_dependencies(object)
click to toggle source
# File lib/aw_datapipe/pipeline.rb, line 30
# Registers +object+ after first registering each of its dependencies, so
# the dependencies are inserted ahead of the object that needs them.
#
# @param object [PipelineObject] object whose dependencies (if any) go first
# @return [Array] the dependencies followed by +object+
def append_object_with_dependencies(object)
  ordered = [*object.dependencies, object]
  ordered.each { |member| append_object(member) }
end
append_objects_with_dependencies(objects)
click to toggle source
@return [Pipeline] self
# File lib/aw_datapipe/pipeline.rb, line 35
# Registers every object in +objects+, each together with its dependencies.
#
# @param objects [#each] objects to append (this local shadows the #objects reader)
# @return [Pipeline] self, so calls can be chained
def append_objects_with_dependencies(objects)
  tap do
    objects.each { |item| append_object_with_dependencies(item) }
  end
end
configuration()
click to toggle source
# File lib/aw_datapipe/pipeline.rb, line 40
# Returns the object registered under the id :default.
#
# @return [PipelineObject] the :default object
# @raise presumably KeyError when nothing is registered as :default
#   (Hash#fetch semantics) — confirm for ObjectHash
def configuration
  default_id = :default
  objects.fetch(default_id)
end
csv_data_format(params)
click to toggle source
# File lib/aw_datapipe/pipeline.rb, line 44
# Builds a CsvDataFormat from +params+ and registers it with this pipeline.
#
# @param params [Hash] attributes forwarded unchanged to CsvDataFormat.build
# @return the built object, as returned by #append_object
def csv_data_format(params)
  data_format = CsvDataFormat.build(params)
  append_object(data_format)
end
ec2_resource(params)
click to toggle source
@param [Hash] params Various required and optional parameters.
@option params [String] :name (default: 'Ec2Instance')
@option params [String] :instance_type (default: 't1.micro')
@option params [String] :subnet_id
@option params [String] :security_group_ids
@option params [String] :action_on_task_failure (default: 'terminate')
@option params [String] :terminate_after (default: '2 Hours')
@return [Ec2Resource]
# File lib/aw_datapipe/pipeline.rb, line 57
# Builds an Ec2Resource and registers it with this pipeline.
#
# Keys given in +params+ override the defaults below; any other keys are
# passed through to Ec2Resource.build as-is.
#
# @param params [Hash] Various required and optional parameters.
# @option params [String] :name (default: 'Ec2Instance')
# @option params [String] :instance_type (default: 't1.micro')
# @option params [String] :subnet_id
# @option params [String] :security_group_ids
# @option params [String] :action_on_task_failure (default: 'terminate')
# @option params [String] :terminate_after (default: '2 Hours')
# @return [Ec2Resource] the built and registered resource
def ec2_resource(params)
  defaults = {
    name: 'Ec2Instance',
    instance_type: 't1.micro',
    action_on_task_failure: 'terminate',
    terminate_after: '2 Hours'
  }
  resource = Ec2Resource.build(defaults.merge(params))
  append_object(resource)
end
jdbc_database(params)
click to toggle source
# File lib/aw_datapipe/pipeline.rb, line 62
# Builds a JdbcDatabase from +params+ and registers it with this pipeline.
#
# @param params [Hash] attributes forwarded unchanged to JdbcDatabase.build
# @return the built object, as returned by #append_object
def jdbc_database(params)
  database = JdbcDatabase.build(params)
  append_object(database)
end
referenced_object_ids()
click to toggle source
# File lib/aw_datapipe/pipeline.rb, line 70
# Ids of every referenced object, followed by :default (the id #configuration
# looks up).
#
# NOTE(review): :default is appended unconditionally, so it can appear twice
# if the :default object is itself listed as some object's dependency.
#
# @return [Array] ids from #referenced_objects, then :default
def referenced_object_ids
  ids = referenced_objects.map(&:id)
  ids.push(:default)
end
referenced_objects()
click to toggle source
Collect dependencies for all objects, removing duplicates. @return [Array] referenced objects, with dependees before dependents.
# File lib/aw_datapipe/pipeline.rb, line 76
# Collects the dependencies of every object in the pipeline, without duplicates.
#
# Results follow the order of #objects' values and, within each object, the
# order of its #dependencies; the first occurrence of a duplicate is kept.
# Whether this places dependees before dependents depends on how each object
# orders its own #dependencies — confirm against PipelineObject.
#
# @return [Array] the referenced (depended-upon) objects
def referenced_objects
  # flat_map concatenates the per-object dependency lists in one pass instead
  # of building an array of arrays and recursively flattening it afterwards.
  objects.values.flat_map(&:dependencies).uniq
end
s3_data_node(params)
click to toggle source
# File lib/aw_datapipe/pipeline.rb, line 66
# Builds an S3DataNode from +params+ and registers it with this pipeline.
#
# @param params [Hash] attributes forwarded unchanged to S3DataNode.build
# @return the built object, as returned by #append_object
def s3_data_node(params)
  data_node = S3DataNode.build(params)
  append_object(data_node)
end
write_source(pathname)
click to toggle source
# File lib/aw_datapipe/pipeline.rb, line 80
# Delegates to SourceWriter.call with this pipeline and +pathname+
# (presumably writes source code describing the pipeline — confirm).
#
# @param pathname destination passed through to SourceWriter.call
# @return whatever SourceWriter.call returns
def write_source(pathname)
  writer = SourceWriter
  writer.call(self, pathname)
end