class Bumbleworks::Worker::Info

Attributes

worker[R]

Public Class Methods

[](worker_id) click to toggle source
# File lib/bumbleworks/worker/info.rb, line 41
def [](worker_id)
  from_hash(raw_hash[worker_id])
end
all() click to toggle source
# File lib/bumbleworks/worker/info.rb, line 23
def all
  to_a
end
each() { |from_hash(v)| ... } click to toggle source
# File lib/bumbleworks/worker/info.rb, line 17
def each
  raw_hash.each { |k, v|
    yield from_hash(v)
  }
end
filter() { |worker| ... } click to toggle source
# File lib/bumbleworks/worker/info.rb, line 36
def filter
  return [] unless block_given?
  select { |info| yield info.worker }
end
forget_worker(id_to_delete) click to toggle source
# File lib/bumbleworks/worker/info.rb, line 49
def forget_worker(id_to_delete)
  purge_worker_info do |id, info|
    id == id_to_delete
  end
end
from_hash(hash) click to toggle source
# File lib/bumbleworks/worker/info.rb, line 45
def from_hash(hash)
  new(Bumbleworks::Worker::Proxy.new(hash))
end
new(worker) click to toggle source
# File lib/bumbleworks/worker/info.rb, line 73
def initialize(worker)
  @worker = worker
  @last_save = Time.now - 2 * 60

  @msgs = [] unless worker.is_a?(Bumbleworks::Worker::Proxy)
end
purge_stale_worker_info() click to toggle source
# File lib/bumbleworks/worker/info.rb, line 55
def purge_stale_worker_info
  purge_worker_info do |id, info|
    info['state'].nil? || info['state'] == 'stopped'
  end
end
purge_worker_info(&block) click to toggle source
# File lib/bumbleworks/worker/info.rb, line 61
def purge_worker_info(&block)
  doc = Bumbleworks.dashboard.storage.get('variables', 'workers')
  return unless doc
  doc['workers'] = doc['workers'].reject { |id, info|
    block.call(id, info)
  }
  result = Bumbleworks.dashboard.storage.put(doc)
  purge_stale_worker_info if result
  all
end
raw_hash() click to toggle source
# File lib/bumbleworks/worker/info.rb, line 13
def raw_hash
  Bumbleworks.dashboard.worker_info || {}
end
where(options) click to toggle source
# File lib/bumbleworks/worker/info.rb, line 27
def where(options)
  filter_proc = proc { |worker|
    options.all? { |k, v|
      worker.send(k.to_sym) == v
    }
  }
  filter(&filter_proc)
end

Public Instance Methods

==(other) click to toggle source
# File lib/bumbleworks/worker/info.rb, line 80
def ==(other)
  other.is_a?(Bumbleworks::Worker::Info) &&
    other.worker == worker
end
constant_worker_info_hash() click to toggle source
# File lib/bumbleworks/worker/info.rb, line 195
def constant_worker_info_hash
  {
    "id" => @worker.id,
    "class" => @worker.class_name,
    "name" => @worker.name,
    "ip" => @worker.ip,
    "hostname" => @worker.hostname,
    "pid" => @worker.pid,
    "system" => @worker.system,
    "launched_at" => @worker.launched_at,
    "state" => @worker.state
  }
end
in_stopped_state?() click to toggle source
# File lib/bumbleworks/worker/info.rb, line 110
def in_stopped_state?
  worker.state.nil? || ["stopped", "stalled"].include?(worker.state)
end
pause(options = {}) click to toggle source
# File lib/bumbleworks/worker/info.rb, line 152
def pause(options = {})
  send_command("paused", options)
end
processed_last_hour() click to toggle source
# File lib/bumbleworks/worker/info.rb, line 187
def processed_last_hour
  raw_hash["processed_last_hour"]
end
processed_last_minute() click to toggle source
# File lib/bumbleworks/worker/info.rb, line 179
def processed_last_minute
  raw_hash["processed_last_minute"]
end
raw_hash() click to toggle source
# File lib/bumbleworks/worker/info.rb, line 85
def raw_hash
  self.class.raw_hash[worker.id]
end
record_new_state(state) click to toggle source
# File lib/bumbleworks/worker/info.rb, line 93
def record_new_state(state)
  worker.state = state
  save
end
reload() click to toggle source
# File lib/bumbleworks/worker/info.rb, line 89
def reload
  @worker = Bumbleworks::Worker::Proxy.new(raw_hash)
