module KubeQueue::Worker

Attributes

arguments[R]
job_id[RW]
payload[R]
resource[R]
scheduled_at[RW]

Public Class Methods

included(base) click to toggle source
# File lib/kube_queue/worker.rb, line 6
def self.included(base)
  base.extend(ClassMethods)

  KubeQueue.register_worker(base.name, base)
end
new(*arguments) click to toggle source
Calls superclass method
# File lib/kube_queue/worker.rb, line 104
def initialize(*arguments)
  # Compatibility for ActiveJob interface
  if method(__method__).super_method.arity.zero?
    super()
  else
    super
  end

  @arguments = arguments
  @job_id    = SecureRandom.uuid
  @loaded    = false
end

Public Instance Methods

job_spec() click to toggle source
# File lib/kube_queue/worker.rb, line 95
def job_spec
  self.class.job_spec
end
loaded?() click to toggle source
# File lib/kube_queue/worker.rb, line 136
def loaded?
  @loaded
end
manifest() click to toggle source
# File lib/kube_queue/worker.rb, line 147
def manifest
  if scheduled_at
    # Kubernetes CronJob does not support timezone
    cron = Time.at(scheduled_at).utc.strftime("%M %H %d %m %w")
    ManifestBuilder.new(self).build_cron_job(cron)
  else
    ManifestBuilder.new(self).build_job
  end
end
perform(*) click to toggle source
# File lib/kube_queue/worker.rb, line 124
def perform(*)
  raise NotImplementedError
end
perform_now() click to toggle source
Calls superclass method
# File lib/kube_queue/worker.rb, line 117
def perform_now
  # Compatibility for ActiveJob interface
  return super if defined?(super)

  perform(*arguments)
end
read_template() click to toggle source
# File lib/kube_queue/worker.rb, line 91
def read_template
  self.class.read_template
end
reload!() click to toggle source
# File lib/kube_queue/worker.rb, line 140
def reload!
  @loaded = false
  @resource = nil

  load_target
end
resource=(resource) click to toggle source
# File lib/kube_queue/worker.rb, line 165
def resource=(resource)
  @resource = resource
  self.job_id = resource.metadata.annotations['kube-queue-job-id']
end
serialized_payload() click to toggle source
# File lib/kube_queue/worker.rb, line 157
def serialized_payload
  if self.class.active_job?
    ActiveJob::Arguments.serialize(arguments)
  else
    arguments
  end
end
status() click to toggle source
# File lib/kube_queue/worker.rb, line 128
def status
  return @resource.status if loaded?

  load_target

  @resource.status
end

Private Instance Methods

load_target() click to toggle source
# File lib/kube_queue/worker.rb, line 172
def load_target
  self.resource = KubeQueue.client.get_job(job_spec.namespace, job_spec.job_name(job_id))
  @loaded = true
end