class Umbra::Publisher

Constants

DEFAULT_MAX_QUEUE
DEFAULT_MAX_THREADS
DEFAULT_MIN_THREADS

Attributes

pool[R]

Public Class Methods

new(**options)
# File lib/umbra/publisher.rb, line 9
# Builds the thread pool that background publishing is queued on.
#
# @param options [Hash] optional pool sizing overrides
# @option options [Integer] :min_threads defaults to DEFAULT_MIN_THREADS
# @option options [Integer] :max_threads defaults to DEFAULT_MAX_THREADS; the
#   misspelled legacy key +:max_thread+ is still honoured when +:max_threads+
#   is not given
# @option options [Integer] :max_queue defaults to DEFAULT_MAX_QUEUE
def initialize(**options)
  # Only the misspelled :max_thread key used to be read, so a caller passing
  # :max_threads silently got the default. Prefer the correct key and fall
  # back to the legacy spelling so existing callers keep working.
  max_threads = options.fetch(:max_threads) do
    options.fetch(:max_thread, DEFAULT_MAX_THREADS)
  end

  # NOTE(review): concurrent-ruby's CachedThreadPool appears to override
  # min_threads/max_threads/max_queue with its own values; if bounded sizing
  # is intended, confirm whether ThreadPoolExecutor should be used instead.
  @pool = Concurrent::CachedThreadPool.new(
    min_threads: options.fetch(:min_threads, DEFAULT_MIN_THREADS),
    max_threads: max_threads,
    max_queue: options.fetch(:max_queue, DEFAULT_MAX_QUEUE),
    fallback_policy: :abort
  )
end

Public Instance Methods

call(env, encoder: Umbra.encoder, redis: Umbra.redis)
# File lib/umbra/publisher.rb, line 18
# Hands +env+ to the pool so that #call! runs for it later, off the caller's
# thread of control.
#
# @param env the value forwarded unchanged to #call!
# @param encoder forwarded to #call!; defaults to Umbra.encoder
# @param redis forwarded to #call!; defaults to Umbra.redis
# @return [Boolean] true when the pool accepted the job, false when the pool
#   raised Concurrent::RejectedExecutionError (a warning is logged)
def call(env, encoder: Umbra.encoder, redis: Umbra.redis)
  publish_job = proc do
    call!(env, encoder: encoder, redis: redis)
  end
  @pool << publish_job

  true
rescue Concurrent::RejectedExecutionError
  # The pool refused the job; report it and drop the item rather than raise.
  Umbra.logger.warn("[umbra] Queue at max - dropping items")

  false
end
call!(env, encoder: Umbra.encoder, redis: Umbra.redis)
# File lib/umbra/publisher.rb, line 28
# Encodes +env+ and publishes the result on Umbra::CHANNEL immediately, in
# the calling thread.
#
# @param env the value passed to +encoder.call+
# @param encoder object responding to +call+; defaults to Umbra.encoder
# @param redis object responding to +publish+; defaults to Umbra.redis
# @return whatever +redis.publish+ returns
def call!(env, encoder: Umbra.encoder, redis: Umbra.redis)
  channel = Umbra::CHANNEL
  payload = encoder.call(env)

  redis.publish(channel, payload)
end