class Fluent::SidekiqOutput
Constants
- VERSION
Public Instance Methods
format(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_sidekiq.rb, line 45
# Serializes a single event for buffering.
#
# The tag, time and record are packed together, in that order, as one
# three-element array via +to_msgpack+.
#
# @param tag    the event tag
# @param time   the event time
# @param record the event record
# @return the result of calling +to_msgpack+ on <tt>[tag, time, record]</tt>
def format(tag, time, record)
  event = [tag, time, record]
  event.to_msgpack
end
redis_client()
click to toggle source
# File lib/fluent/plugin/out_sidekiq.rb, line 49
# Builds a Redis client for this output.
#
# A new +Redis+ instance is created on every call, configured with
# +@redis_url+ and the +:hiredis+ driver. When +@redis_namespace+ is set,
# "redis/namespace" is loaded on demand and the client is wrapped in a
# +Redis::Namespace+ using that namespace; otherwise the bare client is
# returned.
#
# @return the plain client, or the namespaced wrapper around it
def redis_client
  settings = {url: @redis_url, driver: :hiredis}
  connection = Redis.new(settings)
  return connection unless @redis_namespace

  # Loaded lazily so the dependency is only needed when a namespace is configured.
  require "redis/namespace"
  Redis::Namespace.new(@redis_namespace, connection)
end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_sidekiq.rb, line 41
# Shutdown hook. Adds no behaviour of its own: it only invokes the
# superclass implementation and returns whatever that returns.
def shutdown
  super()
end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_sidekiq.rb, line 37
# Start hook. Adds no behaviour of its own: it only invokes the
# superclass implementation and returns whatever that returns.
def start
  super()
end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_sidekiq.rb, line 60
# Flushes one buffered chunk to Redis.
#
# Every entry unpacked from the chunk is routed one of three ways:
#
# * entries carrying an 'at' value are collected as <tt>[at, payload]</tt>
#   pairs and written to the 'schedule' key with a single ZADD;
# * entries whose parsed payload has an array of hashes as its first
#   argument are grouped into Batch objects (an existing batch is reused
#   when Batch#acceptable_batch accepts the entry, otherwise a new one is
#   created) and sent with Batch#enqueue;
# * every other entry is pushed unchanged onto <tt>"queue:<queue>"</tt>,
#   after the queue name has been added to the 'queues' set.
#
# @param chunk [#msgpack_each] buffer chunk yielding (tag, time, data)
#   entries; +data+ is expected to be a Hash with a JSON string under
#   'payload' and, optionally, 'queue' and 'at'
# @return [Array<Hash>] the entries that were queued individually
# @raise [JSON::ParserError] if a non-scheduled entry's payload is not valid JSON
def write(chunk)
  # NOTE(review): redis_client builds a new client on every call and nothing
  # in this method closes it -- confirm the connection lifetime is handled
  # elsewhere.
  client = redis_client
  scheduled_jobs = []
  batches = []
  manually_queue = []

  chunk.msgpack_each do |_tag, _time, data|
    at = data.delete('at')
    if at
      # Scheduled payloads are forwarded verbatim; they are never parsed.
      scheduled_jobs << [at, data['payload']]
      next
    end

    payload = JSON.parse(data['payload'])
    args = payload['args']
    # A payload without an args array cannot be batched. Treat it like any
    # other non-batch job rather than raising and failing the whole chunk.
    first_arg = args.is_a?(Array) ? args.first : nil
    batchable = first_arg.is_a?(Array) && first_arg.all? { |arg| arg.is_a?(Hash) }

    if batchable
      queue = data.delete('queue')
      klass = payload['class']
      batch = batches.find { |b| b.acceptable_batch(queue, klass, first_arg, max_batch_size) }
      unless batch
        batch = Batch.new(queue, klass)
        batches << batch
      end
      batch.add_to_batch(payload)
    else
      # Manual queuing
      manually_queue << data
    end
  end

  client.zadd('schedule', scheduled_jobs) unless scheduled_jobs.empty?

  batches.each { |batch| batch.enqueue(client) }

  # SADD is idempotent, so each queue name is registered once per chunk
  # (still before the first push to that queue) instead of once per job.
  registered = {}
  manually_queue.each do |data|
    queue = data.delete('queue')
    unless registered.key?(queue)
      client.sadd('queues', queue)
      registered[queue] = true
    end
    client.lpush("queue:#{queue}", data['payload'])
  end
end