class TQ::App
Attributes
concurrency[R]
env[R]
logger[R]
queue_err[R]
queue_in[R]
queue_out[R]
worker[R]
Public Class Methods
new( worker, queue_in, queue_out: nil, queue_err: nil, logger: nil, concurrency: 1, env: {} )
click to toggle source
# File lib/tq.rb, line 19
#
# Stores the collaborators and settings for one app; performs no work itself.
#
# @param worker      kept in @worker; run! calls +worker.new(...)+ on it
# @param queue_in    kept in @queue_in; run! wraps it in a TQ::Queue and leases from it
# @param queue_out   kept in @queue_out (optional, defaults to nil)
# @param queue_err   kept in @queue_err (optional, defaults to nil)
# @param logger      kept in @logger (optional, defaults to nil)
# @param concurrency kept in @concurrency (defaults to 1)
# @param env         kept in @env (defaults to a fresh empty Hash per call)
def initialize(worker, queue_in, queue_out: nil, queue_err: nil, logger: nil, concurrency: 1, env: {})
  @worker      = worker
  @queue_in    = queue_in
  @queue_out   = queue_out
  @queue_err   = queue_err
  @logger      = logger
  @concurrency = concurrency
  @env         = env
end
Public Instance Methods
call(auth_file=nil)
click to toggle source
# File lib/tq.rb, line 30
#
# Entry point: picks an API client and hands it to run!.
#
# @param auth_file optional; when truthy it is passed to
#   service_account_client, otherwise default_client is used
# @return whatever run! returns
def call(auth_file = nil)
  # Choose the client once so there is a single run! call site.
  client = auth_file ? service_account_client(auth_file) : default_client
  run!(client)
end
default_client()
click to toggle source
# File lib/tq.rb, line 51
#
# Builds the client used when no auth file is given to call.
#
# @return a new CloudTasks::CloudTasksService instance on every invocation
def default_client
  CloudTasks::CloudTasksService.new
end
env_with_logger()
click to toggle source
# File lib/tq.rb, line 63
#
# Returns the result of merging a 'logger' entry (the current logger) into env.
# Assuming env is a Hash (its default is {}), merge is non-destructive, so
# env itself is left untouched and each call yields a new Hash.
#
# @return env merged with { 'logger' => logger }
def env_with_logger
  env.merge('logger' => logger)
end
run!(client)
click to toggle source
# File lib/tq.rb, line 38
#
# Leases tasks from the input queue and hands each one to a new worker.
#
# Steps, in order:
# 1. setup_api_logger! is called.
# 2. A TQ::Queue is built for queue_in, and for queue_out / queue_err when
#    they are not nil (otherwise the worker receives nil in that position).
# 3. The input queue's lease! result is iterated with Parallel.each, passing
#    in_threads: concurrency (presumably that many threads -- see the
#    Parallel gem documentation).
# 4. For every task, worker.new(qin, qout, qerr, env_with_logger).call(task)
#    is invoked; env_with_logger is evaluated once per task.
#
# @param client API client passed to each TQ::Queue
# @return whatever Parallel.each returns
def run!(client)
  setup_api_logger!

  qin = TQ::Queue.new(client, queue_in)
  # A local assigned under a false modifier condition stays nil, so the
  # optional queues are nil whenever their name is nil.
  qout = TQ::Queue.new(client, queue_out) unless queue_out.nil?
  qerr = TQ::Queue.new(client, queue_err) unless queue_err.nil?

  tasks = qin.lease!
  Parallel.each(tasks, in_threads: concurrency) do |task|
    worker.new(qin, qout, qerr, env_with_logger).call(task)
  end
end
service_account_client(file)
click to toggle source
# File lib/tq.rb, line 55
#
# Builds a client from a service-account file by delegating to
# TQ::ServiceAccount.client.
#
# @param file passed through unchanged to TQ::ServiceAccount.client
# @return whatever TQ::ServiceAccount.client returns
def service_account_client(file)
  TQ::ServiceAccount.client(file)
end
setup_api_logger!()
click to toggle source
# File lib/tq.rb, line 59
#
# Assigns this app's logger to Google::Apis.logger, a module-level setting,
# so the assignment is visible outside this instance. The value is assigned
# as-is, including nil when no logger was supplied.
def setup_api_logger!
  Google::Apis.logger = logger
end