class Bumbleworks::Worker

Attributes

hostname[R]
id[R]
ip[R]
launched_at[R]
pid[R]
system[R]

Public Class Methods

active_worker_states() click to toggle source
# File lib/bumbleworks/worker.rb, line 31
def active_worker_states
  info.inject({}) { |hsh, info|
    id, state = info.id, info.state
    unless info.in_stopped_state?
      hsh[id] = state
    end
    hsh
  }
end
change_worker_state(new_state, options = {}) click to toggle source
# File lib/bumbleworks/worker.rb, line 41
def change_worker_state(new_state, options = {})
  with_worker_state_enabled do
    Bumbleworks.dashboard.worker_state = new_state
    Bumbleworks::Support.wait_until(options) do
      active_worker_states.values.all? { |state| state == new_state }
    end
  end
  return true
rescue Bumbleworks::Support::WaitTimeout
  raise WorkerStateNotChanged, "Worker states: #{active_worker_states.inspect}"
end
control_document() click to toggle source
# File lib/bumbleworks/worker.rb, line 82
def control_document
  doc = Bumbleworks.dashboard.storage.get('variables', 'worker_control') || {}
  doc['type'] = 'variables'
  doc['_id'] = 'worker_control'
  doc['workers'] ||= {}
  doc
end
info() click to toggle source
# File lib/bumbleworks/worker.rb, line 10
def info
  Bumbleworks::Worker::Info || {}
end
info_document() click to toggle source
# File lib/bumbleworks/worker.rb, line 90
def info_document
  doc = Bumbleworks.dashboard.storage.get('variables', 'workers') || {}
  doc['type'] = 'variables'
  doc['_id'] = 'workers'
  doc['workers'] ||= {}
  doc
end
new(*args, &block) click to toggle source
Calls superclass method
# File lib/bumbleworks/worker.rb, line 99
def initialize(*args, &block)
  super
  @pid = Process.pid
  @id = SecureRandom.uuid
  @launched_at = Time.now

  @ip = Ruote.local_ip
  @hostname = Socket.gethostname
  @system = `uname -a`.strip rescue nil

  if @info
    @info = Info.new(self)
    save_info
  end
end
pause_all(options = {}) click to toggle source
# File lib/bumbleworks/worker.rb, line 23
def pause_all(options = {})
  change_worker_state('paused', options)
end
refresh_worker_info(options = {}) click to toggle source
# File lib/bumbleworks/worker.rb, line 53
def refresh_worker_info(options = {})
  with_worker_state_enabled do
    info.each do |worker_info|
      if !worker_info.in_stopped_state? && worker_info.stalling?
        worker_info.record_new_state("stalled")
      end
    end
  end
end
shutdown_all(options = {}) click to toggle source
# File lib/bumbleworks/worker.rb, line 14
def shutdown_all(options = {})
  # First, send all running workers a message to stop
  change_worker_state('stopped', options)
ensure
  # Now ensure that future started workers will be started
  # in "running" mode instead of automatically stopped
  change_worker_state('running', options)
end
toggle_worker_state_enabled(switch) click to toggle source
# File lib/bumbleworks/worker.rb, line 63
def toggle_worker_state_enabled(switch)
  unless [true, false].include?(switch)
    raise ArgumentError, "Must call with true or false"
  end
  Bumbleworks.dashboard.context['worker_state_enabled'] = switch
end
unpause_all(options = {}) click to toggle source
# File lib/bumbleworks/worker.rb, line 27
def unpause_all(options = {})
  change_worker_state('running', options)
end
with_worker_state_enabled() { || ... } click to toggle source
# File lib/bumbleworks/worker.rb, line 74
def with_worker_state_enabled
  was_already_enabled = worker_state_enabled?
  toggle_worker_state_enabled(true) unless was_already_enabled
  yield
ensure
  toggle_worker_state_enabled(false) unless was_already_enabled
end
worker_state_enabled?() click to toggle source
# File lib/bumbleworks/worker.rb, line 70
def worker_state_enabled?
  !!Bumbleworks.dashboard.context['worker_state_enabled']
end

Public Instance Methods

class_name() click to toggle source
# File lib/bumbleworks/worker.rb, line 115
def class_name
  self.class.to_s
end
desired_state() click to toggle source
# File lib/bumbleworks/worker.rb, line 132
def desired_state
  control_hash = worker_control_variable ||
    @storage.get("variables", "worker") ||
    { "state" => "running" }
  control_hash["state"]
end
determine_state() click to toggle source
# File lib/bumbleworks/worker.rb, line 139
def determine_state
  @state_mutex.synchronize do
    if @state != "stopped" && @context["worker_state_enabled"]
      @state = desired_state
      save_info
    end
  end
end
info() click to toggle source
# File lib/bumbleworks/worker.rb, line 148
def info
  self.class.info[id]
end
save_info() click to toggle source
# File lib/bumbleworks/worker.rb, line 119
def save_info
  @info.save if @info
end
shutdown() click to toggle source
Calls superclass method
# File lib/bumbleworks/worker.rb, line 123
def shutdown
  super
  save_info
end
worker_control_variable() click to toggle source
# File lib/bumbleworks/worker.rb, line 128
def worker_control_variable
  self.class.control_document["workers"][id]
end