class TQ::App

Attributes

concurrency[R]
env[R]
logger[R]
queue_err[R]
queue_in[R]
queue_out[R]
worker[R]

Public Class Methods

new( worker, queue_in, queue_out: nil, queue_err: nil, logger: nil, concurrency: 1, env: {} )
# File lib/tq.rb, line 19
# Stores the collaborators and settings for one app instance.
#
# worker      - object that responds to +new+; instantiated once per task
#               by #run!.
# queue_in    - input queue setting, wrapped in a TQ::Queue by #run!.
# queue_out   - optional output queue setting (default: nil).
# queue_err   - optional error queue setting (default: nil).
# logger      - optional logger (default: nil).
# concurrency - thread count passed to Parallel.each by #run! (default: 1).
# env         - extra environment handed to each worker (default: {}).
def initialize(worker, queue_in,
               queue_out: nil, queue_err: nil,
               logger: nil, concurrency: 1, env: {})
  # Required collaborators.
  @worker, @queue_in = worker, queue_in
  # Optional downstream queues; nil means "not configured".
  @queue_out, @queue_err = queue_out, queue_err
  # Runtime settings.
  @logger, @concurrency = logger, concurrency
  @env = env
end

Public Instance Methods

call(auth_file=nil)
# File lib/tq.rb, line 30
# Runs the app once.
#
# auth_file - when truthy, passed to #service_account_client to build the
#             client; otherwise #default_client is used (default: nil).
#
# Returns whatever #run! returns.
def call(auth_file=nil)
  client = auth_file ? service_account_client(auth_file) : default_client
  run!(client)
end
default_client()
# File lib/tq.rb, line 51
# Builds the client used when #call is given no auth file: a new
# CloudTasks::CloudTasksService constructed with no arguments.
def default_client
  service_class = CloudTasks::CloudTasksService
  service_class.new
end
env_with_logger()
# File lib/tq.rb, line 63
# Returns +env+ merged with a 'logger' entry pointing at this app's logger.
# The result comes from +merge+, so when +env+ is a Hash the stored env is
# left unmodified and any existing 'logger' key is overridden in the copy.
def env_with_logger
  env.merge('logger' => logger)
end
run!(client)
# File lib/tq.rb, line 38
# Leases tasks from the input queue and hands each one to a fresh worker.
#
# Steps, in order:
#   1. Install the API logger via #setup_api_logger!.
#   2. Wrap the configured queue settings in TQ::Queue objects bound to
#      +client+; the output and error queues stay nil when not configured.
#   3. Lease tasks from the input queue.
#   4. Process the leased tasks with Parallel.each across +concurrency+
#      threads, building a new worker for every task.
#
# Returns the result of Parallel.each.
def run!(client)
  setup_api_logger!

  input_queue  = TQ::Queue.new(client, queue_in)
  # A modifier-`unless` assignment leaves the local nil when the guard holds.
  output_queue = TQ::Queue.new(client, queue_out) unless queue_out.nil?
  error_queue  = TQ::Queue.new(client, queue_err) unless queue_err.nil?

  leased = input_queue.lease!

  Parallel.each(leased, in_threads: concurrency) do |task|
    worker.new(input_queue, output_queue, error_queue, env_with_logger).call(task)
  end
end
service_account_client(file)
# File lib/tq.rb, line 55
# Builds a client from a service-account file by delegating to
# TQ::ServiceAccount.client.
#
# file - passed through unchanged to TQ::ServiceAccount.client.
def service_account_client(file)
  TQ::ServiceAccount.client(file)
end
setup_api_logger!()
# File lib/tq.rb, line 59
# Assigns this app's logger to Google::Apis.logger, so it is replaced on
# every call (including with nil when no logger was configured).
#
# Returns the logger that was assigned.
def setup_api_logger!
  app_logger = logger
  Google::Apis.logger = app_logger
end