class Fluent::SidekiqOutput

Constants

VERSION

Public Instance Methods

format(tag, time, record)
# File lib/fluent/plugin/out_sidekiq.rb, line 45
# Serialises one event for the buffer.
#
# The tag, time and record are packed together, in that order, as a single
# msgpack-encoded array.
#
# @param tag    the event tag
# @param time   the event time
# @param record the event record
# @return the result of calling +to_msgpack+ on +[tag, time, record]+
def format(tag, time, record)
  entry = [tag, time, record]
  entry.to_msgpack
end
redis_client()
# File lib/fluent/plugin/out_sidekiq.rb, line 49
# Builds a Redis client for @redis_url using the hiredis driver.
#
# A new client is constructed on every call. When @redis_namespace is set the
# client is wrapped in a Redis::Namespace for that namespace; the
# "redis/namespace" library is only required in that case.
#
# @return the bare Redis client, or a Redis::Namespace wrapping it
def redis_client
  settings = {url: @redis_url, driver: :hiredis}
  connection = Redis.new(settings)
  return connection unless @redis_namespace

  require "redis/namespace"
  Redis::Namespace.new(@redis_namespace, connection)
end
shutdown()
Calls superclass method
# File lib/fluent/plugin/out_sidekiq.rb, line 41
# Shutdown hook: does no work of its own and simply defers to the
# superclass implementation, returning whatever it returns.
def shutdown
  super
end
start()
Calls superclass method
# File lib/fluent/plugin/out_sidekiq.rb, line 37
# Start hook: does no work of its own and simply defers to the
# superclass implementation, returning whatever it returns.
def start
  super
end
write(chunk)
# File lib/fluent/plugin/out_sidekiq.rb, line 60
# Flushes one buffered chunk to Redis.
#
# Every record yielded by +chunk.msgpack_each+ is routed one of three ways:
#
# * A record carrying an 'at' value is collected as an [at, payload] pair and
#   added to the 'schedule' sorted set in a single ZADD.
# * A record whose JSON payload has an Array of Hashes as its first argument
#   is merged into a Batch (an existing one that accepts it according to
#   Batch#acceptable_batch, otherwise a new one); each Batch enqueues itself.
# * Any other record has its raw payload pushed onto "queue:<queue>" and its
#   queue name added to the 'queues' set.
#
# Note that the 'at' and 'queue' keys are removed from the record hashes as
# they are consumed.
#
# @param chunk [#msgpack_each] yields (tag, time, data) for each buffered event
def write(chunk)
  client = redis_client
  scheduled_jobs = []
  batches = []
  # queue name => raw payloads, in arrival order
  manual_jobs = Hash.new { |jobs, queue| jobs[queue] = [] }

  chunk.msgpack_each do |_tag, _time, data|
    at = data.delete('at')
    if at
      scheduled_jobs << [at, data['payload']]
      next
    end

    payload = JSON.parse(data['payload'])
    # A payload without an 'args' array cannot be batched; treat it as a plain
    # job rather than raising NoMethodError and failing the whole chunk.
    args = payload['args']
    batch_items = args.first if args.is_a?(Array)

    if batch_items.is_a?(Array) && batch_items.all? { |item| item.is_a?(Hash) }
      queue = data.delete('queue')
      klass = payload['class']

      batch = batches.find { |b| b.acceptable_batch(queue, klass, batch_items, max_batch_size) }
      unless batch
        batch = Batch.new(queue, klass)
        batches << batch
      end

      batch.add_to_batch(payload)
    else
      manual_jobs[data.delete('queue')] << data['payload']
    end
  end

  client.zadd('schedule', scheduled_jobs) unless scheduled_jobs.empty?
  batches.each { |batch| batch.enqueue(client) }

  # One SADD and one multi-value LPUSH per queue. LPUSH inserts its values
  # left to right, so the list ends up in the same order as pushing the
  # payloads one at a time.
  manual_jobs.each do |queue, payloads|
    client.sadd('queues', queue)
    client.lpush("queue:#{queue}", payloads)
  end
end