class Pipa

Public Class Methods

new(stages) click to toggle source
# File lib/pipa.rb, line 6
def initialize(stages)
        @stages = stages
        @to_be_executed = Set.new( stages.map{|k,v| k} )
        @executed = Set.new
        @log = {}
        @ret = {}

        @stages.each do |name, attributes|
                attributes["dependencies"] ||= []
                attributes["dependencies_set"] = Set.new(attributes["dependencies"])
        end

        @threads = []

        @success = true
end

Public Instance Methods

execute() click to toggle source
# File lib/pipa.rb, line 23
def execute
        @to_be_executed.delete_if do |stage|
                if resolved_dependencies? stage
                        execute_stage(stage)
                        true
                else
                        false
                end
        end
end
wait() click to toggle source
# File lib/pipa.rb, line 34
def wait
        @threads.map(&:join)
        unless @to_be_executed.empty?
                log_error "Some stages couldn't be executed: #{@to_be_executed.to_a}"
        end
        @success
end

Private Instance Methods

execute_stage(stage) click to toggle source
# File lib/pipa.rb, line 62
def execute_stage(stage)
        log_info "Executing #{stage}."

        @threads << Thread.new do
                t = Time.now

                reader, writer = IO.pipe

                mode = ["bash","ruby","node","http"].find {|m| !@stages[stage][m].nil?}
                input = @stages[stage]["dependencies"].map{|d| "___deserialize(#{@ret[d].dump})"}.join(',')

                cmd = case mode
                        when "bash"
                                ["ruby", "-e", %Q(
                                        require 'json'
                                        require 'open3'

                                        Open3.popen2e("bash", "-e", "-c", #{@stages[stage][mode].dump}) do |stdin, stdout_err, wait_thr|
                                                ret = ""
                                                while line = stdout_err.readpartial(4096) rescue nil
                                                        print line
                                                        $stdout.flush
                                                        ret += line
                                                end

                                                exit_status = wait_thr.value
                                                if exit_status.success?
                                                        ret_fd = IO.open(3, 'w')
                                                        ret_fd.write(ret.force_encoding('UTF-8').to_json);
                                                        ret_fd.close
                                                end
                                                exit(exit_status.exitstatus)
                                        end
                                )]
                        when "ruby"
                                ["ruby", "-e", %Q(
                                        require 'json'
                                        def ___deserialize(val)
                                                JSON.parse("[\#{val}]")[0]
                                        end
                                        #{@stages[stage][mode]};
                                        ___n_args = method(:main).arity
                                        ___input = [#{input}]
                                        ___input = ___input.take(___n_args) if ___n_args >= 0
                                        ___ret = main(*___input)
                                        ___ret_fd = IO.open(3, 'w')
                                        ___ret_fd.write(___ret.to_json);
                                        ___ret_fd.close
                                )]
                        when "node"
                                ["node", "-e", %Q(
                                        ___deserialize = JSON.parse
                                        #{@stages[stage][mode]};
                                        const ___ret = main(#{input});
                                        fs.writeSync(3, JSON.stringify(___ret));
                                )]
                        when "http"
                                ["ruby", "-e", %Q(
                                        require 'httpclient'
                                        require 'json'
                                        puts ___ret = HTTPClient.get_content("#{@stages[stage][mode]}")
                                        ___ret_fd = IO.open(3, 'w')
                                        ___ret_fd.write(___ret.force_encoding('UTF-8').to_json);
                                        ___ret_fd.close
                                )]
                end

                @log[stage] = ""
                @ret[stage] = ""

                Open3.popen2e(*cmd, 3 => writer.fileno) do |stdin, stdout_err, wait_thr|
                        ret_thread = Thread.new do
                                while line = reader.readpartial(4096) rescue nil
                                        @ret[stage] << line
                                end
                                reader.close
                        end

                        while line = stdout_err.readpartial(4096) rescue nil
                                print line
                                @log[stage] << line
                        end

                        exit_status = wait_thr.value

                        writer.close
                        ret_thread.join

                        exit_status = wait_thr.value
                        if exit_status.success?
                                @executed.add(stage)
                                log_info "Stage '#{stage}' took #{Time.now - t}s."
                        else
                                log_error "Stage '#{stage}' failed with status #{exit_status.exitstatus} after #{Time.now - t}s. Error msg:\n#{@log[stage]}"
                                @success = false
                        end

                        execute
                end
        end
end
log_error(msg) click to toggle source
# File lib/pipa.rb, line 51
def log_error(msg)
        puts
        puts "-------- [#{Time.now.utc}] --------".red
        puts msg.red
        puts "-------------------------------------------".red
end
log_info(msg) click to toggle source
# File lib/pipa.rb, line 44
def log_info(msg)
        puts
        puts "-------- [#{Time.now.utc}] --------".green
        puts msg.green
        puts "-------------------------------------------".green
end
resolved_dependencies?(stage) click to toggle source
# File lib/pipa.rb, line 58
def resolved_dependencies?(stage)
        @stages[stage]["dependencies_set"].subset?(@executed)
end