class Karafka::Cli::Server

Karafka CLI server action

Constants

CONTRACT

Server config settings contract

Public Instance Methods

call()

Start the Karafka server

# File lib/karafka/cli/server.rb, line 20
def call
  cli.info

  validate!

  if cli.options[:daemon]
    FileUtils.mkdir_p File.dirname(cli.options[:pid])
    daemonize
  end

  # We assign active consumer groups on the server level, as only the server is expected
  # to listen on a subset of the topics
  Karafka::Server.consumer_groups = cli.options[:consumer_groups]

  Karafka::Server.run
end

Private Instance Methods

clean()

Removes the pidfile (if it exists)

# File lib/karafka/cli/server.rb, line 66
def clean
  FileUtils.rm_f(cli.options[:pid]) if cli.options[:pid]
end
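
The cleanup above can be sketched outside of Karafka as follows. This is only an illustration: `remove_pidfile` is a hypothetical helper, and its `path` argument stands in for `cli.options[:pid]`. It shows why the call is safe to repeat, since `FileUtils.rm_f` ignores a file that is already gone.

```ruby
require 'fileutils'
require 'tmpdir'

# Hypothetical stand-alone version of `clean`; `path` plays the role of cli.options[:pid]
def remove_pidfile(path)
  # rm_f does not raise when the file is missing, so repeated calls are safe
  FileUtils.rm_f(path) if path
end

Dir.mktmpdir do |dir|
  pidfile = File.join(dir, 'karafka.pid')
  File.write(pidfile, Process.pid.to_s)

  remove_pidfile(pidfile) # deletes the file
  remove_pidfile(pidfile) # already gone, no error raised
  remove_pidfile(nil)     # no pid option given, nothing to do

  puts File.exist?(pidfile) # prints false
end
```
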

daemonize()

Detaches the current process into the background and writes its pidfile

# File lib/karafka/cli/server.rb, line 49
def daemonize
  ::Process.daemon(true)
  File.open(
    cli.options[:pid],
    'w'
  ) { |file| file.write(::Process.pid) }

  # Remove pidfile on stop, just before the server instance is going to be GCed
  # We want to delay the moment in which the pidfile is removed as much as we can,
  # so instead of removing it after the server stops running, we rely on the gc moment
  # when this object gets removed (it is a bit later), so it is closer to the actual
  # system process end. We do that, so monitoring and deployment tools that rely on a pid
  # won't alarm or start new system process up until the current one is finished
  ObjectSpace.define_finalizer(self, proc { send(:clean) })
end
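
The pidfile handling can be tried in isolation with a sketch like the one below. `write_pidfile` is a hypothetical helper, not part of Karafka; it combines the `mkdir_p` step from `call` with the write step from `daemonize`. The `::Process.daemon(true)` call is deliberately left out so the snippet stays in the foreground. In the real method the pid is written after daemonizing, because detaching forks the process and the pid changes.

```ruby
require 'fileutils'
require 'tmpdir'

# Hypothetical helper: creates the parent directory, then stores the current pid
def write_pidfile(path)
  FileUtils.mkdir_p(File.dirname(path))
  # The pid is stored as plain decimal text, which is what monitoring tools read back
  File.write(path, Process.pid.to_s)
  path
end

Dir.mktmpdir do |dir|
  # The nested directories do not exist yet; mkdir_p creates them
  pidfile = write_pidfile(File.join(dir, 'tmp', 'pids', 'karafka.pid'))
  puts File.read(pidfile) == Process.pid.to_s # prints true
end
```
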

validate!()

Validates the server CLI options against the app setup (topics, pid existence, etc.) and raises an error when they are invalid

# File lib/karafka/cli/server.rb, line 41
def validate!
  result = CONTRACT.call(cli.options)
  return if result.success?

  raise Errors::InvalidConfigurationError, result.errors.to_h
end
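
The validate-then-raise pattern can be sketched in plain Ruby as shown below. Everything here is a stand-in: `SketchResult`, `SKETCH_CONTRACT`, and the local `InvalidConfigurationError` are hypothetical names, and the single rule (the pidfile must not exist yet) is an assumption made for illustration, not the actual content of `CONTRACT`.

```ruby
require 'tempfile'

# Hypothetical stand-ins; the real CONTRACT and Errors classes live in Karafka
InvalidConfigurationError = Class.new(StandardError)

SketchResult = Struct.new(:errors) do
  def success?
    errors.empty?
  end
end

# Assumed rule for illustration only: the pidfile must not exist yet
SKETCH_CONTRACT = lambda do |options|
  errors = {}
  pid = options[:pid]
  errors[:pid] = ['already exists'] if pid && File.exist?(pid)
  SketchResult.new(errors)
end

def validate!(options)
  result = SKETCH_CONTRACT.call(options)
  return if result.success?

  # The collected errors become the exception message
  raise InvalidConfigurationError, result.errors.inspect
end

validate!({ pid: nil }) # no pidfile requested, passes and returns nil

Tempfile.create('karafka.pid') do |file|
  begin
    validate!({ pid: file.path }) # the file exists, so the assumed rule fails
  rescue InvalidConfigurationError => e
    puts "invalid options: #{e.message}"
  end
end
```

Raising with the full error hash, as the original method does, lets the operator see every failing option at once instead of fixing them one by one.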