end
responding?(options = {}) click to toggle source
# File lib/bumbleworks/worker/info.rb, line 128
def responding?(options = {})
  options[:since] ||= Time.now - Bumbleworks.timeout
  Bumbleworks::Worker.with_worker_state_enabled do
    Bumbleworks::Support.wait_until(options) do
      updated_since?(options[:since])
    end
  end
  true
rescue Bumbleworks::Support::WaitTimeout
  false
end
run(options = {})
Alias for: unpause
save() click to toggle source
# File lib/bumbleworks/worker/info.rb, line 209
def save
  doc = Bumbleworks::Worker.info_document

  worker_info_hash = doc['workers'][@worker.id] || {}

  worker_info_hash.merge!(constant_worker_info_hash)
  worker_info_hash.merge!({
    'put_at' => Ruote.now_to_utc_s,
    'uptime' => uptime,
  })

  if defined?(@msgs)
    now = Time.now

    @msgs = @msgs.drop_while { |msg|
      Time.parse(msg['processed_at']) < now - 3600
    }
    mm = @msgs.drop_while { |msg|
      Time.parse(msg['processed_at']) < now - 60
    }

    hour_count = @msgs.size < 1 ? 1 : @msgs.size
    minute_count = mm.size < 1 ? 1 : mm.size

    worker_info_hash.merge!({
      'processed_last_minute' =>
        mm.size,
      'wait_time_last_minute' =>
        mm.inject(0.0) { |s, m| s + m['wait_time'] } / minute_count.to_f,
      'processed_last_hour' =>
        @msgs.size,
      'wait_time_last_hour' =>
        @msgs.inject(0.0) { |s, m| s + m['wait_time'] } / hour_count.to_f
    })
  end

  doc['workers'][@worker.id] = worker_info_hash

  r = storage.put(doc)

  @last_save = Time.now

  save if r != nil
end
save_control_message(message) click to toggle source
# File lib/bumbleworks/worker/info.rb, line 172
def save_control_message(message)
  doc = Bumbleworks::Worker.control_document
  doc["workers"][id] ||= {}
  doc["workers"][id]["state"] = message
  storage.put(doc)
end
send_command(command, options = {}) click to toggle source
# File lib/bumbleworks/worker/info.rb, line 162
def send_command(command, options = {})
  save_control_message(command)
  Bumbleworks::Worker.with_worker_state_enabled do
    Bumbleworks::Support.wait_until(options) do
      raw_hash["state"] == command
    end
  end
  reload
end
shutdown(options = {}) click to toggle source
# File lib/bumbleworks/worker/info.rb, line 148
def shutdown(options = {})
  send_command("stopped", options)
end
stalling?() click to toggle source
# File lib/bumbleworks/worker/info.rb, line 140
def stalling?
  !responding?
end
storage() click to toggle source
# File lib/bumbleworks/worker/info.rb, line 144
def storage
  @worker.storage || Bumbleworks.dashboard.storage
end
unpause(options = {}) click to toggle source
# File lib/bumbleworks/worker/info.rb, line 156
def unpause(options = {})
  send_command("running", options)
end
Also aliased as: run
updated_at() click to toggle source
# File lib/bumbleworks/worker/info.rb, line 114
def updated_at
  Time.parse(raw_hash['put_at'])
end
updated_recently?(options = {}) click to toggle source
# File lib/bumbleworks/worker/info.rb, line 122
def updated_recently?(options = {})
  options[:seconds_ago] ||= Bumbleworks.timeout
  latest_time = Time.now - options[:seconds_ago]
  updated_since?(latest_time)
end
updated_since?(latest_time) click to toggle source
# File lib/bumbleworks/worker/info.rb, line 118
def updated_since?(latest_time)
  updated_at >= latest_time
end
uptime() click to toggle source
# File lib/bumbleworks/worker/info.rb, line 102
def uptime
  if in_stopped_state? && worker.respond_to?(:uptime)
    worker.uptime
  else
    Time.now - worker.launched_at
  end
end
wait_time_last_hour() click to toggle source
# File lib/bumbleworks/worker/info.rb, line 191
def wait_time_last_hour
  raw_hash["wait_time_last_hour"]
end
wait_time_last_minute() click to toggle source
# File lib/bumbleworks/worker/info.rb, line 183
def wait_time_last_minute
  raw_hash["wait_time_last_minute"]
end
worker_class_name() click to toggle source
# File lib/bumbleworks/worker/info.rb, line 98
def worker_class_name
  worker.class_name
end