class Dapp::Kube::Kubernetes::Manager::Job
Public Class Methods
new(dapp, name)
click to toggle source
Calls superclass method
Dapp::Kube::Kubernetes::Manager::Base::new
# File lib/dapp/kube/kubernetes/manager/job.rb, line 5
# Sets up a manager for a single job.
#
# Both arguments are handed to the superclass constructor unchanged.
# The list of already-handled pod names starts out empty; watch_till_done!
# appends to it as pods are processed.
def initialize(dapp, name)
  super
  @processed_pods_names = []
end
Public Instance Methods
wait_till_exists!()
click to toggle source
# File lib/dapp/kube/kubernetes/manager/job.rb, line 11
# Blocks until dapp.kubernetes.job?(name) reports a truthy value.
#
# The check is made first; after every negative answer the method sleeps
# for 0.1 seconds and asks again. There is no timeout.
def wait_till_exists!
  sleep 0.1 until dapp.kubernetes.job?(name)
end
watch_till_done!()
click to toggle source
# File lib/dapp/kube/kubernetes/manager/job.rb, line 18
# Follows the job until it finishes, watching its pods one at a time.
#
# After waiting for the job to exist, each iteration picks the oldest pod of
# the job that has not been handled yet and delegates to
# Kubernetes::Manager::Pod#watch_till_done!. A pod that disappears while being
# watched is logged as a warning; either way its name is remembered so it is
# not picked again.
#
# When no unhandled pod is left the job is re-read: the loop ends if the job
# succeeded, ends with a warning if it failed, and otherwise polls again
# after 0.1 seconds.
def watch_till_done!
  wait_till_exists!

  job = Kubernetes::Client::Resource::Job.new(dapp.kubernetes.job(name))

  loop do
    # Fetch the next pod to process.
    process_pod = next_unprocessed_pod(job)

    if process_pod.nil?
      # Nothing left to watch right now: refresh the job to see whether it is done.
      job = Kubernetes::Client::Resource::Job.new(dapp.kubernetes.job(name))
      break if job.succeeded?

      if job.failed?
        dapp.log_warning "#{dapp.log_time}Job '#{name}' has been failed: #{job.spec['status']}", stream: dapp.service_stream
        break
      end

      sleep 0.1
      next
    end

    pod_manager = Kubernetes::Manager::Pod.new(dapp, process_pod.name)
    begin
      pod_manager.watch_till_done!
    rescue Kubernetes::Client::Error::Pod::NotFound
      dapp.log_warning "#{dapp.log_time}Pod '#{pod_manager.name}' has been deleted", stream: dapp.service_stream
    ensure
      # Mark the pod as handled even when watching it raised.
      @processed_pods_names << process_pod.name
    end
  end
end

# Returns the oldest (by creationTimestamp) pod whose 'controller-uid' label
# equals job.uid and whose name is not yet in @processed_pods_names, wrapped
# in Kubernetes::Client::Resource::Pod; nil when there is no such pod.
def next_unprocessed_pod(job)
  oldest_spec = dapp.kubernetes.pod_list.fetch('items', [])
    .select { |pod_spec| pod_spec.fetch('metadata', {}).fetch('labels', {})['controller-uid'] == job.uid }
    .reject { |pod_spec| @processed_pods_names.include?(pod_spec.fetch('metadata', {})['name']) }
    .min_by { |pod_spec| Time.parse(pod_spec.fetch('metadata', {})['creationTimestamp']) }

  # Only the selected spec is wrapped; the other candidates are left untouched.
  oldest_spec && Kubernetes::Client::Resource::Pod.new(oldest_spec)
end
private :next_unprocessed_pod