class Flapjack::Coordinator

Public Class Methods

new(config) click to toggle source

states: :starting, :running, :reloading, :stopped

# File lib/flapjack/coordinator.rb, line 23
def initialize(config)
  Thread.abort_on_exception = true

  ActiveSupport.use_standard_json_time_format = true
  ActiveSupport.time_precision = 0

  @exit_value = nil

  @config   = config
  @pikelets = []

  @received_signals = []

  @state = :starting
  @monitor = Monitor.new
  @monitor_cond = @monitor.new_cond

  # needs to be done per-thread
  cfg = @config.all
  Flapjack.configure_log('flapjack-coordinator', cfg.nil? ? {} : cfg['logger'])

  @reload = proc {
    @monitor.synchronize {
      @monitor_cond.wait_until { :running.eql?(@state) }
      @state = :reloading
      @monitor_cond.signal
    }
  }

  @shutdown = proc { |exit_val|
    @monitor.synchronize {
      @monitor_cond.wait_until { :running.eql?(@state) }
      @state = :stopping
      @exit_value = exit_val
      @monitor_cond.signal
    }
  }
end

Public Instance Methods

start(opts = {}) click to toggle source
# File lib/flapjack/coordinator.rb, line 62
def start(opts = {})
  # we can't block on the main thread, as signals interrupt that
  Thread.new do
    # needs to be done per-thread
    cfg = @config.all
    Flapjack.configure_log('flapjack-coordinator', cfg.nil? ? {} : cfg['logger'])

    @boot_time = Time.now

    Flapjack::RedisProxy.config = @config.for_redis

    pikelet_defs = pikelet_definitions(cfg)
    return if pikelet_defs.empty?

    create_pikelets(pikelet_defs).each do |pik|
      @pikelets << pik
    end

    @pikelets.each do |pik|
      pik.start
    end

    setup_signals if opts[:signals]

    # block this thread until 'stop' has been called, and
    # all pikelets have been stopped
    @monitor.synchronize {
      @state = :running
      @monitor_cond.wait_until { !(:running.eql?(@state)) }
      case @state
      when :reloading
        reload
        @state = :running
        @monitor_cond.signal
      when :stopping
        @pikelets.map(&:stop)
        @pikelets.clear
        @state = :stopped
        @monitor_cond.signal
      end
    }

  end.join

  @exit_value
end

Private Instance Methods

create_pikelets(pikelets_data = {}) click to toggle source

passed a hash with {PIKELET_TYPE => PIKELET_CFG, …} returns unstarted pikelet instances.

# File lib/flapjack/coordinator.rb, line 155
def create_pikelets(pikelets_data = {})
  pikelets_data.inject([]) do |memo, (type, cfg)|
    pikelets = Flapjack::Pikelet.create(type, @shutdown, :config => cfg,
                                        :boot_time => @boot_time)
    memo += pikelets
    memo
  end
end
pikelet_definitions(config_env) click to toggle source
# File lib/flapjack/coordinator.rb, line 122
def pikelet_definitions(config_env)
  config = {}
  return config unless config_env

  # backwards-compatible with config file for previous 'executive' pikelet
  exec_cfg = nil
  if config_env.has_key?('executive') && config_env['executive']['enabled']
    exec_cfg = config_env['executive']
  end
  ['processor', 'notifier'].each do |k|
    if exec_cfg
      if config_env.has_key?(k)
        # need to allow for new config fields to override old settings if both present
        merged = exec_cfg.merge(config_env[k])
        config.update(k => merged) if merged['enabled']
      else
        config.update(k => exec_cfg)
      end
    else
      next unless (config_env.has_key?(k) && config_env[k]['enabled'])
      config.update(k => config_env[k])
    end
  end

  return config unless config_env && config_env['gateways'] &&
    !config_env['gateways'].nil?
  config.merge( config_env['gateways'].select {|k, v|
    Flapjack::Pikelet.is_pikelet?(k) && v['enabled']
  } )
end
reload() click to toggle source

NB: global config options (e.g. daemonize, pidfile, logfile, redis options) won’t be checked on reload. should we do a full restart if some of these change?

# File lib/flapjack/coordinator.rb, line 167
def reload
  # TODO refactor cfg load and key retrieval, consolidate with initial load
  prev_pikelet_cfg = pikelet_definitions(@config.all)

  @config.reload

  current_pikelet_cfg = pikelet_definitions(@config.all)

  prev_keys    = prev_pikelet_cfg.keys
  current_keys = current_pikelet_cfg.keys

  removed     = prev_keys - current_keys
  added       = current_keys - prev_keys
  ask_running = current_keys - (added + removed)

  # for sections previously there and still there, ask them
  # to make the config change; they will if they can, or will signal
  # restart is needed if not
  # reload() returns trinary value here; true means the change was made, false
  # means the pikelet needs to be restarted, nil means no change
  # was required.
  ask_running.each do |ask_key|
    next unless pikelet = @pikelets.detect {|pik| ask_key == pik.type}

    if pikelet.reload(current_pikelet_cfg[pikelet.type]).is_a?(FalseClass)
      removed << pikelet.type
      added << pikelet.type
    end
  end

  pikelets_to_remove = @pikelets.select{|pik| removed.include?(pik.type) }
  pikelets_to_remove.map(&:stop)
  @pikelets -= pikelets_to_remove

  added_defs = current_pikelet_cfg.select {|k, v| added.include?(k) }

  create_pikelets(added_defs).each do |pik|
    @pikelets << pik
    pik.start
  end
end
setup_signals() click to toggle source

the global nature of this seems at odds with it calling stop within a single coordinator instance. Coordinator is essentially a singleton anyway…

# File lib/flapjack/coordinator.rb, line 114
def setup_signals
  Kernel.trap('INT')    { Thread.new { @shutdown.call(Signal.list['INT']) }.join }
  Kernel.trap('TERM')   { Thread.new { @shutdown.call(Signal.list['TERM']) }.join }
  unless RbConfig::CONFIG['host_os'] =~ /mswin|windows|cygwin/i
    Kernel.trap('HUP')  { Thread.new { @reload.call }.join }
  end
end