class AwDatapipe::Pipeline

Attributes

id[RW]
objects[R]
parameter_metadata[R]
parameter_values[R]
uuid[RW]

Public Class Methods

build(config, activities, parameter_metadata, parameter_values) click to toggle source
# File lib/aw_datapipe/pipeline.rb, line 18
# Builds a pipeline seeded with a single configuration object, then appends
# each activity (preceded by its dependencies) from inside the constructor block.
#
# @param config the configuration object; it becomes the first pipeline object
# @param activities [Enumerable] activity objects appended with their dependencies
# @param parameter_metadata stored verbatim on the new pipeline
# @param parameter_values stored verbatim on the new pipeline
# @return [Pipeline] the newly constructed pipeline
def self.build(config, activities, parameter_metadata, parameter_values)
  seed_objects = [config]
  new(seed_objects, parameter_metadata, parameter_values) { |built| built.append_objects_with_dependencies(activities) }
end
new(objects, parameter_metadata, parameter_values) { |pipeline| ... } (yields the newly initialized pipeline when a block is given) click to toggle source

@param objects [Array] initial pipeline objects; each is appended together with its dependencies

# File lib/aw_datapipe/pipeline.rb, line 11
# Creates a pipeline from an initial list of objects.
#
# Objects are appended (dependencies first) before the parameter fields are
# assigned. That ordering is kept as-is. If a block is given, it receives the
# fully initialized pipeline.
#
# @param objects [Array] pipeline objects to append, together with their dependencies
# @param parameter_metadata stored as-is
# @param parameter_values stored as-is
# @yieldparam pipeline [Pipeline] this instance, after initialization
def initialize(objects, parameter_metadata, parameter_values)
  @objects = ObjectHash.new
  append_objects_with_dependencies(objects)
  @parameter_metadata = parameter_metadata
  @parameter_values = parameter_values
  return unless block_given?

  yield self
end

Public Instance Methods

append_object(object) click to toggle source

@return [PipelineObject] appended object

# File lib/aw_datapipe/pipeline.rb, line 25
# Attaches a single object to this pipeline and indexes it by its id.
# An object already stored under the same id is replaced.
#
# @param object an object responding to `id` and `pipeline=`
# @return [PipelineObject] the object that was appended
def append_object(object)
  object.tap do |added|
    added.pipeline = self
    objects[added.id] = added
  end
end
append_object_with_dependencies(object) click to toggle source
# File lib/aw_datapipe/pipeline.rb, line 30
# Appends an object's dependencies, in the order returned by `dependencies`,
# followed by the object itself.
#
# @param object an object responding to `dependencies`
# @return [Array] the objects appended, dependencies first
def append_object_with_dependencies(object)
  ordered = [*object.dependencies]
  ordered << object
  ordered.each { |item| append_object(item) }
end
append_objects_with_dependencies(objects) click to toggle source

@return [Pipeline] self

# File lib/aw_datapipe/pipeline.rb, line 35
# Appends every object in the given collection, each preceded by its dependencies.
#
# @param objects [Enumerable] objects to append (shadows the `objects` reader)
# @return [Pipeline] self
def append_objects_with_dependencies(objects)
  tap do
    objects.each { |entry| append_object_with_dependencies(entry) }
  end
end
configuration() click to toggle source
# File lib/aw_datapipe/pipeline.rb, line 40
# Returns the object stored under the `:default` id.
#
# @return the `:default` pipeline object
# @raise [KeyError] if the underlying `objects` collection's `fetch` raises it for a missing `:default`
def configuration
  default_id = :default
  objects.fetch(default_id)
end
csv_data_format(params) click to toggle source
# File lib/aw_datapipe/pipeline.rb, line 44
# Builds a CsvDataFormat from the given params and appends it to the pipeline.
#
# @param params [Hash] passed unchanged to `CsvDataFormat.build`
# @return the appended object
def csv_data_format(params)
  data_format = CsvDataFormat.build(params)
  append_object(data_format)
end
ec2_resource(params) click to toggle source

@param [Hash] params Various required and optional parameters.
@option params [String] :name (default: 'Ec2Instance')
@option params [String] :instance_type (default: 't1.micro')
@option params [String] :subnet_id
@option params [String] :security_group_ids
@option params [String] :action_on_task_failure (default: 'terminate')
@option params [String] :terminate_after (default: '2 Hours')

@return [Ec2Resource]

# File lib/aw_datapipe/pipeline.rb, line 57
# Builds an Ec2Resource and appends it to the pipeline.
#
# Caller-supplied params take precedence over the defaults below.
#
# @param params [Hash] resource parameters
# @option params [String] :name (default: 'Ec2Instance')
# @option params [String] :instance_type (default: 't1.micro')
# @option params [String] :subnet_id
# @option params [String] :security_group_ids
# @option params [String] :action_on_task_failure (default: 'terminate')
# @option params [String] :terminate_after (default: '2 Hours')
# @return [Ec2Resource] the appended resource
def ec2_resource(params)
  defaults = {
    name: 'Ec2Instance',
    instance_type: 't1.micro',
    action_on_task_failure: 'terminate',
    terminate_after: '2 Hours'
  }
  resource = Ec2Resource.build(defaults.merge(params))
  append_object(resource)
end
jdbc_database(params) click to toggle source
# File lib/aw_datapipe/pipeline.rb, line 62
# Builds a JdbcDatabase from the given params and appends it to the pipeline.
#
# @param params [Hash] passed unchanged to `JdbcDatabase.build`
# @return the appended object
def jdbc_database(params)
  database = JdbcDatabase.build(params)
  append_object(database)
end
referenced_object_ids() click to toggle source
# File lib/aw_datapipe/pipeline.rb, line 70
# Lists the ids of all referenced objects, with `:default` always appended last.
#
# @return [Array] ids of referenced objects followed by `:default`
def referenced_object_ids
  ids = referenced_objects.map { |obj| obj.id }
  ids.push(:default)
end
referenced_objects() click to toggle source

Collect the dependencies of all objects, removing duplicates.

@return [Array] referenced objects, with dependees before dependents.

# File lib/aw_datapipe/pipeline.rb, line 76
# Collects the dependencies of every object in the pipeline into a single
# flat list, with duplicates removed (first occurrence wins).
#
# @return [Array] referenced objects, in the order they are first encountered
def referenced_objects
  dependency_lists = objects.values.map { |obj| obj.dependencies }
  dependency_lists.flatten.uniq
end
s3_data_node(params) click to toggle source
# File lib/aw_datapipe/pipeline.rb, line 66
# Builds an S3DataNode from the given params and appends it to the pipeline.
#
# @param params [Hash] passed unchanged to `S3DataNode.build`
# @return the appended object
def s3_data_node(params)
  node = S3DataNode.build(params)
  append_object(node)
end
write_source(pathname) click to toggle source
# File lib/aw_datapipe/pipeline.rb, line 80
# Delegates to SourceWriter, passing this pipeline and the target path.
#
# @param pathname destination passed through to `SourceWriter.call`
# @return whatever `SourceWriter.call` returns
def write_source(pathname)
  SourceWriter.(self, pathname)
end