class Freno::Throttler

Freno::Throttler is the class responsible for throttling writes to a cluster or a set of clusters. Throttling means slowing down the pace at which write operations occur by checking with freno whether all the clusters affected by an operation are in good health before allowing it to proceed. If any of the clusters is not in good health, the throttler waits for some time and repeats the check.

Examples:

Let's use the following throttler, which uses Mapper::Identity implicitly (see the initialize docs).

```
throttler = Throttler.new(client: freno_client, app: :my_app)
data.find_in_batches do |batch|
  throttler.throttle([:mysqla, :mysqlb]) do
    update(batch)
  end
end
```

Before each call to `update(batch)`, the throttler will call freno to check the health of the `mysqla` and `mysqlb` stores on behalf of `:my_app`, and sleep if any of the stores is not ok.

Constants

DEFAULT_MAX_WAIT_SECONDS
DEFAULT_WAIT_SECONDS

Attributes

app[RW]
circuit_breaker[RW]
client[RW]
instrumenter[RW]
mapper[RW]
max_wait_seconds[RW]
wait_seconds[RW]

Public Class Methods

new(client: nil, app: nil, mapper: Mapper::Identity, instrumenter: Instrumenter::Noop, circuit_breaker: CircuitBreaker::Noop, wait_seconds: DEFAULT_WAIT_SECONDS, max_wait_seconds: DEFAULT_MAX_WAIT_SECONDS) { |self| ... }

Initializes a new instance of the throttler

In order to initialize a Throttler you need the following arguments:

- a `client`: an instance of Freno::Client

- an `app`: a symbol indicating the app name for which Freno will respond
  to checks.

Also, you can optionally provide the following named arguments:

- `:mapper`: An object that responds to `call(context)` and returns an
   `Enumerable` of the store names for which we need to wait for
   replication delay. By default this is `Mapper::Identity`, which will
   check the stores given as context.

   For example, if the `throttler` object used the default mapper, the
   following call would check the `mysqlc` store:

    ```
    throttler.throttle(:mysqlc) do
       update(batch)
    end
    ```

- `:instrumenter`: An object that responds to
   `instrument(event_name, context = {}, &block)` that can be used to
   add cross-cutting concerns like logging or stats to the throttler.

   By default, the instrumenter is `Instrumenter::Noop`, which does
   nothing but yield to the block it receives.

- `:circuit_breaker`: An object responding to `allow_request?`,
   `success`, and `failure`, compatible with `Resilient::CircuitBreaker`
   (see https://github.com/jnunemaker/resilient).

   By default, the circuit breaker is `CircuitBreaker::Noop`, which
   always allows requests, and does not provide resiliency guarantees.

- `:wait_seconds`: A positive float indicating the number of seconds the
   throttler will wait before checking again, in case some of the stores
   hadn't caught up the last time they were checked.

- `:max_wait_seconds`: A positive float indicating the maximum number of
   seconds the throttler will wait in total for replicas to catch up
   before raising a `WaitedTooLong` error.
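
As a rough sketch of the duck types described in the list above, the following defines a custom mapper and a custom circuit breaker without depending on the freno gem. `SHARD_STORES`, `ShardMapper`, and `CountingBreaker` are hypothetical names invented for illustration; only the method names `call`, `allow_request?`, `success`, and `failure` come from this documentation.

```ruby
# Hypothetical lookup table: application-level contexts (shard names)
# mapped to the store names that should be checked for each of them.
SHARD_STORES = {
  users:  [:mysqla],
  orders: [:mysqla, :mysqlb]
}.freeze

# Hypothetical mapper: responds to `call(context)` and returns an
# Enumerable of store names. Unknown contexts fall back to being
# treated as store names themselves.
ShardMapper = lambda do |context|
  SHARD_STORES.fetch(context) { Array(context) }
end

# Hypothetical circuit breaker: stops allowing requests after
# `threshold` consecutive failures; a call to `success` resets the
# count. It has no recovery timeout, so it is only a toy.
class CountingBreaker
  def initialize(threshold: 3)
    @threshold = threshold
    @failures  = 0
  end

  def allow_request?
    @failures < @threshold
  end

  def success
    @failures = 0
  end

  def failure
    @failures += 1
  end
end

ShardMapper.call(:orders) # => [:mysqla, :mysqlb]
ShardMapper.call(:mysqlc) # => [:mysqlc]
```

Objects like these could presumably be passed as `mapper: ShardMapper` and `circuit_breaker: CountingBreaker.new` when building a throttler, after which `throttler.throttle(:orders)` would check both stores behind that shard.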
# File lib/freno/throttler.rb, line 93
def initialize(client: nil,
                app: nil,
                mapper: Mapper::Identity,
                instrumenter: Instrumenter::Noop,
                circuit_breaker: CircuitBreaker::Noop,
                wait_seconds: DEFAULT_WAIT_SECONDS,
                max_wait_seconds: DEFAULT_MAX_WAIT_SECONDS)

  @client           = client
  @app              = app
  @mapper           = mapper
  @instrumenter     = instrumenter
  @circuit_breaker  = circuit_breaker
  @wait_seconds     = wait_seconds
  @max_wait_seconds = max_wait_seconds

  yield self if block_given?

  validate_args
end

Public Instance Methods

throttle(context = nil, **options) { || ... }

This method receives a context to infer the set of stores that it needs to throttle writes to. It can also receive additional options which are passed to the underlying Check request object:

```
throttler = Throttler.new(client: freno_client, app: :my_app)
data.find_in_batches do |batch|
  throttler.throttle(:mysqla, low_priority: true) do
    update(batch)
  end
end
```

With that information it asks freno whether all the stores are ok. In case they are, it executes the given block. Otherwise, it waits `wait_seconds` before trying again.

In case the throttler has waited more than `max_wait_seconds`, it raises a `WaitedTooLong` error.
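
To make the interaction between `wait_seconds` and `max_wait_seconds` concrete, here is a small simulation of the retry loop's bookkeeping, based on the `throttle` source shown on this page. It never sleeps or talks to freno. `attempts_before_giving_up` is a hypothetical helper, and the values 0.5 and 10 are illustrative, not necessarily the library's defaults.

```ruby
# Counts how many failed health checks happen before the throttler
# would raise WaitedTooLong, assuming the stores never become healthy.
# Mirrors the loop in `throttle`: wait, add wait_seconds to the total,
# then compare against max_wait_seconds with a strict `>`.
def attempts_before_giving_up(wait_seconds:, max_wait_seconds:)
  waited   = 0
  attempts = 0

  loop do
    attempts += 1            # one failed check
    waited   += wait_seconds # one simulated sleep
    return attempts if waited > max_wait_seconds
  end
end

attempts_before_giving_up(wait_seconds: 0.5, max_wait_seconds: 10) # => 21
```

Because the comparison is strict, this simulated throttler would sleep for 10.5 seconds in total before giving up, slightly more than `max_wait_seconds`.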

In case there's an underlying Freno error, it raises a `ClientError` error.

In case the circuit breaker is open, it raises a `CircuitOpen` error.

This method is instrumented; the instrumenter will receive the following events:

  • “throttler.called” each time this method is called

  • “throttler.succeeded” when the stores were ok, before yielding the block

  • “throttler.waited” when the stores were not ok, after waiting `wait_seconds`

  • “throttler.waited_too_long” when the stores were not ok, but the throttler already waited more than `max_wait_seconds`, right before raising `WaitedTooLong`

  • “throttler.freno_errored” when there was an error with freno, before raising `ClientError`.

  • “throttler.circuit_open” when the circuit breaker does not allow the next request, before raising `CircuitOpen`
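
The events listed above can be observed with any object that implements `instrument(event_name, context = {}, &block)`. The following is a minimal sketch of such an object that simply records what it receives. `RecordingInstrumenter` is a hypothetical class, and the calls at the bottom imitate what the throttler would send; the payload keys are taken from the `throttle` source on this page.

```ruby
# Hypothetical instrumenter that stores every event name and payload it
# receives, then yields so that any wrapped block still runs.
class RecordingInstrumenter
  attr_reader :events

  def initialize
    @events = []
  end

  def instrument(event_name, context = {})
    @events << [event_name, context]
    yield if block_given?
  end
end

instrumenter = RecordingInstrumenter.new

# Stand-ins for the calls a throttler would make during one failed
# check followed by a wait.
instrumenter.instrument("throttler.called", store_names: [:mysqla])
instrumenter.instrument("throttler.waited", store_names: [:mysqla], waited: 0.5, max: 10)

instrumenter.events.map(&:first)
# => ["throttler.called", "throttler.waited"]
```

In a test suite, an instance like this could be passed as `instrumenter:` to assert on which events a throttled block produced.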

# File lib/freno/throttler.rb, line 154
def throttle(context = nil, **options)
  store_names = mapper.call(context)
  instrument(:called, store_names: store_names)
  waited = 0

  while true do # rubocop:disable Lint/LiteralInCondition
    unless circuit_breaker.allow_request?
      instrument(:circuit_open, store_names: store_names, waited: waited)
      raise CircuitOpen
    end

    if all_stores_ok?(store_names, **options)
      instrument(:succeeded, store_names: store_names, waited: waited)
      circuit_breaker.success
      return yield
    end

    wait
    waited += wait_seconds
    instrument(:waited, store_names: store_names, waited: waited, max: max_wait_seconds)

    if waited > max_wait_seconds
      instrument(:waited_too_long, store_names: store_names, waited: waited, max: max_wait_seconds)
      circuit_breaker.failure
      raise WaitedTooLong.new(waited_seconds: waited, max_wait_seconds: max_wait_seconds)
    end
  end
end

Private Instance Methods

all_stores_ok?(store_names, **options)
# File lib/freno/throttler.rb, line 200
def all_stores_ok?(store_names, **options)
  store_names.all? do |store_name|
    client.check?(app: app, store_name: store_name, options: options)
  end
rescue Freno::Error => e
  instrument(:freno_errored, store_names: store_names, error: e)
  circuit_breaker.failure
  raise ClientError.new(e)
end
instrument(event_name, payload = {}, &block)
# File lib/freno/throttler.rb, line 214
def instrument(event_name, payload = {}, &block)
  instrumenter.instrument("throttler.#{event_name}", payload, &block)
end
validate_args()
# File lib/freno/throttler.rb, line 185
def validate_args
  errors = []

  %i(client app mapper instrumenter circuit_breaker
    wait_seconds max_wait_seconds).each do |argument|
    errors << "#{argument} must be provided" unless send(argument)
  end

  unless max_wait_seconds > wait_seconds
    errors << "max_wait_seconds (#{max_wait_seconds}) has to be greather than wait_seconds (#{wait_seconds})"
  end

  raise ArgumentError.new(errors.join("\n")) if errors.any?
end
wait()
# File lib/freno/throttler.rb, line 210
def wait
  sleep wait_seconds
end